-
Notifications
You must be signed in to change notification settings - Fork 45
/
Copy pathHttpSinkBuilder.java
191 lines (162 loc) · 7.31 KB
/
HttpSinkBuilder.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
package com.getindata.connectors.http;
import java.util.Optional;
import java.util.Properties;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.connector.base.sink.AsyncSinkBaseBuilder;
import org.apache.flink.connector.base.sink.writer.ElementConverter;
import com.getindata.connectors.http.internal.HeaderPreprocessor;
import com.getindata.connectors.http.internal.SinkHttpClient;
import com.getindata.connectors.http.internal.SinkHttpClientBuilder;
import com.getindata.connectors.http.internal.sink.HttpSinkRequestEntry;
import com.getindata.connectors.http.internal.sink.httpclient.HttpRequest;
import com.getindata.connectors.http.internal.sink.httpclient.JavaNetSinkHttpClient;
import com.getindata.connectors.http.internal.table.sink.Slf4jHttpPostRequestCallback;
import com.getindata.connectors.http.internal.utils.HttpHeaderUtils;
/**
* Builder to construct {@link HttpSink}.
*
* <p>The following example shows the minimum setup to create a {@link HttpSink} that writes String
* values to an HTTP endpoint using POST method.
*
* <pre>{@code
* HttpSink<String> httpSink =
* HttpSink.<String>builder()
* .setEndpointUrl("http://example.com/myendpoint")
* .setElementConverter(
* (s, _context) -> new HttpSinkRequestEntry(
* "POST",
* s.getBytes(StandardCharsets.UTF_8)))
* .build();
* }</pre>
*
* <p>If the following parameters are not set in this builder, the following defaults will be used:
* <ul>
* <li>{@code maxBatchSize} will be 500,</li>
* <li>{@code maxInFlightRequests} will be 50,</li>
* <li>{@code maxBufferedRequests} will be 10000,</li>
* <li>{@code maxBatchSizeInBytes} will be 5 MB i.e. {@code 5 * 1024 * 1024},</li>
* <li>{@code maxTimeInBufferMS} will be 5000ms,</li>
* <li>{@code maxRecordSizeInBytes} will be 1 MB i.e. {@code 1024 * 1024}.</li>
* </ul>
* {@code endpointUrl} and {@code elementConverter} must be set by the user.
*
* @param <InputT> type of the elements that should be sent through HTTP request.
*/
public class HttpSinkBuilder<InputT> extends
AsyncSinkBaseBuilder<InputT, HttpSinkRequestEntry, HttpSinkBuilder<InputT>> {
private static final int DEFAULT_MAX_BATCH_SIZE = 500;
private static final int DEFAULT_MAX_IN_FLIGHT_REQUESTS = 50;
private static final int DEFAULT_MAX_BUFFERED_REQUESTS = 10_000;
private static final long DEFAULT_MAX_BATCH_SIZE_IN_B = 5 * 1024 * 1024;
private static final long DEFAULT_MAX_TIME_IN_BUFFER_MS = 5000;
private static final long DEFAULT_MAX_RECORD_SIZE_IN_B = 1024 * 1024;
private static final SinkHttpClientBuilder DEFAULT_CLIENT_BUILDER = JavaNetSinkHttpClient::new;
private static final HttpPostRequestCallback<HttpRequest>
DEFAULT_POST_REQUEST_CALLBACK = new Slf4jHttpPostRequestCallback();
private static final HeaderPreprocessor DEFAULT_HEADER_PREPROCESSOR =
HttpHeaderUtils.createDefaultHeaderPreprocessor();
private final Properties properties = new Properties();
// Mandatory field
private String endpointUrl;
// Mandatory field
private ElementConverter<InputT, HttpSinkRequestEntry> elementConverter;
// If not defined, should be set to DEFAULT_CLIENT_BUILDER
private SinkHttpClientBuilder sinkHttpClientBuilder;
// If not defined, should be set to DEFAULT_POST_REQUEST_CALLBACK
private HttpPostRequestCallback<HttpRequest> httpPostRequestCallback;
// If not defined, should be set to DEFAULT_HEADER_PREPROCESSOR
private HeaderPreprocessor headerPreprocessor;
HttpSinkBuilder() {
this.sinkHttpClientBuilder = DEFAULT_CLIENT_BUILDER;
this.httpPostRequestCallback = DEFAULT_POST_REQUEST_CALLBACK;
this.headerPreprocessor = DEFAULT_HEADER_PREPROCESSOR;
}
/**
* @param endpointUrl the URL of the endpoint
* @return {@link HttpSinkBuilder} itself
*/
public HttpSinkBuilder<InputT> setEndpointUrl(String endpointUrl) {
this.endpointUrl = endpointUrl;
return this;
}
/**
* @param sinkHttpClientBuilder builder for an implementation of {@link SinkHttpClient} that
* will be used by {@link HttpSink}
* @return {@link HttpSinkBuilder} itself
*/
public HttpSinkBuilder<InputT> setSinkHttpClientBuilder(
SinkHttpClientBuilder sinkHttpClientBuilder) {
this.sinkHttpClientBuilder = sinkHttpClientBuilder;
return this;
}
/**
* @param elementConverter the {@link ElementConverter} to be used for the sink
* @return {@link HttpSinkBuilder} itself
* @deprecated Converters set by this method might not work properly for Flink 1.16+. Use {@link
* #setElementConverter(SchemaLifecycleAwareElementConverter)} instead.
*/
@Deprecated
@PublicEvolving
public HttpSinkBuilder<InputT> setElementConverter(
ElementConverter<InputT, HttpSinkRequestEntry> elementConverter) {
this.elementConverter = elementConverter;
return this;
}
/**
* @param elementConverter the {@link SchemaLifecycleAwareElementConverter} to be used for the
* sink
* @return {@link HttpSinkBuilder} itself
*/
@PublicEvolving
public HttpSinkBuilder<InputT> setElementConverter(
SchemaLifecycleAwareElementConverter<InputT, HttpSinkRequestEntry> elementConverter) {
this.elementConverter = elementConverter;
return this;
}
public HttpSinkBuilder<InputT> setHttpPostRequestCallback(
HttpPostRequestCallback<HttpRequest> httpPostRequestCallback) {
this.httpPostRequestCallback = httpPostRequestCallback;
return this;
}
public HttpSinkBuilder<InputT> setHttpHeaderPreprocessor(
HeaderPreprocessor headerPreprocessor) {
this.headerPreprocessor = headerPreprocessor;
return this;
}
/**
* Set property for Http Sink.
* @param propertyName property name
* @param propertyValue property value
* @return {@link HttpSinkBuilder} itself
*/
public HttpSinkBuilder<InputT> setProperty(String propertyName, String propertyValue) {
this.properties.setProperty(propertyName, propertyValue);
return this;
}
/**
* Add properties to Http Sink configuration
* @param properties properties to add
* @return {@link HttpSinkBuilder} itself
*/
public HttpSinkBuilder<InputT> setProperties(Properties properties) {
this.properties.putAll(properties);
return this;
}
@Override
public HttpSink<InputT> build() {
return new HttpSink<>(
elementConverter,
Optional.ofNullable(getMaxBatchSize()).orElse(DEFAULT_MAX_BATCH_SIZE),
Optional.ofNullable(getMaxInFlightRequests()).orElse(DEFAULT_MAX_IN_FLIGHT_REQUESTS),
Optional.ofNullable(getMaxBufferedRequests()).orElse(DEFAULT_MAX_BUFFERED_REQUESTS),
Optional.ofNullable(getMaxBatchSizeInBytes()).orElse(DEFAULT_MAX_BATCH_SIZE_IN_B),
Optional.ofNullable(getMaxTimeInBufferMS()).orElse(DEFAULT_MAX_TIME_IN_BUFFER_MS),
Optional.ofNullable(getMaxRecordSizeInBytes()).orElse(DEFAULT_MAX_RECORD_SIZE_IN_B),
endpointUrl,
httpPostRequestCallback,
headerPreprocessor,
sinkHttpClientBuilder,
properties
);
}
}