Friday, 27 November 2015

NetAgg: Using Middleboxes for Application-specific on-path aggregation in data centres

Many applications in data centres (DC) achieve horizontal scalability by adopting the aggregation pattern. This pattern can clog the network due to scarce inbound bandwidth or limited inter-rack bandwidth. In this paper, the authors propose NETAGG, a software middlebox platform that provides an on-path aggregation service.

A middlebox is a network appliance attached to a switch that provides services such as firewalls, web proxies, SSL offloading, and load balancing. To maximise performance, middleboxes are often implemented in hardware, and adopt a vertically integrated architecture focusing on a narrow function, e.g. processing standard packet headers or performing relatively simple payload inspection.

NETAGG provides application-specific aggregation functions that reduce data at each hop, which lowers the chance of a network bottleneck and ultimately improves performance. NETAGG uses shim layers at edge servers to intercept application traffic and redirect it transparently to the agg boxes, which are attached to network switches and perform on-path aggregation. In the topology shown in the paper, traffic flows from the top-of-rack (ToR) switches to an aggregation switch connected to a middlebox; the switch relays the traffic to the middlebox before it continues on its path.
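To make "application-specific aggregation function" concrete, here is a minimal sketch (my illustration, not code from the paper) of the kind of combiner an agg box could run for a word-count style job; merge_partials is a hypothetical name.

from collections import Counter

def merge_partials(partial_results):
    # Combine partial word counts from several upstream servers into a
    # single, smaller result before forwarding it along the path.
    combined = Counter()
    for partial in partial_results:  # each partial is a dict of word -> count
        combined.update(partial)
    return dict(combined)

# Three partial results collapse into one message:
# merge_partials([{"a": 2, "b": 1}, {"a": 1}, {"b": 3}]) -> {"a": 3, "b": 4}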

They have tested NETAGG with Apache Hadoop and observed that the intermediate data size was reduced considerably, by a factor of 5x on 128 GB of intermediate data. In terms of time, NETAGG shortens the shuffle and reduce phases by a factor of 4.5x, which in turn reduces processing time as well as disk I/O. The only case in which NETAGG gives no advantage is when the job does not reduce the data, as in a sorting job.

In conclusion, NETAGG is an on-path aggregation service that transparently intercepts aggregation flows at edge servers using shim layers and redirects them to aggregation nodes (agg boxes). Since NETAGG reduces the amount of data transferred between switches, the overall completion time of MapReduce jobs can be reduced.

Cloudlab testbed

CloudLab is a distributed testbed similar to GENI that comprises around 5,000 cores and 300-500 terabytes of storage. CloudLab provides 2x10 Gbps network interfaces to every node through software-defined networking. The infrastructure contains a 100 Gbps full-mesh SDN that lets users build a wide range of in-cluster experimental topologies, e.g., fat trees, rings, and hypercubes. CloudLab provides two major types of storage: per-server storage (a mix of high-performance flash and high-capacity magnetic disks, at a ratio of about one disk per four cores) and a centralized storage system. This storage mix enables a range of experiments with file systems, storage technologies, and big data, while providing reliable file systems to researchers who are not interested in storage experiments.

CloudLab is federated with GENI. This means that GENI users can access CloudLab with their existing accounts, and CloudLab users have access to all of the hardware resources federated with GENI. CloudLab sites interconnect with each other via IP and Layer-2 links to regional and national research networks and the core GENI network. A single experiment can also include GENI Racks (small clusters distributed across the United States).

CloudLab currently offers three sites with several types of hardware available, located at the University of Utah, the University of Wisconsin, and Clemson University.

Wednesday, 25 November 2015

Improving availability in distributed systems with failure informers

When a distributed system acts on failure reports, the system's correctness and availability depend on the granularity and semantics of those reports. Availability also depends on coverage (failures are reported), accuracy (reports are justified), and timeliness (reports arrive quickly). Availability is a paramount concern of distributed applications, and it is important that systems recover quickly when a failure happens. Existing mechanisms for reporting failures are coarse-grained, lack coverage, lack accuracy, or do not handle latent failures.

The paper presents Pigeon, a service for reporting host and network failures to highly available distributed applications. Pigeon detects failures accurately and reports them to the application so that the system can recover promptly. Pigeon classifies failures into four types, along two dimensions: whether the problem has certainly occurred or is merely expected and imminent, and whether the target is certainly and permanently stopped or not.
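As a rough illustration of that classification (my sketch, not Pigeon's actual interface), the four report types can be seen as the combinations of two properties:

from enum import Enum

class FailureReport(Enum):
    # First field: has the problem certainly occurred, or is it expected/imminent?
    # Second field: is the target certainly and permanently stopped, or not?
    CERTAIN_STOP = ("certain", "stopped")
    CERTAIN_NOT_STOPPED = ("certain", "not stopped")
    EXPECTED_STOP = ("expected", "stopped")
    EXPECTED_NOT_STOPPED = ("expected", "not stopped")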

Pigeon has several limitations and operating assumptions. First, Pigeon assumes a single administrative domain (but there are many such networks, including enterprise networks and data centers). Second, Pigeon requires the ability to install code in the application and in network routers (but doing so is viable in single administrative domains). Third, for Pigeon to be most effective, the administrator or operator must perform environment-specific tuning (but this needs to be done only once).

Pigeon uses sensors that must detect faults quickly and confirm critical faults; the latter requirement ensures that Pigeon does not incorrectly report stops. The architecture accommodates pluggable sensors, and the prototype includes four types: a process sensor and an embedded sensor at end-hosts, and a router sensor and an OSPF sensor in routers.

There is also a component called the interpreter that gathers information about faults and outputs failure conditions. The interpreter must (1) determine which sensors correspond to the client-specified target process, (2) determine whether a condition is implied by a fault, (3) estimate the condition's duration, (4) report the condition to the application via the client library, and (5) never falsely report a stop condition.

Pigeon is not suitable for applications like DNS, where client recovery is lightweight: there is little benefit over using short end-to-end timeouts, since the cost of inaccuracy is low. Some applications do not make use of any information about failures; such applications likewise do not gain from Pigeon. For example, NFS (on Linux) has a hard-mount mode in which the NFS client blocks until it can communicate with its NFS server; this client does not expose failures or act on them.

Tuesday, 24 November 2015

Introduction to Celery

Celery is a distributed system for processing messages on a task queue, with a focus on real-time processing and support for task scheduling. Celery communicates with a message broker and distributes work. Here is a boilerplate example of Celery.

#project/celery.py
from celery import Celery
from django.conf import settings

# Single Celery app instance for the project, pointed at the message broker.
app = Celery('appname', broker=settings.BROKER_URL)
# Read Celery configuration from the Django settings module.
app.config_from_object('django.conf:settings')
# Find tasks.py modules in all installed Django apps.
app.autodiscover_tasks(settings.INSTALLED_APPS)


celery -A appname worker

app is the Celery instance for the whole project, which means there is only one Celery app per project. The appname is the name of the application, and the broker is the URL of the message broker to which the workers will connect.

Celery provides an @app.task decorator that registers a function as a task with the Celery app.

@app.task
def generate_thumbnails_task(image_path, sizes):
    # make_thumb is assumed to be an image helper defined elsewhere in the project.
    for size in sizes:
        make_thumb(image_path, size)

Celery is used mostly for:

  1. Generating assets after upload
  2. Notifying a set of users when an event happens
  3. Keeping a search index up to date
  4. Replacing cronjobs (backups, cleanup)

The message broker delivers tasks to the consumers. A consumer is a process that takes tasks from the queue and delegates them to a worker pool to be processed.
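For example, calling a task with delay() serializes it as a message, hands it to the broker, and some worker eventually picks it up and executes it (reusing the thumbnail task from above; the module path is an assumption):

# e.g. in a Django view
from myapp.tasks import generate_thumbnails_task  # assumed module path

# .delay() sends a message to the broker and returns immediately;
# a worker process will execute the task asynchronously.
generate_thumbnails_task.delay('/media/uploads/photo.jpg', [(128, 128), (256, 256)])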

Celery was made to work with RabbitMQ, although it can work with other message brokers such as Redis. RabbitMQ implements AMQP (Advanced Message Queuing Protocol), which delivers and tracks message acknowledgments. The AMQP result backend spawns an Erlang process and a queue for each task, which is very expensive. So, in some cases it is useful to store the results of previously executed tasks in a key/value store instead.

#settings.py
CELERY_RESULT_BACKEND = 'redis://:password@host:port/db'

Some tasks return data, like this one:

@app.task
def add(a, b):
    return a + b

So, to track the state of a task and its return value when execution has finished, a "result backend" is used.
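With a result backend configured, the return value can be fetched later through the AsyncResult handle that delay() returns, for example:

result = add.delay(2, 3)   # returns an AsyncResult immediately

result.ready()             # True once a worker has finished the task
result.get(timeout=10)     # blocks until the result is available, then returns 5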

When using Celery, it is necessary to define the number of workers. To avoid hurting performance, it is recommended to use n+1 workers, where n is the number of cores.

celery worker -A appname -c 5
celery worker -A appname -P eventlet -c 1000
celery worker -A appname -P gevent -c 1000

There are some pitfalls to watch out for when using Celery:

  1. Parallel Safety
  2. Poor Task Granularity
  3. Misguided Optimizations

In the case of parallel safety, you need to write tasks so that they are safe to run in parallel. Since you have no control over when tasks run, you need to plan for it. If you are trying to disallow certain tasks from running in parallel because of possible deadlocks, you are focusing on the wrong problem: it means you need to design your application better. Don't pass objects as task parameters; use simple values or serializable formats like JSON or YAML, and be careful not to have parallel transactions in the database, because they can produce wrong results.

The example below passes a report_id and only generates the PDF if there is no up-to-date version already, so when two requests run in parallel, only one of them will actually do the work.

#tasks.py
from datetime import datetime

from project.celery import app
from project.utils import create_pdf_from_html
from project.models import Report  # assumed location of the Report model

@app.task
def generate_pdf_report(report_id):
    report = Report.objects.get(id=report_id)
    if report.pdf_last_modified < report.html_last_modified:
        pdf_data = create_pdf_from_html(report.html)
        now = datetime.now()
        Report.objects.filter(id=report_id).update(
            pdf_data=pdf_data, pdf_last_modified=now)

When creating a task, it is necessary to get its granularity right. A task that is too big is hard to read and maintain, takes many seconds or even minutes to run, and is harder to scale horizontally. When you design tasks around small units of work, you can execute them in parallel. Still, don't parallelize everything, because some problems can only run sequentially.

#tasks.py

from project.celery import app

from forum.utils import notify_user
from forum.models import ForumPost

@app.task
def notify_forum_users(post_id):
    post = ForumPost.objects.get(id=post_id)
    users = post.parent_thread.users.all()
    for user in users:
        notify_user(user)

Granularity Problem



#tasks.py

from project.celery import app

from forum.utils import notify_user
from forum.models import ForumPost
from django.contrib.auth.models import User  # assumed user model

@app.task
def notify_forum_users(post_id):
    post = ForumPost.objects.get(id=post_id)
    users = post.parent_thread.users.all()
    for user in users:
        # Delegate each notification to its own small task.
        notify_user_task.delay(user.id)

@app.task
def notify_user_task(user_id):
    user = User.objects.get(id=user_id)
    notify_user(user)

Granularity Solution

When optimizing Celery, don't just turn up the number of workers without measuring: with too much concurrency, resources become constrained. As with most code, network and storage I/O tend to be the bottleneck. Also, profile your code if it is slow and look for slow database queries and storage- or network-intensive work.

These notes came from bit.ly/1k1fVLz.

Monday, 23 November 2015

Participatory Networking: An API for Application Control of SDNs

This paper presents an API called PANE, implemented by an OpenFlow controller, that delegates read and write authority to end users and allows applications and end-hosts to control software-defined networking (SDN) bandwidth. Without security guarantees and limits on authority, participatory networks would be places of anarchy: to be usable, such networks must provide isolation for their independent principals, preventing malicious users from consuming and controlling bandwidth, dropping packets, or worse. While isolation could be provided through network virtualization, the authors believe it is not always the right abstraction, as it hides the fundamentally shared nature of networks. PANE delegates read and write authority, with optional restrictions, from the network's administrators to users, or to applications and hosts acting on their behalf. The API has to safely decompose control and visibility of the network, and to resolve conflicts between participants and across requests.

PANE is designed for networks within a single administrative domain, including corporate WANs, datacenters, campus or enterprise networks, and home networks. In such networks there is, logically, a single owner from which authority can be delegated. Conflicts arise naturally in a participatory network, as PANE is designed to allow multiple, distributed principals to author the network configuration. PANE represents user requests in Hierarchical Flow Tables (HFTs), and to resolve conflicts the policies use a conflict-resolution operator that takes two conflicting requests as input and returns a single resolved request. For instance, if two requests demand bandwidth guarantees of 10 Mbps and 30 Mbps, the conflict is resolved by granting the higher bandwidth.
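A minimal sketch of such a conflict-resolution operator for bandwidth guarantees (my illustration in Python; not PANE's actual implementation) could be:

def resolve_bandwidth(request_a_mbps, request_b_mbps):
    # Resolve two conflicting bandwidth reservations by granting
    # the higher guarantee, as in the 10 Mbps vs. 30 Mbps example.
    return max(request_a_mbps, request_b_mbps)

# resolve_bandwidth(10, 30) -> 30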

PANE uses multiple such operators, which allow HFTs to resolve different types of conflicts using independent logic: (1) conflicts between requests within the same share, (2) conflicts between requests in parent and child shares, and (3) conflicts between sibling shares.

A network can be partitioned by a failure of switches or routers. The PANE controller considers two types of failures: the failure of network elements and the failure of the controller itself.

In the first case, when a switch or link fails, or when a link's configuration changes, the PANE runtime must recompile the policy tree into new individual switch flow tables, since previously used paths may no longer be available or acceptable. In the second case, PANE can keep a database-like persistent log of accepted requests that is periodically compacted by removing expired requests. Upon recovery, the PANE controller can restore its state from this log.

They tested several applications with PANE to check the benefits of the solution. One of the tests used Hadoop MapReduce, where they augmented Hadoop 2.0.3 to work with the PANE API. By using PANE, Hadoop was able to reserve guaranteed bandwidth for certain operations: the first set of reservations occurs during the shuffle, where each reducer reserves bandwidth for transferring data from the mappers, and the second set reserves bandwidth for writing the final output. These few reservations improved overall performance by 19%.