Skip to content

Commit

Permalink
feat(bigtable): support update column family's value type to non-aggr…
Browse files Browse the repository at this point in the history
…egate type (#10410)

* feat(bigtable): support update column family's value type to non-aggregate type

* Fix public method doc.

* Add integration test.

* Add nil check for type.

* Add gcpolicy nil check.
  • Loading branch information
hoangpham95 authored Jul 1, 2024
1 parent 82f661d commit df60464
Show file tree
Hide file tree
Showing 3 changed files with 115 additions and 0 deletions.
31 changes: 31 additions & 0 deletions bigtable/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -686,6 +686,9 @@ func IgnoreWarnings() GCPolicyOption {
}

func (ac *AdminClient) setGCPolicy(ctx context.Context, table, family string, policy GCPolicy, opts ...GCPolicyOption) error {
if policy == nil {
return fmt.Errorf("policy cannot be nil")
}
ctx = mergeOutgoingMetadata(ctx, ac.md)
prefix := ac.instancePrefix()

Expand All @@ -707,13 +710,41 @@ func (ac *AdminClient) setGCPolicy(ctx context.Context, table, family string, po
return err
}

func (ac *AdminClient) setValueTypeImpl(ctx context.Context, table, family string, valueType Type) error {
if valueType == nil {
return fmt.Errorf("value type must be non nil")
}
if _, ok := valueType.proto().GetKind().(*btapb.Type_AggregateType); ok {
return fmt.Errorf("update family value type to aggregate type is unsupported")
}

ctx = mergeOutgoingMetadata(ctx, ac.md)
prefix := ac.instancePrefix()

req := &btapb.ModifyColumnFamiliesRequest{
Name: prefix + "/tables/" + table,
Modifications: []*btapb.ModifyColumnFamiliesRequest_Modification{{
Id: family,
Mod: &btapb.ModifyColumnFamiliesRequest_Modification_Update{Update: &btapb.ColumnFamily{ValueType: valueType.proto()}},
}},
}
_, err := ac.tClient.ModifyColumnFamilies(ctx, req)
return err
}

// SetGCPolicy specifies which cells in a column family should be garbage collected.
// GC executes opportunistically in the background; table reads may return data
// matching the GC policy.
func (ac *AdminClient) SetGCPolicy(ctx context.Context, table, family string, policy GCPolicy) error {
return ac.SetGCPolicyWithOptions(ctx, table, family, policy)
}

// SetValueType specifies the type of all values in a column family. Currently,
// only non-aggregate type is supported.
func (ac *AdminClient) SetValueType(ctx context.Context, table, family string, valueType Type) error {
return ac.setValueTypeImpl(ctx, table, family, valueType)
}

// SetGCPolicyWithOptions is similar to SetGCPolicy but allows passing options
func (ac *AdminClient) SetGCPolicyWithOptions(ctx context.Context, table, family string, policy GCPolicy, opts ...GCPolicyOption) error {
return ac.setGCPolicy(ctx, table, family, policy, opts...)
Expand Down
32 changes: 32 additions & 0 deletions bigtable/admin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -410,6 +410,38 @@ func TestTableAdmin_SetGcPolicy(t *testing.T) {
}
}

func TestTableAdmin_SetType(t *testing.T) {
for _, test := range []struct {
desc string
familyType Type
hasError bool
}{
{
desc: "Update with aggregate type failed",
familyType: AggregateType{Input: Int64Type{Encoding: BigEndianBytesEncoding{}}, Aggregator: SumAggregator{}},
hasError: true,
},
{
desc: "Update with string type",
familyType: StringType{Encoding: StringUtf8Encoding{}},
hasError: false,
},
{
desc: "Update with nil type",
familyType: nil,
hasError: true,
},
} {
mock := &mockTableAdminClock{}
c := setupTableClient(t, mock)

err := c.SetValueType(context.Background(), "My-table", "cf1", test.familyType)
if err != nil && !test.hasError {
t.Fatalf("Unexpected error when setting type: %v", err)
}
}
}

func TestTableAdmin_CreateAuthorizedView_DeletionProtection_Protected(t *testing.T) {
mock := &mockTableAdminClock{}
c := setupTableClient(t, mock)
Expand Down
52 changes: 52 additions & 0 deletions bigtable/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3962,6 +3962,58 @@ func TestIntegration_DataAuthorizedView(t *testing.T) {
}
}

func TestIntegration_TestUpdateColumnFamilyValueType(t *testing.T) {
testEnv, err := NewIntegrationEnv()
if err != nil {
t.Fatalf("IntegrationEnv: %v", err)
}
defer testEnv.Close()

if !testEnv.Config().UseProd {
t.Skip("emulator doesn't support update column family operation")
}

timeout := 15 * time.Minute
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()

adminClient, err := testEnv.NewAdminClient()
if err != nil {
t.Fatalf("NewAdminClient: %v", err)
}
defer adminClient.Close()

tblConf := TableConf{
TableID: testEnv.Config().Table,
ColumnFamilies: map[string]Family{
"cf": {
GCPolicy: MaxVersionsPolicy(1),
},
},
}
if err := adminClient.CreateTableFromConf(ctx, &tblConf); err != nil {
t.Fatalf("Create table from TableConf error: %v", err)
}
// Clean-up
defer deleteTable(ctx, t, adminClient, tblConf.TableID)

// Update column family type to aggregate type should not be successful
err = adminClient.SetValueType(ctx, tblConf.TableID, "cf", AggregateType{
Input: Int64Type{}, Aggregator: SumAggregator{},
})
if err == nil {
t.Fatalf("Update family type to aggregate type should not be successful")
}

// Update column family type to string type should be successful
err = adminClient.SetValueType(ctx, tblConf.TableID, "cf", StringType{
Encoding: StringUtf8Encoding{},
})
if err != nil {
t.Fatalf("Failed to update value type of family: %v", err)
}
}

// TestIntegration_DirectPathFallback tests the CFE fallback when the directpath net is blackholed.
func TestIntegration_DirectPathFallback(t *testing.T) {
ctx := context.Background()
Expand Down

0 comments on commit df60464

Please sign in to comment.