Azurite
Since testcontainers-python v4.6.0
Introduction
The Testcontainers module for Azurite.
Adding this module to your project dependencies
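Please run the following command to add the Azurite module to your Python dependencies (the usage example below also needs the Azure Storage Queue client, so it is installed as well):
pip install testcontainers[azurite] azure-storage-blob azure-storage-queue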
Usage example
import json
from azure.storage.blob import BlobServiceClient
from azure.storage.queue import QueueServiceClient
from testcontainers.azurite import AzuriteContainer
def basic_example():
    """Exercise blob and queue storage against a throwaway Azurite container.

    Starts an ``AzuriteContainer``, builds blob and queue service clients from
    its connection string, then:

    * creates a blob container, uploads three JSON blobs, lists them and
      reads one back;
    * creates a queue, sends three messages, then receives and deletes them.

    Progress is reported with ``print``; nothing is returned. The Azurite
    container is managed by the ``with`` block.
    """
    with AzuriteContainer() as azurite:
        # Get connection string
        connection_string = azurite.get_connection_string()

        # Create BlobServiceClient
        blob_service_client = BlobServiceClient.from_connection_string(connection_string)

        # Create QueueServiceClient
        queue_service_client = QueueServiceClient.from_connection_string(connection_string)

        # Create a test container
        container_name = "test-container"
        container_client = blob_service_client.create_container(container_name)
        print(f"Created container: {container_name}")

        # Upload test blobs
        test_data = [
            {"name": "test1", "value": 100, "category": "A"},
            {"name": "test2", "value": 200, "category": "B"},
            {"name": "test3", "value": 300, "category": "A"},
        ]
        # Blob names are numbered from 1 (test1.json, test2.json, ...).
        for i, data in enumerate(test_data, 1):
            blob_name = f"test{i}.json"
            blob_client = container_client.get_blob_client(blob_name)
            blob_client.upload_blob(json.dumps(data), overwrite=True)
            print(f"Uploaded blob: {blob_name}")

        # List blobs
        print("\nBlobs in container:")
        for blob in container_client.list_blobs():
            print(f"Name: {blob.name}, Size: {blob.size} bytes")

        # Download and read a blob
        blob_client = container_client.get_blob_client("test1.json")
        blob_data = blob_client.download_blob()
        content = json.loads(blob_data.readall())
        print("\nBlob content:")
        print(json.dumps(content, indent=2))

        # Create a test queue
        queue_name = "test-queue"
        queue_client = queue_service_client.create_queue(queue_name)
        print(f"\nCreated queue: {queue_name}")

        # Send test messages
        test_messages = ["Hello Azurite!", "This is a test message", "Queue is working!"]
        for msg in test_messages:
            queue_client.send_message(msg)
            print(f"Sent message: {msg}")

        # Receive messages: one receive attempt per message that was sent.
        print("\nReceived messages:")
        for _ in test_messages:
            message = queue_client.receive_message()
            # Skip the print/delete when the receive call yields nothing.
            if message:
                print(f"Message: {message.content}")
                queue_client.delete_message(message.id, message.pop_receipt)
                print("Deleted message")
# Run the example only when this file is executed as a script, not on import.
if __name__ == "__main__":
    basic_example()