A parameter server is more or less a distributed key-value store optimized for training machine learning models. For example, imagine we're learning a weight vector $w = (w_1, w_2, w_3)$ using logistic regression. We can distribute $w$ across two shards of the parameter server where one shard stores $(1, w_1)$ and the other stores $(2, w_2)$ and $(3, w_3)$. Worker nodes can then read parts of the weight vector, perform some computation, and write back parts of the weight vector.
This paper presents an optimized parameter server with the following features:
1. Efficient communication.
2. Flexible consistency models.
3. Elastic scalability.
4. Fault tolerance and durability.
5. Ease of use.
#### Machine Learning
Many machine learning algorithms try to find a weight vector $w \in \mathbb{R}^d$ that minimizes a regularized cost function of the following form:
$$ F(w) = \Omega(w) + \sum_{i=1}^n l(x_i, y_i, w) $$
When $n$ and $d$ get really big, it becomes intractable to run these algorithms on a single machine. Instead, we have to parallelize the algorithm across multiple machines.
As an example, consider how to perform distributed batch gradient descent across $m$ workers. We initialize $w$ and store it in a parameter server. Concurrently, each worker begins by reading $\frac{1}{m}$ of the training data. Then, every worker reads $w$ from the parameter server and computes a gradient with respect to its local training data (actually, it only needs to read the relevant parts of $w$). Then, it pushes its gradient to the parameter server. Once the server receives gradients from every worker, it aggregates them together and updates $w$. Finally, workers pull the updated $w$ and loop.
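The loop described above can be sketched in a few lines. This is a minimal single-process simulation, not the paper's implementation: the `ParameterServer` class, `train` function, and learning rate are hypothetical names and values chosen for illustration, the loss is the logistic loss from the earlier example, and the regularizer $\Omega$ is omitted.

```python
import math

def sigmoid(z):
    return 1.0 / (1.0 + math.exp(-z))

def local_gradient(w, shard):
    """Sum of logistic-loss gradients over one worker's (x, y) examples."""
    grad = [0.0] * len(w)
    for x, y in shard:
        error = sigmoid(sum(wj * xj for wj, xj in zip(w, x))) - y
        for j, xj in enumerate(x):
            grad[j] += error * xj
    return grad

class ParameterServer:
    """Hypothetical single-shard server holding the whole weight vector."""
    def __init__(self, dim, learning_rate):
        self.w = [0.0] * dim
        self.learning_rate = learning_rate

    def pull(self):
        return list(self.w)

    def push_all(self, gradients):
        # Aggregate one gradient per worker, then take a single descent step.
        for j in range(len(self.w)):
            self.w[j] -= self.learning_rate * sum(g[j] for g in gradients)

def train(data, m, rounds, learning_rate=0.1):
    shards = [data[i::m] for i in range(m)]  # each worker reads about 1/m of the data
    server = ParameterServer(len(data[0][0]), learning_rate)
    for _ in range(rounds):
        # Every worker pulls w and computes a gradient on its own shard.
        gradients = [local_gradient(server.pull(), shard) for shard in shards]
        server.push_all(gradients)
    return server.pull()
```

Because the server waits for all $m$ gradients before updating, the result matches single-machine batch gradient descent up to floating-point rounding, whatever the value of $m$.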
#### Architecture
A parameter server consists of a bunch of servers that store weights and a bunch of workers that perform computations with the weights (e.g. compute gradients). Servers are organized into a server group managed by a server manager. Workers are organized into multiple worker groups, and each worker group is managed by a task scheduler. The server manager manages which data is assigned to which server. The task scheduler assigns tasks to workers and monitors progress.
Parameters are stored as key-value pairs. For example, a weight vector $w \in \mathbb{R}^d$ can be stored as a set of pairs $(i, w_i)$ for $1 \leq i \leq d$. To store sparse vectors more efficiently, only non-zero entries of $w$ must be explicitly stored. If a pair $(i, w_i)$ is missing, the parameter server assumes $w_i = 0$.
Most machine learning algorithms do not update individual entries of a weight vector at a time. Instead, they typically update part of a matrix or part of a vector. To take advantage of this workload pattern, the parameter server allows workers to read and write ranges of values instead of single values at a time. This reduces network overhead.
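As a rough illustration of sparse storage combined with range-based access, here is a sketch of a single shard. The class and method names (`Shard`, `pull_range`, `push_range`) are hypothetical, and treating a push as "add these deltas" is an assumption; the summary only says that workers read and write ranges.

```python
class Shard:
    """One server shard: stores only the non-zero entries of w as {index: value}."""

    def __init__(self):
        self.values = {}

    def pull_range(self, lo, hi):
        """Return dense values for keys lo..hi inclusive; missing keys read as 0."""
        return [self.values.get(i, 0.0) for i in range(lo, hi + 1)]

    def push_range(self, lo, deltas):
        """Add deltas to consecutive keys starting at lo in a single request."""
        for offset, delta in enumerate(deltas):
            key = lo + offset
            updated = self.values.get(key, 0.0) + delta
            if updated == 0.0:
                self.values.pop(key, None)  # keep the store sparse
            else:
                self.values[key] = updated
```

A worker that updates a contiguous block of a vector issues one `push_range` call instead of one request per entry, which is where the network savings come from.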
In addition to pushing and pulling entries of $w$, workers can also register user-defined functions to run at a server. For example, a server can compute the gradient of a regularization term.
By default, the parameter server runs tasks asynchronously. That is, if a worker issues a pull or push request, it does not block. However, the parameter server also allows workers to explicitly mark dependencies between different requests which forces them to serialize.
Some algorithms are robust to weird inconsistencies. These algorithms can often run faster with weaker consistency. The parameter server provides three levels of consistency:
1. Sequential consistency in which every request is totally serialized.
2. Eventual consistency in which requests are run whenever they please.
3. Bounded delay in which a request is delayed until all tasks that began τ time ago have completed.
Users can also specify that a certain consistency model only apply to a certain subset of key-value pairs as specified by a filter.
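The three levels can be viewed as one rule with a tunable delay. The sketch below is an assumption-laden simplification: tasks are numbered in issue order and the delay is measured in number of tasks rather than wall-clock time, and the function name `may_start` is hypothetical.

```python
def may_start(task, finished, tau):
    """Return True if task number `task` may start under a bounded delay of `tau`.

    Tasks are numbered 0, 1, 2, ... in issue order and `finished` is the set
    of completed task numbers. A task waits for every task that was issued
    more than `tau` steps before it.
    """
    return all(i in finished for i in range(task) if i < task - tau)
```

Under this reading, `tau = 0` forces every earlier task to finish first (sequential consistency), while `tau = float("inf")` never blocks (eventual consistency).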
#### Implementation
Data is partitioned across servers using consistent hashing, and the server manager records the assignment of key ranges to machines. When a new server joins:
1. The server manager assigns a new key range to the server.
2. The server fetches its data from other servers.
3. The server manager broadcasts the update to the other servers who relinquish data they no longer own.
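To see why a joining server only takes data from existing servers and never shuffles data between them, here is a generic consistent hashing sketch. It is not the paper's implementation: the `Ring` class, the use of MD5 for ring positions, and the one-position-per-server layout are all assumptions made for illustration.

```python
import bisect
import hashlib

def position(name):
    """Map a string to a point on the ring [0, 2**32)."""
    digest = hashlib.md5(name.encode()).digest()
    return int.from_bytes(digest[:4], "big")

class Ring:
    def __init__(self):
        self.points = []  # sorted ring positions of the servers
        self.owner = {}   # ring position -> server name

    def add_server(self, server):
        p = position(server)
        bisect.insort(self.points, p)
        self.owner[p] = server

    def lookup(self, key):
        """A key belongs to the first server at or after its position, wrapping around."""
        i = bisect.bisect_left(self.points, position(str(key)))
        return self.owner[self.points[i % len(self.points)]]
```

When a server is added, the only keys whose owner changes are the ones that now map to the new server, which corresponds to steps 2 and 3 above: the new server fetches its range and the previous owners relinquish it.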
The parameter server uses chain replication to replicate data. Each node forms a chain with the $k$ previous nodes in the hashing ring. Workers send updates to the master, and each update is chain replicated to the next $k$ servers.
Logically, the parameter server tags each key-value pair with a vector clock (though honestly, I'm not exactly sure I understand why). Physically, each range of key-value pairs is associated with a vector clock. This range-based vector clock avoids storing redundant vector clock information.
Messages sent from workers to servers are compressed with Snappy. Moreover, servers cache parts of messages, and workers can send a hash instead of a whole message if they think a server has it cached.
Modelling a distributed system as a replicated state machine provides the illusion that the distributed system is really just a single machine. At the core of the replicated state machine approach is a replicated log that is kept consistent by a consensus algorithm. Traditionally, consensus has been synonymous with Paxos. Paxos is taught in schools, and most consensus algorithm implementations are based on Paxos. However, Paxos has two main disadvantages:
1. It is hard to understand. Single-decree Paxos is nuanced, and composing single-decree Paxos into multi-Paxos is confusing.
2. It is hard to implement efficiently. Multi-Paxos is not very well described in the literature, and the algorithm is difficult to implement efficiently without modification.
This paper presents the Raft consensus algorithm. Raft provides the same performance and safety as multi-Paxos but it is designed to be much easier to understand.
Basics. Every node in a Raft cluster is in one of three states: leader, follower, or candidate. The leader receives requests from users and forwards them to followers. Followers are completely passive and receive messages from leaders. Candidates perform leader elections in an attempt to become a leader. In normal operation, there is a single leader, and every other node is a follower.
Raft proceeds in a series of increasingly numbered terms. Each term consists of a leader election followed by (potentially) normal operation. There is at most one leader elected per term. Moreover, each node participates in monotonically increasing terms. When a node sends a message in Raft, it annotates it with its term. If a leader receives a message from a later term, it immediately becomes a follower. Nodes ignore messages annotated with older terms. Raft uses two RPCs: RequestVote (for leader election) and AppendEntries (for replication and heartbeats).
Leader Election. Leaders periodically send heartbeats (AppendEntries RPCs without any entries) to followers. As long as a follower continues to receive heartbeats, it continues to be a follower. If a follower does not receive a heartbeat after a certain amount of time, it begins leader election: it increments its term, enters the candidate state, votes for itself, and sends RequestVote RPCs in parallel to all other nodes. One of three things then happens:
1. It wins. Nodes issue a single vote per term on a first come first serve basis. If a candidate receives a vote from a majority of the nodes, then it becomes leader.
2. It hears from another leader. If a candidate receives a message from another leader in a term at least as large as its own, it becomes a follower.
3. It times out. It's possible that a split vote occurs and nobody becomes leader in a particular term. If this happens, the candidate times out after a certain amount of time and begins another election in the next term.
Log Replication. During normal operation, a leader receives a request from a client, appends it to its log annotated with the current term, and issues AppendEntries to all nodes in parallel. An entry is considered committed after it is replicated to a majority of the nodes. Once a log entry is committed, all previous log entries are also committed. Once a log entry is committed, the leader can apply it and respond to the user. Moreover, once an entry is committed, it is guaranteed to eventually execute at all available nodes. The leader keeps track of the index of the largest committed entry and sends it to all other nodes so that they can also apply log entries.
Raft satisfies a powerful log matching invariant:
1. "If two entries in different logs have the same index and term, then they store the same command."
2. "If two entries in different logs have the same index and term, then the logs are identical in all preceding entries."
Property 1 is ensured by the fact that at most one leader is elected for any given term, the fact that a leader only creates a single log entry per index, and the fact that once a log entry is created, it never changes index. Property 2 is ensured by a runtime check. When a leader sends an AppendEntries RPC for a particular index, it also sends the index and term of its preceding log entry. The follower only accepts the AppendEntries RPC if its own log has a matching entry at that preceding index. Inductively, this guarantees property 2.
Followers may have missing or extraneous log entries. When this happens, the leader identifies the longest prefix on which the two agree. It then sends the rest of its log. The follower overwrites its log to match the leader.
Safety. The protocol described so far is unsafe. If a new leader is elected, it can accidentally force followers to overwrite committed values with uncommitted values. Thus, we must ensure that leaders contain all committed entries. Other consensus algorithms ensure this by shipping committed values to newly elected leaders. Raft takes an alternative approach and guarantees that if a leader is elected, it has every committed entry. To ensure this, Raft must restrict which nodes can be elected. A follower rejects a RequestVote RPC if the requesting candidate's log is not as up-to-date as its own log. A log is at least as up-to-date as another if its last entry has a higher term, or has the same term and the log is at least as long. Since a candidate must receive a majority of votes and committed values have been replicated to a majority of nodes, a candidate must contact a node with all committed values during its election, which will prevent it from being elected if it doesn't have all the committed log entries.
To prevent another subtle bug, leaders also do not directly commit values from previous terms. They only commit values from their own term, which indirectly commits log entries from previous terms.
Cluster Membership Changes. A Raft cluster cannot be instantaneously switched from one configuration to another. For example, consider a cluster moving from 3 to 5 nodes. It's possible that two nodes are elected leader for the same term, which can lead to a safety violation. Instead, the cluster transitions to a joint consensus phase where decisions require a majority from both the old and new configuration. Once a majority of nodes accept the new configuration, the cluster can transition to it.
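The joint consensus rule is easy to state in code. The sketch below only shows the quorum check, under the assumption that configurations are plain sets of node names; the function names are hypothetical and nothing here models the log entries that carry the configuration change.

```python
def has_majority(acks, config):
    """True if strictly more than half of `config` appears in `acks`."""
    return len(acks & config) > len(config) // 2

def joint_decision(acks, old_config, new_config):
    """During joint consensus a decision needs a majority of BOTH configurations."""
    return has_majority(acks, old_config) and has_majority(acks, new_config)
```

For a move from `{"a", "b", "c"}` to `{"a", "b", "c", "d", "e"}`, acknowledgements from `{"c", "d", "e"}` form a majority of the new configuration but not of the old one, so `joint_decision` rejects them. This is the kind of disjoint-majority situation that a direct switch would allow.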
When running an application in the cloud, users have to trust (i) the cloud provider's software, (ii) the cloud provider's staff, and (iii) law enforcement with the ability to access user data. Intel SGX partially solves this problem by allowing users to run small portions of a program on remote servers with guarantees of confidentiality and integrity. Haven leverages SGX and Drawbridge to run entire legacy programs with shielded execution.
Haven assumes a very strong adversary which has access to all the system's software and most of the system's hardware. Only the processor and SGX hardware are trusted. Haven provides confidentiality and integrity, but not availability. It also does not prevent side-channel attacks.
There are two main challenges that Haven's design addresses. First, most programs are written assuming a benevolent host. This leads to Iago attacks in which the OS subverts the application by exploiting its assumptions about the OS. Haven must operate correctly despite a malicious host. To do so, Haven uses a library operating system, LibOS, that is part of a Windows sandboxing framework called Drawbridge. LibOS implements a full OS API using only a few core host OS primitives. These core host OS primitives are used in a defensive way. A shield module sits below LibOS and takes great care to ensure that LibOS is not susceptible to Iago attacks. The user's application, LibOS, and the shield module are all run in an SGX enclave.
Second, Haven aims to run unmodified binaries which were not written with knowledge of SGX. Real world applications allocate memory, load and run code dynamically, etc. Many of these things are not supported by SGX, so Haven (a) emulated them and (b) got the SGX specification revised to address them.
Haven also implements an in-enclave encrypted file system in which only the root and leaf pages need to be written to stable storage. As of publication, however, Haven did not fully implement this feature. Haven is susceptible to replay attacks. Haven was evaluated by running Microsoft SQL Server and Apache HTTP Server.
Storm is Twitter's stream processing system designed to be scalable, resilient, extensible, efficient, and easy to administer. In Storm, streams of tuples flow through (potentially cyclic) directed graphs, called topologies, of processing elements. Each processing element is either a spout (a source of tuples) or a bolt (a tuple processor).
Storm Overview. Storm runs on a cluster, typically over something like Mesos. Each Storm cluster is managed by a single master node known as the Nimbus. The Nimbus oversees a cluster of workers. Each worker runs multiple worker processes which run a JVM which run multiple executors which run multiple tasks:
```
worker
+--------------------------------------------------------------+
| worker process                 worker process                |
| +---------------------------+  +---------------------------+ |
| | JVM                       |  | JVM                       | |
| | +-----------------------+ |  | +-----------------------+ | |
| | | executor   executor   | |  | | executor   executor   | | |
| | | +--------+ +--------+ | |  | | +--------+ +--------+ | | |
| | | | +----+ | | +----+ | | |  | | | +----+ | | +----+ | | | |
| | | | |task| | | |task| | | |  | | | |task| | | |task| | | | |
| | | | +----+ | | +----+ | | |  | | | +----+ | | +----+ | | | |
| | | | +----+ | | +----+ | | |  | | | +----+ | | +----+ | | | |
| | | | |task| | | |task| | | |  | | | |task| | | |task| | | | |
| | | | +----+ | | +----+ | | |  | | | +----+ | | +----+ | | | |
| | | +--------+ +--------+ | |  | | +--------+ +--------+ | | |
| | +-----------------------+ |  | +-----------------------+ | |
| +---------------------------+  +---------------------------+ |
| supervisor                                                   |
| +----------------------------------------------------------+ |
| |                                                          | |
| +----------------------------------------------------------+ |
+--------------------------------------------------------------+
```
Users specify a topology which acts as a logical topology. Storm exploits data parallelism by expanding the logical topology into a physical topology in which each logical bolt is converted into a replicated set of physical bolts. Data is partitioned between producer and consumer bolts using one of the following partitioning schemes:
- shuffle: Data is randomly shuffled.
- fields: Data is hash partitioned on a subset of fields.
- all: All data is sent to all downstream bolts.
- global: All data is sent to a single bolt.
- local: Data is sent to a task running on the same executor.
Each worker runs a supervisor which communicates with the Nimbus. The Nimbus stores its state in Zookeeper.
Storm Internals.
Nimbus and Zookeeper. In Storm, topologies are represented as Thrift objects, and the Nimbus is a Thrift server which stores topologies in Zookeeper. This allows topologies to be constructed in any programming language or framework. For example, Summingbird is a Scala library which can compile dataflows to one of many data processing systems like Storm or Hadoop. Users also send over a JAR of the code to the Nimbus which stores it locally on disk. Supervisors advertise openings which the Nimbus fills. All communication between workers and the Nimbus is done through Zookeeper to increase the resilience of the system.
Supervisor. Each worker runs a supervisor process which is responsible for communicating with the Nimbus, spawning workers, monitoring workers, restarting workers, etc. The supervisor consists of three threads: (1) a main thread, (2) an event manager thread, and (3) a process event manager thread. The main thread sends heartbeats to the Nimbus every 15 seconds. The event manager thread looks for assignment changes every 10 seconds. The process event manager thread monitors workers every 3 seconds.
Workers and Executors. Each executor is a thread running in a JVM. Each worker process has a thread to receive tuples and a thread to send tuples. The receiving thread multiplexes tuples to different tasks' input queues. Each executor runs (1) a user logic thread which reads tuples from its input queue and processes them and (2) an executor send thread which puts outbound tuples in a global outbound queue.
Processing Semantics. Storm provides at most once and at least once semantics. Each tuple in the system is assigned a unique 64 bit identifier. When a bolt processes a tuple, it can generate new tuples. Each of these tuples is also given a unique identifier. The lineage of each tuple is tracked in a lineage tree. When a tuple leaves the system, all bolts that contributed to it are acknowledged and can retire their buffered output. Storm implements this using a memory-efficient representation that uses bitwise XORs.
Commentary. The paper doesn't mention stateful operators.
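The XOR trick mentioned under Processing Semantics relies on the fact that XORing a value into an accumulator twice cancels it out. The sketch below is a simplified model, not Storm's actual acker code: the `Acker` class and its method names are hypothetical, and one accumulator tracks one lineage tree.

```python
import random

def new_tuple_id():
    """Random 64-bit identifier, as each tuple in the system receives."""
    return random.getrandbits(64)

class Acker:
    """Tracks a whole lineage tree with a single 64-bit accumulator."""

    def __init__(self):
        self.ack_val = 0

    def emitted(self, tuple_id):
        # XOR the id in when the tuple is created.
        self.ack_val ^= tuple_id

    def acked(self, tuple_id):
        # XOR the same id again when the tuple is fully processed; the two cancel.
        self.ack_val ^= tuple_id

    def tree_complete(self):
        # Zero means every emitted id has also been acked. With random 64-bit
        # ids, reaching zero early by accident is extremely unlikely.
        return self.ack_val == 0
```

The accumulator stays at a fixed 8 bytes regardless of how many tuples the tree contains, which is what makes the representation memory-efficient. Ids must be XORed in before `tree_complete` is consulted, since a fresh accumulator is also zero.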
Strong consistency is easy to reason about, but typically requires coordination which increases latency. Weaker consistency can improve performance but is difficult to reason about. This paper presents a program analysis and distributed protocol to run transactions with coordination-free strong consistency.
Analysing Transactions. We model a database as a finite map from objects to integers. Transactions are ordered sequences of reads, writes, computations, and prints; this is formalized below. A transaction T executes on a database D to produce a new database state D' and a log G' of printed values. Formally, eval(D, T) = <D', G'>.
Symbolic tables categorize the behavior of a transaction based on the initial database state. Formally, a symbolic table for transaction T is a binary relation Q of pairs <P, T'> where P is a formula in first order logic describing the contents of a database, and T' is a transaction such that T and T' are observationally equivalent when run on databases satisfying P. A symbolic table can also be built for a set of transactions.
Formally, transactions are expressed in a language L which is essentially IMP with database reads, database writes, and prints. A somewhat simple recursive algorithm walks backwards through the program computing symbolic tables. Essentially, the algorithm traces all paths through the program's control flow.
There is also a higher-level language L++ which can be compiled to L.
Homeostasis Protocol. Assume data is partitioned (not replicated) across a cluster of K nodes. We model a distributed database as a pair <D, Loc> where D is a database and Loc is a function from objects to an index between 1 and K. Each transaction T runs on a site i; formally, l(T) = i. For simplicity, we assume that each transaction only writes to objects local to the site it is running on.
Each transaction runs on some site. It reads fresh versions of values on the site and stale versions of values on other sites. Nodes establish treaties with one another such that operating with stale data does not affect the correctness of the transaction. This is best explained by way of example. Imagine the following transaction is running on a site where x is remote.
```
x' = r(x)
if x' > 0:
    write(y = 1)
else:
    write(y = 2)
```
If we establish the treaty x > 0, then the transaction writes y = 1 no matter what the actual value of x is, so it can safely run against a stale copy of x. We now formalize this notion.
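The example transaction can be transcribed into Python to check this claim by brute force. The function names are hypothetical, and the transaction is modelled as a pure function returning the value it would write to y.

```python
def transaction(x_seen):
    """The example transaction: returns the value written to y."""
    return 1 if x_seen > 0 else 2

def treaty(x):
    """The treaty from the example: the remote value x stays positive."""
    return x > 0

def safe_with_stale_read(stale_x, true_x):
    """True if running against the stale value gives the same write as the true one."""
    return transaction(stale_x) == transaction(true_x)
```

Whenever both the stale and the true value satisfy `treaty`, `safe_with_stale_read` holds. Once the true value leaves the treaty (say it drops to 0 while the local snapshot still says 3), the results diverge, which is the situation that forces synchronization.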
Given a database D, a local-remote partition is a function p from objects to booleans. We can represent a database D with respect to a local-remote partition p as a pair (l, r) where l is a vector of values x such that p(x), and r is a vector of values x such that not p(x). In words, we can model a database as disjoint sets of local and remote values.
We say <(l, r), G> = <(l', r'), G'> if l = l' and r = r'. Given a database D, a local-remote partition p, a transaction T, and sets of vectors L and R, we say (L, R) is a local-remote slice (LR-slice) for T if eval((l, r), T) = eval((l, r'), T) for all l in L and r, r' in R. In words, (L, R) is a local-remote slice for T if T's output depends only on local values.
A global treaty Gamma is a subset of possible database states. A global treaty is valid for a set of transactions {T1, ..., Tn} if ({l | (l, r) in Gamma}, {r | (l, r) in Gamma}) is an LR-slice for every Ti.
The homeostasis protocol proceeds in rounds where each round has three phases:
1. Treaty generation. The system generates a treaty for the current database state.
2. Normal execution. Transactions can execute without coordination, reading a snapshot of remote values. After a site executes a transaction, it checks whether the transaction brings the database to a state outside the treaty. If it does not, the transaction is committed. If it does, we enter the next phase.
3. Cleanup. All sites synchronize and communicate all values that have changed since the last round. All sites then run the transaction that caused the violation. Finally, we enter the next round.
Generating Treaties. Two big questions remain: how do we generate treaties, and how do we enforce treaties?
Given an initial database state D, we could always pick Gamma = {D}. This requires that we synchronize after every single database modification. We want to pick treaties that let us run as long as possible before synchronizing. We could pick the predicate P in the symbolic table that D satisfies, but this isn't guaranteed to be a valid treaty. Instead, we take the predicate P and divide it into a set of local treaties P1, ..., PK such that the conjunction of all local treaties implies P. Moreover, each local treaty must be satisfied by the current database. The conjunction of the local treaties is our global treaty and is guaranteed to be valid.
Finding good local treaties is not easy. In fact, it can be undecidable pretty easily. We limit ourselves to linear arithmetic and leverage SMT solvers to do the heavy lifting for us. First, we decompose the global treaty into a conjunction of linear constraints. We then generate templates from the constraints and instantiate them using Z3.
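To make the idea of local treaties concrete, here is a deliberately simple stand-in for the template and Z3 step, not the paper's algorithm. It assumes the global treaty is a single linear constraint of the form v1 + ... + vK >= c with one integer value per site, and it splits the available slack evenly. The function name and the even split are assumptions.

```python
def split_treaty(values, c):
    """Split the global treaty sum(values) >= c into per-site lower bounds.

    `values` holds the current integer value at each site. The result is a
    list of bounds b_i; the local treaty at site i is v_i >= b_i.
    """
    slack = sum(values) - c
    if slack < 0:
        raise ValueError("current state does not satisfy the global treaty")
    share, extra = divmod(slack, len(values))
    bounds = []
    for i, v in enumerate(values):
        # Each site may drop by its share of the slack without coordinating.
        site_slack = share + (1 if i < extra else 0)
        bounds.append(v - site_slack)
    return bounds
```

The bounds sum to exactly c, so the conjunction of the local treaties implies the global one, and each bound is at most the current value, so the current database satisfies every local treaty. A solver-based approach can pick a split that is tuned to the workload instead of an even one.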
Homeostasis in Practice. Roy et al. present a homeostasis prototype. An offline preprocessing component takes in L++ transactions and computes joint symbolic tables, using tricks to keep the tables small. It then initializes global and local treaties. The online execution component executes the homeostasis protocol described above. It is implemented in Java over MySQL. The analysis uses ANTLR-4 and Z3.