#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# mypy ignore arg types (for templated fields)
# type: ignore[arg-type]
"""
Example Airflow DAG that demonstrates operators for the Google Vertex AI service in the Google
Cloud Platform.

This DAG relies on the following OS environment variables:

* GCP_VERTEX_AI_BUCKET - Google Cloud Storage bucket where the model will be saved
  after the training process finishes.
* CUSTOM_CONTAINER_URI - path to the container with the model.
* PYTHON_PACKAGE_GSC_URI - path to the archived test model package.
* LOCAL_TRAINING_SCRIPT_PATH - path to the local training script.
* DATASET_ID - ID of the dataset used in the training process.
* MODEL_ID - ID of the model used in the prediction process.
* MODEL_ARTIFACT_URI - path to a GCS directory containing saved model artifacts.
"""
import os
from datetime import datetime
from uuid import uuid4
from google.cloud import aiplatform
from google.protobuf import json_format
from google.protobuf.struct_pb2 import Value
from airflow import models
from airflow.providers.google.cloud.operators.vertex_ai.auto_ml import (
    CreateAutoMLForecastingTrainingJobOperator,
    CreateAutoMLImageTrainingJobOperator,
    CreateAutoMLTabularTrainingJobOperator,
    CreateAutoMLTextTrainingJobOperator,
    CreateAutoMLVideoTrainingJobOperator,
    DeleteAutoMLTrainingJobOperator,
    ListAutoMLTrainingJobOperator,
)
from airflow.providers.google.cloud.operators.vertex_ai.batch_prediction_job import (
    CreateBatchPredictionJobOperator,
    DeleteBatchPredictionJobOperator,
    ListBatchPredictionJobsOperator,
)
from airflow.providers.google.cloud.operators.vertex_ai.custom_job import (
    CreateCustomContainerTrainingJobOperator,
    CreateCustomPythonPackageTrainingJobOperator,
    CreateCustomTrainingJobOperator,
    DeleteCustomTrainingJobOperator,
    ListCustomTrainingJobOperator,
)
from airflow.providers.google.cloud.operators.vertex_ai.dataset import (
    CreateDatasetOperator,
    DeleteDatasetOperator,
    ExportDataOperator,
    GetDatasetOperator,
    ImportDataOperator,
    ListDatasetsOperator,
    UpdateDatasetOperator,
)
from airflow.providers.google.cloud.operators.vertex_ai.endpoint_service import (
    CreateEndpointOperator,
    DeleteEndpointOperator,
    DeployModelOperator,
    ListEndpointsOperator,
    UndeployModelOperator,
)
from airflow.providers.google.cloud.operators.vertex_ai.hyperparameter_tuning_job import (
    CreateHyperparameterTuningJobOperator,
    DeleteHyperparameterTuningJobOperator,
    GetHyperparameterTuningJobOperator,
    ListHyperparameterTuningJobOperator,
)
from airflow.providers.google.cloud.operators.vertex_ai.model_service import (
    DeleteModelOperator,
    ExportModelOperator,
    ListModelsOperator,
    UploadModelOperator,
)
PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "an-id")
REGION = os.environ.get("GCP_LOCATION", "us-central1")
BUCKET = os.environ.get("GCP_VERTEX_AI_BUCKET", "vertex-ai-system-tests")
STAGING_BUCKET = f"gs://{BUCKET}"
DISPLAY_NAME = str(uuid4())  # Create random display name
CONTAINER_URI = "gcr.io/cloud-aiplatform/training/tf-cpu.2-2:latest"
CUSTOM_CONTAINER_URI = os.environ.get("CUSTOM_CONTAINER_URI", "path_to_container_with_model")
MODEL_SERVING_CONTAINER_URI = "gcr.io/cloud-aiplatform/prediction/tf2-cpu.2-2:latest"
MACHINE_TYPE = "n1-standard-4"
ACCELERATOR_TYPE = "ACCELERATOR_TYPE_UNSPECIFIED"
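# REPLICA_COUNT and ACCELERATOR_COUNT are referenced by the training and tuning job
# operators below but were missing from this listing. The values here are assumed
# defaults for a single-replica, CPU-only run; adjust them for your environment.
REPLICA_COUNT = 1
ACCELERATOR_COUNT = 0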
TRAINING_FRACTION_SPLIT = 0.7
TEST_FRACTION_SPLIT = 0.15
VALIDATION_FRACTION_SPLIT = 0.15
PYTHON_PACKAGE_GCS_URI = os.environ.get("PYTHON_PACKAGE_GSC_URI", "path_to_test_model_in_arch")
PYTHON_MODULE_NAME = "aiplatform_custom_trainer_script.task"
LOCAL_TRAINING_SCRIPT_PATH = os.environ.get("LOCAL_TRAINING_SCRIPT_PATH", "path_to_training_script")
TRAINING_PIPELINE_ID = "test-training-pipeline-id"
CUSTOM_JOB_ID = "test-custom-job-id"
IMAGE_DATASET = {
    "display_name": str(uuid4()),
    "metadata_schema_uri": "gs://google-cloud-aiplatform/schema/dataset/metadata/image_1.0.0.yaml",
    "metadata": Value(string_value="test-image-dataset"),
}
TABULAR_DATASET = {
    "display_name": str(uuid4()),
    "metadata_schema_uri": "gs://google-cloud-aiplatform/schema/dataset/metadata/tabular_1.0.0.yaml",
    "metadata": Value(string_value="test-tabular-dataset"),
}
TEXT_DATASET = {
    "display_name": str(uuid4()),
    "metadata_schema_uri": "gs://google-cloud-aiplatform/schema/dataset/metadata/text_1.0.0.yaml",
    "metadata": Value(string_value="test-text-dataset"),
}
VIDEO_DATASET = {
    "display_name": str(uuid4()),
    "metadata_schema_uri": "gs://google-cloud-aiplatform/schema/dataset/metadata/video_1.0.0.yaml",
    "metadata": Value(string_value="test-video-dataset"),
}
TIME_SERIES_DATASET = {
    "display_name": str(uuid4()),
    "metadata_schema_uri": "gs://google-cloud-aiplatform/schema/dataset/metadata/time_series_1.0.0.yaml",
    "metadata": Value(string_value="test-time-series-dataset"),
}
DATASET_ID = os.environ.get("DATASET_ID", "test-dataset-id")
TEST_EXPORT_CONFIG = {"gcs_destination": {"output_uri_prefix": "gs://test-vertex-ai-bucket/exports"}}
TEST_IMPORT_CONFIG = [
    {
        "data_item_labels": {
            "test-labels-name": "test-labels-value",
        },
        "import_schema_uri": (
            "gs://google-cloud-aiplatform/schema/dataset/ioformat/image_bounding_box_io_format_1.0.0.yaml"
        ),
        "gcs_source": {
            "uris": ["gs://ucaip-test-us-central1/dataset/salads_oid_ml_use_public_unassigned.jsonl"]
        },
    },
]
DATASET_TO_UPDATE = {"display_name": "test-name"}
TEST_UPDATE_MASK = {"paths": ["displayName"]}
TEST_TIME_COLUMN = "date"
TEST_TIME_SERIES_IDENTIFIER_COLUMN = "store_name"
TEST_TARGET_COLUMN = "sale_dollars"
COLUMN_SPECS = {
    TEST_TIME_COLUMN: "timestamp",
    TEST_TARGET_COLUMN: "numeric",
    "city": "categorical",
    "zip_code": "categorical",
    "county": "categorical",
}
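# COLUMN_TRANSFORMATIONS is referenced by the AutoML tabular training job below but was
# missing from this listing. This is a minimal sketch assuming a pet-adoption sample
# schema (the job targets the "Adopted" column); the column names here are illustrative
# placeholders — replace them with the columns of your own tabular dataset.
COLUMN_TRANSFORMATIONS = [
    {"categorical": {"column_name": "Type"}},
    {"numeric": {"column_name": "Age"}},
    {"numeric": {"column_name": "Fee"}},
    {"numeric": {"column_name": "PhotoAmt"}},
]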
MODEL_ID = os.environ.get("MODEL_ID", "test-model-id")
MODEL_ARTIFACT_URI = os.environ.get("MODEL_ARTIFACT_URI", "path_to_folder_with_model_artifacts")
MODEL_NAME = f"projects/{PROJECT_ID}/locations/{REGION}/models/{MODEL_ID}"
JOB_DISPLAY_NAME = f"temp_create_batch_prediction_job_test_{uuid4()}"
BIGQUERY_SOURCE = f"bq://{PROJECT_ID}.test_iowa_liquor_sales_forecasting_us.2021_sales_predict"
GCS_DESTINATION_PREFIX = "gs://test-vertex-ai-bucket-us/output"
MODEL_PARAMETERS = json_format.ParseDict({}, Value())
ENDPOINT_CONF = {
    "display_name": f"endpoint_test_{uuid4()}",
}
DEPLOYED_MODEL = {
    # format: 'projects/{project}/locations/{location}/models/{model}'
    "model": f"projects/{PROJECT_ID}/locations/{REGION}/models/{MODEL_ID}",
    "display_name": f"temp_endpoint_test_{uuid4()}",
    "dedicated_resources": {
        "machine_spec": {
            "machine_type": "n1-standard-2",
            "accelerator_type": aiplatform.gapic.AcceleratorType.NVIDIA_TESLA_K80,
            "accelerator_count": 1,
        },
        "min_replica_count": 1,
        "max_replica_count": 1,
    },
}
MODEL_OUTPUT_CONFIG = {
    "artifact_destination": {
        "output_uri_prefix": STAGING_BUCKET,
    },
    "export_format_id": "custom-trained",
}
MODEL_OBJ = {
    "display_name": f"model-{str(uuid4())}",
    "artifact_uri": MODEL_ARTIFACT_URI,
    "container_spec": {
        "image_uri": MODEL_SERVING_CONTAINER_URI,
        "command": [],
        "args": [],
        "env": [],
        "ports": [],
        "predict_route": "",
        "health_route": "",
    },
}
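# The examples below define seven independent DAGs, one per Vertex AI service area:
# custom training jobs, datasets, AutoML training jobs, batch prediction jobs,
# endpoints, hyperparameter tuning jobs, and the model service.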
with models.DAG(
    "example_gcp_vertex_ai_custom_jobs",
    schedule_interval="@once",
    start_date=datetime(2021, 1, 1),
    catchup=False,
) as custom_jobs_dag:
    # [START how_to_cloud_vertex_ai_create_custom_container_training_job_operator]
    create_custom_container_training_job = CreateCustomContainerTrainingJobOperator(
        task_id="custom_container_task",
        staging_bucket=STAGING_BUCKET,
        display_name=f"train-housing-container-{DISPLAY_NAME}",
        container_uri=CUSTOM_CONTAINER_URI,
        model_serving_container_image_uri=MODEL_SERVING_CONTAINER_URI,
        # run params
        dataset_id=DATASET_ID,
        command=["python3", "task.py"],
        model_display_name=f"container-housing-model-{DISPLAY_NAME}",
        replica_count=REPLICA_COUNT,
        machine_type=MACHINE_TYPE,
        accelerator_type=ACCELERATOR_TYPE,
        accelerator_count=ACCELERATOR_COUNT,
        training_fraction_split=TRAINING_FRACTION_SPLIT,
        validation_fraction_split=VALIDATION_FRACTION_SPLIT,
        test_fraction_split=TEST_FRACTION_SPLIT,
        region=REGION,
        project_id=PROJECT_ID,
    )
    # [END how_to_cloud_vertex_ai_create_custom_container_training_job_operator]

    # [START how_to_cloud_vertex_ai_create_custom_python_package_training_job_operator]
    create_custom_python_package_training_job = CreateCustomPythonPackageTrainingJobOperator(
        task_id="python_package_task",
        staging_bucket=STAGING_BUCKET,
        display_name=f"train-housing-py-package-{DISPLAY_NAME}",
        python_package_gcs_uri=PYTHON_PACKAGE_GCS_URI,
        python_module_name=PYTHON_MODULE_NAME,
        container_uri=CONTAINER_URI,
        model_serving_container_image_uri=MODEL_SERVING_CONTAINER_URI,
        # run params
        dataset_id=DATASET_ID,
        model_display_name=f"py-package-housing-model-{DISPLAY_NAME}",
        replica_count=REPLICA_COUNT,
        machine_type=MACHINE_TYPE,
        accelerator_type=ACCELERATOR_TYPE,
        accelerator_count=ACCELERATOR_COUNT,
        training_fraction_split=TRAINING_FRACTION_SPLIT,
        validation_fraction_split=VALIDATION_FRACTION_SPLIT,
        test_fraction_split=TEST_FRACTION_SPLIT,
        region=REGION,
        project_id=PROJECT_ID,
    )
    # [END how_to_cloud_vertex_ai_create_custom_python_package_training_job_operator]

    # [START how_to_cloud_vertex_ai_create_custom_training_job_operator]
    create_custom_training_job = CreateCustomTrainingJobOperator(
        task_id="custom_task",
        staging_bucket=STAGING_BUCKET,
        display_name=f"train-housing-custom-{DISPLAY_NAME}",
        script_path=LOCAL_TRAINING_SCRIPT_PATH,
        container_uri=CONTAINER_URI,
        requirements=["gcsfs==0.7.1"],
        model_serving_container_image_uri=MODEL_SERVING_CONTAINER_URI,
        # run params
        dataset_id=DATASET_ID,
        replica_count=1,
        model_display_name=f"custom-housing-model-{DISPLAY_NAME}",
        sync=False,
        region=REGION,
        project_id=PROJECT_ID,
    )
    # [END how_to_cloud_vertex_ai_create_custom_training_job_operator]

    # [START how_to_cloud_vertex_ai_delete_custom_training_job_operator]
    delete_custom_training_job = DeleteCustomTrainingJobOperator(
        task_id="delete_custom_training_job",
        training_pipeline_id=TRAINING_PIPELINE_ID,
        custom_job_id=CUSTOM_JOB_ID,
        region=REGION,
        project_id=PROJECT_ID,
    )
    # [END how_to_cloud_vertex_ai_delete_custom_training_job_operator]

    # [START how_to_cloud_vertex_ai_list_custom_training_job_operator]
    list_custom_training_job = ListCustomTrainingJobOperator(
        task_id="list_custom_training_job",
        region=REGION,
        project_id=PROJECT_ID,
    )
    # [END how_to_cloud_vertex_ai_list_custom_training_job_operator]
with models.DAG(
    "example_gcp_vertex_ai_dataset",
    schedule_interval="@once",
    start_date=datetime(2021, 1, 1),
    catchup=False,
) as dataset_dag:
    # [START how_to_cloud_vertex_ai_create_dataset_operator]
    create_image_dataset_job = CreateDatasetOperator(
        task_id="image_dataset",
        dataset=IMAGE_DATASET,
        region=REGION,
        project_id=PROJECT_ID,
    )
    create_tabular_dataset_job = CreateDatasetOperator(
        task_id="tabular_dataset",
        dataset=TABULAR_DATASET,
        region=REGION,
        project_id=PROJECT_ID,
    )
    create_text_dataset_job = CreateDatasetOperator(
        task_id="text_dataset",
        dataset=TEXT_DATASET,
        region=REGION,
        project_id=PROJECT_ID,
    )
    create_video_dataset_job = CreateDatasetOperator(
        task_id="video_dataset",
        dataset=VIDEO_DATASET,
        region=REGION,
        project_id=PROJECT_ID,
    )
    create_time_series_dataset_job = CreateDatasetOperator(
        task_id="time_series_dataset",
        dataset=TIME_SERIES_DATASET,
        region=REGION,
        project_id=PROJECT_ID,
    )
    # [END how_to_cloud_vertex_ai_create_dataset_operator]

    # [START how_to_cloud_vertex_ai_delete_dataset_operator]
    delete_dataset_job = DeleteDatasetOperator(
        task_id="delete_dataset",
        dataset_id=create_text_dataset_job.output["dataset_id"],
        region=REGION,
        project_id=PROJECT_ID,
    )
    # [END how_to_cloud_vertex_ai_delete_dataset_operator]

    # [START how_to_cloud_vertex_ai_get_dataset_operator]
    get_dataset = GetDatasetOperator(
        task_id="get_dataset",
        project_id=PROJECT_ID,
        region=REGION,
        dataset_id=create_tabular_dataset_job.output["dataset_id"],
    )
    # [END how_to_cloud_vertex_ai_get_dataset_operator]

    # [START how_to_cloud_vertex_ai_export_data_operator]
    export_data_job = ExportDataOperator(
        task_id="export_data",
        dataset_id=create_image_dataset_job.output["dataset_id"],
        region=REGION,
        project_id=PROJECT_ID,
        export_config=TEST_EXPORT_CONFIG,
    )
    # [END how_to_cloud_vertex_ai_export_data_operator]

    # [START how_to_cloud_vertex_ai_import_data_operator]
    import_data_job = ImportDataOperator(
        task_id="import_data",
        dataset_id=create_image_dataset_job.output["dataset_id"],
        region=REGION,
        project_id=PROJECT_ID,
        import_configs=TEST_IMPORT_CONFIG,
    )
    # [END how_to_cloud_vertex_ai_import_data_operator]

    # [START how_to_cloud_vertex_ai_list_dataset_operator]
    list_dataset_job = ListDatasetsOperator(
        task_id="list_dataset",
        region=REGION,
        project_id=PROJECT_ID,
    )
    # [END how_to_cloud_vertex_ai_list_dataset_operator]

    # [START how_to_cloud_vertex_ai_update_dataset_operator]
    update_dataset_job = UpdateDatasetOperator(
        task_id="update_dataset",
        project_id=PROJECT_ID,
        region=REGION,
        dataset_id=create_video_dataset_job.output["dataset_id"],
        dataset=DATASET_TO_UPDATE,
        update_mask=TEST_UPDATE_MASK,
    )
    # [END how_to_cloud_vertex_ai_update_dataset_operator]

    create_time_series_dataset_job
    create_text_dataset_job >> delete_dataset_job
    create_tabular_dataset_job >> get_dataset
    create_image_dataset_job >> import_data_job >> export_data_job
    create_video_dataset_job >> update_dataset_job
    list_dataset_job
with models.DAG(
    "example_gcp_vertex_ai_auto_ml",
    schedule_interval="@once",
    start_date=datetime(2021, 1, 1),
    catchup=False,
) as auto_ml_dag:
    # [START how_to_cloud_vertex_ai_create_auto_ml_forecasting_training_job_operator]
    create_auto_ml_forecasting_training_job = CreateAutoMLForecastingTrainingJobOperator(
        task_id="auto_ml_forecasting_task",
        display_name=f"auto-ml-forecasting-{DISPLAY_NAME}",
        optimization_objective="minimize-rmse",
        column_specs=COLUMN_SPECS,
        # run params
        dataset_id=DATASET_ID,
        target_column=TEST_TARGET_COLUMN,
        time_column=TEST_TIME_COLUMN,
        time_series_identifier_column=TEST_TIME_SERIES_IDENTIFIER_COLUMN,
        available_at_forecast_columns=[TEST_TIME_COLUMN],
        unavailable_at_forecast_columns=[TEST_TARGET_COLUMN],
        time_series_attribute_columns=["city", "zip_code", "county"],
        forecast_horizon=30,
        context_window=30,
        data_granularity_unit="day",
        data_granularity_count=1,
        weight_column=None,
        budget_milli_node_hours=1000,
        model_display_name=f"auto-ml-forecasting-model-{DISPLAY_NAME}",
        predefined_split_column_name=None,
        region=REGION,
        project_id=PROJECT_ID,
    )
    # [END how_to_cloud_vertex_ai_create_auto_ml_forecasting_training_job_operator]

    # [START how_to_cloud_vertex_ai_create_auto_ml_image_training_job_operator]
    create_auto_ml_image_training_job = CreateAutoMLImageTrainingJobOperator(
        task_id="auto_ml_image_task",
        display_name=f"auto-ml-image-{DISPLAY_NAME}",
        dataset_id=DATASET_ID,
        prediction_type="classification",
        multi_label=False,
        model_type="CLOUD",
        training_fraction_split=0.6,
        validation_fraction_split=0.2,
        test_fraction_split=0.2,
        budget_milli_node_hours=8000,
        model_display_name=f"auto-ml-image-model-{DISPLAY_NAME}",
        disable_early_stopping=False,
        region=REGION,
        project_id=PROJECT_ID,
    )
    # [END how_to_cloud_vertex_ai_create_auto_ml_image_training_job_operator]

    # [START how_to_cloud_vertex_ai_create_auto_ml_tabular_training_job_operator]
    create_auto_ml_tabular_training_job = CreateAutoMLTabularTrainingJobOperator(
        task_id="auto_ml_tabular_task",
        display_name=f"auto-ml-tabular-{DISPLAY_NAME}",
        optimization_prediction_type="classification",
        column_transformations=COLUMN_TRANSFORMATIONS,
        dataset_id=DATASET_ID,
        target_column="Adopted",
        training_fraction_split=0.8,
        validation_fraction_split=0.1,
        test_fraction_split=0.1,
        model_display_name="adopted-prediction-model",
        disable_early_stopping=False,
        region=REGION,
        project_id=PROJECT_ID,
    )
    # [END how_to_cloud_vertex_ai_create_auto_ml_tabular_training_job_operator]

    # [START how_to_cloud_vertex_ai_create_auto_ml_text_training_job_operator]
    create_auto_ml_text_training_job = CreateAutoMLTextTrainingJobOperator(
        task_id="auto_ml_text_task",
        display_name=f"auto-ml-text-{DISPLAY_NAME}",
        prediction_type="classification",
        multi_label=False,
        dataset_id=DATASET_ID,
        model_display_name=f"auto-ml-text-model-{DISPLAY_NAME}",
        training_fraction_split=0.7,
        validation_fraction_split=0.2,
        test_fraction_split=0.1,
        sync=True,
        region=REGION,
        project_id=PROJECT_ID,
    )
    # [END how_to_cloud_vertex_ai_create_auto_ml_text_training_job_operator]

    # [START how_to_cloud_vertex_ai_create_auto_ml_video_training_job_operator]
    create_auto_ml_video_training_job = CreateAutoMLVideoTrainingJobOperator(
        task_id="auto_ml_video_task",
        display_name=f"auto-ml-video-{DISPLAY_NAME}",
        prediction_type="classification",
        model_type="CLOUD",
        dataset_id=DATASET_ID,
        model_display_name=f"auto-ml-video-model-{DISPLAY_NAME}",
        region=REGION,
        project_id=PROJECT_ID,
    )
    # [END how_to_cloud_vertex_ai_create_auto_ml_video_training_job_operator]

    # [START how_to_cloud_vertex_ai_delete_auto_ml_training_job_operator]
    delete_auto_ml_training_job = DeleteAutoMLTrainingJobOperator(
        task_id="delete_auto_ml_training_job",
        training_pipeline_id=TRAINING_PIPELINE_ID,
        region=REGION,
        project_id=PROJECT_ID,
    )
    # [END how_to_cloud_vertex_ai_delete_auto_ml_training_job_operator]

    # [START how_to_cloud_vertex_ai_list_auto_ml_training_job_operator]
    list_auto_ml_training_job = ListAutoMLTrainingJobOperator(
        task_id="list_auto_ml_training_job",
        region=REGION,
        project_id=PROJECT_ID,
    )
    # [END how_to_cloud_vertex_ai_list_auto_ml_training_job_operator]
with models.DAG(
    "example_gcp_vertex_ai_batch_prediction_job",
    schedule_interval="@once",
    start_date=datetime(2021, 1, 1),
    catchup=False,
) as batch_prediction_job_dag:
    # [START how_to_cloud_vertex_ai_create_batch_prediction_job_operator]
    create_batch_prediction_job = CreateBatchPredictionJobOperator(
        task_id="create_batch_prediction_job",
        job_display_name=JOB_DISPLAY_NAME,
        model_name=MODEL_NAME,
        predictions_format="csv",
        bigquery_source=BIGQUERY_SOURCE,
        gcs_destination_prefix=GCS_DESTINATION_PREFIX,
        model_parameters=MODEL_PARAMETERS,
        region=REGION,
        project_id=PROJECT_ID,
    )
    # [END how_to_cloud_vertex_ai_create_batch_prediction_job_operator]

    # [START how_to_cloud_vertex_ai_list_batch_prediction_job_operator]
    list_batch_prediction_job = ListBatchPredictionJobsOperator(
        task_id="list_batch_prediction_jobs",
        region=REGION,
        project_id=PROJECT_ID,
    )
    # [END how_to_cloud_vertex_ai_list_batch_prediction_job_operator]

    # [START how_to_cloud_vertex_ai_delete_batch_prediction_job_operator]
    delete_batch_prediction_job = DeleteBatchPredictionJobOperator(
        task_id="delete_batch_prediction_job",
        batch_prediction_job_id=create_batch_prediction_job.output["batch_prediction_job_id"],
        region=REGION,
        project_id=PROJECT_ID,
    )
    # [END how_to_cloud_vertex_ai_delete_batch_prediction_job_operator]

    create_batch_prediction_job >> delete_batch_prediction_job
    list_batch_prediction_job
with models.DAG(
    "example_gcp_vertex_ai_endpoint",
    schedule_interval="@once",
    start_date=datetime(2021, 1, 1),
    catchup=False,
) as endpoint_dag:
    # [START how_to_cloud_vertex_ai_create_endpoint_operator]
    create_endpoint = CreateEndpointOperator(
        task_id="create_endpoint",
        endpoint=ENDPOINT_CONF,
        region=REGION,
        project_id=PROJECT_ID,
    )
    # [END how_to_cloud_vertex_ai_create_endpoint_operator]

    # [START how_to_cloud_vertex_ai_delete_endpoint_operator]
    delete_endpoint = DeleteEndpointOperator(
        task_id="delete_endpoint",
        endpoint_id=create_endpoint.output["endpoint_id"],
        region=REGION,
        project_id=PROJECT_ID,
    )
    # [END how_to_cloud_vertex_ai_delete_endpoint_operator]

    # [START how_to_cloud_vertex_ai_list_endpoints_operator]
    list_endpoints = ListEndpointsOperator(
        task_id="list_endpoints",
        region=REGION,
        project_id=PROJECT_ID,
    )
    # [END how_to_cloud_vertex_ai_list_endpoints_operator]

    # [START how_to_cloud_vertex_ai_deploy_model_operator]
    deploy_model = DeployModelOperator(
        task_id="deploy_model",
        endpoint_id=create_endpoint.output["endpoint_id"],
        deployed_model=DEPLOYED_MODEL,
        traffic_split={"0": 100},
        region=REGION,
        project_id=PROJECT_ID,
    )
    # [END how_to_cloud_vertex_ai_deploy_model_operator]

    # [START how_to_cloud_vertex_ai_undeploy_model_operator]
    undeploy_model = UndeployModelOperator(
        task_id="undeploy_model",
        endpoint_id=create_endpoint.output["endpoint_id"],
        deployed_model_id=deploy_model.output["deployed_model_id"],
        region=REGION,
        project_id=PROJECT_ID,
    )
    # [END how_to_cloud_vertex_ai_undeploy_model_operator]

    create_endpoint >> deploy_model >> undeploy_model >> delete_endpoint
    list_endpoints
with models.DAG(
    "example_gcp_vertex_ai_hyperparameter_tuning_job",
    schedule_interval="@once",
    start_date=datetime(2021, 1, 1),
    catchup=False,
) as hyperparameter_tuning_job_dag:
    # [START how_to_cloud_vertex_ai_create_hyperparameter_tuning_job_operator]
    create_hyperparameter_tuning_job = CreateHyperparameterTuningJobOperator(
        task_id="create_hyperparameter_tuning_job",
        staging_bucket=STAGING_BUCKET,
        display_name=f"horses-humans-hyptertune-{DISPLAY_NAME}",
        worker_pool_specs=[
            {
                "machine_spec": {
                    "machine_type": MACHINE_TYPE,
                    "accelerator_type": ACCELERATOR_TYPE,
                    "accelerator_count": ACCELERATOR_COUNT,
                },
                "replica_count": REPLICA_COUNT,
                "container_spec": {
                    "image_uri": f"gcr.io/{PROJECT_ID}/horse-human:hypertune",
                },
            }
        ],
        sync=False,
        region=REGION,
        project_id=PROJECT_ID,
        parameter_spec={
            "learning_rate": aiplatform.hyperparameter_tuning.DoubleParameterSpec(
                min=0.01, max=1, scale="log"
            ),
            "momentum": aiplatform.hyperparameter_tuning.DoubleParameterSpec(min=0, max=1, scale="linear"),
            "num_neurons": aiplatform.hyperparameter_tuning.DiscreteParameterSpec(
                values=[64, 128, 512], scale="linear"
            ),
        },
        metric_spec={
            "accuracy": "maximize",
        },
        max_trial_count=15,
        parallel_trial_count=3,
    )
    # [END how_to_cloud_vertex_ai_create_hyperparameter_tuning_job_operator]

    # [START how_to_cloud_vertex_ai_get_hyperparameter_tuning_job_operator]
    get_hyperparameter_tuning_job = GetHyperparameterTuningJobOperator(
        task_id="get_hyperparameter_tuning_job",
        project_id=PROJECT_ID,
        region=REGION,
        hyperparameter_tuning_job_id=create_hyperparameter_tuning_job.output["hyperparameter_tuning_job_id"],
    )
    # [END how_to_cloud_vertex_ai_get_hyperparameter_tuning_job_operator]

    # [START how_to_cloud_vertex_ai_delete_hyperparameter_tuning_job_operator]
    delete_hyperparameter_tuning_job = DeleteHyperparameterTuningJobOperator(
        task_id="delete_hyperparameter_tuning_job",
        project_id=PROJECT_ID,
        region=REGION,
        hyperparameter_tuning_job_id=create_hyperparameter_tuning_job.output["hyperparameter_tuning_job_id"],
    )
    # [END how_to_cloud_vertex_ai_delete_hyperparameter_tuning_job_operator]

    # [START how_to_cloud_vertex_ai_list_hyperparameter_tuning_job_operator]
    list_hyperparameter_tuning_job = ListHyperparameterTuningJobOperator(
        task_id="list_hyperparameter_tuning_job",
        region=REGION,
        project_id=PROJECT_ID,
    )
    # [END how_to_cloud_vertex_ai_list_hyperparameter_tuning_job_operator]

    create_hyperparameter_tuning_job >> get_hyperparameter_tuning_job >> delete_hyperparameter_tuning_job
    list_hyperparameter_tuning_job
with models.DAG(
    "example_gcp_vertex_ai_model_service",
    schedule_interval="@once",
    start_date=datetime(2021, 1, 1),
    catchup=False,
) as model_service_dag:
    # [START how_to_cloud_vertex_ai_upload_model_operator]
    upload_model = UploadModelOperator(
        task_id="upload_model",
        region=REGION,
        project_id=PROJECT_ID,
        model=MODEL_OBJ,
    )
    # [END how_to_cloud_vertex_ai_upload_model_operator]

    # [START how_to_cloud_vertex_ai_export_model_operator]
    export_model = ExportModelOperator(
        task_id="export_model",
        project_id=PROJECT_ID,
        region=REGION,
        model_id=upload_model.output["model_id"],
        output_config=MODEL_OUTPUT_CONFIG,
    )
    # [END how_to_cloud_vertex_ai_export_model_operator]

    # [START how_to_cloud_vertex_ai_delete_model_operator]
    delete_model = DeleteModelOperator(
        task_id="delete_model",
        project_id=PROJECT_ID,
        region=REGION,
        model_id=upload_model.output["model_id"],
    )
    # [END how_to_cloud_vertex_ai_delete_model_operator]

    # [START how_to_cloud_vertex_ai_list_models_operator]
    list_models = ListModelsOperator(
        task_id="list_models",
        region=REGION,
        project_id=PROJECT_ID,
    )
    # [END how_to_cloud_vertex_ai_list_models_operator]

    upload_model >> export_model >> delete_model
    list_models