Skip to content

Commit

Permalink
Don't bubble up error on cache write failure (#249)
Browse files Browse the repository at this point in the history
## Overview
* Don't bubble up the error on a cache write failure. There is not much a client can do with this error, and it will most probably ignore it. The best we can do is log a warning and increment an error counter, so that errors of this kind can be tracked and the cache memory size config adjusted accordingly.
* Bubble up the error in the `GetInputs/OutputsData` methods instead of silently ignoring it. This avoids confusing/misleading 200 success responses in which the actual data is missing.

## Test Plan
* unit tests

## Rollout Plan (if applicable)
* Redeploy all dataplanes after pulling the latest version into Union Cloud

## Upstream Changes
Should this change be upstreamed to OSS (flyteorg/flyte)? If so, please check this box for auditing. Note, this is the responsibility of each developer. See [this guide](https://unionai.atlassian.net/wiki/spaces/ENG/pages/447610883/Flyte+-+Union+Cloud+Development+Runbook/#When-are-versions-updated%3F).
- [ ] To be upstreamed

## Jira Issue
https://unionai.atlassian.net/browse/CASE-643

## Checklist
* [x] Added tests
* [ ] Ran a deploy dry run and shared the terraform plan
* [x] Added logging and metrics
* [ ] Updated [dashboards](https://unionai.grafana.net/dashboards) and [alerts](https://unionai.grafana.net/alerting/list)
* [ ] Updated documentation
  • Loading branch information
iaroslav-ciupin authored May 3, 2024
1 parent aaa1f28 commit 936ee48
Show file tree
Hide file tree
Showing 5 changed files with 72 additions and 38 deletions.
28 changes: 9 additions & 19 deletions flyteadmin/pkg/manager/impl/execution_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4059,29 +4059,19 @@ func TestGetExecutionData_LegacyModel(t *testing.T) {
r := plugins.NewRegistry()
r.RegisterDefault(plugins.PluginIDWorkflowExecutor, &defaultTestExecutor)
execManager := NewExecutionManager(repository, r, getMockExecutionsConfigProvider(), storageClient, mockScope.NewTestScope(), mockScope.NewTestScope(), &mockPublisher, mockExecutionRemoteURL, nil, nil, nil, nil, &eventWriterMocks.WorkflowExecutionEventWriter{}, artifacts.NewArtifactRegistry(context.Background(), nil))

dataResponse, err := execManager.GetExecutionData(context.Background(), admin.WorkflowExecutionGetDataRequest{
Id: &executionIdentifier,
})
assert.Nil(t, err)
assert.True(t, proto.Equal(&admin.WorkflowExecutionGetDataResponse{
Outputs: &admin.UrlBlob{
Url: "outputs",
Bytes: 200,
},
Inputs: &admin.UrlBlob{
Url: "inputs",
Bytes: 200,
},
FullInputs: &core.LiteralMap{
Literals: map[string]*core.Literal{
"foo": testutils.MakeStringLiteral("foo-value-1"),
},
},
FullOutputs: &core.LiteralMap{},
}, dataResponse))

assert.EqualError(t, err, "could not find value in storage [s3://bucket/output_uri]")
assert.Empty(t, dataResponse)

var inputs core.LiteralMap
err = storageClient.ReadProtobuf(context.Background(), storage.DataReference("s3://bucket/metadata/project/domain/name/inputs"), &inputs)
assert.Nil(t, err)

err = storageClient.ReadProtobuf(context.Background(), "s3://bucket/metadata/project/domain/name/inputs", &inputs)

assert.NoError(t, err)
assert.True(t, proto.Equal(&inputs, closure.ComputedInputs))
}

Expand Down
17 changes: 3 additions & 14 deletions flyteadmin/pkg/manager/impl/util/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,7 @@ import (
)

const (
OutputsFile = "outputs.pb"
DeckFile = "deck.html"
DeckFile = "deck.html"
)

type GetObjectRequest struct {
Expand Down Expand Up @@ -75,13 +74,8 @@ func GetInputs(ctx context.Context,
} else {
err = readFromDataPlane(ctx, objectStore, cluster, project, domain, inputURI, &fullInputs)
}
if err != nil {
// If we fail to read the protobuf from the remote store, we shouldn't fail the request altogether.
// Instead we return the signed URL blob so that the client can use that to fetch the input data.
logger.Warningf(ctx, "Failed to read inputs from URI [%s] with err: %v", inputURI, err)
}
}
return &fullInputs, &inputsURLBlob, nil
return &fullInputs, &inputsURLBlob, err
}

// ExecutionClosure defines common methods in NodeExecutionClosure and TaskExecutionClosure used to return output data.
Expand Down Expand Up @@ -155,14 +149,9 @@ func GetOutputs(ctx context.Context,
} else {
err = readFromDataPlane(ctx, objectStore, cluster, project, domain, closure.GetOutputUri(), fullOutputs)
}
if err != nil {
// If we fail to read the protobuf from the remote store, we shouldn't fail the request altogether.
// Instead we return the signed URL blob so that the client can use that to fetch the output data.
logger.Warningf(ctx, "Failed to read outputs from URI [%s] with err: %v", closure.GetOutputUri(), err)
}
}

return fullOutputs, &outputsURLBlob, nil
return fullOutputs, &outputsURLBlob, err
}

func IsLocalURI(ctx context.Context, store *storage.DataStore, uri string) bool {
Expand Down
53 changes: 53 additions & 0 deletions flyteadmin/pkg/manager/impl/util/data_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package util

import (
"context"
"errors"
"testing"

"github.com/golang/protobuf/proto"
Expand Down Expand Up @@ -193,6 +194,31 @@ func TestGetInputs(t *testing.T) {
assert.Empty(t, inputURLBlob)
store.AssertExpectations(t)
})

t.Run("should fail if store fails", func(t *testing.T) {
remoteDataConfig.SignedURL = interfaces.SignedURL{Enabled: false}
store := &objectStoreMock{}
expectedErr := errors.New("call failed")
store.
On("GetObject", GetObjectRequest{
Cluster: clusterName,
Project: project,
Domain: domain,
Prefix: "/foo/bar",
}).
Return(GetObjectResponse{}, expectedErr).
Once()

ctx := context.TODO()
inputURI := "s3://wrong/foo/bar"

fullInputs, inputURLBlob, err := GetInputs(ctx, mockRemoteURL, &remoteDataConfig, mockStorage, clusterName, project, domain, inputURI, store)

assert.EqualError(t, err, "failed to fetch object: call failed")
assert.Empty(t, fullInputs)
assert.Empty(t, inputURLBlob)
store.AssertExpectations(t)
})
}

func TestGetOutputs(t *testing.T) {
Expand Down Expand Up @@ -270,6 +296,33 @@ func TestGetOutputs(t *testing.T) {
store.AssertExpectations(t)
})

t.Run("should fail if store fails", func(t *testing.T) {
remoteDataConfig.SignedURL = interfaces.SignedURL{Enabled: false}
store := &objectStoreMock{}
expectedErr := errors.New("call failed")
store.
On("GetObject", GetObjectRequest{
Cluster: clusterName,
Project: project,
Domain: domain,
Prefix: "/foo/bar",
}).
Return(GetObjectResponse{}, expectedErr).
Once()
testClosure := &admin.NodeExecutionClosure{
OutputResult: &admin.NodeExecutionClosure_OutputUri{
OutputUri: "s3://wrong/foo/bar",
},
}

fullOutputs, outputURLBlob, err := GetOutputs(context.TODO(), mockRemoteURL, &remoteDataConfig, mockStorage, testClosure, clusterName, project, domain, store)

assert.EqualError(t, err, "failed to fetch object: call failed")
assert.Empty(t, fullOutputs)
assert.Empty(t, outputURLBlob)
store.AssertExpectations(t)
})

t.Run("inline outputs", func(t *testing.T) {
mockRemoteURL := urlMocks.NewMockRemoteURL()
mockRemoteURL.(*urlMocks.MockRemoteURL).GetCallback = func(ctx context.Context, uri string) (admin.UrlBlob, error) {
Expand Down
8 changes: 5 additions & 3 deletions flytestdlib/storage/cached_rawstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,11 +81,12 @@ func (s *cachedRawStore) ReadRaw(ctx context.Context, reference DataReference) (

err = s.cache.Set(key, b, 0)
if err != nil {
logger.Debugf(ctx, "Failed to Cache the metadata")
s.metrics.CacheWriteError.Inc()
err = errors.Wrapf(ErrFailedToWriteCache, err, "Failed to Cache the metadata")
logger.Warn(ctx, err.Error())
}

return ioutils.NewBytesReadCloser(b), err
return ioutils.NewBytesReadCloser(b), nil
}

// WriteRaw stores a raw byte array.
Expand All @@ -104,9 +105,10 @@ func (s *cachedRawStore) WriteRaw(ctx context.Context, reference DataReference,
if err != nil {
s.metrics.CacheWriteError.Inc()
err = errors.Wrapf(ErrFailedToWriteCache, err, "Failed to Cache the metadata")
logger.Warn(ctx, err.Error())
}

return err
return nil
}

// Delete removes the referenced data from the cache as well as underlying store.
Expand Down
4 changes: 2 additions & 2 deletions flytestdlib/storage/cached_rawstore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,10 +201,10 @@ func TestCachedRawStore(t *testing.T) {
readCalled = false
err := cStore.WriteRaw(ctx, bigK, int64(len(bigD)), Options{}, bytes.NewReader(bigD))
assert.True(t, writeCalled)
assert.True(t, IsFailedWriteToCache(err))
assert.NoError(t, err)

o, err := cStore.ReadRaw(ctx, bigK)
assert.True(t, IsFailedWriteToCache(err))
assert.NoError(t, err)
b, err := ioutil.ReadAll(o)
assert.NoError(t, err)
assert.Equal(t, bigD, b)
Expand Down

0 comments on commit 936ee48

Please sign in to comment.