Skip to content

Commit

Permalink
Merge pull request #9945 from eclipse/jetty-12.0.x-WebSocketDemand
Browse files Browse the repository at this point in the history
Issue #9944 - remove integer from demand in websocket core
  • Loading branch information
lachlan-roberts authored Jun 28, 2023
2 parents 0863b55 + 2b4e896 commit 0941969
Show file tree
Hide file tree
Showing 43 changed files with 131 additions and 135 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.net.URI;
import java.nio.channels.ReadPendingException;
import java.util.List;
import java.util.Map;

Expand Down Expand Up @@ -152,14 +153,14 @@ public interface CoreSession extends OutgoingFrames, IncomingFrames, Configurati
void abort();

/**
* <p>Manages flow control by indicating demand for WebSocket frames.</p>
* <p>Manages flow control by indicating demand for a WebSocket frame.</p>
* <p>A call to {@link FrameHandler#onFrame(Frame, Callback)} will only
* be made if there is demand.</p>
* <p>If a previous demand has not been fulfilled this will throw {@link ReadPendingException}</p>
*
* @param n the number of frames that can be handled in sequential calls to
* {@link FrameHandler#onFrame(Frame, Callback)}, must be positive.
* {@link FrameHandler#onFrame(Frame, Callback)}.
*/
void demand(long n);
void demand();

/**
* @return true if an extension has been negotiated which uses the RSV1 bit.
Expand Down Expand Up @@ -287,7 +288,7 @@ public void close(int statusCode, String reason, Callback callback)
}

@Override
public void demand(long n)
public void demand()
{
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import java.util.Collections;
import java.util.List;
import java.util.ListIterator;
import java.util.function.LongConsumer;
import java.util.stream.Collectors;

import org.eclipse.jetty.http.BadMessageException;
Expand All @@ -45,8 +44,8 @@ public class ExtensionStack implements IncomingFrames, OutgoingFrames, Dumpable
private IncomingFrames incoming;
private OutgoingFrames outgoing;
private final Extension[] rsvClaims = new Extension[3];
private LongConsumer lastDemand;
private DemandChain demandChain = n -> lastDemand.accept(n);
private DemandChain lastDemand;
private DemandChain demandChain = () -> lastDemand.demand();

public ExtensionStack(WebSocketComponents components, Behavior behavior)
{
Expand Down Expand Up @@ -224,10 +223,9 @@ public void negotiate(List<ExtensionConfig> offeredConfigs, List<ExtensionConfig
ext.setNextOutgoingFrames(outgoing);
outgoing = ext;

if (ext instanceof DemandChain)
if (ext instanceof DemandChain demandingExtension)
{
DemandChain demandingExtension = (DemandChain)ext;
demandingExtension.setNextDemand(demandChain::demand);
demandingExtension.setNextDemand(demandChain);
demandChain = demandingExtension;
}
}
Expand Down Expand Up @@ -273,12 +271,12 @@ public void initialize(IncomingFrames incoming, OutgoingFrames outgoing, CoreSes
}
}

public void demand(long n)
public void demand()
{
demandChain.demand(n);
demandChain.demand();
}

public void setLastDemand(LongConsumer lastDemand)
public void setLastDemand(DemandChain lastDemand)
{
this.lastDemand = lastDemand;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@
* or not. The error code will indicate the nature of the close.</li>
* </ul>
* <p>FrameHandler is responsible to manage the demand for more
* WebSocket frames, either directly by calling {@link CoreSession#demand(long)}
* WebSocket frames, either directly by calling {@link CoreSession#demand()}
* or by delegating the demand management to other components.</p>
*/
public interface FrameHandler extends IncomingFrames
Expand All @@ -64,7 +64,7 @@ public interface FrameHandler extends IncomingFrames
* <p>It is allowed to send WebSocket frames via
* {@link CoreSession#sendFrame(Frame, Callback, boolean)}.
* <p>WebSocket frames cannot be received until a call to
* {@link CoreSession#demand(long)} is made.</p>
* {@link CoreSession#demand()} is made.</p>
* <p>If the callback argument is failed, the implementation
* sends a CLOSE frame with {@link CloseStatus#SERVER_ERROR},
* and the connection will be closed.</p>
Expand All @@ -80,7 +80,7 @@ public interface FrameHandler extends IncomingFrames
* <p>This method will never be called concurrently for the
* same session; will be called sequentially to satisfy the
* outstanding demand signaled by calls to
* {@link CoreSession#demand(long)}.</p>
* {@link CoreSession#demand()}.</p>
* <p>Both control and data frames are passed to this method.</p>
* <p>CLOSE frames may be responded from this method, but if
* they are not responded, then the implementation will respond
Expand All @@ -89,7 +89,7 @@ public interface FrameHandler extends IncomingFrames
* that the buffers associated with the frame can be recycled.</p>
* <p>Additional WebSocket frames (of any type, including CLOSE
* frames) cannot be received until a call to
* {@link CoreSession#demand(long)} is made.</p>
* {@link CoreSession#demand()} is made.</p>
*
* @param frame the WebSocket frame.
* @param callback the callback to indicate success or failure of
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -343,21 +343,18 @@ public void run()
fillAndParse();
}

public void demand(long n)
public void demand()
{
if (n <= 0)
throw new IllegalArgumentException("Demand must be positive");

boolean fillAndParse = false;
try (AutoLock l = lock.lock())
{
if (LOG.isDebugEnabled())
LOG.debug("demand {} d={} fp={} {} {}", n, demand, fillingAndParsing, networkBuffer, this);
LOG.debug("demand {} d={} fp={} {}", demand, fillingAndParsing, networkBuffer, this);

if (demand < 0)
return;

demand = MathUtils.cappedAdd(demand, n);
demand = MathUtils.cappedAdd(demand, 1);

if (!fillingAndParsing)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -402,9 +402,9 @@ public void onOpen()
}

@Override
public void demand(long n)
public void demand()
{
getExtensionStack().demand(n);
getExtensionStack().demand();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
package org.eclipse.jetty.websocket.core.internal;

import java.nio.ByteBuffer;
import java.util.function.LongConsumer;

import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.websocket.core.AbstractExtension;
Expand Down Expand Up @@ -55,13 +54,13 @@ protected void forwardFrame(Frame frame, Callback callback, boolean batch)
}

@Override
public void demand(long n)
public void demand()
{
incomingFlusher.demand(n);
incomingFlusher.demand();
}

@Override
public void setNextDemand(LongConsumer nextDemand)
public void setNextDemand(DemandChain nextDemand)
{
incomingFlusher.setNextDemand(nextDemand);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ public void onOpen(CoreSession coreSession, Callback callback)

this.coreSession = coreSession;
callback.succeeded();
coreSession.demand(1);
coreSession.demand();
}

@Override
Expand Down Expand Up @@ -210,7 +210,7 @@ protected void onTextFrame(Frame frame, Callback callback)
callback.succeeded();
}

coreSession.demand(1);
coreSession.demand();
}
catch (Throwable t)
{
Expand Down Expand Up @@ -244,7 +244,7 @@ protected void onBinaryFrame(Frame frame, Callback callback)
callback.succeeded();
}

coreSession.demand(1);
coreSession.demand();
}
catch (Throwable t)
{
Expand All @@ -264,13 +264,13 @@ protected void onContinuationFrame(Frame frame, Callback callback)

protected void onPingFrame(Frame frame, Callback callback)
{
coreSession.sendFrame(new Frame(OpCode.PONG, true, frame.getPayload()), Callback.from(() -> coreSession.demand(1), callback), false);
coreSession.sendFrame(new Frame(OpCode.PONG, true, frame.getPayload()), Callback.from(() -> coreSession.demand(), callback), false);
}

protected void onPongFrame(Frame frame, Callback callback)
{
callback.succeeded();
coreSession.demand(1);
coreSession.demand();
}

protected void onCloseFrame(Frame frame, Callback callback)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.LongConsumer;
import java.util.zip.DataFormatException;
import java.util.zip.Deflater;
import java.util.zip.Inflater;
Expand Down Expand Up @@ -252,15 +251,15 @@ protected void nextOutgoingFrame(Frame frame, Callback callback, boolean batch)
}

@Override
public void setNextDemand(LongConsumer nextDemand)
public void setNextDemand(DemandChain nextDemand)
{
incomingFlusher.setNextDemand(nextDemand);
}

@Override
public void demand(long n)
public void demand()
{
incomingFlusher.demand(n);
incomingFlusher.demand();
}

private class OutgoingFlusher extends TransformingFlusher
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,12 +87,12 @@ public boolean isAutoDemand()

/**
* <p>If {@link #isAutoDemand()} then demands for one more WebSocket frame
* via {@link CoreSession#demand(long)}; otherwise it is a no-operation,
* via {@link CoreSession#demand()}; otherwise it is a no-operation,
* because the demand is explicitly managed by the application function.</p>
*/
protected void autoDemand()
{
if (isAutoDemand())
getCoreSession().demand(1);
getCoreSession().demand();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ public void accept(Frame frame, Callback callback)
if (!frame.isFin() && !frame.hasPayload())
{
callback.succeeded();
getCoreSession().demand(1);
getCoreSession().demand();
return;
}

Expand All @@ -96,7 +96,7 @@ public void accept(Frame frame, Callback callback)
}
else
{
getCoreSession().demand(1);
getCoreSession().demand();
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ public void accept(Frame frame, Callback callback)
if (!frame.isFin() && !frame.hasPayload())
{
callback.succeeded();
getCoreSession().demand(1);
getCoreSession().demand();
return;
}

Expand All @@ -103,7 +103,7 @@ public void accept(Frame frame, Callback callback)
else
{
// Did not call the application so must explicitly demand here.
getCoreSession().demand(1);
getCoreSession().demand();
}
}
catch (Throwable t)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ public void accept(Frame frame, Callback callback)
if (!frame.isFin() && !frame.hasPayload())
{
callback.succeeded();
session.demand(1);
session.demand();
return;
}

Expand Down Expand Up @@ -228,7 +228,7 @@ private void succeedCurrentEntry()
{
current.callback.succeeded();
if (!current.frame.isFin())
session.demand(1);
session.demand();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public interface MessageSink
* payload is consumed.</p>
* <p>The demand for more frames must be explicitly invoked,
* or arranged to be invoked asynchronously, by the implementation
* of this method, by calling {@link CoreSession#demand(long)}.</p>
* of this method, by calling {@link CoreSession#demand()}.</p>
*
* @param frame the frame to consume
* @param callback the callback to complete when the frame is consumed
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public void accept(Frame frame, Callback callback)
else
{
callback.succeeded();
getCoreSession().demand(1);
getCoreSession().demand();
}
}
catch (Throwable t)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public void accept(Frame frame, Callback callback)
else
{
callback.succeeded();
getCoreSession().demand(1);
getCoreSession().demand();
}
}
catch (Throwable t)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ public void accept(Frame frame, Callback callback)
else
{
callback.succeeded();
getCoreSession().demand(1);
getCoreSession().demand();
}
}
catch (Throwable t)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,6 @@

package org.eclipse.jetty.websocket.core.util;

import java.util.function.LongConsumer;

import org.eclipse.jetty.websocket.core.Extension;
import org.eclipse.jetty.websocket.core.ExtensionStack;

Expand All @@ -25,9 +23,9 @@
*/
public interface DemandChain
{
void demand(long n);
void demand();

default void setNextDemand(LongConsumer nextDemand)
default void setNextDemand(DemandChain nextDemand)
{
}
}
Loading

0 comments on commit 0941969

Please sign in to comment.