Skip to content

Commit

Permalink
Added support for cancellation token to EventStore (#48)
Browse files Browse the repository at this point in the history
  • Loading branch information
getsetcode authored and asosMikeGore committed Nov 6, 2019
1 parent 7c48972 commit 60f1460
Show file tree
Hide file tree
Showing 4 changed files with 56 additions and 8 deletions.
19 changes: 19 additions & 0 deletions src/SimpleEventStore/SimpleEventStore.Tests/EventStoreAppending.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using NUnit.Framework;
using SimpleEventStore.Tests.Events;
Expand Down Expand Up @@ -112,5 +113,23 @@ public async Task when_appending_to_a_new_stream_the_event_metadata_is_saved()
var stream = await subject.ReadStreamForwards(streamId);
Assert.That(((TestMetadata)stream.Single().Metadata).Value, Is.EqualTo(metadata.Value));
}

[Test]
public async Task when_appending_to_a_stream_the_engine_honours_cancellation_token()
{
var streamId = Guid.NewGuid().ToString();
var subject = await GetEventStore();
var metadata = new TestMetadata { Value = "Hello" };
var @event = new EventData(Guid.NewGuid(), new OrderCreated(streamId), metadata);

using (var cts = new CancellationTokenSource())
{
cts.Cancel();

AsyncTestDelegate act = () => subject.AppendToStream(streamId, 0, cts.Token, @event);

Assert.That(act, Throws.InstanceOf<OperationCanceledException>());
}
}
}
}
23 changes: 20 additions & 3 deletions src/SimpleEventStore/SimpleEventStore.Tests/EventStoreReading.cs
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
using System;
using NUnit.Framework;
using SimpleEventStore.Tests.Events;
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using NUnit.Framework;
using SimpleEventStore.Tests.Events;

namespace SimpleEventStore.Tests
{
Expand Down Expand Up @@ -63,5 +64,21 @@ public async Task when_reading_a_stream_only_the_required_events_are_returned()
Assert.That(events.Count, Is.EqualTo(1));
Assert.That(events.First().EventBody, Is.InstanceOf<OrderDispatched>());
}

[Test]
public async Task when_reading_a_stream_the_engine_honours_cancellation_token()
{
var streamId = Guid.NewGuid().ToString();
var subject = await GetEventStore();

using (var cts = new CancellationTokenSource())
{
cts.Cancel();

AsyncTestDelegate act = () => subject.ReadStreamForwards(streamId, cts.Token);

Assert.That(act, Throws.InstanceOf<OperationCanceledException>());
}
}
}
}
16 changes: 11 additions & 5 deletions src/SimpleEventStore/SimpleEventStore/EventStore.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

namespace SimpleEventStore
Expand All @@ -14,6 +15,11 @@ public EventStore(IStorageEngine engine)
}

public Task AppendToStream(string streamId, int expectedVersion, params EventData[] events)
{
return AppendToStream(streamId, expectedVersion, default, events);
}

public Task AppendToStream(string streamId, int expectedVersion, CancellationToken cancellationToken, params EventData[] events)
{
Guard.IsNotNullOrEmpty(nameof(streamId), streamId);

Expand All @@ -25,21 +31,21 @@ public Task AppendToStream(string streamId, int expectedVersion, params EventDat
storageEvents.Add(new StorageEvent(streamId, events[i], ++eventVersion));
}

return engine.AppendToStream(streamId, storageEvents);
return engine.AppendToStream(streamId, storageEvents, cancellationToken);
}

public Task<IReadOnlyCollection<StorageEvent>> ReadStreamForwards(string streamId)
public Task<IReadOnlyCollection<StorageEvent>> ReadStreamForwards(string streamId, CancellationToken cancellationToken = default)
{
Guard.IsNotNullOrEmpty(nameof(streamId), streamId);

return engine.ReadStreamForwards(streamId, 1, Int32.MaxValue);
return engine.ReadStreamForwards(streamId, 1, Int32.MaxValue, cancellationToken);
}

public Task<IReadOnlyCollection<StorageEvent>> ReadStreamForwards(string streamId, int startPosition, int numberOfEventsToRead)
public Task<IReadOnlyCollection<StorageEvent>> ReadStreamForwards(string streamId, int startPosition, int numberOfEventsToRead, CancellationToken cancellationToken = default)
{
Guard.IsNotNullOrEmpty(nameof(streamId), streamId);

return engine.ReadStreamForwards(streamId, startPosition, numberOfEventsToRead);
return engine.ReadStreamForwards(streamId, startPosition, numberOfEventsToRead, cancellationToken);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ public Task AppendToStream(string streamId, IEnumerable<StorageEvent> events, Ca
throw new ConcurrencyException($"Concurrency conflict when appending to stream {streamId}. Expected revision {firstEvent.EventNumber} : Actual revision {streams[streamId].Count}");
}

cancellationToken.ThrowIfCancellationRequested();

streams[streamId].AddRange(events);
AddEventsToAllStream(events);
});
Expand All @@ -44,6 +46,8 @@ private void AddEventsToAllStream(IEnumerable<StorageEvent> events)

public Task<IReadOnlyCollection<StorageEvent>> ReadStreamForwards(string streamId, int startPosition, int numberOfEventsToRead, CancellationToken cancellationToken = default)
{
cancellationToken.ThrowIfCancellationRequested();

if (!streams.ContainsKey(streamId))
{
return Task.FromResult(EmptyStream);
Expand All @@ -55,6 +59,8 @@ public Task<IReadOnlyCollection<StorageEvent>> ReadStreamForwards(string streamI

public Task<IStorageEngine> Initialise(CancellationToken cancellationToken = default)
{
cancellationToken.ThrowIfCancellationRequested();

return Task.FromResult<IStorageEngine>(this);
}
}
Expand Down

0 comments on commit 60f1460

Please sign in to comment.