Skip to content

Commit

Permalink
Fixes #9210 - Jetty 12 - Review Pool and Pool.Entry (#9211)
Browse files Browse the repository at this point in the history
* Fixes #9210 - Jetty 12 - Review Pool and Pool.Entry

* Extracted interface Pool, renamed implementation to ConcurrentPool.
* Extracted Pool.Entry as interface.
* Moved StrategyType to ConcurrentPool.
* Made Pool.Factory.wrap() work in order to wrap Pool instances.
* Removed constructors that were explicitly taking Pool parameters, replaced by a single Pool.Factory parameter.
* Added javadocs.

Signed-off-by: Simone Bordet <[email protected]>
  • Loading branch information
sbordet authored Jan 30, 2023
1 parent b0e5074 commit af6ecfe
Show file tree
Hide file tree
Showing 25 changed files with 1,500 additions and 855 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import java.util.Collection;
import java.util.List;
import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
Expand Down Expand Up @@ -48,24 +47,34 @@ public abstract class AbstractConnectionPool extends ContainerLifeCycle implemen

private final AtomicInteger pending = new AtomicInteger();
private final HttpDestination destination;
private final Pool<Connection> pool;
private final Pool.Factory<Connection> poolFactory;
private Pool<Connection> pool;
private boolean maximizeConnections;
private volatile long maxDurationNanos;
private volatile int maxUsage;
private volatile int initialMaxMultiplex;

protected AbstractConnectionPool(Destination destination, Pool<Connection> pool, int initialMaxMultiplex)
protected AbstractConnectionPool(Destination destination, Pool.Factory<Connection> poolFactory, int initialMaxMultiplex)
{
this.destination = (HttpDestination)destination;
this.pool = pool;
this.poolFactory = poolFactory;
this.initialMaxMultiplex = initialMaxMultiplex;
}

@Override
protected void doStart() throws Exception
{
pool = poolFactory.wrap(poolFactory.newPool());
addBean(pool);
super.doStart();
}

@Override
protected void doStop() throws Exception
{
pool.close();
super.doStop();
removeBean(pool);
pool.terminate().forEach(this::close);
}

@Override
Expand All @@ -77,7 +86,7 @@ public CompletableFuture<Void> preCreateConnections(int connectionCount)
List<CompletableFuture<?>> futures = new ArrayList<>();
for (int i = 0; i < connectionCount; i++)
{
Pool<Connection>.Entry entry = pool.reserve();
Pool.Entry<Connection> entry = pool.reserve();
if (entry == null)
break;
pending.incrementAndGet();
Expand Down Expand Up @@ -156,7 +165,7 @@ public int getIdleConnectionCount()
@ManagedAttribute(value = "The max number of connections", readonly = true)
public int getMaxConnectionCount()
{
return pool.getMaxEntries();
return pool.getMaxSize();
}

@ManagedAttribute(value = "The number of connections", readonly = true)
Expand All @@ -177,13 +186,6 @@ public boolean isEmpty()
return pool.size() == 0;
}

@Override
@ManagedAttribute("Whether this pool is closed")
public boolean isClosed()
{
return pool.isClosed();
}

@ManagedAttribute("Whether the pool tries to maximize the number of connections used")
public boolean isMaximizeConnections()
{
Expand Down Expand Up @@ -270,7 +272,7 @@ protected void tryCreate(boolean create)
}

// Create the connection.
Pool<Connection>.Entry entry = pool.reserve();
Pool.Entry<Connection> entry = pool.reserve();
if (entry == null)
{
pending.decrementAndGet();
Expand All @@ -290,7 +292,7 @@ public boolean accept(Connection connection)
{
if (!(connection instanceof Attachable attachable))
throw new IllegalArgumentException("Invalid connection object: " + connection);
Pool<Connection>.Entry entry = pool.reserve();
Pool.Entry<Connection> entry = pool.reserve();
if (entry == null)
return false;
if (LOG.isDebugEnabled())
Expand All @@ -311,7 +313,7 @@ protected Connection activate()
{
while (true)
{
Pool<Connection>.Entry entry = pool.acquire();
Pool.Entry<Connection> entry = pool.acquire();
if (entry != null)
{
Connection connection = entry.getPooled();
Expand Down Expand Up @@ -375,7 +377,7 @@ public boolean release(Connection connection)
if (!deactivate(connection))
return false;
released(connection);
return idle(connection, isClosed());
return idle(connection, isStopped());
}

protected boolean deactivate(Connection connection)
Expand Down Expand Up @@ -452,52 +454,42 @@ protected void removed(Connection connection)
{
}

Queue<Connection> getIdleConnections()
Collection<Connection> getIdleConnections()
{
return pool.values().stream()
return pool.stream()
.filter(Pool.Entry::isIdle)
.filter(entry -> !entry.isClosed())
.filter(entry -> !entry.isTerminated())
.map(Pool.Entry::getPooled)
.collect(toCollection(ArrayDeque::new));
}

Collection<Connection> getActiveConnections()
{
return pool.values().stream()
return pool.stream()
.filter(entry -> !entry.isIdle())
.filter(entry -> !entry.isClosed())
.filter(entry -> !entry.isTerminated())
.map(Pool.Entry::getPooled)
.collect(Collectors.toList());
}

@Override
public void close()
private void close(Pool.Entry<Connection> entry)
{
// Forcibly release and remove entries to do our best effort calling the listeners.
try
// Forcibly release and remove entries to
// do our best effort calling the listeners.
Connection connection = entry.getPooled();
while (entry.isInUse())
{
for (Pool<Connection>.Entry entry : pool.values())
if (entry.release())
{
while (entry.isInUse())
{
if (entry.release())
{
released(entry.getPooled());
break;
}
}
if (entry.remove())
removed(entry.getPooled());
released(connection);
break;
}
}
catch (Throwable x)
if (entry.remove())
{
if (LOG.isDebugEnabled())
LOG.debug("Detected concurrent modification while forcibly releasing the pooled connections", x);
// We could not call the listeners for all entries, but at least the following
// pool.close() call will still release all resources.
removed(connection);
IO.close(connection);
}
pool.close();
}

@Override
Expand All @@ -509,7 +501,7 @@ public void dump(Appendable out, String indent) throws IOException
@Override
public boolean sweep()
{
pool.values().stream()
pool.stream()
.map(Pool.Entry::getPooled)
.filter(connection -> connection instanceof Sweeper.Sweepable)
.forEach(connection ->
Expand Down Expand Up @@ -546,9 +538,9 @@ public String toString()

private class FutureConnection extends Promise.Completable<Connection>
{
private final Pool<Connection>.Entry reserved;
private final Pool.Entry<Connection> reserved;

public FutureConnection(Pool<Connection>.Entry reserved)
public FutureConnection(Pool.Entry<Connection> reserved)
{
this.reserved = reserved;
}
Expand Down Expand Up @@ -590,11 +582,11 @@ public void failed(Throwable x)

private static class EntryHolder
{
private final Pool<Connection>.Entry entry;
private final Pool.Entry<Connection> entry;
private final long creationNanoTime = NanoTime.now();
private final AtomicInteger usage = new AtomicInteger();

private EntryHolder(Pool<Connection>.Entry entry)
private EntryHolder(Pool.Entry<Connection> entry)
{
this.entry = Objects.requireNonNull(entry);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,12 @@

package org.eclipse.jetty.client;

import java.io.Closeable;
import java.util.concurrent.CompletableFuture;

/**
* <p>Client-side connection pool abstraction.</p>
*/
public interface ConnectionPool extends Closeable
public interface ConnectionPool
{
/**
* Optionally pre-create up to {@code connectionCount}
Expand All @@ -42,12 +41,6 @@ default CompletableFuture<Void> preCreateConnections(int connectionCount)
*/
boolean isEmpty();

/**
* @return whether this ConnectionPool has been closed
* @see #close()
*/
boolean isClosed();

/**
* <p>Returns an idle connection, if available;
* if an idle connection is not available, and the given {@code create} parameter is {@code true},
Expand Down Expand Up @@ -86,9 +79,6 @@ default CompletableFuture<Void> preCreateConnections(int connectionCount)
*/
boolean remove(Connection connection);

@Override
void close();

/**
* Factory for ConnectionPool instances.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,14 @@

package org.eclipse.jetty.client;

import org.eclipse.jetty.util.Pool;
import org.eclipse.jetty.util.ConcurrentPool;
import org.eclipse.jetty.util.annotation.ManagedObject;

@ManagedObject
public class DuplexConnectionPool extends AbstractConnectionPool
{
public DuplexConnectionPool(Destination destination, int maxConnections)
{
super(destination, new Pool<>(Pool.StrategyType.FIRST, maxConnections, false), 1);
super(destination, () -> new ConcurrentPool<>(ConcurrentPool.StrategyType.FIRST, maxConnections, false), 1);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
package org.eclipse.jetty.client;

import org.eclipse.jetty.util.LeakDetector;
import org.eclipse.jetty.util.component.LifeCycle;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -37,13 +36,6 @@ public LeakTrackingConnectionPool(Destination destination, int maxConnections)
addBean(leakDetector);
}

@Override
public void close()
{
super.close();
LifeCycle.stop(this);
}

@Override
protected void acquired(Connection connection)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,27 +13,43 @@

package org.eclipse.jetty.client;

import java.util.function.ToIntFunction;

import org.eclipse.jetty.util.ConcurrentPool;
import org.eclipse.jetty.util.Pool;
import org.eclipse.jetty.util.annotation.ManagedAttribute;
import org.eclipse.jetty.util.annotation.ManagedObject;

@ManagedObject
public class MultiplexConnectionPool extends AbstractConnectionPool
{
public MultiplexConnectionPool(Destination destination, int maxConnections, int initialMaxMultiplex)
{
this(destination, Pool.StrategyType.FIRST, maxConnections, false, initialMaxMultiplex);
}

protected MultiplexConnectionPool(Destination destination, Pool.StrategyType strategy, int maxConnections, boolean cache, int initialMaxMultiplex)
/**
* <p>Returns a function that computes the max multiplex value
* for a given {@link Connection}, if possible, otherwise returns
* the given {@code defaultMaxMultiplex} value.</p>
*
* @param defaultMaxMultiplex the default max multiplex value
* @return a function that computes the max multiplex value for a connection
*/
public static ToIntFunction<Connection> newMaxMultiplexer(int defaultMaxMultiplex)
{
super(destination, new Pool<>(strategy, maxConnections, cache, connection ->
return connection ->
{
int maxMultiplex = initialMaxMultiplex;
int maxMultiplex = defaultMaxMultiplex;
if (connection instanceof MaxMultiplexable maxMultiplexable)
maxMultiplex = maxMultiplexable.getMaxMultiplex();
return maxMultiplex;
}), initialMaxMultiplex);
};
}

public MultiplexConnectionPool(Destination destination, int maxConnections, int initialMaxMultiplex)
{
this(destination, () -> new ConcurrentPool<>(ConcurrentPool.StrategyType.FIRST, maxConnections, false, newMaxMultiplexer(initialMaxMultiplex)), initialMaxMultiplex);
}

protected MultiplexConnectionPool(Destination destination, Pool.Factory<Connection> poolFactory, int initialMaxMultiplex)
{
super(destination, poolFactory, initialMaxMultiplex);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@

package org.eclipse.jetty.client;

import org.eclipse.jetty.util.Pool;
import org.eclipse.jetty.util.ConcurrentPool;
import org.eclipse.jetty.util.annotation.ManagedObject;

/**
Expand All @@ -23,8 +23,8 @@
@ManagedObject
public class RandomConnectionPool extends MultiplexConnectionPool
{
public RandomConnectionPool(Destination destination, int maxConnections, int maxMultiplex)
public RandomConnectionPool(Destination destination, int maxConnections, int initialMaxMultiplex)
{
super(destination, Pool.StrategyType.RANDOM, maxConnections, false, maxMultiplex);
super(destination, () -> new ConcurrentPool<>(ConcurrentPool.StrategyType.RANDOM, maxConnections, false, MultiplexConnectionPool.newMaxMultiplexer(initialMaxMultiplex)), initialMaxMultiplex);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@

package org.eclipse.jetty.client;

import org.eclipse.jetty.util.Pool;
import org.eclipse.jetty.util.ConcurrentPool;
import org.eclipse.jetty.util.annotation.ManagedObject;

/**
Expand Down Expand Up @@ -48,9 +48,9 @@ public RoundRobinConnectionPool(Destination destination, int maxConnections)
this(destination, maxConnections, 1);
}

public RoundRobinConnectionPool(Destination destination, int maxConnections, int maxMultiplex)
public RoundRobinConnectionPool(Destination destination, int maxConnections, int initialMaxMultiplex)
{
super(destination, Pool.StrategyType.ROUND_ROBIN, maxConnections, false, maxMultiplex);
super(destination, () -> new ConcurrentPool<>(ConcurrentPool.StrategyType.ROUND_ROBIN, maxConnections, false, newMaxMultiplexer(initialMaxMultiplex)), initialMaxMultiplex);
// If there are queued requests and connections get
// closed due to idle timeout or overuse, we want to
// aggressively try to open new connections to replace
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ public void run()
{
if (done.compareAndSet(false, true))
{
boolean closed = isClosed();
boolean closed = isStopped();
if (LOG.isDebugEnabled())
LOG.debug("Validated {}", connection);
quarantine.remove(connection);
Expand Down
Loading

0 comments on commit af6ecfe

Please sign in to comment.