Wednesday, 26 November 2014

Wait-free synchronization

Paper here

This is a seminal paper from 1991 that changed the field of concurrency. Wait-free synchronization represented at the time a qualitative break with the traditional locking-based techniques for implementing concurrent objects.

A wait-free implementation of a concurrent data object is one that guarantees that any process can complete any operation in a finite number of steps, regardless the execution speed of other processes.

First, lets define what is a concurrent object. A concurrent object is a data structure shared by concurrent processes. The traditional approach to implement such objects centers around the use of critical sections, where only one process at a time is allowed to operate on the object. At the time, the use of locks was the only way to guarantee safe access to objects when dealing with concurrency. Critical sections are poorly suited for asynchronous, fault-tolerant systems: if a faulty process is halted or delayed in a critical section, non-faulty processes will also be unable to progress. If one thread attempts to acquire a lock that is already held by another thread, the thread will block until the lock is free.

The wait-free condition provides fault-tolerance: no process can be prevented from completing an operation by undetected halting failures of other processes, or by arbitrary variations in their speed.

The paper enumerates a list of shared objects that can be used to solve the consensus. In the table below we have a column of consensus number that indicates the maximum number of processes for which the object can solve a simple consensus problem, and the object column contains the synchronization primitives and objects that are supported.

Here it is another definition: The consensus number for object X is the largest n for which X solves consensus among n processes. If no largest n exists, the consensus number is said to be infinite. Also, in a system of n or more concurrent processes, it is impossible to construct a wait-free implementation of an object with consensus number n from an object with lower consensus number. E.g., it is possible to have consensus between 2 concurrent processes with a queue or a stack object that support test&set, swap, or fetch&add operations.

Consensus number Object
1 r/w registers
2 test&set, swap, fetch&add, queue, stack
... ...
2n-2 n-register assignment
... ...
memory-to-memory move and swap, augmented queue,
compare&swap, fetch&cons, sticky byte

It is impossible to implement consensus with atomic read and write (r/w) registers. Therefore, it is impossible to build wait-free implementation of common data type such as sets, queues, stacks, priority queues, or lists, and it is impossible to have most, if not all, classical synchronization primitives with registers, such as test&set, compare&swap, and fetch&add. E.g., if we have two concurrent processes P and Q that try to write in register r, both processes must first check if the register is already written or not. Both processes can check the variable at the same time before writing. Thus, P and Q will write in the same register and the register will be bivalent - both processes think that the register have their value, but just one process got it right. This is what is expected: at the end of consensus, both concurrent processes must have the same value. Also, it is impossible to construct a wait-free implementation of any object with consensus number greater than 1 using atomic r/w registers.

The test&set, compare&swap, and fetch&add operations are only possible with read-modify-write registers, or any other object with consensus number equal or bigger than 2.

Impossibility results

One fundamental problem of wait-free synchronization can be phrased as follows:

Given two concurrent objects X and Y, does there exist a wait-free implementation of X by Y?

To see that an object X with consensus number m is not possible to construct a wait-free implementation of any object Y with a consensus number n where n>m, we must look to the impossibility results.

Informally, a consensus protocol is a system of n processes that communicate through a set of shared objects {X1, . . . . , Xn}. Each processes start with an input value from some domain; they communicate with one another by applying operations to the shared objects; and they eventually agree (decide function) on a common input value and halt. A consensus protocol is required to be:

  1. consistent: distinct processes never decide on distinct values;
  2. wait-free: each process decides after a finite number of steps;
  3. valid: the common decision value is the input to some process.

It is an immediate consequence of the definitions that if Y implements X, and X solves n-process consensus, then Y also solves n-process consensus. Therefore, there is no object X with a consensus number m that can be used to implement a wait-free algorithm for Y that has a consensus number n, where n>m.

As I said before about atomic r/w registers, it is not possible to have consensus with this type of object, because all processes can overwrite inadvertently each other value. Thus, it is not possible to have consensus in another object with n>1 using atomic r/w registers.

Read-Modify-Write registers

I asserted that it is not possible to have an wait-free implementation with atomic r/w registers. But what about with read-modify-write registers (RMW)?

Many of the synchronization primitives can be expressed as RMW operations, which both read a memory location and write a new value into it simultaneously, either with a completely new value or some function of the previous value. These operations prevent race conditions in multi-threaded applications and they are used to build mutexes and semaphores. Read-modify-write instructions often produce unexpected results when used on I/O devices, as a write operation may not affect the same internal register that would be accessed in a read operation.

Let me describe here RMW and decide functions:

RMW(r: register, f:function) returns(value)
    previous := r
    r := f(r)
    return previous
end RMW


decide(input: value) returns(value)
   prefer[P] := input
   if RMW(r,f) = v
      then return prefer[P]
      else return prefer[Q]
   end if
end decide

Looking to the decide function, and supposing that there are two concurrent processes P and Q trying to write in register r, if the input that is suggested is the one returned from RMW, then it means that P has written first. Otherwise, Q was the first one to write in the register. The decide function prevents concurrent access to the register.

So, in what differs atomic r/w registers from RMW registers? What differs is the use of the function RMW and decide functions when dealing with registers. Simple atomic r/w registers are not enough to have consensus in a wait-free implementation.

Although read-modify-write registers are more powerful than atomic read/write registers, many common read-modify-write operations are still computationally weak. In particular, one cannot construct a wait-free solution to three process consensus using registers that support any combination of read, write, test&set, swap, and fetch&add operations. Therefore, there is no wait-free solution to three-process consensus using any combination of read-modify-write operations.

Compare & Swap (CAS)

CAS is an atomic instruction used in multithreading to achieve synchronization. It compares the contents of a memory location to a given value and, only if they are the same, modifies the contents of that memory location to a given new value. This is done as a single atomic operation. The atomicity guarantees that the new value is calculated based on up-to-date information; if the value had been updated by another thread in the meantime, the write would fail.

The CAS operation is expressed in the following way in C language:

int compare_and_swap(int* reg, int oldval, int newval)
{
  ATOMIC();
  int old_reg_val = *reg;
  if (old_reg_val == oldval)
     *reg = newval;
  END_ATOMIC();
  return old_reg_val;
}

CAS is used for implementing synchronization primitives like semaphores and mutexes, likewise more sophisticated lock-free and wait-free algorithms. The author proved that CAS can implement more wait-free algorithms than with atomic read, write, or fetch-and-add. Algorithms built around CAS typically read some key memory location and remember the old value. Based on that old value, they compute some new value.

To implement a wait-free n-process consensus, we need a decide function that uses CAS operation and returns the correct result. This is shown in the next block of code.

decide(input: value) returns(value)
  first:=CAS(r,┴,input)
  if first=┴
    then return input
    else return first
  end if
end decide

In the decide function, the processes share a register r initialized to . Each process attempts to replace with its input; the decision value is established by the process that succeeds.

This protocol is clearly wait-free, since it contains no loops and the function will always end with a finite number of steps.

All the objects that implements CAS are universal because they are the objects with the biggest consensus number, and they can implement all objects and operations with a lower consensus number.

To proof this, we must follow this basic idea. We represent the operations (fetch&add, test&set, etc...) to be applied on the object as a linked list, where the sequence of cells represents the sequence of operations applied to the object.

Applying an operation p to the object in state s leaves the object in state s' and returns the result value r. At the end of operation, it must be necessary to execute a consensus to decide for the next operation that will be applied to the object. When the consensus is used to decide for the next operation to execute on the object, we are turning concurrent operations made by concurrent processes into linear operations. Therefore, the object will never halt the execution because of concurrency.

In conclusion, if CAS is a universal operation, then it can implement all operations with lower consensus number.

Tuesday, 25 November 2014

Lock-free vs. wait-free concurrency

There are two types of non-blocking thread synchronization algorithms - lock-free, and wait-free.

In lock-free systems, while any particular computation may be blocked for some period of time, all CPUs are able to continue performing other computations. Saying in other words, while a given thread might be blocked by other threads in a lock-free system, all CPUs can continue doing other useful work without stalls. Saying again in other words, if some threads block, the whole system keeps running.

A non-blocking algorithm is lock-free if there is guaranteed system-wide progress regardless of scheduling. Lock-free algorithms increase the overall throughput of a system by occasionally increasing the latency of a particular transaction. Most high- end database systems are based on lock-free algorithms, to varying degrees.

By contrast, wait-free algorithms ensure that in addition to all CPUs continuing to do useful work, no computation can ever be blocked by another computation. Confusing, isn't it? I am going to try to explain it simpler.

Lock-freedom

Lock-freedom allows individual threads to starve but guarantees system-wide throughput. An algorithm is lock-free if it satisfies that when the program threads are run sufficiently long at least one of the threads makes progress.

Wait-freedom

An algorithm is wait-free if every operation has a bound on the number of steps the algorithm will take before the operation completes, regardless of the execution speeds of other processes. Wait-freedom is the strongest non-blocking guarantee of progress, and ensure a high throughput without sacrificing latency of a particular transaction. They are also much harder to implement, test, and debug. Wait-free algorithms are always lock-free algorithms.

In a situation where a system handles dozens of concurrent transactions and has soft latency requirements, lock-free systems are a good compromise between development complexity and high concurrency requirements. A database server for a website is a good candidate for a lock-free design. While any given transaction might block, there are always more transactions to process in the meantime, so the CPUs will never stay idle. The challenge is to build a transaction scheduler that maintains a good mean latency, and a well bounded standard deviation.

In a scenario where a system has roughly as many concurrent transactions as CPU cores, or has hard real-time requirements, the developers need to spend the extra time to build wait-free systems. In these cases blocking a single transaction is not acceptable - either because there are no other transactions for the CPUs to handle, minimizing the throughput, or a given transaction needs to complete with a well defined non-probabilistic time period. Nuclear reactor control software is a good candidate for wait-free systems.

Security and Privacy in Cloud Computing - Survey

Paper here

This paper gives a survey about security and privacy in terms of cloud computing. It explores several applications that are a solution to the mentioned problems that exist in a cloud. In this summary, I will not focus much on the applications, but on the problems that exist in a cloud.
Cloud computing system, denoted as Cloud in short, has become a buzzword nowadays, and it has become a great business for several companies like Amazon, Google, and Microsoft. Cloud computing can provide infinite computing resources on demand due to its high scalability in nature, which eliminates the needs for Cloud service providers to plan far ahead on hardware provisioning. Cloud providers charge clients in terms of computing usage, and can release computing resources as they need - utility computing.

A strong barrier that have been raised against cloud computing relates to security and privacy. This is a small list of incidents that undermine cloud computing, and much more will happen:
  • Google Docs found a flaw that inadvertently shares users docs in March 2009.
  • A Salesforce.com employee fell victim to a phishing attack and leaked a customer list, which generated further targeted phishing attacks in October 2007.
  • Epic.com lodged a formal complaint to the FTC against Google for its privacy practices in March 2009. EPIC was successful in an action against Microsoft Passport.
  • Steven Warshak stops the government's repeated secret searches and seizures of his stored email using the federal Stored Communications Act (SCA) in July, 2007
Cloud computing allows providers to develop, deploy and run applications that can easily grow in capacity (scalability), work rapidly (performance), and never (or at least rarely) fail (reliability), without any concerns on the properties and the locations of the underlying infrastructures. The penalties of obtaining these properties of Cloud Computing are to store individual private data on the other side of the Internet and get service from other parties (i.e. Cloud providers, Cloud service providers), and consequently result in security and privacy issues.

The papers shows that availability, confidentiality, data integrity, control and audit are important to achieve adequate security.

Availability


The goal of availability for Cloud Computing systems (including applications and its infrastructures) is to ensure that users can access the cloud at any time, at any place. This is true for all cloud computing systems - DaaS, SaaS, PaaS, IaaS, and etc. Hardening and redundancy are two strategies to improve availability.

Cloud computing vendors provide Cloud infrastructures and platforms based on virtualization. E.g., Amazon uses Xen to provide separated memory, storage, and CPU virtualization on a large number of commodity PCs. Hence, the virtual machine is the basic component in the cloud providers. Virtual machines have the capability to provide on demand service in terms of users' individual resources requirements, and they are used to tie commodity computers to provide a scalable, and robust system.

Furthermore, cloud system vendors offer the ability to block and filter traffic based on IP by the user in the virtual machines by the client, which in turn enhances the availability of the provided infrastructure.

As for redundancy, cloud system vendors offer geographic redundancy to enable high availability on a single provider. Availability zones are distinct locations that are engineered to be insulated from failures in other availability zones and provide inexpensive, low latency network connectivity to other availability zones in the same region. Using instances in separate availability zones, one can protect applications from failure of a single location.

There to say, Cloud system has capability in providing redundancy to enhance the high availability of the system.

Confidentiality


Confidentiality is a big obstacle for the users to overcome. They want to keep data secret in the cloud system.

Cloud computing systems are public networks and are exposed to more attacks when compared to those hosted in the private data centers. Therefore, it is fundamental requirement to keep all data confidential.

There are two basic approach to guarantee confidentiality - physical isolation and cryptography. VLANs and network middleboxes are are used to achieve virtual physical isolation. Encrypted storage is another choice to enhance the confidentiality. For example, encrypting data before placing it in a cloud maybe even be more secure than unencrypted data in a local data center.

Data integrity


Data integrity in the Cloud system means to preserve information integrity and it is fundamental for DaaS, SaaS, and PaaS infrastructures. In a cloud system, we are talking about Terabytes and Petabytes of data. To try to supply the popularization of the clouds, vendors need add more hard drives. This may consequently result in increased high probability of either node failure or disk failure or data corruption or even data loss. Secondly, disk drives (or solid state disks) are getting bigger and bigger in terms of their capacity, while not getting much faster in terms of data access.

There are storage services like Zetta, GFS, and HDFS that try to take are of the integrity in different ways. Zetta provides integrity based on RAIN-6 (Redundant Array of Independent Nodes) that it is similar to RAID6.

Digital signature is also a technique used for data integrity testing. This is used in the GFS and HDFS. When a block is stored, a digital signature is attached to it. The signature is able to recover data from corruption.

Control


Control the cloud means to regulate the use of the system, including the applications, its infrastructure and the data Performing distributed computation in the Cloud Computing systems on sensitive individual data, like genomic data, raises serious security and privacy concerns. Data and computation must be protected from leaks or malicious hosts.

In cloud computing, Airavat integrates decentralized information flow control (DIFC) and differential privacy to provide rigorous privacy and security control in the computation for the individual data in the MapReduce framework. Airavat uses DIFC to ensure that the system is free from unauthorized storage access.

It prevents Mappers to leak data over unsecured network connections or leave the intermediate result data in unsecured local files. By providing several trusted initial mappers and trusted reducers, Airavat is able to carry out privacy-preserving computations in the MapReduce framework, eventually allowing users to insert their own mappers while dynamically ensuring differential privacy.

Hence, efficient and effective control over the data access in the Cloud Computing system and regulate the behaviours of the applications (services) hosted on the Cloud Computing systems will enhance the security of systems.

Audit


Audit means to watch what happened in the Cloud system. Three main attributes should be audited:
  • Events: The state changes and other factors that effected the system availability.
  • Logs: Comprehensive information about users' application and its runtime environment.
  • Monitoring: Should not be intrusive and must be limited to what the Cloud provider reasonably needs in order to run their facility.
The auditability is a law issue because it involves the law of the country where the data is. In Internet there is no frontiers, but the site where the data is, it counts for the justice.

The rest of the papers dwells in legal issues related to privacy. I am not going to focus on the legal issues, but what I can say is, such a new feature (Auditability) reinforces the cloud computing developers to provide a virtualized system over the virtual machine to watch what is happening in the system.

In conclusion, this is a general paper that shows the main challenges that the cloud world is facing, and we can see that cloud computing is not a light issue and it can involve the government when we start to talk about privacy and security.

Wednesday, 19 November 2014

The HybrEx Model for Confidentiality and Privacy in Cloud Computing

Paper here
This paper was presented in HotCloud'11 and proposes a new execution model for confidentiality and privacy in cloud computing. This framework (no evaluation is shown) uses partioning of data and computation as a way to provide confidentiality and privacy.
The authors assume that many organizations have not widely adapted the use of clouds due to the concerns of confidentiality and privacy. Organizations prefer their internal infrastructure than a third-party cloud service. These concerns are well-founded. Researchers have shown that an outside attacker can extract unauthorized information in Amazon EC2. Other researchers found a vulnerability that allow user impersonation in Google Apps.
Encryption can only provide a limited guarantee, since any computation on encrypted data either involves decrypting the data or has yet to be practical even with fully homomorphic encryption1.
In general, public clouds give better computing power than a private cloud. Normally, this is due to the fact that cloud providers have very high-performance datacenters.
One way to make the cloud more popular is to make it more secure. Recognizing this difficulty, they propose a model that uses public cloud for safe operations and integrate it with the private cloud.
The HybrEx model uses public cloud only for non-sensitive data and computation classified as public. E.g., when a company declares that some data or computation are non-sensitive. Realizing HybrEx model with MR, it is necessary to partition data and computation, use public and private clouds for single MR jobs over the Internet, and integrate the results. The main benefit of this model is to integrate with safety public and private clouds without concerns for confidentiality and privacy. They explore this model using the Hadoop MapReduce (MR) architecture.
MR is the most popular execution environment in cloud computing, and used to compute embarrassingly parallel problems.
The Fig. 2 shows several types of partitioning. I have to remind that the authors only specifies a way to do partitioning with HybrEx. They have never shown results of a functioning framework.

Horizontal Partitioning

There are two popular usage cases for the current public clouds. The first case is long-term archiving of an organization's data, where the organization encrypts their private data before storing it on a public cloud. The second case is exporting and importing data for running periodic MapReduce jobs in a public cloud. E.g., a company put data it Amazon Elastic MR, execute a job there and import the result back to its own private storage.
The authors say that, in the long-term archiving case (2b), HybrEx MapReduce can run Map tasks that encrypt private data in the private cloud, sanitize them optionally, transfer encrypted data to the public cloud via the Shuffle phase, and run Reduce tasks that store the data in the public cloud.

Vertical Partitioning

Vertical Partitioning (2c) in this model, it uses public cloud by executing MR jobs (e.g., two jobs) independently in public and private cloud and avoid inter-cloud shuffling of intermediate data. Each job consumes data from the respective cloud, and store the result in the same cloud. HybrEx MapReduce can run a MR job this way when the job can process private and public data in isolation. Unfortunetely, I cannot find a practical case. Just the theory.

Hybrid

In this model (2d), HybrEx uses MR in both private and public cloud in all three phases (map, shuffle and sort, and reduce).
As said before, it is necessary to overcome three main challenges to implement the HybrEx model — data partitioning, system partitioning, and integrity — and achieve integration with safety.

Data Partitioning

This model proposes the use of partitioning for confidentiality and privacy. So, it is necessary to know how to partition data. In this model, the data is labelled as public and private. MR will detect this labels and it will place the data in the right cloud and compute it accordingly.

System Partitioning

Since HybrEx model uses both private and public clouds, it is necessary to have some components in both clouds, to keep public components away from private data, and to reduce inter-cloud data transference if we really need to transfer data between clouds.
There is a master and a shadow master, the public counterpart, that have their own workers. There is no master-slave wide-area communication. What it can have a shuffle proxies that transfer intermediate data between clouds.
Shuffle proxies give the benefit to separate architectural components, where we can apply techniques (caching, aggregation, compression, and de-duplication of intermediate data) to reduce wide-area overhead.

Integration

It is necessary to provide integrity of the data and computation to an untrustworthy public cloud.
For computation integrity, HybrEx MapReduce checks the integrity of the results from the public cloud in two modes that provide different levels of fidelity.
  1. Full integrity checking: The private clouds re-execute every map and reduce task that the public cloud has executed.
  2. Quick integrity checking: The private cloud checks selectively the integrity of the results from the public cloud.
In 1., this is a way to enable auditing at later time. Obviously, it has a great overhead in doing a fully integrity.
In 2., the HybrEx MR checks the integrity at runtime for probabilistic detection of suspicious activities in the public cloud. E.g., for a MR job that counts words in a public document, we can either add new unique words to the document or select existing words at random from the document, and store them in the private cloud. Then, we can verify that the result from the public cloud contains the accurate counts of these words by running the same job in the private cloud and check the result in the selected regions.

Conclusion

This paper presents a model to allow MR and BigTable (I do not talk about BigTable here) computation in public and private clouds. We know that, in big data era, it is very expensive to have a private and internal cloud to deal with the terabytes, or petabytes that it must process. Therefore, it is a good solution to use cloud providers to process them. At the same time, there are some data that must not be disclosed. Thus, we need to protect privacy of this data, and still use the computational power of the public cloud to deal with rest of the job.
What this paper should have, was the real implementation and, respective evaluation, of this model. Unfortunately, it is not presented.

  1. Homomorphic encryption is a form of encryption which allows specific types of computations to be carried out on ciphertext and generate an encrypted result which, when decrypted, matches the result of operations performed on the plaintext.

Friday, 7 November 2014

Unreliable failure detectors for reliable distributed systems

Paper here

This is a seminal paper about failure detectors. A failure detector is an application or a subsystem that is responsible for detection of node failures or crashes in a distributed system. Failure detectors are only interesting if you can actually build them. In a fully asynchronous system, you cannot build one, but with timeouts, it's not hard: have each process ping each other process from time to time, and suspect the other process if it does not respond to the ping within twice the maximum round-trip time for any previous ping.

Classification of failure detectors

In this paper it is defined eight classes of failure detectors, based on when they suspect faulty processes and non-faulty processes. Suspicion of faulty processes comes under the heading of completeness; of non-faulty processes, accuracy.

Degrees of completeness

  1. Strong completeness: Every faulty process is eventually permanently suspected by every non-faulty process (Everybody suspect a dead process).
  2. Weak completeness: Every faulty process is eventually permanently suspected by some non-faulty process (Somebody suspects a dead process).
"Eventually permanently" means that there is some time $t0$ such that for all times $t≥t0$ (subsequent time), the process is suspected.
Note that completeness says nothing about suspecting non-faulty processes: a paranoid failure detector that permanently suspects everybody has strong completeness.

Degrees of accuracy

These describe what happens with non-faulty processes, and with faulty processes that haven't crashed yet.
  1. Strong accuracy: No process is suspected (by anybody) before it crashes.
  2. Weak accuracy: Some non-faulty process is never suspected. There are some non-faulty processes that are mistakenly suspected.
  3. Eventual strong accuracy: After some initial period of confusion or chaos, no process is suspected before it crashes. This can be simplified to say that no non-faulty process is suspected after some time.
  4. Eventual weak accuracy: After some initial period of confusion or chaos, some non-faulty process is never suspected.
strong and weak mean different things for accuracy vs completeness:
  1. for accuracy, we are quantifying over suspects,
  2. for completeness, we are quantifying over suspectors (processes that suspects other processes).
Even a weakly-accurate failure detector guarantees that all processes trust the one visibly good process.
Below, is an example of weak completeness. Here, if p crashes, somebody will eventually suspect it, and notices all the processes. The suspicion is true if p does not contradict.

initially suspects = ø

do forever:
    for each process p:
        if my weak-detector suspects p, then send p to all processes

upon receiving p from some process q:
    suspects := suspects + p - q

This example preserves accuracy, because if there is some good guy, non-faulty process, that everybody trusts (as in weak accuracy), nobody ever reports p as suspected. For strong accuracy, every non-faulty process will suspect p. For eventual weak accuracy, it is necessary to wait to everybody contact p, and wait for the response, so that someone stop suspecting p. In eventual strong accuracy everybody stops suspecting.

Failure detector classes


There are two degrees of completeness, and four degrees of accuracy, that gives eight classes of failure detectors.





The distinction between strong and weak completeness turns out to be spurious (false); a weakly-complete failure detector can simulate a strongly-complete one (but this requires a proof). We can use this as an excuse to consider only the strongly-complete classes:
  1. P (perfect): Strongly complete and strongly accurate: non-faulty processes are never suspected; faulty processes are eventually suspected by everybody. Easily achieved in synchronous systems.
  2. S (strong): Strongly complete and weakly accurate. The name is misleading if we've already forgotten about weak completeness, but the corresponding W (weak) class is only weakly complete and weakly accurate, so it's the strong completeness that the S is referring to.
  3. ⋄P (eventually perfect): Strongly complete and eventually strongly accurate.
  4. ⋄S (eventually strong): Strongly complete and eventually weakly accurate.
Jumping to the punch line:
  1. P can simulate any of the others,
  2. S and ⋄P can both simulate ⋄S but can't simulate P or each other,
  3. ⋄S can't simulate any of the others.
Thus, ⋄S is the weakest class of failure detectors in this list, but it is strong enough to solve consensus. Any failure detector (whatever its properties) that can solve consensus is strong enough to simulate ⋄S - this makes ⋄S the "weakest failure detector for solving consensus". We can consider ◊W and ◊S similar, because it is easy to obtain strong completeness from weak completeness. For this, it is just needed to execute this code (same as above).

initially suspects = ø

do forever:
    for each process p:
        if my weak-detector suspects p, then send p to all processes

upon receiving p from some process q:
    suspects := suspects + p - q

Consensus

Consensus is a decision-making by all elements of the group. The failure detectors help to build consensus in asynchronous systems with crash failures. Consensus can be solved even with unreliable failure detectors that make an infinite number of mistakes, and determine which ones can be used to solve Consensus despite any number of crashes, and which ones require a majority of correct processes. Consensus and Atomic Broadcast are reducible to each other in asynchronous systems with crash failures.

Consensus with S

Here, there is some non-faulty process c that nobody ever suspects, and this is enough to solve consensus. The basic idea of the protocol is, there are 3 phases:
  1. Processes gossip about the input values for n-1 rounds.
  2. They prune out any that are not universally known.
  3. Everybody agrees on the lowest-id input that has not been pruned.
This protocol satisfies validity because after round 1, a value v is in every process because it was gossiped. In round 2, all processes intersects every values and they still leaves v, so that in round 3 one of them is picked up.

To get termination, nobody ever waits forever for a message it wants. The first non-faulty process that gets stuck eventually is informed by the S-detector that the process it is waiting for is dead.

For agreement we must guarantee that all processes have the same set of values. This is achieved because of the intersection in the round 2.

Consensus with ⋄S and f<n/2





The consensus protocol for S depends on some process c never being suspected; if c is suspected during the entire execution of the protocol, as it can happen with ⋄S, then it is possible that no process will wait to hear from c (or anybody else) and the processes will all decide their own inputs.
To solve consensus with ⋄S we will need to assume less than $f=n/2$ failures, allowing any process to wait to hear from a majority no matter what lies its failure detector is telling it.
The protocol uses reliable broadcast to guarantee that any message sent is received by all non-faulty processes, or it is not received by no process.
  • Each process keeps track of a preference (initially its own input value) and a timestamp, the round number in which it last updated its preference.
  • The processes go through a sequence of asynchronous rounds, each divided into four phases:
    1. All processes send (round, preference, timestamp) to the coordinator for the round.
    2. a) The coordinator waits to hear from a majority of the processes (possibly including itself). b) The coordinator sets its own estimate to some estimate with the largest timestamp of those it receives, c) and sends (round, estimate) to all processes.
    3. Each process waits for the new proposal from the coordinator or for the failure detector to suspect the coordinator. If it receives a new estimate, it adopts it as its own, sets timestamp ← round, and sends (round, ack) to the coordinator. Otherwise, it sends (round, nack) to the coordinator.
    4. The coordinator waits to receive ack or nack from a majority of processes. If it receives ack from a majority, it announces the current estimate as the protocol decision value using ReliableBroadcast.
  • Any process that receives a value in a ReliableBroadcast decides on it immediately.
The properties for ◊S consensus are:
  1. Termination: Every correct process eventually decides on some value. All processes or are not waiting, or are waiting for a majority of decided values from non-faulty processes. The problem is on processes that decided to stop participating in the protocol; but because any non-faulty process retransmits the decision value in the ReliableBroadcast, if a process is waiting for a response from a non-faulty process that already terminated, eventually it will get the ReliableBroadcast instead and terminate itself. In phase 3, a process might get stuck waiting for a dead coordinator, but the strong completeness of ⋄S means that it suspects the dead coordinator eventually and escapes. So at worst, we do infinitely many rounds.
  2. Validity: If a process decides v, then v was proposed by some process. The decision value is an estimate, and all estimates start from inputs.
  3. Agreement: No two correct processes decide differently. It is possible that two coordinators both initiate a ReliableBroadcast and some processes choose the value from the first, and other process the value from the second. In this case the first coordinator collected acks from a majority of processes in some round r, and all subsequent coordinators collected estimates from an overlapping majority of processes in some round $r'>r$. By applying the same induction argument as for Paxos we get that all subsequent coordinators choose the same estimate as the first coordinator, and so we get agreement. Basically, the quorums will overlap, and so, they will just return one correct value.

f<n/2 is still required even with ⋄P


With a majority of failures, $f≥n/2$, it can bring troubles with ⋄P (and thus with ⋄S, which is trivially simulated by ⋄P).
The reason is that ⋄P can lie to us for some long initial interval of the protocol, and consensus is required to terminate eventually despite these lies. For example, we start half of the processes with input 0, half with 1, and run both halves independently with ⋄P suspecting the other half until the processes in both halves decide on their common inputs (different between the halves). With $f<n/2$, the processes will get just one value that came from a majority.

Relationship among classes

Failure detectors can be compared. A failure detector D2 is said to be weaker than a failure detector D1 if there is an asynchronous algorithm, called a reduction algorithm, which, using D1, can emulate D2. The existence of a reduction depends on the environment that it is considered.
P simulates S, and ◊P simulates ◊S without modification. Also, P simulates ◊P, and S simulates ◊S. There is no simulation from ◊P to S, S to ◊P, and from ◊S to any other classes.


Partial synchrony

Fischer, Lynch and Paterson showed that Consensus cannot be solved in an asynchronous system subject to crash failures. The fundamental reason why Consensus cannot be solved in completely asynchronous systems is the fact that, in such systems, it is impossible to reliably distinguish a process that has crashed from one that is merely very slow. In other words, Consensus is unsolvable because accurate failure detection is impossible. On the other hand, it is well-known that Consensus is solvable (deterministically) in completely synchronous systems — that is, systems where clocks are perfectly synchronised, all processes take steps at the same rate and each message arrives at its destination a fixed and known amount of time after it is sent. In such a system, we can use timeouts to implement a "perfect" failure detector — i.e., one in which no process is ever wrongly suspected, and every faulty process is eventually suspected Thus, the capacity to solve consensus is related to the failure detection capabilities.


Between asynchronous and synchronous system, there are the partial synchronous systems. In this systems, the bounds are known after some time during the execution. In this ◊P algorithm, a process p is sending periodically a "p_is_alive" message to all other processes. Lets suppose that q is the receiver. If p suspects that q did not receive the message, it will increase the timeout to get the response from q, and it will resend the message to q. Eventually, the timeout will be so big, that all others timeouts will be within the q timeout interval, that will make this model synchronous. After some time t', all correct processes will suspect q, and then the strong completeness is satisfied.
This model holds strong completeness, and eventual strong accuracy.

Using atomic broadcast to get Consensus


Consensus can easily be implemented using atomic broadcast Atomic Broadcast requires that all correct processes deliver the same messages in the same order. The total order and agreement properties of Atomic Broadcast ensure that all correct processes deliver the same sequence of messages. Consensus and Atomic Broadcast are equivalent in asynchronous systems with crash failures.
  1. Atomic Broadcast cannot be solved by a deterministic algorithm in asynchronous systems, even if we assume that at most one process may fail, and it may only fail by crashing. This is because Consensus has no deterministic solution in such systems.
  2. Atomic Broadcast can be solved using randomisation or unreliable failure detectors in asynchronous systems. This is because Consensus is solvable using these techniques in such systems.
Consensus can be easily reduced to Atomic Broadcast as follows. To propose a value, a process atomically broadcasts it. To decide a value, a process picks the value of the first message that it atomically delivers. By total order of Atomic Broadcast, all correct processes deliver the same first message. Hence they choose the same value and agreement of Consensus is satisfied.

Using Consensus to get atomic broadcast


The atomic algorithm uses several executions of consensus to get atomic broadcast. Intuitively, the kth execution of Consensus is used to decide on the kth batch of messages to be atomically delivered. Processes disambiguate between these executions by tagging all the messages pertaining to the kth execution of Consensus with the counter k.



In this algorithm, a message is A-broadcast using R-broadcast primitive. When the message is R-delivered, it adds m to the set of R_deliveredp. When m is delivered atomically, it is added to A-deliveredp.

After m being R-delivered, the process p will wait for the consensus of other hosts (task 3) to decide which message should be delivered. If the consensus is reached, the message is A_delivered. The message is A-delivered in some deterministic order that was agreed by all processes, e.g., lexicographical order.

Failure detectors

In distributed computing, a failure detector is an application or a subsystem that is responsible for detection of node failures or crashes in a distributed system.
There are 2 mechanisms of failure detection:
  1. Heartbeat mechanism: The heartbeat mechanism monitors the connection between a manager and an agent, and are sent by the agent to indicate that it is alive. When the messages are not received, the manager can consider the agent faulty.
  2. Probe mechanism: Each node require to probe another node. Basically, one node sweeps all other nodes, and see who has replied. The probe mechanism is the origin of the failure detectors.
Failure detectors must guarantee safety and liveness.
  1. Safety: safety properties informally require that "something bad will never happen". Safety properties can be violated by a finite execution of a distributed system.
  2. Liveness: liveness properties informally require that "something good eventually happens". A liveness property cannot be violated in a finite execution of a distributed system because the "good" event might only theoretically occur at some time after execution ends. Eventual consistency is an example of a liveness property.
Eventual consistency is a consistency model used in distributed computing to achieve high availability that informally guarantees that, if no new updates are made to a given data item, eventually all accesses to that item will return the last updated value. A system that has achieved eventual consistency is often said to have converged.

If "perfect channels" are available, heartbeat exchanges meet strong accuracy and strong completeness. Such detector is called "Perfect" failure detector - if a node crashes, all correct nodes with notice the absence of the heartbeat. But in the internet, there is no perfect channel. If we knew how much packets will be lost in the network, lets say k packets, we just needed to send k + 1 packets to ensure a perfect detector. But this is not possible due to the lack of bounds on the number of communication faults that may happen in a channel. Perfect failure detectors cannot work on the internet, or in asynchronous systems. Therefore, there are weaker failure detectors models.

The explanation of the several types of failure detectors can be viewed in my post called " Unreliable failure detectors for reliable distributed systems"

Thursday, 6 November 2014

A Guided Tour on the Theory and Practice of State Machine Replication

Paper here and Paxos paper here

In this paper it is presented the fundamentals and applications of the State Machine Replication (SMR) technique for implementing consistent fault-tolerant services.
Service replication is a technique in which copies of the service are deployed in a set of servers. This technique is often used to: 1. increase the system performance and capacity 2. provide fault tolerance.
Replication can increase the service' performance and capacity since the replicas provide more resources to the service. Fault tolerance can be achieved with several replicas through the use of spatial redundancy to the system, making it continue to operate despite the failure of a fraction of these replicas.
Regarding the maintenance of consistent state in replicated systems, there are two fundamental approaches: primary-backup (or passive) replication and state machine (or active) replication.
In the primary-backup replication model there is one primary replica that executes all operations issued by the clients and, periodically, pushes state updates to the backup replicas. Furthermore, these replicas keep monitoring the primary to ensure that one of them takes over its role in case it fails. In the state machine replication model clients issue commands to all replicas, which execute them in a coordinated way and produce a reply. In this model, no monitoring and synchronization is necessary.

SMR

State machine replication is usually defined through a set of clients submitting commands to a set of replicas behaving as a "centralized" service, or a replicated state machine(RSM). This implementation holds 3 properties:
  1. Initial state: All correct replicas start on the same state;
  2. Determinism: All correct replicas receiving the same input on the same state produce the same output and resulting state;
  3. Coordination: All correct replicas process the same sequence of commands
From the practical point of view, Initial State and Determinism are not easy to ensure. In the former, there is complexity when replicas crash and return, and the states are outdated. For Determinism, it can constrain the service performance.
To satisfy Determinism in RSM, the service supported by the replicas is single threaded.
In RSM, it just answers client requests. It does not initiate communication.
The RSM must satisfy the following safety and liveness:
  1. Safety: all correct replicas execute the same sequence of operations (something bad will never happ);
  2. Liveness: all correct clients operations are executed. (something good eventually will happen.)
The Safety property allows the implementation of strongly consistent services, satisfying the consistency property known as Linearizability (single operations in single object. Writes should appear instantaneous. Linearizability for reads and writes is a synonymous of atomic consist).

Fault Models

2 Kind of faults are considered for the processes in SMR based system:
  1. Fail-stop: A faulty (crashed) process halts and does not execute any further operations during the system execution;
  2. Byzantine: A faulty process may behave arbitrarily, i.e., deviating from its algorithm and taking any action.
It is also important to define which kind of communication channels errors can affect the system. The main fault models for channels are:
  1. Reliable: every message sent will be delivered;
  2. Unreliable: messages can be modified, generated or dropped;
  3. Lossy: messages may be lost, but not modified;
  4. Fair: if a message is resent infinitely often, it will arrive infinitely often.
An important implicit assumption in replicated systems is that correlated failures will not happen, i.e., that the probability of a replica to fail is statistically independent from the probability of any other replica of the system to fail.

Synchrony

Asynchronous: The weakest synchrony model is the asynchronous distributed system. In this model, processing and communication have no time bounds, and there are no physical clocks. Therefore, in this model the notion of time does not even exists.
Synchronous: In these systems, there are known bounds in terms of processing, communication and clock error in different processes. This modified represents real-time systems.
Partially Synchronous: This model considers that the system behaves asynchronously until a certain (unknown) instant GST (Global Stabilization Time), in which the system becomes synchronous, respecting some (unknown) processing and communication time bounds. This system does not need to be synchronous forever, just until a defined point.

Cryptography

Byzantine fault-tolerant SMR protocols usually consider the existence of authenticated communication channels, which can only be implemented if either a Public-key infrastructure is available for supporting the use of asymmetric cryptography for message signatures or the existence of shared secrets between each pair of processes (which can be supported by a key distribution center like Kerberos) for enabling the use of Message Authentication Codes (MAC).

Fundamentals results of distributed computing

There are 5 fundamentals results from distributed computing theory that constrains the design of SMR protocols.
  1. (R1) Impossibility of reliable communication on top of unreliable channels.
  2. (R2) Equivalence between total order multicast and consensus
  3. (R3) Impossibility of fault-tolerant consensus in asynchronous systems
  4. (R4) Minimum synchrony required for fault-tolerant consensus.
  5. (R5) Fault thresholds.
For R1, TCP-IP is built over a best-effort protocol (IP), which gives no reliability guarantees. However, Writes the fair link assumption defined above, we can make a reliable channel by sending repeatedly the message until it is ACKed.
For R2, a fundamental requirement for implementing SMR is the coordination of replicas, which requires that all correct replicas receive the same messages in the same order. Conceptually, satisfying this requirement requires the implementation of a total order multicast (or atomic multicast) protocol. A fundamental result in distributed computing is the equivalence between the problem of implementing this kind of communication primitive and solving consensus (Using atomic broadcast to make consesus). In the consensus problem (Use consensus to make atomic broadcast), processes propose some value and try to reach agreement about one value to decide, which must be one of the proposed values (or the first one). This equivalence is important because it shows that the implementation of a RSM is subject to the same constraints and results of the well-studied consensus problem, which leads us to our next fundamental result.
For R3, one of the most well-known results in distributed computing is the impossibility of achieving (deterministic) consensus in an asynchronous system in which a single process can crash, because we do not know if a process has really crashed or it is slow. This can lead to problem that the consensus never terminate of can lead to different processes decide different values.
For R4, by separating safety and liveness, it is possible to solve consensus in a Partially Synchronous system model.
For R5, different system models require a different number of processes to implement consensus and total order broadcast tolerating up to f failures.
In the table we consider the synchronous and partially synchronous system models for crash, Byzantine and authenticated Byzantine failures. These results reflect the exact number of replicas a f-resilient RSM must contain in order to implement replica coordination. We complement these results with the minimal number of replicas required to execute commands after it is delivered in total order However, once the total order is established, f less replicas are required to execute the operations in both fault models.
Failure typeSynchronousPartially Synch.Replicated Execution
Fail-stopf+12f+1f+1
Auth. Byz.f+13f+12f+1
Byzantine3f+13f+12f+1

Baseline protocols

In this section it is described the Viewstamped replication and Paxos for implementing RSM. The main challenge in implementing RSMs is to ensure that the operations requested by clients are executed in the same order at all replicas in spite of concurrency and failures.

Paxos and Viewstamped Replication (VR)

VR was devised at the same time as Paxos, and it is similar to Paxos. VR works is an asynchronous network like internet, but deep down, it needs a way to make things synchronous in some way so that all processes vote for the same value.
The relevance of Paxos/VR comes from the fact this algorithm has served as the foundation for many recent replicated (consistent and fault-tolerant) data stores, coordination and configuration management system.
Paxos/VR requires n=2f+1 replicas, in which up to f can be subject to crash faults. The system must be able to execute a request without waiting for f replicas.
To ensure safety, the protocol must ensure that, even if these f replicas fail, there will be at least one replica that processed the request and that will participate in other protocol executions. This implies that any two quorums of replicas accessed in the protocol must intersect at least in one correct replica. Consequently, each step of the protocol must be processed by a quorum of at least f+1 replicas that, together with the other f that may or may not reply, compose the n.

Normal operation



  1. The client sends a Request with the operation to be executed to the leader replica;
  2. The leader chooses a sequence number i for the received request, writes it to its log, and disseminates the request and its sequence number to all other replicas in a PREPARE message;
  3. If the replicas did not assign any other request to sequence number i, they accept the leader proposal and write the update (request plus sequence number) to their log, replying with a PREPARE-OK message to the leader;
  4. The leader waits for f confirmations from other replicas and then executes the request and sends the Reply to the client. This ensures that a majority of the replicas have the request in their logs and, consequently, the request will be visible even if the leader fails;
  5. Normally, the leader informs the other replicas about this request execution in the next PREPARE message, making them also execute the client request for updating their states, without sending a reply to the client.
If a client does not receive a reply for a request within a given time period, it resends the request to all replicas, ensuring that it reaches a new leader.
VR shows two improvements when compared with Paxos for implementing RSMs (Multi-Paxos).
  1. In Paxos, the leader sends an explicit COMMIT message to make the other replicas learn that the request can be executed.
  2. Paxos explicitly considers the durability of the service, all accepted PREPARE messages are logged in stable storage.



We can still optimize VR by adding the following:
  1. Read-only operations can be executed directly by the leader without contacting the other replicas.
  2. if the replicas need to learn that the client request was executed, instead of having the explicit COMMIT message as in classical Paxos, they can send the PREPARE-OK messages to all other replicas, and each replica can execute the request if it receives f of these messages

View change

If the leader fails, messages will not be ordered and the system will stop executing client requests. To recover from this situation, non-leader replicas monitor the leader and if they suspect that it is faulty, a new leader is elected to ensure the algorithm makes progress.




The rationale behind this protocol is that in order for a request to be executed, it must be in the log of at least f+1 replicas. Consequently, the order assigned for a request is preserved by the view change protocol since the new leader collects f+1 logs

Recovery

When a replica restarts after a crash, it cannot participate in request processing and view changes until it has a state at least as recent as when it failed. The recovering works like this:
  1. The recovering replica sends a message to all other replicas asking for the current state;
  2. Each replica sends a reply with its log, among other information;
  3. The recovering replica waits for f+1 of such messages, including one from the current leader it discovered from the received messages. Then, it updates its state from the received leader log. At this point the replica is recovered and can resume execution.


Paxos

This summary comes from the Lamport's article "Paxos made simple". The paper is provided in the link at the top of the article. /safe Paxos is a family of protocols for solving consensus in a network of unreliable processors.
The Synod algorithm is the center piece of the Paxos algorithm. It is executed separately for each log index, i.e., one Synod execution for the first item in the log, another Synod execution for the second item in the log, and so on. Multiple Synod algorithms can be executed concurrently, without affecting each other's results.
In a nutshell, the Synod algorithm is an agreement (consensus) algorithm which guarantees that:
  1. all the nodes participating have the same output value,
  2. the output value is proposed by some node (not just an arbitrary value),
  3. if enough nodes can communicate with each other, eventually the algorithm will finish.
The algorithm is constructed such that any node can propose a value, and at the end it is assured that only one of those values is chosen. However, it is possible that different nodes will propose values at different times such that the algorithm runs forever. To assure the termination of the Synod algorithm, only the leader node can propose values.
In order to guarantee safety, Paxos defines three safety properties and ensures they are always held, regardless of the pattern of failures:
  1. Non-triviality: Only proposed values can be learned.
  2. Safety: At most one value can be learned (i.e., two different learners cannot learn different values).
  3. Liveness: If value C has been proposed, then eventually learner L will learn some value (if sufficient processors remain non-faulty).
To sum up, Paxos always guarantees "safety", and ensures "liveness" if a majority of the nodes are communicating with each other.
The processes can have 3 roles: proposers, acceptors, and learners.



This protocol is the most basic of the Paxos family. Each instance of the Basic Paxos protocol decides on a single output value. The protocol proceeds over several rounds. A successful round has two phases. A Proposer should not initiate Paxos if it cannot communicate with at least a Quorum of Acceptors.




Phase 1

Phase 1a: Prepare

A Proposer (the leader) creates a proposal identified with a number N. This number must be greater than any previous proposal number used by this Proposer. Then, it sends a Prepare message containing this proposal to a Quorum of Acceptors. The Proposer decides who is in the Quorum.

Phase 1b: Promise

If the proposal's number N is higher than any previous proposal number received from any Proposer by the Acceptor, then the Acceptor must return a promise to ignore all future proposals having a number less than N. If the Acceptor accepted a proposal at some point in the past, it must include the previous proposal number and previous value in its response to the Proposer. Otherwise, the Acceptor can ignore the received proposal. It does not have to answer in this case for Paxos to work. However, for the sake of optimization, sending a denial (Nack) response would tell the Proposer that it can stop its attempt to create consensus with proposal N.

Phase 2

Phase 2a: Accept Request

If a Proposer receives enough promises from a Quorum of Acceptors, it needs to set a value to its proposal. If any Acceptors had previously accepted any proposal, then they'll have sent their values to the Proposer, who now must set the value of its proposal to the value associated with the highest proposal number reported by the Acceptors. If none of the Acceptors had accepted a proposal up to this point, then the Proposer may choose any value for its proposal. The Proposer sends an Accept Request message to a Quorum of Acceptors with the chosen value for its proposal.

Phase 2b: Accepted

If an Acceptor receives an Accept Request message for a proposal N, it must accept it if and only if it has not already promised to only consider proposals having an identifier greater than N. In this case, it should register the corresponding value v and send an Accepted message to the Proposer and every Learner. Else, it can ignore the Accept Request. Note that an Acceptor can accept multiple proposals. These proposals may even have different values in the presence of certain failures. However, the Paxos protocol will guarantee that the Acceptors will ultimately agree on a single value. Rounds fail when multiple Proposers send conflicting Prepare messages, or when the Proposer does not receive a Quorum of responses (Promise or Accepted). In these cases, another round must be started with a higher proposal number. Notice that when Acceptors accept a request, they also acknowledge the leadership of the Proposer. Hence, Paxos can be used to select a leader in a cluster of nodes.

Key to the efficiency of this approach is that, in the Paxos consensus algorithm, the value to be proposed is not chosen until phase 2. After completing phase 1 of the proposer's algorithm, either the value to be proposed is determined or else the proposer is free to propose any value.