From 6a8ffa8dab0f554e10e4a00cfe5b360af316d398 Mon Sep 17 00:00:00 2001 From: Daniel Rammer Date: Mon, 10 Oct 2022 09:07:37 -0500 Subject: [PATCH 1/2] adding cache information on first node event Signed-off-by: Daniel Rammer --- .../transformers/node_execution.go | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/pkg/repositories/transformers/node_execution.go b/pkg/repositories/transformers/node_execution.go index 2029f69f7..0fe9685c3 100644 --- a/pkg/repositories/transformers/node_execution.go +++ b/pkg/repositories/transformers/node_execution.go @@ -133,12 +133,30 @@ func CreateNodeExecutionModel(ctx context.Context, input ToNodeExecutionModelInp return nil, err } } + if common.IsNodeExecutionTerminal(input.Request.Event.Phase) { err := addTerminalState(ctx, input.Request, nodeExecution, &closure, input.InlineEventDataPolicy, input.StorageClient) if err != nil { return nil, err } } + + // Update TaskNodeMetadata, which includes caching information today. + if input.Request.Event.GetTaskNodeMetadata() != nil { + targetMetadata := &admin.NodeExecutionClosure_TaskNodeMetadata{ + TaskNodeMetadata: &admin.TaskNodeMetadata{ + CheckpointUri: input.Request.Event.GetTaskNodeMetadata().CheckpointUri, + }, + } + if input.Request.Event.GetTaskNodeMetadata().CatalogKey != nil { + st := input.Request.Event.GetTaskNodeMetadata().GetCacheStatus().String() + targetMetadata.TaskNodeMetadata.CacheStatus = input.Request.Event.GetTaskNodeMetadata().GetCacheStatus() + targetMetadata.TaskNodeMetadata.CatalogKey = input.Request.Event.GetTaskNodeMetadata().GetCatalogKey() + nodeExecution.CacheStatus = &st + } + closure.TargetMetadata = targetMetadata + } + marshaledClosure, err := proto.Marshal(&closure) if err != nil { return nil, errors.NewFlyteAdminErrorf( From 6a275357525ae9b6f31a5a62a55095849721cfd4 Mon Sep 17 00:00:00 2001 From: Daniel Rammer Date: Mon, 10 Oct 2022 09:26:46 -0500 Subject: [PATCH 2/2] fixed unit tests Signed-off-by: Daniel Rammer --- .../impl/node_execution_manager_test.go | 3 + .../transformers/node_execution_test.go | 69 +++++++++++++------ 2 files changed, 50 insertions(+), 22 deletions(-) diff --git a/pkg/manager/impl/node_execution_manager_test.go b/pkg/manager/impl/node_execution_manager_test.go index 1b47f16bf..104a2c9ac 100644 --- a/pkg/manager/impl/node_execution_manager_test.go +++ b/pkg/manager/impl/node_execution_manager_test.go @@ -132,6 +132,9 @@ func TestCreateNodeEvent(t *testing.T) { StartedAt: occurredAtProto, CreatedAt: occurredAtProto, UpdatedAt: occurredAtProto, + TargetMetadata: &admin.NodeExecutionClosure_TaskNodeMetadata{ + TaskNodeMetadata: &admin.TaskNodeMetadata{}, + }, } closureBytes, _ := proto.Marshal(&expectedClosure) repository.NodeExecutionRepo().(*repositoryMocks.MockNodeExecutionRepo).SetCreateCallback( diff --git a/pkg/repositories/transformers/node_execution_test.go b/pkg/repositories/transformers/node_execution_test.go index a06d762c7..e07bff85b 100644 --- a/pkg/repositories/transformers/node_execution_test.go +++ b/pkg/repositories/transformers/node_execution_test.go @@ -188,33 +188,49 @@ func TestAddTerminalState_Error(t *testing.T) { func TestCreateNodeExecutionModel(t *testing.T) { parentTaskExecID := uint(8) - nodeExecutionModel, err := CreateNodeExecutionModel(context.TODO(), ToNodeExecutionModelInput{ - Request: &admin.NodeExecutionEventRequest{ - Event: &event.NodeExecutionEvent{ - Id: &core.NodeExecutionIdentifier{ - NodeId: "node id", - ExecutionId: &core.WorkflowExecutionIdentifier{ - Project: "project", - Domain: "domain", - Name: "name", - }, - }, - Phase: core.NodeExecution_RUNNING, - InputUri: "input uri", - OutputResult: &event.NodeExecutionEvent_OutputUri{ - OutputUri: "output uri", + request := &admin.NodeExecutionEventRequest{ + Event: &event.NodeExecutionEvent{ + Id: &core.NodeExecutionIdentifier{ + NodeId: "node id", + ExecutionId: &core.WorkflowExecutionIdentifier{ + Project: "project", + Domain: "domain", + Name: "name", }, - OccurredAt: occurredAtProto, - ParentTaskMetadata: &event.ParentTaskExecutionMetadata{ - Id: &core.TaskExecutionIdentifier{ - RetryAttempt: 1, + }, + Phase: core.NodeExecution_RUNNING, + InputUri: "input uri", + OutputResult: &event.NodeExecutionEvent_OutputUri{ + OutputUri: "output uri", + }, + OccurredAt: occurredAtProto, + TargetMetadata: &event.NodeExecutionEvent_TaskNodeMetadata{ + TaskNodeMetadata: &event.TaskNodeMetadata{ + CacheStatus: core.CatalogCacheStatus_CACHE_POPULATED, + CatalogKey: &core.CatalogMetadata{ + DatasetId: &core.Identifier{ + ResourceType: core.ResourceType_DATASET, + Name: "x", + Project: "proj", + Domain: "domain", + }, }, + CheckpointUri: "last checkpoint uri", + }, + }, + ParentTaskMetadata: &event.ParentTaskExecutionMetadata{ + Id: &core.TaskExecutionIdentifier{ + RetryAttempt: 1, }, - IsParent: true, - IsDynamic: true, - EventVersion: 2, }, + IsParent: true, + IsDynamic: true, + EventVersion: 2, }, + } + + nodeExecutionModel, err := CreateNodeExecutionModel(context.TODO(), ToNodeExecutionModelInput{ + Request: request, ParentTaskExecutionID: &parentTaskExecID, }) assert.Nil(t, err) @@ -224,6 +240,13 @@ func TestCreateNodeExecutionModel(t *testing.T) { StartedAt: occurredAtProto, CreatedAt: occurredAtProto, UpdatedAt: occurredAtProto, + TargetMetadata: &admin.NodeExecutionClosure_TaskNodeMetadata{ + TaskNodeMetadata: &admin.TaskNodeMetadata{ + CacheStatus: request.Event.GetTaskNodeMetadata().CacheStatus, + CatalogKey: request.Event.GetTaskNodeMetadata().CatalogKey, + CheckpointUri: request.Event.GetTaskNodeMetadata().CheckpointUri, + }, + }, } var closureBytes, _ = proto.Marshal(closure) var nodeExecutionMetadata, _ = proto.Marshal(&admin.NodeExecutionMetaData{ @@ -234,6 +257,7 @@ func TestCreateNodeExecutionModel(t *testing.T) { EventVersion: 2, } internalDataBytes, _ := proto.Marshal(internalData) + cacheStatus := request.Event.GetTaskNodeMetadata().CacheStatus.String() assert.Equal(t, &models.NodeExecution{ NodeExecutionKey: models.NodeExecutionKey{ NodeID: "node id", @@ -251,6 +275,7 @@ func TestCreateNodeExecutionModel(t *testing.T) { NodeExecutionUpdatedAt: &occurredAt, NodeExecutionMetadata: nodeExecutionMetadata, ParentTaskExecutionID: &parentTaskExecID, + CacheStatus: &cacheStatus, InternalData: internalDataBytes, }, nodeExecutionModel) }