diff --git a/lib/sequent/configuration.rb b/lib/sequent/configuration.rb index d69e7a6e..342afd56 100644 --- a/lib/sequent/configuration.rb +++ b/lib/sequent/configuration.rb @@ -12,7 +12,8 @@ class Configuration :event_record_class, :stream_record_class, :snapshot_event_class, - :transaction_provider + :transaction_provider, + :event_publisher attr_accessor :command_handlers, :command_filters @@ -47,6 +48,7 @@ def initialize self.snapshot_event_class = Sequent::Core::SnapshotEvent self.transaction_provider = Sequent::Core::Transactions::NoTransactions.new self.uuid_generator = Sequent::Core::RandomUuidGenerator + self.event_publisher = Sequent::Core::EventPublisher.new self.disable_event_handlers = false end diff --git a/lib/sequent/core/core.rb b/lib/sequent/core/core.rb index 1a8e03f5..0bc4d325 100644 --- a/lib/sequent/core/core.rb +++ b/lib/sequent/core/core.rb @@ -16,3 +16,4 @@ require_relative 'aggregate_snapshotter' require_relative 'workflow' require_relative 'random_uuid_generator' +require_relative 'event_publisher' diff --git a/lib/sequent/core/event_publisher.rb b/lib/sequent/core/event_publisher.rb new file mode 100644 index 00000000..aa4dd2bd --- /dev/null +++ b/lib/sequent/core/event_publisher.rb @@ -0,0 +1,44 @@ +module Sequent + module Core + class EventPublisher + class PublishEventError < RuntimeError + attr_reader :event_handler_class, :event + + def initialize(event_handler_class, event) + @event_handler_class = event_handler_class + @event = event + end + + def message + "Event Handler: #{@event_handler_class.inspect}\nEvent: #{@event.inspect}\nCause: #{cause.inspect}" + end + end + + def initialize + @events_queue = Queue.new + end + def configuration + Sequent.configuration + end + + def publish_events(events) + return if configuration.disable_event_handlers + events.each { |event| @events_queue.push(event) } + process_events + end + + def process_events + while(!@events_queue.empty?) do + event = @events_queue.pop + configuration.event_handlers.each do |handler| + begin + handler.handle_message event + rescue + raise PublishEventError.new(handler.class, event) + end + end + end + end + end + end +end diff --git a/lib/sequent/core/event_store.rb b/lib/sequent/core/event_store.rb index 7d55fe6b..0cbfec69 100644 --- a/lib/sequent/core/event_store.rb +++ b/lib/sequent/core/event_store.rb @@ -9,19 +9,6 @@ class EventStore include ActiveRecord::ConnectionAdapters::Quoting extend Forwardable - class PublishEventError < RuntimeError - attr_reader :event_handler_class, :event - - def initialize(event_handler_class, event) - @event_handler_class = event_handler_class - @event = event - end - - def message - "Event Handler: #{@event_handler_class.inspect}\nEvent: #{@event.inspect}\nCause: #{cause.inspect}" - end - end - class OptimisticLockingError < RuntimeError end @@ -55,7 +42,7 @@ def initialize(configuration = Sequent.configuration) # def commit_events(command, streams_with_events) store_events(command, streams_with_events) - publish_events(streams_with_events.flat_map { |_, events| events }, event_handlers) + publish_events(streams_with_events.flat_map { |_, events| events }) end ## @@ -105,7 +92,7 @@ def stream_exists?(aggregate_id) def replay_events warn "[DEPRECATION] `replay_events` is deprecated in favor of `replay_events_from_cursor`" events = yield.map { |event_hash| deserialize_event(event_hash) } - publish_events(events, event_handlers) + publish_events(events) end ## @@ -123,7 +110,7 @@ def replay_events_from_cursor(block_size: 2000, ids_replayed = [] cursor.each_row(block_size: block_size).each do |record| event = deserialize_event(record) - publish_events([event], event_handlers) + publish_events([event]) progress += 1 ids_replayed << record['id'] if progress % block_size == 0 @@ -190,17 +177,8 @@ def resolve_event_type(event_type) @event_types.fetch_or_store(event_type) { |k| Class.const_get(k) } end - def publish_events(events, event_handlers) - return if configuration.disable_event_handlers - event_handlers.each do |handler| - events.each do |event| - begin - handler.handle_message event - rescue - raise PublishEventError.new(handler.class, event) - end - end - end + def publish_events(events) + configuration.event_publisher.publish_events(events) end def store_events(command, streams_with_events = []) diff --git a/spec/lib/sequent/core/event_publisher_spec.rb b/spec/lib/sequent/core/event_publisher_spec.rb new file mode 100644 index 00000000..daceed23 --- /dev/null +++ b/spec/lib/sequent/core/event_publisher_spec.rb @@ -0,0 +1,69 @@ +require 'spec_helper' + +describe Sequent::Core::EventPublisher do + class OtherAggregateTriggered < Sequent::Core::Event + attrs other_aggregate_id: String + end + class EventAdded < Sequent::Core::Event; end + class TriggerTestCase < Sequent::Core::Command; end + class TriggerOtherAggregate < Sequent::Core::Command; end + + class TestAggregate < Sequent::Core::AggregateRoot + def trigger_other_aggregate(aggregate_id) + apply OtherAggregateTriggered, other_aggregate_id: aggregate_id + end + + def add_event + apply EventAdded + end + end + + class TestCommandHandler < Sequent::Core::BaseCommandHandler + on TriggerTestCase do |command| + agg1 = TestAggregate.new(aggregate_id: Sequent.new_uuid) + agg2 = TestAggregate.new(aggregate_id: Sequent.new_uuid) + + agg1.trigger_other_aggregate(agg2.id) + agg2.add_event + + repository.add_aggregate(agg1) + repository.add_aggregate(agg2) + end + + on TriggerOtherAggregate do |command| + agg = repository.load_aggregate(command.aggregate_id) + agg.add_event + end + end + + class TestWorkflow < Sequent::Core::Workflow + on OtherAggregateTriggered do |event| + execute_commands TriggerOtherAggregate.new(aggregate_id: event.other_aggregate_id) + end + end + + class TestEventHandler < Sequent::Core::BaseEventHandler + def initialize(*args) + @sequence_numbers = [] + super(*args) + end + + attr_reader :sequence_numbers + + on EventAdded do |event| + @sequence_numbers << event.sequence_number + end + end + + it 'handles events in the proper order' do + Sequent::Configuration.reset + test_event_handler = TestEventHandler.new + Sequent.configuration.event_handlers << TestWorkflow.new + Sequent.configuration.event_handlers << test_event_handler + Sequent.configuration.command_handlers << TestCommandHandler.new + + Sequent.command_service.execute_commands TriggerTestCase.new(aggregate_id: Sequent.new_uuid) + + expect(test_event_handler.sequence_numbers).to eq [1,2] + end +end diff --git a/spec/lib/sequent/core/event_store_spec.rb b/spec/lib/sequent/core/event_store_spec.rb index ace44a00..3269171e 100644 --- a/spec/lib/sequent/core/event_store_spec.rb +++ b/spec/lib/sequent/core/event_store_spec.rb @@ -244,7 +244,7 @@ class FailingHandler < Sequent::Core::BaseEventHandler end end - it { is_expected.to be_a(Sequent::Core::EventStore::PublishEventError) } + it { is_expected.to be_a(Sequent::Core::EventPublisher::PublishEventError) } it 'preserves its cause' do expect(publish_error.cause).to be_a(FailingHandler::Error)