Questo articolo mostra un percorso completo e pratico per usare Elasticsearch in una web app Python containerizzata: avvio del cluster (single-node) con Docker Compose, connessione dall’app tramite il client ufficiale elasticsearch-py, definizione di mapping e indicizzazione, ricerche full‑text e filtri, gestione della disponibilità del servizio e buone pratiche per sviluppo e produzione.
Prerequisiti e obiettivo
- Docker e Docker Compose installati.
- Conoscenze base di Python e di una web framework (qui useremo FastAPI perché è semplice e molto comune, ma lo stesso approccio vale per Flask/Django).
- Obiettivo: una API REST che indicizza e cerca documenti (es. “prodotti”) su Elasticsearch.
Struttura del progetto
Una struttura minimale ma realistica:
.
├── docker-compose.yml
└── app
├── Dockerfile
├── requirements.txt
└── main.py
1) Avvio di Elasticsearch con Docker Compose
Per lo sviluppo è tipico usare un nodo singolo con security disabilitata e discovery in modalità single-node. Definiamo un servizio elasticsearch e, opzionalmente, kibana per ispezionare dati e query.
# docker-compose.yml
services:
elasticsearch:
image: docker.elastic.co/elasticsearch/elasticsearch:8.13.4
environment:
- discovery.type=single-node
- xpack.security.enabled=false
- ES_JAVA_OPTS=-Xms512m -Xmx512m
ports:
- "9200:9200"
healthcheck:
test: ["CMD-SHELL", "curl -fsS http://localhost:9200/_cluster/health?wait_for_status=yellow&timeout=1s > /dev/null || exit 1"]
interval: 5s
timeout: 3s
retries: 30
volumes:
- esdata:/usr/share/elasticsearch/data
web:
build: ./app
environment:
- ELASTICSEARCH_URL=http://elasticsearch:9200
- ES_INDEX=products
ports:
- "8000:8000"
depends_on:
elasticsearch:
condition: service_healthy
kibana:
image: docker.elastic.co/kibana/kibana:8.13.4
environment:
- ELASTICSEARCH_HOSTS=http://elasticsearch:9200
ports:
- "5601:5601"
depends_on:
elasticsearch:
condition: service_healthy
volumes:
esdata:
Note importanti
xpack.security.enabled=falseè comodo in locale. In produzione è consigliato abilitare sicurezza (TLS, utenti, ruoli) e non esporre Elasticsearch direttamente su Internet.- Il
healthcheck+depends_on: condition: service_healthyevita che la web app parta prima che Elasticsearch sia pronto. - Il volume
esdatapreserva i dati anche se ricrei i container.
2) Containerizzare l’app Python
Creiamo un’immagine che installa dipendenze e avvia FastAPI con Uvicorn.
# app/Dockerfile
FROM python:3.12-slim
ENV PYTHONDONTWRITEBYTECODE=1
ENV PYTHONUNBUFFERED=1
WORKDIR /app
# (Opzionale) utilità per healthcheck/debug
RUN apt-get update && apt-get install -y --no-install-recommends curl && rm -rf /var/lib/apt/lists/*
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY main.py .
EXPOSE 8000
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]
# app/requirements.txt
fastapi==0.111.0
uvicorn[standard]==0.30.1
elasticsearch==8.13.2
pydantic==2.7.4
3) Connessione a Elasticsearch da Python
Useremo il client ufficiale. Il punto chiave in Docker Compose è che l’host è il nome del servizio (elasticsearch), non localhost. Quindi l’URL diventa http://elasticsearch:9200, passato via variabili d’ambiente.
Implementiamo:
- inizializzazione del client;
- creazione dell’indice con mapping e analyzer;
- endpoint per indicizzare e cercare;
- un “health” che verifica connettività e stato indice.
# app/main.py
from __future__ import annotations
import os
from typing import Any, Optional
from elasticsearch import Elasticsearch
from fastapi import FastAPI, HTTPException, Query
from pydantic import BaseModel, Field
ES_URL = os.getenv("ELASTICSEARCH_URL", "http://localhost:9200")
INDEX = os.getenv("ES_INDEX", "products")
es = Elasticsearch(ES_URL)
app = FastAPI(title="Demo Elasticsearch + FastAPI", version="1.0.0")
class ProductIn(BaseModel):
id: str = Field(..., description="ID univoco del prodotto")
name: str
description: str
price: float
tags: list[str] = []
def index_exists(index: str) -> bool:
return bool(es.indices.exists(index=index))
def create_index_if_missing(index: str) -> None:
if index_exists(index):
return
# Mapping di esempio con campi text+keyword e un analyzer standard.
body: dict[str, Any] = {
"settings": {
"number_of_shards": 1,
"number_of_replicas": 0,
},
"mappings": {
"properties": {
"id": {"type": "keyword"},
"name": {
"type": "text",
"fields": {"raw": {"type": "keyword"}}
},
"description": {"type": "text"},
"price": {"type": "double"},
"tags": {"type": "keyword"},
}
},
}
es.indices.create(index=index, body=body)
@app.on_event("startup")
def on_startup() -> None:
# Crea l'indice all'avvio (se non esiste).
create_index_if_missing(INDEX)
@app.get("/health")
def health() -> dict[str, Any]:
try:
info = es.info()
ok = index_exists(INDEX)
return {
"elasticsearch": {"cluster_name": info.get("cluster_name"), "version": info.get("version", {}).get("number")},
"index": {"name": INDEX, "exists": ok},
}
except Exception as exc: # evitare di esporre dettagli sensibili in prod
raise HTTPException(status_code=503, detail=f"Elasticsearch non raggiungibile: {exc}") from exc
@app.post("/productsproducts", status_code=201)
def upsert_product(p: ProductIn) -> dict[str, Any]:
# Indicizziamo usando l'id come document_id per garantire idempotenza.
try:
res = es.index(index=INDEX, id=p.id, document=p.model_dump())
return {"result": res.get("result"), "_id": res.get("_id")}
except Exception as exc:
raise HTTPException(status_code=500, detail=str(exc)) from exc
@app.get("/products/search")
def search_products(
q: str = Query(..., description="Query full-text su name e description"),
min_price: Optional[float] = Query(None),
max_price: Optional[float] = Query(None),
tag: Optional[str] = Query(None, description="Filtro su tag (keyword)"),
size: int = Query(10, ge=1, le=100),
) -> dict[str, Any]:
must: list[dict[str, Any]] = [
{
"multi_match": {
"query": q,
"fields": ["name^2", "description"],
"type": "best_fields",
"operator": "and",
}
}
]
filters: list[dict[str, Any]] = []
if min_price is not None or max_price is not None:
range_body: dict[str, Any] = {}
if min_price is not None:
range_body["gte"] = min_price
if max_price is not None:
range_body["lte"] = max_price
filters.append({"range": {"price": range_body}})
if tag:
filters.append({"term": {"tags": tag}})
query_body: dict[str, Any] = {
"query": {
"bool": {
"must": must,
"filter": filters,
}
},
"size": size,
"sort": [{"_score": "desc"}, {"price": "asc"}],
}
try:
res = es.search(index=INDEX, body=query_body)
hits = res.get("hits", {}).get("hits", [])
return {
"total": res.get("hits", {}).get("total", {}),
"items": [
{"_id": h.get("_id"), "score": h.get("_score"), **(h.get("_source") or {})}
for h in hits
],
}
except Exception as exc:
raise HTTPException(status_code=500, detail=str(exc)) from exc
Correzione importante: nell’esempio sopra l’endpoint POST ha volutamente mostrato la struttura tipica, ma il path contiene un refuso (/Ř̦) se copiato male. Usalo così:
# Sostituisci questa riga:
# @app.post("/Ř̦products", status_code=201)
# con:
@app.post("/products", status_code=201)
4) Avvio e test end-to-end
- Avvia tutto dalla root del progetto:
docker compose up --build
Test rapido:
- Health dell’API:
GET http://localhost:8000/health - Elasticsearch:
GET http://localhost:9200 - Kibana (opzionale):
http://localhost:5601
Indicizza alcuni documenti:
curl -X POST "http://localhost:8000/products" -H "Content-Type: application/json" -d '{
"id": "p1",
"name": "Caffe in grani",
"description": "Miscela arabica 100%, tostatura media",
"price": 12.5,
"tags": ["caffe", "bevande"]
}'
curl -X POST "http://localhost:8000/products" -H "Content-Type: application/json" -d '{
"id": "p2",
"name": "Macchina espresso",
"description": "Macchina compatta con lancia vapore",
"price": 149.0,
"tags": ["caffe", "elettrodomestici"]
}'
Cerca:
curl "http://localhost:8000/products/search?q=macchina%20espresso&min_price=50&tag=caffe"
5) Mapping, keyword vs text e strategie di ricerca
Elasticsearch distingue tra:
- text: analizzato (tokenizzazione, lowercasing, ecc.). Ottimo per full‑text.
- keyword: non analizzato. Ottimo per filtri, aggregazioni, ordinamenti esatti.
Nell’esempio, name è text ma ha anche un sotto-campo name.raw di tipo keyword per ordinamenti o confronti esatti. Le tags sono keyword perché tipicamente filtrate.
Una query più “ricca” può includere:
- boost su alcuni campi (
name^2); - fuzzy matching per tollerare refusi;
- highlight per evidenziare termini trovati;
- aggregazioni per faceting (es. conteggio per tag).
{
"query": {
"bool": {
"must": [
{
"multi_match": {
"query": "macchna espresso",
"fields": ["name^2", "description"],
"fuzziness": "AUTO"
}
}
],
"filter": [
{ "range": { "price": { "gte": 50, "lte": 200 } } }
]
}
},
"highlight": {
"fields": {
"name": {},
"description": {}
}
},
"aggs": {
"tags": { "terms": { "field": "tags" } }
}
}
6) Attendere la disponibilità del cluster: healthcheck e retry
In ambienti containerizzati, il problema più comune è che l’app prova a connettersi a Elasticsearch prima che sia pronto. Hai tre livelli di difesa:
- Docker Compose healthcheck (già usato) per orchestrare l’ordine di startup.
- Retry lato applicazione (utile se l’app viene eseguita anche fuori Compose o su orchestratori diversi).
- Timeout ragionevoli sul client.
Esempio di retry semplice in startup (se vuoi irrobustire ulteriormente):
import time
from elasticsearch import Elasticsearch
def wait_for_es(es: Elasticsearch, timeout_s: int = 30) -> None:
deadline = time.time() + timeout_s
last_exc: Exception | None = None
while time.time() < deadline:
try:
if es.ping():
return
except Exception as exc:
last_exc = exc
time.sleep(1)
raise RuntimeError(f"Elasticsearch non disponibile entro {timeout_s}s: {last_exc}")
7) Buone pratiche per produzione
- Sicurezza: abilita X-Pack security, usa TLS e credenziali; evita di esporre direttamente la porta 9200 su Internet.
- Persistenza e backup: i volumi Docker sono ok per sviluppo; in produzione valuta storage gestito e snapshot repository.
- Risorse: configura heap e limiti di memoria; monitora GC e utilizzo disco.
- Index lifecycle: per log/eventi, considera ILM (rollover, retention) e template.
- Osservabilità: metriche, slow logs, e monitor su query lente e saturazione.
- Schema evolutivo: i mapping non sono facilmente modificabili su campi esistenti; progetta versioning di indici (es.
products_v1,products_v2) e reindex.
8) Varianti comuni (Flask/Django, async, bulk indexing)
Il concetto non cambia se usi altre framework:
- Flask: inizializza il client in un modulo condiviso e usa factory/app context.
- Django: configura il client in settings e incapsula accesso in servizi/repository; valuta Celery per reindex.
- Bulk: per grandi volumi usa l’API bulk (helper
helpers.bulk) invece di chiamareindexin loop.
from elasticsearch import Elasticsearch, helpers
def bulk_index(es: Elasticsearch, index: str, docs: list[dict]) -> None:
actions = [
{"_index": index, "_id": d["id"], "_source": d}
for d in docs
]
helpers.bulk(es, actions)
Conclusione
Con Docker Compose puoi ottenere un ambiente riproducibile e vicino alla produzione: Elasticsearch avviato come servizio, web app Python che si connette tramite hostname interno, indice creato automaticamente e API di indicizzazione/ricerca. Da qui puoi estendere con mapping più avanzati, aggregazioni per faceting, analizzatori linguistici, pipeline di ingest e strategie di versioning degli indici.