Hadoop MapReduce Technical Terms

1 August 2018

This blog post covers the components and concepts that need to be understood in order to learn Hadoop and the MapReduce framework. These topics also come up frequently in Hadoop/MapReduce interview questions.

Different Modes in Hadoop

Hadoop can run in three modes:

  • Standalone Mode: The default mode of Hadoop. It uses the local file system for input and output operations, is mainly used for debugging purposes, and does not support the use of HDFS. No custom configuration is required in the mapred-site.xml, core-site.xml, or hdfs-site.xml files (a minimal configuration sketch follows this list). It is much faster than the other modes.

  • Pseudo-Distributed Mode (Single-Node Cluster): This mode requires configuration of all three files mentioned above. All daemons run on one node, so the Master and the Slave node are the same.
  • Fully Distributed Mode (Multi-Node Cluster): This is the production mode of Hadoop (what Hadoop is known for), where data is distributed across several nodes of a Hadoop cluster and separate nodes are allotted as Master and Slaves.
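
As a rough illustration of the difference, the hypothetical sketch below uses the Hadoop Configuration API to show that standalone mode leaves fs.defaultFS at the local file system while a pseudo-distributed setup points it at an HDFS NameNode on localhost; in a real cluster these values live in core-site.xml rather than in code:

    import org.apache.hadoop.conf.Configuration;

    public class ModeCheck {
        public static void main(String[] args) {
            // Standalone mode: with no site files on the classpath,
            // fs.defaultFS stays at the built-in default, the local file system.
            Configuration standalone = new Configuration();
            System.out.println("Standalone FS: " + standalone.get("fs.defaultFS", "file:///"));

            // Pseudo-distributed mode: core-site.xml would normally carry this
            // setting; it is set programmatically here only for illustration.
            Configuration pseudo = new Configuration();
            pseudo.set("fs.defaultFS", "hdfs://localhost:9000"); // hypothetical host:port
            System.out.println("Pseudo-distributed FS: " + pseudo.get("fs.defaultFS"));
        }
    }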

Namenode

Namenode is the node which stores the filesystem metadata, i.e. which file maps to which block locations and which blocks are stored on which datanode. The namenode maintains two in-memory tables: one which maps blocks to datanodes (one block maps to as many datanodes as its replication factor, three by default) and one which maps each datanode to the blocks it hosts. Whenever a datanode reports a disk corruption of a particular block, the first table gets updated, and whenever a datanode is detected to be dead (because of a node/network failure), both tables get updated.
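
A toy model of the two tables, purely for illustration (the class and method names are hypothetical, not NameNode internals):

    import java.util.*;

    // Illustrative only: a toy model of the namenode's two in-memory tables.
    class NamenodeMetadata {
        // Table 1: block id -> datanodes currently holding a replica
        private final Map<String, Set<String>> blockToDatanodes = new HashMap<>();
        // Table 2: datanode id -> blocks it hosts
        private final Map<String, Set<String>> datanodeToBlocks = new HashMap<>();

        // A corrupt-block report from a datanode updates only the first table.
        void reportCorruptReplica(String blockId, String datanodeId) {
            Set<String> holders = blockToDatanodes.get(blockId);
            if (holders != null) {
                holders.remove(datanodeId);
            }
        }

        // A dead datanode (missed heartbeats) updates both tables.
        void markDatanodeDead(String datanodeId) {
            Set<String> lostBlocks = datanodeToBlocks.remove(datanodeId);
            if (lostBlocks == null) {
                return;
            }
            for (String blockId : lostBlocks) {
                Set<String> holders = blockToDatanodes.get(blockId);
                if (holders != null) {
                    holders.remove(datanodeId);
                }
                // Re-replication of under-replicated blocks would be scheduled here.
            }
        }
    }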

DataNode

The datanode is where the actual data resides.

Some interesting traits of the datanodes are as follows:

All datanodes send a heartbeat message to the namenode every 3 seconds to indicate that they are alive. If the namenode does not receive a heartbeat from a particular datanode for 10 minutes, it considers that datanode to be dead/out of service and initiates re-replication of the blocks that were hosted on that datanode onto other datanodes.

The datanodes can talk to each other to rebalance data, move and copy data around, and keep the replication high. When a datanode stores a block of information, it maintains a checksum for it as well. The datanodes update the namenode with the block information periodically and verify the checksums before updating. If the checksum is incorrect for a particular block, i.e. there is disk-level corruption for that block, the datanode skips that block while reporting the block information to the namenode. In this way the namenode becomes aware of the disk-level corruption on that datanode and takes steps accordingly.
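
The sketch below illustrates only the idea of per-chunk checksumming; it uses plain java.util.zip.CRC32 over 512-byte chunks and is not HDFS's actual implementation:

    import java.util.Arrays;
    import java.util.zip.CRC32;

    public class ChunkChecksums {
        static final int BYTES_PER_CHECKSUM = 512; // mirrors the 512-byte default chunk size

        // Compute one CRC32 checksum per 512-byte chunk of a block's data.
        static long[] checksum(byte[] blockData) {
            int chunks = (blockData.length + BYTES_PER_CHECKSUM - 1) / BYTES_PER_CHECKSUM;
            long[] sums = new long[chunks];
            for (int i = 0; i < chunks; i++) {
                int start = i * BYTES_PER_CHECKSUM;
                int len = Math.min(BYTES_PER_CHECKSUM, blockData.length - start);
                CRC32 crc = new CRC32();
                crc.update(blockData, start, len);
                sums[i] = crc.getValue();
            }
            return sums;
        }

        // Before reporting a block, the stored checksums are recomputed and compared;
        // a mismatch means disk-level corruption and the block is skipped in the report.
        static boolean isCorrupt(byte[] blockData, long[] storedSums) {
            return !Arrays.equals(checksum(blockData), storedSums);
        }
    }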

JobTracker

The primary function of the JobTracker is resource management (managing the TaskTrackers), tracking resource availability, and task life-cycle management (tracking task progress, fault tolerance, etc.).

TaskTracker

The TaskTracker has the simple function of following the orders of the JobTracker and updating it with its progress status periodically. The TaskTracker is pre-configured with a number of slots indicating the number of tasks it can accept. When the JobTracker tries to schedule a task, it looks for an empty slot in the TaskTracker running on the same server that hosts the datanode where the data for that task resides. If none is found, it looks for a machine in the same rack. There is no consideration of system load during this allocation.

Most common Input Formats in Hadoop

The three most common input formats in Hadoop are listed below (a driver sketch for selecting one follows the list):

  • Text Input Format: The default input format in Hadoop; each line of the file is a record.
  • Key Value Input Format: Used for plain text files where each line is split into a key and a value (at the first tab character by default).
  • Sequence File Input Format: Used for reading SequenceFiles, Hadoop's binary key/value file format.
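
In the Java driver, the input format is chosen on the Job object; a minimal sketch using the standard classes from the mapreduce library:

    import java.io.IOException;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

    public class InputFormatChoice {
        public static void main(String[] args) throws IOException {
            Job job = Job.getInstance();
            // TextInputFormat is what the job uses if nothing is set explicitly:
            // the key is the byte offset of the line, the value is the line itself.
            job.setInputFormatClass(TextInputFormat.class);

            // Alternatives:
            // job.setInputFormatClass(KeyValueTextInputFormat.class); // each line split at the first tab
            // job.setInputFormatClass(SequenceFileInputFormat.class); // binary key/value SequenceFiles
        }
    }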

SequenceFile in Hadoop

Extensively used in MapReduce I/O formats, SequenceFile is a flat file containing binary key/value pairs. The map outputs are stored internally as a SequenceFile. It provides Reader, Writer and Sorter classes (a minimal writer sketch follows the list). The three SequenceFile formats are:

  1. Uncompressed key/value records.
  2. Record compressed key/value records – only ‘values’ are compressed here.
  3. Block compressed key/value records – both keys and values are collected in ‘blocks’ separately and compressed. The size of the ‘block’ is configurable.
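
A minimal writer sketch for a block-compressed SequenceFile, assuming the Hadoop 2 SequenceFile.Writer.Option API (the output path is a placeholder):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.SequenceFile;
    import org.apache.hadoop.io.Text;

    public class SequenceFileWriteDemo {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            Path path = new Path("/tmp/demo.seq"); // placeholder output path

            // Block compression: keys and values are gathered into blocks and compressed.
            try (SequenceFile.Writer writer = SequenceFile.createWriter(conf,
                    SequenceFile.Writer.file(path),
                    SequenceFile.Writer.keyClass(Text.class),
                    SequenceFile.Writer.valueClass(IntWritable.class),
                    SequenceFile.Writer.compression(SequenceFile.CompressionType.BLOCK))) {
                for (int i = 0; i < 100; i++) {
                    writer.append(new Text("key-" + i), new IntWritable(i));
                }
            }
        }
    }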

Rack Awareness in Hadoop

HDFS is rack aware in the sense that the namenode and the JobTracker obtain a list of rack ids corresponding to each of the slave nodes (datanodes) and create a mapping between the IP address and the rack id. HDFS uses this knowledge to replicate data across different racks so that data is not lost in the event of a complete rack power outage or switch failure.

Distributed Cache

Distributed Cache distributes application-specific, large, read-only files efficiently. DistributedCache is a facility provided by the Map/Reduce framework to cache files (text, archives, jars and so on) needed by applications. The framework will copy the necessary files to the slave node before any tasks for the job are executed on that node. Its efficiency stems from the fact that the files are only copied once per job and the ability to cache archives which are un-archived on the slaves. It can also be used as a rudimentary software distribution mechanism for use in the map and/or reduce tasks. It can be used to distribute both jars and native libraries and they can be put on the classpath or native library path for the map/reduce tasks.

The DistributedCache is designed to distribute a small number of medium-sized artifacts, ranging from a few MB to a few tens of MB. One drawback of the current implementation is that there is no way to specify map- or reduce-specific artifacts.
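
With the newer (mapreduce) API, files are added from the driver and read back in a task's setup method; a minimal sketch (the path and symlink name are placeholders):

    import java.io.IOException;
    import java.net.URI;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;

    public class CacheExample {

        // Driver side: register the file so the framework copies it to each slave node once per job.
        static void configure(Job job) throws Exception {
            job.addCacheFile(new URI("/apps/lookup/stopwords.txt#stopwords")); // placeholder path and symlink name
        }

        // Task side: the cached files are visible to every map/reduce task on the node.
        public static class CacheAwareMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
            @Override
            protected void setup(Context context) throws IOException, InterruptedException {
                URI[] cached = context.getCacheFiles();
                if (cached != null) {
                    // Open cached[0] (or the "stopwords" symlink in the working directory) and load it.
                }
            }
        }
    }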

Counters

Counters represent global counters, defined either by the Map/Reduce framework or applications. Applications can define arbitrary Counters and update them in the map and/or reduce methods. These counters are then globally aggregated by the framework.

Counters are appropriate for tracking few, important, global bits of information. They are definitely not meant to aggregate very fine-grained statistics of applications.

Counters are a useful channel for gathering statistics about the job: for quality control or for application-level statistics. They are also useful for problem diagnosis. Counters are expensive, however, since the JobTracker has to maintain every counter of every map/reduce task for the entire duration of the application.
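
A minimal sketch of defining, updating and reading a custom counter (the enum and class names are illustrative):

    import java.io.IOException;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;

    public class CounterExample {

        // Application-defined counters; the framework aggregates them globally.
        public enum Quality { MALFORMED_RECORDS }

        public static class ParsingMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
            @Override
            protected void map(LongWritable key, Text value, Context context)
                    throws IOException, InterruptedException {
                if (value.toString().isEmpty()) {
                    // Counters are meant for a few coarse, global statistics like this one.
                    context.getCounter(Quality.MALFORMED_RECORDS).increment(1);
                    return;
                }
                context.write(new Text(value.toString()), new LongWritable(1));
            }
        }

        // After the job completes, the aggregated value can be read from the driver.
        static long malformed(Job job) throws IOException {
            return job.getCounters().findCounter(Quality.MALFORMED_RECORDS).getValue();
        }
    }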

Speculative Execution

Speculative execution is a way of coping with variation in individual machine performance. In large clusters involving hundreds or thousands of machines, some machines may not perform as fast as others, and a single slow machine can delay the whole job. To avoid this, speculative execution in Hadoop can run multiple copies of the same map or reduce task on different slave nodes, and the results from the first node to finish are used.

The JobTracker makes different TaskTrackers process the same input. When tasks complete, they announce this fact to the JobTracker. Whichever copy of a task finishes first becomes the definitive copy. If other copies were executing speculatively, Hadoop tells the TaskTrackers to abandon those tasks and discard their outputs. The Reducers then receive their inputs from whichever Mapper completed successfully first.
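
Speculative execution can be turned on or off per task type; a hedged sketch using the Hadoop 2 property names (older releases use the mapred.*.tasks.speculative.execution properties instead):

    import org.apache.hadoop.conf.Configuration;

    public class SpeculationSettings {
        public static void main(String[] args) {
            Configuration conf = new Configuration();
            // Enabled by default; disabling can make sense for non-idempotent or
            // heavily skewed tasks where duplicate attempts waste cluster capacity.
            conf.setBoolean("mapreduce.map.speculative", true);
            conf.setBoolean("mapreduce.reduce.speculative", false);
        }
    }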

FSImage and Edit Log

The fsimage is a file that represents a point-in-time snapshot of the filesystem’s metadata. However, while the fsimage file format is very efficient to read, it’s unsuitable for making small incremental updates like renaming a single file. Thus, rather than writing a new fsimage every time the namespace is modified, the NameNode instead records the modifying operation in the edit log for durability. This way, if the NameNode crashes, it can restore its state by first loading the fsimage then replaying all the operations (also called edits or transactions) in the edit log to catch up to the most recent state of the namesystem. The edit log comprises a series of files, called edit log segments, that together represent all the namesystem modifications made since the creation of the fsimage.

Fault tolerance

Failure is not an exception; it’s the norm. MapReduce treats failure as a first-class citizen and supports re-execution of failed tasks on healthy worker nodes in the cluster. Should a worker node fail, all tasks are assumed to be lost, in which case they are simply rescheduled elsewhere. The unit of work is always the task, and it either completes successfully or it fails completely.

The task tracker spawns different JVM processes to ensure that process failures do not bring down the task tracker.

Process Isolation

In MapReduce, records are processed in isolation by tasks called Mappers. The output from the Mappers is then brought together into a second set of tasks called Reducers, where results from different mappers can be merged together.
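
The classic word count illustrates this shape: each Mapper processes its own records in isolation, and the Reducer merges the per-key results from all mappers. A standard sketch:

    import java.io.IOException;
    import java.util.StringTokenizer;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;

    public class WordCount {

        // Each mapper sees only its own input split and emits (word, 1) pairs.
        public static class TokenizerMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
            private static final IntWritable ONE = new IntWritable(1);
            private final Text word = new Text();

            @Override
            protected void map(LongWritable key, Text value, Context context)
                    throws IOException, InterruptedException {
                StringTokenizer itr = new StringTokenizer(value.toString());
                while (itr.hasMoreTokens()) {
                    word.set(itr.nextToken());
                    context.write(word, ONE);
                }
            }
        }

        // The reducer receives all values for a given word, regardless of which mapper produced them.
        public static class SumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
            @Override
            protected void reduce(Text key, Iterable<IntWritable> values, Context context)
                    throws IOException, InterruptedException {
                int sum = 0;
                for (IntWritable v : values) {
                    sum += v.get();
                }
                context.write(key, new IntWritable(sum));
            }
        }
    }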

Task Isolation

Hadoop allows multiple users to utilize the cluster while keeping the cluster functioning and preserving the QoS of important jobs, regardless of how malicious (or incompetent) the users running tasks on the cluster are.

Input Split in Hadoop

Input splits are a logical division of your records, whereas HDFS blocks are a physical division of the input data. It’s extremely efficient when they’re the same, but in practice the alignment is never perfect: records may cross block boundaries. Hadoop guarantees the processing of all records. A machine processing a particular split may fetch a fragment of a record from a block other than its “main” block, which may reside remotely. The communication cost of fetching a record fragment is inconsequential because it happens relatively rarely.

In HDFS, data is split into blocks and distributed across multiple nodes in the cluster. Each block is typically 64 MB or 128 MB in size, and each block is replicated multiple times (the default is to replicate each block three times). Replicas are stored on different nodes. HDFS uses the local file system to store each HDFS block as a separate file. The HDFS block size cannot be compared with the traditional file system block size.

Edge Node/Staging Node

Edge nodes, or staging nodes, in a Hadoop cluster are typically the nodes responsible for running the client-side operations of the cluster. Edge nodes are usually kept separate from the nodes that run Hadoop services such as HDFS and MapReduce, mainly to keep computing resources separate. For smaller clusters with only a few nodes, it is common to see nodes playing a hybrid combination of roles for master services (JobTracker, NameNode, etc.), slave services (TaskTracker, DataNode, etc.) and gateway services.

Note that running master and slave Hadoop services on the same node is not an ideal setup and can cause scaling and resource issues depending on what is in use. This kind of configuration is typically seen in a small-scale dev environment.

Partitioner in Hadoop

The partitioner runs on the map side. It is the step in MapReduce in which records of the same type (records having the same key) are sent to one reducer, mainly for performance reasons on the Hadoop cluster. Each mapper must determine, for all of its output (key, value) pairs, which reducer will receive them. It is necessary that for any key, regardless of which mapper instance generated it, the destination partition is the same.

A partitioner divides the intermediate key-value pairs produced by the map tasks into partitions. The total number of partitions equals the number of reducers, and each partition is processed by the corresponding reducer. The partitioning is done using a hash function based on a single key or a group of keys. The default partitioner in Hadoop is HashPartitioner.
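
The default HashPartitioner hashes the key modulo the number of reducers; a custom partitioner follows the same pattern. A minimal sketch (the partitioning rule here is purely illustrative):

    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Partitioner;

    // Same key -> same partition -> same reducer, no matter which mapper emitted it.
    public class FirstLetterPartitioner extends Partitioner<Text, IntWritable> {
        @Override
        public int getPartition(Text key, IntWritable value, int numPartitions) {
            if (numPartitions == 0) {
                return 0;
            }
            // Partition by the first character of the key; the mask keeps the result non-negative.
            char first = key.toString().isEmpty() ? ' ' : key.toString().charAt(0);
            return (Character.toLowerCase(first) & Integer.MAX_VALUE) % numPartitions;
        }
    }
    // In the driver: job.setPartitionerClass(FirstLetterPartitioner.class);
    //                job.setNumReduceTasks(4);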

Role of RecordReader in Hadoop MapReduce

InputSplit defines a slice of work but does not describe how to access it. The RecordReader class loads the data from its source and converts it into (key, value) pairs suitable for reading by the Mapper task. The RecordReader instance is defined by the InputFormat.

Various configuration parameters required to run a MapReduce job

The main configuration parameters that users need to specify in the MapReduce framework are listed below (a minimal driver sketch follows the list):

  • Job’s input locations in the distributed file system
  • Job’s output location in the distributed file system
  • Input format of data
  • Output format of data
  • Class containing the map function
  • Class containing the reduce function
  • JAR file containing the mapper, reducer and driver classes
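
A minimal driver sketch tying these parameters together, reusing the TokenizerMapper and SumReducer classes from the word-count sketch earlier in this post (the input and output paths come from the command line):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

    public class WordCountDriver {
        public static void main(String[] args) throws Exception {
            Job job = Job.getInstance(new Configuration(), "word count");

            // JAR containing the mapper, reducer and driver classes.
            job.setJarByClass(WordCountDriver.class);

            // Input and output locations in the distributed file system.
            FileInputFormat.addInputPath(job, new Path(args[0]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));

            // Input and output formats of the data.
            job.setInputFormatClass(TextInputFormat.class);
            job.setOutputFormatClass(TextOutputFormat.class);

            // Classes containing the map and reduce functions.
            job.setMapperClass(WordCount.TokenizerMapper.class);
            job.setReducerClass(WordCount.SumReducer.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);

            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }
    }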

Identity Mapper and Identity Reducer

Identity Mapper is the default mapper provided by the Hadoop framework. It runs when no mapper class has been defined in the MapReduce program, in which case it simply passes the input key-value pairs through to the reducer phase.

Like Identity Mapper, Identity Reducer is the default reducer class provided by Hadoop, which is automatically executed if no reducer class has been defined. It also performs no computation; it simply writes the input key-value pairs to the specified output directory.

Submitting extra files or data (like jars, static files, etc.) for a MapReduce job during runtime

The distributed cache is used to distribute large read-only files that are needed by map/reduce jobs to the cluster. The framework will copy the necessary files from a URL on to the slave node before any tasks for the job are executed on that node. The files are only copied once per job and so should not be modified by the application.

InputFormat in Hadoop

Another important feature in MapReduce programming, InputFormat defines the input specifications for a job. It performs the following functions:

  • Validates the input specification of the job.
  • Splits the input file(s) into logical instances called InputSplits; each split is then assigned to an individual Mapper.
  • Provides a RecordReader implementation to extract input records from the splits for further processing by the Mapper.

Difference between HDFS block and InputSplit

An HDFS block is a physical division of the data, while an InputSplit in MapReduce is a logical division of the input files. The InputSplit is used to control the number of mappers, and its size is user-defined. In contrast, the HDFS block size is fixed by configuration (64 MB by default in older Hadoop releases, 128 MB in newer ones), so with a 64 MB block size, 1 GB of data yields 1024 MB / 64 MB = 16 blocks. If the input split size is not defined by the user, it takes the HDFS block size as its default.
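
If a different split size is needed, the minimum and maximum split sizes can be set from the driver through FileInputFormat; a minimal sketch (the sizes are illustrative):

    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

    public class SplitSizeConfig {
        public static void configure(Job job) {
            // Splits are normally block-sized; raising the minimum merges blocks into
            // fewer, larger splits (fewer mappers), lowering the maximum does the opposite.
            FileInputFormat.setMinInputSplitSize(job, 128L * 1024 * 1024); // 128 MB, illustrative
            FileInputFormat.setMaxInputSplitSize(job, 256L * 1024 * 1024); // 256 MB, illustrative
        }
    }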

Debugging Hadoop Job

First, check the list of MapReduce jobs currently running. Next, make sure that there are no orphaned jobs running; if there are, determine the location of the ResourceManager logs.

  1. Run ps -ef | grep -i ResourceManager and look for the log directory in the displayed result. Find the job-id in the displayed list and check whether there is any error message associated with that job.

  2. On the basis of RM logs, identify the worker node that was involved in execution of the task.

  3. Now, log in to that node and run ps -ef | grep -i NodeManager.

  4. Examine the NodeManager log. The majority of errors come from the user-level logs for each MapReduce job.

Handling failures by Name Node

Datanodes constantly communicate with the Namenode; each Datanode sends a heartbeat message to the Namenode periodically.

If the signal is not received as expected, the Namenode considers that Datanode to have failed and does not send any new requests to the dead Datanode. If the replication factor is greater than 1, the blocks lost from the dead Datanode can be recovered from other Datanodes where replicas are available, thus providing data availability and fault tolerance.

Fault Tolerance in Hadoop

Fault tolerance in HDFS refers to the working strength of a system in unfavourable conditions and how the system handles such situations. HDFS is highly fault tolerant; it handles faults through replica creation. Replicas of user data are created on different machines in the HDFS cluster, so if any machine in the cluster goes down, the data can still be accessed from another machine holding a copy of the same data. HDFS also maintains the replication factor by creating replicas of the data on other available machines in the cluster when a machine suddenly fails.

Serialization and Deserialization in Hadoop

Serialization is the process of converting structured objects into a byte stream. It is done for two main purposes: transmission over a network (interprocess communication) and writing to persistent storage. In Hadoop, the interprocess communication between nodes in the system is implemented using remote procedure calls (RPCs). The RPC protocol uses serialization to turn the message into a binary stream to be sent to the remote node, which receives and deserializes the binary stream into the original message (a minimal custom Writable sketch follows the lists below).

An RPC serialization format is expected to be:

  • Compact: To use network bandwidth efficiently.
  • Fast: Very little performance overhead is expected for the serialization and deserialization process.
  • Extensible: To allow the protocol to evolve in a controlled manner to meet new requirements.
  • Interoperable: To support clients written in languages different from the server’s.

The data format for persistent storage has related but somewhat different requirements. It should be:

  • Compact: To use storage space efficiently.
  • Fast: To keep the overhead of reading or writing terabytes of data minimal.
  • Extensible: To transparently read data written in an older format.
  • Interoperable: To read and write persistent data using different languages.
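
Hadoop's own serialization mechanism for keys and values is the Writable interface; the sketch below shows a minimal custom value type (the class and field names are illustrative):

    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    import org.apache.hadoop.io.Writable;

    // A value type that Hadoop can serialize to a compact byte stream and back.
    public class PageView implements Writable {
        private long timestamp;
        private int statusCode;

        @Override
        public void write(DataOutput out) throws IOException {   // serialization
            out.writeLong(timestamp);
            out.writeInt(statusCode);
        }

        @Override
        public void readFields(DataInput in) throws IOException { // deserialization
            timestamp = in.readLong();
            statusCode = in.readInt();
        }

        public long getTimestamp() { return timestamp; }
        public int getStatusCode() { return statusCode; }
    }

A key type would implement WritableComparable instead, adding a compareTo method so that keys can be sorted during the shuffle.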

.CRC file in Hadoop

The Hadoop LocalFileSystem performs client-side checksumming. This means that when you write a file called filename, the filesystem client transparently creates a hidden file, .filename.crc, in the same directory, containing the checksums for each chunk of the file. As in HDFS, the chunk size is controlled by the io.bytes.per.checksum property, which defaults to 512 bytes. The chunk size is stored as metadata in the .crc file, so the file can be read back correctly even if the setting for the chunk size has changed. Checksums are verified when the file is read, and if an error is detected, LocalFileSystem throws a ChecksumException.
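
Checksum verification on the local filesystem can be toggled from the client; a minimal sketch (the path is a placeholder) using FileSystem.getLocal and setVerifyChecksum:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.LocalFileSystem;
    import org.apache.hadoop.fs.Path;

    public class ChecksumToggle {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            LocalFileSystem fs = FileSystem.getLocal(conf);

            // Reads through LocalFileSystem verify the hidden .crc files;
            // a ChecksumException is thrown if a chunk is corrupt.
            fs.open(new Path("/tmp/data.txt")).close(); // placeholder path

            // Disabling verification (e.g. to salvage a corrupt file) skips the .crc check.
            fs.setVerifyChecksum(false);
            fs.open(new Path("/tmp/data.txt")).close();
        }
    }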

