Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Issue #9944 - remove integer from demand in websocket core #9945

Merged
merged 3 commits into from
Jun 28, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -152,14 +152,13 @@ public interface CoreSession extends OutgoingFrames, IncomingFrames, Configurati
void abort();

/**
* <p>Manages flow control by indicating demand for WebSocket frames.</p>
* <p>Manages flow control by indicating demand for a WebSocket frame.</p>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The behaviour for calling demand when there is already pending demand needs to be documented. You are currently internally tracking a long count, so you have effectively kept the long and could reproduce with:

   public void demand(long n)
   {
       for (long i = 0; i < n; i++)
           demand();
   }

I'm not sure this is optimal. Could we throw ISE if there is already pending demand?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I still think you need to javadoc in this PR what is the expected behaviour is of a demand call with pending demand... even if the actual behaviour is implemented in a different PR.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should it be ReadPendingException or IllegalStateException?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ReadPendingException extends IllegalStateException so either works.

* <p>A call to {@link FrameHandler#onFrame(Frame, Callback)} will only
* be made if there is demand.</p>
*
* @param n the number of frames that can be handled in sequential calls to
* {@link FrameHandler#onFrame(Frame, Callback)}, must be positive.
* {@link FrameHandler#onFrame(Frame, Callback)}.
*/
void demand(long n);
void demand();

/**
* @return true if an extension has been negotiated which uses the RSV1 bit.
Expand Down Expand Up @@ -287,7 +286,7 @@ public void close(int statusCode, String reason, Callback callback)
}

@Override
public void demand(long n)
public void demand()
{
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import java.util.Collections;
import java.util.List;
import java.util.ListIterator;
import java.util.function.LongConsumer;
import java.util.stream.Collectors;

import org.eclipse.jetty.http.BadMessageException;
Expand All @@ -45,8 +44,8 @@ public class ExtensionStack implements IncomingFrames, OutgoingFrames, Dumpable
private IncomingFrames incoming;
private OutgoingFrames outgoing;
private final Extension[] rsvClaims = new Extension[3];
private LongConsumer lastDemand;
private DemandChain demandChain = n -> lastDemand.accept(n);
private DemandChain lastDemand;
private DemandChain demandChain = () -> lastDemand.demand();

public ExtensionStack(WebSocketComponents components, Behavior behavior)
{
Expand Down Expand Up @@ -224,10 +223,9 @@ public void negotiate(List<ExtensionConfig> offeredConfigs, List<ExtensionConfig
ext.setNextOutgoingFrames(outgoing);
outgoing = ext;

if (ext instanceof DemandChain)
if (ext instanceof DemandChain demandingExtension)
{
DemandChain demandingExtension = (DemandChain)ext;
demandingExtension.setNextDemand(demandChain::demand);
demandingExtension.setNextDemand(demandChain);
demandChain = demandingExtension;
}
}
Expand Down Expand Up @@ -273,12 +271,12 @@ public void initialize(IncomingFrames incoming, OutgoingFrames outgoing, CoreSes
}
}

public void demand(long n)
public void demand()
{
demandChain.demand(n);
demandChain.demand();
}

public void setLastDemand(LongConsumer lastDemand)
public void setLastDemand(DemandChain lastDemand)
{
this.lastDemand = lastDemand;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@
* or not. The error code will indicate the nature of the close.</li>
* </ul>
* <p>FrameHandler is responsible to manage the demand for more
* WebSocket frames, either directly by calling {@link CoreSession#demand(long)}
* WebSocket frames, either directly by calling {@link CoreSession#demand()}
* or by delegating the demand management to other components.</p>
*/
public interface FrameHandler extends IncomingFrames
Expand All @@ -64,7 +64,7 @@ public interface FrameHandler extends IncomingFrames
* <p>It is allowed to send WebSocket frames via
* {@link CoreSession#sendFrame(Frame, Callback, boolean)}.
* <p>WebSocket frames cannot be received until a call to
* {@link CoreSession#demand(long)} is made.</p>
* {@link CoreSession#demand()} is made.</p>
* <p>If the callback argument is failed, the implementation
* sends a CLOSE frame with {@link CloseStatus#SERVER_ERROR},
* and the connection will be closed.</p>
Expand All @@ -80,7 +80,7 @@ public interface FrameHandler extends IncomingFrames
* <p>This method will never be called concurrently for the
* same session; will be called sequentially to satisfy the
* outstanding demand signaled by calls to
* {@link CoreSession#demand(long)}.</p>
* {@link CoreSession#demand()}.</p>
* <p>Both control and data frames are passed to this method.</p>
* <p>CLOSE frames may be responded from this method, but if
* they are not responded, then the implementation will respond
Expand All @@ -89,7 +89,7 @@ public interface FrameHandler extends IncomingFrames
* that the buffers associated with the frame can be recycled.</p>
* <p>Additional WebSocket frames (of any type, including CLOSE
* frames) cannot be received until a call to
* {@link CoreSession#demand(long)} is made.</p>
* {@link CoreSession#demand()} is made.</p>
*
* @param frame the WebSocket frame.
* @param callback the callback to indicate success or failure of
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -343,21 +343,18 @@ public void run()
fillAndParse();
}

public void demand(long n)
public void demand()
{
if (n <= 0)
throw new IllegalArgumentException("Demand must be positive");

boolean fillAndParse = false;
try (AutoLock l = lock.lock())
{
if (LOG.isDebugEnabled())
LOG.debug("demand {} d={} fp={} {} {}", n, demand, fillingAndParsing, networkBuffer, this);
LOG.debug("demand {} d={} fp={} {}", demand, fillingAndParsing, networkBuffer, this);

if (demand < 0)
return;

demand = MathUtils.cappedAdd(demand, n);
demand = MathUtils.cappedAdd(demand, 1);
gregw marked this conversation as resolved.
Show resolved Hide resolved

if (!fillingAndParsing)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -402,9 +402,9 @@ public void onOpen()
}

@Override
public void demand(long n)
public void demand()
{
getExtensionStack().demand(n);
getExtensionStack().demand();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
package org.eclipse.jetty.websocket.core.internal;

import java.nio.ByteBuffer;
import java.util.function.LongConsumer;

import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.websocket.core.AbstractExtension;
Expand Down Expand Up @@ -55,13 +54,13 @@ protected void forwardFrame(Frame frame, Callback callback, boolean batch)
}

@Override
public void demand(long n)
public void demand()
{
incomingFlusher.demand(n);
incomingFlusher.demand();
}

@Override
public void setNextDemand(LongConsumer nextDemand)
public void setNextDemand(DemandChain nextDemand)
{
incomingFlusher.setNextDemand(nextDemand);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ public void onOpen(CoreSession coreSession, Callback callback)

this.coreSession = coreSession;
callback.succeeded();
coreSession.demand(1);
coreSession.demand();
}

@Override
Expand Down Expand Up @@ -210,7 +210,7 @@ protected void onTextFrame(Frame frame, Callback callback)
callback.succeeded();
}

coreSession.demand(1);
coreSession.demand();
}
catch (Throwable t)
{
Expand Down Expand Up @@ -244,7 +244,7 @@ protected void onBinaryFrame(Frame frame, Callback callback)
callback.succeeded();
}

coreSession.demand(1);
coreSession.demand();
}
catch (Throwable t)
{
Expand All @@ -264,13 +264,13 @@ protected void onContinuationFrame(Frame frame, Callback callback)

protected void onPingFrame(Frame frame, Callback callback)
{
coreSession.sendFrame(new Frame(OpCode.PONG, true, frame.getPayload()), Callback.from(() -> coreSession.demand(1), callback), false);
coreSession.sendFrame(new Frame(OpCode.PONG, true, frame.getPayload()), Callback.from(() -> coreSession.demand(), callback), false);
}

protected void onPongFrame(Frame frame, Callback callback)
{
callback.succeeded();
coreSession.demand(1);
coreSession.demand();
}

protected void onCloseFrame(Frame frame, Callback callback)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.LongConsumer;
import java.util.zip.DataFormatException;
import java.util.zip.Deflater;
import java.util.zip.Inflater;
Expand Down Expand Up @@ -252,15 +251,15 @@ protected void nextOutgoingFrame(Frame frame, Callback callback, boolean batch)
}

@Override
public void setNextDemand(LongConsumer nextDemand)
public void setNextDemand(DemandChain nextDemand)
{
incomingFlusher.setNextDemand(nextDemand);
}

@Override
public void demand(long n)
public void demand()
{
incomingFlusher.demand(n);
incomingFlusher.demand();
}

private class OutgoingFlusher extends TransformingFlusher
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,12 +87,12 @@ public boolean isAutoDemand()

/**
* <p>If {@link #isAutoDemand()} then demands for one more WebSocket frame
* via {@link CoreSession#demand(long)}; otherwise it is a no-operation,
* via {@link CoreSession#demand()}; otherwise it is a no-operation,
* because the demand is explicitly managed by the application function.</p>
*/
protected void autoDemand()
{
if (isAutoDemand())
getCoreSession().demand(1);
getCoreSession().demand();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ public void accept(Frame frame, Callback callback)
if (!frame.isFin() && !frame.hasPayload())
{
callback.succeeded();
getCoreSession().demand(1);
getCoreSession().demand();
return;
}

Expand All @@ -96,7 +96,7 @@ public void accept(Frame frame, Callback callback)
}
else
{
getCoreSession().demand(1);
getCoreSession().demand();
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ public void accept(Frame frame, Callback callback)
if (!frame.isFin() && !frame.hasPayload())
{
callback.succeeded();
getCoreSession().demand(1);
getCoreSession().demand();
return;
}

Expand All @@ -103,7 +103,7 @@ public void accept(Frame frame, Callback callback)
else
{
// Did not call the application so must explicitly demand here.
getCoreSession().demand(1);
getCoreSession().demand();
}
}
catch (Throwable t)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ public void accept(Frame frame, Callback callback)
if (!frame.isFin() && !frame.hasPayload())
{
callback.succeeded();
session.demand(1);
session.demand();
return;
}

Expand Down Expand Up @@ -228,7 +228,7 @@ private void succeedCurrentEntry()
{
current.callback.succeeded();
if (!current.frame.isFin())
session.demand(1);
session.demand();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public interface MessageSink
* payload is consumed.</p>
* <p>The demand for more frames must be explicitly invoked,
* or arranged to be invoked asynchronously, by the implementation
* of this method, by calling {@link CoreSession#demand(long)}.</p>
* of this method, by calling {@link CoreSession#demand()}.</p>
*
* @param frame the frame to consume
* @param callback the callback to complete when the frame is consumed
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public void accept(Frame frame, Callback callback)
else
{
callback.succeeded();
getCoreSession().demand(1);
getCoreSession().demand();
}
}
catch (Throwable t)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public void accept(Frame frame, Callback callback)
else
{
callback.succeeded();
getCoreSession().demand(1);
getCoreSession().demand();
}
}
catch (Throwable t)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ public void accept(Frame frame, Callback callback)
else
{
callback.succeeded();
getCoreSession().demand(1);
getCoreSession().demand();
}
}
catch (Throwable t)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,6 @@

package org.eclipse.jetty.websocket.core.util;

import java.util.function.LongConsumer;

import org.eclipse.jetty.websocket.core.Extension;
import org.eclipse.jetty.websocket.core.ExtensionStack;

Expand All @@ -25,9 +23,9 @@
*/
public interface DemandChain
{
void demand(long n);
void demand();

default void setNextDemand(LongConsumer nextDemand)
default void setNextDemand(DemandChain nextDemand)
{
}
}
Loading