Skip to content

Commit

Permalink
merge: #15052
Browse files Browse the repository at this point in the history
15052: [Backport stable/8.3] Improve resilience of remote stream restart r=npepinpe a=backport-action

# Description
Backport of #15025 to `stable/8.3`.

relates to #14884
original author: `@npepinpe`

Co-authored-by: Nicolas Pepin-Perreault <[email protected]>
Co-authored-by: Nicolas Pepin-Perreault <[email protected]>
  • Loading branch information
3 people authored Nov 7, 2023
2 parents 3ea6664 + bb34091 commit d77867a
Show file tree
Hide file tree
Showing 8 changed files with 409 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public MessagingException(final String message, final Throwable cause) {

/** Exception indicating that no remote handler is registered for the message. */
public static class NoRemoteHandler extends MessagingException {
public NoRemoteHandler(String subject) {
public NoRemoteHandler(final String subject) {
super(
String.format(
"No remote message handler registered for this message, subject %s", subject));
Expand All @@ -41,7 +41,7 @@ public NoRemoteHandler(String subject) {

/** Exception indicating handler failure. */
public static class RemoteHandlerFailure extends MessagingException {
public RemoteHandlerFailure(String message) {
public RemoteHandlerFailure(final String message) {
super(String.format("Remote handler failed to handle message, cause: %s", message));
}
}
Expand All @@ -56,10 +56,12 @@ public ProtocolException() {
}

public static class NoSuchMemberException extends MessagingException {
public NoSuchMemberException(final Address sender) {
super(
"Failed to handle incoming message, sender %s is not a known cluster member"
.formatted(sender));
public NoSuchMemberException(final Address address) {
super("Failed to handle message, host %s is not a known cluster member".formatted(address));
}

/**
 * Creates the exception with a caller-supplied message, which is passed unchanged to the
 * superclass constructor.
 *
 * @param message the complete error message to report
 */
public NoSuchMemberException(final String message) {
super(message);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,10 @@
import io.atomix.cluster.messaging.ClusterCommunicationService;
import io.atomix.cluster.messaging.ManagedClusterCommunicationService;
import io.atomix.cluster.messaging.MessagingException;
import io.atomix.cluster.messaging.MessagingException.NoSuchMemberException;
import io.atomix.cluster.messaging.MessagingService;
import io.atomix.cluster.messaging.UnicastService;
import io.atomix.utils.net.Address;
import java.net.ConnectException;
import java.time.Duration;
import java.util.Map;
import java.util.Set;
Expand Down Expand Up @@ -212,7 +212,7 @@ private <T> CompletableFuture<T> failOnMemberNotKnown(
String.format(
"Expected to send a message with subject '%s' to member '%s', but member is not known. Known members are '%s'.",
subject, toMemberId, membershipService.getMembers());
return CompletableFuture.failedFuture(new ConnectException(errorMessage));
return CompletableFuture.failedFuture(new NoSuchMemberException(errorMessage));
}

@Override
Expand Down Expand Up @@ -302,7 +302,7 @@ private final class InternalMessageBiResponder<M, R>

InternalMessageBiResponder(
final Function<byte[], M> decoder,
Function<R, byte[]> encoder,
final Function<R, byte[]> encoder,
final BiFunction<MemberId, M, R> handler,
final Executor executor) {
this.decoder = decoder;
Expand Down
7 changes: 6 additions & 1 deletion atomix/utils/src/main/java/io/atomix/utils/Managed.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
*
* @param <T> managed type
*/
public interface Managed<T> {
public interface Managed<T> extends AutoCloseable {

/**
* Starts the managed object.
Expand All @@ -45,4 +45,9 @@ public interface Managed<T> {
* @return A completable future to be completed once the object has been stopped.
*/
CompletableFuture<Void> stop();

/**
 * Closes the managed object by calling {@link #stop()} and blocking the calling thread until the
 * returned future completes.
 *
 * <p>NOTE: {@link CompletableFuture#join()} reports a failed future by throwing an unchecked
 * {@link java.util.concurrent.CompletionException}, so a failure to stop surfaces in that form.
 *
 * @throws Exception declared to satisfy {@link AutoCloseable#close()}
 */
@Override
default void close() throws Exception {
stop().join();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,15 @@
import io.camunda.zeebe.scheduler.future.CompletableActorFuture;
import java.time.Duration;
import java.util.concurrent.Callable;
import java.util.concurrent.Executor;
import java.util.function.BiConsumer;
import org.jetbrains.annotations.NotNull;

/**
* Control interface to schedule tasks or follow-up tasks such that different tasks scheduled via
* the same {@code ConcurrencyControl} object are never executed concurrently
*/
public interface ConcurrencyControl {
public interface ConcurrencyControl extends Executor {

/**
* Schedules a callback to be invoked after the future has completed
Expand Down Expand Up @@ -66,4 +68,9 @@ default <V> ActorFuture<V> createFuture() {
default <V> ActorFuture<V> createCompletedFuture() {
return CompletableActorFuture.completed(null);
}

/**
 * Implements {@link Executor#execute(Runnable)} by handing the command over to {@code run}, so
 * that this control object can be passed wherever an {@link Executor} is expected.
 *
 * @param command the task to hand over
 */
@Override
default void execute(@NotNull final Runnable command) {
run(command);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ public void event(final ClusterMembershipEvent event) {
if (type == Type.MEMBER_REMOVED) {
apiServer.removeAll(event.subject().id());
} else if (type == Type.MEMBER_ADDED) {
apiServer.recreateStreams(event.subject().id());
apiServer.restartStreams(event.subject().id());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,28 +9,53 @@

import io.atomix.cluster.MemberId;
import io.atomix.cluster.messaging.ClusterCommunicationService;
import io.atomix.cluster.messaging.MessagingException.NoRemoteHandler;
import io.atomix.cluster.messaging.MessagingException.NoSuchMemberException;
import io.atomix.cluster.messaging.MessagingException.RemoteHandlerFailure;
import io.camunda.zeebe.scheduler.Actor;
import io.camunda.zeebe.transport.stream.impl.messages.AddStreamRequest;
import io.camunda.zeebe.transport.stream.impl.messages.MessageUtil;
import io.camunda.zeebe.transport.stream.impl.messages.RemoveStreamRequest;
import io.camunda.zeebe.transport.stream.impl.messages.StreamTopics;
import io.camunda.zeebe.util.ExponentialBackoff;
import io.camunda.zeebe.util.VisibleForTesting;
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.function.LongUnaryOperator;
import org.agrona.collections.ArrayUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Server-side actor which takes care of the network communication between the remote stream clients
* (e.g. gateways) and servers (e.g. brokers). Sets up handlers for shared topics to receive add,
* remove, and remove all requests, and manages sending restart requests to added clients.
*
* @param <M> type of the stream's metadata
*/
public final class RemoteStreamTransport<M> extends Actor {
private static final byte[] EMPTY_PAYLOAD = new byte[0];
private static final Duration REQUEST_TIMEOUT = Duration.ofSeconds(5);
private static final Logger LOG = LoggerFactory.getLogger(RemoteStreamTransport.class);
private static final int INITIAL_RETRY_DELAY_MS = 100;

private final ClusterCommunicationService transport;
private final RemoteStreamApiHandler<M> requestHandler;
private final LongUnaryOperator retryDelaySupplier;

/**
 * Creates a transport whose retry delays are computed by a default-constructed {@link
 * ExponentialBackoff}.
 *
 * @param transport the cluster communication service used to send and receive requests
 * @param requestHandler the handler to which incoming add/remove/remove-all requests are passed
 */
public RemoteStreamTransport(
final ClusterCommunicationService transport, final RemoteStreamApiHandler<M> requestHandler) {
this(transport, requestHandler, new ExponentialBackoff());
}

/**
 * Creates a transport with a custom retry delay supplier; package-private so that tests can
 * control the delay between restart attempts.
 *
 * @param transport the cluster communication service used to send and receive requests
 * @param requestHandler the handler to which incoming add/remove/remove-all requests are passed
 * @param retryDelaySupplier applied to the previous retry delay, in milliseconds, to obtain the
 *     delay before the next restart attempt
 */
@VisibleForTesting
RemoteStreamTransport(
final ClusterCommunicationService transport,
final RemoteStreamApiHandler<M> requestHandler,
final LongUnaryOperator retryDelaySupplier) {
this.transport = transport;
this.requestHandler = requestHandler;
this.retryDelaySupplier = retryDelaySupplier;
}

@Override
Expand Down Expand Up @@ -69,45 +94,108 @@ public void removeAll(final MemberId member) {

private byte[] onAdd(final MemberId sender, final AddStreamRequest request) {
requestHandler.add(sender, request);
return EMPTY_PAYLOAD;
return ArrayUtil.EMPTY_BYTE_ARRAY;
}

private byte[] onRemove(final MemberId sender, final RemoveStreamRequest request) {
requestHandler.remove(sender, request);
return EMPTY_PAYLOAD;
return ArrayUtil.EMPTY_BYTE_ARRAY;
}

private byte[] onRemoveAll(final MemberId sender, final byte[] ignored) {
requestHandler.removeAll(sender);
return EMPTY_PAYLOAD;
return ArrayUtil.EMPTY_BYTE_ARRAY;
}

public void recreateStreams(final MemberId receiver) {
public CompletableFuture<Void> restartStreams(final MemberId receiver) {
final var completed = new CompletableFuture<Void>();
sendRestartStreamsRequest(receiver, completed, INITIAL_RETRY_DELAY_MS);
return completed;
}

private void sendRestartStreamsRequest(
final MemberId receiver, final CompletableFuture<Void> completed, final long retryDelayMs) {
try {
LOG.debug("Restarting streams with for newly added member '{}'", receiver);
sendRestartStreamsCommand(receiver)
.whenComplete(
(ok, error) -> {
if (error != null) {
LOG.warn("Failed to restart streams for member '{}'", receiver, error);
} else {
LOG.debug("Restarted streams for member '{}'", receiver);
}
});
sendRestartStreamsRequest(receiver)
.whenCompleteAsync(
(ok, error) -> onRestartStreamsResponse(receiver, error, completed, retryDelayMs),
actor);
} catch (final Exception e) {
LOG.warn("Failed to restart streams for member '{}'", receiver, e);
}
}

private CompletableFuture<Void> sendRestartStreamsCommand(final MemberId receiver) {
private CompletableFuture<Void> sendRestartStreamsRequest(final MemberId receiver) {
return transport
.send(
StreamTopics.RESTART_STREAMS.topic(),
EMPTY_PAYLOAD,
ArrayUtil.EMPTY_BYTE_ARRAY,
Function.identity(),
Function.identity(),
receiver,
REQUEST_TIMEOUT)
.thenApply(ok -> null);
}

/**
 * Handles the outcome of a restart streams request sent to {@code receiver}.
 *
 * <p>On success, or on a failure which makes restarting pointless (the member is no longer known,
 * or it has no handler registered for the request), {@code completed} is completed normally. If
 * the remote handler itself failed, {@code completed} is completed exceptionally and no retry is
 * attempted. Any other failure schedules a new attempt after the delay computed by the retry
 * delay supplier; {@code completed} is left pending in that case.
 *
 * @param receiver the member to which the request was sent
 * @param error the failure reported for the request, or {@code null} if it succeeded
 * @param completed the future to complete once no further attempt will be made
 * @param retryDelayMs the previous retry delay in milliseconds, used as input to compute the
 *     delay before the next attempt
 */
private void onRestartStreamsResponse(
    final MemberId receiver,
    final Throwable error,
    final CompletableFuture<Void> completed,
    final long retryDelayMs) {
  if (error == null) {
    LOG.debug("Requested streams from client service member '{}'", receiver);
    completed.complete(null);
    return;
  }

  // prefer the wrapped cause when there is one, but fall back to the error itself so that a
  // failure reported without a wrapper is still classified below instead of being retried forever
  final var cause = error.getCause() != null ? error.getCause() : error;

  // it's possible that the member that was just added has since been removed in between
  // retries; if this is the case, it'll be re-added eventually
  if (cause instanceof final NoSuchMemberException e) {
    LOG.trace(
        """
        Failed to restart streams for member '{}', which has been removed from the
        membership protocol; can be safely ignored.""",
        receiver,
        e);
    completed.complete(null);
    return;
  }

  // this error means the remote member is not handling requests of this type; either it's not
  // a gateway, or it's still starting up or shutting down. in either case, restarting streams
  // makes no sense
  if (cause instanceof final NoRemoteHandler e) {
    LOG.trace(
        """
        Failed to restart streams for member '{}'; either it's not a client
        stream service, or it's still starting up. Can be safely ignored.""",
        receiver,
        e);
    completed.complete(null);
    return;
  }

  // no point retrying if the remote handler failed to handle our request
  if (cause instanceof final RemoteHandlerFailure e) {
    LOG.warn(
        """
        Failed to restart streams for member '{}'; unrecoverable error occurred on recipient
        side, will not retry.""",
        receiver,
        e);
    completed.completeExceptionally(e);
    return;
  }

  // compute the delay first so that the log reports the delay which is actually scheduled
  final var nextRetryDelay = retryDelaySupplier.applyAsLong(retryDelayMs);
  LOG.warn(
      "Failed to restart streams for member '{}', retrying in {}ms",
      receiver,
      nextRetryDelay,
      error);
  actor.schedule(
      Duration.ofMillis(nextRetryDelay),
      () -> sendRestartStreamsRequest(receiver, completed, nextRetryDelay));
}
}
Loading

0 comments on commit d77867a

Please sign in to comment.