From 28a514ac4bb5b66939053958e927059140b8e1fc Mon Sep 17 00:00:00 2001 From: jiale_tan Date: Thu, 28 Mar 2024 08:52:27 -0700 Subject: [PATCH] [SPARK-47475][CORE][K8S] Support `spark.kubernetes.jars.avoidDownloadSchemes` for K8s Cluster Mode ### What changes were proposed in this pull request? During spark submit, for K8s cluster mode driver, instead of always downloading jars and serving it to executors, avoid the download if the url matches `spark.kubernetes.jars.avoidDownloadSchemes` in the configuration. ### Why are the changes needed? For K8s cluster mode driver, `SparkSubmit` will download all the jars in the `spark.jars` to driver and then those jars' urls in `spark.jars` will be replaced by the driver local paths. Later when driver starts the `SparkContext`, it will copy all the `spark.jars` to `spark.app.initial.jar.urls`, start a file server and replace the jars with driver local paths in `spark.app.initial.jar.urls` with file service urls. When the executors start, they will download those driver local jars by `spark.app.initial.jar.urls`. When jars are big and the spark application requests a lot of executors, the executors' massive concurrent download of the jars from the driver will cause network saturation. In this case, the executors jar download will timeout, causing executors to be terminated. From user point of view, the application is trapped in the loop of massive executor loss and re-provision, but never gets enough live executors as requested, leads to SLA breach or sometimes failure. So instead of letting driver to download the jars and then serve them to executors, if we just avoid driver from downloading the jars and keeping the urls in `spark.jars` as they were, the executor will try to directly download the jars from the urls provided by user. This will avoid the driver download bottleneck mentioned above, especially when jar urls are with scalable storage schemes, like s3 or hdfs. Meanwhile, there are cases jar urls are with schemes of less scalable than driver file server, e.g. http, ftp, etc, or when the jars are small, or executor count is small - user may still want to fall back to current solution and use driver file server to serve the jars. So in this case, make the driver jars downloading and serving optional by scheme (similar idea to `FORCE_DOWNLOAD_SCHEMES` in YARN) is a good approach for the solution. ### Does this PR introduce _any_ user-facing change? A configuration `spark.kubernetes.jars.avoidDownloadSchemes` is added ### How was this patch tested? - Unit tests added - Tested with an application running on AWS EKS submitted with a 1GB jar on s3. - Before the fix, the application could not scale to 1k live executors. - After the fix, the application had no problem to scale beyond 12k live executors. ### Was this patch authored or co-authored using generative AI tooling? No Closes #45715 from leletan/allow_k8s_executor_to_download_remote_jar. Authored-by: jiale_tan Signed-off-by: Dongjoon Hyun --- .../org/apache/spark/deploy/SparkSubmit.scala | 28 +++++++++---- .../spark/internal/config/package.scala | 12 ++++++ .../spark/deploy/SparkSubmitSuite.scala | 42 +++++++++++++++++++ docs/running-on-kubernetes.md | 12 ++++++ 4 files changed, 86 insertions(+), 8 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala index c8cbedd9ea36d..c60fbe537cbdd 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala @@ -401,16 +401,23 @@ private[spark] class SparkSubmit extends Logging { // SPARK-33782 : This downloads all the files , jars , archiveFiles and pyfiles to current // working directory // SPARK-43540: add current working directory into driver classpath + // SPARK-47475: make download to driver optional so executors may fetch resource from remote + // url directly to avoid overwhelming driver network when resource is big and executor count + // is high val workingDirectory = "." childClasspath += workingDirectory - def downloadResourcesToCurrentDirectory(uris: String, isArchive: Boolean = false): - String = { + def downloadResourcesToCurrentDirectory( + uris: String, + isArchive: Boolean = false, + avoidDownload: String => Boolean = _ => false): String = { val resolvedUris = Utils.stringToSeq(uris).map(Utils.resolveURI) + val (avoidDownloads, toDownloads) = + resolvedUris.partition(uri => avoidDownload(uri.getScheme)) val localResources = downloadFileList( - resolvedUris.map( + toDownloads.map( Utils.getUriBuilder(_).fragment(null).build().toString).mkString(","), targetDir, sparkConf, hadoopConf) - Utils.stringToSeq(localResources).map(Utils.resolveURI).zip(resolvedUris).map { + (Utils.stringToSeq(localResources).map(Utils.resolveURI).zip(toDownloads).map { case (localResources, resolvedUri) => val source = new File(localResources.getPath).getCanonicalFile val dest = new File( @@ -427,14 +434,19 @@ private[spark] class SparkSubmit extends Logging { // Keep the URIs of local files with the given fragments. Utils.getUriBuilder( localResources).fragment(resolvedUri.getFragment).build().toString - }.mkString(",") + } ++ avoidDownloads.map(_.toString)).mkString(",") } + val avoidJarDownloadSchemes = sparkConf.get(KUBERNETES_JARS_AVOID_DOWNLOAD_SCHEMES) + + def avoidJarDownload(scheme: String): Boolean = + avoidJarDownloadSchemes.contains("*") || avoidJarDownloadSchemes.contains(scheme) + val filesLocalFiles = Option(args.files).map { downloadResourcesToCurrentDirectory(_) }.orNull - val jarsLocalJars = Option(args.jars).map { - downloadResourcesToCurrentDirectory(_) + val updatedJars = Option(args.jars).map { + downloadResourcesToCurrentDirectory(_, avoidDownload = avoidJarDownload) }.orNull val archiveLocalFiles = Option(args.archives).map { downloadResourcesToCurrentDirectory(_, true) @@ -445,7 +457,7 @@ private[spark] class SparkSubmit extends Logging { args.files = filesLocalFiles args.archives = archiveLocalFiles args.pyFiles = pyLocalFiles - args.jars = jarsLocalJars + args.jars = updatedJars } } diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala index 49f24dfbd826b..5a6c52481c645 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/package.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala @@ -1458,6 +1458,18 @@ package object config { .doubleConf .createWithDefault(1.5) + private[spark] val KUBERNETES_JARS_AVOID_DOWNLOAD_SCHEMES = + ConfigBuilder("spark.kubernetes.jars.avoidDownloadSchemes") + .doc("Comma-separated list of schemes for which jars will NOT be downloaded to the " + + "driver local disk prior to be distributed to executors, only for kubernetes deployment. " + + "For use in cases when the jars are big and executor counts are high, " + + "concurrent download causes network saturation and timeouts. " + + "Wildcard '*' is denoted to not downloading jars for any the schemes.") + .version("4.0.0") + .stringConf + .toSequence + .createWithDefault(Nil) + private[spark] val FORCE_DOWNLOAD_SCHEMES = ConfigBuilder("spark.yarn.dist.forceDownloadSchemes") .doc("Comma-separated list of schemes for which resources will be downloaded to the " + diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala index bec0a90d0f471..f55c00d7d61a5 100644 --- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala @@ -558,6 +558,48 @@ class SparkSubmitSuite Files.delete(Paths.get("TestUDTF.jar")) } + test("SPARK-47475: Avoid jars download if scheme matches " + + "spark.kubernetes.jars.avoidDownloadSchemes " + + "in k8s client mode & driver runs inside a POD") { + val hadoopConf = new Configuration() + updateConfWithFakeS3Fs(hadoopConf) + withTempDir { tmpDir => + val notToDownload = File.createTempFile("NotToDownload", ".jar", tmpDir) + val remoteJarFile = s"s3a://${notToDownload.getAbsolutePath}" + + val clArgs = Seq( + "--deploy-mode", "client", + "--proxy-user", "test.user", + "--master", "k8s://host:port", + "--class", "org.SomeClass", + "--conf", "spark.kubernetes.submitInDriver=true", + "--conf", "spark.kubernetes.jars.avoidDownloadSchemes=s3a", + "--conf", "spark.hadoop.fs.s3a.impl=org.apache.spark.deploy.TestFileSystem", + "--conf", "spark.hadoop.fs.s3a.impl.disable.cache=true", + "--files", "src/test/resources/test_metrics_config.properties", + "--py-files", "src/test/resources/test_metrics_system.properties", + "--archives", "src/test/resources/log4j2.properties", + "--jars", s"src/test/resources/TestUDTF.jar,$remoteJarFile", + "/home/jarToIgnore.jar", + "arg1") + val appArgs = new SparkSubmitArguments(clArgs) + val (_, _, conf, _) = submit.prepareSubmitEnvironment(appArgs, Some(hadoopConf)) + conf.get("spark.master") should be("k8s://https://host:port") + conf.get("spark.jars").contains(remoteJarFile) shouldBe true + conf.get("spark.jars").contains("TestUDTF") shouldBe true + + Files.exists(Paths.get("test_metrics_config.properties")) should be(true) + Files.exists(Paths.get("test_metrics_system.properties")) should be(true) + Files.exists(Paths.get("log4j2.properties")) should be(true) + Files.exists(Paths.get("TestUDTF.jar")) should be(true) + Files.exists(Paths.get(notToDownload.getName)) should be(false) + Files.delete(Paths.get("test_metrics_config.properties")) + Files.delete(Paths.get("test_metrics_system.properties")) + Files.delete(Paths.get("log4j2.properties")) + Files.delete(Paths.get("TestUDTF.jar")) + } + } + test("SPARK-43014: Set `spark.app.submitTime` if missing ") { val clArgs1 = Seq( "--deploy-mode", "client", diff --git a/docs/running-on-kubernetes.md b/docs/running-on-kubernetes.md index 01e9d6382c182..778af5f0751a8 100644 --- a/docs/running-on-kubernetes.md +++ b/docs/running-on-kubernetes.md @@ -684,6 +684,18 @@ See the [configuration page](configuration.html) for information on Spark config 2.3.0 + + spark.kubernetes.jars.avoidDownloadSchemes + (none) + + Comma-separated list of schemes for which jars will NOT be downloaded to the + driver local disk prior to be distributed to executors, only for kubernetes deployment. + For use in cases when the jars are big and executor counts are high, + concurrent download causes network saturation and timeouts. + Wildcard '*' is denoted to not downloading jars for any the schemes. + + 4.0.0 + spark.kubernetes.authenticate.submission.caCertFile (none)