Skip to content

Commit

Permalink
Seal CompletionStrategy
Browse files Browse the repository at this point in the history
  • Loading branch information
pivovarit committed Jan 28, 2024
1 parent 7cf5087 commit d2b8783
Show file tree
Hide file tree
Showing 4 changed files with 27 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.function.BiConsumer;
import java.util.function.BinaryOperator;
import java.util.function.Function;
Expand Down
23 changes: 20 additions & 3 deletions src/main/java/com/pivovarit/collectors/CompletionStrategy.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,30 @@
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

interface CompletionStrategy<T> extends Function<List<CompletableFuture<T>>, Stream<T>> {
sealed interface CompletionStrategy<T> extends Function<List<CompletableFuture<T>>, Stream<T>> permits CompletionStrategy.Unordered, CompletionStrategy.Ordered {

Unordered<?> UNORDERED = new Unordered<>();
Ordered<?> ORDERED = new Ordered<>();

static <R> CompletionStrategy<R> unordered() {
return futures -> StreamSupport.stream(new CompletionOrderSpliterator<>(futures), false);
return (CompletionStrategy<R>) UNORDERED;
}

static <R> CompletionStrategy<R> ordered() {
return futures -> futures.stream().map(CompletableFuture::join);
return (CompletionStrategy<R>) ORDERED;
}

final class Unordered<T> implements CompletionStrategy<T> {
@Override
public Stream<T> apply(List<CompletableFuture<T>> futures) {
return StreamSupport.stream(new CompletionOrderSpliterator<>(futures), false);
}
}

final class Ordered<T> implements CompletionStrategy<T> {
@Override
public Stream<T> apply(List<CompletableFuture<T>> futures) {
return futures.stream().map(CompletableFuture::join);
}
}
}
16 changes: 6 additions & 10 deletions src/main/java/com/pivovarit/collectors/Dispatcher.java
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,12 @@ private FutureTask<T> completionTask(Supplier<T> supplier, InterruptibleCompleta
if (limiter == null) {
future.complete(supplier.get());
} else {
withLimiter(supplier, future);
try {
limiter.acquire();
future.complete(supplier.get());
} finally {
limiter.release();
}
}
} catch (Throwable e) {
completionSignaller.completeExceptionally(e);
Expand All @@ -65,15 +70,6 @@ private FutureTask<T> completionTask(Supplier<T> supplier, InterruptibleCompleta
return task;
}

private void withLimiter(Supplier<T> supplier, InterruptibleCompletableFuture<T> future) throws InterruptedException {
try {
limiter.acquire();
future.complete(supplier.get());
} finally {
limiter.release();
}
}

private static <T> BiConsumer<T, Throwable> shortcircuit(InterruptibleCompletableFuture<?> future) {
return (__, throwable) -> {
if (throwable != null) {
Expand Down
4 changes: 1 addition & 3 deletions src/main/java/com/pivovarit/collectors/FutureCollectors.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,11 @@
final class FutureCollectors {
static <T, R> Collector<CompletableFuture<T>, ?, CompletableFuture<R>> toFuture(Collector<T, ?, R> collector) {
return Collectors.collectingAndThen(toList(), list -> {
CompletableFuture<R> future = CompletableFuture
.allOf(list.toArray(new CompletableFuture[0]))
var future = CompletableFuture.allOf(list.toArray(CompletableFuture[]::new))
.thenApply(__ -> list.stream()
.map(CompletableFuture::join)
.collect(collector));

// CompletableFuture#allOf doesn't shortcircuit on exception so that requires manual handling
for (CompletableFuture<T> f : list) {
f.whenComplete((t, throwable) -> {
if (throwable != null) {
Expand Down

0 comments on commit d2b8783

Please sign in to comment.