Nella programmazione moderna, la gestione efficiente dei dati è un requisito fondamentale. Che si tratti di elaborare file di grandi dimensioni, leggere dati da una rete, o costruire pipeline di trasformazione, il concetto di stream (flusso) rappresenta uno degli strumenti più potenti a disposizione del programmatore Python. Uno stream è essenzialmente una sequenza di dati resi disponibili nel tempo, che possono essere letti o scritti in modo incrementale, senza la necessità di caricare l'intero contenuto in memoria.
Python offre un ecosistema ricco per lavorare con gli stream, dal modulo built-in io fino a librerie specializzate per stream asincroni come asyncio. In questo articolo esploreremo i concetti fondamentali, i tipi di stream disponibili, le tecniche di composizione e i casi d'uso pratici.
Cosa sono gli Stream
Un stream è un'astrazione che rappresenta una sorgente o destinazione di dati sequenziali. A differenza di una struttura dati statica come una lista o un dizionario, uno stream non richiede che tutti i dati siano presenti in memoria contemporaneamente. I dati vengono prodotti e consumati in modo continuo, elemento dopo elemento o blocco dopo blocco.
Gli stream si distinguono in due categorie principali:
- Stream di lettura (readable streams): sorgenti da cui si leggono dati, come un file aperto, una connessione di rete, o un generatore.
- Stream di scrittura (writable streams): destinazioni verso cui si scrivono dati, come un file, un buffer in memoria, o un socket.
Esistono anche gli stream bidirezionali, che supportano sia la lettura che la scrittura, e gli stream di trasformazione, che leggono dati da una sorgente, li elaborano e li inviano a una destinazione.
Il Modulo io
Il modulo io è il cuore della gestione degli stream in Python. Fornisce le classi base per tutti i tipi di stream e definisce l'interfaccia standard che ogni oggetto file-like deve implementare.
Gerarchia delle classi
Il modulo io organizza gli stream in tre famiglie principali:
- Raw I/O: accesso diretto al sistema operativo, senza buffering.
- Buffered I/O: aggiunge un buffer in memoria per migliorare le prestazioni.
- Text I/O: gestisce la codifica e decodifica del testo.
import io
# Stream binario in memoria
binary_stream = io.BytesIO(b"Dati binari di esempio")
# Lettura dal buffer in memoria
data = binary_stream.read()
print(data) # b'Dati binari di esempio'
# Riposizionamento del cursore all'inizio
binary_stream.seek(0)
# Lettura di una parte dei dati
chunk = binary_stream.read(5)
print(chunk) # b'Dati '
# Stream testuale in memoria
text_stream = io.StringIO("Testo di esempio\nSeconda riga")
# Lettura riga per riga
for line in text_stream:
print(line.strip())
BytesIO e StringIO sono particolarmente utili nei test, quando si vuole simulare la presenza di un file senza crearlo fisicamente sul disco.
Stream su File
La funzione built-in open() restituisce un oggetto stream che appartiene alla gerarchia definita da io. A seconda delle modalità di apertura, si ottiene un'istanza di RawIOBase, BufferedIOBase, o TextIOBase.
import io
def read_file_in_chunks(file_path, chunk_size=4096):
"""Legge un file binario a blocchi di dimensione fissa."""
with open(file_path, "rb") as file_stream:
# Verifica il tipo di stream restituito
print(type(file_stream)) #
while True:
chunk = file_stream.read(chunk_size)
if not chunk:
# Fine del file raggiunta
break
yield chunk
def write_lines_to_file(file_path, lines):
"""Scrive una sequenza di righe in un file di testo."""
with open(file_path, "w", encoding="utf-8") as text_stream:
for line in lines:
text_stream.write(line + "\n")
# Lettura a blocchi di un file
for block in read_file_in_chunks("data.bin"):
process(block)
Generatori come Stream
In Python, i generatori rappresentano uno dei meccanismi più eleganti per implementare stream di dati. Un generatore è una funzione che utilizza la keyword yield per produrre valori uno alla volta, sospendendo la propria esecuzione tra una produzione e l'altra. Questo li rende naturalmente adatti a modellare sequenze potenzialmente infinite o molto grandi.
from typing import Generator, Iterator
import time
def fibonacci_stream() -> Generator[int, None, None]:
"""Genera la sequenza di Fibonacci come stream infinito."""
first, second = 0, 1
while True:
yield first
# Aggiornamento dei due valori correnti
first, second = second, first + second
def take(n: int, stream: Iterator) -> list:
"""Prende i primi n elementi da uno stream."""
result = []
for _ in range(n):
result.append(next(stream))
return result
def throttled_stream(source: Iterator, delay: float) -> Generator:
"""Introduce un ritardo tra gli elementi di uno stream."""
for item in source:
time.sleep(delay)
yield item
# Utilizzo della sequenza di Fibonacci
fib = fibonacci_stream()
print(take(10, fib)) # [0, 1, 1, 2, 3, 5, 8, 13, 21, 34]
# Stream rallentato
slow_fib = throttled_stream(fibonacci_stream(), delay=0.1)
for value in take(5, slow_fib):
print(value)
Pipeline di Generatori
I generatori si compongono naturalmente in pipeline, dove l'output di uno stream diventa l'input del successivo. Questo pattern è molto efficiente dal punto di vista della memoria, perché ogni elemento viene elaborato senza che l'intera sequenza venga mai materializzata.
from typing import Iterator
def read_lines(file_path: str) -> Iterator[str]:
"""Legge le righe di un file come stream."""
with open(file_path, "r", encoding="utf-8") as f:
for line in f:
yield line.rstrip("\n")
def filter_empty(lines: Iterator[str]) -> Iterator[str]:
"""Filtra le righe vuote dallo stream."""
for line in lines:
if line.strip():
yield line
def normalize_case(lines: Iterator[str]) -> Iterator[str]:
"""Converte ogni riga in minuscolo."""
for line in lines:
yield line.lower()
def add_line_numbers(lines: Iterator[str]) -> Iterator[tuple[int, str]]:
"""Aggiunge un numero progressivo a ciascuna riga."""
for number, line in enumerate(lines, start=1):
yield number, line
# Composizione della pipeline di trasformazione
def build_pipeline(file_path: str):
raw_lines = read_lines(file_path)
non_empty = filter_empty(raw_lines)
normalized = normalize_case(non_empty)
numbered = add_line_numbers(normalized)
return numbered
# Esecuzione della pipeline
for line_number, content in build_pipeline("input.txt"):
print(f"{line_number:4d}: {content}")
Il bello di questa architettura è che nessuno dei generatori esegue lavoro finché non viene richiesto un elemento. L'intera pipeline è lazy: i dati fluiscono attraverso le trasformazioni solo quando il codice consumatore richiede il prossimo elemento.
Stream con itertools
Il modulo itertools della libreria standard fornisce una collezione di funzioni per lavorare con iteratori e stream in modo efficiente. Tutte le sue funzioni operano in modo lazy e si integrano perfettamente con i generatori.
import itertools
from typing import Iterator
def windowed(stream: Iterator, size: int) -> Iterator[tuple]:
"""Produce finestre scorrevoli di dimensione fissa sullo stream."""
# Creazione di copie indipendenti dell'iteratore
iterators = itertools.tee(stream, size)
# Avanzamento di ciascuna copia di un passo diverso
for index, iterator in enumerate(iterators):
for _ in range(index):
next(iterator, None)
return zip(*iterators)
def batch(stream: Iterator, size: int) -> Iterator[list]:
"""Raggruppa gli elementi dello stream in lotti di dimensione fissa."""
while True:
chunk = list(itertools.islice(stream, size))
if not chunk:
break
yield chunk
def interleave(*streams: Iterator) -> Iterator:
"""Alterna gli elementi di piu stream in sequenza."""
return itertools.chain.from_iterable(zip(*streams))
# Finestre scorrevoli
numbers = iter(range(10))
for window in windowed(numbers, 3):
print(window) # (0,1,2), (1,2,3), ...
# Raggruppamento in lotti
data = iter(range(10))
for group in batch(data, 3):
print(group) # [0,1,2], [3,4,5], [6,7,8], [9]
# Alternanza di stream
evens = iter([0, 2, 4, 6])
odds = iter([1, 3, 5, 7])
for value in interleave(evens, odds):
print(value) # 0, 1, 2, 3, 4, 5, 6, 7
Stream Asincroni con asyncio
Python 3.6 ha introdotto i protocolli per gli stream asincroni: AsyncIterable e AsyncIterator. Con async for e i generatori asincroni, è possibile costruire pipeline di stream che non bloccano il thread di esecuzione durante operazioni di I/O.
Generatori Asincroni
import asyncio
import aiohttp
from typing import AsyncIterator
async def fetch_pages(urls: list[str]) -> AsyncIterator[bytes]:
"""Scarica le pagine indicate dagli URL come stream asincrono."""
async with aiohttp.ClientSession() as session:
for url in urls:
async with session.get(url) as response:
# Attesa asincrona della risposta
content = await response.read()
yield content
async def chunked_response(url: str, chunk_size: int = 1024) -> AsyncIterator[bytes]:
"""Legge una risposta HTTP a blocchi senza caricarla interamente in memoria."""
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
async for chunk in response.content.iter_chunked(chunk_size):
yield chunk
async def process_stream():
"""Elabora uno stream di pagine scaricate in modo asincrono."""
urls = [
"https://example.com/page1",
"https://example.com/page2",
]
total_bytes = 0
async for page_content in fetch_pages(urls):
total_bytes += len(page_content)
# Elaborazione del contenuto
print(f"Ricevuti {len(page_content)} byte")
print(f"Totale: {total_bytes} byte")
asyncio.run(process_stream())
Stream TCP con asyncio.streams
Il modulo asyncio fornisce le classi StreamReader e StreamWriter per la gestione di connessioni TCP come stream bidirezionali.
import asyncio
async def handle_connection(
reader: asyncio.StreamReader,
writer: asyncio.StreamWriter
) -> None:
"""Gestisce una singola connessione client come stream bidirezionale."""
client_address = writer.get_extra_info("peername")
print(f"Connessione da {client_address}")
try:
while True:
# Lettura di una riga dallo stream in ingresso
line = await reader.readline()
if not line:
# Il client ha chiuso la connessione
break
message = line.decode("utf-8").strip()
print(f"Ricevuto: {message}")
# Risposta allo stream in uscita
response = f"Echo: {message}\n"
writer.write(response.encode("utf-8"))
await writer.drain() # Attesa che il buffer venga svuotato
except asyncio.IncompleteReadError:
# Connessione interrotta in modo inatteso
pass
finally:
writer.close()
await writer.wait_closed()
print(f"Connessione chiusa: {client_address}")
async def run_echo_server(host: str = "127.0.0.1", port: int = 8888) -> None:
"""Avvia un server echo che gestisce piu connessioni in parallelo."""
server = await asyncio.start_server(handle_connection, host, port)
print(f"Server in ascolto su {host}:{port}")
async with server:
await server.serve_forever()
asyncio.run(run_echo_server())
Stream Binari e Protocolli
Quando si lavora con protocolli di rete o formati binari, è spesso necessario leggere stream binari con attenzione alla struttura dei dati. Python offre il modulo struct per decodificare sequenze di byte secondo formati definiti.
import struct
import io
from dataclasses import dataclass
@dataclass
class PacketHeader:
"""Intestazione di un pacchetto binario personalizzato."""
magic: int # 4 byte, numero magico di identificazione
version: int # 2 byte, versione del protocollo
length: int # 4 byte, lunghezza del payload
# Formato: big-endian, unsigned int (4B), unsigned short (2B), unsigned int (4B)
HEADER_FORMAT = ">IHI"
HEADER_SIZE = struct.calcsize(HEADER_FORMAT)
def read_packet(stream: io.RawIOBase) -> tuple[PacketHeader, bytes] | None:
"""Legge un singolo pacchetto dallo stream binario."""
# Lettura dell'intestazione a dimensione fissa
raw_header = stream.read(HEADER_SIZE)
if len(raw_header) < HEADER_SIZE:
return None # Stream terminato o dati insufficienti
magic, version, payload_length = struct.unpack(HEADER_FORMAT, raw_header)
header = PacketHeader(magic=magic, version=version, length=payload_length)
# Lettura del payload di lunghezza variabile
payload = stream.read(payload_length)
if len(payload) < payload_length:
raise EOFError("Stream terminato prima del completamento del payload")
return header, payload
def write_packet(stream: io.RawIOBase, payload: bytes) -> None:
"""Scrive un singolo pacchetto nello stream binario."""
MAGIC = 0xDEADBEEF
VERSION = 1
# Costruzione dell'intestazione con la dimensione del payload
raw_header = struct.pack(HEADER_FORMAT, MAGIC, VERSION, len(payload))
stream.write(raw_header)
stream.write(payload)
# Esempio di utilizzo con stream in memoria
buffer = io.BytesIO()
write_packet(buffer, b"Hello, stream!")
buffer.seek(0)
result = read_packet(buffer)
if result:
header, payload = result
print(f"Versione: {header.version}, Payload: {payload}")
Trasformazioni e Compressione
Una delle operazioni più comuni sugli stream è la trasformazione dei dati in transito. Python supporta la compressione e decompressione in streaming tramite i moduli zlib, gzip, bz2 e lzma, tutti progettati per operare senza caricare l'intero dataset in memoria.
import gzip
import io
from typing import Iterator
def compress_stream(source: Iterator[bytes]) -> Iterator[bytes]:
"""Comprime i blocchi di uno stream usando gzip in modalita streaming."""
# Utilizzo di un buffer in memoria come destinazione temporanea
output_buffer = io.BytesIO()
compressor = gzip.GzipFile(fileobj=output_buffer, mode="wb")
for chunk in source:
compressor.write(chunk)
# Prelievo dei dati compressi accumulati nel buffer
compressed = output_buffer.getvalue()
if compressed:
output_buffer.seek(0)
output_buffer.truncate()
yield compressed
# Finalizzazione: scrittura del footer gzip
compressor.close()
remaining = output_buffer.getvalue()
if remaining:
yield remaining
def decompress_file_stream(file_path: str, chunk_size: int = 4096) -> Iterator[bytes]:
"""Decomprime un file gzip restituendo i blocchi decodificati come stream."""
with gzip.open(file_path, "rb") as gz_stream:
while True:
chunk = gz_stream.read(chunk_size)
if not chunk:
break
yield chunk
# Compressione di uno stream di testo
def text_to_chunks(text: str, size: int = 64) -> Iterator[bytes]:
"""Suddivide un testo in blocchi binari di dimensione fissa."""
encoded = text.encode("utf-8")
for i in range(0, len(encoded), size):
yield encoded[i:i + size]
source_text = "Questo testo verra compresso in streaming. " * 100
chunks = text_to_chunks(source_text)
compressed_chunks = list(compress_stream(chunks))
total_size = sum(len(c) for c in compressed_chunks)
print(f"Testo originale: {len(source_text)} byte")
print(f"Dopo compressione: {total_size} byte")
Backpressure e Controllo del Flusso
In un sistema a stream, il backpressure (contropressione) è il meccanismo con cui un consumatore lento segnala al produttore di rallentare. Senza un controllo del flusso, un produttore veloce potrebbe saturare la memoria con dati non ancora consumati.
import asyncio
from asyncio import Queue
async def fast_producer(queue: Queue, count: int) -> None:
"""Produce elementi a velocita elevata, rispettando la capacita della coda."""
for i in range(count):
# Attesa se la coda e piena: questo e il meccanismo di backpressure
await queue.put(i)
print(f"Prodotto: {i} (coda: {queue.qsize()}/{queue.maxsize})")
async def slow_consumer(queue: Queue) -> None:
"""Consuma elementi a velocita ridotta simulando un'elaborazione costosa."""
while True:
item = await queue.get()
if item is None:
# Segnale di terminazione ricevuto
break
# Simulazione di un'elaborazione lenta
await asyncio.sleep(0.1)
print(f"Consumato: {item}")
queue.task_done()
async def run_backpressure_demo() -> None:
"""Dimostra il backpressure con una coda a capacita limitata."""
# La coda con capacita massima 5 funge da buffer con backpressure
bounded_queue: Queue[int | None] = asyncio.Queue(maxsize=5)
producer = asyncio.create_task(fast_producer(bounded_queue, count=20))
consumer = asyncio.create_task(slow_consumer(bounded_queue))
# Attesa del completamento della produzione
await producer
# Segnale di fine al consumatore
await bounded_queue.put(None)
await consumer
asyncio.run(run_backpressure_demo())
Stream CSV e JSON Lines
I formati CSV e JSON Lines (NDJSON) si prestano particolarmente bene al trattamento in streaming perché ogni riga è un'unità di dato autonoma. Questo permette di elaborare file di qualsiasi dimensione con un utilizzo di memoria costante.
import csv
import json
import io
from typing import Iterator
def stream_csv_rows(file_path: str) -> Iterator[dict]:
"""Legge un file CSV riga per riga senza caricarlo interamente in memoria."""
with open(file_path, "r", encoding="utf-8", newline="") as csv_file:
reader = csv.DictReader(csv_file)
for row in reader:
yield dict(row)
def stream_jsonl_records(file_path: str) -> Iterator[dict]:
"""Legge un file JSON Lines (un oggetto JSON per riga) come stream."""
with open(file_path, "r", encoding="utf-8") as jsonl_file:
for line in jsonl_file:
line = line.strip()
if line:
try:
yield json.loads(line)
except json.JSONDecodeError as error:
# Riga malformata: registrazione dell'errore e continuazione
print(f"Errore parsing JSON: {error}")
def write_jsonl_stream(records: Iterator[dict], file_path: str) -> int:
"""Scrive un stream di dizionari in formato JSON Lines."""
count = 0
with open(file_path, "w", encoding="utf-8") as output_file:
for record in records:
output_file.write(json.dumps(record, ensure_ascii=False) + "\n")
count += 1
return count
def transform_csv_to_jsonl(
input_path: str,
output_path: str,
transform_fn=None
) -> int:
"""Converte un file CSV in JSON Lines applicando una trasformazione opzionale."""
csv_stream = stream_csv_rows(input_path)
if transform_fn is not None:
# Applicazione della funzione di trasformazione a ciascun record
csv_stream = (transform_fn(row) for row in csv_stream)
return write_jsonl_stream(csv_stream, output_path)
# Esempio: conversione con normalizzazione dei campi
def normalize_record(record: dict) -> dict:
"""Normalizza i valori stringa e converte i numeri nei campi numerici."""
return {
key: value.strip().lower() if isinstance(value, str) else value
for key, value in record.items()
}
written = transform_csv_to_jsonl("input.csv", "output.jsonl", normalize_record)
print(f"Convertiti {written} record")
Misurazione delle Prestazioni
Uno dei principali vantaggi dell'approccio a stream è il ridotto consumo di memoria. Per comprendere concretamente questa differenza, è utile confrontare l'elaborazione tradizionale basata su liste con quella a stream.
import tracemalloc
import time
from typing import Iterator
def sum_with_list(n: int) -> int:
"""Calcola la somma dei primi n numeri caricandoli tutti in memoria."""
# Creazione di una lista completa in memoria
numbers = list(range(n))
return sum(numbers)
def sum_with_generator(n: int) -> int:
"""Calcola la somma dei primi n numeri usando un generatore."""
# Il generatore produce i valori uno alla volta
return sum(range(n))
def measure_memory(fn, *args) -> tuple[int, float]:
"""Misura il picco di memoria e il tempo di esecuzione di una funzione."""
tracemalloc.start()
start_time = time.perf_counter()
result = fn(*args)
elapsed = time.perf_counter() - start_time
_, peak_memory = tracemalloc.get_traced_memory()
tracemalloc.stop()
return peak_memory, elapsed
N = 10_000_000
memory_list, time_list = measure_memory(sum_with_list, N)
memory_gen, time_gen = measure_memory(sum_with_generator, N)
print(f"Lista - Memoria: {memory_list / 1_048_576:.1f} MB, Tempo: {time_list:.3f} s")
print(f"Generatore - Memoria: {memory_gen / 1_048_576:.1f} MB, Tempo: {time_gen:.3f} s")
print(f"Riduzione memoria: {memory_list / max(memory_gen, 1):.0f}x")
Su un sistema tipico, il risultato mostra che il generatore utilizza una quantità di memoria trascurabile rispetto alla lista, indipendentemente dal valore di N, mentre i tempi di esecuzione rimangono comparabili.
Conclusione
Gli stream rappresentano un paradigma fondamentale della programmazione Python per chiunque voglia scrivere codice efficiente e scalabile. Dalla semplicità dei generatori alle complessità degli stream asincroni TCP, Python offre strumenti adeguati per ogni livello di complessità.
I principi chiave da tenere a mente sono: preferire l'elaborazione lazy quando possibile, comporre trasformazioni in pipeline, gestire il backpressure nelle architetture produttore-consumatore, e sfruttare asyncio per operazioni di I/O concorrenti senza bloccare il thread principale.
La padronanza degli stream non è solo una questione di prestazioni: è una forma mentis che porta a scrivere codice più componibile, testabile e leggibile, dove i dati fluiscono attraverso trasformazioni ben definite invece di accumularsi in strutture monolitiche.