Skip to content

Commit

Permalink
Fixes #4971 - Simplify Connection.upgradeFrom()/upgradeTo().
Browse files Browse the repository at this point in the history
Now the upgrade-from connection produces a "floating" buffer (not belonging to a pool), so that it can release the original buffer.

The upgrade-to connection is free to copy or store this "floating" buffer.

Updated javadocs and all implementations.

Signed-off-by: Simone Bordet <[email protected]>
  • Loading branch information
sbordet committed Jun 30, 2020
1 parent 43e5473 commit cef86c9
Show file tree
Hide file tree
Showing 11 changed files with 109 additions and 83 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,7 @@ protected Parser getParser()

protected void setInputBuffer(ByteBuffer buffer)
{
if (buffer != null)
producer.setInputBuffer(buffer);
producer.setInputBuffer(buffer);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -434,11 +434,13 @@ public void upgrade(Connection newConnection)
oldConnection.onClose();
oldConnection.getEndPoint().setConnection(newConnection);

if (newConnection instanceof Connection.UpgradeTo)
((Connection.UpgradeTo)newConnection).onUpgradeTo(buffer);
else if (BufferUtil.hasContent(buffer))
throw new IllegalStateException("Cannot upgrade: " + newConnection + " does not implement " + Connection.UpgradeTo.class.getName());

if (BufferUtil.hasContent(buffer))
{
if (newConnection instanceof Connection.UpgradeTo)
((Connection.UpgradeTo)newConnection).onUpgradeTo(buffer);
else
throw new IllegalStateException("Cannot upgrade: " + newConnection + " does not implement " + Connection.UpgradeTo.class.getName());
}
newConnection.onOpen();
}

Expand Down
42 changes: 30 additions & 12 deletions jetty-io/src/main/java/org/eclipse/jetty/io/Connection.java
Original file line number Diff line number Diff line change
Expand Up @@ -96,31 +96,49 @@ public interface Connection extends Closeable

long getCreatedTimeStamp();

/**
* <p>{@link Connection} implementations implement this interface when they
* can upgrade from the protocol they speak (for example HTTP/1.1)
* to a different protocol (e.g. HTTP/2).</p>
*
* @see EndPoint#upgrade(Connection)
* @see UpgradeTo
*/
interface UpgradeFrom
{
/**
* <p>Takes the input buffer from the connection on upgrade.</p>
* <p>This method is used to take any unconsumed input from
* a connection during an upgrade.</p>
* <p>Invoked during an {@link EndPoint#upgrade(Connection) upgrade}
* to produce a buffer containing bytes that have not been consumed by
* this connection, and that must be consumed by the upgrade-to
* connection.</p>
*
* @return A buffer of unconsumed input. The caller must return the buffer
* to the bufferpool when consumed and this connection must not.
* @return a buffer of unconsumed bytes to pass to the upgrade-to connection.
* The buffer does not belong to any pool and should be discarded after
* having consumed its bytes.
* The returned buffer may be null if there are no unconsumed bytes.
*/
ByteBuffer onUpgradeFrom();
}

/**
* <p>{@link Connection} implementations implement this interface when they
* can be upgraded to the protocol they speak (e.g. HTTP/2)
* from a different protocol (e.g. HTTP/1.1).</p>
*/
interface UpgradeTo
{
/**
* <p>Callback method invoked when this connection is upgraded.</p>
* <p>This must be called before {@link #onOpen()}.</p>
* <p>Invoked during an {@link EndPoint#upgrade(Connection) upgrade}
* to receive a buffer containing bytes that have not been consumed by
* the upgrade-from connection, and that must be consumed by this
* connection.</p>
*
* @param prefilled An optional buffer that can contain prefilled data. Typically this
* results from an upgrade of one protocol to the other where the old connection has buffered
* data destined for the new connection. The new connection must take ownership of the buffer
* and is responsible for returning it to the buffer pool
* @param buffer a non-null buffer of unconsumed bytes received from
* the upgrade-from connection.
* The buffer does not belong to any pool and should be discarded after
* having consumed its bytes.
*/
void onUpgradeTo(ByteBuffer prefilled);
void onUpgradeTo(ByteBuffer buffer);
}

/**
Expand Down
14 changes: 8 additions & 6 deletions jetty-io/src/main/java/org/eclipse/jetty/io/EndPoint.java
Original file line number Diff line number Diff line change
Expand Up @@ -266,13 +266,15 @@ public interface EndPoint extends Closeable
boolean isOptimizedForDirectBuffers();

/**
* Upgrade connections.
* Close the old connection, update the endpoint and open the new connection.
* If the oldConnection is an instance of {@link Connection.UpgradeFrom} then
* a prefilled buffer is requested and passed to the newConnection if it is an instance
* of {@link Connection.UpgradeTo}
* <p>Upgrades this EndPoint from the current connection to the given new connection.</p>
* <p>Closes the current connection, links this EndPoint to the new connection and
* then opens the new connection.</p>
* <p>If the current connection is an instance of {@link Connection.UpgradeFrom} then
* a buffer of unconsumed bytes is requested; if there are unconsumed bytes, and if
* the new connection is an instance of {@link Connection.UpgradeTo}, the unconsumed
* buffer is passed to the new connection.</p>
*
* @param newConnection The connection to upgrade to
* @param newConnection the connection to upgrade to
*/
void upgrade(Connection newConnection);
}
Original file line number Diff line number Diff line change
Expand Up @@ -346,11 +346,8 @@ private void acquireEncryptedOutput()
@Override
public void onUpgradeTo(ByteBuffer buffer)
{
if (BufferUtil.hasContent(buffer))
{
acquireEncryptedInput();
BufferUtil.append(_encryptedInput, buffer);
}
acquireEncryptedInput();
BufferUtil.append(_encryptedInput, buffer);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@
import org.eclipse.jetty.server.HttpTransport;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.handler.HandlerWrapper;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.HostPort;
import org.eclipse.jetty.util.Promise;
Expand Down Expand Up @@ -613,19 +612,20 @@ public DownstreamConnection(EndPoint endPoint, Executor executor, ByteBufferPool
@Override
public void onUpgradeTo(ByteBuffer buffer)
{
this.buffer = buffer == null ? BufferUtil.EMPTY_BUFFER : buffer;
this.buffer = buffer;
}

@Override
public void onOpen()
{
super.onOpen();
final int remaining = buffer.remaining();
int remaining = buffer.remaining();
write(getConnection().getEndPoint(), buffer, new Callback()
{
@Override
public void succeeded()
{
buffer = null;
if (LOG.isDebugEnabled())
LOG.debug("{} wrote initial {} bytes to server", DownstreamConnection.this, remaining);
fillInterested();
Expand All @@ -634,6 +634,7 @@ public void succeeded()
@Override
public void failed(Throwable x)
{
buffer = null;
if (LOG.isDebugEnabled())
LOG.debug(this + " failed to write initial " + remaining + " bytes to server", x);
close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,18 +153,24 @@ private DetectorConnection(EndPoint endp, Connector connector)
}

@Override
public void onUpgradeTo(ByteBuffer prefilled)
public void onUpgradeTo(ByteBuffer buffer)
{
if (LOG.isDebugEnabled())
LOG.debug("Detector {} copying prefilled buffer {}", getProtocol(), BufferUtil.toDetailString(prefilled));
if (BufferUtil.hasContent(prefilled))
BufferUtil.append(_buffer, prefilled);
LOG.debug("Detector {} copying unconsumed buffer {}", getProtocol(), BufferUtil.toDetailString(buffer));
BufferUtil.append(_buffer, buffer);
}

@Override
public ByteBuffer onUpgradeFrom()
{
return _buffer;
if (_buffer.hasRemaining())
{
ByteBuffer unconsumed = ByteBuffer.allocateDirect(_buffer.remaining());
unconsumed.put(_buffer);
_connector.getByteBufferPool().release(_buffer);
return unconsumed;
}
return null;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -192,18 +192,18 @@ public ByteBuffer onUpgradeFrom()
{
if (BufferUtil.hasContent(_requestBuffer))
{
ByteBuffer buffer = _requestBuffer;
_requestBuffer = null;
return buffer;
ByteBuffer unconsumed = ByteBuffer.allocateDirect(_requestBuffer.remaining());
unconsumed.put(_requestBuffer);
releaseRequestBuffer();
return unconsumed;
}
return null;
}

@Override
public void onUpgradeTo(ByteBuffer buffer)
{
if (BufferUtil.hasContent(buffer))
BufferUtil.append(getRequestBuffer(), buffer);
BufferUtil.append(getRequestBuffer(), buffer);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -234,16 +234,22 @@ public void onOpen()
@Override
public ByteBuffer onUpgradeFrom()
{
return _buffer;
if (_buffer.hasRemaining())
{
ByteBuffer unconsumed = ByteBuffer.allocateDirect(_buffer.remaining());
unconsumed.put(_buffer);
_connector.getByteBufferPool().release(_buffer);
return unconsumed;
}
return null;
}

@Override
public void onUpgradeTo(ByteBuffer prefilled)
public void onUpgradeTo(ByteBuffer buffer)
{
if (LOG.isDebugEnabled())
LOG.debug("Proxy v1 copying prefilled buffer {}", BufferUtil.toDetailString(prefilled));
if (BufferUtil.hasContent(prefilled))
BufferUtil.append(_buffer, prefilled);
LOG.debug("Proxy v1 copying unconsumed buffer {}", BufferUtil.toDetailString(buffer));
BufferUtil.append(_buffer, buffer);
}

/**
Expand Down Expand Up @@ -442,12 +448,11 @@ protected ProxyProtocolV2Connection(EndPoint endp, Connector connector, Connecti
}

@Override
public void onUpgradeTo(ByteBuffer prefilled)
public void onUpgradeTo(ByteBuffer buffer)
{
if (LOG.isDebugEnabled())
LOG.debug("Proxy v2 copying prefilled buffer {}", BufferUtil.toDetailString(prefilled));
if (BufferUtil.hasContent(prefilled))
BufferUtil.append(_buffer, prefilled);
LOG.debug("Proxy v2 copying unconsumed buffer {}", BufferUtil.toDetailString(buffer));
BufferUtil.append(_buffer, buffer);
}

@Override
Expand Down Expand Up @@ -540,7 +545,14 @@ public void onFillable()
@Override
public ByteBuffer onUpgradeFrom()
{
return _buffer;
if (_buffer.hasRemaining())
{
ByteBuffer unconsumed = ByteBuffer.allocateDirect(_buffer.remaining());
unconsumed.put(_buffer);
_connector.getByteBufferPool().release(_buffer);
return unconsumed;
}
return null;
}

private void parseBodyAndUpgrade() throws IOException
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ public long getOnFillableCount()
private final LongAdder bytesIn = new LongAdder();
private WebSocketSession session;
private List<ExtensionConfig> extensions = new ArrayList<>();
private ByteBuffer prefillBuffer;
private ByteBuffer initialBuffer;
private Stats stats = new Stats();
private CloseInfo fatalCloseInfo;

Expand Down Expand Up @@ -255,13 +255,12 @@ public boolean opened()
{
if (connectionState.opened())
{
if (BufferUtil.hasContent(prefillBuffer))
if (BufferUtil.hasContent(initialBuffer))
{
if (LOG.isDebugEnabled())
{
LOG.debug("Parsing Upgrade prefill buffer ({} remaining)", prefillBuffer.remaining());
}
parser.parse(prefillBuffer);
LOG.debug("Parsing upgrade initial buffer ({} remaining)", initialBuffer.remaining());
parser.parse(initialBuffer);
initialBuffer = null;
}
fillInterested();
return true;
Expand Down Expand Up @@ -545,15 +544,13 @@ protected void onFillInterestedFailed(Throwable cause)
* be processed by the websocket parser before starting
* to read bytes from the connection
*
* @param prefilled the bytes of prefilled content encountered during upgrade
* @param initialBuffer the bytes of unconsumed content encountered during upgrade
*/
protected void setInitialBuffer(ByteBuffer prefilled)
protected void setInitialBuffer(ByteBuffer initialBuffer)
{
if (LOG.isDebugEnabled())
{
LOG.debug("set Initial Buffer - {}", BufferUtil.toDetailString(prefilled));
}
prefillBuffer = prefilled;
LOG.debug("Set initial buffer - {}", BufferUtil.toDetailString(initialBuffer));
this.initialBuffer = initialBuffer;
}

/**
Expand Down Expand Up @@ -646,14 +643,11 @@ public String toConnectionString()
* to read bytes from the connection
*/
@Override
public void onUpgradeTo(ByteBuffer prefilled)
public void onUpgradeTo(ByteBuffer buffer)
{
if (LOG.isDebugEnabled())
{
LOG.debug("onUpgradeTo({})", BufferUtil.toDetailString(prefilled));
}

setInitialBuffer(prefilled);
LOG.debug("onUpgradeTo({})", BufferUtil.toDetailString(buffer));
setInitialBuffer(buffer);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,9 +173,9 @@ public void onFillable()
}

@Override
public void onUpgradeTo(ByteBuffer prefilled)
public void onUpgradeTo(ByteBuffer buffer)
{
setInitialBuffer(prefilled);
setInitialBuffer(buffer);
}

@Override
Expand Down Expand Up @@ -259,20 +259,15 @@ public void writeRawSlowly(ByteBuffer buf, int segmentSize) throws IOException
* be processed by the websocket parser before starting
* to read bytes from the connection
*
* @param prefilled the bytes of prefilled content encountered during upgrade
* @param initialBuffer the bytes of unconsumed content encountered during upgrade
*/
protected void setInitialBuffer(ByteBuffer prefilled)
protected void setInitialBuffer(ByteBuffer initialBuffer)
{
if (log.isDebugEnabled())
if (BufferUtil.hasContent(initialBuffer))
{
log.debug("set Initial Buffer - {}", BufferUtil.toDetailString(prefilled));
}

if ((prefilled != null) && (prefilled.hasRemaining()))
{
networkBuffer = bufferPool.acquire(prefilled.remaining(), true);
networkBuffer = bufferPool.acquire(initialBuffer.remaining(), true);
BufferUtil.clearToFill(networkBuffer);
BufferUtil.put(prefilled, networkBuffer);
BufferUtil.put(initialBuffer, networkBuffer);
BufferUtil.flipToFlush(networkBuffer, 0);
}
}
Expand Down

0 comments on commit cef86c9

Please sign in to comment.