Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Converted writeTrailers to a static method #8940

Merged
merged 9 commits into from
Nov 29, 2022
Original file line number Diff line number Diff line change
Expand Up @@ -415,7 +415,7 @@ public void process(Request request, Response response, Callback callback)
{
HttpFields.Mutable trailers = HttpFields.build();
response.setTrailersSupplier(() -> trailers);
Content.copy(request, response, response::writeTrailers, callback);
Content.copy(request, response, Response.newTrailersChunkProcessor(response), callback);
}
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import java.nio.charset.StandardCharsets;
import java.util.Objects;
import java.util.concurrent.Flow;
import java.util.function.BiPredicate;
import java.util.function.Consumer;

import org.eclipse.jetty.io.content.ContentSinkOutputStream;
Expand Down Expand Up @@ -80,12 +79,12 @@ public static void copy(Source source, Sink sink, Callback callback)
*
* @param source the source to copy from
* @param sink the sink to copy to
* @param chunkHandler a (possibly {@code null}) predicate to handle the current chunk and its callback
* @param chunkProcessor a (possibly {@code null}) predicate to handle the current chunk and its callback
* @param callback the callback to notify when the copy is complete
*/
public static void copy(Source source, Sink sink, BiPredicate<Chunk, Callback> chunkHandler, Callback callback)
public static void copy(Source source, Sink sink, Chunk.Processor chunkProcessor, Callback callback)
{
new ContentCopier(source, sink, chunkHandler, callback).iterate();
new ContentCopier(source, sink, chunkProcessor, callback).iterate();
}

/**
Expand Down Expand Up @@ -683,5 +682,21 @@ public boolean release()
return true;
}
}

/**
* <p>Implementations of this interface may process {@link Chunk}s being copied by the
* {@link Content#copy(Source, Sink, Processor, Callback)} method, so that
* {@link Chunk}s of unknown types can be copied.
* @see Content#copy(Source, Sink, Processor, Callback)
*/
interface Processor
{
/**
* @param chunk The chunk to be considered for processing.
* @param callback The callback that will be called once the accepted chunk is processed.
* @return True if the chunk will be process and the callback will be called (or may have already been called), false otherwise.
*/
boolean process(Chunk chunk, Callback callback);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,6 @@

package org.eclipse.jetty.io.internal;

import java.util.function.BiPredicate;

import org.eclipse.jetty.io.Content;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.IteratingNestedCallback;
Expand All @@ -23,16 +21,16 @@ public class ContentCopier extends IteratingNestedCallback
{
private final Content.Source source;
private final Content.Sink sink;
private final BiPredicate<Content.Chunk, Callback> chunkHandler;
private final Content.Chunk.Processor chunkProcessor;
private Content.Chunk current;
private boolean terminated;

public ContentCopier(Content.Source source, Content.Sink sink, BiPredicate<Content.Chunk, Callback> chunkHandler, Callback callback)
public ContentCopier(Content.Source source, Content.Sink sink, Content.Chunk.Processor chunkProcessor, Callback callback)
{
super(callback);
this.source = source;
this.sink = sink;
this.chunkHandler = chunkHandler;
this.chunkProcessor = chunkProcessor;
}

@Override
Expand All @@ -55,7 +53,7 @@ protected Action process() throws Throwable
return Action.IDLE;
}

if (chunkHandler != null && chunkHandler.test(current, this))
if (chunkProcessor != null && chunkProcessor.process(current, this))
return Action.SCHEDULED;

if (current instanceof Error error)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,24 +65,49 @@ public interface Response extends Content.Sink

CompletableFuture<Void> writeInterim(int status, HttpFields headers);

// TODO: make it static, otherwise we must override it in Wrapper.
default boolean writeTrailers(Content.Chunk chunk, Callback ignored)
/**
* <p>Return chunk processor suitable to be passed to the
gregw marked this conversation as resolved.
Show resolved Hide resolved
* {@link Content#copy(Content.Source, Content.Sink, Content.Chunk.Processor, Callback)}
* method, that will handles {@link Trailers} chunks
gregw marked this conversation as resolved.
Show resolved Hide resolved
* by adding the their fields to the {@link HttpFields} supplied by
gregw marked this conversation as resolved.
Show resolved Hide resolved
* {@link Response#getTrailersSupplier()}.</p>
* <p>This is specifically useful for writing trailer that have been received via
gregw marked this conversation as resolved.
Show resolved Hide resolved
* the {@link Content.Source#read()} API, for example when echoing a request to a response:</p>
* <pre>
* Content.copy(request, response, Response.asTrailerChunkHandler(response), callback);
* </pre>
* @param response The response for which to process trailer chunks.
gregw marked this conversation as resolved.
Show resolved Hide resolved
* If the {@link Response#setTrailersSupplier(Supplier)}
* method has not been called prior to this method, then a noop processor is returned.
* @return A chunk processor that will add trailer chunks to the response's trailer supplied fields.
* @see Content#copy(Content.Source, Content.Sink, Content.Chunk.Processor, Callback)
* @see Trailers
*/
static Content.Chunk.Processor newTrailersChunkProcessor(Response response)
{
if (chunk instanceof Trailers trailers)
Supplier<HttpFields> supplier = response.getTrailersSupplier();
if (supplier == null)
return (chunk, callback) -> false;

return (chunk, callback) ->
{
HttpFields requestTrailers = trailers.getTrailers();
if (requestTrailers != null)
if (chunk instanceof Trailers trailers)
{
Supplier<HttpFields> supplier = getTrailersSupplier();
if (supplier != null)
HttpFields requestTrailers = trailers.getTrailers();
if (requestTrailers != null)
{
// Call supplier in lambda to get latest responseTrailers
HttpFields responseTrailers = supplier.get();
if (responseTrailers instanceof HttpFields.Mutable mutable)
gregw marked this conversation as resolved.
Show resolved Hide resolved
{
mutable.add(requestTrailers);
callback.succeeded();
return true;
}
}
}
}
return false;
return false;
};
}

@SuppressWarnings("unchecked")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public void process(Request request, Response response, Callback callback)
response.getHeaders().putLongField(HttpHeader.CONTENT_LENGTH, contentLength);

if (contentLength > 0 || contentLength == -1 && request.getHeaders().contains(HttpHeader.TRANSFER_ENCODING))
Content.copy(request, response, response::writeTrailers, callback);
Content.copy(request, response, Response.newTrailersChunkProcessor(response), callback);
else
callback.succeeded();
}
Expand Down