Skip to content

Commit

Permalink
LibraryTimeout publish sessions (#519)
Browse files Browse the repository at this point in the history
  • Loading branch information
devesh-b1 authored Sep 3, 2024
1 parent d4c1888 commit 3171046
Show file tree
Hide file tree
Showing 4 changed files with 38 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
<sbe:messageSchema xmlns:sbe="http://fixprotocol.io/2016/sbe"
package="uk.co.real_logic.artio.messages"
id="666"
version="25"
version="26"
semanticVersion="0.2"
description="Internal messaging format used by the FIX Gateway"
byteOrder="littleEndian">
Expand Down Expand Up @@ -499,6 +499,9 @@
description="notifies library instances that they have been timed out, added for monitoring purposes">
<field name="libraryId" id="1" type="LibraryId"/>
<field name="connectCorrelationId" id="2" type="CorrelationId"/>
<group name="sessions" id="3" dimensionType="groupSizeEncoding" sinceVersion="26">
<field name="sessionId" id="4" type="FixSessionId"/>
</group>
</sbe:message>

<sbe:message name="FollowerSessionReply" id="48"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -559,9 +559,8 @@ private boolean receivedIndexedPosition(final int aeronSessionId, final long pos

private void saveLibraryTimeout(final LibraryInfo library)
{
final int libraryId = library.libraryId();
schedule(() -> inboundPublication.saveLibraryTimeout(libraryId, 0));
schedule(() -> outboundPublication.saveLibraryTimeout(libraryId, 0));
schedule(() -> inboundPublication.saveLibraryTimeout(library, 0));
schedule(() -> outboundPublication.saveLibraryTimeout(library, 0));
}

private void acquireLibrarySessions(final LiveLibraryInfo library)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,10 @@
import org.agrona.concurrent.status.AtomicCounter;
import uk.co.real_logic.artio.DebugLogger;
import uk.co.real_logic.artio.dictionary.FixDictionary;
import uk.co.real_logic.artio.engine.ConnectedSessionInfo;
import uk.co.real_logic.artio.engine.RecordingCoordinator;
import uk.co.real_logic.artio.engine.SessionInfo;
import uk.co.real_logic.artio.engine.framer.LibraryInfo;
import uk.co.real_logic.artio.messages.*;
import uk.co.real_logic.artio.messages.ControlNotificationEncoder.DisconnectedSessionsEncoder;
import uk.co.real_logic.artio.messages.ControlNotificationEncoder.SessionsEncoder;
Expand Down Expand Up @@ -125,6 +127,8 @@ public class GatewayPublication extends ClaimablePublication
private static final int THROTTLE_CONFIGURATION_REPLY_LENGTH = HEADER_LENGTH +
ThrottleConfigurationReplyEncoder.BLOCK_LENGTH;
private static final int SEQ_INDEX_SYNC_LENGTH = HEADER_LENGTH + SeqIndexSyncEncoder.BLOCK_LENGTH;
private static final int LIBRARY_TIMEOUT_LENGTH = HEADER_LENGTH + LibraryTimeoutEncoder.BLOCK_LENGTH +
GroupSizeEncodingEncoder.ENCODED_LENGTH;

private static final boolean APPLICATION_HEARTBEAT_ATTEMPT_ENABLED = isEnabled(APPLICATION_HEARTBEAT_ATTEMPT);
private static final boolean APPLICATION_HEARTBEAT_ENABLED = isEnabled(APPLICATION_HEARTBEAT);
Expand Down Expand Up @@ -966,25 +970,33 @@ public long saveRequestSessionReply(final int libraryId, final SessionReplyStatu
return position;
}

public long saveLibraryTimeout(final int libraryId, final long connectCorrelationId)
public long saveLibraryTimeout(final LibraryInfo libraryInfo, final long connectCorrelationId)
{
final long position = claim(LibraryTimeoutEncoder.BLOCK_LENGTH + HEADER_LENGTH);
if (position < 0)
{
return position;
}
final List<ConnectedSessionInfo> connectedSessionInfos = libraryInfo.sessions();
final int sessionsCount = connectedSessionInfos.size();

final MutableDirectBuffer buffer = bufferClaim.buffer();
final int offset = bufferClaim.offset();
final int framedLength = LIBRARY_TIMEOUT_LENGTH + sessionsCount *
LibraryTimeoutEncoder.SessionsEncoder.sbeBlockLength();
final ExpandableArrayBuffer buffer = buffer(framedLength);

libraryTimeout
.wrapAndApplyHeader(buffer, offset, header)
.libraryId(libraryId)
.wrapAndApplyHeader(buffer, 0, header)
.libraryId(libraryInfo.libraryId())
.connectCorrelationId(connectCorrelationId);

bufferClaim.commit();
final LibraryTimeoutEncoder.SessionsEncoder sessionsEncoder = libraryTimeout.sessionsCount(sessionsCount);
for (int i = 0; i < sessionsCount; i++)
{
final SessionInfo session = connectedSessionInfos.get(i);
sessionsEncoder.next().sessionId(session.sessionId());
}

logSbeMessage(GATEWAY_MESSAGE, libraryTimeout);
final long position = dataPublication.offer(buffer, 0, framedLength);

if (position > 0)
{
logSbeMessage(GATEWAY_MESSAGE, libraryTimeout);
}

return position;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.mockito.verification.VerificationMode;
import uk.co.real_logic.artio.CloseChecker;
import uk.co.real_logic.artio.FixCounters;
import uk.co.real_logic.artio.LivenessDetector;
import uk.co.real_logic.artio.Timing;
import uk.co.real_logic.artio.dictionary.FixDictionary;
import uk.co.real_logic.artio.engine.*;
Expand Down Expand Up @@ -139,6 +140,12 @@ public class FramerTest

private final MutableLong connectionId = new MutableLong(NO_CONNECTION_ID);
private final ErrorHandler errorHandler = mock(ErrorHandler.class);
private final LivenessDetector livenessDetector = mock(LivenessDetector.class);

private final LiveLibraryInfo libraryInfo = new LiveLibraryInfo(
errorHandler,
LIBRARY_ID, LIBRARY_NAME, livenessDetector, 1,
false);

@Before
@SuppressWarnings("unchecked")
Expand Down Expand Up @@ -1003,7 +1010,7 @@ private void verifyEndpointsCreated()

private void verifyLibraryTimeout()
{
verify(inboundPublication).saveLibraryTimeout(LIBRARY_ID, 0);
verify(inboundPublication).saveLibraryTimeout(libraryInfo, 0);
}

private void libraryHasAcceptedClient() throws IOException
Expand Down

0 comments on commit 3171046

Please sign in to comment.