Source code for airflow.providers.fab.auth_manager.models
# Licensed to the Apache Software Foundation (ASF) under one# or more contributor license agreements. See the NOTICE file# distributed with this work for additional information# regarding copyright ownership. The ASF licenses this file# to you under the Apache License, Version 2.0 (the# "License"); you may not use this file except in compliance# with the License. You may obtain a copy of the License at## http://www.apache.org/licenses/LICENSE-2.0## Unless required by applicable law or agreed to in writing,# software distributed under the License is distributed on an# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY# KIND, either express or implied. See the License for the# specific language governing permissions and limitations# under the License.from__future__importannotationsimportdatetime# This product contains a modified portion of 'Flask App Builder' developed by Daniel Vaz Gaspar.# (https://github.com/dpgaspar/Flask-AppBuilder).# Copyright 2013, Daniel Vaz GasparfromtypingimportTYPE_CHECKINGimportpackaging.versionfromflaskimportcurrent_app,gfromflask_appbuilder.models.sqlaimportModelfromsqlalchemyimport(Boolean,Column,DateTime,ForeignKey,Index,Integer,MetaData,String,Table,UniqueConstraint,event,func,select,)fromsqlalchemy.ormimportbackref,declared_attr,registry,relationshipfromairflowimport__version__asairflow_versionfromairflow.auth.managers.models.base_userimportBaseUserfromairflow.models.baseimport_get_schema,naming_conventionifTYPE_CHECKING:try:fromsqlalchemyimportIdentityexceptException:
def perms(self):
    """Return the (action name, resource name) permission pairs for this user.

    The result is memoised on ``self._perms`` and returned directly whenever
    that attribute is truthy. Otherwise it is rebuilt, either with a single
    SELECT through the security manager (when a Flask ``current_app`` is
    available) or by walking ``self.roles`` and each role's ``permissions``.
    """
    # Fast path: a previously computed, non-empty result is reused as-is.
    if self._perms:
        return self._perms

    if current_app:
        # Going through the ORM here is slow (it creates lots of objects only
        # to throw them away) and this sits on the path of every request, so
        # select just the two name columns instead.
        security_manager = current_app.appbuilder.sm
        session = security_manager.get_session
        permission = security_manager.permission_model
        name_pairs_query = (
            select(security_manager.action_model.name, security_manager.resource_model.name)
            .join(permission.action)
            .join(permission.resource)
            .join(permission.role)
            .where(security_manager.role_model.user.contains(self))
        )
        granted: set[tuple[str, str]] = set(session.execute(name_pairs_query))
    else:
        # No application available: fall back to the loaded relationships.
        granted = set()
        for role in self.roles:
            for perm in role.permissions:
                granted.add((perm.action.name, perm.resource.name))

    self._perms = granted
    return self._perms