// Source file: errors/error.go (package errors).
//
// Utilities to create and consume simple errors that carry an error code and,
// optionally, the error that caused them.

// ErrorCode is a generic error code type.
type ErrorCode = string

// err is the basic coded error: an error code plus a formatted message.
type err struct {
	code    ErrorCode
	message string
}

// Error renders the error as "[<code>] <message>".
func (e *err) Error() string {
	return fmt.Sprintf("[%v] %v", e.code, e.message)
}

// Code returns the error code attached to the error.
func (e *err) Code() ErrorCode {
	return e.code
}

// errorWithCause is a coded error that also records the error that caused it.
type errorWithCause struct {
	*err
	cause error
}

// Error renders the coded error followed by ", caused by: <root cause>".
//
// Only the root (deepest) cause of the chain is printed, not every
// intermediate wrapper. When no cause was recorded the suffix is omitted, so
// the message never ends in "caused by: <nil>".
func (e *errorWithCause) Error() string {
	root := rootCause(e.cause)
	if root == nil {
		return e.err.Error()
	}

	return fmt.Sprintf("%v, caused by: %v", e.err.Error(), root)
}

// Cause returns the immediate cause recorded by Wrapf; it may be nil.
func (e *errorWithCause) Cause() error {
	return e.cause
}

// Unwrap returns the immediate cause. It is the method the standard library's
// errors.Is, errors.As and errors.Unwrap look for, so wrapped errors can be
// inspected with them as well as through Cause.
func (e *errorWithCause) Unwrap() error {
	return e.cause
}

// rootCause follows Cause() links starting at e and returns the deepest
// non-nil error in the chain. It returns nil only when e itself is nil.
func rootCause(e error) error {
	type causer interface {
		Cause() error
	}

	for e != nil {
		c, ok := e.(causer)
		if !ok {
			break
		}

		next := c.Cause()
		if next == nil {
			// e exposes Cause() but has nothing underneath: e is the root.
			break
		}

		e = next
	}

	return e
}

// Errorf creates a new error from an error code and a message. The message is
// built from msgFmt and args using fmt.Sprintf rules.
func Errorf(errorCode ErrorCode, msgFmt string, args ...interface{}) error {
	return &err{
		code:    errorCode,
		message: fmt.Sprintf(msgFmt, args...),
	}
}

// Wrapf wraps a root cause error with another, coded error. This is useful to
// unify an error type in a package. The message is built from msgFmt and args
// using fmt.Sprintf rules; cause is stored as given and may be nil.
func Wrapf(code ErrorCode, cause error, msgFmt string, args ...interface{}) error {
	return &errorWithCause{
		err: &err{
			code:    code,
			message: fmt.Sprintf(msgFmt, args...),
		},
		cause: cause,
	}
}

// GetErrorCode gets the error code of the passed error if it has one. Only e
// itself is inspected (not its causes); found is false when e is nil or does
// not expose a Code() method.
func GetErrorCode(e error) (code ErrorCode, found bool) {
	type coder interface {
		Code() ErrorCode
	}

	if er, ok := e.(coder); ok {
		return er.Code(), true
	}

	return "", false
}

// Gets whether error is caused by another error with errCode.
+func IsCausedBy(e error, errCode ErrorCode) bool { + type causer interface { + Cause() error + } + + for e != nil { + if code, found := GetErrorCode(e); found && code == errCode { + return true + } + + cause, ok := e.(causer) + if !ok { + break + } + + e = cause.Cause() + } + + return false +} + +func IsCausedByError(e, e2 error) bool { + type causer interface { + Cause() error + } + + for e != nil { + if e == e2 { + return true + } + + cause, ok := e.(causer) + if !ok { + break + } + + e = cause.Cause() + } + + return false +} diff --git a/errors/error_test.go b/errors/error_test.go new file mode 100644 index 0000000000..b3e3dd0a51 --- /dev/null +++ b/errors/error_test.go @@ -0,0 +1,47 @@ +package errors + +import ( + "fmt" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestErrorf(t *testing.T) { + e := Errorf("Code1", "msg") + assert.NotNil(t, e) + assert.Equal(t, "[Code1] msg", e.Error()) +} + +func TestWrapf(t *testing.T) { + e := Wrapf("Code1", fmt.Errorf("test error"), "msg") + assert.NotNil(t, e) + assert.Equal(t, "[Code1] msg, caused by: test error", e.Error()) +} + +func TestGetErrorCode(t *testing.T) { + e := Errorf("Code1", "msg") + assert.NotNil(t, e) + code, found := GetErrorCode(e) + assert.True(t, found) + assert.Equal(t, "Code1", code) +} + +func TestIsCausedBy(t *testing.T) { + e := Errorf("Code1", "msg") + assert.NotNil(t, e) + + e = Wrapf("Code2", e, "msg") + assert.True(t, IsCausedBy(e, "Code1")) + assert.True(t, IsCausedBy(e, "Code2")) +} + +func TestIsCausedByError(t *testing.T) { + eRoot := Errorf("Code1", "msg") + assert.NotNil(t, eRoot) + e1 := Wrapf("Code2", eRoot, "msg") + assert.True(t, IsCausedByError(e1, eRoot)) + e2 := Wrapf("Code3", e1, "msg") + assert.True(t, IsCausedByError(e2, eRoot)) + assert.True(t, IsCausedByError(e2, e1)) +} diff --git a/storage/cached_rawstore.go b/storage/cached_rawstore.go index 2b539d7bb1..e9d83df6cd 100644 --- a/storage/cached_rawstore.go +++ b/storage/cached_rawstore.go @@ -7,6 +7,8 @@ 
import ( "runtime/debug" "time" + "github.com/lyft/flytestdlib/errors" + "github.com/prometheus/client_golang/prometheus" "github.com/lyft/flytestdlib/promutils" @@ -75,11 +77,11 @@ func (s *cachedRawStore) ReadRaw(ctx context.Context, reference DataReference) ( err = s.cache.Set(key, b, 0) if err != nil { - // TODO Ignore errors in writing to cache? logger.Debugf(ctx, "Failed to Cache the metadata") + err = errors.Wrapf(ErrFailedToWriteCache, err, "Failed to Cache the metadata") } - return ioutils.NewBytesReadCloser(b), nil + return ioutils.NewBytesReadCloser(b), err } // Stores a raw byte array. @@ -94,9 +96,9 @@ func (s *cachedRawStore) WriteRaw(ctx context.Context, reference DataReference, err = s.cache.Set([]byte(reference), buf.Bytes(), neverExpire) if err != nil { s.metrics.CacheWriteError.Inc() + err = errors.Wrapf(ErrFailedToWriteCache, err, "Failed to Cache the metadata") } - // TODO ignore errors? return err } diff --git a/storage/cached_rawstore_test.go b/storage/cached_rawstore_test.go index c5225aa790..6949119627 100644 --- a/storage/cached_rawstore_test.go +++ b/storage/cached_rawstore_test.go @@ -6,6 +6,7 @@ import ( "fmt" "io" "io/ioutil" + "math/rand" "runtime/debug" "testing" @@ -92,8 +93,12 @@ func TestCachedRawStore(t *testing.T) { ctx := context.TODO() k1 := DataReference("k1") k2 := DataReference("k2") + bigK := DataReference("bigK") d1 := []byte("abc") d2 := []byte("xyz") + bigD := make([]byte, 1.5*1024*1024) + // #nosec G404 + rand.Read(bigD) writeCalled := false readCalled := false store := &dummyStore{ @@ -113,6 +118,11 @@ func TestCachedRawStore(t *testing.T) { assert.NoError(t, err) assert.Equal(t, d2, b) return nil + } else if reference == "bigK" { + b, err := ioutil.ReadAll(raw) + assert.NoError(t, err) + assert.Equal(t, bigD, b) + return nil } return fmt.Errorf("err") }, @@ -123,6 +133,8 @@ func TestCachedRawStore(t *testing.T) { readCalled = true if reference == "k1" { return ioutils.NewBytesReadCloser(d1), nil + } else if reference 
== "bigK" { + return ioutils.NewBytesReadCloser(bigD), nil } return nil, fmt.Errorf("err") }, @@ -182,4 +194,18 @@ func TestCachedRawStore(t *testing.T) { assert.False(t, readCalled) }) + t.Run("WriteAndReadBigData", func(t *testing.T) { + writeCalled = false + readCalled = false + err := cStore.WriteRaw(ctx, bigK, int64(len(bigD)), Options{}, bytes.NewReader(bigD)) + assert.True(t, writeCalled) + assert.True(t, IsFailedWriteToCache(err)) + + o, err := cStore.ReadRaw(ctx, bigK) + assert.True(t, IsFailedWriteToCache(err)) + b, err := ioutil.ReadAll(o) + assert.NoError(t, err) + assert.Equal(t, bigD, b) + assert.True(t, readCalled) + }) } diff --git a/storage/protobuf_store.go b/storage/protobuf_store.go index ba11d3311c..6b09a0e6eb 100644 --- a/storage/protobuf_store.go +++ b/storage/protobuf_store.go @@ -17,11 +17,13 @@ import ( ) type protoMetrics struct { - FetchLatency promutils.StopWatch - MarshalTime promutils.StopWatch - UnmarshalTime promutils.StopWatch - MarshalFailure prometheus.Counter - UnmarshalFailure prometheus.Counter + FetchLatency promutils.StopWatch + MarshalTime promutils.StopWatch + UnmarshalTime promutils.StopWatch + MarshalFailure prometheus.Counter + UnmarshalFailure prometheus.Counter + WriteFailureUnrelatedToCache prometheus.Counter + ReadFailureUnrelatedToCache prometheus.Counter } // Implements ProtobufStore to marshal and unmarshal protobufs to/from a RawStore @@ -32,7 +34,9 @@ type DefaultProtobufStore struct { func (s DefaultProtobufStore) ReadProtobuf(ctx context.Context, reference DataReference, msg proto.Message) error { rc, err := s.ReadRaw(ctx, reference) - if err != nil { + if err != nil && !IsFailedWriteToCache(err) { + logger.Errorf(ctx, "Failed to read from the raw store. 
Error: %v", err) + s.metrics.ReadFailureUnrelatedToCache.Inc() return errs.Wrap(err, fmt.Sprintf("path:%v", reference)) } @@ -68,18 +72,26 @@ func (s DefaultProtobufStore) WriteProtobuf(ctx context.Context, reference DataR return err } - return s.WriteRaw(ctx, reference, int64(len(raw)), opts, bytes.NewReader(raw)) + err = s.WriteRaw(ctx, reference, int64(len(raw)), opts, bytes.NewReader(raw)) + if err != nil && !IsFailedWriteToCache(err) { + logger.Errorf(ctx, "Failed to write to the raw store. Error: %v", err) + s.metrics.WriteFailureUnrelatedToCache.Inc() + return err + } + return nil } func NewDefaultProtobufStore(store RawStore, metricsScope promutils.Scope) DefaultProtobufStore { return DefaultProtobufStore{ RawStore: store, metrics: &protoMetrics{ - FetchLatency: metricsScope.MustNewStopWatch("proto_fetch", "Time to read data before unmarshalling", time.Millisecond), - MarshalTime: metricsScope.MustNewStopWatch("marshal", "Time incurred in marshalling data before writing", time.Millisecond), - UnmarshalTime: metricsScope.MustNewStopWatch("unmarshal", "Time incurred in unmarshalling received data", time.Millisecond), - MarshalFailure: metricsScope.MustNewCounter("marshal_failure", "Failures when marshalling"), - UnmarshalFailure: metricsScope.MustNewCounter("unmarshal_failure", "Failures when unmarshalling"), + FetchLatency: metricsScope.MustNewStopWatch("proto_fetch", "Time to read data before unmarshalling", time.Millisecond), + MarshalTime: metricsScope.MustNewStopWatch("marshal", "Time incurred in marshalling data before writing", time.Millisecond), + UnmarshalTime: metricsScope.MustNewStopWatch("unmarshal", "Time incurred in unmarshalling received data", time.Millisecond), + MarshalFailure: metricsScope.MustNewCounter("marshal_failure", "Failures when marshalling"), + UnmarshalFailure: metricsScope.MustNewCounter("unmarshal_failure", "Failures when unmarshalling"), + WriteFailureUnrelatedToCache: 
metricsScope.MustNewCounter("write_failure_unrelated_to_cache", "Raw store write failures that are not caused by ErrFailedToWriteCache"), + ReadFailureUnrelatedToCache: metricsScope.MustNewCounter("read_failure_unrelated_to_cache", "Raw store read failures that are not caused by ErrFailedToWriteCache"), }, } } diff --git a/storage/protobuf_store_test.go b/storage/protobuf_store_test.go index 160239bb73..44ac0c4084 100644 --- a/storage/protobuf_store_test.go +++ b/storage/protobuf_store_test.go @@ -2,11 +2,15 @@ package storage import ( "context" + "fmt" + "io" + "math/rand" "testing" "github.com/lyft/flytestdlib/promutils" "github.com/golang/protobuf/proto" + errs "github.com/pkg/errors" "github.com/stretchr/testify/assert" ) @@ -14,6 +18,10 @@ type mockProtoMessage struct { X int64 `protobuf:"varint,2,opt,name=x,json=x,proto3" json:"x,omitempty"` } +type mockBigDataProtoMessage struct { + X []byte `protobuf:"bytes,1,opt,name=X,proto3" json:"X,omitempty"` +} + func (mockProtoMessage) Reset() { } @@ -24,6 +32,16 @@ func (m mockProtoMessage) String() string { func (mockProtoMessage) ProtoMessage() { } +func (mockBigDataProtoMessage) Reset() { +} + +func (m mockBigDataProtoMessage) String() string { + return proto.CompactTextString(m) +} + +func (mockBigDataProtoMessage) ProtoMessage() { +} + func TestDefaultProtobufStore_ReadProtobuf(t *testing.T) { t.Run("Read after Write", func(t *testing.T) { testScope := promutils.NewTestScope() @@ -39,3 +57,67 @@ func TestDefaultProtobufStore_ReadProtobuf(t *testing.T) { assert.Equal(t, int64(5), m.X) }) } + +func TestDefaultProtobufStore_BigDataReadAfterWrite(t *testing.T) { + t.Run("Read after Write with Big Data", func(t *testing.T) { + testScope := promutils.NewTestScope() + + s, err := NewDataStore( + &Config{ + Type: TypeMemory, + Cache: CachingConfig{ + MaxSizeMegabytes: 1, + TargetGCPercent: 20, + }, + }, testScope) + assert.NoError(t, err) + + bigD := make([]byte, 1.5*1024*1024) + // #nosec G404 + rand.Read(bigD) + + 
mockMessage := &mockBigDataProtoMessage{X: bigD} + + err = s.WriteProtobuf(context.TODO(), DataReference("bigK"), Options{}, mockMessage) + assert.NoError(t, err) + + m := &mockBigDataProtoMessage{} + err = s.ReadProtobuf(context.TODO(), DataReference("bigK"), m) + assert.NoError(t, err) + assert.Equal(t, bigD, m.X) + + }) +} + +func TestDefaultProtobufStore_HardErrors(t *testing.T) { + ctx := context.TODO() + k1 := DataReference("k1") + dummyHeadErrorMsg := "Dummy head error" + dummyWriteErrorMsg := "Dummy write error" + dummyReadErrorMsg := "Dummy read error" + store := &dummyStore{ + HeadCb: func(ctx context.Context, reference DataReference) (Metadata, error) { + return MemoryMetadata{}, fmt.Errorf(dummyHeadErrorMsg) + }, + WriteRawCb: func(ctx context.Context, reference DataReference, size int64, opts Options, raw io.Reader) error { + return fmt.Errorf(dummyWriteErrorMsg) + }, + ReadRawCb: func(ctx context.Context, reference DataReference) (io.ReadCloser, error) { + return nil, fmt.Errorf(dummyReadErrorMsg) + }, + } + testScope := promutils.NewTestScope() + pbErroneousStore := NewDefaultProtobufStore(store, testScope) + t.Run("Test if hard write errors are handled correctly", func(t *testing.T) { + err := pbErroneousStore.WriteProtobuf(ctx, k1, Options{}, &mockProtoMessage{X: 5}) + assert.False(t, IsFailedWriteToCache(err)) + assert.Equal(t, dummyWriteErrorMsg, errs.Cause(err).Error()) + }) + + t.Run("Test if hard read errors are handled correctly", func(t *testing.T) { + m := &mockProtoMessage{} + err := pbErroneousStore.ReadProtobuf(ctx, k1, m) + assert.False(t, IsFailedWriteToCache(err)) + assert.Equal(t, dummyReadErrorMsg, errs.Cause(err).Error()) + }) +} diff --git a/storage/stow_store.go b/storage/stow_store.go index b13170bf09..e156f376da 100644 --- a/storage/stow_store.go +++ b/storage/stow_store.go @@ -5,6 +5,8 @@ import ( "io" "time" + "github.com/lyft/flytestdlib/errors" + "github.com/prometheus/client_golang/prometheus" 
"github.com/lyft/flytestdlib/promutils" @@ -116,7 +118,7 @@ func (s *StowStore) ReadRaw(ctx context.Context, reference DataReference) (io.Re } if sizeBytes/MiB > GetConfig().Limits.GetLimitMegabytes { - return nil, ErrExceedsLimit + return nil, errors.Wrapf(ErrExceedsLimit, err, "limit exceeded") } return item.Open() diff --git a/storage/utils.go b/storage/utils.go index 62fb2aa22c..9b790c6120 100644 --- a/storage/utils.go +++ b/storage/utils.go @@ -1,18 +1,26 @@ package storage import ( - "fmt" "os" + errors2 "github.com/lyft/flytestdlib/errors" + "github.com/graymeta/stow" "github.com/pkg/errors" ) -var ErrExceedsLimit = fmt.Errorf("limit exceeded") +var ( + ErrExceedsLimit errors2.ErrorCode = "LIMIT_EXCEEDED" + ErrFailedToWriteCache errors2.ErrorCode = "CACHE_WRITE_FAILED" +) // Gets a value indicating whether the underlying error is a Not Found error. func IsNotFound(err error) bool { - if root := errors.Cause(err); root == stow.ErrNotFound || os.IsNotExist(root) { + if root := errors.Cause(err); os.IsNotExist(root) { + return true + } + + if errors2.IsCausedByError(err, stow.ErrNotFound) { return true } @@ -30,5 +38,9 @@ func IsExists(err error) bool { // Gets a value indicating whether the root cause of error is a "limit exceeded" error. 
func IsExceedsLimit(err error) bool { - return errors.Cause(err) == ErrExceedsLimit + return errors2.IsCausedBy(err, ErrExceedsLimit) +} + +func IsFailedWriteToCache(err error) bool { + return errors2.IsCausedBy(err, ErrFailedToWriteCache) } diff --git a/storage/utils_test.go b/storage/utils_test.go new file mode 100644 index 0000000000..8618310f4a --- /dev/null +++ b/storage/utils_test.go @@ -0,0 +1,49 @@ +package storage + +import ( + "os" + "syscall" + "testing" + + "github.com/graymeta/stow" + flyteerrors "github.com/lyft/flytestdlib/errors" + "github.com/pkg/errors" + "github.com/stretchr/testify/assert" +) + +func TestIsNotFound(t *testing.T) { + sysError := &os.PathError{Err: syscall.ENOENT} + assert.True(t, IsNotFound(sysError)) + flyteError := errors.Wrap(sysError, "Wrapping \"system not found\" error") + assert.True(t, IsNotFound(flyteError)) + secondLevelError := errors.Wrap(flyteError, "Higher level error") + assert.True(t, IsNotFound(secondLevelError)) + + // more for stow errors + stowNotFoundError := stow.ErrNotFound + assert.True(t, IsNotFound(stowNotFoundError)) + flyteError = errors.Wrap(stowNotFoundError, "Wrapping stow.ErrNotFound") + assert.True(t, IsNotFound(flyteError)) + secondLevelError = errors.Wrap(flyteError, "Higher level error wrapper of the stow.ErrNotFound error") + assert.True(t, IsNotFound(secondLevelError)) +} + +func TestIsExceedsLimit(t *testing.T) { + sysError := &os.PathError{Err: syscall.ENOENT} + exceedsLimitError := flyteerrors.Wrapf(ErrExceedsLimit, sysError, "An error wrapped in ErrExceedsLimits") + failedToWriteCacheError := flyteerrors.Wrapf(ErrFailedToWriteCache, sysError, "An error wrapped in ErrFailedToWriteCache") + + assert.True(t, IsExceedsLimit(exceedsLimitError)) + assert.False(t, IsExceedsLimit(failedToWriteCacheError)) + assert.False(t, IsExceedsLimit(sysError)) +} + +func TestIsFailedWriteToCache(t *testing.T) { + sysError := &os.PathError{Err: syscall.ENOENT} + exceedsLimitError := 
flyteerrors.Wrapf(ErrExceedsLimit, sysError, "An error wrapped in ErrExceedsLimits") + failedToWriteCacheError := flyteerrors.Wrapf(ErrFailedToWriteCache, sysError, "An error wrapped in ErrFailedToWriteCache") + + assert.False(t, IsFailedWriteToCache(exceedsLimitError)) + assert.True(t, IsFailedWriteToCache(failedToWriteCacheError)) + assert.False(t, IsFailedWriteToCache(sysError)) +}