Resolves a bug with inappropriate minimum executors in dynamic allocation environments.
peter-mcclonski committed Apr 26, 2024
1 parent 333ee0b commit c256baf
Showing 4 changed files with 38 additions and 8 deletions.
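
For context on what "inappropriate minimum executors" means here, a minimal sketch of the affected scenario (illustrative only, not part of the commit), using the v1beta2 types touched by this change: an application enables dynamic allocation and asks for MinExecutors of 0, yet the defaulting pass still forced spark.executor.instances to 1.

	// Illustrative sketch only (not part of the commit); assumes the
	// SparkApplication types defined in this v1beta2 package.
	minExecs := int32(0)
	app := &SparkApplication{
		Spec: SparkApplicationSpec{
			DynamicAllocation: &DynamicAllocation{
				Enabled:      true,
				MinExecutors: &minExecs, // deliberately zero executors at idle
			},
		},
	}

	SetSparkApplicationDefaults(app)

	// Before this commit: app.Spec.Executor.Instances was defaulted to 1,
	// overriding the intent of MinExecutors = 0.
	// After this commit:  app.Spec.Executor.Instances stays nil, so Spark's
	// dynamic allocation controls the executor count.
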
4 changes: 2 additions & 2 deletions charts/spark-operator-chart/Chart.yaml
@@ -1,8 +1,8 @@
 apiVersion: v2
 name: spark-operator
 description: A Helm chart for Spark on Kubernetes operator
-version: 1.2.14
-appVersion: v1beta2-1.4.5-3.5.0
+version: 1.2.15
+appVersion: v1beta2-1.4.6-3.5.0
 keywords:
 - spark
 home: https://github.com/kubeflow/spark-operator
2 changes: 1 addition & 1 deletion charts/spark-operator-chart/README.md
@@ -1,6 +1,6 @@
 # spark-operator
 
-![Version: 1.2.14](https://img.shields.io/badge/Version-1.2.14-informational?style=flat-square) ![AppVersion: v1beta2-1.4.5-3.5.0](https://img.shields.io/badge/AppVersion-v1beta2--1.4.5--3.5.0-informational?style=flat-square)
+![Version: 1.2.15](https://img.shields.io/badge/Version-1.2.15-informational?style=flat-square) ![AppVersion: v1beta2-1.4.6-3.5.0](https://img.shields.io/badge/AppVersion-v1beta2--1.4.6--3.5.0-informational?style=flat-square)
 
 A Helm chart for Spark on Kubernetes operator
 
15 changes: 10 additions & 5 deletions pkg/apis/sparkoperator.k8s.io/v1beta2/defaults.go
@@ -16,6 +16,8 @@ limitations under the License.
 
 package v1beta2
 
+import "strconv"
+
 // SetSparkApplicationDefaults sets default values for certain fields of a SparkApplication.
 func SetSparkApplicationDefaults(app *SparkApplication) {
 	if app == nil {
@@ -44,7 +46,7 @@ func SetSparkApplicationDefaults(app *SparkApplication) {
 	}
 
 	setDriverSpecDefaults(&app.Spec.Driver, app.Spec.SparkConf)
-	setExecutorSpecDefaults(&app.Spec.Executor, app.Spec.SparkConf)
+	setExecutorSpecDefaults(&app.Spec.Executor, app.Spec.SparkConf, app.Spec.DynamicAllocation)
 }
 
 func setDriverSpecDefaults(spec *DriverSpec, sparkConf map[string]string) {
@@ -59,7 +61,7 @@ func setDriverSpecDefaults(spec *DriverSpec, sparkConf map[string]string) {
 	}
 }
 
-func setExecutorSpecDefaults(spec *ExecutorSpec, sparkConf map[string]string) {
+func setExecutorSpecDefaults(spec *ExecutorSpec, sparkConf map[string]string, allocSpec *DynamicAllocation) {
 	if _, exists := sparkConf["spark.executor.cores"]; !exists && spec.Cores == nil {
 		spec.Cores = new(int32)
 		*spec.Cores = 1
@@ -68,8 +70,11 @@ func setExecutorSpecDefaults(spec *ExecutorSpec, sparkConf map[string]string) {
 		spec.Memory = new(string)
 		*spec.Memory = "1g"
 	}
-	if _, exists := sparkConf["spark.executor.instances"]; !exists && spec.Instances == nil {
-		spec.Instances = new(int32)
-		*spec.Instances = 1
+	var dynalloc, _ = sparkConf["spark.dynamicallocation.enabled"]
+	if dynamic, _ := strconv.ParseBool(dynalloc); !dynamic && (allocSpec == nil || !allocSpec.Enabled) {
+		if _, exists := sparkConf["spark.executor.instances"]; !exists && spec.Instances == nil {
+			spec.Instances = new(int32)
+			*spec.Instances = 1
+		}
 	}
 }
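
One detail of the new guard worth spelling out: when SparkConf has no spark.dynamicallocation.enabled entry at all, the map lookup yields the empty string, and strconv.ParseBool("") returns false along with an error that the code deliberately discards, so the executor-instances default is still applied unless DynamicAllocation is enabled in the spec. A small standalone sketch of that behavior (not from the commit):

	package main

	import (
		"fmt"
		"strconv"
	)

	func main() {
		sparkConf := map[string]string{} // key absent, as in most specs
		raw := sparkConf["spark.dynamicallocation.enabled"]

		// ParseBool on "" returns false plus an invalid-syntax error; the
		// defaults code ignores the error, so a missing key means "disabled".
		dynamic, err := strconv.ParseBool(raw)
		fmt.Println(dynamic, err) // false strconv.ParseBool: parsing "": invalid syntax
	}
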
25 changes: 25 additions & 0 deletions pkg/apis/sparkoperator.k8s.io/v1beta2/defaults_test.go
@@ -199,4 +199,29 @@ func TestSetSparkApplicationDefaultsExecutorSpecDefaults(t *testing.T) {
 	assert.Nil(t, app.Spec.Executor.Memory)
 	assert.Nil(t, app.Spec.Executor.Instances)
 
+	//Case3: Dynamic allocation is enabled with minExecutors = 0
+	var minExecs = int32(0)
+	app = &SparkApplication{
+		Spec: SparkApplicationSpec{
+			DynamicAllocation: &DynamicAllocation{
+				Enabled:      true,
+				MinExecutors: &minExecs,
+			},
+		},
+	}
+
+	SetSparkApplicationDefaults(app)
+	assert.Nil(t, app.Spec.Executor.Instances)
+
+	//Case4: Dynamic allocation is enabled via SparkConf
+	app = &SparkApplication{
+		Spec: SparkApplicationSpec{
+			SparkConf: map[string]string{
+				"spark.dynamicallocation.enabled": "true",
+			},
+		},
+	}
+
+	SetSparkApplicationDefaults(app)
+	assert.Nil(t, app.Spec.Executor.Instances)
 }
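
To exercise just the new cases locally, a standard Go invocation along the lines of go test ./pkg/apis/sparkoperator.k8s.io/v1beta2/ -run TestSetSparkApplicationDefaultsExecutorSpecDefaults should cover both dynamic-allocation paths (the exact command is a suggestion, not part of the commit).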
