Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add extra header to signed url #4971

Merged
merged 24 commits into from
Mar 12, 2024
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 43 additions & 7 deletions flyteadmin/dataproxy/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,13 @@ func (s Service) CreateUploadLocation(ctx context.Context, req *service.CreateUp
// If it doesn't exist, then proceed as normal.

if len(req.Project) == 0 || len(req.Domain) == 0 {
logger.Infof(ctx, "project and domain are required parameters")
pingsutw marked this conversation as resolved.
Show resolved Hide resolved
return nil, errors.NewFlyteAdminErrorf(codes.InvalidArgument, "project and domain are required parameters")
}

// At least one of the hash or manually given prefix must be provided.
if len(req.FilenameRoot) == 0 && len(req.ContentMd5) == 0 {
logger.Infof(ctx, "content_md5 or filename_root is a required parameter")
pingsutw marked this conversation as resolved.
Show resolved Hide resolved
return nil, errors.NewFlyteAdminErrorf(codes.InvalidArgument,
"content_md5 or filename_root is a required parameter")
}
Expand All @@ -63,10 +65,12 @@ func (s Service) CreateUploadLocation(ctx context.Context, req *service.CreateUp
knownLocation, err := createStorageLocation(ctx, s.dataStore, s.cfg.Upload,
req.Project, req.Domain, req.FilenameRoot, req.Filename)
if err != nil {
logger.Errorf(ctx, "failed to create storage location. Error %v", err)
return nil, errors.NewFlyteAdminErrorf(codes.Internal, "failed to create storage location, Error: %v", err)
}
metadata, err := s.dataStore.Head(ctx, knownLocation)
if err != nil {
logger.Errorf(ctx, "failed to check if file exists at location [%s], Error: %v", knownLocation.String(), err)
return nil, errors.NewFlyteAdminErrorf(codes.Internal, "failed to check if file exists at location [%s], Error: %v", knownLocation.String(), err)
}
if metadata.Exists() {
Expand All @@ -76,12 +80,19 @@ func (s Service) CreateUploadLocation(ctx context.Context, req *service.CreateUp
if len(req.ContentMd5) == 0 {
return nil, errors.NewFlyteAdminErrorf(codes.AlreadyExists, "file already exists at location [%v], specify a matching hash if you wish to rewrite", knownLocation)
}
// Re-encode the hash 3-ways to support matching, hex, base32 and base64
hexDigest := hex.EncodeToString(req.ContentMd5)
base32Digest := base32.StdEncoding.EncodeToString(req.ContentMd5)
base64Digest := base64.StdEncoding.EncodeToString(req.ContentMd5)
if hexDigest != metadata.Etag() && base32Digest != metadata.Etag() && base64Digest != metadata.Etag() {
logger.Debugf(ctx, "File already exists at location [%v] but hashes do not match", knownLocation)
if len(metadata.ContentMD5()) == 0 {
// For backward compatibility, dataproxy assumes that the Etag exists if ContentMD5 is not in the metadata.
// Data proxy won't allow people to overwrite the file if both the Etag and the ContentMD5 do not exist.
hexDigest := hex.EncodeToString(req.ContentMd5)
base32Digest := base32.StdEncoding.EncodeToString(req.ContentMd5)
if hexDigest != metadata.Etag() && base32Digest != metadata.Etag() && base64Digest != metadata.Etag() {
logger.Errorf(ctx, "File already exists at location [%v] but hashes do not match", knownLocation)
return nil, errors.NewFlyteAdminErrorf(codes.AlreadyExists, "file already exists at location [%v], specify a matching hash if you wish to rewrite", knownLocation)
}
}
if len(metadata.ContentMD5()) != 0 && base64Digest != metadata.ContentMD5() {
pingsutw marked this conversation as resolved.
Show resolved Hide resolved
pingsutw marked this conversation as resolved.
Show resolved Hide resolved
logger.Errorf(ctx, "File already exists at location [%v] but hashes do not match", knownLocation)
return nil, errors.NewFlyteAdminErrorf(codes.AlreadyExists, "file already exists at location [%v], specify a matching hash if you wish to rewrite", knownLocation)
}
logger.Debugf(ctx, "File already exists at location [%v] but allowing rewrite", knownLocation)
Expand All @@ -105,7 +116,7 @@ func (s Service) CreateUploadLocation(ctx context.Context, req *service.CreateUp
req.Filename = rand.String(s.cfg.Upload.DefaultFileNameLength)
}

md5 := base64.StdEncoding.EncodeToString(req.ContentMd5)
base64digestMD5 := base64.StdEncoding.EncodeToString(req.ContentMd5)

var prefix string
if len(req.FilenameRoot) > 0 {
Expand All @@ -117,23 +128,33 @@ func (s Service) CreateUploadLocation(ctx context.Context, req *service.CreateUp
storagePath, err := createStorageLocation(ctx, s.dataStore, s.cfg.Upload,
req.Project, req.Domain, prefix, req.Filename)
if err != nil {
logger.Errorf(ctx, "failed to create shardedStorageLocation. Error %v", err)
return nil, errors.NewFlyteAdminErrorf(codes.Internal, "failed to create shardedStorageLocation, Error: %v", err)
}

metadata := make(map[string]string)

if req.AddMetadata {
metadata[storage.FlyteContentMD5] = base64digestMD5
}
pingsutw marked this conversation as resolved.
Show resolved Hide resolved

resp, err := s.dataStore.CreateSignedURL(ctx, storagePath, storage.SignedURLProperties{
Scope: stow.ClientMethodPut,
ExpiresIn: req.ExpiresIn.AsDuration(),
ContentMD5: md5,
ContentMD5: base64digestMD5,
Metadata: metadata,
})

if err != nil {
logger.Errorf(ctx, "failed to create signed url. Error:", err)
return nil, errors.NewFlyteAdminErrorf(codes.Internal, "failed to create a signed url. Error: %v", err)
}

return &service.CreateUploadLocationResponse{
SignedUrl: resp.URL.String(),
NativeUrl: storagePath.String(),
ExpiresAt: timestamppb.New(time.Now().Add(req.ExpiresIn.AsDuration())),
Headers: getExtraHeaders(storagePath, base64digestMD5, req.AddMetadata),
}, nil
}

Expand Down Expand Up @@ -268,6 +289,21 @@ func (s Service) validateCreateDownloadLinkRequest(req *service.CreateDownloadLi
return req, nil
}

func getExtraHeaders(reference storage.DataReference, contentMd5 string, addMetadata bool) map[string]string {
headers := map[string]string{"Content-Length": strconv.Itoa(len(contentMd5)), "Content-MD5": contentMd5}
if addMetadata {
if strings.HasPrefix(reference.String(), "s3://") {
headers[fmt.Sprintf("x-amz-meta-%s", storage.FlyteContentMD5)] = contentMd5
} else if strings.HasPrefix(reference.String(), "gs://") {
headers[fmt.Sprintf("x-goog-meta-%s", storage.FlyteContentMD5)] = contentMd5
} else if strings.HasPrefix(reference.String(), "abfs://") {
headers[fmt.Sprintf("x-ms-meta-%s", storage.FlyteContentMD5)] = contentMd5
headers["x-ms-blob-type"] = "BlockBlob" // https://learn.microsoft.com/en-us/rest/api/storageservices/put-blob?tabs=microsoft-entra-id#remarks
}
pingsutw marked this conversation as resolved.
Show resolved Hide resolved
}
return headers
}

// createStorageLocation creates a location in storage destination to maximize read/write performance in most
// block stores. The final location should look something like: s3://<my bucket>/<file name>
func createStorageLocation(ctx context.Context, store *storage.DataStore,
Expand Down
14 changes: 14 additions & 0 deletions flyteadmin/dataproxy/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -422,3 +422,17 @@ func TestService_Error(t *testing.T) {
assert.Error(t, err, "no task executions")
})
}

// TestGetExtraHeader verifies the headers produced by getExtraHeaders for each supported
// storage scheme (s3, gs, abfs) when metadata headers are requested.
func TestGetExtraHeader(t *testing.T) {
	cases := []struct {
		location storage.DataReference
		expected map[string]string
	}{
		{
			location: "s3://bucket/key",
			expected: map[string]string{"Content-MD5": "md5", "Content-Length": "3", "x-amz-meta-flyteContentMD5": "md5"},
		},
		{
			location: "gs://bucket/key",
			expected: map[string]string{"Content-MD5": "md5", "Content-Length": "3", "x-goog-meta-flyteContentMD5": "md5"},
		},
		{
			location: "abfs://bucket/key",
			expected: map[string]string{"Content-MD5": "md5", "Content-Length": "3", "x-ms-meta-flyteContentMD5": "md5", "x-ms-blob-type": "BlockBlob"},
		},
	}

	for _, tc := range cases {
		got := getExtraHeaders(tc.location, "md5", true)
		assert.Equal(t, tc.expected, got)
	}
}
2 changes: 2 additions & 0 deletions flyteadmin/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -229,3 +229,5 @@ replace (
k8s.io/kube-openapi => k8s.io/kube-openapi v0.0.0-20230905202853-d090da108d2f
sigs.k8s.io/controller-runtime => sigs.k8s.io/controller-runtime v0.16.2
)

replace github.com/flyteorg/stow => github.com/pingsutw/stow v0.3.6-0.20240229093623-86cb63d6001a
4 changes: 2 additions & 2 deletions flyteadmin/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -237,8 +237,6 @@ github.com/fatih/structs v1.1.0/go.mod h1:9NiDSp5zOcgEDl+j00MP/WkGVPOlPRLejGD8Ga
github.com/felixge/httpsnoop v1.0.1/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U=
github.com/felixge/httpsnoop v1.0.3 h1:s/nj+GCswXYzN5v2DpNMuMQYe+0DDwt5WVCU6CWBdXk=
github.com/felixge/httpsnoop v1.0.3/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U=
github.com/flyteorg/stow v0.3.8 h1:4a6BtfgDR86fUwa48DkkZTcp6WK4oQXSfewPd/kN0Z4=
github.com/flyteorg/stow v0.3.8/go.mod h1:fArjMpsYJNWkp/hyDKKdbcv07gxbuLmKFcb7YT1aSOM=
github.com/fogleman/gg v1.2.1-0.20190220221249-0403632d5b90/go.mod h1:R/bRT+9gY/C5z7JzPU0zXsXHKM4/ayA+zqcVNZzPa1k=
github.com/fogleman/gg v1.3.0/go.mod h1:R/bRT+9gY/C5z7JzPU0zXsXHKM4/ayA+zqcVNZzPa1k=
github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8vw=
Expand Down Expand Up @@ -1096,6 +1094,8 @@ github.com/pierrec/lz4 v2.2.6+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi
github.com/pierrec/lz4 v2.4.1+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY=
github.com/pierrec/lz4 v2.5.2+incompatible h1:WCjObylUIOlKy/+7Abdn34TLIkXiA4UWUMhxq9m9ZXI=
github.com/pierrec/lz4 v2.5.2+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY=
github.com/pingsutw/stow v0.3.6-0.20240229093623-86cb63d6001a h1:7LOllqeZkOFFAmDnmsgSM5+sLeHRxDcZQO8+f7RbNaw=
github.com/pingsutw/stow v0.3.6-0.20240229093623-86cb63d6001a/go.mod h1:fArjMpsYJNWkp/hyDKKdbcv07gxbuLmKFcb7YT1aSOM=
github.com/pkg/browser v0.0.0-20210911075715-681adbf594b8 h1:KoWmjvw+nsYOo29YJK9vDA65RGE3NrOnUtO7a+RF9HU=
github.com/pkg/browser v0.0.0-20210911075715-681adbf594b8/go.mod h1:HKlIX3XHQyzLZPlr7++PzdhaXEj94dEiJgZDTsxEqUI=
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
Expand Down
17 changes: 17 additions & 0 deletions flyteidl/gen/pb-es/flyteidl/service/dataproxy_pb.ts

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading
Loading