Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor send api #265

Merged
merged 3 commits into from
Apr 29, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 5 additions & 13 deletions Client/Api/Commands/IFinalCommandStep.cs
Original file line number Diff line number Diff line change
Expand Up @@ -38,23 +38,15 @@ public interface IFinalCommandStep<T>
/// </example>
/// </summary>
/// <param name="timeout">the time span after request should be timed out.</param>
/// <param name="token">the token that manages the cancellation of the request.</param>
/// <returns>a task tracking state of success/failure of the command.</returns>
Task<T> Send(TimeSpan? timeout = null);
Task<T> Send(TimeSpan? timeout = null, CancellationToken token = default);

/// <summary>
/// Sends the command to the Zeebe broker. This operation is asynchronous. In case of success, the
/// task returns the event that was generated by the Zeebe broker in response to the command.
///
/// <para>Use <c>await ...Send(CancellationToken.None);</c> to wait until the response is available.</para>
///
/// <example>
/// <code>
/// T response = await command.Send(CancellationToken.None);
/// </code>
/// </example>
/// Convenient method, which delegates to <c>Send(TimeSpan? timeout = null, CancellationToken token = default)</c>.
/// </summary>
/// <param name="token">the token that manages the cancellation of the request.</param>
/// <param name="cancellationToken">the token that manages the cancellation of the request.</param>
/// <returns>a task tracking state of success/failure of the command.</returns>
Task<T> Send(CancellationToken token);
Task<T> Send(CancellationToken cancellationToken);
}
}
4 changes: 3 additions & 1 deletion Client/Api/Commands/IFinalCommandWithRetryStep.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System;
using System.Threading;
using System.Threading.Tasks;

namespace Zeebe.Client.Api.Commands
Expand All @@ -23,7 +24,8 @@ public interface IFinalCommandWithRetryStep<T> : IFinalCommandStep<T>
/// </summary>
///
/// <param name="timeout">the time span after request should be timed out</param>
/// <param name="token">the token that manages the cancellation of the request.</param>
/// <returns>a task tracking state of success/failure of the command.</returns>
Task<T> SendWithRetry(TimeSpan? timeout = null);
Task<T> SendWithRetry(TimeSpan? timeout = null, CancellationToken token = default);
}
}
20 changes: 5 additions & 15 deletions Client/Impl/Commands/ActivateJobsCommand.cs
Original file line number Diff line number Diff line change
Expand Up @@ -65,27 +65,17 @@ public IActivateJobsCommandStep3 WorkerName(string workerName)
return this;
}

public async Task<IActivateJobsResponse> Send(TimeSpan? timeout = null)
public async Task<IActivateJobsResponse> Send(TimeSpan? timeout = null, CancellationToken token = default)
{
return await Send(timeout, null);
return await activator.SendActivateRequest(Request, timeout?.FromUtcNow(), token);
}

public Task<IActivateJobsResponse> Send(CancellationToken token)
public async Task<IActivateJobsResponse> Send(CancellationToken cancellationToken)
{
return Send(null, token);
return await Send(token: cancellationToken);
}

public async Task<IActivateJobsResponse> Send(TimeSpan? timeout, CancellationToken? cancellationToken)
{
return await activator.SendActivateRequest(Request, timeout?.FromUtcNow(), cancellationToken);
}

public async Task<IActivateJobsResponse> SendWithRetry(TimeSpan? timespan)
{
return await asyncRetryStrategy.DoWithRetry(() => Send(timespan));
}

public async Task<IActivateJobsResponse> SendWithRetry(TimeSpan? timespan, CancellationToken? cancellationToken)
public async Task<IActivateJobsResponse> SendWithRetry(TimeSpan? timespan, CancellationToken cancellationToken = default)
{
return await asyncRetryStrategy.DoWithRetry(() => Send(timespan, cancellationToken));
}
Expand Down
14 changes: 6 additions & 8 deletions Client/Impl/Commands/CancelProcessInstanceCommand.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,23 +25,21 @@ public CancelProcessInstanceCommand(Gateway.GatewayClient client, IAsyncRetryStr
this.asyncRetryStrategy = asyncRetryStrategy;
}

public async Task<ICancelProcessInstanceResponse> Send(TimeSpan? timeout = null)
public async Task<ICancelProcessInstanceResponse> Send(TimeSpan? timeout = null, CancellationToken token = default)
{
var asyncReply = client.CancelProcessInstanceAsync(request, deadline: timeout?.FromUtcNow());
var asyncReply = client.CancelProcessInstanceAsync(request, deadline: timeout?.FromUtcNow(), cancellationToken: token);
await asyncReply.ResponseAsync;
return new CancelProcessInstanceResponse();
}

public async Task<ICancelProcessInstanceResponse> Send(CancellationToken token)
public async Task<ICancelProcessInstanceResponse> Send(CancellationToken cancellationToken)
{
var asyncReply = client.CancelProcessInstanceAsync(request, cancellationToken: token);
await asyncReply.ResponseAsync;
return new CancelProcessInstanceResponse();
return await Send(token: cancellationToken);
}

public async Task<ICancelProcessInstanceResponse> SendWithRetry(TimeSpan? timespan = null)
public async Task<ICancelProcessInstanceResponse> SendWithRetry(TimeSpan? timespan = null, CancellationToken token = default)
{
return await asyncRetryStrategy.DoWithRetry(() => Send(timespan));
return await asyncRetryStrategy.DoWithRetry(() => Send(timespan, token));
}
}
}
10 changes: 4 additions & 6 deletions Client/Impl/Commands/CompleteJobCommand.cs
Original file line number Diff line number Diff line change
Expand Up @@ -43,18 +43,16 @@ public ICompleteJobCommandStep1 Variables(string variables)
return this;
}

public async Task<ICompleteJobResponse> Send(TimeSpan? timeout = null)
public async Task<ICompleteJobResponse> Send(TimeSpan? timeout = null, CancellationToken token = default)
{
var asyncReply = gatewayClient.CompleteJobAsync(request, deadline: timeout?.FromUtcNow());
var asyncReply = gatewayClient.CompleteJobAsync(request, deadline: timeout?.FromUtcNow(), cancellationToken: token);
await asyncReply.ResponseAsync;
return new Responses.CompleteJobResponse();
}

public async Task<ICompleteJobResponse> Send(CancellationToken token)
public async Task<ICompleteJobResponse> Send(CancellationToken cancellationToken)
{
var asyncReply = gatewayClient.CompleteJobAsync(request, cancellationToken: token);
await asyncReply.ResponseAsync;
return new Responses.CompleteJobResponse();
return await Send(token: cancellationToken);
}
}
}
10 changes: 4 additions & 6 deletions Client/Impl/Commands/CreateProcessInstanceCommand.cs
Original file line number Diff line number Diff line change
Expand Up @@ -60,18 +60,16 @@ public ICreateProcessInstanceWithResultCommandStep1 WithResult()
return new CreateProcessInstanceCommandWithResult(client, request);
}

public async Task<IProcessInstanceResponse> Send(TimeSpan? timeout = null)
public async Task<IProcessInstanceResponse> Send(TimeSpan? timeout = null, CancellationToken token = default)
{
var asyncReply = client.CreateProcessInstanceAsync(request, deadline: timeout?.FromUtcNow());
var asyncReply = client.CreateProcessInstanceAsync(request, deadline: timeout?.FromUtcNow(), cancellationToken: token);
var response = await asyncReply.ResponseAsync;
return new ProcessInstanceResponse(response);
}

public async Task<IProcessInstanceResponse> Send(CancellationToken token)
public async Task<IProcessInstanceResponse> Send(CancellationToken cancellationToken)
{
var asyncReply = client.CreateProcessInstanceAsync(request, cancellationToken: token);
var response = await asyncReply.ResponseAsync;
return new ProcessInstanceResponse(response);
return await Send(token: cancellationToken);
}
}
}
10 changes: 4 additions & 6 deletions Client/Impl/Commands/CreateProcessInstanceCommandWithResult.cs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public ICreateProcessInstanceWithResultCommandStep1 FetchVariables(params string
}

/// <inheritdoc/>
public async Task<IProcessInstanceResult> Send(TimeSpan? timeout = null)
public async Task<IProcessInstanceResult> Send(TimeSpan? timeout = null, CancellationToken token = default)
{
// this timeout will be used for the Gateway-Broker communication
createWithResultRequest.RequestTimeout = (long)(timeout?.TotalMilliseconds ?? DefaultGatewayBrokerTimeoutMillisecond);
Expand All @@ -48,16 +48,14 @@ public async Task<IProcessInstanceResult> Send(TimeSpan? timeout = null)
var clientDeadline = TimeSpan.FromMilliseconds(createWithResultRequest.RequestTimeout +
DefaultTimeoutAdditionMillisecond).FromUtcNow();

var asyncReply = client.CreateProcessInstanceWithResultAsync(createWithResultRequest, deadline: clientDeadline);
var asyncReply = client.CreateProcessInstanceWithResultAsync(createWithResultRequest, deadline: clientDeadline, cancellationToken: token);
var response = await asyncReply.ResponseAsync;
return new ProcessInstanceResultResponse(response);
}

public async Task<IProcessInstanceResult> Send(CancellationToken token)
public async Task<IProcessInstanceResult> Send(CancellationToken cancellationToken)
{
var asyncReply = client.CreateProcessInstanceWithResultAsync(createWithResultRequest, cancellationToken: token);
var response = await asyncReply.ResponseAsync;
return new ProcessInstanceResultResponse(response);
return await Send(token: cancellationToken);
}
}
}
10 changes: 4 additions & 6 deletions Client/Impl/Commands/DeployProcessCommand.cs
Original file line number Diff line number Diff line change
Expand Up @@ -69,18 +69,16 @@ public IDeployProcessCommandBuilderStep2 AddResourceStringUtf8(string resourceSt
return this;
}

public async Task<IDeployResponse> Send(TimeSpan? timeout = null)
public async Task<IDeployResponse> Send(TimeSpan? timeout = null, CancellationToken token = default)
{
var asyncReply = gatewayClient.DeployProcessAsync(request, deadline: timeout?.FromUtcNow());
var asyncReply = gatewayClient.DeployProcessAsync(request, deadline: timeout?.FromUtcNow(), cancellationToken: token);
var response = await asyncReply.ResponseAsync;
return new DeployResponse(response);
}

public async Task<IDeployResponse> Send(CancellationToken token)
public async Task<IDeployResponse> Send(CancellationToken cancellationToken)
{
var asyncReply = gatewayClient.DeployProcessAsync(request, cancellationToken: token);
var response = await asyncReply.ResponseAsync;
return new DeployResponse(response);
return await Send(token: cancellationToken);
}

private void AddProcess(ByteString resource, string resourceName)
Expand Down
10 changes: 4 additions & 6 deletions Client/Impl/Commands/FailJobCommand.cs
Original file line number Diff line number Diff line change
Expand Up @@ -50,18 +50,16 @@ public IFailJobCommandStep2 ErrorMessage(string errorMsg)
return this;
}

public async Task<IFailJobResponse> Send(TimeSpan? timeout = null)
public async Task<IFailJobResponse> Send(TimeSpan? timeout = null, CancellationToken token = default)
{
var asyncReply = gatewayClient.FailJobAsync(request, deadline: timeout?.FromUtcNow());
var asyncReply = gatewayClient.FailJobAsync(request, deadline: timeout?.FromUtcNow(), cancellationToken: token);
await asyncReply.ResponseAsync;
return new FailJobResponse();
}

public async Task<IFailJobResponse> Send(CancellationToken token)
public async Task<IFailJobResponse> Send(CancellationToken cancellationToken)
{
var asyncReply = gatewayClient.FailJobAsync(request, cancellationToken: token);
await asyncReply.ResponseAsync;
return new FailJobResponse();
return await Send(token: cancellationToken);
}
}
}
10 changes: 4 additions & 6 deletions Client/Impl/Commands/PublishMessageCommand.cs
Original file line number Diff line number Diff line change
Expand Up @@ -65,18 +65,16 @@ public IPublishMessageCommandStep3 TimeToLive(TimeSpan timeToLive)
return this;
}

public async Task<IPublishMessageResponse> Send(TimeSpan? timeout = null)
public async Task<IPublishMessageResponse> Send(TimeSpan? timeout = null, CancellationToken token = default)
{
var asyncReply = gatewayClient.PublishMessageAsync(request, deadline: timeout?.FromUtcNow());
var asyncReply = gatewayClient.PublishMessageAsync(request, deadline: timeout?.FromUtcNow(), cancellationToken: token);
await asyncReply.ResponseAsync;
return new PublishMessageResponse();
}

public async Task<IPublishMessageResponse> Send(CancellationToken token)
public async Task<IPublishMessageResponse> Send(CancellationToken cancellationToken)
{
var asyncReply = gatewayClient.PublishMessageAsync(request, cancellationToken: token);
await asyncReply.ResponseAsync;
return new PublishMessageResponse();
return await Send(token: cancellationToken);
}
}
}
10 changes: 4 additions & 6 deletions Client/Impl/Commands/ResolveIncidentCommand.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,18 +22,16 @@ public ResolveIncidentCommand(Gateway.GatewayClient client, long incidentKey)
this.client = client;
}

public async Task<IResolveIncidentResponse> Send(TimeSpan? timeout = null)
public async Task<IResolveIncidentResponse> Send(TimeSpan? timeout = null, CancellationToken token = default)
{
var asyncReply = client.ResolveIncidentAsync(request, deadline: timeout?.FromUtcNow());
var asyncReply = client.ResolveIncidentAsync(request, deadline: timeout?.FromUtcNow(), cancellationToken: token);
await asyncReply.ResponseAsync;
return new ResolveIncidentResponse();
}

public async Task<IResolveIncidentResponse> Send(CancellationToken token)
public async Task<IResolveIncidentResponse> Send(CancellationToken cancellationToken)
{
var asyncReply = client.ResolveIncidentAsync(request, cancellationToken: token);
await asyncReply.ResponseAsync;
return new ResolveIncidentResponse();
return await Send(token: cancellationToken);
}
}
}
10 changes: 4 additions & 6 deletions Client/Impl/Commands/SetVariablesCommand.cs
Original file line number Diff line number Diff line change
Expand Up @@ -34,18 +34,16 @@ public ISetVariablesCommandStep2 Local()
return this;
}

public async Task<ISetVariablesResponse> Send(TimeSpan? timeout = null)
public async Task<ISetVariablesResponse> Send(TimeSpan? timeout = null, CancellationToken token = default)
{
var asyncReply = client.SetVariablesAsync(request, deadline: timeout?.FromUtcNow());
var asyncReply = client.SetVariablesAsync(request, deadline: timeout?.FromUtcNow(), cancellationToken: token);
var response = await asyncReply.ResponseAsync;
return new SetVariablesResponse(response);
}

public async Task<ISetVariablesResponse> Send(CancellationToken token)
public async Task<ISetVariablesResponse> Send(CancellationToken cancellationToken)
{
var asyncReply = client.SetVariablesAsync(request, cancellationToken: token);
var response = await asyncReply.ResponseAsync;
return new SetVariablesResponse(response);
return await Send(token: cancellationToken);
}
}
}
10 changes: 4 additions & 6 deletions Client/Impl/Commands/ThrowErrorCommand.cs
Original file line number Diff line number Diff line change
Expand Up @@ -49,18 +49,16 @@ public IThrowErrorCommandStep2 ErrorMessage(string errorMessage)
return this;
}

public async Task<IThrowErrorResponse> Send(TimeSpan? timeout = null)
public async Task<IThrowErrorResponse> Send(TimeSpan? timeout = null, CancellationToken token = default)
{
var asyncReply = gatewayClient.ThrowErrorAsync(request, deadline: timeout?.FromUtcNow());
var asyncReply = gatewayClient.ThrowErrorAsync(request, deadline: timeout?.FromUtcNow(), cancellationToken: token);
await asyncReply.ResponseAsync;
return new Responses.ThrowErrorResponse();
}

public async Task<IThrowErrorResponse> Send(CancellationToken token)
public async Task<IThrowErrorResponse> Send(CancellationToken cancellationToken)
{
var asyncReply = gatewayClient.ThrowErrorAsync(request, cancellationToken: token);
await asyncReply.ResponseAsync;
return new Responses.ThrowErrorResponse();
return await Send(token: cancellationToken);
}
}
}
11 changes: 4 additions & 7 deletions Client/Impl/Commands/TopologyRequestCommand.cs
Original file line number Diff line number Diff line change
Expand Up @@ -33,20 +33,17 @@ public TopologyRequestCommand(Gateway.GatewayClient client)
gatewayClient = client;
}

public async Task<ITopology> Send(TimeSpan? timeout = null)
public async Task<ITopology> Send(TimeSpan? timeout = null, CancellationToken token = default)
{
var asyncReply = gatewayClient.TopologyAsync(request, deadline: timeout?.FromUtcNow());
var asyncReply = gatewayClient.TopologyAsync(request, deadline: timeout?.FromUtcNow(), cancellationToken: token);
var response = await asyncReply.ResponseAsync;

return new Topology(response);
}

public async Task<ITopology> Send(CancellationToken token)
public async Task<ITopology> Send(CancellationToken cancellationToken)
{
var asyncReply = gatewayClient.TopologyAsync(request, cancellationToken: token);
var response = await asyncReply.ResponseAsync;

return new Topology(response);
return await Send(token: cancellationToken);
}
}
}
10 changes: 4 additions & 6 deletions Client/Impl/Commands/UpdateRetriesCommand.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,18 +28,16 @@ public IUpdateRetriesCommandStep2 Retries(int retries)
return this;
}

public async Task<IUpdateRetriesResponse> Send(TimeSpan? timeout = null)
public async Task<IUpdateRetriesResponse> Send(TimeSpan? timeout = null, CancellationToken token = default)
{
var asyncReply = client.UpdateJobRetriesAsync(request, deadline: timeout?.FromUtcNow());
var asyncReply = client.UpdateJobRetriesAsync(request, deadline: timeout?.FromUtcNow(), cancellationToken: token);
await asyncReply.ResponseAsync;
return new UpdateRetriesResponse();
}

public async Task<IUpdateRetriesResponse> Send(CancellationToken token)
public async Task<IUpdateRetriesResponse> Send(CancellationToken cancellationToken)
{
var asyncReply = client.UpdateJobRetriesAsync(request, cancellationToken: token);
await asyncReply.ResponseAsync;
return new UpdateRetriesResponse();
return await Send(token: cancellationToken);
}
}
}
Loading