From 66de7ba6180e41480af2f4220334b62832586fac Mon Sep 17 00:00:00 2001
From: Ludovic Orban
Date: Wed, 15 Jun 2022 15:10:50 +0200
Subject: [PATCH] Improve ssl buffers handling (#8165)
* Fixes #8161 improve SSLConnection buffers handling
Added memory heuristic to ArrayRetainableByteBufferPool
Signed-off-by: Ludovic Orban
---
.../jetty/client/HttpClientTLSTest.java | 322 +++++++++++++++++-
.../jetty/io/AbstractByteBufferPool.java | 8 +-
.../eclipse/jetty/io/ArrayByteBufferPool.java | 9 +-
.../io/ArrayRetainableByteBufferPool.java | 57 +++-
.../eclipse/jetty/io/ssl/SslConnection.java | 116 +++++--
5 files changed, 457 insertions(+), 55 deletions(-)
diff --git a/jetty-client/src/test/java/org/eclipse/jetty/client/HttpClientTLSTest.java b/jetty-client/src/test/java/org/eclipse/jetty/client/HttpClientTLSTest.java
index d8ad678c430c..e4d5358cf78d 100644
--- a/jetty-client/src/test/java/org/eclipse/jetty/client/HttpClientTLSTest.java
+++ b/jetty-client/src/test/java/org/eclipse/jetty/client/HttpClientTLSTest.java
@@ -22,12 +22,14 @@
import java.net.SocketTimeoutException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import javax.net.ssl.SSLEngine;
@@ -36,6 +38,8 @@
import javax.net.ssl.SSLHandshakeException;
import javax.net.ssl.SSLPeerUnverifiedException;
import javax.net.ssl.SSLSocket;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.client.api.ContentResponse;
import org.eclipse.jetty.client.http.HttpClientTransportOverHTTP;
@@ -43,12 +47,16 @@
import org.eclipse.jetty.http.HttpHeaderValue;
import org.eclipse.jetty.http.HttpScheme;
import org.eclipse.jetty.http.HttpStatus;
+import org.eclipse.jetty.io.ArrayByteBufferPool;
+import org.eclipse.jetty.io.ArrayRetainableByteBufferPool;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.ClientConnectionFactory;
import org.eclipse.jetty.io.ClientConnector;
import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.io.ConnectionStatistics;
import org.eclipse.jetty.io.EndPoint;
+import org.eclipse.jetty.io.RetainableByteBuffer;
+import org.eclipse.jetty.io.RetainableByteBufferPool;
import org.eclipse.jetty.io.ssl.SslClientConnectionFactory;
import org.eclipse.jetty.io.ssl.SslConnection;
import org.eclipse.jetty.io.ssl.SslHandshakeListener;
@@ -56,11 +64,13 @@
import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.server.HttpConfiguration;
import org.eclipse.jetty.server.HttpConnectionFactory;
+import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.SecureRequestCustomizer;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.server.SslConnectionFactory;
import org.eclipse.jetty.toolchain.test.Net;
+import org.eclipse.jetty.util.Pool;
import org.eclipse.jetty.util.StringUtil;
import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.eclipse.jetty.util.thread.ExecutorThreadPool;
@@ -71,9 +81,14 @@
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.condition.EnabledForJreRange;
import org.junit.jupiter.api.condition.JRE;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+import static org.awaitility.Awaitility.await;
import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.Matchers.is;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -682,12 +697,7 @@ protected int networkFill(ByteBuffer input) throws IOException
// Trigger the creation of a new connection, but don't use it.
ConnectionPoolHelper.tryCreate(connectionPool);
// Verify that the connection has been created.
- while (true)
- {
- Thread.sleep(50);
- if (connectionPool.getConnectionCount() == 1)
- break;
- }
+ await().atMost(5, TimeUnit.SECONDS).until(connectionPool::getConnectionCount, is(1));
// Wait for the server to idle timeout the connection.
Thread.sleep(idleTimeout + idleTimeout / 2);
@@ -698,6 +708,299 @@ protected int networkFill(ByteBuffer input) throws IOException
assertEquals(0, clientBytes.get());
}
+ @Test
+ public void testEncryptedInputBufferRepooling() throws Exception
+ {
+ SslContextFactory.Server serverTLSFactory = createServerSslContextFactory();
+ QueuedThreadPool serverThreads = new QueuedThreadPool();
+ serverThreads.setName("server");
+ server = new Server(serverThreads);
+ var retainableByteBufferPool = new ArrayRetainableByteBufferPool()
+ {
+ @Override
+ public Pool poolFor(int capacity, boolean direct)
+ {
+ return super.poolFor(capacity, direct);
+ }
+ };
+ server.addBean(retainableByteBufferPool);
+ HttpConfiguration httpConfig = new HttpConfiguration();
+ httpConfig.addCustomizer(new SecureRequestCustomizer());
+ HttpConnectionFactory http = new HttpConnectionFactory(httpConfig);
+ SslConnectionFactory ssl = new SslConnectionFactory(serverTLSFactory, http.getProtocol())
+ {
+ @Override
+ protected SslConnection newSslConnection(Connector connector, EndPoint endPoint, SSLEngine engine)
+ {
+ ByteBufferPool byteBufferPool = connector.getByteBufferPool();
+ RetainableByteBufferPool retainableByteBufferPool = connector.getBean(RetainableByteBufferPool.class);
+ return new SslConnection(retainableByteBufferPool, byteBufferPool, connector.getExecutor(), endPoint, engine, isDirectBuffersForEncryption(), isDirectBuffersForDecryption())
+ {
+ @Override
+ protected int networkFill(ByteBuffer input) throws IOException
+ {
+ int n = super.networkFill(input);
+ if (n > 0)
+ throw new IOException("boom");
+ return n;
+ }
+ };
+ }
+ };
+ connector = new ServerConnector(server, 1, 1, ssl, http);
+ server.addConnector(connector);
+ server.setHandler(new EmptyServerHandler());
+ server.start();
+
+ SslContextFactory.Client clientTLSFactory = createClientSslContextFactory();
+ ClientConnector clientConnector = new ClientConnector();
+ clientConnector.setSelectors(1);
+ clientConnector.setSslContextFactory(clientTLSFactory);
+ QueuedThreadPool clientThreads = new QueuedThreadPool();
+ clientThreads.setName("client");
+ clientConnector.setExecutor(clientThreads);
+ client = new HttpClient(new HttpClientTransportOverHTTP(clientConnector));
+ client.setExecutor(clientThreads);
+ client.start();
+
+ assertThrows(Exception.class, () -> client.newRequest("localhost", connector.getLocalPort()).scheme(HttpScheme.HTTPS.asString()).send());
+
+ Pool bucket = retainableByteBufferPool.poolFor(16 * 1024 + 1, ssl.isDirectBuffersForEncryption());
+ assertEquals(1, bucket.size());
+ assertEquals(1, bucket.getIdleCount());
+ }
+
+ @Test
+ public void testEncryptedOutputBufferRepooling() throws Exception
+ {
+ SslContextFactory.Server serverTLSFactory = createServerSslContextFactory();
+ QueuedThreadPool serverThreads = new QueuedThreadPool();
+ serverThreads.setName("server");
+ server = new Server(serverThreads);
+ List leakedBuffers = new ArrayList<>();
+ ArrayByteBufferPool byteBufferPool = new ArrayByteBufferPool()
+ {
+ @Override
+ public ByteBuffer acquire(int size, boolean direct)
+ {
+ ByteBuffer acquired = super.acquire(size, direct);
+ leakedBuffers.add(acquired);
+ return acquired;
+ }
+
+ @Override
+ public void release(ByteBuffer buffer)
+ {
+ leakedBuffers.remove(buffer);
+ super.release(buffer);
+ }
+ };
+ server.addBean(byteBufferPool);
+ HttpConfiguration httpConfig = new HttpConfiguration();
+ httpConfig.addCustomizer(new SecureRequestCustomizer());
+ HttpConnectionFactory http = new HttpConnectionFactory(httpConfig);
+ SslConnectionFactory ssl = new SslConnectionFactory(serverTLSFactory, http.getProtocol())
+ {
+ @Override
+ protected SslConnection newSslConnection(Connector connector, EndPoint endPoint, SSLEngine engine)
+ {
+ ByteBufferPool byteBufferPool = connector.getByteBufferPool();
+ RetainableByteBufferPool retainableByteBufferPool = connector.getBean(RetainableByteBufferPool.class);
+ return new SslConnection(retainableByteBufferPool, byteBufferPool, connector.getExecutor(), endPoint, engine, isDirectBuffersForEncryption(), isDirectBuffersForDecryption())
+ {
+ @Override
+ protected boolean networkFlush(ByteBuffer output) throws IOException
+ {
+ throw new IOException("bang");
+ }
+ };
+ }
+ };
+ connector = new ServerConnector(server, 1, 1, ssl, http);
+ server.addConnector(connector);
+ server.setHandler(new EmptyServerHandler());
+ server.start();
+
+ SslContextFactory.Client clientTLSFactory = createClientSslContextFactory();
+ ClientConnector clientConnector = new ClientConnector();
+ clientConnector.setSelectors(1);
+ clientConnector.setSslContextFactory(clientTLSFactory);
+ QueuedThreadPool clientThreads = new QueuedThreadPool();
+ clientThreads.setName("client");
+ clientConnector.setExecutor(clientThreads);
+ client = new HttpClient(new HttpClientTransportOverHTTP(clientConnector));
+ client.setExecutor(clientThreads);
+ client.start();
+
+ assertThrows(Exception.class, () -> client.newRequest("localhost", connector.getLocalPort()).scheme(HttpScheme.HTTPS.asString()).send());
+
+ await().atMost(5, TimeUnit.SECONDS).until(() -> leakedBuffers, is(empty()));
+ }
+
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ public void testEncryptedOutputBufferRepoolingAfterNetworkFlushReturnsFalse(boolean close) throws Exception
+ {
+ SslContextFactory.Server serverTLSFactory = createServerSslContextFactory();
+ QueuedThreadPool serverThreads = new QueuedThreadPool();
+ serverThreads.setName("server");
+ server = new Server(serverThreads);
+ List leakedBuffers = new ArrayList<>();
+ ArrayByteBufferPool byteBufferPool = new ArrayByteBufferPool()
+ {
+ @Override
+ public ByteBuffer acquire(int size, boolean direct)
+ {
+ ByteBuffer acquired = super.acquire(size, direct);
+ leakedBuffers.add(acquired);
+ return acquired;
+ }
+
+ @Override
+ public void release(ByteBuffer buffer)
+ {
+ leakedBuffers.remove(buffer);
+ super.release(buffer);
+ }
+ };
+ server.addBean(byteBufferPool);
+ HttpConfiguration httpConfig = new HttpConfiguration();
+ httpConfig.addCustomizer(new SecureRequestCustomizer());
+ HttpConnectionFactory http = new HttpConnectionFactory(httpConfig);
+ AtomicBoolean failFlush = new AtomicBoolean(false);
+ SslConnectionFactory ssl = new SslConnectionFactory(serverTLSFactory, http.getProtocol())
+ {
+ @Override
+ protected SslConnection newSslConnection(Connector connector, EndPoint endPoint, SSLEngine engine)
+ {
+ ByteBufferPool byteBufferPool = connector.getByteBufferPool();
+ RetainableByteBufferPool retainableByteBufferPool = connector.getBean(RetainableByteBufferPool.class);
+ return new SslConnection(retainableByteBufferPool, byteBufferPool, connector.getExecutor(), endPoint, engine, isDirectBuffersForEncryption(), isDirectBuffersForDecryption())
+ {
+ @Override
+ protected boolean networkFlush(ByteBuffer output) throws IOException
+ {
+ if (failFlush.get())
+ return false;
+ return super.networkFlush(output);
+ }
+ };
+ }
+ };
+ connector = new ServerConnector(server, 1, 1, ssl, http);
+ server.addConnector(connector);
+ server.setHandler(new EmptyServerHandler()
+ {
+ @Override
+ protected void service(String target, Request jettyRequest, HttpServletRequest request, HttpServletResponse response)
+ {
+ failFlush.set(true);
+ if (close)
+ jettyRequest.getHttpChannel().getEndPoint().close();
+ else
+ jettyRequest.getHttpChannel().getEndPoint().shutdownOutput();
+ }
+ });
+ server.start();
+
+ SslContextFactory.Client clientTLSFactory = createClientSslContextFactory();
+ ClientConnector clientConnector = new ClientConnector();
+ clientConnector.setSelectors(1);
+ clientConnector.setSslContextFactory(clientTLSFactory);
+ QueuedThreadPool clientThreads = new QueuedThreadPool();
+ clientThreads.setName("client");
+ clientConnector.setExecutor(clientThreads);
+ client = new HttpClient(new HttpClientTransportOverHTTP(clientConnector));
+ client.setExecutor(clientThreads);
+ client.start();
+
+ assertThrows(Exception.class, () -> client.newRequest("localhost", connector.getLocalPort()).scheme(HttpScheme.HTTPS.asString()).send());
+
+ await().atMost(5, TimeUnit.SECONDS).until(() -> leakedBuffers, is(empty()));
+ }
+
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ public void testEncryptedOutputBufferRepoolingAfterNetworkFlushThrows(boolean close) throws Exception
+ {
+ SslContextFactory.Server serverTLSFactory = createServerSslContextFactory();
+ QueuedThreadPool serverThreads = new QueuedThreadPool();
+ serverThreads.setName("server");
+ server = new Server(serverThreads);
+ List leakedBuffers = new ArrayList<>();
+ ArrayByteBufferPool byteBufferPool = new ArrayByteBufferPool()
+ {
+ @Override
+ public ByteBuffer acquire(int size, boolean direct)
+ {
+ ByteBuffer acquired = super.acquire(size, direct);
+ leakedBuffers.add(acquired);
+ return acquired;
+ }
+
+ @Override
+ public void release(ByteBuffer buffer)
+ {
+ leakedBuffers.remove(buffer);
+ super.release(buffer);
+ }
+ };
+ server.addBean(byteBufferPool);
+ HttpConfiguration httpConfig = new HttpConfiguration();
+ httpConfig.addCustomizer(new SecureRequestCustomizer());
+ HttpConnectionFactory http = new HttpConnectionFactory(httpConfig);
+ AtomicBoolean failFlush = new AtomicBoolean(false);
+ SslConnectionFactory ssl = new SslConnectionFactory(serverTLSFactory, http.getProtocol())
+ {
+ @Override
+ protected SslConnection newSslConnection(Connector connector, EndPoint endPoint, SSLEngine engine)
+ {
+ ByteBufferPool byteBufferPool = connector.getByteBufferPool();
+ RetainableByteBufferPool retainableByteBufferPool = connector.getBean(RetainableByteBufferPool.class);
+ return new SslConnection(retainableByteBufferPool, byteBufferPool, connector.getExecutor(), endPoint, engine, isDirectBuffersForEncryption(), isDirectBuffersForDecryption())
+ {
+ @Override
+ protected boolean networkFlush(ByteBuffer output) throws IOException
+ {
+ if (failFlush.get())
+ throw new IOException();
+ return super.networkFlush(output);
+ }
+ };
+ }
+ };
+ connector = new ServerConnector(server, 1, 1, ssl, http);
+ server.addConnector(connector);
+ server.setHandler(new EmptyServerHandler()
+ {
+ @Override
+ protected void service(String target, Request jettyRequest, HttpServletRequest request, HttpServletResponse response) throws IOException
+ {
+ failFlush.set(true);
+ if (close)
+ jettyRequest.getHttpChannel().getEndPoint().close();
+ else
+ jettyRequest.getHttpChannel().getEndPoint().shutdownOutput();
+ }
+ });
+ server.start();
+
+ SslContextFactory.Client clientTLSFactory = createClientSslContextFactory();
+ ClientConnector clientConnector = new ClientConnector();
+ clientConnector.setSelectors(1);
+ clientConnector.setSslContextFactory(clientTLSFactory);
+ QueuedThreadPool clientThreads = new QueuedThreadPool();
+ clientThreads.setName("client");
+ clientConnector.setExecutor(clientThreads);
+ client = new HttpClient(new HttpClientTransportOverHTTP(clientConnector));
+ client.setExecutor(clientThreads);
+ client.start();
+
+ assertThrows(Exception.class, () -> client.newRequest("localhost", connector.getLocalPort()).scheme(HttpScheme.HTTPS.asString()).send());
+
+ await().atMost(5, TimeUnit.SECONDS).until(() -> leakedBuffers, is(empty()));
+ }
+
@Test
public void testNeverUsedConnectionThenClientIdleTimeout() throws Exception
{
@@ -780,12 +1083,7 @@ protected int networkFill(ByteBuffer input) throws IOException
// Trigger the creation of a new connection, but don't use it.
ConnectionPoolHelper.tryCreate(connectionPool);
// Verify that the connection has been created.
- while (true)
- {
- Thread.sleep(50);
- if (connectionPool.getConnectionCount() == 1)
- break;
- }
+ await().atMost(5, TimeUnit.SECONDS).until(connectionPool::getConnectionCount, is(1));
// Wait for the client to idle timeout the connection.
Thread.sleep(idleTimeout + idleTimeout / 2);
diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/AbstractByteBufferPool.java b/jetty-io/src/main/java/org/eclipse/jetty/io/AbstractByteBufferPool.java
index 03237c035ea1..759935f4bda0 100644
--- a/jetty-io/src/main/java/org/eclipse/jetty/io/AbstractByteBufferPool.java
+++ b/jetty-io/src/main/java/org/eclipse/jetty/io/AbstractByteBufferPool.java
@@ -22,6 +22,10 @@
import org.eclipse.jetty.util.annotation.ManagedObject;
import org.eclipse.jetty.util.annotation.ManagedOperation;
+/**
+ * The {@code maxHeapMemory} and {@code maxDirectMemory} default heuristic is to use {@link Runtime#maxMemory()}
+ * divided by 4.
+ */
@ManagedObject
abstract class AbstractByteBufferPool implements ByteBufferPool
{
@@ -37,8 +41,8 @@ abstract class AbstractByteBufferPool implements ByteBufferPool
*
* @param factor the capacity factor
* @param maxQueueLength the maximum ByteBuffer queue length
- * @param maxHeapMemory the max heap memory in bytes, -1 for unlimited memory or 0 to use default heuristic.
- * @param maxDirectMemory the max direct memory in bytes, -1 for unlimited memory or 0 to use default heuristic.
+ * @param maxHeapMemory the max heap memory in bytes, -1 for unlimited memory or 0 to use default heuristic
+ * @param maxDirectMemory the max direct memory in bytes, -1 for unlimited memory or 0 to use default heuristic
*/
protected AbstractByteBufferPool(int factor, int maxQueueLength, long maxHeapMemory, long maxDirectMemory)
{
diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/ArrayByteBufferPool.java b/jetty-io/src/main/java/org/eclipse/jetty/io/ArrayByteBufferPool.java
index 0cdb05235316..02a6130c9aee 100644
--- a/jetty-io/src/main/java/org/eclipse/jetty/io/ArrayByteBufferPool.java
+++ b/jetty-io/src/main/java/org/eclipse/jetty/io/ArrayByteBufferPool.java
@@ -34,6 +34,8 @@
* Given a capacity {@code factor} of 1024, the first array element holds a queue of ByteBuffers
* each of capacity 1024, the second array element holds a queue of ByteBuffers each of capacity
* 2048, and so on.
+ * The {@code maxHeapMemory} and {@code maxDirectMemory} default heuristic is to use {@link Runtime#maxMemory()}
+ * divided by 4.
*/
@ManagedObject
public class ArrayByteBufferPool extends AbstractByteBufferPool implements Dumpable
@@ -48,6 +50,7 @@ public class ArrayByteBufferPool extends AbstractByteBufferPool implements Dumpa
/**
* Creates a new ArrayByteBufferPool with a default configuration.
+ * Both {@code maxHeapMemory} and {@code maxDirectMemory} default to 0 to use default heuristic.
*/
public ArrayByteBufferPool()
{
@@ -56,6 +59,7 @@ public ArrayByteBufferPool()
/**
* Creates a new ArrayByteBufferPool with the given configuration.
+ * Both {@code maxHeapMemory} and {@code maxDirectMemory} default to 0 to use default heuristic.
*
* @param minCapacity the minimum ByteBuffer capacity
* @param factor the capacity factor
@@ -68,6 +72,7 @@ public ArrayByteBufferPool(int minCapacity, int factor, int maxCapacity)
/**
* Creates a new ArrayByteBufferPool with the given configuration.
+ * Both {@code maxHeapMemory} and {@code maxDirectMemory} default to 0 to use default heuristic.
*
* @param minCapacity the minimum ByteBuffer capacity
* @param factor the capacity factor
@@ -86,8 +91,8 @@ public ArrayByteBufferPool(int minCapacity, int factor, int maxCapacity, int max
* @param factor the capacity factor
* @param maxCapacity the maximum ByteBuffer capacity
* @param maxQueueLength the maximum ByteBuffer queue length
- * @param maxHeapMemory the max heap memory in bytes, -1 for unlimited memory or 0 to use default heuristic.
- * @param maxDirectMemory the max direct memory in bytes, -1 for unlimited memory or 0 to use default heuristic.
+ * @param maxHeapMemory the max heap memory in bytes, -1 for unlimited memory or 0 to use default heuristic
+ * @param maxDirectMemory the max direct memory in bytes, -1 for unlimited memory or 0 to use default heuristic
*/
public ArrayByteBufferPool(int minCapacity, int factor, int maxCapacity, int maxQueueLength, long maxHeapMemory, long maxDirectMemory)
{
diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/ArrayRetainableByteBufferPool.java b/jetty-io/src/main/java/org/eclipse/jetty/io/ArrayRetainableByteBufferPool.java
index 3fe42c147ee1..cf4c0e239bf4 100644
--- a/jetty-io/src/main/java/org/eclipse/jetty/io/ArrayRetainableByteBufferPool.java
+++ b/jetty-io/src/main/java/org/eclipse/jetty/io/ArrayRetainableByteBufferPool.java
@@ -30,6 +30,15 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+/**
+ * A {@link RetainableByteBuffer} pool where RetainableByteBuffers are held in {@link Pool}s that are
+ * held in array elements.
+ * Given a capacity {@code factor} of 1024, the first array element holds a Pool of RetainableByteBuffers
+ * each of capacity 1024, the second array element holds a Pool of RetainableByteBuffers each of capacity
+ * 2048, and so on.
+ * The {@code maxHeapMemory} and {@code maxDirectMemory} default heuristic is to use {@link Runtime#maxMemory()}
+ * divided by 4.
+ */
@ManagedObject
public class ArrayRetainableByteBufferPool implements RetainableByteBufferPool, Dumpable
{
@@ -45,21 +54,56 @@ public class ArrayRetainableByteBufferPool implements RetainableByteBufferPool,
private final AtomicLong _currentDirectMemory = new AtomicLong();
private final Function _bucketIndexFor;
+ /**
+ * Creates a new ArrayRetainableByteBufferPool with a default configuration.
+ * Both {@code maxHeapMemory} and {@code maxDirectMemory} default to 0 to use default heuristic.
+ */
public ArrayRetainableByteBufferPool()
{
- this(0, -1, -1, Integer.MAX_VALUE, -1L, -1L);
+ this(0, -1, -1, Integer.MAX_VALUE);
}
+ /**
+ * Creates a new ArrayRetainableByteBufferPool with the given configuration.
+ * Both {@code maxHeapMemory} and {@code maxDirectMemory} default to 0 to use default heuristic.
+ *
+ * @param minCapacity the minimum ByteBuffer capacity
+ * @param factor the capacity factor
+ * @param maxCapacity the maximum ByteBuffer capacity
+ * @param maxBucketSize the maximum number of ByteBuffers for each bucket
+ */
public ArrayRetainableByteBufferPool(int minCapacity, int factor, int maxCapacity, int maxBucketSize)
{
- this(minCapacity, factor, maxCapacity, maxBucketSize, -1L, -1L);
+ this(minCapacity, factor, maxCapacity, maxBucketSize, 0L, 0L);
}
+ /**
+ * Creates a new ArrayRetainableByteBufferPool with the given configuration.
+ *
+ * @param minCapacity the minimum ByteBuffer capacity
+ * @param factor the capacity factor
+ * @param maxCapacity the maximum ByteBuffer capacity
+ * @param maxBucketSize the maximum number of ByteBuffers for each bucket
+ * @param maxHeapMemory the max heap memory in bytes, -1 for unlimited memory or 0 to use default heuristic
+ * @param maxDirectMemory the max direct memory in bytes, -1 for unlimited memory or 0 to use default heuristic
+ */
public ArrayRetainableByteBufferPool(int minCapacity, int factor, int maxCapacity, int maxBucketSize, long maxHeapMemory, long maxDirectMemory)
{
this(minCapacity, factor, maxCapacity, maxBucketSize, maxHeapMemory, maxDirectMemory, null, null);
}
+ /**
+ * Creates a new ArrayRetainableByteBufferPool with the given configuration.
+ *
+ * @param minCapacity the minimum ByteBuffer capacity
+ * @param factor the capacity factor
+ * @param maxCapacity the maximum ByteBuffer capacity
+ * @param maxBucketSize the maximum number of ByteBuffers for each bucket
+ * @param maxHeapMemory the max heap memory in bytes, -1 for unlimited memory or 0 to use default heuristic
+ * @param maxDirectMemory the max direct memory in bytes, -1 for unlimited memory or 0 to use default heuristic
+ * @param bucketIndexFor a {@link Function} that takes a capacity and returns a bucket index
+ * @param bucketCapacity a {@link Function} that takes a bucket index and returns a capacity
+ */
protected ArrayRetainableByteBufferPool(int minCapacity, int factor, int maxCapacity, int maxBucketSize, long maxHeapMemory, long maxDirectMemory,
Function bucketIndexFor, Function bucketCapacity)
{
@@ -91,8 +135,8 @@ protected ArrayRetainableByteBufferPool(int minCapacity, int factor, int maxCapa
_maxCapacity = maxCapacity;
_direct = directArray;
_indirect = indirectArray;
- _maxHeapMemory = maxHeapMemory;
- _maxDirectMemory = maxDirectMemory;
+ _maxHeapMemory = (maxHeapMemory != 0L) ? maxHeapMemory : Runtime.getRuntime().maxMemory() / 4;
+ _maxDirectMemory = (maxDirectMemory != 0L) ? maxDirectMemory : Runtime.getRuntime().maxMemory() / 4;
_bucketIndexFor = bucketIndexFor;
}
@@ -156,6 +200,11 @@ private RetainableByteBuffer newRetainableByteBuffer(int capacity, boolean direc
return retainableByteBuffer;
}
+ protected Pool poolFor(int capacity, boolean direct)
+ {
+ return bucketFor(capacity, direct);
+ }
+
private Bucket bucketFor(int capacity, boolean direct)
{
if (capacity < _minCapacity)
diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/ssl/SslConnection.java b/jetty-io/src/main/java/org/eclipse/jetty/io/ssl/SslConnection.java
index aeb34eab9c25..62214a1f866a 100644
--- a/jetty-io/src/main/java/org/eclipse/jetty/io/ssl/SslConnection.java
+++ b/jetty-io/src/main/java/org/eclipse/jetty/io/ssl/SslConnection.java
@@ -419,8 +419,10 @@ public String toConnectionString()
connection instanceof AbstractConnection ? ((AbstractConnection)connection).toConnectionString() : connection);
}
- private void releaseEncryptedInputBuffer()
+ private void releaseEmptyEncryptedInputBuffer()
{
+ if (!_lock.isHeldByCurrentThread())
+ throw new IllegalStateException();
if (_encryptedInput != null && !_encryptedInput.hasRemaining())
{
_encryptedInput.release();
@@ -428,8 +430,10 @@ private void releaseEncryptedInputBuffer()
}
}
- protected void releaseDecryptedInputBuffer()
+ private void releaseEmptyDecryptedInputBuffer()
{
+ if (!_lock.isHeldByCurrentThread())
+ throw new IllegalStateException();
if (_decryptedInput != null && !_decryptedInput.hasRemaining())
{
_bufferPool.release(_decryptedInput);
@@ -437,7 +441,31 @@ protected void releaseDecryptedInputBuffer()
}
}
- private void releaseEncryptedOutputBuffer()
+ private void discardInputBuffers()
+ {
+ if (!_lock.isHeldByCurrentThread())
+ throw new IllegalStateException();
+ if (_encryptedInput != null)
+ _encryptedInput.clear();
+ BufferUtil.clear(_decryptedInput);
+ releaseEmptyInputBuffers();
+ }
+
+ private void releaseEmptyInputBuffers()
+ {
+ releaseEmptyEncryptedInputBuffer();
+ releaseEmptyDecryptedInputBuffer();
+ }
+
+ private void discardEncryptedOutputBuffer()
+ {
+ if (!_lock.isHeldByCurrentThread())
+ throw new IllegalStateException();
+ BufferUtil.clear(_encryptedOutput);
+ releaseEmptyEncryptedOutputBuffer();
+ }
+
+ private void releaseEmptyEncryptedOutputBuffer()
{
if (!_lock.isHeldByCurrentThread())
throw new IllegalStateException();
@@ -759,7 +787,7 @@ public int fill(ByteBuffer buffer) throws IOException
// See also system property "jsse.SSLEngine.acceptLargeFragments".
if (BufferUtil.isEmpty(_decryptedInput) && appBufferSize < getApplicationBufferSize())
{
- releaseDecryptedInputBuffer();
+ releaseEmptyDecryptedInputBuffer();
continue;
}
throw new IllegalStateException("Unexpected unwrap result " + unwrap);
@@ -790,6 +818,7 @@ public int fill(ByteBuffer buffer) throws IOException
}
catch (Throwable x)
{
+ discardInputBuffers();
Throwable f = handleException(x, "fill");
Throwable failure = handshakeFailed(f);
if (_flushState == FlushState.WAIT_FOR_FILL)
@@ -801,8 +830,7 @@ public int fill(ByteBuffer buffer) throws IOException
}
finally
{
- releaseEncryptedInputBuffer();
- releaseDecryptedInputBuffer();
+ releaseEmptyInputBuffers();
if (_flushState == FlushState.WAIT_FOR_FILL)
{
@@ -988,26 +1016,26 @@ public boolean flush(ByteBuffer... appOuts) throws IOException
}
}
- // finish of any previous flushes
- if (_encryptedOutput != null)
+ Boolean result = null;
+ try
{
- int remaining = _encryptedOutput.remaining();
- if (remaining > 0)
+ // finish of any previous flushes
+ if (_encryptedOutput != null)
{
- boolean flushed = networkFlush(_encryptedOutput);
- int written = remaining - _encryptedOutput.remaining();
- if (written > 0)
- _bytesOut.addAndGet(written);
- if (!flushed)
- return false;
+ int remaining = _encryptedOutput.remaining();
+ if (remaining > 0)
+ {
+ boolean flushed = networkFlush(_encryptedOutput);
+ int written = remaining - _encryptedOutput.remaining();
+ if (written > 0)
+ _bytesOut.addAndGet(written);
+ if (!flushed)
+ return false;
+ }
}
- }
- boolean isEmpty = BufferUtil.isEmpty(appOuts);
+ boolean isEmpty = BufferUtil.isEmpty(appOuts);
- Boolean result = null;
- try
- {
if (_flushState != FlushState.IDLE)
return result = false;
@@ -1121,7 +1149,7 @@ public boolean flush(ByteBuffer... appOuts) throws IOException
// See also system property "jsse.SSLEngine.acceptLargeFragments".
if (packetBufferSize < getPacketBufferSize())
{
- releaseEncryptedOutputBuffer();
+ releaseEmptyEncryptedOutputBuffer();
continue;
}
throw new IllegalStateException("Unexpected wrap result " + wrap);
@@ -1159,12 +1187,13 @@ public boolean flush(ByteBuffer... appOuts) throws IOException
}
catch (Throwable x)
{
+ discardEncryptedOutputBuffer();
Throwable failure = handleException(x, "flush");
throw handshakeFailed(failure);
}
finally
{
- releaseEncryptedOutputBuffer();
+ releaseEmptyEncryptedOutputBuffer();
if (LOG.isDebugEnabled())
LOG.debug(" endPoint.close()), write);
+ }, t -> disconnect()), write);
}
}
}
if (close)
- endPoint.close();
+ disconnect();
else
ensureFillInterested();
}
catch (Throwable x)
{
- LOG.trace("IGNORED", x);
- endPoint.close();
+ if (LOG.isTraceEnabled())
+ LOG.trace("IGNORED", x);
+ disconnect();
+ }
+ }
+
+ private void disconnect()
+ {
+ try (AutoLock l = _lock.lock())
+ {
+ discardEncryptedOutputBuffer();
}
+ getEndPoint().close();
}
private void closeOutbound()
@@ -1382,9 +1426,12 @@ private boolean isOutboundDone()
@Override
public void doClose()
{
+ try (AutoLock l = _lock.lock())
+ {
+ discardInputBuffers();
+ }
// First send the TLS Close Alert, then the FIN.
- doShutdownOutput();
- getEndPoint().close();
+ doShutdownOutput(true);
super.doClose();
}
@@ -1537,7 +1584,7 @@ public void succeeded()
{
if (LOG.isDebugEnabled())
LOG.debug("IncompleteWriteCB succeeded {}", SslConnection.this);
- releaseEncryptedOutputBuffer();
+ releaseEmptyEncryptedOutputBuffer();
_flushState = FlushState.IDLE;
interested = _fillState == FillState.INTERESTED;
@@ -1563,8 +1610,7 @@ public void failed(final Throwable x)
if (LOG.isDebugEnabled())
LOG.debug("IncompleteWriteCB failed {}", SslConnection.this, x);
- BufferUtil.clear(_encryptedOutput);
- releaseEncryptedOutputBuffer();
+ discardEncryptedOutputBuffer();
_flushState = FlushState.IDLE;
failFillInterest = _fillState == FillState.WAIT_FOR_FLUSH ||