From fbd66a5cb4bd07fdbf919db9b14702438ce8d572 Mon Sep 17 00:00:00 2001 From: Lachlan Roberts Date: Thu, 3 Oct 2024 12:33:44 +1000 Subject: [PATCH 1/7] Issue #11307 - Explicit demand control in WebSocket endpoints with only onWebSocketFrame Signed-off-by: Lachlan Roberts --- .../eclipse/jetty/websocket/api/Callback.java | 28 +++++ .../eclipse/jetty/websocket/api/Frame.java | 107 ++++++++++++++++++ .../common/JettyWebSocketFrameHandler.java | 65 ++++++++++- .../websocket/tests/ExplicitDemandTest.java | 31 +++-- .../tests/proxy/WebSocketProxyTest.java | 14 ++- .../tests/server/FrameListenerTest.java | 3 + 6 files changed, 228 insertions(+), 20 deletions(-) diff --git a/jetty-core/jetty-websocket/jetty-websocket-jetty-api/src/main/java/org/eclipse/jetty/websocket/api/Callback.java b/jetty-core/jetty-websocket/jetty-websocket-jetty-api/src/main/java/org/eclipse/jetty/websocket/api/Callback.java index a07eca3e1258..719ff9695eb7 100644 --- a/jetty-core/jetty-websocket/jetty-websocket-jetty-api/src/main/java/org/eclipse/jetty/websocket/api/Callback.java +++ b/jetty-core/jetty-websocket/jetty-websocket-jetty-api/src/main/java/org/eclipse/jetty/websocket/api/Callback.java @@ -57,6 +57,34 @@ public void fail(Throwable x) }; } + /** + * Creates a nested callback that runs completed after + * completing the nested callback. + * + * @param callback The nested callback + * @param completed The completion to run after the nested callback is completed + * @return a new callback. + */ + static Callback from(Callback callback, Runnable completed) + { + return new Callback() + { + @Override + public void succeed() + { + callback.succeed(); + completed.run(); + } + + @Override + public void fail(Throwable x) + { + callback.fail(x); + completed.run(); + } + }; + } + /** *

Method to invoke to succeed the callback.

* diff --git a/jetty-core/jetty-websocket/jetty-websocket-jetty-api/src/main/java/org/eclipse/jetty/websocket/api/Frame.java b/jetty-core/jetty-websocket/jetty-websocket-jetty-api/src/main/java/org/eclipse/jetty/websocket/api/Frame.java index 0f5bd23923e9..4c8c3451777e 100644 --- a/jetty-core/jetty-websocket/jetty-websocket-jetty-api/src/main/java/org/eclipse/jetty/websocket/api/Frame.java +++ b/jetty-core/jetty-websocket/jetty-websocket-jetty-api/src/main/java/org/eclipse/jetty/websocket/api/Frame.java @@ -101,4 +101,111 @@ public String toString() boolean isRsv2(); boolean isRsv3(); + + class Wrapper implements Frame + { + private final Frame _frame; + + public Wrapper(Frame frame) + { + _frame = frame; + } + + @Override + public byte[] getMask() + { + return _frame.getMask(); + } + + @Override + public byte getOpCode() + { + return _frame.getOpCode(); + } + + @Override + public ByteBuffer getPayload() + { + return _frame.getPayload(); + } + + @Override + public int getPayloadLength() + { + return _frame.getPayloadLength(); + } + + @Override + public Type getType() + { + return _frame.getType(); + } + + @Override + public boolean hasPayload() + { + return _frame.hasPayload(); + } + + @Override + public boolean isFin() + { + return _frame.isFin(); + } + + @Override + public boolean isMasked() + { + return _frame.isMasked(); + } + + @Override + public boolean isRsv1() + { + return _frame.isRsv1(); + } + + @Override + public boolean isRsv2() + { + return _frame.isRsv2(); + } + + @Override + public boolean isRsv3() + { + return _frame.isRsv3(); + } + } + + static Frame copy(Frame frame) + { + ByteBuffer payloadCopy = copy(frame.getPayload()); + return new Frame.Wrapper(frame) + { + @Override + public ByteBuffer getPayload() + { + return payloadCopy; + } + + @Override + public int getPayloadLength() + { + return payloadCopy == null ? 0 : payloadCopy.remaining(); + } + }; + } + + private static ByteBuffer copy(ByteBuffer buffer) + { + if (buffer == null) + return null; + int p = buffer.position(); + ByteBuffer clone = buffer.isDirect() ? ByteBuffer.allocateDirect(buffer.remaining()) : ByteBuffer.allocate(buffer.remaining()); + clone.put(buffer); + clone.flip(); + buffer.position(p); + return clone; + } } diff --git a/jetty-core/jetty-websocket/jetty-websocket-jetty-common/src/main/java/org/eclipse/jetty/websocket/common/JettyWebSocketFrameHandler.java b/jetty-core/jetty-websocket/jetty-websocket-jetty-common/src/main/java/org/eclipse/jetty/websocket/common/JettyWebSocketFrameHandler.java index 902784fda5d5..5effe583994a 100644 --- a/jetty-core/jetty-websocket/jetty-websocket-jetty-common/src/main/java/org/eclipse/jetty/websocket/common/JettyWebSocketFrameHandler.java +++ b/jetty-core/jetty-websocket/jetty-websocket-jetty-common/src/main/java/org/eclipse/jetty/websocket/common/JettyWebSocketFrameHandler.java @@ -206,18 +206,60 @@ public void onFrame(Frame frame, Callback coreCallback) coreCallback.failed(new WebSocketException(endpointInstance.getClass().getSimpleName() + " FRAME method error: " + cause.getMessage(), cause)); return; } + + switch (frame.getOpCode()) + { + case OpCode.TEXT -> + { + if (textHandle == null) + autoDemand(); + } + case OpCode.BINARY -> + { + if (binaryHandle == null) + autoDemand(); + } + case OpCode.CONTINUATION -> + { + if (activeMessageSink == null) + autoDemand(); + } + case OpCode.PING -> + { + if (pingHandle == null) + autoDemand(); + } + case OpCode.PONG -> + { + if (pongHandle == null) + autoDemand(); + } + case OpCode.CLOSE -> + { + // Do nothing. + } + default -> + { + coreCallback.failed(new IllegalStateException()); + return; + } + }; } Callback.Completable eventCallback = new Callback.Completable(); switch (frame.getOpCode()) { - case OpCode.CLOSE -> onCloseFrame(frame, eventCallback); - case OpCode.PING -> onPingFrame(frame, eventCallback); - case OpCode.PONG -> onPongFrame(frame, eventCallback); case OpCode.TEXT -> onTextFrame(frame, eventCallback); case OpCode.BINARY -> onBinaryFrame(frame, eventCallback); case OpCode.CONTINUATION -> onContinuationFrame(frame, eventCallback); - default -> coreCallback.failed(new IllegalStateException()); + case OpCode.PING -> onPingFrame(frame, eventCallback); + case OpCode.PONG -> onPongFrame(frame, eventCallback); + case OpCode.CLOSE -> onCloseFrame(frame, eventCallback); + default -> + { + coreCallback.failed(new IllegalStateException()); + return; + } }; // Combine the callback from the frame handler and the event handler. @@ -315,6 +357,13 @@ private void onPingFrame(Frame frame, Callback callback) } else { + // If we have a frameHandler it takes responsibility for handling the ping and demanding. + if (frameHandle != null) + { + callback.succeeded(); + return; + } + // Automatically respond. getSession().sendPong(frame.getPayload(), new org.eclipse.jetty.websocket.api.Callback() { @@ -358,7 +407,10 @@ private void onPongFrame(Frame frame, Callback callback) } else { - internalDemand(); + // If we have a frameHandler it takes responsibility for handling the pong and demanding. + callback.succeeded(); + if (frameHandle == null) + internalDemand(); } } @@ -387,7 +439,8 @@ private void acceptFrame(Frame frame, Callback callback) if (activeMessageSink == null) { callback.succeeded(); - internalDemand(); + if (frameHandle == null) + internalDemand(); return; } diff --git a/jetty-core/jetty-websocket/jetty-websocket-jetty-tests/src/test/java/org/eclipse/jetty/websocket/tests/ExplicitDemandTest.java b/jetty-core/jetty-websocket/jetty-websocket-jetty-tests/src/test/java/org/eclipse/jetty/websocket/tests/ExplicitDemandTest.java index 2994945b7c91..b25eb087105d 100644 --- a/jetty-core/jetty-websocket/jetty-websocket-jetty-tests/src/test/java/org/eclipse/jetty/websocket/tests/ExplicitDemandTest.java +++ b/jetty-core/jetty-websocket/jetty-websocket-jetty-tests/src/test/java/org/eclipse/jetty/websocket/tests/ExplicitDemandTest.java @@ -72,12 +72,29 @@ public void onMessage(String message) throws IOException public static class ListenerSocket implements Session.Listener { final List frames = new CopyOnWriteArrayList<>(); + Session session; + + @Override + public void onWebSocketOpen(Session session) + { + this.session = session; + session.demand(); + } @Override public void onWebSocketFrame(Frame frame, Callback callback) { - frames.add(frame); + frames.add(Frame.copy(frame)); + + // Because no pingListener is registered, the frameListener is responsible for handling pings. + if (frame.getOpCode() == OpCode.PING) + { + session.sendPong(frame.getPayload(), Callback.from(callback, session::demand)); + return; + } + callback.succeed(); + session.demand(); } } @@ -109,27 +126,19 @@ public void onWebSocketFrame(Frame frame, Callback callback) if (frame.getOpCode() == OpCode.TEXT) textMessages.add(BufferUtil.toString(frame.getPayload())); callback.succeed(); + session.demand(); } } @WebSocket(autoDemand = false) public static class PingSocket extends ListenerSocket { - Session session; - - @Override - public void onWebSocketOpen(Session session) - { - this.session = session; - session.demand(); - } - @Override public void onWebSocketFrame(Frame frame, Callback callback) { - super.onWebSocketFrame(frame, callback); if (frame.getType() == Frame.Type.TEXT) session.sendPing(ByteBuffer.wrap("server-ping".getBytes(StandardCharsets.UTF_8)), Callback.NOOP); + super.onWebSocketFrame(frame, callback); } } diff --git a/jetty-core/jetty-websocket/jetty-websocket-jetty-tests/src/test/java/org/eclipse/jetty/websocket/tests/proxy/WebSocketProxyTest.java b/jetty-core/jetty-websocket/jetty-websocket-jetty-tests/src/test/java/org/eclipse/jetty/websocket/tests/proxy/WebSocketProxyTest.java index 62d8b8d100b4..e4725eb08438 100644 --- a/jetty-core/jetty-websocket/jetty-websocket-jetty-tests/src/test/java/org/eclipse/jetty/websocket/tests/proxy/WebSocketProxyTest.java +++ b/jetty-core/jetty-websocket/jetty-websocket-jetty-tests/src/test/java/org/eclipse/jetty/websocket/tests/proxy/WebSocketProxyTest.java @@ -347,10 +347,18 @@ public void onWebSocketFrame(Frame frame, Callback callback) { switch (frame.getOpCode()) { - case OpCode.PING -> pingMessages.add(BufferUtil.copy(frame.getPayload())); - case OpCode.PONG -> pongMessages.add(BufferUtil.copy(frame.getPayload())); + case OpCode.PING -> + { + pingMessages.add(BufferUtil.copy(frame.getPayload())); + session.sendPong(frame.getPayload(), callback); + } + case OpCode.PONG -> + { + pongMessages.add(BufferUtil.copy(frame.getPayload())); + callback.succeed(); + } + default -> callback.succeed(); } - callback.succeed(); } } diff --git a/jetty-core/jetty-websocket/jetty-websocket-jetty-tests/src/test/java/org/eclipse/jetty/websocket/tests/server/FrameListenerTest.java b/jetty-core/jetty-websocket/jetty-websocket-jetty-tests/src/test/java/org/eclipse/jetty/websocket/tests/server/FrameListenerTest.java index 6378deb4128e..9adc5a28813e 100644 --- a/jetty-core/jetty-websocket/jetty-websocket-jetty-tests/src/test/java/org/eclipse/jetty/websocket/tests/server/FrameListenerTest.java +++ b/jetty-core/jetty-websocket/jetty-websocket-jetty-tests/src/test/java/org/eclipse/jetty/websocket/tests/server/FrameListenerTest.java @@ -131,10 +131,12 @@ public static class FrameEndpoint implements Session.Listener { public CountDownLatch closeLatch = new CountDownLatch(1); public LinkedBlockingQueue frameEvents = new LinkedBlockingQueue<>(); + public Session session; @Override public void onWebSocketOpen(Session session) { + this.session = session; session.demand(); } @@ -147,6 +149,7 @@ public void onWebSocketFrame(Frame frame, Callback callback) BufferUtil.toUTF8String(frame.getPayload()), frame.getPayloadLength())); callback.succeed(); + session.demand(); } @Override From ba24335c6b97ad689c9ba17045e604eca7eeae3e Mon Sep 17 00:00:00 2001 From: Lachlan Roberts Date: Tue, 8 Oct 2024 14:04:25 +1100 Subject: [PATCH 2/7] PR #12342 - changes from review Signed-off-by: Lachlan Roberts --- .../common/JettyWebSocketFrameHandler.java | 43 +++---------------- 1 file changed, 5 insertions(+), 38 deletions(-) diff --git a/jetty-core/jetty-websocket/jetty-websocket-jetty-common/src/main/java/org/eclipse/jetty/websocket/common/JettyWebSocketFrameHandler.java b/jetty-core/jetty-websocket/jetty-websocket-jetty-common/src/main/java/org/eclipse/jetty/websocket/common/JettyWebSocketFrameHandler.java index 5effe583994a..653e0ac755c6 100644 --- a/jetty-core/jetty-websocket/jetty-websocket-jetty-common/src/main/java/org/eclipse/jetty/websocket/common/JettyWebSocketFrameHandler.java +++ b/jetty-core/jetty-websocket/jetty-websocket-jetty-common/src/main/java/org/eclipse/jetty/websocket/common/JettyWebSocketFrameHandler.java @@ -206,44 +206,6 @@ public void onFrame(Frame frame, Callback coreCallback) coreCallback.failed(new WebSocketException(endpointInstance.getClass().getSimpleName() + " FRAME method error: " + cause.getMessage(), cause)); return; } - - switch (frame.getOpCode()) - { - case OpCode.TEXT -> - { - if (textHandle == null) - autoDemand(); - } - case OpCode.BINARY -> - { - if (binaryHandle == null) - autoDemand(); - } - case OpCode.CONTINUATION -> - { - if (activeMessageSink == null) - autoDemand(); - } - case OpCode.PING -> - { - if (pingHandle == null) - autoDemand(); - } - case OpCode.PONG -> - { - if (pongHandle == null) - autoDemand(); - } - case OpCode.CLOSE -> - { - // Do nothing. - } - default -> - { - coreCallback.failed(new IllegalStateException()); - return; - } - }; } Callback.Completable eventCallback = new Callback.Completable(); @@ -361,6 +323,7 @@ private void onPingFrame(Frame frame, Callback callback) if (frameHandle != null) { callback.succeeded(); + autoDemand(); return; } @@ -411,6 +374,8 @@ private void onPongFrame(Frame frame, Callback callback) callback.succeeded(); if (frameHandle == null) internalDemand(); + else + autoDemand(); } } @@ -441,6 +406,8 @@ private void acceptFrame(Frame frame, Callback callback) callback.succeeded(); if (frameHandle == null) internalDemand(); + else + autoDemand(); return; } From c5ee9f63e826771386d38fe52ababba2652ed545 Mon Sep 17 00:00:00 2001 From: Lachlan Roberts Date: Tue, 8 Oct 2024 14:27:54 +1100 Subject: [PATCH 3/7] PR #12342 - changes from review Signed-off-by: Lachlan Roberts --- .../org/eclipse/jetty/websocket/api/Frame.java | 15 +++++++++++++++ .../websocket/common/JettyWebSocketFrame.java | 14 ++++++++++++++ .../common/JettyWebSocketFrameHandler.java | 7 ++++++- 3 files changed, 35 insertions(+), 1 deletion(-) diff --git a/jetty-core/jetty-websocket/jetty-websocket-jetty-api/src/main/java/org/eclipse/jetty/websocket/api/Frame.java b/jetty-core/jetty-websocket/jetty-websocket-jetty-api/src/main/java/org/eclipse/jetty/websocket/api/Frame.java index 4c8c3451777e..460465434ce0 100644 --- a/jetty-core/jetty-websocket/jetty-websocket-jetty-api/src/main/java/org/eclipse/jetty/websocket/api/Frame.java +++ b/jetty-core/jetty-websocket/jetty-websocket-jetty-api/src/main/java/org/eclipse/jetty/websocket/api/Frame.java @@ -102,6 +102,15 @@ public String toString() boolean isRsv3(); + /** + * The effective opcode of the frame accounting for the CONTINUATION opcode. + * If the frame is a CONTINUATION frame for a TEXT message, this will return TEXT. + * If the frame is a CONTINUATION frame for a BINARY message, this will return BINARY. + * Otherwise, this will return the same opcode as the frame. + * @return the effective opcode of the frame. + */ + byte getEffectiveOpCode(); + class Wrapper implements Frame { private final Frame _frame; @@ -176,6 +185,12 @@ public boolean isRsv3() { return _frame.isRsv3(); } + + @Override + public byte getEffectiveOpCode() + { + return _frame.getEffectiveOpCode(); + } } static Frame copy(Frame frame) diff --git a/jetty-core/jetty-websocket/jetty-websocket-jetty-common/src/main/java/org/eclipse/jetty/websocket/common/JettyWebSocketFrame.java b/jetty-core/jetty-websocket/jetty-websocket-jetty-common/src/main/java/org/eclipse/jetty/websocket/common/JettyWebSocketFrame.java index ed0580d37201..1622040d9d18 100644 --- a/jetty-core/jetty-websocket/jetty-websocket-jetty-common/src/main/java/org/eclipse/jetty/websocket/common/JettyWebSocketFrame.java +++ b/jetty-core/jetty-websocket/jetty-websocket-jetty-common/src/main/java/org/eclipse/jetty/websocket/common/JettyWebSocketFrame.java @@ -20,10 +20,18 @@ public class JettyWebSocketFrame implements org.eclipse.jetty.websocket.api.Frame { private final Frame frame; + private final byte effectiveOpCode; + @Deprecated public JettyWebSocketFrame(Frame frame) + { + this(frame, frame.getOpCode()); + } + + JettyWebSocketFrame(Frame frame, byte effectiveOpCode) { this.frame = frame; + this.effectiveOpCode = effectiveOpCode; } @Override @@ -92,6 +100,12 @@ public boolean isRsv3() return frame.isRsv3(); } + @Override + public byte getEffectiveOpCode() + { + return effectiveOpCode; + } + @Override public String toString() { diff --git a/jetty-core/jetty-websocket/jetty-websocket-jetty-common/src/main/java/org/eclipse/jetty/websocket/common/JettyWebSocketFrameHandler.java b/jetty-core/jetty-websocket/jetty-websocket-jetty-common/src/main/java/org/eclipse/jetty/websocket/common/JettyWebSocketFrameHandler.java index 653e0ac755c6..39af5c854664 100644 --- a/jetty-core/jetty-websocket/jetty-websocket-jetty-common/src/main/java/org/eclipse/jetty/websocket/common/JettyWebSocketFrameHandler.java +++ b/jetty-core/jetty-websocket/jetty-websocket-jetty-common/src/main/java/org/eclipse/jetty/websocket/common/JettyWebSocketFrameHandler.java @@ -69,6 +69,7 @@ public class JettyWebSocketFrameHandler implements FrameHandler private MessageSink binarySink; private MessageSink activeMessageSink; private WebSocketSession session; + private byte messageType; public JettyWebSocketFrameHandler(WebSocketContainer container, Object endpointInstance, JettyWebSocketFrameHandlerMetadata metadata) { @@ -193,13 +194,17 @@ private static MessageSink createMessageSink(Class sinkCl @Override public void onFrame(Frame frame, Callback coreCallback) { + if (frame.getOpCode() == OpCode.TEXT || frame.getOpCode() == OpCode.BINARY) + messageType = frame.getOpCode(); + CompletableFuture frameCallback = null; if (frameHandle != null) { try { + byte effectiveOpCode = frame.isDataFrame() ? messageType : frame.getOpCode(); frameCallback = new org.eclipse.jetty.websocket.api.Callback.Completable(); - frameHandle.invoke(new JettyWebSocketFrame(frame), frameCallback); + frameHandle.invoke(new JettyWebSocketFrame(frame, effectiveOpCode), frameCallback); } catch (Throwable cause) { From c6e7dc56c02b9ef9c9f62639bdc0f8c5a124b160 Mon Sep 17 00:00:00 2001 From: Lachlan Roberts Date: Tue, 5 Nov 2024 11:09:56 +1100 Subject: [PATCH 4/7] PR #12342 - enforce websocket frameHandler cannot be used with message handlers Signed-off-by: Lachlan Roberts --- .../eclipse/jetty/websocket/api/Callback.java | 20 +++++-- .../eclipse/jetty/websocket/api/Frame.java | 5 ++ .../common/JettyWebSocketFrameHandler.java | 54 +++++-------------- .../JettyWebSocketFrameHandlerMetadata.java | 8 +++ 4 files changed, 42 insertions(+), 45 deletions(-) diff --git a/jetty-core/jetty-websocket/jetty-websocket-jetty-api/src/main/java/org/eclipse/jetty/websocket/api/Callback.java b/jetty-core/jetty-websocket/jetty-websocket-jetty-api/src/main/java/org/eclipse/jetty/websocket/api/Callback.java index 719ff9695eb7..759153dbfdab 100644 --- a/jetty-core/jetty-websocket/jetty-websocket-jetty-api/src/main/java/org/eclipse/jetty/websocket/api/Callback.java +++ b/jetty-core/jetty-websocket/jetty-websocket-jetty-api/src/main/java/org/eclipse/jetty/websocket/api/Callback.java @@ -72,15 +72,27 @@ static Callback from(Callback callback, Runnable completed) @Override public void succeed() { - callback.succeed(); - completed.run(); + try + { + callback.succeed(); + } + finally + { + completed.run(); + } } @Override public void fail(Throwable x) { - callback.fail(x); - completed.run(); + try + { + callback.fail(x); + } + finally + { + completed.run(); + } } }; } diff --git a/jetty-core/jetty-websocket/jetty-websocket-jetty-api/src/main/java/org/eclipse/jetty/websocket/api/Frame.java b/jetty-core/jetty-websocket/jetty-websocket-jetty-api/src/main/java/org/eclipse/jetty/websocket/api/Frame.java index 460465434ce0..6e6b82f46b6b 100644 --- a/jetty-core/jetty-websocket/jetty-websocket-jetty-api/src/main/java/org/eclipse/jetty/websocket/api/Frame.java +++ b/jetty-core/jetty-websocket/jetty-websocket-jetty-api/src/main/java/org/eclipse/jetty/websocket/api/Frame.java @@ -120,6 +120,11 @@ public Wrapper(Frame frame) _frame = frame; } + public Frame getWrapped() + { + return _frame; + } + @Override public byte[] getMask() { diff --git a/jetty-core/jetty-websocket/jetty-websocket-jetty-common/src/main/java/org/eclipse/jetty/websocket/common/JettyWebSocketFrameHandler.java b/jetty-core/jetty-websocket/jetty-websocket-jetty-common/src/main/java/org/eclipse/jetty/websocket/common/JettyWebSocketFrameHandler.java index 39af5c854664..a50c5541e46e 100644 --- a/jetty-core/jetty-websocket/jetty-websocket-jetty-common/src/main/java/org/eclipse/jetty/websocket/common/JettyWebSocketFrameHandler.java +++ b/jetty-core/jetty-websocket/jetty-websocket-jetty-common/src/main/java/org/eclipse/jetty/websocket/common/JettyWebSocketFrameHandler.java @@ -19,7 +19,6 @@ import java.lang.reflect.InvocationTargetException; import java.nio.ByteBuffer; import java.nio.channels.ClosedChannelException; -import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicBoolean; import org.eclipse.jetty.util.BufferUtil; @@ -197,49 +196,37 @@ public void onFrame(Frame frame, Callback coreCallback) if (frame.getOpCode() == OpCode.TEXT || frame.getOpCode() == OpCode.BINARY) messageType = frame.getOpCode(); - CompletableFuture frameCallback = null; if (frameHandle != null) { try { byte effectiveOpCode = frame.isDataFrame() ? messageType : frame.getOpCode(); - frameCallback = new org.eclipse.jetty.websocket.api.Callback.Completable(); - frameHandle.invoke(new JettyWebSocketFrame(frame, effectiveOpCode), frameCallback); + frameHandle.invoke(new JettyWebSocketFrame(frame, effectiveOpCode), + org.eclipse.jetty.websocket.api.Callback.from(coreCallback::succeeded, coreCallback::failed)); } catch (Throwable cause) { coreCallback.failed(new WebSocketException(endpointInstance.getClass().getSimpleName() + " FRAME method error: " + cause.getMessage(), cause)); - return; } + + autoDemand(); + return; } - Callback.Completable eventCallback = new Callback.Completable(); switch (frame.getOpCode()) { - case OpCode.TEXT -> onTextFrame(frame, eventCallback); - case OpCode.BINARY -> onBinaryFrame(frame, eventCallback); - case OpCode.CONTINUATION -> onContinuationFrame(frame, eventCallback); - case OpCode.PING -> onPingFrame(frame, eventCallback); - case OpCode.PONG -> onPongFrame(frame, eventCallback); - case OpCode.CLOSE -> onCloseFrame(frame, eventCallback); + case OpCode.TEXT -> onTextFrame(frame, coreCallback); + case OpCode.BINARY -> onBinaryFrame(frame, coreCallback); + case OpCode.CONTINUATION -> onContinuationFrame(frame, coreCallback); + case OpCode.PING -> onPingFrame(frame, coreCallback); + case OpCode.PONG -> onPongFrame(frame, coreCallback); + case OpCode.CLOSE -> onCloseFrame(frame, coreCallback); default -> { coreCallback.failed(new IllegalStateException()); return; } }; - - // Combine the callback from the frame handler and the event handler. - CompletableFuture callback = eventCallback; - if (frameCallback != null) - callback = frameCallback.thenCompose(ignored -> eventCallback); - callback.whenComplete((r, x) -> - { - if (x == null) - coreCallback.succeeded(); - else - coreCallback.failed(x); - }); } @Override @@ -324,14 +311,6 @@ private void onPingFrame(Frame frame, Callback callback) } else { - // If we have a frameHandler it takes responsibility for handling the ping and demanding. - if (frameHandle != null) - { - callback.succeeded(); - autoDemand(); - return; - } - // Automatically respond. getSession().sendPong(frame.getPayload(), new org.eclipse.jetty.websocket.api.Callback() { @@ -375,12 +354,8 @@ private void onPongFrame(Frame frame, Callback callback) } else { - // If we have a frameHandler it takes responsibility for handling the pong and demanding. callback.succeeded(); - if (frameHandle == null) - internalDemand(); - else - autoDemand(); + internalDemand(); } } @@ -409,10 +384,7 @@ private void acceptFrame(Frame frame, Callback callback) if (activeMessageSink == null) { callback.succeeded(); - if (frameHandle == null) - internalDemand(); - else - autoDemand(); + internalDemand(); return; } diff --git a/jetty-core/jetty-websocket/jetty-websocket-jetty-common/src/main/java/org/eclipse/jetty/websocket/common/JettyWebSocketFrameHandlerMetadata.java b/jetty-core/jetty-websocket/jetty-websocket-jetty-common/src/main/java/org/eclipse/jetty/websocket/common/JettyWebSocketFrameHandlerMetadata.java index 0a5009558f8a..36724749ad0d 100644 --- a/jetty-core/jetty-websocket/jetty-websocket-jetty-common/src/main/java/org/eclipse/jetty/websocket/common/JettyWebSocketFrameHandlerMetadata.java +++ b/jetty-core/jetty-websocket/jetty-websocket-jetty-common/src/main/java/org/eclipse/jetty/websocket/common/JettyWebSocketFrameHandlerMetadata.java @@ -46,6 +46,7 @@ public void setAutoDemand(boolean autoDemand) public void setBinaryHandle(Class sinkClass, MethodHandle binary, Object origin) { assertNotSet(this.binaryHandle, "BINARY Handler", origin); + assertNotSet(this.frameHandle, "FRAME Handler", origin); this.binaryHandle = binary; this.binarySink = sinkClass; } @@ -85,6 +86,10 @@ public MethodHandle getErrorHandle() public void setFrameHandle(MethodHandle frame, Object origin) { assertNotSet(this.frameHandle, "FRAME Handler", origin); + assertNotSet(this.textHandle, "TEXT Handler", origin); + assertNotSet(this.binaryHandle, "BINARY Handler", origin); + assertNotSet(this.pingHandle, "PING Handler", origin); + assertNotSet(this.pongHandle, "PONG Handler", origin); this.frameHandle = frame; } @@ -107,6 +112,7 @@ public MethodHandle getOpenHandle() public void setPingHandle(MethodHandle ping, Object origin) { assertNotSet(this.pingHandle, "PING Handler", origin); + assertNotSet(this.frameHandle, "FRAME Handler", origin); this.pingHandle = ping; } @@ -118,6 +124,7 @@ public MethodHandle getPingHandle() public void setPongHandle(MethodHandle pong, Object origin) { assertNotSet(this.pongHandle, "PONG Handler", origin); + assertNotSet(this.frameHandle, "FRAME Handler", origin); this.pongHandle = pong; } @@ -129,6 +136,7 @@ public MethodHandle getPongHandle() public void setTextHandle(Class sinkClass, MethodHandle text, Object origin) { assertNotSet(this.textHandle, "TEXT Handler", origin); + assertNotSet(this.frameHandle, "FRAME Handler", origin); this.textHandle = text; this.textSink = sinkClass; } From 4ac49f63dd1bb4d7f0d43c67363ee57046c26e26 Mon Sep 17 00:00:00 2001 From: Lachlan Roberts Date: Tue, 5 Nov 2024 12:34:43 +1100 Subject: [PATCH 5/7] PR #12342 - fix some broken websocket tests Signed-off-by: Lachlan Roberts --- .../tests/client/ClientCloseTest.java | 21 +++++- .../tests/proxy/WebSocketProxyTest.java | 66 +++++++++++++++---- 2 files changed, 74 insertions(+), 13 deletions(-) diff --git a/jetty-core/jetty-websocket/jetty-websocket-jetty-tests/src/test/java/org/eclipse/jetty/websocket/tests/client/ClientCloseTest.java b/jetty-core/jetty-websocket/jetty-websocket-jetty-tests/src/test/java/org/eclipse/jetty/websocket/tests/client/ClientCloseTest.java index 877d9bbae90b..aadda72bad8c 100644 --- a/jetty-core/jetty-websocket/jetty-websocket-jetty-tests/src/test/java/org/eclipse/jetty/websocket/tests/client/ClientCloseTest.java +++ b/jetty-core/jetty-websocket/jetty-websocket-jetty-tests/src/test/java/org/eclipse/jetty/websocket/tests/client/ClientCloseTest.java @@ -28,6 +28,7 @@ import org.eclipse.jetty.server.ServerConnector; import org.eclipse.jetty.server.handler.ContextHandler; import org.eclipse.jetty.util.BlockingArrayQueue; +import org.eclipse.jetty.util.Utf8StringBuilder; import org.eclipse.jetty.websocket.api.Callback; import org.eclipse.jetty.websocket.api.Frame; import org.eclipse.jetty.websocket.api.Session; @@ -417,6 +418,7 @@ public static class ServerEndpoint implements Session.Listener.AutoDemanding private static final Logger LOG = LoggerFactory.getLogger(ServerEndpoint.class); private Session session; CountDownLatch block = new CountDownLatch(1); + Utf8StringBuilder stringBuilder = new Utf8StringBuilder(); @Override public void onWebSocketOpen(Session session) @@ -424,8 +426,7 @@ public void onWebSocketOpen(Session session) this.session = session; } - @Override - public void onWebSocketText(String message) + public void onText(String message) { try { @@ -510,7 +511,23 @@ else if (reason.startsWith("sleep|")) LOG.trace("IGNORED", x); } } + else + { + session.close(closeInfo.getCode(), reason, callback); + return; + } } + else if (frame.getEffectiveOpCode() == OpCode.TEXT) + { + stringBuilder.append(frame.getPayload()); + if (frame.isFin()) + { + String completeString = stringBuilder.toCompleteString(); + stringBuilder.reset(); + onText(completeString); + } + } + callback.succeed(); } } diff --git a/jetty-core/jetty-websocket/jetty-websocket-jetty-tests/src/test/java/org/eclipse/jetty/websocket/tests/proxy/WebSocketProxyTest.java b/jetty-core/jetty-websocket/jetty-websocket-jetty-tests/src/test/java/org/eclipse/jetty/websocket/tests/proxy/WebSocketProxyTest.java index e4725eb08438..e71dd5772b26 100644 --- a/jetty-core/jetty-websocket/jetty-websocket-jetty-tests/src/test/java/org/eclipse/jetty/websocket/tests/proxy/WebSocketProxyTest.java +++ b/jetty-core/jetty-websocket/jetty-websocket-jetty-tests/src/test/java/org/eclipse/jetty/websocket/tests/proxy/WebSocketProxyTest.java @@ -18,6 +18,7 @@ import java.nio.ByteBuffer; import java.time.Duration; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import org.eclipse.jetty.logging.StacklessLogging; @@ -32,7 +33,10 @@ import org.eclipse.jetty.websocket.api.Frame; import org.eclipse.jetty.websocket.api.Session; import org.eclipse.jetty.websocket.api.StatusCode; +import org.eclipse.jetty.websocket.api.annotations.OnWebSocketClose; +import org.eclipse.jetty.websocket.api.annotations.OnWebSocketError; import org.eclipse.jetty.websocket.api.annotations.OnWebSocketFrame; +import org.eclipse.jetty.websocket.api.annotations.OnWebSocketOpen; import org.eclipse.jetty.websocket.api.annotations.WebSocket; import org.eclipse.jetty.websocket.api.exceptions.WebSocketException; import org.eclipse.jetty.websocket.client.ClientUpgradeRequest; @@ -44,6 +48,8 @@ import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.containsString; @@ -265,9 +271,6 @@ public void timeoutTest() throws Exception @Test public void testPingPong() throws Exception { - PingPongSocket serverEndpoint = new PingPongSocket(); - serverSocket = serverEndpoint; - PingPongSocket clientSocket = new PingPongSocket(); client.connect(clientSocket, proxyUri); assertTrue(clientSocket.openLatch.await(5, TimeUnit.SECONDS)); @@ -276,12 +279,10 @@ public void testPingPong() throws Exception // Test unsolicited pong from client. ByteBuffer b2 = BufferUtil.toBuffer("unsolicited pong from client"); clientSocket.session.sendPong(b2, Callback.NOOP); - assertThat(serverEndpoint.pingMessages.size(), is(0)); - assertThat(serverEndpoint.pongMessages.poll(5, TimeUnit.SECONDS), is(BufferUtil.toBuffer("unsolicited pong from client"))); // Test unsolicited pong from server. ByteBuffer b1 = BufferUtil.toBuffer("unsolicited pong from server"); - serverEndpoint.session.sendPong(b1, Callback.NOOP); + serverSocket.session.sendPong(b1, Callback.NOOP); assertThat(clientSocket.pingMessages.size(), is(0)); assertThat(clientSocket.pongMessages.poll(5, TimeUnit.SECONDS), is(BufferUtil.toBuffer("unsolicited pong from server"))); @@ -293,7 +294,6 @@ public void testPingPong() throws Exception } for (int i = 0; i < 15; i++) { - assertThat(serverEndpoint.pingMessages.poll(5, TimeUnit.SECONDS), is(intToStringByteBuffer(i))); assertThat(clientSocket.pongMessages.poll(5, TimeUnit.SECONDS), is(intToStringByteBuffer(i))); } @@ -301,12 +301,11 @@ public void testPingPong() throws Exception for (int i = 0; i < 23; i++) { ByteBuffer b = intToStringByteBuffer(i); - serverEndpoint.session.sendPing(b, Callback.NOOP); + serverSocket.session.sendPing(b, Callback.NOOP); } for (int i = 0; i < 23; i++) { assertThat(clientSocket.pingMessages.poll(5, TimeUnit.SECONDS), is(intToStringByteBuffer(i))); - assertThat(serverEndpoint.pongMessages.poll(5, TimeUnit.SECONDS), is(intToStringByteBuffer(i))); } clientSocket.session.close(StatusCode.NORMAL, "closing from test", Callback.NOOP); @@ -328,7 +327,6 @@ public void testPingPong() throws Exception // Check we had no unexpected pings or pongs sent. assertThat(clientSocket.pingMessages.size(), is(0)); - assertThat(serverEndpoint.pingMessages.size(), is(0)); } private ByteBuffer intToStringByteBuffer(int i) @@ -337,11 +335,32 @@ private ByteBuffer intToStringByteBuffer(int i) } @WebSocket - public static class PingPongSocket extends EventSocket + public static class PingPongSocket { public BlockingQueue pingMessages = new BlockingArrayQueue<>(); public BlockingQueue pongMessages = new BlockingArrayQueue<>(); + private static final Logger LOG = LoggerFactory.getLogger(EventSocket.class); + + public Session session; + + public volatile int closeCode = StatusCode.UNDEFINED; + public volatile String closeReason; + public volatile Throwable error = null; + + public CountDownLatch openLatch = new CountDownLatch(1); + public CountDownLatch errorLatch = new CountDownLatch(1); + public CountDownLatch closeLatch = new CountDownLatch(1); + + @OnWebSocketOpen + public void onOpen(Session session) + { + this.session = session; + if (LOG.isDebugEnabled()) + LOG.debug("{} onOpen(): {}", this, session); + openLatch.countDown(); + } + @OnWebSocketFrame public void onWebSocketFrame(Frame frame, Callback callback) { @@ -360,6 +379,31 @@ public void onWebSocketFrame(Frame frame, Callback callback) default -> callback.succeed(); } } + + @OnWebSocketClose + public void onClose(int statusCode, String reason) + { + if (LOG.isDebugEnabled()) + LOG.debug("{} onClose(): {}:{}", this, statusCode, reason); + this.closeCode = statusCode; + this.closeReason = reason; + closeLatch.countDown(); + } + + @OnWebSocketError + public void onError(Throwable cause) + { + if (LOG.isDebugEnabled()) + LOG.debug("{} onError(): {}", this, cause); + error = cause; + errorLatch.countDown(); + } + + @Override + public String toString() + { + return String.format("[%s@%x]", getClass().getSimpleName(), hashCode()); + } } @WebSocket From 59d51364a0d603480fd420e24a4a8db2fb11cd9b Mon Sep 17 00:00:00 2001 From: Lachlan Roberts Date: Mon, 11 Nov 2024 17:58:16 +1100 Subject: [PATCH 6/7] PR #12342 - changes from review Signed-off-by: Lachlan Roberts --- .../jetty/websocket/common/JettyWebSocketFrame.java | 9 +++++++++ .../websocket/common/JettyWebSocketFrameHandler.java | 8 ++------ 2 files changed, 11 insertions(+), 6 deletions(-) diff --git a/jetty-core/jetty-websocket/jetty-websocket-jetty-common/src/main/java/org/eclipse/jetty/websocket/common/JettyWebSocketFrame.java b/jetty-core/jetty-websocket/jetty-websocket-jetty-common/src/main/java/org/eclipse/jetty/websocket/common/JettyWebSocketFrame.java index 1622040d9d18..2b7a043cc93e 100644 --- a/jetty-core/jetty-websocket/jetty-websocket-jetty-common/src/main/java/org/eclipse/jetty/websocket/common/JettyWebSocketFrame.java +++ b/jetty-core/jetty-websocket/jetty-websocket-jetty-common/src/main/java/org/eclipse/jetty/websocket/common/JettyWebSocketFrame.java @@ -22,12 +22,21 @@ public class JettyWebSocketFrame implements org.eclipse.jetty.websocket.api.Fram private final Frame frame; private final byte effectiveOpCode; + /** + * @param frame the core websocket {@link Frame} to wrap as a {@link org.eclipse.jetty.websocket.api.Frame}. + * @deprecated there is no alternative intended to publicly construct a {@link JettyWebSocketFrame}. + */ @Deprecated public JettyWebSocketFrame(Frame frame) { this(frame, frame.getOpCode()); } + /** + * @param frame the core websocket {@link Frame} to wrap as a Jetty API {@link org.eclipse.jetty.websocket.api.Frame}. + * @param effectiveOpCode the effective OpCode of the Frame, where any CONTINUATION should be replaced with the + * initial opcode of that websocket message. + */ JettyWebSocketFrame(Frame frame, byte effectiveOpCode) { this.frame = frame; diff --git a/jetty-core/jetty-websocket/jetty-websocket-jetty-common/src/main/java/org/eclipse/jetty/websocket/common/JettyWebSocketFrameHandler.java b/jetty-core/jetty-websocket/jetty-websocket-jetty-common/src/main/java/org/eclipse/jetty/websocket/common/JettyWebSocketFrameHandler.java index a50c5541e46e..1b4e79e07a0f 100644 --- a/jetty-core/jetty-websocket/jetty-websocket-jetty-common/src/main/java/org/eclipse/jetty/websocket/common/JettyWebSocketFrameHandler.java +++ b/jetty-core/jetty-websocket/jetty-websocket-jetty-common/src/main/java/org/eclipse/jetty/websocket/common/JettyWebSocketFrameHandler.java @@ -221,12 +221,8 @@ public void onFrame(Frame frame, Callback coreCallback) case OpCode.PING -> onPingFrame(frame, coreCallback); case OpCode.PONG -> onPongFrame(frame, coreCallback); case OpCode.CLOSE -> onCloseFrame(frame, coreCallback); - default -> - { - coreCallback.failed(new IllegalStateException()); - return; - } - }; + default -> coreCallback.failed(new IllegalStateException()); + } } @Override From 8896b10fb30934dae8d2b60b17c9bc6052353b72 Mon Sep 17 00:00:00 2001 From: Lachlan Roberts Date: Tue, 12 Nov 2024 14:51:45 +1100 Subject: [PATCH 7/7] PR #12342 - changes from review Signed-off-by: Lachlan Roberts --- .../eclipse/jetty/websocket/api/Frame.java | 127 ++---------------- .../eclipse/jetty/websocket/api/Session.java | 2 + .../websocket/common/JettyWebSocketFrame.java | 12 +- .../websocket/tests/ExplicitDemandTest.java | 24 +++- 4 files changed, 43 insertions(+), 122 deletions(-) diff --git a/jetty-core/jetty-websocket/jetty-websocket-jetty-api/src/main/java/org/eclipse/jetty/websocket/api/Frame.java b/jetty-core/jetty-websocket/jetty-websocket-jetty-api/src/main/java/org/eclipse/jetty/websocket/api/Frame.java index 6e6b82f46b6b..5c347e2142b9 100644 --- a/jetty-core/jetty-websocket/jetty-websocket-jetty-api/src/main/java/org/eclipse/jetty/websocket/api/Frame.java +++ b/jetty-core/jetty-websocket/jetty-websocket-jetty-api/src/main/java/org/eclipse/jetty/websocket/api/Frame.java @@ -102,6 +102,15 @@ public String toString() boolean isRsv3(); + default CloseStatus getCloseStatus() + { + return null; + } + + record CloseStatus(int statusCode, String reason) + { + } + /** * The effective opcode of the frame accounting for the CONTINUATION opcode. * If the frame is a CONTINUATION frame for a TEXT message, this will return TEXT. @@ -110,122 +119,4 @@ public String toString() * @return the effective opcode of the frame. */ byte getEffectiveOpCode(); - - class Wrapper implements Frame - { - private final Frame _frame; - - public Wrapper(Frame frame) - { - _frame = frame; - } - - public Frame getWrapped() - { - return _frame; - } - - @Override - public byte[] getMask() - { - return _frame.getMask(); - } - - @Override - public byte getOpCode() - { - return _frame.getOpCode(); - } - - @Override - public ByteBuffer getPayload() - { - return _frame.getPayload(); - } - - @Override - public int getPayloadLength() - { - return _frame.getPayloadLength(); - } - - @Override - public Type getType() - { - return _frame.getType(); - } - - @Override - public boolean hasPayload() - { - return _frame.hasPayload(); - } - - @Override - public boolean isFin() - { - return _frame.isFin(); - } - - @Override - public boolean isMasked() - { - return _frame.isMasked(); - } - - @Override - public boolean isRsv1() - { - return _frame.isRsv1(); - } - - @Override - public boolean isRsv2() - { - return _frame.isRsv2(); - } - - @Override - public boolean isRsv3() - { - return _frame.isRsv3(); - } - - @Override - public byte getEffectiveOpCode() - { - return _frame.getEffectiveOpCode(); - } - } - - static Frame copy(Frame frame) - { - ByteBuffer payloadCopy = copy(frame.getPayload()); - return new Frame.Wrapper(frame) - { - @Override - public ByteBuffer getPayload() - { - return payloadCopy; - } - - @Override - public int getPayloadLength() - { - return payloadCopy == null ? 0 : payloadCopy.remaining(); - } - }; - } - - private static ByteBuffer copy(ByteBuffer buffer) - { - if (buffer == null) - return null; - int p = buffer.position(); - ByteBuffer clone = buffer.isDirect() ? ByteBuffer.allocateDirect(buffer.remaining()) : ByteBuffer.allocate(buffer.remaining()); - clone.put(buffer); - clone.flip(); - buffer.position(p); - return clone; - } } diff --git a/jetty-core/jetty-websocket/jetty-websocket-jetty-api/src/main/java/org/eclipse/jetty/websocket/api/Session.java b/jetty-core/jetty-websocket/jetty-websocket-jetty-api/src/main/java/org/eclipse/jetty/websocket/api/Session.java index aba060cb4a3c..ecd950ec44d2 100644 --- a/jetty-core/jetty-websocket/jetty-websocket-jetty-api/src/main/java/org/eclipse/jetty/websocket/api/Session.java +++ b/jetty-core/jetty-websocket/jetty-websocket-jetty-api/src/main/java/org/eclipse/jetty/websocket/api/Session.java @@ -227,6 +227,7 @@ default void onWebSocketOpen(Session session) * or data frames either BINARY or TEXT.

* * @param frame the received frame + * @param callback the callback to complete once the frame has been processed. */ default void onWebSocketFrame(Frame frame, Callback callback) { @@ -284,6 +285,7 @@ default void onWebSocketPartialText(String payload, boolean last) *

A WebSocket BINARY message has been received.

* * @param payload the raw payload array received + * @param callback the callback to complete when the payload has been processed */ default void onWebSocketBinary(ByteBuffer payload, Callback callback) { diff --git a/jetty-core/jetty-websocket/jetty-websocket-jetty-common/src/main/java/org/eclipse/jetty/websocket/common/JettyWebSocketFrame.java b/jetty-core/jetty-websocket/jetty-websocket-jetty-common/src/main/java/org/eclipse/jetty/websocket/common/JettyWebSocketFrame.java index 2b7a043cc93e..317aff7507bb 100644 --- a/jetty-core/jetty-websocket/jetty-websocket-jetty-common/src/main/java/org/eclipse/jetty/websocket/common/JettyWebSocketFrame.java +++ b/jetty-core/jetty-websocket/jetty-websocket-jetty-common/src/main/java/org/eclipse/jetty/websocket/common/JettyWebSocketFrame.java @@ -16,6 +16,7 @@ import java.nio.ByteBuffer; import org.eclipse.jetty.websocket.core.Frame; +import org.eclipse.jetty.websocket.core.OpCode; public class JettyWebSocketFrame implements org.eclipse.jetty.websocket.api.Frame { @@ -26,7 +27,7 @@ public class JettyWebSocketFrame implements org.eclipse.jetty.websocket.api.Fram * @param frame the core websocket {@link Frame} to wrap as a {@link org.eclipse.jetty.websocket.api.Frame}. * @deprecated there is no alternative intended to publicly construct a {@link JettyWebSocketFrame}. */ - @Deprecated + @Deprecated(forRemoval = true, since = "12.1.0") public JettyWebSocketFrame(Frame frame) { this(frame, frame.getOpCode()); @@ -115,6 +116,15 @@ public byte getEffectiveOpCode() return effectiveOpCode; } + @Override + public CloseStatus getCloseStatus() + { + if (getOpCode() != OpCode.CLOSE) + return null; + org.eclipse.jetty.websocket.core.CloseStatus closeStatus = org.eclipse.jetty.websocket.core.CloseStatus.getCloseStatus(frame); + return new CloseStatus(closeStatus.getCode(), closeStatus.getReason()); + } + @Override public String toString() { diff --git a/jetty-core/jetty-websocket/jetty-websocket-jetty-tests/src/test/java/org/eclipse/jetty/websocket/tests/ExplicitDemandTest.java b/jetty-core/jetty-websocket/jetty-websocket-jetty-tests/src/test/java/org/eclipse/jetty/websocket/tests/ExplicitDemandTest.java index b25eb087105d..637f56251942 100644 --- a/jetty-core/jetty-websocket/jetty-websocket-jetty-tests/src/test/java/org/eclipse/jetty/websocket/tests/ExplicitDemandTest.java +++ b/jetty-core/jetty-websocket/jetty-websocket-jetty-tests/src/test/java/org/eclipse/jetty/websocket/tests/ExplicitDemandTest.java @@ -44,6 +44,7 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.is; +import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -72,6 +73,7 @@ public void onMessage(String message) throws IOException public static class ListenerSocket implements Session.Listener { final List frames = new CopyOnWriteArrayList<>(); + final List callbacks = new CopyOnWriteArrayList<>(); Session session; @Override @@ -84,16 +86,22 @@ public void onWebSocketOpen(Session session) @Override public void onWebSocketFrame(Frame frame, Callback callback) { - frames.add(Frame.copy(frame)); + frames.add(frame); + callbacks.add(callback); // Because no pingListener is registered, the frameListener is responsible for handling pings. if (frame.getOpCode() == OpCode.PING) { - session.sendPong(frame.getPayload(), Callback.from(callback, session::demand)); + session.sendPong(frame.getPayload(), Callback.from(session::demand, callback::fail)); + return; + } + else if (frame.getOpCode() == OpCode.CLOSE) + { + Frame.CloseStatus closeStatus = frame.getCloseStatus(); + session.close(closeStatus.statusCode(), closeStatus.reason(), Callback.NOOP); return; } - callback.succeed(); session.demand(); } } @@ -226,13 +234,23 @@ public void testNoAutoDemand() throws Exception Frame frame0 = listenerSocket.frames.get(0); assertThat(frame0.getType(), is(Frame.Type.PONG)); assertThat(StandardCharsets.UTF_8.decode(frame0.getPayload()).toString(), is("ping-0")); + Callback callback0 = listenerSocket.callbacks.get(0); + assertNotNull(callback0); + callback0.succeed(); + Frame frame1 = listenerSocket.frames.get(1); assertThat(frame1.getType(), is(Frame.Type.PONG)); assertThat(StandardCharsets.UTF_8.decode(frame1.getPayload()).toString(), is("ping-1")); + Callback callback1 = listenerSocket.callbacks.get(1); + assertNotNull(callback1); + callback1.succeed(); session.close(); await().atMost(5, TimeUnit.SECONDS).until(listenerSocket.frames::size, is(3)); assertThat(listenerSocket.frames.get(2).getType(), is(Frame.Type.CLOSE)); + Callback closeCallback = listenerSocket.callbacks.get(2); + assertNotNull(closeCallback); + closeCallback.succeed(); } @Test