Lineage
Note
Lineage support is very experimental and subject to change.
Airflow can track data lineage not only between tasks but also from the hooks used within those tasks. This helps you understand how data flows through your Airflow pipelines.
A global instance of HookLineageCollector serves as the central hub for collecting lineage information. Hooks can send details about the assets they interact with to this collector. The collector then uses this data to construct AIP-60 compliant Assets, a standard format for describing assets.
from airflow.hooks.base import BaseHook
from airflow.lineage.hook import get_hook_lineage_collector


class CustomHook(BaseHook):
    def run(self):
        # run actual code
        # Report the assets this hook read from and wrote to.
        collector = get_hook_lineage_collector()
        collector.add_input_asset(self, asset_kwargs={"scheme": "file", "path": "/tmp/in"})
        collector.add_output_asset(self, asset_kwargs={"scheme": "file", "path": "/tmp/out"})
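To make the collection pattern concrete, here is a minimal, self-contained sketch in plain Python that does not import Airflow. ToyAsset, ToyCollector, get_toy_collector, and ToyHook are hypothetical stand-ins, not Airflow classes, and the way the URI is built from scheme and path is an assumption made only for illustration. The sketch assumes nothing beyond what is described above: one shared collector receives asset details from hooks and turns them into asset objects.

```python
from dataclasses import dataclass, field


@dataclass(frozen=True)
class ToyAsset:
    """Hypothetical stand-in for an asset; not Airflow's Asset class."""

    uri: str


@dataclass
class ToyCollector:
    """Hypothetical collector that records which context touched which asset."""

    inputs: list = field(default_factory=list)
    outputs: list = field(default_factory=list)

    @staticmethod
    def _build(asset_kwargs):
        # Assumption for illustration: an asset is identified by scheme + path.
        return ToyAsset(uri=f"{asset_kwargs['scheme']}://{asset_kwargs['path']}")

    def add_input_asset(self, context, asset_kwargs):
        self.inputs.append((self._build(asset_kwargs), context))

    def add_output_asset(self, context, asset_kwargs):
        self.outputs.append((self._build(asset_kwargs), context))


# A single module-level instance plays the role of the global collector.
_collector = ToyCollector()


def get_toy_collector():
    """Return the one shared collector instance."""
    return _collector


class ToyHook:
    def run(self):
        # The hook passes itself as the context, mirroring the example above.
        collector = get_toy_collector()
        collector.add_input_asset(self, asset_kwargs={"scheme": "file", "path": "/tmp/in"})
        collector.add_output_asset(self, asset_kwargs={"scheme": "file", "path": "/tmp/out"})


hook = ToyHook()
hook.run()
input_uris = [asset.uri for asset, _ in get_toy_collector().inputs]
output_uris = [asset.uri for asset, _ in get_toy_collector().outputs]
```

Because every hook writes to the same instance, anything that later reads the collector sees the assets reported by all hooks that ran in that process.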
Lineage data collected by the HookLineageCollector can be accessed using an instance of HookLineageReader, which is registered in an Airflow plugin.
from airflow.lineage.hook import HookLineageReader
from airflow.plugins_manager import AirflowPlugin


class CustomHookLineageReader(HookLineageReader):
    def get_inputs(self):
        # Input assets reported by hooks to the global collector.
        return self.lineage_collector.collected_assets.inputs


class HookLineageCollectionPlugin(AirflowPlugin):
    name = "HookLineageCollectionPlugin"
    hook_lineage_readers = [CustomHookLineageReader]
If no HookLineageReader is registered within Airflow, a default NoOpCollector is used instead. This collector does not create AIP-60 compliant assets or collect lineage information.
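The fallback described above can be sketched in plain Python, again without importing Airflow. ToyRecordingCollector, ToyNoOpCollector, and make_collector are hypothetical names chosen for this illustration, and the selection logic is an assumption about the general pattern, not a copy of Airflow's implementation. The point is that hooks can call the same methods either way, while the no-op variant simply discards what it is given.

```python
class ToyRecordingCollector:
    """Hypothetical collector that keeps what hooks report."""

    def __init__(self):
        self._inputs = []
        self._outputs = []

    def add_input_asset(self, context, asset_kwargs=None):
        self._inputs.append(asset_kwargs)

    def add_output_asset(self, context, asset_kwargs=None):
        self._outputs.append(asset_kwargs)

    @property
    def collected(self):
        return {"inputs": list(self._inputs), "outputs": list(self._outputs)}


class ToyNoOpCollector:
    """Hypothetical no-op collector: same interface, but nothing is stored."""

    def add_input_asset(self, context, asset_kwargs=None):
        pass

    def add_output_asset(self, context, asset_kwargs=None):
        pass

    @property
    def collected(self):
        return {"inputs": [], "outputs": []}


def make_collector(registered_readers):
    """Use a recording collector only when at least one reader is registered."""
    if registered_readers:
        return ToyRecordingCollector()
    return ToyNoOpCollector()


asset = {"scheme": "file", "path": "/tmp/in"}

# No readers registered: calls succeed but nothing is collected.
noop = make_collector([])
noop.add_input_asset(None, asset_kwargs=asset)

# One (placeholder) reader registered: the asset is recorded.
active = make_collector([object])
active.add_input_asset(None, asset_kwargs=asset)
```

Keeping the interface identical means hook code does not need to check whether lineage collection is enabled before reporting assets.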