Source code for airflow.providers.apache.spark.hooks.spark_sql
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

import subprocess
from typing import TYPE_CHECKING, Any

from airflow.exceptions import AirflowException, AirflowNotFoundException
from airflow.hooks.base import BaseHook

if TYPE_CHECKING:
    from airflow.models.connection import Connection
class SparkSqlHook(BaseHook):
    """
    This hook is a wrapper around the spark-sql binary; it requires the "spark-sql" binary to be in the PATH.

    :param sql: The SQL query to execute
    :param conf: arbitrary Spark configuration property
    :param conn_id: connection_id string
    :param total_executor_cores: (Standalone & Mesos only) Total cores for all executors
        (Default: all the available cores on the worker)
    :param executor_cores: (Standalone & YARN only) Number of cores per executor (Default: 2)
    :param executor_memory: Memory per executor (e.g. 1000M, 2G) (Default: 1G)
    :param keytab: Full path to the file that contains the keytab
    :param master: spark://host:port, mesos://host:port, yarn, or local
        (Default: The ``host`` and ``port`` set in the Connection, or ``"yarn"``)
    :param name: Name of the job.
    :param num_executors: Number of executors to launch
    :param verbose: Whether to pass the verbose flag to spark-sql
    :param yarn_queue: The YARN queue to submit to
        (Default: The ``queue`` value set in the Connection, or ``"default"``)
    """

    conn_name_attr = "conn_id"
    default_conn_name = "spark_sql_default"
    conn_type = "spark_sql"
    hook_name = "Spark SQL"
    @classmethod
    def get_ui_field_behaviour(cls) -> dict[str, Any]:
        """Return custom UI field behaviour for Spark SQL connection."""
        return {
            "hidden_fields": ["schema", "login", "password", "extra"],
            "relabeling": {},
        }
    @classmethod
    def get_connection_form_widgets(cls) -> dict[str, Any]:
        """Return connection widgets to add to Spark SQL connection form."""
        from flask_appbuilder.fieldwidgets import BS3TextFieldWidget
        from flask_babel import lazy_gettext
        from wtforms import StringField
        from wtforms.validators import Optional

        return {
            "queue": StringField(
                lazy_gettext("YARN queue"),
                widget=BS3TextFieldWidget(),
                description="Default YARN queue to use",
                validators=[Optional()],
            )
        }
    def __init__(
        self,
        sql: str,
        conf: str | None = None,
        conn_id: str = default_conn_name,
        total_executor_cores: int | None = None,
        executor_cores: int | None = None,
        executor_memory: str | None = None,
        keytab: str | None = None,
        principal: str | None = None,
        master: str | None = None,
        name: str = "default-name",
        num_executors: int | None = None,
        verbose: bool = True,
        yarn_queue: str | None = None,
    ) -> None:
        super().__init__()
        options: dict = {}
        conn: Connection | None = None

        try:
            conn = self.get_connection(conn_id)
        except AirflowNotFoundException:
            conn = None
        if conn:
            options = conn.extra_dejson

        # Set arguments to values set in Connection if not explicitly provided.
        if master is None:
            if conn is None:
                master = "yarn"
            elif conn.port:
                master = f"{conn.host}:{conn.port}"
            else:
                master = conn.host
        if yarn_queue is None:
            yarn_queue = options.get("queue", "default")

        self._sql = sql
        self._conf = conf
        self._total_executor_cores = total_executor_cores
        self._executor_cores = executor_cores
        self._executor_memory = executor_memory
        self._keytab = keytab
        self._principal = principal
        self._master = master
        self._name = name
        self._num_executors = num_executors
        self._verbose = verbose
        self._yarn_queue = yarn_queue
        self._sp: Any = None
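The constructor only falls back to the Connection for values that are not passed explicitly. A minimal sketch of that fallback, assuming an initialized Airflow environment and a Connection registered under the hypothetical id ``spark_sql_cluster`` with host ``spark-master`` and port ``7077``:

from airflow.providers.apache.spark.hooks.spark_sql import SparkSqlHook

# "spark_sql_cluster" is a hypothetical connection id used for illustration.
hook = SparkSqlHook(sql="SELECT 1", conn_id="spark_sql_cluster")
# With that connection present, hook._master becomes "spark-master:7077";
# with no connection found at all, it would default to "yarn".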
    def _prepare_command(self, cmd: str | list[str]) -> list[str]:
        """
        Construct the spark-sql command to execute. Verbose output is enabled by default.

        :param cmd: command to append to the spark-sql command
        :return: full command to be executed
        """
        connection_cmd = ["spark-sql"]
        if self._conf:
            for conf_el in self._conf.split(","):
                connection_cmd += ["--conf", conf_el]
        if self._total_executor_cores:
            connection_cmd += ["--total-executor-cores", str(self._total_executor_cores)]
        if self._executor_cores:
            connection_cmd += ["--executor-cores", str(self._executor_cores)]
        if self._executor_memory:
            connection_cmd += ["--executor-memory", self._executor_memory]
        if self._keytab:
            connection_cmd += ["--keytab", self._keytab]
        if self._principal:
            connection_cmd += ["--principal", self._principal]
        if self._num_executors:
            connection_cmd += ["--num-executors", str(self._num_executors)]
        if self._sql:
            sql = self._sql.strip()
            if sql.endswith((".sql", ".hql")):
                connection_cmd += ["-f", sql]
            else:
                connection_cmd += ["-e", sql]
        if self._master:
            connection_cmd += ["--master", self._master]
        if self._name:
            connection_cmd += ["--name", self._name]
        if self._verbose:
            connection_cmd += ["--verbose"]
        if self._yarn_queue:
            connection_cmd += ["--queue", self._yarn_queue]

        if isinstance(cmd, str):
            connection_cmd += cmd.split()
        elif isinstance(cmd, list):
            connection_cmd += cmd
        else:
            raise AirflowException(f"Invalid additional command: {cmd}")

        self.log.debug("Spark-Sql cmd: %s", connection_cmd)

        return connection_cmd
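Since ``_prepare_command`` only builds the argument list, it can be inspected without launching anything. A sketch, assuming an initialized Airflow environment in which no ``spark_sql_default`` connection exists, so the queue falls back to ``"default"``:

from airflow.providers.apache.spark.hooks.spark_sql import SparkSqlHook

hook = SparkSqlHook(
    sql="SELECT COUNT(*) FROM my_table",
    conf="spark.ui.enabled=false",
    master="local",
    name="example-job",
    verbose=False,
)
print(hook._prepare_command(""))
# Following the branches above, this prints:
# ['spark-sql', '--conf', 'spark.ui.enabled=false',
#  '-e', 'SELECT COUNT(*) FROM my_table', '--master', 'local',
#  '--name', 'example-job', '--queue', 'default']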
    def run_query(self, cmd: str = "", **kwargs: Any) -> None:
        """
        Remote Popen (actually execute the Spark-sql query).

        :param cmd: command to append to the spark-sql command
        :param kwargs: extra arguments to Popen (see subprocess.Popen)
        """
        spark_sql_cmd = self._prepare_command(cmd)

        self._sp = subprocess.Popen(
            spark_sql_cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, universal_newlines=True, **kwargs
        )

        for line in iter(self._sp.stdout):  # type: ignore
            self.log.info(line)

        returncode = self._sp.wait()

        if returncode:
            raise AirflowException(
                f"Cannot execute '{self._sql}' on {self._master} (additional parameters: '{cmd}'). "
                f"Process exit code: {returncode}."
            )
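Because extra keyword arguments are forwarded to ``subprocess.Popen``, callers can adjust things like the subprocess environment. A hedged sketch (the file path is hypothetical, and ``spark-sql`` must be on the PATH):

import os

from airflow.providers.apache.spark.hooks.spark_sql import SparkSqlHook

# The trailing ".sql" makes _prepare_command pass the file with -f instead of -e.
hook = SparkSqlHook(sql="/opt/queries/daily_rollup.sql", master="yarn")
# env replaces the entire subprocess environment, so extend os.environ
# rather than passing a bare dict.
hook.run_query(env={**os.environ, "SPARK_HOME": "/opt/spark"})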
    def kill(self) -> None:
        """Kill Spark job."""
        if self._sp and self._sp.poll() is None:
            self.log.info("Killing the Spark-Sql job")
            self._sp.kill()
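``kill()`` is a no-op unless a subprocess was started and is still running, which makes it safe to call from cleanup paths such as an operator's ``on_kill``. A minimal sketch:

from airflow.providers.apache.spark.hooks.spark_sql import SparkSqlHook

hook = SparkSqlHook(sql="SELECT 1", master="local")
try:
    hook.run_query()
finally:
    hook.kill()  # skipped internally if the process already exited (poll() is not None)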