[SPARK-47475][CORE][K8S] Support `spark.kubernetes.jars.avoidDownloadSchemes` for K8s Cluster Mode

### What changes were proposed in this pull request?

During spark-submit, for the K8s cluster mode driver, instead of always downloading the jars and serving them to executors, skip the download when a jar URL's scheme matches `spark.kubernetes.jars.avoidDownloadSchemes` in the configuration.

### Why are the changes needed?

For a K8s cluster mode driver, `SparkSubmit` downloads all the jars in `spark.jars` to the driver, and the jar URLs in `spark.jars` are then replaced with driver-local paths. Later, when the driver starts the `SparkContext`, it copies `spark.jars` into `spark.app.initial.jar.urls`, starts a file server, and rewrites the driver-local paths in `spark.app.initial.jar.urls` as file-server URLs. When the executors start, they download those driver-local jars via `spark.app.initial.jar.urls`.
When the jars are big and the application requests many executors, the executors' massive concurrent downloads from the driver saturate its network. The jar downloads then time out, and the executors are terminated. From the user's point of view, the application is trapped in a loop of large-scale executor loss and re-provisioning, never reaching the requested number of live executors, which leads to SLA breaches or outright failure.
So instead of having the driver download the jars and then serve them to executors, if the driver simply skips the download and keeps the URLs in `spark.jars` as they were, each executor will download the jars directly from the user-provided URLs. This avoids the driver download bottleneck described above, especially when the jar URLs use scalable storage schemes such as S3 or HDFS.
Meanwhile, there are cases where the jar URLs use schemes less scalable than the driver file server (e.g. http, ftp), or where the jars are small or the executor count is low; in these cases the user may still want to fall back to the current behavior and let the driver file server serve the jars.
So making the driver-side jar download and serving optional per scheme (similar in spirit to `FORCE_DOWNLOAD_SCHEMES` on YARN) is a good approach for the solution.

### Does this PR introduce _any_ user-facing change?

A new configuration, `spark.kubernetes.jars.avoidDownloadSchemes`, is added.
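
As a rough illustration of how a user might opt in (the bucket and jar names below are made up), the setting can go into the application's `SparkConf`, or equivalently be passed with `--conf` on `spark-submit`:

```scala
import org.apache.spark.SparkConf

// Minimal sketch, assuming the application's dependency jars live on S3.
// Jars whose URL scheme is listed here are NOT downloaded to the driver;
// executors fetch them directly from the remote store instead.
val conf = new SparkConf()
  .set("spark.kubernetes.jars.avoidDownloadSchemes", "s3a")
  .set("spark.jars", "s3a://my-bucket/deps/big-dependency.jar") // hypothetical path
```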

### How was this patch tested?

- Unit tests added
- Tested with an application running on AWS EKS, submitted with a 1 GB jar on S3.
  - Before the fix, the application could not scale to 1k live executors.
  - After the fix, the application had no problem scaling beyond 12k live executors.

### Was this patch authored or co-authored using generative AI tooling?

No

Closes apache#45715 from leletan/allow_k8s_executor_to_download_remote_jar.

Authored-by: jiale_tan <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
jiale_tan authored and sweisdb committed Apr 1, 2024
1 parent 7976ec9 commit 28a514a
Showing 4 changed files with 86 additions and 8 deletions.
28 changes: 20 additions & 8 deletions core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -401,16 +401,23 @@ private[spark] class SparkSubmit extends Logging {
// SPARK-33782 : This downloads all the files , jars , archiveFiles and pyfiles to current
// working directory
// SPARK-43540: add current working directory into driver classpath
// SPARK-47475: make download to driver optional so executors may fetch resource from remote
// url directly to avoid overwhelming driver network when resource is big and executor count
// is high
val workingDirectory = "."
childClasspath += workingDirectory
def downloadResourcesToCurrentDirectory(uris: String, isArchive: Boolean = false):
String = {
def downloadResourcesToCurrentDirectory(
uris: String,
isArchive: Boolean = false,
avoidDownload: String => Boolean = _ => false): String = {
val resolvedUris = Utils.stringToSeq(uris).map(Utils.resolveURI)
val (avoidDownloads, toDownloads) =
resolvedUris.partition(uri => avoidDownload(uri.getScheme))
val localResources = downloadFileList(
resolvedUris.map(
toDownloads.map(
Utils.getUriBuilder(_).fragment(null).build().toString).mkString(","),
targetDir, sparkConf, hadoopConf)
Utils.stringToSeq(localResources).map(Utils.resolveURI).zip(resolvedUris).map {
(Utils.stringToSeq(localResources).map(Utils.resolveURI).zip(toDownloads).map {
case (localResources, resolvedUri) =>
val source = new File(localResources.getPath).getCanonicalFile
val dest = new File(
@@ -427,14 +434,19 @@
// Keep the URIs of local files with the given fragments.
Utils.getUriBuilder(
localResources).fragment(resolvedUri.getFragment).build().toString
}.mkString(",")
} ++ avoidDownloads.map(_.toString)).mkString(",")
}

val avoidJarDownloadSchemes = sparkConf.get(KUBERNETES_JARS_AVOID_DOWNLOAD_SCHEMES)

def avoidJarDownload(scheme: String): Boolean =
avoidJarDownloadSchemes.contains("*") || avoidJarDownloadSchemes.contains(scheme)

val filesLocalFiles = Option(args.files).map {
downloadResourcesToCurrentDirectory(_)
}.orNull
val jarsLocalJars = Option(args.jars).map {
downloadResourcesToCurrentDirectory(_)
val updatedJars = Option(args.jars).map {
downloadResourcesToCurrentDirectory(_, avoidDownload = avoidJarDownload)
}.orNull
val archiveLocalFiles = Option(args.archives).map {
downloadResourcesToCurrentDirectory(_, true)
@@ -445,7 +457,7 @@
args.files = filesLocalFiles
args.archives = archiveLocalFiles
args.pyFiles = pyLocalFiles
args.jars = jarsLocalJars
args.jars = updatedJars
}
}

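The fragment above mixes old and new lines, so here is a condensed, self-contained sketch of the control flow it adds (not the actual `SparkSubmit` code; the jar URLs and scheme list are made up): jars whose scheme matches the configured list are kept as remote URLs, while everything else would still be downloaded to the driver.

```scala
import java.net.URI

object AvoidDownloadSketch {
  // Stand-in for the value of spark.kubernetes.jars.avoidDownloadSchemes
  val avoidSchemes: Seq[String] = Seq("s3a")

  def avoidDownload(scheme: String): Boolean =
    avoidSchemes.contains("*") || avoidSchemes.contains(scheme)

  def main(args: Array[String]): Unit = {
    // Hypothetical spark.jars value with one big remote jar and one local jar.
    val jars = "s3a://my-bucket/big-deps.jar,file:/opt/app/small-util.jar"
    val resolved = jars.split(",").toSeq.map(new URI(_))
    // Matching schemes stay remote; the rest would go through the driver download path.
    val (keepRemote, toDownload) = resolved.partition(uri => avoidDownload(uri.getScheme))
    println(s"kept remote: ${keepRemote.mkString(", ")}") // s3a://my-bucket/big-deps.jar
    println(s"to download: ${toDownload.mkString(", ")}") // file:/opt/app/small-util.jar
  }
}
```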
12 changes: 12 additions & 0 deletions core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -1458,6 +1458,18 @@ package object config {
.doubleConf
.createWithDefault(1.5)

private[spark] val KUBERNETES_JARS_AVOID_DOWNLOAD_SCHEMES =
ConfigBuilder("spark.kubernetes.jars.avoidDownloadSchemes")
.doc("Comma-separated list of schemes for which jars will NOT be downloaded to the " +
"driver local disk prior to be distributed to executors, only for kubernetes deployment. " +
"For use in cases when the jars are big and executor counts are high, " +
"concurrent download causes network saturation and timeouts. " +
"Wildcard '*' is denoted to not downloading jars for any the schemes.")
.version("4.0.0")
.stringConf
.toSequence
.createWithDefault(Nil)

private[spark] val FORCE_DOWNLOAD_SCHEMES =
ConfigBuilder("spark.yarn.dist.forceDownloadSchemes")
.doc("Comma-separated list of schemes for which resources will be downloaded to the " +
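For reference, the new entry is parsed as a comma-separated sequence, so the scheme check reduces to a simple membership test. The snippet below (values made up) illustrates the documented semantics, including the wildcard, rather than Spark internals:

```scala
//   "s3a"       -> only s3a:// jars skip the driver download
//   "s3a,hdfs"  -> s3a:// and hdfs:// jars skip it
//   "*"         -> no jars are downloaded to the driver at all
val schemes: Seq[String] = "s3a,hdfs".split(",").toSeq.map(_.trim)

def skipsDriverDownload(scheme: String): Boolean =
  schemes.contains("*") || schemes.contains(scheme)

assert(skipsDriverDownload("hdfs"))   // listed scheme stays remote
assert(!skipsDriverDownload("https")) // unlisted scheme is still downloaded by the driver
```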
42 changes: 42 additions & 0 deletions core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
@@ -558,6 +558,48 @@ class SparkSubmitSuite
Files.delete(Paths.get("TestUDTF.jar"))
}

test("SPARK-47475: Avoid jars download if scheme matches " +
"spark.kubernetes.jars.avoidDownloadSchemes " +
"in k8s client mode & driver runs inside a POD") {
val hadoopConf = new Configuration()
updateConfWithFakeS3Fs(hadoopConf)
withTempDir { tmpDir =>
val notToDownload = File.createTempFile("NotToDownload", ".jar", tmpDir)
val remoteJarFile = s"s3a://${notToDownload.getAbsolutePath}"

val clArgs = Seq(
"--deploy-mode", "client",
"--proxy-user", "test.user",
"--master", "k8s://host:port",
"--class", "org.SomeClass",
"--conf", "spark.kubernetes.submitInDriver=true",
"--conf", "spark.kubernetes.jars.avoidDownloadSchemes=s3a",
"--conf", "spark.hadoop.fs.s3a.impl=org.apache.spark.deploy.TestFileSystem",
"--conf", "spark.hadoop.fs.s3a.impl.disable.cache=true",
"--files", "src/test/resources/test_metrics_config.properties",
"--py-files", "src/test/resources/test_metrics_system.properties",
"--archives", "src/test/resources/log4j2.properties",
"--jars", s"src/test/resources/TestUDTF.jar,$remoteJarFile",
"/home/jarToIgnore.jar",
"arg1")
val appArgs = new SparkSubmitArguments(clArgs)
val (_, _, conf, _) = submit.prepareSubmitEnvironment(appArgs, Some(hadoopConf))
conf.get("spark.master") should be("k8s://https://host:port")
conf.get("spark.jars").contains(remoteJarFile) shouldBe true
conf.get("spark.jars").contains("TestUDTF") shouldBe true

Files.exists(Paths.get("test_metrics_config.properties")) should be(true)
Files.exists(Paths.get("test_metrics_system.properties")) should be(true)
Files.exists(Paths.get("log4j2.properties")) should be(true)
Files.exists(Paths.get("TestUDTF.jar")) should be(true)
Files.exists(Paths.get(notToDownload.getName)) should be(false)
Files.delete(Paths.get("test_metrics_config.properties"))
Files.delete(Paths.get("test_metrics_system.properties"))
Files.delete(Paths.get("log4j2.properties"))
Files.delete(Paths.get("TestUDTF.jar"))
}
}

test("SPARK-43014: Set `spark.app.submitTime` if missing ") {
val clArgs1 = Seq(
"--deploy-mode", "client",
12 changes: 12 additions & 0 deletions docs/running-on-kubernetes.md
@@ -684,6 +684,18 @@ See the [configuration page](configuration.html) for information on Spark config
</td>
<td>2.3.0</td>
</tr>
<tr>
<td><code>spark.kubernetes.jars.avoidDownloadSchemes</code></td>
<td><code>(none)</code></td>
<td>
Comma-separated list of schemes for which jars will NOT be downloaded to the
driver local disk prior to being distributed to executors; only for Kubernetes deployments.
Intended for cases where the jars are big and executor counts are high, such that
concurrent downloads would cause network saturation and timeouts.
Wildcard '*' means jars are not downloaded for any scheme.
</td>
<td>4.0.0</td>
</tr>
<tr>
<td><code>spark.kubernetes.authenticate.submission.caCertFile</code></td>
<td>(none)</td>
