Source code for airflow.providers.apache.spark.hooks.spark_submit
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

import contextlib
import os
import re
import subprocess
import time
from typing import Any, Iterator

from airflow.configuration import conf as airflow_conf
from airflow.exceptions import AirflowException
from airflow.hooks.base import BaseHook
from airflow.security.kerberos import renew_from_kt
from airflow.utils.log.logging_mixin import LoggingMixin

with contextlib.suppress(ImportError, NameError):
    from airflow.kubernetes import kube_client

# Allowed values for the spark-binary extra / constructor argument
ALLOWED_SPARK_BINARIES = ["spark-submit", "spark2-submit"]
class SparkSubmitHook(BaseHook, LoggingMixin):
    """
    This hook is a wrapper around the spark-submit binary to kick off a spark-submit job.

    It requires that the "spark-submit" binary is in the PATH.

    :param conf: Arbitrary Spark configuration properties
    :param conn_id: The :ref:`spark connection id <howto/connection:spark>` as configured
        in Airflow administration. When an invalid connection_id is supplied, it will default to yarn.
    :param files: Upload additional files to the executor running the job, separated by a
        comma. Files will be placed in the working directory of each executor.
        For example, serialized objects.
    :param py_files: Additional python files used by the job, can be .zip, .egg or .py.
    :param archives: Archives that spark should unzip (and possibly tag with #ALIAS) into
        the application working directory.
    :param driver_class_path: Additional, driver-specific, classpath settings.
    :param jars: Submit additional jars to upload and place them in executor classpath.
    :param java_class: the main class of the Java application
    :param packages: Comma-separated list of maven coordinates of jars to include on the
        driver and executor classpaths
    :param exclude_packages: Comma-separated list of maven coordinates of jars to exclude
        while resolving the dependencies provided in 'packages'
    :param repositories: Comma-separated list of additional remote repositories to search
        for the maven coordinates given with 'packages'
    :param total_executor_cores: (Standalone & Mesos only) Total cores for all executors
        (Default: all the available cores on the worker)
    :param executor_cores: (Standalone, YARN and Kubernetes only) Number of cores per
        executor (Default: 2)
    :param executor_memory: Memory per executor (e.g. 1000M, 2G) (Default: 1G)
    :param driver_memory: Memory allocated to the driver (e.g. 1000M, 2G) (Default: 1G)
    :param keytab: Full path to the file that contains the keytab
    :param principal: The name of the kerberos principal used for keytab
    :param proxy_user: User to impersonate when submitting the application
    :param name: Name of the job (Default: "default-name")
    :param num_executors: Number of executors to launch
    :param status_poll_interval: Seconds to wait between polls of driver status in cluster
        mode (Default: 1)
    :param application_args: Arguments for the application being submitted
    :param env_vars: Environment variables for spark-submit. It supports yarn and k8s mode too.
    :param verbose: Whether to pass the verbose flag to the spark-submit process for debugging
    :param spark_binary: The command to use for spark submit. Some distros may use spark2-submit.
    """
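    # Illustrative usage sketch (not part of the original module): with a YARN- or
    # standalone-backed `spark_default` connection configured in Airflow and
    # spark-submit available on the PATH, the hook might be used roughly like this;
    # the job name, main class and jar path below are placeholder assumptions:
    #
    #     hook = SparkSubmitHook(
    #         conn_id="spark_default",
    #         name="airflow-spark-example",
    #         java_class="org.apache.spark.examples.SparkPi",
    #         application_args=["10"],
    #     )
    #     hook.submit(application="/opt/spark/examples/jars/spark-examples.jar")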
    @staticmethod
    def get_ui_field_behaviour() -> dict[str, Any]:
        """Returns custom field behaviour"""
        return {
            "hidden_fields": ["schema", "login", "password"],
            "relabeling": {},
        }

    def __init__(
        self,
        conf: dict[str, Any] | None = None,
        conn_id: str = "spark_default",
        files: str | None = None,
        py_files: str | None = None,
        archives: str | None = None,
        driver_class_path: str | None = None,
        jars: str | None = None,
        java_class: str | None = None,
        packages: str | None = None,
        exclude_packages: str | None = None,
        repositories: str | None = None,
        total_executor_cores: int | None = None,
        executor_cores: int | None = None,
        executor_memory: str | None = None,
        driver_memory: str | None = None,
        keytab: str | None = None,
        principal: str | None = None,
        proxy_user: str | None = None,
        name: str = "default-name",
        num_executors: int | None = None,
        status_poll_interval: int = 1,
        application_args: list[Any] | None = None,
        env_vars: dict[str, Any] | None = None,
        verbose: bool = False,
        spark_binary: str | None = None,
    ) -> None:
        super().__init__()
        self._conf = conf or {}
        self._conn_id = conn_id
        self._files = files
        self._py_files = py_files
        self._archives = archives
        self._driver_class_path = driver_class_path
        self._jars = jars
        self._java_class = java_class
        self._packages = packages
        self._exclude_packages = exclude_packages
        self._repositories = repositories
        self._total_executor_cores = total_executor_cores
        self._executor_cores = executor_cores
        self._executor_memory = executor_memory
        self._driver_memory = driver_memory
        self._keytab = keytab
        self._principal = principal
        self._proxy_user = proxy_user
        self._name = name
        self._num_executors = num_executors
        self._status_poll_interval = status_poll_interval
        self._application_args = application_args
        self._env_vars = env_vars
        self._verbose = verbose
        self._submit_sp: Any | None = None
        self._yarn_application_id: str | None = None
        self._kubernetes_driver_pod: str | None = None
        self._spark_binary = spark_binary
        if self._spark_binary is not None and self._spark_binary not in ALLOWED_SPARK_BINARIES:
            raise RuntimeError(
                f"The spark-binary extra can be one of {ALLOWED_SPARK_BINARIES} and it"
                f" was `{spark_binary}`. Please make sure your spark binary is one of the"
                " allowed ones and that it is available on the PATH"
            )
        self._connection = self._resolve_connection()
        self._is_yarn = "yarn" in self._connection["master"]
        self._is_kubernetes = "k8s" in self._connection["master"]
        if self._is_kubernetes and kube_client is None:
            raise RuntimeError(
                f"Kubernetes master {self._connection['master']} was specified, "
                "but the kubernetes dependencies are not installed!"
            )

        self._should_track_driver_status = self._resolve_should_track_driver_status()
        self._driver_id: str | None = None
        self._driver_status: str | None = None
        self._spark_exit_code: int | None = None
        self._env: dict[str, Any] | None = None

    def _resolve_should_track_driver_status(self) -> bool:
        """
        Determines whether this hook should poll the spark driver status through
        subsequent spark-submit status requests after the initial spark-submit request.

        :return: if the driver status should be tracked
        """
        return "spark://" in self._connection["master"] and self._connection["deploy_mode"] == "cluster"

    def _resolve_connection(self) -> dict[str, Any]:
        # Build from connection master or default to yarn if not available
        conn_data = {
            "master": "yarn",
            "queue": None,
            "deploy_mode": None,
            "spark_binary": self._spark_binary or "spark-submit",
            "namespace": None,
        }

        try:
            # Master can be local, yarn, spark://HOST:PORT, mesos://HOST:PORT and
            # k8s://https://<HOST>:<PORT>
            conn = self.get_connection(self._conn_id)
            if conn.port:
                conn_data["master"] = f"{conn.host}:{conn.port}"
            else:
                conn_data["master"] = conn.host

            # Determine optional yarn queue from the extra field
            extra = conn.extra_dejson
            conn_data["queue"] = extra.get("queue")
            conn_data["deploy_mode"] = extra.get("deploy-mode")
            spark_binary = self._spark_binary or extra.get("spark-binary", "spark-submit")
            if spark_binary not in ALLOWED_SPARK_BINARIES:
                raise RuntimeError(
                    f"The `spark-binary` extra can be one of {ALLOWED_SPARK_BINARIES} and it"
                    f" was `{spark_binary}`. Please make sure your spark binary is one of the"
                    " allowed ones and that it is available on the PATH"
                )
            conn_spark_home = extra.get("spark-home")
            if conn_spark_home:
                raise RuntimeError(
                    "The `spark-home` extra is not allowed any more. Please make sure your `spark-submit` or"
                    " `spark2-submit` are available on the PATH."
                )
            conn_data["spark_binary"] = spark_binary
            conn_data["namespace"] = extra.get("namespace")
        except AirflowException:
            self.log.info(
                "Could not load connection string %s, defaulting to %s", self._conn_id, conn_data["master"]
            )

        if "spark.kubernetes.namespace" in self._conf:
            conn_data["namespace"] = self._conf["spark.kubernetes.namespace"]

        return conn_data
    def _get_spark_binary_path(self) -> list[str]:
        # Assume that spark-submit is present in the path to the executing user
        return [self._connection["spark_binary"]]

    def _mask_cmd(self, connection_cmd: str | list[str]) -> str:
        # Mask any password related fields in application args with key value pair
        # where key contains password (case insensitive), e.g. HivePassword='abc'
        connection_cmd_masked = re.sub(
            r"("
            r"\S*?"  # Match all non-whitespace characters before...
            r"(?:secret|password)"  # ...literally a "secret" or "password"
            # word (not capturing them).
            r"\S*?"  # All non-whitespace characters before either...
            r"(?:=|\s+)"  # ...an equal sign or whitespace characters
            # (not capturing them).
            r"(['\"]?)"  # An optional single or double quote.
            r")"  # This is the end of the first capturing group.
            r"(?:(?!\2\s).)*"  # All characters between optional quotes
            # (matched above); if the value is quoted,
            # it may contain whitespace.
            r"(\2)",  # Optional matching quote.
            r"\1******\3",
            " ".join(connection_cmd),
            flags=re.I,
        )

        return connection_cmd_masked

    def _build_spark_submit_command(self, application: str) -> list[str]:
        """
        Construct the spark-submit command to execute.

        :param application: command to append to the spark-submit command
        :return: full command to be executed
        """
        connection_cmd = self._get_spark_binary_path()

        # The url of the spark master
        connection_cmd += ["--master", self._connection["master"]]

        for key in self._conf:
            connection_cmd += ["--conf", f"{key}={str(self._conf[key])}"]
        if self._env_vars and (self._is_kubernetes or self._is_yarn):
            if self._is_yarn:
                tmpl = "spark.yarn.appMasterEnv.{}={}"
                # Allow dynamic setting of hadoop/yarn configuration environments
                self._env = self._env_vars
            else:
                tmpl = "spark.kubernetes.driverEnv.{}={}"
            for key in self._env_vars:
                connection_cmd += ["--conf", tmpl.format(key, str(self._env_vars[key]))]
        elif self._env_vars and self._connection["deploy_mode"] != "cluster":
            self._env = self._env_vars  # Do it on Popen of the process
        elif self._env_vars and self._connection["deploy_mode"] == "cluster":
            raise AirflowException("SparkSubmitHook env_vars is not supported in standalone-cluster mode.")
        if self._is_kubernetes and self._connection["namespace"]:
            connection_cmd += [
                "--conf",
                f"spark.kubernetes.namespace={self._connection['namespace']}",
            ]
        if self._files:
            connection_cmd += ["--files", self._files]
        if self._py_files:
            connection_cmd += ["--py-files", self._py_files]
        if self._archives:
            connection_cmd += ["--archives", self._archives]
        if self._driver_class_path:
            connection_cmd += ["--driver-class-path", self._driver_class_path]
        if self._jars:
            connection_cmd += ["--jars", self._jars]
        if self._packages:
            connection_cmd += ["--packages", self._packages]
        if self._exclude_packages:
            connection_cmd += ["--exclude-packages", self._exclude_packages]
        if self._repositories:
            connection_cmd += ["--repositories", self._repositories]
        if self._num_executors:
            connection_cmd += ["--num-executors", str(self._num_executors)]
        if self._total_executor_cores:
            connection_cmd += ["--total-executor-cores", str(self._total_executor_cores)]
        if self._executor_cores:
            connection_cmd += ["--executor-cores", str(self._executor_cores)]
        if self._executor_memory:
            connection_cmd += ["--executor-memory", self._executor_memory]
        if self._driver_memory:
            connection_cmd += ["--driver-memory", self._driver_memory]
        if self._keytab:
            connection_cmd += ["--keytab", self._keytab]
        if self._principal:
            connection_cmd += ["--principal", self._principal]
        if self._proxy_user:
            connection_cmd += ["--proxy-user", self._proxy_user]
        if self._name:
            connection_cmd += ["--name", self._name]
        if self._java_class:
            connection_cmd += ["--class", self._java_class]
        if self._verbose:
            connection_cmd += ["--verbose"]
        if self._connection["queue"]:
            connection_cmd += ["--queue", self._connection["queue"]]
        if self._connection["deploy_mode"]:
            connection_cmd += ["--deploy-mode", self._connection["deploy_mode"]]

        # The actual script to execute
        connection_cmd += [application]

        # Append any application arguments
        if self._application_args:
            connection_cmd += self._application_args

        self.log.info("Spark-Submit cmd: %s", self._mask_cmd(connection_cmd))

        return connection_cmd

    def _build_track_driver_status_command(self) -> list[str]:
        """
        Construct the command to poll the driver status.

        :return: full command to be executed
        """
        curl_max_wait_time = 30
        spark_host = self._connection["master"]
        if spark_host.endswith(":6066"):
            spark_host = spark_host.replace("spark://", "http://")
            connection_cmd = [
                "/usr/bin/curl",
                "--max-time",
                str(curl_max_wait_time),
                f"{spark_host}/v1/submissions/status/{self._driver_id}",
            ]
            self.log.info(connection_cmd)

            # The driver id so we can poll for its status
            if not self._driver_id:
                raise AirflowException(
                    "Invalid status: attempted to poll driver status but no driver id is known. Giving up."
                )

        else:
            connection_cmd = self._get_spark_binary_path()

            # The url to the spark master
            connection_cmd += ["--master", self._connection["master"]]

            # The driver id so we can poll for its status
            if self._driver_id:
                connection_cmd += ["--status", self._driver_id]
            else:
                raise AirflowException(
                    "Invalid status: attempted to poll driver status but no driver id is known. Giving up."
                )

        self.log.debug("Poll driver status cmd: %s", connection_cmd)

        return connection_cmd
    def submit(self, application: str = "", **kwargs: Any) -> None:
        """
        Remote Popen to execute the spark-submit job.

        :param application: Submitted application, jar or py file
        :param kwargs: extra arguments to Popen (see subprocess.Popen)
        """
        spark_submit_cmd = self._build_spark_submit_command(application)

        if self._env:
            env = os.environ.copy()
            env.update(self._env)
            kwargs["env"] = env

        self._submit_sp = subprocess.Popen(
            spark_submit_cmd,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            bufsize=-1,
            universal_newlines=True,
            **kwargs,
        )

        self._process_spark_submit_log(iter(self._submit_sp.stdout))  # type: ignore
        returncode = self._submit_sp.wait()

        # Check spark-submit return code. In Kubernetes mode, also check the value
        # of exit code in the log, as it may differ.
        if returncode or (self._is_kubernetes and self._spark_exit_code != 0):
            if self._is_kubernetes:
                raise AirflowException(
                    f"Cannot execute: {self._mask_cmd(spark_submit_cmd)}. Error code is: {returncode}. "
                    f"Kubernetes spark exit code is: {self._spark_exit_code}"
                )
            else:
                raise AirflowException(
                    f"Cannot execute: {self._mask_cmd(spark_submit_cmd)}. Error code is: {returncode}."
                )

        self.log.debug("Should track driver: %s", self._should_track_driver_status)

        # We want the Airflow job to wait until the Spark driver is finished
        if self._should_track_driver_status:
            if self._driver_id is None:
                raise AirflowException(
                    "No driver id is known: something went wrong when executing the spark submit command"
                )

            # We start with the SUBMITTED status as initial status
            self._driver_status = "SUBMITTED"

            # Start tracking the driver status (blocking function)
            self._start_driver_status_tracking()

            if self._driver_status != "FINISHED":
                raise AirflowException(
                    f"ERROR : Driver {self._driver_id} badly exited with status {self._driver_status}"
                )

    def _process_spark_submit_log(self, itr: Iterator[Any]) -> None:
        """
        Processes the log output of spark-submit and extracts useful information from it.

        If the deploy-mode is 'client', log the output of the submit command as those
        are the output logs of the Spark worker directly.

        Remark: If the driver needs to be tracked for its status, the log-level of the
        spark deploy needs to be at least INFO (log4j.logger.org.apache.spark.deploy=INFO)

        :param itr: An iterator which iterates over the input of the subprocess
        """
        # Consume the iterator
        for line in itr:
            line = line.strip()
            # If we run yarn cluster mode, we want to extract the application id from
            # the logs so we can kill the application when we stop it unexpectedly
            if self._is_yarn and self._connection["deploy_mode"] == "cluster":
                match = re.search("(application[0-9_]+)", line)
                if match:
                    self._yarn_application_id = match.groups()[0]
                    self.log.info("Identified spark driver id: %s", self._yarn_application_id)

            # If we run Kubernetes cluster mode, we want to extract the driver pod id
            # from the logs so we can kill the application when we stop it unexpectedly
            elif self._is_kubernetes:
                match = re.search(r"\s*pod name: ((.+?)-([a-z0-9]+)-driver)", line)
                if match:
                    self._kubernetes_driver_pod = match.groups()[0]
                    self.log.info("Identified spark driver pod: %s", self._kubernetes_driver_pod)

                # Store the Spark Exit code
                match_exit_code = re.search(r"\s*[eE]xit code: (\d+)", line)
                if match_exit_code:
                    self._spark_exit_code = int(match_exit_code.groups()[0])

            # if we run in standalone cluster mode and we want to track the driver status
            # we need to extract the driver id from the logs. This allows us to poll for
            # the status using the driver id. Also, we can kill the driver when needed.
            elif self._should_track_driver_status and not self._driver_id:
                match_driver_id = re.search(r"(driver-[0-9\-]+)", line)
                if match_driver_id:
                    self._driver_id = match_driver_id.groups()[0]
                    self.log.info("identified spark driver id: %s", self._driver_id)

            self.log.info(line)

    def _process_spark_status_log(self, itr: Iterator[Any]) -> None:
        """
        Parses the logs of the spark driver status query process.

        :param itr: An iterator which iterates over the input of the subprocess
        """
        driver_found = False
        valid_response = False
        # Consume the iterator
        for line in itr:
            line = line.strip()

            # A valid Spark status response should contain a submissionId
            if "submissionId" in line:
                valid_response = True

            # Check if the log line is about the driver status and extract the status.
            if "driverState" in line:
                self._driver_status = line.split(" : ")[1].replace(",", "").replace('"', "").strip()
                driver_found = True

            self.log.debug("spark driver status log: %s", line)

        if valid_response and not driver_found:
            self._driver_status = "UNKNOWN"

    def _start_driver_status_tracking(self) -> None:
        """
        Polls the driver based on self._driver_id to get the status.

        Finish successfully when the status is FINISHED.
        Finish failed when the status is ERROR/UNKNOWN/KILLED/FAILED.

        Possible status:

        SUBMITTED
            Submitted but not yet scheduled on a worker
        RUNNING
            Has been allocated to a worker to run
        FINISHED
            Previously ran and exited cleanly
        RELAUNCHING
            Exited non-zero or due to worker failure, but has not yet
            started running again
        UNKNOWN
            The status of the driver is temporarily not known due to
            master failure recovery
        KILLED
            A user manually killed this driver
        FAILED
            The driver exited non-zero and was not supervised
        ERROR
            Unable to run or restart due to an unrecoverable error
            (e.g. missing jar file)
        """
        # When the Spark Standalone cluster is not performing well due to
        # misconfiguration or heavy load, it is possible that the polling request
        # will time out. Therefore we use a simple retry mechanism.
        missed_job_status_reports = 0
        max_missed_job_status_reports = 10

        # Keep polling as long as the driver is processing
        while self._driver_status not in ["FINISHED", "UNKNOWN", "KILLED", "FAILED", "ERROR"]:

            # Sleep for n seconds as we do not want to spam the cluster
            time.sleep(self._status_poll_interval)

            self.log.debug("polling status of spark driver with id %s", self._driver_id)

            poll_driver_status_cmd = self._build_track_driver_status_command()
            status_process: Any = subprocess.Popen(
                poll_driver_status_cmd,
                stdout=subprocess.PIPE,
                stderr=subprocess.STDOUT,
                bufsize=-1,
                universal_newlines=True,
            )

            self._process_spark_status_log(iter(status_process.stdout))
            returncode = status_process.wait()

            if returncode:
                if missed_job_status_reports < max_missed_job_status_reports:
                    missed_job_status_reports += 1
                else:
                    raise AirflowException(
                        f"Failed to poll for the driver status {max_missed_job_status_reports} times: "
                        f"returncode = {returncode}"
                    )

    def _build_spark_driver_kill_command(self) -> list[str]:
        """
        Construct the spark-submit command to kill a driver.

        :return: full command to kill a driver
        """
        # Assume that spark-submit is present in the path to the executing user
        connection_cmd = [self._connection["spark_binary"]]

        # The url to the spark master
        connection_cmd += ["--master", self._connection["master"]]

        # The actual kill command
        if self._driver_id:
            connection_cmd += ["--kill", self._driver_id]

        self.log.debug("Spark-Kill cmd: %s", connection_cmd)

        return connection_cmd
    def on_kill(self) -> None:
        """Kill Spark submit command"""
        self.log.debug("Kill Command is being called")

        if self._should_track_driver_status and self._driver_id:
            self.log.info("Killing driver %s on cluster", self._driver_id)

            kill_cmd = self._build_spark_driver_kill_command()
            with subprocess.Popen(kill_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE) as driver_kill:
                self.log.info(
                    "Spark driver %s killed with return code: %s", self._driver_id, driver_kill.wait()
                )

        if self._submit_sp and self._submit_sp.poll() is None:
            self.log.info("Sending kill signal to %s", self._connection["spark_binary"])
            self._submit_sp.kill()

            if self._yarn_application_id:
                kill_cmd = f"yarn application -kill {self._yarn_application_id}".split()
                env = {**os.environ, **(self._env or {})}
                if self._keytab is not None and self._principal is not None:
                    # we ignore renewal failures from renew_from_kt here, as the failure
                    # could just be due to a non-renewable ticket; we still attempt to
                    # kill the yarn application
                    renew_from_kt(self._principal, self._keytab, exit_on_fail=False)
                    env = os.environ.copy()
                    ccache = airflow_conf.get_mandatory_value("kerberos", "ccache")
                    env["KRB5CCNAME"] = ccache

                with subprocess.Popen(
                    kill_cmd, env=env, stdout=subprocess.PIPE, stderr=subprocess.PIPE
                ) as yarn_kill:
                    self.log.info("YARN app killed with return code: %s", yarn_kill.wait())

            if self._kubernetes_driver_pod:
                self.log.info("Killing pod %s on Kubernetes", self._kubernetes_driver_pod)

                # Currently only instantiate Kubernetes client for killing a spark pod.
                try:
                    import kubernetes

                    client = kube_client.get_kube_client()
                    api_response = client.delete_namespaced_pod(
                        self._kubernetes_driver_pod,
                        self._connection["namespace"],
                        body=kubernetes.client.V1DeleteOptions(),
                        pretty=True,
                    )

                    self.log.info("Spark on K8s killed with response: %s", api_response)

                except kube_client.ApiException:
                    self.log.exception("Exception when attempting to kill Spark on K8s")
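# A minimal end-to-end sketch (illustrative only, not part of the provider module): submit
# an application and fall back to on_kill() for cleanup when interrupted. The connection id,
# job name and application path below are placeholder assumptions.
if __name__ == "__main__":
    hook = SparkSubmitHook(
        conn_id="spark_default",  # assumed to point at yarn or a spark://HOST:PORT master
        name="airflow-spark-demo",  # hypothetical job name
        verbose=True,
    )
    try:
        # Blocks until spark-submit (and, in standalone cluster mode, the tracked driver) finishes
        hook.submit(application="/tmp/example_pyspark_job.py")  # hypothetical application path
    except KeyboardInterrupt:
        # Mirror what Airflow does when a task is killed: clean up the driver / YARN app / K8s pod
        hook.on_kill()
        raise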