-
Notifications
You must be signed in to change notification settings - Fork 115
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
test: Refactor InterruptIssue158Test, add Java SDK support
Refactor the unit tests for issue 158 such that we can compare the behavior of junixsocket AFUNIXSocketChannel etc. and the Java 16+ JEP380 as well as regular Java inet versions. Add both tests to the selftest, but disable the JEP380/Inet versions by default (enable with -Dselftest.enable-module.junixsocket-common.JEP380=true -Dselftest.enable-module.junixsocket-common.JavaInet=true ) Add concise exception logging (enable with -Dselftest.issue.158.debug=true ). Lastly, move some JEP380-specific logic into its own class. #158
- Loading branch information
1 parent
78fdae6
commit 0bed969
Showing
8 changed files
with
558 additions
and
297 deletions.
There are no files selected for viewing
320 changes: 320 additions & 0 deletions
320
junixsocket-common/src/test/java/org/newsclub/net/unix/InterruptIssue158Test.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,320 @@ | ||
/* | ||
* junixsocket | ||
* | ||
* Copyright 2009-2024 Christian Kohlschütter | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
package org.newsclub.net.unix; | ||
|
||
import static org.junit.jupiter.api.Assertions.assertAll; | ||
import static org.junit.jupiter.api.Assertions.assertInstanceOf; | ||
import static org.junit.jupiter.api.Assertions.assertTrue; | ||
|
||
import java.io.IOException; | ||
import java.net.ServerSocket; | ||
import java.net.Socket; | ||
import java.net.SocketAddress; | ||
import java.net.SocketException; | ||
import java.nio.ByteBuffer; | ||
import java.nio.channels.ClosedByInterruptException; | ||
import java.nio.channels.ClosedChannelException; | ||
import java.nio.channels.ServerSocketChannel; | ||
import java.nio.channels.SocketChannel; | ||
import java.time.Duration; | ||
import java.time.temporal.ChronoUnit; | ||
import java.util.Arrays; | ||
import java.util.List; | ||
import java.util.concurrent.CompletableFuture; | ||
import java.util.concurrent.CountDownLatch; | ||
import java.util.concurrent.Semaphore; | ||
import java.util.concurrent.TimeUnit; | ||
import java.util.concurrent.atomic.AtomicReference; | ||
import java.util.function.Predicate; | ||
|
||
import org.junit.jupiter.api.AfterEach; | ||
import org.junit.jupiter.api.BeforeEach; | ||
import org.junit.jupiter.api.TestInfo; | ||
import org.junit.jupiter.api.TestInstance; | ||
import org.junit.jupiter.api.TestInstance.Lifecycle; | ||
import org.junit.jupiter.params.ParameterizedTest; | ||
import org.junit.jupiter.params.provider.Arguments; | ||
import org.junit.jupiter.params.provider.MethodSource; | ||
|
||
import com.kohlschutter.testutil.TestAbortedNotAnIssueException; | ||
import com.kohlschutter.util.SystemPropertyUtil; | ||
|
||
/** | ||
* Test interrupt-related behavior, as discussed in | ||
* <a href="https://github.com/kohlschutter/junixsocket/issues/158">issue 158</a>. | ||
* | ||
* @author https://github.com/cenodis | ||
* @author Christian Kohlschütter | ||
*/ | ||
@SuppressWarnings({"PMD", "exports"}) | ||
@TestInstance(Lifecycle.PER_CLASS) | ||
public abstract class InterruptIssue158Test<A extends SocketAddress> extends SocketTestBase<A> { | ||
// enable for additional debugging to System.out | ||
private static boolean DEBUG = SystemPropertyUtil.getBooleanSystemProperty( | ||
"selftest.issue.158.debug", false); | ||
private static boolean DEBUG_VERBOSE = (System.getProperty("com.kohlschutter.selftest") == null) | ||
&& SystemPropertyUtil.getBooleanSystemProperty("selftest.issue.158.debug.verbose", true); | ||
|
||
private final A address; | ||
private TestInfo testInfo; | ||
|
||
@SuppressWarnings("unchecked") | ||
protected InterruptIssue158Test(AddressSpecifics<A> asp) { | ||
super(asp); | ||
|
||
try { | ||
address = (A) newTempAddress(); | ||
} catch (IOException e) { | ||
throw new RuntimeException(e); | ||
} | ||
} | ||
|
||
@BeforeEach | ||
public void beforeEach(TestInfo info) { | ||
this.testInfo = info; | ||
} | ||
|
||
@AfterEach | ||
public void afterEach() { | ||
deleteSocketFile(address); | ||
} | ||
|
||
protected abstract void deleteSocketFile(A sa); | ||
|
||
public List<Arguments> clientProvider() { | ||
return Arrays.asList( // | ||
// variants | ||
socket(false, this::newSocket, s -> s.connect(address), SocketException.class, | ||
Socket::isClosed), // | ||
socket(true, () -> newConnectedSocket(address), s -> s.getInputStream().read(), | ||
SocketException.class, Socket::isClosed), // | ||
socket(true, () -> newConnectedSocket(address), s -> s.getOutputStream().write(10), | ||
SocketException.class, Socket::isClosed), // | ||
socket(false, this::newSocketChannel, s -> s.connect(address), | ||
ClosedByInterruptException.class, s -> !s.isOpen()), // | ||
socket(true, this::connectSocketChannel, s -> s.read(ByteBuffer.allocate(1)), | ||
ClosedByInterruptException.class, s -> !s.isOpen()), // | ||
socket(true, this::connectSocketChannel, s -> s.write(ByteBuffer.allocate(1)), | ||
ClosedByInterruptException.class, s -> !s.isOpen()) // | ||
); | ||
} | ||
|
||
public List<Arguments> serverProvider() { | ||
return Arrays.asList( // | ||
serverSocket(() -> newServerSocketBindOn(address), ServerSocket::accept, | ||
SocketException.class, ServerSocket::isClosed), // | ||
serverSocket(this::bindServerSocketChannel, ServerSocketChannel::accept, | ||
ClosedByInterruptException.class, s -> !s.isOpen())// | ||
); | ||
} | ||
|
||
@ParameterizedTest(name = "variant {index}") | ||
@MethodSource("clientProvider") | ||
public <T extends AutoCloseable> void testClientInterruption(boolean acceptConnections, | ||
IOSupplier<T> socket, IOConsumer<T> blockingOp, Class<?> expectedException, | ||
Predicate<T> closeCheck) throws Throwable { | ||
withServer(acceptConnections, () -> testSocketInterruption(false, socket, blockingOp, | ||
expectedException, closeCheck)); | ||
} | ||
|
||
@ParameterizedTest(name = "variant {index}") | ||
@MethodSource("clientProvider") | ||
public <T extends AutoCloseable> void testClientInterruptionWithDelay(boolean acceptConnections, | ||
IOSupplier<T> socket, IOConsumer<T> blockingOp, Class<?> expectedException, | ||
Predicate<T> closeCheck) throws Throwable { | ||
withServer(acceptConnections, () -> testSocketInterruption(true, socket, blockingOp, | ||
expectedException, closeCheck)); | ||
} | ||
|
||
@ParameterizedTest(name = "variant {index}") | ||
@MethodSource("serverProvider") | ||
public <T extends AutoCloseable> void testServerInterruption(IOSupplier<T> socket, | ||
IOConsumer<T> blockingOp, Class<?> expectedException, Predicate<T> closeCheck) | ||
throws Throwable { | ||
testSocketInterruption(false, socket, blockingOp, expectedException, closeCheck); | ||
} | ||
|
||
@ParameterizedTest(name = "variant {index}") | ||
@MethodSource("serverProvider") | ||
public <T extends AutoCloseable> void testServerInterruptionWithDelay(IOSupplier<T> socket, | ||
IOConsumer<T> blockingOp, Class<?> expectedException, Predicate<T> closeCheck) | ||
throws Throwable { | ||
testSocketInterruption(true, socket, blockingOp, expectedException, closeCheck); | ||
} | ||
|
||
public <T extends AutoCloseable> void testSocketInterruption(boolean delay, IOSupplier<T> socket, | ||
IOConsumer<T> blockingOp, Class<?> expectedException, Predicate<T> closeCheck) | ||
throws Throwable { | ||
AtomicReference<Throwable> exceptionHolder = new AtomicReference<>(); | ||
CountDownLatch ready = new CountDownLatch(1); | ||
Thread t = ThreadUtil.startNewDaemonThread(true, () -> exceptionHolder.set(runOperation(ready, | ||
socket, blockingOp, expectedException, closeCheck))); | ||
|
||
ready.await(); | ||
if (delay) { | ||
Thread.sleep(500); | ||
} | ||
t.interrupt(); | ||
t.join(Duration.of(1, ChronoUnit.SECONDS).toMillis()); | ||
if (t.isAlive()) { | ||
throw new RuntimeException("Thread failed to terminate after interrupt"); | ||
} | ||
Throwable thrownException = exceptionHolder.get(); | ||
if (thrownException != null) { | ||
throw thrownException; | ||
} | ||
} | ||
|
||
private void withServer(boolean acceptConnections, ThrowingRunnable func) throws Throwable { | ||
Semaphore done = new Semaphore(0); | ||
try (ServerSocketChannel serverSocket = newServerSocketChannel()) { | ||
serverSocket.bind(address); | ||
Thread serverThread = null; | ||
if (acceptConnections) { | ||
serverThread = ThreadUtil.startNewDaemonThread(false, () -> { | ||
while (serverSocket.isOpen()) { | ||
SocketChannel socket = null; | ||
try { | ||
socket = serverSocket.accept(); | ||
} catch (ClosedChannelException e) { | ||
return; | ||
} catch (IOException e) { | ||
throw new RuntimeException("Unable to accept socket ", e); | ||
} finally { | ||
if (socket != null) { | ||
final SocketChannel socketToClose = socket; | ||
CompletableFuture.runAsync(() -> { | ||
try { | ||
done.tryAcquire(1, TimeUnit.SECONDS); | ||
} catch (InterruptedException e) { | ||
// ignore | ||
} | ||
try { | ||
socketToClose.close(); | ||
} catch (IOException e) { | ||
// TODO Auto-generated catch block | ||
e.printStackTrace(); | ||
} | ||
}); | ||
} | ||
} | ||
} | ||
}); | ||
} | ||
try { | ||
func.run(); | ||
} finally { | ||
done.release(); | ||
serverSocket.close(); | ||
if (serverThread != null) { | ||
serverThread.join(); | ||
} | ||
} | ||
} | ||
} | ||
|
||
<T extends AutoCloseable> Throwable runOperation(CountDownLatch ready, IOSupplier<T> socket, | ||
IOConsumer<T> blockingOp, Class<?> expectedException, Predicate<T> closeCheck) { | ||
|
||
boolean supported = false; | ||
Exception exc = null; | ||
try { | ||
@SuppressWarnings({"resource"}) | ||
T sock = socket.get(); | ||
ready.countDown(); | ||
|
||
supported = true; | ||
try { | ||
blockingOp.accept(sock); | ||
} catch (Exception e) { | ||
exc = e; | ||
// These tests usually expect the "Thread interrupted" state to be set. | ||
// However, when we accept any SocketException to be thrown, that state is not | ||
// deterministic. | ||
// Also, when we expect any kind of ClosedChannelException, it is only expected to be | ||
// set when the actual exception thrown is from the ClosedByInterruptException subclass. | ||
boolean ignoreInterruptState = SocketException.class.equals(expectedException); | ||
boolean interruptStateOK = Thread.interrupted() || (ClosedChannelException.class | ||
.isAssignableFrom(expectedException) && !(e instanceof ClosedByInterruptException)); | ||
|
||
assertAll(() -> assertInstanceOf(expectedException, e, "Socket exception"), | ||
() -> assertTrue(ignoreInterruptState || interruptStateOK, "Thread interrupted"), | ||
() -> assertTrue(closeCheck.test(sock), "Socket closed")); | ||
} finally { | ||
ready.countDown(); | ||
if (sock != null) { | ||
try { | ||
sock.close(); | ||
} catch (Exception e) { | ||
throw new RuntimeException("Unable to clean up socket", e); | ||
} | ||
} | ||
} | ||
} catch (TestAbortedNotAnIssueException e) { | ||
return e; | ||
} catch (Throwable e) { | ||
e.printStackTrace(); | ||
return e; | ||
} finally { | ||
ready.countDown(); | ||
if (DEBUG) { | ||
// print concise results for debugging: | ||
if (DEBUG_VERBOSE) { | ||
System.out.print(testInfo.getTestClass().get().getName() + "." + testInfo.getTestMethod() | ||
.get().getName() + " " + testInfo.getDisplayName() + ": "); | ||
} | ||
System.out.println((supported ? (exc == null ? "no exception" : exc) : "unsupported")); | ||
} | ||
} | ||
return null; | ||
} | ||
|
||
private static <T> Arguments socket(boolean acceptConnections, IOSupplier<T> socket, | ||
IOConsumer<T> blockingOp, Class<?> expectedException, Predicate<T> closeCheck) { | ||
return Arguments.of(acceptConnections, socket, blockingOp, expectedException, closeCheck); | ||
} | ||
|
||
private static <T> Arguments serverSocket(IOSupplier<T> socket, IOConsumer<T> blockingOp, | ||
Class<?> expectedException, Predicate<T> closeCheck) { | ||
return Arguments.of(socket, blockingOp, expectedException, closeCheck); | ||
} | ||
|
||
private SocketChannel connectSocketChannel() throws IOException { | ||
SocketChannel socket = newSocketChannel(); | ||
socket.connect(address); | ||
return socket; | ||
} | ||
|
||
private ServerSocketChannel bindServerSocketChannel() throws IOException { | ||
ServerSocketChannel socket = newServerSocketChannel(); | ||
socket.bind(address); | ||
return socket; | ||
} | ||
|
||
private interface IOSupplier<T> { | ||
T get() throws IOException; | ||
} | ||
|
||
private interface IOConsumer<T> { | ||
void accept(T t) throws IOException; | ||
} | ||
|
||
private interface ThrowingRunnable { | ||
void run() throws Throwable; | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.