Skip to content

Commit

Permalink
Supporting gs (Google Cloud Storage) datasources
Browse files Browse the repository at this point in the history
Signed-off-by: Dave Henderson <[email protected]>
  • Loading branch information
hairyhenderson committed Oct 15, 2019
1 parent b6d60b4 commit 428d78c
Show file tree
Hide file tree
Showing 167 changed files with 81,114 additions and 19 deletions.
1 change: 1 addition & 0 deletions data/datasource.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ func (d *Data) registerReaders() {
d.sourceReaders["vault+http"] = readVault
d.sourceReaders["vault+https"] = readVault
d.sourceReaders["s3"] = readBlob
d.sourceReaders["gs"] = readBlob
}

// lookupReader - return the reader function for the given scheme
Expand Down
84 changes: 65 additions & 19 deletions data/datasource_blob.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,18 +10,19 @@ import (
"path"
"strings"

"gocloud.dev/blob"

gaws "github.com/hairyhenderson/gomplate/aws"
"github.com/hairyhenderson/gomplate/env"
"github.com/pkg/errors"

"gocloud.dev/blob"
"gocloud.dev/blob/gcsblob"
"gocloud.dev/blob/s3blob"
"gocloud.dev/gcp"
)

func readBlob(source *Source, args ...string) (output []byte, err error) {
if len(args) >= 2 {
return nil, errors.New("Maximum two arguments to s3 datasource: alias, extraPath")
return nil, errors.New("Maximum two arguments to blob datasource: alias, extraPath")
}

ctx := context.TODO()
Expand All @@ -31,14 +32,16 @@ func readBlob(source *Source, args ...string) (output []byte, err error) {
key = path.Join(key, args[0])
}

// set up a "regular" gomplate AWS SDK session
sess := gaws.SDKSession()
// see https://gocloud.dev/concepts/urls/#muxes
opener := &s3blob.URLOpener{ConfigProvider: sess}
opener, err := newOpener(ctx, source.URL)
if err != nil {
return nil, err
}

mux := blob.URLMux{}
mux.RegisterBucket(s3blob.Scheme, opener)
mux.RegisterBucket(source.URL.Scheme, opener)

bucket, err := mux.OpenBucket(ctx, blobURL(source.URL))
u := blobURL(source.URL)
bucket, err := mux.OpenBucket(ctx, u)
if err != nil {
return nil, err
}
Expand All @@ -58,10 +61,38 @@ func readBlob(source *Source, args ...string) (output []byte, err error) {
return data, err
}

// create the correct kind of blob.BucketURLOpener for the given URL
func newOpener(ctx context.Context, u *url.URL) (opener blob.BucketURLOpener, err error) {
switch u.Scheme {
case "s3":
// set up a "regular" gomplate AWS SDK session
sess := gaws.SDKSession()
// see https://gocloud.dev/concepts/urls/#muxes
opener = &s3blob.URLOpener{ConfigProvider: sess}
case "gs":
creds, err := gcp.DefaultCredentials(ctx)
if err != nil {
return nil, errors.Wrap(err, "failed to retrieve GCP credentials")
}

client, err := gcp.NewHTTPClient(
gcp.DefaultTransport(),
gcp.CredentialsTokenSource(creds))
if err != nil {
return nil, errors.Wrap(err, "failed to create GCP HTTP client")
}
opener = &gcsblob.URLOpener{
Client: client,
}
}
return opener, nil
}

func getBlob(ctx context.Context, bucket *blob.Bucket, key string) (mediaType string, data []byte, err error) {
key = strings.TrimPrefix(key, "/")
attr, err := bucket.Attributes(ctx, key)
if err != nil {
return "", nil, err
return "", nil, errors.Wrapf(err, "failed to retrieve attributes for %s", key)
}
if attr.ContentType != "" {
mt, _, e := mime.ParseMediaType(attr.ContentType)
Expand All @@ -71,7 +102,7 @@ func getBlob(ctx context.Context, bucket *blob.Bucket, key string) (mediaType st
mediaType = mt
}
data, err = bucket.ReadAll(ctx, key)
return mediaType, data, err
return mediaType, data, errors.Wrapf(err, "failed to read %s", key)
}

// calls the bucket listing API, returning a JSON Array
Expand Down Expand Up @@ -109,18 +140,33 @@ func listBucket(ctx context.Context, bucket *blob.Bucket, path string) (mediaTyp
func blobURL(u *url.URL) string {
out, _ := url.Parse(u.String())
q := out.Query()

for param := range q {
switch param {
case "region", "endpoint", "disableSSL", "s3ForcePathStyle":
default:
q.Del(param)
switch u.Scheme {
case "s3":
switch param {
case "region", "endpoint", "disableSSL", "s3ForcePathStyle":
default:
q.Del(param)
}
case "gs":
switch param {
case "access_id", "private_key_path":
default:
q.Del(param)
}
}
}
// handle AWS_S3_ENDPOINT env var
endpoint := env.Getenv("AWS_S3_ENDPOINT")
if endpoint != "" {
q.Set("endpoint", endpoint)

if u.Scheme == "s3" {
// handle AWS_S3_ENDPOINT env var
endpoint := env.Getenv("AWS_S3_ENDPOINT")
if endpoint != "" {
q.Set("endpoint", endpoint)
}
}

out.RawQuery = q.Encode()

return out.String()
}
28 changes: 28 additions & 0 deletions tests/integration/datasources_blob_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"bytes"
"net"
"net/http"
"os"

"github.com/johannesboyne/gofakes3"
"github.com/johannesboyne/gofakes3/backend/s3mem"
Expand Down Expand Up @@ -140,3 +141,30 @@ func (s *BlobDatasourcesSuite) TestS3MIMETypes(c *C) {
})
result.Assert(c, icmd.Expected{ExitCode: 0, Out: "yay for yaml"})
}

func (s *BlobDatasourcesSuite) TestGCSDatasource(c *C) {
// this only works if we're authed with GCS
if os.Getenv("GOOGLE_APPLICATION_CREDENTIALS") == "" {
c.Skip("Not configured to authenticate with Google Cloud - skipping")
return
}
result := icmd.RunCmd(icmd.Command(GomplateBin,
"-c", "data=gs://gcp-public-data-landsat/LT08/PRE/015/013/LT80150132013127LGN01/LT80150132013127LGN01_MTL.txt?type=text/plain",
"-i", "{{ len .data }}",
), func(c *icmd.Cmd) {
})
result.Assert(c, icmd.Expected{ExitCode: 0, Out: "3218"})
}

func (s *BlobDatasourcesSuite) TestGCSDirectory(c *C) {
// this only works if we're likely to be authed with GCS
if os.Getenv("GOOGLE_APPLICATION_CREDENTIALS") == "" {
c.Skip("Not configured to authenticate with Google Cloud - skipping")
return
}
result := icmd.RunCmd(icmd.Command(GomplateBin,
"-c", "data=gs://gcp-public-data-landsat/",
"-i", "{{ coll.Has .data `index.csv.gz` }}",
), func(c *icmd.Cmd) {})
result.Assert(c, icmd.Expected{ExitCode: 0, Out: "true"})
}
202 changes: 202 additions & 0 deletions vendor/cloud.google.com/go/LICENSE

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 428d78c

Please sign in to comment.