Using Operators¶
An operator represents a single, ideally idempotent, task. Operators determine what actually executes when your DAG runs.
See the Operators Concepts documentation and the Operators API Reference for more information.
- BashOperator
- PythonOperator
- Google Cloud Platform Operators
- GoogleCloudStorageToBigQueryOperator
- GceInstanceStartOperator
- GceInstanceStopOperator
- GceSetMachineTypeOperator
- GcfFunctionDeleteOperator
- GcfFunctionDeployOperator
- CloudSqlInstanceDatabaseCreateOperator
- CloudSqlInstanceDatabaseDeleteOperator
- CloudSqlInstanceDatabasePatchOperator
- CloudSqlInstanceDeleteOperator
- CloudSqlInstanceCreateOperator
- CloudSqlInstancePatchOperator
BashOperator¶
Use the BashOperator to execute commands in a Bash shell.
run_this = BashOperator(
    task_id='run_after_loop',
    bash_command='echo 1',
    dag=dag,
)
Templating¶
You can use Jinja templates to parameterize the bash_command argument.
also_run_this = BashOperator(
    task_id='also_run_this',
    bash_command='echo "run_id={{ run_id }} | dag_run={{ dag_run }}"',
    dag=dag,
)
Troubleshooting¶
Jinja template not found¶
Add a space after the script name when directly calling a Bash script with the bash_command argument. Without the trailing space, Airflow treats the value as the path of a Jinja template file to render and raises a "Jinja template not found" error when it cannot locate it.
t2 = BashOperator(
    task_id='bash_example',
    # This fails with `Jinja template not found` error
    # bash_command="/home/batcher/test.sh",
    # This works (has a space after)
    bash_command="/home/batcher/test.sh ",
    dag=dag)
PythonOperator¶
Use the PythonOperator to execute Python callables.
def print_context(ds, **kwargs):
    pprint(kwargs)
    print(ds)
    return 'Whatever you return gets printed in the logs'

run_this = PythonOperator(
    task_id='print_the_context',
    provide_context=True,
    python_callable=print_context,
    dag=dag,
)
Passing in arguments¶
Use the op_args and op_kwargs arguments to pass additional arguments to the Python callable.
def my_sleeping_function(random_base):
    """This is a function that will run within the DAG execution"""
    time.sleep(random_base)

# Generate 5 sleeping tasks, sleeping from 0 to 0.4 seconds respectively
for i in range(5):
    task = PythonOperator(
        task_id='sleep_for_' + str(i),
        python_callable=my_sleeping_function,
        op_kwargs={'random_base': float(i) / 10},
        dag=dag,
    )

    run_this >> task
Templating¶
When you set the provide_context argument to True, Airflow passes in an additional set of keyword arguments: one for each of the Jinja template variables and a templates_dict argument. The templates_dict argument is templated, so each value in the dictionary is evaluated as a Jinja template.
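For example, a minimal sketch of a task that renders the execution date through templates_dict (the show_templates callable and its task_id are illustrative and not part of the example DAG above):
def show_templates(ds, **kwargs):
    # Values in templates_dict have already been rendered by Jinja.
    print(kwargs['templates_dict']['execution_date'])

show_templates_task = PythonOperator(
    task_id='show_templates',
    provide_context=True,
    python_callable=show_templates,
    templates_dict={'execution_date': '{{ ds }}'},
    dag=dag,
)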
Google Cloud Platform Operators¶
GoogleCloudStorageToBigQueryOperator¶
Use the GoogleCloudStorageToBigQueryOperator to execute a BigQuery load job.
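For example, a minimal sketch of a load job (the bucket, object path, table name, and schema below are placeholders, not taken from an example DAG):
load_csv = GoogleCloudStorageToBigQueryOperator(
    task_id='gcs_to_bq_example',
    bucket='my-bucket',                        # placeholder bucket name
    source_objects=['data/sample.csv'],        # placeholder object path
    destination_project_dataset_table='my_dataset.my_table',  # placeholder table
    schema_fields=[
        {'name': 'name', 'type': 'STRING', 'mode': 'NULLABLE'},
        {'name': 'value', 'type': 'INTEGER', 'mode': 'NULLABLE'},
    ],
    write_disposition='WRITE_TRUNCATE',
    dag=dag,
)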
GceInstanceStartOperator¶
Starts an existing Google Compute Engine instance.
In this example, parameter values are extracted from Airflow variables. Moreover, the default_args dict is used to pass common arguments to all operators in a single DAG.
PROJECT_ID = models.Variable.get('PROJECT_ID', '')
LOCATION = models.Variable.get('LOCATION', '')
INSTANCE = models.Variable.get('INSTANCE', '')
SHORT_MACHINE_TYPE_NAME = models.Variable.get('SHORT_MACHINE_TYPE_NAME', '')
SET_MACHINE_TYPE_BODY = {
    'machineType': 'zones/{}/machineTypes/{}'.format(LOCATION, SHORT_MACHINE_TYPE_NAME)
}

default_args = {
    'start_date': airflow.utils.dates.days_ago(1)
}
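The snippets below assume the operators are created inside a DAG that uses this default_args dict, roughly as in the following sketch (the DAG id and schedule_interval are illustrative, not taken from the example):
with models.DAG(
    'example_gcp_compute',        # illustrative DAG id
    default_args=default_args,
    schedule_interval=None        # trigger manually; adjust as needed
) as dag:
    # The GCE operators below are created within this context,
    # so they inherit start_date from default_args.
    ...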
Define the GceInstanceStartOperator by passing the required arguments to the constructor.
gce_instance_start = GceInstanceStartOperator(
    project_id=PROJECT_ID,
    zone=LOCATION,
    resource_id=INSTANCE,
    task_id='gcp_compute_start_task'
)
GceInstanceStopOperator¶
Stops an existing Google Compute Engine instance.
For parameter definition, take a look at GceInstanceStartOperator above.
Define the GceInstanceStopOperator by passing the required arguments to the constructor.
gce_instance_stop = GceInstanceStopOperator(
    project_id=PROJECT_ID,
    zone=LOCATION,
    resource_id=INSTANCE,
    task_id='gcp_compute_stop_task'
)
GceSetMachineTypeOperator¶
Changes the machine type of a stopped Google Compute Engine instance to the specified machine type.
For parameter definition, take a look at GceInstanceStartOperator above.
Define the GceSetMachineTypeOperator by passing the required arguments to the constructor.
gce_set_machine_type = GceSetMachineTypeOperator(
    project_id=PROJECT_ID,
    zone=LOCATION,
    resource_id=INSTANCE,
    body=SET_MACHINE_TYPE_BODY,
    task_id='gcp_compute_set_machine_type'
)
GcfFunctionDeleteOperator¶
Use the default_args dict to pass arguments to the operator.
PROJECT_ID = models.Variable.get('PROJECT_ID', '')
LOCATION = models.Variable.get('LOCATION', '')
ENTRYPOINT = models.Variable.get('ENTRYPOINT', '')
# A fully-qualified name of the function to delete
FUNCTION_NAME = 'projects/{}/locations/{}/functions/{}'.format(PROJECT_ID, LOCATION,
                                                               ENTRYPOINT)

default_args = {
    'start_date': airflow.utils.dates.days_ago(1)
}
Use the GcfFunctionDeleteOperator to delete a function from Google Cloud Functions.
t1 = GcfFunctionDeleteOperator(
    task_id="gcf_delete_task",
    name=FUNCTION_NAME
)
Troubleshooting¶
If you run or deploy an operator using a service account and get “forbidden 403” errors, your service account is missing the required Cloud IAM permissions.
- Assign your Service Account the Cloud Functions Developer role.
- Grant the user the Cloud IAM Service Account User role on the Cloud Functions runtime service account.
The typical way of assigning Cloud IAM permissions with gcloud is shown below. Just replace PROJECT_ID with the ID of your Google Cloud Platform project and SERVICE_ACCOUNT_EMAIL with the email address of your service account.
gcloud iam service-accounts add-iam-policy-binding \
  PROJECT_ID@appspot.gserviceaccount.com \
  --member="serviceAccount:[SERVICE_ACCOUNT_EMAIL]" \
  --role="roles/iam.serviceAccountUser"
See Adding the IAM service agent user role to the runtime service account for details.
GcfFunctionDeployOperator¶
Use the GcfFunctionDeployOperator to deploy a function to Google Cloud Functions.
The following Airflow variables are used in the examples below to build various variants and combinations of default_args. The variables are defined as follows:
PROJECT_ID = models.Variable.get('PROJECT_ID', '')
LOCATION = models.Variable.get('LOCATION', '')
SOURCE_ARCHIVE_URL = models.Variable.get('SOURCE_ARCHIVE_URL', '')
SOURCE_UPLOAD_URL = models.Variable.get('SOURCE_UPLOAD_URL', '')
SOURCE_REPOSITORY = models.Variable.get('SOURCE_REPOSITORY', '')
ZIP_PATH = models.Variable.get('ZIP_PATH', '')
ENTRYPOINT = models.Variable.get('ENTRYPOINT', '')
FUNCTION_NAME = 'projects/{}/locations/{}/functions/{}'.format(PROJECT_ID, LOCATION,
                                                               ENTRYPOINT)
RUNTIME = 'nodejs6'
VALIDATE_BODY = models.Variable.get('VALIDATE_BODY', True)
With those variables you can define the body of the request:
body = {
    "name": FUNCTION_NAME,
    "entryPoint": ENTRYPOINT,
    "runtime": RUNTIME,
    "httpsTrigger": {}
}
When you create a DAG, the default_args dictionary can be used to pass the body and other arguments:
default_args = {
    'start_date': dates.days_ago(1),
    'project_id': PROJECT_ID,
    'location': LOCATION,
    'body': body,
    'validate_body': VALIDATE_BODY
}
Note that neither the body nor the default args are complete in the above examples. Depending on the variables you set, there are different ways to pass the source-code-related fields. Currently, you can pass either sourceArchiveUrl, sourceRepository, or sourceUploadUrl, as described in the Cloud Functions API specification. Additionally, default_args may contain a zip_path parameter to run the extra step of uploading the source code before deploying it. In that case, you also need to provide an empty sourceUploadUrl parameter in the body.
Based on the variables defined above, example logic for setting the source-code-related fields is shown here:
if SOURCE_ARCHIVE_URL:
    body['sourceArchiveUrl'] = SOURCE_ARCHIVE_URL
elif SOURCE_REPOSITORY:
    body['sourceRepository'] = {
        'url': SOURCE_REPOSITORY
    }
elif ZIP_PATH:
    body['sourceUploadUrl'] = ''
    default_args['zip_path'] = ZIP_PATH
elif SOURCE_UPLOAD_URL:
    body['sourceUploadUrl'] = SOURCE_UPLOAD_URL
else:
    raise Exception("Please provide one of the source_code parameters")
The code to create the operator:
deploy_task = GcfFunctionDeployOperator(
    task_id="gcf_deploy_task",
    name=FUNCTION_NAME
)
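The remaining arguments (project_id, location, body, and validate_body) are picked up from default_args when the operator is instantiated inside the DAG. The deploy task can then be ordered before the delete task shown earlier, for example (a sketch, assuming both tasks are defined in the same DAG):
deploy_task >> t1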
Troubleshooting¶
If you run or deploy an operator using a service account and get “forbidden 403” errors, your service account is missing the required Cloud IAM permissions.
- Assign your Service Account the Cloud Functions Developer role.
- Grant the user the Cloud IAM Service Account User role on the Cloud Functions runtime service account.
The typical way of assigning Cloud IAM permissions with gcloud is shown below. Just replace PROJECT_ID with the ID of your Google Cloud Platform project and SERVICE_ACCOUNT_EMAIL with the email address of your service account.
gcloud iam service-accounts add-iam-policy-binding \
  PROJECT_ID@appspot.gserviceaccount.com \
  --member="serviceAccount:[SERVICE_ACCOUNT_EMAIL]" \
  --role="roles/iam.serviceAccountUser"
See Adding the IAM service agent user role to the runtime service account for details.
If the source code for your function is in Google Source Repository, make sure that your service account has the Source Repository Viewer role so that the source code can be downloaded if necessary.
CloudSqlInstanceDatabaseCreateOperator¶
Creates a new database inside a Cloud SQL instance.
For parameter definition, take a look at CloudSqlInstanceDatabaseCreateOperator.
Arguments¶
Some arguments in the example DAG are taken from environment variables:
PROJECT_ID = os.environ.get('PROJECT_ID', 'example-project')
INSTANCE_NAME = os.environ.get('INSTANCE_NAME', 'testinstance')
DB_NAME = os.environ.get('DB_NAME', 'testdb')
Using the operator¶
sql_db_create_task = CloudSqlInstanceDatabaseCreateOperator(
    project_id=PROJECT_ID,
    body=db_create_body,
    instance=INSTANCE_NAME,
    task_id='sql_db_create_task'
)
Example request body:
db_create_body = {
    "instance": INSTANCE_NAME,
    "name": DB_NAME,
    "project": PROJECT_ID
}
Templating¶
template_fields = ('project_id', 'instance', 'gcp_conn_id', 'api_version')
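Because project_id and instance are template fields, their values can be rendered by Jinja at run time, for example from an Airflow Variable (a minimal sketch; the PROJECT_ID Variable and the task_id below are assumed for illustration):
sql_db_create_task_templated = CloudSqlInstanceDatabaseCreateOperator(
    project_id="{{ var.value.PROJECT_ID }}",   # rendered by Jinja at run time
    body=db_create_body,
    instance=INSTANCE_NAME,
    task_id='sql_db_create_task_templated'
)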
CloudSqlInstanceDatabaseDeleteOperator¶
Deletes a database from a Cloud SQL instance.
For parameter definition, take a look at CloudSqlInstanceDatabaseDeleteOperator.
Arguments¶
Some arguments in the example DAG are taken from environment variables:
PROJECT_ID = os.environ.get('PROJECT_ID', 'example-project')
INSTANCE_NAME = os.environ.get('INSTANCE_NAME', 'testinstance')
DB_NAME = os.environ.get('DB_NAME', 'testdb')
Using the operator¶
sql_db_delete_task = CloudSqlInstanceDatabaseDeleteOperator(
    project_id=PROJECT_ID,
    instance=INSTANCE_NAME,
    database=DB_NAME,
    task_id='sql_db_delete_task'
)
Templating¶
template_fields = ('project_id', 'instance', 'database', 'gcp_conn_id',
                   'api_version')
CloudSqlInstanceDatabasePatchOperator¶
Updates a resource containing information about a database inside a Cloud SQL instance using patch semantics. See: https://cloud.google.com/sql/docs/mysql/admin-api/how-tos/performance#patch
For parameter definition, take a look at CloudSqlInstanceDatabasePatchOperator.
Arguments¶
Some arguments in the example DAG are taken from environment variables:
PROJECT_ID = os.environ.get('PROJECT_ID', 'example-project')
INSTANCE_NAME = os.environ.get('INSTANCE_NAME', 'testinstance')
DB_NAME = os.environ.get('DB_NAME', 'testdb')
Using the operator¶
sql_db_patch_task = CloudSqlInstanceDatabasePatchOperator(
    project_id=PROJECT_ID,
    body=db_patch_body,
    instance=INSTANCE_NAME,
    database=DB_NAME,
    task_id='sql_db_patch_task'
)
Example request body:
db_patch_body = {
    "charset": "utf16",
    "collation": "utf16_general_ci"
}
Templating¶
template_fields = ('project_id', 'instance', 'database', 'gcp_conn_id',
                   'api_version')
CloudSqlInstanceDeleteOperator¶
Deletes a Cloud SQL instance in Google Cloud Platform.
For parameter definition, take a look at CloudSqlInstanceDeleteOperator.
Arguments¶
Some arguments in the example DAG are taken from environment variables:
PROJECT_ID = os.environ.get('PROJECT_ID', 'example-project')
INSTANCE_NAME = os.environ.get('INSTANCE_NAME', 'testinstance')
DB_NAME = os.environ.get('DB_NAME', 'testdb')
Using the operator¶
sql_instance_delete_task = CloudSqlInstanceDeleteOperator(
    project_id=PROJECT_ID,
    instance=INSTANCE_NAME,
    task_id='sql_instance_delete_task'
)
Templating¶
template_fields = ('project_id', 'instance', 'gcp_conn_id', 'api_version')
CloudSqlInstanceCreateOperator¶
Creates a new Cloud SQL instance in Google Cloud Platform.
For parameter definition, take a look at CloudSqlInstanceCreateOperator.
If an instance with the same name exists, no action will be taken and the operator will succeed.
Arguments¶
Some arguments in the example DAG are taken from environment variables:
PROJECT_ID = os.environ.get('PROJECT_ID', 'example-project')
INSTANCE_NAME = os.environ.get('INSTANCE_NAME', 'testinstance')
DB_NAME = os.environ.get('DB_NAME', 'testdb')
Example body defining the instance:
body = {
    "name": INSTANCE_NAME,
    "settings": {
        "tier": "db-n1-standard-1",
        "backupConfiguration": {
            "binaryLogEnabled": True,
            "enabled": True,
            "startTime": "05:00"
        },
        "activationPolicy": "ALWAYS",
        "dataDiskSizeGb": 30,
        "dataDiskType": "PD_SSD",
        "databaseFlags": [],
        "ipConfiguration": {
            "ipv4Enabled": True,
            "requireSsl": True,
        },
        "locationPreference": {
            "zone": "europe-west4-a"
        },
        "maintenanceWindow": {
            "hour": 5,
            "day": 7,
            "updateTrack": "canary"
        },
        "pricingPlan": "PER_USE",
        "replicationType": "ASYNCHRONOUS",
        "storageAutoResize": False,
        "storageAutoResizeLimit": 0,
        "userLabels": {
            "my-key": "my-value"
        }
    },
    "databaseVersion": "MYSQL_5_7",
    "region": "europe-west4",
}
Using the operator¶
sql_instance_create_task = CloudSqlInstanceCreateOperator(
    project_id=PROJECT_ID,
    body=body,
    instance=INSTANCE_NAME,
    task_id='sql_instance_create_task'
)
Templating¶
template_fields = ('project_id', 'instance', 'gcp_conn_id', 'api_version')
CloudSqlInstancePatchOperator¶
Updates settings of a Cloud SQL instance in Google Cloud Platform (partial update).
For parameter definition, take a look at CloudSqlInstancePatchOperator.
This is a partial update, so only values for the settings specified in the body will be set / updated. The rest of the existing instance’s configuration will remain unchanged.
Arguments¶
Some arguments in the example DAG are taken from environment variables:
PROJECT_ID = os.environ.get('PROJECT_ID', 'example-project')
INSTANCE_NAME = os.environ.get('INSTANCE_NAME', 'testinstance')
DB_NAME = os.environ.get('DB_NAME', 'testdb')
Example body defining the patch:
patch_body = {
    "name": INSTANCE_NAME,
    "settings": {
        "dataDiskSizeGb": 35,
        "maintenanceWindow": {
            "hour": 3,
            "day": 6,
            "updateTrack": "canary"
        },
        "userLabels": {
            "my-key-patch": "my-value-patch"
        }
    }
}
Using the operator¶
sql_instance_patch_task = CloudSqlInstancePatchOperator(
    project_id=PROJECT_ID,
    body=patch_body,
    instance=INSTANCE_NAME,
    task_id='sql_instance_patch_task'
)
Templating¶
template_fields = ('project_id', 'instance', 'gcp_conn_id', 'api_version')