Skip to content

Commit

Permalink
Fixes #12313 - Jetty 12 ee9/ee10 doesn't invoke callbacks when h2 cli…
Browse files Browse the repository at this point in the history
…ent sends RST_STREAM.

* Fixed invocation of AsyncListener.onError(), now called even if the response is already committed, in both EE9 and EE10.
* Reworked EE9 HttpChannel state machine in case of failures to be like EE10's.
  In particular, calling abort now is a state change, rather than a failure of the Handler callback.
  In this way, the handle() loop continues, enters case TERMINATED, and the callback is completed in onCompleted().
* Fixed EE9 handling of idle timeout in HttpChannel.onRequest(), that was missing.

Signed-off-by: Simone Bordet <[email protected]>
  • Loading branch information
sbordet authored and joakime committed Oct 24, 2024
1 parent 291e50a commit 4b27217
Show file tree
Hide file tree
Showing 6 changed files with 122 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -704,7 +704,7 @@ public void flush() throws IOException
private void checkWritable() throws EofException
{
if (_softClose)
throw new EofException("Closed");
throw new EofException("Closed");

switch (_state)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -662,20 +662,15 @@ else if (noStack != null)
LOG.warn(request == null ? "unknown request" : request.getServletApiRequest().getRequestURI(), failure);
}

if (isCommitted())
try
{
abort(failure);
boolean abort = _state.onError(failure);
if (abort)
abort(failure);
}
else
catch (Throwable x)
{
try
{
_state.onError(failure);
}
catch (IllegalStateException e)
{
abort(failure);
}
abort(failure);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -871,14 +871,15 @@ public boolean onIdleTimeout(TimeoutException timeout)
}
}

protected void onError(Throwable th)
protected boolean onError(Throwable th)
{
boolean committed = _servletChannel.isCommitted();
final AsyncContextEvent asyncEvent;
final List<AsyncListener> asyncListeners;
try (AutoLock ignored = lock())
{
if (LOG.isDebugEnabled())
LOG.debug("thrownException {}", getStatusStringLocked(), th);
LOG.debug("onError {}", getStatusStringLocked(), th);

// This can only be called from within the handle loop
if (_state != State.HANDLING)
Expand All @@ -887,34 +888,42 @@ protected void onError(Throwable th)
// If sendError has already been called, we can only handle one failure at a time!
if (_sendError)
{
LOG.warn("unhandled due to prior sendError", th);
return;
LOG.warn("onError not handled due to prior sendError() {}", getStatusStringLocked(), th);
return false;
}

// Check async state to determine type of handling
switch (_requestState)
{
case BLOCKING:
// handle the exception with a sendError
{
// Handle the exception with a sendError.
if (committed)
return true;
sendError(th);
return;

return false;
}
case DISPATCH: // Dispatch has already been called, but we ignore and handle exception below
case COMPLETE: // Complete has already been called, but we ignore and handle exception below
case ASYNC:
{
if (_asyncListeners == null || _asyncListeners.isEmpty())
{
if (committed)
return true;
sendError(th);
return;
return false;
}
asyncEvent = _event;
asyncEvent.addThrowable(th);
asyncListeners = _asyncListeners;
break;

}
default:
LOG.warn("unhandled in state {}", _requestState, new IllegalStateException(th));
return;
{
LOG.warn("onError not handled due to invalid requestState {}", getStatusStringLocked(), th);
return false;
}
}
}

Expand Down Expand Up @@ -945,7 +954,10 @@ protected void onError(Throwable th)
{
// The listeners did not invoke API methods and the
// container must provide a default error dispatch.
if (committed)
return true;
sendError(th);
return false;
}
else if (_requestState != RequestState.COMPLETE)
{
Expand All @@ -954,12 +966,14 @@ else if (_requestState != RequestState.COMPLETE)
else
LOG.warn("unhandled in state {}", _requestState, new IllegalStateException(th));
}
return committed;
}
}

private void sendError(Throwable th)
{
// No sync as this is always called with lock held
assert _lock.isHeldByCurrentThread();

// Determine the actual details of the exception
final Request request = _servletChannel.getServletContextRequest();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1160,7 +1160,7 @@ protected ContextRequest wrapRequest(Request request, Response response)

ServletContextRequest servletContextRequest = newServletContextRequest(servletChannel, request, response, decodedPathInContext, matchedResource);
servletChannel.associate(servletContextRequest);
Request.addCompletionListener(request, servletChannel::recycle);
Request.addCompletionListener(servletContextRequest, servletChannel::recycle);
return servletContextRequest;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

package org.eclipse.jetty.ee11.test.client.transport;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
Expand All @@ -21,12 +22,14 @@
import jakarta.servlet.AsyncContext;
import jakarta.servlet.AsyncEvent;
import jakarta.servlet.AsyncListener;
import jakarta.servlet.ServletException;
import jakarta.servlet.http.HttpServlet;
import jakarta.servlet.http.HttpServletRequest;
import jakarta.servlet.http.HttpServletResponse;
import org.eclipse.jetty.ee11.servlet.ServletContextHandler;
import org.eclipse.jetty.ee11.servlet.ServletHolder;
import org.eclipse.jetty.http.HttpFields;
import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.http.HttpURI;
import org.eclipse.jetty.http.HttpVersion;
import org.eclipse.jetty.http.MetaData;
Expand Down Expand Up @@ -54,11 +57,12 @@

public class Http2AsyncIOServletTest
{
private final HttpConfiguration httpConfig = new HttpConfiguration();
private Server server;
private ServerConnector connector;
private HTTP2Client client;

private void start(HttpConfiguration httpConfig, HttpServlet httpServlet) throws Exception
private void start(HttpServlet httpServlet) throws Exception
{
server = new Server();
connector = new ServerConnector(server, 1, 1, new HTTP2CServerConnectionFactory(httpConfig));
Expand All @@ -83,12 +87,10 @@ public void tearDown()
@ValueSource(booleans = {true, false})
public void testStartAsyncThenClientResetRemoteErrorNotification(boolean notify) throws Exception
{
HttpConfiguration httpConfig = new HttpConfiguration();
httpConfig.setNotifyRemoteAsyncErrors(notify);

AtomicReference<AsyncEvent> errorAsyncEventRef = new AtomicReference<>();
CountDownLatch latch = new CountDownLatch(1);
start(httpConfig, new HttpServlet()
start(new HttpServlet()
{
@Override
protected void service(HttpServletRequest request, HttpServletResponse response)
Expand Down Expand Up @@ -138,14 +140,82 @@ public void onStartAsync(AsyncEvent event)
stream.reset(new ResetFrame(stream.getId(), ErrorCode.CANCEL_STREAM_ERROR.code));

if (notify)
{
// Wait for the reset to be notified to the async context listener.
await().atMost(5, TimeUnit.SECONDS).until(() ->
{
AsyncEvent asyncEvent = errorAsyncEventRef.get();
return asyncEvent == null ? null : asyncEvent.getThrowable();
}, instanceOf(EofException.class));
}
else
{
// Wait for the reset to NOT be notified to the failure listener.
await().atMost(5, TimeUnit.SECONDS).during(1, TimeUnit.SECONDS).until(errorAsyncEventRef::get, nullValue());
}
}

@ParameterizedTest
@ValueSource(booleans = {false, true})
public void testClientResetNotifiesAsyncListener(boolean commitResponse) throws Exception
{
CountDownLatch requestLatch = new CountDownLatch(1);
CountDownLatch errorLatch = new CountDownLatch(1);
start(new HttpServlet()
{
@Override
protected void service(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException
{
if (commitResponse)
response.flushBuffer();

AsyncContext asyncContext = request.startAsync();
asyncContext.setTimeout(0);

asyncContext.addListener(new AsyncListener()
{
@Override
public void onComplete(AsyncEvent event)
{
}

@Override
public void onTimeout(AsyncEvent event)
{
}

@Override
public void onError(AsyncEvent event)
{
if (!response.isCommitted())
response.setStatus(HttpStatus.INTERNAL_SERVER_ERROR_500);
asyncContext.complete();
errorLatch.countDown();
}

@Override
public void onStartAsync(AsyncEvent event)
{
}
});

requestLatch.countDown();
}
});

Session session = client.connect(new InetSocketAddress("localhost", connector.getLocalPort()), new Session.Listener() {})
.get(5, TimeUnit.SECONDS);
MetaData.Request request = new MetaData.Request("GET", HttpURI.from("/"), HttpVersion.HTTP_2, HttpFields.EMPTY);
Stream stream = session.newStream(new HeadersFrame(request, null, true), new Stream.Listener() {})
.get(5, TimeUnit.SECONDS);

// Wait for the server to become idle after the request.
assertTrue(requestLatch.await(5, TimeUnit.SECONDS));
Thread.sleep(500);

// Reset the stream.
stream.reset(new ResetFrame(stream.getId(), ErrorCode.CANCEL_STREAM_ERROR.code));

assertTrue(errorLatch.await(5, TimeUnit.SECONDS));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import org.eclipse.jetty.util.component.LifeCycle;
import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
Expand All @@ -56,17 +57,18 @@
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.nullValue;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assumptions.assumeTrue;

@ExtendWith(WorkDirExtension.class)
public class Http3AsyncIOServletTest
{
public WorkDir workDir;

private final HttpConfiguration httpConfig = new HttpConfiguration();
private Server server;
private QuicServerConnector connector;
private HTTP3Client client;

private void start(HttpConfiguration httpConfig, HttpServlet httpServlet) throws Exception
private void start(HttpServlet httpServlet) throws Exception
{
server = new Server();
SslContextFactory.Server serverSslContextFactory = new SslContextFactory.Server();
Expand Down Expand Up @@ -95,12 +97,10 @@ public void tearDown()
@ValueSource(booleans = {true, false})
public void testStartAsyncThenClientResetRemoteErrorNotification(boolean notify) throws Exception
{
HttpConfiguration httpConfig = new HttpConfiguration();
httpConfig.setNotifyRemoteAsyncErrors(notify);

AtomicReference<AsyncEvent> errorAsyncEventRef = new AtomicReference<>();
CountDownLatch latch = new CountDownLatch(1);
start(httpConfig, new HttpServlet()
start(new HttpServlet()
{
@Override
protected void service(HttpServletRequest request, HttpServletResponse response)
Expand Down Expand Up @@ -158,4 +158,14 @@ public void onStartAsync(AsyncEvent event)
// Wait for the reset to NOT be notified to the failure listener.
await().atMost(5, TimeUnit.SECONDS).during(1, TimeUnit.SECONDS).until(errorAsyncEventRef::get, nullValue());
}

@Test
public void testClientResetNotifiesAsyncListener()
{
// See the equivalent test in Http2AsyncIOServletTest for HTTP/2.
// For HTTP/3 we do not have a "reset" event that we can relay to applications,
// because HTTP/3 does not have a "reset" frame; QUIC has RESET_STREAM, but we
// do not have an event from Quiche to reliably report it to applications.
assumeTrue(false);
}
}

0 comments on commit 4b27217

Please sign in to comment.