Airflow Dependencies & Trigger Rules

Setting Dependencies between tasks in Airflow

There are several ways to set dependencies between tasks in Airflow.

The simplest is to use the bitshift operators << and >>:

task1 >> task2
task3 >> [task4, task5] >> task6


or to use the set_upstream and set_downstream methods:
task1.set_downstream(task2)  # equivalent to task1 >> task2
task2.set_upstream(task1)    # also equivalent to task1 >> task2


Running dag.test_cycle() (e.g. in a unit test) can be helpful to make sure that the DAG does not contain a cycle, as cycles are not allowed.
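As a minimal sketch, such a unit test could look like the one below. It assumes an Airflow version where DAG.test_cycle() is available (Airflow 1.10.x; later versions moved this check into a utility module) and a hypothetical module my_dag that defines the DAG:

from my_dag import dag  # hypothetical module containing the DAG


def test_dag_has_no_cycle():
    # test_cycle() raises AirflowDagCycleException if the DAG
    # contains a cycle, which fails the test.
    dag.test_cycle()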

In case you want to set multiple parallel dependencies, you can use the chain function, as the bitshift operator between two lists, like [t1, t2] >> [t3, t4], is not supported.

from airflow.models.baseoperator import chain  # airflow.utils.helpers in Airflow 1.10

# Lists of equal length are paired element-wise, so this sets
# task1 >> task2, task3 >> task4, and then task2 >> task5, task4 >> task5.
chain([task1, task3], [task2, task4], task5)


When you set dependencies between tasks, Airflow by default runs a task only when all of its parents have succeeded. However, you can define more complex behaviour by using trigger rules.


Trigger Rules

Trigger rules can be specified as an argument on operators; a short sketch follows the list below.


  • all_success: (default) all parents have succeeded
  • all_failed: all parents are in a failed or upstream_failed state
  • all_done: all parents are done with their execution
  • all_skipped: all parents are in a skipped state
  • one_failed: fires as soon as at least one parent has failed; it does not wait for all parents to be done
  • one_success: fires as soon as at least one parent succeeds; it does not wait for all parents to be done
  • none_failed: no parent has failed (failed or upstream_failed), i.e. all parents have succeeded or been skipped
  • none_failed_or_skipped: no parent has failed (failed or upstream_failed) and at least one parent has succeeded
  • none_skipped: no parent is in a skipped state, i.e. all parents are in a success, failed, or upstream_failed state
  • always: no dependencies at all; run this task at any time
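
As a sketch of how the argument is passed (assuming Airflow 2.x import paths; the cleanup task is just an illustration), a trigger rule can be given either via the TriggerRule constants or as a plain string:

from airflow.operators.dummy import DummyOperator
from airflow.utils.trigger_rule import TriggerRule

# Run cleanup once every parent is done, regardless of success or failure.
cleanup = DummyOperator(task_id='cleanup', trigger_rule=TriggerRule.ALL_DONE)

# Equivalently, as a plain string:
cleanup = DummyOperator(task_id='cleanup', trigger_rule='all_done')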

Example 

from datetime import datetime

from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.operators.python import PythonOperator


def update_source():
    pass  # placeholder for the real source-update logic


with DAG(dag_id='trigger_rule_example', start_date=datetime(2021, 1, 1), schedule_interval=None) as dag:
    start = DummyOperator(task_id='start')

    source_1 = PythonOperator(task_id='source1', python_callable=update_source)
    source_2 = PythonOperator(task_id='source2', python_callable=update_source)
    source_3 = PythonOperator(task_id='source3', python_callable=update_source)

    task1 = DummyOperator(task_id='one_success', trigger_rule='one_success')
    task2 = DummyOperator(task_id='none_failed', trigger_rule='none_failed')

    final = DummyOperator(task_id='final')

    start >> [source_1, source_2, source_3]
    [source_1, source_2, source_3] >> task1
    [source_1, source_2, source_3] >> task2
    [task1, task2] >> final


The above code is an example where the 'final' task is executed only when at least one of source1, source2, and source3 succeeds and none of them fails. Two intermediate tasks with different trigger rules are combined so that both conditions must hold: task1 (one_success) fires as soon as any source succeeds, task2 (none_failed) fires only if no source has failed, and final, with the default all_success rule, runs only after both of them have succeeded.
