diff --git a/flytepropeller/cmd/controller/main.go b/flytepropeller/cmd/controller/main.go index 4c1b828275..8146db9025 100644 --- a/flytepropeller/cmd/controller/main.go +++ b/flytepropeller/cmd/controller/main.go @@ -7,10 +7,18 @@ import ( _ "github.com/lyft/flyteplugins/go/tasks/plugins/k8s/container" _ "github.com/lyft/flyteplugins/go/tasks/plugins/k8s/sidecar" _ "github.com/lyft/flyteplugins/go/tasks/plugins/k8s/spark" + "github.com/lyft/flytestdlib/contextutils" + "github.com/lyft/flytestdlib/promutils/labeled" "github.com/lyft/flytepropeller/cmd/controller/cmd" + "github.com/lyft/flytepropeller/pkg/controller" ) +func init() { + labeled.SetMetricKeys(contextutils.ProjectKey, contextutils.DomainKey, contextutils.WorkflowIDKey, + contextutils.TaskIDKey, controller.ResourceVersion) +} + func main() { cmd.Execute() } diff --git a/flytepropeller/cmd/kubectl-flyte/main.go b/flytepropeller/cmd/kubectl-flyte/main.go index 8e66392cc2..7995dc7520 100644 --- a/flytepropeller/cmd/kubectl-flyte/main.go +++ b/flytepropeller/cmd/kubectl-flyte/main.go @@ -4,10 +4,19 @@ import ( "fmt" "os" + "github.com/lyft/flytepropeller/pkg/controller" + "github.com/lyft/flytepropeller/cmd/kubectl-flyte/cmd" + "github.com/lyft/flytestdlib/contextutils" + "github.com/lyft/flytestdlib/promutils/labeled" _ "k8s.io/client-go/plugin/pkg/client/auth/gcp" ) +func init() { + labeled.SetMetricKeys(contextutils.ProjectKey, contextutils.DomainKey, contextutils.WorkflowIDKey, + contextutils.TaskIDKey, controller.ResourceVersion) +} + func main() { rootCmd := cmd.NewFlyteCommand() diff --git a/flytepropeller/pkg/controller/handler.go b/flytepropeller/pkg/controller/handler.go index dbc337fc49..2a75473f14 100644 --- a/flytepropeller/pkg/controller/handler.go +++ b/flytepropeller/pkg/controller/handler.go @@ -105,6 +105,7 @@ func (p *Propeller) Handle(ctx context.Context, namespace, name string) error { wfDeepCopy := w.DeepCopy() t.Stop() ctx = contextutils.WithWorkflowID(ctx, wfDeepCopy.GetID()) + ctx = context.WithValue(ctx, ResourceVersion, wfDeepCopy.GetResourceVersion()) maxRetries := uint32(p.cfg.MaxWorkflowRetries) if IsDeleted(wfDeepCopy) || (wfDeepCopy.Status.FailedAttempts > maxRetries) { diff --git a/flytepropeller/pkg/controller/nodes/dynamic/handler_test.go b/flytepropeller/pkg/controller/nodes/dynamic/handler_test.go index 71ae893dc5..6a951a848b 100644 --- a/flytepropeller/pkg/controller/nodes/dynamic/handler_test.go +++ b/flytepropeller/pkg/controller/nodes/dynamic/handler_test.go @@ -5,6 +5,9 @@ import ( "fmt" "testing" + "github.com/lyft/flytestdlib/contextutils" + "github.com/lyft/flytestdlib/promutils/labeled" + "github.com/lyft/flyteidl/gen/pb-go/flyteidl/core" "github.com/lyft/flyteplugins/go/tasks/pluginmachinery/io" "github.com/lyft/flytestdlib/promutils" @@ -422,3 +425,8 @@ func Test_dynamicNodeHandler_Handle_SubTask(t *testing.T) { }) } } + +func init() { + labeled.SetMetricKeys(contextutils.ProjectKey, contextutils.DomainKey, contextutils.WorkflowIDKey, + contextutils.TaskIDKey) +} diff --git a/flytepropeller/pkg/controller/nodes/executor.go b/flytepropeller/pkg/controller/nodes/executor.go index 7a9af2e4cb..a70b0a2b20 100644 --- a/flytepropeller/pkg/controller/nodes/executor.go +++ b/flytepropeller/pkg/controller/nodes/executor.go @@ -396,16 +396,19 @@ func (c *nodeExecutor) handleNode(ctx context.Context, w v1alpha1.ExecutableWork np = v1alpha1.NodePhaseFailed finalStatus = executors.NodeStatusFailed(fmt.Errorf(ToError(p.GetErr(), p.GetReason()))) } + if np == v1alpha1.NodePhaseTimingOut && !h.FinalizeRequired() { logger.Infof(ctx, "Finalize not required, moving node to TimedOut") np = v1alpha1.NodePhaseTimedOut finalStatus = executors.NodeStatusTimedOut } + if np == v1alpha1.NodePhaseSucceeding && !h.FinalizeRequired() { logger.Infof(ctx, "Finalize not required, moving node to Succeeded") np = v1alpha1.NodePhaseSucceeded finalStatus = executors.NodeStatusSuccess } + // If it is retryable failure, we do no want to send any events, as the node is essentially still running if np != nodeStatus.GetPhase() && np != v1alpha1.NodePhaseRetryableFailure { // assert np == skipped, succeeding or failing @@ -427,6 +430,7 @@ func (c *nodeExecutor) handleNode(ctx context.Context, w v1alpha1.ExecutableWork } } } + UpdateNodeStatus(np, p, nCtx.nsm, nodeStatus) return finalStatus, nil } @@ -703,3 +707,8 @@ func NewExecutor(ctx context.Context, defaultDeadlines config.DefaultDeadlines, exec.nodeHandlerFactory = nodeHandlerFactory return exec, err } + +func init() { + labeled.SetMetricKeys(contextutils.ProjectKey, contextutils.DomainKey, contextutils.WorkflowIDKey, + contextutils.TaskIDKey) +} diff --git a/flytepropeller/pkg/controller/nodes/task/handler_test.go b/flytepropeller/pkg/controller/nodes/task/handler_test.go index 5536a9cbae..e27c00e302 100644 --- a/flytepropeller/pkg/controller/nodes/task/handler_test.go +++ b/flytepropeller/pkg/controller/nodes/task/handler_test.go @@ -7,6 +7,9 @@ import ( "testing" "time" + "github.com/lyft/flytestdlib/contextutils" + "github.com/lyft/flytestdlib/promutils/labeled" + "github.com/lyft/flyteidl/clients/go/events" "github.com/lyft/flyteidl/gen/pb-go/flyteidl/core" "github.com/lyft/flyteidl/gen/pb-go/flyteidl/event" @@ -1298,3 +1301,8 @@ func TestNew(t *testing.T) { assert.NotNil(t, got.metrics) assert.Equal(t, got.pluginRegistry, pluginmachinery.PluginRegistry()) } + +func init() { + labeled.SetMetricKeys(contextutils.ProjectKey, contextutils.DomainKey, contextutils.WorkflowIDKey, + contextutils.TaskIDKey) +} diff --git a/flytepropeller/pkg/controller/nodes/task/k8s/plugin_manager.go b/flytepropeller/pkg/controller/nodes/task/k8s/plugin_manager.go index 59e5acd4c9..9e618e0bde 100644 --- a/flytepropeller/pkg/controller/nodes/task/k8s/plugin_manager.go +++ b/flytepropeller/pkg/controller/nodes/task/k8s/plugin_manager.go @@ -376,7 +376,3 @@ func NewPluginManager(ctx context.Context, iCtx pluginsCore.SetupContext, entry kubeClient: iCtx.KubeClient(), }, nil } - -func init() { - labeled.SetMetricKeys(contextutils.ProjectKey, contextutils.DomainKey, contextutils.WorkflowIDKey, contextutils.TaskIDKey) -} diff --git a/flytepropeller/pkg/controller/nodes/task/k8s/plugin_manager_test.go b/flytepropeller/pkg/controller/nodes/task/k8s/plugin_manager_test.go index c4853b2250..983b52e149 100644 --- a/flytepropeller/pkg/controller/nodes/task/k8s/plugin_manager_test.go +++ b/flytepropeller/pkg/controller/nodes/task/k8s/plugin_manager_test.go @@ -6,6 +6,9 @@ import ( "fmt" "testing" + "github.com/lyft/flytestdlib/contextutils" + "github.com/lyft/flytestdlib/promutils/labeled" + "github.com/lyft/flyteplugins/go/tasks/pluginmachinery/flytek8s/config" "github.com/lyft/flyteplugins/go/tasks/pluginmachinery/io" k8serrors "k8s.io/apimachinery/pkg/api/errors" @@ -454,3 +457,7 @@ func TestAddObjectMetadata(t *testing.T) { }, o.GetAnnotations()) assert.Equal(t, l, o.GetLabels()) } + +func init() { + labeled.SetMetricKeys(contextutils.NamespaceKey) +} diff --git a/flytepropeller/pkg/controller/workers.go b/flytepropeller/pkg/controller/workers.go index 7b6303c818..76bac2b1a1 100644 --- a/flytepropeller/pkg/controller/workers.go +++ b/flytepropeller/pkg/controller/workers.go @@ -14,6 +14,10 @@ import ( "k8s.io/client-go/tools/cache" ) +const ( + ResourceVersion contextutils.Key = "rv_ver" +) + type Handler interface { // Initialize the Handler Initialize(ctx context.Context) error diff --git a/flytepropeller/pkg/controller/workflow/executor_test.go b/flytepropeller/pkg/controller/workflow/executor_test.go index 6d88122063..b205ef3f90 100644 --- a/flytepropeller/pkg/controller/workflow/executor_test.go +++ b/flytepropeller/pkg/controller/workflow/executor_test.go @@ -9,6 +9,9 @@ import ( "testing" "time" + "github.com/lyft/flytestdlib/contextutils" + "github.com/lyft/flytestdlib/promutils/labeled" + "k8s.io/apimachinery/pkg/util/sets" "github.com/lyft/flyteplugins/go/tasks/pluginmachinery" @@ -88,6 +91,11 @@ func (f fakeRemoteWritePlugin) Handle(ctx context.Context, tCtx pluginCore.TaskE return trns, err } +func init() { + labeled.SetMetricKeys(contextutils.ProjectKey, contextutils.DomainKey, contextutils.WorkflowIDKey, + contextutils.TaskIDKey) +} + func createInmemoryDataStore(t testing.TB, scope promutils.Scope) *storage.DataStore { cfg := storage.Config{ Type: storage.TypeMemory,