Link ExecutionLogs with FileExecutionEntity #1045

Merged
14 changes: 13 additions & 1 deletion backend/utils/dto.py
@@ -19,6 +19,7 @@ class LogDataDTO:
timestamp: timestamp
log_type: log type
data: log data
file_execution_id: Id for the file execution
"""

def __init__(
@@ -28,8 +29,10 @@ def __init__(
timestamp: int,
log_type: str,
data: dict[str, Any],
file_execution_id: Optional[str] = None,
):
self.execution_id: str = execution_id
self.file_execution_id: Optional[str] = file_execution_id
self.organization_id: str = organization_id
self.timestamp: int = timestamp
self.event_time: datetime = datetime.fromtimestamp(timestamp)
@@ -41,12 +44,20 @@ def from_json(cls, json_data: str) -> Optional["LogDataDTO"]:
try:
json_data = json.loads(json_data)
execution_id = json_data.get(LogFieldName.EXECUTION_ID)
file_execution_id = json_data.get(LogFieldName.FILE_EXECUTION_ID)
organization_id = json_data.get(LogFieldName.ORGANIZATION_ID)
timestamp = json_data.get(LogFieldName.TIMESTAMP)
log_type = json_data.get(LogFieldName.TYPE)
data = json_data.get(LogFieldName.DATA)
if all((execution_id, organization_id, timestamp, log_type, data)):
return cls(execution_id, organization_id, timestamp, log_type, data)
return cls(
execution_id=execution_id,
file_execution_id=file_execution_id,
organization_id=organization_id,
timestamp=timestamp,
log_type=log_type,
data=data,
)
except (json.JSONDecodeError, AttributeError):
logger.warning("Invalid log data: %s", json_data)
return None
@@ -60,5 +71,6 @@ def to_json(self) -> str:
LogFieldName.EVENT_TIME: self.event_time.isoformat(),
LogFieldName.TYPE: self.log_type,
LogFieldName.DATA: self.data,
LogFieldName.FILE_EXECUTION_ID: self.file_execution_id,
}
)
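For context, here is a minimal round-trip of the new field through `LogDataDTO` — a sketch, assuming the backend package is importable, with illustrative IDs:

```python
import json

from utils.dto import LogDataDTO  # import path as used within the backend

# Hand-written payload; keys follow LogFieldName in unstract/core
raw = json.dumps(
    {
        "execution_id": "exec-0001",
        "file_execution_id": "file-exec-0002",  # new, optional field
        "organization_id": "org-1",
        "timestamp": 1735813800,
        "type": "LOG",
        "data": {"level": "INFO", "log": "Tool started"},
    }
)

dto = LogDataDTO.from_json(raw)
assert dto is not None
assert dto.file_execution_id == "file-exec-0002"
print(dto.to_json())  # serialized form now carries file_execution_id
```

Note that `from_json` still requires only the original five fields, so logs from older producers that lack `file_execution_id` parse fine; the field simply stays `None`.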
2 changes: 2 additions & 0 deletions backend/utils/log_events.py
@@ -110,6 +110,7 @@ def _get_validated_log_data(json_data: Any) -> Optional[LogDataDTO]:
organization_id = json_data.get(LogFieldName.ORGANIZATION_ID)
timestamp = json_data.get(LogFieldName.TIMESTAMP)
log_type = json_data.get(LogFieldName.TYPE)
file_execution_id = json_data.get(LogFieldName.FILE_EXECUTION_ID)

# Ensure the log type is LogType.LOG
if log_type != LogType.LOG.value:
@@ -122,6 +123,7 @@ def _get_validated_log_data(json_data: Any) -> Optional[LogDataDTO]:

return LogDataDTO(
execution_id=execution_id,
file_execution_id=file_execution_id,
organization_id=organization_id,
timestamp=timestamp,
log_type=log_type,
14 changes: 9 additions & 5 deletions backend/workflow_manager/workflow_v2/execution.py
@@ -213,7 +213,9 @@ def build(self) -> None:
)
raise WorkflowExecutionError(self.compilation_result["problems"][0])

def execute(self, run_id: str, file_name: str, single_step: bool = False) -> None:
def execute(
self, file_execution_id: str, file_name: str, single_step: bool = False
) -> None:
execution_type = ExecutionType.COMPLETE
if single_step:
execution_type = ExecutionType.STEP
@@ -235,7 +237,9 @@ def execute(self, run_id: str, file_name: str, single_step: bool = False) -> None:
start_time = time.time()
try:
self.execute_workflow(
run_id=run_id, file_name=file_name, execution_type=execution_type
file_execution_id=file_execution_id,
file_name=file_name,
execution_type=execution_type,
)
end_time = time.time()
execution_time = end_time - start_time
@@ -308,15 +312,15 @@ def publish_initial_tool_execution_logs(

def execute_input_file(
self,
run_id: str,
file_execution_id: str,
file_name: str,
single_step: bool,
workflow_file_execution: WorkflowFileExecution,
) -> None:
"""Executes the input file.

Args:
run_id (str): UUID for a single run of a file
file_execution_id (str): UUID for a single run of a file
file_name (str): The name of the file to be executed.
single_step (bool): Flag indicating whether to execute in
single step mode.
@@ -335,7 +339,7 @@ def execute_input_file(
)
workflow_file_execution.update_status(ExecutionStatus.EXECUTING)

self.execute(run_id, file_name, single_step)
self.execute(file_execution_id, file_name, single_step)
self.publish_log(f"Tool executed successfully for '{file_name}'")
self._handle_execution_type(execution_type)

22 changes: 15 additions & 7 deletions backend/workflow_manager/workflow_v2/execution_log_utils.py
@@ -9,6 +9,7 @@
from utils.cache_service import CacheService
from utils.constants import ExecutionLogConstants
from utils.dto import LogDataDTO
from workflow_manager.file_execution.models import WorkflowFileExecution
from workflow_manager.workflow_v2.models.execution_log import ExecutionLog

logger = logging.getLogger(__name__)
@@ -28,14 +29,21 @@ def consume_log_history(self):
if not log_data:
continue

organization_id = log_data.organization_id
organization_logs[organization_id].append(
ExecutionLog(
execution_id=log_data.execution_id,
data=log_data.data,
event_time=log_data.event_time,
)
# Create ExecutionLog instance
execution_log = ExecutionLog(
execution_id=log_data.execution_id,
data=log_data.data,
event_time=log_data.event_time,
)

# Conditionally set file_execution if file_execution_id is present
if log_data.file_execution_id:
execution_log.file_execution = WorkflowFileExecution(
id=log_data.file_execution_id
)

organization_id = log_data.organization_id
organization_logs[organization_id].append(execution_log)
logs_count += 1
logger.info(f"Logs count: {logs_count}")
for organization_id, logs in organization_logs.items():
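Assigning an unsaved `WorkflowFileExecution(id=...)` makes Django populate the underlying `file_execution_id` column from the instance's primary key, so no extra database fetch happens per log. An equivalent sketch that writes the FK column directly via its `attname`:

```python
from utils.dto import LogDataDTO
from workflow_manager.workflow_v2.models.execution_log import ExecutionLog


def build_execution_log(log_data: LogDataDTO) -> ExecutionLog:
    """Same effect as the loop body above, without an unsaved instance."""
    execution_log = ExecutionLog(
        execution_id=log_data.execution_id,
        data=log_data.data,
        event_time=log_data.event_time,
    )
    if log_data.file_execution_id:
        # <field>_id writes the FK column directly; no related-object fetch
        execution_log.file_execution_id = log_data.file_execution_id
    return execution_log
```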
@@ -0,0 +1,28 @@
# Generated by Django 4.2.1 on 2025-01-02 10:34

import django.db.models.deletion
from django.db import migrations, models


class Migration(migrations.Migration):

dependencies = [
("file_execution", "0001_initial"),
("workflow_v2", "0003_workflowexecution_result_acknowledged"),
]

operations = [
migrations.AddField(
model_name="executionlog",
name="file_execution",
field=models.ForeignKey(
blank=True,
db_comment="Foreign key from WorkflowFileExecution model",
editable=False,
null=True,
on_delete=django.db.models.deletion.CASCADE,
related_name="execution_logs",
to="file_execution.workflowfileexecution",
),
),
]
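Because the new column is nullable (`null=True`), existing `ExecutionLog` rows are unaffected; applying this with a standard `python manage.py migrate` should be a cheap, additive schema change.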
11 changes: 11 additions & 0 deletions backend/workflow_manager/workflow_v2/models/execution_log.py
@@ -2,6 +2,7 @@

from django.db import models
from utils.models.base_model import BaseModel
from workflow_manager.file_execution.models import WorkflowFileExecution


class ExecutionLog(BaseModel):
@@ -10,6 +11,16 @@ class ExecutionLog(BaseModel):
editable=False,
db_comment="Execution ID",
)
file_execution = models.ForeignKey(
WorkflowFileExecution,
on_delete=models.CASCADE,
db_index=True,
editable=False,
db_comment="Foreign key from WorkflowFileExecution model",
related_name="execution_logs",
null=True,
blank=True,
)
data = models.JSONField(db_comment="Execution log data")
event_time = models.DateTimeField(db_comment="Execution log event time")

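With the foreign key in place, all logs for one file run can be read back through the `execution_logs` reverse accessor — a usage sketch with an illustrative primary key:

```python
from workflow_manager.file_execution.models import WorkflowFileExecution

file_execution = WorkflowFileExecution.objects.get(id="file-exec-0002")  # illustrative pk
# related_name="execution_logs" exposes the reverse relation
for log in file_execution.execution_logs.order_by("event_time"):
    print(log.event_time, log.data)
```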
11 changes: 6 additions & 5 deletions backend/workflow_manager/workflow_v2/workflow_helper.py
@@ -235,17 +235,18 @@ def _process_file(
input_file_path=input_file, file_hash=file_hash
)
try:
# Multiple run_ids are linked to an execution_id
# Each run_id corresponds to a workflow run for a single file
# It should be the uuid of workflow_file_execution
file_execution_id = str(workflow_file_execution.id)
execution_service.file_execution_id = file_execution_id
execution_service.initiate_tool_execution(
current_file_idx, total_files, file_name, single_step
)
workflow_file_execution.update_status(status=ExecutionStatus.INITIATED)
if not file_hash.is_executed:
# Multiple run_ids are linked to an execution_id
# Each run_id corresponds to a workflow run for a single file
# It should be the uuid of workflow_file_execution
run_id = str(workflow_file_execution.id)
execution_service.execute_input_file(
run_id=run_id,
file_execution_id=file_execution_id,
file_name=file_name,
single_step=single_step,
workflow_file_execution=workflow_file_execution,
1 change: 1 addition & 0 deletions unstract/core/src/unstract/core/constants.py
@@ -5,6 +5,7 @@ class LogFieldName:
TYPE = "type"
DATA = "data"
EVENT_TIME = "event_time"
FILE_EXECUTION_ID = "file_execution_id"


class LogEventArgument:
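With the new constant, a published log entry carries the file run alongside the execution — a hand-written sample (values illustrative, keys from `LogFieldName`):

```python
sample_log = {
    "type": "LOG",
    "data": {"level": "INFO", "log": "Executing step 1"},
    "timestamp": 1735813800.0,
    "execution_id": "exec-0001",
    "file_execution_id": "file-exec-0002",  # the only new key in this PR
    "organization_id": "org-1",
}
```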
2 changes: 2 additions & 0 deletions unstract/core/src/unstract/core/pubsub_helper.py
@@ -41,6 +41,7 @@ def log_workflow(
iteration: Optional[int] = None,
iteration_total: Optional[int] = None,
execution_id: Optional[str] = None,
file_execution_id: Optional[str] = None,
organization_id: Optional[str] = None,
) -> dict[str, Any]:
return {
@@ -56,6 +57,7 @@
"iteration": iteration,
"iteration_total": iteration_total,
"execution_id": execution_id,
"file_execution_id": file_execution_id,
"organization_id": organization_id,
}

@@ -61,6 +61,7 @@ def __init__(
self.ignore_processed_entities = ignore_processed_entities
self.override_single_step = False
self.execution_id: str = ""
self.file_execution_id: Optional[str] = None
self.messaging_channel: Optional[str] = None
self.input_files: list[str] = []
self.log_stage: LogStage = LogStage.COMPILE
@@ -139,13 +140,13 @@ def build_workflow(self) -> None:
logger.info(f"Execution {self.execution_id}: Build completed")

def execute_workflow(
self, run_id: str, file_name: str, execution_type: ExecutionType
self, file_execution_id: str, file_name: str, execution_type: ExecutionType
) -> None:
"""Executes the complete workflow by running each tool one by one.
Returns the result from the final tool in a dictionary.

Args:
run_id (str): UUID for a single run of a file
file_execution_id (str): UUID for a single run of a file
file_name (str): Name of the file to process
execution_type (ExecutionType): STEP or COMPLETE

@@ -162,37 +163,35 @@ def execute_workflow(
self._initialize_execution()
total_steps = len(self.tool_sandboxes)
self.total_steps = total_steps
self.file_execution_id = file_execution_id
# Currently each tool runs serially per file, and workflows contain only
# one tool. When supporting more tools in a workflow, correct the tool
# container name to avoid conflicts.
for step, sandbox in enumerate(self.tool_sandboxes):
container_name = UnstractUtils.build_tool_container_name(
tool_image=sandbox.image_name,
tool_version=sandbox.image_tag,
run_id=run_id,
run_id=file_execution_id,
)
logger.info(
f"Running execution: '{self.execution_id}', "
f"tool: '{sandbox.image_name}:{sandbox.image_tag}', "
f"file '{file_name}', container: '{container_name}'"
)
self._execute_step(
run_id=run_id,
step=step,
sandbox=sandbox,
)
self._finalize_execution(execution_type)

def _execute_step(
self,
run_id: str,
step: int,
sandbox: ToolSandbox,
) -> None:
"""Execution of workflow step.

Args:
run_id (str): UUID for a single run of a file
step (int): workflow step
sandbox (ToolSandbox): instance of tool sandbox
execution_type (ExecutionType): step or complete
@@ -207,7 +206,10 @@
tool_uid = sandbox.get_tool_uid()
tool_instance_id = sandbox.get_tool_instance_id()
log_message = f"Executing step {actual_step} with tool {tool_uid}"
logger.info(f"Execution {self.execution_id}, Run {run_id}: {log_message}")
logger.info(
f"Execution {self.execution_id}, Run {self.file_execution_id}"
f": {log_message}"
)
# TODO: Mention run_id in the FE logs / components
self.publish_log(
log_message,
@@ -221,13 +223,15 @@
message="Ready for execution",
component=tool_instance_id,
)
result = self.tool_utils.run_tool(run_id=run_id, tool_sandbox=sandbox)
result = self.tool_utils.run_tool(
run_id=self.file_execution_id, tool_sandbox=sandbox
)
if result and result.get("error"):
raise ToolOutputNotFoundException(result.get("error"))
if not self.validate_execution_result(step + 1):
raise ToolOutputNotFoundException(
f"Error running tool '{tool_uid}' for run "
f"'{run_id}' of execution '{self.execution_id}'. "
f"'{self.file_execution_id}' of execution '{self.execution_id}'. "
"Check logs for more information"
)
log_message = f"Step {actual_step} executed successfully"
@@ -398,6 +402,7 @@ def publish_log(
iteration=iteration,
iteration_total=iteration_total,
execution_id=self.execution_id,
file_execution_id=self.file_execution_id,
organization_id=self.organization_id,
)
LogPublisher.publish(self.messaging_channel, log_details)
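Since the run id passed down is now the `WorkflowFileExecution` primary key, tool container names become traceable to a specific file run — a sketch (image name, tag, and the `UnstractUtils` import path are assumptions):

```python
from unstract.core.utilities import UnstractUtils  # import path assumed

file_execution_id = "file-exec-0002"  # illustrative pk
container_name = UnstractUtils.build_tool_container_name(
    tool_image="unstract/tool-structure",  # illustrative image
    tool_version="0.0.1",                  # illustrative tag
    run_id=file_execution_id,              # one container per file run
)
print(container_name)
```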
5 changes: 5 additions & 0 deletions worker/src/unstract/worker/worker.py
@@ -40,6 +40,7 @@ def stream_logs(
tool_instance_id: str,
execution_id: str,
organization_id: str,
run_id: str,
channel: Optional[str] = None,
) -> None:
for line in container.logs(follow=True):
@@ -51,6 +52,7 @@
channel=channel,
execution_id=execution_id,
organization_id=organization_id,
run_id=run_id,
)

def get_valid_log_message(self, log_message: str) -> Optional[dict[str, Any]]:
@@ -75,6 +77,7 @@ def process_log_message(
tool_instance_id: str,
execution_id: str,
organization_id: str,
run_id: str,
channel: Optional[str] = None,
) -> Optional[dict[str, Any]]:
log_dict = self.get_valid_log_message(log_message)
@@ -97,6 +100,7 @@
log_dict[LogFieldName.EXECUTION_ID] = execution_id
log_dict[LogFieldName.ORGANIZATION_ID] = organization_id
log_dict[LogFieldName.TIMESTAMP] = datetime.now(timezone.utc).timestamp()
log_dict[LogFieldName.FILE_EXECUTION_ID] = run_id

# Publish to channel of socket io
LogPublisher.publish(channel, log_dict)
@@ -239,6 +243,7 @@ def run_container(
channel=messaging_channel,
execution_id=execution_id,
organization_id=organization_id,
run_id=run_id,
)
except ToolRunException as te:
self.logger.error(
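The net effect on the worker side: every tool log line is stamped with the file run before it is published. A standalone sketch mirroring the enrichment in `process_log_message` (the helper name is hypothetical):

```python
from datetime import datetime, timezone


def stamp_log(
    log_dict: dict,
    execution_id: str,
    organization_id: str,
    run_id: str,
) -> dict:
    """Hypothetical helper mirroring process_log_message's enrichment."""
    log_dict["execution_id"] = execution_id
    log_dict["organization_id"] = organization_id
    log_dict["timestamp"] = datetime.now(timezone.utc).timestamp()
    # run_id here is the WorkflowFileExecution pk, i.e. file_execution_id
    log_dict["file_execution_id"] = run_id
    return log_dict


print(stamp_log({"type": "LOG", "log": "hello"}, "exec-0001", "org-1", "file-exec-0002"))
```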