Stream con Python

Nella programmazione moderna, la gestione efficiente dei dati è un requisito fondamentale. Che si tratti di elaborare file di grandi dimensioni, leggere dati da una rete, o costruire pipeline di trasformazione, il concetto di stream (flusso) rappresenta uno degli strumenti più potenti a disposizione del programmatore Python. Uno stream è essenzialmente una sequenza di dati resi disponibili nel tempo, che possono essere letti o scritti in modo incrementale, senza la necessità di caricare l'intero contenuto in memoria.

Python offre un ecosistema ricco per lavorare con gli stream, dal modulo built-in io fino a librerie specializzate per stream asincroni come asyncio. In questo articolo esploreremo i concetti fondamentali, i tipi di stream disponibili, le tecniche di composizione e i casi d'uso pratici.

Cosa sono gli Stream

Un stream è un'astrazione che rappresenta una sorgente o destinazione di dati sequenziali. A differenza di una struttura dati statica come una lista o un dizionario, uno stream non richiede che tutti i dati siano presenti in memoria contemporaneamente. I dati vengono prodotti e consumati in modo continuo, elemento dopo elemento o blocco dopo blocco.

Gli stream si distinguono in due categorie principali:

  • Stream di lettura (readable streams): sorgenti da cui si leggono dati, come un file aperto, una connessione di rete, o un generatore.
  • Stream di scrittura (writable streams): destinazioni verso cui si scrivono dati, come un file, un buffer in memoria, o un socket.

Esistono anche gli stream bidirezionali, che supportano sia la lettura che la scrittura, e gli stream di trasformazione, che leggono dati da una sorgente, li elaborano e li inviano a una destinazione.

Il Modulo io

Il modulo io è il cuore della gestione degli stream in Python. Fornisce le classi base per tutti i tipi di stream e definisce l'interfaccia standard che ogni oggetto file-like deve implementare.

Gerarchia delle classi

Il modulo io organizza gli stream in tre famiglie principali:

  • Raw I/O: accesso diretto al sistema operativo, senza buffering.
  • Buffered I/O: aggiunge un buffer in memoria per migliorare le prestazioni.
  • Text I/O: gestisce la codifica e decodifica del testo.
import io

# Stream binario in memoria
binary_stream = io.BytesIO(b"Dati binari di esempio")

# Lettura dal buffer in memoria
data = binary_stream.read()
print(data)  # b'Dati binari di esempio'

# Riposizionamento del cursore all'inizio
binary_stream.seek(0)

# Lettura di una parte dei dati
chunk = binary_stream.read(5)
print(chunk)  # b'Dati '

# Stream testuale in memoria
text_stream = io.StringIO("Testo di esempio\nSeconda riga")

# Lettura riga per riga
for line in text_stream:
    print(line.strip())

BytesIO e StringIO sono particolarmente utili nei test, quando si vuole simulare la presenza di un file senza crearlo fisicamente sul disco.

Stream su File

La funzione built-in open() restituisce un oggetto stream che appartiene alla gerarchia definita da io. A seconda delle modalità di apertura, si ottiene un'istanza di RawIOBase, BufferedIOBase, o TextIOBase.

import io

def read_file_in_chunks(file_path, chunk_size=4096):
    """Legge un file binario a blocchi di dimensione fissa."""
    with open(file_path, "rb") as file_stream:
        # Verifica il tipo di stream restituito
        print(type(file_stream))  # 

        while True:
            chunk = file_stream.read(chunk_size)
            if not chunk:
                # Fine del file raggiunta
                break
            yield chunk

def write_lines_to_file(file_path, lines):
    """Scrive una sequenza di righe in un file di testo."""
    with open(file_path, "w", encoding="utf-8") as text_stream:
        for line in lines:
            text_stream.write(line + "\n")

# Lettura a blocchi di un file
for block in read_file_in_chunks("data.bin"):
    process(block)

Generatori come Stream

In Python, i generatori rappresentano uno dei meccanismi più eleganti per implementare stream di dati. Un generatore è una funzione che utilizza la keyword yield per produrre valori uno alla volta, sospendendo la propria esecuzione tra una produzione e l'altra. Questo li rende naturalmente adatti a modellare sequenze potenzialmente infinite o molto grandi.

from typing import Generator, Iterator
import time

def fibonacci_stream() -> Generator[int, None, None]:
    """Genera la sequenza di Fibonacci come stream infinito."""
    first, second = 0, 1
    while True:
        yield first
        # Aggiornamento dei due valori correnti
        first, second = second, first + second

def take(n: int, stream: Iterator) -> list:
    """Prende i primi n elementi da uno stream."""
    result = []
    for _ in range(n):
        result.append(next(stream))
    return result

def throttled_stream(source: Iterator, delay: float) -> Generator:
    """Introduce un ritardo tra gli elementi di uno stream."""
    for item in source:
        time.sleep(delay)
        yield item

# Utilizzo della sequenza di Fibonacci
fib = fibonacci_stream()
print(take(10, fib))  # [0, 1, 1, 2, 3, 5, 8, 13, 21, 34]

# Stream rallentato
slow_fib = throttled_stream(fibonacci_stream(), delay=0.1)
for value in take(5, slow_fib):
    print(value)

Pipeline di Generatori

I generatori si compongono naturalmente in pipeline, dove l'output di uno stream diventa l'input del successivo. Questo pattern è molto efficiente dal punto di vista della memoria, perché ogni elemento viene elaborato senza che l'intera sequenza venga mai materializzata.

from typing import Iterator

def read_lines(file_path: str) -> Iterator[str]:
    """Legge le righe di un file come stream."""
    with open(file_path, "r", encoding="utf-8") as f:
        for line in f:
            yield line.rstrip("\n")

def filter_empty(lines: Iterator[str]) -> Iterator[str]:
    """Filtra le righe vuote dallo stream."""
    for line in lines:
        if line.strip():
            yield line

def normalize_case(lines: Iterator[str]) -> Iterator[str]:
    """Converte ogni riga in minuscolo."""
    for line in lines:
        yield line.lower()

def add_line_numbers(lines: Iterator[str]) -> Iterator[tuple[int, str]]:
    """Aggiunge un numero progressivo a ciascuna riga."""
    for number, line in enumerate(lines, start=1):
        yield number, line

# Composizione della pipeline di trasformazione
def build_pipeline(file_path: str):
    raw_lines = read_lines(file_path)
    non_empty = filter_empty(raw_lines)
    normalized = normalize_case(non_empty)
    numbered = add_line_numbers(normalized)
    return numbered

# Esecuzione della pipeline
for line_number, content in build_pipeline("input.txt"):
    print(f"{line_number:4d}: {content}")

Il bello di questa architettura è che nessuno dei generatori esegue lavoro finché non viene richiesto un elemento. L'intera pipeline è lazy: i dati fluiscono attraverso le trasformazioni solo quando il codice consumatore richiede il prossimo elemento.

Stream con itertools

Il modulo itertools della libreria standard fornisce una collezione di funzioni per lavorare con iteratori e stream in modo efficiente. Tutte le sue funzioni operano in modo lazy e si integrano perfettamente con i generatori.

import itertools
from typing import Iterator

def windowed(stream: Iterator, size: int) -> Iterator[tuple]:
    """Produce finestre scorrevoli di dimensione fissa sullo stream."""
    # Creazione di copie indipendenti dell'iteratore
    iterators = itertools.tee(stream, size)
    # Avanzamento di ciascuna copia di un passo diverso
    for index, iterator in enumerate(iterators):
        for _ in range(index):
            next(iterator, None)
    return zip(*iterators)

def batch(stream: Iterator, size: int) -> Iterator[list]:
    """Raggruppa gli elementi dello stream in lotti di dimensione fissa."""
    while True:
        chunk = list(itertools.islice(stream, size))
        if not chunk:
            break
        yield chunk

def interleave(*streams: Iterator) -> Iterator:
    """Alterna gli elementi di piu stream in sequenza."""
    return itertools.chain.from_iterable(zip(*streams))

# Finestre scorrevoli
numbers = iter(range(10))
for window in windowed(numbers, 3):
    print(window)  # (0,1,2), (1,2,3), ...

# Raggruppamento in lotti
data = iter(range(10))
for group in batch(data, 3):
    print(group)  # [0,1,2], [3,4,5], [6,7,8], [9]

# Alternanza di stream
evens = iter([0, 2, 4, 6])
odds = iter([1, 3, 5, 7])
for value in interleave(evens, odds):
    print(value)  # 0, 1, 2, 3, 4, 5, 6, 7

Stream Asincroni con asyncio

Python 3.6 ha introdotto i protocolli per gli stream asincroni: AsyncIterable e AsyncIterator. Con async for e i generatori asincroni, è possibile costruire pipeline di stream che non bloccano il thread di esecuzione durante operazioni di I/O.

Generatori Asincroni

import asyncio
import aiohttp
from typing import AsyncIterator

async def fetch_pages(urls: list[str]) -> AsyncIterator[bytes]:
    """Scarica le pagine indicate dagli URL come stream asincrono."""
    async with aiohttp.ClientSession() as session:
        for url in urls:
            async with session.get(url) as response:
                # Attesa asincrona della risposta
                content = await response.read()
                yield content

async def chunked_response(url: str, chunk_size: int = 1024) -> AsyncIterator[bytes]:
    """Legge una risposta HTTP a blocchi senza caricarla interamente in memoria."""
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            async for chunk in response.content.iter_chunked(chunk_size):
                yield chunk

async def process_stream():
    """Elabora uno stream di pagine scaricate in modo asincrono."""
    urls = [
        "https://example.com/page1",
        "https://example.com/page2",
    ]

    total_bytes = 0
    async for page_content in fetch_pages(urls):
        total_bytes += len(page_content)
        # Elaborazione del contenuto
        print(f"Ricevuti {len(page_content)} byte")

    print(f"Totale: {total_bytes} byte")

asyncio.run(process_stream())

Stream TCP con asyncio.streams

Il modulo asyncio fornisce le classi StreamReader e StreamWriter per la gestione di connessioni TCP come stream bidirezionali.

import asyncio

async def handle_connection(
    reader: asyncio.StreamReader,
    writer: asyncio.StreamWriter
) -> None:
    """Gestisce una singola connessione client come stream bidirezionale."""
    client_address = writer.get_extra_info("peername")
    print(f"Connessione da {client_address}")

    try:
        while True:
            # Lettura di una riga dallo stream in ingresso
            line = await reader.readline()
            if not line:
                # Il client ha chiuso la connessione
                break

            message = line.decode("utf-8").strip()
            print(f"Ricevuto: {message}")

            # Risposta allo stream in uscita
            response = f"Echo: {message}\n"
            writer.write(response.encode("utf-8"))
            await writer.drain()  # Attesa che il buffer venga svuotato

    except asyncio.IncompleteReadError:
        # Connessione interrotta in modo inatteso
        pass
    finally:
        writer.close()
        await writer.wait_closed()
        print(f"Connessione chiusa: {client_address}")

async def run_echo_server(host: str = "127.0.0.1", port: int = 8888) -> None:
    """Avvia un server echo che gestisce piu connessioni in parallelo."""
    server = await asyncio.start_server(handle_connection, host, port)
    print(f"Server in ascolto su {host}:{port}")

    async with server:
        await server.serve_forever()

asyncio.run(run_echo_server())

Stream Binari e Protocolli

Quando si lavora con protocolli di rete o formati binari, è spesso necessario leggere stream binari con attenzione alla struttura dei dati. Python offre il modulo struct per decodificare sequenze di byte secondo formati definiti.

import struct
import io
from dataclasses import dataclass

@dataclass
class PacketHeader:
    """Intestazione di un pacchetto binario personalizzato."""
    magic: int       # 4 byte, numero magico di identificazione
    version: int     # 2 byte, versione del protocollo
    length: int      # 4 byte, lunghezza del payload

# Formato: big-endian, unsigned int (4B), unsigned short (2B), unsigned int (4B)
HEADER_FORMAT = ">IHI"
HEADER_SIZE = struct.calcsize(HEADER_FORMAT)

def read_packet(stream: io.RawIOBase) -> tuple[PacketHeader, bytes] | None:
    """Legge un singolo pacchetto dallo stream binario."""
    # Lettura dell'intestazione a dimensione fissa
    raw_header = stream.read(HEADER_SIZE)
    if len(raw_header) < HEADER_SIZE:
        return None  # Stream terminato o dati insufficienti

    magic, version, payload_length = struct.unpack(HEADER_FORMAT, raw_header)
    header = PacketHeader(magic=magic, version=version, length=payload_length)

    # Lettura del payload di lunghezza variabile
    payload = stream.read(payload_length)
    if len(payload) < payload_length:
        raise EOFError("Stream terminato prima del completamento del payload")

    return header, payload

def write_packet(stream: io.RawIOBase, payload: bytes) -> None:
    """Scrive un singolo pacchetto nello stream binario."""
    MAGIC = 0xDEADBEEF
    VERSION = 1

    # Costruzione dell'intestazione con la dimensione del payload
    raw_header = struct.pack(HEADER_FORMAT, MAGIC, VERSION, len(payload))
    stream.write(raw_header)
    stream.write(payload)

# Esempio di utilizzo con stream in memoria
buffer = io.BytesIO()
write_packet(buffer, b"Hello, stream!")
buffer.seek(0)

result = read_packet(buffer)
if result:
    header, payload = result
    print(f"Versione: {header.version}, Payload: {payload}")

Trasformazioni e Compressione

Una delle operazioni più comuni sugli stream è la trasformazione dei dati in transito. Python supporta la compressione e decompressione in streaming tramite i moduli zlib, gzip, bz2 e lzma, tutti progettati per operare senza caricare l'intero dataset in memoria.

import gzip
import io
from typing import Iterator

def compress_stream(source: Iterator[bytes]) -> Iterator[bytes]:
    """Comprime i blocchi di uno stream usando gzip in modalita streaming."""
    # Utilizzo di un buffer in memoria come destinazione temporanea
    output_buffer = io.BytesIO()
    compressor = gzip.GzipFile(fileobj=output_buffer, mode="wb")

    for chunk in source:
        compressor.write(chunk)
        # Prelievo dei dati compressi accumulati nel buffer
        compressed = output_buffer.getvalue()
        if compressed:
            output_buffer.seek(0)
            output_buffer.truncate()
            yield compressed

    # Finalizzazione: scrittura del footer gzip
    compressor.close()
    remaining = output_buffer.getvalue()
    if remaining:
        yield remaining

def decompress_file_stream(file_path: str, chunk_size: int = 4096) -> Iterator[bytes]:
    """Decomprime un file gzip restituendo i blocchi decodificati come stream."""
    with gzip.open(file_path, "rb") as gz_stream:
        while True:
            chunk = gz_stream.read(chunk_size)
            if not chunk:
                break
            yield chunk

# Compressione di uno stream di testo
def text_to_chunks(text: str, size: int = 64) -> Iterator[bytes]:
    """Suddivide un testo in blocchi binari di dimensione fissa."""
    encoded = text.encode("utf-8")
    for i in range(0, len(encoded), size):
        yield encoded[i:i + size]

source_text = "Questo testo verra compresso in streaming. " * 100
chunks = text_to_chunks(source_text)
compressed_chunks = list(compress_stream(chunks))
total_size = sum(len(c) for c in compressed_chunks)
print(f"Testo originale: {len(source_text)} byte")
print(f"Dopo compressione: {total_size} byte")

Backpressure e Controllo del Flusso

In un sistema a stream, il backpressure (contropressione) è il meccanismo con cui un consumatore lento segnala al produttore di rallentare. Senza un controllo del flusso, un produttore veloce potrebbe saturare la memoria con dati non ancora consumati.

import asyncio
from asyncio import Queue

async def fast_producer(queue: Queue, count: int) -> None:
    """Produce elementi a velocita elevata, rispettando la capacita della coda."""
    for i in range(count):
        # Attesa se la coda e piena: questo e il meccanismo di backpressure
        await queue.put(i)
        print(f"Prodotto: {i} (coda: {queue.qsize()}/{queue.maxsize})")

async def slow_consumer(queue: Queue) -> None:
    """Consuma elementi a velocita ridotta simulando un'elaborazione costosa."""
    while True:
        item = await queue.get()
        if item is None:
            # Segnale di terminazione ricevuto
            break
        # Simulazione di un'elaborazione lenta
        await asyncio.sleep(0.1)
        print(f"Consumato: {item}")
        queue.task_done()

async def run_backpressure_demo() -> None:
    """Dimostra il backpressure con una coda a capacita limitata."""
    # La coda con capacita massima 5 funge da buffer con backpressure
    bounded_queue: Queue[int | None] = asyncio.Queue(maxsize=5)

    producer = asyncio.create_task(fast_producer(bounded_queue, count=20))
    consumer = asyncio.create_task(slow_consumer(bounded_queue))

    # Attesa del completamento della produzione
    await producer
    # Segnale di fine al consumatore
    await bounded_queue.put(None)
    await consumer

asyncio.run(run_backpressure_demo())

Stream CSV e JSON Lines

I formati CSV e JSON Lines (NDJSON) si prestano particolarmente bene al trattamento in streaming perché ogni riga è un'unità di dato autonoma. Questo permette di elaborare file di qualsiasi dimensione con un utilizzo di memoria costante.

import csv
import json
import io
from typing import Iterator

def stream_csv_rows(file_path: str) -> Iterator[dict]:
    """Legge un file CSV riga per riga senza caricarlo interamente in memoria."""
    with open(file_path, "r", encoding="utf-8", newline="") as csv_file:
        reader = csv.DictReader(csv_file)
        for row in reader:
            yield dict(row)

def stream_jsonl_records(file_path: str) -> Iterator[dict]:
    """Legge un file JSON Lines (un oggetto JSON per riga) come stream."""
    with open(file_path, "r", encoding="utf-8") as jsonl_file:
        for line in jsonl_file:
            line = line.strip()
            if line:
                try:
                    yield json.loads(line)
                except json.JSONDecodeError as error:
                    # Riga malformata: registrazione dell'errore e continuazione
                    print(f"Errore parsing JSON: {error}")

def write_jsonl_stream(records: Iterator[dict], file_path: str) -> int:
    """Scrive un stream di dizionari in formato JSON Lines."""
    count = 0
    with open(file_path, "w", encoding="utf-8") as output_file:
        for record in records:
            output_file.write(json.dumps(record, ensure_ascii=False) + "\n")
            count += 1
    return count

def transform_csv_to_jsonl(
    input_path: str,
    output_path: str,
    transform_fn=None
) -> int:
    """Converte un file CSV in JSON Lines applicando una trasformazione opzionale."""
    csv_stream = stream_csv_rows(input_path)

    if transform_fn is not None:
        # Applicazione della funzione di trasformazione a ciascun record
        csv_stream = (transform_fn(row) for row in csv_stream)

    return write_jsonl_stream(csv_stream, output_path)

# Esempio: conversione con normalizzazione dei campi
def normalize_record(record: dict) -> dict:
    """Normalizza i valori stringa e converte i numeri nei campi numerici."""
    return {
        key: value.strip().lower() if isinstance(value, str) else value
        for key, value in record.items()
    }

written = transform_csv_to_jsonl("input.csv", "output.jsonl", normalize_record)
print(f"Convertiti {written} record")

Misurazione delle Prestazioni

Uno dei principali vantaggi dell'approccio a stream è il ridotto consumo di memoria. Per comprendere concretamente questa differenza, è utile confrontare l'elaborazione tradizionale basata su liste con quella a stream.

import tracemalloc
import time
from typing import Iterator

def sum_with_list(n: int) -> int:
    """Calcola la somma dei primi n numeri caricandoli tutti in memoria."""
    # Creazione di una lista completa in memoria
    numbers = list(range(n))
    return sum(numbers)

def sum_with_generator(n: int) -> int:
    """Calcola la somma dei primi n numeri usando un generatore."""
    # Il generatore produce i valori uno alla volta
    return sum(range(n))

def measure_memory(fn, *args) -> tuple[int, float]:
    """Misura il picco di memoria e il tempo di esecuzione di una funzione."""
    tracemalloc.start()
    start_time = time.perf_counter()

    result = fn(*args)

    elapsed = time.perf_counter() - start_time
    _, peak_memory = tracemalloc.get_traced_memory()
    tracemalloc.stop()

    return peak_memory, elapsed

N = 10_000_000

memory_list, time_list = measure_memory(sum_with_list, N)
memory_gen, time_gen = measure_memory(sum_with_generator, N)

print(f"Lista    - Memoria: {memory_list / 1_048_576:.1f} MB, Tempo: {time_list:.3f} s")
print(f"Generatore - Memoria: {memory_gen / 1_048_576:.1f} MB, Tempo: {time_gen:.3f} s")
print(f"Riduzione memoria: {memory_list / max(memory_gen, 1):.0f}x")

Su un sistema tipico, il risultato mostra che il generatore utilizza una quantità di memoria trascurabile rispetto alla lista, indipendentemente dal valore di N, mentre i tempi di esecuzione rimangono comparabili.

Conclusione

Gli stream rappresentano un paradigma fondamentale della programmazione Python per chiunque voglia scrivere codice efficiente e scalabile. Dalla semplicità dei generatori alle complessità degli stream asincroni TCP, Python offre strumenti adeguati per ogni livello di complessità.

I principi chiave da tenere a mente sono: preferire l'elaborazione lazy quando possibile, comporre trasformazioni in pipeline, gestire il backpressure nelle architetture produttore-consumatore, e sfruttare asyncio per operazioni di I/O concorrenti senza bloccare il thread principale.

La padronanza degli stream non è solo una questione di prestazioni: è una forma mentis che porta a scrivere codice più componibile, testabile e leggibile, dove i dati fluiscono attraverso trasformazioni ben definite invece di accumularsi in strutture monolitiche.

Torna su