## Licensed to the Apache Software Foundation (ASF) under one# or more contributor license agreements. See the NOTICE file# distributed with this work for additional information# regarding copyright ownership. The ASF licenses this file# to you under the Apache License, Version 2.0 (the# "License"); you may not use this file except in compliance# with the License. You may obtain a copy of the License at## http://www.apache.org/licenses/LICENSE-2.0## Unless required by applicable law or agreed to in writing,# software distributed under the License is distributed on an# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY# KIND, either express or implied. See the License for the# specific language governing permissions and limitations# under the License.from__future__importannotationsimportjsonimportloggingfromtypingimportAnyfromsqlalchemyimportBoolean,Column,Integer,String,Textfromsqlalchemy.dialects.mysqlimportMEDIUMTEXTfromsqlalchemy.ext.declarativeimportdeclared_attrfromsqlalchemy.ormimportSession,reconstructor,synonymfromairflow.configurationimportensure_secrets_loadedfromairflow.models.baseimportID_LEN,Basefromairflow.models.cryptoimportget_fernetfromairflow.secrets.metastoreimportMetastoreBackendfromairflow.utils.log.logging_mixinimportLoggingMixinfromairflow.utils.log.secrets_maskerimportmask_secretfromairflow.utils.sessionimportprovide_session
[docs]classVariable(Base,LoggingMixin):""" Variables are a generic way to store and retrieve arbitrary content or settings as a simple key value store within Airflow. """
[docs]def__repr__(self):# Hiding the valuereturnf"{self.key} : {self._val}"
[docs]defget_val(self):"""Get Airflow Variable from Metadata DB and decode it using the Fernet Key"""fromcryptography.fernetimportInvalidTokenasInvalidFernetTokenifself._valisnotNoneandself.is_encrypted:try:fernet=get_fernet()returnfernet.decrypt(bytes(self._val,"utf-8")).decode()exceptInvalidFernetToken:self.log.error("Can't decrypt _val for key=%s, invalid token or value",self.key)returnNoneexceptException:self.log.error("Can't decrypt _val for key=%s, FERNET_KEY configuration missing",self.key)returnNoneelse:returnself._val
[docs]defset_val(self,value):"""Encode the specified value with Fernet Key and store it in Variables Table."""ifvalueisnotNone:fernet=get_fernet()self._val=fernet.encrypt(bytes(value,"utf-8")).decode()self.is_encrypted=fernet.is_encrypted
@declared_attr
[docs]defval(cls):"""Get Airflow Variable from Metadata DB and decode it using the Fernet Key"""returnsynonym("_val",descriptor=property(cls.get_val,cls.set_val))
@classmethod
[docs]defsetdefault(cls,key,default,description=None,deserialize_json=False):""" Like a Python builtin dict object, setdefault returns the current value for a key, and if it isn't there, stores the default value and returns it. :param key: Dict key for this Variable :param default: Default value to set and return if the variable isn't already in the DB :param deserialize_json: Store this as a JSON encoded value in the DB and un-encode it when retrieving a value :return: Mixed """obj=Variable.get(key,default_var=None,deserialize_json=deserialize_json)ifobjisNone:ifdefaultisnotNone:Variable.set(key,default,description=description,serialize_json=deserialize_json)returndefaultelse:raiseValueError("Default Value must be set")else:returnobj
@classmethod
[docs]defget(cls,key:str,default_var:Any=__NO_DEFAULT_SENTINEL,deserialize_json:bool=False,)->Any:""" Gets a value for an Airflow Variable Key :param key: Variable Key :param default_var: Default value of the Variable if the Variable doesn't exist :param deserialize_json: Deserialize the value to a Python dict """var_val=Variable.get_variable_from_secrets(key=key)ifvar_valisNone:ifdefault_varisnotcls.__NO_DEFAULT_SENTINEL:returndefault_varelse:raiseKeyError(f"Variable {key} does not exist")else:ifdeserialize_json:obj=json.loads(var_val)mask_secret(var_val,key)returnobjelse:mask_secret(var_val,key)returnvar_val
@classmethod@provide_session
[docs]defset(cls,key:str,value:Any,description:str|None=None,serialize_json:bool=False,session:Session=None,):""" Sets a value for an Airflow Variable with a given Key. This operation will overwrite an existing variable. :param key: Variable Key :param value: Value to set for the Variable :param description: Description of the Variable :param serialize_json: Serialize the value to a JSON string :param session: SQL Alchemy Sessions """# check if the secret exists in the custom secrets backend.cls.check_for_write_conflict(key)ifserialize_json:stored_value=json.dumps(value,indent=2)else:stored_value=str(value)Variable.delete(key,session=session)session.add(Variable(key=key,val=stored_value,description=description))session.flush()
@classmethod@provide_session
[docs]defupdate(cls,key:str,value:Any,serialize_json:bool=False,session:Session=None,):""" Updates a given Airflow Variable with the Provided value :param key: Variable Key :param value: Value to set for the Variable :param serialize_json: Serialize the value to a JSON string :param session: SQL Alchemy Session """cls.check_for_write_conflict(key)ifcls.get_variable_from_secrets(key=key)isNone:raiseKeyError(f"Variable {key} does not exist")obj=session.query(cls).filter(cls.key==key).first()ifobjisNone:raiseAttributeError(f"Variable {key} does not exist in the Database and cannot be updated.")cls.set(key,value,description=obj.description,serialize_json=serialize_json)
@classmethod@provide_session
[docs]defdelete(cls,key:str,session:Session=None)->int:""" Delete an Airflow Variable for a given key :param key: Variable Key :param session: SQL Alchemy Sessions """returnsession.query(cls).filter(cls.key==key).delete()
[docs]defcheck_for_write_conflict(key:str)->None:""" Logs a warning if a variable exists outside of the metastore. If we try to write a variable to the metastore while the same key exists in an environment variable or custom secrets backend, then subsequent reads will not read the set value. :param key: Variable Key """forsecrets_backendinensure_secrets_loaded():ifnotisinstance(secrets_backend,MetastoreBackend):try:var_val=secrets_backend.get_variable(key=key)ifvar_valisnotNone:log.warning("The variable {key} is defined in the {cls} secrets backend, which takes ""precedence over reading from the database. The value in the database will be ""updated, but to read it you have to delete the conflicting variable ""from {cls}".format(key=key,cls=secrets_backend.__class__.__name__))returnexceptException:log.exception("Unable to retrieve variable from secrets backend (%s). ""Checking subsequent secrets backend.",type(secrets_backend).__name__,)returnNone
@staticmethod
[docs]defget_variable_from_secrets(key:str)->str|None:""" Get Airflow Variable by iterating over all Secret Backends. :param key: Variable Key :return: Variable Value """forsecrets_backendinensure_secrets_loaded():try:var_val=secrets_backend.get_variable(key=key)ifvar_valisnotNone:returnvar_valexceptException:log.exception("Unable to retrieve variable from secrets backend (%s). ""Checking subsequent secrets backend.",type(secrets_backend).__name__,)returnNone