RxJS (Reactive Extensions for JavaScript) è una libreria basata sul pattern Observer e sul paradigma di programmazione reattiva. In pratica, ti permette di lavorare con flussi di dati asincroni e concorrenti (eventi, richieste HTTP, stream di file, WebSocket, timer, ecc.) in modo dichiarativo e composibile.
Anche se RxJS è spesso associato al frontend (Angular in particolare), si integra molto bene con Node.js, dove la gestione di callback, Promise e stream è all'ordine del giorno. In questa guida vedremo come usare RxJS in un contesto Node.js, dai concetti base fino a esempi concreti.
Perché usare RxJS in Node.js
- Composizione potente: concatenare trasformazioni, filtri, merge e combinazioni di stream.
- Gestione degli errori avanzata: recupero, retry, fallback.
- Cancellazione semplice: unsubscribe per interrompere operazioni in corso.
- Uniformità: un modello unico per Promise, callback, EventEmitter, stream, timer.
- Testabilità: flussi deterministici, schedulers virtuali, test del tempo.
Installazione e setup in Node.js
Aggiungi RxJS al tuo progetto Node.js con npm o yarn:
npm install rxjs
Poi puoi importare le funzioni necessarie nei tuoi file JavaScript o TypeScript.
// commonjs
const { of, from, interval } = require("rxjs");
const { map, filter } = require("rxjs/operators");
// oppure, con module type o TypeScript
import { of, from, interval } from "rxjs";
import { map, filter } from "rxjs/operators";
Concetti fondamentali di RxJS
Observable
Un Observable rappresenta una sequenza di valori (sincroni o asincroni) che possono arrivare nel tempo. Un Observable può:
- emettere valori (next),
- completare (complete),
- oppure terminare con un errore (error).
Esempio di creazione e sottoscrizione:
const { Observable } = require("rxjs");
const obs = new Observable(subscriber => {
subscriber.next(1);
subscriber.next(2);
subscriber.next(3);
subscriber.complete();
});
obs.subscribe({
next: value => console.log("Valore:", value),
error: err => console.error("Errore:", err),
complete: () => console.log("Completato")
});
Observer e Subscription
L'Observer è l'oggetto passato a subscribe.
Una Subscription è il risultato di subscribe e
rappresenta la sottoscrizione al flusso, che può essere annullata.
const subscription = obs.subscribe({
next: v => console.log(v)
});
// Annulla la sottoscrizione (cancellazione)
subscription.unsubscribe();
Operators e pipe
Gli operatori sono funzioni pure che trasformano un Observable in
un altro. La funzione pipe permette di concatenare operatori in modo
leggibile.
const { of } = require("rxjs");
const { map, filter } = require("rxjs/operators");
of(1, 2, 3, 4, 5)
.pipe(
filter(x => x % 2 === 0),
map(x => x * 10)
)
.subscribe(x => console.log(x)); // 20, 40
Creare Observables in Node.js
Da valori statici e array
const { of, from } = require("rxjs");
of("ciao", "mondo").subscribe(console.log);
from([1, 2, 3]).subscribe(console.log);
Da Promise
RxJS converte facilmente una Promise in un Observable usando from.
const { from } = require("rxjs");
function getUser() {
return Promise.resolve({ id: 1, name: "Alice" });
}
from(getUser()).subscribe(user => {
console.log("Utente:", user);
});
Da callback stile Node (errore-primo argomento)
Molte API di Node usano callback del tipo (err, result). RxJS offre
bindNodeCallback per convertirle in Observables.
const fs = require("fs");
const { bindNodeCallback } = require("rxjs");
const readFile$ = bindNodeCallback(fs.readFile);
readFile$("./file.txt", "utf8").subscribe({
next: content => console.log("Contenuto:", content),
error: err => console.error("Errore lettura:", err),
complete: () => console.log("Lettura completata")
});
Da EventEmitter
Gli EventEmitter di Node sono una sorgente naturale di stream.
RxJS fornisce fromEvent.
const EventEmitter = require("events");
const { fromEvent } = require("rxjs");
const emitter = new EventEmitter();
const messages$ = fromEvent(emitter, "message");
const sub = messages$.subscribe(msg => {
console.log("Messaggio ricevuto:", msg);
});
emitter.emit("message", "ciao");
emitter.emit("message", "come va?");
// Quando non ti serve più:
sub.unsubscribe();
Da timer e intervalli
Per creare flussi temporali puoi usare interval e timer.
const { interval, timer } = require("rxjs");
const { take } = require("rxjs/operators");
// Emissione ogni secondo, ma solo 5 valori
interval(1000)
.pipe(take(5))
.subscribe(value => console.log("Tick:", value));
// Un singolo evento dopo 3 secondi
timer(3000).subscribe(() => console.log("Sono passati 3 secondi"));
Operatori utili in contesto Node.js
map, filter, tap
Questi operatori servono a trasformare e osservare i valori senza alterare il flusso in modo distruttivo.
const { of } = require("rxjs");
const { map, filter, tap } = require("rxjs/operators");
of(10, 15, 20)
.pipe(
tap(x => console.log("Prima del filtro:", x)),
filter(x => x > 10),
map(x => x / 5),
tap(x => console.log("Dopo la mappatura:", x))
)
.subscribe();
mergeMap, concatMap, switchMap
Questi operatori gestiscono operazioni asincrone annidate (ad esempio chiamate HTTP multiple) appiattendo Observables di Observables.
- mergeMap: esegue in parallelo, merge dei risultati.
- concatMap: esegue in sequenza, un'operazione alla volta.
- switchMap: cancella le richieste precedenti quando arriva una nuova emissione.
const { of, from } = require("rxjs");
const { mergeMap } = require("rxjs/operators");
const fetch = require("node-fetch");
function fetchUser(id) {
return from(
fetch(`https://jsonplaceholder.typicode.com/users/${id}`).then(res => res.json())
);
}
of(1, 2, 3)
.pipe(
mergeMap(id => fetchUser(id))
)
.subscribe(user => {
console.log("Utente:", user.name);
});
catchError e retry
In Node.js gli errori di rete, I/O o di validazione sono frequenti. RxJS fornisce strumenti per gestirli elegantemente.
const { of } = require("rxjs");
const { mergeMap, catchError, retry } = require("rxjs/operators");
function unstableOperation() {
return new Promise((resolve, reject) => {
const ok = Math.random() > 0.5;
if (ok) resolve("successo");
else reject(new Error("fallimento"));
});
}
from([1])
.pipe(
mergeMap(() => unstableOperation()),
retry(3),
catchError(err => {
console.error("Errore dopo i retry:", err.message);
return of("valore di fallback");
})
)
.subscribe(result => console.log("Risultato finale:", result));
Esempio pratico: elaborazione di file con RxJS
Supponiamo di voler leggere tutti i file in una directory, filtrare solo quelli
.json, leggerli, parsare il contenuto e combinare i risultati.
const fs = require("fs");
const path = require("path");
const { bindNodeCallback, from } = require("rxjs");
const { mergeMap, filter, map, toArray } = require("rxjs/operators");
const readdir$ = bindNodeCallback(fs.readdir);
const readFile$ = bindNodeCallback(fs.readFile);
function readJsonFiles(dirPath) {
return readdir$(dirPath).pipe(
mergeMap(files => from(files)),
filter(file => file.endsWith(".json")),
mergeMap(file =>
readFile$(path.join(dirPath, file), "utf8").pipe(
map(content => JSON.parse(content))
)
),
toArray()
);
}
readJsonFiles("./data").subscribe({
next: jsonArray => console.log("Dati JSON:", jsonArray),
error: err => console.error("Errore:", err),
complete: () => console.log("Completato")
});
Esempio pratico: flusso di richieste HTTP in parallelo
In un servizio Node potresti dover chiamare molte API esterne. Con RxJS puoi controllare concorrenza e errori in modo elegante.
const { from } = require("rxjs");
const { mergeMap, map, toArray, catchError } = require("rxjs/operators");
const fetch = require("node-fetch");
function fetchPost(id) {
return from(
fetch(`https://jsonplaceholder.typicode.com/posts/${id}`).then(res => res.json())
);
}
from([1, 2, 3, 4, 5])
.pipe(
mergeMap(
id => fetchPost(id).pipe(
catchError(() => from([{ id, error: true }]))
),
2
),
toArray(),
map(results => results.filter(r => !r.error))
)
.subscribe({
next: posts => console.log("Post recuperati:", posts.length),
error: err => console.error("Errore globale:", err),
complete: () => console.log("Completato")
});
Cancellazione e gestione delle risorse
Un vantaggio fondamentale delle Subscription è la possibilità di interrompere operazioni in corso e liberare risorse.
const { interval } = require("rxjs");
const sub = interval(1000).subscribe(value => {
console.log("Tick:", value);
});
setTimeout(() => {
console.log("Stop");
sub.unsubscribe();
}, 5000);
Schedulers e performance
Gli schedulers in RxJS controllano su quale "contesto" vengono eseguite le operazioni (sincrono, asincrono, coda di microtask, ecc.). In Node.js questo può aiutare a:
- evitare di bloccare il thread principale con operazioni sincrone pesanti;
- simulare il passare del tempo nei test;
- ottimizzare sequenze di operazioni.
Per molti casi d'uso di base puoi ignorare esplicitamente gli schedulers e lasciare che RxJS usi le impostazioni predefinite, ma sapere che esistono ti aiuta a scalare in casi complessi.
Best practice con RxJS in Node.js
- Inizia semplice: non convertire tutto in Observable. Usa RxJS dove offre un vantaggio reale.
- Componi, non annidare: usa
pipee operatori invece di callback annidate. - Gestisci sempre gli errori:
catchError, fallback e logging centrale. - Attento alle Subscription inattive: fai
unsubscribequando il flusso non serve più. - Separa la logica di flusso: incapsula la costruzione degli Observables in moduli riutilizzabili.
Conclusioni
RxJS porta in Node.js un modello potente e unificato per gestire eventi, stream, I/O e concurrency. All'inizio può sembrare complesso, ma una volta compresi i concetti di Observable, operatori e Subscription, diventa uno strumento estremamente flessibile per costruire servizi affidabili e scalabili.
Puoi iniziare introducendo RxJS in una sola parte della tua applicazione, ad esempio la pipeline di elaborazione di file o la logica di chiamate HTTP parallele, e poi estenderlo gradualmente laddove porta reali benefici.