Operators

Ingest data into a Pinecone index

Use the PineconeIngestOperator to interact with Pinecone APIs to ingest vectors.

Using the Operator

The PineconeIngestOperator requires the vectors to ingest into Pinecone as input. Use the conn_id parameter to specify the Pinecone connection used to connect to your account. Each vector can also carry metadata, such as a reference to the original text it was computed from, which is ingested into the database together with the vector.

An example using the operator in this way:

tests/system/pinecone/example_dag_pinecone.py[source]

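from airflow.providers.pinecone.operators.pinecone import PineconeIngestOperator

# index_name and namespace are defined elsewhere in the example DAG.
PineconeIngestOperator(
    task_id="pinecone_vector_ingest",
    index_name=index_name,
    # Each entry is (id, values), optionally followed by a metadata dict.
    input_vectors=[
        ("id1", [1.0, 2.0, 3.0], {"key": "value"}),
        ("id2", [1.0, 2.0, 3.0]),
    ],
    namespace=namespace,
    batch_size=1,
)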
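The entries passed to input_vectors above are (id, values) tuples with an optional metadata dict. As a minimal sketch, assuming the embeddings have already been computed elsewhere, the hypothetical helper build_input_vectors below attaches the source text to each vector as metadata, and the hypothetical helper chunk illustrates how a batch_size could split the list into groups. Neither helper is part of the provider, and the operator's own batching may work differently.

```python
def build_input_vectors(texts, embeddings, id_prefix="doc"):
    """Pair each embedding with an ID and its source text as metadata.

    Hypothetical helper for illustration; not part of the Pinecone provider.
    """
    if len(texts) != len(embeddings):
        raise ValueError("texts and embeddings must have the same length")
    return [
        (f"{id_prefix}{i}", [float(x) for x in emb], {"text": text})
        for i, (text, emb) in enumerate(zip(texts, embeddings), start=1)
    ]


def chunk(vectors, batch_size):
    """Yield consecutive slices of at most batch_size vectors."""
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    for start in range(0, len(vectors), batch_size):
        yield vectors[start:start + batch_size]


vectors = build_input_vectors(
    ["first passage", "second passage", "third passage"],
    [[1.0, 2.0, 3.0], [4.0, 5.0, 6.0], [7.0, 8.0, 9.0]],
)
batches = list(chunk(vectors, batch_size=2))
```

The resulting vectors list has the same shape as the input_vectors argument in the example, so it could be passed to the operator directly.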

Create a Pod-based Index

Use the CreatePodIndexOperator to interact with Pinecone APIs to create a pod-based index.

Using the Operator

The CreatePodIndexOperator requires the index details as well as the pod configuration details. The api_key and environment can be passed either as arguments to the operator or via the connection.

An example using the operator in this way:

tests/system/pinecone/example_create_pod_index.py[source]

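from airflow.providers.pinecone.operators.pinecone import CreatePodIndexOperator

# reference: https://docs.pinecone.io/reference/api/control-plane/create_index
# index_name is defined elsewhere in the example DAG.
create_index = CreatePodIndexOperator(
    task_id="pinecone_create_pod_index",
    index_name=index_name,
    dimension=3,
    replicas=1,
    shards=1,
    pods=1,
    pod_type="p1.x1",
)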
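The pod settings in the example can be kept together and checked before the operator is built. The sketch below uses a hypothetical PodIndexConfig dataclass that is not part of the provider. It assumes that pods is the total pod count and should equal shards multiplied by replicas; confirm this against the Pinecone create_index reference linked above before relying on it.

```python
from dataclasses import asdict, dataclass


@dataclass(frozen=True)
class PodIndexConfig:
    """Hypothetical container for the pod index arguments shown above."""

    index_name: str
    dimension: int
    replicas: int = 1
    shards: int = 1
    pods: int = 1
    pod_type: str = "p1.x1"

    def __post_init__(self):
        if self.dimension < 1:
            raise ValueError("dimension must be a positive integer")
        # Assumption: pods is the total count, i.e. shards * replicas.
        if self.pods != self.shards * self.replicas:
            raise ValueError("pods should equal shards * replicas")


config = PodIndexConfig(index_name="example-index", dimension=3)
# The keys match the keyword arguments used in the example above.
operator_kwargs = asdict(config)
```

The operator_kwargs dict could then be unpacked into the operator call alongside task_id, for example CreatePodIndexOperator(task_id="pinecone_create_pod_index", **operator_kwargs).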

Create a Serverless Index

Use the CreateServerlessIndexOperator to interact with Pinecone APIs to create a serverless index.

Using the Operator

The CreateServerlessIndexOperator requires the index details as well as the serverless configuration details. The api_key and environment can be passed either as arguments to the operator or via the connection.

An example using the operator in this way:

tests/system/pinecone/example_create_serverless_index.py[source]

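from airflow.providers.pinecone.operators.pinecone import CreateServerlessIndexOperator

# reference: https://docs.pinecone.io/reference/api/control-plane/create_index
# index_name is defined elsewhere in the example DAG.
create_index = CreateServerlessIndexOperator(
    task_id="pinecone_create_serverless_index",
    index_name=index_name,
    dimension=128,
    cloud="aws",
    region="us-west-2",
    metric="cosine",
)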
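Because an index is created with a fixed dimension and metric, it can help to check these values, and the vectors to be ingested later, before any task runs. The following is a minimal sketch with two hypothetical helpers, validate_serverless_args and mismatched_ids, neither of which is part of the provider. The metric names listed are the ones Pinecone documents for index creation.

```python
SUPPORTED_METRICS = {"cosine", "euclidean", "dotproduct"}


def validate_serverless_args(dimension, metric):
    """Raise ValueError if the index arguments look invalid."""
    if not isinstance(dimension, int) or dimension < 1:
        raise ValueError("dimension must be a positive integer")
    if metric not in SUPPORTED_METRICS:
        raise ValueError(f"metric must be one of {sorted(SUPPORTED_METRICS)}")


def mismatched_ids(vectors, dimension):
    """Return the IDs of vectors whose length differs from the index dimension."""
    return [vec[0] for vec in vectors if len(vec[1]) != dimension]


validate_serverless_args(dimension=128, metric="cosine")

sample = [
    ("id1", [0.0] * 128, {"key": "value"}),
    ("id2", [1.0, 2.0, 3.0]),  # too short for a 128-dimensional index
]
bad_ids = mismatched_ids(sample, dimension=128)
```

Here bad_ids contains only "id2", since its three values do not match the 128 dimensions configured for the index.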
