Cassandra¶
Since testcontainers-python v4.8.0
Introduction¶
The Testcontainers module for Cassandra.
Adding this module to your project dependencies¶
Please run the following command to add the Cassandra module to your python dependencies:
pip install testcontainers[cassandra] cassandra-driver
Usage example¶
import json
from datetime import datetime
from cassandra.auth import PlainTextAuthProvider
from cassandra.cluster import Cluster
from testcontainers.cassandra import CassandraContainer
def basic_example():
with CassandraContainer() as cassandra:
# Get connection parameters
host = cassandra.get_container_host_ip()
port = cassandra.get_exposed_port(cassandra.port)
username = cassandra.username
password = cassandra.password
# Create Cassandra client
auth_provider = PlainTextAuthProvider(username=username, password=password)
cluster = Cluster([host], port=port, auth_provider=auth_provider)
session = cluster.connect()
print("Connected to Cassandra")
# Create keyspace
session.execute("""
CREATE KEYSPACE IF NOT EXISTS test_keyspace
WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1}
""")
print("Created keyspace")
# Use keyspace
session.set_keyspace("test_keyspace")
# Create table
session.execute("""
CREATE TABLE IF NOT EXISTS test_table (
id UUID PRIMARY KEY,
name text,
value int,
category text,
created_at timestamp
)
""")
print("Created table")
# Insert test data
test_data = [
{
"id": "550e8400-e29b-41d4-a716-446655440000",
"name": "test1",
"value": 100,
"category": "A",
"created_at": datetime.utcnow(),
},
{
"id": "550e8400-e29b-41d4-a716-446655440001",
"name": "test2",
"value": 200,
"category": "B",
"created_at": datetime.utcnow(),
},
{
"id": "550e8400-e29b-41d4-a716-446655440002",
"name": "test3",
"value": 300,
"category": "A",
"created_at": datetime.utcnow(),
},
]
insert_stmt = session.prepare("""
INSERT INTO test_table (id, name, value, category, created_at)
VALUES (uuid(), ?, ?, ?, ?)
""")
for data in test_data:
session.execute(insert_stmt, (data["name"], data["value"], data["category"], data["created_at"]))
print("Inserted test data")
# Query data
print("\nQuery results:")
rows = session.execute("SELECT * FROM test_table WHERE category = 'A' ALLOW FILTERING")
for row in rows:
print(
json.dumps(
{
"id": str(row.id),
"name": row.name,
"value": row.value,
"category": row.category,
"created_at": row.created_at.isoformat(),
},
indent=2,
)
)
# Create materialized view
session.execute("""
CREATE MATERIALIZED VIEW IF NOT EXISTS test_view AS
SELECT category, name, value, created_at
FROM test_table
WHERE category IS NOT NULL AND name IS NOT NULL
PRIMARY KEY (category, name)
""")
print("\nCreated materialized view")
# Query materialized view
print("\nMaterialized view results:")
rows = session.execute("SELECT * FROM test_view WHERE category = 'A'")
for row in rows:
print(
json.dumps(
{
"category": row.category,
"name": row.name,
"value": row.value,
"created_at": row.created_at.isoformat(),
},
indent=2,
)
)
# Create secondary index
session.execute("CREATE INDEX IF NOT EXISTS ON test_table (value)")
print("\nCreated secondary index")
# Query using secondary index
print("\nQuery using secondary index:")
rows = session.execute("SELECT * FROM test_table WHERE value > 150 ALLOW FILTERING")
for row in rows:
print(
json.dumps(
{
"id": str(row.id),
"name": row.name,
"value": row.value,
"category": row.category,
"created_at": row.created_at.isoformat(),
},
indent=2,
)
)
# Get table metadata
table_meta = session.cluster.metadata.keyspaces["test_keyspace"].tables["test_table"]
print("\nTable metadata:")
print(f"Columns: {[col.name for col in table_meta.columns.values()]}")
print(f"Partition key: {[col.name for col in table_meta.partition_key]}")
print(f"Clustering key: {[col.name for col in table_meta.clustering_key]}")
if __name__ == "__main__":
basic_example()