diff --git a/backend/utils/dto.py b/backend/utils/dto.py
index 445982d4c..8c06e3f91 100644
--- a/backend/utils/dto.py
+++ b/backend/utils/dto.py
@@ -19,6 +19,7 @@ class LogDataDTO:
         timestamp: timestamp
         log_type: log type
         data: log data
+        file_execution_id: ID of the file execution
     """
 
     def __init__(
@@ -28,8 +29,10 @@ def __init__(
         timestamp: int,
         log_type: str,
         data: dict[str, Any],
+        file_execution_id: Optional[str] = None,
     ):
         self.execution_id: str = execution_id
+        self.file_execution_id: Optional[str] = file_execution_id
         self.organization_id: str = organization_id
         self.timestamp: int = timestamp
         self.event_time: datetime = datetime.fromtimestamp(timestamp)
@@ -41,12 +44,20 @@ def from_json(cls, json_data: str) -> Optional["LogDataDTO"]:
         try:
             json_data = json.loads(json_data)
             execution_id = json_data.get(LogFieldName.EXECUTION_ID)
+            file_execution_id = json_data.get(LogFieldName.FILE_EXECUTION_ID)
             organization_id = json_data.get(LogFieldName.ORGANIZATION_ID)
             timestamp = json_data.get(LogFieldName.TIMESTAMP)
             log_type = json_data.get(LogFieldName.TYPE)
             data = json_data.get(LogFieldName.DATA)
             if all((execution_id, organization_id, timestamp, log_type, data)):
-                return cls(execution_id, organization_id, timestamp, log_type, data)
+                return cls(
+                    execution_id=execution_id,
+                    file_execution_id=file_execution_id,
+                    organization_id=organization_id,
+                    timestamp=timestamp,
+                    log_type=log_type,
+                    data=data,
+                )
         except (json.JSONDecodeError, AttributeError):
             logger.warning("Invalid log data: %s", json_data)
         return None
@@ -60,5 +71,6 @@ def to_json(self) -> str:
                 LogFieldName.EVENT_TIME: self.event_time.isoformat(),
                 LogFieldName.TYPE: self.log_type,
                 LogFieldName.DATA: self.data,
+                LogFieldName.FILE_EXECUTION_ID: self.file_execution_id,
             }
         )
diff --git a/backend/utils/log_events.py b/backend/utils/log_events.py
index 309a879a7..da43eacd6 100644
--- a/backend/utils/log_events.py
+++ b/backend/utils/log_events.py
@@ -110,6 +110,7 @@ def _get_validated_log_data(json_data: Any) -> Optional[LogDataDTO]:
     organization_id = json_data.get(LogFieldName.ORGANIZATION_ID)
     timestamp = json_data.get(LogFieldName.TIMESTAMP)
     log_type = json_data.get(LogFieldName.TYPE)
+    file_execution_id = json_data.get(LogFieldName.FILE_EXECUTION_ID)
 
     # Ensure the log type is LogType.LOG
     if log_type != LogType.LOG.value:
@@ -122,6 +123,7 @@ def _get_validated_log_data(json_data: Any) -> Optional[LogDataDTO]:
 
     return LogDataDTO(
         execution_id=execution_id,
+        file_execution_id=file_execution_id,
        organization_id=organization_id,
         timestamp=timestamp,
         log_type=log_type,
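The DTO change keeps `file_execution_id` optional end to end: `from_json` still validates only the original five fields, so older payloads without the new key keep parsing, and `to_json` always emits the key (as `null` when absent). A minimal round-trip sketch, not part of the patch — the IDs are hypothetical, and the remaining `LogFieldName` values are assumed to be the obvious snake_case strings:

```python
import json

from utils.dto import LogDataDTO  # path as in this repo's backend package

raw = json.dumps(
    {
        "execution_id": "exec-123",            # hypothetical values
        "file_execution_id": "file-exec-456",  # optional; may be omitted
        "organization_id": "org-1",
        "timestamp": 1735812000,
        "type": "LOG",
        "data": {"log": "Tool started"},
    }
)

dto = LogDataDTO.from_json(raw)  # returns None for malformed payloads
if dto:
    # The new field survives serialization in both directions
    assert json.loads(dto.to_json())["file_execution_id"] == "file-exec-456"
```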
diff --git a/backend/workflow_manager/workflow_v2/execution.py b/backend/workflow_manager/workflow_v2/execution.py
index 994300d01..b3afceb03 100644
--- a/backend/workflow_manager/workflow_v2/execution.py
+++ b/backend/workflow_manager/workflow_v2/execution.py
@@ -213,7 +213,9 @@ def build(self) -> None:
             )
             raise WorkflowExecutionError(self.compilation_result["problems"][0])
 
-    def execute(self, run_id: str, file_name: str, single_step: bool = False) -> None:
+    def execute(
+        self, file_execution_id: str, file_name: str, single_step: bool = False
+    ) -> None:
         execution_type = ExecutionType.COMPLETE
         if single_step:
             execution_type = ExecutionType.STEP
@@ -235,7 +237,9 @@ def execute(self, run_id: str, file_name: str, single_step: bool = False) -> Non
         start_time = time.time()
         try:
             self.execute_workflow(
-                run_id=run_id, file_name=file_name, execution_type=execution_type
+                file_execution_id=file_execution_id,
+                file_name=file_name,
+                execution_type=execution_type,
             )
             end_time = time.time()
             execution_time = end_time - start_time
@@ -308,7 +312,7 @@ def publish_initial_tool_execution_logs(
 
     def execute_input_file(
         self,
-        run_id: str,
+        file_execution_id: str,
         file_name: str,
         single_step: bool,
         workflow_file_execution: WorkflowFileExecution,
@@ -316,7 +320,7 @@ def execute_input_file(
         """Executes the input file.
 
         Args:
-            run_id (str): UUID for a single run of a file
+            file_execution_id (str): UUID for a single run of a file
            file_name (str): The name of the file to be executed.
             single_step (bool): Flag indicating whether to execute in
                 single step mode.
 
@@ -335,7 +339,7 @@ def execute_input_file(
         )
         workflow_file_execution.update_status(ExecutionStatus.EXECUTING)
 
-        self.execute(run_id, file_name, single_step)
+        self.execute(file_execution_id, file_name, single_step)
         self.publish_log(f"Tool executed successfully for '{file_name}'")
         self._handle_execution_type(execution_type)
diff --git a/backend/workflow_manager/workflow_v2/execution_log_utils.py b/backend/workflow_manager/workflow_v2/execution_log_utils.py
index 5b1ef6079..21bcf1b38 100644
--- a/backend/workflow_manager/workflow_v2/execution_log_utils.py
+++ b/backend/workflow_manager/workflow_v2/execution_log_utils.py
@@ -9,6 +9,7 @@
 from utils.cache_service import CacheService
 from utils.constants import ExecutionLogConstants
 from utils.dto import LogDataDTO
+from workflow_manager.file_execution.models import WorkflowFileExecution
 from workflow_manager.workflow_v2.models.execution_log import ExecutionLog
 
 logger = logging.getLogger(__name__)
@@ -28,14 +29,21 @@ def consume_log_history(self):
             if not log_data:
                 continue
 
-            organization_id = log_data.organization_id
-            organization_logs[organization_id].append(
-                ExecutionLog(
-                    execution_id=log_data.execution_id,
-                    data=log_data.data,
-                    event_time=log_data.event_time,
-                )
+            # Create ExecutionLog instance
+            execution_log = ExecutionLog(
+                execution_id=log_data.execution_id,
+                data=log_data.data,
+                event_time=log_data.event_time,
             )
+
+            # Conditionally set file_execution if file_execution_id is present
+            if log_data.file_execution_id:
+                execution_log.file_execution = WorkflowFileExecution(
+                    id=log_data.file_execution_id
+                )
+
+            organization_id = log_data.organization_id
+            organization_logs[organization_id].append(execution_log)
             logs_count += 1
         logger.info(f"Logs count: {logs_count}")
         for organization_id, logs in organization_logs.items():
diff --git a/backend/workflow_manager/workflow_v2/migrations/0004_executionlog_file_execution.py b/backend/workflow_manager/workflow_v2/migrations/0004_executionlog_file_execution.py
new file mode 100644
index 000000000..d0d80cd8c
--- /dev/null
+++ b/backend/workflow_manager/workflow_v2/migrations/0004_executionlog_file_execution.py
@@ -0,0 +1,28 @@
+# Generated by Django 4.2.1 on 2025-01-02 10:34
+
+import django.db.models.deletion
+from django.db import migrations, models
+
+
+class Migration(migrations.Migration):
+
+    dependencies = [
+        ("file_execution", "0001_initial"),
+        ("workflow_v2", "0003_workflowexecution_result_acknowledged"),
+    ]
+
+    operations = [
+        migrations.AddField(
+            model_name="executionlog",
+            name="file_execution",
+            field=models.ForeignKey(
+                blank=True,
+                db_comment="Foreign key from WorkflowFileExecution model",
+                editable=False,
+                null=True,
+                on_delete=django.db.models.deletion.CASCADE,
+                related_name="execution_logs",
+                to="file_execution.workflowfileexecution",
+            ),
+        ),
+    ]
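In `consume_log_history` above, the consumer attaches the foreign key by constructing an unsaved `WorkflowFileExecution(id=...)` shell instead of fetching the row, which avoids a SELECT per log line in this batch path. A sketch of the equivalent using Django's standard `<field>_id` attribute — illustrative only, with hypothetical values:

```python
from datetime import datetime, timezone

from workflow_manager.workflow_v2.models.execution_log import ExecutionLog

log = ExecutionLog(
    execution_id="exec-123",  # hypothetical
    data={"log": "Tool started"},
    event_time=datetime.now(timezone.utc),
)
# Django exposes the FK's raw column as <field>_id; assigning it populates
# the same column the unsaved-instance trick does, without touching the DB.
log.file_execution_id = "file-exec-456"  # hypothetical WorkflowFileExecution UUID
```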
diff --git a/backend/workflow_manager/workflow_v2/models/execution_log.py b/backend/workflow_manager/workflow_v2/models/execution_log.py
index 68faa9a4e..c3d5ebfa4 100644
--- a/backend/workflow_manager/workflow_v2/models/execution_log.py
+++ b/backend/workflow_manager/workflow_v2/models/execution_log.py
@@ -2,6 +2,7 @@
 from django.db import models
 
 from utils.models.base_model import BaseModel
+from workflow_manager.file_execution.models import WorkflowFileExecution
 
 
 class ExecutionLog(BaseModel):
@@ -10,6 +11,16 @@ class ExecutionLog(BaseModel):
         editable=False,
         db_comment="Execution ID",
     )
+    file_execution = models.ForeignKey(
+        WorkflowFileExecution,
+        on_delete=models.CASCADE,
+        db_index=True,
+        editable=False,
+        db_comment="Foreign key from WorkflowFileExecution model",
+        related_name="execution_logs",
+        null=True,
+        blank=True,
+    )
     data = models.JSONField(db_comment="Execution log data")
     event_time = models.DateTimeField(db_comment="Execution log event time")
 
diff --git a/backend/workflow_manager/workflow_v2/workflow_helper.py b/backend/workflow_manager/workflow_v2/workflow_helper.py
index c2ecda704..ab72eab64 100644
--- a/backend/workflow_manager/workflow_v2/workflow_helper.py
+++ b/backend/workflow_manager/workflow_v2/workflow_helper.py
@@ -235,17 +235,18 @@ def _process_file(
             input_file_path=input_file, file_hash=file_hash
         )
         try:
+            # Multiple run_ids are linked to an execution_id
+            # Each run_id corresponds to workflow runs for a single file
+            # It should be the uuid of workflow_file_execution
+            file_execution_id = str(workflow_file_execution.id)
+            execution_service.file_execution_id = file_execution_id
             execution_service.initiate_tool_execution(
                 current_file_idx, total_files, file_name, single_step
             )
             workflow_file_execution.update_status(status=ExecutionStatus.INITIATED)
             if not file_hash.is_executed:
-                # Multiple run_ids are linked to an execution_id
-                # Each run_id corresponds to workflow runs for a single file
-                # It should e uuid of workflow_file_execution
-                run_id = str(workflow_file_execution.id)
                 execution_service.execute_input_file(
-                    run_id=run_id,
+                    file_execution_id=file_execution_id,
                     file_name=file_name,
                     single_step=single_step,
                     workflow_file_execution=workflow_file_execution,
diff --git a/unstract/core/src/unstract/core/constants.py b/unstract/core/src/unstract/core/constants.py
index 73303df0c..7523cd01c 100644
--- a/unstract/core/src/unstract/core/constants.py
+++ b/unstract/core/src/unstract/core/constants.py
@@ -5,6 +5,7 @@ class LogFieldName:
     TYPE = "type"
     DATA = "data"
     EVENT_TIME = "event_time"
+    FILE_EXECUTION_ID = "file_execution_id"
 
 
 class LogEventArgument:
diff --git a/unstract/core/src/unstract/core/pubsub_helper.py b/unstract/core/src/unstract/core/pubsub_helper.py
index 9011c40a0..dcbe6be37 100644
--- a/unstract/core/src/unstract/core/pubsub_helper.py
+++ b/unstract/core/src/unstract/core/pubsub_helper.py
@@ -41,6 +41,7 @@ def log_workflow(
         iteration: Optional[int] = None,
         iteration_total: Optional[int] = None,
         execution_id: Optional[str] = None,
+        file_execution_id: Optional[str] = None,
         organization_id: Optional[str] = None,
     ) -> dict[str, Any]:
         return {
@@ -56,6 +57,7 @@ def log_workflow(
             "iteration": iteration,
             "iteration_total": iteration_total,
             "execution_id": execution_id,
+            "file_execution_id": file_execution_id,
             "organization_id": organization_id,
         }
 
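With the `LogFieldName` constant and the `log_workflow` field in place, every workflow log published over the messaging channel can carry the file-level ID, and it is simply `None` for logs not tied to a file. A sketch of the wire payload, keyed by the `LogFieldName` values — not part of the patch, values hypothetical:

```python
# Keys follow LogFieldName; this is the shape LogDataDTO.from_json consumes.
payload = {
    "type": "LOG",
    "data": {"log": "Executing step 1"},   # hypothetical log body
    "timestamp": 1735812000,
    "execution_id": "exec-123",            # hypothetical IDs
    "file_execution_id": "file-exec-456",  # new field; None when absent
    "organization_id": "org-1",
}
```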
diff --git a/unstract/workflow-execution/src/unstract/workflow_execution/workflow_execution.py b/unstract/workflow-execution/src/unstract/workflow_execution/workflow_execution.py
index 0f601bb06..39b94fd53 100644
--- a/unstract/workflow-execution/src/unstract/workflow_execution/workflow_execution.py
+++ b/unstract/workflow-execution/src/unstract/workflow_execution/workflow_execution.py
@@ -61,6 +61,7 @@ def __init__(
         self.ignore_processed_entities = ignore_processed_entities
         self.override_single_step = False
         self.execution_id: str = ""
+        self.file_execution_id: Optional[str] = None
         self.messaging_channel: Optional[str] = None
         self.input_files: list[str] = []
         self.log_stage: LogStage = LogStage.COMPILE
@@ -139,13 +140,13 @@ def build_workflow(self) -> None:
         logger.info(f"Execution {self.execution_id}: Build completed")
 
     def execute_workflow(
-        self, run_id: str, file_name: str, execution_type: ExecutionType
+        self, file_execution_id: str, file_name: str, execution_type: ExecutionType
     ) -> None:
         """Executes the complete workflow by running each tools one by one.
         Returns the result from final tool in a dictionary.
 
         Args:
-            run_id (str): UUID for a single run of a file
+            file_execution_id (str): UUID for a single run of a file
             file_name (str): Name of the file to process
             execution_type (ExecutionType): STEP or COMPLETE
 
@@ -162,6 +163,7 @@ def execute_workflow(
         self._initialize_execution()
         total_steps = len(self.tool_sandboxes)
         self.total_steps = total_steps
+        self.file_execution_id = file_execution_id
         # Currently each tool is run serially for files and workflows contain 1 tool
         # only. While supporting more tools in a workflow, correct the tool container
         # name to avoid conflicts.
@@ -169,7 +171,7 @@
             container_name = UnstractUtils.build_tool_container_name(
                 tool_image=sandbox.image_name,
                 tool_version=sandbox.image_tag,
-                run_id=run_id,
+                run_id=file_execution_id,
             )
             logger.info(
                 f"Running execution: '{self.execution_id}', "
@@ -177,7 +179,6 @@
                 f"file '{file_name}', container: '{container_name}'"
             )
             self._execute_step(
-                run_id=run_id,
                 step=step,
                 sandbox=sandbox,
             )
@@ -185,14 +186,12 @@
 
     def _execute_step(
         self,
-        run_id: str,
         step: int,
         sandbox: ToolSandbox,
     ) -> None:
         """Execution of workflow step.
 
         Args:
-            run_id (str): UUID for a single run of a file
             step (int): workflow step
             sandbox (ToolSandbox): instance of tool sandbox
             execution_type (ExecutionType): step or complete
@@ -207,7 +206,10 @@
         tool_uid = sandbox.get_tool_uid()
         tool_instance_id = sandbox.get_tool_instance_id()
         log_message = f"Executing step {actual_step} with tool {tool_uid}"
-        logger.info(f"Execution {self.execution_id}, Run {run_id}: {log_message}")
+        logger.info(
+            f"Execution {self.execution_id}, Run {self.file_execution_id}"
+            f": {log_message}"
+        )
         # TODO: Mention run_id in the FE logs / components
         self.publish_log(
             log_message,
@@ -221,13 +223,15 @@
             message="Ready for execution",
             component=tool_instance_id,
         )
-        result = self.tool_utils.run_tool(run_id=run_id, tool_sandbox=sandbox)
+        result = self.tool_utils.run_tool(
+            run_id=self.file_execution_id, tool_sandbox=sandbox
+        )
         if result and result.get("error"):
             raise ToolOutputNotFoundException(result.get("error"))
         if not self.validate_execution_result(step + 1):
             raise ToolOutputNotFoundException(
                 f"Error running tool '{tool_uid}' for run "
-                f"'{run_id}' of execution '{self.execution_id}'. "
+                f"'{self.file_execution_id}' of execution '{self.execution_id}'. "
                 "Check logs for more information"
             )
         log_message = f"Step {actual_step} executed successfully"
@@ -398,6 +402,7 @@ def publish_log(
             iteration=iteration,
             iteration_total=iteration_total,
             execution_id=self.execution_id,
+            file_execution_id=self.file_execution_id,
             organization_id=self.organization_id,
         )
         LogPublisher.publish(self.messaging_channel, log_details)
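The tool container keeps its `run_id` naming knob; only the value changes, so container names now key on the file execution and stay unique per file per workflow run. A sketch of the call as it appears in the diff above — the import path is assumed from the `unstract.core` package and the values are hypothetical:

```python
from unstract.core.utilities import UnstractUtils  # assumed module path

container_name = UnstractUtils.build_tool_container_name(
    tool_image="unstract/tool-structure",  # hypothetical tool image
    tool_version="0.0.1",
    run_id="file-exec-456",  # the WorkflowFileExecution UUID
)
# Yields one distinct container name per file per execution
```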
" "Check logs for more information" ) log_message = f"Step {actual_step} executed successfully" @@ -398,6 +402,7 @@ def publish_log( iteration=iteration, iteration_total=iteration_total, execution_id=self.execution_id, + file_execution_id=self.file_execution_id, organization_id=self.organization_id, ) LogPublisher.publish(self.messaging_channel, log_details) diff --git a/worker/src/unstract/worker/worker.py b/worker/src/unstract/worker/worker.py index 93b34ccd9..685aa0c9c 100644 --- a/worker/src/unstract/worker/worker.py +++ b/worker/src/unstract/worker/worker.py @@ -40,6 +40,7 @@ def stream_logs( tool_instance_id: str, execution_id: str, organization_id: str, + run_id: str, channel: Optional[str] = None, ) -> None: for line in container.logs(follow=True): @@ -51,6 +52,7 @@ def stream_logs( channel=channel, execution_id=execution_id, organization_id=organization_id, + run_id=run_id, ) def get_valid_log_message(self, log_message: str) -> Optional[dict[str, Any]]: @@ -75,6 +77,7 @@ def process_log_message( tool_instance_id: str, execution_id: str, organization_id: str, + run_id: str, channel: Optional[str] = None, ) -> Optional[dict[str, Any]]: log_dict = self.get_valid_log_message(log_message) @@ -97,6 +100,7 @@ def process_log_message( log_dict[LogFieldName.EXECUTION_ID] = execution_id log_dict[LogFieldName.ORGANIZATION_ID] = organization_id log_dict[LogFieldName.TIMESTAMP] = datetime.now(timezone.utc).timestamp() + log_dict[LogFieldName.FILE_EXECUTION_ID] = run_id # Publish to channel of socket io LogPublisher.publish(channel, log_dict) @@ -239,6 +243,7 @@ def run_container( channel=messaging_channel, execution_id=execution_id, organization_id=organization_id, + run_id=run_id, ) except ToolRunException as te: self.logger.error(