Skip to content

Commit

Permalink
fix(engine): allow passing receiver in general
Browse files Browse the repository at this point in the history
  • Loading branch information
npepinpe committed Sep 24, 2022
1 parent 572438d commit 350887d
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import io.zeebe.containers.ZeebeBrokerNode;
import io.zeebe.containers.ZeebeGatewayNode;
import io.zeebe.containers.cluster.ZeebeCluster;
import io.zeebe.containers.exporter.DebugReceiver;
import java.time.Duration;
import org.apiguardian.api.API;
import org.apiguardian.api.API.Status;
Expand All @@ -36,19 +37,6 @@
@API(status = Status.EXPERIMENTAL)
public interface ContainerEngine extends Startable, ZeebeTestEngine {

/**
* Marks all records with a position less than {@code position} on partition with ID {@code
* partitionId} as acknowledged, meaning they can now be deleted from Zeebe.
*
* <p>Note that this is not a synchronous operation, but instead will take effect when the next
* record is exported. See {@link io.zeebe.containers.exporter.DebugReceiver#acknowledge(int,
* long)} for more.
*
* @param partitionId the ID of the partition on which to acknowledge
* @param position the position up to which they should be acknowledged
*/
void acknowledge(final int partitionId, final long position);

/**
* Returns a default builder. Calling {@link Builder#build()} on a fresh builder will return a
* builder wrapping a default {@link io.zeebe.containers.ZeebeContainer}, with an idle period of 1
Expand All @@ -70,6 +58,19 @@ static ContainerEngine createDefault() {
return builder().build();
}

/**
* Marks all records with a position less than {@code position} on partition with ID {@code
* partitionId} as acknowledged, meaning they can now be deleted from Zeebe.
*
* <p>Note that this is not a synchronous operation, but instead will take effect when the next
* record is exported. See {@link io.zeebe.containers.exporter.DebugReceiver#acknowledge(int,
* long)} for more.
*
* @param partitionId the ID of the partition on which to acknowledge
* @param position the position up to which they should be acknowledged
*/
void acknowledge(final int partitionId, final long position);

/**
* A helper class to build {@link ContainerEngine} instances. A fresh, non-configured builder will
* always return one which has an idle period of 1 second, and uses a default {@link
Expand Down Expand Up @@ -170,9 +171,21 @@ interface Builder {
*
* @param port the port to assign to the receiver
* @return itself for chaining
* @deprecated since 3.5.2, will be removed in 3.7.0; use {@link
* #withDebugReceiver(DebugReceiver)} instead
*/
@Deprecated
Builder withDebugReceiverPort(final int port);

/**
* The pre-configured {@link DebugReceiver} instance to use. Useful if you want to pre-assign
* ports or have fine-grained control over the acknowledgment process.
*
* @param receiver the debug receiver to use
* @return itself for chaining
*/
Builder withDebugReceiver(final DebugReceiver receiver);

/**
* Builds a {@link ContainerEngine} based on the configuration. If nothing else was called, will
* build an engine using a default {@link io.zeebe.containers.ZeebeContainer}, an idle period of
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ final class ContainerEngineBuilder implements Builder {
private Duration gracePeriod;
private boolean autoAcknowledge;
private int debugReceiverPort;
private DebugReceiver debugReceiver;

@Override
public <T extends GenericContainer<T> & ZeebeGatewayNode<T> & ZeebeBrokerNode<T>>
Expand Down Expand Up @@ -102,21 +103,34 @@ public Builder withAutoAcknowledge(final boolean autoAcknowledge) {

@Override
public Builder withDebugReceiverPort(final int debugReceiverPort) {
if (debugReceiverPort < 0) {
throw new IllegalArgumentException(
String.format(
"Debug receiver port must be greater than or equal to 0, but %d was given",
debugReceiverPort));
}

this.debugReceiverPort = debugReceiverPort;
return this;
}

@Override
public Builder withDebugReceiver(final DebugReceiver debugReceiver) {
this.debugReceiver = Objects.requireNonNull(debugReceiver, "must specify a debug receiver");
return this;
}

@SuppressWarnings({"unchecked", "rawtypes"})
@Override
public ContainerEngine build() {
final Duration listGracePeriod = Optional.ofNullable(gracePeriod).orElse(DEFAULT_GRACE_PERIOD);
final Duration receiveIdlePeriod = Optional.ofNullable(idlePeriod).orElse(DEFAULT_IDLE_PERIOD);
final InfiniteList<Record<?>> records = new InfiniteList<>(listGracePeriod);
final DebugReceiver receiver =
Optional.ofNullable(debugReceiver)
.orElse(new DebugReceiver(records::add, debugReceiverPort, autoAcknowledge));
final DebugReceiverStream recordStream =
new DebugReceiverStream(
records,
new DebugReceiver(records::add, debugReceiverPort, autoAcknowledge),
receiveIdlePeriod);
new DebugReceiverStream(records, receiver, receiveIdlePeriod);

try {
if (container != null) {
Expand Down

0 comments on commit 350887d

Please sign in to comment.