
Consensus Module

The Consensus Module contains a ConsensusModule class, which is used to configure and create a ConsensusModuleAgent. Once startup has completed, the ConsensusModule doesn't do anything other than hold a reference to the ConsensusModuleAgent to close it on shutdown. It's the ConsensusModuleAgent (an Agrona Agent) that performs all the interesting consensus work.

Consensus Module Configuration

The ConsensusModule class contains:

  • a Configuration inner class that contains static methods for reading System properties for configuring the cluster, along with default values (fairly chunky, at 1000+ lines of code)
  • a Context inner class, which is a programmable configuration object (huge, 3000+ lines) that is passed into the ConsensusModule. You create an instance of the Context, which defaults a lot of its values from the Configuration class. You can then programmatically override values on the Context object before passing it into the ConsensusModule
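The Configuration/Context split described above follows a common Aeron pattern. A minimal sketch of the idea, using an illustrative property name and field rather than the real ConsensusModule.Context API:

```java
// Sketch of the Configuration/Context pattern: static defaults are read from
// System properties, then can be overridden programmatically on the Context.
// The property name and memberId field are illustrative, not the real API.
public class ContextPatternSketch
{
    static final String MEMBER_ID_PROP = "cluster.member.id"; // illustrative

    // Configuration-style static method: a System property with a default.
    static int defaultMemberId()
    {
        return Integer.getInteger(MEMBER_ID_PROP, 0);
    }

    // Context-style fluent configuration object.
    static class Context
    {
        private int memberId = defaultMemberId(); // seeded from the default

        Context memberId(final int memberId) // programmatic override
        {
            this.memberId = memberId;
            return this;
        }

        int memberId()
        {
            return memberId;
        }
    }
}
```

A Context built this way picks up the System property default unless the caller overrides it before passing the Context in.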

There isn't much else of interest in the ConsensusModule class, other than a launch(Context) method and a constructor.

ConsensusModule.launch()

The static ConsensusModule.launch(Context) method is used to construct a ConsensusModule. It passes the Context to the ConsensusModule constructor, which 'concludes' the Context, then passes the concluded Context to the ConsensusModuleAgent constructor. Once that has completed, the launch method checks if the ConsensusModule created an AgentRunner (as opposed to an AgentInvoker). If it did, the launch method starts a thread for it. launch returns the ConsensusModule.

At this point, the Consensus Module is 'running'. It has a ConsensusModuleAgent with a duty cycle that is invoked either by an AgentRunner on its own thread, or an AgentInvoker from another thread.
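The duty-cycle relationship can be sketched with a stand-in Agent (the real interface is Agrona's org.agrona.concurrent.Agent; this is a simplified model):

```java
// Simplified model of the Agrona Agent lifecycle: onStart() once,
// doWork() repeatedly (by an AgentRunner thread or an AgentInvoker),
// then onClose() on shutdown. Stand-in interface, not the real one.
public class DutyCycleSketch
{
    interface Agent
    {
        default void onStart() {}
        int doWork(); // returns how much work was done, used for idling
        default void onClose() {}
    }

    // Drives the agent the way an AgentRunner or AgentInvoker would.
    static int runDutyCycles(final Agent agent, final int cycles)
    {
        agent.onStart();
        int totalWork = 0;
        for (int i = 0; i < cycles; i++)
        {
            totalWork += agent.doWork();
        }
        agent.onClose();
        return totalWork;
    }
}
```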

(Diagram: launch(Context) creates the ConsensusModule, which concludes the Context and creates the ConsensusModuleAgent; an AgentRunner thread is created to invoke its doWork() duty cycle.)

The ConsensusModule object's role is pretty much complete at this stage. It keeps a reference to the AgentRunner (or AgentInvoker). When the ConsensusModule is closed (on shutdown), it closes the AgentRunner/Invoker, which in turn closes the ConsensusModuleAgent. The ConsensusModuleAgent performs all the consensus work, so we'll focus on that from this point, and won't show the ConsensusModule object on diagrams below.

The following sections look at the above process in more detail.

Context.conclude()

Concluding the Context is done using Context.conclude(), which can only be called once. This causes it to build and initialise several objects based on the configuration, ready for use by the ConsensusModuleAgent. It does the following:

  • creates or opens the Consensus Module's cluster mark file, wrapping it in a ClusterMarkFile object. This checks whether the file already exists with a timestamp that has been updated within the last 10 seconds, which would indicate a ConsensusModuleAgent is already running against the same cluster directory; in that case, an exception is thrown and creation of this Consensus Module is aborted
  • creates or opens the node-state.dat file, wrapping it in a NodeStateFile object
  • creates or opens the recording log file, wrapping it in a RecordingLog object, which reads the contents into memory
  • creates an Aeron Transport client, which connects to the Media Driver. Note that the Context configures the Transport Client to run its Client Conductor in invoker mode. It will be invoked by the ConsensusModuleAgent as part of its duty cycle, rather than using an additional thread
  • creates lots of Counters, including the commit-pos Counter that the Clustered Service also uses (see Counters)
  • creates a DutyCycleStallTracker and a SnapshotDurationTracker - these update Counters if the duty cycle isn't invoked frequently enough, or a snapshot takes too long
  • creates an Archive Context - the configuration for the Archive Client
  • creates a LogPublisher - this is used to publish messages to the log, but only if the cluster member becomes the leader
  • creates an EgressPublisher - this is stateless and used to encode and send messages to a ClusterSession (client session)
  • concludes the mark file - once it is running, the ConsensusModuleAgent will update the timestamp in the mark file once per second to stop another Consensus Module from starting. A lot of information is written into the mark file, such as archiveStreamId, serviceStreamId, memberId and clusterId
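The mark-file liveness check in the first bullet reduces to a simple timestamp comparison. A sketch, assuming the 10-second timeout described above (the real logic lives in Aeron's ClusterMarkFile):

```java
// Sketch of the mark-file liveness check: if the activity timestamp in an
// existing cluster-mark.dat was updated within the timeout, another
// Consensus Module is presumed to be running on the same cluster directory.
public class MarkFileLivenessSketch
{
    static final long TIMEOUT_MS = 10_000; // 10 seconds, per the text above

    static boolean isActive(final long markFileTimestampMs, final long nowMs)
    {
        return nowMs - markFileTimestampMs <= TIMEOUT_MS;
    }
}
```

If the check finds a live timestamp at startup, the Consensus Module throws rather than starting a second instance against the same directory.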

There is still a lot to create, but after concluding the Context, the Consensus Module looks something like this:

(Diagram: after Context.conclude(), the Consensus Module holds a ClusterMarkFile (cluster-mark.dat), RecordingLog (recording.log), NodeStateFile (node-state.dat), a Transport Client (cnc.dat and counters), an Archive.Context, a LogPublisher and an EgressPublisher.)

ConsensusModuleAgent constructor

The ConsensusModuleAgent constructor takes the concluded Context and does the following:

  • populates itself from a lot of the values in the Context
  • initialises to the FOLLOWER role
  • reads the Cluster Member Endpoint configuration for each member, including itself, into an array of ClusterMember objects (see below)
  • creates Adapter objects for the Consensus, Ingress, Log and ConsensusModule streams. The Adapters are used to decode inbound messages from a Subscription and turn them into method calls on the ConsensusModuleAgent
  • creates Subscriptions for the Consensus and ConsensusModule streams (Subscriptions for the other Adapters are addressed later). There are no log buffers on the diagram for the Subscriptions because they aren't created until a Publication connects
  • creates a ServiceProxy object for the Service output stream, for sending messages to the Clustered Service
  • creates an IPC Publication for the Service stream (this creates a log buffer, as it's a Publication)
  • creates a PendingServiceMessageTracker for each Clustered Service (see below)
  • creates a ConsensusPublisher - this is stateless and used to encode and send messages to a ConsensusPublication

Once the ConsensusModuleAgent has been created, it is driven from outside by the AgentRunner or an AgentInvoker, which call its onStart() method once, then its doWork() method repeatedly to perform its duty cycle. More objects are created in onStart() and in the Election, but at this point, the Consensus Module looks something like this:

(Diagram: as above, plus the ConsensusModuleAgent with its ConsensusAdapter, IngressAdapter, LogAdapter and ConsensusModuleAdapter, the Consensus and Consensus Module Subscriptions, the ServiceProxy with its Service Publication, the ConsensusPublisher, the ClusterMember objects and a PendingServiceMessageTracker per service.)

ClusterMember

The ClusterMember objects are where the member stores information about all the cluster members, including itself. Each one contains:

  • all the endpoints for the member
  • member state, such as its leadershipTermId, candidateTermId, whether it is the leader, etc. The state that a member stores about the others is from its own viewpoint and can become out of date
  • how each member voted for this member during the current candidate leadership term in an Election
  • the leader stores each member's logPosition. The leader updates its own ClusterMember's logPosition from the AppendPosition counter updated by its Archive. Each time it receives an AppendPosition message from a follower, it updates the logPosition in the follower's ClusterMember
  • the Consensus Publication object, for sending Consensus messages to the member

Calculations can be made across the set of ClusterMembers, for example:

  • during an election, a candidate determines whether it has won an Election by looking at the vote stored in each of the ClusterMember objects. It uses isUnanimousLeader() and isQuorumLeader()
  • the leader calculates the quorumPosition (commit position) using the logPosition in each ClusterMember
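The second calculation can be sketched as follows (illustrative code, not Aeron's ClusterMember implementation): sort the members' log positions and pick the highest position that a majority has reached.

```java
import java.util.Arrays;

// Sketch of deriving the commit (quorum) position from the logPosition held
// in each ClusterMember: with positions sorted ascending, the entry at index
// (length - quorumSize) is the highest position reached by at least a quorum.
public class QuorumPositionSketch
{
    static long quorumPosition(final long[] memberLogPositions)
    {
        final long[] sorted = memberLogPositions.clone();
        Arrays.sort(sorted); // ascending order
        final int quorumSize = (sorted.length / 2) + 1; // simple majority
        return sorted[sorted.length - quorumSize];
    }
}
```

With three members at positions 100, 80 and 60, the quorum position is 80: two of the three members (a majority) have appended at least that far.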

PendingServiceMessageTracker

While processing Log messages, the Clustered Service can ask the Consensus Module to append a new message to the Log. The message will be appended to the end, behind any current Log messages, and will go through the same Consensus process as messages from a Client's Ingress. If you have multiple ClusteredServices, this could be used to communicate between them.

The ClusteredService does this by calling one of the offer() or tryClaim() methods on the Cluster interface, which is implemented by the ClusteredServiceAgent. The ClusteredServiceAgent sends the message to the ConsensusModuleAgent via the ConsensusModule IPC stream, and the ConsensusModuleAgent appends it to the Log.

However, there is an edge case where a message can be added to the Log and then a Snapshot is taken before the message reaches the Committed state. This can cause the uncommitted message to be lost. For this reason, messages from the ConsensusModule stream are added to the PendingServiceMessageTracker (there is one per ClusteredService), which holds them until they are Committed. If a Snapshot is taken, the PendingServiceMessageTrackers are included in it, so uncommitted messages are not lost.
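A much-simplified sketch of that tracking (the real PendingServiceMessageTracker is considerably more involved): hold each appended service message until the commit position passes it, and expose the uncommitted remainder for snapshotting.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;

// Simplified model of pending service message tracking: messages are retained
// from append until commit, so a snapshot taken in between still includes them.
public class PendingTrackerSketch
{
    record Pending(long logPosition, String message) {}

    private final ArrayDeque<Pending> pending = new ArrayDeque<>();

    // Called when a service message is appended to the Log.
    void onMessageAppended(final long logPosition, final String message)
    {
        pending.addLast(new Pending(logPosition, message));
    }

    // Called as the commit position advances; committed messages are released.
    void onCommitPosition(final long commitPosition)
    {
        while (!pending.isEmpty() && pending.peekFirst().logPosition() <= commitPosition)
        {
            pending.removeFirst();
        }
    }

    // A snapshot must capture any messages that are not yet committed.
    List<String> uncommittedForSnapshot()
    {
        final List<String> result = new ArrayList<>();
        for (final Pending p : pending)
        {
            result.add(p.message());
        }
        return result;
    }
}
```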

This is described in more detail in Consensus Module State.

ConsensusModuleAgent.onStart()

The ConsensusModuleAgent's onStart() method does the following:

  • connects to Aeron Archive. Connecting to it creates the Control Request log buffer. When the Archive connects back to the Consensus Module, that creates the Control Response log buffer.
  • replicates standby snapshots (an Aeron Premium feature). If there are snapshots on a standby cluster node, the latest snapshot is replicated to the local cluster node's Aeron Archive, from where it can be loaded
  • asks the RecordingLog to create a RecoveryPlan, using information from Archive (more details below). This contains information about how to rebuild the cluster state, such as the Log Recording id, and which snapshot (if any) should be loaded
  • creates a temporary RecoveryState counter that is used to pass recovery information (which snapshot Recordings to load) to the Clustered Service(s). While this exists:
    • if the RecoveryPlan contains snapshot information, the ConsensusModuleAgent loads its own snapshot, then sets the CommitPosition counter to the snapshot logPosition. If there is no snapshot, the CommitPosition counter will be zero. This is where the log needs to be processed from. The contents of the snapshot are described later in State and Snapshots
    • the ConsensusModuleAgent waits for the Clustered Service(s) to send a ServiceAck message to say that they have reached the CommitPosition (i.e. they have loaded their Snapshot, if there was one)
  • creates a Consensus Publication for each ClusterMember other than itself. Each Publication is owned by its ClusterMember object and is passed to the ConsensusPublisher object to encode and send messages
  • creates an Election object, passing in info from the RecoveryPlan
  • changes state to ConsensusModule.State.ACTIVE

The Consensus Module is almost complete, but it still hasn't participated in an Election. The Election adds more Subscriptions and Publications for the remaining Adapters and Publishers.

(Diagram: as above, plus the Archive Client with its ctrl-request Publication and ctrl-response Subscription, the RecoveryPlan, the temporary RecoveryState counter, the Election object and a Consensus Publication per other ClusterMember.)

When the ConsensusModuleAgent starts its duty cycle, it will delegate to the Election object until the Election is complete. Once complete, it will discard the Election object and set its election field to null. By the end of the election, each member will have created an Ingress Subscription for the IngressAdapter, the followers will have created a Log Subscription for the LogAdapter, and the leader will have created an MDC Publication for the LogPublisher.

When a client connects, the member it connected to will create an Egress Publication for the EgressPublisher.

RecoveryPlan

A RecoveryPlan contains information that the Consensus Module uses to recover cluster state. It contains information about the Log, and the latest Snapshot (if there is one). Note that a Snapshot consists of more than one Recording in the Archive, as the Consensus Module takes a snapshot of its state, separate from the Clustered Service(s). The RecoveryPlan contains a list of 'Snapshots', but these refer to the snapshots of the different components, i.e. the different Recordings for the latest Snapshot.

The RecoveryPlan is built by a RecordingLog object. The recording.log file is where the Consensus Module keeps track of what it has stored in each Recording in the Archive. One Recording will be for the Log and any others will be for Snapshots. The RecordingLog builds a RecoveryPlan by finding the latest Snapshot and merging the Log and Snapshot information with information from the Archive's Catalog (archive.catalog).
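An illustrative sketch of the snapshot-selection part of that (not the RecordingLog implementation): the snapshot entries sharing the highest logPosition belong together and form the latest Snapshot, one Recording per component.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of selecting the latest Snapshot from recording-log entries: the
// snapshot entries at the highest logPosition form the latest Snapshot, one
// per component (the Consensus Module itself plus each Clustered Service).
public class LatestSnapshotSketch
{
    record SnapshotEntry(long recordingId, long logPosition, int serviceId) {}

    static List<SnapshotEntry> latestSnapshots(final List<SnapshotEntry> entries)
    {
        long maxPosition = -1;
        for (final SnapshotEntry e : entries)
        {
            maxPosition = Math.max(maxPosition, e.logPosition());
        }

        final List<SnapshotEntry> latest = new ArrayList<>();
        for (final SnapshotEntry e : entries)
        {
            if (e.logPosition() == maxPosition)
            {
                latest.add(e);
            }
        }
        return latest;
    }
}
```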

At the time of writing (Aeron 1.47.0), a RecoveryPlan contains the following information:

RecoveryPlan
    long lastLeadershipTermId       most recent leadership term in the Log
    long lastTermBaseLogPosition    the position at the start of that leadership term
    long appendedLogPosition        last position written to the Log
    long committedLogPosition       last known commit (consensus) position
    List<Snapshot>                  most recent snapshots for the consensus module and services
        long recordingId            to identify the Recording in the Archive
        long leadershipTermId       in which the snapshot was taken
        long termBaseLogPosition    position of the log over leadership terms at the beginning of this term
        long logPosition            position reached when the snapshot was taken
        long timestamp              when the snapshot was taken
        int serviceId               of the service that was snapshotted
    Log                             appended local log details
        long recordingId            for the recording in an archive
        long leadershipTermId       identity for the leadership term
        long termBaseLogPosition    log position at the base of the leadership term
        long logPosition            position the log has reached for the term
        long startPosition          of the recording captured in the archive
        long stopPosition           of the recording captured in the archive
        int initialTermId           of the stream captured for the recording
        int termBufferLength        of the stream captured for the recording
        int mtuLength               of the stream captured for the recording
        int sessionId               of the stream captured for the recording

RecoveryState Counter

The RecoveryState Counter is a temporary Counter that is created by the ConsensusModuleAgent to pass recovery information to the Clustered Service(s). Once the ConsensusModuleAgent and Clustered Service(s) have loaded their Snapshots, the Counter is deleted.

It contains the following fields:

  • Leadership Term ID
  • Log position for Snapshot
  • Timestamp at beginning of Recovery
  • Cluster ID
  • Count of Services
  • Snapshot Recording ID (Service ID 0..n)

Final Structure

We haven't discussed Elections or Client connections yet, but in order to keep the diagram on the same page so you can compare it with those above, here is what the Consensus Module looks like when it's in full flight, having completed an Election and accepted some Client connections.

After the Election:

  • the Election object is no longer present
  • the RecoveryPlan sticks around, but isn't used much
  • each member has a Consensus connection from the others, so there is a consensus log buffer for each
  • each member subscribes to the Ingress, so it has an Ingress Subscription
  • when a client connects to the leader, the leader creates a ClusterSession for it (discussed later), along with an Ingress log buffer for the connection, an Egress Publication and an Egress log buffer
  • the leader has a Log Publication and a single Log log buffer (it uses Multi-Destination Cast to send from one log buffer to multiple followers)
  • the Clustered Service has started and created a Publication for the ConsensusModule stream, so there is a log buffer for the Consensus Module Subscription
  • followers have a Log Subscription, which has a Log log buffer

(Diagram: the full structure. The leader adds an Ingress Subscription with ingress log buffers, ClusterSessions with Egress Publications and egress log buffers, and a Log Publication with a single Log log buffer; a follower instead has a Log Subscription with a Log log buffer.)

The leader has a Log Publication for publishing Log messages, and Egress Publications for publishing Egress messages to any connected clients. It also has a LogAdapter, but it is unused because the leader doesn't subscribe to Log messages.

The follower has a Log Subscription for its LogAdapter, which it uses to receive Log messages from the leader.