Skip to content

Commit

Permalink
Fixes #12266 - InvocationType improvements and cleanups.
Browse files Browse the repository at this point in the history
* Removed usages of `AbstractConnection.getInvocationType()`.
* Changed HTTP server-side Connection implementations to use `AbstractConnection.fillInterested(Callback)` with a callback that specifies the `InvocationType`, derived from the `Server`, which derives it from the `Handler` chain.
* Changed client-side receivers to use `AbstractConnection.fillInterested(Callback)` with a callback that specifies the `InvocationType`, derived from the `HttpClientTransport`.
* Introduced `HttpClientTransport.getInvocationType(Connection)`, so that client applications can specify whether they block or not.
* Made sure client-side HTTP/2 and HTTP/3 return tasks with the proper `InvocationType` to be run by the `ExecutionStrategy` when calling application code.
* HTTP3StreamConnection now uses an `EITHER` fillable callback to possibly process streams in parallel.
* `QuicStreamEndPoint` now uses a task to invoke `FillInterest.fillable()`, rather than invoking it directly, therefore honoring the `InvocationType` of callback set by the `Connection` associated with the `QuicStreamEndPoint`.

Signed-off-by: Simone Bordet <[email protected]>
  • Loading branch information
sbordet committed Sep 23, 2024
1 parent 3b1f38e commit b08b57e
Show file tree
Hide file tree
Showing 36 changed files with 852 additions and 429 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import org.eclipse.jetty.client.transport.HttpDestination;
import org.eclipse.jetty.io.ClientConnectionFactory;
import org.eclipse.jetty.util.thread.Invocable;

/**
* {@link HttpClientTransport} represents what transport implementations should provide
Expand Down Expand Up @@ -83,4 +84,9 @@ public interface HttpClientTransport extends ClientConnectionFactory
* @param factory the factory for ConnectionPool instances
*/
public void setConnectionPoolFactory(ConnectionPool.Factory factory);

public default Invocable.InvocationType getInvocationType(Connection connection)
{
return Invocable.InvocationType.BLOCKING;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -508,12 +508,6 @@ public void failed(Throwable x)
promise.failed(x);
}

@Override
public InvocationType getInvocationType()
{
return InvocationType.NON_BLOCKING;
}

@Override
public void onFillable()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import org.eclipse.jetty.io.AbstractConnection;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.util.Attachable;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.Promise;
import org.eclipse.jetty.util.thread.Sweeper;
import org.slf4j.Logger;
Expand All @@ -55,6 +56,7 @@ public class HttpConnectionOverHTTP extends AbstractConnection implements IConne
{
private static final Logger LOG = LoggerFactory.getLogger(HttpConnectionOverHTTP.class);

private final Callback fillableCallback = new FillableCallback();
private final AtomicBoolean closed = new AtomicBoolean();
private final AtomicInteger sweeps = new AtomicInteger();
private final Promise<Connection> promise;
Expand Down Expand Up @@ -188,7 +190,7 @@ public void setInitialize(boolean initialize)
public void onOpen()
{
super.onOpen();
fillInterested();
setFillInterest();
boolean initialize = isInitialize();
if (initialize)
{
Expand All @@ -210,6 +212,11 @@ public void onOpen()
}
}

void setFillInterest()
{
fillInterested(fillableCallback);
}

@Override
public boolean isClosed()
{
Expand Down Expand Up @@ -432,4 +439,26 @@ public String toString()
return HttpConnectionOverHTTP.this.toString();
}
}

private class FillableCallback implements Callback
{
@Override
public void succeeded()
{
onFillable();
}

@Override
public void failed(Throwable x)
{
onFillInterestedFailed(x);
}

@Override
public InvocationType getInvocationType()
{
HttpClientTransport transport = getHttpDestination().getHttpClient().getTransport();
return transport.getInvocationType(delegate);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -386,7 +386,7 @@ protected void fillInterested()
{
if (LOG.isDebugEnabled())
LOG.debug("Registering as fill interested in {}", this);
getHttpConnection().fillInterested();
getHttpConnection().setFillInterest();
}

private void shutdown()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import org.eclipse.jetty.io.RetainableByteBuffer;
import org.eclipse.jetty.util.Attachable;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.Promise;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -56,6 +57,7 @@ public class HttpConnectionOverFCGI extends AbstractConnection implements IConne
{
private static final Logger LOG = LoggerFactory.getLogger(HttpConnectionOverFCGI.class);

private final Callback fillableCallback = new FillableCallback();
private final ByteBufferPool networkByteBufferPool;
private final AtomicInteger requests = new AtomicInteger();
private final AtomicBoolean closed = new AtomicBoolean();
Expand Down Expand Up @@ -128,10 +130,15 @@ public SendFailure send(HttpExchange exchange)
public void onOpen()
{
super.onOpen();
fillInterested();
setFillInterest();
promise.succeeded(this);
}

void setFillInterest()
{
fillInterested(fillableCallback);
}

@Override
public void onFillable()
{
Expand Down Expand Up @@ -492,4 +499,25 @@ private enum State
{
STATUS, HEADERS, CONTENT, COMPLETE
}

private class FillableCallback implements Callback
{
@Override
public void succeeded()
{
onFillable();
}

@Override
public void failed(Throwable x)
{
onFillInterestedFailed(x);
}

@Override
public InvocationType getInvocationType()
{
return getHttpDestination().getHttpClient().getTransport().getInvocationType(delegate);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ void receive()
HttpConnectionOverFCGI httpConnection = getHttpChannel().getHttpConnection();
boolean setFillInterest = httpConnection.parseAndFill(true);
if (!hasContent() && setFillInterest)
httpConnection.fillInterested();
fillInterested(httpConnection);
}
else
{
Expand Down Expand Up @@ -86,7 +86,7 @@ public Content.Chunk read(boolean fillInterestIfNeeded)
if (chunk != null)
return chunk;
if (needFillInterest && fillInterestIfNeeded)
httpConnection.fillInterested();
fillInterested(httpConnection);
return null;
}

Expand Down Expand Up @@ -138,7 +138,12 @@ private void receiveNext()
HttpConnectionOverFCGI httpConnection = getHttpChannel().getHttpConnection();
boolean setFillInterest = httpConnection.parseAndFill(true);
if (!hasContent() && setFillInterest)
httpConnection.fillInterested();
fillInterested(httpConnection);
}

private void fillInterested(HttpConnectionOverFCGI httpConnection)
{
httpConnection.setFillInterest();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.eclipse.jetty.server.HttpChannel;
import org.eclipse.jetty.server.HttpConfiguration;
import org.eclipse.jetty.util.Attributes;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.StringUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -42,6 +43,7 @@ public class ServerFCGIConnection extends AbstractMetaDataConnection implements
{
private static final Logger LOG = LoggerFactory.getLogger(ServerFCGIConnection.class);

private final Callback fillableCallback = new FillableCallback();
private final HttpChannel.Factory httpChannelFactory = new HttpChannel.DefaultFactory();
private final Attributes attributes = new Lazy();
private final Connector connector;
Expand Down Expand Up @@ -160,7 +162,7 @@ public void clearAttributes()
public void onOpen()
{
super.onOpen();
fillInterested();
setFillInterest();
}

@Override
Expand Down Expand Up @@ -188,7 +190,7 @@ public void onFillable()
else if (read == 0)
{
releaseInputBuffer();
fillInterested();
setFillInterest();
return;
}
else
Expand Down Expand Up @@ -304,11 +306,16 @@ void onCompleted(Throwable failure)
{
releaseInputBuffer();
if (failure == null)
fillInterested();
setFillInterest();
else
getFlusher().shutdown();
}

private void setFillInterest()
{
fillInterested(fillableCallback);
}

@Override
public boolean onIdleExpired(TimeoutException timeoutException)
{
Expand Down Expand Up @@ -418,4 +425,25 @@ public void close()
}
super.close();
}

private class FillableCallback implements Callback
{
@Override
public void succeeded()
{
onFillable();
}

@Override
public void failed(Throwable x)
{
onFillInterestedFailed(x);
}

@Override
public InvocationType getInvocationType()
{
return getConnector().getServer().getInvocationType();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import org.eclipse.jetty.io.EofException;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.Promise;
import org.eclipse.jetty.util.thread.Invocable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -38,32 +37,26 @@ public ClientHTTP2StreamEndPoint(HTTP2Stream stream)
}

@Override
public Runnable onDataAvailable()
public void onDataAvailable()
{
// The InvocationType may change depending on the read callback.
return new Invocable.ReadyTask(getInvocationType(), this::processDataAvailable);
processDataAvailable();
}

@Override
public Runnable onReset(ResetFrame frame, Callback callback)
public void onReset(ResetFrame frame, Callback callback)
{
int error = frame.getError();
EofException failure = new EofException(ErrorCode.toString(error, "error_code_" + error));
return onFailure(failure, callback);
onFailure(failure, callback);
}

@Override
public Runnable onTimeout(TimeoutException timeout, Promise<Boolean> promise)
public void onTimeout(TimeoutException timeout, Promise<Boolean> promise)
{
if (LOG.isDebugEnabled())
LOG.debug("idle timeout on {}", this, timeout);
Connection connection = getConnection();
if (connection == null)
{
promise.succeeded(true);
return null;
}
return new Invocable.ReadyTask(Invocable.InvocationType.NON_BLOCKING, () ->
if (connection != null)
{
boolean expire = connection.onIdleExpired(timeout);
if (expire)
Expand All @@ -72,17 +65,18 @@ public Runnable onTimeout(TimeoutException timeout, Promise<Boolean> promise)
close(timeout);
}
promise.succeeded(expire);
});
}
else
{
promise.succeeded(true);
}
}

@Override
public Runnable onFailure(Throwable failure, Callback callback)
public void onFailure(Throwable failure, Callback callback)
{
return new Invocable.ReadyTask(Invocable.InvocationType.NON_BLOCKING, () ->
{
processFailure(failure);
close(failure);
callback.failed(failure);
});
processFailure(failure);
close(failure);
callback.failed(failure);
}
}
Loading

0 comments on commit b08b57e

Please sign in to comment.