Qubole

Qubole is an open, simple, and secure data lake platform for machine learning, streaming, and ad hoc analytics. Qubole delivers a self-service platform for big data analytics built on Amazon Web Services, Microsoft Azure, and Google Cloud.

Airflow provides operators to execute tasks (commands) on QDS and to perform checks against the results of Qubole commands. It also provides sensors that wait for a file, folder, or partition to be present in cloud storage and check for its presence via QDS APIs.
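
For checks, the provider also ships QuboleCheckOperator (and QuboleValueCheckOperator). Below is a minimal sketch, assuming a Hive table named my_table exists; the task id, query, and cluster label are placeholders rather than parts of the example DAG:

from airflow.providers.qubole.operators.qubole_check import QuboleCheckOperator

check_row_count = QuboleCheckOperator(
    task_id="check_row_count",
    command_type="hivecmd",
    # The first row of the result is evaluated; the check fails if any value is falsy
    query="SELECT COUNT(*) FROM my_table",  # hypothetical table
    cluster_label="default",
)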

Execute tasks

To run the following commands, use QuboleOperator.
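
The snippets below assume QuboleOperator has been imported from the provider package:

from airflow.providers.qubole.operators.qubole import QuboleOperator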

Run Hive command

To run a query that shows all tables, you can use:

tests/system/providers/qubole/example_qubole.py[source]

hive_show_table = QuboleOperator(
    task_id="hive_show_table",
    command_type="hivecmd",
    query="show tables",
    cluster_label="{{ params.cluster_label }}",
    fetch_logs=True,
    # If fetch_logs=True, the Qubole command logs are fetched and concatenated
    # into the corresponding Airflow task logs
    tags="airflow_example_run",
    # Tags to attach to the Qubole command; dag_id, task_id and run_id are attached automatically
    params={
        "cluster_label": "default",
    },
)

You can also run a script stored in a bucket by passing the path to the query file:

tests/system/providers/qubole/example_qubole.py[source]

hive_s3_location = QuboleOperator(
    task_id="hive_s3_location",
    command_type="hivecmd",
    script_location="s3n://public-qubole/qbol-library/scripts/show_table.hql",
    notify=True,
    tags=["tag1", "tag2"],
    # If the script at the S3 location has Qubole-specific macros to be replaced, pass them, e.g.:
    # macros='[{"date": "{{ ds }}"}, {"name" : "abc"}]',
)
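
A downstream task can retrieve the output of a completed Qubole command through the operator's get_results() method, which downloads the results to a local file and returns its path. A minimal sketch in the spirit of the example DAG (the comparison logic is illustrative):

import filecmp

from airflow.decorators import task


@task(trigger_rule="all_done")
def compare_result(ti=None):
    # get_results(ti) fetches the command output into a local file and returns the file path
    result_show_table = hive_show_table.get_results(ti)
    result_s3_location = hive_s3_location.get_results(ti)
    return filecmp.cmp(result_show_table, result_s3_location)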

Run Hadoop command

To run a jar file on your Hadoop cluster, use:

tests/system/providers/qubole/example_qubole.py[source]

hadoop_jar_cmd = QuboleOperator(
    task_id="hadoop_jar_cmd",
    command_type="hadoopcmd",
    sub_command="jar s3://paid-qubole/HadoopAPIExamples/"
    "jars/hadoop-0.20.1-dev-streaming.jar "
    "-mapper wc "
    "-numReduceTasks 0 -input s3://paid-qubole/HadoopAPITests/"
    "data/3.tsv -output "
    "s3://paid-qubole/HadoopAPITests/data/3_wc",
    cluster_label="{{ params.cluster_label }}",
    fetch_logs=True,
    params={
        "cluster_label": "default",
    },
)

Run Pig command

To run a Pig Latin script on your Hadoop cluster, use:

tests/system/providers/qubole/example_qubole.py[source]

pig_cmd = QuboleOperator(
    task_id="pig_cmd",
    command_type="pigcmd",
    script_location="s3://public-qubole/qbol-library/scripts/script1-hadoop-s3-small.pig",
    parameters="key1=value1 key2=value2",
)

Run Shell command

To run a shell script, use:

tests/system/providers/qubole/example_qubole.py[source]

shell_cmd = QuboleOperator(
    task_id="shell_cmd",
    command_type="shellcmd",
    script_location="s3://public-qubole/qbol-library/scripts/shellx.sh",
    parameters="param1 param2",
)

Run Presto command

To run a query using Presto, use:

tests/system/providers/qubole/example_qubole.py[source]

presto_cmd = QuboleOperator(task_id="presto_cmd", command_type="prestocmd", query="show tables")

Run DB commands

To run a query against a DbTap, use:

tests/system/providers/qubole/example_qubole.py[source]

db_query = QuboleOperator(
    task_id="db_query", command_type="dbtapquerycmd", query="show tables", db_tap_id=2064
)

To run a DB export command, use:

tests/system/providers/qubole/example_qubole.py[source]

db_export = QuboleOperator(
    task_id="db_export",
    command_type="dbexportcmd",
    mode=1,
    hive_table="default_qubole_airline_origin_destination",
    db_table="exported_airline_origin_destination",
    partition_spec="dt=20110104-02",
    dbtap_id=2064,
)

To run a DB import command, use:

tests/system/providers/qubole/example_qubole.py[source]

db_import = QuboleOperator(
    task_id="db_import",
    command_type="dbimportcmd",
    mode=1,
    hive_table="default_qubole_airline_origin_destination",
    db_table="exported_airline_origin_destination",
    where_clause="id < 10",
    parallelism=2,
    dbtap_id=2064,
)

Run Spark commands

To run a Scala script as a Spark job, use:

tests/system/providers/qubole/example_qubole.py[source]

prog = """
import scala.math.random
import org.apache.spark._

/** Computes an approximation to pi */
object SparkPi {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("Spark Pi")
    val spark = new SparkContext(conf)
    val slices = if (args.length > 0) args(0).toInt else 2
    val n = math.min(100000L * slices, Int.MaxValue).toInt // avoid overflow
    val count = spark.parallelize(1 until n, slices).map { i =>
      val x = random * 2 - 1
      val y = random * 2 - 1
      if (x*x + y*y < 1) 1 else 0
    }.reduce(_ + _)
    println("Pi is roughly " + 4.0 * count / n)
    spark.stop()
  }
}
"""

spark_cmd = QuboleOperator(
    task_id="spark_cmd",
    command_type="sparkcmd",
    program=prog,
    language="scala",
    arguments="--class SparkPi",
    tags="airflow_example_run",
)
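
The sparkcmd command type can also run inline Spark SQL. A minimal sketch, assuming the underlying SparkCommand accepts an sql argument and that the referenced table exists:

spark_sql_cmd = QuboleOperator(
    task_id="spark_sql_cmd",
    command_type="sparkcmd",
    sql="SELECT * FROM default_qubole_memetracker LIMIT 10",  # hypothetical table
    cluster_label="{{ params.cluster_label }}",
    params={"cluster_label": "default"},
)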

File sensor

Usage examples of QuboleFileSensor.
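
Both QuboleFileSensor and QubolePartitionSensor, used below, are importable from the provider's sensor module:

from airflow.providers.qubole.sensors.qubole import QuboleFileSensor, QubolePartitionSensor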

File or directory existence

To wait for a file or directory to exist in cloud storage, use:

tests/system/providers/qubole/example_qubole_sensors.py[source]

check_s3_file = QuboleFileSensor(
    task_id="check_s3_file",
    poke_interval=60,
    timeout=600,
    data={
        "files": [
            "s3://paid-qubole/HadoopAPIExamples/jars/hadoop-0.20.1-dev-streaming.jar",
            "s3://paid-qubole/HadoopAPITests/data/{{ ds.split('-')[2] }}.tsv",
        ]  # will check for availability of all the files in array
    },
)

Partition sensor

Usage examples of QubolePartitionSensor.

Partition existence

To wait for a Hive table partition to exist, use:

tests/system/providers/qubole/example_qubole_sensors.py[source]

check_hive_partition = QubolePartitionSensor(
    task_id="check_hive_partition",
    poke_interval=10,
    timeout=60,
    data={
        "schema": "default",
        "table": "my_partitioned_table",
        "columns": [
            {"column": "month", "values": ["{{ ds.split('-')[1] }}"]},
            {"column": "day", "values": ["{{ ds.split('-')[2] }}", "{{ yesterday_ds.split('-')[2] }}"]},
        ],  # will check for partitions like [month=12/day=12,month=12/day=13]
    },
)

Reference

For further information, see the Qubole documentation and the Qubole Python SDK (qds-sdk-py).
