RxJS in Node.js

RxJS (Reactive Extensions for JavaScript) è una libreria basata sul pattern Observer e sul paradigma di programmazione reattiva. In pratica, ti permette di lavorare con flussi di dati asincroni e concorrenti (eventi, richieste HTTP, stream di file, WebSocket, timer, ecc.) in modo dichiarativo e composibile.

Anche se RxJS è spesso associato al frontend (Angular in particolare), si integra molto bene con Node.js, dove la gestione di callback, Promise e stream è all'ordine del giorno. In questa guida vedremo come usare RxJS in un contesto Node.js, dai concetti base fino a esempi concreti.

Perché usare RxJS in Node.js

  • Composizione potente: concatenare trasformazioni, filtri, merge e combinazioni di stream.
  • Gestione degli errori avanzata: recupero, retry, fallback.
  • Cancellazione semplice: unsubscribe per interrompere operazioni in corso.
  • Uniformità: un modello unico per Promise, callback, EventEmitter, stream, timer.
  • Testabilità: flussi deterministici, schedulers virtuali, test del tempo.

Installazione e setup in Node.js

Aggiungi RxJS al tuo progetto Node.js con npm o yarn:

npm install rxjs

Poi puoi importare le funzioni necessarie nei tuoi file JavaScript o TypeScript.

// commonjs
const { of, from, interval } = require("rxjs");
const { map, filter } = require("rxjs/operators");

// oppure, con module type o TypeScript
import { of, from, interval } from "rxjs";
import { map, filter } from "rxjs/operators";

Concetti fondamentali di RxJS

Observable

Un Observable rappresenta una sequenza di valori (sincroni o asincroni) che possono arrivare nel tempo. Un Observable può:

  • emettere valori (next),
  • completare (complete),
  • oppure terminare con un errore (error).

Esempio di creazione e sottoscrizione:

const { Observable } = require("rxjs");

const obs = new Observable(subscriber => {
  subscriber.next(1);
  subscriber.next(2);
  subscriber.next(3);
  subscriber.complete();
});

obs.subscribe({
  next: value => console.log("Valore:", value),
  error: err => console.error("Errore:", err),
  complete: () => console.log("Completato")
});

Observer e Subscription

L'Observer è l'oggetto passato a subscribe. Una Subscription è il risultato di subscribe e rappresenta la sottoscrizione al flusso, che può essere annullata.

const subscription = obs.subscribe({
  next: v => console.log(v)
});

// Annulla la sottoscrizione (cancellazione)
subscription.unsubscribe();

Operators e pipe

Gli operatori sono funzioni pure che trasformano un Observable in un altro. La funzione pipe permette di concatenare operatori in modo leggibile.

const { of } = require("rxjs");
const { map, filter } = require("rxjs/operators");

of(1, 2, 3, 4, 5)
  .pipe(
    filter(x => x % 2 === 0),
    map(x => x * 10)
  )
  .subscribe(x => console.log(x)); // 20, 40

Creare Observables in Node.js

Da valori statici e array

const { of, from } = require("rxjs");

of("ciao", "mondo").subscribe(console.log);

from([1, 2, 3]).subscribe(console.log);

Da Promise

RxJS converte facilmente una Promise in un Observable usando from.

const { from } = require("rxjs");

function getUser() {
  return Promise.resolve({ id: 1, name: "Alice" });
}

from(getUser()).subscribe(user => {
  console.log("Utente:", user);
});

Da callback stile Node (errore-primo argomento)

Molte API di Node usano callback del tipo (err, result). RxJS offre bindNodeCallback per convertirle in Observables.

const fs = require("fs");
const { bindNodeCallback } = require("rxjs");

const readFile$ = bindNodeCallback(fs.readFile);

readFile$("./file.txt", "utf8").subscribe({
  next: content => console.log("Contenuto:", content),
  error: err => console.error("Errore lettura:", err),
  complete: () => console.log("Lettura completata")
});

Da EventEmitter

Gli EventEmitter di Node sono una sorgente naturale di stream. RxJS fornisce fromEvent.

const EventEmitter = require("events");
const { fromEvent } = require("rxjs");

const emitter = new EventEmitter();

const messages$ = fromEvent(emitter, "message");

const sub = messages$.subscribe(msg => {
  console.log("Messaggio ricevuto:", msg);
});

emitter.emit("message", "ciao");
emitter.emit("message", "come va?");

// Quando non ti serve più:
sub.unsubscribe();

Da timer e intervalli

Per creare flussi temporali puoi usare interval e timer.

const { interval, timer } = require("rxjs");
const { take } = require("rxjs/operators");

// Emissione ogni secondo, ma solo 5 valori
interval(1000)
  .pipe(take(5))
  .subscribe(value => console.log("Tick:", value));

// Un singolo evento dopo 3 secondi
timer(3000).subscribe(() => console.log("Sono passati 3 secondi"));

Operatori utili in contesto Node.js

map, filter, tap

Questi operatori servono a trasformare e osservare i valori senza alterare il flusso in modo distruttivo.

const { of } = require("rxjs");
const { map, filter, tap } = require("rxjs/operators");

of(10, 15, 20)
  .pipe(
    tap(x => console.log("Prima del filtro:", x)),
    filter(x => x > 10),
    map(x => x / 5),
    tap(x => console.log("Dopo la mappatura:", x))
  )
  .subscribe();

mergeMap, concatMap, switchMap

Questi operatori gestiscono operazioni asincrone annidate (ad esempio chiamate HTTP multiple) appiattendo Observables di Observables.

  • mergeMap: esegue in parallelo, merge dei risultati.
  • concatMap: esegue in sequenza, un'operazione alla volta.
  • switchMap: cancella le richieste precedenti quando arriva una nuova emissione.
const { of, from } = require("rxjs");
const { mergeMap } = require("rxjs/operators");
const fetch = require("node-fetch");

function fetchUser(id) {
  return from(
    fetch(`https://jsonplaceholder.typicode.com/users/${id}`).then(res => res.json())
  );
}

of(1, 2, 3)
  .pipe(
    mergeMap(id => fetchUser(id))
  )
  .subscribe(user => {
    console.log("Utente:", user.name);
  });

catchError e retry

In Node.js gli errori di rete, I/O o di validazione sono frequenti. RxJS fornisce strumenti per gestirli elegantemente.

const { of } = require("rxjs");
const { mergeMap, catchError, retry } = require("rxjs/operators");

function unstableOperation() {
  return new Promise((resolve, reject) => {
    const ok = Math.random() > 0.5;
    if (ok) resolve("successo");
    else reject(new Error("fallimento"));
  });
}

from([1])
  .pipe(
    mergeMap(() => unstableOperation()),
    retry(3),
    catchError(err => {
      console.error("Errore dopo i retry:", err.message);
      return of("valore di fallback");
    })
  )
  .subscribe(result => console.log("Risultato finale:", result));

Esempio pratico: elaborazione di file con RxJS

Supponiamo di voler leggere tutti i file in una directory, filtrare solo quelli .json, leggerli, parsare il contenuto e combinare i risultati.

const fs = require("fs");
const path = require("path");
const { bindNodeCallback, from } = require("rxjs");
const { mergeMap, filter, map, toArray } = require("rxjs/operators");

const readdir$ = bindNodeCallback(fs.readdir);
const readFile$ = bindNodeCallback(fs.readFile);

function readJsonFiles(dirPath) {
  return readdir$(dirPath).pipe(
    mergeMap(files => from(files)),
    filter(file => file.endsWith(".json")),
    mergeMap(file =>
      readFile$(path.join(dirPath, file), "utf8").pipe(
        map(content => JSON.parse(content))
      )
    ),
    toArray()
  );
}

readJsonFiles("./data").subscribe({
  next: jsonArray => console.log("Dati JSON:", jsonArray),
  error: err => console.error("Errore:", err),
  complete: () => console.log("Completato")
});

Esempio pratico: flusso di richieste HTTP in parallelo

In un servizio Node potresti dover chiamare molte API esterne. Con RxJS puoi controllare concorrenza e errori in modo elegante.

const { from } = require("rxjs");
const { mergeMap, map, toArray, catchError } = require("rxjs/operators");
const fetch = require("node-fetch");

function fetchPost(id) {
  return from(
    fetch(`https://jsonplaceholder.typicode.com/posts/${id}`).then(res => res.json())
  );
}

from([1, 2, 3, 4, 5])
  .pipe(
    mergeMap(
      id => fetchPost(id).pipe(
        catchError(() => from([{ id, error: true }]))
      ),
      2
    ),
    toArray(),
    map(results => results.filter(r => !r.error))
  )
  .subscribe({
    next: posts => console.log("Post recuperati:", posts.length),
    error: err => console.error("Errore globale:", err),
    complete: () => console.log("Completato")
  });

Cancellazione e gestione delle risorse

Un vantaggio fondamentale delle Subscription è la possibilità di interrompere operazioni in corso e liberare risorse.

const { interval } = require("rxjs");

const sub = interval(1000).subscribe(value => {
  console.log("Tick:", value);
});

setTimeout(() => {
  console.log("Stop");
  sub.unsubscribe();
}, 5000);

Schedulers e performance

Gli schedulers in RxJS controllano su quale "contesto" vengono eseguite le operazioni (sincrono, asincrono, coda di microtask, ecc.). In Node.js questo può aiutare a:

  • evitare di bloccare il thread principale con operazioni sincrone pesanti;
  • simulare il passare del tempo nei test;
  • ottimizzare sequenze di operazioni.

Per molti casi d'uso di base puoi ignorare esplicitamente gli schedulers e lasciare che RxJS usi le impostazioni predefinite, ma sapere che esistono ti aiuta a scalare in casi complessi.

Best practice con RxJS in Node.js

  1. Inizia semplice: non convertire tutto in Observable. Usa RxJS dove offre un vantaggio reale.
  2. Componi, non annidare: usa pipe e operatori invece di callback annidate.
  3. Gestisci sempre gli errori: catchError, fallback e logging centrale.
  4. Attento alle Subscription inattive: fai unsubscribe quando il flusso non serve più.
  5. Separa la logica di flusso: incapsula la costruzione degli Observables in moduli riutilizzabili.

Conclusioni

RxJS porta in Node.js un modello potente e unificato per gestire eventi, stream, I/O e concurrency. All'inizio può sembrare complesso, ma una volta compresi i concetti di Observable, operatori e Subscription, diventa uno strumento estremamente flessibile per costruire servizi affidabili e scalabili.

Puoi iniziare introducendo RxJS in una sola parte della tua applicazione, ad esempio la pipeline di elaborazione di file o la logica di chiamate HTTP parallele, e poi estenderlo gradualmente laddove porta reali benefici.

Torna su