Wednesday, 23 November 2016

XFT: Practical Fault Tolerance Beyond Crashes

At OSDI'16, a new fault-tolerance approach called XFT was introduced for building reliable and secure distributed systems that tolerate both crash and non-crash (Byzantine) faults. Paper here.

XFT makes it possible to design reliable protocols that tolerate crash machine faults regardless of the number of network faults and, at the same time, tolerate non-crash machine faults as long as the number of faulty or partitioned machines stays within a threshold.

The intuition behind XFT is that situations in which an adversary has total control of both the network and the nodes are very rare. Therefore, using most BFT protocols in applications is excessive and a little over the top. XFT targets the cases in which an adversary cannot easily coordinate enough network partitions and non-crash machine faults at the same time. XFT can be used to protect against "accidental" non-crash faults, which happen occasionally; against malicious non-crash faults, as long as the attacker does not mount a coordinated attack that simultaneously compromises Byzantine machines and partitions the network beyond a certain threshold; or when the attacker cannot compromise the communication among a large number of correct participants.




State-of-the-art asynchronous CFT protocols guarantee consistency despite any number of crash faults or up to n-1 partitioned replicas. They also guarantee availability whenever a majority of replicas are correct.

XFT, in turn, guarantees consistency in two modes: (i) without non-crash faults, despite any number of crash-faulty and partitioned replicas, and (ii) with non-crash faults, whenever a majority of replicas are correct and not partitioned, i.e., provided the total number of faults (machine or network) does not reach a majority of the replicas. Similarly, it guarantees availability whenever a majority of replicas are correct and not partitioned.

The consistency guarantees of XFT are incomparable to those of asynchronous BFT. On the one hand, XFT remains consistent with a number of non-crash faults in the range [n/3, n/2), provided no additional replicas are crashed or partitioned, whereas asynchronous BFT does not. On the other hand, unlike XFT, asynchronous BFT remains consistent with fewer than n/3 non-crash faults even when arbitrarily many replicas are partitioned.
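
To make these thresholds concrete, here is a small illustrative sketch (my own, not from the paper) that checks, for a given fault scenario, which of the three models still guarantees consistency. With n = 5 replicas and 2 non-crash-faulty ones, for instance, asynchronous BFT gives up while XFT still holds, as long as no additional replicas are crashed or partitioned.

  // Illustrative sketch: which model still guarantees consistency for a given
  // fault scenario (n replicas, nonCrash Byzantine replicas, crashed or partitioned replicas)?
  public class ConsistencyCheck {

      // Asynchronous CFT: consistent despite any number of crash faults and
      // partitions, but only if no replica is non-crash (Byzantine) faulty.
      static boolean cft(int n, int nonCrash, int crashedOrPartitioned) {
          return nonCrash == 0;
      }

      // Asynchronous BFT: consistent as long as non-crash faults stay below n/3,
      // no matter how many replicas are crashed or partitioned.
      static boolean bft(int n, int nonCrash, int crashedOrPartitioned) {
          return nonCrash < n / 3.0;
      }

      // XFT: consistent with no non-crash faults (any number of crashes/partitions),
      // or when the sum of all faults stays below a majority (n/2).
      static boolean xft(int n, int nonCrash, int crashedOrPartitioned) {
          return nonCrash == 0 || nonCrash + crashedOrPartitioned < n / 2.0;
      }

      public static void main(String[] args) {
          int n = 5, nonCrash = 2, crashedOrPartitioned = 0; // nonCrash falls in [n/3, n/2)
          System.out.println("CFT consistent: " + cft(n, nonCrash, crashedOrPartitioned)); // false
          System.out.println("BFT consistent: " + bft(n, nonCrash, crashedOrPartitioned)); // false
          System.out.println("XFT consistent: " + xft(n, nonCrash, crashedOrPartitioned)); // true
      }
  }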

In this paper, the authors also present XPaxos, a Paxos-like algorithm designed specifically for the XFT model. I will explain this protocol in another post.

Thursday, 28 July 2016

Changing the Log4j level with `hadoop jar` in MapReduce jobs

Source: http://goo.gl/v7P1x2

One of the most common questions I come across when trying to help debug MapReduce jobs is: "How do I change the Log4j level for my job?" Many times, a user has a JAR with a class that implements Tool that they invoke using the hadoop jar command. The desire is to change the log level without changing any code or global configuration files:
  hadoop jar MyApplication.jar com.myorg.application.ApplicationJob <args ...>
There is a large amount of misinformation out there because how to do this changed drastically after the 0.20.x and 1.x Apache Hadoop days. Most posts will inform you of some solution involving environment variables or passing Java opts to the mappers and reducers. In practice, there is a very straightforward solution.
To change the Mapper Log4j level, set mapreduce.map.log.level. To change the Reducer Log4j level, set mapreduce.reduce.log.level. If for some reason you need to change the Log4j level on the MapReduce ApplicationMaster (e.g. to debug InputSplit generation), you need to set yarn.app.mapreduce.am.log.level. This is the proper way for the Apache Hadoop 2.x release line. These options do not allow configuration of a Log4j level on a certain class or package -- this would require custom logging setup to be provided by your application.
It's important to remember that you are able to define configuration properties (which will appear in your job via the Hadoop Configuration) using the `hadoop jar` command:
  hadoop jar <jarfile> <classname> [-Dkey=value ...] [arg, ...]
The `-Dkey=value` section can be used to define the Log4j configuration properties when you launch the job.
For example, to set the DEBUG Log4j level on Mappers:
  hadoop jar MyApplication.jar com.myorg.application.ApplicationJob -Dmapreduce.map.log.level=DEBUG <args ...>
To set the WARN Log4j level on Reducers:
  hadoop jar MyApplication.jar com.myorg.application.ApplicationJob -Dmapreduce.reduce.log.level=WARN <args ...>
To set the DEBUG Log4j level on the MapReduce Application Master:
  hadoop jar MyApplication.jar com.myorg.application.ApplicationJob -Dyarn.app.mapreduce.am.log.level=DEBUG <args ...>
And, of course, each of these options can be used with one another:
  hadoop jar MyApplication.jar com.myorg.application.ApplicationJob -Dmapreduce.map.log.level=DEBUG -Dmapreduce.reduce.log.level=DEBUG -Dyarn.app.mapreduce.am.log.level=DEBUG <args ...>
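
These -D options end up in the job's Configuration because the driver class implements Tool and is launched through ToolRunner, whose GenericOptionsParser strips them from the argument list before they reach your code. A minimal sketch of such a driver (the class and job names below are placeholders, not necessarily how ApplicationJob is written):

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.conf.Configured;
  import org.apache.hadoop.mapreduce.Job;
  import org.apache.hadoop.util.Tool;
  import org.apache.hadoop.util.ToolRunner;

  // Sketch of a Tool-based driver: ToolRunner's GenericOptionsParser picks up
  // the -D options from the command line and puts them into the Configuration.
  public class ApplicationJobSketch extends Configured implements Tool {

      @Override
      public int run(String[] args) throws Exception {
          // getConf() already contains mapreduce.map.log.level, etc.
          Job job = Job.getInstance(getConf(), "application-job-sketch");
          // ... set mapper, reducer, input/output paths as usual ...
          return job.waitForCompletion(true) ? 0 : 1;
      }

      public static void main(String[] args) throws Exception {
          System.exit(ToolRunner.run(new Configuration(), new ApplicationJobSketch(), args));
      }
  }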

Wednesday, 13 July 2016

Overview of the HDFS

HDFS is a distributed filesystem that runs on a cluster and is used by the Hadoop framework. HDFS is composed of one Namenode and several Datanodes.

The role of the Namenode is to manage the Datanodes, keep the map of the filesystem (the metadata), and coordinate filesystem operations such as reading and writing files. The Datanodes store the actual blocks of data.
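
From the client's point of view this split is hidden behind the FileSystem API: the client asks the Namenode for metadata and then streams block data to or from the Datanodes. A minimal sketch of writing and reading a small file (the path is a placeholder):

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.FSDataInputStream;
  import org.apache.hadoop.fs.FSDataOutputStream;
  import org.apache.hadoop.fs.FileSystem;
  import org.apache.hadoop.fs.Path;

  // Sketch: write a small file to HDFS and read it back through the FileSystem API.
  public class HdfsReadWriteSketch {
      public static void main(String[] args) throws Exception {
          Configuration conf = new Configuration(); // picks up core-site.xml / hdfs-site.xml
          FileSystem fs = FileSystem.get(conf);

          Path path = new Path("/tmp/hello.txt");   // placeholder path
          try (FSDataOutputStream out = fs.create(path, true)) {
              out.writeUTF("hello hdfs");
          }
          try (FSDataInputStream in = fs.open(path)) {
              System.out.println(in.readUTF());
          }
      }
  }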



The Namenode periodically receives heartbeats from the Datanodes; this is how it knows which ones are running. Blocks of data are replicated across several Datanodes, and the replicated blocks can be seen in the figure. By default, the replication factor is 3.

Checksums are used to ensure that blocks are not corrupted while files are being read from or written to HDFS.
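
Both knobs are visible through the same API: the replication factor can be changed per file (the cluster-wide default comes from dfs.replication), and a whole-file checksum can be fetched explicitly, while per-block checksums are verified transparently during reads. A small sketch (the path is a placeholder):

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.FileChecksum;
  import org.apache.hadoop.fs.FileSystem;
  import org.apache.hadoop.fs.Path;

  // Sketch: change a file's replication factor and fetch its file-level checksum.
  public class ReplicationAndChecksumSketch {
      public static void main(String[] args) throws Exception {
          Configuration conf = new Configuration();
          FileSystem fs = FileSystem.get(conf);
          Path path = new Path("/tmp/hello.txt");   // placeholder path

          fs.setReplication(path, (short) 3);       // ask for 3 replicas of this file
          short actual = fs.getFileStatus(path).getReplication();
          System.out.println("replication factor: " + actual);

          FileChecksum checksum = fs.getFileChecksum(path);
          System.out.println("checksum: " + checksum);
      }
  }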

HDFS is a fault-tolerant system that uses replication to protect files against hardware failures. Network failures are addressed by spreading replicas across multiple racks connected through multiple switches.



Speculative Execution in MapReduce

MapReduce is a widely used parallel computing framework for large scale data processing. The two major performance metrics in MapReduce are job execution time and cluster throughput.

A job can be seriously impacted by straggler machines — machines on which tasks take an unusually long time to finish. Speculative execution is a common approach for dealing with the straggler problem by simply backing up those slow running tasks on alternative machines.

Hadoop is a widely used open-source implementation of MapReduce. The original speculative execution strategy used in Hadoop-0.20 (we call it Hadoop-Original) simply identifies a task as a straggler when the task’s progress falls behind the average progress of all tasks by a fixed gap.

Multiple speculative execution strategies have been proposed, but they have some pitfalls:
  1. They use the average progress rate to identify slow tasks, while in reality the progress rate can be unstable and misleading;
  2. They cannot appropriately handle the situation where there is data skew among the tasks;
  3. They do not consider whether backup tasks can finish earlier when choosing backup worker nodes.
We can categorize the causes of stragglers into internal and external reasons. Internal reasons can be addressed by the MapReduce service, while external reasons cannot.

The internal reasons for a straggler are the heterogeneous resource capacity of worker nodes and resource competition from other tasks running on the same worker node.

The external reasons, which MapReduce cannot deal with, include resource competition due to co-hosted applications, input data skew, faulty hardware, and remote input or output sources that are too slow.

Hadoop's speculative execution is driven by two parameters: SPECULATIVE_LAG and SPECULATIVE_GAP.

Hadoop decides to launch a backup copy of a task after the task has been running for a period longer than SPECULATIVE_LAG and when its progress score is less than the average progress score minus SPECULATIVE_GAP.

The default value of SPECULATIVE_LAG is 60 seconds and SPECULATIVE_GAP is 0.2.
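
As a rough illustration of that rule (a sketch of the condition described above, not Hadoop's actual scheduler code), the decision looks like this:

  // Sketch of the decision rule described above: a task becomes a candidate for
  // speculation once it has run longer than SPECULATIVE_LAG and its progress
  // lags the average progress by more than SPECULATIVE_GAP.
  public class SpeculationRule {
      static final long SPECULATIVE_LAG_MS = 60_000; // default: 60 seconds
      static final double SPECULATIVE_GAP = 0.2;     // default: 0.2

      static boolean shouldSpeculate(long runningTimeMs, double progress, double avgProgress) {
          return runningTimeMs > SPECULATIVE_LAG_MS
                  && progress < avgProgress - SPECULATIVE_GAP;
      }

      public static void main(String[] args) {
          // A task running for 2 minutes that is 30% done, while the average task is 60% done.
          System.out.println(shouldSpeculate(120_000, 0.3, 0.6)); // true
      }
  }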