diff --git a/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/workflow.go b/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/workflow.go index 46b190520..d96a87d14 100644 --- a/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/workflow.go +++ b/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/workflow.go @@ -64,6 +64,11 @@ type FlyteWorkflow struct { // so that it can be used downstream without any confusion. // This field is here because it's easier to put it here than pipe through a new object through all of propeller. DataReferenceConstructor storage.ReferenceConstructor `json:"-"` + + // WorkflowClosureReference is the location containing an offloaded WorkflowClosure. This is used to offload + // portions of the CRD to an external data store to reduce CRD size. If this exists, FlytePropeller must retrieve + // and parse the static data prior to processing. + WorkflowClosureReference DataReference `json:"workflowClosureReference,omitempty"` } func (in *FlyteWorkflow) GetSecurityContext() core.SecurityContext { diff --git a/flytepropeller/pkg/compiler/transformers/k8s/workflow.go b/flytepropeller/pkg/compiler/transformers/k8s/workflow.go index 32ccc0bf0..8c72c3eea 100644 --- a/flytepropeller/pkg/compiler/transformers/k8s/workflow.go +++ b/flytepropeller/pkg/compiler/transformers/k8s/workflow.go @@ -277,3 +277,45 @@ func buildConnections(w *core.CompiledWorkflow) v1alpha1.Connections { res.Upstream = toMapOfLists(w.GetConnections().GetUpstream()) return res } + +type WfClosureCrdFields struct { + *v1alpha1.WorkflowSpec `json:"spec"` + SubWorkflows map[v1alpha1.WorkflowID]*v1alpha1.WorkflowSpec `json:"subWorkflows,omitempty"` + Tasks map[v1alpha1.TaskID]*v1alpha1.TaskSpec `json:"tasks"` +} + +func BuildWfClosureCrdFields(wfClosure *core.CompiledWorkflowClosure) (*WfClosureCrdFields, error) { + errs := errors.NewCompileErrors() + if wfClosure == nil { + errs.Collect(errors.NewValueRequiredErr("root", "wfClosure")) + return nil, errs + } + + primarySpec, err := buildFlyteWorkflowSpec(wfClosure.Primary, wfClosure.Tasks, errs.NewScope()) + if err != nil { + errs.Collect(errors.NewWorkflowBuildError(err)) + return nil, errs + } + + for _, t := range wfClosure.Tasks { + t.Template.Interface = StripInterfaceTypeMetadata(t.Template.Interface) + } + tasks := buildTasks(wfClosure.Tasks, errs.NewScope()) + + subwfs := make(map[v1alpha1.WorkflowID]*v1alpha1.WorkflowSpec, len(wfClosure.SubWorkflows)) + for _, subWf := range wfClosure.SubWorkflows { + spec, err := buildFlyteWorkflowSpec(subWf, wfClosure.Tasks, errs.NewScope()) + if err != nil { + errs.Collect(errors.NewWorkflowBuildError(err)) + } else { + subwfs[subWf.Template.Id.String()] = spec + } + } + + wfClosureCrdFields := &WfClosureCrdFields{ + WorkflowSpec: primarySpec, + SubWorkflows: subwfs, + Tasks: tasks, + } + return wfClosureCrdFields, nil +} diff --git a/flytepropeller/pkg/controller/controller.go b/flytepropeller/pkg/controller/controller.go index 2b40e3bd8..454835b12 100644 --- a/flytepropeller/pkg/controller/controller.go +++ b/flytepropeller/pkg/controller/controller.go @@ -443,7 +443,7 @@ func New(ctx context.Context, cfg *config.Config, kubeclientset kubernetes.Inter return nil, err } - handler := NewPropellerHandler(ctx, cfg, controller.workflowStore, workflowExecutor, scope) + handler := NewPropellerHandler(ctx, cfg, store, controller.workflowStore, workflowExecutor, scope) controller.workerPool = NewWorkerPool(ctx, scope, workQ, handler) if cfg.EnableGrpcLatencyMetrics { diff --git a/flytepropeller/pkg/controller/handler.go b/flytepropeller/pkg/controller/handler.go index 701205b9a..95fd46cb1 100644 --- a/flytepropeller/pkg/controller/handler.go +++ b/flytepropeller/pkg/controller/handler.go @@ -7,21 +7,23 @@ import ( "runtime/debug" "time" + "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/admin" "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core" - "github.com/flyteorg/flytestdlib/contextutils" - "github.com/flyteorg/flytestdlib/promutils/labeled" - eventsErr "github.com/flyteorg/flytepropeller/events/errors" "github.com/flyteorg/flytepropeller/pkg/apis/flyteworkflow/v1alpha1" + "github.com/flyteorg/flytepropeller/pkg/compiler/transformers/k8s" "github.com/flyteorg/flytepropeller/pkg/controller/config" + "github.com/flyteorg/flytepropeller/pkg/controller/executors" "github.com/flyteorg/flytepropeller/pkg/controller/workflowstore" + "github.com/flyteorg/flytestdlib/contextutils" "github.com/flyteorg/flytestdlib/logger" "github.com/flyteorg/flytestdlib/promutils" - "github.com/prometheus/client_golang/prometheus" + "github.com/flyteorg/flytestdlib/promutils/labeled" + "github.com/flyteorg/flytestdlib/storage" - "github.com/flyteorg/flytepropeller/pkg/controller/executors" + "github.com/prometheus/client_golang/prometheus" ) // TODO Lets move everything to use controller runtime @@ -35,6 +37,7 @@ type propellerMetrics struct { PanicObserved labeled.Counter RoundSkipped prometheus.Counter WorkflowNotFound prometheus.Counter + WorkflowClosureReadTime labeled.StopWatch StreakLength labeled.Counter RoundTime labeled.StopWatch } @@ -50,6 +53,7 @@ func newPropellerMetrics(scope promutils.Scope) *propellerMetrics { PanicObserved: labeled.NewCounter("panic", "Panic during handling or aborting workflow", roundScope, labeled.EmitUnlabeledMetric), RoundSkipped: roundScope.MustNewCounter("skipped", "Round Skipped because of stale workflow"), WorkflowNotFound: roundScope.MustNewCounter("not_found", "workflow not found in the cache"), + WorkflowClosureReadTime: labeled.NewStopWatch("closure_read", "Total time taken to read and parse the offloaded WorkflowClosure", time.Millisecond, roundScope, labeled.EmitUnlabeledMetric), StreakLength: labeled.NewCounter("streak_length", "Number of consecutive rounds used in fast follow mode", roundScope, labeled.EmitUnlabeledMetric), RoundTime: labeled.NewStopWatch("round_time", "Total time taken by one round traversing, copying and storing a workflow", time.Millisecond, roundScope, labeled.EmitUnlabeledMetric), } @@ -67,6 +71,7 @@ func RecordSystemError(w *v1alpha1.FlyteWorkflow, err error) *v1alpha1.FlyteWork // Core Propeller structure that houses the Reconciliation loop for Flytepropeller type Propeller struct { + store *storage.DataStore wfStore workflowstore.FlyteWorkflow workflowExecutor executors.Workflow metrics *propellerMetrics @@ -192,6 +197,32 @@ func (p *Propeller) Handle(ctx context.Context, namespace, name string) error { return nil } } + + // if the FlyteWorkflow CRD has the WorkflowClosureReference set then we have offloaded the + // static fields to the blobstore to reduce CRD size. we must read and parse the workflow + // closure so that these fields may be temporarily repopulated. + var wfClosureCrdFields *k8s.WfClosureCrdFields + if len(w.WorkflowClosureReference) > 0 { + t := p.metrics.WorkflowClosureReadTime.Start(ctx) + + wfClosure := &admin.WorkflowClosure{} + err := p.store.ReadProtobuf(ctx, w.WorkflowClosureReference, wfClosure) + if err != nil { + t.Stop() + logger.Errorf(ctx, "Failed to retrieve workflow closure data from '%s' with error '%s'", w.WorkflowClosureReference, err) + return err + } + + wfClosureCrdFields, err = k8s.BuildWfClosureCrdFields(wfClosure.CompiledWorkflow) + if err != nil { + t.Stop() + logger.Errorf(ctx, "Failed to parse workflow closure data from '%s' with error '%s'", w.WorkflowClosureReference, err) + return err + } + + t.Stop() + } + streak := 0 defer p.metrics.StreakLength.Add(ctx, float64(streak)) @@ -201,8 +232,25 @@ func (p *Propeller) Handle(ctx context.Context, namespace, name string) error { } for streak = 0; streak < maxLength; streak++ { + // if the wfClosureCrdFields struct is not nil then it contains static workflow data which + // has been offloaded to the blobstore. we must set these fields so they're available + // during workflow processing and immediately remove them afterwards so they do not + // accidentally get written to the workflow store once the new state is stored. + if wfClosureCrdFields != nil { + w.WorkflowSpec = wfClosureCrdFields.WorkflowSpec + w.Tasks = wfClosureCrdFields.Tasks + w.SubWorkflows = wfClosureCrdFields.SubWorkflows + } + t := p.metrics.RoundTime.Start(ctx) mutatedWf, err := p.TryMutateWorkflow(ctx, w) + + if wfClosureCrdFields != nil { + // strip data populated from WorkflowClosureReference + w.SubWorkflows, w.Tasks, w.WorkflowSpec = nil, nil, nil + mutatedWf.SubWorkflows, mutatedWf.Tasks, mutatedWf.WorkflowSpec = nil, nil, nil + } + if err != nil { // NOTE We are overriding the deepcopy here, as we are essentially ingnoring all mutations // We only want to increase failed attempts and discard any other partial changes to the CRD. @@ -319,11 +367,12 @@ func (p *Propeller) Handle(ctx context.Context, namespace, name string) error { } // NewPropellerHandler creates a new Propeller and initializes metrics -func NewPropellerHandler(_ context.Context, cfg *config.Config, wfStore workflowstore.FlyteWorkflow, executor executors.Workflow, scope promutils.Scope) *Propeller { +func NewPropellerHandler(_ context.Context, cfg *config.Config, store *storage.DataStore, wfStore workflowstore.FlyteWorkflow, executor executors.Workflow, scope promutils.Scope) *Propeller { metrics := newPropellerMetrics(scope) return &Propeller{ metrics: metrics, + store: store, wfStore: wfStore, workflowExecutor: executor, cfg: cfg, diff --git a/flytepropeller/pkg/controller/handler_test.go b/flytepropeller/pkg/controller/handler_test.go index e957aeb51..b7892a73e 100644 --- a/flytepropeller/pkg/controller/handler_test.go +++ b/flytepropeller/pkg/controller/handler_test.go @@ -6,22 +6,25 @@ import ( "testing" "time" - "github.com/flyteorg/flytepropeller/pkg/controller/workflowstore/mocks" - "github.com/pkg/errors" - "github.com/stretchr/testify/mock" - + "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/admin" "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core" - v1 "k8s.io/apimachinery/pkg/apis/meta/v1" eventErrors "github.com/flyteorg/flytepropeller/events/errors" + "github.com/flyteorg/flytepropeller/pkg/apis/flyteworkflow/v1alpha1" "github.com/flyteorg/flytepropeller/pkg/controller/config" workflowErrors "github.com/flyteorg/flytepropeller/pkg/controller/workflow/errors" "github.com/flyteorg/flytepropeller/pkg/controller/workflowstore" + "github.com/flyteorg/flytepropeller/pkg/controller/workflowstore/mocks" "github.com/flyteorg/flytestdlib/promutils" + "github.com/flyteorg/flytestdlib/storage" + storagemocks "github.com/flyteorg/flytestdlib/storage/mocks" + + "github.com/pkg/errors" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" - "github.com/flyteorg/flytepropeller/pkg/apis/flyteworkflow/v1alpha1" + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) type mockExecutor struct { @@ -50,7 +53,7 @@ func TestPropeller_Handle(t *testing.T) { MaxWorkflowRetries: 0, } - p := NewPropellerHandler(ctx, cfg, s, exec, scope) + p := NewPropellerHandler(ctx, cfg, nil, s, exec, scope) const namespace = "test" const name = "123" @@ -62,7 +65,7 @@ func TestPropeller_Handle(t *testing.T) { scope := promutils.NewTestScope() s := &mocks.FlyteWorkflow{} exec := &mockExecutor{} - p := NewPropellerHandler(ctx, cfg, s, exec, scope) + p := NewPropellerHandler(ctx, cfg, nil, s, exec, scope) s.OnGetMatch(mock.Anything, mock.Anything, mock.Anything).Return(nil, errors.Wrap(workflowstore.ErrStaleWorkflowError, "stale")).Once() assert.NoError(t, p.Handle(ctx, namespace, name)) }) @@ -537,7 +540,7 @@ func TestPropeller_Handle_TurboMode(t *testing.T) { const namespace = "test" const name = "123" - p := NewPropellerHandler(ctx, cfg, s, exec, scope) + p := NewPropellerHandler(ctx, cfg, nil, s, exec, scope) t.Run("error", func(t *testing.T) { assert.NoError(t, s.Create(ctx, &v1alpha1.FlyteWorkflow{ @@ -739,7 +742,7 @@ func TestPropellerHandler_Initialize(t *testing.T) { MaxWorkflowRetries: 0, } - p := NewPropellerHandler(ctx, cfg, s, exec, scope) + p := NewPropellerHandler(ctx, cfg, nil, s, exec, scope) assert.NoError(t, p.Initialize(ctx)) } @@ -757,7 +760,7 @@ func TestNewPropellerHandler_UpdateFailure(t *testing.T) { scope := promutils.NewTestScope() s := &mocks.FlyteWorkflow{} exec := &mockExecutor{} - p := NewPropellerHandler(ctx, cfg, s, exec, scope) + p := NewPropellerHandler(ctx, cfg, nil, s, exec, scope) wf := &v1alpha1.FlyteWorkflow{ ObjectMeta: v1.ObjectMeta{ Name: name, @@ -778,7 +781,7 @@ func TestNewPropellerHandler_UpdateFailure(t *testing.T) { scope := promutils.NewTestScope() s := &mocks.FlyteWorkflow{} exec := &mockExecutor{} - p := NewPropellerHandler(ctx, cfg, s, exec, scope) + p := NewPropellerHandler(ctx, cfg, nil, s, exec, scope) wf := &v1alpha1.FlyteWorkflow{ ObjectMeta: v1.ObjectMeta{ Name: name, @@ -799,7 +802,7 @@ func TestNewPropellerHandler_UpdateFailure(t *testing.T) { scope := promutils.NewTestScope() s := &mocks.FlyteWorkflow{} exec := &mockExecutor{} - p := NewPropellerHandler(ctx, cfg, s, exec, scope) + p := NewPropellerHandler(ctx, cfg, nil, s, exec, scope) wf := &v1alpha1.FlyteWorkflow{ ObjectMeta: v1.ObjectMeta{ Name: name, @@ -821,3 +824,74 @@ func TestNewPropellerHandler_UpdateFailure(t *testing.T) { assert.NoError(t, err) }) } + +func TestPropellerHandler_OffloadedWorkflowClosure(t *testing.T) { + ctx := context.TODO() + + const name = "123" + const namespace = "test" + + s := workflowstore.NewInMemoryWorkflowStore() + assert.NoError(t, s.Create(ctx, &v1alpha1.FlyteWorkflow{ + ObjectMeta: v1.ObjectMeta{ + Name: name, + Namespace: namespace, + }, + WorkflowClosureReference: "some-file-location", + })) + + exec := &mockExecutor{} + exec.HandleCb = func(ctx context.Context, w *v1alpha1.FlyteWorkflow) error { + w.GetExecutionStatus().UpdatePhase(v1alpha1.WorkflowPhaseSucceeding, "done", nil) + return nil + } + + cfg := &config.Config{ + MaxWorkflowRetries: 0, + } + + t.Run("Happy", func(t *testing.T) { + scope := promutils.NewTestScope() + + protoStore := &storagemocks.ComposedProtobufStore{} + protoStore.OnReadProtobufMatch(mock.Anything, mock.Anything, mock.Anything).Run(func(args mock.Arguments) { + // populate mock CompiledWorkflowClosure that satisfies just enough to compile + wfClosure := args.Get(2) + assert.NotNil(t, wfClosure) + casted := wfClosure.(*admin.WorkflowClosure) + casted.CompiledWorkflow = &core.CompiledWorkflowClosure{ + Primary: &core.CompiledWorkflow{ + Template: &core.WorkflowTemplate{ + Id: &core.Identifier{}, + }, + }, + } + }).Return(nil) + dataStore := storage.NewCompositeDataStore(storage.URLPathConstructor{}, protoStore) + p := NewPropellerHandler(ctx, cfg, dataStore, s, exec, scope) + + assert.NoError(t, p.Handle(ctx, namespace, name)) + + r, err := s.Get(ctx, namespace, name) + assert.NoError(t, err) + assert.Equal(t, v1alpha1.WorkflowPhaseSucceeding, r.GetExecutionStatus().GetPhase()) + assert.Equal(t, 1, len(r.Finalizers)) + assert.False(t, HasCompletedLabel(r)) + assert.Equal(t, uint32(0), r.Status.FailedAttempts) + assert.Nil(t, r.WorkflowSpec) + assert.Nil(t, r.SubWorkflows) + assert.Nil(t, r.Tasks) + }) + + t.Run("Error", func(t *testing.T) { + scope := promutils.NewTestScope() + + protoStore := &storagemocks.ComposedProtobufStore{} + protoStore.OnReadProtobufMatch(mock.Anything, mock.Anything, mock.Anything).Return(fmt.Errorf("foo")) + dataStore := storage.NewCompositeDataStore(storage.URLPathConstructor{}, protoStore) + p := NewPropellerHandler(ctx, cfg, dataStore, s, exec, scope) + + err := p.Handle(ctx, namespace, name) + assert.Error(t, err) + }) +} diff --git a/flytepropeller/pkg/controller/workflowstore/passthrough.go b/flytepropeller/pkg/controller/workflowstore/passthrough.go index a72e0bc4a..8e5b03f60 100644 --- a/flytepropeller/pkg/controller/workflowstore/passthrough.go +++ b/flytepropeller/pkg/controller/workflowstore/passthrough.go @@ -12,6 +12,7 @@ import ( "github.com/flyteorg/flytestdlib/logger" "github.com/flyteorg/flytestdlib/promutils" "github.com/prometheus/client_golang/prometheus" + kubeerrors "k8s.io/apimachinery/pkg/api/errors" )