Skip to content

Commit

Permalink
Document steps to ensure application terminates properly when batch m…
Browse files Browse the repository at this point in the history
…ode is enabled (#763)
  • Loading branch information
adyang authored Aug 21, 2021
1 parent 54901a1 commit b2cb6d1
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 16 deletions.
44 changes: 32 additions & 12 deletions MANUAL.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,24 +17,33 @@ influxDB.query(new Query("CREATE RETENTION POLICY " + retentionPolicyName
+ " ON " + databaseName + " DURATION 1d REPLICATION 1 DEFAULT"));
influxDB.setRetentionPolicy(retentionPolicyName); // (3)

influxDB.enableBatch(BatchOptions.DEFAULTS); // (4)

influxDB.write(Point.measurement("h2o_feet") // (5)
influxDB.enableBatch(
BatchOptions.DEFAULTS
.threadFactory(runnable -> {
Thread thread = new Thread(runnable);
thread.setDaemon(true);
return thread;
})
); // (4)

Runtime.getRuntime().addShutdownHook(new Thread(influxDB::close)); // (5)

influxDB.write(Point.measurement("h2o_feet") // (6)
.time(System.currentTimeMillis(), TimeUnit.MILLISECONDS)
.tag("location", "santa_monica")
.addField("level description", "below 3 feet")
.addField("water_level", 2.064d)
.build());

influxDB.write(Point.measurement("h2o_feet") // (5)
influxDB.write(Point.measurement("h2o_feet") // (6)
.tag("location", "coyote_creek")
.addField("level description", "between 6 and 9 feet")
.addField("water_level", 8.12d)
.build()); // (6)
.build());

Thread.sleep(5_000L); // (7)
Thread.sleep(5_000L);

QueryResult queryResult = influxDB.query(new Query("SELECT * FROM h2o_feet"));
QueryResult queryResult = influxDB.query(new Query("SELECT * FROM h2o_feet")); // (7)

System.out.println(queryResult);
// It will print something like:
Expand All @@ -44,8 +53,6 @@ System.out.println(queryResult);
// [2020-03-22T20:50:12.929Z, below 3 feet, santa_monica, 2.064],
// [2020-03-22T20:50:12.929Z, between 6 and 9 feet, coyote_creek, 8.12]
// ]]], error=null]], error=null]

influxDB.close(); // (8)
```

### Connecting to InfluxDB
Expand Down Expand Up @@ -137,13 +144,26 @@ With batching enabled the client provides two strategies how to deal with errors
When new data points are written before the previous (failed) points are successfully written, those are queued inside the client and wait until older data points are successfully written.
Size of this queue is limited and configured by `BatchOptions.bufferLimit` property. When the limit is reached, the oldest points in the queue are dropped. 'Retry on error' strategy is used when individual write batch size defined by `BatchOptions.actions` is lower than `BatchOptions.bufferLimit`.

Note:
#### Ensure application exit when batching is enabled
`BatchOptions.DEFAULTS` creates a non-daemon thread pool which prevents the JVM from initiating shutdown in the case of
exceptions or successful completion of the main thread. This will prevent shutdown hooks (many frameworks and plain JVM
applications use these to close/ cleanup resources) from running, preventing graceful termination of the application.

Thus, configuring batch options with a daemon thread pool will solve this issue and will for example ensure that the registered
(5) shutdown hook is run to close the `InfluxDB` client properly (flushing and closing of resources will happen).

### Close InfluxDB Client on JVM Termination
(5) In order to ensure that in-flight points are flushed and resources are released properly, it is essential to call
`influxDB.close()` the client when it is no longer required.

Registering a shutdown hook is a good way to ensure that this is done on application termination regardless of exceptions
that are thrown in the main thread of the code. Note that if you are using a framework, do check the documentation for its
way of configuring shutdown lifecycle hooks or if it might already be calling `close` automatically.

* Batching functionality creates an internal thread pool that needs to be shutdown explicitly as part of a graceful application shutdown or the application will terminate properly. To do so, call `influxDB.close()`.

### Writing to InfluxDB

(5) ...
(6) ...

`----8<----BEGIN DRAFT----8<----`

Expand Down
15 changes: 11 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,17 @@ influxDB.query(new Query("CREATE RETENTION POLICY " + retentionPolicyName
influxDB.setRetentionPolicy(retentionPolicyName);

// Enable batch writes to get better performance.
influxDB.enableBatch(BatchOptions.DEFAULTS);
influxDB.enableBatch(
BatchOptions.DEFAULTS
.threadFactory(runnable -> {
Thread thread = new Thread(runnable);
thread.setDaemon(true);
return thread;
})
);

// Close it if your application is terminating or you are not using it anymore.
Runtime.getRuntime().addShutdownHook(new Thread(influxDB::close));

// Write points to InfluxDB.
influxDB.write(Point.measurement("h2o_feet")
Expand Down Expand Up @@ -100,9 +110,6 @@ System.out.println(queryResult);
// [2020-03-22T20:50:12.929Z, below 3 feet, santa_monica, 2.064],
// [2020-03-22T20:50:12.929Z, between 6 and 9 feet, coyote_creek, 8.12]
// ]]], error=null]], error=null]

// Close it if your application is terminating or you are not using it anymore.
influxDB.close();
```

## Contribute
Expand Down

0 comments on commit b2cb6d1

Please sign in to comment.