Skip to content

Commit

Permalink
Fixes a bug with dynamic allocation forcing the executor count to be …
Browse files Browse the repository at this point in the history
…1 even when minExecutors is set to 0 (kubeflow#1979)

Signed-off-by: Peter McClonski <mcclonski_peter@bah.com>
peter-mcclonski authored May 17, 2024

Verified

This commit was signed with the committer’s verified signature.
1 parent d094970 commit 0c1596d
Showing 2 changed files with 35 additions and 5 deletions.
15 changes: 10 additions & 5 deletions pkg/apis/sparkoperator.k8s.io/v1beta2/defaults.go
Original file line number Diff line number Diff line change
@@ -16,6 +16,8 @@ limitations under the License.

package v1beta2

import "strconv"

// SetSparkApplicationDefaults sets default values for certain fields of a SparkApplication.
func SetSparkApplicationDefaults(app *SparkApplication) {
if app == nil {
@@ -44,7 +46,7 @@ func SetSparkApplicationDefaults(app *SparkApplication) {
}

setDriverSpecDefaults(&app.Spec.Driver, app.Spec.SparkConf)
setExecutorSpecDefaults(&app.Spec.Executor, app.Spec.SparkConf)
setExecutorSpecDefaults(&app.Spec.Executor, app.Spec.SparkConf, app.Spec.DynamicAllocation)
}

func setDriverSpecDefaults(spec *DriverSpec, sparkConf map[string]string) {
@@ -59,7 +61,7 @@ func setDriverSpecDefaults(spec *DriverSpec, sparkConf map[string]string) {
}
}

func setExecutorSpecDefaults(spec *ExecutorSpec, sparkConf map[string]string) {
func setExecutorSpecDefaults(spec *ExecutorSpec, sparkConf map[string]string, allocSpec *DynamicAllocation) {
if _, exists := sparkConf["spark.executor.cores"]; !exists && spec.Cores == nil {
spec.Cores = new(int32)
*spec.Cores = 1
@@ -68,8 +70,11 @@ func setExecutorSpecDefaults(spec *ExecutorSpec, sparkConf map[string]string) {
spec.Memory = new(string)
*spec.Memory = "1g"
}
if _, exists := sparkConf["spark.executor.instances"]; !exists && spec.Instances == nil {
spec.Instances = new(int32)
*spec.Instances = 1
var dynalloc, _ = sparkConf["spark.dynamicallocation.enabled"]
if dynamic, _ := strconv.ParseBool(dynalloc); !dynamic && (allocSpec == nil || !allocSpec.Enabled) {
if _, exists := sparkConf["spark.executor.instances"]; !exists && spec.Instances == nil {
spec.Instances = new(int32)
*spec.Instances = 1
}
}
}
25 changes: 25 additions & 0 deletions pkg/apis/sparkoperator.k8s.io/v1beta2/defaults_test.go
Original file line number Diff line number Diff line change
@@ -199,4 +199,29 @@ func TestSetSparkApplicationDefaultsExecutorSpecDefaults(t *testing.T) {
assert.Nil(t, app.Spec.Executor.Memory)
assert.Nil(t, app.Spec.Executor.Instances)

//Case3: Dynamic allocation is enabled with minExecutors = 0
var minExecs = int32(0)
app = &SparkApplication{
Spec: SparkApplicationSpec{
DynamicAllocation: &DynamicAllocation{
Enabled: true,
MinExecutors: &minExecs,
},
},
}

SetSparkApplicationDefaults(app)
assert.Nil(t, app.Spec.Executor.Instances)

//Case4: Dynamic allocation is enabled via SparkConf
app = &SparkApplication{
Spec: SparkApplicationSpec{
SparkConf: map[string]string{
"spark.dynamicallocation.enabled": "true",
},
},
}

SetSparkApplicationDefaults(app)
assert.Nil(t, app.Spec.Executor.Instances)
}

0 comments on commit 0c1596d

Please sign in to comment.