Formal descriptions of message passing distributed protocols are complex and heterogeneous. In theory, writing a formal spec of a protocol is a great way to formalize and communicate its precise behavior. In practice, though, many of these specs become quite large and hard to digest. They use different message formats and patterns for how information is communicated between nodes, making protocol comprehension and modification tedious and error-prone. There are whole discussions around the various message types used and comparisons between Raft and Viewstamped Replication.

I’ve found the way these protocols are described also often leads to confusion around the separation between (1) the messaging-specific details and communication patterns of a protocol and (2) the essential behavior required for ensuring correctness. It would be nice to have a better canonical format for describing/modeling distributed protocols that makes their similarities & differences clearer, and potentially also facilitates mechanical derivation of protocol optimizations, modifications etc. without muddying things up with too many implementation details.

Raft, for example, chooses two specific message types, RequestVote and AppendEntries, to implement its protocol behavior, along with a host of other state variables for bookkeeping. What would a version of Raft look like if we abstracted away the concrete message types i.e. specified it in what we might call a “canonicalized” message passing form? We can take a very simple approach and see how far it takes us.

Conceptually, we will express protocols in a model where all actions on a given node follow a simple, common template:

  1. Read its local state and optionally a message from the network.
  2. Update its local state based on this read.
  3. Broadcast its entire updated state into the network as a new message.

We don’t impose any message type details on communication between nodes, so we can think of every action as reading some message from the network and updating the node’s state appropriately in response. More simply, since every message is just a full snapshot of a node’s local state at sending time, we can view every action as reading the remote (past) state of some other node and acting in response.
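As a rough operational sketch of this template (in Python, with a hypothetical `update` function standing in for any concrete protocol action; the dict-based state and message shapes here are illustrative, not part of any spec):

```python
import copy

def step(node_id, local_state, network, update, incoming=None):
    """One action in the canonicalized model:
    1. read local state (and optionally a message from the network),
    2. update local state based on that read,
    3. broadcast the full updated state into the network as a new message."""
    new_state = update(local_state, incoming)  # steps 1 and 2
    # Step 3: the outgoing message is simply a snapshot of the node's entire
    # state, tagged with the sender's id.
    network.append({"from": node_id, **copy.deepcopy(new_state)})
    return new_state
```

Every concrete action then differs only in its `update` function and in which incoming message (if any) it reads; the communication pattern is fixed.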

So, for example, if we apply this approach to the original Raft TLA+ spec, we can write the election related actions GrantVote, RecordGrantedVote, and BecomeLeader as follows:

\* Server i grants its vote to a candidate server.
GrantVote(i, m) ==
    /\ m.currentTerm >= currentTerm[i]
    /\ state[i] = Follower
    /\ LET  j     == m.from
            logOk == \/ LastTerm(m.log) > LastTerm(log[i])
                     \/ /\ LastTerm(m.log) = LastTerm(log[i])
                        /\ Len(m.log) >= Len(log[i])
            grant == /\ m.currentTerm >= currentTerm[i]
                     /\ logOk
                     /\ votedFor[i] \in {Nil, j} IN
            /\ votedFor' = [votedFor EXCEPT ![i] = IF grant THEN j ELSE votedFor[i]]
            /\ currentTerm' = [currentTerm EXCEPT ![i] = m.currentTerm]
            /\ UNCHANGED <<state, candidateVars, leaderVars, logVars>>
            /\ BroadcastUniversalMsg(i)
            
\* Server i records a vote that was granted for it in its current term.
RecordGrantedVote(i, m) ==
    /\ m.currentTerm = currentTerm[i]
    /\ state[i] = Candidate
    /\ votesGranted' =
        [votesGranted EXCEPT ![i] =
            \* The sender must have voted for us in this term.
            votesGranted[i] \cup IF (i = m.votedFor) THEN {m.from} ELSE {}]
    /\ UNCHANGED <<serverVars, votedFor, leaderVars, logVars, msgs>>

\* Candidate i becomes a leader.
BecomeLeader(i) ==
    /\ state[i] = Candidate
    /\ votesGranted[i] \in Quorum
    /\ state'      = [state EXCEPT ![i] = Leader]
    /\ nextIndex'  = [nextIndex EXCEPT ![i] = [j \in Server |-> Len(log[i]) + 1]]
    /\ matchIndex' = [matchIndex EXCEPT ![i] = [j \in Server |-> 0]]
    /\ UNCHANGED <<currentTerm, votedFor, candidateVars, logVars, msgs>>
    /\ BroadcastUniversalMsg(i)

where each action is parameterized on a message m whose fields exactly match the state variables on a local node, and the BroadcastUniversalMsg operator simply pushes a node’s full, updated state into the network as a new message.

We can do this similarly for the core log replication related actions:

\* Server i appends a new log entry from some other server.
AppendEntry(i, m) ==
    /\ m.currentTerm = currentTerm[i]
    /\ state[i] \in { Follower } \* is this precondition necessary?
    \* Can always append an entry if we are a prefix of the other log, and will only
    \* append if other log actually has more entries than us.
    /\ IsPrefix(log[i], m.log)
    /\ Len(m.log) > Len(log[i])
    \* Only update logs in this action. Commit learning is done separately.
    /\ log' = [log EXCEPT ![i] = Append(log[i], m.log[Len(log[i]) + 1])]
    /\ UNCHANGED <<candidateVars, commitIndex, leaderVars, votedFor, currentTerm, state>>
    /\ BroadcastUniversalMsg(i)

\* Server i learns that another server has applied an entry up to some point in its log.
LeaderLearnsOfAppliedEntry(i, m) ==
    /\ state[i] = Leader
    \* Entry is applied in current term.
    /\ m.currentTerm = currentTerm[i]
    \* Only need to update if newer.
    /\ Len(m.log) > matchIndex[i][m.from]
    \* Follower must have a matching log entry.
    /\ Len(m.log) \in DOMAIN log[i]
    /\ m.log[Len(m.log)] = log[i][Len(m.log)]
    \* Update matchIndex to highest index of their log.
    /\ matchIndex' = [matchIndex EXCEPT ![i][m.from] = Len(m.log)]
    /\ UNCHANGED <<serverVars, candidateVars, logVars, nextIndex, msgs>>

\* Leader advances its commit index.
AdvanceCommitIndex(i) ==
    /\ state[i] = Leader
    /\ LET \* The maximum indexes for which a quorum agrees.
           agreeIndexes == {index \in 1..Len(log[i]) : Agree(i, index) \in Quorum}
           \* New value for commitIndex'[i].
           newCommitIndex ==
               IF /\ agreeIndexes /= {}
                  /\ log[i][Max(agreeIndexes)] = currentTerm[i]
               THEN Max(agreeIndexes)
               ELSE commitIndex[i]
       IN
           /\ commitIndex[i] < newCommitIndex \* only enabled if it actually advances
           /\ commitIndex' = [commitIndex EXCEPT ![i] = newCommitIndex]
    /\ UNCHANGED <<serverVars, candidateVars, leaderVars, log>>
    /\ BroadcastUniversalMsg(i)

This type of abstraction gets rid of message passing and communication pattern specific details from the protocol. All we do is define actions that read some past state of another node and make updates based on it.

History Queries

We can push this type of abstraction further, simplifying some actions to express their reads entirely in terms of history queries, rather than incrementally updating and reading an auxiliary variable. The BecomeLeader action, for example, is really just waiting until the votesGranted variable has accumulated enough votes for the node to safely transition to the leader state. If we ignore this variable entirely, we can express the action’s precondition as one big history query, like this:

\* Candidate i becomes a leader.
BecomeLeader(i, Q) ==
    /\ state[i] = Candidate
    /\ \A j \in Q : \E m \in msgs : m.currentTerm = currentTerm[i] /\ m.from = j /\ m.votedFor = i
    /\ state'      = [state EXCEPT ![i] = Leader]
    /\ nextIndex'  = [nextIndex EXCEPT ![i] = [j \in Server |-> Len(log[i]) + 1]]
    /\ matchIndex' = [matchIndex EXCEPT ![i] = [j \in Server |-> 0]]
    /\ UNCHANGED <<currentTerm, votedFor, candidateVars, logVars, msgs>>
    /\ BroadcastUniversalMsg(i)

which checks for the appropriate quorum of voters given the set of messages (states) in the network.
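Viewed operationally, this precondition is just a predicate over the set of messages. A hypothetical Python rendering (dict-based messages with field names mirroring the spec’s state variables) might look like:

```python
def become_leader_enabled(i, quorum, current_term, msgs):
    """History-query form of the BecomeLeader precondition: every member of
    the quorum must have broadcast a state, in our current term, recording a
    vote for us. No votesGranted variable is maintained anywhere."""
    return all(
        any(m["currentTerm"] == current_term
            and m["from"] == j
            and m["votedFor"] == i
            for m in msgs)
        for j in quorum)
```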

We can do something similar for the log replication related actions. LeaderLearnsOfAppliedEntry, which records log application progress from other nodes, can be subsumed by querying message histories directly inside AdvanceCommitIndex:

\* Leader advances its commit index.
AdvanceCommitIndex(i, Q, newCommitIndex) ==
    /\ state[i] = Leader
    /\ newCommitIndex > commitIndex[i]
    /\ \A j \in Q : \E m \in msgs : 
        /\ m.from = j 
        /\ Len(m.log) >= newCommitIndex
        /\ log[i][newCommitIndex] = m.log[newCommitIndex]
        /\ m.currentTerm = currentTerm[i]
    /\ commitIndex' = [commitIndex EXCEPT ![i] = newCommitIndex]
    /\ UNCHANGED <<serverVars, candidateVars, leaderVars, log>>
    /\ BroadcastUniversalMsg(i)
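The same operational reading applies here; a hypothetical Python sketch of this quorum query (1-based indexing translated to Python’s 0-based lists, field names mirroring the spec):

```python
def advance_commit_enabled(quorum, new_commit_index, current_term, log_i, msgs):
    """History-query form of the AdvanceCommitIndex precondition: every quorum
    member must have broadcast a state, in the leader's current term, whose log
    matches the leader's log at new_commit_index (1-based, as in the spec)."""
    return all(
        any(m["from"] == j
            and len(m["log"]) >= new_commit_index
            and m["log"][new_commit_index - 1] == log_i[new_commit_index - 1]
            and m["currentTerm"] == current_term
            for m in msgs)
        for j in quorum)
```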

Applying this history query specification approach, we end up with a simplified set of actions for the protocol:

  • BecomeCandidate
  • GrantVote
  • BecomeLeader
  • ClientRequest
  • AppendEntry
  • TruncateEntry
  • AdvanceCommitIndex
  • LearnCommit
  • UpdateTerm

where the previously required RecordGrantedVote and LeaderLearnsOfAppliedEntry actions have been subsumed into the BecomeLeader and AdvanceCommitIndex actions, respectively, along with their associated state variables votesGranted and matchIndex.

Simplifying the action structure by utilizing history queries can also have a non-trivial impact on model checking performance, as we are able to cut out a number of intermediate steps from the protocol. For example, in one experiment, even for a relatively small model (3 servers, MaxTerm=2, MaxLogLen=1), running the original spec with the RecordGrantedVote and LeaderLearnsOfAppliedEntry actions enabled generates 2,060,946 distinct states. With these actions disabled and using the history query based spec, only 27,062 distinct states were generated, a roughly 76x reduction.

Query Incrementalization

Specifying a protocol in terms of history queries is conceptually satisfying and a nice way to abstract away more of the lower level protocol details. It moves the protocol further from a practical implementation, though, since it’s not realistic for a node to continuously read and query the entire history of all states of other nodes. We can bridge this gap by viewing it as an incremental view maintenance problem.

That is, in a real system, we essentially want to maintain the correct output of these precondition queries based on the current state of the network. We can view this as an online maintenance problem i.e. instead of computing the query output over a giant batch of historical messages, we update the output of the query incrementally as each new message arrives. This gives a formal way to map between the abstract, query-oriented protocol specification and a more practical, operational algorithmic implementation. It is also sufficiently general: as long as we know that the queries we write down can be computed incrementally, any protocol we specify in this manner could in theory be automatically “incrementalized” into a practical, operational version.
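As a toy illustration of what “incrementalizing” one of these queries could look like, here is a hypothetical maintenance of the BecomeLeader vote query: rather than re-scanning the whole message history on each check, we keep a small summary (the set of granters) and update it as each message arrives — which is essentially the votesGranted variable rediscovered as an incrementally maintained view (all names here are illustrative):

```python
class VoteQuorumView:
    """Incrementally maintained output of the BecomeLeader history query for
    candidate `i` in term `term`: becomes true once a quorum of distinct
    voters has been observed."""
    def __init__(self, i, term, quorum_size):
        self.i, self.term, self.quorum_size = i, term, quorum_size
        self.granters = set()  # the incremental "view" over the message history

    def on_message(self, m):
        # Update the view on each arriving message instead of re-querying
        # the full history; set insertion makes redelivery idempotent.
        if m["currentTerm"] == self.term and m["votedFor"] == self.i:
            self.granters.add(m["from"])

    def quorum_reached(self):
        return len(self.granters) >= self.quorum_size
```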

A lot of previous work has explored the foundations of evaluating these types of (first order logic) queries incrementally, particularly in the context of Datalog. I’m less clear, though, on what work has been done on automatically “incrementalizing” these types of queries into practical, operational versions for realistic protocols like Raft. Hydroflow might be the closest project tackling similar ideas.

This approach is similar to past work on the Heard-Of Model, and also a specification approach taken in some PaxosStore specifications from WeChat that they refer to as semi-symmetric message passing. The notion of specifying protocols as queries over histories has also been around for a while. This includes the foundational work done on Dedalus and Bloom by Peter Alvaro and also on DistAlgo. My understanding is that this also overlapped somewhat with the “relational transducer” model for declarative networking used in NDLog and similar techniques. The general idea of a history-oriented approach to specification has appeared in a kind of folk way in some of Lamport’s original specs of Paxos. Similar concepts also appear in posts on a message soup approach to modeling. I believe the Hydroflow work is also more recently taking these ideas further by concretely exploring ways to incrementally compute (e.g. compile) network or dataflow queries.