#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""Roles sub-commands."""
from __future__ import annotations
import itertools
import json
import os
from argparse import Namespace
from collections import defaultdict
from typing import TYPE_CHECKING
from airflow.cli.simple_table import AirflowConsole
from airflow.providers.fab.auth_manager.cli_commands.utils import get_application_builder
from airflow.providers.fab.auth_manager.security_manager.constants import EXISTING_ROLES
from airflow.utils import cli as cli_utils
from airflow.utils.cli import suppress_logs_and_warning
from airflow.utils.providers_configuration_loader import providers_configuration_loaded
if TYPE_CHECKING:
from airflow.providers.fab.auth_manager.models import Action, Permission, Resource, Role
@suppress_logs_and_warning
@providers_configuration_loaded
[docs]def roles_list(args):
"""List all existing roles."""
with get_application_builder() as appbuilder:
roles = appbuilder.sm.get_all_roles()
if not args.permission:
AirflowConsole().print_as(
data=sorted(r.name for r in roles), output=args.output, mapper=lambda x: {"name": x}
)
return
permission_map: dict[tuple[str, str], list[str]] = defaultdict(list)
for role in roles:
for permission in role.permissions:
permission_map[(role.name, permission.resource.name)].append(permission.action.name)
AirflowConsole().print_as(
data=sorted(permission_map),
output=args.output,
mapper=lambda x: {"name": x[0], "resource": x[1], "action": ",".join(sorted(permission_map[x]))},
)
@cli_utils.action_cli
@suppress_logs_and_warning
@providers_configuration_loaded
[docs]def roles_create(args):
"""Create new empty role in DB."""
with get_application_builder() as appbuilder:
for role_name in args.role:
appbuilder.sm.add_role(role_name)
print(f"Added {len(args.role)} role(s)")
@cli_utils.action_cli
@suppress_logs_and_warning
@providers_configuration_loaded
[docs]def roles_delete(args):
"""Delete role in DB."""
with get_application_builder() as appbuilder:
for role_name in args.role:
role = appbuilder.sm.find_role(role_name)
if not role:
print(f"Role named '{role_name}' does not exist")
exit(1)
for role_name in args.role:
appbuilder.sm.delete_role(role_name)
print(f"Deleted {len(args.role)} role(s)")
def __roles_add_or_remove_permissions(args):
with get_application_builder() as appbuilder:
is_add: bool = args.subcommand.startswith("add")
role_map = {}
perm_map: dict[tuple[str, str], set[str]] = defaultdict(set)
asm = appbuilder.sm
for name in args.role:
role: Role | None = asm.find_role(name)
if not role:
print(f"Role named '{name}' does not exist")
exit(1)
role_map[name] = role
for permission in role.permissions:
perm_map[(name, permission.resource.name)].add(permission.action.name)
for name in args.resource:
resource: Resource | None = asm.get_resource(name)
if not resource:
print(f"Resource named '{name}' does not exist")
exit(1)
for name in args.action or []:
action: Action | None = asm.get_action(name)
if not action:
print(f"Action named '{name}' does not exist")
exit(1)
permission_count = 0
for role_name, resource_name, action_name in itertools.product(
args.role, args.resource, args.action or [None]
):
res_key = (role_name, resource_name)
if is_add and action_name not in perm_map[res_key]:
perm: Permission | None = asm.create_permission(action_name, resource_name)
asm.add_permission_to_role(role_map[role_name], perm)
print(f"Added {perm} to role {role_name}")
permission_count += 1
elif not is_add and res_key in perm_map:
for _action_name in perm_map[res_key] if action_name is None else [action_name]:
perm: Permission | None = asm.get_permission(_action_name, resource_name)
asm.remove_permission_from_role(role_map[role_name], perm)
print(f"Deleted {perm} from role {role_name}")
permission_count += 1
print(f"{'Added' if is_add else 'Deleted'} {permission_count} permission(s)")
@cli_utils.action_cli
@suppress_logs_and_warning
@providers_configuration_loaded
[docs]def roles_add_perms(args):
"""Add permissions to role in DB."""
__roles_add_or_remove_permissions(args)
@cli_utils.action_cli
@suppress_logs_and_warning
@providers_configuration_loaded
[docs]def roles_del_perms(args):
"""Delete permissions from role in DB."""
__roles_add_or_remove_permissions(args)
@suppress_logs_and_warning
@providers_configuration_loaded
[docs]def roles_export(args):
"""Export all the roles from the database to a file including permissions."""
with get_application_builder() as appbuilder:
roles = appbuilder.sm.get_all_roles()
exporting_roles = [role for role in roles if role.name not in EXISTING_ROLES]
filename = os.path.expanduser(args.file)
permission_map: dict[tuple[str, str], list[str]] = defaultdict(list)
for role in exporting_roles:
if role.permissions:
for permission in role.permissions:
permission_map[(role.name, permission.resource.name)].append(permission.action.name)
else:
permission_map[(role.name, "")].append("")
export_data = [
{"name": role, "resource": resource, "action": ",".join(sorted(permissions))}
for (role, resource), permissions in permission_map.items()
]
kwargs = {} if not args.pretty else {"sort_keys": False, "indent": 4}
with open(filename, "w", encoding="utf-8") as f:
json.dump(export_data, f, **kwargs)
print(
f"{len(exporting_roles)} roles with {len(export_data)} linked permissions successfully exported to {filename}"
)
@cli_utils.action_cli
@suppress_logs_and_warning
[docs]def roles_import(args):
"""
Import all the roles into the db from the given json file including their permissions.
Note, if a role already exists in the db, it is not overwritten, even when the permissions change.
"""
json_file = args.file
try:
with open(json_file) as f:
role_list = json.load(f)
except FileNotFoundError:
print(f"File '{json_file}' does not exist")
exit(1)
except ValueError as e:
print(f"File '{json_file}' is not a valid JSON file. Error: {e}")
exit(1)
with get_application_builder() as appbuilder:
existing_roles = [role.name for role in appbuilder.sm.get_all_roles()]
roles_to_import = [role_dict for role_dict in role_list if role_dict["name"] not in existing_roles]
for role_dict in roles_to_import:
if role_dict["name"] not in appbuilder.sm.get_all_roles():
if role_dict["action"] == "" or role_dict["resource"] == "":
appbuilder.sm.add_role(role_dict["name"])
else:
appbuilder.sm.add_role(role_dict["name"])
role_args = Namespace(
subcommand="add-perms",
role=[role_dict["name"]],
resource=[role_dict["resource"]],
action=role_dict["action"].split(","),
)
__roles_add_or_remove_permissions(role_args)
if role_dict["name"] in appbuilder.sm.get_all_roles():
if role_dict["action"] == "" or role_dict["resource"] == "":
pass
else:
role_args = Namespace(
subcommand="add-perms",
role=[role_dict["name"]],
resource=[role_dict["resource"]],
action=role_dict["action"].split(","),
)
__roles_add_or_remove_permissions(role_args)
print("roles and permissions successfully imported")