Skip to content

Commit

Permalink
Add health check servicer for agent (#2232)
Browse files Browse the repository at this point in the history
Signed-off-by: Kevin Su <[email protected]>
Signed-off-by: Jan Fiedler <[email protected]>
  • Loading branch information
pingsutw authored and fiedlerNr9 committed Jul 25, 2024
1 parent 2402497 commit fb0c12f
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 10 deletions.
2 changes: 1 addition & 1 deletion Dockerfile.agent
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ ARG VERSION

RUN apt-get update && apt-get install build-essential -y

RUN pip install prometheus-client
RUN pip install prometheus-client grpcio-health-checking
RUN pip install --no-cache-dir -U flytekit==$VERSION \
flytekitplugins-airflow==$VERSION \
flytekitplugins-bigquery==$VERSION \
Expand Down
43 changes: 34 additions & 9 deletions flytekit/clis/sdk_in_container/serve.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
from concurrent import futures

import grpc
import rich_click as click
from flyteidl.service import agent_pb2
from flyteidl.service.agent_pb2_grpc import (
add_AgentMetadataServiceServicer_to_server,
add_AsyncAgentServiceServicer_to_server,
add_SyncAgentServiceServicer_to_server,
)
from grpc import aio


@click.group("serve")
Expand Down Expand Up @@ -52,22 +53,46 @@ def agent(_: click.Context, port, worker, timeout):


async def _start_grpc_server(port: int, worker: int, timeout: int):
    """Start the agent gRPC server and block until it terminates.

    The metrics endpoint and the health check servicer are both delegated to
    helpers, so each is started exactly once and only a single gRPC server
    object is created.

    Args:
        port: TCP port the gRPC server listens on (bound on ``[::]``, insecure).
        worker: Maximum number of threads in the executor handed to the gRPC
            server; the same value is forwarded to the health check helper.
        timeout: Value passed straight to ``server.wait_for_termination``.
    """
    # Imported lazily inside the function, as in the original code.
    from flytekit.extend.backend.agent_service import AgentMetadataService, AsyncAgentService, SyncAgentService

    _start_http_server()
    click.secho("Starting the agent service...", fg="blue")
    server = grpc.aio.server(futures.ThreadPoolExecutor(max_workers=worker))

    add_AsyncAgentServiceServicer_to_server(AsyncAgentService(), server)
    add_SyncAgentServiceServicer_to_server(SyncAgentService(), server)
    add_AgentMetadataServiceServicer_to_server(AgentMetadataService(), server)
    # Register the health servicer before the server starts accepting traffic.
    _start_health_check_server(server, worker)

    server.add_insecure_port(f"[::]:{port}")
    await server.start()
    await server.wait_for_termination(timeout)


def _start_http_server(port: int = 9090):
    """Expose Prometheus metrics over HTTP when ``prometheus_client`` is available.

    The metrics endpoint is optional: if ``prometheus_client`` cannot be
    imported, a red message is printed and the function returns without
    raising.

    Args:
        port: Port passed to ``prometheus_client.start_http_server``.
            Defaults to 9090, the value that was previously hard-coded.
    """
    # Keep the try body limited to the optional import so that an unrelated
    # ImportError raised later is not misreported as a missing dependency.
    try:
        from prometheus_client import start_http_server
    except ImportError as e:
        click.secho(f"Failed to start the prometheus server with error {e}", fg="red")
        return

    click.secho("Starting up the server to expose the prometheus metrics...", fg="blue")
    start_http_server(port)


def _start_health_check_server(server: grpc.Server, worker: int):
    """Attach a gRPC health check servicer to ``server`` if ``grpc_health`` is importable.

    Every service listed in ``agent_pb2.DESCRIPTOR.services_by_name``, followed
    by ``health.SERVICE_NAME``, is marked as ``SERVING``. When the import fails,
    a red message is printed and the server is left without a health servicer.

    Args:
        server: The gRPC server the health servicer is registered on.
        worker: Maximum number of threads for the servicer's own thread pool.
    """
    try:
        from grpc_health.v1 import health, health_pb2, health_pb2_grpc

        pool = futures.ThreadPoolExecutor(max_workers=worker)
        servicer = health.HealthServicer(
            experimental_non_blocking=True,
            experimental_thread_pool=pool,
        )

        # Agent services first, then the health service name itself.
        service_names = [svc.full_name for svc in agent_pb2.DESCRIPTOR.services_by_name.values()]
        service_names.append(health.SERVICE_NAME)
        for service_name in service_names:
            servicer.set(service_name, health_pb2.HealthCheckResponse.SERVING)

        health_pb2_grpc.add_HealthServicer_to_server(servicer, server)
    except ImportError as err:
        click.secho(f"Failed to start the health check servicer with error {err}", fg="red")

0 comments on commit fb0c12f

Please sign in to comment.