From 0c1596d780898fe716119701bd78c73f64d03fb4 Mon Sep 17 00:00:00 2001 From: Peter McClonski Date: Fri, 17 May 2024 18:24:45 -0400 Subject: [PATCH] Fixes a bug with dynamic allocation forcing the executor count to be 1 even when minExecutors is set to 0 (#1979) Signed-off-by: Peter McClonski --- .../sparkoperator.k8s.io/v1beta2/defaults.go | 15 +++++++---- .../v1beta2/defaults_test.go | 25 +++++++++++++++++++ 2 files changed, 35 insertions(+), 5 deletions(-) diff --git a/pkg/apis/sparkoperator.k8s.io/v1beta2/defaults.go b/pkg/apis/sparkoperator.k8s.io/v1beta2/defaults.go index f722a36cfd..e46f4012df 100644 --- a/pkg/apis/sparkoperator.k8s.io/v1beta2/defaults.go +++ b/pkg/apis/sparkoperator.k8s.io/v1beta2/defaults.go @@ -16,6 +16,8 @@ limitations under the License. package v1beta2 +import "strconv" + // SetSparkApplicationDefaults sets default values for certain fields of a SparkApplication. func SetSparkApplicationDefaults(app *SparkApplication) { if app == nil { @@ -44,7 +46,7 @@ func SetSparkApplicationDefaults(app *SparkApplication) { } setDriverSpecDefaults(&app.Spec.Driver, app.Spec.SparkConf) - setExecutorSpecDefaults(&app.Spec.Executor, app.Spec.SparkConf) + setExecutorSpecDefaults(&app.Spec.Executor, app.Spec.SparkConf, app.Spec.DynamicAllocation) } func setDriverSpecDefaults(spec *DriverSpec, sparkConf map[string]string) { @@ -59,7 +61,7 @@ func setDriverSpecDefaults(spec *DriverSpec, sparkConf map[string]string) { } } -func setExecutorSpecDefaults(spec *ExecutorSpec, sparkConf map[string]string) { +func setExecutorSpecDefaults(spec *ExecutorSpec, sparkConf map[string]string, allocSpec *DynamicAllocation) { if _, exists := sparkConf["spark.executor.cores"]; !exists && spec.Cores == nil { spec.Cores = new(int32) *spec.Cores = 1 @@ -68,8 +70,11 @@ func setExecutorSpecDefaults(spec *ExecutorSpec, sparkConf map[string]string) { spec.Memory = new(string) *spec.Memory = "1g" } - if _, exists := sparkConf["spark.executor.instances"]; !exists && spec.Instances == nil { - spec.Instances = new(int32) - *spec.Instances = 1 + var dynalloc, _ = sparkConf["spark.dynamicallocation.enabled"] + if dynamic, _ := strconv.ParseBool(dynalloc); !dynamic && (allocSpec == nil || !allocSpec.Enabled) { + if _, exists := sparkConf["spark.executor.instances"]; !exists && spec.Instances == nil { + spec.Instances = new(int32) + *spec.Instances = 1 + } } } diff --git a/pkg/apis/sparkoperator.k8s.io/v1beta2/defaults_test.go b/pkg/apis/sparkoperator.k8s.io/v1beta2/defaults_test.go index 6c1be13de6..624374ee16 100644 --- a/pkg/apis/sparkoperator.k8s.io/v1beta2/defaults_test.go +++ b/pkg/apis/sparkoperator.k8s.io/v1beta2/defaults_test.go @@ -199,4 +199,29 @@ func TestSetSparkApplicationDefaultsExecutorSpecDefaults(t *testing.T) { assert.Nil(t, app.Spec.Executor.Memory) assert.Nil(t, app.Spec.Executor.Instances) + //Case3: Dynamic allocation is enabled with minExecutors = 0 + var minExecs = int32(0) + app = &SparkApplication{ + Spec: SparkApplicationSpec{ + DynamicAllocation: &DynamicAllocation{ + Enabled: true, + MinExecutors: &minExecs, + }, + }, + } + + SetSparkApplicationDefaults(app) + assert.Nil(t, app.Spec.Executor.Instances) + + //Case4: Dynamic allocation is enabled via SparkConf + app = &SparkApplication{ + Spec: SparkApplicationSpec{ + SparkConf: map[string]string{ + "spark.dynamicallocation.enabled": "true", + }, + }, + } + + SetSparkApplicationDefaults(app) + assert.Nil(t, app.Spec.Executor.Instances) }