airflow.providers.apache.hive.hooks.hive

Module Contents

Classes

HiveCliHook

Simple wrapper around the hive CLI.

HiveMetastoreHook

Wrapper to interact with the Hive Metastore.

HiveServer2Hook

Wrapper around the pyhive library.

Functions

get_context_from_env_var()

Extract context from environment variables (dag_id, task_id, etc.) for use in BashOperator and PythonOperator.

Attributes

HIVE_QUEUE_PRIORITIES

airflow.providers.apache.hive.hooks.hive.HIVE_QUEUE_PRIORITIES = ['VERY_HIGH', 'HIGH', 'NORMAL', 'LOW', 'VERY_LOW'][source]
airflow.providers.apache.hive.hooks.hive.get_context_from_env_var()[source]

Extract context from environment variables (dag_id, task_id, etc.) for use in BashOperator and PythonOperator.

Returns

The context of interest.

Return type

dict[Any, Any]
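
As a minimal, hedged sketch (it assumes the call happens inside a running Airflow task, where the AIRFLOW_CTX_* environment variables are set; the airflow.ctx.dag_id key name is an assumption based on that variable name):

>>> from airflow.providers.apache.hive.hooks.hive import get_context_from_env_var
>>> context = get_context_from_env_var()
>>> dag_id = context.get("airflow.ctx.dag_id", "")  # empty string when the variable is not set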

class airflow.providers.apache.hive.hooks.hive.HiveCliHook(hive_cli_conn_id=default_conn_name, mapred_queue=None, mapred_queue_priority=None, mapred_job_name=None, hive_cli_params='', auth=None, proxy_user=None)[source]

Bases: airflow.hooks.base.BaseHook

Simple wrapper around the hive CLI.

It also supports beeline, a lighter CLI that runs JDBC and is replacing the heavier traditional CLI. To enable beeline, set the use_beeline parameter in the extra field of your connection, as in { "use_beeline": true }.

Note that you can also set default hive CLI parameters by passing hive_cli_params, a space separated list of parameters to add to the hive command.

The extra connection parameter auth gets passed into the jdbc connection string as is.
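
As a hedged sketch (host, port, and the auth value below are placeholders, not provider defaults), a beeline-enabled connection could be created programmatically like this:

>>> import json
>>> from airflow.models import Connection
>>> conn = Connection(
...     conn_id="hive_cli_default",
...     conn_type="hive_cli",
...     host="localhost",    # placeholder host
...     port=10000,          # placeholder port
...     schema="default",
...     extra=json.dumps({"use_beeline": True, "auth": "noSasl"}),  # auth value is an assumption
... )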

Parameters
  • hive_cli_conn_id (str) – Reference to the Hive CLI connection id.

  • mapred_queue (str | None) – queue used by the Hadoop Scheduler (Capacity or Fair)

  • mapred_queue_priority (str | None) – priority within the job queue. Possible settings include: VERY_HIGH, HIGH, NORMAL, LOW, VERY_LOW

  • mapred_job_name (str | None) – This name will appear in the jobtracker. This can make monitoring easier.

  • hive_cli_params (str) – Space separated list of hive command parameters to add to the hive command.

  • proxy_user (str | None) – Run HQL code as this user.

conn_name_attr = 'hive_cli_conn_id'[source]
default_conn_name = 'hive_cli_default'[source]
conn_type = 'hive_cli'[source]
hook_name = 'Hive Client Wrapper'[source]
classmethod get_connection_form_widgets()[source]

Return connection widgets to add to Hive Client Wrapper connection form.

classmethod get_ui_field_behaviour()[source]

Return custom UI field behaviour for Hive Client Wrapper connection.

run_cli(hql, schema=None, verbose=True, hive_conf=None)[source]

Run an hql statement using the hive cli.

If hive_conf is specified, it should be a dict, and its entries will be set as key/value pairs in HiveConf.

Parameters
  • hql (str) – an hql (hive query language) statement to run with hive cli

  • schema (str | None) – Name of hive schema (database) to use

  • verbose (bool) – Provides additional logging. Defaults to True.

  • hive_conf (dict[Any, Any] | None) – if specified, these key/value pairs will be passed to hive as -hiveconf "key"="value". Note that they will be passed after the hive_cli_params and thus will override whatever values are specified in the database.

>>> hh = HiveCliHook()
>>> result = hh.run_cli("USE airflow;")
>>> ("OK" in result)
True
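
A further hedged sketch (the queue name and the hiveconf key are placeholders) showing schema and hive_conf usage:

>>> hh = HiveCliHook(mapred_queue="default")       # queue name is a placeholder
>>> out = hh.run_cli(
...     "SELECT COUNT(*) FROM static_babynames;",  # example table from the doctests above
...     schema="airflow",
...     hive_conf={"hive.exec.parallel": "true"},  # sample HiveConf override
... )
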
test_hql(hql)[source]

Test an hql statement using the hive cli and EXPLAIN.

load_df(df, table, field_dict=None, delimiter=',', encoding='utf8', pandas_kwargs=None, **kwargs)[source]

Load a pandas DataFrame into hive.

Hive data types will be inferred if not passed, but column names will not be sanitized.

Parameters
  • df (pandas.DataFrame) – DataFrame to load into a Hive table

  • table (str) – target Hive table, use dot notation to target a specific database

  • field_dict (dict[Any, Any] | None) – mapping from column name to hive data type. Note that Python dict is ordered so it keeps columns’ order.

  • delimiter (str) – field delimiter in the file

  • encoding (str) – str encoding to use when writing DataFrame to file

  • pandas_kwargs (Any) – passed to DataFrame.to_csv

  • kwargs (Any) – passed to self.load_file
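
For illustration (a minimal sketch; the table name and column types are assumptions):

>>> import pandas as pd
>>> hh = HiveCliHook()
>>> df = pd.DataFrame({"state": ["CA", "NY"], "num": [100, 200]})
>>> hh.load_df(
...     df,
...     table="airflow.static_babynames",              # dot notation selects the database
...     field_dict={"state": "STRING", "num": "INT"},  # optional; types are inferred when omitted
...     delimiter=",",
... )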

load_file(filepath, table, delimiter=',', field_dict=None, create=True, overwrite=True, partition=None, recreate=False, tblproperties=None)[source]

Load a local file into Hive.

Note that the table generated in Hive uses STORED AS textfile, which isn't the most efficient serialization format. If a large amount of data is loaded and/or if the table gets queried considerably, you may want to use this operator only to stage the data into a temporary table before loading it into its final destination using a HiveOperator.

Parameters
  • filepath (str) – local filepath of the file to load

  • table (str) – target Hive table, use dot notation to target a specific database

  • delimiter (str) – field delimiter in the file

  • field_dict (dict[Any, Any] | None) – A dictionary of the fields name in the file as keys and their Hive types as values. Note that Python dict is ordered so it keeps columns’ order.

  • create (bool) – whether to create the table if it doesn’t exist

  • overwrite (bool) – whether to overwrite the data in table or partition

  • partition (dict[str, Any] | None) – target partition as a dict of partition columns and values

  • recreate (bool) – whether to drop and recreate the table at every execution

  • tblproperties (dict[str, Any] | None) – TBLPROPERTIES of the hive table being created
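
A hedged usage sketch (the local path, column types, and partition value are placeholders):

>>> hh = HiveCliHook()
>>> hh.load_file(
...     filepath="/tmp/babynames.csv",  # placeholder local file
...     table="airflow.static_babynames_partitioned",
...     field_dict={"state": "STRING", "name": "STRING", "num": "INT"},
...     partition={"ds": "2015-01-01"},
...     overwrite=True,
...     tblproperties={"comment": "staged by HiveCliHook"},  # optional TBLPROPERTIES
... )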

kill()[source]

Kill Hive cli command.

class airflow.providers.apache.hive.hooks.hive.HiveMetastoreHook(metastore_conn_id=default_conn_name)[source]

Bases: airflow.hooks.base.BaseHook

Wrapper to interact with the Hive Metastore.

Parameters

metastore_conn_id (str) – Reference to the metastore thrift service connection id.

MAX_PART_COUNT = 32767[source]
conn_name_attr = 'metastore_conn_id'[source]
default_conn_name = 'metastore_default'[source]
conn_type = 'hive_metastore'[source]
hook_name = 'Hive Metastore Thrift'[source]
__getstate__()[source]
__setstate__(d)[source]
get_metastore_client()[source]

Return a Hive thrift client.
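
For example (a hedged sketch; get_all_tables is a method of the underlying Thrift metastore client, assumed here, and the database name is a placeholder):

>>> hh = HiveMetastoreHook()
>>> with hh.get_metastore_client() as client:  # opens and closes the Thrift transport
...     tables = client.get_all_tables("airflow")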

get_conn()[source]

Return connection for the hook.

check_for_partition(schema, table, partition)[source]

Check whether a partition exists.

Parameters
  • schema (str) – Name of hive schema (database) @table belongs to

  • table (str) – Name of hive table @partition belongs to

  • partition (str) – Expression that matches the partitions to check for (e.g. a = ‘b’ AND c = ‘d’)

>>> hh = HiveMetastoreHook()
>>> t = "static_babynames_partitioned"
>>> hh.check_for_partition("airflow", t, "ds='2015-01-01'")
True
check_for_named_partition(schema, table, partition_name)[source]

Check whether a partition with a given name exists.

Parameters
  • schema (str) – Name of hive schema (database) @table belongs to

  • table (str) – Name of hive table @partition belongs to

  • partition_name (str) – Name of the partition to check for (e.g. a=b/c=d)

>>> hh = HiveMetastoreHook()
>>> t = "static_babynames_partitioned"
>>> hh.check_for_named_partition("airflow", t, "ds=2015-01-01")
True
>>> hh.check_for_named_partition("airflow", t, "ds=xxx")
False
get_table(table_name, db='default')[source]

Get a metastore table object.

>>> hh = HiveMetastoreHook()
>>> t = hh.get_table(db="airflow", table_name="static_babynames")
>>> t.tableName
'static_babynames'
>>> [col.name for col in t.sd.cols]
['state', 'year', 'name', 'gender', 'num']
get_tables(db, pattern='*')[source]

Get metastore table objects for tables matching the pattern in the given database.

get_databases(pattern='*')[source]

Get metastore databases matching the pattern.
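
For example (a hedged sketch; it assumes the metastore_default connection is reachable and the airflow database exists):

>>> hh = HiveMetastoreHook()
>>> tables = hh.get_tables(db="airflow", pattern="static_babynames*")  # table objects matching the pattern
>>> dbs = hh.get_databases(pattern="air*")                             # databases matching the pattern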

get_partitions(schema, table_name, partition_filter=None)[source]

Return a list of all partitions in a table.

Works only for tables with fewer than 32767 partitions (the Java short max value). For a subpartitioned table, the number might easily exceed this.

>>> hh = HiveMetastoreHook()
>>> t = "static_babynames_partitioned"
>>> parts = hh.get_partitions(schema="airflow", table_name=t)
>>> len(parts)
1
>>> parts
[{'ds': '2015-01-01'}]
max_partition(schema, table_name, field=None, filter_map=None)[source]

Return the maximum value for all partitions with given field in a table.

If only one partition key exists in the table, that key will be used as the field. filter_map should be a partition_key:partition_value map and will be used to filter out partitions.

Parameters
  • schema (str) – schema name.

  • table_name (str) – table name.

  • field (str | None) – partition key to get max partition from.

  • filter_map (dict[Any, Any] | None) – partition_key:partition_value map used for partition filtering.

>>> hh = HiveMetastoreHook()
>>> filter_map = {'ds': '2015-01-01'}
>>> t = 'static_babynames_partitioned'
>>> hh.max_partition(schema='airflow',
...     table_name=t, field='ds', filter_map=filter_map)
'2015-01-01'
table_exists(table_name, db='default')[source]

Check if table exists.

>>> hh = HiveMetastoreHook()
>>> hh.table_exists(db="airflow", table_name="static_babynames")
True
>>> hh.table_exists(db="airflow", table_name="does_not_exist")
False
drop_partitions(table_name, part_vals, delete_data=False, db='default')[source]

Drop partitions from the given table matching the part_vals input.

Parameters
  • table_name – table name.

  • part_vals – list of partition specs.

  • delete_data – Setting to control whether the underlying data should be deleted in addition to dropping the partitions.

  • db – Name of hive schema (database) @table belongs to

>>> hh = HiveMetastoreHook()
>>> hh.drop_partitions(db='airflow', table_name='static_babynames',
...     part_vals=['2020-05-01'])
True
class airflow.providers.apache.hive.hooks.hive.HiveServer2Hook(*args, schema=None, log_sql=True, **kwargs)[source]

Bases: airflow.providers.common.sql.hooks.sql.DbApiHook

Wrapper around the pyhive library.

Notes:
  • The default auth_mechanism is PLAIN; to override it, specify it in the extra field of your connection in the UI.
  • The default for run_set_variable_statements is true; if you are using Impala, you may need to set it to false in the extra field of your connection in the UI.

Parameters
  • hiveserver2_conn_id – Reference to the Hive Server2 thrift service connection id.

  • schema (str | None) – Hive database name.
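
As a hedged sketch of the notes above (host, port, and the extra values are placeholders), a connection suitable for Impala could disable the SET statements via the extra field:

>>> import json
>>> from airflow.models import Connection
>>> conn = Connection(
...     conn_id="hiveserver2_default",
...     conn_type="hiveserver2",
...     host="localhost",   # placeholder host
...     port=10000,         # placeholder port
...     schema="default",
...     extra=json.dumps({"run_set_variable_statements": False}),
... )
>>> hh = HiveServer2Hook(schema="airflow")  # the schema kwarg is the Hive database to use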

conn_name_attr = 'hiveserver2_conn_id'[source]
default_conn_name = 'hiveserver2_default'[source]
conn_type = 'hiveserver2'[source]
hook_name = 'Hive Server 2 Thrift'[source]
supports_autocommit = False[source]
get_conn(schema=None)[source]

Return a Hive connection object.

get_results(sql, schema='default', fetch_size=None, hive_conf=None)[source]

Get results of the provided hql in target schema.

Parameters
  • sql (str | list[str]) – hql to be executed.

  • schema (str) – target schema, defaults to 'default'.

  • fetch_size (int | None) – max size of result to fetch.

  • hive_conf (Iterable | Mapping | None) – hive_conf to execute along with the hql.

Returns

results of hql execution, dict with data (list of results) and header

Return type

dict[str, Any]
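
For instance (a minimal sketch, assuming the hiveserver2_default connection points at a reachable HiveServer2 instance):

>>> hh = HiveServer2Hook()
>>> res = hh.get_results("SELECT * FROM airflow.static_babynames LIMIT 10", schema="airflow")
>>> rows = res["data"]      # list of result rows
>>> header = res["header"]  # column metadata from the cursor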

to_csv(sql, csv_filepath, schema='default', delimiter=',', lineterminator='\r\n', output_header=True, fetch_size=1000, hive_conf=None)[source]

Execute hql in target schema and write results to a csv file.

Parameters
  • sql (str) – hql to be executed.

  • csv_filepath (str) – filepath of csv to write results into.

  • schema (str) – target schema, defaults to 'default'.

  • delimiter (str) – delimiter of the csv file, defaults to ','.

  • lineterminator (str) – lineterminator of the csv file.

  • output_header (bool) – whether to write the column header to the csv file; defaults to True.

  • fetch_size (int) – number of result rows to write into the csv file, defaults to 1000.

  • hive_conf (dict[Any, Any] | None) – hive_conf to execute along with the hql.
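
A hedged usage sketch (the query and output path are placeholders):

>>> hh = HiveServer2Hook()
>>> hh.to_csv(
...     sql="SELECT * FROM airflow.static_babynames LIMIT 100",
...     csv_filepath="/tmp/babynames.csv",  # placeholder output path
...     schema="airflow",
...     delimiter=",",
...     lineterminator="\r\n",
...     output_header=True,
...     fetch_size=1000,
... )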

get_records(sql, parameters=None, **kwargs)[source]

Get a set of records from a Hive query; optionally pass a ‘schema’ kwarg to specify target schema.

Parameters
  • sql (str | list[str]) – hql to be executed.

  • parameters (Iterable | Mapping[str, Any] | None) – optional configuration passed to get_results

Returns

result of hive execution

Return type

Any

>>> hh = HiveServer2Hook()
>>> sql = "SELECT * FROM airflow.static_babynames LIMIT 100"
>>> len(hh.get_records(sql))
100
get_pandas_df(sql, schema='default', hive_conf=None, **kwargs)[source]

Get a pandas dataframe from a Hive query.

Parameters
  • sql (str) – hql to be executed.

  • schema (str) – target schema, defaults to 'default'.

  • hive_conf (dict[Any, Any] | None) – hive_conf to execute along with the hql.

  • kwargs – (optional) passed into pandas.DataFrame constructor

Returns

result of hive execution

Return type

pandas.DataFrame

>>> hh = HiveServer2Hook()
>>> sql = "SELECT * FROM airflow.static_babynames LIMIT 100"
>>> df = hh.get_pandas_df(sql)
>>> len(df.index)
100
