## Licensed to the Apache Software Foundation (ASF) under one# or more contributor license agreements. See the NOTICE file# distributed with this work for additional information# regarding copyright ownership. The ASF licenses this file# to you under the Apache License, Version 2.0 (the# "License"); you may not use this file except in compliance# with the License. You may obtain a copy of the License at## http://www.apache.org/licenses/LICENSE-2.0## Unless required by applicable law or agreed to in writing,# software distributed under the License is distributed on an# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY# KIND, either express or implied. See the License for the# specific language governing permissions and limitations# under the License.#importwarningsfromtypingimportDict,Optional,Sequence,Set,Tuplefromflaskimportcurrent_app,gfromflask_appbuilder.security.sqlaimportmodelsassqla_modelsfromflask_appbuilder.security.sqla.managerimportSecurityManagerfromflask_appbuilder.security.sqla.modelsimportPermissionView,Role,Userfromsqlalchemyimportor_fromsqlalchemy.ormimportjoinedloadfromairflow.exceptionsimportAirflowExceptionfromairflow.modelsimportDagBag,DagModelfromairflow.securityimportpermissionsfromairflow.utils.log.logging_mixinimportLoggingMixinfromairflow.utils.sessionimportprovide_sessionfromairflow.www.utilsimportCustomSQLAInterfacefromairflow.www.viewsimport(CustomPermissionModelView,CustomPermissionViewModelView,CustomResetMyPasswordView,CustomResetPasswordView,CustomRoleModelView,CustomUserDBModelView,CustomUserInfoEditView,CustomUserLDAPModelView,CustomUserOAuthModelView,CustomUserOIDModelView,CustomUserRemoteUserModelView,CustomUserStatsChartView,CustomViewMenuModelView,)EXISTING_ROLES={'Admin','Viewer','User','Op','Public',}classAirflowSecurityManager(SecurityManager,LoggingMixin):# pylint: disable=too-many-public-methods"""Custom security manager, which introduces a permission model adapted to Airflow"""############################################################################ PERMISSIONS############################################################################ [START security_viewer_perms]VIEWER_PERMISSIONS=[(permissions.ACTION_CAN_READ,permissions.RESOURCE_AUDIT_LOG),(permissions.ACTION_CAN_READ,permissions.RESOURCE_DAG),(permissions.ACTION_CAN_READ,permissions.RESOURCE_DAG_DEPENDENCIES),(permissions.ACTION_CAN_READ,permissions.RESOURCE_DAG_CODE),(permissions.ACTION_CAN_READ,permissions.RESOURCE_DAG_RUN),(permissions.ACTION_CAN_READ,permissions.RESOURCE_IMPORT_ERROR),(permissions.ACTION_CAN_READ,permissions.RESOURCE_JOB),(permissions.ACTION_CAN_READ,permissions.RESOURCE_MY_PASSWORD),(permissions.ACTION_CAN_EDIT,permissions.RESOURCE_MY_PASSWORD),(permissions.ACTION_CAN_READ,permissions.RESOURCE_MY_PROFILE),(permissions.ACTION_CAN_EDIT,permissions.RESOURCE_MY_PROFILE),(permissions.ACTION_CAN_READ,permissions.RESOURCE_PLUGIN),(permissions.ACTION_CAN_READ,permissions.RESOURCE_SLA_MISS),(permissions.ACTION_CAN_READ,permissions.RESOURCE_TASK_INSTANCE),(permissions.ACTION_CAN_READ,permissions.RESOURCE_TASK_LOG),(permissions.ACTION_CAN_READ,permissions.RESOURCE_XCOM),(permissions.ACTION_CAN_READ,permissions.RESOURCE_WEBSITE),(permissions.ACTION_CAN_ACCESS_MENU,permissions.RESOURCE_BROWSE_MENU),(permissions.ACTION_CAN_ACCESS_MENU,permissions.RESOURCE_DAG_RUN),(permissions.ACTION_CAN_ACCESS_MENU,permissions.RESOURCE_DOCS),(permissions.ACTION_CAN_ACCESS_MENU,permissions.RESOURCE_DOCS_MENU),(permissions.ACTION_CAN_ACCESS_MENU,permissions.RESOURCE_JOB),(permissions.ACTION_CAN_ACCESS_MENU,permissions.RESOURCE_AUDIT_LOG),(permissions.ACTION_CAN_ACCESS_MENU,permissions.RESOURCE_PLUGIN),(permissions.ACTION_CAN_ACCESS_MENU,permissions.RESOURCE_SLA_MISS),(permissions.ACTION_CAN_ACCESS_MENU,permissions.RESOURCE_TASK_INSTANCE),]# [END security_viewer_perms]# [START security_user_perms]USER_PERMISSIONS=[(permissions.ACTION_CAN_EDIT,permissions.RESOURCE_DAG),(permissions.ACTION_CAN_DELETE,permissions.RESOURCE_DAG),(permissions.ACTION_CAN_CREATE,permissions.RESOURCE_TASK_INSTANCE),(permissions.ACTION_CAN_EDIT,permissions.RESOURCE_TASK_INSTANCE),(permissions.ACTION_CAN_DELETE,permissions.RESOURCE_TASK_INSTANCE),(permissions.ACTION_CAN_CREATE,permissions.RESOURCE_DAG_RUN),(permissions.ACTION_CAN_EDIT,permissions.RESOURCE_DAG_RUN),(permissions.ACTION_CAN_DELETE,permissions.RESOURCE_DAG_RUN),]# [END security_user_perms]# [START security_op_perms]OP_PERMISSIONS=[(permissions.ACTION_CAN_READ,permissions.RESOURCE_CONFIG),(permissions.ACTION_CAN_ACCESS_MENU,permissions.RESOURCE_ADMIN_MENU),(permissions.ACTION_CAN_ACCESS_MENU,permissions.RESOURCE_CONNECTION),(permissions.ACTION_CAN_ACCESS_MENU,permissions.RESOURCE_POOL),(permissions.ACTION_CAN_ACCESS_MENU,permissions.RESOURCE_VARIABLE),(permissions.ACTION_CAN_ACCESS_MENU,permissions.RESOURCE_XCOM),(permissions.ACTION_CAN_CREATE,permissions.RESOURCE_CONNECTION),(permissions.ACTION_CAN_READ,permissions.RESOURCE_CONNECTION),(permissions.ACTION_CAN_EDIT,permissions.RESOURCE_CONNECTION),(permissions.ACTION_CAN_DELETE,permissions.RESOURCE_CONNECTION),(permissions.ACTION_CAN_CREATE,permissions.RESOURCE_POOL),(permissions.ACTION_CAN_READ,permissions.RESOURCE_POOL),(permissions.ACTION_CAN_EDIT,permissions.RESOURCE_POOL),(permissions.ACTION_CAN_DELETE,permissions.RESOURCE_POOL),(permissions.ACTION_CAN_READ,permissions.RESOURCE_PROVIDER),(permissions.ACTION_CAN_CREATE,permissions.RESOURCE_VARIABLE),(permissions.ACTION_CAN_READ,permissions.RESOURCE_VARIABLE),(permissions.ACTION_CAN_EDIT,permissions.RESOURCE_VARIABLE),(permissions.ACTION_CAN_DELETE,permissions.RESOURCE_VARIABLE),(permissions.ACTION_CAN_DELETE,permissions.RESOURCE_XCOM),]# [END security_op_perms]ADMIN_PERMISSIONS=[(permissions.ACTION_CAN_READ,permissions.RESOURCE_TASK_RESCHEDULE),(permissions.ACTION_CAN_ACCESS_MENU,permissions.RESOURCE_TASK_RESCHEDULE),(permissions.ACTION_CAN_READ,permissions.RESOURCE_PASSWORD),(permissions.ACTION_CAN_EDIT,permissions.RESOURCE_PASSWORD),]# global view-menu for dag-level accessDAG_VMS={permissions.RESOURCE_DAG}READ_DAG_PERMS={permissions.ACTION_CAN_READ}DAG_PERMS=permissions.DAG_PERMS############################################################################ DEFAULT ROLE CONFIGURATIONS###########################################################################ROLE_CONFIGS=[{'role':'Public','perms':[]},{'role':'Viewer','perms':VIEWER_PERMISSIONS},{'role':'User','perms':VIEWER_PERMISSIONS+USER_PERMISSIONS,},{'role':'Op','perms':VIEWER_PERMISSIONS+USER_PERMISSIONS+OP_PERMISSIONS,},{'role':'Admin','perms':VIEWER_PERMISSIONS+USER_PERMISSIONS+OP_PERMISSIONS+ADMIN_PERMISSIONS,},]permissionmodelview=CustomPermissionModelViewpermissionviewmodelview=CustomPermissionViewModelViewrolemodelview=CustomRoleModelViewviewmenumodelview=CustomViewMenuModelViewuserdbmodelview=CustomUserDBModelViewresetmypasswordview=CustomResetMyPasswordViewresetpasswordview=CustomResetPasswordViewuserinfoeditview=CustomUserInfoEditViewuserldapmodelview=CustomUserLDAPModelViewuseroauthmodelview=CustomUserOAuthModelViewuserremoteusermodelview=CustomUserRemoteUserModelViewuseroidmodelview=CustomUserOIDModelViewuserstatschartview=CustomUserStatsChartViewdef__init__(self,appbuilder):super().__init__(appbuilder)# Go and fix up the SQLAInterface used from the stock one to our subclass.# This is needed to support the "hack" where we had to edit# FieldConverter.conversion_table in place in airflow.www.utilsforattrindir(self):ifnotattr.endswith('view'):continueview=getattr(self,attr,None)ifnotviewornotgetattr(view,'datamodel',None):continueview.datamodel=CustomSQLAInterface(view.datamodel.obj)self.perms=Nonedefinit_role(self,role_name,perms):""" Initialize the role with the permissions and related view-menus. :param role_name: :param perms: :return: """warnings.warn("`init_role` has been deprecated. Please use `bulk_sync_roles` instead.",DeprecationWarning,stacklevel=2,)self.bulk_sync_roles([{'role':role_name,'perms':perms}])defbulk_sync_roles(self,roles):"""Sync the provided roles and permissions."""existing_roles=self._get_all_roles_with_permissions()pvs=self._get_all_non_dag_permissionviews()forconfiginroles:role_name=config['role']perms=config['perms']role=existing_roles.get(role_name)orself.add_role(role_name)forperm_name,view_nameinperms:perm_view=pvs.get((perm_name,view_name))orself.add_permission_view_menu(perm_name,view_name)ifperm_viewnotinrole.permissions:self.add_permission_role(role,perm_view)defadd_permissions(self,role,perms):"""Adds resource permissions to a given role."""forperm_name,view_nameinperms:perm_view=self.add_permission_view_menu(perm_name,view_name)self.add_permission_role(role,perm_view)defdelete_role(self,role_name):""" Delete the given Role :param role_name: the name of a role in the ab_role table """session=self.get_sessionrole=session.query(sqla_models.Role).filter(sqla_models.Role.name==role_name).first()ifrole:self.log.info("Deleting role '%s'",role_name)session.delete(role)session.commit()else:raiseAirflowException(f"Role named '{role_name}' does not exist")@staticmethoddefget_user_roles(user=None):""" Get all the roles associated with the user. :param user: the ab_user in FAB model. :return: a list of roles associated with the user. """ifuserisNone:user=g.userifuser.is_anonymous:public_role=current_app.appbuilder.get_app.config["AUTH_ROLE_PUBLIC"]return[current_app.appbuilder.sm.find_role(public_role)]ifpublic_roleelse[]returnuser.rolesdefget_current_user_permissions(self):"""Returns permissions for logged in user as a set of tuples with the perm name and view menu name"""perms_views=set()forroleinself.get_user_roles():perms_views.update({(perm_view.permission.name,perm_view.view_menu.name)forperm_viewinrole.permissions})returnperms_viewsdefget_readable_dags(self,user):"""Gets the DAGs readable by authenticated user."""returnself.get_accessible_dags([permissions.ACTION_CAN_READ],user)defget_editable_dags(self,user):"""Gets the DAGs editable by authenticated user."""returnself.get_accessible_dags([permissions.ACTION_CAN_EDIT],user)defget_readable_dag_ids(self,user)->Set[str]:"""Gets the DAG IDs readable by authenticated user."""return{dag.dag_idfordaginself.get_readable_dags(user)}defget_editable_dag_ids(self,user)->Set[str]:"""Gets the DAG IDs editable by authenticated user."""return{dag.dag_idfordaginself.get_editable_dags(user)}defget_accessible_dag_ids(self,user)->Set[str]:"""Gets the DAG IDs editable or readable by authenticated user."""accessible_dags=self.get_accessible_dags([permissions.ACTION_CAN_EDIT,permissions.ACTION_CAN_READ],user)return{dag.dag_idfordaginaccessible_dags}@provide_sessiondefget_accessible_dags(self,user_actions,user,session=None):"""Generic function to get readable or writable DAGs for user."""ifuser.is_anonymous:roles=self.get_user_roles(user)else:user_query=(session.query(User).options(joinedload(User.roles).subqueryload(Role.permissions).options(joinedload(PermissionView.permission),joinedload(PermissionView.view_menu))).filter(User.id==user.id).first())roles=user_query.rolesresources=set()forroleinroles:forpermissioninrole.permissions:action=permission.permission.nameifactionnotinuser_actions:continueresource=permission.view_menu.nameifresource==permissions.RESOURCE_DAG:returnsession.query(DagModel)ifresource.startswith(permissions.RESOURCE_DAG_PREFIX):resources.add(resource[len(permissions.RESOURCE_DAG_PREFIX):])else:resources.add(resource)returnsession.query(DagModel).filter(DagModel.dag_id.in_(resources))defcan_access_some_dags(self,action:str,dag_id:Optional[str]=None)->bool:"""Checks if user has read or write access to some dags."""ifdag_idanddag_id!='~':returnself.has_access(action,permissions.resource_name_for_dag(dag_id))user=g.userifaction==permissions.ACTION_CAN_READ:returnany(self.get_readable_dags(user))returnany(self.get_editable_dags(user))defcan_read_dag(self,dag_id,user=None)->bool:"""Determines whether a user has DAG read access."""ifnotuser:user=g.userdag_resource_name=permissions.resource_name_for_dag(dag_id)returnself._has_view_access(user,permissions.ACTION_CAN_READ,permissions.RESOURCE_DAG)orself._has_view_access(user,permissions.ACTION_CAN_READ,dag_resource_name)defcan_edit_dag(self,dag_id,user=None)->bool:"""Determines whether a user has DAG edit access."""ifnotuser:user=g.userdag_resource_name=permissions.resource_name_for_dag(dag_id)returnself._has_view_access(user,permissions.ACTION_CAN_EDIT,permissions.RESOURCE_DAG)orself._has_view_access(user,permissions.ACTION_CAN_EDIT,dag_resource_name)defprefixed_dag_id(self,dag_id):"""Returns the permission name for a DAG id."""warnings.warn("`prefixed_dag_id` has been deprecated. ""Please use `airflow.security.permissions.resource_name_for_dag` instead.",DeprecationWarning,stacklevel=2,)returnpermissions.resource_name_for_dag(dag_id)defis_dag_resource(self,resource_name):"""Determines if a permission belongs to a DAG or all DAGs."""ifresource_name==permissions.RESOURCE_DAG:returnTruereturnresource_name.startswith(permissions.RESOURCE_DAG_PREFIX)defhas_access(self,permission,resource,user=None)->bool:""" Verify whether a given user could perform certain permission (e.g can_read, can_write) on the given resource. :param permission: permission on resource (e.g can_read, can_edit). :type permission: str :param resource: name of view-menu or resource. :type resource: str :param user: user name :type user: str :return: a bool whether user could perform certain permission on the resource. :rtype bool """ifnotuser:user=g.userifuser.is_anonymous:user.roles=self.get_user_roles(user)has_access=self._has_view_access(user,permission,resource)# FAB built-in view access method. Won't work for AllDag access.ifself.is_dag_resource(resource):ifpermission==permissions.ACTION_CAN_READ:has_access|=self.can_read_dag(resource,user)elifpermission==permissions.ACTION_CAN_EDIT:has_access|=self.can_edit_dag(resource,user)returnhas_accessdef_get_and_cache_perms(self):"""Cache permissions-views"""self.perms=self.get_current_user_permissions()def_has_role(self,role_name_or_list):"""Whether the user has this role name"""ifnotisinstance(role_name_or_list,list):role_name_or_list=[role_name_or_list]returnany(r.nameinrole_name_or_listforrinself.get_user_roles())def_has_perm(self,permission_name,view_menu_name):"""Whether the user has this perm"""ifhasattr(self,'perms')andself.permsisnotNone:if(permission_name,view_menu_name)inself.perms:returnTrue# rebuild the permissions setself._get_and_cache_perms()return(permission_name,view_menu_name)inself.permsdefhas_all_dags_access(self):""" Has all the dag access in any of the 3 cases: 1. Role needs to be in (Admin, Viewer, User, Op). 2. Has can_read permission on dags view. 3. Has can_edit permission on dags view. """return(self._has_role(['Admin','Viewer','Op','User'])orself._has_perm(permissions.ACTION_CAN_READ,permissions.RESOURCE_DAG)orself._has_perm(permissions.ACTION_CAN_EDIT,permissions.RESOURCE_DAG))defclean_perms(self):"""FAB leaves faulty permissions that need to be cleaned up"""self.log.debug('Cleaning faulty perms')sesh=self.get_sessionpvms=sesh.query(sqla_models.PermissionView).filter(or_(sqla_models.PermissionView.permission==None,# noqa pylint: disable=singleton-comparisonsqla_models.PermissionView.view_menu==None,# noqa pylint: disable=singleton-comparison))# Since FAB doesn't define ON DELETE CASCADE on these tables, we need# to delete the _object_ so that SQLA knows to delete the many-to-many# relationship object too. :(deleted_count=0forpvminpvms:sesh.delete(pvm)deleted_count+=1sesh.commit()ifdeleted_count:self.log.info('Deleted %s faulty permissions',deleted_count)def_merge_perm(self,permission_name,view_menu_name):""" Add the new (permission, view_menu) to assoc_permissionview_role if it doesn't exist. It will add the related entry to ab_permission and ab_view_menu two meta tables as well. :param permission_name: Name of the permission. :type permission_name: str :param view_menu_name: Name of the view-menu :type view_menu_name: str :return: """permission=self.find_permission(permission_name)view_menu=self.find_view_menu(view_menu_name)permission_view=Noneifpermissionandview_menu:permission_view=(self.get_session.query(self.permissionview_model).filter_by(permission=permission,view_menu=view_menu).first())ifnotpermission_viewandpermission_nameandview_menu_name:self.add_permission_view_menu(permission_name,view_menu_name)defadd_homepage_access_to_custom_roles(self):""" Add Website.can_read access to all custom roles. :return: None. """website_permission=self.add_permission_view_menu(permissions.ACTION_CAN_READ,permissions.RESOURCE_WEBSITE)custom_roles=[roleforroleinself.get_all_roles()ifrole.namenotinEXISTING_ROLES]forroleincustom_roles:self.add_permission_role(role,website_permission)self.get_session.commit()defget_all_permissions(self)->Set[Tuple[str,str]]:"""Returns all permissions as a set of tuples with the perm name and view menu name"""returnset(self.get_session.query(self.permissionview_model).join(self.permission_model).join(self.viewmenu_model).with_entities(self.permission_model.name,self.viewmenu_model.name).all())def_get_all_non_dag_permissionviews(self)->Dict[Tuple[str,str],PermissionView]:""" Returns a dict with a key of (perm name, view menu name) and value of perm view with all perm views except those that are for specific DAGs. """return{(perm_name,viewmodel_name):viewmodelforperm_name,viewmodel_name,viewmodelin(self.get_session.query(self.permissionview_model).join(self.permission_model).join(self.viewmenu_model).filter(~self.viewmenu_model.name.like(f"{permissions.RESOURCE_DAG_PREFIX}%")).with_entities(self.permission_model.name,self.viewmenu_model.name,self.permissionview_model).all())}def_get_all_roles_with_permissions(self)->Dict[str,Role]:"""Returns a dict with a key of role name and value of role with eagrly loaded permissions"""return{r.name:rforrin(self.get_session.query(self.role_model).options(joinedload(self.role_model.permissions)).all())}defcreate_dag_specific_permissions(self)->None:""" Creates 'can_read' and 'can_edit' permissions for all DAGs, along with any `access_control` permissions provided in them. This does iterate through ALL the DAGs, which can be slow. See `sync_perm_for_dag` if you only need to sync a single DAG. :return: None. """perms=self.get_all_permissions()dagbag=DagBag(read_dags_from_db=True)dagbag.collect_dags_from_db()dags=dagbag.dags.values()fordagindags:dag_resource_name=permissions.resource_name_for_dag(dag.dag_id)forperm_nameinself.DAG_PERMS:if(perm_name,dag_resource_name)notinperms:self._merge_perm(perm_name,dag_resource_name)ifdag.access_control:self._sync_dag_view_permissions(dag_resource_name,dag.access_control)defupdate_admin_perm_view(self):""" Admin should have all the permission-views, except the dag views. because Admin already has Dags permission. Add the missing ones to the table for admin. :return: None. """dag_pvs=(self.get_session.query(sqla_models.ViewMenu).filter(sqla_models.ViewMenu.name.like(f"{permissions.RESOURCE_DAG_PREFIX}%")).all())pv_ids=[pv.idforpvindag_pvs]pvms=(self.get_session.query(sqla_models.PermissionView).filter(~sqla_models.PermissionView.view_menu_id.in_(pv_ids)).all())pvms=[pforpinpvmsifp.permissionandp.view_menu]admin=self.find_role('Admin')admin.permissions=list(set(admin.permissions)|set(pvms))self.get_session.commit()defsync_roles(self):""" 1. Init the default role(Admin, Viewer, User, Op, public) with related permissions. 2. Init the custom role(dag-user) with related permissions. :return: None. """# Create global all-dag VMself.create_perm_vm_for_all_dag()# Sync the default roles (Admin, Viewer, User, Op, public) with related permissionsself.bulk_sync_roles(self.ROLE_CONFIGS)self.add_homepage_access_to_custom_roles()# init existing roles, the rest role could be created through UI.self.update_admin_perm_view()self.clean_perms()defsync_resource_permissions(self,perms=None):"""Populates resource-based permissions."""ifnotperms:returnforaction,resourceinperms:self.add_view_menu(resource)self.add_permission_view_menu(action,resource)defsync_perm_for_dag(self,dag_id,access_control=None):""" Sync permissions for given dag id. The dag id surely exists in our dag bag as only / refresh button or DagBag will call this function :param dag_id: the ID of the DAG whose permissions should be updated :type dag_id: str :param access_control: a dict where each key is a rolename and each value is a set() of permission names (e.g., {'can_read'} :type access_control: dict :return: """dag_resource_name=permissions.resource_name_for_dag(dag_id)fordag_perminself.DAG_PERMS:self.add_permission_view_menu(dag_perm,dag_resource_name)ifaccess_control:self._sync_dag_view_permissions(dag_resource_name,access_control)def_sync_dag_view_permissions(self,dag_id,access_control):"""Set the access policy on the given DAG's ViewModel. :param dag_id: the ID of the DAG whose permissions should be updated :type dag_id: str :param access_control: a dict where each key is a rolename and each value is a set() of permission names (e.g., {'can_read'} :type access_control: dict """dag_resource_name=permissions.resource_name_for_dag(dag_id)def_get_or_create_dag_permission(perm_name):dag_perm=self.find_permission_view_menu(perm_name,dag_resource_name)ifnotdag_perm:self.log.info("Creating new permission '%s' on view '%s'",perm_name,dag_resource_name)dag_perm=self.add_permission_view_menu(perm_name,dag_resource_name)returndag_permdef_revoke_stale_permissions(dag_view):existing_dag_perms=self.find_permissions_view_menu(dag_view)forperminexisting_dag_perms:non_admin_roles=[roleforroleinperm.roleifrole.name!='Admin']forroleinnon_admin_roles:target_perms_for_role=access_control.get(role.name,{})ifperm.permission.namenotintarget_perms_for_role:self.log.info("Revoking '%s' on DAG '%s' for role '%s'",perm.permission,dag_resource_name,role.name,)self.del_permission_role(role,perm)dag_view=self.find_view_menu(dag_resource_name)ifdag_view:_revoke_stale_permissions(dag_view)forrolename,permsinaccess_control.items():role=self.find_role(rolename)ifnotrole:raiseAirflowException("The access_control mapping for DAG '{}' includes a role ""named '{}', but that role does not exist".format(dag_id,rolename))perms=set(perms)invalid_perms=perms-self.DAG_PERMSifinvalid_perms:raiseAirflowException("The access_control map for DAG '{}' includes the following ""invalid permissions: {}; The set of valid permissions ""is: {}".format(dag_resource_name,invalid_perms,self.DAG_PERMS))forperm_nameinperms:dag_perm=_get_or_create_dag_permission(perm_name)self.add_permission_role(role,dag_perm)defcreate_perm_vm_for_all_dag(self):"""Create perm-vm if not exist and insert into FAB security model for all-dags."""# create perm for global logical dagfordag_vminself.DAG_VMS:forperminself.DAG_PERMS:self._merge_perm(permission_name=perm,view_menu_name=dag_vm)defcheck_authorization(self,perms:Optional[Sequence[Tuple[str,str]]]=None,dag_id:Optional[str]=None)->bool:"""Checks that the logged in user has the specified permissions."""ifnotperms:returnTrueforperminperms:ifpermin((permissions.ACTION_CAN_READ,permissions.RESOURCE_DAG),(permissions.ACTION_CAN_EDIT,permissions.RESOURCE_DAG),):can_access_all_dags=self.has_access(*perm)ifcan_access_all_dags:continueaction=perm[0]ifself.can_access_some_dags(action,dag_id):continuereturnFalseelifnotself.has_access(*perm):returnFalsereturnTrueclassApplessAirflowSecurityManager(AirflowSecurityManager):"""Security Manager that doesn't need the whole flask app"""def__init__(self,session=None):# pylint: disable=super-init-not-calledself.session=session@propertydefget_session(self):returnself.session