Skip to content

Commit

Permalink
Fixes #10679 - Review HTTP/2 rate control. (#10681)
Browse files Browse the repository at this point in the history
* Bumped the rate control rate from 50 events/s to 128.
* Added rate control for all CONTINUATION frames.
* Added rate control for invalid PUSH_PROMISE frames.
* Added rate control for RST_STREAM frames.
* Added rate control for all SETTINGS frames.
* Fixed growth of header block accumulation buffer.

Signed-off-by: Simone Bordet <[email protected]>
  • Loading branch information
sbordet authored Oct 9, 2023
1 parent 90fdd42 commit dbb9451
Show file tree
Hide file tree
Showing 12 changed files with 184 additions and 48 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
*/
public abstract class BodyParser
{
protected static final Logger LOG = LoggerFactory.getLogger(BodyParser.class);
private static final Logger LOG = LoggerFactory.getLogger(BodyParser.class);

private final HeaderParser headerParser;
private final Parser.Listener listener;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,16 +75,28 @@ public boolean parse(ByteBuffer buffer)
int remaining = buffer.remaining();
if (remaining < length)
{
headerBlockFragments.storeFragment(buffer, remaining, false);
ContinuationFrame frame = new ContinuationFrame(getStreamId(), false);
if (!rateControlOnEvent(frame))
return connectionFailure(buffer, ErrorCode.ENHANCE_YOUR_CALM_ERROR.code, "invalid_continuation_frame_rate");

if (!headerBlockFragments.storeFragment(buffer, remaining, false))
return connectionFailure(buffer, ErrorCode.PROTOCOL_ERROR.code, "invalid_continuation_stream");

length -= remaining;
break;
}
else
{
boolean last = hasFlag(Flags.END_HEADERS);
headerBlockFragments.storeFragment(buffer, length, last);
boolean endHeaders = hasFlag(Flags.END_HEADERS);
ContinuationFrame frame = new ContinuationFrame(getStreamId(), endHeaders);
if (!rateControlOnEvent(frame))
return connectionFailure(buffer, ErrorCode.ENHANCE_YOUR_CALM_ERROR.code, "invalid_continuation_frame_rate");

if (!headerBlockFragments.storeFragment(buffer, length, endHeaders))
return connectionFailure(buffer, ErrorCode.PROTOCOL_ERROR.code, "invalid_continuation_stream");

reset();
if (last)
if (endHeaders)
return onHeaders(buffer);
return true;
}
Expand All @@ -103,17 +115,21 @@ private boolean onHeaders(ByteBuffer buffer)
ByteBuffer headerBlock = headerBlockFragments.complete();
MetaData metaData = headerBlockParser.parse(headerBlock, headerBlock.remaining());
headerBlockFragments.getByteBufferPool().release(headerBlock);
if (metaData == null)
return true;
HeadersFrame frame = new HeadersFrame(getStreamId(), metaData, headerBlockFragments.getPriorityFrame(), headerBlockFragments.isEndStream());
headerBlockFragments.reset();

if (metaData == HeaderBlockParser.SESSION_FAILURE)
return false;
HeadersFrame frame = new HeadersFrame(getStreamId(), metaData, headerBlockFragments.getPriorityFrame(), headerBlockFragments.isEndStream());
if (metaData == HeaderBlockParser.STREAM_FAILURE)

if (metaData != HeaderBlockParser.STREAM_FAILURE)
{
notifyHeaders(frame);
}
else
{
if (!rateControlOnEvent(frame))
return connectionFailure(buffer, ErrorCode.ENHANCE_YOUR_CALM_ERROR.code, "invalid_continuation_frame_rate");
return connectionFailure(buffer, ErrorCode.ENHANCE_YOUR_CALM_ERROR.code, "invalid_headers_frame_rate");
}
notifyHeaders(frame);
return true;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,33 +21,53 @@
public class HeaderBlockFragments
{
private final ByteBufferPool byteBufferPool;
private final int maxCapacity;
private PriorityFrame priorityFrame;
private boolean endStream;
private int streamId;
private boolean endStream;
private ByteBuffer storage;

@Deprecated
public HeaderBlockFragments(ByteBufferPool byteBufferPool)
{
this(byteBufferPool, 8192);
}

public HeaderBlockFragments(ByteBufferPool byteBufferPool, int maxCapacity)
{
this.byteBufferPool = byteBufferPool;
this.maxCapacity = maxCapacity;
}

public ByteBufferPool getByteBufferPool()
{
return byteBufferPool;
}

public void storeFragment(ByteBuffer fragment, int length, boolean last)
void reset()
{
priorityFrame = null;
streamId = 0;
endStream = false;
storage = null;
}

public boolean storeFragment(ByteBuffer fragment, int length, boolean last)
{
if (storage == null)
{
int space = last ? length : length * 2;
storage = byteBufferPool.acquire(space, fragment.isDirect());
if (length > maxCapacity)
return false;
int capacity = last ? length : length * 2;
storage = byteBufferPool.acquire(capacity, fragment.isDirect());
storage.clear();
}

// Grow the storage if necessary.
if (storage.remaining() < length)
{
if (storage.position() + length > maxCapacity)
return false;
int space = last ? length : length * 2;
int capacity = storage.position() + space;
ByteBuffer newStorage = byteBufferPool.acquire(capacity, storage.isDirect());
Expand All @@ -63,6 +83,7 @@ public void storeFragment(ByteBuffer fragment, int length, boolean last)
fragment.limit(fragment.position() + length);
storage.put(fragment);
fragment.limit(limit);
return true;
}

public PriorityFrame getPriorityFrame()
Expand All @@ -87,10 +108,8 @@ public void setEndStream(boolean endStream)

public ByteBuffer complete()
{
ByteBuffer result = storage;
storage = null;
result.flip();
return result;
storage.flip();
return storage;
}

public int getStreamId()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,13 @@
import org.eclipse.jetty.http2.frames.HeadersFrame;
import org.eclipse.jetty.http2.frames.PriorityFrame;
import org.eclipse.jetty.util.BufferUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class HeadersBodyParser extends BodyParser
{
private static final Logger LOG = LoggerFactory.getLogger(HeadersBodyParser.class);

private final HeaderBlockParser headerBlockParser;
private final HeaderBlockFragments headerBlockFragments;
private State state = State.PREPARE;
Expand Down Expand Up @@ -71,8 +75,15 @@ else if (hasFlag(Flags.END_HEADERS))
}
else
{
headerBlockFragments.setStreamId(getStreamId());
headerBlockFragments.setEndStream(isEndStream());
if (headerBlockFragments.getStreamId() != 0)
{
connectionFailure(buffer, ErrorCode.PROTOCOL_ERROR.code, "invalid_headers_frame");
}
else
{
headerBlockFragments.setStreamId(getStreamId());
headerBlockFragments.setEndStream(isEndStream());
}
}
}

Expand Down Expand Up @@ -167,6 +178,18 @@ else if (hasFlag(Flags.PRIORITY))
break;
}
case HEADERS:
{
if (!hasFlag(Flags.END_HEADERS))
{
headerBlockFragments.setStreamId(getStreamId());
headerBlockFragments.setEndStream(isEndStream());
if (hasFlag(Flags.PRIORITY))
headerBlockFragments.setPriorityFrame(new PriorityFrame(getStreamId(), parentStreamId, weight, exclusive));
}
state = State.HEADER_BLOCK;
break;
}
case HEADER_BLOCK:
{
if (hasFlag(Flags.END_HEADERS))
{
Expand All @@ -191,7 +214,7 @@ else if (hasFlag(Flags.PRIORITY))
{
HeadersFrame frame = new HeadersFrame(getStreamId(), metaData, null, isEndStream());
if (!rateControlOnEvent(frame))
connectionFailure(buffer, ErrorCode.ENHANCE_YOUR_CALM_ERROR.code, "invalid_headers_frame_rate");
return connectionFailure(buffer, ErrorCode.ENHANCE_YOUR_CALM_ERROR.code, "invalid_headers_frame_rate");
}
}
}
Expand All @@ -200,16 +223,14 @@ else if (hasFlag(Flags.PRIORITY))
int remaining = buffer.remaining();
if (remaining < length)
{
headerBlockFragments.storeFragment(buffer, remaining, false);
if (!headerBlockFragments.storeFragment(buffer, remaining, false))
return connectionFailure(buffer, ErrorCode.PROTOCOL_ERROR.code, "invalid_headers_frame");
length -= remaining;
}
else
{
headerBlockFragments.setStreamId(getStreamId());
headerBlockFragments.setEndStream(isEndStream());
if (hasFlag(Flags.PRIORITY))
headerBlockFragments.setPriorityFrame(new PriorityFrame(getStreamId(), parentStreamId, weight, exclusive));
headerBlockFragments.storeFragment(buffer, length, false);
if (!headerBlockFragments.storeFragment(buffer, length, false))
return connectionFailure(buffer, ErrorCode.PROTOCOL_ERROR.code, "invalid_headers_frame");
state = State.PADDING;
loop = paddingLength == 0;
}
Expand Down Expand Up @@ -253,6 +274,6 @@ private void onHeaders(HeadersFrame frame)

private enum State
{
PREPARE, PADDING_LENGTH, EXCLUSIVE, PARENT_STREAM_ID, PARENT_STREAM_ID_BYTES, WEIGHT, HEADERS, PADDING
PREPARE, PADDING_LENGTH, EXCLUSIVE, PARENT_STREAM_ID, PARENT_STREAM_ID_BYTES, WEIGHT, HEADERS, HEADER_BLOCK, PADDING
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ public void init(Listener listener)
this.listener = listener;
unknownBodyParser = new UnknownBodyParser(headerParser, listener);
HeaderBlockParser headerBlockParser = new HeaderBlockParser(headerParser, byteBufferPool, hpackDecoder, unknownBodyParser);
HeaderBlockFragments headerBlockFragments = new HeaderBlockFragments(byteBufferPool);
HeaderBlockFragments headerBlockFragments = new HeaderBlockFragments(byteBufferPool, hpackDecoder.getMaxHeaderListSize());
bodyParsers[FrameType.DATA.getType()] = new DataBodyParser(headerParser, listener);
bodyParsers[FrameType.HEADERS.getType()] = new HeadersBodyParser(headerParser, listener, headerBlockParser, headerBlockFragments);
bodyParsers[FrameType.PRIORITY.getType()] = new PriorityBodyParser(headerParser, listener);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import org.eclipse.jetty.http.MetaData;
import org.eclipse.jetty.http2.ErrorCode;
import org.eclipse.jetty.http2.Flags;
import org.eclipse.jetty.http2.frames.HeadersFrame;
import org.eclipse.jetty.http2.frames.PushPromiseFrame;

public class PushPromiseBodyParser extends BodyParser
Expand Down Expand Up @@ -65,13 +66,9 @@ public boolean parse(ByteBuffer buffer)
length = getBodyLength();

if (isPadding())
{
state = State.PADDING_LENGTH;
}
else
{
state = State.STREAM_ID;
}
break;
}
case PADDING_LENGTH:
Expand Down Expand Up @@ -131,7 +128,15 @@ public boolean parse(ByteBuffer buffer)
state = State.PADDING;
loop = paddingLength == 0;
if (metaData != HeaderBlockParser.STREAM_FAILURE)
{
onPushPromise(streamId, metaData);
}
else
{
HeadersFrame frame = new HeadersFrame(getStreamId(), metaData, null, isEndStream());
if (!rateControlOnEvent(frame))
return connectionFailure(buffer, ErrorCode.ENHANCE_YOUR_CALM_ERROR.code, "invalid_headers_frame_rate");
}
}
break;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public boolean parse(ByteBuffer buffer)
{
if (buffer.remaining() >= 4)
{
return onReset(buffer.getInt());
return onReset(buffer, buffer.getInt());
}
else
{
Expand All @@ -73,7 +73,7 @@ public boolean parse(ByteBuffer buffer)
--cursor;
error += currByte << (8 * cursor);
if (cursor == 0)
return onReset(error);
return onReset(buffer, error);
break;
}
default:
Expand All @@ -85,9 +85,11 @@ public boolean parse(ByteBuffer buffer)
return false;
}

private boolean onReset(int error)
private boolean onReset(ByteBuffer buffer, int error)
{
ResetFrame frame = new ResetFrame(getStreamId(), error);
if (!rateControlOnEvent(frame))
return connectionFailure(buffer, ErrorCode.ENHANCE_YOUR_CALM_ERROR.code, "invalid_rst_stream_frame_rate");
reset();
notifyReset(frame);
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,10 +72,7 @@ protected void emptyBody(ByteBuffer buffer)
return;
boolean isReply = hasFlag(Flags.ACK);
SettingsFrame frame = new SettingsFrame(Collections.emptyMap(), isReply);
if (!isReply && !rateControlOnEvent(frame))
connectionFailure(buffer, ErrorCode.ENHANCE_YOUR_CALM_ERROR.code, "invalid_settings_frame_rate");
else
onSettings(frame);
onSettings(buffer, frame);
}

private boolean validateFrame(ByteBuffer buffer, int streamId, int bodyLength)
Expand Down Expand Up @@ -218,11 +215,13 @@ protected boolean onSettings(ByteBuffer buffer, Map<Integer, Integer> settings)
return connectionFailure(buffer, ErrorCode.PROTOCOL_ERROR.code, "invalid_settings_max_frame_size");

SettingsFrame frame = new SettingsFrame(settings, hasFlag(Flags.ACK));
return onSettings(frame);
return onSettings(buffer, frame);
}

private boolean onSettings(SettingsFrame frame)
private boolean onSettings(ByteBuffer buffer, SettingsFrame frame)
{
if (!rateControlOnEvent(frame))
return connectionFailure(buffer, ErrorCode.ENHANCE_YOUR_CALM_ERROR.code, "invalid_settings_frame_rate");
reset();
notifySettings(frame);
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ public boolean parse(ByteBuffer buffer)
boolean parsed = cursor == 0;
if (parsed && !rateControlOnEvent(new UnknownFrame(getFrameType())))
return connectionFailure(buffer, ErrorCode.ENHANCE_YOUR_CALM_ERROR.code, "invalid_unknown_frame_rate");

return parsed;
}

Expand Down
Loading

0 comments on commit dbb9451

Please sign in to comment.