Skip to content
This repository has been archived by the owner on Nov 5, 2021. It is now read-only.

Add an external prober for BigQuery. #348

Merged
merged 2 commits into from
Jan 8, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
90 changes: 90 additions & 0 deletions contrib/gcp/bigquery/bigquery.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
// Copyright 2020 Google Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

/*
Package bigquery connects to the GCP BigQuery API and computes metrics suitable
for blackbox-probing.
*/
package bigquery

import (
"context"
"fmt"

"cloud.google.com/go/bigquery"
)

// The QueryRunner interface encapsulates the BigQuery API.
//
// Query() runs the given query on BQ and returns the result and an error, if
// any.
type QueryRunner interface {
Query(context.Context, string) (string, error)
}

type bigqueryRunner struct {
client *bigquery.Client
}

// Query takes a string in BQL and executes it on BigQuery, returning the
// result.
func (r *bigqueryRunner) Query(ctx context.Context, query string) (string, error) {
q := r.client.Query(query)

it, err := q.Read(ctx)
if err != nil {
return "", err
}
var row []bigquery.Value

if err = it.Next(&row); err != nil {
return "", err
}
return fmt.Sprint(row[0]), nil
}

// NewRunner creates a new BQ QueryRunner associated with the given projectID.
func NewRunner(ctx context.Context, projectID string) (QueryRunner, error) {
client, err := bigquery.NewClient(ctx, projectID)
if err != nil {
return nil, err
}
return &bigqueryRunner{client}, nil
}

// Probe executes queries against BQ and returns the resulting metrics.
// If no table is specified, Probe will execute a dummy query against BQ (such
// as 'SELECT 1'). Otherwise, table should specify an existing table in the
// format 'dataset.table'.
//
// The service account cloudprober runs under should have sufficient permissions
// to read the table.
func Probe(ctx context.Context, r QueryRunner, table string) (string, error) {

var metrics, result string
var err error

if table == "" {
result, err = r.Query(ctx, "SELECT 1")
metrics = fmt.Sprintf("%s %s", "bigquery_connect", result)
} else {
result, err = r.Query(ctx, "SELECT COUNT(*) FROM "+table)
metrics = fmt.Sprintf("%s %s", "row_count", result)
}

if err != nil {
return "", err
}
return metrics, nil
}
47 changes: 47 additions & 0 deletions contrib/gcp/bigquery/bigquery_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package bigquery

import (
"context"
"errors"
"testing"
)

type stubRunner struct {
result, query string
err error
}

func (f *stubRunner) Query(ctx context.Context, query string) (string, error) {
f.query = query
return f.result, f.err
}

func TestProbe(t *testing.T) {
probeTests := []struct {
table, expQuery string
result string
err error
expMetrics string
}{
{"", "SELECT 1", "1", nil, "bigquery_connect 1"},
{"ds.table", "SELECT COUNT(*) FROM ds.table", "500", nil, "row_count 500"},
{"", "SELECT 1", "", errors.New("connection error"), ""},
}

for _, pt := range probeTests {
f := stubRunner{
result: pt.result,
err: pt.err,
}
metrics, err := Probe(context.Background(), &f, pt.table)
if err != pt.err {
t.Errorf("Probe(table=%#v): mismatched error, got %#v want %#v", pt.table, err, pt.err)
}
if metrics != pt.expMetrics {
t.Errorf("Probe(table=%#v) = %#v, want %#v", pt.table, metrics, pt.expMetrics)
}
if f.query != pt.expQuery {
t.Errorf("Probe(table=%#v): unexpected BQL, got: %#v, want %#v", pt.table, f.query, pt.expQuery)
}
}
}
89 changes: 89 additions & 0 deletions contrib/gcp/cmd/bigquery_probe.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
// Copyright 2020 Google Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

/*
Bigquery_probe is an external Cloudprober probe suitable for blackbox-probing
the GCP BigQuery API.

This binary assumes that the environment variable
$GOOGLE_APPLICATION_CREDENTIALS has been set following the instructions at
https://cloud.google.com/docs/authentication/production
*/
package main

import (
"context"
"fmt"

"flag"
"github.com/golang/glog"
"github.com/golang/protobuf/proto"
"github.com/google/cloudprober/contrib/gcp/bigquery"
serverpb "github.com/google/cloudprober/probes/external/proto"
"github.com/google/cloudprober/probes/external/serverutils"
)

var (
projectID = flag.String("project_id", "", "GCP project ID to connect to.")
serverMode = flag.Bool("server_mode", false, "Whether to run in server mode.")
table = flag.String("table", "", "Table to probe (specified as \"dataset.table\"). "+
"If empty, the probe will simply try to connect to BQ and execute 'SELECT 1'. "+
"If running in server mode, the 'table' option from the configuration will override this flag.")
)

func parseProbeRequest(request *serverpb.ProbeRequest) map[string]string {
m := make(map[string]string)
for _, opt := range request.Options {
m[*opt.Name] = *opt.Value
}
return m
}

func main() {
flag.Parse()

if *projectID == "" {
glog.Exitf("--project_id must be specified")
}

dstTable := *table
ctx := context.Background()
runner, err := bigquery.NewRunner(ctx, *projectID)
if err != nil {
glog.Fatal(err)
}

if *serverMode {
serverutils.Serve(func(request *serverpb.ProbeRequest, reply *serverpb.ProbeReply) {

opts := parseProbeRequest(request)
if val, ok := opts["table"]; ok {
dstTable = val
glog.Infof("--table set to %q by ProbeRequest config", val)
}
payload, err := bigquery.Probe(ctx, runner, dstTable)
reply.Payload = proto.String(payload)
if err != nil {
reply.ErrorMessage = proto.String(err.Error())
}
})
}

payload, err := bigquery.Probe(ctx, runner, dstTable)
if err != nil {
glog.Fatal(err)
}
fmt.Println(payload)

}
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ go 1.13

require (
cloud.google.com/go v0.48.0
cloud.google.com/go/bigquery v1.0.1
cloud.google.com/go/logging v1.0.0
cloud.google.com/go/pubsub v1.0.1
github.com/aws/aws-sdk-go v1.25.37
Expand Down
1 change: 1 addition & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ cloud.google.com/go v0.45.1/go.mod h1:RpBamKRgapWJb87xiFSdk4g1CME7QZg3uwTez+TSTj
cloud.google.com/go v0.46.3/go.mod h1:a6bKKbmY7er1mI7TEI4lsAkts/mkhTSZK8w33B4RAg0=
cloud.google.com/go v0.48.0 h1:6ZHYIRlohUdU4LrLHbTsReY1eYy/MoZW1FsEyBuMXsk=
cloud.google.com/go v0.48.0/go.mod h1:gGOnoa/XMQYHAscREBlbdHduGchEaP9N0//OXdrPI/M=
cloud.google.com/go/bigquery v1.0.1 h1:hL+ycaJpVE9M7nLoiXb/Pn10ENE2u+oddxbD8uu0ZVU=
cloud.google.com/go/bigquery v1.0.1/go.mod h1:i/xbL2UlR5RvWAURpBYZTtm/cXjCha9lbfbpx4poX+o=
cloud.google.com/go/datastore v1.0.0/go.mod h1:LXYbyblFSglQ5pkeyhO+Qmw7ukd3C+pD7TKLgZqpHYE=
cloud.google.com/go/logging v1.0.0 h1:kaunpnoEh9L4hu6JUsBa8Y20LBfKnCuDhKUgdZp7oK8=
Expand Down