[flyteagent][CLI] Make agent prometheus port configurable #3064

Open
wants to merge 4 commits into
base: master
Changes from 2 commits
19 changes: 13 additions & 6 deletions flytekit/clis/sdk_in_container/serve.py
@@ -29,6 +29,13 @@ def serve(ctx: click.Context):
     type=int,
     help="Grpc port for the agent service",
 )
+@click.option(
+    "--prometheus_port",
+    default="9090",
+    is_flag=False,
+    type=int,
+    help="Grpc port for the agent service",
Collaborator

fix comment.

Member Author

thank you!!

+)
 @click.option(
     "--worker",
     default="10",
@@ -45,20 +52,20 @@ def serve(ctx: click.Context):
     "for testing.",
 )
 @click.pass_context
-def agent(_: click.Context, port, worker, timeout):
+def agent(_: click.Context, port, prometheus_port, worker, timeout):
Contributor

Consider adding port validation checks

Consider validating the prometheus_port parameter to ensure it doesn't conflict with the main port parameter and is within valid port range (0-65535). A similar issue was also found in flytekit/clis/sdk_in_container/serve.py (line 83).

Code suggestion
Check the AI-generated fix before applying
 @@ -55,2 +55,6 @@
  def agent(_: click.Context, port, prometheus_port, worker, timeout):
 +    if not 0 <= prometheus_port <= 65535:
 +        raise ValueError(f'prometheus_port {prometheus_port} is not in valid range (0-65535)')
 +    if prometheus_port == port:
 +        raise ValueError(f'prometheus_port {prometheus_port} conflicts with main service port {port}')
      """

Code Review Run #9cfb00


Is this a valid issue, or was it incorrectly flagged by the Agent?

  • it was incorrectly flagged
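
For reference, the range and conflict checks proposed in this comment can be sketched as a standalone helper. This is only a sketch under stated assumptions, not flytekit code: `validate_ports` is a hypothetical name, and the lower bound is taken as 1 rather than 0 because port 0 asks the OS to pick a port. It raises `ValueError` as in the suggestion; inside a click command, `click.BadParameter` or `type=click.IntRange(1, 65535)` would probably produce friendlier CLI errors.

```python
def validate_ports(port: int, prometheus_port: int) -> None:
    """Hypothetical helper: reject out-of-range or clashing ports."""
    for name, value in (("port", port), ("prometheus_port", prometheus_port)):
        # Assumption: port 0 ("let the OS choose") is not a useful fixed port here.
        if not 1 <= value <= 65535:
            raise ValueError(f"{name} {value} is not in valid range (1-65535)")
    if port == prometheus_port:
        raise ValueError(
            f"prometheus_port {prometheus_port} conflicts with main service port {port}"
        )
```

Such a helper could be called at the top of the command body, after the docstring, so the docstring stays the first statement of the function.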

     """
     Start a grpc server for the agent service.
     """
     import asyncio

-    asyncio.run(_start_grpc_server(port, worker, timeout))
+    asyncio.run(_start_grpc_server(port, prometheus_port, worker, timeout))


-async def _start_grpc_server(port: int, worker: int, timeout: int):
+async def _start_grpc_server(port: int, prometheus_port: int, worker: int, timeout: int):
     from flytekit.extend.backend.agent_service import AgentMetadataService, AsyncAgentService, SyncAgentService

     click.secho("🚀 Starting the agent service...")
-    _start_http_server()
+    _start_http_server(prometheus_port)
Contributor

Consider adding error handling for port

Consider adding error handling around _start_http_server(prometheus_port) call to gracefully handle port binding failures.

Code suggestion
Check the AI-generated fix before applying
Suggested change
-    _start_http_server(prometheus_port)
+    try:
+        _start_http_server(prometheus_port)
+    except OSError as e:
+        click.secho(f"Failed to start prometheus server on port {prometheus_port}: {e}", fg="red")
+        raise

Code Review Run #9cfb00


Is this a valid issue, or was it incorrectly flagged by the Agent?

  • it was incorrectly flagged

     print_agents_metadata()

     server = grpc.aio.server(futures.ThreadPoolExecutor(max_workers=worker))
@@ -73,12 +80,12 @@ async def _start_grpc_server(port: int, worker: int, timeout: int):
     await server.wait_for_termination(timeout)


-def _start_http_server():
+def _start_http_server(prometheus_port: int):
     try:
         from prometheus_client import start_http_server

         click.secho("Starting up the server to expose the prometheus metrics...")
-        start_http_server(9090)
+        start_http_server(prometheus_port)
     except ImportError as e:
         click.secho(f"Failed to start the prometheus server with error {e}", fg="red")

32 changes: 32 additions & 0 deletions tests/flytekit/clis/sdk_in_container/test_serve.py
@@ -0,0 +1,32 @@
+import pytest
+from click.testing import CliRunner
+from unittest.mock import patch
+
+from flytekit.clis.sdk_in_container.serve import serve
+
+def test_agent_prometheus_port():
+    runner = CliRunner()
+    test_port = 9100
+    test_prometheus_port = 9200
+    test_worker = 5
+    test_timeout = 30
+
+    with patch('flytekit.clis.sdk_in_container.serve._start_grpc_server') as mock_start_grpc:
+        result = runner.invoke(
+            serve,
+            [
+                'agent',
+                '--port', str(test_port),
+                '--prometheus_port', str(test_prometheus_port),
+                '--worker', str(test_worker),
+                '--timeout', str(test_timeout)
+            ]
+        )
+
+    assert result.exit_code == 0, f"Command failed with output: {result.output}"
+    mock_start_grpc.assert_called_once_with(
+        test_port,
+        test_prometheus_port,
+        test_worker,
+        test_timeout
+    )
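
One detail this test relies on: `_start_grpc_server` is an `async def`, and on Python 3.8+ `patch` substitutes an `AsyncMock` for an async target, so the `asyncio.run(...)` call inside `agent` still receives an awaitable. A self-contained sketch of that mechanism follows. The names `fake_serve`, `start_server`, and this `agent` are hypothetical stand-ins, not the real flytekit module.

```python
import asyncio
from types import SimpleNamespace
from unittest.mock import AsyncMock, patch


async def start_server(port, prometheus_port, worker, timeout):
    raise RuntimeError("the real server should not start in a unit test")


# Stand-in for the module under test (hypothetical, not flytekit).
fake_serve = SimpleNamespace(start_server=start_server)


def agent(port, prometheus_port, worker, timeout):
    # Mirrors the command body: run the coroutine to completion.
    asyncio.run(fake_serve.start_server(port, prometheus_port, worker, timeout))


with patch.object(fake_serve, "start_server") as mock_start:
    agent(9100, 9200, 5, 30)
    # patch picked AsyncMock because the original attribute is a coroutine function.
    used_async_mock = isinstance(mock_start, AsyncMock)
    mock_start.assert_awaited_once_with(9100, 9200, 5, 30)
```

`assert_awaited_once_with` is slightly stricter than `assert_called_once_with`, since it also confirms the coroutine was actually run.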