Skip to content

Commit

Permalink
Add iterators and writers for persistence and blobstore for scanner w…
Browse files Browse the repository at this point in the history
…orkflow (#3234)
  • Loading branch information
andrewjdawson2016 authored May 6, 2020
1 parent 7178f2a commit c9d4837
Show file tree
Hide file tree
Showing 22 changed files with 1,366 additions and 512 deletions.
5 changes: 5 additions & 0 deletions common/blobstore/filestore/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,11 @@ func (c *client) Delete(_ context.Context, request *blobstore.DeleteRequest) (*b
return &blobstore.DeleteResponse{}, nil
}

// IsRetryableError returns true if the error is retryable false otherwise
func (c *client) IsRetryableError(err error) bool {
return false
}

func (c *client) deserializeBlob(data []byte) (blobstore.Blob, error) {
var blob blobstore.Blob
if err := json.Unmarshal(data, &blob); err != nil {
Expand Down
1 change: 1 addition & 0 deletions common/blobstore/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ type (
Get(context.Context, *GetRequest) (*GetResponse, error)
Exists(context.Context, *ExistsRequest) (*ExistsResponse, error)
Delete(context.Context, *DeleteRequest) (*DeleteResponse, error)
IsRetryableError(error) bool
}

// PutRequest is the request to Put
Expand Down
15 changes: 15 additions & 0 deletions common/blobstore/interface_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

104 changes: 104 additions & 0 deletions common/blobstore/retryableClient.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
// The MIT License (MIT)
//
// Copyright (c) 2020 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in all
// copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
// SOFTWARE.

package blobstore

import (
"context"

"github.com/uber/cadence/common/backoff"
)

type (
retryableClient struct {
client Client
policy backoff.RetryPolicy
}
)

// NewRetryableClient constructs a blobstorre client which retries transient errors.
func NewRetryableClient(client Client, policy backoff.RetryPolicy) Client {
return &retryableClient{
client: client,
policy: policy,
}
}

func (c *retryableClient) Put(ctx context.Context, req *PutRequest) (*PutResponse, error) {
var resp *PutResponse
var err error
op := func() error {
resp, err = c.client.Put(ctx, req)
return err
}
err = backoff.Retry(op, c.policy, c.client.IsRetryableError)
if err != nil {
return nil, err
}
return resp, nil
}

func (c *retryableClient) Get(ctx context.Context, req *GetRequest) (*GetResponse, error) {
var resp *GetResponse
var err error
op := func() error {
resp, err = c.client.Get(ctx, req)
return err
}
err = backoff.Retry(op, c.policy, c.client.IsRetryableError)
if err != nil {
return nil, err
}
return resp, nil
}

func (c *retryableClient) Exists(ctx context.Context, req *ExistsRequest) (*ExistsResponse, error) {
var resp *ExistsResponse
var err error
op := func() error {
resp, err = c.client.Exists(ctx, req)
return err
}
err = backoff.Retry(op, c.policy, c.client.IsRetryableError)
if err != nil {
return nil, err
}
return resp, nil
}

func (c *retryableClient) Delete(ctx context.Context, req *DeleteRequest) (*DeleteResponse, error) {
var resp *DeleteResponse
var err error
op := func() error {
resp, err = c.client.Delete(ctx, req)
return err
}
err = backoff.Retry(op, c.policy, c.client.IsRetryableError)
if err != nil {
return nil, err
}
return resp, nil
}

func (c *retryableClient) IsRetryableError(err error) bool {
return c.client.IsRetryableError(err)
}
1 change: 1 addition & 0 deletions common/collection/pagingIterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ type (
)

// NewPagingIterator create a new paging iterator
// TODO: this implementation should be removed in favor of pagination/iterator.go
func NewPagingIterator(paginationFn PaginationFn) Iterator {
iter := &PagingIteratorImpl{
paginationFn: paginationFn,
Expand Down
73 changes: 73 additions & 0 deletions common/pagination/interface.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
// The MIT License (MIT)
//
// Copyright (c) 2020 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in all
// copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
// SOFTWARE.

package pagination

import "errors"

// ErrIteratorFinished indicates that Next was called on a finished iterator
var ErrIteratorFinished = errors.New("iterator has reached end")

type (
// Page contains a PageToken which identifies the current page,
// a PageToken which identifies the next page and a list of Entity.
Page struct {
NextToken PageToken
CurrentToken PageToken
Entities []Entity
}
// Entity is a generic type which can be operated on by Iterator and Writer
Entity interface{}
// PageToken identifies a page
PageToken interface{}
)

type (
// WriteFn writes given Page to underlying sink.
// The Pages's NextToken will always be nil, its the responsibility of WriteFn to
// construct and return the next PageToken, or return an error on failure.
WriteFn func(Page) (PageToken, error)
// ShouldFlushFn returns true if given page should be flushed false otherwise.
ShouldFlushFn func(Page) bool
// FetchFn fetches Page from PageToken.
// Once a page with nil NextToken is returned no more pages will be fetched.
FetchFn func(PageToken) (Page, error)
)

type (
// Iterator is used to get entities from a collection of pages.
// When HasNext returns true it is guaranteed that Next will not return an error.
// Once iterator returns an error it will never make progress again and will always return that same error.
// Iterator is not thread safe and does not make defensive in or out copies.
Iterator interface {
Next() (Entity, error)
HasNext() bool
}
// Writer is used to buffer and write entities to underlying store.
Writer interface {
Add(Entity) error
Flush() error
FlushedPages() []PageToken
FirstFlushedPage() PageToken
LastFlushedPage() PageToken
}
)
110 changes: 110 additions & 0 deletions common/pagination/iterator.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
// The MIT License (MIT)
//
// Copyright (c) 2020 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in all
// copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
// SOFTWARE.

package pagination

type (
iterator struct {
page Page
entityIndex int

nextEntity Entity
nextError error

fetchFn FetchFn
}
)

// NewIterator constructs a new Iterator
func NewIterator(
startingPageToken PageToken,
fetchFn FetchFn,
) Iterator {
itr := &iterator{
page: Page{
Entities: nil,
CurrentToken: nil,
NextToken: startingPageToken,
},
entityIndex: 0,
fetchFn: fetchFn,
}
itr.advance(true)
return itr
}

// Next returns the next Entity or error.
// Returning nil, nil is valid if that is what the provided fetch function provided.
func (i *iterator) Next() (Entity, error) {
entity := i.nextEntity
error := i.nextError
i.advance(false)
return entity, error
}

// HasNext returns true if there is a next element. There is considered to be a next element
// As long as a fatal error has not occurred and the iterator has not reached the end.
func (i *iterator) HasNext() bool {
return i.nextError == nil
}

func (i *iterator) advance(firstPage bool) {
if !i.HasNext() && !firstPage {
return
}
if i.entityIndex < len(i.page.Entities) {
i.consume()
} else {
if err := i.advanceToNonEmptyPage(firstPage); err != nil {
i.terminate(err)
} else {
i.consume()
}
}
}

func (i *iterator) advanceToNonEmptyPage(firstPage bool) error {
if i.page.NextToken == nil && !firstPage {
return ErrIteratorFinished
}
nextPage, err := i.fetchFn(i.page.NextToken)
if err != nil {
return err
}
i.page = nextPage
if len(i.page.Entities) != 0 {
i.entityIndex = 0
return nil
}
return i.advanceToNonEmptyPage(false)
}

func (i *iterator) consume() {
i.nextEntity = i.page.Entities[i.entityIndex]
i.nextError = nil
i.entityIndex++
}

func (i *iterator) terminate(err error) {
i.nextEntity = nil
i.nextError = err
}
Loading

0 comments on commit c9d4837

Please sign in to comment.