# Source code for airflow.providers.redis.log.redis_task_handler
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations
import logging
from functools import cached_property
from typing import TYPE_CHECKING, Any
from airflow.configuration import conf
from airflow.providers.redis.hooks.redis import RedisHook
from airflow.utils.log.file_task_handler import FileTaskHandler
from airflow.utils.log.logging_mixin import LoggingMixin
if TYPE_CHECKING:
from redis import Redis
from airflow.models import TaskInstance
class RedisTaskHandler(FileTaskHandler, LoggingMixin):
    """
    RedisTaskHandler is a Python log handler that handles and reads task instance logs.

    It extends airflow FileTaskHandler and uploads to and reads from Redis.

    :param base_log_folder:
        base folder to store logs locally
    :param max_lines:
        Maximum number of lines of log to store
        If omitted, this is 10000.
    :param ttl_seconds:
        Maximum number of seconds to store logs
        If omitted, this is the equivalent of 28 days.
    :param conn_id:
        Airflow connection ID for the Redis hook to use
        If omitted or None, the ID specified in the option logging.remote_log_conn_id is used.
    """

    # NOTE(review): flag consumed by Airflow's trigger-logging machinery
    # (see FileTaskHandler); presumably asks it to wrap log output for this handler.
    trigger_should_wrap = True

    def __init__(
        self,
        base_log_folder: str,
        max_lines: int = 10000,
        ttl_seconds: int = 60 * 60 * 24 * 28,
        conn_id: str | None = None,
    ):
        super().__init__(base_log_folder)
        # Per-task-instance handler; created in set_context().
        self.handler: _RedisHandler | None = None
        self.max_lines = max_lines
        self.ttl_seconds = ttl_seconds
        # Fall back to the globally configured remote log connection when none is given.
        self.conn_id = conn_id or conf.get("logging", "REMOTE_LOG_CONN_ID")

    @cached_property
    def conn(self):
        """Return the Redis client for the configured connection (created lazily, cached)."""
        return RedisHook(redis_conn_id=self.conn_id).get_conn()

    def _read(
        self,
        ti: TaskInstance,
        try_number: int,
        metadata: dict[str, Any] | None = None,
    ):
        """
        Read the entire log for one task try from its Redis list.

        The list key is the rendered log filename; lines are stored as bytes
        and joined with newlines before decoding.
        """
        log_str = b"\n".join(
            self.conn.lrange(self._render_filename(ti, try_number), start=0, end=-1)
        ).decode()
        # The whole list is fetched in one call, so the log is always complete.
        return log_str, {"end_of_log": True}

    def set_context(self, ti: TaskInstance, **kwargs) -> None:
        """Attach a _RedisHandler for this task instance, keyed on the rendered log filename."""
        super().set_context(ti)
        self.handler = _RedisHandler(
            self.conn,
            key=self._render_filename(ti, ti.try_number),
            max_lines=self.max_lines,
            ttl_seconds=self.ttl_seconds,
        )
        self.handler.setFormatter(self.formatter)
class _RedisHandler(logging.Handler):
    """Logging handler that appends formatted records to a capped, expiring Redis list."""

    def __init__(self, conn: Redis, key: str, max_lines: int, ttl_seconds: int):
        super().__init__()
        self.conn = conn
        self.key = key
        self.max_lines = max_lines
        self.ttl_seconds = ttl_seconds

    def emit(self, record):
        """Push one formatted record, trim the list to max_lines, and refresh the TTL."""
        pipe = self.conn.pipeline()
        formatted = self.format(record)
        pipe.rpush(self.key, formatted)
        # Keep only the newest max_lines entries (negative indices count from the tail).
        pipe.ltrim(self.key, start=-self.max_lines, end=-1)
        # Reset the expiry on every write so the key outlives the last log line by ttl_seconds.
        pipe.expire(self.key, time=self.ttl_seconds)
        pipe.execute()