Source code for airflow.providers.yandex.operators.dataproc

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

import warnings
from dataclasses import dataclass
from typing import TYPE_CHECKING, Iterable, Sequence

from airflow.exceptions import AirflowProviderDeprecationWarning
from airflow.models import BaseOperator
from airflow.providers.yandex.hooks.dataproc import DataprocHook

if TYPE_CHECKING:
    from airflow.utils.context import Context


@dataclass
class InitializationAction:
    """Data for initialization action to be run at start of DataProc cluster."""

    uri: str  # Uri of the executable file
    args: Iterable[str]  # Arguments to the initialization action
    timeout: int  # Execution timeout
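
# InitializationAction is a plain dataclass, so instances can be built directly and passed to
# DataprocCreateClusterOperator via `initialization_actions`. A minimal sketch follows; the bucket
# path and arguments are illustrative placeholders, not values defined in this module.
#
#     install_deps = InitializationAction(
#         uri="s3a://my-bucket/scripts/install-deps.sh",  # placeholder script location
#         args=["--retries", "3"],                        # placeholder arguments
#         timeout=300,                                    # execution timeout in seconds
#     )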
class DataprocCreateClusterOperator(BaseOperator):
    """Creates Yandex.Cloud Data Proc cluster.

    :param folder_id: ID of the folder in which cluster should be created.
    :param cluster_name: Cluster name. Must be unique inside the folder.
    :param cluster_description: Cluster description.
    :param cluster_image_version: Cluster image version. If not specified, the default image version is used.
    :param ssh_public_keys: List of SSH public keys that will be deployed to created compute instances.
    :param subnet_id: ID of the subnetwork. All Data Proc cluster nodes will use one subnetwork.
    :param services: List of services that will be installed to the cluster. Possible options:
        HDFS, YARN, MAPREDUCE, HIVE, TEZ, ZOOKEEPER, HBASE, SQOOP, FLUME, SPARK, ZEPPELIN, OOZIE
    :param s3_bucket: Yandex.Cloud S3 bucket to store cluster logs.
        Jobs will not work if the bucket is not specified.
    :param zone: Availability zone to create cluster in.
        Currently there are ru-central1-a, ru-central1-b and ru-central1-c.
    :param service_account_id: Service account id for the cluster.
        Service account can be created inside the folder.
    :param masternode_resource_preset: Resources preset (CPU+RAM configuration)
        for the primary node of the cluster.
    :param masternode_disk_size: Masternode storage size in GiB.
    :param masternode_disk_type: Masternode storage type. Possible options: network-ssd, network-hdd.
    :param datanode_resource_preset: Resources preset (CPU+RAM configuration)
        for the data nodes of the cluster.
    :param datanode_disk_size: Datanodes storage size in GiB.
    :param datanode_disk_type: Datanodes storage type. Possible options: network-ssd, network-hdd.
    :param computenode_resource_preset: Resources preset (CPU+RAM configuration)
        for the compute nodes of the cluster.
    :param computenode_disk_size: Computenodes storage size in GiB.
    :param computenode_disk_type: Computenodes storage type. Possible options: network-ssd, network-hdd.
    :param connection_id: ID of the Yandex.Cloud Airflow connection.
    :param computenode_max_hosts_count: Maximum number of nodes of compute autoscaling subcluster.
    :param computenode_warmup_duration: The warmup time of the instance in seconds. During this time,
        traffic is sent to the instance, but instance metrics are not collected. In seconds.
    :param computenode_stabilization_duration: Minimum amount of time in seconds for monitoring before
        Instance Groups can reduce the number of instances in the group. During this time, the group
        size doesn't decrease, even if the new metric values indicate that it should. In seconds.
    :param computenode_preemptible: Preemptible instances are stopped at least once every 24 hours,
        and can be stopped at any time if their resources are needed by Compute.
    :param computenode_cpu_utilization_target: Defines an autoscaling rule based on the average CPU
        utilization of the instance group. In percents, 10-100. By default it is not set and the
        default autoscaling strategy is used.
    :param computenode_decommission_timeout: Timeout to gracefully decommission nodes during downscaling.
        In seconds.
    :param properties: Properties passed to main node software.
        Docs: https://cloud.yandex.com/docs/data-proc/concepts/settings-list
    :param enable_ui_proxy: Enable UI Proxy feature for forwarding Hadoop components web interfaces.
        Docs: https://cloud.yandex.com/docs/data-proc/concepts/ui-proxy
    :param host_group_ids: Dedicated host groups to place VMs of cluster on.
        Docs: https://cloud.yandex.com/docs/compute/concepts/dedicated-host
    :param security_group_ids: User security groups.
        Docs: https://cloud.yandex.com/docs/data-proc/concepts/network#security-groups
    :param log_group_id: Id of log group to write logs. By default logs will be sent to default log group.
        To disable cloud log sending set cluster property dataproc:disable_cloud_logging = true
        Docs: https://cloud.yandex.com/docs/data-proc/concepts/logs
    :param initialization_actions: Set of init-actions to run when cluster starts.
        Docs: https://cloud.yandex.com/docs/data-proc/concepts/init-action
    :param labels: Cluster labels as key:value pairs. No more than 64 per resource.
        Docs: https://cloud.yandex.com/docs/resource-manager/concepts/labels
    """

    def __init__(
        self,
        *,
        folder_id: str | None = None,
        cluster_name: str | None = None,
        cluster_description: str | None = "",
        cluster_image_version: str | None = None,
        ssh_public_keys: str | Iterable[str] | None = None,
        subnet_id: str | None = None,
        services: Iterable[str] = ("HDFS", "YARN", "MAPREDUCE", "HIVE", "SPARK"),
        s3_bucket: str | None = None,
        zone: str = "ru-central1-b",
        service_account_id: str | None = None,
        masternode_resource_preset: str | None = None,
        masternode_disk_size: int | None = None,
        masternode_disk_type: str | None = None,
        datanode_resource_preset: str | None = None,
        datanode_disk_size: int | None = None,
        datanode_disk_type: str | None = None,
        datanode_count: int = 1,
        computenode_resource_preset: str | None = None,
        computenode_disk_size: int | None = None,
        computenode_disk_type: str | None = None,
        computenode_count: int = 0,
        computenode_max_hosts_count: int | None = None,
        computenode_measurement_duration: int | None = None,
        computenode_warmup_duration: int | None = None,
        computenode_stabilization_duration: int | None = None,
        computenode_preemptible: bool = False,
        computenode_cpu_utilization_target: int | None = None,
        computenode_decommission_timeout: int | None = None,
        connection_id: str | None = None,
        properties: dict[str, str] | None = None,
        enable_ui_proxy: bool = False,
        host_group_ids: Iterable[str] | None = None,
        security_group_ids: Iterable[str] | None = None,
        log_group_id: str | None = None,
        initialization_actions: Iterable[InitializationAction] | None = None,
        labels: dict[str, str] | None = None,
        **kwargs,
    ) -> None:
        super().__init__(**kwargs)
        if ssh_public_keys is None:
            ssh_public_keys = []

        if services is None:
            services = []

        self.folder_id = folder_id
        self.yandex_conn_id = connection_id
        self.cluster_name = cluster_name
        self.cluster_description = cluster_description
        self.cluster_image_version = cluster_image_version
        self.ssh_public_keys = ssh_public_keys
        self.subnet_id = subnet_id
        self.services = services
        self.s3_bucket = s3_bucket
        self.zone = zone
        self.service_account_id = service_account_id
        self.masternode_resource_preset = masternode_resource_preset
        self.masternode_disk_size = masternode_disk_size
        self.masternode_disk_type = masternode_disk_type
        self.datanode_resource_preset = datanode_resource_preset
        self.datanode_disk_size = datanode_disk_size
        self.datanode_disk_type = datanode_disk_type
        self.datanode_count = datanode_count
        self.computenode_resource_preset = computenode_resource_preset
        self.computenode_disk_size = computenode_disk_size
        self.computenode_disk_type = computenode_disk_type
        self.computenode_count = computenode_count
        self.computenode_max_hosts_count = computenode_max_hosts_count
        self.computenode_measurement_duration = computenode_measurement_duration
        self.computenode_warmup_duration = computenode_warmup_duration
        self.computenode_stabilization_duration = computenode_stabilization_duration
        self.computenode_preemptible = computenode_preemptible
        self.computenode_cpu_utilization_target = computenode_cpu_utilization_target
        self.computenode_decommission_timeout = computenode_decommission_timeout
        self.properties = properties
        self.enable_ui_proxy = enable_ui_proxy
        self.host_group_ids = host_group_ids
        self.security_group_ids = security_group_ids
        self.log_group_id = log_group_id
        self.initialization_actions = initialization_actions
        self.labels = labels

        self.hook: DataprocHook | None = None

    def execute(self, context: Context) -> str:
        self.hook = DataprocHook(
            yandex_conn_id=self.yandex_conn_id,
        )
        operation_result = self.hook.dataproc_client.create_cluster(
            folder_id=self.folder_id,
            cluster_name=self.cluster_name,
            cluster_description=self.cluster_description,
            cluster_image_version=self.cluster_image_version,
            ssh_public_keys=self.ssh_public_keys,
            subnet_id=self.subnet_id,
            services=self.services,
            s3_bucket=self.s3_bucket,
            zone=self.zone,
            service_account_id=self.service_account_id or self.hook.default_service_account_id,
            masternode_resource_preset=self.masternode_resource_preset,
            masternode_disk_size=self.masternode_disk_size,
            masternode_disk_type=self.masternode_disk_type,
            datanode_resource_preset=self.datanode_resource_preset,
            datanode_disk_size=self.datanode_disk_size,
            datanode_disk_type=self.datanode_disk_type,
            datanode_count=self.datanode_count,
            computenode_resource_preset=self.computenode_resource_preset,
            computenode_disk_size=self.computenode_disk_size,
            computenode_disk_type=self.computenode_disk_type,
            computenode_count=self.computenode_count,
            computenode_max_hosts_count=self.computenode_max_hosts_count,
            computenode_measurement_duration=self.computenode_measurement_duration,
            computenode_warmup_duration=self.computenode_warmup_duration,
            computenode_stabilization_duration=self.computenode_stabilization_duration,
            computenode_preemptible=self.computenode_preemptible,
            computenode_cpu_utilization_target=self.computenode_cpu_utilization_target,
            computenode_decommission_timeout=self.computenode_decommission_timeout,
            properties=self.properties,
            enable_ui_proxy=self.enable_ui_proxy,
            host_group_ids=self.host_group_ids,
            security_group_ids=self.security_group_ids,
            log_group_id=self.log_group_id,
            labels=self.labels,
            initialization_actions=[
                self.hook.sdk.wrappers.InitializationAction(
                    uri=init_action.uri,
                    args=init_action.args,
                    timeout=init_action.timeout,
                )
                for init_action in self.initialization_actions
            ]
            if self.initialization_actions
            else None,
        )
        cluster_id = operation_result.response.id

        context["task_instance"].xcom_push(key="cluster_id", value=cluster_id)
        # Deprecated
        context["task_instance"].xcom_push(key="yandexcloud_connection_id", value=self.yandex_conn_id)
        return cluster_id
    @property
    def cluster_id(self):
        return self.output
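
# Usage sketch. The connection ID, folder and bucket names below are placeholders, not values from
# this module. The created cluster's ID is pushed to XCom under the key `cluster_id` and is also
# exposed to downstream tasks through the `cluster_id` property (an XComArg over the return value).
#
#     from datetime import datetime
#
#     from airflow import DAG
#     from airflow.providers.yandex.operators.dataproc import DataprocCreateClusterOperator
#
#     with DAG(
#         dag_id="example_yandex_dataproc",
#         start_date=datetime(2024, 1, 1),
#         schedule=None,
#     ) as dag:
#         create_cluster = DataprocCreateClusterOperator(
#             task_id="create_cluster",
#             connection_id="yandexcloud_default",   # placeholder connection ID
#             zone="ru-central1-b",
#             s3_bucket="my-dataproc-logs-bucket",   # placeholder bucket for cluster logs
#             datanode_count=1,
#             computenode_count=1,
#         )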
class DataprocBaseOperator(BaseOperator):
    """Base class for DataProc operators working with given cluster.

    :param connection_id: ID of the Yandex.Cloud Airflow connection.
    :param cluster_id: ID of the cluster to operate on. (templated)
    """

    template_fields: Sequence[str] = ("cluster_id",)

    def __init__(self, *, yandex_conn_id: str | None = None, cluster_id: str | None = None, **kwargs) -> None:
        super().__init__(**kwargs)
        self.cluster_id = cluster_id
        self.yandex_conn_id = yandex_conn_id

    def _setup(self, context: Context) -> DataprocHook:
        if self.cluster_id is None:
            self.cluster_id = context["task_instance"].xcom_pull(key="cluster_id")
        if self.yandex_conn_id is None:
            xcom_yandex_conn_id = context["task_instance"].xcom_pull(key="yandexcloud_connection_id")
            if xcom_yandex_conn_id:
                warnings.warn(
                    "Implicit pass of `yandex_conn_id` is deprecated, please pass it explicitly",
                    AirflowProviderDeprecationWarning,
                    stacklevel=2,
                )
                self.yandex_conn_id = xcom_yandex_conn_id

        return DataprocHook(yandex_conn_id=self.yandex_conn_id)

    def execute(self, context: Context):
        raise NotImplementedError()
class DataprocDeleteClusterOperator(DataprocBaseOperator):
    """Deletes Yandex.Cloud Data Proc cluster.

    :param connection_id: ID of the Yandex.Cloud Airflow connection.
    :param cluster_id: ID of the cluster to remove. (templated)
    """

    def __init__(self, *, connection_id: str | None = None, cluster_id: str | None = None, **kwargs) -> None:
        super().__init__(yandex_conn_id=connection_id, cluster_id=cluster_id, **kwargs)

    def execute(self, context: Context) -> None:
        hook = self._setup(context)
        hook.dataproc_client.delete_cluster(self.cluster_id)
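
# Continuing the DAG sketch above (inside the same `with DAG(...)` block): the cluster ID can be
# passed explicitly from the create task's `cluster_id` property, which also wires the dependency.
# If `cluster_id` is omitted, `_setup` falls back to the `cluster_id` XCom pushed by the create
# operator. Connection ID is a placeholder.
#
#         delete_cluster = DataprocDeleteClusterOperator(
#             task_id="delete_cluster",
#             connection_id="yandexcloud_default",      # placeholder connection ID
#             cluster_id=create_cluster.cluster_id,     # XComArg with the created cluster's ID
#         )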
class DataprocCreateHiveJobOperator(DataprocBaseOperator):
    """Runs Hive job in Data Proc cluster.

    :param query: Hive query.
    :param query_file_uri: URI of the script that contains Hive queries. Can be placed in HDFS or S3.
    :param properties: A mapping of property names to values, used to configure Hive.
    :param script_variables: Mapping of query variable names to values.
    :param continue_on_failure: Whether to continue executing queries if a query fails.
    :param name: Name of the job. Used for labeling.
    :param cluster_id: ID of the cluster to run job in.
        Will try to take the ID from Dataproc Hook object if it's specified. (templated)
    :param connection_id: ID of the Yandex.Cloud Airflow connection.
    """

    def __init__(
        self,
        *,
        query: str | None = None,
        query_file_uri: str | None = None,
        script_variables: dict[str, str] | None = None,
        continue_on_failure: bool = False,
        properties: dict[str, str] | None = None,
        name: str = "Hive job",
        cluster_id: str | None = None,
        connection_id: str | None = None,
        **kwargs,
    ) -> None:
        super().__init__(yandex_conn_id=connection_id, cluster_id=cluster_id, **kwargs)
        self.query = query
        self.query_file_uri = query_file_uri
        self.script_variables = script_variables
        self.continue_on_failure = continue_on_failure
        self.properties = properties
        self.name = name

    def execute(self, context: Context) -> None:
        hook = self._setup(context)
        hook.dataproc_client.create_hive_job(
            query=self.query,
            query_file_uri=self.query_file_uri,
            script_variables=self.script_variables,
            continue_on_failure=self.continue_on_failure,
            properties=self.properties,
            name=self.name,
            cluster_id=self.cluster_id,
        )
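
# Minimal sketch of a Hive job task inside the same `with DAG(...)` block; the script location,
# variable values and connection ID are placeholders. Either `query` or `query_file_uri` can be
# given; `cluster_id` is omitted so it is pulled from the `cluster_id` XCom.
#
#         hive_query = DataprocCreateHiveJobOperator(
#             task_id="hive_query",
#             connection_id="yandexcloud_default",                 # placeholder connection ID
#             query_file_uri="s3a://my-bucket/queries/daily.sql",  # placeholder script location
#             script_variables={"LOAD_DATE": "2024-01-01"},        # variables available to the script
#             continue_on_failure=False,
#         )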
class DataprocCreateMapReduceJobOperator(DataprocBaseOperator):
    """Runs Mapreduce job in Data Proc cluster.

    :param main_jar_file_uri: URI of jar file with job.
        Can be placed in HDFS or S3. Can be specified instead of main_class.
    :param main_class: Name of the main class of the job. Can be specified instead of main_jar_file_uri.
    :param file_uris: URIs of files used in the job. Can be placed in HDFS or S3.
    :param archive_uris: URIs of archive files used in the job. Can be placed in HDFS or S3.
    :param jar_file_uris: URIs of JAR files used in the job. Can be placed in HDFS or S3.
    :param properties: Properties for the job.
    :param args: Arguments to be passed to the job.
    :param name: Name of the job. Used for labeling.
    :param cluster_id: ID of the cluster to run job in.
        Will try to take the ID from Dataproc Hook object if it's specified. (templated)
    :param connection_id: ID of the Yandex.Cloud Airflow connection.
    """

    def __init__(
        self,
        *,
        main_class: str | None = None,
        main_jar_file_uri: str | None = None,
        jar_file_uris: Iterable[str] | None = None,
        archive_uris: Iterable[str] | None = None,
        file_uris: Iterable[str] | None = None,
        args: Iterable[str] | None = None,
        properties: dict[str, str] | None = None,
        name: str = "Mapreduce job",
        cluster_id: str | None = None,
        connection_id: str | None = None,
        **kwargs,
    ) -> None:
        super().__init__(yandex_conn_id=connection_id, cluster_id=cluster_id, **kwargs)
        self.main_class = main_class
        self.main_jar_file_uri = main_jar_file_uri
        self.jar_file_uris = jar_file_uris
        self.archive_uris = archive_uris
        self.file_uris = file_uris
        self.args = args
        self.properties = properties
        self.name = name

    def execute(self, context: Context) -> None:
        hook = self._setup(context)
        hook.dataproc_client.create_mapreduce_job(
            main_class=self.main_class,
            main_jar_file_uri=self.main_jar_file_uri,
            jar_file_uris=self.jar_file_uris,
            archive_uris=self.archive_uris,
            file_uris=self.file_uris,
            args=self.args,
            properties=self.properties,
            name=self.name,
            cluster_id=self.cluster_id,
        )
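
# Sketch of a streaming-style MapReduce task inside the same `with DAG(...)` block; the scripts,
# bucket paths and connection ID are placeholders.
#
#         mapreduce_job = DataprocCreateMapReduceJobOperator(
#             task_id="mapreduce_job",
#             connection_id="yandexcloud_default",        # placeholder connection ID
#             main_class="org.apache.hadoop.streaming.HadoopStreaming",
#             file_uris=[
#                 "s3a://my-bucket/scripts/mapper.py",    # placeholder mapper script
#                 "s3a://my-bucket/scripts/reducer.py",   # placeholder reducer script
#             ],
#             args=[
#                 "-mapper", "mapper.py",
#                 "-reducer", "reducer.py",
#                 "-input", "s3a://my-bucket/in",
#                 "-output", "s3a://my-bucket/out",
#             ],
#         )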
class DataprocCreateSparkJobOperator(DataprocBaseOperator):
    """Runs Spark job in Data Proc cluster.

    :param main_jar_file_uri: URI of jar file with job. Can be placed in HDFS or S3.
    :param main_class: Name of the main class of the job.
    :param file_uris: URIs of files used in the job. Can be placed in HDFS or S3.
    :param archive_uris: URIs of archive files used in the job. Can be placed in HDFS or S3.
    :param jar_file_uris: URIs of JAR files used in the job. Can be placed in HDFS or S3.
    :param properties: Properties for the job.
    :param args: Arguments to be passed to the job.
    :param name: Name of the job. Used for labeling.
    :param cluster_id: ID of the cluster to run job in.
        Will try to take the ID from Dataproc Hook object if it's specified. (templated)
    :param connection_id: ID of the Yandex.Cloud Airflow connection.
    :param packages: List of maven coordinates of jars to include on the driver and executor classpaths.
    :param repositories: List of additional remote repositories to search for the maven coordinates
        given with --packages.
    :param exclude_packages: List of groupId:artifactId, to exclude while resolving the dependencies
        provided in --packages to avoid dependency conflicts.
    """

    def __init__(
        self,
        *,
        main_class: str | None = None,
        main_jar_file_uri: str | None = None,
        jar_file_uris: Iterable[str] | None = None,
        archive_uris: Iterable[str] | None = None,
        file_uris: Iterable[str] | None = None,
        args: Iterable[str] | None = None,
        properties: dict[str, str] | None = None,
        name: str = "Spark job",
        cluster_id: str | None = None,
        connection_id: str | None = None,
        packages: Iterable[str] | None = None,
        repositories: Iterable[str] | None = None,
        exclude_packages: Iterable[str] | None = None,
        **kwargs,
    ) -> None:
        super().__init__(yandex_conn_id=connection_id, cluster_id=cluster_id, **kwargs)
        self.main_class = main_class
        self.main_jar_file_uri = main_jar_file_uri
        self.jar_file_uris = jar_file_uris
        self.archive_uris = archive_uris
        self.file_uris = file_uris
        self.args = args
        self.properties = properties
        self.name = name
        self.packages = packages
        self.repositories = repositories
        self.exclude_packages = exclude_packages

    def execute(self, context: Context) -> None:
        hook = self._setup(context)
        hook.dataproc_client.create_spark_job(
            main_class=self.main_class,
            main_jar_file_uri=self.main_jar_file_uri,
            jar_file_uris=self.jar_file_uris,
            archive_uris=self.archive_uris,
            file_uris=self.file_uris,
            args=self.args,
            properties=self.properties,
            packages=self.packages,
            repositories=self.repositories,
            exclude_packages=self.exclude_packages,
            name=self.name,
            cluster_id=self.cluster_id,
        )
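
# Sketch of a Spark job task inside the same `with DAG(...)` block; the jar location, class name,
# arguments, Maven coordinate and connection ID are placeholders chosen for illustration.
#
#         spark_job = DataprocCreateSparkJobOperator(
#             task_id="spark_job",
#             connection_id="yandexcloud_default",                     # placeholder connection ID
#             main_jar_file_uri="s3a://my-bucket/jobs/spark-job.jar",  # placeholder jar location
#             main_class="org.example.MySparkApp",                     # placeholder main class
#             args=["--input", "s3a://my-bucket/in", "--output", "s3a://my-bucket/out"],
#             packages=["org.slf4j:slf4j-simple:1.7.30"],              # placeholder Maven coordinate
#         )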
class DataprocCreatePysparkJobOperator(DataprocBaseOperator):
    """Runs Pyspark job in Data Proc cluster.

    :param main_python_file_uri: URI of python file with job. Can be placed in HDFS or S3.
    :param python_file_uris: URIs of python files used in the job. Can be placed in HDFS or S3.
    :param file_uris: URIs of files used in the job. Can be placed in HDFS or S3.
    :param archive_uris: URIs of archive files used in the job. Can be placed in HDFS or S3.
    :param jar_file_uris: URIs of JAR files used in the job. Can be placed in HDFS or S3.
    :param properties: Properties for the job.
    :param args: Arguments to be passed to the job.
    :param name: Name of the job. Used for labeling.
    :param cluster_id: ID of the cluster to run job in.
        Will try to take the ID from Dataproc Hook object if it's specified. (templated)
    :param connection_id: ID of the Yandex.Cloud Airflow connection.
    :param packages: List of maven coordinates of jars to include on the driver and executor classpaths.
    :param repositories: List of additional remote repositories to search for the maven coordinates
        given with --packages.
    :param exclude_packages: List of groupId:artifactId, to exclude while resolving the dependencies
        provided in --packages to avoid dependency conflicts.
    """

    def __init__(
        self,
        *,
        main_python_file_uri: str | None = None,
        python_file_uris: Iterable[str] | None = None,
        jar_file_uris: Iterable[str] | None = None,
        archive_uris: Iterable[str] | None = None,
        file_uris: Iterable[str] | None = None,
        args: Iterable[str] | None = None,
        properties: dict[str, str] | None = None,
        name: str = "Pyspark job",
        cluster_id: str | None = None,
        connection_id: str | None = None,
        packages: Iterable[str] | None = None,
        repositories: Iterable[str] | None = None,
        exclude_packages: Iterable[str] | None = None,
        **kwargs,
    ) -> None:
        super().__init__(yandex_conn_id=connection_id, cluster_id=cluster_id, **kwargs)
        self.main_python_file_uri = main_python_file_uri
        self.python_file_uris = python_file_uris
        self.jar_file_uris = jar_file_uris
        self.archive_uris = archive_uris
        self.file_uris = file_uris
        self.args = args
        self.properties = properties
        self.name = name
        self.packages = packages
        self.repositories = repositories
        self.exclude_packages = exclude_packages

    def execute(self, context: Context) -> None:
        hook = self._setup(context)
        hook.dataproc_client.create_pyspark_job(
            main_python_file_uri=self.main_python_file_uri,
            python_file_uris=self.python_file_uris,
            jar_file_uris=self.jar_file_uris,
            archive_uris=self.archive_uris,
            file_uris=self.file_uris,
            args=self.args,
            properties=self.properties,
            packages=self.packages,
            repositories=self.repositories,
            exclude_packages=self.exclude_packages,
            name=self.name,
            cluster_id=self.cluster_id,
        )
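
# Sketch of a PySpark job task inside the same `with DAG(...)` block; the script locations,
# property value and connection ID are placeholders.
#
#         pyspark_job = DataprocCreatePysparkJobOperator(
#             task_id="pyspark_job",
#             connection_id="yandexcloud_default",                      # placeholder connection ID
#             main_python_file_uri="s3a://my-bucket/jobs/job_main.py",  # placeholder entry-point script
#             python_file_uris=["s3a://my-bucket/jobs/helpers.py"],     # placeholder helper module
#             properties={"spark.submit.deployMode": "cluster"},
#         )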
