SQLAlchemy con PostgreSQL in Python
SQLAlchemy è il toolkit SQL e ORM (Object Relational Mapper) più utilizzato nell'ecosistema Python. Il suo obiettivo principale è quello di fornire un'interfaccia completa e flessibile per interagire con i database relazionali, astraendo le complessità del linguaggio SQL grezzo e offrendo al contempo la possibilità di utilizzarlo quando necessario. PostgreSQL, d'altra parte, è uno dei sistemi di gestione di database relazionali open source più potenti e affidabili disponibili, noto per la sua conformità agli standard SQL, l'estensibilità e le funzionalità avanzate come i tipi di dati JSON, le ricerche full-text e il supporto per le transazioni ACID.
L'accoppiata SQLAlchemy e PostgreSQL rappresenta una scelta estremamente popolare nello sviluppo di applicazioni web e servizi backend in Python. SQLAlchemy supporta due modalità principali di interazione con il database: il Core, che offre un'astrazione a livello SQL mediante un sistema di espressioni, e l'ORM, che consente di mappare le tabelle del database a classi Python, lavorando con oggetti anziché con query SQL dirette.
In questo articolo esploreremo nel dettaglio come configurare, utilizzare e sfruttare al meglio SQLAlchemy con PostgreSQL, partendo dall'installazione e dalla connessione fino alle operazioni CRUD, alle relazioni tra modelli, alle query avanzate e alla gestione delle migrazioni con Alembic.
Installazione e configurazione
Per iniziare a lavorare con SQLAlchemy e PostgreSQL, è necessario installare alcuni pacchetti. Il driver più comune per collegare Python a PostgreSQL è psycopg2, disponibile anche nella variante binaria precompilata psycopg2-binary, più semplice da installare in ambienti di sviluppo.
pip install sqlalchemy psycopg2-binary
In alternativa, è possibile utilizzare il più recente psycopg (versione 3), che offre supporto nativo per le operazioni asincrone e una migliore gestione dei tipi di dati. Per installarlo:
pip install sqlalchemy "psycopg[binary]"
Una volta installati i pacchetti, il primo passo è creare un motore di connessione (engine) che rappresenta il punto di ingresso verso il database. L'engine gestisce il pool di connessioni e funge da factory per le sessioni di lavoro.
from sqlalchemy import create_engine
# Creazione dell'engine con psycopg2
engine = create_engine(
"postgresql+psycopg2://username:password@localhost:5432/my_database",
echo=True, # Abilita il logging delle query SQL
pool_size=10, # Dimensione del pool di connessioni
max_overflow=20, # Connessioni extra oltre il pool
pool_pre_ping=True # Verifica la connessione prima dell'uso
)
La stringa di connessione segue il formato dialect+driver://user:password@host:port/database. Il parametro echo=True è utile durante lo sviluppo perché stampa tutte le query SQL generate da SQLAlchemy nella console. Il parametro pool_pre_ping è particolarmente importante in produzione perché verifica che ogni connessione presa dal pool sia ancora attiva prima di utilizzarla, evitando errori dovuti a connessioni interrotte.
Per utilizzare il driver psycopg (versione 3), è sufficiente modificare il dialetto nella stringa di connessione:
from sqlalchemy import create_engine
# Creazione dell'engine con psycopg3
engine = create_engine(
"postgresql+psycopg://username:password@localhost:5432/my_database",
echo=True
)
Una pratica comune è quella di utilizzare variabili d'ambiente per gestire le credenziali del database, evitando di inserirle direttamente nel codice sorgente:
import os
from sqlalchemy import create_engine
from sqlalchemy.engine import URL
# Costruzione dell'URL di connessione da variabili d'ambiente
connection_url = URL.create(
drivername="postgresql+psycopg2",
username=os.environ.get("DB_USER", "postgres"),
password=os.environ.get("DB_PASSWORD", "secret"),
host=os.environ.get("DB_HOST", "localhost"),
port=int(os.environ.get("DB_PORT", "5432")),
database=os.environ.get("DB_NAME", "my_database")
)
engine = create_engine(connection_url, echo=True)
Definizione dei modelli con l'ORM
SQLAlchemy 2.0 ha introdotto un nuovo stile dichiarativo per la definizione dei modelli ORM basato su DeclarativeBase e sulle annotazioni di tipo di Python. Questo approccio modernizzato sfrutta appieno il sistema di type hints, rendendo il codice più leggibile e compatibile con gli strumenti di analisi statica come mypy.
from datetime import datetime
from typing import Optional, List
from sqlalchemy import String, Text, ForeignKey, UniqueConstraint, Index
from sqlalchemy.orm import (
DeclarativeBase,
Mapped,
mapped_column,
relationship
)
class Base(DeclarativeBase):
"""Classe base per tutti i modelli ORM."""
pass
class Author(Base):
"""Modello che rappresenta un autore."""
__tablename__ = "authors"
# Chiave primaria con autoincremento
id: Mapped[int] = mapped_column(primary_key=True)
# Nome dell'autore, obbligatorio e unico
first_name: Mapped[str] = mapped_column(String(100), nullable=False)
last_name: Mapped[str] = mapped_column(String(100), nullable=False)
# Email opzionale con vincolo di unicità
email: Mapped[Optional[str]] = mapped_column(
String(255), unique=True, nullable=True
)
# Data di creazione con valore predefinito
created_at: Mapped[datetime] = mapped_column(
default=datetime.utcnow
)
# Relazione uno-a-molti con gli articoli
articles: Mapped[List["Article"]] = relationship(
back_populates="author",
cascade="all, delete-orphan",
lazy="selectin"
)
# Vincolo di unicità composto
__table_args__ = (
UniqueConstraint("first_name", "last_name", name="uq_author_full_name"),
Index("ix_author_last_name", "last_name"),
)
def __repr__(self) -> str:
return f"Author(id={self.id}, name='{self.first_name} {self.last_name}')"
class Article(Base):
"""Modello che rappresenta un articolo."""
__tablename__ = "articles"
id: Mapped[int] = mapped_column(primary_key=True)
title: Mapped[str] = mapped_column(String(300), nullable=False)
slug: Mapped[str] = mapped_column(
String(350), unique=True, nullable=False
)
content: Mapped[str] = mapped_column(Text, nullable=False)
is_published: Mapped[bool] = mapped_column(default=False)
# Chiave esterna verso la tabella authors
author_id: Mapped[int] = mapped_column(
ForeignKey("authors.id", ondelete="CASCADE"),
nullable=False
)
created_at: Mapped[datetime] = mapped_column(default=datetime.utcnow)
updated_at: Mapped[Optional[datetime]] = mapped_column(
onupdate=datetime.utcnow, nullable=True
)
# Relazione inversa verso l'autore
author: Mapped["Author"] = relationship(back_populates="articles")
# Indice sulla chiave esterna per ottimizzare i join
__table_args__ = (
Index("ix_article_author_id", "author_id"),
Index("ix_article_slug", "slug"),
)
def __repr__(self) -> str:
return f"Article(id={self.id}, title='{self.title}')"
In questo esempio, Mapped[int] e Mapped[str] definiscono il tipo Python dell'attributo, mentre mapped_column() consente di specificare opzioni aggiuntive come la lunghezza massima della stringa, i valori predefiniti, i vincoli di unicità e le chiavi esterne. L'uso di Optional indica che un campo può contenere None, corrispondente a un valore NULL nel database.
La funzione relationship() definisce le relazioni tra i modelli a livello ORM. Il parametro back_populates collega bidirezionalmente i due lati della relazione. Il parametro cascade="all, delete-orphan" specifica che quando un autore viene eliminato, anche tutti i suoi articoli vengono rimossi dal database. Il parametro lazy="selectin" indica la strategia di caricamento: in questo caso gli articoli collegati vengono caricati con una query SELECT ... IN separata ma eseguita contestualmente, evitando il problema delle N+1 query.
Creazione delle tabelle
Una volta definiti i modelli, le tabelle corrispondenti possono essere create nel database con una singola chiamata al metodo create_all() della classe Base:
from sqlalchemy import create_engine
engine = create_engine(
"postgresql+psycopg2://username:password@localhost:5432/my_database"
)
# Creazione di tutte le tabelle definite nei modelli
Base.metadata.create_all(engine)
Questo metodo genera ed esegue le istruzioni CREATE TABLE solo per le tabelle che non esistono ancora nel database. Non modifica le tabelle esistenti: per gestire le modifiche strutturali nel tempo è necessario utilizzare un sistema di migrazioni come Alembic, che verrà trattato più avanti.
Gestione delle sessioni
Le sessioni in SQLAlchemy rappresentano una "zona di lavoro" che tiene traccia di tutti gli oggetti caricati dal database e delle modifiche pendenti. Ogni operazione CRUD passa attraverso una sessione. La classe Session gestisce internamente la mappa di identità (identity map), garantendo che lo stesso record del database sia rappresentato da un unico oggetto Python all'interno della stessa sessione.
from sqlalchemy.orm import Session, sessionmaker
# Creazione di una factory per le sessioni
SessionFactory = sessionmaker(bind=engine)
# Utilizzo con il context manager (consigliato)
with Session(engine) as session:
# Le operazioni vengono eseguite qui
session.commit()
L'utilizzo del context manager with è il metodo raccomandato perché garantisce che la sessione venga chiusa automaticamente anche in caso di eccezioni. In contesti più strutturati, come le applicazioni web, è comune utilizzare un pattern di dependency injection per fornire la sessione ai vari componenti:
from contextlib import contextmanager
from sqlalchemy.orm import Session
@contextmanager
def get_session():
"""Generatore di sessioni con gestione automatica del commit e del rollback."""
session = Session(engine)
try:
yield session
session.commit()
except Exception:
session.rollback()
raise
finally:
session.close()
# Utilizzo del generatore di sessioni
with get_session() as session:
# Le operazioni sulla sessione vengono gestite automaticamente
pass
Operazioni CRUD
Le operazioni CRUD (Create, Read, Update, Delete) rappresentano le quattro operazioni fondamentali per la manipolazione dei dati in un database. Vediamo come eseguirle con SQLAlchemy.
Creazione di record
Per inserire nuovi record nel database, si creano istanze dei modelli e le si aggiungono alla sessione:
from sqlalchemy.orm import Session
with Session(engine) as session:
# Creazione di un singolo autore
author = Author(
first_name="Italo",
last_name="Calvino",
email="italo.calvino@example.com"
)
session.add(author)
session.flush() # Sincronizza con il database per ottenere l'ID
# Creazione di più articoli in una volta
articles = [
Article(
title="Il sentiero dei nidi di ragno",
slug="il-sentiero-dei-nidi-di-ragno",
content="Contenuto del primo articolo...",
author_id=author.id,
is_published=True
),
Article(
title="Le città invisibili",
slug="le-citta-invisibili",
content="Contenuto del secondo articolo...",
author_id=author.id,
is_published=True
),
Article(
title="Se una notte d'inverno un viaggiatore",
slug="se-una-notte-dinverno-un-viaggiatore",
content="Contenuto del terzo articolo...",
author_id=author.id,
is_published=False
),
]
session.add_all(articles)
session.commit()
print(f"Autore creato con ID: {author.id}")
print(f"Articoli creati: {len(articles)}")
Il metodo flush() sincronizza le modifiche pendenti con il database senza confermarle definitivamente, consentendo di ottenere l'ID generato automaticamente dal database. Il metodo commit() conferma tutte le modifiche in modo permanente all'interno di una transazione.
Lettura di record
SQLAlchemy 2.0 utilizza il metodo select() come punto di partenza per costruire query di lettura. Questo approccio unificato sostituisce i vari metodi query() delle versioni precedenti:
from sqlalchemy import select
from sqlalchemy.orm import Session
with Session(engine) as session:
# Lettura di un singolo record per chiave primaria
author = session.get(Author, 1)
print(author)
# Query con filtro
statement = select(Author).where(Author.last_name == "Calvino")
result = session.execute(statement).scalars().first()
print(result)
# Lettura di tutti gli autori ordinati per cognome
statement = select(Author).order_by(Author.last_name)
all_authors = session.execute(statement).scalars().all()
for a in all_authors:
print(a)
# Query con paginazione
page = 1
per_page = 10
statement = (
select(Article)
.where(Article.is_published == True)
.order_by(Article.created_at.desc())
.offset((page - 1) * per_page)
.limit(per_page)
)
paginated_articles = session.execute(statement).scalars().all()
for art in paginated_articles:
print(f"{art.title} - Pubblicato: {art.is_published}")
Il metodo session.get() è ottimizzato per la ricerca per chiave primaria: consulta prima la mappa di identità della sessione e accede al database solo se l'oggetto non è già stato caricato. Il metodo scalars() estrae direttamente gli oggetti del modello dal risultato, evitando di lavorare con tuple di riga.
Aggiornamento di record
Per aggiornare i record, è possibile modificare gli attributi degli oggetti caricati nella sessione oppure utilizzare un'istruzione UPDATE in blocco:
from sqlalchemy import update
from sqlalchemy.orm import Session
with Session(engine) as session:
# Aggiornamento tramite modifica dell'oggetto
author = session.get(Author, 1)
if author:
author.email = "nuovo.indirizzo@example.com"
session.commit()
# Aggiornamento in blocco con istruzione UPDATE
statement = (
update(Article)
.where(Article.is_published == False)
.values(is_published=True)
)
result = session.execute(statement)
session.commit()
print(f"Articoli aggiornati: {result.rowcount}")
Quando si modificano gli attributi di un oggetto già presente nella sessione, SQLAlchemy tiene traccia automaticamente delle modifiche grazie al sistema di dirty tracking. Al momento del commit(), solo le colonne effettivamente modificate vengono incluse nell'istruzione UPDATE generata.
Eliminazione di record
L'eliminazione segue un pattern simile, con la possibilità di rimuovere singoli oggetti o di eseguire eliminazioni in blocco:
from sqlalchemy import delete
from sqlalchemy.orm import Session
with Session(engine) as session:
# Eliminazione di un singolo oggetto
author = session.get(Author, 1)
if author:
session.delete(author)
session.commit()
# Eliminazione in blocco
statement = (
delete(Article)
.where(Article.is_published == False)
)
result = session.execute(statement)
session.commit()
print(f"Articoli eliminati: {result.rowcount}")
Quando si elimina un autore con session.delete(), la cascade "all, delete-orphan" configurata nella relazione assicura che anche tutti gli articoli associati vengano eliminati. Nelle eliminazioni in blocco con delete(), tuttavia, le cascade ORM non vengono applicate: è la clausola ON DELETE CASCADE definita sulla chiave esterna a livello di database a gestire l'eliminazione degli articoli collegati.
Query avanzate
SQLAlchemy offre un sistema di espressioni molto ricco che consente di costruire query complesse in modo programmatico. Questa sezione esplora alcune delle funzionalità più utili per le query avanzate.
Filtri composti e operatori
from sqlalchemy import select, and_, or_, not_, func
from sqlalchemy.orm import Session
with Session(engine) as session:
# Condizioni multiple con AND e OR
statement = (
select(Article)
.where(
and_(
Article.is_published == True,
or_(
Article.title.ilike("%python%"),
Article.title.ilike("%postgresql%")
)
)
)
)
results = session.execute(statement).scalars().all()
# Utilizzo di BETWEEN e IN
from datetime import datetime, timedelta
one_month_ago = datetime.utcnow() - timedelta(days=30)
statement = (
select(Article)
.where(
Article.created_at >= one_month_ago,
Article.author_id.in_([1, 2, 3])
)
)
recent_articles = session.execute(statement).scalars().all()
# Negazione di una condizione
statement = (
select(Author)
.where(not_(Author.email.is_(None)))
)
authors_with_email = session.execute(statement).scalars().all()
Join e caricamento delle relazioni
from sqlalchemy import select, func
from sqlalchemy.orm import Session, joinedload, selectinload
with Session(engine) as session:
# Join esplicito tra autori e articoli
statement = (
select(Author, func.count(Article.id).label("article_count"))
.join(Article, Author.id == Article.author_id, isouter=True)
.group_by(Author.id)
.having(func.count(Article.id) > 0)
.order_by(func.count(Article.id).desc())
)
results = session.execute(statement).all()
for author, count in results:
print(f"{author.first_name} {author.last_name}: {count} articoli")
# Eager loading con joinedload (una sola query con JOIN)
statement = (
select(Author)
.options(joinedload(Author.articles))
.where(Author.id == 1)
)
author = session.execute(statement).scalars().unique().first()
if author:
# Gli articoli sono già caricati, nessuna query aggiuntiva
for art in author.articles:
print(art.title)
# Eager loading con selectinload (query separata con IN)
statement = (
select(Author)
.options(selectinload(Author.articles))
.order_by(Author.last_name)
)
authors = session.execute(statement).scalars().all()
for a in authors:
print(f"{a.first_name}: {[art.title for art in a.articles]}")
Le strategie di caricamento eager sono fondamentali per evitare il problema delle N+1 query. Con joinedload, le relazioni vengono caricate con un'unica query SQL contenente un JOIN. Con selectinload, viene eseguita una query aggiuntiva con una clausola IN per caricare gli oggetti correlati. Quest'ultima strategia è generalmente preferibile quando si caricano collezioni di grandi dimensioni, perché evita la duplicazione delle righe della tabella principale che si verifica con i JOIN.
Subquery e CTE
from sqlalchemy import select, func
from sqlalchemy.orm import Session
with Session(engine) as session:
# Subquery: autori con più di 5 articoli pubblicati
subquery = (
select(Article.author_id)
.where(Article.is_published == True)
.group_by(Article.author_id)
.having(func.count(Article.id) > 5)
).subquery()
statement = (
select(Author)
.where(Author.id.in_(select(subquery.c.author_id)))
)
prolific_authors = session.execute(statement).scalars().all()
# Common Table Expression (CTE)
article_stats_cte = (
select(
Article.author_id,
func.count(Article.id).label("total_articles"),
func.sum(
func.cast(Article.is_published, Integer)
).label("published_count")
)
.group_by(Article.author_id)
).cte("article_stats")
statement = (
select(
Author.first_name,
Author.last_name,
article_stats_cte.c.total_articles,
article_stats_cte.c.published_count
)
.join(
article_stats_cte,
Author.id == article_stats_cte.c.author_id
)
.order_by(article_stats_cte.c.total_articles.desc())
)
stats = session.execute(statement).all()
for row in stats:
print(
f"{row.first_name} {row.last_name}: "
f"{row.total_articles} totali, "
f"{row.published_count} pubblicati"
)
Funzioni aggregate e raggruppamento
from sqlalchemy import select, func, extract
from sqlalchemy.orm import Session
with Session(engine) as session:
# Conteggio degli articoli per autore
statement = (
select(
Author.last_name,
func.count(Article.id).label("total"),
func.min(Article.created_at).label("first_article"),
func.max(Article.created_at).label("last_article")
)
.join(Article)
.group_by(Author.last_name)
.order_by(func.count(Article.id).desc())
)
results = session.execute(statement).all()
for row in results:
print(
f"{row.last_name}: {row.total} articoli "
f"(dal {row.first_article} al {row.last_article})"
)
# Raggruppamento per mese con extract()
statement = (
select(
extract("year", Article.created_at).label("year"),
extract("month", Article.created_at).label("month"),
func.count(Article.id).label("count")
)
.group_by("year", "month")
.order_by("year", "month")
)
monthly_stats = session.execute(statement).all()
for row in monthly_stats:
print(f"{int(row.year)}-{int(row.month):02d}: {row.count} articoli")
Funzionalità specifiche di PostgreSQL
SQLAlchemy offre un supporto eccellente per le funzionalità specifiche di PostgreSQL attraverso il modulo sqlalchemy.dialects.postgresql. Questo consente di sfruttare tipi di dati e operatori nativi di PostgreSQL direttamente nei modelli e nelle query.
Tipi di dati PostgreSQL
from sqlalchemy import String
from sqlalchemy.dialects.postgresql import (
ARRAY, JSONB, UUID, INET, DATERANGE, TSVECTOR
)
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column
import uuid
class Base(DeclarativeBase):
pass
class UserProfile(Base):
"""Modello che utilizza tipi di dati specifici di PostgreSQL."""
__tablename__ = "user_profiles"
# UUID come chiave primaria
id: Mapped[uuid.UUID] = mapped_column(
UUID(as_uuid=True),
primary_key=True,
default=uuid.uuid4
)
username: Mapped[str] = mapped_column(String(50), unique=True)
# Array di stringhe per i tag
tags: Mapped[list] = mapped_column(
ARRAY(String(50)),
default=list
)
# JSONB per dati strutturati flessibili
preferences: Mapped[dict] = mapped_column(
JSONB,
default=dict
)
# Indirizzo IP
ip_address: Mapped[str] = mapped_column(
INET,
nullable=True
)
# Vettore di ricerca full-text
search_vector: Mapped[str] = mapped_column(
TSVECTOR,
nullable=True
)
Operazioni su JSONB
PostgreSQL offre operatori potenti per interrogare e manipolare i dati JSONB. SQLAlchemy li espone in modo naturale attraverso gli operatori degli attributi:
from sqlalchemy import select, cast, String
from sqlalchemy.dialects.postgresql import JSONB
from sqlalchemy.orm import Session
with Session(engine) as session:
# Accesso a una chiave del campo JSONB
statement = (
select(UserProfile)
.where(
UserProfile.preferences["theme"].astext == "dark"
)
)
dark_theme_users = session.execute(statement).scalars().all()
# Accesso a chiavi nidificate
statement = (
select(UserProfile)
.where(
UserProfile.preferences["notifications"]["email"].astext == "true"
)
)
email_notify_users = session.execute(statement).scalars().all()
# Verifica dell'esistenza di una chiave
statement = (
select(UserProfile)
.where(UserProfile.preferences.has_key("theme"))
)
users_with_theme = session.execute(statement).scalars().all()
# Contenimento: il JSONB contiene un sotto-oggetto
statement = (
select(UserProfile)
.where(
UserProfile.preferences.contains({"theme": "dark"})
)
)
results = session.execute(statement).scalars().all()
Operazioni su ARRAY
from sqlalchemy import select
from sqlalchemy.orm import Session
with Session(engine) as session:
# Verifica se un array contiene un valore
statement = (
select(UserProfile)
.where(UserProfile.tags.contains(["python"]))
)
python_users = session.execute(statement).scalars().all()
# Sovrapposizione tra array (almeno un elemento in comune)
statement = (
select(UserProfile)
.where(
UserProfile.tags.overlap(["python", "postgresql"])
)
)
matching_users = session.execute(statement).scalars().all()
# Accesso a un elemento specifico dell'array (indice 1-based in PostgreSQL)
statement = (
select(UserProfile)
.where(UserProfile.tags[1] == "python")
)
results = session.execute(statement).scalars().all()
Upsert con ON CONFLICT
PostgreSQL supporta l'istruzione INSERT ... ON CONFLICT, nota anche come "upsert", che consente di gestire i conflitti durante l'inserimento. SQLAlchemy fornisce un supporto diretto per questa funzionalità:
from sqlalchemy.dialects.postgresql import insert
from sqlalchemy.orm import Session
with Session(engine) as session:
# Upsert: inserisce o aggiorna in caso di conflitto
statement = insert(Author).values(
first_name="Italo",
last_name="Calvino",
email="calvino@example.com"
)
# In caso di conflitto sulla colonna email, aggiorna i campi specificati
statement = statement.on_conflict_do_update(
index_elements=["email"],
set_={
"first_name": statement.excluded.first_name,
"last_name": statement.excluded.last_name,
}
)
session.execute(statement)
session.commit()
# Upsert in blocco con più valori
values_list = [
{"first_name": "Umberto", "last_name": "Eco", "email": "eco@example.com"},
{"first_name": "Elena", "last_name": "Ferrante", "email": "ferrante@example.com"},
]
statement = insert(Author).values(values_list)
statement = statement.on_conflict_do_nothing(
index_elements=["email"]
)
session.execute(statement)
session.commit()
Relazioni avanzate
Oltre alle relazioni uno-a-molti viste in precedenza, SQLAlchemy supporta relazioni molti-a-molti e relazioni con tabelle di associazione personalizzate.
Relazione molti-a-molti
from sqlalchemy import Table, Column, Integer, ForeignKey, String
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, relationship
from typing import List
class Base(DeclarativeBase):
pass
# Tabella di associazione per la relazione molti-a-molti
article_tag_table = Table(
"article_tags",
Base.metadata,
Column(
"article_id",
Integer,
ForeignKey("articles.id", ondelete="CASCADE"),
primary_key=True
),
Column(
"tag_id",
Integer,
ForeignKey("tags.id", ondelete="CASCADE"),
primary_key=True
),
)
class Tag(Base):
"""Modello che rappresenta un tag."""
__tablename__ = "tags"
id: Mapped[int] = mapped_column(primary_key=True)
name: Mapped[str] = mapped_column(String(50), unique=True, nullable=False)
# Relazione molti-a-molti con gli articoli
articles: Mapped[List["Article"]] = relationship(
secondary=article_tag_table,
back_populates="tags",
lazy="selectin"
)
def __repr__(self) -> str:
return f"Tag(id={self.id}, name='{self.name}')"
class Article(Base):
"""Modello che rappresenta un articolo con tag."""
__tablename__ = "articles"
id: Mapped[int] = mapped_column(primary_key=True)
title: Mapped[str] = mapped_column(String(300), nullable=False)
# Relazione molti-a-molti con i tag
tags: Mapped[List["Tag"]] = relationship(
secondary=article_tag_table,
back_populates="articles",
lazy="selectin"
)
L'utilizzo delle relazioni molti-a-molti è diretto:
from sqlalchemy.orm import Session
with Session(engine) as session:
# Creazione di tag
tag_python = Tag(name="python")
tag_database = Tag(name="database")
tag_tutorial = Tag(name="tutorial")
session.add_all([tag_python, tag_database, tag_tutorial])
session.flush()
# Creazione di un articolo con tag associati
article = Article(title="Guida a SQLAlchemy con PostgreSQL")
article.tags.extend([tag_python, tag_database, tag_tutorial])
session.add(article)
session.commit()
# Lettura dei tag di un articolo
loaded_article = session.get(Article, article.id)
print(f"Tag dell'articolo: {[t.name for t in loaded_article.tags]}")
# Ricerca di articoli per tag
from sqlalchemy import select
statement = (
select(Article)
.where(Article.tags.any(Tag.name == "python"))
)
python_articles = session.execute(statement).scalars().all()
print(f"Articoli con tag 'python': {len(python_articles)}")
Relazione con tabella di associazione personalizzata
Quando la tabella di associazione deve contenere campi aggiuntivi (come un timestamp o un ruolo), è necessario definirla come un modello ORM completo:
from datetime import datetime
from sqlalchemy import ForeignKey, String
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, relationship
from typing import List, Optional
class Base(DeclarativeBase):
pass
class ArticleAuthorAssociation(Base):
"""Tabella di associazione con dati aggiuntivi."""
__tablename__ = "article_author_associations"
article_id: Mapped[int] = mapped_column(
ForeignKey("articles.id", ondelete="CASCADE"),
primary_key=True
)
author_id: Mapped[int] = mapped_column(
ForeignKey("authors.id", ondelete="CASCADE"),
primary_key=True
)
# Ruolo dell'autore nell'articolo
role: Mapped[str] = mapped_column(
String(50), default="contributor"
)
assigned_at: Mapped[datetime] = mapped_column(
default=datetime.utcnow
)
# Relazioni verso i modelli principali
article: Mapped["Article"] = relationship(
back_populates="author_associations"
)
author: Mapped["Author"] = relationship(
back_populates="article_associations"
)
Transazioni e isolamento
La gestione corretta delle transazioni è fondamentale per garantire la consistenza dei dati. SQLAlchemy offre un controllo granulare sulle transazioni, inclusa la possibilità di specificare il livello di isolamento e di utilizzare i savepoint.
from sqlalchemy import create_engine, text
from sqlalchemy.orm import Session
# Engine con livello di isolamento predefinito
engine = create_engine(
"postgresql+psycopg2://username:password@localhost:5432/my_database",
isolation_level="READ COMMITTED"
)
with Session(engine) as session:
try:
# Prima operazione
author = Author(first_name="Primo", last_name="Levi")
session.add(author)
session.flush()
# Creazione di un savepoint
savepoint = session.begin_nested()
try:
# Operazione rischiosa
article = Article(
title="Se questo è un uomo",
slug="se-questo-e-un-uomo",
content="Contenuto dell'articolo...",
author_id=author.id
)
session.add(article)
session.flush()
except Exception:
# Rollback solo fino al savepoint
savepoint.rollback()
print("Errore nell'inserimento dell'articolo, savepoint ripristinato")
# Il commit finale include l'autore ma non l'articolo se il savepoint è stato ripristinato
session.commit()
except Exception:
session.rollback()
raise
I savepoint sono particolarmente utili quando si vogliono annullare solo alcune operazioni all'interno di una transazione più ampia, senza dover ripristinare l'intera transazione. PostgreSQL supporta pienamente i savepoint e SQLAlchemy li mappa al costrutto SAVEPOINT nativo.
Per le operazioni che richiedono un livello di isolamento diverso da quello predefinito, è possibile specificarlo a livello di singola connessione:
from sqlalchemy.orm import Session
with Session(engine) as session:
# Impostazione del livello di isolamento per questa sessione
session.connection(
execution_options={"isolation_level": "SERIALIZABLE"}
)
# Le operazioni qui avranno isolamento SERIALIZABLE
statement = select(Author).where(Author.id == 1)
author = session.execute(statement).scalars().first()
if author:
author.email = "aggiornato@example.com"
session.commit()
Operazioni asincrone
SQLAlchemy 2.0 supporta nativamente le operazioni asincrone tramite asyncio, utilizzando driver asincroni come asyncpg o la modalità asincrona di psycopg (versione 3). Questa funzionalità è particolarmente utile nelle applicazioni web costruite con framework asincroni come FastAPI o Starlette.
pip install sqlalchemy asyncpg
from sqlalchemy.ext.asyncio import (
create_async_engine,
async_sessionmaker,
AsyncSession
)
from sqlalchemy import select
# Creazione dell'engine asincrono
async_engine = create_async_engine(
"postgresql+asyncpg://username:password@localhost:5432/my_database",
echo=True,
pool_size=10
)
# Factory per le sessioni asincrone
AsyncSessionFactory = async_sessionmaker(
async_engine,
class_=AsyncSession,
expire_on_commit=False
)
async def create_tables():
"""Creazione delle tabelle in modo asincrono."""
async with async_engine.begin() as conn:
await conn.run_sync(Base.metadata.create_all)
async def get_all_authors() -> list[Author]:
"""Lettura di tutti gli autori in modo asincrono."""
async with AsyncSessionFactory() as session:
statement = select(Author).order_by(Author.last_name)
result = await session.execute(statement)
return result.scalars().all()
async def create_author(first_name: str, last_name: str, email: str) -> Author:
"""Creazione di un autore in modo asincrono."""
async with AsyncSessionFactory() as session:
author = Author(
first_name=first_name,
last_name=last_name,
email=email
)
session.add(author)
await session.commit()
await session.refresh(author)
return author
async def search_articles(keyword: str) -> list[Article]:
"""Ricerca di articoli per parola chiave in modo asincrono."""
async with AsyncSessionFactory() as session:
statement = (
select(Article)
.where(
Article.title.ilike(f"%{keyword}%")
)
.order_by(Article.created_at.desc())
)
result = await session.execute(statement)
return result.scalars().all()
Il parametro expire_on_commit=False è importante nelle sessioni asincrone perché evita che gli attributi degli oggetti vengano invalidati dopo il commit. Senza questo parametro, accedere a un attributo dopo il commit richiederebbe un'ulteriore operazione asincrona di refresh, che non può avvenire in modo implicito come nelle sessioni sincrone.
Migrazioni con Alembic
Alembic è lo strumento di migrazione ufficiale per SQLAlchemy. Consente di gestire le modifiche strutturali del database in modo versionato e riproducibile, generando script di migrazione che possono essere applicati o ripristinati.
# Installazione di Alembic
pip install alembic
# Inizializzazione del progetto Alembic
alembic init alembic
Dopo l'inizializzazione, è necessario configurare il file alembic/env.py per puntare ai modelli dell'applicazione:
# alembic/env.py
from logging.config import fileConfig
from sqlalchemy import engine_from_config, pool
from alembic import context
# Importazione della classe Base e di tutti i modelli
from app.models import Base
# Configurazione del logging
config = context.config
if config.config_file_name is not None:
fileConfig(config.config_file_name)
# Impostazione dei metadati per il rilevamento automatico delle modifiche
target_metadata = Base.metadata
def run_migrations_offline() -> None:
"""Esecuzione delle migrazioni in modalità offline."""
url = config.get_main_option("sqlalchemy.url")
context.configure(
url=url,
target_metadata=target_metadata,
literal_binds=True,
dialect_opts={"paramstyle": "named"},
)
with context.begin_transaction():
context.run_migrations()
def run_migrations_online() -> None:
"""Esecuzione delle migrazioni in modalità online."""
connectable = engine_from_config(
config.get_section(config.config_ini_section, {}),
prefix="sqlalchemy.",
poolclass=pool.NullPool,
)
with connectable.connect() as connection:
context.configure(
connection=connection,
target_metadata=target_metadata
)
with context.begin_transaction():
context.run_migrations()
if context.is_offline_mode():
run_migrations_offline()
else:
run_migrations_online()
I comandi principali di Alembic per la gestione delle migrazioni sono i seguenti:
# Generazione automatica di una migrazione basata sulle modifiche ai modelli
alembic revision --autogenerate -m "Aggiunta tabella articoli"
# Applicazione di tutte le migrazioni pendenti
alembic upgrade head
# Ripristino dell'ultima migrazione
alembic downgrade -1
# Visualizzazione dello stato corrente
alembic current
# Visualizzazione della cronologia delle migrazioni
alembic history --verbose
Un esempio di file di migrazione generato automaticamente:
"""Aggiunta tabella articoli
Revision ID: a1b2c3d4e5f6
Revises: 9z8y7x6w5v4u
Create Date: 2025-05-15 10:30:00.000000
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
# Identificatori della revisione
revision: str = "a1b2c3d4e5f6"
down_revision: Union[str, None] = "9z8y7x6w5v4u"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
"""Applicazione della migrazione."""
op.create_table(
"articles",
sa.Column("id", sa.Integer(), nullable=False),
sa.Column("title", sa.String(length=300), nullable=False),
sa.Column("slug", sa.String(length=350), nullable=False),
sa.Column("content", sa.Text(), nullable=False),
sa.Column("is_published", sa.Boolean(), nullable=False),
sa.Column("author_id", sa.Integer(), nullable=False),
sa.Column("created_at", sa.DateTime(), nullable=False),
sa.Column("updated_at", sa.DateTime(), nullable=True),
sa.ForeignKeyConstraint(
["author_id"], ["authors.id"], ondelete="CASCADE"
),
sa.PrimaryKeyConstraint("id"),
sa.UniqueConstraint("slug"),
)
op.create_index("ix_article_author_id", "articles", ["author_id"])
op.create_index("ix_article_slug", "articles", ["slug"])
def downgrade() -> None:
"""Ripristino della migrazione."""
op.drop_index("ix_article_slug", table_name="articles")
op.drop_index("ix_article_author_id", table_name="articles")
op.drop_table("articles")
Pattern e best practice
In questa sezione vengono illustrati alcuni pattern comuni e best practice per l'utilizzo di SQLAlchemy con PostgreSQL in applicazioni reali.
Pattern Repository
Il pattern Repository astrae l'accesso ai dati dietro un'interfaccia coerente, separando la logica di business dalla logica di persistenza:
from typing import Generic, TypeVar, Type, Optional, Sequence
from sqlalchemy import select
from sqlalchemy.orm import Session
T = TypeVar("T", bound=Base)
class BaseRepository(Generic[T]):
"""Repository generico per le operazioni CRUD."""
def __init__(self, session: Session, model_class: Type[T]):
self._session = session
self._model_class = model_class
def get_by_id(self, entity_id: int) -> Optional[T]:
"""Recupera un'entità per ID."""
return self._session.get(self._model_class, entity_id)
def get_all(
self, offset: int = 0, limit: int = 100
) -> Sequence[T]:
"""Recupera tutte le entità con paginazione."""
statement = (
select(self._model_class)
.offset(offset)
.limit(limit)
)
return self._session.execute(statement).scalars().all()
def create(self, entity: T) -> T:
"""Crea una nuova entità."""
self._session.add(entity)
self._session.flush()
return entity
def update(self, entity: T) -> T:
"""Aggiorna un'entità esistente."""
self._session.merge(entity)
self._session.flush()
return entity
def delete(self, entity: T) -> None:
"""Elimina un'entità."""
self._session.delete(entity)
self._session.flush()
class AuthorRepository(BaseRepository[Author]):
"""Repository specifico per gli autori."""
def __init__(self, session: Session):
super().__init__(session, Author)
def find_by_email(self, email: str) -> Optional[Author]:
"""Cerca un autore per indirizzo email."""
statement = (
select(Author).where(Author.email == email)
)
return self._session.execute(statement).scalars().first()
def find_by_full_name(
self, first_name: str, last_name: str
) -> Optional[Author]:
"""Cerca un autore per nome completo."""
statement = (
select(Author)
.where(
Author.first_name == first_name,
Author.last_name == last_name
)
)
return self._session.execute(statement).scalars().first()
# Esempio di utilizzo
with Session(engine) as session:
repo = AuthorRepository(session)
# Creazione di un autore tramite il repository
new_author = Author(
first_name="Elsa", last_name="Morante", email="morante@example.com"
)
repo.create(new_author)
# Ricerca per email
found = repo.find_by_email("morante@example.com")
if found:
print(f"Trovato: {found}")
session.commit()
Mixin per campi comuni
I mixin consentono di definire campi comuni che vengono riutilizzati in più modelli, evitando la duplicazione del codice:
from datetime import datetime
from typing import Optional
from sqlalchemy import func
from sqlalchemy.orm import Mapped, mapped_column, declared_attr
class TimestampMixin:
"""Mixin per aggiungere i campi di timestamp automatici."""
created_at: Mapped[datetime] = mapped_column(
server_default=func.now()
)
updated_at: Mapped[Optional[datetime]] = mapped_column(
onupdate=func.now(),
nullable=True
)
class SoftDeleteMixin:
"""Mixin per l'eliminazione logica (soft delete)."""
is_deleted: Mapped[bool] = mapped_column(default=False)
deleted_at: Mapped[Optional[datetime]] = mapped_column(nullable=True)
def soft_delete(self) -> None:
"""Segna l'entità come eliminata senza rimuoverla dal database."""
self.is_deleted = True
self.deleted_at = datetime.utcnow()
class SlugMixin:
"""Mixin per i campi con slug."""
@declared_attr
def slug(cls) -> Mapped[str]:
"""Campo slug con vincolo di unicità."""
return mapped_column(
String(350), unique=True, nullable=False, index=True
)
# Utilizzo dei mixin nei modelli
class Article(TimestampMixin, SoftDeleteMixin, Base):
"""Modello con timestamp automatici e soft delete."""
__tablename__ = "articles"
id: Mapped[int] = mapped_column(primary_key=True)
title: Mapped[str] = mapped_column(String(300), nullable=False)
content: Mapped[str] = mapped_column(Text, nullable=False)
Gestione degli eventi
SQLAlchemy fornisce un sistema di eventi che consente di eseguire logica personalizzata in risposta a operazioni specifiche sul database o sui modelli:
from sqlalchemy import event, inspect
from sqlalchemy.orm import Session
from datetime import datetime
import logging
logger = logging.getLogger(__name__)
@event.listens_for(Session, "before_flush")
def before_flush_handler(session, flush_context, instances):
"""Gestore eseguito prima del flush della sessione."""
# Registrazione delle entità nuove
for obj in session.new:
logger.info(f"Nuova entità: {type(obj).__name__}")
# Registrazione delle entità modificate
for obj in session.dirty:
if session.is_modified(obj):
# Recupero dei campi modificati
state = inspect(obj)
changes = {}
for attr in state.attrs:
history = attr.history
if history.has_changes():
changes[attr.key] = {
"old": history.deleted,
"new": history.added
}
if changes:
logger.info(
f"Entità modificata: {type(obj).__name__}, "
f"cambiamenti: {changes}"
)
@event.listens_for(Author, "before_insert")
def author_before_insert(mapper, connection, target):
"""Normalizzazione dei dati prima dell'inserimento di un autore."""
# Capitalizzazione del nome e del cognome
target.first_name = target.first_name.strip().title()
target.last_name = target.last_name.strip().title()
# Normalizzazione dell'email
if target.email:
target.email = target.email.strip().lower()
Ottimizzazione delle prestazioni
L'ottimizzazione delle prestazioni è un aspetto cruciale quando si lavora con database in applicazioni di produzione. SQLAlchemy offre diversi strumenti e tecniche per migliorare le performance.
Inserimenti e aggiornamenti in blocco
from sqlalchemy.orm import Session
from sqlalchemy.dialects.postgresql import insert
with Session(engine) as session:
# Inserimento in blocco ottimizzato
authors_data = [
{"first_name": "Luigi", "last_name": "Pirandello", "email": "pirandello@example.com"},
{"first_name": "Grazia", "last_name": "Deledda", "email": "deledda@example.com"},
{"first_name": "Eugenio", "last_name": "Montale", "email": "montale@example.com"},
]
# Metodo 1: bulk insert con il Core (più veloce)
session.execute(
insert(Author),
authors_data
)
# Metodo 2: inserimento con returning per ottenere gli ID
statement = (
insert(Author)
.values(authors_data)
.returning(Author.id, Author.first_name, Author.last_name)
)
result = session.execute(statement)
for row in result:
print(f"Inserito: ID={row.id}, {row.first_name} {row.last_name}")
session.commit()
Configurazione del pool di connessioni
from sqlalchemy import create_engine
from sqlalchemy.pool import QueuePool
# Configurazione avanzata del pool di connessioni
engine = create_engine(
"postgresql+psycopg2://username:password@localhost:5432/my_database",
poolclass=QueuePool,
pool_size=20, # Numero massimo di connessioni nel pool
max_overflow=10, # Connessioni extra temporanee
pool_timeout=30, # Timeout per ottenere una connessione dal pool
pool_recycle=1800, # Ricicla le connessioni dopo 30 minuti
pool_pre_ping=True, # Verifica la connessione prima dell'uso
)
Il parametro pool_recycle è particolarmente importante in ambienti di produzione perché previene problemi dovuti a connessioni che il server di database potrebbe chiudere dopo un periodo di inattività. Il valore va impostato a un tempo inferiore rispetto al timeout di connessione configurato in PostgreSQL (parametro idle_in_transaction_session_timeout).
Query con SQL grezzo
In alcuni casi, l'utilizzo di SQL grezzo può essere necessario per query particolarmente complesse o per sfruttare funzionalità specifiche di PostgreSQL non direttamente supportate dall'ORM:
from sqlalchemy import text
from sqlalchemy.orm import Session
with Session(engine) as session:
# Query con SQL grezzo e parametri
statement = text("""
SELECT a.first_name, a.last_name, COUNT(ar.id) AS article_count
FROM authors a
LEFT JOIN articles ar ON a.id = ar.author_id
WHERE ar.is_published = :is_published
GROUP BY a.id, a.first_name, a.last_name
HAVING COUNT(ar.id) > :min_articles
ORDER BY article_count DESC
""")
result = session.execute(
statement,
{"is_published": True, "min_articles": 2}
)
for row in result:
print(f"{row.first_name} {row.last_name}: {row.article_count}")
È importante utilizzare sempre i parametri nominati (:nome_parametro) anziché l'interpolazione di stringhe per prevenire attacchi di SQL injection. SQLAlchemy si occupa automaticamente dell'escaping e della parametrizzazione della query.
Conclusione
L'integrazione tra SQLAlchemy e PostgreSQL in Python offre una piattaforma robusta e flessibile per lo sviluppo di applicazioni che richiedono un accesso efficiente e sicuro ai dati. SQLAlchemy, con il suo doppio paradigma Core e ORM, si adatta a una vasta gamma di esigenze: dalla prototipazione rapida di piccole applicazioni fino alla gestione di sistemi complessi e ad alte prestazioni.
Le funzionalità specifiche di PostgreSQL, come i tipi JSONB, ARRAY e UUID, l'istruzione UPSERT e il supporto avanzato per le transazioni, sono pienamente accessibili attraverso SQLAlchemy, consentendo di sfruttare al massimo le capacità del database senza rinunciare all'astrazione offerta dall'ORM.
Il supporto nativo per le operazioni asincrone, introdotto con SQLAlchemy 2.0, rende questa combinazione particolarmente adatta allo sviluppo di applicazioni web moderne basate su framework asincroni. Combinato con Alembic per la gestione delle migrazioni, il pattern Repository per l'organizzazione del codice e le varie strategie di ottimizzazione delle prestazioni, SQLAlchemy con PostgreSQL rappresenta una delle scelte più complete e mature per lo sviluppo backend in Python.