Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

update google_dataflow_flex_template_job to send sdkpipeline parameters via environment field #6357

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions .changelog/9031.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
```release-note:bug
`dataflow`: fixed permadiff when SdkPipeline values are supplied via parameters.
```
```release-note:bug
`dataflow`: fixed max_workers read value permanently displaying as 0.
```
```release-note:bug
`dataflow`: fixed issue causing error message when max_workers and num_workers were supplied via parameters.
```
241 changes: 220 additions & 21 deletions google-beta/services/dataflow/resource_dataflow_flex_template_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"context"
"fmt"
"log"
"strconv"
"strings"
"time"

Expand Down Expand Up @@ -111,6 +112,7 @@ func ResourceDataflowFlexTemplateJob() *schema.Resource {
Optional: true,
// ForceNew applies to both stream and batch jobs
ForceNew: true,
Computed: true,
Description: `The initial number of Google Compute Engine instances for the job.`,
},

Expand All @@ -119,6 +121,7 @@ func ResourceDataflowFlexTemplateJob() *schema.Resource {
Optional: true,
// ForceNew applies to both stream and batch jobs
ForceNew: true,
Computed: true,
Description: `The maximum number of Google Compute Engine instances to be made available to your pipeline during execution, from 1 to 1000.`,
},

Expand Down Expand Up @@ -146,32 +149,37 @@ func ResourceDataflowFlexTemplateJob() *schema.Resource {
"sdk_container_image": {
Type: schema.TypeString,
Optional: true,
Computed: true,
Description: `Docker registry location of container image to use for the 'worker harness. Default is the container for the version of the SDK. Note this field is only valid for portable pipelines.`,
},

"network": {
Type: schema.TypeString,
Optional: true,
Computed: true,
DiffSuppressFunc: tpgresource.CompareSelfLinkOrResourceName,
Description: `The network to which VMs will be assigned. If it is not provided, "default" will be used.`,
},

"subnetwork": {
Type: schema.TypeString,
Optional: true,
Computed: true,
DiffSuppressFunc: tpgresource.CompareSelfLinkOrResourceName,
Description: `The subnetwork to which VMs will be assigned. Should be of the form "regions/REGION/subnetworks/SUBNETWORK".`,
},

"machine_type": {
Type: schema.TypeString,
Optional: true,
Computed: true,
Description: `The machine type to use for the job.`,
},

"kms_key_name": {
Type: schema.TypeString,
Optional: true,
Computed: true,
Description: `The name for the Cloud KMS key for the job. Key format is: projects/PROJECT_ID/locations/LOCATION/keyRings/KEY_RING/cryptoKeys/KEY`,
},

Expand Down Expand Up @@ -202,12 +210,14 @@ func ResourceDataflowFlexTemplateJob() *schema.Resource {
"autoscaling_algorithm": {
Type: schema.TypeString,
Optional: true,
Computed: true,
Description: `The algorithm to use for autoscaling`,
},

"launcher_machine_type": {
Type: schema.TypeString,
Optional: true,
Computed: true,
Description: `The machine type to use for launching the job. The default is n1-standard-1.`,
},

Expand Down Expand Up @@ -240,7 +250,7 @@ func resourceDataflowFlexTemplateJobCreate(d *schema.ResourceData, meta interfac
return err
}

env, err := resourceDataflowFlexJobSetupEnv(d, config)
env, updatedParameters, err := resourceDataflowFlexJobSetupEnv(d, config)
if err != nil {
return err
}
Expand All @@ -249,7 +259,7 @@ func resourceDataflowFlexTemplateJobCreate(d *schema.ResourceData, meta interfac
LaunchParameter: &dataflow.LaunchFlexTemplateParameter{
ContainerSpecGcsPath: d.Get("container_spec_gcs_path").(string),
JobName: d.Get("name").(string),
Parameters: tpgresource.ExpandStringMap(d, "parameters"),
Parameters: updatedParameters,
Environment: &env,
},
}
Expand All @@ -275,29 +285,92 @@ func resourceDataflowFlexTemplateJobCreate(d *schema.ResourceData, meta interfac
return resourceDataflowFlexTemplateJobRead(d, meta)
}

func resourceDataflowFlexJobSetupEnv(d *schema.ResourceData, config *transport_tpg.Config) (dataflow.FlexTemplateRuntimeEnvironment, error) {
func resourceDataflowFlexJobSetupEnv(d *schema.ResourceData, config *transport_tpg.Config) (dataflow.FlexTemplateRuntimeEnvironment, map[string]string, error) {

updatedParameters := tpgresource.ExpandStringMap(d, "parameters")

additionalExperiments := tpgresource.ConvertStringSet(d.Get("additional_experiments").(*schema.Set))

var autoscalingAlgorithm string
autoscalingAlgorithm, updatedParameters = dataflowFlexJobTypeTransferVar("autoscaling_algorithm", "autoscalingAlgorithm", updatedParameters, d)

var numWorkers int
if p, ok := d.GetOk("parameters.numWorkers"); ok {
number, err := strconv.Atoi(p.(string))
if err != nil {
return dataflow.FlexTemplateRuntimeEnvironment{}, updatedParameters, fmt.Errorf("parameters.numWorkers must have a valid integer assigned to it, current value is %s", p.(string))
}
delete(updatedParameters, "numWorkers")
numWorkers = number
} else {
if v, ok := d.GetOk("num_workers"); ok {
numWorkers = v.(int)
}
}

var maxNumWorkers int
if p, ok := d.GetOk("parameters.maxNumWorkers"); ok {
number, err := strconv.Atoi(p.(string))
if err != nil {
return dataflow.FlexTemplateRuntimeEnvironment{}, updatedParameters, fmt.Errorf("parameters.maxNumWorkers must have a valid integer assigned to it, current value is %s", p.(string))
}
delete(updatedParameters, "maxNumWorkers")
maxNumWorkers = number
} else {
if v, ok := d.GetOk("max_workers"); ok {
maxNumWorkers = v.(int)
}
}

network, updatedParameters := dataflowFlexJobTypeTransferVar("network", "network", updatedParameters, d)

serviceAccountEmail, updatedParameters := dataflowFlexJobTypeTransferVar("service_account_email", "serviceAccountEmail", updatedParameters, d)

subnetwork, updatedParameters := dataflowFlexJobTypeTransferVar("subnetwork", "subnetwork", updatedParameters, d)

tempLocation, updatedParameters := dataflowFlexJobTypeTransferVar("temp_location", "tempLocation", updatedParameters, d)

stagingLocation, updatedParameters := dataflowFlexJobTypeTransferVar("staging_location", "stagingLocation", updatedParameters, d)

machineType, updatedParameters := dataflowFlexJobTypeTransferVar("machine_type", "workerMachineType", updatedParameters, d)

kmsKeyName, updatedParameters := dataflowFlexJobTypeTransferVar("kms_key_name", "kmsKeyName", updatedParameters, d)

ipConfiguration, updatedParameters := dataflowFlexJobTypeTransferVar("ip_configuration", "ipConfiguration", updatedParameters, d)

var enableStreamingEngine bool
if p, ok := d.GetOk("parameters.enableStreamingEngine"); ok {
delete(updatedParameters, "enableStreamingEngine")
enableStreamingEngine = p.(bool)
} else {
if v, ok := d.GetOk("enable_streaming_engine"); ok {
enableStreamingEngine = v.(bool)
}
}

sdkContainerImage, updatedParameters := dataflowFlexJobTypeTransferVar("sdk_container_image", "sdkContainerImage", updatedParameters, d)

launcherMachineType, updatedParameters := dataflowFlexJobTypeTransferVar("launcher_machine_type", "launcherMachineType", updatedParameters, d)

env := dataflow.FlexTemplateRuntimeEnvironment{
AdditionalUserLabels: tpgresource.ExpandStringMap(d, "labels"),
AutoscalingAlgorithm: d.Get("autoscaling_algorithm").(string),
NumWorkers: int64(d.Get("num_workers").(int)),
MaxWorkers: int64(d.Get("max_workers").(int)),
Network: d.Get("network").(string),
ServiceAccountEmail: d.Get("service_account_email").(string),
Subnetwork: d.Get("subnetwork").(string),
TempLocation: d.Get("temp_location").(string),
StagingLocation: d.Get("staging_location").(string),
MachineType: d.Get("machine_type").(string),
KmsKeyName: d.Get("kms_key_name").(string),
IpConfiguration: d.Get("ip_configuration").(string),
EnableStreamingEngine: d.Get("enable_streaming_engine").(bool),
AutoscalingAlgorithm: autoscalingAlgorithm,
NumWorkers: int64(numWorkers),
MaxWorkers: int64(maxNumWorkers),
Network: network,
ServiceAccountEmail: serviceAccountEmail,
Subnetwork: subnetwork,
TempLocation: tempLocation,
StagingLocation: stagingLocation,
MachineType: machineType,
KmsKeyName: kmsKeyName,
IpConfiguration: ipConfiguration,
EnableStreamingEngine: enableStreamingEngine,
AdditionalExperiments: additionalExperiments,
SdkContainerImage: d.Get("sdk_container_image").(string),
LauncherMachineType: d.Get("launcher_machine_type").(string),
SdkContainerImage: sdkContainerImage,
LauncherMachineType: launcherMachineType,
}
return env, nil
return env, updatedParameters, nil
}

// resourceDataflowFlexTemplateJobRead reads a Flex Template Job resource.
Expand Down Expand Up @@ -365,7 +438,7 @@ func resourceDataflowFlexTemplateJobRead(d *schema.ResourceData, meta interface{
if err := d.Set("num_workers", optionsMap["numWorkers"]); err != nil {
return fmt.Errorf("Error setting num_workers: %s", err)
}
if err := d.Set("max_workers", optionsMap["maxWorkers"]); err != nil {
if err := d.Set("max_workers", optionsMap["maxNumWorkers"]); err != nil {
return fmt.Errorf("Error setting max_workers: %s", err)
}
if err := d.Set("staging_location", optionsMap["stagingLocation"]); err != nil {
Expand Down Expand Up @@ -456,7 +529,7 @@ func resourceDataflowFlexTemplateJobUpdate(d *schema.ResourceData, meta interfac

tnamemapping := tpgresource.ExpandStringMap(d, "transform_name_mapping")

env, err := resourceDataflowFlexJobSetupEnv(d, config)
env, updatedParameters, err := resourceDataflowFlexJobSetupEnv(d, config)
if err != nil {
return err
}
Expand All @@ -472,7 +545,7 @@ func resourceDataflowFlexTemplateJobUpdate(d *schema.ResourceData, meta interfac

ContainerSpecGcsPath: d.Get("container_spec_gcs_path").(string),
JobName: d.Get("name").(string),
Parameters: tpgresource.ExpandStringMap(d, "parameters"),
Parameters: updatedParameters,
TransformNameMappings: tnamemapping,
Environment: &env,
Update: true,
Expand Down Expand Up @@ -585,6 +658,100 @@ func resourceDataflowFlexTemplateJobDelete(d *schema.ResourceData, meta interfac
}

func resourceDataflowFlexJobTypeCustomizeDiff(_ context.Context, d *schema.ResourceDiff, meta interface{}) error {

err := dataflowFlexJobTypeParameterOverride("autoscaling_algorithm", "autoscalingAlgorithm", d)
if err != nil {
return err
}

if p, ok := d.GetOk("parameters.numWorkers"); ok {
if d.HasChange("num_workers") {
e := d.Get("num_workers")
return fmt.Errorf("Error setting num_workers, value is supplied twice: num_workers=%d, parameters.numWorkers=%d", e.(int), p.(int))
} else {
p := d.Get("parameters.numWorkers")
number, err := strconv.Atoi(p.(string))
if err != nil {
return fmt.Errorf("parameters.maxNumWorkers must have a valid integer assigned to it, current value is %s", p.(string))
}
d.SetNew("num_workers", number)
}
}

if p, ok := d.GetOk("parameters.maxNumWorkers"); ok {
if d.HasChange("max_workers") {
e := d.Get("max_workers")
return fmt.Errorf("Error setting max_workers, value is supplied twice: max_workers=%d, parameters.maxNumWorkers=%d", e.(int), p.(int))
} else {
p := d.Get("parameters.maxNumWorkers")
number, err := strconv.Atoi(p.(string))
if err != nil {
return fmt.Errorf("parameters.maxNumWorkers must have a valid integer assigned to it, current value is %s", p.(string))
}
d.SetNew("max_workers", number)
}
}

err = dataflowFlexJobTypeParameterOverride("network", "network", d)
if err != nil {
return err
}

err = dataflowFlexJobTypeParameterOverride("service_account_email", "serviceAccountEmail", d)
if err != nil {
return err
}

err = dataflowFlexJobTypeParameterOverride("subnetwork", "subnetwork", d)
if err != nil {
return err
}

err = dataflowFlexJobTypeParameterOverride("temp_location", "tempLocation", d)
if err != nil {
return err
}

err = dataflowFlexJobTypeParameterOverride("staging_location", "stagingLocation", d)
if err != nil {
return err
}

err = dataflowFlexJobTypeParameterOverride("machine_type", "workerMachineType", d)
if err != nil {
return err
}

err = dataflowFlexJobTypeParameterOverride("kms_key_name", "kmsKeyName", d)
if err != nil {
return err
}

err = dataflowFlexJobTypeParameterOverride("ip_configuration", "ipConfiguration", d)
if err != nil {
return err
}

if p, ok := d.GetOk("parameters.enableStreamingEngine"); ok {
if d.HasChange("enable_streaming_engine") {
e := d.Get("enable_streaming_engine")
return fmt.Errorf("Error setting enable_streaming_engine, value is supplied twice: enable_streaming_engine=%t, parameters.enableStreamingEngine=%t", e.(bool), p.(bool))
} else {
p := d.Get("parameters.enableStreamingEngine")
d.SetNew("enable_streaming_engine", p.(string))
}
}

err = dataflowFlexJobTypeParameterOverride("sdk_container_image", "sdkContainerImage", d)
if err != nil {
return err
}

err = dataflowFlexJobTypeParameterOverride("launcher_machine_type", "launcherMachineType", d)
if err != nil {
return err
}

// All non-virtual fields are ForceNew for batch jobs
if d.Get("type") == "JOB_TYPE_BATCH" {
resourceSchema := ResourceDataflowFlexTemplateJob().Schema
Expand All @@ -607,3 +774,35 @@ func resourceDataflowFlexJobTypeCustomizeDiff(_ context.Context, d *schema.Resou

return nil
}

func dataflowFlexJobTypeTransferVar(ename, pname string, updatedParameters map[string]string, d *schema.ResourceData) (string, map[string]string) {

pstring := fmt.Sprintf("parameters.%s", pname)

if p, ok := d.GetOk(pstring); ok {
delete(updatedParameters, pname)
return p.(string), updatedParameters
} else {
if v, ok := d.GetOk(ename); ok {
return v.(string), updatedParameters
} else {
return "", updatedParameters
}
}
}

func dataflowFlexJobTypeParameterOverride(ename, pname string, d *schema.ResourceDiff) error {

pstring := fmt.Sprintf("parameters.%s", pname)

if p, ok := d.GetOk(pstring); ok {
if d.HasChange(ename) {
e := d.Get(ename)
return fmt.Errorf("Error setting %s, value is supplied twice: %s=\"%s\", %s=\"%s\"", ename, ename, e.(string), pstring, p.(string))
} else {
p := d.Get(pstring)
d.SetNew(ename, p.(string))
}
}
return nil
}
Loading