From a1a1ac0163028ef8bdc04f59ca8892134a627cb6 Mon Sep 17 00:00:00 2001
From: Simone Bordet
Date: Thu, 3 Jun 2021 15:20:42 +0200
Subject: [PATCH] Fixes #6323 - HttpClient requests with redirects gets
stuck/never calls onComplete()
* Reworked the total timeout handling.
* Now a CyclicTimeouts handles the exchanges in each HttpDestination,
and a CyclicTimeouts handles the exchanges in each HttpConnection
(rather than in HttpChannel).
* Now adjusting the total timeout for copied requests generated by
redirects and authentication.
Signed-off-by: Simone Bordet
(cherry picked from commit 2e7d17400f44097b95f2110ba5b595ec90fbe03f)
---
.../client/AuthenticationProtocolHandler.java | 22 +-
.../org/eclipse/jetty/client/HttpChannel.java | 21 +-
.../eclipse/jetty/client/HttpConnection.java | 59 +++-
.../jetty/client/HttpConversation.java | 15 +
.../eclipse/jetty/client/HttpDestination.java | 2 +-
.../eclipse/jetty/client/HttpReceiver.java | 4 +-
.../eclipse/jetty/client/HttpRedirector.java | 43 ++-
.../jetty/client/TimeoutCompleteListener.java | 5 +
.../client/http/HttpConnectionOverHTTP.java | 10 +
.../jetty/client/HttpClientRedirectTest.java | 113 +++++++-
.../client/http/HttpConnectionOverFCGI.java | 36 +++
.../client/http/HttpConnectionOverHTTP2.java | 8 +
.../org/eclipse/jetty/io/CyclicTimeouts.java | 62 ++++-
.../eclipse/jetty/io/CyclicTimeoutsTest.java | 261 ++++++++++++++++++
.../jetty/server/AsyncContentProducer.java | 2 +-
.../jetty/server/HttpChannelState.java | 2 +-
16 files changed, 612 insertions(+), 53 deletions(-)
create mode 100644 jetty-io/src/test/java/org/eclipse/jetty/io/CyclicTimeoutsTest.java
diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/AuthenticationProtocolHandler.java b/jetty-client/src/main/java/org/eclipse/jetty/client/AuthenticationProtocolHandler.java
index afa6eb49f4fd..87fe1eca490b 100644
--- a/jetty-client/src/main/java/org/eclipse/jetty/client/AuthenticationProtocolHandler.java
+++ b/jetty-client/src/main/java/org/eclipse/jetty/client/AuthenticationProtocolHandler.java
@@ -19,6 +19,7 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@@ -211,8 +212,25 @@ public void onComplete(Result result)
path = request.getPath();
}
Request newRequest = client.copyRequest(request, requestURI);
- // Disable the timeout so that only the one from the initial request applies.
- newRequest.timeout(0, TimeUnit.MILLISECONDS);
+
+ // Adjust the timeout of the new request, taking into account the
+ // timeout of the previous request and the time already elapsed.
+ long timeoutAt = request.getTimeoutAt();
+ if (timeoutAt < Long.MAX_VALUE)
+ {
+ long newTimeout = timeoutAt - System.nanoTime();
+ if (newTimeout > 0)
+ {
+ newRequest.timeout(newTimeout, TimeUnit.NANOSECONDS);
+ }
+ else
+ {
+ TimeoutException failure = new TimeoutException("Total timeout " + request.getConversation().getTimeout() + " ms elapsed");
+ forwardFailureComplete(request, failure, response, failure);
+ return;
+ }
+ }
+
if (path != null)
newRequest.path(path);
diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpChannel.java b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpChannel.java
index 9c57d2705796..af45fdb882e8 100644
--- a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpChannel.java
+++ b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpChannel.java
@@ -14,28 +14,26 @@
package org.eclipse.jetty.client;
import org.eclipse.jetty.client.api.Result;
+import org.eclipse.jetty.io.CyclicTimeouts;
import org.eclipse.jetty.util.thread.AutoLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public abstract class HttpChannel
+public abstract class HttpChannel implements CyclicTimeouts.Expirable
{
private static final Logger LOG = LoggerFactory.getLogger(HttpChannel.class);
private final AutoLock _lock = new AutoLock();
private final HttpDestination _destination;
- private final TimeoutCompleteListener _totalTimeout;
private HttpExchange _exchange;
protected HttpChannel(HttpDestination destination)
{
_destination = destination;
- _totalTimeout = new TimeoutCompleteListener(destination.getHttpClient().getScheduler());
}
public void destroy()
{
- _totalTimeout.destroy();
}
public HttpDestination getHttpDestination()
@@ -106,6 +104,13 @@ public HttpExchange getHttpExchange()
}
}
+ @Override
+ public long getExpireNanoTime()
+ {
+ HttpExchange exchange = getHttpExchange();
+ return exchange != null ? exchange.getExpireNanoTime() : Long.MAX_VALUE;
+ }
+
protected abstract HttpSender getHttpSender();
protected abstract HttpReceiver getHttpReceiver();
@@ -114,15 +119,7 @@ public void send()
{
HttpExchange exchange = getHttpExchange();
if (exchange != null)
- {
- long timeoutAt = exchange.getExpireNanoTime();
- if (timeoutAt != Long.MAX_VALUE)
- {
- exchange.getResponseListeners().add(_totalTimeout);
- _totalTimeout.schedule(exchange.getRequest(), timeoutAt);
- }
send(exchange);
- }
}
public abstract void send(HttpExchange exchange);
diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpConnection.java b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpConnection.java
index ffa23a313024..1886aa080e9b 100644
--- a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpConnection.java
+++ b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpConnection.java
@@ -17,6 +17,7 @@
import java.net.HttpCookie;
import java.net.URI;
import java.util.ArrayList;
+import java.util.Iterator;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@@ -30,9 +31,11 @@
import org.eclipse.jetty.http.HttpFields;
import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.http.HttpVersion;
+import org.eclipse.jetty.io.CyclicTimeouts;
import org.eclipse.jetty.util.Attachable;
import org.eclipse.jetty.util.HttpCookieStore;
import org.eclipse.jetty.util.thread.AutoLock;
+import org.eclipse.jetty.util.thread.Scheduler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -42,6 +45,7 @@ public abstract class HttpConnection implements IConnection, Attachable
private final AutoLock lock = new AutoLock();
private final HttpDestination destination;
+ private final RequestTimeouts requestTimeouts;
private Object attachment;
private int idleTimeoutGuard;
private long idleTimeoutStamp;
@@ -49,6 +53,7 @@ public abstract class HttpConnection implements IConnection, Attachable
protected HttpConnection(HttpDestination destination)
{
this.destination = destination;
+ this.requestTimeouts = new RequestTimeouts(destination.getHttpClient().getScheduler());
this.idleTimeoutStamp = System.nanoTime();
}
@@ -62,6 +67,8 @@ public HttpDestination getHttpDestination()
return destination;
}
+ protected abstract Iterator getHttpChannels();
+
@Override
public void send(Request request, Response.CompleteListener listener)
{
@@ -99,6 +106,7 @@ protected SendFailure send(HttpChannel channel, HttpExchange exchange)
SendFailure result;
if (channel.associate(exchange))
{
+ requestTimeouts.schedule(channel);
channel.send();
result = null;
}
@@ -228,16 +236,6 @@ private StringBuilder convertCookies(List cookies, StringBuilder bui
return builder;
}
- private void applyProxyAuthentication(Request request, ProxyConfiguration.Proxy proxy)
- {
- if (proxy != null)
- {
- Authentication.Result result = getHttpClient().getAuthenticationStore().findAuthenticationResult(proxy.getURI());
- if (result != null)
- result.apply(request);
- }
- }
-
private void applyRequestAuthentication(Request request)
{
AuthenticationStore authenticationStore = getHttpClient().getAuthenticationStore();
@@ -253,6 +251,16 @@ private void applyRequestAuthentication(Request request)
}
}
+ private void applyProxyAuthentication(Request request, ProxyConfiguration.Proxy proxy)
+ {
+ if (proxy != null)
+ {
+ Authentication.Result result = getHttpClient().getAuthenticationStore().findAuthenticationResult(proxy.getURI());
+ if (result != null)
+ result.apply(request);
+ }
+ }
+
public boolean onIdleTimeout(long idleTimeout)
{
try (AutoLock l = lock.lock())
@@ -288,9 +296,40 @@ public Object getAttachment()
return attachment;
}
+ protected void destroy()
+ {
+ requestTimeouts.destroy();
+ }
+
@Override
public String toString()
{
return String.format("%s@%h", getClass().getSimpleName(), this);
}
+
+ private class RequestTimeouts extends CyclicTimeouts
+ {
+ private RequestTimeouts(Scheduler scheduler)
+ {
+ super(scheduler);
+ }
+
+ @Override
+ protected Iterator iterator()
+ {
+ return getHttpChannels();
+ }
+
+ @Override
+ protected boolean onExpired(HttpChannel channel)
+ {
+ HttpExchange exchange = channel.getHttpExchange();
+ if (exchange != null)
+ {
+ HttpRequest request = exchange.getRequest();
+ request.abort(new TimeoutException("Total timeout " + request.getTimeout() + " ms elapsed"));
+ }
+ return false;
+ }
+ }
}
diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpConversation.java b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpConversation.java
index 002411fb3ebd..7e63cadd9d09 100644
--- a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpConversation.java
+++ b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpConversation.java
@@ -18,6 +18,7 @@
import java.util.List;
import java.util.concurrent.ConcurrentLinkedDeque;
+import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.util.AttributesMap;
import org.slf4j.Logger;
@@ -138,6 +139,20 @@ public void updateResponseListeners(Response.ResponseListener overrideListener)
this.listeners = listeners;
}
+ /**
+ * Returns the total timeout for the conversation.
+ * The conversation total timeout is the total timeout
+ * of the first request in the conversation.
+ *
+ * @return the total timeout of the conversation
+ * @see Request#getTimeout()
+ */
+ public long getTimeout()
+ {
+ HttpExchange firstExchange = exchanges.peekFirst();
+ return firstExchange == null ? 0 : firstExchange.getRequest().getTimeout();
+ }
+
public boolean abort(Throwable cause)
{
HttpExchange exchange = exchanges.peekLast();
diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpDestination.java b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpDestination.java
index a941b45caf26..571dea7aa97d 100644
--- a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpDestination.java
+++ b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpDestination.java
@@ -525,7 +525,7 @@ public interface Multiplexed
/**
* Enforces the total timeout for for exchanges that are still in the queue.
* The total timeout for exchanges that are not in the destination queue
- * is enforced in {@link HttpChannel} by {@link TimeoutCompleteListener}.
+ * is enforced in {@link HttpConnection}.
*/
private class RequestTimeouts extends CyclicTimeouts
{
diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpReceiver.java b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpReceiver.java
index a51d541c6256..e1dd3ace7cc1 100644
--- a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpReceiver.java
+++ b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpReceiver.java
@@ -471,9 +471,9 @@ private void terminateResponse(HttpExchange exchange, Result result)
boolean ordered = getHttpDestination().getHttpClient().isStrictEventOrdering();
if (!ordered)
channel.exchangeTerminated(exchange, result);
- if (LOG.isDebugEnabled())
- LOG.debug("Request/Response {}: {}", failure == null ? "succeeded" : "failed", result);
List listeners = exchange.getConversation().getResponseListeners();
+ if (LOG.isDebugEnabled())
+ LOG.debug("Request/Response {}: {}, notifying {}", failure == null ? "succeeded" : "failed", result, listeners);
ResponseNotifier notifier = getHttpDestination().getResponseNotifier();
notifier.notifyComplete(listeners, result);
if (ordered)
diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpRedirector.java b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpRedirector.java
index 539b090a5157..c4ea3055dc0d 100644
--- a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpRedirector.java
+++ b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpRedirector.java
@@ -19,6 +19,7 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@@ -106,8 +107,8 @@ public boolean isRedirect(Response response)
*/
public Result redirect(Request request, Response response) throws InterruptedException, ExecutionException
{
- final AtomicReference resultRef = new AtomicReference<>();
- final CountDownLatch latch = new CountDownLatch(1);
+ AtomicReference resultRef = new AtomicReference<>();
+ CountDownLatch latch = new CountDownLatch(1);
Request redirect = redirect(request, response, new BufferingResponseListener()
{
@Override
@@ -302,13 +303,29 @@ private URI sanitize(String location)
}
}
- private Request sendRedirect(final HttpRequest httpRequest, Response response, Response.CompleteListener listener, URI location, String method)
+ private Request sendRedirect(HttpRequest httpRequest, Response response, Response.CompleteListener listener, URI location, String method)
{
try
{
Request redirect = client.copyRequest(httpRequest, location);
- // Disable the timeout so that only the one from the initial request applies.
- redirect.timeout(0, TimeUnit.MILLISECONDS);
+
+ // Adjust the timeout of the new request, taking into account the
+ // timeout of the previous request and the time already elapsed.
+ long timeoutAt = httpRequest.getTimeoutAt();
+ if (timeoutAt < Long.MAX_VALUE)
+ {
+ long newTimeout = timeoutAt - System.nanoTime();
+ if (newTimeout > 0)
+ {
+ redirect.timeout(newTimeout, TimeUnit.NANOSECONDS);
+ }
+ else
+ {
+ TimeoutException failure = new TimeoutException("Total timeout " + httpRequest.getConversation().getTimeout() + " ms elapsed");
+ fail(httpRequest, failure, response);
+ return null;
+ }
+ }
// Use given method
redirect.method(method);
@@ -325,17 +342,27 @@ private Request sendRedirect(final HttpRequest httpRequest, Response response, R
}
catch (Throwable x)
{
- fail(httpRequest, response, x);
+ fail(httpRequest, x, response);
return null;
}
}
protected void fail(Request request, Response response, Throwable failure)
+ {
+ fail(request, null, response, failure);
+ }
+
+ protected void fail(Request request, Throwable failure, Response response)
+ {
+ fail(request, failure, response, failure);
+ }
+
+ private void fail(Request request, Throwable requestFailure, Response response, Throwable responseFailure)
{
HttpConversation conversation = ((HttpRequest)request).getConversation();
conversation.updateResponseListeners(null);
List listeners = conversation.getResponseListeners();
- notifier.notifyFailure(listeners, response, failure);
- notifier.notifyComplete(listeners, new Result(request, response, failure));
+ notifier.notifyFailure(listeners, response, responseFailure);
+ notifier.notifyComplete(listeners, new Result(request, requestFailure, response, responseFailure));
}
}
diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/TimeoutCompleteListener.java b/jetty-client/src/main/java/org/eclipse/jetty/client/TimeoutCompleteListener.java
index d8c2ca140369..f9ded6b49e30 100644
--- a/jetty-client/src/main/java/org/eclipse/jetty/client/TimeoutCompleteListener.java
+++ b/jetty-client/src/main/java/org/eclipse/jetty/client/TimeoutCompleteListener.java
@@ -21,10 +21,15 @@
import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.client.api.Result;
import org.eclipse.jetty.io.CyclicTimeout;
+import org.eclipse.jetty.io.CyclicTimeouts;
import org.eclipse.jetty.util.thread.Scheduler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+/**
+ * @deprecated Do not use it, use {@link CyclicTimeouts} instead.
+ */
+@Deprecated
public class TimeoutCompleteListener extends CyclicTimeout implements Response.CompleteListener
{
private static final Logger LOG = LoggerFactory.getLogger(TimeoutCompleteListener.class);
diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/http/HttpConnectionOverHTTP.java b/jetty-client/src/main/java/org/eclipse/jetty/client/http/HttpConnectionOverHTTP.java
index 489afc1c2e92..8ba22b8b3c6c 100644
--- a/jetty-client/src/main/java/org/eclipse/jetty/client/http/HttpConnectionOverHTTP.java
+++ b/jetty-client/src/main/java/org/eclipse/jetty/client/http/HttpConnectionOverHTTP.java
@@ -15,6 +15,8 @@
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousCloseException;
+import java.util.Collections;
+import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@@ -22,6 +24,7 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.LongAdder;
+import org.eclipse.jetty.client.HttpChannel;
import org.eclipse.jetty.client.HttpClientTransport;
import org.eclipse.jetty.client.HttpConnection;
import org.eclipse.jetty.client.HttpConversation;
@@ -266,6 +269,12 @@ private Delegate(HttpDestination destination)
super(destination);
}
+ @Override
+ protected Iterator getHttpChannels()
+ {
+ return Collections.singleton(channel).iterator();
+ }
+
@Override
public SendFailure send(HttpExchange exchange)
{
@@ -322,6 +331,7 @@ protected void normalizeRequest(HttpRequest request)
public void close()
{
HttpConnectionOverHTTP.this.close();
+ destroy();
}
@Override
diff --git a/jetty-client/src/test/java/org/eclipse/jetty/client/HttpClientRedirectTest.java b/jetty-client/src/test/java/org/eclipse/jetty/client/HttpClientRedirectTest.java
index 5cb952f2fb2d..9a7a4065b854 100644
--- a/jetty-client/src/test/java/org/eclipse/jetty/client/HttpClientRedirectTest.java
+++ b/jetty-client/src/test/java/org/eclipse/jetty/client/HttpClientRedirectTest.java
@@ -516,6 +516,117 @@ protected void service(String target, Request jettyRequest, HttpServletRequest r
});
}
+ @ParameterizedTest
+ @ArgumentsSource(ScenarioProvider.class)
+ public void testRedirectToDifferentHostThenRequestToFirstHostExpires(Scenario scenario) throws Exception
+ {
+ long timeout = 1000;
+ start(scenario, new EmptyServerHandler()
+ {
+ @Override
+ protected void service(String target, Request jettyRequest, HttpServletRequest request, HttpServletResponse response) throws ServletException
+ {
+ if ("/one".equals(target))
+ {
+ response.setStatus(HttpStatus.SEE_OTHER_303);
+ response.setHeader(HttpHeader.LOCATION.asString(), scenario.getScheme() + "://127.0.0.1:" + connector.getLocalPort() + "/two");
+ }
+ else if ("/two".equals(target))
+ {
+ try
+ {
+ // Send another request to "localhost", therefore reusing the
+ // connection used for the first request, it must timeout.
+ CountDownLatch latch = new CountDownLatch(1);
+ client.newRequest("localhost", connector.getLocalPort())
+ .scheme(scenario.getScheme())
+ .path("/three")
+ .timeout(timeout, TimeUnit.MILLISECONDS)
+ .send(result ->
+ {
+ if (result.getFailure() instanceof TimeoutException)
+ latch.countDown();
+ });
+ // Wait for the request to fail as it should.
+ assertTrue(latch.await(2 * timeout, TimeUnit.MILLISECONDS));
+ }
+ catch (Throwable x)
+ {
+ throw new ServletException(x);
+ }
+ }
+ else if ("/three".equals(target))
+ {
+ try
+ {
+ // The third request must timeout.
+ Thread.sleep(2 * timeout);
+ }
+ catch (InterruptedException x)
+ {
+ throw new ServletException(x);
+ }
+ }
+ }
+ });
+
+ ContentResponse response = client.newRequest("localhost", connector.getLocalPort())
+ .scheme(scenario.getScheme())
+ .path("/one")
+ // The timeout should not expire, but must be present to trigger the test conditions.
+ .timeout(3 * timeout, TimeUnit.MILLISECONDS)
+ .send();
+ assertEquals(HttpStatus.OK_200, response.getStatus());
+ }
+
+ @ParameterizedTest
+ @ArgumentsSource(ScenarioProvider.class)
+ public void testManyRedirectsTotalTimeoutExpires(Scenario scenario) throws Exception
+ {
+ long timeout = 1000;
+ start(scenario, new EmptyServerHandler()
+ {
+ @Override
+ protected void service(String target, Request jettyRequest, HttpServletRequest request, HttpServletResponse response) throws ServletException
+ {
+ try
+ {
+ String serverURI = scenario.getScheme() + "://localhost:" + connector.getLocalPort();
+ if ("/one".equals(target))
+ {
+ Thread.sleep(timeout);
+ response.setStatus(HttpStatus.SEE_OTHER_303);
+ response.setHeader(HttpHeader.LOCATION.asString(), serverURI + "/two");
+ }
+ else if ("/two".equals(target))
+ {
+ Thread.sleep(timeout);
+ response.setStatus(HttpStatus.SEE_OTHER_303);
+ response.setHeader(HttpHeader.LOCATION.asString(), serverURI + "/three");
+ }
+ else if ("/three".equals(target))
+ {
+ Thread.sleep(2 * timeout);
+ }
+ }
+ catch (InterruptedException x)
+ {
+ throw new ServletException(x);
+ }
+ }
+ });
+
+ assertThrows(TimeoutException.class, () ->
+ {
+ client.setMaxRedirects(-1);
+ client.newRequest("localhost", connector.getLocalPort())
+ .scheme(scenario.getScheme())
+ .path("/one")
+ .timeout(3 * timeout, TimeUnit.MILLISECONDS)
+ .send();
+ });
+ }
+
private void testSameMethodRedirect(final Scenario scenario, final HttpMethod method, int redirectCode) throws Exception
{
testMethodRedirect(scenario, method, method, redirectCode);
@@ -564,7 +675,7 @@ else if (pass == 2)
assertEquals(200, response.getStatus());
}
- private class RedirectHandler extends EmptyServerHandler
+ private static class RedirectHandler extends EmptyServerHandler
{
@Override
protected void service(String target, Request jettyRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
diff --git a/jetty-fcgi/fcgi-client/src/main/java/org/eclipse/jetty/fcgi/client/http/HttpConnectionOverFCGI.java b/jetty-fcgi/fcgi-client/src/main/java/org/eclipse/jetty/fcgi/client/http/HttpConnectionOverFCGI.java
index e57306b68874..19ee20518dc5 100644
--- a/jetty-fcgi/fcgi-client/src/main/java/org/eclipse/jetty/fcgi/client/http/HttpConnectionOverFCGI.java
+++ b/jetty-fcgi/fcgi-client/src/main/java/org/eclipse/jetty/fcgi/client/http/HttpConnectionOverFCGI.java
@@ -16,6 +16,7 @@
import java.io.EOFException;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousCloseException;
+import java.util.Iterator;
import java.util.LinkedList;
import java.util.Map;
import java.util.Queue;
@@ -366,6 +367,12 @@ private Delegate(HttpDestination destination)
super(destination);
}
+ @Override
+ protected Iterator getHttpChannels()
+ {
+ return new IteratorWrapper<>(activeChannels.values().iterator());
+ }
+
@Override
public SendFailure send(HttpExchange exchange)
{
@@ -384,6 +391,7 @@ public SendFailure send(HttpExchange exchange)
public void close()
{
HttpConnectionOverFCGI.this.close();
+ destroy();
}
protected void close(Throwable failure)
@@ -504,4 +512,32 @@ private void noChannel(int request)
LOG.debug("Channel not found for request {}", request);
}
}
+
+ private static final class IteratorWrapper implements Iterator
+ {
+ private final Iterator extends T> iterator;
+
+ private IteratorWrapper(Iterator extends T> iterator)
+ {
+ this.iterator = iterator;
+ }
+
+ @Override
+ public boolean hasNext()
+ {
+ return iterator.hasNext();
+ }
+
+ @Override
+ public T next()
+ {
+ return iterator.next();
+ }
+
+ @Override
+ public void remove()
+ {
+ iterator.remove();
+ }
+ }
}
diff --git a/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpConnectionOverHTTP2.java b/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpConnectionOverHTTP2.java
index 429957eef35f..3c9f895f95ef 100644
--- a/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpConnectionOverHTTP2.java
+++ b/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpConnectionOverHTTP2.java
@@ -14,6 +14,7 @@
package org.eclipse.jetty.http2.client.http;
import java.nio.channels.AsynchronousCloseException;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Queue;
@@ -77,6 +78,12 @@ public void setRecycleHttpChannels(boolean recycleHttpChannels)
this.recycleHttpChannels = recycleHttpChannels;
}
+ @Override
+ protected Iterator getHttpChannels()
+ {
+ return activeChannels.iterator();
+ }
+
@Override
public SendFailure send(HttpExchange exchange)
{
@@ -184,6 +191,7 @@ public boolean onIdleTimeout(long idleTimeout)
public void close()
{
close(new AsynchronousCloseException());
+ destroy();
}
protected void close(Throwable failure)
diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/CyclicTimeouts.java b/jetty-io/src/main/java/org/eclipse/jetty/io/CyclicTimeouts.java
index c57cb058d109..12b8ad9e6681 100644
--- a/jetty-io/src/main/java/org/eclipse/jetty/io/CyclicTimeouts.java
+++ b/jetty-io/src/main/java/org/eclipse/jetty/io/CyclicTimeouts.java
@@ -51,14 +51,7 @@ public abstract class CyclicTimeouts impleme
public CyclicTimeouts(Scheduler scheduler)
{
- cyclicTimeout = new CyclicTimeout(scheduler)
- {
- @Override
- public void onTimeoutExpired()
- {
- CyclicTimeouts.this.onTimeoutExpired();
- }
- };
+ cyclicTimeout = new Timeouts(scheduler);
}
/**
@@ -68,6 +61,9 @@ public void onTimeoutExpired()
/**
* Invoked during the iteration when the given entity is expired.
+ * This method may be invoked multiple times, and even concurrently,
+ * for the same expirable entity and therefore the expiration of the
+ * entity, if any, should be an idempotent action.
*
* @param expirable the entity that is expired
* @return whether the entity should be removed from the iterator via {@link Iterator#remove()}
@@ -77,7 +73,7 @@ public void onTimeoutExpired()
private void onTimeoutExpired()
{
if (LOG.isDebugEnabled())
- LOG.debug("{} timeouts check", this);
+ LOG.debug("Timeouts check for {}", this);
long now = System.nanoTime();
long earliest = Long.MAX_VALUE;
@@ -87,18 +83,29 @@ private void onTimeoutExpired()
// be seen during the iteration below.
earliestTimeout.set(earliest);
+ Iterator iterator = iterator();
+ if (iterator == null)
+ return;
+
// Scan the entities to abort expired entities
// and to find the entity that expires the earliest.
- Iterator iterator = iterator();
while (iterator.hasNext())
{
T expirable = iterator.next();
long expiresAt = expirable.getExpireNanoTime();
+
+ if (LOG.isDebugEnabled())
+ LOG.debug("Entity {} expires in {} ms for {}", expirable, TimeUnit.NANOSECONDS.toMillis(expiresAt - now), this);
+
if (expiresAt == -1)
continue;
+
if (expiresAt <= now)
{
- if (onExpired(expirable))
+ boolean remove = onExpired(expirable);
+ if (LOG.isDebugEnabled())
+ LOG.debug("Entity {} expired, remove={} for {}", expirable, remove, this);
+ if (remove)
iterator.remove();
continue;
}
@@ -127,13 +134,19 @@ private void schedule(long expiresAt)
// When the timeout expires, scan the entities for the next
// earliest entity that may expire, and reschedule a new timeout.
long prevEarliest = earliestTimeout.getAndUpdate(t -> Math.min(t, expiresAt));
- if (expiresAt < prevEarliest)
+ long expires = expiresAt;
+ while (expires < prevEarliest)
{
// A new entity expires earlier than previous entities, schedule it.
- long delay = Math.max(0, expiresAt - System.nanoTime());
+ long delay = Math.max(0, expires - System.nanoTime());
if (LOG.isDebugEnabled())
- LOG.debug("{} scheduling timeout in {} ms", this, TimeUnit.NANOSECONDS.toMillis(delay));
- cyclicTimeout.schedule(delay, TimeUnit.NANOSECONDS);
+ LOG.debug("Scheduling timeout in {} ms for {}", TimeUnit.NANOSECONDS.toMillis(delay), this);
+ schedule(cyclicTimeout, delay, TimeUnit.NANOSECONDS);
+
+ // If we lost a race and overwrote a schedule() with an earlier time, then that earlier time
+ // is remembered by earliestTimeout, in which case we will loop and set it again ourselves.
+ prevEarliest = expires;
+ expires = earliestTimeout.get();
}
}
@@ -143,6 +156,11 @@ public void destroy()
cyclicTimeout.destroy();
}
+ boolean schedule(CyclicTimeout cyclicTimeout, long delay, TimeUnit unit)
+ {
+ return cyclicTimeout.schedule(delay, unit);
+ }
+
/**
* An entity that may expire.
*/
@@ -159,4 +177,18 @@ public interface Expirable
*/
public long getExpireNanoTime();
}
+
+ private class Timeouts extends CyclicTimeout
+ {
+ private Timeouts(Scheduler scheduler)
+ {
+ super(scheduler);
+ }
+
+ @Override
+ public void onTimeoutExpired()
+ {
+ CyclicTimeouts.this.onTimeoutExpired();
+ }
+ }
}
diff --git a/jetty-io/src/test/java/org/eclipse/jetty/io/CyclicTimeoutsTest.java b/jetty-io/src/test/java/org/eclipse/jetty/io/CyclicTimeoutsTest.java
new file mode 100644
index 000000000000..b2befd98085e
--- /dev/null
+++ b/jetty-io/src/test/java/org/eclipse/jetty/io/CyclicTimeoutsTest.java
@@ -0,0 +1,261 @@
+//
+// ========================================================================
+// Copyright (c) 1995-2021 Mort Bay Consulting Pty Ltd and others.
+//
+// This program and the accompanying materials are made available under the
+// terms of the Eclipse Public License v. 2.0 which is available at
+// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
+// which is available at https://www.apache.org/licenses/LICENSE-2.0.
+//
+// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
+// ========================================================================
+//
+
+package org.eclipse.jetty.io;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.eclipse.jetty.util.component.LifeCycle;
+import org.eclipse.jetty.util.thread.ScheduledExecutorScheduler;
+import org.eclipse.jetty.util.thread.Scheduler;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertSame;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class CyclicTimeoutsTest
+{
+ private Scheduler scheduler;
+ private CyclicTimeouts timeouts;
+
+ @BeforeEach
+ public void prepare()
+ {
+ scheduler = new ScheduledExecutorScheduler();
+ LifeCycle.start(scheduler);
+ }
+
+ @AfterEach
+ public void dispose()
+ {
+ if (timeouts != null)
+ timeouts.destroy();
+ LifeCycle.stop(scheduler);
+ }
+
+ @Test
+ public void testNoExpirationForNonExpiringEntity() throws Exception
+ {
+ CountDownLatch latch = new CountDownLatch(1);
+ timeouts = new CyclicTimeouts<>(scheduler)
+ {
+ @Override
+ protected Iterator iterator()
+ {
+ latch.countDown();
+ return null;
+ }
+
+ @Override
+ protected boolean onExpired(ConstantExpirable expirable)
+ {
+ return false;
+ }
+ };
+
+ // Schedule an entity that does not expire.
+ timeouts.schedule(ConstantExpirable.noExpire());
+
+ Assertions.assertFalse(latch.await(1, TimeUnit.SECONDS));
+ }
+
+ @Test
+ public void testScheduleZero() throws Exception
+ {
+ ConstantExpirable entity = ConstantExpirable.ofDelay(0, TimeUnit.SECONDS);
+ CountDownLatch iteratorLatch = new CountDownLatch(1);
+ CountDownLatch expiredLatch = new CountDownLatch(1);
+ timeouts = new CyclicTimeouts<>(scheduler)
+ {
+ @Override
+ protected Iterator iterator()
+ {
+ iteratorLatch.countDown();
+ return Collections.emptyIterator();
+ }
+
+ @Override
+ protected boolean onExpired(ConstantExpirable expirable)
+ {
+ expiredLatch.countDown();
+ return false;
+ }
+ };
+
+ timeouts.schedule(entity);
+
+ Assertions.assertTrue(iteratorLatch.await(1, TimeUnit.SECONDS));
+ Assertions.assertFalse(expiredLatch.await(1, TimeUnit.SECONDS));
+ }
+
+ @ParameterizedTest
+ @ValueSource(booleans = {false, true})
+ public void testIterateAndExpire(boolean remove) throws Exception
+ {
+ ConstantExpirable zero = ConstantExpirable.ofDelay(0, TimeUnit.SECONDS);
+ ConstantExpirable one = ConstantExpirable.ofDelay(1, TimeUnit.SECONDS);
+ Collection collection = new ArrayList<>();
+ collection.add(one);
+ AtomicInteger iterations = new AtomicInteger();
+ CountDownLatch expiredLatch = new CountDownLatch(1);
+ timeouts = new CyclicTimeouts<>(scheduler)
+ {
+ @Override
+ protected Iterator iterator()
+ {
+ iterations.incrementAndGet();
+ return collection.iterator();
+ }
+
+ @Override
+ protected boolean onExpired(ConstantExpirable expirable)
+ {
+ assertSame(one, expirable);
+ expiredLatch.countDown();
+ return remove;
+ }
+ };
+
+ // Triggers immediate call to iterator(), which
+ // returns an entity that expires in 1 second.
+ timeouts.schedule(zero);
+
+ // After 1 second there is a second call to
+ // iterator(), which returns the now expired
+ // entity, which is passed to onExpired().
+ assertTrue(expiredLatch.await(2, TimeUnit.SECONDS));
+
+ // Wait for the collection to be processed
+ // with the return value of onExpired().
+ Thread.sleep(1000);
+
+ // Verify the processing of the return value of onExpired().
+ assertEquals(remove ? 0 : 1, collection.size());
+
+ // Wait to see if iterator() is called again (it should not).
+ Thread.sleep(1000);
+ assertEquals(2, iterations.get());
+ }
+
+ @Test
+ public void testScheduleOvertake() throws Exception
+ {
+ ConstantExpirable zero = ConstantExpirable.ofDelay(0, TimeUnit.SECONDS);
+ long delayMs = 2000;
+ ConstantExpirable two = ConstantExpirable.ofDelay(delayMs, TimeUnit.MILLISECONDS);
+ ConstantExpirable overtake = ConstantExpirable.ofDelay(delayMs / 2, TimeUnit.MILLISECONDS);
+ Collection collection = new ArrayList<>();
+ collection.add(two);
+ CountDownLatch expiredLatch = new CountDownLatch(2);
+ List expired = new ArrayList<>();
+ timeouts = new CyclicTimeouts<>(scheduler)
+ {
+ private final AtomicBoolean overtakeScheduled = new AtomicBoolean();
+
+ @Override
+ protected Iterator iterator()
+ {
+ return collection.iterator();
+ }
+
+ @Override
+ protected boolean onExpired(ConstantExpirable expirable)
+ {
+ expired.add(expirable);
+ expiredLatch.countDown();
+ return true;
+ }
+
+ @Override
+ boolean schedule(CyclicTimeout cyclicTimeout, long delay, TimeUnit unit)
+ {
+ if (delay <= 0)
+ return super.schedule(cyclicTimeout, delay, unit);
+
+ // Simulate that an entity with a shorter timeout
+ // overtakes the entity that is currently being scheduled.
+ // Only schedule the overtake once.
+ if (overtakeScheduled.compareAndSet(false, true))
+ {
+ collection.add(overtake);
+ schedule(overtake);
+ }
+ return super.schedule(cyclicTimeout, delay, unit);
+ }
+ };
+
+ // Trigger the initial call to iterator().
+ timeouts.schedule(zero);
+
+ // Make sure that the overtake entity expires properly.
+ assertTrue(expiredLatch.await(2 * delayMs, TimeUnit.MILLISECONDS));
+
+ // Make sure all entities expired properly.
+ assertSame(overtake, expired.get(0));
+ assertSame(two, expired.get(1));
+ }
+
+ private static class ConstantExpirable implements CyclicTimeouts.Expirable
+ {
+ private static ConstantExpirable noExpire()
+ {
+ return new ConstantExpirable();
+ }
+
+ private static ConstantExpirable ofDelay(long delay, TimeUnit unit)
+ {
+ return new ConstantExpirable(delay, unit);
+ }
+
+ private final long expireNanos;
+ private final String asString;
+
+ private ConstantExpirable()
+ {
+ this.expireNanos = Long.MAX_VALUE;
+ this.asString = "noexp";
+ }
+
+ public ConstantExpirable(long delay, TimeUnit unit)
+ {
+ this.expireNanos = System.nanoTime() + unit.toNanos(delay);
+ this.asString = String.valueOf(unit.toMillis(delay));
+ }
+
+ @Override
+ public long getExpireNanoTime()
+ {
+ return expireNanos;
+ }
+
+ @Override
+ public String toString()
+ {
+ return String.format("%s@%x[%sms]", getClass().getSimpleName(), hashCode(), asString);
+ }
+ }
+}
diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/AsyncContentProducer.java b/jetty-server/src/main/java/org/eclipse/jetty/server/AsyncContentProducer.java
index 5a20690f5a45..8a0e813665e5 100644
--- a/jetty-server/src/main/java/org/eclipse/jetty/server/AsyncContentProducer.java
+++ b/jetty-server/src/main/java/org/eclipse/jetty/server/AsyncContentProducer.java
@@ -33,7 +33,7 @@ class AsyncContentProducer implements ContentProducer
private static final Throwable UNCONSUMED_CONTENT_EXCEPTION = new IOException("Unconsumed content")
{
@Override
- public synchronized Throwable fillInStackTrace()
+ public Throwable fillInStackTrace()
{
return this;
}
diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannelState.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannelState.java
index 5871d8f5abbf..b9b9b67b1bcd 100644
--- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannelState.java
+++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannelState.java
@@ -681,7 +681,7 @@ public String toString()
}
finally
{
- synchronized (this)
+ try (AutoLock l = lock())
{
_onTimeoutThread = null;
}