Skip to content

Commit

Permalink
Merge pull request #282 from k1LoW/separate-datasource-go
Browse files Browse the repository at this point in the history
Separate datasource/datasource.go
  • Loading branch information
k1LoW authored Jan 31, 2021
2 parents a34204e + fea5986 commit 1d90700
Show file tree
Hide file tree
Showing 4 changed files with 177 additions and 156 deletions.
1 change: 1 addition & 0 deletions .codecov.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ coverage:
target: 65%
patch: off
ignore:
- "datasource/gcp.go"
- "drivers/bq"
- "drivers/spanner"
- "schema/json.go"
Expand Down
65 changes: 65 additions & 0 deletions datasource/aws.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package datasource

import (
"context"
"fmt"
"net/url"
"os"
"strings"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/dynamodb"
"github.com/k1LoW/tbls/drivers/dynamo"
"github.com/k1LoW/tbls/schema"
)

// AnalizeDynamodb analyze `dynamodb://`
func AnalyzeDynamodb(urlstr string) (*schema.Schema, error) {
s := &schema.Schema{}
u, err := url.Parse(urlstr)
if err != nil {
return s, err
}

values := u.Query()
err = setEnvAWSCredentials(values)
if err != nil {
return s, err
}

region := u.Host

sess := session.Must(session.NewSessionWithOptions(session.Options{
SharedConfigState: session.SharedConfigEnable,
}))

config := aws.NewConfig().WithRegion(region)
if os.Getenv("AWS_ENDPOINT_URL") != "" {
config = config.WithEndpoint(os.Getenv("AWS_ENDPOINT_URL"))
}

client := dynamodb.New(sess, config)
ctx := context.Background()

driver, err := dynamo.New(ctx, client)
if err != nil {
return s, err
}

s.Name = fmt.Sprintf("Amazon DynamoDB (%s)", region)
err = driver.Analyze(s)
if err != nil {
return s, err
}
return s, nil
}

func setEnvAWSCredentials(values url.Values) error {
for k := range values {
if strings.HasPrefix(k, "aws_") {
return os.Setenv(strings.ToUpper(k), values.Get(k))
}
}
return nil
}
156 changes: 0 additions & 156 deletions datasource/datasource.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,33 +2,23 @@ package datasource

import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"net/url"
"os"
"path/filepath"
"strings"
"time"

"cloud.google.com/go/bigquery"
cloudspanner "cloud.google.com/go/spanner"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/dynamodb"
"github.com/k1LoW/tbls/config"
"github.com/k1LoW/tbls/drivers"
"github.com/k1LoW/tbls/drivers/bq"
"github.com/k1LoW/tbls/drivers/dynamo"
"github.com/k1LoW/tbls/drivers/mariadb"
"github.com/k1LoW/tbls/drivers/mssql"
"github.com/k1LoW/tbls/drivers/mysql"
"github.com/k1LoW/tbls/drivers/postgres"
"github.com/k1LoW/tbls/drivers/redshift"
"github.com/k1LoW/tbls/drivers/snowflake"
"github.com/k1LoW/tbls/drivers/spanner"
"github.com/k1LoW/tbls/drivers/sqlite"
"github.com/k1LoW/tbls/schema"
"github.com/pkg/errors"
Expand Down Expand Up @@ -198,149 +188,3 @@ func AnalyzeJSONStringOrFile(strOrPath string) (s *schema.Schema, err error) {
}
return s, nil
}

// AnalyzeBigquery analyze `bq://`
func AnalyzeBigquery(urlstr string) (*schema.Schema, error) {
s := &schema.Schema{}
ctx := context.Background()
client, projectID, datasetID, err := NewBigqueryClient(ctx, urlstr)
if err != nil {
return s, err
}
defer client.Close()

s.Name = fmt.Sprintf("%s:%s", projectID, datasetID)
driver, err := bq.New(ctx, client, datasetID)
if err != nil {
return s, err
}
err = driver.Analyze(s)
if err != nil {
return s, err
}
return s, nil
}

// NewBigqueryClient returns new bigquery.Client
func NewBigqueryClient(ctx context.Context, urlstr string) (*bigquery.Client, string, string, error) {
u, err := url.Parse(urlstr)
if err != nil {
return nil, "", "", err
}
values := u.Query()
err = setEnvGoogleApplicationCredentials(values)
if err != nil {
return nil, "", "", err
}

splitted := strings.Split(u.Path, "/")

projectID := u.Host
datasetID := splitted[1]

client, err := bigquery.NewClient(ctx, projectID)
return client, projectID, datasetID, err
}

// AnalyzeSpanner analyze `spanner://`
func AnalyzeSpanner(urlstr string) (*schema.Schema, error) {
s := &schema.Schema{}
u, err := url.Parse(urlstr)
if err != nil {
return s, err
}

values := u.Query()
err = setEnvGoogleApplicationCredentials(values)
if err != nil {
return s, err
}

splitted := strings.Split(u.Path, "/")
projectID := u.Host
instanceID := splitted[1]
databaseID := splitted[2]

db := fmt.Sprintf("projects/%s/instances/%s/databases/%s", projectID, instanceID, databaseID)
ctx := context.Background()
client, err := cloudspanner.NewClient(ctx, db)
if err != nil {
return s, err
}
defer client.Close()
s.Name = db

driver, err := spanner.New(ctx, client)
if err != nil {
return s, err
}
err = driver.Analyze(s)
if err != nil {
return s, err
}
return s, nil
}

// AnalizeDynamodb analyze `dynamodb://`
func AnalyzeDynamodb(urlstr string) (*schema.Schema, error) {
s := &schema.Schema{}
u, err := url.Parse(urlstr)
if err != nil {
return s, err
}

values := u.Query()
err = setEnvAWSCredentials(values)
if err != nil {
return s, err
}

region := u.Host

sess := session.Must(session.NewSessionWithOptions(session.Options{
SharedConfigState: session.SharedConfigEnable,
}))

config := aws.NewConfig().WithRegion(region)
if os.Getenv("AWS_ENDPOINT_URL") != "" {
config = config.WithEndpoint(os.Getenv("AWS_ENDPOINT_URL"))
}

client := dynamodb.New(sess, config)
ctx := context.Background()

driver, err := dynamo.New(ctx, client)
if err != nil {
return s, err
}

s.Name = fmt.Sprintf("Amazon DynamoDB (%s)", region)
err = driver.Analyze(s)
if err != nil {
return s, err
}
return s, nil
}

func setEnvGoogleApplicationCredentials(values url.Values) error {
keys := []string{
"google_application_credentials",
"credentials",
"creds",
}
for _, k := range keys {
if values.Get(k) != "" {
return os.Setenv("GOOGLE_APPLICATION_CREDENTIALS", values.Get(k))
}
}
return nil
}

func setEnvAWSCredentials(values url.Values) error {
for k := range values {
if strings.HasPrefix(k, "aws_") {
return os.Setenv(strings.ToUpper(k), values.Get(k))
}
}
return nil
}
111 changes: 111 additions & 0 deletions datasource/gcp.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
package datasource

import (
"context"
"fmt"
"net/url"
"os"
"strings"

"cloud.google.com/go/bigquery"
cloudspanner "cloud.google.com/go/spanner"
"github.com/k1LoW/tbls/drivers/bq"
"github.com/k1LoW/tbls/drivers/spanner"
"github.com/k1LoW/tbls/schema"
)

// AnalyzeBigquery analyze `bq://`
func AnalyzeBigquery(urlstr string) (*schema.Schema, error) {
s := &schema.Schema{}
ctx := context.Background()
client, projectID, datasetID, err := NewBigqueryClient(ctx, urlstr)
if err != nil {
return s, err
}
defer client.Close()

s.Name = fmt.Sprintf("%s:%s", projectID, datasetID)
driver, err := bq.New(ctx, client, datasetID)
if err != nil {
return s, err
}
err = driver.Analyze(s)
if err != nil {
return s, err
}
return s, nil
}

// NewBigqueryClient returns new bigquery.Client
func NewBigqueryClient(ctx context.Context, urlstr string) (*bigquery.Client, string, string, error) {
u, err := url.Parse(urlstr)
if err != nil {
return nil, "", "", err
}
values := u.Query()
err = setEnvGoogleApplicationCredentials(values)
if err != nil {
return nil, "", "", err
}

splitted := strings.Split(u.Path, "/")

projectID := u.Host
datasetID := splitted[1]

client, err := bigquery.NewClient(ctx, projectID)
return client, projectID, datasetID, err
}

// AnalyzeSpanner analyze `spanner://`
func AnalyzeSpanner(urlstr string) (*schema.Schema, error) {
s := &schema.Schema{}
u, err := url.Parse(urlstr)
if err != nil {
return s, err
}

values := u.Query()
err = setEnvGoogleApplicationCredentials(values)
if err != nil {
return s, err
}

splitted := strings.Split(u.Path, "/")
projectID := u.Host
instanceID := splitted[1]
databaseID := splitted[2]

db := fmt.Sprintf("projects/%s/instances/%s/databases/%s", projectID, instanceID, databaseID)
ctx := context.Background()
client, err := cloudspanner.NewClient(ctx, db)
if err != nil {
return s, err
}
defer client.Close()
s.Name = db

driver, err := spanner.New(ctx, client)
if err != nil {
return s, err
}
err = driver.Analyze(s)
if err != nil {
return s, err
}
return s, nil
}

func setEnvGoogleApplicationCredentials(values url.Values) error {
keys := []string{
"google_application_credentials",
"credentials",
"creds",
}
for _, k := range keys {
if values.Get(k) != "" {
return os.Setenv("GOOGLE_APPLICATION_CREDENTIALS", values.Get(k))
}
}
return nil
}

0 comments on commit 1d90700

Please sign in to comment.