From 22991515c67997379378df5fc8a996ec1cb04add Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Wed, 23 Aug 2023 11:57:52 -0700 Subject: [PATCH 01/14] wip Signed-off-by: Kevin Su --- flytekit/clis/sdk_in_container/serve.py | 4 +++- flytekit/extend/backend/agent_service.py | 5 +++++ .../flytekit-bigquery/flytekitplugins/bigquery/__init__.py | 1 + plugins/flytekit-bigquery/flytekitplugins/bigquery/agent.py | 2 +- 4 files changed, 10 insertions(+), 2 deletions(-) diff --git a/flytekit/clis/sdk_in_container/serve.py b/flytekit/clis/sdk_in_container/serve.py index 1e14d6649e..3e963e967c 100644 --- a/flytekit/clis/sdk_in_container/serve.py +++ b/flytekit/clis/sdk_in_container/serve.py @@ -3,7 +3,7 @@ import click from flyteidl.service.agent_pb2_grpc import add_AsyncAgentServiceServicer_to_server from grpc import aio - +from prometheus_client import start_http_server from flytekit.extend.backend.agent_service import AsyncAgentService _serve_help = """Start a grpc server for the agent service.""" @@ -43,6 +43,8 @@ def serve(_: click.Context, port, worker, timeout): async def _start_grpc_server(port: int, worker: int, timeout: int): + click.secho("Starting up the server to expose the prometheus metrics...", fg="blue") + start_http_server(9090) click.secho("Starting the agent service...", fg="blue") server = aio.server(futures.ThreadPoolExecutor(max_workers=worker)) add_AsyncAgentServiceServicer_to_server(AsyncAgentService(), server) diff --git a/flytekit/extend/backend/agent_service.py b/flytekit/extend/backend/agent_service.py index 470bd01e2e..fd7a898622 100644 --- a/flytekit/extend/backend/agent_service.py +++ b/flytekit/extend/backend/agent_service.py @@ -12,11 +12,15 @@ Resource, ) from flyteidl.service.agent_pb2_grpc import AsyncAgentServiceServicer +from prometheus_client import Histogram from flytekit import logger from flytekit.extend.backend.base_agent import AgentRegistry from flytekit.models.literals import LiteralMap from flytekit.models.task import TaskTemplate +from prometheus_async.aio import time + +req_process_time = Histogram('request_processing_seconds', 'Time spent processing get request') class AsyncAgentService(AsyncAgentServiceServicer): @@ -48,6 +52,7 @@ async def CreateTask(self, request: CreateTaskRequest, context: grpc.ServicerCon context.set_code(grpc.StatusCode.INTERNAL) context.set_details(f"failed to create task with error {e}") + @time(req_process_time) async def GetTask(self, request: GetTaskRequest, context: grpc.ServicerContext) -> GetTaskResponse: try: agent = AgentRegistry.get_agent(context, request.task_type) diff --git a/plugins/flytekit-bigquery/flytekitplugins/bigquery/__init__.py b/plugins/flytekit-bigquery/flytekitplugins/bigquery/__init__.py index 0e0fe80bc7..bac1dd03ef 100644 --- a/plugins/flytekit-bigquery/flytekitplugins/bigquery/__init__.py +++ b/plugins/flytekit-bigquery/flytekitplugins/bigquery/__init__.py @@ -14,3 +14,4 @@ from .agent import BigQueryAgent from .task import BigQueryConfig, BigQueryTask +from .sleep_agent import SleepAgent diff --git a/plugins/flytekit-bigquery/flytekitplugins/bigquery/agent.py b/plugins/flytekit-bigquery/flytekitplugins/bigquery/agent.py index 4ddb26cdfd..dbb4a63392 100644 --- a/plugins/flytekit-bigquery/flytekitplugins/bigquery/agent.py +++ b/plugins/flytekit-bigquery/flytekitplugins/bigquery/agent.py @@ -43,7 +43,7 @@ class Metadata: class BigQueryAgent(AgentBase): def __init__(self): - super().__init__(task_type="bigquery_query_job_task", asynchronous=False) + super().__init__(task_type="bigquery_query_job_task1", asynchronous=False) def create( self, From 03466028c13bbe0cc0181842cc2d4bddab374129 Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Mon, 28 Aug 2023 11:50:20 -0700 Subject: [PATCH 02/14] wip Signed-off-by: Kevin Su --- flytekit/clis/sdk_in_container/serve.py | 1 + flytekit/extend/backend/agent_service.py | 41 ++++++++++++++----- .../flytekitplugins/bigquery/__init__.py | 2 +- 3 files changed, 33 insertions(+), 11 deletions(-) diff --git a/flytekit/clis/sdk_in_container/serve.py b/flytekit/clis/sdk_in_container/serve.py index 3e963e967c..af9e7dda68 100644 --- a/flytekit/clis/sdk_in_container/serve.py +++ b/flytekit/clis/sdk_in_container/serve.py @@ -4,6 +4,7 @@ from flyteidl.service.agent_pb2_grpc import add_AsyncAgentServiceServicer_to_server from grpc import aio from prometheus_client import start_http_server + from flytekit.extend.backend.agent_service import AsyncAgentService _serve_help = """Start a grpc server for the agent service.""" diff --git a/flytekit/extend/backend/agent_service.py b/flytekit/extend/backend/agent_service.py index fd7a898622..185634d07e 100644 --- a/flytekit/extend/backend/agent_service.py +++ b/flytekit/extend/backend/agent_service.py @@ -12,20 +12,24 @@ Resource, ) from flyteidl.service.agent_pb2_grpc import AsyncAgentServiceServicer -from prometheus_client import Histogram +from prometheus_async.aio import time +from prometheus_client import Gauge, Histogram from flytekit import logger from flytekit.extend.backend.base_agent import AgentRegistry from flytekit.models.literals import LiteralMap from flytekit.models.task import TaskTemplate -from prometheus_async.aio import time -req_process_time = Histogram('request_processing_seconds', 'Time spent processing get request') +request_count = Gauge("request_count", "Total number of requests") +create_req_process_time = Histogram("create_request_processing_seconds", "Time spent processing agent create request") +get_req_process_time = Histogram("get_request_processing_seconds", "Time spent processing agent get request") +delete_req_process_time = Histogram("delete_request_processing_seconds", "Time spent processing agent delete request") class AsyncAgentService(AsyncAgentServiceServicer): async def CreateTask(self, request: CreateTaskRequest, context: grpc.ServicerContext) -> CreateTaskResponse: try: + request_count.inc() tmp = TaskTemplate.from_flyte_idl(request.template) inputs = LiteralMap.from_flyte_idl(request.inputs) if request.inputs else None agent = AgentRegistry.get_agent(context, tmp.type) @@ -34,16 +38,20 @@ async def CreateTask(self, request: CreateTaskRequest, context: grpc.ServicerCon return CreateTaskResponse() if agent.asynchronous: try: - return await agent.async_create( + res = await agent.async_create( context=context, inputs=inputs, output_prefix=request.output_prefix, task_template=tmp ) + request_count.dec() + return res except Exception as e: logger.error(f"failed to run async create with error {e}") raise e try: - return await asyncio.to_thread( + res = await asyncio.to_thread( agent.create, context=context, inputs=inputs, output_prefix=request.output_prefix, task_template=tmp ) + request_count.dec() + return res except Exception as e: logger.error(f"failed to run sync create with error {e}") raise @@ -51,22 +59,28 @@ async def CreateTask(self, request: CreateTaskRequest, context: grpc.ServicerCon logger.error(f"failed to create task with error {e}") context.set_code(grpc.StatusCode.INTERNAL) context.set_details(f"failed to create task with error {e}") + request_count.dec() - @time(req_process_time) + @time(get_req_process_time) async def GetTask(self, request: GetTaskRequest, context: grpc.ServicerContext) -> GetTaskResponse: try: + request_count.inc() agent = AgentRegistry.get_agent(context, request.task_type) logger.info(f"{agent.task_type} agent start checking the status of the job") if agent is None: return GetTaskResponse(resource=Resource(state=PERMANENT_FAILURE)) if agent.asynchronous: try: - return await agent.async_get(context=context, resource_meta=request.resource_meta) + res = await agent.async_get(context=context, resource_meta=request.resource_meta) + request_count.dec() + return res except Exception as e: logger.error(f"failed to run async get with error {e}") raise try: - return await asyncio.to_thread(agent.get, context=context, resource_meta=request.resource_meta) + res = await asyncio.to_thread(agent.get, context=context, resource_meta=request.resource_meta) + request_count.dec() + return res except Exception as e: logger.error(f"failed to run sync get with error {e}") raise @@ -74,21 +88,27 @@ async def GetTask(self, request: GetTaskRequest, context: grpc.ServicerContext) logger.error(f"failed to get task with error {e}") context.set_code(grpc.StatusCode.INTERNAL) context.set_details(f"failed to get task with error {e}") + request_count.dec() async def DeleteTask(self, request: DeleteTaskRequest, context: grpc.ServicerContext) -> DeleteTaskResponse: try: + request_count.inc() agent = AgentRegistry.get_agent(context, request.task_type) logger.info(f"{agent.task_type} agent start deleting the job") if agent is None: return DeleteTaskResponse() if agent.asynchronous: try: - return await agent.async_delete(context=context, resource_meta=request.resource_meta) + res = await agent.async_delete(context=context, resource_meta=request.resource_meta) + request_count.dec() + return res except Exception as e: logger.error(f"failed to run async delete with error {e}") raise try: - return asyncio.to_thread(agent.delete, context=context, resource_meta=request.resource_meta) + res = asyncio.to_thread(agent.delete, context=context, resource_meta=request.resource_meta) + request_count.dec() + return res except Exception as e: logger.error(f"failed to run sync delete with error {e}") raise @@ -96,3 +116,4 @@ async def DeleteTask(self, request: DeleteTaskRequest, context: grpc.ServicerCon logger.error(f"failed to delete task with error {e}") context.set_code(grpc.StatusCode.INTERNAL) context.set_details(f"failed to delete task with error {e}") + request_count.dec() diff --git a/plugins/flytekit-bigquery/flytekitplugins/bigquery/__init__.py b/plugins/flytekit-bigquery/flytekitplugins/bigquery/__init__.py index bac1dd03ef..a81672ea41 100644 --- a/plugins/flytekit-bigquery/flytekitplugins/bigquery/__init__.py +++ b/plugins/flytekit-bigquery/flytekitplugins/bigquery/__init__.py @@ -13,5 +13,5 @@ """ from .agent import BigQueryAgent -from .task import BigQueryConfig, BigQueryTask from .sleep_agent import SleepAgent +from .task import BigQueryConfig, BigQueryTask From ebb343587b97c1cf0fd36665ef3d099a4fc80c21 Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Mon, 28 Aug 2023 12:32:07 -0700 Subject: [PATCH 03/14] Add sleep agent Signed-off-by: Kevin Su --- plugins/flytekit-bigquery/flytekitplugins/bigquery/__init__.py | 1 - 1 file changed, 1 deletion(-) diff --git a/plugins/flytekit-bigquery/flytekitplugins/bigquery/__init__.py b/plugins/flytekit-bigquery/flytekitplugins/bigquery/__init__.py index a81672ea41..0e0fe80bc7 100644 --- a/plugins/flytekit-bigquery/flytekitplugins/bigquery/__init__.py +++ b/plugins/flytekit-bigquery/flytekitplugins/bigquery/__init__.py @@ -13,5 +13,4 @@ """ from .agent import BigQueryAgent -from .sleep_agent import SleepAgent from .task import BigQueryConfig, BigQueryTask From 789c7200c5e30ba554f4a3c2956494956b4c9bea Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Wed, 30 Aug 2023 11:16:55 -0700 Subject: [PATCH 04/14] Use summary instead Signed-off-by: Kevin Su --- flytekit/extend/backend/agent_service.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/flytekit/extend/backend/agent_service.py b/flytekit/extend/backend/agent_service.py index 185634d07e..6690b3c359 100644 --- a/flytekit/extend/backend/agent_service.py +++ b/flytekit/extend/backend/agent_service.py @@ -13,7 +13,7 @@ ) from flyteidl.service.agent_pb2_grpc import AsyncAgentServiceServicer from prometheus_async.aio import time -from prometheus_client import Gauge, Histogram +from prometheus_client import Gauge, Histogram, Summary from flytekit import logger from flytekit.extend.backend.base_agent import AgentRegistry @@ -21,9 +21,9 @@ from flytekit.models.task import TaskTemplate request_count = Gauge("request_count", "Total number of requests") -create_req_process_time = Histogram("create_request_processing_seconds", "Time spent processing agent create request") -get_req_process_time = Histogram("get_request_processing_seconds", "Time spent processing agent get request") -delete_req_process_time = Histogram("delete_request_processing_seconds", "Time spent processing agent delete request") +create_req_process_time = Summary("create_request_processing_seconds", "Time spent processing agent create request") +get_req_process_time = Summary("get_request_processing_seconds", "Time spent processing agent get request") +delete_req_process_time = Summary("delete_request_processing_seconds", "Time spent processing agent delete request") class AsyncAgentService(AsyncAgentServiceServicer): From a30c25e6ae2f469cf27b6e7837d977fb441e30b5 Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Wed, 30 Aug 2023 11:33:03 -0700 Subject: [PATCH 05/14] Update counter Signed-off-by: Kevin Su --- flytekit/extend/backend/agent_service.py | 24 +++++++++++++----------- 1 file changed, 13 insertions(+), 11 deletions(-) diff --git a/flytekit/extend/backend/agent_service.py b/flytekit/extend/backend/agent_service.py index 6690b3c359..093119b475 100644 --- a/flytekit/extend/backend/agent_service.py +++ b/flytekit/extend/backend/agent_service.py @@ -13,14 +13,16 @@ ) from flyteidl.service.agent_pb2_grpc import AsyncAgentServiceServicer from prometheus_async.aio import time -from prometheus_client import Gauge, Histogram, Summary +from prometheus_client import Counter, Summary from flytekit import logger from flytekit.extend.backend.base_agent import AgentRegistry from flytekit.models.literals import LiteralMap from flytekit.models.task import TaskTemplate -request_count = Gauge("request_count", "Total number of requests") +request_count = Counter("request_count", "Total number of requests") +request_success_count = Counter("request_success_count", "Total number of successful requests") +request_failure_count = Counter("request_failure_count", "Total number of failed requests") create_req_process_time = Summary("create_request_processing_seconds", "Time spent processing agent create request") get_req_process_time = Summary("get_request_processing_seconds", "Time spent processing agent get request") delete_req_process_time = Summary("delete_request_processing_seconds", "Time spent processing agent delete request") @@ -41,7 +43,7 @@ async def CreateTask(self, request: CreateTaskRequest, context: grpc.ServicerCon res = await agent.async_create( context=context, inputs=inputs, output_prefix=request.output_prefix, task_template=tmp ) - request_count.dec() + request_success_count.inc() return res except Exception as e: logger.error(f"failed to run async create with error {e}") @@ -50,7 +52,7 @@ async def CreateTask(self, request: CreateTaskRequest, context: grpc.ServicerCon res = await asyncio.to_thread( agent.create, context=context, inputs=inputs, output_prefix=request.output_prefix, task_template=tmp ) - request_count.dec() + request_success_count.inc() return res except Exception as e: logger.error(f"failed to run sync create with error {e}") @@ -59,7 +61,7 @@ async def CreateTask(self, request: CreateTaskRequest, context: grpc.ServicerCon logger.error(f"failed to create task with error {e}") context.set_code(grpc.StatusCode.INTERNAL) context.set_details(f"failed to create task with error {e}") - request_count.dec() + request_failure_count.inc() @time(get_req_process_time) async def GetTask(self, request: GetTaskRequest, context: grpc.ServicerContext) -> GetTaskResponse: @@ -72,14 +74,14 @@ async def GetTask(self, request: GetTaskRequest, context: grpc.ServicerContext) if agent.asynchronous: try: res = await agent.async_get(context=context, resource_meta=request.resource_meta) - request_count.dec() + request_success_count.inc() return res except Exception as e: logger.error(f"failed to run async get with error {e}") raise try: res = await asyncio.to_thread(agent.get, context=context, resource_meta=request.resource_meta) - request_count.dec() + request_success_count.inc() return res except Exception as e: logger.error(f"failed to run sync get with error {e}") @@ -88,7 +90,7 @@ async def GetTask(self, request: GetTaskRequest, context: grpc.ServicerContext) logger.error(f"failed to get task with error {e}") context.set_code(grpc.StatusCode.INTERNAL) context.set_details(f"failed to get task with error {e}") - request_count.dec() + request_failure_count.inc() async def DeleteTask(self, request: DeleteTaskRequest, context: grpc.ServicerContext) -> DeleteTaskResponse: try: @@ -100,14 +102,14 @@ async def DeleteTask(self, request: DeleteTaskRequest, context: grpc.ServicerCon if agent.asynchronous: try: res = await agent.async_delete(context=context, resource_meta=request.resource_meta) - request_count.dec() + request_success_count.inc() return res except Exception as e: logger.error(f"failed to run async delete with error {e}") raise try: res = asyncio.to_thread(agent.delete, context=context, resource_meta=request.resource_meta) - request_count.dec() + request_success_count.inc() return res except Exception as e: logger.error(f"failed to run sync delete with error {e}") @@ -116,4 +118,4 @@ async def DeleteTask(self, request: DeleteTaskRequest, context: grpc.ServicerCon logger.error(f"failed to delete task with error {e}") context.set_code(grpc.StatusCode.INTERNAL) context.set_details(f"failed to delete task with error {e}") - request_count.dec() + request_failure_count.inc() From cc5a1aa71a3cb84239d15929f9041994342a88fe Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Wed, 30 Aug 2023 12:40:48 -0700 Subject: [PATCH 06/14] update metric Signed-off-by: Kevin Su --- flytekit/extend/backend/agent_service.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/flytekit/extend/backend/agent_service.py b/flytekit/extend/backend/agent_service.py index 093119b475..54a97a005a 100644 --- a/flytekit/extend/backend/agent_service.py +++ b/flytekit/extend/backend/agent_service.py @@ -29,6 +29,7 @@ class AsyncAgentService(AsyncAgentServiceServicer): + @time(create_req_process_time) async def CreateTask(self, request: CreateTaskRequest, context: grpc.ServicerContext) -> CreateTaskResponse: try: request_count.inc() @@ -92,6 +93,7 @@ async def GetTask(self, request: GetTaskRequest, context: grpc.ServicerContext) context.set_details(f"failed to get task with error {e}") request_failure_count.inc() + @time(delete_req_process_time) async def DeleteTask(self, request: DeleteTaskRequest, context: grpc.ServicerContext) -> DeleteTaskResponse: try: request_count.inc() From 18a2de6fe09b1096ba1e3da439ff8eef1b17fe81 Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Thu, 31 Aug 2023 15:32:56 -0700 Subject: [PATCH 07/14] prometheus metric Signed-off-by: Kevin Su --- flytekit/extend/backend/agent_service.py | 149 ++++++++++++----------- 1 file changed, 81 insertions(+), 68 deletions(-) diff --git a/flytekit/extend/backend/agent_service.py b/flytekit/extend/backend/agent_service.py index 54a97a005a..cfe42c57e4 100644 --- a/flytekit/extend/backend/agent_service.py +++ b/flytekit/extend/backend/agent_service.py @@ -12,7 +12,6 @@ Resource, ) from flyteidl.service.agent_pb2_grpc import AsyncAgentServiceServicer -from prometheus_async.aio import time from prometheus_client import Counter, Summary from flytekit import logger @@ -20,104 +19,118 @@ from flytekit.models.literals import LiteralMap from flytekit.models.task import TaskTemplate -request_count = Counter("request_count", "Total number of requests") -request_success_count = Counter("request_success_count", "Total number of successful requests") -request_failure_count = Counter("request_failure_count", "Total number of failed requests") -create_req_process_time = Summary("create_request_processing_seconds", "Time spent processing agent create request") -get_req_process_time = Summary("get_request_processing_seconds", "Time spent processing agent get request") -delete_req_process_time = Summary("delete_request_processing_seconds", "Time spent processing agent delete request") +request_count = Counter("request_count", "Total number of requests", ["task_type"]) +request_success_count = Counter("request_success_count", "Total number of successful requests", ["task_type"]) +request_failure_count = Counter("request_failure_count", "Total number of failed requests", ["task_type"]) + +create_req_process_time = Summary( + "create_request_processing_seconds", "Time spent processing agent create request", ["task_type"] +) +get_req_process_time = Summary( + "get_request_processing_seconds", "Time spent processing agent get request", ["task_type"] +) +delete_req_process_time = Summary( + "delete_request_processing_seconds", "Time spent processing agent delete request", ["task_type"] +) + +input_literal_size = Summary("input_literal_size", "Size of input literal", ["task_type"]) class AsyncAgentService(AsyncAgentServiceServicer): - @time(create_req_process_time) async def CreateTask(self, request: CreateTaskRequest, context: grpc.ServicerContext) -> CreateTaskResponse: try: - request_count.inc() - tmp = TaskTemplate.from_flyte_idl(request.template) - inputs = LiteralMap.from_flyte_idl(request.inputs) if request.inputs else None - agent = AgentRegistry.get_agent(context, tmp.type) - logger.info(f"{tmp.type} agent start creating the job") - if agent is None: - return CreateTaskResponse() - if agent.asynchronous: + with create_req_process_time.labels(task_type=request.template.type).time(): + tmp = TaskTemplate.from_flyte_idl(request.template) + inputs = LiteralMap.from_flyte_idl(request.inputs) if request.inputs else None + + request_count.labels(task_type=tmp.type).inc() + input_literal_size.labels(task_type=tmp.type).observe(request.inputs.ByteSize()) + + agent = AgentRegistry.get_agent(context, tmp.type) + logger.info(f"{tmp.type} agent start creating the job") + + if agent is None: + return CreateTaskResponse() + if agent.asynchronous: + try: + res = await agent.async_create( + context=context, inputs=inputs, output_prefix=request.output_prefix, task_template=tmp + ) + request_success_count.inc() + return res + except Exception as e: + logger.error(f"failed to run async create with error {e}") + raise e try: - res = await agent.async_create( - context=context, inputs=inputs, output_prefix=request.output_prefix, task_template=tmp + res = await asyncio.to_thread( + agent.create, + context=context, + inputs=inputs, + output_prefix=request.output_prefix, + task_template=tmp, ) - request_success_count.inc() + request_success_count.labels(task_type=tmp.type).inc() return res except Exception as e: - logger.error(f"failed to run async create with error {e}") - raise e - try: - res = await asyncio.to_thread( - agent.create, context=context, inputs=inputs, output_prefix=request.output_prefix, task_template=tmp - ) - request_success_count.inc() - return res - except Exception as e: - logger.error(f"failed to run sync create with error {e}") - raise + logger.error(f"failed to run sync create with error {e}") + raise except Exception as e: - logger.error(f"failed to create task with error {e}") context.set_code(grpc.StatusCode.INTERNAL) context.set_details(f"failed to create task with error {e}") - request_failure_count.inc() + request_failure_count.labels(task_type=tmp.type).inc() - @time(get_req_process_time) async def GetTask(self, request: GetTaskRequest, context: grpc.ServicerContext) -> GetTaskResponse: try: - request_count.inc() - agent = AgentRegistry.get_agent(context, request.task_type) - logger.info(f"{agent.task_type} agent start checking the status of the job") - if agent is None: - return GetTaskResponse(resource=Resource(state=PERMANENT_FAILURE)) - if agent.asynchronous: + with get_req_process_time.labels(task_type=request.task_type).time(): + request_count.labels(task_type=request.task_type).inc() + agent = AgentRegistry.get_agent(context, request.task_type) + logger.info(f"{agent.task_type} agent start checking the status of the job") + if agent is None: + return GetTaskResponse(resource=Resource(state=PERMANENT_FAILURE)) + if agent.asynchronous: + try: + res = await agent.async_get(context=context, resource_meta=request.resource_meta) + request_success_count.labels(task_type=request.task_type).inc() + return res + except Exception as e: + logger.error(f"failed to run async get with error {e}") + raise try: - res = await agent.async_get(context=context, resource_meta=request.resource_meta) + res = await asyncio.to_thread(agent.get, context=context, resource_meta=request.resource_meta) request_success_count.inc() return res except Exception as e: - logger.error(f"failed to run async get with error {e}") + logger.error(f"failed to run sync get with error {e}") raise - try: - res = await asyncio.to_thread(agent.get, context=context, resource_meta=request.resource_meta) - request_success_count.inc() - return res - except Exception as e: - logger.error(f"failed to run sync get with error {e}") - raise except Exception as e: - logger.error(f"failed to get task with error {e}") context.set_code(grpc.StatusCode.INTERNAL) context.set_details(f"failed to get task with error {e}") - request_failure_count.inc() + request_failure_count.labels(task_type=request.task_type).inc() - @time(delete_req_process_time) async def DeleteTask(self, request: DeleteTaskRequest, context: grpc.ServicerContext) -> DeleteTaskResponse: try: - request_count.inc() - agent = AgentRegistry.get_agent(context, request.task_type) - logger.info(f"{agent.task_type} agent start deleting the job") - if agent is None: - return DeleteTaskResponse() - if agent.asynchronous: + with delete_req_process_time.labels(task_type=request.task_type).time(): + request_count.labels(task_type=request.task_type).inc() + agent = AgentRegistry.get_agent(context, request.task_type) + logger.info(f"{agent.task_type} agent start deleting the job") + if agent is None: + return DeleteTaskResponse() + if agent.asynchronous: + try: + res = await agent.async_delete(context=context, resource_meta=request.resource_meta) + request_success_count.inc() + return res + except Exception as e: + logger.error(f"failed to run async delete with error {e}") + raise try: - res = await agent.async_delete(context=context, resource_meta=request.resource_meta) + res = asyncio.to_thread(agent.delete, context=context, resource_meta=request.resource_meta) request_success_count.inc() return res except Exception as e: - logger.error(f"failed to run async delete with error {e}") + logger.error(f"failed to run sync delete with error {e}") raise - try: - res = asyncio.to_thread(agent.delete, context=context, resource_meta=request.resource_meta) - request_success_count.inc() - return res - except Exception as e: - logger.error(f"failed to run sync delete with error {e}") - raise except Exception as e: - logger.error(f"failed to delete task with error {e}") context.set_code(grpc.StatusCode.INTERNAL) context.set_details(f"failed to delete task with error {e}") - request_failure_count.inc() + request_failure_count.labels(task_type=request.task_type).inc() From 0ddd31737f12c00fbbaa46eab3fe0595d791e78a Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Thu, 31 Aug 2023 15:41:50 -0700 Subject: [PATCH 08/14] nit Signed-off-by: Kevin Su --- Dockerfile.agent | 1 + plugins/flytekit-bigquery/flytekitplugins/bigquery/agent.py | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/Dockerfile.agent b/Dockerfile.agent index 2194f5de23..79dfb5b9d0 100644 --- a/Dockerfile.agent +++ b/Dockerfile.agent @@ -4,6 +4,7 @@ MAINTAINER Flyte Team LABEL org.opencontainers.image.source=https://github.com/flyteorg/flytekit ARG VERSION +RUN pip install prometheus-client RUN pip install -U flytekit==$VERSION flytekitplugins-bigquery==$VERSION CMD pyflyte serve --port 8000 diff --git a/plugins/flytekit-bigquery/flytekitplugins/bigquery/agent.py b/plugins/flytekit-bigquery/flytekitplugins/bigquery/agent.py index dbb4a63392..4ddb26cdfd 100644 --- a/plugins/flytekit-bigquery/flytekitplugins/bigquery/agent.py +++ b/plugins/flytekit-bigquery/flytekitplugins/bigquery/agent.py @@ -43,7 +43,7 @@ class Metadata: class BigQueryAgent(AgentBase): def __init__(self): - super().__init__(task_type="bigquery_query_job_task1", asynchronous=False) + super().__init__(task_type="bigquery_query_job_task", asynchronous=False) def create( self, From 1132fb90739caaeb85c70ddd46327ce372d18e24 Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Thu, 31 Aug 2023 15:48:54 -0700 Subject: [PATCH 09/14] nit Signed-off-by: Kevin Su --- flytekit/extend/backend/agent_service.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/flytekit/extend/backend/agent_service.py b/flytekit/extend/backend/agent_service.py index cfe42c57e4..293b1737e0 100644 --- a/flytekit/extend/backend/agent_service.py +++ b/flytekit/extend/backend/agent_service.py @@ -56,7 +56,7 @@ async def CreateTask(self, request: CreateTaskRequest, context: grpc.ServicerCon res = await agent.async_create( context=context, inputs=inputs, output_prefix=request.output_prefix, task_template=tmp ) - request_success_count.inc() + request_success_count.labels(task_type=tmp.type).inc() return res except Exception as e: logger.error(f"failed to run async create with error {e}") @@ -97,7 +97,7 @@ async def GetTask(self, request: GetTaskRequest, context: grpc.ServicerContext) raise try: res = await asyncio.to_thread(agent.get, context=context, resource_meta=request.resource_meta) - request_success_count.inc() + request_success_count.labels(task_type=request.task_type).inc() return res except Exception as e: logger.error(f"failed to run sync get with error {e}") @@ -118,14 +118,14 @@ async def DeleteTask(self, request: DeleteTaskRequest, context: grpc.ServicerCon if agent.asynchronous: try: res = await agent.async_delete(context=context, resource_meta=request.resource_meta) - request_success_count.inc() + request_success_count.labels(task_type=request.task_type).inc() return res except Exception as e: logger.error(f"failed to run async delete with error {e}") raise try: res = asyncio.to_thread(agent.delete, context=context, resource_meta=request.resource_meta) - request_success_count.inc() + request_success_count.labels(task_type=request.task_type).inc() return res except Exception as e: logger.error(f"failed to run sync delete with error {e}") From 9c60fc2bee7d4309130cd196db7e8815d230f4f4 Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Thu, 31 Aug 2023 16:30:33 -0700 Subject: [PATCH 10/14] Update dev-req Signed-off-by: Kevin Su --- dev-requirements.in | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/dev-requirements.in b/dev-requirements.in index bd16ba151a..d3d9d46348 100644 --- a/dev-requirements.in +++ b/dev-requirements.in @@ -1,4 +1,4 @@ --r requirements.in +re-r requirements.in coverage[toml] hypothesis @@ -30,3 +30,4 @@ types-protobuf types-croniter types-mock autoflake +prometheus-client From 07cc22d3c4d483b3a39f301854deceb6222bbdae Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Fri, 1 Sep 2023 09:13:18 -0700 Subject: [PATCH 11/14] Update dev-req Signed-off-by: Kevin Su --- dev-requirements.in | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dev-requirements.in b/dev-requirements.in index d3d9d46348..51fc48911b 100644 --- a/dev-requirements.in +++ b/dev-requirements.in @@ -1,4 +1,4 @@ -re-r requirements.in +-r requirements.in coverage[toml] hypothesis From edb951a24967e7484b1b76e15aaeb73bb3435d0c Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Fri, 8 Sep 2023 12:30:21 -0700 Subject: [PATCH 12/14] update metric Signed-off-by: Kevin Su --- flytekit/extend/backend/agent_service.py | 25 ++++++++---------------- 1 file changed, 8 insertions(+), 17 deletions(-) diff --git a/flytekit/extend/backend/agent_service.py b/flytekit/extend/backend/agent_service.py index 293b1737e0..3ed905cedf 100644 --- a/flytekit/extend/backend/agent_service.py +++ b/flytekit/extend/backend/agent_service.py @@ -19,27 +19,18 @@ from flytekit.models.literals import LiteralMap from flytekit.models.task import TaskTemplate -request_count = Counter("request_count", "Total number of requests", ["task_type"]) -request_success_count = Counter("request_success_count", "Total number of successful requests", ["task_type"]) -request_failure_count = Counter("request_failure_count", "Total number of failed requests", ["task_type"]) +request_count = Counter("flyte_agent_req_count", "Total number of requests", ["task_type"]) +request_success_count = Counter("flyte_agent_req_success_count", "Total number of successful requests", ["task_type"]) +request_failure_count = Counter("flyte_agent_req_failure_count", "Total number of failed requests", ["task_type"]) -create_req_process_time = Summary( - "create_request_processing_seconds", "Time spent processing agent create request", ["task_type"] -) -get_req_process_time = Summary( - "get_request_processing_seconds", "Time spent processing agent get request", ["task_type"] -) -delete_req_process_time = Summary( - "delete_request_processing_seconds", "Time spent processing agent delete request", ["task_type"] -) - -input_literal_size = Summary("input_literal_size", "Size of input literal", ["task_type"]) +request_latency = Summary("flyte_agent_req_latency", "Time spent processing agent request", ["task_type", "operation"]) +input_literal_size = Summary("flyte_agent_input_literal_size", "Size of input literal", ["task_type"]) class AsyncAgentService(AsyncAgentServiceServicer): async def CreateTask(self, request: CreateTaskRequest, context: grpc.ServicerContext) -> CreateTaskResponse: try: - with create_req_process_time.labels(task_type=request.template.type).time(): + with request_latency.labels(task_type=request.template.type, operation="create").time(): tmp = TaskTemplate.from_flyte_idl(request.template) inputs = LiteralMap.from_flyte_idl(request.inputs) if request.inputs else None @@ -81,7 +72,7 @@ async def CreateTask(self, request: CreateTaskRequest, context: grpc.ServicerCon async def GetTask(self, request: GetTaskRequest, context: grpc.ServicerContext) -> GetTaskResponse: try: - with get_req_process_time.labels(task_type=request.task_type).time(): + with request_latency.labels(task_type=request.task_type, operation="get").time(): request_count.labels(task_type=request.task_type).inc() agent = AgentRegistry.get_agent(context, request.task_type) logger.info(f"{agent.task_type} agent start checking the status of the job") @@ -109,7 +100,7 @@ async def GetTask(self, request: GetTaskRequest, context: grpc.ServicerContext) async def DeleteTask(self, request: DeleteTaskRequest, context: grpc.ServicerContext) -> DeleteTaskResponse: try: - with delete_req_process_time.labels(task_type=request.task_type).time(): + with request_latency.labels(task_type=request.task_type, operation="delete").time(): request_count.labels(task_type=request.task_type).inc() agent = AgentRegistry.get_agent(context, request.task_type) logger.info(f"{agent.task_type} agent start deleting the job") From 1679b9fcd9cd569bf7640850f67fe8beeba09869 Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Fri, 8 Sep 2023 12:45:53 -0700 Subject: [PATCH 13/14] update metric Signed-off-by: Kevin Su --- flytekit/extend/backend/agent_service.py | 36 +++++++++++++----------- 1 file changed, 20 insertions(+), 16 deletions(-) diff --git a/flytekit/extend/backend/agent_service.py b/flytekit/extend/backend/agent_service.py index 86c3562668..fc9665fc55 100644 --- a/flytekit/extend/backend/agent_service.py +++ b/flytekit/extend/backend/agent_service.py @@ -17,22 +17,28 @@ from flytekit.models.literals import LiteralMap from flytekit.models.task import TaskTemplate -request_count = Counter("flyte_agent_req_count", "Total number of requests", ["task_type"]) -request_success_count = Counter("flyte_agent_req_success_count", "Total number of successful requests", ["task_type"]) -request_failure_count = Counter("flyte_agent_req_failure_count", "Total number of failed requests", ["task_type"]) +request_success_count = Counter( + "flyte_agent_req_success_count", "Total number of successful requests", ["task_type", "operation"] +) +request_failure_count = Counter( + "flyte_agent_req_failure_count", "Total number of failed requests", ["task_type", "operation"] +) request_latency = Summary("flyte_agent_req_latency", "Time spent processing agent request", ["task_type", "operation"]) input_literal_size = Summary("flyte_agent_input_literal_size", "Size of input literal", ["task_type"]) +create_operation = "create" +get_operation = "get" +delete_operation = "delete" + class AsyncAgentService(AsyncAgentServiceServicer): async def CreateTask(self, request: CreateTaskRequest, context: grpc.ServicerContext) -> CreateTaskResponse: try: - with request_latency.labels(task_type=request.template.type, operation="create").time(): + with request_latency.labels(task_type=request.template.type, operation=create_operation).time(): tmp = TaskTemplate.from_flyte_idl(request.template) inputs = LiteralMap.from_flyte_idl(request.inputs) if request.inputs else None - request_count.labels(task_type=tmp.type).inc() input_literal_size.labels(task_type=tmp.type).observe(request.inputs.ByteSize()) agent = AgentRegistry.get_agent(tmp.type) @@ -42,7 +48,7 @@ async def CreateTask(self, request: CreateTaskRequest, context: grpc.ServicerCon res = await agent.async_create( context=context, inputs=inputs, output_prefix=request.output_prefix, task_template=tmp ) - request_success_count.labels(task_type=tmp.type).inc() + request_success_count.labels(task_type=tmp.type, operation=create_operation).inc() return res except Exception as e: logger.error(f"failed to run async create with error {e}") @@ -55,7 +61,7 @@ async def CreateTask(self, request: CreateTaskRequest, context: grpc.ServicerCon output_prefix=request.output_prefix, task_template=tmp, ) - request_success_count.labels(task_type=tmp.type).inc() + request_success_count.labels(task_type=tmp.type, operation=create_operation).inc() return res except Exception as e: logger.error(f"failed to run sync create with error {e}") @@ -63,25 +69,24 @@ async def CreateTask(self, request: CreateTaskRequest, context: grpc.ServicerCon except Exception as e: context.set_code(grpc.StatusCode.INTERNAL) context.set_details(f"failed to create task with error {e}") - request_failure_count.labels(task_type=tmp.type).inc() + request_failure_count.labels(task_type=tmp.type, operation=create_operation).inc() async def GetTask(self, request: GetTaskRequest, context: grpc.ServicerContext) -> GetTaskResponse: try: with request_latency.labels(task_type=request.task_type, operation="get").time(): - request_count.labels(task_type=request.task_type).inc() agent = AgentRegistry.get_agent(request.task_type) logger.info(f"{agent.task_type} agent start checking the status of the job") if agent.asynchronous: try: res = await agent.async_get(context=context, resource_meta=request.resource_meta) - request_success_count.labels(task_type=request.task_type).inc() + request_success_count.labels(task_type=request.task_type, operation=get_operation).inc() return res except Exception as e: logger.error(f"failed to run async get with error {e}") raise try: res = await asyncio.to_thread(agent.get, context=context, resource_meta=request.resource_meta) - request_success_count.labels(task_type=request.task_type).inc() + request_success_count.labels(task_type=request.task_type, operation=get_operation).inc() return res except Exception as e: logger.error(f"failed to run sync get with error {e}") @@ -89,25 +94,24 @@ async def GetTask(self, request: GetTaskRequest, context: grpc.ServicerContext) except Exception as e: context.set_code(grpc.StatusCode.INTERNAL) context.set_details(f"failed to get task with error {e}") - request_failure_count.labels(task_type=request.task_type).inc() + request_failure_count.labels(task_type=request.task_type, operation=get_operation).inc() async def DeleteTask(self, request: DeleteTaskRequest, context: grpc.ServicerContext) -> DeleteTaskResponse: try: with request_latency.labels(task_type=request.task_type, operation="delete").time(): - request_count.labels(task_type=request.task_type).inc() agent = AgentRegistry.get_agent(request.task_type) logger.info(f"{agent.task_type} agent start deleting the job") if agent.asynchronous: try: res = await agent.async_delete(context=context, resource_meta=request.resource_meta) - request_success_count.labels(task_type=request.task_type).inc() + request_success_count.labels(task_type=request.task_type, operation=delete_operation).inc() return res except Exception as e: logger.error(f"failed to run async delete with error {e}") raise try: res = asyncio.to_thread(agent.delete, context=context, resource_meta=request.resource_meta) - request_success_count.labels(task_type=request.task_type).inc() + request_success_count.labels(task_type=request.task_type, operation=delete_operation).inc() return res except Exception as e: logger.error(f"failed to run sync delete with error {e}") @@ -115,4 +119,4 @@ async def DeleteTask(self, request: DeleteTaskRequest, context: grpc.ServicerCon except Exception as e: context.set_code(grpc.StatusCode.INTERNAL) context.set_details(f"failed to delete task with error {e}") - request_failure_count.labels(task_type=request.task_type).inc() + request_failure_count.labels(task_type=request.task_type, operation=delete_operation).inc() From 2f4833f1b70d26aee2e29673766dbeb5aae75fb4 Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Tue, 12 Sep 2023 13:03:25 -0700 Subject: [PATCH 14/14] nit Signed-off-by: Kevin Su --- flytekit/clis/sdk_in_container/serve.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/flytekit/clis/sdk_in_container/serve.py b/flytekit/clis/sdk_in_container/serve.py index af9e7dda68..2aed58312e 100644 --- a/flytekit/clis/sdk_in_container/serve.py +++ b/flytekit/clis/sdk_in_container/serve.py @@ -3,7 +3,6 @@ import click from flyteidl.service.agent_pb2_grpc import add_AsyncAgentServiceServicer_to_server from grpc import aio -from prometheus_client import start_http_server from flytekit.extend.backend.agent_service import AsyncAgentService @@ -45,7 +44,12 @@ def serve(_: click.Context, port, worker, timeout): async def _start_grpc_server(port: int, worker: int, timeout: int): click.secho("Starting up the server to expose the prometheus metrics...", fg="blue") - start_http_server(9090) + try: + from prometheus_client import start_http_server + + start_http_server(9090) + except ImportError as e: + click.secho(f"Failed to start the prometheus server with error {e}", fg="red") click.secho("Starting the agent service...", fg="blue") server = aio.server(futures.ThreadPoolExecutor(max_workers=worker)) add_AsyncAgentServiceServicer_to_server(AsyncAgentService(), server)