diff --git a/MANUAL.md b/MANUAL.md index 254eca36..62d45d67 100644 --- a/MANUAL.md +++ b/MANUAL.md @@ -17,24 +17,33 @@ influxDB.query(new Query("CREATE RETENTION POLICY " + retentionPolicyName + " ON " + databaseName + " DURATION 1d REPLICATION 1 DEFAULT")); influxDB.setRetentionPolicy(retentionPolicyName); // (3) -influxDB.enableBatch(BatchOptions.DEFAULTS); // (4) - -influxDB.write(Point.measurement("h2o_feet") // (5) +influxDB.enableBatch( + BatchOptions.DEFAULTS + .threadFactory(runnable -> { + Thread thread = new Thread(runnable); + thread.setDaemon(true); + return thread; + }) +); // (4) + +Runtime.getRuntime().addShutdownHook(new Thread(influxDB::close)); // (5) + +influxDB.write(Point.measurement("h2o_feet") // (6) .time(System.currentTimeMillis(), TimeUnit.MILLISECONDS) .tag("location", "santa_monica") .addField("level description", "below 3 feet") .addField("water_level", 2.064d) .build()); -influxDB.write(Point.measurement("h2o_feet") // (5) +influxDB.write(Point.measurement("h2o_feet") // (6) .tag("location", "coyote_creek") .addField("level description", "between 6 and 9 feet") .addField("water_level", 8.12d) - .build()); // (6) + .build()); -Thread.sleep(5_000L); // (7) +Thread.sleep(5_000L); -QueryResult queryResult = influxDB.query(new Query("SELECT * FROM h2o_feet")); +QueryResult queryResult = influxDB.query(new Query("SELECT * FROM h2o_feet")); // (7) System.out.println(queryResult); // It will print something like: @@ -44,8 +53,6 @@ System.out.println(queryResult); // [2020-03-22T20:50:12.929Z, below 3 feet, santa_monica, 2.064], // [2020-03-22T20:50:12.929Z, between 6 and 9 feet, coyote_creek, 8.12] // ]]], error=null]], error=null] - -influxDB.close(); // (8) ``` ### Connecting to InfluxDB @@ -137,13 +144,26 @@ With batching enabled the client provides two strategies how to deal with errors When new data points are written before the previous (failed) points are successfully written, those are queued inside the client and wait until older data points are successfully written. Size of this queue is limited and configured by `BatchOptions.bufferLimit` property. When the limit is reached, the oldest points in the queue are dropped. 'Retry on error' strategy is used when individual write batch size defined by `BatchOptions.actions` is lower than `BatchOptions.bufferLimit`. -Note: +#### Ensure application exit when batching is enabled +`BatchOptions.DEFAULTS` creates a non-daemon thread pool which prevents the JVM from initiating shutdown in the case of +exceptions or successful completion of the main thread. This will prevent shutdown hooks (many frameworks and plain JVM +applications use these to close/ cleanup resources) from running, preventing graceful termination of the application. + +Thus, configuring batch options with a daemon thread pool will solve this issue and will for example ensure that the registered +(5) shutdown hook is run to close the `InfluxDB` client properly (flushing and closing of resources will happen). + +### Close InfluxDB Client on JVM Termination +(5) In order to ensure that in-flight points are flushed and resources are released properly, it is essential to call +`influxDB.close()` the client when it is no longer required. + +Registering a shutdown hook is a good way to ensure that this is done on application termination regardless of exceptions +that are thrown in the main thread of the code. Note that if you are using a framework, do check the documentation for its +way of configuring shutdown lifecycle hooks or if it might already be calling `close` automatically. -* Batching functionality creates an internal thread pool that needs to be shutdown explicitly as part of a graceful application shutdown or the application will terminate properly. To do so, call `influxDB.close()`. ### Writing to InfluxDB -(5) ... +(6) ... `----8<----BEGIN DRAFT----8<----` diff --git a/README.md b/README.md index d96c23bb..9590fcd6 100644 --- a/README.md +++ b/README.md @@ -66,7 +66,17 @@ influxDB.query(new Query("CREATE RETENTION POLICY " + retentionPolicyName influxDB.setRetentionPolicy(retentionPolicyName); // Enable batch writes to get better performance. -influxDB.enableBatch(BatchOptions.DEFAULTS); +influxDB.enableBatch( + BatchOptions.DEFAULTS + .threadFactory(runnable -> { + Thread thread = new Thread(runnable); + thread.setDaemon(true); + return thread; + }) +); + +// Close it if your application is terminating or you are not using it anymore. +Runtime.getRuntime().addShutdownHook(new Thread(influxDB::close)); // Write points to InfluxDB. influxDB.write(Point.measurement("h2o_feet") @@ -100,9 +110,6 @@ System.out.println(queryResult); // [2020-03-22T20:50:12.929Z, below 3 feet, santa_monica, 2.064], // [2020-03-22T20:50:12.929Z, between 6 and 9 feet, coyote_creek, 8.12] // ]]], error=null]], error=null] - -// Close it if your application is terminating or you are not using it anymore. -influxDB.close(); ``` ## Contribute