Skip to content

Commit

Permalink
Fixes #6323 - HttpClient requests with redirects gets stuck/never cal…
Browse files Browse the repository at this point in the history
…ls onComplete()

* Reworked the total timeout handling.
* Now a CyclicTimeouts handles the exchanges in each HttpDestination,
and a CyclicTimeouts handles the exchanges in each HttpConnection
(rather than in HttpChannel).
* Now adjusting the total timeout for copied requests generated by
redirects and authentication.

Signed-off-by: Simone Bordet <[email protected]>
  • Loading branch information
sbordet committed May 28, 2021
1 parent b90ac0c commit 9218a2b
Show file tree
Hide file tree
Showing 15 changed files with 429 additions and 86 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -217,8 +217,12 @@ public void onComplete(Result result)
path = request.getPath();
}
Request newRequest = client.copyRequest(request, requestURI);
// Disable the timeout so that only the one from the initial request applies.
newRequest.timeout(0, TimeUnit.MILLISECONDS);

// Adjust the timeout.
long timeoutAt = request.getTimeoutAt();
if (timeoutAt < Long.MAX_VALUE)
newRequest.timeout(timeoutAt - System.nanoTime(), TimeUnit.MILLISECONDS);

if (path != null)
newRequest.path(path);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,26 +19,24 @@
package org.eclipse.jetty.client;

import org.eclipse.jetty.client.api.Result;
import org.eclipse.jetty.io.CyclicTimeouts;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;

public abstract class HttpChannel
public abstract class HttpChannel implements CyclicTimeouts.Expirable
{
protected static final Logger LOG = Log.getLogger(HttpChannel.class);

private final HttpDestination _destination;
private final TimeoutCompleteListener _totalTimeout;
private HttpExchange _exchange;

protected HttpChannel(HttpDestination destination)
{
_destination = destination;
_totalTimeout = new TimeoutCompleteListener(destination.getHttpClient().getScheduler());
}

public void destroy()
{
_totalTimeout.destroy();
}

public HttpDestination getHttpDestination()
Expand Down Expand Up @@ -109,6 +107,13 @@ public HttpExchange getHttpExchange()
}
}

@Override
public long getExpireNanoTime()
{
HttpExchange exchange = getHttpExchange();
return exchange != null ? exchange.getExpireNanoTime() : Long.MAX_VALUE;
}

protected abstract HttpSender getHttpSender();

protected abstract HttpReceiver getHttpReceiver();
Expand All @@ -117,16 +122,7 @@ public void send()
{
HttpExchange exchange = getHttpExchange();
if (exchange != null)
{
HttpRequest request = exchange.getRequest();
long timeoutAt = request.getTimeoutAt();
if (timeoutAt != -1)
{
exchange.getResponseListeners().add(_totalTimeout);
_totalTimeout.schedule(request, timeoutAt);
}
send(exchange);
}
}

public abstract void send(HttpExchange exchange);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.net.HttpCookie;
import java.net.URI;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
Expand All @@ -35,23 +36,27 @@
import org.eclipse.jetty.http.HttpFields;
import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.http.HttpVersion;
import org.eclipse.jetty.io.CyclicTimeouts;
import org.eclipse.jetty.util.Attachable;
import org.eclipse.jetty.util.HttpCookieStore;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.Scheduler;

public abstract class HttpConnection implements Connection, Attachable
{
private static final Logger LOG = Log.getLogger(HttpConnection.class);

private final HttpDestination destination;
private final RequestTimeouts requestTimeouts;
private Object attachment;
private int idleTimeoutGuard;
private long idleTimeoutStamp;

protected HttpConnection(HttpDestination destination)
{
this.destination = destination;
this.requestTimeouts = new RequestTimeouts(destination.getHttpClient().getScheduler());
this.idleTimeoutStamp = System.nanoTime();
}

Expand All @@ -65,6 +70,8 @@ public HttpDestination getHttpDestination()
return destination;
}

protected abstract Iterator<HttpChannel> getHttpChannels();

@Override
public void send(Request request, Response.CompleteListener listener)
{
Expand Down Expand Up @@ -230,6 +237,7 @@ protected SendFailure send(HttpChannel channel, HttpExchange exchange)
SendFailure result;
if (channel.associate(exchange))
{
requestTimeouts.schedule(channel);
channel.send();
result = null;
}
Expand Down Expand Up @@ -292,9 +300,68 @@ public Object getAttachment()
return attachment;
}

protected void destroy()
{
requestTimeouts.destroy();
}

@Override
public String toString()
{
return String.format("%s@%h", getClass().getSimpleName(), this);
}

private class RequestTimeouts extends CyclicTimeouts<HttpChannel>
{
private RequestTimeouts(Scheduler scheduler)
{
super(scheduler);
}

@Override
protected Iterator<HttpChannel> iterator()
{
return getHttpChannels();
}

@Override
protected boolean onExpired(HttpChannel channel)
{
HttpExchange exchange = channel.getHttpExchange();
if (exchange != null)
{
HttpRequest request = exchange.getRequest();
request.abort(new TimeoutException("Total timeout " + request.getTimeout() + " ms elapsed"));
}
return false;
}
}

protected static final class IteratorWrapper<T> implements Iterator<T>
{
private final Iterator<? extends T> iterator;

public IteratorWrapper(Iterator<? extends T> iterator)
{
this.iterator = iterator;
}

@Override
public boolean hasNext()
{
return iterator.hasNext();
}

@Override
public T next()
{
return iterator.next();
}

@Override
public void remove()
{
iterator.remove();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,11 @@
import java.io.IOException;
import java.nio.channels.AsynchronousCloseException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;

import org.eclipse.jetty.client.api.Connection;
import org.eclipse.jetty.client.api.Destination;
Expand All @@ -36,7 +35,7 @@
import org.eclipse.jetty.http.HttpField;
import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.io.ClientConnectionFactory;
import org.eclipse.jetty.io.CyclicTimeout;
import org.eclipse.jetty.io.CyclicTimeouts;
import org.eclipse.jetty.util.BlockingArrayQueue;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.HostPort;
Expand All @@ -55,7 +54,7 @@
@ManagedObject
public abstract class HttpDestination extends ContainerLifeCycle implements Destination, Closeable, Callback, Dumpable
{
protected static final Logger LOG = Log.getLogger(HttpDestination.class);
private static final Logger LOG = Log.getLogger(HttpDestination.class);

private final HttpClient client;
private final Origin origin;
Expand Down Expand Up @@ -270,10 +269,7 @@ public void send(HttpExchange exchange)
{
if (enqueue(exchanges, exchange))
{
long expiresAt = request.getTimeoutAt();
if (expiresAt != -1)
requestTimeouts.schedule(expiresAt);

requestTimeouts.schedule(exchange);
if (!client.isRunning() && exchanges.remove(exchange))
{
request.abort(new RejectedExecutionException(client + " is stopping"));
Expand Down Expand Up @@ -549,63 +545,27 @@ public String toString()
/**
* <p>Enforces the total timeout for for exchanges that are still in the queue.</p>
* <p>The total timeout for exchanges that are not in the destination queue
* is enforced in {@link HttpChannel} by {@link TimeoutCompleteListener}.</p>
* is enforced in {@link HttpConnection}.</p>
*/
private class RequestTimeouts extends CyclicTimeout
private class RequestTimeouts extends CyclicTimeouts<HttpExchange>
{
private final AtomicLong earliestTimeout = new AtomicLong(Long.MAX_VALUE);

private RequestTimeouts(Scheduler scheduler)
{
super(scheduler);
}

@Override
public void onTimeoutExpired()
protected Iterator<HttpExchange> iterator()
{
if (LOG.isDebugEnabled())
LOG.debug("{} timeouts check", this);

long now = System.nanoTime();
long earliest = Long.MAX_VALUE;
// Reset the earliest timeout so we can expire again.
// A concurrent call to schedule(long) may lose an earliest
// value, but the corresponding exchange is already enqueued
// and will be seen by scanning the exchange queue below.
earliestTimeout.set(earliest);

// Scan the message queue to abort expired exchanges
// and to find the exchange that expire the earliest.
for (HttpExchange exchange : exchanges)
{
HttpRequest request = exchange.getRequest();
long expiresAt = request.getTimeoutAt();
if (expiresAt == -1)
continue;
if (expiresAt <= now)
request.abort(new TimeoutException("Total timeout " + request.getTimeout() + " ms elapsed"));
else if (expiresAt < earliest)
earliest = expiresAt;
}

if (earliest < Long.MAX_VALUE && client.isRunning())
schedule(earliest);
return exchanges.iterator();
}

private void schedule(long expiresAt)
@Override
protected boolean onExpired(HttpExchange exchange)
{
// Schedule a timeout for the earliest exchange that may expire.
// When the timeout expires, scan the exchange queue for the next
// earliest exchange that may expire, and reschedule a new timeout.
long prevEarliest = earliestTimeout.getAndUpdate(t -> Math.min(t, expiresAt));
if (expiresAt < prevEarliest)
{
// A new request expires earlier than previous requests, schedule it.
long delay = Math.max(0, expiresAt - System.nanoTime());
if (LOG.isDebugEnabled())
LOG.debug("{} scheduling timeout in {} ms", this, TimeUnit.NANOSECONDS.toMillis(delay));
schedule(delay, TimeUnit.NANOSECONDS);
}
HttpRequest request = exchange.getRequest();
request.abort(new TimeoutException("Total timeout " + request.getTimeout() + " ms elapsed"));
return false;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,11 @@

import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.client.api.Result;
import org.eclipse.jetty.io.CyclicTimeouts;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;

public class HttpExchange
public class HttpExchange implements CyclicTimeouts.Expirable
{
private static final Logger LOG = Log.getLogger(HttpExchange.class);

Expand Down Expand Up @@ -86,6 +87,12 @@ public Throwable getResponseFailure()
}
}

@Override
public long getExpireNanoTime()
{
return request.getTimeoutAt();
}

/**
* <p>Associates the given {@code channel} to this exchange.</p>
* <p>Works in strict collaboration with {@link HttpChannel#associate(HttpExchange)}.</p>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -474,9 +474,9 @@ private void terminateResponse(HttpExchange exchange, Result result)
boolean ordered = getHttpDestination().getHttpClient().isStrictEventOrdering();
if (!ordered)
channel.exchangeTerminated(exchange, result);
if (LOG.isDebugEnabled())
LOG.debug("Request/Response {}: {}", failure == null ? "succeeded" : "failed", result);
List<Response.ResponseListener> listeners = exchange.getConversation().getResponseListeners();
if (LOG.isDebugEnabled())
LOG.debug("Request/Response {}: {}, notifying {}", failure == null ? "succeeded" : "failed", result, listeners);
ResponseNotifier notifier = getHttpDestination().getResponseNotifier();
notifier.notifyComplete(listeners, result);
if (ordered)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,8 +111,8 @@ public boolean isRedirect(Response response)
*/
public Result redirect(Request request, Response response) throws InterruptedException, ExecutionException
{
final AtomicReference<Result> resultRef = new AtomicReference<>();
final CountDownLatch latch = new CountDownLatch(1);
AtomicReference<Result> resultRef = new AtomicReference<>();
CountDownLatch latch = new CountDownLatch(1);
Request redirect = redirect(request, response, new BufferingResponseListener()
{
@Override
Expand Down Expand Up @@ -307,13 +307,16 @@ private Request redirect(Request request, Response response, Response.CompleteLi
}
}

private Request sendRedirect(final HttpRequest httpRequest, Response response, Response.CompleteListener listener, URI location, String method)
private Request sendRedirect(HttpRequest httpRequest, Response response, Response.CompleteListener listener, URI location, String method)
{
try
{
Request redirect = client.copyRequest(httpRequest, location);
// Disable the timeout so that only the one from the initial request applies.
redirect.timeout(0, TimeUnit.MILLISECONDS);

// Adjust the timeout.
long timeoutAt = httpRequest.getTimeoutAt();
if (timeoutAt < Long.MAX_VALUE)
redirect.timeout(timeoutAt - System.nanoTime(), TimeUnit.MILLISECONDS);

// Use given method
redirect.method(method);
Expand Down
Loading

0 comments on commit 9218a2b

Please sign in to comment.