Skip to content

Commit

Permalink
Implement high-performance logging (#193)
Browse files Browse the repository at this point in the history
* Implement high-performance logging
  • Loading branch information
Jonnern authored Mar 22, 2024
1 parent 108c8fc commit de72fe4
Show file tree
Hide file tree
Showing 11 changed files with 1,293 additions and 112 deletions.
5 changes: 3 additions & 2 deletions CryptoExchange.Net/Clients/BaseSocketClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
using System.Text;
using System.Threading.Tasks;
using CryptoExchange.Net.Interfaces;
using CryptoExchange.Net.Logging.Extensions;
using CryptoExchange.Net.Objects.Sockets;
using Microsoft.Extensions.Logging;

Expand Down Expand Up @@ -63,7 +64,7 @@ public virtual async Task UnsubscribeAsync(UpdateSubscription subscription)
if (subscription == null)
throw new ArgumentNullException(nameof(subscription));

_logger.Log(LogLevel.Information, $"Socket {subscription.SocketId} Unsubscribing subscription " + subscription.Id);
_logger.UnsubscribingSubscription(subscription.SocketId, subscription.Id);
await subscription.CloseAsync().ConfigureAwait(false);
}

Expand All @@ -86,7 +87,7 @@ public virtual async Task UnsubscribeAllAsync()
/// <returns></returns>
public virtual async Task ReconnectAsync()
{
_logger.Log(LogLevel.Information, $"Reconnecting all {CurrentConnections} connections");
_logger.ReconnectingAllConnections(CurrentConnections);
var tasks = new List<Task>();
foreach (var client in ApiClients.OfType<SocketApiClient>())
{
Expand Down
17 changes: 9 additions & 8 deletions CryptoExchange.Net/Clients/RestApiClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
using System.Threading.Tasks;
using CryptoExchange.Net.Converters.JsonNet;
using CryptoExchange.Net.Interfaces;
using CryptoExchange.Net.Logging.Extensions;
using CryptoExchange.Net.Objects;
using CryptoExchange.Net.Objects.Options;
using CryptoExchange.Net.Requests;
Expand Down Expand Up @@ -152,9 +153,9 @@ protected virtual async Task<WebCallResult> SendRequestAsync(

var result = await GetResponseAsync<object>(request.Data, cancellationToken).ConfigureAwait(false);
if (!result)
_logger.Log(LogLevel.Warning, $"[Req {result.RequestId}] {result.ResponseStatusCode} Error received in {result.ResponseTime!.Value.TotalMilliseconds}ms: {result.Error}");
_logger.RestApiErrorReceived(result.RequestId, result.ResponseStatusCode, (long)Math.Floor(result.ResponseTime!.Value.TotalMilliseconds), result.Error?.ToString());
else
_logger.Log(LogLevel.Debug, $"[Req {result.RequestId}] {result.ResponseStatusCode} Response received in {result.ResponseTime!.Value.TotalMilliseconds}ms{(OutputOriginalData ? ": " + result.OriginalData : "")}");
_logger.RestApiResponseReceived(result.RequestId, result.ResponseStatusCode, (long)Math.Floor(result.ResponseTime!.Value.TotalMilliseconds), OutputOriginalData ? result.OriginalData : "[Data only available when OutputOriginal = true]");

if (await ShouldRetryRequestAsync(result, currentTry).ConfigureAwait(false))
continue;
Expand Down Expand Up @@ -204,9 +205,9 @@ protected virtual async Task<WebCallResult<T>> SendRequestAsync<T>(

var result = await GetResponseAsync<T>(request.Data, cancellationToken).ConfigureAwait(false);
if (!result)
_logger.Log(LogLevel.Warning, $"[Req {result.RequestId}] {result.ResponseStatusCode} Error received in {result.ResponseTime!.Value.TotalMilliseconds}ms: {result.Error}");
_logger.RestApiErrorReceived(result.RequestId, result.ResponseStatusCode, (long)Math.Floor(result.ResponseTime!.Value.TotalMilliseconds), result.Error?.ToString());
else
_logger.Log(LogLevel.Debug, $"[Req {result.RequestId}] {result.ResponseStatusCode} Response received in {result.ResponseTime!.Value.TotalMilliseconds}ms{(OutputOriginalData ? ": " + result.OriginalData : "")}");
_logger.RestApiResponseReceived(result.RequestId, result.ResponseStatusCode, (long)Math.Floor(result.ResponseTime!.Value.TotalMilliseconds), OutputOriginalData ? result.OriginalData : "[Data only available when OutputOriginal = true]");

if (await ShouldRetryRequestAsync(result, currentTry).ConfigureAwait(false))
continue;
Expand Down Expand Up @@ -256,7 +257,7 @@ protected virtual async Task<CallResult<IRequest>> PrepareRequestAsync(
var syncTimeResult = await syncTask.ConfigureAwait(false);
if (!syncTimeResult)
{
_logger.Log(LogLevel.Debug, $"[Req {requestId}] Failed to sync time, aborting request: " + syncTimeResult.Error);
_logger.RestApiFailedToSyncTime(requestId, syncTimeResult.Error!.ToString());
return syncTimeResult.As<IRequest>(default);
}
}
Expand All @@ -274,11 +275,11 @@ protected virtual async Task<CallResult<IRequest>> PrepareRequestAsync(

if (signed && AuthenticationProvider == null)
{
_logger.Log(LogLevel.Warning, $"[Req {requestId}] Request {uri.AbsolutePath} failed because no ApiCredentials were provided");
_logger.RestApiNoApiCredentials(requestId, uri.AbsolutePath);
return new CallResult<IRequest>(new NoApiCredentialsError());
}

_logger.Log(LogLevel.Information, $"[Req {requestId}] Creating request for " + uri);
_logger.RestApiCreatingRequest(requestId, uri);
var paramsPosition = parameterPosition ?? ParameterPositions[method];
var request = ConstructRequest(uri, method, parameters?.OrderBy(p => p.Key).ToDictionary(p => p.Key, p => p.Value), signed, paramsPosition, arraySerialization ?? ArraySerialization, requestBodyFormat ?? RequestBodyFormat, requestId, additionalHeaders);

Expand All @@ -291,7 +292,7 @@ protected virtual async Task<CallResult<IRequest>> PrepareRequestAsync(
paramString += " with headers " + string.Join(", ", headers.Select(h => h.Key + $"=[{string.Join(",", h.Value)}]"));

TotalRequestsMade++;
_logger.Log(LogLevel.Trace, $"[Req {requestId}] Sending {method}{(signed ? " signed" : "")} request to {request.Uri}{paramString ?? " "}");
_logger.RestApiSendingRequest(requestId, method, signed ? "signed": "", request.Uri, paramString);
return new CallResult<IRequest>(request);
}

Expand Down
38 changes: 18 additions & 20 deletions CryptoExchange.Net/Clients/SocketApiClient.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using CryptoExchange.Net.Converters.JsonNet;
using CryptoExchange.Net.Interfaces;
using CryptoExchange.Net.Logging.Extensions;
using CryptoExchange.Net.Objects;
using CryptoExchange.Net.Objects.Options;
using CryptoExchange.Net.Objects.Sockets;
Expand All @@ -8,10 +9,7 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Linq;
using System.Net.Sockets;
using System.Net.WebSockets;
using System.Text;
using System.Threading;
Expand Down Expand Up @@ -214,7 +212,7 @@ protected virtual async Task<CallResult<UpdateSubscription>> SubscribeAsync(stri
var success = socketConnection.AddSubscription(subscription);
if (!success)
{
_logger.Log(LogLevel.Trace, $"[Sckt {socketConnection.SocketId}] failed to add subscription, retrying on different connection");
_logger.FailedToAddSubscriptionRetryOnDifferentConnection(socketConnection.SocketId);
continue;
}

Expand Down Expand Up @@ -242,7 +240,7 @@ protected virtual async Task<CallResult<UpdateSubscription>> SubscribeAsync(stri

if (socketConnection.PausedActivity)
{
_logger.Log(LogLevel.Warning, $"[Sckt {socketConnection.SocketId}] has been paused, can't subscribe at this moment");
_logger.HasBeenPausedCantSubscribeAtThisMoment(socketConnection.SocketId);
return new CallResult<UpdateSubscription>(new ServerError("Socket is paused"));
}

Expand All @@ -255,7 +253,7 @@ protected virtual async Task<CallResult<UpdateSubscription>> SubscribeAsync(stri
if (!subResult)
{
waitEvent?.Set();
_logger.Log(LogLevel.Warning, $"[Sckt {socketConnection.SocketId}] failed to subscribe: {subResult.Error}");
_logger.FailedToSubscribe(socketConnection.SocketId, subResult.Error?.ToString());
// If this was a timeout we still need to send an unsubscribe to prevent messages coming in later
var unsubscribe = subResult.Error is CancellationRequestedError;
await socketConnection.CloseAsync(subscription, unsubscribe).ConfigureAwait(false);
Expand All @@ -270,13 +268,13 @@ protected virtual async Task<CallResult<UpdateSubscription>> SubscribeAsync(stri
{
subscription.CancellationTokenRegistration = ct.Register(async () =>
{
_logger.Log(LogLevel.Information, $"[Sckt {socketConnection.SocketId}] Cancellation token set, closing subscription {subscription.Id}");
_logger.CancellationTokenSetClosingSubscription(socketConnection.SocketId, subscription.Id);
await socketConnection.CloseAsync(subscription).ConfigureAwait(false);
}, false);
}

waitEvent?.Set();
_logger.Log(LogLevel.Information, $"[Sckt {socketConnection.SocketId}] subscription {subscription.Id} completed successfully");
_logger.SubscriptionCompletedSuccessfully(socketConnection.SocketId, subscription.Id);
return new CallResult<UpdateSubscription>(new UpdateSubscription(socketConnection, subscription));
}

Expand Down Expand Up @@ -333,7 +331,7 @@ protected virtual async Task<CallResult<T>> QueryAsync<T>(string url, Query<T> q

if (socketConnection.PausedActivity)
{
_logger.Log(LogLevel.Warning, $"[Sckt {socketConnection.SocketId}] has been paused, can't send query at this moment");
_logger.HasBeenPausedCantSendQueryAtThisMoment(socketConnection.SocketId);
return new CallResult<T>(new ServerError("Socket is paused"));
}

Expand Down Expand Up @@ -374,15 +372,15 @@ public virtual async Task<CallResult<bool>> AuthenticateSocketAsync(SocketConnec
if (AuthenticationProvider == null)
return new CallResult<bool>(new NoApiCredentialsError());

_logger.Log(LogLevel.Debug, $"[Sckt {socket.SocketId}] Attempting to authenticate");
_logger.AttemptingToAuthenticate(socket.SocketId);
var authRequest = GetAuthenticationRequest();
if (authRequest != null)
{
var result = await socket.SendAndWaitQueryAsync(authRequest).ConfigureAwait(false);

if (!result)
{
_logger.Log(LogLevel.Warning, $"[Sckt {socket.SocketId}] authentication failed");
_logger.AuthenticationFailed(socket.SocketId);
if (socket.Connected)
await socket.CloseAsync().ConfigureAwait(false);

Expand All @@ -391,7 +389,7 @@ public virtual async Task<CallResult<bool>> AuthenticateSocketAsync(SocketConnec
}
}

_logger.Log(LogLevel.Debug, $"[Sckt {socket.SocketId}] authenticated");
_logger.Authenticated(socket.SocketId);
socket.Authenticated = true;
return new CallResult<bool>(true);
}
Expand Down Expand Up @@ -467,12 +465,12 @@ protected virtual async Task<CallResult<SocketConnection>> GetSocketConnection(s
var connectionAddress = await GetConnectionUrlAsync(address, authenticated).ConfigureAwait(false);
if (!connectionAddress)
{
_logger.Log(LogLevel.Warning, $"Failed to determine connection url: " + connectionAddress.Error);
_logger.FailedToDetermineConnectionUrl(connectionAddress.Error?.ToString());
return connectionAddress.As<SocketConnection>(null);
}

if (connectionAddress.Data != address)
_logger.Log(LogLevel.Debug, $"Connection address set to " + connectionAddress.Data);
_logger.ConnectionAddressSetTo(connectionAddress.Data!);

// Create new socket
var socket = CreateSocket(connectionAddress.Data!);
Expand Down Expand Up @@ -536,7 +534,7 @@ protected virtual WebSocketParameters GetWebSocketParameters(string address)
protected virtual IWebsocket CreateSocket(string address)
{
var socket = SocketFactory.CreateWebsocket(_logger, GetWebSocketParameters(address));
_logger.Log(LogLevel.Debug, $"[Sckt {socket.Id}] created for " + address);
_logger.SocketCreatedForAddress(socket.Id, address);
return socket;
}

Expand All @@ -562,7 +560,7 @@ public virtual async Task<bool> UnsubscribeAsync(int subscriptionId)
if (subscription == null || connection == null)
return false;

_logger.Log(LogLevel.Information, $"[Sckt {connection.SocketId}] unsubscribing subscription " + subscriptionId);
_logger.UnsubscribingSubscription(connection.SocketId, subscriptionId);
await connection.CloseAsync(subscription).ConfigureAwait(false);
return true;
}
Expand All @@ -577,7 +575,7 @@ public virtual async Task UnsubscribeAsync(UpdateSubscription subscription)
if (subscription == null)
throw new ArgumentNullException(nameof(subscription));

_logger.Log(LogLevel.Information, $"[Sckt {subscription.SocketId}] Unsubscribing subscription " + subscription.Id);
_logger.UnsubscribingSubscription(subscription.SocketId, subscription.Id);
await subscription.CloseAsync().ConfigureAwait(false);
}

Expand All @@ -591,7 +589,7 @@ public virtual async Task UnsubscribeAllAsync()
if (sum == 0)
return;

_logger.Log(LogLevel.Information, $"Unsubscribing all {socketConnections.Sum(s => s.Value.UserSubscriptionCount)} subscriptions");
_logger.UnsubscribingAll(socketConnections.Sum(s => s.Value.UserSubscriptionCount));
var tasks = new List<Task>();
{
var socketList = socketConnections.Values;
Expand All @@ -608,7 +606,7 @@ public virtual async Task UnsubscribeAllAsync()
/// <returns></returns>
public virtual async Task ReconnectAsync()
{
_logger.Log(LogLevel.Information, $"Reconnecting all {socketConnections.Count} connections");
_logger.ReconnectingAllConnections(socketConnections.Count);
var tasks = new List<Task>();
{
var socketList = socketConnections.Values;
Expand Down Expand Up @@ -660,7 +658,7 @@ public override void Dispose()
_disposing = true;
if (socketConnections.Sum(s => s.Value.UserSubscriptionCount) > 0)
{
_logger.Log(LogLevel.Debug, "Disposing socket client, closing all subscriptions");
_logger.DisposingSocketClient();
_ = UnsubscribeAllAsync();
}
semaphoreSlim?.Dispose();
Expand Down
Loading

0 comments on commit de72fe4

Please sign in to comment.