diff --git a/client/src/main/java/io/atomix/copycat/client/session/ClientSequencer.java b/client/src/main/java/io/atomix/copycat/client/session/ClientSequencer.java index 7e02245d..d5155e9d 100644 --- a/client/src/main/java/io/atomix/copycat/client/session/ClientSequencer.java +++ b/client/src/main/java/io/atomix/copycat/client/session/ClientSequencer.java @@ -58,9 +58,9 @@ final class ClientSequencer { private static final Logger LOGGER = LoggerFactory.getLogger(ClientSequencer.class); private final ClientSessionState state; - private long requestSequence; - private long responseSequence; - private long eventIndex; + long requestSequence; + long responseSequence; + long eventIndex; private final Queue eventCallbacks = new ArrayDeque<>(); private final Map responseCallbacks = new HashMap<>(); @@ -91,7 +91,7 @@ public long nextRequest() { */ public void sequenceEvent(PublishRequest request, Runnable callback) { if (requestSequence == responseSequence) { - LOGGER.debug("{} - Completing event {}", state.getSessionId(), request); + LOGGER.debug("{} - Completing {}", state.getSessionId(), request); callback.run(); eventIndex = request.eventIndex(); } else { @@ -150,7 +150,7 @@ private void completeResponses() { if (requestSequence == responseSequence) { EventCallback eventCallback = eventCallbacks.poll(); while (eventCallback != null) { - LOGGER.debug("{} - Completing event {}", state.getSessionId(), eventCallback.request); + LOGGER.debug("{} - Completing {}", state.getSessionId(), eventCallback.request); eventCallback.run(); eventIndex = eventCallback.request.eventIndex(); eventCallback = eventCallbacks.poll(); @@ -172,23 +172,39 @@ private boolean completeResponse(OperationResponse response, Runnable callback) // If the response's event index is greater than the current event index, that indicates that events that were // published prior to the response have not yet been completed. Attempt to complete pending events. - if (response.eventIndex() > eventIndex) { + long responseEventIndex = response.eventIndex(); + if (responseEventIndex > eventIndex) { // For each pending event with an eventIndex less than or equal to the response eventIndex, complete the event. // This is safe since we know that sequenced responses should see sequential order of events. EventCallback eventCallback = eventCallbacks.peek(); - while (eventCallback != null && eventCallback.request.eventIndex() <= response.eventIndex()) { + while (eventCallback != null && eventCallback.request.eventIndex() <= responseEventIndex) { eventCallbacks.remove(); - LOGGER.debug("{} - Completing event {}", state.getSessionId(), eventCallback.request); + LOGGER.debug("{} - Completing {}", state.getSessionId(), eventCallback.request); eventCallback.run(); eventIndex = eventCallback.request.eventIndex(); eventCallback = eventCallbacks.peek(); } + + // If the response event index is still greater than the last sequenced event index, check + // enqueued events to determine whether any events can be skipped. This is necessary to + // ensure that a response with a missing event can still trigger prior events. + if (responseEventIndex > eventIndex) { + for (EventCallback event : eventCallbacks) { + // If the event's previous index is consistent with the current event index and the event + // index is greater than the response event index, set the response event index to the + // event's previous index. + if (event.request.previousIndex() <= eventIndex && event.request.eventIndex() >= response.eventIndex()) { + responseEventIndex = event.request.previousIndex(); + break; + } + } + } } // If after completing pending events the eventIndex is greater than or equal to the response's eventIndex, complete the response. // Note that the event protocol initializes the eventIndex to the session ID. - if (response.eventIndex() <= eventIndex || (eventIndex == 0 && response.eventIndex() == state.getSessionId())) { - LOGGER.debug("{} - Completing response {}", state.getSessionId(), response); + if (responseEventIndex <= eventIndex || (eventIndex == 0 && responseEventIndex == state.getSessionId())) { + LOGGER.debug("{} - Completing {}", state.getSessionId(), response); callback.run(); return true; } else { diff --git a/client/src/test/java/io/atomix/copycat/client/session/ClientSequencerTest.java b/client/src/test/java/io/atomix/copycat/client/session/ClientSequencerTest.java index b5b8d4f4..71563de3 100644 --- a/client/src/test/java/io/atomix/copycat/client/session/ClientSequencerTest.java +++ b/client/src/test/java/io/atomix/copycat/client/session/ClientSequencerTest.java @@ -193,4 +193,86 @@ public void testSequenceResponses() throws Throwable { assertTrue(run.get()); } + /** + * Tests sequencing responses with a missing PublishRequest. + */ + public void testSequenceMissingEvent() throws Throwable { + ClientSessionState state = new ClientSessionState(UUID.randomUUID().toString()); + state.setSessionId(1) + .setCommandRequest(2) + .setResponseIndex(15) + .setEventIndex(5); + + AtomicInteger run = new AtomicInteger(); + + ClientSequencer sequencer = new ClientSequencer(state); + sequencer.requestSequence = 2; + sequencer.responseSequence = 1; + sequencer.eventIndex = 5; + + CommandResponse commandResponse = CommandResponse.builder() + .withStatus(Response.Status.OK) + .withIndex(20) + .withEventIndex(10) + .build(); + sequencer.sequenceResponse(2, commandResponse, () -> assertEquals(run.getAndIncrement(), 0)); + + PublishRequest publishRequest = PublishRequest.builder() + .withSession(1) + .withEventIndex(25) + .withPreviousIndex(5) + .build(); + sequencer.sequenceEvent(publishRequest, () -> assertEquals(run.getAndIncrement(), 1)); + + assertEquals(run.get(), 2); + } + + /** + * Tests sequencing multiple responses that indicate missing events. + */ + public void testSequenceMultipleMissingEvents() throws Throwable { + ClientSessionState state = new ClientSessionState(UUID.randomUUID().toString()); + state.setSessionId(1) + .setCommandRequest(2) + .setResponseIndex(15) + .setEventIndex(5); + + AtomicInteger run = new AtomicInteger(); + + ClientSequencer sequencer = new ClientSequencer(state); + sequencer.requestSequence = 3; + sequencer.responseSequence = 1; + sequencer.eventIndex = 5; + + CommandResponse commandResponse2 = CommandResponse.builder() + .withStatus(Response.Status.OK) + .withIndex(20) + .withEventIndex(10) + .build(); + sequencer.sequenceResponse(3, commandResponse2, () -> assertEquals(run.getAndIncrement(), 1)); + + CommandResponse commandResponse1 = CommandResponse.builder() + .withStatus(Response.Status.OK) + .withIndex(18) + .withEventIndex(8) + .build(); + sequencer.sequenceResponse(2, commandResponse1, () -> assertEquals(run.getAndIncrement(), 0)); + + PublishRequest publishRequest1 = PublishRequest.builder() + .withSession(1) + .withEventIndex(25) + .withPreviousIndex(5) + .build(); + sequencer.sequenceEvent(publishRequest1, () -> assertEquals(run.getAndIncrement(), 2)); + + PublishRequest publishRequest2 = PublishRequest.builder() + .withSession(1) + .withEventIndex(28) + .withPreviousIndex(8) + .build(); + sequencer.sequenceEvent(publishRequest2, () -> assertEquals(run.getAndIncrement(), 3)); + + assertEquals(run.get(), 4); + } + }