How to Skip Tasks in Airflow DAGs
Recently, I was attempting to add a new task to an existing Airflow DAG that would only run on specific days of the week. However, I was surprised to find that skipping tasks in Airflow isn't as straightforward as I anticipated.
In this article, I will demonstrate how to skip tasks in Airflow DAGs, specifically focusing on the use of `AirflowSkipException` when working with `PythonOperator` or operators that inherit from built-in operators (such as `TriggerDagRunOperator`). Lastly, I will discuss the use of `BranchPythonOperator` and `ShortCircuitOperator` and how they can potentially be used to decide when a task needs to be skipped.
Now let's assume we have an Airflow DAG consisting of three tasks:

`PythonOperator` tasks – Source: Author

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator


with DAG(
    dag_id='test_dag',
    start_date=datetime(2021, 1, 1),
    catchup=False,
    tags=['example'],
) as dag:
    first_task = PythonOperator(task_id='task_a', python_callable=lambda: print('Hi from task_a'))
    second_task = PythonOperator(task_id='task_b', python_callable=lambda: print('Hi from task_b'))
    third_task = PythonOperator(task_id='task_c', python_callable=lambda: print('Hi from task_c'))

    first_task >> second_task >> third_task
```
Skipping PythonOperator tasks
The most intuitive way to skip tasks created via `PythonOperator` is to raise [`AirflowSkipException`](https://airflow.apache.org/docs/apache-airflow/stable/_api/airflow/exceptions/index.html#airflow.exceptions.AirflowSkipException). This means the `python_callable` function executed by the `PythonOperator` needs to implement the logic that decides when to raise the exception.
Let's return to the example DAG we previously discussed and consider a scenario where `task_b` must only run on Mondays.
```python
from datetime import datetime

from airflow import DAG
from airflow.exceptions import AirflowSkipException
from airflow.operators.python import PythonOperator


def my_func(**context):
    # If the DagRun start date is not a Monday, then skip this task
    dag_run_start_date = context['dag_run'].start_date
    if dag_run_start_date.weekday() != 0:
        raise AirflowSkipException

    # Anything beyond this line will be executed only if the
    # task is not skipped, based on the condition specified above
    print('Hi from task_b')


with DAG(
    dag_id='test_dag',
    start_date=datetime(2021, 1, 1),
    catchup=False,
    tags=['example'],
) as dag:
    first_task = PythonOperator(task_id='task_a', python_callable=lambda: print('Hi from task_a'))
    second_task = PythonOperator(task_id='task_b', python_callable=my_func)
    third_task = PythonOperator(
        task_id='task_c',
        python_callable=lambda: print('Hi from task_c'),
        trigger_rule='none_failed',
    )

    first_task >> second_task >> third_task
```
If the DagRun's start date is not a Monday, then `task_b` will be skipped and appear in pink (the colour that denotes skipped tasks, as per the legend on the UI).

`none_failed` – Source: Author

It's important to pay attention to the `'none_failed'` value provided to the `trigger_rule` keyword argument of `third_task`. If we omit this configuration, then `task_c` will also be skipped whenever `task_b` is skipped.

`all_success` value – Source: Author

By default, `trigger_rule` is set to `all_success`, which means that a task will be executed only if all of its upstream tasks succeeded (and none were skipped).
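For reference, here is a minimal sketch of how a trigger rule is attached to a task. The task itself is purely illustrative (its id and callable are made up), but the rule names in the comments are standard Airflow values:

```python
from airflow.operators.python import PythonOperator

# trigger_rule controls when a task runs, relative to the state of its
# upstream tasks. Commonly used values include:
#   'all_success' (default) - all upstream tasks succeeded
#   'none_failed'           - no upstream task failed (skipped is acceptable)
#   'one_success'           - at least one upstream task succeeded
#   'all_done'              - all upstream tasks finished, whatever their state
report_task = PythonOperator(
    task_id='report',  # hypothetical task, for illustration only
    python_callable=lambda: print('reporting'),
    trigger_rule='all_done',
)
```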
If the DagRun's start date is a Monday, then `task_b` will be executed:

Skipping built-in Operator tasks
Now let's assume we have another DAG consisting of three tasks, including a `TriggerDagRunOperator` that is used to trigger another DAG.
```python
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator


with DAG(
    dag_id='test_dag',
    start_date=datetime(2021, 1, 1),
    catchup=False,
    tags=['example'],
) as dag:
    first_task = PythonOperator(
        task_id='task_a',
        python_callable=lambda: print('Hi from task_a'),
    )
    trigger_task = TriggerDagRunOperator(
        task_id='trigger_other_dag',
        trigger_dag_id='example_branch_operator',
    )
    last_task = PythonOperator(
        task_id='task_c',
        python_callable=lambda: print('Hi from task_c'),
        trigger_rule='none_failed',
    )

    first_task >> trigger_task >> last_task
```

Now things are a bit more complicated if you are looking into skipping tasks created using built-in operators (or even custom ones that inherit from built-in operators). There are essentially a few different options we can consider. In this section, I'm going to walk through all of them, and it's up to you to pick the one that best suits your needs.
The first option we have is `BranchPythonOperator`, which is used to create branching logic so that the DAG can take a certain direction based on some conditional logic. Once again, let's assume that `trigger_other_dag` – which essentially uses a `TriggerDagRunOperator` to trigger another Airflow DAG – needs to be executed only on Mondays.
We can choose when to skip a task using a `BranchPythonOperator` with two branches and a callable that implements the underlying branching logic.
```python
def choose_branch(**context):
    dag_run_start_date = context['dag_run'].start_date
    if dag_run_start_date.weekday() != 0:  # not a Monday
        return 'skip'
    return 'trigger_other_dag'
```
Now let's go ahead and create the Airflow tasks.
```python
from datetime import datetime

from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.operators.python import BranchPythonOperator, PythonOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator


with DAG(
    dag_id='test_dag',
    start_date=datetime(2021, 1, 1),
    catchup=False,
    tags=['example'],
) as dag:
    branch_task = BranchPythonOperator(
        task_id='branching',
        python_callable=choose_branch,
    )
    first_task = PythonOperator(
        task_id='task_a',
        python_callable=lambda: print('Hi from task_a'),
    )
    trigger_task = TriggerDagRunOperator(
        task_id='trigger_other_dag',
        trigger_dag_id='example_branch_operator',
    )
    last_task = PythonOperator(
        task_id='task_c',
        python_callable=lambda: print('Hi from task_c'),
        trigger_rule='none_failed',
    )
    dummy_task = DummyOperator(task_id='skip')

    first_task >> branch_task >> [trigger_task, dummy_task] >> last_task
```

`BranchPythonOperator` to decide when to skip a task – Source: Author

Note that branch operators cannot have empty paths, and thus we had to create a dummy task using `DummyOperator` that corresponds to skipping the trigger task. Whenever the trigger task needs to be skipped, the `skip` task will be executed instead (it does nothing, really):

`trigger_other_dag` task — Source: Author

The second option we have is to use `ShortCircuitOperator` in order to implement conditional logic that decides when to skip a particular task.
The `ShortCircuitOperator` is derived from the `PythonOperator` and evaluates the result of a `python_callable`. If the returned result is `False` or a falsy value, the pipeline will be short-circuited. Downstream tasks will be marked with a state of "skipped" based on the short-circuiting mode configured. If the returned result is `True` or a truthy value, downstream tasks proceed as normal and an `XCom` of the returned result is pushed.
```python
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator, ShortCircuitOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator


def is_monday(**context):
    return context['dag_run'].start_date.weekday() == 0


with DAG(
    dag_id='test_dag',
    start_date=datetime(2021, 1, 1),
    catchup=False,
    tags=['example'],
) as dag:
    is_monday_task = ShortCircuitOperator(
        task_id='is_monday',
        python_callable=is_monday,
        ignore_downstream_trigger_rules=False,
    )
    first_task = PythonOperator(
        task_id='task_a',
        python_callable=lambda: print('Hi from task_a'),
    )
    trigger_task = TriggerDagRunOperator(
        task_id='trigger_other_dag',
        trigger_dag_id='example_branch_operator',
    )
    last_task = PythonOperator(
        task_id='task_c',
        python_callable=lambda: print('Hi from task_c'),
        trigger_rule='none_failed',
    )

    first_task >> is_monday_task >> trigger_task >> last_task
```

Now if `is_monday()` evaluates to `False`, Airflow will skip all downstream tasks; this is the default behaviour. If we want just one task to be skipped, we also need to provide `ignore_downstream_trigger_rules=False` when creating an instance of `ShortCircuitOperator`. This configuration will then take into account the trigger rules of downstream tasks and decide, per task, which should still be skipped and which executed (note the trigger rule in our last task, `task_c`).
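To make the contrast explicit, here's a minimal sketch of the same operator with the default setting, under which a `False` result skips everything downstream regardless of trigger rules:

```python
# Default behaviour: ignore_downstream_trigger_rules defaults to True, so a
# False result from the callable skips ALL downstream tasks, no matter what
# trigger rules they define.
is_monday_task = ShortCircuitOperator(
    task_id='is_monday',
    python_callable=is_monday,
)
```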

`is_monday` `ShortCircuitOperator` returns `False`: only the trigger task is skipped – Source: Author

If the result of the `python_callable` is `True`, then downstream tasks will also be executed.

`is_monday` `ShortCircuitOperator` returns `True`: all downstream tasks will be executed – Source: Author

Lastly, the third option involves implementing a sub-class that inherits from a built-in operator, such as `TriggerDagRunOperator`.
The following custom operator inherits from the built-in `TriggerDagRunOperator` and takes an additional callable argument that will be used to decide whether an `AirflowSkipException` will be raised.
```python
from typing import Any, Callable, Dict, TypeVar

from airflow.exceptions import AirflowSkipException
from airflow.operators.trigger_dagrun import TriggerDagRunOperator

Context = TypeVar('Context', bound=Dict[Any, Any])


class ConditionalTriggerDagRunOperator(TriggerDagRunOperator):
    """
    A custom operator that executes TriggerDagRunOperator only if the result
    of the `conditional_checker_callable` callable evaluates to `True`.
    Otherwise, the task will be skipped by raising an `AirflowSkipException`.
    """

    def __init__(
        self,
        conditional_checker_callable: Callable[[Context], bool],
        **kwargs: Any,
    ) -> None:
        super().__init__(**kwargs)
        self.conditional_checker_callable = conditional_checker_callable

    def execute(self, context: Context) -> None:
        if not self.conditional_checker_callable(context):
            raise AirflowSkipException
        super().execute(context)
```
If `conditional_checker_callable` returns `True`, then the operator will be executed; otherwise it will be skipped. The full code for our DAG now becomes:
```python
from datetime import datetime
from typing import Any, Callable, Dict, TypeVar

from airflow import DAG
from airflow.exceptions import AirflowSkipException
from airflow.operators.python import PythonOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator

Context = TypeVar('Context', bound=Dict[Any, Any])


class ConditionalTriggerDagRunOperator(TriggerDagRunOperator):
    """
    A custom operator that executes TriggerDagRunOperator only if the result
    of the `conditional_checker_callable` callable evaluates to `True`.
    Otherwise, the task will be skipped by raising an `AirflowSkipException`.
    """

    def __init__(
        self,
        conditional_checker_callable: Callable[[Context], bool],
        **kwargs: Any,
    ) -> None:
        super().__init__(**kwargs)
        self.conditional_checker_callable = conditional_checker_callable

    def execute(self, context: Context) -> None:
        if not self.conditional_checker_callable(context):
            raise AirflowSkipException
        super().execute(context)


def is_monday(context):
    # Receives the task context as a positional argument,
    # matching the call in execute() above
    return context['dag_run'].start_date.weekday() == 0


with DAG(
    dag_id='test_dag',
    start_date=datetime(2021, 1, 1),
    catchup=False,
    tags=['example'],
) as dag:
    first_task = PythonOperator(
        task_id='task_a',
        python_callable=lambda: print('Hi from task_a'),
    )
    trigger_task = ConditionalTriggerDagRunOperator(
        task_id='trigger_other_dag',
        conditional_checker_callable=is_monday,
        trigger_dag_id='example_branch_operator',
    )
    last_task = PythonOperator(
        task_id='task_c',
        python_callable=lambda: print('Hi from task_c'),
        trigger_rule='none_failed',
    )

    first_task >> trigger_task >> last_task
```

Now if the condition results in `False`, the task created using our custom operator will be skipped.

Likewise, if the condition is `True`, the task will be executed.

Notice that for this particular option, we don't need to create additional tasks like we did in the two previous examples with `BranchPythonOperator` and `ShortCircuitOperator`, which is something I personally like given that it declutters our DAG.
Note that a similar behaviour can be achieved by inheriting functionality from Airflow's `SkipMixin` class. For more information, feel free to take a look at the documentation.
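As a rough illustration of that route, the sketch below sub-classes `TriggerDagRunOperator` together with `SkipMixin`, following the same pattern that `ShortCircuitOperator` uses internally. The class name is hypothetical, and the exact signature of `SkipMixin.skip()` varies between Airflow versions, so treat this as a starting point rather than a drop-in implementation:

```python
from airflow.exceptions import AirflowSkipException
from airflow.models.skipmixin import SkipMixin
from airflow.operators.trigger_dagrun import TriggerDagRunOperator


class SkippingTriggerDagRunOperator(TriggerDagRunOperator, SkipMixin):
    """Hypothetical sketch: skip this task and its downstream tasks on non-Mondays."""

    def execute(self, context):
        if context['dag_run'].start_date.weekday() != 0:
            # SkipMixin.skip() marks the given task instances as skipped;
            # check your Airflow version's docs for its exact signature.
            downstream_tasks = context['task'].get_flat_relatives(upstream=False)
            if downstream_tasks:
                self.skip(context['dag_run'], context['ti'].execution_date, downstream_tasks)
            # Mark this task itself as skipped as well
            raise AirflowSkipException
        super().execute(context)
```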
Personally, I like the last approach, as it's a bit clearer when it comes to the DAG visualisation on the Airflow UI; at the same time, by creating a sub-class of a built-in operator to implement the skipping logic, you can re-use it in other DAGs as well. But this comes down to the specific use-case and any preference you may have, so feel free to choose the approach that best suits your needs.
Final Thoughts
Skipping tasks while authoring Airflow DAGs is a very common requirement that lets engineers orchestrate tasks in a more dynamic and sophisticated way.
In this article, we demonstrated several options for implementing logic that requires conditional execution of certain Airflow tasks. More specifically, we saw how you can implement such functionality when using `PythonOperator` by raising `AirflowSkipException`.
Furthermore, we also demonstrated a few different approaches for skipping tasks created using built-in operators other than the `PythonOperator`. Based on your specific use-case, it's up to you to decide which approach to take.
Even though skipping tasks is a common requirement, Airflow doesn't seem to have a built-in feature for performing conditional runs within a specific DAG. I would have expected a feature allowing developers to specify trigger conditions on specific tasks, and I am pretty sure that sooner or later such functionality will be implemented and shipped in a future Airflow version.