Source code for airflow.providers.google.cloud.openlineage.utils
## Licensed to the Apache Software Foundation (ASF) under one# or more contributor license agreements. See the NOTICE file# distributed with this work for additional information# regarding copyright ownership. The ASF licenses this file# to you under the Apache License, Version 2.0 (the# "License"); you may not use this file except in compliance# with the License. You may obtain a copy of the License at## http://www.apache.org/licenses/LICENSE-2.0## Unless required by applicable law or agreed to in writing,# software distributed under the License is distributed on an# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY# KIND, either express or implied. See the License for the# specific language governing permissions and limitations# under the License.from__future__importannotationsfromtypingimportTYPE_CHECKING,Anyfromattrimportdefine,fieldifTYPE_CHECKING:fromgoogle.cloud.bigquery.tableimportTablefromairflow.providers.common.compat.openlineage.facetimportDatasetfromairflow.providers.common.compat.openlineage.facetimport(ColumnLineageDatasetFacet,DocumentationDatasetFacet,Fields,InputField,RunFacet,SchemaDatasetFacet,SchemaDatasetFacetFields,)fromairflow.providers.googleimport__version__asprovider_version
def get_facets_from_bq_table(table: Table) -> dict[Any, Any]:
    """
    Build dataset facets from a BigQuery table object.

    :param table: BigQuery table whose ``schema`` and ``description`` are read.
    :return: Dict with two entries: ``"schema"``, holding one schema field per
        entry of ``table.schema``, and ``"documentation"``, holding the table
        description (an empty string when the description is falsy).
    """
    schema_fields = []
    for column in table.schema:
        schema_fields.append(
            SchemaDatasetFacetFields(
                name=column.name,
                type=column.field_type,
                description=column.description,
            )
        )
    schema_facet = SchemaDatasetFacet(fields=schema_fields)
    documentation_facet = DocumentationDatasetFacet(description=table.description or "")
    return {"schema": schema_facet, "documentation": documentation_facet}
def get_identity_column_lineage_facet(
    field_names: list[str],
    input_datasets: list[Dataset],
) -> ColumnLineageDatasetFacet:
    """
    Get an identity column lineage facet.

    Simple lineage is created: every name in ``field_names`` is mapped to the
    column of the same name in each of the input datasets, and the
    transformation is marked as ``IDENTITY`` (no transformations made).

    :param field_names: Names of the columns to describe.
    :param input_datasets: Datasets the columns originate from; each one
        contributes one input field per column.
    :return: Column lineage facet keyed by column name.
    :raises ValueError: If ``field_names`` is non-empty while
        ``input_datasets`` is empty.
    """
    if field_names and not input_datasets:
        raise ValueError("When providing `field_names` You must provide at least one `input_dataset`.")

    lineage_by_column = {}
    for column in field_names:
        # One input field per upstream dataset, all pointing at the same column name.
        upstream_fields = [
            InputField(namespace=dataset.namespace, name=dataset.name, field=column)
            for dataset in input_datasets
        ]
        lineage_by_column[column] = Fields(
            inputFields=upstream_fields,
            transformationType="IDENTITY",
            transformationDescription="identical",
        )
    return ColumnLineageDatasetFacet(fields=lineage_by_column)
@define
class BigQueryJobRunFacet(RunFacet):
    """
    Run facet that carries relevant statistics of a BigQuery run.

    :param cached: BigQuery caches query results; the rest of the statistics
        will not be provided for cached queries.
    :param billedBytes: How many bytes BigQuery bills for.
    :param properties: Full property tree of the BigQuery run.
    """
# TODO: remove BigQueryErrorRunFacet in next release
@define
class BigQueryErrorRunFacet(RunFacet):
    """
    Run facet describing errors that can happen during execution of BigqueryExtractor.

    :param clientError: Errors originating in the BigQuery client.
    :param parserError: Errors that happened while parsing the SQL provided
        to BigQuery.
    """
def get_from_nullable_chain(source: Any, chain: list[str]) -> Any | None:
    """
    Get object from nested structure of objects, where not all keys are guaranteed to exist.

    Intended to replace a chain of ``dict.get()`` statements. Each key in
    ``chain`` is looked up with ``dict.get`` when the current object is a dict
    and with ``getattr`` otherwise. Before each lookup, single-element lists
    are unwrapped to their only element.

    Example usage:

    .. code-block:: python

        if (
            not job._properties.get("statistics")
            or not job._properties.get("statistics").get("query")
            or not job._properties.get("statistics").get("query").get("referencedTables")
        ):
            return None
        result = job._properties.get("statistics").get("query").get("referencedTables")

    becomes:

    .. code-block:: python

        result = get_from_nullable_chain(job._properties, ["statistics", "query", "referencedTables"])
        if not result:
            return None

    :param source: Root object (dict, object with attributes, or a
        single-element list wrapping one of those).
    :param chain: Keys / attribute names to follow, outermost first. The list
        is only read, never modified.
    :return: The value found at the end of the chain, or ``None`` if any step
        is missing.
    """
    try:
        for key in chain:
            # Unwrap (possibly nested) single-element lists before the lookup.
            while isinstance(source, list) and len(source) == 1:
                source = source[0]
            # A missing intermediate value ends the walk explicitly, instead of
            # relying on ``getattr(None, key)`` raising: that would leak real
            # NoneType attributes (e.g. ``__class__``) to the caller.
            if source is None:
                return None
            if isinstance(source, dict):
                source = source.get(key)
            else:
                source = getattr(source, key)
        return source
    except AttributeError:
        # The current object has no such attribute: treat it as a missing key.
        return None