Starting Replay
Starting the Replay of a Recording follows a very similar workflow to starting a Recording. We'll go through the diagram quickly, then delve into the details below it. Here, the application requesting replay is the Trading Strategy from our example application.
The Trading Strategy calls one of the replay() methods on the Archive Client, passing in parameters including the recordingId, replayPosition, replayChannel and replayStreamId. These are sent to Aeron Archive in a ReplayRequest message, published on the Control Request Channel (along with the controlSessionId).
The ControlSessionDemuxer reads the message, looks up the ControlSession and calls ControlSession.onStartReplay(), which calls ArchiveConductor.startReplay(). The ArchiveConductor validates the parameters against the RecordingDescriptor, then starts the async creation of an Exclusive Publication back to the replayChannel and replayStreamId. It then adds a CreateReplayPublicationSession to its list of Sessions, which checks for the Publication to be created.
Once the Publication is created, the CreateReplayPublicationSession asks the ArchiveConductor to create a new ReplaySession. The ArchiveConductor does so and adds it to the Replayer. The CreateReplayPublicationSession is then 'done' and is removed from the ArchiveConductor.
The ReplaySession starts in an INIT state. It opens the Segment file within the Recording that contains the position the replay will start from (replayPosition). It asks the ArchiveConductor to send a ControlResponse with an OK response code to the Client. The message contains a replaySessionId that can be used to interact with the ReplaySession, e.g. to stop it. Part of the replaySessionId is the sessionId of the Replay Publication, which the Client will use when creating a Subscription to it.
The Client receives the ControlResponse message and creates a Subscription to the replayChannel and replayStreamId. When the Subscription connects to the Publication, it will put the ReplaySession into the REPLAY state. It is now ready to replay data, starting from replayPosition, which is in the Segment file that it has open.
Step 1) Client Requests Start Replay
Replaying a recording is started using one of the various AeronArchive.replay() or startReplay() methods. These send a ReplayRequest message to Aeron Archive to start the replay. The replay() methods also create a Subscription object that a User Application can use to receive the replayed data. The startReplay() methods require the User Application to create the Subscription itself.
Replay requires the following parameters:
* recordingId, which can be looked up from the Catalog
* replayPosition - the position to replay from, or NULL_POSITION to replay from the start
* length - how much of the recording to replay, or Long.MAX_VALUE to tail the recording once at the end
* replayChannel and replayStreamId - the channel and streamId that Aeron Archive will publish the recorded data to, which the User Application needs to subscribe to
There is also a startBoundedReplay() method that behaves like startReplay(), but as well as passing in length for how much to replay, the User Application also passes in a limitCounterId. This is the id of a Counter (in cnc.dat), which the replay queries to determine how far it can read. The counter can advance while the replay is in progress.
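As a rough illustration of how a bounded replay might cap its reads, the sketch below takes the smaller of the requested end position and the current counter value. The class and method names are hypothetical, and an AtomicLong stands in for the Counter identified by limitCounterId; this is not the Archive's actual code.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch: AtomicLong stands in for the Counter identified by limitCounterId.
final class BoundedReplayLimit
{
    private final long replayEndPosition;
    private final AtomicLong limitCounter;

    BoundedReplayLimit(final long replayPosition, final long length, final AtomicLong limitCounter)
    {
        // Avoid overflow when length is Long.MAX_VALUE (assumes replayPosition >= 0).
        this.replayEndPosition =
            (Long.MAX_VALUE - replayPosition < length) ? Long.MAX_VALUE : replayPosition + length;
        this.limitCounter = limitCounter;
    }

    // The counter is re-read on every call, so the limit can advance while the replay runs.
    long currentLimit()
    {
        return Math.min(replayEndPosition, limitCounter.get());
    }
}
```

With length set to Long.MAX_VALUE, the counter alone decides how far the replay may read.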
Step 2) Aeron Archive creates a Publication to the ReplayChannel
Whichever method is called in the Client, Aeron Archive ends up in ArchiveConductor.startReplay(). This checks that the Catalog contains the recordingId, then loads a RecordingSummary from it. The RecordingSummary contains details about the recording, including startPosition and, if it is no longer active, stopPosition.
The replayPosition is checked to ensure it is aligned on a frame boundary (a multiple of 32, which each frame is guaranteed to start on) and is between the recording startPosition and stopPosition (if set).
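The checks above could be sketched roughly as follows. The class and method names are hypothetical. Using -1 as the "not set" sentinel and treating stopPosition as an exclusive upper bound are both assumptions made for this illustration.

```java
// Hypothetical helper illustrating the replayPosition checks described above.
final class ReplayPositionCheck
{
    static final int FRAME_ALIGNMENT = 32;
    static final long NULL_POSITION = -1L; // assumed sentinel for "not set"

    // An unset replayPosition falls back to the start of the recording.
    static long resolve(final long replayPosition, final long startPosition)
    {
        return replayPosition == NULL_POSITION ? startPosition : replayPosition;
    }

    static boolean isValid(final long replayPosition, final long startPosition, final long stopPosition)
    {
        if ((replayPosition & (FRAME_ALIGNMENT - 1)) != 0)
        {
            return false; // not on a 32-byte frame boundary
        }

        if (replayPosition < startPosition)
        {
            return false; // before the first recorded byte
        }

        // Assumption: nothing remains to replay at or beyond stopPosition.
        return stopPosition == NULL_POSITION || replayPosition < stopPosition;
    }
}
```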
The ArchiveConductor starts the async creation of an ExclusivePublication back to the replayChannel and replayStreamId. The Publication's current termId and termOffset are set based on the replayPosition (or, if it's not set, the RecordingSummary startPosition). The initialTermId and termLength are also set from the RecordingSummary. The ArchiveConductor adds a CreateReplayPublicationSession to its list of sessions. Each time the CreateReplayPublicationSession is serviced, it checks if the publication has been created.
Step 3) Aeron Archive creates a ReplaySession
When the Replay Publication has been created, the CreateReplayPublicationSession calls ArchiveConductor.newReplaySession(), which adds a new ReplaySession to the Replayer. The CreateReplayPublicationSession marks itself as 'done', which causes it to be removed from the list of sessions and discarded.
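The poll-until-ready pattern used by the CreateReplayPublicationSession can be sketched in a few lines. All of the types below (PolledSession, CreatePublicationStep, SessionList) are hypothetical stand-ins written for this illustration and are much simpler than the Archive's real classes.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.BooleanSupplier;

// Hypothetical minimal session contract: do a slice of work, report when finished.
interface PolledSession
{
    void doWork();

    boolean isDone();
}

// Polls until the publication exists, then hands over to a callback and marks itself done.
final class CreatePublicationStep implements PolledSession
{
    private final BooleanSupplier publicationReady;
    private final Runnable onReady;
    private boolean done;

    CreatePublicationStep(final BooleanSupplier publicationReady, final Runnable onReady)
    {
        this.publicationReady = publicationReady;
        this.onReady = onReady;
    }

    public void doWork()
    {
        if (!done && publicationReady.getAsBoolean())
        {
            onReady.run(); // e.g. create the replay session
            done = true;
        }
    }

    public boolean isDone()
    {
        return done;
    }
}

// Services every session once per pass and discards those that are done.
final class SessionList
{
    private final List<PolledSession> sessions = new ArrayList<>();

    void add(final PolledSession session)
    {
        sessions.add(session);
    }

    int size()
    {
        return sessions.size();
    }

    void serviceAll()
    {
        for (final Iterator<PolledSession> it = sessions.iterator(); it.hasNext();)
        {
            final PolledSession session = it.next();
            session.doWork();
            if (session.isDone())
            {
                it.remove();
            }
        }
    }
}
```

Because nothing blocks while waiting for the publication, the conductor thread stays free to service its other sessions.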
Step 4) ReplaySession (INIT state)
By default, the Replayer runs in its own thread. It calls doWork() on each of its ReplaySessions. The ReplaySession starts off in an INIT state, where it calculates which Segment file contains the replayPosition and opens the file.
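One way to locate the Segment file is to compute the base position of the segment that contains replayPosition. The sketch below rests on assumptions the text above does not state: segments have a fixed power-of-two length (segmentLength, a name introduced here), and the first segment begins at the start of the term that contains startPosition. The class and method names are hypothetical.

```java
// Hypothetical sketch of locating the segment that holds a given position.
final class SegmentLookup
{
    static long segmentBasePosition(
        final long startPosition, final long position, final int termLength, final int segmentLength)
    {
        // Round the recording start down to the beginning of its term.
        final long startTermBase = startPosition - (startPosition & (termLength - 1));
        final long lengthFromBase = position - startTermBase;
        // Drop the offset within the segment, leaving only whole segments.
        final long wholeSegments = lengthFromBase - (lengthFromBase & (segmentLength - 1));

        return startTermBase + wholeSegments;
    }
}
```

For example, with a 64 KiB term, a 128 KiB segment and a recording starting at position 0, position 200,000 falls in the segment whose base position is 131,072.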
If replayPosition is not the recording start position, it is validated to ensure it is at the start of a Fragment. Rather than walk the recording from the start of a Term to the replayPosition, the ReplaySession checks to see if there is a frame header at that position that contains the correct streamId, termId and termOffset. termOffset is calculated from the replayPosition by taking the remainder of dividing it by the termLength in the RecordingSummary. termId is also calculated from the replayPosition, using the initialTermId and termLength. streamId is taken straight from the RecordingSummary.
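The arithmetic can be sketched as below, assuming termLength is a power of two and positions are counted from the start of the term identified by initialTermId. The class and method names are hypothetical. The mask and shift are equivalent to the remainder and division described above when termLength is a power of two.

```java
// Hypothetical sketch of deriving the expected frame header values from a position.
final class ReplayTermMath
{
    // Remainder of position / termLength, using a mask because termLength is a power of two.
    static int termOffset(final long position, final int termLength)
    {
        return (int)(position & (termLength - 1));
    }

    // Number of whole terms before the position, offset by the initial term id.
    static int termId(final long position, final int termLength, final int initialTermId)
    {
        final int positionBitsToShift = Long.numberOfTrailingZeros(termLength);
        return (int)(position >>> positionBitsToShift) + initialTermId;
    }

    // True when a frame header read at the position carries the expected values.
    static boolean headerMatches(
        final int headerStreamId, final int headerTermId, final int headerTermOffset,
        final int streamId, final long position, final int termLength, final int initialTermId)
    {
        return headerStreamId == streamId &&
            headerTermId == termId(position, termLength, initialTermId) &&
            headerTermOffset == termOffset(position, termLength);
    }
}
```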
Once replayPosition has been validated, the ReplaySession asks the ControlSession (which is on the ArchiveConductor thread) to send a ControlResponse message with an OK response code back to the Client. The message includes the correlationId from the ReplayRequest and a replaySessionId, which comes from the ArchiveConductor:
final long replaySessionId = ((long)(replayId++) << 32) | (replayPublication.sessionId() & 0xFFFF_FFFFL);
The lower 32 bits are the sessionId of the replay Publication (which the client needs to subscribe to). The top 32 bits are a unique id within the ArchiveConductor (replayId starts at 1 and increments as above). The client needs the lower 32 bits when it subscribes to the replay Publication, so it only receives messages from this sessionId. The whole replaySessionId is required when performing other operations on the replay, such as stopping it.
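To make the bit layout concrete, here is a small sketch that packs a replaySessionId in the same way and then splits it back into its two halves. The class and method names are hypothetical; only the packing expression follows the line quoted above.

```java
// Hypothetical helper showing how the two 32-bit halves of a replaySessionId relate.
final class ReplaySessionIds
{
    static long pack(final int replayId, final int publicationSessionId)
    {
        // The mask stops a negative sessionId from sign-extending into the top 32 bits.
        return ((long)replayId << 32) | (publicationSessionId & 0xFFFF_FFFFL);
    }

    // Lower 32 bits: the sessionId of the replay Publication.
    static int publicationSessionId(final long replaySessionId)
    {
        return (int)replaySessionId;
    }

    // Upper 32 bits: the id that is unique within the ArchiveConductor.
    static int replayId(final long replaySessionId)
    {
        return (int)(replaySessionId >>> 32);
    }
}
```

Casting the long to an int keeps only the lower 32 bits, so a negative sessionId survives the round trip unchanged.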
The ReplaySession stays in the INIT state until the Publication back to the replayChannel has been created, and the client has connected to it (created a Subscription). Once connected, it moves to the REPLAY state. If it doesn't connect within 5 seconds, it times out and errors, moving to a terminal INACTIVE state.
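The state transitions just described can be sketched as a small state machine. This is a simplified illustration with hypothetical names. It models only the three states mentioned here, and takes the connection status and current time as parameters rather than querying a real Publication or clock.

```java
// Hypothetical sketch of the INIT -> REPLAY / INACTIVE transitions.
final class ReplayStateSketch
{
    enum State
    {
        INIT, REPLAY, INACTIVE
    }

    static final long CONNECT_TIMEOUT_MS = 5_000;

    private final long connectDeadlineMs;
    private State state = State.INIT;

    ReplayStateSketch(final long nowMs)
    {
        this.connectDeadlineMs = nowMs + CONNECT_TIMEOUT_MS;
    }

    // Called on each duty cycle; only the INIT state has transitions in this sketch.
    State onTick(final boolean publicationConnected, final long nowMs)
    {
        if (state == State.INIT)
        {
            if (publicationConnected)
            {
                state = State.REPLAY;
            }
            else if (nowMs > connectDeadlineMs)
            {
                state = State.INACTIVE; // timed out waiting for the subscriber
            }
        }

        return state;
    }
}
```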
Step 5) The Client Subscribes
When the Client receives the ControlResponse message, it takes the replaySessionId from it.
If the User Application called one of the AeronArchive.replay() methods, the Archive Client creates a Subscription to the replayChannel and replayStreamId, and includes the sessionId from the lower 32 bits of the replaySessionId. The Subscription will connect to the Publication, putting the ReplaySession in the REPLAY state. The Subscription is then returned to the User Application.
If the User Application called one of the AeronArchive.startReplay() methods, the Client returns the replaySessionId to it.
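An application that called startReplay() has to build the session-specific channel itself. A minimal sketch follows, assuming the channel is an Aeron URI string where parameters follow a '?' and are separated by '|', with the session given by the session-id parameter. The helper class is hypothetical and does the string handling by hand; real code would more likely rely on the client library's own channel URI utilities.

```java
// Hypothetical helper that appends the replay Publication's sessionId to a channel URI.
final class ReplayChannelSketch
{
    static String withSessionId(final String replayChannel, final long replaySessionId)
    {
        // Lower 32 bits of the replaySessionId are the Publication's sessionId.
        final int sessionId = (int)replaySessionId;
        // Start the parameter list if the channel has none, otherwise extend it.
        final String separator = replayChannel.contains("?") ? "|" : "?";

        return replayChannel + separator + "session-id=" + sessionId;
    }
}
```

The resulting channel string, together with replayStreamId, is what the application would pass when adding its Subscription, so that it only receives messages from this replay.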