-
Notifications
You must be signed in to change notification settings - Fork 28.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-47475][CORE][K8S] Support spark.kubernetes.jars.avoidDownloadSchemes
for K8s Cluster Mode
#45715
[SPARK-47475][CORE][K8S] Support spark.kubernetes.jars.avoidDownloadSchemes
for K8s Cluster Mode
#45715
Conversation
val workingDirectory = "." | ||
childClasspath += workingDirectory | ||
def downloadResourcesToCurrentDirectory(uris: String, isArchive: Boolean = false): | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We don't need to add a new empty line here.
def downloadResourcesToCurrentDirectory(uris: String, isArchive: Boolean = false): | ||
|
||
def downloadResourcesToCurrentDirectory(uris: String, | ||
isArchive: Boolean = false, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you take a look at Apache Spark Coding Guideline
at this time?
For Scala code, Apache Spark follows the official Scala style guide and Databricks Scala guide. The latter is preferred. To format Scala code, run ./dev/scalafmt prior to submitting a PR.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For this case, the following is the rule.
For method declarations, use 4 space indentation for their parameters
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ya, scalafmt
is only recommended for newly added code. Please don't touch the existing code.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the tips @dongjoon-hyun!
I tried running ./dev/scalafmt
, somehow a lot of existing code was refactored for style. Is this expected? Is there something I missed?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Updated.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you for making a narrow-downed PR. I did a quick review today. I'll take another round review tomorrow, @leletan .
@@ -1458,6 +1458,18 @@ package object config { | |||
.doubleConf | |||
.createWithDefault(1.5) | |||
|
|||
private[spark] val KUBERNETES_AVOID_JAR_DOWNLOAD_SCHEMES = | |||
ConfigBuilder("spark.kubernetes.jars.avoidDownloadSchemes") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
BTW, the AS-IS PR title match with this?
Make Jars Download to Driver Optional under K8s Cluster Mode
spark.kubernetes.jars.avoidDownloadSchemes
under K8s Cluster Mode
spark.kubernetes.jars.avoidDownloadSchemes
under K8s Cluster Modespark.kubernetes.jars.avoidDownloadSchemes
under K8s Cluster Mode
spark.kubernetes.jars.avoidDownloadSchemes
under K8s Cluster Modespark.kubernetes.jars.avoidDownloadSchemes
under K8s Cluster Mode
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1, LGTM. Thank you, @leletan .
spark.kubernetes.jars.avoidDownloadSchemes
under K8s Cluster Modespark.kubernetes.jars.avoidDownloadSchemes
for K8s Cluster Mode
…Schemes` for K8s Cluster Mode ### What changes were proposed in this pull request? During spark submit, for K8s cluster mode driver, instead of always downloading jars and serving it to executors, avoid the download if the url matches `spark.kubernetes.jars.avoidDownloadSchemes` in the configuration. ### Why are the changes needed? For K8s cluster mode driver, `SparkSubmit` will download all the jars in the `spark.jars` to driver and then those jars' urls in `spark.jars` will be replaced by the driver local paths. Later when driver starts the `SparkContext`, it will copy all the `spark.jars` to `spark.app.initial.jar.urls`, start a file server and replace the jars with driver local paths in `spark.app.initial.jar.urls` with file service urls. When the executors start, they will download those driver local jars by `spark.app.initial.jar.urls`. When jars are big and the spark application requests a lot of executors, the executors' massive concurrent download of the jars from the driver will cause network saturation. In this case, the executors jar download will timeout, causing executors to be terminated. From user point of view, the application is trapped in the loop of massive executor loss and re-provision, but never gets enough live executors as requested, leads to SLA breach or sometimes failure. So instead of letting driver to download the jars and then serve them to executors, if we just avoid driver from downloading the jars and keeping the urls in `spark.jars` as they were, the executor will try to directly download the jars from the urls provided by user. This will avoid the driver download bottleneck mentioned above, especially when jar urls are with scalable storage schemes, like s3 or hdfs. Meanwhile, there are cases jar urls are with schemes of less scalable than driver file server, e.g. http, ftp, etc, or when the jars are small, or executor count is small - user may still want to fall back to current solution and use driver file server to serve the jars. So in this case, make the driver jars downloading and serving optional by scheme (similar idea to `FORCE_DOWNLOAD_SCHEMES` in YARN) is a good approach for the solution. ### Does this PR introduce _any_ user-facing change? A configuration `spark.kubernetes.jars.avoidDownloadSchemes` is added ### How was this patch tested? - Unit tests added - Tested with an application running on AWS EKS submitted with a 1GB jar on s3. - Before the fix, the application could not scale to 1k live executors. - After the fix, the application had no problem to scale beyond 12k live executors. ### Was this patch authored or co-authored using generative AI tooling? No Closes apache#45715 from leletan/allow_k8s_executor_to_download_remote_jar. Authored-by: jiale_tan <[email protected]> Signed-off-by: Dongjoon Hyun <[email protected]>
…Schemes` for K8s Cluster Mode During spark submit, for K8s cluster mode driver, instead of always downloading jars and serving it to executors, avoid the download if the url matches `spark.kubernetes.jars.avoidDownloadSchemes` in the configuration. For K8s cluster mode driver, `SparkSubmit` will download all the jars in the `spark.jars` to driver and then those jars' urls in `spark.jars` will be replaced by the driver local paths. Later when driver starts the `SparkContext`, it will copy all the `spark.jars` to `spark.app.initial.jar.urls`, start a file server and replace the jars with driver local paths in `spark.app.initial.jar.urls` with file service urls. When the executors start, they will download those driver local jars by `spark.app.initial.jar.urls`. When jars are big and the spark application requests a lot of executors, the executors' massive concurrent download of the jars from the driver will cause network saturation. In this case, the executors jar download will timeout, causing executors to be terminated. From user point of view, the application is trapped in the loop of massive executor loss and re-provision, but never gets enough live executors as requested, leads to SLA breach or sometimes failure. So instead of letting driver to download the jars and then serve them to executors, if we just avoid driver from downloading the jars and keeping the urls in `spark.jars` as they were, the executor will try to directly download the jars from the urls provided by user. This will avoid the driver download bottleneck mentioned above, especially when jar urls are with scalable storage schemes, like s3 or hdfs. Meanwhile, there are cases jar urls are with schemes of less scalable than driver file server, e.g. http, ftp, etc, or when the jars are small, or executor count is small - user may still want to fall back to current solution and use driver file server to serve the jars. So in this case, make the driver jars downloading and serving optional by scheme (similar idea to `FORCE_DOWNLOAD_SCHEMES` in YARN) is a good approach for the solution. A configuration `spark.kubernetes.jars.avoidDownloadSchemes` is added - Unit tests added - Tested with an application running on AWS EKS submitted with a 1GB jar on s3. - Before the fix, the application could not scale to 1k live executors. - After the fix, the application had no problem to scale beyond 12k live executors. No Closes apache#45715 from leletan/allow_k8s_executor_to_download_remote_jar. Authored-by: jiale_tan <[email protected]> Signed-off-by: Dongjoon Hyun <[email protected]> (cherry picked from commit 2e4f2b0) Signed-off-by: Dongjoon Hyun <[email protected]>
What changes were proposed in this pull request?
During spark submit, for K8s cluster mode driver, instead of always downloading jars and serving it to executors, avoid the download if the url matches
spark.kubernetes.jars.avoidDownloadSchemes
in the configuration.Why are the changes needed?
For K8s cluster mode driver,
SparkSubmit
will download all the jars in thespark.jars
to driver and then those jars' urls inspark.jars
will be replaced by the driver local paths. Later when driver starts theSparkContext
, it will copy all thespark.jars
tospark.app.initial.jar.urls
, start a file server and replace the jars with driver local paths inspark.app.initial.jar.urls
with file service urls. When the executors start, they will download those driver local jars byspark.app.initial.jar.urls
.When jars are big and the spark application requests a lot of executors, the executors' massive concurrent download of the jars from the driver will cause network saturation. In this case, the executors jar download will timeout, causing executors to be terminated. From user point of view, the application is trapped in the loop of massive executor loss and re-provision, but never gets enough live executors as requested, leads to SLA breach or sometimes failure.
So instead of letting driver to download the jars and then serve them to executors, if we just avoid driver from downloading the jars and keeping the urls in
spark.jars
as they were, the executor will try to directly download the jars from the urls provided by user. This will avoid the driver download bottleneck mentioned above, especially when jar urls are with scalable storage schemes, like s3 or hdfs.Meanwhile, there are cases jar urls are with schemes of less scalable than driver file server, e.g. http, ftp, etc, or when the jars are small, or executor count is small - user may still want to fall back to current solution and use driver file server to serve the jars.
So in this case, make the driver jars downloading and serving optional by scheme (similar idea to
FORCE_DOWNLOAD_SCHEMES
in YARN) is a good approach for the solution.Does this PR introduce any user-facing change?
A configuration
spark.kubernetes.jars.avoidDownloadSchemes
is addedHow was this patch tested?
Was this patch authored or co-authored using generative AI tooling?
No