Skip to content

Commit

Permalink
Fixes #4855 - Occasional h2spec failures on CI
Browse files Browse the repository at this point in the history
Completely rewritten `HttpTransportOverHTTP2.TransportCallback`.
The rewrite handles correctly asynchronous failures that now are executed
sequentially (and not concurrently) with writes.
If a write is in progress, the failure will just change the state and at the
end of the write a check on the state will determine what actions to take.

A session failure is now handled in HTTP2Session by first failing all the
streams - which notifies the Stream.Listeners - and then failing the session
- which notifies the Session.Listener.
The stream failures are executed concurrently by dispatching each one to a
different thread; this means that the stream failure callbacks are executed
concurrently (likely sending RST_STREAM frames).
The session failure callback is completed only when all the stream failure
callbacks have completed, to ensure that a GOAWAY frame is processed after
all the RST_STREAM frames.

Signed-off-by: Simone Bordet <[email protected]>
  • Loading branch information
sbordet committed Jun 5, 2020
1 parent 9dec284 commit 8d60636
Show file tree
Hide file tree
Showing 4 changed files with 270 additions and 110 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -439,7 +439,7 @@ public void onGoAway(final GoAwayFrame frame)
// We received a GO_AWAY, so try to write
// what's in the queue and then disconnect.
closeFrame = frame;
notifyClose(this, frame, new DisconnectCallback());
onClose(frame, new DisconnectCallback());
return;
}
break;
Expand Down Expand Up @@ -498,6 +498,11 @@ public void onWindowUpdate(WindowUpdateFrame frame)
public void onStreamFailure(int streamId, int error, String reason)
{
Callback callback = new ResetCallback(streamId, error, Callback.NOOP);
onStreamFailure(streamId, error, reason, callback);
}

private void onStreamFailure(int streamId, int error, String reason, Callback callback)
{
IStream stream = getStream(streamId);
if (stream != null)
stream.process(new FailureFrame(error, reason), callback);
Expand All @@ -513,7 +518,28 @@ public void onConnectionFailure(int error, String reason)

protected void onConnectionFailure(int error, String reason, Callback callback)
{
notifyFailure(this, new IOException(String.format("%d/%s", error, reason)), new CloseCallback(error, reason, callback));
Collection<Stream> streams = getStreams();
int count = streams.size();
Callback countCallback = new CountingCallback(new CloseCallback(error, reason, callback), count + 1);
for (Stream stream : streams)
{
onStreamFailure(stream.getId(), error, reason, countCallback);
}
IOException failure = new IOException(String.format("%s/%s", ErrorCode.toString(error, null), reason));
notifyFailure(this, failure, countCallback);
}

private void onClose(GoAwayFrame frame, Callback callback)
{
Collection<Stream> streams = getStreams();
int count = streams.size();
Callback countCallback = new CountingCallback(callback, count + 1);
String reason = frame.tryConvertPayload();
for (Stream stream : streams)
{
onStreamFailure(stream.getId(), frame.getError(), reason, countCallback);
}
notifyClose(this, frame, countCallback);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ public boolean isClosed()
public boolean isRemotelyClosed()
{
CloseState state = closeState.get();
return state == CloseState.REMOTELY_CLOSED || state == CloseState.CLOSING;
return state == CloseState.REMOTELY_CLOSED || state == CloseState.CLOSING || state == CloseState.CLOSED;
}

public boolean isLocallyClosed()
Expand Down Expand Up @@ -357,6 +357,8 @@ private void onWindowUpdate(WindowUpdateFrame frame, Callback callback)

private void onFailure(FailureFrame frame, Callback callback)
{
// Don't close or remove the stream, as the listener may
// want to use it, for example to send a RST_STREAM frame.
notifyFailure(this, frame, callback);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Base64;
import java.util.Collection;
import java.util.List;
import java.util.Objects;
import java.util.Queue;
Expand All @@ -41,7 +40,6 @@
import org.eclipse.jetty.http2.HTTP2Connection;
import org.eclipse.jetty.http2.ISession;
import org.eclipse.jetty.http2.IStream;
import org.eclipse.jetty.http2.api.Stream;
import org.eclipse.jetty.http2.api.server.ServerSessionListener;
import org.eclipse.jetty.http2.frames.DataFrame;
import org.eclipse.jetty.http2.frames.Frame;
Expand All @@ -58,7 +56,6 @@
import org.eclipse.jetty.server.HttpConfiguration;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.CountingCallback;
import org.eclipse.jetty.util.TypeUtil;

public class HTTP2ServerConnection extends HTTP2Connection implements Connection.UpgradeTo
Expand Down Expand Up @@ -220,7 +217,11 @@ public void onStreamFailure(IStream stream, Throwable failure, Callback callback
{
Runnable task = channel.onFailure(failure, callback);
if (task != null)
{
// We must dispatch to another thread because the task
// may call application code that performs blocking I/O.
offerTask(task, true);
}
}
else
{
Expand All @@ -245,22 +246,10 @@ public boolean onSessionTimeout(Throwable failure)

public void onSessionFailure(Throwable failure, Callback callback)
{
ISession session = getSession();
if (LOG.isDebugEnabled())
LOG.debug("Processing failure on {}: {}", session, failure);
Collection<Stream> streams = session.getStreams();
if (streams.isEmpty())
{
callback.succeeded();
}
else
{
CountingCallback counter = new CountingCallback(callback, streams.size());
for (Stream stream : streams)
{
onStreamFailure((IStream)stream, failure, counter);
}
}
LOG.debug("Processing session failure on {}", getSession(), failure);
// All the streams have already been failed, just succeed the callback.
callback.succeeded();
}

public void push(Connector connector, IStream stream, MetaData.Request request)
Expand Down
Loading

0 comments on commit 8d60636

Please sign in to comment.