diff --git a/plugins/jobtype/build.xml b/plugins/jobtype/build.xml
index 1d474432..12957a3f 100644
--- a/plugins/jobtype/build.xml
+++ b/plugins/jobtype/build.xml
@@ -95,7 +95,7 @@
-
+
@@ -106,6 +106,7 @@
+
diff --git a/plugins/jobtype/src/azkaban/jobtype/HadoopConfigurationInjector.java b/plugins/jobtype/src/azkaban/jobtype/HadoopConfigurationInjector.java
index 37460f08..154d781d 100644
--- a/plugins/jobtype/src/azkaban/jobtype/HadoopConfigurationInjector.java
+++ b/plugins/jobtype/src/azkaban/jobtype/HadoopConfigurationInjector.java
@@ -128,7 +128,8 @@ private static void addHadoopProperties(Props props) {
         CommonJobProperties.OUT_NODES,
         CommonJobProperties.IN_NODES,
         CommonJobProperties.PROJECT_LAST_CHANGED_DATE,
-        CommonJobProperties.PROJECT_LAST_CHANGED_BY
+        CommonJobProperties.PROJECT_LAST_CHANGED_BY,
+        CommonJobProperties.SUBMIT_USER
     };
 
     for(String propertyName : propsToInject) {
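Note: with CommonJobProperties.SUBMIT_USER added to propsToInject, the submitting user becomes visible to the launched Hadoop job. A minimal consumer sketch, assuming the constant resolves to "azkaban.flow.submituser" (the exact key value is an assumption, not stated in this patch):

import org.apache.hadoop.conf.Configuration;

public class SubmitUserExample {
  public static void main(String[] args) {
    // HadoopConfigurationInjector writes the injected Azkaban properties into
    // the job's Hadoop configuration, so task-side code can look them up.
    Configuration conf = new Configuration();
    // "azkaban.flow.submituser" is an assumed value for
    // CommonJobProperties.SUBMIT_USER; check the constant in your Azkaban version.
    String submitUser = conf.get("azkaban.flow.submituser", "unknown");
    System.out.println("Flow submitted by: " + submitUser);
  }
}
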
diff --git a/plugins/jobtype/src/azkaban/jobtype/HadoopSparkJob.java b/plugins/jobtype/src/azkaban/jobtype/HadoopSparkJob.java
index 4a052a7f..8d3e1e89 100644
--- a/plugins/jobtype/src/azkaban/jobtype/HadoopSparkJob.java
+++ b/plugins/jobtype/src/azkaban/jobtype/HadoopSparkJob.java
@@ -281,10 +281,11 @@ private static void sparkFlagPrefixHelper(Props jobProps, List<String> argList)
 
   private static void sparkJarsHelper(Props jobProps, String workingDir,
       Logger log, List<String> argList) {
+    String propSparkJars =
+        jobProps.getString(SparkJobArg.SPARK_JARS.azPropName, "");
     String jarList =
-        HadoopJobUtils.resolveWildCardForJarSpec(workingDir, jobProps
-            .getString(SparkJobArg.SPARK_JARS.azPropName,
-                SparkJobArg.SPARK_JARS.defaultValue), log);
+        HadoopJobUtils
+            .resolveWildCardForJarSpec(workingDir, propSparkJars, log);
     if (jarList.length() > 0) {
       argList.add(SparkJobArg.SPARK_JARS.sparkParamName);
       argList.add(jarList);
@@ -306,14 +307,6 @@ private static void handleStandardArgument(Props jobProps,
     if (jobProps.containsKey(sparkJobArg.azPropName)) {
       argList.add(sparkJobArg.sparkParamName);
       argList.add(jobProps.getString(sparkJobArg.azPropName));
-    } else {
-      String defaultValue = sparkJobArg.defaultValue;
-      if (defaultValue.length() == 0) {
-        // do nothing
-      } else {
-        argList.add(sparkJobArg.sparkParamName);
-        argList.add(sparkJobArg.defaultValue);
-      }
     }
   }
 
@@ -388,7 +381,7 @@ public void cancel() throws InterruptedException {
     info("Cancel called.  Killing the Spark job on the cluster");
 
-    String azExecId = jobProps.getString("azkaban.flow.execid");
+    String azExecId = jobProps.getString(CommonJobProperties.EXEC_ID);
     final String logFilePath =
         String.format("%s/_job.%s.%s.log", getWorkingDirectory(), azExecId,
             getId());
 
diff --git a/plugins/jobtype/src/azkaban/jobtype/SparkJobArg.java b/plugins/jobtype/src/azkaban/jobtype/SparkJobArg.java
index b9d29947..db629115 100644
--- a/plugins/jobtype/src/azkaban/jobtype/SparkJobArg.java
+++ b/plugins/jobtype/src/azkaban/jobtype/SparkJobArg.java
@@ -3,61 +3,54 @@ public enum SparkJobArg {
 
   // standard spark submit arguments, ordered in the spark-submit --help order
-  MASTER("master", "yarn-cluster", false), // just to trick the eclipse formatter
+  MASTER("master", false), // just to trick the eclipse formatter
   DEPLOY_MODE("deploy-mode", false), //
   CLASS("class", false), //
   NAME("name", false), //
-  SPARK_JARS("jars", "./lib/*",true), //
+  SPARK_JARS("jars", true), //
   PACKAGES("packages", false), //
   REPOSITORIES("repositories", false), //
   PY_FILES("py-files", false), //
   FILES("files", false), //
-  SPARK_CONF_PREFIX("conf.", "--conf", "",true), //
+  SPARK_CONF_PREFIX("conf.", "--conf", true), //
   PROPERTIES_FILE("properties-file", false), //
-  DRIVER_MEMORY("driver-memory", "512M", false), //
+  DRIVER_MEMORY("driver-memory", false), //
   DRIVER_JAVA_OPTIONS("driver-java-options", true), //
   DRIVER_LIBRARY_PATH("driver-library-path", false), //
   DRIVER_CLASS_PATH("driver-class-path", false), //
-  EXECUTOR_MEMORY("executor-memory", "1g", false), //
+  EXECUTOR_MEMORY("executor-memory", false), //
   PROXY_USER("proxy-user", false), //
-  SPARK_FLAG_PREFIX("flag.", "--", "",true), // --help, --verbose, --supervise, --version
+  SPARK_FLAG_PREFIX("flag.", "--", true), // --help, --verbose, --supervise, --version
 
   // Yarn only Arguments
-  EXECUTOR_CORES("executor-cores", "1", false), //
-  DRIVER_CORES("driver-cores", "1", false), //
-  QUEUE("queue", "marathon", false), //
-  NUM_EXECUTORS("num-executors", "2", false), //
+  EXECUTOR_CORES("executor-cores", false), //
+  DRIVER_CORES("driver-cores", false), //
+  QUEUE("queue", false), //
+  NUM_EXECUTORS("num-executors", false), //
   ARCHIVES("archives", false), //
   PRINCIPAL("principal", false), //
   KEYTAB("keytab", false), //
 
   // Not SparkSubmit arguments: only exists in azkaban
-  EXECUTION_JAR("execution-jar", null, null,true), //
-  PARAMS("params", null, null,true), //
+  EXECUTION_JAR("execution-jar", null, true), //
+  PARAMS("params", null, true), //
   ;
 
   public static final String delimiter = "\u001A";
 
   SparkJobArg(String propName, boolean specialTreatment) {
-    this(propName, "--" + propName, "",specialTreatment);
+    this(propName, "--" + propName, specialTreatment);
   }
 
-  SparkJobArg(String propName, String defaultValue, boolean specialTreatment) {
-    this(propName, "--" + propName, defaultValue,specialTreatment);
-  }
-
-  SparkJobArg(String azPropName, String sparkParamName, String defaultValue, boolean specialTreatment) {
+  SparkJobArg(String azPropName, String sparkParamName, boolean specialTreatment) {
     this.azPropName = azPropName;
-    this.sparkParamName = sparkParamName;
-    this.defaultValue = defaultValue;
+    this.sparkParamName = sparkParamName;
     this.needSpecialTreatment = specialTreatment;
   }
 
   final String azPropName;
 
-  final String sparkParamName;
-
-  final String defaultValue;
+  final String sparkParamName;
 
   final boolean needSpecialTreatment;
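Note: together with the handleStandardArgument change above, dropping the hard-coded defaults means spark-submit now receives only the arguments a job explicitly sets. A sketch of what a flow must now supply if it relied on the old built-in values (the values mirror the deleted defaults and are illustrative only; the hypothetical helper class is not part of this patch):

import azkaban.utils.Props;

public class ExplicitSparkDefaults {
  // Recreates the defaults that were deleted from SparkJobArg.
  public static Props withLegacyDefaults() {
    Props jobProps = new Props();
    jobProps.put("master", "yarn-cluster");   // was MASTER's default
    jobProps.put("queue", "marathon");        // was QUEUE's default
    jobProps.put("driver-memory", "512M");    // was DRIVER_MEMORY's default
    jobProps.put("executor-memory", "1g");    // was EXECUTOR_MEMORY's default
    jobProps.put("num-executors", "2");       // was NUM_EXECUTORS' default
    jobProps.put("executor-cores", "1");      // was EXECUTOR_CORES' default
    jobProps.put("driver-cores", "1");        // was DRIVER_CORES' default
    jobProps.put("jars", "./lib/*");          // was SPARK_JARS' default
    return jobProps;
  }
}
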
diff --git a/plugins/jobtype/test/azkaban/jobtype/TestHadoopJobUtilsExecutionJar.java b/plugins/jobtype/test/azkaban/jobtype/TestHadoopJobUtilsExecutionJar.java
index 451df99b..365da145 100644
--- a/plugins/jobtype/test/azkaban/jobtype/TestHadoopJobUtilsExecutionJar.java
+++ b/plugins/jobtype/test/azkaban/jobtype/TestHadoopJobUtilsExecutionJar.java
@@ -63,7 +63,6 @@ public void testLibFolderHasNothingInIt() throws IOException {
 
   @Test
   public void testOneLibFolderExpansion() throws IOException {
-
     String retval = HadoopJobUtils.resolveWildCardForJarSpec(workingDirString, "./lib/*", logger);
 
     Assert.assertEquals(
@@ -72,7 +71,7 @@ public void testOneLibFolderExpansion() throws IOException {
   }
 
   @Test
-  public void testTwoLibFolderExpansion() throws IOException {
+  public void testTwoLibFolderExpansionAllFilesResolved() throws IOException {
     File lib2FolderFile = new File(workingDirFile, "lib2");
     lib2FolderFile.mkdirs();
     File lib2test1Jar = new File(lib2FolderFile, "test1.jar");
@@ -82,8 +81,27 @@ public void testTwoLibFolderExpansion() throws IOException {
     String retval = HadoopJobUtils.resolveWildCardForJarSpec(workingDirString, "./lib/*,./lib2/*",
         logger);
 
-    Assert.assertEquals(
-        retval,
-        "/tmp/TestHadoopSpark/./lib/library.jar,/tmp/TestHadoopSpark/./lib/hadoop-spark-job-test-execution-x.y.z-a.b.c.jar,/tmp/TestHadoopSpark/./lib2/test1.jar,/tmp/TestHadoopSpark/./lib2/test2.jar");
+    Assert.assertTrue(retval.contains("/tmp/TestHadoopSpark/./lib/library.jar"));
+    Assert.assertTrue(retval.contains("/tmp/TestHadoopSpark/./lib/hadoop-spark-job-test-execution-x.y.z-a.b.c.jar"));
+    Assert.assertTrue(retval.contains("/tmp/TestHadoopSpark/./lib2/test1.jar"));
+    Assert.assertTrue(retval.contains("/tmp/TestHadoopSpark/./lib2/test2.jar"));
+  }
+
+  @Test
+  public void testTwoLibFolderExpansionExpandsInOrder() throws IOException {
+
+    executionJarFile.delete();
+
+    File lib2FolderFile = new File(workingDirFile, "lib2");
+    lib2FolderFile.mkdirs();
+    File lib2test1Jar = new File(lib2FolderFile, "test1.jar");
+    lib2test1Jar.createNewFile();
+
+    String retval = HadoopJobUtils.resolveWildCardForJarSpec(workingDirString, "./lib/*,./lib2/*",
+        logger);
+
+    Assert.assertEquals(
+        retval,
+        "/tmp/TestHadoopSpark/./lib/library.jar,/tmp/TestHadoopSpark/./lib2/test1.jar");
   }
 }
\ No newline at end of file
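Note: the renamed and added tests pin down the contract of HadoopJobUtils.resolveWildCardForJarSpec: the comma-separated specs expand in the order given, but file order within a single folder is not guaranteed (File.listFiles order is filesystem-dependent), hence the containment-only assertions. A simplified sketch of that contract, not the actual HadoopJobUtils implementation:

import java.io.File;
import java.util.ArrayList;
import java.util.List;

public class WildcardResolutionSketch {
  // Expands each "dir/*" spec in order; jars within one directory come back
  // in File.listFiles() order, which is filesystem-dependent.
  static String resolve(String workingDir, String jarSpec) {
    List<String> resolved = new ArrayList<String>();
    for (String spec : jarSpec.split(",")) {
      if (spec.endsWith("/*")) {
        String folder = spec.substring(0, spec.length() - 1); // keep trailing '/'
        File[] files = new File(workingDir, folder).listFiles();
        if (files != null) {
          for (File f : files) {
            if (f.getName().endsWith(".jar")) {
              resolved.add(workingDir + "/" + folder + f.getName());
            }
          }
        }
      } else {
        resolved.add(workingDir + "/" + spec);
      }
    }
    return String.join(",", resolved);
  }
}
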
diff --git a/plugins/jobtype/test/azkaban/jobtype/TestHadoopSparkJobGetMainArguments.java b/plugins/jobtype/test/azkaban/jobtype/TestHadoopSparkJobGetMainArguments.java
index 554e7a58..171b0cd4 100644
--- a/plugins/jobtype/test/azkaban/jobtype/TestHadoopSparkJobGetMainArguments.java
+++ b/plugins/jobtype/test/azkaban/jobtype/TestHadoopSparkJobGetMainArguments.java
@@ -55,32 +55,12 @@ public void testDefault() {
     String retval = HadoopSparkJob.testableGetMainArguments(jobProps, workingDirString, logger);
 
     // the first one, so no delimiter at front
-    Assert.assertTrue(retval.contains(SparkJobArg.DRIVER_JAVA_OPTIONS.sparkParamName + delim));
-    Assert.assertTrue(retval
-        .contains(delim
-            + "-Dazkaban.link.workflow.url=http://azkaban.link.workflow.url -Dazkaban.link.job.url=http://azkaban.link.job.url -Dazkaban.link.execution.url=http://azkaban.link.execution.url -Dazkaban.link.attempt.url=http://azkaban.link.attempt.url"
-            + delim));
-    Assert.assertTrue(retval.contains(delim + SparkJobArg.MASTER.sparkParamName + delim
-        + "yarn-cluster" + delim));
-    Assert.assertTrue(retval
-        .contains(delim
-            + SparkJobArg.SPARK_JARS.sparkParamName
-            + delim
-            + "/tmp/TestHadoopSpark/./lib/library.jar,/tmp/TestHadoopSpark/./lib/hadoop-spark-job-test-execution-x.y.z-a.b.c.jar"
-            + delim));
+    // due to the new communication mechanism between HadoopSparkJob and HadoopSparkSecureWrapper,
+    // these Azkaban variables are sent through the configuration file and not through the command line
+    Assert.assertTrue(retval.contains(SparkJobArg.DRIVER_JAVA_OPTIONS.sparkParamName + delim + "" + delim));
     Assert.assertTrue(retval.contains(delim + SparkJobArg.CLASS.sparkParamName + delim
         + "hadoop.spark.job.test.ExecutionClass" + delim));
-    Assert.assertTrue(retval.contains(delim + SparkJobArg.NUM_EXECUTORS.sparkParamName + delim
-        + "2" + delim));
-    Assert.assertTrue(retval.contains(delim + SparkJobArg.EXECUTOR_CORES.sparkParamName + delim
-        + "1" + delim));
-    Assert.assertTrue(retval.contains(delim + SparkJobArg.QUEUE.sparkParamName + delim + "marathon"
-        + delim));
-    Assert.assertTrue(retval.contains(delim + SparkJobArg.DRIVER_MEMORY.sparkParamName + delim
-        + "512M" + delim));
-    Assert.assertTrue(retval.contains(delim + SparkJobArg.EXECUTOR_MEMORY.sparkParamName + delim
-        + "1g" + delim));
-    // last one, no delimiter at back
+    // last one, no delimiter at back
     Assert.assertTrue(retval
         .contains(delim
             + "/tmp/TestHadoopSpark/./lib/hadoop-spark-job-test-execution-x.y.z-a.b.c.jar"));
@@ -272,9 +252,7 @@ public void testDriverJavaOptions() {
     String retval = HadoopSparkJob.testableGetMainArguments(jobProps, workingDirString, logger);
 
     // only on the ending side has the delimiter
-    Assert.assertTrue(retval
-        .contains(" -Dazkaban.link.attempt.url=http://azkaban.link.attempt.url -Dabc=def -Dfgh=ijk"
-            + delim));
+    Assert.assertTrue(retval.contains(" -Dabc=def -Dfgh=ijk" + delim));
   }
 
   @Test
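Note: the pruned testDefault assertions follow from the new wrapper communication path: the azkaban.link.* values are no longer smuggled through --driver-java-options (which is why DRIVER_JAVA_OPTIONS is now empty by default) but travel in the configuration file written by HadoopConfigurationInjector. A hedged sketch of a wrapper-side lookup (the keys come from the deleted assertions; the exact lookup site is an assumption):

import org.apache.hadoop.conf.Configuration;

public class AzkabanLinkProperties {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Keys taken from the assertions deleted above; whether
    // HadoopSparkSecureWrapper reads them exactly like this is an assumption.
    String workflowUrl = conf.get("azkaban.link.workflow.url");
    String jobUrl = conf.get("azkaban.link.job.url");
    String executionUrl = conf.get("azkaban.link.execution.url");
    String attemptUrl = conf.get("azkaban.link.attempt.url");
    System.out.println(workflowUrl + " " + jobUrl + " "
        + executionUrl + " " + attemptUrl);
  }
}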