-
Notifications
You must be signed in to change notification settings - Fork 45
/
Copy pathBatchRequestSubmitter.java
118 lines (98 loc) · 4.43 KB
/
BatchRequestSubmitter.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
package com.getindata.connectors.http.internal.sink.httpclient;
import java.net.URI;
import java.net.http.HttpClient.Version;
import java.net.http.HttpRequest;
import java.net.http.HttpRequest.BodyPublisher;
import java.net.http.HttpRequest.BodyPublishers;
import java.net.http.HttpRequest.Builder;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import lombok.extern.slf4j.Slf4j;
import com.getindata.connectors.http.internal.config.HttpConnectorConfigConstants;
import com.getindata.connectors.http.internal.sink.HttpSinkRequestEntry;
/**
 * Request submitter that groups sink entries into batches and sends every batch as a single
 * HTTP request whose body is a JSON array of the batched elements.
 *
 * <p>A batch is closed when it reaches the configured batch size
 * ({@link HttpConnectorConfigConstants#SINK_HTTP_BATCH_REQUEST_SIZE}) or when the HTTP method of
 * the next entry differs (case-insensitively) from the method of the entries already collected,
 * so that a single HTTP request never mixes entries with different methods.
 */
@Slf4j
public class BatchRequestSubmitter extends AbstractRequestSubmitter {

    private static final byte[] BATCH_START_BYTES = "[".getBytes(StandardCharsets.UTF_8);

    private static final byte[] BATCH_END_BYTES = "]".getBytes(StandardCharsets.UTF_8);

    private static final byte[] BATCH_ELEMENT_DELIM_BYTES = ",".getBytes(StandardCharsets.UTF_8);

    /** Maximum number of entries packed into one HTTP request; always at least 1. */
    private final int httpRequestBatchSize;

    /**
     * Creates a submitter configured from connector properties.
     *
     * @param properties connector properties; must contain
     *     {@link HttpConnectorConfigConstants#SINK_HTTP_BATCH_REQUEST_SIZE} as an integer
     * @param headersAndValue header names and values, passed through to the parent class
     * @throws NumberFormatException if the batch size property is missing or not an integer
     * @throws IllegalArgumentException if the batch size is lower than 1
     */
    public BatchRequestSubmitter(Properties properties, String[] headersAndValue) {
        super(properties, headersAndValue);

        this.httpRequestBatchSize = Integer.parseInt(
            properties.getProperty(HttpConnectorConfigConstants.SINK_HTTP_BATCH_REQUEST_SIZE)
        );

        // Fail fast with a readable message instead of failing later while batching.
        if (this.httpRequestBatchSize < 1) {
            throw new IllegalArgumentException(
                "Property " + HttpConnectorConfigConstants.SINK_HTTP_BATCH_REQUEST_SIZE
                    + " must be greater than 0 but was " + this.httpRequestBatchSize);
        }
    }

    /**
     * Splits the given entries into batches, preserving their order, and sends each batch
     * asynchronously.
     *
     * @param endpointUrl target URL for every batch
     * @param requestToSubmit entries to send; an empty list results in no requests
     * @return one future per sent batch, in submission order; empty if there was nothing to send
     */
    @Override
    public List<CompletableFuture<JavaNetHttpResponseWrapper>> submit(
            String endpointUrl,
            List<HttpSinkRequestEntry> requestToSubmit) {

        var responseFutures = new ArrayList<CompletableFuture<JavaNetHttpResponseWrapper>>();
        if (requestToSubmit.isEmpty()) {
            return responseFutures;
        }

        // Never pre-allocate more than can actually be used by this call.
        int batchCapacity = Math.min(httpRequestBatchSize, requestToSubmit.size());
        List<HttpSinkRequestEntry> requestBatch = new ArrayList<>(batchCapacity);

        for (var entry : requestToSubmit) {
            // The flush decision is made BEFORE adding the entry, so an entry with a different
            // method starts a new batch instead of being sent with the previous batch's method.
            if (!requestBatch.isEmpty()
                && (requestBatch.size() >= httpRequestBatchSize
                    || !sameMethod(requestBatch.get(0).method, entry.method))) {
                responseFutures.add(sendBatch(endpointUrl, requestBatch));
                // A fresh list per batch: the previous one must not be mutated after it was
                // handed over to the asynchronous send.
                requestBatch = new ArrayList<>(batchCapacity);
            }
            requestBatch.add(entry);
        }

        // The loop always leaves at least one entry that has not been sent yet.
        responseFutures.add(sendBatch(endpointUrl, requestBatch));
        return responseFutures;
    }

    /** Null-safe, case-insensitive comparison of two HTTP method names. */
    private static boolean sameMethod(String first, String second) {
        return first == null ? second == null : first.equalsIgnoreCase(second);
    }

    /**
     * Sends a single, non-empty batch as one asynchronous HTTP request.
     *
     * @param endpointUrl target URL
     * @param requestBatch entries to send; all are expected to share the same HTTP method
     * @return future completed with a response wrapper; the wrapped response is {@code null}
     *     when the request failed with an exception
     */
    private CompletableFuture<JavaNetHttpResponseWrapper> sendBatch(
            String endpointUrl,
            List<HttpSinkRequestEntry> requestBatch) {

        // Resolved eagerly so the asynchronous callback below does not depend on the list.
        HttpSinkRequestEntry firstEntry = requestBatch.get(0);
        var endpointUri = URI.create(endpointUrl);

        return httpClient
            .sendAsync(
                buildHttpRequest(requestBatch, endpointUri),
                HttpResponse.BodyHandlers.ofString())
            .exceptionally(ex -> {
                // TODO This will be executed on a ForkJoinPool Thread... refactor this someday.
                log.error("Request fatally failed because of an exception", ex);
                return null;
            })
            .thenApplyAsync(
                // TODO HTTP-42
                // Only the first entry of the batch is attached to the response wrapper.
                res -> new JavaNetHttpResponseWrapper(firstEntry, res),
                publishingThreadPool
            );
    }

    /**
     * Builds the HTTP request for a batch. The HTTP method is taken from the first entry and
     * the body is a JSON array made of the raw bytes of every entry.
     *
     * @param requestBatch entries to put into the request body
     * @param endpointUri target URI
     * @return request ready to be sent
     * @throws RuntimeException wrapping any exception raised while building the request
     */
    private HttpRequest buildHttpRequest(
            List<HttpSinkRequestEntry> requestBatch,
            URI endpointUri) {

        try {
            var method = requestBatch.get(0).method;

            // By default, Java's BodyPublishers.ofByteArrays(elements) will just put Jsons
            // into the HTTP body without any context.
            // What we do here is we pack every Json/byteArray into a Json Array, hence the
            // leading '[' and trailing ']', and we separate the elements with a comma.
            List<byte[]> elements = new ArrayList<>(2 * requestBatch.size() + 1);
            elements.add(BATCH_START_BYTES);
            boolean first = true;
            for (HttpSinkRequestEntry entry : requestBatch) {
                if (!first) {
                    elements.add(BATCH_ELEMENT_DELIM_BYTES);
                }
                elements.add(entry.element);
                first = false;
            }
            elements.add(BATCH_END_BYTES);

            BodyPublisher publisher = BodyPublishers.ofByteArrays(elements);

            Builder requestBuilder = HttpRequest
                .newBuilder()
                .uri(endpointUri)
                .version(Version.HTTP_1_1)
                .timeout(Duration.ofSeconds(httpRequestTimeOutSeconds))
                .method(method, publisher);

            if (headersAndValues.length != 0) {
                requestBuilder.headers(headersAndValues);
            }

            return requestBuilder.build();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}