Airflow Dependencies & Trigger Rules
Setting Dependencies between tasks in Airflow
There are different ways to set dependencies in Airflow.
The simplest way is to use the bitshift operators << and >>:
task1 >> task2
task3 >> [task4, task5] >> task6
or to use the set_upstream and set_downstream methods:
task1.set_downstream(task2)
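As a self-contained illustration (plain Python, not Airflow's actual implementation), the >> operator can be understood as syntactic sugar for set_downstream:

```python
class Task:
    """Toy stand-in for an Airflow operator (illustration only)."""

    def __init__(self, task_id):
        self.task_id = task_id
        self.downstream = []

    def set_downstream(self, other):
        self.downstream.append(other)

    def __rshift__(self, other):
        # task1 >> task2 is equivalent to task1.set_downstream(task2);
        # a list on the right-hand side fans out to several downstream tasks.
        for t in (other if isinstance(other, list) else [other]):
            self.set_downstream(t)
        return other


t1, t2, t3 = Task("t1"), Task("t2"), Task("t3")
t1 >> [t2, t3]  # t2 and t3 both depend on t1
```

Airflow's real operators implement the same idea (plus the reverse << operator and list-on-the-left support) on their base operator class.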
Running dag.test_cycle() (e.g. in a unit test) can be helpful to make sure the DAG does not contain a cycle, which is not allowed.
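What such a cycle check does can be sketched with a plain depth-first search over the dependency graph (an illustration of the idea, not Airflow's implementation):

```python
def find_cycle(graph):
    """Return True if the task graph (task_id -> list of downstream
    task_ids) contains a cycle, which Airflow forbids in a DAG."""
    WHITE, GRAY, BLACK = 0, 1, 2  # unvisited / on current path / finished
    color = {node: WHITE for node in graph}

    def visit(node):
        color[node] = GRAY
        for child in graph.get(node, []):
            if color.get(child, WHITE) == GRAY:   # back edge: cycle found
                return True
            if color.get(child, WHITE) == WHITE and visit(child):
                return True
        color[node] = BLACK
        return False

    return any(visit(n) for n in graph if color[n] == WHITE)
```

For example, find_cycle({"a": ["b"], "b": ["a"]}) detects the a -> b -> a loop, while a linear a -> b -> c graph passes the check.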
If you want to set multiple parallel dependencies, you can use the chain function, since the bitshift operator between two lists, like [t1, t2] >> [t3, t4], is not supported:
chain([task1, task3], [task2, task4], task5)
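To see which edges chain([task1, task3], [task2, task4], task5) creates, its pairing logic can be sketched in plain Python (a hypothetical helper, not Airflow's code):

```python
def chain_edges(*groups):
    """Collect the (upstream, downstream) pairs that chaining the given
    arguments would create between consecutive entries (sketch only)."""
    edges = []
    for up, down in zip(groups, groups[1:]):
        ups = up if isinstance(up, list) else [up]
        downs = down if isinstance(down, list) else [down]
        if isinstance(up, list) and isinstance(down, list):
            # two adjacent lists of equal length are paired element-wise
            assert len(ups) == len(downs)
            edges.extend(zip(ups, downs))
        else:
            # a single task fans out to / in from a list
            edges.extend((u, d) for u in ups for d in downs)
    return edges


edges = chain_edges(["task1", "task3"], ["task2", "task4"], "task5")
# pairs task1 -> task2 and task3 -> task4, then fans task2 and task4 into task5
```

This mirrors why [t1, t2] >> [t3, t4] is ambiguous as an operator but well-defined in chain: two adjacent lists are zipped element-wise rather than fully connected.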
When you set dependencies between tasks, Airflow by default runs a task only when all of its parents have succeeded. However, you can define more complex behavior using trigger rules.
Trigger Rules
Trigger rules are specified via the trigger_rule argument on operators.
- all_success: (default) all parents have succeeded
- all_failed: all parents are in a failed or upstream_failed state
- all_done: all parents are done with their execution
- all_skipped: all parents are in a skipped state
- one_failed: fires as soon as at least one parent has failed; it does not wait for all parents to be done
- one_success: fires as soon as at least one parent succeeds; it does not wait for all parents to be done
- none_failed: no parent has failed (failed or upstream_failed), i.e. all parents have succeeded or been skipped
- none_failed_or_skipped: no parent has failed (failed or upstream_failed) and at least one parent has succeeded
- none_skipped: no parent is in a skipped state, i.e. all parents are in a success, failed, or upstream_failed state
- always: no dependencies at all; run this task at any time
Example
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator

start = DummyOperator(task_id='start')
source_1 = PythonOperator(task_id='source1', python_callable=update_source)
source_2 = PythonOperator(task_id='source2', python_callable=update_source)
source_3 = PythonOperator(task_id='source3', python_callable=update_source)
task1 = DummyOperator(task_id='one_success', trigger_rule='one_success')
task2 = DummyOperator(task_id='none_failed', trigger_rule='none_failed')
final = DummyOperator(task_id='final')

start >> [source_1, source_2, source_3]
[source_1, source_2, source_3] >> task1
[source_1, source_2, source_3] >> task2
[task1, task2] >> final
In the code above, the final task is executed only when at least one of source1, source2, and source3 succeeds and none of them fails.
Two upstream operators with different trigger rules are combined to ensure that both conditions hold.
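The combined condition can be checked with a small evaluator over upstream task states (an illustration of the trigger-rule semantics, not Airflow's scheduler code):

```python
def rule_met(rule, states):
    """Evaluate a few trigger rules against the final states of upstream
    tasks ('success', 'failed', 'upstream_failed', 'skipped')."""
    failed = [s for s in states if s in ("failed", "upstream_failed")]
    if rule == "all_success":
        return all(s == "success" for s in states)
    if rule == "one_success":
        return "success" in states
    if rule == "none_failed":
        return not failed
    raise ValueError(f"unsupported rule: {rule}")


upstream = ["success", "skipped", "success"]
# 'final' runs: at least one source succeeded and none failed
runs = rule_met("one_success", upstream) and rule_met("none_failed", upstream)
```

If any source fails, the none_failed branch blocks final; if all sources are skipped, the one_success branch blocks it. Neither rule alone expresses both constraints, which is why two intermediate tasks are needed.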