Skip to content

Commit

Permalink
Merge branch 'main' into spanner_read_row_with_options
Browse files Browse the repository at this point in the history
  • Loading branch information
rahul2393 authored Dec 22, 2021
2 parents 1605b95 + 0a81fbc commit b81a887
Show file tree
Hide file tree
Showing 7 changed files with 67 additions and 24 deletions.
21 changes: 0 additions & 21 deletions bigquery/storage/managedwriter/appendresult.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
storagepb "google.golang.org/genproto/googleapis/cloud/bigquery/storage/v1"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/descriptorpb"
"google.golang.org/protobuf/types/known/wrapperspb"
)

// NoStreamOffset is a sentinel value for signalling we're not tracking
Expand Down Expand Up @@ -114,23 +113,3 @@ func (pw *pendingWrite) markDone(startOffset int64, err error, fc *flowControlle
fc.release(pw.reqSize)
}
}

// AppendOption are options that can be passed when appending data with a managed stream instance.
type AppendOption func(*pendingWrite)

// UpdateSchemaDescriptor is used to update the descriptor message schema associated
// with a given stream.
func UpdateSchemaDescriptor(schema *descriptorpb.DescriptorProto) AppendOption {
return func(pw *pendingWrite) {
pw.newSchema = schema
}
}

// WithOffset sets an explicit offset value for this append request.
func WithOffset(offset int64) AppendOption {
return func(pw *pendingWrite) {
pw.request.Offset = &wrapperspb.Int64Value{
Value: offset,
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package managedwriter
import (
"github.com/googleapis/gax-go/v2"
"google.golang.org/protobuf/types/descriptorpb"
"google.golang.org/protobuf/types/known/wrapperspb"
)

// WriterOption are variadic options used to configure a ManagedStream instance.
Expand Down Expand Up @@ -96,3 +97,23 @@ func WithAppendRowsCallOption(o gax.CallOption) WriterOption {
ms.callOptions = append(ms.callOptions, o)
}
}

// AppendOption are options that can be passed when appending data with a managed stream instance.
type AppendOption func(*pendingWrite)

// UpdateSchemaDescriptor is used to update the descriptor message schema associated
// with a given stream.
func UpdateSchemaDescriptor(schema *descriptorpb.DescriptorProto) AppendOption {
return func(pw *pendingWrite) {
pw.newSchema = schema
}
}

// WithOffset sets an explicit offset value for this append request.
func WithOffset(offset int64) AppendOption {
return func(pw *pendingWrite) {
pw.request.Offset = &wrapperspb.Int64Value{
Value: offset,
}
}
}
34 changes: 34 additions & 0 deletions spanner/spannertest/funcs.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,40 @@ var functions = map[string]function{
return timestamp, spansql.Type{Base: spansql.Timestamp}, nil
},
},
"FARM_FINGERPRINT": {
Eval: func(values []interface{}, types []spansql.Type) (interface{}, spansql.Type, error) {
// Check input values first.
if len(values) != 1 {
return nil, spansql.Type{}, status.Error(codes.InvalidArgument, "No matching signature for function FARM_FINGERPRINT for the given argument types")
}
if values[0] == nil {
return int64(1), spansql.Type{Base: spansql.Int64}, nil
}
if _, ok := values[0].(string); !ok {
return nil, spansql.Type{}, status.Error(codes.InvalidArgument, "No matching signature for function FARM_FINGERPRINT for the given argument types")
}
// This function currently has no implementation and always returns
// same value, as it would otherwise require an fingerprint function
return int64(1), spansql.Type{Base: spansql.Int64}, nil
},
},
"MOD": {
Eval: func(values []interface{}, types []spansql.Type) (interface{}, spansql.Type, error) {
// Check input values first.
if len(values) != 2 {
return nil, spansql.Type{}, status.Error(codes.InvalidArgument, "No matching signature for function MOD for the given argument types")
}
x, okArg1 := values[0].(int64)
y, okArg2 := values[1].(int64)
if !(okArg1 && okArg2) {
return nil, spansql.Type{}, status.Error(codes.InvalidArgument, "No matching signature for function MOD for the given argument types")
}
if y == 0 {
return nil, spansql.Type{}, status.Error(codes.OutOfRange, "Division by zero")
}
return x % y, spansql.Type{Base: spansql.Int64}, nil
},
},
}

func cast(values []interface{}, types []spansql.Type, safe bool) (interface{}, spansql.Type, error) {
Expand Down
4 changes: 2 additions & 2 deletions spanner/spannertest/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -763,9 +763,9 @@ func TestIntegration_ReadsAndQueries(t *testing.T) {
}{
{

`SELECT 17, "sweet", TRUE AND FALSE, NULL, B"hello", STARTS_WITH('Foo', 'B'), STARTS_WITH('Bar', 'B'), CAST(17 AS STRING), SAFE_CAST(TRUE AS STRING), SAFE_CAST('Foo' AS INT64), EXTRACT(DATE FROM TIMESTAMP('2008-12-25T05:30:00Z') AT TIME ZONE 'Europe/Amsterdam'), EXTRACT(YEAR FROM TIMESTAMP('2008-12-25T05:30:00Z'))`,
`SELECT 17, "sweet", TRUE AND FALSE, NULL, B"hello", STARTS_WITH('Foo', 'B'), STARTS_WITH('Bar', 'B'), CAST(17 AS STRING), SAFE_CAST(TRUE AS STRING), SAFE_CAST('Foo' AS INT64), EXTRACT(DATE FROM TIMESTAMP('2008-12-25T05:30:00Z') AT TIME ZONE 'Europe/Amsterdam'), EXTRACT(YEAR FROM TIMESTAMP('2008-12-25T05:30:00Z')), FARM_FINGERPRINT('test'), MOD(5, 10)`,
nil,
[][]interface{}{{int64(17), "sweet", false, nil, []byte("hello"), false, true, "17", "true", nil, civil.Date{Year: 2008, Month: 12, Day: 25}, int64(2008)}},
[][]interface{}{{int64(17), "sweet", false, nil, []byte("hello"), false, true, "17", "true", nil, civil.Date{Year: 2008, Month: 12, Day: 25}, int64(2008), int64(1), int64(5)}},
},
// Check handling of NULL values for the IS operator.
// There was a bug that returned errors for some of these cases.
Expand Down
1 change: 1 addition & 0 deletions spanner/spansql/keywords.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@ var allFuncs = []string{

// Mathematical functions.
"ABS",
"MOD",

// Hash functions.
"FARM_FINGERPRINT",
Expand Down
10 changes: 9 additions & 1 deletion spanner/spansql/parser_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -529,6 +529,7 @@ func TestParseDDL(t *testing.T) {
some_time TIMESTAMP NOT NULL,
number_key INT64 AS (SAFE_CAST(SUBSTR(some_string, 2) AS INT64)) STORED,
generated_date DATE AS (EXTRACT(DATE FROM some_time AT TIME ZONE "CET")) STORED,
shard_id INT64 AS (MOD(FARM_FINGERPRINT(user_id), 19)) STORED,
) PRIMARY KEY(user_id);
-- Trailing comment at end of file.
Expand Down Expand Up @@ -763,6 +764,13 @@ func TestParseDDL(t *testing.T) {
}},
Position: line(71),
},
{
Name: "shard_id", Type: Type{Base: Int64},
Generated: Func{Name: "MOD", Args: []Expr{
Func{Name: "FARM_FINGERPRINT", Args: []Expr{ID("user_id")}}, IntegerLiteral(19),
}},
Position: line(72),
},
},
PrimaryKey: []KeyPart{{Column: "user_id"}},
Position: line(66),
Expand All @@ -789,7 +797,7 @@ func TestParseDDL(t *testing.T) {
{Marker: "--", Isolated: true, Start: line(49), End: line(49), Text: []string{"Table with row deletion policy."}},

// Comment after everything else.
{Marker: "--", Isolated: true, Start: line(74), End: line(74), Text: []string{"Trailing comment at end of file."}},
{Marker: "--", Isolated: true, Start: line(75), End: line(75), Text: []string{"Trailing comment at end of file."}},
}}},
// No trailing comma:
{`ALTER TABLE T ADD COLUMN C2 INT64`, &DDL{Filename: "filename", List: []DDLStmt{
Expand Down

0 comments on commit b81a887

Please sign in to comment.