Skip to content

Commit

Permalink
Add queue-based event pub to make sure events are published in order
Browse files Browse the repository at this point in the history
  • Loading branch information
derekkraan committed Oct 30, 2017
1 parent 9b5f0f4 commit 58be6ab
Show file tree
Hide file tree
Showing 6 changed files with 123 additions and 29 deletions.
4 changes: 3 additions & 1 deletion lib/sequent/configuration.rb
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ class Configuration
:event_record_class,
:stream_record_class,
:snapshot_event_class,
:transaction_provider
:transaction_provider,
:event_publisher

attr_accessor :command_handlers,
:command_filters
Expand Down Expand Up @@ -47,6 +48,7 @@ def initialize
self.snapshot_event_class = Sequent::Core::SnapshotEvent
self.transaction_provider = Sequent::Core::Transactions::NoTransactions.new
self.uuid_generator = Sequent::Core::RandomUuidGenerator
self.event_publisher = Sequent::Core::EventPublisher.new
self.disable_event_handlers = false
end

Expand Down
1 change: 1 addition & 0 deletions lib/sequent/core/core.rb
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,4 @@
require_relative 'aggregate_snapshotter'
require_relative 'workflow'
require_relative 'random_uuid_generator'
require_relative 'event_publisher'
44 changes: 44 additions & 0 deletions lib/sequent/core/event_publisher.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
module Sequent
module Core
class EventPublisher
class PublishEventError < RuntimeError
attr_reader :event_handler_class, :event

def initialize(event_handler_class, event)
@event_handler_class = event_handler_class
@event = event
end

def message
"Event Handler: #{@event_handler_class.inspect}\nEvent: #{@event.inspect}\nCause: #{cause.inspect}"
end
end

def initialize
@events_queue = Queue.new
end
def configuration
Sequent.configuration
end

def publish_events(events)
return if configuration.disable_event_handlers
events.each { |event| @events_queue.push(event) }
process_events
end

def process_events
while(!@events_queue.empty?) do
event = @events_queue.pop
configuration.event_handlers.each do |handler|
begin
handler.handle_message event
rescue
raise PublishEventError.new(handler.class, event)
end
end
end
end
end
end
end
32 changes: 5 additions & 27 deletions lib/sequent/core/event_store.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,19 +9,6 @@ class EventStore
include ActiveRecord::ConnectionAdapters::Quoting
extend Forwardable

class PublishEventError < RuntimeError
attr_reader :event_handler_class, :event

def initialize(event_handler_class, event)
@event_handler_class = event_handler_class
@event = event
end

def message
"Event Handler: #{@event_handler_class.inspect}\nEvent: #{@event.inspect}\nCause: #{cause.inspect}"
end
end

class OptimisticLockingError < RuntimeError
end

Expand Down Expand Up @@ -55,7 +42,7 @@ def initialize(configuration = Sequent.configuration)
#
def commit_events(command, streams_with_events)
store_events(command, streams_with_events)
publish_events(streams_with_events.flat_map { |_, events| events }, event_handlers)
publish_events(streams_with_events.flat_map { |_, events| events })
end

##
Expand Down Expand Up @@ -105,7 +92,7 @@ def stream_exists?(aggregate_id)
def replay_events
warn "[DEPRECATION] `replay_events` is deprecated in favor of `replay_events_from_cursor`"
events = yield.map { |event_hash| deserialize_event(event_hash) }
publish_events(events, event_handlers)
publish_events(events)
end

##
Expand All @@ -123,7 +110,7 @@ def replay_events_from_cursor(block_size: 2000,
ids_replayed = []
cursor.each_row(block_size: block_size).each do |record|
event = deserialize_event(record)
publish_events([event], event_handlers)
publish_events([event])
progress += 1
ids_replayed << record['id']
if progress % block_size == 0
Expand Down Expand Up @@ -190,17 +177,8 @@ def resolve_event_type(event_type)
@event_types.fetch_or_store(event_type) { |k| Class.const_get(k) }
end

def publish_events(events, event_handlers)
return if configuration.disable_event_handlers
event_handlers.each do |handler|
events.each do |event|
begin
handler.handle_message event
rescue
raise PublishEventError.new(handler.class, event)
end
end
end
def publish_events(events)
configuration.event_publisher.publish_events(events)
end

def store_events(command, streams_with_events = [])
Expand Down
69 changes: 69 additions & 0 deletions spec/lib/sequent/core/event_publisher_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
require 'spec_helper'

describe Sequent::Core::EventPublisher do
class OtherAggregateTriggered < Sequent::Core::Event
attrs other_aggregate_id: String
end
class EventAdded < Sequent::Core::Event; end
class TriggerTestCase < Sequent::Core::Command; end
class TriggerOtherAggregate < Sequent::Core::Command; end

class TestAggregate < Sequent::Core::AggregateRoot
def trigger_other_aggregate(aggregate_id)
apply OtherAggregateTriggered, other_aggregate_id: aggregate_id
end

def add_event
apply EventAdded
end
end

class TestCommandHandler < Sequent::Core::BaseCommandHandler
on TriggerTestCase do |command|
agg1 = TestAggregate.new(aggregate_id: Sequent.new_uuid)
agg2 = TestAggregate.new(aggregate_id: Sequent.new_uuid)

agg1.trigger_other_aggregate(agg2.id)
agg2.add_event

repository.add_aggregate(agg1)
repository.add_aggregate(agg2)
end

on TriggerOtherAggregate do |command|
agg = repository.load_aggregate(command.aggregate_id)
agg.add_event
end
end

class TestWorkflow < Sequent::Core::Workflow
on OtherAggregateTriggered do |event|
execute_commands TriggerOtherAggregate.new(aggregate_id: event.other_aggregate_id)
end
end

class TestEventHandler < Sequent::Core::BaseEventHandler
def initialize(*args)
@sequence_numbers = []
super(*args)
end

attr_reader :sequence_numbers

on EventAdded do |event|
@sequence_numbers << event.sequence_number
end
end

it 'handles events in the proper order' do
Sequent::Configuration.reset
test_event_handler = TestEventHandler.new
Sequent.configuration.event_handlers << TestWorkflow.new
Sequent.configuration.event_handlers << test_event_handler
Sequent.configuration.command_handlers << TestCommandHandler.new

Sequent.command_service.execute_commands TriggerTestCase.new(aggregate_id: Sequent.new_uuid)

expect(test_event_handler.sequence_numbers).to eq [1,2]
end
end
2 changes: 1 addition & 1 deletion spec/lib/sequent/core/event_store_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ class FailingHandler < Sequent::Core::BaseEventHandler
end
end

it { is_expected.to be_a(Sequent::Core::EventStore::PublishEventError) }
it { is_expected.to be_a(Sequent::Core::EventPublisher::PublishEventError) }

it 'preserves its cause' do
expect(publish_error.cause).to be_a(FailingHandler::Error)
Expand Down

0 comments on commit 58be6ab

Please sign in to comment.