diff --git a/documentation/jetty-documentation/src/main/asciidoc/programming-guide/client/websocket/client-websocket.adoc b/documentation/jetty-documentation/src/main/asciidoc/programming-guide/client/websocket/client-websocket.adoc index 153cbb271698..531a7c828b00 100644 --- a/documentation/jetty-documentation/src/main/asciidoc/programming-guide/client/websocket/client-websocket.adoc +++ b/documentation/jetty-documentation/src/main/asciidoc/programming-guide/client/websocket/client-websocket.adoc @@ -35,7 +35,7 @@ The Maven artifact coordinates are the following: [[pg-client-websocket-start]] ==== Starting WebSocketClient -The main class is `org.eclipse.jetty.ee9.websocket.client.WebSocketClient`; you instantiate it, configure it, and then start it like may other Jetty components. +The main class is `org.eclipse.jetty.websocket.client.WebSocketClient`; you instantiate it, configure it, and then start it like may other Jetty components. This is a minimal example: [source,java,indent=0] @@ -124,7 +124,7 @@ In code: include::../../{doc_code}/org/eclipse/jetty/docs/programming/client/websocket/WebSocketClientDocs.java[tags=connectHTTP11] ---- -`WebSocketClient.connect()` links the client-side WebSocket _endpoint_ to a specific server URI, and returns a `CompletableFuture` of an `org.eclipse.jetty.ee9.websocket.api.Session`. +`WebSocketClient.connect()` links the client-side WebSocket _endpoint_ to a specific server URI, and returns a `CompletableFuture` of an `org.eclipse.jetty.websocket.api.Session`. The endpoint offers APIs to _receive_ WebSocket data (or errors) from the server, while the session offers APIs to _send_ WebSocket data to the server. @@ -133,7 +133,7 @@ The endpoint offers APIs to _receive_ WebSocket data (or errors) from the server Initiating a WebSocket communication with a server using HTTP/1.1 is detailed in link:https://tools.ietf.org/html/rfc8441[RFC 8441]. -A WebSocket client establishes a TCP connection to the server or reuses an existing one currently used for HTTP/2, then sends an HTTP/2 _connect_ request over an HTTP/2 stream. +A WebSocket client establishes a TCP connection to the server or reuses an existing one currently used for HTTP/2, then sends an HTTP/2 _CONNECT_ request over an HTTP/2 stream. If the server supports upgrading to WebSocket, it responds with HTTP status code `200`, then switches the communication over that stream, either incoming or outgoing, to happen using HTTP/2 `DATA` frames wrapping WebSocket frames. diff --git a/documentation/jetty-documentation/src/main/asciidoc/programming-guide/websocket.adoc b/documentation/jetty-documentation/src/main/asciidoc/programming-guide/websocket.adoc index 09b140a57ada..40ce4f6bfc73 100644 --- a/documentation/jetty-documentation/src/main/asciidoc/programming-guide/websocket.adoc +++ b/documentation/jetty-documentation/src/main/asciidoc/programming-guide/websocket.adoc @@ -35,44 +35,48 @@ A WebSocket endpoint is the entity that receives WebSocket events. The WebSocket events are the following: -* The _connect_ event. +* The _open_ event. This event is emitted when the WebSocket communication has been successfully established. -Applications interested in the connect event receive the WebSocket _session_ so that they can use it to send data to the remote peer. +Applications interested in the open event receive the WebSocket _session_ so that they can use it to send data to the remote peer. * The _close_ event. This event is emitted when the WebSocket communication has been closed. Applications interested in the close event receive a WebSocket status code and an optional close reason message. * The _error_ event. This event is emitted when the WebSocket communication encounters a fatal error, such as an I/O error (for example, the network connection has been broken), or a protocol error (for example, the remote peer sends an invalid WebSocket frame). Applications interested in the error event receive a `Throwable` that represent the error. -* The _message_ event. -The message event is emitted when a WebSocket message is received. -Only one thread at a time will be delivering a message event to the `onMessage` method; the next message event will not be delivered until the previous call to the `onMessage` method has exited. -Endpoints will always be notified of message events in the same order they were received over the network. +* The _frame_ events. +The frame events are emitted when a WebSocket frame is received, either a control frame such as PING, PONG or CLOSE, or a data frame such as BINARY or TEXT. +One or more data frames of the same type define a _message_. +* The _message_ events. +The message event are emitted when a WebSocket message is received. The message event can be of two types: -** Textual message event. +** TEXT. Applications interested in this type of messages receive a `String` representing the UTF-8 bytes received. -** Binary message event. -Applications interested in this type of messages receive a `byte[]` representing the raw bytes received. +** BINARY. +Applications interested in this type of messages receive a `ByteBuffer` representing the raw bytes received. + +Only one thread at a time will be delivering frame or message events to the corresponding methods; the next frame or message event will not be delivered until the previous call to the corresponding method has exited, and if there is demand for it. +Endpoints will always be notified of message events in the same order they were received over the network. [[pg-websocket-endpoints-listener]] ===== Listener Endpoints -A WebSocket endpoint may implement the `org.eclipse.jetty.ee9.websocket.api.WebSocketListener` interface to receive WebSocket events: +A WebSocket endpoint may implement the `org.eclipse.jetty.websocket.api.Session.Listener` interface to receive WebSocket events: [source,java,indent=0] ---- include::{doc_code}/org/eclipse/jetty/docs/programming/WebSocketDocs.java[tags=listenerEndpoint] ---- -<1> Your listener class implements `WebSocketListener`. +<1> Your listener class implements `Session.Listener`. ====== Message Streaming Reads If you need to deal with large WebSocket messages, you may reduce the memory usage by streaming the message content. For large WebSocket messages, the memory usage may be large due to the fact that the text or the bytes must be accumulated until the message is complete before delivering the message event. -To stream textual or binary messages, you must implement interface `org.eclipse.jetty.ee9.websocket.api.WebSocketPartialListener` instead of `WebSocketListener`. +To stream textual or binary messages, you override either `org.eclipse.jetty.websocket.api.Session.Listener.onWebSocketPartialText(\...)` or `org.eclipse.jetty.websocket.api.Session.Listener.onWebSocketPartialBinary(\...)`. -Interface `WebSocketPartialListener` exposes one method for textual messages, and one method to binary messages that receive _chunks_ of, respectively, text and bytes that form the whole WebSocket message. +These methods that receive _chunks_ of, respectively, text and bytes that form the whole WebSocket message. You may accumulate the chunks yourself, or process each chunk as it arrives, or stream the chunks elsewhere, for example: @@ -84,7 +88,7 @@ include::{doc_code}/org/eclipse/jetty/docs/programming/WebSocketDocs.java[tags=s [[pg-websocket-endpoints-annotated]] ===== Annotated Endpoints -A WebSocket endpoint may annotate methods with `org.eclipse.jetty.ee9.websocket.api.annotations.*` annotations to receive WebSocket events. +A WebSocket endpoint may annotate methods with `org.eclipse.jetty.websocket.api.annotations.*` annotations to receive WebSocket events. Each annotated method may take an optional `Session` argument as its first parameter: [source,java,indent=0] @@ -92,7 +96,7 @@ Each annotated method may take an optional `Session` argument as its first param include::{doc_code}/org/eclipse/jetty/docs/programming/WebSocketDocs.java[tags=annotatedEndpoint] ---- <1> Use the `@WebSocket` annotation at the class level to make it a WebSocket endpoint. -<2> Use the `@OnWebSocketConnect` annotation for the _connect_ event. +<2> Use the `@OnWebSocketOpen` annotation for the _open_ event. As this is the first event notified to the endpoint, you can configure the `Session` object. <3> Use the `@OnWebSocketClose` annotation for the _close_ event. The method may take an optional `Session` as first parameter. @@ -103,20 +107,12 @@ The method may take an optional `Session` as first parameter. [NOTE] ==== -For binary messages, you may declare the annotated method with either or these two signatures: +For binary messages, you may declare the annotated method with this signature: [source,java] ---- @OnWebSocketMessage -public void methodName(byte[] bytes, int offset, int length) { ... } ----- - -or - -[source,java] ----- -@OnWebSocketMessage -public void methodName(ByteBuffer buffer) { ... } +public void methodName(ByteBuffer buffer, Callback callback) { ... } ---- ==== @@ -144,8 +140,8 @@ A WebSocket session is the entity that offers an API to send data to the remote [[pg-websocket-session-configure]] ===== Configuring the Session -You may configure the WebSocket session behavior using the `org.eclipse.jetty.ee9.websocket.api.Session` APIs. -You want to do this as soon as you have access to the `Session` object, typically from the xref:pg-websocket-endpoints[_connect_ event] handler: +You may configure the WebSocket session behavior using the `org.eclipse.jetty.websocket.api.Session` APIs. +You want to do this as soon as you have access to the `Session` object, typically from the xref:pg-websocket-endpoints[_open_ event] handler: [source,java,indent=0] ---- @@ -180,44 +176,17 @@ Please refer to the `Session` link:{javadoc-url}/org/eclipse/jetty/websocket/api [[pg-websocket-session-send]] ===== Sending Data -To send data to the remote peer, you need to obtain the `RemoteEndpoint` object from the `Session`, and then use its API to send data. - -`RemoteEndpoint` offers two styles of APIs to send data: - -* Blocking APIs, where the call returns when the data has been sent, or throws an `IOException` if the data cannot be sent. -* Non-blocking APIs, where a callback object is notified when the data has been sent, or when there was a failure sending the data. - -[[pg-websocket-session-send-blocking]] -====== Blocking APIs - -`RemoteEndpoint` blocking APIs throw `IOException`: - -[source,java,indent=0] ----- -include::{doc_code}/org/eclipse/jetty/docs/programming/WebSocketDocs.java[tags=sendBlocking] ----- - -Blocking APIs are simpler to use since they can be invoked one after the other sequentially. - -[CAUTION] -==== -Sending large messages to the remote peer may cause the sending thread to block, possibly exhausting the thread pool. -Consider using non-blocking APIs for large messages. -==== - -[[pg-websocket-session-send-non-blocking]] -====== Non-Blocking APIs - -`RemoteEndpoint` non-blocking APIs have an additional callback parameter: +To send data to the remote peer, you can use the non-blocking APIs offered by `Session`. [source,java,indent=0] ---- include::{doc_code}/org/eclipse/jetty/docs/programming/WebSocketDocs.java[tags=sendNonBlocking] ---- -<1> Non-blocking APIs require a `WriteCallback` parameter. +<1> Non-blocking APIs require a `Callback` parameter. <2> Note how the second send must be performed from inside the callback. <3> Sequential sends may throw `WritePendingException`. +// TODO: rewrite this section in light of maxOutgoingFrames. Non-blocking APIs are more difficult to use since you are required to meet the following condition: [IMPORTANT] @@ -248,49 +217,42 @@ While non-blocking APIs are more difficult to use, they don't block the sender t If you need to send large WebSocket messages, you may reduce the memory usage by streaming the message content. -Both blocking and non-blocking APIs offer `sendPartial*(...)` methods that allow you to send a chunk of the whole message at a time, therefore reducing the memory usage since it is not necessary to have the whole message `String` or `byte[]` in memory to send it. - -Streaming sends using blocking APIs is quite simple: - -[source,java,indent=0] ----- -include::{doc_code}/org/eclipse/jetty/docs/programming/WebSocketDocs.java[tags=streamSendBlocking] ----- +The APIs offer `sendPartial*(\...)` methods that allow you to send a chunk of the whole message at a time, therefore reducing the memory usage since it is not necessary to have the whole message `String` or `ByteBuffer` in memory to send it. -Streaming sends using non-blocking APIs is more complicated, as you should wait (without blocking!) for the callbacks to complete. +The APIs for streaming the message content are non-blocking and therefore slightly more complicated to use, as you should wait (without blocking!) for the callbacks to complete. -Fortunately, Jetty provides you with the `IteratingCallback` utility class (described in more details xref:pg-arch-io-echo[in this section]) which greatly simplify the use of non-blocking APIs: +Fortunately, Jetty provides the `IteratingCallback` utility class (described in more details xref:pg-arch-io-echo[in this section]) which greatly simplify the use of non-blocking APIs: [source,java,indent=0] ---- include::{doc_code}/org/eclipse/jetty/docs/programming/WebSocketDocs.java[tags=streamSendNonBlocking] ---- -<1> Implementing `WriteCallback` allows to pass `this` to `sendPartialBytes(...)`. -<2> The `process()` method is called iteratively when each `sendPartialBytes(...)` is completed. -<3> Send the message chunks. +<1> Implementing `Callback` allows to pass `this` to `sendPartialBinary(...)`. +<2> The `process()` method is called iteratively when each `sendPartialBinary(...)` is completed. +<3> Sends the message chunks. [[pg-websocket-session-ping]] ===== Sending Ping/Pong -The WebSocket protocol defines two special frame, named `Ping` and `Pong` that may be interesting to applications for these use cases: +The WebSocket protocol defines two special frame, named `PING` and `PONG` that may be interesting to applications for these use cases: * Calculate the round-trip time with the remote peer. * Keep the connection from being closed due to idle timeout -- a heartbeat-like mechanism. -To handle `Ping`/`Pong` events, you may implement interface `org.eclipse.jetty.ee9.websocket.api.WebSocketPingPongListener`. +To handle `PING`/`PONG` events, you may implement methods `Session.Listener.onWebSocketPing(ByteBuffer)` and/or `Session.Listener.onWebSocketPong(ByteBuffer)`. [NOTE] ==== -`Ping`/`Pong` events are not supported when using annotations. +`PING`/`PONG` events are also supported when using annotations via the `OnWebSocketFrame` annotation. ==== -`Ping` frames may contain opaque application bytes, and the WebSocket implementation replies to them with a `Pong` frame containing the same bytes: +`PING` frames may contain opaque application bytes, and the WebSocket implementation replies to them with a `PONG` frame containing the same bytes: [source,java,indent=0] ---- include::{doc_code}/org/eclipse/jetty/docs/programming/WebSocketDocs.java[tags=pingPongListener] ---- -<1> The WebSocket endpoint class must implement `WebSocketPingPongListener` +<1> The WebSocket endpoint class must implement `Session.Listener` [[pg-websocket-session-close]] ===== Closing the Session diff --git a/documentation/jetty-documentation/src/main/java/org/eclipse/jetty/docs/programming/WebSocketDocs.java b/documentation/jetty-documentation/src/main/java/org/eclipse/jetty/docs/programming/WebSocketDocs.java index 9433a1d49194..3065ffb18867 100644 --- a/documentation/jetty-documentation/src/main/java/org/eclipse/jetty/docs/programming/WebSocketDocs.java +++ b/documentation/jetty-documentation/src/main/java/org/eclipse/jetty/docs/programming/WebSocketDocs.java @@ -13,7 +13,6 @@ package org.eclipse.jetty.docs.programming; -import java.io.IOException; import java.io.InputStream; import java.io.Reader; import java.nio.ByteBuffer; @@ -22,17 +21,13 @@ import org.eclipse.jetty.util.IteratingCallback; import org.eclipse.jetty.util.NanoTime; -import org.eclipse.jetty.websocket.api.RemoteEndpoint; +import org.eclipse.jetty.websocket.api.Callback; import org.eclipse.jetty.websocket.api.Session; import org.eclipse.jetty.websocket.api.StatusCode; -import org.eclipse.jetty.websocket.api.WebSocketListener; -import org.eclipse.jetty.websocket.api.WebSocketPartialListener; -import org.eclipse.jetty.websocket.api.WebSocketPingPongListener; -import org.eclipse.jetty.websocket.api.WriteCallback; import org.eclipse.jetty.websocket.api.annotations.OnWebSocketClose; -import org.eclipse.jetty.websocket.api.annotations.OnWebSocketConnect; import org.eclipse.jetty.websocket.api.annotations.OnWebSocketError; import org.eclipse.jetty.websocket.api.annotations.OnWebSocketMessage; +import org.eclipse.jetty.websocket.api.annotations.OnWebSocketOpen; import org.eclipse.jetty.websocket.api.annotations.WebSocket; @SuppressWarnings("unused") @@ -40,14 +35,14 @@ public class WebSocketDocs { @SuppressWarnings("InnerClassMayBeStatic") // tag::listenerEndpoint[] - public class ListenerEndPoint implements WebSocketListener // <1> + public class ListenerEndPoint implements Session.Listener // <1> { private Session session; @Override - public void onWebSocketConnect(Session session) + public void onWebSocketOpen(Session session) { - // The WebSocket connection is established. + // The WebSocket endpoint has been opened. // Store the session to be able to send data to the remote peer. this.session = session; @@ -56,13 +51,13 @@ public void onWebSocketConnect(Session session) session.setMaxTextMessageSize(16 * 1024); // You may immediately send a message to the remote peer. - session.getRemote().sendString("connected", WriteCallback.NOOP); + session.sendText("connected", Callback.NOOP); } @Override public void onWebSocketClose(int statusCode, String reason) { - // The WebSocket connection is closed. + // The WebSocket endpoint has been closed. // You may dispose resources. disposeResources(); @@ -71,7 +66,7 @@ public void onWebSocketClose(int statusCode, String reason) @Override public void onWebSocketError(Throwable cause) { - // The WebSocket connection failed. + // The WebSocket endpoint failed. // You may log the error. cause.printStackTrace(); @@ -87,11 +82,11 @@ public void onWebSocketText(String message) // You may echo it back if it matches certain criteria. if (message.startsWith("echo:")) - session.getRemote().sendString(message.substring("echo:".length()), WriteCallback.NOOP); + session.sendText(message.substring("echo:".length()), Callback.NOOP); } @Override - public void onWebSocketBinary(byte[] payload, int offset, int length) + public void onWebSocketBinary(ByteBuffer payload, Callback callback) { // A WebSocket binary message is received. @@ -99,17 +94,18 @@ public void onWebSocketBinary(byte[] payload, int offset, int length) byte[] pngBytes = new byte[]{(byte)0x89, 'P', 'N', 'G'}; for (int i = 0; i < pngBytes.length; ++i) { - if (pngBytes[i] != payload[offset + i]) + if (pngBytes[i] != payload.get(i)) return; } - savePNGImage(payload, offset, length); + savePNGImage(payload); + callback.succeed(); } } // end::listenerEndpoint[] @SuppressWarnings("InnerClassMayBeStatic") // tag::streamingListenerEndpoint[] - public class StreamingListenerEndpoint implements WebSocketPartialListener + public class StreamingListenerEndpoint implements Session.Listener { private Path textPath; @@ -121,10 +117,12 @@ public void onWebSocketPartialText(String payload, boolean fin) } @Override - public void onWebSocketPartialBinary(ByteBuffer payload, boolean fin) + public void onWebSocketPartialBinary(ByteBuffer payload, boolean fin, Callback callback) { // Save chunks to file. appendToFile(payload, fin); + // Complete the callback. + callback.succeed(); } } // end::streamingListenerEndpoint[] @@ -136,10 +134,10 @@ public class AnnotatedEndPoint { private Session session; - @OnWebSocketConnect // <2> - public void onConnect(Session session) + @OnWebSocketOpen // <2> + public void onOpen(Session session) { - // The WebSocket connection is established. + // The WebSocket endpoint has been opened. // Store the session to be able to send data to the remote peer. this.session = session; @@ -148,13 +146,13 @@ public void onConnect(Session session) session.setMaxTextMessageSize(16 * 1024); // You may immediately send a message to the remote peer. - session.getRemote().sendString("connected", WriteCallback.NOOP); + session.sendText("connected", Callback.NOOP); } @OnWebSocketClose // <3> public void onClose(int statusCode, String reason) { - // The WebSocket connection is closed. + // The WebSocket endpoint has been closed. // You may dispose resources. disposeResources(); @@ -163,7 +161,7 @@ public void onClose(int statusCode, String reason) @OnWebSocketError // <4> public void onError(Throwable cause) { - // The WebSocket connection failed. + // The WebSocket endpoint failed. // You may log the error. cause.printStackTrace(); @@ -179,11 +177,11 @@ public void onTextMessage(Session session, String message) // <3> // You may echo it back if it matches certain criteria. if (message.startsWith("echo:")) - session.getRemote().sendString(message.substring("echo:".length()), WriteCallback.NOOP); + session.sendText(message.substring("echo:".length()), Callback.NOOP); } @OnWebSocketMessage // <5> - public void onBinaryMessage(byte[] payload, int offset, int length) + public void onBinaryMessage(ByteBuffer payload, Callback callback) { // A WebSocket binary message is received. @@ -191,10 +189,10 @@ public void onBinaryMessage(byte[] payload, int offset, int length) byte[] pngBytes = new byte[]{(byte)0x89, 'P', 'N', 'G'}; for (int i = 0; i < pngBytes.length; ++i) { - if (pngBytes[i] != payload[offset + i]) + if (pngBytes[i] != payload.get(i)) return; } - savePNGImage(payload, offset, length); + savePNGImage(payload); } } // end::annotatedEndpoint[] @@ -222,10 +220,10 @@ public void onBinaryMessage(InputStream stream) @SuppressWarnings("InnerClassMayBeStatic") // tag::sessionConfigure[] - public class ConfigureEndpoint implements WebSocketListener + public class ConfigureEndpoint implements Session.Listener { @Override - public void onWebSocketConnect(Session session) + public void onWebSocketOpen(Session session) { // Configure the max length of incoming messages. session.setMaxTextMessageSize(16 * 1024); @@ -236,38 +234,6 @@ public void onWebSocketConnect(Session session) } // end::sessionConfigure[] - @SuppressWarnings("InnerClassMayBeStatic") - // tag::sendBlocking[] - @WebSocket - public class BlockingSendEndpoint - { - @OnWebSocketMessage - public void onText(Session session, String text) - { - // Obtain the RemoteEndpoint APIs. - RemoteEndpoint remote = session.getRemote(); - - try - { - // Send textual data to the remote peer. - remote.sendString("data"); - - // Send binary data to the remote peer. - ByteBuffer bytes = readImageFromFile(); - remote.sendBytes(bytes); - - // Send a PING frame to the remote peer. - remote.sendPing(ByteBuffer.allocate(8).putLong(NanoTime.now()).flip()); - } - catch (IOException x) - { - // No need to rethrow or close the session. - System.getLogger("websocket").log(System.Logger.Level.WARNING, "could not send data", x); - } - } - } - // end::sendBlocking[] - @SuppressWarnings("InnerClassMayBeStatic") // tag::sendNonBlocking[] @WebSocket @@ -276,27 +242,24 @@ public class NonBlockingSendEndpoint @OnWebSocketMessage public void onText(Session session, String text) { - // Obtain the RemoteEndpoint APIs. - RemoteEndpoint remote = session.getRemote(); - // Send textual data to the remote peer. - remote.sendString("data", new WriteCallback() // <1> + session.sendText("data", new Callback() // <1> { @Override - public void writeSuccess() + public void succeed() { // Send binary data to the remote peer. ByteBuffer bytes = readImageFromFile(); - remote.sendBytes(bytes, new WriteCallback() // <2> + session.sendBinary(bytes, new Callback() // <2> { @Override - public void writeSuccess() + public void succeed() { // Both sends succeeded. } @Override - public void writeFailed(Throwable x) + public void fail(Throwable x) { System.getLogger("websocket").log(System.Logger.Level.WARNING, "could not send binary data", x); } @@ -304,53 +267,18 @@ public void writeFailed(Throwable x) } @Override - public void writeFailed(Throwable x) + public void fail(Throwable x) { // No need to rethrow or close the session. System.getLogger("websocket").log(System.Logger.Level.WARNING, "could not send textual data", x); } }); - // remote.sendString("wrong", WriteCallback.NOOP); // May throw WritePendingException! <3> + // remote.sendString("wrong", Callback.NOOP); // May throw WritePendingException! <3> } } // end::sendNonBlocking[] - @SuppressWarnings("InnerClassMayBeStatic") - // tag::streamSendBlocking[] - @WebSocket - public class StreamSendBlockingEndpoint - { - @OnWebSocketMessage - public void onText(Session session, String text) - { - try - { - RemoteEndpoint remote = session.getRemote(); - while (true) - { - ByteBuffer chunk = readChunkToSend(); - if (chunk == null) - { - // No more bytes, finish the WebSocket message. - remote.sendPartialBytes(ByteBuffer.allocate(0), true); - break; - } - else - { - // Send the chunk. - remote.sendPartialBytes(chunk, false); - } - } - } - catch (IOException x) - { - x.printStackTrace(); - } - } - } - // end::streamSendBlocking[] - @SuppressWarnings("InnerClassMayBeStatic") // tag::streamSendNonBlocking[] @WebSocket @@ -359,18 +287,17 @@ public class StreamSendNonBlockingEndpoint @OnWebSocketMessage public void onText(Session session, String text) { - RemoteEndpoint remote = session.getRemote(); - new Sender(remote).iterate(); + new Sender(session).iterate(); } - private class Sender extends IteratingCallback implements WriteCallback // <1> + private class Sender extends IteratingCallback implements Callback // <1> { - private final RemoteEndpoint remote; + private final Session session; private boolean finished; - private Sender(RemoteEndpoint remote) + private Sender(Session session) { - this.remote = remote; + this.session = session; } @Override @@ -383,27 +310,27 @@ protected Action process() throws Throwable // <2> if (chunk == null) { // No more bytes, finish the WebSocket message. - remote.sendPartialBytes(ByteBuffer.allocate(0), true, this); // <3> + session.sendPartialBinary(ByteBuffer.allocate(0), true, this); // <3> finished = true; return Action.SCHEDULED; } else { // Send the chunk. - remote.sendPartialBytes(ByteBuffer.allocate(0), false, this); // <3> + session.sendPartialBinary(ByteBuffer.allocate(0), false, this); // <3> return Action.SCHEDULED; } } @Override - public void writeSuccess() + public void succeed() { // When the send succeeds, succeed this IteratingCallback. succeeded(); } @Override - public void writeFailed(Throwable x) + public void fail(Throwable x) { // When the send fails, fail this IteratingCallback. failed(x); @@ -420,14 +347,14 @@ protected void onCompleteFailure(Throwable x) @SuppressWarnings("InnerClassMayBeStatic") // tag::pingPongListener[] - public class RoundTripListenerEndpoint implements WebSocketPingPongListener // <1> + public class RoundTripListenerEndpoint implements Session.Listener // <1> { @Override - public void onWebSocketConnect(Session session) + public void onWebSocketOpen(Session session) { // Send to the remote peer the local nanoTime. ByteBuffer buffer = ByteBuffer.allocate(8).putLong(NanoTime.now()).flip(); - session.getRemote().sendPing(buffer, WriteCallback.NOOP); + session.sendPing(buffer, Callback.NOOP); } @Override @@ -451,7 +378,7 @@ public class CloseEndpoint public void onText(Session session, String text) { if ("close".equalsIgnoreCase(text)) - session.close(StatusCode.NORMAL, "bye"); + session.close(StatusCode.NORMAL, "bye", Callback.NOOP); } } // end::sessionClose[] @@ -476,7 +403,7 @@ private static void disposeResources() { } - private static void savePNGImage(byte[] payload, int offset, int length) + private static void savePNGImage(ByteBuffer byteBuffer) { } diff --git a/jetty-core/jetty-websocket/jetty-websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/CoreSession.java b/jetty-core/jetty-websocket/jetty-websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/CoreSession.java index 0b02c2e7e081..51ebf73455ec 100644 --- a/jetty-core/jetty-websocket/jetty-websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/CoreSession.java +++ b/jetty-core/jetty-websocket/jetty-websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/CoreSession.java @@ -152,13 +152,12 @@ public interface CoreSession extends OutgoingFrames, IncomingFrames, Configurati void abort(); /** - * Manage flow control by indicating demand for handling Frames. A call to - * {@link FrameHandler#onFrame(Frame, Callback)} will only be made if a - * corresponding demand has been signaled. It is an error to call this method - * if {@link FrameHandler#isAutoDemanding()} returns true. + *
Manages flow control by indicating demand for WebSocket frames.
+ *A call to {@link FrameHandler#onFrame(Frame, Callback)} will only + * be made if there is demand.
* - * @param n The number of frames that can be handled (in sequential calls to - * {@link FrameHandler#onFrame(Frame, Callback)}). May not be negative. + * @param n the number of frames that can be handled in sequential calls to + * {@link FrameHandler#onFrame(Frame, Callback)}, must be positive. */ void demand(long n); @@ -235,7 +234,8 @@ public WebSocketComponents getWebSocketComponents() @Override public ByteBufferPool getByteBufferPool() { - return null; + WebSocketComponents components = getWebSocketComponents(); + return components != null ? components.getByteBufferPool() : null; } @Override diff --git a/jetty-core/jetty-websocket/jetty-websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/FrameHandler.java b/jetty-core/jetty-websocket/jetty-websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/FrameHandler.java index 545fcb6f268f..3db3a5b5b3b8 100644 --- a/jetty-core/jetty-websocket/jetty-websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/FrameHandler.java +++ b/jetty-core/jetty-websocket/jetty-websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/FrameHandler.java @@ -16,102 +16,113 @@ import org.eclipse.jetty.util.Callback; /** - * Interface for local WebSocket Endpoint Frame handling. - * - *- * This is the receiver of Parsed Frames. It is implemented by the Application (or Application API layer or Framework) - * as the primary API to/from the Core websocket implementation. The instance to be used for each websocket connection - * is instantiated by the application, either: - *
+ *Handles incoming WebSocket frames for a given endpoint.
+ *FrameHandler is the receiver of parsed WebSocket frames. + * It is implemented by application code as the primary API to + * interact with the WebSocket implementation.
+ *The FrameHandler instance to be used for each WebSocket + * connection is instantiated by the application, either:
*- * Once instantiated the FrameHandler follows is used as follows: - *
+ *Once instantiated the FrameHandler is used as follows:
*FrameHandler is responsible to manage the demand for more + * WebSocket frames, either directly by calling {@link CoreSession#demand(long)} + * or by delegating the demand management to other components.
*/ public interface FrameHandler extends IncomingFrames { /** - * Async notification that Connection is being opened. - *- * FrameHandler can write during this call, but can not receive frames until the callback is succeeded. - *
- *
- * If the FrameHandler succeeds the callback we transition to OPEN state and can now receive frames if auto-demanding,
- * or can now call {@link CoreSession#demand(long)} to receive frames if it is not auto-demanding.
- * If the FrameHandler fails the callback a close frame will be sent with {@link CloseStatus#SERVER_ERROR} and
- * the connection will be closed.
- *
Invoked when the WebSocket connection is opened.
+ *It is allowed to send WebSocket frames via + * {@link CoreSession#sendFrame(Frame, Callback, boolean)}. + *
WebSocket frames cannot be received until a call to + * {@link CoreSession#demand(long)} is made.
+ *If the callback argument is failed, the implementation + * sends a CLOSE frame with {@link CloseStatus#SERVER_ERROR}, + * and the connection will be closed.
* * @param coreSession the session associated with this connection. - * @param callback the callback to indicate success in processing (or failure) + * @param callback the callback to indicate success or failure of + * the processing of this event. */ void onOpen(CoreSession coreSession, Callback callback); /** - * Receiver of all Frames. - * This method will never be called in parallel for the same session and will be called - * sequentially to satisfy all outstanding demand signaled by calls to - * {@link CoreSession#demand(long)}. - * Control and Data frames are passed to this method. - * Close frames may be responded to by the handler, but if an appropriate close response is not - * sent once the callback is succeeded, then a response close will be generated and sent. + *Invoked when a WebSocket frame is received.
+ *This method will never be called concurrently for the + * same session; will be called sequentially to satisfy the + * outstanding demand signaled by calls to + * {@link CoreSession#demand(long)}.
+ *Both control and data frames are passed to this method.
+ *CLOSE frames may be responded from this method, but if + * they are not responded, then the implementation will respond + * when the callback is completed.
+ *The callback argument must be completed to indicate + * that the buffers associated with the frame can be recycled.
+ *Additional WebSocket frames (of any type, including CLOSE + * frames) cannot be received until a call to + * {@link CoreSession#demand(long)} is made.
* - * @param frame the raw frame - * @param callback the callback to indicate success in processing frame (or failure) + * @param frame the WebSocket frame. + * @param callback the callback to indicate success or failure of + * the processing of this event. */ void onFrame(Frame frame, Callback callback); /** - * An error has occurred or been detected in websocket-core and being reported to FrameHandler. - * A call to onError will be followed by a call to {@link #onClosed(CloseStatus, Callback)} giving the close status - * derived from the error. This will not be called more than once, {@link #onClosed(CloseStatus, Callback)} + *Invoked when an error has occurred or has been detected.
+ *A call to this method will be followed by a call to + * {@link #onClosed(CloseStatus, Callback)} with the close status + * derived from the error.
+ *This method will not be called more than once, {@link #onClosed(CloseStatus, Callback)} * will be called on the callback completion. * - * @param cause the reason for the error - * @param callback the callback to indicate success in processing (or failure) + * @param cause the error cause + * @param callback the callback to indicate success or failure of + * the processing of this event. */ void onError(Throwable cause, Callback callback); /** - * This is the Close Handshake Complete event. - *
- * The connection is now closed, no reading or writing is possible anymore. - * Implementations of FrameHandler can cleanup their resources for this connection now. - * This method will be called only once. - *
+ *Invoked when a WebSocket close event happened.
+ *The WebSocket connection is closed, no reading or writing + * is possible anymore.
+ *Implementations of this method may cleanup resources + * that have been allocated.
+ *This method will not be called more than once.
* - * @param closeStatus the close status received from remote, or in the case of abnormal closure from local. - * @param callback the callback to indicate success in processing (or failure) + * @param closeStatus the close status received from the remote peer, + * or generated locally in the case of abnormal closures. + * @param callback the callback to indicate success or failure of + * the processing of this event. */ void onClosed(CloseStatus closeStatus, Callback callback); - - /** - * Does the FrameHandler manage it's own demand? - * - * @return true if demand will be managed by an automatic call to demand(1) after every succeeded callback passed to - * {@link #onFrame(Frame, Callback)}. If false the FrameHandler will need to manage its own demand by calling - * {@link CoreSession#demand(long)} when it is willing to receive new Frames. - */ - default boolean isAutoDemanding() - { - return true; - } } diff --git a/jetty-core/jetty-websocket/jetty-websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/OutgoingFrames.java b/jetty-core/jetty-websocket/jetty-websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/OutgoingFrames.java index 8d3eef067531..2b70c78aec88 100644 --- a/jetty-core/jetty-websocket/jetty-websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/OutgoingFrames.java +++ b/jetty-core/jetty-websocket/jetty-websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/OutgoingFrames.java @@ -27,7 +27,7 @@ public interface OutgoingFrames * layers and extensions present in the implementation. ** If you are implementing a mutation, you are obliged to handle - * the incoming WriteCallback appropriately. + * the incoming Callback appropriately. * * @param frame the frame to eventually write to the network layer. * @param callback the callback to notify when the frame is written. diff --git a/jetty-core/jetty-websocket/jetty-websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/WebSocketCoreSession.java b/jetty-core/jetty-websocket/jetty-websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/WebSocketCoreSession.java index 4715542f7741..da5738982ae3 100644 --- a/jetty-core/jetty-websocket/jetty-websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/WebSocketCoreSession.java +++ b/jetty-core/jetty-websocket/jetty-websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/WebSocketCoreSession.java @@ -55,7 +55,6 @@ public class WebSocketCoreSession implements CoreSession, Dumpable private final WebSocketSessionState sessionState = new WebSocketSessionState(); private final FrameHandler handler; private final Negotiated negotiated; - private final boolean autoDemanding; private final Flusher flusher = new Flusher(this); private final ExtensionStack extensionStack; @@ -80,7 +79,6 @@ public WebSocketCoreSession(FrameHandler handler, Behavior behavior, Negotiated this.handler = handler; this.behavior = behavior; this.negotiated = negotiated; - this.autoDemanding = handler.isAutoDemanding(); extensionStack = negotiated.getExtensions(); extensionStack.initialize(new IncomingAdaptor(), new OutgoingAdaptor(), this); } @@ -113,14 +111,6 @@ protected void handle(Runnable runnable) } } - /** - * @return True if the sessions handling is demanding. - */ - public boolean isAutoDemanding() - { - return autoDemanding; - } - public ExtensionStack getExtensionStack() { return negotiated.getExtensions(); @@ -382,21 +372,18 @@ public void onOpen() if (LOG.isDebugEnabled()) LOG.debug("ConnectionState: Transition to CONNECTED"); - Callback openCallback = Callback.from( - () -> - { - sessionState.onOpen(); - if (LOG.isDebugEnabled()) - LOG.debug("ConnectionState: Transition to OPEN"); - if (autoDemanding) - autoDemand(); - }, - x -> - { - if (LOG.isDebugEnabled()) - LOG.debug("Error during OPEN", x); - processHandlerError(new CloseException(CloseStatus.SERVER_ERROR, x), NOOP); - }); + Callback openCallback = Callback.from(() -> + { + sessionState.onOpen(); + if (LOG.isDebugEnabled()) + LOG.debug("ConnectionState: Transition to OPEN"); + }, + x -> + { + if (LOG.isDebugEnabled()) + LOG.debug("Error during OPEN", x); + processHandlerError(new CloseException(CloseStatus.SERVER_ERROR, x), NOOP); + }); try { @@ -417,16 +404,9 @@ public void onOpen() @Override public void demand(long n) { - if (autoDemanding) - throw new IllegalStateException("FrameHandler is not demanding: " + this); getExtensionStack().demand(n); } - public void autoDemand() - { - getExtensionStack().demand(1); - } - @Override public boolean isRsv1Used() { @@ -655,13 +635,7 @@ public void onFrame(Frame frame, Callback callback) // Handle inbound frame if (frame.getOpCode() != OpCode.CLOSE) { - Callback handlerCallback = !isAutoDemanding() ? callback : Callback.from(() -> - { - callback.succeeded(); - autoDemand(); - }, callback::failed); - - handle(() -> handler.onFrame(frame, handlerCallback)); + handle(() -> handler.onFrame(frame, callback)); return; } @@ -677,22 +651,21 @@ public void onFrame(Frame frame, Callback callback) } else { - closeCallback = Callback.from( - () -> + closeCallback = Callback.from(() -> + { + if (sessionState.isOutputOpen()) + { + CloseStatus closeStatus = CloseStatus.getCloseStatus(frame); + if (LOG.isDebugEnabled()) + LOG.debug("ConnectionState: sending close response {}", closeStatus); + close(closeStatus == null ? CloseStatus.NO_CODE_STATUS : closeStatus, callback); + } + else { - if (sessionState.isOutputOpen()) - { - CloseStatus closeStatus = CloseStatus.getCloseStatus(frame); - if (LOG.isDebugEnabled()) - LOG.debug("ConnectionState: sending close response {}", closeStatus); - close(closeStatus == null ? CloseStatus.NO_CODE_STATUS : closeStatus, callback); - } - else - { - callback.succeeded(); - } - }, - x -> processHandlerError(x, callback)); + callback.succeeded(); + } + }, + x -> processHandlerError(x, callback)); } handler.onFrame(frame, closeCallback); @@ -783,7 +756,7 @@ public WebSocketComponents getWebSocketComponents() @Override public String toString() { - return String.format("WSCoreSession@%x{%s,%s,%s,af=%b,i/o=%d/%d,fs=%d}->%s", + return String.format("WebSocketCoreSession@%x{%s,%s,%s,af=%b,i/o=%d/%d,fs=%d}->%s", hashCode(), behavior, sessionState, diff --git a/jetty-core/jetty-websocket/jetty-websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/internal/MessageHandler.java b/jetty-core/jetty-websocket/jetty-websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/internal/MessageHandler.java index 326a6c1dadc0..81bf7f7585c0 100644 --- a/jetty-core/jetty-websocket/jetty-websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/internal/MessageHandler.java +++ b/jetty-core/jetty-websocket/jetty-websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/internal/MessageHandler.java @@ -32,11 +32,8 @@ import org.slf4j.LoggerFactory; /** - * A utility implementation of FrameHandler that defragments - * text frames into a String message before calling {@link #onText(String, Callback)}. - * Flow control is by default automatic, but an implementation - * may extend {@link #isAutoDemanding()} to return false and then explicitly control - * demand with calls to {@link CoreSession#demand(long)}. + *
A utility implementation of FrameHandler that aggregates TEXT frames + * into a String message before calling {@link #onText(String, Callback)}. */ public class MessageHandler implements FrameHandler { @@ -120,6 +117,7 @@ public void onOpen(CoreSession coreSession, Callback callback) this.coreSession = coreSession; callback.succeeded(); + coreSession.demand(1); } @Override @@ -130,30 +128,26 @@ public void onFrame(Frame frame, Callback callback) switch (frame.getOpCode()) { - case OpCode.CLOSE: - onCloseFrame(frame, callback); - break; - case OpCode.PING: - onPingFrame(frame, callback); - break; - case OpCode.PONG: - onPongFrame(frame, callback); - break; - case OpCode.TEXT: + case OpCode.CLOSE -> onCloseFrame(frame, callback); + case OpCode.PING -> onPingFrame(frame, callback); + case OpCode.PONG -> onPongFrame(frame, callback); + case OpCode.TEXT -> + { dataType = OpCode.TEXT; onTextFrame(frame, callback); - break; - case OpCode.BINARY: + } + case OpCode.BINARY -> + { dataType = OpCode.BINARY; onBinaryFrame(frame, callback); - break; - case OpCode.CONTINUATION: + } + case OpCode.CONTINUATION -> + { onContinuationFrame(frame, callback); if (frame.isFin()) dataType = OpCode.UNDEFINED; - break; - default: - callback.failed(new IllegalStateException()); + } + default -> callback.failed(new IllegalStateException()); } } @@ -199,7 +193,8 @@ protected void onTextFrame(Frame frame, Callback callback) long currentSize = frame.getPayload().remaining() + textBuffer.length(); if (currentSize > maxSize) throw new MessageTooLargeException("Message larger than " + maxSize + " bytes"); - textBuffer.append(frame.getPayload()); + else + textBuffer.append(frame.getPayload()); } if (frame.isFin()) @@ -211,8 +206,11 @@ protected void onTextFrame(Frame frame, Callback callback) { if (textBuffer.hasCodingErrors()) throw new BadPayloadException("Invalid UTF-8"); - callback.succeeded(); + else + callback.succeeded(); } + + coreSession.demand(1); } catch (Throwable t) { @@ -232,8 +230,8 @@ protected void onBinaryFrame(Frame frame, Callback callback) long currentSize = frame.getPayload().remaining() + binaryBuffer.size(); if (currentSize > maxSize) throw new MessageTooLargeException("Message larger than " + maxSize + " bytes"); - - BufferUtil.writeTo(frame.getPayload(), binaryBuffer); + else + BufferUtil.writeTo(frame.getPayload(), binaryBuffer); } if (frame.isFin()) @@ -245,6 +243,8 @@ protected void onBinaryFrame(Frame frame, Callback callback) { callback.succeeded(); } + + coreSession.demand(1); } catch (Throwable t) { @@ -256,27 +256,21 @@ protected void onContinuationFrame(Frame frame, Callback callback) { switch (dataType) { - case OpCode.BINARY: - onBinaryFrame(frame, callback); - break; - - case OpCode.TEXT: - onTextFrame(frame, callback); - break; - - default: - throw new IllegalStateException(); + case OpCode.BINARY -> onBinaryFrame(frame, callback); + case OpCode.TEXT -> onTextFrame(frame, callback); + default -> throw new IllegalStateException(); } } protected void onPingFrame(Frame frame, Callback callback) { - coreSession.sendFrame(new Frame(OpCode.PONG, true, frame.getPayload()), callback, false); + coreSession.sendFrame(new Frame(OpCode.PONG, true, frame.getPayload()), Callback.from(() -> coreSession.demand(1), callback), false); } protected void onPongFrame(Frame frame, Callback callback) { callback.succeeded(); + coreSession.demand(1); } protected void onCloseFrame(Frame frame, Callback callback) @@ -346,7 +340,7 @@ public void sendText(Callback callback, boolean batch, final String... parts) int i = 0; @Override - protected Action process() throws Throwable + protected Action process() { if (i + 1 > parts.length) return Action.SUCCEEDED; @@ -398,7 +392,7 @@ public void sendBinary(Callback callback, boolean batch, final ByteBuffer... par int i = 0; @Override - protected Action process() throws Throwable + protected Action process() { if (i + 1 > parts.length) return Action.SUCCEEDED; diff --git a/jetty-core/jetty-websocket/jetty-websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/messages/AbstractMessageSink.java b/jetty-core/jetty-websocket/jetty-websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/messages/AbstractMessageSink.java index 3fbed69422b5..819b1a6154ae 100644 --- a/jetty-core/jetty-websocket/jetty-websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/messages/AbstractMessageSink.java +++ b/jetty-core/jetty-websocket/jetty-websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/messages/AbstractMessageSink.java @@ -18,14 +18,81 @@ import org.eclipse.jetty.websocket.core.CoreSession; +/** + *
Abstract implementation of {@link MessageSink}.
+ *Management of demand for WebSocket frames may either be entirely managed + * by the {@link MessageSink} implementation ({@code autoDemand==true}); or + * it may be managed collaboratively between the application and the + * {@link MessageSink} implementation ({@code autoDemand==true}).
+ *{@link MessageSink} implementations must handle the demand for WebSocket + * frames in this way:
+ *Method {@link #autoDemand()} helps to manage the demand after the + * invocation of the application function returns successfully.
+ */ public abstract class AbstractMessageSink implements MessageSink { - protected final CoreSession session; - protected final MethodHandle methodHandle; + private final CoreSession session; + private final MethodHandle methodHandle; + private final boolean autoDemand; - public AbstractMessageSink(CoreSession session, MethodHandle methodHandle) + /** + * Creates a new {@link MessageSink}. + * + * @param session the WebSocket session + * @param methodHandle the application function to invoke + * @param autoDemand whether this {@link MessageSink} manages demand automatically + * as explained in {@link AbstractMessageSink} + */ + public AbstractMessageSink(CoreSession session, MethodHandle methodHandle, boolean autoDemand) { this.session = Objects.requireNonNull(session, "CoreSession"); this.methodHandle = Objects.requireNonNull(methodHandle, "MethodHandle"); + this.autoDemand = autoDemand; + } + + /** + * @return the WebSocket session + */ + public CoreSession getCoreSession() + { + return session; + } + + /** + * @return the application function + */ + public MethodHandle getMethodHandle() + { + return methodHandle; + } + + /** + * @return whether this {@link MessageSink} automatically demands for more + * WebSocket frames after the invocation of the application function has returned. + */ + public boolean isAutoDemand() + { + return autoDemand; + } + + /** + *If {@link #isAutoDemand()} then demands for one more WebSocket frame + * via {@link CoreSession#demand(long)}; otherwise it is a no-operation, + * because the demand is explicitly managed by the application function.
+ */ + protected void autoDemand() + { + if (isAutoDemand()) + getCoreSession().demand(1); } } diff --git a/jetty-core/jetty-websocket/jetty-websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/messages/ByteArrayMessageSink.java b/jetty-core/jetty-websocket/jetty-websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/messages/ByteArrayMessageSink.java index 6724c856ed33..aec69940e339 100644 --- a/jetty-core/jetty-websocket/jetty-websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/messages/ByteArrayMessageSink.java +++ b/jetty-core/jetty-websocket/jetty-websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/messages/ByteArrayMessageSink.java @@ -25,22 +25,31 @@ import org.eclipse.jetty.websocket.core.exception.InvalidSignatureException; import org.eclipse.jetty.websocket.core.exception.MessageTooLargeException; +/** + *A {@link MessageSink} implementation that accumulates BINARY frames + * into a message that is then delivered to the application function + * passed to the constructor in the form of a {@code byte[]}.
+ */ public class ByteArrayMessageSink extends AbstractMessageSink { - private static final byte[] EMPTY_BUFFER = new byte[0]; - private ByteBufferCallbackAccumulator out; + private ByteBufferCallbackAccumulator accumulator; - public ByteArrayMessageSink(CoreSession session, MethodHandle methodHandle) + /** + * Creates a new {@link ByteArrayMessageSink}. + * + * @param session the WebSocket session + * @param methodHandle the application function to invoke when a new message has been assembled + * @param autoDemand whether this {@link MessageSink} manages demand automatically + */ + public ByteArrayMessageSink(CoreSession session, MethodHandle methodHandle, boolean autoDemand) { - super(session, methodHandle); + super(session, methodHandle, autoDemand); // This uses the offset length byte array signature not supported by jakarta websocket. // The jakarta layer instead uses decoders for whole byte array messages instead of this message sink. MethodType onMessageType = MethodType.methodType(Void.TYPE, byte[].class, int.class, int.class); if (methodHandle.type().changeReturnType(void.class) != onMessageType.changeReturnType(void.class)) - { throw InvalidSignatureException.build(onMessageType, methodHandle.type()); - } } @Override @@ -48,62 +57,59 @@ public void accept(Frame frame, Callback callback) { try { - long size = (out == null ? 0 : out.getLength()) + frame.getPayloadLength(); - long maxBinaryMessageSize = session.getMaxBinaryMessageSize(); - if (maxBinaryMessageSize > 0 && size > maxBinaryMessageSize) + long size = (accumulator == null ? 0 : accumulator.getLength()) + frame.getPayloadLength(); + long maxSize = getCoreSession().getMaxBinaryMessageSize(); + if (maxSize > 0 && size > maxSize) { - throw new MessageTooLargeException( - String.format("Binary message too large: (actual) %,d > (configured max binary message size) %,d", size, maxBinaryMessageSize)); + callback.failed(new MessageTooLargeException(String.format("Binary message too large: %,d > %,d", size, maxSize))); + return; } - // If we are fin and no OutputStream has been created we don't need to aggregate. - if (frame.isFin() && (out == null)) + ByteBuffer payload = frame.getPayload(); + if (frame.isFin() && accumulator == null) { - if (frame.hasPayload()) - { - byte[] buf = BufferUtil.toArray(frame.getPayload()); - methodHandle.invoke(buf, 0, buf.length); - } - else - methodHandle.invoke(EMPTY_BUFFER, 0, 0); - + byte[] buf = BufferUtil.toArray(payload); + getMethodHandle().invoke(buf, 0, buf.length); callback.succeeded(); - session.demand(1); + autoDemand(); return; } - // Aggregate the frame payload. - if (frame.hasPayload()) + if (!frame.isFin() && !frame.hasPayload()) { - ByteBuffer payload = frame.getPayload(); - if (out == null) - out = new ByteBufferCallbackAccumulator(); - out.addEntry(payload, callback); + callback.succeeded(); + getCoreSession().demand(1); + return; } - // If the methodHandle throws we don't want to fail callback twice. - callback = Callback.NOOP; + if (accumulator == null) + accumulator = new ByteBufferCallbackAccumulator(); + accumulator.addEntry(payload, callback); + if (frame.isFin()) { - byte[] buf = out.takeByteArray(); - methodHandle.invoke(buf, 0, buf.length); + // Do not complete twice the callback if the invocation fails. + callback = Callback.NOOP; + byte[] buf = accumulator.takeByteArray(); + getMethodHandle().invoke(buf, 0, buf.length); + autoDemand(); + } + else + { + getCoreSession().demand(1); } - session.demand(1); } catch (Throwable t) { - if (out != null) - out.fail(t); + if (accumulator != null) + accumulator.fail(t); callback.failed(t); } finally { if (frame.isFin()) - { - // reset - out = null; - } + accumulator = null; } } } diff --git a/jetty-core/jetty-websocket/jetty-websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/messages/ByteBufferMessageSink.java b/jetty-core/jetty-websocket/jetty-websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/messages/ByteBufferMessageSink.java index 1da91ad09c2d..1fa44b8c924f 100644 --- a/jetty-core/jetty-websocket/jetty-websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/messages/ByteBufferMessageSink.java +++ b/jetty-core/jetty-websocket/jetty-websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/messages/ByteBufferMessageSink.java @@ -16,32 +16,46 @@ import java.lang.invoke.MethodHandle; import java.lang.invoke.MethodType; import java.nio.ByteBuffer; -import java.util.Objects; import org.eclipse.jetty.io.ByteBufferCallbackAccumulator; import org.eclipse.jetty.io.ByteBufferPool; import org.eclipse.jetty.io.RetainableByteBuffer; -import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.websocket.core.CoreSession; import org.eclipse.jetty.websocket.core.Frame; import org.eclipse.jetty.websocket.core.exception.InvalidSignatureException; import org.eclipse.jetty.websocket.core.exception.MessageTooLargeException; +/** + *A {@link MessageSink} implementation that accumulates BINARY frames + * into a message that is then delivered to the application function + * passed to the constructor in the form of a {@link ByteBuffer}.
+ */ public class ByteBufferMessageSink extends AbstractMessageSink { - private ByteBufferCallbackAccumulator out; + private ByteBufferCallbackAccumulator accumulator; - public ByteBufferMessageSink(CoreSession session, MethodHandle methodHandle) + /** + * Creates a new {@link ByteBufferMessageSink}. + * + * @param session the WebSocket session + * @param methodHandle the application function to invoke when a new message has been assembled + * @param autoDemand whether this {@link MessageSink} manages demand automatically + */ + public ByteBufferMessageSink(CoreSession session, MethodHandle methodHandle, boolean autoDemand) { - super(session, methodHandle); + this(session, methodHandle, autoDemand, true); + } + + protected ByteBufferMessageSink(CoreSession session, MethodHandle methodHandle, boolean autoDemand, boolean validateSignature) + { + super(session, methodHandle, autoDemand); - // Validate onMessageMethod - Objects.requireNonNull(methodHandle, "MethodHandle"); - MethodType onMessageType = MethodType.methodType(Void.TYPE, ByteBuffer.class); - if (methodHandle.type() != onMessageType) + if (validateSignature) { - throw InvalidSignatureException.build(onMessageType, methodHandle.type()); + MethodType onMessageType = MethodType.methodType(Void.TYPE, ByteBuffer.class); + if (methodHandle.type() != onMessageType) + throw InvalidSignatureException.build(onMessageType, methodHandle.type()); } } @@ -50,69 +64,64 @@ public void accept(Frame frame, Callback callback) { try { - long size = (out == null ? 0 : out.getLength()) + frame.getPayloadLength(); - long maxBinaryMessageSize = session.getMaxBinaryMessageSize(); - if (maxBinaryMessageSize > 0 && size > maxBinaryMessageSize) + long size = (accumulator == null ? 0 : accumulator.getLength()) + frame.getPayloadLength(); + long maxSize = getCoreSession().getMaxBinaryMessageSize(); + if (maxSize > 0 && size > maxSize) { - throw new MessageTooLargeException(String.format("Binary message too large: (actual) %,d > (configured max binary message size) %,d", - size, maxBinaryMessageSize)); + callback.failed(new MessageTooLargeException(String.format("Binary message too large: %,d > %,d", size, maxSize))); + return; } - // If we are fin and no OutputStream has been created we don't need to aggregate. - if (frame.isFin() && (out == null)) + if (frame.isFin() && accumulator == null) { - if (frame.hasPayload()) - methodHandle.invoke(frame.getPayload()); - else - methodHandle.invoke(BufferUtil.EMPTY_BUFFER); - - callback.succeeded(); - session.demand(1); + invoke(getMethodHandle(), frame.getPayload(), callback); + autoDemand(); return; } - // Aggregate the frame payload. - if (frame.hasPayload()) + if (!frame.isFin() && !frame.hasPayload()) { - ByteBuffer payload = frame.getPayload(); - if (out == null) - out = new ByteBufferCallbackAccumulator(); - out.addEntry(payload, callback); + callback.succeeded(); + getCoreSession().demand(1); + return; } - // If the methodHandle throws we don't want to fail callback twice. - callback = Callback.NOOP; + if (accumulator == null) + accumulator = new ByteBufferCallbackAccumulator(); + accumulator.addEntry(frame.getPayload(), callback); + if (frame.isFin()) { - ByteBufferPool bufferPool = session.getByteBufferPool(); - RetainableByteBuffer buffer = bufferPool.acquire(out.getLength(), false); + ByteBufferPool bufferPool = getCoreSession().getByteBufferPool(); + RetainableByteBuffer buffer = bufferPool.acquire(accumulator.getLength(), false); ByteBuffer byteBuffer = buffer.getByteBuffer(); - out.writeTo(byteBuffer); - - try - { - methodHandle.invoke(byteBuffer); - } - finally - { - buffer.release(); - } + accumulator.writeTo(byteBuffer); + callback = Callback.from(buffer::release); + invoke(getMethodHandle(), byteBuffer, callback); + autoDemand(); + } + else + { + // Did not call the application so must explicitly demand here. + getCoreSession().demand(1); } - - session.demand(1); } catch (Throwable t) { - if (out != null) - out.fail(t); + if (accumulator != null) + accumulator.fail(t); callback.failed(t); } finally { if (frame.isFin()) - { - out = null; - } + accumulator = null; } } + + protected void invoke(MethodHandle methodHandle, ByteBuffer byteBuffer, Callback callback) throws Throwable + { + methodHandle.invoke(byteBuffer); + callback.succeeded(); + } } diff --git a/jetty-core/jetty-websocket/jetty-websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/messages/DispatchedMessageSink.java b/jetty-core/jetty-websocket/jetty-websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/messages/DispatchedMessageSink.java index ec382cd45536..57c0006f57e1 100644 --- a/jetty-core/jetty-websocket/jetty-websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/messages/DispatchedMessageSink.java +++ b/jetty-core/jetty-websocket/jetty-websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/messages/DispatchedMessageSink.java @@ -14,6 +14,8 @@ package org.eclipse.jetty.websocket.core.messages; import java.io.Closeable; +import java.io.InputStream; +import java.io.Reader; import java.lang.invoke.MethodHandle; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; @@ -24,144 +26,89 @@ import org.eclipse.jetty.websocket.core.Frame; /** - * Centralized logic for Dispatched Message Handling. - *- * A Dispatched MessageSink can consist of 1 or more {@link #accept(Frame, Callback)} calls. - *
- * The first {@link #accept(Frame, Callback)} in a message will trigger a dispatch to the - * function specified in the constructor. - *
- * The completion of the dispatched function call is the sign that the next message is suitable - * for processing from the network. (The connection fillAndParse should remain idle for the - * NEXT message until such time as the dispatched function call has completed) - *
- *- * There are a few use cases we need to handle. - *
- *- * 1. Normal Processing - *
- *- * Connection Thread | DispatchedMessageSink | Thread 2 - * TEXT accept() - * - dispatch - function.read(stream) - * CONT accept() stream.read() - * CONT accept() stream.read() - * CONT=fin accept() stream.read() - * EOF stream.read EOF - * IDLE - * exit method - * RESUME(NEXT MSG) - *- *
- * 2. Early Exit (with no activity) - *
- *- * Connection Thread | DispatchedMessageSink | Thread 2 - * TEXT accept() - * - dispatch - function.read(stream) - * CONT accept() exit method (normal return) - * IDLE - * TIMEOUT - *- *
- * 3. Early Exit (due to exception) - *
- *- * Connection Thread | DispatchedMessageSink | Thread 2 - * TEXT accept() - * - dispatch - function.read(stream) - * CONT accept() exit method (throwable) - * callback.fail() - * endpoint.onError() - * close(error) - *- *
- * 4. Early Exit (with Custom Threading) - *
- *- * Connection Thread | DispatchedMessageSink | Thread 2 | Thread 3 - * TEXT accept() - * - dispatch - function.read(stream) - * thread.new(stream) stream.read() - * exit method - * CONT accept() stream.read() - * CONT accept() stream.read() - * CONT=fin accept() stream.read() - * EOF stream.read EOF - * RESUME(NEXT MSG) - *+ *
A partial implementation of {@link MessageSink} for methods that consume WebSocket + * messages using blocking stream APIs, typically via {@link InputStream} or {@link Reader}.
+ *The first call to {@link #accept(Frame, Callback)} triggers the application function + * specified in the constructor to be invoked in a different thread.
+ *Subsequent calls to {@link #accept(Frame, Callback)} feed a nested {@link MessageSink} + * that in turns feeds the {@link InputStream} or {@link Reader} stream.
+ *Implementations of this class must manage the demand for WebSocket frames, and + * therefore must always be auto-demanding.
+ *Upon return from the application function, the stream is closed. + * This means that the stream must be consumed synchronously within the invocation of the + * application function.
+ *The demand for the next WebSocket message is performed when both the application + * function has returned and the last frame has been consumed (signaled by completing the + * callback associated with the frame).
+ *Throwing from the application function results in the WebSocket connection to be + * closed.
*/ public abstract class DispatchedMessageSink extends AbstractMessageSink { - private CompletableFutureA consumer of WebSocket data frames (either BINARY or TEXT).
+ *{@link FrameHandler} delegates the processing of data frames + * to {@link MessageSink}, including the processing of the demand + * for the next frames.
*/ public interface MessageSink { /** - * Consume the frame payload to the message. + *Consumes the WebSocket frame, possibly asynchronously + * when this method has returned.
+ *The callback argument must be completed when the frame + * payload is consumed.
+ *The demand for more frames must be explicitly invoked, + * or arranged to be invoked asynchronously, by the implementation + * of this method, by calling {@link CoreSession#demand(long)}.
* - * @param frame the frame, its payload (and fin state) to append - * @param callback the callback for how the frame was consumed + * @param frame the frame to consume + * @param callback the callback to complete when the frame is consumed */ void accept(Frame frame, Callback callback); + + /** + *Fails this {@link MessageSink} with the given cause.
+ * + * @param failure the cause of the failure + */ + default void fail(Throwable failure) + { + } } diff --git a/jetty-core/jetty-websocket/jetty-websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/messages/PartialByteArrayMessageSink.java b/jetty-core/jetty-websocket/jetty-websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/messages/PartialByteArrayMessageSink.java index 32f47717b52f..fd34a4b7c68f 100644 --- a/jetty-core/jetty-websocket/jetty-websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/messages/PartialByteArrayMessageSink.java +++ b/jetty-core/jetty-websocket/jetty-websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/messages/PartialByteArrayMessageSink.java @@ -20,13 +20,23 @@ import org.eclipse.jetty.websocket.core.CoreSession; import org.eclipse.jetty.websocket.core.Frame; +/** + *A {@link MessageSink} implementation that delivers BINARY frames + * to the application function passed to the constructor in the form + * of a {@code byte[]}.
+ */ public class PartialByteArrayMessageSink extends AbstractMessageSink { - private static byte[] EMPTY_BUFFER = new byte[0]; - - public PartialByteArrayMessageSink(CoreSession session, MethodHandle methodHandle) + /** + * Creates a new {@link PartialByteArrayMessageSink}. + * + * @param session the WebSocket session + * @param methodHandle the application function to invoke when a new frame has arrived + * @param autoDemand whether this {@link MessageSink} manages demand automatically + */ + public PartialByteArrayMessageSink(CoreSession session, MethodHandle methodHandle, boolean autoDemand) { - super(session, methodHandle); + super(session, methodHandle, autoDemand); } @Override @@ -36,12 +46,16 @@ public void accept(Frame frame, Callback callback) { if (frame.hasPayload() || frame.isFin()) { - byte[] buffer = frame.hasPayload() ? BufferUtil.toArray(frame.getPayload()) : EMPTY_BUFFER; - methodHandle.invoke(buffer, frame.isFin()); + byte[] buffer = BufferUtil.toArray(frame.getPayload()); + getMethodHandle().invoke(buffer, frame.isFin()); + callback.succeeded(); + autoDemand(); + } + else + { + callback.succeeded(); + getCoreSession().demand(1); } - - callback.succeeded(); - session.demand(1); } catch (Throwable t) { diff --git a/jetty-core/jetty-websocket/jetty-websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/messages/PartialByteBufferMessageSink.java b/jetty-core/jetty-websocket/jetty-websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/messages/PartialByteBufferMessageSink.java index 215640637a70..cde238b56569 100644 --- a/jetty-core/jetty-websocket/jetty-websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/messages/PartialByteBufferMessageSink.java +++ b/jetty-core/jetty-websocket/jetty-websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/messages/PartialByteBufferMessageSink.java @@ -14,16 +14,29 @@ package org.eclipse.jetty.websocket.core.messages; import java.lang.invoke.MethodHandle; +import java.nio.ByteBuffer; import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.websocket.core.CoreSession; import org.eclipse.jetty.websocket.core.Frame; +/** + *A {@link MessageSink} implementation that delivers BINARY frames + * to the application function passed to the constructor in the form + * of a {@link ByteBuffer}.
+ */ public class PartialByteBufferMessageSink extends AbstractMessageSink { - public PartialByteBufferMessageSink(CoreSession session, MethodHandle methodHandle) + /** + * Creates a new {@link PartialByteBufferMessageSink}. + * + * @param session the WebSocket session + * @param methodHandle the application function to invoke when a new frame has arrived + * @param autoDemand whether this {@link MessageSink} manages demand automatically + */ + public PartialByteBufferMessageSink(CoreSession session, MethodHandle methodHandle, boolean autoDemand) { - super(session, methodHandle); + super(session, methodHandle, autoDemand); } @Override @@ -32,14 +45,25 @@ public void accept(Frame frame, Callback callback) try { if (frame.hasPayload() || frame.isFin()) - methodHandle.invoke(frame.getPayload(), frame.isFin()); - - callback.succeeded(); - session.demand(1); + { + invoke(getMethodHandle(), frame.getPayload(), frame.isFin(), callback); + autoDemand(); + } + else + { + callback.succeeded(); + getCoreSession().demand(1); + } } catch (Throwable t) { callback.failed(t); } } + + protected void invoke(MethodHandle methodHandle, ByteBuffer byteBuffer, boolean fin, Callback callback) throws Throwable + { + methodHandle.invoke(byteBuffer, fin); + callback.succeeded(); + } } diff --git a/jetty-core/jetty-websocket/jetty-websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/messages/PartialStringMessageSink.java b/jetty-core/jetty-websocket/jetty-websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/messages/PartialStringMessageSink.java index 37f34fd751ab..9b37e15cf451 100644 --- a/jetty-core/jetty-websocket/jetty-websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/messages/PartialStringMessageSink.java +++ b/jetty-core/jetty-websocket/jetty-websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/messages/PartialStringMessageSink.java @@ -14,7 +14,6 @@ package org.eclipse.jetty.websocket.core.messages; import java.lang.invoke.MethodHandle; -import java.util.Objects; import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.Utf8StringBuilder; @@ -22,14 +21,25 @@ import org.eclipse.jetty.websocket.core.Frame; import org.eclipse.jetty.websocket.core.exception.BadPayloadException; +/** + *A {@link MessageSink} implementation that delivers TEXT frames + * to the application function passed to the constructor in the form + * of a {@link String}.
+ */ public class PartialStringMessageSink extends AbstractMessageSink { - private Utf8StringBuilder out; + private Utf8StringBuilder accumulator; - public PartialStringMessageSink(CoreSession session, MethodHandle methodHandle) + /** + * Creates a new {@link PartialStringMessageSink}. + * + * @param session the WebSocket session + * @param methodHandle the application function to invoke when a new frame has arrived + * @param autoDemand whether this {@link MessageSink} manages demand automatically + */ + public PartialStringMessageSink(CoreSession session, MethodHandle methodHandle, boolean autoDemand) { - super(session, methodHandle); - Objects.requireNonNull(methodHandle, "MethodHandle"); + super(session, methodHandle, autoDemand); } @Override @@ -37,28 +47,34 @@ public void accept(Frame frame, Callback callback) { try { - if (out == null) - out = new Utf8StringBuilder(session.getInputBufferSize()); + if (accumulator == null) + accumulator = new Utf8StringBuilder(getCoreSession().getInputBufferSize()); + + accumulator.append(frame.getPayload()); - out.append(frame.getPayload()); if (frame.isFin()) { - String complete = out.takeCompleteString(() -> new BadPayloadException("Invalid UTF-8")); - methodHandle.invoke(complete, true); - out = null; + String complete = accumulator.takeCompleteString(() -> new BadPayloadException("Invalid UTF-8")); + getMethodHandle().invoke(complete, true); } else { - String partial = out.takePartialString(() -> new BadPayloadException("Invalid UTF-8")); - methodHandle.invoke(partial, false); + String partial = accumulator.takePartialString(() -> new BadPayloadException("Invalid UTF-8")); + getMethodHandle().invoke(partial, false); } callback.succeeded(); - session.demand(1); + + autoDemand(); } catch (Throwable t) { callback.failed(t); } + finally + { + if (frame.isFin()) + accumulator = null; + } } } diff --git a/jetty-core/jetty-websocket/jetty-websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/messages/ReaderMessageSink.java b/jetty-core/jetty-websocket/jetty-websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/messages/ReaderMessageSink.java index c3350549a764..8f8e4f1759a1 100644 --- a/jetty-core/jetty-websocket/jetty-websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/messages/ReaderMessageSink.java +++ b/jetty-core/jetty-websocket/jetty-websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/messages/ReaderMessageSink.java @@ -16,18 +16,17 @@ import java.lang.invoke.MethodHandle; import org.eclipse.jetty.websocket.core.CoreSession; -import org.eclipse.jetty.websocket.core.Frame; public class ReaderMessageSink extends DispatchedMessageSink { - public ReaderMessageSink(CoreSession session, MethodHandle methodHandle) + public ReaderMessageSink(CoreSession session, MethodHandle methodHandle, boolean autoDemand) { - super(session, methodHandle); + super(session, methodHandle, autoDemand); } @Override - public MessageReader newSink(Frame frame) + public MessageReader newMessageSink() { - return new MessageReader(session.getInputBufferSize()); + return new MessageReader(getCoreSession()); } } diff --git a/jetty-core/jetty-websocket/jetty-websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/messages/StringMessageSink.java b/jetty-core/jetty-websocket/jetty-websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/messages/StringMessageSink.java index 40ab9d4978c6..87595d20bf4d 100644 --- a/jetty-core/jetty-websocket/jetty-websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/messages/StringMessageSink.java +++ b/jetty-core/jetty-websocket/jetty-websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/messages/StringMessageSink.java @@ -22,14 +22,26 @@ import org.eclipse.jetty.websocket.core.exception.BadPayloadException; import org.eclipse.jetty.websocket.core.exception.MessageTooLargeException; +/** + *A {@link MessageSink} implementation that accumulates TEXT frames + * into a message that is then delivered to the application function + * passed to the constructor in the form of a {@link String}.
+ */ public class StringMessageSink extends AbstractMessageSink { private Utf8StringBuilder out; private int size; - public StringMessageSink(CoreSession session, MethodHandle methodHandle) + /** + * Creates a new {@link StringMessageSink}. + * + * @param session the WebSocket session + * @param methodHandle the application function to invoke when a new message has been assembled + * @param autoDemand whether this {@link MessageSink} manages demand automatically + */ + public StringMessageSink(CoreSession session, MethodHandle methodHandle, boolean autoDemand) { - super(session, methodHandle); + super(session, methodHandle, autoDemand); this.size = 0; } @@ -39,23 +51,29 @@ public void accept(Frame frame, Callback callback) try { size += frame.getPayloadLength(); - long maxTextMessageSize = session.getMaxTextMessageSize(); - if (maxTextMessageSize > 0 && size > maxTextMessageSize) + long maxSize = getCoreSession().getMaxTextMessageSize(); + if (maxSize > 0 && size > maxSize) { - throw new MessageTooLargeException(String.format("Text message too large: (actual) %,d > (configured max text message size) %,d", - size, maxTextMessageSize)); + callback.failed(new MessageTooLargeException(String.format("Text message too large: %,d > %,d", size, maxSize))); + return; } if (out == null) - out = new Utf8StringBuilder(session.getInputBufferSize()); + out = new Utf8StringBuilder(getCoreSession().getInputBufferSize()); out.append(frame.getPayload()); + if (frame.isFin()) { - methodHandle.invoke(out.takeCompleteString(() -> new BadPayloadException("Invalid UTF-8"))); + getMethodHandle().invoke(out.takeCompleteString(() -> new BadPayloadException("Invalid UTF-8"))); + callback.succeeded(); + autoDemand(); + } + else + { + callback.succeeded(); + getCoreSession().demand(1); } - callback.succeeded(); - session.demand(1); } catch (Throwable t) { @@ -65,7 +83,6 @@ public void accept(Frame frame, Callback callback) { if (frame.isFin()) { - // reset size = 0; out = null; } diff --git a/jetty-core/jetty-websocket/jetty-websocket-core-tests/src/test/java/org/eclipse/jetty/websocket/core/AutoFragmentTest.java b/jetty-core/jetty-websocket/jetty-websocket-core-tests/src/test/java/org/eclipse/jetty/websocket/core/AutoFragmentTest.java index 945635824337..cd587a435dee 100644 --- a/jetty-core/jetty-websocket/jetty-websocket-core-tests/src/test/java/org/eclipse/jetty/websocket/core/AutoFragmentTest.java +++ b/jetty-core/jetty-websocket/jetty-websocket-core-tests/src/test/java/org/eclipse/jetty/websocket/core/AutoFragmentTest.java @@ -74,13 +74,13 @@ public void testOutgoingAutoFragmentToMaxFrameSize() throws Exception // Turn off fragmentation on the server. assertTrue(serverHandler.open.await(5, TimeUnit.SECONDS)); - serverHandler.coreSession.setMaxFrameSize(0); - serverHandler.coreSession.setAutoFragment(false); + serverHandler.getCoreSession().setMaxFrameSize(0); + serverHandler.getCoreSession().setAutoFragment(false); // Set the client to fragment to the maxFrameSize. int maxFrameSize = 30; - clientHandler.coreSession.setMaxFrameSize(maxFrameSize); - clientHandler.coreSession.setAutoFragment(true); + clientHandler.getCoreSession().setMaxFrameSize(maxFrameSize); + clientHandler.getCoreSession().setAutoFragment(true); // Send a message which is too large. int size = maxFrameSize * 2; @@ -88,7 +88,7 @@ public void testOutgoingAutoFragmentToMaxFrameSize() throws Exception Arrays.fill(array, 0, size, (byte)'X'); ByteBuffer message = BufferUtil.toBuffer(array); Frame sentFrame = new Frame(OpCode.BINARY, BufferUtil.copy(message)); - clientHandler.coreSession.sendFrame(sentFrame, Callback.NOOP, false); + clientHandler.getCoreSession().sendFrame(sentFrame, Callback.NOOP, false); // We should not receive any frames larger than the max frame size. // So our message should be split into two frames. @@ -121,20 +121,20 @@ public void testIncomingAutoFragmentToMaxFrameSize() throws Exception connect.get(5, TimeUnit.SECONDS); // Turn off fragmentation on the client. - clientHandler.coreSession.setMaxFrameSize(0); - clientHandler.coreSession.setAutoFragment(false); + clientHandler.getCoreSession().setMaxFrameSize(0); + clientHandler.getCoreSession().setAutoFragment(false); // Set the server should fragment to the maxFrameSize. int maxFrameSize = 30; assertTrue(serverHandler.open.await(5, TimeUnit.SECONDS)); - serverHandler.coreSession.setMaxFrameSize(maxFrameSize); - serverHandler.coreSession.setAutoFragment(true); + serverHandler.getCoreSession().setMaxFrameSize(maxFrameSize); + serverHandler.getCoreSession().setAutoFragment(true); // Send a message which is too large. int size = maxFrameSize * 2; byte[] message = new byte[size]; Arrays.fill(message, 0, size, (byte)'X'); - clientHandler.coreSession.sendFrame(new Frame(OpCode.BINARY, BufferUtil.toBuffer(message)), Callback.NOOP, false); + clientHandler.getCoreSession().sendFrame(new Frame(OpCode.BINARY, BufferUtil.toBuffer(message)), Callback.NOOP, false); // We should not receive any frames larger than the max frame size. // So our message should be split into two frames. @@ -166,14 +166,14 @@ public void testIncomingAutoFragmentWithPermessageDeflate() throws Exception connect.get(5, TimeUnit.SECONDS); // Turn off fragmentation on the client. - clientHandler.coreSession.setMaxFrameSize(0); - clientHandler.coreSession.setAutoFragment(false); + clientHandler.getCoreSession().setMaxFrameSize(0); + clientHandler.getCoreSession().setAutoFragment(false); // Set a small maxFrameSize on the server. int maxFrameSize = 10; assertTrue(serverHandler.open.await(5, TimeUnit.SECONDS)); - serverHandler.coreSession.setMaxFrameSize(maxFrameSize); - serverHandler.coreSession.setAutoFragment(true); + serverHandler.getCoreSession().setMaxFrameSize(maxFrameSize); + serverHandler.getCoreSession().setAutoFragment(true); // Generate a large random payload. int payloadSize = 1000; @@ -187,7 +187,7 @@ public void testIncomingAutoFragmentWithPermessageDeflate() throws Exception BufferUtil.flipToFlush(payload, 0); // Send the large random payload which should be fragmented on the server. - clientHandler.coreSession.sendFrame(new Frame(OpCode.BINARY, BufferUtil.copy(payload)), Callback.NOOP, false); + clientHandler.getCoreSession().sendFrame(new Frame(OpCode.BINARY, BufferUtil.copy(payload)), Callback.NOOP, false); // Assemble the message from the fragmented frames. ByteBuffer message = BufferUtil.allocate(payloadSize * 2); @@ -219,14 +219,14 @@ public void testGzipBomb() throws Exception connect.get(5, TimeUnit.SECONDS); // Turn off fragmentation on the client. - clientHandler.coreSession.setMaxFrameSize(0); - clientHandler.coreSession.setAutoFragment(false); + clientHandler.getCoreSession().setMaxFrameSize(0); + clientHandler.getCoreSession().setAutoFragment(false); // Set a small maxFrameSize on the server. int maxFrameSize = 1024; assertTrue(serverHandler.open.await(5, TimeUnit.SECONDS)); - serverHandler.coreSession.setMaxFrameSize(maxFrameSize); - serverHandler.coreSession.setAutoFragment(true); + serverHandler.getCoreSession().setMaxFrameSize(maxFrameSize); + serverHandler.getCoreSession().setAutoFragment(true); // Highly compressible payload. byte[] data = new byte[512 * 1024]; @@ -234,7 +234,7 @@ public void testGzipBomb() throws Exception ByteBuffer payload = ByteBuffer.wrap(data); // Send the payload which should be fragmented on the server. - clientHandler.coreSession.sendFrame(new Frame(OpCode.BINARY, BufferUtil.copy(payload)), Callback.NOOP, false); + clientHandler.getCoreSession().sendFrame(new Frame(OpCode.BINARY, BufferUtil.copy(payload)), Callback.NOOP, false); // Assemble the message from the fragmented frames. ByteBuffer message = BufferUtil.allocate(payload.remaining() * 2); @@ -282,13 +282,13 @@ public void testOutgoingAutoFragmentWithPermessageDeflate() throws Exception connect.get(5, TimeUnit.SECONDS); // Turn off fragmentation on the client. - clientHandler.coreSession.setMaxFrameSize(0); - clientHandler.coreSession.setAutoFragment(false); + clientHandler.getCoreSession().setMaxFrameSize(0); + clientHandler.getCoreSession().setAutoFragment(false); // Set maxFrameSize and autoFragment on the server. assertTrue(serverHandler.open.await(5, TimeUnit.SECONDS)); - serverHandler.coreSession.setMaxFrameSize(maxFrameSize); - serverHandler.coreSession.setAutoFragment(true); + serverHandler.getCoreSession().setMaxFrameSize(maxFrameSize); + serverHandler.getCoreSession().setAutoFragment(true); // Send the payload which should be fragmented by the server permessage-deflate. ByteBuffer sendPayload = BufferUtil.copy(payload); diff --git a/jetty-core/jetty-websocket/jetty-websocket-core-tests/src/test/java/org/eclipse/jetty/websocket/core/DemandTest.java b/jetty-core/jetty-websocket/jetty-websocket-core-tests/src/test/java/org/eclipse/jetty/websocket/core/DemandTest.java index 597876cd0baf..bdf956869844 100644 --- a/jetty-core/jetty-websocket/jetty-websocket-core-tests/src/test/java/org/eclipse/jetty/websocket/core/DemandTest.java +++ b/jetty-core/jetty-websocket/jetty-websocket-core-tests/src/test/java/org/eclipse/jetty/websocket/core/DemandTest.java @@ -83,7 +83,6 @@ public void onFrame(Frame frame, Callback callback) public void onError(Throwable cause, Callback callback) { callback.succeeded(); - _coreSession.demand(1); } @Override @@ -91,12 +90,6 @@ public void onClosed(CloseStatus closeStatus, Callback callback) { callback.succeeded(); } - - @Override - public boolean isAutoDemanding() - { - return false; - } } @Test diff --git a/jetty-core/jetty-websocket/jetty-websocket-core-tests/src/test/java/org/eclipse/jetty/websocket/core/DemandingIncomingFramesCapture.java b/jetty-core/jetty-websocket/jetty-websocket-core-tests/src/test/java/org/eclipse/jetty/websocket/core/DemandingIncomingFramesCapture.java index 112ed3df2c8f..365021d011f9 100644 --- a/jetty-core/jetty-websocket/jetty-websocket-core-tests/src/test/java/org/eclipse/jetty/websocket/core/DemandingIncomingFramesCapture.java +++ b/jetty-core/jetty-websocket/jetty-websocket-core-tests/src/test/java/org/eclipse/jetty/websocket/core/DemandingIncomingFramesCapture.java @@ -33,8 +33,7 @@ public void onFrame(Frame frame, Callback callback) } finally { - if (_coreSession.isAutoDemanding()) - _coreSession.autoDemand(); + _coreSession.demand(1); } } } diff --git a/jetty-core/jetty-websocket/jetty-websocket-core-tests/src/test/java/org/eclipse/jetty/websocket/core/EchoFrameHandler.java b/jetty-core/jetty-websocket/jetty-websocket-core-tests/src/test/java/org/eclipse/jetty/websocket/core/EchoFrameHandler.java index abeb5bebb191..0f3ff967f13b 100644 --- a/jetty-core/jetty-websocket/jetty-websocket-core-tests/src/test/java/org/eclipse/jetty/websocket/core/EchoFrameHandler.java +++ b/jetty-core/jetty-websocket/jetty-websocket-core-tests/src/test/java/org/eclipse/jetty/websocket/core/EchoFrameHandler.java @@ -55,5 +55,7 @@ public void onFrame(Frame frame, Callback callback) { callback.succeeded(); } + + coreSession.demand(1); } } diff --git a/jetty-core/jetty-websocket/jetty-websocket-core-tests/src/test/java/org/eclipse/jetty/websocket/core/FrameBufferTest.java b/jetty-core/jetty-websocket/jetty-websocket-core-tests/src/test/java/org/eclipse/jetty/websocket/core/FrameBufferTest.java index 01023ee72691..5c22ddb2fff8 100644 --- a/jetty-core/jetty-websocket/jetty-websocket-core-tests/src/test/java/org/eclipse/jetty/websocket/core/FrameBufferTest.java +++ b/jetty-core/jetty-websocket/jetty-websocket-core-tests/src/test/java/org/eclipse/jetty/websocket/core/FrameBufferTest.java @@ -85,8 +85,8 @@ public void testSendSameFrameMultipleTimes() throws Exception TestFrameHandler clientHandler = new TestFrameHandler(); client.connect(clientHandler, server.getUri()).get(5, TimeUnit.SECONDS); serverHandler.open.await(5, TimeUnit.SECONDS); - clientHandler.coreSession.setAutoFragment(false); - serverHandler.coreSession.setAutoFragment(false); + clientHandler.getCoreSession().setAutoFragment(false); + serverHandler.getCoreSession().setAutoFragment(false); int payloadLen = 32 * 1024; byte[] array = new byte[payloadLen]; diff --git a/jetty-core/jetty-websocket/jetty-websocket-core-tests/src/test/java/org/eclipse/jetty/websocket/core/MessageHandlerTest.java b/jetty-core/jetty-websocket/jetty-websocket-core-tests/src/test/java/org/eclipse/jetty/websocket/core/MessageHandlerTest.java index 44243ba0dce5..5bf8b4f9e2b9 100644 --- a/jetty-core/jetty-websocket/jetty-websocket-core-tests/src/test/java/org/eclipse/jetty/websocket/core/MessageHandlerTest.java +++ b/jetty-core/jetty-websocket/jetty-websocket-core-tests/src/test/java/org/eclipse/jetty/websocket/core/MessageHandlerTest.java @@ -46,7 +46,6 @@ public class MessageHandlerTest static byte[] fourByteUtf8Bytes = fourByteUtf8String.getBytes(StandardCharsets.UTF_8); static byte[] nonUtf8Bytes = {0x7F, (byte)0xFF, (byte)0xFF}; - boolean autoDemanding; CoreSession coreSession; List
+ * NOTE: We don't expose org.eclipse.jetty.util.Callback here as that would complicate matters with the WebAppContext's classloader isolation.
+ */
+public interface Callback
+{
+ Callback NOOP = new Callback()
+ {
+ };
+
+ /**
+ * Creates a callback from the given success and failure lambdas.
+ *
+ * @param success called when the callback succeeds
+ * @param failure called when the callback fails
+ * @return a new callback
+ */
+ static Callback from(Runnable success, Consumer
+ * Callback invoked when the write succeeds.
+ *
+ * Callback invoked when the write fails.
+ * Implementations allow to configure WebSocket parameters.
@@ -39,6 +44,13 @@ public interface WebSocketPolicy
*/
int getInputBufferSize();
+ /**
+ * The input (read from network layer) buffer size.
+ *
+ * @param size the size in bytes
+ */
+ void setInputBufferSize(int size);
+
/**
* The output (write to network layer) buffer size.
*
@@ -49,6 +61,13 @@ public interface WebSocketPolicy
*/
int getOutputBufferSize();
+ /**
+ * The output (write to network layer) buffer size.
+ *
+ * @param size the size in bytes
+ */
+ void setOutputBufferSize(int size);
+
/**
* Get the maximum size of a binary message during parsing.
*
@@ -64,6 +83,16 @@ public interface WebSocketPolicy
*/
long getMaxBinaryMessageSize();
+ /**
+ * The maximum size of a binary message during parsing/generating.
+ *
+ * Binary messages over this maximum will result in a close code 1009 {@link StatusCode#MESSAGE_TOO_LARGE}
+ *
@@ -80,73 +109,64 @@ public interface WebSocketPolicy
long getMaxTextMessageSize();
/**
- * The maximum payload size of any WebSocket Frame which can be received.
- *
- * @return the maximum size of a WebSocket Frame.
- */
- long getMaxFrameSize();
-
- /**
- * If true, frames are automatically fragmented to respect the maximum frame size.
- *
- * @return whether to automatically fragment incoming WebSocket Frames.
- */
- boolean isAutoFragment();
-
- /**
- * The duration that a websocket may be idle before being closed by the implementation
+ * The maximum size of a text message during parsing/generating.
+ *
+ * Text messages over this maximum will result in a close code 1009 {@link StatusCode#MESSAGE_TOO_LARGE}
*
- * @param duration the timeout duration (may not be null or negative)
+ * @param size the maximum allowed size of a text message.
*/
- void setIdleTimeout(Duration duration);
+ void setMaxTextMessageSize(long size);
/**
- * The input (read from network layer) buffer size.
+ * The maximum payload size of any WebSocket Frame which can be received.
*
- * @param size the size in bytes
+ * @return the maximum size of a WebSocket Frame.
*/
- void setInputBufferSize(int size);
+ long getMaxFrameSize();
/**
- * The output (write to network layer) buffer size.
+ * The maximum payload size of any WebSocket Frame which can be received.
+ *
+ * WebSocket Frames over this maximum will result in a close code 1009 {@link StatusCode#MESSAGE_TOO_LARGE}
+ *
- * Binary messages over this maximum will result in a close code 1009 {@link StatusCode#MESSAGE_TOO_LARGE}
- *
- * Text messages over this maximum will result in a close code 1009 {@link StatusCode#MESSAGE_TOO_LARGE}
+ * If set to true, frames are automatically fragmented to respect the maximum frame size.
*
- * @param size the maximum allowed size of a text message.
+ * @param autoFragment whether to automatically fragment incoming WebSocket Frames.
*/
- void setMaxTextMessageSize(long size);
+ void setAutoFragment(boolean autoFragment);
/**
- * The maximum payload size of any WebSocket Frame which can be received.
- *
- * WebSocket Frames over this maximum will result in a close code 1009 {@link StatusCode#MESSAGE_TOO_LARGE}
- *
- * Note: this is a blocking call
- *
- * @param data the message to be sent
- * @throws IOException if unable to send the bytes
- */
- void sendBytes(ByteBuffer data) throws IOException;
-
- /**
- * Initiates the asynchronous transmission of a binary message. This method returns before the message is transmitted.
- * Developers may provide a callback to be notified when the message has been transmitted or resulted in an error.
- *
- * @param data the data being sent
- * @param callback callback to notify of success or failure of the write operation
- */
- void sendBytes(ByteBuffer data, WriteCallback callback);
-
- /**
- * Send a binary message in pieces, blocking until all of the message has been transmitted.
- * The runtime reads the message in order. Non-final pieces are
- * sent with isLast set to false. The final piece must be sent with isLast set to true.
- *
- * @param fragment the piece of the message being sent
- * @param isLast true if this is the last piece of the partial bytes
- * @throws IOException if unable to send the partial bytes
- */
- void sendPartialBytes(ByteBuffer fragment, boolean isLast) throws IOException;
-
- /**
- * Initiates the asynchronous transmission of a partial binary message. This method returns before the message is
- * transmitted.
- * The runtime reads the message in order. Non-final pieces are sent with isLast
- * set to false. The final piece must be sent with isLast set to true.
- * Developers may provide a callback to be notified when the message has been transmitted or resulted in an error.
- *
- * @param fragment the data being sent
- * @param isLast true if this is the last piece of the partial bytes
- * @param callback callback to notify of success or failure of the write operation
- */
- void sendPartialBytes(ByteBuffer fragment, boolean isLast, WriteCallback callback);
-
- /**
- * Send a text message, blocking until all bytes of the message has been transmitted.
- *
- * Note: this is a blocking call
- *
- * @param text the message to be sent
- * @throws IOException if unable to send the text message
- */
- void sendString(String text) throws IOException;
-
- /**
- * Initiates the asynchronous transmission of a text message. This method may return before the message is
- * transmitted. Developers may provide a callback to
- * be notified when the message has been transmitted or resulted in an error.
- *
- * @param text the text being sent
- * @param callback callback to notify of success or failure of the write operation
- */
- void sendString(String text, WriteCallback callback);
-
- /**
- * Send a text message in pieces, blocking until all of the message has been transmitted. The runtime reads the
- * message in order. Non-final pieces are sent
- * with isLast set to false. The final piece must be sent with isLast set to true.
- *
- * @param fragment the piece of the message being sent
- * @param isLast true if this is the last piece of the partial bytes
- * @throws IOException if unable to send the partial bytes
- */
- void sendPartialString(String fragment, boolean isLast) throws IOException;
-
- /**
- * Initiates the asynchronous transmission of a partial text message.
- * This method may return before the message is transmitted.
- * The runtime reads the message in order. Non-final pieces are sent with isLast
- * set to false. The final piece must be sent with isLast set to true.
- * Developers may provide a callback to be notified when the message has been transmitted or resulted in an error.
- *
- * @param fragment the text being sent
- * @param isLast true if this is the last piece of the partial bytes
- * @param callback callback to notify of success or failure of the write operation
- */
- void sendPartialString(String fragment, boolean isLast, WriteCallback callback);
-
- /**
- * Send a Ping message containing the given application data to the remote endpoint, blocking until all of the
- * message has been transmitted.
- * The corresponding Pong message may be picked up using the MessageHandler.Pong handler.
- *
- * @param applicationData the data to be carried in the ping request
- * @throws IOException if unable to send the ping
- */
- void sendPing(ByteBuffer applicationData) throws IOException;
-
- /**
- * Asynchronously send a Ping message containing the given application data to the remote endpoint.
- * The corresponding Pong message may be picked up using the MessageHandler.Pong handler.
- *
- * @param applicationData the data to be carried in the ping request
- * @param callback callback to notify of success or failure of the write operation
- */
- void sendPing(ByteBuffer applicationData, WriteCallback callback);
-
- /**
- * Allows the developer to send an unsolicited Pong message containing the given application data
- * in order to serve as a unidirectional heartbeat for the session, this will block until
- * all of the message has been transmitted.
- *
- * @param applicationData the application data to be carried in the pong response.
- * @throws IOException if unable to send the pong
- */
- void sendPong(ByteBuffer applicationData) throws IOException;
-
- /**
- * Allows the developer to asynchronously send an unsolicited Pong message containing the given application data
- * in order to serve as a unidirectional heartbeat for the session.
- *
- * @param applicationData the application data to be carried in the pong response.
- * @param callback callback to notify of success or failure of the write operation
- */
- void sendPong(ByteBuffer applicationData, WriteCallback callback);
-
- /**
- * @return the batch mode with which messages are sent.
- * @see #flush()
- */
- BatchMode getBatchMode();
-
- /**
- * Set the batch mode with which messages are sent.
- *
- * @param mode the batch mode to use
- * @see #flush()
- */
- void setBatchMode(BatchMode mode);
-
- /**
- * Get the maximum number of data frames allowed to be waiting to be sent at any one time.
- * The default value is -1, this indicates there is no limit on how many frames can be
- * queued to be sent by the implementation. If the limit is exceeded, subsequent frames
- * sent are failed with a {@link java.nio.channels.WritePendingException} but
- * the connection is not failed and will remain open.
- *
- * @return the max number of frames.
- */
- int getMaxOutgoingFrames();
-
- /**
- * Set the maximum number of data frames allowed to be waiting to be sent at any one time.
- * The default value is -1, this indicates there is no limit on how many frames can be
- * queued to be sent by the implementation. If the limit is exceeded, subsequent frames
- * sent are failed with a {@link java.nio.channels.WritePendingException} but
- * the connection is not failed and will remain open.
- *
- * @param maxOutgoingFrames the max number of frames.
- */
- void setMaxOutgoingFrames(int maxOutgoingFrames);
-
- /**
- * Get the SocketAddress for the established connection.
- *
- * @return the SocketAddress for the established connection.
- */
- SocketAddress getRemoteAddress();
-
- /**
- * Flushes messages that may have been batched by the implementation.
- *
- * @throws IOException if the flush fails
- */
- void flush() throws IOException;
-}
diff --git a/jetty-core/jetty-websocket/jetty-websocket-jetty-api/src/main/java/org/eclipse/jetty/websocket/api/Session.java b/jetty-core/jetty-websocket/jetty-websocket-jetty-api/src/main/java/org/eclipse/jetty/websocket/api/Session.java
index 3215c13577b8..aba060cb4a3c 100644
--- a/jetty-core/jetty-websocket/jetty-websocket-jetty-api/src/main/java/org/eclipse/jetty/websocket/api/Session.java
+++ b/jetty-core/jetty-websocket/jetty-websocket-jetty-api/src/main/java/org/eclipse/jetty/websocket/api/Session.java
@@ -14,184 +14,349 @@
package org.eclipse.jetty.websocket.api;
import java.io.Closeable;
-import java.net.InetSocketAddress;
import java.net.SocketAddress;
+import java.nio.ByteBuffer;
-import org.eclipse.jetty.websocket.api.annotations.OnWebSocketClose;
+import org.eclipse.jetty.websocket.api.annotations.WebSocket;
/**
- * Session represents an active link of communications with a Remote WebSocket Endpoint.
+ * {@link Session} represents an active link of
+ * communication with a remote WebSocket endpoint. {@link Session} APIs can be used to configure
+ * the various parameters that control the behavior
+ * of the WebSocket communication, such as
+ * {@link #setMaxTextMessageSize(long)}, and to send
+ * WebSocket frames or messages to the other peer. The passive link of communication that receives
+ * WebSocket events is {@link Listener}.
- * This will enqueue a graceful close to the remote endpoint.
+ * Explicitly demands for WebSocket events. This method should be called only when the WebSocket endpoint is not
+ * demanding automatically, as defined by {@link WebSocket#autoDemand()}
+ * and {@link Listener.AutoDemanding}. In general, invoking this method results in a listener method or
+ * an annotated method to be called when the corresponding event is
+ * ready to be delivered. For WebSocket endpoints that wants to receive frame events
+ * (for example by overriding {@link Listener#onWebSocketFrame(Frame, Callback)}),
+ * invoking this method will result in a frame event being delivered to
+ * the listener/annotated method when a new frame is received. For WebSocket endpoints that want to receive whole message
+ * events (for example by overriding {@link Listener#onWebSocketText(String)}),
+ * invoking this method will result in a message event being delivered to
+ * the listener/annotated method when a new message is received.
+ * The implementation will automatically demand for more frames until a
+ * whole message is assembled and then deliver the whole message as event. Note that even when the WebSocket endpoint is interested in whole
+ * messages, calling this method is necessary not only to possibly receive
+ * the next whole message, but also to receive control frames (such as
+ * PING or CLOSE frames).
+ * Failing to call this method after receiving a whole message results
+ * in the CLOSE frame event to not be processed, and therefore for the
+ * endpoint to not notice when the other peer closed the WebSocket
+ * communication.
- * This will enqueue a graceful close to the remote endpoint.
+ * Initiates the asynchronous send of a BINARY message, notifying
+ * the given callback when the message send is completed, either
+ * successfully or with a failure.
- * This will enqueue a graceful close to the remote endpoint.
+ * Initiates the asynchronous send of a BINARY frame, possibly part
+ * of a larger binary message, notifying the given callback when the frame
+ * send is completed, either successfully or with a failure. Non-final frames must be sent with the parameter {@code last=false}.
+ * The final frame must be sent with {@code last=true}.
- * This will enqueue a graceful close to the remote endpoint.
+ * Initiates the asynchronous send of a TEXT message, notifying
+ * the given callback when the message send is completed, either
+ * successfully or with a failure.
- * This will terminate the connection, without sending a websocket close frame.
- *
- * Once called, any read/write activity on the websocket from this point will be indeterminate.
- *
- * Once the underlying connection has been determined to be closed, the various onClose() events (either
- * {@link WebSocketListener#onWebSocketClose(int, String)} or {@link OnWebSocketClose}) will be called on your
- * websocket.
+ * Initiates the asynchronous send of a TEXT frame, possibly part
+ * of a larger binary message, notifying the given callback when the frame
+ * send is completed, either successfully or with a failure. Non-final frames must be sent with the parameter {@code last=false}.
+ * The final frame must be sent with {@code last=true}. Initiates the asynchronous send of a PING frame, notifying the given
+ * callback when the frame send is completed, either successfully or with
+ * a failure.
- * Do not assume that this will return a {@link InetSocketAddress} in all cases.
- * Use of various proxies, and even UnixSockets can result a SocketAddress being returned
- * without supporting {@link InetSocketAddress}
- * Initiates the asynchronous send of a PONG frame, notifying the given
+ * callback when the frame send is completed, either successfully or with
+ * a failure. Equivalent to {@code close(StatusCode.NORMAL, null, Callback.NOOP)}. Sends a websocket CLOSE frame, with status code and reason, notifying
+ * the given callback when the frame send is completed, either successfully
+ * or with a failure. Abruptly closes the WebSocket connection without sending a CLOSE frame.
- * Do not assume that this will return a {@link InetSocketAddress} in all cases.
- * Use of various proxies, and even UnixSockets can result a SocketAddress being returned
- * without supporting {@link InetSocketAddress}
- * Returns the version of the WebSocket protocol currently being used. This is taken as the value of the {@code Sec-WebSocket-Version} header
+ * used in the {@link #getUpgradeRequest() upgrade request}.
*
+ * @return the WebSocket protocol version
+ */
+ String getProtocolVersion();
+
+ /**
* @return the UpgradeRequest used to create this session
*/
UpgradeRequest getUpgradeRequest();
/**
- * Get the UpgradeResponse used to create this session
- *
* @return the UpgradeResponse used to create this session
*/
UpgradeResponse getUpgradeResponse();
/**
- * Return true if and only if the underlying socket is open.
- *
* @return whether the session is open
*/
boolean isOpen();
/**
- * Return true if and only if the underlying socket is using a secure transport.
- *
- * @return whether its using a secure transport
+ * @return whether the underlying socket is using a secure transport
*/
boolean isSecure();
/**
- * Suspend the delivery of incoming WebSocket frames.
- *
- * If this is called from inside the scope of the message handler the suspend takes effect immediately.
- * If suspend is called outside the scope of the message handler then the call may take effect
- * after 1 more frame is delivered.
- * The passive link of communication with a remote WebSocket endpoint. Applications provide WebSocket endpoints that implement this interface
+ * to receive WebSocket events from the remote peer, and can use
+ * {@link Session} for configuration and to send WebSocket frames or messages
+ * to the other peer. A WebSocket {@link Session} has opened successfully and is ready to be used. Applications can store the given {@link Session} as a field so it can be used
+ * to send messages back to the other peer. A WebSocket frame has been received. The received frames may be control frames such as PING, PONG or CLOSE,
+ * or data frames either BINARY or TEXT. A WebSocket PING frame has been received. A WebSocket PONG frame has been received. A WebSocket BINARY (or associated CONTINUATION) frame has been received. The {@code ByteBuffer} is read-only, and will be recycled when the {@code callback}
+ * is completed. A WebSocket TEXT (or associated CONTINUATION) frame has been received.
+ * Note that due to framing, there is a above average chance of any UTF8 sequences being split on the
+ * border between two frames will result in either the previous frame, or the next frame having an
+ * invalid UTF8 sequence, but the combined frames having a valid UTF8 sequence.
+ *
+ * The String being provided here will not end in a split UTF8 sequence. Instead this partial sequence
+ * will be held over until the next frame is received.
+ * @param last whether this is the last frame
+ */
+ default void onWebSocketPartialText(String payload, boolean last)
+ {
+ }
+
+ /**
+ * A WebSocket BINARY message has been received. A WebSocket TEXT message has been received. A WebSocket error has occurred during the processing of WebSocket frames. Usually errors occurs from bad or malformed incoming packets, for example
+ * text frames that do not contain UTF-8 bytes, frames that are too big, or other
+ * violations of the WebSocket specification. The WebSocket {@link Session} will be closed, but applications may
+ * explicitly {@link Session#close(int, String, Callback) close} the
+ * {@link Session} providing a different status code or reason. The WebSocket {@link Session} has been closed. Tag interface that signals that the WebSocket endpoint
+ * is demanding for WebSocket frames automatically.
- * Convenient abstract class to base standard WebSocket implementations off of.
- */
-public class WebSocketAdapter implements WebSocketListener
-{
- private volatile Session session;
- private RemoteEndpoint remote;
-
- public RemoteEndpoint getRemote()
- {
- return remote;
- }
-
- public Session getSession()
- {
- return session;
- }
-
- public boolean isConnected()
- {
- Session sess = this.session;
- return (sess != null) && (sess.isOpen());
- }
-
- public boolean isNotConnected()
- {
- return !isConnected();
- }
-
- @Override
- public void onWebSocketBinary(byte[] payload, int offset, int len)
- {
- /* do nothing */
- }
-
- @Override
- public void onWebSocketClose(int statusCode, String reason)
- {
- /* do nothing */
- }
-
- @Override
- public void onWebSocketConnect(Session sess)
- {
- this.session = sess;
- this.remote = sess.getRemote();
- }
-
- @Override
- public void onWebSocketError(Throwable cause)
- {
- /* do nothing */
- }
-
- @Override
- public void onWebSocketText(String message)
- {
- /* do nothing */
- }
-}
diff --git a/jetty-core/jetty-websocket/jetty-websocket-jetty-api/src/main/java/org/eclipse/jetty/websocket/api/WebSocketBehavior.java b/jetty-core/jetty-websocket/jetty-websocket-jetty-api/src/main/java/org/eclipse/jetty/websocket/api/WebSocketBehavior.java
deleted file mode 100644
index cf6dd8b743f7..000000000000
--- a/jetty-core/jetty-websocket/jetty-websocket-jetty-api/src/main/java/org/eclipse/jetty/websocket/api/WebSocketBehavior.java
+++ /dev/null
@@ -1,26 +0,0 @@
-//
-// ========================================================================
-// Copyright (c) 1995 Mort Bay Consulting Pty Ltd and others.
-//
-// This program and the accompanying materials are made available under the
-// terms of the Eclipse Public License v. 2.0 which is available at
-// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
-// which is available at https://www.apache.org/licenses/LICENSE-2.0.
-//
-// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
-// ========================================================================
-//
-
-package org.eclipse.jetty.websocket.api;
-
-/**
- * Behavior for how the WebSocket should operate.
- *
- * This dictated by the RFC 6455 spec in various places, where certain behavior must be performed depending on
- * operation as a CLIENT vs a SERVER
- */
-public enum WebSocketBehavior
-{
- CLIENT,
- SERVER
-}
diff --git a/jetty-core/jetty-websocket/jetty-websocket-jetty-api/src/main/java/org/eclipse/jetty/websocket/api/WebSocketConnectionListener.java b/jetty-core/jetty-websocket/jetty-websocket-jetty-api/src/main/java/org/eclipse/jetty/websocket/api/WebSocketConnectionListener.java
deleted file mode 100644
index 445900db0645..000000000000
--- a/jetty-core/jetty-websocket/jetty-websocket-jetty-api/src/main/java/org/eclipse/jetty/websocket/api/WebSocketConnectionListener.java
+++ /dev/null
@@ -1,58 +0,0 @@
-//
-// ========================================================================
-// Copyright (c) 1995 Mort Bay Consulting Pty Ltd and others.
-//
-// This program and the accompanying materials are made available under the
-// terms of the Eclipse Public License v. 2.0 which is available at
-// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
-// which is available at https://www.apache.org/licenses/LICENSE-2.0.
-//
-// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
-// ========================================================================
-//
-
-package org.eclipse.jetty.websocket.api;
-
-/**
- * Core WebSocket Connection Listener
- */
-public interface WebSocketConnectionListener
-{
- /**
- * A Close Event was received.
- *
- * The underlying Connection will be considered closed at this point.
- *
- * @param statusCode the close status code. (See {@link StatusCode})
- * @param reason the optional reason for the close.
- */
- default void onWebSocketClose(int statusCode, String reason)
- {
- }
-
- /**
- * A WebSocket {@link Session} has connected successfully and is ready to be used.
- *
- * Note: It is a good idea to track this session as a field in your object so that you can write messages back via the {@link RemoteEndpoint}
- *
- * @param session the websocket session.
- */
- default void onWebSocketConnect(Session session)
- {
- }
-
- /**
- * A WebSocket exception has occurred.
- *
- * This is a way for the internal implementation to notify of exceptions occured during the processing of websocket.
- *
- * Usually this occurs from bad / malformed incoming packets. (example: bad UTF8 data, frames that are too big, violations of the spec)
- *
- * This will result in the {@link Session} being closed by the implementing side.
- *
- * @param cause the error that occurred.
- */
- default void onWebSocketError(Throwable cause)
- {
- }
-}
diff --git a/jetty-core/jetty-websocket/jetty-websocket-jetty-api/src/main/java/org/eclipse/jetty/websocket/api/WebSocketFrameListener.java b/jetty-core/jetty-websocket/jetty-websocket-jetty-api/src/main/java/org/eclipse/jetty/websocket/api/WebSocketFrameListener.java
deleted file mode 100644
index d61643a2d089..000000000000
--- a/jetty-core/jetty-websocket/jetty-websocket-jetty-api/src/main/java/org/eclipse/jetty/websocket/api/WebSocketFrameListener.java
+++ /dev/null
@@ -1,27 +0,0 @@
-//
-// ========================================================================
-// Copyright (c) 1995 Mort Bay Consulting Pty Ltd and others.
-//
-// This program and the accompanying materials are made available under the
-// terms of the Eclipse Public License v. 2.0 which is available at
-// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
-// which is available at https://www.apache.org/licenses/LICENSE-2.0.
-//
-// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
-// ========================================================================
-//
-
-package org.eclipse.jetty.websocket.api;
-
-/**
- * WebSocket Frame Listener interface for incoming WebSocket frames.
- */
-public interface WebSocketFrameListener extends WebSocketConnectionListener
-{
- /**
- * A WebSocket frame has been received.
- *
- * @param frame the immutable frame received
- */
- void onWebSocketFrame(Frame frame);
-}
diff --git a/jetty-core/jetty-websocket/jetty-websocket-jetty-api/src/main/java/org/eclipse/jetty/websocket/api/WebSocketListener.java b/jetty-core/jetty-websocket/jetty-websocket-jetty-api/src/main/java/org/eclipse/jetty/websocket/api/WebSocketListener.java
deleted file mode 100644
index 52869d0030de..000000000000
--- a/jetty-core/jetty-websocket/jetty-websocket-jetty-api/src/main/java/org/eclipse/jetty/websocket/api/WebSocketListener.java
+++ /dev/null
@@ -1,40 +0,0 @@
-//
-// ========================================================================
-// Copyright (c) 1995 Mort Bay Consulting Pty Ltd and others.
-//
-// This program and the accompanying materials are made available under the
-// terms of the Eclipse Public License v. 2.0 which is available at
-// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
-// which is available at https://www.apache.org/licenses/LICENSE-2.0.
-//
-// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
-// ========================================================================
-//
-
-package org.eclipse.jetty.websocket.api;
-
-/**
- * Basic WebSocket Listener interface for incoming WebSocket message events.
- */
-public interface WebSocketListener extends WebSocketConnectionListener
-{
- /**
- * A WebSocket binary frame has been received.
- *
- * @param payload the raw payload array received
- * @param offset the offset in the payload array where the data starts
- * @param len the length of bytes in the payload
- */
- default void onWebSocketBinary(byte[] payload, int offset, int len)
- {
- }
-
- /**
- * A WebSocket Text frame was received.
- *
- * @param message the message
- */
- default void onWebSocketText(String message)
- {
- }
-}
diff --git a/jetty-core/jetty-websocket/jetty-websocket-jetty-api/src/main/java/org/eclipse/jetty/websocket/api/WebSocketPartialListener.java b/jetty-core/jetty-websocket/jetty-websocket-jetty-api/src/main/java/org/eclipse/jetty/websocket/api/WebSocketPartialListener.java
deleted file mode 100644
index cd317e5aa2a0..000000000000
--- a/jetty-core/jetty-websocket/jetty-websocket-jetty-api/src/main/java/org/eclipse/jetty/websocket/api/WebSocketPartialListener.java
+++ /dev/null
@@ -1,53 +0,0 @@
-//
-// ========================================================================
-// Copyright (c) 1995 Mort Bay Consulting Pty Ltd and others.
-//
-// This program and the accompanying materials are made available under the
-// terms of the Eclipse Public License v. 2.0 which is available at
-// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
-// which is available at https://www.apache.org/licenses/LICENSE-2.0.
-//
-// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
-// ========================================================================
-//
-
-package org.eclipse.jetty.websocket.api;
-
-import java.nio.ByteBuffer;
-
-/**
- * WebSocket Partial Message Listener interface for incoming WebSocket TEXT/BINARY/CONTINUATION frames.
- */
-public interface WebSocketPartialListener extends WebSocketConnectionListener
-{
- /**
- * A WebSocket BINARY (or associated CONTINUATION) frame has been received.
- *
- * Important Note: The payload {@code ByteBuffer} cannot be modified, and the ByteBuffer object itself
- * will be recycled on completion of this method call, make a copy of the data contained within if you want to
- * retain it between calls.
- *
- * @param payload the binary message frame payload
- * @param fin true if this is the final frame, false otherwise
- */
- default void onWebSocketPartialBinary(ByteBuffer payload, boolean fin)
- {
- }
-
- /**
- * A WebSocket TEXT (or associated CONTINUATION) frame has been received.
- *
- * @param payload the text message payload
- *
- * Note that due to framing, there is a above average chance of any UTF8 sequences being split on the
- * border between two frames will result in either the previous frame, or the next frame having an
- * invalid UTF8 sequence, but the combined frames having a valid UTF8 sequence.
- *
- * The String being provided here will not end in a split UTF8 sequence. Instead this partial sequence
- * will be held over until the next frame is received.
- * @param fin true if this is the final frame, false otherwise
- */
- default void onWebSocketPartialText(String payload, boolean fin)
- {
- }
-}
diff --git a/jetty-core/jetty-websocket/jetty-websocket-jetty-api/src/main/java/org/eclipse/jetty/websocket/api/WebSocketPingPongListener.java b/jetty-core/jetty-websocket/jetty-websocket-jetty-api/src/main/java/org/eclipse/jetty/websocket/api/WebSocketPingPongListener.java
deleted file mode 100644
index f238c736e1e3..000000000000
--- a/jetty-core/jetty-websocket/jetty-websocket-jetty-api/src/main/java/org/eclipse/jetty/websocket/api/WebSocketPingPongListener.java
+++ /dev/null
@@ -1,40 +0,0 @@
-//
-// ========================================================================
-// Copyright (c) 1995 Mort Bay Consulting Pty Ltd and others.
-//
-// This program and the accompanying materials are made available under the
-// terms of the Eclipse Public License v. 2.0 which is available at
-// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
-// which is available at https://www.apache.org/licenses/LICENSE-2.0.
-//
-// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
-// ========================================================================
-//
-
-package org.eclipse.jetty.websocket.api;
-
-import java.nio.ByteBuffer;
-
-/**
- * WebSocket PING/PONG Listener interface for incoming WebSocket PING/PONG frames.
- */
-public interface WebSocketPingPongListener extends WebSocketConnectionListener
-{
- /**
- * A WebSocket PING has been received.
- *
- * @param payload the ping payload
- */
- default void onWebSocketPing(ByteBuffer payload)
- {
- }
-
- /**
- * A WebSocket PONG has been received.
- *
- * @param payload the pong payload
- */
- default void onWebSocketPong(ByteBuffer payload)
- {
- }
-}
diff --git a/jetty-core/jetty-websocket/jetty-websocket-jetty-api/src/main/java/org/eclipse/jetty/websocket/api/WriteCallback.java b/jetty-core/jetty-websocket/jetty-websocket-jetty-api/src/main/java/org/eclipse/jetty/websocket/api/WriteCallback.java
deleted file mode 100644
index c66b40bdc210..000000000000
--- a/jetty-core/jetty-websocket/jetty-websocket-jetty-api/src/main/java/org/eclipse/jetty/websocket/api/WriteCallback.java
+++ /dev/null
@@ -1,62 +0,0 @@
-//
-// ========================================================================
-// Copyright (c) 1995 Mort Bay Consulting Pty Ltd and others.
-//
-// This program and the accompanying materials are made available under the
-// terms of the Eclipse Public License v. 2.0 which is available at
-// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
-// which is available at https://www.apache.org/licenses/LICENSE-2.0.
-//
-// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
-// ========================================================================
-//
-
-package org.eclipse.jetty.websocket.api;
-
-/**
- * Callback for Write events.
- *
- * NOTE: We don't expose org.eclipse.jetty.util.Callback here as that would complicate matters with the WebAppContext's classloader isolation.
- */
-public interface WriteCallback
-{
- WriteCallback NOOP = new WriteCallback()
- {
- };
-
- /**
- *
- * Callback invoked when the write fails.
- *
- * Callback invoked when the write succeeds.
- *
- * Acceptable method patterns. Annotation for methods to receive WebSocket close events. Acceptable method patterns:
- * Acceptable method patterns. Annotation for methods to receive WebSocket errors. Acceptable method patterns:
- * Acceptable method patterns. Annotation for methods to receive WebSocket frame events. Acceptable method patterns:
- * Acceptable method patterns.
+ * Annotation for methods to receive BINARY or TEXT WebSocket events. Acceptable method patterns: Note: that the {@link Reader} in this case will always use UTF-8 encoding/charset (this is dictated by the RFC 6455 spec for Text Messages. If you need to
- * use a non-UTF-8 encoding/charset, you are instructed to use the binary messaging techniques. NOTE Method that takes a {@link Reader} must have
+ * {@link WebSocket#autoDemand()} set to {@code true}. NOTE The {@link Reader} argument will always use the UTF-8 charset,
+ * (as dictated by RFC 6455). If you need to use a different charset,
+ * you must use BINARY messages. NOTE Method that takes a {@link InputStream} must have
+ * {@link WebSocket#autoDemand()} set to {@code true}. These are used to receive partial messages without aggregating them into a complete WebSocket message. Instead the a boolean
- * argument is supplied to indicate whether this is the last segment of data of the message. See {@link WebSocketPartialListener}
- * interface for more details on partial messages. These are used to receive individual frames (and therefore partial
+ * messages) without aggregating the frames into a complete WebSocket message.
+ * A {@code boolean} parameter is supplied to indicate whether the frame is
+ * the last segment of data of the message. Note: Similar to the signatures above these can all be used with an optional first {@link Session} parameter.
- * Only 1 acceptable method pattern for this annotation. Annotation for methods to receive WebSocket connect events. Acceptable method patterns: Annotation for classes to be WebSocket endpoints. Returns whether demand for WebSocket frames is automatically performed
+ * upon successful return from methods annotated with {@link OnWebSocketOpen},
+ * {@link OnWebSocketFrame} and {@link OnWebSocketMessage}. If the demand is not automatic, then {@link Session#demand()} must be
+ * explicitly invoked to receive more WebSocket frames (both control and
+ * data frames, including CLOSE frames).
- * Binary messages over this maximum will result in a close code 1009 {@link StatusCode#MESSAGE_TOO_LARGE}
- */
- int maxBinaryMessageSize() default -1;
-
- /**
- * The time in ms (milliseconds) that a websocket may be idle before closing.
- */
- int idleTimeout() default -1;
-
- /**
- * The maximum size of a text message during parsing/generating.
- *
- * Text messages over this maximum will result in a close code 1009 {@link StatusCode#MESSAGE_TOO_LARGE}
- */
- int maxTextMessageSize() default -1;
-
- /**
- * The output frame buffering mode.
- *
- * Default: {@link BatchMode#AUTO}
- */
- BatchMode batchMode() default BatchMode.AUTO;
+ boolean autoDemand() default true;
}
diff --git a/jetty-core/jetty-websocket/jetty-websocket-jetty-api/src/main/java/org/eclipse/jetty/websocket/api/annotations/package-info.java b/jetty-core/jetty-websocket/jetty-websocket-jetty-api/src/main/java/org/eclipse/jetty/websocket/api/annotations/package-info.java
deleted file mode 100644
index bfd7fb65cdeb..000000000000
--- a/jetty-core/jetty-websocket/jetty-websocket-jetty-api/src/main/java/org/eclipse/jetty/websocket/api/annotations/package-info.java
+++ /dev/null
@@ -1,18 +0,0 @@
-//
-// ========================================================================
-// Copyright (c) 1995 Mort Bay Consulting Pty Ltd and others.
-//
-// This program and the accompanying materials are made available under the
-// terms of the Eclipse Public License v. 2.0 which is available at
-// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
-// which is available at https://www.apache.org/licenses/LICENSE-2.0.
-//
-// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
-// ========================================================================
-//
-
-/**
- * Jetty WebSocket API : WebSocket POJO Annotations
- */
-package org.eclipse.jetty.websocket.api.annotations;
-
diff --git a/jetty-core/jetty-websocket/jetty-websocket-jetty-api/src/main/java/org/eclipse/jetty/websocket/api/exceptions/InvalidWebSocketException.java b/jetty-core/jetty-websocket/jetty-websocket-jetty-api/src/main/java/org/eclipse/jetty/websocket/api/exceptions/InvalidWebSocketException.java
index 247ed00f692f..3d9680c4adbf 100644
--- a/jetty-core/jetty-websocket/jetty-websocket-jetty-api/src/main/java/org/eclipse/jetty/websocket/api/exceptions/InvalidWebSocketException.java
+++ b/jetty-core/jetty-websocket/jetty-websocket-jetty-api/src/main/java/org/eclipse/jetty/websocket/api/exceptions/InvalidWebSocketException.java
@@ -13,8 +13,7 @@
package org.eclipse.jetty.websocket.api.exceptions;
-import org.eclipse.jetty.websocket.api.WebSocketAdapter;
-import org.eclipse.jetty.websocket.api.WebSocketListener;
+import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.annotations.WebSocket;
/**
@@ -22,12 +21,10 @@
*
* A valid WebSocket should do one of the following:
*
- * Note: {@code methodName} can be any name you want to use.
+ *
- *
*/
@Documented
@Retention(RetentionPolicy.RUNTIME)
-@Target(value =
- {ElementType.METHOD})
+@Target(ElementType.METHOD)
public @interface OnWebSocketClose
{
- /* no config */
}
diff --git a/jetty-core/jetty-websocket/jetty-websocket-jetty-api/src/main/java/org/eclipse/jetty/websocket/api/annotations/OnWebSocketError.java b/jetty-core/jetty-websocket/jetty-websocket-jetty-api/src/main/java/org/eclipse/jetty/websocket/api/annotations/OnWebSocketError.java
index ab59ebd96382..838397db217a 100644
--- a/jetty-core/jetty-websocket/jetty-websocket-jetty-api/src/main/java/org/eclipse/jetty/websocket/api/annotations/OnWebSocketError.java
+++ b/jetty-core/jetty-websocket/jetty-websocket-jetty-api/src/main/java/org/eclipse/jetty/websocket/api/annotations/OnWebSocketError.java
@@ -19,23 +19,17 @@
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
-import org.eclipse.jetty.websocket.api.Session;
-
/**
- * Annotation for receiving websocket errors (exceptions) that have occurred internally in the websocket implementation.
- * public void methodName({@link Session} session, int statusCode, String reason)
- * Note: {@code methodName} can be any name you want to use.
+ *
- *
*/
@Documented
@Retention(RetentionPolicy.RUNTIME)
-@Target(value =
- {ElementType.METHOD})
+@Target(ElementType.METHOD)
public @interface OnWebSocketError
{
- /* no config */
}
diff --git a/jetty-core/jetty-websocket/jetty-websocket-jetty-api/src/main/java/org/eclipse/jetty/websocket/api/annotations/OnWebSocketFrame.java b/jetty-core/jetty-websocket/jetty-websocket-jetty-api/src/main/java/org/eclipse/jetty/websocket/api/annotations/OnWebSocketFrame.java
index 711e0a40e49e..17a28bd96205 100644
--- a/jetty-core/jetty-websocket/jetty-websocket-jetty-api/src/main/java/org/eclipse/jetty/websocket/api/annotations/OnWebSocketFrame.java
+++ b/jetty-core/jetty-websocket/jetty-websocket-jetty-api/src/main/java/org/eclipse/jetty/websocket/api/annotations/OnWebSocketFrame.java
@@ -20,23 +20,20 @@
import java.lang.annotation.Target;
import org.eclipse.jetty.websocket.api.Frame;
-import org.eclipse.jetty.websocket.api.Session;
/**
- * (ADVANCED) Annotation for tagging methods to receive frame events.
- * public void methodName({@link Throwable} error)
public void methodName({@link Session} session, {@link Throwable} error)
- * Note: {@code methodName} can be any name you want to use.
+ *
- *
+ *
+ * @see Frame
*/
@Documented
@Retention(RetentionPolicy.RUNTIME)
-@Target(value =
- {ElementType.METHOD})
+@Target(ElementType.METHOD)
public @interface OnWebSocketFrame
{
- /* no config */
}
diff --git a/jetty-core/jetty-websocket/jetty-websocket-jetty-api/src/main/java/org/eclipse/jetty/websocket/api/annotations/OnWebSocketMessage.java b/jetty-core/jetty-websocket/jetty-websocket-jetty-api/src/main/java/org/eclipse/jetty/websocket/api/annotations/OnWebSocketMessage.java
index 71bb4a472322..2e7029aa7792 100644
--- a/jetty-core/jetty-websocket/jetty-websocket-jetty-api/src/main/java/org/eclipse/jetty/websocket/api/annotations/OnWebSocketMessage.java
+++ b/jetty-core/jetty-websocket/jetty-websocket-jetty-api/src/main/java/org/eclipse/jetty/websocket/api/annotations/OnWebSocketMessage.java
@@ -13,6 +13,7 @@
package org.eclipse.jetty.websocket.api.annotations;
+import java.io.InputStream;
import java.io.Reader;
import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
@@ -20,15 +21,9 @@
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
-import org.eclipse.jetty.websocket.api.Session;
-import org.eclipse.jetty.websocket.api.WebSocketPartialListener;
-
/**
- * Annotation for tagging methods to receive Binary or Text Message events.
- * public void methodName({@link Frame} frame)
public void methodName({@link Session} session, {@link Frame} frame)
- * Note: {@code methodName} can be any name you want to use.
- *
*
- *
- *
+ *
- *
- *
- * Note: {@code methodName} can be any name you want to use.
+ *
- *
*/
@Documented
@Retention(RetentionPolicy.RUNTIME)
-@Target(value =
- {ElementType.METHOD})
-public @interface OnWebSocketConnect
+@Target(ElementType.METHOD)
+public @interface OnWebSocketOpen
{
- /* no config */
}
diff --git a/jetty-core/jetty-websocket/jetty-websocket-jetty-api/src/main/java/org/eclipse/jetty/websocket/api/annotations/WebSocket.java b/jetty-core/jetty-websocket/jetty-websocket-jetty-api/src/main/java/org/eclipse/jetty/websocket/api/annotations/WebSocket.java
index 89bf5af750cc..29f54f0b341c 100644
--- a/jetty-core/jetty-websocket/jetty-websocket-jetty-api/src/main/java/org/eclipse/jetty/websocket/api/annotations/WebSocket.java
+++ b/jetty-core/jetty-websocket/jetty-websocket-jetty-api/src/main/java/org/eclipse/jetty/websocket/api/annotations/WebSocket.java
@@ -15,50 +15,31 @@
import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
+import java.lang.annotation.Inherited;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
-import org.eclipse.jetty.websocket.api.BatchMode;
-import org.eclipse.jetty.websocket.api.StatusCode;
+import org.eclipse.jetty.websocket.api.Session;
/**
- * Tags a POJO as being a WebSocket class.
+ * public void methodName({@link Session} session)
- *
*/
-@SuppressWarnings("serial")
public class InvalidWebSocketException extends WebSocketException
{
public InvalidWebSocketException(String message)
diff --git a/jetty-core/jetty-websocket/jetty-websocket-jetty-api/src/main/java/org/eclipse/jetty/websocket/api/exceptions/package-info.java b/jetty-core/jetty-websocket/jetty-websocket-jetty-api/src/main/java/org/eclipse/jetty/websocket/api/exceptions/package-info.java
deleted file mode 100644
index 94b0379a261f..000000000000
--- a/jetty-core/jetty-websocket/jetty-websocket-jetty-api/src/main/java/org/eclipse/jetty/websocket/api/exceptions/package-info.java
+++ /dev/null
@@ -1,18 +0,0 @@
-//
-// ========================================================================
-// Copyright (c) 1995 Mort Bay Consulting Pty Ltd and others.
-//
-// This program and the accompanying materials are made available under the
-// terms of the Eclipse Public License v. 2.0 which is available at
-// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
-// which is available at https://www.apache.org/licenses/LICENSE-2.0.
-//
-// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
-// ========================================================================
-//
-
-/**
- * Jetty WebSocket API : Exception Types
- */
-package org.eclipse.jetty.websocket.api.exceptions;
-
diff --git a/jetty-core/jetty-websocket/jetty-websocket-jetty-api/src/main/java/org/eclipse/jetty/websocket/api/package-info.java b/jetty-core/jetty-websocket/jetty-websocket-jetty-api/src/main/java/org/eclipse/jetty/websocket/api/package-info.java
deleted file mode 100644
index 4579e7bf04ff..000000000000
--- a/jetty-core/jetty-websocket/jetty-websocket-jetty-api/src/main/java/org/eclipse/jetty/websocket/api/package-info.java
+++ /dev/null
@@ -1,18 +0,0 @@
-//
-// ========================================================================
-// Copyright (c) 1995 Mort Bay Consulting Pty Ltd and others.
-//
-// This program and the accompanying materials are made available under the
-// terms of the Eclipse Public License v. 2.0 which is available at
-// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
-// which is available at https://www.apache.org/licenses/LICENSE-2.0.
-//
-// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
-// ========================================================================
-//
-
-/**
- * Jetty WebSocket API
- */
-package org.eclipse.jetty.websocket.api;
-
diff --git a/jetty-core/jetty-websocket/jetty-websocket-jetty-client/src/main/java/org/eclipse/jetty/websocket/client/WebSocketClient.java b/jetty-core/jetty-websocket/jetty-websocket-jetty-client/src/main/java/org/eclipse/jetty/websocket/client/WebSocketClient.java
index 0b3071684c11..2d2105e94ac0 100644
--- a/jetty-core/jetty-websocket/jetty-websocket-jetty-client/src/main/java/org/eclipse/jetty/websocket/client/WebSocketClient.java
+++ b/jetty-core/jetty-websocket/jetty-websocket-jetty-client/src/main/java/org/eclipse/jetty/websocket/client/WebSocketClient.java
@@ -35,10 +35,9 @@
import org.eclipse.jetty.util.component.Graceful;
import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.eclipse.jetty.util.thread.ShutdownThread;
+import org.eclipse.jetty.websocket.api.Configurable;
import org.eclipse.jetty.websocket.api.Session;
-import org.eclipse.jetty.websocket.api.WebSocketBehavior;
import org.eclipse.jetty.websocket.api.WebSocketContainer;
-import org.eclipse.jetty.websocket.api.WebSocketPolicy;
import org.eclipse.jetty.websocket.api.WebSocketSessionListener;
import org.eclipse.jetty.websocket.client.internal.JettyClientUpgradeRequest;
import org.eclipse.jetty.websocket.common.JettyWebSocketFrameHandler;
@@ -52,7 +51,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class WebSocketClient extends ContainerLifeCycle implements WebSocketPolicy, WebSocketContainer
+public class WebSocketClient extends ContainerLifeCycle implements Configurable, WebSocketContainer
{
private static final Logger LOG = LoggerFactory.getLogger(WebSocketClient.class);
private final WebSocketCoreClient coreClient;
@@ -177,12 +176,6 @@ public void dump(Appendable out, String indent) throws IOException
dumpObjects(out, indent, getOpenSessions());
}
- @Override
- public WebSocketBehavior getBehavior()
- {
- return WebSocketBehavior.CLIENT;
- }
-
@Override
public void addSessionListener(WebSocketSessionListener listener)
{
@@ -217,12 +210,25 @@ public Duration getIdleTimeout()
return configurationCustomizer.getIdleTimeout();
}
+ @Override
+ public void setIdleTimeout(Duration duration)
+ {
+ configurationCustomizer.setIdleTimeout(duration);
+ getHttpClient().setIdleTimeout(duration.toMillis());
+ }
+
@Override
public int getInputBufferSize()
{
return configurationCustomizer.getInputBufferSize();
}
+ @Override
+ public void setInputBufferSize(int size)
+ {
+ configurationCustomizer.setInputBufferSize(size);
+ }
+
@Override
public int getOutputBufferSize()
{
@@ -230,70 +236,69 @@ public int getOutputBufferSize()
}
@Override
- public long getMaxBinaryMessageSize()
+ public void setOutputBufferSize(int size)
{
- return configurationCustomizer.getMaxBinaryMessageSize();
+ configurationCustomizer.setOutputBufferSize(size);
}
@Override
- public long getMaxTextMessageSize()
+ public long getMaxBinaryMessageSize()
{
- return configurationCustomizer.getMaxTextMessageSize();
+ return configurationCustomizer.getMaxBinaryMessageSize();
}
@Override
- public long getMaxFrameSize()
+ public void setMaxBinaryMessageSize(long size)
{
- return configurationCustomizer.getMaxFrameSize();
+ configurationCustomizer.setMaxBinaryMessageSize(size);
}
@Override
- public boolean isAutoFragment()
+ public long getMaxTextMessageSize()
{
- return configurationCustomizer.isAutoFragment();
+ return configurationCustomizer.getMaxTextMessageSize();
}
@Override
- public void setIdleTimeout(Duration duration)
+ public void setMaxTextMessageSize(long size)
{
- configurationCustomizer.setIdleTimeout(duration);
- getHttpClient().setIdleTimeout(duration.toMillis());
+ configurationCustomizer.setMaxTextMessageSize(size);
}
@Override
- public void setInputBufferSize(int size)
+ public long getMaxFrameSize()
{
- configurationCustomizer.setInputBufferSize(size);
+ return configurationCustomizer.getMaxFrameSize();
}
@Override
- public void setOutputBufferSize(int size)
+ public void setMaxFrameSize(long maxFrameSize)
{
- configurationCustomizer.setOutputBufferSize(size);
+ configurationCustomizer.setMaxFrameSize(maxFrameSize);
}
@Override
- public void setMaxBinaryMessageSize(long size)
+ public boolean isAutoFragment()
{
- configurationCustomizer.setMaxBinaryMessageSize(size);
+ return configurationCustomizer.isAutoFragment();
}
@Override
- public void setMaxTextMessageSize(long size)
+ public void setAutoFragment(boolean autoFragment)
{
- configurationCustomizer.setMaxTextMessageSize(size);
+ configurationCustomizer.setAutoFragment(autoFragment);
}
@Override
- public void setMaxFrameSize(long maxFrameSize)
+ public int getMaxOutgoingFrames()
{
- configurationCustomizer.setMaxFrameSize(maxFrameSize);
+ return configurationCustomizer.getMaxOutgoingFrames();
}
@Override
- public void setAutoFragment(boolean autoFragment)
+ public void setMaxOutgoingFrames(int maxOutgoingFrames)
{
- configurationCustomizer.setAutoFragment(autoFragment);
+ configurationCustomizer.setMaxOutgoingFrames(maxOutgoingFrames);
}
public SocketAddress getBindAddress()
diff --git a/jetty-core/jetty-websocket/jetty-websocket-jetty-client/src/test/java/examples/ClientDemo.java b/jetty-core/jetty-websocket/jetty-websocket-jetty-client/src/test/java/examples/ClientDemo.java
index 1f35d5174a61..6ff57fc31528 100644
--- a/jetty-core/jetty-websocket/jetty-websocket-jetty-client/src/test/java/examples/ClientDemo.java
+++ b/jetty-core/jetty-websocket/jetty-websocket-jetty-client/src/test/java/examples/ClientDemo.java
@@ -24,7 +24,6 @@
import java.util.concurrent.atomic.AtomicLong;
import org.eclipse.jetty.websocket.api.Session;
-import org.eclipse.jetty.websocket.api.WebSocketAdapter;
import org.eclipse.jetty.websocket.client.ClientUpgradeRequest;
import org.eclipse.jetty.websocket.client.WebSocketClient;
import org.eclipse.jetty.websocket.core.OpCode;
@@ -37,26 +36,13 @@
*/
public class ClientDemo
{
- public class TestSocket extends WebSocketAdapter
+ public class TestSocket extends Session.Listener.Abstract
{
@Override
- public void onWebSocketBinary(byte[] payload, int offset, int len)
- {
- }
-
- @Override
- public void onWebSocketClose(int statusCode, String reason)
- {
- super.onWebSocketClose(statusCode, reason);
- }
-
- @Override
- public void onWebSocketConnect(Session session)
+ public void onWebSocketOpen(Session session)
{
if (verbose)
- {
- System.err.printf("%s#onWebSocketConnect %s %s\n", this.getClass().getSimpleName(), session, session.getClass().getSimpleName());
- }
+ System.err.printf("%s#onWebSocketOpen %s %s\n", this.getClass().getSimpleName(), session, session.getClass().getSimpleName());
}
public void send(byte op, byte[] data, int maxFragmentLength)
diff --git a/jetty-core/jetty-websocket/jetty-websocket-jetty-client/src/test/java/examples/SimpleEchoSocket.java b/jetty-core/jetty-websocket/jetty-websocket-jetty-client/src/test/java/examples/SimpleEchoSocket.java
index 5a843c54dfe9..9c8a40090cf3 100644
--- a/jetty-core/jetty-websocket/jetty-websocket-jetty-client/src/test/java/examples/SimpleEchoSocket.java
+++ b/jetty-core/jetty-websocket/jetty-websocket-jetty-client/src/test/java/examples/SimpleEchoSocket.java
@@ -16,22 +16,21 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+import org.eclipse.jetty.websocket.api.Callback;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.StatusCode;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketClose;
-import org.eclipse.jetty.websocket.api.annotations.OnWebSocketConnect;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketMessage;
+import org.eclipse.jetty.websocket.api.annotations.OnWebSocketOpen;
import org.eclipse.jetty.websocket.api.annotations.WebSocket;
/**
* Basic Echo Client Socket
*/
-@WebSocket(maxTextMessageSize = 64 * 1024)
+@WebSocket
public class SimpleEchoSocket
{
private final CountDownLatch closeLatch;
- @SuppressWarnings("unused")
- private Session session;
public SimpleEchoSocket()
{
@@ -47,20 +46,19 @@ public boolean awaitClose(int duration, TimeUnit unit) throws InterruptedExcepti
public void onClose(int statusCode, String reason)
{
System.out.printf("Connection closed: %d - %s%n", statusCode, reason);
- this.session = null;
this.closeLatch.countDown(); // trigger latch
}
- @OnWebSocketConnect
- public void onConnect(Session session)
+ @OnWebSocketOpen
+ public void onOpen(Session session)
{
- System.out.printf("Got connect: %s%n", session);
- this.session = session;
+ System.out.printf("Open: %s%n", session);
+ session.setMaxTextMessageSize(64 * 1024);
try
{
- session.getRemote().sendString("Hello");
- session.getRemote().sendString("Thanks for the conversation.");
- session.close(StatusCode.NORMAL, "I'm done");
+ session.sendText("Hello", Callback.NOOP);
+ session.sendText("Thanks for the conversation.", Callback.NOOP);
+ session.close(StatusCode.NORMAL, "I'm done", Callback.NOOP);
}
catch (Throwable t)
{
diff --git a/jetty-core/jetty-websocket/jetty-websocket-jetty-client/src/test/java/org/eclipse/jetty/websocket/client/WebSocketClientBadUriTest.java b/jetty-core/jetty-websocket/jetty-websocket-jetty-client/src/test/java/org/eclipse/jetty/websocket/client/WebSocketClientBadUriTest.java
index b6d982161a9f..b15a09a59fea 100644
--- a/jetty-core/jetty-websocket/jetty-websocket-jetty-client/src/test/java/org/eclipse/jetty/websocket/client/WebSocketClientBadUriTest.java
+++ b/jetty-core/jetty-websocket/jetty-websocket-jetty-client/src/test/java/org/eclipse/jetty/websocket/client/WebSocketClientBadUriTest.java
@@ -19,7 +19,7 @@
import org.eclipse.jetty.toolchain.test.jupiter.TestTrackerExtension;
import org.eclipse.jetty.websocket.api.Session;
-import org.eclipse.jetty.websocket.api.annotations.OnWebSocketConnect;
+import org.eclipse.jetty.websocket.api.annotations.OnWebSocketOpen;
import org.eclipse.jetty.websocket.api.annotations.WebSocket;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
@@ -40,7 +40,7 @@ public static class OpenTrackingSocket
{
public CountDownLatch openLatch = new CountDownLatch(1);
- @OnWebSocketConnect
+ @OnWebSocketOpen
public void onOpen(Session session)
{
openLatch.countDown();
diff --git a/jetty-core/jetty-websocket/jetty-websocket-jetty-common/src/main/java/org/eclipse/jetty/websocket/common/JettyWebSocketFrameHandler.java b/jetty-core/jetty-websocket/jetty-websocket-jetty-common/src/main/java/org/eclipse/jetty/websocket/common/JettyWebSocketFrameHandler.java
index e777fd9778f8..9d9665dd49af 100644
--- a/jetty-core/jetty-websocket/jetty-websocket-jetty-common/src/main/java/org/eclipse/jetty/websocket/common/JettyWebSocketFrameHandler.java
+++ b/jetty-core/jetty-websocket/jetty-websocket-jetty-common/src/main/java/org/eclipse/jetty/websocket/common/JettyWebSocketFrameHandler.java
@@ -14,21 +14,20 @@
package org.eclipse.jetty.websocket.common;
import java.lang.invoke.MethodHandle;
+import java.lang.invoke.MethodHandles;
+import java.lang.invoke.MethodType;
+import java.lang.reflect.InvocationTargetException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
-import java.util.concurrent.Executor;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
-import org.eclipse.jetty.util.thread.AutoLock;
-import org.eclipse.jetty.websocket.api.BatchMode;
import org.eclipse.jetty.websocket.api.UpgradeRequest;
import org.eclipse.jetty.websocket.api.UpgradeResponse;
import org.eclipse.jetty.websocket.api.WebSocketContainer;
-import org.eclipse.jetty.websocket.api.WriteCallback;
import org.eclipse.jetty.websocket.core.CloseStatus;
-import org.eclipse.jetty.websocket.core.Configuration;
import org.eclipse.jetty.websocket.core.CoreSession;
import org.eclipse.jetty.websocket.core.Frame;
import org.eclipse.jetty.websocket.core.FrameHandler;
@@ -48,71 +47,45 @@
public class JettyWebSocketFrameHandler implements FrameHandler
{
- private enum SuspendState
- {
- DEMANDING,
- SUSPENDING,
- SUSPENDED,
- CLOSED
- }
-
- private final AutoLock lock = new AutoLock();
+ private final AtomicBoolean closeNotified = new AtomicBoolean();
private final Logger log;
private final WebSocketContainer container;
private final Object endpointInstance;
- private final BatchMode batchMode;
- private final AtomicBoolean closeNotified = new AtomicBoolean();
+ private final JettyWebSocketFrameHandlerMetadata metadata;
private MethodHandle openHandle;
private MethodHandle closeHandle;
private MethodHandle errorHandle;
private MethodHandle textHandle;
- private final Class extends MessageSink> textSinkClass;
private MethodHandle binaryHandle;
+ private final Class extends MessageSink> textSinkClass;
private final Class extends MessageSink> binarySinkClass;
private MethodHandle frameHandle;
private MethodHandle pingHandle;
private MethodHandle pongHandle;
private UpgradeRequest upgradeRequest;
private UpgradeResponse upgradeResponse;
-
- private final Configuration.Customizer customizer;
private MessageSink textSink;
private MessageSink binarySink;
private MessageSink activeMessageSink;
private WebSocketSession session;
- private SuspendState state = SuspendState.DEMANDING;
- private Runnable delayedOnFrame;
- private CoreSession coreSession;
-
- public JettyWebSocketFrameHandler(WebSocketContainer container,
- Object endpointInstance,
- MethodHandle openHandle, MethodHandle closeHandle, MethodHandle errorHandle,
- MethodHandle textHandle, MethodHandle binaryHandle,
- Class extends MessageSink> textSinkClass,
- Class extends MessageSink> binarySinkClass,
- MethodHandle frameHandle,
- MethodHandle pingHandle, MethodHandle pongHandle,
- BatchMode batchMode,
- Configuration.Customizer customizer)
+
+ public JettyWebSocketFrameHandler(WebSocketContainer container, Object endpointInstance, JettyWebSocketFrameHandlerMetadata metadata)
{
this.log = LoggerFactory.getLogger(endpointInstance.getClass());
-
this.container = container;
this.endpointInstance = endpointInstance;
-
- this.openHandle = openHandle;
- this.closeHandle = closeHandle;
- this.errorHandle = errorHandle;
- this.textHandle = textHandle;
- this.binaryHandle = binaryHandle;
- this.textSinkClass = textSinkClass;
- this.binarySinkClass = binarySinkClass;
- this.frameHandle = frameHandle;
- this.pingHandle = pingHandle;
- this.pongHandle = pongHandle;
-
- this.batchMode = batchMode;
- this.customizer = customizer;
+ this.metadata = metadata;
+
+ this.openHandle = InvokerUtils.bindTo(metadata.getOpenHandle(), endpointInstance);
+ this.closeHandle = InvokerUtils.bindTo(metadata.getCloseHandle(), endpointInstance);
+ this.errorHandle = InvokerUtils.bindTo(metadata.getErrorHandle(), endpointInstance);
+ this.textHandle = InvokerUtils.bindTo(metadata.getTextHandle(), endpointInstance);
+ this.binaryHandle = InvokerUtils.bindTo(metadata.getBinaryHandle(), endpointInstance);
+ this.textSinkClass = metadata.getTextSink();
+ this.binarySinkClass = metadata.getBinarySink();
+ this.frameHandle = InvokerUtils.bindTo(metadata.getFrameHandle(), endpointInstance);
+ this.pingHandle = InvokerUtils.bindTo(metadata.getPingHandle(), endpointInstance);
+ this.pongHandle = InvokerUtils.bindTo(metadata.getPongHandle(), endpointInstance);
}
public void setUpgradeRequest(UpgradeRequest upgradeRequest)
@@ -135,11 +108,6 @@ public UpgradeResponse getUpgradeResponse()
return upgradeResponse;
}
- public BatchMode getBatchMode()
- {
- return batchMode;
- }
-
public WebSocketSession getSession()
{
return session;
@@ -150,8 +118,7 @@ public void onOpen(CoreSession coreSession, Callback callback)
{
try
{
- customizer.customize(coreSession);
- this.coreSession = coreSession;
+ metadata.customize(coreSession);
session = new WebSocketSession(container, coreSession, this);
if (!session.isOpen())
throw new IllegalStateException("Session is not open");
@@ -165,13 +132,11 @@ public void onOpen(CoreSession coreSession, Callback callback)
pingHandle = InvokerUtils.bindTo(pingHandle, session);
pongHandle = InvokerUtils.bindTo(pongHandle, session);
- Executor executor = container.getExecutor();
-
if (textHandle != null)
- textSink = JettyWebSocketFrameHandlerFactory.createMessageSink(textHandle, textSinkClass, executor, session);
+ textSink = createMessageSink(textSinkClass, session, textHandle, isAutoDemand());
if (binaryHandle != null)
- binarySink = JettyWebSocketFrameHandlerFactory.createMessageSink(binaryHandle, binarySinkClass, executor, session);
+ binarySink = createMessageSink(binarySinkClass, session, binaryHandle, isAutoDemand());
if (openHandle != null)
openHandle.invoke();
@@ -180,74 +145,90 @@ public void onOpen(CoreSession coreSession, Callback callback)
container.notifySessionListeners((listener) -> listener.onWebSocketSessionOpened(session));
callback.succeeded();
- demand();
}
catch (Throwable cause)
{
callback.failed(new WebSocketException(endpointInstance.getClass().getSimpleName() + " OPEN method error: " + cause.getMessage(), cause));
}
+ finally
+ {
+ autoDemand();
+ }
}
- @Override
- public void onFrame(Frame frame, Callback callback)
+ private static MessageSink createMessageSink(Class extends MessageSink> sinkClass, WebSocketSession session, MethodHandle msgHandle, boolean autoDemanding)
{
- try (AutoLock l = lock.lock())
- {
- switch (state)
- {
- case DEMANDING:
- break;
-
- case SUSPENDING:
- delayedOnFrame = () -> onFrame(frame, callback);
- state = SuspendState.SUSPENDED;
- return;
+ if (msgHandle == null)
+ return null;
+ if (sinkClass == null)
+ return null;
- default:
- throw new IllegalStateException();
- }
-
- // If we have received a close frame, set state to closed to disallow further suspends and resumes.
- if (frame.getOpCode() == OpCode.CLOSE)
- state = SuspendState.CLOSED;
+ try
+ {
+ MethodHandles.Lookup lookup = JettyWebSocketFrameHandlerFactory.getServerMethodHandleLookup();
+ MethodHandle ctorHandle = lookup.findConstructor(sinkClass,
+ MethodType.methodType(void.class, CoreSession.class, MethodHandle.class, boolean.class));
+ return (MessageSink)ctorHandle.invoke(session.getCoreSession(), msgHandle, autoDemanding);
}
+ catch (NoSuchMethodException e)
+ {
+ throw new RuntimeException("Missing expected MessageSink constructor found at: " + sinkClass.getName(), e);
+ }
+ catch (IllegalAccessException | InstantiationException | InvocationTargetException e)
+ {
+ throw new RuntimeException("Unable to create MessageSink: " + sinkClass.getName(), e);
+ }
+ catch (RuntimeException e)
+ {
+ throw e;
+ }
+ catch (Throwable t)
+ {
+ throw new RuntimeException(t);
+ }
+ }
- // Send to raw frame handling on user side (eg: WebSocketFrameListener)
+ @Override
+ public void onFrame(Frame frame, Callback coreCallback)
+ {
+ CompletableFuture
Maps the given {@code pathSpec} to the creator of WebSocket endpoints.
*The {@code pathSpec} format is that supported by
@@ -296,12 +301,7 @@ public void setInvocationType(InvocationType invocationType)
this.invocationType = invocationType;
}
- private static class Configuration extends org.eclipse.jetty.websocket.core.Configuration.ConfigurationCustomizer implements WebSocketPolicy
+ private static class Configuration extends org.eclipse.jetty.websocket.core.Configuration.ConfigurationCustomizer implements Configurable
{
- @Override
- public WebSocketBehavior getBehavior()
- {
- return WebSocketBehavior.SERVER;
- }
}
}
diff --git a/jetty-core/jetty-websocket/jetty-websocket-jetty-tests/src/test/java/org/eclipse/jetty/websocket/tests/AnnoMaxMessageEndpoint.java b/jetty-core/jetty-websocket/jetty-websocket-jetty-tests/src/test/java/org/eclipse/jetty/websocket/tests/AnnoMaxMessageEndpoint.java
index 7b3cf15a9afc..2ddeb1ca3442 100644
--- a/jetty-core/jetty-websocket/jetty-websocket-jetty-tests/src/test/java/org/eclipse/jetty/websocket/tests/AnnoMaxMessageEndpoint.java
+++ b/jetty-core/jetty-websocket/jetty-websocket-jetty-tests/src/test/java/org/eclipse/jetty/websocket/tests/AnnoMaxMessageEndpoint.java
@@ -15,17 +15,24 @@
import java.io.IOException;
+import org.eclipse.jetty.websocket.api.Callback;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketMessage;
+import org.eclipse.jetty.websocket.api.annotations.OnWebSocketOpen;
import org.eclipse.jetty.websocket.api.annotations.WebSocket;
-@SuppressWarnings("unused")
-@WebSocket(maxTextMessageSize = 100 * 1024)
+@WebSocket
public class AnnoMaxMessageEndpoint
{
+ @OnWebSocketOpen
+ public void onOpen(Session session)
+ {
+ session.setMaxTextMessageSize(100 * 1024);
+ }
+
@OnWebSocketMessage
public void onMessage(Session session, String msg) throws IOException
{
- session.getRemote().sendString(msg);
+ session.sendText(msg, Callback.NOOP);
}
}
diff --git a/jetty-core/jetty-websocket/jetty-websocket-jetty-tests/src/test/java/org/eclipse/jetty/websocket/tests/AnnotatedPartialListenerTest.java b/jetty-core/jetty-websocket/jetty-websocket-jetty-tests/src/test/java/org/eclipse/jetty/websocket/tests/AnnotatedPartialListenerTest.java
index 94305bfe218e..eb12483e66d9 100644
--- a/jetty-core/jetty-websocket/jetty-websocket-jetty-tests/src/test/java/org/eclipse/jetty/websocket/tests/AnnotatedPartialListenerTest.java
+++ b/jetty-core/jetty-websocket/jetty-websocket-jetty-tests/src/test/java/org/eclipse/jetty/websocket/tests/AnnotatedPartialListenerTest.java
@@ -24,9 +24,8 @@
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.server.handler.ContextHandler;
import org.eclipse.jetty.util.BufferUtil;
+import org.eclipse.jetty.websocket.api.Callback;
import org.eclipse.jetty.websocket.api.Session;
-import org.eclipse.jetty.websocket.api.WebSocketPartialListener;
-import org.eclipse.jetty.websocket.api.WriteCallback;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketMessage;
import org.eclipse.jetty.websocket.api.annotations.WebSocket;
import org.eclipse.jetty.websocket.api.exceptions.InvalidWebSocketException;
@@ -42,26 +41,26 @@
public class AnnotatedPartialListenerTest
{
- public static class PartialEchoSocket implements WebSocketPartialListener
+ public static class PartialEchoSocket implements Session.Listener.AutoDemanding
{
private Session session;
@Override
- public void onWebSocketConnect(Session session)
+ public void onWebSocketOpen(Session session)
{
this.session = session;
}
@Override
- public void onWebSocketPartialBinary(ByteBuffer payload, boolean fin)
+ public void onWebSocketPartialBinary(ByteBuffer payload, boolean fin, Callback callback)
{
- session.getRemote().sendPartialBytes(payload, fin, WriteCallback.NOOP);
+ session.sendPartialBinary(payload, fin, callback);
}
@Override
public void onWebSocketPartialText(String payload, boolean fin)
{
- session.getRemote().sendPartialString(payload, fin, WriteCallback.NOOP);
+ session.sendPartialText(payload, fin, Callback.NOOP);
}
}
@@ -98,12 +97,13 @@ public static class MessageSegment
}
@OnWebSocketMessage
- public void onMessage(ByteBuffer buffer, boolean last)
+ public void onMessage(ByteBuffer buffer, boolean last, Callback callback)
{
MessageSegment messageSegment = new MessageSegment();
messageSegment.buffer = BufferUtil.copy(buffer);
messageSegment.last = last;
messages.add(messageSegment);
+ callback.succeed();
}
}
@@ -111,12 +111,12 @@ public void onMessage(ByteBuffer buffer, boolean last)
public static class InvalidDoubleBinaryListener
{
@OnWebSocketMessage
- public void onMessage(ByteBuffer bytes, boolean last)
+ public void onMessage(ByteBuffer bytes, boolean last, Callback callback)
{
}
@OnWebSocketMessage
- public void onMessage(ByteBuffer bytes)
+ public void onMessage(ByteBuffer bytes, Callback callback)
{
}
}
@@ -175,9 +175,10 @@ public void testAnnotatedPartialString() throws Exception
PartialStringListener endpoint = new PartialStringListener();
try (Session session = client.connect(endpoint, serverUri).get(5, TimeUnit.SECONDS))
{
- session.getRemote().sendPartialString("hell", false);
- session.getRemote().sendPartialString("o w", false);
- session.getRemote().sendPartialString("orld", true);
+ Callback.Completable.with(c -> session.sendPartialText("hell", false, c))
+ .compose(c -> session.sendPartialText("o w", false, c))
+ .compose(c -> session.sendPartialText("orld", true, c))
+ .get();
}
PartialStringListener.MessageSegment segment;
@@ -201,9 +202,10 @@ public void testAnnotatedPartialByteBuffer() throws Exception
PartialByteBufferListener endpoint = new PartialByteBufferListener();
try (Session session = client.connect(endpoint, serverUri).get(5, TimeUnit.SECONDS))
{
- session.getRemote().sendPartialBytes(BufferUtil.toBuffer("hell"), false);
- session.getRemote().sendPartialBytes(BufferUtil.toBuffer("o w"), false);
- session.getRemote().sendPartialBytes(BufferUtil.toBuffer("orld"), true);
+ Callback.Completable.with(c -> session.sendPartialBinary(BufferUtil.toBuffer("hell"), false, c))
+ .compose(c -> session.sendPartialBinary(BufferUtil.toBuffer("o w"), false, c))
+ .compose(c -> session.sendPartialBinary(BufferUtil.toBuffer("orld"), true, c))
+ .get();
}
PartialByteBufferListener.MessageSegment segment;
diff --git a/jetty-core/jetty-websocket/jetty-websocket-jetty-tests/src/test/java/org/eclipse/jetty/websocket/tests/CloseTrackingEndpoint.java b/jetty-core/jetty-websocket/jetty-websocket-jetty-tests/src/test/java/org/eclipse/jetty/websocket/tests/CloseTrackingEndpoint.java
index fe50e8497763..ebc6453f6130 100644
--- a/jetty-core/jetty-websocket/jetty-websocket-jetty-tests/src/test/java/org/eclipse/jetty/websocket/tests/CloseTrackingEndpoint.java
+++ b/jetty-core/jetty-websocket/jetty-websocket-jetty-tests/src/test/java/org/eclipse/jetty/websocket/tests/CloseTrackingEndpoint.java
@@ -21,8 +21,9 @@
import java.util.concurrent.atomic.AtomicReference;
import org.eclipse.jetty.io.EndPoint;
+import org.eclipse.jetty.util.BufferUtil;
+import org.eclipse.jetty.websocket.api.Callback;
import org.eclipse.jetty.websocket.api.Session;
-import org.eclipse.jetty.websocket.api.WebSocketAdapter;
import org.eclipse.jetty.websocket.common.WebSocketSession;
import org.eclipse.jetty.websocket.core.WebSocketConnection;
import org.eclipse.jetty.websocket.core.WebSocketCoreSession;
@@ -35,7 +36,7 @@
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.nullValue;
-public class CloseTrackingEndpoint extends WebSocketAdapter
+public class CloseTrackingEndpoint extends Session.Listener.AbstractAutoDemanding
{
private static final Logger LOG = LoggerFactory.getLogger(CloseTrackingEndpoint.class);
@@ -43,7 +44,7 @@ public class CloseTrackingEndpoint extends WebSocketAdapter
public String closeReason = null;
public CountDownLatch closeLatch = new CountDownLatch(1);
public AtomicInteger closeCount = new AtomicInteger(0);
- public CountDownLatch openLatch = new CountDownLatch(1);
+ public CountDownLatch connectLatch = new CountDownLatch(1);
public CountDownLatch errorLatch = new CountDownLatch(1);
public LinkedBlockingQueue