Skip to content

Commit

Permalink
Link ExecutionLogs with FileExecutionEntity (#1045)
Browse files Browse the repository at this point in the history
  • Loading branch information
muhammad-ali-e authored Jan 7, 2025
1 parent 7946bf8 commit 078c22a
Show file tree
Hide file tree
Showing 11 changed files with 106 additions and 27 deletions.
14 changes: 13 additions & 1 deletion backend/utils/dto.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ class LogDataDTO:
timestamp: timestamp
log_type: log type
data: log data
file_execution_id: Id for the file execution
"""

def __init__(
Expand All @@ -28,8 +29,10 @@ def __init__(
timestamp: int,
log_type: str,
data: dict[str, Any],
file_execution_id: Optional[str] = None,
):
self.execution_id: str = execution_id
self.file_execution_id: Optional[str] = file_execution_id
self.organization_id: str = organization_id
self.timestamp: int = timestamp
self.event_time: datetime = datetime.fromtimestamp(timestamp)
Expand All @@ -41,12 +44,20 @@ def from_json(cls, json_data: str) -> Optional["LogDataDTO"]:
try:
json_data = json.loads(json_data)
execution_id = json_data.get(LogFieldName.EXECUTION_ID)
file_execution_id = json_data.get(LogFieldName.FILE_EXECUTION_ID)
organization_id = json_data.get(LogFieldName.ORGANIZATION_ID)
timestamp = json_data.get(LogFieldName.TIMESTAMP)
log_type = json_data.get(LogFieldName.TYPE)
data = json_data.get(LogFieldName.DATA)
if all((execution_id, organization_id, timestamp, log_type, data)):
return cls(execution_id, organization_id, timestamp, log_type, data)
return cls(
execution_id=execution_id,
file_execution_id=file_execution_id,
organization_id=organization_id,
timestamp=timestamp,
log_type=log_type,
data=data,
)
except (json.JSONDecodeError, AttributeError):
logger.warning("Invalid log data: %s", json_data)
return None
Expand All @@ -60,5 +71,6 @@ def to_json(self) -> str:
LogFieldName.EVENT_TIME: self.event_time.isoformat(),
LogFieldName.TYPE: self.log_type,
LogFieldName.DATA: self.data,
LogFieldName.FILE_EXECUTION_ID: self.file_execution_id,
}
)
2 changes: 2 additions & 0 deletions backend/utils/log_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ def _get_validated_log_data(json_data: Any) -> Optional[LogDataDTO]:
organization_id = json_data.get(LogFieldName.ORGANIZATION_ID)
timestamp = json_data.get(LogFieldName.TIMESTAMP)
log_type = json_data.get(LogFieldName.TYPE)
file_execution_id = json_data.get(LogFieldName.FILE_EXECUTION_ID)

# Ensure the log type is LogType.LOG
if log_type != LogType.LOG.value:
Expand All @@ -122,6 +123,7 @@ def _get_validated_log_data(json_data: Any) -> Optional[LogDataDTO]:

return LogDataDTO(
execution_id=execution_id,
file_execution_id=file_execution_id,
organization_id=organization_id,
timestamp=timestamp,
log_type=log_type,
Expand Down
14 changes: 9 additions & 5 deletions backend/workflow_manager/workflow_v2/execution.py
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,9 @@ def build(self) -> None:
)
raise WorkflowExecutionError(self.compilation_result["problems"][0])

def execute(self, run_id: str, file_name: str, single_step: bool = False) -> None:
def execute(
self, file_execution_id: str, file_name: str, single_step: bool = False
) -> None:
execution_type = ExecutionType.COMPLETE
if single_step:
execution_type = ExecutionType.STEP
Expand All @@ -235,7 +237,9 @@ def execute(self, run_id: str, file_name: str, single_step: bool = False) -> Non
start_time = time.time()
try:
self.execute_workflow(
run_id=run_id, file_name=file_name, execution_type=execution_type
file_execution_id=file_execution_id,
file_name=file_name,
execution_type=execution_type,
)
end_time = time.time()
execution_time = end_time - start_time
Expand Down Expand Up @@ -308,15 +312,15 @@ def publish_initial_tool_execution_logs(

def execute_input_file(
self,
run_id: str,
file_execution_id: str,
file_name: str,
single_step: bool,
workflow_file_execution: WorkflowFileExecution,
) -> None:
"""Executes the input file.
Args:
run_id (str): UUID for a single run of a file
file_execution_id (str): UUID for a single run of a file
file_name (str): The name of the file to be executed.
single_step (bool): Flag indicating whether to execute in
single step mode.
Expand All @@ -335,7 +339,7 @@ def execute_input_file(
)
workflow_file_execution.update_status(ExecutionStatus.EXECUTING)

self.execute(run_id, file_name, single_step)
self.execute(file_execution_id, file_name, single_step)
self.publish_log(f"Tool executed successfully for '{file_name}'")
self._handle_execution_type(execution_type)

Expand Down
22 changes: 15 additions & 7 deletions backend/workflow_manager/workflow_v2/execution_log_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from utils.cache_service import CacheService
from utils.constants import ExecutionLogConstants
from utils.dto import LogDataDTO
from workflow_manager.file_execution.models import WorkflowFileExecution
from workflow_manager.workflow_v2.models.execution_log import ExecutionLog

logger = logging.getLogger(__name__)
Expand All @@ -28,14 +29,21 @@ def consume_log_history(self):
if not log_data:
continue

organization_id = log_data.organization_id
organization_logs[organization_id].append(
ExecutionLog(
execution_id=log_data.execution_id,
data=log_data.data,
event_time=log_data.event_time,
)
# Create ExecutionLog instance
execution_log = ExecutionLog(
execution_id=log_data.execution_id,
data=log_data.data,
event_time=log_data.event_time,
)

# Conditionally set file_execution if file_execution_id is present
if log_data.file_execution_id:
execution_log.file_execution = WorkflowFileExecution(
id=log_data.file_execution_id
)

organization_id = log_data.organization_id
organization_logs[organization_id].append(execution_log)
logs_count += 1
logger.info(f"Logs count: {logs_count}")
for organization_id, logs in organization_logs.items():
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
# Generated by Django 4.2.1 on 2025-01-02 10:34

import django.db.models.deletion
from django.db import migrations, models


class Migration(migrations.Migration):
    # Adds an optional link from ExecutionLog to the WorkflowFileExecution
    # (single file run) that produced the log. The FK is nullable
    # (null=True, blank=True), so logs without a file run stay representable.

    dependencies = [
        # Needs the WorkflowFileExecution table to exist before adding the FK.
        ("file_execution", "0001_initial"),
        ("workflow_v2", "0003_workflowexecution_result_acknowledged"),
    ]

    operations = [
        migrations.AddField(
            model_name="executionlog",
            name="file_execution",
            field=models.ForeignKey(
                blank=True,
                db_comment="Foreign key from WorkflowFileExecution model",
                editable=False,
                null=True,
                # CASCADE: deleting a file execution also removes its logs.
                on_delete=django.db.models.deletion.CASCADE,
                related_name="execution_logs",
                to="file_execution.workflowfileexecution",
            ),
        ),
    ]
11 changes: 11 additions & 0 deletions backend/workflow_manager/workflow_v2/models/execution_log.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

from django.db import models
from utils.models.base_model import BaseModel
from workflow_manager.file_execution.models import WorkflowFileExecution


class ExecutionLog(BaseModel):
Expand All @@ -10,6 +11,16 @@ class ExecutionLog(BaseModel):
editable=False,
db_comment="Execution ID",
)
file_execution = models.ForeignKey(
WorkflowFileExecution,
on_delete=models.CASCADE,
db_index=True,
editable=False,
db_comment="Foreign key from WorkflowFileExecution model",
related_name="execution_logs",
null=True,
blank=True,
)
data = models.JSONField(db_comment="Execution log data")
event_time = models.DateTimeField(db_comment="Execution log event time")

Expand Down
11 changes: 6 additions & 5 deletions backend/workflow_manager/workflow_v2/workflow_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -235,17 +235,18 @@ def _process_file(
input_file_path=input_file, file_hash=file_hash
)
try:
# Multiple run_ids are linked to an execution_id
# Each run_id corresponds to workflow runs for a single file
# It should be the uuid of workflow_file_execution
file_execution_id = str(workflow_file_execution.id)
execution_service.file_execution_id = file_execution_id
execution_service.initiate_tool_execution(
current_file_idx, total_files, file_name, single_step
)
workflow_file_execution.update_status(status=ExecutionStatus.INITIATED)
if not file_hash.is_executed:
# Multiple run_ids are linked to an execution_id
# Each run_id corresponds to workflow runs for a single file
# It should be the uuid of workflow_file_execution
run_id = str(workflow_file_execution.id)
execution_service.execute_input_file(
run_id=run_id,
file_execution_id=file_execution_id,
file_name=file_name,
single_step=single_step,
workflow_file_execution=workflow_file_execution,
Expand Down
1 change: 1 addition & 0 deletions unstract/core/src/unstract/core/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ class LogFieldName:
TYPE = "type"
DATA = "data"
EVENT_TIME = "event_time"
FILE_EXECUTION_ID = "file_execution_id"


class LogEventArgument:
Expand Down
2 changes: 2 additions & 0 deletions unstract/core/src/unstract/core/pubsub_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ def log_workflow(
iteration: Optional[int] = None,
iteration_total: Optional[int] = None,
execution_id: Optional[str] = None,
file_execution_id: Optional[str] = None,
organization_id: Optional[str] = None,
) -> dict[str, Any]:
return {
Expand All @@ -56,6 +57,7 @@ def log_workflow(
"iteration": iteration,
"iteration_total": iteration_total,
"execution_id": execution_id,
"file_execution_id": file_execution_id,
"organization_id": organization_id,
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ def __init__(
self.ignore_processed_entities = ignore_processed_entities
self.override_single_step = False
self.execution_id: str = ""
self.file_execution_id: Optional[str] = None
self.messaging_channel: Optional[str] = None
self.input_files: list[str] = []
self.log_stage: LogStage = LogStage.COMPILE
Expand Down Expand Up @@ -139,13 +140,13 @@ def build_workflow(self) -> None:
logger.info(f"Execution {self.execution_id}: Build completed")

def execute_workflow(
self, run_id: str, file_name: str, execution_type: ExecutionType
self, file_execution_id: str, file_name: str, execution_type: ExecutionType
) -> None:
"""Executes the complete workflow by running each tools one by one.
Returns the result from final tool in a dictionary.
Args:
run_id (str): UUID for a single run of a file
file_execution_id (str): UUID for a single run of a file
file_name (str): Name of the file to process
execution_type (ExecutionType): STEP or COMPLETE
Expand All @@ -162,37 +163,35 @@ def execute_workflow(
self._initialize_execution()
total_steps = len(self.tool_sandboxes)
self.total_steps = total_steps
self.file_execution_id = file_execution_id
# Currently each tool is run serially for files and workflows contain 1 tool
# only. While supporting more tools in a workflow, correct the tool container
# name to avoid conflicts.
for step, sandbox in enumerate(self.tool_sandboxes):
container_name = UnstractUtils.build_tool_container_name(
tool_image=sandbox.image_name,
tool_version=sandbox.image_tag,
run_id=run_id,
run_id=file_execution_id,
)
logger.info(
f"Running execution: '{self.execution_id}', "
f"tool: '{sandbox.image_name}:{sandbox.image_tag}', "
f"file '{file_name}', container: '{container_name}'"
)
self._execute_step(
run_id=run_id,
step=step,
sandbox=sandbox,
)
self._finalize_execution(execution_type)

def _execute_step(
self,
run_id: str,
step: int,
sandbox: ToolSandbox,
) -> None:
"""Execution of workflow step.
Args:
run_id (str): UUID for a single run of a file
step (int): workflow step
sandbox (ToolSandbox): instance of tool sandbox
execution_type (ExecutionType): step or complete
Expand All @@ -207,7 +206,10 @@ def _execute_step(
tool_uid = sandbox.get_tool_uid()
tool_instance_id = sandbox.get_tool_instance_id()
log_message = f"Executing step {actual_step} with tool {tool_uid}"
logger.info(f"Execution {self.execution_id}, Run {run_id}: {log_message}")
logger.info(
f"Execution {self.execution_id}, Run {self.file_execution_id}"
f": {log_message}"
)
# TODO: Mention file_execution_id in the FE logs / components
self.publish_log(
log_message,
Expand All @@ -221,13 +223,15 @@ def _execute_step(
message="Ready for execution",
component=tool_instance_id,
)
result = self.tool_utils.run_tool(run_id=run_id, tool_sandbox=sandbox)
result = self.tool_utils.run_tool(
run_id=self.file_execution_id, tool_sandbox=sandbox
)
if result and result.get("error"):
raise ToolOutputNotFoundException(result.get("error"))
if not self.validate_execution_result(step + 1):
raise ToolOutputNotFoundException(
f"Error running tool '{tool_uid}' for run "
f"'{run_id}' of execution '{self.execution_id}'. "
f"'{self.file_execution_id}' of execution '{self.execution_id}'. "
"Check logs for more information"
)
log_message = f"Step {actual_step} executed successfully"
Expand Down Expand Up @@ -398,6 +402,7 @@ def publish_log(
iteration=iteration,
iteration_total=iteration_total,
execution_id=self.execution_id,
file_execution_id=self.file_execution_id,
organization_id=self.organization_id,
)
LogPublisher.publish(self.messaging_channel, log_details)
Expand Down
5 changes: 5 additions & 0 deletions worker/src/unstract/worker/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ def stream_logs(
tool_instance_id: str,
execution_id: str,
organization_id: str,
run_id: str,
channel: Optional[str] = None,
) -> None:
for line in container.logs(follow=True):
Expand All @@ -51,6 +52,7 @@ def stream_logs(
channel=channel,
execution_id=execution_id,
organization_id=organization_id,
run_id=run_id,
)

def get_valid_log_message(self, log_message: str) -> Optional[dict[str, Any]]:
Expand All @@ -75,6 +77,7 @@ def process_log_message(
tool_instance_id: str,
execution_id: str,
organization_id: str,
run_id: str,
channel: Optional[str] = None,
) -> Optional[dict[str, Any]]:
log_dict = self.get_valid_log_message(log_message)
Expand All @@ -97,6 +100,7 @@ def process_log_message(
log_dict[LogFieldName.EXECUTION_ID] = execution_id
log_dict[LogFieldName.ORGANIZATION_ID] = organization_id
log_dict[LogFieldName.TIMESTAMP] = datetime.now(timezone.utc).timestamp()
log_dict[LogFieldName.FILE_EXECUTION_ID] = run_id

# Publish to channel of socket io
LogPublisher.publish(channel, log_dict)
Expand Down Expand Up @@ -239,6 +243,7 @@ def run_container(
channel=messaging_channel,
execution_id=execution_id,
organization_id=organization_id,
run_id=run_id,
)
except ToolRunException as te:
self.logger.error(
Expand Down

0 comments on commit 078c22a

Please sign in to comment.