Skip to content

Kafka

Since testcontainers-python v4.6.0

Introduction

The Testcontainers module for Kafka.

Adding this module to your project dependencies

Please run the following command to add the Kafka module to your python dependencies:

pip install testcontainers[kafka]

Usage example

import json

import time

from datetime import datetime

from threading import Thread



from kafka import KafkaConsumer, KafkaProducer



from testcontainers.kafka import KafkaContainer





def basic_example():

    with KafkaContainer() as kafka:

        # Get connection parameters

        bootstrap_servers = kafka.get_bootstrap_server()



        # Create Kafka producer

        producer = KafkaProducer(

            bootstrap_servers=bootstrap_servers, value_serializer=lambda v: json.dumps(v).encode("utf-8")

        )

        print("Created Kafka producer")



        # Create Kafka consumer

        consumer = KafkaConsumer(

            bootstrap_servers=bootstrap_servers,

            value_deserializer=lambda v: json.loads(v.decode("utf-8")),

            auto_offset_reset="earliest",

            group_id="test_group",

        )

        print("Created Kafka consumer")



        # Define topics

        topics = ["test_topic1", "test_topic2"]



        # Subscribe to topics

        consumer.subscribe(topics)

        print(f"Subscribed to topics: {topics}")



        # Start consuming in a separate thread

        def consume_messages():

            for message in consumer:

                print(f"\nReceived message from {message.topic}:")

                print(json.dumps(message.value, indent=2))



        consumer_thread = Thread(target=consume_messages)

        consumer_thread.daemon = True

        consumer_thread.start()



        # Produce test messages

        test_messages = [

            {

                "topic": "test_topic1",

                "message": {"id": 1, "content": "Message for topic 1", "timestamp": datetime.utcnow().isoformat()},

            },

            {

                "topic": "test_topic2",

                "message": {"id": 2, "content": "Message for topic 2", "timestamp": datetime.utcnow().isoformat()},

            },

        ]



        for msg in test_messages:

            producer.send(msg["topic"], msg["message"])

            print(f"Sent message to {msg['topic']}")



        # Wait for messages to be processed

        time.sleep(2)



        # Get topic information

        print("\nTopic information:")

        for topic in topics:

            partitions = consumer.partitions_for_topic(topic)

            print(f"{topic}:")

            print(f"  Partitions: {partitions}")



        # Clean up

        producer.close()

        consumer.close()





if __name__ == "__main__":

    basic_example()