diff --git a/CHANGELOG.md b/CHANGELOG.md index ad4623d38..88d5978fc 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,7 @@ - Support chunking - Add a databaseExists method to InfluxDB interface + - [Issue #289] (https://github.com/influxdata/influxdb-java/issues/289) Batching enhancements: Pending asynchronous writes can be explicitly flushed via `InfluxDB.flush()`. #### Fixes diff --git a/src/main/java/org/influxdb/InfluxDB.java b/src/main/java/org/influxdb/InfluxDB.java index 2d1793dfb..698a470b1 100644 --- a/src/main/java/org/influxdb/InfluxDB.java +++ b/src/main/java/org/influxdb/InfluxDB.java @@ -273,6 +273,14 @@ public void write(final String database, final String retentionPolicy, */ public boolean databaseExists(final String name); + /** + * Send any buffered points to InfluxDB. This method is synchronous and will block while all pending points are + * written. + * + * @throws IllegalStateException if batching is not enabled. + */ + public void flush(); + /** * close thread for asynchronous batch write and UDP socket to release resources if need. */ diff --git a/src/main/java/org/influxdb/impl/BatchProcessor.java b/src/main/java/org/influxdb/impl/BatchProcessor.java index e276e6c46..0ad0d0ae9 100644 --- a/src/main/java/org/influxdb/impl/BatchProcessor.java +++ b/src/main/java/org/influxdb/impl/BatchProcessor.java @@ -263,9 +263,15 @@ public void run() { * called if no batch processing is needed anymore. * */ - void flush() { + void flushAndShutdown() { this.write(); this.scheduler.shutdown(); } + /** + * Flush the current open writes to InfluxDB. This will block until all pending points are written. + */ + void flush() { + this.write(); + } } diff --git a/src/main/java/org/influxdb/impl/InfluxDBImpl.java b/src/main/java/org/influxdb/impl/InfluxDBImpl.java index 8adc8d748..85e3d5d4a 100644 --- a/src/main/java/org/influxdb/impl/InfluxDBImpl.java +++ b/src/main/java/org/influxdb/impl/InfluxDBImpl.java @@ -177,7 +177,7 @@ public InfluxDB enableBatch(final int actions, final int flushDuration, public void disableBatch() { this.batchEnabled.set(false); if (this.batchProcessor != null) { - this.batchProcessor.flush(); + this.batchProcessor.flushAndShutdown(); if (this.logLevel != LogLevel.NONE) { System.out.println( "total writes:" + this.writeCount.get() @@ -460,6 +460,17 @@ private T execute(final Call call) { } } + /** + * {@inheritDoc} + */ + @Override + public void flush() { + if (!batchEnabled.get()) { + throw new IllegalStateException("BatchProcessing is not enabled."); + } + batchProcessor.flush(); + } + /** * {@inheritDoc} */ diff --git a/src/test/java/org/influxdb/InfluxDBTest.java b/src/test/java/org/influxdb/InfluxDBTest.java index 068e44330..116f42552 100644 --- a/src/test/java/org/influxdb/InfluxDBTest.java +++ b/src/test/java/org/influxdb/InfluxDBTest.java @@ -621,4 +621,33 @@ public void accept(QueryResult result) { } } + @Test + public void testFlushPendingWritesWhenBatchingEnabled() { + String dbName = "flush_tests_" + System.currentTimeMillis(); + try { + this.influxDB.createDatabase(dbName); + + // Enable batching with a very large buffer and flush interval so writes will be triggered by our call to flush(). + this.influxDB.enableBatch(Integer.MAX_VALUE, Integer.MAX_VALUE, TimeUnit.HOURS); + + String measurement = TestUtils.getRandomMeasurement(); + Point point = Point.measurement(measurement).tag("atag", "test").addField("used", 80L).addField("free", 1L).build(); + this.influxDB.write(dbName, TestUtils.defaultRetentionPolicy(this.influxDB.version()), point); + this.influxDB.flush(); + + Query query = new Query("SELECT * FROM " + measurement + " GROUP BY *", dbName); + QueryResult result = this.influxDB.query(query); + Assert.assertFalse(result.getResults().get(0).getSeries().get(0).getTags().isEmpty()); + } finally { + this.influxDB.deleteDatabase(dbName); + this.influxDB.disableBatch(); + } + } + + @Test(expected = IllegalStateException.class) + public void testFlushThrowsIfBatchingIsNotEnabled() { + Assert.assertFalse(this.influxDB.isBatchEnabled()); + this.influxDB.flush(); + } + } diff --git a/src/test/java/org/influxdb/impl/BatchProcessorTest.java b/src/test/java/org/influxdb/impl/BatchProcessorTest.java index 039d301f1..82a2e8de4 100644 --- a/src/test/java/org/influxdb/impl/BatchProcessorTest.java +++ b/src/test/java/org/influxdb/impl/BatchProcessorTest.java @@ -5,6 +5,7 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; import java.io.IOException; import java.util.concurrent.TimeUnit; @@ -58,6 +59,32 @@ public void testBatchWriteWithDifferenctRp() throws InterruptedException, IOExce verify(mockInfluxDB, times(2)).write(any(BatchPoints.class)); } + @Test + public void testFlushWritesBufferedPointsAndDoesNotShutdownScheduler() throws InterruptedException { + InfluxDB mockInfluxDB = mock(InfluxDBImpl.class); + BatchProcessor batchProcessor = BatchProcessor.builder(mockInfluxDB) + .actions(Integer.MAX_VALUE) + .interval(1, TimeUnit.NANOSECONDS).build(); + + Point point = Point.measurement("test").addField("region", "a").build(); + BatchProcessor.HttpBatchEntry httpBatchEntry = new BatchProcessor.HttpBatchEntry(point, "http", "http-rp"); + + batchProcessor.put(httpBatchEntry); + Thread.sleep(100); // wait for scheduler + // Our put should have been written + verify(mockInfluxDB).write(any(BatchPoints.class)); + + // Force a flush which should not stop the scheduler + batchProcessor.flush(); + + batchProcessor.put(httpBatchEntry); + Thread.sleep(100); // wait for scheduler + // Our second put should have been written if the scheduler is still running + verify(mockInfluxDB, times(2)).write(any(BatchPoints.class)); + + verifyNoMoreInteractions(mockInfluxDB); + } + @Test(expected = IllegalArgumentException.class) public void testActionsIsZero() throws InterruptedException, IOException { InfluxDB mockInfluxDB = mock(InfluxDBImpl.class);