Leader Replay
LEADER_REPLAY - the leader replays to itself any unprocessed entries at the end of its Log, ensuring the whole Log has been processed
On Entry
Enters from LEADER_LOG_REPLICATION. On entry, a majority of the cluster members have a Log Recording matching the leader's, so all entries in the Recording are 'Committed' and safe to process. A minority of the followers may still be replicating the Log, or may not even be running.
Description
In this state, the leader replays any Log entries it hasn't yet processed, up to the end of the Log Recording.
On entering the state, the leader's logPosition (where it has processed up to) is either the snapshot position, if it loaded a snapshot, or zero. Its appendPosition is at the end of the Log Recording. The logPosition may equal the appendPosition, e.g. if there was no snapshot and no Log, or if the cluster shut down after taking a snapshot, so there were no new Log entries beyond the snapshot position.
If logPosition is earlier than appendPosition, the Log needs replaying from logPosition to appendPosition; Log replay is described below. Once the Log has been replayed, or if logPosition is already at appendPosition, the leader moves to LEADER_INIT.
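A minimal sketch of the entry decision, written in Java since Aeron itself is Java. The enum and class names (ReplayDecision, LeaderReplayEntry) are hypothetical and are not Aeron's Election code; the sketch only encodes the rule stated above, plus a sanity check, assumed for the sketch, that logPosition never exceeds appendPosition.

```java
/** Hypothetical outcome of entering LEADER_REPLAY; the names are illustrative, not Aeron's. */
enum ReplayDecision
{
    REPLAY_THEN_LEADER_INIT, // unprocessed entries lie between logPosition and appendPosition
    GO_TO_LEADER_INIT        // nothing to replay, move straight on
}

/** Hypothetical helper encoding the LEADER_REPLAY entry rule described in the text. */
final class LeaderReplayEntry
{
    private LeaderReplayEntry()
    {
    }

    /**
     * @param logPosition    position the leader has processed up to (snapshot position, or 0)
     * @param appendPosition position at the end of the leader's Log Recording
     */
    static ReplayDecision nextStep(final long logPosition, final long appendPosition)
    {
        if (logPosition > appendPosition)
        {
            // Sanity check assumed by this sketch: the leader cannot have processed past its own Recording.
            throw new IllegalStateException(
                "logPosition " + logPosition + " is beyond appendPosition " + appendPosition);
        }

        return logPosition < appendPosition ?
            ReplayDecision.REPLAY_THEN_LEADER_INIT : ReplayDecision.GO_TO_LEADER_INIT;
    }

    /** Number of Log bytes to replay, i.e. the range [logPosition, appendPosition). */
    static long bytesToReplay(final long logPosition, final long appendPosition)
    {
        return appendPosition - logPosition;
    }
}
```

For example (hypothetical positions), a leader that loaded a snapshot at position 1024 and whose Log Recording ends at 4096 would replay 3072 bytes before moving to LEADER_INIT, while a leader with no snapshot and an empty Log (both positions 0) would move on immediately.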
Log Replay
Replaying the Log means using Archive Replay to replay the unprocessed part of the Log Recording into the Consensus Module and Clustered Service, so they can process it. This is done over a private (local IPC) channel rather than the Log Publication, because the replayed entries don't need to be published to the followers or recorded by the Archive.
When replaying the Log, there are some interactions between the Consensus Module and the Clustered Service, which run on different threads. Both need to process the Log: the Clustered Service obviously needs to process the business messages from clients, but the Consensus Module also processes the Log, because it maintains some state, such as client sessions, that needs to be rebuilt.
Log Replay - Consensus Module thread
The Election object starts by setting up the Archive Replay:
- it creates a LogReplay object, which coordinates the replay. It gives it the logPosition (to start from), appendPosition (to end at), a client for its local Archive, the id of the Log Recording, and the LogAdapter, which is the message handler for replayed messages
- LogReplay asks the Archive to start replaying by sending it a ReplayRequest, passing in the ConsensusModule's replayChannel (default: aeron:ipc) and replayStreamId (default: 103). This is where the Replay Publication will publish to, and what LogReplay will subscribe to: the Archive starts a ReplaySession, which creates a Publication that replays to the replayChannel and replayStreamId. The Archive client returns the sessionId of the Publication, which LogReplay uses as its logSessionId
- LogReplay adds a Subscription for the replayChannel, replayStreamId and logSessionId, subscribing to the replay
The Election object then resets the logPosition of each ClusterMember to NULL_POSITION. Later, in LEADER_READY, it will wait for all the cluster members to catch up, which means it needs to have received at least one AppendPosition message from each follower.
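The setup values and the position reset can be sketched in plain Java, without a running Archive. ReplaySetup and MemberPositions are hypothetical names. One detail is taken from Aeron rather than from the text: AeronArchive.startReplay returns a long replay session id whose low 32 bits are the replay Publication's sessionId, which is why a narrowing cast yields the logSessionId. Treat that, and NULL_POSITION being -1, as assumptions to check against the Aeron version in use.

```java
import java.util.Arrays;

/** Hypothetical holder for the values the leader uses to set up the Archive replay. */
final class ReplaySetup
{
    static final String DEFAULT_REPLAY_CHANNEL = "aeron:ipc"; // default quoted in the text
    static final int DEFAULT_REPLAY_STREAM_ID = 103;          // default quoted in the text

    final long recordingId;   // id of the Log Recording
    final long startPosition; // the leader's logPosition
    final long stopPosition;  // the leader's appendPosition
    final String replayChannel;
    final int replayStreamId;

    ReplaySetup(
        final long recordingId,
        final long startPosition,
        final long stopPosition,
        final String replayChannel,
        final int replayStreamId)
    {
        this.recordingId = recordingId;
        this.startPosition = startPosition;
        this.stopPosition = stopPosition;
        this.replayChannel = replayChannel;
        this.replayStreamId = replayStreamId;
    }

    /** Number of bytes the Archive is asked to replay. */
    long length()
    {
        return stopPosition - startPosition;
    }

    /** Assumption: the Publication's sessionId is carried in the low 32 bits of the replay session id. */
    static int logSessionId(final long replaySessionId)
    {
        return (int)replaySessionId;
    }
}

/** Hypothetical stand-in for resetting each ClusterMember's logPosition. */
final class MemberPositions
{
    static final long NULL_POSITION = -1; // assumed to match Aeron's NULL_POSITION value

    private MemberPositions()
    {
    }

    /** Forget every member's logPosition so each one must report again before it counts. */
    static void reset(final long[] memberLogPositions)
    {
        Arrays.fill(memberLogPositions, NULL_POSITION);
    }
}
```

Resetting to a sentinel rather than to 0 keeps "has not reported yet" distinguishable from "reported position 0", which is what lets LEADER_READY tell that every follower has sent an AppendPosition.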
Once the replay has been set up, the Election object repeatedly invokes the LogReplay, which:
- checks whether the ReplaySession has connected to the Subscription. When it has, the Subscription will have an Image whose sessionId is the logSessionId
- when it gets the Image, it:
- gives it to the ConsensusModuleAgent's LogAdapter, which is responsible for polling the Image and replaying messages into the ConsensusModuleAgent
- asks the ConsensusModuleAgent to:
- send a JoinLog message to the Clustered Service, passing the replayChannel, replayStreamId and logSessionId, which asks it to also subscribe to the ReplaySession. The message includes the startPosition and stopPosition of the replay
- wait for the Clustered Service to reply with a ServiceAck, to say that its Subscription has connected, and it's at the startPosition. Without this wait, if the Consensus Module processed the replay quickly and disconnected from the Replay Publication, the Publication could close before the Clustered Service has created its Subscription to it
- once all of the above has been set up, LogReplay asks the ConsensusModuleAgent to poll for replay messages. The ConsensusModuleAgent polls the LogAdapter, which polls the Image. The LogAdapter reads the messages and calls into the ConsensusModuleAgent on its onReplayXXX() methods, like onReplaySessionOpen(). Once the ConsensusModuleAgent has polled the LogAdapter, it updates the commit-pos Counter to the new Log position, which has the effect of gating the Clustered Service so it doesn't get ahead of the Consensus Module
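The sequence LogReplay goes through can be modelled as a small state machine. This is a hypothetical, single-threaded model (LogReplayModel is not an Aeron class): the Image, the ServiceAck and the amount of replayed data are passed in as plain arguments, and the commit-pos Counter is just a field.

```java
/** Hypothetical, single-threaded model of the LogReplay steps above; not Aeron's LogReplay class. */
final class LogReplayModel
{
    enum Step
    {
        AWAIT_IMAGE,       // ReplaySession not yet connected to the Subscription
        AWAIT_SERVICE_ACK, // JoinLog sent, waiting for the Clustered Service's ServiceAck
        POLL,              // ConsensusModuleAgent polling replayed messages via the LogAdapter
        DONE               // stopPosition reached
    }

    private final long stopPosition;
    private Step step = Step.AWAIT_IMAGE;
    private long position;       // how far the Consensus Module has processed the replay
    private long commitPosition; // stands in for the commit-pos Counter
    private int joinLogMessagesSent;

    LogReplayModel(final long startPosition, final long stopPosition)
    {
        this.stopPosition = stopPosition;
        this.position = startPosition;
        this.commitPosition = startPosition;
    }

    /**
     * One invocation from the Election's duty cycle; the model takes at most one step per call.
     *
     * @param imageConnected whether an Image with the logSessionId is available
     * @param serviceAcked   whether the Clustered Service has replied with a ServiceAck
     * @param bytesAvailable bytes readable from the Image in this cycle
     */
    void doWork(final boolean imageConnected, final boolean serviceAcked, final long bytesAvailable)
    {
        switch (step)
        {
            case AWAIT_IMAGE:
                if (imageConnected)
                {
                    // Give the Image to the LogAdapter and ask the service to join via JoinLog.
                    joinLogMessagesSent++;
                    step = Step.AWAIT_SERVICE_ACK;
                }
                break;

            case AWAIT_SERVICE_ACK:
                if (serviceAcked)
                {
                    step = Step.POLL;
                }
                break;

            case POLL:
                position = Math.min(stopPosition, position + bytesAvailable);
                // Publish progress so the Clustered Service cannot get ahead of the Consensus Module.
                commitPosition = position;
                if (position == stopPosition)
                {
                    step = Step.DONE;
                }
                break;

            default:
                break;
        }
    }

    Step step() { return step; }

    long position() { return position; }

    long commitPosition() { return commitPosition; }

    int joinLogMessagesSent() { return joinLogMessagesSent; }
}
```

Modelling each wait as its own state makes the ordering explicit: nothing is polled from the replay until the Clustered Service has acknowledged that it is subscribed at the start position.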
After each invocation, the Election object checks if the LogReplay is done. It is done when the LogAdapter reaches the stopPosition in the replay (which was set to the appendPosition). When it is done, the Election object:
- closes the LogReplay, which asks the LogAdapter to disconnect from the Image. The LogAdapter sets its logPosition to the Image's position, then closes the Image's Subscription (the Clustered Service has its own Subscription, which remains open)
- sets its logPosition to appendPosition
- moves to LEADER_INIT
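A hypothetical sketch of the Election's side of the steps above: invoke the replay each duty cycle, and once the LogAdapter has reached the stop position (appendPosition), close the replay, adopt appendPosition as the logPosition and move to LEADER_INIT. The Replay interface and ElectionReplayStep class are illustrative stand-ins, not Aeron's API.

```java
/** Hypothetical, simplified view of how the Election finishes LEADER_REPLAY. */
final class ElectionReplayStep
{
    enum State
    {
        LEADER_REPLAY,
        LEADER_INIT
    }

    /** Minimal stand-in for the replay, as seen by the Election (hypothetical interface). */
    interface Replay
    {
        void doWork();

        long adapterPosition(); // position the LogAdapter has reached

        void close();           // disconnect the LogAdapter from the Image
    }

    private final long appendPosition;
    private final Replay replay;
    private State state = State.LEADER_REPLAY;
    private long logPosition;

    ElectionReplayStep(final long logPosition, final long appendPosition, final Replay replay)
    {
        this.logPosition = logPosition;
        this.appendPosition = appendPosition;
        this.replay = replay;
    }

    /** One duty-cycle step while in LEADER_REPLAY. */
    void leaderReplay()
    {
        if (state != State.LEADER_REPLAY)
        {
            return;
        }

        replay.doWork();

        // The replay's stopPosition was set to appendPosition, so reaching it means the replay is done.
        if (replay.adapterPosition() >= appendPosition)
        {
            replay.close();
            logPosition = appendPosition;
            state = State.LEADER_INIT;
        }
    }

    State state() { return state; }

    long logPosition() { return logPosition; }
}
```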
In addition to the above, in each duty cycle the Election object publishes:
- a NewLeadershipTerm message, on an interval
- a CommitPosition message using the quorum position, whenever it changes, or on an interval. Note the quorum position is calculated using the logPosition of each member (stored in the ClusterMember objects), not the commit-pos Counter. For the leader, this was already set to the appendPosition. For followers, it will be set to their Log Recording end position when they send an AppendPosition message. A majority of the members already have a Log Recording matching the leader's, so quorumPosition will be the position at the end of the leader's Log Recording, once the followers have sent their AppendPosition messages
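The quorum position can be illustrated as "the highest position that a majority of members have reached". The sketch below sorts a copy of the members' logPositions and takes the k-th highest, where k is a majority; Aeron's own calculation may be implemented differently. Members still at NULL_POSITION (assumed here to be -1) have not yet sent an AppendPosition, so they can only hold the result down. The publishing rule (on change, or on an interval) is included with a hypothetical intervalNs parameter; CommitPositionSketch is likewise a hypothetical name.

```java
import java.util.Arrays;

/** Hypothetical sketch of the quorum position and the CommitPosition publishing rule. */
final class CommitPositionSketch
{
    static final long NULL_POSITION = -1; // assumed value for "member has not reported yet"

    private CommitPositionSketch()
    {
    }

    /** Smallest number of members that forms a majority. */
    static int quorumThreshold(final int memberCount)
    {
        return memberCount / 2 + 1;
    }

    /** Highest position reached by at least a majority of members, from each member's logPosition. */
    static long quorumPosition(final long[] memberLogPositions)
    {
        if (memberLogPositions.length == 0)
        {
            throw new IllegalArgumentException("no members");
        }

        final long[] sorted = memberLogPositions.clone();
        Arrays.sort(sorted); // ascending
        // The k-th highest value, where k is the quorum threshold.
        return sorted[sorted.length - quorumThreshold(sorted.length)];
    }

    /** Publish when the quorum position has changed, or when the interval has elapsed. */
    static boolean shouldPublish(
        final long quorumPosition,
        final long lastPublishedPosition,
        final long nowNs,
        final long lastPublishNs,
        final long intervalNs)
    {
        return quorumPosition != lastPublishedPosition || nowNs - lastPublishNs >= intervalNs;
    }
}
```

In a three-member cluster where only the leader has a logPosition (4096), the quorum position stays at NULL_POSITION; as soon as one follower reports 4096 in an AppendPosition message, it becomes 4096.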
Log Replay - Clustered Service thread
Meanwhile, the Clustered Service has been asked, via the JoinLog message, to join the same Log Replay. It runs on a different thread and does this in parallel:
- the ClusteredServiceAgent has a ServiceAdapter for reading from the Service stream, which it polls
- the ServiceAdapter reads the JoinLog message and calls back into the ClusteredServiceAgent
- the ClusteredServiceAgent sets its BoundedLogAdapter's maxLogPosition to the logPosition (start position) in the JoinLog message, to make sure any existing processing stops where the JoinLog starts
- the ClusteredServiceAgent then captures all the values from the JoinLog message in an ActiveLogEvent object, which it stores in a field for later
- after the ClusteredServiceAgent has polled its ServiceAdapter, it checks for an ActiveLogEvent
- it finds it and checks that its BoundedLogAdapter does not currently have an Image (isn't processing anything)
- it doesn't have an Image, so it joins the active Log using the info in the ActiveLogEvent
- it creates a Subscription for the channel and streamId in the ActiveLogEvent (the replayChannel and replayStreamId), then waits for the Image with the logSessionId to be connected
- it gives the Image to its BoundedLogAdapter and sets the BoundedLogAdapter's maxLogPosition to the ActiveLogEvent.maxLogPosition (the stop position of the Log Recording)
- it sends a ServiceAck to the Consensus Module to say it's connected and at the ActiveLogEvent.logPosition (start)
- the JoinLog contains the cluster role, which for Log Replay is hardcoded to FOLLOWER. This stops the leader Clustered Service from connecting any of the Egress sessions
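The Clustered Service's side of the steps above can be modelled in the same hypothetical style. ServiceJoinLogSketch and its fields are illustrative names, not the ClusteredServiceAgent's real members; the wait for the Image is reduced to a boolean input, and the ServiceAck to a recorded position.

```java
/** Hypothetical, single-threaded model of the ClusteredServiceAgent's JoinLog handling. */
final class ServiceJoinLogSketch
{
    enum Role
    {
        LEADER,
        FOLLOWER
    }

    /** Illustrative subset of the values captured from a JoinLog message. */
    static final class ActiveLogEvent
    {
        final long logPosition;    // replay start position
        final long maxLogPosition; // replay stop position
        final int logSessionId;
        final String channel;      // replayChannel
        final int streamId;        // replayStreamId
        final Role role;           // hardcoded to FOLLOWER for Log Replay

        ActiveLogEvent(
            final long logPosition,
            final long maxLogPosition,
            final int logSessionId,
            final String channel,
            final int streamId,
            final Role role)
        {
            this.logPosition = logPosition;
            this.maxLogPosition = maxLogPosition;
            this.logSessionId = logSessionId;
            this.channel = channel;
            this.streamId = streamId;
            this.role = role;
        }
    }

    ActiveLogEvent activeLogEvent;               // pending event, stored for later
    boolean adapterHasImage;                     // whether the BoundedLogAdapter is processing an Image
    long adapterMaxLogPosition = Long.MAX_VALUE; // unbounded until a JoinLog arrives (sketch assumption)
    long ackedPosition = -1;                     // position sent in the last ServiceAck (-1: none yet)
    boolean egressSessionsConnected;

    /** Called when the ServiceAdapter reads a JoinLog message. */
    void onJoinLog(final ActiveLogEvent event)
    {
        adapterMaxLogPosition = event.logPosition; // stop existing processing where the JoinLog starts
        activeLogEvent = event;
    }

    /**
     * Called after polling the ServiceAdapter. The real agent waits for the Image; this sketch
     * instead takes imageConnected as input and simply tries again on a later duty cycle.
     */
    void checkForActiveLogEvent(final boolean imageConnected)
    {
        if (null == activeLogEvent || adapterHasImage || !imageConnected)
        {
            return;
        }

        final ActiveLogEvent event = activeLogEvent;
        activeLogEvent = null;
        adapterHasImage = true;                               // Image handed to the BoundedLogAdapter
        adapterMaxLogPosition = event.maxLogPosition;         // process up to the replay's stop position
        ackedPosition = event.logPosition;                    // ServiceAck: connected and at the start
        egressSessionsConnected = event.role == Role.LEADER;  // FOLLOWER role skips Egress sessions
    }
}
```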
The Clustered Service will process the Log from the BoundedLogAdapter during its duty cycle. It does this in its own time, and it isn't synchronised with the Consensus Module.
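Although the Clustered Service processes the Log in its own time, how far it can get in any duty cycle is bounded. A minimal sketch, assuming the limit is the lower of the commit-pos Counter and the BoundedLogAdapter's maxLogPosition, and treating the Log as a byte count rather than whole fragments (BoundedPollSketch is a hypothetical name):

```java
/** Hypothetical model of how far the Clustered Service may read in one duty cycle. */
final class BoundedPollSketch
{
    private BoundedPollSketch()
    {
    }

    /** The service may read up to the lower of the commit-pos Counter and maxLogPosition. */
    static long pollLimit(final long commitPosition, final long maxLogPosition)
    {
        return Math.min(commitPosition, maxLogPosition);
    }

    /** Advance the service's position by up to availableBytes, never past the limit or backwards. */
    static long poll(
        final long servicePosition,
        final long availableBytes,
        final long commitPosition,
        final long maxLogPosition)
    {
        final long limit = pollLimit(commitPosition, maxLogPosition);
        return Math.max(servicePosition, Math.min(servicePosition + availableBytes, limit));
    }
}
```

If the Consensus Module has only published a commit position of 2048, a service sitting at 1024 stops at 2048 however much replayed data is available; once the commit position passes maxLogPosition, the replay's stop position becomes the bound instead.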
Replay Publication
As described above, the Replay Publication in Aeron Archive has two Subscriptions, one for the Consensus Module and one for the Clustered Service. Once both Subscriptions have reached the end of the replay, the ReplaySession sees that it has published up to the end position, and closes.
On Exit
By the end of this state, the leader Consensus Module will have replayed (processed) up to the end of its Log Recording, and the Clustered Service, which is running on its own thread, will have started doing the same (it carries on in the background). The commit-pos Counter is set to the position at the end of the Recording.
The member moves to LEADER_INIT.