Merge pull request #393 from sartography/bugfix/data-object-management
Bugfix/data object management
essweine authored Mar 20, 2024
2 parents 32c00e4 + bee5cc6 commit 86cb84d
Showing 21 changed files with 1,738 additions and 39 deletions.
7 changes: 2 additions & 5 deletions SpiffWorkflow/bpmn/parser/ProcessParser.py
@@ -39,7 +39,6 @@ def __init__(self, p, node, nsmap, data_stores, filename=None, lane=None):
         :param node: the XML node for the process
         :param data_stores: map of ids to data store implementations
         :param filename: the source BPMN filename (optional)
-        :param doc_xpath: an xpath evaluator for the document (optional)
         :param lane: the lane of a subprocess (optional)
         """
         super().__init__(node, nsmap, filename=filename, lane=lane)
@@ -48,7 +47,7 @@ def __init__(self, p, node, nsmap, data_stores, filename=None, lane=None):
         self.spec = None
         self.process_executable = node.get('isExecutable', 'true') == 'true'
         self.data_stores = data_stores
-        self.inherited_data_objects = {}
+        self.parent = None

     def get_name(self):
         """
@@ -118,8 +117,6 @@ def _parse(self):
             raise ValidationException(f"Process {self.bpmn_id} is not executable.", node=self.node, file_name=self.filename)
         self.spec = BpmnProcessSpec(name=self.bpmn_id, description=self.get_name(), filename=self.filename)

-        self.spec.data_objects.update(self.inherited_data_objects)
-
         # Get the data objects
         for obj in self.xpath('./bpmn:dataObject'):
             data_object = self.parse_data_object(obj)
@@ -147,7 +144,7 @@ def _parse(self):
             split_task.inputs = [self.spec.start]

     def parse_data_object(self, obj):
-        return DataObject(obj.get('id'), obj.get('name'))
+        return self.create_data_spec(obj, DataObject)

     def get_spec(self):
         """
20 changes: 16 additions & 4 deletions SpiffWorkflow/bpmn/parser/node_parser.py
@@ -82,8 +82,9 @@ def parse_incoming_data_references(self):
         specs = []
         for name in self.xpath('./bpmn:dataInputAssociation/bpmn:sourceRef'):
             ref = first(self.doc_xpath(f".//bpmn:dataObjectReference[@id='{name.text}']"))
-            if ref is not None and ref.get('dataObjectRef') in self.process_parser.spec.data_objects:
-                specs.append(self.process_parser.spec.data_objects[ref.get('dataObjectRef')])
+            data_obj = self._resolve_data_object_ref(ref)
+            if data_obj is not None:
+                specs.append(data_obj)
             else:
                 ref = first(self.doc_xpath(f".//bpmn:dataStoreReference[@id='{name.text}']"))
                 if ref is not None and ref.get('dataStoreRef') in self.process_parser.data_stores:
@@ -96,8 +97,9 @@ def parse_outgoing_data_references(self):
         specs = []
         for name in self.xpath('./bpmn:dataOutputAssociation/bpmn:targetRef'):
             ref = first(self.doc_xpath(f".//bpmn:dataObjectReference[@id='{name.text}']"))
-            if ref is not None and ref.get('dataObjectRef') in self.process_parser.spec.data_objects:
-                specs.append(self.process_parser.spec.data_objects[ref.get('dataObjectRef')])
+            data_obj = self._resolve_data_object_ref(ref)
+            if data_obj is not None:
+                specs.append(data_obj)
             else:
                 ref = first(self.doc_xpath(f".//bpmn:dataStoreReference[@id='{name.text}']"))
                 if ref is not None and ref.get('dataStoreRef') in self.process_parser.data_stores:
@@ -124,6 +126,16 @@ def parse_io_spec(self):
                 outputs.append(data_refs[ref.text])
         return BpmnIoSpecification(inputs, outputs)

+    def _resolve_data_object_ref(self, ref):
+        if ref is not None:
+            current = self.process_parser
+            while current is not None:
+                data_obj = current.spec.data_objects.get(ref.get('dataObjectRef'))
+                if data_obj is None:
+                    current = self.process_parser.parent
+                else:
+                    return data_obj
+
     def create_data_spec(self, item, cls):
         return cls(item.attrib.get('id'), item.attrib.get('name'))
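For orientation: _resolve_data_object_ref first checks the current process parser's spec, then falls back to ancestor parsers via the parent links wired up in task_parsers.py below. That is what lets a task inside a subprocess attach to a data object declared in an enclosing process. A condensed, self-contained sketch of the lookup using stand-in classes (all names below are illustrative, not SpiffWorkflow API):

    # Stand-ins for ProcessParser and its spec; illustrative only.
    class StubSpec:
        def __init__(self, data_objects):
            self.data_objects = data_objects

    class StubParser:
        def __init__(self, data_objects, parent=None):
            self.spec = StubSpec(data_objects)
            self.parent = parent

    top = StubParser({'obj_1': '<DataObject obj_1>'})  # declares obj_1
    sub = StubParser({}, parent=top)                   # subprocess declares nothing

    # The same walk the new helper performs: check the current parser,
    # then climb the parent chain until the object is found.
    current, data_obj = sub, None
    while current is not None and data_obj is None:
        data_obj = current.spec.data_objects.get('obj_1')
        current = current.parent

    assert data_obj == '<DataObject obj_1>'  # resolved on the ancestor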
2 changes: 1 addition & 1 deletion SpiffWorkflow/bpmn/parser/task_parsers.py
@@ -75,7 +75,7 @@ def get_subprocess_spec(task_parser):
         spec_id = task_parser.node.get('id')
         # This parser makes me want to cry
         spec_parser = task_parser.process_parser.parser.process_parsers[spec_id]
-        spec_parser.inherited_data_objects.update(task_parser.process_parser.spec.data_objects)
+        spec_parser.parent = task_parser.process_parser
         return spec_id

     @staticmethod
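This one-line change is the pivot of the refactor: when a call activity is parsed, the callee's parser no longer receives a copy of the caller's data objects; it only records which parser called it, and _resolve_data_object_ref (above) follows that link on demand. In rough dict terms (illustrative values, not the library's API):

    caller_objects = {'obj_1': '<spec>'}

    # Old approach: copy the caller's data objects into the callee,
    # leaving two specs that each appear to own the same object.
    callee_inherited = dict(caller_objects)

    # New approach: record the caller and resolve on demand, so the
    # caller's spec remains the single owner.
    callee = {'parent': {'data_objects': caller_objects}}
    assert callee['parent']['data_objects'].get('obj_1') == '<spec>'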
2 changes: 0 additions & 2 deletions SpiffWorkflow/bpmn/serializer/default/workflow.py
@@ -157,6 +157,4 @@ def subprocesses_from_dict(self, dct, workflow, top_workflow=None):
             sp = self.registry.restore(dct.pop(str(task.id)), task=task, top_workflow=top_workflow)
             top_workflow.subprocesses[task.id] = sp
             sp.completed_event.connect(task.task_spec._on_subworkflow_completed, task)
-            if len(sp.spec.data_objects) > 0:
-                sp.data = task.workflow.data
             self.subprocesses_from_dict(dct, sp, top_workflow)
38 changes: 37 additions & 1 deletion SpiffWorkflow/bpmn/serializer/migration/version_1_3.py
@@ -132,4 +132,40 @@ def add_new_typenames(dct):
     for sp in dct['subprocesses'].values():
         sp['typename'] = 'BpmnSubWorkflow'
         for task in sp['tasks'].values():
-            task['typename'] = 'Task'
+            task['typename'] = 'Task'
+
+def update_data_objects(dct):
+
+    def update_spec(parent):
+        children = []
+        for ts in [ts for ts in parent['task_specs'].values() if 'spec' in ts]:
+            child = dct['subprocess_specs'].get(ts['spec'])
+            children.append((child, ts['typename']))
+            update_spec(child)
+        for child in [c for c, spec_type in children if spec_type != 'CallActivity']:
+            for name in parent['data_objects']:
+                child['data_objects'].pop(name, None)
+
+    data_objects = []
+
+    def update_wf(wf, spec):
+
+        data_objects.extend([v for v in spec.get('data_objects', {}) if v not in data_objects])
+
+        for task in [t for t in wf['tasks'].values() if t['id'] in dct['subprocesses']]:
+            ts = spec['task_specs'][task['task_spec']]
+            sp_spec = dct['subprocess_specs'].get(ts['spec'])
+            sp = dct['subprocesses'].get(task['id'])
+            update_wf(sp, sp_spec)
+
+        if len(spec.get('data_objects', {})) > 0:
+            wf['data']['data_objects'] = {}
+
+        for key in list(wf['data']):
+            if key in spec.get('data_objects', {}):
+                wf['data']['data_objects'][key] = wf['data'].pop(key)
+            elif key in data_objects:
+                del wf['data'][key]
+
+    update_spec(dct['spec'])
+    update_wf(dct, dct['spec'])
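To make the data movement concrete, here is a hedged before/after sketch of update_data_objects applied to a heavily trimmed, hypothetical serialization (every key value below is invented for illustration, and the snippet assumes update_data_objects has been imported from the migration module above):

    before = {
        'spec': {'task_specs': {}, 'data_objects': {'obj_1': {}}},
        'subprocess_specs': {},
        'subprocesses': {},
        'tasks': {},
        'data': {'obj_1': 'hello', 'unrelated': 42},  # data objects mixed into plain data
    }

    update_data_objects(before)

    # Data object values now live in a dedicated sub-dict; ordinary
    # workflow data is untouched.
    assert before['data'] == {'data_objects': {'obj_1': 'hello'}, 'unrelated': 42}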
2 changes: 2 additions & 0 deletions SpiffWorkflow/bpmn/serializer/migration/version_migration.py
@@ -33,6 +33,7 @@
     remove_boundary_event_parent,
     remove_root_task,
     add_new_typenames,
+    update_data_objects,
 )

 def from_version_1_2(dct):
@@ -41,6 +42,7 @@ def from_version_1_2(dct):
     remove_boundary_event_parent(dct)
     remove_root_task(dct)
     add_new_typenames(dct)
+    update_data_objects(dct)


 def from_version_1_1(dct):
2 changes: 1 addition & 1 deletion SpiffWorkflow/bpmn/specs/bpmn_task_spec.py
@@ -12,7 +12,7 @@ def __init__(self, *args):
         super(_BpmnCondition, self).__init__(*args)

     def _matches(self, task):
-        return task.workflow.script_engine.evaluate(task, self.args[0], external_context=task.workflow.data)
+        return task.workflow.script_engine.evaluate(task, self.args[0], external_context=task.workflow.data_objects)


 class BpmnIoSpecification:
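Because data object values no longer sit directly in workflow.data, conditions are now evaluated with workflow.data_objects as the external context, so expressions naming a data object keep resolving. A hedged sketch of the effect (names invented; the merge below only approximates what the script engine does):

    task_data = {'approved': True}     # ordinary task data
    data_objects = {'obj_1': 'hello'}  # workflow.data_objects after this change

    # Roughly: the condition sees task data layered over the external context.
    context = {**data_objects, **task_data}
    assert eval("approved and obj_1 == 'hello'", {}, context)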
19 changes: 16 additions & 3 deletions SpiffWorkflow/bpmn/specs/data_spec.py
@@ -62,18 +62,31 @@ class DataObject(BpmnDataSpecification):

     def get(self, my_task):
         """Copy a value form the workflow data to the task data."""
-        if self.bpmn_id not in my_task.workflow.data:
+
+        # Find the spec where the data object is defined and put it there
+        wf = my_task.workflow
+        while wf is not None and self.bpmn_id not in wf.spec.data_objects:
+            wf = wf.parent_workflow
+
+        if wf is None or self.bpmn_id not in wf.data_objects:
             message = f"The data object could not be read; '{self.bpmn_id}' does not exist in the process."
             raise WorkflowDataException(message, my_task, data_input=self)
-        my_task.data[self.bpmn_id] = deepcopy(my_task.workflow.data[self.bpmn_id])
+
+        my_task.data[self.bpmn_id] = deepcopy(wf.data_objects[self.bpmn_id])
         data_log.info(f'Read workflow variable {self.bpmn_id}', extra=my_task.log_info())

     def set(self, my_task):
         """Copy a value from the task data to the workflow data"""
+
         if self.bpmn_id not in my_task.data:
             message = f"A data object could not be set; '{self.bpmn_id}' not exist in the task."
             raise WorkflowDataException(message, my_task, data_output=self)
-        my_task.workflow.data[self.bpmn_id] = deepcopy(my_task.data[self.bpmn_id])
+
+        wf = my_task.workflow
+        while wf is not None and self.bpmn_id not in wf.spec.data_objects:
+            wf = wf.parent_workflow
+
+        wf.data_objects[self.bpmn_id] = deepcopy(my_task.data[self.bpmn_id])
         del my_task.data[self.bpmn_id]
         data_log.info(f'Set workflow variable {self.bpmn_id}', extra=my_task.log_info())
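The rule these loops implement: a data object is stored on the nearest enclosing workflow whose spec declares it, so subprocesses read and write the declaring process's copy instead of holding their own. A self-contained sketch of the ownership rule with stand-in classes (illustrative, not SpiffWorkflow API):

    class StubSpec:
        def __init__(self, declared):
            self.data_objects = declared  # names declared by this process

    class StubWorkflow:
        def __init__(self, declared, parent_workflow=None):
            self.spec = StubSpec(declared)
            self.data_objects = {}        # runtime values
            self.parent_workflow = parent_workflow

    top = StubWorkflow({'obj_1'})
    sub = StubWorkflow(set(), parent_workflow=top)  # declares nothing itself

    # Mirrors DataObject.set() running in a task of the subprocess:
    wf = sub
    while wf is not None and 'obj_1' not in wf.spec.data_objects:
        wf = wf.parent_workflow
    wf.data_objects['obj_1'] = 'hello'

    assert top.data_objects == {'obj_1': 'hello'}  # stored on the declaring process
    assert sub.data_objects == {}                  # nothing duplicated on the child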
2 changes: 1 addition & 1 deletion SpiffWorkflow/bpmn/specs/event_definitions/conditional.py
@@ -9,6 +9,6 @@ def __init__(self, expression, **kwargs):

     def has_fired(self, my_task):
         my_task._set_internal_data(
-            has_fired=my_task.workflow.script_engine.evaluate(my_task, self.expression, external_context=my_task.workflow.data)
+            has_fired=my_task.workflow.script_engine.evaluate(my_task, self.expression, external_context=my_task.workflow.data_objects)
         )
         return my_task._get_internal_data('has_fired', False)
6 changes: 0 additions & 6 deletions SpiffWorkflow/bpmn/specs/mixins/subworkflow_task.py
@@ -58,12 +58,6 @@ def _on_cancel(self, my_task):
         subworkflow.cancel()

     def copy_data(self, my_task, subworkflow):
-        # There is only one copy of any given data object, so it should be updated immediately
-        # Doing this is actually a little problematic, because it gives parent processes access to
-        # data objects defined in subprocesses.
-        # But our data management is already hopelessly messed up and in dire needs of reconsideration
-        if len(subworkflow.spec.data_objects) > 0:
-            subworkflow.data = my_task.workflow.data
         start = subworkflow.get_next_task(spec_name='Start')
         start.set_data(**my_task.data)
14 changes: 13 additions & 1 deletion SpiffWorkflow/bpmn/util/subworkflow.py
@@ -20,7 +20,19 @@
 from SpiffWorkflow import Workflow
 from SpiffWorkflow.exceptions import TaskNotFoundException

-class BpmnSubWorkflow(Workflow):
+class BpmnBaseWorkflow(Workflow):
+
+    def __init__(self, spec, **kwargs):
+        super().__init__(spec, **kwargs)
+        if len(spec.data_objects) > 0:
+            self.data['data_objects'] = {}
+
+    @property
+    def data_objects(self):
+        return self.data.get('data_objects', {})
+
+
+class BpmnSubWorkflow(BpmnBaseWorkflow):

     def __init__(self, spec, parent_task_id, top_workflow, **kwargs):
         super().__init__(spec, **kwargs)
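BpmnBaseWorkflow gives top-level workflows and subworkflows one storage convention: data object values live under a 'data_objects' key inside the ordinary data dict, with a property exposing that mapping. A hedged usage sketch; the file name and process id are placeholders:

    from SpiffWorkflow.bpmn.parser.BpmnParser import BpmnParser
    from SpiffWorkflow.bpmn.workflow import BpmnWorkflow

    parser = BpmnParser()
    parser.add_bpmn_file('process.bpmn')  # hypothetical file declaring obj_1
    workflow = BpmnWorkflow(parser.get_spec('my_process'))
    workflow.run_all()

    # Data object values are namespaced inside the ordinary data dict ...
    print(workflow.data['data_objects'])  # e.g. {'obj_1': 'hello'}
    # ... and the property exposes the same mapping.
    assert workflow.data_objects is workflow.data['data_objects']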
7 changes: 3 additions & 4 deletions SpiffWorkflow/bpmn/workflow.py
@@ -19,7 +19,6 @@

 from SpiffWorkflow.task import Task
 from SpiffWorkflow.util.task import TaskState
-from SpiffWorkflow.workflow import Workflow
 from SpiffWorkflow.exceptions import WorkflowException

 from SpiffWorkflow.bpmn.specs.mixins.events.event_types import CatchingEvent
@@ -28,13 +27,13 @@

 from SpiffWorkflow.bpmn.specs.control import BoundaryEventSplit

-from SpiffWorkflow.bpmn.util.subworkflow import BpmnSubWorkflow
+from SpiffWorkflow.bpmn.util.subworkflow import BpmnBaseWorkflow, BpmnSubWorkflow
 from SpiffWorkflow.bpmn.util.task import BpmnTaskIterator

 from .script_engine.python_engine import PythonScriptEngine


-class BpmnWorkflow(Workflow):
+class BpmnWorkflow(BpmnBaseWorkflow):
     """
     The engine that executes a BPMN workflow. This specialises the standard
     Spiff Workflow class with a few extra methods and attributes.
@@ -247,7 +246,7 @@ def reset_from_task_id(self, task_id, data=None, remove_subprocess=True):
     def cancel(self, workflow=None):

         wf = workflow or self
-        cancelled = Workflow.cancel(wf)
+        cancelled = BpmnBaseWorkflow.cancel(wf)
         cancelled_ids = [t.id for t in cancelled]
         to_cancel = []
         for sp_id, sp in self.subprocesses.items():
8 changes: 4 additions & 4 deletions tests/SpiffWorkflow/bpmn/DataObjectTest.py
@@ -26,7 +26,7 @@ def testMissingDataInput(self):

         # Remove the data before advancing
         ready_tasks = self.get_ready_user_tasks()
-        self.workflow.data.pop('obj_1')
+        self.workflow.data_objects.pop('obj_1')
         with self.assertRaises(WorkflowDataException) as exc:
             ready_tasks[0].run()
         self.assertEqual(exc.data_output.name, 'obj_1')
@@ -51,7 +51,7 @@ def actual_test(self, save_restore):
         ready_tasks[0].run()
         # After task completion, obj_1 should be copied out of the task into the workflow
         self.assertNotIn('obj_1', ready_tasks[0].data)
-        self.assertIn('obj_1', self.workflow.data)
+        self.assertIn('obj_1', self.workflow.data_objects)

         if save_restore:
             self.save_restore()
@@ -70,7 +70,7 @@ def actual_test(self, save_restore):
         # We did not set an output data reference so obj_1 should remain unchanged in the workflow data
         # and be removed from the task data
         self.assertNotIn('obj_1', ready_tasks[0].data)
-        self.assertEqual(self.workflow.data['obj_1'], 'hello')
+        self.assertEqual(self.workflow.data_objects['obj_1'], 'hello')

         if save_restore:
             self.save_restore()
@@ -86,7 +86,7 @@ def actual_test(self, save_restore):
         # It was copied out
         self.assertNotIn('obj_1', sp.data)
         # The update should persist in the main process
-        self.assertEqual(self.workflow.data['obj_1'], 'hello again')
+        self.assertEqual(self.workflow.data_objects['obj_1'], 'hello again')

 class DataObjectGatewayTest(BpmnWorkflowTestCase):
Binary file added tests/SpiffWorkflow/bpmn/data/scales11.xls
