Skip to content

Commit

Permalink
Fixes #6323 - HttpClient requests with redirects gets stuck/never cal… (
Browse files Browse the repository at this point in the history
#6334)

Fixes #6323 - HttpClient requests with redirects gets stuck/never calls onComplete()

* Reworked the total timeout handling.
* Now a CyclicTimeouts handles the exchanges in each HttpDestination,
and a CyclicTimeouts handles the exchanges in each HttpConnection
(rather than in HttpChannel).
* Now adjusting the total timeout for copied requests generated by
redirects and authentication.

Signed-off-by: Simone Bordet <[email protected]>
  • Loading branch information
sbordet authored Jun 3, 2021
1 parent f902d12 commit 2e7d174
Show file tree
Hide file tree
Showing 17 changed files with 783 additions and 89 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

Expand Down Expand Up @@ -217,8 +218,25 @@ public void onComplete(Result result)
path = request.getPath();
}
Request newRequest = client.copyRequest(request, requestURI);
// Disable the timeout so that only the one from the initial request applies.
newRequest.timeout(0, TimeUnit.MILLISECONDS);

// Adjust the timeout of the new request, taking into account the
// timeout of the previous request and the time already elapsed.
long timeoutAt = request.getTimeoutAt();
if (timeoutAt < Long.MAX_VALUE)
{
long newTimeout = timeoutAt - System.nanoTime();
if (newTimeout > 0)
{
newRequest.timeout(newTimeout, TimeUnit.NANOSECONDS);
}
else
{
TimeoutException failure = new TimeoutException("Total timeout " + request.getConversation().getTimeout() + " ms elapsed");
forwardFailureComplete(request, failure, response, failure);
return;
}
}

if (path != null)
newRequest.path(path);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,26 +19,24 @@
package org.eclipse.jetty.client;

import org.eclipse.jetty.client.api.Result;
import org.eclipse.jetty.io.CyclicTimeouts;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;

public abstract class HttpChannel
public abstract class HttpChannel implements CyclicTimeouts.Expirable
{
protected static final Logger LOG = Log.getLogger(HttpChannel.class);

private final HttpDestination _destination;
private final TimeoutCompleteListener _totalTimeout;
private HttpExchange _exchange;

protected HttpChannel(HttpDestination destination)
{
_destination = destination;
_totalTimeout = new TimeoutCompleteListener(destination.getHttpClient().getScheduler());
}

public void destroy()
{
_totalTimeout.destroy();
}

public HttpDestination getHttpDestination()
Expand Down Expand Up @@ -109,6 +107,13 @@ public HttpExchange getHttpExchange()
}
}

@Override
public long getExpireNanoTime()
{
HttpExchange exchange = getHttpExchange();
return exchange != null ? exchange.getExpireNanoTime() : Long.MAX_VALUE;
}

protected abstract HttpSender getHttpSender();

protected abstract HttpReceiver getHttpReceiver();
Expand All @@ -117,16 +122,7 @@ public void send()
{
HttpExchange exchange = getHttpExchange();
if (exchange != null)
{
HttpRequest request = exchange.getRequest();
long timeoutAt = request.getTimeoutAt();
if (timeoutAt != -1)
{
exchange.getResponseListeners().add(_totalTimeout);
_totalTimeout.schedule(request, timeoutAt);
}
send(exchange);
}
}

public abstract void send(HttpExchange exchange);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.net.HttpCookie;
import java.net.URI;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
Expand All @@ -35,23 +36,27 @@
import org.eclipse.jetty.http.HttpFields;
import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.http.HttpVersion;
import org.eclipse.jetty.io.CyclicTimeouts;
import org.eclipse.jetty.util.Attachable;
import org.eclipse.jetty.util.HttpCookieStore;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.Scheduler;

public abstract class HttpConnection implements Connection, Attachable
{
private static final Logger LOG = Log.getLogger(HttpConnection.class);

private final HttpDestination destination;
private final RequestTimeouts requestTimeouts;
private Object attachment;
private int idleTimeoutGuard;
private long idleTimeoutStamp;

protected HttpConnection(HttpDestination destination)
{
this.destination = destination;
this.requestTimeouts = new RequestTimeouts(destination.getHttpClient().getScheduler());
this.idleTimeoutStamp = System.nanoTime();
}

Expand All @@ -65,6 +70,8 @@ public HttpDestination getHttpDestination()
return destination;
}

protected abstract Iterator<HttpChannel> getHttpChannels();

@Override
public void send(Request request, Response.CompleteListener listener)
{
Expand Down Expand Up @@ -230,6 +237,7 @@ protected SendFailure send(HttpChannel channel, HttpExchange exchange)
SendFailure result;
if (channel.associate(exchange))
{
requestTimeouts.schedule(channel);
channel.send();
result = null;
}
Expand Down Expand Up @@ -292,9 +300,40 @@ public Object getAttachment()
return attachment;
}

protected void destroy()
{
requestTimeouts.destroy();
}

@Override
public String toString()
{
return String.format("%s@%h", getClass().getSimpleName(), this);
}

private class RequestTimeouts extends CyclicTimeouts<HttpChannel>
{
private RequestTimeouts(Scheduler scheduler)
{
super(scheduler);
}

@Override
protected Iterator<HttpChannel> iterator()
{
return getHttpChannels();
}

@Override
protected boolean onExpired(HttpChannel channel)
{
HttpExchange exchange = channel.getHttpExchange();
if (exchange != null)
{
HttpRequest request = exchange.getRequest();
request.abort(new TimeoutException("Total timeout " + request.getTimeout() + " ms elapsed"));
}
return false;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.List;
import java.util.concurrent.ConcurrentLinkedDeque;

import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.util.AttributesMap;
import org.eclipse.jetty.util.log.Log;
Expand Down Expand Up @@ -143,6 +144,20 @@ public void updateResponseListeners(Response.ResponseListener overrideListener)
this.listeners = listeners;
}

/**
* <p>Returns the total timeout for the conversation.</p>
* <p>The conversation total timeout is the total timeout
* of the first request in the conversation.</p>
*
* @return the total timeout of the conversation
* @see Request#getTimeout()
*/
public long getTimeout()
{
HttpExchange firstExchange = exchanges.peekFirst();
return firstExchange == null ? 0 : firstExchange.getRequest().getTimeout();
}

public boolean abort(Throwable cause)
{
HttpExchange exchange = exchanges.peekLast();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,11 @@
import java.io.IOException;
import java.nio.channels.AsynchronousCloseException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;

import org.eclipse.jetty.client.api.Connection;
import org.eclipse.jetty.client.api.Destination;
Expand All @@ -36,7 +35,7 @@
import org.eclipse.jetty.http.HttpField;
import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.io.ClientConnectionFactory;
import org.eclipse.jetty.io.CyclicTimeout;
import org.eclipse.jetty.io.CyclicTimeouts;
import org.eclipse.jetty.util.BlockingArrayQueue;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.HostPort;
Expand All @@ -55,7 +54,7 @@
@ManagedObject
public abstract class HttpDestination extends ContainerLifeCycle implements Destination, Closeable, Callback, Dumpable
{
protected static final Logger LOG = Log.getLogger(HttpDestination.class);
private static final Logger LOG = Log.getLogger(HttpDestination.class);

private final HttpClient client;
private final Origin origin;
Expand Down Expand Up @@ -270,10 +269,7 @@ public void send(HttpExchange exchange)
{
if (enqueue(exchanges, exchange))
{
long expiresAt = request.getTimeoutAt();
if (expiresAt != -1)
requestTimeouts.schedule(expiresAt);

requestTimeouts.schedule(exchange);
if (!client.isRunning() && exchanges.remove(exchange))
{
request.abort(new RejectedExecutionException(client + " is stopping"));
Expand Down Expand Up @@ -549,63 +545,27 @@ public String toString()
/**
* <p>Enforces the total timeout for for exchanges that are still in the queue.</p>
* <p>The total timeout for exchanges that are not in the destination queue
* is enforced in {@link HttpChannel} by {@link TimeoutCompleteListener}.</p>
* is enforced in {@link HttpConnection}.</p>
*/
private class RequestTimeouts extends CyclicTimeout
private class RequestTimeouts extends CyclicTimeouts<HttpExchange>
{
private final AtomicLong earliestTimeout = new AtomicLong(Long.MAX_VALUE);

private RequestTimeouts(Scheduler scheduler)
{
super(scheduler);
}

@Override
public void onTimeoutExpired()
protected Iterator<HttpExchange> iterator()
{
if (LOG.isDebugEnabled())
LOG.debug("{} timeouts check", this);

long now = System.nanoTime();
long earliest = Long.MAX_VALUE;
// Reset the earliest timeout so we can expire again.
// A concurrent call to schedule(long) may lose an earliest
// value, but the corresponding exchange is already enqueued
// and will be seen by scanning the exchange queue below.
earliestTimeout.set(earliest);

// Scan the message queue to abort expired exchanges
// and to find the exchange that expire the earliest.
for (HttpExchange exchange : exchanges)
{
HttpRequest request = exchange.getRequest();
long expiresAt = request.getTimeoutAt();
if (expiresAt == -1)
continue;
if (expiresAt <= now)
request.abort(new TimeoutException("Total timeout " + request.getTimeout() + " ms elapsed"));
else if (expiresAt < earliest)
earliest = expiresAt;
}

if (earliest < Long.MAX_VALUE && client.isRunning())
schedule(earliest);
return exchanges.iterator();
}

private void schedule(long expiresAt)
@Override
protected boolean onExpired(HttpExchange exchange)
{
// Schedule a timeout for the earliest exchange that may expire.
// When the timeout expires, scan the exchange queue for the next
// earliest exchange that may expire, and reschedule a new timeout.
long prevEarliest = earliestTimeout.getAndUpdate(t -> Math.min(t, expiresAt));
if (expiresAt < prevEarliest)
{
// A new request expires earlier than previous requests, schedule it.
long delay = Math.max(0, expiresAt - System.nanoTime());
if (LOG.isDebugEnabled())
LOG.debug("{} scheduling timeout in {} ms", this, TimeUnit.NANOSECONDS.toMillis(delay));
schedule(delay, TimeUnit.NANOSECONDS);
}
HttpRequest request = exchange.getRequest();
request.abort(new TimeoutException("Total timeout " + request.getTimeout() + " ms elapsed"));
return false;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,11 @@

import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.client.api.Result;
import org.eclipse.jetty.io.CyclicTimeouts;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;

public class HttpExchange
public class HttpExchange implements CyclicTimeouts.Expirable
{
private static final Logger LOG = Log.getLogger(HttpExchange.class);

Expand Down Expand Up @@ -86,6 +87,12 @@ public Throwable getResponseFailure()
}
}

@Override
public long getExpireNanoTime()
{
return request.getTimeoutAt();
}

/**
* <p>Associates the given {@code channel} to this exchange.</p>
* <p>Works in strict collaboration with {@link HttpChannel#associate(HttpExchange)}.</p>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -474,9 +474,9 @@ private void terminateResponse(HttpExchange exchange, Result result)
boolean ordered = getHttpDestination().getHttpClient().isStrictEventOrdering();
if (!ordered)
channel.exchangeTerminated(exchange, result);
if (LOG.isDebugEnabled())
LOG.debug("Request/Response {}: {}", failure == null ? "succeeded" : "failed", result);
List<Response.ResponseListener> listeners = exchange.getConversation().getResponseListeners();
if (LOG.isDebugEnabled())
LOG.debug("Request/Response {}: {}, notifying {}", failure == null ? "succeeded" : "failed", result, listeners);
ResponseNotifier notifier = getHttpDestination().getResponseNotifier();
notifier.notifyComplete(listeners, result);
if (ordered)
Expand Down
Loading

0 comments on commit 2e7d174

Please sign in to comment.