
Detailed Overview

Aeron Cluster is a large framework, and this page covers a wide area. We start by looking at how Aeron Cluster uses Aeron Transport and Aeron Archive, upon which it is built.

Aeron Transport

Aeron Cluster uses Aeron Transport to communicate between components. The diagram below provides a quick recap of how Aeron Transport works, then shows the different communication channels created by Aeron Cluster.

[Animated diagram: two clients (Client A, Client B) and three cluster members (Leader, Follower 1, Follower 2), each running a Media Driver, Archive, Consensus Module and Clustered Service. It shows the cnc.dat shared-memory file and counters on every machine, the Publication and Image log buffers, and the ingress, egress, log, consensus and archive control-request/control-response channels between the components.]




The Aeron Transport component is the Media Driver. It can be run in a standalone process as depicted above, or co-located with all the other components in one process (described later in Running). It creates a shared memory buffer named cnc.dat, which is central to everything in Aeron. All the components access this as though it's in their own memory space. This is the same on all machines, even the clients.

The cnc.dat file contains different sections. If a component needs to set up something new in Aeron Transport, like adding a new Publication, it calls a method on an Aeron Transport client API. This talks to the Media Driver by writing a message in the 'to driver' section of cnc.dat. The Media Driver reads the message, processes it, and replies by writing a message in the 'to clients' section, which the client API reads.
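
To make that concrete, here is a minimal sketch of adding a Publication and a Subscription through the Aeron Transport client API (the channel and stream id values are just illustrative):

    import io.aeron.Aeron;
    import io.aeron.Publication;
    import io.aeron.Subscription;

    // Connecting maps cnc.dat into this process and attaches to the
    // 'to driver' / 'to clients' sections.
    try (Aeron aeron = Aeron.connect(new Aeron.Context()))
    {
        // Each add* call writes a command to the 'to driver' section;
        // the Media Driver Conductor processes it and replies via 'to clients'.
        Publication publication = aeron.addPublication("aeron:udp?endpoint=host-0:9002", 1001);
        Subscription subscription = aeron.addSubscription("aeron:udp?endpoint=0.0.0.0:9010", 1001);
    }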

cnc.dat also contains Counters, which are 64-bit integer values. Being in shared memory allows the components to share values without having to send messages to each other. For example, Aeron Archive could update a Counter that indicates how much it has written to disk in a Log recording. The Consensus Module could read the Counter as though it's reading a local value.
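
Reading a Counter is just a read of shared memory. A minimal sketch, assuming you already know the id of the counter you are interested in:

    import org.agrona.concurrent.status.CountersReader;

    CountersReader counters = aeron.countersReader();
    // No message exchange needed - this is a volatile read from cnc.dat.
    long recordedPosition = counters.getCounterValue(counterId);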

The Media Driver comprises three Agrona Agents. These can be configured to run on different threads or share the same thread (described in Running). The Conductor is responsible for processing driver commands sent in cnc.dat. It is the 'admin' that creates and cleans up resources. Once it has set up a Publication or Subscription, it isn't used on the hot-path of sending and receiving application messages.

When a network connection has been set up, there will be a Publication log buffer on the sending machine, and a corresponding Image log buffer on the receiving machine. The Sender polls each Publication log buffer and sends any new messages over the network to the Receiver. The Receiver polls each network connection and puts new messages in the correct Image log buffer, creating a replica. Other log buffers are used for inter-process communication, which aren't sent over the network.

Looking at how a connection is established, when a client calls connect() on the Cluster client API, it uses the Aeron Transport client API to create an Ingress Publication to each of the cluster members. Focussing on one cluster member, that creates a Publication log buffer, which the client will eventually write Ingress messages into. On the client, the Sender starts sending SETUP messages to the Ingress Channel (host:port) of the cluster member.

Each Consensus Module has a Subscription to the Ingress Channel. This means each Receiver is listening on the Ingress port. When one of them receives a SETUP message, it creates an Image log buffer for that connection. Each connection from a client will have its own Image log buffer on the cluster member, which becomes a replica of the client's Publication log buffer. When the Consensus Module polls its Subscription, it returns messages from each client's Image log buffer.

Before the client sent the SETUP message, it created a Subscription to its Egress Channel, so its Receiver is listening on the Egress port. Once the client's Ingress is connected, it sends a SessionConnectRequest message on it. The message contains the client's Egress Channel (host:port), which the Consensus Module uses to connect back to the client, setting up its Egress connection.
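
Here's a minimal sketch of the client side of this, using the Cluster client API (the endpoints are illustrative):

    import io.aeron.cluster.client.AeronCluster;

    // connect() creates an Ingress Publication to each member, a Subscription on the
    // Egress channel, then sends a SessionConnectRequest over the Ingress.
    AeronCluster clusterClient = AeronCluster.connect(
        new AeronCluster.Context()
            .ingressChannel("aeron:udp")
            .ingressEndpoints("0=host-0:9002,1=host-1:9102,2=host-2:9202")
            .egressChannel("aeron:udp?endpoint=client-host:0"));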

Once both clients have connected, the cluster member will have two Ingress Image log buffers and two Egress Publication log buffers. Its Receiver listens on one Ingress port, which is used by both clients. Each Publication has a unique sessionId, so when a client sends an Ingress message, the Receiver uses the sessionId in the message to identify which Image log buffer to put it into. The leader's Clustered Service also ends up connecting to the Egress and shares the log buffers.

The leader's Consensus Module creates a Publication for the Raft Log, so it has a network Publication log buffer for writing Raft Log messages into. The followers are subscribed to it, so they listen on a Log port and have Image log buffers to replicate the leader's Log Publication log buffer into. The followers are now listening on two ports: one for the Ingress endpoint and one for the Log endpoint.

The Consensus Modules are connected to each other to exchange consensus messages (like AppendPosition and CommitPosition shown in Raft Implementation). This means each member has two Publication log buffers to send messages and two Image log buffers to receive them. They are named in the diagram in the form consensus<sender>-<receiver>. Note that each member is now listening on a new port for the Consensus endpoint.

And for good measure, some IPC log buffers are used to send messages between the Consensus Module, Clustered Service and Aeron Archive. That's not too complicated, is it? 🙂


Cluster Member Endpoints

As we saw above, each cluster member listens on a number of ports for different endpoints. The diagram shows members listening on the Ingress, Log and Consensus endpoints, but there are two more: the Archive and Catchup endpoints.

Ingress - the port that a cluster member listens on for connections from clients. Clients can connect to any member on its Ingress port and send a SessionConnectRequest. The leader will accept the connection, but the followers will return a SessionEvent containing a redirect to the leader.

Log - the port that follower cluster members listen on to receive live Log updates from the leader. The followers have a Log Subscription and manually add this endpoint to it as a destination ('destination' from the perspective of the sender - see Multiple Destinations), when they are ready to receive live Log updates.

Consensus - the port that a cluster member listens on to receive Consensus messages from the other members.

Archive - the port that a cluster member listens on for its Archive's Control (Request) Channel. When the Archive starts, it Subscribes to two Control Channels for receiving commands, one over IPC, the other over UDP. The diagram shows each Consensus Module talking to its Archive on the IPC channel, via the control-request log buffer. The Receiver listens on the Archive port for the UDP Control Channel. Within Aeron Cluster, this is used by a follower that starts up and discovers it is missing Log entries since before the start of the current leadership term. The follower's Consensus Module asks its Archive to connect to the leader's Archive to replicate the missing Log messages to it, using Archive replication.

Catchup - the port that a cluster member listens on when it is a follower, and needs to receive catchup Log messages. If a follower starts up and is missing Log entries within the current leadership term, it catches up from the leader's Archive in a different way. Here, the follower's Consensus Module adds the Catchup endpoint as a second destination to its Log Subscription and sends the endpoint string to the leader in a CatchupPosition message. The leader sets up an Archive replay from its Archive to the follower's Catchup endpoint. This is different from Archive replication, because here, the follower listens to both the Catchup and live Log endpoints, both of which put messages into its Log Image log buffer. When the messages from the Catchup endpoint reach the start of the messages from the Log endpoint, the follower has caught up and the Catchup endpoint is removed.

Cluster Members Configuration

Each Consensus Module needs configuring with the 5 endpoints described above, using a string with the format:

0,ingress:port,consensus:port,log:port,catchup:port,archive:port|\
1,ingress:port,consensus:port,log:port,catchup:port,archive:port|...

It doesn't matter which ports are used. The BasicAuctionClusteredServiceNode uses the following ports, so it can run all 3 members on the same machine:

Member   Archive   Ingress   Consensus   Log    Catchup
0        9001      9002      9003        9004   9005
1        9101      9102      9103        9104   9105
2        9201      9202      9203        9204   9205

They all run on localhost, so the cluster members string would look like:

0,localhost:9002,localhost:9003,localhost:9004,localhost:9005,localhost:9001|\
1,localhost:9102,localhost:9103,localhost:9104,localhost:9105,localhost:9101|\
2,localhost:9202,localhost:9203,localhost:9204,localhost:9205,localhost:9201

On a multi-host deployment, it might look like:

0,host-0:9002,host-0:9003,host-0:9004,host-0:9005,host-0:9001|\
1,host-1:9102,host-1:9103,host-1:9104,host-1:9105,host-1:9101|\
2,host-2:9202,host-2:9203,host-2:9204,host-2:9205,host-2:9201

You can put channels on different networks to segregate traffic, e.g. you could specify a 'public' network interface for the Ingress channel and a 'private' network interface for the rest.
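
As a sketch of where this string ends up, it is passed to the Consensus Module context when configuring each member (values illustrative):

    import io.aeron.cluster.ConsensusModule;

    String clusterMembers =
        "0,host-0:9002,host-0:9003,host-0:9004,host-0:9005,host-0:9001|" +
        "1,host-1:9102,host-1:9103,host-1:9104,host-1:9105,host-1:9101|" +
        "2,host-2:9202,host-2:9203,host-2:9204,host-2:9205,host-2:9201";

    ConsensusModule.Context consensusModuleContext = new ConsensusModule.Context()
        .clusterMemberId(0)              // this member's entry in the string
        .clusterMembers(clusterMembers);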

Dynamic Join

Aeron Cluster requires each member to remain on the same host. It used to support 'dynamic join', where cluster members could be added while the cluster is running, but this was removed in version 1.41.0 (PR):

Deprecate cluster dynamic join feature. This is to be replaced with a more robust and user-friendly premium offering

Aeron Archive

Aeron Archive is used to create a durable copy of the Log, in a Recording. Each cluster snapshot is also stored in a Recording. When a cluster member starts, it can replay a snapshot Recording to load the initial state, and/or replay the whole, or part of, the Log Recording. If a cluster member starts and finds itself behind the leader, it can use Aeron Archive to either replicate the Log from the leader (Archive to Archive copy), or replay the log from the leader to catch up (from the leader's Archive, directly into the follower's Log Subscription).

Here's a quick recap of how Aeron Archive works, then a look at some of the ways it is used by Aeron Cluster. The diagram zooms in to focus on the leader and just one follower.

[Animated diagram: the leader and one follower, zoomed in on their Archives. It shows the Archive agents (Conductor, Recorder, Replayer) and their sessions (ControlSession, RecordingSession, ReplaySession, ReplicationSession), the Log Recording growing across leadership terms, snapshot Recordings and the archive.catalog file, plus the control-request, replication, log-replay and live log channels used during the FOLLOWER_LOG_REPLICATION and catchup election states.]




Aeron Archive comprises three Agrona Agents. As per other Agrona Agents, they can be configured to run on dedicated threads for best performance, or share a thread with other Agents. The Conductor is responsible for handling requests on the Control (Request) Channel. Similar to the Aeron Transport Conductor, it is the 'admin' that creates and cleans up resources. The other Agents are the Recorder and Replayer.

Each long running task within Aeron Archive is modelled as a Session, which is added to one of the Agrona Agents. On each duty cycle, the Agent calls doWork() on each of its Sessions. When a session is complete, it is removed from the Agent. The Recorder services all the RecordingSessions, each of which reads from a Subscription and writes to a Recording. The Replayer services all the ReplaySessions, which do the opposite, reading from a Recording and writing to a Publication.

The Conductor services all the other session types, such as ControlSession, ReplicationSession, ListRecordingsSession and DeleteSegmentsSession. When a client connects to Aeron Archive, the Conductor creates a ControlSession for it. This sets up the connection back to the client on the response channel that the client specified in the ConnectionRequest message (known as the Control Response Channel).

The Consensus Module asks the Archive to record the Log, which creates a Recording that grows continuously. Recordings are stored in Segment files. Each Segment file has a maximum size and once full, the next Segment file is created (not shown). This makes it easy to archive the oldest segment files. Snapshots of Aeron Cluster are also stored in Recordings. Aeron Archive keeps track of the Recordings and their Segment files in an Archive Catalog file.

Use Case 1: Aeron Archive uses a RecordingSession to record the Log. As the Consensus Module on the leader adds new entries to its Log Publication, the RecordingSession appends them to the Log Recording. Aeron Transport sends the new entries to the followers. A RecordingSession on each follower appends them to their Log Recording.
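
Aeron Cluster drives the recording internally, but the underlying Archive client call looks roughly like this (channel and stream id are illustrative):

    import io.aeron.archive.client.AeronArchive;
    import io.aeron.archive.codecs.SourceLocation;

    try (AeronArchive archive = AeronArchive.connect(new AeronArchive.Context()))
    {
        // The Archive Conductor creates a RecordingSession for this channel/stream,
        // which the Recorder agent then services on its duty cycle.
        long subscriptionId = archive.startRecording(
            "aeron:udp?endpoint=host-0:9004", 100, SourceLocation.LOCAL);
    }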

Use Case 2: if a cluster member starts and finds its Log Recording stopped before the current leadership term, it uses Archive replication to replicate the missing entries from the leader (this is in the FOLLOWER_LOG_REPLICATION election state). This is an Archive to Archive copy, which the follower's Consensus Module requests its Archive to initiate. The follower's Archive creates a ReplicationSession, which requests a replay from the leader's Archive via its UDP Control Request Channel.

The leader's Archive receives the request and creates a ReplaySession. This replays the requested part of the Log Recording to a Subscription on the follower. The follower's Subscription is the Recording Subscription created by its Archive when the ReplicationSession opened the Log Recording to extend it (append to it). Note that the follower does not have a Subscription to the live Log at this stage, so there is no live Log Recording.
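
The follower's Consensus Module drives this through its Archive client; a hedged sketch of the underlying call (the recording ids, channel and variable names are illustrative):

    // Ask the local (follower) Archive to replicate from the leader's Archive,
    // extending the existing local Log Recording.
    long replicationId = followerArchive.replicate(
        leaderLogRecordingId,                        // srcRecordingId on the leader
        followerLogRecordingId,                      // dstRecordingId to extend locally
        AeronArchive.Configuration.controlStreamId(),
        "aeron:udp?endpoint=leader-host:9001",       // the leader's UDP Control Request Channel
        null);                                       // no live destination - a bounded copy, not a merge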

Use Case 3: if a cluster member starts and is only missing entries within the current leadership term, it replays them from the leader's Archive, straight into its Log Subscription. This is different from Archive replication as the follower processes the entries as they arrive, as well as recording them. The follower adds a Catchup endpoint to its Log Subscription, which the leader replays into. The leader creates an ExclusivePublication for the replay, so replay doesn't interfere with the live Publication log buffer.

When it's nearly caught up, the follower adds the live endpoint to its Log Subscription, so it can start to populate its Log log buffer with live messages. The two endpoints populate different regions of the follower's Log log buffer. Once entries from the catchup endpoint reach the start of the entries from the live endpoint, it has caught up. The Archive replay stops and the Catchup endpoint is removed. This is 'replay merge'. This Catchup process is performed by the FOLLOWER_CATCHUP election state.


Cluster Startup

When you start Aeron Cluster, you start a Media Driver, Archive, Consensus Module and Clustered Service.
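
A minimal sketch of launching them all in one process (the contexts are left at defaults for brevity; a real deployment configures directories, the cluster members string, endpoints and so on, and MyClusteredService stands in for your service):

    import io.aeron.archive.Archive;
    import io.aeron.cluster.ClusteredMediaDriver;
    import io.aeron.cluster.ConsensusModule;
    import io.aeron.cluster.service.ClusteredServiceContainer;
    import io.aeron.driver.MediaDriver;

    // Media Driver + Archive + Consensus Module in one process...
    ClusteredMediaDriver clusteredMediaDriver = ClusteredMediaDriver.launch(
        new MediaDriver.Context(),
        new Archive.Context(),
        new ConsensusModule.Context());

    // ...plus the Clustered Service, wrapped by its container (the Clustered Service Agent).
    ClusteredServiceContainer container = ClusteredServiceContainer.launch(
        new ClusteredServiceContainer.Context()
            .clusteredService(new MyClusteredService()));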

Aeron Transport

The Media Driver starts from a clean slate each time. It creates a new cnc.dat file, then sits waiting for Transport clients to connect. It doesn't create any Publications, Subscriptions or log buffers automatically.

Aeron Archive

When the Archive starts up, it connects to the Media Driver, creates several Counters, refreshes its Catalog, subscribes to the IPC and UDP Control Request Channels, then waits for Archive clients to connect.

Consensus Module

When the Consensus Module starts, it does the following (this is an overview - details can be found later in Consensus Module):

  • connects to the Media Driver and Archive
  • creates a RecoveryPlan, which tells it which is the Log Recording and which Recordings are for the latest snapshot (if there is one)
  • creates a RecoveryState counter to pass recovery information to the ClusteredService
  • loads the latest snapshot, if there is one (the Consensus Module stores some state in a separate snapshot from the Clustered Service)
  • waits for the Clustered Service to load its snapshot, if there is one, and acknowledge the Log position it is up to (snapshot position or zero)
  • moves to the FOLLOWER role
  • enters an Election
  • during the Election, if it finds that its Log is missing entries that are on the leader, it copies those entries
    • if it is missing entries from before the current leadership term, it uses Archive Replication to replicate up to the start of the current leadership term. Once replication is complete, it processes those entries
    • once that is complete, or if it was only missing entries from the current leadership term, it uses 'catchup' to replay missing entries and processes them at the same time. When it is near the live Log position, it merges into processing the live Log stream ('replay merge')
  • once the Election is over, it starts normal consensus operations, either as leader or a follower

Elections

At a high level, an Election is just choosing a leader and getting the follower(s) to catch up. An Election can function without all the cluster members, but must have enough for a majority. Once an Election is complete, there are at least a majority of the members running, they have caught up to the leader's Log position, and the leader can 'open the doors' for new Ingress messages. Elections are a big subject and are covered in Elections.

Clustered Service

The Clustered Service startup is intertwined with the Consensus Module's. It is wrapped by a Clustered Service Agent. When it starts, the Agent waits for the RecoveryState counter to be created, which contains information about which snapshot (if any) to load. The Agent loads the snapshot into the Clustered Service, if there was one. It then sends an Ack to the Consensus Module, confirming the logPosition that it is up to, which should match the position the Consensus Module is expecting. The Agent is then active and will start polling for messages on the Service and Log channels. Any messages on the Log channel (which is polled up to the CommitPosition counter) are given to the Clustered Service.
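
As a fragment of a ClusteredService implementation (the other callbacks are omitted, and loadSnapshot is a hypothetical helper), the snapshot arrives in onStart:

    import io.aeron.Image;
    import io.aeron.cluster.service.Cluster;

    private Cluster cluster;

    @Override
    public void onStart(final Cluster cluster, final Image snapshotImage)
    {
        this.cluster = cluster;
        if (null != snapshotImage)
        {
            // Replay the snapshot Recording to rebuild in-memory state before the
            // Agent acknowledges its log position to the Consensus Module.
            loadSnapshot(snapshotImage);
        }
    }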

Multiple Clustered Services

It is possible to run more than one Clustered Service in the cluster. For example, you might have two related applications like a matching engine and a risk module that you want to co-locate. They would each run in their own thread in the cluster and process the same Log. Each application would have to ignore Log messages that are not applicable to it. Like any Clustered Service, they should only take input from the Log, so they should not talk to each other directly. Any communications between them must be done via messages they append to the Log for each other.

Deterministic Behaviour

As should be clear from the description of Raft, it is crucial that the Clustered Service (your code) is deterministic. Determinism is what keeps the state held in memory on each cluster member identical. It's why replaying the Log at startup can be relied upon to recreate the exact same in-memory state. If the members diverge, you lose the ability to automatically fail over to another member, because the others may have an alternate view of the world. Any non-deterministic behaviour means there is little point in using Aeron Cluster.

For example, in a trading system, a trader could place an order, which the leader could validate and accept. The order could go on to match against another order, creating a trade, which is notified to other downstream systems. If a follower behaved differently for whatever reason, it could validate the trader's order and reject it. After that, the divergence would only grow. That means you can no longer fail over to that follower. The leader has already notified downstream systems of the trade - business cannot continue as though it didn't happen. Small differences in behaviour can have a large impact on a system's state. This can be as trivial as different iteration order on a collection.

There is a whole page dedicated to Determinism.

Time

In order to avoid non-deterministic behaviour, the Clustered Service should not retrieve the current system time itself. It's not just the issue of small differences in time between cluster members when they process a message: if a cluster member restarts and replays the Log, the replay could happen hours, or even days, after the message was originally processed. It needs to use the original time in order to recreate the original behaviour.

To address this, time should be taken from the Cluster. When the leader appends a message to the Log, it includes the current time as a timestamp in the Log message. This is replicated to the followers as part of the Log message. When a cluster member processes the Log message, it sets the cluster time to this timestamp. This timestamp is provided when processing the Log message, but is also accessible on the Cluster interface (it's the same value).

Cluster time is represented as epoch time: a number of time units since 1 Jan 1970 UTC. By default, the cluster uses millisecond time units, but it can be configured differently. If a Clustered Service schedules a job, it specifies the deadline in cluster time units.
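
A hedged fragment showing where the timestamp arrives (it assumes the cluster field captured in onStart above):

    // Inside the ClusteredService - never call System.currentTimeMillis() here.
    @Override
    public void onSessionMessage(
        final ClientSession session,
        final long timestamp,        // the cluster time the leader stamped on this Log entry
        final DirectBuffer buffer,
        final int offset,
        final int length,
        final Header header)
    {
        // The same value is available via the Cluster interface, in cluster.timeUnit() units.
        final long clusterTime = cluster.time();
        // ... all time-based business logic should use timestamp / clusterTime
    }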

Scheduled Jobs

If a Clustered Service needs to schedule some work to happen in the future, it can use the Cluster interface to schedule a timer. This registers a correlationId against a deadline in the Consensus Module. When the deadline passes, the Consensus Module appends a TimerEvent to the Log (in the same way that it appends Ingress messages). This goes through the same Consensus process and will eventually be processed by the Clustered Service on each cluster member. It's up to the Clustered Service to map the correlationId to the action it wants to perform.
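
A minimal sketch of both halves (the correlationId value and the 60-second deadline are illustrative, and cluster time units are assumed to be the default milliseconds):

    // Scheduling, e.g. from within onSessionMessage:
    final long correlationId = 42L;                                 // map this to the action to perform
    cluster.scheduleTimer(correlationId, cluster.time() + 60_000);  // returns false if back-pressured

    // Fired on every member once the TimerEvent has gone through the Log:
    @Override
    public void onTimerEvent(final long correlationId, final long timestamp)
    {
        // look up correlationId and perform the scheduled work
    }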

Cluster Clients

As per Raft, clients can connect to the Ingress of any cluster member, but if they connect to a follower, it will respond, redirecting them to the leader. When a client connects to the leader, it's the Consensus Module that accepts the connection. It creates a connection back to the client on the client's Egress channel.

The leader's Consensus Module monitors session liveness and closes the connection if it becomes stale. It's also responsible for closing the connection on request from the client, or the Clustered Service.

When a client connects, the leader Consensus Module puts a message on the Log. This is read by the follower Consensus Modules and all the Clustered Services. The follower Consensus Modules use this to maintain their own list of sessions, but don't connect back to the client on the Egress. If an election happens, they have enough information so that the new leader can contact all the clients, tell them to switch their Ingress connections to it, and connect to their Egress channels.

When the Clustered Services receive the Log message for a new client connection, they create a session object for it. The leader asks its session object to connect to the client's Egress channel, which ends up sharing the Egress log buffer created by its Consensus Module. When the Clustered Services try to send an application-level message to the client, the leader sends the message on the Egress connection, and the followers fake a successful send.
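
A minimal sketch of sending an Egress message from the service (buffer, offset and length are assumed to hold an already-encoded message):

    // On the leader this goes out over the Egress connection; on a follower the
    // same call reports success without sending anything.
    final long result = session.offer(buffer, offset, length);
    if (result < 0)
    {
        // back-pressured or not connected - retry or handle per your strategy
    }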

Authentication

When a client connects to the cluster, authentication is supported, but it's disabled by default. Authentication can be done using encoded credentials on the connect request, and / or be followed by a challenge-response exchange. This is described later in Authentication.

Developer Replay

When a cluster member starts, it replays the latest snapshot and any Log messages from the Log Recording, beyond the snapshot position (or just replays the Log, if there is no snapshot). This lets it recreate the same state that it was in when it shut down. This same mechanism can be used to investigate production issues.

If a developer can get hold of a copy of the snapshot and Log Recordings from the Archive, they can also replay them (into the same version of the code). They can replay using a single node cluster, even if the production system is a multi node cluster. While replaying, the cluster member does not attempt to connect any of the Egress connections, so replay can occur in isolation, with no outside dependencies - the rest of the system is not required.

If you have some way of identifying the Log position where the issue happened, or some other identifying data, you can set a breakpoint in the Clustered Service where it happened. Once you hit the breakpoint, you can start your investigation.

Tip

Conditional breakpoints can be slow to process, but you can add temporary code like this:

    if (/*the condition that triggered the issue*/)
    {
        System.out.println("BOOM!");   // set normal breakpoint here
    }

Then you can run at full speed in the debugger.

Single Node Cluster

You can run an Aeron Cluster with one cluster member. You obviously lose the main selling point of Aeron Cluster - automatic failover - but retain the significant benefits of:

  • being able to replay the Log to reproduce issues precisely. This is a game-changer if you are used to relying on log messages and inference when investigating issues. It doesn't get any better than being able to step through the exact same scenario in a debugger, with all the variable values on display, without having to worry about the interactions of multiple threads. With this level of support, you can reduce logging to a minimum, which improves performance
  • an excellent boundary for testing, which reduces the risk of issues in the first place (discussed more below)

When running with a single cluster member, the cluster doesn't follow any different code paths, other than in the Election. The Election goes straight to the state a leader enters when it wins an Election. The cluster member still has a list of 'all' cluster members, but it only contains itself, so when it tries to achieve consensus, it only considers its own Append position. Once it has written an entry to its Log Recording, it can be processed.

Testing

A Clustered Service typically represents a bounded context. Aeron Cluster isolates it from the outside world, providing it with a single sequence of inputs that are processed on a single thread, with a well-defined set of outputs via client Egress connections. This provides a perfect interface for writing tests against.

You would typically create a test framework that lets you create an instance of the Clustered Service, but instead of managing it with Aeron Cluster, you'd wrap it with some kind of 'driver' that the test can use to interact with it (so no Aeron code is running). The driver would let the test send Log messages in, by directly calling into the Clustered Service. The driver would capture any Egress messages and let the test assert on them in some way. All of this runs on a single thread, so tests don't suffer from race conditions.

Each test would create a new instance of the Clustered Service for test isolation. This typically doesn't take long (millis) as the Clustered Service wouldn't have any state. The test would set up all the state it requires by sending in Log messages.
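
Everything in the sketch below is hypothetical - the driver and helpers are the kind of thing you would write yourself, not part of Aeron - but a test might read roughly like this:

    @Test
    void rejectsOrderForUnknownInstrument()
    {
        // Hypothetical harness: instantiates the Clustered Service directly (no Aeron code
        // running) and captures anything it tries to send on the Egress.
        ClusteredServiceDriver driver = new ClusteredServiceDriver(new MyClusteredService());

        driver.sendLogMessage(newOrderMessage("UNKNOWN", 100, 9.95));

        assertEquals(OrderStatus.REJECTED, driver.lastEgressMessage().status());
    }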

Tests at this bounded context level can make up the bulk of the tests in your test pyramid. These tests only interact with the Clustered Service via its interface, so they don't constrain the implementation. You can refactor the implementation without having to change any tests, which gives you confidence that you haven't broken anything. Testing at a lower level, like individual classes, can be like pouring concrete on your code. You often can't change the code without having to change the tests, which can make it hard to spot whether you've broken anything.

In my experience, the bounded context tests are typically easier to write as you end up developing a lot of reusable support code for sending in Log messages and asserting on Egress messages. This makes it easier to write tests, which in turn encourages a higher quantity and quality of tests, which reduces the risk of issues.