SQLAlchemy con PostgreSQL in Python

SQLAlchemy è il toolkit SQL e ORM (Object Relational Mapper) più utilizzato nell'ecosistema Python. Il suo obiettivo principale è quello di fornire un'interfaccia completa e flessibile per interagire con i database relazionali, astraendo le complessità del linguaggio SQL grezzo e offrendo al contempo la possibilità di utilizzarlo quando necessario. PostgreSQL, d'altra parte, è uno dei sistemi di gestione di database relazionali open source più potenti e affidabili disponibili, noto per la sua conformità agli standard SQL, l'estensibilità e le funzionalità avanzate come i tipi di dati JSON, le ricerche full-text e il supporto per le transazioni ACID.

L'accoppiata SQLAlchemy e PostgreSQL rappresenta una scelta estremamente popolare nello sviluppo di applicazioni web e servizi backend in Python. SQLAlchemy supporta due modalità principali di interazione con il database: il Core, che offre un'astrazione a livello SQL mediante un sistema di espressioni, e l'ORM, che consente di mappare le tabelle del database a classi Python, lavorando con oggetti anziché con query SQL dirette.

In questo articolo esploreremo nel dettaglio come configurare, utilizzare e sfruttare al meglio SQLAlchemy con PostgreSQL, partendo dall'installazione e dalla connessione fino alle operazioni CRUD, alle relazioni tra modelli, alle query avanzate e alla gestione delle migrazioni con Alembic.

Installazione e configurazione

Per iniziare a lavorare con SQLAlchemy e PostgreSQL, è necessario installare alcuni pacchetti. Il driver più comune per collegare Python a PostgreSQL è psycopg2, disponibile anche nella variante binaria precompilata psycopg2-binary, più semplice da installare in ambienti di sviluppo.

pip install sqlalchemy psycopg2-binary

In alternativa, è possibile utilizzare il più recente psycopg (versione 3), che offre supporto nativo per le operazioni asincrone e una migliore gestione dei tipi di dati. Per installarlo:

pip install sqlalchemy "psycopg[binary]"

Una volta installati i pacchetti, il primo passo è creare un motore di connessione (engine) che rappresenta il punto di ingresso verso il database. L'engine gestisce il pool di connessioni e funge da factory per le sessioni di lavoro.

from sqlalchemy import create_engine

# Creazione dell'engine con psycopg2
engine = create_engine(
    "postgresql+psycopg2://username:password@localhost:5432/my_database",
    echo=True,  # Abilita il logging delle query SQL
    pool_size=10,  # Dimensione del pool di connessioni
    max_overflow=20,  # Connessioni extra oltre il pool
    pool_pre_ping=True  # Verifica la connessione prima dell'uso
)

La stringa di connessione segue il formato dialect+driver://user:password@host:port/database. Il parametro echo=True è utile durante lo sviluppo perché stampa tutte le query SQL generate da SQLAlchemy nella console. Il parametro pool_pre_ping è particolarmente importante in produzione perché verifica che ogni connessione presa dal pool sia ancora attiva prima di utilizzarla, evitando errori dovuti a connessioni interrotte.

Per utilizzare il driver psycopg (versione 3), è sufficiente modificare il dialetto nella stringa di connessione:

from sqlalchemy import create_engine

# Creazione dell'engine con psycopg3
engine = create_engine(
    "postgresql+psycopg://username:password@localhost:5432/my_database",
    echo=True
)

Una pratica comune è quella di utilizzare variabili d'ambiente per gestire le credenziali del database, evitando di inserirle direttamente nel codice sorgente:

import os
from sqlalchemy import create_engine
from sqlalchemy.engine import URL

# Costruzione dell'URL di connessione da variabili d'ambiente
connection_url = URL.create(
    drivername="postgresql+psycopg2",
    username=os.environ.get("DB_USER", "postgres"),
    password=os.environ.get("DB_PASSWORD", "secret"),
    host=os.environ.get("DB_HOST", "localhost"),
    port=int(os.environ.get("DB_PORT", "5432")),
    database=os.environ.get("DB_NAME", "my_database")
)

engine = create_engine(connection_url, echo=True)

Definizione dei modelli con l'ORM

SQLAlchemy 2.0 ha introdotto un nuovo stile dichiarativo per la definizione dei modelli ORM basato su DeclarativeBase e sulle annotazioni di tipo di Python. Questo approccio modernizzato sfrutta appieno il sistema di type hints, rendendo il codice più leggibile e compatibile con gli strumenti di analisi statica come mypy.

from datetime import datetime
from typing import Optional, List
from sqlalchemy import String, Text, ForeignKey, UniqueConstraint, Index
from sqlalchemy.orm import (
    DeclarativeBase,
    Mapped,
    mapped_column,
    relationship
)


class Base(DeclarativeBase):
    """Classe base per tutti i modelli ORM."""
    pass


class Author(Base):
    """Modello che rappresenta un autore."""
    __tablename__ = "authors"

    # Chiave primaria con autoincremento
    id: Mapped[int] = mapped_column(primary_key=True)
    # Nome dell'autore, obbligatorio e unico
    first_name: Mapped[str] = mapped_column(String(100), nullable=False)
    last_name: Mapped[str] = mapped_column(String(100), nullable=False)
    # Email opzionale con vincolo di unicità
    email: Mapped[Optional[str]] = mapped_column(
        String(255), unique=True, nullable=True
    )
    # Data di creazione con valore predefinito
    created_at: Mapped[datetime] = mapped_column(
        default=datetime.utcnow
    )

    # Relazione uno-a-molti con gli articoli
    articles: Mapped[List["Article"]] = relationship(
        back_populates="author",
        cascade="all, delete-orphan",
        lazy="selectin"
    )

    # Vincolo di unicità composto
    __table_args__ = (
        UniqueConstraint("first_name", "last_name", name="uq_author_full_name"),
        Index("ix_author_last_name", "last_name"),
    )

    def __repr__(self) -> str:
        return f"Author(id={self.id}, name='{self.first_name} {self.last_name}')"


class Article(Base):
    """Modello che rappresenta un articolo."""
    __tablename__ = "articles"

    id: Mapped[int] = mapped_column(primary_key=True)
    title: Mapped[str] = mapped_column(String(300), nullable=False)
    slug: Mapped[str] = mapped_column(
        String(350), unique=True, nullable=False
    )
    content: Mapped[str] = mapped_column(Text, nullable=False)
    is_published: Mapped[bool] = mapped_column(default=False)
    # Chiave esterna verso la tabella authors
    author_id: Mapped[int] = mapped_column(
        ForeignKey("authors.id", ondelete="CASCADE"),
        nullable=False
    )
    created_at: Mapped[datetime] = mapped_column(default=datetime.utcnow)
    updated_at: Mapped[Optional[datetime]] = mapped_column(
        onupdate=datetime.utcnow, nullable=True
    )

    # Relazione inversa verso l'autore
    author: Mapped["Author"] = relationship(back_populates="articles")

    # Indice sulla chiave esterna per ottimizzare i join
    __table_args__ = (
        Index("ix_article_author_id", "author_id"),
        Index("ix_article_slug", "slug"),
    )

    def __repr__(self) -> str:
        return f"Article(id={self.id}, title='{self.title}')"

In questo esempio, Mapped[int] e Mapped[str] definiscono il tipo Python dell'attributo, mentre mapped_column() consente di specificare opzioni aggiuntive come la lunghezza massima della stringa, i valori predefiniti, i vincoli di unicità e le chiavi esterne. L'uso di Optional indica che un campo può contenere None, corrispondente a un valore NULL nel database.

La funzione relationship() definisce le relazioni tra i modelli a livello ORM. Il parametro back_populates collega bidirezionalmente i due lati della relazione. Il parametro cascade="all, delete-orphan" specifica che quando un autore viene eliminato, anche tutti i suoi articoli vengono rimossi dal database. Il parametro lazy="selectin" indica la strategia di caricamento: in questo caso gli articoli collegati vengono caricati con una query SELECT ... IN separata ma eseguita contestualmente, evitando il problema delle N+1 query.

Creazione delle tabelle

Una volta definiti i modelli, le tabelle corrispondenti possono essere create nel database con una singola chiamata al metodo create_all() della classe Base:

from sqlalchemy import create_engine

engine = create_engine(
    "postgresql+psycopg2://username:password@localhost:5432/my_database"
)

# Creazione di tutte le tabelle definite nei modelli
Base.metadata.create_all(engine)

Questo metodo genera ed esegue le istruzioni CREATE TABLE solo per le tabelle che non esistono ancora nel database. Non modifica le tabelle esistenti: per gestire le modifiche strutturali nel tempo è necessario utilizzare un sistema di migrazioni come Alembic, che verrà trattato più avanti.

Gestione delle sessioni

Le sessioni in SQLAlchemy rappresentano una "zona di lavoro" che tiene traccia di tutti gli oggetti caricati dal database e delle modifiche pendenti. Ogni operazione CRUD passa attraverso una sessione. La classe Session gestisce internamente la mappa di identità (identity map), garantendo che lo stesso record del database sia rappresentato da un unico oggetto Python all'interno della stessa sessione.

from sqlalchemy.orm import Session, sessionmaker

# Creazione di una factory per le sessioni
SessionFactory = sessionmaker(bind=engine)

# Utilizzo con il context manager (consigliato)
with Session(engine) as session:
    # Le operazioni vengono eseguite qui
    session.commit()

L'utilizzo del context manager with è il metodo raccomandato perché garantisce che la sessione venga chiusa automaticamente anche in caso di eccezioni. In contesti più strutturati, come le applicazioni web, è comune utilizzare un pattern di dependency injection per fornire la sessione ai vari componenti:

from contextlib import contextmanager
from sqlalchemy.orm import Session


@contextmanager
def get_session():
    """Generatore di sessioni con gestione automatica del commit e del rollback."""
    session = Session(engine)
    try:
        yield session
        session.commit()
    except Exception:
        session.rollback()
        raise
    finally:
        session.close()


# Utilizzo del generatore di sessioni
with get_session() as session:
    # Le operazioni sulla sessione vengono gestite automaticamente
    pass

Operazioni CRUD

Le operazioni CRUD (Create, Read, Update, Delete) rappresentano le quattro operazioni fondamentali per la manipolazione dei dati in un database. Vediamo come eseguirle con SQLAlchemy.

Creazione di record

Per inserire nuovi record nel database, si creano istanze dei modelli e le si aggiungono alla sessione:

from sqlalchemy.orm import Session

with Session(engine) as session:
    # Creazione di un singolo autore
    author = Author(
        first_name="Italo",
        last_name="Calvino",
        email="italo.calvino@example.com"
    )
    session.add(author)
    session.flush()  # Sincronizza con il database per ottenere l'ID

    # Creazione di più articoli in una volta
    articles = [
        Article(
            title="Il sentiero dei nidi di ragno",
            slug="il-sentiero-dei-nidi-di-ragno",
            content="Contenuto del primo articolo...",
            author_id=author.id,
            is_published=True
        ),
        Article(
            title="Le città invisibili",
            slug="le-citta-invisibili",
            content="Contenuto del secondo articolo...",
            author_id=author.id,
            is_published=True
        ),
        Article(
            title="Se una notte d'inverno un viaggiatore",
            slug="se-una-notte-dinverno-un-viaggiatore",
            content="Contenuto del terzo articolo...",
            author_id=author.id,
            is_published=False
        ),
    ]
    session.add_all(articles)
    session.commit()

    print(f"Autore creato con ID: {author.id}")
    print(f"Articoli creati: {len(articles)}")

Il metodo flush() sincronizza le modifiche pendenti con il database senza confermarle definitivamente, consentendo di ottenere l'ID generato automaticamente dal database. Il metodo commit() conferma tutte le modifiche in modo permanente all'interno di una transazione.

Lettura di record

SQLAlchemy 2.0 utilizza il metodo select() come punto di partenza per costruire query di lettura. Questo approccio unificato sostituisce i vari metodi query() delle versioni precedenti:

from sqlalchemy import select
from sqlalchemy.orm import Session

with Session(engine) as session:
    # Lettura di un singolo record per chiave primaria
    author = session.get(Author, 1)
    print(author)

    # Query con filtro
    statement = select(Author).where(Author.last_name == "Calvino")
    result = session.execute(statement).scalars().first()
    print(result)

    # Lettura di tutti gli autori ordinati per cognome
    statement = select(Author).order_by(Author.last_name)
    all_authors = session.execute(statement).scalars().all()
    for a in all_authors:
        print(a)

    # Query con paginazione
    page = 1
    per_page = 10
    statement = (
        select(Article)
        .where(Article.is_published == True)
        .order_by(Article.created_at.desc())
        .offset((page - 1) * per_page)
        .limit(per_page)
    )
    paginated_articles = session.execute(statement).scalars().all()
    for art in paginated_articles:
        print(f"{art.title} - Pubblicato: {art.is_published}")

Il metodo session.get() è ottimizzato per la ricerca per chiave primaria: consulta prima la mappa di identità della sessione e accede al database solo se l'oggetto non è già stato caricato. Il metodo scalars() estrae direttamente gli oggetti del modello dal risultato, evitando di lavorare con tuple di riga.

Aggiornamento di record

Per aggiornare i record, è possibile modificare gli attributi degli oggetti caricati nella sessione oppure utilizzare un'istruzione UPDATE in blocco:

from sqlalchemy import update
from sqlalchemy.orm import Session

with Session(engine) as session:
    # Aggiornamento tramite modifica dell'oggetto
    author = session.get(Author, 1)
    if author:
        author.email = "nuovo.indirizzo@example.com"
        session.commit()

    # Aggiornamento in blocco con istruzione UPDATE
    statement = (
        update(Article)
        .where(Article.is_published == False)
        .values(is_published=True)
    )
    result = session.execute(statement)
    session.commit()
    print(f"Articoli aggiornati: {result.rowcount}")

Quando si modificano gli attributi di un oggetto già presente nella sessione, SQLAlchemy tiene traccia automaticamente delle modifiche grazie al sistema di dirty tracking. Al momento del commit(), solo le colonne effettivamente modificate vengono incluse nell'istruzione UPDATE generata.

Eliminazione di record

L'eliminazione segue un pattern simile, con la possibilità di rimuovere singoli oggetti o di eseguire eliminazioni in blocco:

from sqlalchemy import delete
from sqlalchemy.orm import Session

with Session(engine) as session:
    # Eliminazione di un singolo oggetto
    author = session.get(Author, 1)
    if author:
        session.delete(author)
        session.commit()

    # Eliminazione in blocco
    statement = (
        delete(Article)
        .where(Article.is_published == False)
    )
    result = session.execute(statement)
    session.commit()
    print(f"Articoli eliminati: {result.rowcount}")

Quando si elimina un autore con session.delete(), la cascade "all, delete-orphan" configurata nella relazione assicura che anche tutti gli articoli associati vengano eliminati. Nelle eliminazioni in blocco con delete(), tuttavia, le cascade ORM non vengono applicate: è la clausola ON DELETE CASCADE definita sulla chiave esterna a livello di database a gestire l'eliminazione degli articoli collegati.

Query avanzate

SQLAlchemy offre un sistema di espressioni molto ricco che consente di costruire query complesse in modo programmatico. Questa sezione esplora alcune delle funzionalità più utili per le query avanzate.

Filtri composti e operatori

from sqlalchemy import select, and_, or_, not_, func
from sqlalchemy.orm import Session

with Session(engine) as session:
    # Condizioni multiple con AND e OR
    statement = (
        select(Article)
        .where(
            and_(
                Article.is_published == True,
                or_(
                    Article.title.ilike("%python%"),
                    Article.title.ilike("%postgresql%")
                )
            )
        )
    )
    results = session.execute(statement).scalars().all()

    # Utilizzo di BETWEEN e IN
    from datetime import datetime, timedelta
    one_month_ago = datetime.utcnow() - timedelta(days=30)

    statement = (
        select(Article)
        .where(
            Article.created_at >= one_month_ago,
            Article.author_id.in_([1, 2, 3])
        )
    )
    recent_articles = session.execute(statement).scalars().all()

    # Negazione di una condizione
    statement = (
        select(Author)
        .where(not_(Author.email.is_(None)))
    )
    authors_with_email = session.execute(statement).scalars().all()

Join e caricamento delle relazioni

from sqlalchemy import select, func
from sqlalchemy.orm import Session, joinedload, selectinload

with Session(engine) as session:
    # Join esplicito tra autori e articoli
    statement = (
        select(Author, func.count(Article.id).label("article_count"))
        .join(Article, Author.id == Article.author_id, isouter=True)
        .group_by(Author.id)
        .having(func.count(Article.id) > 0)
        .order_by(func.count(Article.id).desc())
    )
    results = session.execute(statement).all()
    for author, count in results:
        print(f"{author.first_name} {author.last_name}: {count} articoli")

    # Eager loading con joinedload (una sola query con JOIN)
    statement = (
        select(Author)
        .options(joinedload(Author.articles))
        .where(Author.id == 1)
    )
    author = session.execute(statement).scalars().unique().first()
    if author:
        # Gli articoli sono già caricati, nessuna query aggiuntiva
        for art in author.articles:
            print(art.title)

    # Eager loading con selectinload (query separata con IN)
    statement = (
        select(Author)
        .options(selectinload(Author.articles))
        .order_by(Author.last_name)
    )
    authors = session.execute(statement).scalars().all()
    for a in authors:
        print(f"{a.first_name}: {[art.title for art in a.articles]}")

Le strategie di caricamento eager sono fondamentali per evitare il problema delle N+1 query. Con joinedload, le relazioni vengono caricate con un'unica query SQL contenente un JOIN. Con selectinload, viene eseguita una query aggiuntiva con una clausola IN per caricare gli oggetti correlati. Quest'ultima strategia è generalmente preferibile quando si caricano collezioni di grandi dimensioni, perché evita la duplicazione delle righe della tabella principale che si verifica con i JOIN.

Subquery e CTE

from sqlalchemy import select, func
from sqlalchemy.orm import Session

with Session(engine) as session:
    # Subquery: autori con più di 5 articoli pubblicati
    subquery = (
        select(Article.author_id)
        .where(Article.is_published == True)
        .group_by(Article.author_id)
        .having(func.count(Article.id) > 5)
    ).subquery()

    statement = (
        select(Author)
        .where(Author.id.in_(select(subquery.c.author_id)))
    )
    prolific_authors = session.execute(statement).scalars().all()

    # Common Table Expression (CTE)
    article_stats_cte = (
        select(
            Article.author_id,
            func.count(Article.id).label("total_articles"),
            func.sum(
                func.cast(Article.is_published, Integer)
            ).label("published_count")
        )
        .group_by(Article.author_id)
    ).cte("article_stats")

    statement = (
        select(
            Author.first_name,
            Author.last_name,
            article_stats_cte.c.total_articles,
            article_stats_cte.c.published_count
        )
        .join(
            article_stats_cte,
            Author.id == article_stats_cte.c.author_id
        )
        .order_by(article_stats_cte.c.total_articles.desc())
    )
    stats = session.execute(statement).all()
    for row in stats:
        print(
            f"{row.first_name} {row.last_name}: "
            f"{row.total_articles} totali, "
            f"{row.published_count} pubblicati"
        )

Funzioni aggregate e raggruppamento

from sqlalchemy import select, func, extract
from sqlalchemy.orm import Session

with Session(engine) as session:
    # Conteggio degli articoli per autore
    statement = (
        select(
            Author.last_name,
            func.count(Article.id).label("total"),
            func.min(Article.created_at).label("first_article"),
            func.max(Article.created_at).label("last_article")
        )
        .join(Article)
        .group_by(Author.last_name)
        .order_by(func.count(Article.id).desc())
    )
    results = session.execute(statement).all()
    for row in results:
        print(
            f"{row.last_name}: {row.total} articoli "
            f"(dal {row.first_article} al {row.last_article})"
        )

    # Raggruppamento per mese con extract()
    statement = (
        select(
            extract("year", Article.created_at).label("year"),
            extract("month", Article.created_at).label("month"),
            func.count(Article.id).label("count")
        )
        .group_by("year", "month")
        .order_by("year", "month")
    )
    monthly_stats = session.execute(statement).all()
    for row in monthly_stats:
        print(f"{int(row.year)}-{int(row.month):02d}: {row.count} articoli")

Funzionalità specifiche di PostgreSQL

SQLAlchemy offre un supporto eccellente per le funzionalità specifiche di PostgreSQL attraverso il modulo sqlalchemy.dialects.postgresql. Questo consente di sfruttare tipi di dati e operatori nativi di PostgreSQL direttamente nei modelli e nelle query.

Tipi di dati PostgreSQL

from sqlalchemy import String
from sqlalchemy.dialects.postgresql import (
    ARRAY, JSONB, UUID, INET, DATERANGE, TSVECTOR
)
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column
import uuid


class Base(DeclarativeBase):
    pass


class UserProfile(Base):
    """Modello che utilizza tipi di dati specifici di PostgreSQL."""
    __tablename__ = "user_profiles"

    # UUID come chiave primaria
    id: Mapped[uuid.UUID] = mapped_column(
        UUID(as_uuid=True),
        primary_key=True,
        default=uuid.uuid4
    )
    username: Mapped[str] = mapped_column(String(50), unique=True)
    # Array di stringhe per i tag
    tags: Mapped[list] = mapped_column(
        ARRAY(String(50)),
        default=list
    )
    # JSONB per dati strutturati flessibili
    preferences: Mapped[dict] = mapped_column(
        JSONB,
        default=dict
    )
    # Indirizzo IP
    ip_address: Mapped[str] = mapped_column(
        INET,
        nullable=True
    )
    # Vettore di ricerca full-text
    search_vector: Mapped[str] = mapped_column(
        TSVECTOR,
        nullable=True
    )

Operazioni su JSONB

PostgreSQL offre operatori potenti per interrogare e manipolare i dati JSONB. SQLAlchemy li espone in modo naturale attraverso gli operatori degli attributi:

from sqlalchemy import select, cast, String
from sqlalchemy.dialects.postgresql import JSONB
from sqlalchemy.orm import Session

with Session(engine) as session:
    # Accesso a una chiave del campo JSONB
    statement = (
        select(UserProfile)
        .where(
            UserProfile.preferences["theme"].astext == "dark"
        )
    )
    dark_theme_users = session.execute(statement).scalars().all()

    # Accesso a chiavi nidificate
    statement = (
        select(UserProfile)
        .where(
            UserProfile.preferences["notifications"]["email"].astext == "true"
        )
    )
    email_notify_users = session.execute(statement).scalars().all()

    # Verifica dell'esistenza di una chiave
    statement = (
        select(UserProfile)
        .where(UserProfile.preferences.has_key("theme"))
    )
    users_with_theme = session.execute(statement).scalars().all()

    # Contenimento: il JSONB contiene un sotto-oggetto
    statement = (
        select(UserProfile)
        .where(
            UserProfile.preferences.contains({"theme": "dark"})
        )
    )
    results = session.execute(statement).scalars().all()

Operazioni su ARRAY

from sqlalchemy import select
from sqlalchemy.orm import Session

with Session(engine) as session:
    # Verifica se un array contiene un valore
    statement = (
        select(UserProfile)
        .where(UserProfile.tags.contains(["python"]))
    )
    python_users = session.execute(statement).scalars().all()

    # Sovrapposizione tra array (almeno un elemento in comune)
    statement = (
        select(UserProfile)
        .where(
            UserProfile.tags.overlap(["python", "postgresql"])
        )
    )
    matching_users = session.execute(statement).scalars().all()

    # Accesso a un elemento specifico dell'array (indice 1-based in PostgreSQL)
    statement = (
        select(UserProfile)
        .where(UserProfile.tags[1] == "python")
    )
    results = session.execute(statement).scalars().all()

Upsert con ON CONFLICT

PostgreSQL supporta l'istruzione INSERT ... ON CONFLICT, nota anche come "upsert", che consente di gestire i conflitti durante l'inserimento. SQLAlchemy fornisce un supporto diretto per questa funzionalità:

from sqlalchemy.dialects.postgresql import insert
from sqlalchemy.orm import Session

with Session(engine) as session:
    # Upsert: inserisce o aggiorna in caso di conflitto
    statement = insert(Author).values(
        first_name="Italo",
        last_name="Calvino",
        email="calvino@example.com"
    )

    # In caso di conflitto sulla colonna email, aggiorna i campi specificati
    statement = statement.on_conflict_do_update(
        index_elements=["email"],
        set_={
            "first_name": statement.excluded.first_name,
            "last_name": statement.excluded.last_name,
        }
    )
    session.execute(statement)
    session.commit()

    # Upsert in blocco con più valori
    values_list = [
        {"first_name": "Umberto", "last_name": "Eco", "email": "eco@example.com"},
        {"first_name": "Elena", "last_name": "Ferrante", "email": "ferrante@example.com"},
    ]

    statement = insert(Author).values(values_list)
    statement = statement.on_conflict_do_nothing(
        index_elements=["email"]
    )
    session.execute(statement)
    session.commit()

Relazioni avanzate

Oltre alle relazioni uno-a-molti viste in precedenza, SQLAlchemy supporta relazioni molti-a-molti e relazioni con tabelle di associazione personalizzate.

Relazione molti-a-molti

from sqlalchemy import Table, Column, Integer, ForeignKey, String
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, relationship
from typing import List


class Base(DeclarativeBase):
    pass


# Tabella di associazione per la relazione molti-a-molti
article_tag_table = Table(
    "article_tags",
    Base.metadata,
    Column(
        "article_id",
        Integer,
        ForeignKey("articles.id", ondelete="CASCADE"),
        primary_key=True
    ),
    Column(
        "tag_id",
        Integer,
        ForeignKey("tags.id", ondelete="CASCADE"),
        primary_key=True
    ),
)


class Tag(Base):
    """Modello che rappresenta un tag."""
    __tablename__ = "tags"

    id: Mapped[int] = mapped_column(primary_key=True)
    name: Mapped[str] = mapped_column(String(50), unique=True, nullable=False)

    # Relazione molti-a-molti con gli articoli
    articles: Mapped[List["Article"]] = relationship(
        secondary=article_tag_table,
        back_populates="tags",
        lazy="selectin"
    )

    def __repr__(self) -> str:
        return f"Tag(id={self.id}, name='{self.name}')"


class Article(Base):
    """Modello che rappresenta un articolo con tag."""
    __tablename__ = "articles"

    id: Mapped[int] = mapped_column(primary_key=True)
    title: Mapped[str] = mapped_column(String(300), nullable=False)

    # Relazione molti-a-molti con i tag
    tags: Mapped[List["Tag"]] = relationship(
        secondary=article_tag_table,
        back_populates="articles",
        lazy="selectin"
    )

L'utilizzo delle relazioni molti-a-molti è diretto:

from sqlalchemy.orm import Session

with Session(engine) as session:
    # Creazione di tag
    tag_python = Tag(name="python")
    tag_database = Tag(name="database")
    tag_tutorial = Tag(name="tutorial")
    session.add_all([tag_python, tag_database, tag_tutorial])
    session.flush()

    # Creazione di un articolo con tag associati
    article = Article(title="Guida a SQLAlchemy con PostgreSQL")
    article.tags.extend([tag_python, tag_database, tag_tutorial])
    session.add(article)
    session.commit()

    # Lettura dei tag di un articolo
    loaded_article = session.get(Article, article.id)
    print(f"Tag dell'articolo: {[t.name for t in loaded_article.tags]}")

    # Ricerca di articoli per tag
    from sqlalchemy import select
    statement = (
        select(Article)
        .where(Article.tags.any(Tag.name == "python"))
    )
    python_articles = session.execute(statement).scalars().all()
    print(f"Articoli con tag 'python': {len(python_articles)}")

Relazione con tabella di associazione personalizzata

Quando la tabella di associazione deve contenere campi aggiuntivi (come un timestamp o un ruolo), è necessario definirla come un modello ORM completo:

from datetime import datetime
from sqlalchemy import ForeignKey, String
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, relationship
from typing import List, Optional


class Base(DeclarativeBase):
    pass


class ArticleAuthorAssociation(Base):
    """Tabella di associazione con dati aggiuntivi."""
    __tablename__ = "article_author_associations"

    article_id: Mapped[int] = mapped_column(
        ForeignKey("articles.id", ondelete="CASCADE"),
        primary_key=True
    )
    author_id: Mapped[int] = mapped_column(
        ForeignKey("authors.id", ondelete="CASCADE"),
        primary_key=True
    )
    # Ruolo dell'autore nell'articolo
    role: Mapped[str] = mapped_column(
        String(50), default="contributor"
    )
    assigned_at: Mapped[datetime] = mapped_column(
        default=datetime.utcnow
    )

    # Relazioni verso i modelli principali
    article: Mapped["Article"] = relationship(
        back_populates="author_associations"
    )
    author: Mapped["Author"] = relationship(
        back_populates="article_associations"
    )

Transazioni e isolamento

La gestione corretta delle transazioni è fondamentale per garantire la consistenza dei dati. SQLAlchemy offre un controllo granulare sulle transazioni, inclusa la possibilità di specificare il livello di isolamento e di utilizzare i savepoint.

from sqlalchemy import create_engine, text
from sqlalchemy.orm import Session

# Engine con livello di isolamento predefinito
engine = create_engine(
    "postgresql+psycopg2://username:password@localhost:5432/my_database",
    isolation_level="READ COMMITTED"
)

with Session(engine) as session:
    try:
        # Prima operazione
        author = Author(first_name="Primo", last_name="Levi")
        session.add(author)
        session.flush()

        # Creazione di un savepoint
        savepoint = session.begin_nested()
        try:
            # Operazione rischiosa
            article = Article(
                title="Se questo è un uomo",
                slug="se-questo-e-un-uomo",
                content="Contenuto dell'articolo...",
                author_id=author.id
            )
            session.add(article)
            session.flush()
        except Exception:
            # Rollback solo fino al savepoint
            savepoint.rollback()
            print("Errore nell'inserimento dell'articolo, savepoint ripristinato")

        # Il commit finale include l'autore ma non l'articolo se il savepoint è stato ripristinato
        session.commit()
    except Exception:
        session.rollback()
        raise

I savepoint sono particolarmente utili quando si vogliono annullare solo alcune operazioni all'interno di una transazione più ampia, senza dover ripristinare l'intera transazione. PostgreSQL supporta pienamente i savepoint e SQLAlchemy li mappa al costrutto SAVEPOINT nativo.

Per le operazioni che richiedono un livello di isolamento diverso da quello predefinito, è possibile specificarlo a livello di singola connessione:

from sqlalchemy.orm import Session

with Session(engine) as session:
    # Impostazione del livello di isolamento per questa sessione
    session.connection(
        execution_options={"isolation_level": "SERIALIZABLE"}
    )

    # Le operazioni qui avranno isolamento SERIALIZABLE
    statement = select(Author).where(Author.id == 1)
    author = session.execute(statement).scalars().first()
    if author:
        author.email = "aggiornato@example.com"
    session.commit()

Operazioni asincrone

SQLAlchemy 2.0 supporta nativamente le operazioni asincrone tramite asyncio, utilizzando driver asincroni come asyncpg o la modalità asincrona di psycopg (versione 3). Questa funzionalità è particolarmente utile nelle applicazioni web costruite con framework asincroni come FastAPI o Starlette.

pip install sqlalchemy asyncpg
from sqlalchemy.ext.asyncio import (
    create_async_engine,
    async_sessionmaker,
    AsyncSession
)
from sqlalchemy import select

# Creazione dell'engine asincrono
async_engine = create_async_engine(
    "postgresql+asyncpg://username:password@localhost:5432/my_database",
    echo=True,
    pool_size=10
)

# Factory per le sessioni asincrone
AsyncSessionFactory = async_sessionmaker(
    async_engine,
    class_=AsyncSession,
    expire_on_commit=False
)


async def create_tables():
    """Creazione delle tabelle in modo asincrono."""
    async with async_engine.begin() as conn:
        await conn.run_sync(Base.metadata.create_all)


async def get_all_authors() -> list[Author]:
    """Lettura di tutti gli autori in modo asincrono."""
    async with AsyncSessionFactory() as session:
        statement = select(Author).order_by(Author.last_name)
        result = await session.execute(statement)
        return result.scalars().all()


async def create_author(first_name: str, last_name: str, email: str) -> Author:
    """Creazione di un autore in modo asincrono."""
    async with AsyncSessionFactory() as session:
        author = Author(
            first_name=first_name,
            last_name=last_name,
            email=email
        )
        session.add(author)
        await session.commit()
        await session.refresh(author)
        return author


async def search_articles(keyword: str) -> list[Article]:
    """Ricerca di articoli per parola chiave in modo asincrono."""
    async with AsyncSessionFactory() as session:
        statement = (
            select(Article)
            .where(
                Article.title.ilike(f"%{keyword}%")
            )
            .order_by(Article.created_at.desc())
        )
        result = await session.execute(statement)
        return result.scalars().all()

Il parametro expire_on_commit=False è importante nelle sessioni asincrone perché evita che gli attributi degli oggetti vengano invalidati dopo il commit. Senza questo parametro, accedere a un attributo dopo il commit richiederebbe un'ulteriore operazione asincrona di refresh, che non può avvenire in modo implicito come nelle sessioni sincrone.

Migrazioni con Alembic

Alembic è lo strumento di migrazione ufficiale per SQLAlchemy. Consente di gestire le modifiche strutturali del database in modo versionato e riproducibile, generando script di migrazione che possono essere applicati o ripristinati.

# Installazione di Alembic
pip install alembic

# Inizializzazione del progetto Alembic
alembic init alembic

Dopo l'inizializzazione, è necessario configurare il file alembic/env.py per puntare ai modelli dell'applicazione:

# alembic/env.py
from logging.config import fileConfig
from sqlalchemy import engine_from_config, pool
from alembic import context

# Importazione della classe Base e di tutti i modelli
from app.models import Base

# Configurazione del logging
config = context.config
if config.config_file_name is not None:
    fileConfig(config.config_file_name)

# Impostazione dei metadati per il rilevamento automatico delle modifiche
target_metadata = Base.metadata


def run_migrations_offline() -> None:
    """Esecuzione delle migrazioni in modalità offline."""
    url = config.get_main_option("sqlalchemy.url")
    context.configure(
        url=url,
        target_metadata=target_metadata,
        literal_binds=True,
        dialect_opts={"paramstyle": "named"},
    )
    with context.begin_transaction():
        context.run_migrations()


def run_migrations_online() -> None:
    """Esecuzione delle migrazioni in modalità online."""
    connectable = engine_from_config(
        config.get_section(config.config_ini_section, {}),
        prefix="sqlalchemy.",
        poolclass=pool.NullPool,
    )
    with connectable.connect() as connection:
        context.configure(
            connection=connection,
            target_metadata=target_metadata
        )
        with context.begin_transaction():
            context.run_migrations()


if context.is_offline_mode():
    run_migrations_offline()
else:
    run_migrations_online()

I comandi principali di Alembic per la gestione delle migrazioni sono i seguenti:

# Generazione automatica di una migrazione basata sulle modifiche ai modelli
alembic revision --autogenerate -m "Aggiunta tabella articoli"

# Applicazione di tutte le migrazioni pendenti
alembic upgrade head

# Ripristino dell'ultima migrazione
alembic downgrade -1

# Visualizzazione dello stato corrente
alembic current

# Visualizzazione della cronologia delle migrazioni
alembic history --verbose

Un esempio di file di migrazione generato automaticamente:

"""Aggiunta tabella articoli

Revision ID: a1b2c3d4e5f6
Revises: 9z8y7x6w5v4u
Create Date: 2025-05-15 10:30:00.000000
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa

# Identificatori della revisione
revision: str = "a1b2c3d4e5f6"
down_revision: Union[str, None] = "9z8y7x6w5v4u"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None


def upgrade() -> None:
    """Applicazione della migrazione."""
    op.create_table(
        "articles",
        sa.Column("id", sa.Integer(), nullable=False),
        sa.Column("title", sa.String(length=300), nullable=False),
        sa.Column("slug", sa.String(length=350), nullable=False),
        sa.Column("content", sa.Text(), nullable=False),
        sa.Column("is_published", sa.Boolean(), nullable=False),
        sa.Column("author_id", sa.Integer(), nullable=False),
        sa.Column("created_at", sa.DateTime(), nullable=False),
        sa.Column("updated_at", sa.DateTime(), nullable=True),
        sa.ForeignKeyConstraint(
            ["author_id"], ["authors.id"], ondelete="CASCADE"
        ),
        sa.PrimaryKeyConstraint("id"),
        sa.UniqueConstraint("slug"),
    )
    op.create_index("ix_article_author_id", "articles", ["author_id"])
    op.create_index("ix_article_slug", "articles", ["slug"])


def downgrade() -> None:
    """Ripristino della migrazione."""
    op.drop_index("ix_article_slug", table_name="articles")
    op.drop_index("ix_article_author_id", table_name="articles")
    op.drop_table("articles")

Pattern e best practice

In questa sezione vengono illustrati alcuni pattern comuni e best practice per l'utilizzo di SQLAlchemy con PostgreSQL in applicazioni reali.

Pattern Repository

Il pattern Repository astrae l'accesso ai dati dietro un'interfaccia coerente, separando la logica di business dalla logica di persistenza:

from typing import Generic, TypeVar, Type, Optional, Sequence
from sqlalchemy import select
from sqlalchemy.orm import Session

T = TypeVar("T", bound=Base)


class BaseRepository(Generic[T]):
    """Repository generico per le operazioni CRUD."""

    def __init__(self, session: Session, model_class: Type[T]):
        self._session = session
        self._model_class = model_class

    def get_by_id(self, entity_id: int) -> Optional[T]:
        """Recupera un'entità per ID."""
        return self._session.get(self._model_class, entity_id)

    def get_all(
        self, offset: int = 0, limit: int = 100
    ) -> Sequence[T]:
        """Recupera tutte le entità con paginazione."""
        statement = (
            select(self._model_class)
            .offset(offset)
            .limit(limit)
        )
        return self._session.execute(statement).scalars().all()

    def create(self, entity: T) -> T:
        """Crea una nuova entità."""
        self._session.add(entity)
        self._session.flush()
        return entity

    def update(self, entity: T) -> T:
        """Aggiorna un'entità esistente."""
        self._session.merge(entity)
        self._session.flush()
        return entity

    def delete(self, entity: T) -> None:
        """Elimina un'entità."""
        self._session.delete(entity)
        self._session.flush()


class AuthorRepository(BaseRepository[Author]):
    """Repository specifico per gli autori."""

    def __init__(self, session: Session):
        super().__init__(session, Author)

    def find_by_email(self, email: str) -> Optional[Author]:
        """Cerca un autore per indirizzo email."""
        statement = (
            select(Author).where(Author.email == email)
        )
        return self._session.execute(statement).scalars().first()

    def find_by_full_name(
        self, first_name: str, last_name: str
    ) -> Optional[Author]:
        """Cerca un autore per nome completo."""
        statement = (
            select(Author)
            .where(
                Author.first_name == first_name,
                Author.last_name == last_name
            )
        )
        return self._session.execute(statement).scalars().first()


# Esempio di utilizzo
with Session(engine) as session:
    repo = AuthorRepository(session)

    # Creazione di un autore tramite il repository
    new_author = Author(
        first_name="Elsa", last_name="Morante", email="morante@example.com"
    )
    repo.create(new_author)

    # Ricerca per email
    found = repo.find_by_email("morante@example.com")
    if found:
        print(f"Trovato: {found}")

    session.commit()

Mixin per campi comuni

I mixin consentono di definire campi comuni che vengono riutilizzati in più modelli, evitando la duplicazione del codice:

from datetime import datetime
from typing import Optional
from sqlalchemy import func
from sqlalchemy.orm import Mapped, mapped_column, declared_attr


class TimestampMixin:
    """Mixin per aggiungere i campi di timestamp automatici."""

    created_at: Mapped[datetime] = mapped_column(
        server_default=func.now()
    )
    updated_at: Mapped[Optional[datetime]] = mapped_column(
        onupdate=func.now(),
        nullable=True
    )


class SoftDeleteMixin:
    """Mixin per l'eliminazione logica (soft delete)."""

    is_deleted: Mapped[bool] = mapped_column(default=False)
    deleted_at: Mapped[Optional[datetime]] = mapped_column(nullable=True)

    def soft_delete(self) -> None:
        """Segna l'entità come eliminata senza rimuoverla dal database."""
        self.is_deleted = True
        self.deleted_at = datetime.utcnow()


class SlugMixin:
    """Mixin per i campi con slug."""

    @declared_attr
    def slug(cls) -> Mapped[str]:
        """Campo slug con vincolo di unicità."""
        return mapped_column(
            String(350), unique=True, nullable=False, index=True
        )


# Utilizzo dei mixin nei modelli
class Article(TimestampMixin, SoftDeleteMixin, Base):
    """Modello con timestamp automatici e soft delete."""
    __tablename__ = "articles"

    id: Mapped[int] = mapped_column(primary_key=True)
    title: Mapped[str] = mapped_column(String(300), nullable=False)
    content: Mapped[str] = mapped_column(Text, nullable=False)

Gestione degli eventi

SQLAlchemy fornisce un sistema di eventi che consente di eseguire logica personalizzata in risposta a operazioni specifiche sul database o sui modelli:

from sqlalchemy import event, inspect
from sqlalchemy.orm import Session
from datetime import datetime
import logging

logger = logging.getLogger(__name__)


@event.listens_for(Session, "before_flush")
def before_flush_handler(session, flush_context, instances):
    """Gestore eseguito prima del flush della sessione."""
    # Registrazione delle entità nuove
    for obj in session.new:
        logger.info(f"Nuova entità: {type(obj).__name__}")

    # Registrazione delle entità modificate
    for obj in session.dirty:
        if session.is_modified(obj):
            # Recupero dei campi modificati
            state = inspect(obj)
            changes = {}
            for attr in state.attrs:
                history = attr.history
                if history.has_changes():
                    changes[attr.key] = {
                        "old": history.deleted,
                        "new": history.added
                    }
            if changes:
                logger.info(
                    f"Entità modificata: {type(obj).__name__}, "
                    f"cambiamenti: {changes}"
                )


@event.listens_for(Author, "before_insert")
def author_before_insert(mapper, connection, target):
    """Normalizzazione dei dati prima dell'inserimento di un autore."""
    # Capitalizzazione del nome e del cognome
    target.first_name = target.first_name.strip().title()
    target.last_name = target.last_name.strip().title()
    # Normalizzazione dell'email
    if target.email:
        target.email = target.email.strip().lower()

Ottimizzazione delle prestazioni

L'ottimizzazione delle prestazioni è un aspetto cruciale quando si lavora con database in applicazioni di produzione. SQLAlchemy offre diversi strumenti e tecniche per migliorare le performance.

Inserimenti e aggiornamenti in blocco

from sqlalchemy.orm import Session
from sqlalchemy.dialects.postgresql import insert

with Session(engine) as session:
    # Inserimento in blocco ottimizzato
    authors_data = [
        {"first_name": "Luigi", "last_name": "Pirandello", "email": "pirandello@example.com"},
        {"first_name": "Grazia", "last_name": "Deledda", "email": "deledda@example.com"},
        {"first_name": "Eugenio", "last_name": "Montale", "email": "montale@example.com"},
    ]

    # Metodo 1: bulk insert con il Core (più veloce)
    session.execute(
        insert(Author),
        authors_data
    )

    # Metodo 2: inserimento con returning per ottenere gli ID
    statement = (
        insert(Author)
        .values(authors_data)
        .returning(Author.id, Author.first_name, Author.last_name)
    )
    result = session.execute(statement)
    for row in result:
        print(f"Inserito: ID={row.id}, {row.first_name} {row.last_name}")

    session.commit()

Configurazione del pool di connessioni

from sqlalchemy import create_engine
from sqlalchemy.pool import QueuePool

# Configurazione avanzata del pool di connessioni
engine = create_engine(
    "postgresql+psycopg2://username:password@localhost:5432/my_database",
    poolclass=QueuePool,
    pool_size=20,  # Numero massimo di connessioni nel pool
    max_overflow=10,  # Connessioni extra temporanee
    pool_timeout=30,  # Timeout per ottenere una connessione dal pool
    pool_recycle=1800,  # Ricicla le connessioni dopo 30 minuti
    pool_pre_ping=True,  # Verifica la connessione prima dell'uso
)

Il parametro pool_recycle è particolarmente importante in ambienti di produzione perché previene problemi dovuti a connessioni che il server di database potrebbe chiudere dopo un periodo di inattività. Il valore va impostato a un tempo inferiore rispetto al timeout di connessione configurato in PostgreSQL (parametro idle_in_transaction_session_timeout).

Query con SQL grezzo

In alcuni casi, l'utilizzo di SQL grezzo può essere necessario per query particolarmente complesse o per sfruttare funzionalità specifiche di PostgreSQL non direttamente supportate dall'ORM:

from sqlalchemy import text
from sqlalchemy.orm import Session

with Session(engine) as session:
    # Query con SQL grezzo e parametri
    statement = text("""
        SELECT a.first_name, a.last_name, COUNT(ar.id) AS article_count
        FROM authors a
        LEFT JOIN articles ar ON a.id = ar.author_id
        WHERE ar.is_published = :is_published
        GROUP BY a.id, a.first_name, a.last_name
        HAVING COUNT(ar.id) > :min_articles
        ORDER BY article_count DESC
    """)

    result = session.execute(
        statement,
        {"is_published": True, "min_articles": 2}
    )
    for row in result:
        print(f"{row.first_name} {row.last_name}: {row.article_count}")

È importante utilizzare sempre i parametri nominati (:nome_parametro) anziché l'interpolazione di stringhe per prevenire attacchi di SQL injection. SQLAlchemy si occupa automaticamente dell'escaping e della parametrizzazione della query.

Conclusione

L'integrazione tra SQLAlchemy e PostgreSQL in Python offre una piattaforma robusta e flessibile per lo sviluppo di applicazioni che richiedono un accesso efficiente e sicuro ai dati. SQLAlchemy, con il suo doppio paradigma Core e ORM, si adatta a una vasta gamma di esigenze: dalla prototipazione rapida di piccole applicazioni fino alla gestione di sistemi complessi e ad alte prestazioni.

Le funzionalità specifiche di PostgreSQL, come i tipi JSONB, ARRAY e UUID, l'istruzione UPSERT e il supporto avanzato per le transazioni, sono pienamente accessibili attraverso SQLAlchemy, consentendo di sfruttare al massimo le capacità del database senza rinunciare all'astrazione offerta dall'ORM.

Il supporto nativo per le operazioni asincrone, introdotto con SQLAlchemy 2.0, rende questa combinazione particolarmente adatta allo sviluppo di applicazioni web moderne basate su framework asincroni. Combinato con Alembic per la gestione delle migrazioni, il pattern Repository per l'organizzazione del codice e le varie strategie di ottimizzazione delle prestazioni, SQLAlchemy con PostgreSQL rappresenta una delle scelte più complete e mature per lo sviluppo backend in Python.