# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

import contextlib
import os
import signal
from collections import namedtuple
from subprocess import PIPE, STDOUT, Popen
from tempfile import TemporaryDirectory, gettempdir

from airflow.hooks.base import BaseHook
class SubprocessHook(BaseHook):
    """Hook for running processes with the ``subprocess`` module."""

    def __init__(self) -> None:
        # Handle to the most recently launched child process; stays ``None``
        # until ``run_command`` has started one. ``send_sigterm`` reads it.
        self.sub_process: Popen[bytes] | None = None
        super().__init__()

    def run_command(
        self,
        command: list[str],
        env: dict[str, str] | None = None,
        output_encoding: str = "utf-8",
        cwd: str | None = None,
    ) -> SubprocessResult:
        """
        Execute the command.

        If ``cwd`` is None, execute the command in a temporary directory which will be cleaned afterwards.
        If ``env`` is not supplied, ``os.environ`` is passed

        :param command: the command to run
        :param env: Optional dict containing environment variables to be made available to the shell
            environment in which ``command`` will be executed.  If omitted, ``os.environ`` will be used.
            An explicitly passed empty dict is honoured and gives the child an empty environment.
            Note, that in case you have Sentry configured, original variables from the environment
            will also be passed to the subprocess with ``SUBPROCESS_`` prefix. See
            :doc:`/administration-and-deployment/logging-monitoring/errors` for details.
        :param output_encoding: encoding to use for decoding stdout
        :param cwd: Working directory to run the command in.
            If None (default), the command is run in a temporary directory.
        :return: :class:`namedtuple` containing ``exit_code`` and ``output``, the last line from stderr
            or stdout
        :raises RuntimeError: if the subprocess handle is unexpectedly ``None`` after creation
        """
        self.log.info("Tmp dir root location: \n%s", gettempdir())
        with contextlib.ExitStack() as stack:
            if cwd is None:
                # The ExitStack removes the temporary directory once the command has finished.
                cwd = stack.enter_context(TemporaryDirectory(prefix="airflowtmp"))

            self.log.info("Running command: %s", command)

            # ``start_new_session=True`` makes the child call setsid(), so it leads its own
            # process group and ``send_sigterm`` can signal the whole group.
            # ``restore_signals=True`` resets SIGPIPE/SIGXFZ/SIGXFSZ to their defaults in the
            # child. Both replace a ``preexec_fn`` callback, which is not safe to use when
            # the parent process has threads.
            self.sub_process = Popen(
                command,
                stdout=PIPE,
                stderr=STDOUT,  # interleave stderr into the single stdout stream
                cwd=cwd,
                env=env if env is not None else os.environ,
                start_new_session=True,
                restore_signals=True,
            )

            self.log.info("Output:")
            line = ""
            if self.sub_process is None:
                raise RuntimeError("The subprocess should be created here and is None!")
            stdout = self.sub_process.stdout
            if stdout is not None:
                try:
                    # readline() returns b"" only at EOF, i.e. once the child closed its end.
                    for raw_line in iter(stdout.readline, b""):
                        line = raw_line.decode(output_encoding, errors="backslashreplace").rstrip()
                        self.log.info("%s", line)
                finally:
                    # Release the pipe's file descriptor instead of leaving it to the GC.
                    stdout.close()

            self.sub_process.wait()

            self.log.info("Command exited with return code %s", self.sub_process.returncode)
            return_code: int = self.sub_process.returncode

        # ``line`` holds the last decoded output line, or "" when nothing was printed.
        return SubprocessResult(exit_code=return_code, output=line)

    def send_sigterm(self) -> None:
        """
        Send SIGTERM signal to ``self.sub_process`` if one exists.

        The signal is delivered to the child's whole process group (looked up
        from its pid), not just to the child itself.
        """
        self.log.info("Sending SIGTERM signal to process group")
        if self.sub_process and hasattr(self.sub_process, "pid"):
            os.killpg(os.getpgid(self.sub_process.pid), signal.SIGTERM)