Final Exam Study Guide

The three-hour study guide for the final exam

Paul Krzyzanowski

December 2023

Disclaimer: This study guide attempts to touch upon the most important topics that may be covered on the exam but does not claim to necessarily cover everything that one needs to know for the exam. Finally, don't take the three hour time window in the title literally.

Last update: Wed Dec 13 16:49:54 EST 2023

Introduction

Distributed computing has been around for almost as long as computing itself. What makes distributed systems more interesting now than they were when data networking became mainstream in the 1980s and even more widely used than in the early years of the Internet throughout the 1990s? Several advances in various areas of computing technology and connectivity had a profound effect on the value and design of distributed systems. Since computer networking went mass market in the 1980s, local area network speeds increased by a factor of a thousand and wide-area (Internet) speeds by even more. Connectivity within a local area network (LAN) moved from shared to switched networking, allowing the network to scale without increasing congestion. On the wide area network, Internet access has become available to the population at large, not just to researchers on Department of Defense projects, providing us with worldwide connectivity.

In 1965, Intel co-founder Gordon Moore predicted that the number of transistors on integrated circuits, and hence processor performance, would double approximately every two years. This prediction, known as Moore’s Law, has turned out to hold (approximately) true for over fifty years. The performance increases were somewhat fudged over time since processor speeds haven’t increased in recent decades. Instead, we turned to more cores and specialized processor cores (such as graphics processors, image signal processors, and cryptographic processors – using these is called heterogeneous computing). Processor performance, system memory, and disk capacity have also increased by more than a thousandfold over the past couple of decades.

Even though these improvements make it easier to store, process, and move data between systems, what are the motivations for distributing computing?

There are several. If we need higher throughput (more performance), we can scale horizontally or vertically. Vertical scaling is replacing a system with a more powerful one. Horizontal scaling is adding more systems to distribute the load. With a single computer, performance does not scale linearly with price, and we quickly reach a point where we simply cannot get a faster computer or higher-capacity disk; with a collection of computers, linear scaling may be possible. Secondly, distributing systems makes sense in certain environments. For example, databases may reside in different locations than the user. Thirdly, we may need to interact with other people or remote data that are geographically dispersed. Metcalfe’s “Law” states that the value of a telecommunications network is proportional to the square of the number of connected users of the system. As with Moore’s Law, this isn’t a real law, of course. The “square” part comes from the fact that the number of edges in a fully-connected graph is proportional to the square of the number of vertices. A vertex represents a person and an edge represents the communication path from one person to another. Simply put, there is a lot of value in the ability to communicate with many people. Without it, services such as TikTok, Twitter, eBay, Instagram, Facebook, and many others would not be nearly as useful.

Multiprocessors

An architecture where multiple identical processors communicate with a single shared memory is called a multiprocessor.

Multiprocessor systems are characterized by four features: (1) the processors all share the same memory, (2) they all share the same clock, (3) they share the same operating system, and (4) they exhibit an all-or-nothing property to system failure. What this last item means is that if the system is dead, none of the processors are working. With a multicomputer system, it is certainly possible to have one computer functioning while another is not.

The most common architecture for a multiprocessor system is symmetric multiprocessing, or SMP. In this kind of system, all processors are connected to the same shared memory and run the same operating system. No one processor has faster or prioritized access to memory or system peripherals than any other.

Multicomputers

Systems without shared memory are collections of separate computers, each with its own memory and running its own instance of an operating system. The systems do not share a clock and cannot be expected to have the same time of day or execute programs at the exact same time and speed as one another.

Without shared memory, these systems must rely on a network to communicate and are sometimes referred to as networked computers or multicomputers. Multicomputers are the building blocks of distributed systems.

Fault tolerance

While we do not expect our personal computers to fail at any given time, failure is a fact of life in distributed systems. It’s simply a matter of statistics: if you have a collection of thousands of systems, it is very likely that on any given day something goes wrong: a computer or disk dies, a switch goes bad, a network cable is unplugged, or something loses power.

Should our local computer die, it is an all-or-nothing failure: the entire system is dead. In distributed systems, one system may fail while others continue to work. This is a partial failure. Partial failures can be insidious. If one computer sends a message to another and does not get a reply, what happened? It could be any or all of several things: the system is slow to respond, the receiving process failed, the entire receiving computer failed, a network connection was broken, or a routing issue incurred huge delays.

Handling failure is one of the central themes of distributed system design. We need to be able to handle detection, recovery, and restart.

Detection
Identify the cause of the failure
Recovery
Services - the distributed algorithms - need to work around the failure and continue functioning properly. This might involve starting a service on a different system, electing a new coordinator, or stopping any attempts at communicating with the failed system.
Restart
At some point, the failed element may be brought back into the distributed system. It may be moments later, when a network cable is reinserted, or a far longer time if, for example, a failed power supply or motherboard needs to be replaced. Regardless of the elapsed time, the restarted system needs to reintegrate itself into the whole system. It may have missed messages and its knowledge of the world is outdated. For example, it may have hosted a replicated object store and missed getting information about new, updated, and deleted objects.

Fault tolerance needs to address both availability and reliability. Availability refers to the fraction of time that the system as a whole is usable. Since individual systems may fail, we achieve high availability via redundancy: deploying duplicate, triplicate (and more) systems. The design of redundant systems has to consider all of the three aforementioned points. Failure to properly address the restart of a system may create consistency problems, where one replicated server returns different data than another one.

Redundancy assumes that the overall system is designed to tolerate the failure of some components. For instance, if we have two systems, each with a downtime probability of 5%, and need only one to be functioning, the probability that both systems will be down at the same time is P(A and B) = P(A) × P(B), or 5% × 5% = 0.25%. Uptime is simply 100% - downtime. By adding a redundant component, we increased the uptime from 95% to 100% - 0.25% = 99.75%.

The converse of redundancy is when we design a system that requires all components to function. With the same 5% downtime as in the previous example, the probability that the overall system is down (that is, at least one of the two components has failed) is 100% - P(system A is up AND system B is up), which is 1 - (1 - 5%) × (1 - 5%), or 1 - 0.95 × 0.95 = 9.75%. Uptime is 1 - downtime, so in this case, we have an uptime of 90.25% versus 95% for a single system. As we depend on more and more systems, the probability of some system being down approaches 100%.

These examples illustrate series systems versus parallel systems. A series system fails if any of its components fail while a parallel system fails only if all of its components fail. We want to avoid designing series systems.
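
A small Python sketch of this arithmetic, assuming independent failures and the 5% per-component downtime used in the examples above:

```python
# Availability arithmetic for the examples above, assuming independent
# failures and a 5% downtime (95% uptime) per component.
p_up = 0.95

# Parallel (redundant) pair: the system is down only if BOTH components fail.
parallel_up = 1 - (1 - p_up) ** 2      # 1 - 0.05 * 0.05 = 0.9975

# Series pair: the system is down if EITHER component fails.
series_up = p_up ** 2                  # 0.95 * 0.95 = 0.9025

print(f"single component:         {p_up:.2%}")          # 95.00%
print(f"redundant (parallel) pair: {parallel_up:.2%}")   # 99.75%
print(f"series pair:               {series_up:.2%}")     # 90.25%
```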

Reliability may be confused with availability but refers to how long a system can function before it fails. A system that fails frequently but recovers quickly can be highly available yet not reliable. Reliability also deals with the integrity of data. We can have systems that appear to be functioning well but transmit incorrect data. Or we might have malicious interference where an intruder is sending messages to confuse the systems. We will address message integrity with error detecting and correcting codes or with cryptographic hashes and message authentication codes but we may also need to address issues of message authentication.

Failure can manifest itself in different ways:

Fail-stop
With fail-stop failure, the failed component simply stops functioning. Ideally, it will be able to detect its own failure and notify other members of the system first, but this is often not possible. Halting refers to the explicit case where a component stops without notice. We can try to detect failed components by sending messages over a network and setting timeouts for a response. Unfortunately, this is not foolproof because network latency is variable, and the response might arrive after our timeout. Moreover, we can have problems with network connectivity between some hosts.
Fail-restart
Fail-restart is when a component restarts after a failure. The restart may be nearly instantaneous, so that other systems do not notice, or it may occur after a long interval. A concern with fail-restart behavior is stale state: the restarted component may have missed messages and hence has a view of the world that is now obsolete even though it is now functioning correctly.
Omission
Omission failure deals with networking. It is the failure to send or receive messages. This can be due to data corruption, queue overflows in routers, or overflows in the receive buffer in the operating system. An omission failure may cause a query or its response to get dropped, resulting in one system assuming that another one has failed.
Timing
With asynchronous networks such as IP, messages may take longer to arrive than expected. This can lead us to assume that a system is not responding and hence not functioning when it actually is operating. Another problem that is based on timing is that each system has its own clock and hence its own concept of the time of day. This can create undesirable behavior with process coordination, message ordering, and system logs.
Partition
A network of computers may be working, but a link between two groups of systems may fail. For example, an Ethernet switch connecting two racks may fail or a cable between them may get disconnected. In this case, the network fragments into two or more sub-networks that cannot communicate with each other. Each group of systems thinks the other group is dead.
Byzantine
Byzantine failures cover any failures where a component does not cease to function but instead produces faulty data. This can be due to bad hardware, software logic errors, network problems or it can be due to malicious interference. To a large extent, we will address byzantine failures on the network with the use of cryptography.

Regardless of the type of failure, a basic goal in distributed systems is to design a system that avoids a single point of failure. This is the case where one component is crucial to the functioning of the entire system. For example, we might have one process on one system that serves as a coordinator to dispatch and check on computation taking place on thousands of other systems or a single computer that keeps track of where various blocks of data live in a distributed storage system. A failure of such coordinators effectively causes the entire system to fail.

Global state

In a distributed environment, it helps for one process to know what other systems are doing. For instance, a process may need to know the currently active members of a group of processes that hold replicated data. A problem with distributed systems design is that nobody has the true global state of a system. Because we lack shared memory, we cannot instantaneously see the data or check the liveness of other processes. Any data that changes over the execution of a program is referred to as state. For example, state may be lists of live processes, group members, database contents, computation progress, lists of processes that have remote files open, etc.

A process obviously knows its own state. It can periodically report its state to other processes via network messages and it may receive updates from other processes over the network. However, these updates are neither continuous nor instantaneous. A process will only know the last reported state of other processes, which may not be equivalent to the current state of those processes.

One type of state is not just information about group membership or the status of processes but the data stored by network file systems, databases, and object stores. This data is shared among systems that are designed to act as replicas – redundant systems that will give us instantaneous backups in case one system dies or allow us to load balance requests among multiple systems. Here we also have to deal with the fact that not all processes will be updated instantaneously.

A restricted form of replication is a cache. A cache is simply local storage of frequently accessed data to reduce access latency. Instead of making a network request, a process can store a copy of the results. For example, a process may store the result set of common database queries. Caching can do a wonderful job in improving performance but also poses the risk of stale data – cached copies of data that are no longer valid since the original data has been modified.

Software

One goal in some distributed systems software is providing a single-system image. This is software that makes a collection of independent computers appear as a single system to the users. This involves hiding the fact that the work is distributed among multiple systems. The software should “just work.” A few areas where we want to provide transparency include:

Location
The user should not be aware of where the software is actually running or where resources reside.
Migration
The user should not be aware that the location of resources may have moved from one place to another or that a process was restarted on another computer, possibly in another data center.
Replication
The user should not be aware that data might be replicated for fault tolerance or for proximity to provide faster access.
Concurrency
The user should not be aware that multiple processes might be accessing resources at approximately the same time. Results should appear as if all the processes ran one after another in some order. This means that some processes might be temporarily locked from being able to access a set of resources while another process is using them.

Service models

In software design, we often turn to layered architectures, where we break up application functionality into multiple layers of abstraction. Each layer presents well-defined interfaces and hides the specifics of its implementation. For example, a typical computer system has an operating system that provides well-defined access to system resources, middleware that is linked to the application as a set of libraries that abstract things such as message encoding, communication, encryption, and database access, and various layers of abstraction created by the application designer.

With network systems, we often experience similar layers of abstraction but this time across systems. When our network-based software architecture mimics a layered design, we use autonomous processes that communicate with each other via a network interface rather than procedure calls. Each such abstraction layer is known as a tier in a multi-tier model. It is a generalization of a client-server model.

Centralized
The original, non-networking computing model is a centralized one, where all computing takes place on a single system.
Client-server
This is the dominant model of interaction in a networked system and is the foundation of many other models. One application, called the client (and usually run by the end user), requests something from another system, called a server. The server provides a service. Examples of this are a web browser (client) requesting a web page from a web server, a mail application (client) accessing a mail server to get mailbox contents, or a print server being given content to print. In this model, clients communicate with the server and not with other clients. The model can be enhanced with multiple “layers,” or services, to mimic a layered architecture, resulting in a multi-tier system.
Peer-to-peer
A peer-to-peer architecture employs a collection of systems, all running identical software, any of which can talk to any other. These applications are peers and are often run by a collection of end users rather than some service provider. The name peer implies that there is no leader: applications all have equal capabilities. An appealing aspect of a peer-to-peer design is self-scalability. As more and more computers join the collection of peers, the system has more peers to do the work and can thus handle a large workload. Examples of peer-to-peer architectures are BitTorrent, Skype, and Amazon Dynamo.
Hybrid
A difficulty with peer-to-peer architectures is that one often needs to do things such as keep track of peers, identify which system can take on work or has specific content, and handle user lookup and authentication. This led to a variation of the peer-to-peer model where a coordinator, a central server, is in place to deal with these centralized needs. However, the peers still handle all the bandwidth-intensive or compute-intensive work.

Networking

Goal: Enable computers to communicate with each other; create the illusion of machine-to-machine and process-to-process communication channels.

Without shared memory, we need a way for collections of systems (computers or other endpoint devices) to communicate. To do so, they use a communication network.

If every communicating pair of hosts had a dedicated physical connection between them, then there would be no sharing of the network infrastructure and we would have a true physical circuit. This is not practical since it limits the ability for arbitrary computers to talk with each other concurrently. It is also incredibly wasteful in terms of resource use: the circuit would be present even if no data is flowing.

What is needed is a way to share the network infrastructure among all connected systems. The challenge in doing so is to allow these systems to talk but avoid collisions, the case when two nodes on a network transmit at the same time, on the same channel, and on the same connection. Both signals then get damaged and data is not transmitted, or is garbled. This is the multiple access problem: how do you coordinate multiple transmitters on a network to ensure that each of them can send their messages?

There are three broad approaches that enable us to do this:

  1. Channel partitioning divides the communication channel into “slots”. If the network is divided into short, fixed-length time slots, we have Time Division Multiplexing, or TDM. Each host must communicate only during its assigned time slot. Routers may connect multiple networks together. When two hosts need to communicate, they establish an end-to-end route called a virtual circuit. It is called a virtual circuit because the setup of this route configures routers and assigns communication slots. This provides the illusion of a true circuit switched network in that all messages are guaranteed to arrive in order with constant latency and a guaranteed bandwidth. The switched telephone network is an example of virtual circuit switching, providing a maximum delay of 150 milliseconds and digitizing voice to a constant 64 kbps data rate.

    If the network is partitioned into frequency bands, or channels, then we have Frequency Division Multiplexing, or FDM. This defines a broadband network. Cable TV is an example of a broadband network, transmitting many channels simultaneously, each using a well-defined frequency band.

    The problem with a channel partitioning approach is that it is wasteful. Network bandwidth is allocated even if there is nothing to transmit.

  2. Taking turns requires that we create some means of granting permission for a system to transmit. A polling protocol uses a master node to poll other nodes in sequence, offering each a chance to transmit their data. A token passing protocol circulates a special message, called a token, from machine to machine. When a node has a token, it is allowed to transmit and must then pass the token to its neighbor.

    The problem with the taking turns approach is that a dead master or lost token can bring the network to a halt. Handling failure cases is complex. This method was used by networks such as IBM’s Token Ring Network but is largely dead now.

  3. A random access protocol does not use scheduled time slots and allows nodes to transmit at arbitrary times in variable size time slots. This technique is known as packet switching. Network access is accomplished via statistical multiplexing. A data stream is segmented into multiple variable-size packets. Since these packets will be intermixed with others, each packet must be identified and addressed. Packet switched networks generally cannot provide guaranteed bandwidth or constant latency. Ethernet is an example of a packet-switched network.

Packet switching is the dominant means of data communication today. The packets in a packet-switched network are called datagrams and are characterized by unreliable delivery with no guarantees on arrival time or arrival order. Each datagram is fully self-contained with no reliance on previous or future datagrams. This form of communication is also known as connectionless service. There is no need to set up a communication session and hence no concept of a connection. Neither routers nor endpoints need to maintain any state as they have to do with a virtual circuit; there is no concept of where a system is in its conversation.

OSI reference model

Data networking is generally implemented as a layered stack of several protocols — each responsible for a specific aspect of networking. The OSI reference model defines seven layers of network protocols. Some of the more interesting ones are: the network, transport, and presentation layers.

 1. Physical
Deals with hardware, connectors, voltage levels, frequencies, etc.
 2. Data link
Sends and receives packets on the physical network. Ethernet packet transmission is an example of this layer. Connectivity at the link layer defines the local area network (LAN).
 3. Network
Relays and routes data to its destination. This is where networking gets interesting because we are no longer confined to a single physical network but can route traffic between networks. IP, the Internet Protocol, is an example of this layer.
 4. Transport
Provides a software endpoint for networking. Now we can talk application-to-application instead of machine-to-machine. TCP/IP and UDP/IP are examples of this layer.
 5. Session
Manages multiple logical connections over a single communication link. Examples are TLS (Transport Layer Security and its predecessor, SSL, the Secure Sockets Layer) tunnels, remote procedure call connection management, and HTTP 1.1.
 6. Presentation
Converts data between machine representations. Examples are data representation formats such as XML, JSON, XDR (for ONC remote procedure calls), NDR (for Microsoft COM+ remote procedure calls), and ASN.1 (used for encoding cryptographic keys and digital certificates).
 7. Application
This is a catch-all layer that includes every application-specific communication protocol. For example, SMTP (sending email), IMAP (receiving email), FTP (file transfer), HTTP (getting web pages).

Data link layer

Ethernet and Wi-Fi (the 802.11 family of protocols) are the most widely used link-layer technologies for local area networks. Both Wi-Fi and Ethernet use the same addressing format and were designed to freely interoperate at the link layer.

Ethernet provides packet-based, in-order, unreliable, connectionless communications. It occupies layers one and two of the OSI model. There is no acknowledgment of packet delivery. This means that a packet may be lost or mangled and the sender will not know. Communication is connectionless, which means that there is no need to set up a path between the sender and receiver and no need for either the sender or receiver to maintain any information about the state of communications; packets can be sent and received spontaneously. Messages are delivered in the order they were sent. Unlike IP-based wide-area networking, there are no multiple paths that may cause a scrambling of the sequence of messages.

Interfaces communicating at the link layer must use link-layer addressing. A MAC address (for example, an Ethernet address) is different from, and unrelated to, an IP address. An Ethernet MAC address is globally unique to a device and there is no expected grouping of such addresses within a local area network. IP addresses on a LAN, on the other hand, will share a common network prefix.

Network layer: IP Networking

The Internet Protocol (IP) is a network layer protocol that handles the interconnection of multiple local and wide-area networks and the routing logic between the source and destination. It is a logical network whose data is transported by physical networks (such as Ethernet, for example). The IP layer provides unreliable, connectionless datagram delivery of packets between nodes (e.g., computers).

The key principles that drove the design of the Internet are:

  1. Support the interconnection of networks. The Internet is a logical network that spans multiple physical networks, each of which may have different characteristics. IP demands nothing of these underlying networks except an ability to try to deliver packets.

  2. IP assumes unreliable communication. That does not mean that most packets will get lost! It means that delivery is not guaranteed. If reliable delivery is needed, software on the receiver will have to detect lost data and ask the sender to retransmit it. Think of mail delivery: most mail gets to its destination but once in a while, a letter gets lost or takes a really long time to arrive.

  3. Routers connect networks together. A router is essentially a dedicated computer with multiple network links. It receives packets from one network and decides which outgoing link to send the packet.

  4. No central control of the network. The precursor of the Internet was the ARPAnet, and was built to connect companies and universities working on Department of Defense projects. As such, it was important that there wouldn’t be a single point of failure – a key element that could be taken out of service to cause the entire network to stop functioning.

Since IP is a logical network, any computer that needs to send out IP packets must do so via the physical network, using the data link layer. Often, this is Ethernet, which uses a 48-bit Ethernet address that is completely unrelated to a 32-bit IP address (or a 128-bit IPv6 address if IPv6 is used). To send an IP packet out, the system needs to identify the link layer destination address (MAC, or Media Access Control address) on the local area network that corresponds to the desired IP destination (it may be the address of a router if the packet is going to a remote network). The Address Resolution Protocol, or ARP, accomplishes this. It works by broadcasting a request containing an IP address (the message asks, do you know the corresponding MAC address for this IP address?) and then waiting for a response from the computer with the corresponding IP address. To avoid doing this for every outgoing packet, ARP maintains a cache of most recently used addresses. Wi-Fi uses the same addressing format as Ethernet and can bridge with Ethernet networks to form a single local area network.

Transport layer: TCP and UDP

IP is responsible for transporting packets between computers. The transport layer enables applications to communicate with each other by providing logical communication channels so that related messages can be abstracted as a single stream at an application.

There are two widely-used transport-layer protocols on top of IP: TCP and UDP.

TCP (Transmission Control Protocol) provides reliable byte stream (connection-oriented) service. This layer of software ensures that packets arrive at the application in order and lost or corrupt packets are retransmitted. The transport layer keeps track of the destination so that the application can have the illusion of a connected data stream.

UDP (User Datagram Protocol) provides datagram (connectionless) service. While UDP drops packets with corrupted data, it does not ensure in-order delivery or reliable delivery.

Port numbers in both TCP and UDP are used to allow the operating system to direct the data to the appropriate application (or, more precisely, to the communication endpoint, or socket, that is associated with the communication stream).

TCP tries to give a datagram some of the characteristics of a virtual circuit network. The TCP layer will send packet sequence numbers along with the data, buffer received data in memory so they can be presented to the application in order, acknowledge received packets, and request a retransmission of missing or corrupt packets. The software will also keep track of source and destination addresses (this is state that is maintained at the source and destination systems). We now have the illusion of having a network-level virtual circuit with its preset connection and reliable in-order message delivery. What we do not get is constant latency or guaranteed bandwidth. TCP also implements flow control to ensure that the sender does not send more data than the receiver can receive. To implement this, the receiver simply sends the amount of free buffer space it has when it sends responses. Finally, TCP tries to be a good network citizen and implements congestion control. If the sender gets a notification of a certain level of packet loss, it assumes that some router’s queue must be getting congested. It then lowers its transmission rate to relieve the congestion.

The design of the Internet employs the end-to-end principle. This is a design philosophy that states that application-specific functions should, whenever possible, reside in the end nodes of a network and not in intermediary nodes, such as routers. Only if the functions cannot be implemented “completely and correctly,” should any logic migrate to the network elements. An example of this philosophy in action is TCP. TCP’s reliable, in-order delivery and flow control are all implemented in software on the sender and receiver: routers are blissfully unaware of any of this.

A related principle is fate sharing, which is also a driving philosophy of Internet design. Fate sharing states that it is acceptable to lose the state information associated with an entity if, at the same time, the entity itself is lost. For example, it is acceptable to lose a TCP connection if a client or server dies. The argument is that the connection has no value in that case and will be reestablished when the computer recovers. However, it is not acceptable to lose a TCP connection if a router in the network dies. As long as alternate paths for packet delivery are available, the connection should remain alive.

Acknowledgments

To achieve reliable delivery on an unreliable network, we rely on detecting lost or corrupted packets and requesting retransmissions.

The simplest possible mechanism is to send a packet and wait for the receiver to acknowledge it … then send the next one and wait for that to get acknowledged. This, unfortunately, is horribly inefficient since only a single packet is on the network at any time. It is more efficient to use pipelining and send multiple packets before receiving any acknowledgments. Acknowledgments can arrive asynchronously and the sender needs to be prepared to retransmit any lost packets.

It would be a waste of network resources for the TCP layer to send back a packet containing nothing but an acknowledgment number. While this is inevitable in some cases, if the receiver happens to have data to transmit back to the sender, the acknowledgment number is simply set in the TCP header of the transmitted segment, completely avoiding the need to send a separate acknowledgment. Using an outgoing data segment to transmit an acknowledgment is known as a piggybacked acknowledgment.

TCP also uses cumulative acknowledgments. Instead of sending an acknowledgment per received message, TCP can acknowledge multiple messages at once.

Sockets

Sockets are a general-purpose interface to the network provided to applications by the operating system. By this, we mean that they were not designed to support one specific network but rather provide a generic mechanism for inter-process communication. Since the operating system provides access to devices, sockets are the only way that an application can interact with the network. Any higher-level communication abstractions are built on top of sockets. We will look at sockets at the system call level. With minor differences, the socket-related system calls are similar among Windows, macOS, Linux, and pretty much every other operating system.

A socket is a communication channel on which a process can read and write data on the network. Each socket is created with the socket system call and assigned an address and port number with the bind system call. For connection-oriented protocols (e.g., TCP), a socket on the server can be set to listen for connections with the listen system call. The accept call blocks until a connection is received, at which point the server receives a socket dedicated to that connection. A client establishes a connection with the connect system call. The “connection” is not a configuration of routers as with virtual circuits; it is just data (state) that is maintained by the transport layer of the network stack in the operating system at both endpoints. After this, sending and receiving data is compatible with file operations: the same read/write system calls can be used. When communication is complete, the socket can be closed with the shutdown or close system calls.
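
As a rough illustration, here is a minimal sketch of that call sequence using Python’s socket module, which wraps these system calls. The port number is arbitrary, and the server and client halves would run as separate processes.

```python
import socket

# --- Server process: socket -> bind -> listen -> accept -> read/write -> close
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)   # TCP socket
server.bind(("", 12345))          # arbitrary port for this example
server.listen(5)                  # mark the socket as accepting connections
conn, addr = server.accept()      # blocks until a client connects
data = conn.recv(1024)            # read the request
conn.sendall(data)                # echo it back
conn.close()
server.close()

# --- Client process: socket -> connect -> write/read -> close
client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
client.connect(("localhost", 12345))
client.sendall(b"hello")
print(client.recv(1024))          # b'hello'
client.close()
```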

With sockets that use a connectionless protocol (e.g., UDP), there is no need to establish a connection or to close one. Hence, there is no need for the connect, listen, or shutdown system calls. The sendto and recvfrom system calls were created to send and receive datagrams since the read and write system calls do not enable you to specify the remote address. sendto allows you to send a datagram and specify its destination. recvfrom allows you to receive a datagram and identify who sent it.
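
A corresponding sketch for a connectionless (UDP) socket, again with an arbitrary port number and with the two halves running as separate processes:

```python
import socket

# --- Receiver process: no listen, accept, or connect is needed.
receiver = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)   # UDP socket
receiver.bind(("", 12346))                     # arbitrary port for this example
data, sender_addr = receiver.recvfrom(1024)    # blocks until a datagram arrives
receiver.sendto(b"got it", sender_addr)        # reply to whoever sent it

# --- Sender process: sendto names the destination in each call.
sender = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sender.sendto(b"hello", ("localhost", 12346))
reply, server_addr = sender.recvfrom(1024)
print(reply)                                   # b'got it'
```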

Protocol encapsulation

We saw that if we want to send an IP packet out on an Ethernet network (IP is a logical network, so there is no physical IP network), we needed to send out an Ethernet packet. The entire IP packet becomes the payload (data) of an Ethernet packet. Similarly, TCP and UDP have their own headers, distinct from IP headers (they both need a port number and checksum, for example). A TCP or UDP packet is likewise treated as data by the IP layer. This wrapping process is known as protocol encapsulation. It enables protocol layers to stay distinct. TCP headers, for example, could live within IPv4 or IPv6, which are two different versions of the IP protocol with different headers and address sizes. TCP doesn’t care what’s under it. Similarly, the IP layer doesn’t care what it’s carrying: whether it’s TCP, UDP, RSVP, or any other transport layer protocol. IP headers and their encapsulated data don’t care if they are being pushed around over Ethernet on a local area network or SONET (Synchronous Optical Network) on a wide area network.
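
As a rough illustration of encapsulation, the sketch below builds a UDP datagram inside an IP packet inside an Ethernet frame. It assumes the third-party scapy library; the address and port are placeholder values.

```python
# Requires the third-party scapy package (pip install scapy).
from scapy.all import Ether, IP, UDP, Raw

# Each layer's packet simply becomes the payload of the layer below it.
frame = Ether() / IP(dst="192.0.2.1") / UDP(dport=9999) / Raw(b"application data")
frame.show()   # displays the nested Ethernet, IP, and UDP headers plus the payload
```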

Remote Procedure Calls

Goal: Provide a layer of abstraction for process-to-process communication that enables a process on one system to invoke a function, or service, on another system without having to deal with the issues of network communication, formatting data, and parsing responses.

One problem with the interface offered by sockets is that it provides a send-receive model of interaction. However, most programs use a functional (procedure call) model. Remote procedure calls are a programming language construct (something provided by the compiler, as opposed to an operating system construct such as sockets). They provide the illusion of calling a procedure on a remote machine. During this time, execution of the local thread stops until the results are returned. The programmer is relieved of the burden of packaging data, sending and receiving messages, and parsing results.

The illusion of a remote procedure call is accomplished by generating stub functions. On the client side, the stub (sometimes known as a proxy) is a function with the same interface as the desired remote procedure. Its job is to take the parameters, marshal them into a network message, send them to the server, await a reply, and then unmarshal the results and return them to the caller. On the server side, the stub (sometimes known as a skeleton) is responsible for being the main program that registers the service and awaits incoming requests for running the remote procedure. It unmarshals the data in the request, calls the user’s procedure, and marshals the results into a network message that is sent back to the recipient.
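
The sketch below gives a rough idea of what a generated client stub does. It does not correspond to any particular RPC framework; the host, port, and JSON wire format are made up for illustration.

```python
import json
import socket

SERVER = ("rpc.example.com", 5000)   # hypothetical host and port of the service

def add(a, b):
    """Client stub: looks like an ordinary local function to the caller."""
    # Marshal the procedure name and parameters into a network message.
    request = json.dumps({"proc": "add", "args": [a, b]}).encode()
    with socket.create_connection(SERVER) as s:
        s.sendall(request)
        s.shutdown(socket.SHUT_WR)        # signal that the request is complete
        reply = s.makefile("rb").read()   # wait for the server's response
    # Unmarshal the result and return it as if the call had been local.
    return json.loads(reply)["result"]
```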

There are a few hurdles to overcome in implementing remote procedure calls:

Parameter passing
Most parameters in our programs are passed by value. That is easy to do remotely: just send the data in a network message. Some parameters, however, are passed by reference. A reference is a memory address of the parameter. The problem with passing this is that memory is local and the memory address passed from a client to a server will now refer to memory on the server rather than to the contents on the client. There is no good solution to this except to understand the structure of the data that is being referenced, send it to the remote side (pass by value), where it will be placed in a temporary memory buffer. A local reference can then be passed to the server function. Because the contents might have been modified by the function, the data will need to be sent back to the calling client and copied back to its original location.
Marshalling
All data that is sent needs to be represented as a series of bytes that can be placed into one or more network messages. This is known as marshalling. Not only must any data structure be sent in a serialized format: a sequence of bytes with no pointers, but the format of this marshalled data must be standardized between the client and server so that the server can make sense of the data it receives and vice versa. Different processors and languages may use different conventions for integer sizes, floating point sizes and formats, placement of most significant bytes, and alignment of data.
The marshalled data that is sent over the network may contain data that makes it self-describing: identifying the individual parameters by name and type. Such a format is known as explicit typing. JSON and XML formats are examples of this. In contrast, implicit typing does not send such information and requires the remote side to know the precise expected sequence of parameters. A short sketch contrasting the two formats appears after this list.
Generating stubs
Since most languages do not support remote procedure calls natively, something has to generate client and server stubs. That is often a stand-alone program known as an RPC compiler. The RPC compiler takes an interface specification as input and generates client-side stubs (proxies) and a server-side stub (skeleton). The interface specification is written in an interface definition language (IDL) and defines remote classes, methods, and data structures. It contains all the information that the RPC compiler needs to generate stub functions.
Looking up services
A client process needs to find out how to set up a network connection to an appropriate service that hosts the remote procedures: which host and port to use. An RPC name server is a network service that a client can communicate with to query the host and port of a desired remote interface. The client sends the server a name identifying the interface. That “name” is often a number that uniquely identifies the service that hosts a set of functions on the server. The server returns a host and port number for the service that implements the functions. In many cases, the name server resides on the machine where the remote procedures run and the server will return only the port number. When a service starts up, it will register its interfaces with the RPC name server.
Handling failures
We don’t have a concept of local procedure calls not working. With remote calls, however, problems can arise. The server can stop working or network connectivity may break or experience unexpected delays. These may prevent or delay requests reaching the server or responses reaching the client. To combat this, RPC libraries may attempt to retransmit requests if a response is not received in time. This may have the side effect of invoking a procedure more than once if the network is slow. In some cases, no harm is done in doing this.
Functions that may be run multiple times without undesirable side-effects are called idempotent functions. Functions that have undesirable side-effects if run multiple times (e.g., transfer $500 from my checking account to my savings account) are called non-idempotent functions.
Most RPC systems offer at least once semantics, in which case a remote procedure will be executed one or more times (if there are network delays) or at most once semantics, in which case a remote procedure library will avoid resending the procedure request even if it does not get a timely response. Software that uses remote procedure calls has to be prepared to deal with errors that can arise from problems in contacting or getting a response from a remote procedure.
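
As promised under marshalling above, here is a small illustration of explicit versus implicit typing, using JSON for the self-describing form and a packed binary layout (in the spirit of XDR) for the implicit form. The field names and layout are made up for illustration.

```python
import json
import struct

# Explicit typing: a self-describing message; names and types travel with
# the values, as in JSON or XML payloads.
explicit = json.dumps({"proc": "add",
                       "a": {"type": "int", "value": 2},
                       "b": {"type": "int", "value": 3}})

# Implicit typing: just the values, packed in an agreed-upon order and format
# (two 32-bit big-endian integers here); the receiver must already know the
# exact parameter sequence, as with XDR.
implicit = struct.pack("!ii", 2, 3)

print(explicit)         # human-readable but larger
print(implicit.hex())   # compact binary: 0000000200000003
```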

Remote Procedure Calls: case studies

Sun (ONC) RPC

ONC (Open Network Computing) RPC (usually called Sun RPC because of its origins at Sun Microsystems) was one of the first RPC systems to achieve widespread use, thanks to the early popularity of Sun workstations, servers, and particularly the Network File System (NFS). It is still in use on virtually all UNIX-derived systems (Linux, macOS, *BSD, SunOS). It uses an RPC compiler called rpcgen that takes input from an interface definition language (IDL) file. This is a file that defines the interfaces to the remote procedures. The interfaces are a set of functions that could be called by clients, including their parameters and return types. The programmer can also define multiple versions of an interface. This is useful since services may evolve over time but one cannot always count on all clients getting updated; some clients may still call functions on the old interfaces, which might take different parameters or have different names. From this IDL file, rpcgen creates client stub functions and a server stub program. These can be compiled and linked with the client and server functions, respectively.

A programmer must assign a program number to each interface, that is, each set of server functions in the IDL file. This is a 32-bit number that must be a unique ID on that server.

When the server starts up, it binds a socket to any available port and registers that port number and interface’s program number with a name server, known as the portmapper, running on the same machine. A client, before it can invoke any remote procedure calls, contacts the portmapper on the specified server with the program number to find the port to which it needs to send its requests.

The choice of transport protocol, UDP or TCP, can be specified at run-time. All incoming and return parameters are marshalled into a standard format called XDR, or eXternal Data Representation. XDR is a binary format that uses implicit typing.

DCE RPC

The Distributed Computing Environment, defined by the Open Group, created its own flavor of RPC which was similar to Sun RPC. They also had the programmer specify an interface in an IDL, which they called the Interface Definition Notation (IDN).

To avoid the problem of having a programmer choose a unique 32-bit identifier for the interface, DCE RPC provides the programmer with a program called uuidgen. This generates a unique universal ID (UUID) – a 128-bit number that is a function of the current time and Ethernet address.

The Distributed Computing Environment also introduced the concept of a cell, which is an administrative grouping of machines. Each cell has a cell directory server that maintains information about the services available within the cell.

Each computer in the cell knows how to contact its cell directory server. When a server program starts up under DCE RPC, it registers its port and the interface’s UUID with a local name server (the DCE host dæmon, dced, which is similar to the portmapper we find on Linux and BSD systems).

The server program also registers the UUID-to-host mapping with the cell directory server. This allows for location transparency for services: a client does not need to know what machine a service lives on a priori. The client queries the cell server with the UUID of the interface to find the system on which the desired service is running. Then it contacts the local name server to get the port number and transport type on which to make requests.

A standard way of encapsulating data (marshalling) is crucial since encodings may differ between different machines. Sun defined a standard format called XDR (eXternal Data Representation). Every participating system must convert (marshal) data into this format. DCE defined a format called NDR (Network Data Representation). However, instead of creating a single set of definitions, NDR defines a set of data representations that can be used. The hope is that the sender can find a format that will require minimal or no data conversion (hence, greater efficiency). If the client uses the same system architecture, it also will not need to convert data. In the worst case, only one side will need to convert data. This is known as a multi-canonical approach to data conversion.

As object-oriented languages gained popularity in the late 1980s and 1990s, RPC systems like Sun’s and DCE’s proved incapable of handling some object-oriented constructs, such as object instantiation or polymorphism (different functions sharing the same name, with the function distinguished by the incoming parameters). Creating objects, in particular, requires memory allocation on the server along with cleanup when these remote objects are no longer needed. This cleanup is called distributed garbage collection. A new generation of RPC systems dealt with extending RPC to support objects.

Microsoft COM+/DCOM & ORPC (MS-RPC)

Microsoft already had a mechanism in place for dynamically loading software modules, called components, into a process. It was known as COM, the Component Object Model, and provided a well-defined mechanism for a process to identify and access interfaces within the component. The same model was extended to invoke remotely-located components and became the Distributed Component Object Model (DCOM), later fully merged with COM and called COM+.

Because remote components cannot be loaded into the local process space, they have to be loaded by some process on the remote system. This process is known as a surrogate process. It runs on the server (under the name dllhost.exe), accepting remote requests for loading components and invoking operations on them.

COM+ is implemented through remote procedure calls. The local COM object simply makes RPC requests to the remote implementation. Microsoft enhanced the DCE RPC protocol slightly to create Object RPC (ORPC). DCE is compatible with Microsoft’s Object RPC.

Object RPC is essentially DCE RPC with the addition of support for an interface pointer identifier (IPID). The IPID provides the ability to identify a specific instance of a remote class. Interfaces are defined via the Microsoft Interface Definition Language (MIDL) and compiled into client and server side stubs. MIDL is a slight extension to DCE RPC’s IDN. The client-side stub becomes the local COM object that is loaded on the client when the object is activated by the client program. The remote, server-side, COM object is loaded by the server’s surrogate process when first requested by the client. Like DCE, ORPC supports multi-canonical data representation.

Since objects can be instantiated and deleted remotely, the surrogate process needs to ensure that there isn’t a build-up of objects that are no longer needed by any client. COM+ accomplishes this via remote reference counting. This is an explicit action on the part of the client where the client sends requests to increment or decrement a reference count on the server. When the reference count for an object drops to zero, the surrogate process deletes that object.

To guard against programming errors or processes (or computers) that terminated abnormally, a secondary mechanism exists, called pinging. The client must periodically send the server a ping set – a list of all the remote objects that are currently active. If the server does not receive this information within a certain period, it deletes the objects. This is a form of leasing, where the object expires if a lease is not renewed periodically. However, there is a subtle difference. With leasing, the lifetime of an object is generally renewed whenever an object is accessed, so there is no need for the client to ping the server to renew a lease unless it has not accessed the object for the duration of the lease.

Java RMI

When Java was created, it was designed to be a language for deploying downloadable applets. In 1995, Sun extended Java to support Remote Method Invocation (RMI). This allows a programmer to invoke methods on objects that are resident on other JVMs.

Since RMI is designed for Java, there is no need for OS, language, or architecture interoperability. This allows RMI to have a simple and clean design. Classes that interact with RMI must simply play by a couple of rules. All parameters to remote methods must implement the serializable interface. This ensures that the data can be serialized into a byte stream (marshalled) for transport over the network.

Serialization is a core aspect of marshalling: converting data into a stream of bytes so that it can be sent over a network or stored in a file or database. All remote classes must extend the remote interface. A remote interface is one whose methods may be invoked from a different Java virtual machine. Any class that is defined as extends Remote can be a remote object. Remote methods within that class must be capable of throwing a java.rmi.RemoteException. This is an exception that the client RMI library will throw if there is a communication error in calling the remote method.

RMI provides a naming service called rmiregistry to allow clients to locate remote object references. These objects are given symbolic names and looked up via a URI naming scheme (e.g., rmi://cs.rutgers.edu:2311/testinterface).

Java’s distributed garbage collection is somewhat simpler than Microsoft’s COM+. Instead of reference counting, it uses a form of leased-based garbage collection. There are two operations that a client can send to the server: dirty and clean. When the first reference to a remote object is made, the client JVM sends a dirty message to the server for that object. As long as local references exist for the object, the client will periodically send dirty messages to the server to renew the lease on the object. When the client’s JVM’s garbage collector detects that there are no more references to the object, it sends a clean message for that object to the server. Should the client exit abnormally, it will not renew its lease by refreshing the dirty call, so the lease will expire on the server and the server will destroy the object.

RPyC

RPyC (short for Remote Python Call) is a transparent and symmetric remote procedure call library for Python, allowing applications to execute functions on remote machines. It seamlessly extends local Python code to invoke procedures on remote instances. It is transparent in that there are no interface definition files, name servers, HTTP servers, or special syntax involved. It is symmetric because there isn’t a strict distinction between which process is operating as a client and which process is running as a server; either process can handle requests and replies. Methods must be explicitly tagged as exposed for them to be available to other processes.

Unlike other RPC systems, RPyC supports passing objects by value or reference. Immutable data types (such as strings, integers, tuples) are passed by value: they are serialized and sent to the other side. Other objects are passed by reference. A reference becomes a special proxy object that behaves just like the actual object but the proxy sends operations to the remote process – effectively a remote procedure call in the other direction.
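
A minimal RPyC sketch, assuming the rpyc package is installed; the service name, method, and port number are arbitrary. Only methods prefixed with exposed_ are callable by the remote side.

```python
# server.py — run in one process
import rpyc
from rpyc.utils.server import ThreadedServer

class CalculatorService(rpyc.Service):
    def exposed_add(self, a, b):      # tagged as exposed, so clients may call it
        return a + b

if __name__ == "__main__":
    ThreadedServer(CalculatorService, port=18861).start()
```

```python
# client.py — run in another process
import rpyc

conn = rpyc.connect("localhost", 18861)
print(conn.root.add(2, 3))    # invoked on the remote service; prints 5
conn.close()
```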

Web Services

In the late 1990s, the web browser became the dominant model for user interaction on the Internet. It used HTTP, the Hypertext Transfer Protocol, over TCP for requests and responses and formatted web pages with HTML, the Hypertext Markup Language. While this created a good user experience, dealing with fully-formatted pages was not good for programmatic access to data within these pages. Presentation (tables, images, text formatting) was a major component of the content. Approaches such as site scraping were often employed, where a program would request and receive an HTML page and then parse through tons of formatting directives to access the data it needed.

What we wanted was remotely-hosted services that programs, not users, can access. This enables machine-to-machine (m2m) communication. Remote procedure calls would seem to be a natural choice for this but they also had some problems when used on the Internet outside of an organization’s LAN:

  1. Failure is a fact of life in distributed systems. RPC frameworks tried to hide it as much as possible. For example, the framework might eventually time out waiting for a response and then generate an exception. Programs needed more flexibility for handling failure. If you don’t get a rapid response from a server, perhaps you’d like to try sending the request to another server instead of waiting.

  2. Because of the convenience of “you don’t need to pick a port”, existing RPC solutions typically ran services over an arbitrary range of ports where the operating system selected an unused port and the service registered it with an RPC name server. This led to an administrative nightmare where an administrator could not set a firewall rule to allow or block a specific port, instead being forced to accept a wide range of traffic.

  3. Even though some RPC solutions were designed to support multiple languages and operating systems, most RPC systems were really deployed with a limited set of environments in mind. Sun RPC did not work on IBM’s flavor of UNIX and DCE RPC did not work on Sun or Linux systems. Microsoft’s services were difficult to use outside of the Microsoft ecosystem. Some cross-platform solutions, such as CORBA, were sold by multiple vendors and were not always interoperable.

  4. It turns out that we often need more than RPC’s request-response style of interaction. For example, we might want to implement a publish-subscribe interface where a client requests to be informed when a certain event takes place. The server will, at future times, send messages informing of these events. In many cases, we just want to send and receive messages but with structured data.

  5. RPC systems were designed with local area networks in mind. This meant that clients expected low latency to the server. High latency to remote, loaded servers could lead to excessive retries (which generates even more server load) as well as clients giving up and returning a failure because a response was too slow to arrive.

  6. Security was often not much of a consideration given that RPC frameworks were designed with trusted, local environments in mind. While various mechanisms were added, they were specific to a particular RPC framework.

  7. Finally, state management was somewhat of an issue. Although RPC does not require that servers store client state, a distributed object model makes this the norm. Large-scale deployments could get bogged down by the memory use of objects created to service requests that have not yet been garbage collected.

Web services are a set of protocols by which services can be published, discovered, and used over the Internet in a technology neutral form. This means that they are designed to be language and architecture independent. Applications will typically invoke multiple remote services across different systems, sometimes offered by different organizations.

The general principles of web services are:

  • Use text-based payloads, called documents, marshalling all data into formats such as XML or JSON. This ensures that the marshalling format does not favor a specific processor architecture or programming language. It also ensures that content-inspecting firewalls do not cause problems.

  • Use HTTP over TCP/IP for transport. This allows us to use existing infrastructure: web servers, firewalls, and load balancers. It also enables the use of security mechanisms that are built into the web framework, such as TLS (transport layer security) connections, and digital certificate-based authentication.

  • Tolerate high latency. Servers are likely not to be on a local area network and may be slow to respond either due to their own load or due to network latency. This means that, where possible, programmers should strive for asynchronous interactions: dispatch a request and see if you can do something else useful before you get the response.

  • Tolerate a large number of clients. Applications that interact with web services tend to be more loosely-coupled than those that use distributed objects (remote procedure calls). Services may be run by different organizations and servers cannot count on well-behaved clients or a small number of them. When possible, the ideal design is a stateless one where each client request contains all the necessary data and the server does not have to store any state between requests. Documents, the unit of message exchange in web services, tend to be self-describing. That is, they will identify and itemize all the parameters (explicit typing) and also describe any additional state needed for the interaction.

The use of web services leads to a programming model called Service Oriented Architecture (SOA). Under SOA, an application is the integration of network-accessible services where each service has a well-defined interface. These services, or components, are unassociated and loosely coupled. By unassociated we mean that neither service depends on the other one; they are all mutually independent. By loosely coupled we mean that neither service needs to know about the internal structure of other services. To maintain this independence, web services forgo object creation and the distributed garbage collection that goes along with it.

Functionally, you can do anything with web services that you can with distributed objects (RPC). The differences are generally philosophical. Web services focus on document exchange and are designed with high latency in mind. Document design is central to web services. Distributed objects tend to look at the world in a way where interfaces are the key parts of the design. The data structures (“documents”) passed in these interfaces just package the data for use by them.

XML RPC

XML-RPC was created in 1998 as a simple protocol that marshals all requests and responses into XML messages. It is essentially a marshalling protocol and the standard does not define an IDL or stub function generator. There are a lot of libraries to support XML RPC and some languages implement it more transparently than others. XML-RPC is just a messaging format. Nothing in the spec has support for remote objects, object references, or garbage collection. The protocol was designed when the dominant vision of web services was that of RPC-like interactions. This turned out not to always be the case. For example, one might want to implement the publish-subscribe model we mentioned earlier, where a client would subscribe to receive published notifications of specific events from a server.
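For example, Python includes XML-RPC support in its standard library. A minimal sketch (the function name and port number are arbitrary choices for this illustration):

# Server: register a function and serve XML-RPC requests over HTTP
from xmlrpc.server import SimpleXMLRPCServer

def add(a, b):
    return a + b

if __name__ == "__main__":
    server = SimpleXMLRPCServer(("localhost", 8000))
    server.register_function(add, "add")
    server.serve_forever()

# Client (in another process): the library marshals the call and result into XML messages
#   import xmlrpc.client
#   proxy = xmlrpc.client.ServerProxy("http://localhost:8000/")
#   print(proxy.add(2, 3))     # sent as a <methodCall> XML document over HTTP; prints 5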

SOAP

XML RPC took an evolutionary fork and, with the support of companies such as Microsoft and IBM, evolved into SOAP, the Simple Object Access Protocol. The acronym has since been deprecated since SOAP is neither simple nor confined to accessing objects. XML-RPC is a subset of SOAP. SOAP added extensible data types to the limited set available in XML-RPC. In addition to remote procedure calls, SOAP added support for general-purpose messaging (sending, receiving, and asynchronous notification of messages). SOAP invocations are always XML messages that are usually sent over HTTP. However, HTTP transport is not a requirement; you can send a SOAP message via email using SMTP (the Simple Mail Transport Protocol).

SOAP services can be described via a Web Services Description Language (WSDL) document. This is another XML document that essentially serves as an interface definition and defines all the names, operations, parameters, destination, and format of requests. WSDL is somewhat complex for human consumption. Typically, one creates an interface definition in a language such as Java and then uses a tool to translate that definition into a WSDL document. That WSDL document can then be fed to another tool (often by another programmer) to generate code that can be used to invoke remote functions or send remote messages.

In addition to WSDL, SOAP was also augmented with a directory service for storing and serving information about web services. The service is called UDDI, for Universal Description, Discovery, and Integration and uses SOAP to communicate, providing WSDL documents as a result. UDDI never really achieved widespread popularity or deployment.

JAX-WS: Java Web Services

As web services became popular, quite a few services to support them were created for the Java platform. One of the more popular ones, and one supported by Oracle (the owner of Java), is JAX-WS (Java API for XML Web Services). The goal of JAX-WS is to let Java programs invoke web services in the same manner as Java RMI. Unlike traditional Java RMI, interoperability is important since the remote side (client or server) may not necessarily use Java. JAX-WS uses SOAP and WSDL to achieve platform independence.

REST

REST (REpresentational State Transfer) is a departure from the approach of SOAP. Instead of using HTTP simply as a conduit for sending and receiving XML messages where everything you need is contained in the body, REST incorporates itself into the HTTP protocol. In particular, the URL incorporates the request and list of parameters. The protocol itself defines the core nature of the operation:

  • HTTP POST: create something
  • HTTP GET: read something
  • HTTP PUT: update/replace something
  • HTTP PATCH: update/modify something (sometimes redundant with PUT)
  • HTTP DELETE: delete something

The body of the message will contain the document, which will be formatted data and not, as in the case of SOAP, a structure that also identifies the operations to be performed on the document.
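From a client's perspective, this mapping might look like the following sketch, which uses the third-party requests library against a hypothetical /users resource (the URL and fields are invented for illustration):

import requests

BASE = "https://api.example.com"          # hypothetical service

# Create: POST a JSON document describing the new resource
r = requests.post(f"{BASE}/users", json={"name": "Alice", "email": "alice@example.com"})
user_id = r.json()["id"]                  # assumes the service returns the new resource's id

# Read: the URL identifies the resource; the HTTP verb says what to do with it
r = requests.get(f"{BASE}/users/{user_id}")

# Update: PUT replaces the document, PATCH modifies part of it
requests.put(f"{BASE}/users/{user_id}", json={"name": "Alice", "email": "alice@new.example.com"})
requests.patch(f"{BASE}/users/{user_id}", json={"email": "alice@new.example.com"})

# Delete
requests.delete(f"{BASE}/users/{user_id}")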

When web services were first developed, the obvious marshalling format to use was XML. It was structurally similar to HTML, which was used for describing the content of web pages (although HTML was not particularly strict about proper structure). Its use was adopted for XML-RPC and SOAP and it remains heavily in use.

However, XML turned out to be a rather text-heavy protocol that was complex to parse. A lightweight alternative that gained much popularity is JSON (JavaScript Object Notation). Despite the JavaScript in the name, it was designed to be language-independent and easy to parse. It was touted as the “fat-free alternative to XML.”

Even more efficient is the use of Google Protocol Buffers. This is a binary marshalling protocol and is not always suited for web services over HTTP but is phenomenally efficient for local services and for storing serialized data (e.g., saving objects in a file system).

Clock synchronization

Goal: Enable clocks on multiple machines to be synchronized to the same time of day.

We would like our computers to have a knowledge of the current time of day. By this, we mean UTC time, Coordinated Universal Time (or Temps Universel Coordonné), formerly called Greenwich Mean Time. This is the international standard time of day. From this, we can present time in the user’s local time zone and adjust for daylight saving time. Note that clients or users accessing a computer could be in a different time zone. We want a consistent time so we can make sense of timestamps on file data, mail messages, databases, and system logs. These timestamps can be used for software development environments (know what still needs to be compiled), versioning tools, and database queries.

Keeping track of time is difficult. No two clocks tick in perfect synchrony with each other. Quartz oscillators, which drive the timekeeping mechanisms of clock circuits, are not consistent over time and no two tick at exactly the same rate. The difference between two clocks at any given instant is the clock offset. The rate at which the clocks are drifting is the clock drift. The variation of network delay is called jitter. Jitter is useful for assessing the consistency of our interaction with servers.

We can correct for drift simply by changing the value of a system’s clock to reflect that of UTC time. However, we do not want to provide the illusion of time moving backward to any process that is observing the clock. A linear drift compensation function adjusts the rate at which time is measured on a computer (e.g., the number of ticks that make up a second).

Cristian’s algorithm sets the time on a client to the time returned by the server plus an offset that is one-half of the transit time between the request and response messages: Tclient = Tserver + ½(Treceived - Tsent).

It also allows one to compute the maximum error of the new time stamp. The error is ± ½[(round-trip time) - (best-case round-trip time)]. Errors are additive. If you incur an error of ±50 msec and the server’s clock source has an error of ±80 msec, your clock’s error is now ±130 msec.
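A small sketch of this computation in Python (the timestamps and best-case round-trip time below are invented for illustration):

def cristian(t_sent, t_received, t_server, best_case_rtt=0.0):
    """Set the client clock from one request/response exchange."""
    rtt = t_received - t_sent              # round-trip time measured at the client
    new_time = t_server + rtt / 2          # assume symmetric network delay
    max_error = (rtt - best_case_rtt) / 2  # +/- error bound on the new time
    return new_time, max_error

# Example: request sent at 10.000 s, reply received at 10.090 s,
# server reported 10.120 s, best-case round trip is 0.010 s
t, err = cristian(10.000, 10.090, 10.120, best_case_rtt=0.010)
# t = 10.165, err = +/- 0.040, plus whatever error the server's clock carries (errors are additive)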

The Berkeley algorithm does not assume the presence of a server with an accurate time (i.e., one that keeps track of UTC time). Instead, one system is chosen to act as a leader, or coordinator. The leader requests the time from all systems in the group (including itself) and computes a fault-tolerant average. A fault-tolerant average is an arithmetic average that is computed from the largest subset of time values that don’t differ from each other by some predefined quantity. It then sends each machine an offset by which to adjust its clock.
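One simple way to sketch the leader's computation, assuming it has already gathered clock samples from the group (the tolerance and sample values are made up, and the subset selection here is a simplification of the pairwise rule described above):

def fault_tolerant_average(times, tolerance):
    """Average the largest subset of values that lie within `tolerance` of one of the samples."""
    best = []
    for candidate in times:
        subset = [t for t in times if abs(t - candidate) <= tolerance]
        if len(subset) > len(best):
            best = subset
    return sum(best) / len(best)

times = {"A": 3.00, "B": 3.25, "C": 2.75, "D": 9.90}     # D's clock is clearly off
avg = fault_tolerant_average(list(times.values()), tolerance=1.0)
offsets = {host: avg - t for host, t in times.items()}   # each host adjusts its clock by its offset
# avg = 3.0; D is excluded from the average; offsets = {'A': 0.0, 'B': -0.25, 'C': 0.25, 'D': -6.9}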

The Network Time Protocol, NTP, was created to allow a large set of machines to synchronize their clocks. A set of machines act as time servers. This collection of machines is known as the synchronization subnet. The subnet is hierarchical, with the time server’s stratum defined as the number of hops it is from a machine that synchronizes from a direct time source.

Machines that are directly connected to a time source (e.g., a GPS receiver) are at stratum 0. Machines that synchronize from a system at stratum 0 are at stratum one, and so on. A clock is synchronized essentially the same as with Cristian’s algorithm. The formula for NTP (and SNTP) is

time_offset = ½ (T2 - T1 + T3 - T4)

where

T1 is the time the message left the client,
T2 is the time it arrived at the server,
T3 is the time the response left the server,
and T4 is the time that it arrived at the client.

If we let TS = ½(T2+T3) then we arrive at Cristian’s formula.
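In code, one exchange's computation looks like this (T1 through T4 are as defined above; the sample timestamps are invented, with the client 0.1 s behind the server, a 10 ms one-way delay, and 1 ms of server processing):

def ntp_sample(t1, t2, t3, t4):
    """One NTP request/response exchange: returns (offset, delay)."""
    offset = ((t2 - t1) + (t3 - t4)) / 2   # same as 1/2 (T2 - T1 + T3 - T4)
    delay = (t4 - t1) - (t3 - t2)          # round trip excluding server processing (one common definition)
    return offset, delay

off, d = ntp_sample(t1=100.000, t2=100.110, t3=100.111, t4=100.021)
# off = 0.100 (add this to the client's clock), d = 0.020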

NTP encourages clients to try synchronizing from several servers. For each server contacted, the protocol computes

Offset:
The difference between the client’s clock and the server’s. This is how much the client needs to offset its clock to set it to the server’s time.
Delay:
This is an estimate of the time spent sending or receiving the message. It is one half of the round-trip time minus the estimate of time spent on the server.
Jitter:
Jitter measures the variation in delay among multiple messages to the server. It gives us an idea of how consistent the latency is between the client and server.
Dispersion:
Dispersion is the estimated maximum error for the computed offset. It takes into account the root delay (total delay, not just to the server but the delay from the server to the ultimate time source), estimated server clock drift, and jitter.

After requesting time from multiple NTP servers, the client picks the server with the lowest dispersion. That is, it chooses the server from which it can get the most consistent and accurate time. If there is a tie, it then picks one with the lowest stratum; that is, one closest to the master time source. Note that this may result in having a client synchronize from a higher-stratum server even if it can contact a lower-stratum one (one that is closer to the time source). This may happen if the client can access that server more reliably and with less delay and jitter than a “better” server.

The Simple Network Time Protocol, SNTP, is a subset of the NTP protocol designed for use by systems that need to set their clock from a server but will not provide clock synchronization services. With SNTP, clients contact a single NTP server rather than choosing the best possible server from a list of known NTP servers. It simply applies Cristian’s algorithm and does not deal with computing the dispersion or delay. Most PCs use SNTP.

The Precision Time Protocol (PTP, an IEEE standard), was designed with different goals than NTP. While NTP was designed for synchronizing time with machines across the Internet and remote servers, PTP was designed for LANs: networks with low jitter and low latency. PTP is designed to achieve sub-microsecond precision among cooperating machines.

PTP is useful for precise synchronization that is needed in some industrial process control equipment, in high-frequency trading, power grid controls, and systems that need to synchronize audio and video streams. A version of PTP [White Rabbit] is used to synchronize computers in the Large Hadron Collider to within a few picoseconds. To achieve exceptionally high precision, PTP is often implemented in the network hardware to completely avoid the overhead of the operating system and process scheduling.

Clock synchronization is initiated by a PTP master (unlike NTP, where the client host initiates the sync). The master announces its time to clients via a sync message. If a client is then interested in synchronizing its clock, it must communicate back to the server in order to discern the delay in communication. The client sends a delay request message and notes the time it was sent. The server sends back a delay response message containing the time of arrival of the delay request message. With this data, and the assumption that uplink and downlink latency is the same, the client can compute the server’s timestamp with an adjustment for the network transit delay.
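Under the symmetric-delay assumption, the client's arithmetic can be sketched as follows, where t1 is the master's sync timestamp, t2 the client's receive time, t3 the client's delay-request send time, and t4 the master's receive time reported in the delay response (the example numbers are invented):

def ptp_offset(t1, t2, t3, t4):
    """Offset of the client clock relative to the master (positive = client is ahead)."""
    # (t2 - t1) = offset + one-way delay;  (t4 - t3) = -offset + one-way delay
    offset = ((t2 - t1) - (t4 - t3)) / 2
    delay = ((t2 - t1) + (t4 - t3)) / 2
    return offset, delay

# Client clock is 5 microseconds ahead of the master with a 2 microsecond wire delay:
off, d = ptp_offset(1000.0, 1007.0, 1010.0, 1007.0)   # units: microseconds
# off = 5.0, d = 2.0; the client subtracts 5 microseconds from its clock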

Logical clocks

Goal: Allow processes on different systems to identify causal relationships among events and their ordering, particularly among messages between different systems.

Lamport clocks allow one to assign sequence numbers (“timestamps”) to messages and other events so that all cooperating processes can agree on the order of related events. There is no assumption of a central time source and no concept of total ordering (when events took place). The central concept with logical clocks is the happened-before relation: a→b represents that event a occurred before event b. This order is imposed upon consecutive events at a process and also upon a message being sent before it is received at another process. Beyond that, we can use the transitive property of the relationship to determine causality: if a→b and b→c then a→c. If there is no causal relationship between two events (e.g., they occur on different processes that do not exchange messages or have not yet exchanged messages), the events are concurrent.

Lamport’s algorithm states that every event is timestamped (assigned a sequence number) and each message carries a timestamp of the sender’s clock (sequence number). A message comprises two events: (1) at the sender, we have the event of sending the message and (2) at the receiver, we have the event of receiving the message. The clock is a process-wide counter (e.g., a global variable) and is always incremented before each event. When a message arrives, if the receiver’s clock is less than or equal to the message’s timestamp, the clock is set to the message timestamp + 1. This ensures that the timestamp marking the event of a received message will always be greater than the timestamp of that sent message.
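A minimal sketch of such a counter in Python (the class and method names are mine):

class LamportClock:
    def __init__(self):
        self.time = 0

    def local_event(self):
        self.time += 1                     # increment before every event
        return self.time

    def send_event(self):
        return self.local_event()          # the returned timestamp travels with the message

    def receive_event(self, msg_timestamp):
        self.time += 1                     # receiving the message is itself an event
        if self.time <= msg_timestamp:     # our clock lags the sender's timestamp
            self.time = msg_timestamp + 1
        return self.time

# Usage: ts = p0.send_event() is attached to the message; p1.receive_event(ts) is always > ts.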

One result of Lamport timestamps is that multiple events on different processes may all be tagged with the same timestamp. We can force each timestamp to be unique by suffixing it with a globally unique process number. While these new timestamps will not relate to real time ordering, each will be a unique number that can be used for consistent comparisons of timestamps among multiple processes (e.g., if we need to make a decision on who gets to access a resource based on comparing two timestamps).

A second deficiency with Lamport timestamps is that, by looking at timestamps, one cannot determine whether there is a causal relationship between two events. For example, just because event a has a timestamp of 5 and event b has a timestamp of 6, it does not imply that event a happened before event b.

Vector clocks

A way to create timestamps that allow us to discern causal relationships is to use a vector clock. A vector clock is no longer a single value but rather a vector of numbers, with each element corresponding to a process. The rules are as follows:

  1. Before affixing a vector timestamp to an event, a process increments the element of its local vector that corresponds to its position in the vector (for example, process 0 increments element 0 of its vector; process 1 increments element 1 of its vector).

  2. When a process receives a message, it also first increments its own element of the vector (i.e., it applies the previous rule). It then sets the timestamp of the received event to the element-by-element maximum of its own (just incremented) vector and the vector received in the message.

This new vector timestamp becomes the new per-process vector from which future timestamps for events on that process will be generated. Think of a vector timestamp as a set of version numbers, each representing a different author.

For example, in an environment of four processes, P0, … P3, P1 will “own” the second element of the vector. If one event on P1 is (2, 4, 0, 1) then the next event will be (2, 5, 0, 1). If the event after that is the receipt of a message with a timestamp of (3, 2, 9, 8), then P1 first increments its own element to get (2, 6, 0, 1) and then takes the element-by-element maximum of (2, 6, 0, 1) and (3, 2, 9, 8), which is (3, 6, 9, 8). We can illustrate this with the following pseudocode where e is the system’s vector counter and r is the received vector timestamp:

/* receive message with vector timestamp r */
e[myproc]++;	/* increment our process' index */
for (i=0; i < num_procs; i++) {
	if (e[i] < r[i])
		e[i] = r[i];
}

Two events are concurrent if, when comparing the vectors element by element, neither timestamp is less than or equal to the other. For example, events (2, 4, 6, 8) and (3, 4, 7, 9) are not concurrent (i.e., they are causally related) because every element of the first vector is less than or equal to the corresponding element of the second vector. The vectors (2, 4, 6, 8) and (1, 5, 4, 9) represent concurrent events: neither vector is less than or equal to the other. For instance, 2 ≥ 1 (the first element of the first vector is greater than the first element of the second vector) but 4 ≤ 5 (the second element of the first vector is less than the second element of the second vector).
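A sketch of this comparison in Python, using the example vectors above (the function names are mine):

def leq(a, b):
    """True if vector timestamp a <= b element by element (a causally precedes or equals b)."""
    return all(x <= y for x, y in zip(a, b))

def concurrent(a, b):
    return not leq(a, b) and not leq(b, a)

print(concurrent((2, 4, 6, 8), (3, 4, 7, 9)))   # False: (2,4,6,8) <= (3,4,7,9), so causally related
print(concurrent((2, 4, 6, 8), (1, 5, 4, 9)))   # True: neither vector dominates the other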

Note that in an environment where the overall set of processes is not known by each process, a vector timestamp may consist of a set of <process, number> tuples. In such a case, the timestamping and comparison process is exactly the same: numbers are compared with those of matching processors and any missing process is treated as having a value of 0 in that vector.

Group communication

Goal: Provide mechanisms for a process to send a message to a group of processes instead of to one process. Consider the issues of reliability and message ordering.

Point-to-point communication is known as unicast. This is what we generally use to communicate between a single client and server. There are other modes of communication. Broadcast is the sending of data to every host on the network. Anycast is point-to-point communication (as in unicast) but the receiver is the nearest one of a set of receivers that share the same address (for example, IPv6 uses this to allow a host to update the routing table of the nearest host). Anycast is a special-purpose form of unicast that we will not discuss here.

An alternative to point-to-point communication is group communication, or point-to-multipoint messaging. Group communication is known as multicast and provides the abstraction of allowing a process to send a single message that is delivered to all group members. True multicast is handled by the network and offers the efficiency of avoiding replicated messages. The sender sends a single message on a special multicast address. Interested parties listen on that address and the network takes care of the delivery. If multicast is not available, it can be simulated by invoking multiple unicasts, one to each recipient.

There are two considerations in group communication (multicast): reliability and message ordering.

Message receipt versus message delivery

We talk about a process receiving a message, which means that it is received from the network and handled by a multicast receiving algorithm. This algorithm decides when to deliver the message to the application logic.

Since receivers cannot control the order in which messages are received over the network, a layer of software is responsible for transmitting a multicast message and, at the receiver, receiving it and deciding when and if to make it available to the application (i.e., deliver it). When a message is received, the multicast receiving algorithm may take one of three actions:

  1. Discard the message. This may be done if the receiver is no longer a member of the group or if the message is a duplicate.

  2. Deliver the message. The message is placed into a FIFO (first-in, first-out) queue from which the application reads incoming multicast messages.

  3. Hold the message. The message is not ready to be delivered to the application. Most likely, this is because it has not arrived in the expected order and the algorithm must first receive an earlier message. Alternatively, it may need to be held to get feedback that all other members have received it before passing it to the application. In cases like this, the message is placed on a hold-back queue. When the next message is received, the algorithm may check the hold-back queue to determine whether any held messages can now be delivered or discarded.

Reliability

An atomic multicast requires that a message must reach all group members (if one host cannot get the message, no others can process it). This multicast must survive machines going down – both the sender and/or receivers. If a recipient is not sure that a message has been received by all group members, it cannot deliver it to the application. Because of this, it requires the most overhead in implementation, often employing persistent logs.

A reliable multicast is a multicast where the sender tries its best to ensure that messages get to group members. The sender sends a message and waits for an acknowledgment. If it doesn’t receive the acknowledgment in time, it will retransmit the message. It will try this again and again until, eventually, after a longer interval of time, it will give up and assume the receiver is dead.

An unreliable multicast doesn’t wait for acknowledgments and will generally use whatever underlying multicast mechanism is provided.

Message ordering

The issue in message ordering is that multiple hosts can be sending messages to the entire group at the same time. Will each host receive all the messages in the same order? Will each host receive all the messages in the order they were sent?

Global time ordering requires that all messages arrive in the exact order they were sent: if host A sent a message 5 nanoseconds before host B did then all hosts should receive the message from host A first. This is impossible to implement (clocks can’t be that perfectly synchronized and the chance of a tie is always present; also, networks will not be able to guarantee this ordering if routing is involved).

A more practical approach is total ordering. This requires that all messages arrive in the same order at all hosts. This can be easily achieved by providing a group-wide mechanism for obtaining a totally sequenced message ID: for example, from a sequence number server.

Causal ordering means that messages that are causally related (according to Lamport’s happened before definition) will be delivered in order to all group members. Concurrent messages may be delivered in any order.

One way of implementing causal ordering is by having each process keep a precedence vector and sending it along with every message that is sent to the group. A precedence vector is similar to a vector timestamp with the exception that an event counter is not incremented for received messages. The vector applies only to events that relate to sending messages to a specific group. Each entry in the precedence vector represents the latest message sequence number that the corresponding process (group member) knows about.

When a process Psender sends a message, it increments the element of the vector that corresponds to its own entry:

Vsender[sender] = Vsender[sender] + 1

This vector is sent along with the message.

When a process Preceiver receives a message from Psender, it checks two conditions:

(1) The message must be the very next message from Psender. That is, the value of the sequence number in the received vector that corresponds to Psender must be exactly one greater than the one we have for that process in our vector:

(Vsender[sender] == Vreceiver[sender] + 1) ?

This tells us that we are receiving messages in sequence from Psender. If the value is more than one greater, it means that we missed one or more messages from that sender. This message will have to go on the hold-back queue until the earlier messages arrive.

(2) The message should not be causally dependent on another message that the receiver has not yet seen. This means that every other element of the vector has to be less than or equal to the corresponding element of the vector at the receiver.

∀i, i ≠ sender: (Vsender[i] ≤ Vreceiver[i]) ?

If the vector from the sender contains some sequence number that is greater than a corresponding sequence number in the receiver’s vector, it means that the sending process has seen a message from some other process that the receiver has not yet processed. Since the precedence vector is used for group communication and the receiver is part of the group, the receiver will need to wait for that message to come.

If both conditions are satisfied, then the received message can be delivered to the application immediately. Otherwise, it is placed in the hold-back queue until the conditions can be satisfied. Causal ordering has the advantage that there is no need for a global sequencer as in total ordering. Note that the precedence vector technique requires reliable message delivery. Otherwise, the receiver will not know if a message was lost or if a message has just not yet arrived.
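A sketch of this delivery test in Python, where V_msg is the precedence vector carried by the message, V_local is the receiver's vector, and sender is the sender's index (the function and variable names are mine):

def can_deliver(sender, V_msg, V_local):
    """True if a message carrying precedence vector V_msg can be delivered now."""
    # (1) It must be the very next message we expect from this sender.
    if V_msg[sender] != V_local[sender] + 1:
        return False
    # (2) The sender must not have seen any group message that we have not seen yet.
    return all(V_msg[i] <= V_local[i] for i in range(len(V_msg)) if i != sender)

def deliver(sender, V_msg, V_local):
    """Update the receiver's vector after delivering the message."""
    V_local[sender] = V_msg[sender]        # note: no increment for received messages

# Example: if our vector is [2, 1, 0], a message from process 0 carrying [3, 1, 0] can be
# delivered now; one carrying [3, 2, 0] must wait on the hold-back queue for process 1's message.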

Sync ordering requires a special type of message — a sync message. When this is issued, any messages that were already sent have to be processed (and acknowledged to the senders). The sync assures that any messages sent before the sync will be processed before any messages after the sync (globally – for all hosts).

SSFIFO (single source first in, first out) ordering on messages ensures that messages from each source are delivered in order but messages from multiple sources may be interleaved in any order at the receiver. For example, if host A sends messages m1, m2, m3 and host B sends messages n1, n2, n3, it is valid for host C to receive the sequence m1, m2, m3, n1, n2, n3 and for host D to receive the sequence m1, n1, n2, m2, n3, m3.

Finally, an unordered multicast doesn’t impose any message ordering among messages from other hosts.

IP multicast

IP multicasting is designed, like IP, to span multiple physical networks. Membership is dynamic: a machine can join or leave a multicast group at any time. Moreover, there is no central coordinator and no restriction on the number of hosts that can be in a group. Multicasting provides network efficiency. Packets in a multicast stream only need to be replicated when a router needs to send them to multiple network links. Only one stream of packets is needed on any network segment regardless of the number of receivers.

An IP multicast address (also known as a class D address) is an IP address that starts with 1110 and contains a 28-bit multicast address. A host may join this address and receive messages addressed to that multicast ID. Within a LAN, an IP class D address is mapped onto an Ethernet multicast address by copying the least-significant 23 bits of the address onto an Ethernet multicast address. Within a LAN, an Ethernet chip is programmed to perform an exact match on a small set of addresses or to accept addresses that hash to particular values. The Ethernet driver will need to remove any unneeded addresses that pass through. The Ethernet chip can also be set to multicast promiscuous mode, where it will accept all multicast Ethernet packets.
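A sketch of that mapping: the low 23 bits of the IP address are copied into the fixed 01:00:5e Ethernet multicast prefix (the example addresses are arbitrary). Because only 23 of the 28 multicast bits survive, different IP groups can map to the same Ethernet address, which is one reason the driver must filter.

import ipaddress

def multicast_mac(ip):
    """Map an IPv4 class D address onto its Ethernet multicast MAC address."""
    low23 = int(ipaddress.IPv4Address(ip)) & 0x7FFFFF     # keep the least-significant 23 bits
    mac = 0x01005E000000 | low23                          # fixed 01:00:5e multicast prefix
    return ":".join(f"{(mac >> s) & 0xFF:02x}" for s in range(40, -8, -8))

print(multicast_mac("224.0.0.251"))    # 01:00:5e:00:00:fb
print(multicast_mac("239.128.0.251"))  # 01:00:5e:00:00:fb -- a different group maps to the same MAC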

Routers have to get involved to support multicasting beyond the local area network. Two protocols are used to implement multicasting. The Internet Group Management Protocol (IGMP) is designed for hosts to inform the routers on their LAN that they are interested in joining a multicast group. The Protocol Independent Multicast (PIM) protocol enables routers to tell their neighboring routers that they are, or are no longer, interested in receiving packets for a particular multicast group.

A host uses IGMP to send a multicast join message (also known as a membership report) to join a specific group. A multicast-aware router will get this message and now knows that the link on which the message arrived needs to receive any packets addressed to that multicast group.

Periodically, a router will send a membership query message to all hosts on the LAN. If any node is still interested in the group, it must re-send a join message. In version 1 of the protocol, if no join messages are received in response, the router assumes nobody on that LAN is still interested and the LAN will no longer receive packets for that group. With IGMP v2, a leave message was added to avoid having to wait for the timeout to realize that nobody is interested in a group. That avoids needlessly sending multicast traffic on a LAN where no hosts are interested in receiving the messages anymore.

A lingering problem was that multicast IP uses no centralized coordinator and anyone can send multicast messages to any multicast addresses. IGMP v3 adds the ability for a host that joins a multicast group to specify the source address (originator) of the multicast. The router will then not forward packets originating from unwanted source addresses onto the LAN.

IGMP allows edge routers (those routers connected to LANs) to be told what multicast groups the nodes on their connected LANs are interested in receiving. PIM, Protocol Independent Multicast, is responsible for conveying multicast membership information among routers within the wide area Internet. It assumes the presence of other protocols to know the network topology and which routers are connected together. There are two basic approaches to multicasting on the WAN (wide-area network): dense mode (flooding) and sparse-mode multicast.

Dense Mode multicast, also known as flooding, originates from the multicast sender. The message is duplicated and sent to all connected routers. Each of those routers, in turn, duplicates and sends the message to all of its connected routers, and so on. To avoid routing loops, each router uses reverse path forwarding (RPF). A received packet is forwarded only if it was received via the link that the router knows is the shortest path back to the sender (it finds this by checking its forwarding table, which is what it would use if it were sending a packet to that address). PIM Dense Mode floods the entire network of connected multicast-aware routers.

If an edge router receives this multicast packet and is not interested in the data stream (i.e., it has not received IGMP join messages), it will send a prune message to the router that delivered that packet. If that router receives prune messages from all interfaces, it will in turn send a prune message to the router that is sending it the multicast messages. A router sends prune messages if it is getting redundant traffic from another link or if its downstream router or LAN connections are not interested in the stream. If a node on a LAN joins a multicast group at a later time, sending an IGMP message to a router, that router would then send a PIM Graft message to its connected routers to state interest in the stream. Dense mode only makes sense when there are receivers spread through most locations covered by multicast-aware routers. It is rarely used.

In contradistinction to Dense Mode, PIM Sparse Mode starts with requests from multicast receivers rather than flooding the network with traffic from the sender. Each router must send a Join message to its connected routers in order to request multicast traffic (and a Prune message when it is no longer interested). This causes multicast packets to only go to the routers where they are needed. The trick to getting this to work is that a multicast group must be associated with a router known as a rendezvous point (RP). The RP acts as a central point that senders know how to contact to register that they are transmitting multicast streams and for receivers to contact to join multicast streams. Join messages initially are routed to the RP to avoid flooding the network. From there, they are routed to participating routers – routers that expressed an interest in that multicast group.

A sender that transmits multicast packets will simply have those packets routed only to the rendezvous point (this is effectively a unicast stream). The RP then multicasts the packet to any routers from which it has received Join messages.

To ensure that the RP does not become a bottleneck, after the aggregate bandwidth exceeds a defined threshold, routers closer to the receiver will try to join routers that are more directly connected to the source since they have seen the multicast traffic and know the source address.

Sparse mode is ideal when the receivers are concentrated among a few network segments.

State Machine Replication and Virtual Synchrony

Goal: Create a software framework that gives everyone the same view of live group members and provides atomic multicasting of messages to all of these members. There should never be a case where only a subset of a group receives a message.

State Machine Replication

We often deploy distributed systems in an effort to achieve both high availability and high scalability. Availability refers to the fraction of time that the overall system is up and running, capable of doing whatever it is supposed to do. A highly available service is one that is designed to survive computer or network failures.

Scalability refers to the ability to handle an increasing number of requests by adding more servers to the system. Both of these goals are often achieved via redundancy; by adding replicated components to the system. These replicas can step in immediately when some components break, thus helping with availability.

Active-passive replication means the replicated systems do not process requests when the main system is up but are standing by in case it fails. Google’s Chubby lock manager/file system (we will study it later in the course) is an example of this, where requests must always go to the master. Any changes at the master get propagated (replicated) to the replicas. With active-passive systems, we get fault tolerance by having standby systems but achieve no scalability of performance since one process has to handle all requests.

Active-active replication means that the workload is distributed among the set of replicas, thus helping with scalability (each component only handles a fraction of the workload) as well as availability.

Faults

Faults may be fail-silent or byzantine. With fail-silent faults, the faulty process is dead: it does not accept or transmit communication. With byzantine faults, a process is running but producing erroneous data. Byzantine faults are more difficult to handle and, for our discussion, we will assume that our processes can detect faulty messages (e.g., checksums, digital signatures, encryption, and time stamps will be used and we’ll examine these techniques in a future lecture) and disregard them. With fail-silent faults, a process may remain dead. In that case, it exhibits fail-stop behavior. If a failed process recovers, it exhibits fail-recover behavior. Recovery is good, of course, but we need to realize that the process has not received any updates to the state of the group during the time it was dead.

The two-armies problem (also known as the two generals' problem) demonstrates that reliable communication can never be achieved with asynchronous and possibly unreliable communication lines. The story goes like this:

The generals of two armies, A & B, need to decide whether to attack the enemy. If both attack, they win. If only one attacks, it will die. A sends a messenger to B: “let’s attack in the morning”. But A needs to know that B received the message so it asks for an acknowledgement as a return message. If the messenger bearing the acknowledgement does not arrive, A does not know if the return messenger didn’t make it and B got the message or if B never received the message. If A receives a response, B isn’t sure if the messenger made it to A or not. B can ask for an acknowledgement to the acknowledgement but then A will not be sure if the second acknowledgement made it to B. We can do this indefinitely…

In asynchronous networks, such as the IP network, there is no time limit by which one can reliably expect a packet to arrive. The two-army problem shows us that we can never achieve 100% certainty that a process is truly dead (fail-silent) just because we do not hear from it. This is something we have to live with. Because of this, one process cannot reliably detect that another one has failed. Instead, a process may suspect that another process has failed and share that knowledge with all other members of the group, taking the “failed” process out of the group even if it is actually alive. Virtual synchrony deals with managing this type of group-wide list of active group members. As with active-passive systems, changes at one process must be propagated to all others. However, consistency becomes a consideration: if all replicas do not get the updates at the same time (atomically), then there is a risk that clients may read inconsistent or old data depending on which server handles their requests.

For both scalability and high availability, all replicas should share the same software and same data. This data reflects the state of the system. It would be confusing if data in a file sometimes looks different because it is accessed from different servers that received write requests in a different order, or if only some systems were told that a lock has been released.

To accomplish this synchronization among replicas, processes across different systems must see the same sequence of events (events that modify the state of the system). These events can include lease/lock information, file or object writes, or any inputs into a process that is running on a machine.

Virtual synchrony

Virtual synchrony is a technique that provides fault-tolerant totally ordered atomic multicasts among a group. Virtual synchrony gives applications the abstraction of multicasting a message to a group of processes and ensuring that all data is replicated to all processes in the group in such a manner that it is indistinguishable from using a single non-faulty system. This involves managing group membership and ensuring that causal dependencies on messages from multiple systems are taken into account.

Virtual synchrony focuses on process groups. Each process group is a collection of processes. When a process sends a message to the group, it must be received by all current group members. Processes may join groups, leave groups, and send messages to groups. Processes that send messages to the group may often, but not necessarily, be outside of the group (e.g., think of systems communicating with a fault tolerant server). The framework ensures that there will never be an incomplete delivery of a multicast and that all group members receive multicast messages in the same order. This ensures that all group members can keep their state replicated, which is crucial for fault-tolerant and load-balanced servers where any one server may stand in for another one.

A group view is the set of processes that are currently in a given process group. Every multicast is associated with a group view and this group view must be identical for every process that will send multicasts to the group.

The virtual synchrony framework tracks group membership. Over time, the membership of a process group may change. Processes may be added and processes might leave (e.g., a computer may crash). This change of membership is called a view change. Any processes in the system should be made aware of this change since future multicasts will have to be sent to the new group view. Message delivery should not span a view change. That is, if a view change takes place while a message was being multicast, that message should be delivered to all the processes in the group before the view change message is delivered to any member.

Although our discussion is general, we will focus on the Isis framework, which is deployed in places such as the New York Stock Exchange and the Swiss Exchange. Similar services are also used by IBM’s Distribution and Consistency Services, Microsoft’s Cluster Service, CORBA remote objects, and many others.

Group Membership Service

A Group Membership Service (GMS) keeps track of which processes are in the group. If a process, p, reports another process, q, as faulty, the GMS will take the faulty process out of the group and tell every process with a connection to q that q is no longer in the group. With asynchronous networks, we cannot be 100% sure that the process was really faulty; it might have just been slow to respond. However, if any process suspects that process is dead, it reports it as such, the GMS takes it out of the process group, and nobody will send messages to it. Once it is taken out of the group, it will have to rejoin at a later time just as if it were a new process.

When a process joins a group, it first needs to import the current state of the synchronized group. This is called a state transfer. The process contacts an existing member and transfers the current state from it to initialize itself. During this time, it is not processing any requests. After the state transfer, it is part of the group.

Reliability and message ordering

An enhanced version of Isis, Isis2, supports four types of multicast protocols depending on the order of delivery that is needed: unordered, causally ordered, total ordered, and sync ordered (barriers). Isis2 also includes a SafeSend API that is a full Paxos implementation, which is a relatively recent addition to the software (we will look at Paxos, a consensus protocol, in a future lecture).

Systems can think of the process group and virtual synchrony as one abstract entity. Isis uses reliable TCP point-to-point links and the client library iterates through group members to deliver the message to each member.

Upon receiving a message, a receiver adds the message to its queue but does not yet deliver it to the application. At this point, the message is marked unstable at the receiver.

The receiver then sends an acknowledgment to the sender. When the sender receives acknowledgments from all members, it sends a confirmation to all processes in the group, who then mark the message as stable and deliver it to the application. To improve performance, not every message is actually acknowledged. A receiver may collect a set of messages, each with a per-source sequence number and periodically send a cumulative acknowledgment containing the highest sequence number it received. Likewise, the sender can send a cumulative confirmation with a sequence number that tells each receiver that it is safe to deliver all messages with a lower sequence number.

If a multicast sender dies before it is able to confirm delivery of messages to the entire group, these messages will remain in an unstable state at the receivers and are not delivered to applications. When the death of the sender is eventually detected (either by the Group Membership Service or by a process that then alerted the GMS) or a new member joins, the GMS instantiates a view change since the group membership has now changed. The view change will ensure that unstable messages are received by all current group members, thus enforcing atomic multicasting (the all or none property).

Processing a view change

If some process P receives a view change message because another process wants to join the group or a process has left the group, P will forward a copy of any unstable messages to every process in the group. This will ensure that the messages will become stable and all current group members will get to deliver them to their applications. Process P then needs to be sure that all group members are ready and have no unstable message of their own. To do this, P sends a flush message to the group. Upon receiving a flush message, a process will do the same thing that P did: multicast all unstable messages (if it has not done so yet) and then acknowledge the flush. When a process has received a flush message from every other process in the group, it knows that there are no unstable messages left in the system and can switch to the new group view. We thus achieve atomicity of multicasts. The flush is the implementation of a barrier: all outstanding messages must be delivered to applications before the barrier allows new message traffic to continue.

Mutual exclusion

Goal: Allow processes to request and be granted exclusive access to named resources. The algorithms should be designed to avoid starvation and ensure that exactly one requesting process gets the resource even if multiple processes make requests concurrently.

Mutual exclusion is responsible for making sure that only one process or thread accesses a resource at a time. Processes do this by requesting a lock on a resource. In operating systems, it is implemented through mechanisms such as semaphores and monitors and, at a low level, through instructions such as test-and-set locks. None of these mechanisms work across networks of computers. We need an algorithm that will allow processes to compete for access to a resource and ensure that at most one process will be granted access. The algorithm must be fair: it must ensure that all processes that want a resource will eventually get a chance to get it. If this is not the case, starvation occurs, where a process will have to wait indefinitely for a resource.

The “resource” is an arbitrary name that all processes agree to use. It may refer to a critical section of code, a file, a field in a database, a physical device, or some network service. A mutual exclusion algorithm allows a process to request access to this resource and be granted exclusive access to it. This is known as a lock. If the lock has a timeout (an expiration) associated with it, then it is known as a lease.

The three basic properties that we expect from a mutual exclusion algorithm are:

  1. Safety: This simply means the algorithm works and only one process can hold a resource at a time.

  2. Liveness: The algorithm should make progress and not wait forever for a message that will never arrive.

  3. Fairness: Every process that wants a resource should get a fair chance to get it. For example, two processes cannot continuously hand the resource back and forth to each other, disallowing a third from being able to get it.

Distributed algorithms fall into three categories. A centralized approach uses a central coordinator that is responsible for granting access permissions.

A token-based approach allows a process to access a resource if it is holding a “token”; that is, it received an unsolicited message where ownership of the message allows the process to access the resource. Sending the message (token) means forfeiting access.

A contention-based algorithm is one where all processes coordinate together on deciding who gets access to the resource.

The centralized algorithm runs on a single server that accepts REQUEST messages for a resource. If nobody is using the resource, it responds with a GRANT message and places the request on a FIFO (first-in, first-out) queue of requests for that resource. If somebody is using the resource, the server simply does not respond but also places the request on the queue of requests. When a client is done with a resource, it sends the server a RELEASE message. The server removes the process' ID from the queue and sends a GRANT message to the next client in the queue (if there is one).
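A sketch of the server's bookkeeping for one resource (message transport is omitted and the names are mine):

from collections import deque

class LockServer:
    def __init__(self):
        self.queue = deque()                 # FIFO of requesting process IDs; the head holds the lock

    def request(self, pid):
        self.queue.append(pid)
        if self.queue[0] == pid:             # nobody else is holding or waiting
            return "GRANT"                   # otherwise: no response; the request just waits in the queue

    def release(self, pid):
        assert self.queue[0] == pid, "only the current holder may release"
        self.queue.popleft()
        if self.queue:
            return ("GRANT", self.queue[0])  # grant the resource to the next waiting process, if any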

The token ring algorithm creates a logical communication ring among the processes in the group, where each process communicates with a single neighbor. A message, called a token, is created for each resource. This token is passed from process to process along the ring. If a process receives a token and does not need to access that resource, it simply passes the token to the next process. Otherwise, it will hold on to the token until it is done with the resource.

Lamport’s mutual exclusion algorithm requires each process that wants a resource to send a timestamped request for the resource to all processes in the group, including itself. Receipt of each message is immediately acknowledged by every process. Each process places the received message in a local priority queue that is sorted by the Lamport timestamp that is in each message (or any totally unique timestamp). A process decides whether it can access the resource by checking whether its own request is the earliest (first) one in the queue of all requests that it has received. If so, it accesses the resource and, when done, sends a release message to all members (including itself). The receipt of a release message causes each process to remove that process ID from the ordered queue. If a process now finds itself as the earliest process in the queue, it – and everyone else – knows that it is now its turn to access the resource.

Lamport’s mutual exclusion algorithm summary: send a request message to everyone in the group, including yourself. Get an acknowledgement from everyone. Each process stores request messages in a priority queue sorted by timestamp. The process ID at the head of the queue can access the resource. When it is done, it sends a release message to everyone.

The Ricart & Agrawala algorithm, like Lamport’s, is also based on using reliable multicasts. A process that wants to access a resource sends a request to all other processes in the group and waits for all of them to respond. As with Lamport’s mutual exclusion algorithm, the request message contains a unique Lamport timestamp. If another process is currently using the resource, that process delays its response until it is done. If process A and process B sent out requests concurrently (i.e., two systems want the same resource at approximately the same time), each of those systems compares the timestamp of that request with that of its own request. If process A received a request from process B that is older (a lower timestamp) than the one it sent, then process A will give process B priority to access the resource by sending a response to process B. Otherwise, if process A has the earlier timestamp, it will queue the request from B and continue to wait for all acknowledgements, sending a response to B (and any other processes who wanted the resource) only when it is done using the resource. The key point is that processes A and B will make the same comparison and exactly one will hold back on sending the response.

Ricart & Agrawala’s algorithm summary: send a request message to everyone in the group and wait for acknowledgements from everyone. Any process using the resource will delay sending an acknowledgement.
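The deferral decision at the heart of the algorithm can be sketched as follows (the state and variable names are mine; timestamps are Lamport timestamps with process IDs as tie-breakers):

def on_request(my_state, my_ts, my_id, their_ts, their_id):
    """Decide whether to reply to an incoming REQUEST now or defer the reply.

    my_state is one of 'RELEASED', 'WANTED', 'HELD'."""
    if my_state == "HELD":
        return "DEFER"                       # reply only after we release the resource
    if my_state == "WANTED" and (my_ts, my_id) < (their_ts, their_id):
        return "DEFER"                       # our own request is earlier, so we win this round
    return "REPLY"                           # not interested, or the other request is earlier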

The Lamport and Ricart & Agrawala algorithms are similar in that they are both contention based and truly distributed. Ricart & Agrawala’s requires fewer messages since there is no explicit release message that needs to be sent; acknowledgements are simply delayed. Neither is as efficient as the centralized algorithm.

Election algorithms

Goal: Allow all processes in a group to elect exactly one of the processes in the group. All processes should agree to elect the same process.

Election algorithms are designed to allow a collection of processes to agree upon a single coordinator. All candidate processes are considered to be equally capable of acting as a coordinator. The task is to select exactly one of these processes — and make sure that every process is aware of the selection. In designing these algorithms, we assume that every process has a unique ID (for example, a process ID combined with the machine’s address). The algorithms we examine in this section will choose a winner (a coordinator) by selecting either the highest available process ID or the lowest one. It is important that whatever criterion is used will never result in one process choosing a different winner than another, even if multiple concurrent elections are run.

The bully algorithm selects the largest process ID as the coordinator. If a process detects a dead coordinator, it sends an ELECTION message to all processes with a higher ID number and waits for any replies. If it gets none within a certain time, it announces itself as a coordinator. When a process receives an ELECTION message it immediately sends a response back to the requestor (so the requestor won’t become the coordinator) and holds an election to see if there are any higher-numbered processes that will respond. If not, then it declares itself as coordinator.

The ring election algorithm requires a logical communication ring among the processes in the group (as we had in the token ring mutual exclusion algorithm). When a process detects a non-responding coordinator, it creates an ELECTION message containing its own process ID and sends it to the next process in the ring. If the process is dead, then it sends the message to the following process. This sequence continues, with each process sending an ELECTION message to the next process in the ring, skipping dead processes until it finds a process that can receive the message. When a process receives an ELECTION message, it adds its own process ID to the body and sends that message out to its neighboring process. When the election message circulates back to the original sender (the sender detects this by seeing its process ID at the head of the list, indicating that it started the election), the sender looks at the list of active processes in the message body and chooses a coordinator from among them. Since multiple elections may have been started concurrently, the algorithm for choosing the leader must yield the same result among the different lists. Hence, selecting the highest or the lowest process ID will work. Selecting the first or last process in the list will not.

The Chang and Roberts ring algorithm improves on the ring algorithm in two ways. First, there is no need to keep the entire list of live processes in the election messages; we can perform the election as we go along. If the goal is to vote for the highest-numbered live process ID and a process receives an election message, it does one of two things: (1) passes it untouched if its own process ID is smaller than the one in the message, or (2) replaces the process ID in the message with its own if its own process ID is greater than the one in the message. When a process receives a message that contains its own process ID, it knows that the message has come full circle and it is the winner. It will then notify each process of the outcome.

The second optimization is to avoid having multiple election messages circulate if multiple processes have initiated elections concurrently. To do this, a process keeps track of whether it has participated in an election (i.e., has initiated or forwarded an election message). If a process receives a message with a smaller process ID than its own and it is a participant in an election, it simply discards the message. If, on the other hand, the received message contains a greater process ID than its own, it will forward the message even if it is a participant in an election since the message contains a candidate that takes priority over the previous one that was forwarded. Once election results are announced, each process clears its status of participating in an election. There is still the opportunity for multiple election messages to circulate (no harm done), but they are discarded when it makes sense to do so.
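
The forwarding rules above can be captured in a short sketch. The following Python code is illustrative only: the send_to_successor() transport and the message tuples are assumptions, not part of the original algorithm description.

```python
# A minimal sketch of the Chang & Roberts ring election (highest ID wins).

class ChangRoberts:
    def __init__(self, my_id, send_to_successor):
        self.my_id = my_id
        self.send = send_to_successor
        self.participant = False
        self.leader = None

    def start_election(self):
        self.participant = True
        self.send(("ELECTION", self.my_id))

    def on_election(self, candidate_id):
        if candidate_id > self.my_id:
            self.participant = True
            self.send(("ELECTION", candidate_id))    # better candidate: forward it
        elif candidate_id < self.my_id and not self.participant:
            self.participant = True
            self.send(("ELECTION", self.my_id))      # replace with our own ID
        elif candidate_id < self.my_id and self.participant:
            pass                                     # discard: we already forwarded a better one
        else:                                        # our own ID came back: we won
            self.leader = self.my_id
            self.participant = False
            self.send(("ELECTED", self.my_id))

    def on_elected(self, leader_id):
        self.participant = False
        self.leader = leader_id
        if leader_id != self.my_id:
            self.send(("ELECTED", leader_id))        # pass the announcement around the ring
```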

One problem with election algorithms is when a network gets partitioned (also known as segmented): one set of processes is separated from another and cannot communicate with the other. In this case, each segment may elect its own coordinator and problems can arise. This is known as a split brain situation. To combat this, a redundant network is needed or some alternate communication mechanism to enquire about the state of remote processes.

Another approach that is often taken is to require a majority of prospective coordinators to be available. A quorum is the minimum number of processes that need to be available to participate in an operation. In this case, the operation is an election and we need a quorum of over 50% of the processes. If a network is partitioned into two or more segments, at most one of the partitions will have over 50% of the processes. Those processes that do not will refuse to elect a coordinator.

Raft Consensus

Goal: Create a fault-tolerant distributed algorithm that enables a group of processes to agree on a single value.

The goal of a consensus algorithm is to have a group of processes agree on a common value. The value must be one of the values that was proposed by at least one of the processes. Once the processes agree on the value, it is considered final.

Replicated state machines

The most common use of consensus is to create fault-tolerant systems by building replicated state machines. Processes send messages (commands, such as data updates) to a group of replicated servers. These messages are received by each of the servers and added to a log on that server. If one process wants to set name=value_1 and, concurrently, another process tries to set name=value_2, we would like every instance of our database to be consistent. Every copy should have either name=value_1 or name=value_2. It is not acceptable to have some instances hold name=value_1 while others hold name=value_2.

This log must be consistent across all servers so that when the commands in the log are applied to the state machine (i.e., the processes on the servers read their logs and apply the commands) they are all processed in the same sequence on each server. Consensus enables all processes to agree on the order in which commands are added to the replicated logs.

Another, simpler, use of consensus is running an election: each process may propose itself as a contender and the consensus algorithm will pick one winner. Yet another example is mutual exclusion: agreement on who gets a lock on a resource, although software must be built around this to manage a queue of outstanding requests.

Achieving consensus is easy if processes don’t fail or we are willing to wait for all processes to recover if they die (as with the two-phase commit protocol that we will examine later). The challenge with a consensus algorithm is to achieve consensus in the presence of faults.

A consensus algorithm must satisfy four goals:

Validity
The outcome must be one of the proposed values.
Uniform agreement
No two processes may ultimately agree on different values.
Integrity
A process must ultimately agree on a single value. It cannot change its mind later.
Progress (termination)
The algorithm must eventually terminate such that every process will decide on the same value.

Raft

Raft is a fault-tolerant distributed consensus algorithm. It was created as an easier-to-understand alternative to Paxos (the first provably correct distributed consensus protocol) and was designed specifically to make it easy to implement replicated logs, something that was not directly achievable with Paxos. It can easily be used to create agreement among participants on a globally consistent (total) order for client messages. These messages can be commands to update the state of a system or data in a file. By creating a consistent order of these messages among all servers, we ensure that each server applies changes in the same order and hence the servers are identical replicas of one another.

Raft achieves consensus as long as a majority of servers running the protocol are functioning and accessible. It also requires a single elected leader to receive all client requests. Any server can act as a leader and an election takes place to select a leader.

A server can be in a follower, leader, or candidate state. If it is the leader then it receives all client requests. If it is the follower then it is a non-leader and the system has a leader. Followers only accept messages from leaders and not from clients. If it is a candidate then we are holding an election to pick a leader and the server may become a leader.

Raft elections

Leaders send periodic heartbeats to all followers. Servers start in a follower state. A server remains in a follower state as long as it receives valid heartbeat messages from a leader or a candidate. If it receives no heartbeat within a specific time interval, the follower may need to become a leader, so it becomes a candidate and runs an election by requesting votes from other servers. If a candidate receives a majority of votes from the other servers, it becomes the leader.

To start an election, a follower waits a random time interval before it makes itself a candidate. It then votes for itself and sends a request vote RPC message to all other servers. If a server receives a request vote message and has not voted yet, it then votes for that candidate. When a candidate gets a majority of votes, it becomes the leader.

Each election marks a new term, identified by an incrementing number.

If a candidate receives a heartbeat from another server and that leader’s term number is at least as large as the candidate’s current term, then the candidate recognizes the leader as legitimate and becomes a follower.

It’s possible that multiple candidates receive votes but none receives a majority of votes. This is a split vote situation. If a candidate neither wins nor loses an election, it simply times out and starts a new election. Randomized timeouts ensure that split votes rarely occur since it’s unlikely that multiple candidates will start their elections at the same time: one candidate will usually have the advantage of requesting votes before others get a chance to do so.
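
A simplified sketch of the election timing and voting rules might look like the following Python code; the RPC helpers (request_vote and the heartbeat callback) are hypothetical, and the sketch omits the log-completeness check that real Raft applies before granting a vote.

```python
# A minimal sketch of Raft leader election: randomized timeouts, voting,
# and stepping down when a legitimate leader's heartbeat is seen.

import random

class RaftElection:
    def __init__(self, my_id, peers):
        self.my_id = my_id
        self.peers = peers
        self.state = "follower"
        self.current_term = 0
        self.voted_for = None

    def election_timeout(self):
        return random.uniform(0.15, 0.30)       # randomized timeout avoids split votes

    def on_timeout(self, request_vote):
        # no heartbeat heard: become a candidate and start an election
        self.state = "candidate"
        self.current_term += 1
        self.voted_for = self.my_id
        votes = 1                               # vote for ourselves
        for p in self.peers:
            if request_vote(p, self.current_term, self.my_id):
                votes += 1
        if votes > (len(self.peers) + 1) // 2:  # majority of the cluster
            self.state = "leader"

    def on_request_vote(self, term, candidate_id):
        if term > self.current_term:
            self.current_term = term
            self.voted_for = None
            self.state = "follower"
        if term == self.current_term and self.voted_for in (None, candidate_id):
            self.voted_for = candidate_id
            return True                         # grant the vote
        return False

    def on_heartbeat(self, leader_term):
        if leader_term >= self.current_term:
            self.current_term = leader_term
            self.state = "follower"             # recognize the legitimate leader
```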

Raft log replication

The elected leader accepts client requests. Each message sent by clients becomes a log entry that will be replicated among the servers. The leader first adds the message to its own log. At this point, the entry is considered to be uncommitted. The leader then sends the message to the other servers (the followers). Each follower acknowledges the message. When a majority of followers have acknowledged the message, the entry is applied to the state machine and considered to be committed. The leader then notifies the followers that the entry has been committed and they apply the message to their state machines. Consensus has now been achieved.
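
The leader’s majority-based commit rule can be sketched as follows; append_entries() and apply_to_state_machine() are hypothetical stand-ins, and the real protocol also handles term numbers, retries, and log repair.

```python
# A minimal sketch of the commit rule for Raft log replication: an entry
# is committed once a majority of the cluster has appended it.

def replicate(leader_log, entry, followers, append_entries, apply_to_state_machine):
    leader_log.append(entry)                 # uncommitted on the leader
    acks = 1                                 # the leader counts itself
    for f in followers:
        if append_entries(f, entry):         # follower appended the entry to its log
            acks += 1
    cluster_size = len(followers) + 1
    if acks > cluster_size // 2:             # majority have the entry: commit it
        apply_to_state_machine(entry)
        return True                          # followers learn of the commit on the next RPC
    return False
```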

Chubby

Goal: Create a highly-available centralized lease manager and file system for small files that can serve as a name server and configuration repository.

Chubby is a highly available and persistent distributed lock service and file system created by Google and used in various projects and frameworks, including Bigtable and MapReduce. Its purpose is to manage locks for resources and store configuration information for various distributed services throughout the Google cluster environment. The key design goal of Chubby was high availability and high reliability.

By default, the service runs on five computers as five active replicas. This grouping is called a Chubby cell. One of these replicas is elected as the master to serve client requests. Only the master will serve client requests; the other machines in the cell are for fault tolerance in case the master dies. The only request they will answer from clients is to tell them who the master is.

A majority of the replicas in the cell must be running for the service to work. The Paxos distributed consensus algorithm is used to agree on a leader and is used for state machine replication. That is, it ensures that all updates take place in the same order on all replicas to keep the replicas consistent. Typically, there will be one Chubby cell per data center.

In addition to providing locks, Chubby is designed to manage relatively small amounts of data: items such as system configuration and state information for various services. Chubby provides clients with a namespace of files & directories. Every file or directory can be treated as a lock file and every file may, of course, be used to store data. The name of a lock is the name of its file: its hierarchical pathname. The interface to Chubby is not that of a native file system. There is no kernel module and client software communicates with Chubby via an API that sends remote procedure calls to the Chubby master.

File operations are somewhat different from those offered by conventional file systems. Files can be read and written only in their entirety: there are no seek or byte-range read and write operations. When a file is opened by the client, it is downloaded and a lease for that file is established. In this way, Chubby keeps track of which clients have cached copies of a file. All writes from a client must be sent to the Chubby master (we have a write-through cache). Chubby then sends invalidations to all clients that have cached copies.

Locks are advisory and can be either exclusive (one writer) or not (multiple readers). A client can send a request to acquire a lock for a file, release it, and also assign and check sequence numbers for a lock. Clients can also subscribe to receive events for an open file. These events include notification of file modification, the creation of directories or files under an open directory, and lock acquisition.

Chubby is designed primarily for managing coarse-grained locks. Fine-grained locks are locks that are generally used for a small object, such as a row in a table of a database. They are generally held for a short duration: seconds or less. Coarse-grained locks typically control larger structures, such as an entire table or an entire database. They might be held for hours or days, and they are acquired rarely compared to fine-grained locks. Hence, a server that is designed for coarse-grained locks can handle more clients.

Even though Chubby uses the term “coarse-grained”, it doesn’t exactly fit a “pure” coarse/fine grained lock model, but that model does not necessarily work well in real systems. In theory, the concept of a coarse-grained lock is to grant a process a lock for a large pool of objects of a particular class and then have that process be the lock manager for those objects. This is essentially what Chubby does but Chubby doesn’t keep track of all the objects. Instead, the top (coarse) level is a set of services, such as Bigtable tables, a GFS file system, or Pregel frameworks. Chubby allows them to ensure there is a single master and to lock any critical resources that might be shared among these applications. Those applications then handle the fine-grained aspect of giving out locks for data blocks, table cells, or synchronizing communication barriers.

Because Chubby does not hold huge amounts of data but may serve thousands of clients, all Chubby data is cached in memory. For fault tolerance, all data is written through to the disk when modified, and is propagated to replicas via the Paxos consensus algorithm to ensure consistent ordering. The entire database is backed up to the Google File System (GFS) every few hours to ensure that critical configuration information is saved even if all replicas die.

Chubby’s fault-tolerant locking makes it a good service for leader election. If a group of processes wants to elect a leader, they can each request a lock on the same Chubby file. Whoever gets the lock is the leader (for the lease period of the lock).
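
As an illustration of lock-based leader election, here is a hedged sketch; chubby_open() and acquire_lock() are hypothetical stand-ins, since the real Chubby client API is internal to Google, and the lock file path is made up.

```python
# A minimal sketch of leader election built on an advisory lock service,
# in the style described above. All API calls here are hypothetical.

import time

def run_for_leader(chubby_open, my_address):
    handle = chubby_open("/ls/cell/my-service/leader")    # hypothetical lock file
    while True:
        lease = handle.acquire_lock(exclusive=True, block=False)  # hypothetical call
        if lease is not None:
            handle.write(my_address)      # advertise the leader so others can find it
            return lease                  # we are the leader for the lease period
        time.sleep(1.0)                   # someone else holds the lock; retry later
```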

Network Attached Storage

Goal: Allow multiple clients to access files from file servers on a network.

To provide the same system call interface for supporting different local file systems as well as remote files, operating systems generally rely on a layer of abstraction that allows different file system-specific interfaces to coexist underneath the common system calls. On most Unix-derived systems (e.g., Linux, BSD, OS X, SunOS), this is known as the VFS layer (Virtual File System).

There are a couple of models for implementing distributed file systems: the download/upload model or the remote procedure call model. In a stateful file system, the server maintains varying amounts of state about client access to files (e.g., whether a file is open, whether a file has been downloaded, cached blocks, modes of access). In a stateless file system, the server maintains no state about a client’s access to files. The design of a file system will influence the access semantics to files. Sequential semantics are what we commonly expect to see in file systems, where reads return the results of previous writes. Session semantics occur when an application owns the file for the entire access session, writing the contents of the file only upon close, thereby making the updates visible to others after the file is closed, and overwriting any modifications made by others prior to that.

NFS

NFS was designed as a stateless, RPC-based model implementing commands such as read bytes, write bytes, link files, create a directory, and remove a file. Since the server does not maintain any state, there is no need for remote open or close procedures: these only establish state on the client. NFS works well in faulty environments: there’s no state to restore if a client or server crashes. To improve performance, a client reads data a block (8 KB by default) at a time and performs read-ahead (fetching future blocks before they are needed). NFS suffers from ambiguous semantics because the server (or other clients) has no idea what blocks the client has cached and the client does not know whether its cached blocks are still valid. NFS uses validation, where the client compares modification times from server requests to the times of data that it cached. However, it does this only if there are file operations to the server. Otherwise, the client simply invalidates the blocks after a few seconds. In NFS’s original stateless design, file locking could not be supported since that would require the server to keep state. It was later added through a separate lock manager that maintained the state of locks.

To facilitate larger deployments, NFS introduced the automounter. It was common to have an environment with many clients, each mounting many remote file systems. In such an environment, if all clients start up at approximately the same time, they can flood the server with mount requests. The automounter mounts remote directories only when they are first accessed. To make keeping track of mount points easier across many machines, automounter maps are configuration files that define what remote directories get mounted. These can be distributed across a set of clients.

AFS

AFS was designed as an improvement over NFS to support file sharing on a massive scale. NFS suffered because clients would never cache data for a long time (not knowing if it would become obsolete) and had to frequently contact the server. AFS introduced the use of a partition on a client’s disk to cache large amounts of data for a long time: whole file caching and long-term caching. It supports a file download-upload model. The entire file is downloaded on first access (whole file download) and uploaded back to the server after a close only if it was modified. Because of this behavior, AFS provides session semantics: the last one to close a modified file wins and other changes (earlier closes) are lost.

During file access, the client need never bother the server: it already has the file. When a client first downloads a file, the server makes a callback promise: it maintains a list of each client that has downloaded a copy of a certain file. Whenever it gets an update for that file, the server goes through the list and sends a callback to each client that may have a cached copy so that it can be invalidated on the client. The next time the client opens that file, it will download it from the server. Files under AFS are shared in units called volumes. A volume is just a directory (with its subdirectories and files) on a file server that is assigned a unique ID among the cell of machines (remember cells from DCE RPC?). If an administrator decides to move the volume to another server, the old server can issue a referral to the new server. This allows the client to remain unaware of resource movement.

Coda

Coda was built on top of AFS and focused on two things: supporting replicated servers and disconnected operation. To support replicated storage, AFS’s concept of a volume was expanded into that of a Volume Storage Group (VSG). Given that some volumes may be inaccessible at a particular point in time, the Accessible Volume Storage Group (AVSG) refers to the subset of the VSG that the client can currently access. Before a client accesses a file, it first looks up the replicated volume ID of the file to get the list of servers containing replicated volumes and the respective local volume IDs. While it can read files from any available server, it first checks the versions from all of them to ensure that one or more servers don’t have out-of-date files. If the client discovers that a server has an old version of a file, it initiates a resolution process by sending a message to that server, telling it to update its old versions. When a client closes a file, if there were any modifications, the changes are written out to all available replicated volumes.

If no servers are available for access, the client goes into disconnected operation mode. In this mode, no attempt is made to contact the server and any file updates are logged instead in a client modification log (CML). Upon connection, the client plays back the log to send updated files to the servers and receive invalidations. If conflicts arise (e.g., the file may have been modified on the server while the client was disconnected) user intervention may be required.

Because there’s a chance that users may need to access files that are not yet in the local cache, Coda supports hoarding, which is a term for user-directed caching. It provides a user interface that allows a user to look over what is already in the cache and bring additional files into the cache if needed.

DFS (AFS version 3)

AFS evolved over the years. The most significant evolutionary version is version 3, which was adopted as the recommended distributed file system in the Distributed Computing Environment (DCE) where it is named DFS (Distributed File System).

The primary design goal of this system was to avoid the unpredictable lost data problems of session semantics if multiple clients are modifying the same file. The concept of tokens was introduced. A token is permission given by the server to the client to perform certain operations on a file and cache a file’s data. The system has four classes of tokens: open, data, status, and lock tokens. An open token must be obtained to have permission to open a file. A read data token must be obtained for a byte range of a file to have permission to access that part of the file. Similarly, a write data token is needed to write the file. Status tokens tell the client that it may be able to cache file attributes. These tokens give the server control over who is doing what to a file. Tokens are granted and revoked by the server. For example, if one client needs to write to a file then any outstanding read and write data tokens that were issued to any clients for that byte range get revoked: those clients are now denied access until they get new tokens.

SMB/CIFS

Microsoft’s Server Message Block protocol was designed as a connection-oriented, stateful file system with a priority on file consistency and support of locking rather than client caching and performance. While it does not use remote procedure calls, its access principle is the same: requests (message blocks) are functional messages, providing file access commands such as open, create, rename, read, write, and close.

With the advent of Windows NT 4.0 and an increasing need to provide improved performance via caching, Microsoft introduced the concept of opportunistic locks (oplocks) into the operating system. This is a modified form of DFS’s tokens. An oplock tells the client how it may cache data. It allows clients to cache information without worrying about changes to the file at the server. At any time, a client’s oplock may be revoked or changed by the server. The mechanism has been extended since Windows 7 and is generally referred to as leases. There are currently eight oplocks (do not memorize these but have an understanding of what they do):

  1. A level 1 oplock (exclusive oplock) provides the client with exclusive access to the file (nobody else is reading or writing it), so it can cache lock information, file attributes, and perform read-aheads and write-behinds.

  2. A level 2 oplock (shared oplock) is granted in cases where multiple processes may read a file and no processes are writing to it.

  3. A batch oplock is also exclusive and allows a client to keep a file open on the server even if the local process using it closed the file. This optimizes cases where a process repeatedly opens and closes the same files (e.g., batch script execution).

  4. A filter oplock is exclusive and allows applications that hold the oplock to be preempted whenever other processes or clients need to access the file.

  5. A read oplock (R) is a shared oplock that is essentially the same as a level 2 oplock. It supports read caching.

  6. A read-handle oplock (RH) allows multiple readers and keeps the file open on the server even if the client process closes it. It is similar to the batch oplock but is shared and does not support file modifications. It supports read caching and handle caching.

  7. A read-write oplock (RW) gives a client exclusive access to the file and supports read and write caching. It is essentially the same as the level 1, or exclusive, oplock.

  8. A read-write-handle oplock (RWH) enables a client to keep a file open on the server even if the client process closes it. It is exclusive and similar to the batch oplock.

The last four oplocks have been added since Windows 7 and are somewhat redundant with earlier mechanisms.

Oplocks may be revoked (broken) by the server. For example, if Process A has a read-write oplock, it can cache all read and write operations, knowing that no other client is modifying or reading that file. If another client, Process B, opens the file for reading, a conflict is detected. Process B is suspended and Process A is informed of the oplock break. This gives Process A a chance to send any cached changes to the server before Process B resumes execution.

SMB 2 and beyond

The SMB protocol was known to be chatty: common tasks often required several round-trip messages. It was originally designed for LANs and did not perform optimally either on wide area networks (WANs) or on today’s high-speed LANs (1–100 Gbps). The SMB protocol was dramatically cleaned up with the introduction of the Microsoft Vista operating system (SMB 2), with minor changes added in Windows 7 (SMB 2.1) and even more in Windows 8 (SMB 3). Apple has dropped its proprietary AFP protocol in favor of SMB 2 in OS X 10.9 (Mavericks). We will focus our discussion on the significant changes introduced in SMB 2.

SMB 2 added six key changes to its design:

Reduced complexity
The set of SMB commands went from over 100 commands down to 19.
Pipelining
Pipelining is the ability to send additional commands before the response to a prior command is received. Traditionally, SMB (and any RPC-based system) would have to wait for one command to complete before sending an additional command. The goal of pipelining is to keep more data in flight and use more of the available network bandwidth.
To keep clients from overwhelming the server with requests, credit-based flow control is used. The server creates a small number of credits, sends them to each client, and later grants more as needed. A client needs credits to send a message to the server; each time it sends a message, it decrements its credit balance. This allows the server to control the rate of messages from any client and avoid buffer overflow (a sketch of this credit mechanism appears after this list).
Note that TCP, in contrast, relies on congestion control, which results in data loss and wild oscillations in traffic intensity: TCP keeps increasing its transmission window size until packet loss occurs, then cuts the window in half and starts increasing again.
Compounding
Compounding is similar to pipelining but now allows multiple commands to be sent in one message. It avoids the need to optimize the system by creating commands that combine common sets of operations. Instead, one can send an arbitrary set of commands in one request. For instance, instead of the old SMB RENAME command, the following set of commands are sent: CREATE (create new file or open existing); SET_INFO; CLOSE. Compounding reduces network time because multiple requests can be placed within one message.
Larger read/write sizes
Fast networks can handle larger packet sizes and hence transfer larger read and write buffers more efficiently.
Improved caching
SMB 2 improved its ability to cache folder and file properties. This avoids messages to the server to retrieve these properties.
Durable handles
If there was a temporary network disconnection, an SMB client would lose its connection to the server and have to reestablish all connections, remount all file systems, and reopen all files. With SMB 2, the connection has to be reestablished but all handles to open files will remain valid.
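
Here is the credit-based flow control sketch referenced under Pipelining above; the transport callable and the response format are assumptions made purely for illustration.

```python
# A minimal sketch of credit-based flow control: every request consumes a
# credit, and the server replenishes credits in its responses.

class CreditClient:
    def __init__(self, initial_credits):
        self.credits = initial_credits

    def send(self, transport, request):
        if self.credits == 0:
            raise RuntimeError("no credits: must wait for the server to grant more")
        self.credits -= 1                                    # each request costs one credit
        response = transport(request)                        # hypothetical message transport
        self.credits += response.get("credits_granted", 0)   # server replenishes credits
        return response
```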

NFS version 4

While NFS version 3 is still widely used, NFS version 4 introduced significant changes and is a departure from the classic stateless design of NFS. The server is now stateful and is able to control client-side cache coherence better, allowing clients to cache data for a longer time. Servers can grant clients the ability to do specific actions on a file to enable more aggressive client caching. This is similar to SMB’s oplocks. Because of the stateful server, NFS now acquired open and close operations.

The addition of callbacks will notify a client when file or directory contents have changed.

Like SMB 2, NFS now supports compound RPC, where multiple operations can be grouped together into a single request. Sending one message instead of a series of messages reduces overall round-trip time significantly.

NFSv4 also added strong authentication and encryption, as well as support for file system replication and migration. This includes a mechanism for sending referrals similar to that used by AFS.

Parallel file systems

Goal: Store huge files (terabyte and larger) in a file system that is distributed across many (thousands of) machines.

Conventional file systems lay out an entire file system on one logical storage device (a disk or NAND flash memory). This means that the metadata and data for all files and directories is located in this one device. In some cases, the device might be a collection of disks (e.g., a RAID drive or several disks connected by a volume manager) but, to the operating system, it appears as one device – one collection of blocks. This restricts each file system to a single machine. Conventional distributed file systems, used for network attached storage (NAS), basically use a file system access protocol but interact with one or more servers running these conventional file systems.

With parallel file systems, the metadata and data of files are separated onto different entities. The metadata of a file is the set of attributes about a file: its name, size, access permissions, time stamps, etc. The data of a file is, well, the file’s data. Metadata does not take up much space. It’s a small set of attributes and each attribute is generally only a few bytes in size. Data, however, can be huge: terabytes or more for large files.

The idea of a parallel file system is to have a system – or systems – manage the metadata. As part of the metadata for each file, the system will identify the systems and logical blocks holding the file’s data. By doing this, we gain several benefits:

  1. A file’s data is no longer limited to the capacity of any one system and can be spread out across many systems.

  2. Because data blocks are distributed among multiple systems, access is effectively load balanced across them. Moreover, multiple systems can process different files or different parts of a file, with read operations taking place on different systems. This provides scalable bandwidth, comparable to striping in storage arrays.

  3. Data blocks can be replicated on multiple systems, providing fault tolerance and the potential for greater read bandwidth since processes can read from any of the replicated data blocks.

This design approach is the basis for the Google File System (GFS), the Hadoop Distributed File System (HDFS, essentially a clone of GFS), and distributed file systems used in supercomputing clusters, such as Lustre and GlusterFS. We will not look at these last two but the concepts are similar.

Google File System (GFS)

The Google File System is designed to support large data-intensive applications (the kind of algorithms Google uses for search and other services). The system is designed for an environment where there are many thousands of storage servers, some of which are expected to be down at any given point in time. The data that is managed comprises billions of objects and many petabytes of data. GFS is not designed to be a general-purpose file system and is optimized for large files (rather than lots of small files), streaming reads (more common than writes), and atomic appends (which may be done concurrently by multiple clients).

Each GFS cluster contains one master file server. This is a faster and more reliable machine that manages file system metadata, such as names and attributes of files and directories. None of the actual file data resides on this system. Instead, it contains a mapping of the file contents to the chunks that hold the data. Chunks are fixed-size 64 MB blocks and are stored in chunkservers. The chunkservers are less reliable than the master, and chunks are replicated so that each chunk is typically stored on three separate chunkservers.

Clients contact the master to look up files. The master gives them a chunkserver name and chunk handles for the files requested.

To write to a file, the master grants a chunk lease to one of the replicated chunks. The chunkserver holding this is the primary replica chunkserver for that chunk. Writing is a two-stage process. First, the client writes to one replica. That replica then forwards the data to another replica chunk server, and so on until all replicas for that chunk receive the data. This relieves the load and network demands on the client in that the client has to only write one copy. It also doesn’t put any one computer within GFS in the position of having to send out N copies of the data.

Note that the data flow (data transfer) from a client is chained: it goes to one replica, then to the next replica, then to another, and so on. This pipelining optimizes the bandwidth on each link. The data transfer is pipelined: a chunkserver sends data to the next replica at the same time as it is receiving it. This minimizes latency. To further reduce latency, each system will try to pick the nearest system that has not yet been added to the chain to send data. The client has the choice of all the replicas and can pick the closest one.

Once all replicas acknowledge receipt of the data, the second stage, writing, takes place. The client sends a write request to the primary chunkserver, identifying the data that was sent. The primary assigns a sequence to the write operations that take place on the data and sends write requests to all the replicas so that they will write data in the same sequence-number order.

The control flow (write commands) is sent from the client to the primary and then goes from the primary directly to all of the secondaries (but is a much, much smaller message).
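
The two-stage write described above can be summarized in a short sketch; push_data() and write() are hypothetical RPC helpers, and the handling of leases, retries, and sequence numbers is omitted.

```python
# A minimal sketch of a GFS-style write: first push the data along the
# replica chain, then ask the primary to serialize and apply the write.

def write_chunk(data, replica_chain, primary, push_data, write):
    # Stage 1: push the data to the first (nearest) replica; each chunkserver
    # forwards to the next one in the chain while it is still receiving.
    push_data(first=replica_chain[0], data=data, rest_of_chain=replica_chain[1:])

    # Stage 2: once every replica holds the data, send the write request to
    # the primary, which assigns a serial order and applies it on all replicas.
    return write(primary, data)
```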

Hadoop Distributed File System

Apache’s Hadoop Distributed File System (HDFS) is incredibly similar to GFS and is designed with essentially the same goals. The key distinction is that it does not support modifying a file once it is created, but it does support appends. This avoids the need to manage leases or locks for the file.

Like GFS’s master, HDFS uses a separate server, called a NameNode, that is responsible for keeping track of the filesystem namespace and the location of replicated file blocks. File data is broken into 128 MB blocks (the default, but configurable per file) and is stored on DataNodes (the equivalent of GFS’s chunkservers). Each block (GFS chunk) is replicated on multiple DataNodes (the default is three, as with GFS). As with GFS, all file system metadata (names and block maps) is stored in memory on the NameNode for performance. File writes from clients are pipelined through each of the replica DataNodes that will hold the data.

HDFS uses rack-aware logic for determining which computers should host replicas for a block. The first replica is targeted to a computer in the same rack as the requesting client (to minimize latency). Second and additional replicas come from other racks in the data center for fault tolerance in case the first rack fails. The process for writing a block is essentially the same as in GFS. Data writes are pipelined to get data onto the replicas and then an update command is sent by the primary replica to all the others to write the block (this is essentially a commit operation, as it makes the data permanent).

Distributed Lookup Services (Distributed Hash Tables)

Goal: Create a highly-scalable, completely decentralized key-value store

The purpose of a distributed lookup service is to find the computer that has data that corresponds to a key that you have. For example, the key can be the name of the song and the computer is one that is hosting the MP3 file, or the key can be your customer ID and the corresponding data is the contents of your shopping cart.

The most straightforward approach is to use a central coordinator to store all the keys and their mapping to computers. You ask the server to look up a key and it gives you the computer containing the content. This is the Napster implementation, which was one of the first peer-to-peer file sharing services. A coordinator served as the web service that kept track of who was hosting which music files for download but the files themselves were hosted by the community of users. The Google File System, GFS, also uses a central coordinator although the corresponding data for a key (file) is distributed among multiple chunkservers.

Another approach is to “flood” the network with queries. What happens here is that your computer knows of a few peer computers, not the entire collection. It sends the request to these computers. Those computers, in turn, know of other computers. When a computer needs to look up a key, it forwards the request to all of its peer nodes. If a peer does not have the content, it repeats the process and forwards the request to its peers (and they forward the request to their peers …). A time-to-live (TTL) value in the request is decremented with each hop and the request is no longer forwarded if the hop count drops below zero. If one of the peers has the content, then it responds to whoever sent it the request. That node, in turn, sends the response back to its requestor and the process continues back until the originator of the query gets the response. This is called back propagation. This approach was used by the Gnutella file sharing system.
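
A minimal sketch of flooding with a TTL might look like the following; the node object and the send()/reply() primitives are assumptions for illustration.

```python
# A minimal sketch of query flooding on an overlay network: answer locally
# if possible, otherwise forward to peers until the TTL runs out.

def handle_query(node, key, ttl, came_from, send, reply):
    if key in node.local_store:
        reply(came_from, node.local_store[key])   # answer flows back along the query path
        return
    if ttl <= 0:
        return                                    # hop limit reached: stop forwarding
    for peer in node.neighbors:
        if peer != came_from:
            send(peer, ("QUERY", key, ttl - 1, node.id))
```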

Flooding uses what is known as an overlay network. An overlay network is a logical network that is built on top of another network. Typically, computers on the network have knowledge of only some of the other computers on the network and will have to use these “neighbor” nodes to route traffic to more distant nodes.

Finally, the distributed hash table (DHT) approach is a set of solutions that is based on hashing the search key to a number that is then used to find the node responsible for items that hash to that number. A key difference between the DHT approach and the centralized or flooding approaches is that the hash of the key determines which node is responsible for holding information relating to the key.

Consistent hashing

Conventional hash functions require most keys to be remapped if the size of the hash table changes: that is, keys will usually hash to a different value when the size of the table changes. For a distributed hash table, this is particularly problematic: it would mean that if we remove or add a node to a group of computers managing a DHT, a very large percentage of the data would have to migrate to different systems. A consistent hash is a hashing technique where most keys will not need to be remapped if the number of slots in the table changes. On average, only k/n keys will need to be remapped, where k is the number of keys and n is the number of slots in the table.
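
A small sketch of consistent hashing using a sorted ring of hashed node positions is shown below; the choice of SHA-1 here is arbitrary, and virtual nodes (which systems like Dynamo add for load balance) are omitted for brevity. Adding a node claims only the arc between it and its predecessor, so only the keys in that arc have to move.

```python
# A minimal sketch of consistent hashing: keys and nodes hash onto the same
# ring, and a key belongs to the first node at or after its hash position.

import bisect
import hashlib

def _hash(value: str) -> int:
    return int(hashlib.sha1(value.encode()).hexdigest(), 16)

class ConsistentHashRing:
    def __init__(self, nodes):
        self.ring = sorted((_hash(n), n) for n in nodes)

    def lookup(self, key):
        positions = [h for h, _ in self.ring]
        i = bisect.bisect_right(positions, _hash(key)) % len(self.ring)
        return self.ring[i][1]            # the clockwise successor node owns the key

    def add_node(self, node):
        bisect.insort(self.ring, (_hash(node), node))

    def remove_node(self, node):
        self.ring.remove((_hash(node), node))

# Example usage:
ring = ConsistentHashRing(["nodeA", "nodeB", "nodeC"])
print(ring.lookup("some-key"))            # which node stores this key
```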

CAN: Content-Addressable Network

CAN implements a logical x-y grid (although it can be applied to an arbitrary number of dimensions). A key is hashed by two hashing functions, one for each dimension (e.g., an x-hash and a y-hash). The result of these hashes identifies an (x, y) point on the grid. Each node is responsible for managing all keys that are located within a rectangle on the grid; that is, a node will be responsible for all keys whose x-hash falls in some range between x_a and x_b and whose y-hash falls in some range between y_a and y_b. This rectangle is known as a zone. If a node is contacted with a query for a key, it will hash the key and determine if it is present within its zone. If it falls within the node’s zone, the node can satisfy the request. If the hashed values are out of range, the node will forward the request to one of four neighbors (north, east, south, or west), which will then invoke the same logic to either process the query or forward it on to other nodes. The average route for a system with n nodes is O(sqrt(n)) hops.

CAN scales because each zone can be split into two, either horizontally or vertically, and the process can be repeated over and over as needed. For each split, some of the keys will have to move to the new node. For fault tolerance, key, value data can be replicated on one or more neighboring nodes and each node will know not just its neighbors but its neighbors' neighbors. If a node is not reachable, the request will be sent to a neighboring node. Even though we discussed CAN in two dimensions, it can be implemented in an arbitrary number of dimensions. For d dimensions, each node needs to keep track of 2d neighbors.

Chord

Chord constructs a logical ring representing all possible hash values (bucket positions). Note that for hash functions such as a 160-bit SHA-1 hash, this is an insanely huge value of approximately 1.46 x 10^48 slots. Each node in the system is assigned a position in the huge logical ring by hashing its IP address. Because the vast majority of bucket positions will be empty, (key, value) data is stored either at the node to which the key hashes (if, by some rare chance, the key hashes to the same value that the node’s IP address hashed) or on a successor node, the next node that would be encountered as the ring is traversed clockwise. For a simple example, let us suppose that we have a 4-bit hash (0..15) and nodes occupying positions 2 and 7. If a key hashes to 4, the successor node is 7 and hence the computer at node 7 will be responsible for storing all data for keys that hash to 4. It is also responsible for storing all data to keys that hash to 3, 5, 6, and 7.

If a node only knows of its clockwise neighbor node, then any query that a node cannot handle will be forwarded to a neighboring node. This results in an unremarkable O(n) lookup time for a system with n nodes. An alternate approach is to have each node keep a list of all the other nodes in the group. This way, any node will be able to find out which node is responsible for the data on a key simply by hashing the key and traversing the list to find the first node ≥ the hash of the key. This gives us an impressive O(1) performance at the cost of having to maintain a full table of all the nodes in the system on each node. A compromise approach that bounds the table size is to use finger tables. A finger table is a partial list of nodes, with each element in the table identifying a node that is a power of two away from the current node. Element 0 of the table is the next node (2^0 = 1 away), element 1 of the table is the node after that (2^1 = 2 away), element 2 of the table is the node four nodes removed (2^2), element 3 of the table is the node eight nodes removed (2^3), and so on. With finger tables, O(log n) nodes need to be contacted to find the owner of a key.
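
A sketch of Chord-style successor lookup with a finger table appears below; the node objects, how the fingers are populated, and the interval helper are assumptions made for readability, and edge cases of the real protocol are glossed over.

```python
# A minimal sketch of Chord lookup: if the key falls between us and our
# successor, the successor owns it; otherwise jump to the closest preceding
# finger, which roughly halves the remaining distance (O(log n) hops).

def between(x, a, b, inclusive_right=False):
    """True if x lies clockwise after a and before b on the ring."""
    if a < b:
        return a < x < b or (inclusive_right and x == b)
    return x > a or x < b or (inclusive_right and x == b)   # interval wraps around zero

def find_successor(node, key_id):
    # node.successor and node.fingers[i] are assumed to be other node objects
    if between(key_id, node.id, node.successor.id, inclusive_right=True):
        return node.successor
    for finger in reversed(node.fingers):        # farthest finger that precedes the key
        if between(finger.id, node.id, key_id):
            return find_successor(finger, key_id)
    return node.successor
```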

Distributed hash table case study: Amazon Dynamo

Goal: Create a practical implementation of a key-value store that is highly scalable, completely decentralized, replicated, and efficient.

Dynamo is a highly available key-value store created internally within Amazon. Many services within Amazon, such as best seller lists, shopping carts, user preferences, session management, etc., need only primary key access to data. A multi-table relational database system would be overkill and would limit scalability and availability. With Dynamo, an application can configure its instance of Dynamo for desired availability (# replicas) and consistency.

Dynamo supports two operations: get(key) and put(key, data, context). The data is an arbitrary bunch of bytes (typically less than 1 MB) and is always identified by a unique key. The (key, data) values are distributed over a large set of servers and replicated for availability. The context is a value that is returned with a get operation and then sent back with a future put operation. The context is meaningless to the user program and is used like a cookie. Internally, it contains versioning information.

The system is decentralized: there is no need for a central coordinator to oversee operations. Every node has equivalent responsibilities and the system can grow dynamically by adding new nodes.

Partitioning: scalability

Since massive scalability is a key design aspect of Dynamo, data storage is distributed among many nodes. Dynamo relies on consistent hashing to identify which node will be responsible for a specific key. With normal hashing, a change in the number of slots in the table (e.g., number of nodes since we are hashing to find a node) will result in having to remap all keys. With consistent hashing, this is not the case and only K/n keys need to be remapped, where K is the number of keys and n is the number of slots (nodes).

Nodes are arranged in a logical ring. Each node is assigned a random “token” (a number) that represents its position in the logical ring. It is responsible for all keys that hash to any number between the node’s number and its predecessor’s number. When a key is hashed, a node will traverse a logical ring of node values to find the first node with a position greater than or equal to that hashed result. This tells it which node is responsible for handling that key. Since nodes are not added that frequently, every node can store a cached copy of this ring (the size is the list of N hash values and server addresses, where N is the number of nodes). Moreover, to avoid the extra hop resulting because a client sent a request to the wrong Dynamo node, a client may also get a copy of this table. The system has been described as a zero-hop distributed hash table (DHT).

Adding or removing nodes affects only the node’s immediate neighbor in the ring. If a node is added, the successor node will give up some range of values to the new node. If a node is removed, the key, value data it manages will have to be moved over to the successor. In reality, a physical node is assigned to manage multiple points in the ring (multiple ranges of hash values). The nodes in the algorithm that we just discussed are really virtual nodes. A real machine, or physical node, will be responsible for managing multiple virtual nodes. The advantage of doing this is that we can attain balanced load distribution. If a machine becomes unavailable, the load is evenly distributed among the available nodes. If a new machine is added, each virtual node within it will take a set of values from other virtual nodes, effectively taking on a small amount of load from a set of other machines instead of from just the neighboring system (successor node). Moreover, if a more powerful machine is added, it can be assigned a larger number of virtual nodes so that it bears a greater percentage of the query workload.

Replication: availability

Dynamo is designed as an always writable data store. Data can be replicated on N hosts, where the value N is configurable per instance of Dynamo. The node that is responsible for storing the key-value is assigned the role of coordinator for that key and is in charge of replication for that key. The coordinator copies (key, value) data onto N-1 successor nodes clockwise in the ring. Copying is asynchronous (in the background) and Dynamo provides an eventually consistent model. This replication technique is called optimistic replication, which means that replicas are not guaranteed to be identical at all times. If a client cannot contact the coordinator, it sends the request to a node holding a replica, which will then periodically try to contact the coordinator.

Because data is replicated and the system does not provide ACID guarantees, it is possible that replicas may end up holding different versions. Dynamo favors application-based conflict resolution since the application is aware of the meaning of the data associated with the key and can act upon it intelligently. For example, the application may choose to merge two versions of a shopping cart. If the application does not want to bother with this, the system falls back on a last write wins strategy. Dynamo uses vector clocks to identify versions of stored data and to distinguish between versions that have been modified concurrently and versions that are causally related (a later one is just an update of an earlier one). For example, one can identify whether there are two versions of a shopping cart that have been updated independently or whether one version is just a newer modification of an older shopping cart. A vector clock value is a set of (node, counter) pairs. This set of values constitutes the version of the data. It is returned as the “context” with a get operation and updated and stored when that context is passed as a parameter to a put operation.
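
The version comparison that vector clocks enable can be sketched as follows; representing a version as a dictionary of (node, counter) pairs is an illustrative assumption rather than Dynamo’s actual wire format.

```python
# A minimal sketch of vector-clock comparison: either one version descends
# from the other (causally related) or the two are concurrent (a conflict).

def descends(a, b):
    """True if version a is at least as new as b for every node (a dominates b)."""
    return all(a.get(node, 0) >= count for node, count in b.items())

def compare(a, b):
    if descends(a, b) and descends(b, a):
        return "identical"
    if descends(a, b):
        return "a is newer"                 # causally related: a supersedes b
    if descends(b, a):
        return "b is newer"
    return "conflict"                       # concurrent updates: application must reconcile

# Example: two shopping carts updated concurrently on different coordinators
v1 = {"nodeA": 2, "nodeB": 1}
v2 = {"nodeA": 1, "nodeB": 2}
print(compare(v1, v2))                      # -> "conflict"
```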

Because Dynamo is completely decentralized and does not rely on a coordinator (unlike GFS, for example), each node serves three functions:

  1. Managing get and put requests. A node may act as a coordinator responsible for a particular key or may forward the request to the node that has that responsibility.

  2. Keeping track of membership and detecting failures. A node will detect if other nodes are out of service and keep track of the list of nodes in the system and their associated hash ranges.

  3. Local persistent storage. Each node is responsible for being either the primary or replica store for keys that hash to a certain range of values. These (key, value) pairs are stored within that node using a variety of mechanisms depending on application needs. These include the Berkeley Database Transactional Data Store, MySQL (for large objects), or an in-memory buffer with persistent backing store (for highest performance).

Domain Name System

Goal: Build a wide-area distributed set of name servers for resolving information about Internet domain names

The Internet Domain Name System (DNS) is the naming system for nodes on the Internet that associates human-friendly compound names with numeric IP addresses and other information about that node. IP domain names (e.g., cs.rutgers.edu) are hierarchical and assigned entirely independently from their corresponding addresses (e.g., 128.6.4.2). Any name can be associated with any address.

IP addresses are managed globally by the IANA, the Internet Assigned Numbers Authority, which hands off management to Regional Internet Registries that, in turn, assign blocks of IP addresses to ISPs. Internet domain names are also assigned hierarchically, with hundreds of top-level domains (.com, .org, .ca, .es, .yoga, .wedding, …) under a single root. The IANA delegates the management of individual top-level domain names to various companies. These are called domain name registry operators, each of which operates a NIC, or Network Information Center. Domain name registrars are companies that allow end users to register domain names. They update the registry database at the NIC. For example, the .yoga top-level domain is operated by a company in Ireland called Top Level Domain Holdings Ltd. When you register a domain with a domain name registrar such as Namecheap or GoDaddy, the company contacts Top Level Domain Holdings to update the master database of information about domains registered under .yoga.

The Domain Name System is implemented through a collection of DNS servers. It is an example of a distributed name server for resolving Internet domain names into IP addresses. Each server is responsible for answering questions about machines within its zone. A zone is a subtree of the Internet Domain name space that is managed by that server. A registered domain name must include the address of one or more name servers that are responsible for answering queries about hosts within that domain.

Each DNS server will do one of several things: (1) answer a request if it already knows the answer, (2) contact another name server(s) to search for the answer, (3) return an error if the domain name does not exist, or (4) return a referral: the address of another name server that may know more of the answer.

For example, a search for mail.pk.org (with no cached information) begins by querying one of several replicated root name servers. These keep track of the name servers responsible for top-level domains. This query will give you a referral to a name server that is responsible for the .org domain. Querying that name server will give you a name server responsible for pk.org. Finally, the name server responsible for pk.org will provide the IP address or an authoritative “this host does not exist” response. Along the way, referrals may be cached so that a name server need not go through the entire process again.

An iterative name resolution process is one where you go through the name hierarchy to find lower-level name servers that will eventually take you to the final component of the name. This is the way DNS servers operate when they return a referral to another name server if they are not responsible for an address, and it is the common way for DNS servers to work since a server does not have to maintain any state. A recursive name resolution process is one where the name server itself will invoke other name servers as needed to get the answer to the query. DNS supports both forms. The local DNS server, called a DNS resolver (for example, at your ISP), will generally support recursive requests and perform the iteration itself rather than passing referrals back to the client.
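
A rough sketch of the iterative resolution loop is shown below; the query() call and the response object are hypothetical stand-ins, and a real resolver also caches referrals and handles timeouts.

```python
# A minimal sketch of iterative name resolution: keep following referrals
# down the hierarchy until a server gives an authoritative answer.

def resolve_iteratively(name, root_servers, query):
    servers = list(root_servers)
    while servers:
        response = query(servers[0], name)     # hypothetical DNS query helper
        if response.kind == "answer":
            return response.address            # authoritative answer
        if response.kind == "referral":
            servers = response.name_servers    # follow the referral one level down
        else:
            servers = servers[1:]              # try the next server on an error
    raise LookupError(f"could not resolve {name}")
```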

Distributed transactions

Goal: Create a fault tolerant algorithm that ensures that all group members agree to commit (make their actions permanent). If agreement is not reached, then all group members must agree to abort (undo any changes).

A transaction is a set of operations that operates on, and typically modifies, data. A key facet of a transaction is that it has to be atomic — all results have to be made permanent (commit) and appear to anyone outside the transaction as an indivisible action. If a transaction cannot complete, it must abort, reverting the state of the system to that before it ran. If several transactions run concurrently, the end result must be the same as if those transactions ran in some (any) serial order. The specific order in which transactions execute is known as a schedule.

A transaction-based model guarantees a set of properties known as ACID:

Atomic
The transaction happens as a single indivisible action. Everything succeeds (and the transaction commits) or else the entire transaction is rolled back (the transaction aborts). Other transactions do not see intermediate results.
Consistent
A transaction cannot leave the data in an inconsistent state. If the system has invariants, they must hold after the transaction. For example, the total amount of money in all accounts must be the same before and after a “transfer funds” transaction.
Isolated (Serializable)
Transactions cannot interfere with each other. If transactions run at the same time, the final result must be the same as if they executed in some serial order.
Durable
Once a transaction commits, the results are made permanent. No failures after a commit will cause the results to revert.

A write-ahead log (or transaction log) is used to enable rollback: reverting to the previous state when aborting a transaction. It is also crucial in maintaining the state of the transaction in a stable place in case the process or computer should die. The system will be able to read the log and recover to where it was in the transaction commit protocol.

In a distributed transaction environment, multiple processes participate in a transaction, each executing its own sub-transaction that can commit only if there is unanimous consensus by all participants to do so. Each system runs a transaction manager, a process that is responsible for participating in the commit algorithm to decide whether to commit or abort its sub-transaction. One of these transaction managers may be elected as the coordinator, which initiates and runs the commit algorithm. Alternatively, the coordinator could be a separate process from any of the transaction participants.

Two-phase commit protocol

The two-phase commit protocol uses atomic multicasts to reach a consensus among the group on whether to commit or abort. It uses a coordinator to send a request (“can you commit?”) to every member of the group (reliably, retransmitting as often as needed until all replies are received). Phase 1 is complete when every member of the group (each participant) responds. If the coordinator gets even a single abort response from a participant, it must tell all participants to abort the entire transaction. Otherwise, it will tell all participants to commit it. In phase 2, the coordinator sends the commit or abort order and waits for a response from everyone. In summary, in phase 1, the coordinator gets everyone’s agreement and in phase 2, the coordinator sends the directive to commit or abort.

The write-ahead log in stable storage is crucial for ensuring atomic multicasts (the write-ahead log is also important for transaction rollback, which is used for aborts!). For example, if a participant sent the coordinator a commit response for phase 1 and then died, it must be able to reboot and reconstruct the transaction state from the log; it cannot change its mind after rebooting. The two-phase commit protocol cannot proceed until every participant acknowledges every message.

The two-phase commit protocol stalls if any member, the coordinator or any participant, dies. It has to wait for the recovery of that member before proceeding with the protocol. A recovery coordinator can step in under certain circumstances. If the coordinator died and a recovery coordinator took over, it queries the participants. If at least one participant has received a commit message, then the new coordinator knows that the vote to commit must have been unanimous and it can tell the others to commit. If no participant received a commit message, then the new coordinator can restart the protocol. However, if one of the participants died along with the coordinator, confusion may arise. If all the live participants state that they have not received a commit message, the coordinator does not know whether there was a consensus: the dead participant may have been the only one to receive the commit message (which it will process when it recovers). As such, the recovery coordinator cannot direct the other participants either way; it must wait for the dead participant to come back.

Three-phase commit protocol

The two-phase commit protocol is a blocking protocol. If the coordinator or any participant stops running, the entire protocol stalls until the failed process is restarted. The three-phase commit protocol is similar to the two-phase commit protocol but allows entities to time out in certain cases to avoid indefinite waits.

The protocol also supports the use of a recovery coordinator by introducing an extra phase in which participants are told what the consensus was, so that any participant that received this information before the coordinator died can inform a standby coordinator whether there was a unanimous decision to commit or abort. The three-phase commit protocol propagates the knowledge of the outcome of the vote to all participants before starting the commit phase.

In phase 1 of the three-phase commit protocol, the coordinator sends a query to commit request (“can you commit?”) to every member of the group (reliably, retransmitting as often as needed until all replies are received over a period of time). If any of the participants responds with a no, or if any fails to respond within a defined time, the coordinator sends an abort to every participant.

In phase 2, the coordinator sends a prepare-to-commit message to all participants and gets acknowledgements from everyone. When this message is received, a participant knows that the unanimous decision was to commit. If a participant fails to receive this message in time, then it aborts. At this point, the participants do not yet commit. However, if a participant receives an abort message then it can immediately abort the transaction.

In phase 3, the coordinator sends a commit message to all participants telling them to commit. If a participant fails to receive this message, it commits anyway since it knows from phase 2 that there was a unanimous decision to commit. If the coordinator crashes during this protocol, another one can step in and query the participants for the commit decision. If every participant received the prepare-to-commit message, then the coordinator can issue the commit directives. If only some participants received the message, the coordinator still knows that the unanimous decision was to commit and can re-issue the prepare-to-commit request followed by a commit. If no participant received the message, the coordinator can restart the protocol or, if necessary, restart the transaction.

The three-phase commit protocol accomplishes two things:

  1. Enables use of a recovery coordinator. If a coordinator died, a recovery coordinator can query a participant.

    • If the participant is found to be in phase 2, that means that every participant has completed phase 1 and voted on the outcome. The completion of phase 1 is guaranteed. It is possible that some participants may have received commit requests (phase 3). The recovery coordinator can safely resume at phase 2.

    • If the participant was in phase 1, that means NO participant has started commits or aborts. The protocol can start from the beginning.

    • If the participant was in phase 3, the coordinator can continue in phase 3 and make sure everyone gets the commit/abort request. (A sketch of this recovery decision logic appears at the end of this section.)

  2. Every phase can now time out – there is no indefinite wait like in the two-phase commit protocol.

    Phase 1:
    A participant aborts if it doesn’t hear from the coordinator in time.
    The coordinator sends an abort to everyone if it doesn’t hear back from some participant.
    Phase 2:
    If the coordinator times out waiting for a participant, it assumes that participant crashed and tells everyone to abort.
    If a participant times out waiting for the coordinator, it elects a new coordinator.
    Phase 3:
    If a participant fails to hear from the coordinator, it can contact any other participant for the result.

The three-phase commit protocol suffers from several problems. First, a partitioned network may cause a subset of participants to elect a new coordinator and vote on a different transaction outcome. Second, it does not handle fail-recover well: if a coordinator that died later recovers, it may read its write-ahead log and resume the protocol at what is now an obsolete state, possibly issuing directives that conflict with what already took place. Perhaps most importantly, the three-phase commit protocol is less efficient than the two-phase commit protocol. We expect the vast bulk of transactions to commit without aborts or failed coordinators. With the three-phase commit protocol, we incur an extra set of messages (which are logged onto stable storage) to enable systems to time out and abort or to enable a recovery coordinator to take over. In other words, we pay a 50% communication penalty to handle an event that we rarely expect to occur.
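
To make the recovery-coordinator logic in item 1 above concrete, here is a minimal sketch; query_phase() is a hypothetical helper that returns the furthest phase a reachable participant reports having reached.

    def recover(reachable_participants):
        phases = [query_phase(p) for p in reachable_participants]
        if any(p == "phase-3" for p in phases):
            # Someone already received commit/abort; finish phase 3 for everyone.
            return "continue phase 3"
        if any(p == "phase-2" for p in phases):
            # Phase 1 completed everywhere, so the vote was unanimously commit.
            return "resume at phase 2 (re-send prepare-to-commit, then commit)"
        # No participant got past phase 1: nothing has been committed or aborted.
        return "restart the protocol from the beginning"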

Brewer’s CAP Theorem

Ideally, we’d like three properties in a replicated distributed system:

Consistency
The data retrieved from the system should be the same regardless of which server is contacted.
Availability
The system should always be available to handle requests.
Partition tolerance
The system should continue to function even if some network links do not work and one group of computers cannot talk to another. For example, a link between two data centers may go down.

Eric Brewer’s CAP Theorem states that you can have either consistency or availability in the presence of partitions but not both. Quite often, the theorem is summarized as: if you want consistency, availability, and partition tolerance, you have to settle for at most two out of three of these. However, we expect real-world distributed systems to be at risk of network partitions, so the choice is really between consistency and availability.

With distributed architectures, we generally strive for high availability and the ability to withstand partitions (occasional breaks in the ability of nodes to communicate). Hence, we will have to give up on consistency and break the guarantees of ACID. An alternative to the requirements of ACID is BASE.

BASE stands for Basic Availability, Soft-state, Eventual consistency. Instead of requiring consistency after every transaction, it is enough for the data to eventually get to a consistent state. The downside is that some processes may access stale data which has not yet been brought into a consistent state.

Concurrency control

Goal: Allow multiple transactions to run concurrently but ensure that resource access is controlled to give results that are identical if they ran in some serial sequence. This preserves the “Isolated” guarantee of ACID transaction semantics.

A schedule is a sequence of transactions. A serial schedule is one where transactions are executed sequentially: a new transaction starts when a previous one commits (or aborts). This is inefficient, since we are forcing transactions to run one at a time. However, it ensures isolation (the I in ACID).

The goal of concurrency control is to allow multiple transactions to run concurrently (which is great for performance) while ensuring that data access is controlled such that the net effect is the same as if the transactions all ran in some serial order. That is, we cannot have the net result be one where transactions read an interim state of data from another transaction.

Two-phase locking (2PL)

Exclusive locks, via a lock manager, help us accomplish this. A transaction can grab locks for the resources it needs, which ensures mutual exclusion. To ensure serializability, it is important that a transaction does not acquire any new locks after it has released a lock. If we do not enforce this, transaction A may produce a result based on the completion of transaction B while transaction B itself used data that A wrote and then unlocked, a circular dependency that no serial ordering could produce. This technique is known as two-phase locking (2PL). The first phase is the growing phase, in which locks are acquired. The second phase is the shrinking phase, in which locks are released.
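
A minimal sketch of how a transaction might enforce the two-phase rule, assuming a hypothetical LockManager with acquire(txn, obj) and release(txn, obj) operations:

    class TwoPhaseTransaction:
        def __init__(self, txn_id, lock_manager):
            self.txn_id = txn_id
            self.lm = lock_manager
            self.held = set()
            self.shrinking = False        # set after the first release

        def lock(self, obj):
            # Growing phase only: acquiring after any release violates 2PL.
            if self.shrinking:
                raise RuntimeError("2PL violation: acquire after release")
            self.lm.acquire(self.txn_id, obj)
            self.held.add(obj)

        def unlock(self, obj):
            self.shrinking = True         # we are now in the shrinking phase
            self.lm.release(self.txn_id, obj)
            self.held.discard(obj)

Under strong strict two-phase locking, described next, unlock would only ever be called as part of commit or abort.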

Strong strict two-phase locking (SS2PL)

The problem with two-phase locking is that, if a transaction that has released some locks now aborts, there is a chance that other transactions have used data that was modified by the transaction. In that case, those transactions (and all transactions that depend on them) have to abort as well. This condition is called cascading aborts. Strong strict two-phase locking (SS2PL) avoids this problem by requiring all locks to be held until the end. The shrinking phase, in effect, is an atomic operation that occurs at the very end of the transaction. You lose concurrency this way but avoid having to process cascading aborts.

Exclusive and shared locks

Exclusive locking for every resource access is a bit aggressive. Consider a transaction that just reads data. It needs to lock that data to ensure that no other transaction modifies it but nothing would go wrong if another transaction also wanted to read that same data. We can achieve greater concurrency by distinguishing read locks from write locks. Read locks (called shared locks) need not be exclusive: any number of them can be granted. However, if there are any read locks on an object, a write lock cannot be granted and the transaction must wait. Similarly, if there is a write lock (called an exclusive lock) on an object then any read lock requests or write lock requests must wait.

Two-version-based concurrency control

A way of increasing concurrency even beyond read/write locks is two-version-based concurrency control. In this case, one transaction writes tentative versions (private versions) while other transactions read existing, previously committed versions of the data. This allows transactions to request read locks even if another transaction has a write lock on the object. When the transaction that has a write lock is ready to commit, it converts its write locks to commit locks, waits until any transactions that have a read lock for that object complete, and then makes its modified version permanent. While a commit lock is held, no other transaction can grab any lock on that object. This allows increased concurrency, but transactions that write data risk waiting when they attempt to commit and make their data permanent.

Optimistic concurrency control

Concurrency control techniques that rely on locking force locks to be held while the transaction is using the resource, or, in some cases, until the end of the transaction. This restricts the maximum amount of concurrency that may be achieved in the system. Optimistic concurrency control techniques assume that transactions are more likely to complete than not. As such, it is better to put more effort on rectifying errors from having used data from aborted transactions than to lock access to the data. A fully optimistic system uses no locks and checks for conflicts at commit time. Optimistic concurrency control has three phases of operation:

  1. Working phase. The transaction reads and writes data. A private workspace is often, but not always, used to keep non-committed results isolated.

  2. Validation phase. When the transaction is ready to commit, a check is made to see if there is any conflict with other transactions that took place during this time. For example, if some transaction A modified data and committed but transaction B read data before A modified that data, then transaction B cannot be allowed to commit because that would violate serializability.

  3. Update phase. Tentative changes are made permanent and the transaction commits.

Timestamp ordering

Timestamp ordering allows lock-free concurrency control by keeping track of two timestamps per object: the timestamp of the last committed transaction that read the object and the timestamp of the last committed transaction that wrote the object. If a transaction wants to write an object, it compares its own timestamp with the object’s write timestamp. If the object’s timestamp is older, then we have good ordering and the transaction can proceed. Otherwise the transaction aborts and is restarted. Reads are checked analogously against the object’s write timestamp, and a successful read updates the object’s read timestamp, which later write requests are checked against as well.
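
A sketch of these checks, assuming each object carries read_ts and write_ts fields holding the timestamps of the last committed reader and writer (the field names are made up for illustration):

    class Aborted(Exception):
        pass

    def ts_read(txn_ts, obj):
        if obj.write_ts > txn_ts:            # written by a "later" transaction
            raise Aborted("abort and restart with a new timestamp")
        obj.read_ts = max(obj.read_ts, txn_ts)
        return obj.value

    def ts_write(txn_ts, obj, value):
        if obj.read_ts > txn_ts or obj.write_ts > txn_ts:
            raise Aborted("abort and restart with a new timestamp")
        obj.write_ts = txn_ts
        obj.value = value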

Multiversion concurrency control

We can extend two-version-based concurrency control to support multiple versions of an object and apply the concepts of timestamp ordering to create multiversion concurrency control (MVCC).

In multiversion concurrency control, no locks are used. The system can maintain multiple versions of an object (e.g., a field in a database table). Each transaction has a timestamp associated with it that marks the start of the transaction. Each version of an object has two timestamps: the read timestamp, which records the time the object was read, and the write timestamp, which records the time the object was modified.

With MVCC, a transaction will never have to wait to read an object; it simply reads the latest version of the object that has a write timestamp earlier than that of the transaction. A write cannot take place if there are any other uncommitted transactions that read the object and have an earlier timestamp than ours. This is because such an earlier transaction depends on a previous value of the object, and we need to consider the possibility that it may yet modify that object.
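
A sketch of the read side of MVCC: a transaction reads the latest committed version whose write timestamp is no later than the transaction’s start timestamp (the Version structure is invented for illustration).

    from dataclasses import dataclass

    @dataclass
    class Version:
        value: object
        write_ts: int            # timestamp of the transaction that wrote it

    def mvcc_read(versions, txn_start_ts):
        # versions: the committed versions of one object
        visible = [v for v in versions if v.write_ts <= txn_start_ts]
        return max(visible, key=lambda v: v.write_ts).value if visible else None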

Locks vs. leases

When processes rely on locks, fault tolerance suffers: the process holding a lock may forget to release it or may die while holding it, leaving the resource locked forever. A way to avoid this problem is to use leases instead of locks. A lease is a lock with an expiration time. The downside of a leasing approach is that the resource is unavailable to others until the lease expires. Now we have a trade-off: use long leases with a possibly long wait after a failure, or use short leases that need to be renewed frequently.
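
A minimal sketch of a lease, using a local monotonic clock; a real implementation would also have to account for clock differences between the lease server and its clients.

    import time

    class Lease:
        def __init__(self, duration_seconds):
            self.duration = duration_seconds
            self.holder = None
            self.expires_at = 0.0

        def acquire(self, client_id):
            now = time.monotonic()
            if self.holder is None or now >= self.expires_at:   # free or expired
                self.holder, self.expires_at = client_id, now + self.duration
                return True
            return False

        def renew(self, client_id):
            # Only the current holder may extend its lease before it expires.
            if self.holder == client_id and time.monotonic() < self.expires_at:
                self.expires_at = time.monotonic() + self.duration
                return True
            return False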

Fault-tolerant consensus protocols, such as Raft or Paxos, can be used for granting leases to resources. However, the overhead of the algorithm isn’t attractive for all situations, especially when short-term leases are needed. A compromise is to use hierarchical leases, or sub-leases. The top-level lease is a coarse-grained lease while sub-leases are typically fine-grained leases. A consensus algorithm is used to elect a coordinator. This coordinator is granted a lease on a set of resources (or all resources) in the system. Now the coordinator hands out leases to processes that need those resources. When the coordinator’s main lease expires, a consensus algorithm has to be run again to grant a new lease and possibly elect a new coordinator but it does not have to be run for every client’s lease request; that is simply handled by the coordinator.

Distributed Deadlock

Goal: Determine if a transaction will wait for resources in such a manner as to create an indefinite wait, called deadlock. Find ways to ensure that this will not happen.

To ensure consistency, a transaction gets locks for the resources (e.g., objects, database rows, files) it needs to access. This ensures that other transactions will not modify the data it needs during its execution or read its intermediate results. A deadlock occurs when there is a circular dependency among processes holding and requesting locks on resources.

Note that we will use processes and transactions interchangeably. A transaction is simply a process that can be aborted. That is, it can revert any modifications that it made before exiting.

The four conditions that must hold for deadlock to occur are:

  1. mutual exclusion: A resource can be held by at most one process.
  2. hold and wait: Processes that already hold resources can wait for another resource.
  3. non-preemption: A resource, once granted, cannot be taken away.
  4. circular wait: There is a circular chain of processes, each waiting for a resource held by the next process in the chain.

Three approaches can be used for managing deadlock in distributed systems.

Detection
A centralized deadlock detection approach uses a central coordinator to manage a resource graph of processes and the resources they are using. Each time a process gets a lock or releases a lock on a resource, it sends a message to this coordinator (waiting-for or releasing). The coordinator builds a graph of all the processes and the resources they are holding or waiting for. This is called a wait-for graph. If a cycle is detected in the graph, then the coordinator knows a deadlock exists (a small sketch of this cycle check appears after this list). In some cases, if release and waiting-for messages are received out of order, they can lead the coordinator to believe that there is a deadlock cycle when none really exists. In reality, the release message should have been processed first and would have prevented the cycle from forming. This condition is known as phantom deadlock.
The Chandy-Misra-Haas distributed deadlock detection algorithm has a process send a probe message to a process that is holding a resource before it blocks waiting for that resource. The receiving process forwards the probe to every process that holds resources it, in turn, is waiting for. This is called edge chasing. If the original process receives its own probe message, then it knows that a dependency cycle, and hence deadlock, will exist if it waits for the resource it wants.
Prevention
Deadlock prevention requires the system to ensure that at least one of the conditions that is necessary for deadlock cannot occur. This can be implemented by making decisions based on the timestamps of each transaction competing for resources.
The wait-die algorithm states that if a younger process is using the resource, then the older process (that wants the resource) waits. If an older process is holding a resource, then the younger process that wants the resource kills itself (that’s ok; transactions are designed to be restartable). This forces the resource utilization graph to be directed from older to younger processes, making cycles impossible.
The wound-wait algorithm ensures that the graph flows from young to old and cycles are again impossible. An older process will preempt (kill) a younger process that holds a resource it wants. If a younger process wants a resource that an older one is using, then it waits until the older process is done.
Avoidance
This requires knowing which locks will be needed by transactions and then managing resource allocation or transaction scheduling to ensure that deadlock will not happen. This requires a priori knowledge of which resources each transaction will need and when each transaction is scheduled to execute. As such, it is usually not a practical approach in distributed environments (it can be used in operating systems; the banker’s algorithm tries to find an execution sequence that will avoid deadlock … but we will not cover it here). One way in which this can be implemented is that a transaction will try to get all the locks it needs before it starts to execute. If it cannot get every one of those locks, it will abort rather than wait.
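
Here is the cycle check referred to under Detection above, as a minimal sketch in which the coordinator’s wait-for graph maps each process to the set of processes it is waiting on:

    def has_cycle(wait_for, start):
        # Depth-first search; finding a path back to 'start' means deadlock.
        stack, visited = [start], set()
        while stack:
            p = stack.pop()
            for q in wait_for.get(p, set()):
                if q == start:
                    return True
                if q not in visited:
                    visited.add(q)
                    stack.append(q)
        return False

    # P1 waits for P2, P2 waits for P3, P3 waits for P1: a deadlock cycle.
    graph = {"P1": {"P2"}, "P2": {"P3"}, "P3": {"P1"}}
    print(has_cycle(graph, "P1"))   # True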

MapReduce

Goal: Create a massively parallel software framework to make it easy to program classes of problems that use big data that can be parsed into (key, value) pairs. Each unique key is then processed with a list of all values that were found for it.

MapReduce is a framework for parallel computing. Programmers get a simple API and do not have to deal with issues of parallelization, remote execution, data distribution, load balancing, or fault tolerance. The framework makes it easy for one to use thousands of processors to process huge amounts of data (e.g., terabytes and petabytes). It is designed for problems that lend themselves to a map-reduce technique, which we will discuss. From a user’s perspective, there are two basic operations in MapReduce: Map and Reduce.

The Map function reads data and parses it into intermediate (key, value) pairs. When that is complete and all (key, value) sets are generated, the Reduce function is called once for each unique key that was generated by Map and is given that key along with a list of all values that were generated for that key as parameters. The keys are presented in sorted order.
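
The canonical example is word counting. This is a sketch against a hypothetical framework that supplies an emit() callback; the function names and signatures are illustrative, not a real MapReduce API.

    def map_fn(document_name, document_text, emit):
        # Parse the input into intermediate (key, value) pairs.
        for word in document_text.split():
            emit(word, 1)

    def reduce_fn(word, counts, emit):
        # counts is the list of every value emitted for this key.
        emit(word, sum(counts))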

The MapReduce client library creates a master process and many worker processes on the available, presumably large, set of machines. The master serves as a coordinator and is responsible for assigning roles to workers, keeping track of progress, and detecting failed workers. The set of inputs is broken up into chunks called shards. The master assigns map tasks to idle workers and gives each of them a unique shard to work on. The map worker invokes a user’s map function, which reads and parses the data in the shard and emits intermediate (key, value) results.

The intermediate (key, value) data is partitioned based on the key according to a partitioning function that determines which of R reduce workers will work on that key and its associated data. The default function is simply hash(key) mod R, which usually distributes keys uniformly among the R reduce workers, but a user can replace it with a custom function. These partitioned results are stored in intermediate files on the map worker, grouped by key. All map workers must use the same partitioning function since the same set of keys must go to the same reduce workers. Map workers work in parallel and there is no need for any one to communicate with another. When all map workers have finished processing their inputs and generated their (key, value) data, they inform the master that they are complete.

We now need to get all values that share the same key to a single reduce worker. The process of moving the data to reduce workers is called shuffling. After that, each reduce worker sorts the (key, value) data so that all the values for the same key can be combined into one list. This is the sorting phase.

The master dispatches reduce workers. Each reduce worker contacts all the map worker nodes with remote procedure calls to get the set of (key, value) data that was targeted for it. The combined data for each key is then merged to create a single sorted list of values for each key. The combined process of getting the data and sorting it by key is called shuffle and sort.

The user’s reduce function gets called once for each unique key. The reduce function is passed the key and the list of all values associated with that key. This reduce function writes its output to an output file, which the client can read once the MapReduce operation is complete.

The master periodically pings each worker for liveness. If no response is received within a time limit, then the master reschedules and restarts that worker’s task onto another worker.

Bigtable

Goal: Build a massively scalable table of data that is indexed by a row key, contains a potentially arbitrary and huge number of columns, and can be distributed among tens of thousands of computers.

Bigtable is a distributed storage system developed at Google that is structured as a large table: one that may be petabytes in size and distributed among tens of thousands of machines. It is designed and used for storing items such as billions of web pages indexed by URL, with many versions per page; hundreds of terabytes of satellite image data; or statistics and preferences of hundreds of millions of users. It is also designed to be able to handle thousands of queries a second.

To the user, an instance of Bigtable appears as a large spreadsheet: a table that is indexed by a row key, column name, and timestamp. Each cell can store multiple timestamped versions of data. If an application does not specify a timestamp, it will get the latest version. Alternatively, it can specify a timestamp and get the latest version that is no newer than that timestamp. All operations on rows are atomic but only on one row at a time. That is, a client cannot grab a lock for a set of rows in order to make changes.

Columns in the table are organized into column families. A column family is a related group of columns (e.g., “outgoing links” could be a column family). Each column family has a unique name and contains within it a list of named columns. A Bigtable instance will typically have a small number of column families (perhaps a few hundred at most) but each column family may have a huge number (perhaps millions) of columns within it. Bigtable tables are generally sparse. If a cell (a column in a row) is empty, it will not use up any space. Column families are configured when the table is created and are common to all rows of the table. Columns within a column family may be specific to a row and can be added dynamically to any row. An example of columns within a column family is a list of all the websites you visited (i.e., many thousands of columns, with a new one added when you visit a new website). The set of columns used between one row and another may be wildly different (which is what makes tables typically sparse). Each cell may contain multiple versions of data. Timestamped versioning is configured on a per-column-family basis.
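
As a toy illustration of the data model (not how Bigtable stores data internally), a cell is addressed by row key, column family, column, and timestamp, and cells that were never written simply do not exist:

    row_data = {
        "com.example.www": {                         # row key
            "anchor": {                              # column family
                "cnn.com": {1700000000: "CNN"},      # column -> {timestamp: value}
                "bbc.com": {1699990000: "BBC"},
            },
            "contents": {
                "": {1700000000: "<html>...</html>",
                     1699900000: "<html>older version</html>"},
            },
        },
    }

    def read_cell(table, row_key, family, column, at_or_before=None):
        versions = table[row_key][family][column]
        ts = max(t for t in versions if at_or_before is None or t <= at_or_before)
        return versions[ts]                          # latest qualifying version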

As rows are added, a Bigtable table is always kept sorted by row key. As the table grows, it may split along row boundaries into sub-tables, called tablets; a table starts off as a single tablet. Tablet servers are responsible for serving data for tablets. If a tablet gets too many queries, Bigtable can balance the workload by splitting the tablet and moving one of the new tablets to a different server.

Bigtable comprises a client library (linked with the user’s code), a master server that coordinates activity, and many tablet servers. Each tablet server is responsible for managing multiple tablets. Depending on the size of the tablets, it may manage from tens to thousands of tablets. Tablet servers can be added or removed dynamically. Tablet data is stored within GFS (Google File System) and tablet servers run on the same machines as GFS chunkservers. A tablet server handles read/write requests for its tablets and splits tablets when they grow large.

Newly written Bigtable data is stored on the tablet server in a memory structure called a memtable. Once the size of the memtable gets larger than some threshold value, it is frozen and converted to an SSTable. The SSTable (sorted string table) is the file format used to store Bigtable data, managing the mapping of keys to their values. An SSTable is immutable: any changes result in new SSTables, and old SSTables are eventually deleted.

The master assigns tablets to tablet servers, balances tablet server load, and handles schema changes (table and column family creation). It tries to run each tablet server on the same GFS chunkserver that holds the data for its tablets. The master is also responsible for garbage collection of unneeded files (e.g., old SSTables in a tablet).

In Bigtable, Chubby is used to ensure there is only one active master, store the bootstrap location of Bigtable data, discover tablet servers, store Bigtable schema information, and store access control lists. Chubby stores the location of the root tablet, which is the top level of the three-layer-deep balanced tree. The top two layers of the tree are metadata tablets, which do not contain table data but only information to locate the tablet that is responsible for a particular range of key values.

Bigtable relies on GFS for protecting its data from loss but can be configured for replication to multiple Bigtable clusters in different data centers to ensure availability. Data propagation for this replication is asynchronous and results in an eventually consistent model.

Cassandra NoSQL database

Goal: Create a distributed NoSQL database system designed for high availability, fault tolerance, and efficient storage and retrieval of large volumes of data across many commodity servers.

Cassandra is a distributed NoSQL database system that is designed to handle large volumes of data across many commodity servers while providing high availability, fault tolerance, and low latency. Cassandra’s architecture is based on the concept of a wide-column data store and distributed hash tables.

In a wide-column database (also known as a column-family or columnar database), data is organized into tables where the columns for each row can vary in quantity and type. This allows for flexible schema design and efficient storage and retrieval of data. Cassandra uses the Cassandra Query Language (CQL) to interact with table data.

Cassandra’s architecture is based on a peer-to-peer distributed system that uses a ring-based architecture to distribute data across nodes in a cluster. Each node in the cluster is responsible for storing a portion of the data, and nodes work together to provide a highly available and fault-tolerant system.

Cassandra uses a Chord-style distributed hash table (DHT) to manage the distribution of data across the cluster. A DHT is a distributed data structure that maps keys to values, allowing nodes to efficiently locate data stored on other nodes in the cluster. Cassandra’s DHT is called the partitioner, and it uses consistent hashing to evenly distribute data across nodes in the cluster. This ensures that each node is responsible for a balanced and manageable portion of the data.

A row within a table is indexed by a primary key. The primary key in Cassandra may be composed of a combination of a partition key and one or more clustering keys (columns). The partition key determines which partition the data belongs to. This value is hashed and used by the DHT to locate the node responsible for that partition.
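
A sketch of the idea behind the partitioner, using consistent hashing to map a partition key to a node on a token ring; Cassandra’s real partitioner and token assignment differ in detail.

    import hashlib
    from bisect import bisect_left

    def token(key: str) -> int:
        return int(hashlib.md5(key.encode()).hexdigest(), 16)

    class Ring:
        def __init__(self, nodes):
            self.ring = sorted((token(n), n) for n in nodes)

        def node_for(self, partition_key):
            # A key is handled by the first node whose token is >= the key's
            # token, wrapping around the ring.
            t = token(partition_key)
            tokens = [tok for tok, _ in self.ring]
            i = bisect_left(tokens, t) % len(self.ring)
            return self.ring[i][1]

    ring = Ring(["node-a", "node-b", "node-c"])
    print(ring.node_for("user:42"))    # node responsible for this partition key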

A clustering key is used to define the sequence of data within a partition. The clustering key is crucial for sorting and organizing data inside a partition, making it easier to retrieve and query data efficiently. When multiple clustering columns are used, they are combined in the order they are defined, creating a hierarchical structure to sort the data.

Cassandra’s replication feature provides fault tolerance by replicating data across multiple nodes in the cluster. Each data item in Cassandra is replicated across a configurable number of nodes, known as the replication factor. This ensures that if a node fails, the data can still be accessed from other nodes in the cluster.

Cassandra also provides tunable consistency, which allows developers to balance data consistency and availability according to their needs. Consistency can be tuned on a per-query basis, allowing developers to prioritize consistency for critical data while relaxing consistency requirements for less critical data.

Reads and writes in Cassandra are also distributed. When a write request is sent to Cassandra, it is first written to a commit log on the node that received the request. The data is then written to an in-memory data structure called a memtable, which is periodically flushed to disk as a new SSTable file. Reads first check the memtable; data that has already been flushed is read from the SSTable files on disk.

Spanner

Goal: Design a huge-scale worldwide database that provides ACID semantics, lock-free reads, and external consistency.

Introduction

Brewer’s CAP theorem led to the popularity of an eventual consistency model rather than relying on transactional ACID semantics.

In this model, writes propagate through the system so that all replicated copies of data will eventually be consistent. Prior to the completion of all updates, processes may access old versions of the data if they are reading from a replica that has not been updated yet.

With ACID semantics, this would not happen since the transaction would grab a lock on all the replicas prior to updating them. Eventual consistency is the trade-off in designing a system that is both highly available and can survive partitioning.

The presence of network partitions is, in many environments, a rare event. By using an eventual consistency model, we choose to give up on “C” (consistency) in order to gain availability on the slim chance that the network becomes partitioned.

Given that partitions are rare in places such as Google data centers, which use redundant networks both within and outside each center, it is not unreasonable to enforce a strong consistency model (which requires mutual exclusion via locks). Network partitions will rarely affect the entire system but only a subset of computers. As such, it is possible to use a majority consensus algorithm, such as Paxos, to continue operations as long as the majority of replicas are functioning. Spanner takes advantage of an environment where partitions are rare and is willing to be partially unavailable in that rare event when a network becomes partitioned.

Experience with eventual consistency has taught us that this model places a greater burden on the programmer. With an eventual consistency model, it is now up to the programmer to reconcile the possibility that some data that is being accessed may be stale while other data might be current. Bigtable, for example, was difficult to use in applications that required strong consistency. Custom code had to be written to coordinate locks for any changes that spanned multiple rows.

With Spanner, Google built a transactional (ACID) database that spans many systems and widely distributed data centers. Spanner provides the user with a large-scale database that comprises multiple tables, with each table containing multiple rows and columns. The model is semi-relational: each table is restricted to having a single primary key. Unlike Bigtable, transactions can span multiple rows and across multiple tables. Also unlike Bigtable, which provided eventually consistent replication, Spanner offers synchronous replication, ACID semantics, and lock-free reads.

Data storage

Spanner gives the user the ability to manage multiple tables, each with rows and columns. Internally, the data is stored as a large keyspace that is sharded across multiple servers. Like Bigtable, each shard stores a group of consecutive rows, called a tablet. This sharding is invisible to applications.

Tablets are replicated synchronously using Paxos. One of the replicas is elected to be a leader and runs a transaction manager. Any transactions that span multiple shards use the two-phase commit protocol. Replication is performed within a transaction and all replicas remain locked until replication is complete, ensuring a consistent view of the database.

Applications can specify geographic data placement and the amount of replication:

  • Proximity of data to users (impacts read latency)

  • Proximity of replicas to each other (impacts write latency)

  • Amount of replication (impacts availability and read performance)

Spanner was designed for a global scale and a database will span multiple data centers around the globe. In a data center, spanservers store tablets. A zonemaster periodically rebalances tablets across servers to balance load. The zonemaster does not participate in transactions.

Transactions

Spanner provides transactions with ACID semantics. Transactions are serialized to satisfy the “I” (isolated) property and to create the illusion that one transaction happens after another. Strict two-phase locking is used to accomplish this.

Transactions can be distributed among multiple systems (which may span multiple datacenters). Each transaction is assigned a globally-meaningful timestamp and these timestamps reflect the serialization order of the transactions. Once a transaction has acquired all the locks it needs, it does its work and then picks a commit timestamp. Two-phase locking can reduce overall performance because other transactions may need to wait for locks to be released before they can access resources. Spanner uses separate read and write locks to achieve greater concurrency but even these can often block another transaction. A read lock will cause any writers to block and a write lock will cause any other writers or readers to block.

Spanner also offers lock-free reads. For this, Spanner implements a form of multiversion concurrency control by storing multiple timestamped versions of data. A transaction can read data from a snapshot, an earlier point in time, without getting a lock. This is particularly useful for long-running transactions that read many rows of the database. Any other ongoing transactions that modify that data will not affect the data that the long-running transaction reads since those writes create later versions.

External consistency

Serialization (isolation, the I in ACID) simply requires that transactions behave as if they executed in some serial order. Spanner implements a stronger consistency model, external consistency, which means that the order of transactions reflects their true time order. Specifically, if a transaction T1 commits before a transaction T2 starts, based on physical (also called “wall clock”) time, then the serial ordering of commits should reflect that and T1 must get a smaller timestamp than T2. Spanner does this by using physical timestamps.

It is not possible to get a completely accurate real-world timestamp. Even if we had an accurate time source, there would be synchronization delays that would add a level of uncertainty. The problem is compounded by having the database span data centers around the globe. Spanner tries to minimize the uncertainty in getting a timestamp and make it explicit.

Implementing external consistency

Every datacenter where Spanner is used is equipped with one or more highly accurate time servers, called time masters (a combination of GPS receivers and atomic clocks is used). The time masters ensure that the uncertainty of a timestamp can be strongly bounded regardless of where a server resides. Each spanserver has access to a TrueTime API that provides a narrow time interval that contains the current time, ranging from TT.now().earliest up to TT.now().latest. The earliest time is guaranteed to be no later than the actual time when the function was called, and the latest is guaranteed to be no earlier than it. These values would typically be only milliseconds apart. Spanner can now use these timestamps to ensure that transactions satisfy the demands of external consistency.

The key to providing external consistency is to ensure that any data committed by the transaction will not be visible until after the transaction’s timestamp. This means that even other systems that have a different clock should still see a wall-clock time that is later than the transaction’s timestamp. To do this, Spanner waits out the uncertainty. Before a transaction commits, it acquires a commit timestamp:

t = TT.now().latest

This is the latest possible value of the true time across all servers in the system. It then makes sure that no locks will be released until that time is definitely in the past. This means waiting until the earliest possible current time on any system is greater than the transaction timestamp:

TT.now().earliest > t

This wait is called a commit wait and ensures that any newer transaction that grabs any of the same resources will definitely get a later timestamp.
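
A sketch of commit wait, where tt_now() is a hypothetical stand-in for the TrueTime API that returns an (earliest, latest) pair bracketing the true time:

    import time

    def commit_wait(tt_now):
        earliest, latest = tt_now()
        commit_ts = latest                     # t = TT.now().latest
        # Hold locks until the commit timestamp is definitely in the past on
        # every server, i.e., TT.now().earliest > t.
        while tt_now()[0] <= commit_ts:
            time.sleep(0.001)
        return commit_ts                       # safe to release locks and commit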

Note that there is no issue if multiple transactions have the same timestamp. Timestamps are strictly used for versioning to ensure that we provide a consistent view of the database while supporting lock-free reads. Consider the case of a transaction that needs to read hundreds of millions of rows of data. Many transactions may modify the database after you start your work, but the view of the database will be consistent since all data read will be no later than a specified timestamp.

Summary

By making timestamp uncertainty explicit, Spanner could implement a commit wait operation that can wait out the inaccuracy of a timestamp and provide external consistency along with full ACID semantics. Coupled with storing multiple versions of data, Spanner provides lock-free reads.

Spanner’s design was a conscious decision to not sacrifice the strong ACID semantics of a database. Programming without ACID requires a lot of extra thinking about the side effects of eventual consistency and careful programming if one wants to avoid it. Strict two-phase locking and a two-phase commit protocol are implemented within Spanner so programmers do not have to reinvent it.

Bulk Synchronous Parallel & Pregel

Goal: Create a software framework for fault-tolerant, deadlock-free parallel processing. Then, adapt that framework to create one that makes it easy to operate on graphs at a massive scale.

Bulk Synchronous Parallel (BSP)

Bulk Synchronous Parallel (BSP) is a programming model and computation framework for parallel computing. Computation is divided into a sequence of supersteps. In each superstep, a collection of processes executes concurrently and creates messages that are sent to other processes. The superstep ends when all the computation in the superstep is complete and all messages are sent. A barrier synchronization at the end of the superstep ensures that all messages have been transmitted (but not yet delivered to the processes). The next superstep begins with the delivery of all those messages to the processes, who then execute their superstep and send messages that will be delivered at the start of the next superstep. This process continues until all processors vote to halt.

Note that a process cannot receive messages from another process during a superstep. Any sent messages will be delivered only at the start of the next superstep. This restriction ensures deadlock-free execution.

A popular implementation of the BSP framework is Apache Hama.

Pregel

What’s a graph?

A graph is a set of vertices connected by edges. Edges may be directed from one vertex to another or bidirectional. In computing, a vertex is represented by an object and a directed edge is a link to another object.

Graphs are all around us

They represent computer networks, social groups, roads, disease outbreaks, phone call connections, Twitter followers, Facebook friends, web page links, etc. Some of these graphs have an enormous scale. The world wide web has billions of pages and Facebook has around a billion users.

What’s wrong with MapReduce?

Nothing is wrong with MapReduce. It’s great for many problems. However, many graph traversal problems, when written for MapReduce, end up having to take multiple iterations of MapReduce, with the output of one iteration feeding the input of another. This is not only slow, but it is also inefficient as data has to be written to files at the end of every map and reduce operation and moved between machines. The entire state of the graph needs to be transmitted in each stage, which requires a lot more communication overhead than Pregel, where the vertices and edges remain on the machine that performs the computation. Moreover, MapReduce is not the most direct way of thinking about and logically solving many problems.

Introducing Pregel

Pregel is a software framework created by Google to make it easy to work on huge graphs (e.g., ones with billions of vertices) that span many machines (e.g., tens of thousands). Like MapReduce, the framework relieves the programmer from having to deal with assigning tasks to processors, monitoring jobs, handling failures, and managing communications.

The Pregel framework allows you to write “vertex-centric” code. That is, the same user code, a compute() function, is run concurrently on each vertex of the graph. Each instance of this function keeps track of information, can iterate over outgoing edges (each of which has a value), and can send messages to the vertices connected to those edges or to any other vertices it may know about (e.g., having received a vertex ID via a message).

When a function does not have any more work to do, it votes to halt. This puts the corresponding vertex in an inactive state. When all vertices are in an inactive state, the framework terminates. However, if a vertex’s compute function sends a message to an inactive vertex, that vertex will be made active at the next superstep.

Pregel is an adaptation of the Bulk Synchronous Parallel (BSP) model that is specifically designed with graph processing in mind. Like BSP, Pregel executes in supersteps. Each superstep consists of computation followed by message sending. All messages are synchronized with a barrier, which marks the end of the superstep. At the next superstep, the messages from the previous superstep are delivered to, and available at, the compute function. The downside of using BSP directly for graph processing is that significant additional work would be needed to define, propagate, and maintain the topology (connections) of a graph and map vertices to compute nodes.
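
A sketch of a vertex-centric compute() function in this style, propagating the maximum value in the graph; the Vertex methods used here mirror the concepts above but are not a real Pregel API.

    def compute(vertex, messages):
        if vertex.superstep() == 0:
            # Start by telling the neighbors about our own value.
            vertex.send_to_all_neighbors(vertex.value)
            return
        new_max = max(messages, default=vertex.value)
        if new_max > vertex.value:
            vertex.value = new_max
            vertex.send_to_all_neighbors(new_max)   # keep propagating
        else:
            vertex.vote_to_halt()   # go inactive until a message arrives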

Advanced APIs

Since there is overhead in sending messages to vertices, particularly when they are on other machines, Pregel supports the optional use of combiners. A combiner is a user-defined function that is applied to a bunch of messages all targeted to the same vertex. The Combine method processes the values and creates a single input to that vertex. For example, if the goal is to take the sum of a set of values or to choose data from the highest-numbered vertex, the combiner can merge several messages into one before they are transmitted over the network. This does not alter the compute function since it still has to be prepared to receive multiple messages from multiple sources.

To manage global state, such as overall statistics, the total number of edges of a graph, global flags, or the minimum or maximum value of a vertex, Pregel allows a user to define an aggregator. An aggregator combines received values into one value and makes that value available to all vertices at the next superstep.

Pregel design

Pregel uses a master-slave architecture. Many copies of the program are started on a cluster of machines. One copy becomes the master and is responsible for coordinating activity rather than processing the graph. Others are workers. The master registers itself with a name server (Chubby). Each worker process contacts the name server to find the master. The master assigns one or more sets of vertices to each worker. By default, the assignment is based on the hash of the vertex ID, so neighboring vertices will not necessarily be assigned to the same worker.

The master assigns chunks of input, which is usually resident in GFS or Bigtable, to each worker. Each input item is a set of vertices and their edges. Workers read this input and create either local messages for the vertices they manage or, if the input record is for a remote vertex, send the message to the worker that owns that vertex. All vertices are initially marked as active. The master then asks each worker to perform a superstep. The worker will run concurrent threads, each of which executes a compute function on its vertex. The compute function consumes input, runs its algorithm, and generates zero or more messages to other vertices. Workers send these messages asynchronously but they will not be delivered to their target functions until the next superstep starts. When a worker is done with its processing for one superstep, it informs the master. It also tells the master how many of the vertices it manages will be in the active state in the next superstep. The master waits for all workers to complete before starting the next superstep. This cycle continues until there are no more vertices in the active state.

Fault tolerance

Pregel uses checkpointing for fault tolerance. The master tells each worker to checkpoint itself every N supersteps. Checkpointing means that each vertex has to save its entire state in stable storage. This state will include vertex values, edge values, incoming messages, and possibly any other state that the algorithm needs to track. A master will periodically send ping messages to workers to see if they are alive. If a master does not hear from a worker within a certain window of time, it assumes that the worker has failed. In this case, the master reassigns the set of vertices to other workers and tells all workers to restart their computation from the superstep at the most recent checkpoint.

A popular implementation of Pregel is Apache Giraph, which has been used by Facebook to analyze its social graph.

Spark

Goal: Create a general-purpose high-performance framework for big-data processing.

MapReduce was a powerful framework for parallel computation but it forced a rigid model onto the programmer. Quite often, a computation had to be implemented as a sequence of MapReduce operations. Every map and every reduce operation has to run to completion and write its results to a file before the next sequence of operations can start. Spark creates a highly-flexible framework that lets programmers define their job as a sequence of transformations and actions on data.

The main client application is called the driver program (or just driver) and is linked with a Spark library. When run, the library creates a SparkContext, which connects to a cluster manager that allocates available worker nodes to run the job. Each worker node runs an executor that contacts the driver for tasks to run. Each executor runs as a process inside a Java Virtual Machine (JVM).

The driver goes through the program, which consists of a sequence of transformations and actions as well as the source data. It creates a directed graph of tasks, identifying how data flows from one transformation to another and ultimately to an action. These tasks are then sent to the executors as jar files for execution. Tasks operate on data and each task is either a transformation or action.

RDDs

Data in Spark is a collection of Resilient Distributed Datasets (RDDs). RDDs can be created in three ways:

  1. They can be data that is a file or set of files in HDFS, key-value data from an Amazon S3 server (similar to Dynamo), HBase data (Hadoop’s version of Bigtable), or the result of a SQL or Cassandra database query.

  2. They can be streaming data obtained using the Spark Streaming connector. As an example, this could be a stream of data from a set of remote sensors.

  3. An RDD can be the output of a transformation function. This is the way one transformation creates data that another transformation will use. To optimize performance, the RDD created by a transformation is cached in memory (overflowing to disk if necessary).

RDDs have several key properties:

  • They are immutable. A task can read an RDD and create a new one but cannot modify an existing RDD.

  • They are typed (structured). An RDD has a structure that a task can parse.

  • They are partitioned. Parts of an RDD may be sent to different servers. The default partitioning function is to send a row of data to the server corresponding to hash(key) mod servercount.

  • They are ordered. An RDD contains elements that can be sorted. For example, a list of key-value pairs can be sorted by key. This property is optional.

Transformations and actions

Spark allows two types of operations on RDDs: transformations and actions. Transformations read an RDD and create a new RDD. Example transformations are map, filter, groupByKey, and reduceByKey. Actions are finalizing operations that evaluate an RDD and return a value to the driver program. Example actions are reduce, collecting samples, and writing the RDD to a file.
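
As a small illustration using the PySpark API (assuming a local Spark installation and a made-up input path), flatMap, map, and reduceByKey below are transformations, and collect() is the action that finally triggers evaluation:

    from pyspark import SparkContext

    sc = SparkContext("local", "wordcount")
    lines = sc.textFile("hdfs:///data/input.txt")     # RDD created from a file

    counts = (lines
              .flatMap(lambda line: line.split())     # transformation
              .map(lambda word: (word, 1))            # transformation
              .reduceByKey(lambda a, b: a + b))       # transformation

    print(counts.collect())                           # action: runs the whole DAG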

Lazy evaluation

Spark uses a technique called lazy evaluation for applying transformations. Spark postpones the execution of transformations (operations that modify data) until an action (an operation that triggers a computation to produce results) is called. Instead of executing transformations immediately, Spark constructs a Directed Acyclic Graph (DAG) that identifies the sequence of transformations required for each action. This allows Spark to optimize the processing pipeline, such as combining multiple transformations into a single operation or avoiding running unnecessary transformations, thus reducing computational overhead and improving overall performance.

When an action is requested on an RDD object, the necessary transformations are computed and the result is returned to the driver. Transformations are computed only when some other task needs the RDD that they generate.

Lazy evaluation leads to reduced network and I/O operations, as data is processed only when it is needed.

Fault tolerance

Spark’s lazy evaluation approach aids in efficient fault recovery. In case of a node failure, Spark can recompute only the lost parts of the data using the sequence of transformations, rather than needing to duplicate entire datasets across nodes. This method not only conserves resources but also enhances the robustness and reliability of Spark in handling complex data processing tasks.

For each RDD, the driver tracks, via the DAG, the sequence of transformations that was used to create it. That means an RDD knows which RDDs it is dependent on and which tasks were needed to create each one. If any RDD is lost (e.g., a task that creates one died), the driver can ask the task that generated it to recreate it. The driver maintains the entire dependency graph, so this recreation may end up being a chain of transformation tasks going back to the original data, if necessary.

Kafka publish-subscribe messaging

Goal: Create a high-performance, highly-scalable, reliable message delivery framework.

Kafka messaging is a distributed, fault-tolerant, and high-throughput messaging system that was developed by LinkedIn. It is an open-source project and is widely used in various industries to process and handle large volumes of data in real-time. Kafka is designed to handle data in motion, allowing it to be used in stream processing applications.

Kafka’s architecture is based on the publish-subscribe model. Producers publish messages to Kafka topics, and consumers subscribe to those topics to receive the messages. Kafka brokers act as intermediaries between producers and consumers, storing and distributing messages across the cluster. Brokers can run on a single node or across multiple nodes in a cluster, providing high availability and fault tolerance.

Messages are associated with a category, called a topic. A topic is a way to separate messages based on their content. Each message in Kafka is written to a specific topic, and each topic consists of one or more partitions.

A partition is an ordered, immutable sequence of messages. Different partitions of a topic can be stored on different Kafka brokers in a Kafka cluster (a collection of Kafka servers). The number of partitions for a topic is determined when the topic is created and can be increased later (Kafka does not allow the partition count to be decreased).

Partitions allow Kafka to scale horizontally by distributing the data across multiple brokers. Each partition is also replicated across multiple brokers, providing fault tolerance and ensuring that the data is available even if a broker fails. Kafka uses a partitioning scheme to determine which partition a message belongs to based on the key or a custom partitioner. Partitions can also be used to parallelize processing, allowing multiple consumers to process messages from the same topic simultaneously.

Kafka’s replication feature provides fault tolerance by replicating each partition across multiple brokers. When a broker fails, its partitions can be automatically reassigned to other brokers in the cluster, ensuring that the data remains available and that the cluster continues to function normally.

Kafka can be used in queuing or publish-subscribe models. In the queuing model, messages are sent to a single consumer in a first-in-first-out (FIFO) order. In this model, consumers in a consumer group compete for messages in a topic, and each message is only consumed by one consumer. In the publish-subscribe model, messages are available to all subscribed consumers (as long as they are not in the same consumer group). In this model, each message is consumed by all subscribed consumers.
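
A minimal publish-and-consume sketch using the third-party kafka-python client; the broker address, topic, and group name are made up. Consumers that share a group_id compete for messages (the queuing model), while a consumer with a different group_id also receives every message (the publish-subscribe model).

    from kafka import KafkaProducer, KafkaConsumer

    producer = KafkaProducer(bootstrap_servers="localhost:9092")
    producer.send("orders", key=b"order-1001", value=b"2 widgets")
    producer.flush()

    consumer = KafkaConsumer("orders",
                             bootstrap_servers="localhost:9092",
                             group_id="billing")
    for message in consumer:
        print(message.key, message.value)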

Kafka’s publish-subscribe model allows for flexibility in processing data. Consumers can subscribe to multiple topics, allowing them to process related data from multiple sources. Additionally, Kafka’s ability to partition data allows for parallel processing, allowing multiple consumers to process data from the same topic simultaneously.

Kafka’s queuing model is useful when you need to ensure that each message is only consumed by one consumer. For example, a queue might be used to process orders in an e-commerce system, ensuring that each order is only processed once. Kafka’s queuing model can also be used for tasks like load balancing, where tasks are distributed among multiple consumers.
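
To make the two models concrete, here is a minimal sketch using the third-party kafka-python package; the broker address, topic name, and consumer group below are illustrative assumptions, not part of Kafka itself:

# Minimal sketch using the third-party kafka-python package.
# The broker address, topic, and group id below are assumptions for illustration.
from kafka import KafkaProducer, KafkaConsumer

# Producer: publish a few messages to the "orders" topic.
producer = KafkaProducer(bootstrap_servers="localhost:9092")
for i in range(3):
    producer.send("orders", key=str(i).encode(), value=f"order #{i}".encode())
producer.flush()

# Consumer: consumers that share a group id split the topic's partitions among
# themselves (queuing model); consumers in different groups each see every
# message (publish-subscribe model).
consumer = KafkaConsumer(
    "orders",
    bootstrap_servers="localhost:9092",
    group_id="print-service",
    auto_offset_reset="earliest",
    consumer_timeout_ms=5000,          # stop iterating after 5 s with no messages
)
for message in consumer:
    print(message.partition, message.offset, message.value)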

Content delivery networks

Goal: Provide a highly-scalable infrastructure for caching and serving content from multiple sources.

A content delivery network (CDN) is a set of servers that are usually placed at various points at the edges of the Internet, at various ISPs, to cache content and distribute it to users.

There are several traditional approaches to making a site more scalable and available:

Proxy servers
Organizations can pass web requests through caching proxies. This can help a small set of users but you’re out of luck if you are not in that organization.
Clustering within a datacenter with a load balancer
Multiple machines within a datacenter can be load-balanced. However, they all fail if the datacenter loses power or internet connectivity.
Multihoming
Machines can be connected with links to multiple networks served by multiple ISPs to guard against ISP failure. However, protocols that drive dynamic routing of IP packets (BGP) are often not quick enough to find new routes, resulting in service disruption.
Mirroring at multiple sites
The data can be served at multiple sites, with each machine’s content synchronized with the others. However, synchronization can be difficult.

All these solutions require additional capital costs. You’re building the capability to handle excess capacity and improved availability even if the traffic never comes and the faults never happen.

By serving content that is replicated on a collection of servers, traffic from the main (master) server is reduced. Because some of the caching servers are likely to be closer to the requesting users, network latency is reduced. Because there are multiple servers, traffic is distributed among them. Because all of the servers are unlikely to be down at the same time, availability is increased. Hence, a CDN can provide highly increased performance, scalability, and availability for content.

Akamai

We will focus on one CDN: Akamai. The company evolved from MIT research that was focused on “inventing a better way to deliver Internet content.” A key issue was the flash crowd problem: what if your web site becomes really popular all of a sudden? Chances are, your servers and/or ISP will be saturated and a vast number of people will not be able to access your content. This became known as the slashdot effect.

In late 2022, Akamai ran on 345,000 servers in over 1,300 networks around the world. Around 2015, Akamai served between 15 and 30 percent of all web traffic. Even as several other large-scale content delivery networks emerged and companies such as Google, Amazon, and Apple built their own distribution infrastructure, Akamai still served a peak rate of 250 Tbps of traffic in 2022.

Akamai tries to serve clients from the nearest available servers that are likely to have the requested content. According to the company’s statistics, 85 percent of the world’s Internet users are within a single network hop of an Akamai CDN server.

To access a web site, the user’s computer first looks up a domain name via DNS. A mapping system locates the caching server that can serve the content. Akamai deploys custom dynamic DNS servers. Customers who use Akamai’s services define their domains as aliases (DNS CNAMEs) that point to an Akamai domain name so that the names will be resolved by these servers. An Akamai DNS server uses the requestor’s address to find the nearest edge server that is likely to hold the cached content for the requested site. For example, the domain www.nbc.com is:

www.nbc.com.  164  IN  CNAME www.nbc.com-v2.edgesuite.net.
www.nbc.com-v2.edgesuite.net.  14177  IN  CNAME  a414.dscq.akamai.net.

or

www.tiktok.com.  271  IN  CNAME www.tiktok.com.edgesuite.net.
www.tiktok.com.edgesuite.net. 21264 IN  CNAME  a2047.api10.akamai.net.

When an Akamai DNS server gets a request to resolve a host name, it chooses the IP address to return based on:

  • domain name being requested

  • server health

  • server load

  • user location

  • network status

  • load balancing

Akamai can also perform load shedding on specific content servers; if servers get too loaded, the DNS server will not respond with those addresses.
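
To observe the CNAME chain and the edge-server addresses that this mapping system returns, one can resolve an Akamai-fronted name. A minimal sketch using Python’s standard socket module (the answers will differ by requester location and time):

# Minimal sketch: resolve an Akamai-fronted name and print the canonical name,
# the aliases (the CNAME chain), and the returned IP addresses. The answers
# vary with the requester's location and current server load.
import socket

canonical, aliases, addresses = socket.gethostbyname_ex("www.nbc.com")
print("canonical name:", canonical)   # e.g., an *.akamai.net name
print("aliases:", aliases)            # e.g., the edgesuite.net CNAMEs
print("addresses:", addresses)        # nearby edge servers chosen by Akamai's DNS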

Now the user can set up a connection to an Akamai caching server in a cluster of servers in a region at the network edge close to the user and send requests for content. If the content isn’t there, the server broadcasts a query to the other edge servers in that region. If the content isn’t cached at any servers within the region, then the server contacts a parent server. The parent server is also a caching server operated by Akamai but not located at the edge. The goal is to avoid accessing the origin server (the server at the company that hosts the content) unless necessary. If the content isn’t at the parent, the parent queries the other parent servers before giving up and forwarding the request to the origin server via its transport system.

To send data efficiently, Akamai manages an overlay network: the collection of its thousands of servers and statistics about their availability and connectivity. Akamai generates its own map of overall IP network topology based on BGP (Border Gateway Protocol) and traceroute data from various points on the network. It factors in latency and quality of service into its decisions and may choose to route requests to hop through its own servers via always-on TCP links rather than rely on Internet routing, which usually prioritizes the number of hops and cost over speed and quality.

Content servers report their load to a monitoring application. The monitoring application publishes load reports to a local Akamai DNS server, which then determines which IP addresses to return when resolving names.

CDN benefits

A CDN, serving as a caching overlay, provides five distinct benefits:

  1. Caching: static content can be served from caches, thus reducing the load on origin servers.

  2. Routing: by measuring latency, packet loss, and bandwidth throughout its collection of servers, the CDN can find the best route to an origin server, even if that requires forwarding the request through several of its servers instead of relying on IP routers to make the decision.

  3. Security: Because all requests go to the CDN, which can handle a high capacity of requests, it absorbs Distributed Denial-of-Service (DDoS) attacks rather than letting them overwhelm the origin server. Moreover, any penetration attacks target the machines in the CDN rather than the origin servers. The company (or the group within a company) that runs the CDN service is singularly focused on the IT infrastructure of the service and is likely to have the expertise to ensure that systems are properly maintained, updated, and running the best security software. These aspects may be neglected by companies that need to focus on other services.

  4. Analytics: CDNs monitor virtually every aspect of their operation, providing their customers with detailed reports on the quality of service, network latency, and information on where the clients are located.

  5. Cost: CDNs cost money, which is a disadvantage. However, CDNs are an elastic service, which is one that can adapt to changing workloads by using more or fewer computing and storage resources. CDNs provide the ability to scale instantly to surges of demand practically anywhere in the world. They absorb all the traffic so you don’t have to scale up.

Video

Video is the most bandwidth-intensive service on the Internet and has been a huge source of growth for CDNs. As an example, Netflix operates a global CDN called OpenConnect, which contains up to 80 percent of its entire media catalog. Stored (e.g., on-demand) video is, in some ways, just like any other content that can be distributed and cached. The difference is that, for use cases that require video streaming rather than downloading (i.e., video on demand services), the video stream may be reprocessed, or transcoded, to lower bitrates to support smaller devices or lower-bandwidth connections.

Live video cannot be cached. We have seen the benefits of the fanout that CDNs offer by having many servers at the edge; a video stream can be replicated onto multiple edge servers so it doesn’t have to be sourced from the origin. CDNs can also help with progressive downloads, which is where the user can start watching content while it is still being downloaded.

Today, HTTP Live Streaming, or HLS, is the most popular protocol for streaming video. It allows the use of standard HTTP to deliver content and uses a technique called Adaptive Bitrate Coding (ABR) to deliver video. ABR supports playback on various devices in different formats & bitrates.

The CDN takes on the responsibility of taking the video stream and converting it into a sequence of chunks. Each chunk represents between 2 and 10 seconds of video. The CDN then encodes each chunk at various bitrates, which affect the quality and resolution of that chunk of video. For content delivery, the HLS protocol uses feedback from the user’s client to select the optimal chunk. It revises this constantly throughout playback since network conditions can change.
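
The client side of ABR boils down to a selection policy: given the bitrates each chunk was encoded at and an estimate of the current throughput, pick the best variant for the next chunk. A minimal sketch of one such policy (the bitrate ladder and the safety margin are illustrative assumptions, not part of HLS):

# Minimal sketch of an adaptive bitrate (ABR) selection policy.
# The bitrate ladder and the 0.8 safety margin are illustrative assumptions.
BITRATES_KBPS = [400, 1200, 2500, 5000, 8000]   # variants each chunk was encoded at

def pick_bitrate(measured_throughput_kbps: float, margin: float = 0.8) -> int:
    """Choose the highest variant that fits comfortably within measured throughput."""
    budget = measured_throughput_kbps * margin
    candidates = [b for b in BITRATES_KBPS if b <= budget]
    return max(candidates) if candidates else min(BITRATES_KBPS)

# The player re-evaluates this for every 2-10 second chunk as conditions change.
print(pick_bitrate(3500))   # -> 2500 (budget of 2800 kbps)
print(pick_bitrate(300))    # -> 400  (fall back to the lowest variant)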

Peer-to-peer content delivery

A peer-to-peer model is an application architecture that removes the need for dedicated servers and enables each host to participate in providing a service. Because all systems can both access as well as provide the service, they are called peers. In this discussion, we will focus on one application: peer-to-peer file distribution via BitTorrent.

BitTorrent

The design of BitTorrent was motivated by the flash crowd problem. How do you design a file sharing service that will scale as a huge number of users want to download a specific file? Systems such as Napster, Gnutella, and Kazaa all serve their content from the peer that hosts it. If a large number of users try to download a popular file, all of them will have to share the bandwidth that is available to the peer hosting that content.

The idea behind BitTorrent is to turn each peer that is downloading content into a server of that content. BitTorrent only focuses on the download problem and does not handle the mechanism for locating the content.

To offer content, the content owner creates a .torrent file. This file contains metadata, or information, about the file, such as the name, size of the file, and a SHA-256 hash of the file to allow a downloader to validate its integrity. A peer downloads a file in fixed-size chunks called pieces, and the torrent file also contains the piece size and a list of hashes of each piece of the file. This allows a downloading peer to validate that every downloaded piece has not been modified. Finally, the .torrent file contains the address of a tracker.
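
A downloading peer verifies each piece against the hash list from the .torrent file before sharing it. A minimal sketch of that check (the piece size and helper names here are illustrative):

# Minimal sketch: split a file into fixed-size pieces, hash each piece, and
# verify a downloaded piece against the expected hash from the .torrent metadata.
import hashlib

PIECE_SIZE = 256 * 1024   # illustrative piece size

def piece_hashes(path):
    """Compute the per-piece hash list that would be stored in the .torrent file."""
    hashes = []
    with open(path, "rb") as f:
        while piece := f.read(PIECE_SIZE):
            hashes.append(hashlib.sha256(piece).hexdigest())
    return hashes

def piece_is_valid(piece, expected_hash):
    """A leecher runs this before serving a downloaded piece to other peers."""
    return hashlib.sha256(piece).hexdigest() == expected_hash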

The tracker is a server running a process that manages downloads for a set of .torrent files. When a downloading peer opens a .torrent file, it contacts a tracker that is specified in that file. The tracker is responsible for keeping track of which peers have all or some pieces of that file. There could be many trackers, each responsible for different torrents.

A seeder is a peer that has the entire file available for download by other peers. Seeders register themselves with trackers so that trackers can direct downloading peers to them. An initial seeder is the peer that offers the original copy of the file.

A leecher is a peer that is downloading files. To start the download, the leecher must have a .torrent file, which identifies the tracker for the content. It contacts the tracker, which keeps track of the seeders for that file as well as other leechers, some of whom may have already downloaded some pieces of the file. A leecher contacts seeders and other leechers to download random pieces of the file. As it gets these pieces, it can make them available to other leechers. This allows download bandwidth to scale: every downloader increases overall download capacity. Once a file is fully downloaded, the leecher has the option of turning itself into a seeder and continuing to serve the file.

Security: Cryptographic Communication

Goal: Use symmetric cryptography, public key cryptography, random numbers, and hash functions to enable the exchange of encryption keys, provide secure communication, and ensure message integrity.

Security encompasses many things, including authenticating users, hiding data contents while they are at rest (e.g., stored in files) or in motion (moving over a network), and defending against threats such as destruction of data, masquerading as another server or user, providing false data (e.g., the wrong IP address for a DNS query), and breaches of physical premises security. We will focus only on a few topics here.

Security in distributed systems introduces two specific concerns that centralized systems do not have. The first is the use of a network where contents may be seen by other, possibly malicious, parties. The second is the use of servers. Because clients interact with services (applications) running on a server, the application rather than the operating system is responsible for authenticating the client and controlling access to services. Moreover, physical access to the system and the security controls configured for the operating system may be unknown to the client.

Cryptography on its own is not a panacea. Its use will not ensure that a system is secure. However, it is an important component in building secure distributed systems. It is used to implement mechanisms for:

  • Confidentiality: ensuring that others cannot read the contents of a message (or file).
  • Authentication: proving the origin of a message (or the entity sending that message).
  • Integrity: validating that the message has not been modified, either maliciously or accidentally.
  • Nonrepudiation: ensuring that senders cannot falsely deny that they authored a message.

Confidentiality

Confidentiality is the classic application of cryptography: obscuring the contents of a message so that it looks like a random bunch of bits that can only be decoded by someone with the proper knowledge. To do this, we start with our plaintext message, P (the term plaintext refers to unencrypted content; it does not need to be text). By applying an encryption algorithm, or cipher, to it, we convert the plaintext to ciphertext, C=E(P). We can decrypt this message to recover the plaintext: P=D(C).

There are two widely-used classes of ciphers: symmetric and public key. A symmetric cipher uses the same key to decrypt a message as was used to encrypt it: C=EK(P) and P=DK(C). A public key cipher uses two related keys, known as a key pair. Knowing one key does not enable you to derive the other key of the pair. One of these keys is called the private key because it is never shared. The other key is called the public key because it is freely shared. Examples of popular symmetric encryption algorithms include AES (Advanced Encryption Standard), DES (Data Encryption Standard), and Blowfish.

A message encrypted with one of the keys can only be decrypted with the other key of the pair. For a pair of keys (a, A), where a is the private key and A is the public key, if C=Ea(P) then P=DA(C). A message encrypted with your private key can be decrypted only with your public key. Anyone can do the decryption but you are the only one who could perform the encryption. This becomes the basis for authentication. Conversely, if C=EA(P) then P=Da(C). A message encrypted with your public key can be decrypted only with your private key. Anyone can now perform the encryption but you are the only one who can decrypt the message. This becomes the basis for confidentiality and secure communication. Examples of popular public key algorithms include RSA and elliptic curve cryptography (e.g., ECDSA and Ed25519).

Key distribution

Symmetric encryption algorithms are the dominant means of encrypting files and large streams of data. They are much faster than public key algorithms and key generation is far easier: a key is just a random set of bits rather than two huge numbers with specific properties. The biggest problem with symmetric cryptography is key distribution: ensuring that both sides get the key in a way that no eavesdropper can observe it. For instance, you cannot just send it as a network message containing the key. There are several techniques to handle the problem of key distribution.

  1. Manually, by using pre-shared keys (PSK). This requires setting up the keys ahead of time, such as registering a password or PIN among all the parties that will communicate.

  2. Using a trusted third party. A trusted third party is a trusted server that knows everyone’s keys. If two systems, let’s call them Alice and Bob, want to communicate, the third party will help both of them get the key they need. To communicate, they will use a session key. This is the name for a throw-away key that will be used for one communication session. One way of sharing it is for Alice to create it (it’s just a random number), encrypt it with her secret key, and send it to the trusted third party, which we will name Trent. Since Trent has all the keys, he can decrypt the message with Alice’s secret key to extract the session key. He can then encrypt it for Bob using Bob’s secret key and send it to Bob.

    Alternatively, Alice can ask Trent to create a session key that both Alice and Bob will share. Trent will generate the key, encrypt it for Alice, and send it to Alice. He will take the same key, encrypt it for Bob, and send it to Bob. Now they both share a key.

  3. Using the Diffie-Hellman key exchange algorithm. Diffie-Hellman is a key distribution algorithm, not an encryption algorithm. It uses a one-way function: a mathematical function for which there is no known way of computing the inverse. Unfortunately, the algorithm uses the terms public key and private key even though it is not an encryption algorithm; the keys are just numbers, one of which is kept private. Two parties can apply the one-way function to generate a common key that is derived from one public key and one private key. Alice generates a common key that is f(Alice_private_key, Bob_public_key) and Bob generates a common key that is f(Bob_private_key, Alice_public_key). The magic of the mathematics in Diffie-Hellman is that both common keys will be identical and can be used as a symmetric encryption key. (A toy numeric sketch follows this list.)

  4. Use public key cryptography. Life becomes easy. If Alice wants to send a message to Bob, she simply encrypts it with Bob’s public key. Only Bob will be able to decrypt it using his private key. If Bob wants to send a message to Alice, he will encrypt it with her public key. Only she will be able to decrypt it using her private key. We have two concerns with this. First, we need to ensure that everyone has trusted copies of public keys (Alice needs to be sure she has Bob’s public key rather than that of an imposter saying he’s Bob). Second, we have to be mindful of the fact that public key cryptography and key generation are both far slower than symmetric cryptography.
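
Here is the toy numeric Diffie-Hellman sketch referenced above; the modulus and generator are far too small to be secure and are only for illustration:

# Toy Diffie-Hellman sketch. The prime p and generator g here are tiny and
# insecure; real deployments use large, standardized groups or elliptic curves.
import secrets

p = 0xFFFFFFFB   # a small prime (far too small to be secure)
g = 5

a = secrets.randbelow(p - 2) + 2        # Alice's private value
b = secrets.randbelow(p - 2) + 2        # Bob's private value

A = pow(g, a, p)    # Alice's public value, sent to Bob
B = pow(g, b, p)    # Bob's public value, sent to Alice

alice_shared = pow(B, a, p)   # f(Alice_private, Bob_public)
bob_shared = pow(A, b, p)     # f(Bob_private, Alice_public)

assert alice_shared == bob_shared   # both sides derive the same symmetric key material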

To get the best of both worlds, we often rely on hybrid cryptography: we use public key cryptography (often Diffie-Hellman key exchange) only to encrypt a symmetric session key (a small amount of data). From then on, we use symmetric cryptography and encrypt data with that session key.

Message Integrity & Non-repudiation

Without cryptography, we often resort to various error-detecting functions to detect whether there is any data corruption. For example, Ethernet uses CRCs (cyclic redundancy checks) and IP headers use 16-bit checksums. A cryptographic hash function is similar in purpose: a function that takes an arbitrary amount of data and generates a fixed-length set of bits. The hash is much larger than typical checksums – typically 256 or 512 bits – and hence is highly unlikely to result in collisions, where multiple messages result in the same hash value.

A cryptographic hash of a message, H=hash(M), should have the following properties (a short sketch after the list illustrates a few of them):

  • It produces a fixed-length output regardless of how long or short the input message is.

  • It is deterministic; it produces the same results when given the same message as input.

  • It is a one-way function. Given a hash value H, it should be difficult to figure out what the input is. (N.B.: in cryptography, the term “difficult” means that there are no known shortcuts other than trying every possible input).

  • It is collision resistant. Given a hash value H, it should be difficult to find another message M’, such that H=hash(M’). Note, however, that because hash values are a finite number of bits in length, multiple messages will hash to the same value (an infinite number, actually). We expect the probability of two messages yielding the same hash value to be so infinitesimally small as to be effectively zero.

  • Its output should not give any information about any of the input. This is a property of diffusion that applies to encryption algorithms as well. For example, if you notice that anytime the first byte of a message is 0, bit 12 of the hash becomes a 1, the property of diffusion has been violated. Even the smallest change to the message should, on average, alter half the bits of the resulting hash in a seemingly random manner.

  • It is efficient. We want to be able to generate these easily to validate file contents and messages.
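
Here is the short sketch referenced above, using Python’s standard hashlib to illustrate fixed-length output, determinism, and diffusion:

# Short sketch of cryptographic hash properties using Python's standard hashlib.
import hashlib

h1 = hashlib.sha256(b"pay Bob $10").hexdigest()
h2 = hashlib.sha256(b"pay Bob $11").hexdigest()   # a one-character change

print(h1)                             # always 256 bits (64 hex characters), regardless of input size
print(h2)                             # looks completely unrelated to h1 (diffusion)
print(len(h1) == len(h2) == 64)       # fixed-length output
print(hashlib.sha256(b"pay Bob $10").hexdigest() == h1)   # deterministic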

Just augmenting a message with its hash is not sufficient to ensure message integrity. It will allow us to detect that a message was accidentally modified but an attacker who modifies a message can easily recompute a hash function. To prevent the hash from being modified, we can encrypt it.

A message authentication code (MAC) is a hash of a message and a symmetric key. If Alice and Bob share a key, K, Alice can send Bob a message along with a hash of the message concatenated with K. Since Bob also has the key K, he can hash the received message concatenated with K and compare it with the MAC he received from Alice. If an intruder modifies the message, she will not be able to produce a valid MAC since she does not know K.
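
In practice, the standard construction is HMAC rather than simply hashing the key concatenated with the message, since naive constructions of that kind have known weaknesses (such as length-extension attacks). A minimal sketch using Python’s standard hmac module; the key and message are arbitrary examples:

# Minimal MAC sketch using the standard HMAC construction (hmac + hashlib).
import hmac, hashlib

key = b"shared secret between Alice and Bob"
message = b"transfer $100 to account 42"

# Alice computes the MAC and sends (message, tag) to Bob.
tag = hmac.new(key, message, hashlib.sha256).hexdigest()

# Bob recomputes the MAC over the received message and compares in constant time.
expected = hmac.new(key, message, hashlib.sha256).hexdigest()
print(hmac.compare_digest(tag, expected))   # True only if the message and key both match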

A digital signature is similar to a MAC but the hash is encrypted with the sender’s private key. Digital signature algorithms are optimized to combine the hashing and encryption operations, but we can think of signing as a two-step process. Here, Alice will send Bob a message along with a hash that is encrypted with her private key. Bob can validate the message and signature by decrypting the signature using Alice’s public key and comparing the result with a hash of the message. A digital signature provides non-repudiation: Alice cannot claim that she did not create the message since only Alice knows her private key. Nobody but Alice could have created the signature.

We frequently combine confidential (encrypted) messaging with message integrity to ensure that a recipient can detect if a message has been modified. One way of doing this is shown below (a code sketch follows the steps):

  1. Alice creates a session key, S (a random number).
  2. She encrypts the session key with Bob’s public key, B: EB(S) and sends it to Bob.
  3. Bob decrypts the session key using his private key, b: S = Db(EB(S)). He now knows the session key.
  4. Alice encrypts the message with the session key, S, using a symmetric algorithm: ES(M).
  5. Alice creates a signature by encrypting the hash of the message with her private key, a: Ea(H(M)).
  6. She sends both the encrypted message and signature to Bob.
  7. Bob decrypts the message using the session key: M = DS(ES(M)).
  8. Bob decrypts the signature using Alice’s public key A: DA(Ea(H(M)))
  9. If the decrypted hash matches the hash of the message, H(M), Bob is convinced that the message has not been modified since Alice sent it.
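
Here is the sketch of this flow referenced above, using the third-party cryptography package: RSA-OAEP to wrap the session key, Fernet for the symmetric session encryption, and RSA-PSS for the signature. These specific algorithm choices are illustrative assumptions:

# Sketch of the hybrid confidentiality + integrity flow above, using the
# third-party "cryptography" package. Algorithm choices here are illustrative.
from cryptography.fernet import Fernet
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.asymmetric import rsa, padding

alice_priv = rsa.generate_private_key(public_exponent=65537, key_size=2048)
bob_priv = rsa.generate_private_key(public_exponent=65537, key_size=2048)
alice_pub, bob_pub = alice_priv.public_key(), bob_priv.public_key()
oaep = padding.OAEP(mgf=padding.MGF1(hashes.SHA256()), algorithm=hashes.SHA256(), label=None)
pss = padding.PSS(mgf=padding.MGF1(hashes.SHA256()), salt_length=padding.PSS.MAX_LENGTH)

# Steps 1-2: Alice creates a session key and encrypts it with Bob's public key.
session_key = Fernet.generate_key()
wrapped_key = bob_pub.encrypt(session_key, oaep)

# Steps 4-5: Alice encrypts the message with the session key and signs its hash.
message = b"meet at noon"
ciphertext = Fernet(session_key).encrypt(message)
signature = alice_priv.sign(message, pss, hashes.SHA256())

# Steps 3 and 7: Bob recovers the session key and decrypts the message.
recovered_key = bob_priv.decrypt(wrapped_key, oaep)
plaintext = Fernet(recovered_key).decrypt(ciphertext)

# Steps 8-9: Bob verifies Alice's signature (verify raises an exception on mismatch).
alice_pub.verify(signature, plaintext, pss, hashes.SHA256())
print(plaintext)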

Authentication and Authorization

Goal: Create protocols for authenticating users, establishing secure communication sessions, authorizing services, and passing identities.

Authentication is the process of binding an identity to the user. Note the distinction between authentication and identification. Identification is simply the process of asking you to identify yourself (for example, ask for a login name). Authentication is the process of proving that the identification is correct.

Authorization is when, given an identity, a system decides what access the user is permitted. Authorization is responsible for access control.

Authentication factors

The three factors of authentication are: something you have (such as a key or a card), something you know (such as a password or PIN), and something you are (biometrics).

Each of these factors has pitfalls and can be stolen. Someone can take your access card or see your password. Biometrics are more insidious: they are not precise and if someone can recreate your stolen biometric then you can never use it again.

Combining these factors into a multi-factor authentication scheme can increase security against the chance that any one of the factors is compromised. Multi-factor authentication must use two or more of these factors. Using two passwords, for example, is not sufficient.

Password Authentication Protocol (PAP)

The best-known authentication method is the use of reusable passwords. This is known as the password authentication protocol, or PAP. The system asks you to identify yourself (your login name) and then enter a password. If the password matches that which is associated with the login name on the system then you’re authenticated.

One problem with the protocol is that if someone gets hold of the password file on the system, then they have all the passwords. The common way to thwart this is to store hashes of passwords instead of the passwords themselves. This takes advantage of the one-way property of the hash. To authenticate a user, check if hash(password) = stored_hashed_password. If an intruder gets hold of the password file, they’re still stuck since they won’t be able to reconstruct the original password from the hash. They’ll have to resort to an exhaustive search or a dictionary attack to search for a password that hashes to the value in the file. An exhaustive search may take a prohibitively long time. A dictionary attack is an optimization of the search that does not test every permutation of characters but iterates over common passwords, dictionary words, and common letter-number substitutions.
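
A minimal sketch of storing and checking hashed passwords with Python’s standard hashlib; the salt and iteration count are standard hardening measures against dictionary attacks, and a production system would typically use a dedicated password-hashing function such as bcrypt, scrypt, or Argon2:

# Minimal sketch of salted password hashing with the standard library.
# Real systems should prefer dedicated password hash functions (bcrypt, scrypt, Argon2).
import hashlib, hmac, os

def store_password(password):
    """Return (salt, hash) to store instead of the plaintext password."""
    salt = os.urandom(16)
    digest = hashlib.pbkdf2_hmac("sha256", password.encode(), salt, 600_000)
    return salt, digest

def check_password(password, salt, stored):
    digest = hashlib.pbkdf2_hmac("sha256", password.encode(), salt, 600_000)
    return hmac.compare_digest(digest, stored)

salt, stored = store_password("correct horse battery staple")
print(check_password("correct horse battery staple", salt, stored))   # True
print(check_password("guess", salt, stored))                          # False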

The other problem with reusable passwords is that if a network is insecure, an eavesdropper may sniff the password from the network. A potential intruder may also simply observe the user typing a password. To thwart this, we can turn to one-time passwords. If someone sees you type your credentials or read them from the network stream, it won’t matter because that information will be useless for future logins.

CHAP Authentication

The Challenge-Handshake Authentication Protocol (CHAP) is an authentication protocol that allows a server to authenticate a user without sending a password over the network.

Both the client and server share a secret (such as a password). A server creates a random bunch of bits (called a nonce) and sends it to the client (user) that wants to authenticate. This is the challenge.

The client identifies itself and sends a response that is the hash of the shared secret combined with the challenge. The server has the same data and can generate its own hash of the same challenge and secret. If the hash matches the one received from the client, the server is convinced that the client knows the shared secret and is therefore legitimate.

An intruder who sees this hash cannot extract the original data. An intruder who sees the challenge cannot create a suitable hashed response without knowing the secret. Note that this technique requires passwords to be accessible at the server, and the security rests on the password file remaining secure.
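
A minimal sketch of the challenge-response exchange described above; the use of SHA-256 over the secret and challenge mirrors the text (the actual CHAP standard, RFC 1994, hashes an identifier, the secret, and the challenge with MD5):

# Minimal sketch of a CHAP-style challenge-response exchange.
import hashlib, secrets

shared_secret = b"pre-shared password"        # known to both client and server

# Server: generate a random nonce and send it to the client as the challenge.
challenge = secrets.token_bytes(16)

# Client: respond with a hash of the shared secret combined with the challenge.
response = hashlib.sha256(shared_secret + challenge).hexdigest()

# Server: compute the same hash and compare; the secret itself never crosses the network.
expected = hashlib.sha256(shared_secret + challenge).hexdigest()
print(response == expected)   # True -> the client knows the shared secret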

Time-based: TOTP

With the Time-based One Time Password (TOTP) protocol, both sides share a secret key. To authenticate, a user runs the TOTP function to create a one-time password. The TOTP function creates a password by hashing a shared secret key together with the time of day. The service, which also knows the secret key and the time, can generate the same hash and thus validate the value presented by the user.
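
A minimal sketch of the standard TOTP construction (an HMAC of the current 30-second time step, truncated to six digits, per RFC 6238); real deployments typically provision the shared secret as a base32 string in a QR code:

# Minimal sketch of the standard TOTP construction (RFC 6238): an HMAC of the
# current 30-second time step, truncated to a 6-digit code. Both sides share `secret`.
import hashlib, hmac, struct, time

def totp(secret, interval=30, digits=6):
    counter = int(time.time() // interval)                  # time-based counter
    msg = struct.pack(">Q", counter)                        # 8-byte big-endian counter
    digest = hmac.new(secret, msg, hashlib.sha1).digest()
    offset = digest[-1] & 0x0F                              # dynamic truncation
    code = (int.from_bytes(digest[offset:offset + 4], "big") & 0x7FFFFFFF) % (10 ** digits)
    return f"{code:0{digits}d}"

secret = b"shared-secret-provisioned-at-setup"
print(totp(secret))   # the service computes the same value and compares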

TOTP is often used as a second factor (proof that you have some device with the secret configured in it) in addition to a password. The protocol is widely supported by companies such as Amazon, Dropbox, Wordpress, Microsoft, and Google.

Public key authentication

Public key authentication relies on the use of nonces. A nonce is simply a randomly-generated bunch of bits and is sent to the other party as a challenge for them to prove that they are capable of encrypting something with a specific key that they possess. The use of a nonce is central to public key authentication.

If Alice wants to authenticate Bob, she needs to have Bob prove that he possesses his private key (private keys are never shared). To do this, Alice generates a nonce (a random bunch of bits) and sends it to Bob, asking him to encrypt it with his private key. If she can decrypt Bob’s response using Bob’s public key and sees the same nonce, she will be convinced that she is talking to Bob because nobody else will have Bob’s private key. Mutual authentication requires that each party authenticate itself to the other: Bob will also have to generate a nonce and ask Alice to encrypt it with her private key.

Passkey authentication is an implementation of public key authentication that is designed to eliminate the use of passwords. A user first logs onto a service via whatever legacy login protocol the service supports: typically a username-password or the additional use of a time-based one-time password or SMS authentication code. After that, the user’s device generates a public-private key pair for that specific service. The public key is sent to the service and associated with the user, much like a password was in the past. Note that the public key is not secret. The private key is stored on the user’s device.

Once passkey authentication is set up, the user logs in by providing their user name. The server generates a random challenge string (at least 16 bytes long) and sends it to the user. The user’s device retrieves the private key for the desired service. This is generally stored securely on the device and unlocked via Face ID, Touch ID, or a local password. None of this information, including the private key, is sent to the server. Using the private key, the device creates a digital signature for the challenge provided by the service and sends the result to the server. The server looks up the user’s public key, which was registered during enrollment, and verifies the signature against the challenge (that is, it decrypts the data sent by the user and checks whether it matches a hash of the original challenge string). If the signature is valid, the service is convinced that the other side holds a valid private key that corresponds to the public key that is associated with the user and is therefore the legitimate user.
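
A minimal sketch of the challenge-signing core of this flow, using the third-party cryptography package with an elliptic-curve key pair; real WebAuthn/passkey implementations add origin checks, attestation, and signature counters, which are omitted here:

# Minimal sketch of the challenge-response core of passkey authentication,
# using the third-party "cryptography" package.
import os
from cryptography.exceptions import InvalidSignature
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.asymmetric import ec

# Enrollment: the device generates a key pair; only the public key goes to the server.
device_private = ec.generate_private_key(ec.SECP256R1())
server_copy_of_public = device_private.public_key()

# Login: the server sends a random challenge (at least 16 bytes).
challenge = os.urandom(16)

# Device: sign the challenge with the private key, unlocked locally (e.g., Face ID).
signature = device_private.sign(challenge, ec.ECDSA(hashes.SHA256()))

# Server: verify with the stored public key; success proves possession of the private key.
try:
    server_copy_of_public.verify(signature, challenge, ec.ECDSA(hashes.SHA256()))
    print("authenticated")
except InvalidSignature:
    print("rejected")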

Identity binding: digital certificates

While public keys provide a mechanism for asserting integrity via digital signatures, they are themselves anonymous. We’ve discussed a scenario where Alice uses Bob’s public key but never explained how she can be confident that the key really belongs to Bob and was not planted by an adversary. Some form of identity binding of the public key must be implemented for you to know that you really have my public key instead of someone else’s.

Digital certificates provide a way to do this. A certificate is a data structure that contains user information (called a distinguished name) and the user’s public key. To ensure that nobody changes any of this data, this data structure also contains a signature of the certification authority. The signature is created by taking a hash of the rest of the data in the structure and encrypting it with the private key of the certification authority. The certification authority (CA) is an organization that is responsible for setting policies of how they validate the identity of the person who presents the public key for encapsulation in a certificate.

To validate a certificate, you hash all the certificate data except for the signature. Then you would decrypt the signature using the public key of the issuer. If the two values match, then you know that the certificate data has not been modified since it has been signed. The challenge now is how to get the public key of the issuer. Public keys are stored in certificates, so the issuer would also have a certificate containing its public key. The certificates for many of the CAs are preloaded into operating systems or, in some cases, browsers.

Transport Layer Security (TLS)

Secure Sockets Layer (SSL) was created as a layer of software above TCP that provides authentication, integrity, and encrypted communication while preserving the abstraction of a sockets interface to applications. An application sets up an SSL session to a service. After that, it simply sends and receives data over a socket just like it would with the normal sockets-based API that operating systems provide. The programmer does not have to think about network security. As SSL evolved, it morphed into a new version called TLS, Transport Layer Security. While SSL is commonly used in conversation, all current implementations are TLS.

Any TCP-based application that may not have addressed network security can be security-enhanced by simply using TLS. For example, the standard email protocols, SMTP, POP, and IMAP, all have TLS-secured interfaces. Web browsers use HTTP, the Hypertext Transfer Protocol, and also support HTTPS, which is the exact same protocol but runs over a TLS connection. (A minimal client sketch appears after the summary below.)

TLS provides:

Authentication
TLS allows endpoints to authenticate prior to sending data. It uses public key authentication via X.509 certificates. Authentication is optional and can be unidirectional (the client may just authenticate the server), bidirectional (each side authenticates the other), or none (in which case we just exchange keys but do not validate identities).
Key exchange
After authentication, TLS performs a key exchange so that both sides can obtain random shared session keys.
Data encryption
Symmetric cryptography is used to encrypt data.
Data integrity
Ensure that we can detect whether data in transit has been modified. TLS includes a MAC with transmitted data.
Interoperability & evolution
TLS was designed to support many different key exchange, encryption, integrity, and authentication algorithms. At the start of each session, the two sides negotiate which of these to use for the session.
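
Here is the minimal client sketch referenced above, using Python’s standard ssl and socket modules; the host name is just an example:

# Minimal TLS client sketch using the standard ssl and socket modules.
# The host name below is just an example.
import socket, ssl

hostname = "www.example.com"
context = ssl.create_default_context()   # loads trusted CA certificates, verifies by default

with socket.create_connection((hostname, 443)) as sock:
    # wrap_socket performs the TLS handshake: negotiation, certificate
    # validation, and key exchange. Afterwards it behaves like a normal socket.
    with context.wrap_socket(sock, server_hostname=hostname) as tls:
        print(tls.version())             # e.g., "TLSv1.3"
        tls.sendall(b"GET / HTTP/1.0\r\nHost: " + hostname.encode() + b"\r\n\r\n")
        print(tls.recv(200))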

Service Authorization via OAuth

Goal: Enable users to provide limited permissions for one service to access another service.

Suppose that you want an app (or some network service) to access your Google calendar. One way to do this is to provide the app with the ID and password of your Google account. Unfortunately, this gives the app full control of the entire account and is something you may not feel comfortable granting.

OAuth is designed to allow users to control what data or services one service can access from another service.

For example, if you want a photo printing service to access photos from your flickr account, you don’t want to provide it with your flickr username and password for unrestricted access. Instead, OAuth allows you to specify what specific access you allow the photo printing service to have to your flickr account and for how long. Token credentials (a bunch of bits as far as your app or website is concerned) are used in place of the resource owner’s username and password for gaining access to the service. These token credentials include a token identifier and a secret.

There are three entities involved in OAuth:

  1. User: the user and the user’s browser.

  2. Client (application, called the consumer): this is the service that the user is accessing. For example, the moo.com photo printing service.

  3. Authorization Server, acting as the Service Provider (server, also called the provider): this is the service that the consumer needs to access. For example, flickr.com to get the photos.

We will use the moo.com photo printing and the flickr.com photo serving service as an example.

  1. Alice wants to order some prints and logs into moo.com. Moo knows about flickr and allows her to select flickr as the source of her photos. When Moo built its service, its developers registered the service with flickr and obtained OAuth client credentials (client ID and secret).

  2. When Alice selects flickr, moo.com needs to contact flickr.com for an authorization code, which is a temporary credential. The application, moo.com, creates a request that contains a scope: a list of requested services that it needs from flickr (for example, get photos from your account). The application then redirects Alice to the flickr OAuth page (that could be a separate server at flickr) with a directive to redirect her back to Moo when she’s done. The redirect request to flickr contains the app’s client ID and the requested scope.

  3. At the flickr OAuth page, she authenticates (using whatever authentication mechanism the service requires, such as login/password or perhaps via OpenID, which may cause another level of redirection, depending on how the service handles authentication) and is presented with a consent form. This is a description of what moo.com is requesting to do (e.g., access to download photos for the next 10 minutes). She can approve or reject the request.

  4. When Alice approves the request, she is redirected back to moo.com. The redirect contains a one-time-use authorization code.

  5. Moo now contacts flickr.com directly and exchanges the authorization code for an access token. Authorization codes are used to obtain a user’s approval. Since they are sent back via an HTTP REDIRECT, they may not be sent securely but the Service Provider will only accept them from the client to which they were issued. Access tokens are used to access resources on the provider (server); that is, to call APIs.

Moo now sends API requests to flickr to download the requested photos. Every request must include the access token, which the service provider (flickr) will validate each time to ensure that the user has not revoked the ability of the client to access these services.
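
A sketch of the back-channel code-for-token exchange and a subsequent API call, using the third-party requests package. Every URL, endpoint, and parameter value below is a hypothetical placeholder for illustration, not Flickr’s actual API:

# Sketch of the OAuth authorization-code exchange and an API call, using the
# third-party "requests" package. All URLs, endpoints, and parameter values are
# hypothetical placeholders for illustration.
import requests

# Step 5: the client (moo.com) exchanges the one-time authorization code for an access token.
token_response = requests.post(
    "https://flickr.example/oauth/token",        # hypothetical token endpoint
    data={
        "grant_type": "authorization_code",
        "code": "AUTH_CODE_FROM_REDIRECT",       # received via the browser redirect
        "client_id": "moo-client-id",
        "client_secret": "moo-client-secret",    # proves the request really comes from moo.com
        "redirect_uri": "https://moo.example/oauth/callback",
    },
)
access_token = token_response.json()["access_token"]

# Subsequent API requests carry the access token; the provider validates it each time.
photos = requests.get(
    "https://flickr.example/api/photos",          # hypothetical resource endpoint
    headers={"Authorization": f"Bearer {access_token}"},
)
print(photos.status_code)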

Distributed Authentication via OpenID Connect

Goal: Allow services to use a third-party user authentication service to authenticate a user.

OpenID Connect was created to relieve users from having to manage multiple identities (logins and passwords) at many different services (web sites).

It is not an authentication protocol. Rather, it’s a technique for a service to redirect authentication to a specific OpenID Connect authentication server and have that server be responsible for authentication.

OpenID Connect is an identity layer on top of the OAuth 2.0 protocol. It uses the basic protocol of OAuth and the same communication mechanisms: HTTPS with JSON messages.

There are three entities involved:

  1. Client: this is the application that the user is accessing. It is often a web site but may be a dedicated app. (In OAuth, we think of this as the consumer).

  2. User: this is the user’s browser. In the case of a dedicated app, the app will take this role.

  3. Authorization Server, acting as the Identity Provider. This server is responsible for managing your ID and authenticating you. It is called the Authorization Server because OpenID Connect is built on top of OAuth 2.0, a service authorization framework. For OAuth, we refer to this as a Service Provider instead of an Identity Provider.

OpenID Connect allows a client to answer the question, what is the identity of the person using this service?

The protocol takes the same steps as OAuth; it redirects the user to the Identity Provider, which authenticates the user and redirects the user back to the client app with a one-time authorization code. The client then contacts the Identity Provider for an access token.

Here’s the basic flow:

  1. Before OpenID Connect is used, a client may pre-register its service with an Identity Provider’s Authorization Server. This allows the Identity Provider administrator to configure policies that set restrictions on accessing user information. It also allows a client and server to agree upon a shared secret that will be used to validate future messages. If this is not done, public key cryptography can be used to transmit a key to establish a secure channel between the client and the Identity Provider.

  2. The user is presented with a request to log in. The client may choose to support multiple Identity Providers. In this case, the user identifies the Identity Provider in the user name. For example, joe@example.com. The protocol then knows that the OpenID Connect Authorization Server is located at example.com. Alternatively, the provider may require the use of a specific provider (e.g., Google, Microsoft). The client redirects the user to the Authorization Server at the Identity Provider via an HTTP REDIRECT message.

  3. The redirect results in the client sending an Authentication Request to the Identity Provider. This is an OAuth message with a scope (access request) that requests "openid". The scope may also include a request for other identifying data, such as a user profile, email, postal address, and phone number.

  4. The Identity Provider (Authorization Server) authenticates the user using whatever protocol it chooses to use. That decision is left up to the administrators of the Authorization Server.

  5. After authenticating the user, the Identity Provider requests the user’s permission for all the requests listed in the scope. For example, SomeApp requests permission for: signing you in, your profile, your email address, your address, your phone number. This is the same consent form as a user will see with any OAuth request for services.

  6. If the user approves, the Identity Provider sends a redirect to send the browser back to the client. This redirect message contains an authorization code created by the Identity Provider that is now given to the client. The authorization code looks like a large bunch of random text and has no meaning to the user or the client.

  7. The client now sends a token request directly to the Identity Provider. The request includes the authorization code that it received. All traffic is encrypted via HTTPS to guard against eavesdroppers. If the client pre-registered, the request will contain the pre-established secret so that the Authorization Server can validate the client.

  8. The server returns an access token along with an ID token to the client. Note that the user is not involved in this flow.

The ID token asserts the user’s identity. It is a JSON object that:

  • Identifies the Identity Provider (Authorization Server)
  • Has an issue and expiration date
  • May contain additional details about the user or service
  • Is digitally signed by the Identity Provider using either the provider’s private key or a shared secret that was set up during registration.

By getting this ID token, the client knows that, as far as the Identity Provider is concerned, the user has successfully authenticated. At this point, the client does not need to contact the Identity Provider anymore.
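
ID tokens are typically encoded as JSON Web Tokens (JWTs): three base64url-encoded parts (header, payload, signature) separated by dots. A minimal sketch of inspecting one; the helper names are illustrative, and a real client must also verify the signature, issuer, audience, and expiration before trusting any claims:

# Minimal sketch of inspecting an OpenID Connect ID token, which is typically a
# JWT: header.payload.signature, each part base64url-encoded. A real client must
# also verify the signature, issuer, audience, and expiration.
import base64, json

def b64url_decode(part):
    return base64.urlsafe_b64decode(part + "=" * (-len(part) % 4))   # restore padding

def inspect_id_token(id_token):
    header_b64, payload_b64, _signature_b64 = id_token.split(".")
    header = json.loads(b64url_decode(header_b64))     # e.g., {"alg": "RS256", ...}
    claims = json.loads(b64url_decode(payload_b64))    # issuer, subject, expiration, ...
    return {"header": header, "claims": claims}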

The access token is the same access token that OAuth provides clients so that they can request services. It has an expiration date associated with it and may be passed to the Identity Provider to request access to detailed information about the user or get other protected resources. What the client can request is limited by the scope of the Authentication Request in step 3.

OpenID and OAuth were separate protocols until 2014. They were then merged so that OpenID Connect is a special mode of OAuth that requests authentication and provides the client with an ID token. However, the purposes of OAuth and OpenID are fundamentally different. OpenID is designed to allow a third party to manage user authentication. That is, it provides single sign-on: allowing a user to use the same login and a third party (OpenID) authentication service for many web sites. OAuth, on the other hand, is designed for service authorization. It is up to the remote service you are accessing to decide if and how to authenticate you before allowing you to authorize access to that service.

Clusters

Goal: Coordinate the activities of a network of computers to provide users with a single system image.

Clustering is the aggregation of multiple independent computers to work together and provide a single system that offers increased reliability and/or performance. It is a realization of the single system image that we discussed at the start of the semester. Clusters are generally off-the-shelf computers that are connected to a local area network that allows them to communicate with other computers in the cluster. A cluster may be a collection of tens of thousands (or more) computers (e.g., a Google cluster) or just a backup computer to take over for a failed web server or database.

Systems are clustered to accomplish various goals:

Workload coordination
Jobs need to be distributed across a network of machines. Some of these jobs, such as those that use MapReduce or Spark, themselves need to be distributed across machines with sufficient resources to execute them. Apache YARN and Mesos are two examples of such coordinators.
Supercomputing
Also known as high-performance computing, or HPC. The goal of an HPC cluster is to create a computing environment that resembles that of a supercomputer.
High Availability (HA)
The goal of this type of cluster is to ensure maximum availability by providing redundant systems for failover.
Load Balancing
A load balancing cluster distributes requests among a collection of computers. In doing so, it addresses both scalability and high availability.
Storage
Storage clustering is a way to ensure that systems can all access the same storage. It is also a way to make vast amounts of storage available to a computer without having to put it inside the computer (where it will not be available if the computer fails).

The boundaries between these cluster types are often fuzzy and many clusters will use elements of several of these cluster types. For example, any cluster may employ a storage cluster. High availability is important in supercomputing clusters since the likelihood of any one computer failing increases with an increasing number of computers. There is no such thing as a “standard” cluster.

YARN

Apache YARN (Yet Another Resource Negotiator) is a core component of the Apache Hadoop ecosystem, designed for cluster management and job scheduling. Its primary purpose is to efficiently manage and allocate resources across the cluster, providing scalability. YARN separates the resource management and job scheduling/monitoring functions into separate processes. This separation allows for greater flexibility in processing data and isn’t tied to a single framework, like MapReduce. By managing resources at a cluster level, YARN enables more effective use of distributed resources than if each framework tried to coordinate activities on its own.

The architecture of YARN is composed of several key components. The Resource Manager, the master daemon, is responsible for managing the use of resources across the cluster and scheduling user applications. It has two main components: the Scheduler, which allocates resources based on constraints such as capacity, queues, etc., and the ApplicationManager, which manages system applications in the cluster. Each worker node runs a NodeManager process, which monitors the resource usage (CPU, memory, disk, network) and reports it to the Resource Manager. Lastly, the Application Master, a container created for each application, negotiates resources from the ResourceManager and works with the NodeManager to execute and monitor tasks. This decentralized approach makes YARN more scalable and efficient compared to the older Hadoop MapReduce version, which relied on a centralized JobTracker for resource management and job scheduling.

Mesos

Apache Mesos is designed to facilitate resource isolation and sharing across diverse distributed applications or frameworks. At its core, Mesos is built to abstract and manage computing resources like CPU, memory, and storage across physical or virtual machines. This abstraction allows for the creation of fault-tolerant and elastic distributed systems. Mesos is distinct in its ability to support a wide range of application frameworks (such as Hadoop, MPI, Spark, Kafka) in a shared environment, offering more dynamic resource sharing than traditional clusters. This differs from Apache YARN, which is tailored specifically for Hadoop frameworks. Mesos’s general-purpose nature and its unique two-level scheduling mechanism enable it to handle a broader array of frameworks and use cases beyond just big data processing, including real-time analytics and machine learning workloads.

The architecture of Apache Mesos is centered around a Mesos Master node, which sends requests to agents running on each worker node in the cluster. A key feature of Mesos is its pluggable allocation module, which plays a crucial role in scheduling jobs among different frameworks. This module allows for custom policies, enabling various frameworks to coexist and efficiently share resources according to specific needs. The Mesos Master employs this module to offer resources to registered frameworks in a manner that differs fundamentally from YARN’s ResourceManager. The frameworks, each with its own scheduler, then decide how to utilize these resource offers. This two-level scheduling, where the Mesos Master makes offers and frameworks make their own scheduling decisions, grants frameworks the autonomy to implement tailored scheduling logic. As a result, Mesos provides a more adaptable and scalable solution, especially in heterogeneous environments where multiple, diverse distributed systems need to coexist and optimally share resources.

Cluster components

A cluster needs to keep track of its cluster membership: which machines are members of the cluster. Related to this are the configuration and service management components. The configuration system runs on each node (computer) in the cluster and manages the setup of each machine while the service management component identifies which nodes in the cluster perform which roles (e.g., standby, active, running specific applications).

A quorum is the number of nodes in a cluster that have to be alive for the cluster to function. Typically, a majority is required. This provides a simple way of avoiding split-brain due to network partitioning where one group of computers cannot talk to another and two instances of the cluster may be created. With a majority quorum, a minority of systems will never create their own cluster.

A cluster interconnect is the network that allows computers in a cluster to communicate with each other. In most cases, this is just an Ethernet local area network. For performance, bandwidth and latency are considerations. Communicating outside of a rack incurs longer cable runs and the overhead of an extra switching stage. Communicating outside of a data center incurs even longer latency. For maximum performance, we would like computers that communicate frequently to be close together physically. However, for maximum availability, we would like them to be distant. If a rack switch or an entire data center loses power, it would be good to have a working replica elsewhere. For high performance applications within a local area, a dedicated network is often used as a cluster interconnect. This is known as a System Area Network (SAN). A high-performance SAN will provide low latency, highly reliable, switched communication between computers. By using a SAN, the software overhead of having to run the TCP/IP stack, with its requisite fragmentation, buffer management, timers, acknowledgements, and retransmissions, is largely eliminated. Remote DMA (RDMA) allows data to be copied directly to the memory of another processor. System Area Networks are often used for HPC clusters, along with RDMA communication incorporated into the Message Passing Interface (MPI) library. MPI is used in many high performance computing applications. Common examples of system area networks are Infiniband and 10 Gbps ethernet with Data Center Bridging.

A heartbeat network is the mechanism that is used to determine whether computers in the cluster are alive or dead. A simple heartbeat network exchanges messages between computers to ensure that they are alive and capable of responding. Since a local area network may go down, one or more secondary networks are often used as dedicated heartbeat networks in order to distinguish failed computers from failed networks. Asynchronous networks, such as IP, make the detection of a failed computer problematic: one is never certain whether a computer failed to send a message or whether the message is delayed beyond a timeout value.

Storage clusters

Storage in a clustered computer system can be provided in a variety of ways. Distributed file systems, such as NFS, SMB, or AFS can be used. These provide file-level remote access operations over a network.

A Storage Area Network (SAN, not to be confused with a System Area Network) is a dedicated network for connecting computers to dedicated disk systems (storage arrays). Common SAN interconnect technologies include iSCSI, which uses the SCSI protocol over the Ethernet, and Fibre Channel. Computers access this remote storage at the block level (read a specific block, write a specific block), just like they would access local storage. With a SAN, however, access to the same storage can be shared among multiple computers. This environment is called shared disk. A distributed lock manager, or DLM, manages mutual exclusion by controlling access to key resources on the shared disk so that, for example, two computers will not try to write to the same disk block at the same time. A clustered file system (also called a cluster file system) is a file system that is built on top of a shared disk. Unlike a distributed file system (NFS, SMB, et al.), which uses remote access at a file level, each computer’s operating system implements a full file system and makes requests at the block level. Examples of such file systems include the Oracle Cluster File System for Linux (OCFS2), Red Hat’s Global File System (GFS2), and Microsoft’s Cluster Shared Volumes (CSV). The DLM is used to ensure that critical shared file system data structures, such as bitmaps of free blocks, inode structures, and file lock tables, are accessed exclusively and caching is coherent. It operates at the level of the implementation of a file system rather than high-level file systems services as in distributed file systems. As such, it differs from something like the NFS lock daemon, which kept track of file locks requested by applications rather than block-level locks needed to keep a file system coherent.

A shared nothing cluster architecture is one where each system is independent and there is no single point of contention in the system, such as competing for access to a shared disk. Because there is no contention, there is no need for a DLM. In this environment, any data that is resident on a system’s disk can only be obtained by sending a request to the computer that owns the disk. If the computer dies, the data is generally unavailable but may be replicated on other nodes. An alternative design that uses a SAN can allow disk access to be switched to another computer but ensure that only one computer accesses the file system at any time.

To make disks themselves highly available, RAID (redundant array of independent disks) is often employed. RAID 1 is disk mirroring. Anything that is written to one disk gets written to a secondary disk. If one fails then you still have the other. RAID 5 and RAID 6 stripe the data across several disks and also add error correcting codes (parity) so that data can be reconstructed from the remaining segments if a disk fails; RAID 6 adds a second parity block so it can tolerate two disk failures.
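
A toy sketch of the parity idea behind RAID 5: the parity block is the XOR of the data blocks, so any one missing block can be rebuilt by XOR-ing the survivors:

# Toy sketch of RAID-5-style parity: parity = XOR of the data blocks, so any one
# missing block can be rebuilt by XOR-ing the surviving blocks with the parity.
from functools import reduce

def xor_blocks(*blocks):
    return bytes(reduce(lambda x, y: x ^ y, column) for column in zip(*blocks))

d1, d2, d3 = b"AAAA", b"BBBB", b"CCCC"    # data blocks striped across three disks
parity = xor_blocks(d1, d2, d3)           # stored on a fourth disk

# Disk 2 fails: reconstruct its block from the remaining data blocks and the parity.
rebuilt_d2 = xor_blocks(d1, d3, parity)
print(rebuilt_d2 == d2)   # True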

High-Performance Computing (HPC)

High-performance computing (HPC) clusters are generally custom efforts, but there are a number of components that are common across many implementations. HPC clusters are designed for traditional supercomputing applications that perform a large amount of computation on large data sets. These applications are designed to be partitioned into multiple communicating processes. The Message Passing Interface (MPI) is a popular programming interface for sending and receiving messages; it handles point-to-point and group communication and provides support for barrier-based synchronization. It is sometimes used together with the Parallel Virtual Machine (PVM), a layer of software that provides an interface for creating tasks, managing global task IDs, and managing groups of tasks on arbitrary collections of processors. PVM is in many ways similar to MPI but was designed to be more dynamic and to support heterogeneous environments. However, its performance never reached the levels of MPI and its popularity is waning. Beowulf and Rocks Cluster are examples of HPC clusters based on Linux. Microsoft offers high-performance clustering via the Microsoft HPC Pack. There are many other HPC systems as well. The common thread among them all is that they provide a front-end server for scheduling jobs and monitoring processes and offer an MPI library for programming.
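
As a brief illustration of the MPI programming model, the sketch below uses the mpi4py Python binding (assuming it is installed and the script is launched with something like mpiexec -n 4 python script.py) to show point-to-point messaging and a barrier, the primitives mentioned above.

```python
from mpi4py import MPI

comm = MPI.COMM_WORLD
rank = comm.Get_rank()             # this process's ID within the job
size = comm.Get_size()             # total number of processes launched

if rank == 0:
    # Rank 0 hands a piece of work to every other rank (point-to-point).
    for dest in range(1, size):
        comm.send({"chunk": dest}, dest=dest, tag=0)
else:
    work = comm.recv(source=0, tag=0)
    print(f"rank {rank} received {work}")

comm.Barrier()                     # barrier-based synchronization point
```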

Batch Processing: Single-Queue Work Distribution

Single queue work distribution is a form of high performance computing that does not rely on communication between computing nodes. Where traditional HPC applications usually involve large-scale array processing and a high level of cooperation among processing elements, the work distribution approach is used for applications such as render farms for computer animation, where a central coordinator (dispatcher) sends job requests to a collection of computers. When a system completes a job (e.g., “render frame #4,178”), the dispatcher will send it the next job (e.g., “now render frame #12,724”). The dispatcher will have the ability to list jobs, delete jobs, dispatch jobs, and get notified when a job is complete. The worker nodes have no need to communicate with each other.
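
A minimal sketch of the single-queue pattern follows, using a local process pool as a stand-in for a real networked dispatcher and worker nodes (the function name and frame numbers are made up for the example).

```python
from multiprocessing import Pool

def render_frame(frame_number):
    # Stand-in for the real work, e.g., invoking a renderer on one frame.
    return f"frame {frame_number} rendered"

if __name__ == "__main__":
    frames = range(4178, 4194)      # the job queue; workers never talk to each other
    with Pool(processes=4) as workers:
        # imap_unordered hands the next frame to whichever worker is free,
        # which is the single-queue dispatch pattern described above.
        for result in workers.imap_unordered(render_frame, frames):
            print(result)
```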

Load Balancing

Web-services load balancing is a conceptually simple but very widely used technique for distributing the load of many network requests among a collection of computers, each of which is capable of processing a request. Load balancing serves three important functions:

  1. Load distribution. It enables scalability by distributing requests among multiple computers.

  2. High availability (failover). If a computer is dead, the requests will be distributed among the remaining live computers.

  3. Planned outage management. If a computer needs to be taken out of service temporarily (for example, to upgrade software or replace hardware), requests will be distributed among the remaining live computers.

The simplest form of load balancing is to have all requests go to a single computer that then returns an HTTP redirect (a 3xx response such as 302 Found). The redirect is part of the HTTP protocol and leads the client to re-issue the request to the computer named in the response's Location header.
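
For illustration, a minimal redirect-based front end might look like the sketch below (the host names and port are placeholders): it answers every request with a 302 response whose Location header points at the next back-end server in round-robin order.

```python
import itertools
from http.server import BaseHTTPRequestHandler, HTTPServer

BACKENDS = itertools.cycle([
    "http://server1.example.com",   # hypothetical back-end machines
    "http://server2.example.com",
])

class RedirectBalancer(BaseHTTPRequestHandler):
    def do_GET(self):
        self.send_response(302)                               # "Found" redirect
        self.send_header("Location", next(BACKENDS) + self.path)
        self.end_headers()

if __name__ == "__main__":
    HTTPServer(("", 8080), RedirectBalancer).serve_forever()
```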

Another approach, and the most popular one, is to use a load-balancing router to map incoming requests to one of several back-end computers.

For load balancing across data centers, DNS-based load balancing may be used, where a DNS query for the service's domain name returns the IP addresses of machines at different data centers.
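
From the client's point of view, this simply means that one name resolves to several addresses. A small illustrative check (the domain name is a placeholder):

```python
import socket

name = "www.example.com"                       # placeholder domain name
_, _, addresses = socket.gethostbyname_ex(name)
print(f"{name} resolves to {addresses}")       # possibly several IP addresses
```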

High Availability

High-availability clusters strive to provide a high level of system uptime by taking into account the fact that computers may fail. When this happens, applications running on those computers will resume on other computers that are still running. This is called failover.

Low-level software to support high-availability clustering includes facilities to access shared disks and support for IP address takeover, which enables a computer to listen on multiple IP addresses so that IP packets that were sent to a failed machine can reach the backup system instead.

Mid-layer software includes distributed elections to pick a coordinator, propagation of status information, and figuring out which systems and applications are alive. Higher-layer software includes the ability to restart applications, let a user assign applications to computers, and let a user see what’s going on in the system as a whole.

An active/passive configuration is one where one or more backup (passive) systems are waiting to step in for a system that died. An active/active configuration allows multiple systems to handle requests. Requests may be load balanced across all active systems and no failover is needed; the dead system is simply not sent any requests.

Failover can be implemented in several ways:

Cold failover
This is an application restart — the application is started afresh from the beginning. An example is starting up a web server on a backup computer because the primary web server died. There is no state transfer. If we are referring to computer systems rather than software, cold failover refers to a system that needs to be powered up.
Warm failover
Here, the application is checkpointed periodically. It can then be restarted from the last checkpoint. Many cluster libraries provide the ability for a process to checkpoint itself (save its memory image). Pregel is an example of a software framework that relies on periodic checkpointing so that a graph computation does not have to restart from the beginning. If we are referring to computer systems rather than software, warm failover refers to a system that is kept powered on and is periodically updated with the state of the software.
Hot failover
Here, a replica application is always kept synchronized with the active application on another computer. An example of this is a replicated state machine. Chubby servers, for example, implement hot failover: if the Chubby master fails, any other machine in the Chubby cluster can step in. If we are referring to computer systems rather than software, hot failover refers to a system that is already running.

Be aware that the precise definitions of cold, hot, and warm may differ among different vendors who advertise these services.
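
As a concrete, if simplified, illustration of warm failover, the sketch below periodically checkpoints an application's state to a file (which in practice would live on shared or replicated storage; the file name and state layout are invented for the example) so that a restart resumes from the last checkpoint rather than from the beginning.

```python
import os
import pickle

CHECKPOINT = "job.ckpt"     # would live on shared or replicated storage

def load_checkpoint():
    """Resume from the last saved state, or start fresh if none exists."""
    if os.path.exists(CHECKPOINT):
        with open(CHECKPOINT, "rb") as f:
            return pickle.load(f)
    return {"next_item": 0, "partial_results": []}

def save_checkpoint(state):
    with open(CHECKPOINT, "wb") as f:
        pickle.dump(state, f)

state = load_checkpoint()
for i in range(state["next_item"], 1000):
    state["partial_results"].append(i * i)     # stand-in for real work
    state["next_item"] = i + 1
    if i % 100 == 0:
        save_checkpoint(state)                 # periodic checkpoint
save_checkpoint(state)                         # save the final state
```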

Cascading failover refers to the ability of an application to fail over even after it already has failed over in the past. Multi-directional failover refers to the ability to restart applications from a failed system on multiple available systems instead of a specific computer that is designated for use as a standby system.

A particularly troublesome malfunction is a Byzantine failure. In this case, the failed process or computer continues to communicate but communicates faulty data. Related to this is the problem of fail-restart behavior, where a process restarts but does not realize that the data it is working with is obsolete (e.g., a transaction coordinator might restart and not realize that the transaction has already been aborted). Fencing is the use of various techniques to isolate a node from the rest of the cluster. Power fencing shuts off power to the node to ensure that it does not restart. SAN fencing blocks the node from accessing shared storage, avoiding possible file system corruption. Other fencing techniques may block network messages from the node or remove its processes from a replication group (as is done in virtual synchrony).


  1. The abbreviation UTC is a compromise between the English Coordinated Universal Time and the French Temps Universel Coordonné.  ↩︎
