-
Notifications
You must be signed in to change notification settings - Fork 45
/
Copy pathHttpSink.java
74 lines (67 loc) · 2.62 KB
/
HttpSink.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
package com.getindata.connectors.http;
import java.util.Properties;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.connector.base.sink.writer.ElementConverter;
import com.getindata.connectors.http.internal.HeaderPreprocessor;
import com.getindata.connectors.http.internal.SinkHttpClientBuilder;
import com.getindata.connectors.http.internal.sink.HttpSinkInternal;
import com.getindata.connectors.http.internal.sink.HttpSinkRequestEntry;
import com.getindata.connectors.http.internal.sink.httpclient.HttpRequest;
/**
* A public implementation for {@code HttpSink} that performs async requests against a specified
* HTTP endpoint using the buffering protocol specified in
* {@link org.apache.flink.connector.base.sink.AsyncSinkBase}.
*
* <p>
* To create a new instance of this class use {@link HttpSinkBuilder}. An example would be:
* <pre>{@code
* HttpSink<String> httpSink =
* HttpSink.<String>builder()
* .setEndpointUrl("http://example.com/myendpoint")
* .setElementConverter(
* (s, _context) -> new HttpSinkRequestEntry("POST", "text/plain",
* s.getBytes(StandardCharsets.UTF_8)))
* .build();
* }</pre>
*
* @param <InputT> type of the elements that should be sent through HTTP request.
*/
@PublicEvolving
public class HttpSink<InputT> extends HttpSinkInternal<InputT> {
HttpSink(
ElementConverter<InputT, HttpSinkRequestEntry> elementConverter,
int maxBatchSize,
int maxInFlightRequests,
int maxBufferedRequests,
long maxBatchSizeInBytes,
long maxTimeInBufferMS,
long maxRecordSizeInBytes,
String endpointUrl,
HttpPostRequestCallback<HttpRequest> httpPostRequestCallback,
HeaderPreprocessor headerPreprocessor,
SinkHttpClientBuilder sinkHttpClientBuilder,
Properties properties) {
super(elementConverter,
maxBatchSize,
maxInFlightRequests,
maxBufferedRequests,
maxBatchSizeInBytes,
maxTimeInBufferMS,
maxRecordSizeInBytes,
endpointUrl,
httpPostRequestCallback,
headerPreprocessor,
sinkHttpClientBuilder,
properties
);
}
/**
* Create a {@link HttpSinkBuilder} constructing a new {@link HttpSink}.
*
* @param <InputT> type of the elements that should be sent through HTTP request
* @return {@link HttpSinkBuilder}
*/
public static <InputT> HttpSinkBuilder<InputT> builder() {
return new HttpSinkBuilder<>();
}
}