Skip to content

Commit

Permalink
test: Rework InterruptIssue158Test
Browse files Browse the repository at this point in the history
Previously, the test may fail sporadically due to a race condition
between receiving a "thread interrupted" state on a live connection and
one that has been closed already (resulting in an
AsynchronousCloseException instead of a ClosedByInterruptException).

Make sure that the client connection is closed from the servce side only
after a delay that is significantly longer than any other delay in this
test class. Run that close() in a separate Thread to avoid unnecessarily
long delays.

Finally, set all expected exceptions back to ClosedByInterruptException,
as intended by the original author (cenodis).

#158
  • Loading branch information
kohlschuetter committed Jul 1, 2024
1 parent 919b063 commit 83b3e6a
Showing 1 changed file with 24 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import java.io.IOException;
import java.net.SocketException;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousCloseException;
import java.nio.channels.ClosedByInterruptException;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.ServerSocketChannel;
Expand All @@ -33,10 +32,12 @@
import java.nio.file.Path;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Predicate;

Expand Down Expand Up @@ -82,11 +83,11 @@ private static List<Arguments> clientProvider() {
socket(true, () -> AFUNIXSocket.connectTo(SOCKET_ADDR), s -> s.getOutputStream().write(10),
SocketException.class, AFUNIXSocket::isClosed), //
socket(false, AFUNIXSocketChannel::open, s -> s.connect(SOCKET_ADDR),
AsynchronousCloseException.class, s -> !s.isOpen()), //
ClosedByInterruptException.class, s -> !s.isOpen()), //
socket(true, InterruptIssue158Test::connectSocketChannel, s -> s.read(ByteBuffer.allocate(
1)), AsynchronousCloseException.class, s -> !s.isOpen()), //
1)), ClosedByInterruptException.class, s -> !s.isOpen()), //
socket(true, InterruptIssue158Test::connectSocketChannel, s -> s.write(ByteBuffer.allocate(
1)), AsynchronousCloseException.class, s -> !s.isOpen()) //
1)), ClosedByInterruptException.class, s -> !s.isOpen()) //
);
}

Expand All @@ -95,7 +96,7 @@ private static List<Arguments> serverProvider() {
serverSocket(() -> AFUNIXServerSocket.bindOn(SOCKET_ADDR), AFUNIXServerSocket::accept,
SocketException.class, AFUNIXServerSocket::isClosed), //
serverSocket(InterruptIssue158Test::bindServerSocketChannel,
AFUNIXServerSocketChannel::accept, AsynchronousCloseException.class, s -> !s.isOpen())//
AFUNIXServerSocketChannel::accept, ClosedByInterruptException.class, s -> !s.isOpen())//
);
}

Expand Down Expand Up @@ -166,27 +167,33 @@ <T extends AutoCloseable> void testSocketInterruption(boolean delay, IOSupplier<

private static void withServer(boolean acceptConnections, ThrowingRunnable func)
throws Throwable {

ScheduledExecutorService cleanup = Executors.newSingleThreadScheduledExecutor();

try (ServerSocketChannel serverSocket = AFUNIXServerSocketChannel.open()) {
serverSocket.bind(SOCKET_ADDR);
Thread serverThread = null;
if (acceptConnections) {
serverThread = ThreadUtil.startNewDaemonThread(false, () -> {
List<SocketChannel> clients = new ArrayList<>();
while (serverSocket.isOpen()) {
SocketChannel socket = null;
try {
SocketChannel socket = serverSocket.accept();
clients.add(socket);
socket = serverSocket.accept();
} catch (ClosedChannelException e) {
return;
} catch (IOException e) {
throw new RuntimeException("Unable to accept socket ", e);
} finally {
for (SocketChannel client : clients) {
try {
client.close();
} catch (IOException ignored) {
// ignored
}
if (socket != null) {
final SocketChannel socketToClose = socket;
cleanup.schedule(() -> {
try {
socketToClose.close();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}, 1, TimeUnit.SECONDS);
}
}
}
Expand Down Expand Up @@ -220,8 +227,8 @@ <T extends AutoCloseable> Throwable runOperation(CountDownLatch ready, IOSupplie
// Also, when we expect any kind of ClosedChannelException, it is only expected to be
// set when the actual exception thrown is from the ClosedByInterruptException subclass.
boolean ignoreInterruptState = SocketException.class.equals(expectedException);
boolean interruptStateOK = Thread.interrupted() || (AsynchronousCloseException.class.equals(
expectedException) && !(e instanceof ClosedByInterruptException));
boolean interruptStateOK = Thread.interrupted() || (ClosedChannelException.class
.isAssignableFrom(expectedException) && !(e instanceof ClosedByInterruptException));

assertAll(() -> assertInstanceOf(expectedException, e, "Socket exception"),
() -> assertTrue(ignoreInterruptState || interruptStateOK, "Thread interrupted"),
Expand Down

0 comments on commit 83b3e6a

Please sign in to comment.