Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Jetty 9.4.x 4855 h2spec failures #4946

Merged
merged 10 commits into from
Jun 9, 2020
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ public Connection newConnection(EndPoint endPoint, Map<String, Object> context)
return customize(connection, context);
}

private class HTTP2ClientConnection extends HTTP2Connection implements Callback
private static class HTTP2ClientConnection extends HTTP2Connection implements Callback
{
private final HTTP2Client client;
private final Promise<Session> promise;
Expand Down Expand Up @@ -154,7 +154,7 @@ public void failed(Throwable x)
}
}

private class ConnectionListener implements Connection.Listener
private static class ConnectionListener implements Connection.Listener
{
@Override
public void onOpened(Connection connection)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import java.net.InetSocketAddress;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

import javax.servlet.http.HttpServlet;

import org.eclipse.jetty.http.HostPortHttpField;
Expand All @@ -33,7 +32,7 @@
import org.eclipse.jetty.http2.api.Session;
import org.eclipse.jetty.http2.api.server.ServerSessionListener;
import org.eclipse.jetty.http2.server.AbstractHTTP2ServerConnectionFactory;
import org.eclipse.jetty.http2.server.HTTP2ServerConnectionFactory;
import org.eclipse.jetty.http2.server.HTTP2CServerConnectionFactory;
import org.eclipse.jetty.http2.server.RawHTTP2ServerConnectionFactory;
import org.eclipse.jetty.server.ConnectionFactory;
import org.eclipse.jetty.server.HttpConfiguration;
Expand All @@ -54,7 +53,7 @@ public class AbstractTest

protected void start(HttpServlet servlet) throws Exception
{
HTTP2ServerConnectionFactory connectionFactory = new HTTP2ServerConnectionFactory(new HttpConfiguration());
HTTP2CServerConnectionFactory connectionFactory = new HTTP2CServerConnectionFactory(new HttpConfiguration());
connectionFactory.setInitialSessionRecvWindow(FlowControlStrategy.DEFAULT_WINDOW_SIZE);
connectionFactory.setInitialStreamRecvWindow(FlowControlStrategy.DEFAULT_WINDOW_SIZE);
prepareServer(connectionFactory);
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,11 @@

package org.eclipse.jetty.http2.client;

import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
Expand All @@ -41,6 +45,7 @@
import org.eclipse.jetty.http2.api.Session;
import org.eclipse.jetty.http2.api.Stream;
import org.eclipse.jetty.http2.api.server.ServerSessionListener;
import org.eclipse.jetty.http2.frames.FrameType;
import org.eclipse.jetty.http2.frames.HeadersFrame;
import org.eclipse.jetty.http2.frames.PingFrame;
import org.eclipse.jetty.http2.frames.PrefaceFrame;
Expand All @@ -63,6 +68,7 @@
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;

public class PrefaceTest extends AbstractTest
Expand Down Expand Up @@ -332,4 +338,71 @@ public void onHeaders(HeadersFrame frame)
assertTrue(clientSettingsLatch.await(5, TimeUnit.SECONDS));
}
}

@Test
public void testInvalidServerPreface() throws Exception
{
try (ServerSocket server = new ServerSocket(0))
{
prepareClient();
client.start();

CountDownLatch failureLatch = new CountDownLatch(1);
Promise.Completable<Session> promise = new Promise.Completable<>();
InetSocketAddress address = new InetSocketAddress("localhost", server.getLocalPort());
client.connect(address, new Session.Listener.Adapter()
{
@Override
public void onFailure(Session session, Throwable failure)
{
failureLatch.countDown();
}
}, promise);

try (Socket socket = server.accept())
{
OutputStream output = socket.getOutputStream();
output.write("enough_junk_bytes".getBytes(StandardCharsets.UTF_8));

Session session = promise.get(5, TimeUnit.SECONDS);
assertNotNull(session);

assertTrue(failureLatch.await(5, TimeUnit.SECONDS));

// Verify that the client closed the socket.
InputStream input = socket.getInputStream();
while (true)
{
int read = input.read();
if (read < 0)
break;
}
}
}
}

@Test
public void testInvalidClientPreface() throws Exception
{
start(new ServerSessionListener.Adapter());

try (Socket client = new Socket("localhost", connector.getLocalPort()))
{
OutputStream output = client.getOutputStream();
output.write("enough_junk_bytes".getBytes(StandardCharsets.UTF_8));
output.flush();

byte[] bytes = new byte[1024];
InputStream input = client.getInputStream();
int read = input.read(bytes);
if (read < 0)
{
// Closing the connection without GOAWAY frame is fine.
return;
}

int type = bytes[3];
assertEquals(FrameType.GO_AWAY.getType(), type);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -425,12 +425,21 @@ public void failed(Throwable x)
super.failed(x);
}

/**
* @return whether the entry is stale and must not be processed
*/
private boolean isStale()
{
return !isProtocol() && stream != null && stream.isReset();
// If it is a protocol frame, process it.
if (isProtocolFrame(frame))
return false;
// It's an application frame; is the stream gone already?
if (stream == null)
return true;
return stream.isReset();
}

private boolean isProtocol()
private boolean isProtocolFrame(Frame frame)
{
switch (frame.getType())
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -439,7 +439,7 @@ public void onGoAway(final GoAwayFrame frame)
// We received a GO_AWAY, so try to write
// what's in the queue and then disconnect.
closeFrame = frame;
notifyClose(this, frame, new DisconnectCallback());
onClose(frame, new DisconnectCallback());
return;
}
break;
Expand Down Expand Up @@ -498,9 +498,15 @@ public void onWindowUpdate(WindowUpdateFrame frame)
public void onStreamFailure(int streamId, int error, String reason)
{
Callback callback = new ResetCallback(streamId, error, Callback.NOOP);
Throwable failure = toFailure("Stream failure", error, reason);
onStreamFailure(streamId, error, reason, failure, callback);
}

private void onStreamFailure(int streamId, int error, String reason, Throwable failure, Callback callback)
{
IStream stream = getStream(streamId);
if (stream != null)
stream.process(new FailureFrame(error, reason), callback);
stream.process(new FailureFrame(error, reason, failure), callback);
else
callback.succeeded();
}
Expand All @@ -513,7 +519,45 @@ public void onConnectionFailure(int error, String reason)

protected void onConnectionFailure(int error, String reason, Callback callback)
{
notifyFailure(this, new IOException(String.format("%d/%s", error, reason)), new CloseCallback(error, reason, callback));
Throwable failure = toFailure("Session failure", error, reason);
onFailure(error, reason, failure, new CloseCallback(error, reason, callback));
}

protected void abort(Throwable failure)
{
onFailure(ErrorCode.NO_ERROR.code, null, failure, new TerminateCallback(failure));
}

private void onFailure(int error, String reason, Throwable failure, Callback callback)
{
Collection<Stream> streams = getStreams();
int count = streams.size();
Callback countCallback = new CountingCallback(callback, count + 1);
for (Stream stream : streams)
{
onStreamFailure(stream.getId(), error, reason, failure, countCallback);
}
notifyFailure(this, failure, countCallback);
}

private void onClose(GoAwayFrame frame, Callback callback)
{
int error = frame.getError();
String reason = frame.tryConvertPayload();
Throwable failure = toFailure("Session close", error, reason);
Collection<Stream> streams = getStreams();
int count = streams.size();
Callback countCallback = new CountingCallback(callback, count + 1);
for (Stream stream : streams)
{
onStreamFailure(stream.getId(), error, reason, failure, countCallback);
}
notifyClose(this, frame, countCallback);
}

private Throwable toFailure(String message, int error, String reason)
{
return new IOException(String.format("%s %s/%s", message, ErrorCode.toString(error, null), reason));
}

@Override
Expand Down Expand Up @@ -998,11 +1042,6 @@ private void terminate(Throwable cause)
}
}

protected void abort(Throwable failure)
{
notifyFailure(this, failure, new TerminateCallback(failure));
}

public boolean isDisconnected()
{
return !endPoint.isOpen();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,6 @@ private boolean startWrite(Callback callback)
{
if (writing.compareAndSet(null, callback))
return true;
close();
callback.failed(new WritePendingException());
return false;
}
Expand Down Expand Up @@ -177,7 +176,7 @@ public boolean isClosed()
public boolean isRemotelyClosed()
{
CloseState state = closeState.get();
return state == CloseState.REMOTELY_CLOSED || state == CloseState.CLOSING;
return state == CloseState.REMOTELY_CLOSED || state == CloseState.CLOSING || state == CloseState.CLOSED;
}

public boolean isLocallyClosed()
Expand Down Expand Up @@ -358,6 +357,8 @@ private void onWindowUpdate(WindowUpdateFrame frame, Callback callback)

private void onFailure(FailureFrame frame, Callback callback)
{
// Don't close or remove the stream, as the listener may
// want to use it, for example to send a RST_STREAM frame.
notifyFailure(this, frame, callback);
}

Expand Down Expand Up @@ -608,7 +609,7 @@ private void notifyFailure(Stream stream, FailureFrame frame, Callback callback)
{
try
{
listener.onFailure(stream, frame.getError(), frame.getReason(), callback);
listener.onFailure(stream, frame.getError(), frame.getReason(), frame.getFailure(), callback);
}
catch (Throwable x)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -227,8 +227,24 @@ default boolean onIdleTimeout(Stream stream, Throwable x)
* @param stream the stream
* @param error the error code
* @param reason the error reason, or null
* @param failure the failure
* @param callback the callback to complete when the failure has been handled
*/
default void onFailure(Stream stream, int error, String reason, Throwable failure, Callback callback)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I find this new method's signature and default implementation surprising. I would have expected that the error and reason parameters to go away especially since the overriding implementations just ignore them. Plus the fact that the default implementation drops the failure in favor of calling the deprecated implementation instead of directly succeeding the callback also makes me scratch my head.

Maybe FailureFrame.getFailure() should return some sort of H2Exception that wraps the original exception as well as containing the reason and error code?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is at the HTTP/2 low-level API so our implementation may not use error and reason but users of these APIs may.
However, there are cases where we still have a failure but not an associated error (e.g. idle timeout and aborts).

{
onFailure(stream, error, reason, callback);
}

/**
* <p>Callback method invoked when the stream failed.</p>
*
* @param stream the stream
* @param error the error code
* @param reason the error reason, or null
* @param callback the callback to complete when the failure has been handled
* @deprecated use {@link #onFailure(Stream, int, String, Throwable, Callback)} instead
*/
@Deprecated
default void onFailure(Stream stream, int error, String reason, Callback callback)
{
callback.succeeded();
Expand Down
Loading