Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add queue-based event pub to make sure events are published in order #97

Merged
merged 10 commits into from
Nov 2, 2017
4 changes: 3 additions & 1 deletion lib/sequent/configuration.rb
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ class Configuration
:event_record_class,
:stream_record_class,
:snapshot_event_class,
:transaction_provider
:transaction_provider,
:event_publisher

attr_accessor :command_handlers,
:command_filters
Expand Down Expand Up @@ -47,6 +48,7 @@ def initialize
self.snapshot_event_class = Sequent::Core::SnapshotEvent
self.transaction_provider = Sequent::Core::Transactions::NoTransactions.new
self.uuid_generator = Sequent::Core::RandomUuidGenerator
self.event_publisher = Sequent::Core::EventPublisher.new
self.disable_event_handlers = false
end

Expand Down
45 changes: 30 additions & 15 deletions lib/sequent/core/command_service.rb
Original file line number Diff line number Diff line change
Expand Up @@ -34,21 +34,8 @@ def initialize(configuration)
# * If the command is valid all +command_handlers+ that +handles_message?+ is invoked
# * The +repository+ commits the command and all uncommitted_events resulting from the command
def execute_commands(*commands)
begin
transaction_provider.transactional do
commands.each do |command|
filters.each { |filter| filter.execute(command) }

raise CommandNotValid.new(command) unless command.valid?
parsed_command = command.parse_attrs_to_correct_types
command_handlers.select { |h| h.class.handles_message?(parsed_command) }.each { |h| h.handle_message parsed_command }
repository.commit(parsed_command)
end
end
ensure
repository.clear
end

commands.each { |command| command_queue.push(command) }
process_commands
end

def remove_event_handler(clazz)
Expand All @@ -57,6 +44,34 @@ def remove_event_handler(clazz)

private

def process_commands
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we extract this to a separate class like the EventPublisher. Both are quite similar in intent and behavior. Same suggestion made for the EventPublisher also apply here then :-)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

CommandService is a separate class like EventPublisher already.

Sequent::Util.skip_if_already_processing(:command_service_process_commands) do
begin
transaction_provider.transactional do
while(!command_queue.empty?) do
process_command(command_queue.pop)
end
end
ensure
command_queue.clear
repository.clear
end
end
end

def process_command(command)
filters.each { |filter| filter.execute(command) }

raise CommandNotValid.new(command) unless command.valid?
parsed_command = command.parse_attrs_to_correct_types
command_handlers.select { |h| h.class.handles_message?(parsed_command) }.each { |h| h.handle_message parsed_command }
repository.commit(parsed_command)
end

def command_queue
Thread.current[:command_service_commands] ||= Queue.new
end

def event_store
configuration.event_store
end
Expand Down
1 change: 1 addition & 0 deletions lib/sequent/core/core.rb
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,4 @@
require_relative 'aggregate_snapshotter'
require_relative 'workflow'
require_relative 'random_uuid_generator'
require_relative 'event_publisher'
63 changes: 63 additions & 0 deletions lib/sequent/core/event_publisher.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
module Sequent
module Core
#
# EventPublisher ensures that, for every thread, events will be published in the order in which they are queued for publishing.
#
# This potentially introduces a wrinkle into your plans: You therefore should not split a "unit of work" across multiple threads.
#
# If you want other behaviour, you are free to implement your own version of EventPublisher and configure Sequent to use it.
#
class EventPublisher
class PublishEventError < RuntimeError
attr_reader :event_handler_class, :event

def initialize(event_handler_class, event)
@event_handler_class = event_handler_class
@event = event
end

def message
"Event Handler: #{@event_handler_class.inspect}\nEvent: #{@event.inspect}\nCause: #{cause.inspect}"
end
end

def publish_events(events)
return if configuration.disable_event_handlers
events.each { |event| events_queue.push(event) }
process_events
end

private

def events_queue
Thread.current[:events_queue] ||= Queue.new
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Queue only supports FIFO I think. We can safely use Array here to support both FIFO and LIFO.

Copy link
Contributor Author

@derekkraan derekkraan Nov 1, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should hold off on implementing something like this until someone asks for it / needs it.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We are changing the current behavior so I think it is good to support both cases and the current one as default, with maybe a warning message.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Neither of those scenarios is the current one.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes you are right... Okay, let's communicate this in the changelog when we release.

end

def process_events
Sequent::Util.skip_if_already_processing(:events_queue_lock) do
begin
while(!events_queue.empty?) do
process_event(events_queue.pop)
end
ensure
events_queue.clear
end
end
end

def process_event(event)
configuration.event_handlers.each do |handler|
begin
handler.handle_message event
rescue
raise PublishEventError.new(handler.class, event)
end
end
end

def configuration
Sequent.configuration
end
end
end
end
34 changes: 6 additions & 28 deletions lib/sequent/core/event_store.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,19 +9,6 @@ class EventStore
include ActiveRecord::ConnectionAdapters::Quoting
extend Forwardable

class PublishEventError < RuntimeError
attr_reader :event_handler_class, :event

def initialize(event_handler_class, event)
@event_handler_class = event_handler_class
@event = event
end

def message
"Event Handler: #{@event_handler_class.inspect}\nEvent: #{@event.inspect}\nCause: #{cause.inspect}"
end
end

class OptimisticLockingError < RuntimeError
end

Expand All @@ -39,7 +26,7 @@ def message
end

attr_accessor :configuration
def_delegators :@configuration, :stream_record_class, :event_record_class, :snapshot_event_class, :event_handlers
def_delegators :@configuration, :stream_record_class, :event_record_class, :snapshot_event_class

def initialize(configuration = Sequent.configuration)
self.configuration = configuration
Expand All @@ -55,7 +42,7 @@ def initialize(configuration = Sequent.configuration)
#
def commit_events(command, streams_with_events)
store_events(command, streams_with_events)
publish_events(streams_with_events.flat_map { |_, events| events }, event_handlers)
publish_events(streams_with_events.flat_map { |_, events| events })
end

##
Expand Down Expand Up @@ -105,7 +92,7 @@ def stream_exists?(aggregate_id)
def replay_events
warn "[DEPRECATION] `replay_events` is deprecated in favor of `replay_events_from_cursor`"
events = yield.map { |event_hash| deserialize_event(event_hash) }
publish_events(events, event_handlers)
publish_events(events)
end

##
Expand All @@ -123,7 +110,7 @@ def replay_events_from_cursor(block_size: 2000,
ids_replayed = []
cursor.each_row(block_size: block_size).each do |record|
event = deserialize_event(record)
publish_events([event], event_handlers)
publish_events([event])
progress += 1
ids_replayed << record['id']
if progress % block_size == 0
Expand Down Expand Up @@ -190,17 +177,8 @@ def resolve_event_type(event_type)
@event_types.fetch_or_store(event_type) { |k| Class.const_get(k) }
end

def publish_events(events, event_handlers)
return if configuration.disable_event_handlers
event_handlers.each do |handler|
events.each do |event|
begin
handler.handle_message event
rescue
raise PublishEventError.new(handler.class, event)
end
end
end
def publish_events(events)
configuration.event_publisher.publish_events(events)
end

def store_events(command, streams_with_events = [])
Expand Down
1 change: 1 addition & 0 deletions lib/sequent/sequent.rb
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
require_relative 'core/core'
require_relative 'util/util'
require_relative 'migrations/migrations'
require_relative 'configuration'

Expand Down
15 changes: 15 additions & 0 deletions lib/sequent/util/skip_if_already_processing.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
module Sequent
module Util
def self.skip_if_already_processing(already_processing_key, &block)
return if Thread.current[already_processing_key]

begin
Thread.current[already_processing_key] = true

block.yield
ensure
Thread.current[already_processing_key] = nil
end
end
end
end
1 change: 1 addition & 0 deletions lib/sequent/util/util.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
require_relative 'skip_if_already_processing'
69 changes: 69 additions & 0 deletions spec/lib/sequent/core/event_publisher_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
require 'spec_helper'

describe Sequent::Core::EventPublisher do
class OtherAggregateTriggered < Sequent::Core::Event
attrs other_aggregate_id: String
end
class EventAdded < Sequent::Core::Event; end
class TriggerTestCase < Sequent::Core::Command; end
class TriggerOtherAggregate < Sequent::Core::Command; end

class TestAggregate < Sequent::Core::AggregateRoot
def trigger_other_aggregate(aggregate_id)
apply OtherAggregateTriggered, other_aggregate_id: aggregate_id
end

def add_event
apply EventAdded
end
end

class TestCommandHandler < Sequent::Core::BaseCommandHandler
on TriggerTestCase do |command|
agg1 = TestAggregate.new(aggregate_id: Sequent.new_uuid)
agg2 = TestAggregate.new(aggregate_id: Sequent.new_uuid)

agg1.trigger_other_aggregate(agg2.id)
agg2.add_event

repository.add_aggregate(agg1)
repository.add_aggregate(agg2)
end

on TriggerOtherAggregate do |command|
agg = repository.load_aggregate(command.aggregate_id)
agg.add_event
end
end

class TestWorkflow < Sequent::Core::Workflow
on OtherAggregateTriggered do |event|
execute_commands TriggerOtherAggregate.new(aggregate_id: event.other_aggregate_id)
end
end

class TestEventHandler < Sequent::Core::BaseEventHandler
def initialize(*args)
@sequence_numbers = []
super(*args)
end

attr_reader :sequence_numbers

on EventAdded do |event|
@sequence_numbers << event.sequence_number
end
end

it 'handles events in the proper order' do
Sequent::Configuration.reset
test_event_handler = TestEventHandler.new
Sequent.configuration.event_handlers << TestWorkflow.new
Sequent.configuration.event_handlers << test_event_handler
Sequent.configuration.command_handlers << TestCommandHandler.new

Sequent.command_service.execute_commands TriggerTestCase.new(aggregate_id: Sequent.new_uuid)

expect(test_event_handler.sequence_numbers).to eq [1,2]
end
end
2 changes: 1 addition & 1 deletion spec/lib/sequent/core/event_store_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ class FailingHandler < Sequent::Core::BaseEventHandler
end
end

it { is_expected.to be_a(Sequent::Core::EventStore::PublishEventError) }
it { is_expected.to be_a(Sequent::Core::EventPublisher::PublishEventError) }

it 'preserves its cause' do
expect(publish_error.cause).to be_a(FailingHandler::Error)
Expand Down
1 change: 1 addition & 0 deletions spec/lib/sequent/support/database_spec.rb
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
require 'spec_helper'
require 'tmpdir'

require 'sequent/support'

Expand Down