Skip to content

Commit

Permalink
Message API (#29)
Browse files Browse the repository at this point in the history
* init

* #5 simple test case

* #5 test without result enabled

* MessageApi from generic_camunda_client seems broken

* validate deleted processes are deleted

* convert collections to json
  • Loading branch information
Noordsestern authored Aug 3, 2021
1 parent c660c94 commit 1dc7727
Show file tree
Hide file tree
Showing 5 changed files with 300 additions and 10 deletions.
49 changes: 46 additions & 3 deletions CamundaLibrary/CamundaLibrary.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@
LockedExternalTaskDto, \
VariableValueDto, FetchExternalTasksDto, FetchExternalTaskTopicDto, ProcessDefinitionApi, \
ProcessInstanceWithVariablesDto, StartProcessInstanceDto, ProcessInstanceModificationInstructionDto, \
ProcessInstanceApi, ProcessInstanceDto, VersionApi, EvaluateDecisionDto
ProcessInstanceApi, ProcessInstanceDto, VersionApi, EvaluateDecisionDto, MessageApi, \
MessageCorrelationResultWithVariableDto, CorrelationMessageDto
import generic_camunda_client as openapi_client

# local imports
Expand Down Expand Up @@ -87,7 +88,7 @@ class CamundaLibrary:

EMPTY_STRING = ""
KNOWN_TOPICS: Dict[str,Dict[str, Any]] = {}
FETCH_RESPONSE: LockedExternalTaskDto = None
FETCH_RESPONSE: LockedExternalTaskDto = {}

def __init__(self, camunda_engine_url: str = 'http://localhost:8080'):
self._shared_resources = CamundaResources()
Expand Down Expand Up @@ -231,6 +232,48 @@ def get_deployments(self, deployment_id: str = None, **kwargs):

return [r.to_dict() for r in response]

@keyword("Deliver Message", tags=["message"])
def deliver_message(self, message_name, **kwargs):
"""
Delivers a message using Camunda REST API: https://docs.camunda.org/manual/7.15/reference/rest/message/post-message/
Example:
| ${result} | deliver message | msg_payment_received |
| ${result} | deliver message | msg_payment_received | process_variables = ${variable_dictionary} |
| ${result} | deliver message | msg_payment_received | business_key = ${correlating_business_key} |
"""
with self._shared_resources.api_client as api_client:
correlation_message: CorrelationMessageDto = CorrelationMessageDto(**kwargs)
correlation_message.message_name = message_name
if not 'result_enabled' in kwargs:
correlation_message.result_enabled = True
if 'process_variables' in kwargs:
correlation_message.process_variables = CamundaResources.dict_to_camunda_json(
kwargs['process_variables'])

serialized_message = api_client.sanitize_for_serialization(correlation_message)
logger.debug(f'Message:\n{serialized_message}')

try:
response = requests.post(f'{self._shared_resources.camunda_url}/message', json=serialized_message,
headers={'Content-Type': 'application/json'})
except ApiException as e:
logger.error(f'Failed to deliver message:\n{e}')
raise e

try:
response.raise_for_status()
except HTTPError as e:
logger.error(response.text)
raise e

if correlation_message.result_enabled:
json = response.json()
logger.debug(json)
return json
else:
return {}

@keyword("Fetch and Lock workloads", tags=['task', 'deprecated'])
def fetch_and_lock_workloads(self, topic, **kwargs) -> Dict:
"""*DEPRECATED*
Expand Down Expand Up @@ -489,7 +532,7 @@ def delete_process_instance(self, process_instance_id):
api_instance = openapi_client.ProcessInstanceApi(api_client)

try:
api_instance.delete_process_instance(id=process_instance_id)
response = api_instance.delete_process_instance(id=process_instance_id)
except ApiException as e:
logger.error(f'Failed to delete process instance {process_instance_id}:\n{e}')
raise e
Expand Down
33 changes: 26 additions & 7 deletions CamundaLibrary/CamundaResources.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
import base64
import json
import os
from collections.abc import Collection
from typing import Dict, Any

from generic_camunda_client import Configuration, ApiClient, VariableValueDto
from typing import Dict, Any
import os
import base64
import json


class CamundaResources:
Expand Down Expand Up @@ -73,7 +73,7 @@ def convert_openapi_variables_to_dict(open_api_variables: Dict[str, VariableValu
return {k: CamundaResources.convert_variable_dto(v) for k, v in open_api_variables.items()}

@staticmethod
def convert_dict_to_openapi_variables(variabes: dict) -> Dict[str,VariableValueDto]:
def convert_dict_to_openapi_variables(variabes: dict) -> Dict[str, VariableValueDto]:
"""
Converts the variables to a simple dictionary
:return: dict
Expand Down Expand Up @@ -135,7 +135,7 @@ def convert_file_to_dto(path: str) -> VariableValueDto:
def convert_to_variable_dto(value: Any) -> VariableValueDto:
if isinstance(value, str):
return VariableValueDto(value=value)
elif isinstance(value, Collection): # String is also a collection and must be filtered before Collection.
elif isinstance(value, Collection): # String is also a collection and must be filtered before Collection.
return VariableValueDto(value=json.dumps(value), type='Json')
else:
return VariableValueDto(value=value)
Expand All @@ -148,10 +148,29 @@ def convert_variable_dto(dto: VariableValueDto) -> Any:
return json.loads(dto.value)
return dto.value

@staticmethod
def dict_to_camunda_json(d: dict) -> Any:
"""
Example:
>>> CamundaResources.dict_to_camunda_json({'a':1})
{'a': {'value': 1}}
>>> CamundaResources.dict_to_camunda_json({'person': {'age': 25}})
{'person': {'value': '{"age": 25}', 'type': 'Json'}}
>>> CamundaResources.dict_to_camunda_json({'languages': ['English', 'Suomi']})
{'languages': {'value': '["English", "Suomi"]', 'type': 'Json'}}
>>> CamundaResources.dict_to_camunda_json({'person': {'age': 25, 'languages': ['English', 'Suomi']}})
{'person': {'value': '{"age": 25, "languages": ["English", "Suomi"]}', 'type': 'Json'}}
"""
return {k: {'value': json.dumps(v), 'type': 'Json'} if isinstance(v, Collection) else {'value': v}
for k, v in d.items()}


if __name__ == '__main__':
import doctest
import xmlrunner

suite = doctest.DocTestSuite()
xmlrunner.XMLTestRunner(output='logs').run(suite)
xmlrunner.XMLTestRunner(output='logs').run(suite)
88 changes: 88 additions & 0 deletions tests/bpmn/message_test.bpmn
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
<?xml version="1.0" encoding="UTF-8"?>
<bpmn:definitions xmlns:bpmn="http://www.omg.org/spec/BPMN/20100524/MODEL" xmlns:bpmndi="http://www.omg.org/spec/BPMN/20100524/DI" xmlns:dc="http://www.omg.org/spec/DD/20100524/DC" xmlns:camunda="http://camunda.org/schema/1.0/bpmn" xmlns:di="http://www.omg.org/spec/DD/20100524/DI" id="Definitions_0o68s4x" targetNamespace="http://bpmn.io/schema/bpmn" exporter="Camunda Modeler" exporterVersion="4.6.0">
<bpmn:collaboration id="Collaboration_1259shc">
<bpmn:participant id="Participant_17r119v" name="Send Message" processRef="process_send_message" />
<bpmn:participant id="Participant_01x95n7" name="Receive Message" processRef="process_receive_message" />
<bpmn:messageFlow id="Flow_13avncs" sourceRef="Activity_07thf2h" targetRef="Event_1w9m0l7" />
</bpmn:collaboration>
<bpmn:process id="process_send_message" name="Send Message" isExecutable="true">
<bpmn:startEvent id="StartEvent_1">
<bpmn:outgoing>Flow_0b7wd5l</bpmn:outgoing>
</bpmn:startEvent>
<bpmn:endEvent id="Event_0w7ck3j">
<bpmn:incoming>Flow_0d25nx3</bpmn:incoming>
</bpmn:endEvent>
<bpmn:sendTask id="Activity_07thf2h" name="Send message" camunda:type="external" camunda:topic="send_message">
<bpmn:incoming>Flow_0b7wd5l</bpmn:incoming>
<bpmn:outgoing>Flow_0d25nx3</bpmn:outgoing>
</bpmn:sendTask>
<bpmn:sequenceFlow id="Flow_0b7wd5l" sourceRef="StartEvent_1" targetRef="Activity_07thf2h" />
<bpmn:sequenceFlow id="Flow_0d25nx3" sourceRef="Activity_07thf2h" targetRef="Event_0w7ck3j" />
</bpmn:process>
<bpmn:process id="process_receive_message" name="Receive Message" isExecutable="true">
<bpmn:startEvent id="Event_1w9m0l7">
<bpmn:outgoing>Flow_1y1q6p6</bpmn:outgoing>
<bpmn:messageEventDefinition id="MessageEventDefinition_0ncs6dd" messageRef="Message_1253uuy" />
</bpmn:startEvent>
<bpmn:endEvent id="Event_0mydum3">
<bpmn:incoming>Flow_0h9hvs6</bpmn:incoming>
</bpmn:endEvent>
<bpmn:serviceTask id="Activity_1rllkk3" name="Read message" camunda:type="external" camunda:topic="read_message">
<bpmn:incoming>Flow_1y1q6p6</bpmn:incoming>
<bpmn:outgoing>Flow_0h9hvs6</bpmn:outgoing>
</bpmn:serviceTask>
<bpmn:sequenceFlow id="Flow_1y1q6p6" sourceRef="Event_1w9m0l7" targetRef="Activity_1rllkk3" />
<bpmn:sequenceFlow id="Flow_0h9hvs6" sourceRef="Activity_1rllkk3" targetRef="Event_0mydum3" />
</bpmn:process>
<bpmn:message id="Message_1253uuy" name="receive_message" />
<bpmndi:BPMNDiagram id="BPMNDiagram_1">
<bpmndi:BPMNPlane id="BPMNPlane_1" bpmnElement="Collaboration_1259shc">
<bpmndi:BPMNShape id="Participant_17r119v_di" bpmnElement="Participant_17r119v" isHorizontal="true">
<dc:Bounds x="170" y="80" width="430" height="330" />
</bpmndi:BPMNShape>
<bpmndi:BPMNEdge id="Flow_0b7wd5l_di" bpmnElement="Flow_0b7wd5l">
<di:waypoint x="318" y="200" />
<di:waypoint x="370" y="200" />
</bpmndi:BPMNEdge>
<bpmndi:BPMNEdge id="Flow_0d25nx3_di" bpmnElement="Flow_0d25nx3">
<di:waypoint x="470" y="200" />
<di:waypoint x="502" y="200" />
</bpmndi:BPMNEdge>
<bpmndi:BPMNShape id="_BPMNShape_StartEvent_2" bpmnElement="StartEvent_1">
<dc:Bounds x="282" y="182" width="36" height="36" />
</bpmndi:BPMNShape>
<bpmndi:BPMNShape id="Event_1xcbx8p_di" bpmnElement="Event_0w7ck3j">
<dc:Bounds x="502" y="182" width="36" height="36" />
</bpmndi:BPMNShape>
<bpmndi:BPMNShape id="Activity_1d0huma_di" bpmnElement="Activity_07thf2h">
<dc:Bounds x="370" y="160" width="100" height="80" />
</bpmndi:BPMNShape>
<bpmndi:BPMNShape id="Participant_01x95n7_di" bpmnElement="Participant_01x95n7" isHorizontal="true">
<dc:Bounds x="170" y="430" width="440" height="250" />
</bpmndi:BPMNShape>
<bpmndi:BPMNEdge id="Flow_1y1q6p6_di" bpmnElement="Flow_1y1q6p6">
<di:waypoint x="288" y="560" />
<di:waypoint x="340" y="560" />
</bpmndi:BPMNEdge>
<bpmndi:BPMNEdge id="Flow_0h9hvs6_di" bpmnElement="Flow_0h9hvs6">
<di:waypoint x="440" y="560" />
<di:waypoint x="492" y="560" />
</bpmndi:BPMNEdge>
<bpmndi:BPMNShape id="Event_1bv3c92_di" bpmnElement="Event_1w9m0l7">
<dc:Bounds x="252" y="542" width="36" height="36" />
</bpmndi:BPMNShape>
<bpmndi:BPMNShape id="Event_0mydum3_di" bpmnElement="Event_0mydum3">
<dc:Bounds x="492" y="542" width="36" height="36" />
</bpmndi:BPMNShape>
<bpmndi:BPMNShape id="Activity_04v26ri_di" bpmnElement="Activity_1rllkk3">
<dc:Bounds x="340" y="520" width="100" height="80" />
</bpmndi:BPMNShape>
<bpmndi:BPMNEdge id="Flow_13avncs_di" bpmnElement="Flow_13avncs">
<di:waypoint x="420" y="240" />
<di:waypoint x="420" y="391" />
<di:waypoint x="270" y="391" />
<di:waypoint x="270" y="542" />
</bpmndi:BPMNEdge>
</bpmndi:BPMNPlane>
</bpmndi:BPMNDiagram>
</bpmn:definitions>
138 changes: 138 additions & 0 deletions tests/robot/Message/deliver_message.robot
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
*** Settings ***
Library CamundaLibrary ${CAMUNDA_HOST}
Library Collections
Resource ../cleanup.resource
Suite Setup Deploy ${MODEL}

*** Variables ***
${MODEL} ${CURDIR}/../../bpmn/message_test.bpmn
${PROCESS_DEFINITION_KEY_SEND_MESSAGE} process_send_message
${PROCESS_DEFINITION_KEY_RECEIVE_MESSAGE} process_receive_message
${TOPIC_SEND_MESSAGE} send_message
${TOPIC_RECEIVE_MESSAGE} read_message
${MESSAGE_NAME} receive_message


*** Test Cases ***
Test Messaging with Message Result
[Setup] Prepare testcase
#GIVEN
Get workload from topic '${TOPIC_RECEIVE_MESSAGE}'
Last topic should have no workload
${workload} Get workload from topic '${TOPIC_SEND_MESSAGE}'

# EXPECT
Last topic should have workload

# WHEN
${message_response} Deliver Message ${MESSAGE_NAME}
Complete task

# THEN
Get workload from topic '${TOPIC_RECEIVE_MESSAGE}'
Last topic should have workload
Complete task

Get workload from topic '${TOPIC_SEND_MESSAGE}'
Last topic should have no workload

Should Not Be Empty ${message_response}

Test Messaging without Message Result
[Setup] Prepare testcase
#GIVEN
Get workload from topic '${TOPIC_RECEIVE_MESSAGE}'
Last topic should have no workload
${workload} Get workload from topic '${TOPIC_SEND_MESSAGE}'

# EXPECT
Last topic should have workload

# WHEN
${message_response} Deliver Message ${MESSAGE_NAME} result_enabled=${False}
Complete task

# THEN
Get workload from topic '${TOPIC_RECEIVE_MESSAGE}'
Last topic should have workload
Complete task

Get workload from topic '${TOPIC_SEND_MESSAGE}'
Last topic should have no workload

Should Be Empty ${message_response}

Test Messaging with variable
[Setup] Prepare testcase
#GIVEN
${process_variables} Create Dictionary message=Hello World
Get workload from topic '${TOPIC_RECEIVE_MESSAGE}'
Last topic should have no workload
${workload} Get workload from topic '${TOPIC_SEND_MESSAGE}'

# EXPECT
Last topic should have workload

# WHEN
${message_response} Deliver Message ${MESSAGE_NAME} process_variables=${process_variables}
Complete task

# THEN
${received_message_workload} Get workload from topic '${TOPIC_RECEIVE_MESSAGE}'
Last topic should have workload
Dictionaries Should Be Equal ${process_variables} ${received_message_workload}
Complete task

Get workload from topic '${TOPIC_SEND_MESSAGE}'
Last topic should have no workload

Should Not Be Empty ${message_response}
[Teardown] Complete Task

Test Messaging with dict variable
[Setup] Prepare testcase
#GIVEN
${languages} Create List Suomi English
${person} Create Dictionary firstname=Pekka lastname=Klärck languages=${languages}
${process_variables} Create Dictionary person=${person}
Get workload from topic '${TOPIC_RECEIVE_MESSAGE}'
Last topic should have no workload
${workload} Get workload from topic '${TOPIC_SEND_MESSAGE}'

# EXPECT
Last topic should have workload

# WHEN
${message_response} Deliver Message ${MESSAGE_NAME} process_variables=${process_variables}
Complete task

# THEN
${received_message_workload} Get workload from topic '${TOPIC_RECEIVE_MESSAGE}'
Last topic should have workload
Dictionaries Should Be Equal ${process_variables} ${received_message_workload}
Complete task

Get workload from topic '${TOPIC_SEND_MESSAGE}'
Last topic should have no workload

Should Not Be Empty ${message_response}


*** Keywords ***
Prepare testcase
Delete all instances from process '${PROCESS_DEFINITION_KEY_SEND_MESSAGE}'
Delete all instances from process '${PROCESS_DEFINITION_KEY_RECEIVE_MESSAGE}'
Start process ${PROCESS_DEFINITION_KEY_SEND_MESSAGE}


Get workload from topic '${topic}'
${workload} Fetch Workload ${topic}
[Return] ${workload}

Last topic should have workload
${recent_process_instance} Get fetch response
Should not be empty ${recent_process_instance}

Last topic should have no workload
${recent_process_instance} Get fetch response
Should Be Empty ${recent_process_instance}
2 changes: 2 additions & 0 deletions tests/robot/cleanup.resource
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,5 @@ Delete all instances from process '${process_key}'
FOR ${instance} IN @{instances}
Delete process instance ${instance}[id]
END
${instances} Get all active process instances ${process_key}
Should Be Empty ${instances} Process ${process_key} should not have any processes anymore.

0 comments on commit 1dc7727

Please sign in to comment.