Cluster Policies¶
If you want to check or mutate DAGs or Tasks on a cluster-wide level, then a Cluster Policy will let you do that. They have three main purposes:
Checking that DAGs/Tasks meet a certain standard
Setting default arguments on DAGs/Tasks
Performing custom routing logic
There are three main types of cluster policy:
dag_policy: Takes a DAG parameter called dag. Runs at load time of the DAG from DagBag.
task_policy: Takes a BaseOperator parameter called task. The policy gets executed when the task is created during parsing of the task from DagBag at load time. This means that the whole task definition can be altered in the task policy. It does not relate to a specific task running in a DagRun. The task_policy defined is applied to all the task instances that will be executed in the future.
task_instance_mutation_hook: Takes a TaskInstance parameter called task_instance. The task_instance_mutation_hook applies not to a task but to the instance of a task that relates to a particular DagRun. It is executed in a “worker”, not in the dag file processor, just before the task instance is executed. The policy is only applied to the currently executed run (i.e. instance) of that task.
The DAG and Task cluster policies can raise the AirflowClusterPolicyViolation exception to indicate that the dag/task they were passed is not compliant and should not be loaded.
They can also raise AirflowClusterPolicySkipDag when a DAG needs to be skipped intentionally. Unlike AirflowClusterPolicyViolation, this exception is not displayed in the Airflow web UI (internally, it is not recorded in the import_error table of the metadata database).
Any extra attributes set by a cluster policy take priority over those defined in your DAG file; for example, if you set an sla on your Task in the DAG file, and then your cluster policy also sets an sla, the cluster policy’s value will take precedence.
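As a minimal sketch of this precedence, the snippet below uses a stand-in object rather than a real Airflow operator, so it runs without Airflow installed. The FakeTask class and the one-hour value are assumptions made purely for illustration. The policy assigns the attribute after the DAG file has set it, so the policy's value is the one that remains.

```python
from dataclasses import dataclass
from datetime import timedelta
from typing import Optional


@dataclass
class FakeTask:
    """Hypothetical stand-in for an operator; not an Airflow class."""

    task_id: str
    sla: Optional[timedelta] = None


def task_policy(task) -> None:
    # The policy runs after the DAG file has been parsed, so this
    # assignment replaces whatever the DAG author configured.
    task.sla = timedelta(hours=1)


# Value as it would have been set in the DAG file.
task = FakeTask(task_id="extract", sla=timedelta(hours=6))
task_policy(task)
print(task.sla)  # 1:00:00
```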
How to define a policy function¶
There are two ways to configure cluster policies:
Create an airflow_local_settings.py file somewhere in the python search path (the config/ folder under your $AIRFLOW_HOME is a good “default” location) and then add callables to the file matching one or more of the cluster policy names above (e.g. dag_policy).
See Configuring local settings for details on how to configure local settings.
By using a setuptools entrypoint in a custom module using the Pluggy interface.
Added in version 2.6.
This method is more advanced and for people who are already comfortable with python packaging.
First create your policy function in a module:
from airflow.policies import hookimpl


@hookimpl
def task_policy(task) -> None:
    # Mutate task in place
    # ...
    print(f"Hello from {__file__}")
And then add the entrypoint to your project specification. For example, using pyproject.toml with setuptools:
[build-system]
requires = ["setuptools", "wheel"]
build-backend = "setuptools.build_meta"

[project]
name = "my-airflow-plugin"
version = "0.0.1"
# ...
dependencies = ["apache-airflow>=2.6"]

[project.entry-points.'airflow.policy']
_ = 'my_airflow_plugin.policies'
The entrypoint group must be airflow.policy, and the name is ignored. The value should be your module (or class) decorated with the @hookimpl marker.
Once you have done that, and you have installed your distribution into your Airflow env, the policy functions will get called by the various Airflow components. (The exact call order is undefined, so don’t rely on any particular calling order if you have multiple plugins).
One important thing to note (for either means of defining policy functions) is that the argument names must exactly match as documented below.
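A misnamed argument, such as d instead of dag, is easy to miss. The following is a small, hypothetical self-check (not part of Airflow) that uses only the standard library's inspect module to compare a function's parameter names against the documented ones. The EXPECTED_ARGS mapping is copied by hand from the function list below and is an assumption of this sketch.

```python
import inspect

# Argument names as documented under "Available Policy Functions".
EXPECTED_ARGS = {
    "task_policy": ["task"],
    "dag_policy": ["dag"],
    "task_instance_mutation_hook": ["task_instance"],
    "pod_mutation_hook": ["pod"],
    "get_airflow_context_vars": ["context"],
}


def has_documented_args(func) -> bool:
    """Return True if func's parameter names match the documented ones."""
    expected = EXPECTED_ARGS.get(func.__name__)
    if expected is None:
        return False  # not a known policy function name
    return list(inspect.signature(func).parameters) == expected


def task_policy(task) -> None:  # correct argument name
    pass


def dag_policy(d) -> None:  # wrong: the argument must be called "dag"
    pass


print(has_documented_args(task_policy))  # True
print(has_documented_args(dag_policy))  # False
```

A check like this could run in a unit test for your settings module, so that a renamed argument is caught before deployment.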
Available Policy Functions¶
- airflow.policies.task_policy(task)
Allow altering tasks after they are loaded in the DagBag.
It allows administrators to rewire some of a task’s parameters. Alternatively, you can raise an AirflowClusterPolicyViolation exception to stop the DAG from being executed.
Here are a few examples of how this can be useful:
You could enforce a specific queue for tasks using the SparkOperator to make sure that these tasks get wired to the right workers
You could enforce a task timeout policy, making sure that no tasks run for more than 48 hours
- Parameters:
task (airflow.models.baseoperator.BaseOperator) – task to be mutated
- airflow.policies.dag_policy(dag)
Allow altering DAGs after they are loaded in the DagBag.
It allows administrators to rewire some of a DAG’s parameters. Alternatively, you can raise an AirflowClusterPolicyViolation exception to stop the DAG from being executed.
Here are a few examples of how this can be useful:
You could enforce a default user for DAGs
You could check that every DAG has tags configured
- Parameters:
dag (airflow.models.dag.DAG) – dag to be mutated
- airflow.policies.task_instance_mutation_hook(task_instance)
Allow altering task instances before being queued by the Airflow scheduler.
This could be used, for instance, to modify the task instance during retries.
- Parameters:
task_instance (airflow.models.taskinstance.TaskInstance) – task instance to be mutated
- airflow.policies.pod_mutation_hook(pod)
Mutate pod before scheduling.
This setting allows altering the kubernetes.client.models.V1Pod object before it is passed to the Kubernetes client for scheduling.
This could be used, for instance, to add sidecar or init containers to every worker pod launched by KubernetesExecutor or KubernetesPodOperator.
- airflow.policies.get_airflow_context_vars(context)
Inject airflow context vars into default airflow context vars.
This setting allows getting the airflow context vars, which are key value pairs. They are then injected into the default airflow context vars, which in the end are available as environment variables when running tasks. dag_id, task_id, execution_date, dag_run_id, try_number are reserved keys.
- Parameters:
context – The context for the task_instance of interest.
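As an illustration of the last function above, here is a minimal sketch of a get_airflow_context_vars implementation. The key airflow_cluster and its value are made-up examples; any key other than the reserved ones could be used. This version ignores context, though a real implementation could derive values from it.

```python
def get_airflow_context_vars(context) -> dict[str, str]:
    """Return extra key-value pairs to expose alongside the default context vars."""
    # Hypothetical key and value, chosen only for illustration.
    return {"airflow_cluster": "main"}


# Reserved keys as listed in the function description.
RESERVED_KEYS = {"dag_id", "task_id", "execution_date", "dag_run_id", "try_number"}

extra = get_airflow_context_vars(context={})
# Sanity check: none of the custom keys collides with a reserved key.
print(RESERVED_KEYS.isdisjoint(extra))  # True
```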
DAG policies¶
This policy checks if each DAG has at least one tag defined:
from airflow.exceptions import AirflowClusterPolicySkipDag, AirflowClusterPolicyViolation
from airflow.models import DAG


def dag_policy(dag: DAG):
    """Ensure that DAG has at least one tag and skip the DAG with `only_for_beta` tag."""
    if not dag.tags:
        raise AirflowClusterPolicyViolation(
            f"DAG {dag.dag_id} has no tags. At least one tag required. File path: {dag.fileloc}"
        )

    if "only_for_beta" in dag.tags:
        raise AirflowClusterPolicySkipDag(
            f"DAG {dag.dag_id} is not loaded on the production cluster, due to `only_for_beta` tag."
        )
To avoid import cycles, if you use DAG in type annotations in your cluster policy, be sure to import from airflow.models and not from airflow.
DAG policies are applied after the DAG has been completely loaded, so overriding the default_args parameter has no effect. If you want to override the default operator settings, use task policies instead.
Task policies¶
Here’s an example of enforcing a maximum timeout policy on every task:
from abc import ABC
from datetime import timedelta
from airflow.models.baseoperator import BaseOperator


class TimedOperator(BaseOperator, ABC):
    timeout: timedelta


def task_policy(task: TimedOperator):
    if task.task_type == "HivePartitionSensor":
        task.queue = "sensor_queue"
    if task.timeout > timedelta(hours=48):
        task.timeout = timedelta(hours=48)
You could also implement policies to protect against common errors, rather than as technical security controls. For example, don’t run tasks without Airflow owners:
from airflow.configuration import conf
from airflow.exceptions import AirflowClusterPolicyViolation
from airflow.models.baseoperator import BaseOperator


def task_must_have_owners(task: BaseOperator):
    if task.owner and not isinstance(task.owner, str):
        raise AirflowClusterPolicyViolation(f"""owner should be a string. Current value: {task.owner!r}""")
    if not task.owner or task.owner.lower() == conf.get("operators", "default_owner"):
        raise AirflowClusterPolicyViolation(
            f"""Task must have non-None non-default owner. Current value: {task.owner}"""
        )
If you have multiple checks to apply, it is best practice to curate these rules in a separate python module and have a single policy / task mutation hook that performs all of these custom checks and aggregates the various error messages, so that a single AirflowClusterPolicyViolation can be reported in the UI (and in the import errors table in the database).
For example, your airflow_local_settings.py might follow this pattern:
from typing import Callable

TASK_RULES: list[Callable[[BaseOperator], None]] = [task_must_have_owners]


def _check_task_rules(current_task: BaseOperator):
    """Check task rules for given task."""
    notices = []
    for rule in TASK_RULES:
        try:
            rule(current_task)
        except AirflowClusterPolicyViolation as ex:
            notices.append(str(ex))
    if notices:
        notices_list = " * " + "\n * ".join(notices)
        raise AirflowClusterPolicyViolation(
            f"DAG policy violation (DAG ID: {current_task.dag_id}, Path: {current_task.dag.fileloc}):\n"
            f"{notices_list}"
        )


def example_task_policy(task: BaseOperator):
    """Ensure Tasks have non-default owners."""
    _check_task_rules(task)
See Configuring local settings for details on how to configure local settings.
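To see what an aggregated message looks like, the rule-aggregation pattern above can be exercised outside Airflow. The sketch below is self-contained: it substitutes a local exception class and a SimpleNamespace object for AirflowClusterPolicyViolation and a real operator, and both rule functions are hypothetical.

```python
from types import SimpleNamespace


class PolicyViolation(Exception):
    """Local stand-in for AirflowClusterPolicyViolation."""


def must_have_owner(task) -> None:
    if not task.owner:
        raise PolicyViolation("Task must have an owner.")


def must_have_retries(task) -> None:
    if task.retries < 1:
        raise PolicyViolation("Task must allow at least one retry.")


RULES = [must_have_owner, must_have_retries]


def check_rules(task) -> None:
    """Run every rule and raise one exception listing all failures."""
    notices = []
    for rule in RULES:
        try:
            rule(task)
        except PolicyViolation as ex:
            notices.append(str(ex))
    if notices:
        raise PolicyViolation(" * " + "\n * ".join(notices))


# A stand-in task that breaks both rules.
bad_task = SimpleNamespace(task_id="load", owner="", retries=0)
try:
    check_rules(bad_task)
except PolicyViolation as ex:
    message = str(ex)
    print(message)
```

Because every rule runs before anything is raised, the author of a non-compliant DAG sees all problems at once instead of fixing them one import error at a time.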
Task instance mutation¶
Here’s an example of re-routing tasks that are on their second (or greater) retry to a different queue:
from airflow.models.taskinstance import TaskInstance


def task_instance_mutation_hook(task_instance: TaskInstance):
    if task_instance.try_number >= 1:
        task_instance.queue = "retry_queue"
Note that since priority weight is determined dynamically using weight rules, you cannot alter the priority_weight of a task instance within the mutation hook.