Skip to content
This repository has been archived by the owner on Oct 9, 2023. It is now read-only.

Commit

Permalink
Merge branch 'master' into extend-webhook-self-signed-cert-expiration…
Browse files Browse the repository at this point in the history
…-date-to-99-years
  • Loading branch information
eapolinario authored Mar 6, 2023
2 parents 3aa0fa8 + 3c3ec33 commit 0abbdb8
Show file tree
Hide file tree
Showing 7 changed files with 202 additions and 55 deletions.
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ require (
github.com/DiSiqueira/GoTree v1.0.1-0.20180907134536-53a8e837f295
github.com/benlaurie/objecthash v0.0.0-20180202135721-d1e3d6079fc1
github.com/fatih/color v1.13.0
github.com/flyteorg/flyteidl v1.3.7
github.com/flyteorg/flyteplugins v1.0.37
github.com/flyteorg/flyteidl v1.3.9
github.com/flyteorg/flyteplugins v1.0.40
github.com/flyteorg/flytestdlib v1.0.15
github.com/ghodss/yaml v1.0.0
github.com/go-redis/redis v6.15.7+incompatible
Expand Down
8 changes: 4 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -260,10 +260,10 @@ github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5Kwzbycv
github.com/fatih/color v1.13.0 h1:8LOYc1KYPPmyKMuN8QV2DNRWNbLo6LZ0iLs8+mlH53w=
github.com/fatih/color v1.13.0/go.mod h1:kLAiJbzzSOZDVNGyDpeOxJ47H46qBXwg5ILebYFFOfk=
github.com/felixge/httpsnoop v1.0.1/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U=
github.com/flyteorg/flyteidl v1.3.7 h1:MA7kOqMr/TmPlYPvJZwfsl+CYneuDOJ+kEKx2DocLhE=
github.com/flyteorg/flyteidl v1.3.7/go.mod h1:Pkt2skI1LiHs/2ZoekBnyPhuGOFMiuul6HHcKGZBsbM=
github.com/flyteorg/flyteplugins v1.0.37 h1:TRgsZaGn5JhHBOsfLVU1kNg+TPFiuxbItC1wFA4nmgU=
github.com/flyteorg/flyteplugins v1.0.37/go.mod h1:qyUPqVspLcLGJpKxVwHDWf+kBpOGuItOxCaF6zAmDio=
github.com/flyteorg/flyteidl v1.3.9 h1:MHUa89yKwCz58mQC2OxTzYjr0d3fA14qKG462v+RAyk=
github.com/flyteorg/flyteidl v1.3.9/go.mod h1:Pkt2skI1LiHs/2ZoekBnyPhuGOFMiuul6HHcKGZBsbM=
github.com/flyteorg/flyteplugins v1.0.40 h1:RTsYingqmqr13qBbi4CB2ArXDHNHUOkAF+HTLJQiQ/s=
github.com/flyteorg/flyteplugins v1.0.40/go.mod h1:qyUPqVspLcLGJpKxVwHDWf+kBpOGuItOxCaF6zAmDio=
github.com/flyteorg/flytestdlib v1.0.15 h1:kv9jDQmytbE84caY+pkZN8trJU2ouSAmESzpTEhfTt0=
github.com/flyteorg/flytestdlib v1.0.15/go.mod h1:ghw/cjY0sEWIIbyCtcJnL/Gt7ZS7gf9SUi0CCPhbz3s=
github.com/flyteorg/stow v0.3.6 h1:jt50ciM14qhKBaIrB+ppXXY+SXB59FNREFgTJqCyqIk=
Expand Down
3 changes: 3 additions & 0 deletions pkg/controller/nodes/dynamic/dynamic_workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ type dynamicWorkflowContext struct {
subWorkflowClosure *core.CompiledWorkflowClosure
nodeLookup executors.NodeLookup
isDynamic bool
dynamicJobSpecURI string
}

const dynamicWfNameTemplate = "dynamic_%s"
Expand Down Expand Up @@ -183,6 +184,7 @@ func (d dynamicNodeTaskNodeHandler) buildContextualDynamicWorkflow(ctx context.C
subWorkflowClosure: workflowCacheContents.CompiledWorkflow,
execContext: executors.NewExecutionContext(nCtx.ExecutionContext(), compiledWf, compiledWf, newParentInfo, nCtx.ExecutionContext()),
nodeLookup: executors.NewNodeLookup(compiledWf, dynamicNodeStatus),
dynamicJobSpecURI: string(f.GetLoc()),
}, nil
}
}
Expand Down Expand Up @@ -215,6 +217,7 @@ func (d dynamicNodeTaskNodeHandler) buildContextualDynamicWorkflow(ctx context.C
subWorkflowClosure: closure,
execContext: executors.NewExecutionContext(nCtx.ExecutionContext(), dynamicWf, dynamicWf, newParentInfo, nCtx.ExecutionContext()),
nodeLookup: executors.NewNodeLookup(dynamicWf, dynamicNodeStatus),
dynamicJobSpecURI: string(f.GetLoc()),
}, nil
}

Expand Down
5 changes: 3 additions & 2 deletions pkg/controller/nodes/dynamic/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,8 +111,9 @@ func (d dynamicNodeTaskNodeHandler) produceDynamicWorkflow(ctx context.Context,
taskNodeInfoMetadata := &event.TaskNodeMetadata{}
if dCtx.subWorkflowClosure != nil && dCtx.subWorkflowClosure.Primary != nil && dCtx.subWorkflowClosure.Primary.Template != nil {
taskNodeInfoMetadata.DynamicWorkflow = &event.DynamicWorkflowNodeMetadata{
Id: dCtx.subWorkflowClosure.Primary.Template.Id,
CompiledWorkflow: dCtx.subWorkflowClosure,
Id: dCtx.subWorkflowClosure.Primary.Template.Id,
CompiledWorkflow: dCtx.subWorkflowClosure,
DynamicJobSpecUri: dCtx.dynamicJobSpecURI,
}
}

Expand Down
142 changes: 95 additions & 47 deletions pkg/controller/nodes/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,40 +21,39 @@ import (
"fmt"
"time"

"github.com/flyteorg/flytepropeller/pkg/controller/nodes/recovery"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

"github.com/flyteorg/flytepropeller/pkg/controller/nodes/common"

"github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/ioutils"
errors2 "github.com/flyteorg/flytestdlib/errors"

"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/admin"
"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core"
"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/event"
"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/service"

"github.com/flyteorg/flytepropeller/events"
eventsErr "github.com/flyteorg/flytepropeller/events/errors"
"github.com/flyteorg/flytepropeller/pkg/apis/flyteworkflow/v1alpha1"
"github.com/flyteorg/flytepropeller/pkg/controller/config"
"github.com/flyteorg/flytepropeller/pkg/controller/executors"
"github.com/flyteorg/flytepropeller/pkg/controller/nodes/common"
"github.com/flyteorg/flytepropeller/pkg/controller/nodes/errors"
"github.com/flyteorg/flytepropeller/pkg/controller/nodes/handler"
"github.com/flyteorg/flytepropeller/pkg/controller/nodes/recovery"
"github.com/flyteorg/flytepropeller/pkg/controller/nodes/subworkflow/launchplan"
"github.com/flyteorg/flytepropeller/pkg/controller/nodes/task"

"github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/catalog"
"github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/ioutils"

"github.com/flyteorg/flytestdlib/contextutils"
errors2 "github.com/flyteorg/flytestdlib/errors"
"github.com/flyteorg/flytestdlib/logger"
"github.com/flyteorg/flytestdlib/promutils"
"github.com/flyteorg/flytestdlib/promutils/labeled"
"github.com/flyteorg/flytestdlib/storage"

"github.com/golang/protobuf/ptypes"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/catalog"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"github.com/flyteorg/flytepropeller/pkg/controller/config"

"github.com/flyteorg/flytepropeller/pkg/apis/flyteworkflow/v1alpha1"
"github.com/flyteorg/flytepropeller/pkg/controller/executors"
"github.com/flyteorg/flytepropeller/pkg/controller/nodes/errors"
"github.com/flyteorg/flytepropeller/pkg/controller/nodes/handler"
"github.com/flyteorg/flytepropeller/pkg/controller/nodes/subworkflow/launchplan"
)

type nodeMetrics struct {
Expand Down Expand Up @@ -156,6 +155,35 @@ func (c *nodeExecutor) IdempotentRecordEvent(ctx context.Context, nodeEvent *eve
return err
}

func (c *nodeExecutor) recoverInputs(ctx context.Context, nCtx handler.NodeExecutionContext,
recovered *admin.NodeExecution, recoveredData *admin.NodeExecutionGetDataResponse) (*core.LiteralMap, error) {

nodeInputs := recoveredData.FullInputs
if nodeInputs != nil {
if err := c.store.WriteProtobuf(ctx, nCtx.InputReader().GetInputPath(), storage.Options{}, nodeInputs); err != nil {
c.metrics.InputsWriteFailure.Inc(ctx)
logger.Errorf(ctx, "Failed to move recovered inputs for Node. Error [%v]. InputsFile [%s]", err, nCtx.InputReader().GetInputPath())
return nil, errors.Wrapf(errors.StorageError, nCtx.NodeID(), err, "Failed to store inputs for Node. InputsFile [%s]", nCtx.InputReader().GetInputPath())
}
} else if len(recovered.InputUri) > 0 {
// If the inputs are too large they won't be returned inline in the RecoverData call. We must fetch them before copying them.
nodeInputs = &core.LiteralMap{}
if recoveredData.FullInputs == nil {
if err := c.store.ReadProtobuf(ctx, storage.DataReference(recovered.InputUri), nodeInputs); err != nil {
return nil, errors.Wrapf(errors.InputsNotFoundError, nCtx.NodeID(), err, "failed to read data from dataDir [%v].", recovered.InputUri)
}
}

if err := c.store.WriteProtobuf(ctx, nCtx.InputReader().GetInputPath(), storage.Options{}, nodeInputs); err != nil {
c.metrics.InputsWriteFailure.Inc(ctx)
logger.Errorf(ctx, "Failed to move recovered inputs for Node. Error [%v]. InputsFile [%s]", err, nCtx.InputReader().GetInputPath())
return nil, errors.Wrapf(errors.StorageError, nCtx.NodeID(), err, "Failed to store inputs for Node. InputsFile [%s]", nCtx.InputReader().GetInputPath())
}
}

return nodeInputs, nil
}

func (c *nodeExecutor) attemptRecovery(ctx context.Context, nCtx handler.NodeExecutionContext) (handler.PhaseInfo, error) {
fullyQualifiedNodeID := nCtx.NodeExecutionMetadata().GetNodeExecutionID().NodeId
if nCtx.ExecutionContext().GetEventVersion() != v1alpha1.EventVersion0 {
Expand Down Expand Up @@ -205,6 +233,44 @@ func (c *nodeExecutor) attemptRecovery(ctx context.Context, nCtx handler.NodeExe
nCtx.NodeExecutionMetadata().GetNodeExecutionID(), err)
}
}

// if this node is a dynamic task we attempt to recover the compiled workflow from instances where the parent
// task succeeded but the dynamic task did not complete. this is important to ensure correctness since node ids
// within the compiled closure may not be generated deterministically.
if recovered.Metadata != nil && recovered.Metadata.IsDynamic && len(recovered.Closure.DynamicJobSpecUri) > 0 {
// recover node inputs
recoveredData, err := c.recoveryClient.RecoverNodeExecutionData(ctx,
nCtx.ExecutionContext().GetExecutionConfig().RecoveryExecution.WorkflowExecutionIdentifier, fullyQualifiedNodeID)
if err != nil || recoveredData == nil {
return handler.PhaseInfoUndefined, nil
}

if _, err := c.recoverInputs(ctx, nCtx, recovered, recoveredData); err != nil {
return handler.PhaseInfoUndefined, err
}

// copy previous DynamicJobSpec file
f, err := task.NewRemoteFutureFileReader(ctx, nCtx.NodeStatus().GetOutputDir(), nCtx.DataStore())
if err != nil {
return handler.PhaseInfoUndefined, err
}

dynamicJobSpecReference := storage.DataReference(recovered.Closure.DynamicJobSpecUri)
if err := nCtx.DataStore().CopyRaw(ctx, dynamicJobSpecReference, f.GetLoc(), storage.Options{}); err != nil {
return handler.PhaseInfoUndefined, errors.Wrapf(errors.StorageError, nCtx.NodeID(), err,
"failed to store dynamic job spec for node. source file [%s] destination file [%s]", dynamicJobSpecReference, f.GetLoc())
}

// transition node phase to 'Running' and dynamic task phase to 'DynamicNodePhaseParentFinalized'
state := nCtx.NodeStateReader().GetDynamicNodeState()
state.Phase = v1alpha1.DynamicNodePhaseParentFinalized
if err := nCtx.NodeStateWriter().PutDynamicNodeState(state); err != nil {
return handler.PhaseInfoUndefined, errors.Wrapf(errors.UnknownError, nCtx.NodeID(), err, "failed to store dynamic node state")
}

return handler.PhaseInfoRunning(&handler.ExecutionInfo{}), nil
}

logger.Debugf(ctx, "Node [%+v] phase [%v] is not recoverable", nCtx.NodeExecutionMetadata().GetNodeExecutionID(), recovered.Closure.Phase)
return handler.PhaseInfoUndefined, nil
}
Expand All @@ -222,31 +288,13 @@ func (c *nodeExecutor) attemptRecovery(ctx context.Context, nCtx handler.NodeExe
logger.Warnf(ctx, "call to attemptRecovery node [%+v] data returned no error but also no data", nCtx.NodeExecutionMetadata().GetNodeExecutionID())
return handler.PhaseInfoUndefined, nil
}
nodeInputs := recoveredData.GetFullInputs()
// Copy inputs to this node's expected location
if nodeInputs != nil {
if err = c.store.WriteProtobuf(ctx, nCtx.InputReader().GetInputPath(), storage.Options{}, recoveredData.FullInputs); err != nil {
c.metrics.InputsWriteFailure.Inc(ctx)
logger.Errorf(ctx, "Failed to move recovered inputs for Node. Error [%v]. InputsFile [%s]", err, nCtx.InputReader().GetInputPath())
return handler.PhaseInfoUndefined, errors.Wrapf(
errors.StorageError, nCtx.NodeID(), err, "Failed to store inputs for Node. InputsFile [%s]", nCtx.InputReader().GetInputPath())
}
} else if len(recovered.InputUri) > 0 {
// If the inputs are too large they won't be returned inline in the RecoverData call. We must fetch them before copying them.
nodeInputs = &core.LiteralMap{}
if recoveredData.FullInputs == nil {
if err := c.store.ReadProtobuf(ctx, storage.DataReference(recovered.InputUri), nodeInputs); err != nil {
return handler.PhaseInfoUndefined, errors.Wrapf(errors.InputsNotFoundError, nCtx.NodeID(), err, "failed to read data from dataDir [%v].", recovered.InputUri)
}
}

if err := c.store.WriteProtobuf(ctx, nCtx.InputReader().GetInputPath(), storage.Options{}, nodeInputs); err != nil {
c.metrics.InputsWriteFailure.Inc(ctx)
logger.Errorf(ctx, "Failed to move recovered inputs for Node. Error [%v]. InputsFile [%s]", err, nCtx.InputReader().GetInputPath())
return handler.PhaseInfoUndefined, errors.Wrapf(
errors.StorageError, nCtx.NodeID(), err, "Failed to store inputs for Node. InputsFile [%s]", nCtx.InputReader().GetInputPath())
}
// Copy inputs to this node's expected location
nodeInputs, err := c.recoverInputs(ctx, nCtx, recovered, recoveredData)
if err != nil {
return handler.PhaseInfoUndefined, err
}

// Similarly, copy outputs' reference
so := storage.Options{}
var outputs = &core.LiteralMap{}
Expand Down Expand Up @@ -334,7 +382,7 @@ func (c *nodeExecutor) preExecute(ctx context.Context, dag executors.DAGStructur
if !node.IsStartNode() {
if nCtx.ExecutionContext().GetExecutionConfig().RecoveryExecution.WorkflowExecutionIdentifier != nil {
phaseInfo, err := c.attemptRecovery(ctx, nCtx)
if err != nil || phaseInfo.GetPhase() == handler.EPhaseRecovered {
if err != nil || phaseInfo.GetPhase() != handler.EPhaseUndefined {
return phaseInfo, err
}
}
Expand Down Expand Up @@ -676,7 +724,7 @@ func (c *nodeExecutor) handleRetryableFailure(ctx context.Context, nCtx *nodeExe
// NOTE: It is important to increment attempts only after abort has been called. Increment attempt mutates the state
// Attempt is used throughout the system to determine the idempotent resource version.
nodeStatus.IncrementAttempts()
nodeStatus.UpdatePhase(v1alpha1.NodePhaseRunning, v1.Now(), "retrying", nil)
nodeStatus.UpdatePhase(v1alpha1.NodePhaseRunning, metav1.Now(), "retrying", nil)
// We are going to retry in the next round, so we should clear all current state
nodeStatus.ClearSubNodeStatus()
nodeStatus.ClearTaskStatus()
Expand Down Expand Up @@ -710,7 +758,7 @@ func (c *nodeExecutor) handleNode(ctx context.Context, dag executors.DAGStructur
if err := c.abort(ctx, h, nCtx, "node failing"); err != nil {
return executors.NodeStatusUndefined, err
}
nodeStatus.UpdatePhase(v1alpha1.NodePhaseFailed, v1.Now(), nodeStatus.GetMessage(), nodeStatus.GetExecutionError())
nodeStatus.UpdatePhase(v1alpha1.NodePhaseFailed, metav1.Now(), nodeStatus.GetMessage(), nodeStatus.GetExecutionError())
c.metrics.FailureDuration.Observe(ctx, nodeStatus.GetStartedAt().Time, nodeStatus.GetStoppedAt().Time)
if nCtx.md.IsInterruptible() {
c.metrics.InterruptibleNodesTerminated.Inc(ctx)
Expand All @@ -725,7 +773,7 @@ func (c *nodeExecutor) handleNode(ctx context.Context, dag executors.DAGStructur
}

nodeStatus.ClearSubNodeStatus()
nodeStatus.UpdatePhase(v1alpha1.NodePhaseTimedOut, v1.Now(), nodeStatus.GetMessage(), nodeStatus.GetExecutionError())
nodeStatus.UpdatePhase(v1alpha1.NodePhaseTimedOut, metav1.Now(), nodeStatus.GetMessage(), nodeStatus.GetExecutionError())
c.metrics.TimedOutFailure.Inc(ctx)
if nCtx.md.IsInterruptible() {
c.metrics.InterruptibleNodesTerminated.Inc(ctx)
Expand All @@ -738,7 +786,7 @@ func (c *nodeExecutor) handleNode(ctx context.Context, dag executors.DAGStructur
if err := c.finalize(ctx, h, nCtx); err != nil {
return executors.NodeStatusUndefined, err
}
t := v1.Now()
t := metav1.Now()

started := nodeStatus.GetStartedAt()
if started == nil {
Expand Down
91 changes: 91 additions & 0 deletions pkg/controller/nodes/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2151,6 +2151,97 @@ func TestRecover(t *testing.T) {
assert.NoError(t, err)
assert.Equal(t, phaseInfo.GetPhase(), handler.EPhaseRecovered)
})
t.Run("recover partially completed dynamic task", func(t *testing.T) {
srcDynamicJobSpecURI := "src/foo/bar"
dstDynamicJobSpecURI := "dst/foo/bar"

// initialize node execution context
nCtx := &nodeHandlerMocks.NodeExecutionContext{}
nCtx.OnExecutionContext().Return(execContext)
nCtx.OnNodeExecutionMetadata().Return(nm)
nCtx.OnInputReader().Return(ir)
nCtx.OnNodeStatus().Return(ns)

mockPBStore := &storageMocks.ComposedProtobufStore{}
mockPBStore.On("CopyRaw", mock.Anything, storage.DataReference(srcDynamicJobSpecURI), storage.DataReference(dstDynamicJobSpecURI), mock.Anything).Return(nil)
mockPBStore.On("WriteProtobuf", mock.Anything, mock.MatchedBy(func(reference storage.DataReference) bool {
return reference.String() == inputsPath || reference.String() == outputsPath
}), mock.Anything,
mock.Anything).Return(nil)
mockReferenceConstructor := storageMocks.ReferenceConstructor{}
mockReferenceConstructor.On("ConstructReference", mock.MatchedBy(func(ctx context.Context) bool { return true }), storage.DataReference("out"), "futures.pb").Return(
storage.DataReference(dstDynamicJobSpecURI), nil)
mockReferenceConstructor.On("ConstructReference", mock.MatchedBy(func(ctx context.Context) bool { return true }), storage.DataReference("out"), "futures_compiled.pb").Return(
storage.DataReference("out/futures_compiled.pb"), nil)
mockReferenceConstructor.On("ConstructReference", mock.MatchedBy(func(ctx context.Context) bool { return true }), storage.DataReference("out"), "dynamic_compiled.pb").Return(
storage.DataReference("out/dynamic_compiled.pb"), nil)
storageClient := &storage.DataStore{
ComposedProtobufStore: mockPBStore,
ReferenceConstructor: &mockReferenceConstructor,
}

nCtx.OnDataStore().Return(storageClient)

reader := &nodeHandlerMocks.NodeStateReader{}
reader.OnGetDynamicNodeState().Return(handler.DynamicNodeState{})
nCtx.OnNodeStateReader().Return(reader)

writer := &nodeHandlerMocks.NodeStateWriter{}
writer.OnPutDynamicNodeStateMatch(mock.Anything).Run(func(args mock.Arguments) {
state := args.Get(0).(handler.DynamicNodeState)
assert.Equal(t, v1alpha1.DynamicNodePhaseParentFinalized, state.Phase)
}).Return(nil)
nCtx.OnNodeStateWriter().Return(writer)

// initialize node executor
recoveryClient := &recoveryMocks.Client{}
recoveryClient.On("RecoverNodeExecution", mock.Anything, recoveryID, nodeID).Return(
&admin.NodeExecution{
Closure: &admin.NodeExecutionClosure{
Phase: core.NodeExecution_FAILED,
DynamicJobSpecUri: srcDynamicJobSpecURI,
},
Metadata: &admin.NodeExecutionMetaData{
IsDynamic: true,
},
}, nil)

dynamicWorkflow := &admin.DynamicWorkflowNodeMetadata{
Id: &core.Identifier{
ResourceType: core.ResourceType_WORKFLOW,
Project: "p",
Domain: "d",
Name: "n",
Version: "abc123",
},
CompiledWorkflow: &core.CompiledWorkflowClosure{
Primary: &core.CompiledWorkflow{
Template: &core.WorkflowTemplate{
Metadata: &core.WorkflowMetadata{
OnFailure: core.WorkflowMetadata_FAIL_AFTER_EXECUTABLE_NODES_COMPLETE,
},
},
},
},
}

recoveryClient.On("RecoverNodeExecutionData", mock.Anything, recoveryID, nodeID).Return(
&admin.NodeExecutionGetDataResponse{
FullInputs: fullInputs,
FullOutputs: fullOutputs,
DynamicWorkflow: dynamicWorkflow,
}, nil)

executor := nodeExecutor{
recoveryClient: recoveryClient,
store: storageClient,
eventConfig: eventConfig,
}

phaseInfo, err := executor.attemptRecovery(context.TODO(), nCtx)
assert.NoError(t, err)
assert.Equal(t, phaseInfo.GetPhase(), handler.EPhaseRunning)
})
t.Run("recover cached, dynamic task node successfully", func(t *testing.T) {
recoveryClient := &recoveryMocks.Client{}
recoveryClient.On("RecoverNodeExecution", mock.Anything, recoveryID, nodeID).Return(
Expand Down
4 changes: 4 additions & 0 deletions pkg/controller/nodes/task/future_file_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@ type FutureFileReader struct {
store *storage.DataStore
}

func (f FutureFileReader) GetLoc() storage.DataReference {
return f.loc
}

func (f FutureFileReader) Exists(ctx context.Context) (bool, error) {
metadata, err := f.store.Head(ctx, f.loc)
// If no futures file produced, then declare success and return.
Expand Down

0 comments on commit 0abbdb8

Please sign in to comment.