Skip to content

Commit

Permalink
Add FS Mapper
Browse files Browse the repository at this point in the history
  • Loading branch information
Kamil Breguła committed Apr 8, 2019
1 parent 5dc3ace commit 67a3e05
Show file tree
Hide file tree
Showing 11 changed files with 530 additions and 17 deletions.
2 changes: 2 additions & 0 deletions oozie-to-airflow/converter/mappers.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from mappers.base_mapper import BaseMapper
from mappers.decision_mapper import DecisionMapper
from mappers.dummy_mapper import DummyMapper
from mappers.fs_mapper import FsMapper
from mappers.kill_mapper import KillMapper
from mappers.pig_mapper import PigMapper
from mappers.shell_mapper import ShellMapper
Expand All @@ -38,6 +39,7 @@
"ssh": SSHMapper,
"spark": SparkMapper,
"pig": PigMapper,
"fs": FsMapper,
"sub-workflow": SubworkflowMapper,
"shell": ShellMapper,
}
16 changes: 16 additions & 0 deletions oozie-to-airflow/examples/fs/configuration-template.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# Copyright 2018 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

dataproc_cluster=CLUSTER-NAME
gcp_region=europe-west3
16 changes: 16 additions & 0 deletions oozie-to-airflow/examples/fs/configuration.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# Copyright 2018 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

dataproc_cluster=cluster-o2a
gcp_region=europe-west3
24 changes: 24 additions & 0 deletions oozie-to-airflow/examples/fs/job.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

nameNode=hdfs://localhost:8020
resourceManager=localhost:8032
queueName=default
examplesRoot=examples

oozie.wf.application.path=${nameNode}/user/${user.name}/${examplesRoot}/apps/shell
44 changes: 44 additions & 0 deletions oozie-to-airflow/examples/fs/workflow.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
<!--
  Example workflow exercising every operation of the Oozie "fs" action:
  mkdir, delete, move, chmod, touchz, chgrp and setrep.
-->
<workflow-app xmlns="uri:oozie:workflow:1.0" name="fs-wf">
    <start to="fs-node"/>
    <action name="fs-node">
        <fs>
            <!-- mkdir -->
            <mkdir path='/home/pig/test-delete-1'/>
            <!-- delete -->
            <mkdir path='/home/pig/test-delete-1'/>
            <mkdir path='/home/pig/test-delete-2'/>
            <mkdir path='/home/pig/test-delete-3'/>
            <delete path='/home/pig/test-delete-1' skip-trash='true'/>
            <delete path='/home/pig/test-delete-2' skip-trash='false'/>
            <delete path='/home/pig/test-delete-3'/>
            <!-- move: the source is created first so the move has something to act on,
                 and the target does not collide with the directories of the chmod section -->
            <mkdir path='/home/pig/test-move-1'/>
            <move source='/home/pig/test-move-1' target='/home/pig/test-move-2' />

            <!-- chmod -->
            <mkdir path='/home/pig/test-chmod-1'/>
            <mkdir path='/home/pig/test-chmod-2'/>
            <mkdir path='/home/pig/test-chmod-3'/>
            <mkdir path='/home/pig/test-chmod-4'/>
            <chmod path='/home/pig/test-chmod-1' permissions='-rwxrw-rw-' dir-files='false' />
            <chmod path='/home/pig/test-chmod-2' permissions='-rwxrw-rw-' dir-files='true' />
            <chmod path='/home/pig/test-chmod-3' permissions='-rwxrw-rw-' />
            <chmod path='/home/pig/test-chmod-4' permissions='-rwxrw-rw-' dir-files='false' ><recursive/></chmod>

            <!-- touchz -->
            <touchz path='/home/pig/test-touchz-1' />
            <!-- chgrp -->
            <chgrp path='/home/pig/test-touchz-1' group='pig' />

            <!-- setrep -->
            <mkdir path='/home/pig/test-setrep-1'/>
            <setrep path='/home/pig/test-setrep-1' replication-factor='2'/>
        </fs>
        <ok to="end"/>
        <error to="fail"/>
    </action>
    <kill name="fail">
        <message>Fs workflow failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
    </kill>
    <end name="end"/>
</workflow-app>
31 changes: 31 additions & 0 deletions oozie-to-airflow/integration/run_fs_test_with_airflow.bash
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
#!/usr/bin/env bash
# Converts the example Oozie "fs" workflow (examples/fs) into an Airflow DAG
# written to output/fs_test. Any extra command-line arguments are forwarded
# unchanged to oozie_converter.py.
MY_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"
BASE_DIR="${MY_DIR}/.."

LOCAL_APPLICATION_DIR="${BASE_DIR}/examples/fs"

HADOOP_USER=pig
EXAMPLE_DIR=examples/fs
CLUSTER_MASTER=cluster-o2a-m
CLUSTER_NAME=cluster-o2a

COMPOSER_BUCKET=gs://europe-west1-o2a-integratio-f690ede2-bucket
COMPOSER_NAME=o2a-integration
COMPOSER_LOCATION=europe-west1

DAG_NAME=test_fs_dag

# The converter needs a local configuration.properties; abort with a hint when it is missing.
if [[ ! -f "${LOCAL_APPLICATION_DIR}/configuration.properties" ]]; then
    echo
    echo "Please copy ${LOCAL_APPLICATION_DIR}/configuration-template.properties to ${LOCAL_APPLICATION_DIR}/configuration.properties and update properties to match your case"
    echo
    exit 1
fi

# Every expansion is quoted (and "$@" is used) so that paths or arguments
# containing whitespace are passed through as single words.
python "${BASE_DIR}/oozie_converter.py" -i "${LOCAL_APPLICATION_DIR}" -o "${BASE_DIR}/output/fs_test" -u "${HADOOP_USER}" -d "${DAG_NAME}" "$@"

# Deployment to Cloud Composer is disabled for now. The steps below are kept for
# reference and point at the artifacts produced by this script.
#gsutil cp "${BASE_DIR}/scripts/prepare.sh" "${COMPOSER_BUCKET}/data/"
#gsutil cp "${BASE_DIR}"/output/fs_test/* "${COMPOSER_BUCKET}/dags/"
#
#gcloud composer environments run "${COMPOSER_NAME}" --location "${COMPOSER_LOCATION}" list_dags
#gcloud composer environments run "${COMPOSER_NAME}" --location "${COMPOSER_LOCATION}" trigger_dag -- "${DAG_NAME}"
186 changes: 186 additions & 0 deletions oozie-to-airflow/mappers/fs_mapper.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,186 @@
# Copyright 2018 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import shlex
from typing import Set, NamedTuple

from converter.parser import Relation
from mappers.action_mapper import ActionMapper
from utils.template_utils import render_template
from xml.etree.ElementTree import Element

# Name of the Oozie action type this module deals with.
ACTION_TYPE = "fs"

# Tag names of the operations that can appear inside an fs action; they are the
# keys of the FS_OPERATION_MAPPERS dispatch table below.
FS_OP_MKDIR = "mkdir"
FS_OP_DELETE = "delete"
FS_OP_MOVE = "move"
FS_OP_CHMOD = "chmod"
FS_OP_TOUCHZ = "touchz"
FS_OP_CHGRP = "chgrp"
FS_OP_SETREP = "setrep"

# Names read from the operation nodes. All of them are XML attributes except
# FS_TAG_RECURSIVE, which is looked up as a child element.
FS_TAG_PATH = "path"
FS_TAG_SOURCE = "source"
FS_TAG_TARGET = "target"
FS_TAG_RECURSIVE = "recursive"
FS_TAG_DIRFILES = "dir-files"
FS_TAG_SKIPTRASH = "skip-trash"
FS_TAG_PERMISSIONS = "permissions"
FS_TAG_GROUP = "group"
FS_TAG_REPLFAC = "replication-factor"


def bool_value(node, attr_name, default="false"):
    """Interpret an XML attribute of ``node`` as a boolean flag.

    :param node: element whose ``attrib`` mapping is inspected.
    :param attr_name: name of the attribute to read.
    :param default: value used when the attribute is absent; ``None`` means false.
    :return: ``False`` when the (defaulted) value is ``None`` or one of the false
        spellings ``"false"`` / ``"0"``; ``True`` for anything else.
    """
    value = node.attrib.get(attr_name, default)
    if value is None:
        return False
    # xs:boolean allows both "false" and "0"; surrounding whitespace and letter
    # case are ignored so that " False " is not silently read as true.
    return str(value).strip().lower() not in ("false", "0")


def prepare_mkdir_command(node: Element):
    """Build the ``fs -mkdir`` command for an Oozie ``<mkdir>`` operation.

    ``-p`` is passed so that missing parent directories are created and an
    already existing directory is not an error, which is how the Oozie mkdir
    operation behaves.

    :param node: the ``<mkdir>`` element; its ``path`` attribute is required.
    :return: the command string, with the path shell-quoted.
    :raises KeyError: when the ``path`` attribute is missing.
    """
    path = node.attrib[FS_TAG_PATH]
    command = "fs -mkdir -p {path}".format(path=shlex.quote(path))
    return command


def prepare_delete_command(node: Element):
    """Build the recursive ``fs -rm`` command for an Oozie ``<delete>`` operation.

    :param node: the ``<delete>`` element; ``path`` is required, ``skip-trash``
        is optional and defaults to false.
    :return: the command string, with the path shell-quoted.
    :raises KeyError: when the ``path`` attribute is missing.
    """
    path = node.attrib[FS_TAG_PATH]
    options = "-r"
    if bool_value(node, FS_TAG_SKIPTRASH):
        # Options have to precede the path: anything after the first path
        # argument is treated by the Hadoop shell as another path to remove.
        options += " -skipTrash"
    command = "fs -rm {options} {path}".format(options=options, path=shlex.quote(path))
    return command


def prepare_move_command(node: Element):
    """Build the ``fs -mv`` command for an Oozie ``<move>`` operation.

    :param node: the ``<move>`` element; ``source`` and ``target`` are required.
    :return: the command string, with both paths shell-quoted.
    :raises KeyError: when ``source`` or ``target`` is missing.
    """
    source = node.attrib[FS_TAG_SOURCE]
    target = node.attrib[FS_TAG_TARGET]

    # Quote both paths, as every other command builder in this module does, so
    # that whitespace or shell metacharacters cannot split or alter the command.
    command = "fs -mv {source} {target}".format(source=shlex.quote(source), target=shlex.quote(target))
    return command


def prepare_chmod_command(node: Element):
    """Build the ``fs -chmod`` command for an Oozie ``<chmod>`` operation.

    :param node: the ``<chmod>`` element; ``path`` and ``permissions`` are
        required, a ``<recursive/>`` child element turns on ``-R``.
    :return: the command string in the order ``[-R] MODE PATH``, with mode and
        path shell-quoted.
    :raises KeyError: when ``path`` or ``permissions`` is missing.
    """
    path = node.attrib[FS_TAG_PATH]
    permission = node.attrib[FS_TAG_PERMISSIONS]
    # TODO: Add support for dirFiles
    # dirFiles = bool_value(node, FS_TAG_DIRFILES)
    # NOTE(review): the permission string is passed through unchanged; confirm
    # that forms such as '-rwxrw-rw-' are accepted by the target shell.
    recursive = node.find(FS_TAG_RECURSIVE) is not None
    extra_param = "-R" if recursive else ""

    # The mode goes before the path, and shlex.quote is the only quoting applied:
    # wrapping its result in extra quote characters would undo the escaping.
    command = "fs -chmod {extra} {permission} {path}".format(
        extra=extra_param, path=shlex.quote(path), permission=shlex.quote(permission)
    )
    return command


def prepare_touchz_command(node: Element):
    """Build the ``fs -touchz`` command for an Oozie ``<touchz>`` operation.

    :param node: the ``<touchz>`` element; its ``path`` attribute is required.
    :return: the command string, with the path shell-quoted.
    :raises KeyError: when the ``path`` attribute is missing.
    """
    quoted_path = shlex.quote(node.attrib[FS_TAG_PATH])
    return "fs -touchz " + quoted_path


def prepare_chgrp_command(node: Element):
    """Build the ``fs -chgrp`` command for an Oozie ``<chgrp>`` operation.

    :param node: the ``<chgrp>`` element; ``path`` and ``group`` are required,
        a ``<recursive/>`` child element turns on ``-R``.
    :return: the command string in the order ``[-R] GROUP PATH``, with group and
        path shell-quoted.
    :raises KeyError: when ``path`` or ``group`` is missing.
    """
    path = node.attrib[FS_TAG_PATH]
    group = node.attrib[FS_TAG_GROUP]

    recursive = node.find(FS_TAG_RECURSIVE) is not None
    extra_param = "-R" if recursive else ""

    # The group goes before the path; in the opposite order the path would be
    # taken as the group name and the group as the path to change.
    command = "fs -chgrp {extra} {group} {path}".format(
        extra=extra_param, group=shlex.quote(group), path=shlex.quote(path)
    )
    return command


def prepare_setrep_command(node: Element):
    """Build the ``fs -setrep`` command for an Oozie ``<setrep>`` operation.

    :param node: the ``<setrep>`` element; ``path`` and ``replication-factor``
        are required.
    :return: the command string ``fs -setrep FACTOR PATH``, both shell-quoted.
    :raises KeyError: when either attribute is missing.
    """
    attributes = node.attrib
    # Read the path first so a node missing both attributes reports the path.
    target_path = attributes[FS_TAG_PATH]
    factor = attributes[FS_TAG_REPLFAC]
    return " ".join(["fs", "-setrep", shlex.quote(factor), shlex.quote(target_path)])


# Dispatch table: operation tag name -> function that turns the operation's
# XML node into the corresponding "fs ..." command string.
FS_OPERATION_MAPPERS = {
    FS_OP_MKDIR: prepare_mkdir_command,
    FS_OP_DELETE: prepare_delete_command,
    FS_OP_MOVE: prepare_move_command,
    FS_OP_CHMOD: prepare_chmod_command,
    FS_OP_TOUCHZ: prepare_touchz_command,
    FS_OP_CHGRP: prepare_chgrp_command,
    FS_OP_SETREP: prepare_setrep_command,
}


class BaseFsActionMapper:
    # NOTE(review): this class is not referenced anywhere in the visible part of
    # this module; confirm whether it is still needed.
    template_name = None


class SubOperator(NamedTuple):
    """One generated task: its identifier together with its rendered code."""

    # Identifier of the generated task.
    task_id: str
    # Text returned by render_template for this task.
    rendered_template: str


def chain(ops):
    """Link consecutive operators into a linear sequence.

    :param ops: ordered list of objects exposing a ``task_id`` attribute.
    :return: one Relation per adjacent pair, in order; empty when ``ops`` has
        fewer than two items.
    """
    relations = []
    for upstream, downstream in zip(ops, ops[1:]):
        relations.append(Relation(from_name=upstream.task_id, to_name=downstream.task_id))
    return relations


class FsMapper(ActionMapper):
    """Converts an Oozie ``fs`` action into a linear chain of generated tasks.

    Every child element of the action node becomes one sub-operator rendered
    from ``fs_op.tpl``; a node without children is rendered as a single task
    from ``dummy.tpl``.
    """

    def get_sub_operators(self):
        """Return the SubOperator tuples of this action, in document order."""
        if len(self.oozie_node) > 0:
            return [
                self.parse_fs_action(position, child)
                for position, child in enumerate(self.oozie_node)
            ]

        # Nothing to execute: emit one dummy task named after the action itself.
        placeholder = render_template("dummy.tpl", task_id=self.name, trigger_rule=self.trigger_rule)
        return [SubOperator(task_id=self.name, rendered_template=placeholder)]

    def convert_to_text(self):
        """Render ``fs.tpl`` with all sub-operators and the relations chaining them."""
        operators = self.get_sub_operators()
        context = {
            "task_id": self.name,
            "trigger_rule": self.trigger_rule,
            "sub_ops": operators,
            "relations": chain(operators),
        }
        return render_template(template_name="fs.tpl", **context)

    @staticmethod
    def required_imports() -> Set[str]:
        """Return the import statements the generated code relies on."""
        statements = (
            "from airflow.operators import dummy_operator",
            "from airflow.operators import bash_operator",
            "import shlex",
        )
        return set(statements)

    def get_first_task_id(self):
        """Return the task id of the first sub-operator."""
        operators = self.get_sub_operators()
        return operators[0].task_id

    def get_last_task_id(self):
        """Return the task id of the last sub-operator."""
        operators = self.get_sub_operators()
        return operators[-1].task_id

    def parse_fs_action(self, index: int, node: Element):
        """Turn one operation node into a SubOperator.

        :param index: position of the node among its siblings; used in the task id.
        :param node: operation element; its tag selects the command builder.
        :return: SubOperator holding the task id and the rendered ``fs_op.tpl``.
        :raises Exception: when the tag has no entry in FS_OPERATION_MAPPERS.
        """
        operation = node.tag
        build_command = FS_OPERATION_MAPPERS.get(operation)
        if not build_command:
            raise Exception("Unknown FS operation: {}".format(operation))

        task_id = "{}_fs_{}".format(self.name, index)
        rendered = render_template("fs_op.tpl", task_id=task_id, pig_command=build_command(node))
        return SubOperator(task_id=task_id, rendered_template=rendered)
6 changes: 6 additions & 0 deletions oozie-to-airflow/templates/fs.tpl
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{#- Emits every pre-rendered sub-operator, then the relations between them via relations.tpl. -#}
{% for op in sub_ops %}
{{ op.rendered_template }}
{% endfor %}
{% with relations=relations %}
{% include "relations.tpl" %}
{% endwith %}
4 changes: 4 additions & 0 deletions oozie-to-airflow/templates/fs_op.tpl
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
{#- Emits one BashOperator that submits pig_command as a Dataproc pig job.
    NOTE(review): the cluster name is hard-coded here rather than taken from configuration.
    NOTE(review): pig_command is pasted into a double-quoted Python literal, so a command
    containing a double quote or a backslash would alter the generated code; confirm. -#}
{{ task_id }} = bash_operator.BashOperator(
bash_command="gcloud dataproc jobs submit pig --cluster=cluster-o2a --execute {}".format(shlex.quote("{{ pig_command }}")),
task_id='{{ task_id }}',
)
Loading

0 comments on commit 67a3e05

Please sign in to comment.