From c9d48377fbb6b20fa5650d2edae61182acc8f40c Mon Sep 17 00:00:00 2001 From: Andrew Dawson Date: Wed, 6 May 2020 00:59:40 -0700 Subject: [PATCH] Add iterators and writers for persistence and blobstore for scanner workflow (#3234) --- common/blobstore/filestore/client.go | 5 + common/blobstore/interface.go | 1 + common/blobstore/interface_mock.go | 15 ++ common/blobstore/retryableClient.go | 104 +++++++++++ common/collection/pagingIterator.go | 1 + common/pagination/interface.go | 73 ++++++++ common/pagination/iterator.go | 110 ++++++++++++ common/{util => pagination}/iterator_test.go | 75 +++++--- common/pagination/writer.go | 93 ++++++++++ common/pagination/writerIterator_test.go | 125 +++++++++++++ common/pagination/writer_test.go | 134 ++++++++++++++ common/util/bufferedWriter.go | 145 --------------- common/util/bufferedWriterIterator_test.go | 60 ------- common/util/bufferedWriter_test.go | 115 ------------ common/util/iterator.go | 165 ------------------ .../executions/common/blobstoreIterator.go | 102 +++++++++++ .../executions/common/blobstoreWriter.go | 127 ++++++++++++++ .../scanner/executions/common/interfaces.go | 19 ++ .../executions/common/persistenceIterator.go | 107 ++++++++++++ .../executions/common/persistenceRetryer.go | 140 +++++++++++++++ .../worker/scanner/executions/common/types.go | 9 + .../executions/common/writerIterator_test.go | 153 ++++++++++++++++ 22 files changed, 1366 insertions(+), 512 deletions(-) create mode 100644 common/blobstore/retryableClient.go create mode 100644 common/pagination/interface.go create mode 100644 common/pagination/iterator.go rename common/{util => pagination}/iterator_test.go (57%) create mode 100644 common/pagination/writer.go create mode 100644 common/pagination/writerIterator_test.go create mode 100644 common/pagination/writer_test.go delete mode 100644 common/util/bufferedWriter.go delete mode 100644 common/util/bufferedWriterIterator_test.go delete mode 100644 common/util/bufferedWriter_test.go delete 
mode 100644 common/util/iterator.go create mode 100644 service/worker/scanner/executions/common/blobstoreIterator.go create mode 100644 service/worker/scanner/executions/common/blobstoreWriter.go create mode 100644 service/worker/scanner/executions/common/persistenceIterator.go create mode 100644 service/worker/scanner/executions/common/persistenceRetryer.go create mode 100644 service/worker/scanner/executions/common/writerIterator_test.go diff --git a/common/blobstore/filestore/client.go b/common/blobstore/filestore/client.go index 8367dc7f7ea..f151a9c2447 100644 --- a/common/blobstore/filestore/client.go +++ b/common/blobstore/filestore/client.go @@ -109,6 +109,11 @@ func (c *client) Delete(_ context.Context, request *blobstore.DeleteRequest) (*b return &blobstore.DeleteResponse{}, nil } +// IsRetryableError returns true if the error is retryable false otherwise +func (c *client) IsRetryableError(err error) bool { + return false +} + func (c *client) deserializeBlob(data []byte) (blobstore.Blob, error) { var blob blobstore.Blob if err := json.Unmarshal(data, &blob); err != nil { diff --git a/common/blobstore/interface.go b/common/blobstore/interface.go index c860d8345e0..5d5e60a3787 100644 --- a/common/blobstore/interface.go +++ b/common/blobstore/interface.go @@ -31,6 +31,7 @@ type ( Get(context.Context, *GetRequest) (*GetResponse, error) Exists(context.Context, *ExistsRequest) (*ExistsResponse, error) Delete(context.Context, *DeleteRequest) (*DeleteResponse, error) + IsRetryableError(error) bool } // PutRequest is the request to Put diff --git a/common/blobstore/interface_mock.go b/common/blobstore/interface_mock.go index 711b89b53bb..ceba555f15a 100644 --- a/common/blobstore/interface_mock.go +++ b/common/blobstore/interface_mock.go @@ -126,3 +126,18 @@ func (_m *MockClient) Put(_a0 context.Context, _a1 *PutRequest) (*PutResponse, e return r0, r1 } + +func (_m *MockClient) IsRetryableError(_a0 error) bool { + ret := _m.Called(_a0) + + var r0 bool + if rf, ok 
:= ret.Get(0).(func(error) bool); ok { + r0 = rf(_a0) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(bool) + } + } + + return r0 +} diff --git a/common/blobstore/retryableClient.go b/common/blobstore/retryableClient.go new file mode 100644 index 00000000000..b4a1d2556d2 --- /dev/null +++ b/common/blobstore/retryableClient.go @@ -0,0 +1,104 @@ +// The MIT License (MIT) +// +// Copyright (c) 2020 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +// SOFTWARE. + +package blobstore + +import ( + "context" + + "github.com/uber/cadence/common/backoff" +) + +type ( + retryableClient struct { + client Client + policy backoff.RetryPolicy + } +) + +// NewRetryableClient constructs a blobstore client which retries transient errors. 
+func NewRetryableClient(client Client, policy backoff.RetryPolicy) Client { + return &retryableClient{ + client: client, + policy: policy, + } +} + +func (c *retryableClient) Put(ctx context.Context, req *PutRequest) (*PutResponse, error) { + var resp *PutResponse + var err error + op := func() error { + resp, err = c.client.Put(ctx, req) + return err + } + err = backoff.Retry(op, c.policy, c.client.IsRetryableError) + if err != nil { + return nil, err + } + return resp, nil +} + +func (c *retryableClient) Get(ctx context.Context, req *GetRequest) (*GetResponse, error) { + var resp *GetResponse + var err error + op := func() error { + resp, err = c.client.Get(ctx, req) + return err + } + err = backoff.Retry(op, c.policy, c.client.IsRetryableError) + if err != nil { + return nil, err + } + return resp, nil +} + +func (c *retryableClient) Exists(ctx context.Context, req *ExistsRequest) (*ExistsResponse, error) { + var resp *ExistsResponse + var err error + op := func() error { + resp, err = c.client.Exists(ctx, req) + return err + } + err = backoff.Retry(op, c.policy, c.client.IsRetryableError) + if err != nil { + return nil, err + } + return resp, nil +} + +func (c *retryableClient) Delete(ctx context.Context, req *DeleteRequest) (*DeleteResponse, error) { + var resp *DeleteResponse + var err error + op := func() error { + resp, err = c.client.Delete(ctx, req) + return err + } + err = backoff.Retry(op, c.policy, c.client.IsRetryableError) + if err != nil { + return nil, err + } + return resp, nil +} + +func (c *retryableClient) IsRetryableError(err error) bool { + return c.client.IsRetryableError(err) +} diff --git a/common/collection/pagingIterator.go b/common/collection/pagingIterator.go index 51db5750702..52fa2b6968e 100644 --- a/common/collection/pagingIterator.go +++ b/common/collection/pagingIterator.go @@ -35,6 +35,7 @@ type ( ) // NewPagingIterator create a new paging iterator +// TODO: this implementation should be removed in favor of 
pagination/iterator.go func NewPagingIterator(paginationFn PaginationFn) Iterator { iter := &PagingIteratorImpl{ paginationFn: paginationFn, diff --git a/common/pagination/interface.go b/common/pagination/interface.go new file mode 100644 index 00000000000..cb9626512d5 --- /dev/null +++ b/common/pagination/interface.go @@ -0,0 +1,73 @@ +// The MIT License (MIT) +// +// Copyright (c) 2020 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +// SOFTWARE. + +package pagination + +import "errors" + +// ErrIteratorFinished indicates that Next was called on a finished iterator +var ErrIteratorFinished = errors.New("iterator has reached end") + +type ( + // Page contains a PageToken which identifies the current page, + // a PageToken which identifies the next page and a list of Entity. 
+ Page struct { + NextToken PageToken + CurrentToken PageToken + Entities []Entity + } + // Entity is a generic type which can be operated on by Iterator and Writer + Entity interface{} + // PageToken identifies a page + PageToken interface{} +) + +type ( + // WriteFn writes given Page to underlying sink. + // The Page's NextToken will always be nil; it is the responsibility of WriteFn to + // construct and return the next PageToken, or return an error on failure. + WriteFn func(Page) (PageToken, error) + // ShouldFlushFn returns true if given page should be flushed, false otherwise. + ShouldFlushFn func(Page) bool + // FetchFn fetches Page from PageToken. + // Once a page with nil NextToken is returned no more pages will be fetched. + FetchFn func(PageToken) (Page, error) +) + +type ( + // Iterator is used to get entities from a collection of pages. + // When HasNext returns true it is guaranteed that Next will not return an error. + // Once iterator returns an error it will never make progress again and will always return that same error. + // Iterator is not thread safe and does not make defensive in or out copies. + Iterator interface { + Next() (Entity, error) + HasNext() bool + } + // Writer is used to buffer and write entities to underlying store. + Writer interface { + Add(Entity) error + Flush() error + FlushedPages() []PageToken + FirstFlushedPage() PageToken + LastFlushedPage() PageToken + } +) diff --git a/common/pagination/iterator.go b/common/pagination/iterator.go new file mode 100644 index 00000000000..908c84c4adf --- /dev/null +++ b/common/pagination/iterator.go @@ -0,0 +1,110 @@ +// The MIT License (MIT) +// +// Copyright (c) 2020 Uber Technologies, Inc. 
+// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +// SOFTWARE. + +package pagination + +type ( + iterator struct { + page Page + entityIndex int + + nextEntity Entity + nextError error + + fetchFn FetchFn + } +) + +// NewIterator constructs a new Iterator +func NewIterator( + startingPageToken PageToken, + fetchFn FetchFn, +) Iterator { + itr := &iterator{ + page: Page{ + Entities: nil, + CurrentToken: nil, + NextToken: startingPageToken, + }, + entityIndex: 0, + fetchFn: fetchFn, + } + itr.advance(true) + return itr +} + +// Next returns the next Entity or error. +// Returning nil, nil is valid if that is what the provided fetch function provided. +func (i *iterator) Next() (Entity, error) { + entity := i.nextEntity + err := i.nextError + i.advance(false) + return entity, err +} + +// HasNext returns true if there is a next element. 
There is considered to be a next element +// as long as a fatal error has not occurred and the iterator has not reached the end. +func (i *iterator) HasNext() bool { + return i.nextError == nil +} + +func (i *iterator) advance(firstPage bool) { + if !i.HasNext() && !firstPage { + return + } + if i.entityIndex < len(i.page.Entities) { + i.consume() + } else { + if err := i.advanceToNonEmptyPage(firstPage); err != nil { + i.terminate(err) + } else { + i.consume() + } + } +} + +func (i *iterator) advanceToNonEmptyPage(firstPage bool) error { + if i.page.NextToken == nil && !firstPage { + return ErrIteratorFinished + } + nextPage, err := i.fetchFn(i.page.NextToken) + if err != nil { + return err + } + i.page = nextPage + if len(i.page.Entities) != 0 { + i.entityIndex = 0 + return nil + } + return i.advanceToNonEmptyPage(false) +} + +func (i *iterator) consume() { + i.nextEntity = i.page.Entities[i.entityIndex] + i.nextError = nil + i.entityIndex++ +} + +func (i *iterator) terminate(err error) { + i.nextEntity = nil + i.nextError = err +} diff --git a/common/util/iterator_test.go b/common/pagination/iterator_test.go similarity index 57% rename from common/util/iterator_test.go rename to common/pagination/iterator_test.go index 0e44e9c3f34..064fa73861c 100644 --- a/common/util/iterator_test.go +++ b/common/pagination/iterator_test.go @@ -20,7 +20,7 @@ // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE // SOFTWARE. 
-package util +package pagination import ( "errors" @@ -30,13 +30,12 @@ import ( "github.com/stretchr/testify/suite" ) -var getMap = map[int][]byte{ +var fetchMap = map[PageToken][]Entity{ 0: nil, 1: {}, - 2: []byte("\r\n\r\n\r\n"), - 3: []byte("\"one\"\r\n\"two\"\r\n"), - 4: []byte("\"three\"\r\n\"four\"\r\n\r\n\"five\"\r\n"), - 5: []byte("\r\n\"six\"\r\n\"seven\"\r\n\"eight\"\r\n"), + 2: {"one", "two", "three"}, + 3: {"four", "five", "six", "seven"}, + 4: {"eight"}, } type IteratorSuite struct { @@ -53,52 +52,74 @@ func (s *IteratorSuite) SetupTest() { } func (s *IteratorSuite) TestInitializedToEmpty() { - getFn := func(page int) ([]byte, error) { - return getMap[page], nil + fetchFn := func(token PageToken) (Page, error) { + if token.(int) == 2 { + return Page{ + CurrentToken: token, + NextToken: nil, + Entities: nil, + }, nil + } + return Page{ + CurrentToken: token, + NextToken: token.(int) + 1, + Entities: fetchMap[token], + }, nil } - itr := NewIterator(0, 2, getFn, []byte("\r\n")) + itr := NewIterator(0, fetchFn) s.False(itr.HasNext()) _, err := itr.Next() - s.Error(err) + s.Equal(ErrIteratorFinished, err) } func (s *IteratorSuite) TestNonEmptyNoErrors() { - getFn := func(page int) ([]byte, error) { - return getMap[page], nil + fetchFn := func(token PageToken) (Page, error) { + var nextPageToken interface{} = token.(int) + 1 + if nextPageToken.(int) == 5 { + nextPageToken = nil + } + return Page{ + CurrentToken: token, + NextToken: nextPageToken, + Entities: fetchMap[token], + }, nil } - itr := NewIterator(0, 5, getFn, []byte("\r\n")) - expectedResults := []string{"\"one\"", "\"two\"", "\"three\"", "\"four\"", "\"five\"", "\"six\"", "\"seven\"", "\"eight\""} + itr := NewIterator(0, fetchFn) + expectedResults := []string{"one", "two", "three", "four", "five", "six", "seven", "eight"} i := 0 for itr.HasNext() { curr, err := itr.Next() s.NoError(err) - expectedCurr := []byte(expectedResults[i]) - s.Equal(expectedCurr, curr) + s.Equal(expectedResults[i], 
curr.(string)) i++ } s.False(itr.HasNext()) _, err := itr.Next() - s.Error(err) + s.Equal(ErrIteratorFinished, err) } func (s *IteratorSuite) TestNonEmptyWithErrors() { - getFn := func(page int) ([]byte, error) { - if page > 4 { - return nil, errors.New("error getting next page") + fetchFn := func(token PageToken) (Page, error) { + if token.(int) == 4 { + return Page{}, errors.New("got error") } - return getMap[page], nil + return Page{ + CurrentToken: token, + NextToken: token.(int) + 1, + Entities: fetchMap[token], + }, nil } - itr := NewIterator(0, 5, getFn, []byte("\r\n")) - expectedResults := []string{"\"one\"", "\"two\"", "\"three\"", "\"four\"", "\"five\""} + itr := NewIterator(0, fetchFn) + expectedResults := []string{"one", "two", "three", "four", "five", "six", "seven"} i := 0 for itr.HasNext() { curr, err := itr.Next() s.NoError(err) - expectedCurr := []byte(expectedResults[i]) - s.Equal(expectedCurr, curr) + s.Equal(expectedResults[i], curr.(string)) i++ } s.False(itr.HasNext()) - _, err := itr.Next() - s.Error(err) + curr, err := itr.Next() + s.Nil(curr) + s.Equal("got error", err.Error()) } diff --git a/common/pagination/writer.go b/common/pagination/writer.go new file mode 100644 index 00000000000..3110c3bd5d9 --- /dev/null +++ b/common/pagination/writer.go @@ -0,0 +1,93 @@ +// The MIT License (MIT) +// +// Copyright (c) 2020 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. 
+// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +// SOFTWARE. + +package pagination + +type ( + writer struct { + writeFn WriteFn + shouldFlushFn ShouldFlushFn + flushedPages []PageToken + page Page + } +) + +// NewWriter constructs a new Writer +func NewWriter( + writeFn WriteFn, + shouldFlushFn ShouldFlushFn, + startingPage PageToken, +) Writer { + return &writer{ + writeFn: writeFn, + shouldFlushFn: shouldFlushFn, + flushedPages: nil, + page: Page{ + Entities: nil, + CurrentToken: startingPage, + }, + } +} + +// Add adds entity to buffer and flushes if provided shouldFlushFn indicates the page should be flushed. +func (w *writer) Add(e Entity) error { + w.page.Entities = append(w.page.Entities, e) + if !w.shouldFlushFn(w.page) { + return nil + } + return w.Flush() +} + +// Flush flushes the buffer. +func (w *writer) Flush() error { + nextPageToken, err := w.writeFn(w.page) + if err != nil { + return err + } + w.flushedPages = append(w.flushedPages, w.page.CurrentToken) + w.page = Page{ + Entities: nil, + CurrentToken: nextPageToken, + } + return nil +} + +// FlushedPages returns all pages which have been successfully flushed. +func (w *writer) FlushedPages() []PageToken { + return w.flushedPages +} + +// FirstFlushedPage returns the first page that was flushed or nil if no pages have been flushed. 
+func (w *writer) FirstFlushedPage() PageToken { + if len(w.flushedPages) == 0 { + return nil + } + return w.flushedPages[0] +} + +// LastFlushedPage returns the last page that was flushed or nil if no pages have been flushed +func (w *writer) LastFlushedPage() PageToken { + if len(w.flushedPages) == 0 { + return nil + } + return w.flushedPages[len(w.flushedPages)-1] +} diff --git a/common/pagination/writerIterator_test.go b/common/pagination/writerIterator_test.go new file mode 100644 index 00000000000..83ab30f746e --- /dev/null +++ b/common/pagination/writerIterator_test.go @@ -0,0 +1,125 @@ +// The MIT License (MIT) +// +// Copyright (c) 2020 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +// SOFTWARE. 
+ +package pagination + +import ( + "bytes" + "encoding/json" + "fmt" + "testing" + + "github.com/stretchr/testify/require" + "github.com/stretchr/testify/suite" +) + +type TestEntity struct { + Counter int +} + +var separator = []byte("\r\n") + +type WriterIteratorSuite struct { + *require.Assertions + suite.Suite +} + +func TestWriterIteratorSuite(t *testing.T) { + suite.Run(t, new(WriterIteratorSuite)) +} + +func (s *WriterIteratorSuite) SetupTest() { + s.Assertions = require.New(s.T()) +} + +func (s *WriterIteratorSuite) TestWriterIterator() { + store := make(map[string][]byte) + + shouldFlushFn := func(page Page) bool { + return len(page.Entities) == 10 + } + pageNum := 0 + writeFn := func(page Page) (PageToken, error) { + buffer := &bytes.Buffer{} + for _, e := range page.Entities { + data, err := json.Marshal(e) + if err != nil { + return nil, err + } + buffer.Write(data) + buffer.Write(separator) + } + store[page.CurrentToken.(string)] = buffer.Bytes() + pageNum++ + return fmt.Sprintf("key_%v", pageNum), nil + } + writer := NewWriter(writeFn, shouldFlushFn, fmt.Sprintf("key_%v", pageNum)) + for i := 0; i < 100; i++ { + te := TestEntity{ + Counter: i, + } + s.NoError(writer.Add(te)) + } + flushedKeys := writer.FlushedPages() + s.Len(flushedKeys, 10) + for i := 0; i < 10; i++ { + expectedKey := fmt.Sprintf("key_%v", i) + s.Equal(expectedKey, flushedKeys[i].(string)) + } + + fetchFn := func(token PageToken) (Page, error) { + key := flushedKeys[token.(int)] + data := store[key.(string)] + dataBlobs := bytes.Split(data, separator) + var entities []Entity + for _, db := range dataBlobs { + if len(db) == 0 { + continue + } + var entity TestEntity + if err := json.Unmarshal(db, &entity); err != nil { + return Page{}, err + } + entities = append(entities, entity) + } + var nextPageToken interface{} = token.(int) + 1 + if nextPageToken == len(flushedKeys) { + nextPageToken = nil + } + return Page{ + CurrentToken: token, + NextToken: nextPageToken, + Entities: 
entities, + }, nil + } + + itr := NewIterator(0, fetchFn) + itrCount := 0 + for itr.HasNext() { + val, err := itr.Next() + s.NoError(err) + te := val.(TestEntity) + s.Equal(itrCount, te.Counter) + itrCount++ + } + s.Equal(100, itrCount) +} diff --git a/common/pagination/writer_test.go b/common/pagination/writer_test.go new file mode 100644 index 00000000000..5e79107caf8 --- /dev/null +++ b/common/pagination/writer_test.go @@ -0,0 +1,134 @@ +// The MIT License (MIT) +// +// Copyright (c) 2020 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +// SOFTWARE. 
+ +package pagination + +import ( + "errors" + "fmt" + "testing" + + "github.com/stretchr/testify/require" + "github.com/stretchr/testify/suite" +) + +type WriterSuite struct { + *require.Assertions + suite.Suite +} + +func TestWriterSuite(t *testing.T) { + suite.Run(t, new(WriterSuite)) +} + +func (s *WriterSuite) SetupTest() { + s.Assertions = require.New(s.T()) +} + +func (s *WriterSuite) TestAddNoFlush() { + writeFn := func(_ Page) (PageToken, error) { + return nil, nil + } + shouldFlushFn := func(_ Page) bool { + return false + } + writer := NewWriter(writeFn, shouldFlushFn, nil) + s.NoError(writer.Add(1)) + s.Empty(writer.FlushedPages()) + s.Nil(writer.FirstFlushedPage()) + s.Nil(writer.LastFlushedPage()) +} + +func (s *WriterSuite) TestAddAndFlush() { + writeFn := func(_ Page) (PageToken, error) { + return nil, nil + } + shouldFlushFn := func(_ Page) bool { + return true + } + writer := NewWriter(writeFn, shouldFlushFn, "test_token") + s.NoError(writer.Add(1)) + flushedPages := writer.FlushedPages() + s.Len(flushedPages, 1) + s.Equal("test_token", flushedPages[0].(string)) + s.Equal("test_token", writer.FirstFlushedPage().(string)) + s.Equal("test_token", writer.LastFlushedPage().(string)) +} + +func (s *WriterSuite) TestFlushErrorOnWrite() { + writeFn := func(_ Page) (PageToken, error) { + return nil, errors.New("got error") + } + shouldFlushFn := func(_ Page) bool { + return true + } + writer := NewWriter(writeFn, shouldFlushFn, nil) + s.Error(writer.Flush()) +} + +func (s *WriterSuite) TestFlushNoError() { + writeFn := func(_ Page) (PageToken, error) { + return nil, nil + } + shouldFlushFn := func(_ Page) bool { + return true + } + writer := NewWriter(writeFn, shouldFlushFn, "test_token") + s.Len(writer.FlushedPages(), 0) + s.NoError(writer.Flush()) + s.Equal("test_token", writer.FlushedPages()[0].(string)) + s.Equal("test_token", writer.FirstFlushedPage().(string)) + s.Equal("test_token", writer.LastFlushedPage().(string)) +} + +func (s *WriterSuite) 
TestMultiPageWrite() { + pageNum := 0 + writeFnCalls := 0 + writeFn := func(_ Page) (PageToken, error) { + writeFnCalls++ + pageNum++ + return fmt.Sprintf("page_%v", pageNum), nil + } + shouldFlushCalls := 0 + shouldFlushFn := func(p Page) bool { + shouldFlushCalls++ + return len(p.Entities) == 5 + } + writer := NewWriter(writeFn, shouldFlushFn, "page_0") + for i := 1; i <= 100; i++ { + s.NoError(writer.Add(i)) + } + var expectedPages []string + for i := 0; i < 20; i++ { + expectedPages = append(expectedPages, fmt.Sprintf("page_%v", i)) + } + actualPages := writer.FlushedPages() + s.Len(actualPages, 20) + for i, expected := range expectedPages { + s.Equal(expected, actualPages[i].(string)) + } + s.Equal(writeFnCalls, 20) + s.Equal(shouldFlushCalls, 100) + s.Equal("page_0", writer.FirstFlushedPage().(string)) + s.Equal("page_19", writer.LastFlushedPage().(string)) + +} diff --git a/common/util/bufferedWriter.go b/common/util/bufferedWriter.go deleted file mode 100644 index ad041189071..00000000000 --- a/common/util/bufferedWriter.go +++ /dev/null @@ -1,145 +0,0 @@ -// The MIT License (MIT) -// -// Copyright (c) 2020 Uber Technologies, Inc. -// -// Permission is hereby granted, free of charge, to any person obtaining a copy -// of this software and associated documentation files (the "Software"), to deal -// in the Software without restriction, including without limitation the rights -// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -// copies of the Software, and to permit persons to whom the Software is -// furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in all -// copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. 
IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE -// SOFTWARE. - -package util - -import ( - "bytes" - "sync" -) - -type ( - // BufferedWriter is used to buffer entities, construct byte blobs and invoke handle function on byte blobs. - // BufferedWriter is thread safe and makes defensive copies in and out. - // BufferedWriter's state is unchanged whenever any method returns an error. - BufferedWriter interface { - Add(interface{}) error - Flush() error - LastFlushedPage() int - } - - // HandleFn is invoked whenever a byte blob needs to be flushed. - // Takes in deep copy of constructed byte blob and the current page number. - // Returns an error on failure to handle or nil otherwise. - HandleFn func([]byte, int) error - - // SerializeFn is used to serialize entities. - SerializeFn func(interface{}) ([]byte, error) - - bufferedWriter struct { - sync.Mutex - - buffer *bytes.Buffer - page int - - flushThreshold int - separatorToken []byte - handleFn HandleFn - serializeFn SerializeFn - } -) - -// NewBufferedWriter constructs a new BufferedWriter -func NewBufferedWriter( - handleFn HandleFn, - serializeFn SerializeFn, - flushThreshold int, - separatorToken []byte, -) BufferedWriter { - separatorTokenCopy := make([]byte, len(separatorToken), len(separatorToken)) - copy(separatorTokenCopy, separatorToken) - return &bufferedWriter{ - buffer: &bytes.Buffer{}, - page: 0, - - flushThreshold: flushThreshold, - separatorToken: separatorTokenCopy, - handleFn: handleFn, - serializeFn: serializeFn, - } -} - -// Add adds element to buffer. Triggers flush if exceeds flushThreshold. 
-func (bw *bufferedWriter) Add(e interface{}) error { - bw.Lock() - defer bw.Unlock() - - if err := bw.writeToBuffer(e); err != nil { - return err - } - if bw.shouldFlush() { - if err := bw.flush(); err != nil { - return err - } - } - return nil -} - -// Flush invokes PutFn and advances state of bufferedWriter to next page. -func (bw *bufferedWriter) Flush() error { - bw.Lock() - defer bw.Unlock() - - return bw.flush() -} - -// LastFlushedPage returns the page number of the last page that was flushed. -// Returns -1 if no pages have been flushed. -func (bw *bufferedWriter) LastFlushedPage() int { - bw.Lock() - defer bw.Unlock() - - return bw.page - 1 -} - -func (bw *bufferedWriter) flush() error { - src := bw.buffer.Bytes() - dest := make([]byte, len(src), len(src)) - copy(dest, src) - err := bw.handleFn(dest, bw.page) - if err != nil { - return err - } - bw.startNewPage() - return nil -} - -func (bw *bufferedWriter) writeToBuffer(e interface{}) error { - data, err := bw.serializeFn(e) - if err != nil { - return err - } - - // write will never return an error, so it can be safely ignored - bw.buffer.Write(data) - bw.buffer.Write(bw.separatorToken) - return nil -} - -func (bw *bufferedWriter) shouldFlush() bool { - return bw.buffer.Len() >= bw.flushThreshold -} - -func (bw *bufferedWriter) startNewPage() { - bw.buffer = &bytes.Buffer{} - bw.page = bw.page + 1 -} diff --git a/common/util/bufferedWriterIterator_test.go b/common/util/bufferedWriterIterator_test.go deleted file mode 100644 index 2bd813486a9..00000000000 --- a/common/util/bufferedWriterIterator_test.go +++ /dev/null @@ -1,60 +0,0 @@ -// The MIT License (MIT) -// -// Copyright (c) 2020 Uber Technologies, Inc. 
-// -// Permission is hereby granted, free of charge, to any person obtaining a copy -// of this software and associated documentation files (the "Software"), to deal -// in the Software without restriction, including without limitation the rights -// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -// copies of the Software, and to permit persons to whom the Software is -// furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in all -// copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE -// SOFTWARE. 
- -package util - -import ( - "encoding/json" - "fmt" - "testing" - - "github.com/stretchr/testify/assert" -) - -func TestBufferedWriterWithIterator(t *testing.T) { - blobMap := make(map[string][]byte) - handleFn := func(data []byte, page int) error { - key := fmt.Sprintf("key_%v", page) - blobMap[key] = data - return nil - } - bw := NewBufferedWriter(handleFn, json.Marshal, 100, []byte("\r\n")) - for i := 0; i < 1000; i++ { - assert.NoError(t, bw.Add(i)) - } - assert.NoError(t, bw.Flush()) - lastFlushedPage := bw.LastFlushedPage() - getFn := func(page int) ([]byte, error) { - key := fmt.Sprintf("key_%v", page) - return blobMap[key], nil - } - itr := NewIterator(0, lastFlushedPage, getFn, []byte("\r\n")) - i := 0 - for itr.HasNext() { - val, err := itr.Next() - assert.NoError(t, err) - expectedVal, err := json.Marshal(i) - assert.NoError(t, err) - assert.Equal(t, expectedVal, val) - i++ - } -} diff --git a/common/util/bufferedWriter_test.go b/common/util/bufferedWriter_test.go deleted file mode 100644 index fab36b4ff5b..00000000000 --- a/common/util/bufferedWriter_test.go +++ /dev/null @@ -1,115 +0,0 @@ -// The MIT License (MIT) -// -// Copyright (c) 2020 Uber Technologies, Inc. -// -// Permission is hereby granted, free of charge, to any person obtaining a copy -// of this software and associated documentation files (the "Software"), to deal -// in the Software without restriction, including without limitation the rights -// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -// copies of the Software, and to permit persons to whom the Software is -// furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in all -// copies or substantial portions of the Software. 
-// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE -// SOFTWARE. - -package util - -import ( - "bytes" - "encoding/json" - "errors" - "testing" - - "github.com/stretchr/testify/require" - "github.com/stretchr/testify/suite" -) - -type BufferedWriterSuite struct { - *require.Assertions - suite.Suite -} - -func TestBufferedWriterSuite(t *testing.T) { - suite.Run(t, new(BufferedWriterSuite)) -} - -func (s *BufferedWriterSuite) SetupTest() { - s.Assertions = require.New(s.T()) -} - -func (s *BufferedWriterSuite) TestShouldFlush() { - bw := &bufferedWriter{ - buffer: bytes.NewBuffer([]byte{1, 2, 3}), - flushThreshold: 10, - } - s.False(bw.shouldFlush()) - bw.flushThreshold = 3 - s.True(bw.shouldFlush()) -} - -func (s *BufferedWriterSuite) TestWriteToBuffer() { - bw := &bufferedWriter{ - buffer: &bytes.Buffer{}, - separatorToken: []byte("\r\n"), - serializeFn: json.Marshal, - } - s.Error(bw.writeToBuffer(make(chan struct{}))) - s.Equal("", bw.buffer.String()) - - s.NoError(bw.writeToBuffer("first")) - s.Error(bw.writeToBuffer(make(chan struct{}))) - s.Equal("\"first\"\r\n", bw.buffer.String()) -} - -func (s *BufferedWriterSuite) TestFlush_HandleReturnsError() { - handleFn := func(data []byte, page int) error { - return errors.New("put function returns error") - } - bw := &bufferedWriter{ - buffer: bytes.NewBuffer([]byte{1, 2, 3}), - page: 1, - handleFn: handleFn, - } - s.Error(bw.flush()) - s.Equal(1, bw.page) - s.Equal([]byte{1, 2, 3}, bw.buffer.Bytes()) -} - -func (s *BufferedWriterSuite) TestFlush_Success() { - handleFn := func(data []byte, page 
int) error { - return nil - } - bw := &bufferedWriter{ - buffer: bytes.NewBuffer([]byte{1, 2, 3}), - page: 1, - handleFn: handleFn, - } - s.NoError(bw.flush()) - s.Len(bw.buffer.Bytes(), 0) - s.Equal(1, bw.LastFlushedPage()) -} - -func (s *BufferedWriterSuite) TestAddAndFlush() { - handleFn := func(data []byte, page int) error { - return nil - } - bw := NewBufferedWriter(handleFn, json.Marshal, 10, []byte("\r\n")).(*bufferedWriter) - expectedLastFlushedPage := -1 - for i := 1; i <= 100; i++ { - s.NoError(bw.Add(0)) - if i%4 == 0 { - expectedLastFlushedPage++ - } - s.Equal(expectedLastFlushedPage, bw.LastFlushedPage()) - } - s.Equal(24, bw.LastFlushedPage()) - s.Len(bw.buffer.Bytes(), 0) -} diff --git a/common/util/iterator.go b/common/util/iterator.go deleted file mode 100644 index df10281cf96..00000000000 --- a/common/util/iterator.go +++ /dev/null @@ -1,165 +0,0 @@ -// The MIT License (MIT) -// -// Copyright (c) 2020 Uber Technologies, Inc. -// -// Permission is hereby granted, free of charge, to any person obtaining a copy -// of this software and associated documentation files (the "Software"), to deal -// in the Software without restriction, including without limitation the rights -// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -// copies of the Software, and to permit persons to whom the Software is -// furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in all -// copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. 
IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE -// SOFTWARE. - -package util - -import ( - "bytes" - "errors" - "sync" -) - -var ( - // ErrIteratorFinished indicates that Next was called on an iterator - // which has already reached the end of its input. - ErrIteratorFinished = errors.New("iterator has reached end") -) - -type ( - // Iterator is used to iterate over entities. - // Each entity is represented as a byte slice. - // Iterator will fetch pages using the provided GetFn. - // Pages will be fetched starting from provided minPage and continuing until provided maxPage. - // Iterator will skip over any pages with empty input and will skip over any empty elements within a page. - // Iterator is thread safe and makes deep copies of all in and out data. - // If Next returns an error all subsequent calls to Next will return the same error. - Iterator interface { - Next() ([]byte, error) - HasNext() bool - } - - // GetFn fetches bytes for given page number. Returns error on failure. - GetFn func(int) ([]byte, error) - - iterator struct { - sync.Mutex - - currentPage int - page [][]byte - pageIndex int - nextResult []byte - nextError error - - separatorToken []byte - getFn GetFn - minPage int - maxPage int - } -) - -// NewIterator constructs a new iterator. -func NewIterator( - minPage int, - maxPage int, - getFn GetFn, - separatorToken []byte, -) Iterator { - separatorTokenCopy := make([]byte, len(separatorToken), len(separatorToken)) - copy(separatorTokenCopy, separatorToken) - itr := &iterator{ - currentPage: -1, - - separatorToken: separatorTokenCopy, - getFn: getFn, - minPage: minPage, - maxPage: maxPage, - } - itr.advance(true) - return itr -} - -// Next returns the next element in the iterator. 
-// Returns an error if no elements are left or if a non-recoverable error occurred. -func (i *iterator) Next() ([]byte, error) { - i.Lock() - defer i.Unlock() - - result := i.nextResult - error := i.nextError - - i.advance(false) - - copyResult := make([]byte, len(result), len(result)) - copy(copyResult, result) - return copyResult, error -} - -// HasNext returns true if next invocation of Next will return on-empty byte blob and nil error, false otherwise. -func (i *iterator) HasNext() bool { - i.Lock() - defer i.Unlock() - - return i.hasNext() -} - -func (i *iterator) advance(initialization bool) { - if !i.hasNext() && !initialization { - return - } - i.advanceOnce() - for len(i.nextResult) == 0 && i.nextError == nil { - i.advanceOnce() - } -} - -func (i *iterator) advanceOnce() { - if i.pageIndex < len(i.page) { - i.consumeFromCurrentPage() - return - } - if i.currentPage >= i.maxPage { - i.setIteratorToTerminalState(ErrIteratorFinished) - return - } - i.page = nil - i.currentPage++ - i.pageIndex = 0 - data, err := i.getFn(i.currentPage) - if err != nil { - i.setIteratorToTerminalState(err) - return - } - if len(data) == 0 { - i.nextResult = nil - i.nextError = nil - } else { - copyData := make([]byte, len(data), len(data)) - copy(copyData, data) - i.page = bytes.Split(copyData, i.separatorToken) - i.consumeFromCurrentPage() - } -} - -func (i *iterator) consumeFromCurrentPage() { - i.nextResult = i.page[i.pageIndex] - i.nextError = nil - i.pageIndex = i.pageIndex + 1 -} - -func (i *iterator) setIteratorToTerminalState(err error) { - i.nextResult = nil - i.nextError = err -} - -func (i *iterator) hasNext() bool { - return len(i.nextResult) > 0 && i.nextError == nil -} diff --git a/service/worker/scanner/executions/common/blobstoreIterator.go b/service/worker/scanner/executions/common/blobstoreIterator.go new file mode 100644 index 00000000000..3390506c73f --- /dev/null +++ b/service/worker/scanner/executions/common/blobstoreIterator.go @@ -0,0 +1,102 @@ +// The 
MIT License (MIT) +// +// Copyright (c) 2020 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +// SOFTWARE. + +package common + +import ( + "bytes" + "context" + "encoding/json" + + "github.com/uber/cadence/common/blobstore" + "github.com/uber/cadence/common/pagination" +) + +type ( + blobstoreIterator struct { + itr pagination.Iterator + } +) + +// NewBlobstoreIterator constructs a new iterator backed by blobstore. 
+func NewBlobstoreIterator( + client blobstore.Client, + keys Keys, +) ExecutionIterator { + return &blobstoreIterator{ + itr: pagination.NewIterator(keys.MinPage, getBlobstoreFetchPageFn(client, keys)), + } +} + +// Next returns the next Execution +func (i *blobstoreIterator) Next() (*Execution, error) { + exec, err := i.itr.Next() + if exec != nil { + return exec.(*Execution), err + } + return nil, err +} + +// HasNext returns true if there is a next Execution false otherwise +func (i *blobstoreIterator) HasNext() bool { + return i.itr.HasNext() +} + +func getBlobstoreFetchPageFn( + client blobstore.Client, + keys Keys, +) pagination.FetchFn { + return func(token pagination.PageToken) (pagination.Page, error) { + index := token.(int) + key := pageNumberToKey(keys.UUID, keys.Extension, index) + ctx, cancel := context.WithTimeout(context.Background(), BlobstoreTimeout) + defer cancel() + req := &blobstore.GetRequest{ + Key: key, + } + resp, err := client.Get(ctx, req) + if err != nil { + return pagination.Page{}, err + } + parts := bytes.Split(resp.Blob.Body, BlobstoreSeparatorToken) + var executions []pagination.Entity + for _, p := range parts { + if len(p) == 0 { + continue + } + var soe ScanOutputEntity + if err := json.Unmarshal(p, &soe); err != nil { + return pagination.Page{}, err + } + executions = append(executions, &soe.Execution) + } + var nextPageToken interface{} = index + 1 + if nextPageToken.(int) > keys.MaxPage { + nextPageToken = nil + } + return pagination.Page{ + CurrentToken: token, + NextToken: nextPageToken, + Entities: executions, + }, nil + } +} diff --git a/service/worker/scanner/executions/common/blobstoreWriter.go b/service/worker/scanner/executions/common/blobstoreWriter.go new file mode 100644 index 00000000000..b5bc65869f0 --- /dev/null +++ b/service/worker/scanner/executions/common/blobstoreWriter.go @@ -0,0 +1,127 @@ +// The MIT License (MIT) +// +// Copyright (c) 2020 Uber Technologies, Inc. 
+// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +// SOFTWARE. 
+ +package common + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + + "github.com/uber/cadence/common/blobstore" + "github.com/uber/cadence/common/pagination" +) + +type ( + blobstoreWriter struct { + writer pagination.Writer + uuid string + extension string + } +) + +// NewBlobstoreWriter constructs a new blobstore writer +func NewBlobstoreWriter( + uuid string, + extension string, + client blobstore.Client, + flushThreshold int, +) ExecutionWriter { + return &blobstoreWriter{ + writer: pagination.NewWriter( + getBlobstoreWriteFn(uuid, extension, client), + getBlobstoreShouldFlushFn(flushThreshold), + 0), + uuid: uuid, + extension: extension, + } +} + +// Add adds an entity to blobstore writer +func (bw *blobstoreWriter) Add(e interface{}) error { + return bw.writer.Add(e) +} + +// Flush flushes contents of writer to blobstore +func (bw *blobstoreWriter) Flush() error { + return bw.writer.Flush() +} + +// FlushedKeys returns the keys that have been successfully flushed. +// Returns nil if no keys have been flushed. 
+func (bw *blobstoreWriter) FlushedKeys() *Keys { + if len(bw.writer.FlushedPages()) == 0 { + return nil + } + return &Keys{ + UUID: bw.uuid, + MinPage: bw.writer.FirstFlushedPage().(int), + MaxPage: bw.writer.LastFlushedPage().(int), + Extension: bw.extension, + } +} + +func getBlobstoreWriteFn( + uuid string, + extension string, + client blobstore.Client, +) pagination.WriteFn { + return func(page pagination.Page) (pagination.PageToken, error) { + blobIndex := page.CurrentToken.(int) + key := pageNumberToKey(uuid, extension, blobIndex) + buffer := &bytes.Buffer{} + for _, e := range page.Entities { + data, err := json.Marshal(e) + if err != nil { + return nil, err + } + buffer.Write(data) + buffer.Write(BlobstoreSeparatorToken) + } + req := &blobstore.PutRequest{ + Key: key, + Blob: blobstore.Blob{ + Body: buffer.Bytes(), + }, + } + + ctx, cancel := context.WithTimeout(context.Background(), BlobstoreTimeout) + defer cancel() + if _, err := client.Put(ctx, req); err != nil { + return nil, err + } + return blobIndex + 1, nil + } +} + +func getBlobstoreShouldFlushFn( + flushThreshold int, +) pagination.ShouldFlushFn { + return func(page pagination.Page) bool { + return len(page.Entities) > flushThreshold + } +} + +func pageNumberToKey(uuid string, extension string, pageNum int) string { + return fmt.Sprintf("%v_%v.%v", uuid, pageNum, extension) +} diff --git a/service/worker/scanner/executions/common/interfaces.go b/service/worker/scanner/executions/common/interfaces.go index 32fe78e921b..87a5c4e9700 100644 --- a/service/worker/scanner/executions/common/interfaces.go +++ b/service/worker/scanner/executions/common/interfaces.go @@ -22,7 +22,19 @@ package common +import "github.com/uber/cadence/common/persistence" + type ( + // PersistenceRetryer is used to retry requests to persistence + PersistenceRetryer interface { + ListConcreteExecutions(*persistence.ListConcreteExecutionsRequest) (*persistence.ListConcreteExecutionsResponse, error) + 
GetWorkflowExecution(*persistence.GetWorkflowExecutionRequest) (*persistence.GetWorkflowExecutionResponse, error) + GetCurrentExecution(*persistence.GetCurrentExecutionRequest) (*persistence.GetCurrentExecutionResponse, error) + ReadHistoryBranch(*persistence.ReadHistoryBranchRequest) (*persistence.ReadHistoryBranchResponse, error) + DeleteWorkflowExecution(*persistence.DeleteWorkflowExecutionRequest) error + DeleteCurrentWorkflowExecution(request *persistence.DeleteCurrentWorkflowExecutionRequest) error + } + // InvariantManager represents a manager of several invariants. // It can be used to run a group of invariant checks or fixes. // It is responsible for running invariants in their dependency order. @@ -51,6 +63,13 @@ type ( HasNext() bool } + // ExecutionWriter is used to write entities (FixOutputEntity or ScanOutputEntity) to blobstore + ExecutionWriter interface { + Add(interface{}) error + Flush() error + FlushedKeys() *Keys + } + // Scanner is used to scan over all executions in a shard. It is responsible for three things: // 1. Checking invariants for each execution. // 2. Recording corruption and failures to durable store. diff --git a/service/worker/scanner/executions/common/persistenceIterator.go b/service/worker/scanner/executions/common/persistenceIterator.go new file mode 100644 index 00000000000..10a6958c7e2 --- /dev/null +++ b/service/worker/scanner/executions/common/persistenceIterator.go @@ -0,0 +1,107 @@ +// The MIT License (MIT) +// +// Copyright (c) 2020 Uber Technologies, Inc. 
+// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +// SOFTWARE. + +package common + +import ( + "github.com/uber/cadence/common/pagination" + "github.com/uber/cadence/common/persistence" +) + +type ( + persistenceIterator struct { + itr pagination.Iterator + } +) + +// NewPersistenceIterator returns a new paginated iterator over persistence +func NewPersistenceIterator( + pr PersistenceRetryer, + pageSize int, + shardID int, +) ExecutionIterator { + return &persistenceIterator{ + itr: pagination.NewIterator(nil, getPersistenceFetchPageFn(pr, pageSize, shardID)), + } +} + +// Next returns the next execution +func (i *persistenceIterator) Next() (*Execution, error) { + exec, err := i.itr.Next() + if exec != nil { + return exec.(*Execution), err + } + return nil, err +} + +// HasNext returns true if there is another execution, false otherwise. 
+func (i *persistenceIterator) HasNext() bool { + return i.itr.HasNext() +} + +func getPersistenceFetchPageFn( + pr PersistenceRetryer, + pageSize int, + shardID int, +) pagination.FetchFn { + return func(token pagination.PageToken) (pagination.Page, error) { + req := &persistence.ListConcreteExecutionsRequest{ + PageSize: pageSize, + } + if token != nil { + req.PageToken = token.([]byte) + } + resp, err := pr.ListConcreteExecutions(req) + if err != nil { + return pagination.Page{}, err + } + executions := make([]pagination.Entity, len(resp.Executions), len(resp.Executions)) + for i, e := range resp.Executions { + branchToken := e.ExecutionInfo.BranchToken + if e.VersionHistories != nil { + versionHistory, err := e.VersionHistories.GetCurrentVersionHistory() + if err != nil { + return pagination.Page{}, err + } + branchToken = versionHistory.GetBranchToken() + } + executions[i] = &Execution{ + ShardID: shardID, + DomainID: e.ExecutionInfo.DomainID, + WorkflowID: e.ExecutionInfo.WorkflowID, + RunID: e.ExecutionInfo.RunID, + BranchToken: branchToken, + State: e.ExecutionInfo.State, + } + } + var nextToken interface{} = resp.PageToken + if len(resp.PageToken) == 0 { + nextToken = nil + } + page := pagination.Page{ + CurrentToken: token, + NextToken: nextToken, + Entities: executions, + } + return page, nil + } +} diff --git a/service/worker/scanner/executions/common/persistenceRetryer.go b/service/worker/scanner/executions/common/persistenceRetryer.go new file mode 100644 index 00000000000..86a2d745235 --- /dev/null +++ b/service/worker/scanner/executions/common/persistenceRetryer.go @@ -0,0 +1,140 @@ +// The MIT License (MIT) +// +// Copyright (c) 2020 Uber Technologies, Inc. 
+// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +// SOFTWARE. 
+ +package common + +import ( + "github.com/uber/cadence/common" + "github.com/uber/cadence/common/backoff" + "github.com/uber/cadence/common/persistence" +) + +type ( + persistenceRetryer struct { + execManager persistence.ExecutionManager + historyManager persistence.HistoryManager + } +) + +var ( + retryPolicy = common.CreatePersistanceRetryPolicy() +) + +// NewPersistenceRetryer constructs a new PersistenceRetryer +func NewPersistenceRetryer( + execManager persistence.ExecutionManager, + historyManager persistence.HistoryManager, +) PersistenceRetryer { + return &persistenceRetryer{ + execManager: execManager, + historyManager: historyManager, + } +} + +// ListConcreteExecutions retries ListConcreteExecutions +func (pr *persistenceRetryer) ListConcreteExecutions( + req *persistence.ListConcreteExecutionsRequest, +) (*persistence.ListConcreteExecutionsResponse, error) { + var resp *persistence.ListConcreteExecutionsResponse + op := func() error { + var err error + resp, err = pr.execManager.ListConcreteExecutions(req) + return err + } + var err error + err = backoff.Retry(op, retryPolicy, common.IsPersistenceTransientError) + if err == nil { + return resp, nil + } + return nil, err +} + +// GetWorkflowExecution retries GetWorkflowExecution +func (pr *persistenceRetryer) GetWorkflowExecution( + req *persistence.GetWorkflowExecutionRequest, +) (*persistence.GetWorkflowExecutionResponse, error) { + var resp *persistence.GetWorkflowExecutionResponse + op := func() error { + var err error + resp, err = pr.execManager.GetWorkflowExecution(req) + return err + } + err := backoff.Retry(op, retryPolicy, common.IsPersistenceTransientError) + if err != nil { + return nil, err + } + return resp, nil +} + +// GetCurrentExecution retries GetCurrentExecution +func (pr *persistenceRetryer) GetCurrentExecution( + req *persistence.GetCurrentExecutionRequest, +) (*persistence.GetCurrentExecutionResponse, error) { + var resp *persistence.GetCurrentExecutionResponse + op := func() 
error { + var err error + resp, err = pr.execManager.GetCurrentExecution(req) + return err + } + err := backoff.Retry(op, retryPolicy, common.IsPersistenceTransientError) + if err != nil { + return nil, err + } + return resp, nil +} + +// ReadHistoryBranch retries ReadHistoryBranch +func (pr *persistenceRetryer) ReadHistoryBranch( + req *persistence.ReadHistoryBranchRequest, +) (*persistence.ReadHistoryBranchResponse, error) { + var resp *persistence.ReadHistoryBranchResponse + op := func() error { + var err error + resp, err = pr.historyManager.ReadHistoryBranch(req) + return err + } + err := backoff.Retry(op, retryPolicy, common.IsPersistenceTransientError) + if err != nil { + return nil, err + } + return resp, nil +} + +// DeleteWorkflowExecution retries DeleteWorkflowExecution +func (pr *persistenceRetryer) DeleteWorkflowExecution( + req *persistence.DeleteWorkflowExecutionRequest, +) error { + op := func() error { + return pr.execManager.DeleteWorkflowExecution(req) + } + return backoff.Retry(op, retryPolicy, common.IsPersistenceTransientError) +} + +// DeleteCurrentWorkflowExecution retries DeleteCurrentWorkflowExecution +func (pr *persistenceRetryer) DeleteCurrentWorkflowExecution( + req *persistence.DeleteCurrentWorkflowExecutionRequest, +) error { + op := func() error { + return pr.execManager.DeleteCurrentWorkflowExecution(req) + } + return backoff.Retry(op, retryPolicy, common.IsPersistenceTransientError) +} diff --git a/service/worker/scanner/executions/common/types.go b/service/worker/scanner/executions/common/types.go index a4e371c5886..a0546ee3ef8 100644 --- a/service/worker/scanner/executions/common/types.go +++ b/service/worker/scanner/executions/common/types.go @@ -23,9 +23,18 @@ package common import ( + "time" + "github.com/uber/cadence/common/persistence" ) +var ( + // BlobstoreSeparatorToken is used to separate entries written to blobstore + BlobstoreSeparatorToken = []byte("\r\n") + // BlobstoreTimeout is the timeout used for blobstore 
requests + BlobstoreTimeout = time.Second * 10 +) + type ( // CheckResultType is the result type of running an invariant check CheckResultType string diff --git a/service/worker/scanner/executions/common/writerIterator_test.go b/service/worker/scanner/executions/common/writerIterator_test.go new file mode 100644 index 00000000000..f6d7ef6c8b6 --- /dev/null +++ b/service/worker/scanner/executions/common/writerIterator_test.go @@ -0,0 +1,153 @@ +// The MIT License (MIT) +// +// Copyright (c) 2020 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +// SOFTWARE. 
+ +package common + +import ( + "fmt" + "io/ioutil" + "os" + "testing" + + "github.com/pborman/uuid" + "github.com/stretchr/testify/require" + "github.com/stretchr/testify/suite" + + "github.com/uber/cadence/common/blobstore/filestore" + "github.com/uber/cadence/common/mocks" + "github.com/uber/cadence/common/pagination" + "github.com/uber/cadence/common/persistence" + "github.com/uber/cadence/common/service/config" +) + +var ( + testBranchToken = []byte{1, 2, 3} + executionPageSize = 10 + testShardID = 1 +) + +type WriterIteratorSuite struct { + *require.Assertions + suite.Suite +} + +func TestWriterIteratorSuite(t *testing.T) { + suite.Run(t, new(WriterIteratorSuite)) +} + +func (s *WriterIteratorSuite) SetupTest() { + s.Assertions = require.New(s.T()) +} + +func (s *WriterIteratorSuite) TestWriterIterator() { + pr := NewPersistenceRetryer(getMockExecutionManager(10, 10), nil) + pItr := NewPersistenceIterator(pr, executionPageSize, testShardID) + uuid := "uuid" + extension := "test" + outputDir, err := ioutil.TempDir("", "TestWriterIterator") + s.NoError(err) + defer os.RemoveAll(outputDir) + cfg := &config.FileBlobstore{ + OutputDirectory: outputDir, + } + blobstore, err := filestore.NewFilestoreClient(cfg) + s.NoError(err) + blobstoreWriter := NewBlobstoreWriter(uuid, extension, blobstore, 10) + var executions []*Execution + for pItr.HasNext() { + exec, err := pItr.Next() + s.NoError(err) + executions = append(executions, exec) + err = blobstoreWriter.Add(&ScanOutputEntity{ + Execution: *exec, + }) + s.NoError(err) + } + s.NoError(blobstoreWriter.Flush()) + s.Len(executions, 100) + s.False(pItr.HasNext()) + _, err = pItr.Next() + s.Equal(pagination.ErrIteratorFinished, err) + flushedKeys := blobstoreWriter.FlushedKeys() + s.Equal(uuid, flushedKeys.UUID) + s.Equal(0, flushedKeys.MinPage) + s.Equal(9, flushedKeys.MaxPage) + s.Equal("test", flushedKeys.Extension) + blobstoreItr := NewBlobstoreIterator(blobstore, *flushedKeys) + i := 0 + for blobstoreItr.HasNext() 
{ + exec, err := blobstoreItr.Next() + s.NoError(err) + s.Equal(*executions[i], *exec) + i++ + } +} + +func getMockExecutionManager(pages int, countPerPage int) persistence.ExecutionManager { + execManager := &mocks.ExecutionManager{} + for i := 0; i < pages; i++ { + req := &persistence.ListConcreteExecutionsRequest{ + PageToken: []byte(fmt.Sprintf("token_%v", i)), + PageSize: executionPageSize, + } + if i == 0 { + req.PageToken = nil + } + resp := &persistence.ListConcreteExecutionsResponse{ + Executions: getExecutions(countPerPage), + PageToken: []byte(fmt.Sprintf("token_%v", i+1)), + } + if i == pages-1 { + resp.PageToken = nil + } + execManager.On("ListConcreteExecutions", req).Return(resp, nil) + } + return execManager +} + +func getExecutions(count int) []*persistence.ListConcreteExecutionsEntity { + var result []*persistence.ListConcreteExecutionsEntity + for i := 0; i < count; i++ { + execution := &persistence.ListConcreteExecutionsEntity{ + ExecutionInfo: &persistence.WorkflowExecutionInfo{ + DomainID: uuid.New(), + WorkflowID: uuid.New(), + RunID: uuid.New(), + BranchToken: testBranchToken, + State: 0, + }, + } + if i%2 == 0 { + execution.ExecutionInfo.BranchToken = nil + execution.VersionHistories = &persistence.VersionHistories{ + CurrentVersionHistoryIndex: 0, + Histories: []*persistence.VersionHistory{ + { + BranchToken: testBranchToken, + }, + }, + } + } + result = append(result, execution) + } + return result +}