Thursday, 28 July 2016

Changing the Log4j level with `hadoop jar` in MapReduce jobs

Source: http://goo.gl/v7P1x2

One of the most common questions I come across when trying to help debug MapReduce jobs is: "How do I change the Log4j level for my job?" Often, a user has a JAR with a class that implements Tool, which they invoke using the hadoop jar command. They want to change the log level without changing any code or global configuration files:
  hadoop jar MyApplication.jar com.myorg.application.ApplicationJob <args ...>
There is a great deal of misinformation around this because the approach changed drastically after the 0.20.x and 1.x Apache Hadoop releases. Most posts suggest solutions involving environment variables or passing Java opts to the mappers and reducers. In practice, there is a very straightforward solution.
To change the Mapper Log4j level, set mapreduce.map.log.level. To change the Reducer Log4j level, set mapreduce.reduce.log.level. If for some reason you need to change the Log4j level on the MapReduce ApplicationMaster (e.g. to debug InputSplit generation), you need to set yarn.app.mapreduce.am.log.level. This is the proper way for the Apache Hadoop 2.x release line. These options do not allow configuration of a Log4j level on a certain class or package -- this would require custom logging setup to be provided by your application.
It's important to remember that you are able to define configuration properties (which will appear in your job via the Hadoop Configuration) using the `hadoop jar` command:
  hadoop jar <jarfile> <classname> [-Dkey=value ...] [arg, ...]
The `-Dkey=value` section can be used to define the Log4j configuration properties when you launch the job.
For example, to set the DEBUG Log4j level on Mappers:
  hadoop jar MyApplication.jar com.myorg.application.ApplicationJob -Dmapreduce.map.log.level=DEBUG <args ...>
To set the WARN Log4j level on Reducers:
  hadoop jar MyApplication.jar com.myorg.application.ApplicationJob -Dmapreduce.reduce.log.level=WARN <args ...>
To set the DEBUG Log4j level on the MapReduce Application Master:
  hadoop jar MyApplication.jar com.myorg.application.ApplicationJob -Dyarn.app.mapreduce.am.log.level=DEBUG <args ...>
And, of course, these options can be combined:
  hadoop jar MyApplication.jar com.myorg.application.ApplicationJob -Dmapreduce.map.log.level=DEBUG -Dmapreduce.reduce.log.level=DEBUG -Dyarn.app.mapreduce.am.log.level=DEBUG <args ...>
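This works because hadoop jar hands the command-line arguments to your main class, and when that class is run through ToolRunner the generic -D options are parsed into the job's Configuration before run() is called. Below is a minimal, hypothetical Tool implementation (class and job names are illustrative, not from the original post) showing where the parsed properties end up:

  // Minimal, hypothetical Tool implementation (names are illustrative).
  // ToolRunner parses the generic -D options before run() is called, so the
  // log-level properties passed on the command line appear in getConf().
  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.conf.Configured;
  import org.apache.hadoop.mapreduce.Job;
  import org.apache.hadoop.util.Tool;
  import org.apache.hadoop.util.ToolRunner;

  public class ApplicationJob extends Configured implements Tool {

      @Override
      public int run(String[] args) throws Exception {
          Configuration conf = getConf();
          // Already set if -Dmapreduce.map.log.level=DEBUG was passed on the command line.
          System.out.println("Mapper log level: " + conf.get("mapreduce.map.log.level", "INFO"));

          Job job = Job.getInstance(conf, "my-application");
          job.setJarByClass(ApplicationJob.class);
          // ... set mapper, reducer, and input/output paths from args ...
          return job.waitForCompletion(true) ? 0 : 1;
      }

      public static void main(String[] args) throws Exception {
          System.exit(ToolRunner.run(new Configuration(), new ApplicationJob(), args));
      }
  }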

Wednesday, 13 July 2016

Overview of HDFS

HDFS is a distributed filesystem that runs on a cluster and is used by the Hadoop framework. HDFS is composed of one Namenode and several Datanodes.

The role of the Namenode is to coordinate the Datanodes, maintain a map of the filesystem (the namespace and block locations), and serve filesystem operations such as opening files for reading and writing. The Datanodes store the actual blocks of data.



The Namenode periodically receives heartbeats from the Datanodes; this is how it knows which ones are running. Blocks of data are replicated across several Datanodes, as the replicated blocks in the figure show. By default, the replication factor is 3.
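As a concrete illustration, the HDFS client API exposes this block and replication information. The following is a small sketch (the file path is hypothetical) that prints a file's replication factor and the Datanodes hosting each of its blocks:

  // Sketch: inspect a file's replication factor and block locations
  // through the HDFS client API (file path is hypothetical).
  import java.util.Arrays;
  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.BlockLocation;
  import org.apache.hadoop.fs.FileStatus;
  import org.apache.hadoop.fs.FileSystem;
  import org.apache.hadoop.fs.Path;

  public class BlockReport {
      public static void main(String[] args) throws Exception {
          Configuration conf = new Configuration();       // picks up core-site.xml / hdfs-site.xml
          FileSystem fs = FileSystem.get(conf);           // talks to the Namenode
          Path file = new Path("/user/example/data.txt"); // hypothetical file

          FileStatus status = fs.getFileStatus(file);
          System.out.println("Replication factor: " + status.getReplication());

          // Each BlockLocation lists the Datanodes holding one replica of a block.
          for (BlockLocation block : fs.getFileBlockLocations(status, 0, status.getLen())) {
              System.out.println("Block at offset " + block.getOffset()
                      + " hosted on " + Arrays.toString(block.getHosts()));
          }
          fs.close();
      }
  }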

Checksums are used to ensure that blocks are not corrupted while files are being read from or written to HDFS.
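For example, the checksum HDFS maintains for a file can be retrieved through the client API (the path below is hypothetical); the client library also verifies block checksums transparently on every read:

  // Sketch: fetch the checksum HDFS stores for a file (hypothetical path).
  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.FileChecksum;
  import org.apache.hadoop.fs.FileSystem;
  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.util.StringUtils;

  public class ChecksumExample {
      public static void main(String[] args) throws Exception {
          FileSystem fs = FileSystem.get(new Configuration());
          FileChecksum checksum = fs.getFileChecksum(new Path("/user/example/data.txt"));
          System.out.println(checksum.getAlgorithmName() + ": "
                  + StringUtils.byteToHexString(checksum.getBytes()));
          fs.close();
      }
  }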

HDFS is a fault-tolerant system that uses replication to protect files against hardware failures. Network failures are addressed by spreading replicas across multiple racks connected through multiple switches.



Speculative Execution in MapReduce

MapReduce is a widely used parallel computing framework for large-scale data processing. The two major performance metrics in MapReduce are job execution time and cluster throughput.

A job can be seriously impacted by straggler machines, that is, machines on which tasks take an unusually long time to finish. Speculative execution is a common approach to the straggler problem: slow-running tasks are backed up on alternative machines.

Hadoop is a widely used open-source implementation of MapReduce. The original speculative execution strategy used in Hadoop-0.20 (we call it Hadoop-Original) simply identifies a task as a straggler when the task’s progress falls behind the average progress of all tasks by a fixed gap.

Multiple speculative execution strategies have been proposed, but they have some pitfalls:
  1. They use the average progress rate to identify slow tasks, while in reality the progress rate can be unstable and misleading;
  2. They cannot appropriately handle situations where there is data skew among the tasks;
  3. They do not consider whether backup tasks can finish earlier when choosing backup worker nodes.
We can categorize the causes of stragglers into internal and external reasons. Internal reasons can be addressed by the MapReduce service, while external reasons cannot.

The internal reasons for a straggler are the heterogeneous resource capacities of worker nodes and resource competition from other MapReduce tasks running on the same worker node.

The external reasons, which MapReduce cannot deal with, are resource competition due to co-hosted applications, input data skew, faulty hardware, and remote input or output sources that are too slow.

Hadoop's speculative execution is driven by two factors: SPECULATIVE_LAG and SPECULATIVE_GAP.

Hadoop decides to launch a backup task for a straggler after the task has been running for a period greater than SPECULATIVE_LAG and its progress score is less than the average score minus SPECULATIVE_GAP.

The default value of SPECULATIVE_LAG is 60 seconds and SPECULATIVE_GAP is 0.2.
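A minimal sketch of this decision rule follows; the names and structure are simplified for illustration and are not the actual Hadoop source.

  // Illustrative sketch of the Hadoop-Original decision rule described above;
  // constants and method names are simplified, not the actual Hadoop source.
  public class SpeculationCheck {

      static final long SPECULATIVE_LAG_MS = 60_000; // task must run at least 60 s
      static final double SPECULATIVE_GAP = 0.2;     // progress deficit threshold

      /**
       * Returns true if a backup copy of the task should be launched.
       *
       * @param runningTimeMs   how long the task has been running
       * @param progressScore   this task's progress score in [0, 1]
       * @param averageProgress average progress score of all tasks in the same phase
       */
      static boolean shouldSpeculate(long runningTimeMs,
                                     double progressScore,
                                     double averageProgress) {
          return runningTimeMs > SPECULATIVE_LAG_MS
                  && progressScore < averageProgress - SPECULATIVE_GAP;
      }

      public static void main(String[] args) {
          // Example: a task running for 90 s at 0.5 progress while the average is 0.8
          // falls behind by more than 0.2, so a backup task would be launched.
          System.out.println(shouldSpeculate(90_000, 0.5, 0.8)); // true
      }
  }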