Crea il tuo Redis in Python

Crea il tuo Redis in Python

Redis è uno dei datastore in-memory più utilizzati al mondo: viene impiegato come cache, come broker di messaggi, come archivio di sessioni e come database chiave-valore ad altissime prestazioni. Dietro la sua reputazione di strumento sofisticato si nasconde in realtà un nucleo concettualmente semplice: un server TCP che mantiene i dati in memoria, parla un protocollo testuale ben definito ed espone un insieme di comandi che operano su poche strutture dati fondamentali.

In questo articolo costruiremo da zero, in Python e usando soltanto la libreria standard, un server compatibile con Redis. Implementeremo il protocollo RESP (Redis Serialization Protocol), un dispatcher dei comandi e i comandi più comuni che operano su stringhe, liste e hash, oltre alla scadenza automatica delle chiavi. Al termine avremo un server con cui sarà possibile interagire direttamente tramite il client ufficiale redis-cli, perché parlerà esattamente lo stesso protocollo.

L'obiettivo non è sostituire Redis, ma comprenderne i meccanismi interni: il parsing del protocollo, la gestione dello stato condiviso, la scadenza delle chiavi e la concorrenza tra client. Sono concetti che si ritrovano, identici, in qualunque servizio di rete stateful.

Il protocollo RESP

Ogni interazione con Redis passa attraverso il protocollo RESP, un formato testuale semplice da analizzare ma sufficientemente espressivo. Il primo byte di ogni messaggio ne identifica il tipo, e ogni elemento termina con la sequenza \r\n (CRLF). I tipi fondamentali sono cinque.

Le simple string iniziano con + e rappresentano risposte brevi di successo, ad esempio +OK\r\n. Gli error iniziano con - e trasportano un messaggio di errore, come -ERR unknown command\r\n. Gli integer iniziano con : e codificano un numero, ad esempio :42\r\n. Le bulk string iniziano con $ seguito dalla lunghezza in byte, poi dai dati veri e propri: $5\r\nhello\r\n; la lunghezza -1 indica un valore nullo ($-1\r\n). Infine gli array iniziano con * seguito dal numero di elementi, ciascuno serializzato a sua volta secondo le regole precedenti.

Un dettaglio importante è che i comandi inviati dal client arrivano sempre come array di bulk string. Quando digitiamo SET name gabriele in redis-cli, sul filo viaggia in realtà questa sequenza:

*3\r\n
$3\r\n
SET\r\n
$4\r\n
name\r\n
$8\r\n
gabriele\r\n

Vale a dire: un array di tre elementi, ciascuno dei quali è una bulk string con la propria lunghezza dichiarata. Saper riconoscere e ricostruire questa struttura è il primo mattone del nostro server.

Leggere i comandi dal client

Leggere da una socket TCP presenta un'insidia ricorrente: i dati non arrivano necessariamente in blocchi che coincidono con i messaggi applicativi. Una singola chiamata a recv potrebbe restituire mezzo comando, oppure due comandi insieme. Per questo motivo introduciamo un lettore con un buffer interno che accumula i byte ricevuti e li consuma man mano che ricostruisce gli elementi del protocollo.

Definiamo innanzitutto un'eccezione dedicata agli errori di protocollo e la classe RESPReader, responsabile della decodifica.

import socket
import threading
import time


class ProtocolError(Exception):
    """Sollevata quando i dati ricevuti non rispettano il protocollo RESP."""
    pass


class RESPReader:
    """Legge e decodifica i comandi dal client secondo il protocollo RESP."""

    def __init__(self, connection):
        self.connection = connection
        # buffer interno per i byte non ancora consumati
        self.buffer = bytearray()

    def _fill_buffer(self):
        # legge un blocco di dati dalla socket e lo accoda al buffer
        chunk = self.connection.recv(4096)
        if not chunk:
            # connessione chiusa dal client
            raise ConnectionError("connessione chiusa dal client")
        self.buffer.extend(chunk)

Il cuore del lettore sono due metodi di basso livello. Il primo, _read_line, estrae una riga terminata da CRLF, riempiendo il buffer finché il terminatore non è disponibile. Il secondo, _read_exactly, legge un numero preciso di byte più il CRLF di chiusura: serve per il corpo delle bulk string, la cui lunghezza è nota in anticipo.

    def _read_line(self):
        # legge fino al terminatore CRLF, riempiendo il buffer se necessario
        while b"\r\n" not in self.buffer:
            self._fill_buffer()
        line, _, rest = self.buffer.partition(b"\r\n")
        self.buffer = bytearray(rest)
        return bytes(line)

    def _read_exactly(self, count):
        # legge esattamente "count" byte più il CRLF finale
        while len(self.buffer) < count + 2:
            self._fill_buffer()
        data = bytes(self.buffer[:count])
        # rimuove i dati letti e il CRLF di chiusura
        self.buffer = bytearray(self.buffer[count + 2:])
        return data

Con questi strumenti la lettura di un comando completo diventa lineare: leggiamo l'intestazione dell'array per conoscere il numero di argomenti, poi per ciascun argomento leggiamo l'intestazione della bulk string (che ne dichiara la lunghezza) e infine il payload corrispondente.

    def read_command(self):
        # un comando RESP è un array di bulk string
        line = self._read_line()
        if not line or line[0:1] != b"*":
            raise ProtocolError("atteso un array RESP")
        element_count = int(line[1:])
        arguments = []
        for _ in range(element_count):
            header = self._read_line()
            if header[0:1] != b"$":
                raise ProtocolError("atteso un bulk string")
            length = int(header[1:])
            payload = self._read_exactly(length)
            arguments.append(payload.decode("utf-8"))
        return arguments

Il metodo restituisce una semplice lista di stringhe, ad esempio ["SET", "name", "gabriele"]. Da questo punto in poi non dovremo più preoccuparci dei byte grezzi: lavoreremo con argomenti già decodificati.

Scrivere le risposte

Speculare al lettore è lo RESPWriter, che serializza le risposte nei vari tipi previsti dal protocollo. Ogni metodo costruisce la rappresentazione testuale corretta e la invia con sendall, che garantisce la trasmissione completa dei byte anche quando il sistema operativo accetta la scrittura in più passaggi.

class RESPWriter:
    """Serializza le risposte del server nel formato RESP."""

    def __init__(self, connection):
        self.connection = connection

    def send_simple_string(self, value):
        self.connection.sendall(f"+{value}\r\n".encode("utf-8"))

    def send_error(self, message):
        self.connection.sendall(f"-{message}\r\n".encode("utf-8"))

    def send_integer(self, value):
        self.connection.sendall(f":{value}\r\n".encode("utf-8"))

    def send_bulk_string(self, value):
        if value is None:
            # bulk string nulla: rappresenta un valore assente
            self.connection.sendall(b"$-1\r\n")
            return
        payload = value.encode("utf-8")
        header = f"${len(payload)}\r\n".encode("utf-8")
        self.connection.sendall(header + payload + b"\r\n")

    def send_array(self, values):
        self.connection.sendall(f"*{len(values)}\r\n".encode("utf-8"))
        for value in values:
            self.send_bulk_string(value)

La distinzione tra simple string e bulk string non è arbitraria. Una simple string è adatta a risposte di controllo brevi e prive di CRLF, come OK o PONG. Una bulk string può invece contenere dati binari arbitrari di lunghezza nota, e gestisce in modo esplicito il caso del valore nullo: è la forma corretta per restituire il contenuto di una chiave.

Lo stato condiviso e la scadenza

Tutti i dati del nostro server vivono in memoria, in un'unica struttura condivisa tra le connessioni. La classe DataStore mantiene due dizionari: uno per i valori veri e propri e uno per i timestamp di scadenza. Un lock protegge l'accesso concorrente, dato che più client potrebbero operare contemporaneamente sulle stesse chiavi.

class DataStore:
    """Contiene i dati condivisi e gestisce la scadenza delle chiavi."""

    def __init__(self):
        self.values = {}
        # mappa chiave -> timestamp di scadenza (in secondi)
        self.expirations = {}
        # lock per proteggere l'accesso concorrente
        self.lock = threading.Lock()

    def _is_expired(self, key):
        expires_at = self.expirations.get(key)
        return expires_at is not None and expires_at <= time.time()

    def _remove_if_expired(self, key):
        # rimozione "pigra": la chiave viene eliminata solo al momento dell'accesso
        if self._is_expired(key):
            self.values.pop(key, None)
            self.expirations.pop(key, None)

La strategia adottata è la lazy expiration, la stessa usata da Redis come meccanismo principale. Una chiave scaduta non viene rimossa immediatamente allo scadere del tempo: resta in memoria finché qualcuno non tenta di accedervi. Al momento dell'accesso, il metodo _remove_if_expired verifica la scadenza e, se necessario, elimina la chiave prima che venga letta. Questo approccio evita di dover mantenere timer attivi per ogni chiave e concentra il costo della pulizia nei momenti in cui i dati vengono effettivamente richiesti.

Il dispatcher dei comandi

Una volta ottenuti gli argomenti decodificati, dobbiamo instradare ciascun comando verso la sua implementazione. Il CommandDispatcher mantiene una tabella che associa il nome di ogni comando al metodo corrispondente. Questo disaccoppiamento rende banale aggiungere nuovi comandi in futuro: basta scrivere il metodo e registrarlo nella tabella.

class CommandDispatcher:
    """Associa i nomi dei comandi alle relative implementazioni."""

    def __init__(self, store):
        self.store = store
        self.handlers = {
            "PING": self.handle_ping,
            "ECHO": self.handle_echo,
            "SET": self.handle_set,
            "GET": self.handle_get,
            "DEL": self.handle_del,
            "EXISTS": self.handle_exists,
            "EXPIRE": self.handle_expire,
            "TTL": self.handle_ttl,
            "LPUSH": self.handle_lpush,
            "RPUSH": self.handle_rpush,
            "LRANGE": self.handle_lrange,
            "LLEN": self.handle_llen,
            "HSET": self.handle_hset,
            "HGET": self.handle_hget,
            "HGETALL": self.handle_hgetall,
        }

    def execute(self, arguments, writer):
        if not arguments:
            return
        # i nomi dei comandi sono trattati senza distinzione di maiuscole
        command_name = arguments[0].upper()
        handler = self.handlers.get(command_name)
        if handler is None:
            writer.send_error(f"ERR comando sconosciuto '{command_name}'")
            return
        handler(arguments, writer)

Si noti che il nome del comando viene normalizzato in maiuscolo, perché Redis è insensibile alle maiuscole sui nomi dei comandi: get, GET e Get sono equivalenti. Se il comando non è riconosciuto, restituiamo un errore esplicito anziché lasciare il client in attesa.

I comandi sulle stringhe

Cominciamo dai comandi più semplici. PING verifica che il server sia raggiungibile e risponde PONG, oppure restituisce il messaggio fornito come argomento. ECHO rimanda indietro la stringa ricevuta ed è utile per i test.

    def handle_ping(self, arguments, writer):
        # PING risponde con PONG, oppure con il messaggio fornito
        if len(arguments) >= 2:
            writer.send_bulk_string(arguments[1])
        else:
            writer.send_simple_string("PONG")

    def handle_echo(self, arguments, writer):
        if len(arguments) != 2:
            writer.send_error("ERR numero di argomenti errato per 'echo'")
            return
        writer.send_bulk_string(arguments[1])

Passiamo ora al cuore di un datastore chiave-valore. SET memorizza un valore e supporta l'opzione EX per impostare contestualmente una scadenza in secondi. È importante rimuovere qualsiasi scadenza preesistente quando una chiave viene riscritta senza opzioni, perché un nuovo SET azzera lo stato precedente della chiave.

    def handle_set(self, arguments, writer):
        # SET key value [EX seconds]
        if len(arguments) < 3:
            writer.send_error("ERR numero di argomenti errato per 'set'")
            return
        key = arguments[1]
        value = arguments[2]
        with self.store.lock:
            self.store.values[key] = value
            # un nuovo SET rimuove un'eventuale scadenza precedente
            self.store.expirations.pop(key, None)
            # gestione opzionale dell'opzione EX (scadenza in secondi)
            if len(arguments) >= 5 and arguments[3].upper() == "EX":
                seconds = int(arguments[4])
                self.store.expirations[key] = time.time() + seconds
        writer.send_simple_string("OK")

Il comando GET recupera il valore associato a una chiave. Prima di leggerlo invoca il controllo di scadenza, garantendo che una chiave scaduta venga trattata come inesistente. Se il valore esiste ma non è una stringa (ad esempio perché la chiave contiene una lista), restituiamo un errore di tipo, esattamente come fa Redis con il suo WRONGTYPE.

    def handle_get(self, arguments, writer):
        if len(arguments) != 2:
            writer.send_error("ERR numero di argomenti errato per 'get'")
            return
        key = arguments[1]
        with self.store.lock:
            self.store._remove_if_expired(key)
            value = self.store.values.get(key)
        if value is None:
            writer.send_bulk_string(None)
        elif isinstance(value, str):
            writer.send_bulk_string(value)
        else:
            writer.send_error("WRONGTYPE il valore non è una stringa")

I comandi DEL ed EXISTS operano su una o più chiavi contemporaneamente e restituiscono un intero: rispettivamente il numero di chiavi effettivamente rimosse e il numero di chiavi presenti. Anche qui il controllo di scadenza precede ogni verifica, così una chiave scaduta non viene mai conteggiata come esistente.

    def handle_del(self, arguments, writer):
        # DEL accetta una o più chiavi e restituisce il numero di chiavi rimosse
        deleted = 0
        with self.store.lock:
            for key in arguments[1:]:
                self.store._remove_if_expired(key)
                if key in self.store.values:
                    self.store.values.pop(key, None)
                    self.store.expirations.pop(key, None)
                    deleted += 1
        writer.send_integer(deleted)

    def handle_exists(self, arguments, writer):
        # EXISTS conta quante delle chiavi indicate sono presenti
        count = 0
        with self.store.lock:
            for key in arguments[1:]:
                self.store._remove_if_expired(key)
                if key in self.store.values:
                    count += 1
        writer.send_integer(count)

Gestire la scadenza esplicita

Oltre all'opzione EX di SET, Redis offre comandi dedicati alla scadenza. EXPIRE imposta un tempo di vita su una chiave già esistente e restituisce 1 in caso di successo, oppure 0 se la chiave non esiste. TTL interroga il tempo residuo: restituisce i secondi rimanenti, -1 se la chiave esiste ma non ha scadenza, e -2 se la chiave non esiste affatto. Questi valori sentinella fanno parte della semantica ufficiale di Redis e vanno rispettati per garantire la compatibilità.

    def handle_expire(self, arguments, writer):
        # EXPIRE imposta una scadenza in secondi su una chiave esistente
        if len(arguments) != 3:
            writer.send_error("ERR numero di argomenti errato per 'expire'")
            return
        key = arguments[1]
        seconds = int(arguments[2])
        with self.store.lock:
            self.store._remove_if_expired(key)
            if key not in self.store.values:
                writer.send_integer(0)
                return
            self.store.expirations[key] = time.time() + seconds
        writer.send_integer(1)

    def handle_ttl(self, arguments, writer):
        # TTL restituisce i secondi rimanenti prima della scadenza
        if len(arguments) != 2:
            writer.send_error("ERR numero di argomenti errato per 'ttl'")
            return
        key = arguments[1]
        with self.store.lock:
            self.store._remove_if_expired(key)
            if key not in self.store.values:
                # -2 indica che la chiave non esiste
                writer.send_integer(-2)
                return
            expires_at = self.store.expirations.get(key)
            if expires_at is None:
                # -1 indica una chiave senza scadenza
                writer.send_integer(-1)
                return
            remaining = int(expires_at - time.time())
        writer.send_integer(remaining)

I comandi sulle liste

Una delle caratteristiche distintive di Redis è il supporto a strutture dati più ricche della semplice stringa. Le liste sono sequenze ordinate di elementi su cui si può operare da entrambe le estremità. LPUSH inserisce in testa, RPUSH in coda; entrambi accettano più valori in una sola invocazione e restituiscono la lunghezza finale della lista.

Rappresentiamo le liste con le normali liste di Python. Il metodo setdefault crea la lista vuota al primo inserimento, ma se la chiave contiene già un valore di tipo diverso restituiamo l'errore WRONGTYPE senza modificare nulla.

    def handle_lpush(self, arguments, writer):
        # LPUSH inserisce uno o più valori in testa alla lista
        if len(arguments) < 3:
            writer.send_error("ERR numero di argomenti errato per 'lpush'")
            return
        key = arguments[1]
        with self.store.lock:
            self.store._remove_if_expired(key)
            current = self.store.values.setdefault(key, [])
            if not isinstance(current, list):
                writer.send_error("WRONGTYPE il valore non è una lista")
                return
            for value in arguments[2:]:
                # ogni nuovo valore diventa il nuovo primo elemento
                current.insert(0, value)
            length = len(current)
        writer.send_integer(length)

    def handle_rpush(self, arguments, writer):
        # RPUSH inserisce uno o più valori in coda alla lista
        if len(arguments) < 3:
            writer.send_error("ERR numero di argomenti errato per 'rpush'")
            return
        key = arguments[1]
        with self.store.lock:
            self.store._remove_if_expired(key)
            current = self.store.values.setdefault(key, [])
            if not isinstance(current, list):
                writer.send_error("WRONGTYPE il valore non è una lista")
                return
            for value in arguments[2:]:
                current.append(value)
            length = len(current)
        writer.send_integer(length)

La lettura avviene con LRANGE, che restituisce gli elementi compresi tra due indici, estremi inclusi. Una particolarità da rispettare è la gestione degli indici negativi: -1 indica l'ultimo elemento, -2 il penultimo e così via. Inoltre, dato che in Redis l'indice finale è inclusivo mentre lo slicing di Python è esclusivo, occorre aggiungere uno all'indice di fine.

    def handle_lrange(self, arguments, writer):
        # LRANGE restituisce gli elementi compresi tra start e stop (inclusi)
        if len(arguments) != 4:
            writer.send_error("ERR numero di argomenti errato per 'lrange'")
            return
        key = arguments[1]
        start = int(arguments[2])
        stop = int(arguments[3])
        with self.store.lock:
            self.store._remove_if_expired(key)
            current = self.store.values.get(key, [])
            if not isinstance(current, list):
                writer.send_error("WRONGTYPE il valore non è una lista")
                return
            length = len(current)
            # normalizza gli indici negativi come fa Redis
            if start < 0:
                start = max(length + start, 0)
            if stop < 0:
                stop = length + stop
            # stop è inclusivo, quindi serve +1 per lo slicing di Python
            result = current[start:stop + 1]
        writer.send_array(result)

    def handle_llen(self, arguments, writer):
        # LLEN restituisce il numero di elementi della lista
        if len(arguments) != 2:
            writer.send_error("ERR numero di argomenti errato per 'llen'")
            return
        key = arguments[1]
        with self.store.lock:
            self.store._remove_if_expired(key)
            current = self.store.values.get(key, [])
            if not isinstance(current, list):
                writer.send_error("WRONGTYPE il valore non è una lista")
                return
            length = len(current)
        writer.send_integer(length)

I comandi sulle hash

Le hash sono mappe di campi e valori associate a una singola chiave, ideali per rappresentare oggetti strutturati come il profilo di un utente. Le modelliamo con i dizionari di Python. HSET imposta una o più coppie campo-valore e restituisce il numero di campi nuovi creati, escludendo quindi gli aggiornamenti di campi già esistenti.

    def handle_hset(self, arguments, writer):
        # HSET key field value [field value ...]
        if len(arguments) < 4 or len(arguments) % 2 != 0:
            writer.send_error("ERR numero di argomenti errato per 'hset'")
            return
        key = arguments[1]
        with self.store.lock:
            self.store._remove_if_expired(key)
            current = self.store.values.setdefault(key, {})
            if not isinstance(current, dict):
                writer.send_error("WRONGTYPE il valore non è una hash")
                return
            added = 0
            pairs = arguments[2:]
            # scorre le coppie campo/valore a passo di due
            for index in range(0, len(pairs), 2):
                field = pairs[index]
                field_value = pairs[index + 1]
                if field not in current:
                    added += 1
                current[field] = field_value
        writer.send_integer(added)

La lettura avviene con HGET, che recupera il valore di un singolo campo, e con HGETALL, che restituisce l'intera hash. Quest'ultimo merita attenzione: in RESP la hash viene serializzata come un array piatto in cui si alternano campo e valore. Spetta al client ricomporre le coppie a partire da questa sequenza.

    def handle_hget(self, arguments, writer):
        # HGET recupera il valore di un singolo campo
        if len(arguments) != 3:
            writer.send_error("ERR numero di argomenti errato per 'hget'")
            return
        key = arguments[1]
        field = arguments[2]
        with self.store.lock:
            self.store._remove_if_expired(key)
            current = self.store.values.get(key)
            if current is None:
                writer.send_bulk_string(None)
                return
            if not isinstance(current, dict):
                writer.send_error("WRONGTYPE il valore non è una hash")
                return
            value = current.get(field)
        writer.send_bulk_string(value)

    def handle_hgetall(self, arguments, writer):
        # HGETALL restituisce tutti i campi e valori come array piatto
        if len(arguments) != 2:
            writer.send_error("ERR numero di argomenti errato per 'hgetall'")
            return
        key = arguments[1]
        with self.store.lock:
            self.store._remove_if_expired(key)
            current = self.store.values.get(key, {})
            if not isinstance(current, dict):
                writer.send_error("WRONGTYPE il valore non è una hash")
                return
            # appiattisce la hash in una sequenza campo, valore, campo, valore...
            flattened = []
            for field, field_value in current.items():
                flattened.append(field)
                flattened.append(field_value)
        writer.send_array(flattened)

Il server TCP e la concorrenza

Con il protocollo, lo stato e i comandi al loro posto, manca soltanto il server che accetti le connessioni e gestisca il ciclo di vita di ciascun client. Adottiamo un modello a thread: ogni connessione viene affidata a un thread dedicato, e il lock del DataStore garantisce che le operazioni sui dati condivisi restino consistenti.

class RedisServer:
    """Server TCP che accetta connessioni e processa i comandi RESP."""

    def __init__(self, host="127.0.0.1", port=6379):
        self.host = host
        self.port = port
        self.store = DataStore()
        self.dispatcher = CommandDispatcher(self.store)

    def start(self):
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as server_socket:
            # consente il riutilizzo immediato dell'indirizzo dopo un riavvio
            server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            server_socket.bind((self.host, self.port))
            server_socket.listen()
            print(f"server in ascolto su {self.host}:{self.port}")
            while True:
                client_socket, address = server_socket.accept()
                # ogni client viene gestito da un thread dedicato
                thread = threading.Thread(
                    target=self.handle_client,
                    args=(client_socket, address),
                    daemon=True,
                )
                thread.start()

    def handle_client(self, client_socket, address):
        reader = RESPReader(client_socket)
        writer = RESPWriter(client_socket)
        with client_socket:
            while True:
                try:
                    arguments = reader.read_command()
                except (ConnectionError, ProtocolError):
                    # il client ha chiuso la connessione o ha inviato dati non validi
                    break
                self.dispatcher.execute(arguments, writer)


if __name__ == "__main__":
    server = RedisServer()
    server.start()

L'opzione SO_REUSEADDR evita l'errore "address already in use" quando si riavvia il server subito dopo averlo arrestato, situazione frequente in fase di sviluppo. Il ciclo interno di handle_client legge un comando alla volta e lo esegue, interrompendosi in modo pulito quando il client chiude la connessione o invia dati malformati.

Vale la pena soffermarsi sul modello di concorrenza scelto. Redis, nella sua implementazione reale, è fondamentalmente a thread singolo: utilizza un event loop che multiplexa migliaia di connessioni senza bisogno di lock, e questa scelta architetturale elimina alla radice le corse critiche. La nostra versione adotta invece un thread per client perché è più immediata da comprendere, ma proprio per questo introduce la necessità di sincronizzare l'accesso allo stato condiviso. Il lock viene mantenuto per intervalli volutamente brevi, circoscritti alla sola manipolazione dei dizionari; in un'implementazione orientata alla produzione sarebbe preferibile calcolare la risposta all'interno della sezione critica e spostare la scrittura sulla socket all'esterno del lock, in modo che le operazioni di rete non blocchino gli altri client.

Provare il server

Salviamo tutto il codice in un file e avviamo il server. Se Redis non è in esecuzione sulla porta predefinita, il nostro processo si metterà in ascolto sulla 6379.

python3 miniredis.py

Poiché parliamo esattamente il protocollo RESP, possiamo usare il client ufficiale redis-cli per interagire con il nostro server come se fosse un Redis reale.

redis-cli -p 6379
127.0.0.1:6379> PING
PONG
127.0.0.1:6379> SET name gabriele EX 60
OK
127.0.0.1:6379> GET name
"gabriele"
127.0.0.1:6379> TTL name
(integer) 58
127.0.0.1:6379> RPUSH colors red green blue
(integer) 3
127.0.0.1:6379> LRANGE colors 0 -1
1) "red"
2) "green"
3) "blue"
127.0.0.1:6379> HSET user name gabriele role developer
(integer) 2
127.0.0.1:6379> HGETALL user
1) "name"
2) "gabriele"
3) "role"
4) "developer"

Ogni comando si comporta come l'originale, scadenze incluse: attendendo oltre il tempo impostato e rileggendo la chiave, otterremo una risposta nulla.

Conclusione

In poche centinaia di righe abbiamo costruito un server compatibile con Redis che implementa il protocollo RESP, gestisce stringhe, liste e hash, supporta la scadenza delle chiavi e serve più client in parallelo. Soprattutto, abbiamo visto che dietro un sistema apparentemente complesso si nascondono idee chiare e componibili: un parser robusto che affronta la natura a stream del TCP, una tabella di dispatch che mantiene il codice estensibile, uno stato condiviso protetto da un lock e una strategia di scadenza pigra che evita lavoro inutile.

Da qui le direzioni di crescita sono molte. Si potrebbero aggiungere altre strutture dati come i set e i sorted set, implementare la persistenza su disco con snapshot periodici o un log delle operazioni, introdurre il meccanismo di pubblicazione e sottoscrizione, oppure sostituire il modello a thread con un event loop basato su selectors o su asyncio per avvicinarsi all'architettura reale di Redis. Ognuna di queste estensioni si innesta naturalmente sulle fondamenta che abbiamo posto, e affrontarle è il modo migliore per trasformare la comprensione teorica in padronanza concreta.