As I started discussing in #289, I'm building an InfluxDB producer for which I'd like to guarantee at-least-once delivery. The intended use is in a streaming context where I need to guarantee that all records received up to a certain point have been durably persisted, and the record rate is high enough that writes must be batched to keep up.
I couldn't directly use the existing automatic batching capabilities of `InfluxDBImpl` and `BatchProcessor` because I had no visibility into when a point was persisted. After reading the related issues and PRs here (#148 and #108 specifically), I decided the best approach for my use case was to implement a client that automatically batches records but also gives me the ability to be notified when an individual point's batch write completes - successfully or not. It uses an `InfluxDB` client to send batches of points as quickly as possible. Points are grouped into as few batch writes as possible and are flushed at a configurable interval and whenever the buffer reaches a specified capacity.
Is this something you'd like to see contributed to this repository or should it live on its own?
Here's the interface:
```java
/**
 * Asynchronous InfluxDB client that accumulates individual point writes into batches.
 */
public interface BatchAccumulatingInfluxDBClient {

    /**
     * Asynchronously queues a point to be written and invokes the provided callback after the
     * point has been written. This may block if the underlying implementation cannot buffer the
     * point immediately. The provided callback is invoked after the attempt to write the batch
     * finishes and before the future that was returned at invocation time is completed.
     *
     * @param database The database to write the point to.
     * @param retentionPolicy The retention policy for the point.
     * @param consistencyLevel The consistency level for the batch write.
     * @param point The point to write.
     * @param callback A callback to invoke on successful write or when an error was encountered
     *        while attempting to write the record.
     * @return A future indicating the status of the asynchronous operation.
     * @throws InterruptedException if the thread is interrupted while blocking.
     * @throws TimeoutException if the write needs to block and has to wait longer than the
     *         configured timeout to enqueue the write.
     */
    Future<Void> write(String database, String retentionPolicy,
            InfluxDB.ConsistencyLevel consistencyLevel, Point point, WriteCallback callback)
            throws InterruptedException, TimeoutException;

    /**
     * Triggers all pending batched records to be sent and blocks until the writes complete.
     *
     * @throws InterruptedException if the thread is interrupted while blocking.
     */
    void flush() throws InterruptedException;
}
```
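To make the contract concrete, here's a minimal, library-free sketch of the accumulate-flush-callback pattern the interface describes. `ToyBatchAccumulator`, its `String` records, the `WriteCallback` shape, and the fixed one-second enqueue timeout are all hypothetical stand-ins for illustration; a real implementation would buffer `Point`s and perform the actual InfluxDB batch write inside `flush()`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical callback mirroring the WriteCallback in the proposed interface.
interface WriteCallback {
    void onSuccess();
    void onFailure(Throwable t);
}

// Toy accumulator: buffers records, then flushes them as a single batch,
// invoking each record's callback before completing its future.
class ToyBatchAccumulator {
    private static class Pending {
        final String record;
        final WriteCallback callback;
        final CompletableFuture<Void> future = new CompletableFuture<>();
        Pending(String record, WriteCallback callback) {
            this.record = record;
            this.callback = callback;
        }
    }

    private final BlockingQueue<Pending> buffer;

    ToyBatchAccumulator(int capacity) {
        buffer = new ArrayBlockingQueue<>(capacity);
    }

    Future<Void> write(String record, WriteCallback callback)
            throws InterruptedException, TimeoutException {
        Pending p = new Pending(record, callback);
        // Block until the buffer has room, up to a fixed (assumed) timeout.
        if (!buffer.offer(p, 1, TimeUnit.SECONDS)) {
            throw new TimeoutException("timed out waiting to enqueue write");
        }
        return p.future;
    }

    void flush() {
        List<Pending> batch = new ArrayList<>();
        buffer.drainTo(batch);
        // One "batch write" for everything buffered; a real client would
        // send these points to InfluxDB here and call onFailure on error.
        for (Pending p : batch) {
            p.callback.onSuccess();   // callback runs first...
            p.future.complete(null);  // ...then the future completes
        }
    }
}

public class Demo {
    public static void main(String[] args) throws Exception {
        ToyBatchAccumulator client = new ToyBatchAccumulator(16);
        List<String> acked = new ArrayList<>();
        Future<Void> f = client.write("cpu,host=a value=1", new WriteCallback() {
            public void onSuccess() { acked.add("ok"); }
            public void onFailure(Throwable t) { acked.add("fail"); }
        });
        client.flush();
        f.get();
        System.out.println(acked); // prints [ok]
    }
}
```

The ordering in `flush()` matters for at-least-once delivery: the per-point callback fires before the future completes, so a caller blocking on the future knows the acknowledgement has already been recorded.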