Skip to content

Commit

Permalink
Merge branch 'master' into artifacts-shell-2
Browse files Browse the repository at this point in the history
  • Loading branch information
wild-endeavor authored Jan 8, 2024
2 parents 3ce12b2 + 513c3e1 commit 63b1b81
Show file tree
Hide file tree
Showing 4 changed files with 82 additions and 20 deletions.
54 changes: 39 additions & 15 deletions flytepropeller/pkg/controller/nodes/array/event_recorder.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,21 +3,24 @@ package array
import (
"context"
"fmt"
"strconv"
"time"

"github.com/golang/protobuf/ptypes"

idlcore "github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/core"
"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/event"
"github.com/flyteorg/flyte/flyteplugins/go/tasks/pluginmachinery/encoding"
"github.com/flyteorg/flyte/flytepropeller/pkg/apis/flyteworkflow/v1alpha1"
"github.com/flyteorg/flyte/flytepropeller/pkg/controller/config"
"github.com/flyteorg/flyte/flytepropeller/pkg/controller/nodes/common"
"github.com/flyteorg/flyte/flytepropeller/pkg/controller/nodes/interfaces"
"github.com/flyteorg/flyte/flytepropeller/pkg/controller/nodes/task"
)

type arrayEventRecorder interface {
interfaces.EventRecorder
process(ctx context.Context, nCtx interfaces.NodeExecutionContext, index int, retryAttempt uint32)
process(ctx context.Context, nCtx interfaces.NodeExecutionContext, index int, retryAttempt uint32) error
finalize(ctx context.Context, nCtx interfaces.NodeExecutionContext, taskPhase idlcore.TaskExecution_Phase, taskPhaseVersion uint32, eventConfig *config.EventConfig) error
finalizeRequired(ctx context.Context) bool
}
Expand All @@ -39,8 +42,23 @@ func (e *externalResourcesEventRecorder) RecordTaskEvent(ctx context.Context, ev
return nil
}

func (e *externalResourcesEventRecorder) process(ctx context.Context, nCtx interfaces.NodeExecutionContext, index int, retryAttempt uint32) {
externalResourceID := fmt.Sprintf("%s-%d", buildSubNodeID(nCtx, index), retryAttempt)
func (e *externalResourcesEventRecorder) process(ctx context.Context, nCtx interfaces.NodeExecutionContext, index int, retryAttempt uint32) error {
// generate externalResourceID
currentNodeUniqueID := nCtx.NodeID()
if nCtx.ExecutionContext().GetEventVersion() != v1alpha1.EventVersion0 {
var err error
currentNodeUniqueID, err = common.GenerateUniqueID(nCtx.ExecutionContext().GetParentInfo(), nCtx.NodeID())
if err != nil {
return err
}
}

uniqueID, err := encoding.FixedLengthUniqueIDForParts(task.IDMaxLength, []string{nCtx.NodeExecutionMetadata().GetOwnerID().Name, currentNodeUniqueID, strconv.Itoa(int(retryAttempt))})
if err != nil {
return err
}

externalResourceID := fmt.Sprintf("%s-n%d-%d", uniqueID, index, retryAttempt)

// process events
cacheStatus := idlcore.CatalogCacheStatus_CACHE_DISABLED
Expand Down Expand Up @@ -83,6 +101,8 @@ func (e *externalResourcesEventRecorder) process(ctx context.Context, nCtx inter
// clear nodeEvents and taskEvents
e.nodeEvents = e.nodeEvents[:0]
e.taskEvents = e.taskEvents[:0]

return nil
}

func (e *externalResourcesEventRecorder) finalize(ctx context.Context, nCtx interfaces.NodeExecutionContext,
Expand All @@ -94,6 +114,17 @@ func (e *externalResourcesEventRecorder) finalize(ctx context.Context, nCtx inte
return err
}

var taskID *idlcore.Identifier
subNode := nCtx.Node().GetArrayNode().GetSubNodeSpec()
if subNode != nil && subNode.Kind == v1alpha1.NodeKindTask {
executableTask, err := nCtx.ExecutionContext().GetTask(*subNode.GetTaskID())
if err != nil {
return err
}

taskID = executableTask.CoreTask().GetId()
}

nodeExecutionID := *nCtx.NodeExecutionMetadata().GetNodeExecutionID()
if nCtx.ExecutionContext().GetEventVersion() != v1alpha1.EventVersion0 {
currentNodeUniqueID, err := common.GenerateUniqueID(nCtx.ExecutionContext().GetParentInfo(), nodeExecutionID.NodeId)
Expand All @@ -103,26 +134,18 @@ func (e *externalResourcesEventRecorder) finalize(ctx context.Context, nCtx inte
nodeExecutionID.NodeId = currentNodeUniqueID
}

workflowExecutionID := nodeExecutionID.ExecutionId

taskExecutionEvent := &event.TaskExecutionEvent{
TaskId: &idlcore.Identifier{
ResourceType: idlcore.ResourceType_TASK,
Project: workflowExecutionID.Project,
Domain: workflowExecutionID.Domain,
Name: nCtx.NodeID(),
Version: "v1", // this value is irrelevant but necessary for the identifier to be valid
},
TaskId: taskID,
ParentNodeExecutionId: &nodeExecutionID,
RetryAttempt: 0, // ArrayNode will never retry
Phase: taskPhase,
PhaseVersion: taskPhaseVersion,
OccurredAt: occurredAt,
Metadata: &event.TaskExecutionMetadata{
ExternalResources: e.externalResources,
PluginIdentifier: "container",
PluginIdentifier: "k8s-array",
},
TaskType: "k8s-array",
TaskType: "container_array",
EventVersion: 1,
}

Expand Down Expand Up @@ -165,7 +188,8 @@ type passThroughEventRecorder struct {
interfaces.EventRecorder
}

func (*passThroughEventRecorder) process(ctx context.Context, nCtx interfaces.NodeExecutionContext, index int, retryAttempt uint32) {
func (*passThroughEventRecorder) process(ctx context.Context, nCtx interfaces.NodeExecutionContext, index int, retryAttempt uint32) error {
return nil
}

func (*passThroughEventRecorder) finalize(ctx context.Context, nCtx interfaces.NodeExecutionContext,
Expand Down
12 changes: 9 additions & 3 deletions flytepropeller/pkg/controller/nodes/array/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,9 @@ func (a *arrayNodeHandler) Abort(ctx context.Context, nCtx interfaces.NodeExecut
logger.Warnf(ctx, "failed to record ArrayNode events: %v", err)
}

eventRecorder.process(ctx, nCtx, i, retryAttempt)
if err := eventRecorder.process(ctx, nCtx, i, retryAttempt); err != nil {
logger.Warnf(ctx, "failed to record ArrayNode events: %v", err)
}
}
}
}
Expand Down Expand Up @@ -241,7 +243,9 @@ func (a *arrayNodeHandler) Handle(ctx context.Context, nCtx interfaces.NodeExecu
logger.Warnf(ctx, "failed to record ArrayNode events: %v", err)
}

eventRecorder.process(ctx, nCtx, i, 0)
if err := eventRecorder.process(ctx, nCtx, i, 0); err != nil {
logger.Warnf(ctx, "failed to record ArrayNode events: %v", err)
}
}

// transition ArrayNode to `ArrayNodePhaseExecuting`
Expand Down Expand Up @@ -331,7 +335,9 @@ func (a *arrayNodeHandler) Handle(ctx context.Context, nCtx interfaces.NodeExecu
}
}
}
eventRecorder.process(ctx, nCtx, index, subNodeStatus.GetAttempts())
if err := eventRecorder.process(ctx, nCtx, index, subNodeStatus.GetAttempts()); err != nil {
return handler.UnknownTransition, err
}

// update subNode state
arrayNodeState.SubNodePhases.SetItem(index, uint64(subNodeStatus.GetPhase()))
Expand Down
5 changes: 5 additions & 0 deletions flytepropeller/pkg/controller/nodes/array/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"k8s.io/apimachinery/pkg/types"

idlcore "github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/core"
"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/event"
Expand Down Expand Up @@ -145,6 +146,10 @@ func createNodeExecutionContext(dataStore *storage.DataStore, eventRecorder inte
Name: "name",
},
})
nodeExecutionMetadata.OnGetOwnerID().Return(types.NamespacedName{
Namespace: "wf-namespace",
Name: "wf-name",
})
nCtx.OnNodeExecutionMetadata().Return(nodeExecutionMetadata)

// NodeID
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 63b1b81

Please sign in to comment.