diff --git a/jetty-core/jetty-websocket/jetty-websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/CoreSession.java b/jetty-core/jetty-websocket/jetty-websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/CoreSession.java index 51ebf73455ec..c9f6bae28b3d 100644 --- a/jetty-core/jetty-websocket/jetty-websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/CoreSession.java +++ b/jetty-core/jetty-websocket/jetty-websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/CoreSession.java @@ -16,6 +16,7 @@ import java.net.InetSocketAddress; import java.net.SocketAddress; import java.net.URI; +import java.nio.channels.ReadPendingException; import java.util.List; import java.util.Map; @@ -152,14 +153,14 @@ public interface CoreSession extends OutgoingFrames, IncomingFrames, Configurati void abort(); /** - *

Manages flow control by indicating demand for WebSocket frames.

+ *

Manages flow control by indicating demand for a WebSocket frame.

*

A call to {@link FrameHandler#onFrame(Frame, Callback)} will only * be made if there is demand.

+ *

If a previous demand has not been fulfilled this will throw {@link ReadPendingException}

* - * @param n the number of frames that can be handled in sequential calls to - * {@link FrameHandler#onFrame(Frame, Callback)}, must be positive. + * {@link FrameHandler#onFrame(Frame, Callback)}. */ - void demand(long n); + void demand(); /** * @return true if an extension has been negotiated which uses the RSV1 bit. @@ -287,7 +288,7 @@ public void close(int statusCode, String reason, Callback callback) } @Override - public void demand(long n) + public void demand() { } diff --git a/jetty-core/jetty-websocket/jetty-websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/ExtensionStack.java b/jetty-core/jetty-websocket/jetty-websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/ExtensionStack.java index c3a12bfdc081..58bdb6ad544e 100644 --- a/jetty-core/jetty-websocket/jetty-websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/ExtensionStack.java +++ b/jetty-core/jetty-websocket/jetty-websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/ExtensionStack.java @@ -18,7 +18,6 @@ import java.util.Collections; import java.util.List; import java.util.ListIterator; -import java.util.function.LongConsumer; import java.util.stream.Collectors; import org.eclipse.jetty.http.BadMessageException; @@ -45,8 +44,8 @@ public class ExtensionStack implements IncomingFrames, OutgoingFrames, Dumpable private IncomingFrames incoming; private OutgoingFrames outgoing; private final Extension[] rsvClaims = new Extension[3]; - private LongConsumer lastDemand; - private DemandChain demandChain = n -> lastDemand.accept(n); + private DemandChain lastDemand; + private DemandChain demandChain = () -> lastDemand.demand(); public ExtensionStack(WebSocketComponents components, Behavior behavior) { @@ -224,10 +223,9 @@ public void negotiate(List offeredConfigs, List * *

FrameHandler is responsible to manage the demand for more - * WebSocket frames, either directly by calling {@link CoreSession#demand(long)} + * WebSocket frames, either directly by calling {@link CoreSession#demand()} * or by delegating the demand management to other components.

*/ public interface FrameHandler extends IncomingFrames @@ -64,7 +64,7 @@ public interface FrameHandler extends IncomingFrames *

It is allowed to send WebSocket frames via * {@link CoreSession#sendFrame(Frame, Callback, boolean)}. *

WebSocket frames cannot be received until a call to - * {@link CoreSession#demand(long)} is made.

+ * {@link CoreSession#demand()} is made.

*

If the callback argument is failed, the implementation * sends a CLOSE frame with {@link CloseStatus#SERVER_ERROR}, * and the connection will be closed.

@@ -80,7 +80,7 @@ public interface FrameHandler extends IncomingFrames *

This method will never be called concurrently for the * same session; will be called sequentially to satisfy the * outstanding demand signaled by calls to - * {@link CoreSession#demand(long)}.

+ * {@link CoreSession#demand()}.

*

Both control and data frames are passed to this method.

*

CLOSE frames may be responded from this method, but if * they are not responded, then the implementation will respond @@ -89,7 +89,7 @@ public interface FrameHandler extends IncomingFrames * that the buffers associated with the frame can be recycled.

*

Additional WebSocket frames (of any type, including CLOSE * frames) cannot be received until a call to - * {@link CoreSession#demand(long)} is made.

+ * {@link CoreSession#demand()} is made.

* * @param frame the WebSocket frame. * @param callback the callback to indicate success or failure of diff --git a/jetty-core/jetty-websocket/jetty-websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/WebSocketConnection.java b/jetty-core/jetty-websocket/jetty-websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/WebSocketConnection.java index 66d985299278..11909455d2e3 100644 --- a/jetty-core/jetty-websocket/jetty-websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/WebSocketConnection.java +++ b/jetty-core/jetty-websocket/jetty-websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/WebSocketConnection.java @@ -343,21 +343,18 @@ public void run() fillAndParse(); } - public void demand(long n) + public void demand() { - if (n <= 0) - throw new IllegalArgumentException("Demand must be positive"); - boolean fillAndParse = false; try (AutoLock l = lock.lock()) { if (LOG.isDebugEnabled()) - LOG.debug("demand {} d={} fp={} {} {}", n, demand, fillingAndParsing, networkBuffer, this); + LOG.debug("demand {} d={} fp={} {}", demand, fillingAndParsing, networkBuffer, this); if (demand < 0) return; - demand = MathUtils.cappedAdd(demand, n); + demand = MathUtils.cappedAdd(demand, 1); if (!fillingAndParsing) { diff --git a/jetty-core/jetty-websocket/jetty-websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/WebSocketCoreSession.java b/jetty-core/jetty-websocket/jetty-websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/WebSocketCoreSession.java index da5738982ae3..32102dd7f6ba 100644 --- a/jetty-core/jetty-websocket/jetty-websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/WebSocketCoreSession.java +++ b/jetty-core/jetty-websocket/jetty-websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/WebSocketCoreSession.java @@ -402,9 +402,9 @@ public void onOpen() } @Override - public void demand(long n) + public void demand() { - getExtensionStack().demand(n); + getExtensionStack().demand(); } @Override diff --git a/jetty-core/jetty-websocket/jetty-websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/internal/FragmentExtension.java b/jetty-core/jetty-websocket/jetty-websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/internal/FragmentExtension.java index d41267f6dbe3..b048421d35fd 100644 --- a/jetty-core/jetty-websocket/jetty-websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/internal/FragmentExtension.java +++ b/jetty-core/jetty-websocket/jetty-websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/internal/FragmentExtension.java @@ -14,7 +14,6 @@ package org.eclipse.jetty.websocket.core.internal; import java.nio.ByteBuffer; -import java.util.function.LongConsumer; import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.websocket.core.AbstractExtension; @@ -55,13 +54,13 @@ protected void forwardFrame(Frame frame, Callback callback, boolean batch) } @Override - public void demand(long n) + public void demand() { - incomingFlusher.demand(n); + incomingFlusher.demand(); } @Override - public void setNextDemand(LongConsumer nextDemand) + public void setNextDemand(DemandChain nextDemand) { incomingFlusher.setNextDemand(nextDemand); } diff --git a/jetty-core/jetty-websocket/jetty-websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/internal/MessageHandler.java b/jetty-core/jetty-websocket/jetty-websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/internal/MessageHandler.java index 81bf7f7585c0..bdcad67dacc0 100644 --- a/jetty-core/jetty-websocket/jetty-websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/internal/MessageHandler.java +++ b/jetty-core/jetty-websocket/jetty-websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/internal/MessageHandler.java @@ -117,7 +117,7 @@ public void onOpen(CoreSession coreSession, Callback callback) this.coreSession = coreSession; callback.succeeded(); - coreSession.demand(1); + coreSession.demand(); } @Override @@ -210,7 +210,7 @@ protected void onTextFrame(Frame frame, Callback callback) callback.succeeded(); } - coreSession.demand(1); + coreSession.demand(); } catch (Throwable t) { @@ -244,7 +244,7 @@ protected void onBinaryFrame(Frame frame, Callback callback) callback.succeeded(); } - coreSession.demand(1); + coreSession.demand(); } catch (Throwable t) { @@ -264,13 +264,13 @@ protected void onContinuationFrame(Frame frame, Callback callback) protected void onPingFrame(Frame frame, Callback callback) { - coreSession.sendFrame(new Frame(OpCode.PONG, true, frame.getPayload()), Callback.from(() -> coreSession.demand(1), callback), false); + coreSession.sendFrame(new Frame(OpCode.PONG, true, frame.getPayload()), Callback.from(() -> coreSession.demand(), callback), false); } protected void onPongFrame(Frame frame, Callback callback) { callback.succeeded(); - coreSession.demand(1); + coreSession.demand(); } protected void onCloseFrame(Frame frame, Callback callback) diff --git a/jetty-core/jetty-websocket/jetty-websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/internal/PerMessageDeflateExtension.java b/jetty-core/jetty-websocket/jetty-websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/internal/PerMessageDeflateExtension.java index 14cb66a15e37..67cb1ebcb578 100644 --- a/jetty-core/jetty-websocket/jetty-websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/internal/PerMessageDeflateExtension.java +++ b/jetty-core/jetty-websocket/jetty-websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/internal/PerMessageDeflateExtension.java @@ -17,7 +17,6 @@ import java.util.HashMap; import java.util.Map; import java.util.concurrent.atomic.AtomicReference; -import java.util.function.LongConsumer; import java.util.zip.DataFormatException; import java.util.zip.Deflater; import java.util.zip.Inflater; @@ -252,15 +251,15 @@ protected void nextOutgoingFrame(Frame frame, Callback callback, boolean batch) } @Override - public void setNextDemand(LongConsumer nextDemand) + public void setNextDemand(DemandChain nextDemand) { incomingFlusher.setNextDemand(nextDemand); } @Override - public void demand(long n) + public void demand() { - incomingFlusher.demand(n); + incomingFlusher.demand(); } private class OutgoingFlusher extends TransformingFlusher diff --git a/jetty-core/jetty-websocket/jetty-websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/messages/AbstractMessageSink.java b/jetty-core/jetty-websocket/jetty-websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/messages/AbstractMessageSink.java index 819b1a6154ae..6a33b5a4c823 100644 --- a/jetty-core/jetty-websocket/jetty-websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/messages/AbstractMessageSink.java +++ b/jetty-core/jetty-websocket/jetty-websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/messages/AbstractMessageSink.java @@ -87,12 +87,12 @@ public boolean isAutoDemand() /** *

If {@link #isAutoDemand()} then demands for one more WebSocket frame - * via {@link CoreSession#demand(long)}; otherwise it is a no-operation, + * via {@link CoreSession#demand()}; otherwise it is a no-operation, * because the demand is explicitly managed by the application function.

*/ protected void autoDemand() { if (isAutoDemand()) - getCoreSession().demand(1); + getCoreSession().demand(); } } diff --git a/jetty-core/jetty-websocket/jetty-websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/messages/ByteArrayMessageSink.java b/jetty-core/jetty-websocket/jetty-websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/messages/ByteArrayMessageSink.java index f243a9cd49a8..e6be28115aae 100644 --- a/jetty-core/jetty-websocket/jetty-websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/messages/ByteArrayMessageSink.java +++ b/jetty-core/jetty-websocket/jetty-websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/messages/ByteArrayMessageSink.java @@ -78,7 +78,7 @@ public void accept(Frame frame, Callback callback) if (!frame.isFin() && !frame.hasPayload()) { callback.succeeded(); - getCoreSession().demand(1); + getCoreSession().demand(); return; } @@ -96,7 +96,7 @@ public void accept(Frame frame, Callback callback) } else { - getCoreSession().demand(1); + getCoreSession().demand(); } } diff --git a/jetty-core/jetty-websocket/jetty-websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/messages/ByteBufferMessageSink.java b/jetty-core/jetty-websocket/jetty-websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/messages/ByteBufferMessageSink.java index d6bc3fe34f60..afcec2dda7ef 100644 --- a/jetty-core/jetty-websocket/jetty-websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/messages/ByteBufferMessageSink.java +++ b/jetty-core/jetty-websocket/jetty-websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/messages/ByteBufferMessageSink.java @@ -82,7 +82,7 @@ public void accept(Frame frame, Callback callback) if (!frame.isFin() && !frame.hasPayload()) { callback.succeeded(); - getCoreSession().demand(1); + getCoreSession().demand(); return; } @@ -103,7 +103,7 @@ public void accept(Frame frame, Callback callback) else { // Did not call the application so must explicitly demand here. - getCoreSession().demand(1); + getCoreSession().demand(); } } catch (Throwable t) diff --git a/jetty-core/jetty-websocket/jetty-websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/messages/MessageInputStream.java b/jetty-core/jetty-websocket/jetty-websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/messages/MessageInputStream.java index afd93bde9832..ecc7fefea714 100644 --- a/jetty-core/jetty-websocket/jetty-websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/messages/MessageInputStream.java +++ b/jetty-core/jetty-websocket/jetty-websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/messages/MessageInputStream.java @@ -65,7 +65,7 @@ public void accept(Frame frame, Callback callback) if (!frame.isFin() && !frame.hasPayload()) { callback.succeeded(); - session.demand(1); + session.demand(); return; } @@ -228,7 +228,7 @@ private void succeedCurrentEntry() { current.callback.succeeded(); if (!current.frame.isFin()) - session.demand(1); + session.demand(); } } diff --git a/jetty-core/jetty-websocket/jetty-websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/messages/MessageSink.java b/jetty-core/jetty-websocket/jetty-websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/messages/MessageSink.java index 896ae3baad23..7792327a19ab 100644 --- a/jetty-core/jetty-websocket/jetty-websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/messages/MessageSink.java +++ b/jetty-core/jetty-websocket/jetty-websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/messages/MessageSink.java @@ -33,7 +33,7 @@ public interface MessageSink * payload is consumed.

*

The demand for more frames must be explicitly invoked, * or arranged to be invoked asynchronously, by the implementation - * of this method, by calling {@link CoreSession#demand(long)}.

+ * of this method, by calling {@link CoreSession#demand()}.

* * @param frame the frame to consume * @param callback the callback to complete when the frame is consumed diff --git a/jetty-core/jetty-websocket/jetty-websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/messages/PartialByteArrayMessageSink.java b/jetty-core/jetty-websocket/jetty-websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/messages/PartialByteArrayMessageSink.java index fd34a4b7c68f..c07b35484bcc 100644 --- a/jetty-core/jetty-websocket/jetty-websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/messages/PartialByteArrayMessageSink.java +++ b/jetty-core/jetty-websocket/jetty-websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/messages/PartialByteArrayMessageSink.java @@ -54,7 +54,7 @@ public void accept(Frame frame, Callback callback) else { callback.succeeded(); - getCoreSession().demand(1); + getCoreSession().demand(); } } catch (Throwable t) diff --git a/jetty-core/jetty-websocket/jetty-websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/messages/PartialByteBufferMessageSink.java b/jetty-core/jetty-websocket/jetty-websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/messages/PartialByteBufferMessageSink.java index cde238b56569..5cdf0ebba02d 100644 --- a/jetty-core/jetty-websocket/jetty-websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/messages/PartialByteBufferMessageSink.java +++ b/jetty-core/jetty-websocket/jetty-websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/messages/PartialByteBufferMessageSink.java @@ -52,7 +52,7 @@ public void accept(Frame frame, Callback callback) else { callback.succeeded(); - getCoreSession().demand(1); + getCoreSession().demand(); } } catch (Throwable t) diff --git a/jetty-core/jetty-websocket/jetty-websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/messages/StringMessageSink.java b/jetty-core/jetty-websocket/jetty-websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/messages/StringMessageSink.java index 01650fc64472..ca7c6a063bb5 100644 --- a/jetty-core/jetty-websocket/jetty-websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/messages/StringMessageSink.java +++ b/jetty-core/jetty-websocket/jetty-websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/messages/StringMessageSink.java @@ -72,7 +72,7 @@ public void accept(Frame frame, Callback callback) else { callback.succeeded(); - getCoreSession().demand(1); + getCoreSession().demand(); } } catch (Throwable t) diff --git a/jetty-core/jetty-websocket/jetty-websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/util/DemandChain.java b/jetty-core/jetty-websocket/jetty-websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/util/DemandChain.java index c0cc0ecee478..bf85b5b83d2c 100644 --- a/jetty-core/jetty-websocket/jetty-websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/util/DemandChain.java +++ b/jetty-core/jetty-websocket/jetty-websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/util/DemandChain.java @@ -13,8 +13,6 @@ package org.eclipse.jetty.websocket.core.util; -import java.util.function.LongConsumer; - import org.eclipse.jetty.websocket.core.Extension; import org.eclipse.jetty.websocket.core.ExtensionStack; @@ -25,9 +23,9 @@ */ public interface DemandChain { - void demand(long n); + void demand(); - default void setNextDemand(LongConsumer nextDemand) + default void setNextDemand(DemandChain nextDemand) { } } diff --git a/jetty-core/jetty-websocket/jetty-websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/util/DemandingFlusher.java b/jetty-core/jetty-websocket/jetty-websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/util/DemandingFlusher.java index 9c86efc33dab..be55fc88d424 100644 --- a/jetty-core/jetty-websocket/jetty-websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/util/DemandingFlusher.java +++ b/jetty-core/jetty-websocket/jetty-websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/util/DemandingFlusher.java @@ -15,7 +15,6 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; -import java.util.function.LongConsumer; import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.CountingCallback; @@ -44,7 +43,7 @@ public abstract class DemandingFlusher extends IteratingCallback implements Dema private final IncomingFrames _emitFrame; private final AtomicLong _demand = new AtomicLong(); private final AtomicReference _failure = new AtomicReference<>(); - private LongConsumer _nextDemand; + private DemandChain _nextDemand; private Frame _frame; private Callback _callback; @@ -76,21 +75,21 @@ public DemandingFlusher(IncomingFrames emitFrame) protected abstract boolean handle(Frame frame, Callback callback, boolean first); @Override - public void demand(long n) + public void demand() { - _demand.getAndUpdate(d -> Math.addExact(d, n)); + _demand.incrementAndGet(); iterate(); } @Override - public void setNextDemand(LongConsumer nextDemand) + public void setNextDemand(DemandChain nextDemand) { _nextDemand = nextDemand; } /** * Used to supply the flusher with a new frame. This frame should only arrive if demanded - * through the {@link LongConsumer} provided by {@link #setNextDemand(LongConsumer)}. + * through the {@link DemandChain} provided by {@link #setNextDemand(DemandChain)}. * @param frame the WebSocket frame. * @param callback to release frame payload. */ @@ -160,7 +159,7 @@ protected Action process() throws Throwable if (_needContent) { _needContent = false; - _nextDemand.accept(1); + _nextDemand.demand(); return Action.SCHEDULED; } diff --git a/jetty-core/jetty-websocket/jetty-websocket-core-tests/src/test/java/org/eclipse/jetty/websocket/core/DemandTest.java b/jetty-core/jetty-websocket/jetty-websocket-core-tests/src/test/java/org/eclipse/jetty/websocket/core/DemandTest.java index bdf956869844..77c0a5463700 100644 --- a/jetty-core/jetty-websocket/jetty-websocket-core-tests/src/test/java/org/eclipse/jetty/websocket/core/DemandTest.java +++ b/jetty-core/jetty-websocket/jetty-websocket-core-tests/src/test/java/org/eclipse/jetty/websocket/core/DemandTest.java @@ -69,14 +69,14 @@ public void onOpen(CoreSession coreSession, Callback callback) { _coreSession = coreSession; callback.succeeded(); - coreSession.demand(1); + coreSession.demand(); } @Override public void onFrame(Frame frame, Callback callback) { callback.succeeded(); - _coreSession.demand(1); + _coreSession.demand(); } @Override @@ -111,7 +111,7 @@ public void onFrame(Frame frame, Callback callback) _coreSession.abort(); // Demand should not throw even if closed. - _coreSession.demand(1); + _coreSession.demand(); errorFuture.complete(null); } catch (Throwable t) diff --git a/jetty-core/jetty-websocket/jetty-websocket-core-tests/src/test/java/org/eclipse/jetty/websocket/core/DemandingIncomingFramesCapture.java b/jetty-core/jetty-websocket/jetty-websocket-core-tests/src/test/java/org/eclipse/jetty/websocket/core/DemandingIncomingFramesCapture.java index 365021d011f9..b66c219123ba 100644 --- a/jetty-core/jetty-websocket/jetty-websocket-core-tests/src/test/java/org/eclipse/jetty/websocket/core/DemandingIncomingFramesCapture.java +++ b/jetty-core/jetty-websocket/jetty-websocket-core-tests/src/test/java/org/eclipse/jetty/websocket/core/DemandingIncomingFramesCapture.java @@ -33,7 +33,7 @@ public void onFrame(Frame frame, Callback callback) } finally { - _coreSession.demand(1); + _coreSession.demand(); } } } diff --git a/jetty-core/jetty-websocket/jetty-websocket-core-tests/src/test/java/org/eclipse/jetty/websocket/core/EchoFrameHandler.java b/jetty-core/jetty-websocket/jetty-websocket-core-tests/src/test/java/org/eclipse/jetty/websocket/core/EchoFrameHandler.java index 0f3ff967f13b..7f21f05ac414 100644 --- a/jetty-core/jetty-websocket/jetty-websocket-core-tests/src/test/java/org/eclipse/jetty/websocket/core/EchoFrameHandler.java +++ b/jetty-core/jetty-websocket/jetty-websocket-core-tests/src/test/java/org/eclipse/jetty/websocket/core/EchoFrameHandler.java @@ -56,6 +56,6 @@ public void onFrame(Frame frame, Callback callback) callback.succeeded(); } - coreSession.demand(1); + coreSession.demand(); } } diff --git a/jetty-core/jetty-websocket/jetty-websocket-core-tests/src/test/java/org/eclipse/jetty/websocket/core/TestAsyncFrameHandler.java b/jetty-core/jetty-websocket/jetty-websocket-core-tests/src/test/java/org/eclipse/jetty/websocket/core/TestAsyncFrameHandler.java index 5715268773d4..b6cc1cce0a6f 100644 --- a/jetty-core/jetty-websocket/jetty-websocket-core-tests/src/test/java/org/eclipse/jetty/websocket/core/TestAsyncFrameHandler.java +++ b/jetty-core/jetty-websocket/jetty-websocket-core-tests/src/test/java/org/eclipse/jetty/websocket/core/TestAsyncFrameHandler.java @@ -51,7 +51,7 @@ public void onOpen(CoreSession coreSession, Callback callback) LOG.debug("[{}] onOpen {}", name, coreSession); this.coreSession = coreSession; callback.succeeded(); - coreSession.demand(1); + coreSession.demand(); openLatch.countDown(); } @@ -62,7 +62,7 @@ public void onFrame(Frame frame, Callback callback) LOG.debug("[{}] onFrame {}", name, frame); receivedFrames.offer(Frame.copy(frame)); callback.succeeded(); - coreSession.demand(1); + coreSession.demand(); } @Override diff --git a/jetty-core/jetty-websocket/jetty-websocket-core-tests/src/test/java/org/eclipse/jetty/websocket/core/TestFrameHandler.java b/jetty-core/jetty-websocket/jetty-websocket-core-tests/src/test/java/org/eclipse/jetty/websocket/core/TestFrameHandler.java index 7fb8b865b06e..d4f1ef133da9 100644 --- a/jetty-core/jetty-websocket/jetty-websocket-core-tests/src/test/java/org/eclipse/jetty/websocket/core/TestFrameHandler.java +++ b/jetty-core/jetty-websocket/jetty-websocket-core-tests/src/test/java/org/eclipse/jetty/websocket/core/TestFrameHandler.java @@ -71,7 +71,7 @@ public void onFrame(Frame frame) protected void demand() { - coreSession.demand(1); + coreSession.demand(); } @Override diff --git a/jetty-core/jetty-websocket/jetty-websocket-core-tests/src/test/java/org/eclipse/jetty/websocket/core/WebSocketCloseTest.java b/jetty-core/jetty-websocket/jetty-websocket-core-tests/src/test/java/org/eclipse/jetty/websocket/core/WebSocketCloseTest.java index 8d4d7c954265..66608a7dcd7d 100644 --- a/jetty-core/jetty-websocket/jetty-websocket-core-tests/src/test/java/org/eclipse/jetty/websocket/core/WebSocketCloseTest.java +++ b/jetty-core/jetty-websocket/jetty-websocket-core-tests/src/test/java/org/eclipse/jetty/websocket/core/WebSocketCloseTest.java @@ -111,7 +111,7 @@ public void setup(State state, String scheme) throws Exception case ISHUT: { client.getOutputStream().write(RawFrameBuilder.buildClose(new CloseStatus(CloseStatus.NORMAL), true)); - serverHandler.coreSession.demand(1); + serverHandler.coreSession.demand(); Frame frame = serverHandler.receivedFrames.poll(5, TimeUnit.SECONDS); assertThat(new CloseStatus(frame.getPayload()).getCode(), is(CloseStatus.NORMAL)); assertThat(serverHandler.coreSession.toString(), containsString("ISHUT")); @@ -193,7 +193,7 @@ public void testClientClosesOutputISHUT(String scheme) throws Exception public void testClientCloseOSHUT(String scheme) throws Exception { setup(State.OSHUT, scheme); - serverHandler.coreSession.demand(1); + serverHandler.coreSession.demand(); client.getOutputStream().write(RawFrameBuilder.buildClose(new CloseStatus(CloseStatus.NORMAL), true)); assertNotNull(serverHandler.receivedFrames.poll(10, TimeUnit.SECONDS)); serverHandler.receivedCallback.poll().succeeded(); @@ -209,7 +209,7 @@ public void testClientCloseOSHUT(String scheme) throws Exception public void testClientDifferentCloseOSHUT(String scheme) throws Exception { setup(State.OSHUT, scheme); - serverHandler.coreSession.demand(1); + serverHandler.coreSession.demand(); client.getOutputStream().write(RawFrameBuilder.buildClose(new CloseStatus(CloseStatus.BAD_PAYLOAD), true)); assertNotNull(serverHandler.receivedFrames.poll(10, TimeUnit.SECONDS)); serverHandler.receivedCallback.poll().succeeded(); @@ -227,7 +227,7 @@ public void testClientCloseServerFailCloseOSHUT(String scheme) throws Exception try (StacklessLogging ignored = new StacklessLogging(WebSocketCoreSession.class)) { setup(State.OSHUT, scheme); - serverHandler.coreSession.demand(1); + serverHandler.coreSession.demand(); client.getOutputStream().write(RawFrameBuilder.buildClose(new CloseStatus(CloseStatus.NORMAL), true)); assertNotNull(serverHandler.receivedFrames.poll(10, TimeUnit.SECONDS)); serverHandler.receivedCallback.poll().failed(new Exception("Test")); @@ -246,7 +246,7 @@ public void testClientSendsBadFrameOPEN(String scheme) throws Exception setup(State.OPEN, scheme); client.getOutputStream().write(RawFrameBuilder.buildFrame(OpCode.PONG, "pong frame not masked", false)); - serverHandler.coreSession.demand(1); + serverHandler.coreSession.demand(); assertTrue(serverHandler.closed.await(5, TimeUnit.SECONDS)); assertThat(serverHandler.closeStatus.getCode(), is(CloseStatus.PROTOCOL)); assertThat(serverHandler.closeStatus.getReason(), containsString("Client MUST mask all frames")); @@ -259,7 +259,7 @@ public void testClientSendsBadFrameOSHUT(String scheme) throws Exception setup(State.OSHUT, scheme); client.getOutputStream().write(RawFrameBuilder.buildFrame(OpCode.PONG, "pong frame not masked", false)); - serverHandler.coreSession.demand(1); + serverHandler.coreSession.demand(); assertTrue(serverHandler.closed.await(5, TimeUnit.SECONDS)); assertThat(serverHandler.closeStatus.getCode(), is(CloseStatus.PROTOCOL)); assertThat(serverHandler.closeStatus.getReason(), containsString("Client MUST mask all frames")); @@ -335,7 +335,7 @@ public void testClientAbortsOPEN(String scheme) throws Exception client.close(); assertFalse(serverHandler.closed.await(250, TimeUnit.MILLISECONDS)); - serverHandler.coreSession.demand(1); + serverHandler.coreSession.demand(); assertTrue(serverHandler.closed.await(5, TimeUnit.SECONDS)); assertThat(serverHandler.closeStatus.getCode(), is(CloseStatus.NO_CLOSE)); } @@ -348,7 +348,7 @@ public void testClientAbortsOSHUT(String scheme) throws Exception client.close(); assertFalse(serverHandler.closed.await(250, TimeUnit.MILLISECONDS)); - serverHandler.coreSession.demand(1); + serverHandler.coreSession.demand(); assertTrue(serverHandler.closed.await(5, TimeUnit.SECONDS)); assertThat(serverHandler.closeStatus.getCode(), is(CloseStatus.NO_CLOSE)); } @@ -376,7 +376,7 @@ public void testOnFrameThrowsOPEN(String scheme) throws Exception try (StacklessLogging stacklessLogging = new StacklessLogging(WebSocketCoreSession.class)) { - serverHandler.coreSession.demand(1); + serverHandler.coreSession.demand(); assertTrue(serverHandler.closed.await(5, TimeUnit.SECONDS)); } @@ -394,7 +394,7 @@ public void testOnFrameThrowsOSHUT(String scheme) throws Exception try (StacklessLogging stacklessLogging = new StacklessLogging(WebSocketCoreSession.class)) { - serverHandler.coreSession.demand(1); + serverHandler.coreSession.demand(); assertTrue(serverHandler.closed.await(5, TimeUnit.SECONDS)); } @@ -446,7 +446,7 @@ public void doubleNormalClose(String scheme) throws Exception client.getOutputStream().write(RawFrameBuilder.buildClose( new CloseStatus(CloseStatus.NORMAL, "normal response 1"), true)); - serverHandler.coreSession.demand(1); + serverHandler.coreSession.demand(); assertNotNull(serverHandler.receivedFrames.poll(10, TimeUnit.SECONDS)); Callback closeFrameCallback = Objects.requireNonNull(serverHandler.receivedCallback.poll()); closeFrameCallback.succeeded(); @@ -517,7 +517,7 @@ public void testThrowFromOnCloseFrame(String scheme) throws Exception CloseStatus closeStatus = new CloseStatus(CloseStatus.NORMAL, "throw from onFrame"); client.getOutputStream().write(RawFrameBuilder.buildClose(closeStatus, true)); - serverHandler.coreSession.demand(1); + serverHandler.coreSession.demand(); assertTrue(serverHandler.closed.await(5, TimeUnit.SECONDS)); assertThat(serverHandler.closeStatus.getCode(), is(CloseStatus.SERVER_ERROR)); assertThat(serverHandler.closeStatus.getReason(), containsString("deliberately throwing from onFrame")); diff --git a/jetty-core/jetty-websocket/jetty-websocket-core-tests/src/test/java/org/eclipse/jetty/websocket/core/WebSocketOpenTest.java b/jetty-core/jetty-websocket/jetty-websocket-core-tests/src/test/java/org/eclipse/jetty/websocket/core/WebSocketOpenTest.java index 8012efe24db9..d20196c589d6 100644 --- a/jetty-core/jetty-websocket/jetty-websocket-core-tests/src/test/java/org/eclipse/jetty/websocket/core/WebSocketOpenTest.java +++ b/jetty-core/jetty-websocket/jetty-websocket-core-tests/src/test/java/org/eclipse/jetty/websocket/core/WebSocketOpenTest.java @@ -65,7 +65,7 @@ public void testSendFrameInOnOpen() throws Exception assertThat(s.toString(), containsString("CONNECTED")); s.sendFrame(new Frame(OpCode.TEXT, "Hello"), NOOP, false); c.succeeded(); - s.demand(1); + s.demand(); }); Frame.Parsed frame = receiveFrame(client.getInputStream()); assertThat(frame.getPayloadAsUTF8(), is("Hello")); @@ -155,7 +155,7 @@ public void testAsyncOnOpen() throws Exception // Demanding in onOpen will allow you to receive frames. client.getOutputStream().write(RawFrameBuilder.buildFrame(OpCode.TEXT, "message in onOpen", true)); assertNull(serverHandler.receivedFrames.poll(1, TimeUnit.SECONDS)); - coreSession.demand(1); + coreSession.demand(); Frame rcvFrame = serverHandler.receivedFrames.poll(5, TimeUnit.SECONDS); assertNotNull(rcvFrame); assertThat(rcvFrame.getPayloadAsUTF8(), is("message in onOpen")); @@ -163,7 +163,7 @@ public void testAsyncOnOpen() throws Exception // Demand to receive the close frame. client.getOutputStream().write(RawFrameBuilder.buildClose(new CloseStatus(CloseStatus.NORMAL), true)); assertFalse(serverHandler.closeLatch.await(1, TimeUnit.SECONDS)); - coreSession.demand(1); + coreSession.demand(); assertTrue(serverHandler.closeLatch.await(5, TimeUnit.SECONDS)); // Closed handled normally diff --git a/jetty-core/jetty-websocket/jetty-websocket-core-tests/src/test/java/org/eclipse/jetty/websocket/core/client/WebSocketClientServerTest.java b/jetty-core/jetty-websocket/jetty-websocket-core-tests/src/test/java/org/eclipse/jetty/websocket/core/client/WebSocketClientServerTest.java index 0638c6e000a8..a164569fbb8c 100644 --- a/jetty-core/jetty-websocket/jetty-websocket-core-tests/src/test/java/org/eclipse/jetty/websocket/core/client/WebSocketClientServerTest.java +++ b/jetty-core/jetty-websocket/jetty-websocket-core-tests/src/test/java/org/eclipse/jetty/websocket/core/client/WebSocketClientServerTest.java @@ -104,7 +104,7 @@ public void onFrame(Frame frame, Callback callback) else { callback.succeeded(); - getCoreSession().demand(1); + getCoreSession().demand(); } } }; diff --git a/jetty-core/jetty-websocket/jetty-websocket-core-tests/src/test/java/org/eclipse/jetty/websocket/core/extensions/ExtensionTool.java b/jetty-core/jetty-websocket/jetty-websocket-core-tests/src/test/java/org/eclipse/jetty/websocket/core/extensions/ExtensionTool.java index e3359d513f89..d350c12e28e1 100644 --- a/jetty-core/jetty-websocket/jetty-websocket-core-tests/src/test/java/org/eclipse/jetty/websocket/core/extensions/ExtensionTool.java +++ b/jetty-core/jetty-websocket/jetty-websocket-core-tests/src/test/java/org/eclipse/jetty/websocket/core/extensions/ExtensionTool.java @@ -82,7 +82,7 @@ public void parseIncomingHex(String... rawhex) byte[] net; // Simulate initial demand from onOpen(). - coreSession.demand(1); + coreSession.demand(); for (int i = 0; i < parts; i++) { @@ -102,7 +102,7 @@ public void parseIncomingHex(String... rawhex) public void succeeded() { super.succeeded(); - coreSession.demand(1); + coreSession.demand(); } }; ext.onFrame(frame, callback); @@ -172,7 +172,7 @@ public Tester newTester(String parameterizedExtension) private WebSocketCoreSession newWebSocketCoreSession(List configs) { ExtensionStack exStack = new ExtensionStack(components, Behavior.SERVER); - exStack.setLastDemand(l -> {}); // Never delegate to WebSocketConnection as it is null for this test. + exStack.setLastDemand(() -> {}); // Never delegate to WebSocketConnection as it is null for this test. exStack.negotiate(configs, configs); return new WebSocketCoreSession(new TestMessageHandler(), Behavior.SERVER, Negotiated.from(exStack), components); } diff --git a/jetty-core/jetty-websocket/jetty-websocket-core-tests/src/test/java/org/eclipse/jetty/websocket/core/extensions/FragmentExtensionTest.java b/jetty-core/jetty-websocket/jetty-websocket-core-tests/src/test/java/org/eclipse/jetty/websocket/core/extensions/FragmentExtensionTest.java index 15110d91c043..c19b7aa0dcc9 100644 --- a/jetty-core/jetty-websocket/jetty-websocket-core-tests/src/test/java/org/eclipse/jetty/websocket/core/extensions/FragmentExtensionTest.java +++ b/jetty-core/jetty-websocket/jetty-websocket-core-tests/src/test/java/org/eclipse/jetty/websocket/core/extensions/FragmentExtensionTest.java @@ -59,7 +59,7 @@ public void testIncomingFrames() throws Exception ext.setNextIncomingFrames(capture); // Simulate initial demand from onOpen(). - coreSession.demand(1); + coreSession.demand(); // Quote List quote = new ArrayList<>(); @@ -131,7 +131,7 @@ public void testIncomingPing() ext.setNextIncomingFrames(capture); // Simulate initial demand from onOpen(). - coreSession.demand(1); + coreSession.demand(); String payload = "Are you there?"; Frame ping = new Frame(OpCode.PING).setPayload(payload); @@ -333,7 +333,7 @@ private WebSocketCoreSession newSessionFromConfig(Configuration.ConfigurationCus { ExtensionStack exStack = new ExtensionStack(components, Behavior.SERVER); exStack.negotiate(configs, configs); - exStack.setLastDemand(l -> {}); // Never delegate to WebSocketConnection as it is null for this test. + exStack.setLastDemand(() -> {}); // Never delegate to WebSocketConnection as it is null for this test. WebSocketCoreSession coreSession = new WebSocketCoreSession(new TestMessageHandler(), Behavior.SERVER, Negotiated.from(exStack), components); configuration.customize(configuration); return coreSession; diff --git a/jetty-core/jetty-websocket/jetty-websocket-core-tests/src/test/java/org/eclipse/jetty/websocket/core/extensions/PerMessageDeflateExtensionTest.java b/jetty-core/jetty-websocket/jetty-websocket-core-tests/src/test/java/org/eclipse/jetty/websocket/core/extensions/PerMessageDeflateExtensionTest.java index 6db995869b9b..1ab713aac6bc 100644 --- a/jetty-core/jetty-websocket/jetty-websocket-core-tests/src/test/java/org/eclipse/jetty/websocket/core/extensions/PerMessageDeflateExtensionTest.java +++ b/jetty-core/jetty-websocket/jetty-websocket-core-tests/src/test/java/org/eclipse/jetty/websocket/core/extensions/PerMessageDeflateExtensionTest.java @@ -295,7 +295,7 @@ public void testIncomingPing() ext.setNextIncomingFrames(capture); // Simulate initial demand from onOpen(). - coreSession.demand(1); + coreSession.demand(); String payload = "Are you there?"; Frame ping = new Frame(OpCode.PING).setPayload(payload); @@ -333,7 +333,7 @@ public void testIncomingUncompressedFrames() ext.setNextIncomingFrames(capture); // Simulate initial demand from onOpen(). - coreSession.demand(1); + coreSession.demand(); // Quote List quote = new ArrayList<>(); @@ -386,7 +386,7 @@ public void testIncomingFrameNoPayload() ext.setNextIncomingFrames(capture); // Simulate initial demand from onOpen(). - coreSession.demand(1); + coreSession.demand(); Frame ping = new Frame(OpCode.TEXT); ping.setRsv1(true); @@ -613,7 +613,7 @@ private WebSocketCoreSession newSessionFromConfig(ConfigurationCustomizer config { ExtensionStack exStack = new ExtensionStack(components, Behavior.SERVER); exStack.negotiate(configs, configs); - exStack.setLastDemand(l -> {}); // Never delegate to WebSocketConnection as it is null for this test. + exStack.setLastDemand(() -> {}); // Never delegate to WebSocketConnection as it is null for this test. WebSocketCoreSession coreSession = new WebSocketCoreSession(new TestMessageHandler(), Behavior.SERVER, Negotiated.from(exStack), components); configuration.customize(configuration); return coreSession; diff --git a/jetty-core/jetty-websocket/jetty-websocket-core-tests/src/test/java/org/eclipse/jetty/websocket/core/extensions/PermessageDeflateDemandTest.java b/jetty-core/jetty-websocket/jetty-websocket-core-tests/src/test/java/org/eclipse/jetty/websocket/core/extensions/PermessageDeflateDemandTest.java index 5f3284ff0ebf..ca91ccfbd521 100644 --- a/jetty-core/jetty-websocket/jetty-websocket-core-tests/src/test/java/org/eclipse/jetty/websocket/core/extensions/PermessageDeflateDemandTest.java +++ b/jetty-core/jetty-websocket/jetty-websocket-core-tests/src/test/java/org/eclipse/jetty/websocket/core/extensions/PermessageDeflateDemandTest.java @@ -120,7 +120,7 @@ public void onOpen(CoreSession coreSession, Callback callback) { _coreSession = coreSession; callback.succeeded(); - coreSession.demand(1); + coreSession.demand(); } @Override @@ -170,7 +170,7 @@ public void onFrame(Frame frame, Callback callback) callback.succeeded(); } - _coreSession.demand(1); + _coreSession.demand(); } @Override diff --git a/jetty-core/jetty-websocket/jetty-websocket-core-tests/src/test/java/org/eclipse/jetty/websocket/core/proxy/WebSocketProxy.java b/jetty-core/jetty-websocket/jetty-websocket-core-tests/src/test/java/org/eclipse/jetty/websocket/core/proxy/WebSocketProxy.java index a5484a9c1d10..085c80732b70 100644 --- a/jetty-core/jetty-websocket/jetty-websocket-core-tests/src/test/java/org/eclipse/jetty/websocket/core/proxy/WebSocketProxy.java +++ b/jetty-core/jetty-websocket/jetty-websocket-core-tests/src/test/java/org/eclipse/jetty/websocket/core/proxy/WebSocketProxy.java @@ -137,7 +137,7 @@ private void onOpenSuccess(Callback callback) else { callback.succeeded(); - client2ProxySession.demand(1); + client2ProxySession.demand(); } } @@ -226,7 +226,7 @@ public void onFrame(Frame frame, Callback callback) proxy2Server.send(frame, Callback.from(() -> { c.succeeded(); - client2ProxySession.demand(1); + client2ProxySession.demand(); }, c::failed)); } else @@ -471,7 +471,7 @@ private void onConnectSuccess(CoreSession coreSession, Callback callback) else { callback.succeeded(); - coreSession.demand(1); + coreSession.demand(); } } @@ -591,7 +591,7 @@ public void onFrame(Frame frame, Callback callback) client2Proxy.send(frame, Callback.from(() -> { c.succeeded(); - proxy2ServerSession.demand(1); + proxy2ServerSession.demand(); }, c::failed)); } else diff --git a/jetty-core/jetty-websocket/jetty-websocket-core-tests/src/test/java/org/eclipse/jetty/websocket/core/server/WebSocketServerTest.java b/jetty-core/jetty-websocket/jetty-websocket-core-tests/src/test/java/org/eclipse/jetty/websocket/core/server/WebSocketServerTest.java index d8e9483ade87..57cc99a7ea6a 100644 --- a/jetty-core/jetty-websocket/jetty-websocket-core-tests/src/test/java/org/eclipse/jetty/websocket/core/server/WebSocketServerTest.java +++ b/jetty-core/jetty-websocket/jetty-websocket-core-tests/src/test/java/org/eclipse/jetty/websocket/core/server/WebSocketServerTest.java @@ -110,7 +110,8 @@ protected void demand() Frame frame = serverHandler.receivedFrames.poll(250, TimeUnit.MILLISECONDS); assertNull(frame); - serverHandler.getCoreSession().demand(2); + serverHandler.getCoreSession().demand(); + serverHandler.getCoreSession().demand(); frame = serverHandler.receivedFrames.poll(10, TimeUnit.SECONDS); assertNotNull(frame); @@ -124,7 +125,7 @@ protected void demand() client.getOutputStream().write(RawFrameBuilder.buildClose(CloseStatus.NORMAL_STATUS, true)); assertFalse(serverHandler.closed.await(250, TimeUnit.MILLISECONDS)); - serverHandler.getCoreSession().demand(1); + serverHandler.getCoreSession().demand(); assertTrue(serverHandler.closed.await(10, TimeUnit.SECONDS)); frame = serverHandler.receivedFrames.poll(10, TimeUnit.SECONDS); assertNotNull(frame); @@ -148,7 +149,7 @@ public void testDemandAndRetain() throws Exception public void onOpen(CoreSession coreSession, Callback callback) { super.onOpen(coreSession, callback); - coreSession.demand(1); + coreSession.demand(); } @Override @@ -157,7 +158,7 @@ public void onFrame(Frame frame, Callback callback) LOG.info("onFrame: " + BufferUtil.toDetailString(frame.getPayload())); receivedFrames.offer(frame); receivedCallbacks.offer(callback); - getCoreSession().demand(1); + getCoreSession().demand(); } }; @@ -248,7 +249,9 @@ public void testTcpCloseNoDemand() throws Exception public void onOpen(CoreSession coreSession, Callback callback) { super.onOpen(coreSession, callback); - coreSession.demand(3); + coreSession.demand(); + coreSession.demand(); + coreSession.demand(); } @Override @@ -289,7 +292,7 @@ protected void demand() client.close(); assertFalse(serverHandler.closed.await(250, TimeUnit.MILLISECONDS)); - serverHandler.getCoreSession().demand(1); + serverHandler.getCoreSession().demand(); assertTrue(serverHandler.closed.await(10, TimeUnit.SECONDS)); } } @@ -307,7 +310,8 @@ public void onOpen(CoreSession coreSession, Callback callback) { super.onOpen(coreSession); callback.succeeded(); - coreSession.demand(2); + coreSession.demand(); + coreSession.demand(); } @Override @@ -375,7 +379,8 @@ public void onOpen(CoreSession coreSession, Callback callback) { super.onOpen(coreSession); callback.succeeded(); - coreSession.demand(2); + coreSession.demand(); + coreSession.demand(); } @Override diff --git a/jetty-core/jetty-websocket/jetty-websocket-jetty-common/src/main/java/org/eclipse/jetty/websocket/common/JettyWebSocketFrameHandler.java b/jetty-core/jetty-websocket/jetty-websocket-jetty-common/src/main/java/org/eclipse/jetty/websocket/common/JettyWebSocketFrameHandler.java index 9d9665dd49af..5a998f9806b2 100644 --- a/jetty-core/jetty-websocket/jetty-websocket-jetty-common/src/main/java/org/eclipse/jetty/websocket/common/JettyWebSocketFrameHandler.java +++ b/jetty-core/jetty-websocket/jetty-websocket-jetty-common/src/main/java/org/eclipse/jetty/websocket/common/JettyWebSocketFrameHandler.java @@ -402,7 +402,7 @@ boolean isAutoDemand() private void autoDemand() { if (isAutoDemand()) - session.getCoreSession().demand(1); + session.getCoreSession().demand(); } public String toString() diff --git a/jetty-core/jetty-websocket/jetty-websocket-jetty-common/src/main/java/org/eclipse/jetty/websocket/common/WebSocketSession.java b/jetty-core/jetty-websocket/jetty-websocket-jetty-common/src/main/java/org/eclipse/jetty/websocket/common/WebSocketSession.java index 8999602a90d1..79ad7521625d 100644 --- a/jetty-core/jetty-websocket/jetty-websocket-jetty-common/src/main/java/org/eclipse/jetty/websocket/common/WebSocketSession.java +++ b/jetty-core/jetty-websocket/jetty-websocket-jetty-common/src/main/java/org/eclipse/jetty/websocket/common/WebSocketSession.java @@ -55,7 +55,7 @@ public void demand() { if (frameHandler.isAutoDemand()) throw new IllegalStateException("auto-demanding endpoint cannot explicitly demand"); - coreSession.demand(1); + coreSession.demand(); } @Override diff --git a/jetty-ee10/jetty-ee10-websocket/jetty-ee10-websocket-jakarta-common/src/main/java/org/eclipse/jetty/ee10/websocket/jakarta/common/JakartaWebSocketFrameHandler.java b/jetty-ee10/jetty-ee10-websocket/jetty-ee10-websocket-jakarta-common/src/main/java/org/eclipse/jetty/ee10/websocket/jakarta/common/JakartaWebSocketFrameHandler.java index 8527b3a6951e..30681596212d 100644 --- a/jetty-ee10/jetty-ee10-websocket/jetty-ee10-websocket-jakarta-common/src/main/java/org/eclipse/jetty/ee10/websocket/jakarta/common/JakartaWebSocketFrameHandler.java +++ b/jetty-ee10/jetty-ee10-websocket/jetty-ee10-websocket-jakarta-common/src/main/java/org/eclipse/jetty/ee10/websocket/jakarta/common/JakartaWebSocketFrameHandler.java @@ -178,7 +178,7 @@ public void onOpen(CoreSession coreSession, Callback callback) container.notifySessionListeners((listener) -> listener.onJakartaWebSocketSessionOpened(session)); callback.succeeded(); - coreSession.demand(1); + coreSession.demand(); } catch (Throwable cause) { @@ -578,7 +578,7 @@ private void acceptMessage(Frame frame, Callback callback) if (activeMessageSink == null) { callback.succeeded(); - coreSession.demand(1); + coreSession.demand(); return; } @@ -593,12 +593,12 @@ public void onPing(Frame frame, Callback callback) coreSession.sendFrame(new Frame(OpCode.PONG).setPayload(frame.getPayload()), Callback.from(() -> { callback.succeeded(); - coreSession.demand(1); + coreSession.demand(); }, x -> { // Ignore failures, as we might be OSHUT but receive a PING. callback.succeeded(); - coreSession.demand(1); + coreSession.demand(); }), false); } @@ -616,7 +616,7 @@ public void onPong(Frame frame, Callback callback) JakartaWebSocketPongMessage pongMessage = new JakartaWebSocketPongMessage(payload); pongHandle.invoke(pongMessage); callback.succeeded(); - coreSession.demand(1); + coreSession.demand(); } catch (Throwable cause) { @@ -626,7 +626,7 @@ public void onPong(Frame frame, Callback callback) else { callback.succeeded(); - coreSession.demand(1); + coreSession.demand(); } } diff --git a/jetty-ee10/jetty-ee10-websocket/jetty-ee10-websocket-jakarta-common/src/test/java/org/eclipse/jetty/ee10/websocket/jakarta/common/AbstractSessionTest.java b/jetty-ee10/jetty-ee10-websocket/jetty-ee10-websocket-jakarta-common/src/test/java/org/eclipse/jetty/ee10/websocket/jakarta/common/AbstractSessionTest.java index d759362e8363..4f0b2e7359f9 100644 --- a/jetty-ee10/jetty-ee10-websocket/jetty-ee10-websocket-jakarta-common/src/test/java/org/eclipse/jetty/ee10/websocket/jakarta/common/AbstractSessionTest.java +++ b/jetty-ee10/jetty-ee10-websocket/jetty-ee10-websocket-jakarta-common/src/test/java/org/eclipse/jetty/ee10/websocket/jakarta/common/AbstractSessionTest.java @@ -81,7 +81,7 @@ public void waitForDemand(long timeout, TimeUnit timeUnit) throws InterruptedExc } @Override - public void demand(long n) + public void demand() { demand.release(); } diff --git a/jetty-ee10/jetty-ee10-websocket/jetty-ee10-websocket-jakarta-tests/src/main/java/org/eclipse/jetty/ee10/websocket/jakarta/tests/NetworkFuzzer.java b/jetty-ee10/jetty-ee10-websocket/jetty-ee10-websocket-jakarta-tests/src/main/java/org/eclipse/jetty/ee10/websocket/jakarta/tests/NetworkFuzzer.java index bb127c34cbc0..6acedda02096 100644 --- a/jetty-ee10/jetty-ee10-websocket/jetty-ee10-websocket-jakarta-tests/src/main/java/org/eclipse/jetty/ee10/websocket/jakarta/tests/NetworkFuzzer.java +++ b/jetty-ee10/jetty-ee10-websocket/jetty-ee10-websocket-jakarta-tests/src/main/java/org/eclipse/jetty/ee10/websocket/jakarta/tests/NetworkFuzzer.java @@ -229,7 +229,7 @@ public void onOpen(CoreSession coreSession, Callback callback) this.coreSession = coreSession; this.openLatch.countDown(); callback.succeeded(); - coreSession.demand(1); + coreSession.demand(); } @Override @@ -237,7 +237,7 @@ public void onFrame(Frame frame, Callback callback) { receivedFrames.offer(Frame.copy(frame)); callback.succeeded(); - coreSession.demand(1); + coreSession.demand(); } @Override diff --git a/jetty-ee10/jetty-ee10-websocket/jetty-ee10-websocket-jakarta-tests/src/main/java/org/eclipse/jetty/ee10/websocket/jakarta/tests/framehandlers/FrameEcho.java b/jetty-ee10/jetty-ee10-websocket/jetty-ee10-websocket-jakarta-tests/src/main/java/org/eclipse/jetty/ee10/websocket/jakarta/tests/framehandlers/FrameEcho.java index 0a8de6276e29..4618a7bb994c 100644 --- a/jetty-ee10/jetty-ee10-websocket/jetty-ee10-websocket-jakarta-tests/src/main/java/org/eclipse/jetty/ee10/websocket/jakarta/tests/framehandlers/FrameEcho.java +++ b/jetty-ee10/jetty-ee10-websocket/jetty-ee10-websocket-jakarta-tests/src/main/java/org/eclipse/jetty/ee10/websocket/jakarta/tests/framehandlers/FrameEcho.java @@ -32,7 +32,7 @@ public void onOpen(CoreSession coreSession, Callback callback) { this.coreSession = coreSession; callback.succeeded(); - coreSession.demand(1); + coreSession.demand(); } @Override diff --git a/jetty-ee9/jetty-ee9-websocket/jetty-ee9-websocket-jakarta-common/src/main/java/org/eclipse/jetty/ee9/websocket/jakarta/common/JakartaWebSocketFrameHandler.java b/jetty-ee9/jetty-ee9-websocket/jetty-ee9-websocket-jakarta-common/src/main/java/org/eclipse/jetty/ee9/websocket/jakarta/common/JakartaWebSocketFrameHandler.java index feaa31fe733f..a7d36d2412a6 100644 --- a/jetty-ee9/jetty-ee9-websocket/jetty-ee9-websocket-jakarta-common/src/main/java/org/eclipse/jetty/ee9/websocket/jakarta/common/JakartaWebSocketFrameHandler.java +++ b/jetty-ee9/jetty-ee9-websocket/jetty-ee9-websocket-jakarta-common/src/main/java/org/eclipse/jetty/ee9/websocket/jakarta/common/JakartaWebSocketFrameHandler.java @@ -179,7 +179,7 @@ public void onOpen(CoreSession coreSession, Callback callback) container.notifySessionListeners((listener) -> listener.onJakartaWebSocketSessionOpened(session)); callback.succeeded(); - coreSession.demand(1); + coreSession.demand(); } catch (Throwable cause) { @@ -585,7 +585,7 @@ private void acceptMessage(Frame frame, Callback callback) if (activeMessageSink == null) { callback.succeeded(); - coreSession.demand(1); + coreSession.demand(); return; } @@ -600,12 +600,12 @@ public void onPing(Frame frame, Callback callback) coreSession.sendFrame(new Frame(OpCode.PONG).setPayload(frame.getPayload()), Callback.from(() -> { callback.succeeded(); - coreSession.demand(1); + coreSession.demand(); }, x -> { // Ignore failures, as we might be OSHUT but receive a PING. callback.succeeded(); - coreSession.demand(1); + coreSession.demand(); }), false); } @@ -623,7 +623,7 @@ public void onPong(Frame frame, Callback callback) JakartaWebSocketPongMessage pongMessage = new JakartaWebSocketPongMessage(payload); pongHandle.invoke(pongMessage); callback.succeeded(); - coreSession.demand(1); + coreSession.demand(); } catch (Throwable cause) { @@ -633,7 +633,7 @@ public void onPong(Frame frame, Callback callback) else { callback.succeeded(); - coreSession.demand(1); + coreSession.demand(); } } diff --git a/jetty-ee9/jetty-ee9-websocket/jetty-ee9-websocket-jakarta-common/src/test/java/org/eclipse/jetty/ee9/websocket/jakarta/common/AbstractSessionTest.java b/jetty-ee9/jetty-ee9-websocket/jetty-ee9-websocket-jakarta-common/src/test/java/org/eclipse/jetty/ee9/websocket/jakarta/common/AbstractSessionTest.java index aa559af2a140..32d8c4760cbe 100644 --- a/jetty-ee9/jetty-ee9-websocket/jetty-ee9-websocket-jakarta-common/src/test/java/org/eclipse/jetty/ee9/websocket/jakarta/common/AbstractSessionTest.java +++ b/jetty-ee9/jetty-ee9-websocket/jetty-ee9-websocket-jakarta-common/src/test/java/org/eclipse/jetty/ee9/websocket/jakarta/common/AbstractSessionTest.java @@ -75,7 +75,7 @@ public void waitForDemand(long timeout, TimeUnit timeUnit) throws InterruptedExc } @Override - public void demand(long n) + public void demand() { demand.release(); } diff --git a/jetty-ee9/jetty-ee9-websocket/jetty-ee9-websocket-jakarta-tests/src/main/java/org/eclipse/jetty/ee9/websocket/jakarta/tests/NetworkFuzzer.java b/jetty-ee9/jetty-ee9-websocket/jetty-ee9-websocket-jakarta-tests/src/main/java/org/eclipse/jetty/ee9/websocket/jakarta/tests/NetworkFuzzer.java index 201b31b9ddae..f5e75693dbdc 100644 --- a/jetty-ee9/jetty-ee9-websocket/jetty-ee9-websocket-jakarta-tests/src/main/java/org/eclipse/jetty/ee9/websocket/jakarta/tests/NetworkFuzzer.java +++ b/jetty-ee9/jetty-ee9-websocket/jetty-ee9-websocket-jakarta-tests/src/main/java/org/eclipse/jetty/ee9/websocket/jakarta/tests/NetworkFuzzer.java @@ -229,7 +229,7 @@ public void onOpen(CoreSession coreSession, Callback callback) this.coreSession = coreSession; this.openLatch.countDown(); callback.succeeded(); - coreSession.demand(1); + coreSession.demand(); } @Override @@ -237,7 +237,7 @@ public void onFrame(Frame frame, Callback callback) { receivedFrames.offer(Frame.copy(frame)); callback.succeeded(); - coreSession.demand(1); + coreSession.demand(); } @Override diff --git a/jetty-ee9/jetty-ee9-websocket/jetty-ee9-websocket-jakarta-tests/src/main/java/org/eclipse/jetty/ee9/websocket/jakarta/tests/framehandlers/FrameEcho.java b/jetty-ee9/jetty-ee9-websocket/jetty-ee9-websocket-jakarta-tests/src/main/java/org/eclipse/jetty/ee9/websocket/jakarta/tests/framehandlers/FrameEcho.java index 79314e3c1cfc..a3ff7d5e45d3 100644 --- a/jetty-ee9/jetty-ee9-websocket/jetty-ee9-websocket-jakarta-tests/src/main/java/org/eclipse/jetty/ee9/websocket/jakarta/tests/framehandlers/FrameEcho.java +++ b/jetty-ee9/jetty-ee9-websocket/jetty-ee9-websocket-jakarta-tests/src/main/java/org/eclipse/jetty/ee9/websocket/jakarta/tests/framehandlers/FrameEcho.java @@ -32,7 +32,7 @@ public void onOpen(CoreSession coreSession, Callback callback) { this.coreSession = coreSession; callback.succeeded(); - coreSession.demand(1); + coreSession.demand(); } @Override @@ -41,7 +41,7 @@ public void onFrame(Frame frame, Callback callback) Runnable succeedAndDemand = () -> { callback.succeeded(); - coreSession.demand(1); + coreSession.demand(); }; if (frame.isControlFrame()) diff --git a/jetty-ee9/jetty-ee9-websocket/jetty-ee9-websocket-jetty-common/src/main/java/org/eclipse/jetty/ee9/websocket/common/JettyWebSocketFrameHandler.java b/jetty-ee9/jetty-ee9-websocket/jetty-ee9-websocket-jetty-common/src/main/java/org/eclipse/jetty/ee9/websocket/common/JettyWebSocketFrameHandler.java index 9a348fd08111..6acd91608a00 100644 --- a/jetty-ee9/jetty-ee9-websocket/jetty-ee9-websocket-jetty-common/src/main/java/org/eclipse/jetty/ee9/websocket/common/JettyWebSocketFrameHandler.java +++ b/jetty-ee9/jetty-ee9-websocket/jetty-ee9-websocket-jetty-common/src/main/java/org/eclipse/jetty/ee9/websocket/common/JettyWebSocketFrameHandler.java @@ -473,7 +473,7 @@ public void resume() if (frame != null) onFrame(frame, callback); else - session.getCoreSession().demand(1); + session.getCoreSession().demand(); } } @@ -498,7 +498,7 @@ private void demand() } if (demand) - session.getCoreSession().demand(1); + session.getCoreSession().demand(); } public static Throwable convertCause(Throwable cause)