Sunday, 14 June 2015

Several types of synchrony

The concept of partial synchrony in a distributed system lies between the case of a synchronous system and that of an asynchronous system. In a synchronous system, there is a known fixed upper bound Δ on the time required for a message to be sent from one processor to another and a known fixed upper bound Φ on the relative speeds of different processors. In an asynchronous system, no fixed upper bounds Δ and Φ exist. In a partially synchronous system, in one version, the fixed bounds Δ and Φ exist but are not known a priori. In the other version, the bounds are known but are only guaranteed to hold starting at some unknown time T (the global stabilization time).

One way to compare the several models of synchrony is through the problem of agreement. So, let's consider a collection of N processors, p1, ..., pN, which communicate by sending messages to one another. Initially each processor pi has a value vi drawn from some domain V of values, and the correct processors must all decide on the same value; moreover, if the initial values are all the same, say v, then v must be the common decision. In addition, the consensus protocol should operate correctly even if some of the processors are faulty, for example, if they crash (fail-stop faults), fail to send or receive messages when they should (omission faults), or send erroneous messages (Byzantine faults).

Given the assumptions about the synchronism of message delivery and processor speed, we can characterize a consensus model by its resiliency - the maximum number of faults that can be tolerated. For example, it might be assumed that there is a fixed upper bound Δ on the time for messages to be delivered (communication is synchronous) and a fixed upper bound Φ on the relative speed of one processor with respect to another (processors are synchronous), and that these bounds are known a priori and can be "built into" the protocol.

In the case where there are no bounds on message delivery or processor speed (asynchronous systems), consensus is impossible: when a process fails, we cannot tell whether it has actually failed or whether its messages are merely being delivered slowly. This is the well-known FLP impossibility result.

A well-known way to overcome this impossibility is to make partial synchrony assumptions about the system. Consensus becomes possible in this model if the speeds of the processes are bounded and all links are eventually timely. If a message from a process is not delivered by time T+Δ, the process is suspected to have failed. This possibility result holds for crash failures (Scrash) and Byzantine failures (Sbyz). Consensus is then achievable with N ≥ 2f+1 processes for crash failures and N ≥ 3f+1 processes for Byzantine failures, where N is the number of processes and f is the maximum number of processes that can fail.
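To make the timeout idea concrete, here is a minimal sketch in Java (my illustration, not taken from the paper) of an eventually accurate failure detector: a process is suspected when no heartbeat arrives within the current timeout, and the timeout is doubled after every false suspicion, so once the unknown bound Δ holds (or after the stabilization time T) it is eventually never underestimated again.

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Minimal sketch (not from the paper) of how partial synchrony is exploited in
// practice: suspect a process when no heartbeat arrives within the current
// timeout, and double the timeout after every false suspicion.
public class EventualFailureDetector {
    private static final long INITIAL_TIMEOUT_MS = 100;
    private final Map<String, Long> lastHeartbeat = new ConcurrentHashMap<>();
    private final Map<String, Long> timeoutMs = new ConcurrentHashMap<>();

    // Called whenever a heartbeat from 'process' is delivered.
    public void onHeartbeat(String process) {
        if (isSuspected(process)) {
            // False suspicion: our timeout underestimated Delta, so enlarge it.
            timeoutMs.compute(process, (k, v) -> v == null ? INITIAL_TIMEOUT_MS : v * 2);
        }
        lastHeartbeat.put(process, System.currentTimeMillis());
        timeoutMs.putIfAbsent(process, INITIAL_TIMEOUT_MS);
    }

    // A process is suspected if no heartbeat arrived within its current timeout.
    public boolean isSuspected(String process) {
        Long last = lastHeartbeat.get(process);
        if (last == null) return false;        // never heard from this process yet
        long timeout = timeoutMs.getOrDefault(process, INITIAL_TIMEOUT_MS);
        return System.currentTimeMillis() - last > timeout;
    }
}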

Therefore, for the consensus problem, the resilience of the algorithm depends on the system model that we define.

Based on the paper by Cynthia Dwork et al. [1], here is a table that shows how many processes N are necessary to tolerate f faults in each type of system.

Failure type            | Synchronous | Asynchronous | Part. sync. comm., sync. processors | Part. sync. comm. and processors | Part. sync. processors, sync. comm.
Fail-stop               | f           | impossible   | 2f+1                                | 2f+1                             | f
Omission                | f           | impossible   | 2f+1                                | 2f+1                             | [2f, 2f+1]
Authenticated Byzantine | f           | impossible   | 3f+1                                | 3f+1                             | 2f+1
Byzantine               | 3f+1        | impossible   | 3f+1                                | 3f+1                             | 3f+1

(Part. sync. = partially synchronous; "impossible" in the asynchronous column reflects the impossibility result mentioned above.)

[1] Cynthia Dwork, et al., Consensus in the Presence of Partial Synchrony, 1988

Thursday, 11 June 2015

Smart redundancy for Distributed Computation

Much of today's software consists of distributed systems composed of large numbers of autonomous software and hardware participants that interact over untrusted networks. There are distributed data stores (e.g., Freenet), peer-to-peer A/V streaming applications (e.g., Skype), and distributed computation architectures (DCAs), which solve massive problems by deploying highly parallelizable computations (i.e., sets of independent tasks) onto dynamic networks of potentially faulty and untrusted computing nodes (e.g., Hadoop). These systems use redundancy mechanisms to tolerate faults and achieve acceptable levels of reliability.

In this paper, the authors focus on the existing redundancy mechanisms for DCAs (traditional and progressive redundancy) and present a new method called iterative redundancy, which ensures efficient replication of computation and data given finite processing and storage resources, even when facing Byzantine faults. They claim that iterative redundancy is more efficient and more adaptive than comparable state-of-the-art techniques (traditional and progressive redundancy) that operate in environments where system resource reliability is unknown.

Traditional redundancy

This k-vote redundancy performs k ∈ {3,5,7,...} independent executions of the same task in parallel, and then takes a vote on the correctness of the result. If at least some minimum number of executions agree on a result, a consensus exists, and that result is taken to be the solution. Briefly, the algorithm just needs a minimum number of matching results (e.g., a majority of ⌈(k+1)/2⌉) to find a consensus.

Progressive redundancy

Sometimes traditional redundancy reaches a consensus quickly but still continues to distribute jobs that do not affect the task's outcome. Progressive redundancy minimizes the number of jobs needed to produce a consensus. It starts with the minimum of ⌈(k+1)/2⌉ jobs and checks whether they all return the same result. If so, there is a consensus and the results produced by any subsequent jobs of the same task become irrelevant. If some nodes agree, but not enough to produce a consensus, the task server automatically distributes the minimum number of additional copies of the job until it reaches a consensus.
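A rough sketch of this loop in Java (illustrative only, not the paper's pseudocode; executeJob is a placeholder for dispatching one copy of the task to some node):

import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

// Rough sketch of progressive redundancy: start with the minimum number of
// jobs that could form a consensus, and only dispatch additional copies while
// no result has reached the quorum of ceil((k+1)/2) matching answers.
public final class ProgressiveRedundancy {
    public static <R> R run(int k, Supplier<R> executeJob) {
        int quorum = (k + 1) / 2;              // minimum matching results needed
        Map<R, Integer> votes = new HashMap<>();

        while (true) {
            // Dispatch only as many extra jobs as the current leader still needs.
            int leader = votes.values().stream().max(Integer::compare).orElse(0);
            int missing = quorum - leader;
            for (int i = 0; i < missing; i++) {
                R result = executeJob.get();   // placeholder for a remote job
                votes.merge(result, 1, Integer::sum);
            }
            for (Map.Entry<R, Integer> e : votes.entrySet()) {
                if (e.getValue() >= quorum) {
                    return e.getKey();         // consensus reached
                }
            }
        }
    }
}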

Iterative redundancy

Progressive redundancy is guaranteed to distribute the fewest jobs to achieve a consensus. In contrast, iterative redundancy is guaranteed to distribute the fewest jobs needed to achieve a desired system reliability.

There is a confidence level that varies according to the apparent risk of failure of a task. The user specifies how much improvement is needed, and the system uses the available resources to achieve the highest possible system reliability.

Iterative redundancy distributes the minimum number of jobs required to achieve a desired confidence level in the result. If all jobs agree, the task is complete. However, if some results disagree, the confidence level associated with the majority result is diminished, and the system distributes the minimum number of additional jobs that would restore the desired level of confidence.
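The shape of that control loop might look like the Java sketch below (illustrative; the paper derives the exact batch sizes and the confidence bound, here reduced to a required margin of agreeing-minus-disagreeing results):

import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

// Illustrative shape of the iterative-redundancy loop, not the paper's exact
// algorithm: the desired confidence is translated into a required margin of
// agreeing-minus-disagreeing results; jobs are dispatched in the smallest
// batches that could close the remaining margin. Only vote counts are used,
// no knowledge of individual node reliability.
public final class IterativeRedundancy {
    public static <R> R run(int requiredMargin, Supplier<R> executeJob) {
        Map<R, Integer> votes = new HashMap<>();
        while (true) {
            int agree = votes.values().stream().mapToInt(Integer::intValue).max().orElse(0);
            int total = votes.values().stream().mapToInt(Integer::intValue).sum();
            int margin = agree - (total - agree);
            if (margin >= requiredMargin) {
                // Return the current majority result.
                return votes.entrySet().stream()
                        .max(Map.Entry.comparingByValue())
                        .get().getKey();
            }
            // Dispatch exactly enough jobs to reach the margin if they all agree.
            for (int i = 0; i < requiredMargin - margin; i++) {
                votes.merge(executeJob.get(), 1, Integer::sum);
            }
        }
    }
}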

This algorithm does not require knowledge of node reliability and can therefore be applied to a wider class of systems than credibility-based fault tolerance and blacklisting. For instance, in volunteer computing, the system has no information about new volunteers, and if the system collects information about the reliability of nodes over time, malicious nodes that have developed a bad reputation can simply change their identity. With iterative redundancy this does not happen.

Comparison between redundancies

Progressive and iterative redundancy need to deploy several jobs and wait for the responses before possibly choosing to deploy more, whereas traditional redundancy launches all the jobs at once and looks for a consensus among all the results. At first sight, the response time seems much larger for progressive and iterative redundancy than for traditional redundancy. But in the realm of DCAs the number of tasks is far larger than the number of nodes, so the increased response time is not a problem: nodes can always execute jobs belonging to other tasks. Only rarely will a full execution have to wait for failed tasks to finish successfully. In other words, no node will ever be idle and the nodes' processing capability will be fully utilized.

In the evaluation, they conclude that the average response time for tasks is larger with iterative redundancy than with the progressive and traditional techniques (by factors of 1.4 and 2.8, respectively). On the other hand, iterative redundancy guarantees better reliability than the other techniques for the same cost factor, peaking at 2.8 times better than traditional redundancy in terms of reliability.

Cost factor

In the text above I used the term cost factor. To characterize the behaviour of each technique, the authors derive formulae for two measures of their effect on systems: the system reliability R(r) achieved by applying the redundancy technique, and its cost factor C(r).

Taking traditional redundancy as an example to understand the cost factor: the reliability of k-vote redundancy is the probability that at least a consensus of jobs does not fail, and the cost represents how many jobs need to be launched to achieve the desired reliability.
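As an illustration, assuming every job succeeds independently with probability r (standard majority-vote arithmetic; the paper's exact derivation may differ), the reliability of k-vote redundancy is the probability that at least ⌈(k+1)/2⌉ of the k jobs return the correct result, while its cost is simply the k jobs that are always launched. A small Java sketch:

// Illustrative reliability of traditional k-vote redundancy when each job
// succeeds independently with probability r: the task succeeds if at least
// ceil((k+1)/2) of the k jobs return the correct result. The cost factor of
// this technique is simply k, since all k jobs are always dispatched.
public final class KVoteReliability {
    public static double reliability(int k, double r) {
        int quorum = (k + 1) / 2;            // ceil((k+1)/2) for odd k
        double total = 0.0;
        for (int i = quorum; i <= k; i++) {
            total += binomial(k, i) * Math.pow(r, i) * Math.pow(1 - r, k - i);
        }
        return total;
    }

    private static double binomial(int n, int choose) {
        double result = 1.0;
        for (int i = 1; i <= choose; i++) {
            result *= (double) (n - choose + i) / i;
        }
        return result;
    }

    public static void main(String[] args) {
        // With 90%-reliable nodes, 3-vote redundancy already exceeds 97%.
        System.out.println(reliability(3, 0.9));  // ~0.972
    }
}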

Monday, 8 June 2015

Heading off correlated failures through Independence-as-a-Service

Cloud services normally require high reliability and rely on redundancy techniques to achieve it. Contrary to expectations, however, seemingly independent infrastructure components may share deep, hidden dependencies. Failures in these shared dependencies may lead to unexpected correlated failures, undermining the redundancy efforts. For instance, Amazon S3 replicates each data object across multiple racks in an S3 region, yet a failure of a main switch can still compromise access to the infrastructure.

Discovering unexpected common dependencies is extremely challenging, and many of them are diagnosed only after the failures have occurred. These retroactive approaches require human intervention, leading to prolonged failure recovery times.

Worse, correlated failures can be hidden not just by inadequate tools or analysts within one cloud provider, but also by non-transparent business contracts between cloud providers and lower-level services.

In this paper, the authors propose Independence-as-a-Service (INDaaS), a novel architecture that aims to address the above problems proactively. Rather than localizing and tolerating failures after an outage, INDaaS collects and audits structural dependency data to evaluate the independence of redundant systems before failures occur.

This proactive strategy uses acquisition modules that collect dependency data and adapt it into a common format, and an auditing agent that employs a similarly pluggable set of auditing modules to quantify the independence of redundant systems and to identify common dependencies that may introduce unexpected correlated failures. At the end, an auditing report quantifies the independence of the various redundancy deployments, optionally computing useful information such as estimates of correlated failure probabilities and ranked lists of potential risk groups.

This is a very thorough paper that details every step of INDaaS. The authors also present the concept of a risk group (RG) and a couple of algorithms (the minimal RG and failure sampling algorithms) used to build and rank RGs for auditing. It is from this data that the service builds the final report.

They evaluated the service in three small "but realistic" case studies. They emulated a real data center topology using 4 servers and 4 switches, with 8 VMs running on those servers in total. The tests showed that a user has only a 14% probability of placing a service on servers that do not suffer from correlated faults. As a result, the INDaaS auditing reports gave them hints about the weakest parts of the topology. They also compared the RG algorithms and concluded that the failure sampling algorithm runs much more efficiently than the minimal RG algorithm while still achieving reasonably high accuracy: the failure sampling algorithm took 96 minutes to find 92% of all the RGs, compared to 1046 minutes for the minimal RG algorithm.

In my opinion the evaluation section has a lot of room for improvement. The authors did not show how the topology results relate to a real-world scenario. Overall, they explained INDaaS thoroughly but presented a shallow evaluation section.

Risk Group

In redundant systems, a risk group (RG) is a set of components whose simultaneous failures could cause a service outage. Suppose some service A replicates critical state across independent servers B, C and D located in 3 separate racks. The intent of this 3-way redundancy configuration is that every RG has size 3, i.e., 3 servers must fail simultaneously to cause an outage. Now imagine that these 3 racks share the same switch: if the switch becomes unavailable, the 3 racks also become unavailable. In this case, a common dependency introduced a small RG whose failure could disable the whole service despite the redundancy efforts. Such hidden dependencies can also cross provider boundaries; for example, a storm in Dublin once took down a local power source and its backup generator, disabling both the Amazon and Microsoft clouds in that region for hours.
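The simplest instance of this analysis can be sketched in Java as follows (my illustration, not INDaaS's minimal-RG or failure-sampling algorithms): any component that appears in the dependency set of every replica is, by itself, a risk group that defeats the redundancy.

import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Illustrative sketch: given the dependency set of each replica, any component
// shared by all replicas is a single-component risk group, since its failure
// alone takes down every replica.
public final class RiskGroups {
    public static Set<String> commonDependencies(List<Set<String>> replicaDeps) {
        if (replicaDeps.isEmpty()) return Set.of();
        Set<String> common = new HashSet<>(replicaDeps.get(0));
        for (Set<String> deps : replicaDeps.subList(1, replicaDeps.size())) {
            common.retainAll(deps);            // keep only shared components
        }
        return common;
    }

    public static void main(String[] args) {
        // The example from the text: three racks that share one switch.
        Set<String> b = Set.of("serverB", "rack1", "switchX", "powerA");
        Set<String> c = Set.of("serverC", "rack2", "switchX", "powerB");
        Set<String> d = Set.of("serverD", "rack3", "switchX", "powerA");
        System.out.println(commonDependencies(List.of(b, c, d))); // [switchX]
    }
}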


Tuesday, 26 May 2015

Taming uncertainty in distributed systems with help from the network

In this paper, the authors present Albatross, a service that quickly reports to applications the current status of a remote process: whether it is working and reachable, or not. If Albatross reports a process as "disconnected", it is safe to assume that the process cannot affect the world. Albatross is targeted at data centers equipped with software-defined networks (SDNs). Using SDN functionality, Albatross receives notifications about the state of the network, determines which processes are reachable, and enforces its determinations by installing drop rules on switches.

Processes are disconnected permanently by the service, and they are only reconnected after they have rolled back their state to some checkpoint that causally precedes Albatross's disconnection report. By rolling back their state, excluded processes accept their effective deaths and can be safely reintegrated using standard catch-up techniques (e.g., replay).

This service can be useful in different situations. The authors state that i) Albatross detects network failures an order of magnitude more quickly than the ZooKeeper membership service; ii) integrating RAMCloud with Albatross prevents clients from communicating with servers that have been declared failed, which eliminates a consistency bug in RAMCloud; iii) they also implemented a variant of the Zab protocol that uses Albatross, called Aab. Aab has a smaller description, fewer phases, fewer round trips, fewer message types, and fewer counters for ordering messages. Moreover, it tolerates the failure of all but one process, whereas Zab tolerates the failure of fewer than half of the processes.

In conclusion, Albatross detects network and process problems faster than ZooKeeper and Falcon, it handles failures at end-hosts, making it a complete solution for failure detection in a data center environment, and it can arguably be considered the first service to apply modern networking techniques to refine the guarantees of distributed systems.

Wednesday, 13 May 2015

C3: Cutting Tail Latency in Cloud Data Stores via Adaptive Replica Selection

In this article, presented at NSDI'15, Suresh et al. present C3, an adaptive replica selection mechanism that works with Cassandra and is robust to performance variability in the environment.

Systems that respond to user actions very quickly (within 100 milliseconds) feel more fluid and natural to users than those that take longer. Improvements in Internet connectivity and the rise of warehouse-scale computing systems have enabled Web services that provide fluid responsiveness while consulting multi-terabyte datasets that span thousands of servers.

Large online services that need to create a predictably responsive whole out of less predictable parts are called latency tail-tolerant, or tail-tolerant for brevity.

It is challenging to deliver consistently low latency on the Internet. Some web applications retrieve data from databases and still require low, predictable latencies. Other applications, like Hadoop, are used for distributed computing and need to query big data. If you have a web application that processes client requests using Hadoop, you don't want to keep the client waiting forever for the computation result because you created a bottleneck by submitting all jobs to the same runtime; you need to find a way to deliver a fast response.

A recurring pattern to reducing tail latency is to exploit the redundancy built into each tier of the application architecture, wherein a client node has to make a choice about selecting one out of multiple replica servers to serve a request.

The replica selection strategy has a direct effect on the tail of the latency distribution. This is particularly so in the context of data stores that rely on replication and partitioning for scalability, such as key-value stores. Replica selection can compensate for performance fluctuations by preferring faster replica servers whenever possible.

C3 is an adaptive replica selection mechanism that is robust in the face of fluctuations in system performance. It uses in-band feedback from servers to rank and prefer faster replicas, combined with distributed rate control and backpressure, in order to reduce tail latencies in the presence of service-time fluctuations.
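To give a flavour of feedback-based ranking, here is a Java sketch in the spirit of C3 but not its exact scoring function (all names and constants below are illustrative): the client combines its own smoothed response times with the queue size and service time piggybacked by each server, penalizing long queues super-linearly so that it backs off from servers that are becoming overloaded.

import java.util.Comparator;
import java.util.List;

// Sketch of feedback-based replica ranking: lower score is better, and the
// cubic queue term makes a growing queue dominate the decision.
public final class ReplicaRanker {
    public static final class Replica {
        final String host;
        final double ewmaResponseMs;   // client-side smoothed response time
        final double reportedQueue;    // queue size piggybacked by the server
        final double serviceTimeMs;    // server-reported service time

        Replica(String host, double ewmaResponseMs, double reportedQueue, double serviceTimeMs) {
            this.host = host;
            this.ewmaResponseMs = ewmaResponseMs;
            this.reportedQueue = reportedQueue;
            this.serviceTimeMs = serviceTimeMs;
        }

        double score() {
            return ewmaResponseMs + Math.pow(reportedQueue, 3) * serviceTimeMs;
        }
    }

    public static Replica pick(List<Replica> replicas) {
        return replicas.stream().min(Comparator.comparingDouble(Replica::score)).orElseThrow();
    }

    public static void main(String[] args) {
        Replica a = new Replica("10.0.0.1", 4.0, 2, 1.5);
        Replica b = new Replica("10.0.0.2", 3.0, 6, 1.5);
        System.out.println(pick(List.of(a, b)).host);  // 10.0.0.1: the shorter queue wins
    }
}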

Through comprehensive performance evaluations, they have demonstrated that C3 improves Cassandra’s mean, median and tail latencies.



Monday, 4 May 2015

List of the most used algorithms

Our world is full of algorithms that we find everywhere, but some algorithms are more widely used than others. You can check it here.

Monday, 20 April 2015

SSH - connect without password.

From time to time, we find ourselves scavenging the internet for how to connect to a remote host over SSH without a password. I have found this link that contains some info about it.

Also, I have decided to use the 'keychain' tool instead of raw 'ssh-agent' to get a passwordless session. For this, you must add the private key or the certificate to the keychain instance. When you want to log in with ssh, keychain will look up the key or certificate for the login. Here are the commands to add keys to 'keychain':


$ keychain ~/.ssh/my_pem.pem
$ source ~/.keychain/$(hostname)-sh   # load the agent environment that keychain writes out
$ ssh user@host


If you also face bad network connections, you can run SSH inside "tmux" or "screen" to recover broken ssh sessions. Alternatively, and my favourite, there is the Mosh shell, which works very well over irregular network connections. With this tool, you will stop seeing "Broken pipe" error messages forever.

Wednesday, 1 April 2015

Vim is great

I want to avoid the eternal war that exists between emacs and vim over which editor is the best. I have used both editors for a long time, so long that I have developed an opinion.

In my opinion, vim is much better than emacs and, to justify myself, I need to point out the drawbacks of emacs.

Vim is the most peaceful application that I have come across. Working with this editor is like being in the Zen state. It is so calm and blissful, and yet so powerful.

To start, I must tell you that I use Dvorak layout to avoid RSI problems. I advise you to use Dvorak or Colemak layout rather than QWERTY. QWERTY was built on purpose to be that awful.

Well, back to our topic. Emacs is a heavy editor that is meant for you to open once, and work there until the end of the day. People say that emacs is an OS, and they are right, but, in some situations, I start to doubt it. For example, the eshell plugin will never replace the power of the Terminator shell program.

It is painful to work with the Ctrl and Meta keys. I have even remapped Ctrl, but the problem with the Meta key's location is still there. For me, it is much more comfortable to access the vim prompt than to use these keys.

I have no words to express how horrible the emacs keybindings are. Most of the time, they don't make any sense at all. As a result, I am more used to typing the emacs commands than using the keybindings. In vim, the keybindings are mnemonics for actions.

And finally, I have tried, unsuccessfully, to learn Emacs Lisp several times. In the end, I gave up and decided to move to vim once and for all. Lisp really has Lots of (Lost In) Stupid Parentheses.

The only drawback I find in vim is that, since you command it with keys, I sometimes look like an elephant trying to dance ballet on top of a water lily.

VIM plugins

CtrlP is a fuzzy file finder plugin for vim. Like many other plugins, it is full of options that help you navigate through files without having to list them in the main window. You can find more information about the plugin here.

This site lists vim plugins that you can use to build a tailored environment.

Tuesday, 24 March 2015

Conference ranking

You can check conference rankings here, and get more info about the evaluation rankings that exist here.
If you want to check the conference calendar, check it here.

Tuesday, 3 March 2015

Fully Distributed Hadoop Federation Cluster

Federation Concepts

"HDFS Federation improves the existing HDFS architecture through a clear separation of namespace and storage, enabling generic block storage layer. It enables support for multiple namespaces in the cluster to improve scalability and isolation. Federation also opens up the architecture, expanding the applicability of HDFS cluster to new implementations and use cases.
I have created two guest machines for the namenodes and datanodes to run on. Each VM is allocated 1GB of RAM and 20GB of HDD. Each machine is connected through host-only adapter. Client will be the host machine itself which is able to ssh and connect to both namenodes inside the VirtualBox virtual machines."
-- An excerpt from a Hortonworks blog by Suresh Srinivas.
Find more on Apache Hadoop Federation.

Setup Summary

I have implemented Hadoop Federation inside an ESXi server to create a fully distributed cluster on Apache Hadoop 2.6. The setup consists of two namenodes and two datanodes, each with 1GB RAM and 20GB HDD allotted. I have also configured an HDFS client on a separate machine with the same hardware configuration. All the nodes run Ubuntu 14.10. It is also recommended to use the latest Oracle JDK for the namenodes and datanodes. This setup only demonstrates how to build a federated cluster, hence YARN is not configured.
The hostnames and IP addresses below are just an example, to provide a distinct view for understanding the cluster setup.
namenode1 fed-nn01   192.168.56.101
namenode2 fed-nn02   192.168.56.102
datanode1 fed-dn01   192.168.56.103
datanode2 fed-dn02   192.168.56.104
client    fed-client 192.168.56.105

Downloads

Download the below packages and place the tarballs on all namenodes, datanodes and client machines. Apache Hadoop 2.6 Download Oracle JDK8 Download
Note: Before moving on to the hadoop installation and configuration -
1. Disable the firewall on all namenodes, datanodes and client machines.
2. Comment out the 127.0.1.1 address line in the /etc/hosts file on all namenodes, datanodes and client machines.
3. Update the correct hostnames and their respective ip-addresses of all namenodes, datanodes and client machines in their /etc/hosts files.
4. Install ssh on all namenodes and datanodes. It is not required on the client machine.

INSTALLATION & CONFIGURATION

Hadoop installation and configuration includes setting up a hadoop user, installing java, configuring passwordless ssh, and finally installing, configuring and monitoring hadoop.

User Configuration

Location: fed-nn01, fed-nn02, fed-dn01, fed-dn02, fed-client
First we will create a group “hadoop” and a user “huser” for all hadoop administrative tasks and set password for huser.
$ sudo groupadd hadoop
$ sudo useradd -m -d /home/huser -g hadoop huser
$ sudo passwd huser
Note: Here onwards we shall use newly created huser for all hadoop tasks that we perform.

Java Installation

We will go for the recommended Oracle JDK installation and configuration. At the time of writing this document JDK8 was the latest that was available.
Location: fed-nn01, fed-nn02, fed-dn01, fed-dn02
Make a directory named java inside /usr
huser:~$ sudo mkdir /usr/java

Copy the tarball.

huser:~$ sudo cp /path/to/jdk-8u25-linux-x64.tar.gz /usr/java
Extract the tarball into /usr/java
huser:~$ sudo tar -xzvf /usr/java/jdk-8u25-linux-x64.tar.gz -C /usr/java/
Set the environment for jdk8.
huser:~$ sudo vi /etc/profile.d/java.sh
JAVA_HOME=/usr/java/jdk1.8.0_25/
PATH=$JAVA_HOME/bin:$PATH
export PATH JAVA_HOME
export CLASSPATH=.
huser:~$ sudo chmod +x /etc/profile.d/java.sh
huser:~$ source /etc/profile.d/java.sh
Testing Java Installation
huser:~$ java -version
java version "1.8.0_25"
Java(TM) SE Runtime Environment (build 1.8.0_25-b17)
Java HotSpot(TM) 64-Bit Server VM (build 25.25-b02, mixed mode)
Note: Java installation on the client can be skipped if it already has openjdk installed. If not, the above method can be repeated for the client as well.

Passwordless SSH Configuration

Passwordless ssh is required only by the namenodes, to start the HDFS and MapReduce daemons on the various nodes.
Location: fed-nn01
huser@fed01:~$ ssh-keygen -t rsa
huser@fed01:~$ touch /home/huser/.ssh/authorized_keys
huser@fed01:~$ cat /home/huser/.ssh/id_rsa.pub >> /home/huser/.ssh/authorized_keys
huser@fed01:~$ ssh-copy-id -i ~/.ssh/id_rsa.pub huser@fed-nn02
huser@fed01:~$ ssh-copy-id -i ~/.ssh/id_rsa.pub huser@fed-dn01
huser@fed01:~$ ssh-copy-id -i ~/.ssh/id_rsa.pub huser@fed-dn02
Location: fed-nn02
huser@fed02:~$ ssh-keygen -t rsa
huser@fed02:~$ touch /home/huser/.ssh/authorized_keys
huser@fed02:~$ cat /home/huser/.ssh/id_rsa.pub >> /home/huser/.ssh/authorized_keys
huser@fed02:~$ ssh-copy-id -i ~/.ssh/id_rsa.pub huser@fed-nn01
huser@fed02:~$ ssh-copy-id -i ~/.ssh/id_rsa.pub huser@fed-dn01
huser@fed02:~$ ssh-copy-id -i ~/.ssh/id_rsa.pub huser@fed-dn02
Testing passwordless ssh on fed-nn01 & fed-nn02
Run below commands to test the passwordless logins from both fed-nn01 & fed-nn02.
huser:~$ ssh fed-nn01
huser:~$ ssh fed-nn02
huser:~$ ssh fed-dn01
huser:~$ ssh fed-dn02
Note: The client does not require any ssh settings to be configured. The reason is that the client communicates with the namenode using the namenode's configurable TCP port, which is an RPC connection, whereas the client communicates with the datanodes directly for I/O operations (read/write) using the Data Transfer Protocol defined in DataTransferProtocol.java. For performance reasons this is a streaming protocol and not RPC. Before streaming the block to the datanode, the client buffers the data until a full block (64MB/128MB) has been created.

Hadoop Installation

We will go forth with the latest stable Apache Hadoop 2.6.0 release.
Location: fed-nn01, fed-nn02, fed-dn01, fed-dn02, fed-client
We will untar the tarball and place it in the /opt directory.
huser:~$ sudo tar -xzvf hadoop-2.6.0.tar.gz -C /opt/
Changing the ownership of the directory to huser.
huser:~$ sudo chown -R huser:hadoop /opt/hadoop-2.6.0/
Setting the environment variables in .bashrc file of “huser” user.
huser:~$ sudo vi ~/.bashrc
###JAVA CONFIGURATION###
JAVA_HOME=/usr/java/jdk1.8.0_25/
PATH=$JAVA_HOME/bin:$PATH
export PATH JAVA_HOME
export CLASSPATH=.

###HADOOP CONFIGURATION###
HADOOP_PREFIX=/opt/hadoop-2.6.0/
PATH=$HADOOP_PREFIX/bin:$PATH
PATH=$HADOOP_PREFIX/sbin:$PATH
export PATH
Activate the configured environment settings for the "huser" user by running the below command.
huser:~$ exec bash
Testing Hadoop Installation
huser:~$ hadoop version
Hadoop 2.6.0
Subversion https://git-wip-us.apache.org/repos/asf/hadoop.git -r e3496499ecb8d220fba99dc5ed4c99c8f9e33bb1
Compiled by jenkins on 2014-11-13T21:10Z
Compiled with protoc 2.5.0
From source with checksum 18e43357c8f927c0695f1e9522859d6a
This command was run using /opt/hadoop-2.6.0/share/hadoop/common/hadoop-common-2.6.0.jar
Note: There is no need to mention the java environment variables in the .bashrc file of client machine if it already has java installed.

Hadoop Configuration

There are a few files to be configured to get hadoop up and running. For us these configuration files reside in the /opt/hadoop-2.6.0/etc/hadoop/ directory.
hadoop-env.sh Location: fed-nn01, fed-nn02, fed-dn01, fed-dn02, fed-client
huser:~$ vi /opt/hadoop-2.6.0/etc/hadoop/hadoop-env.sh
export JAVA_HOME=/usr/java/jdk1.8.0_25/
export HADOOP_LOG_DIR=/var/log/hadoop/
Create a log directory at the path specified in the hadoop-env.sh file under the parameter HADOOP_LOG_DIR and change the ownership of the directory to the "huser" user.
$ sudo mkdir /var/log/hadoop
$ sudo chown -R huser:hadoop /var/log/hadoop
Note: In the client machine there is no need to specify and create a log directory. Similarly, it is needless to declare java's home directory if it is pre-installed.
core-site.xml
Location: fed-nn01
huser@fed-nn01:~$ sudo vi /opt/hadoop-2.6.0/etc/hadoop/core-site.xml
<configuration>
<property>
<name>fs.default.name</name>
<value>hdfs://fed-nn01</value>
</property>
</configuration>
Location: fed-nn02
huser@fed-nn02:~$ sudo vi /opt/hadoop-2.6.0/etc/hadoop/core-site.xml
<configuration>
<property>
<name>fs.default.name</name>
<value>hdfs://fed-nn02</value>
</property>
</configuration>
Location: fed-dn01, fed-dn02
huser@fed-dn01:~$ sudo vi /opt/hadoop-2.6.0/etc/hadoop/core-site.xml
<configuration>
<property>
<name>fs.default.name</name>
<value>hdfs://fed-nn01,hdfs://fed-nn02</value>
</property>
</configuration>
Location: fed-client
huser@fed-client:~$ sudo vi /opt/hadoop-2.6.0/etc/hadoop/core-site.xml
<configuration>
<property>
<name>fs.default.name</name>
<value>viewfs://fedcluster/</value>
</property>
<property>
<name>fs.viewfs.mounttable.fedcluster.link./ns1</name>
<value>hdfs://fed-nn01:8020</value>
</property>
<property>
<name>fs.viewfs.mounttable.fedcluster.link./ns2</name>
<value>hdfs://fed-nn02:8020</value>
</property>
</configuration>
Note: The client is configured to use the ViewFS plugin for a more user-friendly, aggregated view of the HDFS filesystem namespaces, mounted under the respective /ns1 and /ns2 mountpoints. Here we have given the mounttable the name “fedcluster”; if not specified, “default” is used.
hdfs-site.xml
Location: fed-nn01, fed-nn02
huser:~$ sudo vi /opt/hadoop-2.6.0/etc/hadoop/hdfs-site.xml
<configuration>
<property>
<name>dfs.replication</name>
<value>2</value>
</property>
<property>
<name>dfs.name.dir</name>
<value>file:///hdfs/name</value>
</property>
<property>
<name>dfs.permissions</name>
<value>false</value>
</property>
<property>
<name>dfs.nameservices</name>
<value>ns1,ns2</value>
</property>
<property>
<name>dfs.namenode.rpc-address.ns1</name>
<value>fed-nn01:8020</value>
</property>
<property>
<name>dfs.namenode.rpc-address.ns2</name>
<value>fed-nn02:8020</value>
</property>
</configuration>
Create a directory for the namenode to store its persistent metadata and change its ownership to the "huser" user.
$ sudo mkdir -p /hdfs/name
$ sudo chown -R huser:hadoop /hdfs/
Location: fed-dn01, fed-dn02
huser:~$ sudo vi /opt/hadoop-2.6.0/etc/hadoop/hdfs-site.xml
<configuration>
<property>
<name>dfs.replication</name>
<value>2</value>
</property>
<property>
<name>dfs.data.dir</name>
<value>file:///hdfs/data</value>
</property>
<property>
<name>dfs.permissions</name>
<value>false</value>
</property>
<property>
<name>dfs.nameservices</name>
<value>ns1,ns2</value>
</property>
<property>
<name>dfs.namenode.rpc-address.ns1</name>
<value>fed-nn01:8020</value>
</property>
<property>
<name>dfs.namenode.rpc-address.ns2</name>
<value>fed-nn02:8020</value>
</property>
</configuration>
Create a directory for the datanode to store blocks and change its ownership to the "huser" user.
huser:~$ sudo mkdir -p /hdfs/data
huser:~$ sudo chown -R huser:hadoop /hdfs/
Location: fed-client
<configuration>
<property>
<name>dfs.replication</name>
<value>2</value>
</property>
<property>
<name>dfs.permissions</name>
<value>false</value>
</property>
<property>
<name>dfs.nameservices</name>
<value>ns1,ns2</value>
</property>
<property>
<name>dfs.namenode.rpc-address.ns1</name>
<value>fed-nn01:8020</value>
</property>
<property>
<name>dfs.namenode.rpc-address.ns2</name>
<value>fed-nn02:8020</value>
</property>
</configuration>
slaves
Location: fed-nn01, fed-nn02
huser:~$ vi /opt/hadoop-2.6.0/etc/hadoop/slaves
fed-dn01
fed-dn02

Formatting HDFS Filesystem

Location: fed-nn01, fed-nn02
huser:~$ hdfs namenode -format -clusterID fedcluster
Note: If the ClusterID is not specified, it is randomly generated by the system.

Starting HDFS

Run the below command either on fed-nn01 or fed-nn02 namenode to start the dfs.
$ start-dfs.sh
At this point, listing files directly through fed-nn01 and fed-nn02 will show only the files present in their respective configured namespaces, i.e. ns1 and ns2, whereas listing files on the client machine will show both namespace mountpoints mounted under /. List or copy/retrieve operations from one namenode to the other can still be done by specifying the full HDFS URI in the commands.
Examples:
Listing files in fed-nn01 namenode
huser@fed-nn01:~$ hdfs dfs -ls -h /
Found 2 items
-rw-r--r-- 2 huser hadoop 128 M 2015-01-02 16:23 /512mb-junk
drwxr-xr-x - huser hadoop 0 2015-01-02 16:28 /user
Listing files in fed-nn02 namenode from fed-nn01 namenode
huser@fed-nn01:~$ hdfs dfs -ls hdfs://fed-nn02:8020/user
Found 4 items
-rw-r--r-- 2 huser hadoop 128 M 2015-01-02 16:26 hdfs://hdfs-fed02:8020/user/512mb-junk
drwxr-xr-x - huser hadoop 0 2015-01-03 02:32 hdfs://hdfs-fed02:8020/user/dir1
-rw-r--r-- 2 huser hadoop 1 G 2015-01-02 15:25 hdfs://hdfs-fed02:8020/user/lgb-junk
-rw-r--r-- 2 huser hadoop 1 G 2015-01-02 15:31 hdfs://hdfs-fed02:8020/user/lgb-junk.1
Listing files from fed-client machines
huser@fed-client:~$ hdfs dfs -ls /
Found 2 items
-r-xr-xr-x - ibm ibm 0 2015-01-03 02:55 /ns1
-r-xr-xr-x - ibm ibm 0 2015-01-03 02:55 /ns2
Listing files on fed-nn02 from fed-client
$ hdfs dfs -ls -h /ns2/user
Found 4 items
-rw-r--r-- 2 huser hadoop 128 M 2015-01-02 16:26 hdfs://hdfs-fed02:8020/user/512mb-junk
drwxr-xr-x - huser hadoop 0 2015-01-03 02:32 hdfs://hdfs-fed02:8020/user/dir1
-rw-r--r-- 2 huser hadoop 1 G 2015-01-02 15:25 hdfs://hdfs-fed02:8020/user/lgb-junk
-rw-r--r-- 2 huser hadoop 1 G 2015-01-02 15:31 hdfs://hdfs-fed02:8020/user/lgb-junk.1
As you can see from the above examples, the client provides an aggregated view of the HDFS filesystems using viewfs. Similarly, we can copy/retrieve and create files/directories from the client machine.

Monitoring

The http links below are also helpful for monitoring and browsing the HDFS filesystem with a web browser on any of the namenodes or client machines.
http://fed-nn01:50070/dfshealth.jsp

http://fed-nn02:50070/dfshealth.jsp

http://fed-nn01:50070/dfsclusterhealth.jsp

http://fed-nn02:50070/dfsclusterhealth.jsp


http://hadoop.apache.org/docs/r2.4.0/hadoop-project-dist/hadoop-hdfs/ViewFs.html

http://hashprompt.blogspot.pt/2015/01/fully-distributed-hadoop-federation.html

http://hadoop.apache.org/docs/r2.3.0/hadoop-project-dist/hadoop-hdfs/Federation.html

https://www.usenix.org/legacy/event/hotos09/tech/full_papers/ko/ko_html/

http://hortonworks.com/blog/webhdfs-%E2%80%93-http-rest-access-to-hdfs/

http://hadoop.apache.org/docs/r1.0.4/webhdfs.html#File+and+Directory+Operations

Friday, 16 January 2015

Practical Fine-Grained Decentralized Information Flow Control

Paper 1 Here

Paper 2 Here

Access control is any mechanism by which a system grants or revokes the right to access some data or perform some action. Traditional access control mechanisms are all-or-nothing; once an application has the right to read a file, it can do anything with that file's data. In contrast, Decentralized Information Flow Control (DIFC) enforces more powerful policies, such as permitting an application to read a file but disallowing the broadcast of the contents of that file over an unsecured network channel.

As an example of the DIFC model, consider Alice and Bob, who want to schedule a meeting while keeping their calendars mostly secret. Alice and Bob each place a secrecy label on their calendar file, and then only a thread with those secrecy labels can read it. Once a thread has a secrecy label, it has been tainted by that label and can no longer write to an unlabeled output, such as standard output or the network. If the thread has the capability to declassify the information, it may remove the secrecy label and then write the data to an unlabeled output. In the calendar example, the program obtains both Alice's and Bob's secrecy labels to read both calendar files, but then it cannot remove the labels. When the thread is ready to output an acceptable meeting time, it must call a function that declassifies the result. The declassification function checks that its output contains no secret information; for example, that the output is simply a date and does not include other information from Bob's calendar.

In a DIFC system, any principal can create a new tag for secrecy or integrity. Principals assign labels to data objects - data structures (arrays, lists, etc.) and system resources (files and sockets). A secrecy label prevents the data from being disclosed; an integrity label assures that the data has not been modified since it was endorsed by the principal. For example, a web application might create one secrecy tag for its user database and a separate secrecy tag for each user's data. The secrecy tag on the user database prevents authentication information from leaking to the network. The tags on user data prevent a malicious user from writing another user's secret data to an untrusted network connection. Also, if Microsoft endorses a data file and its integrity is preserved, a user who trusts Microsoft's label can trust the file's content.

Programmers express security policies by labeling data with secrecy and integrity labels, and access labeled data inside scoped security regions.
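The core flow rule behind these labels can be sketched as follows (a minimal illustration in Java, simplified; real systems such as Flume and Laminar add capabilities for declassification and endorsement on top of this check):

import java.util.Set;

// Minimal sketch of the DIFC flow rule: data may flow from a source to a
// destination only if the destination carries at least the source's secrecy
// tags and gains no integrity tags the source does not have.
public final class Labels {
    public record Label(Set<String> secrecy, Set<String> integrity) {}

    public static boolean canFlow(Label from, Label to) {
        return to.secrecy().containsAll(from.secrecy())        // no secrecy tag is dropped
            && from.integrity().containsAll(to.integrity());   // no integrity tag is gained
    }

    public static void main(String[] args) {
        Label aliceCalendar = new Label(Set.of("alice"), Set.of());
        Label taintedThread = new Label(Set.of("alice", "bob"), Set.of());
        Label network       = new Label(Set.of(), Set.of());

        System.out.println(canFlow(aliceCalendar, taintedThread)); // true: the thread is tainted
        System.out.println(canFlow(taintedThread, network));       // false: cannot write out
    }
}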

Several systems implement the DIFC model, such as HiStar, Flume, and Laminar. I will focus only on Flume and Laminar.

Laminar

Laminar is a decentralized information flow control (DIFC) system that combines language-level (PL) and operating-system-level (OS) enforcement. It is the first system to implement decentralized information flow control using a single set of abstractions for OS resources and heap-allocated objects.

The Laminar OS extends a standard operating system with a Laminar security module for information flow control. The Laminar OS security module governs information flows through all standard OS interfaces, including through devices, files, pipes and sockets. The OS regulates communication between threads of the same or different processes that access the labeled or unlabeled system resources or that use OS inter-process communication mechanisms, such as signals. OS enforcement applies to all applications, preventing unlabeled or non-Laminar applications from circumventing the DIFC restrictions.

The Laminar VM regulates information flow between heap objects and between threads of the same process via these objects. These flows are regulated by inserting dynamic DIFC checks in the application code. Because the Laminar VM regulates these flows within the address space, the OS allows data structures and threads to have heterogeneous labels. All threads in multithreaded processes without a trusted VM must have the same labels and capabilities.

The Laminar OS exports security system calls to the trusted VM for capability and label management. The VM and the OS do not allow code outside a security region to access labeled data objects. During the execution of a security region, the VM gives the thread the labels and capabilities of the security region so that the OS can mediate access to system resources according to the security region's labels. Security regions are not visible to the OS, so the thread itself must hold the labels and capabilities given by the VM. At the end of the security region, the VM restores the thread's original capabilities and labels.

Laminar requires modifications to Jikes RVM and to the Linux operating system in order to provide DIFC. Jikes RVM is a research Java virtual machine written mostly in Java itself. Airavat uses Laminar to provide security policies. Laminar is publicly available in the Jikes RVM research archives.

Flume

Flume is an open-source infrastructure for secure web applications based on the DIFC model. Flume is designed as a component within the kernel rather than as an entire operating system like HiStar; an immediate consequence is that any process running outside of Flume's control remains vulnerable. Currently there are two different implementations of Flume, one for Linux and one for OpenBSD. The Linux implementation runs as a component within the kernel, while the OpenBSD version uses systrace system calls.

A typical Flume application consists of processes of two types. Untrusted processes do most of the computation. They are constrained by, but possibly unaware of, DIFC controls. Trusted processes, in contrast, are aware of DIFC and set up the privacy and integrity controls that constrain untrusted processes. Trusted processes also have the privilege to selectively violate classical information flow control - for instance, by declassifying private data (perhaps to export it from the system), or by endorsing data as high integrity.

Flume is demonstrated in FlumeWiki, which is built on MoinMoin. Flume can be about 34% slower on writes and 43% slower on reads than unmodified Moin.

Linux Secure Modules (LSM)

Both Flume and Laminar use LSM hooks in the kernel to implement custom authorization rules. LSM was designed to provide everything needed to successfully implement a mandatory access control module, while imposing the fewest possible changes on the Linux kernel.

Flume's LSM policy disallows all direct access to file systems by confined processes. Fork is blocked in Flume because all process spawning must occur through the dedicated spawner.

Laminar uses LSM to intercept inode and file accesses, which are used to perform operations on files and file handles, and it does a straightforward check of the secrecy rules and labels.

Monday, 12 January 2015

Perspectives on the CAP theorem

Paper here

This paper is a reflection on the CAP theorem, placing the theorem within the broader context of distributed computing theory.

CAP theorem appeared in the context of web services. A web service is implemented by a set of servers, perhaps distributed over a set of geographically distant data centers. The CAP theorem, also known as Brewer's theorem, states that it is impossible for a distributed computer system to simultaneously provide all three of the following guarantees:

  1. Consistency (all nodes see the same data at the same time)
  2. Availability (a guarantee that every request receives a response about whether it succeeded or failed)
  3. Partition tolerance (the system continues to operate despite arbitrary message loss or failure of part of the system)

There is a trade-off that has been much discussed ever since: the impossibility of guaranteeing both safety and liveness in an unreliable distributed system.

Consistency (as defined in the CAP theorem) is a classic safety property: every response sent to a client is correct. Recall that an algorithm is safe (consistent) if nothing bad ever happens. In the case of web services, consistency means returning the right response to the client's request; thus, what counts as consistent depends on the service provided.

Availability is a classic liveness property: eventually, every request receives a response. An algorithm is live if eventually something good happens. Availability simply means that each request eventually receives a response. Obviously, a fast response is better than a slow one, but whether a late response is acceptable depends on the system requirements: in a real-time system, a late response can be enough to create problems, whereas for a purchase on the Amazon site it is acceptable.

Partitions, crash failures, message loss and malicious attacks can make a system unreliable. The CAP theorem only cares about network partitions. Network partitions can happen because some servers become unreachable or because a router partitioned the network; this is a common failure that affects distributed systems.

The paper states that it is impossible to achieve both consistency and availability in an unreliable distributed system, and it is necessary to sacrifice one of these two properties, or even both properties.

  1. There are systems that guarantee strong consistency and provide best effort availability.
  2. There are systems that guarantee availability, and provide best effort consistency.
  3. Some systems may sacrifice both consistency and availability.

This sacrifice is acceptable if the characteristics of the service being designed allow it; if not, the service is not feasible.

Tuesday, 6 January 2015

Privacy Integrated Queries (PINQ)

Paper here

PINQ is a LINQ-like1 API for computing on privacy-sensitive data sets while providing guarantees of differential privacy for the underlying records. It is a trustworthy platform for privacy-preserving data analysis and provides private access to arbitrarily sensitive data. A user just needs to write a declarative program and does not need to worry about the privacy of the data. To guarantee that querying will not disclose data, the PINQ layer provides an interface that only allows statistical queries.

There is a sheer number of research papers on privacy-preserving data analysis, resulting in many distinct approaches, and each one needs a substantial effort in design, analysis, and implementation. Programs written in PINQ do not.

Besides differential privacy, there are two common approaches to protect data against people who want to inspect all of it:

  1. Anonymization allows the data provider not to trust or understand the analyst that examines the data. The drawback is that the analyst does not get access to rich, high-fidelity data: they have to work with data that is reduced, especially in quality. It is not easy to sanitize data.
  2. The analyst sends the analysis to the data set, and it runs on the raw data. The drawback is that the provider must allow the analyst's code to execute against the full data.

With PINQ, the analysis runs against the full data and the provider does not need to understand or trust the analyst. PINQ is an intermediate layer that the analyst interacts with to get the results he wants, and it guarantees that the responses do not disclose private information.

To help understand the importance of PINQ, let us first look at a LINQ example that counts the number of people interested in the sport cricket. Let's suppose that the program was written by an analyst (or user).

// Open sensitive data set with security
IQueryable<SearchRecord> queryLog = OpenSecretData(password);

// Group queries by User and identify cricket fans who queried the word at least 5 times
var bigUsers = queryLog.Where( x=> x.Query == "cricket")
                       .GroupBy( x=>x.User)
                       .Where(x=> x.Count() > 5);

// Transform each cricket fan into the Zip code by IP address
var userZIPs = bigUsers.Join(IPandZIP, x=>x.IP, y=>y.IP, (x,y) => y.ZIPCode);

// Count up ZIP codes containing at least 10 cricket fans
var PopularZIPs = userZIPs.GroupBy(x=>x)
                          .Where(x=>x.Count() > 10);

Console.WriteLine(PopularZIPs.Count());

The userZIPs assignment contains the IP address and ZIP code of the users. This is considered sensitive data that should never reach the user side.

Now, let's look at the PINQ example.

// Open sensitive data set with security
PINQueryable<SearchRecord> queryLog = OpenSecretData(password);

// Group queries by User and identify cricket fans who queried the word at least 5 times
var bigUsers = queryLog.Where( x=> x.Query == "cricket")
                       .GroupBy( x=>x.User)
                       .Where(x=> x.Count() > 5);

// Transform each cricket fan into the Zip code by IP address
var userZIPs = bigUsers.Join(IPandZIP, x=>x.IP, y=>y.IP, (x,y) => y.ZIPCode);

// Count up ZIP codes containing at least 10 cricket fans
var PopularZIPs = userZIPs.GroupBy(x=>x)
                          .Where(x=>x.Count() > 10);

Console.WriteLine(PopularZIPs.Count(є));

The є (epsilon) guarantees differential privacy for the PopularZIPs aggregate. If є=0, the result has no accuracy: the noise is unbounded. A more realistic value is 0.1. The right є depends on the sensitivity of the data and on how useful the analysis must be; we can test it with different values and see what each one costs in terms of privacy. You can look at this paper 2 to get more information about choosing epsilon.

You may ask why the PINQ example still uses the sensitive fields IP and ZIPCode. Shouldn't these fields be removed, or masked by PINQ?

The main point of PINQ is that you are allowed to use these sensitive fields, but because PINQ perturbs aggregates before they are returned to the user, it still provides differential privacy. That is, its privacy guarantees are not the result of avoiding sensitive fields, but rather about writing queries that the data provider can be sure will have differential privacy guarantees.

Transformations and aggregations

Transformations and aggregations must respect differential privacy. Here are some examples of transformations in LINQ:

Where and Select

IQueryable<T> Where(Expression<Func<T,bool>> predicate)
IQueryable<S> Select<S>(Expression<Func<T, S>> function)

The output changes by at most one record when a single input record changes. We can query as many times as we like without compromising subsequent queries.

GroupBy

IQueryable<IGrouping<K,T>> GroupBy<K>(Expression<Func<T,K>> k)

Changing one input record can change at most two output records: when GroupBy is applied to a set, an element is taken out of that set and added to a group object, so two objects are modified. If a query has a chain of GroupBy transformations, the sensitivity increases by a factor of 2 at each step, so a long chain of GroupBy can blow up the privacy cost.

Join

IQueryable<R> Join<S,K,R>(IQueryable<S> innerTable,
                          Expression<Func<T,K>> outerKey,
                          Expression<Func<S,K>> innerKey,
                          Expression<Func<T,S,R>> reducer)

Joins are a binary transformation that takes two inputs and two key-selection functions, and outputs pairs of records using the reduction function. From a privacy point of view this is very dangerous: e.g., if I have the yellow pages in one hand and my medical record in the other, and I join them together, I splatter my medical record across all of the yellow pages entries.

A PINQueryable contains only two member variables:

IQueryable source; // any LINQ data source
PINQAgent agent;   // acts as privacy proxy

The PINQAgent is a very important part of PINQ, controlling how much accuracy the analyst should have access to. The agent controls a resource, the units of differential privacy, and is given the opportunity to reject any depletion of this resource. It accepts or rejects requests for additional epsilon.

You can check more information about PINQ API here.


  1. LINQ is a database style access language for .NET applications.

  2. Differential Privacy: An Economic Method for Choosing Epsilon, Justin Hsu et al.

Friday, 2 January 2015

Airavat: Security and Privacy for MapReduce

Paper Here

This paper presents Airavat, a MapReduce-based system which provides strong security and privacy guarantees for distributed computations on sensitive data. It integrates access control and differential privacy to protect data in the cloud.

Anonymization is insecure against attackers with external information and can lead to data leaks. Airavat instead uses access control policies and differential privacy to protect the data.

Airavat can be used in applications that need to keep data available in the cloud while preserving privacy during data mining. For instance, clouds holding genomic data do not want to disclose their content; an algorithm called CloudBurst, which maps next-generation sequencing data to the human genome, uses Airavat.

Data providers control the security policy for their sensitive data. Users without security expertise can perform computations on the data, but Airavat confines these computations, preventing information leakage beyond the provider's policy. The prototype is efficient, with run times on Amazon's cloud computing infrastructure within 32% of a MapReduce system with no security.

Airavat can run on a cloud infrastructure like AWS. The system trusts the cloud infrastructure and the data provider that uploads the data, but it does not trust the computation provider, which can be malicious (even unintentionally) and perform damaging computations.

Airavat is built in Java and it uses Jikes RVM and Sun JVM. Jikes RVM is not mature enough to run code as large and complex as the Hadoop framework. Therefore, it uses Hadoop's streaming feature to ensure that mappers run on Jikes and that most of the framework executes on Sun's JVM.

Programming model

They split MapReduce into untrusted mappers and trusted reducers. Several techniques are added on the mapper side so that untrusted mapper code can be run safely. Reducers are trusted because they compute over data already formatted according to Airavat's policies.

Mappers

There are a few challenges that must be considered on the map side:

  1. The untrusted mapper code could copy data and send it over the network.
  2. The output of the computation in the mapper code can also be an information channel. E.g., there could be a piece of information in the map output data that identifies a person.

Mandatory access control and differential privacy can be used together to prevent leaks and obtain end-to-end privacy. The former prevents leaks through storage channels like network connections or files; the latter prevents leaks through the output of the computation.

Airavat confines the untrusted code by adding mandatory access control (MAC) and a MAC policy. Airavat runs on SELinux (a Linux kernel security module that provides mechanisms for supporting access control security policies), which allows a user to define policies that create trusted and untrusted domains and restrict their interactions; e.g., mappers that run untrusted code cannot access the network.

Airavat labels the input, the intermediate values, and the output data, and every time labelled data is accessed there are security checks so that untrusted code does not leak any data. Only the reduce output is not labelled.

Access control solves many problems, but it is not enough. The output of the data at the end of the execution can also disclose private data by mistake when the label is removed. So, a mechanism is needed to ensure that the output does not violate an individual's privacy: differential privacy.

A mechanism is differentially private if every output is produced with similar probability whether any given input is included or not. A way to achieve this is to add noise to the calculation: e.g., the result of the computation f over x becomes f(x) + noise. Differential privacy uses the notion of function sensitivity, which measures the maximum change in the function's output when any single item is removed from or added to its dataset. Differential privacy works best with low-sensitivity computations: bounded sensitivity ensures that a slightly changed input produces a similar output, so a malicious user cannot disclose data through crafted comparisons of results.
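The standard way to realize the "f(x) + noise" step is the Laplace mechanism, sketched below in Java (an illustration of the general technique, not Airavat's code; the numbers in the example are made up):

import java.util.Random;

// Illustrative Laplace mechanism: the noise is drawn from a Laplace
// distribution whose scale is the function's sensitivity divided by the
// privacy parameter epsilon.
public final class LaplaceMechanism {
    private static final Random RNG = new Random();

    public static double noisy(double trueResult, double sensitivity, double epsilon) {
        double scale = sensitivity / epsilon;
        double u = RNG.nextDouble() - 0.5;               // uniform in (-0.5, 0.5)
        double noise = -scale * Math.signum(u) * Math.log(1 - 2 * Math.abs(u));
        return trueResult + noise;
    }

    public static void main(String[] args) {
        // A count query with sensitivity 1 (one record changes the count by at most 1).
        System.out.println(noisy(1042, 1.0, 0.1));
    }
}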

Mappers in Airavat can be any piece of Java code. The range of mapper outputs must be declared in advance, before starting a job, so that the sensitivity can be estimated and the amount of noise added to the outputs can be determined to ensure differential privacy. Also, if a mapper produces a value outside the range, it is replaced by a value inside the range and the user is never notified, in order to prevent a possible information leak.

Airavat ensures that mapper invocations are independent - a single input record is only allowed to affect the key/value pairs output for that record. A mapper may not store an input and use it later when processing another input; if that happened, the sensitivity estimate would be wrong, because one input could affect other outputs. The authors modified the JVM to enforce mapper independence: each object is assigned an invocation number, and the JVM prevents objects from a previous invocation from being reused.

Reducers

Airavat trusts the reducers because they compute over mapper output that has already been confined by Airavat, and they enforce differential privacy on the reducer output. Because the reducers are Airavat's own aggregation code, their exact sensitivity is known - for example, it can be taken as the number of distinct keys in the map output - so, unlike on the map side, there is no need to estimate the sensitivity before executing the reduce tasks.

Using differential privacy on the reducer output ensures data privacy when Airavat's labels are removed.
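
A minimal sketch of how such a trusted reducer could work, assuming the ingredients described above (the declared range, ε, and all names here are illustrative, not the real Airavat API): values are clamped to the declared range, and Laplace noise scaled to that range is added to the aggregate.

    import math
    import random

    RANGE_MIN, RANGE_MAX = 0.0, 10.0   # range declared before the job starts (assumed)
    EPSILON = 0.5                      # privacy budget for this query (assumed)

    def laplace_noise(scale):
        u = random.random() - 0.5
        return -scale * math.copysign(1.0, u) * math.log(1.0 - 2.0 * abs(u))

    def clamp(v):
        # A value outside the declared range is silently replaced by one inside it.
        return min(max(v, RANGE_MIN), RANGE_MAX)

    def noisy_sum_reducer(key, values):
        # One input record can shift the sum by at most RANGE_MAX - RANGE_MIN,
        # so that difference is used as the sensitivity of the SUM.
        sensitivity = RANGE_MAX - RANGE_MIN
        total = sum(clamp(v) for v in values)
        return key, total + laplace_noise(sensitivity / EPSILON)

    print(noisy_sum_reducer("ipod", [3, 5, 12, 7]))   # 12 is clamped to 10 before summing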

Airavat + SELinux

Mandatory access control (MAC) is a useful building block for distributed computations. MAC-based operating systems enforce a single access control policy for the entire system. This policy, which cannot be overridden by users, prevents information leakage via storage channels such as files (e.g., files in HDFS), sockets, and program names. Mandatory access control requires that only someone who has access rights to all inputs should have access rights to the output.

SELinux is a Linux kernel security module that provides a mechanism for supporting access control security policies using mandatory access controls (MAC).

Other systems, such as PINQ and other differential-privacy algorithms, can work on a normal operating system. In this paper, Airavat runs over an SELinux distribution to control access rights to the data and prevent data leakage.

Airavat trusts the cloud provider and the cloud-computing infrastructure. It assumes that SELinux correctly implements MAC and relies on those MAC features. A malicious computation provider has full control over the mapper code and could try to leak information through the keys it emits. Therefore, Airavat never outputs keys produced by untrusted mappers; instead, a noisy response is returned.

For example, Airavat can be used to compute the noisy answer to the query "What is the total number of iPods and pens sold today?" because the two keys, iPod and pen, are declared as part of the computation. The query "List all items and their sales" is not allowed in Airavat unless the mapper is trusted, because a malicious mapper could leak information by encoding it in item names. Trusted Airavat reducers always sort keys prior to outputting them, so a malicious mapper cannot use key order as a channel to leak information about a particular input record.

A malicious mapper may also attempt to encode information by emitting a particular combination of values associated with different keys. Trusted reducers use the declared output range of the mappers to add sufficient noise to ensure differential privacy for the outputs. In particular, a combination C of output values across multiple keys does not leak information about any given input record r, because the probability of Airavat producing C is approximately the same with or without r in the input dataset. In other words, two datasets differing in a single input record produce statistically similar results.

Conclusion

In summary, MapReduce was modified to support mandatory access control, an SELinux policy was created (domains for trusted and untrusted programs), and the JVM was modified to enforce mapper independence.

Having to modify the JVM to run Airavat may not seem very wise, since it would be simpler to restart the JVM (perhaps with a garbage-collection pass) before running every map task. The authors preferred the JVM modification because restarting would give a higher overhead, and the added complexity is a reasonable cost if we want high security.

See PINQ

Wednesday, 17 December 2014

Differential privacy

Book here and paper here

Wikipedia explains well what differential privacy is, and it is difficult to add much to that. What I can do is try to explain in my own words what differential privacy is and why it is useful for computer science.

Differential privacy was created to keep public records anonymized while still allowing the data to be queried for useful results. E.g., imagine a table that lists persons and whether they have diabetes, sorted alphabetically, where users are only allowed to run statistical queries - never to query an individual or even get the names.

Name Diabetes
Peter 0
John 1
Rachel 0
Zoe 1

Imagine a hacker who wants to know whether Zoe has diabetes and who knows that Zoe's entry is in the last place. He could run two queries that compute partial sums and derive the answer. Call Q(i) the query that sums the values up to position i: computing Q(4) − Q(3) tells him whether Zoe has diabetes.

Differential privacy protects sensitive data so that it stays anonymous: even if a hacker issued the two queries above to obtain Zoe's value, he would never get a definitive answer. The technique is built on notions such as ε-differential privacy and sensitivity.
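
To make the attack concrete, here is a toy sketch in Python (the table, query names and ε are made up for the example): exact partial sums reveal Zoe's value, while noisy partial sums do not.

    import math
    import random

    table = [("Peter", 0), ("John", 1), ("Rachel", 0), ("Zoe", 1)]

    def Q(i):
        # Exact partial sum over the first i rows: Q(4) - Q(3) reveals Zoe's value.
        return sum(v for _, v in table[:i])

    print(Q(4) - Q(3))  # 1 -> the attacker learns Zoe's entry exactly

    def laplace_noise(scale):
        u = random.random() - 0.5
        return -scale * math.copysign(1.0, u) * math.log(1.0 - 2.0 * abs(u))

    def noisy_Q(i, epsilon=0.5):
        # Each answer is perturbed; the difference of two noisy answers no longer
        # pins down a single individual's value with certainty.
        return Q(i) + laplace_noise(1.0 / epsilon)

    print(noisy_Q(4) - noisy_Q(3))  # close to 1 on average, but individual runs vary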

"'Differential' refers to the difference between two worlds — one in which you allow your sensitive data to be included in the database and one in which you don't," McSherry said. The two worlds cannot be made to work out exactly the same, but they can be made close enough that they are effectively indistinguishable.

ε-differential privacy

ε-differential privacy guarantees that the presence or absence of an individual entry in the database does not significantly alter the distribution of outputs, and thus assures the privacy of individual information in a strong theoretical sense. Basically, the presence or absence of Zoe's entry in the database will not significantly change the result; if it did, the hacker could use the attack above to get Zoe's information. Algorithms designed for differential privacy try to keep the estimated statistics accurate while preserving privacy.
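
Formally (this is the standard textbook definition, not taken verbatim from the post): a randomized mechanism M is ε-differentially private if, for every pair of databases x and y differing in a single entry, and for every set S of possible outputs,

    \Pr[M(x) \in S] \;\le\; e^{\varepsilon} \cdot \Pr[M(y) \in S]

A small ε forces the two probabilities to be close, which is exactly the "two worlds" intuition in the quote above.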

We can consider differential privacy at different levels of granularity. Suppose we have a graph database that encodes a social network: each individual is represented by a vertex in the graph, and friendships between individuals are represented by edges. We could consider differential privacy at a level of granularity corresponding to individuals: that is, we could require that differentially private algorithms be insensitive to the addition or removal of any vertex from the graph. This gives a strong privacy guarantee, but might in fact be stronger than we need: the addition or removal of a single vertex could, after all, add or remove up to n edges in the graph. Depending on what we hope to learn from the graph, insensitivity to n edge removals might be an impossible constraint to meet. We could, on the other hand, consider differential privacy at a level of granularity corresponding to edges, and ask our algorithms to be insensitive only to the addition or removal of single edges, or small numbers of edges, from the graph. This is of course a weaker guarantee, but might still be sufficient for some purposes.

Informally speaking, if we promise ε-differential privacy at the level of a single edge, then no data analyst should be able to conclude anything about the existence of any subset of 1/ε edges in the graph. In some circumstances, large groups of social contacts might not be considered sensitive information: for example, an individual might not feel the need to hide the fact that the majority of his contacts are with individuals in his city or workplace, because where he lives and where he works are public information. On the other hand, there might be a small number of social contacts whose existence is highly sensitive. In this case, edge privacy should be sufficient to protect sensitive information, while still allowing a fuller analysis of the data than vertex privacy. Edge privacy will protect such an individual's sensitive information provided that he has fewer than 1/ε such friends.

A small ε gives a stronger guarantee (at the cost of accuracy): (ε, 0)-differential privacy with a small ε asserts that, for all pairs of adjacent databases x, y and all outputs o, an adversary cannot distinguish which is the true database on the basis of observing o. When ε is large, the definition only says that there exist neighboring databases and an output o for which the ratio of the probabilities of observing o, conditioned on the database being x or y respectively, is large; such an output o might be very unlikely to occur in the real world, and the databases may have to be deliberately contrived. There is a growing literature on improving the accuracy obtainable for a given ε. Note that there are not yet lower bounds on the accuracy for a given ε, because there are many ways to add noise to the output.

Sensitivity

A function's sensitivity measures the maximum change in the function's output when any single item is removed from or added to its dataset. Let's consider two examples to understand how sensitivity works.

Consider a function SUM that takes integers as inputs and returns their sum. If all the inputs are 0 or 1, then the sensitivity of the function is low, because the result varies by at most 1; therefore, only a little noise needs to be added to the sum to achieve privacy. On the other hand, if one input may be 1000 and the rest are 0 or 1, a lot of noise must be added to the output in order to hide whether the 1000 was among the inputs.
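
A back-of-the-envelope sketch of the two SUM cases above (ε is an assumed parameter): the Laplace noise scale is sensitivity / ε, so the second case needs roughly a thousand times more noise.

    EPSILON = 0.5  # assumed privacy parameter

    def noise_scale(max_contribution, epsilon=EPSILON):
        # For SUM, the sensitivity is the largest value any single input can contribute.
        return max_contribution / epsilon

    print(noise_scale(1))     # inputs in {0, 1}      -> scale 2.0
    print(noise_scale(1000))  # one input may be 1000 -> scale 2000.0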

Differential privacy works best for low-sensitivity computations, where the maximum influence any given input can have on the output of the computation is low.

Sensitivity on aggregated data

The Subsample and Aggregate technique yields a method for "forcing" the computation of a function f(x) to be insensitive, even for an arbitrary function f. Thus, proving privacy will be trivial.

Accuracy depends on properties of the function f and the specific data set x. In particular, if f(x) can be accurately estimated with high probability on f(S), where S is a random subset of the elements in x, then accuracy should be good.

In the figure illustrating the technique, the data is partitioned into blocks and f is computed on each part separately. The function f is computed exactly, without noise, independently on each block. The intermediate outcomes f(B1), ..., f(Bm) are then combined via a differentially private aggregation mechanism - typical examples include standard aggregations, such as the α-trimmed mean 1, the Winsorized mean 2 and the median, but there are no restrictions - after which Laplace noise, scaled to the sensitivity of the aggregation function in question, is added.

The key observation in Subsample and Aggregate is that any single element can affect at most one block, and therefore the value of just a single f(Bi). Thus, changing the data of any individual can change at most a single input to the aggregation function. Even if f is arbitrary, the analyst chooses the aggregation function, and so is free to choose one that is insensitive, provided that choice is independent of the database. Privacy is therefore immediate.
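
A rough sketch of Subsample and Aggregate under some assumptions of my own (the number of blocks, ε, the clamping range, and the choice of the median as aggregator are not prescribed by the technique):

    import math
    import random
    import statistics

    def laplace_noise(scale):
        u = random.random() - 0.5
        return -scale * math.copysign(1.0, u) * math.log(1.0 - 2.0 * abs(u))

    def subsample_and_aggregate(data, f, m, epsilon, lower, upper):
        # 1. Split the dataset into m disjoint blocks; any individual lands in one block.
        random.shuffle(data)
        blocks = [data[i::m] for i in range(m)]
        # 2. Compute f exactly (no noise) on each block.
        partial = [f(b) for b in blocks]
        # 3. Aggregate with a low-sensitivity statistic (here the clamped median)
        #    and add Laplace noise scaled to its sensitivity.
        clamped = [min(max(p, lower), upper) for p in partial]
        sensitivity = upper - lower   # coarse bound: one changed block moves the clamped median at most this much
        return statistics.median(clamped) + laplace_noise(sensitivity / epsilon)

    # Example: privately estimate the mean of a dataset.
    data = [random.gauss(10, 2) for _ in range(10_000)]
    print(subsample_and_aggregate(data, lambda b: sum(b) / len(b), m=50,
                                  epsilon=0.5, lower=0.0, upper=20.0))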

Much work in statistics and machine learning addresses the problem of model selection: Given a data set and a discrete collection of “models,” each of which is a family of probability distributions, the goal is to determine the model that best "fits" the data. The function f might be choosing the best model from the given set of m models, a process known as model fitting, via an arbitrary learning algorithm. The choice of aggregation function can even depend on the database, but the selection must be made in a differentially private fashion. The privacy cost is then the cost of composing the choice operation with the aggregation function.

Where differential privacy is used?

Anonymizing public records makes sense when we are dealing with biobank or customer data. Such databases can be terabytes or petabytes in size, or even more. Hospitals or private companies query this sensitive data for new research or market studies, and sometimes the data is spread across different clouds. Differential privacy is an important issue in the era of big data, because there is a need to access this data without disclosing personal information.

A biobank is a type of biorepository that stores biological samples (usually human) for use in research. When a study uses biobanks, private data must not be disclosed, and the institutions that hold these repositories, and even governments, are very sensitive about disclosing it. There are also studies that need to aggregate data from different repositories located in different places and connected through the internet. Differential privacy is really important here because it ensures that the responses do not disclose private information.

What differential privacy does not promise

Differential privacy does not guarantee privacy where none previously existed: if the data being queried is not protected with privacy techniques, there is nothing differential privacy can do about it. It only guarantees that participating in a survey will not, by itself, disclose an individual's data through the responses.


  1. The α-trimmed mean is the mean after the top and bottom α fraction of the inputs have been discarded.

  2. The Winsorized mean is similar to the α-trimmed mean except that, rather than being discarded, the top and bottom α fraction are replaced with the most extreme remaining values.

Monday, 1 December 2014

On the Efficiency of Durable State Machine Replication (SMR)

Paper here

This paper evaluates what happens when durability (logging, checkpoints, and state transfer) is added in a modular way to an SMR system.

In the Paxos algorithm, a set of servers runs a total order multicast protocol, e.g., Paxos agreement, which ensures that all servers execute the same sequence of operations. In this paper, the authors modified the SMR and made it durable (executed operations are saved on disk): even if all machines crash, the client will still see the result of its operations after recovery. This kind of system is important because many papers assume Paxos is implemented in critical components to avoid a single point of failure; we can see this in coordination services like Zookeeper. In some sense, Paxos/SMR is a critical component of modern internet-scale infrastructure.

Durability is important because sometimes we need to shut down the system, and correlated failures1 happen more often than expected. In practice, the application, replication and durability code are mixed together for efficiency, which can make the code very complex and very difficult to get right.

There are already papers about agreement protocols (PBFT, Zyzzyva), but they focus on synchronizing replicas during normal execution and do not consider durability (logging, checkpoints, and state transfer). In this paper, the authors talk about SMR, but the type of fault tolerated is orthogonal to what they show (i.e., it is not really relevant here). They consider a system model of n replicas that tolerates at most f faults (crash faults: n > 2f; arbitrary faults: n > 3f). The important point is that they consider non-synchronous systems - systems that work with quorums - and they require n − f replicas to set a message order and execute the operations.

In terms of the programming model, the SMR runs on the server side and the service components can set and get state. The service keeps a key-value store as a memory table and a disk log as a permanent file. When a read command arrives, you read from the memory table; when a write command arrives, you write to the memory table and append the operation to the disk log.
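
A bare-bones sketch of that programming model (the file name and the API are invented for illustration): reads are served from the memory table; writes update the table and append to the operation log.

    import json
    import os

    class DurableKVStore:
        def __init__(self, log_path="oplog.txt"):       # log file name is made up
            self.memtable = {}                           # in-memory key-value table
            self.log = open(log_path, "a")               # append-only operation log on disk

        def read(self, key):
            # Reads are served from memory only.
            return self.memtable.get(key)

        def write(self, key, value, sync=True):
            # Writes update the memory table and append the operation to the log.
            self.memtable[key] = value
            self.log.write(json.dumps({"op": "put", "key": key, "value": value}) + "\n")
            if sync:                                     # sync=True models stable (synchronous) logging
                self.log.flush()
                os.fsync(self.log.fileno())

    store = DurableKVStore()
    store.write("x", 42)
    print(store.read("x"))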

With stable logging, writing to disk costs time. The values below show the achievable throughput of 4 kB writes per second in each configuration.

Memory 4772 w/sec
Async Disk 4312 w/sec
Sync Disk 63 w/sec
Sync SSD 1017 w/sec

We can see that it is possible to write 4772 times per second into memory, while writing to disk asynchronously gives 4312 operations per second. The problem with writing asynchronously is that, if you are really unlucky, you can lose some operations. Buying SSDs costs a lot and only partially solves the problem. The goal of the paper is to get as close as possible to the throughput of writing to memory.

Checkpoints are needed to control the size of the log, for two reasons:

  1. The log has to be stored, so its size must be kept under control.
  2. The longer the log, the longer it takes to recover from it.

The solution is to take periodic snapshots. In SMR, the usual approach is to take a snapshot whenever the log reaches a certain size.

When a snapshot is taken, the service becomes slow and the system can become unavailable to accept client requests. One solution is to use copy-on-write 2, but this adds complexity to the code. Another solution is to use fuzzy snapshots 3, but it requires additional disks.

When a state transfer is necessary because a replica crashed and recovered, another replica is picked to send the state, and the service may have to stop because a quorum is needed to execute requests. Ways to deal with this are to have additional replicas dedicated to recoveries, or to stop answering client requests during the transfer.

Integrating all these problems and solutions into the code can make it very complex.

The paper shows several solutions that address these problems without changing the SMR code. Three new techniques are proposed: parallel logging, sequential checkpoints, and collaborative state transfer.

Parallel logging

For parallel logging to work, a reply may only be sent after both the state and the log are updated, and the disk bandwidth must be high (it takes approximately the same time to write 1 kB or 1 MB).

The proposal is to have an agreement layer that keeps ordering requests. The requests are batched and, at some point, sent to a thread that appends the values to the memory table and to the disk log; after the thread finishes, the result is replied to the client.
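
A simplified sketch of the batching idea (the thread structure and names are assumptions, not BFT-SMaRt's actual code): ordered requests accumulate in a queue, a logger thread writes the whole batch with a single disk sync, and only then are the replies released to the clients.

    import os
    import queue
    import threading

    log_file = open("batch.log", "ab")
    pending = queue.Queue()          # ordered requests coming from the agreement layer

    def logger_thread():
        while True:
            batch = [pending.get()]                 # block until at least one request
            while not pending.empty():
                batch.append(pending.get_nowait())  # drain whatever else is queued
            log_file.write(b"".join(req for req, _ in batch))
            log_file.flush()
            os.fsync(log_file.fileno())             # one disk sync for the whole batch
            for _, reply in batch:
                reply.set()                         # now it is safe to answer the clients

    threading.Thread(target=logger_thread, daemon=True).start()

    def execute(request_bytes):
        done = threading.Event()
        pending.put((request_bytes, done))
        done.wait()                                 # reply only after the log is stable
        return b"OK"

    print(execute(b"put x 42\n"))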

Sequential checkpoints

Normally, checkpoints happen in a synchronized way - all replicas take the same checkpoint at the same time. The benefit of this solution is that it is very easy to recover a machine with the correct checkpoint; the drawback is that it can make the system unavailable to answer client requests. The authors propose sequential checkpoints - each replica takes its checkpoint at a different time, so that the replicas are not all checkpointing simultaneously - in order to avoid perturbing the system during this period. The drawback of sequential checkpoints is that they complicate state transfer.
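
One way to stagger the checkpoints (a sketch of the idea only; the exact schedule used in the paper may differ) is to offset each replica's checkpoint position in the log by its identifier, so the replicas never all pause at once:

    CHECKPOINT_PERIOD = 1000   # take a checkpoint every 1000 logged operations (assumed)
    N_REPLICAS = 4

    def should_checkpoint(replica_id, log_index):
        # Replica i checkpoints at log positions shifted by i * (period / n),
        # so the replicas never pause for a checkpoint all at the same time.
        offset = replica_id * (CHECKPOINT_PERIOD // N_REPLICAS)
        return log_index > 0 and (log_index - offset) % CHECKPOINT_PERIOD == 0

    # Replica 0 checkpoints at 1000, 2000, 3000; replica 1 at 250, 1250, 2250; etc.
    print([i for i in range(1, 3001) if should_checkpoint(1, i)])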

Collaborative State Transfer

The idea is to ask several replicas to send parts of the current state: one replica sends its checkpoint, and the remaining ones send different parts of the log. The optimized solution is shown in figure (b). In the general CST, just one replica sends the state to the recovering host.
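
A high-level sketch of the recovery flow (the Replica interface and the even split of the log are assumptions made for this example): the recovering replica fetches the checkpoint from one replica and disjoint slices of the log from the others, then replays the log on top of the checkpoint.

    class Replica:
        # Minimal stand-in for a remote replica (assumed interface, not BFT-SMaRt's API).
        def __init__(self, state, log):
            self.state, self.log = state, log
        def fetch_checkpoint(self):
            return dict(self.state)
        def fetch_log(self, start, end):
            return self.log[start:end]

    def collaborative_state_transfer(replicas, log_length):
        # One replica ships its latest checkpoint ...
        checkpoint = replicas[0].fetch_checkpoint()
        # ... and the remaining replicas each ship a disjoint slice of the log,
        # so no single replica has to transfer the whole state.
        others = replicas[1:]
        slice_size = log_length // len(others)
        log = []
        for i, r in enumerate(others):
            start = i * slice_size
            end = log_length if i == len(others) - 1 else start + slice_size
            log.extend(r.fetch_log(start, end))
        # The recovering replica installs the checkpoint and replays the log on top of it.
        state = checkpoint
        for key, value in log:
            state[key] = value
        return state

    replicas = [Replica({"x": 1}, [("y", 2), ("z", 3), ("x", 4), ("w", 5)]) for _ in range(3)]
    print(collaborative_state_transfer(replicas, log_length=4))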

Evaluation

They implemented these features in a durability layer on top of the BFT-SMaRt4 replication library.

Figure 9 shows the throughput achieved with parallel logging. Since they are stressing the system to its maximum, we can see that with parallel logging the results are similar to a pure memory system (red line in Figure 9 (a)). Also, for large writes, parallel logging on disk can be slightly better than on SSD, because the disks absorb more data and can reach pure-memory throughput; SSDs, however, win in terms of latency.

Figure 10 shows the throughput during sequential checkpointing. The graphs contain two executions, one under high load and another under medium load, for a 500 MB or 1 GB state. Focusing on the high load in Figure 10 (b), we can see two perturbations in the logging, related to lagging-replica synchronization: some replicas are ahead of others, so the system slows down until the lagging replicas pick up the pace and participate in the agreement again. Under medium load this problem never happens, because there is always time for the replicas to keep up with the system.

Figure 11 shows what happens when a replica is recovering. In this case, they consider f = 1 (n = 4) with state validation.

With collaborative state transfer (CST), they spread the workload among the three replicas. We can still see perturbations, but the result is better than with a single state transfer. CST is represented by the blue line, and there is a perturbation between 90s and 120s (Figure 10 (b)). The CST perturbation is one third of the single state transfer's, which happens because, in this execution, the parts of the log being fetched have roughly the same size as the checkpoint.

With the usual state transfer protocol (asking the state from a single replica), they get 24% of the normal throughput during the state transfer, versus 60% with CST. Summing up, it is much better to use CST than a single-replica state transfer, but the protocol is more complex.

In conclusion, durable SMR is a fundamental technique for critical services, and it is complex and difficult to do right. In this paper, the authors showed how to deal with the durability overhead without breaking the modularity of the SMR model, and presented three techniques: parallel logging, sequential checkpoints, and collaborative state transfer.


  1. http://psteitz.blogspot.pt/2011/10/correlated-failure-in-distributed.html

  2. Copy-on-write is the name given to the policy that whenever a task attempts to make a change to the shared information, it should first create a separate - private - copy of that information to prevent its changes from becoming visible to all the other tasks.

  3. Zookeeper takes snapshots at random times while the system is processing. This works in Zookeeper because it does not synchronize checkpoints with logging and it uses idempotent operations: even when you are recovering a replica and an operation appears both in the log and in the checkpoint, you can execute the operation as many times as you want and the final state will always be the same.

  4. https://code.google.com/p/bft-smart/