Skip to content

Commit

Permalink
Add Retry information to Spark and Mozart (flyteorg#127)
Browse files: browse the repository at this point in the history
  • Loading branch information
catalinii authored Oct 9, 2020
1 parent 47b3de4 commit d83e033
Show file tree
Hide file tree
Showing 6 changed files with 17 additions and 9 deletions.
1 change: 1 addition & 0 deletions flyteplugins/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -880,6 +880,7 @@ gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.0-20200605160147-a5ece683394c h1:grhR+C34yXImVGp7EzNk+DTIk+323eIUWOmEevy6bDo=
gopkg.in/yaml.v3 v3.0.0-20200605160147-a5ece683394c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gotest.tools v2.2.0+incompatible h1:VsBPFP1AI068pPrMxtb/S8Zkgf9xEmTLJjfM+P5UIEo=
gotest.tools v2.2.0+incompatible/go.mod h1:DsYFclhRJ6vuDpmuTbkuFWG+y2sxOXAzmJt81HFBacw=
honnef.co/go/tools v0.0.0-20180728063816-88497007e858/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
Expand Down
11 changes: 6 additions & 5 deletions flyteplugins/go/tasks/plugins/hive/client/qubole_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,12 @@ type QuboleCommandDetails struct {
}

type CommandMetadata struct {
TaskName string
Domain string
Project string
Labels map[string]string
RetryAttempt uint32
TaskName string
Domain string
Project string
Labels map[string]string
AttemptNumber uint32
MaxAttempts uint32
}

// QuboleClient API Request Body, meant to be passed into JSON.marshal
Expand Down
9 changes: 5 additions & 4 deletions flyteplugins/go/tasks/plugins/hive/execution_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -344,10 +344,11 @@ func KickOffQuery(ctx context.Context, tCtx core.TaskExecutionContext, currentSt

taskExecutionIdentifier := tCtx.TaskExecutionMetadata().GetTaskExecutionID().GetID()
commandMetadata := client.CommandMetadata{TaskName: taskName,
Domain: taskExecutionIdentifier.GetTaskId().GetDomain(),
Project: taskExecutionIdentifier.GetNodeExecutionId().GetExecutionId().GetProject(),
Labels: tCtx.TaskExecutionMetadata().GetLabels(),
RetryAttempt: taskExecutionIdentifier.GetRetryAttempt(),
Domain: taskExecutionIdentifier.GetTaskId().GetDomain(),
Project: taskExecutionIdentifier.GetNodeExecutionId().GetExecutionId().GetProject(),
Labels: tCtx.TaskExecutionMetadata().GetLabels(),
AttemptNumber: taskExecutionIdentifier.GetRetryAttempt(),
MaxAttempts: tCtx.TaskExecutionMetadata().GetMaxAttempts(),
}

cmdDetails, err := quboleClient.ExecuteHiveCommand(ctx, query, timeoutSec,
Expand Down
1 change: 1 addition & 0 deletions flyteplugins/go/tasks/plugins/hive/test_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ func GetMockTaskExecutionMetadata() core.TaskExecutionMetadata {
taskMetadata.On("GetNamespace").Return("test-namespace")
taskMetadata.On("GetAnnotations").Return(map[string]string{"annotation-1": "val1"})
taskMetadata.On("GetLabels").Return(map[string]string{"label-1": "val1"})
taskMetadata.On("GetMaxAttempts").Return(uint32(1))
taskMetadata.On("GetOwnerReference").Return(metav1.OwnerReference{
Kind: "node",
Name: "blah",
Expand Down
2 changes: 2 additions & 0 deletions flyteplugins/go/tasks/plugins/k8s/spark/spark.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package spark
import (
"context"
"fmt"
"strconv"

"github.com/lyft/flyteplugins/go/tasks/pluginmachinery"
"github.com/lyft/flyteplugins/go/tasks/pluginmachinery/flytek8s"
Expand Down Expand Up @@ -111,6 +112,7 @@ func (sparkResourceHandler) BuildResource(ctx context.Context, taskCtx pluginsCo
for _, envVar := range envVars {
sparkEnvVars[envVar.Name] = envVar.Value
}
sparkEnvVars["FLYTE_MAX_ATTEMPTS"] = strconv.Itoa(int(taskCtx.TaskExecutionMetadata().GetMaxAttempts()))

driverSpec := sparkOp.DriverSpec{
SparkPodSpec: sparkOp.SparkPodSpec{
Expand Down
2 changes: 2 additions & 0 deletions flyteplugins/go/tasks/plugins/k8s/spark/spark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,7 @@ func dummySparkTaskContext(taskTemplate *core.TaskTemplate) pluginsCore.TaskExec
Name: "blah",
})
taskExecutionMetadata.On("IsInterruptible").Return(true)
taskExecutionMetadata.On("GetMaxAttempts").Return(uint32(1))
taskCtx.On("TaskExecutionMetadata").Return(taskExecutionMetadata)
return taskCtx
}
Expand Down Expand Up @@ -327,6 +328,7 @@ func TestBuildResourceSpark(t *testing.T) {
assert.Equal(t, dummySparkConf["spark.driver.cores"], sparkApp.Spec.SparkConf["spark.kubernetes.driver.limit.cores"])
assert.Equal(t, dummySparkConf["spark.executor.cores"], sparkApp.Spec.SparkConf["spark.kubernetes.executor.limit.cores"])

assert.Equal(t, len(sparkApp.Spec.Driver.EnvVars["FLYTE_MAX_ATTEMPTS"]), 1)
// Case2: Invalid Spark Task-Template
taskTemplate.Custom = nil
resource, err = sparkResourceHandler.BuildResource(context.TODO(), dummySparkTaskContext(taskTemplate))
Expand Down

0 comments on commit d83e033

Please sign in to comment.