From 4ebf7cfc31fb476e20eee05e19dfc89e51237c05 Mon Sep 17 00:00:00 2001 From: Simone Bordet Date: Tue, 6 Dec 2022 00:04:51 +0100 Subject: [PATCH 1/2] Improved locking for HttpReceiver.ContentSource. Improved response failure code path. Now either responseFailure() must be called, or exchange.responseComplete() followed by HttpReceiver.abort(). Fixed failAndClose() for HTTP/2 and HTTP/3: the connection must not be closed, stream.reset() is sufficient. Fixed flaky test HttpClientDemandTest.testTwoListenersWithDifferentDemand(). Fixed DistributionTests.testVirtualThreadPool(). Signed-off-by: Simone Bordet --- .../eclipse/jetty/client/HttpExchange.java | 2 +- .../eclipse/jetty/client/HttpReceiver.java | 199 ++++++++++-------- .../internal/HttpReceiverOverHTTP2.java | 14 +- .../internal/HttpReceiverOverHTTP3.java | 13 +- .../transport/HttpClientDemandTest.java | 10 +- .../tests/distribution/DistributionTests.java | 2 +- 6 files changed, 127 insertions(+), 113 deletions(-) diff --git a/jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/HttpExchange.java b/jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/HttpExchange.java index b91652fe5178..0106dbb4df8c 100644 --- a/jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/HttpExchange.java +++ b/jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/HttpExchange.java @@ -244,7 +244,7 @@ public void abort(Throwable failure, Promise promise) } if (LOG.isDebugEnabled()) - LOG.debug("Failed {}: req={}/rsp={} {}", this, abortRequest, abortResponse, failure); + LOG.debug("Failed {}: req={}/rsp={}", this, abortRequest, abortResponse, failure); if (!abortRequest && !abortResponse) { diff --git a/jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/HttpReceiver.java b/jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/HttpReceiver.java index a331eb813710..3de9188c543a 100644 --- a/jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/HttpReceiver.java +++ b/jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/HttpReceiver.java @@ -31,6 +31,7 @@ import org.eclipse.jetty.io.content.ContentSourceTransformer; import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.Promise; +import org.eclipse.jetty.util.thread.AutoLock; import org.eclipse.jetty.util.thread.SerializedInvoker; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -266,6 +267,9 @@ protected void responseHeaders(HttpExchange exchange) List responseListeners = exchange.getConversation().getResponseListeners(); notifier.notifyHeaders(responseListeners, response); + if (exchange.isResponseComplete()) + return; + if (HttpStatus.isInterim(response.getStatus())) { if (LOG.isDebugEnabled()) @@ -338,24 +342,18 @@ protected void responseSuccess(HttpExchange exchange, Runnable afterSuccessTask) if (LOG.isDebugEnabled()) LOG.debug("Invoking responseSuccess on {}", this); - NotifiableContentSource contentSource = this.contentSource; - if (contentSource != null) - { - this.contentSource = null; - contentSource.eof(); - } + // Mark atomically the response as completed, with respect + // to concurrency between response success and response failure. + if (!exchange.responseComplete(null)) + return; invoker.run(() -> { if (LOG.isDebugEnabled()) LOG.debug("Executing responseSuccess on {}", this); - // Mark atomically the response as completed, with respect - // to concurrency between response success and response failure. - if (!exchange.responseComplete(null)) - return; - responseState = ResponseState.IDLE; + reset(); HttpResponse response = exchange.getResponse(); @@ -385,34 +383,26 @@ protected void responseSuccess(HttpExchange exchange, Runnable afterSuccessTask) protected void responseFailure(Throwable failure, Promise promise) { if (LOG.isDebugEnabled()) - LOG.debug("Invoking responseFailure with {} on {}", failure, this); - - invoker.run(() -> + LOG.debug("Failing with {} on {}", failure, this); + + HttpExchange exchange = getHttpExchange(); + // In case of a response error, the failure has already been notified + // and it is possible that a further attempt to read in the receive + // loop throws an exception that reenters here but without exchange; + // or, the server could just have timed out the connection. + if (exchange == null) { - if (LOG.isDebugEnabled()) - LOG.debug("Executing responseFailure on {}", this); - - HttpExchange exchange = getHttpExchange(); - // In case of a response error, the failure has already been notified - // and it is possible that a further attempt to read in the receive - // loop throws an exception that reenters here but without exchange; - // or, the server could just have timed out the connection. - if (exchange == null) - { - promise.succeeded(false); - return; - } - - if (LOG.isDebugEnabled()) - LOG.debug("Response failure {}", exchange.getResponse(), failure); + promise.succeeded(false); + return; + } - // Mark atomically the response as completed, with respect - // to concurrency between response success and response failure. - if (exchange.responseComplete(failure)) - abort(exchange, failure, promise); - else - promise.succeeded(false); - }); + // Mark atomically the response as completed, with respect + // to concurrency between response success and response failure. + boolean completed = exchange.responseComplete(failure); + if (completed) + abort(exchange, failure, promise); + else + promise.succeeded(false); } private void terminateResponse(HttpExchange exchange) @@ -482,10 +472,12 @@ public void abort(HttpExchange exchange, Throwable failure, Promise pro if (LOG.isDebugEnabled()) LOG.debug("Invoking abort with {} on {}", failure, this); + assert exchange.isResponseComplete(); + invoker.run(() -> { if (LOG.isDebugEnabled()) - LOG.debug("Executing abort on {}", this); + LOG.debug("Executing abort with {} on {}", failure, this); if (responseState == ResponseState.FAILURE) { @@ -496,7 +488,7 @@ public void abort(HttpExchange exchange, Throwable failure, Promise pro responseState = ResponseState.FAILURE; this.failure = failure; if (contentSource != null) - contentSource.fail(failure); + contentSource.error(failure); dispose(); HttpResponse response = exchange.getResponse(); @@ -557,7 +549,7 @@ private enum ResponseState private interface NotifiableContentSource extends Content.Source { - void eof(); + boolean error(Throwable failure); void onDataAvailable(); } @@ -577,12 +569,6 @@ public DecodingContentSource(NotifiableContentSource rawSource, ContentDecoder d _decoder = decoder; } - @Override - public void eof() - { - _rawSource.eof(); - } - @Override public void onDataAvailable() { @@ -643,6 +629,15 @@ protected Content.Chunk transform(Content.Chunk inputChunk) } } } + + @Override + public boolean error(Throwable failure) + { + if (_chunk != null) + _chunk.release(); + _chunk = null; + return _rawSource.error(failure); + } } /** @@ -654,42 +649,36 @@ private class ContentSource implements NotifiableContentSource private static final Logger LOG = LoggerFactory.getLogger(ContentSource.class); private final AtomicReference demandCallbackRef = new AtomicReference<>(); - private volatile Content.Chunk currentChunk; + private final AutoLock lock = new AutoLock(); + private Content.Chunk currentChunk; @Override public Content.Chunk read() { if (LOG.isDebugEnabled()) LOG.debug("Reading from {}", this); - Content.Chunk chunk = consumeCurrentChunk(); - if (chunk != null) - return chunk; - currentChunk = HttpReceiver.this.read(false); - return consumeCurrentChunk(); - } - @Override - public void eof() - { - if (LOG.isDebugEnabled()) - LOG.debug("Setting EOF on {}", this); - if (currentChunk != null) - throw new IllegalStateException(); - currentChunk = Content.Chunk.EOF; + Content.Chunk current; + try (AutoLock ignored = lock.lock()) + { + current = currentChunk; + currentChunk = Content.Chunk.next(current); + if (current != null) + return current; + } - Runnable demandCallback = demandCallbackRef.getAndSet(null); - if (LOG.isDebugEnabled()) - LOG.debug("Calling demand callback on {}", this); - if (demandCallback != null) + current = HttpReceiver.this.read(false); + + try (AutoLock ignored = lock.lock()) { - try - { - demandCallback.run(); - } - catch (Throwable x) + if (currentChunk != null) { - fail(x); + // There was a concurrent call to fail(). + current.release(); + return currentChunk; } + currentChunk = Content.Chunk.next(current); + return current; } } @@ -703,15 +692,6 @@ public void onDataAvailable() invokeDemandCallback(true); } - private Content.Chunk consumeCurrentChunk() - { - if (LOG.isDebugEnabled()) - LOG.debug("Consuming current chunk from {}", this); - Content.Chunk chunk = currentChunk; - currentChunk = Content.Chunk.next(chunk); - return chunk; - } - @Override public void demand(Runnable demandCallback) { @@ -731,12 +711,30 @@ private void processDemand() if (LOG.isDebugEnabled()) LOG.debug("Processing demand on {}", this); - if (currentChunk == null) + Content.Chunk current; + try (AutoLock ignored = lock.lock()) + { + current = currentChunk; + } + + if (current == null) { - currentChunk = HttpReceiver.this.read(true); - if (currentChunk == null) + current = HttpReceiver.this.read(true); + if (current == null) return; + + try (AutoLock ignored = lock.lock()) + { + if (currentChunk != null) + { + // There was a concurrent call to fail(). + current.release(); + return; + } + currentChunk = current; + } } + // The processDemand method is only ever called by the // invoker so there is no need to use the latter here. invokeDemandCallback(false); @@ -765,20 +763,43 @@ private void invokeDemandCallback(boolean invoke) @Override public void fail(Throwable failure) + { + boolean failed = error(failure); + if (failed) + HttpReceiver.this.failAndClose(failure); + invokeDemandCallback(true); + } + + @Override + public boolean error(Throwable failure) { if (LOG.isDebugEnabled()) LOG.debug("Failing {}", this); - if (currentChunk != null) - currentChunk.release(); - if (currentChunk == null || !(currentChunk instanceof Content.Chunk.Error)) - HttpReceiver.this.failAndClose(failure); - currentChunk = Content.Chunk.from(failure); + + try (AutoLock ignored = lock.lock()) + { + if (currentChunk instanceof Content.Chunk.Error) + return false; + if (currentChunk != null) + currentChunk.release(); + currentChunk = Content.Chunk.from(failure); + } + + return true; + } + + private Content.Chunk chunk() + { + try (AutoLock ignored = lock.lock()) + { + return currentChunk; + } } @Override public String toString() { - return String.format("%s@%x{c=%s,d=%s}", getClass().getSimpleName(), hashCode(), currentChunk, demandCallbackRef); + return String.format("%s@%x{c=%s,d=%s}", getClass().getSimpleName(), hashCode(), chunk(), demandCallbackRef); } } } diff --git a/jetty-core/jetty-http2/jetty-http2-client-transport/src/main/java/org/eclipse/jetty/http2/client/transport/internal/HttpReceiverOverHTTP2.java b/jetty-core/jetty-http2/jetty-http2-client-transport/src/main/java/org/eclipse/jetty/http2/client/transport/internal/HttpReceiverOverHTTP2.java index 0ca4f18051e2..9b343cbe3af3 100644 --- a/jetty-core/jetty-http2/jetty-http2-client-transport/src/main/java/org/eclipse/jetty/http2/client/transport/internal/HttpReceiverOverHTTP2.java +++ b/jetty-core/jetty-http2/jetty-http2-client-transport/src/main/java/org/eclipse/jetty/http2/client/transport/internal/HttpReceiverOverHTTP2.java @@ -92,13 +92,9 @@ public void failAndClose(Throwable failure) Stream stream = getHttpChannel().getStream(); responseFailure(failure, Promise.from(failed -> { - stream.reset(new ResetFrame(stream.getId(), ErrorCode.CANCEL_STREAM_ERROR.code), Callback.NOOP); - getHttpChannel().getHttpConnection().close(failure); - }, x -> - { - stream.reset(new ResetFrame(stream.getId(), ErrorCode.CANCEL_STREAM_ERROR.code), Callback.NOOP); - getHttpChannel().getHttpConnection().close(failure); - })); + if (failed) + stream.reset(new ResetFrame(stream.getId(), ErrorCode.CANCEL_STREAM_ERROR.code), Callback.NOOP); + }, x -> stream.reset(new ResetFrame(stream.getId(), ErrorCode.CANCEL_STREAM_ERROR.code), Callback.NOOP))); } @Override @@ -127,15 +123,11 @@ private void onResponse(Stream stream, HeadersFrame frame) httpResponse.version(response.getHttpVersion()).status(response.getStatus()).reason(response.getReason()); responseBegin(exchange); - if (exchange.isResponseComplete()) - return; HttpFields headers = response.getFields(); for (HttpField header : headers) { responseHeader(exchange, header); - if (exchange.isResponseComplete()) - return; } HttpRequest httpRequest = exchange.getRequest(); diff --git a/jetty-core/jetty-http3/jetty-http3-client-transport/src/main/java/org/eclipse/jetty/http3/client/transport/internal/HttpReceiverOverHTTP3.java b/jetty-core/jetty-http3/jetty-http3-client-transport/src/main/java/org/eclipse/jetty/http3/client/transport/internal/HttpReceiverOverHTTP3.java index 3fecd5f98a61..e2d431a60c15 100644 --- a/jetty-core/jetty-http3/jetty-http3-client-transport/src/main/java/org/eclipse/jetty/http3/client/transport/internal/HttpReceiverOverHTTP3.java +++ b/jetty-core/jetty-http3/jetty-http3-client-transport/src/main/java/org/eclipse/jetty/http3/client/transport/internal/HttpReceiverOverHTTP3.java @@ -23,6 +23,7 @@ import org.eclipse.jetty.http.MetaData; import org.eclipse.jetty.http3.api.Stream; import org.eclipse.jetty.http3.frames.HeadersFrame; +import org.eclipse.jetty.http3.internal.HTTP3ErrorCode; import org.eclipse.jetty.io.Content; import org.eclipse.jetty.util.Promise; import org.slf4j.Logger; @@ -73,8 +74,12 @@ public Content.Chunk read(boolean fillInterestIfNeeded) @Override public void failAndClose(Throwable failure) { - responseFailure(failure, Promise.from(failed -> getHttpChannel().getHttpConnection().close(failure), - x -> getHttpChannel().getHttpConnection().close(failure))); + Stream stream = getHttpChannel().getStream(); + responseFailure(failure, Promise.from(failed -> + { + if (failed) + stream.reset(HTTP3ErrorCode.REQUEST_CANCELLED_ERROR.code(), failure); + }, x -> stream.reset(HTTP3ErrorCode.REQUEST_CANCELLED_ERROR.code(), failure))); } @Override @@ -95,15 +100,11 @@ public void onResponse(Stream.Client stream, HeadersFrame frame) httpResponse.version(response.getHttpVersion()).status(response.getStatus()).reason(response.getReason()); responseBegin(exchange); - if (exchange.isResponseComplete()) - return; HttpFields headers = response.getFields(); for (HttpField header : headers) { responseHeader(exchange, header); - if (exchange.isResponseComplete()) - return; } // TODO: add support for HttpMethod.CONNECT. diff --git a/jetty-core/jetty-tests/jetty-test-client-transports/src/test/java/org/eclipse/jetty/test/client/transport/HttpClientDemandTest.java b/jetty-core/jetty-tests/jetty-test-client-transports/src/test/java/org/eclipse/jetty/test/client/transport/HttpClientDemandTest.java index 6c801f0b5d5b..2b8180690058 100644 --- a/jetty-core/jetty-tests/jetty-test-client-transports/src/test/java/org/eclipse/jetty/test/client/transport/HttpClientDemandTest.java +++ b/jetty-core/jetty-tests/jetty-test-client-transports/src/test/java/org/eclipse/jetty/test/client/transport/HttpClientDemandTest.java @@ -335,22 +335,22 @@ public void process(Request request, org.eclipse.jetty.server.Response response, resultLatch.countDown(); }); - await().atMost(5, TimeUnit.SECONDS).until(listener1DemandRef::get, not(nullValue())); - await().atMost(5, TimeUnit.SECONDS).until(listener2DemandRef::get, not(nullValue())); - // Make both listeners progress in locksteps. int i = 0; while (resultLatch.getCount() > 0) { i++; + await().atMost(5, TimeUnit.SECONDS).until(listener1DemandRef::get, not(nullValue())); + await().atMost(5, TimeUnit.SECONDS).until(listener2DemandRef::get, not(nullValue())); + // Assert that no listener can progress for as long as both listeners did not demand. assertThat(listener1Chunks.get(), is(i)); assertThat(listener2Chunks.get(), is(i)); - listener2DemandRef.get().accept(1); + listener2DemandRef.getAndSet(null).accept(1); assertThat(listener1Chunks.get(), is(i)); assertThat(listener2Chunks.get(), is(i)); - listener1DemandRef.get().accept(1); + listener1DemandRef.getAndSet(null).accept(1); } assertTrue(resultLatch.await(5, TimeUnit.SECONDS)); diff --git a/tests/test-distribution/test-distribution-common/src/test/java/org/eclipse/jetty/tests/distribution/DistributionTests.java b/tests/test-distribution/test-distribution-common/src/test/java/org/eclipse/jetty/tests/distribution/DistributionTests.java index 7a59e532ce44..db626d15ad1b 100644 --- a/tests/test-distribution/test-distribution-common/src/test/java/org/eclipse/jetty/tests/distribution/DistributionTests.java +++ b/tests/test-distribution/test-distribution-common/src/test/java/org/eclipse/jetty/tests/distribution/DistributionTests.java @@ -1191,7 +1191,7 @@ public void testVirtualThreadPool() throws Exception int httpPort = distribution.freePort(); try (JettyHomeTester.Run run2 = distribution.start(List.of("jetty.http.selectors=1", "jetty.http.port=" + httpPort))) { - assertTrue(run2.awaitConsoleLogsFor("Started Server@", 10, TimeUnit.SECONDS)); + assertTrue(run2.awaitConsoleLogsFor("Started oejs.Server@", 10, TimeUnit.SECONDS)); startHttpClient(); ContentResponse response = client.newRequest("localhost", httpPort) From 0b6d6db40ae71b68b1adb23826820474f0a0a522 Mon Sep 17 00:00:00 2001 From: Simone Bordet Date: Tue, 6 Dec 2022 14:51:54 +0100 Subject: [PATCH 2/2] Updates after review. Signed-off-by: Simone Bordet --- .../java/org/eclipse/jetty/client/HttpReceiver.java | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/HttpReceiver.java b/jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/HttpReceiver.java index 3de9188c543a..588734af83c0 100644 --- a/jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/HttpReceiver.java +++ b/jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/HttpReceiver.java @@ -472,7 +472,9 @@ public void abort(HttpExchange exchange, Throwable failure, Promise pro if (LOG.isDebugEnabled()) LOG.debug("Invoking abort with {} on {}", failure, this); - assert exchange.isResponseComplete(); + // This method should be called only after calling HttpExchange.responseComplete(). + if (!exchange.isResponseComplete()) + throw new IllegalStateException(); invoker.run(() -> { @@ -674,7 +676,8 @@ public Content.Chunk read() if (currentChunk != null) { // There was a concurrent call to fail(). - current.release(); + if (current != null) + current.release(); return currentChunk; } currentChunk = Content.Chunk.next(current); @@ -764,6 +767,8 @@ private void invokeDemandCallback(boolean invoke) @Override public void fail(Throwable failure) { + if (LOG.isDebugEnabled()) + LOG.debug("Failing {}", this); boolean failed = error(failure); if (failed) HttpReceiver.this.failAndClose(failure); @@ -774,8 +779,7 @@ public void fail(Throwable failure) public boolean error(Throwable failure) { if (LOG.isDebugEnabled()) - LOG.debug("Failing {}", this); - + LOG.debug("Erroring {}", this); try (AutoLock ignored = lock.lock()) { if (currentChunk instanceof Content.Chunk.Error) @@ -784,7 +788,6 @@ public boolean error(Throwable failure) currentChunk.release(); currentChunk = Content.Chunk.from(failure); } - return true; }