Skip to content
This repository has been archived by the owner on Oct 9, 2023. It is now read-only.

Return full execution data on every request if under max specified size #109

Merged
merged 6 commits into from
Aug 12, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ require (
github.com/jinzhu/gorm v1.9.12
github.com/kelseyhightower/envconfig v1.4.0 // indirect
github.com/lib/pq v1.3.0
github.com/lyft/flyteidl v0.18.1
github.com/lyft/flyteidl v0.18.2
github.com/lyft/flytepropeller v0.3.7
github.com/lyft/flytestdlib v0.3.9
github.com/magiconair/properties v1.8.1
Expand Down Expand Up @@ -52,4 +52,5 @@ replace (
k8s.io/api => github.com/lyft/api v0.0.0-20191031200350-b49a72c274e0
k8s.io/apimachinery => github.com/lyft/apimachinery v0.0.0-20191031200210-047e3ea32d7f
k8s.io/client-go => k8s.io/client-go v0.0.0-20191016111102-bec269661e48

)
10 changes: 2 additions & 8 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -457,16 +457,10 @@ github.com/lyft/apimachinery v0.0.0-20191031200210-047e3ea32d7f/go.mod h1:llRdnz
github.com/lyft/datacatalog v0.2.1/go.mod h1:ktrPvzTDUwHO5Lv0hLH38zLHnOJ++rGoAO0iQ/sIPJ4=
github.com/lyft/flyteidl v0.17.0/go.mod h1:/zQXxuHO11u/saxTTZc8oYExIGEShXB+xCB1/F1Cu20=
github.com/lyft/flyteidl v0.18.0/go.mod h1:/zQXxuHO11u/saxTTZc8oYExIGEShXB+xCB1/F1Cu20=
github.com/lyft/flyteidl v0.18.1 h1:COKkZi5k6bQvUYOk5gE70+FJX9/NUn0WOQ1uMrw3Qio=
github.com/lyft/flyteidl v0.18.1/go.mod h1:/zQXxuHO11u/saxTTZc8oYExIGEShXB+xCB1/F1Cu20=
github.com/lyft/flyteplugins v0.4.2/go.mod h1:8zhqFG9BzbHNQGEXzGYltTJLD+KTmQZkanxXgeFI25c=
github.com/lyft/flyteidl v0.18.2 h1:znA8yy8ImnVUGRa2j6z/4zaRbPHNgFCUp84UTquHDJk=
github.com/lyft/flyteidl v0.18.2/go.mod h1:/zQXxuHO11u/saxTTZc8oYExIGEShXB+xCB1/F1Cu20=
github.com/lyft/flyteplugins v0.4.4/go.mod h1:8zhqFG9BzbHNQGEXzGYltTJLD+KTmQZkanxXgeFI25c=
github.com/lyft/flytepropeller v0.3.4 h1:BaxfEJGNnFtYffsjnMhQesUlW06D1zrMM8v/wOos/nU=
github.com/lyft/flytepropeller v0.3.4/go.mod h1:8Stq6f16u7qL9U0CDx7mNkPi80JRl0gtbIMzeqRegug=
github.com/lyft/flytepropeller v0.3.5-0.20200807170205-cc336537cedf h1:pewwhbuOjXI3oiFtKwsmj4qBU29oP1ezP5YG8Ervujs=
github.com/lyft/flytepropeller v0.3.5-0.20200807170205-cc336537cedf/go.mod h1:8JuHCbR2MKMIlLr839NGJidj2tjBGmB/CQt8/zsHc1E=
github.com/lyft/flytepropeller v0.3.6 h1:bMpa96VqvxpRmMCEQw/rp64BC8voPZhjEoKF4FhmzIw=
github.com/lyft/flytepropeller v0.3.6/go.mod h1:1Iw3ngmJBP+52coloHL1rOxcX7EDDUUvTYFQQy2WYzk=
github.com/lyft/flytepropeller v0.3.7 h1:l2AguhyhiUDCvqjHYF8XJw46gPW9j4XNZwJEAJdiEtI=
github.com/lyft/flytepropeller v0.3.7/go.mod h1:8sNP7ZnEngNRYBMewmH4PtiRR0pus8RkjNoPqelyKX8=
github.com/lyft/flytestdlib v0.3.0 h1:nIkX4MlyYdcLLzaF35RI2P5BhARt+qMgHoFto8eVNzU=
Expand Down
38 changes: 33 additions & 5 deletions pkg/manager/impl/execution_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,11 @@ type executionSystemMetrics struct {
}

type executionUserMetrics struct {
Scope promutils.Scope
ScheduledExecutionDelays projectDomainScopedStopWatchMap
WorkflowExecutionDurations projectDomainScopedStopWatchMap
Scope promutils.Scope
ScheduledExecutionDelays projectDomainScopedStopWatchMap
WorkflowExecutionDurations projectDomainScopedStopWatchMap
WorkflowExecutionInputBytes prometheus.Summary
WorkflowExecutionOutputBytes prometheus.Summary
}

type ExecutionManager struct {
Expand Down Expand Up @@ -1019,10 +1021,32 @@ func (m *ExecutionManager) GetExecutionData(
if err != nil {
return nil, err
}
return &admin.WorkflowExecutionGetDataResponse{
response := &admin.WorkflowExecutionGetDataResponse{
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@katrogan one reason to do this is to avoid having to sign the URL, so can we just do a HEAD, check the size and if that is ok then do a protobuf get using flytestdlib.storage?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

or maybe i am misunderstanding, do we actually store the size of the payload in our database?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

until the UI transitions to using the new field, we should return both for backwards compatibility
but good point on using flytestdlib, will update

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no, we don't store the size

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh wait, and the storage client is using flytestdlib already

Outputs: &signedOutputsURLBlob,
Inputs: &inputsURLBlob,
}, nil
}
maxDataSize := m.config.ApplicationConfiguration().GetRemoteDataConfig().MaxSizeInBytes
if maxDataSize == 0 || inputsURLBlob.Bytes < maxDataSize {
var fullInputs core.LiteralMap
err := m.storageClient.ReadProtobuf(ctx, executionModel.InputsURI, &fullInputs)
if err != nil {
logger.Warningf(ctx, "Failed to read inputs from URI [%s] with err: %v", executionModel.InputsURI, err)
}
response.FullInputs = &fullInputs
}
if maxDataSize == 0 || (signedOutputsURLBlob.Bytes < maxDataSize && execution.Closure.GetOutputs() != nil) {
var fullOutputs core.LiteralMap
outputsURI := execution.Closure.GetOutputs().GetUri()
err := m.storageClient.ReadProtobuf(ctx, storage.DataReference(outputsURI), &fullOutputs)
if err != nil {
logger.Warningf(ctx, "Failed to read outputs from URI [%s] with err: %v", outputsURI, err)
}
response.FullOutputs = &fullOutputs
}

m.userMetrics.WorkflowExecutionInputBytes.Observe(float64(response.Inputs.Bytes))
m.userMetrics.WorkflowExecutionOutputBytes.Observe(float64(response.Outputs.Bytes))
return response, nil
}

func (m *ExecutionManager) ListExecutions(
Expand Down Expand Up @@ -1234,6 +1258,10 @@ func NewExecutionManager(
Scope: userScope,
ScheduledExecutionDelays: make(map[string]map[string]*promutils.StopWatch),
WorkflowExecutionDurations: make(map[string]map[string]*promutils.StopWatch),
WorkflowExecutionInputBytes: userScope.MustNewSummary("input_size_bytes",
"size in bytes of serialized execution inputs"),
WorkflowExecutionOutputBytes: userScope.MustNewSummary("output_size_bytes",
"size in bytes of serialized execution outputs"),
}

resourceManager := resources.NewResourceManager(db, config.ApplicationConfiguration())
Expand Down
36 changes: 34 additions & 2 deletions pkg/manager/impl/execution_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ func getMockStorageForExecTest(ctx context.Context) *storage.DataStore {
return nil
}
workflowClosure := testutils.GetWorkflowClosure()
if err := mockStorage.WriteProtobuf(ctx, storage.DataReference(remoteClosureIdentifier), defaultStorageOptions, workflowClosure); err != nil {
if err := mockStorage.WriteProtobuf(ctx, remoteClosureIdentifier, defaultStorageOptions, workflowClosure); err != nil {
return nil
}
return mockStorage
Expand Down Expand Up @@ -2013,10 +2013,34 @@ func TestGetExecutionData(t *testing.T) {

return admin.UrlBlob{}, errors.New("unexpected input")
}
mockStorage := commonMocks.GetMockStorageClient()
fullInputs := &core.LiteralMap{
Literals: map[string]*core.Literal{
"foo": testutils.MakeStringLiteral("foo-value-1"),
},
}
fullOutputs := &core.LiteralMap{
Literals: map[string]*core.Literal{
"bar": testutils.MakeStringLiteral("bar-value-1"),
},
}
mockStorage.ComposedProtobufStore.(*commonMocks.TestDataStore).ReadProtobufCb = func(
ctx context.Context, reference storage.DataReference, msg proto.Message) error {
if reference.String() == "inputs" {
marshalled, _ := proto.Marshal(fullInputs)
_ = proto.Unmarshal(marshalled, msg)
return nil
} else if reference.String() == outputURI {
marshalled, _ := proto.Marshal(fullOutputs)
_ = proto.Unmarshal(marshalled, msg)
return nil
}
return fmt.Errorf("unexpected call to find value in storage [%v]", reference.String())
}

repository.ExecutionRepo().(*repositoryMocks.MockExecutionRepo).SetGetCallback(executionGetFunc)
execManager := NewExecutionManager(
repository, getMockExecutionsConfigProvider(), getMockStorageForExecTest(context.Background()), workflowengineMocks.NewMockExecutor(),
repository, getMockExecutionsConfigProvider(), mockStorage, workflowengineMocks.NewMockExecutor(),
mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil)
dataResponse, err := execManager.GetExecutionData(context.Background(), admin.WorkflowExecutionGetDataRequest{
Id: &executionIdentifier,
Expand All @@ -2031,6 +2055,8 @@ func TestGetExecutionData(t *testing.T) {
Url: "inputs",
Bytes: 200,
},
FullInputs: fullInputs,
FullOutputs: fullOutputs,
}, dataResponse))
}

Expand Down Expand Up @@ -2215,6 +2241,12 @@ func TestGetExecutionData_LegacyModel(t *testing.T) {
Url: "inputs",
Bytes: 200,
},
FullInputs: &core.LiteralMap{
Literals: map[string]*core.Literal{
"foo": testutils.MakeStringLiteral("foo-value-1"),
},
},
FullOutputs: &core.LiteralMap{},
}, dataResponse))
var inputs core.LiteralMap
err = storageClient.ReadProtobuf(context.Background(), storage.DataReference("s3://bucket/metadata/project/domain/name/inputs"), &inputs)
Expand Down
56 changes: 46 additions & 10 deletions pkg/manager/impl/node_execution_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"context"
"strconv"

"github.com/lyft/flytestdlib/storage"

"github.com/lyft/flytestdlib/contextutils"

"github.com/lyft/flyteadmin/pkg/manager/impl/shared"
Expand All @@ -25,6 +27,7 @@ import (
repoInterfaces "github.com/lyft/flyteadmin/pkg/repositories/interfaces"
"github.com/lyft/flyteadmin/pkg/repositories/models"
"github.com/lyft/flyteadmin/pkg/repositories/transformers"
runtimeInterfaces "github.com/lyft/flyteadmin/pkg/runtime/interfaces"
"github.com/lyft/flyteidl/gen/pb-go/flyteidl/admin"
"github.com/lyft/flyteidl/gen/pb-go/flyteidl/core"
"google.golang.org/grpc/codes"
Expand All @@ -38,12 +41,16 @@ type nodeExecutionMetrics struct {
NodeExecutionEventsCreated prometheus.Counter
MissingWorkflowExecution prometheus.Counter
ClosureSizeBytes prometheus.Summary
NodeExecutionInputBytes prometheus.Summary
NodeExecutionOutputBytes prometheus.Summary
}

type NodeExecutionManager struct {
db repositories.RepositoryInterface
metrics nodeExecutionMetrics
urlData dataInterfaces.RemoteURLInterface
db repositories.RepositoryInterface
config runtimeInterfaces.Configuration
storageClient *storage.DataStore
metrics nodeExecutionMetrics
urlData dataInterfaces.RemoteURLInterface
}

type updateNodeExecutionStatus int
Expand Down Expand Up @@ -391,15 +398,38 @@ func (m *NodeExecutionManager) GetNodeExecutionData(
return nil, err
}
}
return &admin.NodeExecutionGetDataResponse{
response := &admin.NodeExecutionGetDataResponse{
Inputs: &signedInputsURLBlob,
Outputs: &signedOutputsURLBlob,
}, nil
}
maxDataSize := m.config.ApplicationConfiguration().GetRemoteDataConfig().MaxSizeInBytes
if maxDataSize == 0 || signedInputsURLBlob.Bytes < maxDataSize {
var fullInputs core.LiteralMap
err := m.storageClient.ReadProtobuf(ctx, storage.DataReference(nodeExecution.InputUri), &fullInputs)
if err != nil {
logger.Warningf(ctx, "Failed to read inputs from URI [%s] with err: %v", nodeExecution.InputUri, err)
}
response.FullInputs = &fullInputs
}
if maxDataSize == 0 || (signedOutputsURLBlob.Bytes < maxDataSize && len(nodeExecution.Closure.GetOutputUri()) > 0) {
var fullOutputs core.LiteralMap
err := m.storageClient.ReadProtobuf(ctx, storage.DataReference(nodeExecution.Closure.GetOutputUri()), &fullOutputs)
if err != nil {
logger.Warningf(ctx, "Failed to read outputs from URI [%s] with err: %v",
nodeExecution.Closure.GetOutputUri(), err)
}
response.FullOutputs = &fullOutputs
}

m.metrics.NodeExecutionInputBytes.Observe(float64(response.Inputs.Bytes))
m.metrics.NodeExecutionOutputBytes.Observe(float64(response.Outputs.Bytes))

return response, nil
}

func NewNodeExecutionManager(
db repositories.RepositoryInterface, scope promutils.Scope,
urlData dataInterfaces.RemoteURLInterface) interfaces.NodeExecutionInterface {
db repositories.RepositoryInterface, config runtimeInterfaces.Configuration, storageClient *storage.DataStore,
scope promutils.Scope, urlData dataInterfaces.RemoteURLInterface) interfaces.NodeExecutionInterface {
metrics := nodeExecutionMetrics{
Scope: scope,
ActiveNodeExecutions: scope.MustNewGauge("active_node_executions",
Expand All @@ -414,10 +444,16 @@ func NewNodeExecutionManager(
"overall count of node execution events received that are missing a parent workflow execution"),
ClosureSizeBytes: scope.MustNewSummary("closure_size_bytes",
"size in bytes of serialized node execution closure"),
NodeExecutionInputBytes: scope.MustNewSummary("input_size_bytes",
"size in bytes of serialized node execution inputs"),
NodeExecutionOutputBytes: scope.MustNewSummary("output_size_bytes",
"size in bytes of serialized node execution outputs"),
}
return &NodeExecutionManager{
db: db,
metrics: metrics,
urlData: urlData,
db: db,
config: config,
storageClient: storageClient,
metrics: metrics,
urlData: urlData,
}
}
Loading