Skip to content

Commit

Permalink
Added InfluxDB.flush().
Browse files Browse the repository at this point in the history
  • Loading branch information
jganoff committed Feb 28, 2017
1 parent 46c29a3 commit fd9e20b
Show file tree
Hide file tree
Showing 5 changed files with 57 additions and 2 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

- Support chunking
- Add a databaseExists method to InfluxDB interface
- [Issue #289] (https://github.com/influxdata/influxdb-java/issues/289) Batching enhancements: Pending asynchronous writes can be explicitly flushed via `InfluxDB.flush()`.

#### Fixes

Expand Down
8 changes: 8 additions & 0 deletions src/main/java/org/influxdb/InfluxDB.java
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,14 @@ public void write(final String database, final String retentionPolicy,
*/
public boolean databaseExists(final String name);

/**
* Send any buffered points to InfluxDB. This method is synchronous and will block while all pending points are
* written.
*
* @throws IllegalStateException if batching is not enabled.
*/
public void flush();

/**
* close thread for asynchronous batch write and UDP socket to release resources if need.
*/
Expand Down
8 changes: 7 additions & 1 deletion src/main/java/org/influxdb/impl/BatchProcessor.java
Original file line number Diff line number Diff line change
Expand Up @@ -263,9 +263,15 @@ public void run() {
* called if no batch processing is needed anymore.
*
*/
void flush() {
void flushAndShutdown() {
this.write();
this.scheduler.shutdown();
}

/**
* Flush the current open writes to InfluxDB. This will block until all pending points are written.
*/
void flush() {
this.write();
}
}
13 changes: 12 additions & 1 deletion src/main/java/org/influxdb/impl/InfluxDBImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ public InfluxDB enableBatch(final int actions, final int flushDuration,
public void disableBatch() {
this.batchEnabled.set(false);
if (this.batchProcessor != null) {
this.batchProcessor.flush();
this.batchProcessor.flushAndShutdown();
if (this.logLevel != LogLevel.NONE) {
System.out.println(
"total writes:" + this.writeCount.get()
Expand Down Expand Up @@ -460,6 +460,17 @@ private <T> T execute(final Call<T> call) {
}
}

/**
* {@inheritDoc}
*/
@Override
public void flush() {
if (!batchEnabled.get()) {
throw new IllegalStateException("BatchProcessing is not enabled.");
}
batchProcessor.flush();
}

/**
* {@inheritDoc}
*/
Expand Down
29 changes: 29 additions & 0 deletions src/test/java/org/influxdb/InfluxDBTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -621,4 +621,33 @@ public void accept(QueryResult result) {
}
}

@Test
public void testFlushPendingWritesWhenBatchingEnabled() {
String dbName = "flush_tests_" + System.currentTimeMillis();
try {
this.influxDB.createDatabase(dbName);

// Enable batching with a very large buffer and flush interval so writes will be triggered by our call to flush().
this.influxDB.enableBatch(Integer.MAX_VALUE, Integer.MAX_VALUE, TimeUnit.HOURS);

String measurement = TestUtils.getRandomMeasurement();
Point point = Point.measurement(measurement).tag("atag", "test").addField("used", 80L).addField("free", 1L).build();
this.influxDB.write(dbName, TestUtils.defaultRetentionPolicy(this.influxDB.version()), point);
this.influxDB.flush();

Query query = new Query("SELECT * FROM " + measurement + " GROUP BY *", dbName);
QueryResult result = this.influxDB.query(query);
Assert.assertFalse(result.getResults().get(0).getSeries().get(0).getTags().isEmpty());
} finally {
this.influxDB.deleteDatabase(dbName);
this.influxDB.disableBatch();
}
}

@Test(expected = IllegalStateException.class)
public void testFlushThrowsIfBatchingIsNotEnabled() {
Assert.assertFalse(this.influxDB.isBatchEnabled());
this.influxDB.flush();
}

}

0 comments on commit fd9e20b

Please sign in to comment.