
Building Distributed Systems With Stateright

by Jonathan Nadal

Deep understanding of causality sometimes requires the understanding of very large patterns and their abstract relationships and interactions, not just the understanding of microscopic objects interacting in microscopic time intervals.

― Douglas R. Hofstadter, I Am a Strange Loop

Distributed computing is a term that refers to multiple computers working together as a system to solve a problem, typically because that problem would not be solvable on a single computer. For example, we all want to know that our important files will be accessible even when computer hardware inevitably fails. As a second example, a researcher at a pharmaceutical company may have a complex problem that would take decades for a single computer to solve, but which a collection of computers working together could solve in days.

Unique algorithms must be employed to coordinate workloads across these geographically distributed systems of computers because they are susceptible to categories of nondeterminism that do not arise when a problem is solved with a single computer. For example, the networks that link these computers will drop, reorder, and even redeliver messages. Algorithms that fail to account for this behavior may run correctly for extended periods but will eventually fail at unpredictable times in unpredictable ways, such as causing data corruption.

Stateright is a software framework for analyzing and systematically verifying distributed systems. Its name refers to the goal of verifying that a system's collective state always satisfies a correctness specification, such as "any data written to the system should be accessible as long as at least one data center is reachable."

Cloud service providers like AWS and Azure leverage verification software such as the TLA+ model checker to achieve the same goal, but whereas those solutions typically verify a high-level system design, Stateright is able to verify the underlying system implementation in addition to the design (along with providing other unique benefits explained in the "Comparison with TLA+" chapter). On the other end of the spectrum are tools such as Jepsen, which can validate a final implementation by testing a random subset of the system's behavior, whereas Stateright can systematically enumerate all possible behaviors within a specified model of the system.

We'll jump right in with a motivating example in the first chapter, Getting Started.

Chapter 1: Getting Started

IMPORTANT: Stateright is a relatively new framework and will be making breaking API changes leading up to a 1.0 release, at which point the API will be considered stable. If you plan to use Stateright for production scenarios, then please file a GitHub issue so that the author can coordinate with you to minimize any disruption.

Stateright's Value Proposition

Stateright is a Rust library that simplifies implementing distributed systems while more importantly providing a powerful mechanism for verification.

Verification of distributed systems is difficult because distributed algorithms must be resilient to nondeterminism caused by concurrency such as threads racing, but that's only part of the challenge. Distributed algorithms also need to account for the fact that computers typically interface via unreliable networks that will periodically do things like reorder or lose messages (if you run your system in "the cloud," then it's running on an unreliable network). Furthermore, often a distributed algorithm is expected to continue working even if a subset of computers crash at arbitrary points in their execution. Designing algorithms that continue working in the presence of this added nondeterminism is nontrivial and error prone, which necessitates special tools for verifying correctness.

One approach to verifying the correctness of distributed algorithms is to run them in an environment that randomly introduces nondeterminism more frequently than a normal environment would. This is the approach taken by Jepsen, and it has proven to be incredibly effective, finding bugs in distributed systems such as etcd, PostgreSQL, Redis, and Zookeeper among many others.

Stateright's approach is similar, but rather than testing a random subset of possible behaviors, it tests all possible observable behaviors within a particular specification. The catch is that Stateright needs to be embedded into the system's implementation, whereas solutions such as Jepsen do not, making them amenable to testing a wider range of software; but if you are writing a distributed system in Rust, then Stateright can provide additional verification over random testing.

An Example

Let's start with a very simple distributed system: a single client that can interact with a single server by reading or writing a value. We'll see that even this minimal example is susceptible to surprising behavior.

Install the Rust programming language if it is not already installed, then initialize a new project using the cargo utility included with Rust. If you are new to Rust, then you should also review some of the language's learning resources.

mkdir getting-started
cd getting-started
cargo init

Define dependencies in Cargo.toml.

[package]
name = "getting-started"
version = "0.1.0"
edition = "2018"

[dependencies]
env_logger = "0.7"
serde_json = "1.0"
stateright = "0.30"

Here is the complete implementation for main.rs. Copy-paste it into your own file. The subsequent sections will explain further.

use stateright::actor::{*, register::*};
use std::borrow::Cow; // COW == clone-on-write
use std::net::{SocketAddrV4, Ipv4Addr};

type RequestId = u64;

#[derive(Clone)]
struct ServerActor;

impl Actor for ServerActor {
    type Msg = RegisterMsg<RequestId, char, ()>;
    type State = char;
    type Timer = ();

    fn on_start(&self, _id: Id, _o: &mut Out<Self>) -> Self::State {
        '?' // default value for the register
    }

    fn on_msg(&self, _id: Id, state: &mut Cow<Self::State>,
              src: Id, msg: Self::Msg, o: &mut Out<Self>) {
        match msg {
            RegisterMsg::Put(req_id, value) => {
                *state.to_mut() = value;
                o.send(src, RegisterMsg::PutOk(req_id));
            }
            RegisterMsg::Get(req_id) => {
                o.send(src, RegisterMsg::GetOk(req_id, **state));
            }
            _ => {}
        }
    }
}

#[cfg(test)]
mod test {
    use super::*;
    use stateright::{*, semantics::*, semantics::register::*};
    use ActorModelAction::Deliver;
    use RegisterMsg::{Get, GetOk, Put, PutOk};

    #[test]
    fn is_unfortunately_not_linearizable() {
        let checker = ActorModel::new(
                (),
                LinearizabilityTester::new(Register('?'))
            )
            .actor(RegisterActor::Server(ServerActor))
            .actor(RegisterActor::Client { put_count: 2, server_count: 1 })
            .property(Expectation::Always, "linearizable", |_, state| {
                state.history.serialized_history().is_some()
            })
            .property(Expectation::Sometimes, "get succeeds", |_, state| {
                state.network.iter_deliverable()
                    .any(|e| matches!(e.msg, RegisterMsg::GetOk(_, _)))
            })
            .property(Expectation::Sometimes, "put succeeds", |_, state| {
                state.network.iter_deliverable()
                    .any(|e| matches!(e.msg, RegisterMsg::PutOk(_)))
            })
            .record_msg_in(RegisterMsg::record_returns)
            .record_msg_out(RegisterMsg::record_invocations)
            .checker().spawn_dfs().join();
        //checker.assert_properties(); // TRY IT: Uncomment this line, and the test will fail.
        checker.assert_discovery("linearizable", vec![
            Deliver { src: Id::from(1), dst: Id::from(0), msg: Put(1, 'A') },
            Deliver { src: Id::from(0), dst: Id::from(1), msg: PutOk(1) },
            Deliver { src: Id::from(1), dst: Id::from(0), msg: Put(2, 'Z') },
            Deliver { src: Id::from(0), dst: Id::from(1), msg: PutOk(2) },
            Deliver { src: Id::from(1), dst: Id::from(0), msg: Put(1, 'A') },
            Deliver { src: Id::from(1), dst: Id::from(0), msg: Get(3) },
            Deliver { src: Id::from(0), dst: Id::from(1), msg: GetOk(3, 'A') },
        ]);
    }
}

fn main() {
    env_logger::init_from_env(env_logger::Env::default().default_filter_or("info"));
    spawn(
        serde_json::to_vec,
        |bytes| serde_json::from_slice(bytes),
        vec![
            (SocketAddrV4::new(Ipv4Addr::LOCALHOST, 3000), ServerActor)
        ]).unwrap();
}

Actor Framework Intro

The code implements a simple server using the actor model in which an "actor" is an object that can respond to events (such as timeouts or message receipt) and in turn updates its internal state and generates outputs (such as sending a message or setting a timer).

If you are familiar with the actor model (e.g. via the Erlang language or the Akka library), then it is useful to note distinguishing characteristics of Stateright's approach:

  1. Stateright must have visibility of every input and output to facilitate simulating all possible system behaviors. That means inputs and outputs must be in the form of messages. For example, if your actor needs to interface with a database, you might introduce DbExec(...) output and DbResult(...) input messages; or if it needs to interface with the file system, you might introduce FileRead(...) output and FileResult(...) input messages. An adapter layer would then translate these into/from the corresponding effects rather than treating them as standard messages between actors in the system. This technique will be demonstrated in a later chapter.
  2. Outputs do not take effect until after the handler returns. The outputs are simply collected in the o: &mut Out<Self> parameter whenever methods such as Out::send(...) are called, and Stateright's actor runtime sends them later.
  3. The actor state is only accessible via a clone-on-write cell with the state: &mut Cow<Self::State> parameter. Doing so enables Stateright to more efficiently validate a system when it is enumerating different branches of nondeterministic behavior.
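To illustrate the first two points, side effects can be modeled as messages that an adapter layer translates into real I/O outside the actor. The sketch below is not Stateright's API; the `AppMsg`, `DbExec`, and `DbResult` names are hypothetical, echoing the example above:

```rust
// Hypothetical sketch: side effects expressed as messages so that a model
// checker can observe them. `DbExec`/`DbResult` are illustrative names.
#[derive(Debug, PartialEq)]
enum AppMsg {
    DbExec(String),        // output: ask the adapter to run a query
    DbResult(Vec<String>), // input: the adapter's reply
}

// The adapter translates effect messages into real I/O outside the actor,
// so the actor itself stays deterministic and checkable.
fn adapter(msg: &AppMsg) -> Option<AppMsg> {
    match msg {
        AppMsg::DbExec(query) => {
            // In production this would call the database; here we fake it.
            Some(AppMsg::DbResult(vec![format!("row for {query}")]))
        }
        _ => None, // non-effect messages pass through untouched
    }
}

fn main() {
    let out = AppMsg::DbExec("SELECT 1".into());
    println!("{:?}", adapter(&out));
}
```

During model checking, no adapter runs at all: the checker instead enumerates the possible `DbResult` inputs, which is exactly why the effects must be visible as messages.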

Model Checking in More Detail

The reference to "enumerating different branches of nondeterministic behavior" deserves additional explanation. The nondeterminism within a nondeterministic system is never completely open ended. Instead there are "decision points" that arise, where a "decision" isn't directly made by your code but rather is the weighted random outcome of many factors. For example, if two clients concurrently initiate requests to a service, then the initial "decision" would be:

  1. the first client's request is delivered causing the service to respond,
  2. the second client's request is delivered causing the service to respond,
  3. or both clients time out while contacting the service.

If option 1 occurs, then it can be followed by option 2, and vice versa. In both cases, timeouts are also possible. In this manner the possible behaviors of a nondeterministic system can be seen as a decision tree.

decision diagram showing possible states

At each of these decision points, Stateright will explore one of the outcomes and then backtrack to explore the others. This is the distinguishing characteristic of model checking in comparison with random testing. Also, each gray box indicates a potential state of the aggregate system (where the aggregate system state includes the client state, the service state, and the network state), and the collection of all potential aggregate states is known as the "state space."
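The explore-and-backtrack idea can be sketched as a tiny depth-first search over message-delivery orderings. This is an illustrative toy, not Stateright's checker:

```rust
// Toy sketch of exhaustive exploration: enumerate every ordering in which
// a set of pending messages could be delivered.
fn explore(
    pending: Vec<&'static str>,
    trace: Vec<&'static str>,
    traces: &mut Vec<Vec<&'static str>>,
) {
    if pending.is_empty() {
        traces.push(trace); // a complete behavior has been explored
        return;
    }
    // Each deliverable message is one "decision"; try each, then backtrack.
    for i in 0..pending.len() {
        let mut next_pending = pending.clone();
        let delivered = next_pending.remove(i);
        let mut next_trace = trace.clone();
        next_trace.push(delivered);
        explore(next_pending, next_trace, traces);
    }
}

fn main() {
    let mut traces = Vec::new();
    explore(vec!["client1-req", "client2-req"], Vec::new(), &mut traces);
    println!("{traces:?}"); // both interleavings are visited
}
```

Stateright's real checker additionally deduplicates previously visited aggregate states and evaluates properties at each one, but the backtracking skeleton is the same.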

Implementation Walkthrough

The server responds to Put and Get messages based only on its own local state, providing its clients with a simple form of distributed storage, sometimes known as a shared register. Responses are linked to requests via a request ID chosen by the client.

type RequestId = u64;

#[derive(Clone)]
struct ServerActor;

impl Actor for ServerActor {
    type Msg = RegisterMsg<RequestId, char, ()>;
    type State = char;
    type Timer = ();

    fn on_start(&self, _id: Id, _o: &mut Out<Self>) -> Self::State {
        '?' // default value for the register
    }

    fn on_msg(&self, _id: Id, state: &mut Cow<Self::State>,
              src: Id, msg: Self::Msg, o: &mut Out<Self>) {
        match msg {
            RegisterMsg::Put(req_id, value) => {
                *state.to_mut() = value;
                o.send(src, RegisterMsg::PutOk(req_id));
            }
            RegisterMsg::Get(req_id) => {
                o.send(src, RegisterMsg::GetOk(req_id, **state));
            }
            _ => {}
        }
    }
}

A test follows. The test checks the system for a property called linearizability, which loosely speaking means that the visible behavior of the register emulated by the actor system is identical to that of a register within a single-threaded system. In the words of the individuals who coined the term:

Linearizability provides the illusion that each operation applied by concurrent processes takes effect instantaneously at some point between its invocation and its response, implying that the meaning of a concurrent object’s operations can be given by pre- and post-conditions.

An important aspect of linearizability is the notion of a "sequential specification," which serves as a reference for correct behavior of the system. In other words, the system emulates the sequential specification. For instance, the sequential specification could indicate:

  • the system behaves like a memory cell (i.e. register semantics),
  • the system behaves like a queue,
  • or the system behaves like a stack.

That means that when someone indicates that a system is linearizable, it is important to keep in mind the question "linearizable with respect to what?" In this chapter, the sequential specification is register semantics, provided by Stateright, but later chapters will involve other sequential specifications.
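For concreteness, here is a minimal sketch of register semantics as a sequential specification, written independently of Stateright's own types: applying each operation to a single-threaded register yields the return value that a linearizable system must be able to emulate.

```rust
// Hedged sketch of a sequential specification with register semantics.
// This is not Stateright's API; it only illustrates the concept.
#[derive(Clone, Debug, PartialEq)]
struct Register(char);

#[derive(Clone, Copy, Debug)]
enum Op { Put(char), Get }

#[derive(Clone, Copy, Debug, PartialEq)]
enum Ret { PutOk, GetOk(char) }

impl Register {
    // Applying an operation yields the return value that a correct
    // single-threaded register would produce.
    fn apply(&mut self, op: Op) -> Ret {
        match op {
            Op::Put(v) => { self.0 = v; Ret::PutOk }
            Op::Get => Ret::GetOk(self.0),
        }
    }
}

fn main() {
    let mut reg = Register('?');
    assert_eq!(reg.apply(Op::Put('A')), Ret::PutOk);
    assert_eq!(reg.apply(Op::Get), Ret::GetOk('A'));
    println!("sequential register behaves as expected");
}
```

A linearizability checker then asks: can the concurrent history observed from the distributed system be reordered into some sequence of `apply` calls like these, with each operation taking effect between its invocation and response?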

The test leverages RegisterActor, which is built into Stateright and defines a kind of test fixture in which clients (only 1 in this case) write distinct values and independently read values without coordinating with one another.

    #[test]
    fn is_unfortunately_not_linearizable() {
        let checker = ActorModel::new(
                (),
                LinearizabilityTester::new(Register('?'))
            )
            .actor(RegisterActor::Server(ServerActor))
            .actor(RegisterActor::Client { put_count: 2, server_count: 1 })
            .property(Expectation::Always, "linearizable", |_, state| {
                state.history.serialized_history().is_some()
            })
            .property(Expectation::Sometimes, "get succeeds", |_, state| {
                state.network.iter_deliverable()
                    .any(|e| matches!(e.msg, RegisterMsg::GetOk(_, _)))
            })
            .property(Expectation::Sometimes, "put succeeds", |_, state| {
                state.network.iter_deliverable()
                    .any(|e| matches!(e.msg, RegisterMsg::PutOk(_)))
            })
            .record_msg_in(RegisterMsg::record_returns)
            .record_msg_out(RegisterMsg::record_invocations)
            .checker().spawn_dfs().join();
        //checker.assert_properties(); // TRY IT: Uncomment this line, and the test will fail.
        checker.assert_discovery("linearizable", vec![
            Deliver { src: Id::from(1), dst: Id::from(0), msg: Put(1, 'A') },
            Deliver { src: Id::from(0), dst: Id::from(1), msg: PutOk(1) },
            Deliver { src: Id::from(1), dst: Id::from(0), msg: Put(2, 'Z') },
            Deliver { src: Id::from(0), dst: Id::from(1), msg: PutOk(2) },
            Deliver { src: Id::from(1), dst: Id::from(0), msg: Put(1, 'A') },
            Deliver { src: Id::from(1), dst: Id::from(0), msg: Get(3) },
            Deliver { src: Id::from(0), dst: Id::from(1), msg: GetOk(3, 'A') },
        ]);
    }

Stateright is able to find a bug that arises even if there is only a single client. The test indicates a sequence of steps that trigger the bug (AKA a Path), but in practice you would normally just call checker.assert_properties(), and Stateright would fail the test while indicating steps that reproduce the bug (although the specific example that it finds can vary).

The actor with Id 0 is the server while the actor with Id 1 is the client. For brevity, the example shows actor inputs (Deliver) but not outputs.

sequence diagram for the linearizability violation

  1. The server receives a Put from the client with value 'A', which it acknowledges. The client receives the PutOk acknowledgement and in turn sends a second Put with a new value, 'Z' (not shown yet since the test indicates message deliveries only, not message sends).
    Deliver { src: Id::from(1), dst: Id::from(0), msg: Put(1, 'A') },
    Deliver { src: Id::from(0), dst: Id::from(1), msg: PutOk(1) },
    
  2. The server receives the second Put, which it acknowledges. The client receives the PutOk acknowledgement and in turn sends a Get request (not shown yet), expecting to read 'Z'.
    Deliver { src: Id::from(1), dst: Id::from(0), msg: Put(2, 'Z') },
    Deliver { src: Id::from(0), dst: Id::from(1), msg: PutOk(2) },
    
  3. The network redelivers the first write, inadvertently overwriting the second:
    Deliver { src: Id::from(1), dst: Id::from(0), msg: Put(1, 'A') },
    
  4. The server receives the earlier Get request and replies with 'A'. The client receives the unexpected value, which violates linearizability because from the perspective of the client, the system is not behaving as a single-threaded register.
    Deliver { src: Id::from(1), dst: Id::from(0), msg: Get(3) },
    Deliver { src: Id::from(0), dst: Id::from(1), msg: GetOk(3, 'A') },
    

The last bit of code defines the main function, which allows you to run the actor on UDP port 3000, encoding messages in the JSON format.

fn main() {
    env_logger::init_from_env(env_logger::Env::default().default_filter_or("info"));
    spawn(
        serde_json::to_vec,
        |bytes| serde_json::from_slice(bytes),
        vec![
            (SocketAddrV4::new(Ipv4Addr::LOCALHOST, 3000), ServerActor)
        ]).unwrap();
}

Running

Confirm the system behaves as expected by running the test, which should pass because the test asserts that the bug exists. Include the --release flag so that Rust fully optimizes the code even during testing, as Stateright tests are computationally intensive and can be time consuming.

cargo test --release

Now run the actor on a UDP socket.

cargo run --release

If using a POSIX-oriented operating system, netcat can be used to interact with the actor from a different terminal window. Actor responses are omitted from the listing below for clarity, but you will see messages such as {"PutOk":0} printed to STDOUT. Numbers in the messages are request IDs, the importance of which will be more evident in the next chapter.

nc -u localhost 3000
{"Put":[0,"X"]}
{"Get":1}
{"Put":[2,"X"]}
{"Get":3}

Exercise

Uncomment the // TRY IT line, then run the test again. It should fail indicating a sequence of steps that would cause the linearizability expectation to be violated, and these steps may differ from the example that we followed. This exercise demonstrates how Stateright can detect flaws that would likely go undetected when simply reviewing code.

Summary

This chapter introduced one of the simplest possible distributed systems and showed how Stateright can find a subtle bug. The next chapter Taming the Network will address that bug.

Chapter 2: Taming the Network

In the last chapter we discovered a bug caused by the network's susceptibility to message redelivery. We address that in this chapter.

Initialize a new Rust project:

mkdir taming-the-network
cd taming-the-network
cargo init

Then add dependencies to Cargo.toml:

[package]
name = "taming-the-network"
version = "0.1.0"
edition = "2018"

[dependencies]
env_logger = "0.7"
serde_json = "1.0"
stateright = "0.30"

Here is the complete implementation for main.rs, explained below:


use stateright::actor::{*, register::*};
use std::borrow::Cow;
use std::collections::BTreeSet;
use std::net::{SocketAddrV4, Ipv4Addr};

#[derive(Clone)]
struct ServerActor;

type RequestId = u64;

#[derive(Clone, Debug, Hash, PartialEq)]
struct ActorState {
    value: char,
    delivered: BTreeSet<(Id, RequestId)>,
}

impl Actor for ServerActor {
    type Msg = RegisterMsg<RequestId, char, ()>;
    type State = ActorState;
    type Timer = ();

    fn on_start(&self, _id: Id, _o: &mut Out<Self>) -> Self::State {
        ActorState {
            value: '?',
            delivered: Default::default(),
        }
    }

    fn on_msg(&self, _id: Id, state: &mut Cow<Self::State>,
              src: Id, msg: Self::Msg, o: &mut Out<Self>) {
        match msg {
            RegisterMsg::Put(req_id, value) => {
                if state.delivered.contains(&(src, req_id)) { return }

                let mut state = state.to_mut();
                state.value = value;
                state.delivered.insert((src, req_id));
                o.send(src, RegisterMsg::PutOk(req_id));
            }
            RegisterMsg::Get(req_id) => {
                o.send(src, RegisterMsg::GetOk(req_id, state.value));
            }
            _ => {}
        }
    }
}

#[cfg(test)]
mod test {
    use super::*;
    use stateright::{*, semantics::*, semantics::register::*};
    use ActorModelAction::Deliver;
    use RegisterMsg::{Get, GetOk, Put, PutOk};

    #[test]
    fn satisfies_all_properties() {
        // Works with 1 client.
        base_model()
            .actor(RegisterActor::Server(ServerActor))
            .actor(RegisterActor::Client { put_count: 2, server_count: 1 })
            .checker().spawn_dfs().join()
            .assert_properties();

        // Or with multiple clients.
        // (TIP: test with `--release` mode for more clients)
        base_model()
            .actor(RegisterActor::Server(ServerActor))
            .actor(RegisterActor::Client { put_count: 1, server_count: 1 })
            .actor(RegisterActor::Client { put_count: 1, server_count: 1 })
            .checker().spawn_dfs().join()
            .assert_properties();
    }

    #[test]
    fn not_linearizable_with_two_servers() {
        let checker = base_model()
            .actor(RegisterActor::Server(ServerActor))
            .actor(RegisterActor::Server(ServerActor))
            .actor(RegisterActor::Client { put_count: 2, server_count: 2 })
            .checker().spawn_dfs().join();
        //checker.assert_properties(); // TRY IT: Uncomment this line, and the test will fail.
        checker.assert_discovery("linearizable", vec![
            Deliver { src: Id::from(2), dst: Id::from(0), msg: Put(2, 'A') },
            Deliver { src: Id::from(0), dst: Id::from(2), msg: PutOk(2) },
            Deliver { src: Id::from(2), dst: Id::from(1), msg: Put(4, 'Z') },
            Deliver { src: Id::from(1), dst: Id::from(2), msg: PutOk(4) },
            Deliver { src: Id::from(2), dst: Id::from(0), msg: Get(6) },
            Deliver { src: Id::from(0), dst: Id::from(2), msg: GetOk(6, 'A') },
        ]);
    }

    fn base_model()
        -> ActorModel<
            RegisterActor<ServerActor>,
            (),
            LinearizabilityTester<Id, Register<char>>>
    {
        ActorModel::new(
                (),
                LinearizabilityTester::new(Register('?'))
            )
            .property(Expectation::Always, "linearizable", |_, state| {
                state.history.serialized_history().is_some()
            })
            .property(Expectation::Sometimes, "get succeeds", |_, state| {
                state.network.iter_deliverable()
                    .any(|e| matches!(e.msg, RegisterMsg::GetOk(_, _)))
            })
            .property(Expectation::Sometimes, "put succeeds", |_, state| {
                state.network.iter_deliverable()
                    .any(|e| matches!(e.msg, RegisterMsg::PutOk(_)))
            })
            .record_msg_in(RegisterMsg::record_returns)
            .record_msg_out(RegisterMsg::record_invocations)
    }
}

// Running the program spawns a single actor on UDP port 3000. Messages are JSON-serialized.
fn main() {
    env_logger::init_from_env(env_logger::Env::default().default_filter_or("info"));
    spawn(
        serde_json::to_vec,
        |bytes| serde_json::from_slice(bytes),
        vec![
            (SocketAddrV4::new(Ipv4Addr::LOCALHOST, 3000), ServerActor)
        ]).unwrap();
}

Implementation Walkthrough

Addressing the linearizability issue is merely a matter of throwing away message redeliveries. The simplest approach is to record every delivered request ID.

type RequestId = u64;

#[derive(Clone, Debug, Hash, PartialEq)]
struct ActorState {
    value: char,
    delivered: BTreeSet<(Id, RequestId)>,
}

impl Actor for ServerActor {
    type Msg = RegisterMsg<RequestId, char, ()>;
    type State = ActorState;
    type Timer = ();

    fn on_start(&self, _id: Id, _o: &mut Out<Self>) -> Self::State {
        ActorState {
            value: '?',
            delivered: Default::default(),
        }
    }

    fn on_msg(&self, _id: Id, state: &mut Cow<Self::State>,
              src: Id, msg: Self::Msg, o: &mut Out<Self>) {
        match msg {
            RegisterMsg::Put(req_id, value) => {
                if state.delivered.contains(&(src, req_id)) { return }

                let mut state = state.to_mut();
                state.value = value;
                state.delivered.insert((src, req_id));
                o.send(src, RegisterMsg::PutOk(req_id));
            }
            RegisterMsg::Get(req_id) => {
                o.send(src, RegisterMsg::GetOk(req_id, state.value));
            }
            _ => {}
        }
    }
}

With that small change, each server provides an independent linearizable register. In the presence of messages concurrently in flight, the register abstraction is still linearizable (for example, reads cannot observe values overwritten before the read began, among other characteristics).

The servers feature no replication, however, so a collection of servers does not provide a unified service that emulates a linearizable register: the servers will not generally agree on the last value written.

    #[test]
    fn satisfies_all_properties() {
        // Works with 1 client.
        base_model()
            .actor(RegisterActor::Server(ServerActor))
            .actor(RegisterActor::Client { put_count: 2, server_count: 1 })
            .checker().spawn_dfs().join()
            .assert_properties();

        // Or with multiple clients.
        // (TIP: test with `--release` mode for more clients)
        base_model()
            .actor(RegisterActor::Server(ServerActor))
            .actor(RegisterActor::Client { put_count: 1, server_count: 1 })
            .actor(RegisterActor::Client { put_count: 1, server_count: 1 })
            .checker().spawn_dfs().join()
            .assert_properties();
    }

    #[test]
    fn not_linearizable_with_two_servers() {
        let checker = base_model()
            .actor(RegisterActor::Server(ServerActor))
            .actor(RegisterActor::Server(ServerActor))
            .actor(RegisterActor::Client { put_count: 2, server_count: 2 })
            .checker().spawn_dfs().join();
        //checker.assert_properties(); // TRY IT: Uncomment this line, and the test will fail.
        checker.assert_discovery("linearizable", vec![
            Deliver { src: Id::from(2), dst: Id::from(0), msg: Put(2, 'A') },
            Deliver { src: Id::from(0), dst: Id::from(2), msg: PutOk(2) },
            Deliver { src: Id::from(2), dst: Id::from(1), msg: Put(4, 'Z') },
            Deliver { src: Id::from(1), dst: Id::from(2), msg: PutOk(4) },
            Deliver { src: Id::from(2), dst: Id::from(0), msg: Get(6) },
            Deliver { src: Id::from(0), dst: Id::from(2), msg: GetOk(6, 'A') },
        ]);
    }

Those two tests are supported by a helper that sets up the model.

    fn base_model()
        -> ActorModel<
            RegisterActor<ServerActor>,
            (),
            LinearizabilityTester<Id, Register<char>>>
    {
        ActorModel::new(
                (),
                LinearizabilityTester::new(Register('?'))
            )
            .property(Expectation::Always, "linearizable", |_, state| {
                state.history.serialized_history().is_some()
            })
            .property(Expectation::Sometimes, "get succeeds", |_, state| {
                state.network.iter_deliverable()
                    .any(|e| matches!(e.msg, RegisterMsg::GetOk(_, _)))
            })
            .property(Expectation::Sometimes, "put succeeds", |_, state| {
                state.network.iter_deliverable()
                    .any(|e| matches!(e.msg, RegisterMsg::PutOk(_)))
            })
            .record_msg_in(RegisterMsg::record_returns)
            .record_msg_out(RegisterMsg::record_invocations)
    }

Suggested Exercises

  1. Compaction: Storing every request ID isn't viable for a long running process. The simplest approach is to require that request IDs are "monotonic" -- which means they are increasing (and gaps are acceptable). In that case, the delivery handler throws away messages with a request ID smaller than the last handled message. See if you can amend the example accordingly.
  2. Optimized Compaction: Reordered messages will be dropped because a late-delivered message will have a smaller request ID. Protocols need to account for the network dropping messages anyway, so generally speaking this tradeoff only impacts performance. Still, throughput can be improved by adding a "sliding window" buffer on the server side to minimize dropped messages. See if you can implement that.
  3. Lossless Link: One technique for minimizing message loss is to have the client also maintain a buffer of outgoing messages, and the client periodically resends messages that have not been acknowledged by the recipient within a particular timeout period. TCP for example does this for packets. See if you can implement this as well. If you need help, see ordered_reliable_link.rs in the Stateright repository.
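As a hint for the first exercise, the compaction idea can be sketched outside the actor framework: track only the highest request ID handled per client, assuming clients issue monotonically increasing IDs. The `Server` type here is hypothetical and not tied to Stateright:

```rust
use std::collections::BTreeMap;

type RequestId = u64;
type ClientId = u64;

// Hypothetical sketch for exercise 1: instead of remembering every
// delivered request ID, remember only the highest ID handled per client,
// assuming clients issue monotonically increasing request IDs.
struct Server {
    value: char,
    last_handled: BTreeMap<ClientId, RequestId>,
}

impl Server {
    // Returns true if the put was applied (the caller would send PutOk).
    fn on_put(&mut self, src: ClientId, req_id: RequestId, value: char) -> bool {
        // Drop redeliveries and any reordered (older) requests.
        if self.last_handled.get(&src).map_or(false, |&last| req_id <= last) {
            return false;
        }
        self.value = value;
        self.last_handled.insert(src, req_id);
        true
    }
}

fn main() {
    let mut s = Server { value: '?', last_handled: BTreeMap::new() };
    assert!(s.on_put(1, 1, 'A'));
    assert!(!s.on_put(1, 1, 'A')); // redelivery dropped
    assert!(s.on_put(1, 2, 'Z'));
    assert!(!s.on_put(1, 1, 'A')); // stale redelivery dropped
    assert_eq!(s.value, 'Z');
    println!("compaction sketch ok");
}
```

Note the tradeoff this accepts: a reordered (but never-before-delivered) request is also dropped, which is exactly the issue the second exercise's sliding window addresses.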

Summary

This chapter showed how to fix the implementation from the previous chapter, maintaining linearizability even if messages are redelivered. The next chapter Seeking Consensus will introduce the concept of replication, which is used to provide (1) data recoverability in the event of a server crash and/or (2) improved performance for high request rates by distributing requests (such as reads) across a wider range of hosts.

Chapter 3: Seeking Consensus

In the last chapter we fixed a bug caused by the network's susceptibility to message redelivery, but our solution could only run on a single server. Introducing a second server would break linearizability because the system did not replicate information between servers.

In this chapter we introduce a simple replication protocol in an attempt to address that shortcoming.

Once again, we start by initializing a new Rust project:

mkdir seeking-consensus
cd seeking-consensus
cargo init

Next we add dependencies to Cargo.toml. Notice that we now need to include the serde package.

[package]
name = "seeking-consensus"
version = "0.1.0"
edition = "2018"

[dependencies]
env_logger = "0.7"
serde = "1.0"
serde_json = "1.0"
stateright = "0.30"

By now you have the hang of implementing basic actor systems in Stateright, so we'll defer the full main.rs source code listing until later in the chapter.

A Replication Protocol

First we get to decide on a replication protocol. Give this some thought, and see if you have ideas.

Exercise: Yes, really. Take some time to think about how you might add replication.

Done? Great!

For this example, we'll proceed with a protocol that simply involves forwarding the value to every peer server before replying to the client, thereby ensuring the servers agree on the value. This might be the protocol that you envisioned as well.

Implementation Walkthrough

The first notable difference is the need to introduce actor-specific configuration indicating each server's peers.

#[derive(Clone)]
struct ServerActor {
    peers: Vec<Id>,
}

The next notable difference is the need to introduce a message type for replication.

#[derive(Clone, Debug, Hash, Eq, PartialEq)]
#[derive(Deserialize, Serialize)]
enum InternalMsg {
    Replicate(RequestId, char),
    ReplicateOk(RequestId),
}

The server defers sending a PutOk message until replicas reply, but Stateright actors are nonblocking, so they must manage some additional state:

  • the ID of the request, against which replica replies are matched (to guard against late responses),
  • the ID of the client that made the request (to facilitate replying later),
  • and the set of servers that have acknowledged the replicated value (to facilitate waiting until all have replied).

#[derive(Clone, Debug, Hash, PartialEq)]
struct ActorState {
    value: char,
    delivered: BTreeSet<(Id, RequestId)>,
    in_flight_put: Option<PutState>,
}

#[derive(Clone, Debug, Hash, PartialEq)]
struct PutState {
    req_id: RequestId,
    src: Id,
    peer_acks: BTreeSet<Id>,
}

We are now ready to implement the protocol.

    fn on_msg(&self, _id: Id, state: &mut Cow<Self::State>,
              src: Id, msg: Self::Msg, o: &mut Out<Self>) {
        match msg {
            RegisterMsg::Put(req_id, value) if state.in_flight_put.is_none() => {
                if state.delivered.contains(&(src, req_id)) { return }

                let mut state = state.to_mut();
                state.value = value;
                state.delivered.insert((src, req_id));
                state.in_flight_put = Some(PutState {
                    req_id,
                    src,
                    peer_acks: Default::default(),
                });
                for &peer_id in &self.peers {
                    o.send(peer_id,
                           RegisterMsg::Internal(
                               InternalMsg::Replicate(req_id, value)));
                }
                // Will not reply w/ `PutOk` until all replicas ack.
            }
            RegisterMsg::Get(req_id) => {
                o.send(src, RegisterMsg::GetOk(req_id, state.value));
            }
            RegisterMsg::Internal(InternalMsg::Replicate(req_id, value)) => {
                if state.delivered.contains(&(src, req_id)) { return }

                let mut state = state.to_mut();
                state.value = value;
                state.delivered.insert((src, req_id));
                o.send(src,
                       RegisterMsg::Internal(InternalMsg::ReplicateOk(req_id)));
            }
            RegisterMsg::Internal(InternalMsg::ReplicateOk(req_id)) => {
                if state.delivered.contains(&(src, req_id)) { return }

                let mut state = state.to_mut();
                if let Some(put) = &mut state.in_flight_put {
                    if req_id != put.req_id { return }

                    put.peer_acks.insert(src);
                    if put.peer_acks.len() == self.peers.len() {
                        o.send(put.src, RegisterMsg::PutOk(req_id));
                        state.in_flight_put = None;
                    }
                }
            }
            _ => {}
        }
    }

Now the big question: does this protocol solve the problem we ran into last chapter?

Unfortunately, achieving linearizability requires a bit more sophistication, and Stateright identifies a sequence of steps that is not linearizable. The sequence is nontrivial and demonstrates why a model checker is so useful for implementing distributed systems.

    #[test]
    fn appears_linearizable_in_limited_scenarios() {
        // Succeeds if there are 2 clients and 2 servers.
        base_model()
            .actor(RegisterActor::Server(ServerActor {
                peers: Id::vec_from(vec![1]),
            }))
            .actor(RegisterActor::Server(ServerActor {
                peers: Id::vec_from(vec![0]),
            }))
            .actor(RegisterActor::Client { put_count: 1, server_count: 2 })
            .actor(RegisterActor::Client { put_count: 1, server_count: 2 })
            .checker().spawn_dfs().join()
            .assert_properties();
    }

    #[test]
    fn not_generally_linearizable() {
        // Can fail if there are 3 clients.
        let checker = base_model()
            .actor(RegisterActor::Server(ServerActor {
                peers: Id::vec_from(vec![1]),
            }))
            .actor(RegisterActor::Server(ServerActor {
                peers: Id::vec_from(vec![0]),
            }))
            .actor(RegisterActor::Client { put_count: 1, server_count: 2 })
            .actor(RegisterActor::Client { put_count: 1, server_count: 2 })
            .actor(RegisterActor::Client { put_count: 1, server_count: 2 })
            .checker()
            .spawn_dfs().join();       // TRY IT: Comment out this line, and uncomment
            //.serve("0:3000");        //         the next to load Stateright Explorer.
        //checker.assert_properties(); // TRY IT: Uncomment this line, and the test will fail.
        checker.assert_discovery("linearizable", vec![
            Deliver { src: Id::from(4), dst: Id::from(0), msg: Put(4, 'C') },
            Deliver { src: Id::from(0), dst: Id::from(1), msg: Internal(Replicate(4, 'C')) },
            Deliver { src: Id::from(1), dst: Id::from(0), msg: Internal(ReplicateOk(4)) },
            Deliver { src: Id::from(3), dst: Id::from(1), msg: Put(3, 'B') },
            Deliver { src: Id::from(1), dst: Id::from(0), msg: Internal(Replicate(3, 'B')) },
            Deliver { src: Id::from(0), dst: Id::from(1), msg: Internal(ReplicateOk(3)) },
            Deliver { src: Id::from(1), dst: Id::from(3), msg: PutOk(3) },
            Deliver { src: Id::from(2), dst: Id::from(0), msg: Put(2, 'A') },
            Deliver { src: Id::from(3), dst: Id::from(0), msg: Get(6) },
            Deliver { src: Id::from(0), dst: Id::from(3), msg: GetOk(6, 'A') },
            Deliver { src: Id::from(0), dst: Id::from(4), msg: PutOk(4) },
            Deliver { src: Id::from(4), dst: Id::from(1), msg: Get(8) },
            Deliver { src: Id::from(1), dst: Id::from(4), msg: GetOk(8, 'B') },
        ]);
    }

    fn base_model()
        -> ActorModel<
            RegisterActor<ServerActor>,
            (),
            LinearizabilityTester<Id, Register<char>>>
    {
        ActorModel::new(
                (),
                LinearizabilityTester::new(Register('?'))
            )
            .property(Expectation::Always, "linearizable", |_, state| {
                state.history.serialized_history().is_some()
            })
            .property(Expectation::Sometimes, "value chosen", |_, state| {
                state.network.iter_deliverable().any(|e| {
                    if let RegisterMsg::GetOk(_, value) = e.msg {
                        return *value != '?';
                    }
                    return false
                })
            })
            .record_msg_in(RegisterMsg::record_returns)
            .record_msg_out(RegisterMsg::record_invocations)
    }

Stateright Explorer

It's not immediately clear why the sequence of steps identified by Stateright violates linearizability. Luckily Stateright includes a web UI that can help you understand scenarios such as this one.

Stateright Explorer is started by calling serve(...). You can easily do this by following the directions in the first // TRY IT line, which will suspend the test when it is next run, allowing you to load http://localhost:3000 in your web browser to debug.

Tip: Model checking with Stateright Explorer is breadth-first, as that tends to find shorter discovery paths than depth-first search. One downside of this approach is that breadth-first search consumes more memory, so Explorer works best with relatively small state spaces (hundreds of thousands of states rather than millions of states, for example).

When you load Stateright Explorer, you'll see the checker status in the upper left corner. Within a few seconds the checker should add a "linearizable" counterexample to its list of discoveries.

Stateright Explorer on load

Click that link to load the discovery.

Stateright Explorer after clicking the link

The first thing you might notice is the sequence diagram.

sequence diagram for the linearizability violation

Tracing backwards from the last event, we can see why linearizability is violated:

  1. GetOk(8, 'B') indicates that 'B' is the earliest write finishing before the read. Also, no operations are concurrent with the read.
  2. PutOk(4) was in response to the long-running Put(4, 'C') operation, indicating that the value 'C' must have been written at some point between invocation and response. Unlike the read, the precise sequencing of this write in relation to other operations is indeterminate due to concurrency.
  3. GetOk(6, 'A') indicates that 'A' is the earliest write finishing before that read. Only the write of 'C' is concurrent with the start and end of the read, so it's possible that the write took effect before or after the read took effect.

We don't have to trace any further back, as those observations above highlight the anomaly: 'A' had been the most recent write, then 'C' may or may not have been written next (as the concurrency allows different linearizations), and finally 'B' was read. No linearization of concurrent operations can reconcile this anomaly, so the protocol is not linearizable. QED.

Complete Implementation

Here is the complete implementation for main.rs:


use serde::{Deserialize, Serialize};
use stateright::actor::{*, register::*};
use std::borrow::Cow;
use std::collections::BTreeSet;
use std::net::{SocketAddrV4, Ipv4Addr};

#[derive(Clone)]
struct ServerActor {
    peers: Vec<Id>,
}

type RequestId = u64;

#[derive(Clone, Debug, Hash, Eq, PartialEq)]
#[derive(Deserialize, Serialize)]
enum InternalMsg {
    Replicate(RequestId, char),
    ReplicateOk(RequestId),
}

#[derive(Clone, Debug, Hash, PartialEq)]
struct ActorState {
    value: char,
    delivered: BTreeSet<(Id, RequestId)>,
    in_flight_put: Option<PutState>,
}

#[derive(Clone, Debug, Hash, PartialEq)]
struct PutState {
    req_id: RequestId,
    src: Id,
    peer_acks: BTreeSet<Id>,
}

impl Actor for ServerActor {
    type Msg = RegisterMsg<RequestId, char, InternalMsg>;
    type State = ActorState;
    type Timer = ();

    fn on_start(&self, _id: Id, _o: &mut Out<Self>) -> Self::State {
        ActorState {
            value: '?',
            delivered: Default::default(),
            in_flight_put: None,
        }
    }

    fn on_msg(&self, _id: Id, state: &mut Cow<Self::State>,
              src: Id, msg: Self::Msg, o: &mut Out<Self>) {
        match msg {
            RegisterMsg::Put(req_id, value) if state.in_flight_put.is_none() => {
                if state.delivered.contains(&(src, req_id)) { return }

                let mut state = state.to_mut();
                state.value = value;
                state.delivered.insert((src, req_id));
                state.in_flight_put = Some(PutState {
                    req_id,
                    src,
                    peer_acks: Default::default(),
                });
                for &peer_id in &self.peers {
                    o.send(peer_id,
                           RegisterMsg::Internal(
                               InternalMsg::Replicate(req_id, value)));
                }
                // Will not reply w/ `PutOk` until all replicas ack.
            }
            RegisterMsg::Get(req_id) => {
                o.send(src, RegisterMsg::GetOk(req_id, state.value));
            }
            RegisterMsg::Internal(InternalMsg::Replicate(req_id, value)) => {
                if state.delivered.contains(&(src, req_id)) { return }

                let mut state = state.to_mut();
                state.value = value;
                state.delivered.insert((src, req_id));
                o.send(src,
                       RegisterMsg::Internal(InternalMsg::ReplicateOk(req_id)));
            }
            RegisterMsg::Internal(InternalMsg::ReplicateOk(req_id)) => {
                if state.delivered.contains(&(src, req_id)) { return }

                let mut state = state.to_mut();
                if let Some(put) = &mut state.in_flight_put {
                    if req_id != put.req_id { return }

                    put.peer_acks.insert(src);
                    if put.peer_acks.len() == self.peers.len() {
                        o.send(put.src, RegisterMsg::PutOk(req_id));
                        state.in_flight_put = None;
                    }
                }
            }
            _ => {}
        }
    }
}

#[cfg(test)]
mod test {
    use super::*;
    use stateright::{*, semantics::*, semantics::register::*};
    use ActorModelAction::Deliver;
    use InternalMsg::{Replicate, ReplicateOk};
    use RegisterMsg::{Get, GetOk, Internal, Put, PutOk};

    #[test]
    fn appears_linearizable_in_limited_scenarios() {
        // Succeeds if there are 2 clients and 2 servers.
        base_model()
            .actor(RegisterActor::Server(ServerActor {
                peers: Id::vec_from(vec![1]),
            }))
            .actor(RegisterActor::Server(ServerActor {
                peers: Id::vec_from(vec![0]),
            }))
            .actor(RegisterActor::Client { put_count: 1, server_count: 2 })
            .actor(RegisterActor::Client { put_count: 1, server_count: 2 })
            .checker().spawn_dfs().join()
            .assert_properties();
    }

    #[test]
    fn not_generally_linearizable() {
        // Can fail if there are 3 clients.
        let checker = base_model()
            .actor(RegisterActor::Server(ServerActor {
                peers: Id::vec_from(vec![1]),
            }))
            .actor(RegisterActor::Server(ServerActor {
                peers: Id::vec_from(vec![0]),
            }))
            .actor(RegisterActor::Client { put_count: 1, server_count: 2 })
            .actor(RegisterActor::Client { put_count: 1, server_count: 2 })
            .actor(RegisterActor::Client { put_count: 1, server_count: 2 })
            .checker()
            .spawn_dfs().join();       // TRY IT: Comment out this line, and uncomment
            //.serve("0:3000");        //         the next to load Stateright Explorer.
        //checker.assert_properties(); // TRY IT: Uncomment this line, and the test will fail.
        checker.assert_discovery("linearizable", vec![
            Deliver { src: Id::from(4), dst: Id::from(0), msg: Put(4, 'C') },
            Deliver { src: Id::from(0), dst: Id::from(1), msg: Internal(Replicate(4, 'C')) },
            Deliver { src: Id::from(1), dst: Id::from(0), msg: Internal(ReplicateOk(4)) },
            Deliver { src: Id::from(3), dst: Id::from(1), msg: Put(3, 'B') },
            Deliver { src: Id::from(1), dst: Id::from(0), msg: Internal(Replicate(3, 'B')) },
            Deliver { src: Id::from(0), dst: Id::from(1), msg: Internal(ReplicateOk(3)) },
            Deliver { src: Id::from(1), dst: Id::from(3), msg: PutOk(3) },
            Deliver { src: Id::from(2), dst: Id::from(0), msg: Put(2, 'A') },
            Deliver { src: Id::from(3), dst: Id::from(0), msg: Get(6) },
            Deliver { src: Id::from(0), dst: Id::from(3), msg: GetOk(6, 'A') },
            Deliver { src: Id::from(0), dst: Id::from(4), msg: PutOk(4) },
            Deliver { src: Id::from(4), dst: Id::from(1), msg: Get(8) },
            Deliver { src: Id::from(1), dst: Id::from(4), msg: GetOk(8, 'B') },
        ]);
    }

    fn base_model()
        -> ActorModel<
            RegisterActor<ServerActor>,
            (),
            LinearizabilityTester<Id, Register<char>>>
    {
        ActorModel::new(
                (),
                LinearizabilityTester::new(Register('?'))
            )
            .property(Expectation::Always, "linearizable", |_, state| {
                state.history.serialized_history().is_some()
            })
            .property(Expectation::Sometimes, "value chosen", |_, state| {
                state.network.iter_deliverable().any(|e| {
                    if let RegisterMsg::GetOk(_, value) = e.msg {
                        return *value != '?';
                    }
                    return false
                })
            })
            .record_msg_in(RegisterMsg::record_returns)
            .record_msg_out(RegisterMsg::record_invocations)
    }
}

// Running the program spawns actors on UDP ports 3000-3002. Messages are JSON-serialized.
fn main() {
    env_logger::init_from_env(env_logger::Env::default().default_filter_or("info"));
    let id0 = Id::from(SocketAddrV4::new(Ipv4Addr::LOCALHOST, 3000));
    let id1 = Id::from(SocketAddrV4::new(Ipv4Addr::LOCALHOST, 3001));
    let id2 = Id::from(SocketAddrV4::new(Ipv4Addr::LOCALHOST, 3002));
    spawn(
        serde_json::to_vec,
        |bytes| serde_json::from_slice(bytes),
        vec![
            (id0, ServerActor { peers: vec![id1, id2].into_iter().collect() } ),
            (id1, ServerActor { peers: vec![id0, id2].into_iter().collect() } ),
            (id2, ServerActor { peers: vec![id0, id1].into_iter().collect() } ),
        ]).unwrap();
}

Suggested Exercise

Uncomment the second // TRY IT line to cause the test to fail, delete the linearizability counterexample assertion since it may no longer apply, and see if you can amend the actor implementation to make the test pass. The next chapter will provide a solution, but going through the exercise of trying to design a solution yourself will help convey how subtle distributed protocol bugs can be, and hopefully it will demonstrate why Stateright is so useful for this problem space.

Reminder: use cargo test --release when running the tests for dramatically better model checking performance. Running tests without that flag may result in significant delays. Also, try using Stateright Explorer for debugging as needed.

Summary

This chapter introduced replication, and Stateright was able to find a bug in our replication protocol. The next chapter, Achieving Linearizability, introduces a more sophisticated protocol that makes the replicated register linearizable.

Chapter 4: Achieving Linearizability

In the last chapter we implemented a faulty replication protocol that violated linearizability because the emulated register was not atomic. This chapter introduces a more sophisticated replication protocol that the author first learned about when reading the blog post "Replicated/Fault-tolerant atomic storage" by Murat Demirbas.

As usual, we start by initializing a new Rust project:

mkdir achieving-linearizability
cd achieving-linearizability
cargo init

Then we define dependencies.

[package]
name = "achieving-linearizability"
version = "0.1.0"
edition = "2018"

[dependencies]
env_logger = "0.7"
num_cpus = "1"
serde = "1.0"
serde_json = "1.0"
stateright = "0.30"

A Truly Atomic Register

One problem with the earlier protocol is that reads can observe different values depending on which server accepts the read. We need to amend the protocol to ensure that given the same aggregate system state, any two reads are guaranteed to observe the same value. The simplest solution is to query every server when servicing a read operation, but that introduces yet another problem: availability. By forcing reads to query every server, if a single server becomes unavailable, the entire system becomes unavailable. This problem also exists for writes in that replication protocol.

The trick to solving this new problem is observing that we only need to ensure that "read sets" and "write sets" overlap one another. Any two majorities of servers share at least one member, so if a value has been written to a majority of servers, then any majority queried by a read must include at least one server holding that value. We call a set of servers sufficient to service a read a "read quorum" and a set sufficient to service a write a "write quorum."
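The pigeonhole argument behind quorum overlap can be checked directly. This sketch defines a `majority` helper (matching the helper the later implementation calls) and a `quorums_must_overlap` check, which is an illustrative function of our own, not part of Stateright:

```rust
/// Smallest number of servers constituting a majority of `count`.
fn majority(count: usize) -> usize {
    count / 2 + 1
}

/// Pigeonhole check: two quorums of size `majority(n)` drawn from `n`
/// servers together occupy `2 * majority(n) > n` slots, so at least
/// one server must appear in both quorums.
fn quorums_must_overlap(n: usize) -> bool {
    2 * majority(n) > n
}
```

Note that `majority(4)` is 3, not 2: with an even server count, half the servers do not form a quorum, because two disjoint halves could then each "agree" on different values.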

Leveraging read and write quorums solves the availability problem when a minority of servers are unreachable, but we still have a second availability problem: in many cases a read set will not agree on a value. This can happen if the read is concurrent with a write or if two earlier writes were concurrent. In those cases, the server accepting the read must either ignore the request or return an error, upon which we can improve.

To solve this remaining problem, we simply need to force the system into agreement, thereby ensuring any subsequent read either observes the forced agreement or a subsequent write. This is the technique employed by Attiya, Bar-Noy, and Dolev in their paper "Sharing Memory Robustly in Message-Passing Systems".

Both read and write operations are handled in two phases: (1) a query phase followed by (2) a replication phase.

  • 1. Query Phase: In the query phase, a server finds out what replicated values a quorum of replicas has previously accepted. The replicas also return a logical clock that serves to sequence earlier write operations, so the implementation will refer to this as a sequencer.
  • 2a. Write Replication Phase: If servicing a write, then upon receiving responses from a quorum, the server replicates the chosen value along with a slightly larger sequencer than the one observed. Once a majority of the replicas ack this second phase, the server can indicate to the client that the write is complete.
  • 2b. Read Replication Phase: If servicing a read, then upon receiving responses from a quorum, the server replicates the value with the largest observed sequencer. Once a majority of the replicas ack this second phase, the server can return the value that it replicated.
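The implementation below defines the sequencer as a `(LogicalClock, Id)` pair. Rust tuples compare lexicographically, so a bumped clock always dominates, and the writer's ID deterministically breaks ties between concurrent writers with equal clocks. A small sketch of that ordering, using `u64` in place of Stateright's `Id` and a hypothetical `next_seq` helper:

```rust
type LogicalClock = u64;
type ServerId = u64; // stand-in for stateright's actor `Id`
type Seq = (LogicalClock, ServerId);

/// A writer that observed `max_seen` during the query phase picks a
/// strictly larger sequencer by bumping the clock and attaching its
/// own ID, mirroring the `seq = (seq.0 + 1, id)` step below.
fn next_seq(max_seen: Seq, my_id: ServerId) -> Seq {
    (max_seen.0 + 1, my_id)
}
```

Because every write produces a sequencer strictly greater than anything it observed in its quorum, replicas can safely keep only the value with the largest sequencer.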

Implementation Walkthrough

We first define our message type. Abd... in the type name refers to the surnames of the algorithm's original authors -- Attiya, Bar-Noy, and Dolev -- abbreviated "ABD."

type RequestId = u64;
type Value = char;

#[derive(Clone, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)]
#[derive(Serialize, Deserialize)]
enum AbdMsg {
    Query(RequestId),
    AckQuery(RequestId, Seq, Value),
    Replicate(RequestId, Seq, Value),
    AckReplicate(RequestId),
}

The current implementation combines the roles of "replica" and "coordinator that accepts a request and facilitates replication." seq and val are for the first role, while phase tracks information needed by the second role.

Of particular note is that for phase 1, writes need to remember what value to later replicate, which is stored in write. Conversely for phase 2, reads need to remember what value to later return to the client, which is stored in read.

#[derive(Clone, Debug, Eq, Hash, PartialEq)]
struct AbdState {
    seq: Seq,
    val: Value,
    phase: Option<AbdPhase>,
}

#[derive(Clone, Debug, Eq, Hash, PartialEq)]
enum AbdPhase {
    Phase1 {
        request_id: RequestId,
        requester_id: Id,
        write: Option<Value>, // `None` for read
        responses: BTreeMap<Id, (Seq, Value)>,
    },
    Phase2 {
        request_id: RequestId,
        requester_id: Id,
        read: Option<Value>, // `None` for write
        acks: BTreeSet<Id>,
    },
}

type Seq = (LogicalClock, Id); // `Id` for uniqueness
type LogicalClock = u64;

We are now ready to implement the protocol. Note that the implementation intentionally avoids decomposing the message handlers into different function calls because each call needs to manage shared actor state. Spreading state mutation across multiple locations in the source code arguably makes the implementation harder to follow (much like mutable global variables), and functions are only used where they can be reasoned about locally.

An alternative composition strategy that often works well for actor systems involves distinguishing roles so that each has less state (having distinct AbdServer and AbdReplica actors, for example), although we will not do that for this short example. The next chapter, on the other hand, will demonstrate how to compose a system of different actor types.

#[derive(Clone)]
struct AbdActor {
    peers: Vec<Id>,
}

impl Actor for AbdActor {
    type Msg = RegisterMsg<RequestId, Value, AbdMsg>;
    type State = AbdState;
    type Timer = ();

    fn on_start(&self, _id: Id, _o: &mut Out<Self>) -> Self::State {
        AbdState {
            seq: (0, Id::from(0)),
            val: '?',
            phase: None,
        }
    }

    fn on_msg(&self, id: Id, state: &mut Cow<Self::State>,
              src: Id, msg: Self::Msg, o: &mut Out<Self>) {
        use RegisterMsg::*;
        match msg {
            Put(req_id, val) if state.phase.is_none() => {
                o.broadcast(&self.peers, &Internal(Query(req_id)));
                state.to_mut().phase = Some(AbdPhase::Phase1 {
                    request_id: req_id,
                    requester_id: src,
                    write: Some(val),
                    responses: {
                        let mut responses = BTreeMap::default();
                        responses.insert(id, (state.seq, state.val.clone()));
                        responses
                    },
                });
            }
            Get(req_id) if state.phase.is_none() => {
                o.broadcast(&self.peers, &Internal(Query(req_id)));
                state.to_mut().phase = Some(AbdPhase::Phase1 {
                    request_id: req_id,
                    requester_id: src,
                    write: None,
                    responses: {
                        let mut responses = BTreeMap::default();
                        responses.insert(id, (state.seq, state.val.clone()));
                        responses
                    },
                });
            }
            Internal(Query(req_id)) => {
                o.send(
                    src,
                    Internal(AckQuery(req_id, state.seq, state.val.clone())));
            }
            Internal(AckQuery(expected_req_id, seq, val))
                if matches!(state.phase,
                            Some(AbdPhase::Phase1 { request_id, .. })
                            if request_id == expected_req_id) =>
            {
                let mut state = state.to_mut();
                if let Some(AbdPhase::Phase1 {
                    request_id: req_id,
                    requester_id: requester,
                    write,
                    responses,
                    ..
                }) = &mut state.phase {
                    responses.insert(src, (seq, val));
                    if responses.len() == majority(self.peers.len() + 1) {
                        // Quorum reached. Move to phase 2.

                        // Determine sequencer and value.
                        let (_, (seq, val)) = responses.into_iter()
                            .max_by_key(|(_, (seq, _))| *seq)
                            .unwrap();
                        let mut seq = *seq;
                        let mut read = None;
                        let val = if let Some(val) = std::mem::take(write) {
                            seq = (seq.0 + 1, id);
                            val
                        } else {
                            read = Some(val.clone());
                            val.clone()
                        };

                        o.broadcast(
                            &self.peers,
                            &Internal(Replicate(*req_id, seq, val.clone())));

                        // Self-send `Replicate`.
                        if seq > state.seq {
                            state.seq = seq;
                            state.val = val;
                        }

                        // Self-send `AckReplicate`.
                        let mut acks = BTreeSet::default();
                        acks.insert(id);

                        state.phase = Some(AbdPhase::Phase2 {
                            request_id: *req_id,
                            requester_id: std::mem::take(requester),
                            read,
                            acks,
                        });
                    }
                }
            }
            Internal(Replicate(req_id, seq, val)) => {
                o.send(src, Internal(AckReplicate(req_id)));
                if seq > state.seq {
                    let mut state = state.to_mut();
                    state.seq = seq;
                    state.val = val;
                }
            }
            Internal(AckReplicate(expected_req_id))
                if matches!(state.phase,
                            Some(AbdPhase::Phase2 { request_id, ref acks, .. })
                            if request_id == expected_req_id && !acks.contains(&src)) =>
            {
                let mut state = state.to_mut();
                if let Some(AbdPhase::Phase2 {
                    request_id: req_id,
                    requester_id: requester,
                    read,
                    acks,
                    ..
                }) = &mut state.phase {
                    acks.insert(src);
                    if acks.len() == majority(self.peers.len() + 1) {
                        let msg = if let Some(val) = read {
                            GetOk(*req_id, std::mem::take(val))
                        } else {
                            PutOk(*req_id)
                        };
                        o.send(*requester, msg);
                        state.phase = None;
                    }
                }
            }
            _ => {}
        }
    }
}

The test case confirms that this implementation is linearizable provided that the network does not redeliver messages. Performing thorough model checking may take up to several minutes (depending on your system's performance) and will only be performed with a --release build.

    #[test]
    fn is_linearizable_quick() {
        let checker = base_model()
            .actor(RegisterActor::Server(AbdActor {
                peers: Id::vec_from(vec![1]),
            }))
            .actor(RegisterActor::Server(AbdActor {
                peers: Id::vec_from(vec![0]),
            }))
            .actor(RegisterActor::Client { put_count: 1, server_count: 2 })
            .actor(RegisterActor::Client { put_count: 1, server_count: 2 })
            .checker().threads(num_cpus::get()).spawn_dfs().join();
        checker.assert_properties();
        assert_eq!(checker.unique_state_count(), 544);
    }

    #[test]
    #[cfg_attr(debug_assertions, ignore = "enabled for --release only")]
    fn is_linearizable() {
        let checker = base_model()
            .actor(RegisterActor::Server(AbdActor {
                peers: Id::vec_from(vec![1, 2]),
            }))
            .actor(RegisterActor::Server(AbdActor {
                peers: Id::vec_from(vec![0, 2]),
            }))
            .actor(RegisterActor::Server(AbdActor {
                peers: Id::vec_from(vec![0, 1]),
            }))
            .actor(RegisterActor::Client { put_count: 1, server_count: 2 })
            .actor(RegisterActor::Client { put_count: 1, server_count: 2 })
            .checker().threads(num_cpus::get()).spawn_dfs().join();
        checker.assert_properties();
        assert_eq!(checker.unique_state_count(), 37_168_889);
    }

    fn base_model()
        -> ActorModel<
            RegisterActor<AbdActor>,
            (),
            LinearizabilityTester<Id, Register<char>>>
    {
        ActorModel::new(
                (),
                LinearizabilityTester::new(Register('?'))
            )
            .init_network(Network::new_unordered_nonduplicating([]))
            .property(Expectation::Always, "linearizable", |_, state| {
                state.history.serialized_history().is_some()
            })
            .property(Expectation::Sometimes, "value chosen", |_, state| {
                state.network.iter_deliverable().any(|e| {
                    if let RegisterMsg::GetOk(_, value) = e.msg {
                        return *value != '?';
                    }
                    return false
                })
            })
            .record_msg_in(RegisterMsg::record_returns)
            .record_msg_out(RegisterMsg::record_invocations)
    }

Complete Implementation

Here is the complete implementation for main.rs:

use serde::{Deserialize, Serialize};
use stateright::actor::{*, register::*};
use std::borrow::Cow;
use std::collections::{BTreeMap, BTreeSet};
use std::fmt::Debug;
use std::hash::Hash;
use std::net::{SocketAddrV4, Ipv4Addr};

type RequestId = u64;
type Value = char;

#[derive(Clone, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)]
#[derive(Serialize, Deserialize)]
enum AbdMsg {
    Query(RequestId),
    AckQuery(RequestId, Seq, Value),
    Replicate(RequestId, Seq, Value),
    AckReplicate(RequestId),
}
use AbdMsg::*;

#[derive(Clone, Debug, Eq, Hash, PartialEq)]
struct AbdState {
    seq: Seq,
    val: Value,
    phase: Option<AbdPhase>,
}

#[derive(Clone, Debug, Eq, Hash, PartialEq)]
enum AbdPhase {
    Phase1 {
        request_id: RequestId,
        requester_id: Id,
        write: Option<Value>, // `None` for read
        responses: BTreeMap<Id, (Seq, Value)>,
    },
    Phase2 {
        request_id: RequestId,
        requester_id: Id,
        read: Option<Value>, // `None` for write
        acks: BTreeSet<Id>,
    },
}

type Seq = (LogicalClock, Id); // `Id` for uniqueness
type LogicalClock = u64;

#[derive(Clone)]
struct AbdActor {
    peers: Vec<Id>,
}

impl Actor for AbdActor {
    type Msg = RegisterMsg<RequestId, Value, AbdMsg>;
    type State = AbdState;
    type Timer = ();

    fn on_start(&self, _id: Id, _o: &mut Out<Self>) -> Self::State {
        AbdState {
            seq: (0, Id::from(0)),
            val: '?',
            phase: None,
        }
    }

    fn on_msg(&self, id: Id, state: &mut Cow<Self::State>,
              src: Id, msg: Self::Msg, o: &mut Out<Self>) {
        use RegisterMsg::*;
        match msg {
            Put(req_id, val) if state.phase.is_none() => {
                o.broadcast(&self.peers, &Internal(Query(req_id)));
                state.to_mut().phase = Some(AbdPhase::Phase1 {
                    request_id: req_id,
                    requester_id: src,
                    write: Some(val),
                    responses: {
                        let mut responses = BTreeMap::default();
                        responses.insert(id, (state.seq, state.val.clone()));
                        responses
                    },
                });
            }
            Get(req_id) if state.phase.is_none() => {
                o.broadcast(&self.peers, &Internal(Query(req_id)));
                state.to_mut().phase = Some(AbdPhase::Phase1 {
                    request_id: req_id,
                    requester_id: src,
                    write: None,
                    responses: {
                        let mut responses = BTreeMap::default();
                        responses.insert(id, (state.seq, state.val.clone()));
                        responses
                    },
                });
            }
            Internal(Query(req_id)) => {
                o.send(
                    src,
                    Internal(AckQuery(req_id, state.seq, state.val.clone())));
            }
            Internal(AckQuery(expected_req_id, seq, val))
                if matches!(state.phase,
                            Some(AbdPhase::Phase1 { request_id, .. })
                            if request_id == expected_req_id) =>
            {
                let mut state = state.to_mut();
                if let Some(AbdPhase::Phase1 {
                    request_id: req_id,
                    requester_id: requester,
                    write,
                    responses,
                    ..
                }) = &mut state.phase {
                    responses.insert(src, (seq, val));
                    if responses.len() == majority(self.peers.len() + 1) {
                        // Quorum reached. Move to phase 2.

                        // Determine the latest sequence number and value.
                        let (_, (seq, val)) = responses.into_iter()
                            .max_by_key(|(_, (seq, _))| *seq)
                            .unwrap();
                        let mut seq = *seq;
                        let mut read = None;
                        let val = if let Some(val) = std::mem::take(write) {
                            seq = (seq.0 + 1, id);
                            val
                        } else {
                            read = Some(val.clone());
                            val.clone()
                        };

                        o.broadcast(
                            &self.peers,
                            &Internal(Replicate(*req_id, seq, val.clone())));

                        // Self-send `Replicate`.
                        if seq > state.seq {
                            state.seq = seq;
                            state.val = val;
                        }

                        // Self-send `AckReplicate`.
                        let mut acks = BTreeSet::default();
                        acks.insert(id);

                        state.phase = Some(AbdPhase::Phase2 {
                            request_id: *req_id,
                            requester_id: std::mem::take(requester),
                            read,
                            acks,
                        });
                    }
                }
            }
            Internal(Replicate(req_id, seq, val)) => {
                o.send(src, Internal(AckReplicate(req_id)));
                if seq > state.seq {
                    let mut state = state.to_mut();
                    state.seq = seq;
                    state.val = val;
                }
            }
            Internal(AckReplicate(expected_req_id))
                if matches!(state.phase,
                            Some(AbdPhase::Phase2 { request_id, ref acks, .. })
                            if request_id == expected_req_id && !acks.contains(&src)) =>
            {
                let mut state = state.to_mut();
                if let Some(AbdPhase::Phase2 {
                    request_id: req_id,
                    requester_id: requester,
                    read,
                    acks,
                    ..
                }) = &mut state.phase {
                    acks.insert(src);
                    if acks.len() == majority(self.peers.len() + 1) {
                        let msg = if let Some(val) = read {
                            GetOk(*req_id, std::mem::take(val))
                        } else {
                            PutOk(*req_id)
                        };
                        o.send(*requester, msg);
                        state.phase = None;
                    }
                }
            }
            _ => {}
        }
    }
}

#[cfg(test)]
mod test {
    use super::*;
    use stateright::{*, semantics::*, semantics::register::*};

    #[test]
    fn is_linearizable_quick() {
        let checker = base_model()
            .actor(RegisterActor::Server(AbdActor {
                peers: Id::vec_from(vec![1]),
            }))
            .actor(RegisterActor::Server(AbdActor {
                peers: Id::vec_from(vec![0]),
            }))
            .actor(RegisterActor::Client { put_count: 1, server_count: 2 })
            .actor(RegisterActor::Client { put_count: 1, server_count: 2 })
            .checker().threads(num_cpus::get()).spawn_dfs().join();
        checker.assert_properties();
        assert_eq!(checker.unique_state_count(), 544);
    }

    #[test]
    #[cfg_attr(debug_assertions, ignore = "enabled for --release only")]
    fn is_linearizable() {
        let checker = base_model()
            .actor(RegisterActor::Server(AbdActor {
                peers: Id::vec_from(vec![1, 2]),
            }))
            .actor(RegisterActor::Server(AbdActor {
                peers: Id::vec_from(vec![0, 2]),
            }))
            .actor(RegisterActor::Server(AbdActor {
                peers: Id::vec_from(vec![0, 1]),
            }))
            .actor(RegisterActor::Client { put_count: 1, server_count: 2 })
            .actor(RegisterActor::Client { put_count: 1, server_count: 2 })
            .checker().threads(num_cpus::get()).spawn_dfs().join();
        checker.assert_properties();
        assert_eq!(checker.unique_state_count(), 37_168_889);
    }

    fn base_model()
        -> ActorModel<
            RegisterActor<AbdActor>,
            (),
            LinearizabilityTester<Id, Register<char>>>
    {
        ActorModel::new(
                (),
                LinearizabilityTester::new(Register('?'))
            )
            .init_network(Network::new_unordered_nonduplicating([]))
            .property(Expectation::Always, "linearizable", |_, state| {
                state.history.serialized_history().is_some()
            })
            .property(Expectation::Sometimes, "value chosen", |_, state| {
                state.network.iter_deliverable().any(|e| {
                    if let RegisterMsg::GetOk(_, value) = e.msg {
                        return *value != '?';
                    }
                    return false
                })
            })
            .record_msg_in(RegisterMsg::record_returns)
            .record_msg_out(RegisterMsg::record_invocations)
    }
}

fn main() {
    env_logger::init_from_env(
        env_logger::Env::default().default_filter_or("info"));
    let id0 = Id::from(SocketAddrV4::new(Ipv4Addr::LOCALHOST, 3000));
    let id1 = Id::from(SocketAddrV4::new(Ipv4Addr::LOCALHOST, 3001));
    let id2 = Id::from(SocketAddrV4::new(Ipv4Addr::LOCALHOST, 3002));
    spawn(
        serde_json::to_vec,
        |bytes| serde_json::from_slice(bytes),
        vec![
            (id0, AbdActor { peers: vec![id1, id2] }),
            (id1, AbdActor { peers: vec![id0, id2] }),
            (id2, AbdActor { peers: vec![id0, id1] }),
        ]).unwrap();
}

Suggested Exercises

  1. This algorithm can be optimized by observing that the replication phase need not replicate the value if the quorum already agrees on it. See if you can implement this optimization.

  2. More generally, replication messages only need to be sent to replicas that disagree. See if you can implement this optimization as well.

    TIP: This optimization is a bit more complex because the "all agree" and "not all agree" cases must be handled differently to avoid dropping requests in some cases. Can you see why?

  3. The current implementation also assumes that the network does not redeliver messages. Revise it to account for potential message redelivery.

Summary

This chapter provided the first taste of a real distributed algorithm. We were able to incrementally infer a solution, but it was nontrivial. If there were bugs in the code, they could be relatively difficult to identify without a model checker.

In the next chapter, we will introduce the notion of "consensus" and implement it via the Multi-Paxos algorithm. That chapter is not yet available, so in the meantime you can learn more about Stateright by browsing additional Stateright examples and reviewing the Stateright API docs. If you are familiar with TLA+, then the subsequent chapter Comparison with TLA+ may also be interesting to you.

If you have any questions, comments, or ideas, please share them on Stateright's Discord server. At this time Stateright is a small personal project, and the main author is eager to hear community feedback.

Comparison with TLA+

The previous part of this book focused on model checking runnable actor systems. Stateright is also able to model check higher level designs via "abstract models," much like a traditional model checker. This chapter compares two abstract models of the two-phase commit protocol: one written in a language called TLA+ and the other written in Rust. The chapter exists to assist those who are already familiar with TLA+, so feel free to skip if you are not familiar with TLA+.

Attribution

The TLA+ model comes from the paper "Consensus on transaction commit" by Jim Gray and Leslie Lamport and is used in accordance with the ACM's Software Copyright Notice. It has been adapted slightly for this book. Here are the copyright details from the paper:

Copyright 2005 by the Association for Computing Machinery, Inc. Permission to make digital or hard copies of part or all of this work for personal or classroom use is granted without fee provided that copies are not made or distributed for profit or commercial advantage and that copies bear this notice and the full citation on the first page. Copyrights for components of this work owned by others than ACM must be honored. Abstracting with credit is permitted. To copy otherwise, to republish, to post on servers, or to redistribute to lists, requires prior specific permission and/or a fee. Request permissions from Publications Dept, ACM Inc., fax +1 (212) 869-0481, or permissions@acm.org.

Citation:

Jim Gray and Leslie Lamport. 2006. Consensus on transaction commit. ACM Trans. Database Syst. 31, 1 (March 2006), 133–160. DOI:https://doi.org/10.1145/1132863.1132867

Lecture 6 of Leslie Lamport's TLA+ Video Course is recommended for an additional overview of the specification.

Unique Benefits of Each

Before getting to the code, it is worth highlighting that while TLC (the model checker for TLA+) and Stateright overlap in functionality to some degree, each has unique benefits. Enumerating some of these can help in deciding when to use each solution.

Unique benefits of TLC/TLA+:

  • Brevity: TLA+ is more concise than Rust.
  • State Reduction: TLC supports symmetry reduction.
  • Features: TLC supports arbitrarily complex temporal properties including fairness.
  • Features: TLC supports refinement mapping between models.

Unique benefits of Stateright/Rust:

  • Additional Verification: With Stateright, your model and final implementation are encouraged to share code. This eliminates a possible failure mode whereby a model and its resulting production system implementation deviate.
  • Reuse and Extensibility: Rust has a far larger library ecosystem and is arguably more amenable to reuse than TLA+. For example, Stateright's code for defining a system's operational semantics is not built into the model checker and could be provided as an external library. The same can be said of the included register semantics, linearizability tester, and actor model, to name a few other examples. More generally, the entire Rust crate registry (with tens of thousands of libraries) is at your disposal. In contrast, the pattern for reuse in TLA+ is copy-paste-modify, and the number of reusable modules is relatively small.
  • Checker Performance: Stateright tends to be faster and offers additional optimization possibilities, such as replacing a set with a bit vector. While TLC allows you to override modules with Java implementations, doing so is relatively cumbersome and rarely done.
  • Final Implementation Performance: As a more auxiliary benefit, Stateright can serve as a stress test for your final implementation, identifying regressions and also facilitating performance investigation.
  • Features: Stateright offers "sometimes" properties that serve to sanity check that expected outcomes are possible. These are less powerful than temporal properties but serve a slightly different purpose: examples of these properties being met are included among the checker's discoveries. You can simulate them in TLC by introducing deliberately false "invariants," but those need to be commented out and run periodically, which is more cumbersome.
  • Features: Stateright Explorer allows you to interactively browse a model's state space and also lets you jump between discoveries (whether they are property violations or instances of "sometimes" properties).

With those out of the way, let's move on to the code comparison.

Code Comparison

Both models are parameterized by a collection of "resource managers." The TLA+ spec does not specify a type, but a set is expected. The Stateright spec maps each resource manager to an integer in the range 0..N (0 to N-1 inclusive).

CONSTANT RM    
type R = usize; // RM in 0..N

#[derive(Clone)]
struct TwoPhaseSys { pub rms: Range<R> }

Next we define variables. These are global in the TLA+ spec, and their type constraints are indicated later in the spec via an invariant. In Stateright these have a statically defined type.

VARIABLES
  rmState,
  tmState,
  tmPrepared,
  msgs           
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
struct TwoPhaseState {
    rm_state: Vec<RmState>,
    tm_state: TmState,
    tm_prepared: Vec<bool>,
    msgs: BTreeSet<Message>,
}

Types in the TLA+ spec are conveyed via an invariant that is passed to its model checker, TLC.

Message ==
  [type : {"Prepared"}, rm : RM]  \cup  [type : {"Commit", "Abort"}]
   
TypeOK ==  
  /\ rmState \in [RM -> {"working", "prepared", "committed", "aborted"}]
  /\ tmState \in {"init", "committed", "aborted"}
  /\ tmPrepared \subseteq RM
  /\ msgs \subseteq Message
#[derive(Clone, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)]
enum Message { Prepared { rm: R }, Commit, Abort }

#[derive(Clone, Debug, Eq, Hash, PartialEq)]
enum RmState { Working, Prepared, Committed, Aborted }

#[derive(Clone, Debug, Eq, Hash, PartialEq)]
enum TmState { Init, Committed, Aborted }

TLA+ leverages temporal logic to convey the specification, while Stateright requires that a trait is implemented. One other distinct aspect is that Stateright actions are also types.

Spec == Init
     /\ [][Next]_<<rmState, tmState, tmPrepared, msgs>>
#[derive(Clone, Debug)]
enum Action {
    TmRcvPrepared(R),
    TmCommit,
    TmAbort,
    RmPrepare(R),
    RmChooseToAbort(R),
    RmRcvCommitMsg(R),
    RmRcvAbortMsg(R),
}

impl Model for TwoPhaseSys {
    type State = TwoPhaseState;
    type Action = Action;
    ...
}

The initial aggregate state is one where

  • the resource managers are ready to start a transaction;
  • the sole transaction manager is ready to start a transaction;
  • the transaction manager has not yet recorded any resource manager as prepared;
  • and the network is an empty set.

Note that set semantics provide an ideal model for a network in this case because they capture the fact that networks rarely make guarantees about message order. Those guarantees must be imposed by additional protocols. And keep in mind that often those protocols only provide guarantees under limited scopes (e.g. TCP only orders messages within the lifetime of that connection, so a transient network partition can still cause message redelivery with TCP).

Init ==   
  /\ rmState = [rm \in RM |-> "working"]
  /\ tmState = "init"
  /\ tmPrepared   = {}
  /\ msgs = {}
fn init_states(&self) -> Vec<Self::State> {
    vec![TwoPhaseState {
        rm_state: self.rms.clone().map(|_| RmState::Working).collect(),
        tm_state: TmState::Init,
        tm_prepared: self.rms.clone().map(|_| false).collect(),
        msgs: Default::default(),
    }]
}

Now we get to the most interesting part of the model, the state transitions (which Stateright calls actions). TLA+ requires each transition relation to precede the aggregate next state relation (Next in this case). Each action serves two roles: (1) it defines its own preconditions and (2) it defines the subsequent state change (along with the unchanged states). In Stateright it is more idiomatic (and performant) to distinguish between the preconditions (in fn actions...) and the state change (in fn next_state...).

TMCommit ==
  /\ tmState = "init"
  /\ tmPrepared = RM
  /\ tmState' = "committed"
  /\ msgs' = msgs \cup {[type |-> "Commit"]}
  /\ UNCHANGED <<rmState, tmPrepared>>

TMAbort ==
  /\ tmState = "init"
  /\ tmState' = "aborted"
  /\ msgs' = msgs \cup {[type |-> "Abort"]}
  /\ UNCHANGED <<rmState, tmPrepared>>

TMRcvPrepared(rm) ==
  /\ tmState = "init"
  /\ [type |-> "Prepared", rm |-> rm] \in msgs
  /\ tmPrepared' = tmPrepared \cup {rm}
  /\ UNCHANGED <<rmState, tmState, msgs>>

RMPrepare(rm) == 
  /\ rmState[rm] = "working"
  /\ rmState' = [rmState EXCEPT ![rm] = "prepared"]
  /\ msgs' = msgs \cup {[type |-> "Prepared", rm |-> rm]}
  /\ UNCHANGED <<tmState, tmPrepared>>
  
RMChooseToAbort(rm) ==
  /\ rmState[rm] = "working"
  /\ rmState' = [rmState EXCEPT ![rm] = "aborted"]
  /\ UNCHANGED <<tmState, tmPrepared, msgs>>

RMRcvCommitMsg(rm) ==
  /\ [type |-> "Commit"] \in msgs
  /\ rmState' = [rmState EXCEPT ![rm] = "committed"]
  /\ UNCHANGED <<tmState, tmPrepared, msgs>>

RMRcvAbortMsg(rm) ==
  /\ [type |-> "Abort"] \in msgs
  /\ rmState' = [rmState EXCEPT ![rm] = "aborted"]
  /\ UNCHANGED <<tmState, tmPrepared, msgs>>

Next ==
  \/ TMCommit \/ TMAbort
  \/ \E rm \in RM : 
       TMRcvPrepared(rm) \/ RMPrepare(rm) \/ RMChooseToAbort(rm)
         \/ RMRcvCommitMsg(rm) \/ RMRcvAbortMsg(rm)
fn actions(&self, state: &Self::State, actions: &mut Vec<Self::Action>) {
    if state.tm_state == TmState::Init
            && state.tm_prepared.iter().all(|p| *p) {
        actions.push(Action::TmCommit);
    }
    if state.tm_state == TmState::Init {
        actions.push(Action::TmAbort);
    }
    for rm in self.rms.clone() {
        if state.tm_state == TmState::Init
                && state.msgs.contains(&Message::Prepared { rm }) {
            actions.push(Action::TmRcvPrepared(rm));
        }
        if state.rm_state.get(rm) == Some(&RmState::Working) {
            actions.push(Action::RmPrepare(rm));
        }
        if state.rm_state.get(rm) == Some(&RmState::Working) {
            actions.push(Action::RmChooseToAbort(rm));
        }
        if state.msgs.contains(&Message::Commit) {
            actions.push(Action::RmRcvCommitMsg(rm));
        }
        if state.msgs.contains(&Message::Abort) {
            actions.push(Action::RmRcvAbortMsg(rm));
        }
    }
}

fn next_state(&self, last_state: &Self::State, action: Self::Action)
        -> Option<Self::State> {
    let mut state = last_state.clone();
    match action {
        Action::TmRcvPrepared(rm) => {
            state.tm_prepared[rm] = true;
        }
        Action::TmCommit => {
            state.tm_state = TmState::Committed;
            state.msgs.insert(Message::Commit);
        }
        Action::TmAbort => {
            state.tm_state = TmState::Aborted;
            state.msgs.insert(Message::Abort);
        },
        Action::RmPrepare(rm) => {
            state.rm_state[rm] = RmState::Prepared;
            state.msgs.insert(Message::Prepared { rm });
        },
        Action::RmChooseToAbort(rm) => {
            state.rm_state[rm] = RmState::Aborted;
        }
        Action::RmRcvCommitMsg(rm) => {
            state.rm_state[rm] = RmState::Committed;
        }
        Action::RmRcvAbortMsg(rm) => {
            state.rm_state[rm] = RmState::Aborted;
        }
    }
    Some(state)
}

Then we get to the sole property: if a resource manager reaches a final commit/abort state, then no other resource manager can disagree with that decision.

Consistent ==
  \A rm1, rm2 \in RM : ~ /\ rmState[rm1] = "aborted"
                         /\ rmState[rm2] = "committed"
fn properties(&self) -> Vec<Property<Self>> {
    vec![
        Property::<Self>::always("consistent", |_, state| {
           !state.rm_state.iter().any(|s1|
                state.rm_state.iter().any(|s2|
                    s1 == &RmState::Aborted && s2 == &RmState::Committed))
        }),
    ]
}

Performance Comparison

Now we need to configure the model. For TLC, this is done via a special "CFG" file, while for Stateright you simply introduce a Rust test. Rust also requires a Cargo.toml file.

SPECIFICATION Spec
INVARIANT TypeOK
INVARIANT Consistent
CONSTANTS
  RM = {0,1,2,3,4,5,6}
#[cfg(test)]
#[test]
fn can_model_2pc() {
    use stateright::Checker;
    TwoPhaseSys { rms: 0..7 }.checker()
        .threads(num_cpus::get()).spawn_dfs().join()
        .assert_properties();
}
[package]
name = "comparison-with-tlaplus"
version = "0.1.0"
edition = "2018"

[dependencies]
num_cpus = "1"
stateright = "0.30"

TLA+ can be run from the command line using a tool such as tla-bin or tlacli, while the Stateright test is run using cargo test --release. The example below first calls cargo build --tests --release so that dependency compilation is excluded from the timing, then touches a source file so that the timed run still includes the compilation step relevant to the development iteration cycle.

$ tlc -workers auto TwoPhase.tla  
$ export RUSTFLAGS='-C target-cpu=native'  
$ cargo build --tests --release
$ touch src/lib.rs
$ time cargo test --release

Stateright is generally faster, and the speedup tends to increase with larger state spaces. A comparison of model checking times on the author's laptop follows.

plot of checking time (y) against state count (x)

  # RM    States        TLC      Stateright   Speedup
     7       296,448    3 s      1.697 s      1.8X
     8     1,745,408    12 s     2.566 s      4.7X
     9    10,340,352    90 s     8.902 s      10.1X
    10    61,515,776    674 s    54.709 s     12.3X