Elasticsearch
Since testcontainers-python v4.6.0
Introduction
The Testcontainers module for Elasticsearch.
Adding this module to your project dependencies
Please run the following command to add the Elasticsearch module to your Python dependencies:
pip install testcontainers[elasticsearch]
Usage example
import json
from datetime import datetime, timezone

from elasticsearch import Elasticsearch
from testcontainers.elasticsearch import ElasticsearchContainer
def basic_example():
    """Run an end-to-end Elasticsearch walkthrough against a test container.

    The function enters an ``ElasticsearchContainer`` context, points an
    ``Elasticsearch`` client at the container's host and exposed port, and
    then, in order:

    1. creates ``test_index`` (if it does not already exist),
    2. indexes three sample documents with ids 1..3,
    3. refreshes the index so the documents are searchable,
    4. runs a term query and a terms aggregation with avg/min/max metrics,
    5. partially updates document 1, reads it back, deletes document 2,
    6. prints document-count and store-size statistics for the index.

    All results are printed to stdout; nothing is returned.
    """
    with ElasticsearchContainer() as container:
        # Get connection parameters
        host = container.get_container_host_ip()
        port = container.get_exposed_port(container.port)

        # Create Elasticsearch client
        es = Elasticsearch(f"http://{host}:{port}")
        print("Connected to Elasticsearch")

        # Create index
        index_name = "test_index"
        index_settings = {
            "settings": {"number_of_shards": 1, "number_of_replicas": 0},
            "mappings": {
                "properties": {
                    "name": {"type": "text"},
                    "value": {"type": "integer"},
                    "category": {"type": "keyword"},
                    "created_at": {"type": "date"},
                }
            },
        }
        if not es.indices.exists(index=index_name):
            es.indices.create(index=index_name, body=index_settings)
            print(f"Created index: {index_name}")

        # Insert test documents.
        # Timestamps are timezone-aware UTC: datetime.utcnow() is deprecated
        # since Python 3.12 and yields naive datetimes with no offset.
        test_docs = [
            {"name": "test1", "value": 100, "category": "A", "created_at": datetime.now(timezone.utc)},
            {"name": "test2", "value": 200, "category": "B", "created_at": datetime.now(timezone.utc)},
            {"name": "test3", "value": 300, "category": "A", "created_at": datetime.now(timezone.utc)},
        ]
        for doc_id, source in enumerate(test_docs, 1):
            es.index(index=index_name, id=doc_id, document=source)
        print("Inserted test documents")

        # Refresh index so the documents just written are visible to search
        es.indices.refresh(index=index_name)

        # Search documents
        search_query = {"query": {"bool": {"must": [{"term": {"category": "A"}}]}}}
        print("\nSearch results:")
        response = es.search(index=index_name, body=search_query)
        for hit in response["hits"]["hits"]:
            print(json.dumps(hit["_source"], default=str, indent=2))

        # Execute aggregation ("size": 0 asks for aggregation buckets only, no hits)
        agg_query = {
            "size": 0,
            "aggs": {
                "categories": {
                    "terms": {"field": "category"},
                    "aggs": {
                        "avg_value": {"avg": {"field": "value"}},
                        "min_value": {"min": {"field": "value"}},
                        "max_value": {"max": {"field": "value"}},
                    },
                }
            },
        }
        print("\nAggregation results:")
        response = es.search(index=index_name, body=agg_query)
        for bucket in response["aggregations"]["categories"]["buckets"]:
            print(f"\nCategory: {bucket['key']}")
            print(f"Count: {bucket['doc_count']}")
            print(f"Avg value: {bucket['avg_value']['value']:.2f}")
            print(f"Min value: {bucket['min_value']['value']}")
            print(f"Max value: {bucket['max_value']['value']}")

        # Update document (partial update: only the fields under "doc" change)
        update_body = {"doc": {"value": 150, "updated_at": datetime.now(timezone.utc)}}
        es.update(index=index_name, id=1, body=update_body)
        print("\nUpdated document")

        # Get document
        fetched = es.get(index=index_name, id=1)
        print("\nUpdated document:")
        print(json.dumps(fetched["_source"], default=str, indent=2))

        # Delete document
        es.delete(index=index_name, id=2)
        print("\nDeleted document")

        # Get index stats
        stats = es.indices.stats(index=index_name)
        index_totals = stats["indices"][index_name]["total"]
        print("\nIndex stats:")
        print(f"Documents: {index_totals['docs']['count']}")
        print(f"Size: {index_totals['store']['size_in_bytes']} bytes")
# Script entry point: run the example only when this file is executed
# directly, not when it is imported as a module.
if __name__ == "__main__":
    basic_example()