Skip to content

Commit

Permalink
Support reading from last snapshot event
Browse files Browse the repository at this point in the history
  • Loading branch information
Dave Thompson committed Oct 31, 2018
1 parent e0e4763 commit cb1d2ea
Show file tree
Hide file tree
Showing 7 changed files with 129 additions and 2 deletions.
35 changes: 35 additions & 0 deletions SimpleEventStore.AzureDocumentDb/AzureDocumentDbStorageEngine.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using System;
using System.Collections.Generic;
using System.Collections.ObjectModel;
using System.Linq;
using System.Threading.Tasks;
using Microsoft.Azure.Documents;
Expand Down Expand Up @@ -94,6 +95,40 @@ public async Task<IReadOnlyCollection<StorageEvent>> ReadStreamForwards(string s
return events.AsReadOnly();
}

public async Task<IReadOnlyCollection<StorageEvent>> ReadStreamForwardsFromLast(string streamId, Predicate<StorageEvent> readFromHere)
{
var eventsQuery = client.CreateDocumentQuery<DocumentDbStorageEvent>(commitsLink)
.Where(x => x.StreamId == streamId)
.OrderByDescending(x => x.EventNumber)
.AsDocumentQuery();

var eventsInReverseOrder = new List<StorageEvent>();
var finished = false;

while (!finished && eventsQuery.HasMoreResults)
{
var response = await eventsQuery.ExecuteNextAsync<DocumentDbStorageEvent>();
loggingOptions.OnSuccess(ResponseInformation.FromReadResponse(nameof(ReadStreamForwardsFromLast), response));

foreach (var e in response)
{
var storageEvent = e.ToStorageEvent(typeMap);
eventsInReverseOrder.Add(storageEvent);

if (readFromHere(storageEvent))
{
finished = true;
break;
}
}
}

return eventsInReverseOrder
.Reverse<StorageEvent>()
.ToList()
.AsReadOnly();
}

public async Task DeleteStream(string streamId)
{
while (true)
Expand Down
19 changes: 19 additions & 0 deletions SimpleEventStore.Tests/EventStoreReading.cs
Original file line number Diff line number Diff line change
Expand Up @@ -63,5 +63,24 @@ public async Task when_reading_a_stream_only_the_required_events_are_returned()
Assert.That(events.Count, Is.EqualTo(1));
Assert.That(events.First().EventBody, Is.InstanceOf<OrderDispatched>());
}

[Test]
public async Task when_reading_a_stream_from_snapshot_only_subsequent_events_are_returned()
{
var streamId = Guid.NewGuid().ToString();
var subject = await GetEventStore();

await subject.AppendToStream(streamId, 0, new EventData(Guid.NewGuid(), new OrderCreated(streamId)));
await subject.AppendToStream(streamId, 1, new EventData(Guid.NewGuid(), new OrderSnapshot(streamId)));
await subject.AppendToStream(streamId, 2, new EventData(Guid.NewGuid(), new OrderDispatched(streamId)));

var events = await subject.ReadStreamForwardsFromLast(
streamId,
storageEvent => storageEvent.EventBody is OrderSnapshot);

Assert.That(events.Count, Is.EqualTo(2));
Assert.That(events.First().EventBody, Is.InstanceOf<OrderSnapshot>());
Assert.That(events.Skip(1).Single().EventBody, Is.InstanceOf<OrderDispatched>());
}
}
}
12 changes: 12 additions & 0 deletions SimpleEventStore.Tests/Events/OrderSnapshot.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
namespace SimpleEventStore.Tests.Events
{
public class OrderSnapshot
{
public string OrderId { get; private set; }

public OrderSnapshot(string orderId)
{
OrderId = orderId;
}
}
}
17 changes: 16 additions & 1 deletion SimpleEventStore/EventStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public Task<IReadOnlyCollection<StorageEvent>> ReadStreamForwards(string streamI
{
Guard.IsNotNullOrEmpty(nameof(streamId), streamId);

return engine.ReadStreamForwards(streamId, 1, Int32.MaxValue);
return engine.ReadStreamForwards(streamId, 1, int.MaxValue);
}

public Task<IReadOnlyCollection<StorageEvent>> ReadStreamForwards(string streamId, int startPosition, int numberOfEventsToRead)
Expand All @@ -42,6 +42,21 @@ public Task<IReadOnlyCollection<StorageEvent>> ReadStreamForwards(string streamI
return engine.ReadStreamForwards(streamId, startPosition, numberOfEventsToRead);
}

public Task<IReadOnlyCollection<StorageEvent>> ReadStreamForwardsFromLast(string streamId, Predicate<StorageEvent> readFromHere)
{
Guard.IsNotNullOrEmpty(nameof(streamId), streamId);
Guard.IsNotNull(nameof(readFromHere), readFromHere);

return engine.ReadStreamForwardsFromLast(streamId, readFromHere);
}

public Task<IReadOnlyCollection<StorageEvent>> ReadStreamForwardsFromLast<TSnapshot>(string streamId)
{
return ReadStreamForwardsFromLast(
streamId,
storageEvent => storageEvent.EventBody is TSnapshot);
}

public Task DeleteStream(string streamId)
{
Guard.IsNotNullOrEmpty(nameof(streamId), streamId);
Expand Down
2 changes: 2 additions & 0 deletions SimpleEventStore/IStorageEngine.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ public interface IStorageEngine

Task<IReadOnlyCollection<StorageEvent>> ReadStreamForwards(string streamId, int startPosition, int numberOfEventsToRead);

Task<IReadOnlyCollection<StorageEvent>> ReadStreamForwardsFromLast(string streamId, Predicate<StorageEvent> readFromHere);

Task<IStorageEngine> Initialise();

Task DeleteStream(string streamId);
Expand Down
21 changes: 21 additions & 0 deletions SimpleEventStore/InMemory/EnumerableExtensions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
using System;
using System.Collections.Generic;

namespace SimpleEventStore.InMemory
{
static class EnumerableExtensions
{
public static IEnumerable<T> TakeUntilImmediatelyAfter<T>(this IEnumerable<T> source, Predicate<T> stopImmediatelyAfter)
{
foreach (var item in source)
{
yield return item;

if (stopImmediatelyAfter(item))
{
yield break;
}
}
}
}
}
25 changes: 24 additions & 1 deletion SimpleEventStore/InMemory/InMemoryStorageEngine.cs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
Expand Down Expand Up @@ -65,7 +66,29 @@ public Task<IReadOnlyCollection<StorageEvent>> ReadStreamForwards(string streamI
return Task.FromResult(EmptyStream);
}

IReadOnlyCollection<StorageEvent> stream = streams[streamId].Skip(startPosition - 1).Take(numberOfEventsToRead).ToList().AsReadOnly();
IReadOnlyCollection<StorageEvent> stream = streams[streamId]
.Skip(startPosition - 1)
.Take(numberOfEventsToRead)
.ToList()
.AsReadOnly();

return Task.FromResult(stream);
}

public Task<IReadOnlyCollection<StorageEvent>> ReadStreamForwardsFromLast(string streamId, Predicate<StorageEvent> readFromHere)
{
if (!streams.ContainsKey(streamId))
{
return Task.FromResult(EmptyStream);
}

IReadOnlyCollection<StorageEvent> stream = streams[streamId]
.Reverse<StorageEvent>()
.TakeUntilImmediatelyAfter(readFromHere)
.Reverse()
.ToList()
.AsReadOnly();

return Task.FromResult(stream);
}

Expand Down

0 comments on commit cb1d2ea

Please sign in to comment.