Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixes #4904 - WebsocketClient creates more connections than needed. #4911

Merged
Original file line number Diff line number Diff line change
Expand Up @@ -37,20 +37,19 @@ public abstract class AbstractConnectionPool implements ConnectionPool, Dumpable
{
private static final Logger LOG = Log.getLogger(AbstractConnectionPool.class);

private final AtomicBoolean closed = new AtomicBoolean();

/**
* The connectionCount encodes both the total connections plus the pending connection counts, so both can be atomically changed.
* The bottom 32 bits represent the total connections and the top 32 bits represent the pending connections.
*/
private final AtomicBiInteger connections = new AtomicBiInteger();
private final Destination destination;
private final AtomicBoolean closed = new AtomicBoolean();
private final HttpDestination destination;
private final int maxConnections;
private final Callback requester;

protected AbstractConnectionPool(Destination destination, int maxConnections, Callback requester)
{
this.destination = destination;
this.destination = (HttpDestination)destination;
gregw marked this conversation as resolved.
Show resolved Hide resolved
this.maxConnections = maxConnections;
this.requester = requester;
}
Expand Down Expand Up @@ -98,16 +97,43 @@ public boolean isClosed()

@Override
public Connection acquire()
{
return acquire(true);
}

/**
* <p>Returns an idle connection, if available;
* if an idle connection is not available, and the given {@code create} parameter is {@code true},
* then schedules the opening of a new connection, if possible within the configuration of this
* connection pool (for example, if it does not exceed the max connection count);
* otherwise returns {@code null}.</p>
*
* @param create whether to schedule the opening of a connection if no idle connections are available
* @return an idle connection or {@code null} if no idle connections are available
* @see #tryCreate(int)
*/
protected Connection acquire(boolean create)
gregw marked this conversation as resolved.
Show resolved Hide resolved
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The documented contract is not respected by the RoundRobinConnectionPool subclass. That should probably be documented too.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Documented.

{
Connection connection = activate();
if (connection == null)
{
tryCreate(-1);
if (create)
tryCreate(destination.getQueuedRequestCount());
gregw marked this conversation as resolved.
Show resolved Hide resolved
connection = activate();
}
return connection;
}

/**
* <p>Schedules the opening of a new connection.</p>
* <p>Whether a new connection is scheduled for opening is determined by the {@code maxPending} parameter:
* if {@code maxPending} is greater than the current number of connections scheduled for opening,
* then this method returns without scheduling the opening of a new connection;
* if {@code maxPending} is negative, a new connection is always scheduled for opening.</p>
*
* @param maxPending the max desired number of connections scheduled for opening,
* or a negative number to always trigger the opening of a new connection
*/
protected void tryCreate(int maxPending)
{
while (true)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,8 @@ protected SendFailure send(HttpChannel channel, HttpExchange exchange)
}
else
{
// Association may fail, for example if the application
// aborted the request, so we must release the channel.
channel.release();
result = new SendFailure(new HttpRequestException("Could not associate request to connection", request), false);
}
Expand All @@ -242,6 +244,8 @@ protected SendFailure send(HttpChannel channel, HttpExchange exchange)
}
else
{
// This connection has been timed out by another thread
// that will take care of removing it from the pool.
return new SendFailure(new TimeoutException(), true);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ public ConnectionPool getConnectionPool()
@Override
public void succeeded()
{
send();
send(false);
}

@Override
Expand Down Expand Up @@ -307,26 +307,40 @@ protected boolean enqueue(Queue<HttpExchange> queue, HttpExchange exchange)
}

public void send()
{
send(true);
}

private void send(boolean create)
{
if (getHttpExchanges().isEmpty())
return;
process();
process(create);
}

private void process()
private void process(boolean create)
{
// The loop is necessary in case of a new multiplexed connection,
// when a single thread notified of the connection opening must
// process all queued exchanges.
// In other cases looping is a work-stealing optimization.
while (true)
{
Connection connection = connectionPool.acquire();
Connection connection;
if (connectionPool instanceof AbstractConnectionPool)
connection = ((AbstractConnectionPool)connectionPool).acquire(create);
else
connection = connectionPool.acquire();
if (connection == null)
break;
boolean proceed = process(connection);
if (!proceed)
ProcessResult result = process(connection);
if (result == ProcessResult.FINISH)
break;
create = result == ProcessResult.RESTART;
}
}

public boolean process(Connection connection)
public ProcessResult process(Connection connection)
{
HttpClient client = getHttpClient();
HttpExchange exchange = getHttpExchanges().poll();
Expand All @@ -342,7 +356,7 @@ public boolean process(Connection connection)
LOG.debug("{} is stopping", client);
connection.close();
}
return false;
return ProcessResult.FINISH;
}
else
{
Expand All @@ -353,31 +367,37 @@ public boolean process(Connection connection)
if (LOG.isDebugEnabled())
LOG.debug("Aborted before processing {}: {}", exchange, cause);
// Won't use this connection, release it back.
if (!connectionPool.release(connection))
boolean released = connectionPool.release(connection);
if (!released)
connection.close();
// It may happen that the request is aborted before the exchange
// is created. Aborting the exchange a second time will result in
// a no-operation, so we just abort here to cover that edge case.
exchange.abort(cause);
return getHttpExchanges().size() > 0
? (released ? ProcessResult.CONTINUE : ProcessResult.RESTART)
: ProcessResult.FINISH;
}
else

SendFailure failure = send(connection, exchange);
if (failure == null)
{
SendFailure result = send(connection, exchange);
if (result != null)
{
if (LOG.isDebugEnabled())
LOG.debug("Send failed {} for {}", result, exchange);
if (result.retry)
{
// Resend this exchange, likely on another connection,
// and return false to avoid to re-enter this method.
send(exchange);
return false;
}
request.abort(result.failure);
}
// Aggressively send other queued requests
// in case connections are multiplexed.
return getHttpExchanges().size() > 0 ? ProcessResult.CONTINUE : ProcessResult.FINISH;
}
return getHttpExchanges().peek() != null;

if (LOG.isDebugEnabled())
LOG.debug("Send failed {} for {}", failure, exchange);
if (failure.retry)
{
// Resend this exchange, likely on another connection,
// and return false to avoid to re-enter this method.
send(exchange);
return ProcessResult.FINISH;
}
request.abort(failure.failure);
return getHttpExchanges().size() > 0 ? ProcessResult.RESTART : ProcessResult.FINISH;
}
}

Expand Down Expand Up @@ -419,7 +439,7 @@ public void release(Connection connection)
if (connectionPool.isActive(connection))
{
if (connectionPool.release(connection))
send();
send(false);
else
connection.close();
}
Expand All @@ -439,25 +459,30 @@ public void release(Connection connection)

public boolean remove(Connection connection)
{
return connectionPool.remove(connection);
}

public void close(Connection connection)
{
boolean removed = remove(connection);
boolean removed = connectionPool.remove(connection);

if (getHttpExchanges().isEmpty())
{
tryRemoveIdleDestination();
}
else
else if (removed)
{
// We need to execute queued requests even if this connection failed.
// We may create a connection that is not needed, but it will eventually
// idle timeout, so no worries.
if (removed)
process();
// Process queued requests that may be waiting.
// We may create a connection that is not
// needed, but it will eventually idle timeout.
process(true);
}
return removed;
}

/**
* @param connection the connection to remove
* @deprecated use {@link #remove(Connection)} instead
*/
@Deprecated
public void close(Connection connection)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since close(Connection) and remove(Connection) are now identical, shouldn't one of them be removed?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I deprecated close(Connection) to be removed in 10.

{
remove(connection);
}

/**
Expand Down Expand Up @@ -581,4 +606,9 @@ private void schedule(long expiresAt)
}
}
}

private enum ProcessResult
{
RESTART, CONTINUE, FINISH
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,11 @@ public Connection acquire()
Connection connection = activate();
if (connection == null)
{
int maxPending = 1 + destination.getQueuedRequestCount() / getMaxMultiplex();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about this one-liner instead?

int maxPending = (destination.getQueuedRequestCount() + getMaxMultiplex() - 1) / getMaxMultiplex();

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That is obfuscated... Does it do ceiling(a/b)?

Copy link
Contributor

@lorban lorban Jun 1, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it's the integer version of the following:

Math.ceil((double) destination.getQueuedRequestCount() / getMaxMultiplex())

int queuedRequests = destination.getQueuedRequestCount();
int maxMultiplex = getMaxMultiplex();
int maxPending = queuedRequests / maxMultiplex;
if (maxPending * maxMultiplex != queuedRequests)
++maxPending;
tryCreate(maxPending);
connection = activate();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,21 @@ public void setMaxMultiplex(int maxMultiplex)
}
}

/**
* <p>Returns an idle connection, if available, following a round robin algorithm;
* otherwise it always tries to create a new connection, up until the max connection count.</p>
*
* @param create this parameter is ignored and assumed to be always {@code true}
* @return an idle connection or {@code null} if no idle connections are available
*/
@Override
protected Connection acquire(boolean create)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As mentioned earlier, this implementation breaks the javadoc's contract.

{
// The nature of this connection pool is such that a
// connection must always be present in the next slot.
return super.acquire(true);
}

@Override
protected void onCreated(Connection connection)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ protected void close(Throwable failure)
{
if (closed.compareAndSet(false, true))
{
getHttpDestination().close(this);
getHttpDestination().remove(this);
abort(failure);
channel.destroy();
getEndPoint().shutdownOutput();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,9 +119,10 @@ public void onContent(Response response, ByteBuffer content)
int length = content.remaining();
if (length > BufferUtil.space(buffer))
{
int requiredCapacity = buffer == null ? length : buffer.capacity() + length;
if (requiredCapacity > maxLength)
int remaining = buffer == null ? 0 : buffer.remaining();
if (remaining + length > maxLength)
response.abort(new IllegalArgumentException("Buffering capacity " + maxLength + " exceeded"));
int requiredCapacity = buffer == null ? length : buffer.capacity() + length;
int newCapacity = Math.min(Integer.highestOneBit(requiredCapacity) << 1, maxLength);
buffer = BufferUtil.ensureCapacity(buffer, newCapacity);
}
Expand Down
Loading