Stream con Go

Go non possiede un'astrazione nativa chiamata "stream" nel senso in cui la intendono linguaggi come Java o Rust, eppure la piattaforma offre tutti i mattoni necessari per costruire pipeline di dati efficienti, componibili e concorrenti. L'interfaccia io.Reader e io.Writer, i canali (channel), le goroutine e il package bufio costituiscono insieme un ecosistema coerente che permette di trattare qualsiasi sorgente o destinazione di dati come uno stream.

Questo articolo esplora in profondità come funzionano gli stream in Go, partendo dalle interfacce fondamentali, passando per la composizione di reader e writer, fino ad arrivare agli stream basati su canali e alle tecniche per gestire flussi di dati di grandi dimensioni senza saturare la memoria.

Le interfacce fondamentali: io.Reader e io.Writer

Il nucleo del modello di streaming in Go è costituito da due interfacce minimaliste definite nel package io:

// Definizione delle interfacce fondamentali per lo streaming
type Reader interface {
    Read(buf []byte) (n int, err error)
}

type Writer interface {
    Write(buf []byte) (n int, err error)
}

La semplicità è intenzionale. Un Reader sa soltanto leggere byte in un buffer; un Writer sa soltanto scrivere byte da un buffer. Questa minimalità consente a centinaia di tipi diversi, dai file ai socket di rete, dai buffer in memoria alle risposte HTTP, di implementare la stessa interfaccia e di essere trattati in modo uniforme.

Il contratto di Read merita attenzione: il metodo restituisce il numero di byte effettivamente letti e un eventuale errore. Quando lo stream è esaurito, restituisce 0, io.EOF. Un'implementazione corretta deve essere pronta a ricevere meno byte di quanti ne abbia richiesti, anche se il buffer era sufficientemente grande.

Un Reader personalizzato

Implementare un Reader personalizzato è semplice. Il seguente esempio produce una sequenza infinita di byte ciclando su un pattern prefissato:

package main

import (
    "fmt"
    "io"
)

// CyclicReader produce ciclicamente i byte di un pattern dato
type CyclicReader struct {
    pattern []byte
    offset  int
}

// Read riempie il buffer con i byte del pattern, ciclando dall'inizio quando necessario
func (r *CyclicReader) Read(buf []byte) (int, error) {
    for i := range buf {
        buf[i] = r.pattern[r.offset%len(r.pattern)]
        r.offset++
    }
    return len(buf), nil
}

func main() {
    // Crea un reader che cicla sul pattern "ABCD"
    reader := &CyclicReader{pattern: []byte("ABCD")}

    // Legge i primi 10 byte dallo stream infinito
    buf := make([]byte, 10)
    n, err := reader.Read(buf)
    if err != nil && err != io.EOF {
        panic(err)
    }
    fmt.Printf("Letti %d byte: %s\n", n, buf[:n])
    // Stampa: Letti 10 byte: ABCDABCDAB
}

Copiare stream con io.Copy

La funzione io.Copy rappresenta il modo idiomatico di trasferire dati da un Reader a un Writer. Internamente alloca un buffer di 32 KB e ripete il ciclo read-write fino a EOF o errore. Se il Writer implementa io.ReaderFrom o il Reader implementa io.WriterTo, la funzione delega l'operazione a questi metodi, che possono sfruttare ottimizzazioni a livello di sistema operativo come sendfile.

package main

import (
    "io"
    "os"
    "strings"
)

func main() {
    // Crea un reader da una stringa in memoria
    src := strings.NewReader("Contenuto dello stream di esempio\n")

    // Copia lo stream direttamente sullo stdout
    written, err := io.Copy(os.Stdout, src)
    if err != nil {
        panic(err)
    }
    _ = written
}

Per copiare esattamente n byte si usa io.CopyN; per copiare usando un buffer fornito dall'utente si usa io.CopyBuffer, utile quando si vuole controllare le allocazioni.

Composizione con io.TeeReader e io.MultiWriter

Go favorisce la composizione. io.TeeReader crea un Reader che, mentre viene letto, scrive ogni byte anche su un Writer aggiuntivo. io.MultiWriter fa il contrario: ogni scrittura viene propagata a più writer contemporaneamente.

package main

import (
    "bytes"
    "crypto/sha256"
    "fmt"
    "io"
    "strings"
)

func main() {
    // Sorgente dati
    src := strings.NewReader("dati importanti da verificare")

    // Hasher per calcolare il digest dello stream
    hasher := sha256.New()

    // Buffer per conservare una copia dei dati
    var copyBuf bytes.Buffer

    // TeeReader: legge da src e scrive contemporaneamente in hasher
    tee := io.TeeReader(src, hasher)

    // Copia il tutto nel buffer locale
    if _, err := io.Copy(©Buf, tee); err != nil {
        panic(err)
    }

    fmt.Printf("Dati: %s\n", copyBuf.String())
    fmt.Printf("SHA-256: %x\n", hasher.Sum(nil))
}
package main

import (
    "io"
    "os"
)

func main() {
    // MultiWriter: ogni scrittura va sia su stdout che su stderr
    multi := io.MultiWriter(os.Stdout, os.Stderr)
    _, _ = fmt.Fprintln(multi, "Questo messaggio appare su entrambi gli output")
}

Lettura bufferizzata con bufio

Leggere un byte alla volta da un Reader che esegue una syscall per ogni accesso è estremamente inefficiente. Il package bufio risolve il problema avvolgendo qualsiasi Reader in un layer che legge in anticipo un blocco di byte e li serve dalla memoria.

package main

import (
    "bufio"
    "fmt"
    "strings"
)

func main() {
    raw := strings.NewReader("prima riga\nseconda riga\nterza riga\n")

    // Avvolge il reader con un buffer da 4096 byte
    scanner := bufio.NewScanner(raw)

    lineNum := 0
    for scanner.Scan() {
        lineNum++
        // Stampa ogni riga con il suo numero
        fmt.Printf("%d: %s\n", lineNum, scanner.Text())
    }

    if err := scanner.Err(); err != nil {
        panic(err)
    }
}

bufio.Scanner è particolarmente comodo per testo strutturato per righe. Per stream binari o per leggere token di dimensione arbitraria, bufio.Reader espone metodi come ReadByte, ReadRune, ReadString e ReadSlice.

Scanner con token personalizzati

La funzione di split dello scanner è configurabile tramite scanner.Split. L'esempio seguente spezza lo stream in parole separate da qualsiasi spazio bianco:

package main

import (
    "bufio"
    "fmt"
    "strings"
)

func main() {
    src := strings.NewReader("Go è un linguaggio semplice e potente")
    scanner := bufio.NewScanner(src)

    // Usa la funzione di split predefinita per le parole
    scanner.Split(bufio.ScanWords)

    var words []string
    for scanner.Scan() {
        words = append(words, scanner.Text())
    }
    fmt.Println(words)
    // Stampa: [Go è un linguaggio semplice e potente]
}

Stream basati su canali e goroutine

Quando la sorgente di dati è asincrona, ad esempio eventi di un sensore o messaggi da una coda, l'idioma Go preferito è inviare i dati su un canale da una goroutine produttrice e consumarli su un'altra goroutine ricevitrice. Questo schema si chiama pipeline.

package main

import (
    "fmt"
    "math/rand"
    "time"
)

// generate produce numeri casuali a intervalli regolari finché il canale done non viene chiuso
func generate(done <-chan struct{}) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        rng := rand.New(rand.NewSource(time.Now().UnixNano()))
        for {
            select {
            case <-done:
                return
            case out <- rng.Intn(100):
            }
        }
    }()
    return out
}

// square eleva al quadrato ogni valore ricevuto
func square(done <-chan struct{}, in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for v := range in {
            select {
            case <-done:
                return
            case out <- v * v:
            }
        }
    }()
    return out
}

func main() {
    done := make(chan struct{})
    defer close(done)

    // Costruisce la pipeline: genera -> eleva al quadrato
    nums := generate(done)
    squares := square(done, nums)

    // Consuma i primi cinque risultati
    for i := 0; i < 5; i++ {
        fmt.Println(<-squares)
    }
}

Il canale done è il meccanismo di cancellazione: chiuderlo propaga il segnale di stop a tutte le goroutine della pipeline, evitando goroutine zombie. Questa tecnica è alla base del package context, che ne formalizza e generalizza il pattern.

Pipeline con fan-out e fan-in

Quando uno stadio della pipeline è computazionalmente costoso, è vantaggioso avviare più goroutine che elaborano lo stesso canale di input in parallelo (fan-out) e raccogliere i risultati in un unico canale di output (fan-in).

package main

import (
    "fmt"
    "sync"
)

// merge fonde più canali in uno solo
func merge(done <-chan struct{}, channels ...<-chan int) <-chan int {
    var wg sync.WaitGroup
    merged := make(chan int)

    // Avvia una goroutine per ogni canale in ingresso
    forward := func(ch <-chan int) {
        defer wg.Done()
        for v := range ch {
            select {
            case <-done:
                return
            case merged <- v:
            }
        }
    }

    wg.Add(len(channels))
    for _, ch := range channels {
        go forward(ch)
    }

    // Chiude il canale merged quando tutti i forwarder hanno terminato
    go func() {
        wg.Wait()
        close(merged)
    }()

    return merged
}

// slowProcess simula un'elaborazione lenta
func slowProcess(done <-chan struct{}, in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for v := range in {
            // Simula lavoro intensivo
            result := v * v
            select {
            case <-done:
                return
            case out <- result:
            }
        }
    }()
    return out
}

func main() {
    done := make(chan struct{})
    defer close(done)

    // Sorgente
    source := make(chan int, 10)
    for i := 1; i <= 10; i++ {
        source <- i
    }
    close(source)

    // Fan-out: tre worker in parallelo
    w1 := slowProcess(done, source)
    w2 := slowProcess(done, source)
    w3 := slowProcess(done, source)

    // Fan-in: raccoglie i risultati in ordine arbitrario
    for v := range merge(done, w1, w2, w3) {
        fmt.Println(v)
    }
}

Stream HTTP con io.Pipe

io.Pipe crea una coppia sincrona reader/writer connessa da un buffer interno a dimensione zero. Ogni scrittura blocca finché non viene consumata da una lettura corrispondente. Questo rende io.Pipe ideale per connettere un produttore e un consumatore che operano su goroutine separate, senza allocare un buffer intermedio.

package main

import (
    "encoding/json"
    "fmt"
    "io"
    "net/http"
)

// Record rappresenta un singolo record di dati
type Record struct {
    ID    int    `json:"id"`
    Value string `json:"value"`
}

// streamRecords scrive record JSON sullo writer in modo incrementale
func streamRecords(w io.Writer) error {
    encoder := json.NewEncoder(w)
    records := []Record{
        {ID: 1, Value: "alpha"},
        {ID: 2, Value: "beta"},
        {ID: 3, Value: "gamma"},
    }
    for _, rec := range records {
        if err := encoder.Encode(rec); err != nil {
            return err
        }
    }
    return nil
}

func handleStream(resp http.ResponseWriter, req *http.Request) {
    pr, pw := io.Pipe()

    // Goroutine produttrice: scrive sul pipe writer
    go func() {
        err := streamRecords(pw)
        // Chiude il writer comunicando eventuale errore al reader
        pw.CloseWithError(err)
    }()

    resp.Header().Set("Content-Type", "application/x-ndjson")
    resp.Header().Set("Transfer-Encoding", "chunked")

    // Copia i dati dal pipe reader verso il client HTTP
    if _, err := io.Copy(resp, pr); err != nil {
        fmt.Printf("Errore durante la copia: %v\n", err)
    }
}

func main() {
    http.HandleFunc("/stream", handleStream)
    fmt.Println("Server in ascolto su :8080")
    if err := http.ListenAndServe(":8080", nil); err != nil {
        panic(err)
    }
}

Limitare e controllare il flusso

In contesti reali è spesso necessario imporre limiti alla velocità con cui i dati fluiscono, sia per proteggere risorse condivise, sia per rispettare quote di API esterne. La tecnica standard in Go è il token bucket, implementato dal package golang.org/x/time/rate.

package main

import (
    "context"
    "fmt"
    "time"

    "golang.org/x/time/rate"
)

// ThrottledReader avvolge un reader e limita la velocità di lettura
type ThrottledReader struct {
    source  []byte
    limiter *rate.Limiter
    offset  int
}

// Read legge dal reader interno rispettando il rate limiter
func (r *ThrottledReader) Read(buf []byte) (int, error) {
    if r.offset >= len(r.source) {
        return 0, fmt.Errorf("EOF")
    }

    // Attende il permesso dal rate limiter prima di procedere
    ctx := context.Background()
    if err := r.limiter.Wait(ctx); err != nil {
        return 0, err
    }

    chunk := len(buf)
    if r.offset+chunk > len(r.source) {
        chunk = len(r.source) - r.offset
    }
    copy(buf[:chunk], r.source[r.offset:r.offset+chunk])
    r.offset += chunk
    return chunk, nil
}

func main() {
    data := []byte("stream di dati da leggere a velocità controllata")

    // Limita a 10 token al secondo con un burst massimo di 5
    limiter := rate.NewLimiter(rate.Limit(10), 5)

    reader := &ThrottledReader{source: data, limiter: limiter}

    buf := make([]byte, 5)
    start := time.Now()
    total := 0
    for {
        n, err := reader.Read(buf)
        total += n
        if err != nil {
            break
        }
    }
    fmt.Printf("Letti %d byte in %v\n", total, time.Since(start).Round(time.Millisecond))
}

Compressione e decompressione on-the-fly

Poiché compress/gzip implementa le interfacce io.Reader e io.Writer, è possibile inserire la compressione e la decompressione direttamente nella pipeline senza buffer intermedi in memoria.

package main

import (
    "bytes"
    "compress/gzip"
    "fmt"
    "io"
    "strings"
)

func compress(src io.Reader) (io.Reader, error) {
    var buf bytes.Buffer

    // gzip.Writer scrive dati compressi nel buffer
    gz := gzip.NewWriter(&buf)
    if _, err := io.Copy(gz, src); err != nil {
        return nil, err
    }
    // Flush e chiusura sono necessari per completare il blocco gzip
    if err := gz.Close(); err != nil {
        return nil, err
    }
    return &buf, nil
}

func decompress(src io.Reader) (io.Reader, error) {
    // gzip.Reader decomprime on-the-fly mentre viene letto
    return gzip.NewReader(src)
}

func main() {
    original := "Questo testo verrà compresso e poi decompresso interamente via stream."

    // Comprime lo stream
    compressed, err := compress(strings.NewReader(original))
    if err != nil {
        panic(err)
    }

    // Decomprime lo stream
    decompressed, err := decompress(compressed)
    if err != nil {
        panic(err)
    }

    result, err := io.ReadAll(decompressed)
    if err != nil {
        panic(err)
    }

    fmt.Println(string(result))
    fmt.Println("Uguale all'originale:", string(result) == original)
}

Contesto e cancellazione negli stream

Quando uno stream è di lunga durata, ad esempio il download di un file di grandi dimensioni o la lettura da un socket con timeout, è essenziale poter annullare l'operazione in modo pulito. Il package context fornisce questo meccanismo. L'esempio seguente implementa un reader che rispetta la cancellazione del contesto verificandola ad ogni chunk.

package main

import (
    "context"
    "fmt"
    "io"
    "strings"
    "time"
)

// ContextReader avvolge un Reader e interrompe la lettura se il contesto viene cancellato
type ContextReader struct {
    ctx    context.Context
    source io.Reader
}

// Read controlla il contesto prima di ogni operazione di lettura
func (r *ContextReader) Read(buf []byte) (int, error) {
    select {
    case <-r.ctx.Done():
        // Il contesto è stato cancellato: restituisce l'errore appropriato
        return 0, r.ctx.Err()
    default:
    }
    return r.source.Read(buf)
}

func main() {
    // Contesto con timeout di 50 millisecondi
    ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
    defer cancel()

    // Simula uno stream di grandi dimensioni
    largeData := strings.Repeat("x", 1_000_000)
    reader := &ContextReader{
        ctx:    ctx,
        source: strings.NewReader(largeData),
    }

    buf := make([]byte, 1024)
    total := 0
    for {
        n, err := reader.Read(buf)
        total += n
        if err != nil {
            fmt.Printf("Lettura interrotta dopo %d byte: %v\n", total, err)
            break
        }
        // Simula un'elaborazione lenta per ogni chunk
        time.Sleep(1 * time.Millisecond)
    }
}

Stream di grandi file con io.ReadAll e alternative

io.ReadAll legge l'intero contenuto di un Reader in memoria. Per file piccoli è conveniente, ma per file di grandi dimensioni è una scelta rischiosa: un file da 4 GB richiede 4 GB di RAM. L'alternativa è processare il file in chunk senza mai materializzare l'intero contenuto.

package main

import (
    "bufio"
    "fmt"
    "os"
)

// countLines conta le righe di un file senza caricarlo interamente in memoria
func countLines(path string) (int, error) {
    f, err := os.Open(path)
    if err != nil {
        return 0, err
    }
    defer f.Close()

    // Scanner legge una riga alla volta con un buffer interno
    scanner := bufio.NewScanner(f)

    // Imposta un buffer massimo di 1 MB per riga
    scanner.Buffer(make([]byte, 64*1024), 1024*1024)

    count := 0
    for scanner.Scan() {
        count++
    }
    return count, scanner.Err()
}

func main() {
    // Crea un file di test temporaneo
    f, _ := os.CreateTemp("", "test*.txt")
    for i := 0; i < 1_000_000; i++ {
        fmt.Fprintf(f, "riga numero %d\n", i+1)
    }
    f.Close()
    defer os.Remove(f.Name())

    n, err := countLines(f.Name())
    if err != nil {
        panic(err)
    }
    fmt.Printf("Righe trovate: %d\n", n)
}

Transform streams con io.Pipe e goroutine

Un pattern ricorrente è la trasformazione di uno stream in un altro: convertire la codifica del testo, cifrare i dati, sostituire pattern. In Go questo si realizza combinando io.Pipe con una goroutine che legge dallo stream originale, trasforma i dati e li scrive sul pipe writer.

package main

import (
    "bufio"
    "fmt"
    "io"
    "strings"
    "unicode"
)

// upperCaseStream restituisce un Reader che trasforma il testo in maiuscolo on-the-fly
func upperCaseStream(src io.Reader) io.Reader {
    pr, pw := io.Pipe()

    go func() {
        scanner := bufio.NewScanner(src)
        for scanner.Scan() {
            // Converte ogni riga in maiuscolo e la scrive sul writer
            line := strings.Map(unicode.ToUpper, scanner.Text())
            if _, err := fmt.Fprintln(pw, line); err != nil {
                pw.CloseWithError(err)
                return
            }
        }
        pw.CloseWithError(scanner.Err())
    }()

    return pr
}

func main() {
    src := strings.NewReader("ciao mondo\ngo è fantastico\n")
    upper := upperCaseStream(src)

    result, err := io.ReadAll(upper)
    if err != nil {
        panic(err)
    }
    fmt.Print(string(result))
    // Stampa:
    // CIAO MONDO
    // GO È FANTASTICO
}

Gestione degli errori negli stream

Gli errori negli stream hanno natura diversa dagli errori ordinari in Go. io.EOF non è un errore nel senso comune: segnala semplicemente la fine dei dati. io.ErrUnexpectedEOF invece indica che lo stream si è chiuso nel mezzo di un'operazione che richiedeva più byte. La distinzione è cruciale quando si costruiscono parser o protocolli binari.

package main

import (
    "encoding/binary"
    "errors"
    "fmt"
    "io"
    "strings"
)

// readUint32 legge un intero a 32 bit in big-endian dallo stream
func readUint32(r io.Reader) (uint32, error) {
    buf := make([]byte, 4)
    _, err := io.ReadFull(r, buf)
    if err != nil {
        if errors.Is(err, io.EOF) {
            // EOF all'inizio di un record: fine legittima dello stream
            return 0, io.EOF
        }
        if errors.Is(err, io.ErrUnexpectedEOF) {
            // EOF nel mezzo di un record: errore di protocollo
            return 0, fmt.Errorf("record incompleto: %w", err)
        }
        return 0, err
    }
    return binary.BigEndian.Uint32(buf), nil
}

func main() {
    // Simula uno stream con due record completi e uno troncato
    data := "\x00\x00\x00\x2A\x00\x00\x01\x00\x00\x01"
    r := strings.NewReader(data)

    for {
        v, err := readUint32(r)
        if errors.Is(err, io.EOF) {
            fmt.Println("Fine dello stream")
            break
        }
        if err != nil {
            fmt.Printf("Errore: %v\n", err)
            break
        }
        fmt.Printf("Valore letto: %d\n", v)
    }
}

Benchmark e considerazioni sulle performance

La dimensione del buffer è il parametro con maggiore impatto sulle performance di I/O. Buffer troppo piccoli moltiplicano le syscall; buffer troppo grandi sprecano memoria e peggiorano la cache locality. In assenza di misurazioni, 4 KB o 8 KB sono buoni punti di partenza per I/O su disco, mentre 32 KB o 64 KB tendono ad essere ottimali per trasferimenti di rete.

package main

import (
    "io"
    "os"
    "testing"
)

// BenchmarkCopyBufferSize misura l'impatto della dimensione del buffer sulla copia di file
func BenchmarkCopyBufferSize(b *testing.B) {
    sizes := []int{512, 4 * 1024, 32 * 1024, 256 * 1024}

    for _, size := range sizes {
        b.Run(fmt.Sprintf("buf=%d", size), func(b *testing.B) {
            buf := make([]byte, size)
            b.ResetTimer()
            for i := 0; i < b.N; i++ {
                src, _ := os.Open("/dev/zero")
                dst, _ := os.OpenFile("/dev/null", os.O_WRONLY, 0)

                // Copia 10 MB con il buffer specificato
                limited := io.LimitReader(src, 10*1024*1024)
                io.CopyBuffer(dst, limited, buf)

                src.Close()
                dst.Close()
            }
        })
    }
}

Per ridurre le allocazioni nelle pipeline ad alto throughput conviene riusare i buffer tramite sync.Pool. Il pool restituisce buffer già allocati evitando il garbage collector.

package main

import (
    "io"
    "sync"
)

var bufPool = sync.Pool{
    // Crea un nuovo buffer da 32 KB quando il pool è vuoto
    New: func() any {
        buf := make([]byte, 32*1024)
        return &buf
    },
}

// copyWithPool copia da src a dst riusando buffer dal pool
func copyWithPool(dst io.Writer, src io.Reader) (int64, error) {
    bufPtr := bufPool.Get().(*[]byte)
    defer bufPool.Put(bufPtr)

    return io.CopyBuffer(dst, src, *bufPtr)
}

Conclusioni

Gli stream in Go non sono una feature isolata ma un modo di pensare al flusso dei dati: composizione di interfacce minimaliste, goroutine per il parallelismo, canali per la comunicazione e il backpressure, contesti per la cancellazione. La filosofia del linguaggio si riflette anche qui: niente magia, tutto esplicito, tutto componibile.

Partendo da io.Reader e io.Writer è possibile costruire pipeline sofisticate che gestiscono file da terabyte, stream di rete ad alto throughput o flussi di eventi in tempo reale, mantenendo il codice leggibile e il consumo di memoria sotto controllo. La chiave è non materializzare mai più dati di quelli necessari all'operazione corrente.

Torna su