Source code for airflow.providers.amazon.aws.hooks.mwaa
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""This module contains AWS MWAA hook."""
from __future__ import annotations
from botocore.exceptions import ClientError
from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
[docs]
class MwaaHook(AwsBaseHook):
"""
Interact with AWS Manager Workflows for Apache Airflow.
Provide thin wrapper around :external+boto3:py:class:`boto3.client("mwaa") <MWAA.Client>`
Additional arguments (such as ``aws_conn_id``) may be specified and
are passed down to the underlying AwsBaseHook.
.. seealso::
- :class:`airflow.providers.amazon.aws.hooks.base_aws.AwsBaseHook`
"""
def __init__(self, *args, **kwargs) -> None:
kwargs["client_type"] = "mwaa"
super().__init__(*args, **kwargs)
[docs]
def invoke_rest_api(
self,
env_name: str,
path: str,
method: str,
body: dict | None = None,
query_params: dict | None = None,
) -> dict:
"""
Invoke the REST API on the Airflow webserver with the specified inputs.
.. seealso::
- :external+boto3:py:meth:`MWAA.Client.invoke_rest_api`
:param env_name: name of the MWAA environment
:param path: Apache Airflow REST API endpoint path to be called
:param method: HTTP method used for making Airflow REST API calls
:param body: Request body for the Apache Airflow REST API call
:param query_params: Query parameters to be included in the Apache Airflow REST API call
"""
body = body or {}
api_kwargs = {
"Name": env_name,
"Path": path,
"Method": method,
# Filter out keys with None values because Airflow REST API doesn't accept requests otherwise
"Body": {k: v for k, v in body.items() if v is not None},
"QueryParameters": query_params if query_params else {},
}
try:
result = self.conn.invoke_rest_api(**api_kwargs)
# ResponseMetadata is removed because it contains data that is either very unlikely to be useful
# in XComs and logs, or redundant given the data already included in the response
result.pop("ResponseMetadata", None)
return result
except ClientError as e:
to_log = e.response
# ResponseMetadata and Error are removed because they contain data that is either very unlikely to
# be useful in XComs and logs, or redundant given the data already included in the response
to_log.pop("ResponseMetadata", None)
to_log.pop("Error", None)
self.log.error(to_log)
raise e