diff --git a/src/main/java/org/influxdb/impl/BatchProcessor.java b/src/main/java/org/influxdb/impl/BatchProcessor.java index caa2a20aa..ef8b5e639 100644 --- a/src/main/java/org/influxdb/impl/BatchProcessor.java +++ b/src/main/java/org/influxdb/impl/BatchProcessor.java @@ -29,7 +29,7 @@ public class BatchProcessor { private static final Logger LOG = Logger.getLogger(BatchProcessor.class.getName()); - protected final BlockingQueue queue = new LinkedBlockingQueue<>(); + protected final BlockingQueue queue = new LinkedBlockingQueue<>(); private final ScheduledExecutorService scheduler; final InfluxDBImpl influxDB; final int actions; @@ -107,22 +107,28 @@ public BatchProcessor build() { } } - static class BatchEntry { - private final Point point; + abstract static class AbstractBatchEntry { + private final Point point; + + public AbstractBatchEntry(final Point point) { + this.point = point; + } + + public Point getPoint() { + return this.point; + } + } + + static class HttpBatchEntry extends AbstractBatchEntry { private final String db; private final String rp; - public BatchEntry(final Point point, final String db, final String rp) { - super(); - this.point = point; + public HttpBatchEntry(final Point point, final String db, final String rp) { + super(point); this.db = db; this.rp = rp; } - public Point getPoint() { - return this.point; - } - public String getDb() { return this.db; } @@ -132,6 +138,19 @@ public String getRp() { } } + static class UdpBatchEntry extends AbstractBatchEntry { + private final int udpPort; + + public UdpBatchEntry(final Point point, final int udpPort) { + super(point); + this.udpPort = udpPort; + } + + public int getUdpPort() { + return this.udpPort; + } + } + /** * Static method to create the Builder for this BatchProcessor. * @@ -168,18 +187,22 @@ void write() { } Map databaseToBatchPoints = Maps.newHashMap(); - List batchEntries = new ArrayList<>(this.queue.size()); + List batchEntries = new ArrayList<>(this.queue.size()); this.queue.drainTo(batchEntries); - for (BatchEntry batchEntry : batchEntries) { - String dbName = batchEntry.getDb(); - if (!databaseToBatchPoints.containsKey(dbName)) { - BatchPoints batchPoints = BatchPoints.database(dbName) - .retentionPolicy(batchEntry.getRp()).build(); - databaseToBatchPoints.put(dbName, batchPoints); + for (AbstractBatchEntry batchEntry : batchEntries) { + if (batchEntry instanceof HttpBatchEntry) { + HttpBatchEntry httpBatchEntry = HttpBatchEntry.class.cast(batchEntry); + String dbName = httpBatchEntry.getDb(); + if (!databaseToBatchPoints.containsKey(dbName)) { + BatchPoints batchPoints = BatchPoints.database(dbName) + .retentionPolicy(httpBatchEntry.getRp()).build(); + databaseToBatchPoints.put(dbName, batchPoints); + } + Point point = batchEntry.getPoint(); + databaseToBatchPoints.get(dbName).point(point); } - Point point = batchEntry.getPoint(); - databaseToBatchPoints.get(dbName).point(point); + } for (BatchPoints batchPoints : databaseToBatchPoints.values()) { @@ -197,7 +220,7 @@ void write() { * @param batchEntry * the batchEntry to write to the cache. */ - void put(final BatchEntry batchEntry) { + void put(final AbstractBatchEntry batchEntry) { this.queue.add(batchEntry); if (this.queue.size() >= this.actions) { this.scheduler.submit(new Runnable() { diff --git a/src/main/java/org/influxdb/impl/InfluxDBImpl.java b/src/main/java/org/influxdb/impl/InfluxDBImpl.java index 90aaef1f0..e31a5f492 100644 --- a/src/main/java/org/influxdb/impl/InfluxDBImpl.java +++ b/src/main/java/org/influxdb/impl/InfluxDBImpl.java @@ -12,7 +12,7 @@ import org.influxdb.dto.Pong; import org.influxdb.dto.Query; import org.influxdb.dto.QueryResult; -import org.influxdb.impl.BatchProcessor.BatchEntry; +import org.influxdb.impl.BatchProcessor.HttpBatchEntry; import okhttp3.Headers; import okhttp3.HttpUrl; import okhttp3.MediaType; @@ -180,7 +180,7 @@ public String version() { @Override public void write(final String database, final String retentionPolicy, final Point point) { if (this.batchEnabled.get()) { - BatchEntry batchEntry = new BatchEntry(point, database, retentionPolicy); + HttpBatchEntry batchEntry = new HttpBatchEntry(point, database, retentionPolicy); this.batchProcessor.put(batchEntry); } else { BatchPoints batchPoints = BatchPoints.database(database) diff --git a/src/test/java/org/influxdb/impl/BatchProcessorTest.java b/src/test/java/org/influxdb/impl/BatchProcessorTest.java index c07b88822..90fea6990 100644 --- a/src/test/java/org/influxdb/impl/BatchProcessorTest.java +++ b/src/test/java/org/influxdb/impl/BatchProcessorTest.java @@ -25,8 +25,8 @@ public void testSchedulerExceptionHandling() throws InterruptedException, IOExce doThrow(new RuntimeException()).when(mockInfluxDB).write(any(BatchPoints.class)); Point point = Point.measurement("cpu").field("6", "").build(); - BatchProcessor.BatchEntry batchEntry1 = new BatchProcessor.BatchEntry(point, "db1", ""); - BatchProcessor.BatchEntry batchEntry2 = new BatchProcessor.BatchEntry(point, "db2", ""); + BatchProcessor.HttpBatchEntry batchEntry1 = new BatchProcessor.HttpBatchEntry(point, "db1", ""); + BatchProcessor.HttpBatchEntry batchEntry2 = new BatchProcessor.HttpBatchEntry(point, "db2", ""); batchProcessor.put(batchEntry1); Thread.sleep(200); // wait for scheduler