diff --git a/spanner/batch.go b/spanner/batch.go index 82346bfcebd6..a7e3a5a63052 100644 --- a/spanner/batch.go +++ b/spanner/batch.go @@ -23,8 +23,12 @@ import ( "log" "time" + "cloud.google.com/go/internal/trace" "github.com/golang/protobuf/proto" + "github.com/googleapis/gax-go/v2" sppb "google.golang.org/genproto/googleapis/spanner/v1" + "google.golang.org/grpc" + "google.golang.org/grpc/metadata" ) // BatchReadOnlyTransaction is a ReadOnlyTransaction that allows for exporting @@ -128,6 +132,7 @@ func (t *BatchReadOnlyTransaction) PartitionReadUsingIndexWithOptions(ctx contex if err != nil { return nil, err } + var md metadata.MD resp, err = client.PartitionRead(contextWithOutgoingMetadata(ctx, sh.getMetadata()), &sppb.PartitionReadRequest{ Session: sid, Transaction: ts, @@ -136,7 +141,13 @@ func (t *BatchReadOnlyTransaction) PartitionReadUsingIndexWithOptions(ctx contex Columns: columns, KeySet: kset, PartitionOptions: opt.toProto(), - }) + }, gax.WithGRPCOptions(grpc.Header(&md))) + + if GFELatencyMetricsEnabled && md != nil && t.ct != nil { + if err := createContextAndCaptureGFELatencyMetrics(ctx, t.ct, md, "PartitionReadUsingIndexWithOptions"); err != nil { + trace.TracePrintf(ctx, nil, "Error in recording GFE Latency. Try disabling and rerunning. Error: %v", err) + } + } // Prepare ReadRequest. req := &sppb.ReadRequest{ Session: sid, @@ -180,6 +191,7 @@ func (t *BatchReadOnlyTransaction) partitionQuery(ctx context.Context, statement if err != nil { return nil, err } + var md metadata.MD // request Partitions req := &sppb.PartitionQueryRequest{ @@ -190,7 +202,13 @@ func (t *BatchReadOnlyTransaction) partitionQuery(ctx context.Context, statement Params: params, ParamTypes: paramTypes, } - resp, err := client.PartitionQuery(contextWithOutgoingMetadata(ctx, sh.getMetadata()), req) + resp, err := client.PartitionQuery(contextWithOutgoingMetadata(ctx, sh.getMetadata()), req, gax.WithGRPCOptions(grpc.Header(&md))) + + if GFELatencyMetricsEnabled && md != nil && t.ct != nil { + if err := createContextAndCaptureGFELatencyMetrics(ctx, t.ct, md, "partitionQuery"); err != nil { + trace.TracePrintf(ctx, nil, "Error in recording GFE Latency. Try disabling and rerunning. Error: %v", err) + } + } // prepare ExecuteSqlRequest r := &sppb.ExecuteSqlRequest{ @@ -251,7 +269,16 @@ func (t *BatchReadOnlyTransaction) Cleanup(ctx context.Context) { } t.sh = nil sid, client := sh.getID(), sh.getClient() - err := client.DeleteSession(contextWithOutgoingMetadata(ctx, sh.getMetadata()), &sppb.DeleteSessionRequest{Name: sid}) + + var md metadata.MD + err := client.DeleteSession(contextWithOutgoingMetadata(ctx, sh.getMetadata()), &sppb.DeleteSessionRequest{Name: sid}, gax.WithGRPCOptions(grpc.Header(&md))) + + if GFELatencyMetricsEnabled && md != nil && t.ct != nil { + if err := createContextAndCaptureGFELatencyMetrics(ctx, t.ct, md, "Cleanup"); err != nil { + trace.TracePrintf(ctx, nil, "Error in recording GFE Latency. Try disabling and rerunning. Error: %v", err) + } + } + if err != nil { var logger *log.Logger if sh.session != nil { @@ -280,7 +307,7 @@ func (t *BatchReadOnlyTransaction) Execute(ctx context.Context, p *Partition) *R // Read or query partition. if p.rreq != nil { rpc = func(ctx context.Context, resumeToken []byte) (streamingReceiver, error) { - return client.StreamingRead(ctx, &sppb.ReadRequest{ + client, err := client.StreamingRead(ctx, &sppb.ReadRequest{ Session: p.rreq.Session, Transaction: p.rreq.Transaction, Table: p.rreq.Table, @@ -291,10 +318,20 @@ func (t *BatchReadOnlyTransaction) Execute(ctx context.Context, p *Partition) *R RequestOptions: p.rreq.RequestOptions, ResumeToken: resumeToken, }) + if err != nil { + return client, err + } + md, err := client.Header() + if GFELatencyMetricsEnabled && md != nil && t.ct != nil { + if err := createContextAndCaptureGFELatencyMetrics(ctx, t.ct, md, "Execute"); err != nil { + trace.TracePrintf(ctx, nil, "Error in recording GFE Latency. Try disabling and rerunning. Error: %v", err) + } + } + return client, err } } else { rpc = func(ctx context.Context, resumeToken []byte) (streamingReceiver, error) { - return client.ExecuteStreamingSql(ctx, &sppb.ExecuteSqlRequest{ + client, err := client.ExecuteStreamingSql(ctx, &sppb.ExecuteSqlRequest{ Session: p.qreq.Session, Transaction: p.qreq.Transaction, Sql: p.qreq.Sql, @@ -305,6 +342,17 @@ func (t *BatchReadOnlyTransaction) Execute(ctx context.Context, p *Partition) *R RequestOptions: p.qreq.RequestOptions, ResumeToken: resumeToken, }) + if err != nil { + return client, err + } + md, err := client.Header() + + if GFELatencyMetricsEnabled && md != nil && t.ct != nil { + if err := createContextAndCaptureGFELatencyMetrics(ctx, t.ct, md, "Execute"); err != nil { + trace.TracePrintf(ctx, nil, "Error in recording GFE Latency. Try disabling and rerunning. Error: %v", err) + } + } + return client, err } } return stream( diff --git a/spanner/client.go b/spanner/client.go index 254468a0a90a..97a1d2ed4f3a 100644 --- a/spanner/client.go +++ b/spanner/client.go @@ -80,6 +80,7 @@ type Client struct { idleSessions *sessionPool logger *log.Logger qo QueryOptions + ct *commonTags } // DatabaseName returns the full name of a database, e.g., @@ -204,6 +205,7 @@ func NewClientWithConfig(ctx context.Context, database string, config ClientConf idleSessions: sp, logger: config.logger, qo: getQueryOptions(config.QueryOptions), + ct: getCommonTags(sc), } return c, nil } @@ -280,6 +282,7 @@ func (c *Client) Single() *ReadOnlyTransaction { t.sh = sh return nil } + t.ct = c.ct return t } @@ -300,6 +303,7 @@ func (c *Client) ReadOnlyTransaction() *ReadOnlyTransaction { t.txReadOnly.sp = c.idleSessions t.txReadOnly.txReadEnv = t t.txReadOnly.qo = c.qo + t.ct = c.ct return t } @@ -368,6 +372,7 @@ func (c *Client) BatchReadOnlyTransaction(ctx context.Context, tb TimestampBound t.txReadOnly.sh = sh t.txReadOnly.txReadEnv = t t.txReadOnly.qo = c.qo + t.ct = c.ct return t, nil } @@ -395,6 +400,7 @@ func (c *Client) BatchReadOnlyTransactionFromID(tid BatchReadOnlyTransactionID) t.txReadOnly.sh = sh t.txReadOnly.txReadEnv = t t.txReadOnly.qo = c.qo + t.ct = c.ct return t } @@ -480,6 +486,7 @@ func (c *Client) rwTransaction(ctx context.Context, f func(context.Context, *Rea t.txReadOnly.txReadEnv = t t.txReadOnly.qo = c.qo t.txOpts = options + t.ct = c.ct trace.TracePrintf(ctx, map[string]interface{}{"transactionID": string(sh.getTransactionID())}, "Starting transaction attempt") diff --git a/spanner/go.mod b/spanner/go.mod index f1b666102901..f4fa3d50c1c6 100644 --- a/spanner/go.mod +++ b/spanner/go.mod @@ -3,6 +3,7 @@ module cloud.google.com/go/spanner go 1.11 require ( + github.com/census-instrumentation/opencensus-proto v0.3.0 // indirect cloud.google.com/go v0.99.0 github.com/golang/protobuf v1.5.2 github.com/google/go-cmp v0.5.6 diff --git a/spanner/go.sum b/spanner/go.sum index c307e5ee4055..c6112ee74283 100644 --- a/spanner/go.sum +++ b/spanner/go.sum @@ -50,8 +50,9 @@ github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym github.com/OneOfOne/xxhash v1.2.2 h1:KMrpdQIwFcEqXDklaen+P1axHaj9BSKzvpUUfnHldSE= github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU= github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY= -github.com/census-instrumentation/opencensus-proto v0.2.1 h1:glEXhBS5PSLLv4IXzLA5yPRVX4bilULVyxxbrfOtDAk= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= +github.com/census-instrumentation/opencensus-proto v0.3.0 h1:t/LhUZLVitR1Ow2YOnduCsavhwFUklBMoGVYUCqmCqk= +github.com/census-instrumentation/opencensus-proto v0.3.0/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= github.com/cespare/xxhash v1.1.0 h1:a6HrQnmkObjyL+Gs60czilIUGqrzKutQD6XZog3p+ko= github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc= github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI= diff --git a/spanner/integration_test.go b/spanner/integration_test.go index 632e21dde256..f7018c846da0 100644 --- a/spanner/integration_test.go +++ b/spanner/integration_test.go @@ -39,6 +39,7 @@ import ( "cloud.google.com/go/internal/uid" database "cloud.google.com/go/spanner/admin/database/apiv1" instance "cloud.google.com/go/spanner/admin/instance/apiv1" + "go.opencensus.io/stats/view" "google.golang.org/api/iterator" "google.golang.org/api/option" adminpb "google.golang.org/genproto/googleapis/spanner/admin/database/v1" @@ -3264,6 +3265,85 @@ func TestIntegration_DirectPathFallback(t *testing.T) { } } +func TestIntegration_GFE_Latency(t *testing.T) { + t.Parallel() + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) + defer cancel() + + te := testutil.NewTestExporter(GFEHeaderMissingCountView, GFELatencyView) + GFELatencyMetricsEnabled = true + + client, _, cleanup := prepareIntegrationTest(ctx, t, DefaultSessionPoolConfig, singerDBStatements) + defer cleanup() + + singerColumns := []string{"SingerId", "FirstName", "LastName"} + var ms = []*Mutation{ + InsertOrUpdate("Singers", singerColumns, []interface{}{1, "Marc", "Richards"}), + } + _, err := client.Apply(ctx, ms) + if err != nil { + t.Fatalf("Could not insert rows to table. Got error %v", err) + } + _, err = client.Single().ReadRow(ctx, "Singers", Key{1}, []string{"SingerId", "FirstName", "LastName"}) + if err != nil { + t.Fatalf("Could not read row. Got error %v", err) + } + waitErr := &Error{} + waitFor(t, func() error { + select { + case stat := <-te.Stats: + if len(stat.Rows) > 0 { + return nil + } + } + return waitErr + }) + + var viewMap = map[string]bool{statsPrefix + "gfe_latency": false, + statsPrefix + "gfe_header_missing_count": false, + } + + for { + if viewMap[statsPrefix+"gfe_latency"] || viewMap[statsPrefix+"gfe_header_missing_count"] { + break + } + select { + case stat := <-te.Stats: + if len(stat.Rows) == 0 { + t.Fatal("No metrics are exported") + } + if stat.View.Measure.Name() != statsPrefix+"gfe_latency" && stat.View.Measure.Name() != statsPrefix+"gfe_header_missing_count" { + t.Fatalf("Incorrect measure: got %v, want %v", stat.View.Measure.Name(), statsPrefix+"gfe_latency or "+statsPrefix+"gfe_header_missing_count") + } else { + viewMap[stat.View.Measure.Name()] = true + } + for _, row := range stat.Rows { + m := getTagMap(row.Tags) + checkCommonTagsGFELatency(t, m) + var data string + switch row.Data.(type) { + default: + data = fmt.Sprintf("%v", row.Data) + case *view.CountData: + data = fmt.Sprintf("%v", row.Data.(*view.CountData).Value) + case *view.LastValueData: + data = fmt.Sprintf("%v", row.Data.(*view.LastValueData).Value) + case *view.DistributionData: + data = fmt.Sprintf("%v", row.Data.(*view.DistributionData).Count) + } + if got, want := fmt.Sprintf("%v", data), "0"; got <= want { + t.Fatalf("Incorrect data: got %v, wanted more than %v for metric %v", got, want, stat.View.Measure.Name()) + } + } + case <-time.After(10 * time.Second): + if !viewMap[statsPrefix+"gfe_latency"] && !viewMap[statsPrefix+"gfe_header_missing_count"] { + t.Fatal("no stats were exported before timeout") + } + } + } + DisableGfeLatencyAndHeaderMissingCountViews() +} + // Prepare initializes Cloud Spanner testing DB and clients. func prepareIntegrationTest(ctx context.Context, t *testing.T, spc SessionPoolConfig, statements []string) (*Client, string, func()) { if databaseAdmin == nil { diff --git a/spanner/internal/testutil/inmem_spanner_server.go b/spanner/internal/testutil/inmem_spanner_server.go index ac6f9bdfd043..8dd34e8d80e0 100644 --- a/spanner/internal/testutil/inmem_spanner_server.go +++ b/spanner/internal/testutil/inmem_spanner_server.go @@ -32,7 +32,9 @@ import ( "google.golang.org/genproto/googleapis/rpc/errdetails" "google.golang.org/genproto/googleapis/rpc/status" spannerpb "google.golang.org/genproto/googleapis/spanner/v1" + "google.golang.org/grpc" "google.golang.org/grpc/codes" + "google.golang.org/grpc/metadata" gstatus "google.golang.org/grpc/status" ) @@ -687,6 +689,10 @@ func (s *inMemSpannerServer) BatchCreateSessions(ctx context.Context, req *spann s.totalSessionsCreated++ s.sessions[sessionName] = sessions[i] } + header := metadata.New(map[string]string{"server-timing": "gfet4t7; dur=123"}) + if err := grpc.SendHeader(ctx, header); err != nil { + return nil, gstatus.Errorf(codes.Internal, "unable to send 'server-timing' header") + } return &spannerpb.BatchCreateSessionsResponse{Session: sessions}, nil } @@ -922,6 +928,10 @@ func (s *inMemSpannerServer) Read(ctx context.Context, req *spannerpb.ReadReques } s.receivedRequests <- req s.mu.Unlock() + header := metadata.New(map[string]string{"server-timing": "gfet4t7; dur=123"}) + if err := grpc.SendHeader(ctx, header); err != nil { + return nil, gstatus.Errorf(codes.Internal, "unable to send 'server-timing' header") + } return nil, gstatus.Error(codes.Unimplemented, "Method not yet implemented") } diff --git a/spanner/oc_test.go b/spanner/oc_test.go index 638b32abb2d2..d4ddc2baeba3 100644 --- a/spanner/oc_test.go +++ b/spanner/oc_test.go @@ -25,8 +25,11 @@ import ( "cloud.google.com/go/internal/testutil" "cloud.google.com/go/internal/version" stestutil "cloud.google.com/go/spanner/internal/testutil" + structpb "github.com/golang/protobuf/ptypes/struct" "go.opencensus.io/stats/view" "go.opencensus.io/tag" + "google.golang.org/api/iterator" + spannerpb "google.golang.org/genproto/googleapis/spanner/v1" ) // Check that stats are being exported. @@ -261,6 +264,91 @@ func TestOCStats_SessionPool_GetSessionTimeoutsCount(t *testing.T) { } } +func TestOCStats_GFE_Latency(t *testing.T) { + te := testutil.NewTestExporter([]*view.View{GFELatencyView, GFEHeaderMissingCountView}...) + defer te.Unregister() + + GFELatencyMetricsEnabled = true + + server, client, teardown := setupMockedTestServer(t) + defer teardown() + + if err := server.TestSpanner.PutStatementResult("SELECT email FROM Users", &stestutil.StatementResult{ + Type: stestutil.StatementResultResultSet, + ResultSet: &spannerpb.ResultSet{ + Metadata: &spannerpb.ResultSetMetadata{ + RowType: &spannerpb.StructType{ + Fields: []*spannerpb.StructType_Field{ + { + Name: "email", + Type: &spannerpb.Type{Code: spannerpb.TypeCode_STRING}, + }, + }, + }, + }, + Rows: []*structpb.ListValue{ + {Values: []*structpb.Value{{ + Kind: &structpb.Value_StringValue{StringValue: "test@test.com"}, + }}}, + }, + }, + }); err != nil { + t.Fatalf("could not add result: %v", err) + } + iter := client.Single().Read(context.Background(), "Users", AllKeys(), []string{"email"}) + for { + _, err := iter.Next() + if err == iterator.Done { + break + } + if err != nil { + t.Fatal(err.Error()) + break + } + } + + waitErr := &Error{} + waitFor(t, func() error { + select { + case stat := <-te.Stats: + if len(stat.Rows) > 0 { + return nil + } + } + return waitErr + }) + + // Wait until we see data from the view. + select { + case stat := <-te.Stats: + if len(stat.Rows) == 0 { + t.Fatal("No metrics are exported") + } + if stat.View.Measure.Name() != statsPrefix+"gfe_latency" && stat.View.Measure.Name() != statsPrefix+"gfe_header_missing_count" { + t.Fatalf("Incorrect measure: got %v, want %v", stat.View.Measure.Name(), statsPrefix+"gfe_header_missing_count or "+statsPrefix+"gfe_latency") + } + row := stat.Rows[0] + m := getTagMap(row.Tags) + checkCommonTags(t, m) + var data string + switch row.Data.(type) { + default: + data = fmt.Sprintf("%v", row.Data) + case *view.CountData: + data = fmt.Sprintf("%v", row.Data.(*view.CountData).Value) + case *view.LastValueData: + data = fmt.Sprintf("%v", row.Data.(*view.LastValueData).Value) + case *view.DistributionData: + data = fmt.Sprintf("%v", row.Data.(*view.DistributionData).Count) + } + if got, want := fmt.Sprintf("%v", data), "0"; got <= want { + t.Fatalf("Incorrect data: got %v, wanted more than %v for metric %v", got, want, stat.View.Measure.Name()) + } + case <-time.After(1 * time.Second): + t.Fatal("no stats were exported before timeout") + } + +} func getTagMap(tags []tag.Tag) map[tag.Key]string { m := make(map[tag.Key]string) for _, t := range tags { diff --git a/spanner/pdml.go b/spanner/pdml.go index d24a8767ab90..18b494ac9565 100644 --- a/spanner/pdml.go +++ b/spanner/pdml.go @@ -19,8 +19,11 @@ import ( "cloud.google.com/go/internal/trace" "github.com/googleapis/gax-go/v2" + "go.opencensus.io/tag" sppb "google.golang.org/genproto/googleapis/spanner/v1" + "google.golang.org/grpc" "google.golang.org/grpc/codes" + "google.golang.org/grpc/metadata" ) // PartitionedUpdate executes a DML statement in parallel across the database, @@ -100,6 +103,7 @@ func (c *Client) partitionedUpdate(ctx context.Context, statement Statement, opt // // Note that PDML transactions cannot be committed or rolled back. func executePdml(ctx context.Context, sh *sessionHandle, req *sppb.ExecuteSqlRequest) (count int64, err error) { + var md metadata.MD // Begin transaction. res, err := sh.getClient().BeginTransaction(contextWithOutgoingMetadata(ctx, sh.getMetadata()), &sppb.BeginTransactionRequest{ Session: sh.getID(), @@ -114,10 +118,17 @@ func executePdml(ctx context.Context, sh *sessionHandle, req *sppb.ExecuteSqlReq req.Transaction = &sppb.TransactionSelector{ Selector: &sppb.TransactionSelector_Id{Id: res.Id}, } - resultSet, err := sh.getClient().ExecuteSql(contextWithOutgoingMetadata(ctx, sh.getMetadata()), req) + resultSet, err := sh.getClient().ExecuteSql(contextWithOutgoingMetadata(ctx, sh.getMetadata()), req, gax.WithGRPCOptions(grpc.Header(&md))) + if GFELatencyMetricsEnabled && md != nil && sh.session.pool != nil { + err := captureGFELatencyStats(tag.NewContext(ctx, sh.session.pool.tagMap), md, "executePdml_ExecuteSql") + if err != nil { + trace.TracePrintf(ctx, nil, "Error in recording GFE Latency. Try disabling and rerunning. Error: %v", err) + } + } if err != nil { return 0, err } + if resultSet.Stats == nil { return 0, spannerErrorf(codes.InvalidArgument, "query passed to Update: %q", req.Sql) } diff --git a/spanner/sessionclient.go b/spanner/sessionclient.go index 4646db577efa..22838e43e968 100644 --- a/spanner/sessionclient.go +++ b/spanner/sessionclient.go @@ -28,9 +28,11 @@ import ( "cloud.google.com/go/internal/version" vkit "cloud.google.com/go/spanner/apiv1" "github.com/googleapis/gax-go/v2" + "go.opencensus.io/tag" "google.golang.org/api/option" gtransport "google.golang.org/api/transport/grpc" sppb "google.golang.org/genproto/googleapis/spanner/v1" + "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/metadata" ) @@ -130,10 +132,31 @@ func (sc *sessionClient) createSession(ctx context.Context) (*session, error) { return nil, err } ctx = contextWithOutgoingMetadata(ctx, sc.md) + var md metadata.MD sid, err := client.CreateSession(ctx, &sppb.CreateSessionRequest{ Database: sc.database, Session: &sppb.Session{Labels: sc.sessionLabels}, - }) + }, gax.WithGRPCOptions(grpc.Header(&md))) + + if GFELatencyMetricsEnabled && md != nil { + _, instance, database, err := parseDatabaseName(sc.database) + if err != nil { + return nil, ToSpannerError(err) + } + ctxGFE, err := tag.New(ctx, + tag.Upsert(tagKeyClientID, sc.id), + tag.Upsert(tagKeyDatabase, database), + tag.Upsert(tagKeyInstance, instance), + tag.Upsert(tagKeyLibVersion, version.Repo), + ) + if err != nil { + trace.TracePrintf(ctx, nil, "Error in recording GFE Latency. Try disabling and rerunning. Error: %v", ToSpannerError(err)) + } + err = captureGFELatencyStats(ctxGFE, md, "createSession") + if err != nil { + trace.TracePrintf(ctx, nil, "Error in recording GFE Latency. Try disabling and rerunning. Error: %v", ToSpannerError(err)) + } + } if err != nil { return nil, ToSpannerError(err) } @@ -230,11 +253,33 @@ func (sc *sessionClient) executeBatchCreateSessions(client *vkit.Client, createC consumer.sessionCreationFailed(ToSpannerError(ctx.Err()), remainingCreateCount) break } + var mdForGFELatency metadata.MD response, err := client.BatchCreateSessions(ctx, &sppb.BatchCreateSessionsRequest{ SessionCount: remainingCreateCount, Database: sc.database, SessionTemplate: &sppb.Session{Labels: labels}, - }) + }, gax.WithGRPCOptions(grpc.Header(&mdForGFELatency))) + + if GFELatencyMetricsEnabled && mdForGFELatency != nil { + _, instance, database, err := parseDatabaseName(sc.database) + if err != nil { + trace.TracePrintf(ctx, nil, "Error getting instance and database name: %v", err) + } + // Errors should not prevent initializing the session pool. + ctxGFE, err := tag.New(ctx, + tag.Upsert(tagKeyClientID, sc.id), + tag.Upsert(tagKeyDatabase, database), + tag.Upsert(tagKeyInstance, instance), + tag.Upsert(tagKeyLibVersion, version.Repo), + ) + if err != nil { + trace.TracePrintf(ctx, nil, "Error in adding tags in BatchCreateSessions for GFE Latency: %v", err) + } + err = captureGFELatencyStats(ctxGFE, mdForGFELatency, "executeBatchCreateSessions") + if err != nil { + trace.TracePrintf(ctx, nil, "Error in Capturing GFE Latency and Header Missing count. Try disabling and rerunning. Error: %v", err) + } + } if err != nil { trace.TracePrintf(ctx, nil, "Error creating a batch of %d sessions: %v", remainingCreateCount, err) consumer.sessionCreationFailed(ToSpannerError(err), remainingCreateCount) diff --git a/spanner/stats.go b/spanner/stats.go index 72d09e05072b..09a4b91697d9 100644 --- a/spanner/stats.go +++ b/spanner/stats.go @@ -16,10 +16,15 @@ package spanner import ( "context" + "strconv" + "strings" + "testing" + "cloud.google.com/go/internal/version" "go.opencensus.io/stats" "go.opencensus.io/stats/view" "go.opencensus.io/tag" + "google.golang.org/grpc/metadata" ) const statsPrefix = "cloud.google.com/go/spanner/" @@ -36,6 +41,9 @@ var ( tagNumBeingPrepared = tag.Tag{Key: tagKeyType, Value: "num_sessions_being_prepared"} tagNumReadSessions = tag.Tag{Key: tagKeyType, Value: "num_read_sessions"} tagNumWriteSessions = tag.Tag{Key: tagKeyType, Value: "num_write_prepared_sessions"} + tagKeyMethod = tag.MustNewKey("grpc_client_method") + // GFELatencyMetricsEnabled is used to track if GFELatency and GFEHeaderMissingCount need to be recorded + GFELatencyMetricsEnabled = false ) func recordStat(ctx context.Context, m *stats.Int64Measure, n int64) { @@ -153,6 +161,41 @@ var ( Aggregation: view.Count(), TagKeys: tagCommonKeys, } + + // GFELatency is the latency between Google's network receiving an RPC and reading back the first byte of the response + GFELatency = stats.Int64( + statsPrefix+"gfe_latency", + "Latency between Google's network receiving an RPC and reading back the first byte of the response", + stats.UnitMilliseconds, + ) + + // GFELatencyView is the view of distribution of GFELatency values + GFELatencyView = &view.View{ + Name: "cloud.google.com/go/spanner/gfe_latency", + Measure: GFELatency, + Description: "Latency between Google's network receives an RPC and reads back the first byte of the response", + Aggregation: view.Distribution(0.0, 0.01, 0.05, 0.1, 0.3, 0.6, 0.8, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 8.0, 10.0, 13.0, + 16.0, 20.0, 25.0, 30.0, 40.0, 50.0, 65.0, 80.0, 100.0, 130.0, 160.0, 200.0, 250.0, + 300.0, 400.0, 500.0, 650.0, 800.0, 1000.0, 2000.0, 5000.0, 10000.0, 20000.0, 50000.0, + 100000.0), + TagKeys: append(tagCommonKeys, tagKeyMethod), + } + + // GFEHeaderMissingCount is the number of RPC responses received without the server-timing header, most likely means that the RPC never reached Google's network + GFEHeaderMissingCount = stats.Int64( + statsPrefix+"gfe_header_missing_count", + "Number of RPC responses received without the server-timing header, most likely means that the RPC never reached Google's network", + stats.UnitDimensionless, + ) + + // GFEHeaderMissingCountView is the view of number of GFEHeaderMissingCount + GFEHeaderMissingCountView = &view.View{ + Name: "cloud.google.com/go/spanner/gfe_header_missing_count", + Measure: GFEHeaderMissingCount, + Description: "Number of RPC responses received without the server-timing header, most likely means that the RPC never reached Google's network", + Aggregation: view.Count(), + TagKeys: append(tagCommonKeys, tagKeyMethod), + } ) // EnableStatViews enables all views of metrics relate to session management. @@ -167,3 +210,108 @@ func EnableStatViews() error { ReleasedSessionsCountView, ) } + +// EnableGfeLatencyView enables GFELatency metric +func EnableGfeLatencyView() error { + GFELatencyMetricsEnabled = true + return view.Register(GFELatencyView) +} + +// EnableGfeHeaderMissingCountView enables GFEHeaderMissingCount metric +func EnableGfeHeaderMissingCountView() error { + GFELatencyMetricsEnabled = true + return view.Register(GFEHeaderMissingCountView) +} + +// EnableGfeLatencyAndHeaderMissingCountViews enables GFEHeaderMissingCount and GFELatency metric +func EnableGfeLatencyAndHeaderMissingCountViews() error { + GFELatencyMetricsEnabled = true + return view.Register( + GFELatencyView, + GFEHeaderMissingCountView, + ) +} + +// DisableGfeLatencyAndHeaderMissingCountViews disables GFEHeaderMissingCount and GFELatency metric +func DisableGfeLatencyAndHeaderMissingCountViews() { + GFELatencyMetricsEnabled = false + view.Unregister( + GFELatencyView, + GFEHeaderMissingCountView, + ) +} + +func captureGFELatencyStats(ctx context.Context, md metadata.MD, keyMethod string) error { + if len(md.Get("server-timing")) == 0 { + recordStat(ctx, GFEHeaderMissingCount, 1) + return nil + } + serverTiming := md.Get("server-timing")[0] + gfeLatency, err := strconv.Atoi(strings.TrimPrefix(serverTiming, "gfet4t7; dur=")) + if !strings.HasPrefix(serverTiming, "gfet4t7; dur=") || err != nil { + return err + } + // Record GFE latency with OpenCensus. + ctx = tag.NewContext(ctx, tag.FromContext(ctx)) + ctx, err = tag.New(ctx, tag.Insert(tagKeyMethod, keyMethod)) + if err != nil { + return err + } + recordStat(ctx, GFELatency, int64(gfeLatency)) + return nil +} + +func checkCommonTagsGFELatency(t *testing.T, m map[tag.Key]string) { + // We only check prefix because client ID increases if we create + // multiple clients for the same database. + if !strings.HasPrefix(m[tagKeyClientID], "client") { + t.Fatalf("Incorrect client ID: %v", m[tagKeyClientID]) + } + if !strings.HasPrefix(m[tagKeyInstance], "gotest") { + t.Fatalf("Incorrect instance ID: %v", m[tagKeyInstance]) + } + if !strings.HasPrefix(m[tagKeyDatabase], "gotest") { + t.Fatalf("Incorrect database ID: %v", m[tagKeyDatabase]) + } + if m[tagKeyLibVersion] != version.Repo { + t.Fatalf("Incorrect library version: %v", m[tagKeyLibVersion]) + } +} + +func createContextAndCaptureGFELatencyMetrics(ctx context.Context, ct *commonTags, md metadata.MD, keyMethod string) error { + var ctxGFE, err = tag.New(ctx, + tag.Upsert(tagKeyClientID, ct.clientID), + tag.Upsert(tagKeyDatabase, ct.database), + tag.Upsert(tagKeyInstance, ct.instance), + tag.Upsert(tagKeyLibVersion, ct.libVersion), + ) + if err != nil { + return err + } + return captureGFELatencyStats(ctxGFE, md, keyMethod) +} + +func getCommonTags(sc *sessionClient) *commonTags { + _, instance, database, err := parseDatabaseName(sc.database) + if err != nil { + return nil + } + return &commonTags{ + clientID: sc.id, + database: database, + instance: instance, + libVersion: version.Repo, + } +} + +// commonTags are common key-value pairs of data associated with the GFELatency measure +type commonTags struct { + // Client ID + clientID string + // Database Name + database string + // Instance ID + instance string + // Library Version + libVersion string +} diff --git a/spanner/transaction.go b/spanner/transaction.go index 4ab2e76a2122..6a9702cfc31a 100644 --- a/spanner/transaction.go +++ b/spanner/transaction.go @@ -25,9 +25,12 @@ import ( "cloud.google.com/go/internal/trace" vkit "cloud.google.com/go/spanner/apiv1" "github.com/golang/protobuf/proto" + "github.com/googleapis/gax-go/v2" "google.golang.org/api/iterator" sppb "google.golang.org/genproto/googleapis/spanner/v1" + "google.golang.org/grpc" "google.golang.org/grpc/codes" + "google.golang.org/grpc/metadata" "google.golang.org/grpc/status" ) @@ -75,6 +78,9 @@ type txReadOnly struct { // txOpts provides options for a transaction. txOpts TransactionOptions + + // commonTags for opencensus metrics + ct *commonTags } // TransactionOptions provides options for a transaction. @@ -172,7 +178,7 @@ func (t *txReadOnly) ReadWithOptions(ctx context.Context, table string, keys Key contextWithOutgoingMetadata(ctx, sh.getMetadata()), sh.session.logger, func(ctx context.Context, resumeToken []byte) (streamingReceiver, error) { - return client.StreamingRead(ctx, + client, err := client.StreamingRead(ctx, &sppb.ReadRequest{ Session: t.sh.getID(), Transaction: ts, @@ -184,6 +190,16 @@ func (t *txReadOnly) ReadWithOptions(ctx context.Context, table string, keys Key Limit: int64(limit), RequestOptions: createRequestOptions(prio, requestTag, t.txOpts.TransactionTag), }) + if err != nil { + return client, err + } + md, err := client.Header() + if GFELatencyMetricsEnabled && md != nil && t.ct != nil { + if err := createContextAndCaptureGFELatencyMetrics(ctx, t.ct, md, "ReadWithOptions"); err != nil { + trace.TracePrintf(ctx, nil, "Error in recording GFE Latency. Try disabling and rerunning. Error: %v", err) + } + } + return client, err }, t.replaceSessionFunc, t.setTimestamp, @@ -381,7 +397,17 @@ func (t *txReadOnly) query(ctx context.Context, statement Statement, options Que func(ctx context.Context, resumeToken []byte) (streamingReceiver, error) { req.ResumeToken = resumeToken req.Session = t.sh.getID() - return client.ExecuteStreamingSql(ctx, req) + client, err := client.ExecuteStreamingSql(ctx, req) + if err != nil { + return client, err + } + md, err := client.Header() + if GFELatencyMetricsEnabled && md != nil && t.ct != nil { + if err := createContextAndCaptureGFELatencyMetrics(ctx, t.ct, md, "query"); err != nil { + trace.TracePrintf(ctx, nil, "Error in recording GFE Latency. Try disabling and rerunning. Error: %v", err) + } + } + return client, err }, t.replaceSessionFunc, t.setTimestamp, @@ -541,6 +567,7 @@ func (t *ReadOnlyTransaction) begin(ctx context.Context) error { if err != nil { return err } + var md metadata.MD res, err = sh.getClient().BeginTransaction(contextWithOutgoingMetadata(ctx, sh.getMetadata()), &sppb.BeginTransactionRequest{ Session: sh.getID(), Options: &sppb.TransactionOptions{ @@ -548,7 +575,14 @@ func (t *ReadOnlyTransaction) begin(ctx context.Context) error { ReadOnly: buildTransactionOptionsReadOnly(t.getTimestampBound(), true), }, }, - }) + }, gax.WithGRPCOptions(grpc.Header(&md))) + + if GFELatencyMetricsEnabled && md != nil && t.ct != nil { + if err := createContextAndCaptureGFELatencyMetrics(ctx, t.ct, md, "begin_BeginTransaction"); err != nil { + trace.TracePrintf(ctx, nil, "Error in recording GFE Latency. Try disabling and rerunning. Error: %v", err) + } + } + if isSessionNotFoundError(err) { sh.destroy() continue @@ -894,7 +928,14 @@ func (t *ReadWriteTransaction) update(ctx context.Context, stmt Statement, opts if err != nil { return 0, err } - resultSet, err := sh.getClient().ExecuteSql(contextWithOutgoingMetadata(ctx, sh.getMetadata()), req) + var md metadata.MD + resultSet, err := sh.getClient().ExecuteSql(contextWithOutgoingMetadata(ctx, sh.getMetadata()), req, gax.WithGRPCOptions(grpc.Header(&md))) + + if GFELatencyMetricsEnabled && md != nil && t.ct != nil { + if err := createContextAndCaptureGFELatencyMetrics(ctx, t.ct, md, "update"); err != nil { + trace.TracePrintf(ctx, nil, "Error in recording GFE Latency. Try disabling and rerunning. Error: %v", err) + } + } if err != nil { return 0, ToSpannerError(err) } @@ -956,13 +997,20 @@ func (t *ReadWriteTransaction) batchUpdateWithOptions(ctx context.Context, stmts }) } + var md metadata.MD resp, err := sh.getClient().ExecuteBatchDml(contextWithOutgoingMetadata(ctx, sh.getMetadata()), &sppb.ExecuteBatchDmlRequest{ Session: sh.getID(), Transaction: ts, Statements: sppbStmts, Seqno: atomic.AddInt64(&t.sequenceNumber, 1), RequestOptions: createRequestOptions(opts.Priority, opts.RequestTag, t.txOpts.TransactionTag), - }) + }, gax.WithGRPCOptions(grpc.Header(&md))) + + if GFELatencyMetricsEnabled && md != nil && t.ct != nil { + if err := createContextAndCaptureGFELatencyMetrics(ctx, t.ct, md, "batchUpdateWithOptions"); err != nil { + trace.TracePrintf(ctx, nil, "Error in recording GFE Latency. Try disabling and rerunning. Error: %v", ToSpannerError(err)) + } + } if err != nil { return nil, ToSpannerError(err) } @@ -1218,6 +1266,7 @@ func NewReadWriteStmtBasedTransactionWithOptions(ctx context.Context, c *Client, t.txReadOnly.txReadEnv = t t.txReadOnly.qo = c.qo t.txOpts = options + t.ct = c.ct if err = t.begin(ctx); err != nil { if sh != nil {