Source code for airflow.providers.fab.auth_manager.models.db
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations
from pathlib import Path
from airflow import settings
from airflow.exceptions import AirflowException
from airflow.providers.fab.auth_manager.models import metadata
from airflow.utils.db import _offline_migration, print_happy_cat
from airflow.utils.db_manager import BaseDBManager
[docs]PACKAGE_DIR = Path(__file__).parents[2]
_REVISION_HEADS_MAP: dict[str, str] = {
"1.4.0": "6709f7a774b9",
}
[docs]class FABDBManager(BaseDBManager):
"""Manages FAB database."""
[docs] version_table_name = "alembic_version_fab"
[docs] migration_dir = (PACKAGE_DIR / "migrations").as_posix()
[docs] alembic_file = (PACKAGE_DIR / "alembic.ini").as_posix()
[docs] supports_table_dropping = True
[docs] def upgradedb(self, to_revision=None, from_revision=None, show_sql_only=False):
"""Upgrade the database."""
if from_revision and not show_sql_only:
raise AirflowException("`from_revision` only supported with `sql_only=True`.")
# alembic adds significant import time, so we import it lazily
if not settings.SQL_ALCHEMY_CONN:
raise RuntimeError("The settings.SQL_ALCHEMY_CONN not set. This is a critical assertion.")
from alembic import command
config = self.get_alembic_config()
if show_sql_only:
if settings.engine.dialect.name == "sqlite":
raise SystemExit("Offline migration not supported for SQLite.")
if not from_revision:
from_revision = self.get_current_revision()
if not to_revision:
script = self.get_script_object(config)
to_revision = script.get_current_head()
if to_revision == from_revision:
print_happy_cat("No migrations to apply; nothing to do.")
return
_offline_migration(command.upgrade, config, f"{from_revision}:{to_revision}")
return # only running sql; our job is done
if not self.get_current_revision():
# New DB; initialize and exit
self.initdb()
return
command.upgrade(config, revision=to_revision or "heads")
[docs] def downgrade(self, to_revision, from_revision=None, show_sql_only=False):
if from_revision and not show_sql_only:
raise ValueError(
"`from_revision` can't be combined with `show_sql_only=False`. When actually "
"applying a downgrade (instead of just generating sql), we always "
"downgrade from current revision."
)
if not settings.SQL_ALCHEMY_CONN:
raise RuntimeError("The settings.SQL_ALCHEMY_CONN not set.")
# alembic adds significant import time, so we import it lazily
from alembic import command
self.log.info("Attempting downgrade of FAB migration to revision %s", to_revision)
config = self.get_alembic_config()
if show_sql_only:
self.log.warning("Generating sql scripts for manual migration.")
if not from_revision:
from_revision = self.get_current_revision()
if from_revision is None:
self.log.info("No revision found")
return
revision_range = f"{from_revision}:{to_revision}"
_offline_migration(command.downgrade, config=config, revision=revision_range)
else:
self.log.info("Applying FAB downgrade migrations.")
command.downgrade(config, revision=to_revision, sql=show_sql_only)