Skip to content

Commit

Permalink
test: use one Executor for exporting records
Browse files Browse the repository at this point in the history
Using an own executor avoids using the ProcessingScheduleService, which
shouldn't be accessed from outside the Processing Actor.

Furthermore, we reduce the access to the LogStream object from the
Context.
  • Loading branch information
ChrisKujawa committed Sep 30, 2022
1 parent c7ff8cd commit 7017918
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 18 deletions.
55 changes: 37 additions & 18 deletions engine/src/test/java/io/camunda/zeebe/engine/util/EngineRule.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,11 @@
import io.camunda.zeebe.engine.util.client.ProcessInstanceClient;
import io.camunda.zeebe.engine.util.client.PublishMessageClient;
import io.camunda.zeebe.engine.util.client.VariableClient;
import io.camunda.zeebe.logstreams.log.LogStream;
import io.camunda.zeebe.logstreams.log.LogStreamReader;
import io.camunda.zeebe.logstreams.log.LogStreamRecordWriter;
import io.camunda.zeebe.logstreams.log.LoggedEvent;
import io.camunda.zeebe.logstreams.util.ListLogStorage;
import io.camunda.zeebe.logstreams.util.SynchronousLogStream;
import io.camunda.zeebe.model.bpmn.Bpmn;
import io.camunda.zeebe.protocol.Protocol;
import io.camunda.zeebe.protocol.impl.encoding.MsgPackConverter;
Expand Down Expand Up @@ -69,6 +69,8 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Function;
Expand Down Expand Up @@ -195,7 +197,9 @@ private void startProcessors() {
partitionId, interPartitionCommandSender),
jobsAvailableCallback,
featureFlags)
.withListener(new ProcessingExporterTransistor())
.withListener(
new ProcessingExporterTransistor(
environmentRule.getLogStream(partitionId)))
.withListener(reprocessingCompletedListener),
Optional.of(
new StreamProcessorListener() {
Expand Down Expand Up @@ -423,28 +427,43 @@ private static class ProcessingExporterTransistor implements StreamProcessorLife

private LogStreamReader logStreamReader;
private TypedRecordImpl typedEvent;
private final SynchronousLogStream synchronousLogStream;
private final ExecutorService executorService;

public ProcessingExporterTransistor(final SynchronousLogStream synchronousLogStream) {
this.synchronousLogStream = synchronousLogStream;
executorService = Executors.newSingleThreadExecutor();
}

@Override
public void onRecovered(final ReadonlyStreamProcessorContext context) {
final int partitionId = context.getPartitionId();
typedEvent = new TypedRecordImpl(partitionId);
final var scheduleService = context.getScheduleService();

final LogStream logStream = context.getLogStream();
logStream.registerRecordAvailableListener(
() -> scheduleService.runDelayed(Duration.ZERO, this::onNewEventCommitted));
logStream
.newLogStreamReader()
.onComplete(
((reader, throwable) -> {
if (throwable == null) {
logStreamReader = reader;
onNewEventCommitted();
}
}));
executorService.submit(
() -> {
final int partitionId = context.getPartitionId();
typedEvent = new TypedRecordImpl(partitionId);
final var asyncLogStream = synchronousLogStream.getAsyncLogStream();
asyncLogStream.registerRecordAvailableListener(this::onNewEventCommitted);
logStreamReader = asyncLogStream.newLogStreamReader().join();
exportEvents();
});
}

@Override
public void onClose() {
executorService.shutdownNow();
}

private void onNewEventCommitted() {
// this is called from outside (LogStream), so we need to enqueue the task
if (executorService.isShutdown()) {
return;
}

executorService.submit(this::exportEvents);
}

private void exportEvents() {
// to always run in the same thread
if (logStreamReader == null) {
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import io.camunda.zeebe.engine.processing.streamprocessor.TypedRecordProcessors;
import io.camunda.zeebe.engine.state.mutable.MutableZeebeState;
import io.camunda.zeebe.logstreams.log.LogStreamRecordWriter;
import io.camunda.zeebe.logstreams.util.SynchronousLogStream;
import io.camunda.zeebe.msgpack.UnpackedObject;
import io.camunda.zeebe.protocol.record.RecordType;
import io.camunda.zeebe.protocol.record.intent.Intent;
Expand Down Expand Up @@ -56,6 +57,10 @@ public LogStreamRecordWriter getLogStreamRecordWriter(final int partitionId) {
return streams.getLogStreamRecordWriter(logName);
}

public SynchronousLogStream getLogStream(final int partitionId) {
return streams.getLogStream(getLogName(partitionId));
}

public LogStreamRecordWriter newLogStreamRecordWriter(final int partitionId) {
final String logName = getLogName(partitionId);
return streams.newLogStreamRecordWriter(logName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,10 @@ public StreamProcessor getStreamProcessor(final int partitionId) {
return streamProcessingComposite.getStreamProcessor(partitionId);
}

public SynchronousLogStream getLogStream(final int partitionId) {
return streamProcessingComposite.getLogStream(partitionId);
}

public CommandResponseWriter getCommandResponseWriter() {
return streams.getMockedResponseWriter();
}
Expand Down

0 comments on commit 7017918

Please sign in to comment.