Building Distributed Systems With Stateright
by Jonathan Nadal
Deep understanding of causality sometimes requires the understanding of very large patterns and their abstract relationships and interactions, not just the understanding of microscopic objects interacting in microscopic time intervals.
― Douglas R. Hofstadter, I Am a Strange Loop
Distributed computing is a term that refers to multiple computers working together as a system to solve a problem, typically because that problem would not be solvable on a single computer. For example, we all want to know that our important files will be accessible even when computer hardware inevitably fails. As a second example, a researcher at a pharmaceutical company may have a complex problem that would take decades for a single computer to solve, but which a collection of computers working together could solve in days.
Unique algorithms must be employed to coordinate workloads across these geographically distributed systems of computers because they are susceptible to categories of nondeterminism that do not arise when a problem is solved with a single computer. For example, the networks that link these computers will drop, reorder, and even redeliver messages. Algorithms that fail to account for this behavior may run correctly for extended periods but will eventually fail at unpredictable times in unpredictable ways, such as by corrupting data.
Stateright is a software framework for analyzing and systematically verifying distributed systems. Its name refers to the goal of verifying that a system's collective state always satisfies a correctness specification, such as "any data written to the system should be accessible as long as at least one data center is reachable."
Cloud service providers like AWS and Azure leverage verification software such as the TLA+ model checker to achieve the same goal, but whereas those solutions typically verify a high level system design, Stateright is able to verify the underlying system implementation in addition to the design (along with providing other unique benefits explained in the "Comparison with TLA+" chapter). On the other end of the spectrum are tools such as Jepsen which can validate a final implementation by testing a random subset of the system's behavior, whereas Stateright can systematically enumerate all possible behaviors within a specified model of the system.
We'll jump right in with a motivating example in the first chapter, Getting Started. Alternatively:
- See the Stateright YouTube channel if you prefer a quick video introduction.
- For a broader overview of Stateright, see the related Distributed Systems Reading Group special session. The slides are available on this site (the Esc key zooms out, and note that there are Up/Down slides). Murat Demirbas also wrote an associated blog post.
Chapter 1: Getting Started
IMPORTANT: Stateright is a relatively new framework and will be making breaking API changes leading up to a 1.0 release, at which point the API will be considered stable. If you plan to use Stateright for production scenarios, then please file a GitHub issue so that the author can coordinate with you to minimize any disruption.
Stateright's Value Proposition
Stateright is a Rust library that simplifies implementing distributed systems while more importantly providing a powerful mechanism for verification.
Verification of distributed systems is difficult because distributed algorithms must be resilient to nondeterminism caused by concurrency such as threads racing, but that's only part of the challenge. Distributed algorithms also need to account for the fact that computers typically interface via unreliable networks that will periodically do things like reorder or lose messages (if you run your system in "the cloud," then it's running on an unreliable network). Furthermore, often a distributed algorithm is expected to continue working even if a subset of computers crash at arbitrary points in their execution. Designing algorithms that continue working in the presence of this added nondeterminism is nontrivial and error prone, which necessitates special tools for verifying correctness.
One approach to verifying the correctness of distributed algorithms is to run them in an environment that randomly introduces nondeterminism more frequently than a normal environment would. This is the approach taken by Jepsen, and it has proven to be incredibly effective, finding bugs in distributed systems such as etcd, PostgreSQL, Redis, and Zookeeper among many others.
Stateright's approach is similar, but rather than testing a random subset of possible behaviors, it tests all possible observable behaviors within a particular specification. The catch is that Stateright needs to be embedded into the system's implementation, whereas solutions such as Jepsen do not, making them amenable to testing a wider range of software; but if you are writing a distributed system in Rust, then Stateright can provide additional verification over random testing.
An Example
Let's start with a very simple distributed system: a single client that can interact with a single server by reading or writing a value. We'll see that even this minimal example is susceptible to surprising behavior.
Install the Rust programming language if it is not already installed, then initialize a new project using the cargo utility included with Rust. If you are new to Rust, then you should also review some of the language's learning resources.
mkdir getting-started
cd getting-started
cargo init
Define dependencies in Cargo.toml.
[package]
name = "getting-started"
version = "0.1.0"
edition = "2018"
[dependencies]
env_logger = "0.7"
serde_json = "1.0"
stateright = "0.30"
Here is the complete implementation for main.rs. Copy-paste it into your own file. The subsequent sections will explain further.
use stateright::actor::{*, register::*};
use std::borrow::Cow; // COW == clone-on-write
use std::net::{SocketAddrV4, Ipv4Addr};

type RequestId = u64;

#[derive(Clone)]
struct ServerActor;

impl Actor for ServerActor {
    type Msg = RegisterMsg<RequestId, char, ()>;
    type State = char;
    type Timer = ();

    fn on_start(&self, _id: Id, _o: &mut Out<Self>) -> Self::State {
        '?' // default value for the register
    }

    fn on_msg(&self, _id: Id, state: &mut Cow<Self::State>,
              src: Id, msg: Self::Msg, o: &mut Out<Self>) {
        match msg {
            RegisterMsg::Put(req_id, value) => {
                *state.to_mut() = value;
                o.send(src, RegisterMsg::PutOk(req_id));
            }
            RegisterMsg::Get(req_id) => {
                o.send(src, RegisterMsg::GetOk(req_id, **state));
            }
            _ => {}
        }
    }
}

#[cfg(test)]
mod test {
    use super::*;
    use stateright::{*, semantics::*, semantics::register::*};
    use ActorModelAction::Deliver;
    use RegisterMsg::{Get, GetOk, Put, PutOk};

    #[test]
    fn is_unfortunately_not_linearizable() {
        let checker = ActorModel::new(
                (),
                LinearizabilityTester::new(Register('?'))
            )
            .actor(RegisterActor::Server(ServerActor))
            .actor(RegisterActor::Client { put_count: 2, server_count: 1 })
            .property(Expectation::Always, "linearizable", |_, state| {
                state.history.serialized_history().is_some()
            })
            .property(Expectation::Sometimes, "get succeeds", |_, state| {
                state.network.iter_deliverable()
                    .any(|e| matches!(e.msg, RegisterMsg::GetOk(_, _)))
            })
            .property(Expectation::Sometimes, "put succeeds", |_, state| {
                state.network.iter_deliverable()
                    .any(|e| matches!(e.msg, RegisterMsg::PutOk(_)))
            })
            .record_msg_in(RegisterMsg::record_returns)
            .record_msg_out(RegisterMsg::record_invocations)
            .checker().spawn_dfs().join();
        //checker.assert_properties(); // TRY IT: Uncomment this line, and the test will fail.
        checker.assert_discovery("linearizable", vec![
            Deliver { src: Id::from(1), dst: Id::from(0), msg: Put(1, 'A') },
            Deliver { src: Id::from(0), dst: Id::from(1), msg: PutOk(1) },
            Deliver { src: Id::from(1), dst: Id::from(0), msg: Put(2, 'Z') },
            Deliver { src: Id::from(0), dst: Id::from(1), msg: PutOk(2) },
            Deliver { src: Id::from(1), dst: Id::from(0), msg: Put(1, 'A') },
            Deliver { src: Id::from(1), dst: Id::from(0), msg: Get(3) },
            Deliver { src: Id::from(0), dst: Id::from(1), msg: GetOk(3, 'A') },
        ]);
    }
}

fn main() {
    env_logger::init_from_env(env_logger::Env::default().default_filter_or("info"));
    spawn(
        serde_json::to_vec,
        |bytes| serde_json::from_slice(bytes),
        vec![
            (SocketAddrV4::new(Ipv4Addr::LOCALHOST, 3000), ServerActor)
        ]).unwrap();
}
Actor Framework Intro
The code implements a simple server using the actor model in which an "actor" is an object that can respond to events (such as timeouts or message receipt) and in turn updates its internal state and generates outputs (such as sending a message or setting a timer).
If you are familiar with the actor model (e.g. via the Erlang language or the Akka library), then it is useful to note distinguishing characteristics of Stateright's approach:
- Stateright must have visibility of every input and output to facilitate simulating all possible system behaviors. That means inputs and outputs must be in the form of messages. For example, if your actor needs to interface with a database, you might introduce DbExec(...) output and DbResult(...) input messages; or if it needs to interface with the file system, you might introduce FileRead(...) output and FileResult(...) input messages. An adapter layer would then translate these into/from the corresponding effects rather than treating them as standard messages between actors in the system. This technique will be demonstrated in a later chapter.
- Outputs do not take effect until after the handler returns. The outputs are simply collected in the o: &mut Out<Self> parameter whenever methods such as Out::send(...) are called, and Stateright's actor runtime sends them later.
- The actor state is only accessible via a clone-on-write cell with the state: &mut Cow<Self::State> parameter. Doing so enables Stateright to more efficiently validate a system when it is enumerating different branches of nondeterministic behavior.
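The last two points can be illustrated without Stateright itself. The following is a minimal sketch using only the Rust standard library; the Msg enum, the on_msg function, and the Vec standing in for Out are hypothetical stand-ins chosen for illustration and are not Stateright's API.

```rust
use std::borrow::Cow;

// Hypothetical message type for this sketch (not Stateright's RegisterMsg).
#[derive(Debug, PartialEq)]
enum Msg {
    Put(char),
    Get,
    PutOk,
    GetOk(char),
}

// Hypothetical handler: outputs are only collected in `out`, and the state
// is cloned lazily, on the first write.
fn on_msg(state: &mut Cow<'_, char>, msg: Msg, out: &mut Vec<Msg>) {
    match msg {
        Msg::Put(value) => {
            *state.to_mut() = value; // the first write clones the borrowed state
            out.push(Msg::PutOk);
        }
        Msg::Get => out.push(Msg::GetOk(**state)), // read-only: no clone
        _ => {}
    }
}

fn main() {
    let previous = '?';
    let mut state = Cow::Borrowed(&previous);
    let mut out = Vec::new();

    on_msg(&mut state, Msg::Get, &mut out);
    assert!(matches!(state, Cow::Borrowed(_))); // still shares `previous`

    on_msg(&mut state, Msg::Put('A'), &mut out);
    assert!(matches!(state, Cow::Owned('A'))); // now a private copy
    assert_eq!(previous, '?'); // the earlier state is untouched

    // Nothing has been "sent" yet; a runtime would drain `out` afterwards.
    assert_eq!(out, vec![Msg::GetOk('?'), Msg::PutOk]);
}
```

Because a read leaves the state borrowed, a checker exploring many branches from the same starting state would only pay for a copy on the branches that actually modify it.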
Model Checking in More Detail
The reference to "enumerating different branches of nondeterministic behavior" deserves additional explanation. The nondeterminism within a nondeterministic system is never completely open ended. Instead there are "decision points" that arise, where a "decision" isn't directly made by your code but rather is the weighted random outcome of many factors. For example, if two clients concurrently initiate requests to a service, then the initial "decision" would be:
1. the first client's request is delivered causing the service to respond,
2. the second client's request is delivered causing the service to respond,
3. or both clients time out while contacting the service.
If option 1 occurs, then it can be followed by option 2, and vice versa. In both cases, timeouts are also possible. In this manner the possible behaviors of a nondeterministic system can be seen as a decision tree.
At each of these decision points, Stateright will explore one of the outcomes and then backtrack to explore the others. This is the distinguishing characteristic of model checking in comparison with random testing. Also, each gray box in the decision tree indicates a potential state of the aggregate system (where the aggregate system state includes the client state, the service state, and the network state), and the collection of all potential aggregate states is known as the "state space."
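To make the idea of exploring and backtracking concrete, here is a minimal sketch of a depth-first state-space enumeration written with only the standard library. It is not how Stateright is implemented; SystemState, next_states, and explore are hypothetical names, and the model is deliberately tiny: two in-flight writes, either of which the network may deliver next (timeouts are left out to keep the sketch short).

```rust
use std::collections::BTreeSet;

// Aggregate system state: the register value plus the writes still in flight.
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord)]
struct SystemState {
    value: char,
    in_flight: BTreeSet<char>,
}

// One decision point: the network may deliver any single in-flight write.
fn next_states(state: &SystemState) -> Vec<SystemState> {
    state
        .in_flight
        .iter()
        .map(|&write| {
            let mut next = state.clone();
            next.in_flight.remove(&write);
            next.value = write;
            next
        })
        .collect()
}

// Depth-first search: follow one outcome, then backtrack to the alternatives.
// Returns every reachable aggregate state (the "state space" of this model).
fn explore(init: SystemState) -> BTreeSet<SystemState> {
    let mut visited = BTreeSet::new();
    let mut stack = vec![init];
    while let Some(state) = stack.pop() {
        if !visited.insert(state.clone()) {
            continue; // already explored via another path
        }
        stack.extend(next_states(&state));
    }
    visited
}

fn main() {
    let init = SystemState {
        value: '?',
        in_flight: ['A', 'B'].iter().copied().collect(),
    };
    let states = explore(init);
    // The initial state, two states with one write delivered, and two final states.
    assert_eq!(states.len(), 5);
    let finals: BTreeSet<char> = states
        .iter()
        .filter(|s| s.in_flight.is_empty())
        .map(|s| s.value)
        .collect();
    // Depending on delivery order, either write can end up as the final value.
    assert_eq!(finals, ['A', 'B'].iter().copied().collect::<BTreeSet<char>>());
}
```

A random tester would sample some paths through this tree, whereas the loop above visits every reachable state exactly once.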
Implementation Walkthrough
The server responds to Put and Get messages based only on its own local state, providing its clients with a simple form of distributed storage, sometimes known as a shared register. Responses are linked to requests via a request ID chosen by the client.
type RequestId = u64;

#[derive(Clone)]
struct ServerActor;

impl Actor for ServerActor {
    type Msg = RegisterMsg<RequestId, char, ()>;
    type State = char;
    type Timer = ();

    fn on_start(&self, _id: Id, _o: &mut Out<Self>) -> Self::State {
        '?' // default value for the register
    }

    fn on_msg(&self, _id: Id, state: &mut Cow<Self::State>,
              src: Id, msg: Self::Msg, o: &mut Out<Self>) {
        match msg {
            RegisterMsg::Put(req_id, value) => {
                *state.to_mut() = value;
                o.send(src, RegisterMsg::PutOk(req_id));
            }
            RegisterMsg::Get(req_id) => {
                o.send(src, RegisterMsg::GetOk(req_id, **state));
            }
            _ => {}
        }
    }
}
A test follows. The test checks the system for a property called linearizability, which loosely speaking means that the visible behavior of the register emulated by the actor system is identical to that of a register within a single-threaded system. In the words of the individuals who coined the term:
Linearizability provides the illusion that each operation applied by concurrent processes takes effect instantaneously at some point between its invocation and its response, implying that the meaning of a concurrent object’s operations can be given by pre- and post-conditions.
- Maurice Herlihy and Jeannette Wing, in Linearizability: A Correctness Condition for Concurrent Objects
An important aspect of linearizability is the notion of a "sequential specification," which serves as a reference for correct behavior of the system. In other words, the system emulates the sequential specification. For instance, the sequential specification could indicate:
- the system behaves like a memory cell (i.e. register semantics),
- the system behaves like a queue,
- or the system behaves like a stack.
That means that when someone indicates that a system is linearizable, it is important to keep in mind the question "linearizable with respect to what?" In this chapter, the sequential specification is register semantics, provided by Stateright, but later chapters will involve other sequential specifications.
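The notion of a sequential specification can be sketched in a few lines of standard-library Rust. The Op, Ret, and Register types and the is_valid_history function below are hypothetical and written only for illustration; they are not Stateright's own types. The sketch checks whether a sequence of operations and their observed return values is one that a plain single-threaded register could have produced.

```rust
// Operations a client can invoke on the register.
#[derive(Clone, Copy, Debug, PartialEq)]
enum Op {
    Write(char),
    Read,
}

// Return values observed by the client.
#[derive(Clone, Copy, Debug, PartialEq)]
enum Ret {
    WriteOk,
    ReadOk(char),
}

// The reference object: an ordinary single-threaded memory cell.
struct Register(char);

impl Register {
    fn invoke(&mut self, op: Op) -> Ret {
        match op {
            Op::Write(value) => {
                self.0 = value;
                Ret::WriteOk
            }
            Op::Read => Ret::ReadOk(self.0),
        }
    }
}

// A sequential history is valid if replaying it against the reference
// object yields exactly the return values that were observed.
fn is_valid_history(init: char, history: &[(Op, Ret)]) -> bool {
    let mut register = Register(init);
    history.iter().all(|&(op, ret)| register.invoke(op) == ret)
}

fn main() {
    // Reading the most recent write matches register semantics.
    let ok = [
        (Op::Write('A'), Ret::WriteOk),
        (Op::Write('Z'), Ret::WriteOk),
        (Op::Read, Ret::ReadOk('Z')),
    ];
    assert!(is_valid_history('?', &ok));

    // Reading an overwritten value does not.
    let stale = [
        (Op::Write('A'), Ret::WriteOk),
        (Op::Write('Z'), Ret::WriteOk),
        (Op::Read, Ret::ReadOk('A')),
    ];
    assert!(!is_valid_history('?', &stale));
}
```

A linearizability check asks a harder question than this: whether the concurrent operations observed by clients can be arranged into some sequential history, consistent with their real-time order, that passes a check like the one above.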
The test leverages RegisterActor, which is built into Stateright and defines a kind of test fixture in which clients (only 1 in this case) write distinct values and independently read values without coordinating with one another.
#[test]
fn is_unfortunately_not_linearizable() {
    let checker = ActorModel::new(
            (),
            LinearizabilityTester::new(Register('?'))
        )
        .actor(RegisterActor::Server(ServerActor))
        .actor(RegisterActor::Client { put_count: 2, server_count: 1 })
        .property(Expectation::Always, "linearizable", |_, state| {
            state.history.serialized_history().is_some()
        })
        .property(Expectation::Sometimes, "get succeeds", |_, state| {
            state.network.iter_deliverable()
                .any(|e| matches!(e.msg, RegisterMsg::GetOk(_, _)))
        })
        .property(Expectation::Sometimes, "put succeeds", |_, state| {
            state.network.iter_deliverable()
                .any(|e| matches!(e.msg, RegisterMsg::PutOk(_)))
        })
        .record_msg_in(RegisterMsg::record_returns)
        .record_msg_out(RegisterMsg::record_invocations)
        .checker().spawn_dfs().join();
    //checker.assert_properties(); // TRY IT: Uncomment this line, and the test will fail.
    checker.assert_discovery("linearizable", vec![
        Deliver { src: Id::from(1), dst: Id::from(0), msg: Put(1, 'A') },
        Deliver { src: Id::from(0), dst: Id::from(1), msg: PutOk(1) },
        Deliver { src: Id::from(1), dst: Id::from(0), msg: Put(2, 'Z') },
        Deliver { src: Id::from(0), dst: Id::from(1), msg: PutOk(2) },
        Deliver { src: Id::from(1), dst: Id::from(0), msg: Put(1, 'A') },
        Deliver { src: Id::from(1), dst: Id::from(0), msg: Get(3) },
        Deliver { src: Id::from(0), dst: Id::from(1), msg: GetOk(3, 'A') },
    ]);
}
Stateright is able to find a bug that arises even if there is only a single client. The test indicates a sequence of steps that trigger the bug (AKA a Path), but in practice you would normally just call checker.assert_properties(), and Stateright would fail the test while indicating steps that reproduce the bug (although the specific example that it finds can vary).
The actor with Id 0 is the server while the actor with Id 1 is the client. For brevity, the example shows actor inputs (Deliver) but not outputs.
- The server receives a Put from the client with value 'A', which it acknowledges. The client receives the PutOk acknowledgement and in turn sends a second Put with a new value, 'Z' (not shown yet since the test indicates message deliveries only, not message sends).
  Deliver { src: Id::from(1), dst: Id::from(0), msg: Put(1, 'A') },
  Deliver { src: Id::from(0), dst: Id::from(1), msg: PutOk(1) },
- The server receives the second Put, which it acknowledges. The client receives the PutOk acknowledgement and in turn sends a Get request (not shown yet), expecting to read 'Z'.
  Deliver { src: Id::from(1), dst: Id::from(0), msg: Put(2, 'Z') },
  Deliver { src: Id::from(0), dst: Id::from(1), msg: PutOk(2) },
- The network redelivers the first write, inadvertently overwriting the second:
  Deliver { src: Id::from(1), dst: Id::from(0), msg: Put(1, 'A') },
- The server receives the earlier Get request and replies with 'A'. The client receives the unexpected value, which violates linearizability because from the perspective of the client, the system is not behaving as a single-threaded register.
  Deliver { src: Id::from(1), dst: Id::from(0), msg: Get(3) },
  Deliver { src: Id::from(0), dst: Id::from(1), msg: GetOk(3, 'A') },
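The same sequence of deliveries can be replayed by hand against a stripped-down version of the server. The sketch below uses only the standard library; Msg, Reply, NaiveServer, and replay are hypothetical names for illustration and do not use Stateright's types.

```rust
// Requests delivered to the server: (request ID, value) or (request ID).
#[derive(Clone, Copy, Debug, PartialEq)]
enum Msg {
    Put(u64, char),
    Get(u64),
}

// Replies produced by the server.
#[derive(Debug, PartialEq)]
enum Reply {
    PutOk(u64),
    GetOk(u64, char),
}

// Mirrors the chapter's server: every delivered Put overwrites the value,
// with no memory of which requests were already handled.
struct NaiveServer {
    value: char,
}

impl NaiveServer {
    fn deliver(&mut self, msg: Msg) -> Reply {
        match msg {
            Msg::Put(req_id, value) => {
                self.value = value;
                Reply::PutOk(req_id)
            }
            Msg::Get(req_id) => Reply::GetOk(req_id, self.value),
        }
    }
}

// Delivers the messages in order to a fresh server and collects the replies.
fn replay(deliveries: &[Msg]) -> Vec<Reply> {
    let mut server = NaiveServer { value: '?' };
    deliveries.iter().map(|&msg| server.deliver(msg)).collect()
}

fn main() {
    let replies = replay(&[
        Msg::Put(1, 'A'),
        Msg::Put(2, 'Z'),
        Msg::Put(1, 'A'), // the network redelivers the first write
        Msg::Get(3),
    ]);
    // The client wrote 'Z' last but reads 'A'.
    assert_eq!(replies.last(), Some(&Reply::GetOk(3, 'A')));
}
```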
The last bit of code defines the main method, which allows you to run the actor on UDP port 3000, encoding messages with the JSON format.
fn main() {
    env_logger::init_from_env(env_logger::Env::default().default_filter_or("info"));
    spawn(
        serde_json::to_vec,
        |bytes| serde_json::from_slice(bytes),
        vec![
            (SocketAddrV4::new(Ipv4Addr::LOCALHOST, 3000), ServerActor)
        ]).unwrap();
}
Running
Confirm the system behaves as expected by running the test, which should pass because the test asserts that the bug exists. Include the --release flag so that Rust fully optimizes the code even during testing, as Stateright tests are computationally intensive and can be time consuming.
cargo test --release
Now run the actor on a UDP socket.
cargo run --release
If using a POSIX-oriented operating system, netcat can be used to interact with the actor from a different terminal window. Actor responses are omitted from the listing below for clarity, but you will see messages such as {"PutOk":0} printed to STDOUT. Numbers in the messages are request IDs, the importance of which will be more evident in the next chapter.
nc -u localhost 3000
{"Put":[0,"X"]}
{"Get":1}
{"Put":[2,"X"]}
{"Get":3}
Exercise
Uncomment the // TRY IT line, then run the test again. It should fail indicating a sequence of steps that would cause the linearizability expectation to be violated, and these steps may differ from the example that we followed. This exercise demonstrates how Stateright can detect flaws that would likely go undetected when simply reviewing code.
Summary
This chapter introduced one of the simplest possible distributed systems and showed how Stateright can find a subtle bug. The next chapter, Taming the Network, will address that bug.
Chapter 2: Taming the Network
In the last chapter we discovered a bug caused by the network's susceptibility to message redelivery. We address that in this chapter.
Initialize a new Rust project:
mkdir taming-the-network
cd taming-the-network
cargo init
Then add dependencies to Cargo.toml:
[package]
name = "taming-the-network"
version = "0.1.0"
edition = "2018"
[dependencies]
env_logger = "0.7"
serde_json = "1.0"
stateright = "0.30"
Here is the complete implementation for main.rs, explained below:
use stateright::actor::{*, register::*};
use std::borrow::Cow;
use std::collections::BTreeSet;
use std::net::{SocketAddrV4, Ipv4Addr};

#[derive(Clone)]
struct ServerActor;

type RequestId = u64;

#[derive(Clone, Debug, Hash, PartialEq)]
struct ActorState {
    value: char,
    delivered: BTreeSet<(Id, RequestId)>,
}

impl Actor for ServerActor {
    type Msg = RegisterMsg<RequestId, char, ()>;
    type State = ActorState;
    type Timer = ();

    fn on_start(&self, _id: Id, _o: &mut Out<Self>) -> Self::State {
        ActorState {
            value: '?',
            delivered: Default::default(),
        }
    }

    fn on_msg(&self, _id: Id, state: &mut Cow<Self::State>,
              src: Id, msg: Self::Msg, o: &mut Out<Self>) {
        match msg {
            RegisterMsg::Put(req_id, value) => {
                if state.delivered.contains(&(src, req_id)) { return }
                let mut state = state.to_mut();
                state.value = value;
                state.delivered.insert((src, req_id));
                o.send(src, RegisterMsg::PutOk(req_id));
            }
            RegisterMsg::Get(req_id) => {
                o.send(src, RegisterMsg::GetOk(req_id, state.value));
            }
            _ => {}
        }
    }
}

#[cfg(test)]
mod test {
    use super::*;
    use stateright::{*, semantics::*, semantics::register::*};
    use ActorModelAction::Deliver;
    use RegisterMsg::{Get, GetOk, Put, PutOk};

    #[test]
    fn satisfies_all_properties() {
        // Works with 1 client.
        base_model()
            .actor(RegisterActor::Server(ServerActor))
            .actor(RegisterActor::Client { put_count: 2, server_count: 1 })
            .checker().spawn_dfs().join()
            .assert_properties();

        // Or with multiple clients.
        // (TIP: test with `--release` mode for more clients)
        base_model()
            .actor(RegisterActor::Server(ServerActor))
            .actor(RegisterActor::Client { put_count: 1, server_count: 1 })
            .actor(RegisterActor::Client { put_count: 1, server_count: 1 })
            .checker().spawn_dfs().join()
            .assert_properties();
    }

    #[test]
    fn not_linearizable_with_two_servers() {
        let checker = base_model()
            .actor(RegisterActor::Server(ServerActor))
            .actor(RegisterActor::Server(ServerActor))
            .actor(RegisterActor::Client { put_count: 2, server_count: 2 })
            .checker().spawn_dfs().join();
        //checker.assert_properties(); // TRY IT: Uncomment this line, and the test will fail.
        checker.assert_discovery("linearizable", vec![
            Deliver { src: Id::from(2), dst: Id::from(0), msg: Put(2, 'A') },
            Deliver { src: Id::from(0), dst: Id::from(2), msg: PutOk(2) },
            Deliver { src: Id::from(2), dst: Id::from(1), msg: Put(4, 'Z') },
            Deliver { src: Id::from(1), dst: Id::from(2), msg: PutOk(4) },
            Deliver { src: Id::from(2), dst: Id::from(0), msg: Get(6) },
            Deliver { src: Id::from(0), dst: Id::from(2), msg: GetOk(6, 'A') },
        ]);
    }

    fn base_model()
        -> ActorModel<
            RegisterActor<ServerActor>,
            (),
            LinearizabilityTester<Id, Register<char>>>
    {
        ActorModel::new(
                (),
                LinearizabilityTester::new(Register('?'))
            )
            .property(Expectation::Always, "linearizable", |_, state| {
                state.history.serialized_history().is_some()
            })
            .property(Expectation::Sometimes, "get succeeds", |_, state| {
                state.network.iter_deliverable()
                    .any(|e| matches!(e.msg, RegisterMsg::GetOk(_, _)))
            })
            .property(Expectation::Sometimes, "put succeeds", |_, state| {
                state.network.iter_deliverable()
                    .any(|e| matches!(e.msg, RegisterMsg::PutOk(_)))
            })
            .record_msg_in(RegisterMsg::record_returns)
            .record_msg_out(RegisterMsg::record_invocations)
    }
}

// Running the program spawns a single actor on UDP port 3000. Messages are JSON-serialized.
fn main() {
    env_logger::init_from_env(env_logger::Env::default().default_filter_or("info"));
    spawn(
        serde_json::to_vec,
        |bytes| serde_json::from_slice(bytes),
        vec![
            (SocketAddrV4::new(Ipv4Addr::LOCALHOST, 3000), ServerActor)
        ]).unwrap();
}
Implementation Walkthrough
Addressing the linearizability issue is merely a matter of throwing away message redeliveries. The simplest approach is to record every delivered request ID.
type RequestId = u64;

#[derive(Clone, Debug, Hash, PartialEq)]
struct ActorState {
    value: char,
    delivered: BTreeSet<(Id, RequestId)>,
}

impl Actor for ServerActor {
    type Msg = RegisterMsg<RequestId, char, ()>;
    type State = ActorState;
    type Timer = ();

    fn on_start(&self, _id: Id, _o: &mut Out<Self>) -> Self::State {
        ActorState {
            value: '?',
            delivered: Default::default(),
        }
    }

    fn on_msg(&self, _id: Id, state: &mut Cow<Self::State>,
              src: Id, msg: Self::Msg, o: &mut Out<Self>) {
        match msg {
            RegisterMsg::Put(req_id, value) => {
                if state.delivered.contains(&(src, req_id)) { return }
                let mut state = state.to_mut();
                state.value = value;
                state.delivered.insert((src, req_id));
                o.send(src, RegisterMsg::PutOk(req_id));
            }
            RegisterMsg::Get(req_id) => {
                o.send(src, RegisterMsg::GetOk(req_id, state.value));
            }
            _ => {}
        }
    }
}
With that small change, each server provides an independent linearizable register. In the presence of messages concurrently in flight, the register abstraction is still linearizable (for example, reads cannot observe values overwritten before the read began, among other characteristics).
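To see the effect of the change in isolation, the sketch below replays a delivery sequence that includes a redelivered write against a stripped-down, standard-library-only version of the deduplicating server. Msg, Reply, DedupServer, and replay are hypothetical names for illustration (with a u32 standing in for Stateright's Id), not Stateright's API.

```rust
use std::collections::BTreeSet;

#[derive(Clone, Copy, Debug, PartialEq)]
enum Msg {
    Put(u64, char),
    Get(u64),
}

#[derive(Debug, PartialEq)]
enum Reply {
    PutOk(u64),
    GetOk(u64, char),
}

// Remembers every (sender, request ID) pair whose Put was already applied.
struct DedupServer {
    value: char,
    delivered: BTreeSet<(u32, u64)>,
}

impl DedupServer {
    // Returns None when a redelivered Put is discarded.
    fn deliver(&mut self, src: u32, msg: Msg) -> Option<Reply> {
        match msg {
            Msg::Put(req_id, value) => {
                // `insert` returns false if the pair was already present.
                if !self.delivered.insert((src, req_id)) {
                    return None;
                }
                self.value = value;
                Some(Reply::PutOk(req_id))
            }
            Msg::Get(req_id) => Some(Reply::GetOk(req_id, self.value)),
        }
    }
}

// Delivers the messages in order from a single client (sender 1).
fn replay(deliveries: &[Msg]) -> Vec<Option<Reply>> {
    let mut server = DedupServer { value: '?', delivered: BTreeSet::new() };
    deliveries.iter().map(|&msg| server.deliver(1, msg)).collect()
}

fn main() {
    let replies = replay(&[
        Msg::Put(1, 'A'),
        Msg::Put(2, 'Z'),
        Msg::Put(1, 'A'), // redelivery of the first write
        Msg::Get(3),
    ]);
    assert_eq!(replies[2], None); // the redelivery is ignored
    assert_eq!(replies[3], Some(Reply::GetOk(3, 'Z'))); // the latest write survives
}
```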
The servers feature no replication, so a collection of servers does not provide a unified service that emulates a linearizable register as the servers will not generally agree upon the last value they received.
#[test]
fn satisfies_all_properties() {
    // Works with 1 client.
    base_model()
        .actor(RegisterActor::Server(ServerActor))
        .actor(RegisterActor::Client { put_count: 2, server_count: 1 })
        .checker().spawn_dfs().join()
        .assert_properties();

    // Or with multiple clients.
    // (TIP: test with `--release` mode for more clients)
    base_model()
        .actor(RegisterActor::Server(ServerActor))
        .actor(RegisterActor::Client { put_count: 1, server_count: 1 })
        .actor(RegisterActor::Client { put_count: 1, server_count: 1 })
        .checker().spawn_dfs().join()
        .assert_properties();
}

#[test]
fn not_linearizable_with_two_servers() {
    let checker = base_model()
        .actor(RegisterActor::Server(ServerActor))
        .actor(RegisterActor::Server(ServerActor))
        .actor(RegisterActor::Client { put_count: 2, server_count: 2 })
        .checker().spawn_dfs().join();
    //checker.assert_properties(); // TRY IT: Uncomment this line, and the test will fail.
    checker.assert_discovery("linearizable", vec![
        Deliver { src: Id::from(2), dst: Id::from(0), msg: Put(2, 'A') },
        Deliver { src: Id::from(0), dst: Id::from(2), msg: PutOk(2) },
        Deliver { src: Id::from(2), dst: Id::from(1), msg: Put(4, 'Z') },
        Deliver { src: Id::from(1), dst: Id::from(2), msg: PutOk(4) },
        Deliver { src: Id::from(2), dst: Id::from(0), msg: Get(6) },
        Deliver { src: Id::from(0), dst: Id::from(2), msg: GetOk(6, 'A') },
    ]);
}
Those two tests are supported by a helper that sets up the model.
fn base_model()
    -> ActorModel<
        RegisterActor<ServerActor>,
        (),
        LinearizabilityTester<Id, Register<char>>>
{
    ActorModel::new(
            (),
            LinearizabilityTester::new(Register('?'))
        )
        .property(Expectation::Always, "linearizable", |_, state| {
            state.history.serialized_history().is_some()
        })
        .property(Expectation::Sometimes, "get succeeds", |_, state| {
            state.network.iter_deliverable()
                .any(|e| matches!(e.msg, RegisterMsg::GetOk(_, _)))
        })
        .property(Expectation::Sometimes, "put succeeds", |_, state| {
            state.network.iter_deliverable()
                .any(|e| matches!(e.msg, RegisterMsg::PutOk(_)))
        })
        .record_msg_in(RegisterMsg::record_returns)
        .record_msg_out(RegisterMsg::record_invocations)
}
Suggested Exercises
- Compaction: Storing every request ID isn't viable for a long-running process. The simplest approach is to require that request IDs are "monotonic," which means they are increasing (gaps are acceptable). In that case, the delivery handler throws away any message whose request ID is not larger than that of the last handled message. See if you can amend the example accordingly.
- Optimized Compaction: With the previous approach, reordered messages will be dropped because a late-delivered message will have a smaller request ID. Protocols need to account for the network dropping messages anyway, so generally speaking this tradeoff only impacts performance. Still, throughput can be improved by adding a "sliding window" buffer on the server side to minimize dropped messages. See if you can implement that.
- Lossless Link: One technique for minimizing message loss is to have the client also maintain a buffer of outgoing messages and periodically resend messages that have not been acknowledged by the recipient within a particular timeout period. TCP, for example, does this for packets. See if you can implement this as well. If you need help, see ordered_reliable_link.rs in the Stateright repository.
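As one possible starting point for the Compaction exercise, the sketch below shows the core bookkeeping outside of Stateright, using only the standard library. It assumes each client issues increasing request IDs; ClientId, CompactingServer, and put are hypothetical names for illustration, and adapting the idea to the ServerActor above is left as the exercise.

```rust
use std::collections::BTreeMap;

type ClientId = u32; // hypothetical stand-in for Stateright's Id
type RequestId = u64;

// Stores only the largest request ID handled per client, rather than every ID.
struct CompactingServer {
    value: char,
    last_handled: BTreeMap<ClientId, RequestId>,
}

impl CompactingServer {
    fn new() -> Self {
        CompactingServer { value: '?', last_handled: BTreeMap::new() }
    }

    // Returns true if the write was applied, false if it was discarded.
    fn put(&mut self, src: ClientId, req_id: RequestId, value: char) -> bool {
        if let Some(&last) = self.last_handled.get(&src) {
            if req_id <= last {
                return false; // redelivered or reordered: throw it away
            }
        }
        self.last_handled.insert(src, req_id);
        self.value = value;
        true
    }
}

fn main() {
    let mut server = CompactingServer::new();
    assert!(server.put(1, 1, 'A'));
    assert!(server.put(1, 3, 'Z')); // gaps in request IDs are acceptable
    assert!(!server.put(1, 1, 'A')); // redelivery of an old write is dropped
    assert!(!server.put(1, 2, 'B')); // a late, reordered write is dropped too
    assert!(server.put(2, 1, 'C')); // IDs are tracked per client
    assert_eq!(server.value, 'C');
    assert_eq!(server.last_handled.len(), 2); // one entry per client
}
```

The fourth call shows the tradeoff behind the Optimized Compaction exercise: a write that merely arrived late is discarded along with true duplicates.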
Summary
This chapter showed how to fix the implementation from the previous chapter, maintaining linearizability even if messages are redelivered. The next chapter, Seeking Consensus, will introduce the concept of replication, which is used to provide (1) data recoverability in the event of a server crash and/or (2) improved performance for high request rates by distributing requests (such as reads) across a wider range of hosts.
Chapter 3: Seeking Consensus
In the last chapter we fixed a bug caused by the network's susceptibility to message redelivery, but our solution could only run on a single server. Introducing a second server would break linearizability as the system failed to replicate information between servers.
In this chapter we introduce a simple replication protocol in an attempt to address that shortcoming.
Once again, we start by initializing a new Rust project:
mkdir seeking-consensus
cd seeking-consensus
cargo init
Next we add dependencies to Cargo.toml. Notice that we now need to include the serde package.
[package]
name = "seeking-consensus"
version = "0.1.0"
edition = "2018"
[dependencies]
env_logger = "0.7"
serde = "1.0"
serde_json = "1.0"
stateright = "0.30"
By now you have the hang of implementing basic actor systems in Stateright, so we'll defer the full main.rs source code listing until later in the chapter.
A Replication Protocol
First we get to decide on a replication protocol. Give this some thought, and see if you have ideas.
Exercise: Yes, really. Take some time to think about how you might add replication.
Done? Great!
For this example, we'll proceed with a protocol that simply involves forwarding the value to every peer server before replying to the client, thereby ensuring the servers agree on the value. This might be the protocol that you envisioned as well.
Implementation Walkthrough
The first notable difference is the need to introduce actor-specific configuration indicating each server's peers.
#[derive(Clone)]
struct ServerActor {
    peers: Vec<Id>,
}
The next notable difference is the need to introduce a message type for replication.
#[derive(Clone, Debug, Hash, Eq, PartialEq)]
#[derive(Deserialize, Serialize)]
enum InternalMsg {
    Replicate(RequestId, char),
    ReplicateOk(RequestId),
}
The server defers sending a PutOk message until replicas reply, but Stateright actors are nonblocking, so they must manage some additional state:
- the ID of the request, against which replica replies are matched (to guard against late responses),
- the ID of the client that made the request (to facilitate replying later),
- and the set of servers that have acknowledged the replicated value (to facilitate waiting until all have replied).
#[derive(Clone, Debug, Hash, PartialEq)]
struct ActorState {
    value: char,
    delivered: BTreeSet<(Id, RequestId)>,
    in_flight_put: Option<PutState>,
}

#[derive(Clone, Debug, Hash, PartialEq)]
struct PutState {
    req_id: RequestId,
    src: Id,
    peer_acks: BTreeSet<Id>,
}
We are now ready to implement the protocol.
fn on_msg(&self, _id: Id, state: &mut Cow<Self::State>,
          src: Id, msg: Self::Msg, o: &mut Out<Self>) {
    match msg {
        RegisterMsg::Put(req_id, value) if state.in_flight_put.is_none() => {
            if state.delivered.contains(&(src, req_id)) { return }

            let mut state = state.to_mut();
            state.value = value;
            state.delivered.insert((src, req_id));
            state.in_flight_put = Some(PutState {
                req_id,
                src,
                peer_acks: Default::default(),
            });
            for &peer_id in &self.peers {
                o.send(peer_id,
                       RegisterMsg::Internal(
                           InternalMsg::Replicate(req_id, value)));
            }
            // Will not reply w/ `PutOk` until all replicas ack.
        }
        RegisterMsg::Get(req_id) => {
            o.send(src, RegisterMsg::GetOk(req_id, state.value));
        }
        RegisterMsg::Internal(InternalMsg::Replicate(req_id, value)) => {
            if state.delivered.contains(&(src, req_id)) { return }

            let mut state = state.to_mut();
            state.value = value;
            state.delivered.insert((src, req_id));
            o.send(src,
                   RegisterMsg::Internal(InternalMsg::ReplicateOk(req_id)));
        }
        RegisterMsg::Internal(InternalMsg::ReplicateOk(req_id)) => {
            if state.delivered.contains(&(src, req_id)) { return }

            let mut state = state.to_mut();
            if let Some(put) = &mut state.in_flight_put {
                if req_id != put.req_id { return }

                put.peer_acks.insert(src);
                if put.peer_acks.len() == self.peers.len() {
                    o.send(put.src, RegisterMsg::PutOk(req_id));
                    state.in_flight_put = None;
                }
            }
        }
        _ => {}
    }
}
Now the big question: does this protocol solve the problem we ran into last chapter?
Unfortunately achieving linearizability involves a bit more sophistication, and Stateright identifies a sequence of steps that are not linearizable. The sequence is nontrivial and demonstrates why a model checker is so useful for implementing distributed systems.
#[test]
fn appears_linearizable_in_limited_scenarios() {
// Succeeds if there are 2 clients and 2 servers.
base_model()
.actor(RegisterActor::Server(ServerActor {
peers: Id::vec_from(vec![1]),
}))
.actor(RegisterActor::Server(ServerActor {
peers: Id::vec_from(vec![0]),
}))
.actor(RegisterActor::Client { put_count: 1, server_count: 2 })
.actor(RegisterActor::Client { put_count: 1, server_count: 2 })
.checker().spawn_dfs().join()
.assert_properties();
}
#[test]
fn not_generally_linearizable() {
// Can fail if there are 3 clients.
let checker = base_model()
.actor(RegisterActor::Server(ServerActor {
peers: Id::vec_from(vec![1]),
}))
.actor(RegisterActor::Server(ServerActor {
peers: Id::vec_from(vec![0]),
}))
.actor(RegisterActor::Client { put_count: 1, server_count: 2 })
.actor(RegisterActor::Client { put_count: 1, server_count: 2 })
.actor(RegisterActor::Client { put_count: 1, server_count: 2 })
.checker()
.spawn_dfs().join(); // TRY IT: Comment out this line, and uncomment
//.serve("0:3000"); // the next to load Stateright Explorer.
//checker.assert_properties(); // TRY IT: Uncomment this line, and the test will fail.
checker.assert_discovery("linearizable", vec![
Deliver { src: Id::from(4), dst: Id::from(0), msg: Put(4, 'C') },
Deliver { src: Id::from(0), dst: Id::from(1), msg: Internal(Replicate(4, 'C')) },
Deliver { src: Id::from(1), dst: Id::from(0), msg: Internal(ReplicateOk(4)) },
Deliver { src: Id::from(3), dst: Id::from(1), msg: Put(3, 'B') },
Deliver { src: Id::from(1), dst: Id::from(0), msg: Internal(Replicate(3, 'B')) },
Deliver { src: Id::from(0), dst: Id::from(1), msg: Internal(ReplicateOk(3)) },
Deliver { src: Id::from(1), dst: Id::from(3), msg: PutOk(3) },
Deliver { src: Id::from(2), dst: Id::from(0), msg: Put(2, 'A') },
Deliver { src: Id::from(3), dst: Id::from(0), msg: Get(6) },
Deliver { src: Id::from(0), dst: Id::from(3), msg: GetOk(6, 'A') },
Deliver { src: Id::from(0), dst: Id::from(4), msg: PutOk(4) },
Deliver { src: Id::from(4), dst: Id::from(1), msg: Get(8) },
Deliver { src: Id::from(1), dst: Id::from(4), msg: GetOk(8, 'B') },
]);
}
fn base_model()
-> ActorModel<
RegisterActor<ServerActor>,
(),
LinearizabilityTester<Id, Register<char>>>
{
ActorModel::new(
(),
LinearizabilityTester::new(Register('?'))
)
.property(Expectation::Always, "linearizable", |_, state| {
state.history.serialized_history().is_some()
})
.property(Expectation::Sometimes, "value chosen", |_, state| {
state.network.iter_deliverable().any(|e| {
if let RegisterMsg::GetOk(_, value) = e.msg {
return *value != '?';
}
return false
})
})
.record_msg_in(RegisterMsg::record_returns)
.record_msg_out(RegisterMsg::record_invocations)
}
Stateright Explorer
It's not immediately clear why the sequence of steps identified by Stateright violates linearizability. Luckily Stateright includes a web UI that can help you understand scenarios such as this one.
Stateright Explorer is started by calling serve(...). You can easily do this by following the directions in the first // TRY IT line, which will suspend the test when it is next run, allowing you to load http://localhost:3000 in your web browser to debug.
Tip: Model checking with Stateright Explorer is breadth-first, as that tends to find shorter discovery paths than depth-first search. One downside of this approach is that breadth-first search consumes more memory, so Explorer works best with relatively small state spaces (hundreds of thousands of states rather than millions of states, for example).
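To see why search order affects the length of a discovery path, here is a minimal sketch on a toy state graph. It is not Stateright's checker: the find_path function and the four-state graph are hypothetical and exist only for illustration.

```rust
use std::collections::{HashSet, VecDeque};

/// Returns the first path from `start` to `target` that the search encounters.
/// `graph[s]` lists the states reachable from state `s` in one step.
/// (Hypothetical helper for illustration; not part of Stateright.)
fn find_path(
    graph: &[Vec<usize>],
    start: usize,
    target: usize,
    breadth_first: bool,
) -> Option<Vec<usize>> {
    let mut visited = HashSet::new();
    let mut frontier = VecDeque::new();
    frontier.push_back(vec![start]);
    loop {
        // BFS takes the oldest pending path; DFS takes the newest.
        let popped = if breadth_first {
            frontier.pop_front()
        } else {
            frontier.pop_back()
        };
        let path = popped?; // Frontier exhausted: no path exists.
        let last = *path.last()?;
        if last == target {
            return Some(path);
        }
        if !visited.insert(last) {
            continue; // This state was already expanded.
        }
        for &next in &graph[last] {
            let mut extended = path.clone();
            extended.push(next);
            frontier.push_back(extended);
        }
    }
}

/// Toy graph: state 3 plays the role of the "discovery". It is one step
/// from state 0, but also reachable the long way via 0 -> 1 -> 2 -> 3.
fn toy_graph() -> Vec<Vec<usize>> {
    vec![vec![3, 1], vec![2], vec![3], vec![]]
}
```

On this toy graph, find_path(&toy_graph(), 0, 3, true) returns the two-state path [0, 3], while the depth-first variant returns [0, 1, 2, 3]. The cost is that the breadth-first frontier holds every pending path at the current depth, which is why memory use grows faster.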
When you load Stateright Explorer, you'll see the checker status in the upper left corner. Within a few seconds the checker should add a "linearizable" counterexample to its list of discoveries. Click that link to load the discovery.
The first thing you might notice is the sequence diagram.
Tracing backwards from the last event, we can see why linearizability is violated:
- GetOk(8, 'B') indicates that 'B' is the earliest write finishing before the read. Also, no operations are concurrent with the read.
- PutOk(4) was in response to the long running Put(4, 'C') operation, indicating that the value 'C' must have been written at some point between invocation and response. Unlike the read, the precise sequencing of this write in relation to other operations is indeterminate due to concurrency.
- GetOk(6, 'A') indicates that 'A' is the earliest write finishing before that read. Only the write of 'C' is concurrent with the start and end of the read, so it's possible that the write took effect before or after the read took effect.
We don't have to trace any further back, as those observations above highlight the anomaly: 'A' had been the most recent write, then 'C' may or may not have been written next (as the concurrency allows different linearizations), and finally 'B' was read. No linearization of concurrent operations can reconcile this anomaly, so the protocol is not linearizable. QED.
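The argument above can also be checked mechanically. The following is a minimal brute-force sketch, not Stateright's LinearizabilityTester: the Op and Event types and both functions are hypothetical, and the timestamps are hand-assigned from the step numbers of the discovery (all three writes are assumed to be invoked at time 0, and each read is assumed to be invoked right after its client's PutOk arrives).

```rust
/// A register operation together with its observed result.
#[derive(Clone, Copy, Debug, PartialEq)]
enum Op {
    Write(char),
    Read(char), // The value the read returned.
}

/// An operation plus hand-assigned logical timestamps (an assumption).
#[derive(Clone, Copy, Debug, PartialEq)]
struct Event {
    op: Op,
    invoked: u32,
    returned: Option<u32>, // `None` means the operation never completed.
}

/// Checks one candidate total order against real-time order and
/// against the sequential behavior of a register.
fn is_legal(order: &[Event]) -> bool {
    for (i, later) in order.iter().enumerate() {
        for earlier in &order[..i] {
            // If `later` returned before `earlier` was even invoked,
            // this order contradicts what the clients observed.
            if matches!(later.returned, Some(r) if r < earlier.invoked) {
                return false;
            }
        }
    }
    let mut value = '?'; // Initial register value, as in the chapter.
    for event in order {
        match event.op {
            Op::Write(v) => value = v,
            Op::Read(v) => {
                if v != value {
                    return false;
                }
            }
        }
    }
    true
}

/// Tries every order of the remaining operations. Completed operations must
/// appear in the order; in-flight operations may be included or left out.
fn linearizable(remaining: &mut Vec<Event>, prefix: &mut Vec<Event>) -> bool {
    if remaining.iter().all(|e| e.returned.is_none()) && is_legal(prefix) {
        return true;
    }
    for i in 0..remaining.len() {
        let event = remaining.remove(i);
        prefix.push(event);
        let found = linearizable(remaining, prefix);
        prefix.pop();
        remaining.insert(i, event);
        if found {
            return true;
        }
    }
    false
}

/// The five client operations from the discovery.
fn discovery_history() -> Vec<Event> {
    vec![
        Event { op: Op::Write('C'), invoked: 0, returned: Some(11) },
        Event { op: Op::Write('B'), invoked: 0, returned: Some(7) },
        Event { op: Op::Write('A'), invoked: 0, returned: None },
        Event { op: Op::Read('A'), invoked: 8, returned: Some(10) },
        Event { op: Op::Read('B'), invoked: 12, returned: Some(13) },
    ]
}
```

Under these assumptions, linearizable(&mut discovery_history(), &mut Vec::new()) returns false, while dropping the final read of 'B' from the history makes it return true. Exhaustive enumeration like this only scales to a handful of operations.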
Complete Implementation
Here is the complete implementation for main.rs:
use serde::{Deserialize, Serialize};
use stateright::actor::{*, register::*};
use std::borrow::Cow;
use std::collections::BTreeSet;
use std::net::{SocketAddrV4, Ipv4Addr};
#[derive(Clone)]
struct ServerActor {
peers: Vec<Id>,
}
type RequestId = u64;
#[derive(Clone, Debug, Hash, Eq, PartialEq)]
#[derive(Deserialize, Serialize)]
enum InternalMsg {
Replicate(RequestId, char),
ReplicateOk(RequestId),
}
#[derive(Clone, Debug, Hash, PartialEq)]
struct ActorState {
value: char,
delivered: BTreeSet<(Id, RequestId)>,
in_flight_put: Option<PutState>,
}
#[derive(Clone, Debug, Hash, PartialEq)]
struct PutState {
req_id: RequestId,
src: Id,
peer_acks: BTreeSet<Id>,
}
impl Actor for ServerActor {
type Msg = RegisterMsg<RequestId, char, InternalMsg>;
type State = ActorState;
type Timer = ();
fn on_start(&self, _id: Id, _o: &mut Out<Self>) -> Self::State {
ActorState {
value: '?',
delivered: Default::default(),
in_flight_put: None,
}
}
fn on_msg(&self, _id: Id, state: &mut Cow<Self::State>,
src: Id, msg: Self::Msg, o: &mut Out<Self>) {
match msg {
RegisterMsg::Put(req_id, value) if state.in_flight_put.is_none() => {
if state.delivered.contains(&(src, req_id)) { return }
let mut state = state.to_mut();
state.value = value;
state.delivered.insert((src, req_id));
state.in_flight_put = Some(PutState {
req_id,
src,
peer_acks: Default::default(),
});
for &peer_id in &self.peers {
o.send(peer_id,
RegisterMsg::Internal(
InternalMsg::Replicate(req_id, value)));
}
// Will not reply w/ `PutOk` until all replicas ack.
}
RegisterMsg::Get(req_id) => {
o.send(src, RegisterMsg::GetOk(req_id, state.value));
}
RegisterMsg::Internal(InternalMsg::Replicate(req_id, value)) => {
if state.delivered.contains(&(src, req_id)) { return }
let mut state = state.to_mut();
state.value = value;
state.delivered.insert((src, req_id));
o.send(src,
RegisterMsg::Internal(InternalMsg::ReplicateOk(req_id)));
}
RegisterMsg::Internal(InternalMsg::ReplicateOk(req_id)) => {
if state.delivered.contains(&(src, req_id)) { return }
let mut state = state.to_mut();
if let Some(put) = &mut state.in_flight_put {
if req_id != put.req_id { return }
put.peer_acks.insert(src);
if put.peer_acks.len() == self.peers.len() {
o.send(put.src, RegisterMsg::PutOk(req_id));
state.in_flight_put = None;
}
}
}
_ => {}
}
}
}
#[cfg(test)]
mod test {
use super::*;
use stateright::{*, semantics::*, semantics::register::*};
use ActorModelAction::Deliver;
use InternalMsg::{Replicate, ReplicateOk};
use RegisterMsg::{Get, GetOk, Internal, Put, PutOk};
#[test]
fn appears_linearizable_in_limited_scenarios() {
// Succeeds if there are 2 clients and 2 servers.
base_model()
.actor(RegisterActor::Server(ServerActor {
peers: Id::vec_from(vec![1]),
}))
.actor(RegisterActor::Server(ServerActor {
peers: Id::vec_from(vec![0]),
}))
.actor(RegisterActor::Client { put_count: 1, server_count: 2 })
.actor(RegisterActor::Client { put_count: 1, server_count: 2 })
.checker().spawn_dfs().join()
.assert_properties();
}
#[test]
fn not_generally_linearizable() {
// Can fail if there are 3 clients.
let checker = base_model()
.actor(RegisterActor::Server(ServerActor {
peers: Id::vec_from(vec![1]),
}))
.actor(RegisterActor::Server(ServerActor {
peers: Id::vec_from(vec![0]),
}))
.actor(RegisterActor::Client { put_count: 1, server_count: 2 })
.actor(RegisterActor::Client { put_count: 1, server_count: 2 })
.actor(RegisterActor::Client { put_count: 1, server_count: 2 })
.checker()
.spawn_dfs().join(); // TRY IT: Comment out this line, and uncomment
//.serve("0:3000"); // the next to load Stateright Explorer.
//checker.assert_properties(); // TRY IT: Uncomment this line, and the test will fail.
checker.assert_discovery("linearizable", vec![
Deliver { src: Id::from(4), dst: Id::from(0), msg: Put(4, 'C') },
Deliver { src: Id::from(0), dst: Id::from(1), msg: Internal(Replicate(4, 'C')) },
Deliver { src: Id::from(1), dst: Id::from(0), msg: Internal(ReplicateOk(4)) },
Deliver { src: Id::from(3), dst: Id::from(1), msg: Put(3, 'B') },
Deliver { src: Id::from(1), dst: Id::from(0), msg: Internal(Replicate(3, 'B')) },
Deliver { src: Id::from(0), dst: Id::from(1), msg: Internal(ReplicateOk(3)) },
Deliver { src: Id::from(1), dst: Id::from(3), msg: PutOk(3) },
Deliver { src: Id::from(2), dst: Id::from(0), msg: Put(2, 'A') },
Deliver { src: Id::from(3), dst: Id::from(0), msg: Get(6) },
Deliver { src: Id::from(0), dst: Id::from(3), msg: GetOk(6, 'A') },
Deliver { src: Id::from(0), dst: Id::from(4), msg: PutOk(4) },
Deliver { src: Id::from(4), dst: Id::from(1), msg: Get(8) },
Deliver { src: Id::from(1), dst: Id::from(4), msg: GetOk(8, 'B') },
]);
}
fn base_model()
-> ActorModel<
RegisterActor<ServerActor>,
(),
LinearizabilityTester<Id, Register<char>>>
{
ActorModel::new(
(),
LinearizabilityTester::new(Register('?'))
)
.property(Expectation::Always, "linearizable", |_, state| {
state.history.serialized_history().is_some()
})
.property(Expectation::Sometimes, "value chosen", |_, state| {
state.network.iter_deliverable().any(|e| {
if let RegisterMsg::GetOk(_, value) = e.msg {
return *value != '?';
}
return false
})
})
.record_msg_in(RegisterMsg::record_returns)
.record_msg_out(RegisterMsg::record_invocations)
}
}
// Running the program spawns actors on UDP ports 3000-3002. Messages are JSON-serialized.
fn main() {
env_logger::init_from_env(env_logger::Env::default().default_filter_or("info"));
let id0 = Id::from(SocketAddrV4::new(Ipv4Addr::LOCALHOST, 3000));
let id1 = Id::from(SocketAddrV4::new(Ipv4Addr::LOCALHOST, 3001));
let id2 = Id::from(SocketAddrV4::new(Ipv4Addr::LOCALHOST, 3002));
spawn(
serde_json::to_vec,
|bytes| serde_json::from_slice(bytes),
vec![
(id0, ServerActor { peers: vec![id1, id2].into_iter().collect() } ),
(id1, ServerActor { peers: vec![id0, id2].into_iter().collect() } ),
(id2, ServerActor { peers: vec![id0, id1].into_iter().collect() } ),
]).unwrap();
}
Suggested Exercise
Uncomment the second // TRY IT line to cause the test to fail, delete the linearizability counterexample assertion since it may no longer apply, and see if you can amend the actor implementation to make the test pass. The next chapter will provide a solution, but going through the exercise of trying to design a solution yourself will help convey how subtle bugs can be with distributed protocols, and hopefully it will demonstrate why Stateright is so useful for this problem space.
Reminder: use cargo test --release when running the tests for dramatically better model checking performance. Running tests without that flag may result in significant delays. Also, try using Stateright Explorer for debugging as needed.
Summary
This chapter introduced replication, and Stateright was able to find a bug in our replication protocol. The next chapter, Achieving Linearizability, introduces a more sophisticated protocol that makes the replicated register linearizable.
Chapter 4: Achieving Linearizability
In the last chapter we implemented a faulty replication protocol that violated linearizability because the emulated register was not atomic. This chapter introduces a more sophisticated replication protocol that the author first learned about when reading the blog post "Replicated/Fault-tolerant atomic storage" by Murat Demirbas.
As usual, we start by initializing a new Rust project:
mkdir achieving-linearizability
cd achieving-linearizability
cargo init
Then we define dependencies.
[package]
name = "achieving-linearizability"
version = "0.1.0"
edition = "2018"
[dependencies]
env_logger = "0.7"
num_cpus = "1"
serde = "1.0"
serde_json = "1.0"
stateright = "0.30"
A Truly Atomic Register
One problem with the earlier protocol is that reads can observe different values depending on which server accepts the read. We need to amend the protocol to ensure that given the same aggregate system state, any two reads are guaranteed to observe the same value. The simplest solution is to query every server when servicing a read operation, but that introduces yet another problem: availability. If reads must query every server, then a single unavailable server makes the entire system unavailable. The same problem already exists for writes in the earlier replication protocol, which waits for every peer to acknowledge a write.
The trick to solving this new problem is observing that we only need to ensure that "read sets" overlap one another. Any two majorities of servers share at least one server, so if a majority of servers agree on a value, then no other majority can agree on a different value: it either agrees on the same value or fails to agree at all. We can apply the same logic to the "write set" as well. We call the set of sufficient reads a "read quorum" and the set of sufficient writes a "write quorum."
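The overlap property can be confirmed by brute force for small clusters. The sketch below is illustrative only: majority_of and quorums_always_overlap are hypothetical helpers, and subsets of servers are encoded as bitmasks.

```rust
/// Smallest number of servers that forms a majority of `cluster_size`.
/// (Hypothetical helper for this sketch.)
fn majority_of(cluster_size: usize) -> usize {
    cluster_size / 2 + 1
}

/// Enumerates every pair of server subsets containing at least `quorum_size`
/// members and reports whether every such pair shares at least one server.
fn quorums_always_overlap(cluster_size: usize, quorum_size: usize) -> bool {
    assert!(cluster_size < 16, "brute force is only meant for tiny clusters");
    let quorum = quorum_size as u32;
    let subset_count = 1u32 << cluster_size;
    for a in 0..subset_count {
        if a.count_ones() < quorum {
            continue; // `a` is too small to be a quorum.
        }
        for b in 0..subset_count {
            // Two quorums with no common bit share no server.
            if b.count_ones() >= quorum && (a & b) == 0 {
                return false;
            }
        }
    }
    true
}
```

For example, quorums_always_overlap(5, majority_of(5)) holds, whereas quorums_always_overlap(4, 2) does not, because two disjoint halves of a four-server cluster can each form a "quorum" of two.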
Leveraging read and write quorums solves the availability problem when a minority of servers are unreachable, but we still have a second availability problem: in many cases a read set will not agree on a value. This can happen if the read is concurrent with a write or if two earlier writes were concurrent. In those cases, the server accepting the read must either ignore the request or return an error, upon which we can improve.
To solve this remaining problem, we simply need to force the system into agreement, thereby ensuring any subsequent read either observes the forced agreement or a subsequent write. This is the technique employed by Attiya, Bar-Noy, and Dolev in their paper "Sharing Memory Robustly in Message-Passing Systems".
Both read and write operations are handled in two phases: (1) a query phase followed by (2) a replication phase.
- 1. Query Phase: In the query phase, a server finds out what replicated values a quorum of replicas has previously accepted. The replicas also return a logical clock that serves to sequence earlier write operations, so the implementation will refer to this as a sequencer.
- 2a. Write Replication Phase: If servicing a write, then upon receiving responses from a quorum, the server replicates the chosen value along with a sequencer slightly larger than the largest one observed. Once a majority of the replicas ack this second phase, the server can indicate to the client that the write is complete.
- 2b. Read Replication Phase: If servicing a read, then upon receiving responses from a quorum, the server replicates the value with the largest observed sequencer. Once a majority of the replicas ack this second phase, the server can return the value that it replicated.
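The decision made at the start of the replication phase can be sketched as a pure function. This is a simplified illustration rather than the chapter's actor code: choose_replicated and its parameters are hypothetical, and server IDs are assumed to be plain integers.

```rust
/// A sequencer: a logical clock plus a server ID that breaks ties.
type Seq = (u64, usize);

/// Given the (sequencer, value) pairs returned by a quorum in the query
/// phase, decides what to replicate in the second phase.
/// Returns `None` if there are no responses.
fn choose_replicated(
    responses: &[(Seq, char)],
    write: Option<char>,
    coordinator_id: usize,
) -> Option<(Seq, char)> {
    // Tuples compare lexicographically: clock first, then server ID.
    let &(max_seq, max_val) = responses.iter().max_by_key(|(seq, _)| *seq)?;
    Some(match write {
        // Write: replicate the new value with a larger sequencer.
        Some(val) => ((max_seq.0 + 1, coordinator_id), val),
        // Read: re-replicate the newest observed value unchanged.
        None => (max_seq, max_val),
    })
}
```

With responses of ((1, 0), 'A') and ((2, 1), 'B'), a read replicates ((2, 1), 'B'), while a write of 'C' coordinated by server 0 replicates ((3, 0), 'C'). Including the coordinator's ID in the sequencer keeps sequencers unique even when two coordinators pick the same clock value.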
Implementation Walkthrough
We first define our message type. Abd... in the type name refers to the algorithm's original authors (Attiya, Bar-Noy, and Dolev), or "ABD."
type RequestId = u64;
type Value = char;
#[derive(Clone, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)]
#[derive(Serialize, Deserialize)]
enum AbdMsg {
Query(RequestId),
AckQuery(RequestId, Seq, Value),
Replicate(RequestId, Seq, Value),
AckReplicate(RequestId),
}
The current implementation combines the roles of "replica" and "coordinator that accepts a request and facilitates replication." The seq and val fields are for the first role, while phase tracks information needed by the second role. Of particular note is that for phase 1, writes need to remember what value to later replicate, which is stored in write. Conversely, for phase 2, reads need to remember what value to later return to the client, which is stored in read.
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
struct AbdState {
seq: Seq,
val: Value,
phase: Option<AbdPhase>,
}
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
enum AbdPhase {
Phase1 {
request_id: RequestId,
requester_id: Id,
write: Option<Value>, // `None` for read
responses: BTreeMap<Id, (Seq, Value)>,
},
Phase2 {
request_id: RequestId,
requester_id: Id,
read: Option<Value>, // `None` for write
acks: BTreeSet<Id>,
},
}
type Seq = (LogicalClock, Id); // `Id` for uniqueness
type LogicalClock = u64;
We are now ready to implement the protocol. Note that the implementation intentionally avoids decomposing the message handlers into different function calls because each call needs to manage shared actor state. Spreading state mutation across multiple locations in the source code arguably makes the implementation harder to follow (much like mutable global variables), and functions are only used where they can be reasoned about locally.
An alternative composition strategy that does often work well for actor systems involves distinguishing roles so that each has less state (having different AbdServer and AbdReplica actors for example), although we will not do that for this short example. The next chapter, on the other hand, will demonstrate how to compose a system of different actor types.
#[derive(Clone)]
struct AbdActor {
peers: Vec<Id>,
}
impl Actor for AbdActor {
type Msg = RegisterMsg<RequestId, Value, AbdMsg>;
type State = AbdState;
type Timer = ();
fn on_start(&self, _id: Id, _o: &mut Out<Self>) -> Self::State {
AbdState {
seq: (0, Id::from(0)),
val: '?',
phase: None,
}
}
fn on_msg(&self, id: Id, state: &mut Cow<Self::State>,
src: Id, msg: Self::Msg, o: &mut Out<Self>) {
use RegisterMsg::*;
match msg {
Put(req_id, val) if state.phase.is_none() => {
o.broadcast(&self.peers, &Internal(Query(req_id)));
state.to_mut().phase = Some(AbdPhase::Phase1 {
request_id: req_id,
requester_id: src,
write: Some(val),
responses: {
let mut responses = BTreeMap::default();
responses.insert(id, (state.seq, state.val.clone()));
responses
},
});
}
Get(req_id) if state.phase.is_none() => {
o.broadcast(&self.peers, &Internal(Query(req_id)));
state.to_mut().phase = Some(AbdPhase::Phase1 {
request_id: req_id,
requester_id: src,
write: None,
responses: {
let mut responses = BTreeMap::default();
responses.insert(id, (state.seq, state.val.clone()));
responses
},
});
}
Internal(Query(req_id)) => {
o.send(
src,
Internal(AckQuery(req_id, state.seq, state.val.clone())));
}
Internal(AckQuery(expected_req_id, seq, val))
if matches!(state.phase,
Some(AbdPhase::Phase1 { request_id, .. })
if request_id == expected_req_id) =>
{
let mut state = state.to_mut();
if let Some(AbdPhase::Phase1 {
request_id: req_id,
requester_id: requester,
write,
responses,
..
}) = &mut state.phase {
responses.insert(src, (seq, val));
if responses.len() == majority(self.peers.len() + 1) {
// Quorum reached. Move to phase 2.
// Determine sequencer and value.
let (_, (seq, val)) = responses.into_iter()
.max_by_key(|(_, (seq, _))| *seq)
.unwrap();
let mut seq = *seq;
let mut read = None;
let val = if let Some(val) = std::mem::take(write) {
seq = (seq.0 + 1, id);
val
} else {
read = Some(val.clone());
val.clone()
};
o.broadcast(
&self.peers,
&Internal(Replicate(*req_id, seq, val.clone())));
// Self-send `Replicate`.
if seq > state.seq {
state.seq = seq;
state.val = val;
}
// Self-send `AckReplicate`.
let mut acks = BTreeSet::default();
acks.insert(id);
state.phase = Some(AbdPhase::Phase2 {
request_id: *req_id,
requester_id: std::mem::take(requester),
read,
acks,
});
}
}
}
Internal(Replicate(req_id, seq, val)) => {
o.send(src, Internal(AckReplicate(req_id)));
if seq > state.seq {
let mut state = state.to_mut();
state.seq = seq;
state.val = val;
}
}
Internal(AckReplicate(expected_req_id))
if matches!(state.phase,
Some(AbdPhase::Phase2 { request_id, ref acks, .. })
if request_id == expected_req_id && !acks.contains(&src)) =>
{
let mut state = state.to_mut();
if let Some(AbdPhase::Phase2 {
request_id: req_id,
requester_id: requester,
read,
acks,
..
}) = &mut state.phase {
acks.insert(src);
if acks.len() == majority(self.peers.len() + 1) {
let msg = if let Some(val) = read {
GetOk(*req_id, std::mem::take(val))
} else {
PutOk(*req_id)
};
o.send(*requester, msg);
state.phase = None;
}
}
}
_ => {}
}
}
}
The test case confirms that this implementation is linearizable provided that the network does not redeliver messages. Thorough model checking may take up to several minutes (depending on your system's performance), so the larger test is only run with a --release build.
#[test]
fn is_linearizable_quick() {
let checker = base_model()
.actor(RegisterActor::Server(AbdActor {
peers: Id::vec_from(vec![1]),
}))
.actor(RegisterActor::Server(AbdActor {
peers: Id::vec_from(vec![0]),
}))
.actor(RegisterActor::Client { put_count: 1, server_count: 2 })
.actor(RegisterActor::Client { put_count: 1, server_count: 2 })
.checker().threads(num_cpus::get()).spawn_dfs().join();
checker.assert_properties();
assert_eq!(checker.unique_state_count(), 544);
}
#[test]
#[cfg_attr(debug_assertions, ignore = "enabled for --release only")]
fn is_linearizable() {
let checker = base_model()
.actor(RegisterActor::Server(AbdActor {
peers: Id::vec_from(vec![1, 2]),
}))
.actor(RegisterActor::Server(AbdActor {
peers: Id::vec_from(vec![0, 2]),
}))
.actor(RegisterActor::Server(AbdActor {
peers: Id::vec_from(vec![0, 1]),
}))
.actor(RegisterActor::Client { put_count: 1, server_count: 2 })
.actor(RegisterActor::Client { put_count: 1, server_count: 2 })
.checker().threads(num_cpus::get()).spawn_dfs().join();
checker.assert_properties();
assert_eq!(checker.unique_state_count(), 37_168_889);
}
fn base_model()
-> ActorModel<
RegisterActor<AbdActor>,
(),
LinearizabilityTester<Id, Register<char>>>
{
ActorModel::new(
(),
LinearizabilityTester::new(Register('?'))
)
.init_network(Network::new_unordered_nonduplicating([]))
.property(Expectation::Always, "linearizable", |_, state| {
state.history.serialized_history().is_some()
})
.property(Expectation::Sometimes, "value chosen", |_, state| {
state.network.iter_deliverable().any(|e| {
if let RegisterMsg::GetOk(_, value) = e.msg {
return *value != '?';
}
return false
})
})
.record_msg_in(RegisterMsg::record_returns)
.record_msg_out(RegisterMsg::record_invocations)
}
Complete Implementation
Here is the complete implementation for main.rs:
use serde::{Deserialize, Serialize};
use stateright::actor::{*, register::*};
use std::borrow::Cow;
use std::collections::{BTreeMap, BTreeSet};
use std::fmt::Debug;
use std::hash::Hash;
use std::net::{SocketAddrV4, Ipv4Addr};
type RequestId = u64;
type Value = char;
#[derive(Clone, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)]
#[derive(Serialize, Deserialize)]
enum AbdMsg {
Query(RequestId),
AckQuery(RequestId, Seq, Value),
Replicate(RequestId, Seq, Value),
AckReplicate(RequestId),
}
use AbdMsg::*;
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
struct AbdState {
seq: Seq,
val: Value,
phase: Option<AbdPhase>,
}
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
enum AbdPhase {
Phase1 {
request_id: RequestId,
requester_id: Id,
write: Option<Value>, // `None` for read
responses: BTreeMap<Id, (Seq, Value)>,
},
Phase2 {
request_id: RequestId,
requester_id: Id,
read: Option<Value>, // `None` for write
acks: BTreeSet<Id>,
},
}
type Seq = (LogicalClock, Id); // `Id` for uniqueness
type LogicalClock = u64;
#[derive(Clone)]
struct AbdActor {
peers: Vec<Id>,
}
impl Actor for AbdActor {
type Msg = RegisterMsg<RequestId, Value, AbdMsg>;
type State = AbdState;
type Timer = ();
fn on_start(&self, _id: Id, _o: &mut Out<Self>) -> Self::State {
AbdState {
seq: (0, Id::from(0)),
val: '?',
phase: None,
}
}
fn on_msg(&self, id: Id, state: &mut Cow<Self::State>,
src: Id, msg: Self::Msg, o: &mut Out<Self>) {
use RegisterMsg::*;
match msg {
Put(req_id, val) if state.phase.is_none() => {
o.broadcast(&self.peers, &Internal(Query(req_id)));
state.to_mut().phase = Some(AbdPhase::Phase1 {
request_id: req_id,
requester_id: src,
write: Some(val),
responses: {
let mut responses = BTreeMap::default();
responses.insert(id, (state.seq, state.val.clone()));
responses
},
});
}
Get(req_id) if state.phase.is_none() => {
o.broadcast(&self.peers, &Internal(Query(req_id)));
state.to_mut().phase = Some(AbdPhase::Phase1 {
request_id: req_id,
requester_id: src,
write: None,
responses: {
let mut responses = BTreeMap::default();
responses.insert(id, (state.seq, state.val.clone()));
responses
},
});
}
Internal(Query(req_id)) => {
o.send(
src,
Internal(AckQuery(req_id, state.seq, state.val.clone())));
}
Internal(AckQuery(expected_req_id, seq, val))
if matches!(state.phase,
Some(AbdPhase::Phase1 { request_id, .. })
if request_id == expected_req_id) =>
{
let mut state = state.to_mut();
if let Some(AbdPhase::Phase1 {
request_id: req_id,
requester_id: requester,
write,
responses,
..
}) = &mut state.phase {
responses.insert(src, (seq, val));
if responses.len() == majority(self.peers.len() + 1) {
// Quorum reached. Move to phase 2.
// Determine sequencer and value.
let (_, (seq, val)) = responses.into_iter()
.max_by_key(|(_, (seq, _))| *seq)
.unwrap();
let mut seq = *seq;
let mut read = None;
let val = if let Some(val) = std::mem::take(write) {
seq = (seq.0 + 1, id);
val
} else {
read = Some(val.clone());
val.clone()
};
o.broadcast(
&self.peers,
&Internal(Replicate(*req_id, seq, val.clone())));
// Self-send `Replicate`.
if seq > state.seq {
state.seq = seq;
state.val = val;
}
// Self-send `AckReplicate`.
let mut acks = BTreeSet::default();
acks.insert(id);
state.phase = Some(AbdPhase::Phase2 {
request_id: *req_id,
requester_id: std::mem::take(requester),
read,
acks,
});
}
}
}
Internal(Replicate(req_id, seq, val)) => {
o.send(src, Internal(AckReplicate(req_id)));
if seq > state.seq {
let mut state = state.to_mut();
state.seq = seq;
state.val = val;
}
}
Internal(AckReplicate(expected_req_id))
if matches!(state.phase,
Some(AbdPhase::Phase2 { request_id, ref acks, .. })
if request_id == expected_req_id && !acks.contains(&src)) =>
{
let mut state = state.to_mut();
if let Some(AbdPhase::Phase2 {
request_id: req_id,
requester_id: requester,
read,
acks,
..
}) = &mut state.phase {
acks.insert(src);
if acks.len() == majority(self.peers.len() + 1) {
let msg = if let Some(val) = read {
GetOk(*req_id, std::mem::take(val))
} else {
PutOk(*req_id)
};
o.send(*requester, msg);
state.phase = None;
}
}
}
_ => {}
}
}
}
#[cfg(test)]
mod test {
use super::*;
use stateright::{*, semantics::*, semantics::register::*};
#[test]
fn is_linearizable_quick() {
let checker = base_model()
.actor(RegisterActor::Server(AbdActor {
peers: Id::vec_from(vec![1]),
}))
.actor(RegisterActor::Server(AbdActor {
peers: Id::vec_from(vec![0]),
}))
.actor(RegisterActor::Client { put_count: 1, server_count: 2 })
.actor(RegisterActor::Client { put_count: 1, server_count: 2 })
.checker().threads(num_cpus::get()).spawn_dfs().join();
checker.assert_properties();
assert_eq!(checker.unique_state_count(), 544);
}
#[test]
#[cfg_attr(debug_assertions, ignore = "enabled for --release only")]
fn is_linearizable() {
let checker = base_model()
.actor(RegisterActor::Server(AbdActor {
peers: Id::vec_from(vec![1, 2]),
}))
.actor(RegisterActor::Server(AbdActor {
peers: Id::vec_from(vec![0, 2]),
}))
.actor(RegisterActor::Server(AbdActor {
peers: Id::vec_from(vec![0, 1]),
}))
.actor(RegisterActor::Client { put_count: 1, server_count: 2 })
.actor(RegisterActor::Client { put_count: 1, server_count: 2 })
.checker().threads(num_cpus::get()).spawn_dfs().join();
checker.assert_properties();
assert_eq!(checker.unique_state_count(), 37_168_889);
}
fn base_model()
-> ActorModel<
RegisterActor<AbdActor>,
(),
LinearizabilityTester<Id, Register<char>>>
{
ActorModel::new(
(),
LinearizabilityTester::new(Register('?'))
)
.init_network(Network::new_unordered_nonduplicating([]))
.property(Expectation::Always, "linearizable", |_, state| {
state.history.serialized_history().is_some()
})
.property(Expectation::Sometimes, "value chosen", |_, state| {
state.network.iter_deliverable().any(|e| {
if let RegisterMsg::GetOk(_, value) = e.msg {
return *value != '?';
}
return false
})
})
.record_msg_in(RegisterMsg::record_returns)
.record_msg_out(RegisterMsg::record_invocations)
}
}
fn main() {
env_logger::init_from_env(
env_logger::Env::default().default_filter_or("info"));
let id0 = Id::from(SocketAddrV4::new(Ipv4Addr::LOCALHOST, 3000));
let id1 = Id::from(SocketAddrV4::new(Ipv4Addr::LOCALHOST, 3001));
let id2 = Id::from(SocketAddrV4::new(Ipv4Addr::LOCALHOST, 3002));
spawn(
serde_json::to_vec,
|bytes| serde_json::from_slice(bytes),
vec![
(id0, AbdActor { peers: vec![id1, id2] }),
(id1, AbdActor { peers: vec![id0, id2] }),
(id2, AbdActor { peers: vec![id0, id1] }),
]).unwrap();
}
Suggested Exercises
- This algorithm can be optimized by observing that the replication phases need not replicate values if the quorum already agrees on a value. See if you can implement this optimization.
- More generally, replication messages only need to be sent to replicas that disagree. See if you can implement this optimization as well. TIP: This is a slightly more complex optimization because we need to treat "all agree" versus "not all agree" slightly differently to avoid dropping requests in some cases. Can you see why?
- The current implementation also assumes that the network does not redeliver messages. Revise it to account for potential message redelivery.
Summary
This chapter provided the first taste of a real distributed algorithm. We were able to incrementally infer a solution, but it was nontrivial. If there were bugs in the code, they could be relatively difficult to identify without a model checker.
In the next chapter, we will introduce the notion of "consensus" and implement it via the Multi-Paxos algorithm. That chapter is not yet available, so in the meantime you can learn more about Stateright by browsing additional Stateright examples and reviewing the Stateright API docs. If you are familiar with TLA+, then the subsequent chapter Comparison with TLA+ may also be interesting to you.
If you have any questions, comments, or ideas, please share them on Stateright's Discord server. At this time Stateright is a small personal project, and the main author is eager to hear community feedback.
Comparison with TLA+
The previous part of this book focused on model checking runnable actor systems. Stateright is also able to model check higher level designs via "abstract models," much like a traditional model checker. This chapter compares two abstract models of the two-phase commit protocol: one written in a language called TLA+ and the other written in Rust. The chapter exists to assist those who are already familiar with TLA+, so feel free to skip it if you are not.
Attribution
The TLA+ model comes from the paper "Consensus on transaction commit" by Jim Gray and Leslie Lamport and is used in accordance with the ACM's Software Copyright Notice. It has been adapted slightly for this book. Here are the copyright details from the paper:
Copyright 2005 by the Association for Computing Machinery, Inc. Permission to make digital or hard copies of part or all of this work for personal or classroom use is granted without fee provided that copies are not made or distributed for profit or commercial advantage and that copies bear this notice and the full citation on the first page. Copyrights for components of this work owned by others than ACM must be honored. Abstracting with credit is permitted. To copy otherwise, to republish, to post on servers, or to redistribute to lists, requires prior specific permission and/or a fee. Request permissions from Publications Dept, ACM Inc., fax +1 (212) 869-0481, or permissions@acm.org.
Citation:
Jim Gray and Leslie Lamport. 2006. Consensus on transaction commit. ACM Trans. Database Syst. 31, 1 (March 2006), 133–160. DOI:https://doi.org/10.1145/1132863.1132867
Lecture 6 of Leslie Lamport's TLA+ Video Course is recommended for an additional overview of the specification.
Unique Benefits of Each
Before getting to the code, it is valuable to highlight that while the functionality of TLC (the model checker for TLA+) and that of Stateright overlap to some degree, each has unique benefits. Enumerating some of these can help you decide when to use each solution.
Unique benefits of TLC/TLA+:
- Brevity: TLA+ is more concise than Rust.
- State Reduction: TLC supports symmetry reduction.
- Features: TLC supports arbitrarily complex temporal properties including fairness.
- Features: TLC supports refinement mapping between models.
Unique benefits of Stateright/Rust:
- Additional Verification: With Stateright your model and final implementation are encouraged to share code. This eliminates a possible failure mode whereby a model and its resulting production system implementation deviate.
- Reuse and Extensibility: Rust has a far larger library ecosystem and is arguably more amenable to reuse than TLA+. For example, Stateright's code for defining a system's operational semantics is not built into the model checker and could be provided as an external library. The same can be said about the included register semantics, linearizability tester, and actor model to name a few other examples. More generally the entire Rust crate registry (with tens of thousands of libraries) is at your disposal. In contrast, the pattern for reuse in TLA+ is copy-paste-modify, and the number of reusable modules is relatively small.
- Checker Performance: Stateright tends to be faster and offers additional optimization possibilities, such as replacing a set with a bit vector. While TLC allows you to override modules with Java implementations, doing so is relatively cumbersome and rarely done.
- Final Implementation Performance: As a more auxiliary benefit, Stateright can serve as a stress test for your final implementation, identifying regressions and also facilitating performance investigation.
- Features: Stateright offers "sometimes" properties that serve to sanity check that expected outcomes are possible. These are less powerful than temporal properties but serve a slightly different purpose because examples of these properties being met are included in the checker discoveries. You can simulate these in TLC by introducing false "invariants," but they need to be commented out and periodically run, which is more cumbersome.
- Features: Stateright Explorer allows you to interactively browse a model's state space and also lets you jump between discoveries (whether they are property violations or instances of "sometimes" properties).
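As an illustration of the bit vector optimization mentioned in the list above, here is a minimal sketch of a set of small integers packed into a single machine word. The `RmBitSet` type and its methods are hypothetical names invented for this example and are not part of Stateright; the sketch also assumes that at most 64 members are needed.

```rust
/// Hypothetical set of small indices packed into one 64-bit word.
/// Only indices in 0..64 are supported.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, Hash)]
struct RmBitSet(u64);

impl RmBitSet {
    /// Adds `rm` to the set; panics if the index does not fit in 64 bits.
    fn insert(&mut self, rm: usize) {
        assert!(rm < 64, "index out of range for a 64-bit set");
        self.0 |= 1u64 << rm;
    }

    /// Reports whether `rm` is a member of the set.
    fn contains(&self, rm: usize) -> bool {
        rm < 64 && ((self.0 >> rm) & 1) == 1
    }

    /// Reports whether every index in 0..rm_count is a member.
    fn contains_all(&self, rm_count: usize) -> bool {
        assert!(rm_count <= 64, "at most 64 members are supported");
        let mask = if rm_count == 64 {
            u64::MAX
        } else {
            (1u64 << rm_count) - 1
        };
        (self.0 & mask) == mask
    }
}
```

Because the whole set is a single integer, copying, comparing, and hashing a state that contains it is cheap, which is where a checker that visits millions of states could plausibly benefit.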
With those out of the way, let's move on to the code comparison.
Code Comparison
Both models are parameterized by a collection of "resource managers." The TLA+ spec does not specify a type, but a set is expected. The Stateright spec maps each resource manager to an integer in the range 0..N (0 to N-1 inclusive).
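A minimal sketch of that parameterization follows. The names `RmId`, `TwoPhaseSys`, `rm_count`, and `resource_managers` are assumptions chosen for illustration rather than a reproduction of the original listing.

```rust
/// Hypothetical identifier type: each resource manager is an index.
type RmId = usize;

/// Hypothetical system parameters: the only parameter is the number of
/// resource managers, N.
struct TwoPhaseSys {
    rm_count: usize,
}

impl TwoPhaseSys {
    /// Yields every resource manager identifier: 0, 1, ..., N-1.
    fn resource_managers(&self) -> std::ops::Range<RmId> {
        0..self.rm_count
    }
}
```

With `rm_count` set to 3, iterating over `resource_managers()` visits 0, 1, and 2.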
Next we define variables. These are global in the TLA+ spec, and their type constraints are indicated later in the spec via an invariant. In Stateright these have a statically defined type.
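To make the contrast concrete, here is a sketch of how the four variables of the TLA+ spec (rmState, tmState, tmPrepared, and msgs) could be given static types in Rust. The type and field names are assumptions for this example, and the derives reflect what a checker would plausibly need in order to clone, compare, and hash states.

```rust
use std::collections::BTreeSet;

/// Local state of one resource manager.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
enum RmState { Working, Prepared, Committed, Aborted }

/// Local state of the transaction manager.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
enum TmState { Init, Committed, Aborted }

/// Messages that can be present on the network.
#[derive(Clone, Debug, PartialEq, Eq, Hash, PartialOrd, Ord)]
enum Message { Prepared { rm: usize }, Commit, Abort }

/// The aggregate state: one field per TLA+ variable.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
struct TwoPhaseState {
    rm_state: Vec<RmState>,  // indexed by resource manager
    tm_state: TmState,
    tm_prepared: Vec<bool>,  // the transaction manager's view of each resource manager
    msgs: BTreeSet<Message>, // the network, modeled as a set
}
```

Because the types are checked by the compiler, a state in which, say, `tm_state` holds a message cannot be constructed at all, so no separate type invariant has to be verified at run time.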
Types in the TLA+ spec are conveyed via an invariant that is passed to its model checker, TLC.
TLA+ leverages temporal logic to convey the specification, while Stateright requires that a trait is implemented. One other distinct aspect is that Stateright actions are also types.
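The sketch below illustrates both points. `SketchModel` is a stand-in trait written for this example only; it is loosely shaped like a model-checking interface and should not be read as Stateright's own trait definition. The `Action` variants are named after the transitions in the TLA+ spec.

```rust
/// Stand-in trait for this sketch only (an assumption, not Stateright's API).
trait SketchModel {
    type State;
    type Action;

    /// The states in which the system may start.
    fn init_states(&self) -> Vec<Self::State>;

    /// Appends every action that is enabled in `state`.
    fn actions(&self, state: &Self::State, actions: &mut Vec<Self::Action>);

    /// Applies `action` to `last`, returning the resulting state if any.
    fn next_state(&self, last: &Self::State, action: Self::Action) -> Option<Self::State>;
}

/// Each transition is a value of an ordinary type, so actions can be
/// collected, compared, and printed like any other data.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum Action {
    TmRcvPrepared(usize),
    TmCommit,
    TmAbort,
    RmPrepare(usize),
    RmChooseToAbort(usize),
    RmRcvCommitMsg(usize),
    RmRcvAbortMsg(usize),
}
```

One practical consequence of actions being values is that a path through the state space can be represented as a plain list of them.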
The initial aggregate state is one where
- the resource managers are ready to start a transaction;
- the sole transaction manager is ready to start a transaction;
- the transaction manager's view of the resource managers shows that none has yet indicated that it is preparing a transaction;
- and the network is an empty set.
Note that set semantics provide an ideal model for a network in this case because they capture the fact that networks rarely make guarantees about message order. Those guarantees must be imposed by additional protocols. And keep in mind that often those protocols only provide guarantees under limited scopes (e.g. TCP only orders messages within the lifetime of that connection, so a transient network partition can still cause message redelivery with TCP).
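The initial state and the set-based network described above can be sketched as follows. All type, field, and function names here are assumptions for illustration, and the standard library's `BTreeSet` stands in for the network.

```rust
use std::collections::BTreeSet;

#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum RmState { Working, Prepared, Committed, Aborted }

#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum TmState { Init, Committed, Aborted }

#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord)]
enum Message { Prepared { rm: usize }, Commit, Abort }

#[derive(Clone, Debug, PartialEq, Eq)]
struct TwoPhaseState {
    rm_state: Vec<RmState>,
    tm_state: TmState,
    tm_prepared: Vec<bool>,
    msgs: BTreeSet<Message>,
}

/// Builds the single initial state for `rm_count` resource managers.
fn init_state(rm_count: usize) -> TwoPhaseState {
    TwoPhaseState {
        rm_state: vec![RmState::Working; rm_count], // every resource manager is ready
        tm_state: TmState::Init,                    // the transaction manager is ready
        tm_prepared: vec![false; rm_count],         // nothing has been reported yet
        msgs: BTreeSet::new(),                      // the network is empty
    }
}
```

Inserting the same message into `msgs` twice leaves a single copy, and iteration order is determined by the message values rather than by when they were sent. The model therefore carries no information about send order or duplicate counts, so any protocol checked against it cannot rely on either.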
Now we get to the most interesting part of the model, the state transitions (which Stateright calls actions). TLA+ requires each transition relation to precede the aggregate next state relation (Next in this case). Each action serves two roles: (1) it defines its own preconditions and (2) it defines the subsequent state change (along with the unchanged states). In Stateright it is more idiomatic (and performant) to distinguish between the preconditions (in fn actions...) and state change (in fn next_state...).
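The split between preconditions and state change can be sketched with two free functions. This is an illustration written against the standard library only: the names, the signatures, and the choice to return a plain `State` are assumptions of this example, and the transition rules follow the two-phase commit spec from the Gray and Lamport paper.

```rust
use std::collections::BTreeSet;

#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum RmState { Working, Prepared, Committed, Aborted }

#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum TmState { Init, Committed, Aborted }

#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord)]
enum Message { Prepared { rm: usize }, Commit, Abort }

#[derive(Clone, Debug, PartialEq, Eq)]
struct State {
    rm_state: Vec<RmState>,
    tm_state: TmState,
    tm_prepared: Vec<bool>,
    msgs: BTreeSet<Message>,
}

#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum Action {
    TmRcvPrepared(usize), TmCommit, TmAbort,
    RmPrepare(usize), RmChooseToAbort(usize),
    RmRcvCommitMsg(usize), RmRcvAbortMsg(usize),
}

/// Preconditions only: appends every action that is enabled in `s`.
fn actions(s: &State, out: &mut Vec<Action>) {
    if s.tm_state == TmState::Init {
        if s.tm_prepared.iter().all(|p| *p) {
            out.push(Action::TmCommit);
        }
        out.push(Action::TmAbort);
    }
    for rm in 0..s.rm_state.len() {
        if s.tm_state == TmState::Init && s.msgs.contains(&Message::Prepared { rm }) {
            out.push(Action::TmRcvPrepared(rm));
        }
        if s.rm_state[rm] == RmState::Working {
            out.push(Action::RmPrepare(rm));
            out.push(Action::RmChooseToAbort(rm));
        }
        if s.msgs.contains(&Message::Commit) {
            out.push(Action::RmRcvCommitMsg(rm));
        }
        if s.msgs.contains(&Message::Abort) {
            out.push(Action::RmRcvAbortMsg(rm));
        }
    }
}

/// State change only: assumes `a` was produced by `actions` for `s`.
/// Fields that are not assigned stay unchanged because `n` starts as a clone.
fn next_state(s: &State, a: Action) -> State {
    let mut n = s.clone();
    match a {
        Action::TmRcvPrepared(rm) => n.tm_prepared[rm] = true,
        Action::TmCommit => {
            n.tm_state = TmState::Committed;
            n.msgs.insert(Message::Commit);
        }
        Action::TmAbort => {
            n.tm_state = TmState::Aborted;
            n.msgs.insert(Message::Abort);
        }
        Action::RmPrepare(rm) => {
            n.rm_state[rm] = RmState::Prepared;
            n.msgs.insert(Message::Prepared { rm });
        }
        Action::RmChooseToAbort(rm) => n.rm_state[rm] = RmState::Aborted,
        Action::RmRcvCommitMsg(rm) => n.rm_state[rm] = RmState::Committed,
        Action::RmRcvAbortMsg(rm) => n.rm_state[rm] = RmState::Aborted,
    }
    n
}
```

Keeping the preconditions separate means the state is only cloned for actions that are actually enabled, which is one plausible reason the split would help performance.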
Then we get to the sole property: if a resource manager reaches a final commit/abort state, then no other resource manager can disagree with that decision.
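That property can be expressed as an ordinary predicate over the resource manager states. The function below is a sketch with a hypothetical name, `is_consistent`; how such a predicate is registered with a checker is not shown here.

```rust
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum RmState { Working, Prepared, Committed, Aborted }

/// Returns true unless one resource manager has committed while another
/// has aborted.
fn is_consistent(rm_state: &[RmState]) -> bool {
    let any_committed = rm_state.contains(&RmState::Committed);
    let any_aborted = rm_state.contains(&RmState::Aborted);
    !(any_committed && any_aborted)
}
```

For example, a system in which one resource manager is `Committed` and another is still `Prepared` satisfies the predicate, while one with both a `Committed` and an `Aborted` resource manager does not.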
Performance Comparison
Now we need to configure the model. For TLC, this is done via a special "CFG" file, while for Stateright you simply introduce a Rust test. Rust also requires a Cargo.toml file.
TLA+ can be run from the command line using a tool such as tla-bin or tlacli, while the Stateright test is run using cargo test --release. The example below first calls build --tests to avoid timing dependency compilation but then revises the file timestamp to include compilation time relevant to the development iteration cycle.
Stateright is generally faster, and the speedup tends to increase with larger state spaces. A comparison of model checking times on the author's laptop follows.
| Resource managers (#RM) | States | TLC | Stateright | Speedup |
|---|---|---|---|---|
| 7 | 296,448 | 3 s | 1.697 s | 1.8X |
| 8 | 1,745,408 | 12 s | 2.566 s | 4.7X |
| 9 | 10,340,352 | 90 s | 8.902 s | 10.1X |
| 10 | 61,515,776 | 674 s | 54.709 s | 12.3X |