Elasticsearch in un’applicazione web Python con Docker e Docker Compose

Questo articolo mostra un percorso completo e pratico per usare Elasticsearch in una web app Python containerizzata: avvio del cluster (single-node) con Docker Compose, connessione dall’app tramite il client ufficiale elasticsearch-py, definizione di mapping e indicizzazione, ricerche full‑text e filtri, gestione della disponibilità del servizio e buone pratiche per sviluppo e produzione.

Prerequisiti e obiettivo

  • Docker e Docker Compose installati.
  • Conoscenze base di Python e di una web framework (qui useremo FastAPI perché è semplice e molto comune, ma lo stesso approccio vale per Flask/Django).
  • Obiettivo: una API REST che indicizza e cerca documenti (es. “prodotti”) su Elasticsearch.

Struttura del progetto

Una struttura minimale ma realistica:

.
├── docker-compose.yml
└── app
    ├── Dockerfile
    ├── requirements.txt
    └── main.py

1) Avvio di Elasticsearch con Docker Compose

Per lo sviluppo è tipico usare un nodo singolo con security disabilitata e discovery in modalità single-node. Definiamo un servizio elasticsearch e, opzionalmente, kibana per ispezionare dati e query.

# docker-compose.yml
services:
  elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch:8.13.4
    environment:
      - discovery.type=single-node
      - xpack.security.enabled=false
      - ES_JAVA_OPTS=-Xms512m -Xmx512m
    ports:
      - "9200:9200"
    healthcheck:
      test: ["CMD-SHELL", "curl -fsS http://localhost:9200/_cluster/health?wait_for_status=yellow&timeout=1s > /dev/null || exit 1"]
      interval: 5s
      timeout: 3s
      retries: 30
    volumes:
      - esdata:/usr/share/elasticsearch/data

  web:
    build: ./app
    environment:
      - ELASTICSEARCH_URL=http://elasticsearch:9200
      - ES_INDEX=products
    ports:
      - "8000:8000"
    depends_on:
      elasticsearch:
        condition: service_healthy

  kibana:
    image: docker.elastic.co/kibana/kibana:8.13.4
    environment:
      - ELASTICSEARCH_HOSTS=http://elasticsearch:9200
    ports:
      - "5601:5601"
    depends_on:
      elasticsearch:
        condition: service_healthy

volumes:
  esdata:

Note importanti

  • xpack.security.enabled=false è comodo in locale. In produzione è consigliato abilitare sicurezza (TLS, utenti, ruoli) e non esporre Elasticsearch direttamente su Internet.
  • Il healthcheck + depends_on: condition: service_healthy evita che la web app parta prima che Elasticsearch sia pronto.
  • Il volume esdata preserva i dati anche se ricrei i container.

2) Containerizzare l’app Python

Creiamo un’immagine che installa dipendenze e avvia FastAPI con Uvicorn.

# app/Dockerfile
FROM python:3.12-slim

ENV PYTHONDONTWRITEBYTECODE=1
ENV PYTHONUNBUFFERED=1

WORKDIR /app

# (Opzionale) utilità per healthcheck/debug
RUN apt-get update && apt-get install -y --no-install-recommends curl && rm -rf /var/lib/apt/lists/*

COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

COPY main.py .

EXPOSE 8000
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]
# app/requirements.txt
fastapi==0.111.0
uvicorn[standard]==0.30.1
elasticsearch==8.13.2
pydantic==2.7.4

3) Connessione a Elasticsearch da Python

Useremo il client ufficiale. Il punto chiave in Docker Compose è che l’host è il nome del servizio (elasticsearch), non localhost. Quindi l’URL diventa http://elasticsearch:9200, passato via variabili d’ambiente.

Implementiamo:

  • inizializzazione del client;
  • creazione dell’indice con mapping e analyzer;
  • endpoint per indicizzare e cercare;
  • un “health” che verifica connettività e stato indice.
# app/main.py
from __future__ import annotations

import os
from typing import Any, Optional

from elasticsearch import Elasticsearch
from fastapi import FastAPI, HTTPException, Query
from pydantic import BaseModel, Field

ES_URL = os.getenv("ELASTICSEARCH_URL", "http://localhost:9200")
INDEX = os.getenv("ES_INDEX", "products")

es = Elasticsearch(ES_URL)

app = FastAPI(title="Demo Elasticsearch + FastAPI", version="1.0.0")


class ProductIn(BaseModel):
    id: str = Field(..., description="ID univoco del prodotto")
    name: str
    description: str
    price: float
    tags: list[str] = []


def index_exists(index: str) -> bool:
    return bool(es.indices.exists(index=index))


def create_index_if_missing(index: str) -> None:
    if index_exists(index):
        return

    # Mapping di esempio con campi text+keyword e un analyzer standard.
    body: dict[str, Any] = {
        "settings": {
            "number_of_shards": 1,
            "number_of_replicas": 0,
        },
        "mappings": {
            "properties": {
                "id": {"type": "keyword"},
                "name": {
                    "type": "text",
                    "fields": {"raw": {"type": "keyword"}}
                },
                "description": {"type": "text"},
                "price": {"type": "double"},
                "tags": {"type": "keyword"},
            }
        },
    }

    es.indices.create(index=index, body=body)


@app.on_event("startup")
def on_startup() -> None:
    # Crea l'indice all'avvio (se non esiste).
    create_index_if_missing(INDEX)


@app.get("/health")
def health() -> dict[str, Any]:
    try:
        info = es.info()
        ok = index_exists(INDEX)
        return {
            "elasticsearch": {"cluster_name": info.get("cluster_name"), "version": info.get("version", {}).get("number")},
            "index": {"name": INDEX, "exists": ok},
        }
    except Exception as exc:  # evitare di esporre dettagli sensibili in prod
        raise HTTPException(status_code=503, detail=f"Elasticsearch non raggiungibile: {exc}") from exc


@app.post("/productsproducts", status_code=201)
def upsert_product(p: ProductIn) -> dict[str, Any]:
    # Indicizziamo usando l'id come document_id per garantire idempotenza.
    try:
        res = es.index(index=INDEX, id=p.id, document=p.model_dump())
        return {"result": res.get("result"), "_id": res.get("_id")}
    except Exception as exc:
        raise HTTPException(status_code=500, detail=str(exc)) from exc


@app.get("/products/search")
def search_products(
    q: str = Query(..., description="Query full-text su name e description"),
    min_price: Optional[float] = Query(None),
    max_price: Optional[float] = Query(None),
    tag: Optional[str] = Query(None, description="Filtro su tag (keyword)"),
    size: int = Query(10, ge=1, le=100),
) -> dict[str, Any]:
    must: list[dict[str, Any]] = [
        {
            "multi_match": {
                "query": q,
                "fields": ["name^2", "description"],
                "type": "best_fields",
                "operator": "and",
            }
        }
    ]

    filters: list[dict[str, Any]] = []

    if min_price is not None or max_price is not None:
        range_body: dict[str, Any] = {}
        if min_price is not None:
            range_body["gte"] = min_price
        if max_price is not None:
            range_body["lte"] = max_price
        filters.append({"range": {"price": range_body}})

    if tag:
        filters.append({"term": {"tags": tag}})

    query_body: dict[str, Any] = {
        "query": {
            "bool": {
                "must": must,
                "filter": filters,
            }
        },
        "size": size,
        "sort": [{"_score": "desc"}, {"price": "asc"}],
    }

    try:
        res = es.search(index=INDEX, body=query_body)
        hits = res.get("hits", {}).get("hits", [])
        return {
            "total": res.get("hits", {}).get("total", {}),
            "items": [
                {"_id": h.get("_id"), "score": h.get("_score"), **(h.get("_source") or {})}
                for h in hits
            ],
        }
    except Exception as exc:
        raise HTTPException(status_code=500, detail=str(exc)) from exc

Correzione importante: nell’esempio sopra l’endpoint POST ha volutamente mostrato la struttura tipica, ma il path contiene un refuso (/Ř̦) se copiato male. Usalo così:

# Sostituisci questa riga:
# @app.post("/Ř̦products", status_code=201)
# con:
@app.post("/products", status_code=201)

4) Avvio e test end-to-end

  1. Avvia tutto dalla root del progetto:
docker compose up --build

Test rapido:

  • Health dell’API: GET http://localhost:8000/health
  • Elasticsearch: GET http://localhost:9200
  • Kibana (opzionale): http://localhost:5601

Indicizza alcuni documenti:

curl -X POST "http://localhost:8000/products"   -H "Content-Type: application/json"   -d '{
    "id": "p1",
    "name": "Caffe in grani",
    "description": "Miscela arabica 100%, tostatura media",
    "price": 12.5,
    "tags": ["caffe", "bevande"]
  }'

curl -X POST "http://localhost:8000/products"   -H "Content-Type: application/json"   -d '{
    "id": "p2",
    "name": "Macchina espresso",
    "description": "Macchina compatta con lancia vapore",
    "price": 149.0,
    "tags": ["caffe", "elettrodomestici"]
  }'

Cerca:

curl "http://localhost:8000/products/search?q=macchina%20espresso&min_price=50&tag=caffe"

5) Mapping, keyword vs text e strategie di ricerca

Elasticsearch distingue tra:

  • text: analizzato (tokenizzazione, lowercasing, ecc.). Ottimo per full‑text.
  • keyword: non analizzato. Ottimo per filtri, aggregazioni, ordinamenti esatti.

Nell’esempio, name è text ma ha anche un sotto-campo name.raw di tipo keyword per ordinamenti o confronti esatti. Le tags sono keyword perché tipicamente filtrate.

Una query più “ricca” può includere:

  • boost su alcuni campi (name^2);
  • fuzzy matching per tollerare refusi;
  • highlight per evidenziare termini trovati;
  • aggregazioni per faceting (es. conteggio per tag).
{
  "query": {
    "bool": {
      "must": [
        {
          "multi_match": {
            "query": "macchna espresso",
            "fields": ["name^2", "description"],
            "fuzziness": "AUTO"
          }
        }
      ],
      "filter": [
        { "range": { "price": { "gte": 50, "lte": 200 } } }
      ]
    }
  },
  "highlight": {
    "fields": {
      "name": {},
      "description": {}
    }
  },
  "aggs": {
    "tags": { "terms": { "field": "tags" } }
  }
}

6) Attendere la disponibilità del cluster: healthcheck e retry

In ambienti containerizzati, il problema più comune è che l’app prova a connettersi a Elasticsearch prima che sia pronto. Hai tre livelli di difesa:

  • Docker Compose healthcheck (già usato) per orchestrare l’ordine di startup.
  • Retry lato applicazione (utile se l’app viene eseguita anche fuori Compose o su orchestratori diversi).
  • Timeout ragionevoli sul client.

Esempio di retry semplice in startup (se vuoi irrobustire ulteriormente):

import time
from elasticsearch import Elasticsearch

def wait_for_es(es: Elasticsearch, timeout_s: int = 30) -> None:
    deadline = time.time() + timeout_s
    last_exc: Exception | None = None

    while time.time() < deadline:
        try:
            if es.ping():
                return
        except Exception as exc:
            last_exc = exc
        time.sleep(1)

    raise RuntimeError(f"Elasticsearch non disponibile entro {timeout_s}s: {last_exc}")

7) Buone pratiche per produzione

  • Sicurezza: abilita X-Pack security, usa TLS e credenziali; evita di esporre direttamente la porta 9200 su Internet.
  • Persistenza e backup: i volumi Docker sono ok per sviluppo; in produzione valuta storage gestito e snapshot repository.
  • Risorse: configura heap e limiti di memoria; monitora GC e utilizzo disco.
  • Index lifecycle: per log/eventi, considera ILM (rollover, retention) e template.
  • Osservabilità: metriche, slow logs, e monitor su query lente e saturazione.
  • Schema evolutivo: i mapping non sono facilmente modificabili su campi esistenti; progetta versioning di indici (es. products_v1, products_v2) e reindex.

8) Varianti comuni (Flask/Django, async, bulk indexing)

Il concetto non cambia se usi altre framework:

  • Flask: inizializza il client in un modulo condiviso e usa factory/app context.
  • Django: configura il client in settings e incapsula accesso in servizi/repository; valuta Celery per reindex.
  • Bulk: per grandi volumi usa l’API bulk (helper helpers.bulk) invece di chiamare index in loop.
from elasticsearch import Elasticsearch, helpers

def bulk_index(es: Elasticsearch, index: str, docs: list[dict]) -> None:
    actions = [
        {"_index": index, "_id": d["id"], "_source": d}
        for d in docs
    ]
    helpers.bulk(es, actions)

Conclusione

Con Docker Compose puoi ottenere un ambiente riproducibile e vicino alla produzione: Elasticsearch avviato come servizio, web app Python che si connette tramite hostname interno, indice creato automaticamente e API di indicizzazione/ricerca. Da qui puoi estendere con mapping più avanzati, aggregazioni per faceting, analizzatori linguistici, pipeline di ingest e strategie di versioning degli indici.

Torna su