Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Publish job telemetry to run-service. #3545

Merged
merged 2 commits into from
Nov 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion src/Runner.Common/RunServer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ Task CompleteJobAsync(
IList<StepResult> stepResults,
IList<Annotation> jobAnnotations,
string environmentUrl,
IList<Telemetry> telemetry,
CancellationToken token);

Task<RenewJobResponse> RenewJobAsync(Guid planId, Guid jobId, CancellationToken token);
Expand Down Expand Up @@ -76,11 +77,12 @@ public Task CompleteJobAsync(
IList<StepResult> stepResults,
IList<Annotation> jobAnnotations,
string environmentUrl,
IList<Telemetry> telemetry,
CancellationToken cancellationToken)
{
CheckConnection();
return RetryRequest(
async () => await _runServiceHttpClient.CompleteJobAsync(requestUri, planId, jobId, result, outputs, stepResults, jobAnnotations, environmentUrl, cancellationToken), cancellationToken);
async () => await _runServiceHttpClient.CompleteJobAsync(requestUri, planId, jobId, result, outputs, stepResults, jobAnnotations, environmentUrl, telemetry, cancellationToken), cancellationToken);
}

public Task<RenewJobResponse> RenewJobAsync(Guid planId, Guid jobId, CancellationToken cancellationToken)
Expand Down
2 changes: 1 addition & 1 deletion src/Runner.Listener/JobDispatcher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1198,7 +1198,7 @@ private async Task ForceFailJob(IRunServer runServer, Pipelines.AgentJobRequestM
jobAnnotations.Add(annotation.Value);
}

await runServer.CompleteJobAsync(message.Plan.PlanId, message.JobId, TaskResult.Failed, outputs: null, stepResults: null, jobAnnotations: jobAnnotations, environmentUrl: null, CancellationToken.None);
await runServer.CompleteJobAsync(message.Plan.PlanId, message.JobId, TaskResult.Failed, outputs: null, stepResults: null, jobAnnotations: jobAnnotations, environmentUrl: null, telemetry: null, CancellationToken.None);
}
catch (Exception ex)
{
Expand Down
110 changes: 64 additions & 46 deletions src/Runner.Worker/JobRunner.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
using GitHub.Runner.Sdk;
using GitHub.Services.Common;
using GitHub.Services.WebApi;
using Sdk.RSWebApi.Contracts;
using Pipelines = GitHub.DistributedTask.Pipelines;

namespace GitHub.Runner.Worker
Expand Down Expand Up @@ -279,7 +280,12 @@ private async Task<TaskResult> CompleteJobAsync(IRunServer runServer, IExecution
jobContext.Debug($"Finishing: {message.JobDisplayName}");
TaskResult result = jobContext.Complete(taskResult);

await ShutdownQueue(throwOnFailure: false);
var jobQueueTelemetry = await ShutdownQueue(throwOnFailure: false);
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

match the behavior for sending job complete to actions service.

// include any job telemetry from the background upload process.
if (jobQueueTelemetry?.Count > 0)
{
jobContext.Global.JobTelemetry.AddRange(jobQueueTelemetry);
}

// Make sure to clean temp after file upload since they may be pending fileupload still use the TEMP dir.
_tempDirectoryManager?.CleanupTempDirectory();
Expand All @@ -297,14 +303,21 @@ private async Task<TaskResult> CompleteJobAsync(IRunServer runServer, IExecution
environmentUrl = urlStringToken.Value;
}

// Get telemetry
IList<Telemetry> telemetry = null;
if (jobContext.Global.JobTelemetry.Count > 0)
{
telemetry = jobContext.Global.JobTelemetry.Select(x => new Telemetry { Type = x.Type.ToString(), Message = x.Message, }).ToList();
}

Trace.Info($"Raising job completed against run service");
var completeJobRetryLimit = 5;
var exceptions = new List<Exception>();
while (completeJobRetryLimit-- > 0)
{
try
{
await runServer.CompleteJobAsync(message.Plan.PlanId, message.JobId, result, jobContext.JobOutputs, jobContext.Global.StepsResult, jobContext.Global.JobAnnotations, environmentUrl, default);
await runServer.CompleteJobAsync(message.Plan.PlanId, message.JobId, result, jobContext.JobOutputs, jobContext.Global.StepsResult, jobContext.Global.JobAnnotations, environmentUrl, telemetry, default);
return result;
}
catch (Exception ex)
Expand All @@ -329,56 +342,14 @@ private async Task<TaskResult> CompleteJobAsync(IJobServer jobServer, IExecution

if (_runnerSettings.DisableUpdate == true)
{
try
{
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i moved this to its own method in the bottom of the file, so i can easily diff the different between send job completion to run-service vs. actions-service.

var currentVersion = new PackageVersion(BuildConstants.RunnerPackage.Version);
ServiceEndpoint systemConnection = message.Resources.Endpoints.Single(x => string.Equals(x.Name, WellKnownServiceEndpointNames.SystemVssConnection, StringComparison.OrdinalIgnoreCase));
VssCredentials serverCredential = VssUtil.GetVssCredential(systemConnection);

var runnerServer = HostContext.GetService<IRunnerServer>();
await runnerServer.ConnectAsync(systemConnection.Url, serverCredential);
var serverPackages = await runnerServer.GetPackagesAsync("agent", BuildConstants.RunnerPackage.PackageName, 5, includeToken: false, cancellationToken: CancellationToken.None);
if (serverPackages.Count > 0)
{
serverPackages = serverPackages.OrderByDescending(x => x.Version).ToList();
Trace.Info($"Newer packages {StringUtil.ConvertToJson(serverPackages.Select(x => x.Version.ToString()))}");

var warnOnFailedJob = false; // any minor/patch version behind.
var warnOnOldRunnerVersion = false; // >= 2 minor version behind
if (serverPackages.Any(x => x.Version.CompareTo(currentVersion) > 0))
{
Trace.Info($"Current runner version {currentVersion} is behind the latest runner version {serverPackages[0].Version}.");
warnOnFailedJob = true;
}

if (serverPackages.Where(x => x.Version.Major == currentVersion.Major && x.Version.Minor > currentVersion.Minor).Count() > 1)
{
Trace.Info($"Current runner version {currentVersion} is way behind the latest runner version {serverPackages[0].Version}.");
warnOnOldRunnerVersion = true;
}

if (result == TaskResult.Failed && warnOnFailedJob)
{
jobContext.Warning($"This job failure may be caused by using an out of date self-hosted runner. You are currently using runner version {currentVersion}. Please update to the latest version {serverPackages[0].Version}");
}
else if (warnOnOldRunnerVersion)
{
jobContext.Warning($"This self-hosted runner is currently using runner version {currentVersion}. This version is out of date. Please update to the latest version {serverPackages[0].Version}");
}
}
}
catch (Exception ex)
{
// Ignore any error since suggest runner update is best effort.
Trace.Error($"Caught exception during runner version check: {ex}");
}
await WarningOutdatedRunnerAsync(jobContext, message, result);
}

try
{
var jobQueueTelemetry = await ShutdownQueue(throwOnFailure: true);
// include any job telemetry from the background upload process.
if (jobQueueTelemetry.Count > 0)
if (jobQueueTelemetry?.Count > 0)
{
jobContext.Global.JobTelemetry.AddRange(jobQueueTelemetry);
}
Expand Down Expand Up @@ -506,5 +477,52 @@ private async Task<IList<JobTelemetry>> ShutdownQueue(bool throwOnFailure)

return Array.Empty<JobTelemetry>();
}

private async Task WarningOutdatedRunnerAsync(IExecutionContext jobContext, Pipelines.AgentJobRequestMessage message, TaskResult result)
{
try
{
var currentVersion = new PackageVersion(BuildConstants.RunnerPackage.Version);
ServiceEndpoint systemConnection = message.Resources.Endpoints.Single(x => string.Equals(x.Name, WellKnownServiceEndpointNames.SystemVssConnection, StringComparison.OrdinalIgnoreCase));
VssCredentials serverCredential = VssUtil.GetVssCredential(systemConnection);

var runnerServer = HostContext.GetService<IRunnerServer>();
await runnerServer.ConnectAsync(systemConnection.Url, serverCredential);
var serverPackages = await runnerServer.GetPackagesAsync("agent", BuildConstants.RunnerPackage.PackageName, 5, includeToken: false, cancellationToken: CancellationToken.None);
if (serverPackages.Count > 0)
{
serverPackages = serverPackages.OrderByDescending(x => x.Version).ToList();
Trace.Info($"Newer packages {StringUtil.ConvertToJson(serverPackages.Select(x => x.Version.ToString()))}");

var warnOnFailedJob = false; // any minor/patch version behind.
var warnOnOldRunnerVersion = false; // >= 2 minor version behind
if (serverPackages.Any(x => x.Version.CompareTo(currentVersion) > 0))
{
Trace.Info($"Current runner version {currentVersion} is behind the latest runner version {serverPackages[0].Version}.");
warnOnFailedJob = true;
}

if (serverPackages.Where(x => x.Version.Major == currentVersion.Major && x.Version.Minor > currentVersion.Minor).Count() > 1)
{
Trace.Info($"Current runner version {currentVersion} is way behind the latest runner version {serverPackages[0].Version}.");
warnOnOldRunnerVersion = true;
}

if (result == TaskResult.Failed && warnOnFailedJob)
{
jobContext.Warning($"This job failure may be caused by using an out of date self-hosted runner. You are currently using runner version {currentVersion}. Please update to the latest version {serverPackages[0].Version}");
}
else if (warnOnOldRunnerVersion)
{
jobContext.Warning($"This self-hosted runner is currently using runner version {currentVersion}. This version is out of date. Please update to the latest version {serverPackages[0].Version}");
}
}
}
catch (Exception ex)
{
// Ignore any error since suggest runner update is best effort.
Trace.Error($"Caught exception during runner version check: {ex}");
}
}
}
}
3 changes: 3 additions & 0 deletions src/Sdk/RSWebApi/Contracts/CompleteJobRequest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ public class CompleteJobRequest
[DataMember(Name = "annotations", EmitDefaultValue = false)]
public IList<Annotation> Annotations { get; set; }

[DataMember(Name = "telemetry", EmitDefaultValue = false)]
public IList<Telemetry> Telemetry { get; set; }

[DataMember(Name = "environmentUrl", EmitDefaultValue = false)]
public string EnvironmentUrl { get; set; }
}
Expand Down
20 changes: 20 additions & 0 deletions src/Sdk/RSWebApi/Contracts/Telemetry.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
using System.Runtime.Serialization;

namespace Sdk.RSWebApi.Contracts
{
[DataContract]
public struct Telemetry
{
public Telemetry(string message, string type)
{
Message = message;
Type = type;
}

[DataMember(Name = "message", EmitDefaultValue = false)]
public string Message { get; set; }

[DataMember(Name = "type", EmitDefaultValue = false)]
public string Type { get; set; }
}
}
3 changes: 3 additions & 0 deletions src/Sdk/RSWebApi/RunServiceHttpClient.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Net;
using System.Net.Http;
using System.Threading;
Expand Down Expand Up @@ -126,6 +127,7 @@ public async Task CompleteJobAsync(
IList<StepResult> stepResults,
IList<Annotation> jobAnnotations,
string environmentUrl,
IList<Telemetry> telemetry,
CancellationToken cancellationToken = default)
{
HttpMethod httpMethod = new HttpMethod("POST");
Expand All @@ -138,6 +140,7 @@ public async Task CompleteJobAsync(
StepResults = stepResults,
Annotations = jobAnnotations,
EnvironmentUrl = environmentUrl,
Telemetry = telemetry,
};

requestUri = new Uri(requestUri, "completejob");
Expand Down
Loading