From dd91cbaee86f96582d79f8f2fda7a7f1b80f831e Mon Sep 17 00:00:00 2001 From: Dan Rammer Date: Wed, 8 Nov 2023 15:50:22 -0600 Subject: [PATCH 1/8] Update metadata in ArrayNode TaskExecutionEvents (#4355) * adding PluginIdentifier to TaskExecutionMetadata Signed-off-by: Daniel Rammer * sending input and output metadata - removing validating check on flyteadmin temporarily Signed-off-by: Daniel Rammer * readded task execution id verification Signed-off-by: Daniel Rammer * removed dead code Signed-off-by: Daniel Rammer --------- Signed-off-by: Daniel Rammer --- .../controller/nodes/array/event_recorder.go | 28 +++++++++++++++++++ 1 file changed, 28 insertions(+) diff --git a/flytepropeller/pkg/controller/nodes/array/event_recorder.go b/flytepropeller/pkg/controller/nodes/array/event_recorder.go index 55cbbce89f..c4e3004011 100644 --- a/flytepropeller/pkg/controller/nodes/array/event_recorder.go +++ b/flytepropeller/pkg/controller/nodes/array/event_recorder.go @@ -120,11 +120,39 @@ func (e *externalResourcesEventRecorder) finalize(ctx context.Context, nCtx inte OccurredAt: occurredAt, Metadata: &event.TaskExecutionMetadata{ ExternalResources: e.externalResources, + PluginIdentifier: "container", }, TaskType: "k8s-array", EventVersion: 1, } + // only attach input values if taskPhase is QUEUED meaning this the first evaluation + if taskPhase == idlcore.TaskExecution_QUEUED { + if eventConfig.RawOutputPolicy == config.RawOutputPolicyInline { + // pass inputs by value + literalMap, err := nCtx.InputReader().Get(ctx) + if err != nil { + return err + } + + taskExecutionEvent.InputValue = &event.TaskExecutionEvent_InputData{ + InputData: literalMap, + } + } else { + // pass inputs by reference + taskExecutionEvent.InputValue = &event.TaskExecutionEvent_InputUri{ + InputUri: nCtx.InputReader().GetInputPath().String(), + } + } + } + + // only attach output uri if taskPhase is SUCCEEDED + if taskPhase == idlcore.TaskExecution_SUCCEEDED { + taskExecutionEvent.OutputResult = &event.TaskExecutionEvent_OutputUri{ + OutputUri: v1alpha1.GetOutputsFile(nCtx.NodeStatus().GetOutputDir()).String(), + } + } + // record TaskExecutionEvent return e.EventRecorder.RecordTaskEvent(ctx, taskExecutionEvent, eventConfig) } From fee63b6563d80539a9df21386c0a7e0acf5e73ea Mon Sep 17 00:00:00 2001 From: "Thomas J. Fan" Date: Thu, 9 Nov 2023 09:00:03 -0500 Subject: [PATCH 2/8] Fixes list formatting in flytepropeller arch docs (#4345) Signed-off-by: Thomas J. Fan --- .../component_architecture/flytepropeller_architecture.rst | 1 + 1 file changed, 1 insertion(+) diff --git a/rsts/concepts/component_architecture/flytepropeller_architecture.rst b/rsts/concepts/component_architecture/flytepropeller_architecture.rst index 4fa3cc2cd6..a04f6dbe4d 100644 --- a/rsts/concepts/component_architecture/flytepropeller_architecture.rst +++ b/rsts/concepts/component_architecture/flytepropeller_architecture.rst @@ -13,6 +13,7 @@ Introduction ============ A Flyte :ref:`workflow ` is represented as a Directed Acyclic Graph (DAG) of interconnected Nodes. Flyte supports a robust collection of Node types to ensure diverse functionality. + - ``TaskNodes`` support a plugin system to externally add system integrations. - Control flow can be altered during runtime using ``BranchNodes``, which prune downstream evaluation paths based on input. - ``DynamicNodes`` add nodes to the DAG. 
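Note on the ArrayNode event recorder change in the first patch above: the new code fills the TaskExecutionEvent InputValue oneof either inline (TaskExecutionEvent_InputData) or by reference (TaskExecutionEvent_InputUri), depending on the configured raw-output policy. The sketch below shows how a downstream consumer of these events might branch on that oneof. It is illustrative only, not code from flyteadmin: the flyteidl import path is assumed to mirror the paths used elsewhere in this series, and the GetInputValue getter is assumed to follow standard protobuf-go oneof codegen.

package main

import (
	"fmt"

	"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/event"
)

// describeInputs branches on the InputValue oneof that the event recorder
// above now populates: inputs arrive either inline as a literal map or as a
// URI pointing at the stored inputs.
func describeInputs(ev *event.TaskExecutionEvent) string {
	switch in := ev.GetInputValue().(type) {
	case *event.TaskExecutionEvent_InputData:
		return fmt.Sprintf("inline inputs (%d literals)", len(in.InputData.GetLiterals()))
	case *event.TaskExecutionEvent_InputUri:
		return fmt.Sprintf("inputs stored at %s", in.InputUri)
	default:
		return "no inputs attached"
	}
}

func main() {
	// Hypothetical event carrying inputs by reference, matching the
	// non-inline branch of the patch above; the URI is a placeholder.
	ev := &event.TaskExecutionEvent{
		InputValue: &event.TaskExecutionEvent_InputUri{InputUri: "s3://my-bucket/metadata/inputs.pb"},
	}
	fmt.Println(describeInputs(ev))
}

Whether inputs are sent inline or by reference is decided by eventConfig.RawOutputPolicy in the patch, so consumers should be prepared to handle both branches.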
From 243a8cb3e38b09307c40bc24b206f408a1192d9f Mon Sep 17 00:00:00 2001 From: Dan Rammer Date: Thu, 9 Nov 2023 10:48:51 -0600 Subject: [PATCH 3/8] update boilerplate end2end tests (#4393) Signed-off-by: Daniel Rammer --- boilerplate/flyte/end2end/run-tests.py | 111 ++++++++++++++++++++----- 1 file changed, 88 insertions(+), 23 deletions(-) diff --git a/boilerplate/flyte/end2end/run-tests.py b/boilerplate/flyte/end2end/run-tests.py index 5365da006e..9f8a8e85cb 100644 --- a/boilerplate/flyte/end2end/run-tests.py +++ b/boilerplate/flyte/end2end/run-tests.py @@ -5,7 +5,7 @@ import sys import time import traceback -from typing import Dict, List, Mapping, Tuple +from typing import Dict, List, Mapping, Tuple, Optional import click import requests @@ -50,15 +50,17 @@ ("basics.named_outputs.simple_wf_with_named_outputs", {}), # # Getting a 403 for the wikipedia image # # ("basics.reference_task.wf", {}), - ("data_types_and_io.custom_objects.wf", {"x": 10, "y": 20}), + ("data_types_and_io.dataclass.dataclass_wf", {"x": 10, "y": 20}), # Enums are not supported in flyteremote # ("type_system.enums.enum_wf", {"c": "red"}), - ("data_types_and_io.schema.df_wf", {"a": 42}), - ("data_types_and_io.typed_schema.wf", {}), + ("data_types_and_io.structured_dataset.simple_sd_wf", {"a": 42}), # ("my.imperative.workflow.example", {"in1": "hello", "in2": "foo"}), ], "integrations-k8s-spark": [ - ("k8s_spark_plugin.pyspark_pi.my_spark", {"triggered_date": datetime.datetime.now()}), + ( + "k8s_spark_plugin.pyspark_pi.my_spark", + {"triggered_date": datetime.datetime.now()}, + ), ], "integrations-kfpytorch": [ ("kfpytorch_plugin.pytorch_mnist.pytorch_training_wf", {}), @@ -89,20 +91,30 @@ } -def execute_workflow(remote, version, workflow_name, inputs): +def execute_workflow( + remote: FlyteRemote, + version, + workflow_name, + inputs, + cluster_pool_name: Optional[str] = None, +): print(f"Fetching workflow={workflow_name} and version={version}") wf = remote.fetch_workflow(name=workflow_name, version=version) - return remote.execute(wf, inputs=inputs, wait=False) + return remote.execute(wf, inputs=inputs, wait=False, cluster_pool=cluster_pool_name) -def executions_finished(executions_by_wfgroup: Dict[str, List[FlyteWorkflowExecution]]) -> bool: +def executions_finished( + executions_by_wfgroup: Dict[str, List[FlyteWorkflowExecution]] +) -> bool: for executions in executions_by_wfgroup.values(): if not all([execution.is_done for execution in executions]): return False return True -def sync_executions(remote: FlyteRemote, executions_by_wfgroup: Dict[str, List[FlyteWorkflowExecution]]): +def sync_executions( + remote: FlyteRemote, executions_by_wfgroup: Dict[str, List[FlyteWorkflowExecution]] +): try: for executions in executions_by_wfgroup.values(): for execution in executions: @@ -125,6 +137,7 @@ def schedule_workflow_groups( workflow_groups: List[str], remote: FlyteRemote, terminate_workflow_on_failure: bool, + cluster_pool_name: Optional[str] = None, ) -> Dict[str, bool]: """ Schedule workflows executions for all workflow groups and return True if all executions succeed, otherwise @@ -135,14 +148,19 @@ def schedule_workflow_groups( for wf_group in workflow_groups: workflows = FLYTESNACKS_WORKFLOW_GROUPS.get(wf_group, []) executions_by_wfgroup[wf_group] = [ - execute_workflow(remote, tag, workflow[0], workflow[1]) for workflow in workflows + execute_workflow(remote, tag, workflow[0], workflow[1], cluster_pool_name) + for workflow in workflows ] # Wait for all executions to finish attempt = 0 - while attempt == 0 or (not 
executions_finished(executions_by_wfgroup) and attempt < MAX_ATTEMPTS): + while attempt == 0 or ( + not executions_finished(executions_by_wfgroup) and attempt < MAX_ATTEMPTS + ): attempt += 1 - print(f"Not all executions finished yet. Sleeping for some time, will check again in {WAIT_TIME}s") + print( + f"Not all executions finished yet. Sleeping for some time, will check again in {WAIT_TIME}s" + ) time.sleep(WAIT_TIME) sync_executions(remote, executions_by_wfgroup) @@ -158,9 +176,13 @@ def schedule_workflow_groups( if len(non_succeeded_executions) != 0: print(f"Failed executions for {wf_group}:") for execution in non_succeeded_executions: - print(f" workflow={execution.spec.launch_plan.name}, execution_id={execution.id.name}") + print( + f" workflow={execution.spec.launch_plan.name}, execution_id={execution.id.name}" + ) if terminate_workflow_on_failure: - remote.terminate(execution, "aborting execution scheduled in functional test") + remote.terminate( + execution, "aborting execution scheduled in functional test" + ) # A workflow group succeeds iff all of its executions succeed results[wf_group] = len(non_succeeded_executions) == 0 return results @@ -179,17 +201,21 @@ def run( priorities: List[str], config_file_path, terminate_workflow_on_failure: bool, + test_project_name: str, + test_project_domain: str, + cluster_pool_name: Optional[str] = None, ) -> List[Dict[str, str]]: remote = FlyteRemote( Config.auto(config_file=config_file_path), - default_project="flytesnacks", - default_domain="development", + test_project_name, + test_project_domain, ) # For a given release tag and priority, this function filters the workflow groups from the flytesnacks # manifest file. For example, for the release tag "v0.2.224" and the priority "P0" it returns [ "core" ]. 
manifest_url = ( - "https://raw.githubusercontent.com/flyteorg/flytesnacks/" f"{flytesnacks_release_tag}/flyte_tests_manifest.json" + "https://raw.githubusercontent.com/flyteorg/flytesnacks/" + f"{flytesnacks_release_tag}/flyte_tests_manifest.json" ) r = requests.get(manifest_url) parsed_manifest = r.json() @@ -197,7 +223,11 @@ def run( workflow_groups = ( ["lite"] if "lite" in priorities - else [group["name"] for group in parsed_manifest if group["priority"] in priorities] + else [ + group["name"] + for group in parsed_manifest + if group["priority"] in priorities + ] ) results = [] @@ -215,7 +245,11 @@ def run( valid_workgroups.append(workflow_group) results_by_wfgroup = schedule_workflow_groups( - flytesnacks_release_tag, valid_workgroups, remote, terminate_workflow_on_failure + flytesnacks_release_tag, + valid_workgroups, + remote, + terminate_workflow_on_failure, + cluster_pool_name, ) for workflow_group, succeeded in results_by_wfgroup.items(): @@ -246,6 +280,9 @@ def run( @click.command() +@click.argument("flytesnacks_release_tag") +@click.argument("priorities") +@click.argument("config_file") @click.option( "--return_non_zero_on_failure", default=False, @@ -258,18 +295,46 @@ def run( is_flag=True, help="Abort failing workflows upon exit", ) -@click.argument("flytesnacks_release_tag") -@click.argument("priorities") -@click.argument("config_file") +@click.option( + "--test_project_name", + default="flytesnacks", + type=str, + is_flag=False, + help="Name of project to run functional tests on", +) +@click.option( + "--test_project_domain", + default="development", + type=str, + is_flag=False, + help="Name of domain in project to run functional tests on", +) +@click.argument( + "cluster_pool_name", + required=False, + type=str, + default=None, +) def cli( flytesnacks_release_tag, priorities, config_file, return_non_zero_on_failure, terminate_workflow_on_failure, + test_project_name, + test_project_domain, + cluster_pool_name, ): print(f"return_non_zero_on_failure={return_non_zero_on_failure}") - results = run(flytesnacks_release_tag, priorities, config_file, terminate_workflow_on_failure) + results = run( + flytesnacks_release_tag, + priorities, + config_file, + terminate_workflow_on_failure, + test_project_name, + test_project_domain, + cluster_pool_name, + ) # Write a json object in its own line describing the result of this run to stdout print(f"Result of run:\n{json.dumps(results)}") From 7712626d9cc638728bf471c61148fb8e22c63e44 Mon Sep 17 00:00:00 2001 From: Haytham Abuelfutuh Date: Fri, 10 Nov 2023 09:07:23 -0700 Subject: [PATCH 4/8] Handle all ray job statuses (#4389) Signed-off-by: Haytham Abuelfutuh --- flyteplugins/go/tasks/plugins/k8s/ray/ray.go | 17 +++++++++++++---- 1 file changed, 13 insertions(+), 4 deletions(-) diff --git a/flyteplugins/go/tasks/plugins/k8s/ray/ray.go b/flyteplugins/go/tasks/plugins/k8s/ray/ray.go index cc8d198334..1d0fde4ca8 100644 --- a/flyteplugins/go/tasks/plugins/k8s/ray/ray.go +++ b/flyteplugins/go/tasks/plugins/k8s/ray/ray.go @@ -470,7 +470,7 @@ func (plugin rayJobResourceHandler) GetTaskPhase(ctx context.Context, pluginCont return pluginsCore.PhaseInfoQueued(time.Now(), pluginsCore.DefaultPhaseVersion, "Scheduling"), nil } - // Kuberay creates a Ray cluster first, and then submits a Ray job to the cluster + // KubeRay creates a Ray cluster first, and then submits a Ray job to the cluster switch rayJob.Status.JobDeploymentStatus { case rayv1alpha1.JobDeploymentStatusInitializing: return 
pluginsCore.PhaseInfoInitializing(rayJob.CreationTimestamp.Time, pluginsCore.DefaultPhaseVersion, "cluster is creating", info), nil @@ -480,7 +480,7 @@ func (plugin rayJobResourceHandler) GetTaskPhase(ctx context.Context, pluginCont case rayv1alpha1.JobDeploymentStatusFailedJobDeploy: reason := fmt.Sprintf("Failed to submit Ray job %s with error: %s", rayJob.Name, rayJob.Status.Message) return pluginsCore.PhaseInfoFailure(flyteerr.TaskFailedWithError, reason, info), nil - case rayv1alpha1.JobDeploymentStatusWaitForDashboard: + case rayv1alpha1.JobDeploymentStatusWaitForDashboard, rayv1alpha1.JobDeploymentStatusFailedToGetJobStatus: return pluginsCore.PhaseInfoRunning(pluginsCore.DefaultPhaseVersion, info), nil case rayv1alpha1.JobDeploymentStatusRunning, rayv1alpha1.JobDeploymentStatusComplete: switch rayJob.Status.JobStatus { @@ -491,10 +491,19 @@ func (plugin rayJobResourceHandler) GetTaskPhase(ctx context.Context, pluginCont return pluginsCore.PhaseInfoSuccess(info), nil case rayv1alpha1.JobStatusPending, rayv1alpha1.JobStatusRunning: return pluginsCore.PhaseInfoRunning(pluginsCore.DefaultPhaseVersion, info), nil + case rayv1alpha1.JobStatusStopped: + // There is no current usage of this job status in KubeRay. It's unclear what it represents + fallthrough + default: + // We already handle all known job status, so this should never happen unless a future version of ray + // introduced a new job status. + return pluginsCore.PhaseInfoUndefined, fmt.Errorf("unknown job status: %s", rayJob.Status.JobStatus) } + default: + // We already handle all known deployment status, so this should never happen unless a future version of ray + // introduced a new job status. + return pluginsCore.PhaseInfoUndefined, fmt.Errorf("unknown job deployment status: %s", rayJob.Status.JobDeploymentStatus) } - - return pluginsCore.PhaseInfoUndefined, nil } func init() { From 48f53f0995c82568fb10ebf3ffead16c10424e99 Mon Sep 17 00:00:00 2001 From: David Espejo <82604841+davidmirror-ops@users.noreply.github.com> Date: Fri, 10 Nov 2023 11:36:41 -0500 Subject: [PATCH 5/8] Fix YAML indentation for sandbox config (#4385) Signed-off-by: davidmirror-ops --- rsts/deployment/deployment/sandbox.rst | 30 ++++++++++++++++++++++++++ 1 file changed, 30 insertions(+) diff --git a/rsts/deployment/deployment/sandbox.rst b/rsts/deployment/deployment/sandbox.rst index 5c40eea5eb..46809ae4a3 100644 --- a/rsts/deployment/deployment/sandbox.rst +++ b/rsts/deployment/deployment/sandbox.rst @@ -78,5 +78,35 @@ who wish to dig deeper into the storage layer. 🐋 Flyte sandbox ships with a Docker registry. Tag and push custom workflow images to localhost:30000 📂 The Minio API is hosted on localhost:30002. Use http://localhost:30080/minio/login for Minio console +Configuration +______________ + +The ``config-sandbox.yaml`` file contains configuration for **FlyteAdmin**, +which is the Flyte cluster backend component that processes all client requests +such as workflow executions. The default values are enough to let you connect and use Flyte: + + +.. code-block:: yaml + + admin: + # For GRPC endpoints you might want to use dns:///flyte.myexample.com + endpoint: localhost:30080 + authType: Pkce + insecure: true + console: + endpoint: http://localhost:30080 + logger: + show-source: true + level: 0 + +.. note:: + + You can also create your own config file with `flytectl config init`, which + will create a config file at `~/.flyte/config.yaml`. 
+ + Learn more about the configuration settings in the + {ref}`Deployment Guide ` + + Now that you have the sandbox cluster running, you can now go to the :ref:`User Guide ` or :ref:`Tutorials ` to run tasks and workflows written in ``flytekit``, the Python SDK for Flyte. \ No newline at end of file From 09cb3b1d869a999281affb2ff5fe4358937bba75 Mon Sep 17 00:00:00 2001 From: Jeev B Date: Fri, 10 Nov 2023 11:14:40 -0800 Subject: [PATCH 6/8] Refactor task logs framework (#4396) * Refactor task logs framework Signed-off-by: Jeev B * Return templateLogPluginCollection instead of nil even if no plugins are specified Signed-off-by: Jeev B --------- Signed-off-by: Jeev B --- flyteplugins/go/tasks/logs/config.go | 25 +- flyteplugins/go/tasks/logs/logging_utils.go | 53 ++-- .../go/tasks/logs/logging_utils_test.go | 2 +- .../tasks/pluginmachinery/tasklog/plugin.go | 10 + .../tasks/pluginmachinery/tasklog/template.go | 57 +--- .../pluginmachinery/tasklog/template_test.go | 243 ++++++------------ 6 files changed, 128 insertions(+), 262 deletions(-) diff --git a/flyteplugins/go/tasks/logs/config.go b/flyteplugins/go/tasks/logs/config.go index 69ef17ed89..ca5a6012a8 100644 --- a/flyteplugins/go/tasks/logs/config.go +++ b/flyteplugins/go/tasks/logs/config.go @@ -1,45 +1,34 @@ package logs import ( - "github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/core" "github.com/flyteorg/flyte/flyteplugins/go/tasks/config" "github.com/flyteorg/flyte/flyteplugins/go/tasks/pluginmachinery/tasklog" ) //go:generate pflags LogConfig --default-var=DefaultConfig -// TemplateURI is a URI that accepts templates. See: go/tasks/pluginmachinery/tasklog/template.go for available templates. -type TemplateURI = string - // LogConfig encapsulates plugins' log configs type LogConfig struct { IsCloudwatchEnabled bool `json:"cloudwatch-enabled" pflag:",Enable Cloudwatch Logging"` // Deprecated: Please use CloudwatchTemplateURI CloudwatchRegion string `json:"cloudwatch-region" pflag:",AWS region in which Cloudwatch logs are stored."` // Deprecated: Please use CloudwatchTemplateURI - CloudwatchLogGroup string `json:"cloudwatch-log-group" pflag:",Log group to which streams are associated."` - CloudwatchTemplateURI TemplateURI `json:"cloudwatch-template-uri" pflag:",Template Uri to use when building cloudwatch log links"` + CloudwatchLogGroup string `json:"cloudwatch-log-group" pflag:",Log group to which streams are associated."` + CloudwatchTemplateURI tasklog.TemplateURI `json:"cloudwatch-template-uri" pflag:",Template Uri to use when building cloudwatch log links"` IsKubernetesEnabled bool `json:"kubernetes-enabled" pflag:",Enable Kubernetes Logging"` // Deprecated: Please use KubernetesTemplateURI - KubernetesURL string `json:"kubernetes-url" pflag:",Console URL for Kubernetes logs"` - KubernetesTemplateURI TemplateURI `json:"kubernetes-template-uri" pflag:",Template Uri to use when building kubernetes log links"` + KubernetesURL string `json:"kubernetes-url" pflag:",Console URL for Kubernetes logs"` + KubernetesTemplateURI tasklog.TemplateURI `json:"kubernetes-template-uri" pflag:",Template Uri to use when building kubernetes log links"` IsStackDriverEnabled bool `json:"stackdriver-enabled" pflag:",Enable Log-links to stackdriver"` // Deprecated: Please use StackDriverTemplateURI GCPProjectName string `json:"gcp-project" pflag:",Name of the project in GCP"` // Deprecated: Please use StackDriverTemplateURI - StackdriverLogResourceName string `json:"stackdriver-logresourcename" pflag:",Name of the logresource in stackdriver"` - 
StackDriverTemplateURI TemplateURI `json:"stackdriver-template-uri" pflag:",Template Uri to use when building stackdriver log links"` - - Templates []TemplateLogPluginConfig `json:"templates" pflag:"-,"` -} + StackdriverLogResourceName string `json:"stackdriver-logresourcename" pflag:",Name of the logresource in stackdriver"` + StackDriverTemplateURI tasklog.TemplateURI `json:"stackdriver-template-uri" pflag:",Template Uri to use when building stackdriver log links"` -type TemplateLogPluginConfig struct { - DisplayName string `json:"displayName" pflag:",Display name for the generated log when displayed in the console."` - TemplateURIs []TemplateURI `json:"templateUris" pflag:",URI Templates for generating task log links."` - MessageFormat core.TaskLog_MessageFormat `json:"messageFormat" pflag:",Log Message Format."` - Scheme tasklog.TemplateScheme `json:"scheme" pflag:",Templating scheme to use. Supported values are Pod and TaskExecution."` + Templates []tasklog.TemplateLogPlugin `json:"templates" pflag:"-,"` } var ( diff --git a/flyteplugins/go/tasks/logs/logging_utils.go b/flyteplugins/go/tasks/logs/logging_utils.go index 6af1889e9f..4978109458 100644 --- a/flyteplugins/go/tasks/logs/logging_utils.go +++ b/flyteplugins/go/tasks/logs/logging_utils.go @@ -13,11 +13,6 @@ import ( "github.com/flyteorg/flyte/flytestdlib/logger" ) -type logPlugin struct { - Name string - Plugin tasklog.Plugin -} - // Internal func GetLogsForContainerInPod(ctx context.Context, logPlugin tasklog.Plugin, taskExecID pluginsCore.TaskExecutionID, pod *v1.Pod, index uint32, nameSuffix string, extraLogTemplateVarsByScheme *tasklog.TemplateVarsByScheme) ([]*core.TaskLog, error) { if logPlugin == nil { @@ -66,67 +61,53 @@ func GetLogsForContainerInPod(ctx context.Context, logPlugin tasklog.Plugin, tas return logs.TaskLogs, nil } -type taskLogPluginWrapper struct { - logPlugins []logPlugin +type templateLogPluginCollection struct { + plugins []tasklog.TemplateLogPlugin } -func (t taskLogPluginWrapper) GetTaskLogs(input tasklog.Input) (logOutput tasklog.Output, err error) { - logs := make([]*core.TaskLog, 0, len(t.logPlugins)) - suffix := input.LogName +func (t templateLogPluginCollection) GetTaskLogs(input tasklog.Input) (tasklog.Output, error) { + var taskLogs []*core.TaskLog - for _, plugin := range t.logPlugins { - input.LogName = plugin.Name + suffix - o, err := plugin.Plugin.GetTaskLogs(input) + for _, plugin := range t.plugins { + o, err := plugin.GetTaskLogs(input) if err != nil { return tasklog.Output{}, err } - - logs = append(logs, o.TaskLogs...) + taskLogs = append(taskLogs, o.TaskLogs...) } - return tasklog.Output{ - TaskLogs: logs, - }, nil + return tasklog.Output{TaskLogs: taskLogs}, nil } // InitializeLogPlugins initializes log plugin based on config. func InitializeLogPlugins(cfg *LogConfig) (tasklog.Plugin, error) { // Use a list to maintain order. 
- logPlugins := make([]logPlugin, 0, 2) + var plugins []tasklog.TemplateLogPlugin if cfg.IsKubernetesEnabled { if len(cfg.KubernetesTemplateURI) > 0 { - logPlugins = append(logPlugins, logPlugin{Name: "Kubernetes Logs", Plugin: tasklog.NewTemplateLogPlugin(tasklog.TemplateSchemePod, []string{cfg.KubernetesTemplateURI}, core.TaskLog_JSON)}) + plugins = append(plugins, tasklog.TemplateLogPlugin{DisplayName: "Kubernetes Logs", Scheme: tasklog.TemplateSchemePod, TemplateURIs: []tasklog.TemplateURI{cfg.KubernetesTemplateURI}, MessageFormat: core.TaskLog_JSON}) } else { - logPlugins = append(logPlugins, logPlugin{Name: "Kubernetes Logs", Plugin: tasklog.NewTemplateLogPlugin(tasklog.TemplateSchemePod, []string{fmt.Sprintf("%s/#!/log/{{ .namespace }}/{{ .podName }}/pod?namespace={{ .namespace }}", cfg.KubernetesURL)}, core.TaskLog_JSON)}) + plugins = append(plugins, tasklog.TemplateLogPlugin{DisplayName: "Kubernetes Logs", Scheme: tasklog.TemplateSchemePod, TemplateURIs: []tasklog.TemplateURI{fmt.Sprintf("%s/#!/log/{{ .namespace }}/{{ .podName }}/pod?namespace={{ .namespace }}", cfg.KubernetesURL)}, MessageFormat: core.TaskLog_JSON}) } } if cfg.IsCloudwatchEnabled { if len(cfg.CloudwatchTemplateURI) > 0 { - logPlugins = append(logPlugins, logPlugin{Name: "Cloudwatch Logs", Plugin: tasklog.NewTemplateLogPlugin(tasklog.TemplateSchemePod, []string{cfg.CloudwatchTemplateURI}, core.TaskLog_JSON)}) + plugins = append(plugins, tasklog.TemplateLogPlugin{DisplayName: "Cloudwatch Logs", Scheme: tasklog.TemplateSchemePod, TemplateURIs: []tasklog.TemplateURI{cfg.CloudwatchTemplateURI}, MessageFormat: core.TaskLog_JSON}) } else { - logPlugins = append(logPlugins, logPlugin{Name: "Cloudwatch Logs", Plugin: tasklog.NewTemplateLogPlugin(tasklog.TemplateSchemePod, []string{fmt.Sprintf("https://console.aws.amazon.com/cloudwatch/home?region=%s#logEventViewer:group=%s;stream=var.log.containers.{{ .podName }}_{{ .namespace }}_{{ .containerName }}-{{ .containerId }}.log", cfg.CloudwatchRegion, cfg.CloudwatchLogGroup)}, core.TaskLog_JSON)}) + plugins = append(plugins, tasklog.TemplateLogPlugin{DisplayName: "Cloudwatch Logs", Scheme: tasklog.TemplateSchemePod, TemplateURIs: []tasklog.TemplateURI{fmt.Sprintf("https://console.aws.amazon.com/cloudwatch/home?region=%s#logEventViewer:group=%s;stream=var.log.containers.{{ .podName }}_{{ .namespace }}_{{ .containerName }}-{{ .containerId }}.log", cfg.CloudwatchRegion, cfg.CloudwatchLogGroup)}, MessageFormat: core.TaskLog_JSON}) } } if cfg.IsStackDriverEnabled { if len(cfg.StackDriverTemplateURI) > 0 { - logPlugins = append(logPlugins, logPlugin{Name: "Stackdriver Logs", Plugin: tasklog.NewTemplateLogPlugin(tasklog.TemplateSchemePod, []string{cfg.StackDriverTemplateURI}, core.TaskLog_JSON)}) + plugins = append(plugins, tasklog.TemplateLogPlugin{DisplayName: "Stackdriver Logs", Scheme: tasklog.TemplateSchemePod, TemplateURIs: []tasklog.TemplateURI{cfg.StackDriverTemplateURI}, MessageFormat: core.TaskLog_JSON}) } else { - logPlugins = append(logPlugins, logPlugin{Name: "Stackdriver Logs", Plugin: tasklog.NewTemplateLogPlugin(tasklog.TemplateSchemePod, []string{fmt.Sprintf("https://console.cloud.google.com/logs/viewer?project=%s&angularJsUrl=%%2Flogs%%2Fviewer%%3Fproject%%3D%s&resource=%s&advancedFilter=resource.labels.pod_name%%3D{{ .podName }}", cfg.GCPProjectName, cfg.GCPProjectName, cfg.StackdriverLogResourceName)}, core.TaskLog_JSON)}) + plugins = append(plugins, tasklog.TemplateLogPlugin{DisplayName: "Stackdriver Logs", Scheme: tasklog.TemplateSchemePod, TemplateURIs: 
[]tasklog.TemplateURI{fmt.Sprintf("https://console.cloud.google.com/logs/viewer?project=%s&angularJsUrl=%%2Flogs%%2Fviewer%%3Fproject%%3D%s&resource=%s&advancedFilter=resource.labels.pod_name%%3D{{ .podName }}", cfg.GCPProjectName, cfg.GCPProjectName, cfg.StackdriverLogResourceName)}, MessageFormat: core.TaskLog_JSON}) } } - if len(cfg.Templates) > 0 { - for _, cfg := range cfg.Templates { - logPlugins = append(logPlugins, logPlugin{Name: cfg.DisplayName, Plugin: tasklog.NewTemplateLogPlugin(cfg.Scheme, cfg.TemplateURIs, cfg.MessageFormat)}) - } - } - - if len(logPlugins) == 0 { - return nil, nil - } - - return taskLogPluginWrapper{logPlugins: logPlugins}, nil + plugins = append(plugins, cfg.Templates...) + return templateLogPluginCollection{plugins: plugins}, nil } diff --git a/flyteplugins/go/tasks/logs/logging_utils_test.go b/flyteplugins/go/tasks/logs/logging_utils_test.go index 066fdd96c8..91048eff16 100644 --- a/flyteplugins/go/tasks/logs/logging_utils_test.go +++ b/flyteplugins/go/tasks/logs/logging_utils_test.go @@ -320,7 +320,7 @@ func assertTestSucceeded(tb testing.TB, config *LogConfig, expectedTaskLogs []*c func TestGetLogsForContainerInPod_Templates(t *testing.T) { assertTestSucceeded(t, &LogConfig{ - Templates: []TemplateLogPluginConfig{ + Templates: []tasklog.TemplateLogPlugin{ { DisplayName: "StackDriver", TemplateURIs: []string{ diff --git a/flyteplugins/go/tasks/pluginmachinery/tasklog/plugin.go b/flyteplugins/go/tasks/pluginmachinery/tasklog/plugin.go index b812221f6d..c43da02e58 100644 --- a/flyteplugins/go/tasks/pluginmachinery/tasklog/plugin.go +++ b/flyteplugins/go/tasks/pluginmachinery/tasklog/plugin.go @@ -16,6 +16,9 @@ const ( TemplateSchemeTaskExecution ) +// TemplateURI is a URI that accepts templates. See: go/tasks/pluginmachinery/tasklog/template.go for available templates. +type TemplateURI = string + type TemplateVar struct { Regex *regexp.Regexp Value string @@ -57,3 +60,10 @@ type Plugin interface { // Generates a TaskLog object given necessary computation information GetTaskLogs(i Input) (logs Output, err error) } + +type TemplateLogPlugin struct { + DisplayName string `json:"displayName" pflag:",Display name for the generated log when displayed in the console."` + TemplateURIs []TemplateURI `json:"templateUris" pflag:",URI Templates for generating task log links."` + MessageFormat core.TaskLog_MessageFormat `json:"messageFormat" pflag:",Log Message Format."` + Scheme TemplateScheme `json:"scheme" pflag:",Templating scheme to use. 
Supported values are Pod and TaskExecution."` +} diff --git a/flyteplugins/go/tasks/pluginmachinery/tasklog/template.go b/flyteplugins/go/tasks/pluginmachinery/tasklog/template.go index 77c49d2695..750b1972df 100644 --- a/flyteplugins/go/tasks/pluginmachinery/tasklog/template.go +++ b/flyteplugins/go/tasks/pluginmachinery/tasklog/template.go @@ -34,6 +34,7 @@ type templateRegexes struct { ExecutionName *regexp.Regexp ExecutionProject *regexp.Regexp ExecutionDomain *regexp.Regexp + GeneratedName *regexp.Regexp } func initDefaultRegexes() templateRegexes { @@ -58,6 +59,7 @@ func initDefaultRegexes() templateRegexes { MustCreateRegex("executionName"), MustCreateRegex("executionProject"), MustCreateRegex("executionDomain"), + MustCreateRegex("generatedName"), } } @@ -121,6 +123,10 @@ func (input Input) templateVarsForScheme(scheme TemplateScheme) TemplateVars { defaultRegexes.NodeID, input.TaskExecutionID.GetUniqueNodeID(), }, + TemplateVar{ + defaultRegexes.GeneratedName, + input.TaskExecutionID.GetGeneratedName(), + }, TemplateVar{ defaultRegexes.TaskRetryAttempt, strconv.FormatUint(uint64(taskExecutionIdentifier.RetryAttempt), 10), @@ -172,55 +178,16 @@ func (input Input) templateVarsForScheme(scheme TemplateScheme) TemplateVars { return vars } -// A simple log plugin that supports templates in urls to build the final log link. -// See `defaultRegexes` for supported templates. -type TemplateLogPlugin struct { - scheme TemplateScheme - templateUris []string - messageFormat core.TaskLog_MessageFormat -} - -func (s TemplateLogPlugin) GetTaskLog(podName, podUID, namespace, containerName, containerID, logName string, podRFC3339StartTime string, podRFC3339FinishTime string, podUnixStartTime, podUnixFinishTime int64) (core.TaskLog, error) { - o, err := s.GetTaskLogs(Input{ - LogName: logName, - Namespace: namespace, - PodName: podName, - PodUID: podUID, - ContainerName: containerName, - ContainerID: containerID, - PodRFC3339StartTime: podRFC3339StartTime, - PodRFC3339FinishTime: podRFC3339FinishTime, - PodUnixStartTime: podUnixStartTime, - PodUnixFinishTime: podUnixFinishTime, - }) - - if err != nil || len(o.TaskLogs) == 0 { - return core.TaskLog{}, err - } - - return *o.TaskLogs[0], nil -} - -func (s TemplateLogPlugin) GetTaskLogs(input Input) (Output, error) { - templateVars := input.templateVarsForScheme(s.scheme) - taskLogs := make([]*core.TaskLog, 0, len(s.templateUris)) - for _, templateURI := range s.templateUris { +func (p TemplateLogPlugin) GetTaskLogs(input Input) (Output, error) { + templateVars := input.templateVarsForScheme(p.Scheme) + taskLogs := make([]*core.TaskLog, 0, len(p.TemplateURIs)) + for _, templateURI := range p.TemplateURIs { taskLogs = append(taskLogs, &core.TaskLog{ Uri: replaceAll(templateURI, templateVars), - Name: input.LogName, - MessageFormat: s.messageFormat, + Name: p.DisplayName + input.LogName, + MessageFormat: p.MessageFormat, }) } return Output{TaskLogs: taskLogs}, nil } - -// NewTemplateLogPlugin creates a template-based log plugin with the provided template Uri and message format. -// See `defaultRegexes` for supported templates. 
-func NewTemplateLogPlugin(scheme TemplateScheme, templateUris []string, messageFormat core.TaskLog_MessageFormat) TemplateLogPlugin { - return TemplateLogPlugin{ - scheme: scheme, - templateUris: templateUris, - messageFormat: messageFormat, - } -} diff --git a/flyteplugins/go/tasks/pluginmachinery/tasklog/template_test.go b/flyteplugins/go/tasks/pluginmachinery/tasklog/template_test.go index 320ece05a4..f279707a3b 100644 --- a/flyteplugins/go/tasks/pluginmachinery/tasklog/template_test.go +++ b/flyteplugins/go/tasks/pluginmachinery/tasklog/template_test.go @@ -5,7 +5,6 @@ import ( "regexp" "testing" - "github.com/go-test/deep" "github.com/stretchr/testify/assert" "github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/core" @@ -13,26 +12,6 @@ import ( coreMocks "github.com/flyteorg/flyte/flyteplugins/go/tasks/pluginmachinery/core/mocks" ) -func TestTemplateLog(t *testing.T) { - p := NewTemplateLogPlugin(TemplateSchemePod, []string{"https://console.aws.amazon.com/cloudwatch/home?region=us-east-1#logEventViewer:group=/flyte-production/kubernetes;stream=var.log.containers.{{.podName}}_{{.podUID}}_{{.namespace}}_{{.containerName}}-{{.containerId}}.log"}, core.TaskLog_JSON) - tl, err := p.GetTaskLog( - "f-uuid-driver", - "pod-uid", - "flyteexamples-production", - "spark-kubernetes-driver", - "cri-o://abc", - "main_logs", - "2015-03-14T17:08:14+01:00", - "2021-06-15T20:47:57+02:00", - 1426349294, - 1623782877, - ) - assert.NoError(t, err) - assert.Equal(t, tl.GetName(), "main_logs") - assert.Equal(t, tl.GetMessageFormat(), core.TaskLog_JSON) - assert.Equal(t, "https://console.aws.amazon.com/cloudwatch/home?region=us-east-1#logEventViewer:group=/flyte-production/kubernetes;stream=var.log.containers.f-uuid-driver_pod-uid_flyteexamples-production_spark-kubernetes-driver-abc.log", tl.Uri) -} - // Latest Run: Benchmark_mustInitTemplateRegexes-16 45960 26914 ns/op func Benchmark_initDefaultRegexes(b *testing.B) { for i := 0; i < b.N; i++ { @@ -172,6 +151,7 @@ func Test_Input_templateVarsForScheme(t *testing.T) { TemplateVars{ {defaultRegexes.LogName, "main_logs"}, {defaultRegexes.NodeID, "n0-0-n0"}, + {defaultRegexes.GeneratedName, "generated-name"}, {defaultRegexes.TaskRetryAttempt, "1"}, {defaultRegexes.TaskID, "my-task-name"}, {defaultRegexes.TaskVersion, "1"}, @@ -245,147 +225,99 @@ func Test_Input_templateVarsForScheme(t *testing.T) { } } -func Test_templateLogPlugin_Regression(t *testing.T) { - type fields struct { - templateURI string - messageFormat core.TaskLog_MessageFormat - } +func TestTemplateLogPlugin(t *testing.T) { type args struct { - podName string - podUID string - namespace string - containerName string - containerID string - logName string - podRFC3339StartTime string - podRFC3339FinishTime string - podUnixStartTime int64 - podUnixFinishTime int64 + input Input } tests := []struct { - name string - fields fields - args args - want core.TaskLog - wantErr bool + name string + plugin TemplateLogPlugin + args args + want Output }{ { "cloudwatch", - fields{ - templateURI: "https://console.aws.amazon.com/cloudwatch/home?region=us-east-1#logEventViewer:group=/flyte-production/kubernetes;stream=var.log.containers.{{.podName}}_{{.namespace}}_{{.containerName}}-{{.containerId}}.log", - messageFormat: core.TaskLog_JSON, + TemplateLogPlugin{ + TemplateURIs: []TemplateURI{"https://console.aws.amazon.com/cloudwatch/home?region=us-east-1#logEventViewer:group=/flyte-production/kubernetes;stream=var.log.containers.{{.podName}}_{{.namespace}}_{{.containerName}}-{{.containerId}}.log"}, + 
MessageFormat: core.TaskLog_JSON, }, args{ - podName: "f-uuid-driver", - podUID: "pod-uid", - namespace: "flyteexamples-production", - containerName: "spark-kubernetes-driver", - containerID: "cri-o://abc", - logName: "main_logs", - podRFC3339StartTime: "1970-01-01T01:02:03+01:00", - podRFC3339FinishTime: "1970-01-01T04:25:45+01:00", - podUnixStartTime: 123, - podUnixFinishTime: 12345, - }, - core.TaskLog{ + input: Input{ + PodName: "f-uuid-driver", + PodUID: "pod-uid", + Namespace: "flyteexamples-production", + ContainerName: "spark-kubernetes-driver", + ContainerID: "cri-o://abc", + LogName: "main_logs", + PodRFC3339StartTime: "1970-01-01T01:02:03+01:00", + PodRFC3339FinishTime: "1970-01-01T04:25:45+01:00", + PodUnixStartTime: 123, + PodUnixFinishTime: 12345, + }, + }, + Output{TaskLogs: []*core.TaskLog{{ Uri: "https://console.aws.amazon.com/cloudwatch/home?region=us-east-1#logEventViewer:group=/flyte-production/kubernetes;stream=var.log.containers.f-uuid-driver_flyteexamples-production_spark-kubernetes-driver-abc.log", MessageFormat: core.TaskLog_JSON, Name: "main_logs", - }, - false, + }}}, }, { "stackdriver", - fields{ - templateURI: "https://console.cloud.google.com/logs/viewer?project=test-gcp-project&angularJsUrl=%2Flogs%2Fviewer%3Fproject%3Dtest-gcp-project&resource=aws_ec2_instance&advancedFilter=resource.labels.pod_name%3D{{.podName}}", - messageFormat: core.TaskLog_JSON, + TemplateLogPlugin{ + TemplateURIs: []TemplateURI{"https://console.cloud.google.com/logs/viewer?project=test-gcp-project&angularJsUrl=%2Flogs%2Fviewer%3Fproject%3Dtest-gcp-project&resource=aws_ec2_instance&advancedFilter=resource.labels.pod_name%3D{{.podName}}"}, + MessageFormat: core.TaskLog_JSON, }, args{ - podName: "podName", - podUID: "pod-uid", - namespace: "flyteexamples-production", - containerName: "spark-kubernetes-driver", - containerID: "cri-o://abc", - logName: "main_logs", - podRFC3339StartTime: "1970-01-01T01:02:03+01:00", - podRFC3339FinishTime: "1970-01-01T04:25:45+01:00", - podUnixStartTime: 123, - podUnixFinishTime: 12345, - }, - core.TaskLog{ + input: Input{ + PodName: "podName", + PodUID: "pod-uid", + Namespace: "flyteexamples-production", + ContainerName: "spark-kubernetes-driver", + ContainerID: "cri-o://abc", + LogName: "main_logs", + PodRFC3339StartTime: "1970-01-01T01:02:03+01:00", + PodRFC3339FinishTime: "1970-01-01T04:25:45+01:00", + PodUnixStartTime: 123, + PodUnixFinishTime: 12345, + }, + }, + Output{TaskLogs: []*core.TaskLog{{ Uri: "https://console.cloud.google.com/logs/viewer?project=test-gcp-project&angularJsUrl=%2Flogs%2Fviewer%3Fproject%3Dtest-gcp-project&resource=aws_ec2_instance&advancedFilter=resource.labels.pod_name%3DpodName", MessageFormat: core.TaskLog_JSON, Name: "main_logs", - }, - false, + }}}, }, { "kubernetes", - fields{ - templateURI: "https://dashboard.k8s.net/#!/log/{{.namespace}}/{{.podName}}/pod?namespace={{.namespace}}", - messageFormat: core.TaskLog_JSON, + TemplateLogPlugin{ + TemplateURIs: []TemplateURI{"https://dashboard.k8s.net/#!/log/{{.namespace}}/{{.podName}}/pod?namespace={{.namespace}}"}, + MessageFormat: core.TaskLog_JSON, }, args{ - podName: "flyteexamples-development-task-name", - podUID: "pod-uid", - namespace: "flyteexamples-development", - containerName: "ignore", - containerID: "ignore", - logName: "main_logs", - podRFC3339StartTime: "1970-01-01T01:02:03+01:00", - podRFC3339FinishTime: "1970-01-01T04:25:45+01:00", - podUnixStartTime: 123, - podUnixFinishTime: 12345, - }, - core.TaskLog{ + input: Input{ + PodName: 
"flyteexamples-development-task-name", + PodUID: "pod-uid", + Namespace: "flyteexamples-development", + ContainerName: "ignore", + ContainerID: "ignore", + LogName: "main_logs", + PodRFC3339StartTime: "1970-01-01T01:02:03+01:00", + PodRFC3339FinishTime: "1970-01-01T04:25:45+01:00", + PodUnixStartTime: 123, + PodUnixFinishTime: 12345, + }, + }, + Output{TaskLogs: []*core.TaskLog{{ Uri: "https://dashboard.k8s.net/#!/log/flyteexamples-development/flyteexamples-development-task-name/pod?namespace=flyteexamples-development", MessageFormat: core.TaskLog_JSON, Name: "main_logs", - }, - false, + }}}, }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - s := TemplateLogPlugin{ - templateUris: []string{tt.fields.templateURI}, - messageFormat: tt.fields.messageFormat, - } - - got, err := s.GetTaskLog(tt.args.podName, tt.args.podUID, tt.args.namespace, tt.args.containerName, tt.args.containerID, tt.args.logName, tt.args.podRFC3339FinishTime, tt.args.podRFC3339FinishTime, tt.args.podUnixStartTime, tt.args.podUnixFinishTime) - if (err != nil) != tt.wantErr { - t.Errorf("GetTaskLog() error = %v, wantErr %v", err, tt.wantErr) - return - } - - if diff := deep.Equal(got, tt.want); diff != nil { - t.Errorf("GetTaskLog() got = %v, want %v, diff: %v", got, tt.want, diff) - } - }) - } -} - -func TestTemplateLogPlugin_NewTaskLog(t *testing.T) { - type fields struct { - scheme TemplateScheme - templateURI string - messageFormat core.TaskLog_MessageFormat - } - type args struct { - input Input - } - tests := []struct { - name string - fields fields - args args - want Output - wantErr bool - }{ { "splunk", - fields{ - templateURI: "https://prd-p-ighar.splunkcloud.com/en-US/app/search/search?q=search%20container_name%3D%22{{ .containerName }}%22", - messageFormat: core.TaskLog_JSON, + TemplateLogPlugin{ + TemplateURIs: []TemplateURI{"https://prd-p-ighar.splunkcloud.com/en-US/app/search/search?q=search%20container_name%3D%22{{ .containerName }}%22"}, + MessageFormat: core.TaskLog_JSON, }, args{ input: Input{ @@ -410,13 +342,12 @@ func TestTemplateLogPlugin_NewTaskLog(t *testing.T) { }, }, }, - false, }, { "ddog", - fields{ - templateURI: "https://app.datadoghq.com/logs?event&from_ts={{ .podUnixStartTime }}&live=true&query=pod_name%3A{{ .podName }}&to_ts={{ .podUnixFinishTime }}", - messageFormat: core.TaskLog_JSON, + TemplateLogPlugin{ + TemplateURIs: []TemplateURI{"https://app.datadoghq.com/logs?event&from_ts={{ .podUnixStartTime }}&live=true&query=pod_name%3A{{ .podName }}&to_ts={{ .podUnixFinishTime }}"}, + MessageFormat: core.TaskLog_JSON, }, args{ input: Input{ @@ -441,13 +372,12 @@ func TestTemplateLogPlugin_NewTaskLog(t *testing.T) { }, }, }, - false, }, { "stackdriver-with-rfc3339-timestamp", - fields{ - templateURI: "https://console.cloud.google.com/logs/viewer?project=test-gcp-project&angularJsUrl=%2Flogs%2Fviewer%3Fproject%3Dtest-gcp-project&resource=aws_ec2_instance&advancedFilter=resource.labels.pod_name%3D{{.podName}}%20%22{{.podRFC3339StartTime}}%22", - messageFormat: core.TaskLog_JSON, + TemplateLogPlugin{ + TemplateURIs: []TemplateURI{"https://console.cloud.google.com/logs/viewer?project=test-gcp-project&angularJsUrl=%2Flogs%2Fviewer%3Fproject%3Dtest-gcp-project&resource=aws_ec2_instance&advancedFilter=resource.labels.pod_name%3D{{.podName}}%20%22{{.podRFC3339StartTime}}%22"}, + MessageFormat: core.TaskLog_JSON, }, args{ input: Input{ @@ -472,14 +402,13 @@ func TestTemplateLogPlugin_NewTaskLog(t *testing.T) { }, }, }, - false, }, { "task-with-task-execution-identifier", - 
fields{ - scheme: TemplateSchemeTaskExecution, - templateURI: "https://flyte.corp.net/console/projects/{{ .executionProject }}/domains/{{ .executionDomain }}/executions/{{ .executionName }}/nodeId/{{ .nodeID }}/taskId/{{ .taskID }}/attempt/{{ .taskRetryAttempt }}/view/logs", - messageFormat: core.TaskLog_JSON, + TemplateLogPlugin{ + Scheme: TemplateSchemeTaskExecution, + TemplateURIs: []TemplateURI{"https://flyte.corp.net/console/projects/{{ .executionProject }}/domains/{{ .executionDomain }}/executions/{{ .executionName }}/nodeId/{{ .nodeID }}/taskId/{{ .taskID }}/attempt/{{ .taskRetryAttempt }}/view/logs"}, + MessageFormat: core.TaskLog_JSON, }, args{ input: Input{ @@ -505,14 +434,13 @@ func TestTemplateLogPlugin_NewTaskLog(t *testing.T) { }, }, }, - false, }, { "mapped-task-with-task-execution-identifier", - fields{ - scheme: TemplateSchemeTaskExecution, - templateURI: "https://flyte.corp.net/console/projects/{{ .executionProject }}/domains/{{ .executionDomain }}/executions/{{ .executionName }}/nodeId/{{ .nodeID }}/taskId/{{ .taskID }}/attempt/{{ .subtaskParentRetryAttempt }}/mappedIndex/{{ .subtaskExecutionIndex }}/mappedAttempt/{{ .subtaskRetryAttempt }}/view/logs", - messageFormat: core.TaskLog_JSON, + TemplateLogPlugin{ + Scheme: TemplateSchemeTaskExecution, + TemplateURIs: []TemplateURI{"https://flyte.corp.net/console/projects/{{ .executionProject }}/domains/{{ .executionDomain }}/executions/{{ .executionName }}/nodeId/{{ .nodeID }}/taskId/{{ .taskID }}/attempt/{{ .subtaskParentRetryAttempt }}/mappedIndex/{{ .subtaskExecutionIndex }}/mappedAttempt/{{ .subtaskRetryAttempt }}/view/logs"}, + MessageFormat: core.TaskLog_JSON, }, args{ input: Input{ @@ -545,23 +473,14 @@ func TestTemplateLogPlugin_NewTaskLog(t *testing.T) { }, }, }, - false, }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - s := TemplateLogPlugin{ - scheme: tt.fields.scheme, - templateUris: []string{tt.fields.templateURI}, - messageFormat: tt.fields.messageFormat, - } - got, err := s.GetTaskLogs(tt.args.input) - if (err != nil) != tt.wantErr { - t.Errorf("NewTaskLog() error = %v, wantErr %v", err, tt.wantErr) - return - } + got, err := tt.plugin.GetTaskLogs(tt.args.input) + assert.NoError(t, err) if !reflect.DeepEqual(got, tt.want) { - t.Errorf("NewTaskLog() got = %v, want %v", got, tt.want) + t.Errorf("GetTaskLogs() got = %v, want %v", got, tt.want) } }) } From 70c23c2cf8dccd44011c2168e7f532735ce8b99c Mon Sep 17 00:00:00 2001 From: Jeev B Date: Fri, 10 Nov 2023 11:42:52 -0800 Subject: [PATCH 7/8] Add support for displaying the Ray dashboard when a RayJob is active (#4397) * Refactor task logs framework Signed-off-by: Jeev B * Return templateLogPluginCollection instead of nil even if no plugins are specified Signed-off-by: Jeev B * Add support for displaying the Ray dashboard when a RayJob is active Signed-off-by: Jeev B * Fix tasklogs returned for Ray task Signed-off-by: Jeev B * Get tasklogs working with task phase Signed-off-by: Jeev B * Misc fixes Signed-off-by: Jeev B * Add tests for dashboard URL link Signed-off-by: Jeev B * Fix linting issues and merge conflicts Signed-off-by: Jeev B --------- Signed-off-by: Jeev B --- .../go/tasks/plugins/k8s/ray/config.go | 12 +-- flyteplugins/go/tasks/plugins/k8s/ray/ray.go | 40 ++++++--- .../go/tasks/plugins/k8s/ray/ray_test.go | 84 ++++++++++++++++--- 3 files changed, 108 insertions(+), 28 deletions(-) diff --git a/flyteplugins/go/tasks/plugins/k8s/ray/config.go b/flyteplugins/go/tasks/plugins/k8s/ray/config.go index e123c5b8ab..8601264edf 100644 --- 
a/flyteplugins/go/tasks/plugins/k8s/ray/config.go +++ b/flyteplugins/go/tasks/plugins/k8s/ray/config.go @@ -8,6 +8,7 @@ import ( pluginsConfig "github.com/flyteorg/flyte/flyteplugins/go/tasks/config" "github.com/flyteorg/flyte/flyteplugins/go/tasks/logs" pluginmachinery "github.com/flyteorg/flyte/flyteplugins/go/tasks/pluginmachinery/k8s" + "github.com/flyteorg/flyte/flyteplugins/go/tasks/pluginmachinery/tasklog" "github.com/flyteorg/flyte/flytestdlib/config" ) @@ -78,11 +79,12 @@ type Config struct { DeprecatedNodeIPAddress string `json:"nodeIPAddress,omitempty" pflag:"-,DEPRECATED. Please use DefaultConfig.[HeadNode|WorkerNode].IPAddress"` // Remote Ray Cluster Config - RemoteClusterConfig pluginmachinery.ClusterConfig `json:"remoteClusterConfig" pflag:"Configuration of remote K8s cluster for ray jobs"` - Logs logs.LogConfig `json:"logs" pflag:"-,Log configuration for ray jobs"` - LogsSidecar *v1.Container `json:"logsSidecar" pflag:"-,Sidecar to inject into head pods for capturing ray job logs"` - Defaults DefaultConfig `json:"defaults" pflag:"-,Default configuration for ray jobs"` - EnableUsageStats bool `json:"enableUsageStats" pflag:",Enable usage stats for ray jobs. These stats are submitted to usage-stats.ray.io per https://docs.ray.io/en/latest/cluster/usage-stats.html"` + RemoteClusterConfig pluginmachinery.ClusterConfig `json:"remoteClusterConfig" pflag:"Configuration of remote K8s cluster for ray jobs"` + Logs logs.LogConfig `json:"logs" pflag:"-,Log configuration for ray jobs"` + LogsSidecar *v1.Container `json:"logsSidecar" pflag:"-,Sidecar to inject into head pods for capturing ray job logs"` + DashboardURLTemplate *tasklog.TemplateLogPlugin `json:"dashboardURLTemplate" pflag:",Template for URL of Ray dashboard running on a head node."` + Defaults DefaultConfig `json:"defaults" pflag:"-,Default configuration for ray jobs"` + EnableUsageStats bool `json:"enableUsageStats" pflag:",Enable usage stats for ray jobs. These stats are submitted to usage-stats.ray.io per https://docs.ray.io/en/latest/cluster/usage-stats.html"` } type DefaultConfig struct { diff --git a/flyteplugins/go/tasks/plugins/k8s/ray/ray.go b/flyteplugins/go/tasks/plugins/k8s/ray/ray.go index 1d0fde4ca8..0bc4f1183b 100644 --- a/flyteplugins/go/tasks/plugins/k8s/ray/ray.go +++ b/flyteplugins/go/tasks/plugins/k8s/ray/ray.go @@ -13,6 +13,7 @@ import ( "k8s.io/client-go/kubernetes/scheme" "sigs.k8s.io/controller-runtime/pkg/client" + "github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/core" "github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/plugins" flyteerr "github.com/flyteorg/flyte/flyteplugins/go/tasks/errors" "github.com/flyteorg/flyte/flyteplugins/go/tasks/logs" @@ -437,26 +438,35 @@ func getEventInfoForRayJob(logConfig logs.LogConfig, pluginContext k8s.PluginCon return nil, fmt.Errorf("failed to initialize log plugins. 
Error: %w", err) } - if logPlugin == nil { - return nil, nil - } - - // TODO: Retrieve the name of head pod from rayJob.status, and add it to task logs - // RayJob CRD does not include the name of the worker or head pod for now + var taskLogs []*core.TaskLog taskExecID := pluginContext.TaskExecutionMetadata().GetTaskExecutionID() - logOutput, err := logPlugin.GetTaskLogs(tasklog.Input{ + input := tasklog.Input{ Namespace: rayJob.Namespace, TaskExecutionID: taskExecID, - }) + } + // TODO: Retrieve the name of head pod from rayJob.status, and add it to task logs + // RayJob CRD does not include the name of the worker or head pod for now + logOutput, err := logPlugin.GetTaskLogs(input) if err != nil { return nil, fmt.Errorf("failed to generate task logs. Error: %w", err) } + taskLogs = append(taskLogs, logOutput.TaskLogs...) - return &pluginsCore.TaskInfo{ - Logs: logOutput.TaskLogs, - }, nil + // Handling for Ray Dashboard + dashboardURLTemplate := GetConfig().DashboardURLTemplate + if dashboardURLTemplate != nil && + rayJob.Status.DashboardURL != "" && + rayJob.Status.JobStatus == rayv1alpha1.JobStatusRunning { + dashboardURLOutput, err := dashboardURLTemplate.GetTaskLogs(input) + if err != nil { + return nil, fmt.Errorf("failed to generate Ray dashboard link. Error: %w", err) + } + taskLogs = append(taskLogs, dashboardURLOutput.TaskLogs...) + } + + return &pluginsCore.TaskInfo{Logs: taskLogs}, nil } func (plugin rayJobResourceHandler) GetTaskPhase(ctx context.Context, pluginContext k8s.PluginContext, resource client.Object) (pluginsCore.PhaseInfo, error) { @@ -489,8 +499,14 @@ func (plugin rayJobResourceHandler) GetTaskPhase(ctx context.Context, pluginCont return pluginsCore.PhaseInfoFailure(flyteerr.TaskFailedWithError, reason, info), nil case rayv1alpha1.JobStatusSucceeded: return pluginsCore.PhaseInfoSuccess(info), nil - case rayv1alpha1.JobStatusPending, rayv1alpha1.JobStatusRunning: + case rayv1alpha1.JobStatusPending: return pluginsCore.PhaseInfoRunning(pluginsCore.DefaultPhaseVersion, info), nil + case rayv1alpha1.JobStatusRunning: + phaseInfo := pluginsCore.PhaseInfoRunning(pluginsCore.DefaultPhaseVersion, info) + if len(info.Logs) > 0 { + phaseInfo = phaseInfo.WithVersion(pluginsCore.DefaultPhaseVersion + 1) + } + return phaseInfo, nil case rayv1alpha1.JobStatusStopped: // There is no current usage of this job status in KubeRay. 
It's unclear what it represents fallthrough diff --git a/flyteplugins/go/tasks/plugins/k8s/ray/ray_test.go b/flyteplugins/go/tasks/plugins/k8s/ray/ray_test.go index 920fa85d61..ccb518fa03 100644 --- a/flyteplugins/go/tasks/plugins/k8s/ray/ray_test.go +++ b/flyteplugins/go/tasks/plugins/k8s/ray/ray_test.go @@ -24,6 +24,7 @@ import ( pluginIOMocks "github.com/flyteorg/flyte/flyteplugins/go/tasks/pluginmachinery/io/mocks" "github.com/flyteorg/flyte/flyteplugins/go/tasks/pluginmachinery/k8s" mocks2 "github.com/flyteorg/flyte/flyteplugins/go/tasks/pluginmachinery/k8s/mocks" + "github.com/flyteorg/flyte/flyteplugins/go/tasks/pluginmachinery/tasklog" "github.com/flyteorg/flyte/flyteplugins/go/tasks/pluginmachinery/utils" ) @@ -615,6 +616,8 @@ func newPluginContext() k8s.PluginContext { }, }, }) + taskExecID.OnGetUniqueNodeID().Return("unique-node") + taskExecID.OnGetGeneratedName().Return("generated-name") tskCtx := &mocks.TaskExecutionMetadata{} tskCtx.OnGetTaskExecutionID().Return(taskExecID) @@ -642,17 +645,19 @@ func TestGetTaskPhase(t *testing.T) { rayJobPhase rayv1alpha1.JobStatus rayClusterPhase rayv1alpha1.JobDeploymentStatus expectedCorePhase pluginsCore.Phase + expectedError bool }{ - {"", rayv1alpha1.JobDeploymentStatusInitializing, pluginsCore.PhaseInitializing}, - {rayv1alpha1.JobStatusPending, rayv1alpha1.JobDeploymentStatusFailedToGetOrCreateRayCluster, pluginsCore.PhasePermanentFailure}, - {rayv1alpha1.JobStatusPending, rayv1alpha1.JobDeploymentStatusWaitForDashboard, pluginsCore.PhaseRunning}, - {rayv1alpha1.JobStatusPending, rayv1alpha1.JobDeploymentStatusFailedJobDeploy, pluginsCore.PhasePermanentFailure}, - {rayv1alpha1.JobStatusPending, rayv1alpha1.JobDeploymentStatusRunning, pluginsCore.PhaseRunning}, - {rayv1alpha1.JobStatusPending, rayv1alpha1.JobDeploymentStatusFailedToGetJobStatus, pluginsCore.PhaseUndefined}, - {rayv1alpha1.JobStatusRunning, rayv1alpha1.JobDeploymentStatusRunning, pluginsCore.PhaseRunning}, - {rayv1alpha1.JobStatusFailed, rayv1alpha1.JobDeploymentStatusRunning, pluginsCore.PhasePermanentFailure}, - {rayv1alpha1.JobStatusSucceeded, rayv1alpha1.JobDeploymentStatusRunning, pluginsCore.PhaseSuccess}, - {rayv1alpha1.JobStatusSucceeded, rayv1alpha1.JobDeploymentStatusComplete, pluginsCore.PhaseSuccess}, + {"", rayv1alpha1.JobDeploymentStatusInitializing, pluginsCore.PhaseInitializing, false}, + {rayv1alpha1.JobStatusPending, rayv1alpha1.JobDeploymentStatusFailedToGetOrCreateRayCluster, pluginsCore.PhasePermanentFailure, false}, + {rayv1alpha1.JobStatusPending, rayv1alpha1.JobDeploymentStatusWaitForDashboard, pluginsCore.PhaseRunning, false}, + {rayv1alpha1.JobStatusPending, rayv1alpha1.JobDeploymentStatusFailedJobDeploy, pluginsCore.PhasePermanentFailure, false}, + {rayv1alpha1.JobStatusPending, rayv1alpha1.JobDeploymentStatusRunning, pluginsCore.PhaseRunning, false}, + {rayv1alpha1.JobStatusPending, rayv1alpha1.JobDeploymentStatusFailedToGetJobStatus, pluginsCore.PhaseRunning, false}, + {rayv1alpha1.JobStatusRunning, rayv1alpha1.JobDeploymentStatusRunning, pluginsCore.PhaseRunning, false}, + {rayv1alpha1.JobStatusFailed, rayv1alpha1.JobDeploymentStatusRunning, pluginsCore.PhasePermanentFailure, false}, + {rayv1alpha1.JobStatusSucceeded, rayv1alpha1.JobDeploymentStatusRunning, pluginsCore.PhaseSuccess, false}, + {rayv1alpha1.JobStatusSucceeded, rayv1alpha1.JobDeploymentStatusComplete, pluginsCore.PhaseSuccess, false}, + {rayv1alpha1.JobStatusStopped, rayv1alpha1.JobDeploymentStatusComplete, pluginsCore.PhaseUndefined, true}, } for _, tc := range testCases { 
@@ -663,12 +668,69 @@ func TestGetTaskPhase(t *testing.T) { startTime := metav1.NewTime(time.Now()) rayObject.Status.StartTime = &startTime phaseInfo, err := rayJobResourceHandler.GetTaskPhase(ctx, pluginCtx, rayObject) - assert.Nil(t, err) + if tc.expectedError { + assert.Error(t, err) + } else { + assert.Nil(t, err) + } assert.Equal(t, tc.expectedCorePhase.String(), phaseInfo.Phase().String()) }) } } +func TestGetEventInfo_DashboardURL(t *testing.T) { + pluginCtx := newPluginContext() + testCases := []struct { + name string + rayJob rayv1alpha1.RayJob + dashboardURLTemplate tasklog.TemplateLogPlugin + expectedTaskLogs []*core.TaskLog + }{ + { + name: "dashboard URL displayed", + rayJob: rayv1alpha1.RayJob{ + Status: rayv1alpha1.RayJobStatus{ + DashboardURL: "exists", + JobStatus: rayv1alpha1.JobStatusRunning, + }, + }, + dashboardURLTemplate: tasklog.TemplateLogPlugin{ + DisplayName: "Ray Dashboard", + TemplateURIs: []tasklog.TemplateURI{"http://test/{{.generatedName}}"}, + Scheme: tasklog.TemplateSchemeTaskExecution, + }, + expectedTaskLogs: []*core.TaskLog{ + { + Name: "Ray Dashboard", + Uri: "http://test/generated-name", + }, + }, + }, + { + name: "dashboard URL is not displayed", + rayJob: rayv1alpha1.RayJob{ + Status: rayv1alpha1.RayJobStatus{ + JobStatus: rayv1alpha1.JobStatusPending, + }, + }, + dashboardURLTemplate: tasklog.TemplateLogPlugin{ + DisplayName: "dummy", + TemplateURIs: []tasklog.TemplateURI{"http://dummy"}, + }, + expectedTaskLogs: nil, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + assert.NoError(t, SetConfig(&Config{DashboardURLTemplate: &tc.dashboardURLTemplate})) + ti, err := getEventInfoForRayJob(logs.LogConfig{}, pluginCtx, &tc.rayJob) + assert.NoError(t, err) + assert.Equal(t, tc.expectedTaskLogs, ti.Logs) + }) + } +} + func TestGetPropertiesRay(t *testing.T) { rayJobResourceHandler := rayJobResourceHandler{} expected := k8s.PluginProperties{} From c4b040b8b33f1558e9169c751cbff3916e1f1b7e Mon Sep 17 00:00:00 2001 From: Eduardo Apolinario <653394+eapolinario@users.noreply.github.com> Date: Fri, 10 Nov 2023 13:12:04 -0800 Subject: [PATCH 8/8] Disable path filtering for monorepo components (#4404) Signed-off-by: Eduardo Apolinario Co-authored-by: Eduardo Apolinario --- .github/workflows/checks.yml | 15 --------------- .github/workflows/flyteidl-checks.yml | 4 ---- 2 files changed, 19 deletions(-) diff --git a/.github/workflows/checks.yml b/.github/workflows/checks.yml index 8c235d3d71..41ce00d103 100644 --- a/.github/workflows/checks.yml +++ b/.github/workflows/checks.yml @@ -6,24 +6,9 @@ concurrency: on: pull_request: - paths: - - 'datacatalog/**' - - 'flyteadmin/**' - - 'flytecopilot/**' - - 'flyteplugins/**' - - 'flytepropeller/**' - - 'flytestdlib/**' push: branches: - master - paths: - - 'datacatalog/**' - - 'flyteadmin/**' - - 'flytecopilot/**' - - 'flyteidl/**' - - 'flyteplugins/**' - - 'flytepropeller/**' - - 'flytestdlib/**' env: GO_VERSION: "1.19" PRIORITIES: "P0" diff --git a/.github/workflows/flyteidl-checks.yml b/.github/workflows/flyteidl-checks.yml index cbf84b97f1..5cdf9733b2 100644 --- a/.github/workflows/flyteidl-checks.yml +++ b/.github/workflows/flyteidl-checks.yml @@ -6,13 +6,9 @@ concurrency: on: pull_request: - paths: - - 'flyteidl/**' push: branches: - master - paths: - - 'flyteidl/**' env: GO_VERSION: "1.19" jobs:
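Closing illustration for the task log refactor in patches 6 and 7: TemplateLogPlugin is now a plain struct with exported fields, so it can be constructed directly, both in code and via the task-logs "templates" and Ray "dashboardURLTemplate" config sections, and queried for log links without the removed NewTemplateLogPlugin constructor. The sketch below is a minimal, hypothetical usage mirroring the pod-scheme test cases above; the dashboard URL, namespace, pod name, and log name are placeholders, while the import paths and field names are the ones used in the diffs.

package main

import (
	"fmt"

	"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/core"
	"github.com/flyteorg/flyte/flyteplugins/go/tasks/pluginmachinery/tasklog"
)

func main() {
	// A pod-scheme template plugin, equivalent to one entry under the
	// task-logs "templates" config after the refactor.
	plugin := tasklog.TemplateLogPlugin{
		DisplayName:   "Kubernetes Logs",
		Scheme:        tasklog.TemplateSchemePod,
		TemplateURIs:  []tasklog.TemplateURI{"https://dashboard.example.com/#!/log/{{ .namespace }}/{{ .podName }}/pod"},
		MessageFormat: core.TaskLog_JSON,
	}

	// Render log links for a pod; only pod-scoped template variables are
	// needed for the Pod scheme, as in the regression tests above.
	out, err := plugin.GetTaskLogs(tasklog.Input{
		Namespace: "flytesnacks-development",
		PodName:   "example-pod",
		LogName:   " (user)",
	})
	if err != nil {
		panic(err)
	}
	for _, tl := range out.TaskLogs {
		// Name is DisplayName + LogName; Uri has the template vars substituted.
		fmt.Printf("%s: %s\n", tl.Name, tl.Uri)
	}
}

The Ray plugin in patch 7 follows the same pattern: its dashboardURLTemplate config is just a TemplateLogPlugin rendered with the task-execution scheme, appended to the task logs only while the job reports a running dashboard.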