Chapter 4: Achieving Linearizability

In the last chapter we implemented a faulty replication protocol that violated linearizability because the emulated register was not atomic. This chapter introduces a more sophisticated replication protocol that the author first learned about when reading the blog post "Replicated/Fault-tolerant atomic storage" by Murat Demirbas.

As usual, we start by initializing a new Rust project:

mkdir achieving-linearizability
cd achieving-linearizability
cargo init

Then we define dependencies in Cargo.toml:

[package]
name = "achieving-linearizability"
version = "0.1.0"
edition = "2018"

[dependencies]
env_logger = "0.7"
num_cpus = "1"
serde = "1.0"
serde_json = "1.0"
stateright = "0.30"

A Truly Atomic Register

One problem with the earlier protocol is that reads can observe different values depending on which server accepts the read. We need to amend the protocol to ensure that given the same aggregate system state, any two reads are guaranteed to observe the same value. The simplest solution is to query every server when servicing a read operation, but that introduces yet another problem: availability. By forcing reads to query every server, the entire system becomes unavailable as soon as a single server becomes unavailable. The same availability problem already applies to writes in that earlier protocol.

The trick to solving this new problem is observing that we only need to ensure that "read sets" and "write sets" overlap one another. Any two majorities of the same set of servers must share at least one server, so if a write is acknowledged by a majority, then any read that queries a majority is guaranteed to contact at least one server that accepted that write. We call the set of sufficient reads a "read quorum" and the set of sufficient writes a "write quorum."
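
The following standalone sketch (not part of the protocol code) spells out the counting argument; this majority function mirrors the helper that Stateright provides and that the implementation below uses:

fn majority(count: usize) -> usize {
    count / 2 + 1
}

fn main() {
    for server_count in 1..=9 {
        let quorum = majority(server_count);
        // Two disjoint quorums would require 2 * quorum distinct servers,
        // exceeding the cluster size, so any two quorums must overlap.
        assert!(2 * quorum > server_count);
    }
    println!("any two majority quorums share at least one server");
}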

Leveraging read and write quorums solves the availability problem when a minority of servers are unreachable, but a second availability problem remains: in many cases a read quorum will not agree on a single value. This can happen if the read is concurrent with a write or if two earlier writes were concurrent. In those cases, the server accepting the read must either ignore the request or return an error, which is behavior we can improve upon.

To solve this remaining problem, we simply need to force the system into agreement, thereby ensuring any subsequent read either observes the forced agreement or a subsequent write. This is the technique employed by Attiya, Bar-Noy, and Dolev in their paper "Sharing Memory Robustly in Message-Passing Systems".

Both read and write operations are handled in two phases: (1) a query phase followed by (2) a replication phase.

  1. Query Phase: In the query phase, a server finds out which replicated values a quorum of replicas has previously accepted. The replicas also return a logical clock that serves to sequence earlier write operations, so the implementation will refer to this as a sequencer.
  2a. Write Replication Phase: If servicing a write, then upon receiving responses from a quorum, the server replicates the chosen value along with a sequencer slightly larger than the largest one observed. Once a majority of the replicas ack this second phase, the server can indicate to the client that the write is complete.
  2b. Read Replication Phase: If servicing a read, then upon receiving responses from a quorum, the server replicates the value having the largest observed sequencer. Once a majority of the replicas ack this second phase, the server can return the value that it replicated.

Implementation Walkthrough

We first define our message type. The Abd prefix in the type names refers to the algorithm's original authors -- Attiya, Bar-Noy, and Dolev -- or "ABD."

type RequestId = u64;
type Value = char;

#[derive(Clone, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)]
#[derive(Serialize, Deserialize)]
enum AbdMsg {
    Query(RequestId),
    AckQuery(RequestId, Seq, Value),
    Replicate(RequestId, Seq, Value),
    AckReplicate(RequestId),
}
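
To make the upcoming phases concrete, here is a hypothetical trace of a write coordinated by server 0 with peers 1 and 2, assuming every replica starts with sequencer (0, 0) and value '?'. Replies from server 2, which arrive after each quorum is already reached, are elided:

client -> 0: Put(1, 'A')
0 -> 1: Query(1)
0 -> 2: Query(1)
1 -> 0: AckQuery(1, (0, 0), '?')    quorum: server 0's own state plus this reply
0 -> 1: Replicate(1, (1, 0), 'A')
0 -> 2: Replicate(1, (1, 0), 'A')
1 -> 0: AckReplicate(1)             quorum: server 0's self-ack plus this ack
0 -> client: PutOk(1)

The write increments the largest observed logical clock from 0 to 1 and tags it with the coordinating server's id, yielding the sequencer (1, 0) that accompanies the replicated value.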

The current implementation combines the roles of "replica" and "coordinator that accepts a request and facilitates replication." seq and val are for the first role, while phase tracks information needed by the second role.

Of particular note is that for phase 1, writes need to remember what value to later replicate, which is stored in write. Conversely for phase 2, reads need to remember what value to later return to the client, which is stored in read.

#[derive(Clone, Debug, Eq, Hash, PartialEq)]
struct AbdState {
    seq: Seq,
    val: Value,
    phase: Option<AbdPhase>,
}

#[derive(Clone, Debug, Eq, Hash, PartialEq)]
enum AbdPhase {
    Phase1 {
        request_id: RequestId,
        requester_id: Id,
        write: Option<Value>, // `None` for read
        responses: BTreeMap<Id, (Seq, Value)>,
    },
    Phase2 {
        request_id: RequestId,
        requester_id: Id,
        read: Option<Value>, // `None` for write
        acks: BTreeSet<Id>,
    },
}

type Seq = (LogicalClock, Id); // `Id` for uniqueness
type LogicalClock = u64;
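
Because Rust orders tuples lexicographically, a larger logical clock always wins a sequencer comparison, and ties between writes that chose the same clock are broken by the writer's id. A quick standalone check, substituting u64 for Stateright's Id type:

fn main() {
    type Seq = (u64, u64); // (logical clock, id), with u64 standing in for `Id`

    let a: Seq = (3, 0);
    let b: Seq = (3, 1); // same clock, higher id
    let c: Seq = (4, 0); // higher clock

    assert!(b > a); // ties on the clock are broken by the id
    assert!(c > b); // a larger clock wins regardless of the id
}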

We are now ready to implement the protocol. Note that the implementation intentionally avoids decomposing the message handlers into different function calls because each call needs to manage shared actor state. Spreading state mutation across multiple locations in the source code arguably makes the implementation harder to follow (much like mutable global variables), and functions are only used where they can be reasoned about locally.

An alternative composition strategy that often works well for actor systems involves distinguishing roles so that each has less state (having different AbdServer and AbdReplica actors, for example), although we will not do that for this short example. The next chapter, on the other hand, will demonstrate how to compose a system of different actor types.

#[derive(Clone)]
struct AbdActor {
    peers: Vec<Id>,
}

impl Actor for AbdActor {
    type Msg = RegisterMsg<RequestId, Value, AbdMsg>;
    type State = AbdState;
    type Timer = ();

    fn on_start(&self, _id: Id, _o: &mut Out<Self>) -> Self::State {
        AbdState {
            seq: (0, Id::from(0)),
            val: '?',
            phase: None,
        }
    }

    fn on_msg(&self, id: Id, state: &mut Cow<Self::State>,
              src: Id, msg: Self::Msg, o: &mut Out<Self>) {
        use RegisterMsg::*;
        match msg {
            Put(req_id, val) if state.phase.is_none() => {
                o.broadcast(&self.peers, &Internal(Query(req_id)));
                state.to_mut().phase = Some(AbdPhase::Phase1 {
                    request_id: req_id,
                    requester_id: src,
                    write: Some(val),
                    responses: {
                        let mut responses = BTreeMap::default();
                        responses.insert(id, (state.seq, state.val.clone()));
                        responses
                    },
                });
            }
            Get(req_id) if state.phase.is_none() => {
                o.broadcast(&self.peers, &Internal(Query(req_id)));
                state.to_mut().phase = Some(AbdPhase::Phase1 {
                    request_id: req_id,
                    requester_id: src,
                    write: None,
                    responses: {
                        let mut responses = BTreeMap::default();
                        responses.insert(id, (state.seq, state.val.clone()));
                        responses
                    },
                });
            }
            Internal(Query(req_id)) => {
                o.send(
                    src,
                    Internal(AckQuery(req_id, state.seq, state.val.clone())));
            }
            Internal(AckQuery(expected_req_id, seq, val))
                if matches!(state.phase,
                            Some(AbdPhase::Phase1 { request_id, .. })
                            if request_id == expected_req_id) =>
            {
                let mut state = state.to_mut();
                if let Some(AbdPhase::Phase1 {
                    request_id: req_id,
                    requester_id: requester,
                    write,
                    responses,
                    ..
                }) = &mut state.phase {
                    responses.insert(src, (seq, val));
                    if responses.len() == majority(self.peers.len() + 1) {
                        // Quorum reached. Move to phase 2.

                        // Determine sequencer and value.
                        let (_, (seq, val)) = responses.into_iter()
                            .max_by_key(|(_, (seq, _))| *seq)
                            .unwrap();
                        let mut seq = *seq;
                        let mut read = None;
                        let val = if let Some(val) = std::mem::take(write) {
                            seq = (seq.0 + 1, id);
                            val
                        } else {
                            read = Some(val.clone());
                            val.clone()
                        };

                        o.broadcast(
                            &self.peers,
                            &Internal(Replicate(*req_id, seq, val.clone())));

                        // Apply `Replicate` locally rather than sending to self.
                        if seq > state.seq {
                            state.seq = seq;
                            state.val = val;
                        }

                        // Record our own `AckReplicate` rather than sending to self.
                        let mut acks = BTreeSet::default();
                        acks.insert(id);

                        state.phase = Some(AbdPhase::Phase2 {
                            request_id: *req_id,
                            requester_id: std::mem::take(requester),
                            read,
                            acks,
                        });
                    }
                }
            }
            Internal(Replicate(req_id, seq, val)) => {
                o.send(src, Internal(AckReplicate(req_id)));
                if seq > state.seq {
                    let mut state = state.to_mut();
                    state.seq = seq;
                    state.val = val;
                }
            }
            Internal(AckReplicate(expected_req_id))
                if matches!(state.phase,
                            Some(AbdPhase::Phase2 { request_id, ref acks, .. })
                            if request_id == expected_req_id && !acks.contains(&src)) =>
            {
                let mut state = state.to_mut();
                if let Some(AbdPhase::Phase2 {
                    request_id: req_id,
                    requester_id: requester,
                    read,
                    acks,
                    ..
                }) = &mut state.phase {
                    acks.insert(src);
                    if acks.len() == majority(self.peers.len() + 1) {
                        let msg = if let Some(val) = read {
                            GetOk(*req_id, std::mem::take(val))
                        } else {
                            PutOk(*req_id)
                        };
                        o.send(*requester, msg);
                        state.phase = None;
                    }
                }
            }
            _ => {}
        }
    }
}
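
One detail worth noting before the tests: each handler receives its state as a Cow (clone-on-write) smart pointer, which lets Stateright avoid cloning the state for messages that cause no mutation; to_mut clones the underlying value only upon the first mutation. Here is a minimal standard-library sketch of the same pattern:

use std::borrow::Cow;

fn main() {
    let original = String::from("shared state");
    let mut state: Cow<String> = Cow::Borrowed(&original);

    // Reads go through the borrowed value; nothing has been cloned.
    assert!(matches!(state, Cow::Borrowed(_)));

    // `to_mut` clones the underlying value upon first mutation only.
    state.to_mut().push_str(", now owned");
    assert!(matches!(state, Cow::Owned(_)));
}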

The test cases confirm that this implementation is linearizable provided that the network does not redeliver messages. Thorough model checking may take up to several minutes depending on your system's performance, so the larger test is only enabled for a --release build.

    #[test]
    fn is_linearizable_quick() {
        let checker = base_model()
            .actor(RegisterActor::Server(AbdActor {
                peers: Id::vec_from(vec![1]),
            }))
            .actor(RegisterActor::Server(AbdActor {
                peers: Id::vec_from(vec![0]),
            }))
            .actor(RegisterActor::Client { put_count: 1, server_count: 2 })
            .actor(RegisterActor::Client { put_count: 1, server_count: 2 })
            .checker().threads(num_cpus::get()).spawn_dfs().join();
        checker.assert_properties();
        assert_eq!(checker.unique_state_count(), 544);
    }

    #[test]
    #[cfg_attr(debug_assertions, ignore = "enabled for --release only")]
    fn is_linearizable() {
        let checker = base_model()
            .actor(RegisterActor::Server(AbdActor {
                peers: Id::vec_from(vec![1, 2]),
            }))
            .actor(RegisterActor::Server(AbdActor {
                peers: Id::vec_from(vec![0, 2]),
            }))
            .actor(RegisterActor::Server(AbdActor {
                peers: Id::vec_from(vec![0, 1]),
            }))
            .actor(RegisterActor::Client { put_count: 1, server_count: 2 })
            .actor(RegisterActor::Client { put_count: 1, server_count: 2 })
            .checker().threads(num_cpus::get()).spawn_dfs().join();
        checker.assert_properties();
        assert_eq!(checker.unique_state_count(), 37_168_889);
    }

    fn base_model()
        -> ActorModel<
            RegisterActor<AbdActor>,
            (),
            LinearizabilityTester<Id, Register<char>>>
    {
        ActorModel::new(
                (),
                LinearizabilityTester::new(Register('?'))
            )
            .init_network(Network::new_unordered_nonduplicating([]))
            .property(Expectation::Always, "linearizable", |_, state| {
                state.history.serialized_history().is_some()
            })
            .property(Expectation::Sometimes, "value chosen", |_, state| {
                state.network.iter_deliverable().any(|e| {
                    if let RegisterMsg::GetOk(_, value) = e.msg {
                        *value != '?'
                    } else {
                        false
                    }
                })
            })
            .record_msg_in(RegisterMsg::record_returns)
            .record_msg_out(RegisterMsg::record_invocations)
    }
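
The quick test runs in debug builds, while the thorough one is only enabled for optimized builds:

cargo test
cargo test --release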
    }

Complete Implementation

Here is the complete implementation for main.rs:

use serde::{Deserialize, Serialize};
use stateright::actor::{*, register::*};
use std::borrow::Cow;
use std::collections::{BTreeMap, BTreeSet};
use std::fmt::Debug;
use std::hash::Hash;
use std::net::{SocketAddrV4, Ipv4Addr};

type RequestId = u64;
type Value = char;

#[derive(Clone, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)]
#[derive(Serialize, Deserialize)]
enum AbdMsg {
    Query(RequestId),
    AckQuery(RequestId, Seq, Value),
    Replicate(RequestId, Seq, Value),
    AckReplicate(RequestId),
}
use AbdMsg::*;

#[derive(Clone, Debug, Eq, Hash, PartialEq)]
struct AbdState {
    seq: Seq,
    val: Value,
    phase: Option<AbdPhase>,
}

#[derive(Clone, Debug, Eq, Hash, PartialEq)]
enum AbdPhase {
    Phase1 {
        request_id: RequestId,
        requester_id: Id,
        write: Option<Value>, // `None` for read
        responses: BTreeMap<Id, (Seq, Value)>,
    },
    Phase2 {
        request_id: RequestId,
        requester_id: Id,
        read: Option<Value>, // `None` for write
        acks: BTreeSet<Id>,
    },
}

type Seq = (LogicalClock, Id); // `Id` for uniqueness
type LogicalClock = u64;

#[derive(Clone)]
struct AbdActor {
    peers: Vec<Id>,
}

impl Actor for AbdActor {
    type Msg = RegisterMsg<RequestId, Value, AbdMsg>;
    type State = AbdState;
    type Timer = ();

    fn on_start(&self, _id: Id, _o: &mut Out<Self>) -> Self::State {
        AbdState {
            seq: (0, Id::from(0)),
            val: '?',
            phase: None,
        }
    }

    fn on_msg(&self, id: Id, state: &mut Cow<Self::State>,
              src: Id, msg: Self::Msg, o: &mut Out<Self>) {
        use RegisterMsg::*;
        match msg {
            Put(req_id, val) if state.phase.is_none() => {
                o.broadcast(&self.peers, &Internal(Query(req_id)));
                state.to_mut().phase = Some(AbdPhase::Phase1 {
                    request_id: req_id,
                    requester_id: src,
                    write: Some(val),
                    responses: {
                        let mut responses = BTreeMap::default();
                        responses.insert(id, (state.seq, state.val.clone()));
                        responses
                    },
                });
            }
            Get(req_id) if state.phase.is_none() => {
                o.broadcast(&self.peers, &Internal(Query(req_id)));
                state.to_mut().phase = Some(AbdPhase::Phase1 {
                    request_id: req_id,
                    requester_id: src,
                    write: None,
                    responses: {
                        let mut responses = BTreeMap::default();
                        responses.insert(id, (state.seq, state.val.clone()));
                        responses
                    },
                });
            }
            Internal(Query(req_id)) => {
                o.send(
                    src,
                    Internal(AckQuery(req_id, state.seq, state.val.clone())));
            }
            Internal(AckQuery(expected_req_id, seq, val))
                if matches!(state.phase,
                            Some(AbdPhase::Phase1 { request_id, .. })
                            if request_id == expected_req_id) =>
            {
                let mut state = state.to_mut();
                if let Some(AbdPhase::Phase1 {
                    request_id: req_id,
                    requester_id: requester,
                    write,
                    responses,
                    ..
                }) = &mut state.phase {
                    responses.insert(src, (seq, val));
                    if responses.len() == majority(self.peers.len() + 1) {
                        // Quorum reached. Move to phase 2.

                        // Determine sequencer and value.
                        let (_, (seq, val)) = responses.into_iter()
                            .max_by_key(|(_, (seq, _))| *seq)
                            .unwrap();
                        let mut seq = *seq;
                        let mut read = None;
                        let val = if let Some(val) = std::mem::take(write) {
                            seq = (seq.0 + 1, id);
                            val
                        } else {
                            read = Some(val.clone());
                            val.clone()
                        };

                        o.broadcast(
                            &self.peers,
                            &Internal(Replicate(*req_id, seq, val.clone())));

                        // Apply `Replicate` locally rather than sending to self.
                        if seq > state.seq {
                            state.seq = seq;
                            state.val = val;
                        }

                        // Record our own `AckReplicate` rather than sending to self.
                        let mut acks = BTreeSet::default();
                        acks.insert(id);

                        state.phase = Some(AbdPhase::Phase2 {
                            request_id: *req_id,
                            requester_id: std::mem::take(requester),
                            read,
                            acks,
                        });
                    }
                }
            }
            Internal(Replicate(req_id, seq, val)) => {
                o.send(src, Internal(AckReplicate(req_id)));
                if seq > state.seq {
                    let mut state = state.to_mut();
                    state.seq = seq;
                    state.val = val;
                }
            }
            Internal(AckReplicate(expected_req_id))
                if matches!(state.phase,
                            Some(AbdPhase::Phase2 { request_id, ref acks, .. })
                            if request_id == expected_req_id && !acks.contains(&src)) =>
            {
                let mut state = state.to_mut();
                if let Some(AbdPhase::Phase2 {
                    request_id: req_id,
                    requester_id: requester,
                    read,
                    acks,
                    ..
                }) = &mut state.phase {
                    acks.insert(src);
                    if acks.len() == majority(self.peers.len() + 1) {
                        let msg = if let Some(val) = read {
                            GetOk(*req_id, std::mem::take(val))
                        } else {
                            PutOk(*req_id)
                        };
                        o.send(*requester, msg);
                        state.phase = None;
                    }
                }
            }
            _ => {}
        }
    }
}

#[cfg(test)]
mod test {
    use super::*;
    use stateright::{*, semantics::*, semantics::register::*};

    #[test]
    fn is_linearizable_quick() {
        let checker = base_model()
            .actor(RegisterActor::Server(AbdActor {
                peers: Id::vec_from(vec![1]),
            }))
            .actor(RegisterActor::Server(AbdActor {
                peers: Id::vec_from(vec![0]),
            }))
            .actor(RegisterActor::Client { put_count: 1, server_count: 2 })
            .actor(RegisterActor::Client { put_count: 1, server_count: 2 })
            .checker().threads(num_cpus::get()).spawn_dfs().join();
        checker.assert_properties();
        assert_eq!(checker.unique_state_count(), 544);
    }

    #[test]
    #[cfg_attr(debug_assertions, ignore = "enabled for --release only")]
    fn is_linearizable() {
        let checker = base_model()
            .actor(RegisterActor::Server(AbdActor {
                peers: Id::vec_from(vec![1, 2]),
            }))
            .actor(RegisterActor::Server(AbdActor {
                peers: Id::vec_from(vec![0, 2]),
            }))
            .actor(RegisterActor::Server(AbdActor {
                peers: Id::vec_from(vec![0, 1]),
            }))
            .actor(RegisterActor::Client { put_count: 1, server_count: 2 })
            .actor(RegisterActor::Client { put_count: 1, server_count: 2 })
            .checker().threads(num_cpus::get()).spawn_dfs().join();
        checker.assert_properties();
        assert_eq!(checker.unique_state_count(), 37_168_889);
    }

    fn base_model()
        -> ActorModel<
            RegisterActor<AbdActor>,
            (),
            LinearizabilityTester<Id, Register<char>>>
    {
        ActorModel::new(
                (),
                LinearizabilityTester::new(Register('?'))
            )
            .init_network(Network::new_unordered_nonduplicating([]))
            .property(Expectation::Always, "linearizable", |_, state| {
                state.history.serialized_history().is_some()
            })
            .property(Expectation::Sometimes, "value chosen", |_, state| {
                state.network.iter_deliverable().any(|e| {
                    if let RegisterMsg::GetOk(_, value) = e.msg {
                        *value != '?'
                    } else {
                        false
                    }
                })
            })
            .record_msg_in(RegisterMsg::record_returns)
            .record_msg_out(RegisterMsg::record_invocations)
    }
}

fn main() {
    env_logger::init_from_env(
        env_logger::Env::default().default_filter_or("info"));
    let id0 = Id::from(SocketAddrV4::new(Ipv4Addr::LOCALHOST, 3000));
    let id1 = Id::from(SocketAddrV4::new(Ipv4Addr::LOCALHOST, 3001));
    let id2 = Id::from(SocketAddrV4::new(Ipv4Addr::LOCALHOST, 3002));
    spawn(
        serde_json::to_vec,
        |bytes| serde_json::from_slice(bytes),
        vec![
            (id0, AbdActor { peers: vec![id1, id2] }),
            (id1, AbdActor { peers: vec![id0, id2] }),
            (id2, AbdActor { peers: vec![id0, id1] }),
        ]).unwrap();
}
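
Because the spawned actors communicate over UDP using JSON (via serde_json above), you can start the servers with cargo run and then interact with one of them from another terminal using a tool such as netcat. Here is a hypothetical session; the encoding follows from serde's derived representation of RegisterMsg, and a majority of the servers must be reachable for a request to complete:

nc -u localhost 3000
{"Put":[1,"A"]}
{"PutOk":1}
{"Get":2}
{"GetOk":[2,"A"]}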

Suggested Exercises

  1. This algorithm can be optimized by observing that the replication phases need not replicate values if the quorum already agrees on a value. See if you can implement this optimization.

  2. More generally, replication messages only need to be sent to replicas that disagree. See if you can implement this optimization as well.

    TIP: This is a slightly more complex optimization because we need to treat "all agree" versus "not all agree" slightly differently to avoid dropping requests in some cases. Can you see why?

  3. The current implementation also assumes that the network does not redeliver messages. Revise it to account for potential message redelivery.

Summary

This chapter provided the first taste of a real distributed algorithm. We were able to incrementally infer a solution, but it was nontrivial. If there were bugs in the code, they could be relatively difficult to identify without a model checker.

In the next chapter, we will introduce the notion of "consensus" and implement it via the Multi-Paxos algorithm. That chapter is not yet available, so in the meantime you can learn more about Stateright by browsing additional Stateright examples and reviewing the Stateright API docs. If you are familiar with TLA+, then the subsequent chapter Comparison with TLA+ may also be interesting to you.

If you have any questions, comments, or ideas, please share them on Stateright's Discord server. At this time Stateright is a small personal project, and the main author is eager to hear community feedback.