Skip to content

Commit

Permalink
Remove end and start tasks (#68)
Browse files Browse the repository at this point in the history
  • Loading branch information
mik-laj authored Apr 11, 2019
1 parent 98f35ac commit 44c3ce0
Show file tree
Hide file tree
Showing 6 changed files with 108 additions and 7 deletions.
6 changes: 4 additions & 2 deletions oozie-to-airflow/converter/mappers.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,21 +25,23 @@
from mappers.base_mapper import BaseMapper
from mappers.decision_mapper import DecisionMapper
from mappers.dummy_mapper import DummyMapper
from mappers.end_mapper import EndMapper
from mappers.kill_mapper import KillMapper
from mappers.pig_mapper import PigMapper
from mappers.shell_mapper import ShellMapper
from mappers.spark_mapper import SparkMapper
from mappers.ssh_mapper import SSHMapper
from mappers.start_mapper import StartMapper
from mappers.subworkflow_mapper import SubworkflowMapper


CONTROL_MAP: Dict[str, Type[BaseMapper]] = {
"decision": DecisionMapper,
"end": DummyMapper,
"end": EndMapper,
"kill": KillMapper,
"fork": DummyMapper,
"join": DummyMapper,
"start": DummyMapper,
"start": StartMapper,
}

ACTION_MAP: Dict[str, Type[ActionMapper]] = {
Expand Down
7 changes: 5 additions & 2 deletions oozie-to-airflow/converter/parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,11 @@ def parse_workflow(self):
logging.debug(f"Parsing node: {node}")
self.parse_node(root, node)

self.create_relations()

for node in self.workflow.nodes.values():
node.mapper.on_parse_finish(self.workflow)

def create_relations(self) -> None:
"""
Given a dictionary of task_ids and ParsedNodes,
Expand Down Expand Up @@ -340,8 +345,6 @@ def update_trigger_rules(self) -> None:
node.update_trigger_rule()

def get_relations(self) -> Set[Relation]:
if not self.workflow.relations:
self.create_relations()
return self.workflow.relations

def get_dependencies(self) -> Set[str]:
Expand Down
7 changes: 7 additions & 0 deletions oozie-to-airflow/mappers/base_mapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,13 @@ def on_parse_node(self):
Called when processing a node.
"""

def on_parse_finish(self, workflow):
    """
    Hook invoked once every node of the workflow has been processed.

    The base implementation does nothing. Subclasses may override it to copy
    additional files or to perform further operations on ``workflow``.

    :param workflow: the workflow object that was just parsed.
    """

# pylint: disable=unused-argument,no-self-use
def copy_extra_assets(self, input_directory_path: str, output_directory_path: str) -> None:
"""
Expand Down
33 changes: 33 additions & 0 deletions oozie-to-airflow/mappers/end_mapper.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
# -*- coding: utf-8 -*-
# Copyright 2019 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Maps Oozie end node to Airflow's DAG"""
from typing import Set

from mappers.base_mapper import BaseMapper


class EndMapper(BaseMapper):
    """
    Mapper for the Oozie ``end`` control node.

    The node produces no Airflow code and needs no imports. Once the whole
    workflow has been parsed, every relation that points at this node is
    removed from the workflow.
    """

    @staticmethod
    def required_imports() -> Set[str]:
        """Return the import statements needed by the generated code (none)."""
        return set()

    def convert_to_text(self) -> str:
        """Return the generated code for this node, which is always empty."""
        return ""

    def on_parse_finish(self, workflow):
        """
        Drop all relations whose target is this end node.

        :param workflow: the parsed workflow; its ``relations`` set is updated.
        """
        # Forward the workflow itself to the parent hook. The original passed
        # ``self`` here, handing the mapper instance to a parameter that
        # expects the workflow.
        super().on_parse_finish(workflow)
        incoming = {
            relation for relation in workflow.relations if relation.to_task_id == self.name
        }
        workflow.relations -= incoming
33 changes: 33 additions & 0 deletions oozie-to-airflow/mappers/start_mapper.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
# -*- coding: utf-8 -*-
# Copyright 2019 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Maps Oozie start node to Airflow's DAG"""
from typing import Set

from mappers.base_mapper import BaseMapper


class StartMapper(BaseMapper):
    """
    Mapper for the Oozie ``start`` control node.

    The node produces no Airflow code and needs no imports. Once the whole
    workflow has been parsed, every relation that originates from this node
    is removed from the workflow.
    """

    @staticmethod
    def required_imports() -> Set[str]:
        """Return the import statements needed by the generated code (none)."""
        return set()

    def convert_to_text(self) -> str:
        """Return the generated code for this node, which is always empty."""
        return ""

    def on_parse_finish(self, workflow):
        """
        Drop all relations whose source is this start node.

        :param workflow: the parsed workflow; its ``relations`` set is updated.
        """
        # Forward the workflow itself to the parent hook. The original passed
        # ``self`` here, handing the mapper instance to a parameter that
        # expects the workflow.
        super().on_parse_finish(workflow)
        outgoing = {
            relation for relation in workflow.relations if relation.from_task_id == self.name
        }
        workflow.relations -= outgoing
29 changes: 26 additions & 3 deletions oozie-to-airflow/tests/converter/test_oozie_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ def test_parse_kill_node(self, on_parse_node_mock):

on_parse_node_mock.assert_called_once_with()

@mock.patch("mappers.dummy_mapper.DummyMapper.on_parse_node", wraps=None)
@mock.patch("mappers.end_mapper.EndMapper.on_parse_node", wraps=None)
def test_parse_end_node(self, on_parse_node_mock):
node_name = "end_name"
# language=XML
Expand Down Expand Up @@ -154,7 +154,7 @@ def test_parse_decision_node(self, on_parse_node_mock):

on_parse_node_mock.assert_called_once_with()

@mock.patch("mappers.dummy_mapper.DummyMapper.on_parse_node", wraps=None)
@mock.patch("mappers.start_mapper.StartMapper.on_parse_node", wraps=None)
@mock.patch("uuid.uuid4")
def test_parse_start_node(self, uuid_mock, on_parse_node_mock):
uuid_mock.return_value = "1234"
Expand Down Expand Up @@ -358,7 +358,9 @@ def test_update_trigger_rules(self):
self.assertFalse(fail.is_ok)
self.assertTrue(fail.is_error)

def test_parse_workflow(self): # pylint: disable=unused-argument
@mock.patch("uuid.uuid4", return_value="1234")
@mock.patch("mappers.base_mapper.BaseMapper.on_parse_finish", wraps=None)
def test_parse_workflow(self, _, on_parse_finish_mock):
filename = os.path.join(ROOT_DIR, "examples/demo/workflow.xml")
self.parser.workflow_file = filename
self.parser.parse_workflow()
Expand All @@ -367,3 +369,24 @@ def test_parse_workflow(self): # pylint: disable=unused-argument
self.assertIn("fork_node", self.parser.workflow.nodes)
self.assertIn("pig_node", self.parser.workflow.nodes)
self.assertIn("fail", self.parser.workflow.nodes)

self.assertEqual(
self.parser.workflow.relations,
{
Relation(from_task_id="cleanup_node", to_task_id="fail"),
Relation(from_task_id="cleanup_node", to_task_id="fork_node"),
Relation(from_task_id="decision_node", to_task_id="hdfs_node"),
Relation(from_task_id="fork_node", to_task_id="pig_node_prepare"),
Relation(from_task_id="fork_node", to_task_id="streaming_node"),
Relation(from_task_id="hdfs_node", to_task_id="fail"),
Relation(from_task_id="join_node", to_task_id="mr_node"),
Relation(from_task_id="mr_node", to_task_id="decision_node"),
Relation(from_task_id="mr_node", to_task_id="fail"),
Relation(from_task_id="pig_node", to_task_id="fail"),
Relation(from_task_id="pig_node", to_task_id="join_node"),
Relation(from_task_id="streaming_node", to_task_id="fail"),
Relation(from_task_id="streaming_node", to_task_id="join_node"),
},
)

on_parse_finish_mock.assert_called()

0 comments on commit 44c3ce0

Please sign in to comment.