RxJS: programmazione reattiva in JavaScript

RxJS (Reactive Extensions for JavaScript) è una libreria pensata per lavorare con flussi di dati asincroni e basati su eventi. Si appoggia al modello di programmazione reattiva e introduce un concetto centrale: l'Observable.

In un mondo fatto di chiamate HTTP, eventi dell'interfaccia utente, WebSocket, timer e stream continui di informazioni, la gestione dell'asincronia diventa complessa. RxJS fornisce astrazioni potenti e unificate per gestire questi casi d'uso in modo dichiarativo e composabile.

Concetti fondamentali di RxJS

Observable

Un Observable rappresenta una sorgente di dati che produce valori nel tempo. È simile a una Promise, ma con alcune differenze importanti:

  • può emettere più valori, non solo uno;
  • può mai completare, completare con successo o terminare con un errore;
  • è lazily evaluated: finché nessuno si iscrive, non fa nulla.

Un esempio semplice con TypeScript:


import { Observable } from 'rxjs';

const observable = new Observable<number>(subscriber => {
  subscriber.next(1);
  subscriber.next(2);
  subscriber.next(3);
  subscriber.complete();
});
  

Observer

Un Observer è un oggetto che sa come reagire ai valori emessi da un Observable. Implementa fino a tre callback:

  • next(value) per ogni valore emesso;
  • error(err) se si verifica un errore;
  • complete() quando lo stream termina con successo.

const observer = {
  next: (value: number) => console.log('Valore:', value),
  error: (err: unknown) => console.error('Errore:', err),
  complete: () => console.log('Completato')
};
  

Subscription

Per collegare un Observer a un Observable si usa il metodo subscribe, che restituisce una Subscription. Quest'ultima rappresenta l'iscrizione attiva allo stream e può essere annullata con unsubscribe().


const subscription = observable.subscribe(observer);

// Quando non serve più lo stream
subscription.unsubscribe();
  

Operatori

Gli operatori sono funzioni che trasformano, filtrano o combinano Observables. Sono il vero cuore di RxJS. Vengono applicati tramite il metodo pipe.

Esempio con alcuni operatori di trasformazione e filtraggio:


import { of, map, filter } from 'rxjs';

of(1, 2, 3, 4, 5)
  .pipe(
    filter(x => x % 2 === 0),     // tiene solo i pari
    map(x => x * 10)              // moltiplica per 10
  )
  .subscribe(value => console.log(value)); // 20, 40
  

Subject

Un Subject è un tipo particolare di Observable che è anche un Observer. Può quindi ricevere valori (tramite next) e allo stesso tempo farli propagare ai suoi sottoscrittori.


import { Subject } from 'rxjs';

const subject = new Subject<number>();

subject.subscribe(value => console.log('Observer A:', value));
subject.subscribe(value => console.log('Observer B:', value));

subject.next(1);
subject.next(2);

// Output:
// Observer A: 1
// Observer B: 1
// Observer A: 2
// Observer B: 2
  

I Subject sono utili quando si vuole condividere lo stesso flusso di dati tra più Observer, trasformando uno stream da cold a hot (vedremo tra poco cosa significa).

Cold e hot Observables

Gli Observables cold creano una nuova sorgente di dati per ogni sottoscrittore. Ogni subscribe riceve il proprio flusso indipendente. Un esempio tipico è un Observable creato con of o con una richiesta HTTP.

Gli Observables hot condividono una sorgente comune tra più sottoscrittori. Gli Observer si connettono a un flusso già in corso, come nel caso degli eventi del DOM o dei WebSocket.

I Subject sono uno strumento chiave per trasformare un Observable cold in uno hot, permettendo il multicasting dei valori.

Operatori essenziali in RxJS

Creazione

  • of: crea un Observable a partire da una lista di valori.
  • from: crea un Observable da Promise, array o iterable.
  • interval: emette numeri interi a intervalli regolari.
  • fromEvent: crea un Observable a partire da eventi del DOM o altri emitter.

import { fromEvent } from 'rxjs';

const button = document.querySelector('button');

if (button) {
  const clicks$ = fromEvent<MouseEvent>(button, 'click');
  clicks$.subscribe(event => console.log('Click', event.clientX, event.clientY));
}
  

Transformazione

  • map: trasforma ogni valore emesso.
  • tap: esegue un effetto collaterale senza modificare i valori.
  • scan: simile a reduce sugli array, ma nel tempo.

import { interval, map } from 'rxjs';

interval(1000)
  .pipe(
    map(i => `Secondi trascorsi: ${i}`)
  )
  .subscribe(text => console.log(text));
  

Filtraggio

  • filter: lascia passare solo i valori che soddisfano una condizione.
  • debounceTime: ignora i valori emessi troppo rapidamente, utile per input utente.
  • distinctUntilChanged: emette il valore solo se è diverso dal precedente.

import { fromEvent, map, debounceTime, distinctUntilChanged } from 'rxjs';

const searchInput = document.querySelector('input[type="search"]');

if (searchInput) {
  const search$ = fromEvent<Event>(searchInput, 'input').pipe(
    map(event => (event.target as HTMLInputElement).value),
    debounceTime(300),
    distinctUntilChanged()
  );

  search$.subscribe(term => {
    console.log('Query di ricerca:', term);
    // qui potresti chiamare un servizio HTTP
  });
}
  

Combinazione

  • merge: unisce più Observables in uno, intercalando i valori.
  • concat: concatena gli Observables uno dopo l'altro.
  • combineLatest: emette una combinazione degli ultimi valori di più sorgenti.

import { combineLatest, map } from 'rxjs';

const prezzo$ = /* Observable<number> */;
const quantita$ = /* Observable<number> */;

const totale$ = combineLatest([prezzo$, quantita$]).pipe(
  map(([prezzo, quantita]) => prezzo * quantita)
);

totale$.subscribe(totale => console.log('Totale:', totale));
  

Operatori di mapping ad alto livello

In RxJS esistono operatori che lavorano con Observables di Observables, ovvero flussi che producono a loro volta altri flussi. I principali sono:

  • mergeMap: appiattisce gli Observables interni in uno stream condiviso.
  • switchMap: cancella il flusso precedente quando ne arriva uno nuovo.
  • concatMap: accoda le richieste, gestendole una alla volta in ordine.

Un esempio classico è la ricerca con chiamate HTTP:


import { fromEvent, map, debounceTime, distinctUntilChanged, switchMap } from 'rxjs';
import { ajax } from 'rxjs/ajax';

const searchInput = document.querySelector('input[type="search"]');

if (searchInput) {
  const search$ = fromEvent<Event>(searchInput, 'input').pipe(
    map(event => (event.target as HTMLInputElement).value),
    debounceTime(300),
    distinctUntilChanged(),
    switchMap(term =>
      ajax.getJSON(`https://api.example.com/search?q=${encodeURIComponent(term)}`)
    )
  );

  search$.subscribe(result => {
    console.log('Risultati:', result);
  });
}
  

Gestione degli errori

RxJS permette di trattare gli errori come parte del flusso di dati. Invece di bloccare l'esecuzione, gli errori possono essere intercettati e gestiti tramite operatori come catchError, retry, retryWhen.


import { ajax } from 'rxjs/ajax';
import { catchError, retry } from 'rxjs';
import { of } from 'rxjs';

ajax.getJSON('/api/dati')
  .pipe(
    retry(2),
    catchError(err => {
      console.error('Errore nella richiesta:', err);
      return of({ fallback: true, dati: [] });
    })
  )
  .subscribe(response => console.log('Risposta:', response));
  

Schedulazione

Gli Scheduler di RxJS controllano in quale contesto di esecuzione avvengono le emissioni e le sottoscrizioni (sincrono, asincrono, animazione, ecc.). Nella pratica quotidiana spesso non è necessario configurarli manualmente, ma è utile sapere che esistono per avere un controllo fine sulle performance e sull'ordine di esecuzione.

Esempio completo: autosalvataggio di un form

Supponiamo di avere un form che deve essere salvato automaticamente dopo che l'utente smette di scrivere per almeno 1 secondo. Inoltre vogliamo evitare salvataggi ridondanti se il contenuto non è cambiato.


import { fromEvent } from 'rxjs';
import { map, debounceTime, distinctUntilChanged, switchMap } from 'rxjs';
import { ajax } from 'rxjs/ajax';

const textarea = document.querySelector('textarea');

function saveDraft(content: string) {
  return ajax.post('/api/draft', { content });
}

if (textarea) {
  const draft$ = fromEvent<Event>(textarea, 'input').pipe(
    map(event => (event.target as HTMLTextAreaElement).value),
    debounceTime(1000),
    distinctUntilChanged(),
    switchMap(content => saveDraft(content))
  );

  draft$.subscribe({
    next: () => console.log('Bozza salvata'),
    error: err => console.error('Errore nel salvataggio:', err)
  });
}
  

Best practice con RxJS

  • Tipizza sempre i tuoi Observables per sfruttare al massimo TypeScript.
  • Usa una naming convention chiara, ad esempio il suffisso $ per gli Observables (user$, clicks$).
  • Gestisci le Subscription per evitare memory leak, specialmente nei componenti frontend.
  • Preferisci gli operatori dichiarativi a logica imperativa annidata.
  • Tratta gli errori come dati, usando gli operatori di gestione degli errori invece di blocchi try/catch annidati.
  • In framework come Angular, sfrutta l'async pipe e i pattern reattivi per ridurre il codice di gestione manuale delle Subscription.

Conclusione

RxJS è una libreria potente che richiede un certo cambio di mentalità: dalla programmazione imperativa tradizionale a una visione dichiarativa e basata su flussi di eventi. Una volta acquisiti i concetti fondamentali di Observable, operatori e Subject, diventa uno strumento estremamente efficace per gestire l'asincronia in applicazioni complesse.

Per imparare RxJS in profondità il suggerimento è di partire da piccoli esercizi pratici (timer, input utente, richieste HTTP) e poi introdurre gradualmente pattern più avanzati come l'uso combinato di switchMap, mergeMap e Subject per orchestrare logiche complesse.

Torna su