HTTP-42 Add support for Batch request submission in HTTP Sink. #58

Merged
merged 13 commits into from
Jul 4, 2023
Changes from 10 commits
13 changes: 13 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -2,6 +2,19 @@

## [Unreleased]

### Added
- Add support for batch request submission in HTTP sink. The mode can be changed by setting
`gid.connector.http.sink.writer.request.mode` with value `single` or `batch`. The default value
is `batch` mode, which is a breaking change compared to previous versions. Additionally, the
`gid.connector.http.sink.request.batch.size` option can be used to set the batch size. By default,
the batch size is 500, which is the same as the default value of the HttpSink `maxBatchSize` parameter.
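
As a rough illustration of the two options in this entry, the sketch below builds the `Properties` object that would carry them. Only the two property keys and the values `single` and `batch` come from the changelog text; the class and helper names are hypothetical, and how the properties reach the sink builder is not shown in this diff view.

```java
import java.util.Properties;

// Hypothetical helper class; only the two property keys are taken from the changelog entry.
final class SinkRequestModeConfig {

    static final String REQUEST_MODE = "gid.connector.http.sink.writer.request.mode";
    static final String BATCH_SIZE = "gid.connector.http.sink.request.batch.size";

    private SinkRequestModeConfig() {
    }

    // Opts out of the new default by asking for one HTTP request per record.
    static Properties singleMode() {
        Properties properties = new Properties();
        properties.setProperty(REQUEST_MODE, "single");
        return properties;
    }

    // Keeps batch mode and sets an explicit batch size instead of the default of 500.
    static Properties batchMode(int batchSize) {
        if (batchSize < 1) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        Properties properties = new Properties();
        properties.setProperty(REQUEST_MODE, "batch");
        properties.setProperty(BATCH_SIZE, Integer.toString(batchSize));
        return properties;
    }
}
```

Because `batch` is now the default, a job that depends on one request per record would presumably need the `single` setting after upgrading.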

### Changed
- Changed API for public HttpSink builder. The `setHttpPostRequestCallback` expects an `HttpPostRequestCallback`
of generic type [HttpRequest](src/main/java/com/getindata/connectors/http/internal/sink/httpclient/HttpRequest.java)
instead of `HttpSinkRequestEntry`.
- Changed HTTP sink request and response processing thread pool sizes from 16 to 1.

## [0.9.0] - 2023-02-10

- Add support for Flink 1.16.
124 changes: 104 additions & 20 deletions README.md

Large diffs are not rendered by default.

3 changes: 2 additions & 1 deletion src/main/java/com/getindata/connectors/http/HttpSink.java
Original file line number Diff line number Diff line change
@@ -9,6 +9,7 @@
import com.getindata.connectors.http.internal.SinkHttpClientBuilder;
import com.getindata.connectors.http.internal.sink.HttpSinkInternal;
import com.getindata.connectors.http.internal.sink.HttpSinkRequestEntry;
import com.getindata.connectors.http.internal.sink.httpclient.HttpRequest;

/**
* A public implementation for {@code HttpSink} that performs async requests against a specified
@@ -41,7 +42,7 @@ public class HttpSink<InputT> extends HttpSinkInternal<InputT> {
long maxTimeInBufferMS,
long maxRecordSizeInBytes,
String endpointUrl,
HttpPostRequestCallback<HttpSinkRequestEntry> httpPostRequestCallback,
HttpPostRequestCallback<HttpRequest> httpPostRequestCallback,
HeaderPreprocessor headerPreprocessor,
SinkHttpClientBuilder sinkHttpClientBuilder,
Properties properties) {
Original file line number Diff line number Diff line change
@@ -11,6 +11,7 @@
import com.getindata.connectors.http.internal.SinkHttpClient;
import com.getindata.connectors.http.internal.SinkHttpClientBuilder;
import com.getindata.connectors.http.internal.sink.HttpSinkRequestEntry;
import com.getindata.connectors.http.internal.sink.httpclient.HttpRequest;
import com.getindata.connectors.http.internal.sink.httpclient.JavaNetSinkHttpClient;
import com.getindata.connectors.http.internal.table.sink.Slf4jHttpPostRequestCallback;
import com.getindata.connectors.http.internal.utils.HttpHeaderUtils;
@@ -62,7 +63,7 @@ public class HttpSinkBuilder<InputT> extends

private static final SinkHttpClientBuilder DEFAULT_CLIENT_BUILDER = JavaNetSinkHttpClient::new;

private static final HttpPostRequestCallback<HttpSinkRequestEntry>
private static final HttpPostRequestCallback<HttpRequest>
DEFAULT_POST_REQUEST_CALLBACK = new Slf4jHttpPostRequestCallback();

private static final HeaderPreprocessor DEFAULT_HEADER_PREPROCESSOR =
@@ -80,7 +81,7 @@ public class HttpSinkBuilder<InputT> extends
private SinkHttpClientBuilder sinkHttpClientBuilder;

// If not defined, should be set to DEFAULT_POST_REQUEST_CALLBACK
private HttpPostRequestCallback<HttpSinkRequestEntry> httpPostRequestCallback;
private HttpPostRequestCallback<HttpRequest> httpPostRequestCallback;

// If not defined, should be set to DEFAULT_HEADER_PREPROCESSOR
private HeaderPreprocessor headerPreprocessor;
@@ -138,7 +139,7 @@ public HttpSinkBuilder<InputT> setElementConverter(
}

public HttpSinkBuilder<InputT> setHttpPostRequestCallback(
HttpPostRequestCallback<HttpSinkRequestEntry> httpPostRequestCallback) {
HttpPostRequestCallback<HttpRequest> httpPostRequestCallback) {
this.httpPostRequestCallback = httpPostRequestCallback;
return this;
}
Original file line number Diff line number Diff line change
@@ -6,19 +6,22 @@
import org.apache.flink.annotation.PublicEvolving;

import com.getindata.connectors.http.HttpPostRequestCallback;
import com.getindata.connectors.http.internal.sink.HttpSinkRequestEntry;
import com.getindata.connectors.http.internal.sink.httpclient.HttpRequest;
import com.getindata.connectors.http.internal.sink.httpclient.RequestSubmitterFactory;

/**
* Builder building {@link SinkHttpClient}.
*/
@PublicEvolving
public interface SinkHttpClientBuilder extends Serializable {

// TODO Consider moving HttpPostRequestCallback and HeaderPreprocessor to be a
// TODO Consider moving HttpPostRequestCallback and HeaderPreprocessor, RequestSubmitter to be a
// SinkHttpClientBuilder fields. This method is getting more and more arguments.
SinkHttpClient build(
Properties properties,
HttpPostRequestCallback<HttpSinkRequestEntry> httpPostRequestCallback,
HeaderPreprocessor headerPreprocessor
HttpPostRequestCallback<HttpRequest> httpPostRequestCallback,
HeaderPreprocessor headerPreprocessor,
RequestSubmitterFactory requestSubmitterFactory

);
}
Original file line number Diff line number Diff line change
@@ -7,6 +7,7 @@
import lombok.ToString;

import com.getindata.connectors.http.internal.sink.HttpSinkRequestEntry;
import com.getindata.connectors.http.internal.sink.httpclient.HttpRequest;

/**
* Data class holding {@link HttpSinkRequestEntry} instances that {@link SinkHttpClient} attempted
@@ -20,11 +21,11 @@ public class SinkHttpClientResponse {
* A list of successfully written requests.
*/
@NonNull
private final List<HttpSinkRequestEntry> successfulRequests;
private final List<HttpRequest> successfulRequests;

/**
* A list of requests that {@link SinkHttpClient} failed to write.
*/
@NonNull
private final List<HttpSinkRequestEntry> failedRequests;
private final List<HttpRequest> failedRequests;
}
Original file line number Diff line number Diff line change
@@ -9,6 +9,7 @@
*/
@UtilityClass
@NoArgsConstructor(access = AccessLevel.NONE)
// TODO Change this name to HttpConnectorConfigProperties
public final class HttpConnectorConfigConstants {

public static final String PROP_DELIM = ",";
@@ -84,4 +85,14 @@ public final class HttpConnectorConfigConstants {
GID_CONNECTOR_HTTP + "sink.writer.thread-pool.size";

// -----------------------------------------------------


// ------ Sink request submitter settings ------
public static final String SINK_HTTP_REQUEST_MODE =
GID_CONNECTOR_HTTP + "sink.writer.request.mode";

public static final String SINK_HTTP_BATCH_REQUEST_SIZE =
GID_CONNECTOR_HTTP + "sink.request.batch.size";

// ---------------------------------------------
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package com.getindata.connectors.http.internal.config;

public enum SinkRequestSubmitMode {

SINGLE("single"),
BATCH("batch");

private final String mode;

SinkRequestSubmitMode(String mode) {
this.mode = mode;
}

public String getMode() {
return mode;
}
}
Original file line number Diff line number Diff line change
@@ -16,6 +16,12 @@
import com.getindata.connectors.http.SchemaLifecycleAwareElementConverter;
import com.getindata.connectors.http.internal.HeaderPreprocessor;
import com.getindata.connectors.http.internal.SinkHttpClientBuilder;
import com.getindata.connectors.http.internal.config.HttpConnectorConfigConstants;
import com.getindata.connectors.http.internal.config.SinkRequestSubmitMode;
import com.getindata.connectors.http.internal.sink.httpclient.BatchRequestSubmitterFactory;
import com.getindata.connectors.http.internal.sink.httpclient.HttpRequest;
import com.getindata.connectors.http.internal.sink.httpclient.PerRequestRequestSubmitterFactory;
import com.getindata.connectors.http.internal.sink.httpclient.RequestSubmitterFactory;

/**
* An internal implementation of HTTP Sink that performs async requests against a specified HTTP
@@ -59,7 +65,7 @@ public class HttpSinkInternal<InputT> extends AsyncSinkBase<InputT, HttpSinkRequ
// makes it possible to serialize `HttpSink`
private final SinkHttpClientBuilder sinkHttpClientBuilder;

private final HttpPostRequestCallback<HttpSinkRequestEntry> httpPostRequestCallback;
private final HttpPostRequestCallback<HttpRequest> httpPostRequestCallback;

private final HeaderPreprocessor headerPreprocessor;

@@ -74,7 +80,7 @@ protected HttpSinkInternal(
long maxTimeInBufferMS,
long maxRecordSizeInBytes,
String endpointUrl,
HttpPostRequestCallback<HttpSinkRequestEntry> httpPostRequestCallback,
HttpPostRequestCallback<HttpRequest> httpPostRequestCallback,
HeaderPreprocessor headerPreprocessor,
SinkHttpClientBuilder sinkHttpClientBuilder,
Properties properties) {
@@ -127,7 +133,12 @@ public StatefulSinkWriter<InputT, BufferedRequestState<HttpSinkRequestEntry>> cr
getMaxTimeInBufferMS(),
getMaxRecordSizeInBytes(),
endpointUrl,
sinkHttpClientBuilder.build(properties, httpPostRequestCallback, headerPreprocessor),
sinkHttpClientBuilder.build(
properties,
httpPostRequestCallback,
headerPreprocessor,
getRequestSubmitterFactory()
),
Collections.emptyList(),
properties
);
@@ -149,15 +160,29 @@ public StatefulSinkWriter<InputT, BufferedRequestState<HttpSinkRequestEntry>> re
getMaxTimeInBufferMS(),
getMaxRecordSizeInBytes(),
endpointUrl,
sinkHttpClientBuilder.build(properties, httpPostRequestCallback, headerPreprocessor),
sinkHttpClientBuilder.build(
properties,
httpPostRequestCallback,
headerPreprocessor,
getRequestSubmitterFactory()
),
recoveredState,
properties
);
}

@Override
public SimpleVersionedSerializer<BufferedRequestState<HttpSinkRequestEntry>>
getWriterStateSerializer() {
getWriterStateSerializer() {
return new HttpSinkWriterStateSerializer();
}

private RequestSubmitterFactory getRequestSubmitterFactory() {

if (SinkRequestSubmitMode.SINGLE.getMode().equalsIgnoreCase(
properties.getProperty(HttpConnectorConfigConstants.SINK_HTTP_REQUEST_MODE))) {
return new PerRequestRequestSubmitterFactory();
}
return new BatchRequestSubmitterFactory(getMaxBatchSize());
}
}
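
The selection logic in `getRequestSubmitterFactory()` can be tried in isolation. The sketch below is a simplified stand-in, not the connector's code: it copies the enum shape and the `equalsIgnoreCase` check from the diff, but returns the resolved mode instead of a factory, and the class name `RequestModeResolution` is hypothetical.

```java
import java.util.Properties;

// Simplified stand-in for the check in getRequestSubmitterFactory(); names are illustrative.
final class RequestModeResolution {

    enum Mode {
        SINGLE("single"),
        BATCH("batch");

        private final String mode;

        Mode(String mode) {
            this.mode = mode;
        }

        String getMode() {
            return mode;
        }
    }

    // Key built from GID_CONNECTOR_HTTP + "sink.writer.request.mode", as named in the changelog.
    static final String REQUEST_MODE_KEY = "gid.connector.http.sink.writer.request.mode";

    private RequestModeResolution() {
    }

    // Only an explicit "single" (in any letter case) selects SINGLE.
    // A missing property yields null, and equalsIgnoreCase(null) is false,
    // so a missing or unrecognised value falls through to BATCH.
    static Mode resolve(Properties properties) {
        if (Mode.SINGLE.getMode().equalsIgnoreCase(properties.getProperty(REQUEST_MODE_KEY))) {
            return Mode.SINGLE;
        }
        return Mode.BATCH;
    }
}
```

One consequence of this shape is that a misspelled mode value is not rejected; it silently selects batch submission.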
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package com.getindata.connectors.http.internal.sink.httpclient;

import java.net.http.HttpClient;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.flink.util.concurrent.ExecutorThreadFactory;

import com.getindata.connectors.http.internal.config.HttpConnectorConfigConstants;
import com.getindata.connectors.http.internal.utils.ThreadUtils;

public abstract class AbstractRequestSubmitter implements RequestSubmitter {
Collaborator Author


This is the base class for the PerRequest and Batch submitter implementations.


protected static final int HTTP_CLIENT_PUBLISHING_THREAD_POOL_SIZE = 1;

protected static final String DEFAULT_REQUEST_TIMEOUT_SECONDS = "30";

/**
* Thread pool to handle HTTP response from HTTP client.
*/
protected final ExecutorService publishingThreadPool;

protected final int httpRequestTimeOutSeconds;

protected final String[] headersAndValues;

protected final HttpClient httpClient;

public AbstractRequestSubmitter(
Properties properties,
String[] headersAndValues,
HttpClient httpClient) {

this.headersAndValues = headersAndValues;
this.publishingThreadPool =
Executors.newFixedThreadPool(
HTTP_CLIENT_PUBLISHING_THREAD_POOL_SIZE,
new ExecutorThreadFactory(
"http-sink-client-response-worker", ThreadUtils.LOGGING_EXCEPTION_HANDLER));

this.httpRequestTimeOutSeconds = Integer.parseInt(
properties.getProperty(HttpConnectorConfigConstants.SINK_HTTP_TIMEOUT_SECONDS,
DEFAULT_REQUEST_TIMEOUT_SECONDS)
);

this.httpClient = httpClient;
}
}
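
Two details of this constructor can be checked with plain JDK code: the timeout falls back to `"30"` when the property is absent, and a fixed pool of one thread runs its tasks one at a time in submission order. The sketch below is an illustration under assumptions. `TIMEOUT_KEY` is a made-up key, because the value of `SINK_HTTP_TIMEOUT_SECONDS` is not visible in this diff, and the Flink `ExecutorThreadFactory` is left out to keep the example free of dependencies.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Illustrative only; not the connector's AbstractRequestSubmitter.
final class SubmitterSettingsSketch {

    // Hypothetical key: the real SINK_HTTP_TIMEOUT_SECONDS value is not shown in this diff.
    static final String TIMEOUT_KEY = "example.sink.request.timeout.seconds";

    static final String DEFAULT_REQUEST_TIMEOUT_SECONDS = "30";

    private SubmitterSettingsSketch() {
    }

    // Same pattern as the constructor: read the property, default to "30", parse as int.
    static int timeoutSeconds(Properties properties) {
        return Integer.parseInt(
            properties.getProperty(TIMEOUT_KEY, DEFAULT_REQUEST_TIMEOUT_SECONDS));
    }

    // Submits `count` tasks to a pool of one thread and records the order in which they ran.
    static List<Integer> runOnSingleThreadPool(int count) {
        ExecutorService pool = Executors.newFixedThreadPool(1);
        List<Integer> order = Collections.synchronizedList(new ArrayList<>());
        for (int i = 0; i < count; i++) {
            final int id = i;
            pool.execute(() -> order.add(id));
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("tasks did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new ArrayList<>(order);
    }
}
```

Note that `Integer.parseInt` throws `NumberFormatException` for a non-numeric value, so a malformed timeout setting would fail when the submitter is constructed.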