Clustered Service
In Aeron Cluster, your application code (the Raft replicated state machine) runs in a Clustered Service, which also contains some Aeron Cluster code to integrate it with the rest of the Cluster. In order for the Aeron code to talk to your code, your code needs to implement the ClusteredService interface.
Note
I'm going to refer to your application code as the "ClusteredService", after the interface, and use "Clustered Service" to refer to the overall component that includes your ClusteredService and the supporting Aeron code. This should be clearer in the diagrams below.
The ClusteredService is managed by a ClusteredServiceAgent, which always runs on its own thread, and is responsible for passing Log messages into the ClusteredService. If you want to run more than one application as a Clustered Service, they would each have their own ClusteredServiceAgent and would run independently on different threads, each consuming the same messages from the Log, possibly at different rates.
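To make the "same messages, different rates" point concrete, here is a minimal sketch in plain Java. It does not use the Aeron API: SketchLog and SketchServiceAgent are hypothetical stand-ins, and the sketch is single-threaded, whereas each real ClusteredServiceAgent runs on its own thread. The only thing it is meant to show is that each agent tracks its own position in a shared Log.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the replicated Log: an append-only list of messages.
final class SketchLog {
    private final List<String> messages = new ArrayList<>();

    void append(String message) { messages.add(message); }
    int size() { return messages.size(); }
    String get(int index) { return messages.get(index); }
}

// Hypothetical stand-in for a ClusteredServiceAgent: it keeps its own position in the shared Log.
final class SketchServiceAgent {
    private final SketchLog log;
    private final List<String> applied = new ArrayList<>();
    private int position = 0;

    SketchServiceAgent(SketchLog log) { this.log = log; }

    // Consume at most `limit` messages from the Log; returns the number consumed.
    int poll(int limit) {
        int count = 0;
        while (count < limit && position < log.size()) {
            applied.add(log.get(position++));
            count++;
        }
        return count;
    }

    int position() { return position; }
    List<String> applied() { return applied; }
}
```

Two agents built over the same SketchLog can be polled with different limits. They will sit at different positions for a while, but once both have caught up they will have applied exactly the same messages in the same order.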
The creation of the Clustered Service follows the same overall pattern as the Consensus Module.
The Clustered Service contains a ClusteredServiceContainer class, which is used to configure and create a ClusteredServiceAgent. Once startup has completed, the ClusteredServiceContainer doesn't do anything other than hold a reference to the ClusteredServiceAgent to close it on shutdown. It's the ClusteredServiceAgent (an Agrona Agent) that performs all the interesting work, bridging between the Cluster and your application.
Clustered Service Configuration¶
The ClusteredServiceContainer class contains:
- a Configuration inner class that contains static methods for reading System properties for configuring the clustered service, along with default values (fairly chunky, at 550+ lines of code)
- a Context inner class, which is a programmable configuration object (larger, at 1500+ lines) that is passed into the ClusteredServiceContainer. You create an instance of the Context, which defaults a lot of its values from the Configuration class. You can then programmatically override values on the Context object before passing it into the ClusteredServiceContainer
Your application is passed to the ClusteredServiceAgent by setting it on the Context as a ClusteredService.
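The relationship between the Configuration and Context classes can be sketched in plain Java. Everything here is a hypothetical stand-in rather than the real Aeron classes: the SketchConfiguration, SketchContext and SketchService names, the sketch.service.name property and its default are all assumptions made for illustration. The fluent setters follow the style that Aeron's Context classes use.

```java
// Hypothetical stand-in for the Configuration inner class: static readers over System properties.
final class SketchConfiguration {
    // Assumed property name and default value, for illustration only.
    static final String SERVICE_NAME_PROP = "sketch.service.name";
    static final String SERVICE_NAME_DEFAULT = "clustered-service";

    static String serviceName() {
        return System.getProperty(SERVICE_NAME_PROP, SERVICE_NAME_DEFAULT);
    }
}

// Hypothetical stand-in for the ClusteredService interface.
interface SketchService { }

// Hypothetical stand-in for the Context inner class: defaults from SketchConfiguration,
// with fluent setters for programmatic overrides.
final class SketchContext {
    private String serviceName = SketchConfiguration.serviceName();
    private SketchService clusteredService;

    SketchContext serviceName(String serviceName) {
        this.serviceName = serviceName;
        return this;
    }

    String serviceName() { return serviceName; }

    // The application is handed over by setting it on the Context.
    SketchContext clusteredService(SketchService clusteredService) {
        this.clusteredService = clusteredService;
        return this;
    }

    SketchService clusteredService() { return clusteredService; }
}
```

A caller would write something like new SketchContext().serviceName("orders").clusteredService(app), leaving any value it does not set at whatever the System properties (or their defaults) provide.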
There isn't much else of interest in the ClusteredServiceContainer class, other than a launch(Context) method and a constructor.
ClusteredServiceContainer.launch()¶
The static ClusteredServiceContainer.launch(Context) method is used to construct a ClusteredServiceContainer. If you have more than one application, you would call this once for each of them, using a different Context for each. The launch method passes the Context to the ClusteredServiceContainer constructor, which 'concludes' the Context, then passes the concluded Context to the ClusteredServiceAgent constructor. The Clustered Service always runs on its own thread, so the ClusteredServiceContainer constructor always creates an AgentRunner for the ClusteredServiceAgent (unlike the ConsensusModule, which can be configured to use an AgentInvoker). The launch method starts a thread for the AgentRunner, then returns the ClusteredServiceContainer.
At this point, the Clustered Service is 'running'. It has a ClusteredServiceAgent with a duty cycle that is invoked by the AgentRunner on its own thread.
The ClusteredServiceContainer object's role is pretty much complete at this stage. It keeps a reference to the AgentRunner. When the ClusteredServiceContainer is closed (on shutdown), it closes the AgentRunner, which in turn closes the ClusteredServiceAgent. The ClusteredServiceAgent performs all the application related work, so we'll focus on that from this point, and won't show the ClusteredServiceContainer object on diagrams below.
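The launch and close lifecycle described above can be sketched in plain Java. This is not the Aeron or Agrona code: SketchAgent and SketchContainer are hypothetical stand-ins for the Agent, the AgentRunner and the container, and the idle behaviour is reduced to Thread.yield(). It shows an agent being started on its own thread, having its duty cycle invoked repeatedly, and being closed when the container is closed.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for an agent with a duty cycle.
interface SketchAgent {
    void onStart();
    int doWork(); // returns the amount of work done in this cycle
    void onClose();
}

// Hypothetical stand-in for the container plus its runner: it starts the agent
// on its own thread and closes it on shutdown.
final class SketchContainer implements AutoCloseable {
    private final AtomicBoolean running = new AtomicBoolean(true);
    private final Thread thread;

    private SketchContainer(SketchAgent agent) {
        thread = new Thread(() -> {
            agent.onStart();                 // called once
            while (running.get()) {
                if (agent.doWork() == 0) {   // called repeatedly: the duty cycle
                    Thread.yield();          // nothing to do, so idle briefly
                }
            }
            agent.onClose();                 // called once, on shutdown
        }, "sketch-clustered-service");
    }

    // Creates the container and starts the agent's thread before returning.
    static SketchContainer launch(SketchAgent agent) {
        SketchContainer container = new SketchContainer(agent);
        container.thread.start();
        return container;
    }

    @Override
    public void close() {
        running.set(false);
        try {
            thread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

After launch returns, the caller only holds on to the container so it can close it later. All the work happens on the agent's thread.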
The following sections look at the above process in more detail.
Context.conclude()¶
Concluding the Context is done using Context.conclude(), which can only be called once. This causes it to build and initialise several objects based on the configuration, ready for use by the ClusteredServiceAgent. It does the following:
- creates or opens the Clustered Service's cluster mark file, wrapping it in a ClusterMarkFile object. That checks if the file already exists and contains a timestamp that has been updated within the last 10 seconds. That would indicate a ClusteredServiceAgent is already running for this Clustered Service (using the same cluster directory), in which case, an exception would be thrown, which would stop the creation of this Clustered Service
- creates an Aeron Transport client, which connects to the Media Driver. Unless configured otherwise, the Transport Client created by the Context uses an AgentRunner to run its Client Conductor, so it starts a new thread
- creates several Counters (see Counters)
- creates a DutyCycleStallTracker and a SnapshotDurationTracker - these update Counters if the duty cycle isn't invoked frequently enough, or a snapshot takes too long
- creates an Archive Context - the configuration for the Archive Client. Unlike the Consensus Module, the Clustered Service doesn't stay connected to the Archive. It only connects to take or load snapshots, then disconnects
- concludes the mark file - once it is running, the ClusteredServiceAgent will update the timestamp in the mark file once per second to stop another ClusteredServiceAgent from starting for this Clustered Service. A lot of information is written into the mark file, such as archiveStreamId, serviceStreamId, memberId, clusterId and serviceId
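Two of the rules above, "conclude can only be called once" and "refuse to start if the mark file was updated in the last 10 seconds", can be sketched in plain Java. The class names, the exception type and the in-memory timestamp are all assumptions made for illustration. The real ClusterMarkFile is a memory-mapped file and the real Context uses its own exception types.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for the mark file: only the activity timestamp is modelled.
final class SketchMarkFile {
    static final long LIVENESS_TIMEOUT_MS = 10_000; // the 10 second window described above

    private long activityTimestampMs = -1; // -1 means the timestamp has never been written

    // True if the timestamp has been updated within the liveness window.
    boolean isActive(long nowMs) {
        return activityTimestampMs >= 0 && (nowMs - activityTimestampMs) < LIVENESS_TIMEOUT_MS;
    }

    // The running agent would call this periodically (about once per second).
    void updateActivityTimestamp(long nowMs) {
        activityTimestampMs = nowMs;
    }
}

// Hypothetical stand-in for the Context: conclude() may only be called once,
// and fails if another agent appears to be using the same mark file.
final class SketchConcludableContext {
    private final AtomicBoolean concluded = new AtomicBoolean(false);
    private final SketchMarkFile markFile;

    SketchConcludableContext(SketchMarkFile markFile) {
        this.markFile = markFile;
    }

    void conclude(long nowMs) {
        if (!concluded.compareAndSet(false, true)) {
            throw new IllegalStateException("context already concluded");
        }
        if (markFile.isActive(nowMs)) {
            throw new IllegalStateException("active mark file detected");
        }
        markFile.updateActivityTimestamp(nowMs);
    }
}
```

With this sketch, a second Context concluded against the same mark file 4 seconds after the first is rejected, while one concluded 10 seconds or more after the last timestamp update is allowed through.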
There is still a lot to create, but after concluding the Context, the Clustered Service looks something like this:
ClusteredServiceAgent constructor¶
The ClusteredServiceAgent constructor takes the concluded Context and does the following:
- populates itself from a lot of the values in the Context, including the ClusteredService
- creates a BoundedLogAdapter, which is what the Clustered Service uses to poll the Log. It is bounded because it only polls up to the commit-pos Counter
- creates a ConsensusModuleProxy and Publication for sending messages to the Consensus Module
- creates a ServiceAdapter and Subscription for receiving messages from the ConsensusModule
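The "bounded" part of the BoundedLogAdapter can be sketched in plain Java. This is a simplified model, not the Aeron implementation: SketchLogEntry and SketchBoundedLogPoller are hypothetical, the Log is an in-memory list, and the commit position is passed in as a parameter rather than read from a Counter. It also ignores details such as fragment limits.

```java
import java.util.List;
import java.util.function.Consumer;

// Hypothetical model of a Log entry: a message and the Log position just after it.
final class SketchLogEntry {
    final long endPosition;
    final String message;

    SketchLogEntry(long endPosition, String message) {
        this.endPosition = endPosition;
        this.message = message;
    }
}

// Hypothetical model of bounded polling: only entries that end at or before
// the commit position are delivered to the handler.
final class SketchBoundedLogPoller {
    private final List<SketchLogEntry> log;
    private int nextIndex = 0;

    SketchBoundedLogPoller(List<SketchLogEntry> log) {
        this.log = log;
    }

    // Returns the number of messages delivered in this poll.
    int poll(long commitPosition, Consumer<String> handler) {
        int count = 0;
        while (nextIndex < log.size() && log.get(nextIndex).endPosition <= commitPosition) {
            handler.accept(log.get(nextIndex).message);
            nextIndex++;
            count++;
        }
        return count;
    }
}
```

Messages beyond the commit position stay in the Log untouched until a later poll is given a higher bound, so the application never sees a message that has not been committed.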
Once the ClusteredServiceAgent has been created, it is driven from outside by the AgentRunner, which calls its onStart() method once, then its doWork() method repeatedly to perform its duty cycle. More objects are created in onStart(), but at this point, the Clustered Service looks something like this:
ClusteredServiceAgent.onStart()¶
The ClusteredServiceAgent's onStart() method does the following:
- the Clustered Service needs the commit-pos Counter, so it waits for it to exist. It's created by the Consensus Module when it concludes its Context
- recovers its state, which does:
    - waits for the RecoveryState Counter to exist - that's created in ConsensusModuleAgent.onStart()
    - if the RecoveryState Counter contains information about a snapshot, it loads the snapshot from the Archive. The snapshot starts with some state from the ClusteredServiceAgent, which it loads first. This is followed by the application's state, which the ClusteredServiceAgent asks the application to load next, by calling ClusteredService.onStart(). The contents of the snapshot are described later in State and Snapshots
    - if the RecoveryState Counter does not contain information about a snapshot, the ClusteredServiceAgent calls ClusteredService.onStart() without a snapshot Image. onStart() is where the ClusteredService can perform any initialisation
    - the ClusteredServiceAgent sends a ServiceAck message to the ConsensusModuleAgent, acknowledging that it has reached the logPosition specified in the RecoveryState Counter. This will either be the Log position the snapshot was taken at, or zero. The ConsensusModuleAgent waits for this after loading its own snapshot
- moves to the Active state
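The recovery steps above can be sketched in plain Java. This is a heavily simplified model under stated assumptions: the snapshot is a String rather than an Image loaded from the Archive, SketchRecoveryState stands in for the contents of the RecoveryState Counter, and the ServiceAck is reduced to the Log position returned by recover(). None of these names come from Aeron.

```java
// Hypothetical stand-in for the ClusteredService interface.
// The snapshot is null when there is nothing to load.
interface SketchRecoverableService {
    void onStart(String snapshot);
}

// Hypothetical stand-in for what the RecoveryState Counter tells the agent.
final class SketchRecoveryState {
    final long logPosition;
    final String snapshot; // null when there is no snapshot to recover from

    private SketchRecoveryState(long logPosition, String snapshot) {
        this.logPosition = logPosition;
        this.snapshot = snapshot;
    }

    // No snapshot: recovery starts from Log position zero.
    static SketchRecoveryState none() {
        return new SketchRecoveryState(0, null);
    }

    // Snapshot taken at the given Log position.
    static SketchRecoveryState withSnapshot(long logPosition, String snapshot) {
        return new SketchRecoveryState(logPosition, snapshot);
    }
}

final class SketchRecovery {
    // Starts the service, with or without a snapshot, and returns the
    // Log position that would be acknowledged to the Consensus Module.
    static long recover(SketchRecoveryState state, SketchRecoverableService service) {
        service.onStart(state.snapshot); // always called, snapshot or not
        return state.logPosition;        // the snapshot position, or zero
    }
}
```

The point of the sketch is that the application's start callback is invoked in both cases, and the acknowledged position tells the Consensus Module where the service will resume consuming the Log.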
At this point, the Clustered Service has finished initialising, but it still hasn't subscribed to the Log.
Final Structure¶
At some point in the Consensus Module's initialisation, it will start an Election. Within the Election, it will connect to the Clustered Service via the Service channel and send it a JoinLog message. This is the point where the Clustered Service creates the Log Subscription (the JoinLog also tells the Clustered Service whether it is the leader). Each time it polls the Log Subscription to process new messages, it polls up to the commit-pos Counter, which is set by the Consensus Module. As the Clustered Service only takes input from the Log, it can't distinguish between replayed data and live data - there is no marker to indicate when it reaches live messages.
Clients are described on the next page, but in terms of the overall Clustered Service structure, when a client connects to the cluster, it is represented in the Clustered Service by a ClientSession. On the leader, the ClientSession will have an Egress Publication back to the client (followers don't send messages to the client).
The final structure of the Clustered Service looks like this: