Qubole¶
Qubole is an open, simple, and secure data lake platform for machine learning, streaming, and ad hoc analytics. Qubole delivers a self-service platform for big data analytics built on Amazon Web Services, Microsoft Azure, and Google Cloud.
Airflow provides operators to execute tasks (commands) on QDS and to perform checks against Qubole commands. It also provides sensors that wait for a file, folder, or partition to be present in cloud storage, checking for its presence via QDS APIs.
Execute tasks¶
To run the following commands, use QuboleOperator.
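The snippets on this page are standalone task definitions; to try them out they need to live inside a DAG, with a Qubole connection configured in Airflow. A minimal sketch of the surrounding DAG is shown below, assuming the apache-airflow-providers-qubole package is installed; the dag_id, schedule and start_date are placeholder values, and qubole_conn_id can be omitted since it defaults to qubole_default.
from datetime import datetime

from airflow import DAG
from airflow.providers.qubole.operators.qubole import QuboleOperator

# Placeholder DAG; dag_id, start_date and schedule are arbitrary choices for this sketch.
with DAG(
    dag_id="example_qubole_operator",
    start_date=datetime(2021, 1, 1),
    schedule_interval=None,
    catchup=False,
) as dag:
    # Declare any of the QuboleOperator tasks from the sections below here, e.g.
    presto_cmd = QuboleOperator(
        task_id="presto_cmd",
        command_type="prestocmd",
        query="show tables",
        qubole_conn_id="qubole_default",  # Airflow connection holding the Qubole API token
    )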
Run Hive command¶
To run a query that shows all tables, you can use:
hive_show_table = QuboleOperator(
    task_id="hive_show_table",
    command_type="hivecmd",
    query="show tables",
    cluster_label="{{ params.cluster_label }}",
    fetch_logs=True,
    # If fetch_logs=True, the Qubole command logs are fetched and concatenated
    # into the corresponding Airflow task logs
    tags="airflow_example_run",
    # Tags to attach to the Qubole command; dag_id, task_id and run_id are
    # attached automatically as well
    params={
        "cluster_label": "default",
    },
)
You can also run a script stored in a bucket by passing the path to the query file:
hive_s3_location = QuboleOperator(
    task_id="hive_s3_location",
    command_type="hivecmd",
    script_location="s3n://public-qubole/qbol-library/scripts/show_table.hql",
    notify=True,
    tags=["tag1", "tag2"],
    # If the script at the S3 location has any Qubole-specific macros to be replaced,
    # pass them via the macros parameter, e.g.
    # macros='[{"date": "{{ ds }}"}, {"name" : "abc"}]',
)
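If a downstream task needs the output of these Hive commands, it can pull the results once the commands have finished. The sketch below assumes the operator's get_results() helper, which downloads the command output and returns a local file path; the compare_result names are only illustrative.
import filecmp

from airflow.operators.python import PythonOperator


def compare_result(**kwargs):
    """Compare the result files of the two Hive commands defined above."""
    ti = kwargs["ti"]
    # get_results() downloads the Qubole command output and returns a local file path
    result_1 = hive_show_table.get_results(ti)
    result_2 = hive_s3_location.get_results(ti)
    return filecmp.cmp(result_1, result_2)


compare_result_task = PythonOperator(task_id="compare_result", python_callable=compare_result)
[hive_show_table, hive_s3_location] >> compare_result_task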
Run Hadoop command¶
To run a jar file on your Hadoop cluster, use:
hadoop_jar_cmd = QuboleOperator(
    task_id="hadoop_jar_cmd",
    command_type="hadoopcmd",
    sub_command="jar s3://paid-qubole/HadoopAPIExamples/"
    "jars/hadoop-0.20.1-dev-streaming.jar "
    "-mapper wc "
    "-numReduceTasks 0 -input s3://paid-qubole/HadoopAPITests/"
    "data/3.tsv -output "
    "s3://paid-qubole/HadoopAPITests/data/3_wc",
    cluster_label="{{ params.cluster_label }}",
    fetch_logs=True,
    params={
        "cluster_label": "default",
    },
)
Run Pig command¶
To run a Pig Latin script on your Hadoop cluster, use:
pig_cmd = QuboleOperator(
    task_id="pig_cmd",
    command_type="pigcmd",
    script_location="s3://public-qubole/qbol-library/scripts/script1-hadoop-s3-small.pig",
    parameters="key1=value1 key2=value2",
)
Run Shell command¶
To run a shell script, use:
shell_cmd = QuboleOperator(
    task_id="shell_cmd",
    command_type="shellcmd",
    script_location="s3://public-qubole/qbol-library/scripts/shellx.sh",
    parameters="param1 param2",
)
Run Presto command¶
To run a query using Presto, use:
presto_cmd = QuboleOperator(task_id="presto_cmd", command_type="prestocmd", query="show tables")
Run DB commands¶
To run a query through a DbTap, use:
db_query = QuboleOperator(
    task_id="db_query", command_type="dbtapquerycmd", query="show tables", db_tap_id=2064
)
To run a DB export command, use:
db_export = QuboleOperator(
    task_id="db_export",
    command_type="dbexportcmd",
    mode=1,
    hive_table="default_qubole_airline_origin_destination",
    db_table="exported_airline_origin_destination",
    partition_spec="dt=20110104-02",
    dbtap_id=2064,
)
To run a DB import command, use:
db_import = QuboleOperator(
    task_id="db_import",
    command_type="dbimportcmd",
    mode=1,
    hive_table="default_qubole_airline_origin_destination",
    db_table="exported_airline_origin_destination",
    where_clause="id < 10",
    parallelism=2,
    dbtap_id=2064,
)
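In these commands, dbtap_id (db_tap_id for dbtapquerycmd) refers to a data store (DbTap) that has already been configured in the Qubole account; the export and import commands move data between that store and the given Hive table.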
Run Spark commands¶
To run a Scala program as a Spark job, use:
prog = """
import scala.math.random
import org.apache.spark._

/** Computes an approximation to pi */
object SparkPi {
    def main(args: Array[String]) {
        val conf = new SparkConf().setAppName("Spark Pi")
        val spark = new SparkContext(conf)
        val slices = if (args.length > 0) args(0).toInt else 2
        val n = math.min(100000L * slices, Int.MaxValue).toInt // avoid overflow
        val count = spark.parallelize(1 until n, slices).map { i =>
            val x = random * 2 - 1
            val y = random * 2 - 1
            if (x*x + y*y < 1) 1 else 0
        }.reduce(_ + _)
        println("Pi is roughly " + 4.0 * count / n)
        spark.stop()
    }
}
"""
spark_cmd = QuboleOperator(
    task_id="spark_cmd",
    command_type="sparkcmd",
    program=prog,
    language="scala",
    arguments="--class SparkPi",
    tags="airflow_example_run",
)
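Here the Scala source is passed inline through program, language tells Qubole how to interpret it, and arguments is forwarded to the Spark submit invocation, which is why the --class SparkPi option has to match the object defined in the inline program.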
File sensor¶
Usage examples of QuboleFileSensor.
File or directory existence¶
To wait for a file or directory to be present in cloud storage, use:
check_s3_file = QuboleFileSensor(
    task_id="check_s3_file",
    poke_interval=60,
    timeout=600,
    data={
        "files": [
            "s3://paid-qubole/HadoopAPIExamples/jars/hadoop-0.20.1-dev-streaming.jar",
            "s3://paid-qubole/HadoopAPITests/data/{{ ds.split('-')[2] }}.tsv",
        ]  # the sensor checks that all files in the list are available
    },
)
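As with any Airflow sensor, poke_interval is the number of seconds between successive QDS API checks, and timeout is the total number of seconds after which the task fails if the files still have not appeared.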
Partition sensor¶
Usage examples of QubolePartitionSensor.
Partition existence¶
To wait for a table partition to be present, use:
check_hive_partition = QubolePartitionSensor(
    task_id="check_hive_partition",
    poke_interval=10,
    timeout=60,
    data={
        "schema": "default",
        "table": "my_partitioned_table",
        "columns": [
            {"column": "month", "values": ["{{ ds.split('-')[1] }}"]},
            {"column": "day", "values": ["{{ ds.split('-')[2] }}", "{{ yesterday_ds.split('-')[2] }}"]},
        ],  # will check for partitions like [month=12/day=12, month=12/day=13]
    },
)
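The sensors are typically wired upstream of the commands that consume the data they wait for. A minimal sketch, reusing the task names defined in the examples above (the particular pairing is only illustrative):
# Run the Hadoop job only after its input data is available,
# and the Hive query only after the expected partition exists.
check_s3_file >> hadoop_jar_cmd
check_hive_partition >> hive_show_table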