Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add listing to stow storage apis #5674

Closed
Closed
Show file tree
Hide file tree
Changes from 34 commits
Commits
Show all changes
69 commits
Select commit Hold shift + click to select a range
7306683
Add environment variable for pod name
bgedik Aug 1, 2024
dd2957b
[flyteadmin] Refactor panic recovery into middleware (#5546)
Sovietaced Aug 1, 2024
d8e7491
Snowflake agent Doc (#5620)
Future-Outlier Aug 2, 2024
021c606
[flytepropeller][compiler] Error Handling when Type is not found (#5612)
Future-Outlier Aug 2, 2024
91d6d40
Fix nil pointer when task plugin load returns error (#5622)
Sovietaced Aug 2, 2024
eae4bf5
Log stack trace when refresh cache sync recovers from panic (#5623)
Sovietaced Aug 2, 2024
dcbc55a
use private-key (#5626)
Future-Outlier Aug 2, 2024
61cffe8
Explain how Agent Secret Works (#5625)
Future-Outlier Aug 2, 2024
2cf52ef
Fix typo in execution manager (#5619)
ddl-rliu Aug 2, 2024
a65a590
Amend Admin to use grpc message size (#5628)
wild-endeavor Aug 2, 2024
fe9fb67
document the process of setting ttl for a ray cluster (#5636)
pingsutw Aug 6, 2024
0514430
Add CustomHeaderMatcher to pass additional headers (#5563)
andrewwdye Aug 7, 2024
80c349d
Turn flyteidl and flytectl releases into manual gh workflows (#5635)
eapolinario Aug 8, 2024
ee724b1
docs: fix typo (#5643)
cratiu222 Aug 8, 2024
a9beb65
Use enable_deck=True in docs (#5645)
thomasjpfan Aug 8, 2024
392632d
Fix flyteidl release checkout all tags (#5646)
eapolinario Aug 8, 2024
ee4783e
Install pyarrow in sandbox functional tests (#5647)
eapolinario Aug 8, 2024
b21d674
docs: add documentation for configuring notifications in GCP (#5545)
desihsu Aug 9, 2024
aff319b
Correct "sucessfile" to "successfile" (#5652)
shengyu7697 Aug 12, 2024
4929522
Fix ordering for custom template values in cluster resource controlle…
katrogan Aug 12, 2024
5772d1f
Don't error when attempting to trigger schedules for inactive project…
katrogan Aug 12, 2024
2ddb4d2
fix tests
bgedik Aug 15, 2024
4e7a1c5
Merge branch 'master' of https://github.com/flyteorg/flyte into bugra…
bgedik Aug 15, 2024
1b24ca2
change to shorter names
bgedik Aug 20, 2024
ede7536
change to shorter names
bgedik Aug 20, 2024
7a14649
change to shorter names
bgedik Aug 20, 2024
783d75e
change to shorter names
bgedik Aug 20, 2024
c89950f
change to shorter names
bgedik Aug 20, 2024
8215edc
Add list operation to stow container
bgedik Aug 20, 2024
a4f87b4
Minor fix
bgedik Aug 20, 2024
cd7667c
Merge branch 'master' of https://github.com/flyteorg/flyte into bugra…
bgedik Aug 21, 2024
e333bb6
renamings
bgedik Sep 10, 2024
786903c
renamings
bgedik Sep 11, 2024
e4e1dd5
Merge branch 'master' of https://github.com/flyteorg/flyte into bugra…
bgedik Sep 11, 2024
acab62a
mockery
bgedik Sep 12, 2024
d02a58f
mockery
bgedik Sep 12, 2024
72095c0
renamings
bgedik Sep 10, 2024
77f47d1
renamings
bgedik Sep 11, 2024
49a7760
[Bug] Update resource failures w/ Finalizers set (#423) (#5673)
pvditt Aug 21, 2024
0272b86
[BUG] array node eventing bump version (#5680)
pvditt Aug 21, 2024
2fd51eb
Add custominfo to agents (#5604)
ddl-rliu Aug 23, 2024
85f7d3d
[BUG] use deep copy of bit arrays when getting array node state (#5681)
pvditt Aug 23, 2024
f02a8c7
More concise definition of launchplan (#5682)
eapolinario Aug 23, 2024
86859d1
Auth/prevent lookup per call (#5686)
wild-endeavor Aug 23, 2024
a86feb0
Update Flyte components - v1.13.1-rc1 (#5691)
flyte-bot Aug 23, 2024
3b690d2
[flytectl] DataConfig missing from TaskSpec (#5692)
wild-endeavor Aug 24, 2024
e0ae3ca
Update Flyte components - v1.13.1 (#5696)
flyte-bot Aug 27, 2024
add4e48
Enable echo plugin by default (#5679)
pingsutw Aug 28, 2024
34efd0b
Do not emit execution id label by default in single binary (#5704)
eapolinario Aug 29, 2024
e126da1
Using new offloaded metadata literal message for literal offloading (…
pmahindrakar-oss Aug 30, 2024
da2a5e6
Improve error message for MismatchingTypes (#5639)
pingsutw Aug 30, 2024
ef6fcc0
[Docs] Echo Task (#5707)
Future-Outlier Aug 30, 2024
c5f7d84
Improve execution name readability (#5637)
wayner0628 Aug 30, 2024
b46e203
Configure imagePullPolicy to be Always pull on flyte sandbox environm…
pmahindrakar-oss Aug 30, 2024
2afe101
should not set echo plugin as default (#5713)
pingsutw Sep 2, 2024
9f630b8
Move default execution name generation to flyteadmin (#5714)
wayner0628 Sep 2, 2024
2dc91ca
Update helm/docs per changes in supported task discovery (#5694)
Sovietaced Sep 3, 2024
573516f
[flyteagent] Add Logging for Agent Supported Task Types (#5718)
Future-Outlier Sep 3, 2024
9070fe7
extend pod customization to include init containers (#5685)
samhita-alla Sep 3, 2024
4a7cb55
Update "Try Serverless" language in Quickstart guide (#5698)
neverett Sep 5, 2024
8c303f4
Refactor flyteadmin to pass proto structs as pointers (#5717)
Sovietaced Sep 5, 2024
caa10a3
fix: Use deterministic execution names in scheduler (#5724)
pingsutw Sep 5, 2024
daaff18
Enable in every helm chart (#5730)
Future-Outlier Sep 8, 2024
d3a3fd1
[Docs][flyteagent] Remove agent-service config in flyte agent documen…
Future-Outlier Sep 8, 2024
f30613b
Fix flytectl returning the oldest workflow when using --latest flag (…
RRap0so Sep 9, 2024
cee4e16
Remove explict go toolchain versions (#5723)
ddl-ebrown Sep 10, 2024
cbdddd4
mockery
bgedik Sep 12, 2024
31ba83d
mockery
bgedik Sep 12, 2024
16454bf
Merge branch 'bugra.gedik/add_listing_to_storage_apis' of github.com:…
bgedik Sep 12, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions flyteadmin/pkg/common/mocks/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@ func (t *TestDataStore) Head(ctx context.Context, reference storage.DataReferenc
return t.HeadCb(ctx, reference)
}

func (t *TestDataStore) List(ctx context.Context, reference storage.DataReference, maxItems int, cursor storage.Cursor) ([]storage.DataReference, storage.Cursor, error) {
return nil, storage.NewCursorAtEnd(), fmt.Errorf("Not implemented yet")
}

func (t *TestDataStore) ReadProtobuf(ctx context.Context, reference storage.DataReference, msg proto.Message) error {
return t.ReadProtobufCb(ctx, reference, msg)
}
Expand Down
4 changes: 4 additions & 0 deletions flytepropeller/pkg/utils/failing_datastore.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@ func (FailingRawStore) Head(ctx context.Context, reference storage.DataReference
return nil, fmt.Errorf("failed metadata fetch")
}

func (s *FailingRawStore) List(ctx context.Context, reference storage.DataReference, maxItems int, cursor storage.Cursor) ([]storage.DataReference, storage.Cursor, error) {
return nil, storage.NewCursorAtEnd(), fmt.Errorf("Not implemented yet")
}

func (FailingRawStore) ReadRaw(ctx context.Context, reference storage.DataReference) (io.ReadCloser, error) {
return nil, fmt.Errorf("failed read raw")
}
Expand Down
4 changes: 4 additions & 0 deletions flytestdlib/storage/cached_rawstore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,10 @@ func (d *dummyStore) Head(ctx context.Context, reference DataReference) (Metadat
return d.HeadCb(ctx, reference)
}

func (s *dummyStore) List(ctx context.Context, reference DataReference, maxItems int, cursor Cursor) ([]DataReference, Cursor, error) {
return nil, NewCursorAtEnd(), fmt.Errorf("Not implemented yet")
}

func (d *dummyStore) ReadRaw(ctx context.Context, reference DataReference) (io.ReadCloser, error) {
return d.ReadRawCb(ctx, reference)
}
Expand Down
4 changes: 4 additions & 0 deletions flytestdlib/storage/mem_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,10 @@
}, nil
}

func (s *InMemoryStore) List(ctx context.Context, reference DataReference, maxItems int, cursor Cursor) ([]DataReference, Cursor, error) {
return nil, NewCursorAtEnd(), fmt.Errorf("Not implemented yet")

Check warning on line 58 in flytestdlib/storage/mem_store.go

View check run for this annotation

Codecov / codecov/patch

flytestdlib/storage/mem_store.go#L57-L58

Added lines #L57 - L58 were not covered by tests
}

func (s *InMemoryStore) ReadRaw(ctx context.Context, reference DataReference) (io.ReadCloser, error) {
if raw, found := s.cache[reference]; found {
return ioutil.NopCloser(bytes.NewReader(raw)), nil
Expand Down
38 changes: 38 additions & 0 deletions flytestdlib/storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,41 @@ type Metadata interface {
ContentMD5() string
}

type CursorState int

const (
// Enum representing state of the cursor
AtStartCursorState CursorState = 0
AtEndCursorState CursorState = 1
AtCustomPosCursorState CursorState = 2
)

type Cursor struct {
cursorState CursorState
customPosition string
}

func NewCursorAtStart() Cursor {
return Cursor{
cursorState: AtStartCursorState,
customPosition: "",
}
}

func NewCursorAtEnd() Cursor {
return Cursor{
cursorState: AtEndCursorState,
customPosition: "",
}
}

func NewCursorFromCustomPosition(customPosition string) Cursor {
return Cursor{
cursorState: AtCustomPosCursorState,
customPosition: customPosition,
}
}

// DataStore is a simplified interface for accessing and storing data in one of the Cloud stores.
// Today we rely on Stow for multi-cloud support, but this interface abstracts that part
type DataStore struct {
Expand Down Expand Up @@ -78,6 +113,9 @@ type RawStore interface {
// Head gets metadata about the reference. This should generally be a light weight operation.
Head(ctx context.Context, reference DataReference) (Metadata, error)

// List gets a list of items given a prefix, using a paginated API
List(ctx context.Context, reference DataReference, maxItems int, cursor Cursor) ([]DataReference, Cursor, error)

// ReadRaw retrieves a byte array from the Blob store or an error
ReadRaw(ctx context.Context, reference DataReference) (io.ReadCloser, error)

Expand Down
46 changes: 46 additions & 0 deletions flytestdlib/storage/stow_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,9 @@
HeadFailure labeled.Counter
HeadLatency labeled.StopWatch

ListFailure labeled.Counter
ListLatency labeled.StopWatch

ReadFailure labeled.Counter
ReadOpenLatency labeled.StopWatch

Expand Down Expand Up @@ -251,6 +254,46 @@
return StowMetadata{exists: false}, errs.Wrapf(err, "path:%v", k)
}

func (s *StowStore) List(ctx context.Context, reference DataReference, maxItems int, cursor Cursor) ([]DataReference, Cursor, error) {
_, c, k, err := reference.Split()
if err != nil {
s.metrics.BadReference.Inc(ctx)
return nil, NewCursorAtEnd(), err

Check warning on line 261 in flytestdlib/storage/stow_store.go

View check run for this annotation

Codecov / codecov/patch

flytestdlib/storage/stow_store.go#L260-L261

Added lines #L260 - L261 were not covered by tests
}

container, err := s.getContainer(ctx, locationIDMain, c)
if err != nil {
return nil, NewCursorAtEnd(), err

Check warning on line 266 in flytestdlib/storage/stow_store.go

View check run for this annotation

Codecov / codecov/patch

flytestdlib/storage/stow_store.go#L266

Added line #L266 was not covered by tests
}

t := s.metrics.ListLatency.Start(ctx)
var stowCursor string
if cursor.cursorState == AtStartCursorState {
stowCursor = stow.CursorStart
} else if cursor.cursorState == AtEndCursorState {
return nil, NewCursorAtEnd(), fmt.Errorf("Cursor cannot be at end for the List call")

Check warning on line 274 in flytestdlib/storage/stow_store.go

View check run for this annotation

Codecov / codecov/patch

flytestdlib/storage/stow_store.go#L274

Added line #L274 was not covered by tests
} else {
stowCursor = cursor.customPosition
}
items, stowCursor, err := container.Items(k, stowCursor, maxItems)
if err == nil {
results := make([]DataReference, len(items))
for index, item := range items {
results[index] = DataReference(item.URL().String())
}
if stow.IsCursorEnd(stowCursor) {
cursor = NewCursorAtEnd()
} else {
cursor = NewCursorFromCustomPosition(stowCursor)
}
t.Stop()
return results, cursor, nil
}

incFailureCounterForError(ctx, s.metrics.ListFailure, err)
return nil, NewCursorAtEnd(), errs.Wrapf(err, "path:%v", k)

Check warning on line 294 in flytestdlib/storage/stow_store.go

View check run for this annotation

Codecov / codecov/patch

flytestdlib/storage/stow_store.go#L293-L294

Added lines #L293 - L294 were not covered by tests
}

func (s *StowStore) ReadRaw(ctx context.Context, reference DataReference) (io.ReadCloser, error) {
_, c, k, err := reference.Split()
if err != nil {
Expand Down Expand Up @@ -434,6 +477,9 @@
HeadFailure: labeled.NewCounter("head_failure", "Indicates failure in HEAD for a given reference", scope, labeled.EmitUnlabeledMetric),
HeadLatency: labeled.NewStopWatch("head", "Indicates time to fetch metadata using the Head API", time.Millisecond, scope, labeled.EmitUnlabeledMetric),

ListFailure: labeled.NewCounter("list_failure", "Indicates failure in item listing for a given reference", scope, labeled.EmitUnlabeledMetric),
ListLatency: labeled.NewStopWatch("list", "Indicates time to fetch item listing using the List API", time.Millisecond, scope, labeled.EmitUnlabeledMetric),

ReadFailure: labeled.NewCounter("read_failure", "Indicates failure in GET for a given reference", scope, labeled.EmitUnlabeledMetric, failureTypeOption),
ReadOpenLatency: labeled.NewStopWatch("read_open", "Indicates time to first byte when reading", time.Millisecond, scope, labeled.EmitUnlabeledMetric),

Expand Down
96 changes: 94 additions & 2 deletions flytestdlib/storage/stow_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import (
"net/url"
"os"
"path/filepath"
"sort"
"strconv"
"testing"
"time"

Expand Down Expand Up @@ -73,8 +75,37 @@ func (m mockStowContainer) Item(id string) (stow.Item, error) {
return nil, stow.ErrNotFound
}

func (mockStowContainer) Items(prefix, cursor string, count int) ([]stow.Item, string, error) {
return []stow.Item{}, "", nil
func (m mockStowContainer) Items(prefix, cursor string, count int) ([]stow.Item, string, error) {
startIndex := 0
if cursor != "" {
index, err := strconv.Atoi(cursor)
if err != nil {
return nil, "", fmt.Errorf("Invalid cursor '%s'", cursor)
}
startIndex = index
}
endIndexExc := min(len(m.items), startIndex+count)

itemKeys := make([]string, len(m.items))
index := 0
for key := range m.items {
itemKeys[index] = key
index += 1
}
sort.Strings(itemKeys)

numItems := endIndexExc - startIndex
results := make([]stow.Item, numItems)
for index, itemKey := range itemKeys[startIndex:endIndexExc] {
results[index] = m.items[itemKey]
}

if endIndexExc == len(m.items) {
cursor = ""
} else {
cursor = fmt.Sprintf("%d", endIndexExc)
}
return results, cursor, nil
}

func (m mockStowContainer) RemoveItem(id string) error {
Expand Down Expand Up @@ -361,6 +392,67 @@ func TestStowStore_ReadRaw(t *testing.T) {
})
}

func TestStowStore_List(t *testing.T) {
const container = "container"
t.Run("Listing", func(t *testing.T) {
ctx := context.Background()
fn := fQNFn["s3"]
s, err := NewStowRawStore(fn(container), &mockStowLoc{
ContainerCb: func(id string) (stow.Container, error) {
if id == container {
return newMockStowContainer(container), nil
}
return nil, fmt.Errorf("container is not supported")
},
CreateContainerCb: func(name string) (stow.Container, error) {
if name == container {
return newMockStowContainer(container), nil
}
return nil, fmt.Errorf("container is not supported")
},
}, nil, false, metrics)
assert.NoError(t, err)
writeTestFile(ctx, t, s, "s3://container/a/1")
writeTestFile(ctx, t, s, "s3://container/a/2")
var maxResults = 10
var dataReference DataReference = "s3://container/a"
items, cursor, err := s.List(ctx, dataReference, maxResults, NewCursorAtStart())
assert.NoError(t, err)
assert.Equal(t, NewCursorAtEnd(), cursor)
assert.Equal(t, []DataReference{"a/1", "a/2"}, items)
})

t.Run("Listing with pagination", func(t *testing.T) {
ctx := context.Background()
fn := fQNFn["s3"]
s, err := NewStowRawStore(fn(container), &mockStowLoc{
ContainerCb: func(id string) (stow.Container, error) {
if id == container {
return newMockStowContainer(container), nil
}
return nil, fmt.Errorf("container is not supported")
},
CreateContainerCb: func(name string) (stow.Container, error) {
if name == container {
return newMockStowContainer(container), nil
}
return nil, fmt.Errorf("container is not supported")
},
}, nil, false, metrics)
assert.NoError(t, err)
writeTestFile(ctx, t, s, "s3://container/a/1")
writeTestFile(ctx, t, s, "s3://container/a/2")
var maxResults = 1
var dataReference DataReference = "s3://container/a"
items, cursor, err := s.List(ctx, dataReference, maxResults, NewCursorAtStart())
assert.NoError(t, err)
assert.Equal(t, []DataReference{"a/1"}, items)
items, cursor, err = s.List(ctx, dataReference, maxResults, cursor)
assert.NoError(t, err)
assert.Equal(t, []DataReference{"a/2"}, items)
})
}

func TestNewLocalStore(t *testing.T) {
labeled.SetMetricKeys(contextutils.ProjectKey, contextutils.DomainKey, contextutils.WorkflowIDKey, contextutils.TaskIDKey)
t.Run("Valid config", func(t *testing.T) {
Expand Down
Loading