Skip to content

Commit

Permalink
Spark and all pods in Flyte can now have global DNS config (flyteorg#235
Browse files Browse the repository at this point in the history
)

Signed-off-by: Jan Kumor <[email protected]>

Co-authored-by: Jan Kumor <[email protected]>
  • Loading branch information
akumor and jan-zip authored Feb 2, 2022
1 parent 20088ca commit 6b72fd6
Show file tree
Hide file tree
Showing 7 changed files with 135 additions and 0 deletions.
21 changes: 21 additions & 0 deletions go/tasks/config_load_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,27 @@ func TestLoadConfig(t *testing.T) {
assert.False(t, *k8sConfig.DefaultSecurityContext.AllowPrivilegeEscalation)
assert.NotNil(t, k8sConfig.EnableHostNetworkingPod)
assert.True(t, *k8sConfig.EnableHostNetworkingPod)
assert.NotNil(t, k8sConfig.DefaultPodDNSConfig)
assert.NotNil(t, k8sConfig.DefaultPodDNSConfig.Options)
assert.NotNil(t, k8sConfig.DefaultPodDNSConfig.Options[0].Name)
assert.Equal(t, "ndots", k8sConfig.DefaultPodDNSConfig.Options[0].Name)
assert.NotNil(t, k8sConfig.DefaultPodDNSConfig.Options[0].Value)
assert.Equal(t, "1", *k8sConfig.DefaultPodDNSConfig.Options[0].Value)
assert.NotNil(t, k8sConfig.DefaultPodDNSConfig.Options[1].Name)
assert.Equal(t, "single-request-reopen", k8sConfig.DefaultPodDNSConfig.Options[1].Name)
assert.Nil(t, k8sConfig.DefaultPodDNSConfig.Options[1].Value)
assert.NotNil(t, k8sConfig.DefaultPodDNSConfig.Options[2].Name)
assert.Equal(t, "timeout", k8sConfig.DefaultPodDNSConfig.Options[2].Name)
assert.NotNil(t, k8sConfig.DefaultPodDNSConfig.Options[2].Value)
assert.Equal(t, "1", *k8sConfig.DefaultPodDNSConfig.Options[2].Value)
assert.NotNil(t, k8sConfig.DefaultPodDNSConfig.Options[3].Name)
assert.Equal(t, "attempts", k8sConfig.DefaultPodDNSConfig.Options[3].Name)
assert.NotNil(t, k8sConfig.DefaultPodDNSConfig.Options[3].Value)
assert.Equal(t, "3", *k8sConfig.DefaultPodDNSConfig.Options[3].Value)
assert.NotNil(t, k8sConfig.DefaultPodDNSConfig.Nameservers)
assert.Equal(t, []string{"8.8.8.8", "8.8.4.4"}, k8sConfig.DefaultPodDNSConfig.Nameservers)
assert.NotNil(t, k8sConfig.DefaultPodDNSConfig.Searches)
assert.Equal(t, []string{"ns1.svc.cluster-domain.example", "my.dns.search.suffix"}, k8sConfig.DefaultPodDNSConfig.Searches)
})

t.Run("logs-config-test", func(t *testing.T) {
Expand Down
4 changes: 4 additions & 0 deletions go/tasks/pluginmachinery/flytek8s/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,10 @@ type K8sPluginConfig struct {
// Refer to - https://kubernetes.io/docs/concepts/policy/pod-security-policy/#host-namespaces.
// As a follow up, the default pod configurations will now be adjusted using podTemplates per namespace
EnableHostNetworkingPod *bool `json:"enable-host-networking-pod" pflag:"-,If true, will schedule all pods with hostNetwork: true."`

// DefaultPodDNSConfig provides a default pod DNS config that that should be applied for the primary container launched and created by FlytePropeller. This may not be applicable to all plugins. For
// // downstream plugins - i.e. TensorflowOperators may not support setting this.
DefaultPodDNSConfig *v1.PodDNSConfig `json:"default-pod-dns-config" pflag:"-,Optionally specify a default DNS config that should be applied to every Pod launched by FlytePropeller."`
}

// FlyteCoPilotConfig specifies configuration for the Flyte CoPilot system. FlyteCoPilot, allows running flytekit-less containers
Expand Down
3 changes: 3 additions & 0 deletions go/tasks/pluginmachinery/flytek8s/pod_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,9 @@ func UpdatePodWithInterruptibleFlag(taskExecutionMetadata pluginsCore.TaskExecut
if config.GetK8sPluginConfig().EnableHostNetworkingPod != nil {
podSpec.HostNetwork = *config.GetK8sPluginConfig().EnableHostNetworkingPod
}
if podSpec.DNSConfig == nil && config.GetK8sPluginConfig().DefaultPodDNSConfig != nil {
podSpec.DNSConfig = config.GetK8sPluginConfig().DefaultPodDNSConfig.DeepCopy()
}
ApplyInterruptibleNodeAffinity(isInterruptible, podSpec)
}

Expand Down
43 changes: 43 additions & 0 deletions go/tasks/pluginmachinery/flytek8s/pod_helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -524,6 +524,49 @@ func TestToK8sPod(t *testing.T) {
assert.NoError(t, err)
assert.False(t, p.HostNetwork)
})

t.Run("default-pod-dns-config", func(t *testing.T) {
val1 := "1"
val2 := "1"
val3 := "3"
assert.NoError(t, config.SetK8sPluginConfig(&config.K8sPluginConfig{
DefaultPodDNSConfig: &v1.PodDNSConfig{
Nameservers: []string{"8.8.8.8", "8.8.4.4"},
Options: []v1.PodDNSConfigOption{
{
Name: "ndots",
Value: &val1,
},
{
Name: "single-request-reopen",
},
{
Name: "timeout",
Value: &val2,
},
{
Name: "attempts",
Value: &val3,
},
},
Searches: []string{"ns1.svc.cluster-domain.example", "my.dns.search.suffix"},
},
}))

x := dummyExecContext(&v1.ResourceRequirements{})
p, err := ToK8sPodSpec(ctx, x)
assert.NoError(t, err)
assert.NotNil(t, p.DNSConfig)
assert.Equal(t, []string{"8.8.8.8", "8.8.4.4"}, p.DNSConfig.Nameservers)
assert.Equal(t, "ndots", p.DNSConfig.Options[0].Name)
assert.Equal(t, val1, *p.DNSConfig.Options[0].Value)
assert.Equal(t, "single-request-reopen", p.DNSConfig.Options[1].Name)
assert.Equal(t, "timeout", p.DNSConfig.Options[2].Name)
assert.Equal(t, val2, *p.DNSConfig.Options[2].Value)
assert.Equal(t, "attempts", p.DNSConfig.Options[3].Name)
assert.Equal(t, val3, *p.DNSConfig.Options[3].Value)
assert.Equal(t, []string{"ns1.svc.cluster-domain.example", "my.dns.search.suffix"}, p.DNSConfig.Searches)
})
}

func TestDemystifyPending(t *testing.T) {
Expand Down
2 changes: 2 additions & 0 deletions go/tasks/plugins/k8s/spark/spark.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ func (sparkResourceHandler) BuildResource(ctx context.Context, taskCtx pluginsCo
EnvVars: sparkEnvVars,
Image: &container.Image,
SecurityContenxt: config.GetK8sPluginConfig().DefaultPodSecurityContext.DeepCopy(),
DNSConfig: config.GetK8sPluginConfig().DefaultPodDNSConfig.DeepCopy(),
},
ServiceAccount: &serviceAccountName,
}
Expand All @@ -115,6 +116,7 @@ func (sparkResourceHandler) BuildResource(ctx context.Context, taskCtx pluginsCo
Image: &container.Image,
EnvVars: sparkEnvVars,
SecurityContenxt: config.GetK8sPluginConfig().DefaultPodSecurityContext.DeepCopy(),
DNSConfig: config.GetK8sPluginConfig().DefaultPodDNSConfig.DeepCopy(),
},
}

Expand Down
46 changes: 46 additions & 0 deletions go/tasks/plugins/k8s/spark/spark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -350,10 +350,34 @@ func TestBuildResourceSpark(t *testing.T) {

// Set Interruptible Config
runAsUser := int64(1000)
dnsOptVal1 := "1"
dnsOptVal2 := "1"
dnsOptVal3 := "3"
assert.NoError(t, config.SetK8sPluginConfig(&config.K8sPluginConfig{
DefaultPodSecurityContext: &corev1.PodSecurityContext{
RunAsUser: &runAsUser,
},
DefaultPodDNSConfig: &corev1.PodDNSConfig{
Nameservers: []string{"8.8.8.8", "8.8.4.4"},
Options: []corev1.PodDNSConfigOption{
{
Name: "ndots",
Value: &dnsOptVal1,
},
{
Name: "single-request-reopen",
},
{
Name: "timeout",
Value: &dnsOptVal2,
},
{
Name: "attempts",
Value: &dnsOptVal3,
},
},
Searches: []string{"ns1.svc.cluster-domain.example", "my.dns.search.suffix"},
},
InterruptibleNodeSelector: map[string]string{
"x/interruptible": "true",
},
Expand All @@ -379,6 +403,28 @@ func TestBuildResourceSpark(t *testing.T) {
assert.Equal(t, testImage, *sparkApp.Spec.Image)
assert.NotNil(t, sparkApp.Spec.Driver.SparkPodSpec.SecurityContenxt)
assert.Equal(t, *sparkApp.Spec.Driver.SparkPodSpec.SecurityContenxt.RunAsUser, runAsUser)
assert.NotNil(t, sparkApp.Spec.Driver.DNSConfig)
assert.Equal(t, []string{"8.8.8.8", "8.8.4.4"}, sparkApp.Spec.Driver.DNSConfig.Nameservers)
assert.Equal(t, "ndots", sparkApp.Spec.Driver.DNSConfig.Options[0].Name)
assert.Equal(t, dnsOptVal1, *sparkApp.Spec.Driver.DNSConfig.Options[0].Value)
assert.Equal(t, "single-request-reopen", sparkApp.Spec.Driver.DNSConfig.Options[1].Name)
assert.Equal(t, "timeout", sparkApp.Spec.Driver.DNSConfig.Options[2].Name)
assert.Equal(t, dnsOptVal2, *sparkApp.Spec.Driver.DNSConfig.Options[2].Value)
assert.Equal(t, "attempts", sparkApp.Spec.Driver.DNSConfig.Options[3].Name)
assert.Equal(t, dnsOptVal3, *sparkApp.Spec.Driver.DNSConfig.Options[3].Value)
assert.Equal(t, []string{"ns1.svc.cluster-domain.example", "my.dns.search.suffix"}, sparkApp.Spec.Driver.DNSConfig.Searches)
assert.NotNil(t, sparkApp.Spec.Executor.SparkPodSpec.SecurityContenxt)
assert.Equal(t, *sparkApp.Spec.Executor.SparkPodSpec.SecurityContenxt.RunAsUser, runAsUser)
assert.NotNil(t, sparkApp.Spec.Executor.DNSConfig)
assert.NotNil(t, sparkApp.Spec.Executor.DNSConfig)
assert.Equal(t, "ndots", sparkApp.Spec.Executor.DNSConfig.Options[0].Name)
assert.Equal(t, dnsOptVal1, *sparkApp.Spec.Executor.DNSConfig.Options[0].Value)
assert.Equal(t, "single-request-reopen", sparkApp.Spec.Executor.DNSConfig.Options[1].Name)
assert.Equal(t, "timeout", sparkApp.Spec.Executor.DNSConfig.Options[2].Name)
assert.Equal(t, dnsOptVal2, *sparkApp.Spec.Executor.DNSConfig.Options[2].Value)
assert.Equal(t, "attempts", sparkApp.Spec.Executor.DNSConfig.Options[3].Name)
assert.Equal(t, dnsOptVal3, *sparkApp.Spec.Executor.DNSConfig.Options[3].Value)
assert.Equal(t, []string{"ns1.svc.cluster-domain.example", "my.dns.search.suffix"}, sparkApp.Spec.Executor.DNSConfig.Searches)

//Validate Driver/Executor Spec.
driverCores, _ := strconv.ParseInt(dummySparkConf["spark.driver.cores"], 10, 32)
Expand Down
16 changes: 16 additions & 0 deletions go/tasks/testdata/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,22 @@ plugins:
default-security-context:
allowPrivilegeEscalation: false
enable-host-networking-pod: true
default-pod-dns-config:
options:
- name: "ndots"
value: "1"
- name: "single-request-reopen"
- name: "timeout"
value: "1"
- name: "attempts"
value: "3"
nameservers:
- "8.8.8.8"
- "8.8.4.4"
searches:
- "ns1.svc.cluster-domain.example"
- "my.dns.search.suffix"

# Spark Plugin configuration
spark:
spark-config-default:
Expand Down

0 comments on commit 6b72fd6

Please sign in to comment.