Skip to content

Commit

Permalink
Update create container if not exists logic (flyteorg#77)
Browse files Browse the repository at this point in the history
  • Loading branch information
Katrina Rogan authored Mar 16, 2021
1 parent d411f70 commit a07acb2
Show file tree
Hide file tree
Showing 2 changed files with 124 additions and 23 deletions.
56 changes: 41 additions & 15 deletions storage/stow_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,15 @@ var fQNFn = map[string]func(string) DataReference{
},
}

// Checks if the error is AWS S3 bucket not found error
func awsBucketIsNotFound(err error) bool {
if awsErr, errOk := errs.Cause(err).(awserr.Error); errOk {
return awsErr.Code() == s32.ErrCodeNoSuchBucket
}

return false
}

// Checks if the error is AWS S3 bucket already exists error.
func awsBucketAlreadyExists(err error) bool {
if IsExists(err) {
Expand Down Expand Up @@ -110,22 +119,30 @@ type StowStore struct {
baseContainerFQN DataReference
}

func (s *StowStore) LoadContainer(ctx context.Context, container string, createIfNotFound bool) (stow.Container, error) {
// TODO: As of stow v0.2.6 elides the container lookup when a bucket region is set,
// so we always just attempt to create it when createIfNotFound is true.

if createIfNotFound {
logger.Infof(ctx, "Attempting to create container [%s]", container)
_, err := s.loc.CreateContainer(container)
if err != nil && !awsBucketAlreadyExists(err) && !IsExists(err) {
return nil, fmt.Errorf("unable to initialize container [%v]. Error: %v", container, err)
}
func (s *StowStore) CreateContainer(ctx context.Context, container string) (stow.Container, error) {
logger.Infof(ctx, "Attempting to create container [%s]", container)
c, err := s.loc.CreateContainer(container)
if err != nil && !awsBucketAlreadyExists(err) && !IsExists(err) {
return nil, fmt.Errorf("unable to initialize container [%v]. Error: %v", container, err)
}
return c, nil
}

func (s *StowStore) LoadContainer(ctx context.Context, container string, createIfNotFound bool) (stow.Container, error) {
c, err := s.loc.Container(container)
if err != nil {
logger.Errorf(ctx, "Container [%s] lookup failed. Error %s", container, err)
return nil, err
// IsNotFound is not always guaranteed to be returned if the underlying container doesn't exist!
// As of stow v0.2.6, the call to get container elides the lookup when a bucket region is set for S3 containers.
if IsNotFound(err) && createIfNotFound {
c, err = s.CreateContainer(ctx, container)
if err != nil {
logger.Errorf(ctx, "Call to create container [%s] failed. Error %s", container, err)
return nil, err
}
} else {
logger.Errorf(ctx, "Container [%s] lookup failed. Error %s", container, err)
return nil, err
}
}
return c, nil
}
Expand Down Expand Up @@ -180,7 +197,7 @@ func (s *StowStore) Head(ctx context.Context, reference DataReference) (Metadata
}
}

if IsNotFound(err) {
if IsNotFound(err) || awsBucketIsNotFound(err) {
return StowMetadata{exists: false}, nil
}

Expand Down Expand Up @@ -235,8 +252,17 @@ func (s *StowStore) WriteRaw(ctx context.Context, reference DataReference, size
t := s.metrics.WriteLatency.Start(ctx)
_, err = container.Put(k, raw, size, opts.Metadata)
if err != nil {
incFailureCounterForError(ctx, s.metrics.WriteFailure, err)
return errs.Wrapf(err, "Failed to write data [%vb] to path [%v].", size, k)
// If this error is due to the bucket not existing, first attempt to create it and retry the getContainer call.
if IsNotFound(err) || awsBucketIsNotFound(err) {
container, err = s.CreateContainer(ctx, c)
if err == nil {
s.dynamicContainerMap.Store(container, c)
}
}
if err != nil {
incFailureCounterForError(ctx, s.metrics.WriteFailure, err)
return errs.Wrapf(err, "Failed to write data [%vb] to path [%v].", size, k)
}
}

t.Stop()
Expand Down
91 changes: 83 additions & 8 deletions storage/stow_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package storage
import (
"bytes"
"context"
errors2 "errors"
"fmt"
"io"
"io/ioutil"
Expand All @@ -12,6 +13,9 @@ import (
"testing"
"time"

"github.com/aws/aws-sdk-go/aws/awserr"
s32 "github.com/aws/aws-sdk-go/service/s3"

"github.com/graymeta/stow/google"
"github.com/graymeta/stow/local"
"github.com/pkg/errors"
Expand Down Expand Up @@ -43,6 +47,7 @@ func (m mockStowLoc) CreateContainer(name string) (stow.Container, error) {
type mockStowContainer struct {
id string
items map[string]mockStowItem
putCB func(name string, r io.Reader, size int64, metadata map[string]interface{}) (stow.Item, error)
}

func (m mockStowContainer) ID() string {
Expand Down Expand Up @@ -70,6 +75,9 @@ func (mockStowContainer) RemoveItem(id string) error {
}

func (m *mockStowContainer) Put(name string, r io.Reader, size int64, metadata map[string]interface{}) (stow.Item, error) {
if m.putCB != nil {
return m.putCB(name, r, size, metadata)
}
item := mockStowItem{url: name, size: size}
m.items[name] = item
return item, nil
Expand Down Expand Up @@ -124,6 +132,17 @@ func (mockStowItem) Metadata() (map[string]interface{}, error) {
return map[string]interface{}{}, nil
}

func TestAwsBucketIsNotFound(t *testing.T) {
t.Run("detect is not found", func(t *testing.T) {
err := awserr.New(s32.ErrCodeNoSuchBucket, "foo", errors2.New("foo"))
assert.True(t, awsBucketIsNotFound(err))
})
t.Run("do not detect random errors", func(t *testing.T) {
err := awserr.New(s32.ErrCodeInvalidObjectState, "foo", errors2.New("foo"))
assert.False(t, awsBucketIsNotFound(err))
})
}

func TestStowStore_ReadRaw(t *testing.T) {
labeled.SetMetricKeys(contextutils.ProjectKey, contextutils.DomainKey, contextutils.WorkflowIDKey, contextutils.TaskIDKey)

Expand Down Expand Up @@ -410,7 +429,7 @@ func TestLoadContainer(t *testing.T) {
loc: &mockStowLoc{
ContainerCb: func(id string) (stow.Container, error) {
if id == container {
return newMockStowContainer(container), nil
return newMockStowContainer(container), stow.ErrNotFound
}
return nil, fmt.Errorf("container is not supported")
},
Expand All @@ -430,10 +449,7 @@ func TestLoadContainer(t *testing.T) {
stowStore := StowStore{
loc: &mockStowLoc{
ContainerCb: func(id string) (stow.Container, error) {
if id == container {
return newMockStowContainer(container), nil
}
return nil, fmt.Errorf("container is not supported")
return nil, stow.ErrNotFound
},
CreateContainerCb: func(name string) (stow.Container, error) {
if name == container {
Expand All @@ -451,14 +467,73 @@ func TestLoadContainer(t *testing.T) {
loc: &mockStowLoc{
ContainerCb: func(id string) (stow.Container, error) {
if id == container {
return newMockStowContainer(container), nil
return newMockStowContainer(container), stow.ErrNotFound
}
return nil, fmt.Errorf("container is not supported")
},
},
}
stowContainer, err := stowStore.LoadContainer(context.Background(), "container", false)
_, err := stowStore.LoadContainer(context.Background(), "container", false)
assert.EqualError(t, err, stow.ErrNotFound.Error())
})
}

func TestStowStore_WriteRaw(t *testing.T) {
labeled.SetMetricKeys(contextutils.ProjectKey, contextutils.DomainKey, contextutils.WorkflowIDKey, contextutils.TaskIDKey)
const container = "container"
fn := fQNFn["s3"]
t.Run("create container when not found", func(t *testing.T) {
testScope := promutils.NewTestScope()
var createCalled bool
s, err := NewStowRawStore(fn(container), &mockStowLoc{
ContainerCb: func(id string) (stow.Container, error) {
if id == container {
mockStowContainer := newMockStowContainer(container)
mockStowContainer.putCB = func(name string, r io.Reader, size int64, metadata map[string]interface{}) (stow.Item, error) {
return nil, awserr.New(s32.ErrCodeNoSuchBucket, "foo", errors2.New("foo"))
}
return mockStowContainer, nil
}
return nil, fmt.Errorf("container is not supported")
},
CreateContainerCb: func(name string) (stow.Container, error) {
createCalled = true
if name == container {
return newMockStowContainer(container), nil
}
return nil, fmt.Errorf("container is not supported")
},
}, true, testScope)
assert.NoError(t, err)
assert.Equal(t, container, stowContainer.ID())
err = s.WriteRaw(context.TODO(), DataReference("s3://container/path"), 0, Options{}, bytes.NewReader([]byte{}))
assert.NoError(t, err)
assert.True(t, createCalled)
var containerStoredInDynamicContainerMap bool
s.dynamicContainerMap.Range(func(key, value interface{}) bool {
if value == container {
containerStoredInDynamicContainerMap = true
return true
}
return false
})
assert.True(t, containerStoredInDynamicContainerMap)
})
t.Run("bubble up generic put errors", func(t *testing.T) {
testScope := promutils.NewTestScope()
s, err := NewStowRawStore(fn(container), &mockStowLoc{
ContainerCb: func(id string) (stow.Container, error) {
if id == container {
mockStowContainer := newMockStowContainer(container)
mockStowContainer.putCB = func(name string, r io.Reader, size int64, metadata map[string]interface{}) (stow.Item, error) {
return nil, errors2.New("foo")
}
return mockStowContainer, nil
}
return nil, fmt.Errorf("container is not supported")
},
}, true, testScope)
assert.NoError(t, err)
err = s.WriteRaw(context.TODO(), DataReference("s3://container/path"), 0, Options{}, bytes.NewReader([]byte{}))
assert.EqualError(t, err, "Failed to write data [0b] to path [path].: foo")
})
}

0 comments on commit a07acb2

Please sign in to comment.