Skip to content

Commit

Permalink
HTTP-42-BatchRequest - add support for batch request processing in HT…
Browse files Browse the repository at this point in the history
…TP sink #4

Signed-off-by: Krzysztof Chmielewski <[email protected]>
  • Loading branch information
kristoffSC committed Jun 20, 2023
1 parent 8c314fe commit 9467a83
Show file tree
Hide file tree
Showing 31 changed files with 300 additions and 153 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,20 @@

import com.getindata.connectors.http.HttpPostRequestCallback;
import com.getindata.connectors.http.internal.sink.HttpSinkRequestEntry;
import com.getindata.connectors.http.internal.sink.httpclient.RequestSubmitterFactory;

/**
* Builder building {@link SinkHttpClient}.
*/
@PublicEvolving
public interface SinkHttpClientBuilder extends Serializable {

// TODO Consider moving HttpPostRequestCallback and HeaderPreprocessor to be a
// TODO Consider moving HttpPostRequestCallback and HeaderPreprocessor, RequestSubmitter to be a
// SinkHttpClientBuilder fields. This method is getting more and more arguments.
SinkHttpClient build(
Properties properties,
HttpPostRequestCallback<HttpSinkRequestEntry> httpPostRequestCallback,
HeaderPreprocessor headerPreprocessor
HeaderPreprocessor headerPreprocessor,
RequestSubmitterFactory requestSubmitterFactory
);
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
*/
@UtilityClass
@NoArgsConstructor(access = AccessLevel.NONE)
public final class HttpConnectorConfigConstants {
public final class HttpConnectorConfigProperties {

public static final String PROP_DELIM = ",";

Expand Down Expand Up @@ -74,9 +74,6 @@ public final class HttpConnectorConfigConstants {
public static final String SINK_HTTP_TIMEOUT_SECONDS =
GID_CONNECTOR_HTTP + "sink.request.timeout";

public static final String SINK_HTTP_BATCH_REQUEST_SIZE =
GID_CONNECTOR_HTTP + "sink.request.batch.size";

public static final String LOOKUP_HTTP_PULING_THREAD_POOL_SIZE =
GID_CONNECTOR_HTTP + "source.lookup.request.thread-pool.size";

Expand All @@ -87,4 +84,14 @@ public final class HttpConnectorConfigConstants {
GID_CONNECTOR_HTTP + "sink.writer.thread-pool.size";

// -----------------------------------------------------


// ------ Sink request submitter settings ------
public static final String SINK_HTTP_REQUEST_MODE =
GID_CONNECTOR_HTTP + "sink.writer.request.mode";

public static final String SINK_HTTP_BATCH_REQUEST_SIZE =
GID_CONNECTOR_HTTP + "sink.request.batch.size";

// ---------------------------------------------
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package com.getindata.connectors.http.internal.config;

/**
 * Enumerates the ways in which the HTTP sink can submit requests.
 *
 * <p>Each constant carries a textual mode name, available through {@link #getMode()}.
 */
public enum SinkRequestSubmitMode {

    /** Mode whose textual name is {@code "PerRequest"}. */
    PER_REQUEST("PerRequest"),

    /** Mode whose textual name is {@code "Batch"}. */
    BATCH("Batch");

    /** Textual name of this mode. */
    private final String modeName;

    SinkRequestSubmitMode(String modeName) {
        this.modeName = modeName;
    }

    /**
     * Returns the textual name of this mode.
     *
     * @return the mode name, e.g. {@code "PerRequest"} or {@code "Batch"}
     */
    public String getMode() {
        return this.modeName;
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,11 @@
import com.getindata.connectors.http.SchemaLifecycleAwareElementConverter;
import com.getindata.connectors.http.internal.HeaderPreprocessor;
import com.getindata.connectors.http.internal.SinkHttpClientBuilder;
import com.getindata.connectors.http.internal.config.HttpConnectorConfigProperties;
import com.getindata.connectors.http.internal.config.SinkRequestSubmitMode;
import com.getindata.connectors.http.internal.sink.httpclient.BatchRequestSubmitterFactory;
import com.getindata.connectors.http.internal.sink.httpclient.PerRequestRequestSubmitterFactory;
import com.getindata.connectors.http.internal.sink.httpclient.RequestSubmitterFactory;

/**
* An internal implementation of HTTP Sink that performs async requests against a specified HTTP
Expand Down Expand Up @@ -127,7 +132,12 @@ public StatefulSinkWriter<InputT, BufferedRequestState<HttpSinkRequestEntry>> cr
getMaxTimeInBufferMS(),
getMaxRecordSizeInBytes(),
endpointUrl,
sinkHttpClientBuilder.build(properties, httpPostRequestCallback, headerPreprocessor),
sinkHttpClientBuilder.build(
properties,
httpPostRequestCallback,
headerPreprocessor,
getRequestSubmitterFactory()
),
Collections.emptyList(),
properties
);
Expand All @@ -149,15 +159,29 @@ public StatefulSinkWriter<InputT, BufferedRequestState<HttpSinkRequestEntry>> re
getMaxTimeInBufferMS(),
getMaxRecordSizeInBytes(),
endpointUrl,
sinkHttpClientBuilder.build(properties, httpPostRequestCallback, headerPreprocessor),
sinkHttpClientBuilder.build(
properties,
httpPostRequestCallback,
headerPreprocessor,
getRequestSubmitterFactory()
),
recoveredState,
properties
);
}

@Override
public SimpleVersionedSerializer<BufferedRequestState<HttpSinkRequestEntry>>
getWriterStateSerializer() {
getWriterStateSerializer() {
return new HttpSinkWriterStateSerializer();
}

/**
 * Selects the {@link RequestSubmitterFactory} matching the configured request mode.
 *
 * <p>The mode is read from {@link HttpConnectorConfigProperties#SINK_HTTP_REQUEST_MODE} and
 * compared case-insensitively with {@link SinkRequestSubmitMode#PER_REQUEST}. Any other value,
 * including a missing property, results in the batch factory.
 *
 * @return a per-request factory when that mode is configured, otherwise a batch factory
 *         created with {@code getMaxBatchSize()}
 */
private RequestSubmitterFactory getRequestSubmitterFactory() {
    String configuredMode =
        properties.getProperty(HttpConnectorConfigProperties.SINK_HTTP_REQUEST_MODE);

    // equalsIgnoreCase(null) is false, so an unset property falls through to batch mode.
    boolean perRequestMode =
        SinkRequestSubmitMode.PER_REQUEST.getMode().equalsIgnoreCase(configuredMode);

    return perRequestMode
        ? new PerRequestRequestSubmitterFactory()
        : new BatchRequestSubmitterFactory(getMaxBatchSize());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
import org.apache.flink.util.concurrent.ExecutorThreadFactory;

import com.getindata.connectors.http.internal.SinkHttpClient;
import com.getindata.connectors.http.internal.config.HttpConnectorConfigConstants;
import com.getindata.connectors.http.internal.config.HttpConnectorConfigProperties;
import com.getindata.connectors.http.internal.utils.ThreadUtils;

/**
Expand Down Expand Up @@ -68,7 +68,7 @@ public HttpSinkWriter(
this.numRecordsSendErrorsCounter = metrics.getNumRecordsSendErrorsCounter();

int sinkWriterThreadPollSize = Integer.parseInt(properties.getProperty(
HttpConnectorConfigConstants.SINK_HTTP_WRITER_THREAD_POOL_SIZE,
HttpConnectorConfigProperties.SINK_HTTP_WRITER_THREAD_POOL_SIZE,
HTTP_SINK_WRITER_THREAD_POOL_SIZE
));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

import org.apache.flink.util.concurrent.ExecutorThreadFactory;

import com.getindata.connectors.http.internal.config.HttpConnectorConfigConstants;
import com.getindata.connectors.http.internal.config.HttpConnectorConfigProperties;
import com.getindata.connectors.http.internal.utils.JavaNetHttpClientFactory;
import com.getindata.connectors.http.internal.utils.ThreadUtils;

Expand Down Expand Up @@ -38,7 +38,7 @@ public AbstractRequestSubmitter(Properties properties, String[] headersAndValues
"http-sink-client-response-worker", ThreadUtils.LOGGING_EXCEPTION_HANDLER));

this.httpRequestTimeOutSeconds = Integer.parseInt(
properties.getProperty(HttpConnectorConfigConstants.SINK_HTTP_TIMEOUT_SECONDS,
properties.getProperty(HttpConnectorConfigProperties.SINK_HTTP_TIMEOUT_SECONDS,
DEFAULT_REQUEST_TIMEOUT_SECONDS)
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,23 +16,19 @@

import lombok.extern.slf4j.Slf4j;

import com.getindata.connectors.http.internal.config.HttpConnectorConfigConstants;
import com.getindata.connectors.http.internal.config.HttpConnectorConfigProperties;
import com.getindata.connectors.http.internal.sink.HttpSinkRequestEntry;

@Slf4j
public class BatchRequestSubmitter extends AbstractRequestSubmitter {

// TODO HTTP-42 we MUST use maxBatchSize from HttpSink here
protected static final String DEFAULT_HTTP_BATCH_REQUEST_SIZE = "20";

private final int httpReqeustBatchSize;

public BatchRequestSubmitter(Properties properties, String[] headersAndValue) {
super(properties, headersAndValue);

this.httpReqeustBatchSize = Integer.parseInt(
properties.getProperty(HttpConnectorConfigConstants.SINK_HTTP_BATCH_REQUEST_SIZE,
DEFAULT_HTTP_BATCH_REQUEST_SIZE)
properties.getProperty(HttpConnectorConfigProperties.SINK_HTTP_BATCH_REQUEST_SIZE)
);
}

Expand Down Expand Up @@ -63,16 +59,16 @@ public List<CompletableFuture<JavaNetHttpResponseWrapper>> submit(
}

private CompletableFuture<JavaNetHttpResponseWrapper> sendBatch(
String endpointUrl,
List<HttpSinkRequestEntry> reqeustBatch) {
String endpointUrl,
List<HttpSinkRequestEntry> reqeustBatch) {

var endpointUri = URI.create(endpointUrl);
return httpClient
.sendAsync(
buildHttpRequest(reqeustBatch, endpointUri),
HttpResponse.BodyHandlers.ofString())
.exceptionally(ex -> {
// TODO This will be executed on a ForJoinPool Thread... refactor this someday.
// TODO This will be executed on a ForkJoinPool Thread... refactor this someday.
log.error("Request fatally failed because of an exception", ex);
return null;
})
Expand All @@ -90,21 +86,17 @@ private HttpRequest buildHttpRequest(List<HttpSinkRequestEntry> reqeustBatch, UR
List<byte[]> elements = new ArrayList<>(reqeustBatch.size());

BodyPublisher publisher;
if (reqeustBatch.size() > 1) {
// Buy default, Java's BodyPublishers.ofByteArrays(elements) will just put Jsons
// into the HTTP body without any context.
// What we do here is we pack every Json/byteArray into Json Array hence '[' and ']'
// at the end, and we separate every element with comma.
elements.add("[".getBytes(StandardCharsets.UTF_8));
for (HttpSinkRequestEntry entry : reqeustBatch) {
elements.add(entry.element);
elements.add(",".getBytes(StandardCharsets.UTF_8));
}
elements.set(elements.size() - 1, "]".getBytes(StandardCharsets.UTF_8));
publisher = BodyPublishers.ofByteArrays(elements);
} else {
publisher = BodyPublishers.ofByteArray(reqeustBatch.get(0).element);
// By default, Java's BodyPublishers.ofByteArrays(elements) will just put Jsons
// into the HTTP body without any context.
// What we do here is we pack every Json/byteArray into Json Array hence '[' and ']'
// at the end, and we separate every element with comma.
elements.add("[".getBytes(StandardCharsets.UTF_8));
for (HttpSinkRequestEntry entry : reqeustBatch) {
elements.add(entry.element);
elements.add(",".getBytes(StandardCharsets.UTF_8));
}
elements.set(elements.size() - 1, "]".getBytes(StandardCharsets.UTF_8));
publisher = BodyPublishers.ofByteArrays(elements);

Builder requestBuilder = HttpRequest
.newBuilder()
Expand All @@ -122,6 +114,4 @@ private HttpRequest buildHttpRequest(List<HttpSinkRequestEntry> reqeustBatch, UR
throw new RuntimeException(e);
}
}


}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package com.getindata.connectors.http.internal.sink.httpclient;

import java.util.Properties;

import org.apache.flink.util.StringUtils;

import com.getindata.connectors.http.internal.config.ConfigException;
import com.getindata.connectors.http.internal.config.HttpConnectorConfigProperties;

/**
 * {@link RequestSubmitterFactory} that creates {@link BatchRequestSubmitter} instances.
 *
 * <p>Before the submitter is created, the batch size property
 * ({@link HttpConnectorConfigProperties#SINK_HTTP_BATCH_REQUEST_SIZE}) is either validated, or,
 * when it is absent or blank, filled in with the value given to this factory's constructor.
 */
public class BatchRequestSubmitterFactory implements RequestSubmitterFactory {

    private static final String BATCH_SIZE_KEY =
        HttpConnectorConfigProperties.SINK_HTTP_BATCH_REQUEST_SIZE;

    /** Fallback batch size, kept as text because it is stored as a property value. */
    private final String maxBatchSize;

    /**
     * @param maxBatchSize batch size used when the batch size property is not configured
     */
    public BatchRequestSubmitterFactory(int maxBatchSize) {
        this.maxBatchSize = Integer.toString(maxBatchSize);
    }

    /**
     * Creates a {@link BatchRequestSubmitter}.
     *
     * <p>Note that the passed {@code properties} object is modified when the batch size
     * property is missing or blank: the fallback value is written into it.
     *
     * @param properties       connector properties; may be updated with the fallback batch size
     * @param headersAndValues passed through unchanged to the created submitter
     * @return a new batch request submitter
     * @throws ConfigException if the configured batch size is not an integer or is less than 1
     */
    @Override
    public RequestSubmitter createSubmitter(Properties properties, String[] headersAndValues) {
        String configuredSize = properties.getProperty(BATCH_SIZE_KEY);

        if (StringUtils.isNullOrWhitespaceOnly(configuredSize)) {
            properties.setProperty(BATCH_SIZE_KEY, maxBatchSize);
        } else {
            validateBatchSize(configuredSize);
        }

        return new BatchRequestSubmitter(properties, headersAndValues);
    }

    /**
     * Verifies that the raw property value is an integer greater than zero.
     */
    // TODO Create property validator someday.
    private static void validateBatchSize(String rawValue) {
        final int parsedSize;
        try {
            parsedSize = Integer.parseInt(rawValue);
        } catch (NumberFormatException e) {
            throw new ConfigException(
                String.format("Property %s must be an integer but was: %s",
                    BATCH_SIZE_KEY,
                    rawValue),
                e
            );
        }

        if (parsedSize < 1) {
            throw new ConfigException(
                String.format("Property %s must be greater than 0 but was: %s",
                    BATCH_SIZE_KEY,
                    rawValue)
            );
        }
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,11 @@
import com.getindata.connectors.http.internal.HeaderPreprocessor;
import com.getindata.connectors.http.internal.SinkHttpClient;
import com.getindata.connectors.http.internal.SinkHttpClientResponse;
import com.getindata.connectors.http.internal.config.HttpConnectorConfigConstants;
import com.getindata.connectors.http.internal.config.HttpConnectorConfigProperties;
import com.getindata.connectors.http.internal.sink.HttpSinkRequestEntry;
import com.getindata.connectors.http.internal.status.ComposeHttpStatusCodeChecker;
import com.getindata.connectors.http.internal.status.ComposeHttpStatusCodeChecker.ComposeHttpStatusCodeCheckerConfig;
import com.getindata.connectors.http.internal.status.HttpStatusCodeChecker;
import com.getindata.connectors.http.internal.table.sink.Slf4jHttpPostRequestCallback;
import com.getindata.connectors.http.internal.utils.HttpHeaderUtils;

/**
Expand All @@ -41,18 +40,15 @@ public class JavaNetSinkHttpClient implements SinkHttpClient {

private final RequestSubmitter requestSubmitter;

public JavaNetSinkHttpClient(Properties properties, HeaderPreprocessor headerPreprocessor) {
this(properties, new Slf4jHttpPostRequestCallback(), headerPreprocessor);
}

public JavaNetSinkHttpClient(
Properties properties,
HttpPostRequestCallback<HttpSinkRequestEntry> httpPostRequestCallback,
HeaderPreprocessor headerPreprocessor) {
HeaderPreprocessor headerPreprocessor,
RequestSubmitterFactory requestSubmitterFactory) {

this.httpPostRequestCallback = httpPostRequestCallback;
this.headerMap = HttpHeaderUtils.prepareHeaderMap(
HttpConnectorConfigConstants.SINK_HEADER_PREFIX,
HttpConnectorConfigProperties.SINK_HEADER_PREFIX,
properties,
headerPreprocessor
);
Expand All @@ -62,16 +58,17 @@ public JavaNetSinkHttpClient(
ComposeHttpStatusCodeCheckerConfig checkerConfig =
ComposeHttpStatusCodeCheckerConfig.builder()
.properties(properties)
.whiteListPrefix(HttpConnectorConfigConstants.HTTP_ERROR_SINK_CODE_WHITE_LIST)
.errorCodePrefix(HttpConnectorConfigConstants.HTTP_ERROR_SINK_CODES_LIST)
.whiteListPrefix(HttpConnectorConfigProperties.HTTP_ERROR_SINK_CODE_WHITE_LIST)
.errorCodePrefix(HttpConnectorConfigProperties.HTTP_ERROR_SINK_CODES_LIST)
.build();

this.statusCodeChecker = new ComposeHttpStatusCodeChecker(checkerConfig);

// TODO HTTP-42
this.headersAndValues = HttpHeaderUtils.toHeaderAndValueArray(this.headerMap);
//this.requestSubmitter = new PerRequestSubmitter(properties, headersAndValues);
this.requestSubmitter = new BatchRequestSubmitter(properties, headersAndValues);
this.requestSubmitter = requestSubmitterFactory.createSubmitter(
properties,
headersAndValues
);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package com.getindata.connectors.http.internal.sink.httpclient;

import java.util.Properties;

/**
 * {@link RequestSubmitterFactory} that creates {@link PerRequestSubmitter} instances.
 */
public class PerRequestRequestSubmitterFactory implements RequestSubmitterFactory {

    /**
     * Creates a {@link PerRequestSubmitter}, handing both arguments over unchanged.
     *
     * @param properties       connector properties forwarded to the submitter
     * @param headersAndValues header data forwarded to the submitter
     * @return a new per-request submitter
     */
    @Override
    public RequestSubmitter createSubmitter(
            Properties properties,
            String[] headersAndValues) {

        return new PerRequestSubmitter(properties, headersAndValues);
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public List<CompletableFuture<JavaNetHttpResponseWrapper>> submit(
buildHttpRequest(entry, endpointUri),
HttpResponse.BodyHandlers.ofString())
.exceptionally(ex -> {
// TODO This will be executed on a ForJoinPool Thread... refactor this someday.
// TODO This will be executed on a ForkJoinPool Thread... refactor this someday.
log.error("Request fatally failed because of an exception", ex);
return null;
})
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package com.getindata.connectors.http.internal.sink.httpclient;

import java.util.Properties;

/**
 * Factory for {@link RequestSubmitter} instances.
 */
public interface RequestSubmitterFactory {

/**
 * Creates a request submitter.
 *
 * @param properties connector properties made available to the created submitter
 * @param headersAndValues header data for the submitter; presumably a flattened array of
 *                         header names and values - confirm against the caller
 * @return the created {@link RequestSubmitter}
 */
RequestSubmitter createSubmitter(Properties properties, String[] headersAndValues);

}
Loading

0 comments on commit 9467a83

Please sign in to comment.