Skip to content

Commit

Permalink
HTTP-42-BatchRequest - add support for batch request processing in HT…
Browse files Browse the repository at this point in the history
…TP sink #9 tests

Signed-off-by: Krzysztof Chmielewski <[email protected]>
  • Loading branch information
kristoffSC committed Jun 30, 2023
1 parent bbcb439 commit 8996221
Show file tree
Hide file tree
Showing 18 changed files with 106 additions and 67 deletions.
3 changes: 2 additions & 1 deletion src/main/java/com/getindata/connectors/http/HttpSink.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import com.getindata.connectors.http.internal.SinkHttpClientBuilder;
import com.getindata.connectors.http.internal.sink.HttpSinkInternal;
import com.getindata.connectors.http.internal.sink.HttpSinkRequestEntry;
import com.getindata.connectors.http.internal.sink.httpclient.HttpRequest;

/**
* A public implementation for {@code HttpSink} that performs async requests against a specified
Expand Down Expand Up @@ -41,7 +42,7 @@ public class HttpSink<InputT> extends HttpSinkInternal<InputT> {
long maxTimeInBufferMS,
long maxRecordSizeInBytes,
String endpointUrl,
HttpPostRequestCallback<HttpSinkRequestEntry> httpPostRequestCallback,
HttpPostRequestCallback<HttpRequest> httpPostRequestCallback,
HeaderPreprocessor headerPreprocessor,
SinkHttpClientBuilder sinkHttpClientBuilder,
Properties properties) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import com.getindata.connectors.http.internal.SinkHttpClient;
import com.getindata.connectors.http.internal.SinkHttpClientBuilder;
import com.getindata.connectors.http.internal.sink.HttpSinkRequestEntry;
import com.getindata.connectors.http.internal.sink.httpclient.HttpRequest;
import com.getindata.connectors.http.internal.sink.httpclient.JavaNetSinkHttpClient;
import com.getindata.connectors.http.internal.table.sink.Slf4jHttpPostRequestCallback;
import com.getindata.connectors.http.internal.utils.HttpHeaderUtils;
Expand Down Expand Up @@ -62,7 +63,7 @@ public class HttpSinkBuilder<InputT> extends

private static final SinkHttpClientBuilder DEFAULT_CLIENT_BUILDER = JavaNetSinkHttpClient::new;

private static final HttpPostRequestCallback<HttpSinkRequestEntry>
private static final HttpPostRequestCallback<HttpRequest>
DEFAULT_POST_REQUEST_CALLBACK = new Slf4jHttpPostRequestCallback();

private static final HeaderPreprocessor DEFAULT_HEADER_PREPROCESSOR =
Expand All @@ -80,7 +81,7 @@ public class HttpSinkBuilder<InputT> extends
private SinkHttpClientBuilder sinkHttpClientBuilder;

// If not defined, should be set to DEFAULT_POST_REQUEST_CALLBACK
private HttpPostRequestCallback<HttpSinkRequestEntry> httpPostRequestCallback;
private HttpPostRequestCallback<HttpRequest> httpPostRequestCallback;

// If not defined, should be set to DEFAULT_HEADER_PREPROCESSOR
private HeaderPreprocessor headerPreprocessor;
Expand Down Expand Up @@ -138,7 +139,7 @@ public HttpSinkBuilder<InputT> setElementConverter(
}

public HttpSinkBuilder<InputT> setHttpPostRequestCallback(
HttpPostRequestCallback<HttpSinkRequestEntry> httpPostRequestCallback) {
HttpPostRequestCallback<HttpRequest> httpPostRequestCallback) {
this.httpPostRequestCallback = httpPostRequestCallback;
return this;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import org.apache.flink.annotation.PublicEvolving;

import com.getindata.connectors.http.HttpPostRequestCallback;
import com.getindata.connectors.http.internal.sink.HttpSinkRequestEntry;
import com.getindata.connectors.http.internal.sink.httpclient.HttpRequest;
import com.getindata.connectors.http.internal.sink.httpclient.RequestSubmitterFactory;

/**
Expand All @@ -19,8 +19,9 @@ public interface SinkHttpClientBuilder extends Serializable {
// SinkHttpClientBuilder fields. This method is getting more and more arguments.
SinkHttpClient build(
Properties properties,
HttpPostRequestCallback<HttpSinkRequestEntry> httpPostRequestCallback,
HttpPostRequestCallback<HttpRequest> httpPostRequestCallback,
HeaderPreprocessor headerPreprocessor,
RequestSubmitterFactory requestSubmitterFactory

);
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import lombok.ToString;

import com.getindata.connectors.http.internal.sink.HttpSinkRequestEntry;
import com.getindata.connectors.http.internal.sink.httpclient.HttpRequest;

/**
* Data class holding {@link HttpSinkRequestEntry} instances that {@link SinkHttpClient} attempted
Expand All @@ -20,11 +21,11 @@ public class SinkHttpClientResponse {
* A list of successfully written requests.
*/
@NonNull
private final List<HttpSinkRequestEntry> successfulRequests;
private final List<HttpRequest> successfulRequests;

/**
* A list of requests that {@link SinkHttpClient} failed to write.
*/
@NonNull
private final List<HttpSinkRequestEntry> failedRequests;
private final List<HttpRequest> failedRequests;
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import com.getindata.connectors.http.internal.config.HttpConnectorConfigConstants;
import com.getindata.connectors.http.internal.config.SinkRequestSubmitMode;
import com.getindata.connectors.http.internal.sink.httpclient.BatchRequestSubmitterFactory;
import com.getindata.connectors.http.internal.sink.httpclient.HttpRequest;
import com.getindata.connectors.http.internal.sink.httpclient.PerRequestRequestSubmitterFactory;
import com.getindata.connectors.http.internal.sink.httpclient.RequestSubmitterFactory;

Expand Down Expand Up @@ -64,7 +65,7 @@ public class HttpSinkInternal<InputT> extends AsyncSinkBase<InputT, HttpSinkRequ
// makes it possible to serialize `HttpSink`
private final SinkHttpClientBuilder sinkHttpClientBuilder;

private final HttpPostRequestCallback<HttpSinkRequestEntry> httpPostRequestCallback;
private final HttpPostRequestCallback<HttpRequest> httpPostRequestCallback;

private final HeaderPreprocessor headerPreprocessor;

Expand All @@ -79,7 +80,7 @@ protected HttpSinkInternal(
long maxTimeInBufferMS,
long maxRecordSizeInBytes,
String endpointUrl,
HttpPostRequestCallback<HttpSinkRequestEntry> httpPostRequestCallback,
HttpPostRequestCallback<HttpRequest> httpPostRequestCallback,
HeaderPreprocessor headerPreprocessor,
SinkHttpClientBuilder sinkHttpClientBuilder,
Properties properties) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpClient.Version;
import java.net.http.HttpRequest;
import java.net.http.HttpRequest.BodyPublisher;
import java.net.http.HttpRequest.BodyPublishers;
import java.net.http.HttpRequest.Builder;
Expand All @@ -25,6 +24,12 @@
@Slf4j
public class BatchRequestSubmitter extends AbstractRequestSubmitter {

private static final byte[] BATCH_START_BYTES = "[".getBytes(StandardCharsets.UTF_8);

private static final byte[] BATCH_END_BYTES = "]".getBytes(StandardCharsets.UTF_8);

private static final byte[] BATCH_ELEMENT_DELIM_BYTES = ",".getBytes(StandardCharsets.UTF_8);

private final int httpReqeustBatchSize;

public BatchRequestSubmitter(
Expand Down Expand Up @@ -87,10 +92,10 @@ private CompletableFuture<JavaNetHttpResponseWrapper> sendBatch(
String endpointUrl,
List<HttpSinkRequestEntry> reqeustBatch) {

var endpointUri = URI.create(endpointUrl);
HttpRequest httpRequest = buildHttpRequest(reqeustBatch, URI.create(endpointUrl));
return httpClient
.sendAsync(
buildHttpRequest(reqeustBatch, endpointUri),
httpRequest.getHttpRequest(),
HttpResponse.BodyHandlers.ofString())
.exceptionally(ex -> {
// TODO This will be executed on a ForkJoinPool Thread... refactor this someday.
Expand All @@ -99,7 +104,7 @@ private CompletableFuture<JavaNetHttpResponseWrapper> sendBatch(
})
.thenApplyAsync(
// TODO HTTP-42
res -> new JavaNetHttpResponseWrapper(reqeustBatch.get(0), res),
res -> new JavaNetHttpResponseWrapper(httpRequest, res),
publishingThreadPool
);
}
Expand All @@ -115,15 +120,15 @@ private HttpRequest buildHttpRequest(List<HttpSinkRequestEntry> reqeustBatch, UR
// into the HTTP body without any context.
// What we do here is we pack every Json/byteArray into Json Array hence '[' and ']'
// at the end, and we separate every element with comma.
elements.add("[".getBytes(StandardCharsets.UTF_8));
elements.add(BATCH_START_BYTES);
for (HttpSinkRequestEntry entry : reqeustBatch) {
elements.add(entry.element);
elements.add(",".getBytes(StandardCharsets.UTF_8));
elements.add(BATCH_ELEMENT_DELIM_BYTES);
}
elements.set(elements.size() - 1, "]".getBytes(StandardCharsets.UTF_8));
elements.set(elements.size() - 1, BATCH_END_BYTES);
publisher = BodyPublishers.ofByteArrays(elements);

Builder requestBuilder = HttpRequest
Builder requestBuilder = java.net.http.HttpRequest
.newBuilder()
.uri(endpointUri)
.version(Version.HTTP_1_1)
Expand All @@ -134,7 +139,7 @@ private HttpRequest buildHttpRequest(List<HttpSinkRequestEntry> reqeustBatch, UR
requestBuilder.headers(headersAndValues);
}

return requestBuilder.build();
return new HttpRequest(requestBuilder.build(), elements, method);
} catch (Exception e) {
throw new RuntimeException(e);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package com.getindata.connectors.http.internal.sink.httpclient;

import java.util.List;

import lombok.Data;

@Data
public class HttpRequest {

public final java.net.http.HttpRequest httpRequest;

public final List<byte[]> elements;

public final String method;

}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ final class JavaNetHttpResponseWrapper {
* A representation of a single {@link com.getindata.connectors.http.HttpSink} request.
*/
@NonNull
private final HttpSinkRequestEntry sinkRequestEntry;
private final HttpRequest httpRequest;

/**
* A response to an HTTP request based on {@link HttpSinkRequestEntry}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,13 @@ public class JavaNetSinkHttpClient implements SinkHttpClient {

private final HttpStatusCodeChecker statusCodeChecker;

private final HttpPostRequestCallback<HttpSinkRequestEntry> httpPostRequestCallback;
private final HttpPostRequestCallback<HttpRequest> httpPostRequestCallback;

private final RequestSubmitter requestSubmitter;

public JavaNetSinkHttpClient(
Properties properties,
HttpPostRequestCallback<HttpSinkRequestEntry> httpPostRequestCallback,
HttpPostRequestCallback<HttpRequest> httpPostRequestCallback,
HeaderPreprocessor headerPreprocessor,
RequestSubmitterFactory requestSubmitterFactory) {

Expand Down Expand Up @@ -92,11 +92,11 @@ private CompletableFuture<List<JavaNetHttpResponseWrapper>> submitRequests(
private SinkHttpClientResponse prepareSinkHttpClientResponse(
List<JavaNetHttpResponseWrapper> responses,
String endpointUrl) {
var successfulResponses = new ArrayList<HttpSinkRequestEntry>();
var failedResponses = new ArrayList<HttpSinkRequestEntry>();
var successfulResponses = new ArrayList<HttpRequest>();
var failedResponses = new ArrayList<HttpRequest>();

for (var response : responses) {
var sinkRequestEntry = response.getSinkRequestEntry();
var sinkRequestEntry = response.getHttpRequest();
var optResponse = response.getResponse();

httpPostRequestCallback.call(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpClient.Version;
import java.net.http.HttpRequest;
import java.net.http.HttpRequest.BodyPublishers;
import java.net.http.HttpRequest.Builder;
import java.net.http.HttpResponse;
Expand Down Expand Up @@ -37,17 +36,18 @@ public List<CompletableFuture<JavaNetHttpResponseWrapper>> submit(
var responseFutures = new ArrayList<CompletableFuture<JavaNetHttpResponseWrapper>>();

for (var entry : requestToSubmit) {
HttpRequest httpRequest = buildHttpRequest(entry, endpointUri);
var response = httpClient
.sendAsync(
buildHttpRequest(entry, endpointUri),
httpRequest.getHttpRequest(),
HttpResponse.BodyHandlers.ofString())
.exceptionally(ex -> {
// TODO This will be executed on a ForkJoinPool Thread... refactor this someday.
log.error("Request fatally failed because of an exception", ex);
return null;
})
.thenApplyAsync(
res -> new JavaNetHttpResponseWrapper(entry, res),
res -> new JavaNetHttpResponseWrapper(httpRequest, res),
publishingThreadPool
);
responseFutures.add(response);
Expand All @@ -56,7 +56,7 @@ public List<CompletableFuture<JavaNetHttpResponseWrapper>> submit(
}

private HttpRequest buildHttpRequest(HttpSinkRequestEntry requestEntry, URI endpointUri) {
Builder requestBuilder = HttpRequest
Builder requestBuilder = java.net.http.HttpRequest
.newBuilder()
.uri(endpointUri)
.version(Version.HTTP_1_1)
Expand All @@ -68,6 +68,10 @@ private HttpRequest buildHttpRequest(HttpSinkRequestEntry requestEntry, URI endp
requestBuilder.headers(headersAndValues);
}

return requestBuilder.build();
return new HttpRequest(
requestBuilder.build(),
List.of(requestEntry.element),
requestEntry.method
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.getindata.connectors.http.HttpSink;
import com.getindata.connectors.http.HttpSinkBuilder;
import com.getindata.connectors.http.internal.sink.HttpSinkRequestEntry;
import com.getindata.connectors.http.internal.sink.httpclient.HttpRequest;
import com.getindata.connectors.http.internal.sink.httpclient.JavaNetSinkHttpClient;
import com.getindata.connectors.http.internal.table.SerializationSchemaElementConverter;
import com.getindata.connectors.http.internal.utils.HttpHeaderUtils;
Expand Down Expand Up @@ -79,7 +80,7 @@ public class HttpDynamicSink extends AsyncDynamicTableSink<HttpSinkRequestEntry>

private final EncodingFormat<SerializationSchema<RowData>> encodingFormat;

private final HttpPostRequestCallback<HttpSinkRequestEntry> httpPostRequestCallback;
private final HttpPostRequestCallback<HttpRequest> httpPostRequestCallback;

private final ReadableConfig tableOptions;

Expand All @@ -93,7 +94,7 @@ protected HttpDynamicSink(
@Nullable Long maxTimeInBufferMS,
DataType consumedDataType,
EncodingFormat<SerializationSchema<RowData>> encodingFormat,
HttpPostRequestCallback<HttpSinkRequestEntry> httpPostRequestCallback,
HttpPostRequestCallback<HttpRequest> httpPostRequestCallback,
ReadableConfig tableOptions,
Properties properties) {
super(maxBatchSize, maxInFlightRequests, maxBufferedRequests, maxBufferSizeInBytes,
Expand Down Expand Up @@ -172,7 +173,7 @@ public static class HttpDynamicTableSinkBuilder

private EncodingFormat<SerializationSchema<RowData>> encodingFormat;

private HttpPostRequestCallback<HttpSinkRequestEntry> httpPostRequestCallback;
private HttpPostRequestCallback<HttpRequest> httpPostRequestCallback;

/**
* @param tableOptions the {@link ReadableConfig} consisting of options listed in table
Expand Down Expand Up @@ -200,7 +201,7 @@ public HttpDynamicTableSinkBuilder setEncodingFormat(
* @return {@link HttpDynamicTableSinkBuilder} itself
*/
public HttpDynamicTableSinkBuilder setHttpPostRequestCallback(
HttpPostRequestCallback<HttpSinkRequestEntry> httpPostRequestCallback) {
HttpPostRequestCallback<HttpRequest> httpPostRequestCallback) {
this.httpPostRequestCallback = httpPostRequestCallback;
return this;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@

import com.getindata.connectors.http.HttpPostRequestCallbackFactory;
import com.getindata.connectors.http.internal.config.HttpConnectorConfigConstants;
import com.getindata.connectors.http.internal.sink.HttpSinkRequestEntry;
import com.getindata.connectors.http.internal.sink.httpclient.HttpRequest;
import com.getindata.connectors.http.internal.utils.ConfigUtils;
import static com.getindata.connectors.http.internal.table.sink.HttpDynamicSinkConnectorOptions.*;

Expand Down Expand Up @@ -43,7 +43,7 @@ public DynamicTableSink createDynamicTableSink(Context context) {
new AsyncSinkConfigurationValidator(tableOptions).getValidatedConfigurations();

// generics type erasure, so we have to do an unchecked cast
final HttpPostRequestCallbackFactory<HttpSinkRequestEntry> postRequestCallbackFactory =
final HttpPostRequestCallbackFactory<HttpRequest> postRequestCallbackFactory =
FactoryUtil.discoverFactory(
context.getClassLoader(),
HttpPostRequestCallbackFactory.class, // generics type erasure
Expand Down
Loading

0 comments on commit 8996221

Please sign in to comment.