Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Backport 1.6.1 and 1.6.2 to 1.2 #1729

Merged
merged 22 commits into from
Jul 12, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
d3bb8f8
Multi arch imageSpec (#1630)
pingsutw May 12, 2023
115cd45
Add executor_path and applications_path to spark config (#1634)
pingsutw May 14, 2023
174ead9
Add support for env vars to pyflyte run (#1617)
pingsutw May 15, 2023
4b0b7c2
Fetch task executions in dynamic (#1636)
pingsutw May 15, 2023
ebd916a
Added metrics command to pyflyte (#1513)
hamersaw May 15, 2023
88afc39
Add http_proxy to client & Fix deviceflow (#1611)
ByronHsu May 16, 2023
4e0fa7d
Improve variable names (#1642)
ByronHsu May 16, 2023
7bbd44b
Address resolution (#1567)
wild-endeavor May 17, 2023
dac0603
pyflyte run supports pickle (#1646)
pingsutw May 18, 2023
7bc7da5
Wait for the pod plugin instead of flytekit (#1647)
eapolinario May 18, 2023
f8d0acd
Beautify deviceflow prompt (#1625)
ByronHsu May 19, 2023
5c4142a
Improve flytekit register (#1643)
ByronHsu May 19, 2023
7e9067a
Pass verify flag to all authenticators (#1641)
ByronHsu May 19, 2023
10e3227
Allow annotated FlyteFile as task input argument (#1632)
AdrianoKF May 19, 2023
a7247e4
Use logger instead of print statement in sqlalchemy plugin (#1651)
wirthual May 22, 2023
4a272e2
Map over notebook task (#1650)
pingsutw May 24, 2023
bc3d06a
Support single literals in tiny url (#1654)
wild-endeavor May 24, 2023
c07e9e7
Add support overriding image (#1652)
pingsutw May 24, 2023
2dd2ee1
Fix ability to pass None to task with Optional kwarg, add test (#1657)
fg91 May 26, 2023
ed5ecdc
Regenerate plugins requirements
eapolinario Jul 12, 2023
d90c077
Regenerate plugins requirements and linting
eapolinario Jul 12, 2023
563a836
Regenerate whylogs requirements
eapolinario Jul 12, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/pythonpublish.yml
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ jobs:
echo "No tagged version found, exiting"
exit 1
fi
LINK="https://pypi.org/project/flytekit/${VERSION}"
LINK="https://pypi.org/project/flytekitplugins-pod/${VERSION}"
for i in {1..60}; do
if curl -L -I -s -f ${LINK} >/dev/null; then
echo "Found pypi"
Expand Down
9 changes: 4 additions & 5 deletions flytekit/clients/auth/authenticator.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
from abc import abstractmethod
from dataclasses import dataclass

import click

from . import token_client
from .auth_client import AuthorizationClient
from .exceptions import AccessTokenNotFoundError, AuthenticationError
Expand Down Expand Up @@ -255,11 +257,8 @@ def refresh_credentials(self):
resp = token_client.get_device_code(
self._device_auth_endpoint, self._client_id, self._audience, self._scope, self._http_proxy_url, self._verify
)
print(
f"""
To Authenticate navigate in a browser to the following URL: {resp.verification_uri} and enter code: {resp.user_code}
"""
)
text = f"To Authenticate, navigate in a browser to the following URL: {click.style(resp.verification_uri, fg='blue', underline=True)} and enter code: {click.style(resp.user_code, fg='blue')}"
click.secho(text)
try:
# Currently the refresh token is not retreived. We may want to add support for refreshTokens so that
# access tokens can be refreshed for once authenticated machines
Expand Down
6 changes: 6 additions & 0 deletions flytekit/clients/friendly.py
Original file line number Diff line number Diff line change
Expand Up @@ -1018,3 +1018,9 @@ def get_download_signed_url(
expires_in=expires_in_pb,
)
)

def get_data(self, flyte_uri: str) -> _data_proxy_pb2.GetDataResponse:
req = _data_proxy_pb2.GetDataRequest(flyte_url=flyte_uri)

resp = self._dataproxy_stub.GetData(req, metadata=self._metadata)
return resp
19 changes: 14 additions & 5 deletions flytekit/clients/raw.py
Original file line number Diff line number Diff line change
Expand Up @@ -376,6 +376,15 @@ def get_execution_data(self, get_execution_data_request):
"""
return self._stub.GetExecutionData(get_execution_data_request, metadata=self._metadata)

def get_execution_metrics(self, get_execution_metrics_request):
"""
Returns metrics partitioning and categorizing the workflow execution time-series.

:param flyteidl.admin.execution_pb2.WorkflowExecutionGetMetricsRequest get_execution_metrics_request:
:rtype: flyteidl.admin.execution_pb2.WorkflowExecutionGetMetricsResponse
"""
return self._stub.GetExecutionMetrics(get_execution_metrics_request, metadata=self._metadata)

def list_executions_paginated(self, resource_list_request):
"""
Lists the executions for a given identifier.
Expand Down Expand Up @@ -570,9 +579,9 @@ def create_upload_location(
def create_download_location(
self, create_download_location_request: _dataproxy_pb2.CreateDownloadLocationRequest
) -> _dataproxy_pb2.CreateDownloadLocationResponse:
"""
Get a signed url to be used during fast registration
:param flyteidl.service.dataproxy_pb2.CreateDownloadLocationRequest create_download_location_request:
:rtype: flyteidl.service.dataproxy_pb2.CreateDownloadLocationResponse
"""
return self._dataproxy_stub.CreateDownloadLocation(create_download_location_request, metadata=self._metadata)

def create_download_link(
self, create_download_link_request: _dataproxy_pb2.CreateDownloadLinkRequest
) -> _dataproxy_pb2.CreateDownloadLinkResponse:
return self._dataproxy_stub.CreateDownloadLink(create_download_link_request, metadata=self._metadata)
218 changes: 218 additions & 0 deletions flytekit/clis/sdk_in_container/metrics.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,218 @@
from datetime import datetime

import rich_click as click
import yaml
from flyteidl.admin.execution_pb2 import WorkflowExecutionGetMetricsRequest
from flyteidl.core.identifier_pb2 import WorkflowExecutionIdentifier

from flytekit.clis.sdk_in_container.constants import CTX_DOMAIN, CTX_PROJECT
from flytekit.clis.sdk_in_container.helpers import get_and_save_remote_with_click_context

CTX_DEPTH = "depth"

_dump_help = """
The dump command aggregates workflow execution metrics and displays them. This aggregation is meant to provide an easy
to understand breakdown of where time is spent in a hierarchical manner.
- execution_id refers to the id of the workflow execution
"""

_explain_help = """
The explain command prints each individual execution span and the associated timestamps and Flyte entity reference.
This breakdown provides precise information into exactly how and when Flyte processes a workflow execution.
- execution_id refers to the id of the workflow execution
"""


@click.group("metrics")
@click.option(
"-d",
"--depth",
required=False,
type=int,
default=-1,
help="The depth of Flyte entity hierarchy to traverse when computing metrics for this execution",
)
@click.option(
"-p",
"--project",
required=False,
type=str,
default="flytesnacks",
help="The project of the workflow execution",
)
@click.option(
"-d",
"--domain",
required=False,
type=str,
default="development",
help="The domain of the workflow execution",
)
@click.pass_context
def metrics(ctx: click.Context, depth, domain, project):
ctx.obj[CTX_DEPTH] = depth
ctx.obj[CTX_DOMAIN] = domain
ctx.obj[CTX_PROJECT] = project


@click.command("dump", help=_dump_help)
@click.argument("execution_id", type=str)
@click.pass_context
def metrics_dump(
ctx: click.Context,
execution_id: str,
):
depth = ctx.obj[CTX_DEPTH]
domain = ctx.obj[CTX_DOMAIN]
project = ctx.obj[CTX_PROJECT]

# retrieve remote
remote = get_and_save_remote_with_click_context(ctx, project, domain)
sync_client = remote.client

# retrieve workflow execution metrics
workflow_execution_id = WorkflowExecutionIdentifier(project=project, domain=domain, name=execution_id)

request = WorkflowExecutionGetMetricsRequest(id=workflow_execution_id, depth=depth)
response = sync_client.get_execution_metrics(request)

# aggregate spans and print
id, info = aggregate_reference_span(response.span)
yaml.emitter.Emitter.process_tag = lambda self, *args, **kw: None
print(yaml.dump({id: info}, indent=2))


def aggregate_reference_span(span):
id = ""
id_type = span.WhichOneof("id")
if id_type == "workflow_id":
id = span.workflow_id.name
elif id_type == "node_id":
id = span.node_id.node_id
elif id_type == "task_id":
id = span.task_id.retry_attempt

spans = aggregate_spans(span.spans)
return id, spans


def aggregate_spans(spans):
breakdown = {}

tasks = {}
nodes = {}
workflows = {}

for span in spans:
id_type = span.WhichOneof("id")
if id_type == "operation_id":
operation_id = span.operation_id

start_time = datetime.fromtimestamp(span.start_time.seconds + span.start_time.nanos / 1e9)
end_time = datetime.fromtimestamp(span.end_time.seconds + span.end_time.nanos / 1e9)
total_time = (end_time - start_time).total_seconds()

if operation_id in breakdown:
breakdown[operation_id] += total_time
else:
breakdown[operation_id] = total_time
else:
id, underlying_span = aggregate_reference_span(span)

if id_type == "workflow_id":
workflows[id] = underlying_span
elif id_type == "node_id":
nodes[id] = underlying_span
elif id_type == "task_id":
tasks[id] = underlying_span

for operation_id, total_time in underlying_span["breakdown"].items():
if operation_id in breakdown:
breakdown[operation_id] += total_time
else:
breakdown[operation_id] = total_time

span = {"breakdown": breakdown}

if len(tasks) > 0:
span["task_attempts"] = tasks
if len(nodes) > 0:
span["nodes"] = nodes
if len(workflows) > 0:
span["workflows"] = workflows

return span


@click.command("explain", help=_explain_help)
@click.argument("execution_id", type=str)
@click.pass_context
def metrics_explain(
ctx: click.Context,
execution_id: str,
):
depth = ctx.obj[CTX_DEPTH]
domain = ctx.obj[CTX_DOMAIN]
project = ctx.obj[CTX_PROJECT]

# retrieve remote
remote = get_and_save_remote_with_click_context(ctx, project, domain)
sync_client = remote.client

# retrieve workflow execution metrics
workflow_execution_id = WorkflowExecutionIdentifier(project=project, domain=domain, name=execution_id)

request = WorkflowExecutionGetMetricsRequest(id=workflow_execution_id, depth=depth)
response = sync_client.get_execution_metrics(request)

# print execution spans
print(
"{:25s}{:25s}{:25s} {:>8s} {:s}".format(
"operation", "start_timestamp", "end_timestamp", "duration", "entity"
)
)
print("-" * 140)

print_span(response.span, -1, "")


def print_span(span, indent, identifier):
start_time = datetime.fromtimestamp(span.start_time.seconds + span.start_time.nanos / 1e9)
end_time = datetime.fromtimestamp(span.end_time.seconds + span.end_time.nanos / 1e9)

id_type = span.WhichOneof("id")
span_identifier = ""

if id_type == "operation_id":
indent_str = ""
for i in range(indent):
indent_str += " "

print(
"{:25s}{:25s}{:25s} {:7.2f}s {:s}{:s}".format(
span.operation_id,
start_time.strftime("%m-%d %H:%M:%S.%f"),
end_time.strftime("%m-%d %H:%M:%S.%f"),
(end_time - start_time).total_seconds(),
indent_str,
identifier,
)
)

span_identifier = identifier + "/" + span.operation_id
else:
if id_type == "workflow_id":
span_identifier = "workflow/" + span.workflow_id.name
elif id_type == "node_id":
span_identifier = "node/" + span.node_id.node_id
elif id_type == "task_id":
span_identifier = "task/" + str(span.task_id.retry_attempt)

for under_span in span.spans:
print_span(under_span, indent + 1, span_identifier)


metrics.add_command(metrics_dump)
metrics.add_command(metrics_explain)
2 changes: 2 additions & 0 deletions flytekit/clis/sdk_in_container/pyflyte.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from flytekit.clis.sdk_in_container.init import init
from flytekit.clis.sdk_in_container.launchplan import launchplan
from flytekit.clis.sdk_in_container.local_cache import local_cache
from flytekit.clis.sdk_in_container.metrics import metrics
from flytekit.clis.sdk_in_container.package import package
from flytekit.clis.sdk_in_container.register import register
from flytekit.clis.sdk_in_container.run import run
Expand Down Expand Up @@ -144,6 +145,7 @@ def main(ctx, pkgs: typing.List[str], config: str, verbose: bool):
main.add_command(backfill)
main.add_command(serve)
main.add_command(build)
main.add_command(metrics)
main.add_command(launchplan)
main.epilog

Expand Down
Loading