(*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*)
------------------------------ MODULE ZkV3_7_0 ------------------------------
(* This is the system specification for Zab in apache/zookeeper, version 3.7.0. *)
(* Reference:
FLE: FastLeaderElection.java, Vote.java, QuorumPeer.java, e.g. in
https://github.com/apache/zookeeper.
ZAB: QuorumPeer.java, Learner.java, Follower.java, LearnerHandler.java,
Leader.java, e.g. in https://github.com/apache/zookeeper.
https://cwiki.apache.org/confluence/display/ZOOKEEPER/Zab1.0.
*)
EXTENDS FastLeaderElection
-----------------------------------------------------------------------------
\* The set of requests that can go into history
\* CONSTANT Value \* Replaced by recorder.nClientRequest
Value == Nat
\* Zab states
CONSTANTS ELECTION, DISCOVERY, SYNCHRONIZATION, BROADCAST
\* Sync modes & message types
CONSTANTS DIFF, TRUNC, SNAP
\* Message types
CONSTANTS FOLLOWERINFO, LEADERINFO, ACKEPOCH, NEWLEADER, ACKLD,
UPTODATE, PROPOSAL, ACK, COMMIT
(* NOTE: In production, there is no ACKLD message type. A server checks whether
the counter of an ACK is 0 to tell whether that ACK acknowledges NEWLEADER.
Here we split ACK into ACKLD and ACK to make the spec more readable. *)
\* Node status
CONSTANTS ONLINE, OFFLINE
\* [MaxTimeoutFailures, MaxTransactionNum, MaxEpoch, MaxCrashes, MaxPartitions]
CONSTANT Parameters
MAXEPOCH == 10
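\* Illustrative sketch (not part of the original spec): one way a TLC model
\* might instantiate Parameters. The operator name ConstParameters and every
\* numeric value below are assumptions chosen for illustration only.
\*   ConstParameters == [ MaxTimeoutFailures |-> 2,
\*                        MaxTransactionNum  |-> 2,
\*                        MaxEpoch           |-> 3,
\*                        MaxCrashes         |-> 1,
\*                        MaxPartitions      |-> 1 ]
\* with the substitution `Parameters <- ConstParameters` in the model's .cfg file.
\* Any key left out of the record imposes no bound (see CheckParameterHelper).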
-----------------------------------------------------------------------------
\* Commented-out variables below are defined in FastLeaderElection.
\* Variables that all servers use.
VARIABLES zabState, \* Current phase of server, in
\* {ELECTION, DISCOVERY, SYNCHRONIZATION, BROADCAST}.
acceptedEpoch, \* Epoch of the last LEADERINFO packet accepted,
\* namely f.p in paper.
lastCommitted, \* Maximum index and zxid known to be committed,
\* namely 'lastCommitted' in Leader. Starts from 0,
\* and increases monotonically before restarting.
lastSnapshot, \* Index and zxid corresponding to latest snapshot
\* from data tree.
initialHistory \* history that server initially has before election.
\* state, \* State of server, in {LOOKING, FOLLOWING, LEADING}.
\* currentEpoch, \* Epoch of the last NEWLEADER packet accepted,
\* namely f.a in paper.
\* lastProcessed,\* Index and zxid of the last processed txn.
\* history \* History of servers: sequence of transactions,
\* containing: zxid, value, ackSid, epoch.
\* leader : [committedRequests + toBeApplied] [outstandingProposals]
\* follower: [committedRequests] [pendingTxns]
\* Variables only used for leader.
VARIABLES learners, \* Set of servers the leader is connected to,
\* namely 'learners' in Leader.
connecting, \* Set of learners leader has received
\* FOLLOWERINFO from, namely
\* 'connectingFollowers' in Leader.
\* Set of record [sid, connected].
electing, \* Set of learners leader has received
\* ACKEPOCH from, namely 'electingFollowers'
\* in Leader. Set of record
\* [sid, peerLastZxid, inQuorum].
\* peerLastZxid = <<-1,-1>> means syncFollower
\* has already been done for this sid.
\* inQuorum = TRUE means the sid is an element
\* of 'electingFollowers' in the code.
ackldRecv, \* Set of learners leader has received
\* ACK of NEWLEADER from, namely
\* 'newLeaderProposal' in Leader.
\* Set of record [sid, connected].
forwarding, \* Set of learners that are synced with
\* leader, namely 'forwardingFollowers'
\* in Leader.
tempMaxEpoch \* ({Maximum epoch in FOLLOWERINFO} + 1) that
\* leader has received from learners,
\* namely 'epoch' in Leader.
\* leadingVoteSet \* Set of voters that follow leader.
\* Variables only used for follower.
VARIABLES connectInfo, \* Record [sid, syncMode, nlRcv].
\* sid: Leader id that follower has connected with.
\* syncMode: Sync mode according to the received sync message.
\* nlRcv: If follower has received NEWLEADER.
packetsSync \* packets of PROPOSAL and COMMIT from leader,
\* namely 'packetsNotCommitted' and
\* 'packetsCommitted' in SyncWithLeader
\* in Learner.
\* Variables about network channel.
VARIABLE msgs \* Simulates network channel.
\* msgs[i][j] means the input buffer of server j
\* from server i.
\* electionMsgs \* Network channel in FLE module.
\* Variables about status of cluster network and node presence.
VARIABLES status, \* Whether the server is online or offline.
partition \* network partition.
\* Variables only used in verifying properties.
VARIABLES epochLeader, \* Set of leaders in every epoch.
proposalMsgsLog, \* Set of all broadcast messages.
violatedInvariants \* Records whether any condition that should
\* never hold has been observed.
\* Variables only used for looking.
\*VARIABLE currentVote, \* Info of current vote, namely 'currentVote'
\* \* in QuorumPeer.
\* logicalClock, \* Election instance, namely 'logicalClock'
\* \* in FastLeaderElection.
\* receiveVotes, \* Votes from current FLE round, namely
\* \* 'recvset' in FastLeaderElection.
\* outOfElection, \* Votes from previous and current FLE round,
\* \* namely 'outofelection' in FastLeaderElection.
\* recvQueue, \* Queue of received notifications or timeout
\* \* signals.
\* waitNotmsg \* Whether waiting for a new notification. See line 1050
\* \* in FastLeaderElection for details.
\*VARIABLE idTable \* For mapping Server to Integers,
\* to compare ids between servers.
\* Update: we have transformed idTable from variable to function.
\*VARIABLE clientRequest \* Starts from 0 and increases monotonically
\* whenever LeaderProcessRequest is performed, so
\* that no two requests share the same value.
\* Update: Moved to recorder.nClientRequest.
\* Variable used for recording critical data,
\* to constrain state space or update values.
VARIABLE recorder \* Holds the counters bounded by Parameters, plus pc.
\* Form is record:
\* [pc, nTransaction, maxEpoch, nTimeout, nClientRequest,
\* nPartition, nCrash]
serverVars == <<state, currentEpoch, lastProcessed, zabState, acceptedEpoch,
history, lastCommitted, lastSnapshot, initialHistory>>
electionVars == electionVarsL
leaderVars == <<leadingVoteSet, learners, connecting, electing,
ackldRecv, forwarding, tempMaxEpoch>>
followerVars == <<connectInfo, packetsSync>>
verifyVars == <<proposalMsgsLog, epochLeader, violatedInvariants>>
msgVars == <<msgs, electionMsgs>>
envVars == <<status, partition>>
vars == <<serverVars, electionVars, leaderVars, followerVars,
verifyVars, msgVars, envVars, recorder>>
-----------------------------------------------------------------------------
ServersIncNullPoint == Server \union {NullPoint}
Zxid ==
Seq(Nat \union {-1})
HistoryItem ==
[ zxid: Zxid,
value: Value,
ackSid: SUBSET Server,
epoch: Nat ]
Proposal ==
[ source: Server,
epoch: Nat,
zxid: Zxid,
data: Value ]
LastItem ==
[ index: Nat, zxid: Zxid ]
SyncPackets ==
[ notCommitted: Seq(HistoryItem),
committed: Seq(Zxid) ]
Message ==
[ mtype: {FOLLOWERINFO}, mzxid: Zxid ] \union
[ mtype: {LEADERINFO}, mzxid: Zxid ] \union
[ mtype: {ACKEPOCH}, mzxid: Zxid, mepoch: Nat \union {-1} ] \union
[ mtype: {DIFF}, mzxid: Zxid ] \union
[ mtype: {TRUNC}, mtruncZxid: Zxid ] \union
[ mtype: {SNAP}, msnapZxid: Zxid, msnapshot: Seq(HistoryItem)] \union
[ mtype: {PROPOSAL}, mzxid: Zxid, mdata: Value ] \union
[ mtype: {COMMIT}, mzxid: Zxid ] \union
[ mtype: {NEWLEADER}, mzxid: Zxid ] \union
[ mtype: {ACKLD}, mzxid: Zxid ] \union
[ mtype: {ACK}, mzxid: Zxid ] \union
[ mtype: {UPTODATE}, mzxid: Zxid ]
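\* Illustrative examples of records in Message (not part of the original
\* spec; all numeric values are made up). A zxid is a pair <<epoch, counter>>.
\*   [ mtype |-> FOLLOWERINFO, mzxid |-> <<2, 0>> ]
\*   [ mtype |-> ACKEPOCH,     mzxid |-> <<1, 3>>, mepoch |-> 1 ]
\*   [ mtype |-> PROPOSAL,     mzxid |-> <<2, 1>>, mdata |-> 5 ]
\*   [ mtype |-> TRUNC,        mtruncZxid |-> <<1, 2>> ]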
ElectionState == {LOOKING, FOLLOWING, LEADING}
ZabState == {ELECTION, DISCOVERY, SYNCHRONIZATION, BROADCAST}
ViolationSet == {"stateInconsistent", "proposalInconsistent",
"commitInconsistent", "ackInconsistent",
"messageIllegal" }
SyncMode == {DIFF, TRUNC, SNAP, NONE}
Connecting == [ sid : Server,
connected: BOOLEAN ]
AckldRecv == Connecting
Electing == [ sid: Server,
peerLastZxid: Zxid,
inQuorum: BOOLEAN ]
ConnectInfo == [ sid : ServersIncNullPoint,
syncMode: SyncMode,
nlRcv: BOOLEAN ]
Vote ==
[proposedLeader: ServersIncNullPoint,
proposedZxid: Zxid,
proposedEpoch: Nat ]
ElectionVote ==
[vote: Vote, round: Nat, state: ElectionState, version: Nat]
ElectionMsg ==
[ mtype: {NOTIFICATION},
msource: Server,
mstate: ElectionState,
mround: Nat,
mvote: Vote ] \union
[ mtype: {NONE} ]
TypeOK ==
/\ zabState \in [Server -> ZabState]
/\ acceptedEpoch \in [Server -> Nat]
/\ lastCommitted \in [Server -> LastItem]
/\ lastSnapshot \in [Server -> LastItem]
/\ learners \in [Server -> SUBSET Server]
/\ connecting \in [Server -> SUBSET Connecting]
/\ electing \in [Server -> SUBSET Electing]
/\ ackldRecv \in [Server -> SUBSET AckldRecv]
/\ forwarding \in [Server -> SUBSET Server]
/\ initialHistory \in [Server -> Seq(HistoryItem)]
/\ tempMaxEpoch \in [Server -> Nat]
/\ connectInfo \in [Server -> ConnectInfo]
/\ packetsSync \in [Server -> SyncPackets]
/\ status \in [Server -> {ONLINE, OFFLINE}]
/\ partition \in [Server -> [Server -> BOOLEAN ]]
/\ proposalMsgsLog \in SUBSET Proposal
/\ epochLeader \in [1..MAXEPOCH -> SUBSET Server]
/\ violatedInvariants \in [ViolationSet -> BOOLEAN]
/\ msgs \in [Server -> [Server -> Seq(Message)]]
\* Fast Leader Election
/\ electionMsgs \in [Server -> [Server -> Seq(ElectionMsg)]]
/\ recvQueue \in [Server -> Seq(ElectionMsg)]
/\ leadingVoteSet \in [Server -> SUBSET Server]
/\ receiveVotes \in [Server -> [Server -> ElectionVote]]
/\ currentVote \in [Server -> Vote]
/\ outOfElection \in [Server -> [Server -> ElectionVote]]
/\ lastProcessed \in [Server -> LastItem]
/\ history \in [Server -> Seq(HistoryItem)]
/\ state \in [Server -> ElectionState]
/\ waitNotmsg \in [Server -> BOOLEAN]
/\ currentEpoch \in [Server -> Nat]
/\ logicalClock \in [Server -> Nat]
-----------------------------------------------------------------------------
\* Return the maximum value from the set S
Maximum(S) == IF S = {} THEN -1
ELSE CHOOSE n \in S: \A m \in S: n >= m
\* Return the minimum value from the set S
Minimum(S) == IF S = {} THEN -1
ELSE CHOOSE n \in S: \A m \in S: n <= m
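\* Illustrative sanity checks (not part of the original spec; safe to delete).
\* Both operators return -1 for the empty set, so callers must treat -1 as
\* "no element" rather than as a real member of S.
ASSUME /\ Maximum({1, 3, 2}) = 3
       /\ Minimum({1, 3, 2}) = 1
       /\ Maximum({}) = -1
       /\ Minimum({}) = -1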
\* Check server state
IsON(s) == status[s] = ONLINE
IsOFF(s) == status[s] = OFFLINE
IsLeader(s) == state[s] = LEADING
IsFollower(s) == state[s] = FOLLOWING
IsLooking(s) == state[s] = LOOKING
IsMyLearner(i, j) == j \in learners[i]
IsMyLeader(i, j) == connectInfo[i].sid = j
HasNoLeader(i) == connectInfo[i].sid = NullPoint
HasLeader(i) == connectInfo[i].sid /= NullPoint
MyVote(i) == currentVote[i].proposedLeader
\* Check if s is a quorum
IsQuorum(s) == s \in Quorums
HasPartitioned(i, j) == /\ partition[i][j] = TRUE
/\ partition[j][i] = TRUE
-----------------------------------------------------------------------------
\* Check zxid state
ToZxid(z) == [epoch |-> z[1], counter |-> z[2]]
TxnZxidEqual(txn, z) == txn.zxid[1] = z[1] /\ txn.zxid[2] = z[2]
TxnEqual(txn1, txn2) == /\ ZxidEqual(txn1.zxid, txn2.zxid)
/\ txn1.value = txn2.value
EpochPrecedeInTxn(txn1, txn2) == txn1.zxid[1] < txn2.zxid[1]
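\* Illustrative sanity checks (not part of the original spec; safe to delete).
\* A zxid is a pair <<epoch, counter>>. The sample transactions txn1 and txn2
\* are made up for this example.
ASSUME LET txn1 == [zxid |-> <<1, 2>>, value |-> 7, ackSid |-> {}, epoch |-> 1]
           txn2 == [zxid |-> <<2, 1>>, value |-> 8, ackSid |-> {}, epoch |-> 2]
       IN /\ ToZxid(<<1, 2>>) = [epoch |-> 1, counter |-> 2]
          /\ TxnZxidEqual(txn1, <<1, 2>>)
          /\ ~TxnZxidEqual(txn1, <<2, 1>>)
          /\ EpochPrecedeInTxn(txn1, txn2)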
-----------------------------------------------------------------------------
\* Actions about recorder
GetParameter(p) == IF p \in DOMAIN Parameters THEN Parameters[p] ELSE 0
GetRecorder(p) == IF p \in DOMAIN recorder THEN recorder[p] ELSE 0
RecorderGetHelper(m) == (m :> recorder[m])
RecorderIncHelper(m) == (m :> recorder[m] + 1)
RecorderIncTimeout == RecorderIncHelper("nTimeout")
RecorderGetTimeout == RecorderGetHelper("nTimeout")
RecorderIncCrash == RecorderIncHelper("nCrash")
RecorderGetCrash == RecorderGetHelper("nCrash")
RecorderSetTransactionNum(pc) == ("nTransaction" :>
IF pc[1] = "LeaderProcessRequest" THEN
LET s == CHOOSE i \in Server:
\A j \in Server: Len(history'[i]) >= Len(history'[j])
IN Len(history'[s])
ELSE recorder["nTransaction"])
RecorderSetMaxEpoch(pc) == ("maxEpoch" :>
IF pc[1] = "LeaderProcessFOLLOWERINFO" THEN
LET s == CHOOSE i \in Server:
\A j \in Server: acceptedEpoch'[i] >= acceptedEpoch'[j]
IN acceptedEpoch'[s]
ELSE recorder["maxEpoch"])
RecorderSetRequests(pc) == ("nClientRequest" :>
IF pc[1] = "LeaderProcessRequest" THEN
recorder["nClientRequest"] + 1
ELSE recorder["nClientRequest"] )
RecorderSetPartition(pc) == ("nPartition" :>
IF pc[1] = "PartitionStart" THEN recorder["nPartition"] + 1
ELSE recorder["nPartition"] )
RecorderSetPc(pc) == ("pc" :> pc)
RecorderSetFailure(pc) == CASE pc[1] = "Timeout" -> RecorderIncTimeout @@ RecorderGetCrash
[] pc[1] = "LeaderTimeout" -> RecorderIncTimeout @@ RecorderGetCrash
[] pc[1] = "FollowerTimeout" -> RecorderIncTimeout @@ RecorderGetCrash
[] pc[1] = "PartitionStart" -> IF IsLooking(pc[2])
THEN RecorderGetTimeout @@ RecorderGetCrash
ELSE RecorderIncTimeout @@ RecorderGetCrash
[] pc[1] = "NodeCrash" -> IF IsLooking(pc[2])
THEN RecorderGetTimeout @@ RecorderIncCrash
ELSE RecorderIncTimeout @@ RecorderIncCrash
[] OTHER -> RecorderGetTimeout @@ RecorderGetCrash
UpdateRecorder(pc) == recorder' = RecorderSetPartition(pc)
@@ RecorderSetFailure(pc) @@ RecorderSetTransactionNum(pc)
@@ RecorderSetMaxEpoch(pc) @@ RecorderSetPc(pc)
@@ RecorderSetRequests(pc) @@ recorder
UnchangeRecorder == UNCHANGED recorder
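\* Illustrative sanity check (not part of the original spec; safe to delete).
\* UpdateRecorder relies on `@@` giving priority to its left operand: each
\* RecorderSet* helper is a one-key function that overrides the matching key
\* of the old recorder, which is merged in last. The keys "a" and "b" below
\* are made up for this example.
ASSUME (("a" :> 1) @@ [a |-> 0, b |-> 2]) = [a |-> 1, b |-> 2]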
CheckParameterHelper(n, p, Comp(_,_)) == IF p \in DOMAIN Parameters
THEN Comp(n, Parameters[p])
ELSE TRUE
CheckParameterLimit(n, p) == CheckParameterHelper(n, p, LAMBDA i, j: i < j)
CheckTimeout == CheckParameterLimit(recorder.nTimeout, "MaxTimeoutFailures")
CheckTransactionNum == CheckParameterLimit(recorder.nTransaction, "MaxTransactionNum")
CheckEpoch == CheckParameterLimit(recorder.maxEpoch, "MaxEpoch")
CheckPartition == /\ CheckTimeout
/\ CheckParameterLimit(recorder.nPartition, "MaxPartitions")
CheckCrash(i) == /\ \/ IsLooking(i)
\/ /\ ~IsLooking(i)
/\ CheckTimeout
/\ CheckParameterLimit(recorder.nCrash, "MaxCrashes")
CheckStateConstraints == CheckTimeout /\ CheckTransactionNum /\ CheckEpoch
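\* Worked example (illustrative; the values are assumptions): with
\*   Parameters = [MaxTimeoutFailures |-> 2] and recorder.nTimeout = 2,
\* CheckTimeout is CheckParameterLimit(2, "MaxTimeoutFailures"), i.e. 2 < 2,
\* which is FALSE, so actions guarded by CheckTimeout are disabled.
\* CheckEpoch is TRUE whatever recorder.maxEpoch is, because "MaxEpoch" is
\* not in DOMAIN Parameters here and a missing key imposes no limit.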
-----------------------------------------------------------------------------
\* Actions about network
PendingFOLLOWERINFO(i, j) == /\ msgs[j][i] /= << >>
/\ msgs[j][i][1].mtype = FOLLOWERINFO
PendingLEADERINFO(i, j) == /\ msgs[j][i] /= << >>
/\ msgs[j][i][1].mtype = LEADERINFO
PendingACKEPOCH(i, j) == /\ msgs[j][i] /= << >>
/\ msgs[j][i][1].mtype = ACKEPOCH
PendingNEWLEADER(i, j) == /\ msgs[j][i] /= << >>
/\ msgs[j][i][1].mtype = NEWLEADER
PendingACKLD(i, j) == /\ msgs[j][i] /= << >>
/\ msgs[j][i][1].mtype = ACKLD
PendingUPTODATE(i, j) == /\ msgs[j][i] /= << >>
/\ msgs[j][i][1].mtype = UPTODATE
PendingPROPOSAL(i, j) == /\ msgs[j][i] /= << >>
/\ msgs[j][i][1].mtype = PROPOSAL
PendingACK(i, j) == /\ msgs[j][i] /= << >>
/\ msgs[j][i][1].mtype = ACK
PendingCOMMIT(i, j) == /\ msgs[j][i] /= << >>
/\ msgs[j][i][1].mtype = COMMIT
\* Append message m to msgs[i][j], the input buffer of server j from server i.
Send(i, j, m) == msgs' = [msgs EXCEPT ![i][j] = Append(msgs[i][j], m)]
SendPackets(i, j, ms) == msgs' = [msgs EXCEPT ![i][j] = msgs[i][j] \o ms ]
DiscardAndSendPackets(i, j, ms) == msgs' = [msgs EXCEPT ![j][i] = Tail(msgs[j][i]),
![i][j] = msgs[i][j] \o ms ]
\* Remove a message from msgs - discard head of msgs.
Discard(i, j) == msgs' = IF msgs[i][j] /= << >> THEN [msgs EXCEPT ![i][j] = Tail(msgs[i][j])]
ELSE msgs
\* Leader broadcasts a message (PROPOSAL/COMMIT) to all other servers in forwardingFollowers.
Broadcast(i, m) == msgs' = [msgs EXCEPT ![i] = [v \in Server |-> IF /\ v \in forwarding[i]
/\ v /= i
THEN Append(msgs[i][v], m)
ELSE msgs[i][v]]]
DiscardAndBroadcast(i, j, m) ==
msgs' = [msgs EXCEPT ![j][i] = Tail(msgs[j][i]),
![i] = [v \in Server |-> IF /\ v \in forwarding[i]
/\ v /= i
THEN Append(msgs[i][v], m)
ELSE msgs[i][v]]]
\* Leader broadcasts LEADERINFO to all other servers in connectingFollowers.
DiscardAndBroadcastLEADERINFO(i, j, m) ==
LET new_connecting_quorum == {c \in connecting'[i]: c.connected = TRUE }
new_sid_connecting == {c.sid: c \in new_connecting_quorum }
IN
msgs' = [msgs EXCEPT ![j][i] = Tail(msgs[j][i]),
![i] = [v \in Server |-> IF /\ v \in new_sid_connecting
/\ v \in learners[i]
/\ v /= i
THEN Append(msgs[i][v], m)
ELSE msgs[i][v] ] ]
\* Leader broadcasts UPTODATE to all other servers in newLeaderProposal.
DiscardAndBroadcastUPTODATE(i, j, m) ==
LET new_ackldRecv_quorum == {a \in ackldRecv'[i]: a.connected = TRUE }
new_sid_ackldRecv == {a.sid: a \in new_ackldRecv_quorum}
IN
msgs' = [msgs EXCEPT ![j][i] = Tail(msgs[j][i]),
![i] = [v \in Server |-> IF /\ v \in new_sid_ackldRecv
/\ v \in learners[i]
/\ v /= i
THEN Append(msgs[i][v], m)
ELSE msgs[i][v] ] ]
\* Combination of Send and Discard - discard head of msgs[j][i] and add m into msgs.
Reply(i, j, m) == msgs' = [msgs EXCEPT ![j][i] = Tail(msgs[j][i]),
![i][j] = Append(msgs[i][j], m)]
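\* Worked example (illustrative; m1, m2 and r are placeholder messages):
\* if msgs[j][i] = <<m1, m2>> and msgs[i][j] = << >>, then Reply(i, j, r) gives
\*   msgs'[j][i] = <<m2>>    (i consumes the head m1 of its buffer from j)
\*   msgs'[i][j] = <<r>>     (r is appended to j's buffer from i)
\* and leaves every other channel unchanged. Reply takes Tail(msgs[j][i]),
\* so it assumes that buffer is non-empty; the actions in this part of the
\* spec guard it with one of the Pending* predicates.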
\* Clear input buffers.
Clean(i, j) == msgs' = [msgs EXCEPT ![j][i] = << >>, ![i][j] = << >>]
CleanInputBuffer(i) == msgs' = [s \in Server |-> [v \in Server |-> IF v = i THEN << >>
ELSE msgs[s][v]]]
CleanInputBufferInCluster(S) == msgs' = [s \in Server |->
[v \in Server |-> IF v \in S THEN << >>
ELSE msgs[s][v] ] ]
-----------------------------------------------------------------------------
\* Define initial values for all variables
InitServerVars == /\ InitServerVarsL
/\ zabState = [s \in Server |-> ELECTION]
/\ acceptedEpoch = [s \in Server |-> 0]
/\ lastCommitted = [s \in Server |-> [ index |-> 0,
zxid |-> <<0, 0>> ] ]
/\ lastSnapshot = [s \in Server |-> [ index |-> 0,
zxid |-> <<0, 0>> ] ]
/\ initialHistory = [s \in Server |-> << >>]
InitLeaderVars == /\ InitLeaderVarsL
/\ learners = [s \in Server |-> {}]
/\ connecting = [s \in Server |-> {}]
/\ electing = [s \in Server |-> {}]
/\ ackldRecv = [s \in Server |-> {}]
/\ forwarding = [s \in Server |-> {}]
/\ tempMaxEpoch = [s \in Server |-> 0]
InitElectionVars == InitElectionVarsL
InitFollowerVars == /\ connectInfo = [s \in Server |-> [sid |-> NullPoint,
syncMode |-> NONE,
nlRcv |-> FALSE ] ]
/\ packetsSync = [s \in Server |->
[ notCommitted |-> << >>,
committed |-> << >> ] ]
InitVerifyVars == /\ proposalMsgsLog = {}
/\ epochLeader = [e \in 1..MAXEPOCH |-> {} ]
/\ violatedInvariants = [stateInconsistent |-> FALSE,
proposalInconsistent |-> FALSE,
commitInconsistent |-> FALSE,
ackInconsistent |-> FALSE,
messageIllegal |-> FALSE ]
InitMsgVars == /\ msgs = [s \in Server |-> [v \in Server |-> << >>] ]
/\ electionMsgs = [s \in Server |-> [v \in Server |-> << >>] ]
InitEnvVars == /\ status = [s \in Server |-> ONLINE ]
/\ partition = [s \in Server |-> [v \in Server |-> FALSE] ]
InitRecorder == recorder = [nTimeout |-> 0,
nTransaction |-> 0,
nPartition |-> 0,
maxEpoch |-> 0,
nCrash |-> 0,
pc |-> <<"Init">>,
nClientRequest |-> 0]
Init == /\ InitServerVars
/\ InitLeaderVars
/\ InitElectionVars
/\ InitFollowerVars
/\ InitVerifyVars
/\ InitMsgVars
/\ InitEnvVars
/\ InitRecorder
-----------------------------------------------------------------------------
ZabTurnToLeading(i) ==
/\ zabState' = [zabState EXCEPT ![i] = DISCOVERY]
/\ learners' = [learners EXCEPT ![i] = {i}]
/\ connecting' = [connecting EXCEPT ![i] = { [ sid |-> i,
connected |-> TRUE ] }]
/\ electing' = [electing EXCEPT ![i] = { [ sid |-> i,
peerLastZxid |-> <<-1,-1>>,
inQuorum |-> TRUE ] }]
/\ ackldRecv' = [ackldRecv EXCEPT ![i] = { [ sid |-> i,
connected |-> TRUE ] }]
/\ forwarding' = [forwarding EXCEPT ![i] = {}]
/\ initialHistory' = [initialHistory EXCEPT ![i] = history'[i]]
/\ tempMaxEpoch' = [tempMaxEpoch EXCEPT ![i] = acceptedEpoch[i] + 1]
ZabTurnToFollowing(i) ==
/\ zabState' = [zabState EXCEPT ![i] = DISCOVERY]
/\ initialHistory' = [initialHistory EXCEPT ![i] = history'[i]]
/\ packetsSync' = [packetsSync EXCEPT ![i].notCommitted = << >>,
![i].committed = << >> ]
(* Fast Leader Election *)
FLEReceiveNotmsg(i, j) ==
/\ IsON(i)
/\ ReceiveNotmsg(i, j)
/\ UNCHANGED <<zabState, acceptedEpoch, lastCommitted, learners, connecting,
initialHistory, electing, ackldRecv, forwarding, tempMaxEpoch,
lastSnapshot, followerVars, verifyVars, envVars, msgs>>
/\ UpdateRecorder(<<"FLEReceiveNotmsg", i, j>>)
FLENotmsgTimeout(i) ==
/\ IsON(i)
/\ NotmsgTimeout(i)
/\ UNCHANGED <<zabState, acceptedEpoch, lastCommitted, learners, connecting,
initialHistory, electing, ackldRecv, forwarding, tempMaxEpoch,
lastSnapshot, followerVars, verifyVars, envVars, msgs>>
/\ UpdateRecorder(<<"FLENotmsgTimeout", i>>)
FLEHandleNotmsg(i) ==
/\ IsON(i)
/\ HandleNotmsg(i)
/\ LET newState == state'[i]
IN
\/ /\ newState = LEADING
/\ ZabTurnToLeading(i)
/\ UNCHANGED packetsSync
\/ /\ newState = FOLLOWING
/\ ZabTurnToFollowing(i)
/\ UNCHANGED <<learners, connecting, electing, ackldRecv,
forwarding, tempMaxEpoch>>
\/ /\ newState = LOOKING
/\ UNCHANGED <<zabState, learners, connecting, electing, ackldRecv,
forwarding, tempMaxEpoch, packetsSync, initialHistory>>
/\ UNCHANGED <<lastCommitted, lastSnapshot, acceptedEpoch,
connectInfo, verifyVars, envVars, msgs>>
/\ UpdateRecorder(<<"FLEHandleNotmsg", i>>)
\* On the premise that ReceiveVotes.HasQuorums = TRUE,
\* corresponding to logic in FastLeaderElection.
FLEWaitNewNotmsg(i) ==
/\ IsON(i)
/\ WaitNewNotmsg(i)
/\ LET newState == state'[i]
IN
\/ /\ newState = LEADING
/\ ZabTurnToLeading(i)
/\ UNCHANGED packetsSync
\/ /\ newState = FOLLOWING
/\ ZabTurnToFollowing(i)
/\ UNCHANGED <<learners, connecting, electing, ackldRecv, forwarding,
tempMaxEpoch>>
\/ /\ newState = LOOKING
/\ PrintT("Note: New state is LOOKING in FLEWaitNewNotmsgEnd," \o
" which should not happen.")
/\ UNCHANGED <<zabState, learners, connecting, electing, ackldRecv,
forwarding, tempMaxEpoch, initialHistory, packetsSync>>
/\ UNCHANGED <<lastCommitted, lastSnapshot, acceptedEpoch,
connectInfo, verifyVars, envVars, msgs>>
/\ UpdateRecorder(<<"FLEWaitNewNotmsg", i>>)
-----------------------------------------------------------------------------
InitialVotes == [ vote |-> InitialVote,
round |-> 0,
state |-> LOOKING,
version |-> 0 ]
InitialConnectInfo == [sid |-> NullPoint,
syncMode |-> NONE,
nlRcv |-> FALSE ]
\* Equivalent to performing action ZabTimeout for every server in S.
ZabTimeoutInCluster(S) ==
/\ state' = [s \in Server |-> IF s \in S THEN LOOKING ELSE state[s] ]
/\ lastProcessed' = [s \in Server |-> IF s \in S THEN InitLastProcessed(s)
ELSE lastProcessed[s] ]
/\ logicalClock' = [s \in Server |-> IF s \in S THEN logicalClock[s] + 1
ELSE logicalClock[s] ]
/\ currentVote' = [s \in Server |-> IF s \in S THEN
[proposedLeader |-> s,
proposedZxid |-> lastProcessed'[s].zxid,
proposedEpoch |-> currentEpoch[s] ]
ELSE currentVote[s] ]
/\ receiveVotes' = [s \in Server |-> IF s \in S THEN [v \in Server |-> InitialVotes]
ELSE receiveVotes[s] ]
/\ outOfElection' = [s \in Server |-> IF s \in S THEN [v \in Server |-> InitialVotes]
ELSE outOfElection[s] ]
/\ recvQueue' = [s \in Server |-> IF s \in S THEN << [mtype |-> NONE] >>
ELSE recvQueue[s] ]
/\ waitNotmsg' = [s \in Server |-> IF s \in S THEN FALSE ELSE waitNotmsg[s] ]
/\ leadingVoteSet' = [s \in Server |-> IF s \in S THEN {} ELSE leadingVoteSet[s] ]
/\ UNCHANGED <<electionMsgs, currentEpoch, history>>
/\ zabState' = [s \in Server |-> IF s \in S THEN ELECTION ELSE zabState[s] ]
/\ connectInfo' = [s \in Server |-> IF s \in S THEN InitialConnectInfo
ELSE connectInfo[s] ]
/\ CleanInputBufferInCluster(S)
(* Describe how a server transitions from LEADING/FOLLOWING to LOOKING.*)
FollowerShutdown(i) ==
/\ ZabTimeout(i)
/\ zabState' = [zabState EXCEPT ![i] = ELECTION]
/\ connectInfo' = [connectInfo EXCEPT ![i] = InitialConnectInfo]
LeaderShutdown(i) ==
/\ LET cluster == {i} \union learners[i]
IN ZabTimeoutInCluster(cluster)
/\ learners' = [learners EXCEPT ![i] = {}]
/\ forwarding' = [forwarding EXCEPT ![i] = {}]
RemoveElecting(set, sid) ==
LET sid_electing == {s.sid: s \in set }
IN IF sid \notin sid_electing THEN set
ELSE LET info == CHOOSE s \in set: s.sid = sid
new_info == [ sid |-> sid,
peerLastZxid |-> <<-1, -1>>,
inQuorum |-> info.inQuorum ]
IN (set \ {info}) \union {new_info}
RemoveConnectingOrAckldRecv(set, sid) ==
LET sid_set == {s.sid: s \in set}
IN IF sid \notin sid_set THEN set
ELSE LET info == CHOOSE s \in set: s.sid = sid
new_info == [ sid |-> sid,
connected |-> FALSE ]
IN (set \ {info}) \union {new_info}
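\* Illustrative sanity checks (not part of the original spec; safe to delete).
\* The integers 1, 2 and 3 stand in for server ids and are assumptions of
\* this example. Removing a learner keeps its record but marks it as
\* disconnected (or resets peerLastZxid); an unknown sid leaves the set as is.
ASSUME LET c == { [sid |-> 1, connected |-> TRUE], [sid |-> 2, connected |-> TRUE] }
           e == { [sid |-> 1, peerLastZxid |-> <<1, 3>>, inQuorum |-> TRUE] }
       IN /\ RemoveConnectingOrAckldRecv(c, 1) =
                { [sid |-> 1, connected |-> FALSE], [sid |-> 2, connected |-> TRUE] }
          /\ RemoveConnectingOrAckldRecv(c, 3) = c
          /\ RemoveElecting(e, 1) =
                { [sid |-> 1, peerLastZxid |-> <<-1, -1>>, inQuorum |-> TRUE] }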
\* See removeLearnerHandler for details.
RemoveLearner(i, j) ==
/\ learners' = [learners EXCEPT ![i] = @ \ {j}]
/\ forwarding' = [forwarding EXCEPT ![i] = IF j \in forwarding[i]
THEN @ \ {j} ELSE @ ]
/\ electing' = [electing EXCEPT ![i] = RemoveElecting(@, j) ]
/\ connecting' = [connecting EXCEPT ![i] = RemoveConnectingOrAckldRecv(@, j) ]
/\ ackldRecv' = [ackldRecv EXCEPT ![i] = RemoveConnectingOrAckldRecv(@, j) ]
-----------------------------------------------------------------------------
\* Actions that model failures and recovery: network partitions, node crashes and restarts.
PartitionStart(i, j) ==
/\ CheckPartition \* test restrictions of partition
/\ i /= j
/\ IsON(i)
/\ IsON(j)
/\ \lnot HasPartitioned(i, j)
/\ \/ /\ IsLeader(i) /\ IsMyLearner(i, j)
/\ IsFollower(j) /\ IsMyLeader(j, i)
/\ LET newLearners == learners[i] \ {j}
IN \/ /\ IsQuorum(newLearners) \* just remove this learner
/\ RemoveLearner(i, j)
/\ FollowerShutdown(j)
/\ Clean(i, j)
\/ /\ ~IsQuorum(newLearners) \* leader switches to looking
/\ LeaderShutdown(i)
/\ UNCHANGED <<connecting, electing, ackldRecv>>
\/ /\ IsLooking(i)
/\ IsLooking(j)
/\ IdCompare(i, j)
/\ UNCHANGED <<varsL, zabState, connectInfo, msgs, learners,
forwarding, connecting, electing, ackldRecv>>
/\ partition' = [partition EXCEPT ![i][j] = TRUE, ![j][i] = TRUE ]
/\ UNCHANGED <<acceptedEpoch, lastCommitted, lastSnapshot, tempMaxEpoch,
initialHistory, verifyVars, packetsSync, status>>
/\ UpdateRecorder(<<"PartitionStart", i, j>>)
PartitionRecover(i, j) ==
/\ IsON(i)
/\ IsON(j)
/\ IdCompare(i, j)
/\ HasPartitioned(i, j)
/\ partition' = [partition EXCEPT ![i][j] = FALSE, ![j][i] = FALSE ]
/\ UNCHANGED <<serverVars, leaderVars, electionVars, followerVars,
verifyVars, msgVars, status>>
/\ UpdateRecorder(<<"PartitionRecover", i, j>>)
NodeCrash(i) ==
/\ CheckCrash(i)
/\ IsON(i)
/\ status' = [status EXCEPT ![i] = OFFLINE ]
/\ \/ /\ IsLooking(i)
/\ UNCHANGED <<varsL, zabState, connectInfo, msgs, learners,
forwarding, connecting, electing, ackldRecv>>
\/ /\ IsFollower(i)
/\ LET connectedWithLeader == HasLeader(i)
IN \/ /\ connectedWithLeader
/\ LET leader == connectInfo[i].sid
newCluster == learners[leader] \ {i}
IN
\/ /\ IsQuorum(newCluster)
/\ RemoveLearner(leader, i)
/\ FollowerShutdown(i)
/\ Clean(leader, i)
\/ /\ ~IsQuorum(newCluster)
/\ LeaderShutdown(leader)
/\ UNCHANGED <<electing, connecting, ackldRecv>>
\/ /\ ~connectedWithLeader
/\ FollowerShutdown(i)
/\ CleanInputBuffer(i)
/\ UNCHANGED <<learners, forwarding, connecting, electing, ackldRecv>>
\/ /\ IsLeader(i)
/\ LeaderShutdown(i)
/\ UNCHANGED <<electing, connecting, ackldRecv>>
/\ UNCHANGED <<acceptedEpoch, lastCommitted, lastSnapshot, tempMaxEpoch,
initialHistory, verifyVars, packetsSync, partition>>
/\ UpdateRecorder(<<"NodeCrash", i>>)
NodeStart(i) ==
/\ IsOFF(i)
/\ status' = [status EXCEPT ![i] = ONLINE ]
/\ lastProcessed' = [lastProcessed EXCEPT ![i] = InitLastProcessed(i)]
/\ lastCommitted' = [lastCommitted EXCEPT ![i] = lastSnapshot[i]]
/\ UNCHANGED <<state, currentEpoch, zabState, acceptedEpoch, history,
lastSnapshot, initialHistory, leaderVars, electionVars,
followerVars, verifyVars, msgVars, partition>>
/\ UpdateRecorder(<<"NodeStart", i>>)
-----------------------------------------------------------------------------
(* Establish connection between leader and follower, containing actions like
addLearnerHandler, findLeader, connectToLeader.*)
ConnectAndFollowerSendFOLLOWERINFO(i, j) ==
/\ IsON(i) /\ IsON(j)
/\ IsLeader(i) /\ \lnot IsMyLearner(i, j)
/\ IsFollower(j) /\ HasNoLeader(j) /\ MyVote(j) = i
/\ learners' = [learners EXCEPT ![i] = learners[i] \union {j}]
/\ connectInfo' = [connectInfo EXCEPT ![j].sid = i]
/\ Send(j, i, [ mtype |-> FOLLOWERINFO,
mzxid |-> <<acceptedEpoch[j], 0>> ])
/\ UNCHANGED <<serverVars, electionVars, leadingVoteSet, connecting,
electing, ackldRecv, forwarding, tempMaxEpoch,
verifyVars, envVars, electionMsgs, packetsSync>>
/\ UpdateRecorder(<<"ConnectAndFollowerSendFOLLOWERINFO", i, j>>)
\* waitingForNewEpoch in Leader
WaitingForNewEpoch(i, set) == (i \in set /\ IsQuorum(set)) = FALSE
WaitingForNewEpochTurnToFalse(i, set) == /\ i \in set
/\ IsQuorum(set)
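\* Worked example (illustrative). Assume Server = {s1, s2, s3} and that
\* Quorums, defined in FastLeaderElection, contains exactly the majorities.
\* For leader s1:
\*   WaitingForNewEpoch(s1, {s1})      = TRUE   (no quorum yet)
\*   WaitingForNewEpoch(s1, {s2, s3})  = TRUE   (quorum, but leader not in it)
\*   WaitingForNewEpoch(s1, {s1, s2})  = FALSE  (quorum that includes the leader)
\* WaitingForNewEpochTurnToFalse is the negation of WaitingForNewEpoch.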
\* A follower already in connecting may shut down and connect again.
\* Without a guard, it would receive LEADERINFO from the leader's broadcast
\* and then again in reply to its new FOLLOWERINFO. The 'connected' flag
\* ensures that each follower receives LEADERINFO at most once before
\* timeout.
UpdateConnectingOrAckldRecv(oldSet, sid) ==
LET sid_set == {s.sid: s \in oldSet}
IN IF sid \in sid_set
THEN LET old_info == CHOOSE info \in oldSet: info.sid = sid
follower_info == [ sid |-> sid,
connected |-> TRUE ]
IN (oldSet \ {old_info} ) \union {follower_info}
ELSE LET follower_info == [ sid |-> sid,
connected |-> TRUE ]
IN oldSet \union {follower_info}
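\* Illustrative sanity checks (not part of the original spec; safe to delete).
\* The integers 1 and 2 stand in for server ids and are assumptions of this
\* example. A known sid is flipped to connected = TRUE; an unknown sid is added.
ASSUME LET old == { [sid |-> 1, connected |-> FALSE] }
       IN /\ UpdateConnectingOrAckldRecv(old, 1) = { [sid |-> 1, connected |-> TRUE] }
          /\ UpdateConnectingOrAckldRecv(old, 2) =
                { [sid |-> 1, connected |-> FALSE], [sid |-> 2, connected |-> TRUE] }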
(* Leader waits for receiving FOLLOWERINFO from a quorum including itself,
and chooses a new epoch e' as its own epoch and broadcasts LEADERINFO.
See getEpochToPropose in Leader for details. *)
LeaderProcessFOLLOWERINFO(i, j) ==
/\ CheckEpoch \* test restrictions of max epoch
/\ IsON(i)
/\ IsLeader(i)
/\ PendingFOLLOWERINFO(i, j)
/\ LET msg == msgs[j][i][1]
infoOk == IsMyLearner(i, j)
lastAcceptedEpoch == msg.mzxid[1]
sid_connecting == {c.sid: c \in connecting[i]}
IN
/\ infoOk
/\ \/ \* 1. has not broadcast LEADERINFO
/\ WaitingForNewEpoch(i, sid_connecting)
/\ \/ /\ zabState[i] = DISCOVERY
/\ UNCHANGED violatedInvariants
\/ /\ zabState[i] /= DISCOVERY
/\ PrintT("Exception: waitingForNewEpoch true," \o
" while zabState not DISCOVERY.")
/\ violatedInvariants' = [violatedInvariants EXCEPT !.stateInconsistent = TRUE]
/\ tempMaxEpoch' = [tempMaxEpoch EXCEPT ![i] = IF lastAcceptedEpoch >= tempMaxEpoch[i]
THEN lastAcceptedEpoch + 1
ELSE @]
/\ connecting' = [connecting EXCEPT ![i] = UpdateConnectingOrAckldRecv(@, j) ]
/\ LET new_sid_connecting == {c.sid: c \in connecting'[i]}
IN
\/ /\ WaitingForNewEpochTurnToFalse(i, new_sid_connecting)
/\ acceptedEpoch' = [acceptedEpoch EXCEPT ![i] = tempMaxEpoch'[i]]
/\ LET newLeaderZxid == <<acceptedEpoch'[i], 0>>
m == [ mtype |-> LEADERINFO,
mzxid |-> newLeaderZxid ]
IN DiscardAndBroadcastLEADERINFO(i, j, m)
\/ /\ ~WaitingForNewEpochTurnToFalse(i, new_sid_connecting)
/\ Discard(j, i)
/\ UNCHANGED acceptedEpoch
\/ \* 2. has broadcast LEADERINFO
/\ ~WaitingForNewEpoch(i, sid_connecting)
/\ Reply(i, j, [ mtype |-> LEADERINFO,
mzxid |-> <<acceptedEpoch[i], 0>> ] )
/\ UNCHANGED <<tempMaxEpoch, connecting, acceptedEpoch, violatedInvariants>>
/\ UNCHANGED <<state, currentEpoch, lastProcessed, zabState, history, lastCommitted,
followerVars, electionVars, initialHistory, leadingVoteSet, learners,
electing, ackldRecv, forwarding, proposalMsgsLog, epochLeader,
lastSnapshot, electionMsgs, envVars>>
/\ UpdateRecorder(<<"LeaderProcessFOLLOWERINFO", i, j>>)
(* Follower receives LEADERINFO. If newEpoch >= acceptedEpoch, the follower
updates acceptedEpoch and replies with ACKEPOCH, which carries currentEpoch
and lastProcessedZxid. After this, zabState becomes SYNCHRONIZATION.
See registerWithLeader in Learner for details. *)
FollowerProcessLEADERINFO(i, j) ==
/\ IsON(i)
/\ IsFollower(i)
/\ PendingLEADERINFO(i, j)
/\ LET msg == msgs[j][i][1]
newEpoch == msg.mzxid[1]
infoOk == IsMyLeader(i, j)
epochOk == newEpoch >= acceptedEpoch[i]
stateOk == zabState[i] = DISCOVERY
IN /\ infoOk
/\ \/ \* 1. Normal case
/\ epochOk
/\ \/ /\ stateOk
/\ \/ /\ newEpoch > acceptedEpoch[i]
/\ acceptedEpoch' = [acceptedEpoch EXCEPT ![i] = newEpoch]
/\ LET epochBytes == currentEpoch[i]
m == [ mtype |-> ACKEPOCH,
mzxid |-> lastProcessed[i].zxid,
mepoch |-> epochBytes ]
IN Reply(i, j, m)
\/ /\ newEpoch = acceptedEpoch[i]
/\ LET m == [ mtype |-> ACKEPOCH,
mzxid |-> lastProcessed[i].zxid,
mepoch |-> -1 ]
IN Reply(i, j, m)
/\ UNCHANGED acceptedEpoch
/\ zabState' = [zabState EXCEPT ![i] = SYNCHRONIZATION]
/\ UNCHANGED violatedInvariants
\/ /\ ~stateOk
/\ PrintT("Exception: Follower receives LEADERINFO," \o
" while zabState not DISCOVERY.")
/\ violatedInvariants' = [violatedInvariants EXCEPT !.stateInconsistent = TRUE]
/\ Discard(j, i)
/\ UNCHANGED <<acceptedEpoch, zabState>>
/\ UNCHANGED <<varsL, connectInfo, learners, forwarding, electing,
connecting, ackldRecv>>
\/ \* 2. Abnormal case - go back to election
/\ ~epochOk
/\ FollowerShutdown(i)
/\ Clean(i, connectInfo[i].sid)
/\ RemoveLearner(connectInfo[i].sid, i)
/\ UNCHANGED <<acceptedEpoch, violatedInvariants>>
/\ UNCHANGED <<history, lastCommitted, tempMaxEpoch, initialHistory, lastSnapshot,
proposalMsgsLog, epochLeader, packetsSync, envVars>>
/\ UpdateRecorder(<<"FollowerProcessLEADERINFO", i, j>>)
-----------------------------------------------------------------------------
RECURSIVE UpdateAckSidHelper(_,_,_,_)
UpdateAckSidHelper(his, cur, end, target) ==
IF cur > end THEN his
ELSE LET curTxn == [ zxid |-> his[1].zxid,
value |-> his[1].value,
ackSid |-> IF target \in his[1].ackSid THEN his[1].ackSid
ELSE his[1].ackSid \union {target},
epoch |-> his[1].epoch ]
IN <<curTxn>> \o UpdateAckSidHelper(Tail(his), cur + 1, end, target)
\* LeaderProcessACK originally had a bug when monotonicallyInc = FALSE:
\* ackSid of the txns in history was not updated during SYNC.
\* So we update ackSid in syncFollower.
UpdateAckSid(his, lastSeenIndex, target) ==
IF Len(his) = 0 \/ lastSeenIndex = 0 THEN his
ELSE UpdateAckSidHelper(his, 1, Minimum( { Len(his), lastSeenIndex} ), target)
\* return -1: this zxid appears at least twice; Len(his) + 1: does not exist;
\* 1 ~ Len(his): exists and appears just once.
RECURSIVE ZxidToIndexHelper(_,_,_,_)
ZxidToIndexHelper(his, zxid, cur, appeared) ==
IF cur > Len(his) THEN cur
ELSE IF TxnZxidEqual(his[cur], zxid)
THEN CASE appeared = TRUE -> -1
[] OTHER -> Minimum( { cur,
ZxidToIndexHelper(his, zxid, cur + 1, TRUE) } )
ELSE ZxidToIndexHelper(his, zxid, cur + 1, appeared)
ZxidToIndex(his, zxid) == IF ZxidEqual( zxid, <<0, 0>> ) THEN 0
ELSE IF Len(his) = 0 THEN 1
ELSE LET len == Len(his) IN
IF \E idx \in 1..len: TxnZxidEqual(his[idx], zxid)
THEN ZxidToIndexHelper(his, zxid, 1, FALSE)
ELSE len + 1
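\* Illustrative sketch, not part of the original spec: a worked example of the
\* return-value conventions of ZxidToIndex listed above. The names
\* SketchIndexHis and SketchZxidToIndexOk are hypothetical. The expected values
\* assume that TxnZxidEqual(txn, z) and ZxidEqual(z1, z2) are plain
\* component-wise zxid equality and that Minimum returns the least element of
\* a set, as their names suggest. Under those assumptions the conjunction is
\* expected to evaluate to TRUE.
SketchIndexHis ==
    << [ zxid |-> <<1, 1>>, value |-> 1, ackSid |-> {}, epoch |-> 1 ],
       [ zxid |-> <<1, 2>>, value |-> 2, ackSid |-> {}, epoch |-> 1 ] >>
SketchZxidToIndexOk ==
    /\ ZxidToIndex(SketchIndexHis, <<0, 0>>) = 0  \* initial zxid maps to index 0
    /\ ZxidToIndex(SketchIndexHis, <<1, 2>>) = 2  \* present exactly once
    /\ ZxidToIndex(SketchIndexHis, <<2, 1>>) = 3  \* absent: Len(his) + 1
    /\ ZxidToIndex(SketchIndexHis \o SketchIndexHis, <<1, 1>>) = -1  \* appears twice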
\* Find index idx which meets:
\* history[idx].zxid <= zxid < history[idx + 1].zxid
RECURSIVE IndexOfZxidHelper(_,_,_,_)
IndexOfZxidHelper(his, zxid, cur, end) ==
IF cur > end THEN end
ELSE IF ZxidCompare(his[cur].zxid, zxid) THEN cur - 1
ELSE IndexOfZxidHelper(his, zxid, cur + 1, end)
IndexOfZxid(his, zxid) == IF Len(his) = 0 THEN 0
ELSE LET idx == ZxidToIndex(his, zxid)
len == Len(his)
IN
IF idx <= len THEN idx
ELSE IndexOfZxidHelper(his, zxid, 1, len)
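\* Illustrative sketch, not part of the original spec: how IndexOfZxid resolves
\* a zxid that is missing from the history. SketchGapHis and
\* SketchIndexOfZxidOk are hypothetical names. The expected values assume that
\* ZxidCompare(z1, z2) holds exactly when z1 is strictly greater than z2 in
\* (epoch, counter) order, and that TxnZxidEqual is component-wise equality.
SketchGapHis ==
    << [ zxid |-> <<1, 1>>, value |-> 1, ackSid |-> {}, epoch |-> 1 ],
       [ zxid |-> <<1, 3>>, value |-> 2, ackSid |-> {}, epoch |-> 1 ] >>
SketchIndexOfZxidOk ==
    /\ IndexOfZxid(<< >>, <<1, 1>>) = 0           \* empty history
    /\ IndexOfZxid(SketchGapHis, <<1, 3>>) = 2    \* exact match
    /\ IndexOfZxid(SketchGapHis, <<1, 2>>) = 1    \* between entries 1 and 2
    /\ IndexOfZxid(SketchGapHis, <<2, 1>>) = 2    \* beyond the last entry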
RECURSIVE queuePackets(_,_,_,_,_)
queuePackets(queue, his, cur, committed, end) ==
IF cur > end THEN queue
ELSE CASE cur > committed ->
LET m_proposal == [ mtype |-> PROPOSAL,
mzxid |-> his[cur].zxid,
mdata |-> his[cur].value ]
IN queuePackets(Append(queue, m_proposal), his, cur + 1, committed, end)
[] cur <= committed ->
LET m_proposal == [ mtype |-> PROPOSAL,
mzxid |-> his[cur].zxid,
mdata |-> his[cur].value ]
m_commit == [ mtype |-> COMMIT,
mzxid |-> his[cur].zxid ]
newQueue == queue \o <<m_proposal, m_commit>>
IN queuePackets(newQueue, his, cur + 1, committed, end)
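\* Illustrative sketch, not part of the original spec: the packet queue that
\* queuePackets builds for a three-entry history when the follower has already
\* seen entry 1 and the leader has committed up to entry 2. Committed entries
\* yield <PROPOSAL, COMMIT>, uncommitted ones yield PROPOSAL only.
\* SketchQueueHis and SketchQueuePacketsOk are hypothetical names.
SketchQueueHis ==
    << [ zxid |-> <<1, 1>>, value |-> 1, ackSid |-> {}, epoch |-> 1 ],
       [ zxid |-> <<1, 2>>, value |-> 2, ackSid |-> {}, epoch |-> 1 ],
       [ zxid |-> <<1, 3>>, value |-> 3, ackSid |-> {}, epoch |-> 1 ] >>
SketchQueuePacketsOk ==
    queuePackets(<< >>, SketchQueueHis, 2, 2, 3) =
        << [ mtype |-> PROPOSAL, mzxid |-> <<1, 2>>, mdata |-> 2 ],
           [ mtype |-> COMMIT,   mzxid |-> <<1, 2>> ],
           [ mtype |-> PROPOSAL, mzxid |-> <<1, 3>>, mdata |-> 3 ] >>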
RECURSIVE setPacketsForChecking(_,_,_,_,_,_)
setPacketsForChecking(set, src, ep, his, cur, end) ==
IF cur > end THEN set
ELSE LET m_proposal == [ source |-> src,
epoch |-> ep,
zxid |-> his[cur].zxid,
data |-> his[cur].value ]
IN setPacketsForChecking((set \union {m_proposal}), src, ep, his, cur + 1, end)
\* Func lead() calls zk.loadData(), which will call takeSnapshot().
LastSnapshot(i) == IF zabState[i] = BROADCAST THEN lastSnapshot[i]
ELSE CASE IsLeader(i) ->
LET lastIndex == Len(history[i])
IN IF lastIndex = 0 THEN [ index |-> 0,
zxid |-> <<0, 0>> ]
ELSE [ index |-> lastIndex,
zxid |-> history[i][lastIndex].zxid ]
[] OTHER -> lastSnapshot[i]
\* To compress state space,
\* 1. we merge sending SNAP and outputting snapshot buffer into sending SNAP, and
\* 2. substitute sub sequence of history for snapshot of data tree.
SerializeSnapshot(i, idx) == IF idx <= 0 THEN << >>
ELSE SubSeq(history[i], 1, idx)
(* See queueCommittedProposals in LearnerHandler and startForwarding in Leader
for details. For proposals in committedLog and toBeApplied, send <PROPOSAL,
COMMIT>. For proposals in outstandingProposals, send PROPOSAL only. *)
SendSyncMsgs(i, j, lastSeenZxid, lastSeenIndex, mode, needRemoveHead) ==
/\ LET lastCommittedIndex == IF zabState[i] = BROADCAST
THEN lastCommitted[i].index
ELSE Len(history[i])
lastProposedIndex == Len(history[i])
queue_origin == IF lastSeenIndex >= lastProposedIndex
THEN << >>
ELSE queuePackets(<< >>, history[i],
lastSeenIndex + 1, lastCommittedIndex,
lastProposedIndex)
set_forChecking == IF lastSeenIndex >= lastProposedIndex
THEN {}
ELSE setPacketsForChecking( { }, i,
acceptedEpoch[i], history[i],
lastSeenIndex + 1, lastProposedIndex)
m_trunc == [ mtype |-> TRUNC, mtruncZxid |-> lastSeenZxid ]
m_diff == [ mtype |-> DIFF, mzxid |-> lastSeenZxid ]
m_snap == [ mtype |-> SNAP, msnapZxid |-> lastSeenZxid,
msnapshot |-> SerializeSnapshot(i, lastSeenIndex) ]
newLeaderZxid == <<acceptedEpoch[i], 0>>
m_newleader == [ mtype |-> NEWLEADER,
mzxid |-> newLeaderZxid ]
queue_toSend == CASE mode = TRUNC -> (<<m_trunc>> \o queue_origin) \o <<m_newleader>>
[] mode = DIFF -> (<<m_diff>> \o queue_origin) \o <<m_newleader>>
[] mode = SNAP -> (<<m_snap>> \o queue_origin) \o <<m_newleader>>
IN /\ \/ /\ needRemoveHead
/\ DiscardAndSendPackets(i, j, queue_toSend)
\/ /\ ~needRemoveHead
/\ SendPackets(i, j, queue_toSend)
/\ proposalMsgsLog' = proposalMsgsLog \union set_forChecking
/\ forwarding' = [forwarding EXCEPT ![i] = @ \union {j} ]
/\ \/ /\ mode = TRUNC \/ mode = DIFF
/\ history' = [history EXCEPT ![i] = UpdateAckSid(@, lastSeenIndex, j) ]
\/ /\ mode = SNAP
/\ UNCHANGED history \* txns before minCommitted don't need to be committed again
(* Leader syncs with follower by sending DIFF/TRUNC/SNAP/PROPOSAL/COMMIT/NEWLEADER.
See syncFollower in LearnerHandler for details. *)
SyncFollower(i, j, peerLastZxid, needRemoveHead) ==
LET \* IsPeerNewEpochZxid == peerLastZxid[2] = 0
lastProcessedZxid == lastProcessed[i].zxid
minCommittedIdx == lastSnapshot[i].index + 1
maxCommittedIdx == IF zabState[i] = BROADCAST THEN lastCommitted[i].index
ELSE Len(history[i])
committedLogEmpty == minCommittedIdx > maxCommittedIdx
minCommittedLog == IF committedLogEmpty THEN lastProcessedZxid
ELSE history[i][minCommittedIdx].zxid
maxCommittedLog == IF committedLogEmpty THEN lastProcessedZxid
ELSE IF maxCommittedIdx = 0 THEN << 0, 0>>
ELSE history[i][maxCommittedIdx].zxid
\* Hypothesis: 1. minCommittedLog : txn with index of lastSnapshot + 1
\* 2. maxCommittedLog : LastCommitted, to compress state space.
\* 3. merge queueCommittedProposals,startForwarding and
\* sending NEWLEADER into SendSyncMsgs.
IN \/ \* case1. peerLastZxid = lastProcessedZxid,
\* send DIFF & StartForwarding(lastProcessedZxid)
/\ ZxidEqual(peerLastZxid, lastProcessedZxid)
/\ SendSyncMsgs(i, j, peerLastZxid, lastProcessed[i].index,
DIFF, needRemoveHead)
\/ /\ ~ZxidEqual(peerLastZxid, lastProcessedZxid)
/\ \/ \* case2. peerLastZxid > maxCommittedLog,
\* send TRUNC(maxCommittedLog) & StartForwarding
/\ ZxidCompare(peerLastZxid, maxCommittedLog)
/\ SendSyncMsgs(i, j, maxCommittedLog, maxCommittedIdx,
TRUNC, needRemoveHead)
\/ \* case3. minCommittedLog <= peerLastZxid <= maxCommittedLog
/\ ~ZxidCompare(peerLastZxid, maxCommittedLog)
/\ ~ZxidCompare(minCommittedLog, peerLastZxid)
/\ LET lastSeenIndex == ZxidToIndex(history[i], peerLastZxid)
exist == /\ lastSeenIndex >= minCommittedIdx
/\ lastSeenIndex <= Len(history[i])
lastIndex == IF exist THEN lastSeenIndex
ELSE IndexOfZxid(history[i], peerLastZxid)
\* Maximum zxid that is < peerLastZxid
lastZxid == IF exist THEN peerLastZxid
ELSE IF lastIndex = 0 THEN <<0, 0>>
ELSE history[i][lastIndex].zxid
IN
\/ \* case 3.1. peerLastZxid exists in committedLog,
\* DIFF + queueCommittedProposals(peerLastZxid + 1)
\* + StartForwarding
/\ exist
/\ SendSyncMsgs(i, j, peerLastZxid, lastSeenIndex,
DIFF, needRemoveHead)
\/ \* case 3.2. peerLastZxid does not exist in committedLog,
\* TRUNC(lastZxid) + queueCommittedProposals(lastZxid + 1)
\* + StartForwarding
/\ ~exist
/\ SendSyncMsgs(i, j, lastZxid, lastIndex,
TRUNC, needRemoveHead)
\/ \* case4. peerLastZxid < minCommittedLog,
\* send SNAP(lastProcessed) + StartForwarding
/\ ZxidCompare(minCommittedLog, peerLastZxid)
/\ SendSyncMsgs(i, j, lastProcessedZxid, maxCommittedIdx,
SNAP, needRemoveHead)
\* compare state summary of two servers
IsMoreRecentThan(ss1, ss2) == \/ ss1.currentEpoch > ss2.currentEpoch
\/ /\ ss1.currentEpoch = ss2.currentEpoch
/\ ZxidCompare(ss1.lastZxid, ss2.lastZxid)
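\* Illustrative sketch, not part of the original spec: IsMoreRecentThan orders
\* state summaries by currentEpoch first and by lastZxid second.
\* SketchIsMoreRecentThanOk is a hypothetical name. The expected results assume
\* that ZxidCompare(z1, z2) is a strict "greater than" on zxids.
SketchIsMoreRecentThanOk ==
    /\ IsMoreRecentThan([ currentEpoch |-> 2, lastZxid |-> <<1, 5>> ],
                        [ currentEpoch |-> 1, lastZxid |-> <<1, 9>> ])  \* higher epoch wins
    /\ IsMoreRecentThan([ currentEpoch |-> 1, lastZxid |-> <<1, 9>> ],
                        [ currentEpoch |-> 1, lastZxid |-> <<1, 5>> ])  \* same epoch, larger zxid
    /\ ~IsMoreRecentThan([ currentEpoch |-> 1, lastZxid |-> <<1, 5>> ],
                         [ currentEpoch |-> 1, lastZxid |-> <<1, 5>> ]) \* equal summaries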
\* electionFinished in Leader
ElectionFinished(i, set) == /\ i \in set
/\ IsQuorum(set)
\* A follower may shut down and reconnect after it has already sent
\* ACKEPOCH or updated currentEpoch. In that case its sid is already
\* in electingFollower, but the recorded info is stale. So we need to
\* make sure each sid in electingFollower is unique and carries the
\* latest info.
UpdateElecting(oldSet, sid, peerLastZxid, inQuorum) ==
LET sid_electing == {s.sid: s \in oldSet }
IN IF sid \in sid_electing
THEN LET old_info == CHOOSE info \in oldSet : info.sid = sid
follower_info ==
[ sid |-> sid,
peerLastZxid |-> peerLastZxid,
inQuorum |-> (inQuorum \/ old_info.inQuorum) ]
IN (oldSet \ {old_info} ) \union {follower_info}
ELSE LET follower_info ==
[ sid |-> sid,
peerLastZxid |-> peerLastZxid,
inQuorum |-> inQuorum ]
IN oldSet \union {follower_info}
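\* Illustrative sketch, not part of the original spec: UpdateElecting replaces
\* the stale record of a reconnecting follower (keeping inQuorum sticky) and
\* simply adds a record for a new follower. SketchUpdateElectingOk is a
\* hypothetical name, and integer sids are used only for illustration.
SketchUpdateElectingOk ==
    LET old == [ sid |-> 1, peerLastZxid |-> <<1, 1>>, inQuorum |-> TRUE ]
    IN  /\ UpdateElecting({old}, 1, <<1, 2>>, FALSE)
              = { [ sid |-> 1, peerLastZxid |-> <<1, 2>>, inQuorum |-> TRUE ] }
        /\ UpdateElecting({old}, 2, <<1, 2>>, FALSE)
              = { old, [ sid |-> 2, peerLastZxid |-> <<1, 2>>, inQuorum |-> FALSE ] }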
LeaderTurnToSynchronization(i) ==
/\ currentEpoch' = [currentEpoch EXCEPT ![i] = acceptedEpoch[i]]
/\ zabState' = [zabState EXCEPT ![i] = SYNCHRONIZATION]
(* Leader waits to receive ACKEPOCH from a quorum, and checks whether it has the most
recent state summary among them. After this, leader's zabState turns to SYNCHRONIZATION.
See waitForEpochAck in Leader for details. *)
LeaderProcessACKEPOCH(i, j) ==
/\ IsON(i)
/\ IsLeader(i)
/\ PendingACKEPOCH(i, j)
/\ LET msg == msgs[j][i][1]
infoOk == IsMyLearner(i, j)
leaderStateSummary == [ currentEpoch |-> currentEpoch[i],
lastZxid |-> lastProcessed[i].zxid ]
followerStateSummary == [ currentEpoch |-> msg.mepoch,
lastZxid |-> msg.mzxid ]
logOk == \* whether follower is no more up-to-date than leader
~IsMoreRecentThan(followerStateSummary, leaderStateSummary)
electing_quorum == {e \in electing[i]: e.inQuorum = TRUE }
sid_electing == {s.sid: s \in electing_quorum }
IN /\ infoOk
/\ \/ \* electionFinished = true, jump out of waitForEpochAck.
\* Different from code, here we still need to record info
\* into electing, to help us perform syncFollower afterwards.
\* Since electing already meets quorum, it does not break
\* consistency between code and spec.
/\ ElectionFinished(i, sid_electing)
/\ electing' = [electing EXCEPT ![i] = UpdateElecting(@, j, msg.mzxid, FALSE) ]
/\ Discard(j, i)
/\ UNCHANGED <<varsL, zabState, forwarding, connectInfo,
learners, epochLeader, violatedInvariants>>
\/ /\ ~ElectionFinished(i, sid_electing)
/\ \/ /\ zabState[i] = DISCOVERY
/\ UNCHANGED violatedInvariants
\/ /\ zabState[i] /= DISCOVERY
/\ PrintT("Exception: electionFinished false," \o
" while zabState not DISCOVERY.")
/\ violatedInvariants' = [violatedInvariants EXCEPT
!.stateInconsistent = TRUE]
/\ \/ /\ followerStateSummary.currentEpoch = -1
/\ electing' = [electing EXCEPT ![i] = UpdateElecting(@, j,
msg.mzxid, FALSE) ]
/\ Discard(j, i)
/\ UNCHANGED <<varsL, zabState, forwarding, connectInfo,
learners, epochLeader>>
\/ /\ followerStateSummary.currentEpoch > -1
/\ \/ \* normal follower
/\ logOk
/\ electing' = [electing EXCEPT ![i] =
UpdateElecting(@, j, msg.mzxid, TRUE) ]
/\ LET new_electing_quorum == {e \in electing'[i]: e.inQuorum = TRUE }
new_sid_electing == {s.sid: s \in new_electing_quorum }
IN
\/ \* electionFinished = true, jump out of waitForEpochAck,
\* update currentEpoch and zabState.
/\ ElectionFinished(i, new_sid_electing)
/\ LeaderTurnToSynchronization(i)
/\ LET newLeaderEpoch == acceptedEpoch[i]
IN epochLeader' = [epochLeader EXCEPT ![newLeaderEpoch]
= @ \union {i} ] \* for checking invariants
\/ \* electionFinished is still false.
/\ ~ElectionFinished(i, new_sid_electing)
/\ UNCHANGED <<currentEpoch, zabState, epochLeader>>
/\ Discard(j, i)
/\ UNCHANGED <<state, lastProcessed, electionVars, leadingVoteSet,
electionMsgs, connectInfo, learners, history, forwarding>>
\/ \* Exists follower more recent than leader
/\ ~logOk
/\ LeaderShutdown(i)
/\ UNCHANGED <<electing, epochLeader>>
/\ UNCHANGED <<acceptedEpoch, lastCommitted, lastSnapshot, connecting, ackldRecv,
tempMaxEpoch, initialHistory, packetsSync, proposalMsgsLog, envVars>>
/\ UpdateRecorder(<<"LeaderProcessACKEPOCH", i, j>>)
\* Strip syncFollower from LeaderProcessACKEPOCH.
\* This action is enabled only when electionFinished = true
\* and some learnerHandler has not yet performed
\* syncFollower.
LeaderSyncFollower(i, j) ==
/\ IsON(i)
/\ IsLeader(i)
/\ LET electing_quorum == {e \in electing[i]: e.inQuorum = TRUE }
electionFinished == ElectionFinished(i, {s.sid: s \in electing_quorum } )
toSync == {s \in electing[i] : /\ ~ZxidEqual( s.peerLastZxid, <<-1, -1>>)
/\ s.sid \in learners[i] }
canSync == toSync /= {}
IN
/\ electionFinished
/\ canSync
/\ \E s \in toSync: s.sid = j
/\ LET chosen == CHOOSE s \in toSync: s.sid = j
newChosen == [ sid |-> chosen.sid,
peerLastZxid |-> <<-1, -1>>, \* <<-1,-1>> means has handled.
inQuorum |-> chosen.inQuorum ]
IN /\ SyncFollower(i, chosen.sid, chosen.peerLastZxid, FALSE)
/\ electing' = [electing EXCEPT ![i] = (@ \ {chosen}) \union {newChosen} ]
/\ UNCHANGED <<state, currentEpoch, lastProcessed, zabState, acceptedEpoch,
lastCommitted, initialHistory, electionVars, leadingVoteSet,
learners, connecting, ackldRecv, tempMaxEpoch, followerVars,
lastSnapshot, epochLeader, violatedInvariants, electionMsgs, envVars>>
/\ UpdateRecorder(<<"LeaderSyncFollower", i, j>>)
TruncateLog(his, index) == IF index <= 0 THEN << >>
ELSE SubSeq(his, 1, index)
(* Follower receives DIFF/TRUNC/SNAP, and then may receive PROPOSAL, COMMIT, NEWLEADER,
and UPTODATE. See syncWithLeader in Learner for details. *)
FollowerProcessSyncMessage(i, j) ==
/\ IsON(i)
/\ IsFollower(i)
/\ msgs[j][i] /= << >>
/\ \/ msgs[j][i][1].mtype = DIFF
\/ msgs[j][i][1].mtype = TRUNC
\/ msgs[j][i][1].mtype = SNAP
/\ LET msg == msgs[j][i][1]
infoOk == IsMyLeader(i, j)
stateOk == zabState[i] = SYNCHRONIZATION
IN /\ infoOk
/\ \/ \* Follower should receive packets in SYNC.
/\ ~stateOk
/\ PrintT("Exception: Follower receives DIFF/TRUNC/SNAP," \o
" while zabState not SYNCHRONIZATION.")
/\ violatedInvariants' = [violatedInvariants EXCEPT !.stateInconsistent = TRUE]
/\ UNCHANGED <<history, initialHistory, lastProcessed, lastCommitted, connectInfo>>
\/ /\ stateOk
/\ \/ /\ msg.mtype = DIFF
/\ connectInfo' = [connectInfo EXCEPT ![i].syncMode = DIFF]
/\ UNCHANGED <<history, initialHistory, lastProcessed, lastCommitted,
violatedInvariants>>
\/ /\ msg.mtype = TRUNC
/\ connectInfo' = [connectInfo EXCEPT ![i].syncMode = TRUNC]
/\ LET truncZxid == msg.mtruncZxid
truncIndex == ZxidToIndex(history[i], truncZxid)
truncOk == /\ truncIndex >= lastCommitted[i].index
/\ truncIndex <= Len(history[i])
IN
\/ /\ ~truncOk
/\ PrintT("Exception: TRUNC error.")
/\ violatedInvariants' = [violatedInvariants EXCEPT
!.proposalInconsistent = TRUE]
/\ UNCHANGED <<history, initialHistory, lastProcessed, lastCommitted>>
\/ /\ truncOk
/\ history' = [history EXCEPT
![i] = TruncateLog(history[i], truncIndex)]
/\ initialHistory' = [initialHistory EXCEPT ![i] = history'[i]]
/\ lastProcessed' = [lastProcessed EXCEPT
![i] = [ index |-> truncIndex,
zxid |-> truncZxid] ]
/\ lastCommitted' = [lastCommitted EXCEPT
![i] = [ index |-> truncIndex,
zxid |-> truncZxid] ]
/\ UNCHANGED violatedInvariants
\/ /\ msg.mtype = SNAP
/\ connectInfo' = [connectInfo EXCEPT ![i].syncMode = SNAP]
/\ history' = [history EXCEPT ![i] = msg.msnapshot]
/\ initialHistory' = [initialHistory EXCEPT ![i] = history'[i]]
/\ lastProcessed' = [lastProcessed EXCEPT
![i] = [ index |-> Len(history'[i]),
zxid |-> msg.msnapZxid] ]
/\ lastCommitted' = [lastCommitted EXCEPT
![i] = [ index |-> Len(history'[i]),
zxid |-> msg.msnapZxid] ]
/\ UNCHANGED violatedInvariants
/\ Discard(j, i)
/\ UNCHANGED <<state, currentEpoch, zabState, acceptedEpoch, electionVars,
leaderVars, tempMaxEpoch, packetsSync, lastSnapshot,
proposalMsgsLog, epochLeader, electionMsgs, envVars>>
/\ UpdateRecorder(<<"FollowerProcessSyncMessage", i, j>>)
\* See variable snapshotNeeded in Learner for details.
SnapshotNeeded(i) == \/ connectInfo[i].syncMode = TRUNC
\/ connectInfo[i].syncMode = SNAP
\* See variable writeToTxnLog in Learner for details.
WriteToTxnLog(i) == IF \/ connectInfo[i].syncMode = DIFF
\/ connectInfo[i].nlRcv = TRUE
THEN TRUE ELSE FALSE
\* See lastProposed in Leader for details.
LastProposed(i) == IF Len(history[i]) = 0 THEN [ index |-> 0,
zxid |-> <<0, 0>> ]
ELSE
LET lastIndex == Len(history[i])
entry == history[i][lastIndex]
IN [ index |-> lastIndex,
zxid |-> entry.zxid ]
\* See lastQueued in Learner for details.
LastQueued(i) == IF ~IsFollower(i) \/ zabState[i] /= SYNCHRONIZATION
THEN LastProposed(i)
ELSE \* condition: IsFollower(i) /\ zabState = SYNCHRONIZATION
LET packetsInSync == packetsSync[i].notCommitted
lenSync == Len(packetsInSync)
totalLen == Len(history[i]) + lenSync
IN IF lenSync = 0 THEN LastProposed(i)
ELSE [ index |-> totalLen,
zxid |-> packetsInSync[lenSync].zxid ]
IsNextZxid(curZxid, nextZxid) ==
\/ \* first PROPOSAL in this epoch
/\ nextZxid[2] = 1
/\ curZxid[1] < nextZxid[1]
\/ \* not first PROPOSAL in this epoch
/\ nextZxid[2] > 1
/\ curZxid[1] = nextZxid[1]
/\ curZxid[2] + 1 = nextZxid[2]
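\* Illustrative sketch, not part of the original spec: the two ways a zxid can
\* be "next" under IsNextZxid, plus two rejected cases.
\* SketchIsNextZxidOk is a hypothetical name; the conjunction uses only
\* IsNextZxid itself and is expected to evaluate to TRUE.
SketchIsNextZxidOk ==
    /\ IsNextZxid(<<1, 3>>, <<1, 4>>)     \* same epoch, counter + 1
    /\ IsNextZxid(<<1, 3>>, <<2, 1>>)     \* first proposal of a later epoch
    /\ ~IsNextZxid(<<1, 3>>, <<1, 5>>)    \* skips counter 4
    /\ ~IsNextZxid(<<1, 3>>, <<2, 2>>)    \* a later epoch must start at counter 1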
FollowerProcessPROPOSALInSync(i, j) ==
/\ IsON(i)
/\ IsFollower(i)
/\ PendingPROPOSAL(i, j)
/\ zabState[i] = SYNCHRONIZATION
/\ LET msg == msgs[j][i][1]
infoOk == IsMyLeader(i, j)
isNext == IsNextZxid(LastQueued(i).zxid, msg.mzxid)
newTxn == [ zxid |-> msg.mzxid,
value |-> msg.mdata,
ackSid |-> {}, \* follower does not consider ackSid
epoch |-> acceptedEpoch[i] ] \* epoch of this round
IN /\ infoOk
/\ \/ /\ isNext
/\ packetsSync' = [packetsSync EXCEPT ![i].notCommitted
= Append(packetsSync[i].notCommitted, newTxn) ]
\/ /\ ~isNext
/\ PrintT("Warn: Follower receives PROPOSAL," \o
" while zxid != lastQueued + 1.")
/\ UNCHANGED packetsSync
\* logRequest -> SyncRequestProcessor -> SendAckRequestProcessor -> reply ack
\* So here we do not need to send ack to leader.
/\ Discard(j, i)
/\ UNCHANGED <<serverVars, electionVars, leaderVars, connectInfo,
verifyVars, electionMsgs, envVars>>
/\ UpdateRecorder(<<"FollowerProcessPROPOSALInSync", i, j>>)
RECURSIVE IndexOfFirstTxnWithEpoch(_,_,_,_)
IndexOfFirstTxnWithEpoch(his, epoch, cur, end) ==
IF cur > end THEN cur
ELSE IF his[cur].epoch = epoch THEN cur
ELSE IndexOfFirstTxnWithEpoch(his, epoch, cur + 1, end)
LastCommitted(i) == IF zabState[i] = BROADCAST THEN lastCommitted[i]
ELSE CASE IsLeader(i) ->
LET lastInitialIndex == Len(initialHistory[i])
IN IF lastInitialIndex = 0 THEN [ index |-> 0,
zxid |-> <<0, 0>> ]
ELSE [ index |-> lastInitialIndex,
zxid |-> history[i][lastInitialIndex].zxid ]
[] IsFollower(i) ->
LET completeHis == history[i] \o packetsSync[i].notCommitted
packetsCommitted == packetsSync[i].committed
lenCommitted == Len(packetsCommitted)
IN IF lenCommitted = 0 \* return last one in history
THEN LET lastIndex == Len(history[i])
lastInitialIndex == Len(initialHistory[i])
IN IF lastIndex = lastInitialIndex
THEN IF lastIndex = 0
THEN [ index |-> 0,
zxid |-> <<0, 0>> ]
ELSE [ index |-> lastIndex ,
zxid |-> history[i][lastIndex].zxid ]
ELSE IF lastInitialIndex < lastCommitted[i].index
THEN lastCommitted[i]
ELSE IF lastInitialIndex = 0
THEN [ index |-> 0,
zxid |-> <<0, 0>> ]
ELSE [ index |-> lastInitialIndex,
zxid |-> history[i][lastInitialIndex].zxid ]
ELSE \* return tail of packetsCommitted
LET committedIndex == ZxidToIndex(completeHis,
packetsCommitted[lenCommitted] )
IN [ index |-> committedIndex,
zxid |-> packetsCommitted[lenCommitted] ]
[] OTHER -> lastCommitted[i]
TxnWithIndex(i, idx) == IF ~IsFollower(i) \/ zabState[i] /= SYNCHRONIZATION
THEN history[i][idx]
ELSE LET completeHis == history[i] \o packetsSync[i].notCommitted
IN completeHis[idx]
(* To simplify specification, we assume snapshotNeeded = false and
writeToTxnLog = true. So here we just call packetsCommitted.add. *)
FollowerProcessCOMMITInSync(i, j) ==
/\ IsON(i)
/\ IsFollower(i)
/\ PendingCOMMIT(i, j)
/\ zabState[i] = SYNCHRONIZATION
/\ LET msg == msgs[j][i][1]
infoOk == IsMyLeader(i, j)
committedIndex == LastCommitted(i).index + 1
exist == /\ committedIndex <= LastQueued(i).index
/\ IsNextZxid(LastCommitted(i).zxid, msg.mzxid)
match == ZxidEqual(msg.mzxid, TxnWithIndex(i, committedIndex).zxid )
IN /\ infoOk
/\ \/ /\ exist
/\ \/ /\ match
/\ LET writeToTxnLog == WriteToTxnLog(i)
IN
\/ /\ ~writeToTxnLog \* zk.processTxn() & packetsNotCommitted.remove()
/\ LET committedTxn == packetsSync[i].notCommitted[1]
IN
/\ history' = [ history EXCEPT ![i]
= Append(@, committedTxn)]
/\ lastCommitted' = [ lastCommitted EXCEPT ![i]
= [index |-> Len(history'[i]),
zxid |-> committedTxn.zxid ] ]
/\ lastProcessed' = [ lastProcessed EXCEPT ![i]
= lastCommitted'[i] ]
/\ packetsSync' = [ packetsSync EXCEPT ![i].notCommitted
= Tail(@) ]
\/ /\ writeToTxnLog \* packetsCommitted.add()
/\ packetsSync' = [ packetsSync EXCEPT ![i].committed
= Append(packetsSync[i].committed, msg.mzxid) ]
/\ UNCHANGED <<history, lastCommitted, lastProcessed>>
/\ UNCHANGED violatedInvariants
\/ /\ ~match
/\ PrintT("Warn: Follower receives COMMIT," \o
" but zxid not the next committed zxid in COMMIT.")
/\ violatedInvariants' = [violatedInvariants EXCEPT
!.commitInconsistent = TRUE ]
/\ UNCHANGED <<history, lastCommitted, lastProcessed, packetsSync>>
\/ /\ ~exist
/\ PrintT("Warn: Follower receives COMMIT," \o
" but no packets with its zxid exists.")
/\ violatedInvariants' = [violatedInvariants EXCEPT
!.commitInconsistent = TRUE ]
/\ UNCHANGED <<history, lastCommitted, lastProcessed, packetsSync>>
/\ Discard(j, i)
/\ UNCHANGED <<state, currentEpoch, zabState, acceptedEpoch,
lastSnapshot, initialHistory, electionVars, leaderVars,
connectInfo, epochLeader, proposalMsgsLog, electionMsgs, envVars>>
/\ UpdateRecorder(<<"FollowerProcessCOMMITInSync", i, j>>)
\* Assume that a node takes a snapshot every time it has committed two txns.
ShouldSnapshot(i) == lastCommitted[i].index - lastSnapshot[i].index >= 2
(* takeSnapshot() is mainly called in three places:
1. zk.loadData() in lead() when node becomes leader;
2. syncRequestProcessor.run() tells when to snapshot;
3. node processing NEWLEADER in learner.syncWithLeader();
*)
TakeSnapshot(i) == LET snapOk == lastSnapshot[i].index <= lastCommitted[i].index
IN \/ /\ snapOk
/\ lastSnapshot' = [ lastSnapshot EXCEPT ![i] = lastCommitted[i] ]
/\ UNCHANGED violatedInvariants
\/ /\ ~snapOk
/\ PrintT("Exception: index of snapshot greater than" \o
" index of committed.")
/\ violatedInvariants' = [violatedInvariants EXCEPT
!.commitInconsistent = TRUE ]
/\ UNCHANGED lastSnapshot
RECURSIVE ACKInBatches(_,_)
ACKInBatches(queue, packets) ==
IF packets = << >> THEN queue
ELSE LET head == packets[1]
newPackets == Tail(packets)
m_ack == [ mtype |-> ACK,
mzxid |-> head.zxid ]
IN ACKInBatches( Append(queue, m_ack), newPackets )
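\* Illustrative sketch, not part of the original spec: ACKInBatches turns a
\* sequence of packets into one ACK per packet, in the same order.
\* SketchACKInBatchesOk is a hypothetical name, and the packets carry only the
\* zxid field because that is the only field ACKInBatches reads.
SketchACKInBatchesOk ==
    ACKInBatches(<< >>, << [ zxid |-> <<1, 1>> ], [ zxid |-> <<1, 2>> ] >>) =
        << [ mtype |-> ACK, mzxid |-> <<1, 1>> ],
           [ mtype |-> ACK, mzxid |-> <<1, 2>> ] >>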
(* Update currentEpoch, call logRequest on every packet in
packetsNotCommitted, and clear it. As syncProcessor will
be called in logRequest, we have to reply acks here. *)
FollowerProcessNEWLEADER(i, j) ==
/\ IsON(i)
/\ IsFollower(i)
/\ PendingNEWLEADER(i, j)
/\ LET msg == msgs[j][i][1]
infoOk == IsMyLeader(i, j)
packetsInSync == packetsSync[i].notCommitted
m_ackld == [ mtype |-> ACKLD,
mzxid |-> msg.mzxid ]
ms_ack == ACKInBatches( << >>, packetsInSync )
queue_toSend == <<m_ackld>> \o ms_ack \* send ACK-NEWLEADER first.
IN /\ infoOk
/\ currentEpoch' = [currentEpoch EXCEPT ![i] = acceptedEpoch[i] ]
/\ history' = [history EXCEPT ![i] = @ \o packetsInSync ]
/\ packetsSync' = [packetsSync EXCEPT ![i].notCommitted = << >> ]
/\ connectInfo' = [connectInfo EXCEPT ![i].nlRcv = TRUE,
![i].syncMode = NONE ]
/\ \/ /\ SnapshotNeeded(i)
/\ TakeSnapshot(i)
\/ /\ ~SnapshotNeeded(i)
/\ UNCHANGED <<lastSnapshot, violatedInvariants>>
/\ DiscardAndSendPackets(i, j, queue_toSend)
/\ UNCHANGED <<state, lastProcessed, zabState, acceptedEpoch, lastCommitted,
electionVars, leaderVars, initialHistory,
proposalMsgsLog, epochLeader, electionMsgs, envVars>>
/\ UpdateRecorder(<<"FollowerProcessNEWLEADER", i, j>>)
\* quorumFormed in Leader
QuorumFormed(i, set) == i \in set /\ IsQuorum(set)
UpdateElectionVote(i, epoch) == UpdateProposal(i, currentVote[i].proposedLeader,
currentVote[i].proposedZxid, epoch)
\* See startZkServer in Leader for details.
StartZkServer(i) ==
LET latest == LastProposed(i)
IN /\ lastCommitted' = [lastCommitted EXCEPT ![i] = latest]
/\ lastProcessed' = [lastProcessed EXCEPT ![i] = latest]
/\ lastSnapshot' = [lastSnapshot EXCEPT ![i] = latest]
/\ UpdateElectionVote(i, acceptedEpoch[i])
LeaderTurnToBroadcast(i) ==
/\ StartZkServer(i)
/\ zabState' = [zabState EXCEPT ![i] = BROADCAST]
(* Leader waits to receive a quorum of ACKs whose zxid has lower bits equal to 0, and
broadcasts UPTODATE. See waitForNewLeaderAck for details. *)
LeaderProcessACKLD(i, j) ==
/\ IsON(i)
/\ IsLeader(i)
/\ PendingACKLD(i, j)
/\ LET msg == msgs[j][i][1]
infoOk == IsMyLearner(i, j)
match == ZxidEqual(msg.mzxid, <<acceptedEpoch[i], 0>>)
currentZxid == <<acceptedEpoch[i], 0>>
m_uptodate == [ mtype |-> UPTODATE,
mzxid |-> currentZxid ] \* not important
sid_ackldRecv == {a.sid: a \in ackldRecv[i]}
IN /\ infoOk
/\ \/ \* just reply UPTODATE.
/\ QuorumFormed(i, sid_ackldRecv)
/\ Reply(i, j, m_uptodate)
/\ UNCHANGED <<ackldRecv, zabState, lastCommitted, lastProcessed,
lastSnapshot, currentVote, violatedInvariants>>
\/ /\ ~QuorumFormed(i, sid_ackldRecv)
/\ \/ /\ match
/\ ackldRecv' = [ackldRecv EXCEPT ![i] = UpdateConnectingOrAckldRecv(@, j) ]
/\ LET new_sid_ackldRecv == {a.sid: a \in ackldRecv'[i]}
IN
\/ \* jump out of waitForNewLeaderAck, and do startZkServer,
\* setZabState, and reply UPTODATE.
/\ QuorumFormed(i, new_sid_ackldRecv)
/\ LeaderTurnToBroadcast(i)
/\ DiscardAndBroadcastUPTODATE(i, j, m_uptodate)
\/ \* still wait in waitForNewLeaderAck.
/\ ~QuorumFormed(i, new_sid_ackldRecv)
/\ Discard(j, i)
/\ UNCHANGED <<zabState, lastCommitted, lastProcessed,
lastSnapshot, currentVote>>
/\ UNCHANGED violatedInvariants
\/ /\ ~match
/\ PrintT("Exception: NEWLEADER ACK is from a different epoch. ")
/\ violatedInvariants' = [violatedInvariants EXCEPT
!.ackInconsistent = TRUE]
/\ Discard(j, i)
/\ UNCHANGED <<ackldRecv, zabState, lastCommitted,
lastSnapshot, lastProcessed, currentVote>>
/\ UNCHANGED <<state, currentEpoch, acceptedEpoch, history, logicalClock, receiveVotes,
outOfElection, recvQueue, waitNotmsg, leadingVoteSet, learners, connecting,
electing, forwarding, tempMaxEpoch, initialHistory, followerVars,
proposalMsgsLog, epochLeader, electionMsgs, envVars>>
/\ UpdateRecorder(<<"LeaderProcessACKLD", i, j>>)
TxnsWithPreviousEpoch(i) ==
LET completeHis == IF ~IsFollower(i) \/ zabState[i] /= SYNCHRONIZATION
THEN history[i]
ELSE history[i] \o packetsSync[i].notCommitted
end == Len(completeHis)
first == IndexOfFirstTxnWithEpoch(completeHis, acceptedEpoch[i], 1, end)
IN IF first > end THEN completeHis
ELSE SubSeq(completeHis, 1, first - 1)
TxnsRcvWithCurEpoch(i) ==
LET completeHis == IF ~IsFollower(i) \/ zabState[i] /= SYNCHRONIZATION
THEN history[i]
ELSE history[i] \o packetsSync[i].notCommitted
end == Len(completeHis)
first == IndexOfFirstTxnWithEpoch(completeHis, acceptedEpoch[i], 1, end)
IN IF first > end THEN << >>
ELSE SubSeq(completeHis, first, end) \* completeHis[first : end]
\* Txns received in current epoch but not committed.
\* See pendingTxns in FollowerZooKeeper for details.
PendingTxns(i) == IF ~IsFollower(i) \/ zabState[i] /= SYNCHRONIZATION
THEN SubSeq(history[i], lastCommitted[i].index + 1, Len(history[i]))
ELSE LET packetsCommitted == packetsSync[i].committed
completeHis == history[i] \o packetsSync[i].notCommitted
IN IF Len(packetsCommitted) = 0
THEN SubSeq(completeHis, Len(history[i]) + 1, Len(completeHis))
ELSE SubSeq(completeHis, LastCommitted(i).index + 1, Len(completeHis))
CommittedTxns(i) == IF ~IsFollower(i) \/ zabState[i] /= SYNCHRONIZATION
THEN SubSeq(history[i], 1, lastCommitted[i].index)
ELSE LET packetsCommitted == packetsSync[i].committed
completeHis == history[i] \o packetsSync[i].notCommitted
IN IF Len(packetsCommitted) = 0 THEN history[i]
ELSE SubSeq( completeHis, 1, LastCommitted(i).index )
\* Each zxid of packetsCommitted equals to zxid of
\* corresponding txn in txns.
RECURSIVE TxnsAndCommittedMatch(_,_)
TxnsAndCommittedMatch(txns, packetsCommitted) ==
LET len1 == Len(txns)
len2 == Len(packetsCommitted)
IN IF len2 = 0 THEN TRUE
ELSE IF len1 < len2 THEN FALSE
ELSE /\ ZxidEqual(txns[len1].zxid, packetsCommitted[len2])
/\ TxnsAndCommittedMatch( SubSeq(txns, 1, len1 - 1),
SubSeq(packetsCommitted, 1, len2 - 1) )
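\* Illustrative sketch, not part of the original spec: TxnsAndCommittedMatch
\* aligns packetsCommitted with the tail of txns and compares zxids from the
\* end backwards. SketchTxnsAndCommittedMatchOk is a hypothetical name. The
\* expected results assume ZxidEqual is component-wise equality; the txns carry
\* only the zxid field because that is the only field compared.
SketchTxnsAndCommittedMatchOk ==
    LET txns == << [ zxid |-> <<1, 1>> ], [ zxid |-> <<1, 2>> ], [ zxid |-> <<1, 3>> ] >>
    IN  /\ TxnsAndCommittedMatch(txns, << >>)                       \* nothing to check
        /\ TxnsAndCommittedMatch(txns, << <<1, 2>>, <<1, 3>> >>)    \* matches the tail
        /\ ~TxnsAndCommittedMatch(txns, << <<1, 1>>, <<1, 3>> >>)   \* <<1, 1>> vs <<1, 2>>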
FollowerLogRequestInBatches(i, leader, ms_ack, packetsNotCommitted) ==
/\ history' = [history EXCEPT ![i] = @ \o packetsNotCommitted ]
/\ DiscardAndSendPackets(i, leader, ms_ack)
\* Since commit will call commitProcessor.commit, which will finally
\* update lastProcessed, we update it here atomically.
FollowerCommitInBatches(i) ==
LET committedTxns == CommittedTxns(i)
packetsCommitted == packetsSync[i].committed
match == TxnsAndCommittedMatch(committedTxns, packetsCommitted)
IN
\/ /\ match
/\ lastCommitted' = [lastCommitted EXCEPT ![i] = LastCommitted(i)]
/\ lastProcessed' = [lastProcessed EXCEPT ![i] = lastCommitted'[i]]
/\ UNCHANGED violatedInvariants
\/ /\ ~match
/\ PrintT("Warn: Committing zxid without seeing txn. /" \o
"Committing zxid != pending txn zxid.")
/\ violatedInvariants' = [violatedInvariants EXCEPT
!.commitInconsistent = TRUE ]
/\ UNCHANGED <<lastCommitted, lastProcessed>>
(* Follower jumps out of outerLoop here, and logs the stuff that came in
between snapshot and uptodate, which means calling logRequest and
commit to clear packetsNotCommitted and packetsCommitted. *)
FollowerProcessUPTODATE(i, j) ==
/\ IsON(i)
/\ IsFollower(i)
/\ PendingUPTODATE(i, j)
/\ LET msg == msgs[j][i][1]
infoOk == IsMyLeader(i, j)
packetsNotCommitted == packetsSync[i].notCommitted
ms_ack == ACKInBatches(<< >>, packetsNotCommitted)
IN /\ infoOk
\* Here we ignore ack of UPTODATE.
/\ UpdateElectionVote(i, acceptedEpoch[i])
/\ FollowerLogRequestInBatches(i, j, ms_ack, packetsNotCommitted)
/\ FollowerCommitInBatches(i)
/\ packetsSync' = [packetsSync EXCEPT ![i].notCommitted = << >>,
![i].committed = << >> ]
/\ zabState' = [zabState EXCEPT ![i] = BROADCAST ]
/\ UNCHANGED <<state, currentEpoch, acceptedEpoch, logicalClock, lastSnapshot,
receiveVotes, outOfElection, recvQueue, waitNotmsg, leaderVars, envVars,
initialHistory, connectInfo, epochLeader, proposalMsgsLog, electionMsgs>>
/\ UpdateRecorder(<<"FollowerProcessUPTODATE", i, j>>)
-----------------------------------------------------------------------------
IncZxid(s, zxid) == IF currentEpoch[s] = zxid[1] THEN <<zxid[1], zxid[2] + 1>>
ELSE <<currentEpoch[s], 1>>
(* Leader receives a client request and broadcasts PROPOSAL. See processRequest
in ProposalRequestProcessor and propose in Leader for details. Since
proposalProcessor.processRequest -> syncProcessor.processRequest ->
ackProcessor.processRequest -> leader.processAck, we initially set
txn.ackSid = {i}, assuming we have done leader.processAck.
Note: In production, any server in traffic can receive requests and
forward them to leader if necessary. We choose to let leader be
the sole one who can receive write requests, to simplify spec
and keep correctness at the same time.
*)
LeaderProcessRequest(i) ==
/\ CheckTransactionNum \* test restrictions of transaction num
/\ IsON(i)
/\ IsLeader(i)
/\ zabState[i] = BROADCAST
/\ LET inBroadcast == {s \in forwarding[i]: zabState[s] = BROADCAST}
IN IsQuorum(inBroadcast)
/\ LET request_value == GetRecorder("nClientRequest") \* unique value
newTxn == [ zxid |-> IncZxid(i, LastProposed(i).zxid),
value |-> request_value,
ackSid |-> {i}, \* assume we have done leader.processAck
epoch |-> acceptedEpoch[i] ]
m_proposal == [ mtype |-> PROPOSAL,
mzxid |-> newTxn.zxid,
mdata |-> request_value ]
m_proposal_for_checking == [ source |-> i,
epoch |-> acceptedEpoch[i],
zxid |-> newTxn.zxid,
data |-> request_value ]
IN /\ history' = [history EXCEPT ![i] = Append(@, newTxn) ]
/\ \/ /\ ShouldSnapshot(i)
/\ TakeSnapshot(i)
\/ /\ ~ShouldSnapshot(i)
/\ UNCHANGED <<lastSnapshot, violatedInvariants>>
/\ Broadcast(i, m_proposal)
/\ proposalMsgsLog' = proposalMsgsLog \union {m_proposal_for_checking}
/\ UNCHANGED <<state, currentEpoch, lastProcessed, zabState, acceptedEpoch,
lastCommitted, electionVars, leaderVars, followerVars,
initialHistory, epochLeader, electionMsgs, envVars>>
/\ UpdateRecorder(<<"LeaderProcessRequest", i>>)
(* Follower processes PROPOSAL in BROADCAST. See processPacket
in Follower for details. *)
FollowerProcessPROPOSAL(i, j) ==
/\ IsON(i)
/\ IsFollower(i)
/\ PendingPROPOSAL(i, j)
/\ zabState[i] = BROADCAST
/\ LET msg == msgs[j][i][1]
infoOk == IsMyLeader(i, j)
isNext == IsNextZxid( LastQueued(i).zxid, msg.mzxid)
newTxn == [ zxid |-> msg.mzxid,
value |-> msg.mdata,
ackSid |-> {},
epoch |-> acceptedEpoch[i] ]
m_ack == [ mtype |-> ACK,
mzxid |-> msg.mzxid ]
IN /\ infoOk
/\ \/ /\ isNext
/\ history' = [history EXCEPT ![i] = Append(@, newTxn)]
/\ \/ /\ ShouldSnapshot(i)
/\ TakeSnapshot(i)
\/ /\ ~ShouldSnapshot(i)
/\ UNCHANGED <<lastSnapshot, violatedInvariants>>
/\ Reply(i, j, m_ack)
\/ /\ ~isNext
/\ PrintT("Exception: Follower receives PROPOSAL, while" \o
" the transaction is not the next.")
/\ violatedInvariants' = [violatedInvariants EXCEPT
!.proposalInconsistent = TRUE]
/\ UNCHANGED <<history, lastSnapshot, msgs>>
/\ UNCHANGED <<state, currentEpoch, lastProcessed, zabState, acceptedEpoch,
lastCommitted, electionVars, leaderVars, followerVars, initialHistory,
epochLeader, proposalMsgsLog, electionMsgs, envVars>>
/\ UpdateRecorder(<<"FollowerProcessPROPOSAL", i, j>>)
\* See outstandingProposals in Leader
OutstandingProposals(i) == IF zabState[i] /= BROADCAST THEN << >>
ELSE SubSeq( history[i], lastCommitted[i].index + 1,
Len(history[i]) )
LastAckIndexFromFollower(i, j) ==
LET set_index == {idx \in 1..Len(history[i]): j \in history[i][idx].ackSid }
IN Maximum(set_index)
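(* A minimal sketch of the lookup above on a constant history.
   SketchLastAckIndex is a hypothetical helper, and the server ids 1 and 2
   are illustrative. It assumes that Maximum yields -1 on the empty set,
   which is what the ackIndex = -1 case in LeaderProcessACK suggests. *)
SketchLastAckIndex(h, j) ==
    LET idxs == {k \in 1..Len(h): j \in h[k].ackSid}
    IN IF idxs = {} THEN -1
       ELSE CHOOSE m \in idxs: \A k \in idxs: m >= k
\* Server 2 has acked entries 1 and 2, so its next ACK must be for index 3.
ASSUME SketchLastAckIndex(<< [ackSid |-> {1, 2}], [ackSid |-> {1, 2}],
                             [ackSid |-> {1}] >>, 2) = 2
\* Server 2 has acked nothing yet.
ASSUME SketchLastAckIndex(<< [ackSid |-> {1}] >>, 2) = -1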
\* See commit in Leader for details.
LeaderCommit(s, follower, index, zxid) ==
/\ lastCommitted' = [lastCommitted EXCEPT ![s] = [ index |-> index,
zxid |-> zxid ] ]
/\ LET m_commit == [ mtype |-> COMMIT,
mzxid |-> zxid ]
IN DiscardAndBroadcast(s, follower, m_commit)
\* Try to commit one operation, called by LeaderProcessAck.
\* See tryToCommit in Leader for details.
\* commitProcessor.commit -> processWrite -> toBeApplied.processRequest
\* -> finalProcessor.processRequest, finally processTxn will be implemented
\* and lastProcessed will be updated. So we update it here.
LeaderTryToCommit(s, index, zxid, newTxn, follower) ==
LET allTxnsBeforeCommitted == lastCommitted[s].index >= index - 1
\* Only when all proposals before zxid have been committed
\* can this proposal be committed.
hasAllQuorums == IsQuorum(newTxn.ackSid)
\* In order to be committed, a proposal must be accepted
\* by a quorum.
ordered == lastCommitted[s].index + 1 = index
\* Commit proposals in order.
IN \/ /\ \* Current conditions do not satisfy committing the proposal.
\/ ~allTxnsBeforeCommitted
\/ ~hasAllQuorums
/\ Discard(follower, s)
/\ UNCHANGED <<violatedInvariants, lastCommitted, lastProcessed>>
\/ /\ allTxnsBeforeCommitted
/\ hasAllQuorums
/\ \/ /\ ~ordered
/\ PrintT("Warn: Committing zxid " \o ToString(zxid) \o " not first.")
/\ violatedInvariants' = [violatedInvariants EXCEPT
!.commitInconsistent = TRUE]
\/ /\ ordered
/\ UNCHANGED violatedInvariants
/\ LeaderCommit(s, follower, index, zxid)
/\ lastProcessed' = [lastProcessed EXCEPT ![s] = [ index |-> index,
zxid |-> zxid ] ]
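(* A minimal sketch of the decision made by LeaderTryToCommit, on constants.
   SketchCommitDecision is a hypothetical helper; it takes the result of the
   quorum test as a Boolean instead of calling IsQuorum, and returns a label
   instead of updating state. "commit-flagged" stands for the branch that
   commits and also sets violatedInvariants.commitInconsistent. *)
SketchCommitDecision(committedIdx, index, quorumAcked) ==
    IF ~(committedIdx >= index - 1) \/ ~quorumAcked THEN "wait"
    ELSE IF committedIdx + 1 = index THEN "commit"
    ELSE "commit-flagged"
\* Entry 4 is not committed yet, so entry 5 has to wait.
ASSUME SketchCommitDecision(3, 5, TRUE) = "wait"
\* No quorum of acks yet.
ASSUME SketchCommitDecision(4, 5, FALSE) = "wait"
\* In order and acked by a quorum.
ASSUME SketchCommitDecision(4, 5, TRUE) = "commit"
\* Entry 5 is already committed: the out-of-order branch is taken.
ASSUME SketchCommitDecision(5, 5, TRUE) = "commit-flagged"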
(* Leader keeps a count of acks for a particular proposal and tries to
commit the proposal. See case Leader.ACK in LearnerHandler,
processRequest in AckRequestProcessor, and processAck in Leader for
details. *)
LeaderProcessACK(i, j) ==
/\ IsON(i)
/\ IsLeader(i)
/\ PendingACK(i, j)
/\ LET msg == msgs[j][i][1]
infoOk == IsMyLearner(i, j)
outstanding == LastCommitted(i).index < LastProposed(i).index
\* outstandingProposals not null
hasCommitted == ~ZxidCompare( msg.mzxid, LastCommitted(i).zxid)
\* namely, lastCommitted >= zxid
index == ZxidToIndex(history[i], msg.mzxid)
exist == index >= 1 /\ index <= LastProposed(i).index
\* the proposal exists in history
ackIndex == LastAckIndexFromFollower(i, j)
monotonicallyInc == \/ ackIndex = -1
\/ ackIndex + 1 = index
\* TCP delivers in order, so ackIndex should increase by exactly 1 each time
IN /\ infoOk
/\ \/ /\ exist
/\ monotonicallyInc
/\ LET txn == history[i][index]
txnAfterAddAck == [ zxid |-> txn.zxid,
value |-> txn.value,
ackSid |-> txn.ackSid \union {j} ,
epoch |-> txn.epoch ]
IN \* p.addAck(sid)
/\ history' = [history EXCEPT ![i][index] = txnAfterAddAck ]
/\ \/ /\ \* Note: there are no outstanding proposals,
\* or the proposal has already been committed.
\/ ~outstanding
\/ hasCommitted
/\ Discard(j, i)
/\ UNCHANGED <<violatedInvariants, lastCommitted, lastProcessed>>
\/ /\ outstanding
/\ ~hasCommitted
/\ LeaderTryToCommit(i, index, msg.mzxid, txnAfterAddAck, j)
\/ /\ \/ ~exist
\/ ~monotonicallyInc
/\ PrintT("Exception: No such zxid. " \o
" / ackIndex doesn't inc monotonically.")
/\ violatedInvariants' = [violatedInvariants
EXCEPT !.ackInconsistent = TRUE]
/\ Discard(j, i)
/\ UNCHANGED <<history, lastCommitted, lastProcessed>>
/\ UNCHANGED <<state, currentEpoch, zabState, acceptedEpoch, electionVars,
leaderVars, initialHistory, followerVars, proposalMsgsLog, epochLeader,
lastSnapshot, electionMsgs, envVars>>
/\ UpdateRecorder(<<"LeaderProcessACK", i, j>>)
(* Follower processes COMMIT in BROADCAST. See processPacket
in Follower for details. *)
FollowerProcessCOMMIT(i, j) ==
/\ IsON(i)
/\ IsFollower(i)
/\ PendingCOMMIT(i, j)
/\ zabState[i] = BROADCAST
/\ LET msg == msgs[j][i][1]
infoOk == IsMyLeader(i, j)
pendingTxns == PendingTxns(i)
noPending == Len(pendingTxns) = 0
IN
/\ infoOk
/\ \/ /\ noPending
/\ PrintT("Warn: Committing zxid without seeing txn.")
/\ UNCHANGED <<lastCommitted, lastProcessed, violatedInvariants>>
\/ /\ ~noPending
/\ LET firstElementZxid == pendingTxns[1].zxid
match == ZxidEqual(firstElementZxid, msg.mzxid)
IN
\/ /\ ~match
/\ PrintT("Exception: Committing zxid does not equal" \o
" next pending txn zxid.")
/\ violatedInvariants' = [violatedInvariants EXCEPT
!.commitInconsistent = TRUE]
/\ UNCHANGED <<lastCommitted, lastProcessed>>
\/ /\ match
/\ lastCommitted' = [lastCommitted EXCEPT
![i] = [ index |-> lastCommitted[i].index + 1,
zxid |-> firstElementZxid ] ]
/\ lastProcessed' = [lastProcessed EXCEPT
![i] = [ index |-> lastCommitted[i].index + 1,
zxid |-> firstElementZxid ] ]
/\ UNCHANGED violatedInvariants
/\ Discard(j, i)
/\ UNCHANGED <<state, currentEpoch, zabState, acceptedEpoch, history,
electionVars, leaderVars, initialHistory, followerVars,
lastSnapshot, proposalMsgsLog, epochLeader, electionMsgs, envVars>>
/\ UpdateRecorder(<<"FollowerProcessCOMMIT", i, j>>)
-----------------------------------------------------------------------------
(* Used to discard some messages which should not exist in network channel.
This action should not be triggered. *)
FilterNonexistentMessage(i) ==
/\ \E j \in Server \ {i}: /\ msgs[j][i] /= << >>
/\ LET msg == msgs[j][i][1]
IN
\/ /\ IsLeader(i)
/\ LET infoOk == IsMyLearner(i, j)
IN
\/ msg.mtype = LEADERINFO
\/ msg.mtype = NEWLEADER
\/ msg.mtype = UPTODATE
\/ msg.mtype = PROPOSAL
\/ msg.mtype = COMMIT
\/ /\ ~infoOk
/\ \/ msg.mtype = FOLLOWERINFO
\/ msg.mtype = ACKEPOCH
\/ msg.mtype = ACKLD
\/ msg.mtype = ACK
\/ /\ IsFollower(i)
/\ LET infoOk == IsMyLeader(i, j)
IN
\/ msg.mtype = FOLLOWERINFO
\/ msg.mtype = ACKEPOCH
\/ msg.mtype = ACKLD
\/ msg.mtype = ACK
\/ /\ ~infoOk
/\ \/ msg.mtype = LEADERINFO
\/ msg.mtype = NEWLEADER
\/ msg.mtype = UPTODATE
\/ msg.mtype = PROPOSAL
\/ msg.mtype = COMMIT
\/ IsLooking(i)
/\ Discard(j, i)
/\ violatedInvariants' = [violatedInvariants EXCEPT !.messageIllegal = TRUE]
/\ UNCHANGED <<serverVars, electionVars, leaderVars, envVars,
followerVars, proposalMsgsLog, epochLeader, electionMsgs>>
/\ UnchangeRecorder
-----------------------------------------------------------------------------
\* Defines how the variables may transition.
Next ==
(* FLE module *)
\/ \E i, j \in Server: FLEReceiveNotmsg(i, j)
\/ \E i \in Server: FLENotmsgTimeout(i)
\/ \E i \in Server: FLEHandleNotmsg(i)
\/ \E i \in Server: FLEWaitNewNotmsg(i)
(* Environment faults such as node failure and network partition *)
\/ \E i, j \in Server: PartitionStart(i, j)
\/ \E i, j \in Server: PartitionRecover(i, j)
\/ \E i \in Server: NodeCrash(i)
\/ \E i \in Server: NodeStart(i)
(* Zab module - Discovery and Synchronization part *)
\/ \E i, j \in Server: ConnectAndFollowerSendFOLLOWERINFO(i, j)
\/ \E i, j \in Server: LeaderProcessFOLLOWERINFO(i, j)
\/ \E i, j \in Server: FollowerProcessLEADERINFO(i, j)
\/ \E i, j \in Server: LeaderProcessACKEPOCH(i, j)
\/ \E i, j \in Server: LeaderSyncFollower(i, j)
\/ \E i, j \in Server: FollowerProcessSyncMessage(i, j)
\/ \E i, j \in Server: FollowerProcessPROPOSALInSync(i, j)
\/ \E i, j \in Server: FollowerProcessCOMMITInSync(i, j)
\/ \E i, j \in Server: FollowerProcessNEWLEADER(i, j)
\/ \E i, j \in Server: LeaderProcessACKLD(i, j)
\/ \E i, j \in Server: FollowerProcessUPTODATE(i, j)
(* Zab module - Broadcast part *)
\/ \E i \in Server: LeaderProcessRequest(i)
\/ \E i, j \in Server: FollowerProcessPROPOSAL(i, j)
\/ \E i, j \in Server: LeaderProcessACK(i, j)
\/ \E i, j \in Server: FollowerProcessCOMMIT(i, j)
(* An action used to judge whether there are redundant messages in network *)
\/ \E i \in Server: FilterNonexistentMessage(i)
Spec == Init /\ [][Next]_vars
-----------------------------------------------------------------------------
\* Define safety properties of Zab 1.0 protocol.
CheckDuringAction == \A p \in DOMAIN violatedInvariants: violatedInvariants[p] = FALSE
\* There is at most one established leader for a certain epoch.
Leadership1 == \A i, j \in Server:
/\ IsLeader(i) /\ zabState[i] \in {SYNCHRONIZATION, BROADCAST}
/\ IsLeader(j) /\ zabState[j] \in {SYNCHRONIZATION, BROADCAST}
/\ acceptedEpoch[i] = acceptedEpoch[j]
=> i = j
Leadership2 == \A epoch \in 1..MAXEPOCH: Cardinality(epochLeader[epoch]) <= 1
\* PrefixConsistency: The committed prefixes of the histories
\* of any two processes are the same.
PrefixConsistency == \A i, j \in Server:
LET smaller == Minimum({lastCommitted[i].index, lastCommitted[j].index})
IN \/ smaller = 0
\/ /\ smaller > 0
/\ \A index \in 1..smaller:
TxnEqual(history[i][index], history[j][index])
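(* A minimal sketch of the prefix check above on constant histories.
   SketchPrefixAgree is a hypothetical helper; it compares entries with
   plain equality, whereas the invariant itself uses TxnEqual. c1 and c2
   play the role of the two lastCommitted indices. *)
SketchPrefixAgree(h1, c1, h2, c2) ==
    LET n == IF c1 < c2 THEN c1 ELSE c2
    IN \A k \in 1..n: h1[k] = h2[k]
\* Only the first two entries are committed on both sides, and they agree.
ASSUME SketchPrefixAgree(<<"a", "b", "c">>, 3, <<"a", "b", "x">>, 2)
\* The second committed entry differs, so the check fails.
ASSUME ~SketchPrefixAgree(<<"a", "b">>, 2, <<"a", "x">>, 2)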
\* Integrity: If some follower delivers one transaction, then some primary has broadcast it.
Integrity == \A i \in Server:
/\ IsFollower(i)
/\ lastCommitted[i].index > 0
=> \A idx \in 1..lastCommitted[i].index: \E proposal \in proposalMsgsLog:
LET txn_proposal == [ zxid |-> proposal.zxid,
value |-> proposal.data ]
IN TxnEqual(history[i][idx], txn_proposal)
\* Agreement: If some follower f delivers transaction a and some follower f' delivers transaction b,
\* then f' delivers a or f delivers b.
Agreement == \A i, j \in Server:
/\ IsFollower(i) /\ lastCommitted[i].index > 0
/\ IsFollower(j) /\ lastCommitted[j].index > 0
=>
\A idx1 \in 1..lastCommitted[i].index, idx2 \in 1..lastCommitted[j].index :
\/ \E idx_j \in 1..lastCommitted[j].index:
TxnEqual(history[j][idx_j], history[i][idx1])
\/ \E idx_i \in 1..lastCommitted[i].index:
TxnEqual(history[i][idx_i], history[j][idx2])
\* Total order: If some follower delivers a before b, then any process that delivers b
\* must also deliver a and deliver a before b.
TotalOrder == \A i, j \in Server:
LET committed1 == lastCommitted[i].index
committed2 == lastCommitted[j].index
IN committed1 >= 2 /\ committed2 >= 2
=> \A idx_i1 \in 1..(committed1 - 1) : \A idx_i2 \in (idx_i1 + 1)..committed1 :
LET logOk == \E idx \in 1..committed2 :
TxnEqual(history[i][idx_i2], history[j][idx])
IN \/ ~logOk
\/ /\ logOk
/\ \E idx_j2 \in 1..committed2 :
/\ TxnEqual(history[i][idx_i2], history[j][idx_j2])
/\ \E idx_j1 \in 1..(idx_j2 - 1):
TxnEqual(history[i][idx_i1], history[j][idx_j1])
\* Local primary order: If a primary broadcasts a before it broadcasts b, then a follower that
\* delivers b must also deliver a before b.
LocalPrimaryOrder == LET p_set(i, e) == {p \in proposalMsgsLog: /\ p.source = i
/\ p.epoch = e }
txn_set(i, e) == { [ zxid |-> p.zxid,
value |-> p.data ] : p \in p_set(i, e) }
IN \A i \in Server: \A e \in 1..currentEpoch[i]:
\/ Cardinality(txn_set(i, e)) < 2
\/ /\ Cardinality(txn_set(i, e)) >= 2
/\ \A txn1, txn2 \in txn_set(i, e):
\/ TxnEqual(txn1, txn2)
\/ /\ ~TxnEqual(txn1, txn2)
/\ LET TxnPre == IF ZxidCompare(txn1.zxid, txn2.zxid) THEN txn2 ELSE txn1
TxnNext == IF ZxidCompare(txn1.zxid, txn2.zxid) THEN txn1 ELSE txn2
IN \A j \in Server: /\ lastCommitted[j].index >= 2
/\ \E idx \in 1..lastCommitted[j].index:
TxnEqual(history[j][idx], TxnNext)
=> \E idx2 \in 1..lastCommitted[j].index:
/\ TxnEqual(history[j][idx2], TxnNext)
/\ idx2 > 1
/\ \E idx1 \in 1..(idx2 - 1):
TxnEqual(history[j][idx1], TxnPre)
\* Global primary order: If a follower f delivers both a with epoch e and b with epoch e',
\* and e < e', then f must deliver a before b.
GlobalPrimaryOrder == \A i \in Server: lastCommitted[i].index >= 2
=> \A idx1, idx2 \in 1..lastCommitted[i].index:
\/ ~EpochPrecedeInTxn(history[i][idx1], history[i][idx2])
\/ /\ EpochPrecedeInTxn(history[i][idx1], history[i][idx2])
/\ idx1 < idx2
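(* A minimal sketch of the ordering above on constant histories.
   SketchEpochOrdered is a hypothetical helper. It assumes that the epoch of
   a transaction is zxid[1] (as PrimaryIntegrity reads it) and that
   EpochPrecedeInTxn compares exactly that field. *)
SketchEpochOrdered(h) ==
    \A a, b \in 1..Len(h): h[a].zxid[1] < h[b].zxid[1] => a < b
\* Epoch 1 entries come before the epoch 2 entry.
ASSUME SketchEpochOrdered(<< [zxid |-> <<1, 1>>], [zxid |-> <<1, 2>>],
                             [zxid |-> <<2, 1>>] >>)
\* An epoch 2 entry placed before an epoch 1 entry breaks the order.
ASSUME ~SketchEpochOrdered(<< [zxid |-> <<2, 1>>], [zxid |-> <<1, 1>>] >>)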
\* Primary integrity: If primary p broadcasts a and some follower f delivers b such that b has epoch
\* smaller than epoch of p, then p must deliver b before it broadcasts a.
PrimaryIntegrity == \A i, j \in Server: /\ IsLeader(i) /\ IsMyLearner(i, j)
/\ IsFollower(j) /\ IsMyLeader(j, i)
/\ zabState[i] = BROADCAST
/\ zabState[j] = BROADCAST
/\ lastCommitted[j].index >= 1
=> \A idx_j \in 1..lastCommitted[j].index:
\/ history[j][idx_j].zxid[1] >= currentEpoch[i]
\/ /\ history[j][idx_j].zxid[1] < currentEpoch[i]
/\ \E idx_i \in 1..lastCommitted[i].index:
TxnEqual(history[i][idx_i], history[j][idx_j])
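(* A minimal sketch of a TLC configuration file that checks the properties
   defined above. The names are taken from this module. The constant
   assignments are left out because they depend on the chosen model, so the
   fragment is not complete as written.

   SPECIFICATION Spec
   INVARIANTS
       CheckDuringAction
       Leadership1
       Leadership2
       PrefixConsistency
       Integrity
       Agreement
       TotalOrder
       LocalPrimaryOrder
       GlobalPrimaryOrder
       PrimaryIntegrity
*)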
=============================================================================
\* Modification History
\* Last modified Tue Jan 17 21:21:28 CST 2023 by huangbinyu
\* Last modified Mon Nov 22 22:25:23 CST 2021 by Dell
\* Created Sat Oct 23 16:05:04 CST 2021 by Dell