From ba7578b20fc47335bbde8c18365ccc330148dbe5 Mon Sep 17 00:00:00 2001 From: william Date: Wed, 20 Nov 2024 16:59:38 +1100 Subject: [PATCH 1/7] added rerun endpoint --- .../aws_event_bridge}/README.md | 0 .../aws_event_bridge}/__init__.py | 0 .../aws_event_bridge/event.py | 37 ++++++ .../executionservice/__init__.py | 0 .../workflowrunstatechange/AWSEvent.py | 0 .../workflowrunstatechange/LibraryRecord.py | 0 .../workflowrunstatechange/Payload.py | 0 .../WorkflowRunStateChange.py | 0 .../workflowrunstatechange/__init__.py | 0 .../workflowrunstatechange/marshaller.py | 2 +- .../workflowmanager/__init__.py | 0 .../workflowrunstatechange/AWSEvent.py | 0 .../workflowrunstatechange/LibraryRecord.py | 0 .../workflowrunstatechange/Payload.py | 0 .../WorkflowRunStateChange.py | 0 .../workflowrunstatechange/__init__.py | 0 .../workflowrunstatechange/marshaller.py | 2 +- .../workflow_manager/models/utils.py | 7 +- .../serializers/workflow_run_action.py | 43 +++++++ .../workflow_manager/settings/local.py | 1 + .../workflow_manager/urls/base.py | 2 + .../viewsets/workflow_run_action.py | 115 ++++++++++++++++++ .../lambdas/handle_service_wrsc_event.py | 4 +- .../lambdas/transition_bcm_fastq_copy.py | 4 +- .../services/__init__.py | 2 +- .../services/create_workflow_run_state.py | 4 +- .../emit_workflow_run_state_change.py | 4 +- .../tests/test_create_workflow_run_state.py | 2 +- 28 files changed, 216 insertions(+), 13 deletions(-) rename lib/workload/stateless/stacks/workflow-manager/{workflow_manager_proc/domain => workflow_manager/aws_event_bridge}/README.md (100%) rename lib/workload/stateless/stacks/workflow-manager/{workflow_manager_proc/domain => workflow_manager/aws_event_bridge}/__init__.py (100%) create mode 100644 lib/workload/stateless/stacks/workflow-manager/workflow_manager/aws_event_bridge/event.py rename lib/workload/stateless/stacks/workflow-manager/{workflow_manager_proc/domain => workflow_manager/aws_event_bridge}/executionservice/__init__.py (100%) rename lib/workload/stateless/stacks/workflow-manager/{workflow_manager_proc/domain => workflow_manager/aws_event_bridge}/executionservice/workflowrunstatechange/AWSEvent.py (100%) rename lib/workload/stateless/stacks/workflow-manager/{workflow_manager_proc/domain => workflow_manager/aws_event_bridge}/executionservice/workflowrunstatechange/LibraryRecord.py (100%) rename lib/workload/stateless/stacks/workflow-manager/{workflow_manager_proc/domain => workflow_manager/aws_event_bridge}/executionservice/workflowrunstatechange/Payload.py (100%) rename lib/workload/stateless/stacks/workflow-manager/{workflow_manager_proc/domain => workflow_manager/aws_event_bridge}/executionservice/workflowrunstatechange/WorkflowRunStateChange.py (100%) rename lib/workload/stateless/stacks/workflow-manager/{workflow_manager_proc/domain => workflow_manager/aws_event_bridge}/executionservice/workflowrunstatechange/__init__.py (100%) rename lib/workload/stateless/stacks/workflow-manager/{workflow_manager_proc/domain/workflowmanager => workflow_manager/aws_event_bridge/executionservice}/workflowrunstatechange/marshaller.py (98%) rename lib/workload/stateless/stacks/workflow-manager/{workflow_manager_proc/domain => workflow_manager/aws_event_bridge}/workflowmanager/__init__.py (100%) rename lib/workload/stateless/stacks/workflow-manager/{workflow_manager_proc/domain => workflow_manager/aws_event_bridge}/workflowmanager/workflowrunstatechange/AWSEvent.py (100%) rename lib/workload/stateless/stacks/workflow-manager/{workflow_manager_proc/domain => workflow_manager/aws_event_bridge}/workflowmanager/workflowrunstatechange/LibraryRecord.py (100%) rename lib/workload/stateless/stacks/workflow-manager/{workflow_manager_proc/domain => workflow_manager/aws_event_bridge}/workflowmanager/workflowrunstatechange/Payload.py (100%) rename lib/workload/stateless/stacks/workflow-manager/{workflow_manager_proc/domain => workflow_manager/aws_event_bridge}/workflowmanager/workflowrunstatechange/WorkflowRunStateChange.py (100%) rename lib/workload/stateless/stacks/workflow-manager/{workflow_manager_proc/domain => workflow_manager/aws_event_bridge}/workflowmanager/workflowrunstatechange/__init__.py (100%) rename lib/workload/stateless/stacks/workflow-manager/{workflow_manager_proc/domain/executionservice => workflow_manager/aws_event_bridge/workflowmanager}/workflowrunstatechange/marshaller.py (98%) create mode 100644 lib/workload/stateless/stacks/workflow-manager/workflow_manager/serializers/workflow_run_action.py create mode 100644 lib/workload/stateless/stacks/workflow-manager/workflow_manager/viewsets/workflow_run_action.py diff --git a/lib/workload/stateless/stacks/workflow-manager/workflow_manager_proc/domain/README.md b/lib/workload/stateless/stacks/workflow-manager/workflow_manager/aws_event_bridge/README.md similarity index 100% rename from lib/workload/stateless/stacks/workflow-manager/workflow_manager_proc/domain/README.md rename to lib/workload/stateless/stacks/workflow-manager/workflow_manager/aws_event_bridge/README.md diff --git a/lib/workload/stateless/stacks/workflow-manager/workflow_manager_proc/domain/__init__.py b/lib/workload/stateless/stacks/workflow-manager/workflow_manager/aws_event_bridge/__init__.py similarity index 100% rename from lib/workload/stateless/stacks/workflow-manager/workflow_manager_proc/domain/__init__.py rename to lib/workload/stateless/stacks/workflow-manager/workflow_manager/aws_event_bridge/__init__.py diff --git a/lib/workload/stateless/stacks/workflow-manager/workflow_manager/aws_event_bridge/event.py b/lib/workload/stateless/stacks/workflow-manager/workflow_manager/aws_event_bridge/event.py new file mode 100644 index 000000000..113792be6 --- /dev/null +++ b/lib/workload/stateless/stacks/workflow-manager/workflow_manager/aws_event_bridge/event.py @@ -0,0 +1,37 @@ +import os +import logging +import boto3 +import json +from typing import Literal +import workflow_manager.aws_event_bridge.workflowmanager.workflowrunstatechange as wfm + +logger = logging.getLogger(__name__) +logger.setLevel(logging.INFO) + +client = boto3.client('events') +event_bus_name = os.environ["EVENT_BUS_NAME"] + + +def emit_wrsc_api_event(event): + """ + Emit events to the event bridge sourced from the workflow manager API + """ + source = "orcabus.workflowmanager" + + logger.info(f"Emitting event: {event}") + + response = client.put_events( + Entries=[ + { + 'Source': source, + 'DetailType': wfm.WorkflowRunStateChange.__name__, + 'Detail': json.dumps(wfm.Marshaller.marshall(event)), + 'EventBusName': event_bus_name, + }, + ], + ) + + logger.info(f"Sent a WRSC event to event bus {event_bus_name}:") + logger.info(event) + logger.info(f"{__name__} done.") + return response diff --git a/lib/workload/stateless/stacks/workflow-manager/workflow_manager_proc/domain/executionservice/__init__.py b/lib/workload/stateless/stacks/workflow-manager/workflow_manager/aws_event_bridge/executionservice/__init__.py similarity index 100% rename from lib/workload/stateless/stacks/workflow-manager/workflow_manager_proc/domain/executionservice/__init__.py rename to lib/workload/stateless/stacks/workflow-manager/workflow_manager/aws_event_bridge/executionservice/__init__.py diff --git a/lib/workload/stateless/stacks/workflow-manager/workflow_manager_proc/domain/executionservice/workflowrunstatechange/AWSEvent.py b/lib/workload/stateless/stacks/workflow-manager/workflow_manager/aws_event_bridge/executionservice/workflowrunstatechange/AWSEvent.py similarity index 100% rename from lib/workload/stateless/stacks/workflow-manager/workflow_manager_proc/domain/executionservice/workflowrunstatechange/AWSEvent.py rename to lib/workload/stateless/stacks/workflow-manager/workflow_manager/aws_event_bridge/executionservice/workflowrunstatechange/AWSEvent.py diff --git a/lib/workload/stateless/stacks/workflow-manager/workflow_manager_proc/domain/executionservice/workflowrunstatechange/LibraryRecord.py b/lib/workload/stateless/stacks/workflow-manager/workflow_manager/aws_event_bridge/executionservice/workflowrunstatechange/LibraryRecord.py similarity index 100% rename from lib/workload/stateless/stacks/workflow-manager/workflow_manager_proc/domain/executionservice/workflowrunstatechange/LibraryRecord.py rename to lib/workload/stateless/stacks/workflow-manager/workflow_manager/aws_event_bridge/executionservice/workflowrunstatechange/LibraryRecord.py diff --git a/lib/workload/stateless/stacks/workflow-manager/workflow_manager_proc/domain/executionservice/workflowrunstatechange/Payload.py b/lib/workload/stateless/stacks/workflow-manager/workflow_manager/aws_event_bridge/executionservice/workflowrunstatechange/Payload.py similarity index 100% rename from lib/workload/stateless/stacks/workflow-manager/workflow_manager_proc/domain/executionservice/workflowrunstatechange/Payload.py rename to lib/workload/stateless/stacks/workflow-manager/workflow_manager/aws_event_bridge/executionservice/workflowrunstatechange/Payload.py diff --git a/lib/workload/stateless/stacks/workflow-manager/workflow_manager_proc/domain/executionservice/workflowrunstatechange/WorkflowRunStateChange.py b/lib/workload/stateless/stacks/workflow-manager/workflow_manager/aws_event_bridge/executionservice/workflowrunstatechange/WorkflowRunStateChange.py similarity index 100% rename from lib/workload/stateless/stacks/workflow-manager/workflow_manager_proc/domain/executionservice/workflowrunstatechange/WorkflowRunStateChange.py rename to lib/workload/stateless/stacks/workflow-manager/workflow_manager/aws_event_bridge/executionservice/workflowrunstatechange/WorkflowRunStateChange.py diff --git a/lib/workload/stateless/stacks/workflow-manager/workflow_manager_proc/domain/executionservice/workflowrunstatechange/__init__.py b/lib/workload/stateless/stacks/workflow-manager/workflow_manager/aws_event_bridge/executionservice/workflowrunstatechange/__init__.py similarity index 100% rename from lib/workload/stateless/stacks/workflow-manager/workflow_manager_proc/domain/executionservice/workflowrunstatechange/__init__.py rename to lib/workload/stateless/stacks/workflow-manager/workflow_manager/aws_event_bridge/executionservice/workflowrunstatechange/__init__.py diff --git a/lib/workload/stateless/stacks/workflow-manager/workflow_manager_proc/domain/workflowmanager/workflowrunstatechange/marshaller.py b/lib/workload/stateless/stacks/workflow-manager/workflow_manager/aws_event_bridge/executionservice/workflowrunstatechange/marshaller.py similarity index 98% rename from lib/workload/stateless/stacks/workflow-manager/workflow_manager_proc/domain/workflowmanager/workflowrunstatechange/marshaller.py rename to lib/workload/stateless/stacks/workflow-manager/workflow_manager/aws_event_bridge/executionservice/workflowrunstatechange/marshaller.py index 86d3ccfbb..7954db8c1 100644 --- a/lib/workload/stateless/stacks/workflow-manager/workflow_manager_proc/domain/workflowmanager/workflowrunstatechange/marshaller.py +++ b/lib/workload/stateless/stacks/workflow-manager/workflow_manager/aws_event_bridge/executionservice/workflowrunstatechange/marshaller.py @@ -1,7 +1,7 @@ import datetime import re import six -from workflow_manager_proc.domain.workflowmanager import workflowrunstatechange +from workflow_manager.aws_event_bridge.executionservice import workflowrunstatechange class Marshaller: PRIMITIVE_TYPES = (float, bool, bytes, six.text_type) + six.integer_types diff --git a/lib/workload/stateless/stacks/workflow-manager/workflow_manager_proc/domain/workflowmanager/__init__.py b/lib/workload/stateless/stacks/workflow-manager/workflow_manager/aws_event_bridge/workflowmanager/__init__.py similarity index 100% rename from lib/workload/stateless/stacks/workflow-manager/workflow_manager_proc/domain/workflowmanager/__init__.py rename to lib/workload/stateless/stacks/workflow-manager/workflow_manager/aws_event_bridge/workflowmanager/__init__.py diff --git a/lib/workload/stateless/stacks/workflow-manager/workflow_manager_proc/domain/workflowmanager/workflowrunstatechange/AWSEvent.py b/lib/workload/stateless/stacks/workflow-manager/workflow_manager/aws_event_bridge/workflowmanager/workflowrunstatechange/AWSEvent.py similarity index 100% rename from lib/workload/stateless/stacks/workflow-manager/workflow_manager_proc/domain/workflowmanager/workflowrunstatechange/AWSEvent.py rename to lib/workload/stateless/stacks/workflow-manager/workflow_manager/aws_event_bridge/workflowmanager/workflowrunstatechange/AWSEvent.py diff --git a/lib/workload/stateless/stacks/workflow-manager/workflow_manager_proc/domain/workflowmanager/workflowrunstatechange/LibraryRecord.py b/lib/workload/stateless/stacks/workflow-manager/workflow_manager/aws_event_bridge/workflowmanager/workflowrunstatechange/LibraryRecord.py similarity index 100% rename from lib/workload/stateless/stacks/workflow-manager/workflow_manager_proc/domain/workflowmanager/workflowrunstatechange/LibraryRecord.py rename to lib/workload/stateless/stacks/workflow-manager/workflow_manager/aws_event_bridge/workflowmanager/workflowrunstatechange/LibraryRecord.py diff --git a/lib/workload/stateless/stacks/workflow-manager/workflow_manager_proc/domain/workflowmanager/workflowrunstatechange/Payload.py b/lib/workload/stateless/stacks/workflow-manager/workflow_manager/aws_event_bridge/workflowmanager/workflowrunstatechange/Payload.py similarity index 100% rename from lib/workload/stateless/stacks/workflow-manager/workflow_manager_proc/domain/workflowmanager/workflowrunstatechange/Payload.py rename to lib/workload/stateless/stacks/workflow-manager/workflow_manager/aws_event_bridge/workflowmanager/workflowrunstatechange/Payload.py diff --git a/lib/workload/stateless/stacks/workflow-manager/workflow_manager_proc/domain/workflowmanager/workflowrunstatechange/WorkflowRunStateChange.py b/lib/workload/stateless/stacks/workflow-manager/workflow_manager/aws_event_bridge/workflowmanager/workflowrunstatechange/WorkflowRunStateChange.py similarity index 100% rename from lib/workload/stateless/stacks/workflow-manager/workflow_manager_proc/domain/workflowmanager/workflowrunstatechange/WorkflowRunStateChange.py rename to lib/workload/stateless/stacks/workflow-manager/workflow_manager/aws_event_bridge/workflowmanager/workflowrunstatechange/WorkflowRunStateChange.py diff --git a/lib/workload/stateless/stacks/workflow-manager/workflow_manager_proc/domain/workflowmanager/workflowrunstatechange/__init__.py b/lib/workload/stateless/stacks/workflow-manager/workflow_manager/aws_event_bridge/workflowmanager/workflowrunstatechange/__init__.py similarity index 100% rename from lib/workload/stateless/stacks/workflow-manager/workflow_manager_proc/domain/workflowmanager/workflowrunstatechange/__init__.py rename to lib/workload/stateless/stacks/workflow-manager/workflow_manager/aws_event_bridge/workflowmanager/workflowrunstatechange/__init__.py diff --git a/lib/workload/stateless/stacks/workflow-manager/workflow_manager_proc/domain/executionservice/workflowrunstatechange/marshaller.py b/lib/workload/stateless/stacks/workflow-manager/workflow_manager/aws_event_bridge/workflowmanager/workflowrunstatechange/marshaller.py similarity index 98% rename from lib/workload/stateless/stacks/workflow-manager/workflow_manager_proc/domain/executionservice/workflowrunstatechange/marshaller.py rename to lib/workload/stateless/stacks/workflow-manager/workflow_manager/aws_event_bridge/workflowmanager/workflowrunstatechange/marshaller.py index 5f6d04a6b..e103cb1cc 100644 --- a/lib/workload/stateless/stacks/workflow-manager/workflow_manager_proc/domain/executionservice/workflowrunstatechange/marshaller.py +++ b/lib/workload/stateless/stacks/workflow-manager/workflow_manager/aws_event_bridge/workflowmanager/workflowrunstatechange/marshaller.py @@ -1,7 +1,7 @@ import datetime import re import six -from workflow_manager_proc.domain.executionservice import workflowrunstatechange +from workflow_manager.aws_event_bridge.workflowmanager import workflowrunstatechange class Marshaller: PRIMITIVE_TYPES = (float, bool, bytes, six.text_type) + six.integer_types diff --git a/lib/workload/stateless/stacks/workflow-manager/workflow_manager/models/utils.py b/lib/workload/stateless/stacks/workflow-manager/workflow_manager/models/utils.py index 06e56a0b7..c0654a176 100644 --- a/lib/workload/stateless/stacks/workflow-manager/workflow_manager/models/utils.py +++ b/lib/workload/stateless/stacks/workflow-manager/workflow_manager/models/utils.py @@ -1,5 +1,6 @@ import logging -from datetime import timedelta +import uuid +from datetime import timedelta, datetime, timezone from typing import List from workflow_manager.models import Status, State, WorkflowRun @@ -142,3 +143,7 @@ def get_latest_state(states: List[State]) -> State: last = s return last + +def create_portal_run_id() -> str: + date = datetime.now(timezone.utc) + return f"{date.year}{date.month}{date.day}{str(uuid.uuid4())[:8]}" \ No newline at end of file diff --git a/lib/workload/stateless/stacks/workflow-manager/workflow_manager/serializers/workflow_run_action.py b/lib/workload/stateless/stacks/workflow-manager/workflow_manager/serializers/workflow_run_action.py new file mode 100644 index 000000000..568904a9a --- /dev/null +++ b/lib/workload/stateless/stacks/workflow-manager/workflow_manager/serializers/workflow_run_action.py @@ -0,0 +1,43 @@ +from enum import StrEnum +from typing import Type + +from rest_framework import serializers + + +class AllowedRerunWorkflow(StrEnum): + RNASUM = "rnasum" + + +class BaseRerunInputSerializer(serializers.Serializer): + + def update(self, instance, validated_data): + pass + + def create(self, validated_data): + pass + + +class RnasumRerunInputSerializer(BaseRerunInputSerializer): + """ + For 'rnasum' workflow rerun only allow dataset to be overridden. + """ + + # https://github.com/umccr/RNAsum/blob/master/TCGA_projects_summary.md + allowed_dataset_choice = [ + # PRIMARY_DATASETS_OPTION + "BRCA", "THCA", "HNSC", "LGG", "KIRC", "LUSC", "LUAD", "PRAD", "STAD", "LIHC", "COAD", "KIRP", + "BLCA", "OV", "SARC", "PCPG", "CESC", "UCEC", "PAAD", "TGCT", "LAML", "ESCA", "GBM", "THYM", + "SKCM", "READ", "UVM", "ACC", "MESO", "KICH", "UCS", "DLBC", "CHOL", + # EXTENDED_DATASETS_OPTION + "LUAD-LCNEC", "BLCA-NET", + "PAAD-IPMN", "PAAD-NET", "PAAD-ACC", + # PAN_CANCER_DATASETS_OPTION + "PANCAN" + ] + + dataset = serializers.ChoiceField(choices=allowed_dataset_choice, required=True) + + +RERUN_INPUT_SERIALIZERS: dict[AllowedRerunWorkflow, Type[BaseRerunInputSerializer]] = { + AllowedRerunWorkflow.RNASUM: RnasumRerunInputSerializer, +} diff --git a/lib/workload/stateless/stacks/workflow-manager/workflow_manager/settings/local.py b/lib/workload/stateless/stacks/workflow-manager/workflow_manager/settings/local.py index c942eb120..4cfdfd3c2 100644 --- a/lib/workload/stateless/stacks/workflow-manager/workflow_manager/settings/local.py +++ b/lib/workload/stateless/stacks/workflow-manager/workflow_manager/settings/local.py @@ -62,4 +62,5 @@ 'drf_spectacular.contrib.djangorestframework_camel_case.camelize_serializer_fields', 'drf_spectacular.hooks.postprocess_schema_enums' ], + 'SCHEMA_PATH_PREFIX': f'/api/{API_VERSION}/', } diff --git a/lib/workload/stateless/stacks/workflow-manager/workflow_manager/urls/base.py b/lib/workload/stateless/stacks/workflow-manager/workflow_manager/urls/base.py index 24e58f2d9..fcf899754 100644 --- a/lib/workload/stateless/stacks/workflow-manager/workflow_manager/urls/base.py +++ b/lib/workload/stateless/stacks/workflow-manager/workflow_manager/urls/base.py @@ -8,6 +8,7 @@ from workflow_manager.viewsets.payload import PayloadViewSet from workflow_manager.viewsets.analysis_context import AnalysisContextViewSet from workflow_manager.viewsets.state import StateViewSet +from workflow_manager.viewsets.workflow_run_action import WorkflowRunAction # from workflow_manager.viewsets.library import LibraryViewSet from workflow_manager.viewsets.workflow_run_comment import WorkflowRunCommentViewSet from workflow_manager.settings.base import API_VERSION @@ -22,6 +23,7 @@ router.register(r"analysiscontext", AnalysisContextViewSet, basename="analysiscontext") router.register(r"workflow", WorkflowViewSet, basename="workflow") router.register(r"workflowrun", WorkflowRunViewSet, basename="workflowrun") +router.register(r"workflowrun", WorkflowRunAction, basename="workflowrun-action") router.register(r"payload", PayloadViewSet, basename="payload") # may no longer need this as it's currently included in the detail response for an individual WorkflowRun record diff --git a/lib/workload/stateless/stacks/workflow-manager/workflow_manager/viewsets/workflow_run_action.py b/lib/workload/stateless/stacks/workflow-manager/workflow_manager/viewsets/workflow_run_action.py new file mode 100644 index 000000000..e897f1599 --- /dev/null +++ b/lib/workload/stateless/stacks/workflow-manager/workflow_manager/viewsets/workflow_run_action.py @@ -0,0 +1,115 @@ +import json + +from datetime import datetime, timezone +from rest_framework import status +from rest_framework.viewsets import ViewSet +from rest_framework.decorators import action +from rest_framework.response import Response + +from django.shortcuts import get_object_or_404 + +from drf_spectacular.types import OpenApiTypes +from drf_spectacular.utils import extend_schema, PolymorphicProxySerializer + +from workflow_manager.aws_event_bridge.event import emit_wrsc_api_event +from workflow_manager.models.utils import create_portal_run_id +from workflow_manager.serializers.library import LibrarySerializer +from workflow_manager.serializers.payload import PayloadSerializer +from workflow_manager.serializers.workflow_run_action import AllowedRerunWorkflow, RERUN_INPUT_SERIALIZERS +from workflow_manager.models import ( + WorkflowRun, + State, +) + + +class WorkflowRunAction(ViewSet): + queryset = WorkflowRun.objects.prefetch_related('states').all() + orcabus_id_prefix = WorkflowRun.orcabus_id_prefix + + @extend_schema( + request= PolymorphicProxySerializer( + component_name='WorkflowRunRerun', + serializers=RERUN_INPUT_SERIALIZERS.values(), + resource_type_field_name=None + ), + responses=OpenApiTypes.OBJECT, + description="Trigger a workflow run rerun by emitting an event to EventBridge with an overridden workflow " + "input payload." + ) + @action( + detail=True, + methods=['post'], + url_name='rerun', + url_path='rerun' + ) + def rerun(self, request, *args, **kwargs): + """ + rerun from existing workflow run + """ + pk = self.kwargs.get('pk') + if pk and pk.startswith(self.orcabus_id_prefix): + pk = pk[len(self.orcabus_id_prefix):] + wfl_run = get_object_or_404(self.queryset, pk=pk) + + # Only approved workflow_name is allowed + if wfl_run.workflow.workflow_name not in AllowedRerunWorkflow: + return Response(f"This workflow type is not allowed: {wfl_run.workflow.workflow_name}", + status=status.HTTP_400_BAD_REQUEST) + + serializer = RERUN_INPUT_SERIALIZERS[wfl_run.workflow.workflow_name](data=request.data) + if not serializer.is_valid(): + return Response(serializer.errors, + status=status.HTTP_400_BAD_REQUEST) + + detail = construct_rerun_eb_detail(wfl_run, serializer.data) + emit_wrsc_api_event(detail) + + return Response(detail, status=status.HTTP_200_OK) + + +def construct_rerun_eb_detail(wfl_run: WorkflowRun, input_body: dict) -> dict: + """ + Construct event bridge detail for rerun based on the existing workflow run and request body + """ + new_portal_run_id = create_portal_run_id() + wfl_name = wfl_run.workflow.workflow_name + + new_payload: dict + if wfl_name == AllowedRerunWorkflow.RNASUM.value: + new_payload = construct_rnasum_rerun_payload(wfl_run, new_portal_run_id, input_body) + else: + raise ValueError(f"Rerun is not allowed for this workflow: {wfl_name}") + + return { + "status": 'READY', + "payload": new_payload, + "portalRunId": new_portal_run_id, + "linkedLibraries": LibrarySerializer(wfl_run.libraries.all(), many=True).data, + "workflowName": wfl_run.workflow.workflow_name, + "workflowRunName": wfl_run.workflow_run_name, + "workflowVersion": wfl_run.workflow.workflow_version, + "timestamp": datetime.now(timezone.utc).strftime('%Y-%m-%dT%H:%M:%SZ'), + } + + +def construct_rnasum_rerun_payload(wfl_run: WorkflowRun, new_portal_run_id: str, input_body: dict) -> dict: + """ + Construct payload for rerun for 'rnasum' workflow based on the request body payload + """ + + # Get the payload where the state is 'READY' + ready_state: State = wfl_run.states.filter(status='READY').order_by('-orcabus_id').first() + ready_data_payload = PayloadSerializer(ready_state.payload).data.get("data", None) + + # Start crafting the payload based on the old ones + new_data_payload = ready_data_payload.copy() + + # Override payload based on given input + new_data_payload["inputs"]["dataset"] = input_body["dataset"] + + # Replace old portal_run_id with new_portal_run_id in any part of the string + # In the 'rnasum` payload, the engine parameter URI prefixes contain the portal_run_id + new_data_payload = json.loads( + json.dumps(new_data_payload).replace(f"{wfl_run.portal_run_id}", f"{new_portal_run_id}")) + + return new_data_payload diff --git a/lib/workload/stateless/stacks/workflow-manager/workflow_manager_proc/lambdas/handle_service_wrsc_event.py b/lib/workload/stateless/stacks/workflow-manager/workflow_manager_proc/lambdas/handle_service_wrsc_event.py index 2e6df24a2..5afb7a0b7 100644 --- a/lib/workload/stateless/stacks/workflow-manager/workflow_manager_proc/lambdas/handle_service_wrsc_event.py +++ b/lib/workload/stateless/stacks/workflow-manager/workflow_manager_proc/lambdas/handle_service_wrsc_event.py @@ -3,8 +3,8 @@ django.setup() # --- keep ^^^ at top of the module -import workflow_manager_proc.domain.executionservice.workflowrunstatechange as srv -import workflow_manager_proc.domain.workflowmanager.workflowrunstatechange as wfm +import workflow_manager.aws_event_bridge.executionservice.workflowrunstatechange as srv +import workflow_manager.aws_event_bridge.workflowmanager.workflowrunstatechange as wfm from workflow_manager_proc.services import emit_workflow_run_state_change, create_workflow_run_state import logging diff --git a/lib/workload/stateless/stacks/workflow-manager/workflow_manager_proc/lambdas/transition_bcm_fastq_copy.py b/lib/workload/stateless/stacks/workflow-manager/workflow_manager_proc/lambdas/transition_bcm_fastq_copy.py index 0be7b318d..ba03ba6b1 100644 --- a/lib/workload/stateless/stacks/workflow-manager/workflow_manager_proc/lambdas/transition_bcm_fastq_copy.py +++ b/lib/workload/stateless/stacks/workflow-manager/workflow_manager_proc/lambdas/transition_bcm_fastq_copy.py @@ -4,8 +4,8 @@ # --- keep ^^^ at top of the module from workflow_manager.models.workflow_run import WorkflowRun -import workflow_manager_proc.domain.executionservice.workflowrunstatechange as srv -import workflow_manager_proc.domain.workflowmanager.workflowrunstatechange as wfm +import workflow_manager.aws_event_bridge.executionservice.workflowrunstatechange as srv +import workflow_manager.aws_event_bridge.workflowmanager.workflowrunstatechange as wfm import logging logger = logging.getLogger(__name__) diff --git a/lib/workload/stateless/stacks/workflow-manager/workflow_manager_proc/services/__init__.py b/lib/workload/stateless/stacks/workflow-manager/workflow_manager_proc/services/__init__.py index d94862699..29304bf83 100644 --- a/lib/workload/stateless/stacks/workflow-manager/workflow_manager_proc/services/__init__.py +++ b/lib/workload/stateless/stacks/workflow-manager/workflow_manager_proc/services/__init__.py @@ -1,5 +1,5 @@ import uuid -from workflow_manager_proc.domain.executionservice.workflowrunstatechange import WorkflowRunStateChange +from workflow_manager.aws_event_bridge.executionservice.workflowrunstatechange import WorkflowRunStateChange from workflow_manager.models import Payload diff --git a/lib/workload/stateless/stacks/workflow-manager/workflow_manager_proc/services/create_workflow_run_state.py b/lib/workload/stateless/stacks/workflow-manager/workflow_manager_proc/services/create_workflow_run_state.py index 66d9b71ee..07fd804f7 100644 --- a/lib/workload/stateless/stacks/workflow-manager/workflow_manager_proc/services/create_workflow_run_state.py +++ b/lib/workload/stateless/stacks/workflow-manager/workflow_manager_proc/services/create_workflow_run_state.py @@ -7,8 +7,8 @@ import logging from django.db import transaction -import workflow_manager_proc.domain.executionservice.workflowrunstatechange as srv -import workflow_manager_proc.domain.workflowmanager.workflowrunstatechange as wfm +import workflow_manager.aws_event_bridge.executionservice.workflowrunstatechange as srv +import workflow_manager.aws_event_bridge.workflowmanager.workflowrunstatechange as wfm from workflow_manager.models import ( WorkflowRun, Workflow, diff --git a/lib/workload/stateless/stacks/workflow-manager/workflow_manager_proc/services/emit_workflow_run_state_change.py b/lib/workload/stateless/stacks/workflow-manager/workflow_manager_proc/services/emit_workflow_run_state_change.py index e35d8a220..8c3d59cb2 100644 --- a/lib/workload/stateless/stacks/workflow-manager/workflow_manager_proc/services/emit_workflow_run_state_change.py +++ b/lib/workload/stateless/stacks/workflow-manager/workflow_manager_proc/services/emit_workflow_run_state_change.py @@ -1,8 +1,8 @@ import os import boto3 import json -import workflow_manager_proc.domain.workflowmanager.workflowrunstatechange as wfm -from workflow_manager_proc.domain.workflowmanager.workflowrunstatechange import WorkflowRunStateChange +import workflow_manager.aws_event_bridge.workflowmanager.workflowrunstatechange as wfm +from workflow_manager.aws_event_bridge.workflowmanager.workflowrunstatechange import WorkflowRunStateChange import logging logger = logging.getLogger(__name__) diff --git a/lib/workload/stateless/stacks/workflow-manager/workflow_manager_proc/tests/test_create_workflow_run_state.py b/lib/workload/stateless/stacks/workflow-manager/workflow_manager_proc/tests/test_create_workflow_run_state.py index d742029ed..3db4a77a6 100644 --- a/lib/workload/stateless/stacks/workflow-manager/workflow_manager_proc/tests/test_create_workflow_run_state.py +++ b/lib/workload/stateless/stacks/workflow-manager/workflow_manager_proc/tests/test_create_workflow_run_state.py @@ -5,7 +5,7 @@ from django.db.models import QuerySet from django.utils.timezone import make_aware -from workflow_manager_proc.domain.workflowmanager.workflowrunstatechange import WorkflowRunStateChange +from workflow_manager.aws_event_bridge.workflowmanager.workflowrunstatechange import WorkflowRunStateChange from workflow_manager_proc.services import create_workflow_run_state from workflow_manager_proc.tests.case import WorkflowManagerProcUnitTestCase, logger from workflow_manager.models import WorkflowRun, State, WorkflowRunUtil, Library From 9c0d0ef6536092d9642b31cf51ec034d7d3e1710 Mon Sep 17 00:00:00 2001 From: william Date: Wed, 20 Nov 2024 23:05:41 +1100 Subject: [PATCH 2/7] tests and cdk --- .../stacks/workflow-manager/deploy/stack.ts | 12 +++ .../aws_event_bridge/event.py | 26 +++---- .../workflow_manager/tests/factories.py | 73 ++++++++++++++++++- .../workflow_manager/tests/test_viewsets.py | 50 ++++++++++++- .../viewsets/workflow_run_action.py | 2 +- 5 files changed, 144 insertions(+), 19 deletions(-) diff --git a/lib/workload/stateless/stacks/workflow-manager/deploy/stack.ts b/lib/workload/stateless/stacks/workflow-manager/deploy/stack.ts index 88d549466..e8e00a809 100644 --- a/lib/workload/stateless/stacks/workflow-manager/deploy/stack.ts +++ b/lib/workload/stateless/stacks/workflow-manager/deploy/stack.ts @@ -111,6 +111,7 @@ export class WorkflowManagerStack extends Stack { } private createApiHandlerAndIntegration(props: WorkflowManagerStackProps) { + const API_VERSION = 'v1'; const apiFn: PythonFunction = this.createPythonFunction('Api', { index: 'api.py', handler: 'handler', @@ -145,6 +146,17 @@ export class WorkflowManagerStack extends Stack { integration: apiIntegration, routeKey: HttpRouteKey.with('/{proxy+}', HttpMethod.DELETE), }); + + // Route and permission for rerun cases where it needs to put event to mainBus + this.mainBus.grantPutEventsTo(apiFn); + new HttpRoute(this, 'PostRerunHttpRoute', { + httpApi: httpApi, + integration: apiIntegration, + routeKey: HttpRouteKey.with( + `api/${API_VERSION}/workflowrun/{orcabusId}/rerun/{proxy+}`, + HttpMethod.POST + ), + }); } private createHandleServiceWrscEventHandler() { diff --git a/lib/workload/stateless/stacks/workflow-manager/workflow_manager/aws_event_bridge/event.py b/lib/workload/stateless/stacks/workflow-manager/workflow_manager/aws_event_bridge/event.py index 113792be6..cf2c9b88c 100644 --- a/lib/workload/stateless/stacks/workflow-manager/workflow_manager/aws_event_bridge/event.py +++ b/lib/workload/stateless/stacks/workflow-manager/workflow_manager/aws_event_bridge/event.py @@ -1,15 +1,12 @@ import os import logging -import boto3 import json -from typing import Literal +from libumccr.aws import libeb import workflow_manager.aws_event_bridge.workflowmanager.workflowrunstatechange as wfm logger = logging.getLogger(__name__) logger.setLevel(logging.INFO) -client = boto3.client('events') -event_bus_name = os.environ["EVENT_BUS_NAME"] def emit_wrsc_api_event(event): @@ -17,19 +14,18 @@ def emit_wrsc_api_event(event): Emit events to the event bridge sourced from the workflow manager API """ source = "orcabus.workflowmanager" + event_bus_name = os.environ.get("EVENT_BUS_NAME", None) - logger.info(f"Emitting event: {event}") + if event_bus_name is None: + raise ValueError("EVENT_BUS_NAME environment variable is not set.") - response = client.put_events( - Entries=[ - { - 'Source': source, - 'DetailType': wfm.WorkflowRunStateChange.__name__, - 'Detail': json.dumps(wfm.Marshaller.marshall(event)), - 'EventBusName': event_bus_name, - }, - ], - ) + logger.info(f"Emitting event: {event}") + response = libeb.emit_event({ + 'Source': source, + 'DetailType': wfm.WorkflowRunStateChange.__name__, + 'Detail': json.dumps(wfm.Marshaller.marshall(event)), + 'EventBusName': event_bus_name, + }) logger.info(f"Sent a WRSC event to event bus {event_bus_name}:") logger.info(event) diff --git a/lib/workload/stateless/stacks/workflow-manager/workflow_manager/tests/factories.py b/lib/workload/stateless/stacks/workflow-manager/workflow_manager/tests/factories.py index b271e6b54..41e9266ea 100644 --- a/lib/workload/stateless/stacks/workflow-manager/workflow_manager/tests/factories.py +++ b/lib/workload/stateless/stacks/workflow-manager/workflow_manager/tests/factories.py @@ -1,12 +1,12 @@ from enum import Enum import uuid -from datetime import datetime +from datetime import datetime, timedelta from zoneinfo import ZoneInfo import factory from django.utils.timezone import make_aware -from workflow_manager.models import Workflow, WorkflowRun, Payload, Library, State +from workflow_manager.models import Workflow, WorkflowRun, Payload, Library, State, LibraryAssociation class TestConstant(Enum): @@ -72,3 +72,72 @@ class Meta: comment = "Comment" payload = None workflow_run = factory.SubFactory(WorkflowRunFactory) + + +class PrimaryTestData(): + WORKFLOW_NAME = "TestWorkflow" + + STATUS_DRAFT = "DRAFT" + STATUS_START = "READY" + STATUS_RUNNING = "RUNNING" + STATUS_END = "SUCCEEDED" + STATUS_FAIL = "FAILED" + STATUS_RESOLVED = "RESOLVED" + + + + def create_primary(self, generic_payload, libraries): + """ + Case: a primary workflow with two executions linked to 4 libraries + The first execution failed and led to a repetition that succeeded + """ + + wf = WorkflowFactory(workflow_name=self.WORKFLOW_NAME + "Primary") + + # The first execution (workflow run 1) + wfr_1: WorkflowRun = WorkflowRunFactory( + workflow_run_name=self.WORKFLOW_NAME + "PrimaryRun1", + portal_run_id="1234", + workflow=wf + ) + + for i, state in enumerate([self.STATUS_DRAFT, self.STATUS_START, self.STATUS_RUNNING, self.STATUS_FAIL]): + StateFactory(workflow_run=wfr_1, status=state, payload=generic_payload, + timestamp=make_aware(datetime.now() + timedelta(hours=i))) + for i in [0, 1, 2, 3]: + LibraryAssociation.objects.create( + workflow_run=wfr_1, + library=libraries[i], + association_date=make_aware(datetime.now()), + status="ACTIVE", + ) + + # The second execution (workflow run 2) + wfr_2: WorkflowRun = WorkflowRunFactory( + workflow_run_name=self.WORKFLOW_NAME + "PrimaryRun2", + portal_run_id="1235", + workflow=wf + ) + for i, state in enumerate([self.STATUS_DRAFT, self.STATUS_START, self.STATUS_RUNNING, self.STATUS_END]): + StateFactory(workflow_run=wfr_2, status=state, payload=generic_payload, + timestamp=make_aware(datetime.now() + timedelta(hours=i))) + for i in [0, 1, 2, 3]: + LibraryAssociation.objects.create( + workflow_run=wfr_2, + library=libraries[i], + association_date=make_aware(datetime.now()), + status="ACTIVE", + ) + + def setup(self): + + # Common components: payload and libraries + generic_payload = PayloadFactory() # Payload content is not important for now + libraries = [ + LibraryFactory(orcabus_id="01J5M2JFE1JPYV62RYQEG99CP1", library_id="L000001"), + LibraryFactory(orcabus_id="02J5M2JFE1JPYV62RYQEG99CP2", library_id="L000002"), + LibraryFactory(orcabus_id="03J5M2JFE1JPYV62RYQEG99CP3", library_id="L000003"), + LibraryFactory(orcabus_id="04J5M2JFE1JPYV62RYQEG99CP4", library_id="L000004") + ] + + self.create_primary(generic_payload, libraries) diff --git a/lib/workload/stateless/stacks/workflow-manager/workflow_manager/tests/test_viewsets.py b/lib/workload/stateless/stacks/workflow-manager/workflow_manager/tests/test_viewsets.py index 9a7b68ce9..d0e70508d 100644 --- a/lib/workload/stateless/stacks/workflow-manager/workflow_manager/tests/test_viewsets.py +++ b/lib/workload/stateless/stacks/workflow-manager/workflow_manager/tests/test_viewsets.py @@ -1,12 +1,16 @@ import logging +import os from unittest import skip +from unittest.mock import MagicMock from django.test import TestCase +from libumccr.aws import libeb +from workflow_manager.models import WorkflowRun from workflow_manager.models.workflow import Workflow +from workflow_manager.tests.factories import PrimaryTestData from workflow_manager.urls.base import api_base - logger = logging.getLogger() logger.setLevel(logging.INFO) @@ -28,3 +32,47 @@ def test_get_api(self): response = self.client.get(f"{self.endpoint}/") logger.info(response) self.assertEqual(response.status_code, 200, 'Ok status response is expected') + + +class WorkflowRunRerunViewSetTestCase(TestCase): + endpoint = f"/{api_base}workflowrun" + + def setUp(self): + os.environ["EVENT_BUS_NAME"] = "mock-bus" + PrimaryTestData().setup() + self._real_emit_event = libeb.emit_event + libeb.emit_events = MagicMock() + + def tearDown(self) -> None: + libeb.emit_event = self._real_emit_event + + def test_rerun_api(self): + """ + python manage.py test workflow_manager.tests.test_viewsets.WorkflowRunRerunViewSetTestCase.test_rerun_api + """ + wfl_run = WorkflowRun.objects.all().first() + payload = wfl_run.states.get(status='READY').payload + payload.data = { + "inputs": { + "someUri": "s3://random/prefix/" + }, + "engineParameters": { + "sourceUri": f"s3:/bucket/{wfl_run.portal_run_id}/", + } + } + payload.save() + + response = self.client.post(f"{self.endpoint}/{wfl_run.orcabus_id}/rerun") + self.assertIn(response.status_code, [400], 'Workflow name associated with the workflow run is not allowed') + + # Change the workflow name to 'rnasum' as this is the only allowed workflow name for rerrun + wfl = Workflow.objects.all().first() + wfl.workflow_name = "rnasum" + wfl.save() + + response = self.client.post(f"{self.endpoint}/{wfl_run.orcabus_id}/rerun", data={"dataset": "INVALID_CHOICE"}) + self.assertIn(response.status_code, [400], 'Invalid payload expected') + + response = self.client.post(f"{self.endpoint}/{wfl_run.orcabus_id}/rerun", data={"dataset": "BRCA"}) + self.assertIn(response.status_code, [200], 'Expected a successful response') + self.assertTrue(wfl_run.portal_run_id not in str(response.content), 'expect old portal_rub_id replaced') diff --git a/lib/workload/stateless/stacks/workflow-manager/workflow_manager/viewsets/workflow_run_action.py b/lib/workload/stateless/stacks/workflow-manager/workflow_manager/viewsets/workflow_run_action.py index e897f1599..f83bea44a 100644 --- a/lib/workload/stateless/stacks/workflow-manager/workflow_manager/viewsets/workflow_run_action.py +++ b/lib/workload/stateless/stacks/workflow-manager/workflow_manager/viewsets/workflow_run_action.py @@ -98,7 +98,7 @@ def construct_rnasum_rerun_payload(wfl_run: WorkflowRun, new_portal_run_id: str, """ # Get the payload where the state is 'READY' - ready_state: State = wfl_run.states.filter(status='READY').order_by('-orcabus_id').first() + ready_state: State = wfl_run.states.get(status='READY') ready_data_payload = PayloadSerializer(ready_state.payload).data.get("data", None) # Start crafting the payload based on the old ones From 5a7b7eb9dee0024d1e4775843c5a0ef3cdd1d983 Mon Sep 17 00:00:00 2001 From: william Date: Thu, 21 Nov 2024 14:22:35 +1100 Subject: [PATCH 3/7] update orcabus prefix support --- .../stateless/stacks/workflow-manager/deploy/stack.ts | 2 +- .../stacks/workflow-manager/workflow_manager/urls/base.py | 4 ++-- .../workflow_manager/viewsets/workflow_run_action.py | 5 +++-- 3 files changed, 6 insertions(+), 5 deletions(-) diff --git a/lib/workload/stateless/stacks/workflow-manager/deploy/stack.ts b/lib/workload/stateless/stacks/workflow-manager/deploy/stack.ts index e8e00a809..90d7fe439 100644 --- a/lib/workload/stateless/stacks/workflow-manager/deploy/stack.ts +++ b/lib/workload/stateless/stacks/workflow-manager/deploy/stack.ts @@ -153,7 +153,7 @@ export class WorkflowManagerStack extends Stack { httpApi: httpApi, integration: apiIntegration, routeKey: HttpRouteKey.with( - `api/${API_VERSION}/workflowrun/{orcabusId}/rerun/{proxy+}`, + `/api/${API_VERSION}/workflowrun/{orcabusId}/rerun/{proxy+}`, HttpMethod.POST ), }); diff --git a/lib/workload/stateless/stacks/workflow-manager/workflow_manager/urls/base.py b/lib/workload/stateless/stacks/workflow-manager/workflow_manager/urls/base.py index fcf899754..da0274072 100644 --- a/lib/workload/stateless/stacks/workflow-manager/workflow_manager/urls/base.py +++ b/lib/workload/stateless/stacks/workflow-manager/workflow_manager/urls/base.py @@ -8,7 +8,7 @@ from workflow_manager.viewsets.payload import PayloadViewSet from workflow_manager.viewsets.analysis_context import AnalysisContextViewSet from workflow_manager.viewsets.state import StateViewSet -from workflow_manager.viewsets.workflow_run_action import WorkflowRunAction +from workflow_manager.viewsets.workflow_run_action import WorkflowRunActionViewSet # from workflow_manager.viewsets.library import LibraryViewSet from workflow_manager.viewsets.workflow_run_comment import WorkflowRunCommentViewSet from workflow_manager.settings.base import API_VERSION @@ -23,7 +23,7 @@ router.register(r"analysiscontext", AnalysisContextViewSet, basename="analysiscontext") router.register(r"workflow", WorkflowViewSet, basename="workflow") router.register(r"workflowrun", WorkflowRunViewSet, basename="workflowrun") -router.register(r"workflowrun", WorkflowRunAction, basename="workflowrun-action") +router.register(r"workflowrun", WorkflowRunActionViewSet, basename="workflowrun-action") router.register(r"payload", PayloadViewSet, basename="payload") # may no longer need this as it's currently included in the detail response for an individual WorkflowRun record diff --git a/lib/workload/stateless/stacks/workflow-manager/workflow_manager/viewsets/workflow_run_action.py b/lib/workload/stateless/stacks/workflow-manager/workflow_manager/viewsets/workflow_run_action.py index f83bea44a..951f00e95 100644 --- a/lib/workload/stateless/stacks/workflow-manager/workflow_manager/viewsets/workflow_run_action.py +++ b/lib/workload/stateless/stacks/workflow-manager/workflow_manager/viewsets/workflow_run_action.py @@ -22,14 +22,15 @@ ) -class WorkflowRunAction(ViewSet): +class WorkflowRunActionViewSet(ViewSet): + lookup_value_regex = "[^/]+" # to allow orcabus id prefix queryset = WorkflowRun.objects.prefetch_related('states').all() orcabus_id_prefix = WorkflowRun.orcabus_id_prefix @extend_schema( request= PolymorphicProxySerializer( component_name='WorkflowRunRerun', - serializers=RERUN_INPUT_SERIALIZERS.values(), + serializers=list(RERUN_INPUT_SERIALIZERS.values()), resource_type_field_name=None ), responses=OpenApiTypes.OBJECT, From d3455689e33c8c84d220a2d6ec92acbf1a2314da Mon Sep 17 00:00:00 2001 From: william Date: Thu, 21 Nov 2024 16:59:08 +1100 Subject: [PATCH 4/7] update source --- .../workflow_manager/aws_event_bridge/event.py | 2 +- .../workflow_manager/viewsets/workflow_run_action.py | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/lib/workload/stateless/stacks/workflow-manager/workflow_manager/aws_event_bridge/event.py b/lib/workload/stateless/stacks/workflow-manager/workflow_manager/aws_event_bridge/event.py index cf2c9b88c..2463933ca 100644 --- a/lib/workload/stateless/stacks/workflow-manager/workflow_manager/aws_event_bridge/event.py +++ b/lib/workload/stateless/stacks/workflow-manager/workflow_manager/aws_event_bridge/event.py @@ -13,7 +13,7 @@ def emit_wrsc_api_event(event): """ Emit events to the event bridge sourced from the workflow manager API """ - source = "orcabus.workflowmanager" + source = "orcabus.workflowmanagerapi" event_bus_name = os.environ.get("EVENT_BUS_NAME", None) if event_bus_name is None: diff --git a/lib/workload/stateless/stacks/workflow-manager/workflow_manager/viewsets/workflow_run_action.py b/lib/workload/stateless/stacks/workflow-manager/workflow_manager/viewsets/workflow_run_action.py index 951f00e95..70d8288e9 100644 --- a/lib/workload/stateless/stacks/workflow-manager/workflow_manager/viewsets/workflow_run_action.py +++ b/lib/workload/stateless/stacks/workflow-manager/workflow_manager/viewsets/workflow_run_action.py @@ -23,19 +23,19 @@ class WorkflowRunActionViewSet(ViewSet): - lookup_value_regex = "[^/]+" # to allow orcabus id prefix + lookup_value_regex = "[^/]+" # to allow orcabus id prefix queryset = WorkflowRun.objects.prefetch_related('states').all() orcabus_id_prefix = WorkflowRun.orcabus_id_prefix @extend_schema( - request= PolymorphicProxySerializer( + request=PolymorphicProxySerializer( component_name='WorkflowRunRerun', serializers=list(RERUN_INPUT_SERIALIZERS.values()), resource_type_field_name=None ), responses=OpenApiTypes.OBJECT, description="Trigger a workflow run rerun by emitting an event to EventBridge with an overridden workflow " - "input payload." + "input payload. (Current supported workflow: 'rnasum')" ) @action( detail=True, From 1be762a0464ee08a214e365d69db1567754ad444 Mon Sep 17 00:00:00 2001 From: william Date: Fri, 22 Nov 2024 09:59:15 +1100 Subject: [PATCH 5/7] add version in payload --- .../stacks/workflow-manager/deploy/stack.ts | 1 + .../workflow_manager/aws_event_bridge/event.py | 1 - .../workflow_manager/serializers/base.py | 14 ++++++++++++++ .../workflow_manager/serializers/library.py | 2 ++ .../viewsets/workflow_run_action.py | 15 ++++++++------- 5 files changed, 25 insertions(+), 8 deletions(-) diff --git a/lib/workload/stateless/stacks/workflow-manager/deploy/stack.ts b/lib/workload/stateless/stacks/workflow-manager/deploy/stack.ts index 90d7fe439..1f6dd526e 100644 --- a/lib/workload/stateless/stacks/workflow-manager/deploy/stack.ts +++ b/lib/workload/stateless/stacks/workflow-manager/deploy/stack.ts @@ -98,6 +98,7 @@ export class WorkflowManagerStack extends Stack { vpcSubnets: { subnets: this.vpc.privateSubnets }, role: this.lambdaRole, architecture: Architecture.ARM_64, + memorySize: 1024, ...props, }); } diff --git a/lib/workload/stateless/stacks/workflow-manager/workflow_manager/aws_event_bridge/event.py b/lib/workload/stateless/stacks/workflow-manager/workflow_manager/aws_event_bridge/event.py index 2463933ca..10f849500 100644 --- a/lib/workload/stateless/stacks/workflow-manager/workflow_manager/aws_event_bridge/event.py +++ b/lib/workload/stateless/stacks/workflow-manager/workflow_manager/aws_event_bridge/event.py @@ -8,7 +8,6 @@ logger.setLevel(logging.INFO) - def emit_wrsc_api_event(event): """ Emit events to the event bridge sourced from the workflow manager API diff --git a/lib/workload/stateless/stacks/workflow-manager/workflow_manager/serializers/base.py b/lib/workload/stateless/stacks/workflow-manager/workflow_manager/serializers/base.py index 6263deae8..2b57a52a5 100644 --- a/lib/workload/stateless/stacks/workflow-manager/workflow_manager/serializers/base.py +++ b/lib/workload/stateless/stacks/workflow-manager/workflow_manager/serializers/base.py @@ -1,14 +1,28 @@ +import re from rest_framework import serializers +def to_camel_case(snake_str): + components = re.split(r'[_\-\s]', snake_str) + return components[0].lower() + ''.join(x.title() for x in components[1:]) + + class SerializersBase(serializers.ModelSerializer): prefix = '' + def __init__(self, *args, camel_case_data=False, **kwargs): + super().__init__(*args, **kwargs) + self.use_camel_case = camel_case_data + def to_representation(self, instance): representation = super().to_representation(instance) representation['orcabus_id'] = self.prefix + str(representation['orcabus_id']) + + if self.use_camel_case: + return {to_camel_case(key): value for key, value in representation.items()} return representation + class OptionalFieldsMixin: def make_fields_optional(self): # Make all fields optional diff --git a/lib/workload/stateless/stacks/workflow-manager/workflow_manager/serializers/library.py b/lib/workload/stateless/stacks/workflow-manager/workflow_manager/serializers/library.py index c30a64746..29b27140e 100644 --- a/lib/workload/stateless/stacks/workflow-manager/workflow_manager/serializers/library.py +++ b/lib/workload/stateless/stacks/workflow-manager/workflow_manager/serializers/library.py @@ -5,11 +5,13 @@ class LibraryBaseSerializer(SerializersBase): prefix = Library.orcabus_id_prefix + class LibraryListParamSerializer(OptionalFieldsMixin, LibraryBaseSerializer): class Meta: model = Library fields = "__all__" + class LibrarySerializer(LibraryBaseSerializer): class Meta: model = Library diff --git a/lib/workload/stateless/stacks/workflow-manager/workflow_manager/viewsets/workflow_run_action.py b/lib/workload/stateless/stacks/workflow-manager/workflow_manager/viewsets/workflow_run_action.py index 70d8288e9..c0b87a890 100644 --- a/lib/workload/stateless/stacks/workflow-manager/workflow_manager/viewsets/workflow_run_action.py +++ b/lib/workload/stateless/stacks/workflow-manager/workflow_manager/viewsets/workflow_run_action.py @@ -35,7 +35,7 @@ class WorkflowRunActionViewSet(ViewSet): ), responses=OpenApiTypes.OBJECT, description="Trigger a workflow run rerun by emitting an event to EventBridge with an overridden workflow " - "input payload. (Current supported workflow: 'rnasum')" + "input payload. Current supported workflow: 'rnasum'" ) @action( detail=True, @@ -85,7 +85,7 @@ def construct_rerun_eb_detail(wfl_run: WorkflowRun, input_body: dict) -> dict: "status": 'READY', "payload": new_payload, "portalRunId": new_portal_run_id, - "linkedLibraries": LibrarySerializer(wfl_run.libraries.all(), many=True).data, + "linkedLibraries": LibrarySerializer(wfl_run.libraries.all(), many=True, camel_case_data=True).data, "workflowName": wfl_run.workflow.workflow_name, "workflowRunName": wfl_run.workflow_run_name, "workflowVersion": wfl_run.workflow.workflow_version, @@ -100,14 +100,15 @@ def construct_rnasum_rerun_payload(wfl_run: WorkflowRun, new_portal_run_id: str, # Get the payload where the state is 'READY' ready_state: State = wfl_run.states.get(status='READY') - ready_data_payload = PayloadSerializer(ready_state.payload).data.get("data", None) + ready_data_payload = PayloadSerializer(ready_state.payload).data - # Start crafting the payload based on the old ones - new_data_payload = ready_data_payload.copy() + new_data_payload = { + 'version': ready_data_payload['version'], + 'data': ready_data_payload['data'] + } # Override payload based on given input - new_data_payload["inputs"]["dataset"] = input_body["dataset"] - + new_data_payload['data']["inputs"]["dataset"] = input_body["dataset"] # Replace old portal_run_id with new_portal_run_id in any part of the string # In the 'rnasum` payload, the engine parameter URI prefixes contain the portal_run_id new_data_payload = json.loads( From 9c198aa9326b89e466d937e5f7b9d7996b72c3d1 Mon Sep 17 00:00:00 2001 From: william Date: Fri, 22 Nov 2024 10:03:18 +1100 Subject: [PATCH 6/7] Update stack.ts --- lib/workload/stateless/stacks/workflow-manager/deploy/stack.ts | 1 + 1 file changed, 1 insertion(+) diff --git a/lib/workload/stateless/stacks/workflow-manager/deploy/stack.ts b/lib/workload/stateless/stacks/workflow-manager/deploy/stack.ts index 1f6dd526e..a6b3a5808 100644 --- a/lib/workload/stateless/stacks/workflow-manager/deploy/stack.ts +++ b/lib/workload/stateless/stacks/workflow-manager/deploy/stack.ts @@ -153,6 +153,7 @@ export class WorkflowManagerStack extends Stack { new HttpRoute(this, 'PostRerunHttpRoute', { httpApi: httpApi, integration: apiIntegration, + authorizer: wfmApi.authStackHttpLambdaAuthorizer, routeKey: HttpRouteKey.with( `/api/${API_VERSION}/workflowrun/{orcabusId}/rerun/{proxy+}`, HttpMethod.POST From 566bbb34cbcb870500fee01fa2c21aa324c9271c Mon Sep 17 00:00:00 2001 From: william Date: Fri, 22 Nov 2024 12:21:00 +1100 Subject: [PATCH 7/7] Update workflow_run_action.py --- .../viewsets/workflow_run_action.py | 36 +++++++++---------- 1 file changed, 18 insertions(+), 18 deletions(-) diff --git a/lib/workload/stateless/stacks/workflow-manager/workflow_manager/viewsets/workflow_run_action.py b/lib/workload/stateless/stacks/workflow-manager/workflow_manager/viewsets/workflow_run_action.py index c0b87a890..d05e47c32 100644 --- a/lib/workload/stateless/stacks/workflow-manager/workflow_manager/viewsets/workflow_run_action.py +++ b/lib/workload/stateless/stacks/workflow-manager/workflow_manager/viewsets/workflow_run_action.py @@ -77,23 +77,27 @@ def construct_rerun_eb_detail(wfl_run: WorkflowRun, input_body: dict) -> dict: new_payload: dict if wfl_name == AllowedRerunWorkflow.RNASUM.value: - new_payload = construct_rnasum_rerun_payload(wfl_run, new_portal_run_id, input_body) + new_payload = construct_rnasum_rerun_payload(wfl_run, input_body) else: raise ValueError(f"Rerun is not allowed for this workflow: {wfl_name}") - return { - "status": 'READY', - "payload": new_payload, - "portalRunId": new_portal_run_id, - "linkedLibraries": LibrarySerializer(wfl_run.libraries.all(), many=True, camel_case_data=True).data, - "workflowName": wfl_run.workflow.workflow_name, - "workflowRunName": wfl_run.workflow_run_name, - "workflowVersion": wfl_run.workflow.workflow_version, - "timestamp": datetime.now(timezone.utc).strftime('%Y-%m-%dT%H:%M:%SZ'), - } - - -def construct_rnasum_rerun_payload(wfl_run: WorkflowRun, new_portal_run_id: str, input_body: dict) -> dict: + # Replace old portal_run_id with new_portal_run_id in any part of the string + new_eb_detail = json.loads( + json.dumps({ + "status": 'READY', + "payload": new_payload, + "portalRunId": new_portal_run_id, + "linkedLibraries": LibrarySerializer(wfl_run.libraries.all(), many=True, camel_case_data=True).data, + "workflowName": wfl_run.workflow.workflow_name, + "workflowRunName": wfl_run.workflow_run_name, + "workflowVersion": wfl_run.workflow.workflow_version, + "timestamp": datetime.now(timezone.utc).strftime('%Y-%m-%dT%H:%M:%SZ'), + }).replace(f"{wfl_run.portal_run_id}", f"{new_portal_run_id}")) + + return new_eb_detail + + +def construct_rnasum_rerun_payload(wfl_run: WorkflowRun, input_body: dict) -> dict: """ Construct payload for rerun for 'rnasum' workflow based on the request body payload """ @@ -109,9 +113,5 @@ def construct_rnasum_rerun_payload(wfl_run: WorkflowRun, new_portal_run_id: str, # Override payload based on given input new_data_payload['data']["inputs"]["dataset"] = input_body["dataset"] - # Replace old portal_run_id with new_portal_run_id in any part of the string - # In the 'rnasum` payload, the engine parameter URI prefixes contain the portal_run_id - new_data_payload = json.loads( - json.dumps(new_data_payload).replace(f"{wfl_run.portal_run_id}", f"{new_portal_run_id}")) return new_data_payload