Skip to content

Commit

Permalink
Updates from reviews.
Browse files Browse the repository at this point in the history
Introduced:

* Chunk.retainAndCreate(ByteBuffer, boolean, Retainable)
* Chunk.createFromRetained(ByteBuffer, boolean, Retainable)

to replace from(ByteBuffer, boolean, Retainable).

The idea is to explicitly cover the 2 use cases that "wrap"
a Retainable, that in some case is already retained, in
some other case needs to be retained.

Signed-off-by: Simone Bordet <[email protected]>
  • Loading branch information
sbordet committed Dec 20, 2022
1 parent 180a77b commit c1a2c36
Show file tree
Hide file tree
Showing 14 changed files with 46 additions and 61 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -449,8 +449,7 @@ public boolean content(ByteBuffer buffer)
if (chunk != null)
throw new IllegalStateException("Content generated with unconsumed content left");

RetainableByteBuffer networkBuffer = this.networkBuffer;
chunk = Content.Chunk.from(buffer, false, networkBuffer);
chunk = Content.Chunk.retainAndCreate(buffer, false, networkBuffer);
if (LOG.isDebugEnabled())
LOG.debug("Setting action to responseContentAvailable on {}", this);
if (getAndSetAction(this::responseContentAvailable) != null)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -483,7 +483,7 @@ public boolean onContent(int request, FCGI.StreamType stream, ByteBuffer buffer)
HttpChannelOverFCGI channel = HttpConnectionOverFCGI.this.channel;
if (channel != null)
{
Content.Chunk chunk = Content.Chunk.from(buffer, false, networkBuffer);
Content.Chunk chunk = Content.Chunk.retainAndCreate(buffer, false, networkBuffer);
if (getAndSetAction(() -> channel.content(chunk)) != null)
throw new IllegalStateException();
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -371,7 +371,7 @@ public boolean onContent(int request, FCGI.StreamType streamType, ByteBuffer buf
LOG.debug("Request {} {} content {} on {}", request, streamType, buffer, stream);
if (stream != null)
{
stream.onContent(Content.Chunk.from(buffer, false, networkBuffer));
stream.onContent(Content.Chunk.retainAndCreate(buffer, false, networkBuffer));
// Signal that the content is processed asynchronously, to ensure backpressure.
return true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -635,16 +635,9 @@ public Content.Chunk read()
yield chunk;
state = State.MIDDLE;
if (chunk.hasRemaining())
{
Content.Chunk wrapped = Content.Chunk.from(chunk.getByteBuffer(), false, chunk);
chunk.release();
yield wrapped;
}
else
{
chunk.release();
yield Content.Chunk.EMPTY;
}
yield Content.Chunk.createFromRetained(chunk.getByteBuffer(), false, chunk);
chunk.release();
yield Content.Chunk.EMPTY;
}
case COMPLETE -> Content.Chunk.EOF;
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,9 +77,7 @@ public Content.Chunk read(boolean fillInterestIfNeeded)
boolean last = frame.remaining() == 0 && frame.isEndStream();
if (last)
responseSuccess(getHttpExchange(), null);
Content.Chunk chunk = Content.Chunk.from(frame.getData(), last, data);
data.release();
return chunk;
return Content.Chunk.createFromRetained(frame.getData(), last, data);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -243,8 +243,7 @@ private Content.Chunk createChunk(Stream.Data data)
DataFrame frame = data.frame();
if (frame.isEndStream() && frame.remaining() == 0)
return Content.Chunk.EOF;
// Implicit retain because we are passing the ByteBuffer to the Chunk.
return Content.Chunk.from(frame.getData(), frame.isEndStream(), data);
return Content.Chunk.retainAndCreate(frame.getData(), frame.isEndStream(), data);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,7 @@ public Content.Chunk read(boolean fillInterestIfNeeded)
boolean last = !byteBuffer.hasRemaining() && data.isLast();
if (last)
responseSuccess(getHttpExchange(), null);
Content.Chunk chunk = Content.Chunk.from(byteBuffer, last, data);
data.release();
return chunk;
return Content.Chunk.createFromRetained(byteBuffer, last, data);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -235,8 +235,7 @@ private Content.Chunk createChunk(Stream.Data data)
{
if (data == Stream.Data.EOF)
return Content.Chunk.EOF;
// Implicit retain because we are passing the ByteBuffer to the Chunk.
return Content.Chunk.from(data.getByteBuffer(), data.isLast(), data);
return Content.Chunk.retainAndCreate(data.getByteBuffer(), data.isLast(), data);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -483,19 +483,44 @@ static Chunk from(ByteBuffer byteBuffer, boolean last, Consumer<ByteBuffer> rele
return last ? EOF : EMPTY;
}

/**
* <p>Creates a last/non-last Chunk with the given ByteBuffer, linked to the given {@link Retainable}.</p>
* <p>The {@link #retain()} and {@link #release()} methods of this Chunk will delegate to the given Retainable.</p>
* <p>Use this method when the Retainable is already retained.</p>
*
* @param byteBuffer the ByteBuffer with the bytes of this Chunk
* @param last whether the Chunk is the last one
* @param retainable the Retainable this Chunk links to
* @return a new Chunk
* @throws IllegalArgumentException if the {@code Retainable}
* {@link Retainable#canRetain() cannot be retained}
* @see #retainAndCreate(ByteBuffer, boolean, Retainable)
*/
static Chunk createFromRetained(ByteBuffer byteBuffer, boolean last, Retainable retainable)
{
if (!retainable.canRetain())
throw new IllegalArgumentException("Cannot create chunk from non-retainable " + retainable);
if (byteBuffer.hasRemaining())
return new ByteBufferChunk.WithRetainable(byteBuffer, last, Objects.requireNonNull(retainable));
retainable.release();
return last ? EOF : EMPTY;
}

/**
* <p>Creates a last/non-last Chunk with the given ByteBuffer, linked to the given {@link Retainable}.</p>
* <p>The given Retainable is retained, and the {@link #retain()} and {@link #release()} methods of
* this Chunk will delegate to the given Retainable.</p>
* <p>Use this method when the Retainable is not already retained.</p>
*
* @param byteBuffer the ByteBuffer with the bytes of this Chunk
* @param last whether the Chunk is the last one
* @param retainable the Retainable this Chunk links to
* @return a new Chunk
* @throws IllegalArgumentException if the {@code Retainable}
* {@link Retainable#canRetain() cannot be retained}
* @see #createFromRetained(ByteBuffer, boolean, Retainable)
*/
static Chunk from(ByteBuffer byteBuffer, boolean last, Retainable retainable)
static Chunk retainAndCreate(ByteBuffer byteBuffer, boolean last, Retainable retainable)
{
if (!retainable.canRetain())
throw new IllegalArgumentException("Cannot create chunk from non-retainable " + retainable);
Expand Down Expand Up @@ -575,15 +600,15 @@ static Chunk next(Chunk chunk)
* </ul>
* <p>If the source has remaining bytes, the returned {@code Chunk} retains
* the source {@code Chunk} and it is linked to it via
* {@link #from(ByteBuffer, boolean, Retainable)}.</p>
* {@link #retainAndCreate(ByteBuffer, boolean, Retainable)}.</p>
*
* @return a new {@code Chunk} retained from the source {@code Chunk} with a slice
* of the source {@code Chunk}'s {@code ByteBuffer}
*/
default Chunk slice()
{
if (hasRemaining())
return from(getByteBuffer().slice(), isLast(), this);
return retainAndCreate(getByteBuffer().slice(), isLast(), this);
else
return isLast() ? this : EMPTY;
}
Expand All @@ -600,7 +625,7 @@ default Chunk slice()
* returned depending on the value of {@code last}.</p>
* <p>If the source has remaining bytes, the returned {@code Chunk} retains
* the source {@code Chunk} and it is linked to it via
* {@link #from(ByteBuffer, boolean, Retainable)}.</p>
* {@link #retainAndCreate(ByteBuffer, boolean, Retainable)}.</p>
*
* @param position the position at which the slice begins
* @param limit the limit at which the slice ends
Expand All @@ -622,7 +647,7 @@ default Chunk slice(int position, int limit, boolean last)
ByteBuffer slice = sourceBuffer.slice();
sourceBuffer.limit(sourceLimit);
sourceBuffer.position(sourcePosition);
return from(slice, last, this);
return retainAndCreate(slice, last, this);
}
else
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,7 @@ public void write(Content.Chunk chunk, Callback callback)
{
if (chunk.canRetain())
{
// Implicit retain that will be paired with the release done by the reader.
chunk = Content.Chunk.from(chunk.getByteBuffer(), chunk.isLast(), new Retainable.Wrapper(chunk)
chunk = Content.Chunk.retainAndCreate(chunk.getByteBuffer(), chunk.isLast(), new Retainable.Wrapper(chunk)
{
@Override
public boolean release()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,9 +96,7 @@ public Content.Chunk read()
else
{
buffer.limit(read);
Content.Chunk chunk = Content.Chunk.from(buffer, false, streamBuffer);
streamBuffer.release();
return chunk;
return Content.Chunk.createFromRetained(buffer, false, streamBuffer);
}
}
catch (Throwable x)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,9 +157,7 @@ public Content.Chunk read()
if (last)
IO.close(channel);

Content.Chunk chunk = Content.Chunk.from(byteBuffer, last, retainableByteBuffer);
retainableByteBuffer.release();
return chunk;
return Content.Chunk.createFromRetained(byteBuffer, last, retainableByteBuffer);
}

protected SeekableByteChannel open() throws IOException
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,13 +66,13 @@ public void testFromEmptyByteBufferWithRetainableReleaser()
{
Retainable.ReferenceCounter referenceCounter1 = new Retainable.ReferenceCounter(2);
assertThat(referenceCounter1.isRetained(), is(true));
assertThat(Content.Chunk.from(ByteBuffer.wrap(new byte[0]), true, referenceCounter1), sameInstance(Content.Chunk.EOF));
assertThat(Content.Chunk.createFromRetained(ByteBuffer.wrap(new byte[0]), true, referenceCounter1), sameInstance(Content.Chunk.EOF));
assertThat(referenceCounter1.isRetained(), is(false));
assertThat(referenceCounter1.release(), is(true));

Retainable.ReferenceCounter referenceCounter2 = new Retainable.ReferenceCounter(2);
assertThat(referenceCounter2.isRetained(), is(true));
assertThat(Content.Chunk.from(ByteBuffer.wrap(new byte[0]), false, referenceCounter2), sameInstance(Content.Chunk.EMPTY));
assertThat(Content.Chunk.createFromRetained(ByteBuffer.wrap(new byte[0]), false, referenceCounter2), sameInstance(Content.Chunk.EMPTY));
assertThat(referenceCounter2.isRetained(), is(false));
assertThat(referenceCounter2.release(), is(true));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@
import org.eclipse.jetty.io.Content;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.EofException;
import org.eclipse.jetty.io.Retainable;
import org.eclipse.jetty.io.RetainableByteBuffer;
import org.eclipse.jetty.io.RetainableByteBufferPool;
import org.eclipse.jetty.io.WriteFlusher;
Expand Down Expand Up @@ -979,7 +978,7 @@ public boolean content(ByteBuffer buffer)
if (LOG.isDebugEnabled())
LOG.debug("content {}/{} for {}", BufferUtil.toDetailString(buffer), _retainableByteBuffer, HttpConnection.this);

stream._chunk = Content.Chunk.from(buffer, false, new ChunkRetainable(_retainableByteBuffer, buffer));
stream._chunk = Content.Chunk.retainAndCreate(buffer, false, _retainableByteBuffer);
return true;
}

Expand Down Expand Up @@ -1094,26 +1093,6 @@ public void onComplianceViolation(ComplianceViolation.Mode mode, ComplianceViola
}
}

private class ChunkRetainable extends Retainable.Wrapper
{
private final ByteBuffer buffer;

private ChunkRetainable(Retainable retainable, ByteBuffer buffer)
{
super(retainable);
this.buffer = buffer;
}

@Override
public boolean release()
{
boolean released = super.release();
if (LOG.isDebugEnabled())
LOG.debug("content released {} {}/{} for {}", released, BufferUtil.toDetailString(buffer), getWrapped(), HttpConnection.this);
return released;
}
}

protected class HttpStreamOverHTTP1 implements HttpStream
{
private final long _nanoTime = NanoTime.now();
Expand Down

0 comments on commit c1a2c36

Please sign in to comment.