Skip to content
This repository has been archived by the owner on Oct 9, 2023. It is now read-only.

Add count to execution repo interfaces #464

Merged
merged 5 commits into from
Aug 26, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion auth/init_secrets.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ var (
// GetInitSecretsCommand creates a command to issue secrets to be used for Auth settings. It writes the secrets to the
// working directory. The expectation is that they are put in a location and made available to the serve command later.
// To configure where the serve command looks for secrets, update this config:
// secrets:
//
// secrets:
// secrets-prefix: <my custom path>
func GetInitSecretsCommand() *cobra.Command {
cmd := &cobra.Command{
Expand Down
13 changes: 7 additions & 6 deletions pkg/clusterresource/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -269,10 +269,10 @@ func prepareDynamicCreate(target executioncluster.ExecutionTarget, config string

// This function loops through the kubernetes resource template files in the configured template directory.
// For each unapplied template file (wrt the namespace) this func attempts to
// 1) create k8s object resource from template by performing:
// a) read template file
// b) substitute templatized variables with their resolved values
// 2) create the resource on the kubernetes cluster and cache successful outcomes
// 1. create k8s object resource from template by performing:
// a) read template file
// b) substitute templatized variables with their resolved values
// 2. create the resource on the kubernetes cluster and cache successful outcomes
func (c *controller) syncNamespace(ctx context.Context, project *admin.Project, domain *admin.Domain, namespace NamespaceName,
templateValues, customTemplateValues templateValuesType) error {
templateDir := c.config.ClusterResourceConfiguration().GetTemplatePath()
Expand Down Expand Up @@ -445,8 +445,9 @@ func addResourceVersion(patch []byte, rv string) ([]byte, error) {
}

// createResourceFromTemplate this method perform following processes:
// 1) read template file pointed by templateDir and templateFileName
// 2) substitute templatized variables with their resolved values
// 1. read template file pointed by templateDir and templateFileName
// 2. substitute templatized variables with their resolved values
//
// the method will return the kubernetes raw manifest
func (c *controller) createResourceFromTemplate(ctx context.Context, templateDir string,
templateFileName string, project *admin.Project, domain *admin.Domain, namespace NamespaceName,
Expand Down
3 changes: 2 additions & 1 deletion pkg/errors/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@ func (e *flyteAdminErrorImpl) String() string {
}

// enclose the error in the format that grpc server expect from golang:
// https://github.com/grpc/grpc-go/blob/master/status/status.go#L133
//
// https://github.com/grpc/grpc-go/blob/master/status/status.go#L133
func (e *flyteAdminErrorImpl) WithDetails(details *admin.EventFailureReason) (FlyteAdminError, error) {
s, err := e.status.WithDetails(details)
if err != nil {
Expand Down
12 changes: 6 additions & 6 deletions pkg/manager/impl/execution_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2294,7 +2294,7 @@ func TestUpdateExecution(t *testing.T) {
updateExecFuncCalled = true
return nil
}
repository.ExecutionRepo().(*repositoryMocks.MockExecutionRepo).SetUpdateExecutionCallback(updateExecFunc)
repository.ExecutionRepo().(*repositoryMocks.MockExecutionRepo).SetUpdateCallback(updateExecFunc)
r := plugins.NewRegistry()
r.RegisterDefault(plugins.PluginIDWorkflowExecutor, &defaultTestExecutor)
execManager := NewExecutionManager(repository, r, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil, nil, nil, &eventWriterMocks.WorkflowExecutionEventWriter{})
Expand All @@ -2315,7 +2315,7 @@ func TestUpdateExecution(t *testing.T) {
updateExecFuncCalled = true
return nil
}
repository.ExecutionRepo().(*repositoryMocks.MockExecutionRepo).SetUpdateExecutionCallback(updateExecFunc)
repository.ExecutionRepo().(*repositoryMocks.MockExecutionRepo).SetUpdateCallback(updateExecFunc)
r := plugins.NewRegistry()
r.RegisterDefault(plugins.PluginIDWorkflowExecutor, &defaultTestExecutor)
execManager := NewExecutionManager(repository, r, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil, nil, nil, &eventWriterMocks.WorkflowExecutionEventWriter{})
Expand All @@ -2333,7 +2333,7 @@ func TestUpdateExecution(t *testing.T) {
updateExecFunc := func(ctx context.Context, execModel models.Execution) error {
return fmt.Errorf("some db error")
}
repository.ExecutionRepo().(*repositoryMocks.MockExecutionRepo).SetUpdateExecutionCallback(updateExecFunc)
repository.ExecutionRepo().(*repositoryMocks.MockExecutionRepo).SetUpdateCallback(updateExecFunc)
r := plugins.NewRegistry()
r.RegisterDefault(plugins.PluginIDWorkflowExecutor, &defaultTestExecutor)
execManager := NewExecutionManager(repository, r, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil, nil, nil, &eventWriterMocks.WorkflowExecutionEventWriter{})
Expand Down Expand Up @@ -2818,7 +2818,7 @@ func TestTerminateExecution(t *testing.T) {
}, unmarshaledClosure.GetAbortMetadata()))
return nil
}
repository.ExecutionRepo().(*repositoryMocks.MockExecutionRepo).SetUpdateExecutionCallback(updateExecutionFunc)
repository.ExecutionRepo().(*repositoryMocks.MockExecutionRepo).SetUpdateCallback(updateExecutionFunc)
andrewwdye marked this conversation as resolved.
Show resolved Hide resolved

mockExecutor := workflowengineMocks.WorkflowExecutor{}
mockExecutor.OnAbortMatch(mock.Anything, mock.MatchedBy(func(data workflowengineInterfaces.AbortData) bool {
Expand Down Expand Up @@ -2860,7 +2860,7 @@ func TestTerminateExecution_PropellerError(t *testing.T) {

updateCalled := false
repository := repositoryMocks.NewMockRepository()
repository.ExecutionRepo().(*repositoryMocks.MockExecutionRepo).SetUpdateExecutionCallback(func(
repository.ExecutionRepo().(*repositoryMocks.MockExecutionRepo).SetUpdateCallback(func(
context context.Context, execution models.Execution) error {
updateCalled = true
assert.Equal(t, core.WorkflowExecution_ABORTING.String(), execution.Phase)
Expand Down Expand Up @@ -2892,7 +2892,7 @@ func TestTerminateExecution_DatabaseError(t *testing.T) {
context context.Context, execution models.Execution) error {
return expectedError
}
repository.ExecutionRepo().(*repositoryMocks.MockExecutionRepo).SetUpdateExecutionCallback(updateExecutionFunc)
repository.ExecutionRepo().(*repositoryMocks.MockExecutionRepo).SetUpdateCallback(updateExecutionFunc)
mockExecutor := workflowengineMocks.WorkflowExecutor{}
mockExecutor.OnAbortMatch(mock.Anything, mock.Anything).Return(nil)
mockExecutor.OnID().Return("testMockExecutor")
Expand Down
10 changes: 5 additions & 5 deletions pkg/manager/impl/executions/quality_of_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,11 +64,11 @@ func (q qualityOfServiceAllocator) getQualityOfServiceFromDb(ctx context.Context
/*
Users can specify the quality of service for an execution (in order of decreasing specificity)

- At CreateExecution request time
- In the LaunchPlan spec
- In the Workflow spec
- As an overridable MatchableResource (https://lyft.github.io/flyte/administrator/install/managing_customizable_resources.html)
for the underlying workflow
- At CreateExecution request time
- In the LaunchPlan spec
- In the Workflow spec
- As an overridable MatchableResource (https://lyft.github.io/flyte/administrator/install/managing_customizable_resources.html)
for the underlying workflow

System administrators can specify default QualityOfService specs
(https://github.com/flyteorg/flyteidl/blob/e9727afcedf8d4c30a1fc2eeac45593e426d9bb0/protos/flyteidl/core/execution.proto#L92)s
Expand Down
171 changes: 87 additions & 84 deletions pkg/manager/impl/node_execution_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -410,7 +410,7 @@ func TestTransformNodeExecutionModel(t *testing.T) {
ExecutionId: &workflowExecutionIdentifier,
}
t.Run("event version 0", func(t *testing.T) {
repository.NodeExecutionRepo().(*repositoryMocks.MockNodeExecutionRepo).GetWithChildrenFunction =
repository.NodeExecutionRepo().(*repositoryMocks.MockNodeExecutionRepo).SetGetWithChildrenCallback(
func(ctx context.Context, input interfaces.NodeExecutionResource) (models.NodeExecution, error) {
assert.True(t, proto.Equal(nodeExecID, &input.NodeExecutionIdentifier))
return models.NodeExecution{
Expand All @@ -432,7 +432,7 @@ func TestTransformNodeExecutionModel(t *testing.T) {
},
},
}, nil
}
})

manager := NodeExecutionManager{
db: repository,
Expand Down Expand Up @@ -484,11 +484,11 @@ func TestTransformNodeExecutionModel(t *testing.T) {
})
t.Run("get with children err", func(t *testing.T) {
expectedErr := flyteAdminErrors.NewFlyteAdminError(codes.Internal, "foo")
repository.NodeExecutionRepo().(*repositoryMocks.MockNodeExecutionRepo).GetWithChildrenFunction =
repository.NodeExecutionRepo().(*repositoryMocks.MockNodeExecutionRepo).SetGetWithChildrenCallback(
func(ctx context.Context, input interfaces.NodeExecutionResource) (models.NodeExecution, error) {
assert.True(t, proto.Equal(nodeExecID, &input.NodeExecutionIdentifier))
return models.NodeExecution{}, expectedErr
}
})

manager := NodeExecutionManager{
db: repository,
Expand All @@ -501,7 +501,7 @@ func TestTransformNodeExecutionModel(t *testing.T) {
func TestTransformNodeExecutionModelList(t *testing.T) {
ctx := context.TODO()
repository := repositoryMocks.NewMockRepository()
repository.NodeExecutionRepo().(*repositoryMocks.MockNodeExecutionRepo).GetWithChildrenFunction =
repository.NodeExecutionRepo().(*repositoryMocks.MockNodeExecutionRepo).SetGetWithChildrenCallback(
func(ctx context.Context, input interfaces.NodeExecutionResource) (models.NodeExecution, error) {
return models.NodeExecution{
NodeExecutionKey: models.NodeExecutionKey{
Expand All @@ -522,7 +522,7 @@ func TestTransformNodeExecutionModelList(t *testing.T) {
},
},
}, nil
}
})

manager := NodeExecutionManager{
db: repository,
Expand Down Expand Up @@ -600,45 +600,46 @@ func TestGetNodeExecutionParentNode(t *testing.T) {
}
metadataBytes, _ := proto.Marshal(&expectedMetadata)
closureBytes, _ := proto.Marshal(&expectedClosure)
repository.NodeExecutionRepo().(*repositoryMocks.MockNodeExecutionRepo).GetWithChildrenFunction = func(
ctx context.Context, input interfaces.NodeExecutionResource) (models.NodeExecution, error) {
workflowExecutionIdentifier := core.WorkflowExecutionIdentifier{
Project: "project",
Domain: "domain",
Name: "name",
}
assert.True(t, proto.Equal(&core.NodeExecutionIdentifier{
NodeId: "node id",
ExecutionId: &workflowExecutionIdentifier,
}, &input.NodeExecutionIdentifier))
return models.NodeExecution{
NodeExecutionKey: models.NodeExecutionKey{
NodeID: "node id",
ExecutionKey: models.ExecutionKey{
Project: "project",
Domain: "domain",
Name: "name",
repository.NodeExecutionRepo().(*repositoryMocks.MockNodeExecutionRepo).SetGetWithChildrenCallback(
func(
ctx context.Context, input interfaces.NodeExecutionResource) (models.NodeExecution, error) {
workflowExecutionIdentifier := core.WorkflowExecutionIdentifier{
Project: "project",
Domain: "domain",
Name: "name",
}
assert.True(t, proto.Equal(&core.NodeExecutionIdentifier{
NodeId: "node id",
ExecutionId: &workflowExecutionIdentifier,
}, &input.NodeExecutionIdentifier))
return models.NodeExecution{
NodeExecutionKey: models.NodeExecutionKey{
NodeID: "node id",
ExecutionKey: models.ExecutionKey{
Project: "project",
Domain: "domain",
Name: "name",
},
},
},
Phase: core.NodeExecution_SUCCEEDED.String(),
InputURI: "input uri",
StartedAt: &occurredAt,
Closure: closureBytes,
NodeExecutionMetadata: metadataBytes,
ChildNodeExecutions: []models.NodeExecution{
{
NodeExecutionKey: models.NodeExecutionKey{
NodeID: "node-child",
ExecutionKey: models.ExecutionKey{
Project: "project",
Domain: "domain",
Name: "name",
Phase: core.NodeExecution_SUCCEEDED.String(),
InputURI: "input uri",
StartedAt: &occurredAt,
Closure: closureBytes,
NodeExecutionMetadata: metadataBytes,
ChildNodeExecutions: []models.NodeExecution{
{
NodeExecutionKey: models.NodeExecutionKey{
NodeID: "node-child",
ExecutionKey: models.ExecutionKey{
Project: "project",
Domain: "domain",
Name: "name",
},
},
},
},
},
}, nil
}
}, nil
})
nodeExecManager := NewNodeExecutionManager(repository, getMockExecutionsConfigProvider(), make([]string, 0), getMockStorageForExecTest(context.Background()), mockScope.NewTestScope(), mockNodeExecutionRemoteURL, nil, nil, &eventWriterMocks.NodeExecutionEventWriter{})
nodeExecution, err := nodeExecManager.GetNodeExecution(context.Background(), admin.NodeExecutionGetRequest{
Id: &nodeExecutionIdentifier,
Expand All @@ -664,33 +665,34 @@ func TestGetNodeExecutionEventVersion0(t *testing.T) {
}
metadataBytes, _ := proto.Marshal(&expectedMetadata)
closureBytes, _ := proto.Marshal(&expectedClosure)
repository.NodeExecutionRepo().(*repositoryMocks.MockNodeExecutionRepo).GetWithChildrenFunction = func(
ctx context.Context, input interfaces.NodeExecutionResource) (models.NodeExecution, error) {
workflowExecutionIdentifier := core.WorkflowExecutionIdentifier{
Project: "project",
Domain: "domain",
Name: "name",
}
assert.True(t, proto.Equal(&core.NodeExecutionIdentifier{
NodeId: "node id",
ExecutionId: &workflowExecutionIdentifier,
}, &input.NodeExecutionIdentifier))
return models.NodeExecution{
NodeExecutionKey: models.NodeExecutionKey{
NodeID: "node id",
ExecutionKey: models.ExecutionKey{
Project: "project",
Domain: "domain",
Name: "name",
repository.NodeExecutionRepo().(*repositoryMocks.MockNodeExecutionRepo).SetGetWithChildrenCallback(
func(
ctx context.Context, input interfaces.NodeExecutionResource) (models.NodeExecution, error) {
workflowExecutionIdentifier := core.WorkflowExecutionIdentifier{
Project: "project",
Domain: "domain",
Name: "name",
}
assert.True(t, proto.Equal(&core.NodeExecutionIdentifier{
NodeId: "node id",
ExecutionId: &workflowExecutionIdentifier,
}, &input.NodeExecutionIdentifier))
return models.NodeExecution{
NodeExecutionKey: models.NodeExecutionKey{
NodeID: "node id",
ExecutionKey: models.ExecutionKey{
Project: "project",
Domain: "domain",
Name: "name",
},
},
},
Phase: core.NodeExecution_SUCCEEDED.String(),
InputURI: "input uri",
StartedAt: &occurredAt,
Closure: closureBytes,
NodeExecutionMetadata: metadataBytes,
}, nil
}
Phase: core.NodeExecution_SUCCEEDED.String(),
InputURI: "input uri",
StartedAt: &occurredAt,
Closure: closureBytes,
NodeExecutionMetadata: metadataBytes,
}, nil
})

nodeExecManager := NewNodeExecutionManager(repository, getMockExecutionsConfigProvider(), make([]string, 0), getMockStorageForExecTest(context.Background()), mockScope.NewTestScope(), mockNodeExecutionRemoteURL, nil, nil, &eventWriterMocks.NodeExecutionEventWriter{})
nodeExecution, err := nodeExecManager.GetNodeExecution(context.Background(), admin.NodeExecutionGetRequest{
Expand Down Expand Up @@ -809,24 +811,25 @@ func TestListNodeExecutionsLevelZero(t *testing.T) {
},
}, nil
})
repository.NodeExecutionRepo().(*repositoryMocks.MockNodeExecutionRepo).GetWithChildrenFunction = func(
ctx context.Context, input interfaces.NodeExecutionResource) (models.NodeExecution, error) {
return models.NodeExecution{
NodeExecutionKey: models.NodeExecutionKey{
NodeID: "node id",
ExecutionKey: models.ExecutionKey{
Project: "project",
Domain: "domain",
Name: "name",
repository.NodeExecutionRepo().(*repositoryMocks.MockNodeExecutionRepo).SetGetWithChildrenCallback(
func(
ctx context.Context, input interfaces.NodeExecutionResource) (models.NodeExecution, error) {
return models.NodeExecution{
NodeExecutionKey: models.NodeExecutionKey{
NodeID: "node id",
ExecutionKey: models.ExecutionKey{
Project: "project",
Domain: "domain",
Name: "name",
},
},
},
Phase: core.NodeExecution_SUCCEEDED.String(),
InputURI: "input uri",
StartedAt: &occurredAt,
Closure: closureBytes,
NodeExecutionMetadata: metadataBytes,
}, nil
}
Phase: core.NodeExecution_SUCCEEDED.String(),
InputURI: "input uri",
StartedAt: &occurredAt,
Closure: closureBytes,
NodeExecutionMetadata: metadataBytes,
}, nil
})
nodeExecManager := NewNodeExecutionManager(repository, getMockExecutionsConfigProvider(), make([]string, 0), getMockStorageForExecTest(context.Background()), mockScope.NewTestScope(), mockNodeExecutionRemoteURL, nil, nil, &eventWriterMocks.NodeExecutionEventWriter{})
nodeExecutions, err := nodeExecManager.ListNodeExecutions(context.Background(), admin.NodeExecutionListRequest{
WorkflowExecutionId: &core.WorkflowExecutionIdentifier{
Expand Down
18 changes: 10 additions & 8 deletions pkg/manager/impl/task_execution_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -304,21 +304,23 @@ func TestCreateTaskEvent_MissingExecution(t *testing.T) {
func(ctx context.Context, input interfaces.GetTaskExecutionInput) (models.TaskExecution, error) {
return models.TaskExecution{}, flyteAdminErrors.NewFlyteAdminError(codes.NotFound, "foo")
})
repository.NodeExecutionRepo().(*repositoryMocks.MockNodeExecutionRepo).ExistsFunction = func(
ctx context.Context, input interfaces.NodeExecutionResource) (bool, error) {
return false, expectedErr
}
repository.NodeExecutionRepo().(*repositoryMocks.MockNodeExecutionRepo).SetExistsCallback(
func(
ctx context.Context, input interfaces.NodeExecutionResource) (bool, error) {
return false, expectedErr
})
taskExecManager := NewTaskExecutionManager(repository, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), mockScope.NewTestScope(), mockTaskExecutionRemoteURL, nil, nil)
resp, err := taskExecManager.CreateTaskExecutionEvent(context.Background(), taskEventRequest)
assert.EqualError(t, err, "Failed to get existing node execution id: [node_id:\"node-id\""+
" execution_id:<project:\"project\" domain:\"domain\" name:\"name\" > ] "+
"with err: expected error")
assert.Nil(t, resp)

repository.NodeExecutionRepo().(*repositoryMocks.MockNodeExecutionRepo).ExistsFunction = func(
ctx context.Context, input interfaces.NodeExecutionResource) (bool, error) {
return false, nil
}
repository.NodeExecutionRepo().(*repositoryMocks.MockNodeExecutionRepo).SetExistsCallback(
func(
ctx context.Context, input interfaces.NodeExecutionResource) (bool, error) {
return false, nil
})
taskExecManager = NewTaskExecutionManager(repository, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), mockScope.NewTestScope(), mockTaskExecutionRemoteURL, nil, nil)
resp, err = taskExecManager.CreateTaskExecutionEvent(context.Background(), taskEventRequest)
assert.EqualError(t, err, "failed to get existing node execution id: [node_id:\"node-id\""+
Expand Down
4 changes: 3 additions & 1 deletion pkg/repositories/errors/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@
// This errors utility translates postgres application error codes into internal error types.
// The go postgres driver defines possible error codes here: https://github.com/lib/pq/blob/master/error.go
// And the postgres standard defines error responses here:
// https://www.postgresql.org/docs/current/static/protocol-error-fields.html
//
// https://www.postgresql.org/docs/current/static/protocol-error-fields.html
//
// Inspired by https://www.codementor.io/tamizhvendan/managing-data-in-golang-using-gorm-part-1-a9cdjb8nb
package errors

Expand Down
Loading