Skip to content

Commit

Permalink
Merge pull request #73432 from dotnet/revert-73287-TelemetryFlushUsin…
Browse files Browse the repository at this point in the history
…gAsyncBatchingWorkQueue

Revert "Move AsyncBatchingWorkQueue usage in telemetry to TelemetryLogging level"
  • Loading branch information
arunchndr authored May 10, 2024
2 parents 18614bc + 4bbcd1d commit 324e078
Show file tree
Hide file tree
Showing 5 changed files with 47 additions and 54 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,6 @@ public RequestTelemetryLogger(string serverTypeName)
_requestCounters = new();
_findDocumentResults = new();
_usedForkedSolutionCounter = new();

TelemetryLogging.Flushed += OnFlushed;
}

public void UpdateFindDocumentTelemetryData(bool success, string? workspaceKind)
Expand Down Expand Up @@ -94,14 +92,6 @@ public void Dispose()
return;
}

// Flush all telemetry logged through TelemetryLogging
TelemetryLogging.Flush();

TelemetryLogging.Flushed -= OnFlushed;
}

private void OnFlushed(object? sender, EventArgs e)
{
foreach (var kvp in _requestCounters)
{
TelemetryLogging.Log(FunctionId.LSP_RequestCounter, KeyValueLogMessage.Create(LogType.Trace, m =>
Expand Down Expand Up @@ -134,6 +124,9 @@ private void OnFlushed(object? sender, EventArgs e)
}
}));

// Flush all telemetry logged through TelemetryLogging
TelemetryLogging.Flush();

_requestCounters.Clear();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ internal sealed class AggregatingTelemetryLog : ITelemetryLog
private readonly HistogramConfiguration? _histogramConfiguration;
private readonly string _eventName;
private readonly FunctionId _functionId;
private readonly AggregatingTelemetryLogManager _aggregatingTelemetryLogManager;
private readonly object _flushLock;

private ImmutableDictionary<string, (IHistogram<long> Histogram, TelemetryEvent TelemetryEvent, object Lock)> _histograms = ImmutableDictionary<string, (IHistogram<long>, TelemetryEvent, object)>.Empty;
Expand All @@ -39,7 +40,7 @@ internal sealed class AggregatingTelemetryLog : ITelemetryLog
/// <param name="functionId">Used to derive meter name</param>
/// <param name="bucketBoundaries">Optional values indicating bucket boundaries in milliseconds. If not specified,
/// all histograms created will use the default histogram configuration</param>
public AggregatingTelemetryLog(TelemetrySession session, FunctionId functionId, double[]? bucketBoundaries)
public AggregatingTelemetryLog(TelemetrySession session, FunctionId functionId, double[]? bucketBoundaries, AggregatingTelemetryLogManager aggregatingTelemetryLogManager)
{
var meterName = TelemetryLogger.GetPropertyName(functionId, "meter");
var meterProvider = new VSTelemetryMeterProvider();
Expand All @@ -48,6 +49,7 @@ public AggregatingTelemetryLog(TelemetrySession session, FunctionId functionId,
_meter = meterProvider.CreateMeter(meterName, version: MeterVersion);
_eventName = TelemetryLogger.GetEventName(functionId);
_functionId = functionId;
_aggregatingTelemetryLogManager = aggregatingTelemetryLogManager;
_flushLock = new();

if (bucketBoundaries != null)
Expand Down Expand Up @@ -102,6 +104,8 @@ public void Log(KeyValueLogMessage logMessage)
{
histogram.Record(value);
}

_aggregatingTelemetryLogManager.EnsureTelemetryWorkQueued();
}

public IDisposable? LogBlockTime(KeyValueLogMessage logMessage, int minThresholdMs)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,36 +2,62 @@
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.

using System;
using System.Collections.Immutable;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.CodeAnalysis.Internal.Log;
using Microsoft.CodeAnalysis.Shared.TestHooks;
using Microsoft.VisualStudio.Telemetry;
using Roslyn.Utilities;

namespace Microsoft.CodeAnalysis.Telemetry;

/// <summary>
/// Manages creation and obtaining aggregated telemetry logs.
/// Manages creation and obtaining aggregated telemetry logs. Also, notifies logs to
/// send aggregated events every 30 minutes.
/// </summary>
internal sealed class AggregatingTelemetryLogManager
{
private static readonly TimeSpan s_batchedTelemetryCollectionPeriod = TimeSpan.FromMinutes(30);

private readonly TelemetrySession _session;
private readonly AsyncBatchingWorkQueue _postTelemetryQueue;

private ImmutableDictionary<FunctionId, AggregatingTelemetryLog> _aggregatingLogs = ImmutableDictionary<FunctionId, AggregatingTelemetryLog>.Empty;

public AggregatingTelemetryLogManager(TelemetrySession session)
public AggregatingTelemetryLogManager(TelemetrySession session, IAsynchronousOperationListener asyncListener)
{
_session = session;

_postTelemetryQueue = new AsyncBatchingWorkQueue(
s_batchedTelemetryCollectionPeriod,
PostCollectedTelemetryAsync,
asyncListener,
CancellationToken.None);
}

public ITelemetryLog? GetLog(FunctionId functionId, double[]? bucketBoundaries)
{
if (!_session.IsOptedIn)
return null;

return ImmutableInterlocked.GetOrAdd(
ref _aggregatingLogs,
functionId,
static (functionId, arg) => new AggregatingTelemetryLog(arg._session, functionId, arg.bucketBoundaries),
factoryArgument: (_session, bucketBoundaries));
return ImmutableInterlocked.GetOrAdd(ref _aggregatingLogs, functionId, functionId => new AggregatingTelemetryLog(_session, functionId, bucketBoundaries, this));
}

public void EnsureTelemetryWorkQueued()
{
// Ensure PostCollectedTelemetryAsync will get fired after the collection period.
_postTelemetryQueue.AddWork();
}

private ValueTask PostCollectedTelemetryAsync(CancellationToken token)
{
token.ThrowIfCancellationRequested();

Flush();

return ValueTaskFactory.CompletedTask;
}

public void Flush()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,17 @@ internal sealed class TelemetryLogProvider : ITelemetryLogProvider
private readonly AggregatingTelemetryLogManager _aggregatingTelemetryLogManager;
private readonly VisualStudioTelemetryLogManager _visualStudioTelemetryLogManager;

private TelemetryLogProvider(TelemetrySession session, ILogger telemetryLogger)
private TelemetryLogProvider(TelemetrySession session, ILogger telemetryLogger, IAsynchronousOperationListener asyncListener)
{
_aggregatingTelemetryLogManager = new AggregatingTelemetryLogManager(session);
_aggregatingTelemetryLogManager = new AggregatingTelemetryLogManager(session, asyncListener);
_visualStudioTelemetryLogManager = new VisualStudioTelemetryLogManager(session, telemetryLogger);
}

public static TelemetryLogProvider Create(TelemetrySession session, ILogger telemetryLogger, IAsynchronousOperationListener asyncListener)
{
var logProvider = new TelemetryLogProvider(session, telemetryLogger);
var logProvider = new TelemetryLogProvider(session, telemetryLogger, asyncListener);

TelemetryLogging.SetLogProvider(logProvider, asyncListener);
TelemetryLogging.SetLogProvider(logProvider);

return logProvider;
}
Expand Down
34 changes: 2 additions & 32 deletions src/Workspaces/Core/Portable/Telemetry/TelemetryLogging.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,44 +3,26 @@
// See the LICENSE file in the project root for more information.

using System;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.CodeAnalysis.Internal.Log;
using Microsoft.CodeAnalysis.Shared.TestHooks;
using Roslyn.Utilities;

namespace Microsoft.CodeAnalysis.Telemetry;

/// <summary>
/// Provides access to posting telemetry events or adding information
/// to aggregated telemetry events. Posts pending telemetry at 30
/// minute intervals.
/// to aggregated telemetry events.
/// </summary>
internal static class TelemetryLogging
{
private static ITelemetryLogProvider? s_logProvider;
private static AsyncBatchingWorkQueue? s_postTelemetryQueue;

public const string KeyName = "Name";
public const string KeyValue = "Value";
public const string KeyLanguageName = "LanguageName";
public const string KeyMetricName = "MetricName";

public static event EventHandler<EventArgs>? Flushed;

public static void SetLogProvider(ITelemetryLogProvider logProvider, IAsynchronousOperationListener asyncListener)
public static void SetLogProvider(ITelemetryLogProvider logProvider)
{
s_logProvider = logProvider;

InterlockedOperations.Initialize(ref s_postTelemetryQueue, () =>
new AsyncBatchingWorkQueue(
TimeSpan.FromMinutes(30),
PostCollectedTelemetryAsync,
asyncListener,
CancellationToken.None));

// Add the initial item to the queue to ensure later processing.
s_postTelemetryQueue?.AddWork();
}

/// <summary>
Expand Down Expand Up @@ -130,17 +112,5 @@ public static void LogAggregated(FunctionId functionId, KeyValueLogMessage logMe
public static void Flush()
{
s_logProvider?.Flush();

Flushed?.Invoke(null, EventArgs.Empty);
}

private static ValueTask PostCollectedTelemetryAsync(CancellationToken cancellationToken)
{
Flush();

// Ensure PostCollectedTelemetryAsync will get fired again after the collection period.
s_postTelemetryQueue?.AddWork();

return ValueTaskFactory.CompletedTask;
}
}

0 comments on commit 324e078

Please sign in to comment.