Skip to content

Commit

Permalink
Merge pull request #284 from sartography/feature/improved-timer-events
Browse files Browse the repository at this point in the history
Feature/improved timer events
  • Loading branch information
essweine authored Jan 20, 2023
2 parents 450ef3b + dc8d139 commit 7378639
Show file tree
Hide file tree
Showing 34 changed files with 1,455 additions and 704 deletions.
31 changes: 17 additions & 14 deletions SpiffWorkflow/bpmn/parser/event_parsers.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,19 @@
from .ValidationException import ValidationException
from .TaskParser import TaskParser
from .util import first, one
from ..specs.events.event_definitions import (MultipleEventDefinition, TimerEventDefinition, MessageEventDefinition,
ErrorEventDefinition, EscalationEventDefinition,
SignalEventDefinition,
CancelEventDefinition, CycleTimerEventDefinition,
TerminateEventDefinition, NoneEventDefinition)
from ..specs.events.event_definitions import (
MultipleEventDefinition,
TimeDateEventDefinition,
DurationTimerEventDefinition,
CycleTimerEventDefinition,
MessageEventDefinition,
ErrorEventDefinition,
EscalationEventDefinition,
SignalEventDefinition,
CancelEventDefinition,
TerminateEventDefinition,
NoneEventDefinition
)


CAMUNDA_MODEL_NS = 'http://camunda.org/schema/1.0/bpmn'
Expand Down Expand Up @@ -81,18 +89,16 @@ def parse_timer_event(self):
"""Parse the timerEventDefinition node and return an instance of TimerEventDefinition."""

try:
label = self.node.get('name', self.node.get('id'))
name = self.node.get('name', self.node.get('id'))
time_date = first(self.xpath('.//bpmn:timeDate'))
if time_date is not None:
return TimerEventDefinition(label, time_date.text)

return TimeDateEventDefinition(name, time_date.text)
time_duration = first(self.xpath('.//bpmn:timeDuration'))
if time_duration is not None:
return TimerEventDefinition(label, time_duration.text)

return DurationTimerEventDefinition(name, time_duration.text)
time_cycle = first(self.xpath('.//bpmn:timeCycle'))
if time_cycle is not None:
return CycleTimerEventDefinition(label, time_cycle.text)
return CycleTimerEventDefinition(name, time_cycle.text)
raise ValidationException("Unknown Time Specification", node=self.node, file_name=self.filename)
except Exception as e:
raise ValidationException("Time Specification Error. " + str(e), node=self.node, file_name=self.filename)
Expand Down Expand Up @@ -170,9 +176,6 @@ def create_task(self):
event_definition = self.get_event_definition([MESSAGE_EVENT_XPATH, SIGNAL_EVENT_XPATH, TIMER_EVENT_XPATH])
task = self._create_task(event_definition)
self.spec.start.connect(task)
if isinstance(event_definition, CycleTimerEventDefinition):
# We are misusing cycle timers, so this is a hack whereby we will revisit ourselves if we fire.
task.connect(task)
return task

def handles_multiple_outgoing(self):
Expand Down
44 changes: 31 additions & 13 deletions SpiffWorkflow/bpmn/serializer/bpmn_converters.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,21 @@

from .dictionary import DictionaryConverter

from ..specs.events.event_definitions import MultipleEventDefinition, SignalEventDefinition, MessageEventDefinition, NoneEventDefinition
from ..specs.events.event_definitions import TimerEventDefinition, CycleTimerEventDefinition, TerminateEventDefinition
from ..specs.events.event_definitions import ErrorEventDefinition, EscalationEventDefinition, CancelEventDefinition
from ..specs.events.event_definitions import CorrelationProperty, NamedEventDefinition
from ..specs.events.event_definitions import (
NoneEventDefinition,
MultipleEventDefinition,
SignalEventDefinition,
MessageEventDefinition,
CorrelationProperty,
TimeDateEventDefinition,
DurationTimerEventDefinition,
CycleTimerEventDefinition,
ErrorEventDefinition,
EscalationEventDefinition,
CancelEventDefinition,
TerminateEventDefinition,
NamedEventDefinition
)

from ..specs.BpmnSpecMixin import BpmnSpecMixin
from ...operators import Attrib, PathAttrib
Expand Down Expand Up @@ -89,9 +100,19 @@ def __init__(self, spec_class, data_converter, typename=None):
self.data_converter = data_converter
self.typename = typename if typename is not None else spec_class.__name__

event_definitions = [ NoneEventDefinition, CancelEventDefinition, TerminateEventDefinition,
SignalEventDefinition, MessageEventDefinition, ErrorEventDefinition, EscalationEventDefinition,
TimerEventDefinition, CycleTimerEventDefinition , MultipleEventDefinition]
event_definitions = [
NoneEventDefinition,
CancelEventDefinition,
TerminateEventDefinition,
SignalEventDefinition,
MessageEventDefinition,
ErrorEventDefinition,
EscalationEventDefinition,
TimeDateEventDefinition,
DurationTimerEventDefinition,
CycleTimerEventDefinition,
MultipleEventDefinition
]

for event_definition in event_definitions:
self.register(
Expand Down Expand Up @@ -238,12 +259,9 @@ def event_definition_to_dict(self, event_definition):
dct['name'] = event_definition.name
if isinstance(event_definition, MessageEventDefinition):
dct['correlation_properties'] = [prop.__dict__ for prop in event_definition.correlation_properties]
if isinstance(event_definition, TimerEventDefinition):
dct['label'] = event_definition.label
dct['dateTime'] = event_definition.dateTime
if isinstance(event_definition, CycleTimerEventDefinition):
dct['label'] = event_definition.label
dct['cycle_definition'] = event_definition.cycle_definition
if isinstance(event_definition, (TimeDateEventDefinition, DurationTimerEventDefinition, CycleTimerEventDefinition)):
dct['name'] = event_definition.name
dct['expression'] = event_definition.expression
if isinstance(event_definition, ErrorEventDefinition):
dct['error_code'] = event_definition.error_code
if isinstance(event_definition, EscalationEventDefinition):
Expand Down
90 changes: 89 additions & 1 deletion SpiffWorkflow/bpmn/serializer/version_migration.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,89 @@
from copy import deepcopy
from datetime import datetime, timedelta

from SpiffWorkflow.exceptions import WorkflowException
from SpiffWorkflow.task import TaskState
from SpiffWorkflow.bpmn.specs.events.event_definitions import LOCALTZ

class VersionMigrationError(WorkflowException):
pass

def version_1_1_to_1_2(old):
"""
Upgrade v1.1 serialization to v1.2.
Expressions in timer event definitions have been converted from python expressions to
ISO 8601 expressions.
Cycle timers no longer connect back to themselves. New children are created from a single
tasks rather than reusing previously executed tasks.
"""
new = deepcopy(old)

def td_to_iso(td):
total = td.total_seconds()
v1, seconds = total // 60, total % 60
v2, minutes = v1 // 60, v1 % 60
days, hours = v2 // 24, v2 % 60
return f"P{days:.0f}DT{hours:.0f}H{minutes:.0f}M{seconds}S"

message = "Unable to convert time specifications for {spec}. This most likely because the values are set during workflow execution."

has_timer = lambda ts: 'event_definition' in ts and ts['event_definition']['typename'] in [ 'CycleTimerEventDefinition', 'TimerEventDefinition']
for spec in [ ts for ts in new['spec']['task_specs'].values() if has_timer(ts) ]:
spec['event_definition']['name'] = spec['event_definition'].pop('label')
if spec['event_definition']['typename'] == 'TimerEventDefinition':
expr = spec['event_definition'].pop('dateTime')
try:
dt = eval(expr)
if isinstance(dt, datetime):
spec['event_definition']['expression'] = f"'{dt.isoformat()}'"
spec['event_definition']['typename'] = 'TimeDateEventDefinition'
elif isinstance(dt, timedelta):
spec['event_definition']['expression'] = f"'{td_to_iso(dt)}'"
spec['event_definition']['typename'] = 'DurationTimerEventDefinition'
except:
raise VersionMigrationError(message.format(spec=spec['name']))

if spec['event_definition']['typename'] == 'CycleTimerEventDefinition':

tasks = [ t for t in new['tasks'].values() if t['task_spec'] == spec['name'] ]
task = tasks[0] if len(tasks) > 0 else None

expr = spec['event_definition'].pop('cycle_definition')
try:
repeat, duration = eval(expr)
spec['event_definition']['expression'] = f"'R{repeat}/{td_to_iso(duration)}'"
if task is not None:
cycles_complete = task['data'].pop('repeat_count', 0)
start_time = task['internal_data'].pop('start_time', None)
if start_time is not None:
dt = datetime.fromisoformat(start_time)
task['internal_data']['event_value'] = {
'cycles': repeat - cycles_complete,
'next': datetime.combine(dt.date(), dt.time(), LOCALTZ).isoformat(),
'duration': duration.total_seconds(),
}
except:
raise VersionMigrationError(message.format(spec=spec['name']))

if spec['typename'] == 'StartEvent':
spec['outputs'].remove(spec['name'])
if task is not None:
children = [ new['tasks'][c] for c in task['children'] ]
# Formerly cycles were handled by looping back and reusing the tasks so this removes the extra tasks
remove = [ c for c in children if c['task_spec'] == task['task_spec']][0]
for task_id in remove['children']:
child = new['tasks'][task_id]
if child['task_spec'].startswith('return') or child['state'] != TaskState.COMPLETED:
new['tasks'].pop(task_id)
else:
task['children'].append(task_id)
task['children'].remove(remove['id'])
new['tasks'].pop(remove['id'])

new['VERSION'] = "1.2"
return new

def version_1_0_to_1_1(old):
"""
Expand Down Expand Up @@ -47,8 +132,11 @@ def version_1_0_to_1_1(old):
task['children'] = [ c for c in task['children'] if c in sp['tasks'] ]

new['subprocesses'] = subprocesses
return new
new['VERSION'] = "1.1"
return version_1_1_to_1_2(new)


MIGRATIONS = {
'1.0': version_1_0_to_1_1,
'1.1': version_1_1_to_1_2,
}
2 changes: 1 addition & 1 deletion SpiffWorkflow/bpmn/serializer/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ class BpmnWorkflowSerializer:

# This is the default version set on the workflow, it can be overwritten
# using the configure_workflow_spec_converter.
VERSION = "1.1"
VERSION = "1.2"
VERSION_KEY = "serializer_version"
DEFAULT_JSON_ENCODER_CLS = None
DEFAULT_JSON_DECODER_CLS = None
Expand Down
10 changes: 1 addition & 9 deletions SpiffWorkflow/bpmn/specs/events/IntermediateEvent.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,12 +74,10 @@ def _on_ready_hook(self, my_task):
for child in my_task.children:
if isinstance(child.task_spec, BoundaryEvent):
child.task_spec.event_definition.reset(child)
child._set_state(TaskState.WAITING)

def _child_complete_hook(self, child_task):

# If the main child completes, or a cancelling event occurs, cancel any
# unfinished children
# If the main child completes, or a cancelling event occurs, cancel any unfinished children
if child_task.task_spec == self.main_child_task_spec or child_task.task_spec.cancel_activity:
for sibling in child_task.parent.children:
if sibling == child_task:
Expand All @@ -89,11 +87,6 @@ def _child_complete_hook(self, child_task):
for t in child_task.workflow._get_waiting_tasks():
t.task_spec._update(t)

# If our event is a cycle timer, we need to set it back to waiting so it can fire again
elif isinstance(child_task.task_spec.event_definition, CycleTimerEventDefinition):
child_task._set_state(TaskState.WAITING)
child_task.task_spec._update_hook(child_task)

def _predict_hook(self, my_task):

# Events attached to the main task might occur
Expand All @@ -105,7 +98,6 @@ def _predict_hook(self, my_task):
child._set_state(state)



class BoundaryEvent(CatchingEvent):
"""Task Spec for a bpmn:boundaryEvent node."""

Expand Down
1 change: 1 addition & 0 deletions SpiffWorkflow/bpmn/specs/events/StartEvent.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,3 +40,4 @@ def catch(self, my_task, event_definition):
my_task._set_state(TaskState.WAITING)

super(StartEvent, self).catch(my_task, event_definition)

Loading

0 comments on commit 7378639

Please sign in to comment.