Skip to content

Commit

Permalink
Merge branch 'master' into no-warning
Browse files Browse the repository at this point in the history
  • Loading branch information
hamersaw authored Nov 10, 2023
2 parents 28bf92e + c4b040b commit 412792a
Show file tree
Hide file tree
Showing 15 changed files with 396 additions and 336 deletions.
15 changes: 0 additions & 15 deletions .github/workflows/checks.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,24 +6,9 @@ concurrency:

on:
pull_request:
paths:
- 'datacatalog/**'
- 'flyteadmin/**'
- 'flytecopilot/**'
- 'flyteplugins/**'
- 'flytepropeller/**'
- 'flytestdlib/**'
push:
branches:
- master
paths:
- 'datacatalog/**'
- 'flyteadmin/**'
- 'flytecopilot/**'
- 'flyteidl/**'
- 'flyteplugins/**'
- 'flytepropeller/**'
- 'flytestdlib/**'
env:
GO_VERSION: "1.19"
PRIORITIES: "P0"
Expand Down
4 changes: 0 additions & 4 deletions .github/workflows/flyteidl-checks.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,9 @@ concurrency:

on:
pull_request:
paths:
- 'flyteidl/**'
push:
branches:
- master
paths:
- 'flyteidl/**'
env:
GO_VERSION: "1.19"
jobs:
Expand Down
111 changes: 88 additions & 23 deletions boilerplate/flyte/end2end/run-tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import sys
import time
import traceback
from typing import Dict, List, Mapping, Tuple
from typing import Dict, List, Mapping, Tuple, Optional

import click
import requests
Expand Down Expand Up @@ -50,15 +50,17 @@
("basics.named_outputs.simple_wf_with_named_outputs", {}),
# # Getting a 403 for the wikipedia image
# # ("basics.reference_task.wf", {}),
("data_types_and_io.custom_objects.wf", {"x": 10, "y": 20}),
("data_types_and_io.dataclass.dataclass_wf", {"x": 10, "y": 20}),
# Enums are not supported in flyteremote
# ("type_system.enums.enum_wf", {"c": "red"}),
("data_types_and_io.schema.df_wf", {"a": 42}),
("data_types_and_io.typed_schema.wf", {}),
("data_types_and_io.structured_dataset.simple_sd_wf", {"a": 42}),
# ("my.imperative.workflow.example", {"in1": "hello", "in2": "foo"}),
],
"integrations-k8s-spark": [
("k8s_spark_plugin.pyspark_pi.my_spark", {"triggered_date": datetime.datetime.now()}),
(
"k8s_spark_plugin.pyspark_pi.my_spark",
{"triggered_date": datetime.datetime.now()},
),
],
"integrations-kfpytorch": [
("kfpytorch_plugin.pytorch_mnist.pytorch_training_wf", {}),
Expand Down Expand Up @@ -89,20 +91,30 @@
}


def execute_workflow(remote, version, workflow_name, inputs):
def execute_workflow(
remote: FlyteRemote,
version,
workflow_name,
inputs,
cluster_pool_name: Optional[str] = None,
):
print(f"Fetching workflow={workflow_name} and version={version}")
wf = remote.fetch_workflow(name=workflow_name, version=version)
return remote.execute(wf, inputs=inputs, wait=False)
return remote.execute(wf, inputs=inputs, wait=False, cluster_pool=cluster_pool_name)


def executions_finished(executions_by_wfgroup: Dict[str, List[FlyteWorkflowExecution]]) -> bool:
def executions_finished(
executions_by_wfgroup: Dict[str, List[FlyteWorkflowExecution]]
) -> bool:
for executions in executions_by_wfgroup.values():
if not all([execution.is_done for execution in executions]):
return False
return True


def sync_executions(remote: FlyteRemote, executions_by_wfgroup: Dict[str, List[FlyteWorkflowExecution]]):
def sync_executions(
remote: FlyteRemote, executions_by_wfgroup: Dict[str, List[FlyteWorkflowExecution]]
):
try:
for executions in executions_by_wfgroup.values():
for execution in executions:
Expand All @@ -125,6 +137,7 @@ def schedule_workflow_groups(
workflow_groups: List[str],
remote: FlyteRemote,
terminate_workflow_on_failure: bool,
cluster_pool_name: Optional[str] = None,
) -> Dict[str, bool]:
"""
Schedule workflows executions for all workflow groups and return True if all executions succeed, otherwise
Expand All @@ -135,14 +148,19 @@ def schedule_workflow_groups(
for wf_group in workflow_groups:
workflows = FLYTESNACKS_WORKFLOW_GROUPS.get(wf_group, [])
executions_by_wfgroup[wf_group] = [
execute_workflow(remote, tag, workflow[0], workflow[1]) for workflow in workflows
execute_workflow(remote, tag, workflow[0], workflow[1], cluster_pool_name)
for workflow in workflows
]

# Wait for all executions to finish
attempt = 0
while attempt == 0 or (not executions_finished(executions_by_wfgroup) and attempt < MAX_ATTEMPTS):
while attempt == 0 or (
not executions_finished(executions_by_wfgroup) and attempt < MAX_ATTEMPTS
):
attempt += 1
print(f"Not all executions finished yet. Sleeping for some time, will check again in {WAIT_TIME}s")
print(
f"Not all executions finished yet. Sleeping for some time, will check again in {WAIT_TIME}s"
)
time.sleep(WAIT_TIME)
sync_executions(remote, executions_by_wfgroup)

Expand All @@ -158,9 +176,13 @@ def schedule_workflow_groups(
if len(non_succeeded_executions) != 0:
print(f"Failed executions for {wf_group}:")
for execution in non_succeeded_executions:
print(f" workflow={execution.spec.launch_plan.name}, execution_id={execution.id.name}")
print(
f" workflow={execution.spec.launch_plan.name}, execution_id={execution.id.name}"
)
if terminate_workflow_on_failure:
remote.terminate(execution, "aborting execution scheduled in functional test")
remote.terminate(
execution, "aborting execution scheduled in functional test"
)
# A workflow group succeeds iff all of its executions succeed
results[wf_group] = len(non_succeeded_executions) == 0
return results
Expand All @@ -179,25 +201,33 @@ def run(
priorities: List[str],
config_file_path,
terminate_workflow_on_failure: bool,
test_project_name: str,
test_project_domain: str,
cluster_pool_name: Optional[str] = None,
) -> List[Dict[str, str]]:
remote = FlyteRemote(
Config.auto(config_file=config_file_path),
default_project="flytesnacks",
default_domain="development",
test_project_name,
test_project_domain,
)

# For a given release tag and priority, this function filters the workflow groups from the flytesnacks
# manifest file. For example, for the release tag "v0.2.224" and the priority "P0" it returns [ "core" ].
manifest_url = (
"https://raw.githubusercontent.com/flyteorg/flytesnacks/" f"{flytesnacks_release_tag}/flyte_tests_manifest.json"
"https://raw.githubusercontent.com/flyteorg/flytesnacks/"
f"{flytesnacks_release_tag}/flyte_tests_manifest.json"
)
r = requests.get(manifest_url)
parsed_manifest = r.json()
workflow_groups = []
workflow_groups = (
["lite"]
if "lite" in priorities
else [group["name"] for group in parsed_manifest if group["priority"] in priorities]
else [
group["name"]
for group in parsed_manifest
if group["priority"] in priorities
]
)

results = []
Expand All @@ -215,7 +245,11 @@ def run(
valid_workgroups.append(workflow_group)

results_by_wfgroup = schedule_workflow_groups(
flytesnacks_release_tag, valid_workgroups, remote, terminate_workflow_on_failure
flytesnacks_release_tag,
valid_workgroups,
remote,
terminate_workflow_on_failure,
cluster_pool_name,
)

for workflow_group, succeeded in results_by_wfgroup.items():
Expand Down Expand Up @@ -246,6 +280,9 @@ def run(


@click.command()
@click.argument("flytesnacks_release_tag")
@click.argument("priorities")
@click.argument("config_file")
@click.option(
"--return_non_zero_on_failure",
default=False,
Expand All @@ -258,18 +295,46 @@ def run(
is_flag=True,
help="Abort failing workflows upon exit",
)
@click.argument("flytesnacks_release_tag")
@click.argument("priorities")
@click.argument("config_file")
@click.option(
"--test_project_name",
default="flytesnacks",
type=str,
is_flag=False,
help="Name of project to run functional tests on",
)
@click.option(
"--test_project_domain",
default="development",
type=str,
is_flag=False,
help="Name of domain in project to run functional tests on",
)
@click.argument(
"cluster_pool_name",
required=False,
type=str,
default=None,
)
def cli(
flytesnacks_release_tag,
priorities,
config_file,
return_non_zero_on_failure,
terminate_workflow_on_failure,
test_project_name,
test_project_domain,
cluster_pool_name,
):
print(f"return_non_zero_on_failure={return_non_zero_on_failure}")
results = run(flytesnacks_release_tag, priorities, config_file, terminate_workflow_on_failure)
results = run(
flytesnacks_release_tag,
priorities,
config_file,
terminate_workflow_on_failure,
test_project_name,
test_project_domain,
cluster_pool_name,
)

# Write a json object in its own line describing the result of this run to stdout
print(f"Result of run:\n{json.dumps(results)}")
Expand Down
25 changes: 7 additions & 18 deletions flyteplugins/go/tasks/logs/config.go
Original file line number Diff line number Diff line change
@@ -1,45 +1,34 @@
package logs

import (
"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/core"
"github.com/flyteorg/flyte/flyteplugins/go/tasks/config"
"github.com/flyteorg/flyte/flyteplugins/go/tasks/pluginmachinery/tasklog"
)

//go:generate pflags LogConfig --default-var=DefaultConfig

// TemplateURI is a URI that accepts templates. See: go/tasks/pluginmachinery/tasklog/template.go for available templates.
type TemplateURI = string

// LogConfig encapsulates plugins' log configs
type LogConfig struct {
IsCloudwatchEnabled bool `json:"cloudwatch-enabled" pflag:",Enable Cloudwatch Logging"`
// Deprecated: Please use CloudwatchTemplateURI
CloudwatchRegion string `json:"cloudwatch-region" pflag:",AWS region in which Cloudwatch logs are stored."`
// Deprecated: Please use CloudwatchTemplateURI
CloudwatchLogGroup string `json:"cloudwatch-log-group" pflag:",Log group to which streams are associated."`
CloudwatchTemplateURI TemplateURI `json:"cloudwatch-template-uri" pflag:",Template Uri to use when building cloudwatch log links"`
CloudwatchLogGroup string `json:"cloudwatch-log-group" pflag:",Log group to which streams are associated."`
CloudwatchTemplateURI tasklog.TemplateURI `json:"cloudwatch-template-uri" pflag:",Template Uri to use when building cloudwatch log links"`

IsKubernetesEnabled bool `json:"kubernetes-enabled" pflag:",Enable Kubernetes Logging"`
// Deprecated: Please use KubernetesTemplateURI
KubernetesURL string `json:"kubernetes-url" pflag:",Console URL for Kubernetes logs"`
KubernetesTemplateURI TemplateURI `json:"kubernetes-template-uri" pflag:",Template Uri to use when building kubernetes log links"`
KubernetesURL string `json:"kubernetes-url" pflag:",Console URL for Kubernetes logs"`
KubernetesTemplateURI tasklog.TemplateURI `json:"kubernetes-template-uri" pflag:",Template Uri to use when building kubernetes log links"`

IsStackDriverEnabled bool `json:"stackdriver-enabled" pflag:",Enable Log-links to stackdriver"`
// Deprecated: Please use StackDriverTemplateURI
GCPProjectName string `json:"gcp-project" pflag:",Name of the project in GCP"`
// Deprecated: Please use StackDriverTemplateURI
StackdriverLogResourceName string `json:"stackdriver-logresourcename" pflag:",Name of the logresource in stackdriver"`
StackDriverTemplateURI TemplateURI `json:"stackdriver-template-uri" pflag:",Template Uri to use when building stackdriver log links"`

Templates []TemplateLogPluginConfig `json:"templates" pflag:"-,"`
}
StackdriverLogResourceName string `json:"stackdriver-logresourcename" pflag:",Name of the logresource in stackdriver"`
StackDriverTemplateURI tasklog.TemplateURI `json:"stackdriver-template-uri" pflag:",Template Uri to use when building stackdriver log links"`

type TemplateLogPluginConfig struct {
DisplayName string `json:"displayName" pflag:",Display name for the generated log when displayed in the console."`
TemplateURIs []TemplateURI `json:"templateUris" pflag:",URI Templates for generating task log links."`
MessageFormat core.TaskLog_MessageFormat `json:"messageFormat" pflag:",Log Message Format."`
Scheme tasklog.TemplateScheme `json:"scheme" pflag:",Templating scheme to use. Supported values are Pod and TaskExecution."`
Templates []tasklog.TemplateLogPlugin `json:"templates" pflag:"-,"`
}

var (
Expand Down
Loading

0 comments on commit 412792a

Please sign in to comment.