Skip to content

Commit

Permalink
common: Do not rethrow ClosedChannelException
Browse files Browse the repository at this point in the history
When we already get a ClosedChannelException, do not let
AbstractInterruptibleChannel#end rethrow another one.

#158
  • Loading branch information
kohlschuetter committed Jun 30, 2024
1 parent 8d2ab9d commit c665106
Show file tree
Hide file tree
Showing 5 changed files with 52 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -143,45 +143,48 @@ public final AFDatagramChannel<A> disconnect() throws IOException {
@Override
public final A receive(ByteBuffer dst) throws IOException {
boolean complete = false;
IOException exception = null;
try {
begin();
A ret = afSocket.getAFImpl().receive(dst);
complete = true;
return ret;
} catch (IOException e) {
throw InterruptibleChannelUtil.handleException(this, e);
throw (exception = InterruptibleChannelUtil.handleException(this, e)); // NOPMD.PreserveStackTrace
} finally {
InterruptibleChannelUtil.endInterruptable(this, this::end, complete);
InterruptibleChannelUtil.endInterruptable(this, this::end, complete, exception);
}
}

@Override
public final int send(ByteBuffer src, SocketAddress target) throws IOException {
boolean complete = false;
IOException exception = null;
try {
begin();
int ret = afSocket.getAFImpl().send(src, target);
complete = true;
return ret;
} catch (IOException e) {
throw InterruptibleChannelUtil.handleException(this, e);
throw (exception = InterruptibleChannelUtil.handleException(this, e)); // NOPMD.PreserveStackTrace
} finally {
InterruptibleChannelUtil.endInterruptable(this, this::end, complete);
InterruptibleChannelUtil.endInterruptable(this, this::end, complete, exception);
}
}

@Override
public final int read(ByteBuffer dst) throws IOException {
boolean complete = false;
IOException exception = null;
try {
begin();
int ret = afSocket.getAFImpl().read(dst, null);
complete = true;
return ret;
} catch (IOException e) {
throw InterruptibleChannelUtil.handleException(this, e);
throw (exception = InterruptibleChannelUtil.handleException(this, e)); // NOPMD.PreserveStackTrace
} finally {
InterruptibleChannelUtil.endInterruptable(this, this::end, complete);
InterruptibleChannelUtil.endInterruptable(this, this::end, complete, exception);
}
}

Expand All @@ -197,15 +200,16 @@ public final long read(ByteBuffer[] dsts, int offset, int length) throws IOExcep
@Override
public final int write(ByteBuffer src) throws IOException {
boolean complete = false;
IOException exception = null;
try {
begin();
int ret = afSocket.getAFImpl().write(src);
complete = true;
return ret;
} catch (IOException e) {
throw InterruptibleChannelUtil.handleException(this, e);
throw (exception = InterruptibleChannelUtil.handleException(this, e)); // NOPMD.PreserveStackTrace
} finally {
InterruptibleChannelUtil.endInterruptable(this, this::end, complete);
InterruptibleChannelUtil.endInterruptable(this, this::end, complete, exception);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,15 +111,16 @@ public final AFServerSocket<A> socket() {
@Override
public AFSocketChannel<A> accept() throws IOException {
boolean complete = false;
IOException exception = null;
try {
begin();
AFSocket<A> socket = afSocket.accept1(false);
complete = true;
return socket == null ? null : socket.getChannel();
} catch (IOException e) {
throw InterruptibleChannelUtil.handleException(this, e);
throw (exception = InterruptibleChannelUtil.handleException(this, e)); // NOPMD.PreserveStackTrace
} finally {
InterruptibleChannelUtil.endInterruptable(this, this::end, complete);
InterruptibleChannelUtil.endInterruptable(this, this::end, complete, exception);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,7 @@ public final boolean isConnectionPending() {
@Override
public final boolean connect(SocketAddress remote) throws IOException {
boolean complete = false;
IOException exception = null;
try {
begin();
boolean connected = afSocket.connect0(remote, 0);
Expand All @@ -209,9 +210,9 @@ public final boolean connect(SocketAddress remote) throws IOException {
complete = true;
return connected;
} catch (IOException e) {
throw InterruptibleChannelUtil.handleException(this, e);
} finally { // NOPMD.DoNotThrowExceptionInFinally
InterruptibleChannelUtil.endInterruptable(this, this::end, complete);
throw (exception = InterruptibleChannelUtil.handleException(this, e)); // NOPMD.PreserveStackTrace
} finally {
InterruptibleChannelUtil.endInterruptable(this, this::end, complete, exception);
}
}

Expand All @@ -224,6 +225,7 @@ public final boolean finishConnect() throws IOException {
}

boolean complete = false;
IOException exception = null;
try {
begin();
boolean connected = NativeUnixSocket.finishConnect(afSocket.getFileDescriptor())
Expand All @@ -234,9 +236,9 @@ public final boolean finishConnect() throws IOException {
complete = true;
return connected;
} catch (IOException e) {
throw InterruptibleChannelUtil.handleException(this, e);
throw (exception = InterruptibleChannelUtil.handleException(this, e)); // NOPMD.PreserveStackTrace
} finally {
InterruptibleChannelUtil.endInterruptable(this, this::end, complete);
InterruptibleChannelUtil.endInterruptable(this, this::end, complete, exception);
}
}

Expand All @@ -253,15 +255,16 @@ public final A getRemoteSocketAddress() {
@Override
public final int read(ByteBuffer dst) throws IOException {
boolean complete = false;
IOException exception = null;
try {
begin();
int read = afSocket.getAFImpl().read(dst, null);
complete = true;
return read;
} catch (IOException e) {
throw InterruptibleChannelUtil.handleException(this, e);
} finally { // NOPMD.DoNotThrowExceptionInFinally
InterruptibleChannelUtil.endInterruptable(this, this::end, complete);
throw (exception = InterruptibleChannelUtil.handleException(this, e)); // NOPMD.PreserveStackTrace
} finally {
InterruptibleChannelUtil.endInterruptable(this, this::end, complete, exception);
}
}

Expand All @@ -286,15 +289,16 @@ public final long write(ByteBuffer[] srcs, int offset, int length) throws IOExce
@Override
public final int write(ByteBuffer src) throws IOException {
boolean complete = false;
IOException exception = null;
try {
begin();
int written = afSocket.getAFImpl().write(src);
complete = true;
return written;
} catch (IOException e) {
throw InterruptibleChannelUtil.handleException(this, e);
} finally { // NOPMD.DoNotThrowExceptionInFinally
InterruptibleChannelUtil.endInterruptable(this, this::end, complete);
throw (exception = InterruptibleChannelUtil.handleException(this, e)); // NOPMD.PreserveStackTrace
} finally {
InterruptibleChannelUtil.endInterruptable(this, this::end, complete, exception);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,17 @@ interface EndMethod {
* method.
* @param complete {@code true} if the block started with {@code begin} succeeded without an
* exception.
* @param exception An optional exception that was caught in the try-catch-finally block.
* @throws AsynchronousCloseException on error.
*/
static void endInterruptable(AbstractInterruptibleChannel channel, EndMethod end,
boolean complete) throws AsynchronousCloseException {
boolean complete, IOException exception) throws AsynchronousCloseException {
if (!complete) {
if (exception instanceof ClosedChannelException) {
// we already have caught a valid exception; we don't need to throw one from within "end"
complete = true;
}
}
try {
end.end(complete);
} catch (AsynchronousCloseException e) {
Expand Down Expand Up @@ -80,6 +87,7 @@ private static <T extends Exception> T closeAndThrow(AbstractInterruptibleChanne
* @param e The exception
* @return The exception.
*/
@SuppressWarnings("null")
static IOException handleException(AbstractInterruptibleChannel channel, IOException e) {
if (e instanceof SocketClosedException || e instanceof ClosedChannelException
|| e instanceof BrokenPipeSocketException) {
Expand All @@ -90,6 +98,11 @@ static IOException handleException(AbstractInterruptibleChannel channel, IOExcep
t.interrupt();
}
}

if (!(e instanceof ClosedChannelException)) {
// Make sure the caught exception is transformed into the expected exception
e = (ClosedChannelException) new ClosedChannelException().initCause(e);
}
return closeAndThrow(channel, e);
} else {
return e;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import java.io.IOException;
import java.net.SocketException;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousCloseException;
import java.nio.channels.ClosedByInterruptException;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.ServerSocketChannel;
Expand Down Expand Up @@ -52,8 +51,8 @@

/**
* Test interrupt-related behavior, as discussed in
* <a href="https://github.com/kohlschutter/junixsocket/issues/158">issue 158.</a>
*
* <a href="https://github.com/kohlschutter/junixsocket/issues/158">issue 158</a>.
*
* @author https://github.com/cenodis
* @author Christian Kohlschütter
*/
Expand All @@ -80,18 +79,18 @@ private static List<Arguments> clientProvider() {
.getOutputStream().write(10), SocketException.class, AFUNIXSocket::isClosed),

socket(false, AFUNIXSocketChannel::open, s -> s.connect(SOCKET_ADDR),
AsynchronousCloseException.class, s -> !s.isOpen()), socket(true,
ClosedChannelException.class, s -> !s.isOpen()), socket(true,
InterruptIssue158Test::connectSocketChannel, s -> s.read(ByteBuffer.allocate(1)),
AsynchronousCloseException.class, s -> !s.isOpen()), socket(true,
ClosedChannelException.class, s -> !s.isOpen()), socket(true,
InterruptIssue158Test::connectSocketChannel, s -> s.write(ByteBuffer.allocate(
1)), AsynchronousCloseException.class, s -> !s.isOpen()));
1)), ClosedChannelException.class, s -> !s.isOpen()));
}

private static List<Arguments> serverProvider() {
return Arrays.asList(serverSocket(() -> AFUNIXServerSocket.bindOn(SOCKET_ADDR),
AFUNIXServerSocket::accept, SocketException.class, AFUNIXServerSocket::isClosed),
serverSocket(InterruptIssue158Test::bindServerSocketChannel,
AFUNIXServerSocketChannel::accept, AsynchronousCloseException.class, s -> !s.isOpen()));
AFUNIXServerSocketChannel::accept, ClosedChannelException.class, s -> !s.isOpen()));
}

@ParameterizedTest
Expand Down Expand Up @@ -212,10 +211,10 @@ <T extends AutoCloseable> Throwable runOperation(CountDownLatch ready, IOSupplie
// These tests usually expect the "Thread interrupted" state to be set.
// However, when we accept any SocketException to be thrown, that state is not
// deterministic.
// Also, when we expect any kind of AsynchronousCloseException, it is only expected to be
// Also, when we expect any kind of ClosedChannelException, it is only expected to be
// set when the actual exception thrown is from the ClosedByInterruptException subclass.
boolean ignoreInterruptState = SocketException.class.equals(expectedException);
boolean interruptStateOK = Thread.interrupted() || (AsynchronousCloseException.class.equals(
boolean interruptStateOK = Thread.interrupted() || (ClosedChannelException.class.equals(
expectedException) && !(e instanceof ClosedByInterruptException));

assertAll(() -> assertInstanceOf(expectedException, e, "Socket exception"),
Expand Down

0 comments on commit c665106

Please sign in to comment.