Skip to content

Commit

Permalink
Return errors from cached raw store (#28)
Browse files Browse the repository at this point in the history
* Return errors from cached raw store

* gracefully ignore cache write errors in ReadProtobuf and WriteProtobuf

* fix fixture

* Wrapf the use of ErrExceedsLimit

* nosec math/rand

* add test with caching failures for protobuf store

* log and add metrics to record failures not caused by ErrFailedToWriteCache
  • Loading branch information
EngHabu authored and bnsblue committed Jul 26, 2019
1 parent 0d50ff5 commit c0e1a93
Show file tree
Hide file tree
Showing 9 changed files with 365 additions and 20 deletions.
113 changes: 113 additions & 0 deletions errors/error.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
// Contains utilities to use to create and consume simple errors.
package errors

import (
"fmt"

"github.com/pkg/errors"
)

// A generic error code type.
type ErrorCode = string

type err struct {
code ErrorCode
message string
}

func (e *err) Error() string {
return fmt.Sprintf("[%v] %v", e.code, e.message)
}

func (e *err) Code() ErrorCode {
return e.code
}

type errorWithCause struct {
*err
cause error
}

func (e *errorWithCause) Error() string {
return fmt.Sprintf("%v, caused by: %v", e.err.Error(), errors.Cause(e))
}

func (e *errorWithCause) Cause() error {
return e.cause
}

// Creates a new error using an error code and a message.
func Errorf(errorCode ErrorCode, msgFmt string, args ...interface{}) error {
return &err{
code: errorCode,
message: fmt.Sprintf(msgFmt, args...),
}
}

// Wraps a root cause error with another. This is useful to unify an error type in a package.
func Wrapf(code ErrorCode, cause error, msgFmt string, args ...interface{}) error {
return &errorWithCause{
err: &err{
code: code,
message: fmt.Sprintf(msgFmt, args...),
},
cause: cause,
}
}

// Gets the error code of the passed error if it has one.
func GetErrorCode(e error) (code ErrorCode, found bool) {
type coder interface {
Code() ErrorCode
}

er, ok := e.(coder)
if ok {
return er.Code(), true
}

return
}

// Gets whether error is caused by another error with errCode.
func IsCausedBy(e error, errCode ErrorCode) bool {
type causer interface {
Cause() error
}

for e != nil {
if code, found := GetErrorCode(e); found && code == errCode {
return true
}

cause, ok := e.(causer)
if !ok {
break
}

e = cause.Cause()
}

return false
}

func IsCausedByError(e, e2 error) bool {
type causer interface {
Cause() error
}

for e != nil {
if e == e2 {
return true
}

cause, ok := e.(causer)
if !ok {
break
}

e = cause.Cause()
}

return false
}
47 changes: 47 additions & 0 deletions errors/error_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package errors

import (
"fmt"
"testing"

"github.com/stretchr/testify/assert"
)

func TestErrorf(t *testing.T) {
e := Errorf("Code1", "msg")
assert.NotNil(t, e)
assert.Equal(t, "[Code1] msg", e.Error())
}

func TestWrapf(t *testing.T) {
e := Wrapf("Code1", fmt.Errorf("test error"), "msg")
assert.NotNil(t, e)
assert.Equal(t, "[Code1] msg, caused by: test error", e.Error())
}

func TestGetErrorCode(t *testing.T) {
e := Errorf("Code1", "msg")
assert.NotNil(t, e)
code, found := GetErrorCode(e)
assert.True(t, found)
assert.Equal(t, "Code1", code)
}

func TestIsCausedBy(t *testing.T) {
e := Errorf("Code1", "msg")
assert.NotNil(t, e)

e = Wrapf("Code2", e, "msg")
assert.True(t, IsCausedBy(e, "Code1"))
assert.True(t, IsCausedBy(e, "Code2"))
}

func TestIsCausedByError(t *testing.T) {
eRoot := Errorf("Code1", "msg")
assert.NotNil(t, eRoot)
e1 := Wrapf("Code2", eRoot, "msg")
assert.True(t, IsCausedByError(e1, eRoot))
e2 := Wrapf("Code3", e1, "msg")
assert.True(t, IsCausedByError(e2, eRoot))
assert.True(t, IsCausedByError(e2, e1))
}
8 changes: 5 additions & 3 deletions storage/cached_rawstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
"runtime/debug"
"time"

"github.com/lyft/flytestdlib/errors"

"github.com/prometheus/client_golang/prometheus"

"github.com/lyft/flytestdlib/promutils"
Expand Down Expand Up @@ -75,11 +77,11 @@ func (s *cachedRawStore) ReadRaw(ctx context.Context, reference DataReference) (

err = s.cache.Set(key, b, 0)
if err != nil {
// TODO Ignore errors in writing to cache?
logger.Debugf(ctx, "Failed to Cache the metadata")
err = errors.Wrapf(ErrFailedToWriteCache, err, "Failed to Cache the metadata")
}

return ioutils.NewBytesReadCloser(b), nil
return ioutils.NewBytesReadCloser(b), err
}

// Stores a raw byte array.
Expand All @@ -94,9 +96,9 @@ func (s *cachedRawStore) WriteRaw(ctx context.Context, reference DataReference,
err = s.cache.Set([]byte(reference), buf.Bytes(), neverExpire)
if err != nil {
s.metrics.CacheWriteError.Inc()
err = errors.Wrapf(ErrFailedToWriteCache, err, "Failed to Cache the metadata")
}

// TODO ignore errors?
return err
}

Expand Down
26 changes: 26 additions & 0 deletions storage/cached_rawstore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"io"
"io/ioutil"
"math/rand"
"runtime/debug"
"testing"

Expand Down Expand Up @@ -92,8 +93,12 @@ func TestCachedRawStore(t *testing.T) {
ctx := context.TODO()
k1 := DataReference("k1")
k2 := DataReference("k2")
bigK := DataReference("bigK")
d1 := []byte("abc")
d2 := []byte("xyz")
bigD := make([]byte, 1.5*1024*1024)
// #nosec G404
rand.Read(bigD)
writeCalled := false
readCalled := false
store := &dummyStore{
Expand All @@ -113,6 +118,11 @@ func TestCachedRawStore(t *testing.T) {
assert.NoError(t, err)
assert.Equal(t, d2, b)
return nil
} else if reference == "bigK" {
b, err := ioutil.ReadAll(raw)
assert.NoError(t, err)
assert.Equal(t, bigD, b)
return nil
}
return fmt.Errorf("err")
},
Expand All @@ -123,6 +133,8 @@ func TestCachedRawStore(t *testing.T) {
readCalled = true
if reference == "k1" {
return ioutils.NewBytesReadCloser(d1), nil
} else if reference == "bigK" {
return ioutils.NewBytesReadCloser(bigD), nil
}
return nil, fmt.Errorf("err")
},
Expand Down Expand Up @@ -182,4 +194,18 @@ func TestCachedRawStore(t *testing.T) {
assert.False(t, readCalled)
})

t.Run("WriteAndReadBigData", func(t *testing.T) {
writeCalled = false
readCalled = false
err := cStore.WriteRaw(ctx, bigK, int64(len(bigD)), Options{}, bytes.NewReader(bigD))
assert.True(t, writeCalled)
assert.True(t, IsFailedWriteToCache(err))

o, err := cStore.ReadRaw(ctx, bigK)
assert.True(t, IsFailedWriteToCache(err))
b, err := ioutil.ReadAll(o)
assert.NoError(t, err)
assert.Equal(t, bigD, b)
assert.True(t, readCalled)
})
}
36 changes: 24 additions & 12 deletions storage/protobuf_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,13 @@ import (
)

type protoMetrics struct {
FetchLatency promutils.StopWatch
MarshalTime promutils.StopWatch
UnmarshalTime promutils.StopWatch
MarshalFailure prometheus.Counter
UnmarshalFailure prometheus.Counter
FetchLatency promutils.StopWatch
MarshalTime promutils.StopWatch
UnmarshalTime promutils.StopWatch
MarshalFailure prometheus.Counter
UnmarshalFailure prometheus.Counter
WriteFailureUnrelatedToCache prometheus.Counter
ReadFailureUnrelatedToCache prometheus.Counter
}

// Implements ProtobufStore to marshal and unmarshal protobufs to/from a RawStore
Expand All @@ -32,7 +34,9 @@ type DefaultProtobufStore struct {

func (s DefaultProtobufStore) ReadProtobuf(ctx context.Context, reference DataReference, msg proto.Message) error {
rc, err := s.ReadRaw(ctx, reference)
if err != nil {
if err != nil && !IsFailedWriteToCache(err) {
logger.Errorf(ctx, "Failed to read from the raw store. Error: %v", err)
s.metrics.ReadFailureUnrelatedToCache.Inc()
return errs.Wrap(err, fmt.Sprintf("path:%v", reference))
}

Expand Down Expand Up @@ -68,18 +72,26 @@ func (s DefaultProtobufStore) WriteProtobuf(ctx context.Context, reference DataR
return err
}

return s.WriteRaw(ctx, reference, int64(len(raw)), opts, bytes.NewReader(raw))
err = s.WriteRaw(ctx, reference, int64(len(raw)), opts, bytes.NewReader(raw))
if err != nil && !IsFailedWriteToCache(err) {
logger.Errorf(ctx, "Failed to write to the raw store. Error: %v", err)
s.metrics.WriteFailureUnrelatedToCache.Inc()
return err
}
return nil
}

func NewDefaultProtobufStore(store RawStore, metricsScope promutils.Scope) DefaultProtobufStore {
return DefaultProtobufStore{
RawStore: store,
metrics: &protoMetrics{
FetchLatency: metricsScope.MustNewStopWatch("proto_fetch", "Time to read data before unmarshalling", time.Millisecond),
MarshalTime: metricsScope.MustNewStopWatch("marshal", "Time incurred in marshalling data before writing", time.Millisecond),
UnmarshalTime: metricsScope.MustNewStopWatch("unmarshal", "Time incurred in unmarshalling received data", time.Millisecond),
MarshalFailure: metricsScope.MustNewCounter("marshal_failure", "Failures when marshalling"),
UnmarshalFailure: metricsScope.MustNewCounter("unmarshal_failure", "Failures when unmarshalling"),
FetchLatency: metricsScope.MustNewStopWatch("proto_fetch", "Time to read data before unmarshalling", time.Millisecond),
MarshalTime: metricsScope.MustNewStopWatch("marshal", "Time incurred in marshalling data before writing", time.Millisecond),
UnmarshalTime: metricsScope.MustNewStopWatch("unmarshal", "Time incurred in unmarshalling received data", time.Millisecond),
MarshalFailure: metricsScope.MustNewCounter("marshal_failure", "Failures when marshalling"),
UnmarshalFailure: metricsScope.MustNewCounter("unmarshal_failure", "Failures when unmarshalling"),
WriteFailureUnrelatedToCache: metricsScope.MustNewCounter("write_failure_unrelated_to_cache", "Raw store write failures that are not caused by ErrFailedToWriteCache"),
ReadFailureUnrelatedToCache: metricsScope.MustNewCounter("read_failure_unrelated_to_cache", "Raw store read failures that are not caused by ErrFailedToWriteCache"),
},
}
}
Loading

0 comments on commit c0e1a93

Please sign in to comment.