From 51c3825aed1ccf9db1cdbc23199be2e71721aa4d Mon Sep 17 00:00:00 2001 From: ddl-ebrown Date: Thu, 16 May 2024 13:25:47 -0700 Subject: [PATCH] Propagate custom_info Dict through agent Resource - The agent defines a Resource return type with the fields: * outputs * message * log_links * phase These are all part of the underlying protobuf contract defined in flyteidl. However, the protobuf field custom_info (declared as `google.protobuf.Struct custom_info`) is not exposed here; see https://github.com/flyteorg/flyte/blob/519080b6e4e53fc0e216b5715ad9b5b5270f35c0/flyteidl/protos/flyteidl/admin/agent.proto#L140 This field was added in https://github.com/flyteorg/flyte/pull/4874 but never made it into the corresponding flytekit PR https://github.com/flyteorg/flytekit/pull/2146 - It's useful for agents to return additional metadata about the job, and custom_info appears to be the intended location for it - Make a minor refactor to how the agent responds to requests that return a Resource by implementing to_flyte_idl / from_flyte_idl directly on the Resource class Signed-off-by: ddl-ebrown --- flytekit/extend/backend/agent_service.py | 23 +----------- flytekit/extend/backend/base_agent.py | 47 +++++++++++++++++++++++- tests/flytekit/unit/extend/test_agent.py | 13 ++++++- 3 files changed, 59 insertions(+), 24 deletions(-) diff --git a/flytekit/extend/backend/agent_service.py b/flytekit/extend/backend/agent_service.py index eb2838ca415..9836b95ca5b 100644 --- a/flytekit/extend/backend/agent_service.py +++ b/flytekit/extend/backend/agent_service.py @@ -136,16 +136,7 @@ async def GetTask(self, request: GetTaskRequest, context: grpc.ServicerContext) logger.info(f"{agent.name} start checking the status of the job") res = await mirror_async_methods(agent.get, resource_meta=agent.metadata_type.decode(request.resource_meta)) - if res.outputs is None: - outputs = None - elif isinstance(res.outputs, LiteralMap): - outputs = res.outputs.to_flyte_idl() - else: - ctx = FlyteContext.current_context() - outputs = 
TypeEngine.dict_to_literal_map_pb(ctx, res.outputs) - return GetTaskResponse( - resource=Resource(phase=res.phase, log_links=res.log_links, message=res.message, outputs=outputs) - ) + return GetTaskResponse(resource=res.to_flyte_idl()) @record_agent_metrics async def DeleteTask(self, request: DeleteTaskRequest, context: grpc.ServicerContext) -> DeleteTaskResponse: @@ -175,17 +166,7 @@ async def ExecuteTaskSync( literal_map = LiteralMap.from_flyte_idl(request.inputs) if request.inputs else None res = await mirror_async_methods(agent.do, task_template=template, inputs=literal_map) - if res.outputs is None: - outputs = None - elif isinstance(res.outputs, LiteralMap): - outputs = res.outputs.to_flyte_idl() - else: - ctx = FlyteContext.current_context() - outputs = TypeEngine.dict_to_literal_map_pb(ctx, res.outputs) - - header = ExecuteTaskSyncResponseHeader( - resource=Resource(phase=res.phase, log_links=res.log_links, message=res.message, outputs=outputs) - ) + header = ExecuteTaskSyncResponseHeader(resource=res.to_flyte_idl()) yield ExecuteTaskSyncResponse(header=header) request_success_count.labels(task_type=task_type, operation=do_operation).inc() except Exception as e: diff --git a/flytekit/extend/backend/base_agent.py b/flytekit/extend/backend/base_agent.py index 4d1d8956dac..4b3d38aecee 100644 --- a/flytekit/extend/backend/base_agent.py +++ b/flytekit/extend/backend/base_agent.py @@ -11,10 +11,12 @@ from types import FrameType, coroutine from typing import Any, Dict, List, Optional, Union -from flyteidl.admin.agent_pb2 import Agent +from flyteidl.admin.agent_pb2 import Agent, Resource as _Resource from flyteidl.admin.agent_pb2 import TaskCategory as _TaskCategory from flyteidl.core import literals_pb2 from flyteidl.core.execution_pb2 import TaskExecution, TaskLog +from google.protobuf import json_format +from google.protobuf.struct_pb2 import Struct from rich.logging import RichHandler from rich.progress import Progress @@ -27,6 +29,7 @@ from 
flytekit.exceptions.user import FlyteUserException from flytekit.extend.backend.utils import is_terminal_phase, mirror_async_methods, render_task_template from flytekit.loggers import set_flytekit_log_properties +from flytekit.models import common as _common from flytekit.models.literals import LiteralMap from flytekit.models.task import TaskExecutionMetadata, TaskTemplate @@ -90,6 +93,48 @@ class Resource: message: Optional[str] = None log_links: Optional[List[TaskLog]] = None outputs: Optional[Union[LiteralMap, typing.Dict[str, Any]]] = None + custom_info: Optional[typing.Dict[str, Any]] = None + + def to_flyte_idl(self) -> _Resource: + """Convert this Resource into its flyteidl protobuf message.""" + if self.outputs is None: + outputs = None + elif isinstance(self.outputs, LiteralMap): + outputs = self.outputs.to_flyte_idl() + else: + ctx = FlyteContext.current_context() + outputs = TypeEngine.dict_to_literal_map_pb(ctx, self.outputs) + + return _Resource( + phase=self.phase, + message=self.message, + log_links=self.log_links, + outputs=outputs, + custom_info=( + json_format.Parse(json.dumps(self.custom_info), Struct()) + if self.custom_info + else None + ), + ) + + @classmethod + def from_flyte_idl(cls, pb2_object: _Resource): + """Build a Resource from its flyteidl protobuf message; unset fields become None.""" + return cls( + phase=pb2_object.phase, + message=pb2_object.message or None,  # plain proto3 string: HasField() would raise + log_links=list(pb2_object.log_links) or None,  # repeated field: HasField() would raise + outputs=( + LiteralMap.from_flyte_idl(pb2_object.outputs) + if pb2_object.HasField("outputs") + else None + ), + custom_info=( + json_format.MessageToDict(pb2_object.custom_info) + if pb2_object.HasField("custom_info") + else None + ), + ) class AgentBase(ABC): diff --git a/tests/flytekit/unit/extend/test_agent.py b/tests/flytekit/unit/extend/test_agent.py index 17db5c27887..4697278499f 100644 --- a/tests/flytekit/unit/extend/test_agent.py +++ b/tests/flytekit/unit/extend/test_agent.py @@ -70,7 +70,11 @@ def create(self, task_template: TaskTemplate, inputs: typing.Optional[LiteralMap return 
DummyMetadata(job_id=dummy_id) def get(self, resource_meta: DummyMetadata, **kwargs) -> Resource: - return Resource(phase=TaskExecution.SUCCEEDED, log_links=[TaskLog(name="console", uri="localhost:3000")]) + return Resource( + phase=TaskExecution.SUCCEEDED, + log_links=[TaskLog(name="console", uri="localhost:3000")], + custom_info={"custom": "info"}, + ) def delete(self, resource_meta: DummyMetadata, **kwargs): ... @@ -95,7 +99,11 @@ async def create( return DummyMetadata(job_id=dummy_id, output_path=output_path, task_name=task_name) async def get(self, resource_meta: DummyMetadata, **kwargs) -> Resource: - return Resource(phase=TaskExecution.SUCCEEDED, log_links=[TaskLog(name="console", uri="localhost:3000")]) + return Resource( + phase=TaskExecution.SUCCEEDED, + log_links=[TaskLog(name="console", uri="localhost:3000")], + custom_info={"custom": "info"}, + ) async def delete(self, resource_meta: DummyMetadata, **kwargs): ... @@ -172,6 +180,7 @@ def test_dummy_agent(): assert resource.phase == TaskExecution.SUCCEEDED assert resource.log_links[0].name == "console" assert resource.log_links[0].uri == "localhost:3000" + assert resource.custom_info["custom"] == "info" assert agent.delete(metadata) is None class DummyTask(AsyncAgentExecutorMixin, PythonFunctionTask):