From 31748fea0dbfa1a106e5bc1be5cff4406246ba4c Mon Sep 17 00:00:00 2001 From: Laura Lin <13591898+lauralindy@users.noreply.github.com> Date: Fri, 10 Nov 2023 20:32:42 -0800 Subject: [PATCH 1/4] adding consoleUrl parameterization based on partition (#4375) * adding consoleUrl parameterization based on partition Signed-off-by: Laura Lin <13591898+lauralindy@users.noreply.github.com> * Update plugin.go Signed-off-by: Laura Lin <13591898+lauralindy@users.noreply.github.com> * Update plugin_test.go Signed-off-by: Laura Lin <13591898+lauralindy@users.noreply.github.com> * lint Signed-off-by: Laura Lin <13591898+lauralindy@users.noreply.github.com> --------- Signed-off-by: Laura Lin <13591898+lauralindy@users.noreply.github.com> Co-authored-by: Kevin Su --- .../go/tasks/plugins/webapi/athena/plugin.go | 10 +++++++++- .../go/tasks/plugins/webapi/athena/plugin_test.go | 15 +++++++++++++++ 2 files changed, 24 insertions(+), 1 deletion(-) diff --git a/flyteplugins/go/tasks/plugins/webapi/athena/plugin.go b/flyteplugins/go/tasks/plugins/webapi/athena/plugin.go index a1f16163fe..826a12e45f 100644 --- a/flyteplugins/go/tasks/plugins/webapi/athena/plugin.go +++ b/flyteplugins/go/tasks/plugins/webapi/athena/plugin.go @@ -3,6 +3,7 @@ package athena import ( "context" "fmt" + "strings" "time" awsSdk "github.com/aws/aws-sdk-go-v2/aws" @@ -177,12 +178,19 @@ func (p Plugin) Status(ctx context.Context, tCtx webapi.StatusContext) (phase co func createTaskInfo(queryID string, cfg awsSdk.Config) *core.TaskInfo { timeNow := time.Now() + var consoleURL string + if strings.Contains(cfg.Region, "gov") { + consoleURL = "console.amazonaws-us-gov.com" + } else { + consoleURL = "console.aws.amazon.com" + } return &core.TaskInfo{ OccurredAt: &timeNow, Logs: []*idlCore.TaskLog{ { - Uri: fmt.Sprintf("https://%v.console.aws.amazon.com/athena/home?force®ion=%v#query/history/%v", + Uri: fmt.Sprintf("https://%v.%v/athena/home?force®ion=%v#query/history/%v", cfg.Region, + consoleURL, cfg.Region, queryID), Name: "Athena Query Console", diff --git a/flyteplugins/go/tasks/plugins/webapi/athena/plugin_test.go b/flyteplugins/go/tasks/plugins/webapi/athena/plugin_test.go index 6e3238d813..e19829447e 100644 --- a/flyteplugins/go/tasks/plugins/webapi/athena/plugin_test.go +++ b/flyteplugins/go/tasks/plugins/webapi/athena/plugin_test.go @@ -22,3 +22,18 @@ func TestCreateTaskInfo(t *testing.T) { assert.Len(t, taskInfo.ExternalResources, 1) assert.Equal(t, taskInfo.ExternalResources[0].ExternalID, "query_id") } + + +func TestCreateTaskInfoGovAWS(t *testing.T) { + taskInfo := createTaskInfo("query_id", awsSdk.Config{ + Region: "us-gov-east-1", + }) + assert.EqualValues(t, []*idlCore.TaskLog{ + { + Uri: "https://us-gov-east-1.console.amazonaws-us-gov.com/athena/home?force®ion=us-gov-east-1#query/history/query_id", + Name: "Athena Query Console", + }, + }, taskInfo.Logs) + assert.Len(t, taskInfo.ExternalResources, 1) + assert.Equal(t, taskInfo.ExternalResources[0].ExternalID, "query_id") +} From 0fc2ab0b76a5c5b916d30c06028ddf8d7ac3e773 Mon Sep 17 00:00:00 2001 From: Future-Outlier Date: Sat, 11 Nov 2023 15:29:36 +0800 Subject: [PATCH 2/4] [Docs] Sensor Agent Doc (#4195) * sensor agent doc Signed-off-by: Future Outlier * sensor agent index rst Signed-off-by: Future Outlier --------- Signed-off-by: Future Outlier Co-authored-by: Future Outlier Co-authored-by: Kevin Su --- rsts/deployment/agents/bigquery.rst | 51 ++++++---- rsts/deployment/agents/index.rst | 10 ++ rsts/deployment/agents/sensor.rst | 147 ++++++++++++++++++++++++++++ 3 files changed, 189 insertions(+), 19 deletions(-) create mode 100644 rsts/deployment/agents/sensor.rst diff --git a/rsts/deployment/agents/bigquery.rst b/rsts/deployment/agents/bigquery.rst index cf01e0b7bd..ac6ec896bb 100644 --- a/rsts/deployment/agents/bigquery.rst +++ b/rsts/deployment/agents/bigquery.rst @@ -20,30 +20,39 @@ Specify agent configuration .. tabs:: - .. group-tab:: Flyte binary + .. group-tab:: Flyte binary - Edit the relevant YAML file to specify the agent. + Edit the relevant YAML file to specify the agent. - .. code-block:: yaml - :emphasize-lines: 7,11 + .. code-block:: bash - tasks: - task-plugins: - enabled-plugins: - - container - - sidecar - - k8s-array - - bigquery - default-for-task-types: - - container: container - - container_array: k8s-array - - bigquery_query_job_task: agent-service + kubectl edit configmap flyte-sandbox-config -n flyte - .. group-tab:: Flyte core + .. code-block:: yaml + :emphasize-lines: 7,11,16 + + tasks: + task-plugins: + enabled-plugins: + - container + - sidecar + - k8s-array + - agent-service + default-for-task-types: + - container: container + - container_array: k8s-array + - bigquery_query_job_task: agent-service + + plugins: + agent-service: + supportedTaskTypes: + - bigquery_query_job_task + + .. group-tab:: Flyte core - Create a file named ``values-override.yaml`` and add the following configuration to it. + Create a file named ``values-override.yaml`` and add the following configuration to it. - .. code-block:: yaml + .. code-block:: yaml configmap: enabled_plugins: @@ -56,12 +65,16 @@ Specify agent configuration - container - sidecar - k8s-array - - bigquery + - agent-service default-for-task-types: container: container sidecar: sidecar container_array: k8s-array bigquery_query_job_task: agent-service + plugins: + agent-service: + supportedTaskTypes: + - bigquery_query_job_task Ensure that the propeller has the correct service account for BigQuery. diff --git a/rsts/deployment/agents/index.rst b/rsts/deployment/agents/index.rst index b3a196e1b6..af0bddaefd 100644 --- a/rsts/deployment/agents/index.rst +++ b/rsts/deployment/agents/index.rst @@ -27,6 +27,15 @@ Discover the process of setting up Agents for Flyte. ^^^^^^^^^^^^ Guide to setting up the MMCloud agent. + --- + + .. link-button:: deployment-agent-setup-sensor + :type: ref + :text: Sensor Agent + :classes: btn-block stretched-link + ^^^^^^^^^^^^ + Guide to setting up the Sensor agent. + .. toctree:: :maxdepth: 1 @@ -35,3 +44,4 @@ Discover the process of setting up Agents for Flyte. bigquery mmcloud + sensor diff --git a/rsts/deployment/agents/sensor.rst b/rsts/deployment/agents/sensor.rst new file mode 100644 index 0000000000..ecb45e426f --- /dev/null +++ b/rsts/deployment/agents/sensor.rst @@ -0,0 +1,147 @@ +.. _deployment-agent-setup-sensor: + +Sensor Agent +================= + +Sensor enables users to continuously check for a file or a condition to be met periodically. + +When the condition is met, the sensor will complete. + +This guide provides an overview of how to set up Sensor in your Flyte deployment. + +Spin up a cluster +----------------- + +.. tabs:: + + .. group-tab:: Flyte binary + + You can spin up a demo cluster using the following command: + + .. code-block:: bash + + flytectl demo start + + Or install Flyte using the :ref:`flyte-binary helm chart `. + + .. group-tab:: Flyte core + + If you've installed Flyte using the + `flyte-core helm chart `__, please ensure: + + * You have the correct kubeconfig and have selected the correct Kubernetes context. + * Confirm that you have the correct Flytectl configuration at ``~/.flyte/config.yaml``. + +.. note:: + + Add the Flyte chart repo to Helm if you're installing via the Helm charts. + + .. code-block:: bash + + helm repo add flyteorg https://flyteorg.github.io/flyte + +Specify agent configuration +---------------------------- + +Enable the Sensor agent by adding the following config to the relevant YAML file(s): + +.. tabs:: + + .. group-tab:: Flyte binary + + Edit the relevant YAML file to specify the agent. + + .. code-block:: bash + + kubectl edit configmap flyte-sandbox-config -n flyte + + .. code-block:: yaml + :emphasize-lines: 7,11,16 + + tasks: + task-plugins: + enabled-plugins: + - container + - sidecar + - k8s-array + - agent-service + default-for-task-types: + - container: container + - container_array: k8s-array + - sensor: agent-service + + plugins: + agent-service: + supportedTaskTypes: + - sensor + + .. group-tab:: Flyte core + + Create a file named ``values-override.yaml`` and add the following configuration to it. + + .. code-block:: yaml + + configmap: + enabled_plugins: + # -- Tasks specific configuration [structure](https://pkg.go.dev/github.com/flyteorg/flytepropeller/pkg/controller/nodes/task/config#GetConfig) + tasks: + # -- Plugins configuration, [structure](https://pkg.go.dev/github.com/flyteorg/flytepropeller/pkg/controller/nodes/task/config#TaskPluginConfig) + task-plugins: + # -- [Enabled Plugins](https://pkg.go.dev/github.com/flyteorg/flyteplugins/go/tasks/config#Config). Enable sagemaker*, athena if you install the backend + enabled-plugins: + - container + - sidecar + - k8s-array + - agent-service + default-for-task-types: + container: container + sidecar: sidecar + container_array: k8s-array + sensor: agent-service + plugins: + agent-service: + supportedTaskTypes: + - sensor + + +Upgrade the deployment +---------------------- + +.. tabs:: + + .. group-tab:: Flyte binary + + .. tabs:: + + .. group-tab:: Demo cluster + + .. code-block:: bash + + kubectl rollout restart deployment flyte-sandbox -n flyte + + .. group-tab:: Helm chart + + .. code-block:: bash + + helm upgrade flyteorg/flyte-binary -n --values + + Replace ```` with the name of your release (e.g., ``flyte-backend``), + ```` with the name of your namespace (e.g., ``flyte``), + and ```` with the name of your YAML file. + + .. group-tab:: Flyte core + + .. code-block:: + + helm upgrade flyte/flyte-core -n --values values-override.yaml + + Replace ```` with the name of your release (e.g., ``flyte``) + and ```` with the name of your namespace (e.g., ``flyte``). + +Wait for the upgrade to complete. + +You can check the status of the deployment pods by running the following command: + +.. code-block:: + + kubectl get pods -n flyte From 033276d42d00e508d9e66299683a5ea18efaec2f Mon Sep 17 00:00:00 2001 From: Future-Outlier Date: Sat, 11 Nov 2023 16:24:26 +0800 Subject: [PATCH 3/4] [flytepropeller] Add Tests in v1alpha.go including `array_test.go`, `branch_test.go`, `error_test.go`, and `iface_test.go` with 0.13% Coverage Improvment (#4234) * add flyteworkflow 4 test file Signed-off-by: Future Outlier * import error Signed-off-by: Future Outlier * golangci-lint run --fix Signed-off-by: Future Outlier --------- Signed-off-by: Future Outlier Co-authored-by: Future Outlier Co-authored-by: Kevin Su --- .../apis/flyteworkflow/v1alpha1/array_test.go | 49 +++++ .../flyteworkflow/v1alpha1/branch_test.go | 78 +++++++- .../apis/flyteworkflow/v1alpha1/error_test.go | 30 +++ .../apis/flyteworkflow/v1alpha1/iface_test.go | 186 ++++++++++++++++++ 4 files changed, 340 insertions(+), 3 deletions(-) create mode 100644 flytepropeller/pkg/apis/flyteworkflow/v1alpha1/array_test.go create mode 100644 flytepropeller/pkg/apis/flyteworkflow/v1alpha1/error_test.go create mode 100644 flytepropeller/pkg/apis/flyteworkflow/v1alpha1/iface_test.go diff --git a/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/array_test.go b/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/array_test.go new file mode 100644 index 0000000000..74ea26a428 --- /dev/null +++ b/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/array_test.go @@ -0,0 +1,49 @@ +package v1alpha1 + +import ( + "testing" +) + +func TestArrayNodeSpec_GetSubNodeSpec(t *testing.T) { + nodeSpec := &NodeSpec{} + arrayNodeSpec := ArrayNodeSpec{ + SubNodeSpec: nodeSpec, + } + + if arrayNodeSpec.GetSubNodeSpec() != nodeSpec { + t.Errorf("Expected nodeSpec, but got a different value") + } +} + +func TestArrayNodeSpec_GetParallelism(t *testing.T) { + parallelism := uint32(5) + arrayNodeSpec := ArrayNodeSpec{ + Parallelism: parallelism, + } + + if arrayNodeSpec.GetParallelism() != parallelism { + t.Errorf("Expected %d, but got %d", parallelism, arrayNodeSpec.GetParallelism()) + } +} + +func TestArrayNodeSpec_GetMinSuccesses(t *testing.T) { + minSuccesses := uint32(3) + arrayNodeSpec := ArrayNodeSpec{ + MinSuccesses: &minSuccesses, + } + + if *arrayNodeSpec.GetMinSuccesses() != minSuccesses { + t.Errorf("Expected %d, but got %d", minSuccesses, *arrayNodeSpec.GetMinSuccesses()) + } +} + +func TestArrayNodeSpec_GetMinSuccessRatio(t *testing.T) { + minSuccessRatio := float32(0.8) + arrayNodeSpec := ArrayNodeSpec{ + MinSuccessRatio: &minSuccessRatio, + } + + if *arrayNodeSpec.GetMinSuccessRatio() != minSuccessRatio { + t.Errorf("Expected %f, but got %f", minSuccessRatio, *arrayNodeSpec.GetMinSuccessRatio()) + } +} diff --git a/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/branch_test.go b/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/branch_test.go index d5adf91d77..5fe19f6758 100644 --- a/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/branch_test.go +++ b/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/branch_test.go @@ -1,20 +1,21 @@ -package v1alpha1_test +package v1alpha1 import ( + "bytes" "encoding/json" "io/ioutil" "testing" + "github.com/golang/protobuf/jsonpb" "github.com/stretchr/testify/assert" "github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/core" - "github.com/flyteorg/flyte/flytepropeller/pkg/apis/flyteworkflow/v1alpha1" ) func TestMarshalUnMarshal_BranchTask(t *testing.T) { r, err := ioutil.ReadFile("testdata/branch.json") assert.NoError(t, err) - o := v1alpha1.NodeSpec{} + o := NodeSpec{} err = json.Unmarshal(r, &o) assert.NoError(t, err) assert.NotNil(t, o.BranchNode.If) @@ -25,3 +26,74 @@ func TestMarshalUnMarshal_BranchTask(t *testing.T) { assert.NotEmpty(t, raw) } } + +// TestBranchNodeSpecMethods tests the methods of the BranchNodeSpec struct. +func TestErrorMarshalAndUnmarshalJSON(t *testing.T) { + coreError := &core.Error{ + FailedNodeId: "TestNode", + Message: "Test error message", + } + + err := Error{Error: coreError} + data, jErr := err.MarshalJSON() + assert.Nil(t, jErr) + + // Unmarshalling the JSON back to a new core.Error struct + var newCoreError core.Error + uErr := jsonpb.Unmarshal(bytes.NewReader(data), &newCoreError) + assert.Nil(t, uErr) + assert.Equal(t, coreError.Message, newCoreError.Message) + assert.Equal(t, coreError.FailedNodeId, newCoreError.FailedNodeId) +} + +func TestBranchNodeSpecMethods(t *testing.T) { + // Creating a core.BooleanExpression instance for testing + boolExpr := &core.BooleanExpression{} + + // Creating an Error instance for testing + errorMessage := &core.Error{ + Message: "Test error", + } + + ifNode := NodeID("ifNode") + elifNode := NodeID("elifNode") + elseNode := NodeID("elseNode") + + // Creating a BranchNodeSpec instance for testing + branchNodeSpec := BranchNodeSpec{ + If: IfBlock{ + Condition: BooleanExpression{ + BooleanExpression: boolExpr, + }, + ThenNode: &ifNode, + }, + ElseIf: []*IfBlock{ + { + Condition: BooleanExpression{ + BooleanExpression: boolExpr, + }, + ThenNode: &elifNode, + }, + }, + Else: &elseNode, + ElseFail: &Error{Error: errorMessage}, + } + + assert.Equal(t, boolExpr, branchNodeSpec.If.GetCondition()) + + assert.Equal(t, &ifNode, branchNodeSpec.If.GetThenNode()) + + assert.Equal(t, &branchNodeSpec.If, branchNodeSpec.GetIf()) + + assert.Equal(t, &elseNode, branchNodeSpec.GetElse()) + + elifs := branchNodeSpec.GetElseIf() + assert.Equal(t, 1, len(elifs)) + assert.Equal(t, boolExpr, elifs[0].GetCondition()) + assert.Equal(t, &elifNode, elifs[0].GetThenNode()) + + assert.Equal(t, errorMessage, branchNodeSpec.GetElseFail()) + + branchNodeSpec.ElseFail = nil + assert.Nil(t, branchNodeSpec.GetElseFail()) +} diff --git a/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/error_test.go b/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/error_test.go new file mode 100644 index 0000000000..4e0968205d --- /dev/null +++ b/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/error_test.go @@ -0,0 +1,30 @@ +package v1alpha1 + +import ( + "encoding/json" + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/core" +) + +func TestExecutionErrorJSONMarshalling(t *testing.T) { + execError := &core.ExecutionError{ + Code: "TestCode", + Message: "Test error message", + ErrorUri: "Test error uri", + } + + execErr := &ExecutionError{ExecutionError: execError} + data, jErr := json.Marshal(execErr) + assert.Nil(t, jErr) + + newExecErr := &ExecutionError{} + uErr := json.Unmarshal(data, newExecErr) + assert.Nil(t, uErr) + + assert.Equal(t, execError.Code, newExecErr.ExecutionError.Code) + assert.Equal(t, execError.Message, newExecErr.ExecutionError.Message) + assert.Equal(t, execError.ErrorUri, newExecErr.ExecutionError.ErrorUri) +} diff --git a/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/iface_test.go b/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/iface_test.go new file mode 100644 index 0000000000..eb2eafa723 --- /dev/null +++ b/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/iface_test.go @@ -0,0 +1,186 @@ +package v1alpha1 + +import ( + "encoding/json" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/core" +) + +func TestNodeKindString(t *testing.T) { + tests := []struct { + kind NodeKind + expected string + }{ + {NodeKindTask, "task"}, + {NodeKindBranch, "branch"}, + {NodeKindWorkflow, "workflow"}, + {NodeKindGate, "gate"}, + {NodeKindArray, "array"}, + {NodeKindStart, "start"}, + {NodeKindEnd, "end"}, + } + + for _, test := range tests { + t.Run(test.expected, func(t *testing.T) { + assert.Equal(t, test.expected, test.kind.String()) + }) + } +} + +func TestNodePhaseString(t *testing.T) { + tests := []struct { + phase NodePhase + expected string + }{ + {NodePhaseNotYetStarted, "NotYetStarted"}, + {NodePhaseQueued, "Queued"}, + {NodePhaseRunning, "Running"}, + {NodePhaseTimingOut, "NodePhaseTimingOut"}, + {NodePhaseTimedOut, "NodePhaseTimedOut"}, + {NodePhaseSucceeding, "Succeeding"}, + {NodePhaseSucceeded, "Succeeded"}, + {NodePhaseFailing, "Failing"}, + {NodePhaseFailed, "Failed"}, + {NodePhaseSkipped, "Skipped"}, + {NodePhaseRetryableFailure, "RetryableFailure"}, + {NodePhaseDynamicRunning, "DynamicRunning"}, + {NodePhaseRecovered, "NodePhaseRecovered"}, + } + + for _, test := range tests { + t.Run(test.expected, func(t *testing.T) { + assert.Equal(t, test.expected, test.phase.String()) + }) + } +} + +func TestWorkflowPhaseString(t *testing.T) { + tests := []struct { + phase WorkflowPhase + expected string + }{ + {WorkflowPhaseReady, "Ready"}, + {WorkflowPhaseRunning, "Running"}, + {WorkflowPhaseSuccess, "Succeeded"}, + {WorkflowPhaseFailed, "Failed"}, + {WorkflowPhaseFailing, "Failing"}, + {WorkflowPhaseSucceeding, "Succeeding"}, + {WorkflowPhaseAborted, "Aborted"}, + {WorkflowPhaseHandlingFailureNode, "HandlingFailureNode"}, + {-1, "Unknown"}, + // Add more cases as needed + } + + for _, test := range tests { + t.Run(test.expected, func(t *testing.T) { + assert.Equal(t, test.expected, test.phase.String()) + }) + } +} + +func TestBranchNodePhaseString(t *testing.T) { + tests := []struct { + phase BranchNodePhase + expected string + }{ + {BranchNodeNotYetEvaluated, "NotYetEvaluated"}, + {BranchNodeSuccess, "BranchEvalSuccess"}, + {BranchNodeError, "BranchEvalFailed"}, + {-1, "Undefined"}, + } + + for _, test := range tests { + t.Run(test.expected, func(t *testing.T) { + assert.Equal(t, test.expected, test.phase.String()) + }) + } +} + +func TestWorkflowOnFailurePolicyStringError(t *testing.T) { + _, err := WorkflowOnFailurePolicyString("NON_EXISTENT_POLICY") + assert.Error(t, err) +} + +func TestWorkflowOnFailurePolicyJSONMarshalling(t *testing.T) { + tests := []struct { + policy WorkflowOnFailurePolicy + jsonStr string + }{ + {WorkflowOnFailurePolicy(core.WorkflowMetadata_FAIL_IMMEDIATELY), `"FAIL_IMMEDIATELY"`}, + {WorkflowOnFailurePolicy(core.WorkflowMetadata_FAIL_AFTER_EXECUTABLE_NODES_COMPLETE), `"FAIL_AFTER_EXECUTABLE_NODES_COMPLETE"`}, + } + + for _, test := range tests { + t.Run(test.jsonStr, func(t *testing.T) { + // Testing marshalling + data, err := json.Marshal(test.policy) + require.NoError(t, err) + assert.Equal(t, test.jsonStr, string(data)) + + // Testing unmarshalling + var unmarshalledPolicy WorkflowOnFailurePolicy + err = json.Unmarshal(data, &unmarshalledPolicy) + require.NoError(t, err) + assert.Equal(t, test.policy, unmarshalledPolicy) + }) + } + + invalidTest := `123` + var policy WorkflowOnFailurePolicy + err := json.Unmarshal([]byte(invalidTest), &policy) + assert.Error(t, err) + assert.Contains(t, err.Error(), "WorkflowOnFailurePolicy should be a string, got 123") + +} + +func TestGetOutputsFile(t *testing.T) { + tests := []struct { + outputDir DataReference + expected DataReference + }{ + {"dir1", "dir1/outputs.pb"}, + {"dir2", "dir2/outputs.pb"}, + } + + for _, tt := range tests { + t.Run(string(tt.outputDir), func(t *testing.T) { // Convert DataReference to string here + assert.Equal(t, tt.expected, GetOutputsFile(tt.outputDir)) + }) + } +} + +func TestGetInputsFile(t *testing.T) { + tests := []struct { + inputDir DataReference + expected DataReference + }{ + {"dir1", "dir1/inputs.pb"}, + {"dir2", "dir2/inputs.pb"}, + } + + for _, tt := range tests { + t.Run(string(tt.inputDir), func(t *testing.T) { + assert.Equal(t, tt.expected, GetInputsFile(tt.inputDir)) + }) + } +} + +func TestGetDeckFile(t *testing.T) { + tests := []struct { + inputDir DataReference + expected DataReference + }{ + {"dir1", "dir1/deck.html"}, + {"dir2", "dir2/deck.html"}, + } + + for _, tt := range tests { + t.Run(string(tt.inputDir), func(t *testing.T) { + assert.Equal(t, tt.expected, GetDeckFile(tt.inputDir)) + }) + } +} From 72e743882d79b50208da0c4488063ab4518ce266 Mon Sep 17 00:00:00 2001 From: Jeev B Date: Mon, 13 Nov 2023 13:34:32 -0800 Subject: [PATCH 4/4] Add more context for ray log template links (#4416) Signed-off-by: Jeev B --- flyteplugins/go/tasks/plugins/k8s/ray/ray.go | 32 ++++- .../go/tasks/plugins/k8s/ray/ray_test.go | 114 +++++++++++++++++- 2 files changed, 141 insertions(+), 5 deletions(-) diff --git a/flyteplugins/go/tasks/plugins/k8s/ray/ray.go b/flyteplugins/go/tasks/plugins/k8s/ray/ray.go index 0bc4f1183b..50a9c76094 100644 --- a/flyteplugins/go/tasks/plugins/k8s/ray/ray.go +++ b/flyteplugins/go/tasks/plugins/k8s/ray/ray.go @@ -3,6 +3,7 @@ package ray import ( "context" "fmt" + "regexp" "strconv" "strings" "time" @@ -37,6 +38,14 @@ const ( DisableUsageStatsStartParameter = "disable-usage-stats" ) +var logTemplateRegexes = struct { + RayClusterName *regexp.Regexp + RayJobID *regexp.Regexp +}{ + tasklog.MustCreateRegex("rayClusterName"), + tasklog.MustCreateRegex("rayJobID"), +} + type rayJobResourceHandler struct { } @@ -442,8 +451,27 @@ func getEventInfoForRayJob(logConfig logs.LogConfig, pluginContext k8s.PluginCon taskExecID := pluginContext.TaskExecutionMetadata().GetTaskExecutionID() input := tasklog.Input{ - Namespace: rayJob.Namespace, - TaskExecutionID: taskExecID, + Namespace: rayJob.Namespace, + TaskExecutionID: taskExecID, + ExtraTemplateVarsByScheme: &tasklog.TemplateVarsByScheme{}, + } + if rayJob.Status.JobId != "" { + input.ExtraTemplateVarsByScheme.Common = append( + input.ExtraTemplateVarsByScheme.Common, + tasklog.TemplateVar{ + Regex: logTemplateRegexes.RayJobID, + Value: rayJob.Status.JobId, + }, + ) + } + if rayJob.Status.RayClusterName != "" { + input.ExtraTemplateVarsByScheme.Common = append( + input.ExtraTemplateVarsByScheme.Common, + tasklog.TemplateVar{ + Regex: logTemplateRegexes.RayClusterName, + Value: rayJob.Status.RayClusterName, + }, + ) } // TODO: Retrieve the name of head pod from rayJob.status, and add it to task logs diff --git a/flyteplugins/go/tasks/plugins/k8s/ray/ray_test.go b/flyteplugins/go/tasks/plugins/k8s/ray/ray_test.go index ccb518fa03..64700957c9 100644 --- a/flyteplugins/go/tasks/plugins/k8s/ray/ray_test.go +++ b/flyteplugins/go/tasks/plugins/k8s/ray/ray_test.go @@ -608,13 +608,21 @@ func newPluginContext() k8s.PluginContext { taskExecID := &mocks.TaskExecutionID{} taskExecID.OnGetID().Return(core.TaskExecutionIdentifier{ + TaskId: &core.Identifier{ + ResourceType: core.ResourceType_TASK, + Name: "my-task-name", + Project: "my-task-project", + Domain: "my-task-domain", + Version: "1", + }, NodeExecutionId: &core.NodeExecutionIdentifier{ ExecutionId: &core.WorkflowExecutionIdentifier{ - Name: "my_name", - Project: "my_project", - Domain: "my_domain", + Name: "my-execution-name", + Project: "my-execution-project", + Domain: "my-execution-domain", }, }, + RetryAttempt: 1, }) taskExecID.OnGetUniqueNodeID().Return("unique-node") taskExecID.OnGetGeneratedName().Return("generated-name") @@ -678,6 +686,106 @@ func TestGetTaskPhase(t *testing.T) { } } +func TestGetEventInfo_LogTemplates(t *testing.T) { + pluginCtx := newPluginContext() + testCases := []struct { + name string + rayJob rayv1alpha1.RayJob + logPlugin tasklog.TemplateLogPlugin + expectedTaskLogs []*core.TaskLog + }{ + { + name: "namespace", + rayJob: rayv1alpha1.RayJob{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "test-namespace", + }, + }, + logPlugin: tasklog.TemplateLogPlugin{ + DisplayName: "namespace", + TemplateURIs: []tasklog.TemplateURI{"http://test/{{ .namespace }}"}, + }, + expectedTaskLogs: []*core.TaskLog{ + { + Name: "namespace", + Uri: "http://test/test-namespace", + }, + }, + }, + { + name: "task execution ID", + rayJob: rayv1alpha1.RayJob{}, + logPlugin: tasklog.TemplateLogPlugin{ + DisplayName: "taskExecID", + TemplateURIs: []tasklog.TemplateURI{ + "http://test/projects/{{ .executionProject }}/domains/{{ .executionDomain }}/executions/{{ .executionName }}/nodeId/{{ .nodeID }}/taskId/{{ .taskID }}/attempt/{{ .taskRetryAttempt }}", + }, + Scheme: tasklog.TemplateSchemeTaskExecution, + }, + expectedTaskLogs: []*core.TaskLog{ + { + Name: "taskExecID", + Uri: "http://test/projects/my-execution-project/domains/my-execution-domain/executions/my-execution-name/nodeId/unique-node/taskId/my-task-name/attempt/1", + }, + }, + }, + { + name: "ray cluster name", + rayJob: rayv1alpha1.RayJob{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "test-namespace", + }, + Status: rayv1alpha1.RayJobStatus{ + RayClusterName: "ray-cluster", + }, + }, + logPlugin: tasklog.TemplateLogPlugin{ + DisplayName: "ray cluster name", + TemplateURIs: []tasklog.TemplateURI{"http://test/{{ .namespace }}/{{ .rayClusterName }}"}, + }, + expectedTaskLogs: []*core.TaskLog{ + { + Name: "ray cluster name", + Uri: "http://test/test-namespace/ray-cluster", + }, + }, + }, + { + name: "ray job ID", + rayJob: rayv1alpha1.RayJob{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "test-namespace", + }, + Status: rayv1alpha1.RayJobStatus{ + JobId: "ray-job-1", + }, + }, + logPlugin: tasklog.TemplateLogPlugin{ + DisplayName: "ray job ID", + TemplateURIs: []tasklog.TemplateURI{"http://test/{{ .namespace }}/{{ .rayJobID }}"}, + }, + expectedTaskLogs: []*core.TaskLog{ + { + Name: "ray job ID", + Uri: "http://test/test-namespace/ray-job-1", + }, + }, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + ti, err := getEventInfoForRayJob( + logs.LogConfig{Templates: []tasklog.TemplateLogPlugin{tc.logPlugin}}, + pluginCtx, + &tc.rayJob, + ) + assert.NoError(t, err) + assert.Equal(t, tc.expectedTaskLogs, ti.Logs) + }) + } +} + func TestGetEventInfo_DashboardURL(t *testing.T) { pluginCtx := newPluginContext() testCases := []struct {