[SPARK-47475][CORE][K8S] Support spark.kubernetes.jars.avoidDownloadSchemes for K8s Cluster Mode #45715

Closed

Changes from 1 commit
feature(SparkSubmit): make jar download to driver optional
jiale_tan committed Mar 26, 2024

Verified: this commit was created on GitHub.com and signed with GitHub's verified signature (the key has since expired).
commit f99898f8e717492c31d206b083094efc3d302b70
27 changes: 20 additions & 7 deletions core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -401,16 +401,24 @@ private[spark] class SparkSubmit extends Logging {
      // SPARK-33782 : This downloads all the files , jars , archiveFiles and pyfiles to current
      // working directory
      // SPARK-43540: add current working directory into driver classpath
+     // SPARK-47475: make download to driver optional so executors may fetch resource from remote
+     // url directly to avoid overwhelming driver network when resource is big and executor count
+     // is high
      val workingDirectory = "."
      childClasspath += workingDirectory
-     def downloadResourcesToCurrentDirectory(uris: String, isArchive: Boolean = false):

Member: We don't need to add a new empty line here.

+     def downloadResourcesToCurrentDirectory(uris: String,
+         isArchive: Boolean = false,
Member: Could you take a look at the Apache Spark Coding Guideline at this time?

    For Scala code, Apache Spark follows the official Scala style guide and Databricks Scala guide. The latter is preferred. To format Scala code, run ./dev/scalafmt prior to submitting a PR.

Member: For this case, the following is the rule:

    For method declarations, use 4 space indentation for their parameters.

Member: Ya, scalafmt is only recommended for newly added code. Please don't touch the existing code.

Author (@leletan, Mar 26, 2024): Thanks for the tips @dongjoon-hyun! I tried running ./dev/scalafmt, but somehow a lot of existing code was refactored for style. Is this expected? Is there something I missed?

Author: Updated.

+         avoidDownload: String => Boolean = _ => false):
        String = {
        val resolvedUris = Utils.stringToSeq(uris).map(Utils.resolveURI)
+       val (avoidDownloads, toDownloads) =
+         resolvedUris.partition(uri => avoidDownload(uri.getScheme))
        val localResources = downloadFileList(
-         resolvedUris.map(
+         toDownloads.map(
            Utils.getUriBuilder(_).fragment(null).build().toString).mkString(","),
          targetDir, sparkConf, hadoopConf)
-       Utils.stringToSeq(localResources).map(Utils.resolveURI).zip(resolvedUris).map {
+       (Utils.stringToSeq(localResources).map(Utils.resolveURI).zip(toDownloads).map {
          case (localResources, resolvedUri) =>
            val source = new File(localResources.getPath).getCanonicalFile
            val dest = new File(
@@ -427,14 +435,19 @@ private[spark] class SparkSubmit extends Logging {
            // Keep the URIs of local files with the given fragments.
            Utils.getUriBuilder(
              localResources).fragment(resolvedUri.getFragment).build().toString
-         }.mkString(",")
+         } ++ avoidDownloads.map(_.toString)).mkString(",")
        }

+       val avoidJarDownloadSchemes = sparkConf.get(KUBERNETES_AVOID_JAR_DOWNLOAD_SCHEMES)
+
+       def avoidJarDownload(scheme: String): Boolean =
+         avoidJarDownloadSchemes.contains("*") || avoidJarDownloadSchemes.contains(scheme)
+
        val filesLocalFiles = Option(args.files).map {
          downloadResourcesToCurrentDirectory(_)
        }.orNull
-       val jarsLocalJars = Option(args.jars).map {
-         downloadResourcesToCurrentDirectory(_)
+       val updatedJars = Option(args.jars).map {
+         downloadResourcesToCurrentDirectory(_, avoidDownload = avoidJarDownload)
        }.orNull
        val archiveLocalFiles = Option(args.archives).map {
          downloadResourcesToCurrentDirectory(_, true)
@@ -445,7 +458,7 @@ private[spark] class SparkSubmit extends Logging {
        args.files = filesLocalFiles
        args.archives = archiveLocalFiles
        args.pyFiles = pyLocalFiles
-       args.jars = jarsLocalJars
+       args.jars = updatedJars
}
}
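The heart of the change is the partition step above: URIs whose scheme matches the predicate bypass downloadFileList and are appended back to the comma-separated result untouched. A minimal self-contained sketch of that flow, using plain java.net.URI and a faked download step (AvoidDownloadSketch and its helpers are illustrative, not Spark code):

    import java.io.File
    import java.net.URI

    object AvoidDownloadSketch {
      // Stand-in for the configured scheme list, e.g. Seq("s3a") or Seq("*").
      val avoidSchemes: Seq[String] = Seq("s3a")

      def avoidDownload(scheme: String): Boolean =
        avoidSchemes.contains("*") || avoidSchemes.contains(scheme)

      def main(args: Array[String]): Unit = {
        val uris = "s3a://bucket/a.jar,file:/tmp/b.jar".split(",").toSeq.map(URI.create(_))
        // Split by scheme: 'kept' stays remote; 'toDownload' goes through the
        // (here faked) driver-side download.
        val (kept, toDownload) = uris.partition(u => avoidDownload(u.getScheme))
        val downloaded = toDownload.map(u => s"file:/local/${new File(u.getPath).getName}")
        // Stitch the untouched remote URIs back into the result, mirroring
        // `++ avoidDownloads.map(_.toString)` in the patch.
        println((downloaded ++ kept.map(_.toString)).mkString(","))
        // prints: file:/local/b.jar,s3a://bucket/a.jar
      }
    }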

12 changes: 12 additions & 0 deletions core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -1458,6 +1458,18 @@ package object config {
        .doubleConf
        .createWithDefault(1.5)

+     private[spark] val KUBERNETES_AVOID_JAR_DOWNLOAD_SCHEMES =
+       ConfigBuilder("spark.kubernetes.jars.avoidDownloadSchemes")
Member: BTW, does the AS-IS PR title match this?

    Make Jars Download to Driver Optional under K8s Cluster Mode

.doc("Comma-separated list of schemes for which jars will NOT be downloaded to the " +
"driver local disk prior to be distributed to executors, only for kubernetes deployment. " +
"For use in cases when the jars are big and executor counts are high, " +
"concurrent download causes network saturation and timeouts. " +
"Wildcard '*' is denoted to not downloading jars for any the schemes.")
.version("4.0.0")
.stringConf
.toSequence
.createWithDefault(Nil)

private[spark] val FORCE_DOWNLOAD_SCHEMES =
ConfigBuilder("spark.yarn.dist.forceDownloadSchemes")
.doc("Comma-separated list of schemes for which resources will be downloaded to the " +
42 changes: 42 additions & 0 deletions core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
@@ -558,6 +558,48 @@ class SparkSubmitSuite
Files.delete(Paths.get("TestUDTF.jar"))
}

test("SPARK-47475: Avoid jars download if scheme matches " +
"spark.kubernetes.jars.avoidDownloadSchemes " +
"in k8s client mode & driver runs inside a POD") {
val hadoopConf = new Configuration()
updateConfWithFakeS3Fs(hadoopConf)
withTempDir { tmpDir =>
val notToDownload = File.createTempFile("NotToDownload", ".jar", tmpDir)
val remoteJarFile = s"s3a://${notToDownload.getAbsolutePath}"

val clArgs = Seq(
"--deploy-mode", "client",
"--proxy-user", "test.user",
"--master", "k8s://host:port",
"--class", "org.SomeClass",
"--conf", "spark.kubernetes.submitInDriver=true",
"--conf", "spark.kubernetes.jars.avoidDownloadSchemes=s3a",
"--conf", "spark.hadoop.fs.s3a.impl=org.apache.spark.deploy.TestFileSystem",
"--conf", "spark.hadoop.fs.s3a.impl.disable.cache=true",
"--files", "src/test/resources/test_metrics_config.properties",
"--py-files", "src/test/resources/test_metrics_system.properties",
"--archives", "src/test/resources/log4j2.properties",
"--jars", s"src/test/resources/TestUDTF.jar,$remoteJarFile",
"/home/jarToIgnore.jar",
"arg1")
val appArgs = new SparkSubmitArguments(clArgs)
val (_, _, conf, _) = submit.prepareSubmitEnvironment(appArgs, Some(hadoopConf))
conf.get("spark.master") should be("k8s://https://host:port")
conf.get("spark.jars").contains(remoteJarFile) shouldBe true
conf.get("spark.jars").contains("TestUDTF") shouldBe true

Files.exists(Paths.get("test_metrics_config.properties")) should be(true)
Files.exists(Paths.get("test_metrics_system.properties")) should be(true)
Files.exists(Paths.get("log4j2.properties")) should be(true)
Files.exists(Paths.get("TestUDTF.jar")) should be(true)
Files.exists(Paths.get(notToDownload.getName)) should be(false)
Files.delete(Paths.get("test_metrics_config.properties"))
Files.delete(Paths.get("test_metrics_system.properties"))
Files.delete(Paths.get("log4j2.properties"))
Files.delete(Paths.get("TestUDTF.jar"))
}
}
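The test pins down the single-scheme case; the wildcard branch of avoidJarDownload can be seen in isolation in this standalone sketch (not part of the suite):

    object WildcardSketch {
      // Same predicate shape as avoidJarDownload in the SparkSubmit change.
      def avoid(schemes: Seq[String])(scheme: String): Boolean =
        schemes.contains("*") || schemes.contains(scheme)

      def main(args: Array[String]): Unit = {
        assert(avoid(Seq("s3a"))("s3a"))   // listed scheme skips the download
        assert(!avoid(Seq("s3a"))("hdfs")) // unlisted scheme still downloads
        assert(avoid(Seq("*"))("hdfs"))    // '*' skips download for every scheme
      }
    }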

test("SPARK-43014: Set `spark.app.submitTime` if missing ") {
val clArgs1 = Seq(
"--deploy-mode", "client",