# Licensed to the Apache Software Foundation (ASF) under one# or more contributor license agreements. See the NOTICE file# distributed with this work for additional information# regarding copyright ownership. The ASF licenses this file# to you under the Apache License, Version 2.0 (the# "License"); you may not use this file except in compliance# with the License. You may obtain a copy of the License at## http://www.apache.org/licenses/LICENSE-2.0## Unless required by applicable law or agreed to in writing,# software distributed under the License is distributed on an# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY# KIND, either express or implied. See the License for the# specific language governing permissions and limitations# under the License.fromtypingimportTYPE_CHECKING,Any,Iterator,List,Optional,Sequence,Unionfromairflow.exceptionsimportAirflowExceptionfromairflow.models.abstractoperatorimportAbstractOperatorfromairflow.models.taskmixinimportDAGNode,DependencyMixinfromairflow.models.xcomimportXCOM_RETURN_KEYfromairflow.utils.contextimportContextfromairflow.utils.edgemodifierimportEdgeModifierfromairflow.utils.sessionimportNEW_SESSION,provide_sessionfromairflow.utils.typesimportNOTSETifTYPE_CHECKING:fromsqlalchemy.ormimportSessionfromairflow.models.operatorimportOperator
class XComArg(DependencyMixin):
    """
    Class that represents an XCom push from a previous operator.

    Defaults to "return_value" as only key.

    Current implementation supports::

        xcomarg >> op
        xcomarg << op
        op >> xcomarg   (by BaseOperator code)
        op << xcomarg   (by BaseOperator code)

    **Example**: The moment you get a result from any operator (decorated or regular) you can ::

        any_op = AnyOperator()
        xcomarg = XComArg(any_op)
        # or equivalently
        xcomarg = any_op.output
        my_op = MyOperator()
        my_op >> xcomarg

    This object can be used in legacy Operators via Jinja.

    **Example**: You can make this result to be part of any generated string ::

        any_op = AnyOperator()
        xcomarg = any_op.output
        op1 = MyOperator(my_text_message=f"the value is {xcomarg}")
        op2 = MyOperator(my_text_message=f"the value is {xcomarg['topic']}")

    :param operator: operator to which the XComArg belongs to
    :param key: key value which is used for xcom_pull (key in the XCom table)
    """

    def __init__(self, operator: "Operator", key: str = XCOM_RETURN_KEY):
        self.operator = operator
        self.key = key

    def __getitem__(self, item: str) -> "XComArg":
        """Implements ``xcomresult['some_result_key']``.

        :param item: XCom key to pull instead of this arg's current key.
        :return: a new XComArg for the same operator, bound to ``item``.
        :raises ValueError: if ``item`` is not a ``str``.
        """
        if not isinstance(item, str):
            raise ValueError(f"XComArg only supports str lookup, received {type(item).__name__}")
        return XComArg(operator=self.operator, key=item)

    def __iter__(self):
        """Override iterable protocol to raise error explicitly.

        The default ``__iter__`` implementation in Python calls ``__getitem__``
        with 0, 1, 2, etc. until it hits an ``IndexError``. This does not work
        well with our custom ``__getitem__`` implementation, and results in poor
        DAG-writing experience since a misplaced ``*`` expansion would create an
        infinite loop consuming the entire DAG parser.

        This override catches the error eagerly, so an incorrectly implemented
        DAG fails fast and avoids wasting resources on nonsensical iterating.

        :raises TypeError: always.
        """
        raise TypeError(f"{self.__class__.__name__!r} object is not iterable")

    def __str__(self):
        """
        Backward compatibility for old-style jinja used in Airflow Operators

        **Example**: to use XComArg at BashOperator::

            BashOperator(bash_command=f"... { xcomarg } ...")

        :return: a Jinja expression string that calls ``task_instance.xcom_pull``
            with this arg's task id, dag id and (when set) key.
        """
        xcom_pull_kwargs = [
            f"task_ids='{self.operator.task_id}'",
            f"dag_id='{self.operator.dag.dag_id}'",
        ]
        if self.key is not None:
            xcom_pull_kwargs.append(f"key='{self.key}'")

        # Keep the list and its joined form in separate names (the original
        # rebound the list variable to a str).
        kwargs_str = ", ".join(xcom_pull_kwargs)
        # ``{{{{`` inside an f-string renders as a literal ``{{`` for Jinja.
        xcom_pull = f"{{{{ task_instance.xcom_pull({kwargs_str}) }}}}"
        return xcom_pull

    @property
    def roots(self) -> List[DAGNode]:
        """Required by TaskMixin"""
        return [self.operator]

    @property
    def leaves(self) -> List[DAGNode]:
        """Required by TaskMixin"""
        return [self.operator]

    def set_upstream(
        self,
        task_or_task_list: Union[DependencyMixin, Sequence[DependencyMixin]],
        edge_modifier: Optional[EdgeModifier] = None,
    ):
        """Proxy to underlying operator set_upstream method. Required by TaskMixin."""
        self.operator.set_upstream(task_or_task_list, edge_modifier)

    def set_downstream(
        self,
        task_or_task_list: Union[DependencyMixin, Sequence[DependencyMixin]],
        edge_modifier: Optional[EdgeModifier] = None,
    ):
        """Proxy to underlying operator set_downstream method. Required by TaskMixin."""
        self.operator.set_downstream(task_or_task_list, edge_modifier)

    @provide_session
    def resolve(self, context: Context, session: "Session" = NEW_SESSION) -> Any:
        """
        Pull XCom value for the existing arg.

        This method is run during ``op.execute()`` in the respective context.

        :param context: execution context; its ``"ti"`` entry performs the pull.
        :param session: SQLAlchemy session forwarded to ``xcom_pull``.
        :return: the pulled XCom value.
        :raises AirflowException: if no XCom is found for this task and key.
        """
        result = context["ti"].xcom_pull(
            task_ids=self.operator.task_id,
            key=str(self.key),
            default=NOTSET,
            session=session,
        )
        # A sentinel default (rather than None) lets a stored None value pass
        # through as a real result instead of being reported as missing.
        if result is NOTSET:
            raise AirflowException(
                f'XComArg result from {self.operator.task_id} at {context["ti"].dag_id} '
                f'with key="{self.key}" is not found!'
            )
        return result

    @staticmethod
    def iter_xcom_args(arg: Any) -> Iterator["XComArg"]:
        """Return XComArg instances in an arbitrary value.

        This recursively traverses ``arg`` and looks for XComArg instances in
        any collection objects, and in the template-field values of operators.

        :param arg: value to search.
        :return: iterator over every XComArg found.
        """
        if isinstance(arg, XComArg):
            yield arg
        elif isinstance(arg, (tuple, set, list)):
            for elem in arg:
                yield from XComArg.iter_xcom_args(elem)
        elif isinstance(arg, dict):
            for elem in arg.values():
                yield from XComArg.iter_xcom_args(elem)
        elif isinstance(arg, AbstractOperator):
            # ``template_fields`` holds attribute *names* (strings); the
            # XComArgs live in the attribute values, so recurse into those.
            # A declared-but-unset attribute is skipped rather than raising.
            for attr in arg.template_fields:
                yield from XComArg.iter_xcom_args(getattr(arg, attr, None))

    @staticmethod
    def apply_upstream_relationship(op: "Operator", arg: Any):
        """Set dependency for XComArgs.

        This looks for XComArg objects in ``arg`` "deeply" (looking inside
        collections objects and operators' template fields), and sets the
        relationship to ``op`` on any found.

        :param op: operator that should run downstream of every found XComArg.
        :param arg: value to search for XComArg references.
        """
        for ref in XComArg.iter_xcom_args(arg):
            op.set_upstream(ref.operator)