Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added metrics command to pyflyte #1513

Merged
merged 12 commits into from
May 15, 2023
9 changes: 9 additions & 0 deletions flytekit/clients/raw.py
Original file line number Diff line number Diff line change
Expand Up @@ -376,6 +376,15 @@ def get_execution_data(self, get_execution_data_request):
"""
return self._stub.GetExecutionData(get_execution_data_request, metadata=self._metadata)

def get_execution_metrics(self, get_execution_metrics_request):
"""
Returns metrics partitioning and categorizing the workflow execution time-series.

:param flyteidl.admin.execution_pb2.WorkflowExecutionGetMetricsRequest get_execution_metrics_request:
:rtype: flyteidl.admin.execution_pb2.WorkflowExecutionGetMetricsResponse
"""
return self._stub.GetExecutionMetrics(get_execution_metrics_request, metadata=self._metadata)

def list_executions_paginated(self, resource_list_request):
"""
Lists the executions for a given identifier.
Expand Down
218 changes: 218 additions & 0 deletions flytekit/clis/sdk_in_container/metrics.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,218 @@
from datetime import datetime

import rich_click as click
import yaml
from flyteidl.admin.execution_pb2 import WorkflowExecutionGetMetricsRequest
from flyteidl.core.identifier_pb2 import WorkflowExecutionIdentifier

from flytekit.clis.sdk_in_container.constants import CTX_DOMAIN, CTX_PROJECT
from flytekit.clis.sdk_in_container.helpers import get_and_save_remote_with_click_context

CTX_DEPTH = "depth"

_dump_help = """
The dump command aggregates workflow execution metrics and displays them. This aggregation is meant to provide an easy
to understand breakdown of where time is spent in a hierarchical manner.

- execution_id refers to the id of the workflow execution
"""

_explain_help = """
The explain command prints each individual execution span and the associated timestamps and Flyte entity reference.
This breakdown provides precise information into exactly how and when Flyte processes a workflow execution.

- execution_id refers to the id of the workflow execution
"""


@click.group("metrics")
@click.option(
"-d",
"--depth",
required=False,
type=int,
default=-1,
help="The depth of Flyte entity hierarchy to traverse when computing metrics for this execution",
)
@click.option(
"-p",
"--project",
required=False,
type=str,
default="flytesnacks",
help="The project of the workflow execution",
)
@click.option(
"-d",
"--domain",
required=False,
type=str,
default="development",
help="The domain of the workflow execution",
)
@click.pass_context
def metrics(ctx: click.Context, depth, domain, project):
ctx.obj[CTX_DEPTH] = depth
ctx.obj[CTX_DOMAIN] = domain
ctx.obj[CTX_PROJECT] = project


@click.command("dump", help=_dump_help)
@click.argument("execution_id", type=str)
@click.pass_context
def metrics_dump(
ctx: click.Context,
execution_id: str,
):
depth = ctx.obj[CTX_DEPTH]
domain = ctx.obj[CTX_DOMAIN]
project = ctx.obj[CTX_PROJECT]

# retrieve remote
remote = get_and_save_remote_with_click_context(ctx, project, domain)
sync_client = remote.client

# retrieve workflow execution metrics
workflow_execution_id = WorkflowExecutionIdentifier(project=project, domain=domain, name=execution_id)

request = WorkflowExecutionGetMetricsRequest(id=workflow_execution_id, depth=depth)
response = sync_client.get_execution_metrics(request)

# aggregate spans and print
id, info = aggregate_reference_span(response.span)
yaml.emitter.Emitter.process_tag = lambda self, *args, **kw: None
print(yaml.dump({id: info}, indent=2))


def aggregate_reference_span(span):
id = ""
id_type = span.WhichOneof("id")
if id_type == "workflow_id":
id = span.workflow_id.name
elif id_type == "node_id":
id = span.node_id.node_id
elif id_type == "task_id":
id = span.task_id.retry_attempt

spans = aggregate_spans(span.spans)
return id, spans


def aggregate_spans(spans):
breakdown = {}

tasks = {}
nodes = {}
workflows = {}

for span in spans:
id_type = span.WhichOneof("id")
if id_type == "operation_id":
operation_id = span.operation_id

start_time = datetime.fromtimestamp(span.start_time.seconds + span.start_time.nanos / 1e9)
end_time = datetime.fromtimestamp(span.end_time.seconds + span.end_time.nanos / 1e9)
total_time = (end_time - start_time).total_seconds()

if operation_id in breakdown:
breakdown[operation_id] += total_time
else:
breakdown[operation_id] = total_time
else:
id, underlying_span = aggregate_reference_span(span)

if id_type == "workflow_id":
workflows[id] = underlying_span
elif id_type == "node_id":
nodes[id] = underlying_span
elif id_type == "task_id":
tasks[id] = underlying_span

for operation_id, total_time in underlying_span["breakdown"].items():
if operation_id in breakdown:
breakdown[operation_id] += total_time
else:
breakdown[operation_id] = total_time

span = {"breakdown": breakdown}

if len(tasks) > 0:
span["task_attempts"] = tasks
if len(nodes) > 0:
span["nodes"] = nodes
if len(workflows) > 0:
span["workflows"] = workflows

return span


@click.command("explain", help=_explain_help)
@click.argument("execution_id", type=str)
@click.pass_context
def metrics_explain(
ctx: click.Context,
execution_id: str,
):
depth = ctx.obj[CTX_DEPTH]
domain = ctx.obj[CTX_DOMAIN]
project = ctx.obj[CTX_PROJECT]

# retrieve remote
remote = get_and_save_remote_with_click_context(ctx, project, domain)
sync_client = remote.client

# retrieve workflow execution metrics
workflow_execution_id = WorkflowExecutionIdentifier(project=project, domain=domain, name=execution_id)

request = WorkflowExecutionGetMetricsRequest(id=workflow_execution_id, depth=depth)
response = sync_client.get_execution_metrics(request)

# print execution spans
print(
"{:25s}{:25s}{:25s} {:>8s} {:s}".format(
"operation", "start_timestamp", "end_timestamp", "duration", "entity"
)
)
print("-" * 140)

print_span(response.span, -1, "")


def print_span(span, indent, identifier):
start_time = datetime.fromtimestamp(span.start_time.seconds + span.start_time.nanos / 1e9)
end_time = datetime.fromtimestamp(span.end_time.seconds + span.end_time.nanos / 1e9)

id_type = span.WhichOneof("id")
span_identifier = ""

if id_type == "operation_id":
indent_str = ""
for i in range(indent):
indent_str += " "

print(
"{:25s}{:25s}{:25s} {:7.2f}s {:s}{:s}".format(
span.operation_id,
start_time.strftime("%m-%d %H:%M:%S.%f"),
end_time.strftime("%m-%d %H:%M:%S.%f"),
(end_time - start_time).total_seconds(),
indent_str,
identifier,
)
)

span_identifier = identifier + "/" + span.operation_id
else:
if id_type == "workflow_id":
span_identifier = "workflow/" + span.workflow_id.name
elif id_type == "node_id":
span_identifier = "node/" + span.node_id.node_id
elif id_type == "task_id":
span_identifier = "task/" + str(span.task_id.retry_attempt)

for under_span in span.spans:
print_span(under_span, indent + 1, span_identifier)


metrics.add_command(metrics_dump)
metrics.add_command(metrics_explain)
2 changes: 2 additions & 0 deletions flytekit/clis/sdk_in_container/pyflyte.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from flytekit.clis.sdk_in_container.init import init
from flytekit.clis.sdk_in_container.launchplan import launchplan
from flytekit.clis.sdk_in_container.local_cache import local_cache
from flytekit.clis.sdk_in_container.metrics import metrics
from flytekit.clis.sdk_in_container.package import package
from flytekit.clis.sdk_in_container.register import register
from flytekit.clis.sdk_in_container.run import run
Expand Down Expand Up @@ -136,6 +137,7 @@ def main(ctx, pkgs: typing.List[str], config: str, verbose: bool):
main.add_command(backfill)
main.add_command(serve)
main.add_command(build)
main.add_command(metrics)
main.add_command(launchplan)
main.epilog

Expand Down