Friday, 27 November 2015

NetAgg: Using Middleboxes for Application-specific on-path aggregation in data centres

Many applications in data centres (DCs) achieve horizontal scalability by adopting the aggregation pattern. This pattern can clog the network due to scarce inbound bandwidth or limited inter-rack bandwidth. In this paper, the authors propose NETAGG, a software middlebox platform that provides an on-path aggregation service.

A middlebox is a network appliance attached to a switch that provides services such as firewalls, web proxies, SSL offloading, and load balancing. To maximise performance, middleboxes are often implemented in hardware, and adopt a vertically integrated architecture focusing on a narrow function, e.g. processing standard packet headers or performing relatively simple payload inspection.

They use NETAGG to run application-specific aggregation functions that reduce data at each hop, lowering the chance of a network bottleneck and, ultimately, improving performance. NETAGG uses shim layers at edge servers to intercept application traffic and redirect it transparently to the agg boxes, which are assumed to be attached to the network switches and to perform on-path aggregation. In the paper's figure, traffic flows from top-of-rack (ToR) switches to an aggregation switch that is connected to a middlebox; the traffic is relayed from the switch to the middlebox before continuing on its path.
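To make the idea of application-specific aggregation concrete, here is a minimal sketch of a combiner-style aggregation function that an agg box could apply to partial word-count results at each hop. This is only an illustration; the function and its name are hypothetical and not NETAGG's actual API.

# Minimal sketch of an application-specific aggregation function that an
# agg box could apply on-path. Names are hypothetical, not NETAGG's API.
from collections import Counter

def aggregate_partial_counts(partial_results):
    """Merge several partial word-count dictionaries into one.

    Each element of partial_results maps word -> count (e.g. one mapper's
    output). The merged result is typically much smaller than the
    concatenation of the inputs, which is where the traffic reduction
    comes from.
    """
    merged = Counter()
    for partial in partial_results:
        merged.update(partial)
    return dict(merged)

# Example: three mappers' partial outputs collapse into one message.
print(aggregate_partial_counts([{"a": 2, "b": 1}, {"a": 5}, {"b": 3, "c": 1}]))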

They have tested NETAGG with Apache Hadoop and observed that the intermediate data size was reduced considerably, by a factor of 5X on 128GB of intermediate data. In terms of time, NETAGG reduces the shuffle and reduce phases by a factor of 4.5X, which in turn reduces processing time as well as disk I/O. The only case in which NETAGG gives no advantage is when the job does not reduce the data, as in a sorting job.

In conclusion, NETAGG is an on-path aggregation service that transparently intercepts aggregation flows at edge servers using shim layers and redirects them to aggregation nodes (agg boxes). Since NETAGG reduces the amount of data transferred between switches, the overall performance of MapReduce jobs improves.

Cloudlab testbed

CloudLab is a distributed network similar to GENI that comprises around 5,000 cores and 300-500 terabytes of storage. CloudLab provides 2x10Gbps network interfaces to every node through software-defined networking. The infrastructure contains a 100Gbps full-mesh SDN that lets users build a wide range of in-cluster experimental topologies, e.g., fat trees, rings, and hypercubes. CloudLab provides two major types of storage: per-server storage (a mix of high-performance flash and high-capacity magnetic disks, at a ratio of about one disk per four cores) and a centralized storage system. This storage mix enables a range of experiments with file systems, storage technologies, and big data, while providing reliable file systems to researchers who are not interested in storage experiments.

CloudLab federates with GENI. This means that GENI users can access CloudLab with their existing accounts, and CloudLab users have access to all of the hardware resources federated with GENI. CloudLab sites interconnect with each other via IP and Layer-2 links to regional and national research networks and the core GENI network. A single experiment can include GENI Racks (small clusters distributed across the United States).

CloudLab currently offers three sites with several types of hardware, located at the University of Utah, the University of Wisconsin, and Clemson University.

Wednesday, 25 November 2015

Improving availability in distributed systems with failure informers

When a distributed system acts on failure reports, the system's correctness and availability depend on the granularity and semantics of those reports. Availability also depends on coverage (failures are reported), accuracy (reports are justified), and timeliness (reports come quickly). Availability is a paramount concern of distributed applications, so it is important that systems can recover quickly when a failure happens. Existing mechanisms for reporting failures are coarse-grained, lack coverage, lack accuracy, or do not handle latent failures.

The paper presents Pigeon, a service for reporting host and network failures to highly available distributed applications. Pigeon detects failures accurately and reports them so that the system can recover immediately. It classifies failures into four types along two axes: whether the problem has certainly occurred or is merely expected and imminent, and whether the target is certainly and permanently stopped or not.
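A rough way to picture this 2x2 classification is sketched below; the names are mine for illustration, not necessarily Pigeon's exact terminology.

# Illustrative sketch of the 2x2 failure-condition space described above.
# The axis and condition names are mine, not necessarily Pigeon's terms.
from enum import Enum

class Certainty(Enum):
    CONFIRMED = "problem has certainly occurred"
    IMMINENT = "problem is expected and imminent"

class Severity(Enum):
    STOPPED = "target is certainly and permanently stopped"
    NOT_STOPPED = "target is degraded or unreachable but not known to be stopped"

# The four condition types are the cross product of the two axes.
CONDITIONS = [(c, s) for c in Certainty for s in Severity]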

Pigeon has several limitations and operating assumptions. First, it assumes a single administrative domain (but there are many such networks, including enterprise networks and data centers). Second, it requires the ability to install code in the application and in network routers (but doing so is viable in single administrative domains). Third, for Pigeon to be most effective, the administrator or operator must perform environment-specific tuning (but this needs to be done only once).

Pigeon uses sensors that must detect faults quickly and confirm critical faults; the latter requirement ensures that Pigeon does not incorrectly report stops. The architecture accommodates pluggable sensors, and the prototype includes four types: a process sensor and an embedded sensor at end-hosts, and a router sensor and an OSPF sensor in routers.

There is also a component called the interpreter that gathers information about faults and outputs failure conditions. The interpreter must (1) determine which sensors correspond to the client-specified target process, (2) determine if a condition is implied by a fault, (3) estimate the condition's duration, (4) report the condition to the application via the client library, and (5) never falsely report a stop condition.

Pigeon is not suitable for applications like DNS, where client recovery is lightweight: there is little benefit over short end-to-end timeouts, since the cost of inaccuracy is low. Some applications do not make use of any information about failures; such applications likewise do not gain from Pigeon. For example, NFS (on Linux) has a hard-mount mode in which the NFS client blocks until it can communicate with its NFS server; this client neither exposes failures nor acts on them.

Tuesday, 24 November 2015

Introduction to Celery

Celery is a distributed system for processing messages on a task queue, with a focus on real-time processing and support for task scheduling. Celery communicates with a message broker and distributes work. Here is a boilerplate example of Celery.

#project/celery.py
from celery import Celery
from django.conf import settings

# Create the Celery application, pointing it at the message broker.
app = Celery('appname', broker=settings.BROKER_URL)
# Load Celery configuration from the Django settings module.
app.config_from_object('django.conf:settings')
# Discover tasks.py modules in all installed Django apps.
app.autodiscover_tasks(settings.INSTALLED_APPS)


celery -A appname worker

app is the Celery application instance for the whole project, which means there is only one Celery app per project. The appname is the name of the application, and the broker is the URL of the message broker to which the workers connect.

Celery uses the @task decorator to register a plain function as a task that workers can execute.

@app.task
def generate_thumbnails_task(image_path, sizes):
    # make_thumb is assumed to be an image helper defined elsewhere in the project.
    for size in sizes:
        make_thumb(image_path, size)

Celery is used mostly for:

  1. Generating assets after upload
  2. Notifying a set of users when an event happens.
  3. Keeping a search index up to date
  4. Replacing cronjobs (backups, cleanup)

The message broker has the role of delivering tasks to consumers. A consumer is a process that takes tasks from the queue and delegates them to the worker pool to be processed.

Celery was made to work with RabbitMQ, although it can work with other message brokers like Redis. RabbitMQ implements AMQP (Advanced Message Queuing Protocol), which delivers messages and tracks message acknowledgments. The AMQP result backend spawns an Erlang process and a queue for each task, which is very expensive. So, in some cases, it is useful to cache the results of previously executed tasks in a key/value store.

#settings.py
CELERY_RESULT_BACKEND = 'redis://:password@host:port/db'

Some tasks can return data like this.

@app.task
def add(a, b):
    return a + b

So, to track the state of a task and its return value when execution is finished, a "result backend" is used.
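For example, with a result backend configured, the task's state and return value can be fetched through the AsyncResult handle that delay() returns (standard Celery API; the numbers are just an example):

# With CELERY_RESULT_BACKEND configured, the task's state and return value
# can be retrieved from the AsyncResult handle that .delay() returns.
result = add.delay(2, 3)        # enqueue the task and return immediately
print(result.status)            # e.g. 'PENDING' or 'SUCCESS'
print(result.get(timeout=10))   # block until the worker stores 5 in the backend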

When using Celery, it is necessary to define the number of workers. To avoid hurting performance, it is recommended to run n+1 worker processes, where n is the number of cores.

celery worker -A appname -c 5
celery worker -A appname -P eventlet -c 1000
celery worker -A appname -P gevent -c 1000

There are some pitfalls to watch out for when using Celery:

  1. Parallel Safety
  2. Poor Task Granularity
  3. Misguided Optimizations

In the case of parallel safety, you need to write tasks so they are safe to run in parallel. Since you have no idea when tasks will run, you need to plan for it. If you are trying to prevent certain tasks from running in parallel because of possible deadlocks, you are focusing on the wrong problem: it means you need to design your application better. Don't pass objects as task parameters; use simple values or serializable formats like JSON or YAML, and be careful with parallel database transactions, because otherwise you can end up returning the wrong value.

In the example below, report_id is used to avoid regenerating the PDF when an up-to-date version already exists. When two requests run in parallel, only one of them will regenerate the PDF.

#tasks.py
from datetime import datetime

from project.celery import app
from project.utils import create_pdf_from_html
# The Report model is assumed to live in the project's reports app.
from reports.models import Report

@app.task
def generate_pdf_report(report_id):
    report = Report.objects.get(id=report_id)
    # Only regenerate the PDF if the HTML changed after the last PDF build.
    if report.pdf_last_modified < report.html_last_modified:
        pdf_data = create_pdf_from_html(report.html)
        now = datetime.now()
        Report.objects.filter(id=report_id).update(
            pdf_data=pdf_data, pdf_last_modified=now)

When creating a task, it is necessary to get its granularity right. A task that is too big is hard to read and maintain, and takes many seconds or even minutes to run; it is also harder to scale horizontally. When you design tasks around small units of work, you can execute them in parallel. Don't parallelize everything, though, because some problems can only be executed sequentially.

#tasks.py

from project.celery import app

from forum.utils import notify_user
from forum.models import ForumPost

@app.task
def notify_forum_users(post_id):
    post = ForumPost.objects.get(id=post_id)
    users = post.parent_thread.users.all()
    # Notifying every user inside a single task makes the task too coarse.
    for user in users:
        notify_user(user)

Granularity Problem



#tasks.py

from project.celery import app

from forum.utils import notify_user
from forum.models import ForumPost
# The User model is assumed to be Django's built-in auth user.
from django.contrib.auth.models import User

@app.task
def notify_forum_users(post_id):
    post = ForumPost.objects.get(id=post_id)
    users = post.parent_thread.users.all()
    # Fan out one small task per user so notifications run in parallel.
    for user in users:
        notify_user_task.delay(user.id)

@app.task
def notify_user_task(user_id):
    user = User.objects.get(id=user_id)
    notify_user(user)

Granularity Solution

When optimizing Celery, don't just turn up the number of workers without measuring. With too much concurrency, resources become constrained. As with most code, network and storage I/O tend to be the bottleneck. Also, profile your code if it is slow, and look for slow DB queries and storage- or network-intensive work.

These notes came from bit.ly/1k1fVLz.

Monday, 23 November 2015

Participatory Networking: An API for Application Control of SDNs

This paper presents an API called PANE, implemented by an OpenFlow controller, that delegates read and write authority to end users and allows applications and end-hosts to control software-defined networking (SDN) bandwidth. Without security guarantees and limits on authority, participatory networks would be places of anarchy. To be usable, such networks must provide isolation for their independent principals, preventing malicious users from consuming and controlling the bandwidth, dropping packets, or worse. While isolation could be provided through network virtualization, the authors believe that is not always the right abstraction, as it hides the fundamentally shared aspect of networks. PANE delegates read and write authority, with optional restrictions, from the network's administrators to the users, or to applications and hosts acting on their behalf. This API has to safely decompose control and visibility of the network, and to resolve conflicts between participants and across requests.

PANE is designed for networks within a single administrative domain, including corporate WANs, datacenters, campus or enterprise networks, and home networks. In such networks there is, logically, a single owner from which authority can be delegated. Conflicts arise naturally in a participatory network, as PANE is designed to allow multiple, distributed principals to author the network configuration. PANE represents user requests in Hierarchical Flow Tables (HFTs), and to resolve conflicts the policies use a conflict-resolution operator that takes two conflicting requests as input and returns a single resolved request. For instance, if two requests demand bandwidth of 10Mbps and 30Mbps, the conflict is resolved by guaranteeing the higher bandwidth.

PANE uses multiple operator types, allowing HFTs to resolve different kinds of conflicts with independent logic: (1) conflicts between requests within the same share, (2) conflicts between requests in parent and child shares, and (3) conflicts between sibling shares. A sketch of the idea follows below.
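As a rough illustration of a conflict-resolution operator (a sketch of the idea only, not PANE's actual code or API), the bandwidth example above could be resolved like this:

# Sketch of a conflict-resolution operator in the spirit of HFTs.
# Illustrative only, not PANE's implementation; different operators
# could be plugged in for intra-share, parent/child and sibling conflicts.

def resolve_bandwidth(request_a_mbps, request_b_mbps):
    """Combine two conflicting bandwidth guarantees by granting the higher one."""
    return max(request_a_mbps, request_b_mbps)

# Example from the text: 10 Mbps vs 30 Mbps resolves to 30 Mbps.
print(resolve_bandwidth(10, 30))  # 30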

A network can be partitioned due to a failure in switches or routers. The PANE controller considers two types of failures: the failure of network elements, and the failure of the controller itself.

In the first case, when a switch or link fails, or when a link's configuration changes, the PANE runtime must recompile the policy tree into new individual switch flow tables, as previously used paths may no longer be available or acceptable. In the second case, PANE can keep a database-like persistent log of accepted requests that is periodically compacted by removing requests that have expired. Upon recovery, the PANE controller can restore its state.

They tested some applications with PANE to check the benefits of the solution. One of the tests used Hadoop MapReduce: they augmented Hadoop 2.0.3 to work with the PANE API. Using PANE, Hadoop was able to reserve guaranteed bandwidth for operations like the shuffle and sort and the writing of the final output. The first set of reservations occurs during the shuffle, where each reducer reserves bandwidth for transferring data from the mappers. The second set reserves bandwidth when writing the final output. These few reservations improved overall performance by 19%.

Tuesday, 1 September 2015

Improving Hadoop Performance in Intercloud Environments

In this short paper published in SIGMETRICS'11, the authors propose a new scheduler that improves performance in intercloud environments without relying on speculative execution. They set up a single MapReduce runtime spanning two clouds and placed the data in only one of them (the private cloud). Mappers are launched in both clouds, which means they can read data either locally or remotely.
The authors claim that speculative execution can add additional costs to the cloud (I am talking about money) and can degrade performance, because at the end all tasks start competing for scarce resources.
This new algorithm, ICMR, aims to make all map workers finish at the same time, so that all reducers can start at the same time. When all map workers finish at the same time, no single mapper stalls the beginning of the reduce phase.
In Figure 1, Nodes 1 and 2 represent the private cluster and a compute node from the cloud, respectively. In Node 1, map processing time is longer than shuffling time, while in Node 2 shuffling time is longer than map processing time due to the slow wide-area network.
Total running time of node $i$, $(T_i)$, is composed of total map processing time $(M_i)$, total shuffling time $(S_i)$ and overlapped time between $M_i$ and $S_i$. The total map processing time $(M_i)$ is as follows:
  1. $M_i = \frac{W_i}{v_i}, \quad v_i = \min(fs_i, p_i)$
where $W_i$ is the amount of input data for node $i$, and $v_i$ is the data processing speed of node $i$ (not the CPU clock). The processing speed $v_i$ is determined by two performance metrics: the data reading rate from storage $(fs_i)$ and the computing power of node $i$ $(p_i)$. In compute nodes from the cloud, the ones whose mappers need to read data remotely, $fs_i$ (data reading rate) has a lower value than $p_i$ (computing power).
The total map shuffling time $(S_i)$ is as follows:
  1. $S_i = \frac{I_i}{t_i/N_i}$
where $I_i$ is the total amount of intermediate data produced at node $i$, $t_i$ is the data transfer rate to the reduce workers, and $N_i$ is the number of map workers in node $i$. The term $t_i/N_i$ gives the data transfer rate to reducers available to each mapper in node $i$.
From equations (1) and (2), they get the total running time of node $i$ $(T_i)$:
  1. $T_i = M_i + S_i - \text{overlapped time}$
If the job is long enough, the overlapped time is almost the same as $M_i$ or $S_i$, depending on the execution environment. Thus, $T_i$ can be represented as follows:
  1. $T_i \approx \max(M_i, S_i)$
Finally, we get the total running time for the whole job $(T)$ as:
  1. $T = \max(T_1, \ldots, T_n)$
The objective of the ICMR is to finish the shuffle phases of all map tasks at the same time. By doing this, idle resources are reduced, and the reduce phase can start as soon as possible.
They assume the amount of intermediate data $(I_i)$ is proportional to the amount of input data $(W_i)$.
  1. $I_i=\alpha \times W_i$
where $\alpha$ is measured and updated whenever a map task completes. In other words, $\alpha$ is the ratio between the generated intermediate data and the processed input data.
The total amount of input data $(W_{total})$ is the sum of the partial input data split across the mappers. This means that ICMR calculates the amount of data each mapper will process.
If all mappers are to finish their shuffle phases at the same time, we get the following relation:
  1. $T_i \approx \max\left(\frac{W_i}{v_i}, \frac{\alpha W_i}{t_i/N_i}\right)$, equal for all nodes $i$
Equation (7) just says that the running time of each node is determined by the larger of its map processing time and its shuffle time; making this quantity equal across all nodes makes them finish at the same time.
ICMR dynamically adjusts $W_i$ in proportion to the performance of each compute node. First, ICMR assigns an equal amount of data to all compute nodes, because it has no measured values yet for the essential metrics $\alpha$, $v_i$ and $t_i$. After several tasks have finished, the central job tracker of Hadoop has gathered initial values for the metrics, and ICMR can calculate $W_i$ for all compute nodes. While a job is running, the performance of the network and CPUs may vary, so the metrics $\alpha$, $v_i$ and $t_i$ are measured and updated periodically.
The values of $\alpha$, $v_i$ and $t_i$ are calculated per job.
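Putting the model together, here is a small sketch, of my own making and not the authors' code, of how $W_i$ could be chosen so that the estimated $T_i$ is the same for every node:

# Sketch of the ICMR idea: choose W_i so that the estimated per-node time
# T_i = W_i * max(1/v_i, alpha*N_i/t_i) is equal for all nodes.
# Illustrative only, not the authors' implementation.

def split_input(w_total, alpha, nodes):
    """nodes: list of dicts with processing speed v, transfer rate t,
    and number of map workers n for each compute node."""
    # Per-unit-of-input cost for node i: time to map or shuffle one unit,
    # whichever dominates.
    costs = [max(1.0 / node["v"], alpha * node["n"] / node["t"]) for node in nodes]
    # Equal finish times => W_i proportional to 1/cost_i, normalized to w_total.
    inv = [1.0 / c for c in costs]
    total_inv = sum(inv)
    return [w_total * x / total_inv for x in inv]

# Example: a fast local node and a node behind a slow WAN (made-up numbers).
print(split_input(100.0, alpha=0.5,
                  nodes=[{"v": 10.0, "t": 100.0, "n": 4},
                         {"v": 10.0, "t": 10.0, "n": 4}]))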
They have tested the scheduler on two clouds from Amazon EC2, using four instances each from U.S. West and U.S. East, for a total of eight compute nodes. They claim that the ICMR scheduler reduced total map and shuffling time by about 19%, because all nodes finished with almost the same performance. The remaining difference was due to the limited network bandwidth across the two clouds.
Moreover, because the processing speed $v_i$ in the ICMR scheduling model measures data processing speed rather than CPU clock rate, the total map processing time also decreased.

Monday, 24 August 2015

What is Byzantine fault tolerance, and do companies really care about it?

Byzantine failures are defined as arbitrary deviations of a process from its assumed behavior, based on the algorithm it is supposed to be running and the inputs it receives. Such failures can occur, e.g., due to a software bug, a (transient or permanent) hardware malfunction, or a malicious attack.

The term Byzantine refers to the Byzantine Generals' Problem, an agreement problem described by Leslie Lamport, Robert Shostak and Marshall Pease in their 1982 paper, "The Byzantine Generals Problem". The paper describes a problem among generals during the siege of a city. In its simplest form, every general must agree either to attack, in order to win the city, or to retreat. The generals are far away from each other, and the only way to reach an agreement is to send the same message to all of them through messengers. The problem is complicated by the presence of traitorous generals who may cast different votes to different parties.

This problem can be reduced to solving a "Commander and Lieutenants" problem, where the loyal lieutenants must all act in unison, and their action must correspond to what the commander ordered whenever the commander is loyal.

  1. One solution considers scenarios in which messages may be forged, but which will be Byzantine-fault-tolerant as long as the number of traitorous generals does not equal or exceed one third of the generals ($f<\frac{n}{3}$);
  2. A second solution requires unforgeable message signatures which can provide Byzantine fault tolerance in the presence of an arbitrary number of traitorous generals.

In 1999, Miguel Castro and Barbara Liskov introduced the "Practical Byzantine Fault Tolerance" (PBFT) algorithm, which provides high-performance Byzantine state machine replication. PBFT triggered a wave of Byzantine fault-tolerant replication research, with protocols like Zyzzyva, BFT-SMaRt, and others.

State machine replication (SMR), or the state machine approach (SMA), is a general method for implementing a fault-tolerant service by replicating servers and coordinating client interactions with the server replicas. SMR can be used to tolerate fail-stop failures as well as Byzantine failures. The idea is to implement a service using a set of server replicas in such a way that the overall service continues to behave as specified even if a number of servers are faulty. If the service is designed to tolerate arbitrary faults, which include attacks and intrusions, then the service can be said to be intrusion-tolerant, or Byzantine resilient.

The resilience of a protocol can be defined as the maximum number of faults in the presence of which the protocol still behaves according to its specification. Practical intrusion-tolerant replicated systems based on the state machine approach can handle at most $f$ Byzantine components out of a total of $n=3f+1$, which is the maximum resilience in asynchronous systems.

Although most companies are more concerned with fail-stop failures, that does not mean systems that tolerate arbitrary faults are useless. The Bitcoin network works in parallel to generate a chain of Hashcash-style proofs of work. The proof-of-work chain is the key to overcoming Byzantine failures and reaching a coherent global view of the system state.

Any company operating at the scale of IBM, Google, Yahoo, Microsoft or Apple cares, because arbitrary faults do happen, and when you have 500,000 or more computers it is difficult to just have IT staff deal with them. One example is that these companies have implemented the Paxos algorithm in some of their services.

Thursday, 13 August 2015

Computer Science Conference Rank

On this site you can check the conference rankings for computer science.
I have found out that SIGMETRICS is not in that list, so I leave here a second link where you can check its conference ranking.

Monday, 20 July 2015

Revisiting Memory Errors in Large-Scale Production Data Centers

Several works exist on memory errors in large-scale production data centers. This paper is a continuation of a previous study; it spans 14 months of data and was performed on Facebook's servers. With this study, the authors:

  1. analyze new DRAM failure trends in modern devices and workloads;
  2. develop a model for examining the memory failure rates of systems with different characteristics;
  3. describe and perform the first analysis of a large-scale implementation of a software technique proposed in prior work to reduce DRAM error rate.

During the analysis of the logs, they have noticed that:

  1. The number of memory errors per machine follows a Pareto distribution with a decreasing hazard rate. Memory errors tend to occur more frequently on servers that have crashed more times.
  2. Non-DRAM memory failures, such as those in the memory controller and the memory channel, are the source of the majority of errors that occur. These memory errors can cause a system to crash.
  3. DRAM failure rates increase with newer cell fabrication technologies. The larger the memory, the higher the failure rate; for instance, 4GB DRAM modules have a 1.8x higher failure rate than 2GB ones.
  4. The number of DIMM chips and transfer width affect error rate.
  5. The failure rate can vary according to the workload.

They define the errors that happen in memory as correctable or uncorrectable. Correctable errors (CE) are errors that can be corrected by the ECC mechanism; uncorrectable errors (UCE) cannot. CEs happen more frequently than UCEs and can degrade system performance, while a UCE can make a system crash. They also noticed that the more errors a server has had, the higher the probability that it will get another error, following the Pareto distribution. According to Schroeder's conjecture, if a server has suffered lots of CEs, it may be better to replace the DIMM than to wait for the first UCE; this reduces the likelihood of uncorrectable errors.

Most of the CEs that happen in the socket and in the channel affect a large amount of memory, but the authors noticed that these errors happened on only a small fraction of servers. It is the spurious errors that affect most servers, so more effective techniques are needed for detecting and reducing the reliability impact of weak cells in memory.

They tested the servers with different types of workloads. They split the resource requirements that make up a workload into Processor, Memory, and Storage, and for each requirement they assigned a level of Low, Medium, or High. For example, the Hadoop workload uses high processor and storage and medium memory.

In the end, they developed a model for memory failures in which they claim that using lower-density DIMMs and fewer processors can reduce failure rates by up to 57.7%.

Based on this paper, we can conclude that the failure rate will also affect the Hadoop framework, but there is one question related to Hadoop they do not answer, and maybe it is out of scope for this paper: will 1000 commodity machines running the Hadoop framework have the same failure rate as the framework running on 1000 big servers? Maybe this is a question that opens the door to an interesting study.

Notes:

DRAM: Dynamic random-access memory (DRAM) is a type of random-access memory that stores each bit of data in a separate capacitor within an integrated circuit. Whereas SRAM requires four to six transistors per bit, DRAM needs only one transistor and one capacitor, which allows it to reach high capacities.

Tuesday, 30 June 2015

Implementing Fault-Tolerant Services Using the State Machine Approach

A state machine consists of state variables, which encode its state, and commands, which transform its state. The execution of a command is atomic with respect to other commands; it modifies the state variables and may produce some output.

Byzantine Failures - the component can exhibit arbitrary and malicious behaviour, perhaps involving collusion with other faulty components. If a process is faulty, it can overwrite all memory locations and destroy information. If several machines collude, the processes can write the same wrong result to memory.

Fail-stop Failures - in response to a failure, the component changes to a state that permits other processes to detect the failure.

When processors can experience Byzantine failures, an ensemble implementing a t fault-tolerant state machine must have at least 2t+1 replicas, and the output of the ensemble is the output produced by the majority of the replicas. This is because with 2t+1 replicas, the majority of the outputs remain correct even after as many as t failures.

In the case of fail-stop failures, only t+1 replicas are necessary, because correct processes produce only correct outputs, and after t failures there is still one process left to produce the output.

To implement a fault-tolerant state machine it is necessary to ensure the following:

  • Replica Coordination. All replicas receive and process the same sequence of requests. This can be decomposed into two requirements concerning dissemination of requests to replicas in an ensemble.
  • Agreement. Every nonfaulty state machine replica receives every request.
  • Order. Every nonfaulty state machine replica processes the requests it receives in the same relative order.

Agreement concerns the client's interaction with the state machine replicas, while Order concerns the behaviour of the state machine replicas with respect to requests. Depending on the use case, the implementation of Agreement and Order can sometimes be relaxed.

Agreement can be relaxed for read-only requests when fail-stop processors are assumed: such a request need only be sent to a single nonfaulty state machine replica. This is possible because the response from a nonfaulty replica is always guaranteed to be correct.

Another example of relaxing the Order property is when two requests r and rʹ commute in a state machine, because the result will always be the same regardless of whether r is processed before or after rʹ.

It just makes sense to guarantee the Order property if the Agreement property is also guaranteed.

The Agreement requirement is guaranteed if:

  • IC1. All nonfaulty processors agree on the same value.
  • IC2. If the transmitter is nonfaulty, then all nonfaulty processors use its value as the one on which they agree.

The Order of the requests can be guaranteed if the algorithm uses logical clocks or synchronized real-time clocks to timestamp requests.

Depending on whether the output of the state machine implemented by the ensemble is to be used within the system or outside the system, different architectures must be provided.

If the outputs of the state machine are used by external components, then each such device is a single component whose failure cannot be tolerated. The usual solution to this problem is to replicate the output device and the voter. Each voter takes the outputs of all state machine replicas, votes on the result, and produces a signal that drives one output device. For example, a flap on an airplane wing might be designed so that when the 2t+1 actuators that control it do not agree, the flap always moves in the direction of the majority (rather than twisting); each voter drives one actuator.

If output devices exhibit only fail-stop failures, then it is only necessary for the output device to receive one result, which can be assumed to be correct.

If the outputs of the state machine are used by the client, then the client itself can combine the outputs of the state machine replicas in the ensemble. When Byzantine failures are possible, the client waits until it has received t+1 identical responses, each from a different replica, and takes that as the response of the t fault-tolerant state machine. When only fail-stop failures are possible, the client can proceed as soon as it has received a single response. A sketch of the Byzantine case follows.
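Here is a small sketch of the client-side combination rule for the Byzantine case, an illustration of the t+1 rule above rather than code from the paper:

# Sketch of the client-side rule described above: with Byzantine failures,
# accept a value once t+1 identical answers arrive from distinct replicas.
# Illustrative only.
from collections import Counter

def combine_responses(responses, t):
    """responses: iterable of (replica_id, value) pairs, in arrival order.
    Returns the first value seen from t+1 distinct replicas, else None."""
    votes = Counter()
    seen = set()
    for replica_id, value in responses:
        if replica_id in seen:
            continue                 # count each replica at most once
        seen.add(replica_id)
        votes[value] += 1
        if votes[value] >= t + 1:
            return value             # vouched for by at least one correct replica
    return None

print(combine_responses([(1, "ok"), (2, "bad"), (3, "ok")], t=1))  # "ok"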

Thursday, 25 June 2015

ADAPT: Availability-aware MapReduce Data Placement for Non-Dedicated Distributed Computing

In this paper, the authors propose an availability-aware data placement (ADAPT) strategy to improve application performance without extra storage cost. The objective of the data placement algorithm is to find an optimized mapping from data blocks to nodes such that all nodes complete their assigned blocks at the same time. They propose an analytical model to estimate the execution time of MapReduce tasks in non-dedicated distributed computing environments. This way, they can mitigate the impact of the volatility and heterogeneity of the nodes. ADAPT dynamically dispatches data blocks onto participating hosts based on their availabilities.

ADAPT was implemented within the Hadoop MapReduce platform and incurs only minor overhead on the existing Hadoop framework.

They perform extensive experiments and simulations to evaluate the feasibility and payoffs of ADAPT. The experimental results show that ADAPT improves application performance by more than 30%.

Sunday, 21 June 2015

Consensus with Byzantine Failures and Little System Synchrony

In distributed systems, consensus is the act of having all parties agree on a single value. For example, in the binary consensus problem, every correct process proposes some value in {0,1} and must make an irrevocable decision on a value such that the following properties hold:

  • (Agreement) No two correct processes decide differently;
  • (Validity) If some correct process decides v , then v is proposed by some correct process;
  • (Termination) Every correct process eventually decides some value.

The consensus problem is at the core of fault-tolerant distributed systems. Solving consensus is impossible in asynchronous systems subject to process failures. A well-known way to overcome this impossibility is to make partial synchrony assumptions about the system, e.g., that the relative speeds of processes are bounded and that all links are eventually timely (a message sent at time T is delayed by at most Δ by the link).

This possibility result holds for a system Scrash with crash failures and a system Sbyz with Byzantine failures, with resiliencies of $n \ge 2f+1$ and $n \ge 3f+1$, respectively, where n is the number of processes and f is the maximum number that can fail.

To solve consensus, is it really necessary that all links be eventually timely? What if only some links are eventually timely, while other links can be arbitrarily slow; can consensus still be solved? How do these possibilities differ between crash-failure and Byzantine systems?

Previous work has proved that consensus is possible in a weaker system where there is at least one unknown nonfaulty process whose f outgoing direct links are eventually timely. This result, however, only works for systems with crash failures.

In this paper, the authors evaluate these possibilities for Byzantine systems by trying to solve consensus in a system where all processes have some eventually timely links. They define two systems: (i) Sbyz, where all links are eventually timely, and (ii) Sʹbyz, where only the links to and from correct processes are eventually timely.

Their consensus algorithm uses consistent unique broadcast as a subroutine, where messages carry a tag to ensure that (1) correct processes deliver the same set of messages, and (2) a correct process delivers at most one message with a given tag. Tags are used to ensure that a Byzantine process does not broadcast two different messages in the same round. They also use provable reliable send, which guarantees that every message m sent by a correct process p to q will be delivered to q, and that any other process r will get proof of the delivery.

Subroutines

Consistent unique broadcast

Consistent unique broadcast is guaranteed by the following properties:

  • (Validity) If a correct process p broadcasts (X,k,v), then all correct processes eventually deliver (X,k,v) from p ((X,k) is the tag and v is the value);
  • (Unforgeability) If a correct process p does not broadcast (X,k,v), then no correct process will ever deliver it.
  • (Uniqueness) For each tag (X,k), a correct process delivers at most one message with that tag. In other words, no correct process delivers two different messages with the same tag.
  • (Relay) If a correct process delivers (X,k,v) from p then all correct processes will eventually deliver the message.

Provable reliable send

Provable reliable send guarantees that if p is correct, then every correct process r gets proof that m is in transit, and if a correct process r gets proof that m is in transit and q is correct, then q receives m. In other words, a process r gets proof that m is in transit if p is a correct process that sent m and q is a correct process that received m. Provable reliable send satisfies the following properties:

  • (Integrity) A correct process q receives m from a correct process p at most once, and only if p has previously sent m to q;
  • (Validity) If some correct process p sends m to some correct process q then eventually q receives m from p;
  • (Proof-Integrity) If some correct process r gets the proof of m from some process p to some correct process q then q receives m from p;
  • (Proof-Validity) If some correct process p sends m to some process q then every correct process r gets the proof of m from p to q.
  • (Eventual timeliness) If process q is a bisource (a process whose incoming and outgoing links are all eventually timely), then there exist Δʹ and Tʹ such that if some correct process r gets the proof of m from some process p to process q at time t, then q receives m from p by time max{t,Tʹ}+Δʹ. In other words, if r has proof that m was sent to q at time t by p, then m must be delivered by time max{t,Tʹ}+Δʹ.

Consensus

They use the previous subroutines to implement consensus, deriving their algorithm from Ben-Or's randomized algorithm. Each process $p_i$ (i = 1, ..., n) keeps a current estimate of the decision value, which is initially the value that $p_i$ proposes to consensus. The algorithm proceeds in rounds, where each round has four phases: certification, reporting, proposing, and consulting the coordinator, in which processes exchange their proposed values until all agree on the same value.

They conclude that consensus is possible in system Sbyz, in which every nonfaulty process has incoming and outgoing links that are all eventually timely, and that it is not possible in Sʹbyz systems, where some correct process does not have all of its links eventually timely. In other words, with Byzantine failures it is not enough to have a nonfaulty process s with only its n-1 (or f) outgoing links eventually timely; in that situation, consensus is only possible when tolerating crash faults.

Sunday, 14 June 2015

Several types of synchrony

The concept of partial synchrony in a distributed system lies between the cases of a synchronous system and an asynchronous system. In a synchronous system, there is a known fixed upper bound Δ on the time required for a message to be sent from one processor to another, and a known fixed upper bound Φ on the relative speeds of different processors. In an asynchronous system, no such fixed upper bounds Δ and Φ exist. In a partially synchronous system, in one version, the fixed bounds Δ and Φ exist but are not known a priori; in the other version, the bounds are known but are only guaranteed to hold starting at some unknown time T.

One way to compare the several models of synchrony is through the problem of agreement. Consider a collection of N processors, p1, ..., pN, which communicate by sending messages to one another. Initially each processor pi has a value vi drawn from some domain V of values, and the correct processors must all decide on the same value; moreover, if the initial values are all the same, say v, then v must be the common decision. In addition, the consensus protocol should operate correctly even if some of the processors are faulty, for example, if they crash (fail-stop faults), fail to send or receive messages when they should (omission faults), or send erroneous messages (Byzantine faults).

Given the assumptions about the synchronism of message delivery and processor speed, we can characterize a consensus model by its resiliency - the maximum number of faults that can be tolerated. For example, it might be assumed that there is a fixed upper bound Δ on the time for messages to be delivered (communication is synchronous) and a fixed upper bound Φ on the relative speed of one processor with respect to another (processors are synchronous), and that these bounds are known a priori and can be "built into" the protocol.

When there are no bounds on message delivery or processor speed (asynchronous systems), it is impossible to reach consensus in the presence of process failures, because we cannot tell whether a process has failed or its message delivery is merely slow.

A well-known way to overcome this impossibility is to make partial synchrony assumptions about the system. The consensus problem is solvable in this model if the speeds of the processes are bounded and all links are eventually timely. If there is a process whose message is not delivered by time T+Δ, that process is considered failed. This possibility result holds for crash failures (Scrash) and Byzantine failures (Sbyz): consensus is achievable with $n \ge 2f+1$ and $n \ge 3f+1$ processes for crash and Byzantine failures, respectively, where n is the number of processes and f is the maximum number of processes that can fail.

Therefore, for the consensus problem, the resilience of the algorithm depends on the system model that we define.

Based on the paper by Cynthia Dwork et al. [1], here is a table that shows how many processors N are necessary to tolerate f faults in each type of system.

Failure type            | Synchronous | Asynchronous   | Partially synchronous communication and synchronous processors | Partially synchronous communication and processors | Partially synchronous processors and synchronous communication
Fail-stop               | f           | ∞ (impossible) | 2f+1                                                            | 2f+1                                               | f
Omission                | f           | ∞ (impossible) | 2f+1                                                            | 2f+1                                               | [2f, 2f+1]
Authenticated Byzantine | f           | ∞ (impossible) | 3f+1                                                            | 3f+1                                               | 2f+1
Byzantine               | 3f+1        | ∞ (impossible) | 3f+1                                                            | 3f+1                                               | 3f+1

[1] Cynthia Dwork, et al., Consensus in the Presence of Partial Synchrony, 1988

Thursday, 11 June 2015

Smart redundancy for Distributed Computation

Much of today's software consists of distributed systems composed of large numbers of autonomous software and hardware participants that interact over untrusted networks. There are distributed data stores (e.g., Freenet), peer-to-peer A/V streaming applications (e.g., Skype), and distributed computation architectures (DCAs), which solve massive problems by deploying highly parallelizable computations (i.e., sets of independent tasks) on dynamic networks of potentially faulty and untrusted computing nodes (e.g., Hadoop). These systems use redundancy mechanisms to tolerate faults and achieve acceptable levels of reliability.

In this paper, the authors review the existing redundancy mechanisms for DCAs (traditional and progressive redundancy) and present a new method called iterative redundancy, which ensures efficient replication of computation and data given finite processing and storage resources, even when facing Byzantine faults. They claim that iterative redundancy is more efficient and more adaptive than comparable state-of-the-art techniques (traditional and progressive redundancy) in environments where system resource reliability is unknown.

Traditional redundancy

This k-vote redundancy performs k ∈ {3,5,7,...} independent executions of the same task in parallel, and then takes a vote on the correctness of the result. If at least some minimum number of executions agree on a result, a consensus exists, and that result is taken to be the solution. Briefly, the algorithm just needs a minimum number of matching results (e.g., a majority of $\frac{k+1}{2}$) to reach consensus.

Progressive redundancy

Sometimes traditional redundancy reaches a consensus quickly but still continues to distribute jobs that cannot affect the task's outcome. Progressive redundancy minimizes the number of jobs needed to produce a consensus. It starts a minimum of $\frac{k+1}{2}$ jobs and checks whether they all return the same result. If so, there is a consensus, and the results produced by any subsequent jobs of the same task become irrelevant. If some nodes agree, but not enough to produce a consensus, the task server automatically distributes the minimum number of additional copies of the job needed, repeating until it reaches a consensus. A sketch of this loop follows.
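Here is a sketch of that loop, an illustration of the idea rather than the paper's implementation (run_job is a hypothetical function that executes one copy of the task):

# Sketch of progressive redundancy: start with a bare majority of jobs and
# only launch more copies when the votes so far cannot yet form a consensus.
# Illustrative only; a real system would also cap the number of retries.
import math
from collections import Counter

def progressive_redundancy(run_job, k):
    """run_job() executes one copy of the task and returns its result."""
    needed = math.ceil((k + 1) / 2)          # consensus threshold (majority of k)
    votes = Counter(run_job() for _ in range(needed))
    while votes.most_common(1)[0][1] < needed:
        # Launch only the minimum number of extra jobs that could still
        # complete a consensus for the current front-runner.
        missing = needed - votes.most_common(1)[0][1]
        votes.update(run_job() for _ in range(missing))
    return votes.most_common(1)[0][0]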

Iterative redundancy

Progressive redundancy is guaranteed to distribute the fewest jobs to achieve a consensus. In contrast, iterative redundancy is guaranteed to distribute the fewest jobs needed to achieve a desired system reliability.

There is a confidence level that varies according to the apparent risk of failure of a task. The user specifies how much improvement is needed, and the system uses the available resources to achieve the highest possible system reliability.

Iterative redundancy distributes the minimum number of jobs required to achieve the desired confidence level in the result. If all jobs agree, the task is completed. However, if some results disagree, the confidence associated with the majority result is diminished, and the system distributes the minimum number of additional jobs that would achieve the desired level of confidence.

This algorithm does not require knowledge of node reliability and can thus be applied to a wider class of systems than credibility-based fault tolerance and blacklisting. For instance, in volunteer computing, the system has no information about new volunteers. If the system collects information about the reliability of nodes over time, malicious nodes that have developed a bad reputation can simply change their identity. With iterative redundancy this does not happen.

Comparison between redundancies

Progressive and iterative redundancy need to deploy several jobs and wait for the responses before possibly choosing to deploy more. Traditional redundancy launches all the jobs and looks for a consensus based on all results. At first sight, the response time seems much larger for progressive and iterative redundancy than for traditional redundancy. But in the realm of DCAs, the number of tasks is far larger than the number of nodes, so the increased response time is not a problem: nodes can always execute jobs belonging to other tasks. Only rarely does a full execution have to wait for failed tasks to finish successfully. In other words, no node will ever be idle, and the nodes' processing capability will be fully utilized.

In their evaluation, they conclude that the average task response time is larger with iterative redundancy than with the progressive and traditional techniques (by factors of 1.4 and 2.8, respectively). On the other hand, iterative redundancy can guarantee better reliability than the other techniques for the same cost factor: it peaks at 2.8 times better than traditional redundancy in terms of reliability.

Cost factor

In the text above I used the term cost factor. To characterize the behaviour of each technique, the authors derive formulae for two measures of their effect on systems: the system reliability R(r) achieved by applying the redundancy technique, and the cost factor C(r) of applying it.

Taking traditional redundancy as the example, the reliability of k-vote redundancy is the probability that at least a consensus of jobs does not fail. The cost represents how many jobs must be launched to achieve the desired reliability.

Monday, 8 June 2015

Heading off correlated failures through Independence-as-a-Service

Cloud services normally require high reliability and rely on redundancy techniques to ensure it. Contrary to what one would expect, seemingly independent infrastructure components may share deep, hidden dependencies. Failures in these shared dependencies may lead to unexpected correlated failures, undermining redundancy efforts. For instance, Amazon S3 replicates each data object across multiple racks in an S3 region, yet a failure in a main switch can compromise access to the infrastructure.

Discovering unexpected common dependencies is extremely challenging, and many of them are diagnosed only after a failure has occurred. These retroactive approaches require human intervention, leading to prolonged failure recovery times.

Worse, correlated failures can be hidden not just by inadequate tools or analysts within one cloud provider, but also by non-transparent business contracts between cloud providers and lower-level services.

In this paper, they propose Independence-as-a-Service (INDaaS), a novel architecture that aims to address the above problems proactively. Rather than localizing and tolerating failures after an outage, INDaaS collects and audits structural dependency data to evaluate the independence of redundant systems before failures occur.

This proactive strategy uses acquisition modules that collect dependency data and adapt it to a common format, and an auditing agent that employs a similarly pluggable set of auditing modules to quantify the independence of redundant systems and identify common dependencies that may introduce unexpected correlated failures. At the end, an auditing report quantifies the independence of various redundancy deployments, optionally computing useful information such as estimates of correlated failure probabilities and ranked lists of potential risk groups.

This is a very thorough paper that details every step of INDaaS. The authors also present the concept of a risk group (RG) and a couple of algorithms (minimal RG and a failure-sampling algorithm) that they use to build and rank RGs for auditing. It is with this data that the service builds the final report.

They evaluated the service in three small "but realistic" case studies. They emulated a real data center topology using 4 servers and 4 switches, with 8 VMs running on those servers in total. The tests showed that there is only a 14% probability that a user can place a service on servers that do not suffer from correlated faults. As a result, the INDaaS auditing results gave them hints about the weakest parts of the topology. They also compared the RG algorithms and concluded that the failure-sampling algorithm runs much more efficiently than the minimal RG algorithm while still achieving reasonably high accuracy: it took 96 minutes to find 92% of all the RGs, compared to 1046 minutes for the minimal RG algorithm.

In my opinion, the evaluation section has a lot of room for improvement. They did not show how the topology results relate to a real-world scenario. Overall, they explained INDaaS thoroughly but presented a shallow evaluation section.

Risk Group

In redundant systems, a risk group (RG) is a set of components whose simultaneous failures could cause a service outage. Suppose some service A replicates critical state across independent servers B, C and D located in 3 separate racks. The intent of this 3-way redundancy configuration is that the smallest RG has size 3, i.e., 3 servers must fail simultaneously to cause an outage. Now imagine that these 3 racks share the same switch. You can easily see that, if the switch becomes unavailable, the 3 racks also become unavailable; a common dependency introduced an RG of size 1 whose failure could disable the whole service despite the redundancy efforts. Also, correlated failures can be hidden not just by inadequate tools or analysts within one cloud provider, but also by non-transparent business contracts between cloud providers. For instance, a storm in Dublin once took down a local power source and its backup generator, disabling both the Amazon and Microsoft clouds in that region for hours. In this paper, the authors propose a novel architecture called Independence-as-a-Service (INDaaS) that collects and audits structural dependency data to evaluate the independence of redundant systems before failures occur.
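As a toy sketch of the underlying idea (not INDaaS's minimal-RG or failure-sampling algorithms): given each replica's dependency set, any component shared by all replicas is, by itself, a risk group for the whole service.

# Toy sketch of the risk-group idea above (not INDaaS's actual algorithms):
# a component present in every replica's dependency set can take the whole
# redundant service down by itself.

def shared_dependencies(replica_deps):
    """replica_deps: dict replica -> set of components it depends on."""
    deps = list(replica_deps.values())
    return set.intersection(*deps) if deps else set()

deps = {
    "B": {"rack1", "switch1", "power1"},
    "C": {"rack2", "switch1", "power1"},
    "D": {"rack3", "switch1", "power2"},
}
print(shared_dependencies(deps))   # {'switch1'} -- the hidden single point of failure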

Discovering unexpected common dependencies is challenging. Many diagnostic approaches only attempt to tolerate faults after they have happened, and most of the time they require human intervention. Worse, correlated faults can be hidden not just by inadequate tools or analysts, but also by private contracts between cloud providers.

Tuesday, 26 May 2015

Taming uncertainty in distributed systems with help from the network

In this paper, the authors present Albatross, a service that quickly reports to applications the current status of a remote process: whether it is working and reachable, or not. If Albatross reports a process as "disconnected", it is safe to assume that the process cannot affect the world. Albatross targets data centers equipped with software-defined networks (SDNs). Using SDN functionality, Albatross receives notifications about the state of the network, determines which processes are reachable, and enforces its determinations by installing drop rules on switches.

Processes are disconnected permanently by the service and are only reconnected after they have rolled back their state to some checkpoint that causally precedes Albatross's disconnection report. By rolling back their state, excluded processes accept their effective deaths and can be safely reintegrated using standard catch-up techniques (e.g., replay).

This service can be useful in different situations. The authors state that (i) Albatross detects network failures an order of magnitude more quickly than the ZooKeeper membership service; (ii) integrating RAMCloud with Albatross prevents clients from communicating with servers that have been declared failed, which eliminates a consistency bug in RAMCloud; and (iii) they have also implemented a variant of the Zab protocol that uses Albatross, called Aab. Aab has a smaller description, fewer phases, fewer round-trips, fewer message types, and fewer counters for ordering messages. Moreover, it tolerates the failure of all but one process, whereas Zab tolerates the failure of fewer than half of the processes.

In conclusion, Albatross detects network and process problems faster than ZooKeeper and Falcon, it handles failures at end-hosts, making it a complete solution for failure detection in a data center environment, and it can arguably be considered the first service to apply modern networking techniques to refine the guarantees of distributed systems.

Wednesday, 13 May 2015

C3: Cutting Tail Latency in Cloud Data Stores via Adaptive Replica Selection

In this article, presented at NSDI'15, Suresh et al. present an adaptive replica selection mechanism, C3, that works with Cassandra and is robust to performance variability in the environment.

Systems that respond to user actions very quickly (within 100 milliseconds) feel more fluid and natural to users than those that take longer. Improvements in Internet connectivity and the rise of warehouse-scale computing systems have enabled Web services that provide fluid responsiveness while consulting multi-terabyte datasets that span thousands of servers.

Large online services that need to create a predictably responsive whole out of less predictable parts are called latency tail-tolerant, or tail-tolerant for brevity.

It is challenging to deliver consistently low latency on the internet. Some web applications use databases to retrieve data and still require low, predictable latencies. Other applications, like Hadoop, are used for distributed computing and need to query big data. If you have a web application that processes client requests using Hadoop, you don't want to keep the client waiting forever for the computation result because you created a bottleneck by submitting all jobs to the same runtime; you need to find a way to deliver a fast response.

A recurring pattern for reducing tail latency is to exploit the redundancy built into each tier of the application architecture, wherein a client node has to choose one out of multiple replica servers to serve a request.

The replica selection strategy has a direct effect on the tail of the latency distribution. This is particularly so in the context of data stores that rely on replication and partitioning for scalability, such as key-value stores. Replica selection can compensate for these conditions by preferring faster replica servers whenever possible.

C3 is an adaptive replica selection mechanism that is robust in the face of fluctuations in system performance. It combines in-band feedback from servers, used to rank and prefer faster replicas, with distributed rate control and backpressure in order to reduce tail latencies in the presence of service-time fluctuations.

Through comprehensive performance evaluations, the authors demonstrate that C3 improves Cassandra's mean, median, and tail latencies.



Monday, 4 May 2015

List of the most used algorithms

Our world is full of algorithms; we find them everywhere. But some algorithms are used more than others. You can check a list here.

Monday, 20 April 2015

SSH - connect without a password.

From time to time, we find ourselves scavenging the internet for how to connect to a remote host with SSH without a password. I have found this link, which contains some info about it.

Also, I have decided to use the 'keychain' utility instead of invoking 'ssh-agent' directly to get a passwordless session. For this, you must add the private key or certificate to the keychain instance. When you want to log in with ssh, keychain will look up the key or certificate. Here are the commands to add keys to 'keychain':


$ keychain ~/.ssh/my_pem.pem          # load the key into a long-running ssh-agent
$ source ~/.keychain/$HOSTNAME-sh     # import the agent environment into the current shell
$ ssh user@host


If you also face a bad network connection, you can run SSH inside "tmux" or "screen" to recover from broken connections. Alternatively, and my favourite, there is the Mosh shell, which works very well over irregular network connections. With this tool, you will stop seeing "Broken pipe" error messages forever.
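As a quick illustration (the session name "work" is arbitrary), a typical recovery workflow with tmux, and the Mosh one-liner, look like this:

$ ssh user@host            # initial connection
$ tmux new -s work         # start a named session on the remote host
# ...the connection drops at some point...
$ ssh user@host            # reconnect to the host
$ tmux attach -t work      # reattach to the session; your programs kept running
$ mosh user@host           # or simply use mosh, which survives drops and roaming on its own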

Wednesday, 1 April 2015

Vim is great

I want to avoid the eternal war between emacs and vim about which editor is the best. I have used both editors for a long time, long enough to have developed an opinion.

In my opinion, vim is much better than emacs and, to justify myself, I need to point out emacs's drawbacks.

Vim is the most peaceful application that I have come across. Working with this editor is like being in the Zen state. It is so calm and blissful, and yet so powerful.

To start, I must tell you that I use the Dvorak layout to avoid RSI problems. I advise you to use the Dvorak or Colemak layout rather than QWERTY; QWERTY was built on purpose to be that awful.

Well, back to our topic. Emacs is a heavy editor that is meant to be opened once and used until the end of the day. People say that emacs is an OS, and they are right, but in some situations I start to doubt it. For example, the eshell plugin will never replace the power of the Terminator terminal emulator.

It is painful to work with the Ctrl and Meta keys. I have even remapped Ctrl, but the problem with the Meta key's location is still there. For me, it is much more comfortable to reach the vim prompt than to use these keys.

I have no words to express how horrible the emacs keybindings are. Most of the time, they make no sense at all. Therefore, I am more used to typing emacs commands than using the keybindings. In vim, the keybindings are mnemonics for actions.

And finally, I have tried, unsuccessfully, to learn Emacs Lisp several times. In the end, I gave up and decided to move to vim once and for all. Lisp really has lots of (Lost In) Stupid Parentheses.

The only drawback I find in vim is that, since you command it entirely with keys, I sometimes look like an elephant trying to dance ballet on top of a water lily.

VIM plugins

CtrlP is a fuzzy file finder plugin for vim. Like many other plugins, it is full of options that help you navigate through files without having to list them in the main window. You can find more information about the plugin here.
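As an illustration, assuming Vim 8's native package support and the ctrlpvim mirror of the plugin on GitHub, installing it is just a clone into a package directory (the "plugins" directory name is arbitrary):

$ mkdir -p ~/.vim/pack/plugins/start
$ git clone https://github.com/ctrlpvim/ctrlp.vim.git ~/.vim/pack/plugins/start/ctrlp.vim
$ vim .    # inside vim, press Ctrl-P and start typing part of a file name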

This site lists vim plugins that you can use to build a tailored environment.

Tuesday, 24 March 2015

Conference ranking

You can check conference rankings here, and get more info on what evaluation rankings exist here.
If you want to check the conference calendar, you can find it here.

Tuesday, 3 March 2015

Fully Distributed Hadoop Federation Cluster

Federation Concepts

"HDFS Federation improves the existing HDFS architecture through a clear separation of namespace and storage, enabling generic block storage layer. It enables support for multiple namespaces in the cluster to improve scalability and isolation. Federation also opens up the architecture, expanding the applicability of HDFS cluster to new implementations and use cases.
I have created two guest machines for the namenodes and datanodes to run on. Each VM is allocated 1GB of RAM and 20GB of HDD. Each machine is connected through host-only adapter. Client will be the host machine itself which is able to ssh and connect to both namenodes inside the VirtualBox virtual machines."
-- An excerpt from a Hortonworks blog by Suresh Srinivas.
Find more on Apache Hadoop Federation.

Setup Summary

I have implemented Hadoop Federation inside an ESXi server to create a fully distributed cluster on Apache Hadoop 2.6. The setup consists of two namenodes and two datanodes, each with 1GB RAM and 20GB HDD allotted. I have also configured an HDFS client on a separate machine with the same hardware configuration. All nodes run Ubuntu 14.10. It is also recommended to use the latest Oracle JDK for the namenodes and datanodes. This setup only demonstrates how to build a federated cluster, hence YARN is not configured.
The hostnames and IP addresses below are just an example, to provide a clear view of the cluster setup.
namenode1 fed-nn01   192.168.56.101
namenode2 fed-nn02   192.168.56.102
datanode1 fed-dn01   192.168.56.103
datanode2 fed-dn02   192.168.56.104
client    fed-client 192.168.56.105

Downloads

Download the below packages and place the tarballs on all namenodes, datanodes and client machines:
Apache Hadoop 2.6 Download
Oracle JDK8 Download
Note: Before moving on to the hadoop installation and configuration:
1. Disable the firewall on all namenodes, datanodes and client machines.
2. Comment out the 127.0.1.1 address line in the /etc/hosts file on all namenodes, datanodes and client machines.
3. Update the correct hostnames and their respective IP addresses for all namenodes, datanodes and client machines in their /etc/hosts files.
4. Install ssh on all namenodes and datanodes. It is not required on the client machine.
A minimal sketch of steps 1 to 3 is shown below.
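The sketch below assumes an Ubuntu machine with ufw as the active firewall and reuses the example hostnames and addresses from the table above; adjust it to your own environment.

$ sudo ufw disable                                       # step 1: disable the firewall
$ sudo sed -i 's/^127.0.1.1/#127.0.1.1/' /etc/hosts      # step 2: comment out the 127.0.1.1 line
$ cat <<'EOF' | sudo tee -a /etc/hosts                   # step 3: append the cluster hostnames
192.168.56.101 fed-nn01
192.168.56.102 fed-nn02
192.168.56.103 fed-dn01
192.168.56.104 fed-dn02
192.168.56.105 fed-client
EOF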

INSTALLATION & CONFIGURATION

Hadoop installation and configuration includes setting up a hadoop user, installing java, configuring passwordless ssh, and finally hadoop installation, configuration and monitoring.

User Configuration

Location: fed-nn01, fed-nn02, fed-dn01, fed-dn02, fed-client
First we will create a group “hadoop” and a user “huser” for all hadoop administrative tasks, and set a password for huser.
$ sudo groupadd hadoop
$ sudo useradd -m -d /home/huser -g hadoop huser
$ sudo passwd huser
Note: From here onwards we shall use the newly created huser for all hadoop tasks that we perform.

Java Installation

We will go with the recommended Oracle JDK installation and configuration. At the time of writing this document, JDK8 was the latest version available.
Location: fed-nn01, fed-nn02, fed-dn01, fed-dn02
Make a directory named java inside /usr
huser:~$ sudo mkdir /usr/java

Copy the tarball.

huser:~$ sudo cp /path/to/jdk-8u25-linux-x64.tar.gz /usr/java
Extract the tarball.
huser:~$ sudo tar -xzvf /usr/java/jdk-8u25-linux-x64.tar.gz -C /usr/java
Set the environment for jdk8.
huser:~$ sudo vi /etc/profile.d/java.sh
JAVA_HOME=/usr/java/jdk1.8.0_25/
PATH=$JAVA_HOME/bin:$PATH
export PATH JAVA_HOME
export CLASSPATH=.
huser:~$ sudo chmod +x /etc/profile.d/java.sh
huser:~$ source /etc/profile.d/java.sh
Testing Java Installation
huser:~$ java -version
java version "1.8.0_25"
Java(TM) SE Runtime Environment (build 1.8.0_25-b17)
Java HotSpot(TM) 64-Bit Server VM (build 25.25-b02, mixed mode)
Note: Java installation on the client can be skipped if it already has OpenJDK installed. If not, the above method can be repeated for the client as well.

Passwordless SSH Configuration

Passwordless ssh is required only by the namenodes, in order to start the HDFS and MapReduce daemons on the various nodes.
Location: fed-nn01
huser@fed-nn01:~$ ssh-keygen -t rsa
huser@fed-nn01:~$ touch /home/huser/.ssh/authorized_keys
huser@fed-nn01:~$ cat /home/huser/.ssh/id_rsa.pub >> /home/huser/.ssh/authorized_keys
huser@fed-nn01:~$ ssh-copy-id -i ~/.ssh/id_rsa.pub huser@fed-nn02
huser@fed-nn01:~$ ssh-copy-id -i ~/.ssh/id_rsa.pub huser@fed-dn01
huser@fed-nn01:~$ ssh-copy-id -i ~/.ssh/id_rsa.pub huser@fed-dn02
Location: fed-nn02
huser@fed-nn02:~$ ssh-keygen -t rsa
huser@fed-nn02:~$ touch /home/huser/.ssh/authorized_keys
huser@fed-nn02:~$ cat /home/huser/.ssh/id_rsa.pub >> /home/huser/.ssh/authorized_keys
huser@fed-nn02:~$ ssh-copy-id -i ~/.ssh/id_rsa.pub huser@fed-nn01
huser@fed-nn02:~$ ssh-copy-id -i ~/.ssh/id_rsa.pub huser@fed-dn01
huser@fed-nn02:~$ ssh-copy-id -i ~/.ssh/id_rsa.pub huser@fed-dn02
Testing passwordless ssh on fed-nn01 & fed-nn02
Run the below commands to test passwordless logins from both fed-nn01 & fed-nn02.
huser:~$ ssh fed-nn01
huser:~$ ssh fed-nn02
huser:~$ ssh fed-dn01
huser:~$ ssh fed-dn02
Note: The client does not require any ssh settings. The reason is that the client communicates with the namenode over the namenode's configurable TCP port, which is an RPC connection.
The client communicates with the datanodes directly for I/O operations (read/write) using the Data Transfer Protocol defined in DataTransferProtocol.java. For performance, this is a streaming protocol rather than RPC. Before streaming a block to a datanode, the client buffers the data until a full block (64MB/128MB) has been created.

Hadoop Installation

We will proceed with the latest stable Apache Hadoop release, 2.6.0.
Location: fed-nn01, fed-nn02, fed-dn01, fed-dn02, fed-client
We will untar the tarball and place it in the /opt directory.
huser:~$ sudo tar -xzvf hadoop-2.6.0.tar.gz -C /opt/
Changing the ownership of the directory to huser.
huser:~$ sudo chown -R huser:hadoop /opt/hadoop-2.6.0/
Setting the environment variables in the .bashrc file of the “huser” user.
huser:~$ vi ~/.bashrc
###JAVA CONFIGURATION###
JAVA_HOME=/usr/java/jdk1.8.0_25/
PATH=$JAVA_HOME/bin:$PATH
export PATH JAVA_HOME
export CLASSPATH=.

###HADOOP CONFIGURATION###
HADOOP_PREFIX=/opt/hadoop-2.6.0/
PATH=$HADOOP_PREFIX/bin:$PATH
PATH=$HADOOP_PREFIX/sbin:$PATH
export PATH
Activate the configured environment settings for the "huser" user by running the below command.
huser:~$ exec bash
Testing Hadoop Installation
huser:~$ hadoop version
Hadoop 2.6.0
Subversion https://git-wip-us.apache.org/repos/asf/hadoop.git -r e3496499ecb8d220fba99dc5ed4c99c8f9e33bb1
Compiled by jenkins on 2014-11-13T21:10Z
Compiled with protoc 2.5.0
From source with checksum 18e43357c8f927c0695f1e9522859d6a
This command was run using /opt/hadoop-2.6.0/share/hadoop/common/hadoop-common-2.6.0.jar
Note: There is no need to set the java environment variables in the .bashrc file of the client machine if it already has java installed.

Hadoop Configuration

There are a few files to be configured to get hadoop up and running. In our setup these configuration files reside in the /opt/hadoop-2.6.0/etc/hadoop/ directory.
hadoop-env.sh
Location: fed-nn01, fed-nn02, fed-dn01, fed-dn02, fed-client
huser:~$ vi /opt/hadoop-2.6.0/etc/hadoop/hadoop-env.sh
export JAVA_HOME=/usr/java/jdk1.8.0_25/
export HADOOP_LOG_DIR=/var/log/hadoop/
Create a log directory at the path specified in the hadoop-env.sh file under the HADOOP_LOG_DIR parameter, and change the ownership of the directory to the "huser" user.
$ sudo mkdir /var/log/hadoop
$ sudo chown -R huser:hadoop /var/log/hadoop
Note: On the client machine there is no need to specify or create a log directory. Similarly, it is needless to declare java's home directory if java is pre-installed.
core-site.xml
Location: fed-nn01
huser@fed-nn01:~$ sudo vi /opt/hadoop-2.6.0/etc/hadoop/core-site.xml
<configuration>
<property>
<name>fs.default.name</name>
<value>hdfs://fed-nn01</value>
</property>
</configuration>
Location: fed-nn02
huser@fed-nn02:~$ sudo vi /opt/hadoop-2.6.0/etc/hadoop/core-site.xml
<configuration>
<property>
<name>fs.default.name</name>
<value>hdfs://fed-nn02</value>
</property>
</configuration>
Location: fed-dn01, fed-dn02
huser@fed-dn01:~$ sudo vi /opt/hadoop-2.6.0/etc/hadoop/core-site.xml
<configuration>
<property>
<name>fs.default.name</name>
<value>hdfs://fed-nn01,hdfs://fed-nn02</value>
</property>
</configuration>
Location: fed-client
huser@fed-client:~$ sudo vi /opt/hadoop-2.6.0/etc/hadoop/core-site.xml
<configuration>
<property>
<name>fs.default.name</name>
<value>viewfs://fedcluster/</value>
</property>
<property>
<name>fs.viewfs.mounttable.fedcluster.link./ns1</name>
<value>hdfs://fed-nn01:8020</value>
</property>
<property>
<name>fs.viewfs.mounttable.fedcluster.link./ns2</name>
<value>hdfs://fed-nn02:8020</value>
</property>
</configuration>
Note: The client is configured to use the ViewFS plugin for a more user-friendly & aggregated view of the HDFS filesystem namespaces, mounted under the respective /ns1 & /ns2 mountpoints. Here we have named the mount table “fedcluster”; if it is not specified, “default” is taken.
hdfs-site.xml
Location: fed-nn01, fed-nn02
huser:~$ sudo vi /opt/hadoop-2.6.0/etc/hadoop/hdfs-site.xml
<configuration>
<property>
<name>dfs.replication</name>
<value>2</value>
</property>
<property>
<name>dfs.name.dir</name>
<value>file:///hdfs/name</value>
</property>
<property>
<name>dfs.permissions</name>
<value>false</value>
</property>
<property>
<name>dfs.nameservices</name>
<value>ns1,ns2</value>
</property>
<property>
<name>dfs.namenode.rpc-address.ns1</name>
<value>fed-nn01:8020</value>
</property>
<property>
<name>dfs.namenode.rpc-address.ns2</name>
<value>fed-nn02:8020</value>
</property>
</configuration>
Create a directory for the namenode to store its persistent metadata and change its ownership to the "huser" user.
$ sudo mkdir -p /hdfs/name
$ sudo chown -R huser:hadoop /hdfs/
Location: fed-dn01, fed-dn02
huser:~$ sudo vi /opt/hadoop-2.6.0/etc/hadoop/hdfs-site.xml
<configuration>
<property>
<name>dfs.replication</name>
<value>2</value>
</property>
<property>
<name>dfs.data.dir</name>
<value>file:///hdfs/data</value>
</property>
<property>
<name>dfs.permissions</name>
<value>false</value>
</property>
<property>
<name>dfs.nameservices</name>
<value>ns1,ns2</value>
</property>
<property>
<name>dfs.namenode.rpc-address.ns1</name>
<value>fed-nn01:8020</value>
</property>
<property>
<name>dfs.namenode.rpc-address.ns2</name>
<value>fed-nn02:8020</value>
</property>
</configuration>
Create a directory for the datanode to store blocks and change its ownership to the "huser" user.
huser:~$ sudo mkdir -p /hdfs/data
huser:~$ sudo chown -R huser:hadoop /hdfs/
Location: fed-client
huser@fed-client:~$ sudo vi /opt/hadoop-2.6.0/etc/hadoop/hdfs-site.xml
<configuration>
<property>
<name>dfs.replication</name>
<value>2</value>
</property>
<property>
<name>dfs.permissions</name>
<value>false</value>
</property>
<property>
<name>dfs.nameservices</name>
<value>ns1,ns2</value>
</property>
<property>
<name>dfs.namenode.rpc-address.ns1</name>
<value>fed-nn01:8020</value>
</property>
<property>
<name>dfs.namenode.rpc-address.ns2</name>
<value>fed-nn02:8020</value>
</property>
</configuration>
slaves
Location: fed-nn01, fed-nn02
huser:~$ vi /opt/hadoop-2.6.0/etc/hadoop/slaves
fed-dn01
fed-dn02

Formatting HDFS Filesystem

Location: fed-nn01, fed-nn02
huser:~$ hdfs namenode -format -clusterID fedcluster
Note: If the clusterID is not specified, it is randomly generated by the system.
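As a quick sanity check (assuming the /hdfs/name metadata directory configured earlier), the clusterID recorded by each namenode can be inspected after formatting; it should be identical on fed-nn01 and fed-nn02.
huser:~$ grep clusterID /hdfs/name/current/VERSION    # should report clusterID=fedcluster on both namenodes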

Starting HDFS

Run the below command either on fed-nn01 or fed-nn02 namenode to start the dfs.
$ start-dfs.sh
At this point, listing files directly through fed-nn01 or fed-nn02 will show only the files present in their respective configured namespaces, i.e. ns1 and ns2, whereas listing files on the client machine will show both namespace mountpoints mounted under /. List and copy/retrieve operations from one namenode to the other can still be performed by specifying the full HDFS URI in the commands.
Examples:
Listing files in fed-nn01 namenode
huser@fed-nn01:~$ hdfs dfs -ls -h /
Found 2 items
-rw-r--r-- 2 huser hadoop 128 M 2015-01-02 16:23 /512mb-junk
drwxr-xr-x - huser hadoop 0 2015-01-02 16:28 /user
Listing files in fed-nn02 namenode from fed-nn01 namenode
huser@fed-nn01:~$ hdfs dfs -ls hdfs://fed-nn02:8020/user
Found 4 items
-rw-r--r-- 2 huser hadoop 128 M 2015-01-02 16:26 hdfs://fed-nn02:8020/user/512mb-junk
drwxr-xr-x - huser hadoop 0 2015-01-03 02:32 hdfs://fed-nn02:8020/user/dir1
-rw-r--r-- 2 huser hadoop 1 G 2015-01-02 15:25 hdfs://fed-nn02:8020/user/lgb-junk
-rw-r--r-- 2 huser hadoop 1 G 2015-01-02 15:31 hdfs://fed-nn02:8020/user/lgb-junk.1
Listing files from fed-client machines
huser@fed-client:~$ hdfs dfs -ls /
Found 2 items
-r-xr-xr-x - ibm ibm 0 2015-01-03 02:55 /ns1
-r-xr-xr-x - ibm ibm 0 2015-01-03 02:55 /ns2
Listing files on fed-nn02 from fed-client
huser@fed-client:~$ hdfs dfs -ls -h /ns2/user
Found 4 items
-rw-r--r-- 2 huser hadoop 128 M 2015-01-02 16:26 hdfs://fed-nn02:8020/user/512mb-junk
drwxr-xr-x - huser hadoop 0 2015-01-03 02:32 hdfs://fed-nn02:8020/user/dir1
-rw-r--r-- 2 huser hadoop 1 G 2015-01-02 15:25 hdfs://fed-nn02:8020/user/lgb-junk
-rw-r--r-- 2 huser hadoop 1 G 2015-01-02 15:31 hdfs://fed-nn02:8020/user/lgb-junk.1
As you can see from the above examples, the client provides an aggregated view of the HDFS filesystem using viewfs. Similarly, we can copy/retrieve and create files/directories from the client machine.
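For instance (the file and directory names below are only illustrative), a file can be uploaded into one namespace and copied across to the other directly from the client:
huser@fed-client:~$ hdfs dfs -mkdir -p /ns1/user/demo                      # create a directory under the ns1 mountpoint
huser@fed-client:~$ hdfs dfs -put localfile.txt /ns1/user/demo/            # upload a local file into ns1
huser@fed-client:~$ hdfs dfs -cp /ns1/user/demo/localfile.txt /ns2/user/   # copy it across to the ns2 namespace
huser@fed-client:~$ hdfs dfs -cat /ns2/user/localfile.txt                  # read it back through ns2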

Monitoring

The http links below are also helpful for monitoring and browsing the HDFS filesystem with a web browser on any of the namenodes or the client machine; a command-line alternative is sketched after these links.
http://fed-nn01:50070/dfshealth.jsp

http://fed-nn02:50070/dfshealth.jsp

http://fed-nn01:50070/dfsclusterhealth.jsp

http://fed-nn02:50070/dfsclusterhealth.jsp
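Alternatively, the same overview can be pulled from the command line. The sketch below relies on the generic -fs option to point dfsadmin at each namespace in turn; the exact options may vary slightly between Hadoop versions.
huser@fed-client:~$ hdfs dfsadmin -fs hdfs://fed-nn01:8020 -report    # capacity and live datanodes for ns1
huser@fed-client:~$ hdfs dfsadmin -fs hdfs://fed-nn02:8020 -report    # capacity and live datanodes for ns2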


http://hadoop.apache.org/docs/r2.4.0/hadoop-project-dist/hadoop-hdfs/ViewFs.html

http://hashprompt.blogspot.pt/2015/01/fully-distributed-hadoop-federation.html

http://hadoop.apache.org/docs/r2.3.0/hadoop-project-dist/hadoop-hdfs/Federation.html

https://www.usenix.org/legacy/event/hotos09/tech/full_papers/ko/ko_html/

http://hortonworks.com/blog/webhdfs-%E2%80%93-http-rest-access-to-hdfs/

http://hadoop.apache.org/docs/r1.0.4/webhdfs.html#File+and+Directory+Operations

Friday, 16 January 2015

Practical Fine-Grained Decentralized Information Flow Control

Paper 1 Here

Paper 2 Here

Access control is any mechanism by which a system grants or revokes the right to access some data or perform some action. Traditional access control mechanisms are all-or-nothing: once an application has the right to read a file, it can do anything with that file's data. In contrast, Decentralized Information Flow Control (DIFC) enforces more powerful policies, such as permitting an application to read a file but disallowing the broadcast of that file's contents over an unsecured network channel.

As an example of the DIFC model, consider Alice and Bob, who want to schedule a meeting while keeping their calendars mostly secret. Alice and Bob each place a secrecy label on their calendar file, and then only a thread carrying those secrecy labels can read it. Once a thread has a secrecy label, it has been tainted by that label and can no longer write to an unlabeled output, such as standard output or the network. If the thread has the capability to declassify the information, it may remove the secrecy label and then write the data to an unlabeled output. In the calendar example, the program obtains both Alice's and Bob's secrecy labels to read both calendar files, but it cannot remove the labels itself. When the thread is ready to output an acceptable meeting time, it must call a function that declassifies the result. The declassification function checks that its output contains no secret information; for example, that the output is simply a date and does not include other information from Bob's calendar.

In a DIFC system, any principal can create a new tag for secrecy or integrity. Principals assign labels to data objects: data structures (arrays, lists, etc.) and system resources (files and sockets). A secrecy label prevents the data from being disclosed. An integrity label assures that the data has not been modified since it was endorsed by the principal. For example, a web application might create one secrecy tag for its user database and a separate secrecy tag for each user's data. The secrecy tag on the user database will prevent authentication information from leaking to the network. The tags on user data will prevent a malicious user from writing another user's secret data to an untrusted network connection. Also, if Microsoft endorses a data file and its integrity is preserved, a user can trust the file's content if it trusts Microsoft's label.

Programmers express security policies by labeling data with secrecy and integrity labels, and access labeled data inside scoped security regions.

Several systems implement the DIFC model, such as HiStar, Flume, and Laminar. I will focus only on Flume and Laminar.

Laminar

Laminar is a decentralized information flow control (DIFC) system that combines language-level (PL) and operating-system-level (OS) enforcement. It is the first system to implement decentralized information flow control using a single set of abstractions for OS resources and heap-allocated objects.

The Laminar OS extends a standard operating system with a Laminar security module for information flow control. The Laminar OS security module governs information flows through all standard OS interfaces, including through devices, files, pipes and sockets. The OS regulates communication between threads of the same or different processes that access the labeled or unlabeled system resources or that use OS inter-process communication mechanisms, such as signals. OS enforcement applies to all applications, preventing unlabeled or non-Laminar applications from circumventing the DIFC restrictions.

The Laminar VM regulates information flow between heap objects and between threads of the same process via these objects. These flows are regulated by inserting dynamic DIFC checks in the application code. Because the Laminar VM regulates these flows within the address space, the OS allows data structures and threads to have heterogeneous labels. All threads in multithreaded processes without a trusted VM must have the same labels and capabilities.

The Laminar OS exports security system calls to the trusted VM for capability and label management. The VM and the OS do not allow code outside a security region to access labeled data objects. During the execution of a security region, the VM gives the thread the labels and capabilities of the security region so that the OS can mediate access to system resources according to the security region's labels. Security regions are not visible to the OS, so the thread itself must carry the labels and capabilities given by the VM. At the end of the security region, the VM restores the thread's original capabilities and labels.

Laminar requires modifications to the Jikes RVM and the Linux operating system in order to provide DIFC. Jikes RVM is a Java virtual machine implemented mostly in Java itself. Airavat uses Laminar to enforce its security policies. Laminar is publicly available in the Jikes RVM archives.

Flume

Flume is an open-source security infrastructure for web applications based on the DIFC model. Flume is designed as a component within the kernel rather than as an entire operating system like HiStar; as a consequence, any process running outside of Flume is vulnerable. Currently there are two different implementations of Flume, one for Linux and one for OpenBSD. The Linux implementation runs as a component within the kernel, while the OpenBSD version uses systrace system calls.

A typical Flume application consists of processes of two types. Untrusted processes do most of the computation. They are constrained by, but possibly unaware of, DIFC controls. Trusted processes, in contrast, are aware of DIFC and set up the privacy and integrity controls that constrain untrusted processes. Trusted processes also have the privilege to selectively violate classical information flow control - for instance, by declassifying private data (perhaps to export it from the system), or by endorsing data as high integrity.

Flume is embedded in FlumeWiki, which is built on MoinMoin. FlumeWiki can be about 34% slower than Moin on writes and about 43% slower on reads.

Linux Security Modules (LSM)

Both Flume and Laminar use LSM to hook into the kernel and allow custom authorization rules. LSM was designed to provide everything needed to successfully implement a mandatory access control module, while imposing the fewest possible changes to the Linux kernel.

Flume's LSM policy disallows all direct access to the file system by confined processes. Fork is blocked in Flume because all process spawning must occur from within the dedicated spawner.

Laminar uses LSM to intercept inode and file accesses, which are used to perform operations on files and file handles, and it does a straightforward check of the secrecy rules and labels.