Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Jetty 9.4.x 4855 h2spec failures #4946

Merged
merged 10 commits into from
Jun 9, 2020
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ public Connection newConnection(EndPoint endPoint, Map<String, Object> context)
return customize(connection, context);
}

private class HTTP2ClientConnection extends HTTP2Connection implements Callback
private static class HTTP2ClientConnection extends HTTP2Connection implements Callback
{
private final HTTP2Client client;
private final Promise<Session> promise;
Expand Down Expand Up @@ -154,7 +154,7 @@ public void failed(Throwable x)
}
}

private class ConnectionListener implements Connection.Listener
private static class ConnectionListener implements Connection.Listener
{
@Override
public void onOpened(Connection connection)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import java.net.InetSocketAddress;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

import javax.servlet.http.HttpServlet;

import org.eclipse.jetty.http.HostPortHttpField;
Expand All @@ -33,7 +32,7 @@
import org.eclipse.jetty.http2.api.Session;
import org.eclipse.jetty.http2.api.server.ServerSessionListener;
import org.eclipse.jetty.http2.server.AbstractHTTP2ServerConnectionFactory;
import org.eclipse.jetty.http2.server.HTTP2ServerConnectionFactory;
import org.eclipse.jetty.http2.server.HTTP2CServerConnectionFactory;
import org.eclipse.jetty.http2.server.RawHTTP2ServerConnectionFactory;
import org.eclipse.jetty.server.ConnectionFactory;
import org.eclipse.jetty.server.HttpConfiguration;
Expand All @@ -54,7 +53,7 @@ public class AbstractTest

protected void start(HttpServlet servlet) throws Exception
{
HTTP2ServerConnectionFactory connectionFactory = new HTTP2ServerConnectionFactory(new HttpConfiguration());
HTTP2CServerConnectionFactory connectionFactory = new HTTP2CServerConnectionFactory(new HttpConfiguration());
connectionFactory.setInitialSessionRecvWindow(FlowControlStrategy.DEFAULT_WINDOW_SIZE);
connectionFactory.setInitialStreamRecvWindow(FlowControlStrategy.DEFAULT_WINDOW_SIZE);
prepareServer(connectionFactory);
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,11 @@

package org.eclipse.jetty.http2.client;

import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
Expand All @@ -41,6 +45,7 @@
import org.eclipse.jetty.http2.api.Session;
import org.eclipse.jetty.http2.api.Stream;
import org.eclipse.jetty.http2.api.server.ServerSessionListener;
import org.eclipse.jetty.http2.frames.FrameType;
import org.eclipse.jetty.http2.frames.HeadersFrame;
import org.eclipse.jetty.http2.frames.PingFrame;
import org.eclipse.jetty.http2.frames.PrefaceFrame;
Expand All @@ -63,6 +68,7 @@
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;

public class PrefaceTest extends AbstractTest
Expand Down Expand Up @@ -332,4 +338,71 @@ public void onHeaders(HeadersFrame frame)
assertTrue(clientSettingsLatch.await(5, TimeUnit.SECONDS));
}
}

@Test
public void testInvalidServerPreface() throws Exception
{
try (ServerSocket server = new ServerSocket(0))
{
prepareClient();
client.start();

CountDownLatch failureLatch = new CountDownLatch(1);
Promise.Completable<Session> promise = new Promise.Completable<>();
InetSocketAddress address = new InetSocketAddress("localhost", server.getLocalPort());
client.connect(address, new Session.Listener.Adapter()
{
@Override
public void onFailure(Session session, Throwable failure)
{
failureLatch.countDown();
}
}, promise);

try (Socket socket = server.accept())
{
OutputStream output = socket.getOutputStream();
output.write("enough_junk_bytes".getBytes(StandardCharsets.UTF_8));

Session session = promise.get(5, TimeUnit.SECONDS);
assertNotNull(session);

assertTrue(failureLatch.await(5, TimeUnit.SECONDS));

// Verify that the client closed the socket.
InputStream input = socket.getInputStream();
while (true)
{
int read = input.read();
if (read < 0)
break;
}
}
}
}

@Test
public void testInvalidClientPreface() throws Exception
{
start(new ServerSessionListener.Adapter());

try (Socket client = new Socket("localhost", connector.getLocalPort()))
{
OutputStream output = client.getOutputStream();
output.write("enough_junk_bytes".getBytes(StandardCharsets.UTF_8));
output.flush();

byte[] bytes = new byte[1024];
InputStream input = client.getInputStream();
int read = input.read(bytes);
if (read < 0)
{
// Closing the connection without GOAWAY frame is fine.
return;
}

int type = bytes[3];
assertEquals(FrameType.GO_AWAY.getType(), type);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -425,12 +425,21 @@ public void failed(Throwable x)
super.failed(x);
}

/**
* @return whether the entry is stale and must not be processed
*/
private boolean isStale()
{
return !isProtocol() && stream != null && stream.isReset();
// If it is a protocol frame, process it.
if (isProtocolFrame(frame))
return false;
// It's an application frame; is the stream gone already?
if (stream == null)
return true;
return stream.isReset();
}

private boolean isProtocol()
private boolean isProtocolFrame(Frame frame)
{
switch (frame.getType())
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -439,7 +439,7 @@ public void onGoAway(final GoAwayFrame frame)
// We received a GO_AWAY, so try to write
// what's in the queue and then disconnect.
closeFrame = frame;
notifyClose(this, frame, new DisconnectCallback());
onClose(frame, new DisconnectCallback());
return;
}
break;
Expand Down Expand Up @@ -498,6 +498,11 @@ public void onWindowUpdate(WindowUpdateFrame frame)
public void onStreamFailure(int streamId, int error, String reason)
{
Callback callback = new ResetCallback(streamId, error, Callback.NOOP);
onStreamFailure(streamId, error, reason, callback);
}

private void onStreamFailure(int streamId, int error, String reason, Callback callback)
{
IStream stream = getStream(streamId);
if (stream != null)
stream.process(new FailureFrame(error, reason), callback);
Expand All @@ -513,7 +518,28 @@ public void onConnectionFailure(int error, String reason)

protected void onConnectionFailure(int error, String reason, Callback callback)
{
notifyFailure(this, new IOException(String.format("%d/%s", error, reason)), new CloseCallback(error, reason, callback));
Collection<Stream> streams = getStreams();
int count = streams.size();
Callback countCallback = new CountingCallback(new CloseCallback(error, reason, callback), count + 1);
for (Stream stream : streams)
{
onStreamFailure(stream.getId(), error, reason, countCallback);
}
IOException failure = new IOException(String.format("%s/%s", ErrorCode.toString(error, null), reason));
notifyFailure(this, failure, countCallback);
}

private void onClose(GoAwayFrame frame, Callback callback)
{
Collection<Stream> streams = getStreams();
int count = streams.size();
Callback countCallback = new CountingCallback(callback, count + 1);
String reason = frame.tryConvertPayload();
for (Stream stream : streams)
{
onStreamFailure(stream.getId(), frame.getError(), reason, countCallback);
}
notifyClose(this, frame, countCallback);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,6 @@ private boolean startWrite(Callback callback)
{
if (writing.compareAndSet(null, callback))
return true;
close();
callback.failed(new WritePendingException());
return false;
}
Expand Down Expand Up @@ -177,7 +176,7 @@ public boolean isClosed()
public boolean isRemotelyClosed()
{
CloseState state = closeState.get();
return state == CloseState.REMOTELY_CLOSED || state == CloseState.CLOSING;
return state == CloseState.REMOTELY_CLOSED || state == CloseState.CLOSING || state == CloseState.CLOSED;
}

public boolean isLocallyClosed()
Expand Down Expand Up @@ -358,6 +357,8 @@ private void onWindowUpdate(WindowUpdateFrame frame, Callback callback)

private void onFailure(FailureFrame frame, Callback callback)
{
// Don't close or remove the stream, as the listener may
// want to use it, for example to send a RST_STREAM frame.
notifyFailure(this, frame, callback);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Base64;
import java.util.Collection;
import java.util.List;
import java.util.Objects;
import java.util.Queue;
Expand All @@ -41,7 +40,6 @@
import org.eclipse.jetty.http2.HTTP2Connection;
import org.eclipse.jetty.http2.ISession;
import org.eclipse.jetty.http2.IStream;
import org.eclipse.jetty.http2.api.Stream;
import org.eclipse.jetty.http2.api.server.ServerSessionListener;
import org.eclipse.jetty.http2.frames.DataFrame;
import org.eclipse.jetty.http2.frames.Frame;
Expand All @@ -58,7 +56,6 @@
import org.eclipse.jetty.server.HttpConfiguration;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.CountingCallback;
import org.eclipse.jetty.util.TypeUtil;

public class HTTP2ServerConnection extends HTTP2Connection implements Connection.UpgradeTo
Expand Down Expand Up @@ -214,13 +211,17 @@ public boolean onStreamTimeout(IStream stream, Throwable failure)
public void onStreamFailure(IStream stream, Throwable failure, Callback callback)
{
if (LOG.isDebugEnabled())
LOG.debug("Processing failure on {}: {}", stream, failure);
LOG.debug("Processing stream failure on {}", stream, failure);
HttpChannelOverHTTP2 channel = (HttpChannelOverHTTP2)stream.getAttachment();
if (channel != null)
{
Runnable task = channel.onFailure(failure, callback);
if (task != null)
{
// We must dispatch to another thread because the task
// may call application code that performs blocking I/O.
offerTask(task, true);
}
}
else
{
Expand All @@ -245,22 +246,10 @@ public boolean onSessionTimeout(Throwable failure)

public void onSessionFailure(Throwable failure, Callback callback)
{
ISession session = getSession();
if (LOG.isDebugEnabled())
LOG.debug("Processing failure on {}: {}", session, failure);
Collection<Stream> streams = session.getStreams();
if (streams.isEmpty())
{
callback.succeeded();
}
else
{
CountingCallback counter = new CountingCallback(callback, streams.size());
for (Stream stream : streams)
{
onStreamFailure((IStream)stream, failure, counter);
}
}
LOG.debug("Processing session failure on {}", getSession(), failure);
// All the streams have already been failed, just succeed the callback.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this method still required? It doesn't override any contract?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

callback.succeeded();
}

public void push(Connector connector, IStream stream, MetaData.Request request)
Expand Down
Loading