diff --git a/jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/HttpClientTransport.java b/jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/HttpClientTransport.java index 45451fa2ead9..ce6648ef2113 100644 --- a/jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/HttpClientTransport.java +++ b/jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/HttpClientTransport.java @@ -19,6 +19,7 @@ import org.eclipse.jetty.client.transport.HttpDestination; import org.eclipse.jetty.io.ClientConnectionFactory; +import org.eclipse.jetty.util.thread.Invocable; /** * {@link HttpClientTransport} represents what transport implementations should provide @@ -100,4 +101,9 @@ public default void connect(SocketAddress address, Map context) * @param factory the factory for ConnectionPool instances */ public void setConnectionPoolFactory(ConnectionPool.Factory factory); + + public default Invocable.InvocationType getInvocationType(Connection connection) + { + return Invocable.InvocationType.BLOCKING; + } } diff --git a/jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/ProxyProtocolClientConnectionFactory.java b/jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/ProxyProtocolClientConnectionFactory.java index 4eef6cfdb796..e4ba9bfcc233 100644 --- a/jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/ProxyProtocolClientConnectionFactory.java +++ b/jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/ProxyProtocolClientConnectionFactory.java @@ -508,12 +508,6 @@ public void failed(Throwable x) promise.failed(x); } - @Override - public InvocationType getInvocationType() - { - return InvocationType.NON_BLOCKING; - } - @Override public void onFillable() { diff --git a/jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/transport/internal/HttpConnectionOverHTTP.java b/jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/transport/internal/HttpConnectionOverHTTP.java index 018fe81df0cc..50816a4b3d47 100644 --- a/jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/transport/internal/HttpConnectionOverHTTP.java +++ b/jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/transport/internal/HttpConnectionOverHTTP.java @@ -46,6 +46,7 @@ import org.eclipse.jetty.io.AbstractConnection; import org.eclipse.jetty.io.EndPoint; import org.eclipse.jetty.util.Attachable; +import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.Promise; import org.eclipse.jetty.util.thread.Sweeper; import org.slf4j.Logger; @@ -55,6 +56,7 @@ public class HttpConnectionOverHTTP extends AbstractConnection implements IConne { private static final Logger LOG = LoggerFactory.getLogger(HttpConnectionOverHTTP.class); + private final Callback fillableCallback = new FillableCallback(); private final AtomicBoolean closed = new AtomicBoolean(); private final AtomicInteger sweeps = new AtomicInteger(); private final Promise promise; @@ -188,7 +190,7 @@ public void setInitialize(boolean initialize) public void onOpen() { super.onOpen(); - fillInterested(); + setFillInterest(); boolean initialize = isInitialize(); if (initialize) { @@ -210,6 +212,11 @@ public void onOpen() } } + void setFillInterest() + { + fillInterested(fillableCallback); + } + @Override public boolean isClosed() { @@ -432,4 +439,26 @@ public String toString() return HttpConnectionOverHTTP.this.toString(); } } + + private class FillableCallback implements Callback + { + @Override + public void succeeded() + { + onFillable(); + } + + @Override + public void failed(Throwable x) + { + onFillInterestedFailed(x); + } + + @Override + public InvocationType getInvocationType() + { + HttpClientTransport transport = getHttpDestination().getHttpClient().getTransport(); + return transport.getInvocationType(delegate); + } + } } diff --git a/jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/transport/internal/HttpReceiverOverHTTP.java b/jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/transport/internal/HttpReceiverOverHTTP.java index f74dbc6c149f..5fd3167828be 100644 --- a/jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/transport/internal/HttpReceiverOverHTTP.java +++ b/jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/transport/internal/HttpReceiverOverHTTP.java @@ -386,7 +386,7 @@ protected void fillInterested() { if (LOG.isDebugEnabled()) LOG.debug("Registering as fill interested in {}", this); - getHttpConnection().fillInterested(); + getHttpConnection().setFillInterest(); } private void shutdown() diff --git a/jetty-core/jetty-fcgi/jetty-fcgi-client/src/main/java/org/eclipse/jetty/fcgi/client/transport/internal/HttpConnectionOverFCGI.java b/jetty-core/jetty-fcgi/jetty-fcgi-client/src/main/java/org/eclipse/jetty/fcgi/client/transport/internal/HttpConnectionOverFCGI.java index 4bdc222b125e..13ea1a072463 100644 --- a/jetty-core/jetty-fcgi/jetty-fcgi-client/src/main/java/org/eclipse/jetty/fcgi/client/transport/internal/HttpConnectionOverFCGI.java +++ b/jetty-core/jetty-fcgi/jetty-fcgi-client/src/main/java/org/eclipse/jetty/fcgi/client/transport/internal/HttpConnectionOverFCGI.java @@ -48,6 +48,7 @@ import org.eclipse.jetty.io.RetainableByteBuffer; import org.eclipse.jetty.util.Attachable; import org.eclipse.jetty.util.BufferUtil; +import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.Promise; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -56,6 +57,7 @@ public class HttpConnectionOverFCGI extends AbstractConnection implements IConne { private static final Logger LOG = LoggerFactory.getLogger(HttpConnectionOverFCGI.class); + private final Callback fillableCallback = new FillableCallback(); private final ByteBufferPool networkByteBufferPool; private final AtomicInteger requests = new AtomicInteger(); private final AtomicBoolean closed = new AtomicBoolean(); @@ -128,10 +130,15 @@ public SendFailure send(HttpExchange exchange) public void onOpen() { super.onOpen(); - fillInterested(); + setFillInterest(); promise.succeeded(this); } + void setFillInterest() + { + fillInterested(fillableCallback); + } + @Override public void onFillable() { @@ -492,4 +499,25 @@ private enum State { STATUS, HEADERS, CONTENT, COMPLETE } + + private class FillableCallback implements Callback + { + @Override + public void succeeded() + { + onFillable(); + } + + @Override + public void failed(Throwable x) + { + onFillInterestedFailed(x); + } + + @Override + public InvocationType getInvocationType() + { + return getHttpDestination().getHttpClient().getTransport().getInvocationType(delegate); + } + } } diff --git a/jetty-core/jetty-fcgi/jetty-fcgi-client/src/main/java/org/eclipse/jetty/fcgi/client/transport/internal/HttpReceiverOverFCGI.java b/jetty-core/jetty-fcgi/jetty-fcgi-client/src/main/java/org/eclipse/jetty/fcgi/client/transport/internal/HttpReceiverOverFCGI.java index 805cff268acc..2381bd6951c3 100644 --- a/jetty-core/jetty-fcgi/jetty-fcgi-client/src/main/java/org/eclipse/jetty/fcgi/client/transport/internal/HttpReceiverOverFCGI.java +++ b/jetty-core/jetty-fcgi/jetty-fcgi-client/src/main/java/org/eclipse/jetty/fcgi/client/transport/internal/HttpReceiverOverFCGI.java @@ -36,7 +36,7 @@ void receive() HttpConnectionOverFCGI httpConnection = getHttpChannel().getHttpConnection(); boolean setFillInterest = httpConnection.parseAndFill(true); if (!hasContent() && setFillInterest) - httpConnection.fillInterested(); + fillInterested(httpConnection); } else { @@ -86,7 +86,7 @@ public Content.Chunk read(boolean fillInterestIfNeeded) if (chunk != null) return chunk; if (needFillInterest && fillInterestIfNeeded) - httpConnection.fillInterested(); + fillInterested(httpConnection); return null; } @@ -138,7 +138,12 @@ private void receiveNext() HttpConnectionOverFCGI httpConnection = getHttpChannel().getHttpConnection(); boolean setFillInterest = httpConnection.parseAndFill(true); if (!hasContent() && setFillInterest) - httpConnection.fillInterested(); + fillInterested(httpConnection); + } + + private void fillInterested(HttpConnectionOverFCGI httpConnection) + { + httpConnection.setFillInterest(); } @Override diff --git a/jetty-core/jetty-fcgi/jetty-fcgi-server/src/main/java/org/eclipse/jetty/fcgi/server/internal/ServerFCGIConnection.java b/jetty-core/jetty-fcgi/jetty-fcgi-server/src/main/java/org/eclipse/jetty/fcgi/server/internal/ServerFCGIConnection.java index fdfbcf0d6781..f3a880711e43 100644 --- a/jetty-core/jetty-fcgi/jetty-fcgi-server/src/main/java/org/eclipse/jetty/fcgi/server/internal/ServerFCGIConnection.java +++ b/jetty-core/jetty-fcgi/jetty-fcgi-server/src/main/java/org/eclipse/jetty/fcgi/server/internal/ServerFCGIConnection.java @@ -34,6 +34,7 @@ import org.eclipse.jetty.server.HttpChannel; import org.eclipse.jetty.server.HttpConfiguration; import org.eclipse.jetty.util.Attributes; +import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.StringUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -42,6 +43,7 @@ public class ServerFCGIConnection extends AbstractMetaDataConnection implements { private static final Logger LOG = LoggerFactory.getLogger(ServerFCGIConnection.class); + private final Callback fillableCallback = new FillableCallback(); private final HttpChannel.Factory httpChannelFactory = new HttpChannel.DefaultFactory(); private final Attributes attributes = new Lazy(); private final Connector connector; @@ -160,7 +162,7 @@ public void clearAttributes() public void onOpen() { super.onOpen(); - fillInterested(); + setFillInterest(); } @Override @@ -188,7 +190,7 @@ public void onFillable() else if (read == 0) { releaseInputBuffer(); - fillInterested(); + setFillInterest(); return; } else @@ -305,11 +307,16 @@ void onCompleted(Throwable failure) { releaseInputBuffer(); if (failure == null) - fillInterested(); + setFillInterest(); else getFlusher().shutdown(); } + private void setFillInterest() + { + fillInterested(fillableCallback); + } + @Override public boolean onIdleExpired(TimeoutException timeoutException) { @@ -419,4 +426,25 @@ public void close() } super.close(); } + + private class FillableCallback implements Callback + { + @Override + public void succeeded() + { + onFillable(); + } + + @Override + public void failed(Throwable x) + { + onFillInterestedFailed(x); + } + + @Override + public InvocationType getInvocationType() + { + return getConnector().getServer().getInvocationType(); + } + } } diff --git a/jetty-core/jetty-http2/jetty-http2-client-transport/src/main/java/org/eclipse/jetty/http2/client/transport/internal/ClientHTTP2StreamEndPoint.java b/jetty-core/jetty-http2/jetty-http2-client-transport/src/main/java/org/eclipse/jetty/http2/client/transport/internal/ClientHTTP2StreamEndPoint.java index d3dc3150d447..f5c071a2cceb 100644 --- a/jetty-core/jetty-http2/jetty-http2-client-transport/src/main/java/org/eclipse/jetty/http2/client/transport/internal/ClientHTTP2StreamEndPoint.java +++ b/jetty-core/jetty-http2/jetty-http2-client-transport/src/main/java/org/eclipse/jetty/http2/client/transport/internal/ClientHTTP2StreamEndPoint.java @@ -63,7 +63,7 @@ public Runnable onTimeout(TimeoutException timeout, Promise promise) promise.succeeded(true); return null; } - return new Invocable.ReadyTask(Invocable.InvocationType.NON_BLOCKING, () -> + return new Invocable.ReadyTask(getInvocationType(), () -> { boolean expire = connection.onIdleExpired(timeout); if (expire) @@ -78,7 +78,7 @@ public Runnable onTimeout(TimeoutException timeout, Promise promise) @Override public Runnable onFailure(Throwable failure, Callback callback) { - return new Invocable.ReadyTask(Invocable.InvocationType.NON_BLOCKING, () -> + return new Invocable.ReadyTask(getInvocationType(), () -> { processFailure(failure); close(failure); diff --git a/jetty-core/jetty-http2/jetty-http2-client-transport/src/main/java/org/eclipse/jetty/http2/client/transport/internal/HttpChannelOverHTTP2.java b/jetty-core/jetty-http2/jetty-http2-client-transport/src/main/java/org/eclipse/jetty/http2/client/transport/internal/HttpChannelOverHTTP2.java index 7e1991b9b777..f5d38379105a 100644 --- a/jetty-core/jetty-http2/jetty-http2-client-transport/src/main/java/org/eclipse/jetty/http2/client/transport/internal/HttpChannelOverHTTP2.java +++ b/jetty-core/jetty-http2/jetty-http2-client-transport/src/main/java/org/eclipse/jetty/http2/client/transport/internal/HttpChannelOverHTTP2.java @@ -184,7 +184,7 @@ public void onNewStream(Stream stream) @Override public void onHeaders(Stream stream, HeadersFrame frame) { - receiver.onHeaders(stream, frame); + offerTask(receiver.onHeaders(stream, frame)); } @Override @@ -197,28 +197,33 @@ public Stream.Listener onPush(Stream stream, PushPromiseFrame frame) public void onDataAvailable(Stream stream) { HTTP2Channel.Client channel = (HTTP2Channel.Client)((HTTP2Stream)stream).getAttachment(); - connection.offerTask(channel.onDataAvailable(), false); + offerTask(channel.onDataAvailable()); } @Override public void onReset(Stream stream, ResetFrame frame, Callback callback) { HTTP2Channel.Client channel = (HTTP2Channel.Client)((HTTP2Stream)stream).getAttachment(); - connection.offerTask(channel.onReset(frame, callback), false); + offerTask(channel.onReset(frame, callback)); } @Override public void onIdleTimeout(Stream stream, TimeoutException x, Promise promise) { HTTP2Channel.Client channel = (HTTP2Channel.Client)((HTTP2Stream)stream).getAttachment(); - connection.offerTask(channel.onTimeout(x, promise), false); + offerTask(channel.onTimeout(x, promise)); } @Override public void onFailure(Stream stream, int error, String reason, Throwable failure, Callback callback) { HTTP2Channel.Client channel = (HTTP2Channel.Client)((HTTP2Stream)stream).getAttachment(); - connection.offerTask(channel.onFailure(failure, callback), false); + offerTask(channel.onFailure(failure, callback)); + } + + private void offerTask(Runnable task) + { + connection.offerTask(task, false); } } } diff --git a/jetty-core/jetty-http2/jetty-http2-client-transport/src/main/java/org/eclipse/jetty/http2/client/transport/internal/HttpConnectionOverHTTP2.java b/jetty-core/jetty-http2/jetty-http2-client-transport/src/main/java/org/eclipse/jetty/http2/client/transport/internal/HttpConnectionOverHTTP2.java index ade527797cb0..0a42cfc0ca8b 100644 --- a/jetty-core/jetty-http2/jetty-http2-client-transport/src/main/java/org/eclipse/jetty/http2/client/transport/internal/HttpConnectionOverHTTP2.java +++ b/jetty-core/jetty-http2/jetty-http2-client-transport/src/main/java/org/eclipse/jetty/http2/client/transport/internal/HttpConnectionOverHTTP2.java @@ -46,6 +46,7 @@ import org.eclipse.jetty.http2.frames.HeadersFrame; import org.eclipse.jetty.io.EndPoint; import org.eclipse.jetty.util.Callback; +import org.eclipse.jetty.util.thread.Invocable; import org.eclipse.jetty.util.thread.Sweeper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -293,6 +294,11 @@ void offerTask(Runnable task, boolean dispatch) connection.offerTask(task, dispatch); } + Invocable.InvocationType getInvocationType() + { + return getHttpClient().getTransport().getInvocationType(this); + } + @Override public String toString() { diff --git a/jetty-core/jetty-http2/jetty-http2-client-transport/src/main/java/org/eclipse/jetty/http2/client/transport/internal/HttpReceiverOverHTTP2.java b/jetty-core/jetty-http2/jetty-http2-client-transport/src/main/java/org/eclipse/jetty/http2/client/transport/internal/HttpReceiverOverHTTP2.java index ba6835cfaf43..c286c0bec21f 100644 --- a/jetty-core/jetty-http2/jetty-http2-client-transport/src/main/java/org/eclipse/jetty/http2/client/transport/internal/HttpReceiverOverHTTP2.java +++ b/jetty-core/jetty-http2/jetty-http2-client-transport/src/main/java/org/eclipse/jetty/http2/client/transport/internal/HttpReceiverOverHTTP2.java @@ -116,13 +116,11 @@ protected HttpChannelOverHTTP2 getHttpChannel() return (HttpChannelOverHTTP2)super.getHttpChannel(); } - void onHeaders(Stream stream, HeadersFrame frame) + Runnable onHeaders(Stream stream, HeadersFrame frame) { MetaData metaData = frame.getMetaData(); - if (metaData.isResponse()) - onResponse(stream, frame); - else - onTrailer(frame); + Runnable task = metaData.isResponse() ? () -> onResponse(stream, frame) : () -> onTrailer(frame); + return new Invocable.ReadyTask(getInvocationType(), task); } private void onResponse(Stream stream, HeadersFrame frame) @@ -224,7 +222,7 @@ public Runnable onDataAvailable() HttpExchange exchange = getHttpExchange(); if (exchange == null) return null; - return new Invocable.ReadyTask(Invocable.InvocationType.NON_BLOCKING, () -> responseContentAvailable(exchange)); + return new Invocable.ReadyTask(getInvocationType(), () -> responseContentAvailable(exchange)); } @Override @@ -236,7 +234,7 @@ public Runnable onReset(ResetFrame frame, Callback callback) callback.succeeded(); return null; } - return new Invocable.ReadyTask(Invocable.InvocationType.NON_BLOCKING, () -> + return new Invocable.ReadyTask(getInvocationType(), () -> { int error = frame.getError(); IOException failure = new IOException(ErrorCode.toString(error, "reset_code_" + error)); @@ -253,7 +251,7 @@ public Runnable onTimeout(TimeoutException failure, Promise promise) promise.succeeded(false); return null; } - return new Invocable.ReadyTask(Invocable.InvocationType.NON_BLOCKING, () -> + return new Invocable.ReadyTask(getInvocationType(), () -> promise.completeWith(exchange.getRequest().abort(failure)) ); } @@ -262,6 +260,11 @@ public Runnable onTimeout(TimeoutException failure, Promise promise) public Runnable onFailure(Throwable failure, Callback callback) { Promise promise = Promise.from(failed -> callback.succeeded(), callback::failed); - return new Invocable.ReadyTask(Invocable.InvocationType.NON_BLOCKING, () -> responseFailure(failure, promise)); + return new Invocable.ReadyTask(getInvocationType(), () -> responseFailure(failure, promise)); + } + + private Invocable.InvocationType getInvocationType() + { + return getHttpChannel().getHttpConnection().getInvocationType(); } } diff --git a/jetty-core/jetty-http2/jetty-http2-client/src/main/java/org/eclipse/jetty/http2/client/HTTP2ClientConnectionFactory.java b/jetty-core/jetty-http2/jetty-http2-client/src/main/java/org/eclipse/jetty/http2/client/HTTP2ClientConnectionFactory.java index dc2469e2d897..c8eb0418608d 100644 --- a/jetty-core/jetty-http2/jetty-http2-client/src/main/java/org/eclipse/jetty/http2/client/HTTP2ClientConnectionFactory.java +++ b/jetty-core/jetty-http2/jetty-http2-client/src/main/java/org/eclipse/jetty/http2/client/HTTP2ClientConnectionFactory.java @@ -173,12 +173,6 @@ public void failed(Throwable x) close(); promise.failed(x); } - - @Override - public InvocationType getInvocationType() - { - return InvocationType.NON_BLOCKING; - } } private static class ConnectionListener implements Connection.Listener diff --git a/jetty-core/jetty-http3/jetty-http3-client-transport/src/main/java/org/eclipse/jetty/http3/client/transport/internal/HttpChannelOverHTTP3.java b/jetty-core/jetty-http3/jetty-http3-client-transport/src/main/java/org/eclipse/jetty/http3/client/transport/internal/HttpChannelOverHTTP3.java index bf325c78de34..1453dff0d71a 100644 --- a/jetty-core/jetty-http3/jetty-http3-client-transport/src/main/java/org/eclipse/jetty/http3/client/transport/internal/HttpChannelOverHTTP3.java +++ b/jetty-core/jetty-http3/jetty-http3-client-transport/src/main/java/org/eclipse/jetty/http3/client/transport/internal/HttpChannelOverHTTP3.java @@ -22,9 +22,12 @@ import org.eclipse.jetty.http3.HTTP3ErrorCode; import org.eclipse.jetty.http3.api.Stream; import org.eclipse.jetty.http3.client.HTTP3SessionClient; +import org.eclipse.jetty.http3.frames.HeadersFrame; +import org.eclipse.jetty.util.Promise; public class HttpChannelOverHTTP3 extends HttpChannel { + private final Stream.Client.Listener listener = new Listener(); private final HttpConnectionOverHTTP3 connection; private final HTTP3SessionClient session; private final HttpSenderOverHTTP3 sender; @@ -52,7 +55,7 @@ public HTTP3SessionClient getSession() public Stream.Client.Listener getStreamListener() { - return receiver; + return listener; } @Override @@ -105,7 +108,7 @@ public void exchangeTerminated(HttpExchange exchange, Result result) public void release() { setStream(null); - connection.release(this); + getHttpConnection().release(this); } @Override @@ -116,4 +119,48 @@ public String toString() sender, receiver); } + + private class Listener implements Stream.Client.Listener + { + @Override + public void onNewStream(Stream.Client stream) + { + setStream(stream); + } + + @Override + public void onResponse(Stream.Client stream, HeadersFrame frame) + { + offerTask(receiver.onResponse(frame)); + } + + @Override + public void onDataAvailable(Stream.Client stream) + { + offerTask(receiver.onDataAvailable()); + } + + @Override + public void onTrailer(Stream.Client stream, HeadersFrame frame) + { + offerTask(receiver.onTrailer(frame)); + } + + @Override + public void onIdleTimeout(Stream.Client stream, Throwable failure, Promise promise) + { + offerTask(receiver.onIdleTimeout(failure, promise)); + } + + @Override + public void onFailure(Stream.Client stream, long error, Throwable failure) + { + offerTask(receiver.onFailure(failure)); + } + + private void offerTask(Runnable task) + { + getSession().getProtocolSession().offer(task, false); + } + } } diff --git a/jetty-core/jetty-http3/jetty-http3-client-transport/src/main/java/org/eclipse/jetty/http3/client/transport/internal/HttpConnectionOverHTTP3.java b/jetty-core/jetty-http3/jetty-http3-client-transport/src/main/java/org/eclipse/jetty/http3/client/transport/internal/HttpConnectionOverHTTP3.java index 4f9e92910d74..a179460fc762 100644 --- a/jetty-core/jetty-http3/jetty-http3-client-transport/src/main/java/org/eclipse/jetty/http3/client/transport/internal/HttpConnectionOverHTTP3.java +++ b/jetty-core/jetty-http3/jetty-http3-client-transport/src/main/java/org/eclipse/jetty/http3/client/transport/internal/HttpConnectionOverHTTP3.java @@ -32,6 +32,7 @@ import org.eclipse.jetty.http3.client.HTTP3SessionClient; import org.eclipse.jetty.io.EndPoint; import org.eclipse.jetty.quic.common.QuicSession; +import org.eclipse.jetty.util.thread.Invocable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -163,4 +164,9 @@ public boolean onIdleTimeout(long idleTimeout, Throwable failure) close(failure); return false; } + + Invocable.InvocationType getInvocationType() + { + return getHttpClient().getTransport().getInvocationType(this); + } } diff --git a/jetty-core/jetty-http3/jetty-http3-client-transport/src/main/java/org/eclipse/jetty/http3/client/transport/internal/HttpReceiverOverHTTP3.java b/jetty-core/jetty-http3/jetty-http3-client-transport/src/main/java/org/eclipse/jetty/http3/client/transport/internal/HttpReceiverOverHTTP3.java index 660fad9b9bee..6d3e6784ee9b 100644 --- a/jetty-core/jetty-http3/jetty-http3-client-transport/src/main/java/org/eclipse/jetty/http3/client/transport/internal/HttpReceiverOverHTTP3.java +++ b/jetty-core/jetty-http3/jetty-http3-client-transport/src/main/java/org/eclipse/jetty/http3/client/transport/internal/HttpReceiverOverHTTP3.java @@ -27,10 +27,11 @@ import org.eclipse.jetty.http3.frames.HeadersFrame; import org.eclipse.jetty.io.Content; import org.eclipse.jetty.util.Promise; +import org.eclipse.jetty.util.thread.Invocable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class HttpReceiverOverHTTP3 extends HttpReceiver implements Stream.Client.Listener +public class HttpReceiverOverHTTP3 extends HttpReceiver { private static final Logger LOG = LoggerFactory.getLogger(HttpReceiverOverHTTP3.class); @@ -87,75 +88,74 @@ protected HttpChannelOverHTTP3 getHttpChannel() return (HttpChannelOverHTTP3)super.getHttpChannel(); } - @Override - public void onNewStream(Stream.Client stream) - { - getHttpChannel().setStream(stream); - } - - @Override - public void onResponse(Stream.Client stream, HeadersFrame frame) + Runnable onResponse(HeadersFrame frame) { HttpExchange exchange = getHttpExchange(); if (exchange == null) - return; + return null; - HttpResponse httpResponse = exchange.getResponse(); - MetaData.Response response = (MetaData.Response)frame.getMetaData(); - httpResponse.version(response.getHttpVersion()).status(response.getStatus()).reason(response.getReason()); + return new Invocable.ReadyTask(getInvocationType(), () -> + { + HttpResponse httpResponse = exchange.getResponse(); + MetaData.Response response = (MetaData.Response)frame.getMetaData(); + httpResponse.version(response.getHttpVersion()).status(response.getStatus()).reason(response.getReason()); - responseBegin(exchange); + responseBegin(exchange); - HttpFields headers = response.getHttpFields(); - for (HttpField header : headers) - { - responseHeader(exchange, header); - } + HttpFields headers = response.getHttpFields(); + for (HttpField header : headers) + { + responseHeader(exchange, header); + } - // TODO: add support for HttpMethod.CONNECT. + // TODO: add support for HttpMethod.CONNECT. - responseHeaders(exchange); + responseHeaders(exchange); + }); } - @Override - public void onDataAvailable(Stream.Client stream) + Runnable onDataAvailable() { if (LOG.isDebugEnabled()) LOG.debug("Data available notification in {}", this); HttpExchange exchange = getHttpExchange(); if (exchange == null) - return; + return null; - responseContentAvailable(exchange); + return new Invocable.ReadyTask(getInvocationType(), () -> responseContentAvailable(exchange)); } - @Override - public void onTrailer(Stream.Client stream, HeadersFrame frame) + Runnable onTrailer(HeadersFrame frame) { HttpExchange exchange = getHttpExchange(); if (exchange == null) - return; + return null; HttpFields trailers = frame.getMetaData().getHttpFields(); trailers.forEach(exchange.getResponse()::trailer); - responseSuccess(exchange, null); + return new Invocable.ReadyTask(getInvocationType(), () -> responseSuccess(exchange, null)); } - @Override - public void onIdleTimeout(Stream.Client stream, Throwable failure, Promise promise) + Runnable onIdleTimeout(Throwable failure, Promise promise) { HttpExchange exchange = getHttpExchange(); - if (exchange != null) - exchange.abort(failure, Promise.from(aborted -> promise.succeeded(!aborted), promise::failed)); - else + if (exchange == null) + { promise.succeeded(false); + return null; + } + return new Invocable.ReadyTask(getInvocationType(), () -> exchange.abort(failure, Promise.from(aborted -> promise.succeeded(!aborted), promise::failed))); } - @Override - public void onFailure(Stream.Client stream, long error, Throwable failure) + Runnable onFailure(Throwable failure) + { + return new Invocable.ReadyTask(getInvocationType(), () -> responseFailure(failure, Promise.noop())); + } + + private Invocable.InvocationType getInvocationType() { - responseFailure(failure, Promise.noop()); + return getHttpChannel().getHttpConnection().getInvocationType(); } } diff --git a/jetty-core/jetty-http3/jetty-http3-common/src/main/java/org/eclipse/jetty/http3/HTTP3StreamConnection.java b/jetty-core/jetty-http3/jetty-http3-common/src/main/java/org/eclipse/jetty/http3/HTTP3StreamConnection.java index 52b0bd715e9d..139999739d67 100644 --- a/jetty-core/jetty-http3/jetty-http3-common/src/main/java/org/eclipse/jetty/http3/HTTP3StreamConnection.java +++ b/jetty-core/jetty-http3/jetty-http3-common/src/main/java/org/eclipse/jetty/http3/HTTP3StreamConnection.java @@ -30,6 +30,7 @@ import org.eclipse.jetty.io.ByteBufferPool; import org.eclipse.jetty.io.RetainableByteBuffer; import org.eclipse.jetty.quic.common.QuicStreamEndPoint; +import org.eclipse.jetty.util.Callback; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -39,6 +40,7 @@ public abstract class HTTP3StreamConnection extends AbstractConnection // An empty DATA frame is the sequence of bytes [0x0, 0x0]. private static final ByteBuffer EMPTY_DATA_FRAME = ByteBuffer.allocate(2); + private final Callback fillableCallback = new FillableCallback(); private final AtomicReference action = new AtomicReference<>(); private final ByteBufferPool bufferPool; private final MessageParser parser; @@ -87,7 +89,7 @@ void setStream(HTTP3Stream stream) public void onOpen() { super.onOpen(); - fillInterested(); + setFillInterest(); } @Override @@ -219,7 +221,7 @@ private void processNonDataFrames() // No bytes left in the buffer, but there is demand. // Set fill interest to call the application when bytes arrive. tryReleaseInputBuffer(false); - fillInterested(); + setFillInterest(); } } @@ -321,7 +323,7 @@ private MessageParser.Result parseAndFill(boolean setFillInterest) throws IOExce } if (setFillInterest) - fillInterested(); + setFillInterest(); } return MessageParser.Result.NO_FRAME; @@ -335,6 +337,11 @@ private MessageParser.Result parseAndFill(boolean setFillInterest) throws IOExce } } + private void setFillInterest() + { + fillInterested(fillableCallback); + } + private int fill(ByteBuffer byteBuffer) throws IOException { return getEndPoint().fill(byteBuffer); @@ -481,4 +488,25 @@ public void onData(long streamId, DataFrame frame) throw new IllegalStateException(); } } + + private class FillableCallback implements Callback + { + @Override + public void succeeded() + { + onFillable(); + } + + @Override + public void failed(Throwable x) + { + onFillInterestedFailed(x); + } + + @Override + public InvocationType getInvocationType() + { + return InvocationType.EITHER; + } + } } diff --git a/jetty-core/jetty-http3/jetty-http3-common/src/main/java/org/eclipse/jetty/http3/InstructionStreamConnection.java b/jetty-core/jetty-http3/jetty-http3-common/src/main/java/org/eclipse/jetty/http3/InstructionStreamConnection.java index 22f1a4e2d20d..213760a10fc4 100644 --- a/jetty-core/jetty-http3/jetty-http3-common/src/main/java/org/eclipse/jetty/http3/InstructionStreamConnection.java +++ b/jetty-core/jetty-http3/jetty-http3-common/src/main/java/org/eclipse/jetty/http3/InstructionStreamConnection.java @@ -24,12 +24,15 @@ import org.eclipse.jetty.io.EndPoint; import org.eclipse.jetty.io.RetainableByteBuffer; import org.eclipse.jetty.util.BufferUtil; +import org.eclipse.jetty.util.Callback; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public abstract class InstructionStreamConnection extends AbstractConnection implements Connection.UpgradeTo { private static final Logger LOG = LoggerFactory.getLogger(InstructionStreamConnection.class); + + private final Callback fillableCallback = new FillableCallback(); private final ByteBufferPool bufferPool; private final ParserListener listener; private boolean useInputDirectByteBuffers = true; @@ -70,7 +73,7 @@ public void onOpen() if (buffer != null && buffer.hasRemaining()) onFillable(); else - fillInterested(); + setFillInterest(); } @Override @@ -95,7 +98,7 @@ public void onFillable() { buffer.release(); buffer = null; - fillInterested(); + setFillInterest(); break; } else if (filled < 0) @@ -117,6 +120,11 @@ else if (filled < 0) } } + private void setFillInterest() + { + fillInterested(fillableCallback); + } + private void fail(long errorCode, String message, Throwable failure) { buffer.release(); @@ -139,4 +147,25 @@ protected void notifySessionFailure(long error, String reason, Throwable failure } protected abstract void parseInstruction(ByteBuffer buffer) throws QpackException; + + private class FillableCallback implements Callback + { + @Override + public void succeeded() + { + onFillable(); + } + + @Override + public void failed(Throwable x) + { + onFillInterestedFailed(x); + } + + @Override + public InvocationType getInvocationType() + { + return InvocationType.NON_BLOCKING; + } + } } diff --git a/jetty-core/jetty-http3/jetty-http3-common/src/main/java/org/eclipse/jetty/http3/UnidirectionalStreamConnection.java b/jetty-core/jetty-http3/jetty-http3-common/src/main/java/org/eclipse/jetty/http3/UnidirectionalStreamConnection.java index 67e8bdc644a2..a16fbf5915b3 100644 --- a/jetty-core/jetty-http3/jetty-http3-common/src/main/java/org/eclipse/jetty/http3/UnidirectionalStreamConnection.java +++ b/jetty-core/jetty-http3/jetty-http3-common/src/main/java/org/eclipse/jetty/http3/UnidirectionalStreamConnection.java @@ -28,6 +28,7 @@ import org.eclipse.jetty.io.RetainableByteBuffer; import org.eclipse.jetty.quic.common.QuicStreamEndPoint; import org.eclipse.jetty.quic.common.StreamType; +import org.eclipse.jetty.util.Callback; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -35,6 +36,7 @@ public class UnidirectionalStreamConnection extends AbstractConnection implement { private static final Logger LOG = LoggerFactory.getLogger(UnidirectionalStreamConnection.class); + private final Callback fillableCallback = new FillableCallback(); private final ByteBufferPool bufferPool; private final QpackEncoder encoder; private final QpackDecoder decoder; @@ -72,7 +74,7 @@ public void setUseInputDirectByteBuffers(boolean useInputDirectByteBuffers) public void onOpen() { super.onOpen(); - fillInterested(); + setFillInterest(); } @Override @@ -109,7 +111,7 @@ public void onFillable() else if (filled == 0) { buffer.release(); - fillInterested(); + setFillInterest(); break; } else @@ -131,6 +133,11 @@ else if (filled == 0) } } + private void setFillInterest() + { + fillInterested(fillableCallback); + } + private void detectAndUpgrade(long streamType) { if (streamType == ControlStreamConnection.STREAM_TYPE) @@ -177,4 +184,25 @@ else if (streamType == DecoderStreamConnection.STREAM_TYPE) } } } + + private class FillableCallback implements Callback + { + @Override + public void succeeded() + { + onFillable(); + } + + @Override + public void failed(Throwable x) + { + onFillInterestedFailed(x); + } + + @Override + public InvocationType getInvocationType() + { + return InvocationType.NON_BLOCKING; + } + } } diff --git a/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/AbstractConnection.java b/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/AbstractConnection.java index 3f40f11c3902..426835d16568 100644 --- a/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/AbstractConnection.java +++ b/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/AbstractConnection.java @@ -52,12 +52,10 @@ protected AbstractConnection(EndPoint endPoint, Executor executor) _readCallback = new ReadCallback(); } - @Deprecated @Override - public InvocationType getInvocationType() + @Deprecated(since = "12.0.0", forRemoval = true) + public Invocable.InvocationType getInvocationType() { - // TODO consider removing the #fillInterested method from the connection and only use #fillInterestedCallback - // so a connection need not be Invocable return Invocable.super.getInvocationType(); } @@ -139,10 +137,21 @@ protected void failedCallback(final Callback callback, final Throwable x) * @see #onFillable() */ public void fillInterested() + { + fillInterested(_readCallback); + } + + /** + *

Registers read interest with the given callback.

+ *

When read readiness is signaled, the callback will be completed.

+ * + * @param callback the callback to complete when read readiness is signaled + */ + public void fillInterested(Callback callback) { if (LOG.isDebugEnabled()) - LOG.debug("fillInterested {}", this); - getEndPoint().fillInterested(_readCallback); + LOG.debug("fillInterested {} {}", callback, this); + getEndPoint().fillInterested(callback); } public void tryFillInterested(Callback callback) @@ -167,10 +176,10 @@ public boolean isFillInterested() * * @param cause the exception that caused the failure */ - protected void onFillInterestedFailed(Throwable cause) + public void onFillInterestedFailed(Throwable cause) { if (LOG.isDebugEnabled()) - LOG.debug("{} onFillInterestedFailed {}", this, cause); + LOG.debug("{} onFillInterestedFailed", this, cause); if (_endPoint.isOpen()) { boolean close = true; @@ -333,11 +342,5 @@ public String toString() { return String.format("%s@%x{%s}", getClass().getSimpleName(), hashCode(), AbstractConnection.this); } - - @Override - public InvocationType getInvocationType() - { - return AbstractConnection.this.getInvocationType(); - } } } diff --git a/jetty-core/jetty-quic/jetty-quic-common/src/main/java/org/eclipse/jetty/quic/common/QuicStreamEndPoint.java b/jetty-core/jetty-quic/jetty-quic-common/src/main/java/org/eclipse/jetty/quic/common/QuicStreamEndPoint.java index a86d7cd011cc..7c9266130ac9 100644 --- a/jetty-core/jetty-quic/jetty-quic-common/src/main/java/org/eclipse/jetty/quic/common/QuicStreamEndPoint.java +++ b/jetty-core/jetty-quic/jetty-quic-common/src/main/java/org/eclipse/jetty/quic/common/QuicStreamEndPoint.java @@ -29,6 +29,7 @@ import org.eclipse.jetty.io.WriteFlusher; import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.Callback; +import org.eclipse.jetty.util.thread.Invocable; import org.eclipse.jetty.util.thread.Scheduler; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -260,34 +261,35 @@ public boolean onReadable() boolean interested = isFillInterested(); if (LOG.isDebugEnabled()) LOG.debug("stream #{} is readable, processing: {}", streamId, interested); + if (interested) { - getFillInterest().fillable(); + Invocable.ReadyTask task = new Invocable.ReadyTask(getFillInterest().getCallbackInvocationType(), getFillInterest()::fillable); + getQuicSession().getProtocolSession().offer(task, false); + return true; } - else + + if (isStreamFinished()) { - if (isStreamFinished()) + // Check if the stream was finished normally. + try + { + fill(BufferUtil.EMPTY_BUFFER); + } + catch (EOFException x) + { + // Got reset. + getFillInterest().onFail(x); + getQuicSession().onFailure(x); + } + catch (Throwable x) { - // Check if the stream was finished normally. - try - { - fill(BufferUtil.EMPTY_BUFFER); - } - catch (EOFException x) - { - // Got reset. - getFillInterest().onFail(x); - getQuicSession().onFailure(x); - } - catch (Throwable x) - { - EofException e = new EofException(x); - getFillInterest().onFail(e); - getQuicSession().onFailure(e); - } + EofException e = new EofException(x); + getFillInterest().onFail(e); + getQuicSession().onFailure(e); } } - return interested; + return false; } @Override diff --git a/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/internal/HttpConnection.java b/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/internal/HttpConnection.java index bb291501a4ae..7eeb5341d7f6 100644 --- a/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/internal/HttpConnection.java +++ b/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/internal/HttpConnection.java @@ -88,6 +88,7 @@ public class HttpConnection extends AbstractMetaDataConnection implements Runnab private static final ThreadLocal __currentConnection = new ThreadLocal<>(); private static final AtomicLong __connectionIdGenerator = new AtomicLong(); + private final Callback fillableCallback = new FillableCallback(); private final TunnelSupport _tunnelSupport = new TunnelSupportOverHTTP1(); private final AtomicLong _streamIdGenerator = new AtomicLong(); private final long _id; @@ -154,12 +155,6 @@ public HttpConnection(HttpConfiguration configuration, Connector connector, EndP LOG.debug("New HTTP Connection {}", this); } - @Override - public InvocationType getInvocationType() - { - return getServer().getInvocationType(); - } - /** * @deprecated No replacement, no longer used within {@link HttpConnection}, will be removed in Jetty 12.1.0 */ @@ -432,7 +427,7 @@ else if (filled == 0) { assert isRequestBufferEmpty(); releaseRequestBuffer(); - fillInterested(); + setFillInterest(); break; } else if (filled < 0) @@ -579,7 +574,7 @@ private boolean upgrade(HttpStreamOverHTTP1 stream) } @Override - protected void onFillInterestedFailed(Throwable cause) + public void onFillInterestedFailed(Throwable cause) { _parser.close(); super.onFillInterestedFailed(cause); @@ -610,20 +605,20 @@ public void onOpen() { super.onOpen(); if (isRequestBufferEmpty()) - fillInterested(); + setFillInterest(); else getExecutor().execute(this); } - @Override - public void run() + private void setFillInterest() { - onFillable(); + fillInterested(fillableCallback); } - public void asyncReadFillInterested() + @Override + public void run() { - getEndPoint().tryFillInterested(_demandContentCallback); + onFillable(); } @Override @@ -1631,4 +1626,25 @@ public String getReason() return getMessage(); } } + + private class FillableCallback implements Callback + { + @Override + public void succeeded() + { + onFillable(); + } + + @Override + public void failed(Throwable x) + { + onFillInterestedFailed(x); + } + + @Override + public InvocationType getInvocationType() + { + return getServer().getInvocationType(); + } + } } diff --git a/jetty-core/jetty-tests/jetty-test-client-transports/src/test/java/org/eclipse/jetty/test/client/transport/VirtualThreadsTest.java b/jetty-core/jetty-tests/jetty-test-client-transports/src/test/java/org/eclipse/jetty/test/client/transport/VirtualThreadsTest.java index 6ac723ca3962..e8e0bb2468e6 100644 --- a/jetty-core/jetty-tests/jetty-test-client-transports/src/test/java/org/eclipse/jetty/test/client/transport/VirtualThreadsTest.java +++ b/jetty-core/jetty-tests/jetty-test-client-transports/src/test/java/org/eclipse/jetty/test/client/transport/VirtualThreadsTest.java @@ -13,24 +13,33 @@ package org.eclipse.jetty.test.client.transport; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; import org.eclipse.jetty.client.ContentResponse; +import org.eclipse.jetty.client.Result; import org.eclipse.jetty.http.HttpStatus; +import org.eclipse.jetty.io.Content; import org.eclipse.jetty.server.Handler; import org.eclipse.jetty.server.Request; import org.eclipse.jetty.server.Response; import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.VirtualThreads; import org.eclipse.jetty.util.thread.ThreadPool; +import org.eclipse.jetty.util.thread.VirtualThreadPool; import org.junit.jupiter.api.Assumptions; import org.junit.jupiter.api.condition.DisabledForJreRange; import org.junit.jupiter.api.condition.JRE; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.MethodSource; +import static org.awaitility.Awaitility.await; +import static org.hamcrest.Matchers.notNullValue; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; @DisabledForJreRange(max = JRE.JAVA_18) public class VirtualThreadsTest extends AbstractTest @@ -71,4 +80,55 @@ public boolean handle(Request request, Response response, Callback callback) assertEquals(HttpStatus.OK_200, response.getStatus(), " for transport " + transport); } + + @ParameterizedTest + @MethodSource("transports") + public void testClientListenersInvokedOnVirtualThread(Transport transport) throws Exception + { + startServer(transport, new Handler.Abstract() + { + @Override + public boolean handle(Request request, Response response, Callback callback) throws Exception + { + // Send only the headers. + response.write(false, null, Callback.NOOP); + // Wait to force the client to invoke the content + // callback separately from the headers callback. + Thread.sleep(500); + // Send the content. + Content.Sink.write(response, true, "hello", callback); + return true; + } + }); + + prepareClient(transport); + VirtualThreads.Configurable executor = (VirtualThreads.Configurable)client.getExecutor(); + VirtualThreadPool vtp = new VirtualThreadPool(); + vtp.setName("green-"); + executor.setVirtualThreadsExecutor(vtp); + client.start(); + + for (int i = 0; i < 2; ++i) + { + AtomicReference resultRef = new AtomicReference<>(); + ConcurrentLinkedQueue queue = new ConcurrentLinkedQueue<>(); + Consumer verify = name -> queue.offer((VirtualThreads.isVirtualThread() ? "virtual" : "platform") + "-" + name); + client.newRequest(newURI(transport)) + .onResponseBegin(r -> verify.accept("begin")) + .onResponseHeaders(r -> verify.accept("headers")) + .onResponseContent((r, b) -> verify.accept("content")) + .onResponseSuccess(r -> verify.accept("success")) + .onComplete(r -> verify.accept("complete")) + .send(r -> + { + verify.accept("send"); + resultRef.set(r); + }); + + Result result = await().atMost(5, TimeUnit.SECONDS).until(resultRef::get, notNullValue()); + assertTrue(result.isSucceeded()); + assertEquals(HttpStatus.OK_200, result.getResponse().getStatus()); + queue.forEach(event -> assertTrue(event.startsWith("virtual"), event)); + } + } } diff --git a/jetty-core/jetty-util/src/main/java/org/eclipse/jetty/util/thread/Invocable.java b/jetty-core/jetty-util/src/main/java/org/eclipse/jetty/util/thread/Invocable.java index 19586df42bf4..289cd18ac08b 100644 --- a/jetty-core/jetty-util/src/main/java/org/eclipse/jetty/util/thread/Invocable.java +++ b/jetty-core/jetty-util/src/main/java/org/eclipse/jetty/util/thread/Invocable.java @@ -44,29 +44,38 @@ public interface Invocable enum InvocationType { /** - *

Invoking the {@link Invocable} may block the invoker thread, + *

Invoking the task may block the invoker thread, * and the invocation may be performed immediately (possibly blocking * the invoker thread) or deferred to a later time, for example - * by submitting the {@code Invocable} to a thread pool.

- *

This invocation type is suitable for {@code Invocable}s that + * by submitting the task to a thread pool.

+ *

This invocation type is suitable for tasks that * call application code, for example to process an HTTP request.

*/ BLOCKING, /** - *

Invoking the {@link Invocable} does not block the invoker thread, + *

Invoking the task does not block the invoker thread, * and the invocation may be performed immediately in the invoker thread.

- *

This invocation type is suitable for {@code Invocable}s that + *

This invocation type is suitable for tasks that * call implementation code that is guaranteed to never block the * invoker thread.

*/ NON_BLOCKING, /** - *

Invoking the {@link Invocable} may block the invoker thread, - * but the invocation cannot be deferred to a later time, differently + *

Invoking the task does not block the invoker thread, + * and the invocation may be performed immediately in the invoker thread.

+ *

The thread that produced the task may dispatch another + * thread to resume production, and then invoke the task, differently + * from {@link #NON_BLOCKING} which does not dispatch production to + * another thread.

+ *

The invocation cannot be deferred to a later time, differently * from {@link #BLOCKING}.

- *

This invocation type is suitable for {@code Invocable}s that - * themselves perform the non-deferrable action in a non-blocking way, - * thus advancing a possibly stalled system.

+ *

A series of {@code NON_BLOCKING} tasks is run sequentially, + * while a series of {@code EITHER} tasks may be run in parallel, + * if there are threads available to resume task production.

+ *

This invocation type is suitable for tasks that + * perform the non-deferrable action in a non-blocking way, + * hinting that may be run in parallel, for example when each task + * processes a different connection or a different stream.

*/ EITHER }