Apache Kafka Operators

ConsumeFromTopicOperator

An operator that consumes messages from one or more Kafka topics and processes them. The operator creates a Kafka Consumer that reads messages from the cluster in batches and passes them to the user-supplied callable apply_function. The consumer keeps reading batches until it reaches the end of the log or has read max_messages messages, whichever comes first.
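The batching behaviour described above can be sketched as a plain loop. This is a simplified, in-memory illustration and not the operator's actual implementation: the list standing in for the topic log and the helper name consume_in_batches are hypothetical, and a real run reads through a Kafka Consumer rather than slicing a list.

```python
from typing import Any, Callable, List


def consume_in_batches(
    log: List[Any],
    apply_function: Callable[..., Any],
    max_messages: int,
    max_batch_size: int,
    **apply_function_kwargs: Any,
) -> List[List[Any]]:
    """Hypothetical stand-in for the consume loop: read an in-memory "log"
    in batches of at most max_batch_size until the log is exhausted or
    max_messages messages have been read."""
    position = 0
    messages_left = max_messages
    batches = []
    while messages_left > 0:
        # Never request more than the remaining message budget.
        batch_size = min(max_batch_size, messages_left)
        batch = log[position : position + batch_size]
        if not batch:
            # Nothing left to read: treat this as reaching the end of the log.
            break
        position += len(batch)
        messages_left -= len(batch)
        # apply_function is called once per message, with the extra kwargs.
        for message in batch:
            apply_function(message, **apply_function_kwargs)
        batches.append(batch)
    return batches


seen = []
batches = consume_in_batches(
    list(range(7)),
    lambda msg, prefix: seen.append(f"{prefix}{msg}"),
    max_messages=10,
    max_batch_size=2,
    prefix="consumed:::",
)
print(len(batches), seen[-1])  # 4 consumed:::6
```

Here the seven-message log runs out before the ten-message budget, so the loop stops at the end of the log after four batches; with a longer log it would instead stop once max_messages had been read.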

For parameter definitions, see ConsumeFromTopicOperator.

Using the operator

tests/system/apache/kafka/example_dag_hello_kafka.py

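from airflow.providers.apache.kafka.operators.consume import ConsumeFromTopicOperator

t2 = ConsumeFromTopicOperator(
    kafka_config_id="t2",  # Airflow connection ID holding the Kafka client config
    task_id="consume_from_topic",
    topics=["test_1"],
    # Dotted import path to the callable applied to each consumed message
    apply_function="example_dag_hello_kafka.consumer_function",
    apply_function_kwargs={"prefix": "consumed:::"},
    commit_cadence="end_of_batch",  # commit offsets after every batch
    max_messages=10,  # stop after reading at most 10 messages
    max_batch_size=2,  # read up to 2 messages per batch
)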
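Because apply_function is passed as a dotted import path, example_dag_hello_kafka.consumer_function must be importable wherever the task runs, and apply_function_kwargs reach it as keyword arguments. A compatible function might look like the sketch below. It assumes each message exposes key() and value() methods returning bytes (as confluent_kafka.Message does) and that the producer JSON-encoded both fields; the StubMessage class is a hypothetical stand-in so the example runs without a broker.

```python
import json
import logging
from typing import Optional

logger = logging.getLogger("example_consumer")


def consumer_function(message, prefix: Optional[str] = None) -> str:
    """Decode one message and log it with the configured prefix.

    Assumes key() and value() return JSON-encoded bytes.
    """
    key = json.loads(message.key())
    value = json.loads(message.value())
    line = f"{prefix} {key} : {value}"
    logger.info(line)
    return line


class StubMessage:
    """Hypothetical stand-in exposing only the key()/value() accessors."""

    def __init__(self, key: bytes, value: bytes) -> None:
        self._key = key
        self._value = value

    def key(self) -> bytes:
        return self._key

    def value(self) -> bytes:
        return self._value


# apply_function_kwargs={"prefix": "consumed:::"} arrives as a keyword argument.
print(consumer_function(StubMessage(b"1", b"2"), prefix="consumed:::"))
# consumed::: 1 : 2
```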

Reference

For further information, see the Apache Kafka Consumer documentation.

ProduceToTopicOperator

An operator that produces messages to a Kafka topic. Each message is built from a key/value pair generated by the user-supplied producer_function.
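A producer_function is a callable that yields key/value pairs, one per message. A minimal sketch, assuming JSON-encoded integer keys and values; the encoding and the count of 20 are illustrative choices, not requirements of the operator.

```python
import json
from typing import Iterator, Tuple


def producer_function() -> Iterator[Tuple[str, str]]:
    """Yield (key, value) pairs; each pair becomes one Kafka message."""
    for i in range(20):
        # JSON-encode both fields so a consumer can json.loads() them.
        yield json.dumps(i), json.dumps(i + 1)


pairs = list(producer_function())
print(len(pairs), pairs[0])  # 20 ('0', '1')
```

Using a generator lets the operator pull pairs lazily instead of materialising every message in memory first.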

For parameter definitions, see ProduceToTopicOperator.

Using the operator

tests/system/apache/kafka/example_dag_hello_kafka.py

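from airflow.providers.apache.kafka.operators.produce import ProduceToTopicOperator

t1 = ProduceToTopicOperator(
    kafka_config_id="t1-3",  # Airflow connection ID holding the Kafka client config
    task_id="produce_to_topic",
    topic="test_1",
    # Dotted import path to a callable that yields (key, value) pairs
    producer_function="example_dag_hello_kafka.producer_function",
)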
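Conceptually, the operator iterates over the pairs from producer_function and hands each one to a Kafka producer, then flushes. The sketch below is a simplified illustration, not the provider's source: produce_all and RecordingProducer are hypothetical names, and the stub only records calls while mirroring the produce(topic, key=..., value=...) and flush() names used by confluent_kafka.Producer.

```python
import json
from typing import Callable, Iterable, List, Optional, Tuple


class RecordingProducer:
    """Hypothetical stand-in that records produce() calls instead of
    sending them to a broker."""

    def __init__(self) -> None:
        self.sent: List[Tuple[str, Optional[str], Optional[str]]] = []
        self.flushed = False

    def produce(self, topic: str, key: Optional[str] = None, value: Optional[str] = None) -> None:
        self.sent.append((topic, key, value))

    def flush(self) -> int:
        # A real producer blocks here until queued messages are delivered
        # and returns how many remain; the stub just notes the call.
        self.flushed = True
        return 0


def produce_all(
    producer, topic: str, pair_source: Callable[[], Iterable[Tuple[str, str]]]
) -> int:
    """Send every (key, value) pair from pair_source to topic, then flush."""
    count = 0
    for key, value in pair_source():
        producer.produce(topic, key=key, value=value)
        count += 1
    producer.flush()
    return count


def sample_pairs():
    # Hypothetical producer_function yielding three JSON-encoded pairs.
    for i in range(3):
        yield json.dumps(i), json.dumps(i + 1)


stub = RecordingProducer()
print(produce_all(stub, "test_1", sample_pairs), stub.sent[0])
# 3 ('test_1', '0', '1')
```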

Reference

For further information, see the Apache Kafka Producer documentation.