Skip to content

Commit

Permalink
MarketSquare#7 init notify failure
Browse files Browse the repository at this point in the history
  • Loading branch information
Noordsestern committed Sep 13, 2021
1 parent d2fb584 commit 7cc94ae
Showing 1 changed file with 26 additions and 1 deletion.
27 changes: 26 additions & 1 deletion CamundaLibrary/CamundaLibrary.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
VariableValueDto, FetchExternalTasksDto, FetchExternalTaskTopicDto, ProcessDefinitionApi, \
ProcessInstanceWithVariablesDto, StartProcessInstanceDto, ProcessInstanceModificationInstructionDto, \
ProcessInstanceApi, ProcessInstanceDto, VersionApi, EvaluateDecisionDto, MessageApi, \
MessageCorrelationResultWithVariableDto, CorrelationMessageDto, ActivityInstanceDto
MessageCorrelationResultWithVariableDto, CorrelationMessageDto, ActivityInstanceDto, ExternalTaskFailureDto

import generic_camunda_client as openapi_client

Expand Down Expand Up @@ -444,6 +444,31 @@ def bpmn_error(self, error_code: str, error_message:str = None, variables: Dict[
except ApiException as e:
logger.error(f"Exception when calling ExternalTaskApi->handle_external_task_bpmn_error: {e}\n")

@keyword("Notify failure", tags=["task", "beta"])
def notify_failure(self, **kwargs):
"""
Raises a failure to Camunda. When retry counter is less than 1, an incident is created by Camunda.
CamundaLibrary takes care of providing the worker_id and task_id. *retry_timeout* is equal to *lock_duration* for external tasks.
Check for camunda client documentation for all parameters of the request body: https://noordsestern.gitlab.io/camunda-client-for-python/7-15-0/docs/ExternalTaskApi.html#handle_failure
Example:
| | ${variables} | *fetch workload* | _my_first_task_in_demo_ | |
| | *notify failure* | retries=3 | error_message= | _json=${{ {'workerId': '${fetch_response}[worker_id]'} }}_ |
"""
if not self.FETCH_RESPONSE:
logger.warn('No task to notify failure for. Maybe you did not fetch and lock a workitem before?')
else:
with self._shared_resources.api_client as api_client:
api_instance = openapi_client.ExternalTaskApi(api_client)
external_task_failure_dto = ExternalTaskFailureDto(worker_id=self.WORKER_ID, retry_timeout=60000, **kwargs)

try:
api_instance.handle_failure(id=self.FETCH_RESPONSE.id, external_task_failure_dto=external_task_failure_dto)
self.drop_fetch_response()
except ApiException as e:
print("Exception when calling ExternalTaskApi->handle_failure: %s\n" % e)

@keyword("Complete task", tags=['task'])
def complete(self, result_set: Dict[str, Any] = None, files: Dict = None):
"""
Expand Down

0 comments on commit 7cc94ae

Please sign in to comment.