Go non possiede un'astrazione nativa chiamata "stream" nel senso in cui la intendono linguaggi come Java o Rust,
eppure la piattaforma offre tutti i mattoni necessari per costruire pipeline di dati efficienti, componibili e
concorrenti. L'interfaccia io.Reader e io.Writer, i canali (channel), le goroutine
e il package bufio costituiscono insieme un ecosistema coerente che permette di trattare qualsiasi
sorgente o destinazione di dati come uno stream.
Questo articolo esplora in profondità come funzionano gli stream in Go, partendo dalle interfacce fondamentali, passando per la composizione di reader e writer, fino ad arrivare agli stream basati su canali e alle tecniche per gestire flussi di dati di grandi dimensioni senza saturare la memoria.
Le interfacce fondamentali: io.Reader e io.Writer
Il nucleo del modello di streaming in Go è costituito da due interfacce minimaliste definite nel package io:
// Definizione delle interfacce fondamentali per lo streaming
type Reader interface {
Read(buf []byte) (n int, err error)
}
type Writer interface {
Write(buf []byte) (n int, err error)
}
La semplicità è intenzionale. Un Reader sa soltanto leggere byte in un buffer; un Writer
sa soltanto scrivere byte da un buffer. Questa minimalità consente a centinaia di tipi diversi, dai file ai socket
di rete, dai buffer in memoria alle risposte HTTP, di implementare la stessa interfaccia e di essere trattati in
modo uniforme.
Il contratto di Read merita attenzione: il metodo restituisce il numero di byte effettivamente letti e
un eventuale errore. Quando lo stream è esaurito, restituisce 0, io.EOF. Un'implementazione corretta
deve essere pronta a ricevere meno byte di quanti ne abbia richiesti, anche se il buffer era sufficientemente grande.
Un Reader personalizzato
Implementare un Reader personalizzato è semplice. Il seguente esempio produce una sequenza infinita
di byte ciclando su un pattern prefissato:
package main
import (
"fmt"
"io"
)
// CyclicReader produce ciclicamente i byte di un pattern dato
type CyclicReader struct {
pattern []byte
offset int
}
// Read riempie il buffer con i byte del pattern, ciclando dall'inizio quando necessario
func (r *CyclicReader) Read(buf []byte) (int, error) {
for i := range buf {
buf[i] = r.pattern[r.offset%len(r.pattern)]
r.offset++
}
return len(buf), nil
}
func main() {
// Crea un reader che cicla sul pattern "ABCD"
reader := &CyclicReader{pattern: []byte("ABCD")}
// Legge i primi 10 byte dallo stream infinito
buf := make([]byte, 10)
n, err := reader.Read(buf)
if err != nil && err != io.EOF {
panic(err)
}
fmt.Printf("Letti %d byte: %s\n", n, buf[:n])
// Stampa: Letti 10 byte: ABCDABCDAB
}
Copiare stream con io.Copy
La funzione io.Copy rappresenta il modo idiomatico di trasferire dati da un Reader a un
Writer. Internamente alloca un buffer di 32 KB e ripete il ciclo read-write fino a EOF o errore.
Se il Writer implementa io.ReaderFrom o il Reader implementa
io.WriterTo, la funzione delega l'operazione a questi metodi, che possono sfruttare ottimizzazioni
a livello di sistema operativo come sendfile.
package main
import (
"io"
"os"
"strings"
)
func main() {
// Crea un reader da una stringa in memoria
src := strings.NewReader("Contenuto dello stream di esempio\n")
// Copia lo stream direttamente sullo stdout
written, err := io.Copy(os.Stdout, src)
if err != nil {
panic(err)
}
_ = written
}
Per copiare esattamente n byte si usa io.CopyN; per copiare usando un buffer fornito
dall'utente si usa io.CopyBuffer, utile quando si vuole controllare le allocazioni.
Composizione con io.TeeReader e io.MultiWriter
Go favorisce la composizione. io.TeeReader crea un Reader che, mentre viene letto,
scrive ogni byte anche su un Writer aggiuntivo. io.MultiWriter fa il contrario:
ogni scrittura viene propagata a più writer contemporaneamente.
package main
import (
"bytes"
"crypto/sha256"
"fmt"
"io"
"strings"
)
func main() {
// Sorgente dati
src := strings.NewReader("dati importanti da verificare")
// Hasher per calcolare il digest dello stream
hasher := sha256.New()
// Buffer per conservare una copia dei dati
var copyBuf bytes.Buffer
// TeeReader: legge da src e scrive contemporaneamente in hasher
tee := io.TeeReader(src, hasher)
// Copia il tutto nel buffer locale
if _, err := io.Copy(©Buf, tee); err != nil {
panic(err)
}
fmt.Printf("Dati: %s\n", copyBuf.String())
fmt.Printf("SHA-256: %x\n", hasher.Sum(nil))
}
package main
import (
"io"
"os"
)
func main() {
// MultiWriter: ogni scrittura va sia su stdout che su stderr
multi := io.MultiWriter(os.Stdout, os.Stderr)
_, _ = fmt.Fprintln(multi, "Questo messaggio appare su entrambi gli output")
}
Lettura bufferizzata con bufio
Leggere un byte alla volta da un Reader che esegue una syscall per ogni accesso è estremamente
inefficiente. Il package bufio risolve il problema avvolgendo qualsiasi Reader in un
layer che legge in anticipo un blocco di byte e li serve dalla memoria.
package main
import (
"bufio"
"fmt"
"strings"
)
func main() {
raw := strings.NewReader("prima riga\nseconda riga\nterza riga\n")
// Avvolge il reader con un buffer da 4096 byte
scanner := bufio.NewScanner(raw)
lineNum := 0
for scanner.Scan() {
lineNum++
// Stampa ogni riga con il suo numero
fmt.Printf("%d: %s\n", lineNum, scanner.Text())
}
if err := scanner.Err(); err != nil {
panic(err)
}
}
bufio.Scanner è particolarmente comodo per testo strutturato per righe. Per stream binari o per
leggere token di dimensione arbitraria, bufio.Reader espone metodi come ReadByte,
ReadRune, ReadString e ReadSlice.
Scanner con token personalizzati
La funzione di split dello scanner è configurabile tramite scanner.Split. L'esempio seguente
spezza lo stream in parole separate da qualsiasi spazio bianco:
package main
import (
"bufio"
"fmt"
"strings"
)
func main() {
src := strings.NewReader("Go è un linguaggio semplice e potente")
scanner := bufio.NewScanner(src)
// Usa la funzione di split predefinita per le parole
scanner.Split(bufio.ScanWords)
var words []string
for scanner.Scan() {
words = append(words, scanner.Text())
}
fmt.Println(words)
// Stampa: [Go è un linguaggio semplice e potente]
}
Stream basati su canali e goroutine
Quando la sorgente di dati è asincrona, ad esempio eventi di un sensore o messaggi da una coda, l'idioma Go preferito è inviare i dati su un canale da una goroutine produttrice e consumarli su un'altra goroutine ricevitrice. Questo schema si chiama pipeline.
package main
import (
"fmt"
"math/rand"
"time"
)
// generate produce numeri casuali a intervalli regolari finché il canale done non viene chiuso
func generate(done <-chan struct{}) <-chan int {
out := make(chan int)
go func() {
defer close(out)
rng := rand.New(rand.NewSource(time.Now().UnixNano()))
for {
select {
case <-done:
return
case out <- rng.Intn(100):
}
}
}()
return out
}
// square eleva al quadrato ogni valore ricevuto
func square(done <-chan struct{}, in <-chan int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for v := range in {
select {
case <-done:
return
case out <- v * v:
}
}
}()
return out
}
func main() {
done := make(chan struct{})
defer close(done)
// Costruisce la pipeline: genera -> eleva al quadrato
nums := generate(done)
squares := square(done, nums)
// Consuma i primi cinque risultati
for i := 0; i < 5; i++ {
fmt.Println(<-squares)
}
}
Il canale done è il meccanismo di cancellazione: chiuderlo propaga il segnale di stop a tutte
le goroutine della pipeline, evitando goroutine zombie. Questa tecnica è alla base del package
context, che ne formalizza e generalizza il pattern.
Pipeline con fan-out e fan-in
Quando uno stadio della pipeline è computazionalmente costoso, è vantaggioso avviare più goroutine che elaborano lo stesso canale di input in parallelo (fan-out) e raccogliere i risultati in un unico canale di output (fan-in).
package main
import (
"fmt"
"sync"
)
// merge fonde più canali in uno solo
func merge(done <-chan struct{}, channels ...<-chan int) <-chan int {
var wg sync.WaitGroup
merged := make(chan int)
// Avvia una goroutine per ogni canale in ingresso
forward := func(ch <-chan int) {
defer wg.Done()
for v := range ch {
select {
case <-done:
return
case merged <- v:
}
}
}
wg.Add(len(channels))
for _, ch := range channels {
go forward(ch)
}
// Chiude il canale merged quando tutti i forwarder hanno terminato
go func() {
wg.Wait()
close(merged)
}()
return merged
}
// slowProcess simula un'elaborazione lenta
func slowProcess(done <-chan struct{}, in <-chan int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for v := range in {
// Simula lavoro intensivo
result := v * v
select {
case <-done:
return
case out <- result:
}
}
}()
return out
}
func main() {
done := make(chan struct{})
defer close(done)
// Sorgente
source := make(chan int, 10)
for i := 1; i <= 10; i++ {
source <- i
}
close(source)
// Fan-out: tre worker in parallelo
w1 := slowProcess(done, source)
w2 := slowProcess(done, source)
w3 := slowProcess(done, source)
// Fan-in: raccoglie i risultati in ordine arbitrario
for v := range merge(done, w1, w2, w3) {
fmt.Println(v)
}
}
Stream HTTP con io.Pipe
io.Pipe crea una coppia sincrona reader/writer connessa da un buffer interno a dimensione zero.
Ogni scrittura blocca finché non viene consumata da una lettura corrispondente. Questo rende io.Pipe
ideale per connettere un produttore e un consumatore che operano su goroutine separate, senza allocare un
buffer intermedio.
package main
import (
"encoding/json"
"fmt"
"io"
"net/http"
)
// Record rappresenta un singolo record di dati
type Record struct {
ID int `json:"id"`
Value string `json:"value"`
}
// streamRecords scrive record JSON sullo writer in modo incrementale
func streamRecords(w io.Writer) error {
encoder := json.NewEncoder(w)
records := []Record{
{ID: 1, Value: "alpha"},
{ID: 2, Value: "beta"},
{ID: 3, Value: "gamma"},
}
for _, rec := range records {
if err := encoder.Encode(rec); err != nil {
return err
}
}
return nil
}
func handleStream(resp http.ResponseWriter, req *http.Request) {
pr, pw := io.Pipe()
// Goroutine produttrice: scrive sul pipe writer
go func() {
err := streamRecords(pw)
// Chiude il writer comunicando eventuale errore al reader
pw.CloseWithError(err)
}()
resp.Header().Set("Content-Type", "application/x-ndjson")
resp.Header().Set("Transfer-Encoding", "chunked")
// Copia i dati dal pipe reader verso il client HTTP
if _, err := io.Copy(resp, pr); err != nil {
fmt.Printf("Errore durante la copia: %v\n", err)
}
}
func main() {
http.HandleFunc("/stream", handleStream)
fmt.Println("Server in ascolto su :8080")
if err := http.ListenAndServe(":8080", nil); err != nil {
panic(err)
}
}
Limitare e controllare il flusso
In contesti reali è spesso necessario imporre limiti alla velocità con cui i dati fluiscono, sia per proteggere
risorse condivise, sia per rispettare quote di API esterne. La tecnica standard in Go è il token bucket,
implementato dal package golang.org/x/time/rate.
package main
import (
"context"
"fmt"
"time"
"golang.org/x/time/rate"
)
// ThrottledReader avvolge un reader e limita la velocità di lettura
type ThrottledReader struct {
source []byte
limiter *rate.Limiter
offset int
}
// Read legge dal reader interno rispettando il rate limiter
func (r *ThrottledReader) Read(buf []byte) (int, error) {
if r.offset >= len(r.source) {
return 0, fmt.Errorf("EOF")
}
// Attende il permesso dal rate limiter prima di procedere
ctx := context.Background()
if err := r.limiter.Wait(ctx); err != nil {
return 0, err
}
chunk := len(buf)
if r.offset+chunk > len(r.source) {
chunk = len(r.source) - r.offset
}
copy(buf[:chunk], r.source[r.offset:r.offset+chunk])
r.offset += chunk
return chunk, nil
}
func main() {
data := []byte("stream di dati da leggere a velocità controllata")
// Limita a 10 token al secondo con un burst massimo di 5
limiter := rate.NewLimiter(rate.Limit(10), 5)
reader := &ThrottledReader{source: data, limiter: limiter}
buf := make([]byte, 5)
start := time.Now()
total := 0
for {
n, err := reader.Read(buf)
total += n
if err != nil {
break
}
}
fmt.Printf("Letti %d byte in %v\n", total, time.Since(start).Round(time.Millisecond))
}
Compressione e decompressione on-the-fly
Poiché compress/gzip implementa le interfacce io.Reader e io.Writer,
è possibile inserire la compressione e la decompressione direttamente nella pipeline senza buffer intermedi
in memoria.
package main
import (
"bytes"
"compress/gzip"
"fmt"
"io"
"strings"
)
func compress(src io.Reader) (io.Reader, error) {
var buf bytes.Buffer
// gzip.Writer scrive dati compressi nel buffer
gz := gzip.NewWriter(&buf)
if _, err := io.Copy(gz, src); err != nil {
return nil, err
}
// Flush e chiusura sono necessari per completare il blocco gzip
if err := gz.Close(); err != nil {
return nil, err
}
return &buf, nil
}
func decompress(src io.Reader) (io.Reader, error) {
// gzip.Reader decomprime on-the-fly mentre viene letto
return gzip.NewReader(src)
}
func main() {
original := "Questo testo verrà compresso e poi decompresso interamente via stream."
// Comprime lo stream
compressed, err := compress(strings.NewReader(original))
if err != nil {
panic(err)
}
// Decomprime lo stream
decompressed, err := decompress(compressed)
if err != nil {
panic(err)
}
result, err := io.ReadAll(decompressed)
if err != nil {
panic(err)
}
fmt.Println(string(result))
fmt.Println("Uguale all'originale:", string(result) == original)
}
Contesto e cancellazione negli stream
Quando uno stream è di lunga durata, ad esempio il download di un file di grandi dimensioni o la lettura
da un socket con timeout, è essenziale poter annullare l'operazione in modo pulito. Il package
context fornisce questo meccanismo. L'esempio seguente implementa un reader che rispetta
la cancellazione del contesto verificandola ad ogni chunk.
package main
import (
"context"
"fmt"
"io"
"strings"
"time"
)
// ContextReader avvolge un Reader e interrompe la lettura se il contesto viene cancellato
type ContextReader struct {
ctx context.Context
source io.Reader
}
// Read controlla il contesto prima di ogni operazione di lettura
func (r *ContextReader) Read(buf []byte) (int, error) {
select {
case <-r.ctx.Done():
// Il contesto è stato cancellato: restituisce l'errore appropriato
return 0, r.ctx.Err()
default:
}
return r.source.Read(buf)
}
func main() {
// Contesto con timeout di 50 millisecondi
ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
defer cancel()
// Simula uno stream di grandi dimensioni
largeData := strings.Repeat("x", 1_000_000)
reader := &ContextReader{
ctx: ctx,
source: strings.NewReader(largeData),
}
buf := make([]byte, 1024)
total := 0
for {
n, err := reader.Read(buf)
total += n
if err != nil {
fmt.Printf("Lettura interrotta dopo %d byte: %v\n", total, err)
break
}
// Simula un'elaborazione lenta per ogni chunk
time.Sleep(1 * time.Millisecond)
}
}
Stream di grandi file con io.ReadAll e alternative
io.ReadAll legge l'intero contenuto di un Reader in memoria. Per file piccoli
è conveniente, ma per file di grandi dimensioni è una scelta rischiosa: un file da 4 GB richiede 4 GB
di RAM. L'alternativa è processare il file in chunk senza mai materializzare l'intero contenuto.
package main
import (
"bufio"
"fmt"
"os"
)
// countLines conta le righe di un file senza caricarlo interamente in memoria
func countLines(path string) (int, error) {
f, err := os.Open(path)
if err != nil {
return 0, err
}
defer f.Close()
// Scanner legge una riga alla volta con un buffer interno
scanner := bufio.NewScanner(f)
// Imposta un buffer massimo di 1 MB per riga
scanner.Buffer(make([]byte, 64*1024), 1024*1024)
count := 0
for scanner.Scan() {
count++
}
return count, scanner.Err()
}
func main() {
// Crea un file di test temporaneo
f, _ := os.CreateTemp("", "test*.txt")
for i := 0; i < 1_000_000; i++ {
fmt.Fprintf(f, "riga numero %d\n", i+1)
}
f.Close()
defer os.Remove(f.Name())
n, err := countLines(f.Name())
if err != nil {
panic(err)
}
fmt.Printf("Righe trovate: %d\n", n)
}
Transform streams con io.Pipe e goroutine
Un pattern ricorrente è la trasformazione di uno stream in un altro: convertire la codifica del testo,
cifrare i dati, sostituire pattern. In Go questo si realizza combinando io.Pipe con una
goroutine che legge dallo stream originale, trasforma i dati e li scrive sul pipe writer.
package main
import (
"bufio"
"fmt"
"io"
"strings"
"unicode"
)
// upperCaseStream restituisce un Reader che trasforma il testo in maiuscolo on-the-fly
func upperCaseStream(src io.Reader) io.Reader {
pr, pw := io.Pipe()
go func() {
scanner := bufio.NewScanner(src)
for scanner.Scan() {
// Converte ogni riga in maiuscolo e la scrive sul writer
line := strings.Map(unicode.ToUpper, scanner.Text())
if _, err := fmt.Fprintln(pw, line); err != nil {
pw.CloseWithError(err)
return
}
}
pw.CloseWithError(scanner.Err())
}()
return pr
}
func main() {
src := strings.NewReader("ciao mondo\ngo è fantastico\n")
upper := upperCaseStream(src)
result, err := io.ReadAll(upper)
if err != nil {
panic(err)
}
fmt.Print(string(result))
// Stampa:
// CIAO MONDO
// GO È FANTASTICO
}
Gestione degli errori negli stream
Gli errori negli stream hanno natura diversa dagli errori ordinari in Go. io.EOF non è un
errore nel senso comune: segnala semplicemente la fine dei dati. io.ErrUnexpectedEOF invece
indica che lo stream si è chiuso nel mezzo di un'operazione che richiedeva più byte. La distinzione è
cruciale quando si costruiscono parser o protocolli binari.
package main
import (
"encoding/binary"
"errors"
"fmt"
"io"
"strings"
)
// readUint32 legge un intero a 32 bit in big-endian dallo stream
func readUint32(r io.Reader) (uint32, error) {
buf := make([]byte, 4)
_, err := io.ReadFull(r, buf)
if err != nil {
if errors.Is(err, io.EOF) {
// EOF all'inizio di un record: fine legittima dello stream
return 0, io.EOF
}
if errors.Is(err, io.ErrUnexpectedEOF) {
// EOF nel mezzo di un record: errore di protocollo
return 0, fmt.Errorf("record incompleto: %w", err)
}
return 0, err
}
return binary.BigEndian.Uint32(buf), nil
}
func main() {
// Simula uno stream con due record completi e uno troncato
data := "\x00\x00\x00\x2A\x00\x00\x01\x00\x00\x01"
r := strings.NewReader(data)
for {
v, err := readUint32(r)
if errors.Is(err, io.EOF) {
fmt.Println("Fine dello stream")
break
}
if err != nil {
fmt.Printf("Errore: %v\n", err)
break
}
fmt.Printf("Valore letto: %d\n", v)
}
}
Benchmark e considerazioni sulle performance
La dimensione del buffer è il parametro con maggiore impatto sulle performance di I/O. Buffer troppo piccoli moltiplicano le syscall; buffer troppo grandi sprecano memoria e peggiorano la cache locality. In assenza di misurazioni, 4 KB o 8 KB sono buoni punti di partenza per I/O su disco, mentre 32 KB o 64 KB tendono ad essere ottimali per trasferimenti di rete.
package main
import (
"io"
"os"
"testing"
)
// BenchmarkCopyBufferSize misura l'impatto della dimensione del buffer sulla copia di file
func BenchmarkCopyBufferSize(b *testing.B) {
sizes := []int{512, 4 * 1024, 32 * 1024, 256 * 1024}
for _, size := range sizes {
b.Run(fmt.Sprintf("buf=%d", size), func(b *testing.B) {
buf := make([]byte, size)
b.ResetTimer()
for i := 0; i < b.N; i++ {
src, _ := os.Open("/dev/zero")
dst, _ := os.OpenFile("/dev/null", os.O_WRONLY, 0)
// Copia 10 MB con il buffer specificato
limited := io.LimitReader(src, 10*1024*1024)
io.CopyBuffer(dst, limited, buf)
src.Close()
dst.Close()
}
})
}
}
Per ridurre le allocazioni nelle pipeline ad alto throughput conviene riusare i buffer tramite
sync.Pool. Il pool restituisce buffer già allocati evitando il garbage collector.
package main
import (
"io"
"sync"
)
var bufPool = sync.Pool{
// Crea un nuovo buffer da 32 KB quando il pool è vuoto
New: func() any {
buf := make([]byte, 32*1024)
return &buf
},
}
// copyWithPool copia da src a dst riusando buffer dal pool
func copyWithPool(dst io.Writer, src io.Reader) (int64, error) {
bufPtr := bufPool.Get().(*[]byte)
defer bufPool.Put(bufPtr)
return io.CopyBuffer(dst, src, *bufPtr)
}
Conclusioni
Gli stream in Go non sono una feature isolata ma un modo di pensare al flusso dei dati: composizione di interfacce minimaliste, goroutine per il parallelismo, canali per la comunicazione e il backpressure, contesti per la cancellazione. La filosofia del linguaggio si riflette anche qui: niente magia, tutto esplicito, tutto componibile.
Partendo da io.Reader e io.Writer è possibile costruire pipeline sofisticate che
gestiscono file da terabyte, stream di rete ad alto throughput o flussi di eventi in tempo reale, mantenendo
il codice leggibile e il consumo di memoria sotto controllo. La chiave è non materializzare mai più dati
di quelli necessari all'operazione corrente.