Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixes #4904 - WebsocketClient creates more connections than needed. #4911

Merged
Original file line number Diff line number Diff line change
Expand Up @@ -44,13 +44,13 @@ public abstract class AbstractConnectionPool implements ConnectionPool, Dumpable
* The bottom 32 bits represent the total connections and the top 32 bits represent the pending connections.
*/
private final AtomicBiInteger connections = new AtomicBiInteger();
private final Destination destination;
private final HttpDestination destination;
private final int maxConnections;
private final Callback requester;

protected AbstractConnectionPool(Destination destination, int maxConnections, Callback requester)
{
this.destination = destination;
this.destination = (HttpDestination)destination;
gregw marked this conversation as resolved.
Show resolved Hide resolved
this.maxConnections = maxConnections;
this.requester = requester;
}
Expand Down Expand Up @@ -98,11 +98,17 @@ public boolean isClosed()

@Override
public Connection acquire()
{
return acquire(true);
}

protected Connection acquire(boolean create)
gregw marked this conversation as resolved.
Show resolved Hide resolved
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The documented contract is not respected by the RoundRobinConnectionPool subclass. That should probably be documented too.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Documented.

{
Connection connection = activate();
if (connection == null)
{
tryCreate(-1);
if (create)
tryCreate(destination.getQueuedRequestCount());
gregw marked this conversation as resolved.
Show resolved Hide resolved
connection = activate();
}
return connection;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,8 @@ protected SendFailure send(HttpChannel channel, HttpExchange exchange)
}
else
{
// Association may fail for example if the application
gregw marked this conversation as resolved.
Show resolved Hide resolved
// aborted the request, so we must release the channel.
channel.release();
result = new SendFailure(new HttpRequestException("Could not associate request to connection", request), false);
}
Expand All @@ -242,6 +244,8 @@ protected SendFailure send(HttpChannel channel, HttpExchange exchange)
}
else
{
// This connection has been timed out by another thread
// that will take care of removing it from the pool.
return new SendFailure(new TimeoutException(), true);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ public ConnectionPool getConnectionPool()
@Override
public void succeeded()
{
send();
send(false);
}

@Override
Expand Down Expand Up @@ -307,26 +307,40 @@ protected boolean enqueue(Queue<HttpExchange> queue, HttpExchange exchange)
}

public void send()
{
send(true);
}

private void send(boolean create)
{
if (getHttpExchanges().isEmpty())
return;
process();
process(create);
}

private void process()
private void process(boolean create)
{
// The loop is necessary in case of a new multiplexed connection,
// when a single thread notified of the connection opening must
// process all queued exchanges.
// In other cases looping is a work-stealing optimization.
while (true)
{
Connection connection = connectionPool.acquire();
Connection connection;
if (connectionPool instanceof AbstractConnectionPool)
connection = ((AbstractConnectionPool)connectionPool).acquire(create);
else
connection = connectionPool.acquire();
if (connection == null)
break;
boolean proceed = process(connection);
if (!proceed)
ProcessResult result = process(connection);
if (result == ProcessResult.FINISH)
break;
create = result == ProcessResult.RESTART;
}
}

public boolean process(Connection connection)
public ProcessResult process(Connection connection)
{
HttpClient client = getHttpClient();
HttpExchange exchange = getHttpExchanges().poll();
Expand All @@ -342,7 +356,7 @@ public boolean process(Connection connection)
LOG.debug("{} is stopping", client);
connection.close();
}
return false;
return ProcessResult.FINISH;
}
else
{
Expand All @@ -353,31 +367,37 @@ public boolean process(Connection connection)
if (LOG.isDebugEnabled())
LOG.debug("Aborted before processing {}: {}", exchange, cause);
// Won't use this connection, release it back.
if (!connectionPool.release(connection))
boolean released = connectionPool.release(connection);
if (!released)
connection.close();
// It may happen that the request is aborted before the exchange
// is created. Aborting the exchange a second time will result in
// a no-operation, so we just abort here to cover that edge case.
exchange.abort(cause);
return getHttpExchanges().size() > 0
? (released ? ProcessResult.CONTINUE : ProcessResult.RESTART)
: ProcessResult.FINISH;
}
else

SendFailure failure = send(connection, exchange);
if (failure == null)
{
SendFailure result = send(connection, exchange);
if (result != null)
{
if (LOG.isDebugEnabled())
LOG.debug("Send failed {} for {}", result, exchange);
if (result.retry)
{
// Resend this exchange, likely on another connection,
// and return false to avoid to re-enter this method.
send(exchange);
return false;
}
request.abort(result.failure);
}
// Aggressively send other queued requests
// in case connections are multiplexed.
return getHttpExchanges().size() > 0 ? ProcessResult.CONTINUE : ProcessResult.FINISH;
}
return getHttpExchanges().peek() != null;

if (LOG.isDebugEnabled())
LOG.debug("Send failed {} for {}", failure, exchange);
if (failure.retry)
{
// Resend this exchange, likely on another connection,
// and return false to avoid to re-enter this method.
send(exchange);
return ProcessResult.FINISH;
}
request.abort(failure.failure);
return getHttpExchanges().size() > 0 ? ProcessResult.RESTART : ProcessResult.FINISH;
}
}

Expand Down Expand Up @@ -419,7 +439,7 @@ public void release(Connection connection)
if (connectionPool.isActive(connection))
{
if (connectionPool.release(connection))
send();
send(false);
else
connection.close();
}
Expand All @@ -439,25 +459,27 @@ public void release(Connection connection)

public boolean remove(Connection connection)
{
return connectionPool.remove(connection);
}

public void close(Connection connection)
{
boolean removed = remove(connection);
boolean removed = connectionPool.remove(connection);

if (getHttpExchanges().isEmpty())
{
tryRemoveIdleDestination();
}
else
{
// We need to execute queued requests even if this connection failed.
// We may create a connection that is not needed, but it will eventually
// idle timeout, so no worries.
// We need to execute queued requests
// even if this connection was removed.
// We may create a connection that is not
// needed, but it will eventually idle timeout.
if (removed)
process();
process(true);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should this check that the queue is not 0? Why acquire a connection if there are no waiting exchanges?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The check for the queue is done few lines above. Do you want to do it again here?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Opps didn't see that.

An else if (removed) would be clearer that there is a condition above.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think an else if would then require the code comment to be moved to a less ideal place so I'm inclined to leave as is.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The comment would be better in the body of the else if, as it would not need to repeat the "if removed":

if (getHttpExchanges().isEmpty())
{
    tryRemoveIdleDestination();
}
else if (removed)
{
   // Process queued exchanges because waiting exchanges may now be able to progress. 
   process(true);
}

}
return connectionPool.remove(connection);
}

public void close(Connection connection)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since close(Connection) and remove(Connection) are now identical, shouldn't one of them be removed?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I deprecated close(Connection) to be removed in 10.

{
remove(connection);
}

/**
Expand Down Expand Up @@ -581,4 +603,9 @@ private void schedule(long expiresAt)
}
}
}

private enum ProcessResult
{
RESTART, CONTINUE, FINISH
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,11 @@ public Connection acquire()
Connection connection = activate();
if (connection == null)
{
int maxPending = 1 + destination.getQueuedRequestCount() / getMaxMultiplex();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about this one-liner instead?

int maxPending = (destination.getQueuedRequestCount() + getMaxMultiplex() - 1) / getMaxMultiplex();

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That is obfuscated... Does it do ceiling(a/b)?

Copy link
Contributor

@lorban lorban Jun 1, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it's the integer version of the following:

Math.ceil((double) destination.getQueuedRequestCount() / getMaxMultiplex())

int queuedRequests = destination.getQueuedRequestCount();
int maxMultiplex = getMaxMultiplex();
int maxPending = queuedRequests / maxMultiplex;
if (maxPending * maxMultiplex != queuedRequests)
++maxPending;
tryCreate(maxPending);
connection = activate();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,14 @@ public void setMaxMultiplex(int maxMultiplex)
}
}

@Override
protected Connection acquire(boolean create)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As mentioned earlier, this implementation breaks the javadoc's contract.

{
// The nature of this connection pool is such that a
// connection must always be present in the next slot.
return super.acquire(true);
}

@Override
protected void onCreated(Connection connection)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,9 +119,10 @@ public void onContent(Response response, ByteBuffer content)
int length = content.remaining();
if (length > BufferUtil.space(buffer))
{
int requiredCapacity = buffer == null ? length : buffer.capacity() + length;
if (requiredCapacity > maxLength)
int remaining = buffer == null ? 0 : buffer.remaining();
if (remaining + length > maxLength)
response.abort(new IllegalArgumentException("Buffering capacity " + maxLength + " exceeded"));
int requiredCapacity = buffer == null ? length : buffer.capacity() + length;
int newCapacity = Math.min(Integer.highestOneBit(requiredCapacity) << 1, maxLength);
buffer = BufferUtil.ensureCapacity(buffer, newCapacity);
}
Expand Down
Loading