Skip to content
This repository has been archived by the owner on Oct 9, 2023. It is now read-only.

Commit

Permalink
Remove content md5 requirement (#587)
Browse files Browse the repository at this point in the history
Signed-off-by: Yee Hing Tong <[email protected]>
  • Loading branch information
wild-endeavor authored Jul 13, 2023
1 parent 1fc3541 commit b621b27
Show file tree
Hide file tree
Showing 5 changed files with 211 additions and 193 deletions.
41 changes: 39 additions & 2 deletions flyteadmin/dataproxy/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"encoding/base32"
"encoding/base64"
"encoding/hex"
"fmt"
"net/url"
"reflect"
Expand Down Expand Up @@ -51,12 +52,48 @@ type Service struct {
func (s Service) CreateUploadLocation(ctx context.Context, req *service.CreateUploadLocationRequest) (
*service.CreateUploadLocationResponse, error) {

// Basically if the full file name is user specified (non random, non-hash-derived), then we need to check if it exists.
// If it exists, and a hash was provided, then check if it matches. If it matches, then proceed as normal otherwise fail.
// If it doesn't exist, then proceed as normal.

if len(req.Project) == 0 || len(req.Domain) == 0 {
return nil, errors.NewFlyteAdminErrorf(codes.InvalidArgument, "project and domain are required parameters")
}

if len(req.ContentMd5) == 0 {
return nil, errors.NewFlyteAdminErrorf(codes.InvalidArgument, "content_md5 is a required parameter")
// At least one of the hash or manually given prefix must be provided.
if len(req.FilenameRoot) == 0 && len(req.ContentMd5) == 0 {
return nil, errors.NewFlyteAdminErrorf(codes.InvalidArgument,
"content_md5 or filename_root is a required parameter")
}

// If we fall in here, that means that the full path is deterministic and we should check for existence.
if len(req.Filename) > 0 && len(req.FilenameRoot) > 0 {
knownLocation, err := createStorageLocation(ctx, s.dataStore, s.cfg.Upload,
req.Project, req.Domain, req.FilenameRoot, req.Filename)
if err != nil {
return nil, errors.NewFlyteAdminErrorf(codes.Internal, "failed to create storage location, Error: %v", err)
}
metadata, err := s.dataStore.Head(ctx, knownLocation)
if err != nil {
return nil, errors.NewFlyteAdminErrorf(codes.Internal, "failed to check if file exists at location [%s], Error: %v", knownLocation.String(), err)
}
if metadata.Exists() {
// Basically if the file exists, then error unless the user also provided a hash and it matches.
// Keep in mind this is just a best effort attempt. There can easily be race conditions where two users
// request the same file at the same time and one of the writes is lost.
if len(req.ContentMd5) == 0 {
return nil, errors.NewFlyteAdminErrorf(codes.AlreadyExists, "file already exists at location [%v], specify a matching hash if you wish to rewrite", knownLocation)
}
// Re-encode the hash 3-ways to support matching, hex, base32 and base64
hexDigest := hex.EncodeToString(req.ContentMd5)
base32Digest := base32.StdEncoding.EncodeToString(req.ContentMd5)
base64Digest := base64.StdEncoding.EncodeToString(req.ContentMd5)
if hexDigest != metadata.Etag() && base32Digest != metadata.Etag() && base64Digest != metadata.Etag() {
logger.Debug(ctx, "File already exists at location [%v] but hashes do not match", knownLocation)
return nil, errors.NewFlyteAdminErrorf(codes.AlreadyExists, "file already exists at location [%v], specify a matching hash if you wish to rewrite", knownLocation)
}
logger.Debug(ctx, "File already exists at location [%v] but allowing rewrite", knownLocation)
}
}

if expiresIn := req.ExpiresIn; expiresIn != nil {
Expand Down
77 changes: 77 additions & 0 deletions flyteadmin/dataproxy/service_test.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
package dataproxy

import (
"bytes"
"context"
"crypto/md5" // #nosec
"net/url"
"testing"
"time"

Expand Down Expand Up @@ -85,6 +88,80 @@ func TestCreateUploadLocation(t *testing.T) {
})
}

type UploadableMemProtobufStore struct {
storage.ComposedProtobufStore
}

func (u *UploadableMemProtobufStore) CreateSignedURL(ctx context.Context, reference storage.DataReference, properties storage.SignedURLProperties) (storage.SignedURLResponse, error) {
return storage.SignedURLResponse{
URL: url.URL{
Scheme: "http",
Host: "localhost",
Path: "/blah",
},
}, nil
}

func TestCreateUploadLocationMore(t *testing.T) {
dataStore, err := storage.NewDataStore(&storage.Config{Type: storage.TypeMemory}, promutils.NewTestScope())
// Overwrite the CreateSignedURL function to return a fixed value
uploadable := &UploadableMemProtobufStore{
dataStore.ComposedProtobufStore,
}

ds := storage.DataStore{
ComposedProtobufStore: uploadable,
ReferenceConstructor: dataStore.ReferenceConstructor,
}

assert.NoError(t, err)
nodeExecutionManager := &mocks.MockNodeExecutionManager{}
taskExecutionManager := &mocks.MockTaskExecutionManager{}
s, err := NewService(config.DataProxyConfig{}, nodeExecutionManager, &ds, taskExecutionManager)
assert.NoError(t, err)

exists, err := createStorageLocation(context.TODO(), s.dataStore, s.cfg.Upload,
"flytesnacks", "development", "known_parent_folder", "myfile.txt")
assert.NoError(t, err)
err = dataStore.WriteRaw(context.TODO(), exists, 5, storage.Options{}, bytes.NewReader([]byte("hello")))
assert.NoError(t, err)

t.Run("no need for content md5", func(t *testing.T) {
_, err = s.CreateUploadLocation(context.Background(), &service.CreateUploadLocationRequest{
Project: "flytesnacks",
Domain: "development",
Filename: "myotherfile.txt",
ContentMd5: nil,
FilenameRoot: "known_parent_folder",
})
assert.NoError(t, err)
})

t.Run("already exists errors", func(t *testing.T) {
_, err = s.CreateUploadLocation(context.Background(), &service.CreateUploadLocationRequest{
Project: "flytesnacks",
Domain: "development",
Filename: "myfile.txt",
ContentMd5: nil,
FilenameRoot: "known_parent_folder",
})
assert.ErrorContainsf(t, err, "already exists", "expected error to contain already exists")
})

t.Run("already exists but matching content md5", func(t *testing.T) {
m := md5.Sum([]byte("hello")) // #nosec
resp, err := s.CreateUploadLocation(context.Background(), &service.CreateUploadLocationRequest{
Project: "flytesnacks",
Domain: "development",
Filename: "myfile.txt",
ContentMd5: m[:],
FilenameRoot: "known_parent_folder",
})
assert.NoError(t, err)
assert.Equal(t, "/flytesnacks/development/known_parent_folder/myfile.txt", resp.GetNativeUrl())
})
}

func TestCreateDownloadLink(t *testing.T) {
dataStore := commonMocks.GetMockStorageClient()
nodeExecutionManager := &mocks.MockNodeExecutionManager{}
Expand Down
58 changes: 29 additions & 29 deletions flyteadmin/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ module github.com/flyteorg/flyteadmin
go 1.19

require (
cloud.google.com/go/iam v0.3.0
cloud.google.com/go/storage v1.22.0
cloud.google.com/go/iam v0.13.0
cloud.google.com/go/storage v1.28.1
github.com/NYTimes/gizmo v1.3.6
github.com/Selvatico/go-mocket v1.0.7
github.com/aws/aws-sdk-go v1.44.2
Expand All @@ -16,16 +16,16 @@ require (
github.com/flyteorg/flyteidl v1.5.11
github.com/flyteorg/flyteplugins v1.0.67
github.com/flyteorg/flytepropeller v1.1.98
github.com/flyteorg/flytestdlib v1.0.15
github.com/flyteorg/flytestdlib v1.0.20
github.com/flyteorg/stow v0.3.6
github.com/ghodss/yaml v1.0.0
github.com/go-gormigrate/gormigrate/v2 v2.0.0
github.com/gogo/protobuf v1.3.2
github.com/golang-jwt/jwt/v4 v4.4.1
github.com/golang/glog v1.0.0
github.com/golang/protobuf v1.5.2
github.com/golang/glog v1.1.0
github.com/golang/protobuf v1.5.3
github.com/google/uuid v1.3.0
github.com/googleapis/gax-go/v2 v2.3.0
github.com/googleapis/gax-go/v2 v2.7.1
github.com/gorilla/handlers v1.5.1
github.com/gorilla/securecookie v1.1.1
github.com/grpc-ecosystem/go-grpc-middleware v1.3.0
Expand All @@ -35,7 +35,7 @@ require (
github.com/jackc/pgconn v1.13.0
github.com/lestrrat-go/jwx v1.1.6
github.com/magiconair/properties v1.8.6
github.com/mitchellh/mapstructure v1.4.3
github.com/mitchellh/mapstructure v1.5.0
github.com/ory/fosite v0.42.2
github.com/ory/x v0.0.214
github.com/pkg/errors v0.9.1
Expand All @@ -45,13 +45,13 @@ require (
github.com/sendgrid/sendgrid-go v3.10.0+incompatible
github.com/spf13/cobra v1.4.0
github.com/spf13/pflag v1.0.5
github.com/stretchr/testify v1.8.0
golang.org/x/oauth2 v0.0.0-20220411215720-9780585627b5
golang.org/x/time v0.0.0-20220210224613-90d013bbcef8
google.golang.org/api v0.76.0
google.golang.org/genproto v0.0.0-20220426171045-31bebdecfb46
google.golang.org/grpc v1.46.0
google.golang.org/protobuf v1.28.0
github.com/stretchr/testify v1.8.4
golang.org/x/oauth2 v0.7.0
golang.org/x/time v0.3.0
google.golang.org/api v0.114.0
google.golang.org/genproto v0.0.0-20230410155749-daa745c078e1
google.golang.org/grpc v1.56.1
google.golang.org/protobuf v1.30.0
gorm.io/driver/mysql v1.4.4
gorm.io/driver/postgres v1.4.5
gorm.io/driver/sqlite v1.1.1
Expand All @@ -69,10 +69,10 @@ require (
)

require (
cloud.google.com/go v0.101.0 // indirect
cloud.google.com/go/compute v1.6.1 // indirect
cloud.google.com/go/kms v1.2.0 // indirect
cloud.google.com/go/pubsub v1.10.1 // indirect
cloud.google.com/go v0.110.0 // indirect
cloud.google.com/go/compute v1.19.1 // indirect
cloud.google.com/go/compute/metadata v0.2.3 // indirect
cloud.google.com/go/pubsub v1.30.0 // indirect
github.com/Azure/azure-sdk-for-go v63.4.0+incompatible // indirect
github.com/Azure/go-autorest v14.2.0+incompatible // indirect
github.com/Azure/go-autorest/autorest/adal v0.9.18 // indirect
Expand All @@ -85,7 +85,7 @@ require (
github.com/benlaurie/objecthash v0.0.0-20180202135721-d1e3d6079fc1 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/cespare/xxhash v1.1.0 // indirect
github.com/cespare/xxhash/v2 v2.1.2 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/coocood/freecache v1.1.1 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/decred/dcrd/dcrec/secp256k1/v3 v3.0.0 // indirect
Expand All @@ -108,9 +108,9 @@ require (
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
github.com/golang/snappy v0.0.3 // indirect
github.com/google/gnostic v0.5.7-v3refs // indirect
github.com/google/go-cmp v0.5.8 // indirect
github.com/google/go-cmp v0.5.9 // indirect
github.com/google/gofuzz v1.2.0 // indirect
github.com/googleapis/go-type-adapters v1.0.0 // indirect
github.com/googleapis/enterprise-certificate-proxy v0.2.3 // indirect
github.com/gorilla/websocket v1.4.2 // indirect
github.com/hashicorp/go-uuid v1.0.2 // indirect
github.com/hashicorp/golang-lru v0.5.4 // indirect
Expand Down Expand Up @@ -160,21 +160,21 @@ require (
github.com/rcrowley/go-metrics v0.0.0-20200313005456-10cdbea86bc0 // indirect
github.com/sendgrid/rest v2.6.8+incompatible // indirect
github.com/sirupsen/logrus v1.8.1 // indirect
github.com/spf13/afero v1.8.2 // indirect
github.com/spf13/afero v1.9.2 // indirect
github.com/spf13/cast v1.4.1 // indirect
github.com/spf13/jwalterweatherman v1.1.0 // indirect
github.com/spf13/viper v1.11.0 // indirect
github.com/stretchr/objx v0.4.0 // indirect
github.com/stretchr/objx v0.5.0 // indirect
github.com/subosito/gotenv v1.2.0 // indirect
go.opencensus.io v0.23.0 // indirect
go.opencensus.io v0.24.0 // indirect
golang.org/x/crypto v0.0.0-20220722155217-630584e8d5aa // indirect
golang.org/x/exp v0.0.0-20220722155223-a9213eeb770e // indirect
golang.org/x/net v0.7.0 // indirect
golang.org/x/net v0.9.0 // indirect
golang.org/x/sync v0.1.0 // indirect
golang.org/x/sys v0.5.0 // indirect
golang.org/x/term v0.5.0 // indirect
golang.org/x/text v0.7.0 // indirect
golang.org/x/tools v0.4.0 // indirect
golang.org/x/sys v0.7.0 // indirect
golang.org/x/term v0.7.0 // indirect
golang.org/x/text v0.9.0 // indirect
golang.org/x/tools v0.6.0 // indirect
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect
google.golang.org/appengine v1.6.7 // indirect
gopkg.in/inf.v0 v0.9.1 // indirect
Expand Down
Loading

0 comments on commit b621b27

Please sign in to comment.