Skip to content

Commit

Permalink
feat(blobfs): Use AWS SDK v2 for blobfs
Browse files Browse the repository at this point in the history
Signed-off-by: Dave Henderson <[email protected]>
  • Loading branch information
hairyhenderson committed Nov 23, 2024
1 parent 65102bf commit 22cb0cb
Show file tree
Hide file tree
Showing 5 changed files with 79 additions and 34 deletions.
2 changes: 1 addition & 1 deletion awssmfs/awssm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ import (
"testing"
"testing/fstest"

"github.com/aws/aws-sdk-go-v2/aws"
smtypes "github.com/aws/aws-sdk-go-v2/service/secretsmanager/types"
"github.com/aws/aws-sdk-go/aws"
"github.com/hairyhenderson/go-fsimpl/internal/tests"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand Down
68 changes: 47 additions & 21 deletions blobfs/blob.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"io"
"io/fs"
"log/slog"
"net/http"
"net/url"
"os"
Expand All @@ -15,9 +16,6 @@ import (

azblobblob "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/blob"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/container"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/hairyhenderson/go-fsimpl"
"github.com/hairyhenderson/go-fsimpl/internal"
"github.com/hairyhenderson/go-fsimpl/internal/env"
Expand Down Expand Up @@ -113,6 +111,12 @@ func (f *blobFS) openBucket() (*blob.Bucket, error) {

u := f.cleanCdkURL(*f.base)

// TODO: remove before merging
slog.InfoContext(f.ctx, "opening bucket",
slog.String("base", f.base.String()),
slog.String("cleaned", u.String()),
)

bucket, err := o.OpenBucketURL(f.ctx, &u)
if err != nil {
return nil, fmt.Errorf("open bucket: %w", err)
Expand Down Expand Up @@ -144,7 +148,7 @@ func (f *blobFS) Open(name string) (fs.File, error) {
if f.bucket == nil {
bucket, err := f.openBucket()
if err != nil {
return nil, fmt.Errorf("open bucket: %w", err)
return nil, fmt.Errorf("open: %w", err)
}

f.bucket = bucket
Expand Down Expand Up @@ -185,7 +189,7 @@ func (f *blobFS) ReadFile(name string) ([]byte, error) {
if f.bucket == nil {
bucket, err := f.openBucket()
if err != nil {
return nil, fmt.Errorf("open bucket: %w", err)
return nil, fmt.Errorf("readFile: %w", err)
}

f.bucket = bucket
Expand All @@ -198,10 +202,8 @@ func (f *blobFS) ReadFile(name string) ([]byte, error) {
func (f *blobFS) newOpener(ctx context.Context, scheme string) (opener blob.BucketURLOpener, err error) {
switch scheme {
case s3blob.Scheme:
sess := f.initS3Session()

// see https://gocloud.dev/concepts/urls/#muxes
return &s3blob.URLOpener{ConfigProvider: sess}, nil
return &s3blob.URLOpener{UseV2: true}, nil
case gcsblob.Scheme:
if env.GetenvFS(f.envfs, "GOOGLE_ANON") == "true" {
return &gcsblob.URLOpener{
Expand Down Expand Up @@ -230,21 +232,22 @@ func (f *blobFS) newOpener(ctx context.Context, scheme string) (opener blob.Buck
}

// initS3Session -
func (f *blobFS) initS3Session() *session.Session {
config := aws.NewConfig()
config = config.WithHTTPClient(f.hclient)
// Deprecated: this is for v1, but kept here for posterity
// func (f *blobFS) initS3Sessionv1() *session.Session {
// config := aws.NewConfig()
// config = config.WithHTTPClient(f.hclient)

if env.GetenvFS(f.envfs, "AWS_ANON") == "true" {
config = config.WithCredentials(credentials.AnonymousCredentials)
}
// if env.GetenvFS(f.envfs, "AWS_ANON") == "true" {
// config = config.WithCredentials(credentials.AnonymousCredentials)
// }

config = config.WithCredentialsChainVerboseErrors(true)
// config = config.WithCredentialsChainVerboseErrors(true)

return session.Must(session.NewSessionWithOptions(session.Options{
Config: *config,
SharedConfigState: session.SharedConfigEnable,
}))
}
// return session.Must(session.NewSessionWithOptions(session.Options{
// Config: *config,
// SharedConfigState: session.SharedConfigEnable,
// }))
// }

// copy/sanitize the URL for the Go CDK - it doesn't like params it can't parse
func (f *blobFS) cleanCdkURL(u url.URL) url.URL {
Expand Down Expand Up @@ -294,7 +297,15 @@ func (f *blobFS) cleanS3URL(u url.URL) url.URL {
q := u.Query()
for param := range q {
switch param {
case "region", "endpoint", "disableSSL", "s3ForcePathStyle":
case "disableSSL":
// changed to 'disable_https' in s3v2
q.Set("disable_https", q.Get(param))
q.Del(param)
case "s3ForcePathStyle":
// changed to 'use_path_style' in s3v2
q.Set("use_path_style", q.Get(param))
q.Del(param)
case "region", "endpoint", "disable_https", "use_path_style":
default:
q.Del(param)
}
Expand All @@ -307,6 +318,21 @@ func (f *blobFS) cleanS3URL(u url.URL) url.URL {
}
}

if endpoint := q.Get("endpoint"); endpoint != "" {
// if we have an endpoint, make sure it's a parseable URL with a scheme
u, err := url.Parse(endpoint)
if err != nil || u.Scheme == "" {
// try adding a schema - if disable_https is set, use http, otherwise https
if q.Get("disable_https") == "true" {
q.Del("endpoint")
q.Set("endpoint", "http://"+endpoint)
} else {
q.Del("endpoint")
q.Set("endpoint", "https://"+endpoint)
}
}
}

if q.Get("region") == "" {
region := env.GetenvFS(f.envfs, "AWS_REGION", env.GetenvFS(f.envfs, "AWS_DEFAULT_REGION"))
if region != "" {
Expand Down
35 changes: 24 additions & 11 deletions blobfs/blob_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/johannesboyne/gofakes3"
"github.com/johannesboyne/gofakes3/backend/s3mem"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func setupTestS3Bucket(t *testing.T) *url.URL {
Expand Down Expand Up @@ -206,36 +207,36 @@ func TestBlobFS_ReadDir(t *testing.T) {
t.Setenv("AWS_ANON", "true")

fsys, err := New(tests.MustURL("s3://mybucket/?region=us-east-1&disableSSL=true&s3ForcePathStyle=true&endpoint=" + srvURL.Host))
assert.NoError(t, err)
require.NoError(t, err)

de, err := fs.ReadDir(fsys, "dir1")
assert.NoError(t, err)
require.NoError(t, err)
assert.Len(t, de, 2)

de, err = fs.ReadDir(fsys, ".")
assert.NoError(t, err)
require.NoError(t, err)
assert.Len(t, de, 5)

fi, err := de[0].Info()
assert.NoError(t, err)
require.NoError(t, err)
assert.Equal(t, "dir1", fi.Name())

f, err := fsys.Open("dir1")
assert.NoError(t, err)
require.NoError(t, err)
assert.IsType(t, &blobFile{}, f)

fi, err = f.Stat()
assert.NoError(t, err)
require.NoError(t, err)
assert.Equal(t, "dir1", fi.Name())

f, err = fsys.Open("file1")
assert.NoError(t, err)
require.NoError(t, err)

defer f.Close()

fi, err = f.Stat()
assert.NoError(t, err)
assert.Equal(t, fs.FileMode(0o444), fi.Mode())
require.NoError(t, err)
assert.Equal(t, fs.FileMode(0o644), fi.Mode())
}

func TestBlobFS_CleanCdkURL(t *testing.T) {
Expand All @@ -247,8 +248,20 @@ func TestBlobFS_CleanCdkURL(t *testing.T) {
{"s3://foo/bar/baz", "s3://foo/bar/baz"},
{"s3://foo/bar/baz?type=hello/world", "s3://foo/bar/baz"},
{"s3://foo/bar/baz?region=us-east-1", "s3://foo/bar/baz?region=us-east-1"},
{"s3://foo/bar/baz?disableSSL=true&type=text/csv", "s3://foo/bar/baz?disableSSL=true"},
{"s3://foo/bar/baz?type=text/csv&s3ForcePathStyle=true&endpoint=1.2.3.4", "s3://foo/bar/baz?endpoint=1.2.3.4&s3ForcePathStyle=true"},
{"s3://foo/bar/baz?disableSSL=true&type=text/csv", "s3://foo/bar/baz?disable_https=true"},
{
"s3://foo/bar/baz?type=text/csv&s3ForcePathStyle=true&endpoint=1.2.3.4",
"s3://foo/bar/baz?endpoint=https%3A%2F%2F1.2.3.4&use_path_style=true",
},
{"s3://foo/bar/baz?disable_https=true&type=text/csv", "s3://foo/bar/baz?disable_https=true"},
{
"s3://foo/bar/baz?type=text/csv&use_path_style=true&endpoint=1.2.3.4",
"s3://foo/bar/baz?endpoint=https%3A%2F%2F1.2.3.4&use_path_style=true",
},
{
"s3://foo/bar/baz?disable_https=true&type=text/csv&use_path_style=true&endpoint=1.2.3.4:1234",
"s3://foo/bar/baz?disable_https=true&endpoint=http%3A%2F%2F1.2.3.4%3A1234&use_path_style=true",
},
{"gs://foo/bar/baz", "gs://foo/bar/baz"},
{"gs://foo/bar/baz?type=foo/bar", "gs://foo/bar/baz"},
{"gs://foo/bar/baz?access_id=123", "gs://foo/bar/baz?access_id=123"},
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ go 1.22.4

require (
github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.5.0
github.com/aws/aws-sdk-go v1.55.5
github.com/aws/aws-sdk-go-v2 v1.32.5
github.com/aws/aws-sdk-go-v2/config v1.28.4
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.20
Expand Down Expand Up @@ -57,6 +56,7 @@ require (
github.com/Microsoft/go-winio v0.6.1 // indirect
github.com/ProtonMail/go-crypto v1.0.0 // indirect
github.com/armon/go-metrics v0.4.1 // indirect
github.com/aws/aws-sdk-go v1.55.5 // indirect
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.3 // indirect
github.com/aws/aws-sdk-go-v2/credentials v1.17.45 // indirect
github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.17.10 // indirect
Expand Down
6 changes: 6 additions & 0 deletions url_schemes.md
Original file line number Diff line number Diff line change
Expand Up @@ -358,6 +358,12 @@ filesystem.
- `region`: The AWS region for requests. Defaults to the value from the
`AWS_REGION` or `AWS_DEFAULT_REGION` environment variables, or the EC2
region if used in AWS EC2.
- `profile`: The shared config profile name to load from the shared
AWS configuration files. Defaults to the value from the `AWS_PROFILE` or
`AWS_DEFAULT_PROFILE` environment variables, or "default" if none are set.

(old)
TODO: delete these if not relevant:
- `endpoint`: The endpoint (`hostname`, `hostname:port`, or fully qualified
URI). Useful for using a different S3-compatible object storage server. You
can also set the `AWS_S3_ENDPOINT` environment variable.
Expand Down

0 comments on commit 22cb0cb

Please sign in to comment.