Skip to content

Commit

Permalink
Merge #3914 into 3.7.0
Browse files Browse the repository at this point in the history
  • Loading branch information
chemicL committed Oct 31, 2024
2 parents 8224e8c + c2a0402 commit 6692812
Show file tree
Hide file tree
Showing 2 changed files with 71 additions and 7 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2021 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2021-2024 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -24,8 +24,6 @@
import org.reactivestreams.Subscription;

import reactor.core.CoreSubscriber;
import reactor.util.Logger;
import reactor.util.Loggers;
import reactor.util.annotation.Nullable;
import reactor.util.context.Context;

Expand Down Expand Up @@ -269,8 +267,12 @@ final void remove(CacheMonoSubscriber<T> toRemove) {

if (n == 1) {
if (SUBSCRIBERS.compareAndSet(this, a, COORDINATOR_DONE)) {
//cancel the subscription no matter what, at this point coordinator is done and cannot accept new subscribers
this.upstream.cancel();
// Cancel the subscription if it has been established.
// At this point coordinator is done and cannot accept new subscribers.
Subscription upstream = UPSTREAM.getAndSet(this, Operators.cancelledSubscription());
if (upstream != null) {
upstream.cancel();
}
//only switch to EMPTY_STATE if the current state is this coordinator
STATE.compareAndSet(this.main, this, EMPTY_STATE);
return;
Expand All @@ -294,7 +296,12 @@ void delayedSubscribe() {
if (old != null && old != Operators.cancelledSubscription()) {
old.cancel();
}
source.subscribe(this);
// Only trigger the upstream subscription if the coordinator has not been
// aborted. This can happen when meanwhile all the downstream Subscribers
// have been cancelled.
if (SUBSCRIBERS.get(this) != COORDINATOR_DONE) {
source.subscribe(this);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2021 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2021-2024 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -17,11 +17,15 @@
package reactor.core.publisher;

import java.util.NoSuchElementException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

import org.junit.jupiter.api.Test;

import reactor.core.Disposable;
import reactor.core.scheduler.Schedulers;
import reactor.test.MemoryUtils;
import reactor.test.StepVerifier;
import reactor.test.publisher.TestPublisher;
Expand Down Expand Up @@ -153,4 +157,57 @@ void cancellingAllSubscribersBeforeOnNextInvalidates() {
sub3.dispose();
source.assertCancelled(1);
}

// See https://github.com/reactor/reactor-core/issues/3907
@Test
public void cancelBeforeUpstreamSubscribeDoesntFail() throws Exception {
AtomicReference<Throwable> error = new AtomicReference<>();
AtomicInteger upstreamCalled = new AtomicInteger();

CountDownLatch cancelled = new CountDownLatch(1);
CountDownLatch cacheInvalidateInnerSubscribed = new CountDownLatch(1);
CountDownLatch mainChainDone = new CountDownLatch(1);

Mono<String> cachedMono = Mono.fromSupplier(() -> {
upstreamCalled.incrementAndGet();
return "foobar";
})
.cacheInvalidateIf(c -> true);

Mono<String> mono = Mono.defer(() -> Mono.just("foo"))
.flatMap(x -> {
return cachedMono
.doOnSubscribe(s -> {
cacheInvalidateInnerSubscribed.countDown();
try { cancelled.await(1, TimeUnit.SECONDS); } catch (Exception e) {}
})
.subscribeOn(Schedulers.boundedElastic());
})
.doOnError(error::set)
.doFinally(s -> mainChainDone.countDown());

Disposable d = mono.subscribe();

assertThat(cacheInvalidateInnerSubscribed.await(1, TimeUnit.SECONDS))
.as("Should assemble the chain")
.isTrue();

d.dispose();
cancelled.countDown();


assertThat(mainChainDone.await(1, TimeUnit.SECONDS))
.as("Should finish in time")
.isTrue();

// Force an actual upstream subscription to validate the count
cachedMono.block();

assertThat(upstreamCalled.get())
.as("No upstream subscription expected")
.isEqualTo(1);
assertThat(error.get())
.as("No errors expected")
.isNull();
}
}

0 comments on commit 6692812

Please sign in to comment.