diff --git a/oozie-to-airflow/converter/mappers.py b/oozie-to-airflow/converter/mappers.py
index e7f8dbe8b..bb2d72fe2 100644
--- a/oozie-to-airflow/converter/mappers.py
+++ b/oozie-to-airflow/converter/mappers.py
@@ -26,6 +26,7 @@
from mappers.decision_mapper import DecisionMapper
from mappers.dummy_mapper import DummyMapper
from mappers.end_mapper import EndMapper
+from mappers.fs_mapper import FsMapper
from mappers.kill_mapper import KillMapper
from mappers.pig_mapper import PigMapper
from mappers.shell_mapper import ShellMapper
@@ -49,6 +50,7 @@
"ssh": SSHMapper,
"spark": SparkMapper,
"pig": PigMapper,
+ "fs": FsMapper,
"sub-workflow": SubworkflowMapper,
"shell": ShellMapper,
}
diff --git a/oozie-to-airflow/converter/primitives.py b/oozie-to-airflow/converter/primitives.py
index 8d6c72685..cd2a2e1b6 100644
--- a/oozie-to-airflow/converter/primitives.py
+++ b/oozie-to-airflow/converter/primitives.py
@@ -53,4 +53,5 @@ def __init__(self, input_directory_path, output_directory_path, dag_name=None) -
"import datetime",
"from airflow import models",
"from airflow.utils.trigger_rule import TriggerRule",
+ "from airflow.utils import dates",
}
diff --git a/oozie-to-airflow/examples/fs/configuration-template.properties b/oozie-to-airflow/examples/fs/configuration-template.properties
new file mode 100644
index 000000000..ed2560eea
--- /dev/null
+++ b/oozie-to-airflow/examples/fs/configuration-template.properties
@@ -0,0 +1,16 @@
+# Copyright 2019 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+dataproc_cluster=CLUSTER-NAME
+gcp_region=europe-west3
diff --git a/oozie-to-airflow/examples/fs/configuration.properties b/oozie-to-airflow/examples/fs/configuration.properties
new file mode 100644
index 000000000..58e746b8e
--- /dev/null
+++ b/oozie-to-airflow/examples/fs/configuration.properties
@@ -0,0 +1,16 @@
+# Copyright 2019 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+dataproc_cluster=cluster-o2a
+gcp_region=europe-west3
diff --git a/oozie-to-airflow/examples/fs/job.properties b/oozie-to-airflow/examples/fs/job.properties
new file mode 100644
index 000000000..9a93a8757
--- /dev/null
+++ b/oozie-to-airflow/examples/fs/job.properties
@@ -0,0 +1,20 @@
+# Copyright 2019 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+nameNode=hdfs://localhost:8020
+resourceManager=localhost:8032
+queueName=default
+examplesRoot=examples
+
+oozie.wf.application.path=${nameNode}/user/${user.name}/${examplesRoot}/apps/fs
diff --git a/oozie-to-airflow/examples/fs/workflow.xml b/oozie-to-airflow/examples/fs/workflow.xml
new file mode 100644
index 000000000..0237b724f
--- /dev/null
+++ b/oozie-to-airflow/examples/fs/workflow.xml
@@ -0,0 +1,98 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+ Fs workflow failed, error message[${wf:errorMessage(wf:lastErrorNode())}]
+
+
+
diff --git a/oozie-to-airflow/integration/run_fs_test_with_airflow.bash b/oozie-to-airflow/integration/run_fs_test_with_airflow.bash
new file mode 100755
index 000000000..7a0f13a2b
--- /dev/null
+++ b/oozie-to-airflow/integration/run_fs_test_with_airflow.bash
@@ -0,0 +1,39 @@
+#!/usr/bin/env bash
+MY_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"
+BASE_DIR=${MY_DIR}/..
+
+LOCAL_APPLICATION_DIR=${BASE_DIR}/examples/fs
+
+HADOOP_USER=pig
+EXAMPLE_DIR=examples/fs
+CLUSTER_MASTER=cluster-o2a-m
+CLUSTER_NAME=cluster-o2a
+
+COMPOSER_BUCKET=gs://europe-west1-o2a-integratio-f690ede2-bucket
+COMPOSER_NAME=o2a-integration
+COMPOSER_LOCATION=europe-west1
+
+DAG_NAME=test_fs_dag
+
+INPUT_DIR=${BASE_DIR}/examples/fs
+OUTPUT_DIR=${BASE_DIR}/output/fs_test
+
+
+if [[ ! -f ${LOCAL_APPLICATION_DIR}/configuration.properties ]]; then
+    echo
+    echo "Please copy ${LOCAL_APPLICATION_DIR}/configuration-template.properties to ${LOCAL_APPLICATION_DIR}/configuration.properties and update properties to match your case"
+    echo
+    exit 1
+fi
+
+python ${BASE_DIR}/o2a.py -i ${INPUT_DIR} -o ${OUTPUT_DIR} -u ${HADOOP_USER} -d ${DAG_NAME} $@
+
+gsutil cp ${BASE_DIR}/scripts/prepare.sh ${COMPOSER_BUCKET}/data/
+gsutil cp ${OUTPUT_DIR}/* ${COMPOSER_BUCKET}/dags/
+
+gcloud dataproc jobs submit pig --cluster=${CLUSTER_NAME} --execute 'fs -rm -r /home/pig/test-fs' || true
+gcloud dataproc jobs submit pig --cluster=${CLUSTER_NAME} --execute 'fs -mkdir /home/pig/test-fs'
+
+
+gcloud composer environments run ${COMPOSER_NAME} --location ${COMPOSER_LOCATION} list_dags
+gcloud composer environments run ${COMPOSER_NAME} --location ${COMPOSER_LOCATION} trigger_dag -- ${DAG_NAME}
diff --git a/oozie-to-airflow/mappers/fs_mapper.py b/oozie-to-airflow/mappers/fs_mapper.py
new file mode 100644
index 000000000..e3fe8dcdf
--- /dev/null
+++ b/oozie-to-airflow/mappers/fs_mapper.py
@@ -0,0 +1,198 @@
+# -*- coding: utf-8 -*-
+# Copyright 2019 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""Maps FS node to Airflow's DAG"""
+
+import shlex
+from typing import Set, NamedTuple, List
+from xml.etree.ElementTree import Element
+
+from converter.parser import Relation
+from mappers.action_mapper import ActionMapper
+from utils.template_utils import render_template
+from utils.el_utils import normalize_path
+from utils.xml_utils import bool_value
+
+ACTION_TYPE = "fs"
+
+FS_OP_MKDIR = "mkdir"
+FS_OP_DELETE = "delete"
+FS_OP_MOVE = "move"
+FS_OP_CHMOD = "chmod"
+FS_OP_TOUCHZ = "touchz"
+FS_OP_CHGRP = "chgrp"
+FS_OP_SETREP = "setrep"
+
+FS_TAG_PATH = "path"
+FS_TAG_SOURCE = "source"
+FS_TAG_TARGET = "target"
+FS_TAG_RECURSIVE = "recursive"
+FS_TAG_DIRFILES = "dir-files"
+FS_TAG_SKIPTRASH = "skip-trash"
+FS_TAG_PERMISSIONS = "permissions"
+FS_TAG_GROUP = "group"
+FS_TAG_REPLFAC = "replication-factor"
+
+
+def prepare_mkdir_command(node: Element, params):
+ path = normalize_path(node.attrib[FS_TAG_PATH], params)
+ command = "fs -mkdir {path}".format(path=shlex.quote(path))
+ return command
+
+
+def prepare_delete_command(node: Element, params):
+ path = normalize_path(node.attrib[FS_TAG_PATH], params)
+    skip_trash = bool_value(node, FS_TAG_SKIPTRASH)
+    extra = "-skipTrash " if skip_trash else ""
+    command = "fs -rm -r {extra}{path}".format(extra=extra, path=shlex.quote(path))
+
+ return command
+
+
+def prepare_move_command(node: Element, params):
+ source = normalize_path(node.attrib[FS_TAG_SOURCE], params)
+ target = normalize_path(node.attrib[FS_TAG_TARGET], params)
+
+ command = "fs -mv {source} {target}".format(source=source, target=target)
+ return command
+
+
+def prepare_chmod_command(node: Element, params):
+ path = normalize_path(node.attrib[FS_TAG_PATH], params)
+ permission = node.attrib[FS_TAG_PERMISSIONS]
+ # TODO: Add support for dirFiles
+ # dirFiles = bool_value(node, FS_TAG_DIRFILES)
+ recursive = node.find(FS_TAG_RECURSIVE) is not None
+    extra_param = "-R " if recursive else ""
+
+    command = "fs -chmod {extra}{permission} {path}".format(
+        extra=extra_param, path=shlex.quote(path), permission=shlex.quote(permission)
+    )
+ return command
+
+
+def prepare_touchz_command(node: Element, params):
+ path = normalize_path(node.attrib[FS_TAG_PATH], params)
+
+ command = "fs -touchz {path}".format(path=shlex.quote(path))
+ return command
+
+
+def prepare_chgrp_command(node: Element, params):
+ path = normalize_path(node.attrib[FS_TAG_PATH], params)
+ group = normalize_path(node.attrib[FS_TAG_GROUP], params)
+
+ recursive = node.find(FS_TAG_RECURSIVE) is not None
+    extra_param = "-R " if recursive else ""
+
+    command = "fs -chgrp {extra}{group} {path}".format(
+        extra=extra_param, group=shlex.quote(group), path=shlex.quote(path)
+    )
+ return command
+
+
+def prepare_setrep_command(node: Element, params):
+ path = normalize_path(node.attrib[FS_TAG_PATH], params)
+ fac = node.attrib[FS_TAG_REPLFAC]
+
+ command = "fs -setrep {fac} {path}".format(fac=shlex.quote(fac), path=shlex.quote(path))
+ return command
+
+
+FS_OPERATION_MAPPERS = {
+ FS_OP_MKDIR: prepare_mkdir_command,
+ FS_OP_DELETE: prepare_delete_command,
+ FS_OP_MOVE: prepare_move_command,
+ FS_OP_CHMOD: prepare_chmod_command,
+ FS_OP_TOUCHZ: prepare_touchz_command,
+ FS_OP_CHGRP: prepare_chgrp_command,
+ FS_OP_SETREP: prepare_setrep_command,
+}
+
+
+class Task(NamedTuple):
+ task_id: str
+ rendered_template: str
+
+
+def chain(ops):
+ return [Relation(from_task_id=a.task_id, to_task_id=b.task_id) for a, b in zip(ops, ops[1::])]
+
+
+class FsMapper(ActionMapper):
+ """
+ Converts a FS Oozie node to an Airflow task.
+ """
+
+ tasks: List[Task]
+
+ def __init__(self, *args, **kwargs):
+ super().__init__(*args, **kwargs)
+ self.tasks = []
+
+ def on_parse_node(self):
+ super().on_parse_node()
+ self.tasks = self.parse_tasks()
+
+ def parse_tasks(self):
+ if not list(self.oozie_node):
+ return [
+ Task(
+ task_id=self.name,
+ rendered_template=render_template(
+ "dummy.tpl", task_id=self.name, trigger_rule=self.trigger_rule
+ ),
+ )
+ ]
+
+ return [self.parse_fs_action(i, node) for i, node in enumerate(self.oozie_node)]
+
+ def convert_to_text(self):
+ return render_template(
+ template_name="fs.tpl",
+ task_id=self.name,
+ trigger_rule=self.trigger_rule,
+ sub_ops=self.tasks,
+ relations=chain(self.tasks),
+ )
+
+ @staticmethod
+ def required_imports() -> Set[str]:
+ return {
+ "from airflow.operators import dummy_operator",
+ "from airflow.operators import bash_operator",
+ "import shlex",
+ }
+
+ @property
+ def first_task_id(self):
+ return self.tasks[0].task_id
+
+ @property
+ def last_task_id(self):
+ return self.tasks[-1].task_id
+
+ def parse_fs_action(self, index: int, node: Element):
+ tag_name = node.tag
+ tasks_count = len(self.oozie_node)
+ task_id = self.name if tasks_count == 1 else "{}_fs_{}_{}".format(self.name, index, tag_name)
+ mapper_fn = FS_OPERATION_MAPPERS.get(tag_name)
+
+ if not mapper_fn:
+ raise Exception("Unknown FS operation: {}".format(tag_name))
+
+ pig_command = mapper_fn(node, self.params)
+ rendered_template = render_template("fs_op.tpl", task_id=task_id, pig_command=pig_command)
+
+ return Task(task_id=task_id, rendered_template=rendered_template)
diff --git a/oozie-to-airflow/mappers/prepare_mixin.py b/oozie-to-airflow/mappers/prepare_mixin.py
index 3c484f2da..351b0f3a7 100644
--- a/oozie-to-airflow/mappers/prepare_mixin.py
+++ b/oozie-to-airflow/mappers/prepare_mixin.py
@@ -16,7 +16,8 @@
from typing import Dict, List, Tuple
import xml.etree.ElementTree as ET
-from utils import xml_utils, el_utils
+from utils import xml_utils
+from utils.el_utils import normalize_path
class PrepareMixin:
@@ -55,10 +56,7 @@ def parse_prepare_node(oozie_node: ET.Element, params: Dict[str, str]) -> Tuple[
# If there exists a prepare node, there will only be one, according
# to oozie xml schema
for node in prepare_nodes[0]:
- node_path = el_utils.replace_el_with_var(node.attrib["path"], params=params, quote=False)
- if "//" in node_path:
- node_path = node_path.split("//", maxsplit=1)[1] # Removing the hdfs:// or similar part
- node_path = "/" + node_path.split("/", maxsplit=1)[1] # Removing the 'localhost:8082/' part
+ node_path = normalize_path(node.attrib["path"], params=params)
if node.tag == "delete":
delete_paths.append(node_path)
else:
diff --git a/oozie-to-airflow/templates/fs.tpl b/oozie-to-airflow/templates/fs.tpl
new file mode 100644
index 000000000..281813203
--- /dev/null
+++ b/oozie-to-airflow/templates/fs.tpl
@@ -0,0 +1,21 @@
+{#
+ Copyright 2019 Google LLC
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ #}
+{% for op in sub_ops %}
+{{ op.rendered_template }}
+{% endfor %}
+{% with relations=relations %}
+ {% include "relations.tpl" %}
+{% endwith %}
diff --git a/oozie-to-airflow/templates/fs_op.tpl b/oozie-to-airflow/templates/fs_op.tpl
new file mode 100644
index 000000000..c7b9aea3a
--- /dev/null
+++ b/oozie-to-airflow/templates/fs_op.tpl
@@ -0,0 +1,19 @@
+{#
+ Copyright 2019 Google LLC
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ #}
+{{ task_id }} = bash_operator.BashOperator(
+    bash_command="gcloud dataproc jobs submit pig --cluster={dataproc_cluster} --region={gcp_region} --execute {bash_command}".format(dataproc_cluster=PARAMS['dataproc_cluster'], gcp_region=PARAMS['gcp_region'], bash_command=shlex.quote("{{ pig_command }}")),
+ task_id='{{ task_id }}',
+)
diff --git a/oozie-to-airflow/tests/mappers/test_fs_mapper.py b/oozie-to-airflow/tests/mappers/test_fs_mapper.py
new file mode 100644
index 000000000..a7453564e
--- /dev/null
+++ b/oozie-to-airflow/tests/mappers/test_fs_mapper.py
@@ -0,0 +1,314 @@
+# -*- coding: utf-8 -*-
+# Copyright 2019 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""Tests fs mapper"""
+
+import ast
+import unittest
+from xml.etree import ElementTree as ET
+
+from parameterized import parameterized
+from airflow.utils.trigger_rule import TriggerRule
+
+from converter.primitives import Relation
+from mappers import fs_mapper
+
+TEST_PARAMS = {"user.name": "pig", "nameNode": "hdfs://localhost:8020"}
+
+
+# pylint: disable=invalid-name
+class prepare_mkdir_commandTest(unittest.TestCase):
+ @parameterized.expand(
+ [
+ (
+ "",
+ "fs -mkdir /home/pig/test-fs/test-mkdir-1",
+ ),
+ (
+ "",
+ "fs -mkdir /home/pig/test-fs/DDD-mkdir-1",
+ ),
+ ]
+ )
+ def test_result(self, xml, command):
+ node = ET.fromstring(xml)
+ self.assertEqual(fs_mapper.prepare_mkdir_command(node, TEST_PARAMS), command)
+
+
+class prepare_delete_commandTest(unittest.TestCase):
+ @parameterized.expand(
+ [
+ (
+ "",
+ "fs -rm -r -skipTrash /home/pig/test-fs/test-delete-1",
+ ),
+ (
+ "",
+ "fs -rm -r /home/pig/test-fs/test-delete-2",
+ ),
+ (
+ "",
+ "fs -rm -r /home/pig/test-fs/test-delete-3",
+ ),
+ ]
+ )
+ def test_result(self, xml, command):
+ node = ET.fromstring(xml)
+ self.assertEqual(fs_mapper.prepare_delete_command(node, TEST_PARAMS), command)
+
+
+class prepare_move_commandTest(unittest.TestCase):
+ @parameterized.expand(
+ [
+ (
+ "",
+ "fs -mv /home/pig/test-fs/test-move-1 /home/pig/test-fs/test-move-2",
+ ),
+ (
+ "",
+ "fs -mv /home/pig/test-fs/test-move-1 /home/pig/test-DDD/test-move-2",
+ ),
+ ]
+ )
+ def test_result(self, xml, command):
+ node = ET.fromstring(xml)
+ self.assertEqual(fs_mapper.prepare_move_command(node, TEST_PARAMS), command)
+
+
+class prepare_chmod_commandTest(unittest.TestCase):
+ @parameterized.expand(
+ [
+ (
+ "",
+ "fs -chmod 777 /home/pig/test-fs/test-chmod-1",
+ ),
+ (
+ "",
+ "fs -chmod 777 /home/pig/test-fs/test-chmod-2",
+ ),
+ (
+ "",
+ "fs -chmod 777 /home/pig/test-fs/test-chmod-3",
+ ),
+ (
+ """
+
+ """,
+ "fs -chmod -R 777 /home/pig/test-fs/test-chmod-4",
+ ),
+ ]
+ )
+ def test_result(self, xml, command):
+ node = ET.fromstring(xml)
+ self.assertEqual(fs_mapper.prepare_chmod_command(node, TEST_PARAMS), command)
+
+
+class prepare_touchz_commandTest(unittest.TestCase):
+ @parameterized.expand(
+ [
+ (
+ "",
+ "fs -touchz /home/pig/test-fs/test-touchz-1",
+ ),
+ (
+ "",
+ "fs -touchz /home/pig/test-fs/DDDD-touchz-1",
+ ),
+ ]
+ )
+ def test_result(self, xml, command):
+ node = ET.fromstring(xml)
+ self.assertEqual(fs_mapper.prepare_touchz_command(node, TEST_PARAMS), command)
+
+
+class prepare_chgrp_commandTest(unittest.TestCase):
+ @parameterized.expand(
+ [
+ (
+ "",
+ "fs -chgrp hadoop /home/pig/test-fs/test-chgrp-1",
+ ),
+ (
+ "",
+ "fs -chgrp hadoop /home/pig/test-fs/DDD-chgrp-1",
+ ),
+ ]
+ )
+ def test_result(self, xml, command):
+ node = ET.fromstring(xml)
+ self.assertEqual(fs_mapper.prepare_chgrp_command(node, TEST_PARAMS), command)
+
+
+class prepare_setrep_commandTest(unittest.TestCase):
+ @parameterized.expand(
+ [
+ (
+ "",
+ "fs -setrep 2 /home/pig/test-fs/test-setrep-1",
+ ),
+ (
+ "",
+ "fs -setrep 3 /home/pig/test-fs/DDD-setrep-1",
+ ),
+ ]
+ )
+ def test_result(self, xml, command):
+ node = ET.fromstring(xml)
+ self.assertEqual(fs_mapper.prepare_setrep_command(node, TEST_PARAMS), command)
+
+
+# pylint: disable=invalid-name
+class chainTestCase(unittest.TestCase):
+ def test_empty(self):
+ relations = fs_mapper.chain([])
+ self.assertEqual(relations, [])
+
+ def test_one(self):
+ relations = fs_mapper.chain([fs_mapper.Task(task_id="A", rendered_template="")])
+ self.assertEqual(relations, [])
+
+ def test_multiple(self):
+ relations = fs_mapper.chain(
+ [
+ fs_mapper.Task(task_id="task_1", rendered_template=""),
+ fs_mapper.Task(task_id="task_2", rendered_template=""),
+ fs_mapper.Task(task_id="task_3", rendered_template=""),
+ fs_mapper.Task(task_id="task_4", rendered_template=""),
+ ]
+ )
+ self.assertEqual(
+ relations,
+ [
+ Relation(from_task_id="task_1", to_task_id="task_2"),
+ Relation(from_task_id="task_2", to_task_id="task_3"),
+ Relation(from_task_id="task_3", to_task_id="task_4"),
+ ],
+ )
+
+
+class FsMapperSingleTestCase(unittest.TestCase):
+ def setUp(self):
+ # language=XML
+ node_str = """
+
+
+ """
+ self.node = ET.fromstring(node_str)
+
+ self.mapper = fs_mapper.FsMapper(oozie_node=self.node, name="test_id", trigger_rule=TriggerRule.DUMMY)
+ self.mapper.on_parse_node()
+
+ def test_convert_to_text(self):
+ # Throws a syntax error if doesn't parse correctly
+ self.assertIsNotNone(ast.parse(self.mapper.convert_to_text()))
+
+ def test_required_imports(self):
+ imps = fs_mapper.FsMapper.required_imports()
+ imp_str = "\n".join(imps)
+ self.assertIsNotNone(ast.parse(imp_str))
+
+ def test_get_first_task_id(self):
+ self.assertEqual(self.mapper.first_task_id, "test_id")
+
+ def test_get_last_task_id(self):
+ self.assertEqual(self.mapper.last_task_id, "test_id")
+
+
+class FsMapperEmptyTestCase(unittest.TestCase):
+ def setUp(self):
+ self.node = ET.Element("fs")
+ self.mapper = fs_mapper.FsMapper(oozie_node=self.node, name="test_id", trigger_rule=TriggerRule.DUMMY)
+ self.mapper.on_parse_node()
+
+ def test_convert_to_text(self):
+ # Throws a syntax error if doesn't parse correctly
+ self.assertIsNotNone(ast.parse(self.mapper.convert_to_text()))
+
+ def test_required_imports(self):
+ imps = fs_mapper.FsMapper.required_imports()
+ imp_str = "\n".join(imps)
+ self.assertIsNotNone(ast.parse(imp_str))
+
+ def test_get_first_task_id(self):
+ self.assertEqual(self.mapper.first_task_id, "test_id")
+
+ def test_get_last_task_id(self):
+ self.assertEqual(self.mapper.last_task_id, "test_id")
+
+
+class FsMapperComplexTestCase(unittest.TestCase):
+ def setUp(self):
+ # language=XML
+ node_str = """
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+ """
+ self.node = ET.fromstring(node_str)
+
+ self.mapper = fs_mapper.FsMapper(oozie_node=self.node, name="test_id", trigger_rule=TriggerRule.DUMMY)
+ self.mapper.on_parse_node()
+
+ def test_convert_to_text(self):
+ # Throws a syntax error if doesn't parse correctly
+ self.assertIsNotNone(ast.parse(self.mapper.convert_to_text()))
+
+ def test_required_imports(self):
+ imps = fs_mapper.FsMapper.required_imports()
+ imp_str = "\n".join(imps)
+ self.assertIsNotNone(ast.parse(imp_str))
+
+ def test_get_first_task_id(self):
+ self.assertEqual(self.mapper.first_task_id, "test_id_fs_0_mkdir")
+
+ def test_get_last_task_id(self):
+ self.assertEqual(self.mapper.last_task_id, "test_id_fs_20_setrep")
diff --git a/oozie-to-airflow/tests/mappers/test_prepare_mixin.py b/oozie-to-airflow/tests/mappers/test_prepare_mixin.py
index 5119bd836..642f9f59f 100644
--- a/oozie-to-airflow/tests/mappers/test_prepare_mixin.py
+++ b/oozie-to-airflow/tests/mappers/test_prepare_mixin.py
@@ -28,6 +28,7 @@ class TestPrepareMixin(unittest.TestCase):
def test_with_prepare(self):
cluster = "my-cluster"
region = "europe-west3"
+ params = {"nameNode": "hdfs://localhost:8020", "dataproc_cluster": cluster, "gcp_region": region}
# language=XML
pig_node_prepare_str = """
@@ -41,9 +42,8 @@ def test_with_prepare(self):
"""
pig_node_prepare = ET.fromstring(pig_node_prepare_str)
- prepare = prepare_mixin.PrepareMixin().get_prepare_command(
- oozie_node=pig_node_prepare, params={"dataproc_cluster": cluster, "gcp_region": region}
- )
+
+ prepare = prepare_mixin.PrepareMixin().get_prepare_command(oozie_node=pig_node_prepare, params=params)
self.assertEqual(
'$DAGS_FOLDER/../data/prepare.sh -c {0} -r {1} -d "{2} {3}" -m "{4} {5}"'.format(
cluster, region, self.delete_path1, self.delete_path2, self.mkdir_path1, self.mkdir_path2
@@ -54,10 +54,9 @@ def test_with_prepare(self):
def test_no_prepare(self):
cluster = "my-cluster"
region = "europe-west3"
+ params = {"nameNode": "hdfs://localhost:8020", "dataproc_cluster": cluster, "gcp_region": region}
# language=XML
pig_node_str = "hdfs://"
pig_node = ET.fromstring(pig_node_str)
- prepare = prepare_mixin.PrepareMixin().get_prepare_command(
- oozie_node=pig_node, params={"dataproc_cluster": cluster, "gcp_region": region}
- )
+ prepare = prepare_mixin.PrepareMixin().get_prepare_command(oozie_node=pig_node, params=params)
self.assertEqual("", prepare)
diff --git a/oozie-to-airflow/tests/utils/test_xml_utils.py b/oozie-to-airflow/tests/utils/test_xml_utils.py
index a454d286b..d285ed1a5 100644
--- a/oozie-to-airflow/tests/utils/test_xml_utils.py
+++ b/oozie-to-airflow/tests/utils/test_xml_utils.py
@@ -18,6 +18,7 @@
from xml.etree import ElementTree as ET
from utils import xml_utils
+from utils.xml_utils import bool_value
class TestELUtils(unittest.TestCase):
@@ -123,3 +124,8 @@ def test_find_nodes_by_attribute_found_none(self):
# nodes attrib is incorrect
self.assertEqual(0, len(found))
+
+ def test_bool_value(self):
+ self.assertTrue(bool_value(ET.Element("op", {"test_attrib": "true"}), "test_attrib"))
+ self.assertFalse(bool_value(ET.Element("op", {"test_attrib": "false"}), "test_attrib"))
+ self.assertFalse(bool_value(ET.Element("op"), "test_attrib"))
diff --git a/oozie-to-airflow/utils/el_utils.py b/oozie-to-airflow/utils/el_utils.py
index 8a8650294..c01bb6e20 100644
--- a/oozie-to-airflow/utils/el_utils.py
+++ b/oozie-to-airflow/utils/el_utils.py
@@ -17,6 +17,7 @@
import re
import logging
from typing import Dict
+from urllib.parse import urlparse
from o2a_libs import el_basic_functions
@@ -161,3 +162,8 @@ def parse_els(properties_file: str, prop_dict: Dict[str, str] = None):
else:
logging.warning(f"The job.properties file is missing: {properties_file}")
return prop_dict
+
+
+def normalize_path(url, params):
+ url_with_var = replace_el_with_var(url, params=params, quote=False)
+ return urlparse(url_with_var).path
diff --git a/oozie-to-airflow/utils/xml_utils.py b/oozie-to-airflow/utils/xml_utils.py
index c0adb9950..eb4291e1d 100644
--- a/oozie-to-airflow/utils/xml_utils.py
+++ b/oozie-to-airflow/utils/xml_utils.py
@@ -69,3 +69,8 @@ def find_nodes_by_attribute(root, attr, val, tag=None):
if attr in node.attrib and node.attrib[attr] == val:
matching_nodes.append(node)
return matching_nodes
+
+
+def bool_value(node, attr_name, default="false"):
+ value = node.attrib.get(attr_name, default)
+ return value is not None and value != "false"