diff --git a/src/main/java/com/google/devtools/build/lib/remote/util/BUILD b/src/main/java/com/google/devtools/build/lib/remote/util/BUILD
index 6d91bc9f237439..22134fa44a79eb 100644
--- a/src/main/java/com/google/devtools/build/lib/remote/util/BUILD
+++ b/src/main/java/com/google/devtools/build/lib/remote/util/BUILD
@@ -19,7 +19,6 @@ java_library(
"//src/main/java/com/google/devtools/build/lib/actions:execution_requirements",
"//src/main/java/com/google/devtools/build/lib/analysis:blaze_version_info",
"//src/main/java/com/google/devtools/build/lib/authandtls",
- "//src/main/java/com/google/devtools/build/lib/concurrent",
"//src/main/java/com/google/devtools/build/lib/remote:ExecutionStatusException",
"//src/main/java/com/google/devtools/build/lib/remote/common",
"//src/main/java/com/google/devtools/build/lib/remote/options",
@@ -28,6 +27,7 @@ java_library(
"//src/main/protobuf:failure_details_java_proto",
"//third_party:guava",
"//third_party:jsr305",
+ "//third_party:rxjava3",
"//third_party/grpc:grpc-jar",
"//third_party/protobuf:protobuf_java",
"//third_party/protobuf:protobuf_java_util",
diff --git a/src/main/java/com/google/devtools/build/lib/remote/util/RxFutures.java b/src/main/java/com/google/devtools/build/lib/remote/util/RxFutures.java
new file mode 100644
index 00000000000000..7eef8807b964d4
--- /dev/null
+++ b/src/main/java/com/google/devtools/build/lib/remote/util/RxFutures.java
@@ -0,0 +1,177 @@
+// Copyright 2021 The Bazel Authors. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package com.google.devtools.build.lib.remote.util;
+
+import static com.google.common.base.Preconditions.checkState;
+
+import com.google.common.util.concurrent.AbstractFuture;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import io.reactivex.rxjava3.annotations.NonNull;
+import io.reactivex.rxjava3.core.Completable;
+import io.reactivex.rxjava3.core.CompletableEmitter;
+import io.reactivex.rxjava3.core.CompletableObserver;
+import io.reactivex.rxjava3.core.CompletableOnSubscribe;
+import io.reactivex.rxjava3.disposables.Disposable;
+import io.reactivex.rxjava3.exceptions.Exceptions;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import javax.annotation.Nullable;
+
+/** Methods for interoperating between Rx and ListenableFuture. */
+public class RxFutures {
+
+ private RxFutures() {}
+
+ /**
+ * Returns a {@link Completable} that is complete once the supplied {@link ListenableFuture} has
+ * completed.
+ *
+ *
A {@link ListenableFuture>} represents some computation that is already in progress. We use
+ * {@link Callable} here to defer the execution of the thing that produces ListenableFuture until
+ * there is subscriber.
+ *
+ *
Errors are also propagated except for certain "fatal" exceptions defined by rxjava. Multiple
+ * subscriptions are not allowed.
+ *
+ *
Disposes the Completable to cancel the underlying ListenableFuture.
+ */
+ public static Completable toCompletable(
+ Callable> callable, Executor executor) {
+ return Completable.create(new OnceCompletableOnSubscribe(callable, executor));
+ }
+
+ private static class OnceCompletableOnSubscribe implements CompletableOnSubscribe {
+ private final AtomicBoolean subscribed = new AtomicBoolean(false);
+
+ private final Callable> callable;
+ private final Executor executor;
+
+ private OnceCompletableOnSubscribe(
+ Callable> callable, Executor executor) {
+ this.callable = callable;
+ this.executor = executor;
+ }
+
+ @Override
+ public void subscribe(@NonNull CompletableEmitter emitter) throws Throwable {
+ try {
+ checkState(!subscribed.getAndSet(true), "This completable cannot be subscribed to twice");
+ ListenableFuture future = callable.call();
+ Futures.addCallback(
+ future,
+ new FutureCallback() {
+ @Override
+ public void onSuccess(@Nullable Void t) {
+ emitter.onComplete();
+ }
+
+ @Override
+ public void onFailure(Throwable throwable) {
+ /*
+ * CancellationException can be thrown in two cases:
+ * 1. The ListenableFuture itself is cancelled.
+ * 2. Completable is disposed by downstream.
+ *
+ * This check is used to prevent propagating CancellationException to downstream
+ * when it has already disposed the Completable.
+ */
+ if (throwable instanceof CancellationException && emitter.isDisposed()) {
+ return;
+ }
+
+ emitter.onError(throwable);
+ }
+ },
+ executor);
+ emitter.setCancellable(() -> future.cancel(true));
+ } catch (Throwable t) {
+ // We failed to construct and listen to the LF. Following RxJava's own behaviour, prefer
+ // to pass RuntimeExceptions and Errors down to the subscriber except for certain
+ // "fatal" exceptions.
+ Exceptions.throwIfFatal(t);
+ executor.execute(() -> emitter.onError(t));
+ }
+ }
+ }
+
+ /**
+ * Returns a {@link ListenableFuture} that is complete once the {@link Completable} has completed.
+ *
+ * Errors are also propagated. If the {@link ListenableFuture} is canceled, the subscription to
+ * the {@link Completable} will automatically be cancelled.
+ */
+ public static ListenableFuture toListenableFuture(Completable completable) {
+ CompletableFuture future = new CompletableFuture();
+ completable.subscribe(
+ new CompletableObserver() {
+ @Override
+ public void onSubscribe(Disposable d) {
+ future.setCancelCallback(d);
+ }
+
+ @Override
+ public void onComplete() {
+ // Making the Completable as complete.
+ future.set(null);
+ }
+
+ @Override
+ public void onError(Throwable e) {
+ future.setException(e);
+ }
+ });
+ return future;
+ }
+
+ private static final class CompletableFuture extends AbstractFuture {
+ private final AtomicReference cancelCallback = new AtomicReference<>();
+
+ private void setCancelCallback(Disposable cancelCallback) {
+ this.cancelCallback.set(cancelCallback);
+ // Just in case it was already canceled before we set the callback.
+ doCancelIfCancelled();
+ }
+
+ private void doCancelIfCancelled() {
+ if (isCancelled()) {
+ Disposable callback = cancelCallback.getAndSet(null);
+ if (callback != null) {
+ callback.dispose();
+ }
+ }
+ }
+
+ @Override
+ protected void afterDone() {
+ doCancelIfCancelled();
+ }
+
+ // Allow set to be called by other members.
+ @Override
+ protected boolean set(@Nullable Void t) {
+ return super.set(t);
+ }
+
+ // Allow setException to be called by other members.
+ @Override
+ protected boolean setException(Throwable throwable) {
+ return super.setException(throwable);
+ }
+ }
+}
diff --git a/src/test/java/com/google/devtools/build/lib/remote/util/BUILD b/src/test/java/com/google/devtools/build/lib/remote/util/BUILD
index f31e6381026b9d..8871a241a0b01c 100644
--- a/src/test/java/com/google/devtools/build/lib/remote/util/BUILD
+++ b/src/test/java/com/google/devtools/build/lib/remote/util/BUILD
@@ -23,11 +23,15 @@ java_library(
"//src/main/java/com/google/devtools/build/lib/exec:spawn_runner",
"//src/main/java/com/google/devtools/build/lib/remote",
"//src/main/java/com/google/devtools/build/lib/remote/common",
+ "//src/main/java/com/google/devtools/build/lib/remote/util",
"//src/main/java/com/google/devtools/build/lib/util/io",
"//src/main/java/com/google/devtools/build/lib/vfs",
"//src/main/java/com/google/devtools/build/lib/vfs:pathfragment",
"//src/test/java/com/google/devtools/build/lib/actions/util",
"//third_party:guava",
+ "//third_party:junit4",
+ "//third_party:rxjava3",
+ "//third_party:truth",
"//third_party/protobuf:protobuf_java",
"@remoteapis//:build_bazel_remote_execution_v2_remote_execution_java_proto",
],
diff --git a/src/test/java/com/google/devtools/build/lib/remote/util/RxFuturesTest.java b/src/test/java/com/google/devtools/build/lib/remote/util/RxFuturesTest.java
new file mode 100644
index 00000000000000..e38755aeb11a18
--- /dev/null
+++ b/src/test/java/com/google/devtools/build/lib/remote/util/RxFuturesTest.java
@@ -0,0 +1,230 @@
+// Copyright 2021 The Bazel Authors. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package com.google.devtools.build.lib.remote.util;
+
+import static com.google.common.truth.Truth.assertThat;
+import static com.google.common.util.concurrent.Futures.immediateVoidFuture;
+
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.SettableFuture;
+import io.reactivex.rxjava3.core.Completable;
+import io.reactivex.rxjava3.core.CompletableEmitter;
+import io.reactivex.rxjava3.observers.TestObserver;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import javax.annotation.Nullable;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link RxFutures}. */
+@RunWith(JUnit4.class)
+public class RxFuturesTest {
+ @Rule public final RxNoGlobalErrorsRule rxNoGlobalErrorsRule = new RxNoGlobalErrorsRule();
+
+ @Test
+ public void toCompletable_noSubscription_noExecution() {
+ SettableFuture future = SettableFuture.create();
+ AtomicBoolean executed = new AtomicBoolean(false);
+
+ RxFutures.toCompletable(
+ () -> {
+ executed.set(true);
+ return future;
+ },
+ MoreExecutors.directExecutor());
+
+ assertThat(executed.get()).isFalse();
+ }
+
+ @Test
+ public void toCompletable_futureOnSuccess_completableOnComplete() {
+ SettableFuture future = SettableFuture.create();
+ Completable completable = RxFutures.toCompletable(() -> future, MoreExecutors.directExecutor());
+
+ TestObserver observer = completable.test();
+ observer.assertEmpty();
+ future.set(null);
+
+ observer.assertComplete();
+ }
+
+ @Test
+ public void toCompletable_futureOnError_completableOnError() {
+ SettableFuture future = SettableFuture.create();
+ Completable completable = RxFutures.toCompletable(() -> future, MoreExecutors.directExecutor());
+
+ TestObserver observer = completable.test();
+ observer.assertEmpty();
+ Throwable error = new IllegalStateException("error");
+ future.setException(error);
+
+ observer.assertError(error);
+ }
+
+ @Test
+ public void toCompletable_futureOnSuccessBeforeSubscription_completableOnComplete() {
+ SettableFuture future = SettableFuture.create();
+ Completable completable = RxFutures.toCompletable(() -> future, MoreExecutors.directExecutor());
+
+ future.set(null);
+ TestObserver observer = completable.test();
+
+ observer.assertComplete();
+ }
+
+ @Test
+ public void toCompletable_futureOnErrorBeforeSubscription_completableOnError() {
+ SettableFuture future = SettableFuture.create();
+ Completable completable = RxFutures.toCompletable(() -> future, MoreExecutors.directExecutor());
+
+ Throwable error = new IllegalStateException("error");
+ future.setException(error);
+ TestObserver observer = completable.test();
+
+ observer.assertError(error);
+ }
+
+ @Test
+ public void toCompletable_futureCancelledBeforeSubscription_completableOnError() {
+ SettableFuture future = SettableFuture.create();
+ Completable completable = RxFutures.toCompletable(() -> future, MoreExecutors.directExecutor());
+
+ future.cancel(true);
+ TestObserver observer = completable.test();
+
+ observer.assertError(CancellationException.class);
+ }
+
+ @Test
+ public void toCompletable_disposeCompletable_cancelFuture() {
+ SettableFuture future = SettableFuture.create();
+ Completable completable = RxFutures.toCompletable(() -> future, MoreExecutors.directExecutor());
+
+ TestObserver observer = completable.test();
+ observer.assertEmpty();
+ observer.dispose();
+
+ assertThat(future.isCancelled()).isTrue();
+ }
+
+ @Test
+ public void toCompletable_multipleSubscriptions_error() {
+ ListenableFuture future = immediateVoidFuture();
+ Completable completable = RxFutures.toCompletable(() -> future, MoreExecutors.directExecutor());
+ completable.test().assertComplete();
+
+ TestObserver observer = completable.test();
+
+ observer.assertError(IllegalStateException.class);
+ }
+
+ @Test
+ public void toListenableFutureFromCompletable_noEvents_waiting() {
+ CompletableToListenableFutureSetup setup = CompletableToListenableFutureSetup.create();
+
+ assertThat(setup.getEmitter()).isNotNull();
+ assertThat(setup.getFuture().isDone()).isFalse();
+ assertThat(setup.getFuture().isCancelled()).isFalse();
+ }
+
+ @Test
+ public void toListenableFutureFromCompletable_completableOnComplete_futureOnSuccess() {
+ CompletableToListenableFutureSetup setup = CompletableToListenableFutureSetup.create();
+
+ setup.getEmitter().onComplete();
+
+ assertThat(setup.isSuccess()).isTrue();
+ assertThat(setup.getFailure()).isNull();
+ }
+
+ @Test
+ public void toListenableFutureFromCompletable_completableOnError_futureOnFailure() {
+ CompletableToListenableFutureSetup setup = CompletableToListenableFutureSetup.create();
+
+ Throwable error = new IllegalStateException("error");
+ setup.getEmitter().onError(error);
+
+ assertThat(setup.isSuccess()).isFalse();
+ assertThat(setup.getFailure()).isEqualTo(error);
+ }
+
+ @Test
+ public void toListenableFutureFromCompletable_cancelled() {
+ CompletableToListenableFutureSetup setup = CompletableToListenableFutureSetup.create();
+
+ setup.getFuture().cancel(true);
+
+ assertThat(setup.isSuccess()).isFalse();
+ assertThat(setup.getFailure()).isInstanceOf(CancellationException.class);
+ assertThat(setup.isDisposed()).isTrue();
+ }
+
+ private static class CompletableToListenableFutureSetup {
+ public static CompletableToListenableFutureSetup create() {
+ return new CompletableToListenableFutureSetup();
+ }
+
+ private final ListenableFuture future;
+
+ private CompletableEmitter emitter;
+ private boolean disposed;
+ private boolean success;
+ private Throwable failure;
+
+ CompletableToListenableFutureSetup() {
+ Completable completable =
+ Completable.create(emitter -> this.emitter = emitter).doOnDispose(() -> disposed = true);
+ future = RxFutures.toListenableFuture(completable);
+ Futures.addCallback(
+ future,
+ new FutureCallback() {
+ @Override
+ public void onSuccess(@Nullable Void result) {
+ success = true;
+ }
+
+ @Override
+ public void onFailure(Throwable t) {
+ failure = t;
+ }
+ },
+ MoreExecutors.directExecutor());
+ }
+
+ public CompletableEmitter getEmitter() {
+ return emitter;
+ }
+
+ public ListenableFuture getFuture() {
+ return future;
+ }
+
+ public boolean isDisposed() {
+ return disposed;
+ }
+
+ public boolean isSuccess() {
+ return success;
+ }
+
+ public Throwable getFailure() {
+ return failure;
+ }
+ }
+}
diff --git a/src/test/java/com/google/devtools/build/lib/remote/util/RxNoGlobalErrorsRule.java b/src/test/java/com/google/devtools/build/lib/remote/util/RxNoGlobalErrorsRule.java
new file mode 100644
index 00000000000000..ad3d4c3d45e335
--- /dev/null
+++ b/src/test/java/com/google/devtools/build/lib/remote/util/RxNoGlobalErrorsRule.java
@@ -0,0 +1,69 @@
+// Copyright 2021 The Bazel Authors. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+package com.google.devtools.build.lib.remote.util;
+
+import io.reactivex.rxjava3.exceptions.CompositeException;
+import io.reactivex.rxjava3.plugins.RxJavaPlugins;
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
+import org.junit.rules.ExternalResource;
+
+/**
+ * A JUnit {@link org.junit.Rule} that captures uncaught errors from RxJava streams and rethrows
+ * them post-test if left unaddressed.
+ *
+ * This is to prevent false-positives caused by RxJava's default uncaught error handler, which
+ * manually forwards the event to the current Thread's exception handler and bypasses JUnit's
+ * failure reporting.
+ *
+ *
Can also be used to assert that no uncaught errors have yet been thrown mid-test. This is
+ * useful to ensure tests are in a consistent state before continuing.
+ */
+public class RxNoGlobalErrorsRule extends ExternalResource {
+ private final List errors = new CopyOnWriteArrayList<>();
+
+ @Override
+ protected void before() {
+ RxJavaPlugins.setErrorHandler(errors::add);
+ }
+
+ @Override
+ protected void after() {
+ assertNoErrors();
+ }
+
+ private static final class UncaughtRxErrors extends RuntimeException {
+ private UncaughtRxErrors(Throwable cause) {
+ super("There were uncaught Rx errors during test execution", cause);
+ }
+ }
+
+ /**
+ * Asserts that no uncaught errors have yet occurred.
+ *
+ * If an Rx stream has thrown an uncaught error any time before this method is called, an
+ * {@link UncaughtRxErrors} is thrown. This is useful for ensuring that tests are in a consistent
+ * state before continuing.
+ *
+ *
You may need to advance any test schedulers so that any pending events are flushed.
+ */
+ private void assertNoErrors() {
+ if (errors.size() > 1) {
+ Throwable[] errorsArray = errors.toArray(new Throwable[0]);
+ throw new UncaughtRxErrors(new CompositeException(errorsArray));
+ } else if (errors.size() == 1) {
+ throw new UncaughtRxErrors(errors.get(0));
+ }
+ }
+}