diff --git a/extlib/hive-beeline-0.12.0.jar b/extlib/hive-beeline-0.12.0.jar
new file mode 100644
index 00000000..696cf7ec
Binary files /dev/null and b/extlib/hive-beeline-0.12.0.jar differ
diff --git a/plugins/jobtype/build.xml b/plugins/jobtype/build.xml
index 2810df28..09a8f15c 100644
--- a/plugins/jobtype/build.xml
+++ b/plugins/jobtype/build.xml
@@ -1,201 +1,208 @@
[The body of this hunk was lost in extraction: the XML markup of build.xml was stripped, leaving bare +/- markers. The only surviving fragment is the updated <javac> attribute list, target="1.8" debug="true" deprecation="false" failonerror="true", which suggests the compile target was raised to Java 8 for the new job type.]
diff --git a/plugins/jobtype/jobtypes/beelinehive/private.properties b/plugins/jobtype/jobtypes/beelinehive/private.properties
new file mode 100644
index 00000000..b0292f2d
--- /dev/null
+++ b/plugins/jobtype/jobtypes/beelinehive/private.properties
@@ -0,0 +1,3 @@
+jobtype.class=azkaban.jobtype.BeelineHiveJob
+hadoop.home=/usr/hdp/current
+jobtype.classpath=${hadoop.home}/hadoop-client/hadoop-common.jar,${hadoop.home}/hadoop-client/lib/*,${hadoop.home}/hive-client/lib/*,/etc/hadoop/conf
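+
+# Example job file using this job type (hypothetical values; hive.url must be
+# a HiveServer2 JDBC URL understood by org.apache.hive.jdbc.HiveDriver):
+#   type=beelinehive
+#   hive.url=jdbc:hive2://hiveserver2-host:10000/default
+#   hive.script=scripts/report.sql
+#   user.to.proxy=etl_user
+#   hiveconf.mapreduce.job.queuename=default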
diff --git a/plugins/jobtype/src/azkaban/jobtype/BeelineHiveJob.java b/plugins/jobtype/src/azkaban/jobtype/BeelineHiveJob.java
new file mode 100644
index 00000000..028ba751
--- /dev/null
+++ b/plugins/jobtype/src/azkaban/jobtype/BeelineHiveJob.java
@@ -0,0 +1,307 @@
+package azkaban.jobtype;
+
+import azkaban.flow.CommonJobProperties;
+import azkaban.jobExecutor.JavaProcessJob;
+import azkaban.jobtype.HadoopConfigurationInjector;
+import azkaban.jobtype.HadoopJobUtils;
+import azkaban.jobtype.HadoopSecureWrapperUtils;
+import azkaban.security.commons.HadoopSecurityManager;
+import azkaban.utils.Props;
+import azkaban.utils.StringUtils;
+import org.apache.log4j.Logger;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.StringTokenizer;
+
+import static org.apache.hadoop.security.UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION;
+
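+/**
+ * Azkaban job type that runs a Hive script through Beeline against a
+ * HiveServer2 JDBC endpoint. When proxying is enabled, it obtains Hadoop
+ * delegation tokens for the proxied user, hands them to the child process,
+ * and cancels them after the job completes.
+ */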
+public class BeelineHiveJob extends JavaProcessJob {
+
+ public static final String HIVE_SCRIPT = "hive.script";
+ public static final String HIVE_URL = "hive.url";
+ private static final String HIVECONF_PARAM_PREFIX = "hiveconf.";
+ private static final String HIVEVAR_PARAM_PREFIX = "hivevar.";
+ public static final String HADOOP_SECURE_BEELINE_WRAPPER =
+ "azkaban.jobtype.HadoopSecureBeelineWrapper";
+
+ private String userToProxy = null;
+ private boolean shouldProxy = false;
+ private boolean obtainTokens = false;
+ private File tokenFile = null;
+
+ private HadoopSecurityManager hadoopSecurityManager;
+
+ private boolean debug = false;
+
+ public BeelineHiveJob(final String jobid, final Props sysProps, final Props jobProps, final Logger log)
+ throws IOException {
+
+ super(jobid, sysProps, jobProps, log);
+
+ getJobProps().put(CommonJobProperties.JOB_ID, jobid);
+
+ this.shouldProxy = getSysProps().getBoolean(HadoopSecurityManager.ENABLE_PROXYING, false);
+ getJobProps().put(HadoopSecurityManager.ENABLE_PROXYING, Boolean.toString(this.shouldProxy));
+ this.obtainTokens = getSysProps().getBoolean(HadoopSecurityManager.OBTAIN_BINARY_TOKEN, false);
+
+ this.debug = getJobProps().getBoolean("debug", false);
+
+ if (this.shouldProxy) {
+ getLog().info("Initiating hadoop security manager.");
+ try {
+ this.hadoopSecurityManager = HadoopJobUtils.loadHadoopSecurityManager(getSysProps(), log);
+ } catch (final RuntimeException e) {
+ throw new RuntimeException("Failed to get hadoop security manager!" + e);
+ }
+ }
+ }
+
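+ /**
+ * When proxying is enabled, fetches Hadoop delegation tokens into a file,
+ * exposes the file location to the child process through
+ * HADOOP_TOKEN_FILE_LOCATION, and cancels the tokens once the job finishes.
+ */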
+ @Override
+ public void run() throws Exception {
+ HadoopConfigurationInjector.prepareResourcesToInject(getJobProps(),
+ getWorkingDirectory());
+
+ if (this.shouldProxy && this.obtainTokens) {
+ this.userToProxy = getJobProps().getString("user.to.proxy");
+ getLog().info("Need to proxy. Getting tokens.");
+ // get tokens in to a file, and put the location in props
+ final Props props = new Props();
+ props.putAll(getJobProps());
+ props.putAll(getSysProps());
+ HadoopJobUtils.addAdditionalNamenodesToPropsFromMRJob(props, getLog());
+ this.tokenFile = HadoopJobUtils.getHadoopTokens(this.hadoopSecurityManager, props, getLog());
+ getJobProps().put("env." + HADOOP_TOKEN_FILE_LOCATION,
+ this.tokenFile.getAbsolutePath());
+ }
+
+ try {
+ super.run();
+ } catch (final Throwable t) {
+ getLog().error("Caught an error while running the job", t);
+ throw new Exception(t);
+ } finally {
+ if (this.tokenFile != null) {
+ HadoopJobUtils.cancelHadoopTokens(this.hadoopSecurityManager, this.userToProxy, this.tokenFile, getLog());
+ if (this.tokenFile.exists()) {
+ this.tokenFile.delete();
+ }
+ }
+ }
+ }
+
+ @Override
+ protected String getJavaClass() {
+ return HADOOP_SECURE_BEELINE_WRAPPER;
+ }
+
+ @Override
+ protected String getJVMArguments() {
+ String args = super.getJVMArguments();
+
+ final String typeUserGlobalJVMArgs =
+ getJobProps().getString("jobtype.global.jvm.args", null);
+ if (typeUserGlobalJVMArgs != null) {
+ args += " " + typeUserGlobalJVMArgs;
+ }
+ final String typeSysGlobalJVMArgs =
+ getSysProps().getString("jobtype.global.jvm.args", null);
+ if (typeSysGlobalJVMArgs != null) {
+ args += " " + typeSysGlobalJVMArgs;
+ }
+ final String typeUserJVMArgs = getJobProps().getString("jobtype.jvm.args", null);
+ if (typeUserJVMArgs != null) {
+ args += " " + typeUserJVMArgs;
+ }
+ final String typeSysJVMArgs = getSysProps().getString("jobtype.jvm.args", null);
+ if (typeSysJVMArgs != null) {
+ args += " " + typeSysJVMArgs;
+ }
+
+ if (this.shouldProxy) {
+ info("Setting up secure proxy info for child process");
+ String secure;
+ secure =
+ " -D" + HadoopSecurityManager.USER_TO_PROXY + "="
+ + getJobProps().getString(HadoopSecurityManager.USER_TO_PROXY);
+ final String extraToken =
+ getSysProps().getString(HadoopSecurityManager.OBTAIN_BINARY_TOKEN,
+ "false");
+ if (extraToken != null) {
+ secure +=
+ " -D" + HadoopSecurityManager.OBTAIN_BINARY_TOKEN + "="
+ + extraToken;
+ }
+
+ secure += " -Dmapreduce.job.credentials.binary=" + this.tokenFile;
+ info("Secure settings = " + secure);
+ args += secure;
+ } else {
+ info("Not setting up secure proxy info for child process");
+ }
+
+ return args;
+ }
+
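+ /**
+ * Builds the Beeline command line; an illustrative result (actual values
+ * depend on the job configuration) is:
+ * -u jdbc:hive2://host:10000 -n azkaban -p DUMMY -d org.apache.hive.jdbc.HiveDriver -f script.sql
+ */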
+ @Override
+ protected String getMainArguments() {
+ final ArrayList<String> list = new ArrayList<>();
+
+ list.add("-u");
+ list.add(getJobProps().getString(HIVE_URL));
+
+ list.add("-n");
+ if (this.shouldProxy) {
+ list.add(this.userToProxy);
+ } else {
+ list.add(System.getProperty("user.name"));
+ }
+
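+ // Beeline requires a password argument; with delegation-token auth ("-a"
+ // below) the value is typically ignored by the server, so pass a placeholder.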
+ list.add("-p");
+ list.add("DUMMY");
+
+ // for hiveconf
+ final Map<String, String> map = getHiveConf();
+ if (map != null) {
+ for (final Map.Entry<String, String> entry : map.entrySet()) {
+ list.add("--hiveconf");
+ list.add(StringUtils.shellQuote(
+ entry.getKey() + "=" + entry.getValue(), StringUtils.SINGLE_QUOTE));
+ }
+ }
+
+ if (this.debug) {
+ list.add("--hiveconf");
+ list.add("hive.root.logger=INFO,console");
+ }
+
+ // for hivevar
+ final Map<String, String> hiveVarMap = getHiveVar();
+ if (hiveVarMap != null) {
+ for (final Map.Entry<String, String> entry : hiveVarMap.entrySet()) {
+ list.add("--hivevar");
+ list.add(StringUtils.shellQuote(
+ entry.getKey() + "=" + entry.getValue(), StringUtils.SINGLE_QUOTE));
+ }
+ }
+
+ list.add("-d");
+ list.add("org.apache.hive.jdbc.HiveDriver");
+
+ list.add("-f");
+ list.add(getScript());
+
+ if (this.shouldProxy) {
+ list.add("-a");
+ list.add("delegationToken");
+ }
+
+ return StringUtils.join((Collection<String>) list, " ");
+ }
+
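+ /**
+ * Assembles the child process classpath: the Azkaban jars this job type
+ * depends on, the injected Hadoop configuration, and any entries listed in
+ * jobtype.classpath or jobtype.global.classpath (relative entries are
+ * resolved against plugin.dir).
+ */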
+ @Override
+ protected List<String> getClassPaths() {
+
+ final List<String> classPath = super.getClassPaths();
+
+ // To add az-core jar classpath
+ classPath.add(getSourcePathFromClass(Props.class));
+
+ // To add az-common jar classpath
+ classPath.add(getSourcePathFromClass(JavaProcessJob.class));
+ classPath.add(getSourcePathFromClass(HadoopSecureWrapperUtils.class));
+ classPath.add(getSourcePathFromClass(HadoopSecureBeelineWrapper.class));
+ classPath.add(getSourcePathFromClass(HadoopSecurityManager.class));
+
+ final String loggerPath = getSourcePathFromClass(org.apache.log4j.Logger.class);
+ if (!classPath.contains(loggerPath)) {
+ classPath.add(loggerPath);
+ }
+
+ classPath.add(HadoopConfigurationInjector.getPath(getJobProps(),
+ getWorkingDirectory()));
+ final List<String> typeClassPath =
+ getSysProps().getStringList("jobtype.classpath", null, ",");
+ if (typeClassPath != null) {
+ // fill in this when load this jobtype
+ final String pluginDir = getSysProps().get("plugin.dir");
+ for (final String jar : typeClassPath) {
+ File jarFile = new File(jar);
+ if (!jarFile.isAbsolute()) {
+ jarFile = new File(pluginDir + File.separatorChar + jar);
+ }
+
+ if (!classPath.contains(jarFile.getAbsoluteFile())) {
+ classPath.add(jarFile.getAbsolutePath());
+ }
+ }
+ }
+
+ final List<String> typeGlobalClassPath =
+ getSysProps().getStringList("jobtype.global.classpath", null, ",");
+ if (typeGlobalClassPath != null) {
+ for (final String jar : typeGlobalClassPath) {
+ if (!classPath.contains(jar)) {
+ classPath.add(jar);
+ }
+ }
+ }
+
+ return classPath;
+ }
+
+ protected String getScript() {
+ return getJobProps().getString(HIVE_SCRIPT);
+ }
+
+ protected Map<String, String> getHiveConf() {
+ return getJobProps().getMapByPrefix(HIVECONF_PARAM_PREFIX);
+ }
+
+ protected Map<String, String> getHiveVar() {
+ return getJobProps().getMapByPrefix(HIVEVAR_PARAM_PREFIX);
+ }
+
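+ /**
+ * Returns the jar or class directory the given class was loaded from. For a
+ * class loaded from an exploded directory, walks up one parent directory per
+ * package component to reach the classpath root.
+ */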
+ private static String getSourcePathFromClass(final Class<?> containedClass) {
+ File file =
+ new File(containedClass.getProtectionDomain().getCodeSource()
+ .getLocation().getPath());
+
+ if (!file.isDirectory() && file.getName().endsWith(".class")) {
+ final String name = containedClass.getName();
+ final StringTokenizer tokenizer = new StringTokenizer(name, ".");
+ while (tokenizer.hasMoreTokens()) {
+ tokenizer.nextElement();
+ file = file.getParentFile();
+ }
+
+ return file.getPath();
+ } else {
+ return containedClass.getProtectionDomain().getCodeSource().getLocation()
+ .getPath();
+ }
+ }
+
+ /**
+ * This cancel method, in addition to the default canceling behavior, also kills the MR jobs launched by Hive
+ * on Hadoop
+ */
+ @Override
+ public void cancel() throws InterruptedException {
+ super.cancel();
+
+ info("Cancel called. Killing the Hive launched MR jobs on the cluster");
+
+ final String azExecId = jobProps.getString(CommonJobProperties.EXEC_ID);
+ final String logFilePath =
+ String.format("%s/_job.%s.%s.log", getWorkingDirectory(), azExecId,
+ getId());
+ info("log file path is: " + logFilePath);
+
+ HadoopJobUtils.proxyUserKillAllSpawnedHadoopJobs(logFilePath, jobProps, this.tokenFile, getLog());
+ }
+}
+
diff --git a/plugins/jobtype/src/azkaban/jobtype/HadoopSecureBeelineWrapper.java b/plugins/jobtype/src/azkaban/jobtype/HadoopSecureBeelineWrapper.java
new file mode 100644
index 00000000..2f0d4900
--- /dev/null
+++ b/plugins/jobtype/src/azkaban/jobtype/HadoopSecureBeelineWrapper.java
@@ -0,0 +1,51 @@
+package azkaban.jobtype;
+
+import azkaban.jobtype.HadoopConfigurationInjector;
+import azkaban.jobtype.HadoopSecureWrapperUtils;
+import azkaban.utils.Props;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hive.beeline.BeeLine;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+import java.util.Properties;
+
+import static org.apache.hadoop.security.UserGroupInformation.HADOOP_TOKEN_FILE_LOCATION;
+
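+/**
+ * Entry point for the child JVM launched by BeelineHiveJob. It loads the
+ * Azkaban job properties, then runs Beeline either directly or, when proxying
+ * is enabled, inside a doAs() for the proxied user with the delegation token
+ * file wired into the Hadoop configuration.
+ */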
+public class HadoopSecureBeelineWrapper {
+ private static final Logger logger = Logger.getRootLogger();
+
+ public static void main(final String[] args) throws IOException, InterruptedException {
+ final Properties jobProps = HadoopSecureWrapperUtils.loadAzkabanProps();
+ HadoopConfigurationInjector.injectResources(new Props(null, jobProps));
+
+ hiveScript = jobProps.getProperty("hive.script");
+
+ if (HadoopSecureWrapperUtils.shouldProxy(jobProps)) {
+ final String tokenFile = System.getenv(HADOOP_TOKEN_FILE_LOCATION);
+ System.setProperty("mapreduce.job.credentials.binary", tokenFile);
+ final UserGroupInformation proxyUser =
+ HadoopSecureWrapperUtils.setupProxyUser(jobProps, tokenFile, logger);
+ proxyUser.doAs(new PrivilegedExceptionAction<Void>() {
+ @Override
+ public Void run() throws Exception {
+ runBeeline(args);
+ return null;
+ }
+ });
+ } else {
+ runBeeline(args);
+ }
+ }
+
+ private static void runBeeline(final String[] args) throws IOException {
+ final BeeLine beeline = new BeeLine();
+ final int status = beeline.begin(args, null);
+ beeline.close();
+ // Propagate a non-zero Beeline exit status so Azkaban marks the job as
+ // failed; exiting 0 here would report success even when the script failed.
+ if (status != 0) {
+ System.exit(status);
+ }
+ }
+}
diff --git a/plugins/jobtype/test/azkaban/jobtype/TestBeelineHiveJob.java b/plugins/jobtype/test/azkaban/jobtype/TestBeelineHiveJob.java
new file mode 100644
index 00000000..9bb05b42
--- /dev/null
+++ b/plugins/jobtype/test/azkaban/jobtype/TestBeelineHiveJob.java
@@ -0,0 +1,48 @@
+package azkaban.jobtype;
+
+import azkaban.utils.Props;
+import com.google.common.io.Files;
+import org.apache.log4j.Logger;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+
+public class TestBeelineHiveJob {
+ Logger testLogger;
+ private static final String WORKING_DIR = Files.createTempDir().getAbsolutePath() + "/TestBeelineHiveJob";
+
+ @Before
+ public void setUp() throws Exception {
+ this.testLogger = Logger.getLogger("testLogger");
+ final File workingDirFile = new File(WORKING_DIR);
+ workingDirFile.mkdirs();
+ }
+
+ @Test
+ public void testMainArguments() throws Exception {
+
+ final Props sysProps = new Props();
+ sysProps.put("database.type", "h2");
+ sysProps.put("h2.path", "./h2");
+
+ final String jobId = "test_job";
+ final Props jobProps = new Props();
+ jobProps.put("type", "beelinehive");
+ jobProps.put("hive.script", "hivescript.sql");
+ jobProps.put("hive.url", "localhost");
+
+ final BeelineHiveJob job = new BeelineHiveJob(jobId, sysProps, jobProps, this.testLogger);
+ final String jobArguments = job.getMainArguments();
+ Assert.assertTrue(jobArguments.endsWith("-d org.apache.hive.jdbc.HiveDriver -f hivescript.sql "));
+ Assert.assertTrue(jobArguments.startsWith("-u localhost"));
+ Assert.assertFalse(jobArguments.contains(" -a "));
+
+ }
+}