Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rewrite job worker #159

Merged
merged 1 commit into from
Aug 21, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 18 additions & 3 deletions Client.UnitTests/BaseZeebeTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.

using System;
using System.Threading;
using System.Threading.Tasks;
using GatewayProtocol;
using Grpc.Core;
Expand All @@ -28,8 +30,8 @@ public class BaseZeebeTest
private IZeebeClient client;

public Server Server => server;
public GatewayTestService TestService => testService;
public IZeebeClient ZeebeClient => client;
protected GatewayTestService TestService => testService;
protected IZeebeClient ZeebeClient => client;

[SetUp]
public void Init()
Expand All @@ -43,17 +45,30 @@ public void Init()
server.Services.Add(serviceDefinition);
server.Start();

client = Client.ZeebeClient.Builder().UseGatewayAddress("localhost:26500").UsePlainText().Build();
client = Client.ZeebeClient
.Builder()
.UseGatewayAddress("localhost:26500")
.UsePlainText()
.Build();
}

[TearDown]
public void Stop()
{
client.Dispose();
server.ShutdownAsync().Wait();
testService.Requests.Clear();
testService = null;
server = null;
client = null;
}

public void AwaitRequestCount(Type type, int requestCount)
{
while (TestService.Requests[type].Count < requestCount)
{
Thread.Sleep(TimeSpan.FromMilliseconds(100));
}
}
}
}
8 changes: 8 additions & 0 deletions Client.UnitTests/Client.UnitTests.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,14 @@
<RootNamespace>Zeebe.Client</RootNamespace>
</PropertyGroup>

<PropertyGroup Condition=" '$(Configuration)' == 'Debug' ">
<PlatformTarget>x64</PlatformTarget>
</PropertyGroup>

<PropertyGroup Condition=" '$(Configuration)' == 'Release' ">
<PlatformTarget>x64</PlatformTarget>
</PropertyGroup>

<ItemGroup>
<None Update="Resources\**" CopyToOutputDirectory="PreserveNewest" />
</ItemGroup>
Expand Down
173 changes: 173 additions & 0 deletions Client.UnitTests/JobHandlerTest.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using GatewayProtocol;
using Microsoft.Extensions.Logging;
using NLog;
using NUnit.Framework;
using Zeebe.Client.Api.Responses;

namespace Zeebe.Client.Impl.Worker
{
[TestFixture]
public class JobHandlerTest : BaseZeebeTest
{
private ConcurrentQueue<IJob> workItems;
private ConcurrentQueue<IJob> seenJobs;
private JobWorkerSignal jobWorkerSignal;
private JobHandlerExecutor jobHandler;
private CancellationTokenSource tokenSource;

[SetUp]
public void SetupTest()
{
workItems = new ConcurrentQueue<IJob>();
seenJobs = new ConcurrentQueue<IJob>();
var jobWorkerBuilder = new JobWorkerBuilder(ZeebeClient);
jobWorkerSignal = new JobWorkerSignal();

jobWorkerBuilder
.JobType("foo")
.Handler((jobClient, job) => { seenJobs.Enqueue(job); })
.MaxJobsActive(3)
.Name("jobWorker")
.Timeout(TimeSpan.FromSeconds(123L))
.PollInterval(TimeSpan.FromMilliseconds(100))
.PollingTimeout(TimeSpan.FromSeconds(5L));
jobHandler = new JobHandlerExecutor(jobWorkerBuilder, workItems, jobWorkerSignal);
tokenSource = new CancellationTokenSource();
}

[TearDown]
public async Task CleanUp()
{
tokenSource.Cancel();
// delay disposing, since poll and handler take some time to close
await Task.Delay(TimeSpan.FromMilliseconds(200))
.ContinueWith(t => { tokenSource.Dispose(); });

tokenSource = null;
jobHandler = null;
seenJobs = null;
workItems = null;
jobWorkerSignal = null;
}

[Test]
public void ShouldHandleJob()
{
// given
var expectedJob = CreateActivatedJob(1);
workItems.Enqueue(CreateActivatedJob(1));

// when
ScheduleHandling();

// then
var hasJobHandled = jobWorkerSignal.AwaitJobHandling(TimeSpan.FromSeconds(1));
Assert.IsTrue(hasJobHandled);
AwaitJobsHaveSeen(1);

Assert.AreEqual(1, seenJobs.Count);
Assert.IsTrue(seenJobs.TryDequeue(out var actualJob));
Assert.AreEqual(expectedJob, actualJob);
}

[Test]
public void ShouldTriggerJobHandling()
{
// given
var expectedJob = CreateActivatedJob(1);
ScheduleHandling();
jobWorkerSignal.AwaitJobHandling(TimeSpan.FromSeconds(1));

// when
workItems.Enqueue(CreateActivatedJob(1));
jobWorkerSignal.SignalJobPolled();

// then
var hasJobHandled = jobWorkerSignal.AwaitJobHandling(TimeSpan.FromSeconds(1));
Assert.IsTrue(hasJobHandled);
AwaitJobsHaveSeen(1);

Assert.AreEqual(1, seenJobs.Count);
Assert.IsTrue(seenJobs.TryDequeue(out var actualJob));
Assert.AreEqual(expectedJob, actualJob);
}

[Test]
public void ShouldHandleJobsInOrder()
{
// given
workItems.Enqueue(CreateActivatedJob(1));
workItems.Enqueue(CreateActivatedJob(2));
workItems.Enqueue(CreateActivatedJob(3));

// when
ScheduleHandling();

// then
AwaitJobsHaveSeen(3);

IJob actualJob;
Assert.IsTrue(seenJobs.TryDequeue(out actualJob));
Assert.AreEqual(1, actualJob.Key);
Assert.IsTrue(seenJobs.TryDequeue(out actualJob));
Assert.AreEqual(2, actualJob.Key);
Assert.IsTrue(seenJobs.TryDequeue(out actualJob));
Assert.AreEqual(3, actualJob.Key);
}

[Test]
public void ShouldNotHandleDuplicateOnConcurrentHandlers()
{
// given
workItems.Enqueue(CreateActivatedJob(1));
workItems.Enqueue(CreateActivatedJob(2));
workItems.Enqueue(CreateActivatedJob(3));

// when
ScheduleHandling();
ScheduleHandling();

// then
AwaitJobsHaveSeen(3);
CollectionAssert.AllItemsAreUnique(seenJobs);
}

private async void AwaitJobsHaveSeen(int expectedCount)
{
while (!tokenSource.IsCancellationRequested && seenJobs.Count < expectedCount)
{
await Task.Delay(25);
}
}

private void ScheduleHandling()
{
Task.Run(() => jobHandler.HandleActivatedJobs(tokenSource.Token), tokenSource.Token);
}

private static Responses.ActivatedJob CreateActivatedJob(long key)
{
return new Responses.ActivatedJob(new ActivatedJob
{
Key = key,
Worker = "jobWorker",
Type = "foo",
Variables = "{\"foo\":1}",
CustomHeaders = "{\"customFoo\":\"1\"}",
Retries = 3,
Deadline = 123932,
BpmnProcessId = "process",
ElementId = "job1",
ElementInstanceKey = 23,
WorkflowInstanceKey = 29,
WorkflowDefinitionVersion = 3,
WorkflowKey = 21
});
}
}
}
Loading