Source code for airflow.providers.fab.auth_manager.cli_commands.role_command

#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
"""Roles sub-commands."""

from __future__ import annotations

import itertools
import json
import os
from argparse import Namespace
from collections import defaultdict
from typing import TYPE_CHECKING

from airflow.cli.simple_table import AirflowConsole
from airflow.providers.fab.auth_manager.cli_commands.utils import get_application_builder
from airflow.providers.fab.auth_manager.security_manager.constants import EXISTING_ROLES
from airflow.utils import cli as cli_utils
from airflow.utils.cli import suppress_logs_and_warning
from airflow.utils.providers_configuration_loader import providers_configuration_loaded

if TYPE_CHECKING:
    from airflow.providers.fab.auth_manager.models import Action, Permission, Resource, Role


@suppress_logs_and_warning
@providers_configuration_loaded
[docs]def roles_list(args): """List all existing roles.""" with get_application_builder() as appbuilder: roles = appbuilder.sm.get_all_roles() if not args.permission: AirflowConsole().print_as( data=sorted(r.name for r in roles), output=args.output, mapper=lambda x: {"name": x} ) return permission_map: dict[tuple[str, str], list[str]] = defaultdict(list) for role in roles: for permission in role.permissions: permission_map[(role.name, permission.resource.name)].append(permission.action.name) AirflowConsole().print_as( data=sorted(permission_map), output=args.output, mapper=lambda x: {"name": x[0], "resource": x[1], "action": ",".join(sorted(permission_map[x]))}, )
@cli_utils.action_cli @suppress_logs_and_warning @providers_configuration_loaded
[docs]def roles_create(args): """Create new empty role in DB.""" with get_application_builder() as appbuilder: for role_name in args.role: appbuilder.sm.add_role(role_name) print(f"Added {len(args.role)} role(s)")
@cli_utils.action_cli @suppress_logs_and_warning @providers_configuration_loaded
[docs]def roles_delete(args): """Delete role in DB.""" with get_application_builder() as appbuilder: for role_name in args.role: role = appbuilder.sm.find_role(role_name) if not role: print(f"Role named '{role_name}' does not exist") exit(1) for role_name in args.role: appbuilder.sm.delete_role(role_name) print(f"Deleted {len(args.role)} role(s)")
def __roles_add_or_remove_permissions(args): with get_application_builder() as appbuilder: is_add: bool = args.subcommand.startswith("add") role_map = {} perm_map: dict[tuple[str, str], set[str]] = defaultdict(set) asm = appbuilder.sm for name in args.role: role: Role | None = asm.find_role(name) if not role: print(f"Role named '{name}' does not exist") exit(1) role_map[name] = role for permission in role.permissions: perm_map[(name, permission.resource.name)].add(permission.action.name) for name in args.resource: resource: Resource | None = asm.get_resource(name) if not resource: print(f"Resource named '{name}' does not exist") exit(1) for name in args.action or []: action: Action | None = asm.get_action(name) if not action: print(f"Action named '{name}' does not exist") exit(1) permission_count = 0 for role_name, resource_name, action_name in itertools.product( args.role, args.resource, args.action or [None] ): res_key = (role_name, resource_name) if is_add and action_name not in perm_map[res_key]: perm: Permission | None = asm.create_permission(action_name, resource_name) asm.add_permission_to_role(role_map[role_name], perm) print(f"Added {perm} to role {role_name}") permission_count += 1 elif not is_add and res_key in perm_map: for _action_name in perm_map[res_key] if action_name is None else [action_name]: perm: Permission | None = asm.get_permission(_action_name, resource_name) asm.remove_permission_from_role(role_map[role_name], perm) print(f"Deleted {perm} from role {role_name}") permission_count += 1 print(f"{'Added' if is_add else 'Deleted'} {permission_count} permission(s)") @cli_utils.action_cli @suppress_logs_and_warning @providers_configuration_loaded
[docs]def roles_add_perms(args): """Add permissions to role in DB.""" __roles_add_or_remove_permissions(args)
@cli_utils.action_cli @suppress_logs_and_warning @providers_configuration_loaded
[docs]def roles_del_perms(args): """Delete permissions from role in DB.""" __roles_add_or_remove_permissions(args)
@suppress_logs_and_warning @providers_configuration_loaded
[docs]def roles_export(args): """Export all the roles from the database to a file including permissions.""" with get_application_builder() as appbuilder: roles = appbuilder.sm.get_all_roles() exporting_roles = [role for role in roles if role.name not in EXISTING_ROLES] filename = os.path.expanduser(args.file) permission_map: dict[tuple[str, str], list[str]] = defaultdict(list) for role in exporting_roles: if role.permissions: for permission in role.permissions: permission_map[(role.name, permission.resource.name)].append(permission.action.name) else: permission_map[(role.name, "")].append("") export_data = [ {"name": role, "resource": resource, "action": ",".join(sorted(permissions))} for (role, resource), permissions in permission_map.items() ] kwargs = {} if not args.pretty else {"sort_keys": False, "indent": 4} with open(filename, "w", encoding="utf-8") as f: json.dump(export_data, f, **kwargs) print( f"{len(exporting_roles)} roles with {len(export_data)} linked permissions successfully exported to {filename}" )
@cli_utils.action_cli @suppress_logs_and_warning
[docs]def roles_import(args): """ Import all the roles into the db from the given json file including their permissions. Note, if a role already exists in the db, it is not overwritten, even when the permissions change. """ json_file = args.file try: with open(json_file) as f: role_list = json.load(f) except FileNotFoundError: print(f"File '{json_file}' does not exist") exit(1) except ValueError as e: print(f"File '{json_file}' is not a valid JSON file. Error: {e}") exit(1) with get_application_builder() as appbuilder: existing_roles = [role.name for role in appbuilder.sm.get_all_roles()] roles_to_import = [role_dict for role_dict in role_list if role_dict["name"] not in existing_roles] for role_dict in roles_to_import: if role_dict["name"] not in appbuilder.sm.get_all_roles(): if role_dict["action"] == "" or role_dict["resource"] == "": appbuilder.sm.add_role(role_dict["name"]) else: appbuilder.sm.add_role(role_dict["name"]) role_args = Namespace( subcommand="add-perms", role=[role_dict["name"]], resource=[role_dict["resource"]], action=role_dict["action"].split(","), ) __roles_add_or_remove_permissions(role_args) if role_dict["name"] in appbuilder.sm.get_all_roles(): if role_dict["action"] == "" or role_dict["resource"] == "": pass else: role_args = Namespace( subcommand="add-perms", role=[role_dict["name"]], resource=[role_dict["resource"]], action=role_dict["action"].split(","), ) __roles_add_or_remove_permissions(role_args) print("roles and permissions successfully imported")

Was this entry helpful?