Skip to content

Commit

Permalink
Track lineage for child workflows (flyteorg#98)
Browse files Browse the repository at this point in the history
  • Loading branch information
katrogan authored May 19, 2020
1 parent d821500 commit 3828203
Show file tree
Hide file tree
Showing 4 changed files with 44 additions and 2 deletions.
12 changes: 11 additions & 1 deletion flyteadmin/pkg/manager/impl/execution_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -346,8 +346,9 @@ func (m *ExecutionManager) launchExecutionAndPrepareModel(
}
ctx = getExecutionContext(ctx, &workflowExecutionID)

// Get the node execution (if any) that launched this execution
// Get the node and parent execution (if any) that launched this execution
var parentNodeExecutionID uint
var sourceExecutionID uint
if request.Spec.Metadata != nil && request.Spec.Metadata.ParentNodeExecution != nil {
parentNodeExecutionModel, err := util.GetNodeExecutionModel(ctx, m.db, request.Spec.Metadata.ParentNodeExecution)
if err != nil {
Expand All @@ -357,6 +358,14 @@ func (m *ExecutionManager) launchExecutionAndPrepareModel(
}

parentNodeExecutionID = parentNodeExecutionModel.ID

sourceExecutionModel, err := util.GetExecutionModel(ctx, m.db, *request.Spec.Metadata.ParentNodeExecution.ExecutionId)
if err != nil {
logger.Errorf(ctx, "Failed to get node execution [%+v] that launched this execution [%+v] with error %v",
request.Spec.Metadata.ParentNodeExecution, workflowExecutionID, err)
return nil, nil, err
}
sourceExecutionID = sourceExecutionModel.ID
}

// Dynamically assign task resource defaults.
Expand Down Expand Up @@ -424,6 +433,7 @@ func (m *ExecutionManager) launchExecutionAndPrepareModel(
Notifications: notificationsSettings,
WorkflowIdentifier: workflow.Id,
ParentNodeExecutionID: parentNodeExecutionID,
SourceExecutionID: sourceExecutionID,
Cluster: execInfo.Cluster,
InputsURI: inputsURI,
UserInputsURI: userInputsURI,
Expand Down
29 changes: 28 additions & 1 deletion flyteadmin/pkg/manager/impl/execution_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,9 +264,33 @@ func TestCreateExecutionFromWorkflowNode(t *testing.T) {
}, nil
},
)
getExecutionCalled := false
repository.ExecutionRepo().(*repositoryMocks.MockExecutionRepo).SetGetCallback(
func(ctx context.Context, input interfaces.GetResourceInput) (models.Execution, error) {
assert.EqualValues(t, input.Project, parentNodeExecutionID.ExecutionId.Project)
assert.EqualValues(t, input.Domain, parentNodeExecutionID.ExecutionId.Domain)
assert.EqualValues(t, input.Name, parentNodeExecutionID.ExecutionId.Name)
getExecutionCalled = true
return models.Execution{
BaseModel: models.BaseModel{
ID: 2,
},
}, nil
},
)

repository.ExecutionRepo().(*repositoryMocks.MockExecutionRepo).SetCreateCallback(
func(ctx context.Context, input models.Execution) error {
assert.Equal(t, input.ParentNodeExecutionID, uint(1))
var spec admin.ExecutionSpec
err := proto.Unmarshal(input.Spec, &spec)
assert.NoError(t, err)
assert.Equal(t, admin.ExecutionMetadata_CHILD_WORKFLOW, spec.Metadata.Mode)
assert.Equal(t, "feeny", spec.Metadata.Principal)
assert.EqualValues(t, 1, spec.Metadata.Nesting)
assert.True(t, proto.Equal(&parentNodeExecutionID, spec.Metadata.ParentNodeExecution))
assert.EqualValues(t, input.ParentNodeExecutionID, 1)
assert.EqualValues(t, input.SourceExecutionID, 2)
return nil
},
)
Expand All @@ -276,12 +300,15 @@ func TestCreateExecutionFromWorkflowNode(t *testing.T) {
mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL)
request := testutils.GetExecutionRequest()
request.Spec.Metadata = &admin.ExecutionMetadata{
Mode: admin.ExecutionMetadata_SYSTEM,
Mode: admin.ExecutionMetadata_CHILD_WORKFLOW,
Nesting: 1,
ParentNodeExecution: &parentNodeExecutionID,
Principal: "feeny",
}
response, err := execManager.CreateExecution(context.Background(), request, requestedAt)
assert.Nil(t, err)
assert.True(t, getNodeExecutionCalled)
assert.True(t, getExecutionCalled)
expectedResponse := &admin.ExecutionCreateResponse{
Id: &executionIdentifier,
}
Expand Down
2 changes: 2 additions & 0 deletions flyteadmin/pkg/repositories/transformers/execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ type CreateExecutionModelInput struct {
Notifications []*admin.Notification
WorkflowIdentifier *core.Identifier
ParentNodeExecutionID uint
SourceExecutionID uint
Cluster string
InputsURI storage.DataReference
UserInputsURI storage.DataReference
Expand Down Expand Up @@ -85,6 +86,7 @@ func CreateExecutionModel(input CreateExecutionModelInput) (*models.Execution, e
ExecutionCreatedAt: &input.CreatedAt,
ExecutionUpdatedAt: &input.CreatedAt,
ParentNodeExecutionID: input.ParentNodeExecutionID,
SourceExecutionID: input.SourceExecutionID,
Cluster: input.Cluster,
InputsURI: input.InputsURI,
UserInputsURI: input.UserInputsURI,
Expand Down
3 changes: 3 additions & 0 deletions flyteadmin/pkg/repositories/transformers/execution_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ func TestCreateExecutionModel(t *testing.T) {
lpID := uint(33)
wfID := uint(23)
nodeID := uint(11)
sourceID := uint(12)
createdAt := time.Now()
workflowIdentifier := &core.Identifier{
Project: "project",
Expand All @@ -67,6 +68,7 @@ func TestCreateExecutionModel(t *testing.T) {
CreatedAt: createdAt,
WorkflowIdentifier: workflowIdentifier,
ParentNodeExecutionID: nodeID,
SourceExecutionID: sourceID,
Principal: principal,
Cluster: cluster,
})
Expand All @@ -80,6 +82,7 @@ func TestCreateExecutionModel(t *testing.T) {
assert.EqualValues(t, createdAt, *execution.ExecutionUpdatedAt)
assert.Equal(t, int32(admin.ExecutionMetadata_SYSTEM), execution.Mode)
assert.Equal(t, nodeID, execution.ParentNodeExecutionID)
assert.Equal(t, sourceID, execution.SourceExecutionID)
expectedSpec := execRequest.Spec
expectedSpec.Metadata.Principal = principal
expectedSpec.Metadata.SystemMetadata = &admin.SystemMetadata{
Expand Down

0 comments on commit 3828203

Please sign in to comment.