XComs¶
XComs (short for “cross-communications”) are a mechanism that let Tasks talk to each other, as by default Tasks are entirely isolated and may be running on entirely different machines.
An XCom is identified by a key (essentially its name), as well as the task_id and dag_id it came from. XComs can have any serializable value (including objects that are decorated with @dataclass or @attr.define; see TaskFlow arguments), but they are only designed for small amounts of data; do not use them to pass around large values, like dataframes.
XComs are explicitly “pushed” and “pulled” to/from their storage using the xcom_push and xcom_pull methods on Task Instances.
To push a value within a task called “task-1” that will be used by another task:
# pushes data in any_serializable_value into xcom with key "identifier as string"
task_instance.xcom_push(key="identifier as string", value=any_serializable_value)
To pull the value that was pushed in the code above in a different task:
# pulls the xcom variable with key "identifier as string" that was pushed from within task-1
task_instance.xcom_pull(key="identifier as string", task_ids="task-1")
Many operators will auto-push their results into an XCom key called return_value if the do_xcom_push argument is set to True (as it is by default), and @task functions do this as well. xcom_pull defaults to using return_value as the key if no key is passed to it, meaning it’s possible to write code like this:
# Pulls the return_value XCOM from "pushing_task"
value = task_instance.xcom_pull(task_ids='pushing_task')
You can also use XComs in templates:
SELECT * FROM {{ task_instance.xcom_pull(task_ids='foo', key='table_name') }}
XComs are a relative of Variables, with the main difference being that XComs are per-task-instance and designed for communication within a DAG run, while Variables are global and designed for overall configuration and value sharing.
If you want to push multiple XComs at once or rename the pushed XCom key, you can set the do_xcom_push and multiple_outputs arguments to True, and then return a dictionary of values.
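Conceptually, multiple_outputs fans a returned dictionary out into one XCom per dictionary key, alongside the whole result under return_value. The following pure-Python sketch illustrates that unpacking with a hypothetical in-memory dict standing in for the XCom table; it is an illustration of the behaviour, not the actual Airflow implementation:

```python
# Hypothetical in-memory stand-in for the XCom table, keyed by (task_id, key).
xcom_store = {}


def push_result(task_id, result, multiple_outputs=False):
    """Sketch of how a task's return value is pushed to XCom storage."""
    if multiple_outputs and isinstance(result, dict):
        for key, value in result.items():
            xcom_store[(task_id, key)] = value  # one XCom per dict key
    xcom_store[(task_id, "return_value")] = result  # whole result pushed as usual


push_result("extract", {"table_name": "orders", "row_count": 42}, multiple_outputs=True)
print(xcom_store[("extract", "table_name")])  # orders
print(xcom_store[("extract", "row_count")])  # 42
```

Downstream tasks can then pull each key individually, e.g. xcom_pull(task_ids='extract', key='table_name').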
Note
If the first run of a task does not succeed, its XComs are cleared on every retry to keep the task idempotent.
Object Storage XCom Backend¶
The default XCom backend is the BaseXCom
class, which stores XComs in the Airflow database. This is fine for small values, but can be problematic for large values, or for large numbers of XComs.
To enable storing XComs in an object store, you can set the xcom_backend configuration option to airflow.providers.common.io.xcom.backend.XComObjectStorageBackend.
You will also need to set xcom_objectstorage_path to the desired location. The connection id is obtained from the user part of the URL that you provide, e.g. xcom_objectstorage_path = s3://conn_id@mybucket/key. Furthermore, xcom_objectstorage_threshold must be larger than -1: any object smaller than the threshold in bytes will be stored in the database, and anything larger will be put in object storage. This allows a hybrid setup. If an XCom is stored in object storage, a reference to it is saved in the database. Finally, you can set xcom_objectstorage_compression to an fsspec-supported compression method such as zip or snappy to compress the data before storing it in object storage.
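The threshold rule can be sketched in a few lines of pure Python. This is a simplified illustration of the routing decision, not the provider's actual code, and the function name is made up:

```python
import json

THRESHOLD = 1048576  # bytes, mirroring xcom_objectstorage_threshold (1 MiB)


def choose_storage(value, threshold=THRESHOLD):
    """Decide where a serialized XCom value would land under the hybrid setup."""
    if threshold < 0:
        return "database"  # a negative threshold disables object storage entirely
    size = len(json.dumps(value).encode("utf-8"))
    # smaller than the threshold -> database row; otherwise -> object storage
    return "database" if size < threshold else "object_storage"


print(choose_storage({"small": "payload"}))  # database
print(choose_storage({"blob": "x" * 2_000_000}))  # object_storage
```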
So for example the following configuration will store anything above 1MB in S3 and will compress it using gzip:
[core]
xcom_backend = airflow.providers.common.io.xcom.backend.XComObjectStorageBackend
[common.io]
xcom_objectstorage_path = s3://conn_id@mybucket/key
xcom_objectstorage_threshold = 1048576
xcom_objectstorage_compression = gzip
Note
Compression requires that support for it is installed in your Python environment. For example, to use snappy compression, you need to install python-snappy. Zip, gzip and bz2 work out of the box.
Custom XCom Backends¶
The XCom system has interchangeable backends, and you can set which backend is being used via the xcom_backend configuration option.
If you want to implement your own backend, you should subclass BaseXCom and override the serialize_value and deserialize_value methods.
There is also an orm_deserialize_value method that is called whenever the XCom objects are rendered for UI or reporting purposes; if you have large or expensive-to-retrieve values in your XComs, you should override this method to avoid calling that code (and instead return a lighter, incomplete representation) so the UI remains responsive.
You can also override the clear method, which is used when clearing results for given DAGs and tasks. This allows a custom XCom backend to manage the data lifecycle more easily.
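As a sketch, a minimal custom backend that stores values as gzip-compressed JSON might look like the following. The class name and compression choice are illustrative assumptions, not a recommended production backend, and the try/except stand-in for BaseXCom exists only so the snippet can run outside an Airflow installation:

```python
import gzip
import json

try:
    from airflow.models.xcom import BaseXCom
except ImportError:  # stand-in so this sketch runs without Airflow installed
    class BaseXCom:
        pass


class GzipJsonXComBackend(BaseXCom):
    """Hypothetical backend storing XCom values as gzip-compressed JSON."""

    @staticmethod
    def serialize_value(value, **kwargs):
        # JSON-encode, then compress before handing the bytes to storage
        return gzip.compress(json.dumps(value).encode("utf-8"))

    @staticmethod
    def deserialize_value(result):
        # In Airflow, `result` is an XCom row; here we also accept raw bytes
        # for illustration, and reverse the serialization above
        data = result.value if hasattr(result, "value") else result
        return json.loads(gzip.decompress(data).decode("utf-8"))


# round-trip check
blob = GzipJsonXComBackend.serialize_value({"rows": 3})
print(GzipJsonXComBackend.deserialize_value(blob))  # {'rows': 3}
```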
Working with Custom XCom Backends in Containers¶
Depending on where Airflow is deployed (local, Docker, K8s, etc.), it can be useful to verify that a custom XCom backend is actually being initialized. For example, the complexity of a container environment can make it harder to determine whether your backend is being loaded correctly during deployment. The following guidance can help you build confidence in your custom XCom implementation.
Firstly, if you can exec into a terminal in the container then you should be able to do:
from airflow.models.xcom import XCom
print(XCom.__name__)
which will print the actual class that is being used.
You can also examine Airflow’s configuration:
from airflow.settings import conf
conf.get("core", "xcom_backend")
Working with Custom Backends in K8s via Helm¶
Running custom XCom backends in K8s will introduce even more complexity to your Airflow deployment. Put simply, sometimes things go wrong which can be difficult to debug.
For example, if you define a custom XCom backend in the Chart values.yaml (via the xcom_backend configuration) and Airflow fails to load the class, the entire Chart deployment will fail, with each pod container attempting to restart time and time again.
When deploying in K8s, your custom XCom backend needs to reside in a config directory; otherwise it cannot be located during Chart deployment.
An observed problem is that it is very difficult to acquire logs from the failing container, because there is only a small window of availability in which the stack trace can be obtained: the only way to determine the root cause is to query the container logs at the right time, before the pod restarts.