From 0b0225b96a64aca3269861dff330ecbe58bd49a6 Mon Sep 17 00:00:00 2001 From: Derek Kraan Date: Mon, 30 Oct 2017 14:30:57 +0100 Subject: [PATCH 01/10] Add queue-based event pub to make sure events are published in order --- lib/sequent/configuration.rb | 4 +- lib/sequent/core/core.rb | 1 + lib/sequent/core/event_publisher.rb | 44 ++++++++++++ lib/sequent/core/event_store.rb | 34 ++------- spec/lib/sequent/core/event_publisher_spec.rb | 69 +++++++++++++++++++ spec/lib/sequent/core/event_store_spec.rb | 2 +- 6 files changed, 124 insertions(+), 30 deletions(-) create mode 100644 lib/sequent/core/event_publisher.rb create mode 100644 spec/lib/sequent/core/event_publisher_spec.rb diff --git a/lib/sequent/configuration.rb b/lib/sequent/configuration.rb index d69e7a6e..342afd56 100644 --- a/lib/sequent/configuration.rb +++ b/lib/sequent/configuration.rb @@ -12,7 +12,8 @@ class Configuration :event_record_class, :stream_record_class, :snapshot_event_class, - :transaction_provider + :transaction_provider, + :event_publisher attr_accessor :command_handlers, :command_filters @@ -47,6 +48,7 @@ def initialize self.snapshot_event_class = Sequent::Core::SnapshotEvent self.transaction_provider = Sequent::Core::Transactions::NoTransactions.new self.uuid_generator = Sequent::Core::RandomUuidGenerator + self.event_publisher = Sequent::Core::EventPublisher.new self.disable_event_handlers = false end diff --git a/lib/sequent/core/core.rb b/lib/sequent/core/core.rb index 1a8e03f5..0bc4d325 100644 --- a/lib/sequent/core/core.rb +++ b/lib/sequent/core/core.rb @@ -16,3 +16,4 @@ require_relative 'aggregate_snapshotter' require_relative 'workflow' require_relative 'random_uuid_generator' +require_relative 'event_publisher' diff --git a/lib/sequent/core/event_publisher.rb b/lib/sequent/core/event_publisher.rb new file mode 100644 index 00000000..aa4dd2bd --- /dev/null +++ b/lib/sequent/core/event_publisher.rb @@ -0,0 +1,44 @@ +module Sequent + module Core + class EventPublisher + class PublishEventError < RuntimeError + attr_reader :event_handler_class, :event + + def initialize(event_handler_class, event) + @event_handler_class = event_handler_class + @event = event + end + + def message + "Event Handler: #{@event_handler_class.inspect}\nEvent: #{@event.inspect}\nCause: #{cause.inspect}" + end + end + + def initialize + @events_queue = Queue.new + end + def configuration + Sequent.configuration + end + + def publish_events(events) + return if configuration.disable_event_handlers + events.each { |event| @events_queue.push(event) } + process_events + end + + def process_events + while(!@events_queue.empty?) do + event = @events_queue.pop + configuration.event_handlers.each do |handler| + begin + handler.handle_message event + rescue + raise PublishEventError.new(handler.class, event) + end + end + end + end + end + end +end diff --git a/lib/sequent/core/event_store.rb b/lib/sequent/core/event_store.rb index 7d55fe6b..34aedd27 100644 --- a/lib/sequent/core/event_store.rb +++ b/lib/sequent/core/event_store.rb @@ -9,19 +9,6 @@ class EventStore include ActiveRecord::ConnectionAdapters::Quoting extend Forwardable - class PublishEventError < RuntimeError - attr_reader :event_handler_class, :event - - def initialize(event_handler_class, event) - @event_handler_class = event_handler_class - @event = event - end - - def message - "Event Handler: #{@event_handler_class.inspect}\nEvent: #{@event.inspect}\nCause: #{cause.inspect}" - end - end - class OptimisticLockingError < RuntimeError end @@ -39,7 +26,7 @@ def message end attr_accessor :configuration - def_delegators :@configuration, :stream_record_class, :event_record_class, :snapshot_event_class, :event_handlers + def_delegators :@configuration, :stream_record_class, :event_record_class, :snapshot_event_class def initialize(configuration = Sequent.configuration) self.configuration = configuration @@ -55,7 +42,7 @@ def initialize(configuration = Sequent.configuration) # def commit_events(command, streams_with_events) store_events(command, streams_with_events) - publish_events(streams_with_events.flat_map { |_, events| events }, event_handlers) + publish_events(streams_with_events.flat_map { |_, events| events }) end ## @@ -105,7 +92,7 @@ def stream_exists?(aggregate_id) def replay_events warn "[DEPRECATION] `replay_events` is deprecated in favor of `replay_events_from_cursor`" events = yield.map { |event_hash| deserialize_event(event_hash) } - publish_events(events, event_handlers) + publish_events(events) end ## @@ -123,7 +110,7 @@ def replay_events_from_cursor(block_size: 2000, ids_replayed = [] cursor.each_row(block_size: block_size).each do |record| event = deserialize_event(record) - publish_events([event], event_handlers) + publish_events([event]) progress += 1 ids_replayed << record['id'] if progress % block_size == 0 @@ -190,17 +177,8 @@ def resolve_event_type(event_type) @event_types.fetch_or_store(event_type) { |k| Class.const_get(k) } end - def publish_events(events, event_handlers) - return if configuration.disable_event_handlers - event_handlers.each do |handler| - events.each do |event| - begin - handler.handle_message event - rescue - raise PublishEventError.new(handler.class, event) - end - end - end + def publish_events(events) + configuration.event_publisher.publish_events(events) end def store_events(command, streams_with_events = []) diff --git a/spec/lib/sequent/core/event_publisher_spec.rb b/spec/lib/sequent/core/event_publisher_spec.rb new file mode 100644 index 00000000..daceed23 --- /dev/null +++ b/spec/lib/sequent/core/event_publisher_spec.rb @@ -0,0 +1,69 @@ +require 'spec_helper' + +describe Sequent::Core::EventPublisher do + class OtherAggregateTriggered < Sequent::Core::Event + attrs other_aggregate_id: String + end + class EventAdded < Sequent::Core::Event; end + class TriggerTestCase < Sequent::Core::Command; end + class TriggerOtherAggregate < Sequent::Core::Command; end + + class TestAggregate < Sequent::Core::AggregateRoot + def trigger_other_aggregate(aggregate_id) + apply OtherAggregateTriggered, other_aggregate_id: aggregate_id + end + + def add_event + apply EventAdded + end + end + + class TestCommandHandler < Sequent::Core::BaseCommandHandler + on TriggerTestCase do |command| + agg1 = TestAggregate.new(aggregate_id: Sequent.new_uuid) + agg2 = TestAggregate.new(aggregate_id: Sequent.new_uuid) + + agg1.trigger_other_aggregate(agg2.id) + agg2.add_event + + repository.add_aggregate(agg1) + repository.add_aggregate(agg2) + end + + on TriggerOtherAggregate do |command| + agg = repository.load_aggregate(command.aggregate_id) + agg.add_event + end + end + + class TestWorkflow < Sequent::Core::Workflow + on OtherAggregateTriggered do |event| + execute_commands TriggerOtherAggregate.new(aggregate_id: event.other_aggregate_id) + end + end + + class TestEventHandler < Sequent::Core::BaseEventHandler + def initialize(*args) + @sequence_numbers = [] + super(*args) + end + + attr_reader :sequence_numbers + + on EventAdded do |event| + @sequence_numbers << event.sequence_number + end + end + + it 'handles events in the proper order' do + Sequent::Configuration.reset + test_event_handler = TestEventHandler.new + Sequent.configuration.event_handlers << TestWorkflow.new + Sequent.configuration.event_handlers << test_event_handler + Sequent.configuration.command_handlers << TestCommandHandler.new + + Sequent.command_service.execute_commands TriggerTestCase.new(aggregate_id: Sequent.new_uuid) + + expect(test_event_handler.sequence_numbers).to eq [1,2] + end +end diff --git a/spec/lib/sequent/core/event_store_spec.rb b/spec/lib/sequent/core/event_store_spec.rb index ace44a00..3269171e 100644 --- a/spec/lib/sequent/core/event_store_spec.rb +++ b/spec/lib/sequent/core/event_store_spec.rb @@ -244,7 +244,7 @@ class FailingHandler < Sequent::Core::BaseEventHandler end end - it { is_expected.to be_a(Sequent::Core::EventStore::PublishEventError) } + it { is_expected.to be_a(Sequent::Core::EventPublisher::PublishEventError) } it 'preserves its cause' do expect(publish_error.cause).to be_a(FailingHandler::Error) From b2e0441fe10318950bd0a0c20bb133696b799208 Mon Sep 17 00:00:00 2001 From: Derek Kraan Date: Mon, 30 Oct 2017 14:45:18 +0100 Subject: [PATCH 02/10] Only process events at the highest level (don't enter this loop twice) --- lib/sequent/core/event_publisher.rb | 29 +++++++++++++++++++---------- 1 file changed, 19 insertions(+), 10 deletions(-) diff --git a/lib/sequent/core/event_publisher.rb b/lib/sequent/core/event_publisher.rb index aa4dd2bd..b9c09893 100644 --- a/lib/sequent/core/event_publisher.rb +++ b/lib/sequent/core/event_publisher.rb @@ -16,9 +16,7 @@ def message def initialize @events_queue = Queue.new - end - def configuration - Sequent.configuration + @mutex = Mutex.new end def publish_events(events) @@ -27,18 +25,29 @@ def publish_events(events) process_events end + private + def process_events - while(!@events_queue.empty?) do - event = @events_queue.pop - configuration.event_handlers.each do |handler| - begin - handler.handle_message event - rescue - raise PublishEventError.new(handler.class, event) + # only process events at the highest level + return if @mutex.locked? + + @mutex.synchronize do + while(!@events_queue.empty?) do + event = @events_queue.pop + configuration.event_handlers.each do |handler| + begin + handler.handle_message event + rescue + raise PublishEventError.new(handler.class, event) + end end end end end + + def configuration + Sequent.configuration + end end end end From 1de7799143765e89edf084b1d9d082749350aba1 Mon Sep 17 00:00:00 2001 From: Derek Kraan Date: Mon, 30 Oct 2017 15:00:33 +0100 Subject: [PATCH 03/10] Synchronize events within a single thread --- lib/sequent/core/event_publisher.rb | 39 ++++++++++++++++++++--------- 1 file changed, 27 insertions(+), 12 deletions(-) diff --git a/lib/sequent/core/event_publisher.rb b/lib/sequent/core/event_publisher.rb index b9c09893..d2581e1d 100644 --- a/lib/sequent/core/event_publisher.rb +++ b/lib/sequent/core/event_publisher.rb @@ -1,6 +1,13 @@ module Sequent module Core class EventPublisher + # + # EventPublisher ensures that, for every thread, events will be published in the order in which they are meant to be published. + # + # This potentially introduces a wrinkle into your plans: You therefore should not split a "unit of work" across multiple threads. + # + # If you do not want this, you are free to implement your own version of EventPublisher and configure sequent to use it. + # class PublishEventError < RuntimeError attr_reader :event_handler_class, :event @@ -14,26 +21,34 @@ def message end end - def initialize - @events_queue = Queue.new - @mutex = Mutex.new - end - def publish_events(events) return if configuration.disable_event_handlers - events.each { |event| @events_queue.push(event) } + events.each { |event| events_queue.push(event) } process_events end private - def process_events - # only process events at the highest level - return if @mutex.locked? + def events_queue + Thread.current[:events_queue] ||= Queue.new + end + + def skip_if_locked(&block) + Thread.current[:events_queue_locked] = false if Thread.current[:events_queue_locked].nil? - @mutex.synchronize do - while(!@events_queue.empty?) do - event = @events_queue.pop + return if Thread.current[:events_queue_locked] + + Thread.current[:events_queue_locked] = true + + block.yield + ensure + Thread.current[:events_queue_locked] = false + end + + def process_events + skip_if_locked do + while(!events_queue.empty?) do + event = events_queue.pop configuration.event_handlers.each do |handler| begin handler.handle_message event From 331dc5efef81fd47532493139a25e61abbb6f31a Mon Sep 17 00:00:00 2001 From: Derek Kraan Date: Mon, 30 Oct 2017 15:14:09 +0100 Subject: [PATCH 04/10] Make name more descriptive --- lib/sequent/core/event_publisher.rb | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/sequent/core/event_publisher.rb b/lib/sequent/core/event_publisher.rb index d2581e1d..5fd249a3 100644 --- a/lib/sequent/core/event_publisher.rb +++ b/lib/sequent/core/event_publisher.rb @@ -33,7 +33,7 @@ def events_queue Thread.current[:events_queue] ||= Queue.new end - def skip_if_locked(&block) + def skip_if_already_processing(&block) Thread.current[:events_queue_locked] = false if Thread.current[:events_queue_locked].nil? return if Thread.current[:events_queue_locked] @@ -46,7 +46,7 @@ def skip_if_locked(&block) end def process_events - skip_if_locked do + skip_if_already_processing do while(!events_queue.empty?) do event = events_queue.pop configuration.event_handlers.each do |handler| From 959fe1f6ae72306c2e8dce983a75060e05f26868 Mon Sep 17 00:00:00 2001 From: Derek Kraan Date: Mon, 30 Oct 2017 15:15:03 +0100 Subject: [PATCH 05/10] Comment on the right level --- lib/sequent/core/event_publisher.rb | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/lib/sequent/core/event_publisher.rb b/lib/sequent/core/event_publisher.rb index 5fd249a3..4aae443d 100644 --- a/lib/sequent/core/event_publisher.rb +++ b/lib/sequent/core/event_publisher.rb @@ -1,13 +1,13 @@ module Sequent module Core + # + # EventPublisher ensures that, for every thread, events will be published in the order in which they are meant to be published. + # + # This potentially introduces a wrinkle into your plans: You therefore should not split a "unit of work" across multiple threads. + # + # If you do not want this, you are free to implement your own version of EventPublisher and configure sequent to use it. + # class EventPublisher - # - # EventPublisher ensures that, for every thread, events will be published in the order in which they are meant to be published. - # - # This potentially introduces a wrinkle into your plans: You therefore should not split a "unit of work" across multiple threads. - # - # If you do not want this, you are free to implement your own version of EventPublisher and configure sequent to use it. - # class PublishEventError < RuntimeError attr_reader :event_handler_class, :event From c3d4d6d95460e128c0c6fe98fc8fb50227d11460 Mon Sep 17 00:00:00 2001 From: Derek Kraan Date: Mon, 30 Oct 2017 15:31:40 +0100 Subject: [PATCH 06/10] Clear queue if there is a problem --- lib/sequent/core/event_publisher.rb | 1 + 1 file changed, 1 insertion(+) diff --git a/lib/sequent/core/event_publisher.rb b/lib/sequent/core/event_publisher.rb index 4aae443d..10ef8103 100644 --- a/lib/sequent/core/event_publisher.rb +++ b/lib/sequent/core/event_publisher.rb @@ -53,6 +53,7 @@ def process_events begin handler.handle_message event rescue + events_queue.clear raise PublishEventError.new(handler.class, event) end end From 4b3c5cb6c5b937d50830490678bf5a96bf18f3af Mon Sep 17 00:00:00 2001 From: Derek Kraan Date: Mon, 30 Oct 2017 17:11:03 +0100 Subject: [PATCH 07/10] Process additional commands after the current command has finished --- lib/sequent/core/command_service.rb | 42 ++++++++++++------- lib/sequent/core/event_publisher.rb | 14 +------ lib/sequent/sequent.rb | 1 + .../util/skip_if_already_processing.rb | 15 +++++++ lib/sequent/util/util.rb | 1 + 5 files changed, 46 insertions(+), 27 deletions(-) create mode 100644 lib/sequent/util/skip_if_already_processing.rb create mode 100644 lib/sequent/util/util.rb diff --git a/lib/sequent/core/command_service.rb b/lib/sequent/core/command_service.rb index 90e6af26..05d020b6 100644 --- a/lib/sequent/core/command_service.rb +++ b/lib/sequent/core/command_service.rb @@ -34,21 +34,10 @@ def initialize(configuration) # * If the command is valid all +command_handlers+ that +handles_message?+ is invoked # * The +repository+ commits the command and all uncommitted_events resulting from the command def execute_commands(*commands) - begin - transaction_provider.transactional do - commands.each do |command| - filters.each { |filter| filter.execute(command) } - - raise CommandNotValid.new(command) unless command.valid? - parsed_command = command.parse_attrs_to_correct_types - command_handlers.select { |h| h.class.handles_message?(parsed_command) }.each { |h| h.handle_message parsed_command } - repository.commit(parsed_command) - end - end - ensure - repository.clear + commands.each do |command| + command_queue.push(command) end - + process_commands end def remove_event_handler(clazz) @@ -57,6 +46,31 @@ def remove_event_handler(clazz) private + def process_commands + Sequent::Util.skip_if_already_processing(:command_service_process_commands) do + begin + transaction_provider.transactional do + while(!command_queue.empty?) do + command = command_queue.pop + filters.each { |filter| filter.execute(command) } + + raise CommandNotValid.new(command) unless command.valid? + parsed_command = command.parse_attrs_to_correct_types + command_handlers.select { |h| h.class.handles_message?(parsed_command) }.each { |h| h.handle_message parsed_command } + repository.commit(parsed_command) + end + end + ensure + command_queue.clear + repository.clear + end + end + end + + def command_queue + Thread.current[:command_service_commands] ||= Queue.new + end + def event_store configuration.event_store end diff --git a/lib/sequent/core/event_publisher.rb b/lib/sequent/core/event_publisher.rb index 10ef8103..f93a5adc 100644 --- a/lib/sequent/core/event_publisher.rb +++ b/lib/sequent/core/event_publisher.rb @@ -33,20 +33,8 @@ def events_queue Thread.current[:events_queue] ||= Queue.new end - def skip_if_already_processing(&block) - Thread.current[:events_queue_locked] = false if Thread.current[:events_queue_locked].nil? - - return if Thread.current[:events_queue_locked] - - Thread.current[:events_queue_locked] = true - - block.yield - ensure - Thread.current[:events_queue_locked] = false - end - def process_events - skip_if_already_processing do + Sequent::Util.skip_if_already_processing(:events_queue_lock) do while(!events_queue.empty?) do event = events_queue.pop configuration.event_handlers.each do |handler| diff --git a/lib/sequent/sequent.rb b/lib/sequent/sequent.rb index 437ea9a4..b0b37658 100644 --- a/lib/sequent/sequent.rb +++ b/lib/sequent/sequent.rb @@ -1,4 +1,5 @@ require_relative 'core/core' +require_relative 'util/util' require_relative 'migrations/migrations' require_relative 'configuration' diff --git a/lib/sequent/util/skip_if_already_processing.rb b/lib/sequent/util/skip_if_already_processing.rb new file mode 100644 index 00000000..7ea685dd --- /dev/null +++ b/lib/sequent/util/skip_if_already_processing.rb @@ -0,0 +1,15 @@ +module Sequent + module Util + def self.skip_if_already_processing(already_processing_key, &block) + return if Thread.current[already_processing_key] + + begin + Thread.current[already_processing_key] = true + + block.yield + ensure + Thread.current[already_processing_key] = nil + end + end + end +end diff --git a/lib/sequent/util/util.rb b/lib/sequent/util/util.rb new file mode 100644 index 00000000..214b0326 --- /dev/null +++ b/lib/sequent/util/util.rb @@ -0,0 +1 @@ +require_relative 'skip_if_already_processing' From 4e2453e9b8d5adb277b1102371b548b8b15e53e3 Mon Sep 17 00:00:00 2001 From: Derek Kraan Date: Wed, 1 Nov 2017 09:36:20 +0100 Subject: [PATCH 08/10] Cosmetic changes --- lib/sequent/core/command_service.rb | 4 +--- lib/sequent/core/event_publisher.rb | 4 ++-- 2 files changed, 3 insertions(+), 5 deletions(-) diff --git a/lib/sequent/core/command_service.rb b/lib/sequent/core/command_service.rb index 05d020b6..a6dc246a 100644 --- a/lib/sequent/core/command_service.rb +++ b/lib/sequent/core/command_service.rb @@ -34,9 +34,7 @@ def initialize(configuration) # * If the command is valid all +command_handlers+ that +handles_message?+ is invoked # * The +repository+ commits the command and all uncommitted_events resulting from the command def execute_commands(*commands) - commands.each do |command| - command_queue.push(command) - end + commands.each { |command| command_queue.push(command) } process_commands end diff --git a/lib/sequent/core/event_publisher.rb b/lib/sequent/core/event_publisher.rb index f93a5adc..aad56259 100644 --- a/lib/sequent/core/event_publisher.rb +++ b/lib/sequent/core/event_publisher.rb @@ -1,11 +1,11 @@ module Sequent module Core # - # EventPublisher ensures that, for every thread, events will be published in the order in which they are meant to be published. + # EventPublisher ensures that, for every thread, events will be published in the order in which they are queued for publishing. # # This potentially introduces a wrinkle into your plans: You therefore should not split a "unit of work" across multiple threads. # - # If you do not want this, you are free to implement your own version of EventPublisher and configure sequent to use it. + # If you want other behaviour, you are free to implement your own version of EventPublisher and configure Sequent to use it. # class EventPublisher class PublishEventError < RuntimeError From ed7d206de5b81060f2ede0427fdb429d282a0a72 Mon Sep 17 00:00:00 2001 From: Derek Kraan Date: Wed, 1 Nov 2017 09:59:00 +0100 Subject: [PATCH 09/10] efactor code to make it easier to extract functionality at a later date --- lib/sequent/core/command_service.rb | 17 ++++++++++------- lib/sequent/core/event_publisher.rb | 24 +++++++++++++++--------- 2 files changed, 25 insertions(+), 16 deletions(-) diff --git a/lib/sequent/core/command_service.rb b/lib/sequent/core/command_service.rb index a6dc246a..d05015a4 100644 --- a/lib/sequent/core/command_service.rb +++ b/lib/sequent/core/command_service.rb @@ -49,13 +49,7 @@ def process_commands begin transaction_provider.transactional do while(!command_queue.empty?) do - command = command_queue.pop - filters.each { |filter| filter.execute(command) } - - raise CommandNotValid.new(command) unless command.valid? - parsed_command = command.parse_attrs_to_correct_types - command_handlers.select { |h| h.class.handles_message?(parsed_command) }.each { |h| h.handle_message parsed_command } - repository.commit(parsed_command) + process_command(command_queue.pop) end end ensure @@ -65,6 +59,15 @@ def process_commands end end + def process_command(command) + filters.each { |filter| filter.execute(command) } + + raise CommandNotValid.new(command) unless command.valid? + parsed_command = command.parse_attrs_to_correct_types + command_handlers.select { |h| h.class.handles_message?(parsed_command) }.each { |h| h.handle_message parsed_command } + repository.commit(parsed_command) + end + def command_queue Thread.current[:command_service_commands] ||= Queue.new end diff --git a/lib/sequent/core/event_publisher.rb b/lib/sequent/core/event_publisher.rb index aad56259..3607db73 100644 --- a/lib/sequent/core/event_publisher.rb +++ b/lib/sequent/core/event_publisher.rb @@ -35,16 +35,22 @@ def events_queue def process_events Sequent::Util.skip_if_already_processing(:events_queue_lock) do - while(!events_queue.empty?) do - event = events_queue.pop - configuration.event_handlers.each do |handler| - begin - handler.handle_message event - rescue - events_queue.clear - raise PublishEventError.new(handler.class, event) - end + begin + while(!events_queue.empty?) do + process_event(events_queue.pop) end + ensure + events_queue.clear + end + end + end + + def process_event(event) + configuration.event_handlers.each do |handler| + begin + handler.handle_message event + rescue + raise PublishEventError.new(handler.class, event) end end end From 1d55cc48ec88d5a870e034e8af0732f84f718148 Mon Sep 17 00:00:00 2001 From: Derek Kraan Date: Wed, 1 Nov 2017 09:59:05 +0100 Subject: [PATCH 10/10] Require 'tmpdir' --- spec/lib/sequent/support/database_spec.rb | 1 + 1 file changed, 1 insertion(+) diff --git a/spec/lib/sequent/support/database_spec.rb b/spec/lib/sequent/support/database_spec.rb index 03e20db8..9799f668 100644 --- a/spec/lib/sequent/support/database_spec.rb +++ b/spec/lib/sequent/support/database_spec.rb @@ -1,4 +1,5 @@ require 'spec_helper' +require 'tmpdir' require 'sequent/support'