Skip to content

Commit

Permalink
Ensure ConnectionProvider metrics are disposed unconditionally when g…
Browse files Browse the repository at this point in the history
…raceful shutdown (#3235)

Dispose metrics not only when onErrorResume, but also on a happy path.

Fixes #3226
  • Loading branch information
violetagg authored May 8, 2024
1 parent ff9b096 commit 066a645
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -186,28 +186,14 @@ public final Mono<Void> disposeLater() {
if (pool instanceof GracefulShutdownInstrumentedPool) {
return ((GracefulShutdownInstrumentedPool<T>) pool)
.disposeGracefully(disposeTimeout)
.then(deRegisterDefaultMetrics(id, poolFactory.registrar, remoteAddress))
.onErrorResume(t -> {
log.error("Connection pool for [{}] didn't shut down gracefully", e.getKey(), t);
return Mono.fromRunnable(() -> {
if (poolFactory.registrar != null) {
poolFactory.registrar.get().deRegisterMetrics(name, id, remoteAddress);
}
else if (Metrics.isInstrumentationAvailable()) {
deRegisterDefaultMetrics(id, remoteAddress);
}
});
return deRegisterDefaultMetrics(id, poolFactory.registrar, remoteAddress);
});
}
return pool.disposeLater().then(
Mono.<Void>fromRunnable(() -> {
if (poolFactory.registrar != null) {
poolFactory.registrar.get().deRegisterMetrics(name, id, remoteAddress);
}
else if (Metrics.isInstrumentationAvailable()) {
deRegisterDefaultMetrics(id, remoteAddress);
}
})
);
return pool.disposeLater()
.then(deRegisterDefaultMetrics(id, poolFactory.registrar, remoteAddress));
})
.collect(Collectors.toList());
if (pools.isEmpty()) {
Expand Down Expand Up @@ -314,6 +300,17 @@ protected void deRegisterDefaultMetrics(String id, SocketAddress remoteAddress)
MicrometerPooledConnectionProviderMeterRegistrar.INSTANCE.deRegisterMetrics(name, id, remoteAddress);
}

Mono<Void> deRegisterDefaultMetrics(String id, @Nullable Supplier<? extends MeterRegistrar> registrar, SocketAddress remoteAddress) {
return Mono.fromRunnable(() -> {
if (registrar != null) {
registrar.get().deRegisterMetrics(name, id, remoteAddress);
}
else if (Metrics.isInstrumentationAvailable()) {
deRegisterDefaultMetrics(id, remoteAddress);
}
});
}

final boolean compareAddresses(SocketAddress origin, SocketAddress target) {
if (origin.equals(target)) {
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.netty.BaseHttpTest;
Expand Down Expand Up @@ -226,8 +228,9 @@ private void doTest(HttpServer server, HttpClient client, String poolName, boole
assertGauge(registry, CONNECTION_PROVIDER_PREFIX + MAX_PENDING_CONNECTIONS, NAME, poolName).hasValueEqualTo(expectedMaxPendingAcquire);
}

@Test
void testConnectionPoolPendingAcquireSize() throws Exception {
@ParameterizedTest
@ValueSource(longs = {0, 50, 500})
void testConnectionPoolPendingAcquireSize(long disposeTimeoutMillis) throws Exception {
Http2SslContextSpec serverCtx = Http2SslContextSpec.forServer(ssc.certificate(), ssc.privateKey());
Http2SslContextSpec clientCtx =
Http2SslContextSpec.forClient()
Expand All @@ -244,13 +247,21 @@ void testConnectionPoolPendingAcquireSize() throws Exception {
.delayElements(Duration.ofMillis(4))))
.bindNow();

ConnectionProvider provider = ConnectionProvider
ConnectionProvider.Builder builder = ConnectionProvider
.builder("testConnectionPoolPendingAcquireSize")
.pendingAcquireMaxCount(1000)
.maxConnections(500)
.metrics(true)
.build();
.metrics(true);
ConnectionProvider provider = disposeTimeoutMillis == 0 ? builder.build() :
builder.disposeTimeout(Duration.ofMillis(disposeTimeoutMillis)).build();

CountDownLatch meterRemoved = new CountDownLatch(1);
String name = CONNECTION_PROVIDER_PREFIX + PENDING_STREAMS;
registry.config().onMeterRemoved(meter -> {
if (name.equals(meter.getId().getName())) {
meterRemoved.countDown();
}
});
try {
CountDownLatch latch = new CountDownLatch(1);
AtomicInteger counter = new AtomicInteger();
Expand Down Expand Up @@ -290,14 +301,17 @@ void testConnectionPoolPendingAcquireSize() throws Exception {

assertThat(latch.await(30, TimeUnit.SECONDS)).isTrue();

assertGauge(registry, CONNECTION_PROVIDER_PREFIX + PENDING_STREAMS, NAME, "http2.testConnectionPoolPendingAcquireSize").hasValueEqualTo(0);
assertGauge(registry, name, NAME, "http2.testConnectionPoolPendingAcquireSize").hasValueEqualTo(0);
}
finally {
provider.disposeLater()
.block(Duration.ofSeconds(30));
}

assertThat(meterRemoved.await(30, TimeUnit.SECONDS)).isTrue();

// deRegistered
assertGauge(registry, CONNECTION_PROVIDER_PREFIX + PENDING_STREAMS, NAME, "http2.testConnectionPoolPendingAcquireSize").isNull();
assertGauge(registry, name, NAME, "http2.testConnectionPoolPendingAcquireSize").isNull();
}

private double getGaugeValue(String gaugeName, String poolName) {
Expand Down

0 comments on commit 066a645

Please sign in to comment.