diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala index c8cbedd9ea36d..c60fbe537cbdd 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala @@ -401,16 +401,23 @@ private[spark] class SparkSubmit extends Logging { // SPARK-33782 : This downloads all the files , jars , archiveFiles and pyfiles to current // working directory // SPARK-43540: add current working directory into driver classpath + // SPARK-47475: make download to driver optional so executors may fetch resource from remote + // url directly to avoid overwhelming driver network when resource is big and executor count + // is high val workingDirectory = "." childClasspath += workingDirectory - def downloadResourcesToCurrentDirectory(uris: String, isArchive: Boolean = false): - String = { + def downloadResourcesToCurrentDirectory( + uris: String, + isArchive: Boolean = false, + avoidDownload: String => Boolean = _ => false): String = { val resolvedUris = Utils.stringToSeq(uris).map(Utils.resolveURI) + val (avoidDownloads, toDownloads) = + resolvedUris.partition(uri => avoidDownload(uri.getScheme)) val localResources = downloadFileList( - resolvedUris.map( + toDownloads.map( Utils.getUriBuilder(_).fragment(null).build().toString).mkString(","), targetDir, sparkConf, hadoopConf) - Utils.stringToSeq(localResources).map(Utils.resolveURI).zip(resolvedUris).map { + (Utils.stringToSeq(localResources).map(Utils.resolveURI).zip(toDownloads).map { case (localResources, resolvedUri) => val source = new File(localResources.getPath).getCanonicalFile val dest = new File( @@ -427,14 +434,19 @@ private[spark] class SparkSubmit extends Logging { // Keep the URIs of local files with the given fragments. 
Utils.getUriBuilder( localResources).fragment(resolvedUri.getFragment).build().toString - }.mkString(",") + } ++ avoidDownloads.map(_.toString)).mkString(",") } + val avoidJarDownloadSchemes = sparkConf.get(KUBERNETES_JARS_AVOID_DOWNLOAD_SCHEMES) + + def avoidJarDownload(scheme: String): Boolean = + avoidJarDownloadSchemes.contains("*") || avoidJarDownloadSchemes.contains(scheme) + val filesLocalFiles = Option(args.files).map { downloadResourcesToCurrentDirectory(_) }.orNull - val jarsLocalJars = Option(args.jars).map { - downloadResourcesToCurrentDirectory(_) + val updatedJars = Option(args.jars).map { + downloadResourcesToCurrentDirectory(_, avoidDownload = avoidJarDownload) }.orNull val archiveLocalFiles = Option(args.archives).map { downloadResourcesToCurrentDirectory(_, true) @@ -445,7 +457,7 @@ private[spark] class SparkSubmit extends Logging { args.files = filesLocalFiles args.archives = archiveLocalFiles args.pyFiles = pyLocalFiles - args.jars = jarsLocalJars + args.jars = updatedJars } } diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala index 49f24dfbd826b..5a6c52481c645 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/package.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala @@ -1458,6 +1458,18 @@ package object config { .doubleConf .createWithDefault(1.5) + private[spark] val KUBERNETES_JARS_AVOID_DOWNLOAD_SCHEMES = + ConfigBuilder("spark.kubernetes.jars.avoidDownloadSchemes") + .doc("Comma-separated list of schemes for which jars will NOT be downloaded to the " + + "driver local disk prior to be distributed to executors, only for kubernetes deployment. " + + "For use in cases when the jars are big and executor counts are high, " + + "concurrent download causes network saturation and timeouts. 
" + + "Wildcard '*' is denoted to not downloading jars for any the schemes.") + .version("4.0.0") + .stringConf + .toSequence + .createWithDefault(Nil) + private[spark] val FORCE_DOWNLOAD_SCHEMES = ConfigBuilder("spark.yarn.dist.forceDownloadSchemes") .doc("Comma-separated list of schemes for which resources will be downloaded to the " + diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala index bec0a90d0f471..f55c00d7d61a5 100644 --- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala @@ -558,6 +558,48 @@ class SparkSubmitSuite Files.delete(Paths.get("TestUDTF.jar")) } + test("SPARK-47475: Avoid jars download if scheme matches " + + "spark.kubernetes.jars.avoidDownloadSchemes " + + "in k8s client mode & driver runs inside a POD") { + val hadoopConf = new Configuration() + updateConfWithFakeS3Fs(hadoopConf) + withTempDir { tmpDir => + val notToDownload = File.createTempFile("NotToDownload", ".jar", tmpDir) + val remoteJarFile = s"s3a://${notToDownload.getAbsolutePath}" + + val clArgs = Seq( + "--deploy-mode", "client", + "--proxy-user", "test.user", + "--master", "k8s://host:port", + "--class", "org.SomeClass", + "--conf", "spark.kubernetes.submitInDriver=true", + "--conf", "spark.kubernetes.jars.avoidDownloadSchemes=s3a", + "--conf", "spark.hadoop.fs.s3a.impl=org.apache.spark.deploy.TestFileSystem", + "--conf", "spark.hadoop.fs.s3a.impl.disable.cache=true", + "--files", "src/test/resources/test_metrics_config.properties", + "--py-files", "src/test/resources/test_metrics_system.properties", + "--archives", "src/test/resources/log4j2.properties", + "--jars", s"src/test/resources/TestUDTF.jar,$remoteJarFile", + "/home/jarToIgnore.jar", + "arg1") + val appArgs = new SparkSubmitArguments(clArgs) + val (_, _, conf, _) = submit.prepareSubmitEnvironment(appArgs, Some(hadoopConf)) + 
conf.get("spark.master") should be("k8s://https://host:port") + conf.get("spark.jars").contains(remoteJarFile) shouldBe true + conf.get("spark.jars").contains("TestUDTF") shouldBe true + + Files.exists(Paths.get("test_metrics_config.properties")) should be(true) + Files.exists(Paths.get("test_metrics_system.properties")) should be(true) + Files.exists(Paths.get("log4j2.properties")) should be(true) + Files.exists(Paths.get("TestUDTF.jar")) should be(true) + Files.exists(Paths.get(notToDownload.getName)) should be(false) + Files.delete(Paths.get("test_metrics_config.properties")) + Files.delete(Paths.get("test_metrics_system.properties")) + Files.delete(Paths.get("log4j2.properties")) + Files.delete(Paths.get("TestUDTF.jar")) + } + } + test("SPARK-43014: Set `spark.app.submitTime` if missing ") { val clArgs1 = Seq( "--deploy-mode", "client", diff --git a/docs/running-on-kubernetes.md b/docs/running-on-kubernetes.md index 01e9d6382c182..778af5f0751a8 100644 --- a/docs/running-on-kubernetes.md +++ b/docs/running-on-kubernetes.md @@ -684,6 +684,18 @@ See the [configuration page](configuration.html) for information on Spark config 2.3.0 + + spark.kubernetes.jars.avoidDownloadSchemes + (none) + + Comma-separated list of schemes for which jars will NOT be downloaded to the + driver local disk prior to being distributed to executors, only for Kubernetes deployment. + For use in cases where the jars are big and executor counts are high, so that + concurrent downloads cause network saturation and timeouts. + The wildcard '*' means that jars are not downloaded for any scheme. + + 4.0.0 + spark.kubernetes.authenticate.submission.caCertFile (none)