Skip to content

Commit

Permalink
Fixes #5931 - SslConnection should implement getBytesIn()/getBytesOut().
Browse files Browse the repository at this point in the history
Updates after review.

Signed-off-by: Simone Bordet <[email protected]>
  • Loading branch information
sbordet committed May 31, 2021
1 parent 0aa6720 commit cb2d57b
Show file tree
Hide file tree
Showing 4 changed files with 358 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

Expand Down Expand Up @@ -218,10 +219,23 @@ public void onComplete(Result result)
}
Request newRequest = client.copyRequest(request, requestURI);

// Adjust the timeout.
// Adjust the timeout of the new request, taking into account the
// timeout of the previous request and the time already elapsed.
long timeoutAt = request.getTimeoutAt();
if (timeoutAt < Long.MAX_VALUE)
newRequest.timeout(timeoutAt - System.nanoTime(), TimeUnit.MILLISECONDS);
{
long newTimeout = timeoutAt - System.nanoTime();
if (newTimeout > 0)
{
newRequest.timeout(newTimeout, TimeUnit.NANOSECONDS);
}
else
{
TimeoutException failure = new TimeoutException("Total timeout " + request.getTimeout() + " ms elapsed");
forwardFailureComplete(request, failure, response, failure);
return;
}
}

if (path != null)
newRequest.path(path);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
Expand Down Expand Up @@ -313,10 +314,23 @@ private Request sendRedirect(HttpRequest httpRequest, Response response, Respons
{
Request redirect = client.copyRequest(httpRequest, location);

// Adjust the timeout.
// Adjust the timeout of the new request, taking into account the
// timeout of the previous request and the time already elapsed.
long timeoutAt = httpRequest.getTimeoutAt();
if (timeoutAt < Long.MAX_VALUE)
redirect.timeout(timeoutAt - System.nanoTime(), TimeUnit.MILLISECONDS);
{
long newTimeout = timeoutAt - System.nanoTime();
if (newTimeout > 0)
{
redirect.timeout(newTimeout, TimeUnit.NANOSECONDS);
}
else
{
TimeoutException failure = new TimeoutException("Total timeout " + httpRequest.getTimeout() + " ms elapsed");
fail(httpRequest, failure, response);
return null;
}
}

// Use given method
redirect.method(method);
Expand All @@ -333,17 +347,27 @@ private Request sendRedirect(HttpRequest httpRequest, Response response, Respons
}
catch (Throwable x)
{
fail(httpRequest, response, x);
fail(httpRequest, x, response);
return null;
}
}

protected void fail(Request request, Response response, Throwable failure)
{
fail(request, null, response, failure);
}

protected void fail(Request request, Throwable failure, Response response)
{
fail(request, failure, response, failure);
}

private void fail(Request request, Throwable requestFailure, Response response, Throwable responseFailure)
{
HttpConversation conversation = ((HttpRequest)request).getConversation();
conversation.updateResponseListeners(null);
List<Response.ResponseListener> listeners = conversation.getResponseListeners();
notifier.notifyFailure(listeners, response, failure);
notifier.notifyComplete(listeners, new Result(request, response, failure));
notifier.notifyFailure(listeners, response, responseFailure);
notifier.notifyComplete(listeners, new Result(request, requestFailure, response, responseFailure));
}
}
62 changes: 47 additions & 15 deletions jetty-io/src/main/java/org/eclipse/jetty/io/CyclicTimeouts.java
Original file line number Diff line number Diff line change
Expand Up @@ -56,14 +56,7 @@ public abstract class CyclicTimeouts<T extends CyclicTimeouts.Expirable> impleme

public CyclicTimeouts(Scheduler scheduler)
{
cyclicTimeout = new CyclicTimeout(scheduler)
{
@Override
public void onTimeoutExpired()
{
CyclicTimeouts.this.onTimeoutExpired();
}
};
cyclicTimeout = new Timeouts(scheduler);
}

/**
Expand All @@ -73,6 +66,9 @@ public void onTimeoutExpired()

/**
* <p>Invoked during the iteration when the given entity is expired.</p>
* <p>This method may be invoked multiple times, and even concurrently,
* for the same expirable entity and therefore the expiration of the
* entity, if any, should be an idempotent action.</p>
*
* @param expirable the entity that is expired
* @return whether the entity should be removed from the iterator via {@link Iterator#remove()}
Expand All @@ -82,7 +78,7 @@ public void onTimeoutExpired()
private void onTimeoutExpired()
{
if (LOG.isDebugEnabled())
LOG.debug("{} timeouts check", this);
LOG.debug("Timeouts check for {}", this);

long now = System.nanoTime();
long earliest = Long.MAX_VALUE;
Expand All @@ -92,18 +88,29 @@ private void onTimeoutExpired()
// be seen during the iteration below.
earliestTimeout.set(earliest);

Iterator<T> iterator = iterator();
if (iterator == null)
return;

// Scan the entities to abort expired entities
// and to find the entity that expires the earliest.
Iterator<T> iterator = iterator();
while (iterator.hasNext())
{
T expirable = iterator.next();
long expiresAt = expirable.getExpireNanoTime();

if (LOG.isDebugEnabled())
LOG.debug("Entity {} expires in {} ms for {}", expirable, TimeUnit.NANOSECONDS.toMillis(expiresAt - now), this);

if (expiresAt == -1)
continue;

if (expiresAt <= now)
{
if (onExpired(expirable))
boolean remove = onExpired(expirable);
if (LOG.isDebugEnabled())
LOG.debug("Entity {} expired, remove={} for {}", expirable, remove, this);
if (remove)
iterator.remove();
continue;
}
Expand Down Expand Up @@ -132,13 +139,19 @@ private void schedule(long expiresAt)
// When the timeout expires, scan the entities for the next
// earliest entity that may expire, and reschedule a new timeout.
long prevEarliest = earliestTimeout.getAndUpdate(t -> Math.min(t, expiresAt));
if (expiresAt < prevEarliest)
long expires = expiresAt;
while (expires < prevEarliest)
{
// A new entity expires earlier than previous entities, schedule it.
long delay = Math.max(0, expiresAt - System.nanoTime());
long delay = Math.max(0, expires - System.nanoTime());
if (LOG.isDebugEnabled())
LOG.debug("{} scheduling timeout in {} ms", this, TimeUnit.NANOSECONDS.toMillis(delay));
cyclicTimeout.schedule(delay, TimeUnit.NANOSECONDS);
LOG.debug("Scheduling timeout in {} ms for {}", TimeUnit.NANOSECONDS.toMillis(delay), this);
schedule(cyclicTimeout, delay, TimeUnit.NANOSECONDS);

// If we lost a race and overwrote a schedule() with an earlier time, then that earlier time
// is remembered by earliestTimeout, in which case we will loop and set it again ourselves.
prevEarliest = expires;
expires = earliestTimeout.get();
}
}

Expand All @@ -148,6 +161,11 @@ public void destroy()
cyclicTimeout.destroy();
}

boolean schedule(CyclicTimeout cyclicTimeout, long delay, TimeUnit unit)
{
return cyclicTimeout.schedule(delay, unit);
}

/**
* <p>An entity that may expire.</p>
*/
Expand All @@ -164,4 +182,18 @@ public interface Expirable
*/
public long getExpireNanoTime();
}

private class Timeouts extends CyclicTimeout
{
private Timeouts(Scheduler scheduler)
{
super(scheduler);
}

@Override
public void onTimeoutExpired()
{
CyclicTimeouts.this.onTimeoutExpired();
}
}
}
Loading

0 comments on commit cb2d57b

Please sign in to comment.