Skip to content

Commit

Permalink
Refactor code to make it easier to extract functionality at a later date
Browse files Browse the repository at this point in the history
  • Loading branch information
derekkraan committed Nov 1, 2017
1 parent 4e2453e commit bcc7b89
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 16 deletions.
17 changes: 10 additions & 7 deletions lib/sequent/core/command_service.rb
Original file line number Diff line number Diff line change
Expand Up @@ -49,13 +49,7 @@ def process_commands
begin
transaction_provider.transactional do
while(!command_queue.empty?) do
command = command_queue.pop
filters.each { |filter| filter.execute(command) }

raise CommandNotValid.new(command) unless command.valid?
parsed_command = command.parse_attrs_to_correct_types
command_handlers.select { |h| h.class.handles_message?(parsed_command) }.each { |h| h.handle_message parsed_command }
repository.commit(parsed_command)
process_command(command_queue.pop)
end
end
ensure
Expand All @@ -65,6 +59,15 @@ def process_commands
end
end

def process_command(command)
filters.each { |filter| filter.execute(command) }

raise CommandNotValid.new(command) unless command.valid?
parsed_command = command.parse_attrs_to_correct_types
command_handlers.select { |h| h.class.handles_message?(parsed_command) }.each { |h| h.handle_message parsed_command }
repository.commit(parsed_command)
end

def command_queue
Thread.current[:command_service_commands] ||= Queue.new
end
Expand Down
20 changes: 11 additions & 9 deletions lib/sequent/core/event_publisher.rb
Original file line number Diff line number Diff line change
Expand Up @@ -35,20 +35,22 @@ def events_queue

def process_events
Sequent::Util.skip_if_already_processing(:events_queue_lock) do
while(!events_queue.empty?) do
event = events_queue.pop
configuration.event_handlers.each do |handler|
begin
handler.handle_message event
rescue
events_queue.clear
raise PublishEventError.new(handler.class, event)
end
begin
while(!events_queue.empty?) do
process_event(events_queue.pop)
end
ensure
events_queue.clear
end
end
end

def process_event(event)
configuration.event_handlers.each { |handler| handler.handle_message event }
rescue
raise PublishEventError.new(handler.class, event)
end

def configuration
Sequent.configuration
end
Expand Down

0 comments on commit bcc7b89

Please sign in to comment.