Skip to content

Commit

Permalink
Issue #4538 - MessageWriter delegates to MessageOutputStream
Browse files Browse the repository at this point in the history
Signed-off-by: Lachlan Roberts <[email protected]>
  • Loading branch information
lachlan-roberts committed Feb 19, 2020
1 parent e2f86f9 commit 6eccc7e
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 164 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ public void sendFrame(Frame frame, Callback callback, boolean batch)

if (OpCode.isDataFrame(frame.getOpCode()))
{
messageSink.accept(frame, callback);
messageSink.accept(Frame.copy(frame), callback);
if (frame.isFin())
{
messageSink = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@ public MessageOutputStream(CoreSession coreSession, ByteBufferPool bufferPool)
this.bufferPool = bufferPool;
this.bufferSize = coreSession.getOutputBufferSize();
this.buffer = bufferPool.acquire(bufferSize, true);
BufferUtil.clear(buffer);
}

void setMessageType(byte opcode)
Expand Down Expand Up @@ -93,6 +92,20 @@ public void write(int b) throws IOException
}
}

public void write(ByteBuffer buffer) throws IOException
{
try
{
send(buffer);
}
catch (Throwable x)
{
// Notify without holding locks.
notifyFailure(x);
throw x;
}
}

@Override
public void flush() throws IOException
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,19 +20,12 @@

import java.io.IOException;
import java.io.Writer;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.charset.CharsetEncoder;
import java.nio.charset.CodingErrorAction;

import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.FutureCallback;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.core.CoreSession;
import org.eclipse.jetty.websocket.core.Frame;
import org.eclipse.jetty.websocket.core.OpCode;

import static java.nio.charset.StandardCharsets.UTF_8;
Expand All @@ -44,180 +37,33 @@
*/
public class MessageWriter extends Writer
{
private static final Logger LOG = Log.getLogger(MessageWriter.class);

private final MessageOutputStream outputStream;
private final CharsetEncoder utf8Encoder = UTF_8.newEncoder()
.onUnmappableCharacter(CodingErrorAction.REPORT)
.onMalformedInput(CodingErrorAction.REPORT);

private final CoreSession coreSession;
private long frameCount;
private Frame frame;
private CharBuffer buffer;
private Callback callback;
private boolean closed;

public MessageWriter(CoreSession coreSession, ByteBufferPool bufferPool)
{
this.coreSession = coreSession;
this.buffer = CharBuffer.allocate(coreSession.getOutputBufferSize());
this.frame = new Frame(OpCode.TEXT);
}

@Override
public void write(char[] chars, int off, int len) throws IOException
{
try
{
send(chars, off, len);
}
catch (Throwable x)
{
// Notify without holding locks.
notifyFailure(x);
throw x;
}
this.outputStream = new MessageOutputStream(coreSession, bufferPool);
this.outputStream.setMessageType(OpCode.TEXT);
}

@Override
public void write(int c) throws IOException
public void write(char[] cbuf, int off, int len) throws IOException
{
try
{
send(new char[]{(char)c}, 0, 1);
}
catch (Throwable x)
{
// Notify without holding locks.
notifyFailure(x);
throw x;
}
CharBuffer charBuffer = CharBuffer.wrap(cbuf, off, len);
outputStream.write(utf8Encoder.encode(charBuffer));
}

@Override
public void flush() throws IOException
{
try
{
flush(false);
}
catch (Throwable x)
{
// Notify without holding locks.
notifyFailure(x);
throw x;
}
}

private void flush(boolean fin) throws IOException
{
synchronized (this)
{
if (closed)
throw new IOException("Stream is closed");

closed = fin;

buffer.flip();
ByteBuffer payload = utf8Encoder.encode(buffer);
buffer.flip();

if (LOG.isDebugEnabled())
LOG.debug("flush({}): {}", fin, BufferUtil.toDetailString(payload));
frame.setPayload(payload);
frame.setFin(fin);

FutureCallback b = new FutureCallback();
coreSession.sendFrame(frame, b, false);
b.block();

++frameCount;
// Any flush after the first will be a CONTINUATION frame.
frame = new Frame(OpCode.CONTINUATION);
}
}

private void send(char[] chars, int offset, int length) throws IOException
{
synchronized (this)
{
if (closed)
throw new IOException("Stream is closed");

CharBuffer source = CharBuffer.wrap(chars, offset, length);

int remaining = length;

while (remaining > 0)
{
int read = source.read(buffer);
if (read == -1)
{
return;
}

remaining -= read;

if (remaining > 0)
{
// If we could not write everything, it means
// that the buffer was full, so flush it.
flush(false);
}
}
}
outputStream.flush();
}

@Override
public void close() throws IOException
{
try
{
flush(true);
if (LOG.isDebugEnabled())
LOG.debug("Stream closed, {} frames sent", frameCount);
// Notify without holding locks.
notifySuccess();
}
catch (Throwable x)
{
// Notify without holding locks.
notifyFailure(x);
throw x;
}
}

public void setCallback(Callback callback)
{
synchronized (this)
{
this.callback = callback;
}
}

private void notifySuccess()
{
Callback callback;
synchronized (this)
{
callback = this.callback;
}
if (callback != null)
{
callback.succeeded();
}
}

private void notifyFailure(Throwable failure)
{
Callback callback;
synchronized (this)
{
callback = this.callback;
}
if (callback != null)
{
callback.failed(failure);
}
outputStream.close();
}
}

0 comments on commit 6eccc7e

Please sign in to comment.