Skip to content

Commit

Permalink
Refine how Request / Channel / Stream completion works (#9684)
Browse files Browse the repository at this point in the history
Fixes #9684
+ Restore LargeHeaderTest Test
+ Fix Bad Content-Length produced if write + error occurs
+ Fix race between callback failure and error handling failure
+ Introduce new ResponseCompleteTest to attempt to capture complete race issue
+ More DEBUG + ignoring failure for completeStream
+ Removed ErrorResponse in favour or an errorMode

---------

Signed-off-by: Simone Bordet <[email protected]>
Co-authored-by: gregw <[email protected]>
Co-authored-by: Simone Bordet <[email protected]>
  • Loading branch information
3 people authored May 15, 2023
1 parent 11d1f65 commit 2fdcae4
Show file tree
Hide file tree
Showing 18 changed files with 1,384 additions and 453 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package org.eclipse.jetty.http;

import java.nio.ByteBuffer;
import java.util.EnumSet;

import org.eclipse.jetty.util.Index;
import org.eclipse.jetty.util.StringUtil;
Expand Down Expand Up @@ -132,6 +133,18 @@ public enum HttpHeader
C_STATUS(":status", true),
C_PROTOCOL(":protocol");

public static final EnumSet<HttpHeader> CONTENT_HEADERS;
public static final EnumSet<HttpHeader> CONTENT_HEADERS_304;

static
{
CONTENT_HEADERS = EnumSet.of(
CONTENT_TYPE, CONTENT_LENGTH, CONTENT_ENCODING, CONTENT_LANGUAGE, CONTENT_RANGE, CONTENT_MD5, CONTENT_LOCATION, TRANSFER_ENCODING, CACHE_CONTROL, LAST_MODIFIED, EXPIRES, VARY, ETAG
);
CONTENT_HEADERS_304 = EnumSet.copyOf(CONTENT_HEADERS);
CONTENT_HEADERS_304.remove(ETAG);
}

public static final Index<HttpHeader> CACHE = new Index.Builder<HttpHeader>()
.caseSensitive(false)
.withAll(HttpHeader.values(), HttpHeader::toString)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -556,7 +556,7 @@ public void succeeded()
});

// Wait for WINDOW_UPDATEs to be processed by the client.
await().atMost(1000, TimeUnit.SECONDS).until(() -> ((HTTP2Session)client).updateSendWindow(0), Matchers.greaterThan(0));
await().atMost(5, TimeUnit.SECONDS).until(() -> ((HTTP2Session)client).updateSendWindow(0), Matchers.greaterThan(0));

latch.set(new CountDownLatch(2 * streams.size()));
// Notify all blocked threads to wakeup.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,7 @@ default void push(MetaData.Request resource)
/**
* <p>Adds a listener for asynchronous errors.</p>
* <p>The listener is a predicate function that should return {@code true} to indicate
* that the function has completed (either successfully or with a failure) the callback
* that the function will complete (either successfully or with a failure) the callback
* received from {@link org.eclipse.jetty.server.Handler#handle(Request, Response, Callback)}, or
* {@code false} otherwise.</p>
* <p>Listeners are processed in sequence, and the first that returns {@code true}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.eclipse.jetty.http.QuotedQualityCSV;
import org.eclipse.jetty.io.ByteBufferOutputStream;
import org.eclipse.jetty.io.Content;
import org.eclipse.jetty.io.Retainable;
import org.eclipse.jetty.io.RetainableByteBuffer;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.Response;
Expand Down Expand Up @@ -86,6 +87,8 @@ public boolean errorPageForMethod(String method)
@Override
public boolean handle(Request request, Response response, Callback callback)
{
if (LOG.isDebugEnabled())
LOG.debug("handle({}, {}, {})", request, response, callback);
if (_cacheControl != null)
response.getHeaders().put(_cacheControl);

Expand Down Expand Up @@ -256,22 +259,7 @@ else if (charsets.contains(StandardCharsets.ISO_8859_1))
}

response.getHeaders().put(type.getContentTypeField(charset));
response.write(true, buffer.getByteBuffer(), new Callback.Nested(callback)
{
@Override
public void succeeded()
{
buffer.release();
super.succeeded();
}

@Override
public void failed(Throwable x)
{
buffer.release();
super.failed(x);
}
});
response.write(true, buffer.getByteBuffer(), new WriteErrorCallback(callback, buffer));

return true;
}
Expand Down Expand Up @@ -576,5 +564,34 @@ public Set<String> getAttributeNameSet()
names.add(ERROR_EXCEPTION);
return names;
}

@Override
public String toString()
{
return "%s@%x:%s".formatted(getClass().getSimpleName(), hashCode(), getWrapped());
}
}

/**
* The callback used by
* {@link ErrorHandler#generateAcceptableResponse(Request, Response, Callback, String, List, int, String, Throwable)}
* when calling {@link Response#write(boolean, ByteBuffer, Callback)} to wrap the passed in {@link Callback}
* so that the {@link RetainableByteBuffer} used can be released.
*/
private static class WriteErrorCallback extends Callback.Nested
{
private final Retainable _retainable;

public WriteErrorCallback(Callback callback, Retainable retainable)
{
super(callback);
_retainable = retainable;
}

@Override
public void completed()
{
_retainable.release();
}
}
}
Loading

0 comments on commit 2fdcae4

Please sign in to comment.