Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixes #12266 - InvocationType improvements and cleanups. #12273

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import org.eclipse.jetty.client.transport.HttpDestination;
import org.eclipse.jetty.io.ClientConnectionFactory;
import org.eclipse.jetty.util.thread.Invocable;

/**
* {@link HttpClientTransport} represents what transport implementations should provide
Expand Down Expand Up @@ -100,4 +101,9 @@ public default void connect(SocketAddress address, Map<String, Object> context)
* @param factory the factory for ConnectionPool instances
*/
public void setConnectionPoolFactory(ConnectionPool.Factory factory);

public default Invocable.InvocationType getInvocationType(Connection connection)
{
return Invocable.InvocationType.BLOCKING;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -508,12 +508,6 @@ public void failed(Throwable x)
promise.failed(x);
}

@Override
public InvocationType getInvocationType()
{
return InvocationType.NON_BLOCKING;
}

@Override
public void onFillable()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import org.eclipse.jetty.io.AbstractConnection;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.util.Attachable;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.Promise;
import org.eclipse.jetty.util.thread.Sweeper;
import org.slf4j.Logger;
Expand All @@ -55,6 +56,7 @@ public class HttpConnectionOverHTTP extends AbstractConnection implements IConne
{
private static final Logger LOG = LoggerFactory.getLogger(HttpConnectionOverHTTP.class);

private final Callback fillableCallback = new FillableCallback();
private final AtomicBoolean closed = new AtomicBoolean();
private final AtomicInteger sweeps = new AtomicInteger();
private final Promise<Connection> promise;
Expand Down Expand Up @@ -188,7 +190,7 @@ public void setInitialize(boolean initialize)
public void onOpen()
{
super.onOpen();
fillInterested();
setFillInterest();
boolean initialize = isInitialize();
if (initialize)
{
Expand All @@ -210,6 +212,11 @@ public void onOpen()
}
}

void setFillInterest()
{
fillInterested(fillableCallback);
}

@Override
public boolean isClosed()
{
Expand Down Expand Up @@ -432,4 +439,26 @@ public String toString()
return HttpConnectionOverHTTP.this.toString();
}
}

private class FillableCallback implements Callback
{
@Override
public void succeeded()
{
onFillable();
}

@Override
public void failed(Throwable x)
{
onFillInterestedFailed(x);
}

@Override
public InvocationType getInvocationType()
{
HttpClientTransport transport = getHttpDestination().getHttpClient().getTransport();
return transport.getInvocationType(delegate);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -386,7 +386,7 @@ protected void fillInterested()
{
if (LOG.isDebugEnabled())
LOG.debug("Registering as fill interested in {}", this);
getHttpConnection().fillInterested();
getHttpConnection().setFillInterest();
}

private void shutdown()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import org.eclipse.jetty.io.RetainableByteBuffer;
import org.eclipse.jetty.util.Attachable;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.Promise;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -56,6 +57,7 @@ public class HttpConnectionOverFCGI extends AbstractConnection implements IConne
{
private static final Logger LOG = LoggerFactory.getLogger(HttpConnectionOverFCGI.class);

private final Callback fillableCallback = new FillableCallback();
private final ByteBufferPool networkByteBufferPool;
private final AtomicInteger requests = new AtomicInteger();
private final AtomicBoolean closed = new AtomicBoolean();
Expand Down Expand Up @@ -128,10 +130,15 @@ public SendFailure send(HttpExchange exchange)
public void onOpen()
{
super.onOpen();
fillInterested();
setFillInterest();
promise.succeeded(this);
}

void setFillInterest()
{
fillInterested(fillableCallback);
}

@Override
public void onFillable()
{
Expand Down Expand Up @@ -492,4 +499,25 @@ private enum State
{
STATUS, HEADERS, CONTENT, COMPLETE
}

private class FillableCallback implements Callback
{
@Override
public void succeeded()
{
onFillable();
}

@Override
public void failed(Throwable x)
{
onFillInterestedFailed(x);
}

@Override
public InvocationType getInvocationType()
{
return getHttpDestination().getHttpClient().getTransport().getInvocationType(delegate);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ void receive()
HttpConnectionOverFCGI httpConnection = getHttpChannel().getHttpConnection();
boolean setFillInterest = httpConnection.parseAndFill(true);
if (!hasContent() && setFillInterest)
httpConnection.fillInterested();
fillInterested(httpConnection);
}
else
{
Expand Down Expand Up @@ -86,7 +86,7 @@ public Content.Chunk read(boolean fillInterestIfNeeded)
if (chunk != null)
return chunk;
if (needFillInterest && fillInterestIfNeeded)
httpConnection.fillInterested();
fillInterested(httpConnection);
return null;
}

Expand Down Expand Up @@ -138,7 +138,12 @@ private void receiveNext()
HttpConnectionOverFCGI httpConnection = getHttpChannel().getHttpConnection();
boolean setFillInterest = httpConnection.parseAndFill(true);
if (!hasContent() && setFillInterest)
httpConnection.fillInterested();
fillInterested(httpConnection);
}

private void fillInterested(HttpConnectionOverFCGI httpConnection)
{
httpConnection.setFillInterest();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.eclipse.jetty.server.HttpChannel;
import org.eclipse.jetty.server.HttpConfiguration;
import org.eclipse.jetty.util.Attributes;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.StringUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -42,6 +43,7 @@ public class ServerFCGIConnection extends AbstractMetaDataConnection implements
{
private static final Logger LOG = LoggerFactory.getLogger(ServerFCGIConnection.class);

private final Callback fillableCallback = new FillableCallback();
private final HttpChannel.Factory httpChannelFactory = new HttpChannel.DefaultFactory();
private final Attributes attributes = new Lazy();
private final Connector connector;
Expand Down Expand Up @@ -160,7 +162,7 @@ public void clearAttributes()
public void onOpen()
{
super.onOpen();
fillInterested();
setFillInterest();
}

@Override
Expand Down Expand Up @@ -188,7 +190,7 @@ public void onFillable()
else if (read == 0)
{
releaseInputBuffer();
fillInterested();
setFillInterest();
return;
}
else
Expand Down Expand Up @@ -305,11 +307,16 @@ void onCompleted(Throwable failure)
{
releaseInputBuffer();
if (failure == null)
fillInterested();
setFillInterest();
else
getFlusher().shutdown();
}

private void setFillInterest()
{
fillInterested(fillableCallback);
}

@Override
public boolean onIdleExpired(TimeoutException timeoutException)
{
Expand Down Expand Up @@ -419,4 +426,25 @@ public void close()
}
super.close();
}

private class FillableCallback implements Callback
{
@Override
public void succeeded()
{
onFillable();
}

@Override
public void failed(Throwable x)
{
onFillInterestedFailed(x);
}

@Override
public InvocationType getInvocationType()
{
return getConnector().getServer().getInvocationType();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ public Runnable onTimeout(TimeoutException timeout, Promise<Boolean> promise)
promise.succeeded(true);
return null;
}
return new Invocable.ReadyTask(Invocable.InvocationType.NON_BLOCKING, () ->
return new Invocable.ReadyTask(getInvocationType(), () ->
{
boolean expire = connection.onIdleExpired(timeout);
if (expire)
Expand All @@ -78,7 +78,7 @@ public Runnable onTimeout(TimeoutException timeout, Promise<Boolean> promise)
@Override
public Runnable onFailure(Throwable failure, Callback callback)
{
return new Invocable.ReadyTask(Invocable.InvocationType.NON_BLOCKING, () ->
return new Invocable.ReadyTask(getInvocationType(), () ->
{
processFailure(failure);
close(failure);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ public void onNewStream(Stream stream)
@Override
public void onHeaders(Stream stream, HeadersFrame frame)
{
receiver.onHeaders(stream, frame);
offerTask(receiver.onHeaders(stream, frame));
}

@Override
Expand All @@ -197,28 +197,33 @@ public Stream.Listener onPush(Stream stream, PushPromiseFrame frame)
public void onDataAvailable(Stream stream)
{
HTTP2Channel.Client channel = (HTTP2Channel.Client)((HTTP2Stream)stream).getAttachment();
connection.offerTask(channel.onDataAvailable(), false);
offerTask(channel.onDataAvailable());
}

@Override
public void onReset(Stream stream, ResetFrame frame, Callback callback)
{
HTTP2Channel.Client channel = (HTTP2Channel.Client)((HTTP2Stream)stream).getAttachment();
connection.offerTask(channel.onReset(frame, callback), false);
offerTask(channel.onReset(frame, callback));
}

@Override
public void onIdleTimeout(Stream stream, TimeoutException x, Promise<Boolean> promise)
{
HTTP2Channel.Client channel = (HTTP2Channel.Client)((HTTP2Stream)stream).getAttachment();
connection.offerTask(channel.onTimeout(x, promise), false);
offerTask(channel.onTimeout(x, promise));
}

@Override
public void onFailure(Stream stream, int error, String reason, Throwable failure, Callback callback)
{
HTTP2Channel.Client channel = (HTTP2Channel.Client)((HTTP2Stream)stream).getAttachment();
connection.offerTask(channel.onFailure(failure, callback), false);
offerTask(channel.onFailure(failure, callback));
}

private void offerTask(Runnable task)
{
connection.offerTask(task, false);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import org.eclipse.jetty.http2.frames.HeadersFrame;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.thread.Invocable;
import org.eclipse.jetty.util.thread.Sweeper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -293,6 +294,11 @@ void offerTask(Runnable task, boolean dispatch)
connection.offerTask(task, dispatch);
}

Invocable.InvocationType getInvocationType()
{
return getHttpClient().getTransport().getInvocationType(this);
}

@Override
public String toString()
{
Expand Down
Loading
Loading