Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bugfix/data object management #393

Merged
merged 4 commits into from
Mar 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 2 additions & 5 deletions SpiffWorkflow/bpmn/parser/ProcessParser.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ def __init__(self, p, node, nsmap, data_stores, filename=None, lane=None):
:param node: the XML node for the process
:param data_stores: map of ids to data store implementations
:param filename: the source BPMN filename (optional)
:param doc_xpath: an xpath evaluator for the document (optional)
:param lane: the lane of a subprocess (optional)
"""
super().__init__(node, nsmap, filename=filename, lane=lane)
Expand All @@ -48,7 +47,7 @@ def __init__(self, p, node, nsmap, data_stores, filename=None, lane=None):
self.spec = None
self.process_executable = node.get('isExecutable', 'true') == 'true'
self.data_stores = data_stores
self.inherited_data_objects = {}
self.parent = None

def get_name(self):
"""
Expand Down Expand Up @@ -118,8 +117,6 @@ def _parse(self):
raise ValidationException(f"Process {self.bpmn_id} is not executable.", node=self.node, file_name=self.filename)
self.spec = BpmnProcessSpec(name=self.bpmn_id, description=self.get_name(), filename=self.filename)

self.spec.data_objects.update(self.inherited_data_objects)

# Get the data objects
for obj in self.xpath('./bpmn:dataObject'):
data_object = self.parse_data_object(obj)
Expand Down Expand Up @@ -147,7 +144,7 @@ def _parse(self):
split_task.inputs = [self.spec.start]

def parse_data_object(self, obj):
return DataObject(obj.get('id'), obj.get('name'))
return self.create_data_spec(obj, DataObject)

def get_spec(self):
"""
Expand Down
20 changes: 16 additions & 4 deletions SpiffWorkflow/bpmn/parser/node_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,9 @@ def parse_incoming_data_references(self):
specs = []
for name in self.xpath('./bpmn:dataInputAssociation/bpmn:sourceRef'):
ref = first(self.doc_xpath(f".//bpmn:dataObjectReference[@id='{name.text}']"))
if ref is not None and ref.get('dataObjectRef') in self.process_parser.spec.data_objects:
specs.append(self.process_parser.spec.data_objects[ref.get('dataObjectRef')])
data_obj = self._resolve_data_object_ref(ref)
if data_obj is not None:
specs.append(data_obj)
else:
ref = first(self.doc_xpath(f".//bpmn:dataStoreReference[@id='{name.text}']"))
if ref is not None and ref.get('dataStoreRef') in self.process_parser.data_stores:
Expand All @@ -96,8 +97,9 @@ def parse_outgoing_data_references(self):
specs = []
for name in self.xpath('./bpmn:dataOutputAssociation/bpmn:targetRef'):
ref = first(self.doc_xpath(f".//bpmn:dataObjectReference[@id='{name.text}']"))
if ref is not None and ref.get('dataObjectRef') in self.process_parser.spec.data_objects:
specs.append(self.process_parser.spec.data_objects[ref.get('dataObjectRef')])
data_obj = self._resolve_data_object_ref(ref)
if data_obj is not None:
specs.append(data_obj)
else:
ref = first(self.doc_xpath(f".//bpmn:dataStoreReference[@id='{name.text}']"))
if ref is not None and ref.get('dataStoreRef') in self.process_parser.data_stores:
Expand All @@ -124,6 +126,16 @@ def parse_io_spec(self):
outputs.append(data_refs[ref.text])
return BpmnIoSpecification(inputs, outputs)

def _resolve_data_object_ref(self, ref):
if ref is not None:
current = self.process_parser
while current is not None:
data_obj = current.spec.data_objects.get(ref.get('dataObjectRef'))
if data_obj is None:
current = self.process_parser.parent
else:
return data_obj

def create_data_spec(self, item, cls):
return cls(item.attrib.get('id'), item.attrib.get('name'))

Expand Down
2 changes: 1 addition & 1 deletion SpiffWorkflow/bpmn/parser/task_parsers.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ def get_subprocess_spec(task_parser):
spec_id = task_parser.node.get('id')
# This parser makes me want to cry
spec_parser = task_parser.process_parser.parser.process_parsers[spec_id]
spec_parser.inherited_data_objects.update(task_parser.process_parser.spec.data_objects)
spec_parser.parent = task_parser.process_parser
return spec_id

@staticmethod
Expand Down
2 changes: 0 additions & 2 deletions SpiffWorkflow/bpmn/serializer/default/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,4 @@ def subprocesses_from_dict(self, dct, workflow, top_workflow=None):
sp = self.registry.restore(dct.pop(str(task.id)), task=task, top_workflow=top_workflow)
top_workflow.subprocesses[task.id] = sp
sp.completed_event.connect(task.task_spec._on_subworkflow_completed, task)
if len(sp.spec.data_objects) > 0:
sp.data = task.workflow.data
self.subprocesses_from_dict(dct, sp, top_workflow)
38 changes: 37 additions & 1 deletion SpiffWorkflow/bpmn/serializer/migration/version_1_3.py
Original file line number Diff line number Diff line change
Expand Up @@ -132,4 +132,40 @@ def add_new_typenames(dct):
for sp in dct['subprocesses'].values():
sp['typename'] = 'BpmnSubWorkflow'
for task in sp['tasks'].values():
task['typename'] = 'Task'
task['typename'] = 'Task'

def update_data_objects(dct):

def update_spec(parent):
children = []
for ts in [ts for ts in parent['task_specs'].values() if 'spec' in ts]:
child = dct['subprocess_specs'].get(ts['spec'])
children.append((child, ts['typename']))
update_spec(child)
for child in [c for c, spec_type in children if spec_type != 'CallActivity']:
for name in parent['data_objects']:
child['data_objects'].pop(name, None)

data_objects = []

def update_wf(wf, spec):

data_objects.extend([v for v in spec.get('data_objects', {}) if v not in data_objects])

for task in [t for t in wf['tasks'].values() if t['id'] in dct['subprocesses']]:
ts = spec['task_specs'][task['task_spec']]
sp_spec = dct['subprocess_specs'].get(ts['spec'])
sp = dct['subprocesses'].get(task['id'])
update_wf(sp, sp_spec)

if len(spec.get('data_objects', {})) > 0:
wf['data']['data_objects'] = {}

for key in list(wf['data']):
if key in spec.get('data_objects', {}):
wf['data']['data_objects'][key] = wf['data'].pop(key)
elif key in data_objects:
del wf['data'][key]

update_spec(dct['spec'])
update_wf(dct, dct['spec'])
2 changes: 2 additions & 0 deletions SpiffWorkflow/bpmn/serializer/migration/version_migration.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
remove_boundary_event_parent,
remove_root_task,
add_new_typenames,
update_data_objects,
)

def from_version_1_2(dct):
Expand All @@ -41,6 +42,7 @@ def from_version_1_2(dct):
remove_boundary_event_parent(dct)
remove_root_task(dct)
add_new_typenames(dct)
update_data_objects(dct)


def from_version_1_1(dct):
Expand Down
2 changes: 1 addition & 1 deletion SpiffWorkflow/bpmn/specs/bpmn_task_spec.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ def __init__(self, *args):
super(_BpmnCondition, self).__init__(*args)

def _matches(self, task):
return task.workflow.script_engine.evaluate(task, self.args[0], external_context=task.workflow.data)
return task.workflow.script_engine.evaluate(task, self.args[0], external_context=task.workflow.data_objects)


class BpmnIoSpecification:
Expand Down
19 changes: 16 additions & 3 deletions SpiffWorkflow/bpmn/specs/data_spec.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,18 +62,31 @@ class DataObject(BpmnDataSpecification):

def get(self, my_task):
"""Copy a value form the workflow data to the task data."""
if self.bpmn_id not in my_task.workflow.data:

# Find the spec where the data object is defined and put it there
wf = my_task.workflow
while wf is not None and self.bpmn_id not in wf.spec.data_objects:
wf = wf.parent_workflow

if wf is None or self.bpmn_id not in wf.data_objects:
message = f"The data object could not be read; '{self.bpmn_id}' does not exist in the process."
raise WorkflowDataException(message, my_task, data_input=self)
my_task.data[self.bpmn_id] = deepcopy(my_task.workflow.data[self.bpmn_id])

my_task.data[self.bpmn_id] = deepcopy(wf.data_objects[self.bpmn_id])
data_log.info(f'Read workflow variable {self.bpmn_id}', extra=my_task.log_info())

def set(self, my_task):
"""Copy a value from the task data to the workflow data"""

if self.bpmn_id not in my_task.data:
message = f"A data object could not be set; '{self.bpmn_id}' not exist in the task."
raise WorkflowDataException(message, my_task, data_output=self)
my_task.workflow.data[self.bpmn_id] = deepcopy(my_task.data[self.bpmn_id])

wf = my_task.workflow
while wf is not None and self.bpmn_id not in wf.spec.data_objects:
wf = wf.parent_workflow

wf.data_objects[self.bpmn_id] = deepcopy(my_task.data[self.bpmn_id])
del my_task.data[self.bpmn_id]
data_log.info(f'Set workflow variable {self.bpmn_id}', extra=my_task.log_info())

Expand Down
2 changes: 1 addition & 1 deletion SpiffWorkflow/bpmn/specs/event_definitions/conditional.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,6 @@ def __init__(self, expression, **kwargs):

def has_fired(self, my_task):
my_task._set_internal_data(
has_fired=my_task.workflow.script_engine.evaluate(my_task, self.expression, external_context=my_task.workflow.data)
has_fired=my_task.workflow.script_engine.evaluate(my_task, self.expression, external_context=my_task.workflow.data_objects)
)
return my_task._get_internal_data('has_fired', False)
6 changes: 0 additions & 6 deletions SpiffWorkflow/bpmn/specs/mixins/subworkflow_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,12 +58,6 @@ def _on_cancel(self, my_task):
subworkflow.cancel()

def copy_data(self, my_task, subworkflow):
# There is only one copy of any given data object, so it should be updated immediately
# Doing this is actually a little problematic, because it gives parent processes access to
# data objects defined in subprocesses.
# But our data management is already hopelessly messed up and in dire needs of reconsideration
if len(subworkflow.spec.data_objects) > 0:
subworkflow.data = my_task.workflow.data
start = subworkflow.get_next_task(spec_name='Start')
start.set_data(**my_task.data)

Expand Down
14 changes: 13 additions & 1 deletion SpiffWorkflow/bpmn/util/subworkflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,19 @@
from SpiffWorkflow import Workflow
from SpiffWorkflow.exceptions import TaskNotFoundException

class BpmnSubWorkflow(Workflow):
class BpmnBaseWorkflow(Workflow):

def __init__(self, spec, **kwargs):
super().__init__(spec, **kwargs)
if len(spec.data_objects) > 0:
self.data['data_objects'] = {}

@property
def data_objects(self):
return self.data.get('data_objects', {})


class BpmnSubWorkflow(BpmnBaseWorkflow):

def __init__(self, spec, parent_task_id, top_workflow, **kwargs):
super().__init__(spec, **kwargs)
Expand Down
7 changes: 3 additions & 4 deletions SpiffWorkflow/bpmn/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

from SpiffWorkflow.task import Task
from SpiffWorkflow.util.task import TaskState
from SpiffWorkflow.workflow import Workflow
from SpiffWorkflow.exceptions import WorkflowException

from SpiffWorkflow.bpmn.specs.mixins.events.event_types import CatchingEvent
Expand All @@ -28,13 +27,13 @@

from SpiffWorkflow.bpmn.specs.control import BoundaryEventSplit

from SpiffWorkflow.bpmn.util.subworkflow import BpmnSubWorkflow
from SpiffWorkflow.bpmn.util.subworkflow import BpmnBaseWorkflow, BpmnSubWorkflow
from SpiffWorkflow.bpmn.util.task import BpmnTaskIterator

from .script_engine.python_engine import PythonScriptEngine


class BpmnWorkflow(Workflow):
class BpmnWorkflow(BpmnBaseWorkflow):
"""
The engine that executes a BPMN workflow. This specialises the standard
Spiff Workflow class with a few extra methods and attributes.
Expand Down Expand Up @@ -247,7 +246,7 @@ def reset_from_task_id(self, task_id, data=None, remove_subprocess=True):
def cancel(self, workflow=None):

wf = workflow or self
cancelled = Workflow.cancel(wf)
cancelled = BpmnBaseWorkflow.cancel(wf)
cancelled_ids = [t.id for t in cancelled]
to_cancel = []
for sp_id, sp in self.subprocesses.items():
Expand Down
8 changes: 4 additions & 4 deletions tests/SpiffWorkflow/bpmn/DataObjectTest.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ def testMissingDataInput(self):

# Remove the data before advancing
ready_tasks = self.get_ready_user_tasks()
self.workflow.data.pop('obj_1')
self.workflow.data_objects.pop('obj_1')
with self.assertRaises(WorkflowDataException) as exc:
ready_tasks[0].run()
self.assertEqual(exc.data_output.name, 'obj_1')
Expand All @@ -51,7 +51,7 @@ def actual_test(self, save_restore):
ready_tasks[0].run()
# After task completion, obj_1 should be copied out of the task into the workflow
self.assertNotIn('obj_1', ready_tasks[0].data)
self.assertIn('obj_1', self.workflow.data)
self.assertIn('obj_1', self.workflow.data_objects)

if save_restore:
self.save_restore()
Expand All @@ -70,7 +70,7 @@ def actual_test(self, save_restore):
# We did not set an output data reference so obj_1 should remain unchanged in the workflow data
# and be removed from the task data
self.assertNotIn('obj_1', ready_tasks[0].data)
self.assertEqual(self.workflow.data['obj_1'], 'hello')
self.assertEqual(self.workflow.data_objects['obj_1'], 'hello')

if save_restore:
self.save_restore()
Expand All @@ -86,7 +86,7 @@ def actual_test(self, save_restore):
# It was copied out
self.assertNotIn('obj_1', sp.data)
# The update should persist in the main process
self.assertEqual(self.workflow.data['obj_1'], 'hello again')
self.assertEqual(self.workflow.data_objects['obj_1'], 'hello again')

class DataObjectGatewayTest(BpmnWorkflowTestCase):

Expand Down
Binary file added tests/SpiffWorkflow/bpmn/data/scales11.xls
Binary file not shown.
Loading
Loading