diff --git a/pom.xml b/pom.xml index 9030d8450..6fe393a29 100644 --- a/pom.xml +++ b/pom.xml @@ -113,5 +113,10 @@ okhttp 2.4.0 + + org.slf4j + slf4j-api + 1.7.12 + diff --git a/src/main/java/org/influxdb/InfluxDB.java b/src/main/java/org/influxdb/InfluxDB.java index 19cd6b956..2bc9ab973 100644 --- a/src/main/java/org/influxdb/InfluxDB.java +++ b/src/main/java/org/influxdb/InfluxDB.java @@ -9,6 +9,8 @@ import org.influxdb.dto.Query; import org.influxdb.dto.QueryResult; +import com.google.common.base.Optional; + /** * Interface with all available methods to access a InfluxDB database. * @@ -66,6 +68,21 @@ public String value() { return this.value; } } + + /** + * Behaviour options for when a put fails to add to the buffer. This is + * particularly important when using capacity limited buffering. + */ + public enum BufferFailBehaviour { + /** Throw an exception if cannot add to buffer */ + THROW_EXCEPTION, + /** Drop (do not add) the element attempting to be added */ + DROP_CURRENT, + /** Drop the oldest element in the queue and add the current element */ + DROP_OLDEST, + /** Block the thread until the space becomes available (NB: Not tested) */ + BLOCK_THREAD, + } /** * Set the loglevel which is used for REST related actions. @@ -75,7 +92,7 @@ public String value() { * @return the InfluxDB instance to be able to use it in a fluent manner. */ public InfluxDB setLogLevel(final LogLevel logLevel); - + /** * Enable Batching of single Point writes to speed up writes significant. If either actions or * flushDurations is reached first, a batchwrite is issued. @@ -83,12 +100,75 @@ public String value() { * @param actions * the number of actions to collect * @param flushDuration - * the time to wait at most. + * the minimun time to wait at most. NB: The 'maximum' time is set to 5 time this number. * @param flushDurationTimeUnit * @return the InfluxDB instance to be able to use it in a fluent manner. */ + @Deprecated public InfluxDB enableBatch(final int actions, final int flushDuration, final TimeUnit flushDurationTimeUnit); + + + /** + * Enable Batching of single Point writes to speed up writes significant. If either actions or + * flushDurations is reached first, a batchwrite is issued. + * + * @param actions + * the number of actions to collect + * @param flushIntervalMin + * the minimum time to wait before sending the batched writes. + * @param flushIntervalMax + * The maximum time, when backing off for failure, between sending batched writes. + * @param flushDurationTimeUnit + * @return the InfluxDB instance to be able to use it in a fluent manner. + */ + public InfluxDB enableBatch(final int flushActions, + final int flushIntervalMin, + final int flushIntervalMax, + final TimeUnit flushIntervalTimeUnit); + /** + * Enable Batching of single Point with a capacity limit. Batching provides + * a significant performance improvement in write speed. If either actions + * or flushDurations is reached first, a batchwrite is issued. + * + * This allows greater control over the behaviour when the capacity of the + * underlying buffer is limited. + * + * @param capacity + * the maximum number of points to hold. Should be NULL, for no + * buffering OR > 0 for buffering (NB: a capacity of 1 will not + * really buffer) + * @param flushActions + * the number of actions to collect before triggering a batched + * write + * @param flushIntervalMin + * the amount of time to wait before triggering a batched write + * @param flushIntervalMax + * the maximum amount of time to wait before triggering a batched write, + * when backing off due to failure + * @param flushIntervalTimeUnit + * the time unit for the flushIntervalMin and flushIntervalMax parameters + * @param behaviour + * the desired behaviour when capacity constrains are met + * @param discardOnFailedWrite + * if FALSE, the points from a failed batch write buffer will + * attempt to put them back onto the queue if TRUE, the points + * froma failed batch write will be discarded + * @param maxBatchWriteSize + * the maximum number of points to include in one batch write + * attempt. NB: this is different from the flushActions parameter, as + * the buffer can hold more than the flushActions parameter + * @return + */ + public InfluxDB enableBatch(final Integer capacity, + final int flushActions, + final int flushIntervalMin, + final int flushIntervalMax, + final TimeUnit flushIntervalTimeUnit, + BufferFailBehaviour behaviour, + boolean discardOnFailedWrite, + int maxBatchWriteSize); + /** * Disable Batching. */ @@ -114,7 +194,7 @@ public String value() { public String version(); /** - * Write a single Point to the database. + * Write a single Point to the database with ConsistencyLevel.One. * * @param database * the database to write to. @@ -123,36 +203,47 @@ public String value() { * @param point * The point to write */ + @Deprecated public void write(final String database, final String retentionPolicy, final Point point); + + /** + * Write a single Point to the database. + * + * @param database + * @param retentionPolicy + * @param consistencyLevel + * @param point + */ + public void write(final String database, final String retentionPolicy, final ConsistencyLevel consistencyLevel, final Point point); /** * Write a set of Points to the influxdb database with the new (>= 0.9.0rc32) lineprotocol. * * {@linkplain "https://github.com/influxdb/influxdb/pull/2696"} - * + * * @param batchPoints */ + @Deprecated public void write(final BatchPoints batchPoints); - + /** - * Write a set of Points to the influxdb database with the string records. - * + * Write a set of Points to the influxdb database with the new (>= 0.9.0rc32) lineprotocol. + * * {@linkplain "https://github.com/influxdb/influxdb/pull/2696"} - * - * @param records + * + * @param batchPoints */ - public void write(final String database, final String retentionPolicy, final ConsistencyLevel consistency, final String records); + public void write(final String database, final String retentionPolicy, final ConsistencyLevel consistencyLevel, final List points); + /** - * Write a set of Points to the influxdb database with the list of string records. + * Write a set of Points to the influxdb database with the string records. * * {@linkplain "https://github.com/influxdb/influxdb/pull/2696"} * * @param records */ - public void write(final String database, final String retentionPolicy, final ConsistencyLevel consistency, final List records); - - /** + public void write(final String database, final String retentionPolicy, final ConsistencyLevel consistency, final String records); /** * Execute a query agains a database. @@ -196,4 +287,20 @@ public String value() { */ public List describeDatabases(); -} + /** + * Get the number of buffered points NB: If batching is not enabled this + * will return 0 + * + * @return + */ + public int getBufferedCount(); + + /** + * Retrieves, but does not remove, the first element of the buffer + * + * @return an Optional containing the first element in the queue if + * it is present + */ + public Optional peekFirstBuffered(); + +} \ No newline at end of file diff --git a/src/main/java/org/influxdb/dto/BatchPoints.java b/src/main/java/org/influxdb/dto/BatchPoints.java index e49e35f72..0d5fabb56 100644 --- a/src/main/java/org/influxdb/dto/BatchPoints.java +++ b/src/main/java/org/influxdb/dto/BatchPoints.java @@ -19,6 +19,7 @@ * @author stefan * */ +@Deprecated public class BatchPoints { private String database; private String retentionPolicy; diff --git a/src/main/java/org/influxdb/dto/Point.java b/src/main/java/org/influxdb/dto/Point.java index cd65509ef..655c8b979 100644 --- a/src/main/java/org/influxdb/dto/Point.java +++ b/src/main/java/org/influxdb/dto/Point.java @@ -3,6 +3,7 @@ import java.math.BigDecimal; import java.math.BigInteger; import java.text.NumberFormat; +import java.util.List; import java.util.Locale; import java.util.Map; import java.util.Map.Entry; @@ -62,7 +63,7 @@ public static final class Builder { /** * @param measurement */ - Builder(final String measurement) { + protected Builder(final String measurement) { this.measurement = measurement; } @@ -215,6 +216,10 @@ public Point build() { void setMeasurement(final String measurement) { this.measurement = measurement; } + + public String getMeasurement() { + return measurement; + } /** * @param time @@ -223,6 +228,10 @@ void setMeasurement(final String measurement) { void setTime(final Long time) { this.time = time; } + + public Long getTime() { + return time; + } /** * @param tags @@ -246,6 +255,10 @@ Map getTags() { void setPrecision(final TimeUnit precision) { this.precision = precision; } + + public TimeUnit getPrecision() { + return precision; + } /** * @param fields @@ -292,7 +305,7 @@ public String lineProtocol() { sb.append(formatedTime()); return sb.toString(); } - + private StringBuilder concatenatedTags() { final StringBuilder sb = new StringBuilder(); for (Entry tag : this.tags.entrySet()) { @@ -351,4 +364,13 @@ private StringBuilder formatedTime() { return sb; } + public static String toLineProtocol(List points) { + StringBuilder sb = new StringBuilder(); + for (Point point : points) { + sb.append(point.lineProtocol()).append("\n"); + } + return sb.toString(); + } + + } diff --git a/src/main/java/org/influxdb/impl/BatchProcessor.java b/src/main/java/org/influxdb/impl/BatchProcessor.java index de8e27091..870f7196d 100644 --- a/src/main/java/org/influxdb/impl/BatchProcessor.java +++ b/src/main/java/org/influxdb/impl/BatchProcessor.java @@ -3,19 +3,26 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; -import java.util.concurrent.BlockingQueue; +import java.util.Map.Entry; +import java.util.concurrent.BlockingDeque; import java.util.concurrent.Executors; -import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; -import java.util.logging.Level; -import java.util.logging.Logger; +import java.util.concurrent.atomic.AtomicBoolean; import org.influxdb.InfluxDB; -import org.influxdb.dto.BatchPoints; +import org.influxdb.InfluxDB.BufferFailBehaviour; +import org.influxdb.InfluxDB.ConsistencyLevel; import org.influxdb.dto.Point; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import com.google.common.base.Function; +import com.google.common.base.Objects; +import com.google.common.base.Optional; import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; import com.google.common.collect.Maps; /** @@ -26,23 +33,47 @@ * */ public class BatchProcessor { + private static final Logger logger = LoggerFactory.getLogger(BatchProcessor.class); + public static final int DEFAULT_ACTIONS = 10; + public static final int DEFAULT_FLUSH_INTERVAL_MIN = 1000; + public static final int DEFAULT_FLUSH_INTERVAL_MAX = 60000; + public static final TimeUnit DEFAULT_FLUSH_INTERVAL_TIME_UINT = TimeUnit.MILLISECONDS; + public static final int DEFAULT_MAX_BATCH_WRITE_SIZE = 50; - private static final Logger logger = Logger.getLogger(BatchProcessor.class.getName()); - protected final BlockingQueue queue = new LinkedBlockingQueue<>(); - private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1); - final InfluxDBImpl influxDB; - final int actions; + private static final int BACKOFF_EXPONENT = 2; + + protected final BlockingDeque queue; + private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(); + private final InfluxDBImpl influxDB; + private final int flushActions; private final TimeUnit flushIntervalUnit; - private final int flushInterval; + private int flushInterval; + private final int flushIntervalMin; + private final int flushIntervalMax; + private final BufferFailBehaviour behaviour; + private final boolean discardOnFailedWrite; + private final int maxBatchWriteSize; + + private final AtomicBoolean writeInProgressLock = new AtomicBoolean(false); + private final AtomicBoolean waitForFlushIntervalToWriteLock = new AtomicBoolean(false); + + private final Object queueLock = new Object(); + private final ArrayList writeList; + /** * The Builder to create a BatchProcessor instance. */ public static final class Builder { private final InfluxDBImpl influxDB; - private int actions; - private TimeUnit flushIntervalUnit; - private int flushInterval; + private int flushActions = DEFAULT_ACTIONS; + private TimeUnit flushIntervalUnit = DEFAULT_FLUSH_INTERVAL_TIME_UINT; + private int flushIntervalMin = DEFAULT_FLUSH_INTERVAL_MIN; + private int flushIntervalMax = DEFAULT_FLUSH_INTERVAL_MAX; + private Integer capacity = null; + private BufferFailBehaviour behaviour = BufferFailBehaviour.THROW_EXCEPTION; + private boolean discardOnFailedWrite = true; + private int maxBatchWriteSize = DEFAULT_MAX_BATCH_WRITE_SIZE; /** * @param influxDB @@ -55,12 +86,13 @@ public Builder(final InfluxDB influxDB) { /** * The number of actions after which a batchwrite must be performed. * - * @param maxActions - * number of Points written after which a write must happen. + * @param actions + * number of Points written after which a write should + * happen. * @return this Builder to use it fluent */ - public Builder actions(final int maxActions) { - this.actions = maxActions; + public Builder actions(final int flushActions) { + this.flushActions = flushActions; return this; } @@ -74,47 +106,127 @@ public Builder actions(final int maxActions) { * * @return this Builder to use it fluent */ - public Builder interval(final int interval, final TimeUnit unit) { - this.flushInterval = interval; + public Builder interval(final int intervalMin, final int intervalMax, final TimeUnit unit) { + this.flushIntervalMin = intervalMin; + this.flushIntervalMax = intervalMax; this.flushIntervalUnit = unit; return this; } + /** + * The maximum queue capacity. + * + * @param capacity + * the maximum number of points to hold Should be NULL, for + * no buffering OR > 0 for buffering (NB: a capacity of 1 + * will not really buffer) + * @return this {@code Builder}, to allow chaining + */ + public Builder capacity(final Integer capacity) { + this.capacity = capacity; + return this; + } + + /** + * Set both the capacity and actions + * @param capacity + * @param actions + * @return this builder instance, for fluent usage + */ + public Builder capacityAndActions(final Integer capacity, final int flushActions) { + this.capacity = capacity; + this.flushActions = flushActions; + return this; + } + + /** + * The behaviour when a put to the buffer fails + * + * @param behaviour + * @return this builder instance, for fluent usage + */ + public Builder behaviour(final BufferFailBehaviour behaviour) { + this.behaviour = behaviour; + return this; + } + + /** + * Controls whether the buffer will keep or discard buffered points on + * network errors. + * + * @param discardOnFailedWrite + * @return this builder instance, for fluent usage + */ + public Builder discardOnFailedWrite(final boolean discardOnFailedWrite) { + this.discardOnFailedWrite = discardOnFailedWrite; + return this; + } + + /** + * The maximum number of points to write in a batch + * + * @param maxBatchWriteSize + * @return this builder instance, for fluent usage + */ + public Builder maxBatchWriteSize(final int maxBatchWriteSize) { + this.maxBatchWriteSize = maxBatchWriteSize; + return this; + } + /** * Create the BatchProcessor. * * @return the BatchProcessor instance. */ public BatchProcessor build() { - Preconditions.checkNotNull(this.actions, "actions may not be null"); - Preconditions.checkNotNull(this.flushInterval, "flushInterval may not be null"); - Preconditions.checkNotNull(this.flushIntervalUnit, "flushIntervalUnit may not be null"); - return new BatchProcessor(this.influxDB, this.actions, this.flushIntervalUnit, this.flushInterval); + Preconditions.checkArgument(flushActions > 0, "flushActions must be > 0"); + Preconditions.checkArgument(flushIntervalMin > 0, "flushIntervalMin must be > 0"); + Preconditions.checkNotNull(flushIntervalUnit, "flushIntervalUnit may not be null"); + Preconditions.checkArgument(flushIntervalMin <= flushIntervalMax, "flushIntervalMin must be <= flushIntervalMax"); + Preconditions.checkArgument(maxBatchWriteSize > 0, "maxBatchWriteSize must be > 0"); + + if (capacity != null) { + Preconditions.checkArgument(capacity > 0, "Capacity should be > 0 or NULL"); + Preconditions.checkArgument(capacity >= flushActions, "Capacity must be >= than flushActions"); + } else { + Preconditions.checkArgument(behaviour != BufferFailBehaviour.DROP_OLDEST, + "Behaviour cannot be DROP_OLDEST if capacity not set"); + } + + return new BatchProcessor(influxDB, flushActions, flushIntervalUnit, flushIntervalMin, flushIntervalMax, + capacity, behaviour, + discardOnFailedWrite, maxBatchWriteSize); } } static class BatchEntry { private final Point point; - private final String db; - private final String rp; + private final String database; + private final String retentionPolicy; + private final ConsistencyLevel consistencyLevel; - public BatchEntry(final Point point, final String db, final String rp) { + public BatchEntry(final Point point, final String database, ConsistencyLevel consistencyLevel, final String retentionPolicy) { super(); this.point = point; - this.db = db; - this.rp = rp; + this.database = database; + this.retentionPolicy = retentionPolicy; + this.consistencyLevel = consistencyLevel; } public Point getPoint() { - return this.point; + return point; } - public String getDb() { - return this.db; + public String getDatabase() { + return database; } - public String getRp() { - return this.rp; + public String getRetentionPolicy() { + return retentionPolicy; + } + + public ConsistencyLevel getConsistencyLevel() { + return consistencyLevel; } } @@ -129,51 +241,172 @@ public static Builder builder(final InfluxDB influxDB) { return new Builder(influxDB); } - BatchProcessor(final InfluxDBImpl influxDB, final int actions, final TimeUnit flushIntervalUnit, - final int flushInterval) { + BatchProcessor(final InfluxDBImpl influxDB, final int flushActions, final TimeUnit flushIntervalUnit, + final int flushIntervalMin, final int flushIntervalMax, final Integer capacity, final BufferFailBehaviour behaviour, + boolean discardOnFailedWrite, final int maxBatchWriteSize) { super(); this.influxDB = influxDB; - this.actions = actions; + this.flushActions = flushActions; this.flushIntervalUnit = flushIntervalUnit; - this.flushInterval = flushInterval; - + this.flushIntervalMin = flushIntervalMin; + this.flushIntervalMax = flushIntervalMax; + this.behaviour = behaviour; + this.discardOnFailedWrite = discardOnFailedWrite; + this.maxBatchWriteSize = maxBatchWriteSize; + writeList = Lists.newArrayListWithCapacity(maxBatchWriteSize); + + flushInterval = this.flushIntervalMin; + + if (capacity != null) { + if (capacity == 0) { + throw new IllegalArgumentException("capacity cannot be 0"); + } + queue = new LinkedBlockingDeque(capacity); + } else { + queue = new LinkedBlockingDeque(); + } + // Flush at specified Rate - this.scheduler.scheduleAtFixedRate(new Runnable() { - @Override - public void run() { - write(); + scheduleNextFlush(); + } + + private void scheduleNextFlush() { + logger.debug("scheduling next flush for {} {}", flushInterval, flushIntervalUnit); + scheduler.schedule(new FlushIntervalRunnable(), flushInterval, flushIntervalUnit); + } + + private class FlushIntervalRunnable implements Runnable { + public void run() { + logger.debug("Flush interval commenced"); + WriteResult result = attemptWrite(); + + switch (result){ + case FAILED: + logger.debug("Flush interval - FAILED"); + flushInterval = Math.min(flushInterval * BACKOFF_EXPONENT, flushIntervalMax); + break; + case NOT_ATTEMPTED: + logger.debug("Flush interval - NOT ATTEMPTED"); + break; + case SUCCESSFUL: + logger.debug("Flush interval - SUCCESS"); + flushInterval = flushIntervalMin; + waitForFlushIntervalToWriteLock.set(false); + break; + default: + throw new RuntimeException("Unhandled WriteResult enum value:" + result); } - }, this.flushInterval, this.flushInterval, this.flushIntervalUnit); + + scheduleNextFlush(); + } + } + + private class WriteRunnable implements Runnable{ + @Override + public void run() { + attemptWrite(); + } + } + + enum WriteResult { + NOT_ATTEMPTED, + SUCCESSFUL, + FAILED, + } + + WriteResult attemptWrite() { + if (writeInProgressLock.compareAndSet(false, true)) { + logger.debug("Attempting to write"); + boolean success = write(); + writeInProgressLock.set(false); + + return success ? WriteResult.SUCCESSFUL: WriteResult.FAILED; + } + logger.debug("Write already in progress, not attempting"); + return WriteResult.NOT_ATTEMPTED; + } + + void writeNow() { + // If there is no write in progress, schedule an immediate write + if (!writeInProgressLock.get()) { + logger.debug("Write NOT already in progress, scheduling WriteRunnable"); + scheduler.execute(new WriteRunnable()); + } } - void write() { - try { - if (this.queue.isEmpty()) { - return; + boolean write() { + if (queue.isEmpty()) { + return true; + } + + synchronized (queueLock) { + writeList.clear(); // probably redundant + // Never write the whole queue, it could be very big, so just get a + // temporary list + queue.drainTo(writeList, maxBatchWriteSize); + } + + // Map the writeList by the common (and hence batchable) fields + Map> databaseToBatchPoints = Maps.newHashMap(); + + for (BatchEntry batchEntry : writeList) { + BatchCommonFields common = BatchCommonFields.fromEntry(batchEntry); + + if (!databaseToBatchPoints.containsKey(common)) { + databaseToBatchPoints.put(common, new ArrayList()); } + databaseToBatchPoints.get(common).add(batchEntry); + } - Map databaseToBatchPoints = Maps.newHashMap(); - List batchEntries = new ArrayList<>(this.queue.size()); - this.queue.drainTo(batchEntries); + // For each collection of batchable fields, attempt a batched write + for (Entry> entry : databaseToBatchPoints.entrySet()) { + BatchCommonFields common = entry.getKey(); + List batchEntries = entry.getValue(); - for (BatchEntry batchEntry : batchEntries) { - String dbName = batchEntry.getDb(); - if (!databaseToBatchPoints.containsKey(dbName)) { - BatchPoints batchPoints = BatchPoints.database(dbName).retentionPolicy(batchEntry.getRp()).build(); - databaseToBatchPoints.put(dbName, batchPoints); - } - Point point = batchEntry.getPoint(); - databaseToBatchPoints.get(dbName).point(point); + List points = Lists.transform(batchEntries, new Function() { + @Override + public Point apply(BatchEntry input) { + return input.point; + } + }); + + try { + influxDB.writeBatched(common.database, common.retentionPolicy, common.consistencyLevel, points); + writeList.removeAll(batchEntries); + } catch (Exception e) { + // TODO: we should probably include some logging here } + } + + if (!writeList.isEmpty()) { + // Some points were not written, return them to the queue if + // necessary + synchronized (queueLock) { + if (!discardOnFailedWrite) { + // If we failed our write, add back the elements from this + // attempt in REVERSE order to maintain queue ordering + for (BatchEntry batchEntry : Lists.reverse(writeList)) { + boolean insertedAtStart = queue.offerFirst(batchEntry); + if (!insertedAtStart) { + // We have inserted as much as we can, may as well + // stop. - for (BatchPoints batchPoints : databaseToBatchPoints.values()) { - BatchProcessor.this.influxDB.write(batchPoints); + // NB: There is possibly a need for an enhancement + // here based on the behaviour attribute, but for + // now I cannot think of a more reasonable action + // than the current behaviour + break; + } + } + waitForFlushIntervalToWriteLock.set(true); + } + writeList.clear(); } - } catch (Throwable t) { - // any exception would stop the scheduler - logger.log(Level.SEVERE, "Batch could not be sent. Data will be lost", t); + return false; } + + return true; } /** @@ -181,22 +414,130 @@ void write() { * * @param batchEntry * the batchEntry to write to the cache. + * @return */ - void put(final BatchEntry batchEntry) { - this.queue.add(batchEntry); - if (this.queue.size() >= this.actions) { - write(); + public boolean put(String database, String retentionPolicy, ConsistencyLevel consistency, Point point) { + BatchEntry entry = new BatchEntry(point, database, consistency, retentionPolicy); + boolean added = false; + + switch (behaviour) { + case DROP_CURRENT: + added = queue.offer(entry); + break; + case DROP_OLDEST: + added = addAndDropIfNecessary(entry); + break; + case THROW_EXCEPTION: + added = queue.add(entry); + break; + case BLOCK_THREAD: + try { + queue.put(entry); + added = true; + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + break; + default: + throw new UnsupportedOperationException("Behaviour not yet supported"); + } + + logger.debug("Queue size:{}", queue.size()); + + if (!waitForFlushIntervalToWriteLock.get()) { + if (queue.size() >= flushActions) { + logger.debug("No flush lock - Queue size[{}] actions[{}]", queue.size(), flushActions); + writeNow(); + } + } + + return added; + } + + private boolean addAndDropIfNecessary(BatchEntry entry) { + synchronized (queueLock) { + boolean added = queue.offer(entry); + if (!added) { + queue.poll(); // Remove the front of the queue + added = queue.add(entry); + } + return added; } } /** - * Flush the current open writes to influxdb and end stop the reaper thread. This should only be - * called if no batch processing is needed anymore. + * Flush the current open writes to influxdb and end stop the reaper thread. + * This should only be called if no batch processing is needed anymore. * */ void flush() { - this.write(); - this.scheduler.shutdown(); + write(); + scheduler.shutdown(); } + private static class BatchCommonFields { + private final String database; + private final String retentionPolicy; + private final ConsistencyLevel consistencyLevel; + + public BatchCommonFields(final String database, final String retentionPolicy, + final ConsistencyLevel consistencyLevel) { + this.database = database; + this.retentionPolicy = retentionPolicy; + this.consistencyLevel = consistencyLevel; + } + + @Override + public int hashCode() { + return Objects.hashCode(database, retentionPolicy, consistencyLevel); + } + + @Override + public boolean equals(Object obj) { + if (this == obj) + return true; + if (obj == null) + return false; + if (getClass() != obj.getClass()) + return false; + BatchCommonFields other = (BatchCommonFields) obj; + + return (Objects.equal(database, other.database) + && Objects.equal(retentionPolicy, other.retentionPolicy) + && Objects.equal(consistencyLevel, other.consistencyLevel)); + } + + public static BatchCommonFields fromEntry(BatchEntry batchEntry) { + return new BatchCommonFields(batchEntry.getDatabase(), + batchEntry.getRetentionPolicy(), batchEntry.getConsistencyLevel()); + } + } + + public int getBufferedCount() { + synchronized (queueLock) { + return writeList.size() + queue.size(); + } + } + + /** + * Retrieves, but does not remove, the first element of the buffer + * + * @return an Optional containing the first element in the queue + */ + public Optional peekFirstBuffered() { + BatchEntry batchEntry = null; + synchronized (queueLock) { + if (!writeList.isEmpty()) { + batchEntry = writeList.get(0); + } else { + batchEntry = queue.peekFirst(); + } + } + + if (batchEntry == null) { + return Optional.absent(); + } + + return Optional.of(batchEntry.point); + } } diff --git a/src/main/java/org/influxdb/impl/InfluxDBImpl.java b/src/main/java/org/influxdb/impl/InfluxDBImpl.java index 6a5daf35c..67927823d 100644 --- a/src/main/java/org/influxdb/impl/InfluxDBImpl.java +++ b/src/main/java/org/influxdb/impl/InfluxDBImpl.java @@ -5,22 +5,23 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; -import com.google.common.base.Joiner; import org.influxdb.InfluxDB; import org.influxdb.dto.BatchPoints; import org.influxdb.dto.Point; import org.influxdb.dto.Pong; import org.influxdb.dto.Query; import org.influxdb.dto.QueryResult; -import org.influxdb.impl.BatchProcessor.BatchEntry; +import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.google.common.base.Stopwatch; import com.google.common.collect.Lists; +import com.squareup.okhttp.OkHttpClient; import retrofit.RestAdapter; import retrofit.client.Client; import retrofit.client.Header; +import retrofit.client.OkClient; import retrofit.client.Response; import retrofit.mime.TypedString; @@ -47,12 +48,16 @@ public InfluxDBImpl(final String url, final String username, final String passwo super(); this.username = username; this.password = password; - this.restAdapter = new RestAdapter.Builder() + restAdapter = new RestAdapter.Builder() .setEndpoint(url) .setErrorHandler(new InfluxDBErrorHandler()) .setClient(client) .build(); - this.influxDBService = this.restAdapter.create(InfluxDBService.class); + influxDBService = restAdapter.create(InfluxDBService.class); + } + + protected BatchProcessor getBatchProcessor() { + return batchProcessor; } @@ -60,16 +65,16 @@ public InfluxDBImpl(final String url, final String username, final String passwo public InfluxDB setLogLevel(final LogLevel logLevel) { switch (logLevel) { case NONE: - this.restAdapter.setLogLevel(retrofit.RestAdapter.LogLevel.NONE); + restAdapter.setLogLevel(retrofit.RestAdapter.LogLevel.NONE); break; case BASIC: - this.restAdapter.setLogLevel(retrofit.RestAdapter.LogLevel.BASIC); + restAdapter.setLogLevel(retrofit.RestAdapter.LogLevel.BASIC); break; case HEADERS: - this.restAdapter.setLogLevel(retrofit.RestAdapter.LogLevel.HEADERS); + restAdapter.setLogLevel(retrofit.RestAdapter.LogLevel.HEADERS); break; case FULL: - this.restAdapter.setLogLevel(retrofit.RestAdapter.LogLevel.FULL); + restAdapter.setLogLevel(retrofit.RestAdapter.LogLevel.FULL); break; default: break; @@ -78,28 +83,60 @@ public InfluxDB setLogLevel(final LogLevel logLevel) { return this; } - @Override - public InfluxDB enableBatch(final int actions, final int flushDuration, final TimeUnit flushDurationTimeUnit) { - if (this.batchEnabled.get()) { + public InfluxDB enableBatch( + final Integer capacity, + final int flushActions, + final int flushIntervalMin, + final int flushIntervalMax, + final TimeUnit flushIntervalTimeUnit, + BufferFailBehaviour behaviour, + boolean discardOnFailedWrite, + int maxBatchWriteSize) { + if (batchEnabled.get()) { throw new IllegalArgumentException("BatchProcessing is already enabled."); } - this.batchProcessor = BatchProcessor + batchProcessor = BatchProcessor .builder(this) - .actions(actions) - .interval(flushDuration, flushDurationTimeUnit) + .capacityAndActions(capacity, flushActions) + .interval(flushIntervalMin, flushIntervalMax, flushIntervalTimeUnit) + .behaviour(behaviour) + .discardOnFailedWrite(discardOnFailedWrite) + .maxBatchWriteSize(maxBatchWriteSize) .build(); - this.batchEnabled.set(true); + batchEnabled.set(true); + return this; + } + + @Override + public InfluxDB enableBatch(final int actions, + final int flushInterval, + final TimeUnit flushIntervalTimeUnit) { + return enableBatch(actions, flushInterval, 5 * flushInterval, flushIntervalTimeUnit); + } + + @Override + public InfluxDB enableBatch(final int flushActions, + final int flushIntervalMin, + final int flushIntervalMax, + final TimeUnit flushIntervalTimeUnit) { + + enableBatch(null, + flushActions, + flushIntervalMin, + flushIntervalMax, + flushIntervalTimeUnit, + BufferFailBehaviour.THROW_EXCEPTION, + true, flushActions); return this; } @Override public void disableBatch() { - this.batchEnabled.set(false); - this.batchProcessor.flush(); - if (this.logLevel != LogLevel.NONE) { - System.out.println( - "total writes:" + this.writeCount.get() + " unbatched:" + this.unBatchedCount.get() + "batchPoints:" - + this.batchedCount); + batchEnabled.set(false); + batchProcessor.flush(); + if (logLevel != LogLevel.NONE) { + System.out.println(String.format("Total writes:%d Unbatched:%d Batched:%d", + writeCount.get(), unBatchedCount.get(), batchedCount.get())); } } @@ -111,7 +148,7 @@ public boolean isBatchEnabled() { @Override public Pong ping() { Stopwatch watch = Stopwatch.createStarted(); - Response response = this.influxDBService.ping(); + Response response = influxDBService.ping(); List
headers = response.getHeaders(); String version = "unknown"; for (Header header : headers) { @@ -132,31 +169,50 @@ public String version() { @Override public void write(final String database, final String retentionPolicy, final Point point) { - if (this.batchEnabled.get()) { - BatchEntry batchEntry = new BatchEntry(point, database, retentionPolicy); - this.batchProcessor.put(batchEntry); + write(database, retentionPolicy, ConsistencyLevel.ONE, point); + } + + @Override + public void write(final String database, final String retentionPolicy, final ConsistencyLevel consistencyLevel, final Point point) { + if (batchEnabled.get()) { + batchProcessor.put(database, retentionPolicy, consistencyLevel, point); } else { - BatchPoints batchPoints = BatchPoints.database(database).retentionPolicy(retentionPolicy).build(); - batchPoints.point(point); - this.write(batchPoints); - this.unBatchedCount.incrementAndGet(); + writeUnbatched(database, retentionPolicy, ConsistencyLevel.ONE, point); } - this.writeCount.incrementAndGet(); } - + @Override - public void write(final BatchPoints batchPoints) { - this.batchedCount.addAndGet(batchPoints.getPoints().size()); - TypedString lineProtocol = new TypedString(batchPoints.lineProtocol()); - this.influxDBService.writePoints( - this.username, - this.password, - batchPoints.getDatabase(), - batchPoints.getRetentionPolicy(), + public void write(BatchPoints batchPoints) { + write(batchPoints.getDatabase(), batchPoints.getRetentionPolicy(), ConsistencyLevel.ONE, batchPoints.getPoints()); + } + + @Override + public void write(final String database, final String retentionPolicy, final ConsistencyLevel consistencyLevel, final List points) { + writeBatched(database, retentionPolicy, consistencyLevel, points); + } + + protected void writeBatched(final String database, final String retentionPolicy, final ConsistencyLevel consistencyLevel, final List points) { + batchedCount.addAndGet(points.size()); + writeCount.addAndGet(points.size()); + writeLine(database, retentionPolicy, consistencyLevel, Point.toLineProtocol(points)); + } + + protected void writeUnbatched(String database, String retentionPolicy, ConsistencyLevel consistencyLevel, Point point) { + unBatchedCount.incrementAndGet(); + writeCount.incrementAndGet(); + writeLine(database, retentionPolicy, consistencyLevel, point.lineProtocol()); + } + + private void writeLine(String database, String retentionPolicy, ConsistencyLevel consistency, String line) { + TypedString lineProtocol = new TypedString(line); + influxDBService.writePoints( + username, + password, + database, + retentionPolicy, TimeUtil.toTimePrecision(TimeUnit.NANOSECONDS), - batchPoints.getConsistency().value(), + consistency.value(), lineProtocol); - } @Override @@ -170,19 +226,14 @@ public void write(final String database, final String retentionPolicy, final Con consistency.value(), new TypedString(records)); } - @Override - public void write(final String database, final String retentionPolicy, final ConsistencyLevel consistency, final List records) { - final String joinedRecords = Joiner.on("\n").join(records); - write(database, retentionPolicy, consistency, joinedRecords); - } /** * {@inheritDoc} */ @Override public QueryResult query(final Query query) { - QueryResult response = this.influxDBService - .query(this.username, this.password, query.getDatabase(), query.getCommand()); + QueryResult response = influxDBService + .query(username, password, query.getDatabase(), query.getCommand()); return response; } @@ -191,8 +242,8 @@ public QueryResult query(final Query query) { */ @Override public QueryResult query(final Query query, final TimeUnit timeUnit) { - QueryResult response = this.influxDBService - .query(this.username, this.password, query.getDatabase(), TimeUtil.toTimePrecision(timeUnit) , query.getCommand()); + QueryResult response = influxDBService + .query(username, password, query.getDatabase(), TimeUtil.toTimePrecision(timeUnit) , query.getCommand()); return response; } @@ -218,7 +269,7 @@ public void deleteDatabase(final String name) { */ @Override public List describeDatabases() { - QueryResult result = this.influxDBService.query(this.username, this.password, "SHOW DATABASES"); + QueryResult result = influxDBService.query(username, password, "SHOW DATABASES"); // {"results":[{"series":[{"name":"databases","columns":["name"],"values":[["mydb"]]}]}]} // Series [name=databases, columns=[name], values=[[mydb], [unittest_1433605300968]]] List> databaseNames = result.getResults().get(0).getSeries().get(0).getValues(); @@ -231,4 +282,24 @@ public List describeDatabases() { return databases; } + public int getBufferedCount() { + if (batchEnabled.get()) { + return batchProcessor.getBufferedCount(); + } + + return 0; + } + + @Override + public Optional peekFirstBuffered() { + if (batchEnabled.get()) { + Optional point = batchProcessor.peekFirstBuffered(); + + if (point.isPresent()) { + return Optional.of(point.get()); + } + } + + return Optional.absent(); + } } diff --git a/src/test/java/org/influxdb/InfluxDBTest.java b/src/test/java/org/influxdb/InfluxDBTest.java index 5ac8e0076..c21ab3cc1 100644 --- a/src/test/java/org/influxdb/InfluxDBTest.java +++ b/src/test/java/org/influxdb/InfluxDBTest.java @@ -7,8 +7,8 @@ import java.util.logging.Level; import java.util.logging.Logger; +import org.influxdb.InfluxDB.ConsistencyLevel; import org.influxdb.InfluxDB.LogLevel; -import org.influxdb.dto.BatchPoints; import org.influxdb.dto.Point; import org.influxdb.dto.Pong; import org.influxdb.dto.Query; @@ -22,6 +22,8 @@ import com.github.dockerjava.api.command.CreateContainerResponse; import com.github.dockerjava.core.DockerClientBuilder; import com.github.dockerjava.core.DockerClientConfig; +import com.google.common.base.Joiner; +import com.google.common.collect.Lists; /** * Test the InfluxDB API. @@ -165,9 +167,8 @@ public void testDescribeDatabases() { @Test(enabled = true) public void testWrite() { String dbName = "write_unittest_" + System.currentTimeMillis(); - this.influxDB.createDatabase(dbName); + influxDB.createDatabase(dbName); - BatchPoints batchPoints = BatchPoints.database(dbName).tag("async", "true").retentionPolicy("default").build(); Point point1 = Point .measurement("cpu") .tag("atag", "test") @@ -176,13 +177,11 @@ public void testWrite() { .addField("system", 1L) .build(); Point point2 = Point.measurement("disk").tag("atag", "test").addField("used", 80L).addField("free", 1L).build(); - batchPoints.point(point1); - batchPoints.point(point2); - this.influxDB.write(batchPoints); + influxDB.write(dbName, "default", ConsistencyLevel.ONE, Lists.newArrayList(point1, point2)); Query query = new Query("SELECT * FROM cpu GROUP BY *", dbName); QueryResult result = this.influxDB.query(query); Assert.assertFalse(result.getResults().get(0).getSeries().get(0).getTags().isEmpty()); - this.influxDB.deleteDatabase(dbName); + influxDB.deleteDatabase(dbName); } /** @@ -227,11 +226,12 @@ public void testWriteMultipleStringDataLines() { String dbName = "write_unittest_" + System.currentTimeMillis(); this.influxDB.createDatabase(dbName); - this.influxDB.write(dbName, "default", InfluxDB.ConsistencyLevel.ONE, Arrays.asList( + final String joinedRecords = Joiner.on("\n").join(Arrays.asList( "cpu,atag=test1 idle=100,usertime=10,system=1", "cpu,atag=test2 idle=200,usertime=20,system=2", - "cpu,atag=test3 idle=300,usertime=30,system=3" - )); + "cpu,atag=test3 idle=300,usertime=30,system=3")); + + this.influxDB.write(dbName, "default", InfluxDB.ConsistencyLevel.ONE, joinedRecords); Query query = new Query("SELECT * FROM cpu GROUP BY *", dbName); QueryResult result = this.influxDB.query(query); diff --git a/src/test/java/org/influxdb/PerformanceTests.java b/src/test/java/org/influxdb/PerformanceTests.java index c92c2183a..bb671c055 100644 --- a/src/test/java/org/influxdb/PerformanceTests.java +++ b/src/test/java/org/influxdb/PerformanceTests.java @@ -1,9 +1,11 @@ package org.influxdb; +import java.util.List; import java.util.concurrent.TimeUnit; +import org.assertj.core.util.Lists; +import org.influxdb.InfluxDB.ConsistencyLevel; import org.influxdb.InfluxDB.LogLevel; -import org.influxdb.dto.BatchPoints; import org.influxdb.dto.Point; import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; @@ -25,34 +27,30 @@ public void setUp() { @Test(threadPoolSize = 10, enabled = false) public void writeSinglePointPerformance() throws InterruptedException { String dbName = "write_" + System.currentTimeMillis(); - this.influxDB.createDatabase(dbName); - this.influxDB.enableBatch(2000, 100, TimeUnit.MILLISECONDS); + influxDB.createDatabase(dbName); + influxDB.enableBatch(2000, 100, TimeUnit.MILLISECONDS); Stopwatch watch = Stopwatch.createStarted(); for (int j = 0; j < SINGLE_POINT_COUNT; j++) { Point point = Point.measurement("cpu") .addField("idle", (double) j) .addField("user", 2.0 * j) .addField("system", 3.0 * j).build(); - this.influxDB.write(dbName, "default", point); + influxDB.write(dbName, "default", ConsistencyLevel.ONE, point); } - this.influxDB.disableBatch(); + influxDB.disableBatch(); System.out.println("Single Point Write for " + SINGLE_POINT_COUNT + " writes of Points took:" + watch); - this.influxDB.deleteDatabase(dbName); + influxDB.deleteDatabase(dbName); } @Test(enabled = false) public void writePerformance() { String dbName = "writepoints_" + System.currentTimeMillis(); - this.influxDB.createDatabase(dbName); + influxDB.createDatabase(dbName); Stopwatch watch = Stopwatch.createStarted(); for (int i = 0; i < COUNT; i++) { - BatchPoints batchPoints = BatchPoints - .database(dbName) - .tag("blubber", "bla") - .retentionPolicy("default") - .build(); + List points = Lists.newArrayList(); for (int j = 0; j < POINT_COUNT; j++) { Point point = Point .measurement("cpu") @@ -60,27 +58,27 @@ public void writePerformance() { .addField("user", 2.0 * j) .addField("system", 3.0 * j) .build(); - batchPoints.point(point); + points.add(point); } - this.influxDB.write(batchPoints); + influxDB.write(dbName, "default", ConsistencyLevel.ONE, points); } System.out.println("WritePoints for " + COUNT + " writes of " + POINT_COUNT + " Points took:" + watch); - this.influxDB.deleteDatabase(dbName); + influxDB.deleteDatabase(dbName); } @Test(enabled = true) public void maxWritePointsPerformance() { String dbName = "d"; - this.influxDB.createDatabase(dbName); - this.influxDB.enableBatch(100000, 60, TimeUnit.SECONDS); + influxDB.createDatabase(dbName); + influxDB.enableBatch(100000, 60, TimeUnit.SECONDS); Stopwatch watch = Stopwatch.createStarted(); for (int i = 0; i < 2000000; i++) { Point point = Point.measurement("s").addField("v", 1.0).build(); - this.influxDB.write(dbName, "default", point); + influxDB.write(dbName, "default", ConsistencyLevel.ONE, point); } System.out.println("5Mio points:" + watch); - this.influxDB.deleteDatabase(dbName); + influxDB.deleteDatabase(dbName); } } diff --git a/src/test/java/org/influxdb/TicketTests.java b/src/test/java/org/influxdb/TicketTests.java index 338db6287..d7ad7b580 100644 --- a/src/test/java/org/influxdb/TicketTests.java +++ b/src/test/java/org/influxdb/TicketTests.java @@ -5,13 +5,14 @@ import java.util.logging.Level; import java.util.logging.Logger; +import org.influxdb.InfluxDB.ConsistencyLevel; import org.influxdb.InfluxDB.LogLevel; -import org.influxdb.dto.BatchPoints; import org.influxdb.dto.Point; import org.influxdb.dto.Pong; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; +import org.testng.collections.Lists; import com.github.dockerjava.api.DockerClient; import com.github.dockerjava.api.command.CreateContainerResponse; @@ -49,7 +50,7 @@ public void setUp() throws InterruptedException, IOException { .withPassword("root") .build(); this.dockerClient = DockerClientBuilder.getInstance(config).build(); - String ip = "127.0.0.1"; + String ip = "192.168.59.103"; this.influxDB = InfluxDBFactory.connect("http://" + ip + ":8086", "root", "root"); boolean influxDBstarted = false; do { @@ -101,7 +102,7 @@ public void testTicket38() { .tag("host", "host-\"C") .tag("region", "region") .build(); - this.influxDB.write(dbName, "default", point1); + this.influxDB.write(dbName, "default", ConsistencyLevel.ONE, point1); this.influxDB.deleteDatabase(dbName); } @@ -113,17 +114,12 @@ public void testTicket38() { public void testTicket39() { String dbName = "ticket39_" + System.currentTimeMillis(); this.influxDB.createDatabase(dbName); - BatchPoints batchPoints = BatchPoints - .database(dbName) + + Point point = Point.measurement("my_type") + .field("my_field", "string_value") .tag("async", "true") - .retentionPolicy("default") - .consistency(InfluxDB.ConsistencyLevel.ALL) .build(); - Point.Builder builder = Point.measurement("my_type"); - builder.addField("my_field", "string_value"); - Point point = builder.build(); - batchPoints.point(point); - this.influxDB.write(batchPoints); + this.influxDB.write(dbName, "default", ConsistencyLevel.ALL, Lists.newArrayList(point)); this.influxDB.deleteDatabase(dbName); } @@ -136,8 +132,8 @@ public void testTicket40() { this.influxDB.createDatabase(dbName); this.influxDB.enableBatch(100, 100, TimeUnit.MICROSECONDS); for (int i = 0; i < 1000; i++) { - Point point = Point.measurement("cpu").addField("idle", 99.0).build(); - this.influxDB.write(dbName, "default", point); + Point point = Point.measurement("cpu").field("idle", 99).build(); + this.influxDB.write(dbName, "default", ConsistencyLevel.ONE, point); } this.influxDB.deleteDatabase(dbName); } diff --git a/src/test/java/org/influxdb/dto/PointTest.java b/src/test/java/org/influxdb/dto/PointTest.java index 60bf052b2..e7b964894 100644 --- a/src/test/java/org/influxdb/dto/PointTest.java +++ b/src/test/java/org/influxdb/dto/PointTest.java @@ -9,6 +9,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; +import org.assertj.core.util.Lists; import org.testng.annotations.Test; import com.google.common.collect.Maps; @@ -80,23 +81,17 @@ public void testTicket44() { point = Point.measurement("test").time(1, TimeUnit.MILLISECONDS).addField("a", 1.0).build(); assertThat(point.lineProtocol()).asString().isEqualTo("test a=1.0 1000000"); - point = Point.measurement("test").time(1, TimeUnit.NANOSECONDS).addField("a", 1.0).build(); - BatchPoints batchPoints = BatchPoints.database("db").point(point).build(); - assertThat(batchPoints.lineProtocol()).asString().isEqualTo("test a=1.0 1\n"); + point = Point.measurement("test").time(1, TimeUnit.NANOSECONDS).field("a", 1).build(); + assertThat(Point.toLineProtocol(Lists.newArrayList(point))).asString().isEqualTo("test a=1.0 1\n"); - point = Point.measurement("test").time(1, TimeUnit.MICROSECONDS).addField("a", 1.0).build(); - batchPoints = BatchPoints.database("db").point(point).build(); - assertThat(batchPoints.lineProtocol()).asString().isEqualTo("test a=1.0 1000\n"); + point = Point.measurement("test").time(1, TimeUnit.MICROSECONDS).field("a", 1).build(); + assertThat(Point.toLineProtocol(Lists.newArrayList(point))).asString().isEqualTo("test a=1.0 1000\n"); - point = Point.measurement("test").time(1, TimeUnit.MILLISECONDS).addField("a", 1.0).build(); - batchPoints = BatchPoints.database("db").point(point).build(); - assertThat(batchPoints.lineProtocol()).asString().isEqualTo("test a=1.0 1000000\n"); - - point = Point.measurement("test").addField("a", 1.0).time(1, TimeUnit.MILLISECONDS).build(); - batchPoints = BatchPoints.database("db").build(); - batchPoints = batchPoints.point(point); - assertThat(batchPoints.lineProtocol()).asString().isEqualTo("test a=1.0 1000000\n"); + point = Point.measurement("test").time(1, TimeUnit.MILLISECONDS).field("a", 1).build(); + assertThat(Point.toLineProtocol(Lists.newArrayList(point))).asString().isEqualTo("test a=1.0 1000000\n"); + point = Point.measurement("test").field("a", 1).time(1, TimeUnit.MILLISECONDS).build(); + assertThat(Point.toLineProtocol(Lists.newArrayList(point))).asString().isEqualTo("test a=1.0 1000000\n"); } /** diff --git a/src/test/java/org/influxdb/impl/BatchProcessorTest.java b/src/test/java/org/influxdb/impl/BatchProcessorTest.java index 41f88cf7a..aa4d22068 100644 --- a/src/test/java/org/influxdb/impl/BatchProcessorTest.java +++ b/src/test/java/org/influxdb/impl/BatchProcessorTest.java @@ -1,41 +1,333 @@ package org.influxdb.impl; -import static org.mockito.Mockito.any; -import static org.mockito.Mockito.doThrow; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; - +import java.util.List; import java.util.concurrent.TimeUnit; -import org.influxdb.InfluxDB; -import org.influxdb.dto.BatchPoints; +import org.influxdb.InfluxDB.BufferFailBehaviour; +import org.influxdb.InfluxDB.ConsistencyLevel; import org.influxdb.dto.Point; +import org.testng.Assert; import org.testng.annotations.Test; +import com.squareup.okhttp.OkHttpClient; + +import retrofit.client.OkClient; + public class BatchProcessorTest { + private static class AnonInfluxDBImpl extends InfluxDBImpl { + private static final String FAIL_DATABASE = "fail_db"; + private final boolean throwErrorOnWriteBatched; + private int writeCalled = 0; + + public AnonInfluxDBImpl(final boolean throwErrorOnWriteBatched) { + super("temp", "user", "pass", new OkClient(new OkHttpClient())); + this.throwErrorOnWriteBatched = throwErrorOnWriteBatched; + } + + @Override + protected void writeBatched(String database, String retentionPolicy, ConsistencyLevel consistencyLevel, + List points) { + writeCalled++; + if (throwErrorOnWriteBatched) { + throw new RuntimeException("Anon error"); + } + + if (FAIL_DATABASE.equals(database)) { + throw new RuntimeException("Will not write to fail db"); + } + } + + @Override + protected void writeUnbatched(String database, String retentionPolicy, ConsistencyLevel consistencyLevel, + Point point) { + } + } + + private static class QueueDepthRecordingDBImpl extends InfluxDBImpl { + private int queueDepth = 0; + private int writeCalled = 0; + + public QueueDepthRecordingDBImpl() { + super("temp", "user", "pass", new OkClient(new OkHttpClient())); + } + + @Override + protected void writeBatched(String database, String retentionPolicy, + ConsistencyLevel consistencyLevel, List points) { + writeCalled++; + queueDepth = getBufferedCount(); + } + + public int getQueueDepth() { + return queueDepth; + } - @Test - public void testSchedulerExceptionHandling() throws InterruptedException { - InfluxDB mockInfluxDB = mock(InfluxDBImpl.class); - BatchProcessor batchProcessor = BatchProcessor.builder(mockInfluxDB).actions(Integer.MAX_VALUE) - .interval(1, TimeUnit.NANOSECONDS).build(); + public BatchProcessor getBatchProcessor() { + return super.getBatchProcessor(); + } + } - doThrow(new RuntimeException()).when(mockInfluxDB).write(any(BatchPoints.class)); + private static class NonScheduledWriteBatchProcessor extends BatchProcessor { - Point point = Point.measurement("cpu").field("6", "").build(); - BatchProcessor.BatchEntry batchEntry1 = new BatchProcessor.BatchEntry(point, "db1", ""); - BatchProcessor.BatchEntry batchEntry2 = new BatchProcessor.BatchEntry(point, "db2", ""); + NonScheduledWriteBatchProcessor(InfluxDBImpl influxDB, int actions, TimeUnit flushIntervalUnit, + int flushInterval, Integer capacity, BufferFailBehaviour behaviour, boolean discardOnFailedWrite, + int maxBatchWriteSize) { + super(influxDB, actions, flushIntervalUnit, flushInterval, 5*flushInterval, capacity, behaviour, discardOnFailedWrite, + maxBatchWriteSize); + } + @Override + void writeNow() { + attemptWrite(); + } + } + private static Point getAnonPoint() { + return getPoint("anon"); + } + + private static Point getPoint(String measurement) { + return Point.measurement(measurement) + .field("field", "value").build(); + } + + private static AnonInfluxDBImpl getAnonInfluxDB() { + return new AnonInfluxDBImpl(false); + } + + private static AnonInfluxDBImpl getErrorThrowingDB() { + return new AnonInfluxDBImpl(true); + } + + private static QueueDepthRecordingDBImpl getQueueDepthRecordingDBImpl() { + return new QueueDepthRecordingDBImpl(); + } + + private final String ANON_DB = "db"; + private final String ANON_RETENTION = "default"; + private final ConsistencyLevel ANON_CONSISTENCY = ConsistencyLevel.ONE; - batchProcessor.put(batchEntry1); - Thread.sleep(200); // wait for scheduler + @Test(expectedExceptions={IllegalArgumentException.class}) + public void cannotBuildWithActionsGreaterThanCapacity() { + BatchProcessor.builder(getAnonInfluxDB()) + .capacityAndActions(1, 2) + .build(); + } + + @Test(expectedExceptions={IllegalStateException.class}) + public void itThrowsExceptionWhenQueueAtCapacityAndBehaviourIsThrowException() { + BatchProcessor subject = BatchProcessor.builder(getErrorThrowingDB()) + .interval(1, 2, TimeUnit.DAYS) + .capacityAndActions(1, 1) + .discardOnFailedWrite(false) + .behaviour(BufferFailBehaviour.THROW_EXCEPTION) + .build(); + + boolean putResult; + putResult = subject.put(ANON_DB, ANON_RETENTION, ANON_CONSISTENCY, getAnonPoint()); + Assert.assertTrue(putResult); + subject.put(ANON_DB, ANON_RETENTION, ANON_CONSISTENCY, getAnonPoint()); + } + + @Test + public void itEvictsTheOldestWhenQueueAtCapacityAndBehaviourIsDropOldest() { + BatchProcessor subject = BatchProcessor.builder(getErrorThrowingDB()) + .interval(1, 2, TimeUnit.DAYS) + .capacityAndActions(1, 1) + .discardOnFailedWrite(false) + .behaviour(BufferFailBehaviour.DROP_OLDEST) + .build(); + + subject.put(ANON_DB, ANON_RETENTION, ANON_CONSISTENCY, getPoint("measure1")); + Assert.assertEquals(subject.queue.peek().getPoint().getMeasurement(), "measure1"); + + subject.put(ANON_DB, ANON_RETENTION, ANON_CONSISTENCY, getPoint("measure2")); + Assert.assertEquals(subject.queue.peek().getPoint().getMeasurement(), "measure2"); + + subject.put(ANON_DB, ANON_RETENTION, ANON_CONSISTENCY, getPoint("measure3")); + Assert.assertEquals(subject.queue.peek().getPoint().getMeasurement(), "measure3"); + + } + + @Test + public void itDoesNotInsertIfQueueAtCapcityeAndBehaviourIsDropCurrentAndKeppOnFailedWrite() { + BatchProcessor subject = new NonScheduledWriteBatchProcessor(getErrorThrowingDB(), 1, TimeUnit.SECONDS, 1, 1, BufferFailBehaviour.DROP_CURRENT, false, 50); + + subject.put(ANON_DB, ANON_RETENTION, ANON_CONSISTENCY, getPoint("measure1")); +// subject.attemptWrite(); + Assert.assertEquals(subject.queue.peek().getPoint().getMeasurement(), "measure1"); + + subject.put(ANON_DB, ANON_RETENTION, ANON_CONSISTENCY, getPoint("measure2")); + Assert.assertEquals(subject.queue.peek().getPoint().getMeasurement(), "measure1"); + + subject.put(ANON_DB, ANON_RETENTION, ANON_CONSISTENCY, getPoint("measure3")); + Assert.assertEquals(subject.queue.peek().getPoint().getMeasurement(), "measure1"); + + } + + @Test(expectedExceptions = { IllegalArgumentException.class }) + public void cannotBeBuiltWithDropOldestBehaviourAndWithoutCapacityLimit() { + BatchProcessor.builder(getAnonInfluxDB()) + .interval(1, 2, TimeUnit.DAYS) + .behaviour(BufferFailBehaviour.DROP_OLDEST) + .build(); + + } + + @Test + public void itRemovesPointsFromQueueAfterSuccessfulWrite() { + BatchProcessor subject = BatchProcessor.builder(getAnonInfluxDB()) + .interval(1, 2, TimeUnit.DAYS) + .build(); + + subject.put(ANON_DB, ANON_RETENTION, ANON_CONSISTENCY, getAnonPoint()); + Assert.assertEquals(subject.queue.size(), 1); + subject.write(); + Assert.assertEquals(subject.queue.size(), 0); + } + + @Test + public void keepOnFailedWriteProcessorRetainsPointsAfterExceptionThrown() { + BatchProcessor subject = BatchProcessor.builder(getErrorThrowingDB()) + .interval(1, 2, TimeUnit.DAYS) + .discardOnFailedWrite(false) + .build(); + + Point point = getAnonPoint(); + subject.put(ANON_DB, ANON_RETENTION, ANON_CONSISTENCY, point); + Assert.assertEquals(subject.queue.size(), 1); + subject.write(); + Assert.assertEquals(subject.queue.size(), 1); + // TODO this is bad, Law of Demeter violation! + Assert.assertEquals(subject.queue.peek().getPoint(), point); + } + + @Test + public void discardOnFailedWriteProcessorDropsPointsAfterExceptionThrown() { + BatchProcessor subject = BatchProcessor.builder(getErrorThrowingDB()) + .interval(1, 2, TimeUnit.DAYS) + .discardOnFailedWrite(true) + .build(); + + subject.put(ANON_DB, ANON_RETENTION, ANON_CONSISTENCY, getAnonPoint()); + Assert.assertEquals(subject.queue.size(), 1); + subject.write(); + Assert.assertEquals(subject.queue.size(), 0); + } + + @Test + public void writeCalledAfterActionsReached() { + AnonInfluxDBImpl influxDb = getAnonInfluxDB(); +// BatchProcessor subject = BatchProcessor.builder(influxDb) +// .interval(1, TimeUnit.DAYS) +// .actions(2) +// .build(); + BatchProcessor subject = new NonScheduledWriteBatchProcessor(influxDb, 2, TimeUnit.DAYS, 1, null, + BufferFailBehaviour.THROW_EXCEPTION, false, 50); - // first try throws an exception - verify(mockInfluxDB, times(1)).write(any(BatchPoints.class)); + subject.put(ANON_DB, ANON_RETENTION, ANON_CONSISTENCY, getAnonPoint()); + Assert.assertEquals(subject.queue.size(), 1); + Assert.assertEquals(influxDb.writeCalled, 0); + subject.put(ANON_DB, ANON_RETENTION, ANON_CONSISTENCY, getAnonPoint()); + Assert.assertEquals(subject.queue.size(), 0); + Assert.assertEquals(influxDb.writeCalled, 1); + } + + @Test + public void writeNotCascadedAfterWriteFailure() { + AnonInfluxDBImpl influxDb = getErrorThrowingDB(); +// BatchProcessor subject = BatchProcessor.builder(influxDb) +// .interval(1, TimeUnit.DAYS) +// .capacityAndActions(3, 1) +// .discardOnFailedWrite(false) +// .build(); + BatchProcessor subject = new NonScheduledWriteBatchProcessor(influxDb, 1, TimeUnit.DAYS, 1, 3, + BufferFailBehaviour.THROW_EXCEPTION, false, 50); - batchProcessor.put(batchEntry2); - Thread.sleep(200); // wait for scheduler - // without try catch the 2nd time does not occur - verify(mockInfluxDB, times(2)).write(any(BatchPoints.class)); + + subject.put(ANON_DB, ANON_RETENTION, ANON_CONSISTENCY, getAnonPoint()); + Assert.assertEquals(subject.queue.size(), 1); + Assert.assertEquals(influxDb.writeCalled, 1); + subject.put(ANON_DB, ANON_RETENTION, ANON_CONSISTENCY, getAnonPoint()); + Assert.assertEquals(subject.queue.size(), 2); + Assert.assertEquals(influxDb.writeCalled, 1); + subject.put(ANON_DB, ANON_RETENTION, ANON_CONSISTENCY, getAnonPoint()); + Assert.assertEquals(subject.queue.size(), 3); + Assert.assertEquals(influxDb.writeCalled, 1); } + + @Test + public void successfullyWrittenPointsAreNotReturnedToQueue() { + AnonInfluxDBImpl influxDb = getAnonInfluxDB(); + BatchProcessor subject = new NonScheduledWriteBatchProcessor(influxDb, 3, TimeUnit.DAYS, 1, 3, BufferFailBehaviour.DROP_CURRENT, false, 50); + + subject.put(ANON_DB, ANON_RETENTION, ANON_CONSISTENCY, getPoint("measure1")); + subject.put(AnonInfluxDBImpl.FAIL_DATABASE, ANON_RETENTION, ANON_CONSISTENCY, getPoint("measure3")); + subject.put(ANON_DB, ANON_RETENTION, ANON_CONSISTENCY, getPoint("measure2")); + Assert.assertEquals(influxDb.writeCalled, 2); // Once for ANON_TB, once for FAIL_DATABASE + Assert.assertEquals(subject.queue.size(), 1); + Assert.assertEquals(subject.queue.peek().getPoint().getMeasurement(), "measure3"); + } + + @Test + public void unsuccessfullyWrittenPointsAreReturnedToQueueInCorrectOrder() { + AnonInfluxDBImpl influxDb = getErrorThrowingDB(); + BatchProcessor subject = new NonScheduledWriteBatchProcessor(influxDb, 4, TimeUnit.DAYS, 1, 4, BufferFailBehaviour.THROW_EXCEPTION, false, 50); + + subject.put("db1", ANON_RETENTION, ANON_CONSISTENCY, getPoint("inserted1")); + subject.put("db2", ANON_RETENTION, ANON_CONSISTENCY, getPoint("inserted2")); + subject.put("db2", ANON_RETENTION, ANON_CONSISTENCY, getPoint("inserted3")); + subject.put("db1", ANON_RETENTION, ANON_CONSISTENCY, getPoint("inserted4")); + + Assert.assertEquals(influxDb.writeCalled, 2); // Once for db1, once for db2 + Assert.assertEquals(subject.queue.size(), 4); + Assert.assertEquals(subject.queue.peekFirst().getPoint().getMeasurement(), "inserted1"); + Assert.assertEquals(subject.queue.peekLast().getPoint().getMeasurement(), "inserted4"); + } + + @Test + public void writeOnlyAttemptsUpToMaxBatchWrite() { + AnonInfluxDBImpl influxDb = getAnonInfluxDB(); +// BatchProcessor subject = BatchProcessor.builder(influxDb) +// .interval(1, TimeUnit.DAYS) +// .capacityAndActions(3, 3) +// .maxBatchWriteSize(2) +// .discardOnFailedWrite(false) +// .build(); + BatchProcessor subject = new NonScheduledWriteBatchProcessor(influxDb, 3, TimeUnit.DAYS, 1, 3, + BufferFailBehaviour.THROW_EXCEPTION, false, 2); + + + subject.put(ANON_DB, ANON_RETENTION, ANON_CONSISTENCY, getPoint("measure1")); + subject.put(ANON_DB, ANON_RETENTION, ANON_CONSISTENCY, getPoint("measure2")); + subject.put(ANON_DB, ANON_RETENTION, ANON_CONSISTENCY, getPoint("measure3")); + + Assert.assertEquals(influxDb.writeCalled, 1); + Assert.assertEquals(subject.queue.size(), 1); + Assert.assertEquals(subject.queue.peek().getPoint().getMeasurement(), "measure3"); + } + + @Test + public void testGetBufferedCountWorksInTheMiddleOfAWrite() { + QueueDepthRecordingDBImpl influxDb = getQueueDepthRecordingDBImpl(); + int capacity = 5; + int flushActions = 5; + int flushIntervalMin = 5; + int flushIntervalMax = 2* flushIntervalMin; + boolean discardOnFailedWrite = false; + int maxBatchWriteSize = 5; + influxDb.enableBatch(capacity, flushActions, flushIntervalMin, flushIntervalMax, + TimeUnit.SECONDS, BufferFailBehaviour.THROW_EXCEPTION, discardOnFailedWrite, maxBatchWriteSize); + BatchProcessor subject = influxDb.getBatchProcessor(); + + subject.put(ANON_DB, ANON_RETENTION, ANON_CONSISTENCY, getPoint("measure1")); + subject.put(ANON_DB, ANON_RETENTION, ANON_CONSISTENCY, getPoint("measure2")); + subject.put(ANON_DB, ANON_RETENTION, ANON_CONSISTENCY, getPoint("measure3")); + Assert.assertEquals(subject.queue.size(), 3); + Assert.assertEquals(subject.getBufferedCount(), 3); + + subject.write(); + Assert.assertEquals(influxDb.writeCalled, 1); + Assert.assertEquals(influxDb.getQueueDepth(), 3); + Assert.assertEquals(subject.queue.size(), 0); + Assert.assertEquals(subject.getBufferedCount(), 0); + } }