Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Data Loss bugfix / Additional Features #177

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 7 additions & 8 deletions src/main/java/org/influxdb/dto/BatchPoints.java
Original file line number Diff line number Diff line change
@@ -1,15 +1,14 @@
package org.influxdb.dto;

import java.util.Arrays;
import java.util.List;
import java.util.Map;
import org.influxdb.InfluxDB.ConsistencyLevel;

import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Ordering;
import org.influxdb.InfluxDB.ConsistencyLevel;

import java.util.List;
import java.util.Map;

/**
* {Purpose of This Type}
Expand Down Expand Up @@ -97,11 +96,11 @@ public Builder point(final Point pointToAdd) {
/**
* Add a set of Points to this set of points.
*
* @param pointsToAdd
* @param pointList
* @return the Builder instance
*/
public Builder points(final Point... pointsToAdd) {
this.points.addAll(Arrays.asList(pointsToAdd));
public Builder points(List<Point> pointList) {
this.points.addAll(pointList);
return this;
}

Expand Down
68 changes: 52 additions & 16 deletions src/main/java/org/influxdb/dto/Point.java
Original file line number Diff line number Diff line change
@@ -1,20 +1,21 @@
package org.influxdb.dto;

import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.Maps;
import com.google.common.collect.Ordering;
import com.google.common.escape.Escaper;
import com.google.common.escape.Escapers;

import java.math.BigDecimal;
import java.math.BigInteger;
import java.text.NumberFormat;
import java.util.Locale;
import java.util.Map;
import java.util.Map.Entry;
import java.util.StringJoiner;
import java.util.concurrent.TimeUnit;

import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.Maps;
import com.google.common.collect.Ordering;
import com.google.common.escape.Escaper;
import com.google.common.escape.Escapers;

/**
* Representation of a InfluxDB database Point.
*
Expand Down Expand Up @@ -150,13 +151,43 @@ public Builder addField(String field, Number value) {
}

public Builder addField(final String field, final String value) {
if (value == null) {
throw new IllegalArgumentException("Field value cannot be null");
}

fields.put(field, value);
return this;
}


public Builder addField(final String field, final boolean value, final boolean activated) {
if (activated)
fields.put(field, value);
return this;
}

public Builder addField(final String field, final long value, final boolean activated) {
if (activated)
fields.put(field, value);
return this;
}

public Builder addField(final String field, final double value, final boolean activated) {
if (activated)
fields.put(field, value);
return this;
}

public Builder addField(String field, Number value, final boolean activated) {
if (activated)
fields.put(field, value);
return this;
}

public Builder addField(final String field, final String value, final boolean activated) {
if (activated)
fields.put(field, value);
return this;
}




/**
* Add a Map of fields to this point.
Expand Down Expand Up @@ -303,8 +334,8 @@ private StringBuilder concatenatedTags() {
return sb;
}

private StringBuilder concatenateFields() {
final StringBuilder sb = new StringBuilder();
private String concatenateFields() {
final StringJoiner joiner = new StringJoiner(",");
final int fieldCount = this.fields.size();
int loops = 0;

Expand All @@ -314,7 +345,7 @@ private StringBuilder concatenateFields() {
numberFormat.setMinimumFractionDigits(1);

for (Entry<String, Object> field : this.fields.entrySet()) {
loops++;
StringBuilder sb = new StringBuilder();
Object value = field.getValue();
if (value == null) {
continue;
Expand All @@ -323,6 +354,7 @@ private StringBuilder concatenateFields() {
sb.append(KEY_ESCAPER.escape(field.getKey())).append("=");
if (value instanceof String) {
String stringValue = (String) value;
stringValue = stringValue.replace("\\", "/");
sb.append("\"").append(FIELD_ESCAPER.escape(stringValue)).append("\"");
} else if (value instanceof Number) {
if (value instanceof Double || value instanceof Float || value instanceof BigDecimal) {
Expand All @@ -335,11 +367,15 @@ private StringBuilder concatenateFields() {
}

if (loops < fieldCount) {
sb.append(",");
joiner.add(sb.toString());
}
loops++;
}

return sb;
if (joiner.toString().length() == 0)
throw new IllegalArgumentException("There must be at least one field as an input that is not null");

return joiner.toString();
}

private StringBuilder formatedTime() {
Expand Down
26 changes: 12 additions & 14 deletions src/main/java/org/influxdb/impl/BatchProcessor.java
Original file line number Diff line number Diff line change
@@ -1,23 +1,18 @@
package org.influxdb.impl;

import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
import org.influxdb.InfluxDB;
import org.influxdb.dto.BatchPoints;
import org.influxdb.dto.Point;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.*;
import java.util.logging.Level;
import java.util.logging.Logger;

import org.influxdb.InfluxDB;
import org.influxdb.dto.BatchPoints;
import org.influxdb.dto.Point;

import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;

/**
* A BatchProcessor can be attached to a InfluxDB Instance to collect single point writes and
* aggregates them to BatchPoints to get a better write performance.
Expand All @@ -29,6 +24,7 @@ public class BatchProcessor {

private static final Logger logger = Logger.getLogger(BatchProcessor.class.getName());
protected final BlockingQueue<BatchEntry> queue = new LinkedBlockingQueue<>();
private final List<BatchEntry> batchEntries = new ArrayList<>();
private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
final InfluxDBImpl influxDB;
final int actions;
Expand Down Expand Up @@ -147,14 +143,15 @@ public void run() {

}

void write() {

synchronized void write() {
try {
batchEntries.clear();
if (this.queue.isEmpty()) {
return;
}

Map<String, BatchPoints> databaseToBatchPoints = Maps.newHashMap();
List<BatchEntry> batchEntries = new ArrayList<>(this.queue.size());
this.queue.drainTo(batchEntries);

for (BatchEntry batchEntry : batchEntries) {
Expand All @@ -170,6 +167,7 @@ void write() {
for (BatchPoints batchPoints : databaseToBatchPoints.values()) {
BatchProcessor.this.influxDB.write(batchPoints);
}

} catch (Throwable t) {
// any exception would stop the scheduler
logger.log(Level.SEVERE, "Batch could not be sent. Data will be lost", t);
Expand Down
22 changes: 8 additions & 14 deletions src/main/java/org/influxdb/impl/InfluxDBImpl.java
Original file line number Diff line number Diff line change
@@ -1,29 +1,23 @@
package org.influxdb.impl;

import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

import com.google.common.base.Joiner;
import org.influxdb.InfluxDB;
import org.influxdb.dto.BatchPoints;
import org.influxdb.dto.Point;
import org.influxdb.dto.Pong;
import org.influxdb.dto.Query;
import org.influxdb.dto.QueryResult;
import org.influxdb.impl.BatchProcessor.BatchEntry;

import com.google.common.base.Preconditions;
import com.google.common.base.Stopwatch;
import com.google.common.collect.Lists;

import org.influxdb.InfluxDB;
import org.influxdb.dto.*;
import org.influxdb.impl.BatchProcessor.BatchEntry;
import retrofit.RestAdapter;
import retrofit.client.Client;
import retrofit.client.Header;
import retrofit.client.Response;
import retrofit.mime.TypedString;

import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

/**
* Implementation of a InluxDB API.
*
Expand Down
38 changes: 34 additions & 4 deletions src/test/java/org/influxdb/dto/PointTest.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package org.influxdb.dto;

import static org.assertj.core.api.Assertions.assertThat;
import com.google.common.collect.Maps;
import org.testng.annotations.Test;

import java.math.BigDecimal;
import java.math.BigInteger;
Expand All @@ -9,9 +10,7 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

import org.testng.annotations.Test;

import com.google.common.collect.Maps;
import static org.assertj.core.api.Assertions.assertThat;

/**
* Test for the Point DTO.
Expand Down Expand Up @@ -187,6 +186,37 @@ public void testIgnoreNullPointerValue() {

assertThat(point.lineProtocol()).asString().isEqualTo("nulltest,foo=bar field1=\"value1\",field3=1.0 1");
}

@Test
public void testIgnoreFieldNullValues(){
Number val = null;
Point point;
// Line Protocol Example: cpu,host=server02,region=uswest value=3 1434055562000010000
point = Point.measurement("test").time(1, TimeUnit.NANOSECONDS).tag("tagKey", "abc").addField("a", 1.0).build();
assertThat(point.lineProtocol()).asString().isEqualTo("test,tagKey=abc a=1.0 1");

point = Point.measurement("test").time(1, TimeUnit.NANOSECONDS).tag("tagKey", "abc").addField("a", 1.0).addField("b", val).addField("c", val).build();
assertThat(point.lineProtocol()).asString().isEqualTo("test,tagKey=abc a=1.0 1");

point = Point.measurement("test").time(1, TimeUnit.NANOSECONDS).tag("tagKey", "abc").addField("a", val).addField("b", 1.0).build();
assertThat(point.lineProtocol()).asString().isEqualTo("test,tagKey=abc b=1.0 1");

point = Point.measurement("test").time(1, TimeUnit.NANOSECONDS).tag("tagKey", "abc").addField("a", val).addField("b", 1.0).addField("c", val).build();
assertThat(point.lineProtocol()).asString().isEqualTo("test,tagKey=abc b=1.0 1");

point = Point.measurement("test").time(1, TimeUnit.NANOSECONDS).tag("tagKey", "abc").addField("a", 1.0).addField("b", val).addField("c", 1.0).build();
assertThat(point.lineProtocol()).asString().isEqualTo("test,tagKey=abc a=1.0,c=1.0 1");

point = Point.measurement("test").time(1, TimeUnit.NANOSECONDS).tag("tagKey", "abc").addField("a", 1.0).addField("b", val).addField("d", val).addField("c", 1.0).build();
assertThat(point.lineProtocol()).asString().isEqualTo("test,tagKey=abc a=1.0,c=1.0 1");

//Special Case if all fields given have null values
/*
point = Point.measurement("test").time(1, TimeUnit.NANOSECONDS).tag("tagKey", "abc").addField("a", val).addField("b", val).addField("d", val).addField("c", val).build();
assertThat(point.lineProtocol()).asString().isEqualTo("test,tagKey=abc 1");
*/

}

/**
* Tests for issue #110
Expand Down
Loading