Skip to content

Commit

Permalink
[AIRFLOW-5644] Simplify TriggerDagRunOperator usage (#6317)
Browse files Browse the repository at this point in the history
* [AIRFLOW-5644] Simplify TriggerDagRunOperator usage

* Call timezone.parse in execute() to allow for templating
  • Loading branch information
BasPH authored and Fokko committed Oct 23, 2019
1 parent 74d2a0d commit f3c3812
Show file tree
Hide file tree
Showing 7 changed files with 206 additions and 202 deletions.
5 changes: 5 additions & 0 deletions UPDATING.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,11 @@ assists users migrating to a new version.

## Airflow Master

### Simplification of the TriggerDagRunOperator

The TriggerDagRunOperator now takes a `conf` argument that accepts a dict, which is passed as the conf of the triggered DagRun.
As a result, the `python_callable` argument was removed. PR: https://github.com/apache/airflow/pull/6317.

### Changes in Google Cloud Platform related hooks

The change in GCP operators implies that GCP Hooks for those operators now require keyword parameters rather
Expand Down
54 changes: 11 additions & 43 deletions airflow/example_dags/example_trigger_controller_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,58 +16,26 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""This example illustrates the use of the TriggerDagRunOperator. There are 2
entities at work in this scenario:
1. The Controller DAG - the DAG that conditionally executes the trigger
2. The Target DAG - DAG being triggered (in example_trigger_target_dag.py)

This example illustrates the following features :
1. A TriggerDagRunOperator that takes:
a. A python callable that decides whether or not to trigger the Target DAG
b. An optional params dict passed to the python callable to help in
evaluating whether or not to trigger the Target DAG
c. The id (name) of the Target DAG
d. The python callable can add contextual info to the DagRun created by
way of adding a Pickleable payload (e.g. dictionary of primitives). This
state is then made available to the TargetDag
2. A Target DAG : c.f. example_trigger_target_dag.py
"""
Example usage of the TriggerDagRunOperator. This example holds 2 DAGs:
1. 1st DAG (example_trigger_controller_dag) holds a TriggerDagRunOperator, which will trigger the 2nd DAG
2. 2nd DAG (example_trigger_target_dag) which will be triggered by the TriggerDagRunOperator in the 1st DAG
"""

import pprint

import airflow
import airflow.utils.dates
from airflow import DAG
from airflow.operators.dagrun_operator import TriggerDagRunOperator

pp = pprint.PrettyPrinter(indent=4)


def conditionally_trigger(context, dag_run_obj):
    """Decide whether or not to trigger the remote DAG.

    :param context: task context; ``context['params']`` must contain
        ``condition_param`` (truthy to trigger) and ``message``
    :type context: dict
    :param dag_run_obj: placeholder object whose ``payload`` attribute is set
        when the condition holds
    :return: ``dag_run_obj`` with its payload filled in, or ``None`` when the
        condition is not met
    """
    params = context['params']
    should_trigger = params['condition_param']
    print("Controller DAG : conditionally_trigger = {}".format(should_trigger))
    if not should_trigger:
        return None

    dag_run_obj.payload = {'message': params['message']}
    # Same output as a PrettyPrinter(indent=4), without relying on a module-level instance.
    pprint.pprint(dag_run_obj.payload, indent=4)
    return dag_run_obj


# Define the DAG
dag = DAG(
dag_id='example_trigger_controller_dag',
default_args={
"owner": "airflow",
"start_date": airflow.utils.dates.days_ago(2),
},
schedule_interval='@once',
dag_id="example_trigger_controller_dag",
default_args={"owner": "airflow", "start_date": airflow.utils.dates.days_ago(2)},
schedule_interval="@once",
)

# Define the single task in this controller example DAG
trigger = TriggerDagRunOperator(
task_id='test_trigger_dagrun',
trigger_dag_id="example_trigger_target_dag",
python_callable=conditionally_trigger,
params={'condition_param': True, 'message': 'Hello World'},
task_id="test_trigger_dagrun",
trigger_dag_id="example_trigger_target_dag", # Ensure this equals the dag_id of the DAG to trigger
conf={"message": "Hello World"},
dag=dag,
)
52 changes: 12 additions & 40 deletions airflow/example_dags/example_trigger_target_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,65 +18,37 @@
# under the License.

"""
This example illustrates the use of the TriggerDagRunOperator. There are 2
entities at work in this scenario:
1. The Controller DAG - the DAG that conditionally executes the trigger
(in example_trigger_controller_dag.py)
2. The Target DAG - DAG being triggered
This example illustrates the following features :
1. A TriggerDagRunOperator that takes:
a. A python callable that decides whether or not to trigger the Target DAG
b. An optional params dict passed to the python callable to help in
evaluating whether or not to trigger the Target DAG
c. The id (name) of the Target DAG
d. The python callable can add contextual info to the DagRun created by
way of adding a Pickleable payload (e.g. dictionary of primitives). This
state is then made available to the TargetDag
2. A Target DAG : c.f. example_trigger_target_dag.py
Example usage of the TriggerDagRunOperator. This example holds 2 DAGs:
1. 1st DAG (example_trigger_controller_dag) holds a TriggerDagRunOperator, which will trigger the 2nd DAG
2. 2nd DAG (example_trigger_target_dag) which will be triggered by the TriggerDagRunOperator in the 1st DAG
"""

import pprint

import airflow
import airflow.utils.dates
from airflow.models import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator

pp = pprint.PrettyPrinter(indent=4)

args = {
'start_date': airflow.utils.dates.days_ago(2),
'owner': 'Airflow',
}

dag = DAG(
dag_id='example_trigger_target_dag',
default_args=args,
dag_id="example_trigger_target_dag",
default_args={"start_date": airflow.utils.dates.days_ago(2), "owner": "Airflow"},
schedule_interval=None,
)


def run_this_func(**kwargs):
def run_this_func(**context):
"""
Print the payload "message" passed to the DagRun conf attribute.
:param dict kwargs: Context
:param context: The execution context
:type context: dict
"""
print("Remotely received value of {} for key=message".
format(kwargs['dag_run'].conf['message']))
print("Remotely received value of {} for key=message".format(context["dag_run"].conf["message"]))


run_this = PythonOperator(
task_id='run_this',
python_callable=run_this_func,
dag=dag,
)
run_this = PythonOperator(task_id="run_this", python_callable=run_this_func, dag=dag)

# You can also access the DagRun object in templates
bash_task = BashOperator(
task_id="bash_task",
bash_command='echo "Here is the message: '
'{{ dag_run.conf["message"] if dag_run else "" }}" ',
bash_command='echo "Here is the message: \'{{ dag_run.conf["message"] if dag_run else "" }}\'"',
dag=dag,
)
85 changes: 34 additions & 51 deletions airflow/operators/dagrun_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,81 +18,64 @@
# under the License.

import datetime
import json
from typing import Callable, Dict, Optional, Union
from typing import Dict, Optional, Union

from airflow.api.common.experimental.trigger_dag import trigger_dag
from airflow.models import BaseOperator
from airflow.utils import timezone
from airflow.utils.decorators import apply_defaults


class DagRunOrder:
    """Placeholder describing a DAG run that is about to be triggered.

    :param run_id: identifier to use for the DAG run
    :param payload: data handed to the triggered DAG run as its conf
    """

    def __init__(self, run_id=None, payload=None):
        self.run_id = run_id
        self.payload = payload

    def __repr__(self):
        return "{}(run_id={!r}, payload={!r})".format(
            type(self).__name__, self.run_id, self.payload
        )


class TriggerDagRunOperator(BaseOperator):
"""
Triggers a DAG run for a specified ``dag_id``
:param trigger_dag_id: the dag_id to trigger (templated)
:type trigger_dag_id: str
:param python_callable: a reference to a python function that will be
called while passing it the ``context`` object and a placeholder
object ``obj`` for your callable to fill and return if you want
a DagRun created. This ``obj`` object contains a ``run_id`` and
``payload`` attribute that you can modify in your function.
The ``run_id`` should be a unique identifier for that DAG run, and
the payload has to be a picklable object that will be made available
to your tasks while executing that DAG run. Your function header
should look like ``def foo(context, dag_run_obj):``
:type python_callable: python callable
:param conf: Configuration for the DAG run
:type conf: dict
:param execution_date: Execution date for the dag (templated)
:type execution_date: str or datetime.datetime
"""
template_fields = ('trigger_dag_id', 'execution_date')
ui_color = '#ffefeb'

template_fields = ("trigger_dag_id", "execution_date", "conf")
ui_color = "#ffefeb"

@apply_defaults
def __init__(
self,
trigger_dag_id: str,
python_callable: Optional[Callable[[Dict, DagRunOrder], DagRunOrder]] = None,
execution_date: Optional[Union[str, datetime.datetime]] = None,
*args, **kwargs) -> None:
self,
trigger_dag_id: str,
conf: Optional[Dict] = None,
execution_date: Optional[Union[str, datetime.datetime]] = None,
*args,
**kwargs
) -> None:
super().__init__(*args, **kwargs)
self.python_callable = python_callable
self.trigger_dag_id = trigger_dag_id
self.conf = conf

self.execution_date = None # type: Optional[Union[str, datetime.datetime]]
if isinstance(execution_date, datetime.datetime):
self.execution_date = execution_date.isoformat()
elif isinstance(execution_date, str):
if execution_date is None or isinstance(execution_date, (str, datetime.datetime)):
self.execution_date = execution_date
elif execution_date is None:
self.execution_date = None
else:
raise TypeError(
'Expected str or datetime.datetime type '
'for execution_date. Got {}'.format(
type(execution_date)))
"Expected str or datetime.datetime type for execution_date. "
"Got {}".format(type(execution_date))
)

def execute(self, context):
if self.execution_date is not None:
run_id = 'trig__{}'.format(self.execution_date)
self.execution_date = timezone.parse(self.execution_date)
def execute(self, context: Dict):
if isinstance(self.execution_date, datetime.datetime):
run_id = "trig__{}".format(self.execution_date.isoformat())
elif isinstance(self.execution_date, str):
run_id = "trig__{}".format(self.execution_date)
self.execution_date = timezone.parse(self.execution_date) # trigger_dag() expects datetime
else:
run_id = 'trig__' + timezone.utcnow().isoformat()
dro = DagRunOrder(run_id=run_id)
if self.python_callable is not None:
dro = self.python_callable(context, dro)
if dro:
trigger_dag(dag_id=self.trigger_dag_id,
run_id=dro.run_id,
conf=json.dumps(dro.payload),
execution_date=self.execution_date,
replace_microseconds=False)
else:
self.log.info("Criteria not met, moving on")
run_id = "trig__{}".format(timezone.utcnow().isoformat())

# Ignore MyPy type for self.execution_date because it doesn't pick up the timezone.parse() for strings
trigger_dag( # type: ignore
dag_id=self.trigger_dag_id,
run_id=run_id,
conf=self.conf,
execution_date=self.execution_date,
replace_microseconds=False,
)
2 changes: 1 addition & 1 deletion airflow/utils/dates.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ def date_range(start_date, end_date=None, num=None, delta=None):
delta = abs(delta)
dates = []
if end_date:
if timezone.is_naive(start_date):
if timezone.is_naive(start_date) and not timezone.is_naive(end_date):
end_date = timezone.make_naive(end_date, tz)
while start_date <= end_date:
if timezone.is_naive(start_date):
Expand Down
67 changes: 0 additions & 67 deletions tests/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@
)
from airflow.operators.bash_operator import BashOperator
from airflow.operators.check_operator import CheckOperator, ValueCheckOperator
from airflow.operators.dagrun_operator import TriggerDagRunOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from airflow.settings import Session
Expand Down Expand Up @@ -528,18 +527,6 @@ def check_failure(context, test_case=self):
start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
self.assertTrue(data['called'])

# Smoke test: run a TriggerDagRunOperator whose python_callable always hands
# back the order object it receives.
def test_trigger_dagrun(self):
# Callable ignores the context argument and unconditionally returns the order object.
def trigga(_, obj):
if True:
return obj

t = TriggerDagRunOperator(
task_id='test_trigger_dagrun',
trigger_dag_id='example_bash_operator',
python_callable=trigga,
dag=self.dag)
# No assertions follow: the test only requires that run() completes without raising.
t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)

def test_dryrun(self):
t = BashOperator(
task_id='test_dryrun',
Expand Down Expand Up @@ -985,60 +972,6 @@ def test_run_command(self):
self.assertEqual(run_command('echo "foo bar"'), 'foo bar\n')
self.assertRaises(AirflowConfigException, run_command, 'bash -c "exit 1"')

# Trigger with a datetime execution_date and check that exactly one DagRun
# with the expected run_id is found and that it carries that execution date.
def test_trigger_dagrun_with_execution_date(self):
utc_now = timezone.utcnow()
run_id = 'trig__' + utc_now.isoformat()

# Callable overwrites run_id on the order object so the DagRun can be looked up below.
def payload_generator(context, object): # pylint: disable=unused-argument
object.run_id = run_id
return object

task = TriggerDagRunOperator(task_id='test_trigger_dagrun_with_execution_date',
trigger_dag_id='example_bash_operator',
python_callable=payload_generator,
execution_date=utc_now,
dag=self.dag)
task.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
dag_runs = DagRun.find(dag_id='example_bash_operator', run_id=run_id)
self.assertEqual(len(dag_runs), 1)
dag_run = dag_runs[0]
# The stored execution date is compared against the original datetime object.
self.assertEqual(dag_run.execution_date, utc_now)

# Same scenario as the datetime variant, but execution_date is passed as an
# ISO-format string instead of a datetime object.
def test_trigger_dagrun_with_str_execution_date(self):
utc_now_str = timezone.utcnow().isoformat()
self.assertIsInstance(utc_now_str, (str,))
run_id = 'trig__' + utc_now_str

# Callable overwrites run_id on the order object so the DagRun can be looked up below.
def payload_generator(context, object): # pylint: disable=unused-argument
object.run_id = run_id
return object

task = TriggerDagRunOperator(
task_id='test_trigger_dagrun_with_str_execution_date',
trigger_dag_id='example_bash_operator',
python_callable=payload_generator,
execution_date=utc_now_str,
dag=self.dag)
task.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
dag_runs = DagRun.find(dag_id='example_bash_operator', run_id=run_id)
self.assertEqual(len(dag_runs), 1)
dag_run = dag_runs[0]
# Compared in ISO string form, i.e. the stored date must format back to the input string.
self.assertEqual(dag_run.execution_date.isoformat(), utc_now_str)

# Check that a templated execution_date is kept as the raw template string on
# the operator and parses to DEFAULT_DATE after the templates are rendered.
def test_trigger_dagrun_with_templated_execution_date(self):
# NOTE(review): task_id repeats the one used by the str execution_date test — looks like copy-paste.
task = TriggerDagRunOperator(
task_id='test_trigger_dagrun_with_str_execution_date',
trigger_dag_id='example_bash_operator',
execution_date='{{ execution_date }}',
dag=self.dag)

# Before rendering, the attribute still holds the unrendered template text.
self.assertTrue(isinstance(task.execution_date, str))
self.assertEqual(task.execution_date, '{{ execution_date }}')

ti = TaskInstance(task=task, execution_date=DEFAULT_DATE)
ti.render_templates()
# After rendering, the string must parse to the task instance's execution date.
self.assertEqual(timezone.parse(task.execution_date), DEFAULT_DATE)

def test_externally_triggered_dagrun(self):
TI = TaskInstance

Expand Down
Loading

0 comments on commit f3c3812

Please sign in to comment.