Using OpenLineage integration¶
OpenLineage is an open framework for data lineage collection and analysis. At its core is an extensible specification that systems can use to interoperate with lineage metadata. Check out OpenLineage docs.
No change to user DAG files is required to use OpenLineage. Basic configuration is needed so that OpenLineage knows where to send events.
Quickstart¶
Note
OpenLineage Provider offers a diverse range of data transport options (http, kafka, file etc.), including the flexibility to create a custom solution. Configuration can be managed through several approaches and there is an extensive array of settings available for users to fine-tune and enhance their use of OpenLineage. For a comprehensive explanation of these features, please refer to the subsequent sections of this document.
This example is a basic demonstration of OpenLineage setup.
Install the provider package or add it to your requirements.txt file.
pip install apache-airflow-providers-openlineage
Provide a Transport configuration so that OpenLineage knows where to send the events. Within the airflow.cfg file:
[openlineage]
transport = {"type": "http", "url": "http://example.com:5000", "endpoint": "api/v1/lineage"}
or with the AIRFLOW__OPENLINEAGE__TRANSPORT environment variable:
AIRFLOW__OPENLINEAGE__TRANSPORT='{"type": "http", "url": "http://example.com:5000", "endpoint": "api/v1/lineage"}'
That’s it! OpenLineage events should be sent to the configured backend when DAGs are run.
Usage¶
When enabled and configured, the integration requires no further action from the user. It will automatically:
Collect task input / output metadata (source, schema, etc.).
Collect task run-level metadata (execution time, state, parameters, etc.)
Collect task job-level metadata (owners, type, description, etc.)
Collect task-specific metadata (bigquery job id, python source code, etc.) - depending on the Operator
All this data will be sent as OpenLineage events to the configured backend as described in Job Hierarchy.
Transport setup¶
The primary and recommended way to configure the OpenLineage Airflow Provider is through Airflow configuration (the airflow.cfg file).
All possible configuration options, with example values, can be found in the configuration section.
At a minimum, every setup must configure a Transport, which defines where your events are sent, for example to Marquez.
Transport as JSON string¶
The transport option in Airflow configuration sets the Transport as a JSON string.
[openlineage]
transport = {"type": "http", "url": "http://example.com:5000", "endpoint": "api/v1/lineage"}
The AIRFLOW__OPENLINEAGE__TRANSPORT environment variable is equivalent.
AIRFLOW__OPENLINEAGE__TRANSPORT='{"type": "http", "url": "http://example.com:5000", "endpoint": "api/v1/lineage"}'
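Hand-written JSON inside a configuration value or shell variable is easy to get wrong (single-quoted keys, trailing commas). As a minimal sketch, the value can be generated from a Python dict instead. The helper name transport_env_value is hypothetical and not part of the provider.

```python
import json


def transport_env_value(transport: dict) -> str:
    """Serialize a transport dict to the JSON string used by the transport option.

    Hypothetical helper for illustration; it is not part of the provider.
    """
    # json.dumps always emits double-quoted keys and strings, i.e. valid JSON.
    return json.dumps(transport)


# Same example values as in the configuration shown above.
http_transport = {
    "type": "http",
    "url": "http://example.com:5000",
    "endpoint": "api/v1/lineage",
}

value = transport_env_value(http_transport)
# Wrap the JSON in single quotes when exporting it from a shell.
print(f"AIRFLOW__OPENLINEAGE__TRANSPORT='{value}'")
```

Generating the string this way keeps the airflow.cfg value and the environment variable value identical, since both expect the same JSON.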
If you want to look at OpenLineage events without sending them anywhere, you can set up ConsoleTransport; the events will then end up in task logs.
[openlineage]
transport = {"type": "console"}
Note
For the full list of built-in transport types, the options of a specific transport, or instructions on how to implement your own custom transport, refer to the Python client documentation.
Transport as config file¶
You can also configure the OpenLineage Transport using a YAML file (e.g. openlineage.yml).
Provide the path to the YAML file as the config_path option in Airflow configuration.
[openlineage]
config_path = '/path/to/openlineage.yml'
The AIRFLOW__OPENLINEAGE__CONFIG_PATH environment variable is equivalent.
AIRFLOW__OPENLINEAGE__CONFIG_PATH='/path/to/openlineage.yml'
Example content of config YAML file:
transport:
  type: http
  url: https://backend:5000
  endpoint: events/receive
  auth:
    type: api_key
    apiKey: f048521b-dfe8-47cd-9c65-0cb07d57591e
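The nesting of the example config matters: type, url, endpoint and auth sit under transport, and the API key sits under auth. A minimal sketch of rendering such a file from Python, so that the key can come from the environment instead of being committed, might look as follows. The function names and the MY_OPENLINEAGE_API_KEY variable are hypothetical, and the template assumes values that need no YAML quoting.

```python
import os
from pathlib import Path

# Template mirroring the example config; nesting is expressed by indentation.
CONFIG_TEMPLATE = """\
transport:
  type: http
  url: {url}
  endpoint: {endpoint}
  auth:
    type: api_key
    apiKey: {api_key}
"""


def render_config(url: str, endpoint: str, api_key: str) -> str:
    """Fill the template; assumes plain values that need no YAML escaping."""
    return CONFIG_TEMPLATE.format(url=url, endpoint=endpoint, api_key=api_key)


def write_config(path: Path, api_key_env: str = "MY_OPENLINEAGE_API_KEY") -> None:
    """Write the config file, reading the key from a (hypothetical) env variable."""
    api_key = os.environ[api_key_env]  # raises KeyError if the variable is unset
    path.write_text(render_config("https://backend:5000", "events/receive", api_key))
```

Calling write_config(Path("/path/to/openlineage.yml")) would produce a file that config_path can point to.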
Note
A detailed description of this configuration method, together with example config files, can be found in the Python client documentation.
Configuration precedence¶
As there are multiple possible ways of configuring OpenLineage, it’s important to keep in mind the precedence of different configurations. OpenLineage Airflow Provider looks for the configuration in the following order:
Check config_path in airflow.cfg under the openlineage section (or the AIRFLOW__OPENLINEAGE__CONFIG_PATH environment variable).
Check transport in airflow.cfg under the openlineage section (or the AIRFLOW__OPENLINEAGE__TRANSPORT environment variable).
If all the above options are missing, the OpenLineage Python client used underneath looks for configuration in the order described in this documentation. Please note that using Airflow configuration is encouraged and is the only future-proof solution.
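The lookup order above can be sketched as a small function. This is only an illustrative model of the documented order, not the provider's actual code. The function name is hypothetical, and airflow_cfg stands for the options of the [openlineage] section as a plain dict.

```python
from typing import Mapping


def resolve_config_source(
    airflow_cfg: Mapping[str, str],
    env: Mapping[str, str],
) -> str:
    """Return which source is consulted first, following the documented order."""
    # Step 1: config_path (airflow.cfg option or its environment variable).
    if airflow_cfg.get("config_path") or env.get("AIRFLOW__OPENLINEAGE__CONFIG_PATH"):
        return "config_path"
    # Step 2: transport (airflow.cfg option or its environment variable).
    if airflow_cfg.get("transport") or env.get("AIRFLOW__OPENLINEAGE__TRANSPORT"):
        return "transport"
    # Step 3: nothing set in Airflow, so the Python client's own lookup applies.
    return "python_client_defaults"


# config_path wins even when transport is also set.
print(resolve_config_source({"config_path": "/path/to/openlineage.yml",
                             "transport": '{"type": "console"}'}, {}))
```

The practical consequence is that a leftover config_path setting silently takes priority over a transport value added later.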
Backwards compatibility¶
Warning
The variables below should not be used and may be removed in the future. Consider using Airflow configuration (described above) for a future-proof solution.
For backwards compatibility with the openlineage-airflow package, some environment variables are still available:
OPENLINEAGE_DISABLED is an equivalent of AIRFLOW__OPENLINEAGE__DISABLED.
OPENLINEAGE_CONFIG is an equivalent of AIRFLOW__OPENLINEAGE__CONFIG_PATH.
OPENLINEAGE_NAMESPACE is an equivalent of AIRFLOW__OPENLINEAGE__NAMESPACE.
OPENLINEAGE_EXTRACTORS is an equivalent of AIRFLOW__OPENLINEAGE__EXTRACTORS.
OPENLINEAGE_AIRFLOW_DISABLE_SOURCE_CODE is an equivalent of AIRFLOW__OPENLINEAGE__DISABLE_SOURCE_CODE.
OPENLINEAGE_URL can be used to set up a simple http transport. This method has some limitations and may require using other environment variables to achieve the desired output. See docs.
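When migrating away from the legacy variables, it can help to scan an environment for them. The following is a minimal sketch built from the equivalences listed above. The function name is hypothetical, and OPENLINEAGE_URL is deliberately left out because it has no one-to-one replacement.

```python
from typing import Dict, Mapping

# Legacy variable -> Airflow configuration equivalent, as listed above.
LEGACY_TO_AIRFLOW: Dict[str, str] = {
    "OPENLINEAGE_DISABLED": "AIRFLOW__OPENLINEAGE__DISABLED",
    "OPENLINEAGE_CONFIG": "AIRFLOW__OPENLINEAGE__CONFIG_PATH",
    "OPENLINEAGE_NAMESPACE": "AIRFLOW__OPENLINEAGE__NAMESPACE",
    "OPENLINEAGE_EXTRACTORS": "AIRFLOW__OPENLINEAGE__EXTRACTORS",
    "OPENLINEAGE_AIRFLOW_DISABLE_SOURCE_CODE": "AIRFLOW__OPENLINEAGE__DISABLE_SOURCE_CODE",
}


def find_legacy_variables(env: Mapping[str, str]) -> Dict[str, str]:
    """Return {legacy_name: suggested_replacement} for legacy variables set in env."""
    return {old: new for old, new in LEGACY_TO_AIRFLOW.items() if old in env}


# Example with a plain dict; pass os.environ to check the real environment.
sample_env = {"OPENLINEAGE_NAMESPACE": "my-team-airflow-instance", "HOME": "/root"}
for old, new in find_legacy_variables(sample_env).items():
    print(f"{old} is set; consider {new} instead")
```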
Additional Options¶
Namespace¶
It’s very useful to set up an OpenLineage namespace for this particular Airflow instance.
That way, if you use multiple OpenLineage producers, events coming from them will be logically separated.
If not set, the namespace named default is used. Provide the name of the namespace as the namespace option in Airflow configuration.
[openlineage]
transport = {"type": "http", "url": "http://example.com:5000", "endpoint": "api/v1/lineage"}
namespace = 'my-team-airflow-instance'
The AIRFLOW__OPENLINEAGE__NAMESPACE environment variable is equivalent.
AIRFLOW__OPENLINEAGE__NAMESPACE='my-team-airflow-instance'
Timeout¶
To isolate task execution from OpenLineage, and to ensure that OpenLineage code can affect a task only by taking time, OpenLineage methods run in a separate process.
This code runs with a default timeout of 10 seconds. You can increase it by setting the execution_timeout option.
[openlineage]
transport = {"type": "http", "url": "http://example.com:5000", "endpoint": "api/v1/lineage"}
execution_timeout = 60
The AIRFLOW__OPENLINEAGE__EXECUTION_TIMEOUT environment variable is equivalent.
AIRFLOW__OPENLINEAGE__EXECUTION_TIMEOUT=60
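The general idea of running work in a separate process under a timeout can be illustrated with the standard library. This sketch is not how the provider implements it: it uses subprocess.run, and the helper name run_isolated is hypothetical. It only shows why a slow or failing child process cannot do more harm to the caller than consume time.

```python
import subprocess
import sys


def run_isolated(python_code: str, timeout_seconds: float) -> bool:
    """Run code in a child Python process.

    Returns True only if the child exits cleanly within the timeout.
    """
    try:
        completed = subprocess.run(
            [sys.executable, "-c", python_code],
            timeout=timeout_seconds,
            capture_output=True,
        )
    except subprocess.TimeoutExpired:
        # subprocess.run kills the child on timeout; the caller simply continues.
        return False
    # A crash in the child shows up as a non-zero exit code, not as an exception here.
    return completed.returncode == 0


if __name__ == "__main__":
    print(run_isolated("print('fast work')", timeout_seconds=10))
    print(run_isolated("import time; time.sleep(30)", timeout_seconds=0.5))
```

A larger execution_timeout trades a longer possible wait per task for fewer lineage events lost to slow backends.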
Disable¶
You can disable sending OpenLineage events without uninstalling the OpenLineage provider by setting the disabled option to true in Airflow configuration.
[openlineage]
transport = {"type": "http", "url": "http://example.com:5000", "endpoint": "api/v1/lineage"}
disabled = true
The AIRFLOW__OPENLINEAGE__DISABLED environment variable is equivalent.
AIRFLOW__OPENLINEAGE__DISABLED=true
Disable source code¶
Several Operators (e.g. Python, Bash) include their source code in their OpenLineage events by default.
To prevent that, set the disable_source_code option to true in Airflow configuration.
[openlineage]
transport = {"type": "http", "url": "http://example.com:5000", "endpoint": "api/v1/lineage"}
disable_source_code = true
The AIRFLOW__OPENLINEAGE__DISABLE_SOURCE_CODE environment variable is equivalent.
AIRFLOW__OPENLINEAGE__DISABLE_SOURCE_CODE=true
Disabled for Operators¶
You can exclude some Operators from emitting OpenLineage events by passing a string of semicolon-separated full import paths of the Airflow Operators to disable as the disabled_for_operators option in Airflow configuration.
[openlineage]
transport = {"type": "http", "url": "http://example.com:5000", "endpoint": "api/v1/lineage"}
disabled_for_operators = 'airflow.providers.standard.operators.bash.BashOperator;airflow.operators.python.PythonOperator'
The AIRFLOW__OPENLINEAGE__DISABLED_FOR_OPERATORS environment variable is equivalent.
AIRFLOW__OPENLINEAGE__DISABLED_FOR_OPERATORS='airflow.providers.standard.operators.bash.BashOperator;airflow.operators.python.PythonOperator'
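The semicolon-separated format is simple to build and check programmatically. The helpers below are a hypothetical sketch, not provider APIs. Note that import_path derives the path from the module where a class is defined, which is an assumption about the path you want to list and may differ from a re-exported import path.

```python
from typing import Iterable, List


def import_path(cls: type) -> str:
    """Build 'module.ClassName' from the module in which the class is defined."""
    return f"{cls.__module__}.{cls.__qualname__}"


def join_paths(paths: Iterable[str]) -> str:
    """Join full import paths into the semicolon-separated option value."""
    return ";".join(paths)


def split_paths(value: str) -> List[str]:
    """Split an option value back into paths, ignoring blanks and stray semicolons."""
    return [part.strip() for part in value.split(";") if part.strip()]


# The two paths used in the example above.
operators = [
    "airflow.providers.standard.operators.bash.BashOperator",
    "airflow.operators.python.PythonOperator",
]
option_value = join_paths(operators)
print(option_value)
```

The same format is used by other semicolon-separated options in this document, so the helpers apply to them unchanged.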
Full Task Info¶
By default, the OpenLineage integration’s AirflowRunFacet, attached to the START event of every task instance, does not contain the full serialized task information (the parameters passed to a given operator), only a select set of parameters.
However, users can configure the integration to include full task information by setting the include_full_task_info option to true. In that case, rather than serializing only a few known attributes, the integration excludes certain non-serializable elements and sends everything else.
[openlineage]
transport = {"type": "http", "url": "http://example.com:5000", "endpoint": "api/v1/lineage"}
include_full_task_info = true
The AIRFLOW__OPENLINEAGE__INCLUDE_FULL_TASK_INFO environment variable is equivalent.
AIRFLOW__OPENLINEAGE__INCLUDE_FULL_TASK_INFO=true
Warning
When this option is set to true, the OpenLineage integration does not control the size of the events you send. They can potentially include elements that are megabytes in size or larger, depending on the size of the data you pass to the task.
Custom Extractors¶
To use the custom Extractors feature, register the extractors by passing a string of semicolon-separated full import paths of the extractor classes to the extractors option in Airflow configuration.
[openlineage]
transport = {"type": "http", "url": "http://example.com:5000", "endpoint": "api/v1/lineage"}
extractors = full.path.to.ExtractorClass;full.path.to.AnotherExtractorClass
The AIRFLOW__OPENLINEAGE__EXTRACTORS environment variable is equivalent.
AIRFLOW__OPENLINEAGE__EXTRACTORS='full.path.to.ExtractorClass;full.path.to.AnotherExtractorClass'
Custom Run Facets¶
To inject custom run facets, register the custom run facet functions by passing a string of semicolon-separated full import paths to the custom_run_facets option in Airflow configuration.
[openlineage]
transport = {"type": "http", "url": "http://example.com:5000", "endpoint": "api/v1/lineage"}
custom_run_facets = full.path.to.get_my_custom_facet;full.path.to.another_custom_facet_function
The AIRFLOW__OPENLINEAGE__CUSTOM_RUN_FACETS environment variable is equivalent.
AIRFLOW__OPENLINEAGE__CUSTOM_RUN_FACETS='full.path.to.get_my_custom_facet;full.path.to.another_custom_facet_function'
Debug Mode¶
You can enable sending additional information in OpenLineage events, which can be useful for debugging and reproducing your environment setup, by setting the debug_mode option to true in Airflow configuration.
[openlineage]
transport = {"type": "http", "url": "http://example.com:5000", "endpoint": "api/v1/lineage"}
debug_mode = true
The AIRFLOW__OPENLINEAGE__DEBUG_MODE environment variable is equivalent.
AIRFLOW__OPENLINEAGE__DEBUG_MODE=true
Warning
When this option is set to true, the OpenLineage integration may log and emit extensive details. It should only be enabled temporarily for debugging purposes.
Enabling OpenLineage on DAG/task level¶
You can selectively enable OpenLineage for specific DAGs and tasks by using the selective_enable policy.
To enable this policy, set the selective_enable option to True in the [openlineage] section of your Airflow configuration file:
[openlineage]
selective_enable = True
The AIRFLOW__OPENLINEAGE__SELECTIVE_ENABLE environment variable is equivalent.
AIRFLOW__OPENLINEAGE__SELECTIVE_ENABLE=true
While selective_enable enables selective control, the disabled option still has precedence.
If you set disabled to True in the configuration, OpenLineage will be disabled for all DAGs and tasks regardless of the selective_enable setting.
Once the selective_enable policy is enabled, you can choose to enable OpenLineage for individual DAGs and tasks using the enable_lineage and disable_lineage functions.
Enabling Lineage on a DAG:
from airflow.providers.openlineage.utils.selective_enable import disable_lineage, enable_lineage

with enable_lineage(DAG(...)):
    # Tasks within this DAG will have lineage tracking enabled
    MyOperator(...)

    AnotherOperator(...)
Enabling Lineage on a Task:
While enabling lineage on a DAG implicitly enables it for all tasks within that DAG, you can still selectively disable it for specific tasks:
from airflow.providers.openlineage.utils.selective_enable import disable_lineage, enable_lineage

with DAG(...) as dag:
    t1 = MyOperator(...)
    t2 = AnotherOperator(...)

# Enable lineage for the entire DAG
enable_lineage(dag)
# Disable lineage for task t1
disable_lineage(t1)
Enabling lineage on the DAG level automatically enables it for all tasks within that DAG unless explicitly disabled per task.
Enabling lineage on the task level implicitly enables lineage on its DAG. This is because each emitting task sends a ParentRunFacet, which requires the DAG-level lineage to be enabled in some OpenLineage backend systems. Disabling DAG-level lineage while enabling task-level lineage might cause errors or inconsistencies.
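The interaction of these rules can be summarized as a small decision function. This is a simplified model derived from the prose above, not the provider's code, and the function name is hypothetical. It assumes that with selective_enable off every task emits, and it does not model the discouraged combination of a disabled DAG with an enabled task.

```python
from typing import Optional


def task_emits_lineage(
    disabled: bool,
    selective_enable: bool,
    dag_enabled: Optional[bool],
    task_enabled: Optional[bool],
) -> bool:
    """Model whether a task emits events; None means "not set explicitly"."""
    if disabled:
        # The disabled option takes precedence over everything else.
        return False
    if not selective_enable:
        # Assumption: with the policy off, all tasks emit events.
        return True
    if task_enabled is not None:
        # An explicit task-level choice overrides what the DAG would imply.
        return task_enabled
    # Otherwise the task inherits the DAG-level setting (off if never enabled).
    return bool(dag_enabled)


# DAG enabled, t1 explicitly disabled, t2 left untouched.
print(task_emits_lineage(False, True, dag_enabled=True, task_enabled=False))
print(task_emits_lineage(False, True, dag_enabled=True, task_enabled=None))
```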
Troubleshooting¶
See Troubleshooting for details on how to troubleshoot OpenLineage.
Adding support for custom Operators¶
If you want to add OpenLineage coverage for a particular Operator, take a look at Implementing OpenLineage in Operators.
Where can I learn more?¶
Check out OpenLineage website.
Visit our GitHub repository.
Watch multiple talks about OpenLineage.
How to contribute¶
We welcome your contributions! OpenLineage is an Open Source project under active development, and we’d love your help!
Sounds fun? Check out our new contributor guide to get started.