Skip to content

Commit

Permalink
refactor: handle jobs while stream is open
Browse files Browse the repository at this point in the history
Refactor job worker such that it uses the JobActivator and the request directly. This makes it possible to register an own consumer which will be called if a new response was incoming. This improved the performance of the job handling. Make the consumer async, which allows to be used in the job worker with the data pipeline. Add a task in the ActivateJobsCommand, which just collects all responses.

Remove the retry from the activation, since the activation is retried in
the worker after the poll interval.
  • Loading branch information
ChrisKujawa committed Aug 20, 2021
1 parent bdcba9b commit 2ee7323
Show file tree
Hide file tree
Showing 5 changed files with 38 additions and 30 deletions.
2 changes: 1 addition & 1 deletion Client/Impl/Commands/ActivateJobsCommand.cs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ public IActivateJobsCommandStep3 WorkerName(string workerName)
public async Task<IActivateJobsResponse> Send(TimeSpan? timeout = null, CancellationToken token = default)
{
var activateJobsResponses = new Responses.ActivateJobsResponses();
await activator.SendActivateRequest(Request, response => activateJobsResponses.Add(response), timeout?.FromUtcNow(), token);
await activator.SendActivateRequest(Request, response => Task.Run(() => activateJobsResponses.Add(response), token), timeout?.FromUtcNow(), token);
return activateJobsResponses;
}

Expand Down
8 changes: 4 additions & 4 deletions Client/Impl/Commands/JobActivator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

namespace Zeebe.Client.Impl.Commands
{
public delegate void ConsumeJob(IActivateJobsResponse response);
public delegate Task ConsumeJob(IActivateJobsResponse response);
internal class JobActivator
{
private readonly GatewayClient client;
Expand All @@ -21,7 +21,7 @@ public JobActivator(GatewayClient client)

public async Task SendActivateRequest(ActivateJobsRequest request, ConsumeJob consumer, DateTime? requestTimeout = null, CancellationToken? cancellationToken = null)
{
DateTime activateRequestTimeout = requestTimeout ?? CalculateRequestTimeout(request);
var activateRequestTimeout = requestTimeout ?? CalculateRequestTimeout(request);
using (var stream = client.ActivateJobs(request, deadline: activateRequestTimeout))
{
var responseStream = stream.ResponseStream;
Expand All @@ -30,7 +30,7 @@ public async Task SendActivateRequest(ActivateJobsRequest request, ConsumeJob co
{
var currentResponse = responseStream.Current;
var response = new ActivateJobsResponses(currentResponse);
consumer.Invoke(response);
await consumer.Invoke(response);
}
}
}
Expand All @@ -44,7 +44,7 @@ private static DateTime CalculateRequestTimeout(ActivateJobsRequest request)
: TimeSpan.FromSeconds((longPollingTimeout / 1000f) + 10).FromUtcNow();
}

private async Task<bool> MoveNext(IAsyncStreamReader<ActivateJobsResponse> stream, CancellationToken? cancellationToken = null)
private static async Task<bool> MoveNext(IAsyncStreamReader<ActivateJobsResponse> stream, CancellationToken? cancellationToken = null)
{
if (cancellationToken.HasValue)
{
Expand Down
27 changes: 17 additions & 10 deletions Client/Impl/Worker/JobWorker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
using GatewayProtocol;
using Grpc.Core;
using Microsoft.Extensions.Logging;
using Zeebe.Client.Api.Misc;
using Zeebe.Client.Api.Responses;
using Zeebe.Client.Api.Worker;
using Zeebe.Client.Impl.Commands;
Expand All @@ -34,7 +36,8 @@ public class JobWorker : IJobWorker
private readonly CancellationTokenSource source;
private readonly ILogger<JobWorker> logger;
private readonly JobWorkerBuilder jobWorkerBuilder;
private readonly ActivateJobsCommand activateJobsCommand;
private readonly ActivateJobsRequest activateJobsRequest;
private readonly JobActivator jobActivator;
private readonly int maxJobsActive;
private readonly AsyncJobHandler jobHandler;
private readonly bool autoCompletion;
Expand All @@ -52,8 +55,9 @@ internal JobWorker(JobWorkerBuilder builder)
this.jobHandler = jobWorkerBuilder.Handler();
this.autoCompletion = builder.AutoCompletionEnabled();
this.pollInterval = jobWorkerBuilder.PollInterval();
this.activateJobsCommand = jobWorkerBuilder.Command;
this.maxJobsActive = jobWorkerBuilder.Command.Request.MaxJobsToActivate;
this.activateJobsRequest = jobWorkerBuilder.Request;
jobActivator = jobWorkerBuilder.Activator;
this.maxJobsActive = jobWorkerBuilder.Request.MaxJobsToActivate;
this.thresholdJobsActivation = maxJobsActive * 0.6;
}

Expand Down Expand Up @@ -114,8 +118,8 @@ internal void Open()

logger?.LogDebug(
"Job worker ({worker}) for job type {type} has been opened.",
activateJobsCommand.Request.Worker,
activateJobsCommand.Request.Type);
activateJobsRequest.Worker,
activateJobsRequest.Type);
}

private ExecutionDataflowBlockOptions CreateExecutionOptions(CancellationToken cancellationToken)
Expand Down Expand Up @@ -145,12 +149,14 @@ private async Task PollJobs(ITargetBlock<IJob> input, CancellationToken cancella
if (currentJobs < thresholdJobsActivation)
{
var jobCount = maxJobsActive - currentJobs;
activateJobsCommand.MaxJobsToActivate(jobCount);
activateJobsRequest.MaxJobsToActivate = jobCount;

try
{
var response = await activateJobsCommand.SendWithRetry(null, cancellationToken);
await HandleActivationResponse(input, response, jobCount);
await jobActivator.SendActivateRequest(activateJobsRequest,
async jobsResponse => await HandleActivationResponse(input, jobsResponse, jobCount),
null,
cancellationToken);
}
catch (RpcException rpcException)
{
Expand All @@ -168,7 +174,7 @@ private async Task HandleActivationResponse(ITargetBlock<IJob> input, IActivateJ
{
logger?.LogDebug(
"Job worker ({worker}) activated {activatedCount} of {requestCount} successfully.",
activateJobsCommand.Request.Worker,
activateJobsRequest.Worker,
response.Jobs.Count,
jobCount);

Expand Down Expand Up @@ -207,6 +213,7 @@ private void LogRpcException(RpcException rpcException)
{
case StatusCode.DeadlineExceeded:
case StatusCode.Cancelled:
case StatusCode.ResourceExhausted:
logLevel = LogLevel.Trace;
break;
default:
Expand All @@ -224,7 +231,7 @@ private async Task TryToAutoCompleteJob(JobClientWrapper jobClient, IJob activat
{
logger?.LogDebug(
"Job worker ({worker}) will auto complete job with key '{key}'",
activateJobsCommand.Request.Worker,
activateJobsRequest.Worker,
activatedJob.Key);
await jobClient.NewCompleteJobCommand(activatedJob)
.Send(cancellationToken);
Expand Down
29 changes: 15 additions & 14 deletions Client/Impl/Worker/JobWorkerBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,10 @@

using System;
using System.Collections.Generic;
using System.Diagnostics.Contracts;
using System.Threading.Tasks;
using GatewayProtocol;
using Microsoft.Extensions.Logging;
using Zeebe.Client.Api.Misc;
using Zeebe.Client.Api.Worker;
using Zeebe.Client.Impl.Commands;

Expand All @@ -28,26 +29,26 @@ public class JobWorkerBuilder : IJobWorkerBuilderStep1, IJobWorkerBuilderStep2,
private TimeSpan pollInterval;
private AsyncJobHandler asyncJobHandler;
private bool autoCompletion;
internal JobActivator Activator { get; }
internal ActivateJobsRequest Request { get; }
internal byte ThreadCount { get; set; }
internal ILoggerFactory LoggerFactory { get; }
internal ActivateJobsCommand Command { get; }
internal IJobClient JobClient { get; }

public JobWorkerBuilder(
IZeebeClient zeebeClient,
public JobWorkerBuilder(IZeebeClient zeebeClient,
Gateway.GatewayClient gatewayClient,
ILoggerFactory loggerFactory = null)
{
LoggerFactory = loggerFactory;
Command = (ActivateJobsCommand) zeebeClient.NewActivateJobsCommand();
Activator = new JobActivator(gatewayClient);
Request = new ActivateJobsRequest();
JobClient = zeebeClient;
ThreadCount = 1;

zeebeClient.NewActivateJobsCommand();
}

public IJobWorkerBuilderStep2 JobType(string type)
{
Command.JobType(type);
Request.Type = type;
return this;
}

Expand All @@ -70,31 +71,31 @@ internal AsyncJobHandler Handler()

public IJobWorkerBuilderStep3 Timeout(TimeSpan timeout)
{
Command.Timeout(timeout);
Request.Timeout = (long) timeout.TotalMilliseconds;
return this;
}

public IJobWorkerBuilderStep3 Name(string workerName)
{
Command.WorkerName(workerName);
Request.Worker = workerName;
return this;
}

public IJobWorkerBuilderStep3 MaxJobsActive(int maxJobsActive)
{
Command.MaxJobsToActivate(maxJobsActive);
Request.MaxJobsToActivate = maxJobsActive;
return this;
}

public IJobWorkerBuilderStep3 FetchVariables(IList<string> fetchVariables)
{
Command.FetchVariables(fetchVariables);
Request.FetchVariable.AddRange(fetchVariables);
return this;
}

public IJobWorkerBuilderStep3 FetchVariables(params string[] fetchVariables)
{
Command.FetchVariables(fetchVariables);
Request.FetchVariable.AddRange(fetchVariables);
return this;
}

Expand All @@ -111,7 +112,7 @@ internal TimeSpan PollInterval()

public IJobWorkerBuilderStep3 PollingTimeout(TimeSpan pollingTimeout)
{
Command.PollingTimeout(pollingTimeout);
Request.RequestTimeout = (long) pollingTimeout.TotalMilliseconds;
return this;
}

Expand Down
2 changes: 1 addition & 1 deletion Client/ZeebeClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ private void AddKeepAliveToChannelOptions(List<ChannelOption> channelOptions, Ti

public IJobWorkerBuilderStep1 NewWorker()
{
return new JobWorkerBuilder(this, loggerFactory);
return new JobWorkerBuilder(this, gatewayClient, loggerFactory);
}

public IActivateJobsCommandStep1 NewActivateJobsCommand()
Expand Down

0 comments on commit 2ee7323

Please sign in to comment.