Skip to content

Commit

Permalink
fix issue #445 : #445
Browse files Browse the repository at this point in the history
  • Loading branch information
lxhoan committed Jul 26, 2018
1 parent a4a77da commit c557891
Show file tree
Hide file tree
Showing 2 changed files with 85 additions and 21 deletions.
12 changes: 7 additions & 5 deletions src/main/java/org/influxdb/impl/InfluxDBImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -123,12 +123,13 @@ public InfluxDBImpl(final String url, final String username, final String passwo
setLogLevel(LOG_LEVEL);

this.gzipRequestInterceptor = new GzipRequestInterceptor();
client.addInterceptor(loggingInterceptor).addInterceptor(gzipRequestInterceptor);
OkHttpClient.Builder clonedBuilder = client.build().newBuilder();
clonedBuilder.addInterceptor(loggingInterceptor).addInterceptor(gzipRequestInterceptor);

Factory converterFactory = null;
switch (responseFormat) {
case MSGPACK:
client.addInterceptor(chain -> {
clonedBuilder.addInterceptor(chain -> {
Request request = chain.request().newBuilder().addHeader("Accept", APPLICATION_MSGPACK)
.addHeader("Accept-Encoding", "identity").build();
return chain.proceed(request);
Expand All @@ -147,8 +148,8 @@ public InfluxDBImpl(final String url, final String username, final String passwo
break;
}

this.retrofit = new Retrofit.Builder().baseUrl(url).client(client.build()).addConverterFactory(converterFactory)
.build();
this.retrofit = new Retrofit.Builder().baseUrl(url).client(
clonedBuilder.build()).addConverterFactory(converterFactory).build();
this.influxDBService = this.retrofit.create(InfluxDBService.class);

}
Expand All @@ -171,8 +172,9 @@ public InfluxDBImpl(final String url, final String username, final String passwo
setLogLevel(LOG_LEVEL);

this.gzipRequestInterceptor = new GzipRequestInterceptor();
OkHttpClient.Builder clonedBuilder = client.build().newBuilder();
this.retrofit = new Retrofit.Builder().baseUrl(url)
.client(client.addInterceptor(loggingInterceptor).addInterceptor(gzipRequestInterceptor).build())
.client(clonedBuilder.addInterceptor(loggingInterceptor).addInterceptor(gzipRequestInterceptor).build())
.addConverterFactory(MoshiConverterFactory.create()).build();
this.influxDBService = influxDBService;

Expand Down
94 changes: 78 additions & 16 deletions src/test/java/org/influxdb/InfluxDBTest.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,23 @@
package org.influxdb;

import java.io.IOException;
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

import org.influxdb.InfluxDB.LogLevel;
import org.influxdb.InfluxDB.ResponseFormat;
import org.influxdb.dto.BatchPoints;
Expand All @@ -18,21 +36,7 @@
import org.junit.platform.runner.JUnitPlatform;
import org.junit.runner.RunWith;

import java.io.IOException;
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import okhttp3.OkHttpClient;

/**
* Test the InfluxDB API.
Expand Down Expand Up @@ -891,5 +895,63 @@ public void testMessagePackOnOldDbVersion() {
influxDB.describeDatabases();
});
}


/**
* test for issue #445
* make sure reusing of OkHttpClient.Builder causes no error
* @throws InterruptedException
*/
@Test
public void testIssue445() throws InterruptedException {
ExecutorService executor = Executors.newFixedThreadPool(100);

final int maxCallables = 10_000;
List<Callable<String>> callableList = new ArrayList<>(maxCallables);
for (int i = 0; i < maxCallables; i++) {
callableList.add(new Callable<String>() {
@Override
public String call() throws Exception {
MyInfluxDBBean myBean = new MyInfluxDBBean();
return myBean.connectAndDoNothing1();
}
});
}
System.out.println("Invoking all callableList (size()=" + callableList.size() + ")");
executor.invokeAll(callableList);
System.out.println("Shutting down...");
executor.shutdown();
System.out.println("Shutdown requested and waiting for termination...");
if (!executor.awaitTermination(20, TimeUnit.SECONDS)) {
executor.shutdownNow();
}
Assertions.assertTrue(MyInfluxDBBean.OK);
//assert that MyInfluxDBBean.OKHTTP_BUILDER stays untouched (no interceptor added)
Assertions.assertTrue(MyInfluxDBBean.OKHTTP_BUILDER.interceptors().isEmpty());
}

private static final class MyInfluxDBBean {

static final OkHttpClient.Builder OKHTTP_BUILDER = new OkHttpClient.Builder();
static volatile Boolean OK = true;

InfluxDB influxClient;

String connectAndDoNothing1() {
if (!OK) {
return null;
}
try {
influxClient = InfluxDBFactory.connect("http://127.0.0.1:8086", "admin", "admin", OKHTTP_BUILDER);
influxClient.close();
} catch (Exception e) {
synchronized (OK) {
if (OK) {
OK = false;
}
}

}
return null;
}
}
}

0 comments on commit c557891

Please sign in to comment.