Skip to content

Commit

Permalink
PR #12342 - changes from review
Browse files Browse the repository at this point in the history
Signed-off-by: Lachlan Roberts <[email protected]>
  • Loading branch information
lachlan-roberts committed Nov 12, 2024
1 parent 59d5136 commit 8896b10
Show file tree
Hide file tree
Showing 4 changed files with 43 additions and 122 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,15 @@ public String toString()

boolean isRsv3();

default CloseStatus getCloseStatus()
{
return null;
}

record CloseStatus(int statusCode, String reason)
{
}

/**
* The effective opcode of the frame accounting for the CONTINUATION opcode.
* If the frame is a CONTINUATION frame for a TEXT message, this will return TEXT.
Expand All @@ -110,122 +119,4 @@ public String toString()
* @return the effective opcode of the frame.
*/
byte getEffectiveOpCode();

class Wrapper implements Frame
{
private final Frame _frame;

public Wrapper(Frame frame)
{
_frame = frame;
}

public Frame getWrapped()
{
return _frame;
}

@Override
public byte[] getMask()
{
return _frame.getMask();
}

@Override
public byte getOpCode()
{
return _frame.getOpCode();
}

@Override
public ByteBuffer getPayload()
{
return _frame.getPayload();
}

@Override
public int getPayloadLength()
{
return _frame.getPayloadLength();
}

@Override
public Type getType()
{
return _frame.getType();
}

@Override
public boolean hasPayload()
{
return _frame.hasPayload();
}

@Override
public boolean isFin()
{
return _frame.isFin();
}

@Override
public boolean isMasked()
{
return _frame.isMasked();
}

@Override
public boolean isRsv1()
{
return _frame.isRsv1();
}

@Override
public boolean isRsv2()
{
return _frame.isRsv2();
}

@Override
public boolean isRsv3()
{
return _frame.isRsv3();
}

@Override
public byte getEffectiveOpCode()
{
return _frame.getEffectiveOpCode();
}
}

static Frame copy(Frame frame)
{
ByteBuffer payloadCopy = copy(frame.getPayload());
return new Frame.Wrapper(frame)
{
@Override
public ByteBuffer getPayload()
{
return payloadCopy;
}

@Override
public int getPayloadLength()
{
return payloadCopy == null ? 0 : payloadCopy.remaining();
}
};
}

private static ByteBuffer copy(ByteBuffer buffer)
{
if (buffer == null)
return null;
int p = buffer.position();
ByteBuffer clone = buffer.isDirect() ? ByteBuffer.allocateDirect(buffer.remaining()) : ByteBuffer.allocate(buffer.remaining());
clone.put(buffer);
clone.flip();
buffer.position(p);
return clone;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,7 @@ default void onWebSocketOpen(Session session)
* or data frames either BINARY or TEXT.</p>
*
* @param frame the received frame
* @param callback the callback to complete once the frame has been processed.
*/
default void onWebSocketFrame(Frame frame, Callback callback)
{
Expand Down Expand Up @@ -284,6 +285,7 @@ default void onWebSocketPartialText(String payload, boolean last)
* <p>A WebSocket BINARY message has been received.</p>
*
* @param payload the raw payload array received
* @param callback the callback to complete when the payload has been processed
*/
default void onWebSocketBinary(ByteBuffer payload, Callback callback)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import java.nio.ByteBuffer;

import org.eclipse.jetty.websocket.core.Frame;
import org.eclipse.jetty.websocket.core.OpCode;

public class JettyWebSocketFrame implements org.eclipse.jetty.websocket.api.Frame
{
Expand All @@ -26,7 +27,7 @@ public class JettyWebSocketFrame implements org.eclipse.jetty.websocket.api.Fram
* @param frame the core websocket {@link Frame} to wrap as a {@link org.eclipse.jetty.websocket.api.Frame}.
* @deprecated there is no alternative intended to publicly construct a {@link JettyWebSocketFrame}.
*/
@Deprecated
@Deprecated(forRemoval = true, since = "12.1.0")
public JettyWebSocketFrame(Frame frame)
{
this(frame, frame.getOpCode());
Expand Down Expand Up @@ -115,6 +116,15 @@ public byte getEffectiveOpCode()
return effectiveOpCode;
}

@Override
public CloseStatus getCloseStatus()
{
if (getOpCode() != OpCode.CLOSE)
return null;
org.eclipse.jetty.websocket.core.CloseStatus closeStatus = org.eclipse.jetty.websocket.core.CloseStatus.getCloseStatus(frame);
return new CloseStatus(closeStatus.getCode(), closeStatus.getReason());
}

@Override
public String toString()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;

Expand Down Expand Up @@ -72,6 +73,7 @@ public void onMessage(String message) throws IOException
public static class ListenerSocket implements Session.Listener
{
final List<Frame> frames = new CopyOnWriteArrayList<>();
final List<Callback> callbacks = new CopyOnWriteArrayList<>();
Session session;

@Override
Expand All @@ -84,16 +86,22 @@ public void onWebSocketOpen(Session session)
@Override
public void onWebSocketFrame(Frame frame, Callback callback)
{
frames.add(Frame.copy(frame));
frames.add(frame);
callbacks.add(callback);

// Because no pingListener is registered, the frameListener is responsible for handling pings.
if (frame.getOpCode() == OpCode.PING)
{
session.sendPong(frame.getPayload(), Callback.from(callback, session::demand));
session.sendPong(frame.getPayload(), Callback.from(session::demand, callback::fail));
return;
}
else if (frame.getOpCode() == OpCode.CLOSE)
{
Frame.CloseStatus closeStatus = frame.getCloseStatus();
session.close(closeStatus.statusCode(), closeStatus.reason(), Callback.NOOP);
return;
}

callback.succeed();
session.demand();
}
}
Expand Down Expand Up @@ -226,13 +234,23 @@ public void testNoAutoDemand() throws Exception
Frame frame0 = listenerSocket.frames.get(0);
assertThat(frame0.getType(), is(Frame.Type.PONG));
assertThat(StandardCharsets.UTF_8.decode(frame0.getPayload()).toString(), is("ping-0"));
Callback callback0 = listenerSocket.callbacks.get(0);
assertNotNull(callback0);
callback0.succeed();

Frame frame1 = listenerSocket.frames.get(1);
assertThat(frame1.getType(), is(Frame.Type.PONG));
assertThat(StandardCharsets.UTF_8.decode(frame1.getPayload()).toString(), is("ping-1"));
Callback callback1 = listenerSocket.callbacks.get(1);
assertNotNull(callback1);
callback1.succeed();

session.close();
await().atMost(5, TimeUnit.SECONDS).until(listenerSocket.frames::size, is(3));
assertThat(listenerSocket.frames.get(2).getType(), is(Frame.Type.CLOSE));
Callback closeCallback = listenerSocket.callbacks.get(2);
assertNotNull(closeCallback);
closeCallback.succeed();
}

@Test
Expand Down

0 comments on commit 8896b10

Please sign in to comment.