Skip to content

Commit

Permalink
使用新的序列化方式,减少GC
Browse files Browse the repository at this point in the history
  • Loading branch information
nnhy committed Sep 12, 2024
1 parent 6e999fa commit 6b9197c
Show file tree
Hide file tree
Showing 5 changed files with 97 additions and 53 deletions.
52 changes: 35 additions & 17 deletions NewLife.Remoting/ApiClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,8 @@ public virtual Int32 InvokeOneWay(String action, Object? args = null, Byte flag
}
finally
{
msg.Payload.TryDispose();

var msCost = st.StopCount(sw) / 1000;
if (SlowTrace > 0 && msCost >= SlowTrace) WriteLog($"慢调用[{action}]({msg}),耗时{msCost:n0}ms");

Expand All @@ -292,25 +294,39 @@ public virtual Int32 InvokeOneWay(String action, Object? args = null, Byte flag
//if (resultType == typeof(Packet)) return rs.Payload;
if (rs.Payload == null) return default;

// 解码响应得到SRMP报文
var message = enc.Decode(rs) ?? throw new InvalidOperationException();

// 是否成功
if (message.Code is not ApiCode.Ok and not ApiCode.Ok200)
throw new ApiException(message.Code, message.Data?.ToStr().Trim('\"') ?? "") { Source = invoker + "/" + action };
try
{
// 解码响应得到SRMP报文
var message = enc.Decode(rs) ?? throw new InvalidOperationException();

if (message.Data == null) return default;
if (resultType == typeof(IPacket)) return (TResult)(Object)message.Data;
if (resultType == typeof(Packet)) return (TResult)(Object)message.Data;
// 是否成功
if (message.Code is not ApiCode.Ok and not ApiCode.Ok200)
throw new ApiException(message.Code, message.Data?.ToStr().Trim('\"') ?? "") { Source = invoker + "/" + action };

// 解码结果
var result = enc.DecodeResult(action, message.Data, rs, resultType);
if (result == null) return default;
if (resultType == typeof(Object)) return (TResult)result;
if (message.Data == null) return default;
if (resultType == typeof(IPacket)) return (TResult)(Object)message.Data;
if (resultType == typeof(Packet)) return (TResult)(Object)message.Data;

// 返回
//return (TResult?)enc.Convert(result, resultType);
return (TResult?)result;
try
{
// 解码结果
var result = enc.DecodeResult(action, message.Data, rs, resultType);
if (result == null) return default;
if (resultType == typeof(Object)) return (TResult)result;

// 返回
//return (TResult?)enc.Convert(result, resultType);
return (TResult?)result;
}
finally
{
message.TryDispose();
}
}
finally
{
rs.Payload.TryDispose();
}
}

/// <summary>单向调用,不等待返回</summary>
Expand Down Expand Up @@ -352,6 +368,8 @@ public Int32 InvokeWithClient(ISocketClient client, String action, Object? args,
}
finally
{
msg.Payload.TryDispose();

var msCost = st.StopCount(sw) / 1000;
if (SlowTrace > 0 && msCost >= SlowTrace) WriteLog($"慢调用[{action}],耗时{msCost:n0}ms");
}
Expand All @@ -371,7 +389,7 @@ private void Client_Received(Object? sender, ReceivedEventArgs e)
// Api解码消息得到Action和参数
if (e.Message is not IMessage msg) return;

var apiMessage = Encoder.Decode(msg);
using var apiMessage = Encoder.Decode(msg);
var e2 = new ApiReceivedEventArgs
{
Remote = sender as ISocketRemote,
Expand Down
4 changes: 4 additions & 0 deletions NewLife.Remoting/ApiNetServer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -155,5 +155,9 @@ public Int32 InvokeOneWay(String action, Object? args = null, Byte flag = 0)

throw;
}
finally
{
msg.Payload.TryDispose();
}
}
}
2 changes: 1 addition & 1 deletion NewLife.Remoting/ApiServer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ public virtual void Stop(String? reason)
if (msg.Reply) return null;

var enc = session["Encoder"] as IEncoder ?? Encoder;
var request = enc.Decode(msg);
using var request = enc.Decode(msg);
if (request == null) return null;

// 根据动作名,开始跟踪
Expand Down
35 changes: 19 additions & 16 deletions NewLife.Remoting/IEncoder.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
using NewLife.Data;
using System.Text;
using NewLife.Buffers;
using NewLife.Data;
using NewLife.Log;
using NewLife.Messaging;

Expand Down Expand Up @@ -74,30 +76,31 @@ public abstract class EncoderBase
/// <param name="code"></param>
/// <param name="value"></param>
/// <returns></returns>
public virtual IPacket Encode(String action, Int32? code, IPacket? value)
public virtual IOwnerPacket Encode(String action, Int32? code, IPacket? value)
{
// 内存流,前面留空8字节用于协议头4字节(超长8字节)
var ms = new MemoryStream();
ms.Seek(8, SeekOrigin.Begin);
//var ms = new MemoryStream();
//ms.Seek(8, SeekOrigin.Begin);

var len = 8 + 1 + Encoding.UTF8.GetByteCount(action) + 4 + 4;
var pk = new ArrayPacket(len);

// 请求:action + args
// 响应:action + code + result
var writer = new BinaryWriter(ms);
var writer = new SpanWriter(pk.GetSpan());
writer.Advance(8);
writer.Write(action);

// 异常响应才有code。定长4字节
if (code != null && code.Value is not ApiCode.Ok and not 200) writer.Write(code.Value);

// 参数或结果。长度部分定长4字节
var pk = value;
if (pk != null) writer.Write(pk.Total);
if (value != null) writer.Write(value.Total);

var rs = new ArrayPacket(ms.GetBuffer(), 8, (Int32)ms.Length - 8)
{
Next = pk
};
pk = pk.Slice(8, writer.Position - 8);
if (value != null) pk.Next = value;

return rs;
return pk;
}

/// <summary>解码 请求/响应</summary>
Expand All @@ -109,8 +112,8 @@ public virtual IPacket Encode(String action, Int32? code, IPacket? value)

// 请求:action + args
// 响应:action + code + result
var ms = msg.Payload!.GetStream();
var reader = new BinaryReader(ms);
//var ms = msg.Payload!.GetStream();
var reader = new SpanReader(msg.Payload!.GetSpan());

message.Action = reader.ReadString();
if (message.Action.IsNullOrEmpty()) throw new Exception("解码错误,无法找到服务名!");
Expand All @@ -119,10 +122,10 @@ public virtual IPacket Encode(String action, Int32? code, IPacket? value)
if (msg.Reply && msg.Error) message.Code = reader.ReadInt32();

// 参数或结果
if (ms.Length > ms.Position)
if (reader.FreeCapacity > 0)
{
var len = reader.ReadInt32();
if (len > 0) message.Data = msg.Payload.Slice((Int32)ms.Position, len);
if (len > 0) message.Data = msg.Payload.Slice(reader.Position, len);
}

return message;
Expand Down
57 changes: 38 additions & 19 deletions NewLife.Remoting/WsClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,8 @@ public virtual Int32 InvokeOneWay(String action, Object? args = null, Byte flag
}
finally
{
msg.Payload.TryDispose();

var msCost = st.StopCount(sw) / 1000;
if (SlowTrace > 0 && msCost >= SlowTrace) WriteLog($"慢调用[{action}]({msg}),耗时{msCost:n0}ms");

Expand All @@ -288,24 +290,39 @@ public virtual Int32 InvokeOneWay(String action, Object? args = null, Byte flag
//if (resultType == typeof(IPacket)) return rs.Payload;
if (rs.Payload == null) return default;

// 解码响应得到SRMP报文
var message = enc.Decode(rs) ?? throw new InvalidOperationException();

// 是否成功
if (message.Code is not ApiCode.Ok and not ApiCode.Ok200)
throw new ApiException(message.Code, message.Data?.ToStr().Trim('\"') ?? "") { Source = invoker + "/" + action };
try
{
// 解码响应得到SRMP报文
var message = enc.Decode(rs) ?? throw new InvalidOperationException();

if (message.Data == null) return default;
if (resultType == typeof(IPacket)) return (TResult)(Object)message.Data;
// 是否成功
if (message.Code is not ApiCode.Ok and not ApiCode.Ok200)
throw new ApiException(message.Code, message.Data?.ToStr().Trim('\"') ?? "") { Source = invoker + "/" + action };

// 解码结果
var result = enc.DecodeResult(action, message.Data, rs, resultType);
if (result == null) return default;
if (resultType == typeof(Object)) return (TResult)result;
if (message.Data == null) return default;
if (resultType == typeof(IPacket)) return (TResult)(Object)message.Data;
if (resultType == typeof(Packet)) return (TResult)(Object)message.Data;

// 返回
//return (TResult?)enc.Convert(result, resultType);
return (TResult?)result;
try
{
// 解码结果
var result = enc.DecodeResult(action, message.Data, rs, resultType);
if (result == null) return default;
if (resultType == typeof(Object)) return (TResult)result;

// 返回
//return (TResult?)enc.Convert(result, resultType);
return (TResult?)result;
}
finally
{
message.TryDispose();
}
}
finally
{
rs.Payload.TryDispose();
}
}

/// <summary>单向调用,不等待返回</summary>
Expand Down Expand Up @@ -338,10 +355,10 @@ public Int32 InvokeWithClient(WebSocket client, String action, Object? args, Byt
{
//return client.SendMessage(msg);
var codec = GetMessageCodec();
var pk = codec.Write(null, msg) as IPacket;
client.SendAsync(pk.ToSegment(), WebSocketMessageType.Binary, true, default).Wait();
var pk = codec.Write(null!, msg) as IPacket;
client.SendAsync(pk!.ToSegment(), WebSocketMessageType.Binary, true, default).Wait();

return pk.Total;
return pk!.Total;
}
catch (Exception ex)
{
Expand All @@ -352,6 +369,8 @@ public Int32 InvokeWithClient(WebSocket client, String action, Object? args, Byt
}
finally
{
msg.Payload.TryDispose();

var msCost = st.StopCount(sw) / 1000;
if (SlowTrace > 0 && msCost >= SlowTrace) WriteLog($"慢调用[{action}],耗时{msCost:n0}ms");
}
Expand All @@ -371,7 +390,7 @@ private void Client_Received(Object? sender, ReceivedEventArgs e)
// Api解码消息得到Action和参数
if (e.Message is not IMessage msg) return;

var apiMessage = Encoder.Decode(msg);
using var apiMessage = Encoder.Decode(msg);
var e2 = new ApiReceivedEventArgs
{
Remote = sender as ISocketRemote,
Expand Down

0 comments on commit 6b9197c

Please sign in to comment.