Skip to content

Commit

Permalink
Always use block/blockLast/verify with timeout (#3238)
Browse files Browse the repository at this point in the history
  • Loading branch information
violetagg authored May 13, 2024
1 parent ce2b98a commit d48cc15
Show file tree
Hide file tree
Showing 25 changed files with 175 additions and 147 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2022 VMware, Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2022-2024 VMware, Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -70,7 +70,7 @@ void testContextPropagation(TcpClient client) {
.wiretap(true)
.connect()
.contextWrite(ctx -> ctx.putAllMap((HashMap<Object, Object>) ContextSnapshot.captureAll(registry)))
.block();
.block(Duration.ofSeconds(5));

assertThat(connection).isNotNull();

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2017-2023 VMware, Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2017-2024 VMware, Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -131,7 +131,7 @@ public NettyOutbound withConnection(Consumer<? super Connection> withConnection)
ChannelFuture f = channel.writeOneOutbound(1);

outbound.sendFile(Paths.get(getClass().getResource("/largeFile.txt").toURI()))
.then().block();
.then().block(Duration.ofSeconds(5));

assertThat(channel.inboundMessages()).isEmpty();
assertThat(channel.outboundMessages()).hasSize(2);
Expand Down Expand Up @@ -313,7 +313,7 @@ public NettyOutbound withConnection(Consumer<? super Connection> withConnection)

ChannelFuture f = channel.writeOneOutbound(1);
outbound.sendFileChunked(path, 0, Files.size(path))
.then().block();
.then().block(Duration.ofSeconds(5));

assertThat(channel.inboundMessages()).isEmpty();
assertThat(messageWritten).containsExactly(Integer.class, ChunkedNioFile.class);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2019-2023 VMware, Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2019-2024 VMware, Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -16,6 +16,7 @@
package reactor.netty.channel;

import java.lang.ref.WeakReference;
import java.time.Duration;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.IdentityHashMap;
Expand Down Expand Up @@ -156,7 +157,7 @@ void cleanupCancelCloseFuture(boolean flushOnEach) {
.consumeSubscriptionWith(s -> _w.add(new WeakReference<>(s)))
.then(channel::runPendingTasks)
.thenCancel()
.verify();
.verify(Duration.ofSeconds(5));

System.gc();
wait(_w.get(0));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1419,7 +1419,7 @@ private void doTestSharedNameResolver(TcpClient client, boolean sharedClient) th
finally {
disposableServer.disposeNow();
loop.disposeLater()
.block();
.block(Duration.ofSeconds(5));
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2017-2021 VMware, Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2017-2024 VMware, Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -101,7 +101,7 @@ void shutdownLaterDefers() {
TcpResources.disposeLoopsAndConnectionsLater();
assertThat(newTcpResources.isDisposed()).isFalse();

TcpResources.disposeLoopsAndConnectionsLater().block();
TcpResources.disposeLoopsAndConnectionsLater().block(Duration.ofSeconds(5));
assertThat(newTcpResources.isDisposed()).as("disposeLoopsAndConnectionsLater completion").isTrue();

assertThat(TcpResources.tcpResources.get()).isNull();
Expand Down Expand Up @@ -140,7 +140,7 @@ void blockShouldFail() throws InterruptedException {
try {
out.sendString(Flux.just("Hello World!"))
.then()
.block();
.block(Duration.ofSeconds(5));
}
catch (RuntimeException e) {
latch.countDown();
Expand Down Expand Up @@ -170,7 +170,7 @@ void testIssue1227() {

TcpResources current = TcpResources.tcpResources.get();
TcpResources.disposeLoopsAndConnectionsLater()
.block();
.block(Duration.ofSeconds(5));
assertThat(current.isDisposed()).isTrue();
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2020-2022 VMware, Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2020-2024 VMware, Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -23,6 +23,7 @@
import reactor.test.StepVerifier;

import java.net.UnknownHostException;
import java.time.Duration;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

Expand Down Expand Up @@ -51,7 +52,7 @@ void shouldCallHooksInSuccessScenario() {
.doAfterResolve((conn, socketAddress) -> doAfterResolve.set(conn.channel().attr(TRACE_ID_KEY).get()))
.doOnResolveError((conn, th) -> doOnResolveError.set(conn.channel().attr(TRACE_ID_KEY).get()))
.connect()
.block();
.block(Duration.ofSeconds(5));

assertThat(doOnResolve).hasValue(TRACE_ID_VALUE);
assertThat(doAfterResolve).hasValue(TRACE_ID_VALUE);
Expand Down Expand Up @@ -81,7 +82,8 @@ void shouldCallHooksInErrorScenario() {
})
.connect()
.as(StepVerifier::create)
.verifyError(UnknownHostException.class);
.expectError(UnknownHostException.class)
.verify(Duration.ofSeconds(5));

assertThat(doOnResolve).hasValue(TRACE_ID_VALUE);
assertThat(doAfterResolve).hasValue(0);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2023 VMware, Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2023-2024 VMware, Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -22,6 +22,7 @@
import reactor.netty.tcp.TcpClientConfig;

import java.net.InetSocketAddress;
import java.time.Duration;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
Expand All @@ -36,13 +37,13 @@ void bind_whenBindException_thenChannelIsUnregistered() {
transportConfig,
new RecordingChannelInitializer(),
new InetSocketAddress("localhost", 0),
false).block();
false).block(Duration.ofSeconds(5));
final RecordingChannelInitializer channelInitializer = new RecordingChannelInitializer();
assertThatThrownBy(() -> TransportConnector.bind(
transportConfig,
channelInitializer,
new InetSocketAddress("localhost", ((InetSocketAddress) channel1.localAddress()).getPort()),
false).block());
false).block(Duration.ofSeconds(5)));
final Channel channel2 = channelInitializer.channel;
assertThat(channel1.isRegistered()).isTrue();
assertThat(channel2.isRegistered()).isFalse();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2020-2023 VMware, Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2020-2024 VMware, Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -18,6 +18,8 @@
import org.junit.jupiter.api.Test;
import reactor.netty.resources.LoopResources;

import java.time.Duration;

import static org.assertj.core.api.Assertions.assertThat;

/**
Expand All @@ -39,7 +41,7 @@ void testIssue1227() {

UdpResources current = UdpResources.udpResources.get();
UdpResources.shutdownLater()
.block();
.block(Duration.ofSeconds(5));
assertThat(current.isDisposed()).isTrue();
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2021-2023 VMware, Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2021-2024 VMware, Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -115,7 +115,7 @@ private void doTestClientCreatedWithMetricsDoesntLoadGauge(HttpServer server, Ht
.responseContent()
.aggregate()
.asString()
.block()
.block(Duration.ofSeconds(5))
).doesNotThrowAnyException();

//we still assert that the custom recorder did receive events, since it is not based on micrometer
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2017-2022 VMware, Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2017-2024 VMware, Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -153,7 +153,7 @@ void testChannelInactiveThrowsIOException() throws Exception {

StepVerifier.create(response.log())
.expectError(IOException.class)
.verify();
.verify(Duration.ofSeconds(5));

abortServer.close();

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2020-2023 VMware, Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2020-2024 VMware, Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -200,7 +200,7 @@ void doTestMaxActiveStreams_1_CustomPool(@Nullable BiFunction<Runnable, Duration
ConnectionProvider provider = builder.build();
doTestMaxActiveStreams(HttpClient.create(provider), 1, 1, 1);
provider.disposeLater()
.block();
.block(Duration.ofSeconds(5));
}

@Test
Expand All @@ -213,7 +213,7 @@ void testMaxActiveStreams_2_CustomPool() throws Exception {
ConnectionProvider provider = ConnectionProvider.create("testMaxActiveStreams_2", 1);
doTestMaxActiveStreams(HttpClient.create(provider), 2, 2, 0);
provider.disposeLater()
.block();
.block(Duration.ofSeconds(5));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2011-2023 VMware, Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2011-2024 VMware, Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -50,6 +50,7 @@ void test() {
.asString(StandardCharsets.UTF_8)
.collectList())
.expectNextMatches(List::isEmpty)
.verifyComplete();
.expectComplete()
.verify(Duration.ofSeconds(5));
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2020-2023 VMware, Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2020-2024 VMware, Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -650,7 +650,8 @@ void testIdleTimeoutAddedCorrectly(HttpServer server, HttpClient client) {
.asString()
.timeout(Duration.ofSeconds(10)))
.expectNext("Hello world!")
.verifyComplete();
.expectComplete()
.verify(Duration.ofSeconds(5));

try {
// Wait till all logs are flushed
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2017-2021 VMware, Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2017-2024 VMware, Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -97,7 +97,7 @@ void shutdownLaterDefers() {
HttpResources.disposeLoopsAndConnectionsLater();
assertThat(newHttpResources.isDisposed()).isFalse();

HttpResources.disposeLoopsAndConnectionsLater().block();
HttpResources.disposeLoopsAndConnectionsLater().block(Duration.ofSeconds(5));
assertThat(newHttpResources.isDisposed()).as("disposeLoopsAndConnectionsLater completion").isTrue();

assertThat(HttpResources.httpResources.get()).isNull();
Expand All @@ -123,7 +123,7 @@ void testIssue1227() {

HttpResources current = HttpResources.httpResources.get();
HttpResources.disposeLoopsAndConnectionsLater()
.block();
.block(Duration.ofSeconds(5));
assertThat(current.isDisposed()).isTrue();
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2017-2023 VMware, Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2017-2024 VMware, Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -66,7 +66,8 @@ void httpStatusCode404IsHandledByTheClient() {

StepVerifier.create(content)
.expectNext(404)
.verifyComplete();
.expectComplete()
.verify(Duration.ofSeconds(5));
}

@ParameterizedTest
Expand Down
Loading

0 comments on commit d48cc15

Please sign in to comment.