Skip to content

Commit

Permalink
Merge pull request #802 from Cysharp/feature/ImproveHeartbeat
Browse files Browse the repository at this point in the history
Allow timeouts longer than the interval for sending heartbeats
  • Loading branch information
mayuki authored Jul 5, 2024
2 parents 13ece1c + 98b30e7 commit c9f43a4
Show file tree
Hide file tree
Showing 18 changed files with 648 additions and 328 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -74,20 +74,20 @@ public StreamingHubMessageType ReadMessageType()
return (clientRequestMessageId, methodId, data.Slice(offset));
}

public ReadOnlyMemory<byte> ReadServerHeartbeat()
public (byte Sequence, ReadOnlyMemory<byte> Metadata) ReadServerHeartbeat()
{
//var type = reader.ReadByte(); // Type is already read by ReadMessageType
reader.Skip(); // Dummy (1)
var sequence = reader.ReadByte(); // Sequence
reader.Skip(); // Dummy (2)
reader.Skip(); // Dummy (3)

return data.Slice((int)reader.Consumed);
return (sequence, data.Slice((int)reader.Consumed));
}

public long ReadClientHeartbeatResponse()
public (byte Sequence, long SentAt) ReadClientHeartbeatResponse()
{
//var type = reader.ReadByte(); // Type is already read by ReadMessageType
reader.Skip(); // Dummy (1)
var sequence = reader.ReadByte(); // Sequence
reader.Skip(); // Dummy (2)
reader.Skip(); // Dummy (3)

Expand All @@ -96,7 +96,7 @@ public long ReadClientHeartbeatResponse()
if (arrayLen == 0) throw new InvalidOperationException("Invalid client heartbeat response. An extra data is empty.");
var sentAt = reader.ReadInt64();

return sentAt;
return (sequence, sentAt);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -199,83 +199,70 @@ public static void WriteClientResultResponseMessageForError(IBufferWriter<byte>
writer.Flush();
}


// Array(5)[127, Nil, Nil, Nil, <Extra>]
static ReadOnlySpan<byte> ServerHeartbeatMessageForServerToClientHeader => new byte[] { 0x95, 0x7f, 0xc0, 0xc0, 0xc0 };

/// <summary>
/// Writes a server heartbeat message for sending from the server.
/// </summary>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public static void WriteServerHeartbeatMessageHeader(IBufferWriter<byte> bufferWriter)
public static void WriteServerHeartbeatMessageHeader(IBufferWriter<byte> bufferWriter, short sequence)
{
bufferWriter.Write(ServerHeartbeatMessageForServerToClientHeader);
//var writer = new MessagePackWriter(bufferWriter);
//writer.WriteArrayHeader(5);
//writer.Write(0x7f); // Type = 0x7f / 127 (Heartbeat)
//writer.WriteNil(); // Dummy
//writer.WriteNil(); // Dummy
//writer.WriteNil(); // Dummy
//writer.Flush();
// // <Metadata>
// Array(5)[127, Sequence(int8), Nil, Nil, <Metadata>]
var writer = new MessagePackWriter(bufferWriter);
writer.WriteArrayHeader(5);
writer.Write(0x7f); // Type = 0x7f / 127 (Heartbeat)
writer.Write(sequence); // Sequence
writer.WriteNil(); // Dummy
writer.WriteNil(); // Dummy
writer.Flush();
// // <Metadata>
}

// Array(4)[127, Nil, Nil, Nil]
static ReadOnlySpan<byte> ServerHeartbeatMessageForClientToServer => new byte[] { 0x94, 0x7f, 0xc0, 0xc0, 0xc0 };

/// <summary>
/// Writes a server heartbeat message for sending response from the client.
/// </summary>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public static void WriteServerHeartbeatMessageResponse(IBufferWriter<byte> bufferWriter)
public static void WriteServerHeartbeatMessageResponse(IBufferWriter<byte> bufferWriter, short sequence)
{
bufferWriter.Write(ServerHeartbeatMessageForClientToServer);
//var writer = new MessagePackWriter(bufferWriter);
//writer.WriteArrayHeader(4);
//writer.Write(0x7f); // Type = 0x7f / 127 (Heartbeat)
//writer.WriteNil(); // Dummy
//writer.WriteNil(); // Dummy
//writer.WriteNil(); // Dummy
//writer.Flush();
// Array(4)[127, Sequence(int8), Nil, Nil]
var writer = new MessagePackWriter(bufferWriter);
writer.WriteArrayHeader(4);
writer.Write(0x7f); // Type = 0x7f / 127 (Heartbeat)
writer.Write(sequence); // Sequence
writer.WriteNil(); // Dummy
writer.WriteNil(); // Dummy
writer.Flush();
}

// Array(4)[0x7e(126), Nil, Nil, <Extra>]
static ReadOnlySpan<byte> ClientHeartbeatMessageHeader => new byte[] { 0x94, 0x7e, 0xc0, 0xc0 };

/// <summary>
/// Writes a client heartbeat message for sending from the client.
/// </summary>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public static void WriteClientHeartbeatMessageHeader(IBufferWriter<byte> bufferWriter)
public static void WriteClientHeartbeatMessageHeader(IBufferWriter<byte> bufferWriter, short sequence)
{
bufferWriter.Write(ClientHeartbeatMessageHeader);
//var writer = new MessagePackWriter(bufferWriter);
//writer.WriteArrayHeader(4);
//writer.Write(0x7f); // Type = 0x7e / 126 (ClientHeartbeat)
//writer.WriteNil(); // Dummy
//writer.WriteNil(); // Dummy
//writer.Flush();
// // <Extra>
// Array(4)[0x7e(126), Sequence(int8), Nil, <Extra>]
var writer = new MessagePackWriter(bufferWriter);
writer.WriteArrayHeader(4);
writer.Write(0x7e); // Type = 0x7e / 126 (ClientHeartbeat)
writer.Write(sequence); // Sequence
writer.WriteNil(); // Dummy
writer.Flush();
// // <Extra>
}

// Array(5)[0x7e(126), Nil, Nil, Nil, <Extra>]
static ReadOnlySpan<byte> ClientHeartbeatMessageResponseHeader => new byte[] { 0x95, 0x7e, 0xc0, 0xc0, 0xc0 };

/// <summary>
/// Writes a client heartbeat message for sending response from the server.
/// </summary>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public static void WriteClientHeartbeatMessageResponseHeader(IBufferWriter<byte> bufferWriter)
public static void WriteClientHeartbeatMessageResponseHeader(IBufferWriter<byte> bufferWriter, short sequence)
{
bufferWriter.Write(ClientHeartbeatMessageResponseHeader);
//var writer = new MessagePackWriter(bufferWriter);
//writer.WriteArrayHeader(5);
//writer.Write(0x7f); // Type = 0x7e / 126 (Heartbeat)
//writer.WriteNil(); // Dummy
//writer.WriteNil(); // Dummy
//writer.WriteNil(); // Dummy
//writer.Flush();
// // <Extra>
// Array(5)[0x7e(126), Sequence(int8), Nil, Nil, <Extra>]
var writer = new MessagePackWriter(bufferWriter);
writer.WriteArrayHeader(5);
writer.Write(0x7e); // Type = 0x7e / 126 (Heartbeat)
writer.Write(sequence); // Sequence
writer.WriteNil(); // Dummy
writer.WriteNil(); // Dummy
writer.Flush();
// // <Extra>
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,13 +77,24 @@ public StreamingHubMessageType ReadMessageType()
return (clientResultMessageId, clientMethodId, statusCode, detail, message);
}

public ReadOnlyMemory<byte> ReadClientHeartbeat()
public (short Sequence, ReadOnlyMemory<byte> Extra) ReadClientHeartbeat()
{
// [Nil, Nil, [SentAt(long)]]
// [Sequence(int8), Nil, [SentAt(long)]]
var sequence = reader.ReadInt16(); // Sequence
reader.Skip(); // Dummy
var extra = data.Slice((int)reader.Consumed);

return (sequence, data.Slice((int)reader.Consumed));
}

public short ReadServerHeartbeatResponse()
{
// [Sequence(int8), Nil, Nil]
var sequence = reader.ReadInt16(); // Sequence
reader.Skip(); // Dummy
reader.Skip(); // Dummy

return data.Slice((int)reader.Consumed);
return sequence;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,11 @@ internal class StreamingHubClientHeartbeatManager : IDisposable
readonly TimeProvider timeProvider;
#endif

SendOrPostCallback? serverHeartbeatCallbackCache;
SendOrPostCallback? clientHeartbeatResponseCallbackCache;
SendOrPostCallback? processServerHeartbeatCoreCache;
SendOrPostCallback? proecssClientHeartbeatResponseCoreCache;
Task? heartbeatLoopTask;
short sequence;
bool isTimeoutTimerRunning;

public CancellationToken TimeoutToken => timeoutTokenSource.Token;

Expand Down Expand Up @@ -90,100 +92,109 @@ await Task.Delay(heartbeatInterval
shutdownTokenSource.Token.ThrowIfCancellationRequested();

// Writes a ClientHeartbeat to the writer queue.
_ = writer.TryWrite(BuildClientHeartbeatMessage());
_ = writer.TryWrite(BuildClientHeartbeatMessage(sequence));

// Start/Restart the timeout cancellation timer.
timeoutTokenSource.CancelAfter(timeoutPeriod);
if (!isTimeoutTimerRunning)
{
// Start/Restart the timeout cancellation timer.
timeoutTokenSource.CancelAfter(timeoutPeriod);
isTimeoutTimerRunning = true;
}

sequence++;
}
}

public void ProcessClientHeartbeatResponse(StreamingHubPayload payload)
{
if (shutdownTokenSource.IsCancellationRequested) return;

// Cancel the running timeout cancellation timer.
timeoutTokenSource.CancelAfter(Timeout.InfiniteTimeSpan);
proecssClientHeartbeatResponseCoreCache ??= ProcessClientHeartbeatResponseCore(onClientHeartbeatResponseReceived);

if (onClientHeartbeatResponseReceived is { } heartbeatReceived)
if (synchronizationContext is null)
{
clientHeartbeatResponseCallbackCache ??= CreateClientHeartbeatResponseCallback(heartbeatReceived);

if (synchronizationContext is null)
{
clientHeartbeatResponseCallbackCache(payload);
}
else
{
synchronizationContext.Post(clientHeartbeatResponseCallbackCache, payload);
}
proecssClientHeartbeatResponseCoreCache(payload);
}
else
{
synchronizationContext.Post(proecssClientHeartbeatResponseCoreCache, payload);
}
}

public void ProcessServerHeartbeat(StreamingHubPayload payload)
{
if (shutdownTokenSource.IsCancellationRequested) return;

if (onServerHeartbeatReceived is { } heartbeatReceived)
{
serverHeartbeatCallbackCache ??= CreateServerHeartbeatCallback(heartbeatReceived);
processServerHeartbeatCoreCache ??= ProcessServerHeartbeatCore(onServerHeartbeatReceived);

if (synchronizationContext is null)
{
serverHeartbeatCallbackCache(payload);
}
else
{
synchronizationContext.Post(serverHeartbeatCallbackCache, payload);
}
if (synchronizationContext is null)
{
processServerHeartbeatCoreCache(payload);
}
else
{
synchronizationContext.Post(processServerHeartbeatCoreCache, payload);
}

// Writes a ServerHeartbeatResponse to the writer queue.
_ = writer.TryWrite(BuildServerHeartbeatMessage());
}

SendOrPostCallback CreateClientHeartbeatResponseCallback(Action<ClientHeartbeatEvent> heartbeatReceivedAction) => (state) =>
SendOrPostCallback ProcessClientHeartbeatResponseCore(Action<ClientHeartbeatEvent>? clientHeartbeatReceivedAction) => (state) =>
{
var p = (StreamingHubPayload)state!;

var reader = new StreamingHubClientMessageReader(p.Memory);
var payload = (StreamingHubPayload)state!;
var reader = new StreamingHubClientMessageReader(payload.Memory);
_ = reader.ReadMessageType();
var (sentSequence, sentAt) = reader.ReadClientHeartbeatResponse();

if (sentSequence == (sequence - 1)/* NOTE: Sequence already 1 advanced.*/)
{
// Cancel the running timeout cancellation timer.
timeoutTokenSource.CancelAfter(Timeout.InfiniteTimeSpan);
isTimeoutTimerRunning = false;
}

var now =
#if NET8_0_OR_GREATER
var now = timeProvider.GetUtcNow();
timeProvider.GetUtcNow();
#else
var now = DateTimeOffset.UtcNow;
DateTimeOffset.UtcNow;
#endif
var sentAt = reader.ReadClientHeartbeatResponse();
var elapsed = now.ToUnixTimeMilliseconds() - sentAt;

heartbeatReceivedAction(new ClientHeartbeatEvent(elapsed));
StreamingHubPayloadPool.Shared.Return(p);
clientHeartbeatReceivedAction?.Invoke(new ClientHeartbeatEvent(elapsed));
StreamingHubPayloadPool.Shared.Return(payload);
};

SendOrPostCallback CreateServerHeartbeatCallback(Action<ReadOnlyMemory<byte>> heartbeatReceivedAction) => (state) =>
SendOrPostCallback ProcessServerHeartbeatCore(Action<ReadOnlyMemory<byte>>? serverHeartbeatReceivedAction) => (state) =>
{
var p = (StreamingHubPayload)state!;
var remain = p.Memory.Slice(5); // header
heartbeatReceivedAction(remain);
StreamingHubPayloadPool.Shared.Return(p);
var payload = (StreamingHubPayload)state!;
var reader = new StreamingHubClientMessageReader(payload.Memory);
_ = reader.ReadMessageType();
var (serverSentSequence, metadata) = reader.ReadServerHeartbeat();

serverHeartbeatReceivedAction?.Invoke(metadata);

// Writes a ServerHeartbeatResponse to the writer queue.
_ = writer.TryWrite(BuildServerHeartbeatMessage(serverSentSequence));

StreamingHubPayloadPool.Shared.Return(payload);
};

StreamingHubPayload BuildServerHeartbeatMessage()
StreamingHubPayload BuildServerHeartbeatMessage(short serverSequence)
{
using var buffer = ArrayPoolBufferWriter.RentThreadStaticWriter();
StreamingHubMessageWriter.WriteServerHeartbeatMessageResponse(buffer);
StreamingHubMessageWriter.WriteServerHeartbeatMessageResponse(buffer, serverSequence);
return StreamingHubPayloadPool.Shared.RentOrCreate(buffer.WrittenSpan);
}

StreamingHubPayload BuildClientHeartbeatMessage()
StreamingHubPayload BuildClientHeartbeatMessage(short clientSequence)
{
using var buffer = ArrayPoolBufferWriter.RentThreadStaticWriter();
StreamingHubMessageWriter.WriteClientHeartbeatMessageHeader(buffer);
StreamingHubMessageWriter.WriteClientHeartbeatMessageHeader(buffer, clientSequence);

var now =
#if NET8_0_OR_GREATER
var now = timeProvider.GetUtcNow();
timeProvider.GetUtcNow();
#else
var now = DateTimeOffset.UtcNow;
DateTimeOffset.UtcNow;
#endif

// Extra: [SentAt(long)]
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using MagicOnion.Internal;
using System;
using System.Diagnostics;
using System.Threading.Tasks;
using System.Threading.Tasks.Sources;

Expand Down Expand Up @@ -63,6 +64,7 @@ public void Reset()
public short Version
=> core.Version;

[DebuggerNonUserCode]
public T GetResult(short token)
{
try
Expand Down
Loading

0 comments on commit c9f43a4

Please sign in to comment.