Uno degli aspetti più potenti e caratteristici di Node.js è la sua capacità di gestire i dati in modo incrementale attraverso il meccanismo degli stream. Invece di caricare un intero file in memoria prima di elaborarlo, gli stream consentono di leggere o scrivere dati pezzo per pezzo, riducendo drasticamente il consumo di memoria e migliorando la reattività delle applicazioni. Questa guida approfondisce il funzionamento degli stream in Node.js, i tipi disponibili, i pattern più comuni e le tecniche avanzate per sfruttarli al meglio.
Cosa sono gli stream
In informatica, uno stream è una sequenza di dati resi disponibili nel tempo. Invece di ricevere tutti i dati in un unico blocco, con uno stream si lavora su porzioni successive chiamate chunk. Questo modello rispecchia fedelmente la realtà di molte operazioni di I/O: leggere un file da disco, ricevere dati da una connessione di rete, leggere l'input dell'utente dalla riga di comando.
In Node.js gli stream sono implementati come istanze di EventEmitter e fanno parte
del modulo nativo stream. Esistono quattro tipi fondamentali:
- Readable: sorgente da cui si leggono dati.
- Writable: destinazione in cui si scrivono dati.
- Duplex: sia leggibile che scrivibile (ad esempio un socket TCP).
- Transform: duplex che trasforma i dati in transito (ad esempio la compressione gzip).
Perché usare gli stream
Consideriamo il caso di un file da 2 GB da leggere e inviare come risposta HTTP. Senza stream, l'approccio ingenuo sarebbe il seguente:
const http = require('http');
const fs = require('fs');
http.createServer((req, res) => {
// Carica l'intero file in memoria prima di rispondere
fs.readFile('./large-archive.zip', (err, data) => {
if (err) {
res.statusCode = 500;
return res.end('Internal server error');
}
res.end(data);
});
}).listen(3000);
Questo codice funziona per file piccoli, ma con un file da 2 GB alloca 2 GB di RAM per ogni richiesta concorrente. Con dieci richieste parallele si arriva a 20 GB di memoria occupata. La variante con stream risolve elegantemente il problema:
const http = require('http');
const fs = require('fs');
http.createServer((req, res) => {
// Invia il file al client pezzo per pezzo senza caricarlo in memoria
const readable = fs.createReadStream('./large-archive.zip');
readable.pipe(res);
}).listen(3000);
Il file viene letto a blocchi e ogni blocco viene immediatamente inviato al client. L'occupazione di memoria rimane costante indipendentemente dalla dimensione del file e dal numero di client connessi simultaneamente.
Stream Readable
Uno stream Readable rappresenta una sorgente di dati. Il modulo fs
espone fs.createReadStream() come metodo principale per creare stream di lettura
da file.
const fs = require('fs');
const readStream = fs.createReadStream('./data.csv', {
encoding: 'utf8',
highWaterMark: 64 * 1024 // chunk da 64 KB
});
readStream.on('data', (chunk) => {
console.log(`Ricevuto chunk di ${chunk.length} caratteri`);
});
readStream.on('end', () => {
// Tutti i dati sono stati letti
console.log('Lettura completata');
});
readStream.on('error', (err) => {
console.error('Errore di lettura:', err.message);
});
L'opzione highWaterMark controlla la dimensione dei chunk. Il valore predefinito
è 16 KB per gli stream binari e 16 caratteri per gli stream in modalità oggetto.
Modalità flowing e paused
Uno stream Readable può trovarsi in due stati: paused (predefinito) e flowing.
In modalità paused i dati vengono bufferizzati internamente finché non si chiede esplicitamente
di leggerli con read(). In modalità flowing i dati vengono emessi non appena
disponibili tramite l'evento data.
Il passaggio tra le due modalità avviene automaticamente: aggiungere un listener sull'evento
data o chiamare resume() attiva la modalità flowing; chiamare
pause() o collegare lo stream tramite pipe() gestisce il flusso
in modo trasparente.
const fs = require('fs');
const readStream = fs.createReadStream('./log.txt', { encoding: 'utf8' });
// Lettura manuale in modalità paused
readStream.on('readable', () => {
let chunk;
// Legge blocchi da 1 KB finché il buffer è disponibile
while ((chunk = readStream.read(1024)) !== null) {
process.stdout.write(chunk);
}
});
readStream.on('end', () => {
console.log('\nFine del file');
});
Stream Readable con async/await
A partire da Node.js 10, gli stream Readable implementano il protocollo async iterable,
consentendo di usarli con for await...of:
const fs = require('fs');
async function readFile(filePath) {
const readStream = fs.createReadStream(filePath, { encoding: 'utf8' });
let content = '';
// Ogni iterazione riceve un chunk dallo stream
for await (const chunk of readStream) {
content += chunk;
}
return content;
}
readFile('./config.json')
.then((text) => console.log(JSON.parse(text)))
.catch(console.error);
Stream Writable
Uno stream Writable rappresenta una destinazione per i dati. Il metodo principale
è write(chunk, [encoding], [callback]), che restituisce false quando
il buffer interno ha raggiunto la soglia highWaterMark, segnalando che si dovrebbe
attendere prima di scrivere ulteriori dati.
const fs = require('fs');
const writeStream = fs.createWriteStream('./output.log', { flags: 'a' });
function writeLine(ws, line) {
// write() restituisce false se il buffer interno è saturo
const canContinue = ws.write(line + '\n', 'utf8');
if (!canContinue) {
// Il buffer è pieno: attendiamo lo svuotamento prima di continuare
ws.once('drain', () => {
console.log('Buffer svuotato, pronto a continuare');
});
}
}
writeLine(writeStream, '[INFO] Avvio applicazione');
writeLine(writeStream, '[INFO] Configurazione caricata');
// Segnala la fine della scrittura e chiude il file
writeStream.end(() => {
console.log('File chiuso correttamente');
});
La gestione del backpressure tramite il controllo del valore restituito da
write() e l'ascolto dell'evento drain è fondamentale per evitare
di saturare la memoria del processo.
Il metodo pipe
Il metodo pipe(destination) collega automaticamente uno stream Readable a uno
Writable, gestendo il backpressure in modo trasparente. Rappresenta il modo più semplice e
idiomatico di connettere stream in Node.js.
const fs = require('fs');
const zlib = require('zlib');
// Comprime un file con gzip collegando tre stream in sequenza
const source = fs.createReadStream('./archive.tar');
const destination = fs.createWriteStream('./archive.tar.gz');
const gzip = zlib.createGzip();
source
.pipe(gzip) // dati grezzi → compressore gzip
.pipe(destination) // dati compressi → file di destinazione
.on('finish', () => {
console.log('Compressione completata');
})
.on('error', (err) => {
console.error('Errore durante la compressione:', err.message);
});
Un limite di pipe() è che non propaga automaticamente gli errori lungo la catena.
Se source emette un errore, gzip e destination non
vengono chiusi automaticamente, lasciando descrittori di file aperti. Per questo motivo,
dalla versione 10 di Node.js è preferibile usare stream.pipeline().
stream.pipeline e stream/promises
La funzione pipeline() del modulo stream è l'alternativa moderna a
pipe(). Propaga correttamente gli errori e chiude tutti gli stream nella catena
in caso di fallimento.
const { pipeline } = require('stream');
const { promisify } = require('util');
const fs = require('fs');
const zlib = require('zlib');
// Convertiamo pipeline in una funzione che restituisce una Promise
const pipelineAsync = promisify(pipeline);
async function compress(inputPath, outputPath) {
await pipelineAsync(
fs.createReadStream(inputPath),
zlib.createGzip(),
fs.createWriteStream(outputPath)
);
console.log(`File compresso: ${outputPath}`);
}
compress('./video.mp4', './video.mp4.gz').catch(console.error);
A partire da Node.js 15, il sottomodulo stream/promises espone direttamente una
versione di pipeline che restituisce una Promise, rendendo superfluo il wrapping
con promisify:
const { pipeline } = require('stream/promises');
const fs = require('fs');
const zlib = require('zlib');
async function decompress(inputPath, outputPath) {
// pipeline gestisce errori e chiusura di tutti gli stream automaticamente
await pipeline(
fs.createReadStream(inputPath),
zlib.createGunzip(),
fs.createWriteStream(outputPath)
);
console.log('Decompressione completata');
}
Stream Transform
Uno stream Transform riceve dati in ingresso, li elabora e produce dati in uscita.
È il tipo più versatile: si implementa sovrascrivendo il metodo
_transform(chunk, encoding, callback). Il metodo opzionale _flush(callback)
viene invocato quando non ci sono più dati in ingresso, prima che lo stream emetta l'evento
finish: è il momento giusto per emettere dati residui dal buffer interno.
const { Transform } = require('stream');
class UpperCaseTransform extends Transform {
_transform(chunk, encoding, callback) {
// Converte ogni chunk in maiuscolo e lo rimette nello stream
this.push(chunk.toString().toUpperCase());
callback();
}
}
class LineCounter extends Transform {
constructor(options) {
super(options);
this.count = 0;
}
_transform(chunk, encoding, callback) {
// Conta le righe contando i caratteri newline nel chunk
const lines = chunk.toString().split('\n');
this.count += lines.length - 1;
this.push(chunk); // inoltra il chunk invariato
callback();
}
_flush(callback) {
// Chiamato una volta sola al termine dello stream
console.log(`Righe totali: ${this.count}`);
callback();
}
}
const { pipeline } = require('stream/promises');
const fs = require('fs');
async function processFile() {
const lineCounter = new LineCounter();
await pipeline(
fs.createReadStream('./source.txt'),
lineCounter,
new UpperCaseTransform(),
fs.createWriteStream('./result.txt')
);
}
processFile().catch(console.error);
Stream in modalità oggetto
Per impostazione predefinita, gli stream lavorano con Buffer e stringhe. Attivando
l'opzione objectMode: true, uno stream può trasportare qualsiasi valore JavaScript,
inclusi oggetti, array e numeri. Questa modalità è comunemente usata nelle pipeline di
elaborazione dati strutturati.
const { Transform, Readable, Writable } = require('stream');
// Sorgente che emette oggetti JavaScript invece di Buffer
const userSource = new Readable({
objectMode: true,
read() {
const users = [
{ id: 1, name: 'Alice', age: 30 },
{ id: 2, name: 'Bob', age: 17 },
{ id: 3, name: 'Carol', age: 25 },
null // segnala la fine dello stream
];
for (const user of users) {
this.push(user);
}
}
});
// Lascia passare solo gli utenti maggiorenni
const adultFilter = new Transform({
objectMode: true,
transform(user, _enc, cb) {
if (user.age >= 18) {
this.push(user);
}
cb();
}
});
// Stampa a console ogni oggetto ricevuto
const printer = new Writable({
objectMode: true,
write(user, _enc, cb) {
console.log(`${user.name} (${user.age} anni)`);
cb();
}
});
const { pipeline } = require('stream/promises');
pipeline(userSource, adultFilter, printer).catch(console.error);
Creare stream Readable personalizzati
Implementare un proprio stream Readable è utile quando si vuole esporre come stream una sorgente
dati personalizzata: il risultato di una query a database, una sequenza generata algoritmicamente
o dati provenienti da un'API esterna. Il metodo _read() viene chiamato ogni volta
che il consumatore chiede altri dati.
const { Readable } = require('stream');
class PeriodicCounter extends Readable {
constructor(limit, intervalMs) {
super({ objectMode: true });
this.current = 1;
this.limit = limit;
this.intervalMs = intervalMs;
}
_read() {
// Quando il contatore supera il limite, segnaliamo la fine
if (this.current > this.limit) {
this.push(null);
return;
}
// Emette un oggetto ogni intervalMs millisecondi
setTimeout(() => {
this.push({ value: this.current++, timestamp: Date.now() });
}, this.intervalMs);
}
}
const counter = new PeriodicCounter(5, 500);
counter.on('data', ({ value, timestamp }) => {
console.log(`Valore: ${value} | Timestamp: ${timestamp}`);
});
counter.on('end', () => {
console.log('Contatore terminato');
});
Backpressure: gestire il flusso dei dati
Il backpressure è il meccanismo con cui uno stream Writable segnala al produttore di
rallentare perché il buffer interno ha raggiunto la soglia. Ignorare il backpressure porta a
un accumulo di dati in memoria che può terminare il processo con un errore
JavaScript heap out of memory.
Ecco un esempio di implementazione manuale corretta del backpressure, che illustra esattamente
la logica che pipe() applica internamente in modo automatico:
const fs = require('fs');
function copyFile(sourcePath, destPath) {
const readStream = fs.createReadStream(sourcePath);
const writeStream = fs.createWriteStream(destPath);
readStream.on('data', (chunk) => {
const canContinue = writeStream.write(chunk);
if (!canContinue) {
// Il Writable ci chiede di fermarci: mettiamo in pausa la lettura
readStream.pause();
writeStream.once('drain', () => {
// Il buffer è stato svuotato: riprendiamo a leggere
readStream.resume();
});
}
});
readStream.on('end', () => {
// Tutti i dati sono stati letti: chiudiamo lo stream di scrittura
writeStream.end();
});
writeStream.on('finish', () => {
console.log('Copia completata con gestione corretta del backpressure');
});
}
copyFile('./source.bin', './copy.bin');
Stream e HTTP
Nel modulo http di Node.js, sia la richiesta (IncomingMessage) che
la risposta (ServerResponse) sono stream. La richiesta è un Readable, la risposta
è un Writable. Questo significa che tutti i pattern visti finora si applicano direttamente
alla gestione del traffico HTTP.
const http = require('http');
const zlib = require('zlib');
const fs = require('fs');
const { pipeline } = require('stream/promises');
const server = http.createServer(async (req, res) => {
if (req.method === 'POST' && req.url === '/upload') {
// Il body della richiesta è uno stream Readable: lo salviamo su disco
const writeStream = fs.createWriteStream('./received-upload.bin');
try {
await pipeline(req, writeStream);
res.writeHead(200);
res.end('Upload completato\n');
} catch (err) {
res.writeHead(500);
res.end('Errore durante l\'upload\n');
}
return;
}
if (req.method === 'GET' && req.url === '/download') {
res.writeHead(200, {
'Content-Type': 'application/octet-stream',
'Content-Encoding': 'gzip'
});
// La risposta è uno stream Writable: vi scriviamo direttamente i dati compressi
try {
await pipeline(
fs.createReadStream('./data.bin'),
zlib.createGzip(),
res
);
} catch (err) {
console.error('Errore durante il download:', err.message);
}
return;
}
res.writeHead(404);
res.end('Non trovato\n');
});
server.listen(3000, () => {
console.log('Server in ascolto sulla porta 3000');
});
Elaborazione di file CSV con stream Transform
Un caso d'uso pratico e comune è l'elaborazione di file CSV di grandi dimensioni. Il seguente
esempio mostra come analizzare un CSV riga per riga usando una pipeline di stream Transform,
senza caricare l'intero file in memoria. Si noti la gestione del buffer parziale nel metodo
_transform, necessaria perché un singolo chunk potrebbe spezzare una riga a metà.
const { Transform } = require('stream');
const fs = require('fs');
const { pipeline } = require('stream/promises');
class CSVParser extends Transform {
constructor(separator = ',') {
super({ readableObjectMode: true });
this.separator = separator;
this.headers = null;
this.partialBuffer = '';
}
_transform(chunk, _enc, cb) {
this.partialBuffer += chunk.toString('utf8');
const lines = this.partialBuffer.split('\n');
// L'ultima riga potrebbe essere incompleta: la teniamo da parte
this.partialBuffer = lines.pop();
for (const line of lines) {
const values = line.trim().split(this.separator);
if (!this.headers) {
// La prima riga contiene le intestazioni
this.headers = values;
} else if (values.length === this.headers.length) {
const record = Object.fromEntries(
this.headers.map((h, i) => [h, values[i]])
);
this.push(record);
}
}
cb();
}
_flush(cb) {
// Processiamo l'eventuale riga rimasta nel buffer parziale
if (this.partialBuffer.trim()) {
const values = this.partialBuffer.trim().split(this.separator);
if (this.headers && values.length === this.headers.length) {
this.push(Object.fromEntries(
this.headers.map((h, i) => [h, values[i]])
));
}
}
cb();
}
}
class ColumnFilter extends Transform {
constructor(column, value) {
super({ objectMode: true });
this.column = column;
this.value = value;
}
_transform(record, _enc, cb) {
// Lascia passare solo i record con il valore atteso nella colonna specificata
if (record[this.column] === this.value) {
this.push(record);
}
cb();
}
}
class JSONLSerializer extends Transform {
constructor() {
super({ writableObjectMode: true });
}
_transform(record, _enc, cb) {
// Serializza ogni oggetto come riga JSON (formato JSON Lines)
this.push(JSON.stringify(record) + '\n');
cb();
}
}
async function processCSV() {
await pipeline(
fs.createReadStream('./sales.csv'),
new CSVParser(),
new ColumnFilter('region', 'North'),
new JSONLSerializer(),
fs.createWriteStream('./sales-north.jsonl')
);
console.log('Elaborazione CSV completata');
}
processCSV().catch(console.error);
Stream con i Worker Threads
Per elaborazioni CPU-intensive è possibile combinare gli stream con i Worker Threads. Il thread principale gestisce l'I/O tramite stream, mentre i worker eseguono calcoli pesanti senza bloccare l'event loop. La comunicazione tra thread avviene tramite messaggi serializzati.
// worker.js
const { workerData, parentPort } = require('worker_threads');
const zlib = require('zlib');
// Decomprime il buffer ricevuto e conta le parole nel testo risultante
zlib.gunzip(workerData, (err, buffer) => {
if (err) {
parentPort.postMessage({ error: err.message });
return;
}
const text = buffer.toString('utf8');
const wordCount = text.split(/\s+/).filter(Boolean).length;
parentPort.postMessage({ wordCount });
});
// main.js
const { Worker } = require('worker_threads');
const fs = require('fs');
async function countWords(filePath) {
// Legge l'intero file compresso tramite stream e lo concatena in un Buffer
const chunks = [];
const readStream = fs.createReadStream(filePath);
for await (const chunk of readStream) {
chunks.push(chunk);
}
const buffer = Buffer.concat(chunks);
// Spedisce il Buffer al worker e attende il risultato
return new Promise((resolve, reject) => {
const worker = new Worker('./worker.js', { workerData: buffer });
worker.on('message', (msg) => {
if (msg.error) reject(new Error(msg.error));
else resolve(msg.wordCount);
});
worker.on('error', reject);
});
}
countWords('./text.txt.gz')
.then((n) => console.log(`Parole trovate: ${n}`))
.catch(console.error);
Pattern avanzati
Merging di stream multipli
Quando si hanno più sorgenti da leggere in parallelo e si vuole unire i loro dati in un singolo
stream, si può ricorrere a uno stream PassThrough. I dati di ogni sorgente vengono
inoltrati al PassThrough tramite pipe(), con l'accortezza di non chiudere lo
stream di destinazione finché tutte le sorgenti non hanno terminato.
const { PassThrough } = require('stream');
const fs = require('fs');
function mergeStreams(...sources) {
const passThrough = new PassThrough();
let active = sources.length; // conta le sorgenti ancora attive
for (const src of sources) {
// Collega ogni sorgente al PassThrough senza chiuderlo automaticamente
src.pipe(passThrough, { end: false });
src.on('end', () => {
active--;
// Chiude il PassThrough solo quando tutte le sorgenti sono terminate
if (active === 0) {
passThrough.end();
}
});
src.on('error', (err) => passThrough.destroy(err));
}
return passThrough;
}
const merged = mergeStreams(
fs.createReadStream('./part1.txt'),
fs.createReadStream('./part2.txt'),
fs.createReadStream('./part3.txt')
);
merged.pipe(fs.createWriteStream('./complete.txt'));
Stream con timeout
In scenari di rete è spesso necessario interrompere uno stream se non arrivano dati entro un
certo intervallo di tempo. Il pattern seguente usa Promise.race per far scadere
l'operazione:
const { pipeline } = require('stream/promises');
const { setTimeout: delay } = require('timers/promises');
async function withTimeout(promise, ms) {
// Crea una Promise che rigetta dopo ms millisecondi
const timeout = delay(ms).then(() => {
throw new Error(`Operazione scaduta dopo ${ms}ms`);
});
// Vince la prima tra le due Promises a completarsi
return Promise.race([promise, timeout]);
}
const http = require('http');
const fs = require('fs');
async function downloadWithTimeout(url, destPath, timeoutMs) {
const response = await new Promise((resolve, reject) => {
http.get(url, resolve).on('error', reject);
});
await withTimeout(
pipeline(response, fs.createWriteStream(destPath)),
timeoutMs
);
}
downloadWithTimeout('http://example.com/file.bin', './downloaded.bin', 10000)
.then(() => console.log('Download completato'))
.catch(console.error);
Monitoraggio e debug degli stream
Per monitorare lo stato interno di uno stream è possibile accedere alle proprietà
readableLength, writableLength, readableHighWaterMark
e writableHighWaterMark, che espongono rispettivamente la quantità di dati
attualmente presenti nei buffer interni e le soglie configurate.
const fs = require('fs');
const readStream = fs.createReadStream('./large.bin');
const writeStream = fs.createWriteStream('./copy.bin');
// Campiona lo stato dei buffer ogni 100ms
const monitor = setInterval(() => {
console.log({
readableBuffered: readStream.readableLength,
writableBuffered: writeStream.writableLength,
bufferFull: writeStream.writableLength >= writeStream.writableHighWaterMark
});
}, 100);
readStream.pipe(writeStream);
writeStream.on('finish', () => {
clearInterval(monitor);
console.log('Copia completata');
});
Per un debug più approfondito, è possibile usare la variabile d'ambiente
NODE_DEBUG=stream che abilita i log interni del modulo stream di Node.js,
mostrando ogni operazione di lettura, scrittura e cambio di stato.
Lettura riga per riga con readline
Per la lettura riga per riga di file di testo, il modulo readline offre
un'astrazione comoda che si appoggia internamente sugli stream. Implementa anch'esso il
protocollo async iterable, rendendolo compatibile con for await...of:
const fs = require('fs');
const readline = require('readline');
async function processLines(filePath) {
const rl = readline.createInterface({
input: fs.createReadStream(filePath),
crlfDelay: Infinity // gestisce sia \n che \r\n
});
let lineNumber = 0;
// Ogni iterazione riceve una riga completa, senza newline finale
for await (const line of rl) {
lineNumber++;
if (line.includes('ERROR')) {
console.log(`Riga ${lineNumber}: ${line}`);
}
}
console.log(`Elaborate ${lineNumber} righe`);
}
processLines('./application.log').catch(console.error);
Confronto con le API alternative
Gli stream non sono sempre la scelta ottimale. Per file di piccole dimensioni (sotto i pochi
megabyte) l'overhead dell'infrastruttura degli stream supera i vantaggi. In quel caso,
fs.readFile e fs.writeFile sono più semplici e altrettanto efficienti.
Le linee guida seguenti aiutano a orientarsi nella scelta:
- File piccoli e uso occasionale:
fs.readFile/fs.writeFile. - File grandi o molte richieste concorrenti: stream con
pipeline(). - Lettura riga per riga:
readlineconfor await...of. - Elaborazione strutturata di dati sequenziali: pipeline di Transform in modalità oggetto.
Conclusioni
Gli stream sono uno dei pilastri dell'architettura di Node.js e la loro comprensione è indispensabile per scrivere applicazioni efficienti. Permettono di gestire dati di qualsiasi dimensione con un'occupazione di memoria costante, di costruire pipeline di elaborazione componibili e di integrare naturalmente operazioni di I/O che per loro natura producono o consumano dati in modo incrementale.
La scelta tra pipe(), pipeline() e la gestione manuale degli eventi
dipende dal contesto: pipeline() con la sua versione basata su Promise è oggi la
soluzione raccomandata per la maggior parte dei casi, in quanto garantisce la corretta
propagazione degli errori e la chiusura delle risorse. La modalità async iterable con
for await...of semplifica ulteriormente il codice quando non si ha bisogno di
comporre pipeline complesse.
Investire nel padroneggiare gli stream significa acquisire uno strumento che rimane rilevante in ogni contesto: dai server HTTP ad alto traffico, all'elaborazione di log, alla costruzione di pipeline ETL, fino alla comunicazione in tempo reale. La loro natura event-driven si armonizza perfettamente con il modello asincrono di Node.js, rendendoli uno strumento naturale ed espressivo per chiunque lavori con questa piattaforma.