How To Trigger A Dag On The Success Of Another Dag In Airflow Using Python?
I have a Python DAG, Parent Job, and another DAG, Child Job. The tasks in Child Job should be triggered on the successful completion of the Parent Job tasks, which run daily. How can a
Solution 1:
The answer is in this thread already. Below is demo code:
Parent dag:
from datetime import datetime

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2020, 4, 29),
}

dag = DAG('Parent_dag', default_args=default_args, schedule_interval='@daily')

leave_work = DummyOperator(
    task_id='leave_work',
    dag=dag,
)
cook_dinner = DummyOperator(
    task_id='cook_dinner',
    dag=dag,
)

leave_work >> cook_dinner
Child dag:
from datetime import datetime

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
# Airflow 1.10.x import path (in Airflow 2.x: airflow.sensors.external_task)
from airflow.sensors.external_task_sensor import ExternalTaskSensor

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2020, 4, 29),
}

dag = DAG('Child_dag', default_args=default_args, schedule_interval='@daily')

# Use ExternalTaskSensor to listen to the Parent_dag and cook_dinner task;
# when cook_dinner is finished, Child_dag will be triggered
wait_for_dinner = ExternalTaskSensor(
    task_id='wait_for_dinner',
    external_dag_id='Parent_dag',
    external_task_id='cook_dinner',
    start_date=datetime(2020, 4, 29),
    # Both DAGs run @daily from the same start date, so their execution dates
    # match and no execution_delta is needed. Pass execution_delta=timedelta(...)
    # only when Parent_dag's schedule is offset from this one.
    timeout=3600,  # give up after one hour of waiting
    dag=dag,
)
have_dinner = DummyOperator(
    task_id='have_dinner',
    dag=dag,
)
play_with_food = DummyOperator(
    task_id='play_with_food',
    dag=dag,
)

wait_for_dinner >> have_dinner
wait_for_dinner >> play_with_food
[Images: Dags, Parent_dag, Child_dag]
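To make the sensor's date matching concrete, here is a minimal pure-Python sketch of the arithmetic (no Airflow required). The helper name parent_execution_date is hypothetical; it only mirrors the rule that ExternalTaskSensor looks for the parent run whose execution date equals the child's execution date minus execution_delta.

```python
from datetime import datetime, timedelta
from typing import Optional


def parent_execution_date(child_execution_date: datetime,
                          execution_delta: Optional[timedelta] = None) -> datetime:
    """Hypothetical helper: the parent run date the sensor would look for."""
    if execution_delta is None:
        # No delta: the sensor waits for the parent run with the same date.
        return child_execution_date
    return child_execution_date - execution_delta


child_run = datetime(2020, 4, 29)

# Parent and child on the same @daily schedule: dates line up exactly.
same_schedule = parent_execution_date(child_run)

# A one-hour delta makes the sensor look for a parent run at 23:00 the day
# before, which only exists if the parent is scheduled one hour earlier.
offset = parent_execution_date(child_run, timedelta(hours=1))

print(same_schedule)
print(offset)
```

If the computed date does not correspond to an actual Parent_dag run, the sensor keeps poking until its timeout expires, so the delta should match the real offset between the two schedules.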
Solution 2:
As requested by @pankaj, I'm adding a snippet depicting reactive triggering using TriggerDagRunOperator (as opposed to the poll-based triggering of ExternalTaskSensor).
from typing import List
from airflow.models.baseoperator import BaseOperator
from airflow.models.dag import DAG
from airflow.operators.dagrun_operator import TriggerDagRunOperator
from airflow.utils.trigger_rule import TriggerRule
# DAG object
my_dag: DAG = DAG(dag_id='my_dag',
start_date=..)
..
# a list of 'tail' tasks: tasks that have no downstream tasks
tail_tasks_of_first_dag: List[BaseOperator] = my_magic_function_that_determines_all_tail_tasks(..)
..
# our trigger task
my_trigger_task: TriggerDagRunOperator = TriggerDagRunOperator(dag=my_dag,
                                                               task_id='my_trigger_task',
                                                               trigger_rule=TriggerRule.ALL_SUCCESS,
                                                               # the DAG to trigger is passed as trigger_dag_id
                                                               trigger_dag_id='id_of_dag_to_be_triggered')
# our trigger task should run when all 'tail' tasks have completed / succeeded
tail_tasks_of_first_dag >> my_trigger_task
Note that this snippet is for reference purposes only; it has NOT been tested.
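The snippet leaves my_magic_function_that_determines_all_tail_tasks undefined. One possible way to fill it in, sketched here under assumptions, is to keep every task whose downstream_list is empty. The FakeTask class and the find_tail_tasks name are hypothetical stand-ins so the example runs without Airflow; real operators expose the same downstream_list attribute.

```python
from typing import Iterable, List


class FakeTask:
    """Hypothetical stand-in for an Airflow operator (only what is used below)."""

    def __init__(self, task_id: str) -> None:
        self.task_id = task_id
        self.downstream_list: List["FakeTask"] = []

    def __rshift__(self, other: "FakeTask") -> "FakeTask":
        # Mimic `a >> b`: register b as downstream of a and allow chaining.
        self.downstream_list.append(other)
        return other


def find_tail_tasks(tasks: Iterable) -> list:
    """Return the tasks that have no downstream tasks."""
    return [task for task in tasks if not task.downstream_list]


extract, transform, load, report = (
    FakeTask(name) for name in ("extract", "transform", "load", "report")
)
extract >> transform >> load
transform >> report

tails = find_tail_tasks([extract, transform, load, report])
tail_ids = [task.task_id for task in tails]
print(tail_ids)
```

With a real DAG, the same function could be called as find_tail_tasks(my_dag.tasks), and the resulting list placed upstream of the trigger task.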
Solution 3:
I believe you are looking for the SubDagOperator, which runs a DAG inside a bigger DAG. Note that creating many subdags as in the example below gets messy pretty quickly, so I recommend putting each subdag in its own file and importing them in a main file.
The SubDagOperator is simple to use: you need to give it a task ID, a subdag (the child), and a dag (the parent).
subdag_2 = SubDagOperator(
    task_id="just_some_id",
    subdag=child_subdag,  # this must be a DAG
    dag=parent_dag,  # this must be a DAG
)
From the Airflow examples repo:
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.subdag_operator import SubDagOperator
from airflow.utils.dates import days_ago


def subdag(parent_dag_name, child_dag_name, args):
    # Build the child DAG; its dag_id is '<parent dag_id>.<child name>'
    dag_subdag = DAG(
        dag_id='%s.%s' % (parent_dag_name, child_dag_name),
        default_args=args,
        schedule_interval="@daily",
    )

    for i in range(5):
        DummyOperator(
            task_id='%s-task-%s' % (child_dag_name, i + 1),
            default_args=args,
            dag=dag_subdag,
        )

    return dag_subdag


DAG_NAME = 'example_subdag_operator'

args = {
    'owner': 'airflow',
    'start_date': days_ago(2),
}

dag = DAG(
    dag_id=DAG_NAME,
    default_args=args,
    schedule_interval="@once",
    tags=['example']
)

start = DummyOperator(
    task_id='start-of-main-job',
    dag=dag,
)
some_other_task = DummyOperator(
    task_id='some-other-task',
    dag=dag,
)
end = DummyOperator(
    task_id='end-of-main-job',
    dag=dag,
)
subdag = SubDagOperator(
    task_id='run-this-dag-after-previous-steps',
    subdag=subdag(DAG_NAME, 'run-this-dag-after-previous-steps', args),
    dag=dag,
)

start >> some_other_task >> end >> subdag
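The example builds the child's dag_id as '%s.%s' because SubDagOperator expects it to have the form parent dag_id, a dot, then the operator's task_id, and raises an error otherwise. A minimal sketch of that naming check in plain Python follows; the helper names expected_subdag_id and is_valid_subdag_id are hypothetical and not part of Airflow.

```python
def expected_subdag_id(parent_dag_id: str, task_id: str) -> str:
    """Hypothetical helper: the dag_id a subdag should carry."""
    return f"{parent_dag_id}.{task_id}"


def is_valid_subdag_id(parent_dag_id: str, task_id: str, subdag_dag_id: str) -> bool:
    """Check that a subdag's dag_id follows the '<parent>.<task_id>' form."""
    return subdag_dag_id == expected_subdag_id(parent_dag_id, task_id)


parent_id = "example_subdag_operator"
operator_task_id = "run-this-dag-after-previous-steps"

good_id = expected_subdag_id(parent_id, operator_task_id)
print(good_id)
print(is_valid_subdag_id(parent_id, operator_task_id, good_id))
print(is_valid_subdag_id(parent_id, operator_task_id, "some_other_name"))
```

This is why the same string is passed both as the SubDagOperator's task_id and as the child_dag_name argument of the subdag factory function.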