Chapter 2: Taming the Network

In the last chapter we discovered a bug caused by the network's susceptibility to message redelivery. We address that in this chapter.

Initialize a new Rust project:

mkdir taming-the-network
cd taming-the-network
cargo init

Then add dependencies to Cargo.toml:

[package]
name = "taming-the-network"
version = "0.1.0"
edition = "2018"

[dependencies]
env_logger = "0.7"
serde_json = "1.0"
stateright = "0.30"

Here is the complete implementation for main.rs, explained below:


use stateright::actor::{*, register::*};
use std::borrow::Cow;
use std::collections::BTreeSet;
use std::net::{SocketAddrV4, Ipv4Addr};

#[derive(Clone)]
struct ServerActor;

type RequestId = u64;

#[derive(Clone, Debug, Hash, PartialEq)]
struct ActorState {
    value: char,
    delivered: BTreeSet<(Id, RequestId)>,
}

impl Actor for ServerActor {
    type Msg = RegisterMsg<RequestId, char, ()>;
    type State = ActorState;
    type Timer = ();

    fn on_start(&self, _id: Id, _o: &mut Out<Self>) -> Self::State {
        ActorState {
            value: '?',
            delivered: Default::default(),
        }
    }

    fn on_msg(&self, _id: Id, state: &mut Cow<Self::State>,
              src: Id, msg: Self::Msg, o: &mut Out<Self>) {
        match msg {
            RegisterMsg::Put(req_id, value) => {
                if state.delivered.contains(&(src, req_id)) { return }

                let mut state = state.to_mut();
                state.value = value;
                state.delivered.insert((src, req_id));
                o.send(src, RegisterMsg::PutOk(req_id));
            }
            RegisterMsg::Get(req_id) => {
                o.send(src, RegisterMsg::GetOk(req_id, state.value));
            }
            _ => {}
        }
    }
}

#[cfg(test)]
mod test {
    use super::*;
    use stateright::{*, semantics::*, semantics::register::*};
    use ActorModelAction::Deliver;
    use RegisterMsg::{Get, GetOk, Put, PutOk};

    #[test]
    fn satisfies_all_properties() {
        // Works with 1 client.
        base_model()
            .actor(RegisterActor::Server(ServerActor))
            .actor(RegisterActor::Client { put_count: 2, server_count: 1 })
            .checker().spawn_dfs().join()
            .assert_properties();

        // Or with multiple clients.
        // (TIP: test with `--release` mode for more clients)
        base_model()
            .actor(RegisterActor::Server(ServerActor))
            .actor(RegisterActor::Client { put_count: 1, server_count: 1 })
            .actor(RegisterActor::Client { put_count: 1, server_count: 1 })
            .checker().spawn_dfs().join()
            .assert_properties();
    }

    #[test]
    fn not_linearizable_with_two_servers() {
        let checker = base_model()
            .actor(RegisterActor::Server(ServerActor))
            .actor(RegisterActor::Server(ServerActor))
            .actor(RegisterActor::Client { put_count: 2, server_count: 2 })
            .checker().spawn_dfs().join();
        //checker.assert_properties(); // TRY IT: Uncomment this line, and the test will fail.
        checker.assert_discovery("linearizable", vec![
            Deliver { src: Id::from(2), dst: Id::from(0), msg: Put(2, 'A') },
            Deliver { src: Id::from(0), dst: Id::from(2), msg: PutOk(2) },
            Deliver { src: Id::from(2), dst: Id::from(1), msg: Put(4, 'Z') },
            Deliver { src: Id::from(1), dst: Id::from(2), msg: PutOk(4) },
            Deliver { src: Id::from(2), dst: Id::from(0), msg: Get(6) },
            Deliver { src: Id::from(0), dst: Id::from(2), msg: GetOk(6, 'A') },
        ]);
    }

    fn base_model()
        -> ActorModel<
            RegisterActor<ServerActor>,
            (),
            LinearizabilityTester<Id, Register<char>>>
    {
        ActorModel::new(
                (),
                LinearizabilityTester::new(Register('?'))
            )
            .property(Expectation::Always, "linearizable", |_, state| {
                state.history.serialized_history().is_some()
            })
            .property(Expectation::Sometimes, "get succeeds", |_, state| {
                state.network.iter_deliverable()
                    .any(|e| matches!(e.msg, RegisterMsg::GetOk(_, _)))
            })
            .property(Expectation::Sometimes, "put succeeds", |_, state| {
                state.network.iter_deliverable()
                    .any(|e| matches!(e.msg, RegisterMsg::PutOk(_)))
            })
            .record_msg_in(RegisterMsg::record_returns)
            .record_msg_out(RegisterMsg::record_invocations)
    }
}

// Running the program spawns a single actor on UDP port 3000. Messages are JSON-serialized.
fn main() {
    env_logger::init_from_env(env_logger::Env::default().default_filter_or("info"));
    spawn(
        serde_json::to_vec,
        |bytes| serde_json::from_slice(bytes),
        vec![
            (SocketAddrV4::new(Ipv4Addr::LOCALHOST, 3000), ServerActor)
        ]).unwrap();
}

Implementation Walkthrough

Addressing the linearizability issue is merely a matter of throwing away message redeliveries. The simplest approach is to record every delivered request ID.

type RequestId = u64;

#[derive(Clone, Debug, Hash, PartialEq)]
struct ActorState {
    value: char,
    delivered: BTreeSet<(Id, RequestId)>,
}

impl Actor for ServerActor {
    type Msg = RegisterMsg<RequestId, char, ()>;
    type State = ActorState;
    type Timer = ();

    fn on_start(&self, _id: Id, _o: &mut Out<Self>) -> Self::State {
        ActorState {
            value: '?',
            delivered: Default::default(),
        }
    }

    fn on_msg(&self, _id: Id, state: &mut Cow<Self::State>,
              src: Id, msg: Self::Msg, o: &mut Out<Self>) {
        match msg {
            RegisterMsg::Put(req_id, value) => {
                if state.delivered.contains(&(src, req_id)) { return }

                let mut state = state.to_mut();
                state.value = value;
                state.delivered.insert((src, req_id));
                o.send(src, RegisterMsg::PutOk(req_id));
            }
            RegisterMsg::Get(req_id) => {
                o.send(src, RegisterMsg::GetOk(req_id, state.value));
            }
            _ => {}
        }
    }
}

With that small change, each server provides an independent linearizable register. In the presence of messages concurrently in flight, the register abstraction is still linearizable (for example, reads cannot observe values overwritten before the read began, among other characteristics).

The servers feature no replication, so a collection of servers does not provide a unified service that emulates a linearizable register as the servers will not generally agree upon the last value they received.

    #[test]
    fn satisfies_all_properties() {
        // Works with 1 client.
        base_model()
            .actor(RegisterActor::Server(ServerActor))
            .actor(RegisterActor::Client { put_count: 2, server_count: 1 })
            .checker().spawn_dfs().join()
            .assert_properties();

        // Or with multiple clients.
        // (TIP: test with `--release` mode for more clients)
        base_model()
            .actor(RegisterActor::Server(ServerActor))
            .actor(RegisterActor::Client { put_count: 1, server_count: 1 })
            .actor(RegisterActor::Client { put_count: 1, server_count: 1 })
            .checker().spawn_dfs().join()
            .assert_properties();
    }

    #[test]
    fn not_linearizable_with_two_servers() {
        let checker = base_model()
            .actor(RegisterActor::Server(ServerActor))
            .actor(RegisterActor::Server(ServerActor))
            .actor(RegisterActor::Client { put_count: 2, server_count: 2 })
            .checker().spawn_dfs().join();
        //checker.assert_properties(); // TRY IT: Uncomment this line, and the test will fail.
        checker.assert_discovery("linearizable", vec![
            Deliver { src: Id::from(2), dst: Id::from(0), msg: Put(2, 'A') },
            Deliver { src: Id::from(0), dst: Id::from(2), msg: PutOk(2) },
            Deliver { src: Id::from(2), dst: Id::from(1), msg: Put(4, 'Z') },
            Deliver { src: Id::from(1), dst: Id::from(2), msg: PutOk(4) },
            Deliver { src: Id::from(2), dst: Id::from(0), msg: Get(6) },
            Deliver { src: Id::from(0), dst: Id::from(2), msg: GetOk(6, 'A') },
        ]);
    }

Those two tests are supported by a helper that sets up the model.

    fn base_model()
        -> ActorModel<
            RegisterActor<ServerActor>,
            (),
            LinearizabilityTester<Id, Register<char>>>
    {
        ActorModel::new(
                (),
                LinearizabilityTester::new(Register('?'))
            )
            .property(Expectation::Always, "linearizable", |_, state| {
                state.history.serialized_history().is_some()
            })
            .property(Expectation::Sometimes, "get succeeds", |_, state| {
                state.network.iter_deliverable()
                    .any(|e| matches!(e.msg, RegisterMsg::GetOk(_, _)))
            })
            .property(Expectation::Sometimes, "put succeeds", |_, state| {
                state.network.iter_deliverable()
                    .any(|e| matches!(e.msg, RegisterMsg::PutOk(_)))
            })
            .record_msg_in(RegisterMsg::record_returns)
            .record_msg_out(RegisterMsg::record_invocations)
    }

Suggested Exercises

  1. Compaction: Storing every request ID isn't viable for a long running process. The simplest approach is to require that request IDs are "monotonic" -- which means they are increasing (and gaps are acceptable). In that case, the delivery handler throws away messages with a request ID smaller than the last handled message. See if you can amend the example accordingly.
  2. Optimized Compaction: Reordered messages will be dropped because late delivered message will have a smaller request ID. Protocols need to account for the network dropping messages anyway, so generally speaking this tradeoff only impacts performance. Still, throughput can be improved by adding a "sliding window" buffer on the server side to minimize dropped messages. See if you can implement that.
  3. Lossless Link: One technique for minimizing message loss is to have the client also maintain a buffer of outgoing messages, and the client periodically resends messages that have not been acknowledged by the recipient within a particular timeout period. TCP for example does this for packets. See if you can implement this as well. If you need help, see ordered_reliable_link.rs in the Stateright repository.

Summary

This chapter showed how to fix the implementation from the previous chapter, maintaining linearizability even if messages are redelivered. The next chapter Seeking Consensus will introduce the concept of replication, which is used to provide (1) data recoverability in the event of a server crash and/or (2) improved performance for high request rates by distributing requests (such as reads) across a wider range of hosts.