Skip to content

Commit

Permalink
feat(blobfs): Use AWS SDK v2 for blobfs
Browse files Browse the repository at this point in the history
Signed-off-by: Dave Henderson <[email protected]>
  • Loading branch information
hairyhenderson committed Nov 24, 2024
1 parent ba05387 commit e54fa68
Show file tree
Hide file tree
Showing 7 changed files with 411 additions and 69 deletions.
2 changes: 1 addition & 1 deletion awssmfs/awssm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ import (
"testing"
"testing/fstest"

"github.com/aws/aws-sdk-go-v2/aws"
smtypes "github.com/aws/aws-sdk-go-v2/service/secretsmanager/types"
"github.com/aws/aws-sdk-go/aws"
"github.com/hairyhenderson/go-fsimpl/internal/tests"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand Down
65 changes: 16 additions & 49 deletions blobfs/blob.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,6 @@ import (

azblobblob "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/blob"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/container"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/hairyhenderson/go-fsimpl"
"github.com/hairyhenderson/go-fsimpl/internal"
"github.com/hairyhenderson/go-fsimpl/internal/env"
Expand Down Expand Up @@ -144,7 +141,7 @@ func (f *blobFS) Open(name string) (fs.File, error) {
if f.bucket == nil {
bucket, err := f.openBucket()
if err != nil {
return nil, fmt.Errorf("open bucket: %w", err)
return nil, fmt.Errorf("open: %w", err)
}

f.bucket = bucket
Expand Down Expand Up @@ -185,7 +182,7 @@ func (f *blobFS) ReadFile(name string) ([]byte, error) {
if f.bucket == nil {
bucket, err := f.openBucket()
if err != nil {
return nil, fmt.Errorf("open bucket: %w", err)
return nil, fmt.Errorf("readFile: %w", err)
}

f.bucket = bucket
Expand All @@ -198,10 +195,8 @@ func (f *blobFS) ReadFile(name string) ([]byte, error) {
func (f *blobFS) newOpener(ctx context.Context, scheme string) (opener blob.BucketURLOpener, err error) {
switch scheme {
case s3blob.Scheme:
sess := f.initS3Session()

// see https://gocloud.dev/concepts/urls/#muxes
return &s3blob.URLOpener{ConfigProvider: sess}, nil
return &s3v2URLOpener{}, nil
case gcsblob.Scheme:
if env.GetenvFS(f.envfs, "GOOGLE_ANON") == "true" {
return &gcsblob.URLOpener{
Expand Down Expand Up @@ -230,21 +225,22 @@ func (f *blobFS) newOpener(ctx context.Context, scheme string) (opener blob.Buck
}

// initS3Session -
func (f *blobFS) initS3Session() *session.Session {
config := aws.NewConfig()
config = config.WithHTTPClient(f.hclient)
// Deprecated: this is for v1, but kept here for posterity
// func (f *blobFS) initS3Sessionv1() *session.Session {
// config := aws.NewConfig()
// config = config.WithHTTPClient(f.hclient)

if env.GetenvFS(f.envfs, "AWS_ANON") == "true" {
config = config.WithCredentials(credentials.AnonymousCredentials)
}
// if env.GetenvFS(f.envfs, "AWS_ANON") == "true" {
// config = config.WithCredentials(credentials.AnonymousCredentials)
// }

config = config.WithCredentialsChainVerboseErrors(true)
// config = config.WithCredentialsChainVerboseErrors(true)

return session.Must(session.NewSessionWithOptions(session.Options{
Config: *config,
SharedConfigState: session.SharedConfigEnable,
}))
}
// return session.Must(session.NewSessionWithOptions(session.Options{
// Config: *config,
// SharedConfigState: session.SharedConfigEnable,
// }))
// }

// copy/sanitize the URL for the Go CDK - it doesn't like params it can't parse
func (f *blobFS) cleanCdkURL(u url.URL) url.URL {
Expand Down Expand Up @@ -290,35 +286,6 @@ func (f *blobFS) cleanGSURL(u url.URL) url.URL {
return u
}

func (f *blobFS) cleanS3URL(u url.URL) url.URL {
q := u.Query()
for param := range q {
switch param {
case "region", "endpoint", "disableSSL", "s3ForcePathStyle":
default:
q.Del(param)
}
}

if q.Get("endpoint") == "" {
endpoint := env.GetenvFS(f.envfs, "AWS_S3_ENDPOINT")
if endpoint != "" {
q.Set("endpoint", endpoint)
}
}

if q.Get("region") == "" {
region := env.GetenvFS(f.envfs, "AWS_REGION", env.GetenvFS(f.envfs, "AWS_DEFAULT_REGION"))
if region != "" {
q.Set("region", region)
}
}

u.RawQuery = q.Encode()

return u
}

type blobFile struct {
ctx context.Context
reader *blob.Reader
Expand Down
33 changes: 23 additions & 10 deletions blobfs/blob_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/johannesboyne/gofakes3"
"github.com/johannesboyne/gofakes3/backend/s3mem"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func setupTestS3Bucket(t *testing.T) *url.URL {
Expand Down Expand Up @@ -206,35 +207,35 @@ func TestBlobFS_ReadDir(t *testing.T) {
t.Setenv("AWS_ANON", "true")

fsys, err := New(tests.MustURL("s3://mybucket/?region=us-east-1&disableSSL=true&s3ForcePathStyle=true&endpoint=" + srvURL.Host))
assert.NoError(t, err)
require.NoError(t, err)

de, err := fs.ReadDir(fsys, "dir1")
assert.NoError(t, err)
require.NoError(t, err)
assert.Len(t, de, 2)

de, err = fs.ReadDir(fsys, ".")
assert.NoError(t, err)
require.NoError(t, err)
assert.Len(t, de, 5)

fi, err := de[0].Info()
assert.NoError(t, err)
require.NoError(t, err)
assert.Equal(t, "dir1", fi.Name())

f, err := fsys.Open("dir1")
assert.NoError(t, err)
require.NoError(t, err)
assert.IsType(t, &blobFile{}, f)

fi, err = f.Stat()
assert.NoError(t, err)
require.NoError(t, err)
assert.Equal(t, "dir1", fi.Name())

f, err = fsys.Open("file1")
assert.NoError(t, err)
require.NoError(t, err)

defer f.Close()

fi, err = f.Stat()
assert.NoError(t, err)
require.NoError(t, err)
assert.Equal(t, fs.FileMode(0o444), fi.Mode())
}

Expand All @@ -247,8 +248,20 @@ func TestBlobFS_CleanCdkURL(t *testing.T) {
{"s3://foo/bar/baz", "s3://foo/bar/baz"},
{"s3://foo/bar/baz?type=hello/world", "s3://foo/bar/baz"},
{"s3://foo/bar/baz?region=us-east-1", "s3://foo/bar/baz?region=us-east-1"},
{"s3://foo/bar/baz?disableSSL=true&type=text/csv", "s3://foo/bar/baz?disableSSL=true"},
{"s3://foo/bar/baz?type=text/csv&s3ForcePathStyle=true&endpoint=1.2.3.4", "s3://foo/bar/baz?endpoint=1.2.3.4&s3ForcePathStyle=true"},
{"s3://foo/bar/baz?disableSSL=true&type=text/csv", "s3://foo/bar/baz?disable_https=true"},
{
"s3://foo/bar/baz?type=text/csv&s3ForcePathStyle=true&endpoint=1.2.3.4",
"s3://foo/bar/baz?endpoint=https%3A%2F%2F1.2.3.4&use_path_style=true",
},
{"s3://foo/bar/baz?disable_https=true&type=text/csv", "s3://foo/bar/baz?disable_https=true"},
{
"s3://foo/bar/baz?type=text/csv&use_path_style=true&endpoint=1.2.3.4",
"s3://foo/bar/baz?endpoint=https%3A%2F%2F1.2.3.4&use_path_style=true",
},
{
"s3://foo/bar/baz?disable_https=true&type=text/csv&use_path_style=true&endpoint=1.2.3.4:1234",
"s3://foo/bar/baz?disable_https=true&endpoint=http%3A%2F%2F1.2.3.4%3A1234&use_path_style=true",
},
{"gs://foo/bar/baz", "gs://foo/bar/baz"},
{"gs://foo/bar/baz?type=foo/bar", "gs://foo/bar/baz"},
{"gs://foo/bar/baz?access_id=123", "gs://foo/bar/baz?access_id=123"},
Expand Down
101 changes: 101 additions & 0 deletions blobfs/s3blob.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
package blobfs

import (
"net/url"

"github.com/hairyhenderson/go-fsimpl/internal/env"
)

// s3-specific blobfs methods

func (f *blobFS) cleanS3URL(u url.URL) url.URL {
q := u.Query()
translateV1Params(q)

// allow known query parameters, remove unknown ones
for param := range q {
switch param {
case "accelerate",
"anonymous",
"disable_https",
"dualstack",
"endpoint",
"fips",
"hostname_immutable",
"profile",
"rate_limiter_capacity",
"region",
"use_path_style":
// not relevant for read operations, but are be passed through to the
// Go CDK
case "kmskeyid", "ssetype":
default:
q.Del(param)
}
}

f.setParamsFromEnv(q)

ensureValidEndpointURL(q)

u.RawQuery = q.Encode()

return u
}

// translateV1Params translates v1 query parameters to v2 query parameters.
func translateV1Params(q url.Values) {
for param := range q {
switch param {
// changed to 'disable_https' in s3v2
case "disableSSL":
q.Set("disable_https", q.Get(param))
q.Del(param)
// changed to 'use_path_style' in s3v2
case "s3ForcePathStyle":
q.Set("use_path_style", q.Get(param))
q.Del(param)
}
}
}

func ensureValidEndpointURL(q url.Values) {
// if we have an endpoint, make sure it's a parseable URL with a scheme
if endpoint := q.Get("endpoint"); endpoint != "" {
u, err := url.Parse(endpoint)
if err != nil || u.Scheme == "" {
// try adding a schema - if disable_https is set, use http, otherwise https
if q.Get("disable_https") == "true" {
q.Del("endpoint")
q.Set("endpoint", "http://"+endpoint)
} else {
q.Del("endpoint")
q.Set("endpoint", "https://"+endpoint)
}
}
}
}

// setParamsFromEnv sets query parameters based on env vars
func (f *blobFS) setParamsFromEnv(q url.Values) {
if q.Get("endpoint") == "" {
endpoint := env.GetenvFS(f.envfs, "AWS_S3_ENDPOINT")
if endpoint != "" {
q.Set("endpoint", endpoint)
}
}

if q.Get("region") == "" {
region := env.GetenvFS(f.envfs, "AWS_REGION", env.GetenvFS(f.envfs, "AWS_DEFAULT_REGION"))
if region != "" {
q.Set("region", region)
}
}

if q.Get("anonymous") == "" {
anon := env.GetenvFS(f.envfs, "AWS_ANON")
if anon != "" {
q.Set("anonymous", anon)
}
}
}
Loading

0 comments on commit e54fa68

Please sign in to comment.