Skip to content

Commit

Permalink
Fixes #4967 - Possible buffer corruption in HTTP/2 session failures
Browse files Browse the repository at this point in the history
Partially reverted the changes introduced in #4855, because they
were working only when sends were synchronous.

Introduced ByteBufferPool.remove(ByteBuffer) to fix the issue.
Now when a concurrent failure happens while frames are being
generated or sent, the buffer is discarded instead of being
recycled, therefore resolving the buffer corruption.

Signed-off-by: Simone Bordet <[email protected]>
  • Loading branch information
sbordet committed Jun 26, 2020
1 parent 04ebc00 commit 8cec990
Show file tree
Hide file tree
Showing 11 changed files with 200 additions and 270 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ public void dispose() throws Exception
{
assertThat("Server BufferPool - leaked acquires", serverBufferPool.getLeakedAcquires(), Matchers.is(0L));
assertThat("Server BufferPool - leaked releases", serverBufferPool.getLeakedReleases(), Matchers.is(0L));
assertThat("Server BufferPool - leaked removes", serverBufferPool.getLeakedRemoves(), Matchers.is(0L));
assertThat("Server BufferPool - unreleased", serverBufferPool.getLeakedResources(), Matchers.is(0L));
}

Expand All @@ -101,6 +102,7 @@ public void dispose() throws Exception
LeakTrackingByteBufferPool pool = (LeakTrackingByteBufferPool)clientBufferPool;
assertThat("Client BufferPool - leaked acquires", pool.getLeakedAcquires(), Matchers.is(0L));
assertThat("Client BufferPool - leaked releases", pool.getLeakedReleases(), Matchers.is(0L));
assertThat("Client BufferPool - leaked removes", pool.getLeakedRemoves(), Matchers.is(0L));
assertThat("Client BufferPool - unreleased", pool.getLeakedResources(), Matchers.is(0L));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -436,7 +436,7 @@ private boolean isStale()
// It's an application frame; is the stream gone already?
if (stream == null)
return true;
return stream.isReset();
return stream.isResetOrFailed();
}

private boolean isProtocolFrame(Frame frame)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.eclipse.jetty.http2.frames.PushPromiseFrame;
import org.eclipse.jetty.http2.frames.ResetFrame;
import org.eclipse.jetty.http2.frames.WindowUpdateFrame;
import org.eclipse.jetty.io.EofException;
import org.eclipse.jetty.io.IdleTimeout;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.Promise;
Expand All @@ -54,16 +55,17 @@ public class HTTP2Stream extends IdleTimeout implements IStream, Callback, Dumpa
private final AtomicReference<Object> attachment = new AtomicReference<>();
private final AtomicReference<ConcurrentMap<String, Object>> attributes = new AtomicReference<>();
private final AtomicReference<CloseState> closeState = new AtomicReference<>(CloseState.NOT_CLOSED);
private final AtomicReference<Callback> writing = new AtomicReference<>();
private final AtomicInteger sendWindow = new AtomicInteger();
private final AtomicInteger recvWindow = new AtomicInteger();
private final long timeStamp = System.nanoTime();
private final ISession session;
private final int streamId;
private final boolean local;
private Callback sendCallback;
private Throwable failure;
private boolean localReset;
private Listener listener;
private boolean remoteReset;
private Listener listener;
private long dataLength;

public HTTP2Stream(Scheduler scheduler, ISession session, int streamId, boolean local)
Expand Down Expand Up @@ -128,17 +130,31 @@ public void data(DataFrame frame, Callback callback)
@Override
public void reset(ResetFrame frame, Callback callback)
{
if (isReset())
return;
localReset = true;
synchronized (this)
{
if (isReset())
return;
localReset = true;
failure = new EOFException("reset");
}
session.frames(this, callback, frame, Frame.EMPTY_ARRAY);
}

private boolean startWrite(Callback callback)
{
if (writing.compareAndSet(null, callback))
return true;
callback.failed(new WritePendingException());
Throwable failure;
synchronized (this)
{
failure = this.failure;
if (failure == null && sendCallback == null)
{
sendCallback = callback;
return true;
}
}
if (failure == null)
failure = new WritePendingException();
callback.failed(failure);
return false;
}

Expand All @@ -163,7 +179,27 @@ public Object removeAttribute(String key)
@Override
public boolean isReset()
{
return localReset || remoteReset;
synchronized (this)
{
return localReset || remoteReset;
}
}

private boolean isFailed()
{
synchronized (this)
{
return failure != null;
}
}

@Override
public boolean isResetOrFailed()
{
synchronized (this)
{
return isReset() || isFailed();
}
}

@Override
Expand Down Expand Up @@ -336,7 +372,11 @@ private void onData(DataFrame frame, Callback callback)

private void onReset(ResetFrame frame, Callback callback)
{
remoteReset = true;
synchronized (this)
{
remoteReset = true;
failure = new EofException("reset");
}
close();
session.removeStream(this);
notifyReset(this, frame, callback);
Expand All @@ -357,8 +397,12 @@ private void onWindowUpdate(WindowUpdateFrame frame, Callback callback)

private void onFailure(FailureFrame frame, Callback callback)
{
// Don't close or remove the stream, as the listener may
// want to use it, for example to send a RST_STREAM frame.
synchronized (this)
{
failure = frame.getFailure();
}
close();
session.removeStream(this);
notifyFailure(this, frame, callback);
}

Expand Down Expand Up @@ -541,7 +585,12 @@ public void failed(Throwable x)

private Callback endWrite()
{
return writing.getAndSet(null);
synchronized (this)
{
Callback callback = sendCallback;
sendCallback = null;
return callback;
}
}

private void notifyData(Stream stream, DataFrame frame, Callback callback)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,4 +114,11 @@ public interface IStream extends Stream, Closeable
* @see #isClosed()
*/
boolean isRemotelyClosed();

/**
* @return whether this stream has been reset (locally or remotely) or has been failed
* @see #isReset()
* @see Listener#onFailure(Stream, int, String, Throwable, Callback)
*/
boolean isResetOrFailed();
}
Loading

0 comments on commit 8cec990

Please sign in to comment.