diff --git a/apis/v1alpha1/ack-generate-metadata.yaml b/apis/v1alpha1/ack-generate-metadata.yaml
index 3963f3b..8dc44d7 100644
--- a/apis/v1alpha1/ack-generate-metadata.yaml
+++ b/apis/v1alpha1/ack-generate-metadata.yaml
@@ -1,13 +1,13 @@
 ack_generate_info:
-  build_date: "2024-12-09T17:29:02Z"
-  build_hash: 631aeb190e332addb8379672df6367a0875dce88
-  go_version: go1.23.3
-  version: v0.40.0
-api_directory_checksum: 8c038bde341a2f28283e47ad34e7fd7e1e5c64ba
+  build_date: "2025-02-05T20:05:54Z"
+  build_hash: 509a9991d926c6e13b5f02566443e79ebe994e17
+  go_version: go1.23.5
+  version: v0.41.0-18-g509a999
+api_directory_checksum: 88b2165f4492d336f46f359046ec7a213a8e9169
 api_version: v1alpha1
-aws_sdk_go_version: v1.44.93
+aws_sdk_go_version: v1.32.6
 generator_config_info:
-  file_checksum: 20d49de859809a74502b2e823cfa52f6507d6636
+  file_checksum: f835af6f8d5aa6338537ce41dc53804c04d30a33
   original_file_name: generator.yaml
   last_modification:
     reason: API generation
diff --git a/apis/v1alpha1/enums.go b/apis/v1alpha1/enums.go
index ac2d40b..e741f88 100644
--- a/apis/v1alpha1/enums.go
+++ b/apis/v1alpha1/enums.go
@@ -15,6 +15,12 @@
 
 package v1alpha1
 
+type CertificateProviderType string
+
+const (
+	CertificateProviderType_PEM CertificateProviderType = "PEM"
+)
+
 type ContainerProviderType string
 
 const (
@@ -24,46 +30,53 @@ const (
 type EndpointState string
 
 const (
-	EndpointState_CREATING EndpointState = "CREATING"
 	EndpointState_ACTIVE EndpointState = "ACTIVE"
-	EndpointState_TERMINATING EndpointState = "TERMINATING"
 	EndpointState_TERMINATED EndpointState = "TERMINATED"
 	EndpointState_TERMINATED_WITH_ERRORS EndpointState = "TERMINATED_WITH_ERRORS"
+	EndpointState_TERMINATING EndpointState = "TERMINATING"
 )
 
 type FailureReason string
 
 const (
+	FailureReason_CLUSTER_UNAVAILABLE FailureReason = "CLUSTER_UNAVAILABLE"
 	FailureReason_INTERNAL_ERROR FailureReason = "INTERNAL_ERROR"
 	FailureReason_USER_ERROR FailureReason = "USER_ERROR"
 	FailureReason_VALIDATION_ERROR FailureReason = "VALIDATION_ERROR"
-	FailureReason_CLUSTER_UNAVAILABLE FailureReason = "CLUSTER_UNAVAILABLE"
 )
 
 type JobRunState string
 
 const (
-	JobRunState_PENDING JobRunState = "PENDING"
-	JobRunState_SUBMITTED JobRunState = "SUBMITTED"
-	JobRunState_RUNNING JobRunState = "RUNNING"
-	JobRunState_FAILED JobRunState = "FAILED"
 	JobRunState_CANCELLED JobRunState = "CANCELLED"
 	JobRunState_CANCEL_PENDING JobRunState = "CANCEL_PENDING"
 	JobRunState_COMPLETED JobRunState = "COMPLETED"
+	JobRunState_FAILED JobRunState = "FAILED"
+	JobRunState_PENDING JobRunState = "PENDING"
+	JobRunState_RUNNING JobRunState = "RUNNING"
+	JobRunState_SUBMITTED JobRunState = "SUBMITTED"
 )
 
 type PersistentAppUI string
 
 const (
-	PersistentAppUI_ENABLED PersistentAppUI = "ENABLED"
 	PersistentAppUI_DISABLED PersistentAppUI = "DISABLED"
+	PersistentAppUI_ENABLED PersistentAppUI = "ENABLED"
+)
+
+type TemplateParameterDataType string
+
+const (
+	TemplateParameterDataType_NUMBER TemplateParameterDataType = "NUMBER"
+	TemplateParameterDataType_STRING TemplateParameterDataType = "STRING"
 )
 
 type VirtualClusterState string
 
 const (
+	VirtualClusterState_ARRESTED VirtualClusterState = "ARRESTED"
 	VirtualClusterState_RUNNING VirtualClusterState = "RUNNING"
-	VirtualClusterState_TERMINATING VirtualClusterState = "TERMINATING"
 	VirtualClusterState_TERMINATED VirtualClusterState = "TERMINATED"
-	VirtualClusterState_ARRESTED VirtualClusterState = "ARRESTED"
+	VirtualClusterState_TERMINATING VirtualClusterState = "TERMINATING"
 )
diff --git a/apis/v1alpha1/generator.yaml b/apis/v1alpha1/generator.yaml
index 8dbda59..41993a8 100644
--- a/apis/v1alpha1/generator.yaml
+++ b/apis/v1alpha1/generator.yaml
@@ -4,13 +4,20 @@ ignore:
     - StartJobRunInput.ClientToken
     - JobRun.ConfigurationOverrides
     - StartJobRunInput.ConfigurationOverrides
+    - StartJobRunInput.JobDriver.SparkSqlJobDriver
+    - StartJobRunInput.JobTemplateId
+    - StartJobRunInput.RetryPolicyConfiguration
+    - StartJobRunInput.JobTemplateParameters
+    - CreateVirtualClusterInput.SecurityConfigurationId
   operations: null
   resource_names:
-    # - VirtualCluster
-    # - JobRun
-    - ManagedEndpoint
-  shape_names: null
-model_name: emr-containers
+    # - VirtualCluster
+    # - JobRun
+    - ManagedEndpoint
+    - JobTemplate
+    - SecurityConfiguration
+sdk_names:
+  model_name: emr-containers
 operations:
   DescribeVirtualCluster:
     output_wrapper_field_path: VirtualCluster
@@ -27,9 +34,12 @@ resources:
   VirtualCluster:
     exceptions:
       terminal_codes:
-        - ValidationException
-        - ResourceNotFoundException
-        - InternalServerException
+        - ValidationException
+        - ResourceNotFoundException
+        - InternalServerException
+    hooks:
+      sdk_create_post_build_request:
+        template_path: hooks/virtual_cluster/sdk_create_post_build_request.go.tpl
   JobRun:
     fields:
       Name:
@@ -51,7 +61,7 @@ resources:
           name: STATE
         from:
           operation: DescribeJobRun
-          path: JobRun.State
+          path: JobRun.State
       Id:
         is_read_only: true
         print:
@@ -61,10 +71,10 @@
        type: "string"
         is_immutable: true
         is_required: False
-        compare:
+        compare:
           is_ignored: true
-    # requeue_on_success_seconds is using 15 seconds for ACK to make describe-job-run API call so that it can update fields (ex: State). This is used as default values for now until ACK enables users to configure this value using Helm charts.
-    reconcile:
+    # requeue_on_success_seconds is using 15 seconds for ACK to make describe-job-run API call so that it can update fields (ex: State). This is used as default values for now until ACK enables users to configure this value using Helm charts.
+    reconcile:
       requeue_on_success_seconds: 15
     hooks:
       delta_pre_compare:
@@ -75,4 +85,4 @@ resources:
         template_path: hooks/configuration_overrides/sdk_read_one_pre_set_output.go.tpl
     exceptions:
       terminal_codes:
-        - ValidationException
+        - ValidationException
diff --git a/apis/v1alpha1/job_run.go b/apis/v1alpha1/job_run.go
index 6a2113b..8e05115 100644
--- a/apis/v1alpha1/job_run.go
+++ b/apis/v1alpha1/job_run.go
@@ -28,16 +28,13 @@ import (
 type JobRunSpec struct {
 	ConfigurationOverrides *string `json:"configurationOverrides,omitempty"`
 	// The execution role ARN for the job run.
-	// +kubebuilder:validation:Required
-	ExecutionRoleARN *string `json:"executionRoleARN"`
+	ExecutionRoleARN *string `json:"executionRoleARN,omitempty"`
 	// The job driver for the job run.
-	// +kubebuilder:validation:Required
-	JobDriver *JobDriver `json:"jobDriver"`
+	JobDriver *JobDriver `json:"jobDriver,omitempty"`
 	// The name of the job run.
 	Name *string `json:"name,omitempty"`
 	// The Amazon EMR release version to use for the job run.
-	// +kubebuilder:validation:Required
-	ReleaseLabel *string `json:"releaseLabel"`
+	ReleaseLabel *string `json:"releaseLabel,omitempty"`
 	// The tags assigned to job runs.
 	Tags map[string]*string `json:"tags,omitempty"`
 	// The virtual cluster ID for which the job run request is submitted.
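Note on the JobRunSpec change above: dropping the `+kubebuilder:validation:Required` markers makes `executionRoleARN`, `jobDriver`, and `releaseLabel` optional in the CRD schema, matching the regenerated API model (a job template can now supply these values). Under aws-sdk-go-v2, those spec fields end up on a `StartJobRunInput` whose slice and map fields are value-typed (`[]string`, `map[string]string`), which is why the generated `newCreateRequestPayload` later in this diff uses `aws.ToStringSlice`/`aws.ToStringMap`. The following is a minimal, hypothetical sketch of that call path, not part of the generated controller code; the virtual cluster ID, role ARN, bucket paths, and release label are placeholders, and it assumes the default credential chain.

```go
package main

import (
	"context"
	"log"

	"github.com/aws/aws-sdk-go-v2/aws"
	"github.com/aws/aws-sdk-go-v2/config"
	"github.com/aws/aws-sdk-go-v2/service/emrcontainers"
	"github.com/aws/aws-sdk-go-v2/service/emrcontainers/types"
)

func main() {
	ctx := context.Background()
	// Assumes credentials/region come from the default chain.
	cfg, err := config.LoadDefaultConfig(ctx)
	if err != nil {
		log.Fatal(err)
	}
	client := emrcontainers.NewFromConfig(cfg)

	// Field shapes follow the v2 model: plain []string and map[string]string
	// instead of the v1 []*string / map[string]*string.
	out, err := client.StartJobRun(ctx, &emrcontainers.StartJobRunInput{
		VirtualClusterId: aws.String("abcdef1234567890example"),                             // placeholder
		Name:             aws.String("sample-spark-job"),                                    // placeholder
		ExecutionRoleArn: aws.String("arn:aws:iam::111122223333:role/emr-on-eks-job-role"), // placeholder
		ReleaseLabel:     aws.String("emr-6.15.0-latest"),                                   // placeholder
		JobDriver: &types.JobDriver{
			SparkSubmitJobDriver: &types.SparkSubmitJobDriver{
				EntryPoint:            aws.String("s3://my-bucket/scripts/job.py"),
				EntryPointArguments:   []string{"--input", "s3://my-bucket/data/"},
				SparkSubmitParameters: aws.String("--conf spark.executor.instances=2"),
			},
		},
		Tags: map[string]string{"team": "data"},
	})
	if err != nil {
		log.Fatal(err)
	}
	log.Printf("started job run %s", aws.ToString(out.Id))
}
```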
diff --git a/apis/v1alpha1/types.go b/apis/v1alpha1/types.go index 7ad6723..a5c16b8 100644 --- a/apis/v1alpha1/types.go +++ b/apis/v1alpha1/types.go @@ -46,7 +46,7 @@ type Configuration struct { // The information about the container used for a job run or a managed endpoint. type ContainerInfo struct { - // The information about the EKS cluster. + // The information about the Amazon EKS cluster. EKSInfo *EKSInfo `json:"eksInfo,omitempty"` } @@ -58,7 +58,7 @@ type ContainerProvider struct { Type *string `json:"type_,omitempty"` } -// The information about the EKS cluster. +// The information about the Amazon EKS cluster. type EKSInfo struct { Namespace *string `json:"namespace,omitempty"` } @@ -77,7 +77,8 @@ type Endpoint struct { VirtualClusterID *string `json:"virtualClusterID,omitempty"` } -// Specify the driver that the job runs on. +// Specify the driver that the job runs on. Exactly one of the two available +// job drivers is required, either sparkSqlJobDriver or sparkSubmitJobDriver. type JobDriver struct { // The information about job driver for Spark submit. SparkSubmitJobDriver *SparkSubmitJobDriver `json:"sparkSubmitJobDriver,omitempty"` @@ -95,14 +96,86 @@ type JobRun_SDK struct { FailureReason *string `json:"failureReason,omitempty"` FinishedAt *metav1.Time `json:"finishedAt,omitempty"` ID *string `json:"id,omitempty"` - // Specify the driver that the job runs on. - JobDriver *JobDriver `json:"jobDriver,omitempty"` - Name *string `json:"name,omitempty"` - ReleaseLabel *string `json:"releaseLabel,omitempty"` - State *string `json:"state,omitempty"` - StateDetails *string `json:"stateDetails,omitempty"` - Tags map[string]*string `json:"tags,omitempty"` - VirtualClusterID *string `json:"virtualClusterID,omitempty"` + // Specify the driver that the job runs on. Exactly one of the two available + // job drivers is required, either sparkSqlJobDriver or sparkSubmitJobDriver. + JobDriver *JobDriver `json:"jobDriver,omitempty"` + Name *string `json:"name,omitempty"` + ReleaseLabel *string `json:"releaseLabel,omitempty"` + // The configuration of the retry policy that the job runs on. + RetryPolicyConfiguration *RetryPolicyConfiguration `json:"retryPolicyConfiguration,omitempty"` + // The current status of the retry policy executed on the job. + RetryPolicyExecution *RetryPolicyExecution `json:"retryPolicyExecution,omitempty"` + State *string `json:"state,omitempty"` + StateDetails *string `json:"stateDetails,omitempty"` + Tags map[string]*string `json:"tags,omitempty"` + VirtualClusterID *string `json:"virtualClusterID,omitempty"` +} + +// This entity describes a job template. Job template stores values of StartJobRun +// API request in a template and can be used to start a job run. Job template +// allows two use cases: avoid repeating recurring StartJobRun API request values, +// enforcing certain values in StartJobRun API request. +type JobTemplate struct { + CreatedAt *metav1.Time `json:"createdAt,omitempty"` + CreatedBy *string `json:"createdBy,omitempty"` + ID *string `json:"id,omitempty"` + Name *string `json:"name,omitempty"` + Tags map[string]*string `json:"tags,omitempty"` +} + +// The values of StartJobRun API requests used in job runs started using the +// job template. +type JobTemplateData struct { + // Specify the driver that the job runs on. Exactly one of the two available + // job drivers is required, either sparkSqlJobDriver or sparkSubmitJobDriver. 
+ JobDriver *JobDriver `json:"jobDriver,omitempty"` + JobTags map[string]*string `json:"jobTags,omitempty"` +} + +// Lake Formation related configuration inputs for the security configuration. +type LakeFormationConfiguration struct { + QueryEngineRoleARN *string `json:"queryEngineRoleARN,omitempty"` +} + +// A configuration for CloudWatch monitoring. You can configure your jobs to +// send log information to CloudWatch Logs. This data type allows job template +// parameters to be specified within. +type ParametricCloudWatchMonitoringConfiguration struct { + LogStreamNamePrefix *string `json:"logStreamNamePrefix,omitempty"` +} + +// The configuration of the retry policy that the job runs on. +type RetryPolicyConfiguration struct { + MaxAttempts *int64 `json:"maxAttempts,omitempty"` +} + +// The current status of the retry policy executed on the job. +type RetryPolicyExecution struct { + CurrentAttemptCount *int64 `json:"currentAttemptCount,omitempty"` +} + +// Namespace inputs for the system job. +type SecureNamespaceInfo struct { + ClusterID *string `json:"clusterID,omitempty"` + Namespace *string `json:"namespace,omitempty"` +} + +// Inputs related to the security configuration. Security configurations in +// Amazon EMR on EKS are templates for different security setups. You can use +// security configurations to configure the Lake Formation integration setup. +// You can also create a security configuration to re-use a security setup each +// time you create a virtual cluster. +type SecurityConfiguration struct { + CreatedAt *metav1.Time `json:"createdAt,omitempty"` + CreatedBy *string `json:"createdBy,omitempty"` + ID *string `json:"id,omitempty"` + Name *string `json:"name,omitempty"` + Tags map[string]*string `json:"tags,omitempty"` +} + +// The job driver for job type. +type SparkSQLJobDriver struct { + EntryPoint *string `json:"entryPoint,omitempty"` } // The information about job driver for Spark submit. @@ -112,20 +185,26 @@ type SparkSubmitJobDriver struct { SparkSubmitParameters *string `json:"sparkSubmitParameters,omitempty"` } +// The configuration of a job template parameter. +type TemplateParameterConfiguration struct { + DefaultValue *string `json:"defaultValue,omitempty"` +} + // This entity describes a virtual cluster. A virtual cluster is a Kubernetes // namespace that Amazon EMR is registered with. Amazon EMR uses virtual clusters // to run jobs and host endpoints. Multiple virtual clusters can be backed by // the same physical cluster. However, each virtual cluster maps to one namespace -// on an EKS cluster. Virtual clusters do not create any active resources that -// contribute to your bill or that require lifecycle management outside the -// service. +// on an Amazon EKS cluster. Virtual clusters do not create any active resources +// that contribute to your bill or that require lifecycle management outside +// the service. type VirtualCluster_SDK struct { ARN *string `json:"arn,omitempty"` // The information about the container provider. 
- ContainerProvider *ContainerProvider `json:"containerProvider,omitempty"` - CreatedAt *metav1.Time `json:"createdAt,omitempty"` - ID *string `json:"id,omitempty"` - Name *string `json:"name,omitempty"` - State *string `json:"state,omitempty"` - Tags map[string]*string `json:"tags,omitempty"` + ContainerProvider *ContainerProvider `json:"containerProvider,omitempty"` + CreatedAt *metav1.Time `json:"createdAt,omitempty"` + ID *string `json:"id,omitempty"` + Name *string `json:"name,omitempty"` + SecurityConfigurationID *string `json:"securityConfigurationID,omitempty"` + State *string `json:"state,omitempty"` + Tags map[string]*string `json:"tags,omitempty"` } diff --git a/apis/v1alpha1/virtual_cluster.go b/apis/v1alpha1/virtual_cluster.go index 385a3f7..68f84a0 100644 --- a/apis/v1alpha1/virtual_cluster.go +++ b/apis/v1alpha1/virtual_cluster.go @@ -26,9 +26,9 @@ import ( // namespace that Amazon EMR is registered with. Amazon EMR uses virtual clusters // to run jobs and host endpoints. Multiple virtual clusters can be backed by // the same physical cluster. However, each virtual cluster maps to one namespace -// on an EKS cluster. Virtual clusters do not create any active resources that -// contribute to your bill or that require lifecycle management outside the -// service. +// on an Amazon EKS cluster. Virtual clusters do not create any active resources +// that contribute to your bill or that require lifecycle management outside +// the service. type VirtualClusterSpec struct { // The container provider of the virtual cluster. diff --git a/apis/v1alpha1/zz_generated.deepcopy.go b/apis/v1alpha1/zz_generated.deepcopy.go index 4d5b775..2d32c67 100644 --- a/apis/v1alpha1/zz_generated.deepcopy.go +++ b/apis/v1alpha1/zz_generated.deepcopy.go @@ -451,6 +451,16 @@ func (in *JobRun_SDK) DeepCopyInto(out *JobRun_SDK) { *out = new(string) **out = **in } + if in.RetryPolicyConfiguration != nil { + in, out := &in.RetryPolicyConfiguration, &out.RetryPolicyConfiguration + *out = new(RetryPolicyConfiguration) + (*in).DeepCopyInto(*out) + } + if in.RetryPolicyExecution != nil { + in, out := &in.RetryPolicyExecution, &out.RetryPolicyExecution + *out = new(RetryPolicyExecution) + (*in).DeepCopyInto(*out) + } if in.State != nil { in, out := &in.State, &out.State *out = new(string) @@ -494,6 +504,267 @@ func (in *JobRun_SDK) DeepCopy() *JobRun_SDK { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *JobTemplate) DeepCopyInto(out *JobTemplate) { + *out = *in + if in.CreatedAt != nil { + in, out := &in.CreatedAt, &out.CreatedAt + *out = (*in).DeepCopy() + } + if in.CreatedBy != nil { + in, out := &in.CreatedBy, &out.CreatedBy + *out = new(string) + **out = **in + } + if in.ID != nil { + in, out := &in.ID, &out.ID + *out = new(string) + **out = **in + } + if in.Name != nil { + in, out := &in.Name, &out.Name + *out = new(string) + **out = **in + } + if in.Tags != nil { + in, out := &in.Tags, &out.Tags + *out = make(map[string]*string, len(*in)) + for key, val := range *in { + var outVal *string + if val == nil { + (*out)[key] = nil + } else { + inVal := (*in)[key] + in, out := &inVal, &outVal + *out = new(string) + **out = **in + } + (*out)[key] = outVal + } + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new JobTemplate. 
+func (in *JobTemplate) DeepCopy() *JobTemplate { + if in == nil { + return nil + } + out := new(JobTemplate) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *JobTemplateData) DeepCopyInto(out *JobTemplateData) { + *out = *in + if in.JobDriver != nil { + in, out := &in.JobDriver, &out.JobDriver + *out = new(JobDriver) + (*in).DeepCopyInto(*out) + } + if in.JobTags != nil { + in, out := &in.JobTags, &out.JobTags + *out = make(map[string]*string, len(*in)) + for key, val := range *in { + var outVal *string + if val == nil { + (*out)[key] = nil + } else { + inVal := (*in)[key] + in, out := &inVal, &outVal + *out = new(string) + **out = **in + } + (*out)[key] = outVal + } + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new JobTemplateData. +func (in *JobTemplateData) DeepCopy() *JobTemplateData { + if in == nil { + return nil + } + out := new(JobTemplateData) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *LakeFormationConfiguration) DeepCopyInto(out *LakeFormationConfiguration) { + *out = *in + if in.QueryEngineRoleARN != nil { + in, out := &in.QueryEngineRoleARN, &out.QueryEngineRoleARN + *out = new(string) + **out = **in + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new LakeFormationConfiguration. +func (in *LakeFormationConfiguration) DeepCopy() *LakeFormationConfiguration { + if in == nil { + return nil + } + out := new(LakeFormationConfiguration) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *ParametricCloudWatchMonitoringConfiguration) DeepCopyInto(out *ParametricCloudWatchMonitoringConfiguration) { + *out = *in + if in.LogStreamNamePrefix != nil { + in, out := &in.LogStreamNamePrefix, &out.LogStreamNamePrefix + *out = new(string) + **out = **in + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ParametricCloudWatchMonitoringConfiguration. +func (in *ParametricCloudWatchMonitoringConfiguration) DeepCopy() *ParametricCloudWatchMonitoringConfiguration { + if in == nil { + return nil + } + out := new(ParametricCloudWatchMonitoringConfiguration) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *RetryPolicyConfiguration) DeepCopyInto(out *RetryPolicyConfiguration) { + *out = *in + if in.MaxAttempts != nil { + in, out := &in.MaxAttempts, &out.MaxAttempts + *out = new(int64) + **out = **in + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new RetryPolicyConfiguration. +func (in *RetryPolicyConfiguration) DeepCopy() *RetryPolicyConfiguration { + if in == nil { + return nil + } + out := new(RetryPolicyConfiguration) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. 
+func (in *RetryPolicyExecution) DeepCopyInto(out *RetryPolicyExecution) { + *out = *in + if in.CurrentAttemptCount != nil { + in, out := &in.CurrentAttemptCount, &out.CurrentAttemptCount + *out = new(int64) + **out = **in + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new RetryPolicyExecution. +func (in *RetryPolicyExecution) DeepCopy() *RetryPolicyExecution { + if in == nil { + return nil + } + out := new(RetryPolicyExecution) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *SecureNamespaceInfo) DeepCopyInto(out *SecureNamespaceInfo) { + *out = *in + if in.ClusterID != nil { + in, out := &in.ClusterID, &out.ClusterID + *out = new(string) + **out = **in + } + if in.Namespace != nil { + in, out := &in.Namespace, &out.Namespace + *out = new(string) + **out = **in + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new SecureNamespaceInfo. +func (in *SecureNamespaceInfo) DeepCopy() *SecureNamespaceInfo { + if in == nil { + return nil + } + out := new(SecureNamespaceInfo) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *SecurityConfiguration) DeepCopyInto(out *SecurityConfiguration) { + *out = *in + if in.CreatedAt != nil { + in, out := &in.CreatedAt, &out.CreatedAt + *out = (*in).DeepCopy() + } + if in.CreatedBy != nil { + in, out := &in.CreatedBy, &out.CreatedBy + *out = new(string) + **out = **in + } + if in.ID != nil { + in, out := &in.ID, &out.ID + *out = new(string) + **out = **in + } + if in.Name != nil { + in, out := &in.Name, &out.Name + *out = new(string) + **out = **in + } + if in.Tags != nil { + in, out := &in.Tags, &out.Tags + *out = make(map[string]*string, len(*in)) + for key, val := range *in { + var outVal *string + if val == nil { + (*out)[key] = nil + } else { + inVal := (*in)[key] + in, out := &inVal, &outVal + *out = new(string) + **out = **in + } + (*out)[key] = outVal + } + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new SecurityConfiguration. +func (in *SecurityConfiguration) DeepCopy() *SecurityConfiguration { + if in == nil { + return nil + } + out := new(SecurityConfiguration) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *SparkSQLJobDriver) DeepCopyInto(out *SparkSQLJobDriver) { + *out = *in + if in.EntryPoint != nil { + in, out := &in.EntryPoint, &out.EntryPoint + *out = new(string) + **out = **in + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new SparkSQLJobDriver. +func (in *SparkSQLJobDriver) DeepCopy() *SparkSQLJobDriver { + if in == nil { + return nil + } + out := new(SparkSQLJobDriver) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *SparkSubmitJobDriver) DeepCopyInto(out *SparkSubmitJobDriver) { *out = *in @@ -530,6 +801,26 @@ func (in *SparkSubmitJobDriver) DeepCopy() *SparkSubmitJobDriver { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. 
+func (in *TemplateParameterConfiguration) DeepCopyInto(out *TemplateParameterConfiguration) { + *out = *in + if in.DefaultValue != nil { + in, out := &in.DefaultValue, &out.DefaultValue + *out = new(string) + **out = **in + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new TemplateParameterConfiguration. +func (in *TemplateParameterConfiguration) DeepCopy() *TemplateParameterConfiguration { + if in == nil { + return nil + } + out := new(TemplateParameterConfiguration) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *VirtualCluster) DeepCopyInto(out *VirtualCluster) { *out = *in @@ -693,6 +984,11 @@ func (in *VirtualCluster_SDK) DeepCopyInto(out *VirtualCluster_SDK) { *out = new(string) **out = **in } + if in.SecurityConfigurationID != nil { + in, out := &in.SecurityConfigurationID, &out.SecurityConfigurationID + *out = new(string) + **out = **in + } if in.State != nil { in, out := &in.State, &out.State *out = new(string) diff --git a/cmd/controller/main.go b/cmd/controller/main.go index 507ff23..630e88a 100644 --- a/cmd/controller/main.go +++ b/cmd/controller/main.go @@ -16,6 +16,7 @@ package main import ( + "context" "os" ackv1alpha1 "github.com/aws-controllers-k8s/runtime/apis/core/v1alpha1" @@ -37,7 +38,6 @@ import ( svctypes "github.com/aws-controllers-k8s/emrcontainers-controller/apis/v1alpha1" svcresource "github.com/aws-controllers-k8s/emrcontainers-controller/pkg/resource" - svcsdk "github.com/aws/aws-sdk-go/service/emrcontainers" _ "github.com/aws-controllers-k8s/emrcontainers-controller/pkg/resource/job_run" _ "github.com/aws-controllers-k8s/emrcontainers-controller/pkg/resource/virtual_cluster" @@ -46,11 +46,10 @@ import ( ) var ( - awsServiceAPIGroup = "emrcontainers.services.k8s.aws" - awsServiceAlias = "emrcontainers" - awsServiceEndpointsID = svcsdk.EndpointsID - scheme = runtime.NewScheme() - setupLog = ctrlrt.Log.WithName("setup") + awsServiceAPIGroup = "emrcontainers.services.k8s.aws" + awsServiceAlias = "emrcontainers" + scheme = runtime.NewScheme() + setupLog = ctrlrt.Log.WithName("setup") ) func init() { @@ -72,7 +71,8 @@ func main() { resourceGVKs = append(resourceGVKs, mf.ResourceDescriptor().GroupVersionKind()) } - if err := ackCfg.Validate(ackcfg.WithGVKs(resourceGVKs)); err != nil { + ctx := context.Background() + if err := ackCfg.Validate(ctx, ackcfg.WithGVKs(resourceGVKs)); err != nil { setupLog.Error( err, "Unable to create controller manager", "aws.service", awsServiceAlias, @@ -137,7 +137,7 @@ func main() { "aws.service", awsServiceAlias, ) sc := ackrt.NewServiceController( - awsServiceAlias, awsServiceAPIGroup, awsServiceEndpointsID, + awsServiceAlias, awsServiceAPIGroup, acktypes.VersionInfo{ version.GitCommit, version.GitVersion, diff --git a/config/crd/bases/emrcontainers.services.k8s.aws_jobruns.yaml b/config/crd/bases/emrcontainers.services.k8s.aws_jobruns.yaml index 6261b83..15b8e7e 100644 --- a/config/crd/bases/emrcontainers.services.k8s.aws_jobruns.yaml +++ b/config/crd/bases/emrcontainers.services.k8s.aws_jobruns.yaml @@ -104,10 +104,6 @@ spec: type: string type: object type: object - required: - - executionRoleARN - - jobDriver - - releaseLabel type: object status: description: JobRunStatus defines the observed state of JobRun diff --git a/config/crd/bases/emrcontainers.services.k8s.aws_virtualclusters.yaml b/config/crd/bases/emrcontainers.services.k8s.aws_virtualclusters.yaml index 
b57988d..2d344d4 100644 --- a/config/crd/bases/emrcontainers.services.k8s.aws_virtualclusters.yaml +++ b/config/crd/bases/emrcontainers.services.k8s.aws_virtualclusters.yaml @@ -44,9 +44,9 @@ spec: namespace that Amazon EMR is registered with. Amazon EMR uses virtual clusters to run jobs and host endpoints. Multiple virtual clusters can be backed by the same physical cluster. However, each virtual cluster maps to one namespace - on an EKS cluster. Virtual clusters do not create any active resources that - contribute to your bill or that require lifecycle management outside the - service. + on an Amazon EKS cluster. Virtual clusters do not create any active resources + that contribute to your bill or that require lifecycle management outside + the service. properties: containerProvider: description: The container provider of the virtual cluster. @@ -58,7 +58,7 @@ spec: run or a managed endpoint. properties: eksInfo: - description: The information about the EKS cluster. + description: The information about the Amazon EKS cluster. properties: namespace: type: string diff --git a/generator.yaml b/generator.yaml index 8dbda59..41993a8 100644 --- a/generator.yaml +++ b/generator.yaml @@ -4,13 +4,20 @@ ignore: - StartJobRunInput.ClientToken - JobRun.ConfigurationOverrides - StartJobRunInput.ConfigurationOverrides + - StartJobRunInput.JobDriver.SparkSqlJobDriver + - StartJobRunInput.JobTemplateId + - StartJobRunInput.RetryPolicyConfiguration + - StartJobRunInput.JobTemplateParameters + - CreateVirtualClusterInput.SecurityConfigurationId operations: null resource_names: - # - VirtualCluster - # - JobRun - - ManagedEndpoint - shape_names: null -model_name: emr-containers + # - VirtualCluster + # - JobRun + - ManagedEndpoint + - JobTemplate + - SecurityConfiguration +sdk_names: + model_name: emr-containers operations: DescribeVirtualCluster: output_wrapper_field_path: VirtualCluster @@ -27,9 +34,12 @@ resources: VirtualCluster: exceptions: terminal_codes: - - ValidationException - - ResourceNotFoundException - - InternalServerException + - ValidationException + - ResourceNotFoundException + - InternalServerException + hooks: + sdk_create_post_build_request: + template_path: hooks/virtual_cluster/sdk_create_post_build_request.go.tpl JobRun: fields: Name: @@ -51,7 +61,7 @@ resources: name: STATE from: operation: DescribeJobRun - path: JobRun.State + path: JobRun.State Id: is_read_only: true print: @@ -61,10 +71,10 @@ resources: type: "string" is_immutable: true is_required: False - compare: + compare: is_ignored: true - # requeue_on_success_seconds is using 15 seconds for ACK to make describe-job-run API call so that it can update fields (ex: State). This is used as default values for now until ACK enables users to configure this value using Helm charts. - reconcile: + # requeue_on_success_seconds is using 15 seconds for ACK to make describe-job-run API call so that it can update fields (ex: State). This is used as default values for now until ACK enables users to configure this value using Helm charts. 
+ reconcile: requeue_on_success_seconds: 15 hooks: delta_pre_compare: @@ -75,4 +85,4 @@ resources: template_path: hooks/configuration_overrides/sdk_read_one_pre_set_output.go.tpl exceptions: terminal_codes: - - ValidationException + - ValidationException diff --git a/go.mod b/go.mod index 770e392..56100ab 100644 --- a/go.mod +++ b/go.mod @@ -5,8 +5,11 @@ go 1.22.0 toolchain go1.22.5 require ( - github.com/aws-controllers-k8s/runtime v0.40.0 + github.com/aws-controllers-k8s/runtime v0.41.1-0.20250204215244-e48dd7b2d6d0 github.com/aws/aws-sdk-go v1.49.0 + github.com/aws/aws-sdk-go-v2 v1.35.0 + github.com/aws/aws-sdk-go-v2/service/emrcontainers v1.33.14 + github.com/aws/smithy-go v1.22.2 github.com/ghodss/yaml v1.0.0 github.com/go-logr/logr v1.4.2 github.com/spf13/pflag v1.0.5 @@ -17,6 +20,17 @@ require ( ) require ( + github.com/aws/aws-sdk-go-v2/config v1.28.6 // indirect + github.com/aws/aws-sdk-go-v2/credentials v1.17.47 // indirect + github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.21 // indirect + github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.30 // indirect + github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.30 // indirect + github.com/aws/aws-sdk-go-v2/internal/ini v1.8.1 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.12.1 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.12.6 // indirect + github.com/aws/aws-sdk-go-v2/service/sso v1.24.7 // indirect + github.com/aws/aws-sdk-go-v2/service/ssooidc v1.28.6 // indirect + github.com/aws/aws-sdk-go-v2/service/sts v1.33.2 // indirect github.com/beorn7/perks v1.0.1 // indirect github.com/cenkalti/backoff/v4 v4.3.0 // indirect github.com/cespare/xxhash/v2 v2.3.0 // indirect @@ -40,7 +54,6 @@ require ( github.com/itchyny/gojq v0.12.6 // indirect github.com/itchyny/timefmt-go v0.1.3 // indirect github.com/jaypipes/envutil v1.0.0 // indirect - github.com/jmespath/go-jmespath v0.4.0 // indirect github.com/josharian/intern v1.0.0 // indirect github.com/json-iterator/go v1.1.12 // indirect github.com/mailru/easyjson v0.7.7 // indirect @@ -57,11 +70,11 @@ require ( go.uber.org/multierr v1.11.0 // indirect go.uber.org/zap v1.26.0 // indirect golang.org/x/exp v0.0.0-20230515195305-f3d0a9c9a5cc // indirect - golang.org/x/net v0.26.0 // indirect + golang.org/x/net v0.33.0 // indirect golang.org/x/oauth2 v0.21.0 // indirect - golang.org/x/sys v0.21.0 // indirect - golang.org/x/term v0.21.0 // indirect - golang.org/x/text v0.16.0 // indirect + golang.org/x/sys v0.28.0 // indirect + golang.org/x/term v0.27.0 // indirect + golang.org/x/text v0.21.0 // indirect golang.org/x/time v0.3.0 // indirect gomodules.xyz/jsonpatch/v2 v2.4.0 // indirect google.golang.org/protobuf v1.34.2 // indirect diff --git a/go.sum b/go.sum index d54df4c..c2ac3aa 100644 --- a/go.sum +++ b/go.sum @@ -1,7 +1,35 @@ -github.com/aws-controllers-k8s/runtime v0.40.0 h1:FplFYgzCIbQsPafarP3dy/4bG1uGR8G1OLYOWO4a7Lc= -github.com/aws-controllers-k8s/runtime v0.40.0/go.mod h1:G07g26y1cxyZO6Ngp+LwXf03CqFyLNL7os4Py4IdyGY= +github.com/aws-controllers-k8s/runtime v0.41.1-0.20250204215244-e48dd7b2d6d0 h1:ygZwhPfearlE8/P0HY8rXpFsbarwJ5tzBIov+3xgQfk= +github.com/aws-controllers-k8s/runtime v0.41.1-0.20250204215244-e48dd7b2d6d0/go.mod h1:Oy0JKvDxZMZ+SVupm4NZVqP00KLIIAMfk93KnOwlt5c= github.com/aws/aws-sdk-go v1.49.0 h1:g9BkW1fo9GqKfwg2+zCD+TW/D36Ux+vtfJ8guF4AYmY= github.com/aws/aws-sdk-go v1.49.0/go.mod h1:LF8svs817+Nz+DmiMQKTO3ubZ/6IaTpq3TjupRn3Eqk= +github.com/aws/aws-sdk-go-v2 v1.35.0 
h1:jTPxEJyzjSuuz0wB+302hr8Eu9KUI+Zv8zlujMGJpVI= +github.com/aws/aws-sdk-go-v2 v1.35.0/go.mod h1:JgstGg0JjWU1KpVJjD5H0y0yyAIpSdKEq556EI6yOOM= +github.com/aws/aws-sdk-go-v2/config v1.28.6 h1:D89IKtGrs/I3QXOLNTH93NJYtDhm8SYa9Q5CsPShmyo= +github.com/aws/aws-sdk-go-v2/config v1.28.6/go.mod h1:GDzxJ5wyyFSCoLkS+UhGB0dArhb9mI+Co4dHtoTxbko= +github.com/aws/aws-sdk-go-v2/credentials v1.17.47 h1:48bA+3/fCdi2yAwVt+3COvmatZ6jUDNkDTIsqDiMUdw= +github.com/aws/aws-sdk-go-v2/credentials v1.17.47/go.mod h1:+KdckOejLW3Ks3b0E3b5rHsr2f9yuORBum0WPnE5o5w= +github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.21 h1:AmoU1pziydclFT/xRV+xXE/Vb8fttJCLRPv8oAkprc0= +github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.21/go.mod h1:AjUdLYe4Tgs6kpH4Bv7uMZo7pottoyHMn4eTcIcneaY= +github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.30 h1:+7AzSGNhHoY53di13lvztf9Dyd/9ofzoYGBllkWp3a0= +github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.30/go.mod h1:Jxd/FrCny99yURiQiMywgXvBhd7tmgdv6KdlUTNzMSo= +github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.30 h1:Ex06eY6I5rO7IX0HalGfa5nGjpBoOsS1Qm3xfjkuszs= +github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.30/go.mod h1:AvyEMA9QcX59kFhVizBpIBpEMThUTXssuJe+emBdcGM= +github.com/aws/aws-sdk-go-v2/internal/ini v1.8.1 h1:VaRN3TlFdd6KxX1x3ILT5ynH6HvKgqdiXoTxAF4HQcQ= +github.com/aws/aws-sdk-go-v2/internal/ini v1.8.1/go.mod h1:FbtygfRFze9usAadmnGJNc8KsP346kEe+y2/oyhGAGc= +github.com/aws/aws-sdk-go-v2/service/emrcontainers v1.33.14 h1:uXGRq1a7SDUVwQZf4d+DbrUr58e/BjB2J4Azo8ALAuY= +github.com/aws/aws-sdk-go-v2/service/emrcontainers v1.33.14/go.mod h1:c0YLGT/09MnoVHHw3oKRSkW8imedFU1mI3/DUa0X5Ns= +github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.12.1 h1:iXtILhvDxB6kPvEXgsDhGaZCSC6LQET5ZHSdJozeI0Y= +github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.12.1/go.mod h1:9nu0fVANtYiAePIBh2/pFUSwtJ402hLnp854CNoDOeE= +github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.12.6 h1:50+XsN70RS7dwJ2CkVNXzj7U2L1HKP8nqTd3XWEXBN4= +github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.12.6/go.mod h1:WqgLmwY7so32kG01zD8CPTJWVWM+TzJoOVHwTg4aPug= +github.com/aws/aws-sdk-go-v2/service/sso v1.24.7 h1:rLnYAfXQ3YAccocshIH5mzNNwZBkBo+bP6EhIxak6Hw= +github.com/aws/aws-sdk-go-v2/service/sso v1.24.7/go.mod h1:ZHtuQJ6t9A/+YDuxOLnbryAmITtr8UysSny3qcyvJTc= +github.com/aws/aws-sdk-go-v2/service/ssooidc v1.28.6 h1:JnhTZR3PiYDNKlXy50/pNeix9aGMo6lLpXwJ1mw8MD4= +github.com/aws/aws-sdk-go-v2/service/ssooidc v1.28.6/go.mod h1:URronUEGfXZN1VpdktPSD1EkAL9mfrV+2F4sjH38qOY= +github.com/aws/aws-sdk-go-v2/service/sts v1.33.2 h1:s4074ZO1Hk8qv65GqNXqDjmkf4HSQqJukaLuuW0TpDA= +github.com/aws/aws-sdk-go-v2/service/sts v1.33.2/go.mod h1:mVggCnIWoM09jP71Wh+ea7+5gAp53q+49wDFs1SW5z8= +github.com/aws/smithy-go v1.22.2 h1:6D9hW43xKFrRx/tXXfAlIZc4JI+yQe6snnWcQyxSyLQ= +github.com/aws/smithy-go v1.22.2/go.mod h1:irrKGvNn1InZwb2d7fkIRNucdfwR8R+Ts3wxYa/cJHg= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK343L8= @@ -67,8 +95,6 @@ github.com/jaypipes/envutil v1.0.0 h1:u6Vwy9HwruFihoZrL0bxDLCa/YNadGVwKyPElNmZWo github.com/jaypipes/envutil v1.0.0/go.mod h1:vgIRDly+xgBq0eeZRcflOHMMobMwgC6MkMbxo/Nw65M= github.com/jmespath/go-jmespath v0.4.0 h1:BEgLn5cpjn8UN1mAw4NjwDrS35OdebyEtFe+9YPoQUg= github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo= 
-github.com/jmespath/go-jmespath/internal/testify v1.5.1 h1:shLQSRRSCCPj3f2gpwzGwWFoC7ycTf1rcQZHOlsJ6N8= -github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfCI6z80xFu9LTZmf1ZRjMHUOPmWr69U= github.com/josharian/intern v1.0.0 h1:vlS4z54oSdjm0bgjRigI+G1HpF+tI+9rE5LLzOg8HmY= github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFFd8Hwg//Y= github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= @@ -148,8 +174,8 @@ golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= -golang.org/x/net v0.26.0 h1:soB7SVo0PWrY4vPW/+ay0jKDNScG2X9wFeYlXIvJsOQ= -golang.org/x/net v0.26.0/go.mod h1:5YKkiSynbBIh3p6iOc/vibscux0x38BZDkn8sCUPxHE= +golang.org/x/net v0.33.0 h1:74SYHlV8BIgHIFC/LrYkOGIwL19eTYXQ5wc6TBuO36I= +golang.org/x/net v0.33.0/go.mod h1:HXLR5J+9DxmrqMwG9qjGCxZ+zKXxBru04zlTvWlWuN4= golang.org/x/oauth2 v0.21.0 h1:tsimM75w1tF/uws5rbeHzIWxEqElMehnc+iW793zsZs= golang.org/x/oauth2 v0.21.0/go.mod h1:XYTD2NtWslqkgxebSiOHnXEap4TF09sJSc7H1sXbhtI= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= @@ -160,14 +186,14 @@ golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20211124211545-fe61309f8881/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.21.0 h1:rF+pYz3DAGSQAxAu1CbC7catZg4ebC4UIeIhKxBZvws= -golang.org/x/sys v0.21.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= -golang.org/x/term v0.21.0 h1:WVXCp+/EBEHOj53Rvu+7KiT/iElMrO8ACK16SMZ3jaA= -golang.org/x/term v0.21.0/go.mod h1:ooXLefLobQVslOqselCNF4SxFAaoS6KujMbsGzSDmX0= +golang.org/x/sys v0.28.0 h1:Fksou7UEQUWlKvIdsqzJmUmCX3cZuD2+P3XyyzwMhlA= +golang.org/x/sys v0.28.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/term v0.27.0 h1:WP60Sv1nlK1T6SupCHbXzSaN0b9wUmsPoRS9b61A23Q= +golang.org/x/term v0.27.0/go.mod h1:iMsnZpn0cago0GOrHO2+Y7u7JPn5AylBrcoWkElMTSM= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= -golang.org/x/text v0.16.0 h1:a94ExnEXNtEwYLGJSIUxnWoxoRz/ZcCsV63ROupILh4= -golang.org/x/text v0.16.0/go.mod h1:GhwF1Be+LQoKShO3cGOHzqOgRrGaYc9AvblQOmPVHnI= +golang.org/x/text v0.21.0 h1:zyQAAkrwaneQ066sspRyJaG9VNi/YJ1NfzcGB3hZ/qo= +golang.org/x/text v0.21.0/go.mod h1:4IBbMaMmOPCJ8SecivzSH54+73PCFmPWxNTLm+vZkEQ= golang.org/x/time v0.3.0 h1:rg5rLMjNzMS1RkNLzCG38eapWhnYLFYXDXj2gOlr8j4= golang.org/x/time v0.3.0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= diff --git a/helm/crds/emrcontainers.services.k8s.aws_jobruns.yaml b/helm/crds/emrcontainers.services.k8s.aws_jobruns.yaml index 6261b83..15b8e7e 100644 --- a/helm/crds/emrcontainers.services.k8s.aws_jobruns.yaml +++ b/helm/crds/emrcontainers.services.k8s.aws_jobruns.yaml @@ -104,10 +104,6 @@ 
spec: type: string type: object type: object - required: - - executionRoleARN - - jobDriver - - releaseLabel type: object status: description: JobRunStatus defines the observed state of JobRun diff --git a/helm/crds/emrcontainers.services.k8s.aws_virtualclusters.yaml b/helm/crds/emrcontainers.services.k8s.aws_virtualclusters.yaml index b57988d..2d344d4 100644 --- a/helm/crds/emrcontainers.services.k8s.aws_virtualclusters.yaml +++ b/helm/crds/emrcontainers.services.k8s.aws_virtualclusters.yaml @@ -44,9 +44,9 @@ spec: namespace that Amazon EMR is registered with. Amazon EMR uses virtual clusters to run jobs and host endpoints. Multiple virtual clusters can be backed by the same physical cluster. However, each virtual cluster maps to one namespace - on an EKS cluster. Virtual clusters do not create any active resources that - contribute to your bill or that require lifecycle management outside the - service. + on an Amazon EKS cluster. Virtual clusters do not create any active resources + that contribute to your bill or that require lifecycle management outside + the service. properties: containerProvider: description: The container provider of the virtual cluster. @@ -58,7 +58,7 @@ spec: run or a managed endpoint. properties: eksInfo: - description: The information about the EKS cluster. + description: The information about the Amazon EKS cluster. properties: namespace: type: string diff --git a/pkg/resource/job_run/hooks.go b/pkg/resource/job_run/hooks.go index 2b63eb0..4ea9819 100644 --- a/pkg/resource/job_run/hooks.go +++ b/pkg/resource/job_run/hooks.go @@ -4,12 +4,12 @@ import ( "reflect" ackcompare "github.com/aws-controllers-k8s/runtime/pkg/compare" - "github.com/aws/aws-sdk-go/aws" - svcsdk "github.com/aws/aws-sdk-go/service/emrcontainers" + "github.com/aws/aws-sdk-go-v2/aws" + svcsdktypes "github.com/aws/aws-sdk-go-v2/service/emrcontainers/types" "github.com/ghodss/yaml" ) -func configurationOverridesToString(cfg *svcsdk.ConfigurationOverrides) (*string, error) { +func configurationOverridesToString(cfg *svcsdktypes.ConfigurationOverrides) (*string, error) { configBytes, err := yaml.Marshal(cfg) if err != nil { return nil, err @@ -18,12 +18,12 @@ func configurationOverridesToString(cfg *svcsdk.ConfigurationOverrides) (*string return &configStr, nil } -func stringToConfigurationOverrides(cfg *string) (*svcsdk.ConfigurationOverrides, error) { +func stringToConfigurationOverrides(cfg *string) (*svcsdktypes.ConfigurationOverrides, error) { if cfg == nil { cfg = aws.String("") } - var config svcsdk.ConfigurationOverrides + var config svcsdktypes.ConfigurationOverrides err := yaml.Unmarshal([]byte(*cfg), &config) if err != nil { return nil, err @@ -55,15 +55,13 @@ func customPreCompare( delta.Add("Spec.ConfigurationOverrides", aConfig.MonitoringConfiguration, bConfig.MonitoringConfiguration) } else if aConfig.MonitoringConfiguration != nil && bConfig.MonitoringConfiguration != nil { if ackcompare.HasNilDifference(aConfig.MonitoringConfiguration.PersistentAppUI, bConfig.MonitoringConfiguration.PersistentAppUI) { - if aConfig.MonitoringConfiguration.PersistentAppUI == nil && *bConfig.MonitoringConfiguration.PersistentAppUI == "ENABLED" { + if aConfig.MonitoringConfiguration.PersistentAppUI == "" && bConfig.MonitoringConfiguration.PersistentAppUI == svcsdktypes.PersistentAppUIEnabled { // We do not consider this as a difference because the API defaults PersistentAppUI to "ENABLED" } else { delta.Add("Spec.ConfigurationOverrides.PersistentAppUI", 
aConfig.MonitoringConfiguration.PersistentAppUI, bConfig.MonitoringConfiguration.PersistentAppUI) } - } else if aConfig.MonitoringConfiguration.PersistentAppUI != nil && bConfig.MonitoringConfiguration.PersistentAppUI != nil { - if *aConfig.MonitoringConfiguration.PersistentAppUI != *bConfig.MonitoringConfiguration.PersistentAppUI { - delta.Add("Spec.ConfigurationOverrides.PersistentAppUI", aConfig.MonitoringConfiguration.PersistentAppUI, bConfig.MonitoringConfiguration.PersistentAppUI) - } + } else if aConfig.MonitoringConfiguration.PersistentAppUI != bConfig.MonitoringConfiguration.PersistentAppUI { + delta.Add("Spec.ConfigurationOverrides.PersistentAppUI", aConfig.MonitoringConfiguration.PersistentAppUI, bConfig.MonitoringConfiguration.PersistentAppUI) } if ackcompare.HasNilDifference( aConfig.MonitoringConfiguration.CloudWatchMonitoringConfiguration, diff --git a/pkg/resource/job_run/manager.go b/pkg/resource/job_run/manager.go index 17afd2a..9829f54 100644 --- a/pkg/resource/job_run/manager.go +++ b/pkg/resource/job_run/manager.go @@ -32,9 +32,8 @@ import ( acktags "github.com/aws-controllers-k8s/runtime/pkg/tags" acktypes "github.com/aws-controllers-k8s/runtime/pkg/types" ackutil "github.com/aws-controllers-k8s/runtime/pkg/util" - "github.com/aws/aws-sdk-go/aws/session" - svcsdk "github.com/aws/aws-sdk-go/service/emrcontainers" - svcsdkapi "github.com/aws/aws-sdk-go/service/emrcontainers/emrcontainersiface" + "github.com/aws/aws-sdk-go-v2/aws" + svcsdk "github.com/aws/aws-sdk-go-v2/service/emrcontainers" "github.com/go-logr/logr" corev1 "k8s.io/api/core/v1" @@ -59,6 +58,9 @@ type resourceManager struct { // cfg is a copy of the ackcfg.Config object passed on start of the service // controller cfg ackcfg.Config + // clientcfg is a copy of the client configuration passed on start of the + // service controller + clientcfg aws.Config // log refers to the logr.Logger object handling logging for the service // controller log logr.Logger @@ -73,12 +75,9 @@ type resourceManager struct { awsAccountID ackv1alpha1.AWSAccountID // The AWS Region that this resource manager targets awsRegion ackv1alpha1.AWSRegion - // sess is the AWS SDK Session object used to communicate with the backend - // AWS service API - sess *session.Session - // sdk is a pointer to the AWS service API interface exposed by the - // aws-sdk-go/services/{alias}/{alias}iface package. - sdkapi svcsdkapi.EMRContainersAPI + // sdk is a pointer to the AWS service API client exposed by the + // aws-sdk-go-v2/services/{alias} package. 
+ sdkapi *svcsdk.Client } // concreteResource returns a pointer to a resource from the supplied @@ -299,24 +298,25 @@ func (rm *resourceManager) EnsureTags( // newResourceManager returns a new struct implementing // acktypes.AWSResourceManager +// This is for AWS-SDK-GO-V2 - Created newResourceManager With AWS sdk-Go-ClientV2 func newResourceManager( cfg ackcfg.Config, + clientcfg aws.Config, log logr.Logger, metrics *ackmetrics.Metrics, rr acktypes.Reconciler, - sess *session.Session, id ackv1alpha1.AWSAccountID, region ackv1alpha1.AWSRegion, ) (*resourceManager, error) { return &resourceManager{ cfg: cfg, + clientcfg: clientcfg, log: log, metrics: metrics, rr: rr, awsAccountID: id, awsRegion: region, - sess: sess, - sdkapi: svcsdk.New(sess), + sdkapi: svcsdk.NewFromConfig(clientcfg), }, nil } diff --git a/pkg/resource/job_run/manager_factory.go b/pkg/resource/job_run/manager_factory.go index 4d2d3ad..a78e5ee 100644 --- a/pkg/resource/job_run/manager_factory.go +++ b/pkg/resource/job_run/manager_factory.go @@ -23,7 +23,7 @@ import ( ackcfg "github.com/aws-controllers-k8s/runtime/pkg/config" ackmetrics "github.com/aws-controllers-k8s/runtime/pkg/metrics" acktypes "github.com/aws-controllers-k8s/runtime/pkg/types" - "github.com/aws/aws-sdk-go/aws/session" + "github.com/aws/aws-sdk-go-v2/aws" "github.com/go-logr/logr" svcresource "github.com/aws-controllers-k8s/emrcontainers-controller/pkg/resource" @@ -47,10 +47,10 @@ func (f *resourceManagerFactory) ResourceDescriptor() acktypes.AWSResourceDescri // supplied AWS account func (f *resourceManagerFactory) ManagerFor( cfg ackcfg.Config, + clientcfg aws.Config, log logr.Logger, metrics *ackmetrics.Metrics, rr acktypes.Reconciler, - sess *session.Session, id ackv1alpha1.AWSAccountID, region ackv1alpha1.AWSRegion, roleARN ackv1alpha1.AWSResourceName, @@ -70,7 +70,7 @@ func (f *resourceManagerFactory) ManagerFor( f.Lock() defer f.Unlock() - rm, err := newResourceManager(cfg, log, metrics, rr, sess, id, region) + rm, err := newResourceManager(cfg, clientcfg, log, metrics, rr, id, region) if err != nil { return nil, err } diff --git a/pkg/resource/job_run/resource.go b/pkg/resource/job_run/resource.go index 27b7ed4..c4974d7 100644 --- a/pkg/resource/job_run/resource.go +++ b/pkg/resource/job_run/resource.go @@ -19,6 +19,7 @@ import ( ackv1alpha1 "github.com/aws-controllers-k8s/runtime/apis/core/v1alpha1" ackerrors "github.com/aws-controllers-k8s/runtime/pkg/errors" acktypes "github.com/aws-controllers-k8s/runtime/pkg/types" + "github.com/aws/aws-sdk-go-v2/aws" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" rtclient "sigs.k8s.io/controller-runtime/pkg/client" @@ -92,7 +93,7 @@ func (r *resource) SetIdentifiers(identifier *ackv1alpha1.AWSIdentifiers) error f1, f1ok := identifier.AdditionalKeys["virtualClusterID"] if f1ok { - r.ko.Spec.VirtualClusterID = &f1 + r.ko.Spec.VirtualClusterID = aws.String(f1) } return nil @@ -108,7 +109,7 @@ func (r *resource) PopulateResourceFromAnnotation(fields map[string]string) erro f1, f1ok := fields["virtualClusterID"] if f1ok { - r.ko.Spec.VirtualClusterID = &f1 + r.ko.Spec.VirtualClusterID = aws.String(f1) } return nil diff --git a/pkg/resource/job_run/sdk.go b/pkg/resource/job_run/sdk.go index 4ab4331..d4f3cee 100644 --- a/pkg/resource/job_run/sdk.go +++ b/pkg/resource/job_run/sdk.go @@ -28,8 +28,10 @@ import ( ackerr "github.com/aws-controllers-k8s/runtime/pkg/errors" ackrequeue "github.com/aws-controllers-k8s/runtime/pkg/requeue" ackrtlog "github.com/aws-controllers-k8s/runtime/pkg/runtime/log" - 
"github.com/aws/aws-sdk-go/aws" - svcsdk "github.com/aws/aws-sdk-go/service/emrcontainers" + "github.com/aws/aws-sdk-go-v2/aws" + svcsdk "github.com/aws/aws-sdk-go-v2/service/emrcontainers" + svcsdktypes "github.com/aws/aws-sdk-go-v2/service/emrcontainers/types" + smithy "github.com/aws/smithy-go" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -40,8 +42,7 @@ import ( var ( _ = &metav1.Time{} _ = strings.ToLower("") - _ = &aws.JSONValue{} - _ = &svcsdk.EMRContainers{} + _ = &svcsdk.Client{} _ = &svcapitypes.JobRun{} _ = ackv1alpha1.AWSAccountID("") _ = &ackerr.NotFound @@ -49,6 +50,7 @@ var ( _ = &reflect.Value{} _ = fmt.Sprintf("") _ = &ackrequeue.NoRequeue{} + _ = &aws.Config{} ) // sdkFind returns SDK-specific information about a supplied resource @@ -74,13 +76,11 @@ func (rm *resourceManager) sdkFind( } var resp *svcsdk.DescribeJobRunOutput - resp, err = rm.sdkapi.DescribeJobRunWithContext(ctx, input) + resp, err = rm.sdkapi.DescribeJobRun(ctx, input) rm.metrics.RecordAPICall("READ_ONE", "DescribeJobRun", err) if err != nil { - if reqErr, ok := ackerr.AWSRequestFailure(err); ok && reqErr.StatusCode() == 404 { - return nil, ackerr.NotFound - } - if awsErr, ok := ackerr.AWSError(err); ok && awsErr.Code() == "UNKNOWN" { + var awsErr smithy.APIError + if errors.As(err, &awsErr) && awsErr.ErrorCode() == "UNKNOWN" { return nil, ackerr.NotFound } return nil, err @@ -122,13 +122,7 @@ func (rm *resourceManager) sdkFind( f8f0.EntryPoint = resp.JobRun.JobDriver.SparkSubmitJobDriver.EntryPoint } if resp.JobRun.JobDriver.SparkSubmitJobDriver.EntryPointArguments != nil { - f8f0f1 := []*string{} - for _, f8f0f1iter := range resp.JobRun.JobDriver.SparkSubmitJobDriver.EntryPointArguments { - var f8f0f1elem string - f8f0f1elem = *f8f0f1iter - f8f0f1 = append(f8f0f1, &f8f0f1elem) - } - f8f0.EntryPointArguments = f8f0f1 + f8f0.EntryPointArguments = aws.StringSlice(resp.JobRun.JobDriver.SparkSubmitJobDriver.EntryPointArguments) } if resp.JobRun.JobDriver.SparkSubmitJobDriver.SparkSubmitParameters != nil { f8f0.SparkSubmitParameters = resp.JobRun.JobDriver.SparkSubmitJobDriver.SparkSubmitParameters @@ -149,19 +143,13 @@ func (rm *resourceManager) sdkFind( } else { ko.Spec.ReleaseLabel = nil } - if resp.JobRun.State != nil { - ko.Status.State = resp.JobRun.State + if resp.JobRun.State != "" { + ko.Status.State = aws.String(string(resp.JobRun.State)) } else { ko.Status.State = nil } if resp.JobRun.Tags != nil { - f13 := map[string]*string{} - for f13key, f13valiter := range resp.JobRun.Tags { - var f13val string - f13val = *f13valiter - f13[f13key] = &f13val - } - ko.Spec.Tags = f13 + ko.Spec.Tags = aws.StringMap(resp.JobRun.Tags) } else { ko.Spec.Tags = nil } @@ -193,10 +181,10 @@ func (rm *resourceManager) newDescribeRequestPayload( res := &svcsdk.DescribeJobRunInput{} if r.ko.Status.ID != nil { - res.SetId(*r.ko.Status.ID) + res.Id = r.ko.Status.ID } if r.ko.Spec.VirtualClusterID != nil { - res.SetVirtualClusterId(*r.ko.Spec.VirtualClusterID) + res.VirtualClusterId = r.ko.Spec.VirtualClusterID } return res, nil @@ -228,7 +216,7 @@ func (rm *resourceManager) sdkCreate( var resp *svcsdk.StartJobRunOutput _ = resp - resp, err = rm.sdkapi.StartJobRunWithContext(ctx, input) + resp, err = rm.sdkapi.StartJobRun(ctx, input) rm.metrics.RecordAPICall("CREATE", "StartJobRun", err) if err != nil { return nil, err @@ -273,48 +261,36 @@ func (rm *resourceManager) newCreateRequestPayload( res := &svcsdk.StartJobRunInput{} if r.ko.Spec.ExecutionRoleARN != nil { - 
res.SetExecutionRoleArn(*r.ko.Spec.ExecutionRoleARN) + res.ExecutionRoleArn = r.ko.Spec.ExecutionRoleARN } if r.ko.Spec.JobDriver != nil { - f1 := &svcsdk.JobDriver{} + f1 := &svcsdktypes.JobDriver{} if r.ko.Spec.JobDriver.SparkSubmitJobDriver != nil { - f1f0 := &svcsdk.SparkSubmitJobDriver{} + f1f0 := &svcsdktypes.SparkSubmitJobDriver{} if r.ko.Spec.JobDriver.SparkSubmitJobDriver.EntryPoint != nil { - f1f0.SetEntryPoint(*r.ko.Spec.JobDriver.SparkSubmitJobDriver.EntryPoint) + f1f0.EntryPoint = r.ko.Spec.JobDriver.SparkSubmitJobDriver.EntryPoint } if r.ko.Spec.JobDriver.SparkSubmitJobDriver.EntryPointArguments != nil { - f1f0f1 := []*string{} - for _, f1f0f1iter := range r.ko.Spec.JobDriver.SparkSubmitJobDriver.EntryPointArguments { - var f1f0f1elem string - f1f0f1elem = *f1f0f1iter - f1f0f1 = append(f1f0f1, &f1f0f1elem) - } - f1f0.SetEntryPointArguments(f1f0f1) + f1f0.EntryPointArguments = aws.ToStringSlice(r.ko.Spec.JobDriver.SparkSubmitJobDriver.EntryPointArguments) } if r.ko.Spec.JobDriver.SparkSubmitJobDriver.SparkSubmitParameters != nil { - f1f0.SetSparkSubmitParameters(*r.ko.Spec.JobDriver.SparkSubmitJobDriver.SparkSubmitParameters) + f1f0.SparkSubmitParameters = r.ko.Spec.JobDriver.SparkSubmitJobDriver.SparkSubmitParameters } - f1.SetSparkSubmitJobDriver(f1f0) + f1.SparkSubmitJobDriver = f1f0 } - res.SetJobDriver(f1) + res.JobDriver = f1 } if r.ko.Spec.Name != nil { - res.SetName(*r.ko.Spec.Name) + res.Name = r.ko.Spec.Name } if r.ko.Spec.ReleaseLabel != nil { - res.SetReleaseLabel(*r.ko.Spec.ReleaseLabel) + res.ReleaseLabel = r.ko.Spec.ReleaseLabel } if r.ko.Spec.Tags != nil { - f4 := map[string]*string{} - for f4key, f4valiter := range r.ko.Spec.Tags { - var f4val string - f4val = *f4valiter - f4[f4key] = &f4val - } - res.SetTags(f4) + res.Tags = aws.ToStringMap(r.ko.Spec.Tags) } if r.ko.Spec.VirtualClusterID != nil { - res.SetVirtualClusterId(*r.ko.Spec.VirtualClusterID) + res.VirtualClusterId = r.ko.Spec.VirtualClusterID } return res, nil @@ -347,7 +323,7 @@ func (rm *resourceManager) sdkDelete( } var resp *svcsdk.CancelJobRunOutput _ = resp - resp, err = rm.sdkapi.CancelJobRunWithContext(ctx, input) + resp, err = rm.sdkapi.CancelJobRun(ctx, input) rm.metrics.RecordAPICall("DELETE", "CancelJobRun", err) return nil, err } @@ -360,10 +336,10 @@ func (rm *resourceManager) newDeleteRequestPayload( res := &svcsdk.CancelJobRunInput{} if r.ko.Status.ID != nil { - res.SetId(*r.ko.Status.ID) + res.Id = r.ko.Status.ID } if r.ko.Spec.VirtualClusterID != nil { - res.SetVirtualClusterId(*r.ko.Spec.VirtualClusterID) + res.VirtualClusterId = r.ko.Spec.VirtualClusterID } return res, nil @@ -476,11 +452,12 @@ func (rm *resourceManager) terminalAWSError(err error) bool { if err == nil { return false } - awsErr, ok := ackerr.AWSError(err) - if !ok { + + var terminalErr smithy.APIError + if !errors.As(err, &terminalErr) { return false } - switch awsErr.Code() { + switch terminalErr.ErrorCode() { case "ValidationException": return true default: diff --git a/pkg/resource/virtual_cluster/manager.go b/pkg/resource/virtual_cluster/manager.go index 7082425..bf73f7d 100644 --- a/pkg/resource/virtual_cluster/manager.go +++ b/pkg/resource/virtual_cluster/manager.go @@ -32,9 +32,8 @@ import ( acktags "github.com/aws-controllers-k8s/runtime/pkg/tags" acktypes "github.com/aws-controllers-k8s/runtime/pkg/types" ackutil "github.com/aws-controllers-k8s/runtime/pkg/util" - "github.com/aws/aws-sdk-go/aws/session" - svcsdk "github.com/aws/aws-sdk-go/service/emrcontainers" - svcsdkapi 
"github.com/aws/aws-sdk-go/service/emrcontainers/emrcontainersiface" + "github.com/aws/aws-sdk-go-v2/aws" + svcsdk "github.com/aws/aws-sdk-go-v2/service/emrcontainers" "github.com/go-logr/logr" corev1 "k8s.io/api/core/v1" @@ -59,6 +58,9 @@ type resourceManager struct { // cfg is a copy of the ackcfg.Config object passed on start of the service // controller cfg ackcfg.Config + // clientcfg is a copy of the client configuration passed on start of the + // service controller + clientcfg aws.Config // log refers to the logr.Logger object handling logging for the service // controller log logr.Logger @@ -73,12 +75,9 @@ type resourceManager struct { awsAccountID ackv1alpha1.AWSAccountID // The AWS Region that this resource manager targets awsRegion ackv1alpha1.AWSRegion - // sess is the AWS SDK Session object used to communicate with the backend - // AWS service API - sess *session.Session - // sdk is a pointer to the AWS service API interface exposed by the - // aws-sdk-go/services/{alias}/{alias}iface package. - sdkapi svcsdkapi.EMRContainersAPI + // sdk is a pointer to the AWS service API client exposed by the + // aws-sdk-go-v2/services/{alias} package. + sdkapi *svcsdk.Client } // concreteResource returns a pointer to a resource from the supplied @@ -299,24 +298,25 @@ func (rm *resourceManager) EnsureTags( // newResourceManager returns a new struct implementing // acktypes.AWSResourceManager +// This is for AWS-SDK-GO-V2 - Created newResourceManager With AWS sdk-Go-ClientV2 func newResourceManager( cfg ackcfg.Config, + clientcfg aws.Config, log logr.Logger, metrics *ackmetrics.Metrics, rr acktypes.Reconciler, - sess *session.Session, id ackv1alpha1.AWSAccountID, region ackv1alpha1.AWSRegion, ) (*resourceManager, error) { return &resourceManager{ cfg: cfg, + clientcfg: clientcfg, log: log, metrics: metrics, rr: rr, awsAccountID: id, awsRegion: region, - sess: sess, - sdkapi: svcsdk.New(sess), + sdkapi: svcsdk.NewFromConfig(clientcfg), }, nil } diff --git a/pkg/resource/virtual_cluster/manager_factory.go b/pkg/resource/virtual_cluster/manager_factory.go index f6c99b2..4c965c1 100644 --- a/pkg/resource/virtual_cluster/manager_factory.go +++ b/pkg/resource/virtual_cluster/manager_factory.go @@ -23,7 +23,7 @@ import ( ackcfg "github.com/aws-controllers-k8s/runtime/pkg/config" ackmetrics "github.com/aws-controllers-k8s/runtime/pkg/metrics" acktypes "github.com/aws-controllers-k8s/runtime/pkg/types" - "github.com/aws/aws-sdk-go/aws/session" + "github.com/aws/aws-sdk-go-v2/aws" "github.com/go-logr/logr" svcresource "github.com/aws-controllers-k8s/emrcontainers-controller/pkg/resource" @@ -47,10 +47,10 @@ func (f *resourceManagerFactory) ResourceDescriptor() acktypes.AWSResourceDescri // supplied AWS account func (f *resourceManagerFactory) ManagerFor( cfg ackcfg.Config, + clientcfg aws.Config, log logr.Logger, metrics *ackmetrics.Metrics, rr acktypes.Reconciler, - sess *session.Session, id ackv1alpha1.AWSAccountID, region ackv1alpha1.AWSRegion, roleARN ackv1alpha1.AWSResourceName, @@ -70,7 +70,7 @@ func (f *resourceManagerFactory) ManagerFor( f.Lock() defer f.Unlock() - rm, err := newResourceManager(cfg, log, metrics, rr, sess, id, region) + rm, err := newResourceManager(cfg, clientcfg, log, metrics, rr, id, region) if err != nil { return nil, err } diff --git a/pkg/resource/virtual_cluster/references.go b/pkg/resource/virtual_cluster/references.go index 31ecbf4..f36d09b 100644 --- a/pkg/resource/virtual_cluster/references.go +++ b/pkg/resource/virtual_cluster/references.go @@ -17,6 +17,7 @@ 
package virtual_cluster import ( "context" + "sigs.k8s.io/controller-runtime/pkg/client" acktypes "github.com/aws-controllers-k8s/runtime/pkg/types" diff --git a/pkg/resource/virtual_cluster/sdk.go b/pkg/resource/virtual_cluster/sdk.go index 57cd073..6ca42f3 100644 --- a/pkg/resource/virtual_cluster/sdk.go +++ b/pkg/resource/virtual_cluster/sdk.go @@ -28,8 +28,10 @@ import ( ackerr "github.com/aws-controllers-k8s/runtime/pkg/errors" ackrequeue "github.com/aws-controllers-k8s/runtime/pkg/requeue" ackrtlog "github.com/aws-controllers-k8s/runtime/pkg/runtime/log" - "github.com/aws/aws-sdk-go/aws" - svcsdk "github.com/aws/aws-sdk-go/service/emrcontainers" + "github.com/aws/aws-sdk-go-v2/aws" + svcsdk "github.com/aws/aws-sdk-go-v2/service/emrcontainers" + svcsdktypes "github.com/aws/aws-sdk-go-v2/service/emrcontainers/types" + smithy "github.com/aws/smithy-go" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -40,8 +42,7 @@ import ( var ( _ = &metav1.Time{} _ = strings.ToLower("") - _ = &aws.JSONValue{} - _ = &svcsdk.EMRContainers{} + _ = &svcsdk.Client{} _ = &svcapitypes.VirtualCluster{} _ = ackv1alpha1.AWSAccountID("") _ = &ackerr.NotFound @@ -49,6 +50,7 @@ var ( _ = &reflect.Value{} _ = fmt.Sprintf("") _ = &ackrequeue.NoRequeue{} + _ = &aws.Config{} ) // sdkFind returns SDK-specific information about a supplied resource @@ -74,13 +76,11 @@ func (rm *resourceManager) sdkFind( } var resp *svcsdk.DescribeVirtualClusterOutput - resp, err = rm.sdkapi.DescribeVirtualClusterWithContext(ctx, input) + resp, err = rm.sdkapi.DescribeVirtualCluster(ctx, input) rm.metrics.RecordAPICall("READ_ONE", "DescribeVirtualCluster", err) if err != nil { - if reqErr, ok := ackerr.AWSRequestFailure(err); ok && reqErr.StatusCode() == 404 { - return nil, ackerr.NotFound - } - if awsErr, ok := ackerr.AWSError(err); ok && awsErr.Code() == "UNKNOWN" { + var awsErr smithy.APIError + if errors.As(err, &awsErr) && awsErr.ErrorCode() == "UNKNOWN" { return nil, ackerr.NotFound } return nil, err @@ -104,17 +104,21 @@ func (rm *resourceManager) sdkFind( } if resp.VirtualCluster.ContainerProvider.Info != nil { f1f1 := &svcapitypes.ContainerInfo{} - if resp.VirtualCluster.ContainerProvider.Info.EksInfo != nil { - f1f1f0 := &svcapitypes.EKSInfo{} - if resp.VirtualCluster.ContainerProvider.Info.EksInfo.Namespace != nil { - f1f1f0.Namespace = resp.VirtualCluster.ContainerProvider.Info.EksInfo.Namespace + switch resp.VirtualCluster.ContainerProvider.Info.(type) { + case *svcsdktypes.ContainerInfoMemberEksInfo: + f1f1f0 := resp.VirtualCluster.ContainerProvider.Info.(*svcsdktypes.ContainerInfoMemberEksInfo) + if f1f1f0 != nil { + f1f1f0f0 := &svcapitypes.EKSInfo{} + if f1f1f0.Value.Namespace != nil { + f1f1f0f0.Namespace = f1f1f0.Value.Namespace + } + f1f1.EKSInfo = f1f1f0f0 } - f1f1.EKSInfo = f1f1f0 } f1.Info = f1f1 } - if resp.VirtualCluster.ContainerProvider.Type != nil { - f1.Type = resp.VirtualCluster.ContainerProvider.Type + if resp.VirtualCluster.ContainerProvider.Type != "" { + f1.Type = aws.String(string(resp.VirtualCluster.ContainerProvider.Type)) } ko.Spec.ContainerProvider = f1 } else { @@ -131,13 +135,7 @@ func (rm *resourceManager) sdkFind( ko.Spec.Name = nil } if resp.VirtualCluster.Tags != nil { - f6 := map[string]*string{} - for f6key, f6valiter := range resp.VirtualCluster.Tags { - var f6val string - f6val = *f6valiter - f6[f6key] = &f6val - } - ko.Spec.Tags = f6 + ko.Spec.Tags = aws.StringMap(resp.VirtualCluster.Tags) } else { ko.Spec.Tags = nil } @@ -164,7 +162,7 @@ func (rm 
*resourceManager) newDescribeRequestPayload( res := &svcsdk.DescribeVirtualClusterInput{} if r.ko.Status.ID != nil { - res.SetId(*r.ko.Status.ID) + res.Id = r.ko.Status.ID } return res, nil @@ -187,9 +185,29 @@ func (rm *resourceManager) sdkCreate( return nil, err } + if input.ContainerProvider != nil { + // Clear any existing Info + if input.ContainerProvider.Info != nil { + input.ContainerProvider.Info = nil + } + + // Set the Info field if it exists in the spec + eksInfo := &svcsdktypes.EksInfo{} + if desired.ko.Spec.ContainerProvider.Info != nil && + desired.ko.Spec.ContainerProvider.Info.EKSInfo != nil && + desired.ko.Spec.ContainerProvider.Info.EKSInfo.Namespace != nil { + eksInfo.Namespace = desired.ko.Spec.ContainerProvider.Info.EKSInfo.Namespace + } else { + eksInfo.Namespace = aws.String("default") + } + input.ContainerProvider.Info = &svcsdktypes.ContainerInfoMemberEksInfo{ + Value: *eksInfo, + } + } + var resp *svcsdk.CreateVirtualClusterOutput _ = resp - resp, err = rm.sdkapi.CreateVirtualClusterWithContext(ctx, input) + resp, err = rm.sdkapi.CreateVirtualCluster(ctx, input) rm.metrics.RecordAPICall("CREATE", "CreateVirtualCluster", err) if err != nil { return nil, err @@ -229,37 +247,36 @@ func (rm *resourceManager) newCreateRequestPayload( res := &svcsdk.CreateVirtualClusterInput{} if r.ko.Spec.ContainerProvider != nil { - f0 := &svcsdk.ContainerProvider{} + f0 := &svcsdktypes.ContainerProvider{} if r.ko.Spec.ContainerProvider.ID != nil { - f0.SetId(*r.ko.Spec.ContainerProvider.ID) + f0.Id = r.ko.Spec.ContainerProvider.ID } if r.ko.Spec.ContainerProvider.Info != nil { - f0f1 := &svcsdk.ContainerInfo{} + var f0f1 svcsdktypes.ContainerInfo + isInterfaceSet := false if r.ko.Spec.ContainerProvider.Info.EKSInfo != nil { - f0f1f0 := &svcsdk.EksInfo{} + if isInterfaceSet { + return nil, ackerr.NewTerminalError(fmt.Errorf("can only set one of the members for EksInfo")) + } + f0f1f0Parent := &svcsdktypes.ContainerInfoMemberEksInfo{} + f0f1f0 := &svcsdktypes.EksInfo{} if r.ko.Spec.ContainerProvider.Info.EKSInfo.Namespace != nil { - f0f1f0.SetNamespace(*r.ko.Spec.ContainerProvider.Info.EKSInfo.Namespace) + f0f1f0.Namespace = r.ko.Spec.ContainerProvider.Info.EKSInfo.Namespace } - f0f1.SetEksInfo(f0f1f0) + f0f1f0Parent.Value = *f0f1f0 } - f0.SetInfo(f0f1) + f0.Info = f0f1 } if r.ko.Spec.ContainerProvider.Type != nil { - f0.SetType(*r.ko.Spec.ContainerProvider.Type) + f0.Type = svcsdktypes.ContainerProviderType(*r.ko.Spec.ContainerProvider.Type) } - res.SetContainerProvider(f0) + res.ContainerProvider = f0 } if r.ko.Spec.Name != nil { - res.SetName(*r.ko.Spec.Name) + res.Name = r.ko.Spec.Name } if r.ko.Spec.Tags != nil { - f2 := map[string]*string{} - for f2key, f2valiter := range r.ko.Spec.Tags { - var f2val string - f2val = *f2valiter - f2[f2key] = &f2val - } - res.SetTags(f2) + res.Tags = aws.ToStringMap(r.ko.Spec.Tags) } return res, nil @@ -292,7 +309,7 @@ func (rm *resourceManager) sdkDelete( } var resp *svcsdk.DeleteVirtualClusterOutput _ = resp - resp, err = rm.sdkapi.DeleteVirtualClusterWithContext(ctx, input) + resp, err = rm.sdkapi.DeleteVirtualCluster(ctx, input) rm.metrics.RecordAPICall("DELETE", "DeleteVirtualCluster", err) return nil, err } @@ -305,7 +322,7 @@ func (rm *resourceManager) newDeleteRequestPayload( res := &svcsdk.DeleteVirtualClusterInput{} if r.ko.Status.ID != nil { - res.SetId(*r.ko.Status.ID) + res.Id = r.ko.Status.ID } return res, nil @@ -413,11 +430,12 @@ func (rm *resourceManager) terminalAWSError(err error) bool { if err == nil { return false } - awsErr, 
ok := ackerr.AWSError(err)
-	if !ok {
+
+	var terminalErr smithy.APIError
+	if !errors.As(err, &terminalErr) {
 		return false
 	}
-	switch awsErr.Code() {
+	switch terminalErr.ErrorCode() {
 	case "ValidationException",
 		"ResourceNotFoundException",
 		"InternalServerException":
diff --git a/templates/hooks/virtual_cluster/sdk_create_post_build_request.go.tpl b/templates/hooks/virtual_cluster/sdk_create_post_build_request.go.tpl
new file mode 100644
index 0000000..6b22014
--- /dev/null
+++ b/templates/hooks/virtual_cluster/sdk_create_post_build_request.go.tpl
@@ -0,0 +1,20 @@
+
+	if input.ContainerProvider != nil {
+		// Clear any existing Info
+		if input.ContainerProvider.Info != nil {
+			input.ContainerProvider.Info = nil
+		}
+
+		// Set the Info field if it exists in the spec
+		eksInfo := &svcsdktypes.EksInfo{}
+		if desired.ko.Spec.ContainerProvider.Info != nil &&
+			desired.ko.Spec.ContainerProvider.Info.EKSInfo != nil &&
+			desired.ko.Spec.ContainerProvider.Info.EKSInfo.Namespace != nil {
+			eksInfo.Namespace = desired.ko.Spec.ContainerProvider.Info.EKSInfo.Namespace
+		} else {
+			eksInfo.Namespace = aws.String("default")
+		}
+		input.ContainerProvider.Info = &svcsdktypes.ContainerInfoMemberEksInfo{
+			Value: *eksInfo,
+		}
+	}
\ No newline at end of file
diff --git a/test/e2e/tests/test_jobrun.py b/test/e2e/tests/test_jobrun.py
index d501588..57422df 100644
--- a/test/e2e/tests/test_jobrun.py
+++ b/test/e2e/tests/test_jobrun.py
@@ -4,7 +4,7 @@
 # not use this file except in compliance with the License. A copy of the
 # License is located at
 #
-# http://aws.amazon.com/apache2.0/
+# http://aws.amazon.com/apache2.0/
 #
 # or in the "license" file accompanying this file. This file is distributed
 # on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
@@ -37,18 +37,65 @@
 # Time to wait after the zone has changed status, for the CR to update
 CHECK_STATUS_WAIT_SECONDS = 180
 
+# Maximum time to wait for EKS cluster to be active (5 minutes)
+MAX_EKS_WAIT_SECONDS = 300
+
+
+@pytest.fixture
+def eks_client():
+    return boto3.client("eks")
+
+
+def wait_for_eks_cluster_active(eks_client, cluster_name: str, max_wait_seconds: int = MAX_EKS_WAIT_SECONDS) -> bool:
+    """Wait for EKS cluster to be in ACTIVE status
+
+    Args:
+        eks_client: boto3 EKS client
+        cluster_name: Name of the EKS cluster
+        max_wait_seconds: Maximum time to wait in seconds
+
+    Returns:
+        bool: True if cluster is active, False if timeout reached
+    """
+    start_time = time.time()
+    while (time.time() - start_time) < max_wait_seconds:
+        try:
+            response = eks_client.describe_cluster(name=cluster_name)
+            status = response['cluster']['status']
+            if status == 'ACTIVE':
+                return True
+            elif status in ['FAILED', 'DELETING', 'DELETED']:
+                logging.error(
+                    f"EKS cluster {cluster_name} in terminal state: {status}")
+                return False
+            logging.info(f"Waiting for EKS cluster {cluster_name} to become active")
+        except eks_client.exceptions.ResourceNotFoundException:
+            logging.error(f"EKS cluster {cluster_name} not found")
+            return False
+        except Exception as e:
+            logging.warning(f"Error checking EKS cluster status: {str(e)}")
+        time.sleep(30)
+    return False
+
+
 @pytest.fixture
 def iam_client():
     return boto3.client("iam")
 
+
 @pytest.fixture
-def jobrun():
+def jobrun(eks_client):
     virtual_cluster_name = random_suffix_name("emr-virtual-cluster", 32)
     job_run_name = random_suffix_name("emr-job-run", 32)
 
+    # Wait for EKS cluster to be active
before proceeding + eks_cluster_name = get_bootstrap_resources().HostCluster_JR.cluster.name + if not wait_for_eks_cluster_active(eks_client, eks_cluster_name): + pytest.fail(f"EKS cluster {eks_cluster_name} did not become active within {MAX_EKS_WAIT_SECONDS}seconds") + replacements = REPLACEMENT_VALUES.copy() replacements["VIRTUALCLUSTER_NAME"] = virtual_cluster_name - replacements["EKS_CLUSTER_NAME"] = get_bootstrap_resources().HostCluster_JR.cluster.name + replacements["EKS_CLUSTER_NAME"] = eks_cluster_name resource_data = load_resource( "emr_virtual_cluster", @@ -61,11 +110,34 @@ def jobrun(): CRD_GROUP, CRD_VERSION, VC_RESOURCE_PLURAL, virtual_cluster_name, namespace="default", ) - k8s.create_custom_resource(vc_ref, resource_data) - vc_cr = k8s.wait_resource_consumed_by_controller(vc_ref) + + # Add retry mechanism for VirtualCluster creation + max_retries = 5 + retry_delay = 30 # seconds + for attempt in range(max_retries): + try: + k8s.create_custom_resource(vc_ref, resource_data) + vc_cr = k8s.wait_resource_consumed_by_controller(vc_ref) + + # Check if the resource exists and has an ID + if vc_cr is not None and k8s.get_resource_exists(vc_ref): + if "status" in vc_cr and "id" in vc_cr["status"]: + break + + # If we get here, the creation succeeded but ID is not set + # Delete and retry + k8s.delete_custom_resource(vc_ref, 3, 10) + + except Exception as e: + logging.warning(f"Attempt {attempt + 1} failed: {str(e)}") + if attempt < max_retries - 1: + time.sleep(retry_delay) + continue + raise assert vc_cr is not None assert k8s.get_resource_exists(vc_ref) + assert "status" in vc_cr and "id" in vc_cr["status"], "VirtualCluster status.id not set after retries" virtual_cluster_id = vc_cr["status"]["id"] emr_release_label = "emr-6.3.0-latest" @@ -116,6 +188,7 @@ def jobrun(): except: pass + @service_marker @pytest.mark.canary class Test_JobRun: @@ -145,7 +218,8 @@ def check_if_statement_exists(self, expected_statement, actual_assume_role_docum existing_statements = actual_assume_role_document.get("Statement", []) for existing_statement in existing_statements: - matches = self.check_if_dict_matches(expected_statement, existing_statement) + matches = self.check_if_dict_matches( + expected_statement, existing_statement) if matches: return True return False @@ -175,12 +249,11 @@ def update_assume_role(self, oidc_provider_arn, iam_client): oidc_provider = oidc_provider_arn.split('oidc-provider/')[1] emr_namespace = "emr-ns" base36_encoded_role_name = self.base36_encode(job_execution_role_name) - print("base36_encoded_role_name =", base36_encoded_role_name) account_id = get_account_id() LOG = logging.getLogger(__name__) TRUST_POLICY_STATEMENT_ALREADY_EXISTS = "Trust policy statement already " \ - "exists for role %s. No changes " \ - "were made!" + "exists for role %s. No changes " \ + "were made!" 
TRUST_POLICY_UPDATE_SUCCESSFUL = "Successfully updated trust policy of role %s" TRUST_POLICY_STATEMENT_FORMAT = '{ \ "Effect": "Allow", \ @@ -198,33 +271,31 @@ def update_assume_role(self, oidc_provider_arn, iam_client): }' job_execution_trust_policy = json.loads(TRUST_POLICY_STATEMENT_FORMAT % { - "AWS_ACCOUNT_ID": account_id, - "OIDC_PROVIDER_ARN": oidc_provider_arn, - "OIDC_PROVIDER": oidc_provider, - "NAMESPACE": emr_namespace, - "BASE36_ENCODED_ROLE_NAME": base36_encoded_role_name - }) - - # assume_role_document = self.get_assume_role_policy(iam_client, job_execution_role_name) - assume_role_policy = iam_client.get_role(RoleName=job_execution_role_name) - assume_role_document = assume_role_policy.get("Role").get("AssumeRolePolicyDocument") - print("assume_role_document =", assume_role_document) + "AWS_ACCOUNT_ID": account_id, + "OIDC_PROVIDER_ARN": oidc_provider_arn, + "OIDC_PROVIDER": oidc_provider, + "NAMESPACE": emr_namespace, + "BASE36_ENCODED_ROLE_NAME": base36_encoded_role_name + }) + + assume_role_policy = iam_client.get_role( + RoleName=job_execution_role_name) + assume_role_document = assume_role_policy.get( + "Role").get("AssumeRolePolicyDocument") matches = self.check_if_statement_exists(job_execution_trust_policy, - assume_role_document) + assume_role_document) if not matches: - LOG.debug('Role %s does not have the required trust policy ', - job_execution_role_name) existing_statements = assume_role_document.get("Statement") - print("existing_statements =", existing_statements) if existing_statements is None: - assume_role_document["Statement"] = [job_execution_trust_policy] + assume_role_document["Statement"] = [ + job_execution_trust_policy] else: existing_statements.append(job_execution_trust_policy) - LOG.debug('Updating trust policy of role %s', job_execution_role_name) - iam_client.update_assume_role_policy(RoleName=job_execution_role_name, PolicyDocument=json.dumps(assume_role_document)) + iam_client.update_assume_role_policy( + RoleName=job_execution_role_name, PolicyDocument=json.dumps(assume_role_document)) return TRUST_POLICY_UPDATE_SUCCESSFUL % job_execution_role_name else: return TRUST_POLICY_STATEMENT_ALREADY_EXISTS % job_execution_role_name @@ -246,29 +317,33 @@ def test_create_delete_jobrun(self, jobrun, emrcontainers_client, iam_client): assert jobrun_id try: - aws_res = emrcontainers_client.describe_job_run(id=jobrun_id,virtualClusterId=virtual_cluster_id) + aws_res = emrcontainers_client.describe_job_run( + id=jobrun_id, virtualClusterId=virtual_cluster_id) assert aws_res is not None except emrcontainers_client.exceptions.ResourceNotFoundException: - pytest.fail(f"Could not find job run with ID '{jobrun_id}' in EMR on EKS") + pytest.fail(f"Could not find job run with ID in EMR on EKS") # delete oidc provider try: - aws_res = iam_client.delete_open_id_connect_provider(OpenIDConnectProviderArn=oidc_provider_arn) + aws_res = iam_client.delete_open_id_connect_provider( + OpenIDConnectProviderArn=oidc_provider_arn) assert aws_res is not None except iam_client.exceptions.InvalidInputException: pytest.fail(f"Could not delete oidc identity provider") # check if JobRun is deleted try: - jr_deleted = emrcontainers_client.describe_job_run(id=jobrun_id,virtualClusterId=virtual_cluster_id) + jr_deleted = emrcontainers_client.describe_job_run( + id=jobrun_id, virtualClusterId=virtual_cluster_id) logging.debug('%s is deleted during cleanup', jobrun_id) assert jr_deleted except: logging.debug('some resources such as %s did not cleanup as expected', jobrun_id) - + # 
check if VirtualCluster is deleted try: - vc_deleted = emrcontainers_client.describe_virtual_cluster(id=virtual_cluster_id) + vc_deleted = emrcontainers_client.describe_virtual_cluster( + id=virtual_cluster_id) logging.debug('%s is deleted during cleanup', virtual_cluster_id) assert vc_deleted except:
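
The migrated `sdkFind` and `terminalAWSError` functions above no longer go through `ackerr.AWSError`/`awserr.Code()`; they unwrap the operation error into smithy-go's `APIError` interface with `errors.As` and branch on the service error-code string. A minimal, self-contained sketch of that pattern (the `isTerminal` helper and the `GenericAPIError` stand-in are illustrative only, not controller code):

```go
package main

import (
	"errors"
	"fmt"

	smithy "github.com/aws/smithy-go"
)

// isTerminal reports whether err carries one of the error codes that the
// generated code treats as non-retryable.
func isTerminal(err error) bool {
	var apiErr smithy.APIError
	if !errors.As(err, &apiErr) {
		return false
	}
	switch apiErr.ErrorCode() {
	case "ValidationException", "ResourceNotFoundException", "InternalServerException":
		return true
	}
	return false
}

func main() {
	// GenericAPIError implements smithy.APIError and stands in for a real
	// wrapped service error returned by an aws-sdk-go-v2 operation.
	err := fmt.Errorf("operation error EMR containers: DescribeVirtualCluster: %w",
		&smithy.GenericAPIError{Code: "ValidationException", Message: "bad input"})
	fmt.Println(isTerminal(err)) // true
}
```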
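In aws-sdk-go-v2 the `ContainerInfo` shape is a tagged union, which is why the new create hook wraps `EksInfo` in `ContainerInfoMemberEksInfo` and why `sdkFind` reads the value back through a type switch. A small sketch of writing and reading that union (the cluster ID and namespace values are placeholders):

```go
package main

import (
	"fmt"

	"github.com/aws/aws-sdk-go-v2/aws"
	svcsdktypes "github.com/aws/aws-sdk-go-v2/service/emrcontainers/types"
)

func main() {
	// Writing: the union member wraps the concrete EksInfo value.
	provider := &svcsdktypes.ContainerProvider{
		Id:   aws.String("my-eks-cluster"),
		Type: svcsdktypes.ContainerProviderType("EKS"),
		Info: &svcsdktypes.ContainerInfoMemberEksInfo{
			Value: svcsdktypes.EksInfo{Namespace: aws.String("emr-ns")},
		},
	}

	// Reading: a type switch recovers the concrete member, as sdkFind does
	// when it copies the namespace back into the custom resource spec.
	switch info := provider.Info.(type) {
	case *svcsdktypes.ContainerInfoMemberEksInfo:
		fmt.Println("namespace:", aws.ToString(info.Value.Namespace))
	default:
		fmt.Println("unknown or unset container info member")
	}
}
```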
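The resource-manager changes above replace the v1 `session.Session` with an `aws.Config` that the runtime resolves once and hands to `NewFromConfig`, and every call now takes a context instead of a `*WithContext` variant. A rough sketch of that wiring outside the controller (the region and the `ListVirtualClusters` call are for illustration only):

```go
package main

import (
	"context"
	"log"

	"github.com/aws/aws-sdk-go-v2/config"
	svcsdk "github.com/aws/aws-sdk-go-v2/service/emrcontainers"
)

func main() {
	ctx := context.Background()

	// Resolve credentials, region, retryer, etc. into a single aws.Config.
	cfg, err := config.LoadDefaultConfig(ctx, config.WithRegion("us-west-2"))
	if err != nil {
		log.Fatalf("unable to load AWS config: %v", err)
	}

	// The v2 client is a plain struct; there is no *iface package, so the
	// resource manager stores *emrcontainers.Client directly.
	client := svcsdk.NewFromConfig(cfg)

	// Context-aware call in place of the v1 ListVirtualClustersWithContext.
	out, err := client.ListVirtualClusters(ctx, &svcsdk.ListVirtualClustersInput{})
	if err != nil {
		log.Fatalf("ListVirtualClusters failed: %v", err)
	}
	log.Printf("found %d virtual clusters", len(out.VirtualClusters))
}
```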