Skip to content

Commit

Permalink
test: common: Rework InterruptIssue158Test
Browse files Browse the repository at this point in the history
Make test compile/run on Java 8 and newer.

Change assumptions around exceptions and interrupt state:
- Change expectation of ClosedByInterruptException to
  AsynchronousCloseException (its superclass).
- Only expect Thread#interrupted on actual ClosedByInterruptException.
- Use temporary AFUNIXSocketAddress instead of hardcoded one to prevent
  flaky results.

#158
  • Loading branch information
kohlschuetter committed Jun 30, 2024
1 parent e7508d7 commit b70c33c
Showing 1 changed file with 43 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,8 @@

import java.io.IOException;
import java.net.SocketException;
import java.net.StandardProtocolFamily;
import java.net.UnixDomainSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousCloseException;
import java.nio.channels.ClosedByInterruptException;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.ServerSocketChannel;
Expand All @@ -35,6 +34,7 @@
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;
Expand All @@ -48,6 +48,7 @@
import org.newsclub.net.unix.AFUNIXSocket;
import org.newsclub.net.unix.AFUNIXSocketAddress;
import org.newsclub.net.unix.AFUNIXSocketChannel;
import org.newsclub.net.unix.ThreadUtil;

/**
* Test interrupt-related behavior, as discussed in
Expand All @@ -56,39 +57,41 @@
* @author https://github.com/cenodis
* @author Christian Kohlschütter
*/
@SuppressWarnings("PMD")
public class InterruptIssue158Test {

private static final Path SOCKET_PATH = Path.of("/", "tmp", "test_socket");
private static final Path SOCKET_PATH;
private static final AFUNIXSocketAddress SOCKET_ADDR;

static {
try {
SOCKET_ADDR = AFUNIXSocketAddress.of(SOCKET_PATH);
} catch (SocketException e) {
SOCKET_ADDR = AFUNIXSocketAddress.ofNewTempFile();
SOCKET_PATH = SOCKET_ADDR.getFile().toPath();
} catch (IOException e) {
throw new RuntimeException(e);
}
}

private static List<Arguments> clientProvider() {
return List.of(socket(false, AFUNIXSocket::newInstance, s -> s.connect(SOCKET_ADDR),
return Arrays.asList(socket(false, AFUNIXSocket::newInstance, s -> s.connect(SOCKET_ADDR),
SocketException.class, AFUNIXSocket::isClosed), socket(true, () -> AFUNIXSocket.connectTo(
SOCKET_ADDR), s -> s.getInputStream().read(), SocketException.class,
AFUNIXSocket::isClosed), socket(true, () -> AFUNIXSocket.connectTo(SOCKET_ADDR), s -> s
.getOutputStream().write(10), SocketException.class, AFUNIXSocket::isClosed),

socket(false, AFUNIXSocketChannel::open, s -> s.connect(SOCKET_ADDR),
ClosedByInterruptException.class, s -> !s.isOpen()), socket(true,
AsynchronousCloseException.class, s -> !s.isOpen()), socket(true,
InterruptIssue158Test::connectSocketChannel, s -> s.read(ByteBuffer.allocate(1)),
ClosedByInterruptException.class, s -> !s.isOpen()), socket(true,
AsynchronousCloseException.class, s -> !s.isOpen()), socket(true,
InterruptIssue158Test::connectSocketChannel, s -> s.write(ByteBuffer.allocate(
1)), ClosedByInterruptException.class, s -> !s.isOpen()));
1)), AsynchronousCloseException.class, s -> !s.isOpen()));
}

private static List<Arguments> serverProvider() {
return List.of(serverSocket(() -> AFUNIXServerSocket.bindOn(SOCKET_ADDR),
return Arrays.asList(serverSocket(() -> AFUNIXServerSocket.bindOn(SOCKET_ADDR),
AFUNIXServerSocket::accept, SocketException.class, AFUNIXServerSocket::isClosed),
serverSocket(InterruptIssue158Test::bindServerSocketChannel,
AFUNIXServerSocketChannel::accept, ClosedByInterruptException.class, s -> !s.isOpen()));
AFUNIXServerSocketChannel::accept, AsynchronousCloseException.class, s -> !s.isOpen()));
}

@ParameterizedTest
Expand Down Expand Up @@ -136,46 +139,48 @@ <T extends AutoCloseable> void testServerInterruptionWithDelay(IOSupplier<T> soc
<T extends AutoCloseable> void testSocketInterruption(boolean delay, IOSupplier<T> socket,
IOConsumer<T> blockingOp, Class<?> expectedException, Predicate<T> closeCheck)
throws Throwable {
var exceptionHolder = new AtomicReference<Throwable>();
var ready = new CountDownLatch(1);
var t = Thread.ofVirtual().start(() -> exceptionHolder.set(runOperation(ready, socket,
blockingOp, expectedException, closeCheck)));
AtomicReference<Throwable> exceptionHolder = new AtomicReference<>();
CountDownLatch ready = new CountDownLatch(1);
Thread t = ThreadUtil.startNewDaemonThread(true, () -> exceptionHolder.set(runOperation(ready,
socket, blockingOp, expectedException, closeCheck)));

ready.await();
if (delay) {
Thread.sleep(500);
}
t.interrupt();
if (!t.join(Duration.of(1, ChronoUnit.SECONDS))) {
t.join(Duration.of(1, ChronoUnit.SECONDS).toMillis());
if (t.isAlive()) {
throw new RuntimeException("Thread failed to terminate after interrupt");
}
var thrownException = exceptionHolder.get();
Throwable thrownException = exceptionHolder.get();
if (thrownException != null) {
throw thrownException;
}
}

private static void withServer(boolean acceptConnections, ThrowingRunnable func)
throws Throwable {
try (var serverSocket = ServerSocketChannel.open(StandardProtocolFamily.UNIX)) {
serverSocket.bind(UnixDomainSocketAddress.of(SOCKET_PATH));
try (ServerSocketChannel serverSocket = AFUNIXServerSocketChannel.open()) {
serverSocket.bind(SOCKET_ADDR);
Thread serverThread = null;
if (acceptConnections) {
serverThread = Thread.ofPlatform().daemon(true).start(() -> {
var clients = new ArrayList<SocketChannel>();
serverThread = ThreadUtil.startNewDaemonThread(false, () -> {
List<SocketChannel> clients = new ArrayList<>();
while (serverSocket.isOpen()) {
try {
var socket = serverSocket.accept();
SocketChannel socket = serverSocket.accept();
clients.add(socket);
} catch (ClosedChannelException e) {
return;
} catch (IOException e) {
throw new RuntimeException("Unable to accept socket ", e);
} finally {
for (var client : clients) {
for (SocketChannel client : clients) {
try {
client.close();
} catch (IOException ignored) {
// ignored
}
}
}
Expand All @@ -198,14 +203,24 @@ private static void withServer(boolean acceptConnections, ThrowingRunnable func)
<T extends AutoCloseable> Throwable runOperation(CountDownLatch ready, IOSupplier<T> socket,
IOConsumer<T> blockingOp, Class<?> expectedException, Predicate<T> closeCheck) {
try {
var sock = socket.get();
@SuppressWarnings("resource")
T sock = socket.get();
ready.countDown();
try {
blockingOp.accept(sock);
} catch (Exception e) {
// These tests usually expect the "Thread interrupted" state to be set.
// However, when we accept any SocketException to be thrown, that state is not
// deterministic.
// Also, when we expect any kind of AsynchronousCloseException, it is only expected to be
// set when the actual exception thrown is from the ClosedByInterruptException subclass.
boolean ignoreInterruptState = SocketException.class.equals(expectedException);
boolean interruptStateOK = Thread.interrupted() || (AsynchronousCloseException.class.equals(
expectedException) && !(e instanceof ClosedByInterruptException));

assertAll(() -> assertInstanceOf(expectedException, e, "Socket exception"),
() -> assertTrue(Thread.interrupted(), "Thread interrupted"), () -> assertTrue(
closeCheck.test(sock), "Socket closed"));
() -> assertTrue(ignoreInterruptState || interruptStateOK, "Thread interrupted"),
() -> assertTrue(closeCheck.test(sock), "Socket closed"));
} finally {
ready.countDown();
if (sock != null) {
Expand Down Expand Up @@ -233,13 +248,13 @@ private static <T> Arguments serverSocket(IOSupplier<T> socket, IOConsumer<T> bl
}

private static AFUNIXSocketChannel connectSocketChannel() throws IOException {
var socket = AFUNIXSocketChannel.open();
AFUNIXSocketChannel socket = AFUNIXSocketChannel.open();
socket.connect(SOCKET_ADDR);
return socket;
}

private static AFUNIXServerSocketChannel bindServerSocketChannel() throws IOException {
var socket = AFUNIXServerSocketChannel.open();
AFUNIXServerSocketChannel socket = AFUNIXServerSocketChannel.open();
socket.bind(SOCKET_ADDR);
return socket;
}
Expand Down

0 comments on commit b70c33c

Please sign in to comment.