spark updates to 1.5 and also removing setting of default #204

Closed · wants to merge 20 commits
3 changes: 2 additions & 1 deletion plugins/jobtype/build.xml
@@ -95,7 +95,7 @@
   </target>

   <target name="junit" depends="build" description="running test cases">
-    <junit printsummary="yes">
+    <junit haltonfailure="no" failureproperty="test.failed" printsummary="yes">
       <classpath>
         <path refid="main.classpath" />
         <pathelement path="${dist.classes.dir}" />
@@ -106,6 +106,7 @@
         <fileset dir="${dist.classes.test.dir}" includes="**/*Test*.class" />
       </batchtest>
     </junit>
+    <fail message="Test failure detected, check test results." if="test.failed" />
   </target>

   <target name="jars" depends="junit" description="Compile main source tree java files">
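Review note: with haltonfailure="no" plus failureproperty, the whole suite runs to completion and the build only fails afterwards via the trailing <fail> task, so one broken test no longer hides the results of the rest. A minimal Java sketch of the same run-everything-then-fail pattern, using the JUnit 4 runner (class and variable names here are illustrative, not from this PR):

```java
// Illustrative sketch only: mirrors what the Ant wiring above achieves.
import org.junit.runner.JUnitCore;
import org.junit.runner.Result;

public class RunAllThenFail {
  public static void main(String[] args) throws ClassNotFoundException {
    boolean testFailed = false; // plays the role of Ant's ${test.failed} property
    for (String testClassName : args) {
      Result result = JUnitCore.runClasses(Class.forName(testClassName));
      if (!result.wasSuccessful()) {
        testFailed = true; // record the failure but keep running the remaining suites
      }
    }
    if (testFailed) { // equivalent of <fail ... if="test.failed" />
      throw new AssertionError("Test failure detected, check test results.");
    }
  }
}
```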
@@ -128,7 +128,8 @@ private static void addHadoopProperties(Props props) {
         CommonJobProperties.OUT_NODES,
         CommonJobProperties.IN_NODES,
         CommonJobProperties.PROJECT_LAST_CHANGED_DATE,
-        CommonJobProperties.PROJECT_LAST_CHANGED_BY
+        CommonJobProperties.PROJECT_LAST_CHANGED_BY,
+        CommonJobProperties.SUBMIT_USER
     };

     for(String propertyName : propsToInject) {
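This hunk adds CommonJobProperties.SUBMIT_USER to the list of flow properties injected into the job's Hadoop configuration, so code running on the cluster can read back who submitted the flow. A hedged sketch of a consumer; the exact configuration key the injector writes is not visible in this diff, so the "azkaban.flow.submituser" key below is purely an assumption for illustration:

```java
// Hypothetical consumer of the newly injected property. The key name is an
// assumption; Azkaban prefixes injected flow properties, but the prefix is
// not shown in this diff view.
import org.apache.hadoop.conf.Configuration;

public final class SubmitUserLookup {
  private SubmitUserLookup() {}

  public static String submitUser(Configuration conf) {
    // Fall back to "unknown" when the property was not injected.
    return conf.get("azkaban.flow.submituser", "unknown");
  }
}
```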
17 changes: 5 additions & 12 deletions plugins/jobtype/src/azkaban/jobtype/HadoopSparkJob.java
@@ -281,10 +281,11 @@ private static void sparkFlagPrefixHelper(Props jobProps, List<String> argList)

   private static void sparkJarsHelper(Props jobProps, String workingDir,
       Logger log, List<String> argList) {
+    String propSparkJars =
+        jobProps.getString(SparkJobArg.SPARK_JARS.azPropName, "");
     String jarList =
-        HadoopJobUtils.resolveWildCardForJarSpec(workingDir, jobProps
-            .getString(SparkJobArg.SPARK_JARS.azPropName,
-                SparkJobArg.SPARK_JARS.defaultValue), log);
+        HadoopJobUtils
+            .resolveWildCardForJarSpec(workingDir, propSparkJars, log);
     if (jarList.length() > 0) {
       argList.add(SparkJobArg.SPARK_JARS.sparkParamName);
       argList.add(jarList);
@@ -306,14 +307,6 @@ private static void handleStandardArgument(Props jobProps,
     if (jobProps.containsKey(sparkJobArg.azPropName)) {
       argList.add(sparkJobArg.sparkParamName);
       argList.add(jobProps.getString(sparkJobArg.azPropName));
-    } else {
-      String defaultValue = sparkJobArg.defaultValue;
-      if (defaultValue.length() == 0) {
-        // do nothing
-      } else {
-        argList.add(sparkJobArg.sparkParamName);
-        argList.add(sparkJobArg.defaultValue);
-      }
     }
   }

@@ -388,7 +381,7 @@ public void cancel() throws InterruptedException {

     info("Cancel called. Killing the Spark job on the cluster");

-    String azExecId = jobProps.getString("azkaban.flow.execid");
+    String azExecId = jobProps.getString(CommonJobProperties.EXEC_ID);
     final String logFilePath =
         String.format("%s/_job.%s.%s.log", getWorkingDirectory(), azExecId,
             getId());
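Taken together, these hunks change the contract of handleStandardArgument(): a spark-submit flag is now emitted only when the job file explicitly sets the corresponding property, and otherwise spark-submit and the cluster configuration supply the defaults. A condensed, self-contained sketch of the behavior change (Props and SparkJobArg are stood in by a plain Map and a tiny holder class so the example compiles on its own):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class ArgumentForwardingSketch {
  static final class Arg {
    final String azPropName;
    final String sparkParamName;
    Arg(String azPropName, String sparkParamName) {
      this.azPropName = azPropName;
      this.sparkParamName = sparkParamName;
    }
  }

  // After this PR: forward a flag only when the job explicitly sets it.
  static void handleStandardArgument(Map<String, String> jobProps,
      List<String> argList, Arg arg) {
    if (jobProps.containsKey(arg.azPropName)) {
      argList.add(arg.sparkParamName);
      argList.add(jobProps.get(arg.azPropName));
    }
    // Before this PR there was an else branch here that appended a hard-coded
    // defaultValue such as "yarn-cluster" for --master or "marathon" for --queue.
  }

  public static void main(String[] args) {
    List<String> argList = new ArrayList<>();
    handleStandardArgument(Map.of("class", "my.Main"), argList, new Arg("class", "--class"));
    handleStandardArgument(Map.of(), argList, new Arg("queue", "--queue")); // now a no-op
    System.out.println(argList); // [--class, my.Main]
  }
}
```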
39 changes: 16 additions & 23 deletions plugins/jobtype/src/azkaban/jobtype/SparkJobArg.java
@@ -3,61 +3,54 @@
 public enum SparkJobArg {

   // standard spark submit arguments, ordered in the spark-submit --help order
-  MASTER("master", "yarn-cluster", false), // just to trick the eclipse formatter
+  MASTER("master", false), // just to trick the eclipse formatter
   DEPLOY_MODE("deploy-mode", false), //
   CLASS("class", false), //
   NAME("name", false), //
-  SPARK_JARS("jars", "./lib/*",true), //
+  SPARK_JARS("jars", true), //
   PACKAGES("packages", false), //
   REPOSITORIES("repositories", false), //
   PY_FILES("py-files", false), //
   FILES("files", false), //
-  SPARK_CONF_PREFIX("conf.", "--conf", "",true), //
+  SPARK_CONF_PREFIX("conf.", "--conf", true), //
   PROPERTIES_FILE("properties-file", false), //
-  DRIVER_MEMORY("driver-memory", "512M", false), //
+  DRIVER_MEMORY("driver-memory", false), //
   DRIVER_JAVA_OPTIONS("driver-java-options", true), //
   DRIVER_LIBRARY_PATH("driver-library-path", false), //
   DRIVER_CLASS_PATH("driver-class-path", false), //
-  EXECUTOR_MEMORY("executor-memory", "1g", false), //
+  EXECUTOR_MEMORY("executor-memory", false), //
   PROXY_USER("proxy-user", false), //
-  SPARK_FLAG_PREFIX("flag.", "--", "",true), // --help, --verbose, --supervise, --version
+  SPARK_FLAG_PREFIX("flag.", "--", true), // --help, --verbose, --supervise, --version

   // Yarn only Arguments
-  EXECUTOR_CORES("executor-cores", "1", false), //
-  DRIVER_CORES("driver-cores", "1", false), //
-  QUEUE("queue", "marathon", false), //
-  NUM_EXECUTORS("num-executors", "2", false), //
+  EXECUTOR_CORES("executor-cores", false), //
+  DRIVER_CORES("driver-cores", false), //
+  QUEUE("queue", false), //
+  NUM_EXECUTORS("num-executors", false), //
   ARCHIVES("archives", false), //
   PRINCIPAL("principal", false), //
   KEYTAB("keytab", false), //

   // Not SparkSubmit arguments: only exists in azkaban
-  EXECUTION_JAR("execution-jar", null, null,true), //
-  PARAMS("params", null, null,true), //
+  EXECUTION_JAR("execution-jar", null, true), //
+  PARAMS("params", null, true), //
   ;

   public static final String delimiter = "\u001A";

   SparkJobArg(String propName, boolean specialTreatment) {
-    this(propName, "--" + propName, "",specialTreatment);
+    this(propName, "--" + propName, specialTreatment);
   }

-  SparkJobArg(String propName, String defaultValue, boolean specialTreatment) {
-    this(propName, "--" + propName, defaultValue,specialTreatment);
-  }
-
-  SparkJobArg(String azPropName, String sparkParamName, String defaultValue, boolean specialTreatment) {
+  SparkJobArg(String azPropName, String sparkParamName, boolean specialTreatment) {
     this.azPropName = azPropName;
-    this.sparkParamName = sparkParamName;
-    this.defaultValue = defaultValue;
+    this.sparkParamName = sparkParamName;
     this.needSpecialTreatment = specialTreatment;
   }

   final String azPropName;

-  final String sparkParamName;
-
-  final String defaultValue;
+  final String sparkParamName;

   final boolean needSpecialTreatment;

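With the defaultValue field gone, the enum is down to two telescoping constructors. A condensed, compilable sketch of how they chain (the constant set is trimmed to two entries for brevity):

```java
// Condensed sketch of the post-PR constructor chain in SparkJobArg.
public enum SparkJobArgSketch {
  MASTER("master", false),                    // emitted as --master, only when the job sets it
  SPARK_CONF_PREFIX("conf.", "--conf", true); // prefix-style args need special handling

  SparkJobArgSketch(String propName, boolean specialTreatment) {
    this(propName, "--" + propName, specialTreatment); // derive the spark-submit flag
  }

  SparkJobArgSketch(String azPropName, String sparkParamName, boolean specialTreatment) {
    this.azPropName = azPropName;
    this.sparkParamName = sparkParamName;
    this.needSpecialTreatment = specialTreatment;
  }

  final String azPropName;
  final String sparkParamName;
  final boolean needSpecialTreatment; // the defaultValue field no longer exists
}
```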
@@ -63,7 +63,6 @@ public void testLibFolderHasNothingInIt() throws IOException {

   @Test
   public void testOneLibFolderExpansion() throws IOException {
-
     String retval = HadoopJobUtils.resolveWildCardForJarSpec(workingDirString, "./lib/*", logger);

     Assert.assertEquals(
@@ -72,7 +71,7 @@ public void testOneLibFolderExpansion() throws IOException {
   }

   @Test
-  public void testTwoLibFolderExpansion() throws IOException {
+  public void testTwoLibFolderExpansionAllFilesResolved() throws IOException {
     File lib2FolderFile = new File(workingDirFile, "lib2");
     lib2FolderFile.mkdirs();
     File lib2test1Jar = new File(lib2FolderFile, "test1.jar");
@@ -82,8 +81,27 @@
     String retval = HadoopJobUtils.resolveWildCardForJarSpec(workingDirString, "./lib/*,./lib2/*",
         logger);

-    Assert.assertEquals(
-        retval,
-        "/tmp/TestHadoopSpark/./lib/library.jar,/tmp/TestHadoopSpark/./lib/hadoop-spark-job-test-execution-x.y.z-a.b.c.jar,/tmp/TestHadoopSpark/./lib2/test1.jar,/tmp/TestHadoopSpark/./lib2/test2.jar");
+    Assert.assertTrue(retval.contains("/tmp/TestHadoopSpark/./lib/library.jar"));
+    Assert.assertTrue(retval.contains("/tmp/TestHadoopSpark/./lib/hadoop-spark-job-test-execution-x.y.z-a.b.c.jar"));
+    Assert.assertTrue(retval.contains("/tmp/TestHadoopSpark/./lib2/test1.jar"));
+    Assert.assertTrue(retval.contains("/tmp/TestHadoopSpark/./lib2/test2.jar"));
   }
+
+  @Test
+  public void testTwoLibFolderExpansionExpandsInOrder() throws IOException {
+
+    executionJarFile.delete();
+
+    File lib2FolderFile = new File(workingDirFile, "lib2");
+    lib2FolderFile.mkdirs();
+    File lib2test1Jar = new File(lib2FolderFile, "test1.jar");
+    lib2test1Jar.createNewFile();
+
+    String retval = HadoopJobUtils.resolveWildCardForJarSpec(workingDirString, "./lib/*,./lib2/*",
+        logger);
+
+    Assert.assertEquals(
+        retval,
+        "/tmp/TestHadoopSpark/./lib/library.jar,/tmp/TestHadoopSpark/./lib2/test1.jar");
+  }
 }
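The renamed testTwoLibFolderExpansionAllFilesResolved now checks membership with contains() instead of comparing the full comma-joined string, presumably because the order in which File.listFiles() returns entries is not guaranteed across filesystems; the new testTwoLibFolderExpansionExpandsInOrder then pins down only the relative ordering of the wildcard specs. A small sketch of the order-insensitive check (helper class and method names are illustrative):

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

// Illustrative helper: compare a comma-joined jar list as a set, so the
// assertion no longer depends on directory listing order.
public final class JarListAssertions {
  private JarListAssertions() {}

  public static boolean sameJars(String actualCsv, String... expectedJars) {
    Set<String> actual = new HashSet<>(Arrays.asList(actualCsv.split(",")));
    return actual.equals(new HashSet<>(Arrays.asList(expectedJars)));
  }
}
```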
@@ -55,32 +55,12 @@ public void testDefault() {
     String retval = HadoopSparkJob.testableGetMainArguments(jobProps, workingDirString, logger);

     // the first one, so no delimiter at front
-    Assert.assertTrue(retval.contains(SparkJobArg.DRIVER_JAVA_OPTIONS.sparkParamName + delim));
-    Assert.assertTrue(retval
-        .contains(delim
-            + "-Dazkaban.link.workflow.url=http://azkaban.link.workflow.url -Dazkaban.link.job.url=http://azkaban.link.job.url -Dazkaban.link.execution.url=http://azkaban.link.execution.url -Dazkaban.link.attempt.url=http://azkaban.link.attempt.url"
-            + delim));
-    Assert.assertTrue(retval.contains(delim + SparkJobArg.MASTER.sparkParamName + delim
-        + "yarn-cluster" + delim));
-    Assert.assertTrue(retval
-        .contains(delim
-            + SparkJobArg.SPARK_JARS.sparkParamName
-            + delim
-            + "/tmp/TestHadoopSpark/./lib/library.jar,/tmp/TestHadoopSpark/./lib/hadoop-spark-job-test-execution-x.y.z-a.b.c.jar"
-            + delim));
+    // due to new communication mechanism between HadoopSparkJob and HadoopSparkSecureWrapper,
+    // these Azkaban variables are sent through the configuration file and not through the command line
+    Assert.assertTrue(retval.contains(SparkJobArg.DRIVER_JAVA_OPTIONS.sparkParamName + delim + "" + delim));
     Assert.assertTrue(retval.contains(delim + SparkJobArg.CLASS.sparkParamName + delim
         + "hadoop.spark.job.test.ExecutionClass" + delim));
-    Assert.assertTrue(retval.contains(delim + SparkJobArg.NUM_EXECUTORS.sparkParamName + delim
-        + "2" + delim));
-    Assert.assertTrue(retval.contains(delim + SparkJobArg.EXECUTOR_CORES.sparkParamName + delim
-        + "1" + delim));
-    Assert.assertTrue(retval.contains(delim + SparkJobArg.QUEUE.sparkParamName + delim + "marathon"
-        + delim));
-    Assert.assertTrue(retval.contains(delim + SparkJobArg.DRIVER_MEMORY.sparkParamName + delim
-        + "512M" + delim));
-    Assert.assertTrue(retval.contains(delim + SparkJobArg.EXECUTOR_MEMORY.sparkParamName + delim
-        + "1g" + delim));
     // last one, no delimiter at back
     Assert.assertTrue(retval.contains(delim
         + "/tmp/TestHadoopSpark/./lib/hadoop-spark-job-test-execution-x.y.z-a.b.c.jar"));

@@ -272,9 +252,7 @@ public void testDriverJavaOptions() {
     String retval = HadoopSparkJob.testableGetMainArguments(jobProps, workingDirString, logger);

     // only on the ending side has the delimiter
-    Assert.assertTrue(retval
-        .contains(" -Dazkaban.link.attempt.url=http://azkaban.link.attempt.url -Dabc=def -Dfgh=ijk"
-            + delim));
+    Assert.assertTrue(retval.contains(" -Dabc=def -Dfgh=ijk" + delim));
   }

   @Test
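The surviving assertions all follow one pattern: testableGetMainArguments() joins arguments with SparkJobArg.delimiter ("\u001A"), so a flag/value pair can be located as delim + param + delim + value + delim, with no leading delimiter before the first argument and no trailing one after the last. A minimal sketch of that matcher (class and method names are illustrative, not from the test):

```java
// Illustrative matcher for the delimiter-joined argument string used in these tests.
public final class DelimitedArgMatcher {
  static final String DELIM = "\u001A"; // same value as SparkJobArg.delimiter

  private DelimitedArgMatcher() {}

  public static boolean hasPair(String rendered, String param, String value) {
    // Interior pairs are fenced by the delimiter on both sides.
    return rendered.contains(DELIM + param + DELIM + value + DELIM);
  }
}
```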