Skip to content

Commit

Permalink
common: Throw SocketException, ClosedByInterruptException on interrupt
Browse files Browse the repository at this point in the history
... from VirtualThreads, to be extra-compatible with the standard
implementation.

Add test cases to demonstrate the feature.

#158
  • Loading branch information
kohlschuetter committed Apr 20, 2024
1 parent eab8c72 commit 10eea5a
Show file tree
Hide file tree
Showing 25 changed files with 458 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ int read(ByteBuffer dst, AFSupplier<Integer> timeout, ByteBuffer socketAddressBu
if (virtualBlocking) {
if (park) {
VirtualThreadPoller.INSTANCE.parkThreadUntilReady(fdesc, SelectionKey.OP_WRITE, now,
timeout);
timeout, this::close);
}
configureVirtualBlocking(true);
}
Expand Down Expand Up @@ -239,7 +239,6 @@ int read(ByteBuffer dst, AFSupplier<Integer> timeout, ByteBuffer socketAddressBu
configureVirtualBlocking(false);
}
}

break; // NOPMD.AvoidBranchingStatementAsLastInLoop virtualThreadLoop
} while (true); // NOPMD.WhileLoopWithLiteralBoolean

Expand Down Expand Up @@ -305,7 +304,7 @@ int write(ByteBuffer src, AFSupplier<Integer> timeout, SocketAddress target, int
if (virtualBlocking) {
if (park) {
VirtualThreadPoller.INSTANCE.parkThreadUntilReady(fdesc, SelectionKey.OP_WRITE, now,
timeout);
timeout, this::close);
}
configureVirtualBlocking(true);
}
Expand Down Expand Up @@ -365,6 +364,7 @@ int write(ByteBuffer src, AFSupplier<Integer> timeout, SocketAddress target, int
* @param capacity The desired capacity.
* @return A byte buffer satisfying the requested capacity.
*/
@SuppressWarnings("null")
Lease<MutableHolder<@NonNull ByteBuffer>> getPrivateDirectByteBuffer(int capacity) {
if (capacity > TL_BUFFER_MAX_CAPACITY && TL_BUFFER_MAX_CAPACITY > 0) {
// Capacity exceeds configurable maximum limit;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ private void recv(DatagramPacket p, int options) throws IOException {
if (virtualBlocking) {
if (park) {
VirtualThreadPoller.INSTANCE.parkThreadUntilReady(fdesc, SelectionKey.OP_WRITE, now,
socketTimeout::get);
socketTimeout::get, this::close);
}
core.configureVirtualBlocking(true);
}
Expand Down Expand Up @@ -301,7 +301,7 @@ protected final void send(DatagramPacket p) throws IOException {
if (virtualBlocking) {
if (park) {
VirtualThreadPoller.INSTANCE.parkThreadUntilReady(fdesc, SelectionKey.OP_WRITE, now,
socketTimeout::get);
socketTimeout::get, this::close);
}
core.configureVirtualBlocking(true);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* junixsocket
*
* Copyright 2009-2024 Christian Kohlschütter
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.newsclub.net.unix;

import java.io.IOException;

/**
* A supplier that can throw an IOException.
*
* @param <T> the type of results supplied by this supplier
*/
@FunctionalInterface
public interface AFIOSupplier<T> {

/**
* Gets a result.
*
* @return a result
* @throws IOException on error.
*/
T get() throws IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ private int receive(int maxReceive, int options) throws IOException {
if (virtualBlocking) {
if (park) {
VirtualThreadPoller.INSTANCE.parkThreadUntilReady(fdesc, SelectionKey.OP_WRITE, now,
AFPipe.DUMMY_TIMEOUT);
AFPipe.DUMMY_TIMEOUT, this::close);
}
NativeUnixSocket.configureBlocking(fdesc, false);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public abstract class AFServerSocket<A extends AFSocketAddress> extends ServerSo
private final AtomicBoolean deleteOnClose = new AtomicBoolean(true);

@SuppressWarnings("this-escape")
private final AFServerSocketChannel<?> channel = newChannel();
private final AFServerSocketChannel<A> channel = newChannel();
private @Nullable SocketAddressFilter bindFilter;

private final AtomicBoolean closed = new AtomicBoolean(false);
Expand Down Expand Up @@ -107,7 +107,7 @@ protected AFServerSocket(FileDescriptor fdObj) throws IOException {
*
* @return The new instance.
*/
protected abstract AFServerSocketChannel<?> newChannel();
protected abstract AFServerSocketChannel<A> newChannel();

/**
* Creates a new AFSocketImpl.
Expand Down Expand Up @@ -521,7 +521,7 @@ final AFSocketImpl<A> getAFImpl() {

@SuppressFBWarnings("EI_EXPOSE_REP")
@Override
public AFServerSocketChannel<?> getChannel() {
public AFServerSocketChannel<A> getChannel() {
return channel;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,8 +106,12 @@ public final AFServerSocket<A> socket() {

@Override
public AFSocketChannel<A> accept() throws IOException {
AFSocket<A> socket = afSocket.accept1(false);
return socket == null ? null : socket.getChannel();
try {
AFSocket<A> socket = afSocket.accept1(false);
return socket == null ? null : socket.getChannel();
} catch (SocketClosedByInterruptException e) {
throw e.asClosedByInterruptException(); // NOPMD.PreserveStackTrace
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -195,11 +195,15 @@ public final boolean isConnectionPending() {

@Override
public final boolean connect(SocketAddress remote) throws IOException {
boolean connected = afSocket.connect0(remote, 0);
if (!connected) {
connectPending.set(true);
try {
boolean connected = afSocket.connect0(remote, 0);
if (!connected) {
connectPending.set(true);
}
return connected;
} catch (SocketClosedByInterruptException e) {
throw e.asClosedByInterruptException(); // NOPMD.PreserveStackTrace
}
return connected;
}

@Override
Expand Down Expand Up @@ -230,7 +234,11 @@ public final A getRemoteSocketAddress() {

@Override
public final int read(ByteBuffer dst) throws IOException {
return afSocket.getAFImpl().read(dst, null);
try {
return afSocket.getAFImpl().read(dst, null);
} catch (SocketClosedByInterruptException e) {
throw e.asClosedByInterruptException(); // NOPMD.PreserveStackTrace
}
}

@Override
Expand All @@ -253,7 +261,11 @@ public final long write(ByteBuffer[] srcs, int offset, int length) throws IOExce

@Override
public final int write(ByteBuffer src) throws IOException {
return afSocket.getAFImpl().write(src);
try {
return afSocket.getAFImpl().write(src);
} catch (SocketClosedByInterruptException e) {
throw e.asClosedByInterruptException(); // NOPMD.PreserveStackTrace
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,7 @@ final boolean accept0(SocketImpl socket) throws IOException {
if (virtualBlocking) {
if (park) {
VirtualThreadPoller.INSTANCE.parkThreadUntilReady(fdesc, SelectionKey.OP_ACCEPT, now,
socketTimeout::get);
socketTimeout::get, this::close);
}
}

Expand Down Expand Up @@ -473,7 +473,7 @@ final boolean connect0(SocketAddress addr, int connectTimeout) throws IOExceptio
if (virtualConnectTimeout != null) {
if (park) {
VirtualThreadPoller.INSTANCE.parkThreadUntilReady(fd, SelectionKey.OP_CONNECT,
now, virtualConnectTimeout);
now, virtualConnectTimeout, this::close);
}
} else {
Thread.yield();
Expand Down Expand Up @@ -520,7 +520,7 @@ final boolean connect0(SocketAddress addr, int connectTimeout) throws IOExceptio
core.configureVirtualBlocking(false);
}
}
} while (!Thread.interrupted());
} while (ThreadUtil.checkNotInterruptedOrThrow());
if (success) {
setSocketAddress(socketAddress);
this.connected.set(true);
Expand Down Expand Up @@ -634,7 +634,7 @@ public int read(byte[] buf, int off, int len) throws IOException {
if (virtualBlocking) {
if (park) {
VirtualThreadPoller.INSTANCE.parkThreadUntilReady(fdesc, SelectionKey.OP_READ, now,
socketTimeout::get);
socketTimeout::get, this::close);
}
core.configureVirtualBlocking(true);
}
Expand Down Expand Up @@ -700,7 +700,7 @@ public int read() throws IOException {
if (virtualBlocking) {
if (park) {
VirtualThreadPoller.INSTANCE.parkThreadUntilReady(fdesc, SelectionKey.OP_READ, now,
socketTimeout::get);
socketTimeout::get, this::close);
}
core.configureVirtualBlocking(true);
}
Expand Down Expand Up @@ -766,14 +766,14 @@ public int available() throws IOException {
public FileDescriptor getFileDescriptor() throws IOException {
return getFD();
}

}

private static boolean checkWriteInterruptedException(int bytesTransferred)
throws InterruptedIOException {
if (Thread.interrupted()) {
InterruptedIOException ex = new InterruptedIOException("Thread interrupted during write");
if (Thread.currentThread().isInterrupted()) {
InterruptedIOException ex = new InterruptedIOException("write");
ex.bytesTransferred = bytesTransferred;
Thread.currentThread().interrupt();
throw ex;
}
return true;
Expand Down Expand Up @@ -806,7 +806,7 @@ public void write(int oneByte) throws IOException {
if (virtualBlocking) {
if (park) {
VirtualThreadPoller.INSTANCE.parkThreadUntilReady(fdesc, SelectionKey.OP_WRITE, now,
socketTimeout::get);
socketTimeout::get, this::close);
}
core.configureVirtualBlocking(true);
}
Expand Down Expand Up @@ -876,7 +876,7 @@ public void write(byte[] buf, int off, int len) throws IOException {
if (virtualBlocking) {
if (park) {
VirtualThreadPoller.INSTANCE.parkThreadUntilReady(fdesc, SelectionKey.OP_WRITE, now,
socketTimeout::get);
socketTimeout::get, this::close);
}
core.configureVirtualBlocking(true);
}
Expand All @@ -891,7 +891,8 @@ public void write(byte[] buf, int off, int len) throws IOException {
}
if (written < 0) {
if (len == 0) {
// This exception is only useful to detect OS-level bugs that we need to work-around
// This exception is only useful to detect OS-level bugs that we need to
// work-around
// in native code.
// throw new IOException("Error while writing zero-length byte array; try -D"
// + AFSocket.PROP_LIBRARY_DISABLE_CAPABILITY_PREFIX
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* junixsocket
*
* Copyright 2009-2024 Christian Kohlschütter
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.newsclub.net.unix;

import java.io.Closeable;
import java.io.IOException;
import java.net.SocketException;
import java.nio.channels.ClosedByInterruptException;

/**
* A {@link SocketException} indicating that a socket was closed by interrupt.
*
* @author Christian Kohlschütter
*/
final class SocketClosedByInterruptException extends SocketClosedException {
private static final long serialVersionUID = 1L;

/**
* Constructs a new {@link SocketClosedByInterruptException}.
*/
private SocketClosedByInterruptException() {
super("Closed by interrupt");
}

static SocketClosedByInterruptException newInstanceAndClose(Closeable closeable) {
Throwable suppressed = null;
try {
if (closeable != null) {
closeable.close();
}
} catch (RuntimeException | IOException e) {
suppressed = e;
}
SocketClosedByInterruptException exc = new SocketClosedByInterruptException();
if (suppressed != null) {
exc.addSuppressed(suppressed);
}
return exc;
}

public ClosedByInterruptException asClosedByInterruptException() {
ClosedByInterruptException exc = new ClosedByInterruptException();
exc.initCause(this);
return exc;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
*
* @author Christian Kohlschütter
*/
public final class SocketClosedException extends SocketException {
public class SocketClosedException extends SocketException {
private static final long serialVersionUID = 1L;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
*/
package org.newsclub.net.unix;

import java.io.InterruptedIOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
Expand Down Expand Up @@ -65,6 +66,20 @@ public static ExecutorService newVirtualThreadPerTaskExecutor() {
return Executors.newVirtualThreadPerTaskExecutor();
}

/**
* Checks if the current Thread has been interrupted, without clearing the flag; if interrupted,
* an {@link InterruptedIOException} is thrown, otherwise {@code true} is returned.
*
* @return {@code true}.
* @throws InterruptedIOException if interrupted.
*/
public static boolean checkNotInterruptedOrThrow() throws InterruptedIOException {
if (Thread.currentThread().isInterrupted()) {
throw new InterruptedIOException();
}
return true;
}

/**
* Ensures that the given operation is being executed on a system thread. If the current thread is
* a virtual thread, the operation is executed <em>synchronously</em> via
Expand Down
Loading

0 comments on commit 10eea5a

Please sign in to comment.