Skip to content

Commit

Permalink
refactor code for UDP batch support
Browse files Browse the repository at this point in the history
  • Loading branch information
jiafu1115 committed Nov 10, 2016
1 parent 739929d commit 2bfd299
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 24 deletions.
63 changes: 43 additions & 20 deletions src/main/java/org/influxdb/impl/BatchProcessor.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
public class BatchProcessor {

private static final Logger LOG = Logger.getLogger(BatchProcessor.class.getName());
protected final BlockingQueue<BatchEntry> queue = new LinkedBlockingQueue<>();
protected final BlockingQueue<AbstractBatchEntry> queue = new LinkedBlockingQueue<>();
private final ScheduledExecutorService scheduler;
final InfluxDBImpl influxDB;
final int actions;
Expand Down Expand Up @@ -107,22 +107,28 @@ public BatchProcessor build() {
}
}

static class BatchEntry {
private final Point point;
abstract static class AbstractBatchEntry {
private final Point point;

public AbstractBatchEntry(final Point point) {
this.point = point;
}

public Point getPoint() {
return this.point;
}
}

static class HttpBatchEntry extends AbstractBatchEntry {
private final String db;
private final String rp;

public BatchEntry(final Point point, final String db, final String rp) {
super();
this.point = point;
public HttpBatchEntry(final Point point, final String db, final String rp) {
super(point);
this.db = db;
this.rp = rp;
}

public Point getPoint() {
return this.point;
}

public String getDb() {
return this.db;
}
Expand All @@ -132,6 +138,19 @@ public String getRp() {
}
}

static class UdpBatchEntry extends AbstractBatchEntry {
private final int udpPort;

public UdpBatchEntry(final Point point, final int udpPort) {
super(point);
this.udpPort = udpPort;
}

public int getUdpPort() {
return this.udpPort;
}
}

/**
* Static method to create the Builder for this BatchProcessor.
*
Expand Down Expand Up @@ -168,18 +187,22 @@ void write() {
}

Map<String, BatchPoints> databaseToBatchPoints = Maps.newHashMap();
List<BatchEntry> batchEntries = new ArrayList<>(this.queue.size());
List<AbstractBatchEntry> batchEntries = new ArrayList<>(this.queue.size());
this.queue.drainTo(batchEntries);

for (BatchEntry batchEntry : batchEntries) {
String dbName = batchEntry.getDb();
if (!databaseToBatchPoints.containsKey(dbName)) {
BatchPoints batchPoints = BatchPoints.database(dbName)
.retentionPolicy(batchEntry.getRp()).build();
databaseToBatchPoints.put(dbName, batchPoints);
for (AbstractBatchEntry batchEntry : batchEntries) {
if (batchEntry instanceof HttpBatchEntry) {
HttpBatchEntry httpBatchEntry = HttpBatchEntry.class.cast(batchEntry);
String dbName = httpBatchEntry.getDb();
if (!databaseToBatchPoints.containsKey(dbName)) {
BatchPoints batchPoints = BatchPoints.database(dbName)
.retentionPolicy(httpBatchEntry.getRp()).build();
databaseToBatchPoints.put(dbName, batchPoints);
}
Point point = batchEntry.getPoint();
databaseToBatchPoints.get(dbName).point(point);
}
Point point = batchEntry.getPoint();
databaseToBatchPoints.get(dbName).point(point);

}

for (BatchPoints batchPoints : databaseToBatchPoints.values()) {
Expand All @@ -197,7 +220,7 @@ void write() {
* @param batchEntry
* the batchEntry to write to the cache.
*/
void put(final BatchEntry batchEntry) {
void put(final AbstractBatchEntry batchEntry) {
this.queue.add(batchEntry);
if (this.queue.size() >= this.actions) {
this.scheduler.submit(new Runnable() {
Expand Down
4 changes: 2 additions & 2 deletions src/main/java/org/influxdb/impl/InfluxDBImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
import org.influxdb.dto.Pong;
import org.influxdb.dto.Query;
import org.influxdb.dto.QueryResult;
import org.influxdb.impl.BatchProcessor.BatchEntry;
import org.influxdb.impl.BatchProcessor.HttpBatchEntry;
import okhttp3.Headers;
import okhttp3.HttpUrl;
import okhttp3.MediaType;
Expand Down Expand Up @@ -180,7 +180,7 @@ public String version() {
@Override
public void write(final String database, final String retentionPolicy, final Point point) {
if (this.batchEnabled.get()) {
BatchEntry batchEntry = new BatchEntry(point, database, retentionPolicy);
HttpBatchEntry batchEntry = new HttpBatchEntry(point, database, retentionPolicy);
this.batchProcessor.put(batchEntry);
} else {
BatchPoints batchPoints = BatchPoints.database(database)
Expand Down
4 changes: 2 additions & 2 deletions src/test/java/org/influxdb/impl/BatchProcessorTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ public void testSchedulerExceptionHandling() throws InterruptedException, IOExce
doThrow(new RuntimeException()).when(mockInfluxDB).write(any(BatchPoints.class));

Point point = Point.measurement("cpu").field("6", "").build();
BatchProcessor.BatchEntry batchEntry1 = new BatchProcessor.BatchEntry(point, "db1", "");
BatchProcessor.BatchEntry batchEntry2 = new BatchProcessor.BatchEntry(point, "db2", "");
BatchProcessor.HttpBatchEntry batchEntry1 = new BatchProcessor.HttpBatchEntry(point, "db1", "");
BatchProcessor.HttpBatchEntry batchEntry2 = new BatchProcessor.HttpBatchEntry(point, "db2", "");

batchProcessor.put(batchEntry1);
Thread.sleep(200); // wait for scheduler
Expand Down

0 comments on commit 2bfd299

Please sign in to comment.