forked from influxdata/influxdb-java
-
Notifications
You must be signed in to change notification settings - Fork 1
/
BatchProcessor.java
290 lines (250 loc) · 8.72 KB
/
BatchProcessor.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
package org.influxdb.impl;
import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.influxdb.InfluxDB;
import org.influxdb.InfluxDBBatchListener;
import org.influxdb.dto.BatchPoints;
import org.influxdb.dto.Point;
/**
 * A BatchProcessor can be attached to an InfluxDB instance to collect single point writes and
 * aggregate them into BatchPoints to get a better write performance.
 *
 * <p>Entries are buffered in a queue. The queue is flushed by a single scheduler thread, either
 * at the configured fixed interval or as soon as the number of queued entries reaches the
 * configured number of actions.
 *
 * @author stefan.majer [at] gmail.com
 */
public class BatchProcessor {

  private static final Logger LOG = Logger.getLogger(BatchProcessor.class.getName());

  /** Buffer of entries waiting to be written. */
  protected final BlockingQueue<AbstractBatchEntry> queue;
  final InfluxDBImpl influxDB;
  /** Number of queued entries at which an immediate flush is requested. */
  final int actions;
  private final ScheduledExecutorService scheduler;
  private final TimeUnit flushIntervalUnit;
  private final int flushInterval;
  private final InfluxDBBatchListener listener;

  /** Task handed to the scheduler for both the periodic and the size-triggered flush. */
  private final Runnable writeTask = new Runnable() {
    @Override
    public void run() {
      write();
    }
  };

  /**
   * Creates the processor and starts the periodic flush.
   *
   * @param influxDB the InfluxDB implementation the batches are written to.
   * @param threadFactory the factory used to create the single scheduler thread.
   * @param actions the number of queued entries after which a flush is requested.
   * @param flushIntervalUnit the unit of {@code flushInterval}.
   * @param flushInterval the delay before the first flush and the period between flushes.
   * @param listener the listener notified about written and failed batches.
   */
  BatchProcessor(final InfluxDBImpl influxDB, final ThreadFactory threadFactory, final int actions,
      final TimeUnit flushIntervalUnit, final int flushInterval, final InfluxDBBatchListener listener) {
    this.influxDB = influxDB;
    this.actions = actions;
    this.flushIntervalUnit = flushIntervalUnit;
    this.flushInterval = flushInterval;
    this.scheduler = Executors.newSingleThreadScheduledExecutor(threadFactory);
    this.listener = listener;
    // A bounded queue makes put() block once 'actions' entries are waiting; for actions == 1
    // (or Integer.MAX_VALUE) an unbounded queue is used instead.
    if (actions > 1 && actions < Integer.MAX_VALUE) {
      this.queue = new LinkedBlockingQueue<>(actions);
    } else {
      this.queue = new LinkedBlockingQueue<>();
    }
    // Flush at specified Rate
    this.scheduler.scheduleAtFixedRate(this.writeTask, this.flushInterval, this.flushInterval,
        this.flushIntervalUnit);
  }

  /**
   * Static method to create the Builder for this BatchProcessor.
   *
   * @param influxDB the influxdb database handle.
   * @return the Builder to create the BatchProcessor.
   */
  public static Builder builder(final InfluxDB influxDB) {
    return new Builder(influxDB);
  }

  /**
   * Drains the queue and writes everything that was drained. HTTP entries are grouped into one
   * BatchPoints per database/retention policy pair, UDP entries are grouped per port and sent
   * line by line. The listener is told about the outcome. This method never throws: a failure is
   * logged and reported to the listener so that the periodic flush keeps running.
   */
  void write() {
    final List<Point> batch = new ArrayList<>();
    try {
      if (this.queue.isEmpty()) {
        return;
      }
      // for batch on HTTP.
      Map<String, BatchPoints> batchKeyToBatchPoints = Maps.newHashMap();
      // for batch on UDP.
      Map<Integer, List<String>> udpPortToBatchPoints = Maps.newHashMap();
      List<AbstractBatchEntry> batchEntries = new ArrayList<>(this.queue.size());
      this.queue.drainTo(batchEntries);

      for (AbstractBatchEntry batchEntry : batchEntries) {
        Point point = batchEntry.getPoint();
        batch.add(point);
        if (batchEntry instanceof HttpBatchEntry) {
          HttpBatchEntry httpBatchEntry = (HttpBatchEntry) batchEntry;
          String dbName = httpBatchEntry.getDb();
          String rp = httpBatchEntry.getRp();
          String batchKey = batchKey(dbName, rp);
          BatchPoints batchPoints = batchKeyToBatchPoints.get(batchKey);
          if (batchPoints == null) {
            batchPoints = BatchPoints.database(dbName).retentionPolicy(rp).build();
            batchKeyToBatchPoints.put(batchKey, batchPoints);
          }
          batchPoints.point(point);
        } else if (batchEntry instanceof UdpBatchEntry) {
          UdpBatchEntry udpBatchEntry = (UdpBatchEntry) batchEntry;
          int udpPort = udpBatchEntry.getUdpPort();
          List<String> lines = udpPortToBatchPoints.get(udpPort);
          if (lines == null) {
            lines = new ArrayList<String>();
            udpPortToBatchPoints.put(udpPort, lines);
          }
          lines.add(point.lineProtocol());
        }
      }

      for (BatchPoints batchPoints : batchKeyToBatchPoints.values()) {
        this.influxDB.write(batchPoints);
      }
      for (Entry<Integer, List<String>> entry : udpPortToBatchPoints.entrySet()) {
        for (String lineprotocolStr : entry.getValue()) {
          this.influxDB.write(entry.getKey(), lineprotocolStr);
        }
      }
      this.listener.onPointBatchWrite(batch);
    } catch (Throwable t) {
      // Log first so the failure is recorded even if the listener misbehaves.
      LOG.log(Level.SEVERE, "Batch could not be sent. Data will be lost", t);
      notifyFailure(batch, t);
    }
  }

  /**
   * Builds the grouping key for a database/retention policy pair. The database name is prefixed
   * with its length so that different pairs can never produce the same key (a plain
   * {@code db + "_" + rp} maps both ("a_b", "c") and ("a", "b_c") to "a_b_c").
   */
  private static String batchKey(final String dbName, final String rp) {
    String db = String.valueOf(dbName);
    return db.length() + "_" + db + "_" + rp;
  }

  /**
   * Reports a failed batch to the listener. An exception thrown by the listener is logged and
   * swallowed: if it escaped from the scheduled task, the executor would cancel every further
   * periodic flush.
   */
  private void notifyFailure(final List<Point> batch, final Throwable cause) {
    try {
      this.listener.onException(batch, cause);
    } catch (Throwable listenerFailure) {
      LOG.log(Level.SEVERE, "Batch listener failed while handling a write error", listenerFailure);
    }
  }

  /**
   * Put a single BatchEntry to the cache for later processing. Requests an immediate flush on
   * the scheduler thread once the number of queued entries reaches {@code actions}.
   *
   * @param batchEntry the batchEntry to write to the cache.
   * @throws RuntimeException if the calling thread is interrupted while waiting for space in
   *         the queue; the thread's interrupt status is restored before throwing.
   */
  void put(final AbstractBatchEntry batchEntry) {
    try {
      this.queue.put(batchEntry);
    } catch (InterruptedException e) {
      // Restore the interrupt flag so callers further up the stack can still observe it.
      Thread.currentThread().interrupt();
      throw new RuntimeException(e);
    }
    if (this.queue.size() >= this.actions) {
      this.scheduler.submit(this.writeTask);
    }
  }

  /**
   * Flush the current open writes to InfluxDB and stop the scheduler thread. This should only be
   * called if no batch processing is needed anymore.
   */
  void flushAndShutdown() {
    this.write();
    this.scheduler.shutdown();
  }

  /**
   * Flush the current open writes to InfluxDB. The entries queued at the time of the call are
   * written on the calling thread before this method returns.
   */
  void flush() {
    this.write();
  }

  /**
   * The Builder to create a BatchProcessor instance.
   */
  public static final class Builder {
    private final InfluxDBImpl influxDB;
    private ThreadFactory threadFactory = Executors.defaultThreadFactory();
    private int actions;
    private TimeUnit flushIntervalUnit;
    private int flushInterval;
    private InfluxDBBatchListener listener = new BaseInfluxDBListener();

    /**
     * @param influxDB is mandatory and must be an {@code InfluxDBImpl}; any other
     *        implementation fails with a ClassCastException.
     */
    public Builder(final InfluxDB influxDB) {
      this.influxDB = (InfluxDBImpl) influxDB;
    }

    /**
     * @param threadFactory is optional; defaults to {@link Executors#defaultThreadFactory()}.
     * @return this Builder to use it fluent
     */
    public Builder threadFactory(final ThreadFactory threadFactory) {
      this.threadFactory = threadFactory;
      return this;
    }

    /**
     * The number of actions after which a batchwrite must be performed.
     *
     * @param maxActions number of Points written after which a write must happen.
     * @return this Builder to use it fluent
     */
    public Builder actions(final int maxActions) {
      this.actions = maxActions;
      return this;
    }

    /**
     * The interval after which a write is issued at the latest.
     *
     * @param interval the interval
     * @param unit the TimeUnit of the interval
     * @return this Builder to use it fluent
     */
    public Builder interval(final int interval, final TimeUnit unit) {
      this.flushInterval = interval;
      this.flushIntervalUnit = unit;
      return this;
    }

    /**
     * The listener that is notified about written and failed batches.
     *
     * @param listener is optional; defaults to a {@code BaseInfluxDBListener}.
     * @return this Builder to use it fluent
     */
    public Builder listener(final InfluxDBBatchListener listener) {
      this.listener = listener;
      return this;
    }

    /**
     * Create the BatchProcessor.
     *
     * @return the BatchProcessor instance.
     * @throws NullPointerException if influxDB, the interval unit, the thread factory or the
     *         listener is null.
     * @throws IllegalArgumentException if actions or the flush interval is not positive.
     */
    public BatchProcessor build() {
      Preconditions.checkNotNull(this.influxDB, "influxDB may not be null");
      Preconditions.checkArgument(this.actions > 0, "actions should > 0");
      Preconditions.checkArgument(this.flushInterval > 0, "flushInterval should > 0");
      Preconditions.checkNotNull(this.flushIntervalUnit, "flushIntervalUnit may not be null");
      Preconditions.checkNotNull(this.threadFactory, "threadFactory may not be null");
      Preconditions.checkNotNull(this.listener, "listener may not be null");
      return new BatchProcessor(this.influxDB, this.threadFactory, this.actions, this.flushIntervalUnit,
          this.flushInterval, this.listener);
    }
  }

  /** Base type of everything that can be queued: it carries the point to write. */
  abstract static class AbstractBatchEntry {
    private final Point point;

    public AbstractBatchEntry(final Point point) {
      this.point = point;
    }

    public Point getPoint() {
      return this.point;
    }
  }

  /** A queued point that is written over HTTP to a database and retention policy. */
  static class HttpBatchEntry extends AbstractBatchEntry {
    private final String db;
    private final String rp;

    public HttpBatchEntry(final Point point, final String db, final String rp) {
      super(point);
      this.db = db;
      this.rp = rp;
    }

    public String getDb() {
      return this.db;
    }

    public String getRp() {
      return this.rp;
    }
  }

  /** A queued point that is written over UDP to a port. */
  static class UdpBatchEntry extends AbstractBatchEntry {
    private final int udpPort;

    public UdpBatchEntry(final Point point, final int udpPort) {
      super(point);
      this.udpPort = udpPort;
    }

    public int getUdpPort() {
      return this.udpPort;
    }
  }
}