Skip to content

Commit

Permalink
Revert Serializing Outbound Transport Messages on IO Threads (#64632) (
Browse files Browse the repository at this point in the history
…#64654)

Serializing outbound transport message on the IO loop was introduced in #56961. Unfortunately it turns out that this is incompatible with assumptions made by CCR code here: https://github.com/elastic/elasticsearch/blob/f22ddf822e24bb17f1772c3bacb7ee97a00339b8/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/repositories/GetCcrRestoreFileChunkAction.java#L60-L61 and that are not easy to work around on short notice.

Raising reverting this move (as a temporary solution, it's still a valuable change long-term) as a blocker therefore as this seriously affects the stability of the initial phase of the CCR following by causing corrupted bytes to be send to the follower.
  • Loading branch information
original-brownbear authored Nov 5, 2020
1 parent 9e4105e commit 51e9d6f
Show file tree
Hide file tree
Showing 7 changed files with 47 additions and 85 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,9 @@
import org.elasticsearch.common.util.PageCacheRecycler;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.InboundPipeline;
import org.elasticsearch.transport.OutboundHandler;
import org.elasticsearch.transport.Transport;
import org.elasticsearch.transport.Transports;

import java.io.IOException;
import java.nio.channels.ClosedChannelException;
import java.util.ArrayDeque;
import java.util.Queue;
Expand Down Expand Up @@ -93,15 +91,15 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {

@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
assert msg instanceof OutboundHandler.SendContext;
assert msg instanceof ByteBuf;
assert Transports.assertDefaultThreadContext(transport.getThreadPool().getThreadContext());
final boolean queued = queuedWrites.offer(new WriteOperation((OutboundHandler.SendContext) msg, promise));
final boolean queued = queuedWrites.offer(new WriteOperation((ByteBuf) msg, promise));
assert queued;
assert Transports.assertDefaultThreadContext(transport.getThreadPool().getThreadContext());
}

@Override
public void channelWritabilityChanged(ChannelHandlerContext ctx) throws IOException {
public void channelWritabilityChanged(ChannelHandlerContext ctx) {
assert Transports.assertDefaultThreadContext(transport.getThreadPool().getThreadContext());
if (ctx.channel().isWritable()) {
doFlush(ctx);
Expand All @@ -110,7 +108,7 @@ public void channelWritabilityChanged(ChannelHandlerContext ctx) throws IOExcept
}

@Override
public void flush(ChannelHandlerContext ctx) throws IOException {
public void flush(ChannelHandlerContext ctx) {
assert Transports.assertDefaultThreadContext(transport.getThreadPool().getThreadContext());
Channel channel = ctx.channel();
if (channel.isWritable() || channel.isActive() == false) {
Expand All @@ -126,7 +124,7 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception {
super.channelInactive(ctx);
}

private void doFlush(ChannelHandlerContext ctx) throws IOException {
private void doFlush(ChannelHandlerContext ctx) {
assert ctx.executor().inEventLoop();
final Channel channel = ctx.channel();
if (channel.isActive() == false) {
Expand All @@ -144,25 +142,24 @@ private void doFlush(ChannelHandlerContext ctx) throws IOException {
break;
}
final WriteOperation write = currentWrite;
final ByteBuf currentBuffer = write.buffer();
if (currentBuffer.readableBytes() == 0) {
if (write.buf.readableBytes() == 0) {
write.promise.trySuccess();
currentWrite = null;
continue;
}
final int readableBytes = currentBuffer.readableBytes();
final int readableBytes = write.buf.readableBytes();
final int bufferSize = Math.min(readableBytes, 1 << 18);
final int readerIndex = currentBuffer.readerIndex();
final int readerIndex = write.buf.readerIndex();
final boolean sliced = readableBytes != bufferSize;
final ByteBuf writeBuffer;
if (sliced) {
writeBuffer = currentBuffer.retainedSlice(readerIndex, bufferSize);
currentBuffer.readerIndex(readerIndex + bufferSize);
writeBuffer = write.buf.retainedSlice(readerIndex, bufferSize);
write.buf.readerIndex(readerIndex + bufferSize);
} else {
writeBuffer = currentBuffer;
writeBuffer = write.buf;
}
final ChannelFuture writeFuture = ctx.write(writeBuffer);
if (sliced == false || currentBuffer.readableBytes() == 0) {
if (sliced == false || write.buf.readableBytes() == 0) {
currentWrite = null;
writeFuture.addListener(future -> {
assert ctx.executor().inEventLoop();
Expand Down Expand Up @@ -197,24 +194,13 @@ private void failQueuedWrites() {

private static final class WriteOperation {

private ByteBuf buf;

private OutboundHandler.SendContext context;
private final ByteBuf buf;

private final ChannelPromise promise;

WriteOperation(OutboundHandler.SendContext context, ChannelPromise promise) {
this.context = context;
WriteOperation(ByteBuf buf, ChannelPromise promise) {
this.buf = buf;
this.promise = promise;
}

ByteBuf buffer() throws IOException {
if (buf == null) {
buf = Netty4Utils.toByteBuf(context.get());
context = null;
}
assert context == null;
return buf;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.concurrent.CompletableContext;
import org.elasticsearch.transport.OutboundHandler;
import org.elasticsearch.transport.TcpChannel;
import org.elasticsearch.transport.TransportException;

Expand Down Expand Up @@ -142,11 +142,11 @@ public InetSocketAddress getRemoteAddress() {
}

@Override
public void sendMessage(OutboundHandler.SendContext sendContext) {
channel.writeAndFlush(sendContext, addPromise(sendContext, channel));
public void sendMessage(BytesReference reference, ActionListener<Void> listener) {
channel.writeAndFlush(Netty4Utils.toByteBuf(reference), addPromise(listener, channel));

if (channel.eventLoop().isShutdown()) {
sendContext.onFailure(new TransportException("Cannot send message, event loop is shutting down."));
listener.onFailure(new TransportException("Cannot send message, event loop is shutting down."));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,8 @@
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.nio.NioSocketChannel;
import org.elasticsearch.transport.OutboundHandler;
import org.elasticsearch.transport.TcpChannel;

import java.io.IOException;
import java.nio.channels.SocketChannel;

public class NioTcpChannel extends NioSocketChannel implements TcpChannel {
Expand All @@ -40,16 +38,8 @@ public NioTcpChannel(boolean isServer, String profile, SocketChannel socketChann
this.profile = profile;
}

@Override
public void sendMessage(OutboundHandler.SendContext sendContext) {
final BytesReference message;
try {
message = sendContext.get();
} catch (IOException e) {
sendContext.onFailure(e);
return;
}
getContext().sendMessage(BytesReference.toByteBuffers(message), ActionListener.toBiConsumer(sendContext));
public void sendMessage(BytesReference reference, ActionListener<Void> listener) {
getContext().sendMessage(BytesReference.toByteBuffers(reference), ActionListener.toBiConsumer(listener));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
import java.io.IOException;
import java.util.Set;

public final class OutboundHandler {
final class OutboundHandler {

private static final Logger logger = LogManager.getLogger(OutboundHandler.class);

Expand All @@ -66,7 +66,12 @@ public final class OutboundHandler {

void sendBytes(TcpChannel channel, BytesReference bytes, ActionListener<Void> listener) {
SendContext sendContext = new SendContext(channel, () -> bytes, listener);
internalSend(channel, sendContext);
try {
internalSend(channel, sendContext);
} catch (IOException e) {
// This should not happen as the bytes are already serialized
throw new AssertionError(e);
}
}

/**
Expand Down Expand Up @@ -120,17 +125,17 @@ private void sendMessage(TcpChannel channel, OutboundMessage networkMessage, Act
internalSend(channel, sendContext);
}

private void internalSend(TcpChannel channel, SendContext sendContext) {
private void internalSend(TcpChannel channel, SendContext sendContext) throws IOException {
channel.getChannelStats().markAccessed(threadPool.relativeTimeInMillis());
BytesReference reference = sendContext.get();
// stash thread context so that channel event loop is not polluted by thread context
try (ThreadContext.StoredContext existing = threadPool.getThreadContext().stashContext()) {
channel.sendMessage(sendContext);
channel.sendMessage(reference, sendContext);
} catch (RuntimeException ex) {
sendContext.onFailure(ex);
CloseableChannel.closeChannel(channel);
throw ex;
}

}

void setMessageListener(TransportMessageListener listener) {
Expand All @@ -143,7 +148,7 @@ void setMessageListener(TransportMessageListener listener) {

private static class MessageSerializer implements CheckedSupplier<BytesReference, IOException>, Releasable {

private OutboundMessage message;
private final OutboundMessage message;
private final BigArrays bigArrays;
private volatile ReleasableBytesStreamOutput bytesStreamOutput;

Expand All @@ -154,12 +159,8 @@ private MessageSerializer(OutboundMessage message, BigArrays bigArrays) {

@Override
public BytesReference get() throws IOException {
try {
bytesStreamOutput = new ReleasableBytesStreamOutput(bigArrays);
return message.serialize(bytesStreamOutput);
} finally {
message = null;
}
bytesStreamOutput = new ReleasableBytesStreamOutput(bigArrays);
return message.serialize(bytesStreamOutput);
}

@Override
Expand All @@ -168,10 +169,10 @@ public void close() {
}
}

public class SendContext extends NotifyOnceListener<Void> implements CheckedSupplier<BytesReference, IOException> {
private class SendContext extends NotifyOnceListener<Void> implements CheckedSupplier<BytesReference, IOException> {

private final TcpChannel channel;
private CheckedSupplier<BytesReference, IOException> messageSupplier;
private final CheckedSupplier<BytesReference, IOException> messageSupplier;
private final ActionListener<Void> listener;
private final Releasable optionalReleasable;
private long messageSize = -1;
Expand All @@ -189,13 +190,10 @@ private SendContext(TcpChannel channel, CheckedSupplier<BytesReference, IOExcept
this.optionalReleasable = optionalReleasable;
}

@Override
public BytesReference get() throws IOException {
BytesReference message;
try {
assert messageSupplier != null;
message = messageSupplier.get();
messageSupplier = null;
messageSize = message.length();
TransportLogger.logOutboundMessage(channel, message);
return message;
Expand All @@ -214,7 +212,6 @@ protected void innerOnResponse(Void v) {

@Override
protected void innerOnFailure(Exception e) {
messageSupplier = null;
if (NetworkExceptionHelper.isCloseConnectionException(e)) {
logger.debug(() -> new ParameterizedMessage("send message failed [channel: {}]", channel), e);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package org.elasticsearch.transport;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.network.CloseableChannel;
import org.elasticsearch.common.unit.TimeValue;

Expand Down Expand Up @@ -58,11 +59,13 @@ public interface TcpChannel extends CloseableChannel {
InetSocketAddress getRemoteAddress();

/**
* Sends a tcp message to the channel.
* Sends a tcp message to the channel. The listener will be executed once the send process has been
* completed.
*
* @param sendContext Send Context
* @param reference to send to channel
* @param listener to execute upon send completion
*/
void sendMessage(OutboundHandler.SendContext sendContext);
void sendMessage(BytesReference reference, ActionListener<Void> listener);

/**
* Adds a listener that will be executed when the channel is connected. If the channel is still
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.concurrent.CompletableContext;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.concurrent.atomic.AtomicReference;

Expand Down Expand Up @@ -89,14 +88,9 @@ public InetSocketAddress getRemoteAddress() {
}

@Override
public void sendMessage(OutboundHandler.SendContext sendContext) {
try {
messageCaptor.set(sendContext.get());
} catch (IOException e) {
sendContext.onFailure(e);
return;
}
listenerCaptor.set(sendContext);
public void sendMessage(BytesReference reference, ActionListener<Void> listener) {
messageCaptor.set(reference);
listenerCaptor.set(listener);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.ConnectionProfile;
import org.elasticsearch.transport.InboundPipeline;
import org.elasticsearch.transport.OutboundHandler;
import org.elasticsearch.transport.StatsTracker;
import org.elasticsearch.transport.TcpChannel;
import org.elasticsearch.transport.TcpServerChannel;
Expand Down Expand Up @@ -366,15 +365,8 @@ public ChannelStats getChannelStats() {
}

@Override
public void sendMessage(OutboundHandler.SendContext sendContext) {
final BytesReference message;
try {
message = sendContext.get();
} catch (IOException e) {
sendContext.onFailure(e);
return;
}
getContext().sendMessage(BytesReference.toByteBuffers(message), ActionListener.toBiConsumer(sendContext));
public void sendMessage(BytesReference reference, ActionListener<Void> listener) {
getContext().sendMessage(BytesReference.toByteBuffers(reference), ActionListener.toBiConsumer(listener));
}
}

Expand Down

0 comments on commit 51e9d6f

Please sign in to comment.