Skip to content
This repository has been archived by the owner on Jul 19, 2024. It is now read-only.

Commit

Permalink
Prometheus metrics (flyteorg#1815)
Browse files Browse the repository at this point in the history
Signed-off-by: Kevin Su <[email protected]>
Signed-off-by: Future Outlier <[email protected]>
  • Loading branch information
pingsutw authored and Future Outlier committed Oct 3, 2023
1 parent 6e2cec1 commit 0bea129
Show file tree
Hide file tree
Showing 4 changed files with 85 additions and 36 deletions.
1 change: 1 addition & 0 deletions Dockerfile.agent
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ MAINTAINER Flyte Team <[email protected]>
LABEL org.opencontainers.image.source=https://github.com/flyteorg/flytekit

ARG VERSION
RUN pip install prometheus-client
RUN pip install -U flytekit==$VERSION flytekitplugins-bigquery==$VERSION

CMD pyflyte serve --port 8000
1 change: 1 addition & 0 deletions dev-requirements.in
Original file line number Diff line number Diff line change
Expand Up @@ -30,3 +30,4 @@ types-protobuf
types-croniter
types-mock
autoflake
prometheus-client
7 changes: 7 additions & 0 deletions flytekit/clis/sdk_in_container/serve.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,13 @@ def serve(_: click.Context, port, worker, timeout):


async def _start_grpc_server(port: int, worker: int, timeout: int):
click.secho("Starting up the server to expose the prometheus metrics...", fg="blue")
try:
from prometheus_client import start_http_server

start_http_server(9090)
except ImportError as e:
click.secho(f"Failed to start the prometheus server with error {e}", fg="red")
click.secho("Starting the agent service...", fg="blue")
server = aio.server(futures.ThreadPoolExecutor(max_workers=worker))
add_AsyncAgentServiceServicer_to_server(AsyncAgentService(), server)
Expand Down
112 changes: 76 additions & 36 deletions flytekit/extend/backend/agent_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,73 +10,113 @@
GetTaskResponse,
)
from flyteidl.service.agent_pb2_grpc import AsyncAgentServiceServicer
from prometheus_client import Counter, Summary

from flytekit import logger
from flytekit.extend.backend.base_agent import AgentRegistry
from flytekit.models.literals import LiteralMap
from flytekit.models.task import TaskTemplate

request_success_count = Counter(
"flyte_agent_req_success_count", "Total number of successful requests", ["task_type", "operation"]
)
request_failure_count = Counter(
"flyte_agent_req_failure_count", "Total number of failed requests", ["task_type", "operation"]
)

request_latency = Summary("flyte_agent_req_latency", "Time spent processing agent request", ["task_type", "operation"])
input_literal_size = Summary("flyte_agent_input_literal_size", "Size of input literal", ["task_type"])

create_operation = "create"
get_operation = "get"
delete_operation = "delete"


class AsyncAgentService(AsyncAgentServiceServicer):
async def CreateTask(self, request: CreateTaskRequest, context: grpc.ServicerContext) -> CreateTaskResponse:
try:
tmp = TaskTemplate.from_flyte_idl(request.template)
inputs = LiteralMap.from_flyte_idl(request.inputs) if request.inputs else None
agent = AgentRegistry.get_agent(tmp.type)
logger.info(f"{tmp.type} agent start creating the job")
if agent.asynchronous:
with request_latency.labels(task_type=request.template.type, operation=create_operation).time():
tmp = TaskTemplate.from_flyte_idl(request.template)
inputs = LiteralMap.from_flyte_idl(request.inputs) if request.inputs else None

input_literal_size.labels(task_type=tmp.type).observe(request.inputs.ByteSize())

agent = AgentRegistry.get_agent(tmp.type)
logger.info(f"{tmp.type} agent start creating the job")
if agent.asynchronous:
try:
res = await agent.async_create(
context=context, inputs=inputs, output_prefix=request.output_prefix, task_template=tmp
)
request_success_count.labels(task_type=tmp.type, operation=create_operation).inc()
return res
except Exception as e:
logger.error(f"failed to run async create with error {e}")
raise e
try:
return await agent.async_create(
context=context, inputs=inputs, output_prefix=request.output_prefix, task_template=tmp
res = await asyncio.to_thread(
agent.create,
context=context,
inputs=inputs,
output_prefix=request.output_prefix,
task_template=tmp,
)
request_success_count.labels(task_type=tmp.type, operation=create_operation).inc()
return res
except Exception as e:
logger.error(f"failed to run async create with error {e}")
raise e
try:
return await asyncio.to_thread(
agent.create, context=context, inputs=inputs, output_prefix=request.output_prefix, task_template=tmp
)
except Exception as e:
logger.error(f"failed to run sync create with error {e}")
raise
logger.error(f"failed to run sync create with error {e}")
raise
except Exception as e:
context.set_code(grpc.StatusCode.INTERNAL)
context.set_details(f"failed to create task with error {e}")
request_failure_count.labels(task_type=tmp.type, operation=create_operation).inc()

async def GetTask(self, request: GetTaskRequest, context: grpc.ServicerContext) -> GetTaskResponse:
try:
agent = AgentRegistry.get_agent(request.task_type)
logger.info(f"{agent.task_type} agent start checking the status of the job")
if agent.asynchronous:
with request_latency.labels(task_type=request.task_type, operation="get").time():
agent = AgentRegistry.get_agent(request.task_type)
logger.info(f"{agent.task_type} agent start checking the status of the job")
if agent.asynchronous:
try:
res = await agent.async_get(context=context, resource_meta=request.resource_meta)
request_success_count.labels(task_type=request.task_type, operation=get_operation).inc()
return res
except Exception as e:
logger.error(f"failed to run async get with error {e}")
raise
try:
return await agent.async_get(context=context, resource_meta=request.resource_meta)
res = await asyncio.to_thread(agent.get, context=context, resource_meta=request.resource_meta)
request_success_count.labels(task_type=request.task_type, operation=get_operation).inc()
return res
except Exception as e:
logger.error(f"failed to run async get with error {e}")
logger.error(f"failed to run sync get with error {e}")
raise
try:
return await asyncio.to_thread(agent.get, context=context, resource_meta=request.resource_meta)
except Exception as e:
logger.error(f"failed to run sync get with error {e}")
raise
except Exception as e:
context.set_code(grpc.StatusCode.INTERNAL)
context.set_details(f"failed to get task with error {e}")
request_failure_count.labels(task_type=request.task_type, operation=get_operation).inc()

async def DeleteTask(self, request: DeleteTaskRequest, context: grpc.ServicerContext) -> DeleteTaskResponse:
try:
agent = AgentRegistry.get_agent(request.task_type)
logger.info(f"{agent.task_type} agent start deleting the job")
if agent.asynchronous:
with request_latency.labels(task_type=request.task_type, operation="delete").time():
agent = AgentRegistry.get_agent(request.task_type)
logger.info(f"{agent.task_type} agent start deleting the job")
if agent.asynchronous:
try:
res = await agent.async_delete(context=context, resource_meta=request.resource_meta)
request_success_count.labels(task_type=request.task_type, operation=delete_operation).inc()
return res
except Exception as e:
logger.error(f"failed to run async delete with error {e}")
raise
try:
return await agent.async_delete(context=context, resource_meta=request.resource_meta)
res = asyncio.to_thread(agent.delete, context=context, resource_meta=request.resource_meta)
request_success_count.labels(task_type=request.task_type, operation=delete_operation).inc()
return res
except Exception as e:
logger.error(f"failed to run async delete with error {e}")
logger.error(f"failed to run sync delete with error {e}")
raise
try:
return asyncio.to_thread(agent.delete, context=context, resource_meta=request.resource_meta)
except Exception as e:
logger.error(f"failed to run sync delete with error {e}")
raise
except Exception as e:
context.set_code(grpc.StatusCode.INTERNAL)
context.set_details(f"failed to delete task with error {e}")
request_failure_count.labels(task_type=request.task_type, operation=delete_operation).inc()

0 comments on commit 0bea129

Please sign in to comment.