From fd9e20b0332917ba3a2d39f26bc51c122d71e2bc Mon Sep 17 00:00:00 2001 From: Jordan Ganoff Date: Mon, 27 Feb 2017 22:35:18 -0800 Subject: [PATCH] Added InfluxDB.flush(). --- CHANGELOG.md | 1 + src/main/java/org/influxdb/InfluxDB.java | 8 +++++ .../org/influxdb/impl/BatchProcessor.java | 8 ++++- .../java/org/influxdb/impl/InfluxDBImpl.java | 13 ++++++++- src/test/java/org/influxdb/InfluxDBTest.java | 29 +++++++++++++++++++ 5 files changed, 57 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index ad4623d38..88d5978fc 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,7 @@ - Support chunking - Add a databaseExists method to InfluxDB interface + - [Issue #289] (https://github.com/influxdata/influxdb-java/issues/289) Batching enhancements: Pending asynchronous writes can be explicitly flushed via `InfluxDB.flush()`. #### Fixes diff --git a/src/main/java/org/influxdb/InfluxDB.java b/src/main/java/org/influxdb/InfluxDB.java index 2d1793dfb..698a470b1 100644 --- a/src/main/java/org/influxdb/InfluxDB.java +++ b/src/main/java/org/influxdb/InfluxDB.java @@ -273,6 +273,14 @@ public void write(final String database, final String retentionPolicy, */ public boolean databaseExists(final String name); + /** + * Send any buffered points to InfluxDB. This method is synchronous and will block while all pending points are + * written. + * + * @throws IllegalStateException if batching is not enabled. + */ + public void flush(); + /** * close thread for asynchronous batch write and UDP socket to release resources if need. */ diff --git a/src/main/java/org/influxdb/impl/BatchProcessor.java b/src/main/java/org/influxdb/impl/BatchProcessor.java index e276e6c46..0ad0d0ae9 100644 --- a/src/main/java/org/influxdb/impl/BatchProcessor.java +++ b/src/main/java/org/influxdb/impl/BatchProcessor.java @@ -263,9 +263,15 @@ public void run() { * called if no batch processing is needed anymore. * */ - void flush() { + void flushAndShutdown() { this.write(); this.scheduler.shutdown(); } + /** + * Flush the current open writes to InfluxDB. This will block until all pending points are written. + */ + void flush() { + this.write(); + } } diff --git a/src/main/java/org/influxdb/impl/InfluxDBImpl.java b/src/main/java/org/influxdb/impl/InfluxDBImpl.java index 8adc8d748..85e3d5d4a 100644 --- a/src/main/java/org/influxdb/impl/InfluxDBImpl.java +++ b/src/main/java/org/influxdb/impl/InfluxDBImpl.java @@ -177,7 +177,7 @@ public InfluxDB enableBatch(final int actions, final int flushDuration, public void disableBatch() { this.batchEnabled.set(false); if (this.batchProcessor != null) { - this.batchProcessor.flush(); + this.batchProcessor.flushAndShutdown(); if (this.logLevel != LogLevel.NONE) { System.out.println( "total writes:" + this.writeCount.get() @@ -460,6 +460,17 @@ private T execute(final Call call) { } } + /** + * {@inheritDoc} + */ + @Override + public void flush() { + if (!batchEnabled.get()) { + throw new IllegalStateException("BatchProcessing is not enabled."); + } + batchProcessor.flush(); + } + /** * {@inheritDoc} */ diff --git a/src/test/java/org/influxdb/InfluxDBTest.java b/src/test/java/org/influxdb/InfluxDBTest.java index 068e44330..116f42552 100644 --- a/src/test/java/org/influxdb/InfluxDBTest.java +++ b/src/test/java/org/influxdb/InfluxDBTest.java @@ -621,4 +621,33 @@ public void accept(QueryResult result) { } } + @Test + public void testFlushPendingWritesWhenBatchingEnabled() { + String dbName = "flush_tests_" + System.currentTimeMillis(); + try { + this.influxDB.createDatabase(dbName); + + // Enable batching with a very large buffer and flush interval so writes will be triggered by our call to flush(). + this.influxDB.enableBatch(Integer.MAX_VALUE, Integer.MAX_VALUE, TimeUnit.HOURS); + + String measurement = TestUtils.getRandomMeasurement(); + Point point = Point.measurement(measurement).tag("atag", "test").addField("used", 80L).addField("free", 1L).build(); + this.influxDB.write(dbName, TestUtils.defaultRetentionPolicy(this.influxDB.version()), point); + this.influxDB.flush(); + + Query query = new Query("SELECT * FROM " + measurement + " GROUP BY *", dbName); + QueryResult result = this.influxDB.query(query); + Assert.assertFalse(result.getResults().get(0).getSeries().get(0).getTags().isEmpty()); + } finally { + this.influxDB.deleteDatabase(dbName); + this.influxDB.disableBatch(); + } + } + + @Test(expected = IllegalStateException.class) + public void testFlushThrowsIfBatchingIsNotEnabled() { + Assert.assertFalse(this.influxDB.isBatchEnabled()); + this.influxDB.flush(); + } + }