Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixes retainability of special Chunks #9073

Merged
merged 12 commits into from
Jan 13, 2023
Original file line number Diff line number Diff line change
Expand Up @@ -265,11 +265,7 @@ public void listeners() throws Exception
.onResponseBegin(response -> { /* ... */ })
.onResponseHeader((response, field) -> true)
.onResponseHeaders(response -> { /* ... */ })
.onResponseContentAsync((response, chunk, demander) ->
{
chunk.release();
demander.run();
})
.onResponseContentAsync((response, chunk, demander) -> demander.run())
.onResponseFailure((response, failure) -> { /* ... */ })
.onResponseSuccess(response -> { /* ... */ })
// Result hook.
Expand Down Expand Up @@ -505,12 +501,12 @@ private void forwardContent(Response response, Content.Source contentSource)
// nor demanded again until the demand callback is invoked.
return;
}
// Check if the chunk is the terminal one, in which case the
// Check if the chunk is last and empty, in which case the
// read/demand loop is done. Demanding again when the terminal
// chunk has been read will invoke the demand callback with
// the same terminal chunk, so this check must be present to
// avoid infinitely demanding and reading the terminal chunk.
if (chunk.isTerminal())
if (chunk.isLast() && !chunk.hasRemaining())
{
chunk.release();
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,31 +102,27 @@ public void onContent(Response response, Content.Chunk chunk, Runnable demander)
{
if (LOG.isDebugEnabled())
LOG.debug("Skipped empty chunk {}", chunk);
chunk.release();
demander.run();
return;
}

boolean closed;
try (AutoLock.WithCondition l = lock.lock())
{
closed = this.closed;
if (!closed)
{
if (LOG.isDebugEnabled())
LOG.debug("Queueing chunk {}", chunk);
if (chunk.canRetain())
chunk.retain();
chunkCallbacks.add(new ChunkCallback(chunk, demander, response::abort));
l.signalAll();
return;
}
}

if (closed)
{
if (LOG.isDebugEnabled())
LOG.debug("InputStream closed, ignored chunk {}", chunk);
chunk.release();
response.abort(new AsynchronousCloseException());
}
if (LOG.isDebugEnabled())
LOG.debug("InputStream closed, ignored chunk {}", chunk);
response.abort(new AsynchronousCloseException());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,12 +155,10 @@ default void onContent(Response response, Content.Chunk chunk, Runnable demander
try
{
onContent(response, chunk.getByteBuffer());
chunk.release();
demander.run();
}
catch (Throwable x)
{
chunk.release();
response.abort(x);
}
}
Expand Down Expand Up @@ -190,10 +188,22 @@ default void onContentSource(Response response, Content.Source contentSource)
response.abort(error.getCause());
return;
}
if (chunk.isTerminal())
if (chunk.isLast() && !chunk.hasRemaining())
{
chunk.release();
return;
lorban marked this conversation as resolved.
Show resolved Hide resolved
}

onContent(response, chunk, () -> contentSource.demand(demandCallback));
try
{
onContent(response, chunk, () -> contentSource.demand(demandCallback));
chunk.release();
}
catch (Throwable x)
{
chunk.release();
response.abort(x);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -574,7 +574,7 @@ protected Content.Chunk transform(Content.Chunk inputChunk)
return _chunk;

// Retain the input chunk because its ByteBuffer will be referenced by the Inflater.
if (retain && _chunk.hasRemaining())
if (retain && _chunk.canRetain())
_chunk.retain();
if (LOG.isDebugEnabled())
LOG.debug("decoding: {}", _chunk);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,7 @@ private void onDemandCallback()
{
demultiplexerContentSource.onChunk(chunk);
}
chunk.release();
}

private void registerFailure(Throwable failure)
Expand Down Expand Up @@ -368,6 +369,12 @@ public boolean isLast()
throw new UnsupportedOperationException();
}

@Override
public boolean canRetain()
{
throw new UnsupportedOperationException();
}

@Override
public void retain()
{
Expand All @@ -383,7 +390,7 @@ public boolean release()
@Override
public String toString()
{
return "ALREADY_READ_CHUNK";
return "AlreadyReadChunk";
}
};
private final int index;
Expand All @@ -401,9 +408,18 @@ private void onChunk(Content.Chunk chunk)
if (LOG.isDebugEnabled())
LOG.debug("Registering content in multiplexed content source #{} that contains {}", index, currentChunk);
if (currentChunk == null || currentChunk == ALREADY_READ_CHUNK)
this.chunk = chunk.slice();
{
if (chunk.hasRemaining())
chunk = Content.Chunk.asChunk(chunk.getByteBuffer().slice(), chunk.isLast(), chunk);
// Retain the slice because it is stored for later reads.
if (chunk.canRetain())
chunk.retain();
this.chunk = chunk;
}
else if (!currentChunk.isLast())
{
throw new IllegalStateException("Cannot overwrite chunk");
}
onDemandCallback();
}

Expand Down Expand Up @@ -436,8 +452,8 @@ public Content.Chunk read()
}

Content.Chunk result = chunk;
if (result != null && !result.isTerminal())
chunk = ALREADY_READ_CHUNK;
if (result != null)
chunk = result.isLast() ? Content.Chunk.next(result) : ALREADY_READ_CHUNK;
if (LOG.isDebugEnabled())
LOG.debug("Content source #{} reading current chunk {}", index, result);
return result;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -450,9 +450,10 @@ public boolean content(ByteBuffer buffer)
if (chunk != null)
throw new IllegalStateException("Content generated with unconsumed content left");

RetainableByteBuffer networkBuffer = this.networkBuffer;
// Retain the chunk because it is stored for later use.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

--- // Retain the chunk because it is stored for later use.
+++ // Retain the networkBuffer because it is stored as a chunk for later use.

networkBuffer.retain();
chunk = Content.Chunk.from(buffer, false, networkBuffer);
chunk = Content.Chunk.asChunk(buffer, false, networkBuffer);

if (LOG.isDebugEnabled())
LOG.debug("Setting action to responseContentAvailable on {}", this);
if (getAndSetAction(this::responseContentAvailable) != null)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,6 @@ protected void service(Request request, org.eclipse.jetty.server.Response respon
.scheme(scenario.getScheme())
.onResponseContentAsync((response, chunk, demander) ->
{
chunk.release();
contentCount.incrementAndGet();
demanderRef.set(demander);
contentLatch.get().countDown();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,6 @@ public void testServerContentTerminalClientContentDelay() throws Exception
client.newRequest("localhost", server.getLocalPort())
.onResponseContentAsync((response, chunk, demander) ->
{
chunk.release();
if (demanderRef.compareAndSet(null, demander))
firstContentLatch.countDown();
else
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -633,8 +633,7 @@ protected void service(org.eclipse.jetty.server.Request request, org.eclipse.jet

AsyncRequestContent body = new AsyncRequestContent();
body.write(false, BufferUtil.allocate(512), Callback.NOOP);
body.write(false, BufferUtil.allocate(512), Callback.NOOP);
body.write(Content.Chunk.from(new IOException("explicitly_thrown_by_test")), Callback.NOOP);
body.write(false, BufferUtil.allocate(512), Callback.from(() -> body.fail(new IOException("explicitly_thrown_by_test"))));
CountDownLatch latch = new CountDownLatch(1);
client.newRequest("localhost", connector.getLocalPort())
.scheme(scenario.getScheme())
Expand Down Expand Up @@ -1412,7 +1411,6 @@ public boolean process(org.eclipse.jetty.server.Request request, org.eclipse.jet
@Override
public void onContent(Response response, Content.Chunk chunk, Runnable demander)
{
chunk.release();
// Do not notify the callback yet.
demanderRef.set(demander);
contentLatch.countDown();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,7 @@ protected void service(Request request, Response response) throws Throwable
}
else
{
if (chunk.hasRemaining())
chunk.release();
chunk.release();
if (chunk.isLast())
break;
NanoTime.spinWait(TimeUnit.MICROSECONDS.toNanos(1));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ public void testWriteFlushDemandFlushBlocksUntilRead() throws Exception

Content.Chunk chunk = content.read();
assertNotNull(chunk);
chunk.release();

// Flush should return.
assertTrue(await(task, 5000));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -484,11 +484,13 @@ public boolean onContent(int request, FCGI.StreamType stream, ByteBuffer buffer)
HttpChannelOverFCGI channel = HttpConnectionOverFCGI.this.channel;
if (channel != null)
{
networkBuffer.retain();
Content.Chunk chunk = Content.Chunk.from(buffer, false, networkBuffer);
if (getAndSetAction(() -> channel.content(chunk)) != null)
throw new IllegalStateException();
return true;
// No need to call networkBuffer.retain() here, since we know
// that the action will be run before releasing the networkBuffer.
// The receiver of the chunk decides whether to consume/retain it.
Content.Chunk chunk = Content.Chunk.asChunk(buffer, false, networkBuffer);
if (getAndSetAction(() -> channel.content(chunk)) == null)
return true;
throw new IllegalStateException();
}
else
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,9 @@ void content(Content.Chunk chunk)
{
if (this.chunk != null)
throw new IllegalStateException();
// Retain the chunk because it is stored for later reads.
if (chunk.canRetain())
chunk.retain();
this.chunk = chunk;
responseContentAvailable();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,10 +172,6 @@ public boolean process(Request request, Response response, Callback callback)
{
response.abort(x);
}
finally
{
chunk.release();
}
})
.path(proxyContext.getContextPath() + path);
FutureResponseListener listener = new FutureResponseListener(request, length);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,9 @@ private void notifyContentAvailable()

public void onContent(Content.Chunk chunk)
{
// Retain the chunk because it is stored for later reads.
if (chunk.canRetain())
chunk.retain();
_chunk = chunk;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -371,8 +371,10 @@ public boolean onContent(int request, FCGI.StreamType streamType, ByteBuffer buf
LOG.debug("Request {} {} content {} on {}", request, streamType, buffer, stream);
if (stream != null)
{
networkBuffer.retain();
stream.onContent(Content.Chunk.from(buffer, false, networkBuffer));
// No need to call networkBuffer.retain() here.
// The receiver of the chunk decides whether to consume/retain it.
Content.Chunk chunk = Content.Chunk.asChunk(buffer, false, networkBuffer);
stream.onContent(chunk);
// Signal that the content is processed asynchronously, to ensure backpressure.
return true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -651,7 +651,6 @@ public boolean process(org.eclipse.jetty.server.Request request, org.eclipse.jet
.scheme(scheme)
.onResponseContentAsync((response, chunk, demander) ->
{
chunk.release();
contentCount.incrementAndGet();
demanderRef.set(demander);
contentLatch.get().countDown();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -222,9 +222,9 @@ else if ((_flags & 0x2) == 0x2)

if (buffer.hasRemaining())
{
ByteBuffer chunk = buffer;
ByteBuffer decoded = buffer;
buffer = null;
if (decodedChunk(chunk))
if (decodedChunk(decoded))
return;
}
else if (_inflater.needsInput())
Expand Down
Loading