Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Prometheus metrics #1815

Merged
merged 15 commits into from
Sep 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Dockerfile.agent
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ MAINTAINER Flyte Team <[email protected]>
LABEL org.opencontainers.image.source=https://github.com/flyteorg/flytekit

ARG VERSION
RUN pip install prometheus-client
RUN pip install -U flytekit==$VERSION flytekitplugins-bigquery==$VERSION

CMD pyflyte serve --port 8000
1 change: 1 addition & 0 deletions dev-requirements.in
Original file line number Diff line number Diff line change
Expand Up @@ -30,3 +30,4 @@ types-protobuf
types-croniter
types-mock
autoflake
prometheus-client
7 changes: 7 additions & 0 deletions flytekit/clis/sdk_in_container/serve.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,13 @@ def serve(_: click.Context, port, worker, timeout):


async def _start_grpc_server(port: int, worker: int, timeout: int):
click.secho("Starting up the server to expose the prometheus metrics...", fg="blue")
try:
from prometheus_client import start_http_server

start_http_server(9090)
except ImportError as e:
click.secho(f"Failed to start the prometheus server with error {e}", fg="red")
click.secho("Starting the agent service...", fg="blue")
server = aio.server(futures.ThreadPoolExecutor(max_workers=worker))
add_AsyncAgentServiceServicer_to_server(AsyncAgentService(), server)
Expand Down
115 changes: 76 additions & 39 deletions flytekit/extend/backend/agent_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,76 +10,113 @@
GetTaskResponse,
)
from flyteidl.service.agent_pb2_grpc import AsyncAgentServiceServicer
from prometheus_client import Counter, Summary

from flytekit import logger
from flytekit.extend.backend.base_agent import AgentRegistry
from flytekit.models.literals import LiteralMap
from flytekit.models.task import TaskTemplate

request_success_count = Counter(
"flyte_agent_req_success_count", "Total number of successful requests", ["task_type", "operation"]
)
request_failure_count = Counter(
"flyte_agent_req_failure_count", "Total number of failed requests", ["task_type", "operation"]
)

request_latency = Summary("flyte_agent_req_latency", "Time spent processing agent request", ["task_type", "operation"])
input_literal_size = Summary("flyte_agent_input_literal_size", "Size of input literal", ["task_type"])

create_operation = "create"
get_operation = "get"
delete_operation = "delete"


class AsyncAgentService(AsyncAgentServiceServicer):
async def CreateTask(self, request: CreateTaskRequest, context: grpc.ServicerContext) -> CreateTaskResponse:
try:
tmp = TaskTemplate.from_flyte_idl(request.template)
inputs = LiteralMap.from_flyte_idl(request.inputs) if request.inputs else None
agent = AgentRegistry.get_agent(tmp.type)
logger.info(f"{tmp.type} agent start creating the job")
if agent.asynchronous:
with request_latency.labels(task_type=request.template.type, operation=create_operation).time():
tmp = TaskTemplate.from_flyte_idl(request.template)
inputs = LiteralMap.from_flyte_idl(request.inputs) if request.inputs else None

input_literal_size.labels(task_type=tmp.type).observe(request.inputs.ByteSize())

agent = AgentRegistry.get_agent(tmp.type)
logger.info(f"{tmp.type} agent start creating the job")
if agent.asynchronous:
try:
res = await agent.async_create(
context=context, inputs=inputs, output_prefix=request.output_prefix, task_template=tmp
)
request_success_count.labels(task_type=tmp.type, operation=create_operation).inc()
return res
except Exception as e:
logger.error(f"failed to run async create with error {e}")
raise e
try:
return await agent.async_create(
context=context, inputs=inputs, output_prefix=request.output_prefix, task_template=tmp
res = await asyncio.to_thread(
agent.create,
context=context,
inputs=inputs,
output_prefix=request.output_prefix,
task_template=tmp,
)
request_success_count.labels(task_type=tmp.type, operation=create_operation).inc()
return res
except Exception as e:
logger.error(f"failed to run async create with error {e}")
raise e
try:
return await asyncio.to_thread(
agent.create, context=context, inputs=inputs, output_prefix=request.output_prefix, task_template=tmp
)
except Exception as e:
logger.error(f"failed to run sync create with error {e}")
raise
logger.error(f"failed to run sync create with error {e}")
raise
except Exception as e:
logger.error(f"failed to create task with error {e}")
context.set_code(grpc.StatusCode.INTERNAL)
context.set_details(f"failed to create task with error {e}")
request_failure_count.labels(task_type=tmp.type, operation=create_operation).inc()

async def GetTask(self, request: GetTaskRequest, context: grpc.ServicerContext) -> GetTaskResponse:
try:
agent = AgentRegistry.get_agent(request.task_type)
logger.info(f"{agent.task_type} agent start checking the status of the job")
if agent.asynchronous:
with request_latency.labels(task_type=request.task_type, operation="get").time():
agent = AgentRegistry.get_agent(request.task_type)
logger.info(f"{agent.task_type} agent start checking the status of the job")
if agent.asynchronous:
try:
res = await agent.async_get(context=context, resource_meta=request.resource_meta)
request_success_count.labels(task_type=request.task_type, operation=get_operation).inc()
return res
except Exception as e:
logger.error(f"failed to run async get with error {e}")
raise
try:
return await agent.async_get(context=context, resource_meta=request.resource_meta)
res = await asyncio.to_thread(agent.get, context=context, resource_meta=request.resource_meta)
request_success_count.labels(task_type=request.task_type, operation=get_operation).inc()
return res
except Exception as e:
logger.error(f"failed to run async get with error {e}")
logger.error(f"failed to run sync get with error {e}")
raise
try:
return await asyncio.to_thread(agent.get, context=context, resource_meta=request.resource_meta)
except Exception as e:
logger.error(f"failed to run sync get with error {e}")
raise
except Exception as e:
logger.error(f"failed to get task with error {e}")
context.set_code(grpc.StatusCode.INTERNAL)
context.set_details(f"failed to get task with error {e}")
request_failure_count.labels(task_type=request.task_type, operation=get_operation).inc()

async def DeleteTask(self, request: DeleteTaskRequest, context: grpc.ServicerContext) -> DeleteTaskResponse:
try:
agent = AgentRegistry.get_agent(request.task_type)
logger.info(f"{agent.task_type} agent start deleting the job")
if agent.asynchronous:
with request_latency.labels(task_type=request.task_type, operation="delete").time():
agent = AgentRegistry.get_agent(request.task_type)
logger.info(f"{agent.task_type} agent start deleting the job")
if agent.asynchronous:
try:
res = await agent.async_delete(context=context, resource_meta=request.resource_meta)
request_success_count.labels(task_type=request.task_type, operation=delete_operation).inc()
return res
except Exception as e:
logger.error(f"failed to run async delete with error {e}")
raise
try:
return await agent.async_delete(context=context, resource_meta=request.resource_meta)
res = asyncio.to_thread(agent.delete, context=context, resource_meta=request.resource_meta)
request_success_count.labels(task_type=request.task_type, operation=delete_operation).inc()
return res
except Exception as e:
logger.error(f"failed to run async delete with error {e}")
logger.error(f"failed to run sync delete with error {e}")
raise
try:
return asyncio.to_thread(agent.delete, context=context, resource_meta=request.resource_meta)
except Exception as e:
logger.error(f"failed to run sync delete with error {e}")
raise
except Exception as e:
logger.error(f"failed to delete task with error {e}")
context.set_code(grpc.StatusCode.INTERNAL)
context.set_details(f"failed to delete task with error {e}")
request_failure_count.labels(task_type=request.task_type, operation=delete_operation).inc()