# Licensed to the Apache Software Foundation (ASF) under one# or more contributor license agreements. See the NOTICE file# distributed with this work for additional information# regarding copyright ownership. The ASF licenses this file# to you under the Apache License, Version 2.0 (the# "License"); you may not use this file except in compliance# with the License. You may obtain a copy of the License at## http://www.apache.org/licenses/LICENSE-2.0## Unless required by applicable law or agreed to in writing,# software distributed under the License is distributed on an# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY# KIND, either express or implied. See the License for the# specific language governing permissions and limitations# under the License.from__future__importannotationsimportcontextlibimportcopyimportdatetimeimportjsonimportloggingimportwarningsfromtypingimportTYPE_CHECKING,Any,ClassVar,ItemsView,Iterable,MutableMapping,ValuesViewfrompendulum.parsingimportparse_iso8601fromairflow.exceptionsimportAirflowException,ParamValidationError,RemovedInAirflow3Warningfromairflow.utilsimporttimezonefromairflow.utils.mixinsimportResolveMixinfromairflow.utils.typesimportNOTSET,ArgNotSetifTYPE_CHECKING:fromairflow.models.dagimportDAGfromairflow.models.dagrunimportDagRunfromairflow.models.operatorimportOperatorfromairflow.serialization.pydantic.dag_runimportDagRunPydanticfromairflow.utils.contextimportContext
[docs]classParam:""" Class to hold the default value of a Param and rule set to do the validations. Without the rule set it always validates and returns the default value. :param default: The value this Param object holds :param description: Optional help text for the Param :param schema: The validation schema of the Param, if not given then all kwargs except default & description will form the schema """
@staticmethoddef_warn_if_not_json(value):try:json.dumps(value)exceptException:warnings.warn("The use of non-json-serializable params is deprecated and will be removed in ""a future release",RemovedInAirflow3Warning,)@staticmethoddef_warn_if_not_rfc3339_dt(value):"""Fallback to iso8601 datetime validation if rfc3339 failed."""try:iso8601_value=parse_iso8601(value)exceptException:returnNoneifnotisinstance(iso8601_value,datetime.datetime):returnNonewarnings.warn(f"The use of non-RFC3339 datetime: {value!r} is deprecated ""and will be removed in a future release",RemovedInAirflow3Warning,)iftimezone.is_naive(iso8601_value):warnings.warn("The use naive datetime is deprecated and will be removed in a future release",RemovedInAirflow3Warning,)returnvalue
[docs]defresolve(self,value:Any=NOTSET,suppress_exception:bool=False)->Any:""" Run the validations and returns the Param's final value. May raise ValueError on failed validations, or TypeError if no value is passed and no value already exists. We first check that value is json-serializable; if not, warn. In future release we will require the value to be json-serializable. :param value: The value to be updated for the Param :param suppress_exception: To raise an exception or not when the validations fails. If true and validations fails, the return value would be None. """importjsonschemafromjsonschemaimportFormatCheckerfromjsonschema.exceptionsimportValidationErrorifvalueisnotNOTSET:self._warn_if_not_json(value)final_val=self.valueifvalueisNOTSETelsevalueifisinstance(final_val,ArgNotSet):ifsuppress_exception:returnNoneraiseParamValidationError("No value passed and Param has no default value")try:jsonschema.validate(final_val,self.schema,format_checker=FormatChecker())exceptValidationErroraserr:iferr.schema.get("format")=="date-time":rfc3339_value=self._warn_if_not_rfc3339_dt(final_val)ifrfc3339_value:self.value=rfc3339_valuereturnrfc3339_valueifsuppress_exception:returnNoneraiseParamValidationError(err)fromNoneself.value=final_valreturnfinal_val
[docs]defdump(self)->dict:"""Dump the Param as a dictionary."""out_dict:dict[str,str|None]={self.CLASS_IDENTIFIER:f"{self.__module__}.{self.__class__.__name__}"}out_dict.update(self.__dict__)# Ensure that not set is translated to Noneifself.valueisNOTSET:out_dict["value"]=Nonereturnout_dict
[docs]defdeserialize(data:dict[str,Any],version:int)->Param:ifversion>Param.__version__:raiseTypeError("serialized version > class version")returnParam(default=data["value"],description=data["description"],schema=data["schema"])
[docs]classParamsDict(MutableMapping[str,Any]):""" Class to hold all params for dags or tasks. All the keys are strictly string and values are converted into Param's object if they are not already. This class is to replace param's dictionary implicitly and ideally not needed to be used directly. :param dict_obj: A dict or dict like object to init ParamsDict :param suppress_exception: Flag to suppress value exceptions while initializing the ParamsDict """
[docs]def__setitem__(self,key:str,value:Any)->None:""" Override for dictionary's ``setitem`` method to ensure all values are of Param's type only. :param key: A key which needs to be inserted or updated in the dict :param value: A value which needs to be set against the key. It could be of any type but will be converted and stored as a Param object eventually. """ifisinstance(value,Param):param=valueelifkeyinself.__dict:param=self.__dict[key]try:param.resolve(value=value,suppress_exception=self.suppress_exception)exceptParamValidationErrorasve:raiseParamValidationError(f"Invalid input for param {key}: {ve}")fromNoneelse:# if the key isn't there already and if the value isn't of Param type create a new Param objectparam=Param(value)self.__dict[key]=param
[docs]def__getitem__(self,key:str)->Any:""" Override for dictionary's ``getitem`` method to call the resolve method after fetching the key. :param key: The key to fetch """param=self.__dict[key]returnparam.resolve(suppress_exception=self.suppress_exception)
[docs]defget_param(self,key:str)->Param:"""Get the internal :class:`.Param` object for this key."""returnself.__dict[key]
[docs]defdump(self)->dict[str,Any]:"""Dump the ParamsDict object as a dictionary, while suppressing exceptions."""return{k:v.resolve(suppress_exception=True)fork,vinself.items()}
[docs]defvalidate(self)->dict[str,Any]:"""Validate & returns all the Params object stored in the dictionary."""resolved_dict={}try:fork,vinself.items():resolved_dict[k]=v.resolve(suppress_exception=self.suppress_exception)exceptParamValidationErrorasve:raiseParamValidationError(f"Invalid input for param {k}: {ve}")fromNonereturnresolved_dict
[docs]defdeserialize(data:dict,version:int)->ParamsDict:ifversion>ParamsDict.__version__:raiseTypeError("serialized version > class version")returnParamsDict(data)
[docs]classDagParam(ResolveMixin):"""DAG run parameter reference. This binds a simple Param object to a name within a DAG instance, so that it can be resolved during the runtime via the ``{{ context }}`` dictionary. The ideal use case of this class is to implicitly convert args passed to a method decorated by ``@dag``. It can be used to parameterize a DAG. You can overwrite its value by setting it on conf when you trigger your DagRun. This can also be used in templates by accessing ``{{ context.params }}``. **Example**: with DAG(...) as dag: EmailOperator(subject=dag.param('subject', 'Hi from Airflow!')) :param current_dag: Dag being used for parameter. :param name: key value which is used to set the parameter :param default: Default value used if no parameter was set. """def__init__(self,current_dag:DAG,name:str,default:Any=NOTSET):ifdefaultisnotNOTSET:current_dag.params[name]=defaultself._name=nameself._default=defaultdefiter_references(self)->Iterable[tuple[Operator,str]]:return()
[docs]defresolve(self,context:Context)->Any:"""Pull DagParam value from DagRun context. This method is run during ``op.execute()``."""withcontextlib.suppress(KeyError):returncontext["dag_run"].conf[self._name]ifself._defaultisnotNOTSET:returnself._defaultwithcontextlib.suppress(KeyError):returncontext["params"][self._name]raiseAirflowException(f"No value could be resolved for parameter {self._name}")
[docs]defprocess_params(dag:DAG,task:Operator,dag_run:DagRun|DagRunPydantic|None,*,suppress_exception:bool,)->dict[str,Any]:"""Merge, validate params, and convert them into a simple dict."""fromairflow.configurationimportconfparams=ParamsDict(suppress_exception=suppress_exception)withcontextlib.suppress(AttributeError):params.update(dag.params)iftask.params:params.update(task.params)ifconf.getboolean("core","dag_run_conf_overrides_params")anddag_runanddag_run.conf:logger.debug("Updating task params (%s) with DagRun.conf (%s)",params,dag_run.conf)params.update(dag_run.conf)returnparams.validate()