Skip to content

Commit

Permalink
Merge pull request #239 from jiafu1115/patch-21
Browse files Browse the repository at this point in the history
prepare code changes to support UDP async
  • Loading branch information
majst01 authored Nov 12, 2016
2 parents 0b7a493 + 6355973 commit 359a9b6
Show file tree
Hide file tree
Showing 4 changed files with 49 additions and 26 deletions.
63 changes: 43 additions & 20 deletions src/main/java/org/influxdb/impl/BatchProcessor.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
public class BatchProcessor {

private static final Logger LOG = Logger.getLogger(BatchProcessor.class.getName());
protected final BlockingQueue<BatchEntry> queue = new LinkedBlockingQueue<>();
protected final BlockingQueue<AbstractBatchEntry> queue = new LinkedBlockingQueue<>();
private final ScheduledExecutorService scheduler;
final InfluxDBImpl influxDB;
final int actions;
Expand Down Expand Up @@ -107,22 +107,28 @@ public BatchProcessor build() {
}
}

static class BatchEntry {
private final Point point;
abstract static class AbstractBatchEntry {
private final Point point;

public AbstractBatchEntry(final Point point) {
this.point = point;
}

public Point getPoint() {
return this.point;
}
}

static class HttpBatchEntry extends AbstractBatchEntry {
private final String db;
private final String rp;

public BatchEntry(final Point point, final String db, final String rp) {
super();
this.point = point;
public HttpBatchEntry(final Point point, final String db, final String rp) {
super(point);
this.db = db;
this.rp = rp;
}

public Point getPoint() {
return this.point;
}

public String getDb() {
return this.db;
}
Expand All @@ -132,6 +138,19 @@ public String getRp() {
}
}

static class UdpBatchEntry extends AbstractBatchEntry {
private final int udpPort;

public UdpBatchEntry(final Point point, final int udpPort) {
super(point);
this.udpPort = udpPort;
}

public int getUdpPort() {
return this.udpPort;
}
}

/**
* Static method to create the Builder for this BatchProcessor.
*
Expand Down Expand Up @@ -168,18 +187,22 @@ void write() {
}

Map<String, BatchPoints> databaseToBatchPoints = Maps.newHashMap();
List<BatchEntry> batchEntries = new ArrayList<>(this.queue.size());
List<AbstractBatchEntry> batchEntries = new ArrayList<>(this.queue.size());
this.queue.drainTo(batchEntries);

for (BatchEntry batchEntry : batchEntries) {
String dbName = batchEntry.getDb();
if (!databaseToBatchPoints.containsKey(dbName)) {
BatchPoints batchPoints = BatchPoints.database(dbName)
.retentionPolicy(batchEntry.getRp()).build();
databaseToBatchPoints.put(dbName, batchPoints);
for (AbstractBatchEntry batchEntry : batchEntries) {
if (batchEntry instanceof HttpBatchEntry) {
HttpBatchEntry httpBatchEntry = HttpBatchEntry.class.cast(batchEntry);
String dbName = httpBatchEntry.getDb();
if (!databaseToBatchPoints.containsKey(dbName)) {
BatchPoints batchPoints = BatchPoints.database(dbName)
.retentionPolicy(httpBatchEntry.getRp()).build();
databaseToBatchPoints.put(dbName, batchPoints);
}
Point point = batchEntry.getPoint();
databaseToBatchPoints.get(dbName).point(point);
}
Point point = batchEntry.getPoint();
databaseToBatchPoints.get(dbName).point(point);

}

for (BatchPoints batchPoints : databaseToBatchPoints.values()) {
Expand All @@ -197,7 +220,7 @@ void write() {
* @param batchEntry
* the batchEntry to write to the cache.
*/
void put(final BatchEntry batchEntry) {
void put(final AbstractBatchEntry batchEntry) {
this.queue.add(batchEntry);
if (this.queue.size() >= this.actions) {
this.scheduler.submit(new Runnable() {
Expand Down
4 changes: 2 additions & 2 deletions src/main/java/org/influxdb/impl/InfluxDBImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
import org.influxdb.dto.Pong;
import org.influxdb.dto.Query;
import org.influxdb.dto.QueryResult;
import org.influxdb.impl.BatchProcessor.BatchEntry;
import org.influxdb.impl.BatchProcessor.HttpBatchEntry;
import okhttp3.Headers;
import okhttp3.HttpUrl;
import okhttp3.MediaType;
Expand Down Expand Up @@ -208,7 +208,7 @@ public String version() {
@Override
public void write(final String database, final String retentionPolicy, final Point point) {
if (this.batchEnabled.get()) {
BatchEntry batchEntry = new BatchEntry(point, database, retentionPolicy);
HttpBatchEntry batchEntry = new HttpBatchEntry(point, database, retentionPolicy);
this.batchProcessor.put(batchEntry);
} else {
BatchPoints batchPoints = BatchPoints.database(database)
Expand Down
4 changes: 2 additions & 2 deletions src/test/java/org/influxdb/InfluxDBTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -170,8 +170,8 @@ public void testWriteStringData() {
public void testWriteStringDataThroughUDP() {
String measurement = TestUtils.getRandomMeasurement();
this.influxDB.write(UDP_PORT, measurement + ",atag=test idle=90,usertime=9,system=1");
//write with UDP may be executed on server after query with HTTP. so sleep 1s to handle this case
Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
//write with UDP may be executed on server after query with HTTP. so sleep 2s to handle this case
Uninterruptibles.sleepUninterruptibly(2, TimeUnit.SECONDS);
Query query = new Query("SELECT * FROM " + measurement + " GROUP BY *", UDP_DATABASE);
QueryResult result = this.influxDB.query(query);
Assert.assertFalse(result.getResults().get(0).getSeries().get(0).getTags().isEmpty());
Expand Down
4 changes: 2 additions & 2 deletions src/test/java/org/influxdb/impl/BatchProcessorTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ public void testSchedulerExceptionHandling() throws InterruptedException, IOExce
doThrow(new RuntimeException()).when(mockInfluxDB).write(any(BatchPoints.class));

Point point = Point.measurement("cpu").field("6", "").build();
BatchProcessor.BatchEntry batchEntry1 = new BatchProcessor.BatchEntry(point, "db1", "");
BatchProcessor.BatchEntry batchEntry2 = new BatchProcessor.BatchEntry(point, "db2", "");
BatchProcessor.HttpBatchEntry batchEntry1 = new BatchProcessor.HttpBatchEntry(point, "db1", "");
BatchProcessor.HttpBatchEntry batchEntry2 = new BatchProcessor.HttpBatchEntry(point, "db2", "");

batchProcessor.put(batchEntry1);
Thread.sleep(200); // wait for scheduler
Expand Down

0 comments on commit 359a9b6

Please sign in to comment.