From 936ee48dd801192bde37152375598ef708988803 Mon Sep 17 00:00:00 2001 From: Iaroslav Ciupin Date: Fri, 3 May 2024 22:14:16 +0300 Subject: [PATCH] Dont bubble error on write cache failure (#249) ## Overview * Dont bubble up error on write cache failure. There is not much a client can do with this error and will most probably ignore it. We can only log some warning and increment error counter to track this kind of errors and adjust memory size config for cache. * Bubble up error in `GetInputs/OutputsData` method instead of silently ignoring it. This will avoid confusing/misleading 200 success responses with actual data missing. ## Test Plan * unit tests ## Rollout Plan (if applicable) * redeploy all dataplanes after pulling latest version into union cloud ## Upstream Changes Should this change be upstreamed to OSS (flyteorg/flyte)? If so, please check this box for auditing. Note, this is the responsibility of each developer. See [this guide](https://unionai.atlassian.net/wiki/spaces/ENG/pages/447610883/Flyte+-+Union+Cloud+Development+Runbook/#When-are-versions-updated%3F). - [ ] To be upstreamed ## Jira Issue https://unionai.atlassian.net/browse/CASE-643 ## Checklist * [x] Added tests * [ ] Ran a deploy dry run and shared the terraform plan * [x] Added logging and metrics * [ ] Updated [dashboards](https://unionai.grafana.net/dashboards) and [alerts](https://unionai.grafana.net/alerting/list) * [ ] Updated documentation --- .../manager/impl/execution_manager_test.go | 28 ++++------ flyteadmin/pkg/manager/impl/util/data.go | 17 ++---- flyteadmin/pkg/manager/impl/util/data_test.go | 53 +++++++++++++++++++ flytestdlib/storage/cached_rawstore.go | 8 +-- flytestdlib/storage/cached_rawstore_test.go | 4 +- 5 files changed, 72 insertions(+), 38 deletions(-) diff --git a/flyteadmin/pkg/manager/impl/execution_manager_test.go b/flyteadmin/pkg/manager/impl/execution_manager_test.go index a7caa95f336..110f6d9d089 100644 --- a/flyteadmin/pkg/manager/impl/execution_manager_test.go +++ b/flyteadmin/pkg/manager/impl/execution_manager_test.go @@ -4059,29 +4059,19 @@ func TestGetExecutionData_LegacyModel(t *testing.T) { r := plugins.NewRegistry() r.RegisterDefault(plugins.PluginIDWorkflowExecutor, &defaultTestExecutor) execManager := NewExecutionManager(repository, r, getMockExecutionsConfigProvider(), storageClient, mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil, nil, nil, &eventWriterMocks.WorkflowExecutionEventWriter{}, artifacts.NewArtifactRegistry(context.Background(), nil)) + dataResponse, err := execManager.GetExecutionData(context.Background(), admin.WorkflowExecutionGetDataRequest{ Id: &executionIdentifier, }) - assert.Nil(t, err) - assert.True(t, proto.Equal(&admin.WorkflowExecutionGetDataResponse{ - Outputs: &admin.UrlBlob{ - Url: "outputs", - Bytes: 200, - }, - Inputs: &admin.UrlBlob{ - Url: "inputs", - Bytes: 200, - }, - FullInputs: &core.LiteralMap{ - Literals: map[string]*core.Literal{ - "foo": testutils.MakeStringLiteral("foo-value-1"), - }, - }, - FullOutputs: &core.LiteralMap{}, - }, dataResponse)) + + assert.EqualError(t, err, "could not find value in storage [s3://bucket/output_uri]") + assert.Empty(t, dataResponse) + var inputs core.LiteralMap - err = storageClient.ReadProtobuf(context.Background(), storage.DataReference("s3://bucket/metadata/project/domain/name/inputs"), &inputs) - assert.Nil(t, err) + + err = storageClient.ReadProtobuf(context.Background(), "s3://bucket/metadata/project/domain/name/inputs", &inputs) + + assert.NoError(t, err) assert.True(t, proto.Equal(&inputs, closure.ComputedInputs)) } diff --git a/flyteadmin/pkg/manager/impl/util/data.go b/flyteadmin/pkg/manager/impl/util/data.go index 080cc9fe712..828b4c6ccf1 100644 --- a/flyteadmin/pkg/manager/impl/util/data.go +++ b/flyteadmin/pkg/manager/impl/util/data.go @@ -18,8 +18,7 @@ import ( ) const ( - OutputsFile = "outputs.pb" - DeckFile = "deck.html" + DeckFile = "deck.html" ) type GetObjectRequest struct { @@ -75,13 +74,8 @@ func GetInputs(ctx context.Context, } else { err = readFromDataPlane(ctx, objectStore, cluster, project, domain, inputURI, &fullInputs) } - if err != nil { - // If we fail to read the protobuf from the remote store, we shouldn't fail the request altogether. - // Instead we return the signed URL blob so that the client can use that to fetch the input data. - logger.Warningf(ctx, "Failed to read inputs from URI [%s] with err: %v", inputURI, err) - } } - return &fullInputs, &inputsURLBlob, nil + return &fullInputs, &inputsURLBlob, err } // ExecutionClosure defines common methods in NodeExecutionClosure and TaskExecutionClosure used to return output data. @@ -155,14 +149,9 @@ func GetOutputs(ctx context.Context, } else { err = readFromDataPlane(ctx, objectStore, cluster, project, domain, closure.GetOutputUri(), fullOutputs) } - if err != nil { - // If we fail to read the protobuf from the remote store, we shouldn't fail the request altogether. - // Instead we return the signed URL blob so that the client can use that to fetch the output data. - logger.Warningf(ctx, "Failed to read outputs from URI [%s] with err: %v", closure.GetOutputUri(), err) - } } - return fullOutputs, &outputsURLBlob, nil + return fullOutputs, &outputsURLBlob, err } func IsLocalURI(ctx context.Context, store *storage.DataStore, uri string) bool { diff --git a/flyteadmin/pkg/manager/impl/util/data_test.go b/flyteadmin/pkg/manager/impl/util/data_test.go index 31a9a9aaaff..524c148e383 100644 --- a/flyteadmin/pkg/manager/impl/util/data_test.go +++ b/flyteadmin/pkg/manager/impl/util/data_test.go @@ -2,6 +2,7 @@ package util import ( "context" + "errors" "testing" "github.com/golang/protobuf/proto" @@ -193,6 +194,31 @@ func TestGetInputs(t *testing.T) { assert.Empty(t, inputURLBlob) store.AssertExpectations(t) }) + + t.Run("should fail if store fails", func(t *testing.T) { + remoteDataConfig.SignedURL = interfaces.SignedURL{Enabled: false} + store := &objectStoreMock{} + expectedErr := errors.New("call failed") + store. + On("GetObject", GetObjectRequest{ + Cluster: clusterName, + Project: project, + Domain: domain, + Prefix: "/foo/bar", + }). + Return(GetObjectResponse{}, expectedErr). + Once() + + ctx := context.TODO() + inputURI := "s3://wrong/foo/bar" + + fullInputs, inputURLBlob, err := GetInputs(ctx, mockRemoteURL, &remoteDataConfig, mockStorage, clusterName, project, domain, inputURI, store) + + assert.EqualError(t, err, "failed to fetch object: call failed") + assert.Empty(t, fullInputs) + assert.Empty(t, inputURLBlob) + store.AssertExpectations(t) + }) } func TestGetOutputs(t *testing.T) { @@ -270,6 +296,33 @@ func TestGetOutputs(t *testing.T) { store.AssertExpectations(t) }) + t.Run("should fail if store fails", func(t *testing.T) { + remoteDataConfig.SignedURL = interfaces.SignedURL{Enabled: false} + store := &objectStoreMock{} + expectedErr := errors.New("call failed") + store. + On("GetObject", GetObjectRequest{ + Cluster: clusterName, + Project: project, + Domain: domain, + Prefix: "/foo/bar", + }). + Return(GetObjectResponse{}, expectedErr). + Once() + testClosure := &admin.NodeExecutionClosure{ + OutputResult: &admin.NodeExecutionClosure_OutputUri{ + OutputUri: "s3://wrong/foo/bar", + }, + } + + fullOutputs, outputURLBlob, err := GetOutputs(context.TODO(), mockRemoteURL, &remoteDataConfig, mockStorage, testClosure, clusterName, project, domain, store) + + assert.EqualError(t, err, "failed to fetch object: call failed") + assert.Empty(t, fullOutputs) + assert.Empty(t, outputURLBlob) + store.AssertExpectations(t) + }) + t.Run("inline outputs", func(t *testing.T) { mockRemoteURL := urlMocks.NewMockRemoteURL() mockRemoteURL.(*urlMocks.MockRemoteURL).GetCallback = func(ctx context.Context, uri string) (admin.UrlBlob, error) { diff --git a/flytestdlib/storage/cached_rawstore.go b/flytestdlib/storage/cached_rawstore.go index 913a517a0f3..0418a1e6812 100644 --- a/flytestdlib/storage/cached_rawstore.go +++ b/flytestdlib/storage/cached_rawstore.go @@ -81,11 +81,12 @@ func (s *cachedRawStore) ReadRaw(ctx context.Context, reference DataReference) ( err = s.cache.Set(key, b, 0) if err != nil { - logger.Debugf(ctx, "Failed to Cache the metadata") + s.metrics.CacheWriteError.Inc() err = errors.Wrapf(ErrFailedToWriteCache, err, "Failed to Cache the metadata") + logger.Warn(ctx, err.Error()) } - return ioutils.NewBytesReadCloser(b), err + return ioutils.NewBytesReadCloser(b), nil } // WriteRaw stores a raw byte array. @@ -104,9 +105,10 @@ func (s *cachedRawStore) WriteRaw(ctx context.Context, reference DataReference, if err != nil { s.metrics.CacheWriteError.Inc() err = errors.Wrapf(ErrFailedToWriteCache, err, "Failed to Cache the metadata") + logger.Warn(ctx, err.Error()) } - return err + return nil } // Delete removes the referenced data from the cache as well as underlying store. diff --git a/flytestdlib/storage/cached_rawstore_test.go b/flytestdlib/storage/cached_rawstore_test.go index b9751d7fa1a..54307ff5546 100644 --- a/flytestdlib/storage/cached_rawstore_test.go +++ b/flytestdlib/storage/cached_rawstore_test.go @@ -201,10 +201,10 @@ func TestCachedRawStore(t *testing.T) { readCalled = false err := cStore.WriteRaw(ctx, bigK, int64(len(bigD)), Options{}, bytes.NewReader(bigD)) assert.True(t, writeCalled) - assert.True(t, IsFailedWriteToCache(err)) + assert.NoError(t, err) o, err := cStore.ReadRaw(ctx, bigK) - assert.True(t, IsFailedWriteToCache(err)) + assert.NoError(t, err) b, err := ioutil.ReadAll(o) assert.NoError(t, err) assert.Equal(t, bigD, b)