Skip to content

RabbitMQ

Since testcontainers-python v4.6.0

Introduction

The Testcontainers module for RabbitMQ.

Adding this module to your project dependencies

Please run the following command to add the RabbitMQ module to your python dependencies:

pip install testcontainers[rabbitmq] pika

Usage example

import json

import time

from threading import Thread



import pika



from testcontainers.rabbitmq import RabbitMQContainer





def basic_example():

    with RabbitMQContainer() as rabbitmq:

        # Get connection parameters

        host = rabbitmq.get_container_host_ip()

        port = rabbitmq.get_exposed_port(rabbitmq.port)

        username = rabbitmq.username

        password = rabbitmq.password



        # Create connection

        credentials = pika.PlainCredentials(username, password)

        parameters = pika.ConnectionParameters(host=host, port=port, credentials=credentials)

        connection = pika.BlockingConnection(parameters)

        channel = connection.channel()

        print("Connected to RabbitMQ")



        # Declare exchange

        exchange_name = "test_exchange"

        channel.exchange_declare(exchange=exchange_name, exchange_type="direct", durable=True)

        print(f"Declared exchange: {exchange_name}")



        # Declare queues

        queues = {"queue1": "routing_key1", "queue2": "routing_key2"}



        for queue_name, routing_key in queues.items():

            channel.queue_declare(queue=queue_name, durable=True)

            channel.queue_bind(exchange=exchange_name, queue=queue_name, routing_key=routing_key)

            print(f"Declared and bound queue: {queue_name}")



        # Define message handler

        def message_handler(ch, method, properties, body):

            message = json.loads(body)

            print(f"\nReceived message on {method.routing_key}:")

            print(json.dumps(message, indent=2))

            ch.basic_ack(delivery_tag=method.delivery_tag)



        # Start consuming in a separate thread

        def consume_messages():

            channel.basic_qos(prefetch_count=1)

            for queue_name in queues:

                channel.basic_consume(queue=queue_name, on_message_callback=message_handler)

            channel.start_consuming()



        consumer_thread = Thread(target=consume_messages)

        consumer_thread.daemon = True

        consumer_thread.start()



        # Publish messages

        test_messages = [

            {

                "queue": "queue1",

                "routing_key": "routing_key1",

                "message": {"id": 1, "content": "Message for queue 1", "timestamp": time.time()},

            },

            {

                "queue": "queue2",

                "routing_key": "routing_key2",

                "message": {"id": 2, "content": "Message for queue 2", "timestamp": time.time()},

            },

        ]



        for msg in test_messages:

            channel.basic_publish(

                exchange=exchange_name,

                routing_key=msg["routing_key"],

                body=json.dumps(msg["message"]),

                properties=pika.BasicProperties(

                    delivery_mode=2,  # make message persistent

                    content_type="application/json",

                ),

            )

            print(f"Published message to {msg['queue']}")



        # Wait for messages to be processed

        time.sleep(2)



        # Get queue information

        print("\nQueue information:")

        for queue_name in queues:

            queue = channel.queue_declare(queue=queue_name, passive=True)

            print(f"{queue_name}:")

            print(f"  Messages: {queue.method.message_count}")

            print(f"  Consumers: {queue.method.consumer_count}")



        # Clean up

        connection.close()





if __name__ == "__main__":

    basic_example()