diff --git a/lib/sequent/configuration.rb b/lib/sequent/configuration.rb index d69e7a6e..342afd56 100644 --- a/lib/sequent/configuration.rb +++ b/lib/sequent/configuration.rb @@ -12,7 +12,8 @@ class Configuration :event_record_class, :stream_record_class, :snapshot_event_class, - :transaction_provider + :transaction_provider, + :event_publisher attr_accessor :command_handlers, :command_filters @@ -47,6 +48,7 @@ def initialize self.snapshot_event_class = Sequent::Core::SnapshotEvent self.transaction_provider = Sequent::Core::Transactions::NoTransactions.new self.uuid_generator = Sequent::Core::RandomUuidGenerator + self.event_publisher = Sequent::Core::EventPublisher.new self.disable_event_handlers = false end diff --git a/lib/sequent/core/command_service.rb b/lib/sequent/core/command_service.rb index 90e6af26..d05015a4 100644 --- a/lib/sequent/core/command_service.rb +++ b/lib/sequent/core/command_service.rb @@ -34,21 +34,8 @@ def initialize(configuration) # * If the command is valid all +command_handlers+ that +handles_message?+ is invoked # * The +repository+ commits the command and all uncommitted_events resulting from the command def execute_commands(*commands) - begin - transaction_provider.transactional do - commands.each do |command| - filters.each { |filter| filter.execute(command) } - - raise CommandNotValid.new(command) unless command.valid? - parsed_command = command.parse_attrs_to_correct_types - command_handlers.select { |h| h.class.handles_message?(parsed_command) }.each { |h| h.handle_message parsed_command } - repository.commit(parsed_command) - end - end - ensure - repository.clear - end - + commands.each { |command| command_queue.push(command) } + process_commands end def remove_event_handler(clazz) @@ -57,6 +44,34 @@ def remove_event_handler(clazz) private + def process_commands + Sequent::Util.skip_if_already_processing(:command_service_process_commands) do + begin + transaction_provider.transactional do + while(!command_queue.empty?) do + process_command(command_queue.pop) + end + end + ensure + command_queue.clear + repository.clear + end + end + end + + def process_command(command) + filters.each { |filter| filter.execute(command) } + + raise CommandNotValid.new(command) unless command.valid? + parsed_command = command.parse_attrs_to_correct_types + command_handlers.select { |h| h.class.handles_message?(parsed_command) }.each { |h| h.handle_message parsed_command } + repository.commit(parsed_command) + end + + def command_queue + Thread.current[:command_service_commands] ||= Queue.new + end + def event_store configuration.event_store end diff --git a/lib/sequent/core/core.rb b/lib/sequent/core/core.rb index 1a8e03f5..0bc4d325 100644 --- a/lib/sequent/core/core.rb +++ b/lib/sequent/core/core.rb @@ -16,3 +16,4 @@ require_relative 'aggregate_snapshotter' require_relative 'workflow' require_relative 'random_uuid_generator' +require_relative 'event_publisher' diff --git a/lib/sequent/core/event_publisher.rb b/lib/sequent/core/event_publisher.rb new file mode 100644 index 00000000..3607db73 --- /dev/null +++ b/lib/sequent/core/event_publisher.rb @@ -0,0 +1,63 @@ +module Sequent + module Core + # + # EventPublisher ensures that, for every thread, events will be published in the order in which they are queued for publishing. + # + # This potentially introduces a wrinkle into your plans: You therefore should not split a "unit of work" across multiple threads. + # + # If you want other behaviour, you are free to implement your own version of EventPublisher and configure Sequent to use it. + # + class EventPublisher + class PublishEventError < RuntimeError + attr_reader :event_handler_class, :event + + def initialize(event_handler_class, event) + @event_handler_class = event_handler_class + @event = event + end + + def message + "Event Handler: #{@event_handler_class.inspect}\nEvent: #{@event.inspect}\nCause: #{cause.inspect}" + end + end + + def publish_events(events) + return if configuration.disable_event_handlers + events.each { |event| events_queue.push(event) } + process_events + end + + private + + def events_queue + Thread.current[:events_queue] ||= Queue.new + end + + def process_events + Sequent::Util.skip_if_already_processing(:events_queue_lock) do + begin + while(!events_queue.empty?) do + process_event(events_queue.pop) + end + ensure + events_queue.clear + end + end + end + + def process_event(event) + configuration.event_handlers.each do |handler| + begin + handler.handle_message event + rescue + raise PublishEventError.new(handler.class, event) + end + end + end + + def configuration + Sequent.configuration + end + end + end +end diff --git a/lib/sequent/core/event_store.rb b/lib/sequent/core/event_store.rb index 7d55fe6b..34aedd27 100644 --- a/lib/sequent/core/event_store.rb +++ b/lib/sequent/core/event_store.rb @@ -9,19 +9,6 @@ class EventStore include ActiveRecord::ConnectionAdapters::Quoting extend Forwardable - class PublishEventError < RuntimeError - attr_reader :event_handler_class, :event - - def initialize(event_handler_class, event) - @event_handler_class = event_handler_class - @event = event - end - - def message - "Event Handler: #{@event_handler_class.inspect}\nEvent: #{@event.inspect}\nCause: #{cause.inspect}" - end - end - class OptimisticLockingError < RuntimeError end @@ -39,7 +26,7 @@ def message end attr_accessor :configuration - def_delegators :@configuration, :stream_record_class, :event_record_class, :snapshot_event_class, :event_handlers + def_delegators :@configuration, :stream_record_class, :event_record_class, :snapshot_event_class def initialize(configuration = Sequent.configuration) self.configuration = configuration @@ -55,7 +42,7 @@ def initialize(configuration = Sequent.configuration) # def commit_events(command, streams_with_events) store_events(command, streams_with_events) - publish_events(streams_with_events.flat_map { |_, events| events }, event_handlers) + publish_events(streams_with_events.flat_map { |_, events| events }) end ## @@ -105,7 +92,7 @@ def stream_exists?(aggregate_id) def replay_events warn "[DEPRECATION] `replay_events` is deprecated in favor of `replay_events_from_cursor`" events = yield.map { |event_hash| deserialize_event(event_hash) } - publish_events(events, event_handlers) + publish_events(events) end ## @@ -123,7 +110,7 @@ def replay_events_from_cursor(block_size: 2000, ids_replayed = [] cursor.each_row(block_size: block_size).each do |record| event = deserialize_event(record) - publish_events([event], event_handlers) + publish_events([event]) progress += 1 ids_replayed << record['id'] if progress % block_size == 0 @@ -190,17 +177,8 @@ def resolve_event_type(event_type) @event_types.fetch_or_store(event_type) { |k| Class.const_get(k) } end - def publish_events(events, event_handlers) - return if configuration.disable_event_handlers - event_handlers.each do |handler| - events.each do |event| - begin - handler.handle_message event - rescue - raise PublishEventError.new(handler.class, event) - end - end - end + def publish_events(events) + configuration.event_publisher.publish_events(events) end def store_events(command, streams_with_events = []) diff --git a/lib/sequent/sequent.rb b/lib/sequent/sequent.rb index 437ea9a4..b0b37658 100644 --- a/lib/sequent/sequent.rb +++ b/lib/sequent/sequent.rb @@ -1,4 +1,5 @@ require_relative 'core/core' +require_relative 'util/util' require_relative 'migrations/migrations' require_relative 'configuration' diff --git a/lib/sequent/util/skip_if_already_processing.rb b/lib/sequent/util/skip_if_already_processing.rb new file mode 100644 index 00000000..7ea685dd --- /dev/null +++ b/lib/sequent/util/skip_if_already_processing.rb @@ -0,0 +1,15 @@ +module Sequent + module Util + def self.skip_if_already_processing(already_processing_key, &block) + return if Thread.current[already_processing_key] + + begin + Thread.current[already_processing_key] = true + + block.yield + ensure + Thread.current[already_processing_key] = nil + end + end + end +end diff --git a/lib/sequent/util/util.rb b/lib/sequent/util/util.rb new file mode 100644 index 00000000..214b0326 --- /dev/null +++ b/lib/sequent/util/util.rb @@ -0,0 +1 @@ +require_relative 'skip_if_already_processing' diff --git a/spec/lib/sequent/core/event_publisher_spec.rb b/spec/lib/sequent/core/event_publisher_spec.rb new file mode 100644 index 00000000..daceed23 --- /dev/null +++ b/spec/lib/sequent/core/event_publisher_spec.rb @@ -0,0 +1,69 @@ +require 'spec_helper' + +describe Sequent::Core::EventPublisher do + class OtherAggregateTriggered < Sequent::Core::Event + attrs other_aggregate_id: String + end + class EventAdded < Sequent::Core::Event; end + class TriggerTestCase < Sequent::Core::Command; end + class TriggerOtherAggregate < Sequent::Core::Command; end + + class TestAggregate < Sequent::Core::AggregateRoot + def trigger_other_aggregate(aggregate_id) + apply OtherAggregateTriggered, other_aggregate_id: aggregate_id + end + + def add_event + apply EventAdded + end + end + + class TestCommandHandler < Sequent::Core::BaseCommandHandler + on TriggerTestCase do |command| + agg1 = TestAggregate.new(aggregate_id: Sequent.new_uuid) + agg2 = TestAggregate.new(aggregate_id: Sequent.new_uuid) + + agg1.trigger_other_aggregate(agg2.id) + agg2.add_event + + repository.add_aggregate(agg1) + repository.add_aggregate(agg2) + end + + on TriggerOtherAggregate do |command| + agg = repository.load_aggregate(command.aggregate_id) + agg.add_event + end + end + + class TestWorkflow < Sequent::Core::Workflow + on OtherAggregateTriggered do |event| + execute_commands TriggerOtherAggregate.new(aggregate_id: event.other_aggregate_id) + end + end + + class TestEventHandler < Sequent::Core::BaseEventHandler + def initialize(*args) + @sequence_numbers = [] + super(*args) + end + + attr_reader :sequence_numbers + + on EventAdded do |event| + @sequence_numbers << event.sequence_number + end + end + + it 'handles events in the proper order' do + Sequent::Configuration.reset + test_event_handler = TestEventHandler.new + Sequent.configuration.event_handlers << TestWorkflow.new + Sequent.configuration.event_handlers << test_event_handler + Sequent.configuration.command_handlers << TestCommandHandler.new + + Sequent.command_service.execute_commands TriggerTestCase.new(aggregate_id: Sequent.new_uuid) + + expect(test_event_handler.sequence_numbers).to eq [1,2] + end +end diff --git a/spec/lib/sequent/core/event_store_spec.rb b/spec/lib/sequent/core/event_store_spec.rb index ace44a00..3269171e 100644 --- a/spec/lib/sequent/core/event_store_spec.rb +++ b/spec/lib/sequent/core/event_store_spec.rb @@ -244,7 +244,7 @@ class FailingHandler < Sequent::Core::BaseEventHandler end end - it { is_expected.to be_a(Sequent::Core::EventStore::PublishEventError) } + it { is_expected.to be_a(Sequent::Core::EventPublisher::PublishEventError) } it 'preserves its cause' do expect(publish_error.cause).to be_a(FailingHandler::Error) diff --git a/spec/lib/sequent/support/database_spec.rb b/spec/lib/sequent/support/database_spec.rb index 03e20db8..9799f668 100644 --- a/spec/lib/sequent/support/database_spec.rb +++ b/spec/lib/sequent/support/database_spec.rb @@ -1,4 +1,5 @@ require 'spec_helper' +require 'tmpdir' require 'sequent/support'