Skip to content

NATS

Since testcontainers-python v4.6.0

Introduction

The Testcontainers module for NATS.

Adding this module to your project dependencies

Please run the following command to add the NATS module to your python dependencies:

pip install testcontainers[nats] nats-py

Usage example

import asyncio

import json



from nats.aio.client import Client as NATS

from nats.aio.msg import Msg



from testcontainers.nats import NatsContainer





async def message_handler(msg: Msg):

    subject = msg.subject

    data = msg.data.decode()

    print(f"Received message on {subject}: {data}")





async def basic_example():

    with NatsContainer() as nats_container:

        # Get connection parameters

        host = nats_container.get_container_host_ip()

        port = nats_container.get_exposed_port(nats_container.port)



        # Create NATS client

        nc = NATS()

        await nc.connect(f"nats://{host}:{port}")

        print("Connected to NATS")



        # Create JetStream context

        js = nc.jetstream()



        # Create stream

        stream = await js.add_stream(name="test-stream", subjects=["test.>"])

        print(f"\nCreated stream: {stream.config.name}")



        # Create consumer

        consumer = await js.add_consumer(stream_name="test-stream", durable_name="test-consumer")

        print(f"Created consumer: {consumer.name}")



        # Subscribe to subjects

        subjects = ["test.1", "test.2", "test.3"]

        for subject in subjects:

            await nc.subscribe(subject, cb=message_handler)

            print(f"Subscribed to {subject}")



        # Publish messages

        messages = {"test.1": "Hello from test.1", "test.2": "Hello from test.2", "test.3": "Hello from test.3"}



        for subject, message in messages.items():

            await nc.publish(subject, message.encode())

            print(f"Published to {subject}")



        # Publish with headers

        headers = {"header1": "value1", "header2": "value2"}

        await nc.publish("test.headers", b"Message with headers", headers=headers)

        print("\nPublished message with headers")



        # Publish with reply

        reply_subject = "test.reply"

        await nc.subscribe(reply_subject, cb=message_handler)

        print(f"Subscribed to {reply_subject}")



        response = await nc.request("test.request", b"Request message", timeout=1)

        print(f"Received reply: {response.data.decode()}")



        # Publish to JetStream

        for subject, message in messages.items():

            ack = await js.publish(subject, message.encode())

            print(f"Published to JetStream {subject}: {ack.stream}")



        # Get stream info

        stream_info = await js.stream_info("test-stream")

        print("\nStream info:")

        print(

            json.dumps(

                {

                    "name": stream_info.config.name,

                    "subjects": stream_info.config.subjects,

                    "messages": stream_info.state.messages,

                    "bytes": stream_info.state.bytes,

                },

                indent=2,

            )

        )



        # Get consumer info

        consumer_info = await js.consumer_info("test-stream", "test-consumer")

        print("\nConsumer info:")

        print(

            json.dumps(

                {

                    "name": consumer_info.name,

                    "stream_name": consumer_info.stream_name,

                    "delivered": consumer_info.delivered.stream_seq,

                    "ack_floor": consumer_info.ack_floor.stream_seq,

                },

                indent=2,

            )

        )



        # Create key-value store

        kv = await js.create_key_value(bucket="test-kv", history=5, ttl=3600)

        print("\nCreated key-value store")



        # Put values

        await kv.put("key1", b"value1")

        await kv.put("key2", b"value2")

        print("Put values in key-value store")



        # Get values

        entry = await kv.get("key1")

        print(f"Got value: {entry.value.decode()}")



        # List keys

        keys = await kv.keys()

        print("\nKeys in store:")

        for key in keys:

            print(f"- {key}")



        # Delete key

        await kv.delete("key1")

        print("Deleted key1")



        # Create object store

        os = await js.create_object_store(bucket="test-os", ttl=3600)

        print("\nCreated object store")



        # Put object

        await os.put("test.txt", b"Hello from object store")

        print("Put object in store")



        # Get object

        obj = await os.get("test.txt")

        print(f"Got object: {obj.data.decode()}")



        # List objects

        objects = await os.list()

        print("\nObjects in store:")

        for obj in objects:

            print(f"- {obj.name}")



        # Delete object

        await os.delete("test.txt")

        print("Deleted object")



        # Clean up

        await js.delete_stream("test-stream")

        print("\nDeleted stream")



        await nc.close()





if __name__ == "__main__":

    asyncio.run(basic_example())