Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ensures late onRequest consumer observes demand #3557

Merged
merged 3 commits into from
Aug 10, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
/*
* Copyright (c) 2023 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package reactor.core.publisher;

import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.function.LongConsumer;

import org.openjdk.jcstress.annotations.Actor;
import org.openjdk.jcstress.annotations.Arbiter;
import org.openjdk.jcstress.annotations.JCStressTest;
import org.openjdk.jcstress.annotations.Outcome;
import org.openjdk.jcstress.annotations.State;
import org.openjdk.jcstress.infra.results.J_Result;
import org.reactivestreams.Subscription;

import static org.openjdk.jcstress.annotations.Expect.ACCEPTABLE;

public abstract class FluxCreateStressTest {

@JCStressTest
@Outcome(id = {"4"}, expect = ACCEPTABLE, desc = "demand delivered")
@State
public static class RequestAndOnRequestStressTest implements LongConsumer {

FluxSink<? super Integer> sink;

Subscription s;

volatile long observedDemand;
static final AtomicLongFieldUpdater<RequestAndOnRequestStressTest> OBSERVED_DEMAND
= AtomicLongFieldUpdater.newUpdater(RequestAndOnRequestStressTest.class, "observedDemand");

{
Flux.<Integer>create(sink -> this.sink = sink)
.subscribe(new BaseSubscriber<Integer>() {
@Override
protected void hookOnSubscribe(Subscription subscription) {
RequestAndOnRequestStressTest.this.s = subscription;
}
});
}

@Override
public void accept(long value) {
Operators.addCap(OBSERVED_DEMAND, this, value);
}

@Actor
public void request() {
s.request(1);
s.request(1);
s.request(1);
s.request(1);
}

@Actor
public void setOnRequestConsumer() {
sink.onRequest(this);
}


@Arbiter
public void artiber(J_Result r) {
r.r1 = observedDemand;
}
}
}
110 changes: 87 additions & 23 deletions reactor-core/src/main/java/reactor/core/publisher/FluxCreate.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2016-2022 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2016-2023 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -268,7 +268,7 @@ void drainLoop() {

@Override
public FluxSink<T> onRequest(LongConsumer consumer) {
sink.onRequest(consumer, consumer, sink.requested);
sink.onPushPullRequest(consumer);
return this;
}

Expand Down Expand Up @@ -409,6 +409,8 @@ static abstract class BaseSink<T> extends AtomicBoolean
static final Disposable TERMINATED = OperatorDisposables.DISPOSED;
static final Disposable CANCELLED = Disposables.disposed();

static final LongConsumer NOOP_CONSUMER = n -> {};

final CoreSubscriber<? super T> actual;
final Context ctx;

Expand All @@ -434,6 +436,7 @@ static abstract class BaseSink<T> extends AtomicBoolean
BaseSink(CoreSubscriber<? super T> actual) {
this.actual = actual;
this.ctx = actual.currentContext();
REQUESTED.lazySet(this, Long.MIN_VALUE);
}

@Override
Expand Down Expand Up @@ -500,7 +503,7 @@ void disposeResource(boolean isCancel) {

@Override
public long requestedFromDownstream() {
return requested;
return requested & Long.MAX_VALUE;
}

void onCancel() {
Expand All @@ -519,12 +522,15 @@ final boolean isTerminated() {
@Override
public final void request(long n) {
if (Operators.validate(n)) {
Operators.addCap(REQUESTED, this, n);
long s = addCap(this, n);

LongConsumer consumer = requestConsumer;
if (n > 0 && consumer != null && !isCancelled()) {
consumer.accept(n);
if (hasRequestConsumer(s)) {
LongConsumer consumer = requestConsumer;
if (!isCancelled()) {
consumer.accept(n);
}
}

onRequestedFromDownstream();
}
}
Expand All @@ -541,20 +547,29 @@ public CoreSubscriber<? super T> actual() {
@Override
public FluxSink<T> onRequest(LongConsumer consumer) {
Objects.requireNonNull(consumer, "onRequest");
onRequest(consumer, n -> {
}, Long.MAX_VALUE);
onPushRequest(consumer);
return this;
}

protected void onRequest(LongConsumer initialRequestConsumer,
LongConsumer requestConsumer,
long value) {
protected void onPushRequest(LongConsumer initialRequestConsumer) {
if (!REQUEST_CONSUMER.compareAndSet(this, null, NOOP_CONSUMER)) {
throw new IllegalStateException(
"A consumer has already been assigned to consume requests");
}

// do not change real flag since real consumer is technically absent
initialRequestConsumer.accept(Long.MAX_VALUE);
}

protected void onPushPullRequest(LongConsumer requestConsumer) {
if (!REQUEST_CONSUMER.compareAndSet(this, null, requestConsumer)) {
throw new IllegalStateException(
"A consumer has already been assigned to consume requests");
}
else if (value > 0) {
initialRequestConsumer.accept(value);

long initialRequest = markRequestConsumerSet(this);
if (initialRequest > 0) {
requestConsumer.accept(initialRequest);
}
}

Expand Down Expand Up @@ -607,7 +622,7 @@ else if (c instanceof SinkDisposable) {
public Object scanUnsafe(Attr key) {
if (key == Attr.TERMINATED) return disposable == TERMINATED;
if (key == Attr.CANCELLED) return disposable == CANCELLED;
if (key == Attr.REQUESTED_FROM_DOWNSTREAM) return requested;
if (key == Attr.REQUESTED_FROM_DOWNSTREAM) return requestedFromDownstream();
if (key == Attr.RUN_STYLE) return Attr.RunStyle.ASYNC;

return InnerProducer.super.scanUnsafe(key);
Expand All @@ -617,6 +632,54 @@ public Object scanUnsafe(Attr key) {
public String toString() {
return "FluxSink";
}

static <T> void produced(BaseSink<T> instance, long toSub) {
long s, r, u;
do {
s = instance.requested;
r = s & Long.MAX_VALUE;
if (r == 0 || r == Long.MAX_VALUE) {
return;
}
u = Operators.subOrZero(r, toSub);
} while (!REQUESTED.compareAndSet(instance, s, u | (s & Long.MIN_VALUE)));
}


static <T> long addCap(BaseSink<T> instance, long toAdd) {
long r, u, s;
for (;;) {
s = instance.requested;
r = s & Long.MAX_VALUE;
if (r == Long.MAX_VALUE) {
return Long.MAX_VALUE;
}
u = Operators.addCap(r, toAdd);
if (REQUESTED.compareAndSet(instance, s, u | (s & Long.MIN_VALUE))) {
return s;
}
}
}

static <T> long markRequestConsumerSet(BaseSink<T> instance) {
long u, s;
for (;;) {
s = instance.requested;

if (hasRequestConsumer(s)) {
return s;
}

u = s & Long.MAX_VALUE;
if (REQUESTED.compareAndSet(instance, s, u)) {
return u;
}
}
}

static boolean hasRequestConsumer(long requestedState) {
return (requestedState & Long.MIN_VALUE) == 0;
}
}

static final class IgnoreSink<T> extends BaseSink<T> {
Expand All @@ -639,8 +702,9 @@ public FluxSink<T> next(T t) {
actual.onNext(t);

for (; ; ) {
long r = requested;
if (r == 0L || REQUESTED.compareAndSet(this, r, r - 1)) {
long s = requested;
long r = s & Long.MAX_VALUE;
if (r == 0L || REQUESTED.compareAndSet(this, s, (r - 1) | (s & Long.MIN_VALUE))) {
return this;
}
}
Expand All @@ -665,9 +729,9 @@ public final FluxSink<T> next(T t) {
return this;
}

if (requested != 0) {
if (requestedFromDownstream() != 0) {
actual.onNext(t);
Operators.produced(REQUESTED, this, 1);
produced(this, 1);
}
else {
onOverflow();
Expand Down Expand Up @@ -776,7 +840,7 @@ void drain() {
final Queue<T> q = queue;

for (; ; ) {
long r = requested;
long r = requestedFromDownstream();
long e = 0L;

while (e != r) {
Expand Down Expand Up @@ -844,7 +908,7 @@ void drain() {
}

if (e != 0) {
Operators.produced(REQUESTED, this, e);
produced(this, e);
}

if (WIP.decrementAndGet(this) == 0) {
Expand Down Expand Up @@ -936,7 +1000,7 @@ void drain() {
final AtomicReference<T> q = queue;

for (; ; ) {
long r = requested;
long r = requestedFromDownstream();
long e = 0L;

while (e != r) {
Expand Down Expand Up @@ -1006,7 +1070,7 @@ void drain() {
}

if (e != 0) {
Operators.produced(REQUESTED, this, e);
produced(this, e);
}

if (WIP.decrementAndGet(this) == 0) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2016-2022 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2016-2023 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -108,6 +108,11 @@ default ContextView contextView() {
* or {@link Flux#create(java.util.function.Consumer, FluxSink.OverflowStrategy)},
* the consumer
* is invoked for every request to enable a hybrid backpressure-enabled push/pull model.
* <p>
* <strong>Note:</strong> in case of multiple {@link Subscription#request} happening
* concurrently to this method, the first consumer invocation may process
* accumulated demand instead of being called multiple times.
* <p>
* When bridging with asynchronous listener-based APIs, the {@code onRequest} callback
* may be used to request more data from source if required and to manage backpressure
* by delivering data to sink only when requests are pending.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2015-2022 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2015-2023 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -18,7 +18,10 @@

import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
Expand All @@ -27,6 +30,7 @@
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.IntStream;

import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.provider.EnumSource;
import org.reactivestreams.Subscriber;
Expand All @@ -53,6 +57,36 @@

class FluxCreateTest {

@Test
//https://github.com/reactor/reactor-core/issues/1949
void ensuresConcurrentRequestAndSettingOnRequestAlwaysDeliversDemand() throws ExecutionException, InterruptedException {
AtomicReference<Subscription> sub = new AtomicReference<>();
ExecutorService executor = Executors.newSingleThreadExecutor();
int i = 0;
int attempts = 100;
for (AtomicBoolean requested = new AtomicBoolean(true); requested.getAndSet(false) && i < attempts; ++i) {
CountDownLatch latch = new CountDownLatch(1);
FutureTask<Void> task = new FutureTask<>(() -> sub.get().request(1), null);
Flux.create(sink -> sink.onRequest(__ -> {
requested.set(true);
// onRequest can be delivered asynchronously after request(n) has
// returned, so latch coordinates successful request->onRequest
// completion.
latch.countDown();
}))
.subscribe(new BaseSubscriber<Object>() {
@Override
protected void hookOnSubscribe(Subscription subscription) {
sub.set(subscription);
executor.execute(task);
}
});
latch.await(100, TimeUnit.MILLISECONDS);
}
executor.shutdown();
Assertions.assertThat(i).as("Failed after %d attempts", i).isEqualTo(attempts);
}

@Test
void normalBuffered() {
AssertSubscriber<Integer> ts = AssertSubscriber.create();
Expand Down
Loading