From df604641b70b4c11f119a427c5d620ca63d610f3 Mon Sep 17 00:00:00 2001 From: Hoang Pham Date: Mon, 1 Jul 2024 14:01:59 -0400 Subject: [PATCH] feat(bigtable): support update column family's value type to non-aggregate type (#10410) * feat(bigtable): support update column family's value type to non-aggregate type * Fix public method doc. * Add integration test. * Add nil check for type. * Add gcpolicy nil check. --- bigtable/admin.go | 31 +++++++++++++++++++++ bigtable/admin_test.go | 32 ++++++++++++++++++++++ bigtable/integration_test.go | 52 ++++++++++++++++++++++++++++++++++++ 3 files changed, 115 insertions(+) diff --git a/bigtable/admin.go b/bigtable/admin.go index 3e5974f1d729..1f1dac23ece2 100644 --- a/bigtable/admin.go +++ b/bigtable/admin.go @@ -686,6 +686,9 @@ func IgnoreWarnings() GCPolicyOption { } func (ac *AdminClient) setGCPolicy(ctx context.Context, table, family string, policy GCPolicy, opts ...GCPolicyOption) error { + if policy == nil { + return fmt.Errorf("policy cannot be nil") + } ctx = mergeOutgoingMetadata(ctx, ac.md) prefix := ac.instancePrefix() @@ -707,6 +710,28 @@ func (ac *AdminClient) setGCPolicy(ctx context.Context, table, family string, po return err } +func (ac *AdminClient) setValueTypeImpl(ctx context.Context, table, family string, valueType Type) error { + if valueType == nil { + return fmt.Errorf("value type must be non nil") + } + if _, ok := valueType.proto().GetKind().(*btapb.Type_AggregateType); ok { + return fmt.Errorf("update family value type to aggregate type is unsupported") + } + + ctx = mergeOutgoingMetadata(ctx, ac.md) + prefix := ac.instancePrefix() + + req := &btapb.ModifyColumnFamiliesRequest{ + Name: prefix + "/tables/" + table, + Modifications: []*btapb.ModifyColumnFamiliesRequest_Modification{{ + Id: family, + Mod: &btapb.ModifyColumnFamiliesRequest_Modification_Update{Update: &btapb.ColumnFamily{ValueType: valueType.proto()}}, + }}, + } + _, err := ac.tClient.ModifyColumnFamilies(ctx, req) + return err +} + // SetGCPolicy specifies which cells in a column family should be garbage collected. // GC executes opportunistically in the background; table reads may return data // matching the GC policy. @@ -714,6 +739,12 @@ func (ac *AdminClient) SetGCPolicy(ctx context.Context, table, family string, po return ac.SetGCPolicyWithOptions(ctx, table, family, policy) } +// SetValueType specifies the type of all values in a column family. Currently, +// only non-aggregate type is supported. +func (ac *AdminClient) SetValueType(ctx context.Context, table, family string, valueType Type) error { + return ac.setValueTypeImpl(ctx, table, family, valueType) +} + // SetGCPolicyWithOptions is similar to SetGCPolicy but allows passing options func (ac *AdminClient) SetGCPolicyWithOptions(ctx context.Context, table, family string, policy GCPolicy, opts ...GCPolicyOption) error { return ac.setGCPolicy(ctx, table, family, policy, opts...) diff --git a/bigtable/admin_test.go b/bigtable/admin_test.go index 3c4f5d00e6ed..6e0497545900 100644 --- a/bigtable/admin_test.go +++ b/bigtable/admin_test.go @@ -410,6 +410,38 @@ func TestTableAdmin_SetGcPolicy(t *testing.T) { } } +func TestTableAdmin_SetType(t *testing.T) { + for _, test := range []struct { + desc string + familyType Type + hasError bool + }{ + { + desc: "Update with aggregate type failed", + familyType: AggregateType{Input: Int64Type{Encoding: BigEndianBytesEncoding{}}, Aggregator: SumAggregator{}}, + hasError: true, + }, + { + desc: "Update with string type", + familyType: StringType{Encoding: StringUtf8Encoding{}}, + hasError: false, + }, + { + desc: "Update with nil type", + familyType: nil, + hasError: true, + }, + } { + mock := &mockTableAdminClock{} + c := setupTableClient(t, mock) + + err := c.SetValueType(context.Background(), "My-table", "cf1", test.familyType) + if err != nil && !test.hasError { + t.Fatalf("Unexpected error when setting type: %v", err) + } + } +} + func TestTableAdmin_CreateAuthorizedView_DeletionProtection_Protected(t *testing.T) { mock := &mockTableAdminClock{} c := setupTableClient(t, mock) diff --git a/bigtable/integration_test.go b/bigtable/integration_test.go index e87b343e6c88..70104c1f1984 100644 --- a/bigtable/integration_test.go +++ b/bigtable/integration_test.go @@ -3962,6 +3962,58 @@ func TestIntegration_DataAuthorizedView(t *testing.T) { } } +func TestIntegration_TestUpdateColumnFamilyValueType(t *testing.T) { + testEnv, err := NewIntegrationEnv() + if err != nil { + t.Fatalf("IntegrationEnv: %v", err) + } + defer testEnv.Close() + + if !testEnv.Config().UseProd { + t.Skip("emulator doesn't support update column family operation") + } + + timeout := 15 * time.Minute + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + + adminClient, err := testEnv.NewAdminClient() + if err != nil { + t.Fatalf("NewAdminClient: %v", err) + } + defer adminClient.Close() + + tblConf := TableConf{ + TableID: testEnv.Config().Table, + ColumnFamilies: map[string]Family{ + "cf": { + GCPolicy: MaxVersionsPolicy(1), + }, + }, + } + if err := adminClient.CreateTableFromConf(ctx, &tblConf); err != nil { + t.Fatalf("Create table from TableConf error: %v", err) + } + // Clean-up + defer deleteTable(ctx, t, adminClient, tblConf.TableID) + + // Update column family type to aggregate type should not be successful + err = adminClient.SetValueType(ctx, tblConf.TableID, "cf", AggregateType{ + Input: Int64Type{}, Aggregator: SumAggregator{}, + }) + if err == nil { + t.Fatalf("Update family type to aggregate type should not be successful") + } + + // Update column family type to string type should be successful + err = adminClient.SetValueType(ctx, tblConf.TableID, "cf", StringType{ + Encoding: StringUtf8Encoding{}, + }) + if err != nil { + t.Fatalf("Failed to update value type of family: %v", err) + } +} + // TestIntegration_DirectPathFallback tests the CFE fallback when the directpath net is blackholed. func TestIntegration_DirectPathFallback(t *testing.T) { ctx := context.Background()