Skip to content

Commit

Permalink
[BUG] Retry fetching subworkflow output data on failure (#4602)
Browse files Browse the repository at this point in the history
* bubble up errors on remote store reads

Signed-off-by: Paul Dittamo <[email protected]>

* error when closure output data exceeds max size

Signed-off-by: Paul Dittamo <[email protected]>

* remove urlblob from getouputs return

Signed-off-by: Paul Dittamo <[email protected]>

* remove signedurl logic on fetching outputs

Signed-off-by: Paul Dittamo <[email protected]>

* cleanup no longer used param

Signed-off-by: Paul Dittamo <[email protected]>

* remove legacy unit test after removing legacy utility

Signed-off-by: Paul Dittamo <[email protected]>

* revert changes to getinputs

Signed-off-by: Paul Dittamo <[email protected]>

* revert fetch input unit tests after reverting getinputs changes

Signed-off-by: Paul Dittamo <[email protected]>

* remove inputsURLBlob from GetInputs returns

Signed-off-by: Paul Dittamo <[email protected]>

* attempt remote storage output retrieval if fetching subworkflow execution data fails

Signed-off-by: Paul Dittamo <[email protected]>

* lint

Signed-off-by: Paul Dittamo <[email protected]>

* cleanup url blob set up in unit tests

Signed-off-by: Paul Dittamo <[email protected]>

* add unit tests

Signed-off-by: Paul Dittamo <[email protected]>

* lint

Signed-off-by: Paul Dittamo <[email protected]>

* revert admin changes to make in another PR

Signed-off-by: Paul Dittamo <[email protected]>

* retry fetching subworkflow output data if fulloutputs are not set

Signed-off-by: Paul Dittamo <[email protected]>

---------

Signed-off-by: Paul Dittamo <[email protected]>
  • Loading branch information
pvditt authored Jan 16, 2024
1 parent 38d1833 commit 136ac8d
Show file tree
Hide file tree
Showing 3 changed files with 230 additions and 58 deletions.
23 changes: 12 additions & 11 deletions flytepropeller/pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -324,10 +324,21 @@ func New(ctx context.Context, cfg *config.Config, kubeClientset kubernetes.Inter
logger.Errorf(ctx, "failed to initialize Admin client, err :%s", err.Error())
return nil, err
}

sCfg := storage.GetConfig()
if sCfg == nil {
logger.Errorf(ctx, "Storage configuration missing.")
}

store, err := storage.NewDataStore(sCfg, scope.NewSubScope("metastore"))
if err != nil {
return nil, errors.Wrapf(err, "Failed to create Metadata storage")
}

var launchPlanActor launchplan.FlyteAdmin
if cfg.EnableAdminLauncher {
launchPlanActor, err = launchplan.NewAdminLaunchPlanExecutor(ctx, adminClient, cfg.DownstreamEval.Duration,
launchplan.GetAdminConfig(), scope.NewSubScope("admin_launcher"))
launchplan.GetAdminConfig(), scope.NewSubScope("admin_launcher"), store)
if err != nil {
logger.Errorf(ctx, "failed to create Admin workflow Launcher, err: %v", err.Error())
return nil, err
Expand Down Expand Up @@ -401,16 +412,6 @@ func New(ctx context.Context, cfg *config.Config, kubeClientset kubernetes.Inter

flytek8s.DefaultPodTemplateStore.SetDefaultNamespace(podNamespace)

sCfg := storage.GetConfig()
if sCfg == nil {
logger.Errorf(ctx, "Storage configuration missing.")
}

store, err := storage.NewDataStore(sCfg, scope.NewSubScope("metastore"))
if err != nil {
return nil, errors.Wrapf(err, "Failed to create Metadata storage")
}

logger.Info(ctx, "Setting up Catalog client.")
catalogClient, err := catalog.NewCatalogClient(ctx, authOpts...)
if err != nil {
Expand Down
43 changes: 28 additions & 15 deletions flytepropeller/pkg/controller/nodes/subworkflow/launchplan/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
stdErr "github.com/flyteorg/flyte/flytestdlib/errors"
"github.com/flyteorg/flyte/flytestdlib/logger"
"github.com/flyteorg/flyte/flytestdlib/promutils"
"github.com/flyteorg/flyte/flytestdlib/storage"
)

var isRecovery = true
Expand All @@ -33,6 +34,7 @@ func IsWorkflowTerminated(p core.WorkflowExecution_Phase) bool {
type adminLaunchPlanExecutor struct {
adminClient service.AdminServiceClient
cache cache.AutoRefresh
store *storage.DataStore
}

type executionCacheItem struct {
Expand Down Expand Up @@ -258,29 +260,39 @@ func (a *adminLaunchPlanExecutor) syncItem(ctx context.Context, batch cache.Batc
continue
}

var outputs *core.LiteralMap
var outputs = &core.LiteralMap{}
// Retrieve potential outputs only when the workflow succeeded.
// TODO: We can optimize further by only retrieving the outputs when the workflow has output variables in the
// interface.
if res.GetClosure().GetPhase() == core.WorkflowExecution_SUCCEEDED {
execData, err := a.adminClient.GetExecutionData(ctx, &admin.WorkflowExecutionGetDataRequest{
Id: &exec.WorkflowExecutionIdentifier,
})
if err != nil || execData.GetFullOutputs() == nil || execData.GetFullOutputs().GetLiterals() == nil {
outputURI := res.GetClosure().GetOutputs().GetUri()
// attempt remote storage read on GetExecutionData failure
if outputURI != "" {
err = a.store.ReadProtobuf(ctx, storage.DataReference(outputURI), outputs)
if err != nil {
logger.Errorf(ctx, "Failed to read outputs from URI [%s] with err: %v", outputURI, err)
}
}
if err != nil {
resp = append(resp, cache.ItemSyncResponse{
ID: obj.GetID(),
Item: executionCacheItem{
WorkflowExecutionIdentifier: exec.WorkflowExecutionIdentifier,
SyncError: err,
},
Action: cache.Update,
})

continue
}

if err != nil {
resp = append(resp, cache.ItemSyncResponse{
ID: obj.GetID(),
Item: executionCacheItem{
WorkflowExecutionIdentifier: exec.WorkflowExecutionIdentifier,
SyncError: err,
},
Action: cache.Update,
})

continue
} else {
outputs = execData.GetFullOutputs()
}

outputs = execData.GetFullOutputs()
}

// Update the cache with the retrieved status
Expand All @@ -299,9 +311,10 @@ func (a *adminLaunchPlanExecutor) syncItem(ctx context.Context, batch cache.Batc
}

func NewAdminLaunchPlanExecutor(_ context.Context, client service.AdminServiceClient,
syncPeriod time.Duration, cfg *AdminConfig, scope promutils.Scope) (FlyteAdmin, error) {
syncPeriod time.Duration, cfg *AdminConfig, scope promutils.Scope, store *storage.DataStore) (FlyteAdmin, error) {
exec := &adminLaunchPlanExecutor{
adminClient: client,
store: store,
}

rateLimiter := &workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(cfg.TPS), cfg.Burst)}
Expand Down
Loading

0 comments on commit 136ac8d

Please sign in to comment.