Sunday 28 September 2014

What good are models for (1993)

This paper explains why distributed systems are hard to design and understand. The author starts by showing that, in the past, similar scientific problems could always be tackled by two approaches:
  • Experimental observation
  • Modelling analysis
These approaches suggest a dichotomy between "theory" and "practice", but the best results come when the two go together and each learns from the other. When devising a distributed system, it is necessary to create tractable models, governed by clear rules of interaction, that will support the analysis. Mathematical and logical formulas (theory) and computer simulations (practice) are ways to create models, and these models help to devise a solution. We must keep in mind that it is easy to create accurate models, but hard to create tractable ones. When building a model we must identify 2 things:
  • feasibility: being able to recognize an unsolvable problem lurking beneath a system's requirements;
  • cost: identifying the cost implications of the requirements.
Answering these 2 points gives us a yardstick with which we can evaluate any solution that is devised. In the end, having defined a model, we have refined the intuition we had when the problem was introduced.
There are many problems that we can face when designing a distributed system. We must take care of coordination problems, synchronous or asynchronous assumptions, election protocols, and failure models. The author details the importance of models for each problem.
  • For coordination problems, it is impossible to create a protocol that coordinates the behaviour of 2 processes so that both execute the same action when communication can fail. This impossibility could not be detected by intuition alone.
  • For synchronous or asynchronous assumptions, it is necessary to check whether there are bounds on process execution speed and on message delivery delays. For asynchronous systems there are no such bounds (or the bound is trivial, e.g. a message is processed immediately after being delivered). For synchronous systems, it is necessary to coordinate messages and process execution with schedulers and to take care of performance degradation. In practice, care is needed when implementing processes and communication channels.
  • Solving the election problem in an asynchronous system is not difficult, but it is expensive. In a synchronous system, it is possible to solve it with only a single broadcast.
  • There are several failure models that assign responsibility for faulty behaviour to the system's components: processors and communication channels. A process can be slow or crash, a message can be delayed, and this is difficult to detect. In an asynchronous system it is harder to deal with crash failures than with fail-stop failures; in a synchronous system the crash and fail-stop models are equivalent.
For practitioners, models help to define the set of assumptions they must respect when devising a solution. For theoreticians, models allow them to work on accurate yet tractable formulations of the problems.

Tuesday 23 September 2014

Summary of several papers

This post is composed of a set of summaries of papers that I have read. It can be useful to whoever reads these papers.

Feedback Dynamic Algorithms for Preemptable Job Scheduling in Cloud Systems

2010LiFeedbackDynamicAlgorithms_WIC.pdf
we present a preemptable job scheduling mechanism in IaaS cloud systems. In addition we propose two feedback dynamic scheduling algorithms for this scheduling mechanism.
In our proposed cloud scheduling mechanism, every provider has scheduler software running in its data center. These schedulers know the current statuses of the VMs in their own clouds and communicate with each other, so they can make scheduling decisions.
When a job is submitted to a cloud, the scheduler first partitions the job into several tasks. Then for each task in this job, the scheduler decides a cloud used to execute this task based on the information from all other schedulers.
If the scheduler assigns a task to its own cloud, it will store the task in a queue. When the resources and the data are ready, this task’s execution begins. If the scheduler of cloud A assigns a task to cloud B, B's scheduler first checks whether its resource availabilities can meet the requirement of this task. If so, the task will enter a queue waiting for execution. Otherwise, the scheduler of B will reject the task.
Before a task in the queue of a scheduler is executed, the scheduler transfers a disk image to all the computing nodes which provide enough VMs for task execution. When a VM finishes its part of the task, the disk image is discarded.
The resource allocation model
  • Advance Reservation (AR): Resources are reserved in advance. They should be available at a specific time;
  • Best-effort: Resources are provisioned as soon as possible. Requests are placed in a queue.
  • Immediate: When a client submits a request, either the resources are provisioned immediately, or the request is rejected, based on the resource availabilities.
Dynamic list scheduling:

  while (P not empty) {
    T = top(P);
    if (predecessors of T are done) {
      send task checking request to all schedulers
      receive the earliest resource available response from all schedulers
      find the cloud C with earliest estimated finish time
      assign T to cloud C
      remove T from P
    } else {
      wait for the finishes of the predecessor tasks of T
    }
  }
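A minimal Python sketch of this loop (illustrative only: the Task/Cloud classes and the toy finish-time estimate stand in for the message exchange between schedulers described above):

  class Task:
      def __init__(self, name, predecessors=()):
          self.name, self.predecessors, self.done = name, list(predecessors), False

  class Cloud:
      def __init__(self, name, speed):
          self.name, self.speed, self.assigned = name, speed, []
      def estimated_finish_time(self, task):
          return (len(self.assigned) + 1) / self.speed   # toy estimate, not the paper's
      def assign(self, task):
          self.assigned.append(task)

  def dynamic_list_schedule(pending, clouds):
      while pending:                                     # while(P not empty)
          task = pending[0]                              # T = top(P)
          if all(p.done for p in task.predecessors):
              best = min(clouds, key=lambda c: c.estimated_finish_time(task))
              best.assign(task)                          # assign T to cloud C
              task.done = True                           # toy: pretend it runs right away
              pending.pop(0)                             # remove T from P
          else:
              pending.append(pending.pop(0))             # revisit later instead of blocking

  a = Task("a"); b = Task("b", predecessors=[a])
  dynamic_list_schedule([b, a], [Cloud("A", 1.0), Cloud("B", 2.0)])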
Dynamic min-min scheduling
Min-min is another popular greedy algorithm. The original min-min algorithm does not consider the dependencies among tasks. Thus in the dynamic min-min algorithm used in this paper, we need to update the mappable task set in every step to maintain the task dependencies.
  while (there are tasks not assigned) {
    update mappable task set P    // maintain task dependencies
    for task v do
      send task v to other schedulers to find the cloud with earliest finish time for that task
      save the data received in a (task, cloud) pair
    end for
    find the task-cloud pair with earliest finish time
    assign that task to that cloud
    remove the task from P
    update mappable task set P
  }
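And a rough Python sketch of the min-min step, where finish_time(task, cloud) stands in for the replies collected from the other schedulers (all names are illustrative):

  def dynamic_min_min(tasks, clouds, finish_time, predecessors):
      done = set()
      while len(done) < len(tasks):
          # update mappable task set P, keeping task dependencies
          mappable = [t for t in tasks
                      if t not in done and all(p in done for p in predecessors[t])]
          # pick the (task, cloud) pair with the earliest estimated finish time
          task, cloud = min(((t, c) for t in mappable for c in clouds),
                            key=lambda pair: finish_time(*pair))
          print("assign", task, "->", cloud)
          done.add(task)

  # toy usage: t2 depends on t1, and cloud "B" is always the faster one
  dynamic_min_min(
      tasks=["t1", "t2"], clouds=["A", "B"],
      finish_time=lambda t, c: {"A": 2.0, "B": 1.0}[c],
      predecessors={"t1": [], "t2": ["t1"]})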
In each cloud
A scheduler uses a slot table to record execution schedule of all VMs in its cloud. When an AR task is assigned to a cloud, this scheduler will first check the resource availability in the cloud. If there are enough resources for this AR task, a set of required VMs are selected arbitrarily.

#

Heuristic Algorithms for Scheduling Independent Tasks on Nonidentical Processors

(I did not really understand this paper, but then again it is from 1975) ibarra.pdf
The paper is concerned with the problem of finding a schedule whose finishing time is as small as possible (such a schedule will be called optimal).

#

Xen and the Art of Cluster Scheduling

2006FallenbackXenArtofClusterScheduling_SuperComputing.pdf
Create dynamic virtual cluster partitions using para-virtualization techniques, to deal with the conflicts between parallel and serial jobs.
The system dynamically adjusts to different types of job loads and offers easy and transparent use and configuration to both users and administrators. Xen Grid Engine (XGE) is their solution to satisfy the stated requirements of shared-use cluster scheduling for parallel and serial jobs using virtualization technology.
The aim of XGE is to combine the advantages of cluster partitioning (simple but inefficient) and reservation with backfilling (complex customization, somewhat inflexible but more efficient).
Algorithms such as EASY backfilling, where pending short jobs are allowed to move ahead, provided they do not delay the first job in the queue [7], or more conservative versions of backfilling, in which jobs move forward provided they do not delay any previously scheduled job [8], are used to fill idle time created when parallel and serial jobs are mixed.
XGE uses the Xen hypervisor to separate every compute node of the cluster into a host platform and two virtual platforms.
We introduced a novel approach for cluster scheduling using operating system virtualization techniques. We presented a solution to enable the execution of short running parallel jobs within a cluster filled with serial jobs. It ensures fair use and enables maximum system performance by avoiding the idle times caused by reservation or static cluster partitioning.
Problems to solve:
1) Users should be entitled to speedy job execution within their quotas.
2) Unused CPU time of a user may be consumed freely by other users when needed.
3) To maximize overall cluster performance, serial jobs should run whenever possible.
4) Parallel jobs should have waiting times as short as possible.
5) To minimize response time, parallel jobs should get as many CPUs as needed (definitively more than 32) without increasing the waiting time or reducing the overall cluster performance.
6) Any modification of the scheduling strategy should be easy to use and transparent for administrators and users to avoid arguments.
A host can be running a serial domU and a parallel domU. When tasks are distributed, the topology of the cluster is taken into consideration. The administrator can partition the cluster to better suit the workload (this solves Problems 1 and 4 in the same way traditional partitioning of a cluster would, but reduces the overall performance of the cluster, since idle nodes in parallel mode will not be used by waiting serial jobs). Requirement 2 is only partially met, since only unused resources within the same group can be shared. Requirements 3 and 5 are not met outside of the quota-based partitioning.
If the nodes in a subset are overloaded and jobs must be queued, the idle nodes of the other subset can provide their computing power to that queue. The XGE automatically transfers nodes from the parallel virtual cluster into the serial virtual cluster if there are idle nodes available. The XGE does not transfer idle serial nodes (which are very rare in any case) to the parallel cluster. (This extension meets requirements 2 and 3 for serial jobs).
The remaining problem that serial jobs can acquire idle parallel resources but parallel jobs can not acquire free serial resources stems from two issues : first, there are hardly any free serial resources and second, a node on which a parallel job is working cannot be reclaimed by the serial cluster, since parallel jobs cannot simply be hibernated by Xen because many parallel job managers do not recover from partial node failure and messages in transit would be lost.

#

Cloud federation and intercloud

cloudfederation.pdf
Interclouds refers to a mesh of clouds that are interconnected based on open standards. With this vision, all clouds will have a common understanding of how applications should be deployed. A federation of clouds is a mesh of clouds interconnected through the vendors' own interfaces and agreements rather than open standards.

#

Using inaccurate estimates accurately

2010TsafrirUsingInaccurateEstimates_JSSPP.pdf
Backfilling drastically improves the system utilization by allowing jobs to run ahead of their time, provided they do not delay higher-priority jobs. But in order to do so, backfill systems require users to estimate how long their jobs would run.
To evaluate the impact of inaccuracy, researchers associated each job with an artificial estimate that is a multiple of the actual runtime r by some "badness" factor F, such that e = r * F.
We thus proclaim that the popular F-model is inappropriate for studies that wish to learn the effect of inaccurate user estimates. Researchers should stop using multiples of actual runtime as estimates, or else they will likely get invalid results. To get trustworthy results, researchers should preserve the modal nature of user estimates.
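A small illustration of the difference, with made-up runtimes: the F-model just scales every runtime, while real users tend to round up to a handful of "popular" values, which is the modal behaviour the authors ask researchers to preserve:

  runtimes = [37, 310, 2900, 7300]          # made-up job runtimes, in seconds

  # F-model: estimate is a multiple of the real runtime, e = r * F
  F = 3
  f_model = [r * F for r in runtimes]

  # modal caricature: users round up to a few popular canned values
  popular = [300, 900, 3600, 14400, 64800]  # 5 min, 15 min, 1 h, 4 h, 18 h
  modal = [min(p for p in popular if p >= r) for r in runtimes]

  print(f_model)   # [111, 930, 8700, 21900] -> estimates keep the runtimes' shape
  print(modal)     # [300, 900, 3600, 14400] -> estimates collapse onto a few modes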

#

Characterizing Cloud Federation for Enhancing Providers' Profit

2010-Goiri-CharacterizingCloudFederationforEnhancing_Providers-IEEE.pdf
Cloud federation has been proposed as a new paradigm that allows providers to avoid the limitation of owning only a restricted amount of resources, which forces them to reject new customers when they have not enough local resources to fulfill their customers’ requirements.
We present an analytical model that characterizes Cloud federation and can be used to drive provider’s decisions about resource outsourcing, insourcing, and node shut-down.
Provider requires a minimum utilization and a minimum price per VM in order to be profitable when all the nodes are operative. In addition, local resources are preferred over outsourced resources, though the latter can enhance the provider's profit when the workload cannot be supported locally.
Based on these results, we plan to develop a scheduler for a real Cloud system that uses the presented characterization in order to take resource management decisions.
We analyse the impact of federation as a mechanism for maximizing Cloud providers’ revenue in a scenario that federates Private and Public Clouds.
There's a global Scheduler on each provider that is responsible for deciding the placement and the allocated resources for all the VMs running in that provider. The Scheduler can shut down nodes that remain unused in order to reduce power consumption in the provider.
They also analyse the revenues, comparing them with the costs of each option.
Having the option to offer unused resources to other providers, or to shut them down to reduce power consumption, the provider may be unsure which is the more profitable decision; the scheduler takes these decisions in order to obtain a better profit.
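The paper's analytical model is not reproduced in these notes, but the decision it drives can be caricatured as picking the most profitable option for spare capacity; everything below (names and numbers) is invented for illustration:

  # Toy caricature of the federation decision for a node's spare capacity:
  # insource VMs from other providers, shut the node down, or outsource load.
  def best_action(revenue_per_vm, insourcing_demand, power_cost_per_node,
                  outsourcing_price_per_vm, local_demand_excess):
      profit = {
          "insource": insourcing_demand * revenue_per_vm - power_cost_per_node,
          "shutdown": 0.0,   # no revenue, but no power cost either
          "outsource": local_demand_excess * (revenue_per_vm - outsourcing_price_per_vm),
      }
      return max(profit, key=profit.get), profit

  print(best_action(revenue_per_vm=0.10, insourcing_demand=3,
                    power_cost_per_node=0.25, outsourcing_price_per_vm=0.08,
                    local_demand_excess=0))
  # -> insourcing beats shutting the node down with these made-up numbers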
I think it misses a calculation of power consumption based on the machines' characteristics and CUE (not just PUE) and on the hours of actual use, since they only consider the cost of keeping all the provider's nodes up during a period of time.

#

Dynamic job Scheduling in Cloud Computing based on horizontal load

2011-Paul-DynamicJobSchedulinginCloudComputing-InternationJournal_Computing.pdf
Not well-written article
A scheduling mechanism which follows the Lexi-search approach to find an optimal feasible assignment.
The proposed method treats the scheduling problem as the assignment problem from mathematics, where a cost matrix gives the cost of assigning a task to a resource. Here the cost is a credit or probabilistic measurement, so not only the processing time of a job is given importance; other issues are considered, such as the probability of a resource becoming free soon after executing a task so that it is available for another waiting job. The job with the highest probability of getting a resource, and the resource that fits a job best, are matched so that one resource gets one job at a time. The load balancing mechanism in the central middleware reduces the overhead of scheduling on a single middleware by partitioning the job queue, so scalability is maintained; replicating the partitioned job queue ensures fault tolerance in the cloud, since if any client fails its jobs can be reassigned to another client by another local middleware, as the local middlewares interact with each other on every job update.
First, the requests or jobs coming from the user side are stored in a central job pool, partitioned, and replicated into the local middlewares, which ensures fault tolerance through the interaction among the nodes. The aim is to find the assignment of resources to jobs for which the completion time of all jobs is minimal.
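The Lexi-search procedure itself is not reproduced here, but the underlying assignment-problem view can be sketched with a cost matrix; as an illustration (not the paper's method), SciPy's Hungarian solver finds the minimum-cost one-job-per-resource assignment:

  import numpy as np
  from scipy.optimize import linear_sum_assignment

  # cost[i][j]: cost (e.g. a combined time/availability credit) of giving job i
  # to resource j -- the values are made up for illustration.
  cost = np.array([[4.0, 2.0, 8.0],
                   [4.0, 3.0, 7.0],
                   [3.0, 1.0, 6.0]])

  jobs, resources = linear_sum_assignment(cost)       # one resource per job
  print(list(zip(jobs, resources)), cost[jobs, resources].sum())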

#

An Integrated Approach to Parallel Scheduling Using Gang-Scheduling, Backfilling and Migration

The paper presents several techniques to enhance job scheduling for large parallel systems, starting with an analysis of two commonly used strategies: backfilling and gang-scheduling.
It shows how the two can be combined into a backfilling gang-scheduling (BGS) strategy that is always superior to its two components when the context switch overhead is kept low. With BGS, there is a monotonic improvement in job slowdown, job wait time, and maximum system utilization.
We have demonstrated that both plain gang-scheduling and backfilling gang-scheduling benefit from migration.

#

The Dynamics of Backfilling: Solving the Mystery of Why Increased Inaccuracy May Help

2006-Tsafrir-TheDynamicsof_Backfilling-IEEE.pdf
This paper shows that, when the estimates aren't accurate, smaller jobs tend to delay the execution of bigger ones. Much of this research used the "f-model", in which estimates are assumed to be some multiple of the real runtime, which led to some surprising conclusions, such as the claim that inaccurate estimates result in improved performance.
With a larger f (or when multiplying [real] estimates by two), jobs with long runtimes can have very large runtime overestimations, which leave larger 'holes' for backfilling shorter jobs. Overestimation impacts both the jobs that are running and the jobs that are waiting.
A bigger f implies more jobs that enjoy backfilling; on average, these jobs are longer and wider.
While a bigger f means more backfilling (which short jobs enjoy more than long ones), the bigger holes do in fact allow longer jobs to backfill. Under the random model, the bigger the f, the more probable it is that the scheduler erroneously views short jobs as long and vice versa.

#

What Good are models?

There are 2 ways to develop an intuition about a new domain:
Experimental observation: we build things based on observation; even if we do not understand them completely, we try to imitate. Modeling and analysis: we formulate a model by simplifying the object of study and postulating a set of rules; we then analyze the model and infer consequences.
This maps to the theory versus practice debate. Practitioners complain that they learn little from theory, that it generalizes from experience and concentrates on the wrong attributes; theoreticians simplify too much when defining a model. But the reality is that without observation we cannot start the models, and without theory we cannot master the complexity of a model.
We ask 2 questions when building models: feasibility (what class of problems can be solved?) and cost (how expensive is the solution?).
To devise a model we need to: 1 - recognize an unsolvable problem; 2 - know the cost implications; 3 - know the gain of solving the problem.
Synchronous vs asynchronous systems: every system is asynchronous by default, if we don't make assumptions about speeds and message delivery delays. If we have bounds on those, it's a synchronous system.

Implementing Fault-Tolerant Services Using Fused State Machines

dcs10-garg.pdf
This paper describes a method to implement fault-tolerant services in distributed systems based on the idea of fused state machines. The theory of fused state machines uses a combination of coding theory and replication to ensure efficiency. Assuming k different state machines, pure replication based schemes require k(f+1) replicas to tolerate f crash faults in a system and k(2f+1) replicas to tolerate f Byzantine faults. For crash faults, we give an algorithm that requires the optimal f backup state machines for tolerating f faults in the system of k machines. For Byzantine faults, we propose an algorithm that requires only kf+f additional state machines, as opposed to 2kf state machines.
If we send every symbol f+1 times, f erasures (detectable corruptions of symbols) can be tolerated.
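A quick back-of-the-envelope comparison of the machine counts quoted above, for k primary state machines and f faults:

  k, f = 5, 2                          # 5 different state machines, 2 faults to tolerate
  # crash faults
  replication_total = k * (f + 1)      # 15 machines in total with pure replication
  fused_total       = k + f            #  7 machines in total with f fused backups (optimal)
  # Byzantine faults (additional machines only)
  replication_extra = 2 * k * f        # 20 extra machines with replication
  fused_extra       = k * f + f        # 12 extra machines with fusion
  print(replication_total, fused_total, replication_extra, fused_extra)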

CIEL - a universal execution engine for distributed data-flow computing

2011-Murray-CIELauniversalexecutionenginefordistributeddata-flowcomputing-NSDI.pdf
This work presents CIEL, a universal execution engine for distributed data-flow programs. CIEL allows computing iterative and recursive algorithms, and it builds a DAG dynamically.
Hadoop MR jobs are chained together and the latency is multiplied. Mahout is used for parallel iterative computations with large inputs; for each iteration, it submits a job and then checks the results for convergence. Mahout does not benefit from transparent fault-tolerance.
This paper talks about other tools:
Dryad allows the data flow to follow a DAG, but it must be fully specified (it is not dynamic) before starting the job. Data-dependent control flow is not supported, i.e., taking a direction in the flow depending on the output.
Pregel is a Bulk Synchronous Parallel (BSP) system designed for executing graph algorithms (such as PageRank). For each vertex in the graph it is computed a function.
CGL-MapReduce is a new implementation of MapReduce that caches static data in RAM across several MapReduce jobs.
Haloop extends Hadoop with the ability to evaluate convergence function on reduce outputs.
Piccolo is a new programming model for data-parallel programming that uses partitioned in-memory key-value tables to replace the reduce phase.
CIEL is the first framework to support dynamic control flow, task dependencies, fault tolerance, data locality and transparent scaling. CIEL executes tasks that belong to a job. Its model has 3 primitives: objects (kept in an object table, saved so they can be consumed later), tasks (non-blocking atomic computations that each execute on a single machine), and references (names that point to objects). A task has dependencies.
Dynamic task graphs save relations between tasks and objects. object -> task means that the task depends on the object. task -> object means that the task is expected to output the object. As the job runs, new tasks are added to the dynamic task graph, and the edges are rewritten.
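A toy rendering of such a dynamic task graph using plain dictionaries (just the bookkeeping idea, not CIEL's actual data structures):

  # object -> task edges (a task depends on its input objects) and
  # task -> object edges (a task is expected to output these objects).
  depends_on = {"task_sum": ["obj_a", "obj_b"]}
  produces   = {"task_sum": ["obj_total"]}
  available  = {"obj_a": 1, "obj_b": 2}          # objects produced so far

  def runnable(task):
      return all(obj in available for obj in depends_on[task])

  if runnable("task_sum"):
      # running the task publishes its output; it could also spawn new tasks,
      # which would add nodes and edges to the graph while the job runs
      available[produces["task_sum"][0]] = sum(available[o] for o in depends_on["task_sum"])

  print(available["obj_total"])                  # 3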
CIEL is composed of a master, which maintains the current state of the dynamic task graph in the object table and task table, and workers, which execute tasks and store objects. The master's scheduler is responsible for making progress in a CIEL computation: it dispatches tasks to the worker nearest the data. If a worker needs to fetch a remote object, it reads it directly from another worker. When a task finishes successfully, it replies to the master with the set of references that it wishes to publish and a list of tasks that it wishes to spawn. CIEL uses a single master.
Skywriting is a language for expressing task-level parallelism that runs on top of CIEL. Skywriting can express arbitrary data-dependent control flow: it can decide what to do next according to the output objects.

DryadLINQ: A System for General-Purpose Distributed Data-Parallel Computing Using a High-Level Language

2008-Yu-DryadLINQASystemforGeneral-PurposeDistributedData-Parallel-osdi.pdf
Dryad is investigating programming models for writing parallel and distributed programs to scale from a small cluster to a large data-center. Dryad is an infrastructure which allows a programmer to use the resources of a computer cluster or a data center for running data-parallel programs. A Dryad programmer can use thousands of machines, each of them with multiple processors or cores, without knowing anything about concurrent programming.
A Dryad programmer writes several sequential programs and connects them using one-way channels. The computation is structured as a directed graph: programs are graph vertices, while the channels are graph edges. A Dryad job is a graph generator which can synthesize any directed acyclic graph. These graphs can even change during execution, in response to important events in the computation.

Dryad: Distributed Data-Parallel Programs from Sequential Building Blocks

2007-isard-DryadDistributedData-Parallel_Programs-eurosys.pdf
DryadLINQ is a system and a set of language extensions that enable a new programming model for large scale distributed computing. A Dryad application combines computational "vertices" with communication "channels" to form a dataflow graph. Dryad runs the application by executing the vertices of this graph on a set of available computers, communicating as appropriate through files, TCP pipes, and shared-memory FIFOs. Each channel transports data in some way (TCP, shared memory, files), and the nodes are the programs. When 2 nodes run at the same time, we have parallelism.
A Dryad job is coordinated by a process called the "job manager" (JM). The job manager is responsible for control decisions and is not a bottleneck for any data transfers. The JM consults the name server (NS) to discover a list of available computers. It maintains the job graph and schedules running vertices. The daemons (D) run processes on behalf of the JM, and the JM communicates with those processes via the daemons. A simple task scheduler is used to queue batch jobs.
[Diagram: the job manager (JM) talks to the name server (NS) and to the daemons (D) running on the compute nodes; a job scheduler queues batch jobs.]
Related to failures: if the process crashes the daemon notifies the job manager; and if the daemon fails for any reason the job manager receives a heartbeat timeout.
Each vertex belongs to a stage, and each stage has a manager object that receives a callback on every state transition of a vertex execution and on a regular timer interrupt. The stage manager implements behaviours such as detecting vertices that are running slower and scheduling duplicate executions.
Tests were made on around 1800 computers.
A Dryad application may specify an arbitrary communication DAG rather than requiring a sequence of map/distribute/sort/reduce operations. Graph vertices may consume multiple inputs, and generate multiple outputs, of different types.

MadLINQ: Large-Scale Distributed Matrix Computation for the Cloud

2012-Qian-MadLINQLarge-ScaleDistributedMatrixcomputation-eurosys.pdf
The MadLINQ project addresses the following two important research problems: the need for a highly scalable, efficient and fault-tolerant matrix computation system that is also easy to program. MadLINQ exposes a unified programming model to both matrix algorithm and application developers. Matrix algorithms are expressed as sequential programs operating on tiles (i.e., sub-matrices). For application developers, MadLINQ provides a distributed matrix computation library for .NET languages.
A vertex is considered to be ready when its parents have produced all the data. We call such execution staged.
In a pipelined execution, a vertex is ready when each input channel has partial results, and it can start executing while consuming additional inputs
Pipelining requires each vertex to consume and produce data at a finer granularity, which we call a block. As a trivial example, suppose we are adding two matrices A and B, each divided into a 4x4 grid, for a total of 16 tiles. Each tile is recursively divided into 16 blocks; then each of the 16 addition vertices can stream in the blocks of its corresponding A and B tiles and similarly output C blocks, all in a pipelined fashion.
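A rough sketch of that example at the vertex level: one addition vertex streams through its tile block by block instead of staging the whole tile first (plain Python lists, block sizes are toy values):

  # Pipelined tile addition: the vertex consumes A and B blocks as they arrive
  # and emits the corresponding C block immediately.
  def add_tile_pipelined(a_blocks, b_blocks):
      for a_blk, b_blk in zip(a_blocks, b_blocks):        # blocks stream in
          yield [x + y for x, y in zip(a_blk, b_blk)]     # C block goes out at once

  a_tile = ([1, 2], [3, 4])          # one tile, split into 2 blocks
  b_tile = ([10, 20], [30, 40])
  for c_block in add_tile_pipelined(a_tile, b_tile):
      print(c_block)                 # [11, 22] then [33, 44], without waiting for the full tile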

MapReduce is good enough?

2012LinMRGoodEnough_Arxiv.pdf
The goal of this paper is to show that, if MR is not amenable to a particular class of algorithms, there exists a replacement algorithm that is amenable to MR and will do the job.
Iterative algorithms are not well supported by MR, so the paper advises avoiding them.
PageRank algorithm: for every vertex v, the likelihood that a random walk will arrive at that vertex is calculated. In the MR implementation, the graph is serialized as adjacency lists for each vertex. Mappers process all vertices in parallel, and for each vertex an intermediate key-value pair is emitted; the reducers sum up the partial PageRank contributions. Each iteration of PageRank corresponds to one MR job. The shortcomings of this approach are: MR has high start-up costs; the algorithm is prone to stragglers; and at each iteration the graph is shuffled and the PageRank vector is serialized to HDFS. Applications like HaLoop address these shortcomings, but they aren't Hadoop. There are also tweaks like the Schimmy pattern, which avoids shuffling the graph structure by partitioning it consistently and performing a parallel merge join. Also, PageRank values do not have to start from scratch (a uniform distribution), and, finally, the existence of streaming algorithms suggests that non-iterative solutions exist. A streaming algorithm means that, instead of waiting until all the values are gathered, the next iteration is influenced by small portions of values, or 1 value at a time.
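A stripped-down sketch of one such iteration outside Hadoop (no damping factor or dangling-node handling; the vertex structure is passed along by the mapper, as the adjacency lists must survive the shuffle in the real job):

  from collections import defaultdict

  graph = {"a": ["b", "c"], "b": ["c"], "c": ["a"]}       # adjacency lists
  rank = {v: 1.0 / len(graph) for v in graph}

  def mapper(vertex, neighbours):
      yield vertex, ("graph", neighbours)                 # pass the structure along
      for n in neighbours:                                # spread rank contributions
          yield n, ("rank", rank[vertex] / len(neighbours))

  def reducer(vertex, values):
      return vertex, sum(v for kind, v in values if kind == "rank")

  shuffled = defaultdict(list)                            # stand-in for the shuffle
  for v, adj in graph.items():
      for key, value in mapper(v, adj):
          shuffled[key].append(value)
  rank = dict(reducer(v, vals) for v, vals in shuffled.items())
  print(rank)                                             # one PageRank iteration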
The big question is whether it makes sense to spend time creating a customized solution using HaLoop, Twister, PrIter, etc., or to just use Hadoop.
This paper argues that it is preferable to use Hadoop, instead of creating interfaces for the input and output data and converting them for different applications. Hadoop is also open-source, which benefits the application, because users can tweak it, the company benefits from new features, and students gain valuable experience.
Hadoop has drawbacks: it is a young technology and it works in batch mode, which is a limitation as the frequency of job submission increases. Jobs take time to start up and to process big chunks of data. There are solutions, such as using streaming engines together with Hadoop.
Also, it is a burden for scientists to test the MapReduce programs they create using, for example, Pig Latin. If a program fails, they have to find the error, correct it, and submit it again.

MapReduce is Good Enough? If All You Have is a Hammer, Throw Away Everything That’s Not a Nail!

2012-Lin-MapReduceisGood_Enough-Arxiv.pdf
This paper defends that, if there is an algorithm that is not suitable for MR, there exists a replacement algorithm that does the same as the previous one and works fine in MR.
For the author, MR is good enough, and instead of creating tweaked solutions for specific cases, we should be more generic and adapt the problems to the current MR framework. He defends this view by showing solutions for an iterative algorithm (PageRank), gradient descent, and expectation maximization (Markov models).
Despite Hadoop having limitations, like working in batch mode and the time it takes to create error-free MR user functions, it is better to use this proven general-purpose tool. If someone wants streaming, it is better to integrate MR with a stream processing engine.

Only Aggressive Elephants are Fast Elephants

2012-Dittrich-OnlyAggressiveElephantsareFastElephants-arxiv.pdf
This paper presents HAIL, which improves the upload pipeline of HDFS to create a different clustered index on each replica. It proposes that each replica of a file stored in the DataNodes be indexed with a different key to improve vertical partitioning, so HDFS stores each copy in a different vertical layout.
HAIL, an enhancement of HDFS and MR, keeps the existing physical replicas of an HDFS block in different sort orders and with different clustered indexes. When files are saved in HDFS (uploaded), the indexes are updated. This solution improves the upload and query time. Blocks do not appear corrupted to HDFS, because HAIL only changes the physical representation of each replica; the same logical data can be recovered from any replica.

Distributed Apriori in Hadoop MR

A report that shows how they transformed a centralized Apriori algorithm to work on the distributed structure that is MR.
Apriori algorithm is used to find associations between different sets of data.

Cloud MapReduce: a MR implementation on top of a Cloud operating System

YYYY-Liu-CloudMapReduce:aMapReduceImplementationontop_Cloud-CiteseerX.pdf
It claims to be: faster than other implementations; more scalable, because it has no single point of bottleneck; and small, with only 3000 lines of code.
The CAP theorem says that only two out of three properties can be achieved at any given time: data consistency, system availability, and tolerance to network partitions.
Clouds have to be highly available and always accessible, so strong data consistency has to be dropped; hence the Amazon cloud OS embraced a weaker consistency model: "eventual consistency".
Eventual consistency: the system guarantees that if no updates are made to an object, eventually all accesses will return the last updated value. The inconsistency window in these systems depends on communication delays, the load on the system, the number of replicas involved, and the extent of component failures. An example of this kind of system is DNS: updates are distributed according to a configured pattern, and eventually all clients will see the update.
There are cloud challenges:
  • Long latency: a simple word count of 10 MB takes 2 hours to complete on 10 nodes, whereas on a single node it takes minutes. This is solved in this work using message aggregation and multi-threading.
  • Duplicate messages: since they use eventual consistency, and although a message disappears from the queue after being read, it is not possible to check whether two workers read the same message at the same time. They use conflict resolution on the master reduce queue to avoid this possibility.
  • Potential node failure: the status is updated in SimpleDB, so it is possible to check for uncommitted results.
  • Indeterministic eventual consistency windows: due to the implementation of SQS, they frequently found the queue apparently empty although there were still messages left. Also, in SimpleDB, a read right after a write sometimes did not return the latest result. They opted for setting an expectation before reading, e.g., the reduce phase knows exactly how many key-value pairs to expect and polls until all are read; similarly, they only report once they have the status of all tasks.
This solution uses:
  • EC2 APIs to spawn new virtual machines to process MapReduce jobs.
  • S3 to store the input and output data.
  • SQS (Simple Queue Service) as a synchronization point where workers can coordinate job assignments, as a decoupling mechanism to coordinate data flow between different stages, and as a central job coordination point.
It has:
  • one input queue, which holds the inputs; SQS messages carry the location of each input split;
  • one master reduce queue that is used to assign reduce tasks; it sends reduce tasks to the reduce queues;
  • reduce queues, where the map output is saved;
  • an output queue, where the reduce output is saved.
Reduces start only after all maps finish: once the map workers finish, the reduce workers start, and after the reduce function finishes, the worker fetches the next message. The workers keep their status in SimpleDB.
The architecture is below
[ASCII architecture diagram: the input queue feeds the map workers; map output goes to the reduce queues; a master reduce queue assigns reduce tasks to the reduce workers, whose output goes to the output queue; workers record status in SimpleDB and data lives in S3.]
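A very rough sketch of the map worker loop against these queues, with in-process queues standing in for SQS and a dict standing in for SimpleDB (the real system uses the AWS services plus the conflict resolution described above):

  import queue

  input_q   = queue.Queue()                        # stand-in for the SQS input queue
  reduce_qs = [queue.Queue() for _ in range(2)]    # stand-in for the reduce queues
  status    = {}                                   # stand-in for SimpleDB task status

  def map_worker(worker_id, user_map):
      while True:
          try:
              split = input_q.get_nowait()         # next input split message
          except queue.Empty:
              break                                # with SQS an empty read may be transient
          for key, value in user_map(split):
              reduce_qs[hash(key) % len(reduce_qs)].put((key, value))
          status[(worker_id, split)] = "committed" # lets duplicates/failures be detected

  input_q.put("doc1: the cat sat"); input_q.put("doc2: the dog ran")
  map_worker("w0", lambda doc: ((w, 1) for w in doc.split(":")[1].split()))
  print(status)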

Using realistic simulation for performance analysis of MR Setups

They created an MR simulator, MRPerf, to facilitate exploration of the MR design space. It captures aspects of an MR setup (run-time parameters and cluster design) and uses this information to predict expected application performance. It is difficult to build such a platform because, if every component is simulated thoroughly, it may take a long time to produce results; conversely, if important components are neglected, the results may not be accurate. At the end, they compare with real results.
There are several important configuration parameters to take care of:
  • data replication factor;
  • data block size;
  • number of map and reduce tasks;
  • data placement algorithm, which decides where to place data blocks;
  • task scheduling algorithm, which decides where to place tasks.
This application takes several files as input, including a node specification, the cluster topology, the data layout and a job description. The output is a detailed trace which provides the job execution time, the amount of data transferred, and the timeline of each map and reduce task.
MRPerf uses ns-2 (a network simulator); the first step is to set up the network.
The simulator does not process actual data: only the size of the data is passed to ns-2, which calculates the network traffic and transfer latencies.
They compare the results with real Hadoop executions, on a single and a double rack with 16, 32, 64 and 128 cores.
MRPerf predicts map phase performance within an average error of 3.42%, and reduce phase performance within 19.32%, for a single rack.
For the double rack the error is 5.22% for the map phase and 12.83% for the reduce phase.
For each phase (map, sort, spill, merge and reduce) the error is within 13.55%.
The use of models allows evaluating a system without having to implement everything. It is also useful for studying new designs and for finding errors that algorithms may have. MRPerf is useful for that: it emulates MR without reading and writing to disk, without performing the real map and reduce operations, and without transferring data between map and reduce. All of these values are simulated.

G-Hadoop: MR across distributed data-centers for data-intensive computing

G-Hadoop is a MR framework that was built over Hadoop to enable large distributed computing across multiple clusters. Its target are multiple distributed High End Computing clusters, that are connected with high performance networks like Infiniband. The framework uses Gfarm distributed file system.
G-Hadoop extends to other clusters by deploying G-Hadoop on the slave nodes of the cluster.
What it can do: G-Hadoop schedules tasks across nodes of multiple clusters. It duplicates map and reduce tasks across multiple clusters and does not rely on the nodes of a single cluster. It offers access to a larger pool of processing and storage nodes.
Gfarm is a distributed FS designed to share vast amounts of data between distributed clusters over a WAN. It has a metadata server responsible for managing the FS metadata, such as file names and locations, and data nodes that store the data. The files are saved on the local hard disks: the file system is local to the OS.
G-Hadoop uses Gfarm instead of HDFS. The JobTracker (JT) and TaskTrackers (TT) use Gfarm to store configuration data. A TT accepts tasks and executes them via DRMAA: it fetches tasks from the JT and submits them to Torque using the DRMAA interface.
Each TT is responsible for several nodes, instead of 1.
Performance: G-Hadoop presents performance similar to Hadoop. For the authors, the reason lies in the scheduling algorithm they use, because tasks are submitted sequentially by the TT; they observed that submitting hundreds of tasks at once requires about 1 minute until the last task is put in the queue.
G-Hadoop sends accumulated heartbeats to the JT, so fewer messages flow in the network. Also, the total amount of data transferred is smaller than in Hadoop: to execute the BPP algorithm, G-Hadoop required a combined 5.5 MB of data, whereas Hadoop required 32 MB, about 6x more data transferred over the WAN. In the Hadoop configuration there were 8-64 slave nodes with a DataNode and a TT each; in G-Hadoop there is only 1 TT, configured to run 16-128 tasks simultaneously.
Workflow systems provide coarse-grained parallelism and cannot fulfill the requirement of high-throughput data processing. Workflow systems for data-intensive computing require large data transfers between tasks, and they have to take care of fault tolerance for task execution and data transfer.

X-Trace: a pervasive network tracing framework

X-Trace is a tracing framework that provides a comprehensive view of a system. It gives tracing information about a connection in the network, even when it crosses different protocols. E.g., User1 wants to access Site1 and has to go through the ISP. When X-Trace is invoked, the trace information of the user's network is delivered locally, the trace information of the ISP is delivered to the ISP, and the trace information from the web site is sent to the web operator. Each of these parties can then share their logs if they want.
X-Trace traces multiple applications, at different network layers, and across administrative domains. Administrative domain: a collection of networks, computers, and databases under a common administration, e.g., the network of a company.
X-Trace has some drawbacks: the protocols must be altered to carry X-Trace metadata; when X-Trace is only partially deployed, the ability to trace those parts is impaired; and loss of trace reports can limit the reconstruction of a request.
In protocols that allow extensions, like HTTP, TCP, IP and SIP, it is easy to embed X-Trace metadata in the messages they exchange. For protocols without extension mechanisms, it is necessary to change the protocol or overload existing functionality.

Storm

Storm is a distributed and fault-tolerant realtime computation system: stream processing, continuous computation, distributed RPC. Similar to how Hadoop provides a set of general primitives for doing batch processing, Storm provides a set of general primitives for doing realtime computation. Storm is fault-tolerant, not fault-proof: it guarantees that data will be processed at least once, but it may be processed more than once.
  • Streams: unbounded sequences of tuples.
  • Spouts: sources of streams in a computation; they are what injects data into Storm.
  • Bolts: process any number of input streams and produce any number of output streams.
  • Topologies: networks of spouts and bolts, connected by streams.
Spouts and bolts execute many tasks across the cluster. A bolt is composed of several tasks, and when connecting bolts, a stream grouping decides which task a message will be sent to.
Stream groupings:
  • shuffle grouping: distributes tuples across all the tasks;
  • fields grouping: hashes a specific set of fields, so tuples with the same hash always go to the same task of the bolt;
  • all grouping: sends every tuple to every task;
  • global grouping: sends everything to a single task.
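The grouping is just a routing rule from a tuple to one of the bolt's tasks; fields grouping, for instance, can be pictured as hashing the chosen fields (a sketch of the idea, not Storm's actual code):

  import random

  def shuffle_grouping(tup, n_tasks):
      return random.randrange(n_tasks)              # spread tuples evenly across tasks

  def fields_grouping(tup, fields, n_tasks):
      key = tuple(tup[f] for f in fields)           # same field values -> same task
      return hash(key) % n_tasks

  t = {"word": "storm", "count": 1}
  print(fields_grouping(t, ["word"], n_tasks=4))    # "storm" always routes to this task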
Guaranteed message processing gets rid of intermediate brokers. Each tuple depends on the tuple before it; a spout tuple is not fully processed until the whole tuple tree has been exhausted and every node in the tree has been completed. If the tuple tree is not completed within a specified timeout, the spout tuple is replayed.
Transactional topology - process small batches of tuples. After the processing phase, you move on to the commit phase. In this topology it's guaranteed that commits are ordered. If there's a failure, you probably have to re-commit the batch. Multiple batches can be processed in parallel, but commits are guaranteed to be ordered.
Examples of use: crawlers that save the URLs and content a user clicked and store them in a DB; Twitter web analytics; knowing the 10 URLs a user clicked and saving them in an in-memory database.
Storm doesn't do persistence: it doesn't store messages, it just processes them. It doesn't process batches reliably and doesn't protect against human error.

Cloud Computing: a Perspective Study

This paper gives a brief introduction of what's a Cloud. It's useful to recall concepts.
A computing Cloud is a set of network-enabled services, providing scalable, QoS-guaranteed, normally personalized, inexpensive computing infrastructures on demand, which can be accessed in a simple and pervasive way. Cloud = HaaS + SaaS + DaaS + PaaS. Sometimes it is considered that IaaS = HaaS, but infrastructure means all the computers together, while HaaS is a service that allows adding and removing nodes from the IaaS.
At the end, it talks about new programming models like MR.

Gfarm Grid File System

Gfarm Grid file system is a global distributed file system built to share data and to support distributed data-intensive computing. It provides a global file system out of a federation of local file systems. Since each administrative domain has its own user and group management, global user accounts and global groups are required for a global file system. Gfarm does not require a dedicated storage cluster, but federates the local file systems on compute nodes.
Since it's a shared filesystem, it needs performance, and the use of file replicas is a good technique to cope with network and disk failures.
Gfarm has administrative domains that manages user access permission.
For consistency in the FS: when decoupling the storage of file data from the file system metadata, there is a risk of breaking consistency. There are 3 problems with that:
1 - Failure to update the file system metadata. Solution: every node has an IO server that updates the metadata on local FS updates.
2 - Unauthorized or unintended direct access to file data. The IO server always checks the metadata before accessing the physical file data.
3 - In a globally distributed environment, not all nodes that store files are always available, so when deleting a file, not all physical file replicas can be deleted immediately. Gfarm keeps a list of removed file replicas; when some nodes are not available at deletion time, the replicas are deleted once those nodes become available.
MDS manages FS metadata. The metadata includes inodes, directory, replica locations and removed file replicas.
The MDS is implemented as a single central server which the clients access. All metadata is cached in memory, but it is also maintained in a backend DB server by a backend thread.
When a file is being updated, it is blocked from any other access; other clients accessing the same file will access different replicas. If another process wants to open the same file in RW mode, it accesses the same physical file replica as the process that already opened it in RW mode, after disabling the client-side buffer cache so that both updates are effective. After a process closes the file, all replicas should contain the same content.
Performance related:
At the peak, 3568 directories/second were created by more than 100 clients at the same time.
Around 100 clients can write at most 4370 MiByte/sec and read at most 25170 MiByte/sec. A MiByte is 1024^2 bytes.
The average increment of IO bandwidth was 57.3 MiBytes/sec with r=1, 66.8 MiBytes/sec with r=2, 71.1 MiBytes/sec with r=4, and 71.3 MiBytes/sec with r=8.

Evaluating MR for Multicore and Multiprocessor system

L1-cache is the fastest cache and it usually comes within the processor chip itself. The L1 cache typically ranges in size from 8KB to 64KB and uses the high-speed SRAM (static RAM) instead of the slower and cheaper DRAM (dynamic RAM) used for main memory. The Intel Celeron processor uses two separate 16KB L1 caches, one for the instructions and one for the data.
L2 cache comes between L1 and RAM(processor-L1-L2-RAM) and is bigger than the primary cache (typically 64KB to 4MB).
L3 cache is not found nowadays as its function is replaced by L2 cache. L3 caches are found on the motherboard rather than the processor. It is kept between RAM and L2 cache.
Phoenix is an implementation of MR for shared-memory systems that includes a programming API. Phoenix manages thread creation, dynamic task scheduling, data partitioning, and fault tolerance across processor nodes. It handles parallelization, resource management, and fault recovery.
The user just needs to provide the MR functions, a function to partition the data, and a function that implements key comparison.
It has one scheduler that creates and manages threads. For each core, it spawns a worker thread that is dynamically assigned some number of MR tasks.
All buffers are located in shared memory but are accessed in a well-defined way. Map output stays in the L1 cache. The reducers get the address from which to fetch the data in memory.
Faults are signaled when Phoenix notifies the hardware about which load/store addresses should be considered safe, or faulted.

A view of cloud computing

Cloud: the appearance that resources are infinite, scalable according to the needs, with the ability to pay per use. Allocation is automatic through the use of virtualization. There are several levels of service, ranging from the hardware level to the service level.
It is difficult to predict the number of resources needed: overprovisioning costs the company money, and underprovisioning cannot satisfy user needs.
Companies that depend on other companies can fall because the latter didn't provide a good service. For example, Company A built programs using B's API to save data in Company B; Company B lost A's data, and for that reason A lost all its clients. One solution is to spread the data over several clouds.
Data must be confidential. The cloud provider is responsible for physical security. The client is responsible for application security.
Clouds must have a good network to avoid bottlenecks.
The cloud should adjust quickly according to the needs.

Naiad: Incremental and Iterative Data-Parallel Computation

It supports loops and incremental updates, and has an interface to program workflows, like Dryad. It automatically runs on single and multiple cores. Each node runs asynchronously; there is no barrier.
The goal is to hide the complexity of parallelization, data partitioning and distribution, scheduling and load balancing, and FT. It is not fault-tolerant yet, but that is in the works.
Dryad doesn't support iterative computation very well, and this system does. In Dryad, if the input changes, the entire job must be re-run.
Users have to program MR-style functions using LINQ.
MR is restrictive: first, it's a 2-stage data-flow, with unnecessary disk copies and network transfers. Then came Dryad, a generalized framework for data-parallel computing where programs are represented as acyclic graphs.
Naiad hides the complexity and simplifies the programming. Naiad is based on differential dataflow.
Naiad has shards and epochs. The state of each vertex is split into shards; one worker is responsible for each shard of every vertex. A shard is, e.g., a map or reduce applied to one partition. Each worker executes the dataflow graph asynchronously. When a shard finishes, it exchanges data with other shards through channels. Each set of updates to the input made by the shards is called an epoch.
Incremental unary operator:
  x @1 -> f(x) @1
  y @2 -> f(x+y) - f(x) @2
  z @3 -> f(x+y+z) - f(x+y) @3
Fix-point operator: it is a subgraph of its own: IN -> f (incremental) -> OUT.
Detecting convergence: Naiad uses an incremental fixed point. Whenever input arrives, the fixed-point operator tags it with a timestamp and the loop iteration.
Differential dataflow: it can detect the differences in an input that keeps growing.

Breaking the MapReduce Stage Barrier

This paper shows a simple way of overcoming the barrier created by shuffling, which can cause bad performance: the barrier stalls the beginning of the reduce phase. With this solution, as the map produces output, it is already being consumed by the reduces. This removes the time to sort the map output by key and the time the reduces have to wait before starting.
The solution is simply to keep updating an intermediate buffer from which the reduces constantly fetch new data. The output of the reduce is updated while it is being executed.

"No Silver Bullet — Essence and Accidents of Software Engineering"

Brooks argues that "there is no single development, in either technology or management technique, which by itself promises even one order of magnitude [tenfold] improvement within a decade in productivity, in reliability, in simplicity." He also states that "we cannot expect ever to see two-fold gains every two years" in software development, like there is in hardware development.
Brooks makes a distinction between accidental complexity and essential complexity, and asserts that most of what software engineers now do is devoted to the essential, so shrinking all the accidental activities to zero will not give an order-of-magnitude improvement. Brooks advocates addressing the essential parts of the software process. While Brooks insists that there is no one silver bullet, he believes that a series of innovations attacking essential complexity could lead to significant (perhaps greater than tenfold in a ten-year period) improvements.
Accidental complexity relates to problems that we create on our own and which can be fixed; for example, the details of writing and optimizing assembly code or the delays caused by batch processing. Essential complexity is caused by the problem to be solved, and nothing can remove it; if users want a program to do 30 different things, then those 30 things are essential and the program must do those 30 different things.
Brooks claims that we have cleaned up much of the accidental complexity, and today's programmers spend most of their time addressing essential complexity. One technology, that had made significant improvement in the area of accidental complexity was the invention of high level programming languages, such as Fortran at that time. Today's languages, such as C, C++, C# and Java, are considered to be improvements, but not of the same order of magnitude.
Brooks advocates "growing" software organically through incremental development. He suggests devising and implementing the main and subprograms right at the beginning, filling in the working sub-sections later. He believes that programming this way excites the engineers and provides a working system at every stage of development.
Brooks goes on to argue that there is a difference between "good" designers and "great" designers. He postulates that as programming is a creative process, some designers are inherently better than others. He suggests that there is as much as a tenfold difference between an ordinary designer and a great one. He then advocates treating star designers equally well as star managers, providing them not just with equal remuneration, but also all the perks of higher status: large office, staff, travel funds, etc.

Practical Hardening of Crash-Tolerant Systems

Fault-tolerant distributed systems need to tolerate not only crashes but also random data corruptions due to disk failures, bit-flips in memory, or CPU/bus errors. These random errors can propagate and have unpredictable effects. However, manually adding error detection checks is cumbersome and difficult. Which checks need to be added? Where to place them? What if the variables used for the checks are themselves corrupted?
PASC is for handling data corruption in distributed systems. What is important is dependability: error propagation between processes can corrupt state across the system, so it is important to guarantee failure isolation. The work therefore defines: a general model of process behaviour; the Arbitrary State Corruption (ASC) fault model; and a guarantee of error isolation through hardening.
PASC is a Java library that automates the detection of Arbitrary State Corruption faults in processes of a distributed system. The motivation behind PASC is that developers should not care about these problems. They should focus on making their distributed system crash-tolerant. The PASC library wraps the processes of the system and takes care of executing all the checks that are necessary to transform arbitrary state corruptions into crashes and dropped messages. All this is transparent to the application. If a faulty process wrapped with PASC sends a corrupted messages, this corruption is exposed to the receiver, which discards the message.
Hardening is usually the process of securing a system by reducing its surface of vulnerability.
They look for data corruption that happens on disks, memory modules failures or CPU failures - ASC hardening
Nysiad previously performed hardening for error isolation based on SMR and replication, with performance costs.
At any time a message is received, a fault can happen. The approach assumes that variables can be replicated; after a fault, corrupted data will have a different value in each copy, because corruption is random, so it is unlikely that a variable and its replica end up with the same corrupted value.
Hardening doesn't protect you against error propagation.
The goal of ASC (arbitrary state corruption) is transforming ASC faults into crashes. This must be transparent to the system. All the checks are local, and ASC hardening is untrusted.
A distributed system consists of processes. Processes can be modeled as collections of message handlers, each processing a different type of message. All message handlers modify the state of the process. A PASC process is implemented by specifying classes describing the process state, the event handlers and the messages, using the templates provided by the library. The hardening keeps two replicas of the state, and the checks compare the original and the replica.
Original vs. replica: is MSG_A == MSG_B? If so, send the message; if not, some error happened.
But an error can also happen during the check itself, or while sending the message. To handle this, a checksum of the output message computed from the replica (e.g., a CRC32) is attached to the original message, which prevents error propagation.
If there is a fault during a check, a redundant check is performed.
They also keep an incremental buffer that records every state change of the process. From it they can perform redundant event handling, run the check, and build the message, appending the CRC32 that comes from the replica.
To catch control-flow errors, check variables record the step the process is currently in.
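To make the mechanism concrete, here is a minimal Java sketch of the original/replica check with an attached CRC32; it is a conceptual illustration only, not the actual PASC API (class and method names are invented):

    import java.util.Arrays;
    import java.util.zip.CRC32;

    class Message {
        final byte[] payload;
        final long crc;
        Message(byte[] payload, long crc) { this.payload = payload; this.crc = crc; }
    }

    class HardenedSender {
        // Placeholder for the application's message handler.
        byte[] runHandler(byte[] state, byte[] input) { return input.clone(); }

        Message send(byte[] state, byte[] replicaState, byte[] input) {
            byte[] out = runHandler(state, input);                // run on the original state
            byte[] replicaOut = runHandler(replicaState, input);  // run again on the replica
            if (!Arrays.equals(out, replicaOut)) {
                // State corruption detected: crash instead of propagating the error.
                throw new IllegalStateException("ASC fault detected");
            }
            CRC32 crc = new CRC32();
            crc.update(replicaOut);                  // checksum computed from the replica's copy
            return new Message(out, crc.getValue()); // the receiver recomputes and compares it
        }
    }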
To evaluate, they implemented an echo server and compared PASC Paxos with PBFT and with unprotected Paxos. PASC Paxos is 70% better than PBFT and only needs 2f+1 replicas, while being about 15% worse than unprotected Paxos. The same trend appears when tested in ZooKeeper.
PASC Paxos has some additional memory overhead, but not too much, partly because more objects need to be garbage collected.

Cloud federation

This paper talks about cloud federation and its forms of application.
Cloud computing services are growing, and services built on top of other services are growing as well. For example, one company can offer cloud computing that uses resources from another company. This dependency is what makes the topic interesting.
Vendor lock-in happens when changing cloud environments costs the client more than expected - e.g., when an application written for Amazon has to be reprogrammed from scratch to work with Azure - that is, when the user depends on the business strategy of the service provider. Amazon's use of open standards lowers this cost. Lock-in can make a company decide to reduce its investment in cloud computing.
Cloud federation aggregates services from different providers - resource migration, resource redundancy, and combination of complementary services. Cloud federation can help clients lower costs and get better performance.
Migration allows the relocation of resources, such as virtual machine images, data items, source code, etc. from one service domain to another domain. Redundancy allows concurrent usage of similar services in different domains.
Horizontal federation: federation across one level of the cloud stack, used for redundancy and migration (redundant deployment and computation, parallel computation, data replication and fragmentation with erasure codes; shadow, redundant, and partial migration). Vertical federation: spans multiple levels of the stack.
The paper also discusses interoperability challenges from the perspectives of the service provider and the user, and shows two designs: one for a migration scenario and another for a redundancy strategy.

Time, clocks and the ordering of events - Lamport

This text defines a partial ordering of the events in a distributed system and shows how logical clocks can be used to extend that partial ordering into a total ordering of events.
In the partial ordering, if a comes before b in the same process, then a->b. If a is the sending of a message by Pi and b is the receipt of that message by Pj, then a->b. Two events are concurrent if neither can causally affect the other.
Logical clocks aim to order causally related events across different processes. The Clock Condition states that if a->b, then C(a) < C(b). It follows from two conditions: C1 - if a and b are events in the same process Pi and a comes before b, then Ci(a) < Ci(b); C2 - if a is the sending of a message by Pi and b is the receipt of that message by Pj, then Ci(a) < Cj(b).
To guarantee that the system of clocks satisfies the Clock condition, C1 and C2 must be assured. For C1: IR1 - each process Pi increments Ci between any two successive events.
For C2: IR2 -a) If event a in process Pi is sending message m, then message m contains the timestamp Tm=Ci(a). b) Upon receiving message m, Pj sets Cj greater than, or equal to its present value and greater than Tm.
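A minimal Java sketch of IR1 and IR2, assuming a single counter per process (names are illustrative, not from the paper):

    import java.util.concurrent.atomic.AtomicLong;

    class LamportClock {
        private final AtomicLong counter = new AtomicLong(0);

        // IR1: increment the clock between any two successive events.
        long tick() { return counter.incrementAndGet(); }

        // IR2(a): a send event carries the timestamp of the send.
        long onSend() { return tick(); }

        // IR2(b): on receive, advance past the message timestamp, then count the event.
        long onReceive(long msgTimestamp) {
            counter.updateAndGet(c -> Math.max(c, msgTimestamp));
            return tick();
        }
    }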
A system of clocks satisfying C1 and C2 can then be used to place a total ordering on all system events; this is how logical clocks yield a total ordering.
The relation => is a way of completing the "happened before" partial ordering into a total ordering: a=>b iff (i) Ci(a) < Cj(b), or (ii) Ci(a) = Cj(b) and Pi < Pj, where < is an arbitrary total order on the processes used to break ties.
There is no global view of the system from which events can easily be ordered. For that, strong clock consistency would be needed, where C(a) < C(b) implies a->b; vector clocks provide this property. Vector clocks are an algorithm for generating a partial ordering of events and detecting causality violations.
The total ordering defined by the algorithm can produce anomalous behaviour if it disagrees with the ordering of events perceived outside the system. This can be prevented by using properly synchronized physical clocks, but synchronizing physical clocks in a distributed system is difficult. In a distributed system, the order in which events occur is inherently only a partial ordering.

IMPLEMENTATION OF INFORMATION RETRIEVAL (IR) ALGORITHM FOR CLOUD COMPUTING: A COMPARATIVE STUDY BETWEEN WITH AND WITHOUT MAPREDUCE MECHANISM (not very good paper)

It presents a new way of processing map and reduce tasks for information retrieval in the cloud without using MR, and compares the results of the new algorithm with MR.
The algorithm takes search requests as input and breaks them into chunks for information retrieval from the cloud. It performs the mapping functionality and determines the number of buckets necessary to perform the reduce function. It is a simpler algorithm than MR.
The algorithm works in three steps: a) the requests are broken into a number of parts; b) each part is processed in sequential order at a different datacenter and the response is sent back to the main server; c) the main server, which runs the IR algorithm, joins the responses and sends the result back to the user.
Analysing the results, MR offers better and more stable performance.

Composable incremental and iterative data-parallel computation with Naiad

Naiad supports composable incremental and iterative computation using a declarative programming model, and allows incremental input processing. The work makes two contributions: 1 - differential dataflow, which allows incremental and iterative dataflow so the computation responds to incremental changes in the input; 2 - the use of LINQ.
INCOMPLETE

Graphlab: a new framework for parallel machine learning

This paper presents GraphLab, which improves on abstractions like MR by expressing asynchronous iterative algorithms while ensuring data consistency and achieving a high degree of parallel performance. GraphLab enables ML experts to design and implement scalable parallel algorithms by composing problem-specific computation, data dependencies, and scheduling.
The paper defines the DAG abstraction as a directed acyclic graph with data flowing along edges between vertices, where vertices are functions. The systolic abstraction forces the computation to be decomposed into small atomic components with little communication between them. In each iteration, every processor reads all incoming messages, performs its computation, and writes output messages, and a barrier synchronization is performed between iterations, ensuring that all processors compute and communicate in lock step. In other words, the steps are performed by each processor and it is assured that all of them have finished before moving to the next iteration.
GraphLab is composed of: - a data graph, which represents the data and the computational dependencies; - update functions, which describe local computation; - a sync mechanism for aggregating global state; - a data consistency model to prevent race conditions when updating variables (they define vertex consistency, edge consistency, and full consistency); - scheduling primitives, which represent the order of computation.
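As a conceptual illustration only (GraphLab itself is a C++ framework, and the types below are invented, not its API), an update function over the data graph could look like this, with the consistency model guaranteeing exclusive access to the vertex and its adjacent edges while it runs:

    import java.util.ArrayList;
    import java.util.List;

    class Vertex {
        double rank = 1.0;
        List<Vertex> inNeighbours = new ArrayList<>();
        List<Vertex> outNeighbours = new ArrayList<>();
    }

    interface Scheduler { void schedule(Vertex v); }

    class PageRankUpdate {
        // Under edge consistency the framework guarantees exclusive access
        // to this vertex and its adjacent edges while the update runs.
        void update(Vertex v, Scheduler scheduler) {
            double sum = 0.0;
            for (Vertex u : v.inNeighbours) sum += u.rank / u.outNeighbours.size();
            double newRank = 0.15 + 0.85 * sum;
            boolean changed = Math.abs(newRank - v.rank) > 1e-6;
            v.rank = newRank;
            if (changed)
                for (Vertex w : v.outNeighbours) scheduler.schedule(w); // dynamic scheduling
        }
    }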

Time, clocks, and the ordering of events in a distributed system

This paper introduces a model that allows ordering the events of a distributed system - a partial ordering of the events (poset).
A poset consists of a set together with a binary relation that indicates that, for certain pairs of elements in the set, one of the elements precedes the other. Such a relation is called a partial order to reflect the fact that not every pair of elements need be related: for some pairs, it may be that neither element precedes the other in the poset. Thus, partial orders generalize the more familiar total orders, in which every pair is related.
In set theory, a total order, linear order, simple order, or (non-strict) ordering is a binary relation (here denoted by infix ≤) on some set X. The relation is transitive, antisymmetric, and total. A set paired with a total order is called a totally ordered set, a linearly ordered set, a simply ordered set, or a chain. If X is totally ordered under ≤, then the following statements hold for all a, b and c in X: If a ≤ b and b ≤ a then a = b (antisymmetry); If a ≤ b and b ≤ c then a ≤ c (transitivity); a ≤ b or b ≤ a (totality - set X is total if for all a and b in X, a is related to b or b is related to a (or both)). (totality doesn't exist in a poset)
Contrast with a partial order, which has a weaker form of the third condition (it only requires reflexivity, not totality). A relation having the property of "totality" means that any pair of elements in the set of the relation are mutually comparable under the relation.
Lamport does this with logical clocks: - a process increments its counter before each event in that process; - when a process sends a message, it includes its counter value with the message; - on receiving a message, the receiver process sets its counter to be greater than the maximum of its own value and the received value before it considers the message received.
Logical clocks also provide a starting point for the more advanced vector clock method.
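For comparison, a minimal vector-clock sketch in Java, assuming n processes with ids 0..n-1 (names are illustrative): a happened before b iff a's vector is component-wise less than or equal to b's and differs in at least one entry.

    import java.util.Arrays;

    class VectorClock {
        private final long[] clock;
        private final int myId;

        VectorClock(int numProcesses, int myId) {
            this.clock = new long[numProcesses];
            this.myId = myId;
        }

        void tick() { clock[myId]++; }                     // local event

        long[] onSend() { tick(); return clock.clone(); }  // timestamp carried in the message

        void onReceive(long[] msgClock) {                  // merge: component-wise maximum
            for (int i = 0; i < clock.length; i++) clock[i] = Math.max(clock[i], msgClock[i]);
            tick();
        }

        static boolean happenedBefore(long[] a, long[] b) {
            boolean strictlyLess = false;
            for (int i = 0; i < a.length; i++) {
                if (a[i] > b[i]) return false;
                if (a[i] < b[i]) strictlyLess = true;
            }
            return strictlyLess;
        }
    }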
This paper also defines a total ordering, but the result is somewhat arbitrary: it can produce anomalous behaviour if it disagrees with the ordering perceived outside the system. This can be prevented with synchronized physical clocks. Example of anomalous behaviour: P1 sends a request to P0 and then sends a message to P2; upon receiving the message, P2 sends a request to P0. It is possible that P2's request reaches P0 before P1's. This problem would not exist if P0 could tell which request came first, i.e., knew about all the requests that will reach it.

Improving MR performance in Heterogeneous Environments

This work presents a better scheduling algorithm for running MR on a cluster of VMs. Clusters of VMs use heterogeneous hardware with different characteristics, so a task running on slow hardware does not mean it is not making progress. They developed a scheduling algorithm that manages where tasks are launched, detects whether they are slow, defines a new model for measuring progress rate, and estimates finish times.
This solution is good in most cases, although there are situations where the scheduler considers a task slow when it is not.

GraphLab: A New Framework For Parallel Machine Learning

GraphLab is a free implementation of scalable machine learning algorithms on multicore machines and clusters. It improves on abstractions like MR by expressing asynchronous iterative algorithms with sparse computational dependencies while ensuring data consistency and achieving a high degree of parallel performance.
INCOMPLETE

Survey of large scale data management approaches in cloud environments

This survey gives a comprehensive view on the several approaches and mechanisms of deploying data-intensive applications in the cloud.
Cloud computing offers economic advantages when hosting deployments of data-intensive applications: 1 - clients don't need to buy hardware to host their applications; 2 - clients pay only for what they use; 3 - clients don't pay operational costs; 4 - throughput can grow virtually without limit by adding servers as the workload increases.
This paper does the following: 1 - highlights the main goals and challenges of deploying data-intensive apps in the cloud; 2 - provides a survey of mechanisms to tackle these challenges; 3 - analyses various design decisions, such as network performance and data management, to support certain applications and end-users; 4 - discusses the issues in finding the right balance between scalability, consistency, and economic aspects; 5 - reports some real-world use cases.
There are several classes of applications that suit cloud computing: 1 - mobile interactive apps, which must always be available and rely on large datasets; 2 - parallel batch processing; 3 - analytical apps, for understanding customer behaviour, efficient supply-chain management, and recognizing buying habits; 4 - backend support for compute-intensive desktop apps (CPU-intensive applications).
MR/SQL high-level languages:
- Pig Latin: a high-level platform for creating MapReduce programs used with Hadoop. Pig Latin abstracts the programming from the Java MapReduce idiom into a notation that makes MapReduce programming high level, similar to what SQL is for RDBMS systems. Pig Latin can be extended using UDFs (User Defined Functions), which the user can write in Java, Python or JavaScript and then call directly from the language.
- Sawzall: a procedural scripting language used on top of MR to process large numbers of individual log records and do calculations.
- SCOPE: a scripting language for large-scale data analysis and data-mining applications at Microsoft.
- Cosmos: a distributed computing platform for storing and analysing massive data sets. It runs on large clusters consisting of thousands of commodity servers. The SCOPE language is used in Cosmos. In Cosmos, an application is modeled as a DAG where each vertex is a program and each edge is a data channel.
- Skywriting: similar to SCOPE; a functional scripting language with its own execution engine for performing distributed and parallel computation.
It also explains the cloud applications (and programming models) that exist.
Definition: scaling up aims to get a bigger machine; scaling out aims at partitioning data across more machines. MapReduce is a scale-out system with a shared-nothing architecture that uses commodity machines, and it is easy to administer. A limitation of MR is that it does not support joining multiple datasets; for this, the Map-Reduce-Merge model was proposed, which enables processing of multiple datasets. MR also has a checkpoint barrier between the map and reduce tasks; to avoid it, streaming allows the reduce tasks to compute continuously as new data arrives. The paper also discusses other limitations of MR.
Hybrid systems: Hive: is a data warehouse system for Hadoop that facilitates easy data summarization, ad-hoc queries, and the analysis of large datasets stored in Hadoop compatible file systems. Hive provides a mechanism to project structure onto this data and query the data using a SQL-like language called HiveQL. At the same time this language also allows traditional map/reduce programmers to plug in their custom mappers and reducers when it is inconvenient or inefficient to express this logic in HiveQL.
BigTable is a compressed, high-performance, proprietary data storage system built on the Google File System. It is not a relational database and is better described as a sparse, distributed, multi-dimensional sorted map. BigTable is designed to scale into the petabyte range across "hundreds or thousands of machines, and to make it easy to add more machines to the system and automatically start taking advantage of those resources without any reconfiguration". AppEngine Datastore: the data storage service of Google App Engine, a platform-as-a-service (PaaS) for developing and hosting web applications in Google-managed datacenters. Yahoo PNUTS:

Machine Learning in the Cloud with GraphLab

This presentation introduces GraphLab. Structured problems are those where data dependencies can affect a node. GraphLab is an open source project to produce free implementations of scalable machine learning algorithms on multicore machines and clusters. It is used for ML problems, expresses data dependencies, and is iterative. It abstracts hardware issues, provides automatic synchronization, and addresses multiple hardware architectures (multicore, distributed, cloud computing, GPU).
It is composed of: - a data graph; - a shared data table, used to share data among all vertices; - update functions; - scheduling.
A graph can be executed at two points at the same time, so there is a risk of race conditions. GraphLab deals with race conditions when propagating data between vertices through scope rules: - full consistency: no other update touches the data of nodes within two hops; - edge consistency: only the two endpoint nodes touch the edge data; - vertex (local) consistency: the update is done only locally, similar to MapReduce.
Avoiding these race conditions is important to guarantee sequential consistency of a parallel algorithm: if an algorithm works correctly when processed sequentially, and the tool guarantees sequential consistency for it, then the algorithm can also run correctly in parallel.
GraphLab does parallel inference and gradient computations that would otherwise be done with MR. With the CoEM algorithm, Hadoop MR took 7.5 hours to compute a solution with 95 cores; GraphLab took 30 minutes with 16 cores; GraphLab in the cloud took 80 seconds.
GraphLab in the cloud: building a cluster is expensive, both to buy the computers and especially to maintain them, and most of the time they are idle.
Challenge               Graphlab Solution
distributed memory  ->  optimized data partition
limited bandwidth   ->  smart caching, interleave computation/communication
high latency        ->  push data, latency hiding mechanism

Hadoop Security Design

Hadoop is composed of two main components: HDFS and MapReduce.
When Hadoop was created in 2004, no effort was spent on secure distributed computing. In 2009 the developers of Hadoop decided to include strong mutual authentication of users and services that would be transparent to users. They decided to use Kerberos to authenticate users to the services. Also, operating system principals are mapped to a set of user and group ACLs maintained in flat configuration files.
The new Hadoop design makes use of Delegation Tokens, Job Tokens, and Block Access Tokens. Each of these tokens uses symmetric cryptography, and the key may be distributed to hundreds of hosts; if the key is obtained by an attacker, the security of the system is compromised. A token, however, is only obtained after authentication with Kerberos.
The new Hadoop security model still requires additional time and effort before it meets the requirements of many large enterprises; in particular, the wide distribution of symmetric cryptographic keys should be reviewed.
Delegation Token: used by the user's job to authenticate to the NameNode. The token is a secret key shared between the user and the NameNode, and it is only obtained after authentication with Kerberos.
Block Access Token: passes data-access authorization from the NameNode to the DataNodes, enabling the owner to access certain data blocks. One way to make this token non-transferable is to include the owner's ID inside it and require whoever uses the token to authenticate as that owner, but the DataNode does not verify this. These tokens should always be cached in memory and never written to disk.
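The authenticator inside such a token is, in essence, a MAC of the token identifier under a key shared with the service. A hedged Java sketch using HMAC-SHA1 follows; the identifier layout and key handling are illustrative, not Hadoop's actual classes:

    import javax.crypto.Mac;
    import javax.crypto.spec.SecretKeySpec;
    import java.nio.charset.StandardCharsets;

    class TokenAuthenticator {
        // Both the NameNode and the DataNode can recompute the authenticator
        // because they share the master key.
        static byte[] compute(byte[] masterKey, String tokenId) throws Exception {
            Mac mac = Mac.getInstance("HmacSHA1");
            mac.init(new SecretKeySpec(masterKey, "HmacSHA1"));
            return mac.doFinal(tokenId.getBytes(StandardCharsets.UTF_8));
        }
    }

    // Example (hypothetical identifier): a block token id could encode the user,
    // the block id and the allowed operations, e.g. "alice:blk_1073741825:READ".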
Job Token: when a job is submitted, the job configuration, the input splits, and metadata about the splits are written to a directory that can be read and written by the user. Map tasks have to read HDFS data, and this token is shared with the NameNodes. The security credentials are stored in the JobTracker's system directory in HDFS. The secret key in the Job Token is used by the tasks to identify themselves; in the shuffle phase it guarantees that reduce tasks do not access the wrong map outputs.

XIA Demo Geni

The Internet is built around host-based communication, even though applications care about services, not hosts.
XIA instead sends packets to entities called principals; this is about how the Internet handles destinations.
Services can be replicated, and the network can pick whichever replica is better.
It supports evolvability - being able to evolve the set of principal types. If a router doesn't recognize a CID (content identifier) address, it falls back to the AD (domain identifier) / HID (host identifier) address.
A node can have multiple outgoing edges.
XIA uses self-certifying identifiers that guarantee security properties for communication operation.
HID = H(Kpub) SID = H(Kcert) CID = H(Content)
Workflow between client C and host H for CID 123abc: C -> H: request the CID; H -> C: data; the client checks whether H(data) == CID.
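A minimal sketch of this self-certifying check in Java, assuming the CID is the SHA-1 hash of the content (the hash choice is illustrative):

    import java.security.MessageDigest;
    import java.util.Arrays;

    class SelfCertifying {
        static byte[] cid(byte[] content) throws Exception {
            return MessageDigest.getInstance("SHA-1").digest(content); // CID = H(content)
        }

        // The client requested a CID, received some data, and checks H(data) == CID
        // without having to trust the host that served it.
        static boolean verify(byte[] requestedCid, byte[] receivedData) throws Exception {
            return Arrays.equals(requestedCid, cid(receivedData));
        }
    }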
When a packet is sent, a channel is built between the hosts for sending and receiving the data. The channel is built for just one request; if a new request is made, a new channel is built.

Discovering URLs through user feedback - p7i7-bai.pdf

A web crawler discovers new URLs by following the link structure induced by links on Web pages. This paper evaluates the benefits of integrating passive URL discovery, where the crawler does not itself fetch the documents from the web. They use Yahoo toolbar logs to characterize the URLs that are accessed by users. The goal is to understand whether user feedback is beneficial for URL discovery, using the feedback from the toolbar logs to improve the effectiveness of crawling.
Active URL discovery
it relies on link following (a minimal sketch of this loop is shown after the list):
  1. the crawler fetches the pages pointed to by the seed links, downloads them, and stores them in the DB
  2. it parses the new pages to discover new links
  3. the newly discovered links are stored in the download queue (the crawler's frontier)
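A minimal sketch of this loop in Java, assuming hypothetical fetch() and extractLinks() helpers that are not part of the paper:

    import java.util.ArrayDeque;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Queue;
    import java.util.Set;

    class CrawlerSketch {
        static void crawl(List<String> seeds, int maxPages) {
            Queue<String> frontier = new ArrayDeque<>(seeds); // download queue
            Set<String> seen = new HashSet<>(seeds);
            int fetched = 0;
            while (!frontier.isEmpty() && fetched < maxPages) {
                String url = frontier.poll();
                String page = fetch(url);                   // 1. fetch and store the page
                fetched++;
                for (String link : extractLinks(page))      // 2. parse to discover new links
                    if (seen.add(link)) frontier.add(link); // 3. push new URLs to the frontier
            }
        }

        static String fetch(String url) { return ""; }                      // placeholder
        static List<String> extractLinks(String page) { return List.of(); } // placeholder
    }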
Passive URL
E.g., users push newly created pages; in an extreme scenario, information can be pushed by an ISP.

Improving Parallel Job Scheduling Using Runtime measurements
This paper uses runtime information in parallel job scheduling to improve throughput on a parallel computer. The runtime information consists of I/O, CPU, and communication measurements.
The tests are made using a network of workstations connected by Myrinet. Myrinet (ANSI/VITA 26-1998) is a high-speed local area networking system designed by Myricom to be used as an interconnect between multiple machines to form computer clusters.
The paper notes that embarrassingly parallel jobs require fair scheduling but not coordinated scheduling.

Cost/benefit rationale for explaining things: how many clouds fail per year, and why, in a cloud-of-clouds (CoC), it is worth spending more money renting several clouds.

Job Scheduling for Multi-User MapReduce Clusters

This paper develops a fair scheduler for MapReduce at Facebook, which runs a 600-node multi-user data warehouse on Hadoop. We developed two simple techniques, delay scheduling and copy-compute splitting, which improve throughput and response times by factors of 2 to 10.
Hadoop’s built-in scheduler runs jobs in FIFO order, with five priority levels. When a task slot becomes free, the scheduler scans through jobs in order of priority and submit time to find one with a task of the required type.
For maps, the scheduler uses a locality optimization as in Google's MapReduce: after selecting a job, the scheduler picks the map task in the job with data closest to the slave (on the same node if possible, otherwise on the same rack, or finally on a remote rack). Finally, Hadoop uses backup tasks to mitigate slow nodes, but backup scheduling is orthogonal to the problems we study in this paper.
The disadvantage of FIFO scheduling is poor response times for short jobs in the presence of large jobs. The first solution to this problem in Hadoop was Hadoop On Demand (HOD), which provisions private MapReduce clusters over a large physical cluster using Torque.
But HOD has two problems: poor locality and poor utilization.
The FAIR scheduler offers:
1. Isolation: Give each user (job) the illusion of owning (running) a private cluster. 2. Statistical Multiplexing: Redistribute capacity un-used by some users (jobs) to other users (jobs).
 Hadoop Cluster  -------------------> Pool1: allocation: 60
 capacity: 100                    |-> Pool2: allocation: 40
                                  --> Pool3: allocation (prod. job): 0
The scheduler ensures that each pool will receive its minimum share as long as it has enough demand, and the sum over the minimum shares of all pools does not exceed the system capacity.
When a pool is not using its full minimum share, other pools are allowed to use its slots. In practice, we create one pool per user and special pools for production jobs.
Algorithm 2 - Slot Allocation in FAIR
// Let di and mi denote the demand and minimum share of pool i. Let S denote the set of active
// pools (pools that have at least one runnable job). A pool whose minimum share is larger than
// its demand has no mark; otherwise it has a mark.
SA = S;   // un-allocated pools
SB = {};  // allocated pools
M  = F;   // un-allocated slots; F is the total number of slots in the system

// Phase 1: fill each unmarked pool (mi > di) up to its demand.
for each pool i in SA do
  if di < mi then
    fi = di; M -= di;
    SA = SA \ {i}; SB = SB ∪ {i};   // move the pool from SA to SB
  end if
end for

// Phase 2: allocate the minimum share to the remaining pools, whose demands are greater than
// their minimum shares; this makes sure every pool receives at least its minimum share
// (it fills the remaining buckets up to their marks).
for each pool i in SA do
  fi = mi; M -= mi;
end for

// Phase 3: distribute the remaining slots by equally increasing the shares of the pools with
// the lowest shares until either their demands are met or the free slots are exhausted.
while (SA != {}) and (M > 0) do
  dmin = smallest demand among pools in SA
  fmin = smallest share among pools in SA
  Smin = set of all pools whose share is fmin
  fnext-min = next smallest share among pools in SA
  Δf = min(M / |Smin|, dmin - fmin, fnext-min - fmin)
  for each pool i in Smin do
    fi += Δf; M -= Δf;
    if fi == di then SA = SA \ {i}; SB = SB ∪ {i} end if   // the pool's demand is met
  end for
end while
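A simplified Java sketch of the three phases, assuming in-memory Pool objects; the one-slot-at-a-time water-filling loop stands in for the Δf computation above:

    import java.util.ArrayList;
    import java.util.List;

    class Pool {
        final String name;
        final int demand;     // tasks the pool could run right now
        final int minShare;   // guaranteed minimum number of slots
        int allocation = 0;
        Pool(String name, int demand, int minShare) {
            this.name = name; this.demand = demand; this.minShare = minShare;
        }
    }

    class FairAllocator {
        static void allocate(List<Pool> pools, int totalSlots) {
            int free = totalSlots;
            List<Pool> pending = new ArrayList<>();
            // Phase 1: pools whose demand fits under their minimum share get their full demand.
            for (Pool p : pools) {
                if (p.demand <= p.minShare) { p.allocation = p.demand; free -= p.demand; }
                else pending.add(p);
            }
            // Phase 2: every remaining pool receives at least its minimum share.
            for (Pool p : pending) { p.allocation = p.minShare; free -= p.minShare; }
            // Phase 3: water-fill the remaining slots, one at a time, always to the pool
            // with the smallest current allocation that still has unmet demand.
            while (free > 0) {
                Pool lowest = null;
                for (Pool p : pending)
                    if (p.allocation < p.demand
                            && (lowest == null || p.allocation < lowest.allocation))
                        lowest = p;
                if (lowest == null) break;  // all demands met; leave the rest unallocated
                lowest.allocation++;
                free--;
            }
        }
    }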
When a new job starts, FAIR starts allocating slots to it as other jobs’ tasks finish. Since MapReduce tasks are typically short (15-30s for maps and a few minutes for reduces), a job will achieve its fair share (as computed by FAIR) quite fast.
In this work, jobs can launch non-local tasks. When a node requests a task, if the head-of-line job cannot launch a local task, we skip this job and look at subsequent jobs. However, if a job has been skipped long enough, we let it launch non-local tasks, avoiding starvation.
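A hedged sketch of this skipping logic; the skip threshold and helper methods are illustrative, the paper derives the actual wait bound:

    import java.util.List;

    class DelaySchedulerSketch {
        static final int MAX_SKIPS = 10; // illustrative threshold before allowing non-local launch

        static Task assign(List<Job> jobsSortedByFairness, String node) {
            for (Job job : jobsSortedByFairness) {
                Task local = job.localTaskFor(node);
                if (local != null) { job.skipCount = 0; return local; }     // launch locally
                if (job.skipCount >= MAX_SKIPS) return job.anyPendingTask(); // avoid starvation
                job.skipCount++;  // skip this job for now and look at the next one
            }
            return null; // nothing launched on this heartbeat
        }
    }

    interface Task {}

    abstract class Job {
        int skipCount = 0;
        abstract Task localTaskFor(String node);
        abstract Task anyPendingTask();
    }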

SEHadoop: Security Enhanced Hadoop for Cloud

There are vulnerabilities in the authentication of MRv2: an overloaded authentication key, and the lack of fine-grained access control at the data access level. We propose and develop a security enhancement for Cloud-based Hadoop called SEHadoop.
Overload of authentication key occurs when all components of an Hadoop Distributed File System (HDFS), i.e., a Name Node and Data Nodes, share the same symmetric authentication key. If any of the nodes is compromised, potentially the whole HDFS is compromised.
The lack of fine-grained access control can be traced to limitations in Delegation Tokens. A Delegation Token is used by a Name Node to authenticate and authorize the processes of a user to access HDFS (e.g., map processes, reduce processes, the Application Master, the Job Client). Currently a Delegation Token can delegate for a user, but it cannot distinguish different parts of a user's data in HDFS.
By intercepting one Delegation Token, an attacker can impersonate the user and access the user’s entire data set.
In this paper, we describe a SEHadoop runtime model, SEHadoop Block Token and SEHadoop Delegation Token. All are designed to fix these two vulnerabilities, and enhance Hadoop security when it runs in a Cloud.
A. Existing Security Mechanisms
1) Initial Authentication: When Hadoop starts to run, it uses Kerberos for initial authentications as shown in Figure 3. In the YARN framework, Node Managers are authenticated by a Resource Manager through Kerberos. During initialization stage in HDFS, a Name Node uses Kerberos to authenticate each Data Node.
They show two attacks: a YARN attack and an HDFS attack. In the HDFS attack, the attacker obtains credentials that allow access to all Data Nodes; in SEHadoop, the attacker obtains the credentials but can attack only one Data Node.
In the YARN attack, a malicious Node Manager used map tasks to read and download the entire content of files. In SEHadoop, the tokens define the data range that can be accessed in a file, and other accesses are forbidden.
In a second YARN attack, it is possible to get the metadata of a file that can be used for later attacks. In SEHadoop, the attacker does not get the metadata.
SASL/DIGEST-MD5 is used for authentication between components: a process contacts the targeted Data Node and uses the Simple Authentication and Security Layer (SASL) with DIGEST-MD5 to authenticate itself. If the token is valid, the authenticator should match the one the process already holds. In the SASL/DIGEST-MD5 protocol, the process and the Data Node authenticate each other using their Block Token Authenticator as the shared secret key. For HDFS clients, the Delegation Token ID is used as the authorization ID and the Delegation Token Authenticator as the authentication credential; the metadata and the SEHadoop Block Tokens are then sent to the HDFS client through the Java SASL protocol.
*# COMMENTS:
Evaluation: In general, the article is ACCEPTED. It describes and exploits vulnerabilities in the security of Hadoop MRv2 and makes a contribution to enhancing that security.
Summary: This article exploits vulnerabilities in the security of Hadoop MRv2: an overloaded authentication key and the lack of fine-grained access control at the data access level. These vulnerabilities allow an attacker to access and modify files in HDFS. Overload of the authentication key occurs when all components of a Hadoop Distributed File System (HDFS), i.e., a Name Node and Data Nodes, share the same symmetric authentication key; if any of the nodes is compromised, potentially the whole HDFS is compromised. The lack of fine-grained access control can be traced to limitations in Delegation Tokens. A Delegation Token is used by a Name Node to authenticate and authorize the processes of a user to access HDFS (e.g., map process, reduce process, Application Master, Job Client). Currently a Delegation Token can delegate for a user, but it cannot distinguish different parts of a user's data in HDFS. By intercepting one Delegation Token, an attacker can impersonate the user and access the user's entire data set.
They propose and develop a security enhancement for Cloud-based Hadoop called SEHadoop. The enhancement focuses on forcing map and reduce tasks to get only the correct portion of data from the Data Node, and on protecting Data Nodes from intrusion attacks made by fake Name Nodes. The security relies on tokens: the Name Node uses a different token for each Data Node, and map tasks use different tokens to access only parts of a file. To use SEHadoop, it is only necessary to migrate Hadoop jobs by customizing the splitter to ensure it sets the correct boundary for each FileSplit. The experiments evaluate: 1. how much overhead SEHadoop incurs when a client uses a SEHadoop Block Token to read a file; 2. how much overhead there is when a job uses a SEHadoop Delegation Token.
Positive points: the paper describes the problem and the solution well. It is a well-defined contribution to improving the security of data access on the Data Nodes. I understand what they wrote, but I still have some questions, listed in the critiques section.
Assumptions: The authors assume that all MRv2 components run in a secured zone that prevents any attack, except for the Node Managers, the containers where the map and reduce tasks run, and the Data Nodes.
Critiques:
In section 3,
A.: It would be easier to understand if the BTID, BTA, BlockToken, DTID, DTA and DelegationToken were represented in Figure 3. The legends over the arrows in Figure 3 do not help in understanding the authentication.
In Figure 4: Kerberos is not depicted as a protected component, but the text says it runs in a secured zone. Shouldn't Kerberos be depicted as a secured component? Why are some Node Manager components inside a secure zone while another one is not? Looking at the implementation, they are the same component.
In section 4.A I don't understand the sentence: "Some processes in Hadoop, such as Kerberos, Name Node, Resource Manager, Job Clients, Application Masters, and Node Managers manage Application Masters or contain critical information. Fortunately the number of these processes is usually small." > Application Masters manage Application Masters?
In section 4.B SEHadoop Block Token uses AES and SHA (I assume SHA-1). Why use both hash algorithms?
In section 4.C: the explanation of the acronyms in the middle of the text is a little confusing. A table of acronyms would make this part of the text easier to read.
Regarding the sentence "These Child Delegation Tokens will be passed to a Resource Manager in a job submission and transferred to each map or reduce process.": isn't the child delegation token just for the map tasks? In the text these tokens are only explained for the map tasks, and there are no tests of these tokens for the reduce tasks.
The work uses HMAC-SHA1 as a message authentication code to generate the PDTA and the ChildDTA, which a map task uses to get permission from the Name Node to access a Data Node. But is there confidentiality during the exchange of the tokens between the map task and the Name Node? Shouldn't there be?
Also, it seems that SEHadoop does not guarantee non-repudiation. In this case, maybe an attacker can eavesdrop on the Parent Delegation Token that the Name Node sends to the job client and compromise the system. Is this possible?
In Figure 5, I mixed up the numbers in circles with the numbers in parentheses. Maybe the numbers in the figures should be written as step 1, step 2, etc. This is a suggestion.
Evaluation: The metrics they show are very summarized, giving only the performance results and their standard deviation, for the Block Token and the Delegation Token. The performance is measured only for read operations; no other tests were made. I am not sure whether more tests are needed.
Another caveat is that the tests were made on a single blade, and not in a real cloud with several hosts, so we do not have results for a real-world setting. I don't know whether this is important or not.
Questions: Does the communication between the components in SEHadoop still use SASL to prevent eavesdropping? No - SASL is only used to send the metadata to the clients. If an attacker wants to read data with commands like distcp and cat, does SEHadoop protect against it? That question is not answered, but maybe it is outside the scope of the work.

How to write literature review - david mendonca

learn about something to prepare future work; test broader theories
*# beginning of an article intro: motivation, identify contributions; address the contribution (what's new), impact (what?), logic and thoroughness; address limitations; add "what's next?"
*# identify the relevant literature
*# other matters: use the present tense; the conclusion is an exercise in logical reasoning; finding gaps in the literature is one way to formulate propositions
*# How to write a good paper? title > abstract > conclusion > images > the rest. The abstract should contain the results, and the method of the study should be visible in the abstract. Citations are important to contribute to the impact of the work.
*# summary: a literature review represents original research; use a literature review to improve your work; classify papers based on impact and citations
*# Questions: Is there already a framework?
*# TODO: create a framework that organizes the PDFs by title based on keywords.

ExoGENI

ExoGENI is a GENI testbed used for experimentation and computational tasks. ExoGENI orchestrates a federation of independent cloud sites located across the US and circuit providers, and links them to other GENI tools and resources. ExoGENI uses open cloud computing (OpenStack) and dynamic circuit fabrics.
*# ORCA Flukes
To set up the nodes, ExoGENI uses Flukes. Flukes is a Java-based GUI that allows an experimenter to graphically inspect the state of ORCA substrates, create request topologies, submit requests to ORCA, and inspect the returned substrate information (called the 'manifest'). Key features of Flukes:
- Graphical, intuitive interface.
- Uses native ORCA interfaces and resource descriptions (NDL-OWL) for maximum flexibility, unlike other interfaces to ORCA which use GENI RSpecs and the GENI AM API and rely on conversions between these resource formats.
- Allows the user to submit requests and inspect the output manifest in a graphical environment.
- Allows the user to log in to provisioned resources.
Flukes is a Java Web Start application available at http://geni-images.renci.org/webstart/flukes.jnlp. We can also get the app by:
$ curl http://geni-images.renci.org/webstart/flukes.jnlp > ~/Downloads/flukes.jnlp
$ javaws ~/Downloads/flukes.jnlp
More info about Flukes at https://geni-orca.renci.org/trac/wiki/flukes

two party computation for forge and lose techniques

Roadmap: background, contribution, research directions, goals. Random keys are used to encrypt - how does the deciphering happen? Can you explain the forge-and-lose technique?

Using GENI for experimental evaluation of software defined networking is smart grids

Definition: A smart grid is a modernized electrical grid that uses analogue or digital information and communications technology to gather and act on information, such as information about the behaviors of suppliers and consumers, in an automated fashion to improve the efficiency, reliability, economics, and sustainability of the production and distribution of electricity.
OpenFlow is a communications protocol that gives access to the forwarding plane of a network switch or router over the network. In routing, the forwarding plane, sometimes called the data plane, defines the part of the router architecture that decides what to do with packets arriving on an inbound interface.
Contribution: This project is a first-cut exploration of the current capabilities of hardware that supports the OpenFlow technology for smart grid operations: an OpenFlow controller that implements an automatic fail-over mechanism, an MPLS-like load-balancing traffic-engineering scheme, and a QoS queuing mechanism; and a Demand Response (DR) smart grid application that transmits traffic created by cyber-physical systems.
Example of use:
More specifically, customers provide consent to utility companies seeking to regulate the on/off period of electric appliances to reduce the load during peak periods of demand. In exchange, customers receive fringe benefits such as a lower rate for electricity. What has this achieved? During the peak periods, usually between the hours of 5pm to 7pm, residents return home and air condition units, washers, dryers, and stoves are turned on. It is during this critical period of increasing load that utility companies have to 1.) do nothing and risk cascading failures, 2.) “fire-up” backup generators, which could cost thousands of dollars and will be turned off at the end of the 2 hour period, or 3.) seek alternative means to reduce the peak demand. Therefore, demand response is a compromise that reduces the demand and results in financial rewards for both the utility and the customer. However, demand response is as efficient and reliable as the supporting network infrastructure.
Section 5 shows some experiments on GENI.
When testing throughput in experiment 2, they used UDP traffic to demonstrate the effects of congestion. First they attempted to transmit 100 Mbps of UDP traffic through the network, but immediately several switches disconnected from the controller. One possible explanation is the overflow of buffers in the switches, as the controller is bombarded with packets much faster than it can install flows in the switches. As one possible solution, they initially streamed UDP traffic at a low rate to allow flows to be installed in the switches, and then initiated the transmission of the 100 Mbps stream. Everything was fine until the switches disconnected again. They are still working on this issue.

Disk-Locality in Datacenter Computing Considered Irrelevant

Contribution: This paper takes the position that disk locality is going to be irrelevant in cluster computing, and considers the implications this will have on datacenter computing research, dispelling the myth that reading data over the network is much slower than reading from local disk. Background: Data in the cloud is increasing steeply, and it is necessary to compress it. Analysis of logs from Facebook shows that disk locality over compressed data results in little, if any, improvement in task lengths; compression effectively increases the usable disk bandwidth. Replacing disks by memory is not feasible; instead, a memory cache could be used to improve job performance. However, analysis of Hadoop logs from Facebook shows that a large fraction of the data is accessed only once, calling for techniques to prefetch data, so a plain memory cache is not that useful. As observed previously, reads from local disk are comparable to reads from the local network; networks are getting faster, and Figure 1 confirms this by showing that the bandwidth difference is about 8%. Positive: a good description that corroborates the contribution. Negative: we don't know the details of the tests and there is no description of the network where they were made; they just present the results. Future: question even the work that is generally accepted; more details of the testbed are necessary.

Towards a Fault-Resilient Cloud Management Stack

Contribution: This paper describes a first attempt to address the fault-resilience issues of the emerging cloud management stacks. With preliminary fault injection, the authors study the fault resilience of OpenStack, provide in-depth discussions of six categories of fault-resilience issues, and propose suggestions on how to strengthen this software layer. Background: Cloud management stacks have become an important new layer in cloud computing infrastructure, simplifying the configuration and management of cloud computing environments. As the resource manager and controller of an entire cloud, a cloud management stack has significant impact on the fault resilience of a cloud platform. Positive: shows a description of the six types of faults they found. Negative: the paper is not particularly well written, but I have nothing else to point out. Future work: when using a cloud stack, be careful with faults; faults in clouds are very dangerous.

Distributed Abstraction Algorithm for Online Predicate Detection

Contribution: The authors present a distributed online algorithm for performing computation slicing, a technique to abstract the computation with respect to a regular predicate. Background: Global predicate detection for runtime verification is an important technique for detecting violations of invariants for debugging and fault tolerance in distributed systems. It is a challenging task on a system with a large number of processes due to the combinatorial explosion of the state space. Computation slicing is an abstraction technique for efficiently finding all global states of a distributed computation that satisfy a given global predicate, without explicitly enumerating all such global states. Conjunctive predicates: global predicates which are conjunctions of local predicates. Monotonic channel predicates: some examples are "all messages have been delivered" (or "all channels are empty"), "at least k messages have been sent/received", "there are at most k messages in transit between two processes", "the leader has sent all 'prepare to commit' messages", etc. Slicing makes such algorithms able to run in a distributed system, and much faster. Positive: very detailed. Negative: nothing to point out. Future work: not my area, but having good distributed algorithms in Hadoop makes it run faster.

RBFT: Redundant Byzantine Fault Tolerance

Background: They have shown that a malicious primary replica can drastically degrade performance.
Contribution: To tackle the robustness problem, they have proposed a new approach: RBFT, for Redundant Byzantine Fault Tolerance. The key idea in RBFT is to run several instances of a BFT protocol in parallel, and to monitor their performance in order to detect a malicious primary.

The eGenVar data management system—cataloguing and sharing sensitive data and metadata for the life sciences

This paper describes a method and a software suite for sharing information without compromising privacy or security. This system constructs a metadata catalogue, which could be used to locate data while the original files remain in a secure location. Specifically, our system, called the eGenVar data-management system (EGDMS), allows users to report, track, and share information on data content, provenance and lineage of files. The system is designed to bridge the gap between current LIMS and workflow systems and to keep provenance for data being processed through disparate systems at different locations.

State Machine Replication for the Masses with BFT-SMART

The main contribution of this paper is to fill a gap in the BFT literature by documenting the implementation of this kind of system, including protocols for state transfer and reconfiguration. This text analyses the paper and explores its performance, comparing it with PBFT and UpRight.

Spanner: Google’s Globally-Distributed Database

Small Byzantine Quorum Systems

Masking quorum systems guarantee data availability in the presence of arbitrary faults. They also introduce a special class of quorum systems, dissemination quorum systems, which can be used by services that support self-verifying data, i.e. data that cannot be undetectably altered by a faulty server, such as data that have been digitally signed. Masking quorum systems need 4f+1 servers; dissemination quorum systems need 3f+1.
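As background for these bounds, here are the standard Byzantine quorum constraints, assuming symmetric quorums of size q out of n servers and at most f Byzantine faults (this is background, not the paper's own asymmetric construction):

\begin{align*}
\text{availability:}\quad & q \le n - f \\
\text{masking (generic data):}\quad & 2q - n \ge 2f + 1 \;\Rightarrow\; n \ge 4f + 1 \\
\text{dissemination (self-verifying data):}\quad & 2q - n \ge f + 1 \;\Rightarrow\; n \ge 3f + 1
\end{align*}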
In this paper, they present two new quorum systems, one for generic data and the other for self-verifying data, that need only 3f+1 and 2f+1 servers respectively (a-masking, a-dissemination). They are asymmetric with respect to the operations they support: reads and writes use quorums of different sizes. Reads require a quorum; writes do not need to be acknowledged by a write quorum.
A reliable communication abstraction guarantees that every value written by a correct client will eventually be stored by every correct server in the write quorum, and the writer itself has no use for the knowledge that the write completed.
We call read and write protocols that exploit this insight Small Byzantine Quorum (SBQ) protocols.
Reducing the number of servers is particularly important where Byzantine protocols protect against security breaches of servers.
Reliable asynchronous communication is a common model for Byzantine quorum algorithms, and the protocol aggressively exploits that model's properties to improve efficiency. When the communication is unreliable, crash failures become a problem because it is not possible to distinguish a slow server from a crashed one.
With reliable and synchronous communication, read and write protocols need 2f+1 servers for generic data and f+1 for self-verifying data. These protocols are vulnerable to slow reads (which can be considered a fault).
The paper also shows a new class of synchronous protocols: S-SBQ protocols can be tuned with respect to two parameters: f, the maximum number of faulty servers for which the protocol is safe and live, and t (t <= f), the maximum number of faulty servers for which the protocol is free from slow reads. When t=0, S-SBQ uses the same number of servers as the synchronous protocols; when t=f, S-SBQ is identical to the asynchronous protocols.
For generic data:
- 2f+1 servers for synchronous reliable networks where timeouts are short,
- 2f+1 to 3f+1 for synchronous reliable networks where timeouts are long,
- 3f+1 for asynchronous reliable networks,
- 4f+1 for asynchronous unreliable networks.
Self-verifying data allows systems to be built for each of these scenarios using f fewer servers.
Models of communication: reliable synchronous, reliable asynchronous, authenticated, unreliable asynchronous.

Sedic: Privacy-Aware Data Intensive Computing on Hybrid Clouds

  Sedic is the first secure data-computing system that leverages
  the special features of MapReduce to automatically partition a computing job according to the
  security levels of the data it works on (schedule map tasks over a carefully placed data plan) and
  arrange the computation across an hybrid cloud in a way that private data is never executed in a public cloud.
  In short, the data is first processed in the public cloud before being sent to the private cloud for the final aggregation.

  Specifically, they modified MapReduce's distributed file system to strategically replicate data over
  a cloud and never compromise data privacy.
  Over this data placement, map tasks are carefully scheduled to outsource as much workload as
  possible to the public cloud, sending only public or sanitized data blocks there.
  Sensitive data always stay on the private cloud. 
  The reduce part must be planned carefully, because it can be necessary to copy the public map output to the private cloud, and this is expensive.
  To reduce such inter-cloud data transfer as well as move part of the reduce computation to the public cloud, they developed
  a new technique that automatically analyzes and transforms reducers to make them suitable for running on the hybrid cloud.
  They use the combiner to pre-process public map output so as to compress the volume of the data to be delivered to the private cloud.
  Alternatively, they can plan the scheduling of map tasks to ensure the maximum amount of the map
  output will be already in the public clouds, or use combiners that take map output data from the
  public clouds and send the result to the private cluster.
  Splitting data between private and public clouds is possible because MR operations are foldable, i.e.,
  one can split the data into private and public input splits and put them on appointed hosts.
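  As an illustration of this folding, here is a standard Hadoop combiner (the job itself is hypothetical) that pre-aggregates public map output before it crosses into the private cloud:

    import java.io.IOException;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;

    public class SumCombiner extends Reducer<Text, IntWritable, Text, IntWritable> {
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context ctx)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable v : values) sum += v.get();  // fold partial counts locally
            ctx.write(key, new IntWritable(sum));          // one compact record per key
        }
    }
    // In the driver: job.setCombinerClass(SumCombiner.class);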

  Contributions:
  ~ A new and user-transparent secure data-intensive computing framework.
  We have developed the first hybrid-cloud based secure data-intensive computing framework.
  Our framework ensures that sensitive user data will not be exposed to the public cloud without the
  user’s consent, while still letting the public cloud shoulder most of the computing workload when possible.
  ~ Automatic reducer analysis and transformation. We have built a new program analysis tool that automatically evaluates 
    and transforms the reduction structure of a computing job to optimize it for hybrid-cloud computing.
    The tool breaks down a reducer into components that can work on the public and private clouds respectively,
    which not only moves part of the reduce computation away from the private cloud but also helps control the amount 
    of the intermediate outcomes to be delivered back to the private cloud which could cause significant delay and bandwidth charges on today’s
    cloud model.

FlumeJava: easy, efficient data-parallel pipelines

This paper shows how FlumeJava works, detailing the architecture and operations. It is more a technical paper than one that approaches scientific topics, and it could be better written. FlumeJava is a Java library for developing data-parallel pipelines that chain together separate MR stages; it is possible to build directed acyclic graphs where every stage of the graph is an MR job. The library implements operations to aggregate multiple file shards (chunks of data files) into a single logical object, compute MR stages, and manipulate data. It uses deferred evaluation to execute parallel operations in forward topological order and then evaluates the results. It also implements an optimizer that transforms a user execution plan (an acyclic graph) into a faster plan, applying transformations over the graph to execute it efficiently. Examples of the operations are ParallelDo and MSCR. The evaluation section compares performance results of jobs that ran in FlumeJava and in Hadoop MR; in general, FlumeJava gives better results (faster execution) than plain MR jobs, largely because the optimizer reduces the number of operations in a job. In the related work section, the authors compare FlumeJava with Cascading. Cascading seems a better solution than FlumeJava, maybe because it is a more mature product, but the paper does not make a fair comparison.

On the Feasibility of Byzantine Fault-Tolerant MapReduce in Clouds-of-Clouds

THIS IS OUR PAPER. We are trying to use several MapReduce runtimes, one installed in each cloud. We say that f+1 replicas of each task are executed in t+1 clouds; in other words, t+1 jobs are launched and each job executes f+1 task replicas. The jobs launched in the other t clouds collaborate in the execution of the job but do not execute tasks if there are no faults. If there are faults, the jobs in those t clouds execute the tasks that failed.
Details of implementation:

a) Distributed job tracker: In the original MapReduce, there is a single job tracker that controls the execution of tasks in the available task trackers, whereas in our BFT MapReduce there is one job tracker per cloud that controls the execution of tasks in that cloud, but not in the other clouds (namely, heartbeats are exchanged only inside clouds, not between clouds). Reduce tasks obtain information about the map tasks that finished processing from the job tracker. In the BFT version, each job tracker periodically sends the other tracker replicas information about finished map tasks, so reduce tasks can obtain that information from their local job tracker. Faulty job trackers or faulty clouds can stop collaborating in the execution of the job or return wrong information about the status of the task replicas they execute. Nevertheless, the existence of redundancy and the voting scheme allows job execution to progress and finish in the presence of up to t faulty clouds.

b) Deferred execution: This mechanism consists in executing only f+1 replicas of every task in t+1 clouds. In fact, arbitrary faults tend to be rare, so there is no need to execute 2f+1 replicas of every task. The job trackers in the other t clouds still collaborate in the execution of the job but run no tasks. We call these t clouds standby clouds because they do not perform task computations in the absence of faults. Typically there will be f+1 matching outputs for every task, so this degree of replication is enough. If there are fewer than f+1 matching outputs, more replicas of the same task (map or reduce) are executed until a match is found. First the standby clouds are used to run these extra tasks, but if more are needed then replicas start to be executed in all clouds until a match is found.

c) Digest communication: In MapReduce, each reduce task fetches the output of each map task. In our BFT MapReduce tasks are replicated f+1 times (with deferred execution and no faults), therefore the number of fetches is multiplied by (f+1)^2, most of them done between different clouds. The digest communication mechanism plays the extremely important role of reducing the overhead of this communication. The mechanism consists in dividing fetches in two cases:
• intra-cloud fetch: a fetch of the output of a map task by a reduce task of the same cloud is done normally, i.e., the output is moved from the map to the reduce task;
• inter-cloud fetch: if the map and reduce tasks are in different clouds, only a cryptographic hash of the output is moved (e.g., an SHA-1 hash).
This mechanism replaces the transmission of a possibly large output by the transmission of a small, fixed-size hash (e.g., 20 bytes in the case of SHA-1), thus drastically reducing the communication through the internet. Instead of comparing the outputs of the map replicas, a reduce task calculates the hash of the local outputs and compares all the hashes. Again, since arbitrary faults are rare, the values obtained from local replicas are usually correct, making this a viable mechanism.
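A minimal Java sketch of the digest comparison, assuming SHA-1 and illustrative method names:

    import java.security.MessageDigest;
    import java.security.NoSuchAlgorithmException;
    import java.util.Arrays;

    class DigestVoting {
        static byte[] digest(byte[] mapOutput) throws NoSuchAlgorithmException {
            return MessageDigest.getInstance("SHA-1").digest(mapOutput); // 20-byte hash
        }

        // With deferred execution there are f+1 replicas of each map task; the reduce
        // task hashes the locally fetched output and compares it with the digests
        // received from the other clouds, accepting the output only if they all match.
        static boolean matches(byte[] localOutput, byte[][] remoteDigests)
                throws NoSuchAlgorithmException {
            byte[] local = digest(localOutput);
            for (byte[] remote : remoteDigests)
                if (!Arrays.equals(local, remote)) return false;
            return true;
        }
    }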

Countering GATTACA: Efficient and Secure Testing of Fully-Sequenced Human Genomes

This paper explains the importance of genomic privacy by focusing on three important applications: Paternity Tests, Personalized Medicine, and Genetic Compatibility Tests. They propose a set of efficient techniques based on private set operations. The main security and privacy challenge is how to support such queries with low storage costs and reasonably short query times, while satisfying privacy and security requirements associated with a given type of transaction.
Biology background:
- Genomes represent the entirety of an organism’s hereditary information. They are encoded either in DNA or, for many types of viruses, in RNA.
- Restriction Fragment Length Polymorphisms (RFLPs) refers to a difference between samples of homologous DNA molecules that come from differing locations of restriction enzyme sites, and to a related laboratory technique by which these segments can be illustrated. In RFLP analysis, a DNA sample is broken into pieces (digested) by restriction enzymes and the resulting restriction fragments are separated according to their lengths by gel electrophoresis. RFLP provides information about the length of DNA subsequences occurring between known subsequences recognized by particular enzymes.
- Single Nucleotide Polymorphisms (SNPs) are the most common form of DNA variation occurring when a single nucleotide (A, C, G, or T) differs between members of the same species or paired chromosomes of an individual. SNP variations are often associated with how individuals develop diseases and respond to pathogens, chemicals, drugs, vaccines, and other agents. Thus SNPs are key enablers in realizing personalized medicine.
- Short Tandem Repeats (STRs) occur when a pattern of two or more nucleotides are repeated and repeated sequences are directly adjacent to each other. STRs are often used to differentiate between individuals.
Crypto background:
- Private Set Intersection (PSI)

Parallel Data Processing with MapReduce: A Survey (2011)

The goal of this survey is to give a timely overview of the status of MapReduce studies and related work, focusing on the current research aimed at improving and enhancing the MapReduce framework. The survey describes MR at length; I only note the following. With its runtime scheduling, MapReduce achieves fault tolerance by detecting failures and reassigning the tasks of failed nodes to other healthy nodes in the cluster. DBMSs are not suitable for solving extremely large-scale data processing tasks, and there is demand for special-purpose data processing tools tailored for big data. MR is a new way to do it, but it is not a DBMS, and some criticisms have been raised:
- Hadoop is 2~50 times slower than parallel DBMSs.
- The current Hadoop system is scalable, but achieves very low efficiency per node, with less than 5 MB/s processing rates, repeating a mistake that previous studies on high-performance systems often made by "focusing on scalability but missing efficiency".
- Although Hadoop won first place in the GraySort benchmark for 100 TB sorting (1 trillion 100-byte records) in 2009, its win was achieved with over 3,800 nodes. MapReduce or Hadoop is not a cheap solution once the cost of constructing and maintaining a cluster of that size is considered.
Advantages:
- Simple and easy to use.
- Flexible: MapReduce has no dependency on data model or schema.
- Independent of the storage: MapReduce is basically independent of the underlying storage layers.
- Fault tolerance: MapReduce is highly fault-tolerant.
- High scalability: the best advantage of using MapReduce is high scalability.
Pitfalls:
- No high-level language: MapReduce itself does not support any high-level language like SQL in a DBMS, nor any query optimization technique.
- No schema and no index: MapReduce is schema-free and index-free. An MR job can run right after its input is loaded into storage, but this impromptu processing throws away the benefits of data modeling.
- A single fixed dataflow: MapReduce provides ease of use with a simple abstraction, but in a fixed dataflow. Many complex algorithms are therefore hard to implement with Map and Reduce only in a single MR job.
- Low efficiency: with fault tolerance and scalability as its primary goals, MapReduce operations are not always optimized for I/O efficiency. In addition, Map and Reduce are blocking operations: a transition to the next stage cannot be made until all the tasks of the current stage are finished.
- Very young: MapReduce has been popularized by Google since 2004, so the code base is not mature yet and third-party tools are still relatively few.
The survey then lists work that addresses these limitations.
High-level languages - Microsoft SCOPE, Apache Pig, and Apache Hive all aim at supporting declarative query languages for the MapReduce framework. Declarative query languages allow query independence from program logic, reuse of queries, and automatic query optimization, as SQL does for DBMSs.
Flexible data flow - Many algorithms are hard to map directly onto Map and Reduce functions; for example, some algorithms require global state during their processing. Loops are a typical example requiring state for execution and termination, but MapReduce does not keep state across executions. Thus, MapReduce reads the same data iteratively and materializes intermediate results in local disks in each iteration, requiring lots of I/O and unnecessary computation.
HaLoop, Twister, and Pregel are examples of systems that support loop programs in MapReduce. HaLoop and Twister avoid reading unnecessary data repeatedly by identifying and keeping invariant data across iterations. Similarly, Lin et al propose an in-mapper combining technique that preserves mapped outputs in a memory buffer across multiple map calls and emits the aggregated outputs at the end (a small sketch appears at the end of this summary). Map-Reduce-Merge addresses support for relational operators by simply adding a third merge stage after the reduce stage; the merge stage combines two reduce outputs from two different MR jobs into one. Clustera, Dryad and Nephele/PACT allow more flexible dataflows than MapReduce does.
Blocking operators - Map and Reduce are blocking operations in the sense that all tasks must complete before moving to the next stage or job, because MapReduce relies on external merge sort for grouping intermediate results. Logothetis et al address this problem for the first time by building a MapReduce abstraction onto their distributed stream engine for ad-hoc data processing. There is also MapReduce Online, which supports online aggregation and continuous queries in MapReduce; in this solution, mappers push their data (temporarily stored in local storage) to reducers periodically within the same MR job. Li et al and Jiang et al found that the merge sort in MapReduce is I/O intensive and dominates MapReduce performance, which leads to the use of hash tables for better performance and also enables incremental processing.
I/O optimization - There are also approaches that reduce I/O cost in MapReduce by using index structures, column-oriented storage, or data compression. Hadoop++ provides an index-structured file format to improve the I/O cost of Hadoop; however, since it builds an index for each file partition at data loading time, loading time increases significantly, and if the input data are processed only once, the additional cost of building the index may not be justified. HadoopDB also benefits from DB indexes by using a DBMS as the storage layer in each node. Google's BigTable proposes the concept of a column family that groups one or more columns as a basic working unit. Record Columnar File (RCFile), developed by Facebook and adopted by Hive and Pig, is a column-oriented file format on HDFS.
Scheduling - MapReduce uses block-level runtime scheduling with speculative execution. A separate map task is created to process a single data block, a node that finishes its task early gets more tasks, and tasks on a straggler node are redundantly executed on other idle nodes. The Hadoop scheduler implements speculative task scheduling with a simple heuristic that compares the progress of each task to the average progress; this heuristic is not well suited to heterogeneous environments where nodes have different computing power.
Join - Join is a popular operator that is not handled well by Map and Reduce functions alone. Join methods within MapReduce can be roughly classified into two groups: map-side joins and reduce-side joins. The article describes better ways to do joins: Okcan et al propose how to efficiently perform a θ-join with a single MR job; their algorithm uses a reducer-centered cost model that calculates the total cost of the Cartesian product of the mapped output, and with that cost model they assign mapped output to reducers so as to minimize job completion time. Support for semi-joins is proposed in [57].
Vernica et al propose how to efficiently parallelize set-similarity joins with MapReduce [56]. They use prefix filtering to filter out non-candidates before the actual comparison; this requires extracting common prefixes, sorted in a global order of frequency, from tuples that each consist of a set of items.
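Two of the techniques above are simple enough to sketch. First, the in-mapper combining idea of Lin et al (my own word-count-style illustration, not their code): the mapper keeps partial aggregates in a memory buffer across map() calls and emits them only when the task ends.

```python
class InMapperCombiningMapper:
    """Word-count mapper that aggregates across map() calls before emitting."""

    def __init__(self):
        self.buffer = {}                      # partial counts, kept across calls

    def map(self, _key, line):
        for word in line.split():
            self.buffer[word] = self.buffer.get(word, 0) + 1
        # nothing is emitted per call

    def close(self):
        # emit the aggregated pairs once, at the end of the map task
        return list(self.buffer.items())

mapper = InMapperCombiningMapper()
for offset, line in enumerate(["to be or not to be", "to do"]):
    mapper.map(offset, line)
print(mapper.close())   # [('to', 3), ('be', 2), ('or', 1), ('not', 1), ('do', 1)]
```

Second, the prefix-filtering idea used by Vernica et al for set-similarity joins (again a simplified sketch, assuming a Jaccard similarity threshold): tokens are sorted by a global frequency order, and a pair of records is pruned if their prefixes share no token.

```python
import math

def prefix(tokens, global_order, threshold):
    """Prefix of a record under a global token ordering, for a Jaccard threshold."""
    ordered = sorted(tokens, key=lambda tok: global_order[tok])
    length = len(ordered) - math.ceil(threshold * len(ordered)) + 1
    return set(ordered[:length])

def may_be_similar(r, s, global_order, threshold):
    """Candidate test: records whose prefixes do not overlap cannot reach the threshold."""
    return bool(prefix(r, global_order, threshold) & prefix(s, global_order, threshold))

# usage sketch: global_order ranks tokens from least to most frequent
global_order = {"x": 0, "y": 1, "a": 2, "b": 3, "c": 4}
print(may_be_similar({"a", "b", "c"}, {"x", "y", "b"}, global_order, threshold=0.8))  # False
```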

Asynchronous Byzantine Consensus with 2f+1 Processes

Contribution: this paper contributes to a better understanding of the problem of consensus with only 2f+1 processes. To reach this objective, the paper presents a methodology to transform asynchronous consensus algorithms that tolerate crash faults and require 2f+1 processes into similar algorithms that tolerate Byzantine faults, also with 2f+1 processes. Wormholes make a system hybrid: the wormhole is a component that is protected from becoming faulty or corrupted. Using a TTCB (wormhole) in an SMR service, an order is defined on clients' requests with 2f+1 servers, and a request is ordered once f+1 servers have received it. The works of Coan and of Neiger and Toueg present two transformations for synchronous systems, one from crash to omission faults and another from omission faults to Byzantine faults, but they consider only the case of approximate agreement and do not provide Byzantine fault-tolerant algorithms for 2f+1 processes.
Definition of reliable broadcast:
- RB1 Validity. If a correct process broadcasts a message m, then some correct process eventually delivers m.
- RB2 Agreement. If a correct process delivers a message m, then all correct processes eventually deliver m.
- RB3 Integrity. For any identifier id and sender p, every correct process q delivers at most one message m with identifier id from sender p, and if p is correct then m was previously broadcast by p.
Definition of Byzantine asynchronous multi-valued consensus:
- MVC1 Validity. If a correct process decides v, then v was proposed by some process.
- MVC2 Agreement. No two correct processes decide differently.
- MVC3 Termination. Every correct process eventually decides.
The paper presents a methodology to transform crash consensus algorithms into Byzantine consensus. The authors modified Mostefaoui and Raynal's algorithm by using:
- Communication channels: communication channels are substituted by authenticated reliable channels. These channels constrain the power of the adversary in the network, in the sense that they do not allow the creation, modification or dropping of messages.
- Broadcast communication: broadcasts are substituted by reliable broadcasts. This mechanism constrains the power of the adversary by preventing it from delivering different messages with the same identifier to different processes. It requires the reliable broadcast algorithm defined above.
- Message validation: message receptions are enhanced with message validations, i.e., when a message is received it is only considered if it is valid as defined above. The objective is to force the adversary to conform to the algorithm.
- Reception quorum: receptions of messages from quorums of n−f processes are substituted by receptions of messages from at least n−f processes plus FD suspicion of all other processes (line 9 in Algorithm 2).
Here, the authors showed that it is possible to solve asynchronous Byzantine consensus with 2f+1 processes, using a reliable broadcast algorithm that needs 2f+1 (or fewer) processes and an eventually perfect muteness failure detector (FD). Next, a novel flavor of consensus (endorsement consensus) is introduced that can be solved with 2f+1 processes, and it is shown how it can be used to solve atomic broadcast. The idea behind endorsement consensus is that a correct process can have a notion of which values are adequate decisions for the consensus.
- EC1 Validity. If a correct process decides v, then v was endorsed by some correct process.
The problem assumes that the endorsement sets satisfy the following properties: initial endorsement, increasing endorsement, and eventual endorsement. Byzantine atomic broadcast can be defined similarly to reliable broadcast (properties RB1-3 above) plus an additional order property:
- AB4 Total order. If two correct processes deliver two messages M1 and M2, then both processes deliver the two messages in the same order.
The transformation provided in [17] is for crash faults. The idea is the following: when the algorithm is requested to atomically broadcast a message, it reliably broadcasts that message. When a process receives such a message, it inserts it in the R_delivered set. When there are messages in that set that have not yet been ordered, each process proposes the set of those messages to a consensus instance. Consensus instances are executed in order, and messages within one consensus can be trivially ordered (e.g., lexicographically), so this provides a total order on messages.
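A minimal sketch of that reduction (hypothetical helper names, assuming a consensus primitive and a reliable broadcast are available): each process repeatedly proposes its reliably-delivered-but-unordered messages to a numbered consensus instance and delivers the decided set in a deterministic order.

```python
def atomic_broadcast_deliver(r_delivered, a_delivered, consensus, next_instance=0):
    """Reduce atomic broadcast to a sequence of consensus instances.

    r_delivered:   set of messages delivered by reliable broadcast
    a_delivered:   ordered list of messages already atomically delivered
    consensus:     callable(instance_id, proposal) -> decided set (same at all correct processes)
    next_instance: number of the next consensus instance (a persistent counter in a real protocol)
    """
    unordered = r_delivered - set(a_delivered)
    while unordered:
        decided = consensus(next_instance, unordered)
        for msg in sorted(decided):        # messages in one batch are ordered deterministically
            if msg not in a_delivered:
                a_delivered.append(msg)
        next_instance += 1
        unordered = r_delivered - set(a_delivered)
    return a_delivered
```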

SOAP Handlers for Detecting Byzantine Faults

This paper shows a way to use SOAP handlers for intercepting client requests to detect arbitrary faults.
Pros:
- This is a technical paper that uses SOAP handlers to detect the presence of suspicious code and to prevent the service from behaving abnormally.
Cons:
- Overall, it lacks detailed information about the solution and about the evaluation, which makes it hard to see why this work deserves publication. They present what amounts to a magic solution without detailing it.
- They propose replication as a way to tolerate Byzantine faults, replicating messages and nodes: "Replicas are created for all the nodes and messages are transmitted among themselves. All the broadcast messages are processed and the faulty process is identified." But how many replicas do they use for the messages and for the system? Do they use a quorum to check that the client got the same response? What exactly is replicated in the system? Where are the replicated components located?
- They talk about using handlers to detect the presence of suspicious code, but they never say how they decide what is suspicious. How does the system know that a malicious request was transmitted?
- The evaluation section is very short. They just present the throughput and the average RTT values obtained from their tests, but there is nothing to compare those values against. How many times slower is this solution than running the application without fault tolerance?
- I also miss tests with faults. If they claim the application reacts against malevolent requests, where are the tests that show this? How did the system react to those requests?

Fault-tolerance - challenges, techniques, and implementation in cloud computing

This paper discusses existing fault tolerance techniques in cloud computing based on their policies, the tools used, and the research challenges.
+ FT techniques based on policies
First, it describes solutions for reactive and proactive fault tolerance, enumerating techniques that are based on policies.
Reactive techniques: checkpointing/restart, replication of various tasks, job migration (on failure of a task, the job is migrated to another machine), SGuard (less disruptive stream processing), retry (retry the task on the same cloud resource), task resubmission, user-defined exception handling, and rescue workflow (the workflow continues after a task failure until it becomes impossible to move forward).
Proactive techniques (predict faults, errors and failures and proactively replace the suspected components): software rejuvenation (periodic reboots), self-healing (when multiple instances of an application run on multiple virtual machines, instance failures are handled automatically), and preemptive migration (a feedback-loop control mechanism in which the application is constantly monitored).
+ Challenges of implementing FT in clouds
Providing FT requires careful analysis because of the complexity and inter-dependability of components; high reliability and availability must be ensured across multiple cloud computing providers; and autonomic fault tolerance must deal with synchronization among the various clouds.
+ Comparison between various tools used for implementing fault tolerance techniques (R - reactive, PR - proactive)
Fault tolerance techniques                    | Policies | System
Self-healing, job migration                   | R/PR     | HAProxy
Checkpointing                                 | R        | SHelp
Checkpointing, retry, self-healing            | R/PR     | Assure
Job migration, replication, SGuard, Resc.     | R/PR     | Hadoop
Replication, SGuard, task resubmission        | R/PR     | Amazon EC2
+ Proposed cloud virtualized architecture
This architecture simply uses HAProxy to monitor the availability of the servers so that, when a server goes down, connections are redirected (a toy sketch of this behaviour follows below). There are 3 VMs running: servers 1 and 2 run the application, and server 3 runs HAProxy. Overall this is a summary of the available FT techniques; the paper does not develop any of them.
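A toy sketch of the failover behaviour this architecture relies on (my own illustration in Python, not the paper's HAProxy configuration; the server addresses are made up): periodically health-check the two application servers and route new connections to the first one that answers.

```python
import socket

SERVERS = [("10.0.0.1", 8080), ("10.0.0.2", 8080)]   # hypothetical server 1 and server 2

def is_alive(host, port, timeout=1.0):
    """Basic TCP health check, similar in spirit to a load balancer's backend check."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False

def pick_backend():
    """Redirect connections to the first healthy server; fail if both are down."""
    for host, port in SERVERS:
        if is_alive(host, port):
            return host, port
    raise RuntimeError("no healthy backend available")
```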

Byzantine quorum systems

In this paper, they present quorum systems that can be used to build other protocols in addition to shared read/write register emulation.
+ Writes: for a client c to write a value v, it queries servers to obtain a set of timestamps A = { t_u : u ∈ Q } for some quorum Q; it chooses a timestamp greater than the highest timestamp in A and greater than any timestamp it has chosen in the past; and it sends the update to servers until it has received an acknowledgement for the update from every server in some quorum.
+ Reads: a quorum is also needed. The client queries servers for their value/timestamp pairs from a quorum, and among the pairs reported by at least f+1 servers it returns the value with the highest timestamp (a small sketch follows this summary).
Masking quorum systems:
- A quorum system is a masking quorum system for a fail-prone system when it satisfies the M-Consistency and M-Availability properties.
- M-Consistency guarantees that every pair of quorums intersects in at least 2f+1 elements, and thus in at least f+1 correct ones.
- M-Availability guarantees that for every possible set of failed servers there is some quorum that contains none of them.
Dissemination quorum systems: these serve applications that use self-verifying data (e.g., digitally signed data). They have weaker requirements than masking quorums but still ensure that self-verifying writes are propagated to all subsequent read operations despite the arbitrary failure of some servers. To achieve this, the intersection of any two quorums must not be contained in any set of potentially faulty servers.
Opaque masking quorums: sometimes it is necessary not to expose to clients the failure scenarios the system is vulnerable to (the fail-prone system), especially in the absence of client collusion.
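A small sketch of the client-side read rule (my own illustration, following the description above): among the value/timestamp pairs returned by one quorum, only a pair reported by at least f+1 servers is trusted, and the highest timestamp among those wins.

```python
from collections import Counter

def masking_quorum_read(responses, f):
    """responses: list of (value, timestamp) pairs returned by the servers of one quorum."""
    counts = Counter(responses)
    # keep only pairs vouched for by at least f+1 servers, i.e. by at least one correct server
    trusted = [pair for pair, n in counts.items() if n >= f + 1]
    if not trusted:
        return None                                   # nothing can be trusted; retry
    return max(trusted, key=lambda pair: pair[1])     # highest timestamp wins

def next_write_timestamp(quorum_timestamps, last_chosen):
    """Write rule: choose a timestamp above everything seen and above past choices."""
    return max(list(quorum_timestamps) + [last_chosen]) + 1

# usage sketch with f = 1: three servers agree, one faulty server lies
print(masking_quorum_read([("v2", 5), ("v2", 5), ("v2", 5), ("bogus", 9)], f=1))  # ('v2', 5)
```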

Map-Reduce-Merge: Simplified Relational Data Processing on Large Clusters

This paper presents a new MR phase, executed after the reduce tasks, that merges the reduce outputs. MR cannot handle datasets that are not homogeneous, and nowadays big data is composed of heterogeneous data that needs to be combined; e.g., travel sites have separate hotel and flight databases, and sometimes it is necessary to combine their data. The authors created a new phase called merge that combines the two reduce outputs: the merge phase reads from the two reduce outputs that cover the same key and produces a new output. Just to recall, MR takes the following key-value pairs:
map: (k1, v1) -> [(k2, v2)]
reduce: (k2, [v2]) -> [v3]
Map-Reduce-Merge takes the following pairs:
map: (k1, v1) -> [(k2, v2)]
reduce: (k2, [v2]) -> (k2, [v3])
merge: ((k2, [v3]), (k3, [v4])) -> [(k4, v5)]
The input data used in the merge phase must share a common key on which the two datasets can be correlated. Using Map-Reduce-Merge can reduce the number of steps needed to reach the same result; in the paper they show an example that goes from 13 steps down to 6, basically by sequencing several MR-merge jobs that run to produce the output. Opinion: this paper tries to deal with the same problem as my work (the Hadoop proxy manager), but it does not take care of BFT. They do not even talk about dealing with faults, so I assume they did not change the original behaviour of MapReduce in the presence of faults. In my solution I can merge any number of reduce outputs; they can only merge two. A toy sketch of a merge function follows.
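Here is that sketch (my own illustration, not the paper's code): two reduce outputs keyed by the same attribute are joined into new pairs.

```python
def merge(reduce_out_a, reduce_out_b):
    """Join two reduce outputs on their common key.

    reduce_out_a / reduce_out_b: dict mapping key -> list of reduced values.
    Returns a list of (key, (value_a, value_b)) pairs, one per combination.
    """
    merged = []
    for key, values_a in reduce_out_a.items():
        for value_a in values_a:
            for value_b in reduce_out_b.get(key, []):
                merged.append((key, (value_a, value_b)))
    return merged

# usage sketch: hotel prices and flight prices keyed by destination city
hotels = {"LIS": [120, 90], "OPO": [80]}
flights = {"LIS": [200], "MAD": [150]}
print(merge(hotels, flights))   # [('LIS', (120, 200)), ('LIS', (90, 200))]
```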

A Survey of Fault-Tolerance and Fault-Recovery Techniques in Parallel Systems

This is a survey that briefly covers fault-tolerance and fault-recovery techniques in large-scale cluster computing systems. These techniques can be grouped into two categories: (a) protection for the cluster management hardware and software infrastructure, and (b) protection for the computation nodes and the long-running applications that execute on them.
(a) Cluster management hardware and software fault tolerance typically makes use of redundancy. When a component fails, the redundant components take over the responsibilities of the failed parts. Redundancy can also be used for fault detection, by comparing the outputs produced by each replica and looking for discrepancies.
(b) Cluster applications are protected from faults using checkpointing and rollback recovery techniques. Each process cooperating in the application periodically records its state to a checkpoint file in reliable, stable storage. In the event of a process failure, the application state is restored from the most recent set of checkpoints. A variety of protocols have been developed to determine when processes should record checkpoints and how to restore the application state.
Summary: First, they define 3 fault models:
1. The Byzantine fault model represents the most adversarial model of failure. This fault model allows failed nodes to continue interacting with the rest of the system; behavior can be arbitrary and inconsistent, and failed nodes are allowed to collude in order to devise more malicious output. Correctly operating nodes cannot automatically detect that any nodes have failed, nor do they know which nodes in particular have failed if the existence of a failure is known. This model can represent random system failures as well as malicious attacks by hackers. It has been proven that no guarantees can be made concerning the correct operation of a system of 3f+1 nodes if more than f nodes are experiencing Byzantine failures.
2. The fail-stop model represents the simplest model for tolerating faults. It allows any node to fail at any time, but when the failure occurs the node ceases producing output and interacting with the rest of the system.
3. Fail-stutter faults: a model that still needs more study, serving as a middle ground between the two models above. It attempts to maintain the tractability of the fail-stop model while expanding the set of real-world faults it covers. The fail-stutter model includes all provisions of the fail-stop model but also allows for performance faults. A performance fault is an event in which a component provides unexpectedly low performance but continues to function correctly with regard to its output. This extension allows the model to include faults such as the poor latency of a network switch that is suddenly hit with a very high traffic load.
There are several ways to provide fault tolerance for centralized components:
~Replication~
1. Active replication: a second machine receives a copy of all inputs to the primary node and independently generates an identical system state by running its own copy of all necessary software (it is a replica machine doing the same execution). Additionally, the backup node monitors the primary node for incorrect behavior; if unexpected behavior is observed (such as a system crash), the backup node promotes itself to primary status and takes over the critical functionality of the system. Adv.: since its system state is already identical to that of the primary, this changeover requires a negligible amount of time. Disadv.: this type of replication is infeasible for compute nodes because doubling the number of those nodes would nearly double the cost of the system without increasing its computational capacity. A variant of active replication uses multiple backup systems for the primary; basically, it is BFT. All replicas receive all input messages, and when output is generated, all replicas compare their results using a Byzantine agreement algorithm in order to vote on what the correct output should be. If one or more nodes generate incorrect output beyond a threshold number of times, they are marked as faulty and ignored until they can be repaired by maintenance procedures. This variant is capable of handling Byzantine faults, while the previous form of active replication can only tolerate faults in the fail-stop model.
2. Passive replication: a machine is kept as a backup of the primary. If the primary machine fails, the cold spare takes over control. Checkpointing can be used for components that contain complex internal state.
~Reliable communication~
Active replication of system components is viable only if all replicas are guaranteed to receive exactly the same inputs. Although communication with centralized nodes can be implemented with some kind of multicast protocol, there is no guarantee that the multicast is perfectly consistent across all recipients. If this guarantee is not provided, replicas can end up out of sync if a transient network error results in a message being transmitted differently to different replicas. One multicast protocol uses a token-based system: a virtual token is passed from node to node, and only the holder of the token is allowed to send messages, which prevents multiple simultaneous messages from being sent and interfering with message ordering. Another approach separates multicast atomicity from multicast reliability, implementing atomicity with a sequencer node. The sequencer can specify ordering in one of two ways: (1) all messages are sent to the sequencer, which then multicasts each message to all intended recipients; since the sequencer acts as an intermediary, it can impose an order on all messages. (2) A sender node multicasts a message directly to all recipients and also sends the message to the sequencer; the sequencer then multicasts a second message to all recipients of the first, indicating the order in which the first message should be delivered.
~Monitoring~
For a system to automatically activate a backup replica, it must detect that a primary component has failed.
1. Heartbeat: the most basic approach; a process periodically listens for messages from the monitored components (a minimal sketch follows below).
2. Byzantine consensus: the replicas vote on the correct output or action based on the observed inputs; in the event of a disagreement, the minority is considered faulty.
3. Self-consistency check: a node can periodically run diagnostics in order to determine whether or not its internal components are operating correctly.
~Fault-tolerance of parallel applications~
The most basic form of fault tolerance for parallel applications consists of checkpointing and rollback recovery; these techniques generally assume a fail-stop fault model. To ensure application reliability, it is extremely important to preserve an application's state so that completed computation is not lost in the event of a system failure. Rollback recovery techniques are a common form of this kind of state preservation and have received a great deal of attention from the research community. A set of checkpoints is consistent if, for each message that one checkpoint records as received, another checkpoint records that the message was sent. There are three subcategories of checkpoint-based recovery protocols: uncoordinated checkpointing, coordinated checkpointing, and communication-induced checkpointing.
1. Uncoordinated checkpointing: each process decides when to take its own checkpoints.
2. Coordinated checkpointing: processes are forced to organize their checkpoints so that together they form a single, consistent application checkpoint.
3. Communication-induced checkpointing: each process takes checkpoints locally, as in uncoordinated checkpointing, but the protocol can force a process to take a checkpoint in order to generate a global checkpointed state that will not succumb to the domino effect.
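A minimal heartbeat monitor sketch (hypothetical names, my own illustration): the monitor keeps the time of the last heartbeat per component and reports any component whose heartbeat is overdue.

```python
import time

class HeartbeatMonitor:
    """Report a component as failed if no heartbeat arrives within `timeout` seconds."""

    def __init__(self, components, timeout=5.0):
        now = time.monotonic()
        self.last_seen = {c: now for c in components}
        self.timeout = timeout

    def heartbeat(self, component):
        """Called whenever a heartbeat message from `component` is received."""
        self.last_seen[component] = time.monotonic()

    def failed(self):
        """Components whose last heartbeat is older than the timeout."""
        now = time.monotonic()
        return [c for c, seen in self.last_seen.items() if now - seen > self.timeout]

# usage sketch: the backup would promote itself once "primary" shows up in failed()
monitor = HeartbeatMonitor(["primary"], timeout=5.0)
monitor.heartbeat("primary")
print(monitor.failed())   # [] while the primary keeps reporting in
```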
~Log-based rollback recovery~
Log-based rollback recovery protocols, or message logging protocols, supplement normal checkpointing with a record of the messages sent and received by each process. There are 3 techniques:
1. Pessimistic logging, sometimes called synchronous logging, records the determinant of each event to stable storage before the event is allowed to affect the computation.
2. Optimistic logging, or asynchronous logging, records logs to volatile storage, which is then periodically written to stable storage.
3. Causal logging protocols maintain the advantages of both optimistic and pessimistic logging, but at the expense of requiring much more complex recovery techniques.
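The pessimistic rule is simple enough to sketch (my own illustration, assuming determinants are serializable): the determinant of each received message is forced to stable storage before the message is allowed to affect the process state.

```python
import json
import os

def deliver_with_pessimistic_log(log_path, determinant, apply_message):
    """Log the determinant synchronously, then let the event affect the computation."""
    with open(log_path, "a") as log:
        log.write(json.dumps(determinant) + "\n")
        log.flush()
        os.fsync(log.fileno())        # the determinant hits stable storage first...
    apply_message(determinant)        # ...only then may the message change the process state
```

On recovery, a process restores its latest checkpoint and replays the logged determinants in order, which is what makes synchronous logging simple to recover from but expensive during normal execution.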

MR Online - 2010

In this paper they show an extension of the MR programming model beyond batch processing, which also reduces completion times and improves system utilization for batch jobs. They modified the Hadoop MapReduce framework to support online aggregation. The Hadoop Online Prototype (HOP) supports continuous queries, which enable MapReduce programs to be written for applications such as event monitoring and stream processing. It also supports online aggregation, which provides early estimates of the final result, unlike the stock implementation. To explain what they built, they first describe a strawman design: naive pipelining.

naive pipelining:

Each map output is pushed directly to a reduce task. The reduce task stores the data in memory or in a file and only starts to execute once it has received all the map data. This solution brings several problems:
+ It requires a large number of sockets, one between every mapper and every reducer.
+ Reducers suffer from contention because many mappers are connected to each reducer.
+ The thread that deals with communication is the same one that runs the map() or reduce() function.
+ If the reducers consume map output immediately, map-side combiners cannot be used.
+ Streaming also interferes with sorting: the reduce side can no longer rely on receiving sorted map output.
The solution is:
+ Only some tasks run at a time; if a reduce task has not yet been scheduled, the map output is saved to a file.
+ Each reducer is configured to accept only a limited number of connections.
+ There is one thread for receiving data and another for executing map() or reduce().
+ The reducer's buffer must reach a threshold before execution starts (a toy sketch of this buffering follows below).
+ For the sort, the reducer must do a full external sort, which takes some time (I don't know exactly in which reduce phase they do the sort).
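A toy sketch of that buffering rule (hypothetical names, not the HOP code): a separate receiver thread accumulates pipelined map output, and reduce work only begins once the buffer crosses a size threshold.

```python
import queue
import threading

class PipelinedReducerBuffer:
    """Accumulate pipelined map output; reduce work starts only past a size threshold."""

    def __init__(self, threshold_bytes):
        self.incoming = queue.Queue()      # filled by the communication thread
        self.buffered = []
        self.buffered_bytes = 0
        self.threshold = threshold_bytes

    def receiver_loop(self):
        # runs on its own thread, so communication never blocks the reduce() work
        while True:
            record = self.incoming.get()
            if record is None:             # end-of-stream marker
                break
            self.buffered.append(record)
            self.buffered_bytes += len(record)

    def ready_to_reduce(self):
        return self.buffered_bytes >= self.threshold

buf = PipelinedReducerBuffer(threshold_bytes=64 * 1024 * 1024)
threading.Thread(target=buf.receiver_loop, daemon=True).start()
```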

Pipelining jobs

When it is necessary to pipeline jobs, they are set up one behind the other: the second job can only start after the first one finishes, but the data can be sent directly from job 1 to job 2.

Evaluation

In the tests with 10 GB of data and 5 reducers, the pipelined execution is smoother than the blocking one, and performance improved by 17% with streaming. The same happened when executing with 20 reducers. It is worth noticing that most of the work is concentrated in the shuffle phase, and map execution is also extended because the map tasks keep running for longer.
When running with a single reducer, all the work must be handled by that one task, so in this case the pipelined execution took 17% more time than the blocking execution.