Skip to content

MQTT

Since testcontainers-python v4.7.0

Introduction

The Testcontainers module for MQTT.

Adding this module to your project dependencies

Please run the following command to add the MQTT module to your Python dependencies:

pip install testcontainers[mqtt] paho-mqtt

Usage example

import threading
import time

import paho.mqtt.client as mqtt

from testcontainers.mqtt import MqttContainer





def basic_example():
    """Run a publish/subscribe round trip against a containerized MQTT broker.

    Starts an ``MqttContainer``, connects a paho-mqtt client to it, subscribes
    to ``test/topic``, publishes three messages to that topic, and prints each
    message as it is published and as it is received.  The client's network
    loop is stopped and the client disconnected even if an error occurs.

    Raises:
        TimeoutError: If the broker does not acknowledge the subscription
            within 10 seconds.
    """
    with MqttContainer() as mqtt_container:
        # Get connection parameters
        host = mqtt_container.get_container_host_ip()
        # Coerce to int: paho expects a numeric port, whatever type the
        # container helper hands back.
        port = int(mqtt_container.get_exposed_port(mqtt_container.port))

        # Create MQTT client.  paho-mqtt 2.x wants an explicit callback API
        # version; paho-mqtt 1.x has no such enum, so fall back gracefully.
        callback_api = getattr(mqtt, "CallbackAPIVersion", None)
        if callback_api is None:
            client = mqtt.Client()
        else:
            client = mqtt.Client(callback_api.VERSION2)

        # Set once the broker has acknowledged our subscription, so we never
        # publish before there is a subscriber to receive the messages.
        subscribed = threading.Event()

        # Define callback functions
        def on_connect(client, userdata, flags, rc, properties=None):
            # ``properties`` is only passed by the v2 callback API.
            print(f"Connected with result code {rc}")
            # Subscribe to topics
            client.subscribe("test/topic")

        def on_subscribe(client, userdata, mid, *args):
            # Trailing arguments differ between callback API versions.
            subscribed.set()

        def on_message(client, userdata, msg):
            print(f"Received message on topic {msg.topic}: {msg.payload.decode()}")

        # Set callbacks
        client.on_connect = on_connect
        client.on_subscribe = on_subscribe
        client.on_message = on_message

        # Connect to broker
        client.connect(host, port)
        client.loop_start()

        try:
            # Wait until the subscription is active before publishing.
            if not subscribed.wait(timeout=10):
                raise TimeoutError(
                    "Timed out waiting for the MQTT subscription to be acknowledged"
                )

            # Publish test messages
            test_messages = ["Hello MQTT!", "This is a test message", "MQTT is working!"]

            for msg in test_messages:
                client.publish("test/topic", msg)
                print(f"Published message: {msg}")
                time.sleep(1)  # Wait a bit between messages

            # Wait for messages to be processed
            time.sleep(2)
        finally:
            # Clean up, even if connecting, subscribing or publishing failed
            client.loop_stop()
            client.disconnect()





if __name__ == "__main__":
    # Run the example only when this file is executed as a script, not on import.
    basic_example()