From 1beafd374edf3a02786e34f45dffb1ac6de03982 Mon Sep 17 00:00:00 2001 From: Andy Flury Date: Thu, 9 Feb 2017 18:16:24 +0100 Subject: [PATCH] set initial capacity on LinkedBlockingQueue inside BatchProcessor --- .../java/org/influxdb/impl/BatchProcessor.java | 13 +++++++++++-- src/test/java/org/influxdb/InfluxDBTest.java | 18 ++++++++++++++++++ 2 files changed, 29 insertions(+), 2 deletions(-) diff --git a/src/main/java/org/influxdb/impl/BatchProcessor.java b/src/main/java/org/influxdb/impl/BatchProcessor.java index 83c4924b3..e276e6c46 100644 --- a/src/main/java/org/influxdb/impl/BatchProcessor.java +++ b/src/main/java/org/influxdb/impl/BatchProcessor.java @@ -30,7 +30,7 @@ public class BatchProcessor { private static final Logger LOG = Logger.getLogger(BatchProcessor.class.getName()); - protected final BlockingQueue queue = new LinkedBlockingQueue<>(); + protected final BlockingQueue queue; private final ScheduledExecutorService scheduler; final InfluxDBImpl influxDB; final int actions; @@ -171,6 +171,11 @@ public static Builder builder(final InfluxDB influxDB) { this.flushIntervalUnit = flushIntervalUnit; this.flushInterval = flushInterval; this.scheduler = Executors.newSingleThreadScheduledExecutor(threadFactory); + if (actions > 1 && actions < Integer.MAX_VALUE) { + this.queue = new LinkedBlockingQueue<>(actions); + } else { + this.queue = new LinkedBlockingQueue<>(); + } // Flush at specified Rate this.scheduler.scheduleAtFixedRate(new Runnable() { @Override @@ -238,7 +243,11 @@ void write() { * the batchEntry to write to the cache. */ void put(final AbstractBatchEntry batchEntry) { - this.queue.add(batchEntry); + try { + this.queue.put(batchEntry); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } if (this.queue.size() >= this.actions) { this.scheduler.submit(new Runnable() { @Override diff --git a/src/test/java/org/influxdb/InfluxDBTest.java b/src/test/java/org/influxdb/InfluxDBTest.java index 132df79ac..a07b7fd2f 100644 --- a/src/test/java/org/influxdb/InfluxDBTest.java +++ b/src/test/java/org/influxdb/InfluxDBTest.java @@ -209,6 +209,24 @@ public void testAsyncWritePointThroughUDP() { this.influxDB.disableBatch(); } } + + + /** + * Test the implementation of {@link InfluxDB#write(int, Point)}'s async support. + */ + @Test(expected = RuntimeException.class) + public void testAsyncWritePointThroughUDPFail() { + this.influxDB.enableBatch(1, 1, TimeUnit.SECONDS); + try{ + Assert.assertTrue(this.influxDB.isBatchEnabled()); + String measurement = TestUtils.getRandomMeasurement(); + Point point = Point.measurement(measurement).tag("atag", "test").addField("used", 80L).addField("free", 1L).build(); + Thread.currentThread().interrupt(); + this.influxDB.write(UDP_PORT, point); + }finally{ + this.influxDB.disableBatch(); + } + } /** * Test writing to the database using string protocol.