Usare Memcached in Python
Memcached è un sistema di caching distribuito in memoria ad alte prestazioni, progettato per ridurre il carico sui database e accelerare le applicazioni web dinamiche. Sviluppato originariamente da Brad Fitzpatrick per LiveJournal nel 2003, è oggi utilizzato da grandi piattaforme come Facebook, YouTube, Reddit e Twitter. La sua semplicità architetturale e le prestazioni eccezionali lo rendono uno strumento fondamentale nello stack di molte applicazioni moderne.
In questo articolo vedremo come integrare Memcached in applicazioni Python, esplorando le librerie disponibili, i pattern di utilizzo più comuni e le best practice per ottenere il massimo dalle sue funzionalità.
Cos'è Memcached e come funziona
Memcached è essenzialmente un grande dizionario distribuito che vive in memoria RAM. Memorizza coppie chiave-valore con una scadenza opzionale (TTL, Time To Live) e utilizza un algoritmo LRU (Least Recently Used) per evictare i dati quando la memoria allocata si esaurisce. La comunicazione avviene tramite un protocollo testuale o binario su TCP/UDP, tipicamente sulla porta 11211.
Le caratteristiche principali di Memcached includono:
- Architettura completamente in-memory per latenze nell'ordine dei microsecondi
- Supporto nativo per cluster distribuiti con sharding client-side
- Protocollo semplice basato su comandi testuali come
get,set,delete,incr - Nessuna persistenza su disco (a differenza di Redis)
- Threading multiplo per gestire migliaia di connessioni simultanee
Installazione di Memcached
Prima di integrare Memcached con Python, occorre installare il server. Su sistemi Ubuntu o Debian:
sudo apt update
sudo apt install memcached libmemcached-tools
# Avvio del servizio
sudo systemctl start memcached
sudo systemctl enable memcached
# Verifica dello stato
sudo systemctl status memcached
Su macOS tramite Homebrew:
brew install memcached
brew services start memcached
Per verificare che Memcached sia in ascolto sulla porta predefinita 11211:
echo "stats" | nc localhost 11211
Le librerie Python disponibili
Per interagire con Memcached da Python esistono diverse librerie client, ciascuna con caratteristiche specifiche:
- pymemcache: client puro Python sviluppato da Pinterest, ben mantenuto e raccomandato per la maggior parte dei casi d'uso
- pylibmc: wrapper Python attorno alla libreria C libmemcached, offre le migliori prestazioni
- python-memcached: storico client puro Python, ormai considerato legacy
- aiomcache: client asincrono basato su asyncio, ideale per applicazioni async
In questo articolo utilizzeremo principalmente pymemcache per gli esempi sincroni e aiomcache per quelli asincroni.
Connessione di base con pymemcache
Installiamo pymemcache tramite pip:
pip install pymemcache
Il primo esempio mostra una connessione di base e le operazioni fondamentali di set e get:
from pymemcache.client.base import Client
# Connessione al server Memcached locale
client = Client(('localhost', 11211))
# Memorizzazione di un valore semplice
client.set('username', 'gabriele')
# Recupero del valore
value = client.get('username')
print(value) # Output: b'gabriele'
# Decodifica della stringa
print(value.decode('utf-8')) # Output: gabriele
# Chiusura della connessione
client.close()
Notiamo che pymemcache restituisce i valori come bytes per default. Questo comportamento è dovuto alla natura del protocollo Memcached, che tratta tutti i valori come sequenze binarie.
Serializzazione automatica con i serializer
Per memorizzare strutture dati complesse come dizionari, liste o oggetti personalizzati, dobbiamo configurare un serializer. Pymemcache include un serializer JSON pronto all'uso:
from pymemcache.client.base import Client
from pymemcache import serde
# Client con serializzazione JSON automatica
client = Client(
('localhost', 11211),
serde=serde.pickle_serde
)
# Memorizzazione di un dizionario complesso
user_data = {
'id': 42,
'name': 'Gabriele Romanato',
'roles': ['admin', 'developer'],
'active': True
}
client.set('user:42', user_data, expire=3600)
# Recupero con deserializzazione automatica
retrieved = client.get('user:42')
print(retrieved['name']) # Output: Gabriele Romanato
print(retrieved['roles']) # Output: ['admin', 'developer']
Il parametro expire definisce il TTL in secondi. Dopo 3600 secondi (un'ora) la chiave verrà automaticamente rimossa dalla cache.
Serializer personalizzato
In alcuni casi è utile definire un serializer personalizzato, ad esempio per supportare formati specifici o per applicare compressione. Ecco un esempio che combina JSON e gzip per ridurre l'occupazione di memoria:
import json
import gzip
from pymemcache.client.base import Client
# Costanti per identificare i flag di serializzazione
FLAG_BYTES = 0
FLAG_JSON_GZIP = 1
def custom_serializer(key, value):
# Serializza il valore in JSON e lo comprime con gzip
if isinstance(value, bytes):
return value, FLAG_BYTES
json_string = json.dumps(value)
compressed = gzip.compress(json_string.encode('utf-8'))
return compressed, FLAG_JSON_GZIP
def custom_deserializer(key, value, flags):
# Decomprime e deserializza il valore in base ai flag
if flags == FLAG_BYTES:
return value
if flags == FLAG_JSON_GZIP:
decompressed = gzip.decompress(value)
return json.loads(decompressed.decode('utf-8'))
raise Exception(f"Unknown flag: {flags}")
# Client configurato con il serializer personalizzato
client = Client(
('localhost', 11211),
serializer=custom_serializer,
deserializer=custom_deserializer
)
# Test con un grande oggetto
large_data = {'items': [{'id': i, 'name': f'item-{i}'} for i in range(1000)]}
client.set('large_data', large_data)
result = client.get('large_data')
print(f"Items count: {len(result['items'])}")
Operazioni atomiche e contatori
Memcached supporta operazioni atomiche di incremento e decremento, particolarmente utili per implementare contatori, rate limiter o sistemi di statistiche. Queste operazioni sono garantite thread-safe a livello di server:
from pymemcache.client.base import Client
client = Client(('localhost', 11211))
# Inizializzazione del contatore
client.set('page_views', '0')
# Incremento atomico
new_value = client.incr('page_views', 1)
print(new_value) # Output: 1
# Incrementi multipli
for _ in range(10):
client.incr('page_views', 1)
current = client.get('page_views')
print(current.decode()) # Output: 11
# Decremento
client.decr('page_views', 5)
print(client.get('page_views').decode()) # Output: 6
Un caso d'uso pratico è l'implementazione di un rate limiter per limitare le richieste API per utente:
from pymemcache.client.base import Client
import time
class RateLimiter:
def __init__(self, client, max_requests=100, window_seconds=60):
self.client = client
self.max_requests = max_requests
self.window_seconds = window_seconds
def is_allowed(self, user_id):
# Costruzione della chiave basata su utente e finestra temporale
window = int(time.time()) // self.window_seconds
key = f"rate_limit:{user_id}:{window}"
# Tentativo di incremento; se la chiave non esiste, la inizializziamo
try:
current = self.client.incr(key, 1)
except Exception:
self.client.set(key, '1', expire=self.window_seconds)
current = 1
return current <= self.max_requests
# Utilizzo
client = Client(('localhost', 11211))
limiter = RateLimiter(client, max_requests=10, window_seconds=60)
user_id = 'user_42'
for i in range(12):
if limiter.is_allowed(user_id):
print(f"Request {i + 1}: allowed")
else:
print(f"Request {i + 1}: rate limited")
Operazioni condizionali con CAS
L'operazione CAS (Check And Set) permette di aggiornare un valore solo se non è stato modificato da altri client nel frattempo. È fondamentale per evitare race condition in scenari concorrenti:
from pymemcache.client.base import Client
client = Client(('localhost', 11211))
# Inizializzazione del contatore di stock
client.set('product:1:stock', '100')
def safe_decrement_stock(product_id, quantity):
# Recupero del valore con il token CAS
key = f'product:{product_id}:stock'
result = client.gets(key)
if result is None:
return False
current_stock, cas_token = result
current_value = int(current_stock)
if current_value < quantity:
return False
new_value = current_value - quantity
# Aggiornamento condizionale: riesce solo se il token è ancora valido
success = client.cas(key, str(new_value), cas_token)
return success
# Test della decrementazione sicura
if safe_decrement_stock(1, 10):
print("Stock decremented successfully")
print(f"New stock: {client.get('product:1:stock').decode()}")
else:
print("Concurrent modification detected, retry needed")
Cluster Memcached con consistent hashing
In ambienti di produzione è comune avere più server Memcached per distribuire il carico e aumentare la capacità totale della cache. Pymemcache fornisce HashClient che gestisce automaticamente il sharding tra i nodi:
from pymemcache.client.hash import HashClient
# Configurazione di un cluster con tre nodi
servers = [
('memcached-1.local', 11211),
('memcached-2.local', 11211),
('memcached-3.local', 11211),
]
client = HashClient(
servers,
connect_timeout=1,
timeout=0.5,
ignore_exc=True,
retry_attempts=2
)
# Le chiavi vengono distribuite automaticamente tra i nodi
for i in range(100):
client.set(f'key:{i}', f'value:{i}')
# Il recupero individua automaticamente il nodo corretto
for i in range(5):
value = client.get(f'key:{i}')
print(f'key:{i} -> {value.decode() if value else None}')
Il parametro ignore_exc=True indica al client di non sollevare eccezioni quando un nodo è irraggiungibile, comportandosi come se la chiave non fosse presente. Questo è cruciale per la resilienza: la cache deve degradare in modo silenzioso, mai bloccare l'applicazione.
Connection pooling
Per applicazioni ad alto throughput, creare una nuova connessione per ogni operazione è inefficiente. Pymemcache offre PooledClient che mantiene un pool di connessioni riutilizzabili:
from pymemcache.client.base import PooledClient
from pymemcache import serde
# Client con pool di connessioni
client = PooledClient(
('localhost', 11211),
max_pool_size=20,
connect_timeout=2,
timeout=1,
serde=serde.pickle_serde
)
# Le connessioni vengono prese dal pool e restituite automaticamente
def get_user_profile(user_id):
cache_key = f'profile:{user_id}'
profile = client.get(cache_key)
if profile is None:
# Simulazione di una query al database
profile = fetch_from_database(user_id)
client.set(cache_key, profile, expire=1800)
return profile
def fetch_from_database(user_id):
# Logica di accesso al database
return {'id': user_id, 'name': f'User {user_id}', 'email': f'user{user_id}@example.com'}
# Utilizzo concorrente sicuro
import threading
def worker(thread_id):
for i in range(50):
profile = get_user_profile(i)
# Elaborazione del profilo
threads = [threading.Thread(target=worker, args=(i,)) for i in range(10)]
for t in threads:
t.start()
for t in threads:
t.join()
Pattern di caching: cache-aside
Il pattern più comune nell'utilizzo di Memcached è il cache-aside (chiamato anche lazy loading): l'applicazione controlla prima la cache e, in caso di miss, recupera i dati dalla sorgente primaria e li memorizza nella cache per le richieste successive.
from pymemcache.client.base import PooledClient
from pymemcache import serde
import sqlite3
from functools import wraps
client = PooledClient(
('localhost', 11211),
serde=serde.pickle_serde,
max_pool_size=10
)
def cache_aside(key_prefix, ttl=300):
# Decoratore che implementa il pattern cache-aside
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
# Costruzione della chiave a partire dagli argomenti
cache_key = f"{key_prefix}:{':'.join(str(a) for a in args)}"
# Controllo della cache
cached_value = client.get(cache_key)
if cached_value is not None:
print(f"Cache HIT: {cache_key}")
return cached_value
# Cache miss: esecuzione della funzione originale
print(f"Cache MISS: {cache_key}")
result = func(*args, **kwargs)
# Memorizzazione del risultato
if result is not None:
client.set(cache_key, result, expire=ttl)
return result
return wrapper
return decorator
@cache_aside('product', ttl=600)
def get_product(product_id):
# Simulazione di query al database
connection = sqlite3.connect('shop.db')
cursor = connection.cursor()
cursor.execute('SELECT id, name, price FROM products WHERE id = ?', (product_id,))
row = cursor.fetchone()
connection.close()
if row is None:
return None
return {'id': row[0], 'name': row[1], 'price': row[2]}
# Prima chiamata: cache miss
product = get_product(1)
# Seconda chiamata: cache hit
product = get_product(1)
Invalidazione della cache
Una delle sfide più complesse del caching è l'invalidazione: quando i dati sottostanti cambiano, la cache deve essere aggiornata o invalidata per evitare di servire dati obsoleti. Esistono diverse strategie:
from pymemcache.client.base import Client
client = Client(('localhost', 11211))
class UserCache:
def __init__(self, client):
self.client = client
def get_user(self, user_id):
return self.client.get(f'user:{user_id}')
def set_user(self, user_id, data, ttl=3600):
# Memorizzazione con TTL
self.client.set(f'user:{user_id}', data, expire=ttl)
def invalidate_user(self, user_id):
# Invalidazione esplicita di una singola chiave
self.client.delete(f'user:{user_id}')
def invalidate_user_with_related(self, user_id):
# Invalidazione di chiavi correlate
keys_to_delete = [
f'user:{user_id}',
f'user:{user_id}:profile',
f'user:{user_id}:preferences',
f'user:{user_id}:orders'
]
# delete_many è più efficiente di chiamate singole
self.client.delete_many(keys_to_delete)
# Versioning delle chiavi come alternativa alla cancellazione
class VersionedCache:
def __init__(self, client):
self.client = client
def get_version(self, entity_type):
# Recupera la versione corrente per un tipo di entità
version = self.client.get(f'version:{entity_type}')
return int(version) if version else 1
def bump_version(self, entity_type):
# Incrementa la versione, invalidando tutte le chiavi correlate
return self.client.incr(f'version:{entity_type}', 1) or 1
def make_key(self, entity_type, entity_id):
version = self.get_version(entity_type)
return f'{entity_type}:v{version}:{entity_id}'
def get(self, entity_type, entity_id):
return self.client.get(self.make_key(entity_type, entity_id))
def set(self, entity_type, entity_id, value, ttl=3600):
self.client.set(self.make_key(entity_type, entity_id), value, expire=ttl)
# Utilizzo del versioning
versioned = VersionedCache(client)
versioned.set('product', 1, {'name': 'Laptop', 'price': 999})
# Tutte le chiavi product:v1:* diventano inaccessibili
versioned.bump_version('product')
Memcached asincrono con aiomcache
Per applicazioni basate su asyncio (FastAPI, aiohttp, Sanic), aiomcache offre un client completamente asincrono. Installiamo la libreria:
pip install aiomcache
Ecco un esempio di utilizzo in un'applicazione FastAPI:
import asyncio
import json
import aiomcache
from fastapi import FastAPI, HTTPException
app = FastAPI()
# Client asincrono condiviso
memcache_client = aiomcache.Client('localhost', 11211, pool_size=10)
async def get_cached_or_fetch(key, fetch_function, ttl=300):
# Recupero dalla cache
cached = await memcache_client.get(key.encode('utf-8'))
if cached is not None:
return json.loads(cached.decode('utf-8'))
# Cache miss: fetch dei dati
data = await fetch_function()
# Memorizzazione in cache
await memcache_client.set(
key.encode('utf-8'),
json.dumps(data).encode('utf-8'),
exptime=ttl
)
return data
async def fetch_user_from_db(user_id):
# Simulazione di query asincrona al database
await asyncio.sleep(0.1)
return {
'id': user_id,
'name': f'User {user_id}',
'email': f'user{user_id}@example.com'
}
@app.get('/users/{user_id}')
async def get_user(user_id: int):
cache_key = f'user:{user_id}'
user = await get_cached_or_fetch(
cache_key,
lambda: fetch_user_from_db(user_id),
ttl=600
)
if user is None:
raise HTTPException(status_code=404, detail='User not found')
return user
@app.delete('/users/{user_id}/cache')
async def invalidate_user_cache(user_id: int):
await memcache_client.delete(f'user:{user_id}'.encode('utf-8'))
return {'status': 'invalidated'}
@app.on_event('shutdown')
async def shutdown():
await memcache_client.close()
Monitoraggio e statistiche
Memcached espone numerose metriche tramite il comando stats, accessibili anche dal client Python. Monitorare queste statistiche è essenziale per valutare l'efficacia della cache:
from pymemcache.client.base import Client
client = Client(('localhost', 11211))
# Recupero delle statistiche generali
stats = client.stats()
# Conversione dei valori in formato leggibile
for key, value in sorted(stats.items()):
key_str = key.decode('utf-8') if isinstance(key, bytes) else key
value_str = value.decode('utf-8') if isinstance(value, bytes) else value
print(f'{key_str}: {value_str}')
# Calcolo del hit ratio
hits = int(stats[b'get_hits'])
misses = int(stats[b'get_misses'])
total = hits + misses
if total > 0:
hit_ratio = (hits / total) * 100
print(f'\nHit ratio: {hit_ratio:.2f}%')
print(f'Total operations: {total}')
# Statistiche specifiche per tipo
slab_stats = client.stats('slabs')
items_stats = client.stats('items')
Un hit ratio sopra l'80% generalmente indica una cache ben configurata. Valori più bassi possono suggerire TTL troppo brevi, dimensioni della cache insufficienti o pattern di accesso non adatti al caching.
Best practice e considerazioni finali
Per utilizzare Memcached in modo efficace nelle applicazioni Python, è importante seguire alcune linee guida fondamentali:
- Naming delle chiavi: utilizzare un naming consistente come
tipo:id:attributo, mantenendo le chiavi sotto i 250 caratteri (limite di Memcached) - Gestione degli errori: la cache deve essere sempre opzionale; l'applicazione deve continuare a funzionare anche con cache non disponibile
- Dimensione dei valori: Memcached limita i valori a 1MB per default; per oggetti più grandi considerare la compressione o lo splitting
- TTL appropriati: bilanciare freshness dei dati e hit ratio; valori troppo lunghi servono dati obsoleti, troppo brevi vanificano il caching
- Sicurezza: Memcached non offre autenticazione nativa nel protocollo testuale; deve sempre essere su rete privata o protetto da firewall
- Connection pooling: utilizzare sempre pool di connessioni in produzione per evitare overhead
- Monitoraggio: integrare le statistiche di Memcached con il sistema di monitoring (Prometheus, Datadog, ecc.)
Memcached rimane uno strumento estremamente valido nel panorama del caching distribuito, particolarmente quando si necessita di una soluzione semplice, veloce e affidabile. Sebbene Redis offra funzionalità più avanzate come strutture dati complesse, pub/sub e persistenza, Memcached eccelle nei casi d'uso puri di caching grazie alla sua architettura focalizzata e alle prestazioni costanti sotto carico elevato. L'integrazione con Python è matura e ben supportata, con librerie come pymemcache che coprono praticamente tutti i casi d'uso reali, dal singolo server allo sharding su cluster di dimensioni considerevoli.