Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixes #6323 - HttpClient requests with redirects gets stuck/never cal… #6355

Merged
merged 1 commit into from
Jun 4, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

Expand Down Expand Up @@ -211,8 +212,25 @@ public void onComplete(Result result)
path = request.getPath();
}
Request newRequest = client.copyRequest(request, requestURI);
// Disable the timeout so that only the one from the initial request applies.
newRequest.timeout(0, TimeUnit.MILLISECONDS);

// Adjust the timeout of the new request, taking into account the
// timeout of the previous request and the time already elapsed.
long timeoutAt = request.getTimeoutAt();
if (timeoutAt < Long.MAX_VALUE)
{
long newTimeout = timeoutAt - System.nanoTime();
if (newTimeout > 0)
{
newRequest.timeout(newTimeout, TimeUnit.NANOSECONDS);
}
else
{
TimeoutException failure = new TimeoutException("Total timeout " + request.getConversation().getTimeout() + " ms elapsed");
forwardFailureComplete(request, failure, response, failure);
return;
}
}

if (path != null)
newRequest.path(path);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,28 +14,26 @@
package org.eclipse.jetty.client;

import org.eclipse.jetty.client.api.Result;
import org.eclipse.jetty.io.CyclicTimeouts;
import org.eclipse.jetty.util.thread.AutoLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class HttpChannel
public abstract class HttpChannel implements CyclicTimeouts.Expirable
{
private static final Logger LOG = LoggerFactory.getLogger(HttpChannel.class);

private final AutoLock _lock = new AutoLock();
private final HttpDestination _destination;
private final TimeoutCompleteListener _totalTimeout;
private HttpExchange _exchange;

protected HttpChannel(HttpDestination destination)
{
_destination = destination;
_totalTimeout = new TimeoutCompleteListener(destination.getHttpClient().getScheduler());
}

public void destroy()
{
_totalTimeout.destroy();
}

public HttpDestination getHttpDestination()
Expand Down Expand Up @@ -106,6 +104,13 @@ public HttpExchange getHttpExchange()
}
}

@Override
public long getExpireNanoTime()
{
HttpExchange exchange = getHttpExchange();
return exchange != null ? exchange.getExpireNanoTime() : Long.MAX_VALUE;
}

protected abstract HttpSender getHttpSender();

protected abstract HttpReceiver getHttpReceiver();
Expand All @@ -114,15 +119,7 @@ public void send()
{
HttpExchange exchange = getHttpExchange();
if (exchange != null)
{
long timeoutAt = exchange.getExpireNanoTime();
if (timeoutAt != Long.MAX_VALUE)
{
exchange.getResponseListeners().add(_totalTimeout);
_totalTimeout.schedule(exchange.getRequest(), timeoutAt);
}
send(exchange);
}
}

public abstract void send(HttpExchange exchange);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import java.net.HttpCookie;
import java.net.URI;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
Expand All @@ -30,9 +31,11 @@
import org.eclipse.jetty.http.HttpFields;
import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.http.HttpVersion;
import org.eclipse.jetty.io.CyclicTimeouts;
import org.eclipse.jetty.util.Attachable;
import org.eclipse.jetty.util.HttpCookieStore;
import org.eclipse.jetty.util.thread.AutoLock;
import org.eclipse.jetty.util.thread.Scheduler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -42,13 +45,15 @@ public abstract class HttpConnection implements IConnection, Attachable

private final AutoLock lock = new AutoLock();
private final HttpDestination destination;
private final RequestTimeouts requestTimeouts;
private Object attachment;
private int idleTimeoutGuard;
private long idleTimeoutStamp;

protected HttpConnection(HttpDestination destination)
{
this.destination = destination;
this.requestTimeouts = new RequestTimeouts(destination.getHttpClient().getScheduler());
this.idleTimeoutStamp = System.nanoTime();
}

Expand All @@ -62,6 +67,8 @@ public HttpDestination getHttpDestination()
return destination;
}

protected abstract Iterator<HttpChannel> getHttpChannels();

@Override
public void send(Request request, Response.CompleteListener listener)
{
Expand Down Expand Up @@ -99,6 +106,7 @@ protected SendFailure send(HttpChannel channel, HttpExchange exchange)
SendFailure result;
if (channel.associate(exchange))
{
requestTimeouts.schedule(channel);
channel.send();
result = null;
}
Expand Down Expand Up @@ -228,16 +236,6 @@ private StringBuilder convertCookies(List<HttpCookie> cookies, StringBuilder bui
return builder;
}

private void applyProxyAuthentication(Request request, ProxyConfiguration.Proxy proxy)
{
if (proxy != null)
{
Authentication.Result result = getHttpClient().getAuthenticationStore().findAuthenticationResult(proxy.getURI());
if (result != null)
result.apply(request);
}
}

private void applyRequestAuthentication(Request request)
{
AuthenticationStore authenticationStore = getHttpClient().getAuthenticationStore();
Expand All @@ -253,6 +251,16 @@ private void applyRequestAuthentication(Request request)
}
}

private void applyProxyAuthentication(Request request, ProxyConfiguration.Proxy proxy)
{
if (proxy != null)
{
Authentication.Result result = getHttpClient().getAuthenticationStore().findAuthenticationResult(proxy.getURI());
if (result != null)
result.apply(request);
}
}

public boolean onIdleTimeout(long idleTimeout)
{
try (AutoLock l = lock.lock())
Expand Down Expand Up @@ -288,9 +296,40 @@ public Object getAttachment()
return attachment;
}

protected void destroy()
{
requestTimeouts.destroy();
}

@Override
public String toString()
{
return String.format("%s@%h", getClass().getSimpleName(), this);
}

private class RequestTimeouts extends CyclicTimeouts<HttpChannel>
{
private RequestTimeouts(Scheduler scheduler)
{
super(scheduler);
}

@Override
protected Iterator<HttpChannel> iterator()
{
return getHttpChannels();
}

@Override
protected boolean onExpired(HttpChannel channel)
{
HttpExchange exchange = channel.getHttpExchange();
if (exchange != null)
{
HttpRequest request = exchange.getRequest();
request.abort(new TimeoutException("Total timeout " + request.getTimeout() + " ms elapsed"));
}
return false;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import java.util.List;
import java.util.concurrent.ConcurrentLinkedDeque;

import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.util.AttributesMap;
import org.slf4j.Logger;
Expand Down Expand Up @@ -138,6 +139,20 @@ public void updateResponseListeners(Response.ResponseListener overrideListener)
this.listeners = listeners;
}

/**
* <p>Returns the total timeout for the conversation.</p>
* <p>The conversation total timeout is the total timeout
* of the first request in the conversation.</p>
*
* @return the total timeout of the conversation
* @see Request#getTimeout()
*/
public long getTimeout()
{
HttpExchange firstExchange = exchanges.peekFirst();
return firstExchange == null ? 0 : firstExchange.getRequest().getTimeout();
}

public boolean abort(Throwable cause)
{
HttpExchange exchange = exchanges.peekLast();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -525,7 +525,7 @@ public interface Multiplexed
/**
* <p>Enforces the total timeout for for exchanges that are still in the queue.</p>
* <p>The total timeout for exchanges that are not in the destination queue
* is enforced in {@link HttpChannel} by {@link TimeoutCompleteListener}.</p>
* is enforced in {@link HttpConnection}.</p>
*/
private class RequestTimeouts extends CyclicTimeouts<HttpExchange>
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -471,9 +471,9 @@ private void terminateResponse(HttpExchange exchange, Result result)
boolean ordered = getHttpDestination().getHttpClient().isStrictEventOrdering();
if (!ordered)
channel.exchangeTerminated(exchange, result);
if (LOG.isDebugEnabled())
LOG.debug("Request/Response {}: {}", failure == null ? "succeeded" : "failed", result);
List<Response.ResponseListener> listeners = exchange.getConversation().getResponseListeners();
if (LOG.isDebugEnabled())
LOG.debug("Request/Response {}: {}, notifying {}", failure == null ? "succeeded" : "failed", result, listeners);
ResponseNotifier notifier = getHttpDestination().getResponseNotifier();
notifier.notifyComplete(listeners, result);
if (ordered)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
Expand Down Expand Up @@ -106,8 +107,8 @@ public boolean isRedirect(Response response)
*/
public Result redirect(Request request, Response response) throws InterruptedException, ExecutionException
{
final AtomicReference<Result> resultRef = new AtomicReference<>();
final CountDownLatch latch = new CountDownLatch(1);
AtomicReference<Result> resultRef = new AtomicReference<>();
CountDownLatch latch = new CountDownLatch(1);
Request redirect = redirect(request, response, new BufferingResponseListener()
{
@Override
Expand Down Expand Up @@ -302,13 +303,29 @@ private URI sanitize(String location)
}
}

private Request sendRedirect(final HttpRequest httpRequest, Response response, Response.CompleteListener listener, URI location, String method)
private Request sendRedirect(HttpRequest httpRequest, Response response, Response.CompleteListener listener, URI location, String method)
{
try
{
Request redirect = client.copyRequest(httpRequest, location);
// Disable the timeout so that only the one from the initial request applies.
redirect.timeout(0, TimeUnit.MILLISECONDS);

// Adjust the timeout of the new request, taking into account the
// timeout of the previous request and the time already elapsed.
long timeoutAt = httpRequest.getTimeoutAt();
if (timeoutAt < Long.MAX_VALUE)
{
long newTimeout = timeoutAt - System.nanoTime();
if (newTimeout > 0)
{
redirect.timeout(newTimeout, TimeUnit.NANOSECONDS);
}
else
{
TimeoutException failure = new TimeoutException("Total timeout " + httpRequest.getConversation().getTimeout() + " ms elapsed");
fail(httpRequest, failure, response);
return null;
}
}

// Use given method
redirect.method(method);
Expand All @@ -325,17 +342,27 @@ private Request sendRedirect(final HttpRequest httpRequest, Response response, R
}
catch (Throwable x)
{
fail(httpRequest, response, x);
fail(httpRequest, x, response);
return null;
}
}

protected void fail(Request request, Response response, Throwable failure)
{
fail(request, null, response, failure);
}

protected void fail(Request request, Throwable failure, Response response)
{
fail(request, failure, response, failure);
}

private void fail(Request request, Throwable requestFailure, Response response, Throwable responseFailure)
{
HttpConversation conversation = ((HttpRequest)request).getConversation();
conversation.updateResponseListeners(null);
List<Response.ResponseListener> listeners = conversation.getResponseListeners();
notifier.notifyFailure(listeners, response, failure);
notifier.notifyComplete(listeners, new Result(request, response, failure));
notifier.notifyFailure(listeners, response, responseFailure);
notifier.notifyComplete(listeners, new Result(request, requestFailure, response, responseFailure));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,15 @@
import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.client.api.Result;
import org.eclipse.jetty.io.CyclicTimeout;
import org.eclipse.jetty.io.CyclicTimeouts;
import org.eclipse.jetty.util.thread.Scheduler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* @deprecated Do not use it, use {@link CyclicTimeouts} instead.
*/
@Deprecated
public class TimeoutCompleteListener extends CyclicTimeout implements Response.CompleteListener
{
private static final Logger LOG = LoggerFactory.getLogger(TimeoutCompleteListener.class);
Expand Down
Loading