Skip to content

Commit

Permalink
Add executor_path and applications_path to spark config (#1634)
Browse files Browse the repository at this point in the history
* Add executor_path and applications_path to spark config

Signed-off-by: Kevin Su <[email protected]>

* nit

Signed-off-by: Kevin Su <[email protected]>

---------

Signed-off-by: Kevin Su <[email protected]>
  • Loading branch information
pingsutw authored May 14, 2023
1 parent dab1eed commit 8ef79e5
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 3 deletions.
8 changes: 6 additions & 2 deletions plugins/flytekit-spark/flytekitplugins/spark/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,14 @@ class Spark(object):
Args:
spark_conf: Dictionary of spark config. The variables should match what spark expects
hadoop_conf: Dictionary of hadoop conf. The variables should match a typical hadoop configuration for spark
executor_path: Path to the Python binary executable to use for PySpark in both the driver and the executors.
applications_path: Path to a bundled JAR, Python, or R file of the application to execute (maps to the Spark application's MainFile).
"""

spark_conf: Optional[Dict[str, str]] = None
hadoop_conf: Optional[Dict[str, str]] = None
executor_path: Optional[str] = None
applications_path: Optional[str] = None

def __post_init__(self):
if self.spark_conf is None:
Expand Down Expand Up @@ -107,8 +111,8 @@ def __init__(
**kwargs,
):
self.sess: Optional[SparkSession] = None
self._default_executor_path: Optional[str] = None
self._default_applications_path: Optional[str] = None
self._default_executor_path: Optional[str] = task_config.executor_path
self._default_applications_path: Optional[str] = task_config.applications_path

if isinstance(container_image, ImageSpec):
if container_image.base_image is None:
Expand Down
10 changes: 9 additions & 1 deletion plugins/flytekit-spark/tests/test_spark_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,13 @@ def test_spark_task(reset_spark_session):
},
}

@task(task_config=Spark(spark_conf={"spark": "1"}))
@task(
task_config=Spark(
spark_conf={"spark": "1"},
executor_path="/usr/bin/python3",
applications_path="local:///usr/local/bin/entrypoint.py",
)
)
def my_spark(a: str) -> int:
session = flytekit.current_context().spark_session
assert session.sparkContext.appName == "FlyteSpark: ex:local:local:local"
Expand All @@ -56,6 +62,8 @@ def my_spark(a: str) -> int:

retrieved_settings = my_spark.get_custom(settings)
assert retrieved_settings["sparkConf"] == {"spark": "1"}
assert retrieved_settings["executorPath"] == "/usr/bin/python3"
assert retrieved_settings["mainApplicationFile"] == "local:///usr/local/bin/entrypoint.py"

pb = ExecutionParameters.new_builder()
pb.working_dir = "/tmp"
Expand Down

0 comments on commit 8ef79e5

Please sign in to comment.