#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations
import argparse
import warnings
from functools import cached_property
from pathlib import Path
from typing import TYPE_CHECKING, Container
import packaging.version
from connexion import FlaskApi
from flask import Blueprint, url_for
from packaging.version import Version
from sqlalchemy import select
from sqlalchemy.orm import Session, joinedload
from airflow import __version__ as airflow_version
from airflow.auth.managers.base_auth_manager import BaseAuthManager, ResourceMethod
from airflow.auth.managers.models.resource_details import (
AccessView,
ConfigurationDetails,
ConnectionDetails,
DagAccessEntity,
DagDetails,
PoolDetails,
VariableDetails,
)
from airflow.auth.managers.utils.fab import get_fab_action_from_method_map, get_method_from_fab_action_map
from airflow.cli.cli_config import (
DefaultHelpParser,
GroupCommand,
)
from airflow.configuration import conf
from airflow.exceptions import AirflowConfigException, AirflowException, AirflowProviderDeprecationWarning
from airflow.models import DagModel
from airflow.providers.fab.auth_manager.cli_commands.definition import (
DB_COMMANDS,
ROLES_COMMANDS,
SYNC_PERM_COMMAND,
USERS_COMMANDS,
)
from airflow.providers.fab.auth_manager.models import Permission, Role, User
from airflow.security import permissions
from airflow.security.permissions import (
RESOURCE_AUDIT_LOG,
RESOURCE_CLUSTER_ACTIVITY,
RESOURCE_CONFIG,
RESOURCE_CONNECTION,
RESOURCE_DAG,
RESOURCE_DAG_CODE,
RESOURCE_DAG_DEPENDENCIES,
RESOURCE_DAG_RUN,
RESOURCE_DAG_WARNING,
RESOURCE_DOCS,
RESOURCE_IMPORT_ERROR,
RESOURCE_JOB,
RESOURCE_PLUGIN,
RESOURCE_POOL,
RESOURCE_PROVIDER,
RESOURCE_SLA_MISS,
RESOURCE_TASK_INSTANCE,
RESOURCE_TASK_LOG,
RESOURCE_TASK_RESCHEDULE,
RESOURCE_TRIGGER,
RESOURCE_VARIABLE,
RESOURCE_WEBSITE,
RESOURCE_XCOM,
)
from airflow.utils.session import NEW_SESSION, provide_session
from airflow.utils.yaml import safe_load
from airflow.version import version
from airflow.www.constants import SWAGGER_BUNDLE, SWAGGER_ENABLED
from airflow.www.extensions.init_views import _CustomErrorRequestBodyValidator, _LazyResolver
if TYPE_CHECKING:
from airflow.auth.managers.models.base_user import BaseUser
from airflow.cli.cli_config import (
CLICommand,
)
from airflow.providers.common.compat.assets import AssetDetails
from airflow.providers.fab.auth_manager.security_manager.override import FabAirflowSecurityManagerOverride
from airflow.security.permissions import RESOURCE_ASSET
else:
from airflow.providers.common.compat.security.permissions import RESOURCE_ASSET
_MAP_DAG_ACCESS_ENTITY_TO_FAB_RESOURCE_TYPE: dict[DagAccessEntity, tuple[str, ...]] = {
DagAccessEntity.AUDIT_LOG: (RESOURCE_AUDIT_LOG,),
DagAccessEntity.CODE: (RESOURCE_DAG_CODE,),
DagAccessEntity.DEPENDENCIES: (RESOURCE_DAG_DEPENDENCIES,),
DagAccessEntity.RUN: (RESOURCE_DAG_RUN,),
DagAccessEntity.SLA_MISS: (RESOURCE_SLA_MISS,),
# RESOURCE_TASK_INSTANCE has been originally misused. RESOURCE_TASK_INSTANCE referred to task definition
# AND task instances without making the difference
# To be backward compatible, we translate DagAccessEntity.TASK_INSTANCE to RESOURCE_TASK_INSTANCE AND
# RESOURCE_DAG_RUN
# See https://github.com/apache/airflow/pull/34317#discussion_r1355917769
DagAccessEntity.TASK: (RESOURCE_TASK_INSTANCE,),
DagAccessEntity.TASK_INSTANCE: (RESOURCE_DAG_RUN, RESOURCE_TASK_INSTANCE),
DagAccessEntity.TASK_LOGS: (RESOURCE_TASK_LOG,),
DagAccessEntity.TASK_RESCHEDULE: (RESOURCE_TASK_RESCHEDULE,),
DagAccessEntity.WARNING: (RESOURCE_DAG_WARNING,),
DagAccessEntity.XCOM: (RESOURCE_XCOM,),
}
_MAP_ACCESS_VIEW_TO_FAB_RESOURCE_TYPE = {
AccessView.CLUSTER_ACTIVITY: RESOURCE_CLUSTER_ACTIVITY,
AccessView.DOCS: RESOURCE_DOCS,
AccessView.IMPORT_ERRORS: RESOURCE_IMPORT_ERROR,
AccessView.JOBS: RESOURCE_JOB,
AccessView.PLUGINS: RESOURCE_PLUGIN,
AccessView.PROVIDERS: RESOURCE_PROVIDER,
AccessView.TRIGGERS: RESOURCE_TRIGGER,
AccessView.WEBSITE: RESOURCE_WEBSITE,
}
[docs]class FabAuthManager(BaseAuthManager):
"""
Flask-AppBuilder auth manager.
This auth manager is responsible for providing a backward compatible user management experience to users.
"""
@staticmethod
[docs] def get_cli_commands() -> list[CLICommand]:
"""Vends CLI commands to be included in Airflow CLI."""
commands: list[CLICommand] = [
GroupCommand(
name="users",
help="Manage users",
subcommands=USERS_COMMANDS,
),
GroupCommand(
name="roles",
help="Manage roles",
subcommands=ROLES_COMMANDS,
),
SYNC_PERM_COMMAND, # not in a command group
]
# If Airflow version is 3.0.0 or higher, add the fab-db command group
if packaging.version.parse(
packaging.version.parse(airflow_version).base_version
) >= packaging.version.parse("3.0.0"):
commands.append(GroupCommand(name="fab-db", help="Manage FAB", subcommands=DB_COMMANDS))
return commands
[docs] def get_api_endpoints(self) -> None | Blueprint:
folder = Path(__file__).parents[0].resolve() # this is airflow/auth/managers/fab/
with folder.joinpath("openapi", "v1.yaml").open() as f:
specification = safe_load(f)
return FlaskApi(
specification=specification,
resolver=_LazyResolver(),
base_path="/auth/fab/v1",
options={"swagger_ui": SWAGGER_ENABLED, "swagger_path": SWAGGER_BUNDLE.__fspath__()},
strict_validation=True,
validate_responses=True,
validator_map={"body": _CustomErrorRequestBodyValidator},
).blueprint
[docs] def get_user_display_name(self) -> str:
"""Return the user's display name associated to the user in session."""
user = self.get_user()
first_name = user.first_name.strip() if isinstance(user.first_name, str) else ""
last_name = user.last_name.strip() if isinstance(user.last_name, str) else ""
return f"{first_name} {last_name}".strip()
[docs] def get_user(self) -> User:
"""Return the user associated to the user in session."""
from flask_login import current_user
return current_user
[docs] def init(self) -> None:
"""Run operations when Airflow is initializing."""
self._sync_appbuilder_roles()
[docs] def is_logged_in(self) -> bool:
"""Return whether the user is logged in."""
user = self.get_user()
if Version(Version(version).base_version) < Version("3.0.0"):
return not user.is_anonymous and user.is_active
else:
return self.appbuilder.get_app.config.get("AUTH_ROLE_PUBLIC", None) or (
not user.is_anonymous and user.is_active
)
[docs] def is_authorized_configuration(
self,
*,
method: ResourceMethod,
details: ConfigurationDetails | None = None,
user: BaseUser | None = None,
) -> bool:
return self._is_authorized(method=method, resource_type=RESOURCE_CONFIG, user=user)
[docs] def is_authorized_connection(
self,
*,
method: ResourceMethod,
details: ConnectionDetails | None = None,
user: BaseUser | None = None,
) -> bool:
return self._is_authorized(method=method, resource_type=RESOURCE_CONNECTION, user=user)
[docs] def is_authorized_dag(
self,
*,
method: ResourceMethod,
access_entity: DagAccessEntity | None = None,
details: DagDetails | None = None,
user: BaseUser | None = None,
) -> bool:
"""
Return whether the user is authorized to access the dag.
There are multiple scenarios:
1. ``dag_access`` is not provided which means the user wants to access the DAG itself and not a sub
entity (e.g. DAG runs).
2. ``dag_access`` is provided which means the user wants to access a sub entity of the DAG
(e.g. DAG runs).
a. If ``method`` is GET, then check the user has READ permissions on the DAG and the sub entity.
b. Else, check the user has EDIT permissions on the DAG and ``method`` on the sub entity. However,
if no specific DAG is targeted, just check the sub entity.
:param method: The method to authorize.
:param access_entity: The dag access entity.
:param details: The dag details.
:param user: The user.
"""
if not access_entity:
# Scenario 1
return self._is_authorized_dag(method=method, details=details, user=user)
else:
# Scenario 2
resource_types = self._get_fab_resource_types(access_entity)
dag_method: ResourceMethod = "GET" if method == "GET" else "PUT"
if (details and details.id) and not self._is_authorized_dag(
method=dag_method, details=details, user=user
):
return False
return all(
self._is_authorized(method=method, resource_type=resource_type, user=user)
if resource_type != RESOURCE_DAG_RUN or not hasattr(permissions, "resource_name")
else self._is_authorized_dag_run(method=method, details=details, user=user)
for resource_type in resource_types
)
[docs] def is_authorized_asset(
self, *, method: ResourceMethod, details: AssetDetails | None = None, user: BaseUser | None = None
) -> bool:
return self._is_authorized(method=method, resource_type=RESOURCE_ASSET, user=user)
[docs] def is_authorized_dataset(
self, *, method: ResourceMethod, details: AssetDetails | None = None, user: BaseUser | None = None
) -> bool:
warnings.warn(
"is_authorized_dataset will be renamed as is_authorized_asset in Airflow 3 and will be removed when the minimum Airflow version is set to 3.0 for the fab provider",
AirflowProviderDeprecationWarning,
stacklevel=2,
)
return self.is_authorized_asset(method=method, user=user)
[docs] def is_authorized_pool(
self, *, method: ResourceMethod, details: PoolDetails | None = None, user: BaseUser | None = None
) -> bool:
return self._is_authorized(method=method, resource_type=RESOURCE_POOL, user=user)
[docs] def is_authorized_variable(
self, *, method: ResourceMethod, details: VariableDetails | None = None, user: BaseUser | None = None
) -> bool:
return self._is_authorized(method=method, resource_type=RESOURCE_VARIABLE, user=user)
[docs] def is_authorized_view(self, *, access_view: AccessView, user: BaseUser | None = None) -> bool:
# "Docs" are only links in the menu, there is no page associated
method: ResourceMethod = "MENU" if access_view == AccessView.DOCS else "GET"
return self._is_authorized(
method=method, resource_type=_MAP_ACCESS_VIEW_TO_FAB_RESOURCE_TYPE[access_view], user=user
)
[docs] def is_authorized_custom_view(
self, *, method: ResourceMethod | str, resource_name: str, user: BaseUser | None = None
):
if not user:
user = self.get_user()
fab_action_name = get_fab_action_from_method_map().get(method, method)
return (fab_action_name, resource_name) in self._get_user_permissions(user)
@provide_session
[docs] def get_permitted_dag_ids(
self,
*,
methods: Container[ResourceMethod] | None = None,
user=None,
session: Session = NEW_SESSION,
) -> set[str]:
if not methods:
methods = ["PUT", "GET"]
if not user:
user = self.get_user()
if not self.is_logged_in():
roles = user.roles
else:
if ("GET" in methods and self.is_authorized_dag(method="GET", user=user)) or (
"PUT" in methods and self.is_authorized_dag(method="PUT", user=user)
):
# If user is authorized to read/edit all DAGs, return all DAGs
return {dag.dag_id for dag in session.execute(select(DagModel.dag_id))}
user_query = session.scalar(
select(User)
.options(
joinedload(User.roles)
.subqueryload(Role.permissions)
.options(joinedload(Permission.action), joinedload(Permission.resource))
)
.where(User.id == user.id)
)
roles = user_query.roles
map_fab_action_name_to_method_name = get_method_from_fab_action_map()
resources = set()
for role in roles:
for permission in role.permissions:
action = permission.action.name
if (
action in map_fab_action_name_to_method_name
and map_fab_action_name_to_method_name[action] in methods
):
resource = permission.resource.name
if resource == permissions.RESOURCE_DAG:
return {dag.dag_id for dag in session.execute(select(DagModel.dag_id))}
if resource.startswith(permissions.RESOURCE_DAG_PREFIX):
resources.add(resource[len(permissions.RESOURCE_DAG_PREFIX) :])
else:
resources.add(resource)
return set(session.scalars(select(DagModel.dag_id).where(DagModel.dag_id.in_(resources))))
@cached_property
[docs] def security_manager(self) -> FabAirflowSecurityManagerOverride:
"""Return the security manager specific to FAB."""
from airflow.providers.fab.auth_manager.security_manager.override import (
FabAirflowSecurityManagerOverride,
)
sm_from_config = self.appbuilder.get_app.config.get("SECURITY_MANAGER_CLASS")
if sm_from_config:
if not issubclass(sm_from_config, FabAirflowSecurityManagerOverride):
raise AirflowConfigException(
"""Your CUSTOM_SECURITY_MANAGER must extend FabAirflowSecurityManagerOverride."""
)
return sm_from_config(self.appbuilder)
return FabAirflowSecurityManagerOverride(self.appbuilder)
[docs] def get_url_login(self, **kwargs) -> str:
"""Return the login page url."""
if not self.security_manager.auth_view:
raise AirflowException("`auth_view` not defined in the security manager.")
if next_url := kwargs.get("next_url"):
return url_for(f"{self.security_manager.auth_view.endpoint}.login", next=next_url)
else:
return url_for(f"{self.security_manager.auth_view.endpoint}.login")
[docs] def get_url_logout(self):
"""Return the logout page url."""
if not self.security_manager.auth_view:
raise AirflowException("`auth_view` not defined in the security manager.")
return url_for(f"{self.security_manager.auth_view.endpoint}.logout")
[docs] def get_url_user_profile(self) -> str | None:
"""Return the url to a page displaying info about the current user."""
if not self.security_manager.user_view or self.appbuilder.get_app.config.get(
"AUTH_ROLE_PUBLIC", None
):
return None
return url_for(f"{self.security_manager.user_view.endpoint}.userinfo")
[docs] def register_views(self) -> None:
self.security_manager.register_views()
def _is_authorized(
self,
*,
method: ResourceMethod,
resource_type: str,
user: BaseUser | None = None,
) -> bool:
"""
Return whether the user is authorized to perform a given action.
:param method: the method to perform
:param resource_type: the type of resource the user attempts to perform the action on
:param user: the user to perform the action on. If not provided (or None), it uses the current user
:meta private:
"""
if not user:
user = self.get_user()
fab_action = self._get_fab_action(method)
user_permissions = self._get_user_permissions(user)
return (fab_action, resource_type) in user_permissions
def _is_authorized_dag(
self,
method: ResourceMethod,
details: DagDetails | None = None,
user: BaseUser | None = None,
) -> bool:
"""
Return whether the user is authorized to perform a given action on a DAG.
:param method: the method to perform
:param details: optional details about the DAG
:param user: the user to perform the action on. If not provided (or None), it uses the current user
:meta private:
"""
is_global_authorized = self._is_authorized(method=method, resource_type=RESOURCE_DAG, user=user)
if is_global_authorized:
return True
if details and details.id:
# Check whether the user has permissions to access a specific DAG
resource_dag_name = self._resource_name(details.id, RESOURCE_DAG)
return self._is_authorized(method=method, resource_type=resource_dag_name, user=user)
return False
def _is_authorized_dag_run(
self,
method: ResourceMethod,
details: DagDetails | None = None,
user: BaseUser | None = None,
) -> bool:
"""
Return whether the user is authorized to perform a given action on a DAG Run.
:param method: the method to perform
:param details: optional, details about the DAG
:param user: optional, the user to perform the action on. If not provided, it uses the current user
:meta private:
"""
is_global_authorized = self._is_authorized(method=method, resource_type=RESOURCE_DAG_RUN, user=user)
if is_global_authorized:
return True
if details and details.id:
# Check whether the user has permissions to access a specific DAG Run permission on a DAG Level
resource_dag_name = self._resource_name(details.id, RESOURCE_DAG_RUN)
return self._is_authorized(method=method, resource_type=resource_dag_name, user=user)
return False
@staticmethod
def _get_fab_action(method: ResourceMethod) -> str:
"""
Convert the method to a FAB action.
:param method: the method to convert
:meta private:
"""
fab_action_from_method_map = get_fab_action_from_method_map()
if method not in fab_action_from_method_map:
raise AirflowException(f"Unknown method: {method}")
return fab_action_from_method_map[method]
@staticmethod
def _get_fab_resource_types(dag_access_entity: DagAccessEntity) -> tuple[str, ...]:
"""
Convert a DAG access entity to a tuple of FAB resource type.
:param dag_access_entity: the DAG access entity
:meta private:
"""
if dag_access_entity not in _MAP_DAG_ACCESS_ENTITY_TO_FAB_RESOURCE_TYPE:
raise AirflowException(f"Unknown DAG access entity: {dag_access_entity}")
return _MAP_DAG_ACCESS_ENTITY_TO_FAB_RESOURCE_TYPE[dag_access_entity]
def _resource_name(self, dag_id: str, resource_type: str) -> str:
"""
Return the FAB resource name for a DAG id.
:param dag_id: the DAG id
:meta private:
"""
root_dag_id = self._get_root_dag_id(dag_id)
if hasattr(permissions, "resource_name"):
return getattr(permissions, "resource_name")(root_dag_id, resource_type)
return getattr(permissions, "resource_name_for_dag")(root_dag_id)
@staticmethod
def _get_user_permissions(user: BaseUser):
"""
Return the user permissions.
:param user: the user to get permissions for
:meta private:
"""
return getattr(user, "perms") or []
def _get_root_dag_id(self, dag_id: str) -> str:
"""
Return the root DAG id in case of sub DAG, return the DAG id otherwise.
:param dag_id: the DAG id
:meta private:
"""
if "." in dag_id and hasattr(DagModel, "root_dag_id"):
return self.appbuilder.get_session.scalar(
select(DagModel.dag_id, DagModel.root_dag_id).where(DagModel.dag_id == dag_id).limit(1)
)
return dag_id
def _sync_appbuilder_roles(self):
"""
Sync appbuilder roles to DB.
:meta private:
"""
# Garbage collect old permissions/views after they have been modified.
# Otherwise, when the name of a view or menu is changed, the framework
# will add the new Views and Menus names to the backend, but will not
# delete the old ones.
if Version(Version(version).base_version) >= Version("3.0.0"):
fallback = None
else:
fallback = conf.getboolean("webserver", "UPDATE_FAB_PERMS")
if conf.getboolean("fab", "UPDATE_FAB_PERMS", fallback=fallback):
self.security_manager.sync_roles()
[docs]def get_parser() -> argparse.ArgumentParser:
"""Generate documentation; used by Sphinx argparse."""
from airflow.cli.cli_parser import AirflowHelpFormatter, _add_command
parser = DefaultHelpParser(prog="airflow", formatter_class=AirflowHelpFormatter)
subparsers = parser.add_subparsers(dest="subcommand", metavar="GROUP_OR_COMMAND")
for group_command in FabAuthManager.get_cli_commands():
_add_command(subparsers, group_command)
return parser