Consensus Module
The Consensus Module contains a ConsensusModule class, which is used to configure and create a ConsensusModuleAgent. Once startup has completed, the ConsensusModule doesn't do anything other than hold a reference to the ConsensusModuleAgent to close it on shutdown. It's the ConsensusModuleAgent (an Agrona Agent) that performs all the interesting consensus work.
Consensus Module Configuration¶
The ConsensusModule class contains:
- a Configuration inner class that contains static methods for reading System properties for configuring the cluster, along with default values (fairly chunky, at 1000+ lines of code)
- a Context inner class, which is a programmable configuration object (huge, 3000+ lines) that is passed into the ConsensusModule. You create an instance of the Context, which defaults a lot of its values from the Configuration class. You can then programmatically override values on the Context object before passing it into the ConsensusModule
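As a sketch of how the Context is used in practice, the snippet below creates a Context, overrides a couple of values programmatically, and passes it to launch. The member id and endpoint string are purely illustrative, and actually running this would also require a Media Driver and an Archive; treat it as a shape, not a recipe.

```java
// Illustrative sketch: configure a Context, then launch the Consensus Module.
// The endpoint string and member id are made-up example values.
import io.aeron.cluster.ConsensusModule;

public class ConsensusModuleLaunchSketch
{
    public static void main(final String[] args)
    {
        // Values default from the Configuration class (System properties);
        // anything set here programmatically overrides those defaults.
        final ConsensusModule.Context ctx = new ConsensusModule.Context()
            .clusterMemberId(0)
            .clusterMembers("0,host0:20000,host0:20001,host0:20002,host0:20003,host0:8010");

        // launch() concludes the Context, builds the ConsensusModuleAgent and,
        // by default, starts an AgentRunner thread to drive its duty cycle.
        try (ConsensusModule consensusModule = ConsensusModule.launch(ctx))
        {
            // ... cluster node runs until close() ...
        }
    }
}
```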
There isn't much else of interest in the ConsensusModule class, other than a launch(Context) method and a constructor.
ConsensusModule.launch()¶
The static ConsensusModule.launch(Context) method is used to construct a ConsensusModule. It passes the Context to the ConsensusModule constructor, which 'concludes' the Context, then passes the concluded Context to the ConsensusModuleAgent constructor. Once that has completed, the launch method checks whether the ConsensusModule created an AgentRunner (as opposed to an AgentInvoker). If it did, the launch method starts a thread for it. launch then returns the ConsensusModule.
At this point, the Consensus Module is 'running'. It has a ConsensusModuleAgent with a duty cycle that is invoked either by an AgentRunner on its own thread, or an AgentInvoker from another thread.
The ConsensusModule object's role is pretty much complete at this stage. It keeps a reference to the AgentRunner (or AgentInvoker). When the ConsensusModule is closed (on shutdown), it closes the AgentRunner/Invoker, which in turn closes the ConsensusModuleAgent. The ConsensusModuleAgent performs all the consensus work, so we'll focus on that from this point, and won't show the ConsensusModule object on diagrams below.
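The AgentRunner/AgentInvoker split above is the standard Agrona pattern: onStart() once, doWork() repeatedly, onClose() on shutdown. The sketch below models that lifecycle with simplified stand-in types (these are not the real Agrona classes) to show the contract the ConsensusModuleAgent is driven by.

```java
// A simplified model of the Agrona Agent lifecycle described above.
// Agent and Invoker here are stand-in types, not the real Agrona classes.
public class AgentLifecycleDemo
{
    interface Agent
    {
        default void onStart() {}
        int doWork(); // returns a work count; 0 lets an idle strategy back off
        default void onClose() {}
    }

    // Stand-in for an AgentInvoker: the caller's thread drives the duty cycle.
    // (An AgentRunner instead loops invoke() on its own dedicated thread.)
    static final class Invoker implements AutoCloseable
    {
        private final Agent agent;
        private boolean started;

        Invoker(final Agent agent) { this.agent = agent; }

        int invoke()
        {
            if (!started)
            {
                agent.onStart(); // called once, before the first doWork()
                started = true;
            }
            return agent.doWork();
        }

        @Override
        public void close() { agent.onClose(); }
    }

    public static void main(final String[] args)
    {
        final int[] workCount = {0};

        // The duty cycle here just counts invocations; the real
        // ConsensusModuleAgent polls adapters, timers, the archive, etc.
        try (Invoker invoker = new Invoker(() -> ++workCount[0]))
        {
            invoker.invoke();
            invoker.invoke();
        }
        System.out.println(workCount[0]); // prints 2
    }
}
```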
The following sections look at the above process in more detail.
Context.conclude()¶
Concluding the Context is done using Context.conclude(), which can only be called once. This causes it to build and initialise several objects based on the configuration, ready for use by the ConsensusModuleAgent. It does the following:
- creates or opens the Consensus Module's cluster mark file, wrapping it in a ClusterMarkFile object. This checks whether the file already exists and contains a timestamp that has been updated within the last 10 seconds, which would indicate that a ConsensusModuleAgent is already running using the same cluster directory. In that case, an exception is thrown, stopping the creation of this Consensus Module
- creates or opens the node-state.dat file, wrapping it in a NodeStateFile object
- creates or opens the recording log file, wrapping it in a RecordingLog object, which reads the contents into memory
- creates an Aeron Transport client, which connects to the Media Driver. Note that the Context configures the Transport Client to run its Client Conductor in invoker mode. It will be invoked by the ConsensusModuleAgent as part of its duty cycle, rather than using an additional thread
- creates lots of Counters, including the commit-pos Counter that the Clustered Service also uses (see Counters)
- creates a DutyCycleStallTracker and a SnapshotDurationTracker - these update Counters if the duty cycle isn't invoked frequently enough, or a snapshot takes too long
- creates an Archive Context - the configuration for the Archive Client
- creates a LogPublisher - this is used to publish messages to the log, but only if the cluster member becomes the leader
- creates an EgressPublisher - this is stateless and used to encode and send messages to a ClusterSession (client session)
- concludes the mark file - once it is running, the ConsensusModuleAgent will update the timestamp in the mark file once per second to stop another Consensus Module from starting. A lot of information is written into the mark file, such as archiveStreamId, serviceStreamId, memberId and clusterId
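The mark-file liveness check in the first step above can be sketched as below. This is an assumption-laden simplification (the real ClusterMarkFile also validates versions and headers), but it shows the timestamp test that stops two Consensus Modules sharing a cluster directory.

```java
// Simplified sketch of the cluster mark file liveness check described above.
// The real ClusterMarkFile does more (version/header checks); the 10-second
// window matches the once-per-second timestamp updates mentioned in the text.
public class MarkFileLivenessCheck
{
    static final long TIMEOUT_MS = 10_000;

    // Throws if the mark file's activity timestamp was updated recently,
    // which would indicate another Consensus Module is using the directory.
    static void ensureNotActive(final long markFileTimestampMs, final long nowMs)
    {
        if (nowMs - markFileTimestampMs < TIMEOUT_MS)
        {
            throw new IllegalStateException(
                "active mark file detected: updated " +
                (nowMs - markFileTimestampMs) + "ms ago");
        }
    }

    public static void main(final String[] args)
    {
        ensureNotActive(0L, 60_000L);      // stale timestamp: safe to start
        try
        {
            ensureNotActive(55_000L, 60_000L); // updated 5s ago: another node is live
        }
        catch (final IllegalStateException expected)
        {
            System.out.println("rejected: " + expected.getMessage());
        }
    }
}
```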
There is still a lot to create, but after concluding the Context, the Consensus Module looks something like this:
ConsensusModuleAgent constructor¶
The ConsensusModuleAgent constructor takes the concluded Context and does the following:
- populates itself from a lot of the values in the Context
- initialises to the FOLLOWER role
- reads the Cluster Member Endpoint configuration for each member, including itself, into an array of ClusterMember objects (see below)
- creates Adapter objects for the Consensus, Ingress, Log and ConsensusModule streams. The Adapters are used to decode inbound messages from a Subscription and turn them into method calls on the ConsensusModuleAgent
- creates Subscriptions for the Consensus and ConsensusModule streams (Subscriptions for the other Adapters are addressed later). There are no log buffers on the diagram for the Subscriptions because they aren't created until a Publication connects
- creates a ServiceProxy object for the Service output stream, for sending messages to the Clustered Service
- creates an IPC Publication for the Service stream (this creates a log buffer, as it's a Publication)
- creates a PendingServiceMessageTracker for each Clustered Service (see below)
- creates a ConsensusPublisher - this is stateless and used to encode and send messages to a ConsensusPublication
Once the ConsensusModuleAgent has been created, it is driven from outside by the AgentRunner or an AgentInvoker, which call its onStart() method once, then its doWork() method repeatedly to perform its duty cycle. More objects are created in onStart() and in the Election, but at this point, the Consensus Module looks something like this:
ClusterMember¶
The ClusterMember objects are where the member stores information about all the cluster members, including itself. Each one contains:
- all the endpoints for the member
- member state, such as its leadershipTermId, candidateTermId, whether it is the leader, etc. The state that a member stores about the others is from its own viewpoint and can become out of date
- the vote each member cast for this member in an Election during the current candidate leadership term
- the leader stores each member's logPosition. The leader updates its own ClusterMember's logPosition from the AppendPosition counter updated by its Archive. Each time it receives an AppendPosition message from a follower, it updates the logPosition in the follower's ClusterMember
- the Consensus Publication object, for sending Consensus messages to the member
Calculations can be made across the set of ClusterMembers, for example:
- during an election, a candidate determines whether it has won an Election by looking at the vote stored in each of the ClusterMember objects. It uses isUnanimousLeader() and isQuorumLeader()
- the leader calculates the quorumPosition (commit position) using the logPosition in each ClusterMember
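The quorum-position calculation can be sketched as follows. This is a simplified stand-in (the real ClusterMember.quorumPosition differs in detail), but the idea is the same: the commit position is the highest position that a majority of members have appended to their local Log.

```java
import java.util.Arrays;

// Sketch of the leader's commit-position calculation over the members'
// logPositions. Simplified stand-in for ClusterMember's quorum logic.
public class QuorumPosition
{
    static long quorumPosition(final long[] logPositions)
    {
        final long[] sorted = logPositions.clone();
        Arrays.sort(sorted); // ascending

        // A quorum is a strict majority of the cluster members.
        final int quorum = (sorted.length / 2) + 1;

        // The member at this rank, and every member above it, has appended
        // at least this position, so it is safe to commit up to here.
        return sorted[sorted.length - quorum];
    }

    public static void main(final String[] args)
    {
        // Leader at 100, one follower at 90, one lagging at 50:
        // a majority (2 of 3) have reached 90, so 90 can be committed.
        System.out.println(quorumPosition(new long[] {100, 90, 50})); // prints 90
    }
}
```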
PendingServiceMessageTracker¶
While processing Log messages, the Clustered Service can ask the Consensus Module to append a new message to the Log. The message will be appended to the end, behind any current Log messages, and will go through the same Consensus process as messages from a Client's Ingress. If you have multiple ClusteredServices, this could be used to communicate between them.
The ClusteredService does this by calling one of the offer() or tryClaim() methods on the Cluster interface, which is implemented by the ClusteredServiceAgent. The ClusteredServiceAgent sends the message to the ConsensusModuleAgent via the ConsensusModule IPC stream, and the ConsensusModuleAgent appends it to the Log.
However, there is an edge case where a message can be added to the Log and then a Snapshot is taken before the message reaches the Committed state. This can cause the uncommitted message to be lost. For this reason, messages from the ConsensusModule stream are added to the PendingServiceMessageTracker (there is one per ClusteredService), which holds them until they are Committed. If a Snapshot is taken, the PendingServiceMessageTrackers are included in it, so uncommitted messages are not lost.
This is described in more detail in Consensus Module State.
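The tracker's hold-until-committed behaviour can be sketched as below. This is a heavy simplification (assumption: the real PendingServiceMessageTracker stores encoded messages in an expandable ring buffer and manages cluster session ids), but it shows the lifecycle: enqueue on append, drop once the commit position passes the message, snapshot whatever remains.

```java
import java.util.ArrayDeque;

// Much-simplified sketch of the PendingServiceMessageTracker idea: hold
// service-originated messages until the commit position passes them.
public class PendingTrackerSketch
{
    // Each pending message remembers the Log position at which it was appended.
    static final class Pending
    {
        final long logPosition;
        final String message;

        Pending(final long logPosition, final String message)
        {
            this.logPosition = logPosition;
            this.message = message;
        }
    }

    private final ArrayDeque<Pending> pending = new ArrayDeque<>();

    // Called when a service message is appended to the Log but not yet committed.
    void enqueue(final long logPosition, final String message)
    {
        pending.addLast(new Pending(logPosition, message));
    }

    // Once consensus advances the commit position, messages at or below it
    // are committed and can be dropped from the tracker.
    void sweep(final long commitPosition)
    {
        while (!pending.isEmpty() && pending.peekFirst().logPosition <= commitPosition)
        {
            pending.removeFirst();
        }
    }

    // Anything still here at snapshot time must be written into the snapshot,
    // so uncommitted messages survive a restart.
    int uncommittedCount()
    {
        return pending.size();
    }
}
```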
ConsensusModuleAgent.onStart()¶
The ConsensusModuleAgent's onStart() method does the following:
- connects to Aeron Archive. Connecting to it creates the Control Request log buffer. When the Archive connects back to the Consensus Module, that creates the Control Response log buffer.
- replicates standby snapshots (an Aeron Premium feature). If there are snapshots on a standby cluster node, the latest snapshot is replicated to the local cluster node's Aeron Archive, from where it can be loaded
- asks the RecordingLog to create a RecoveryPlan, using information from the Archive (more details below). This contains information about how to rebuild the cluster state, such as the Log Recording id, and which snapshot (if any) should be loaded
- creates a temporary RecoveryState counter that is used to pass recovery information (which snapshot Recordings to load) to the Clustered Service(s). While this exists:
    - if the RecoveryPlan contains snapshot information, the ConsensusModuleAgent loads its own snapshot, then sets the CommitPosition counter to the snapshot logPosition. If there is no snapshot, the CommitPosition counter will be zero. This is where the log needs to be processed from. The contents of the snapshot are described later in State and Snapshots
    - the ConsensusModuleAgent waits for each Clustered Service to send a ServiceAck message to say that it has reached the CommitPosition (i.e. it has loaded its Snapshot, if there was one)
- creates a Consensus Publication for each ClusterMember other than itself. Each Publication is owned by its ClusterMember object and is passed to the ConsensusPublisher object to encode and send messages
- creates an Election object, passing in info from the RecoveryPlan
- changes state to ConsensusModule.State.ACTIVE
The Consensus Module is almost complete, but it still hasn't participated in an Election. The Election adds more Subscriptions and Publications for the remaining Adapters and Publishers.
When the ConsensusModuleAgent starts its duty cycle, it will delegate to the Election object until the Election is complete. Once complete, it will discard the Election object and set its election field to null. By the end of the election, each member will have created an Ingress Subscription for the IngressAdapter, the followers will have created a Log Subscription for the LogAdapter, and the leader will have created an MDC Publication for the LogPublisher.
When a client connects, the member it connected to will create an Egress Publication for the EgressPublisher.
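The delegate-until-complete pattern described above can be sketched as follows. This is a simplified stand-in (assumption: the real doWork() also polls adapters, timers and the Archive on every cycle), but it shows how the agent runs the Election inside its duty cycle, then discards it and carries on with normal consensus work.

```java
// Sketch of the duty-cycle delegation described above: while an Election is
// in progress, doWork() drives the Election; afterwards, normal work runs.
public class DutyCycleSketch
{
    interface Election
    {
        int doWork();
        boolean isComplete();
    }

    private Election election;     // non-null while an election is in progress
    private int consensusWorkDone; // stands in for the normal consensus work

    DutyCycleSketch(final Election election)
    {
        this.election = election;
    }

    int doWork()
    {
        int workCount = 0;

        if (election != null)
        {
            workCount += election.doWork();
            if (election.isComplete())
            {
                election = null; // discard; normal work runs from now on
            }
        }
        else
        {
            consensusWorkDone++;
            workCount++;
        }

        return workCount;
    }

    int consensusWorkDone()
    {
        return consensusWorkDone;
    }
}
```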
RecoveryPlan¶
A RecoveryPlan contains information that the Consensus Module uses to recover cluster state. It contains information about the Log, and the latest Snapshot (if there is one). Note that a Snapshot consists of more than one Recording in the Archive, as the Consensus Module takes a snapshot of its state, separate from the Clustered Service(s). The RecoveryPlan contains a list of 'Snapshots', but these refer to the snapshots of the different components, i.e. the different Recordings for the latest Snapshot.
The RecoveryPlan is built by a RecordingLog object. The recording.log file is where the Consensus Module keeps track of what it has stored in each Recording in the Archive. One Recording will be for the Log and any others will be for Snapshots. The RecordingLog builds a RecoveryPlan by finding the latest Snapshot and merging the Log and Snapshot information with information from the Archive's Catalog (archive.catalog).
At the time of writing (Aeron 1.47.0), a RecoveryPlan contains the following information:
RecoveryPlan
- long lastLeadershipTermId: most recent leadership term in the Log
- long lastTermBaseLogPosition: the position at the start of that leadership term
- long appendedLogPosition: last position written to the Log
- long committedLogPosition: last known commit (consensus) position
- List&lt;Snapshot&gt; snapshots: most recent snapshots for the consensus module and services; each Snapshot contains:
    - long recordingId: to identify the Recording in the Archive
    - long leadershipTermId: in which the snapshot was taken
    - long termBaseLogPosition: position of the log at the beginning of that leadership term
    - long logPosition: position the log had reached when the snapshot was taken
    - long timestamp: when the snapshot was taken
    - int serviceId: of the service that was snapshotted
- Log log: appended local log details, containing:
    - long recordingId: for the recording in an archive
    - long leadershipTermId: identity for the leadership term
    - long termBaseLogPosition: log position at the base of the leadership term
    - long logPosition: position the log has reached for the term
    - long startPosition: of the recording captured in the archive
    - long stopPosition: of the recording captured in the archive
    - int initialTermId: of the stream captured for the recording
    - int termBufferLength: of the stream captured for the recording
    - int mtuLength: of the stream captured for the recording
    - int sessionId: of the stream captured for the recording
RecoveryState Counter¶
The RecoveryState Counter is a temporary Counter that is created by the ConsensusModuleAgent to pass recovery information to the Clustered Service(s). Once the ConsensusModuleAgent and Clustered Service(s) have loaded their Snapshots, the Counter is deleted.
It contains the following fields:
- Leadership Term ID
- Log position for Snapshot
- Timestamp at beginning of Recovery
- Cluster ID
- Count of Services
- Snapshot Recording ID (Service ID 0..n)
Final Structure¶
We haven't discussed Elections or Client connections yet, but in order to keep the diagram on the same page so you can compare it with those above, here is what the Consensus Module looks like when it's in full flight, having completed an Election and accepted some Client connections.
After the Election:
- the Election object is no longer present
- the RecoveryPlan sticks around, but isn't used much
- each member has a Consensus connection from the others, so there is a consensus log buffer for each
- each member subscribes to the Ingress, so it has an Ingress Subscription
- when a client connects to the leader, the leader creates a ClusterSession for it (discussed later), along with an Ingress log buffer for the connection, and an Egress Publication with its Egress log buffer
- the leader has a Log Publication and a single Log log buffer (it uses Multi-Destination Cast to send from one log buffer to multiple followers)
- the Clustered Service has started and created a Publication for the ConsensusModule stream, so there is a log buffer for the Consensus Module Subscription
- followers have a Log Subscription, which has a Log log buffer (click the tab below the diagram)
The leader has a Log Publication for publishing Log messages, and Egress Publications for publishing Egress messages to any connected clients. It also has a LogAdapter, but that isn't used, because the leader doesn't subscribe to Log messages.
The follower has a Log Subscription for its LogAdapter, which it uses to receive Log messages from the leader.