Skip to content

Commit

Permalink
improv(ResourceWatcher): unlink the Process Activity from the parent Run
Browse files Browse the repository at this point in the history
Unlinking the Resource Process Activity from the parent Run Activity avoids creating an overly large Span (the whole Run), while still tracking the 'Run' as an Activity with its Dependencies/Steps.
  • Loading branch information
AndreaCuneo committed Nov 22, 2024
1 parent 07c0f47 commit 01cfa90
Show file tree
Hide file tree
Showing 6 changed files with 117 additions and 41 deletions.
27 changes: 27 additions & 0 deletions Ark.Tools.ResourceWatcher.ApplicationInsights/Ex.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Ark.Tools.ApplicationInsights.HostedService;
using System.Threading.Tasks;
using System.Threading;
using Microsoft.ApplicationInsights;

namespace Ark.Tools.ResourceWatcher.ApplicationInsights
{
Expand All @@ -14,7 +17,31 @@ public static IHostBuilder AddApplicationInsightsForWorkerHost(this IHostBuilder
.ConfigureServices((ctx, services) =>
{
services.AddSingleton<ITelemetryModule, ResourceWatcherTelemetryModule>();
services.AddHostedService<StartTelemetryHack>();
});
}

/// <summary>
/// Hosted service that performs no work of its own. It exists only so that the
/// container has to resolve a <see cref="TelemetryClient"/> when the host builds
/// its hosted services; per the original author's note, that resolution is what
/// triggers the ResourceWatcherTelemetryModule initialization and, in turn, the
/// subscription of the Listener.
/// </summary>
private class StartTelemetryHack : IHostedService
{
    // Intentionally never read: holding the reference is the whole point.
#pragma warning disable IDE0052 // Remove unread private members
    private readonly TelemetryClient _client;
#pragma warning restore IDE0052 // Remove unread private members

    /// <summary>
    /// Takes the <see cref="TelemetryClient"/> as a dependency purely to force its creation.
    /// </summary>
    /// <param name="client">The telemetry client supplied by dependency injection.</param>
    public StartTelemetryHack(TelemetryClient client) => _client = client;

    /// <summary>Nothing to start; returns an already-completed task.</summary>
    public Task StartAsync(CancellationToken cancellationToken) => Task.CompletedTask;

    /// <summary>Nothing to stop; returns an already-completed task.</summary>
    public Task StopAsync(CancellationToken cancellationToken) => Task.CompletedTask;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -326,20 +326,19 @@ public override void OnProcessResourceStop(string tenant, ProcessContext process
var currentActivity = Activity.Current;
if (currentActivity == null) return;

var telemetry = new DependencyTelemetry
var telemetry = new RequestTelemetry
{
Id = currentActivity.Id,
Duration = currentActivity.Duration,
Name = currentActivity.OperationName,
Success = exception == null ? true : false,
Timestamp = currentActivity.StartTimeUtc,
Type = _type
};

//Telemetry operation context
telemetry.Context.Operation.Id = currentActivity.RootId;
telemetry.Context.Operation.ParentId = currentActivity.ParentId;

//Properties and metrics
telemetry.Properties.Add("Tenant", tenant);
_propertiesProcessContext(telemetry, processContext);
Expand All @@ -364,7 +363,7 @@ public override void OnProcessResourceStop(string tenant, ProcessContext process
this._client.TrackException(telemetryException);
}

this._client.TrackDependency(telemetry);
this._client.TrackRequest(telemetry);
}

#endregion
Expand Down
99 changes: 69 additions & 30 deletions Ark.Tools.ResourceWatcher/ResourceWatcher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
using NodaTime;
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
Expand Down Expand Up @@ -132,58 +133,96 @@ protected virtual async Task _runOnce(RunType runType, CancellationToken ctk = d
try
{
//GetResources
using var activityResource = _diagnosticSource.GetResourcesStart();
var list = await _getResources(now, ctk);

var infos = await _getResourcesInfo(ctk);
//Check State - check which entries are new or have been modified.
var evaluated = await _evaluateActions(list, ctk);

var bad = infos.GroupBy(x => x.ResourceId).FirstOrDefault(x => x.Count() > 1);
if (bad != null)
_diagnosticSource.ThrowDuplicateResourceIdRetrived(bad.Key);
//Process
var toProcess = evaluated.Where(x => !x.ResultType.HasValue).ToList();
var skipped = evaluated.Count - toProcess.Count;

if (_config.SkipResourcesOlderThanDays.HasValue)
infos = infos
.Where(x => _getEarliestModified(x).Date > LocalDateTime.FromDateTime(now).Date.PlusDays(-(int)_config.SkipResourcesOlderThanDays.Value))
;
_logger.Info("Found {SkippedCount} resources to skip", skipped);
_logger.Info("Found {ToProcessCount} resources to process with parallelism {DegreeOfParallelism}", toProcess.Count, _config.DegreeOfParallelism);

var list = infos.ToList();
_diagnosticSource.GetResourcesSuccessful(activityResource, list.Count);
// Unlink the _processEntry Span from the parent Run so that a single RootId does not grow too large.
// This is mainly a hack for Application Insights, but in general we want to avoid a RootId that collects hundreds of thousands of Spans.
Activity.Current = null;
try
{
var count = toProcess.Count;
await toProcess.Parallel((int)_config.DegreeOfParallelism, (i, x, ct) =>
{
x.Total = count;
x.Index = i + 1;
return _processEntry(x, ct);
}, ctk);
}
finally
{
Activity.Current = activityRun;
}

//Check State - check which entries are new or have been modified.
using var activityCheckState = _diagnosticSource.CheckStateStart();
_diagnosticSource.RunSuccessful(activityRun, evaluated);

if (activityRun.Duration > _config.RunDurationNotificationLimit)
_diagnosticSource.RunTookTooLong(activityRun);
}
catch (Exception ex)
{
_diagnosticSource.RunFailed(activityRun, ex);
throw;
}
}

private async Task<IList<ProcessContext>> _evaluateActions(IList<IResourceMetadata> list, CancellationToken ctk)
{
using var activityCheckState = _diagnosticSource.CheckStateStart();
try
{
var states = _config.IgnoreState ? Enumerable.Empty<ResourceState>() : await _stateProvider.LoadStateAsync(_config.Tenant, list.Select(i => i.ResourceId).ToArray(), ctk);

var evaluated = _createEvalueteList(list, states);

_diagnosticSource.CheckStateSuccessful(activityCheckState, evaluated);

//Process
var skipped = evaluated.Where(x => x.ResultType == ResultType.Skipped).ToList();
var toProcess = evaluated.Where(x => !x.ResultType.HasValue).ToList();
return evaluated;
} catch (Exception ex) {
_diagnosticSource.CheckStateFailed(activityCheckState, ex);
throw;
}
}

_logger.Info("Found {SkippedCount} resources to skip", skipped.Count);
_logger.Info("Found {ToProcessCount} resources to process with parallelism {DegreeOfParallelism}", toProcess.Count, _config.DegreeOfParallelism);
private async Task<IList<IResourceMetadata>> _getResources(DateTime now, CancellationToken ctk)
{
using var activityResource = _diagnosticSource.GetResourcesStart();

var count = toProcess.Count;
await toProcess.Parallel((int)_config.DegreeOfParallelism, (i, x, ct) => {
x.Total = count;
x.Index = i + 1;
return _processEntry(x, ct);
}, ctk);

_diagnosticSource.RunSuccessful(activityRun, evaluated);
try
{
var infos = await _getResourcesInfo(ctk);

if (activityRun.Duration > _config.RunDurationNotificationLimit)
_diagnosticSource.RunTookTooLong(activityRun);
var bad = infos.GroupBy(x => x.ResourceId).FirstOrDefault(x => x.Count() > 1);
if (bad != null)
_diagnosticSource.ThrowDuplicateResourceIdRetrived(bad.Key);

if (_config.SkipResourcesOlderThanDays.HasValue)
infos = infos
.Where(x => _getEarliestModified(x).Date > LocalDateTime.FromDateTime(now).Date.PlusDays(-(int)_config.SkipResourcesOlderThanDays.Value))
;

var list = infos.ToList();

_diagnosticSource.GetResourcesSuccessful(activityResource, list.Count);
return list;
}
catch (Exception ex)
{
_diagnosticSource.RunFailed(activityRun, ex);
_diagnosticSource.GetResourcesFailed(activityResource, ex);
throw;
}
}

private List<ProcessContext> _createEvalueteList(List<IResourceMetadata> list, IEnumerable<ResourceState> states)
private IList<ProcessContext> _createEvalueteList(IList<IResourceMetadata> list, IEnumerable<ResourceState> states)
{
var ev = list.GroupJoin(states, i => i.ResourceId, s => s.ResourceId, (i, s) =>
{
Expand Down
15 changes: 13 additions & 2 deletions Ark.Tools.ResourceWatcher/ResourceWatcherDiagnosticSource.cs
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ public void RunFailed(Activity activity, Exception ex)
_logger.Error(ex, $"Check failed for tenant {_tenant} in {activity.Duration}");
}

public void RunSuccessful(Activity activity, List<ProcessContext> evaluated)
public void RunSuccessful(Activity activity, IList<ProcessContext> evaluated)
{
_stop(activity, () =>
{
Expand Down Expand Up @@ -187,6 +187,17 @@ public void CheckStateSuccessful(Activity activity, IEnumerable<ProcessContext>
}
);
}


/// <summary>
/// Reports a failed check-state step: forwards <paramref name="activity"/> to
/// <c>_stop</c> together with a payload factory carrying the exception and the tenant.
/// </summary>
/// <param name="activity">The activity that was started for the check-state step.</param>
/// <param name="ex">The exception that made the step fail.</param>
public void CheckStateFailed(Activity activity, Exception ex)
{
    // Member order (Exception, Tenant) is kept as-is: it is part of the anonymous type's shape.
    _stop(activity, () => new { Exception = ex, Tenant = _tenant, });
}
#endregion

#region ProcessResource
Expand Down Expand Up @@ -324,7 +335,7 @@ public void ReportRunConsecutiveFailureLimitReached(Exception ex, int count)
}
#endregion

private Activity _start(string operationName, Func<object> getPayload)
private Activity _start(string operationName, Func<object> getPayload, bool unlinkFromParent = false)
{
string activityName = BaseActivityName + "." + operationName;

Expand Down
6 changes: 3 additions & 3 deletions Samples/TestWorker/Host/Test_Host.cs
Original file line number Diff line number Diff line change
Expand Up @@ -98,9 +98,9 @@ private static Host _configureForTest(IConfiguration configuration, Action<Test_
MaxRetries = 2,
};

var rebusCfg = new RebusResourceNotifier_Config(configuration["ConnectionStrings:Test.Rebus"])
{
};
//var rebusCfg = new RebusResourceNotifier_Config(configuration["ConnectionStrings:Test.Rebus"])
//{
//};

configurer?.Invoke(baseCfg);

Expand Down
4 changes: 2 additions & 2 deletions Samples/TestWorker/appsettings.Development.json
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@
}
},
"ConnectionStrings": {
"NLog.Database": "Data Source=(localdb)\\MSSQLLocalDB;Initial Catalog=Logs;Integrated Security=True;Persist Security Info=False;Pooling=True;MultipleActiveResultSets=True;Connect Timeout=60;Encrypt=False;TrustServerCertificate=True",
"Workers.Database": "Data Source=(localdb)\\MSSQLLocalDB;Initial Catalog=WorkerState;Integrated Security=True;Persist Security Info=False;Pooling=True;MultipleActiveResultSets=True;Connect Timeout=60;Encrypt=False;TrustServerCertificate=True"
"NLog.Database": "Data Source=127.0.0.1;User Id=sa;Password=SpecFlowLocalDbPassword85!;Initial Catalog=Logs;Pooling=True;Connect Timeout=60;Encrypt=True;TrustServerCertificate=True",
"Workers.Database": "Data Source=127.0.0.1;User Id=sa;Password=SpecFlowLocalDbPassword85!;Initial Catalog=WorkerState;Pooling=True;Connect Timeout=60;Encrypt=True;TrustServerCertificate=True"
},
"Test": {
"Recipe": "Test"
Expand Down

0 comments on commit 01cfa90

Please sign in to comment.