Skip to content

Commit

Permalink
Support replica materialized view in resource_bigquery_table in the beta provider (#9773)
Browse files Browse the repository at this point in the history

* support replica materialized view in resource_bigquery_table

* fix typo

* add unit tests

* fix white spaces

* fix the condition check for table_replication_info in Update

* create source materialized view in unit tests using DDL instead of table API

* add missing google-beta provider declarations in unit test

* create source table using DDL in unit test

* Revert "create source table using DDL in unit test"

This reverts commit 232a7ac.

* reorder input args in unit tests

* remove wrong project declaration in unit tests

* remove update check, make the new fields forcenew, and add default to replication_interval_ms

* add ForceNew to table_replication_info
  • Loading branch information
wj-chen authored Jan 17, 2024
1 parent 5d303ae commit 256f7f5
Show file tree
Hide file tree
Showing 2 changed files with 439 additions and 49 deletions.
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
<% autogen_exception -%>
package bigquery

import (
Expand Down Expand Up @@ -528,7 +529,7 @@ func ResourceBigQueryTable() *schema.Resource {
Default: "NONE",
Description: `The compression type of the data source. Valid values are "NONE" or "GZIP".`,
},
// Schema: Optional] The schema for the data.
// Schema: [Optional] The schema for the data.
// Schema is required for CSV and JSON formats if autodetect is not on.
// Schema is disallowed for Google Cloud Bigtable, Cloud Datastore backups, Avro, Iceberg, ORC, and Parquet formats.
"schema": {
Expand Down Expand Up @@ -887,7 +888,7 @@ func ResourceBigQueryTable() *schema.Resource {
Type: schema.TypeInt,
Default: 1800000,
Optional: true,
Description: `Specifies maximum frequency at which this materialized view will be refreshed. The default is 1800000`,
Description: `Specifies maximum frequency at which this materialized view will be refreshed. The default is 1800000.`,
},

"allow_non_incremental_definition": {
Expand Down Expand Up @@ -1250,6 +1251,45 @@ func ResourceBigQueryTable() *schema.Resource {
},
},
},
<% unless version == 'ga' -%>
// TableReplicationInfo: [Optional] Replication info of a table created using `AS REPLICA` DDL like: `CREATE MATERIALIZED VIEW mv1 AS REPLICA OF src_mv`.
"table_replication_info": {
Type: schema.TypeList,
Optional: true,
ForceNew: true,
MaxItems: 1,
Description: `Replication info of a table created using "AS REPLICA" DDL like: "CREATE MATERIALIZED VIEW mv1 AS REPLICA OF src_mv".`,
Elem: &schema.Resource{
Schema: map[string]*schema.Schema{
"source_project_id": {
Type: schema.TypeString,
Required: true,
ForceNew: true,
Description: `The ID of the source project.`,
},
"source_dataset_id": {
Type: schema.TypeString,
Required: true,
ForceNew: true,
Description: `The ID of the source dataset.`,
},
"source_table_id": {
Type: schema.TypeString,
Required: true,
ForceNew: true,
Description: `The ID of the source materialized view.`,
},
"replication_interval_ms": {
Type: schema.TypeInt,
Default: 300000,
Optional: true,
ForceNew: true,
Description: `The interval at which the source materialized view is polled for updates. The default is 300000.`,
},
},
},
},
<% end -%>
},
UseJSONNumber: true,
}
Expand Down Expand Up @@ -1386,9 +1426,51 @@ func resourceBigQueryTableCreate(d *schema.ResourceData, meta interface{}) error

datasetID := d.Get("dataset_id").(string)

<% unless version == 'ga' -%>
if v, ok := d.GetOk("table_replication_info"); ok {
if table.Schema != nil || table.View != nil || table.MaterializedView != nil {
return errors.New("Schema, view, or materialized view cannot be specified when table replication info is present")
}

replicationDDL := fmt.Sprintf("CREATE MATERIALIZED VIEW %s.%s.%s", d.Get("project").(string), d.Get("dataset_id").(string), d.Get("table_id").(string))

tableReplicationInfo := expandTableReplicationInfo(v)
replicationIntervalMs := tableReplicationInfo["replication_interval_ms"].(int64)
if replicationIntervalMs > 0 {
replicationIntervalSeconds := replicationIntervalMs / 1000
replicationDDL = fmt.Sprintf("%s OPTIONS(replication_interval_seconds=%d)", replicationDDL, replicationIntervalSeconds)
}

replicationDDL = fmt.Sprintf("%s AS REPLICA OF %s.%s.%s", replicationDDL, tableReplicationInfo["source_project_id"], tableReplicationInfo["source_dataset_id"], tableReplicationInfo["source_table_id"])
useLegacySQL := false

req := &bigquery.QueryRequest{
Query: replicationDDL,
UseLegacySql: &useLegacySQL,
}

log.Printf("[INFO] Creating a replica materialized view with DDL: '%s'", replicationDDL)

_, err := config.NewBigQueryClient(userAgent).Jobs.Query(project, req).Do()

id := fmt.Sprintf("projects/%s/datasets/%s/tables/%s", project, datasetID, d.Get("table_id").(string))
if err != nil {
if deleteErr := resourceBigQueryTableDelete(d, meta); deleteErr != nil {
log.Printf("[INFO] Unable to clean up table %s: %s", id, deleteErr)
}
return err
}

log.Printf("[INFO] BigQuery table %s has been created", id)
d.SetId(id)

return resourceBigQueryTableRead(d, meta)
}

<% end -%>
if table.View != nil && table.Schema != nil {

log.Printf("[INFO] Removing schema from table definition because big query does not support setting schema on view creation")
log.Printf("[INFO] Removing schema from table definition because BigQuery does not support setting schema on view creation")
schemaBack := table.Schema
table.Schema = nil

Expand All @@ -1408,7 +1490,7 @@ func resourceBigQueryTableCreate(d *schema.ResourceData, meta interface{}) error
return err
}

log.Printf("[INFO] BigQuery table %s has been update with schema", res.Id)
log.Printf("[INFO] BigQuery table %s has been updated with schema", res.Id)
} else {
log.Printf("[INFO] Creating BigQuery table: %s", table.TableReference.TableId)

Expand Down Expand Up @@ -1597,6 +1679,34 @@ func resourceBigQueryTableRead(d *schema.ResourceData, meta interface{}) error {
}
}

<% unless version == 'ga' -%>
// TODO: Update when the Get API fields for TableReplicationInfo are available in the client library.
url, err := tpgresource.ReplaceVars(d, config, "{{BigQueryBasePath}}projects/{{project}}/datasets/{{dataset_id}}/tables/{{table_id}}")
if err != nil {
return err
}

log.Printf("[INFO] Reading BigQuery table through API: %s", url)

getRes, err := transport_tpg.SendRequest(transport_tpg.SendRequestOptions{
Config: config,
Method: "GET",
RawURL: url,
UserAgent: userAgent,
})
if err != nil {
return err
}

if v, ok := getRes["tableReplicationInfo"]; ok {
tableReplicationInfo := flattenTableReplicationInfo(v.(map[string]interface{}))

if err := d.Set("table_replication_info", tableReplicationInfo); err != nil {
return fmt.Errorf("Error setting table replication info: %s", err)
}
}

<% end -%>
return nil
}

Expand Down Expand Up @@ -2387,6 +2497,58 @@ func flattenTableConstraints(edc *bigquery.TableConstraints) []map[string]interf
return []map[string]interface{}{result}
}

<% unless version == 'ga' -%>
// expandTableReplicationInfo converts the Terraform config representation of
// the table_replication_info block (a single-element list containing one
// attribute map) into a flat map keyed by attribute name. The interval is
// coerced from the schema's int to int64 for use with the BigQuery API.
func expandTableReplicationInfo(cfg interface{}) map[string]interface{} {
	attrs := cfg.([]interface{})[0].(map[string]interface{})
	expanded := make(map[string]interface{})

	// The three source identifiers are plain strings; copy each one that is
	// present in the config map.
	for _, key := range []string{"source_project_id", "source_dataset_id", "source_table_id"} {
		if raw, present := attrs[key]; present {
			expanded[key] = raw.(string)
		}
	}

	if raw, present := attrs["replication_interval_ms"]; present {
		expanded["replication_interval_ms"] = int64(raw.(int))
	}

	return expanded
}

// flattenTableReplicationInfo converts the raw API representation of
// tableReplicationInfo into the single-element list form used by the
// "table_replication_info" schema attribute.
//
// The API returns replicationIntervalMs as a string-encoded 64-bit integer,
// so it is parsed with strconv.ParseInt using bitSize 64 — strconv.Atoi
// would go through platform-sized int and could truncate on 32-bit builds.
// A malformed interval is skipped rather than failing the read, preserving
// the best-effort behavior for the optional field.
func flattenTableReplicationInfo(tableReplicationInfo map[string]interface{}) []map[string]interface{} {
	result := map[string]interface{}{}

	if v, ok := tableReplicationInfo["sourceTable"]; ok {
		sourceTable := v.(map[string]interface{})
		if v, ok := sourceTable["projectId"]; ok {
			result["source_project_id"] = v.(string)
		}
		if v, ok := sourceTable["datasetId"]; ok {
			result["source_dataset_id"] = v.(string)
		}
		if v, ok := sourceTable["tableId"]; ok {
			result["source_table_id"] = v.(string)
		}
	}

	if v, ok := tableReplicationInfo["replicationIntervalMs"]; ok {
		if ms, err := strconv.ParseInt(v.(string), 10, 64); err == nil {
			result["replication_interval_ms"] = ms
		}
	}

	return []map[string]interface{}{result}
}

<% end -%>
func resourceBigQueryTableImport(d *schema.ResourceData, meta interface{}) ([]*schema.ResourceData, error) {
config := meta.(*transport_tpg.Config)
if err := tpgresource.ParseImportId([]string{
Expand Down
Loading

0 comments on commit 256f7f5

Please sign in to comment.