From 6b72fd6b8d0f172bd64d967f13ec4cade51dba7e Mon Sep 17 00:00:00 2001 From: Jan Kumor Date: Wed, 2 Feb 2022 15:40:38 -0800 Subject: [PATCH] Spark and all pods in Flyte can now have global DNS config (#235) Signed-off-by: Jan Kumor Co-authored-by: Jan Kumor --- go/tasks/config_load_test.go | 21 +++++++++ .../pluginmachinery/flytek8s/config/config.go | 4 ++ .../pluginmachinery/flytek8s/pod_helper.go | 3 ++ .../flytek8s/pod_helper_test.go | 43 +++++++++++++++++ go/tasks/plugins/k8s/spark/spark.go | 2 + go/tasks/plugins/k8s/spark/spark_test.go | 46 +++++++++++++++++++ go/tasks/testdata/config.yaml | 16 +++++++ 7 files changed, 135 insertions(+) diff --git a/go/tasks/config_load_test.go b/go/tasks/config_load_test.go index 5cbad71221..34c0b95cce 100755 --- a/go/tasks/config_load_test.go +++ b/go/tasks/config_load_test.go @@ -89,6 +89,27 @@ func TestLoadConfig(t *testing.T) { assert.False(t, *k8sConfig.DefaultSecurityContext.AllowPrivilegeEscalation) assert.NotNil(t, k8sConfig.EnableHostNetworkingPod) assert.True(t, *k8sConfig.EnableHostNetworkingPod) + assert.NotNil(t, k8sConfig.DefaultPodDNSConfig) + assert.NotNil(t, k8sConfig.DefaultPodDNSConfig.Options) + assert.NotNil(t, k8sConfig.DefaultPodDNSConfig.Options[0].Name) + assert.Equal(t, "ndots", k8sConfig.DefaultPodDNSConfig.Options[0].Name) + assert.NotNil(t, k8sConfig.DefaultPodDNSConfig.Options[0].Value) + assert.Equal(t, "1", *k8sConfig.DefaultPodDNSConfig.Options[0].Value) + assert.NotNil(t, k8sConfig.DefaultPodDNSConfig.Options[1].Name) + assert.Equal(t, "single-request-reopen", k8sConfig.DefaultPodDNSConfig.Options[1].Name) + assert.Nil(t, k8sConfig.DefaultPodDNSConfig.Options[1].Value) + assert.NotNil(t, k8sConfig.DefaultPodDNSConfig.Options[2].Name) + assert.Equal(t, "timeout", k8sConfig.DefaultPodDNSConfig.Options[2].Name) + assert.NotNil(t, k8sConfig.DefaultPodDNSConfig.Options[2].Value) + assert.Equal(t, "1", *k8sConfig.DefaultPodDNSConfig.Options[2].Value) + assert.NotNil(t, 
k8sConfig.DefaultPodDNSConfig.Options[3].Name) + assert.Equal(t, "attempts", k8sConfig.DefaultPodDNSConfig.Options[3].Name) + assert.NotNil(t, k8sConfig.DefaultPodDNSConfig.Options[3].Value) + assert.Equal(t, "3", *k8sConfig.DefaultPodDNSConfig.Options[3].Value) + assert.NotNil(t, k8sConfig.DefaultPodDNSConfig.Nameservers) + assert.Equal(t, []string{"8.8.8.8", "8.8.4.4"}, k8sConfig.DefaultPodDNSConfig.Nameservers) + assert.NotNil(t, k8sConfig.DefaultPodDNSConfig.Searches) + assert.Equal(t, []string{"ns1.svc.cluster-domain.example", "my.dns.search.suffix"}, k8sConfig.DefaultPodDNSConfig.Searches) }) t.Run("logs-config-test", func(t *testing.T) { diff --git a/go/tasks/pluginmachinery/flytek8s/config/config.go b/go/tasks/pluginmachinery/flytek8s/config/config.go index e53fad4736..96f4c6201e 100755 --- a/go/tasks/pluginmachinery/flytek8s/config/config.go +++ b/go/tasks/pluginmachinery/flytek8s/config/config.go @@ -144,6 +144,10 @@ type K8sPluginConfig struct { // Refer to - https://kubernetes.io/docs/concepts/policy/pod-security-policy/#host-namespaces. // As a follow up, the default pod configurations will now be adjusted using podTemplates per namespace EnableHostNetworkingPod *bool `json:"enable-host-networking-pod" pflag:"-,If true, will schedule all pods with hostNetwork: true."` + + // DefaultPodDNSConfig provides a default pod DNS config that should be applied for the primary container launched and created by FlytePropeller. This may not be applicable to all plugins. For + // downstream plugins - i.e. TensorflowOperators may not support setting this. + DefaultPodDNSConfig *v1.PodDNSConfig `json:"default-pod-dns-config" pflag:"-,Optionally specify a default DNS config that should be applied to every Pod launched by FlytePropeller."` } // FlyteCoPilotConfig specifies configuration for the Flyte CoPilot system. 
FlyteCoPilot, allows running flytekit-less containers diff --git a/go/tasks/pluginmachinery/flytek8s/pod_helper.go b/go/tasks/pluginmachinery/flytek8s/pod_helper.go index be5ca90925..d0da5a42bf 100755 --- a/go/tasks/pluginmachinery/flytek8s/pod_helper.go +++ b/go/tasks/pluginmachinery/flytek8s/pod_helper.go @@ -94,6 +94,9 @@ func UpdatePodWithInterruptibleFlag(taskExecutionMetadata pluginsCore.TaskExecut if config.GetK8sPluginConfig().EnableHostNetworkingPod != nil { podSpec.HostNetwork = *config.GetK8sPluginConfig().EnableHostNetworkingPod } + if podSpec.DNSConfig == nil && config.GetK8sPluginConfig().DefaultPodDNSConfig != nil { + podSpec.DNSConfig = config.GetK8sPluginConfig().DefaultPodDNSConfig.DeepCopy() + } ApplyInterruptibleNodeAffinity(isInterruptible, podSpec) } diff --git a/go/tasks/pluginmachinery/flytek8s/pod_helper_test.go b/go/tasks/pluginmachinery/flytek8s/pod_helper_test.go index 80b03da54a..7f63aed133 100755 --- a/go/tasks/pluginmachinery/flytek8s/pod_helper_test.go +++ b/go/tasks/pluginmachinery/flytek8s/pod_helper_test.go @@ -524,6 +524,49 @@ func TestToK8sPod(t *testing.T) { assert.NoError(t, err) assert.False(t, p.HostNetwork) }) + + t.Run("default-pod-dns-config", func(t *testing.T) { + val1 := "1" + val2 := "1" + val3 := "3" + assert.NoError(t, config.SetK8sPluginConfig(&config.K8sPluginConfig{ + DefaultPodDNSConfig: &v1.PodDNSConfig{ + Nameservers: []string{"8.8.8.8", "8.8.4.4"}, + Options: []v1.PodDNSConfigOption{ + { + Name: "ndots", + Value: &val1, + }, + { + Name: "single-request-reopen", + }, + { + Name: "timeout", + Value: &val2, + }, + { + Name: "attempts", + Value: &val3, + }, + }, + Searches: []string{"ns1.svc.cluster-domain.example", "my.dns.search.suffix"}, + }, + })) + + x := dummyExecContext(&v1.ResourceRequirements{}) + p, err := ToK8sPodSpec(ctx, x) + assert.NoError(t, err) + assert.NotNil(t, p.DNSConfig) + assert.Equal(t, []string{"8.8.8.8", "8.8.4.4"}, p.DNSConfig.Nameservers) + assert.Equal(t, "ndots", 
p.DNSConfig.Options[0].Name) + assert.Equal(t, val1, *p.DNSConfig.Options[0].Value) + assert.Equal(t, "single-request-reopen", p.DNSConfig.Options[1].Name) + assert.Equal(t, "timeout", p.DNSConfig.Options[2].Name) + assert.Equal(t, val2, *p.DNSConfig.Options[2].Value) + assert.Equal(t, "attempts", p.DNSConfig.Options[3].Name) + assert.Equal(t, val3, *p.DNSConfig.Options[3].Value) + assert.Equal(t, []string{"ns1.svc.cluster-domain.example", "my.dns.search.suffix"}, p.DNSConfig.Searches) + }) } func TestDemystifyPending(t *testing.T) { diff --git a/go/tasks/plugins/k8s/spark/spark.go b/go/tasks/plugins/k8s/spark/spark.go index fc8007a62f..60be295535 100755 --- a/go/tasks/plugins/k8s/spark/spark.go +++ b/go/tasks/plugins/k8s/spark/spark.go @@ -104,6 +104,7 @@ func (sparkResourceHandler) BuildResource(ctx context.Context, taskCtx pluginsCo EnvVars: sparkEnvVars, Image: &container.Image, SecurityContenxt: config.GetK8sPluginConfig().DefaultPodSecurityContext.DeepCopy(), + DNSConfig: config.GetK8sPluginConfig().DefaultPodDNSConfig.DeepCopy(), }, ServiceAccount: &serviceAccountName, } @@ -115,6 +116,7 @@ func (sparkResourceHandler) BuildResource(ctx context.Context, taskCtx pluginsCo Image: &container.Image, EnvVars: sparkEnvVars, SecurityContenxt: config.GetK8sPluginConfig().DefaultPodSecurityContext.DeepCopy(), + DNSConfig: config.GetK8sPluginConfig().DefaultPodDNSConfig.DeepCopy(), }, } diff --git a/go/tasks/plugins/k8s/spark/spark_test.go b/go/tasks/plugins/k8s/spark/spark_test.go index 17dad17716..13721aee63 100755 --- a/go/tasks/plugins/k8s/spark/spark_test.go +++ b/go/tasks/plugins/k8s/spark/spark_test.go @@ -350,10 +350,34 @@ func TestBuildResourceSpark(t *testing.T) { // Set Interruptible Config runAsUser := int64(1000) + dnsOptVal1 := "1" + dnsOptVal2 := "1" + dnsOptVal3 := "3" assert.NoError(t, config.SetK8sPluginConfig(&config.K8sPluginConfig{ DefaultPodSecurityContext: &corev1.PodSecurityContext{ RunAsUser: &runAsUser, }, + DefaultPodDNSConfig: 
&corev1.PodDNSConfig{ + Nameservers: []string{"8.8.8.8", "8.8.4.4"}, + Options: []corev1.PodDNSConfigOption{ + { + Name: "ndots", + Value: &dnsOptVal1, + }, + { + Name: "single-request-reopen", + }, + { + Name: "timeout", + Value: &dnsOptVal2, + }, + { + Name: "attempts", + Value: &dnsOptVal3, + }, + }, + Searches: []string{"ns1.svc.cluster-domain.example", "my.dns.search.suffix"}, + }, InterruptibleNodeSelector: map[string]string{ "x/interruptible": "true", }, @@ -379,6 +403,28 @@ func TestBuildResourceSpark(t *testing.T) { assert.Equal(t, testImage, *sparkApp.Spec.Image) assert.NotNil(t, sparkApp.Spec.Driver.SparkPodSpec.SecurityContenxt) assert.Equal(t, *sparkApp.Spec.Driver.SparkPodSpec.SecurityContenxt.RunAsUser, runAsUser) + assert.NotNil(t, sparkApp.Spec.Driver.DNSConfig) + assert.Equal(t, []string{"8.8.8.8", "8.8.4.4"}, sparkApp.Spec.Driver.DNSConfig.Nameservers) + assert.Equal(t, "ndots", sparkApp.Spec.Driver.DNSConfig.Options[0].Name) + assert.Equal(t, dnsOptVal1, *sparkApp.Spec.Driver.DNSConfig.Options[0].Value) + assert.Equal(t, "single-request-reopen", sparkApp.Spec.Driver.DNSConfig.Options[1].Name) + assert.Equal(t, "timeout", sparkApp.Spec.Driver.DNSConfig.Options[2].Name) + assert.Equal(t, dnsOptVal2, *sparkApp.Spec.Driver.DNSConfig.Options[2].Value) + assert.Equal(t, "attempts", sparkApp.Spec.Driver.DNSConfig.Options[3].Name) + assert.Equal(t, dnsOptVal3, *sparkApp.Spec.Driver.DNSConfig.Options[3].Value) + assert.Equal(t, []string{"ns1.svc.cluster-domain.example", "my.dns.search.suffix"}, sparkApp.Spec.Driver.DNSConfig.Searches) + assert.NotNil(t, sparkApp.Spec.Executor.SparkPodSpec.SecurityContenxt) + assert.Equal(t, *sparkApp.Spec.Executor.SparkPodSpec.SecurityContenxt.RunAsUser, runAsUser) + assert.NotNil(t, sparkApp.Spec.Executor.DNSConfig) + assert.NotNil(t, sparkApp.Spec.Executor.DNSConfig) + assert.Equal(t, "ndots", sparkApp.Spec.Executor.DNSConfig.Options[0].Name) + assert.Equal(t, dnsOptVal1, 
*sparkApp.Spec.Executor.DNSConfig.Options[0].Value) + assert.Equal(t, "single-request-reopen", sparkApp.Spec.Executor.DNSConfig.Options[1].Name) + assert.Equal(t, "timeout", sparkApp.Spec.Executor.DNSConfig.Options[2].Name) + assert.Equal(t, dnsOptVal2, *sparkApp.Spec.Executor.DNSConfig.Options[2].Value) + assert.Equal(t, "attempts", sparkApp.Spec.Executor.DNSConfig.Options[3].Name) + assert.Equal(t, dnsOptVal3, *sparkApp.Spec.Executor.DNSConfig.Options[3].Value) + assert.Equal(t, []string{"ns1.svc.cluster-domain.example", "my.dns.search.suffix"}, sparkApp.Spec.Executor.DNSConfig.Searches) //Validate Driver/Executor Spec. driverCores, _ := strconv.ParseInt(dummySparkConf["spark.driver.cores"], 10, 32) diff --git a/go/tasks/testdata/config.yaml b/go/tasks/testdata/config.yaml index 208e4df49e..11a0f10a24 100755 --- a/go/tasks/testdata/config.yaml +++ b/go/tasks/testdata/config.yaml @@ -57,6 +57,22 @@ plugins: default-security-context: allowPrivilegeEscalation: false enable-host-networking-pod: true + default-pod-dns-config: + options: + - name: "ndots" + value: "1" + - name: "single-request-reopen" + - name: "timeout" + value: "1" + - name: "attempts" + value: "3" + nameservers: + - "8.8.8.8" + - "8.8.4.4" + searches: + - "ns1.svc.cluster-domain.example" + - "my.dns.search.suffix" + # Spark Plugin configuration spark: spark-config-default: