Amazon Bedrock

Amazon Bedrock is a fully managed service that offers a choice of high-performing foundation models (FMs) from leading AI companies like AI21 Labs, Anthropic, Cohere, Meta, Mistral AI, Stability AI, and Amazon via a single API, along with a broad set of capabilities you need to build generative AI applications with security, privacy, and responsible AI.

Prerequisite Tasks

To use these operators, you must do a few things:

  • Create necessary resources using AWS Console or AWS CLI.

  • Install API libraries via pip:

    pip install 'apache-airflow[amazon]'

  • Set up an Amazon Web Services Connection.

Generic Parameters

aws_conn_id

Reference to Amazon Web Services Connection ID. If this parameter is set to None then the default boto3 behaviour is used without a connection lookup. Otherwise use the credentials stored in the Connection. Default: aws_default

region_name

AWS Region Name. If this parameter is set to None or omitted then region_name from AWS Connection Extra Parameter will be used. Otherwise use the specified value instead of the connection value. Default: None

verify

Whether or not to verify SSL certificates.

  • False - Do not validate SSL certificates.

  • path/to/cert/bundle.pem - A filename of the CA cert bundle to use. You can specify this argument if you want to use a different CA cert bundle than the one used by botocore.

If this parameter is set to None or is omitted then verify from AWS Connection Extra Parameter will be used. Otherwise use the specified value instead of the connection value. Default: None

botocore_config

The provided dictionary is used to construct a botocore.config.Config. This configuration can be used to tune retry behaviour (for example, to avoid throttling exceptions), timeouts, and other low-level botocore options.

For example (for more detail about the available parameters, see botocore.config.Config):
{
    "signature_version": "unsigned",
    "s3": {
        "us_east_1_regional_endpoint": True,
    },
    "retries": {
      "mode": "standard",
      "max_attempts": 10,
    },
    "connect_timeout": 300,
    "read_timeout": 300,
    "tcp_keepalive": True,
}

If this parameter is set to None or omitted then config_kwargs from AWS Connection Extra Parameter will be used. Otherwise use the specified value instead of the connection value. Default: None

Note

Specifying an empty dictionary, {}, will overwrite the connection configuration for botocore.config.Config.
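
These generic parameters are accepted by the Bedrock operators and sensors below. As a minimal sketch of how they combine (BedrockInvokeModelOperator and the model/prompt names are taken from the examples below; the connection ID, region, and retry values are illustrative, not recommendations):

invoke_with_custom_config = BedrockInvokeModelOperator(
    task_id="invoke_with_custom_config",
    model_id=TITAN_SHORT_MODEL_ID,
    input_data={"inputText": PROMPT},
    # Generic AWS parameters: a named connection, an explicit region,
    # and botocore retry tuning.
    aws_conn_id="aws_default",
    region_name="us-east-1",
    botocore_config={"retries": {"mode": "standard", "max_attempts": 10}},
)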

Operators

Invoke an existing Amazon Bedrock Model

To invoke an existing Amazon Bedrock model, you can use BedrockInvokeModelOperator.

Note that every model family has different input and output formats. Some examples are included below, but for details on the different formats, see Inference parameters for foundation models.

For example, to invoke a Meta Llama model you would use:

tests/system/amazon/aws/example_bedrock.py

invoke_llama_model = BedrockInvokeModelOperator(
    task_id="invoke_llama",
    model_id=LLAMA_SHORT_MODEL_ID,
    input_data={"prompt": PROMPT},
)

To invoke an Amazon Titan model you would use:

tests/system/amazon/aws/example_bedrock.py

invoke_titan_model = BedrockInvokeModelOperator(
    task_id="invoke_titan",
    model_id=TITAN_SHORT_MODEL_ID,
    input_data={"inputText": PROMPT},
)

To invoke a Claude V2 model using the Completions API you would use:

tests/system/amazon/aws/example_bedrock_retrieve_and_generate.py

invoke_claude_completions = BedrockInvokeModelOperator(
    task_id="invoke_claude_completions",
    model_id=CLAUDE_MODEL_ID,
    input_data={"max_tokens_to_sample": 4000, "prompt": f"\n\nHuman: {PROMPT}\n\nAssistant:"},
)
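
The operator pushes the model's response body to XCom, so a downstream task can consume the generated text. A minimal sketch (the task name is hypothetical; the response schema varies by model family, and for the Claude Completions API the generated text is returned under the "completion" key):

from airflow.decorators import task

@task
def print_completion(response: dict):
    # "completion" holds the generated text in the Claude Completions API;
    # other model families use different keys.
    print(response["completion"])

print_completion(invoke_claude_completions.output)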

Customize an existing Amazon Bedrock Model

To create a fine-tuning job to customize a base model, you can use BedrockCustomizeModelOperator.

Model-customization jobs are asynchronous and the completion time depends on the base model and the training/validation data size. To monitor the state of the job, you can use the “model_customization_job_complete” Waiter, the BedrockCustomizeModelCompletedSensor Sensor, or the BedrockCustomizeModelCompletedTrigger Trigger.

tests/system/amazon/aws/example_bedrock.py

customize_model = BedrockCustomizeModelOperator(
    task_id="customize_model",
    job_name=custom_model_job_name,
    custom_model_name=custom_model_name,
    role_arn=test_context[ROLE_ARN_KEY],
    base_model_id=f"{model_arn_prefix}{TITAN_SHORT_MODEL_ID}",
    hyperparameters=HYPERPARAMETERS,
    training_data_uri=training_data_uri,
    output_data_uri=f"s3://{bucket_name}/myOutputData",
)
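
The "model_customization_job_complete" waiter mentioned above can also be called directly through the service hook if you would rather block inside your own task. A minimal sketch, assuming default connection settings and the job name created above:

from airflow.providers.amazon.aws.hooks.bedrock import BedrockHook

# Poll until the customization job reaches a terminal state.
BedrockHook().get_waiter("model_customization_job_complete").wait(
    jobIdentifier=custom_model_job_name,
)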

Provision Throughput for an existing Amazon Bedrock Model

To create a provisioned throughput with dedicated capacity for a foundation model or a fine-tuned model, you can use BedrockCreateProvisionedModelThroughputOperator.

Provisioning model throughput is asynchronous. To monitor the state of the job, you can use the “provisioned_model_throughput_complete” Waiter, the BedrockProvisionModelThroughputCompletedSensor Sensor, or the BedrockProvisionModelThroughputCompletedTrigger Trigger.

tests/system/amazon/aws/example_bedrock.py

provision_throughput = BedrockCreateProvisionedModelThroughputOperator(
    task_id="provision_throughput",
    model_units=1,
    provisioned_model_name=provisioned_model_name,
    model_id=f"{model_arn_prefix}{TITAN_MODEL_ID}",
)
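
As an alternative to a separate sensor, the operator is expected to follow the provider's standard wait_for_completion / deferrable pattern, so the create task itself can wait for the throughput to become active. A sketch under that assumption:

provision_throughput = BedrockCreateProvisionedModelThroughputOperator(
    task_id="provision_throughput",
    model_units=1,
    provisioned_model_name=provisioned_model_name,
    model_id=f"{model_arn_prefix}{TITAN_MODEL_ID}",
    # Assumption: defer the wait to the triggerer instead of occupying a worker.
    wait_for_completion=True,
    deferrable=True,
)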

Create an Amazon Bedrock Knowledge Base

To create an Amazon Bedrock Knowledge Base, you can use BedrockCreateKnowledgeBaseOperator.

For more information on which models support embedding data into a vector store, see https://docs.aws.amazon.com/bedrock/latest/userguide/knowledge-base-supported.html

tests/system/amazon/aws/example_bedrock_retrieve_and_generate.py

create_knowledge_base = BedrockCreateKnowledgeBaseOperator(
    task_id="create_knowledge_base",
    name=knowledge_base_name,
    embedding_model_arn=f"arn:aws:bedrock:{region_name}::foundation-model/{TITAN_MODEL_ID}",
    role_arn=test_context[ROLE_ARN_KEY],
    storage_config={
        "type": "OPENSEARCH_SERVERLESS",
        "opensearchServerlessConfiguration": {
            "collectionArn": get_collection_arn(collection),
            "vectorIndexName": index_name,
            "fieldMapping": {
                "vectorField": "vector",
                "textField": "text",
                "metadataField": "text-metadata",
            },
        },
    },
)

Delete an Amazon Bedrock Knowledge Base

Deleting a Knowledge Base is a simple boto API call and can be done in a TaskFlow task like the example below.

tests/system/amazon/aws/example_bedrock_retrieve_and_generate.py

@task(trigger_rule=TriggerRule.ALL_DONE)
def delete_knowledge_base(knowledge_base_id: str):
    """
    Delete the Amazon Bedrock knowledge base created earlier.

    .. seealso::
        For more information on how to use this operator, take a look at the guide:
        :ref:`howto/operator:BedrockDeleteKnowledgeBase`

    :param knowledge_base_id: The unique identifier of the knowledge base to delete.
    """
    log.info("Deleting Knowledge Base %s.", knowledge_base_id)
    bedrock_agent_client.delete_knowledge_base(knowledgeBaseId=knowledge_base_id)
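
The bedrock_agent_client and log objects used above (and in delete_data_source below) are defined at module level in the system test. A minimal sketch of that setup, assuming the provider's BedrockAgentHook:

import logging

from airflow.providers.amazon.aws.hooks.bedrock import BedrockAgentHook

log = logging.getLogger(__name__)

# boto3 client for the Agents for Amazon Bedrock control-plane API.
bedrock_agent_client = BedrockAgentHook().conn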


Create an Amazon Bedrock Data Source

To create an Amazon Bedrock Data Source, you can use BedrockCreateDataSourceOperator.

tests/system/amazon/aws/example_bedrock_retrieve_and_generate.py

create_data_source = BedrockCreateDataSourceOperator(
    task_id="create_data_source",
    knowledge_base_id=create_knowledge_base.output,
    name=data_source_name,
    bucket_name=bucket_name,
)

Delete an Amazon Bedrock Data Source

Deleting a Data Source is a simple boto API call and can be done in a TaskFlow task like the example below.

tests/system/amazon/aws/example_bedrock_retrieve_and_generate.py

@task(trigger_rule=TriggerRule.ALL_DONE)
def delete_data_source(knowledge_base_id: str, data_source_id: str):
    """
    Delete the Amazon Bedrock data source created earlier.

    .. seealso::
        For more information on how to use this operator, take a look at the guide:
        :ref:`howto/operator:BedrockDeleteDataSource`

    :param knowledge_base_id: The unique identifier of the knowledge base which the data source is attached to.
    :param data_source_id: The unique identifier of the data source to delete.
    """
    log.info("Deleting data source %s from Knowledge Base %s.", data_source_id, knowledge_base_id)
    bedrock_agent_client.delete_data_source(dataSourceId=data_source_id, knowledgeBaseId=knowledge_base_id)


Ingest data into an Amazon Bedrock Data Source

To add data from an Amazon S3 bucket into an Amazon Bedrock Data Source, you can use BedrockIngestDataOperator.

tests/system/amazon/aws/example_bedrock_retrieve_and_generate.py

ingest_data = BedrockIngestDataOperator(
    task_id="ingest_data",
    knowledge_base_id=create_knowledge_base.output,
    data_source_id=create_data_source.output,
)

Amazon Bedrock Retrieve

To query a knowledge base, you can use BedrockRetrieveOperator.

The response will only contain citations to sources that are relevant to the query. If you would like to pass the results through an LLM in order to generate a text response, see BedrockRaGOperator.

For more information on which models support retrieving information from a knowledge base, see https://docs.aws.amazon.com/bedrock/latest/userguide/knowledge-base-supported.html

tests/system/amazon/aws/example_bedrock_retrieve_and_generate.py

retrieve = BedrockRetrieveOperator(
    task_id="retrieve",
    knowledge_base_id=create_knowledge_base.output,
    retrieval_query="Who was the CEO of Amazon in 1997?",
)

Amazon Bedrock Retrieve and Generate (RaG)

To query a knowledge base or external sources and generate a text response based on the retrieved results, you can use BedrockRaGOperator.

The response will contain citations to sources that are relevant to the query as well as a generated text reply. For more information on which models support retrieving information from a knowledge base, see https://docs.aws.amazon.com/bedrock/latest/userguide/knowledge-base-supported.html

NOTE: Support for “external sources” was added in botocore 1.34.90.

Example using an Amazon Bedrock Knowledge Base:

tests/system/amazon/aws/example_bedrock_retrieve_and_generate.py

knowledge_base_rag = BedrockRaGOperator(
    task_id="knowledge_base_rag",
    input="Who was the CEO of Amazon on 2022?",
    source_type="KNOWLEDGE_BASE",
    model_arn=f"arn:aws:bedrock:{region_name}::foundation-model/{CLAUDE_MODEL_ID}",
    knowledge_base_id=create_knowledge_base.output,
)

Example using a PDF file in an Amazon S3 Bucket:

tests/system/amazon/aws/example_bedrock_retrieve_and_generate.py

external_sources_rag = BedrockRaGOperator(
    task_id="external_sources_rag",
    input="Who was the CEO of Amazon in 2022?",
    source_type="EXTERNAL_SOURCES",
    model_arn=f"arn:aws:bedrock:{region_name}::foundation-model/anthropic.claude-3-sonnet-20240229-v1:0",
    sources=[
        {
            "sourceType": "S3",
            "s3Location": {"uri": f"s3://{bucket_name}/AMZN-2022-Shareholder-Letter.pdf"},
        }
    ],
)

Sensors

Wait for an Amazon Bedrock customize model job

To wait on the state of an Amazon Bedrock customize model job until it reaches a terminal state, you can use BedrockCustomizeModelCompletedSensor.

tests/system/amazon/aws/example_bedrock.py

await_custom_model_job = BedrockCustomizeModelCompletedSensor(
    task_id="await_custom_model_job",
    job_name=custom_model_job_name,
)
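
Like the other sensors in this provider, this sensor is expected to support deferrable mode, trading a worker slot for a triggerer-side wait. A sketch under that assumption:

await_custom_model_job = BedrockCustomizeModelCompletedSensor(
    task_id="await_custom_model_job",
    job_name=custom_model_job_name,
    # Assumption: wait in the triggerer via BedrockCustomizeModelCompletedTrigger.
    deferrable=True,
)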

Wait for an Amazon Bedrock provision model throughput job

To wait on the state of an Amazon Bedrock provision model throughput job until it reaches a terminal state, you can use BedrockProvisionModelThroughputCompletedSensor.

tests/system/amazon/aws/example_bedrock.py

await_provision_throughput = BedrockProvisionModelThroughputCompletedSensor(
    task_id="await_provision_throughput",
    model_id=provision_throughput.output,
)

Wait for an Amazon Bedrock Knowledge Base

To wait on the state of an Amazon Bedrock Knowledge Base until it reaches a terminal state, you can use BedrockKnowledgeBaseActiveSensor.

tests/system/amazon/aws/example_bedrock_retrieve_and_generate.py

await_knowledge_base = BedrockKnowledgeBaseActiveSensor(
    task_id="await_knowledge_base", knowledge_base_id=create_knowledge_base.output
)

Wait for an Amazon Bedrock ingestion job to finish

To wait on the state of an Amazon Bedrock data ingestion job until it reaches a terminal state, you can use BedrockIngestionJobSensor.

tests/system/amazon/aws/example_bedrock_retrieve_and_generate.py

await_ingest = BedrockIngestionJobSensor(
    task_id="await_ingest",
    knowledge_base_id=create_knowledge_base.output,
    data_source_id=create_data_source.output,
    ingestion_job_id=ingest_data.output,
)
