Skip to content

Commit

Permalink
HTTP-42-BatchRequest - add support for batch request processing in HT…
Browse files Browse the repository at this point in the history
…TP - doc.

Signed-off-by: Krzysztof Chmielewski <[email protected]>
  • Loading branch information
kristoffSC committed Jun 30, 2023
1 parent 26db840 commit 2f4b615
Show file tree
Hide file tree
Showing 2 changed files with 117 additions and 20 deletions.
13 changes: 13 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,19 @@

## [Unreleased]

### Added
- Add support for batch request submission in HTTP sink. The mode can be changed by setting
`gid.connector.http.sink.writer.request.mode` with value `single` or `batch`. The default value
is `batch` bode which is breaking change comparing to previous versions. Additionally,
`gid.connector.http.sink.request.batch.size` option can be used to set batch size. By default,
batch size is 500 which is same as default value of HttpSink `maxBatchSize` parameter.

### Changed
- Changed API for public HttpSink builder. The `setHttpPostRequestCallback` expects a `PostRequestCallback`
of generic type [HttpRequest](src/main/java/com/getindata/connectors/http/internal/sink/httpclient/HttpRequest.java)
instead `HttpSinkRequestEntry`.
- Changed HTTP sink reqeust and response processing thread pool sizes from 16 to 1.

## [0.9.0] - 2023-02-10

- Add support for Flink 1.16.
Expand Down
124 changes: 104 additions & 20 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,81 @@ Due to the fact that `HttpSink` sends bytes inside HTTP request's body, one can

Other examples of usage of the Table API can be found in [some tests](src/test/java/com/getindata/connectors/http/table/HttpDynamicSinkInsertTest.java).

### Request submission
Starting from version 0.10 HTTP Sink by default submits events in batch. Before version 0.10 the default and only submission type was `single`.
This is a breaking compatibility change.

The submission mode can be changed using `gid.connector.http.sink.writer.request.mode` property using `single` or `batch` as property value.

#### Batch submission mode
In batch mode, a number of events (processed elements) will be batched and submitted in one HTTP reqeust.
In this mode, HTTP PUT/POST request's body contains a Json array, where every element of this array represents
individual event.

An example of Http Sink batch request body containing data for three events:
```json
[
{
"id": 1,
"first_name": "Ninette",
"last_name": "Clee",
"gender": "Female",
"stock": "CDZI",
"currency": "RUB",
"tx_date": "2021-08-24 15:22:59"
},
{
"id": 2,
"first_name": "Rob",
"last_name": "Zombie",
"gender": "Male",
"stock": "DGICA",
"currency": "GBP",
"tx_date": "2021-10-25 20:53:54"
},
{
"id": 3,
"first_name": "Adam",
"last_name": "Jones",
"gender": "Male",
"stock": "DGICA",
"currency": "PLN",
"tx_date": "2021-10-26 20:53:54"
}
]
```

By default, batch size is set to 500 which is the same as Http Sink's `maxBatchSize` property and has value of 500.
The `maxBatchSize' property sets maximal number of events that will by buffered by Flink runtime before passing it to Http Sink for processing.

In order to change submission batch size use `gid.connector.http.sink.request.batch.size` property. For example:

Streaming API:
```java
HttpSink.<String>builder()
.setEndpointUrl("http://example.com/myendpoint")
.setElementConverter(
(s, _context) -> new HttpSinkRequestEntry("POST", s.getBytes(StandardCharsets.UTF_8)))
.setProperty("gid.connector.http.sink.request.batch.size", "50")
.build();
```
SQL:
```roomsql
CREATE TABLE http (
id bigint,
some_field string
) WITH (
'connector' = 'http-sink',
'url' = 'http://example.com/myendpoint',
'format' = 'json',
'gid.connector.http.sink.request.batch.size' = '50'
)
```

#### Single submission mode
In this mode every processed event is submitted as individual HTTP POST/PUT request.


#### Http headers
It is possible to set HTTP headers that will be added to HTTP request send by sink connector.
Headers are defined via property key `gid.connector.http.sink.header.HEADER_NAME = header value` for example:
Expand Down Expand Up @@ -307,26 +382,28 @@ If the used value starts from prefix `Basic `, it will be used as header value a
| gid.connector.http.source.lookup.response.thread-pool.size | optional | Sets the size of pool thread for HTTP lookup response processing. Increasing this value would mean that more concurrent requests can be processed in the same time. If not specified, the default value of 4 threads will be used. |

### HTTP Sink
| Option | Required | Description/Value |
|------------------------------------------------------------|----------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| connector | required | Specify what connector to use. For HTTP Sink it should be set to _'http-sink'_. |
| url | required | The base URL that should be use for HTTP requests. For example _http://localhost:8080/client_. |
| format | required | Specify what format to use. |
| insert-method | optional | Specify which HTTP method to use in the request. The value should be set either to `POST` or `PUT`. |
| sink.batch.max-size | optional | Maximum number of elements that may be passed in a batch to be written downstream. |
| sink.requests.max-inflight | optional | The maximum number of in flight requests that may exist, if any more in flight requests need to be initiated once the maximum has been reached, then it will be blocked until some have completed. |
| sink.requests.max-buffered | optional | Maximum number of buffered records before applying backpressure. |
| sink.flush-buffer.size | optional | The maximum size of a batch of entries that may be sent to the HTTP endpoint measured in bytes. |
| sink.flush-buffer.timeout | optional | Threshold time in milliseconds for an element to be in a buffer before being flushed. |
| gid.connector.http.sink.request-callback | optional | Specify which `HttpPostRequestCallback` implementation to use. By default, it is set to `slf4j-logger` corresponding to `Slf4jHttpPostRequestCallback`. |
| gid.connector.http.sink.error.code | optional | List of HTTP status codes that should be treated as errors by HTTP Sink, separated with comma. |
| gid.connector.http.sink.error.code.exclude | optional | List of HTTP status codes that should be excluded from the `gid.connector.http.sink.error.code` list, separated with comma. |
| gid.connector.http.security.cert.server | optional | Path to trusted HTTP server certificate that should be add to connectors key store. More than one path can be specified using `,` as path delimiter. |
| gid.connector.http.security.cert.client | optional | Path to trusted certificate that should be used by connector's HTTP client for mTLS communication. |
| gid.connector.http.security.key.client | optional | Path to trusted private key that should be used by connector's HTTP client for mTLS communication. |
| gid.connector.http.security.cert.server.allowSelfSigned | optional | Accept untrusted certificates for TLS communication. |
| gid.connector.http.sink.request.timeout | optional | Sets HTTP request timeout in seconds. If not specified, the default value of 30 seconds will be used. |
| gid.connector.http.sink.writer.thread-pool.size | optional | Sets the size of pool thread for HTTP Sink request processing. Increasing this value would mean that more concurrent requests can be processed in the same time. If not specified, the default value of 16 threads will be used. |
| Option | Required | Description/Value |
|---------------------------------------------------------|----------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| connector | required | Specify what connector to use. For HTTP Sink it should be set to _'http-sink'_. |
| url | required | The base URL that should be use for HTTP requests. For example _http://localhost:8080/client_. |
| format | required | Specify what format to use. |
| insert-method | optional | Specify which HTTP method to use in the request. The value should be set either to `POST` or `PUT`. |
| sink.batch.max-size | optional | Maximum number of elements that may be passed in a batch to be written downstream. |
| sink.requests.max-inflight | optional | The maximum number of in flight requests that may exist, if any more in flight requests need to be initiated once the maximum has been reached, then it will be blocked until some have completed. |
| sink.requests.max-buffered | optional | Maximum number of buffered records before applying backpressure. |
| sink.flush-buffer.size | optional | The maximum size of a batch of entries that may be sent to the HTTP endpoint measured in bytes. |
| sink.flush-buffer.timeout | optional | Threshold time in milliseconds for an element to be in a buffer before being flushed. |
| gid.connector.http.sink.request-callback | optional | Specify which `HttpPostRequestCallback` implementation to use. By default, it is set to `slf4j-logger` corresponding to `Slf4jHttpPostRequestCallback`. |
| gid.connector.http.sink.error.code | optional | List of HTTP status codes that should be treated as errors by HTTP Sink, separated with comma. |
| gid.connector.http.sink.error.code.exclude | optional | List of HTTP status codes that should be excluded from the `gid.connector.http.sink.error.code` list, separated with comma. |
| gid.connector.http.security.cert.server | optional | Path to trusted HTTP server certificate that should be add to connectors key store. More than one path can be specified using `,` as path delimiter. |
| gid.connector.http.security.cert.client | optional | Path to trusted certificate that should be used by connector's HTTP client for mTLS communication. |
| gid.connector.http.security.key.client | optional | Path to trusted private key that should be used by connector's HTTP client for mTLS communication. |
| gid.connector.http.security.cert.server.allowSelfSigned | optional | Accept untrusted certificates for TLS communication. |
| gid.connector.http.sink.request.timeout | optional | Sets HTTP request timeout in seconds. If not specified, the default value of 30 seconds will be used. |
| gid.connector.http.sink.writer.thread-pool.size | optional | Sets the size of pool thread for HTTP Sink request processing. Increasing this value would mean that more concurrent requests can be processed in the same time. If not specified, the default value of 1 thread will be used. |
| gid.connector.http.sink.writer.request.mode | optional | Sets Http Sink request submission mode. Two modes are available to select, `single` and `batch` which is the default mode if option is not specified. |
| gid.connector.http.sink.request.batch.size | optional | Applicable only for `gid.connector.http.sink.writer.request.mode = batch`. Sets number of individual events/requests that will be submitted as one HTTP request by HTTP sink. The default value is 500 which is same as HTTP Sink `maxBatchSize` |

## Build and deployment
To build the project locally you need to have `maven 3` and Java 11+. </br>
Expand Down Expand Up @@ -404,6 +481,13 @@ Implementation of an HTTP Sink is based on Flink's `AsyncSinkBase` introduced in
#### Http Response to Table schema mapping
The mapping from Http Json Response to SQL table schema is done via Flink's Json Format [5].

## Breaking changes
- Version 0.10
- Http Sink submission mode changed from single to batch. From now, body of HTTP POUT/POST request will contain a Json array.
- Changed API for public HttpSink builder. The `setHttpPostRequestCallback` expects a `PostRequestCallback`
of generic type [HttpRequest](src/main/java/com/getindata/connectors/http/internal/sink/httpclient/HttpRequest.java)
instead `HttpSinkRequestEntry`.

## TODO

### HTTP TableLookup Source
Expand Down

0 comments on commit 2f4b615

Please sign in to comment.