RxJS (Reactive Extensions for JavaScript) è una libreria pensata per lavorare con flussi di dati asincroni e basati su eventi. Si appoggia al modello di programmazione reattiva e introduce un concetto centrale: l'Observable.
In un mondo fatto di chiamate HTTP, eventi dell'interfaccia utente, WebSocket, timer e stream continui di informazioni, la gestione dell'asincronia diventa complessa. RxJS fornisce astrazioni potenti e unificate per gestire questi casi d'uso in modo dichiarativo e composabile.
Concetti fondamentali di RxJS
Observable
Un Observable rappresenta una sorgente di dati che produce valori nel tempo. È simile a una Promise, ma con alcune differenze importanti:
- può emettere più valori, non solo uno;
- può mai completare, completare con successo o terminare con un errore;
- è lazily evaluated: finché nessuno si iscrive, non fa nulla.
Un esempio semplice con TypeScript:
import { Observable } from 'rxjs';
const observable = new Observable<number>(subscriber => {
subscriber.next(1);
subscriber.next(2);
subscriber.next(3);
subscriber.complete();
});
Observer
Un Observer è un oggetto che sa come reagire ai valori emessi da un Observable. Implementa fino a tre callback:
next(value)per ogni valore emesso;error(err)se si verifica un errore;complete()quando lo stream termina con successo.
const observer = {
next: (value: number) => console.log('Valore:', value),
error: (err: unknown) => console.error('Errore:', err),
complete: () => console.log('Completato')
};
Subscription
Per collegare un Observer a un Observable si usa il metodo subscribe, che restituisce
una Subscription. Quest'ultima rappresenta l'iscrizione attiva allo stream e
può essere annullata con unsubscribe().
const subscription = observable.subscribe(observer);
// Quando non serve più lo stream
subscription.unsubscribe();
Operatori
Gli operatori sono funzioni che trasformano, filtrano o combinano Observables.
Sono il vero cuore di RxJS. Vengono applicati tramite il metodo pipe.
Esempio con alcuni operatori di trasformazione e filtraggio:
import { of, map, filter } from 'rxjs';
of(1, 2, 3, 4, 5)
.pipe(
filter(x => x % 2 === 0), // tiene solo i pari
map(x => x * 10) // moltiplica per 10
)
.subscribe(value => console.log(value)); // 20, 40
Subject
Un Subject è un tipo particolare di Observable che è anche un Observer.
Può quindi ricevere valori (tramite next) e allo stesso tempo farli
propagare ai suoi sottoscrittori.
import { Subject } from 'rxjs';
const subject = new Subject<number>();
subject.subscribe(value => console.log('Observer A:', value));
subject.subscribe(value => console.log('Observer B:', value));
subject.next(1);
subject.next(2);
// Output:
// Observer A: 1
// Observer B: 1
// Observer A: 2
// Observer B: 2
I Subject sono utili quando si vuole condividere lo stesso flusso di dati tra più Observer, trasformando uno stream da cold a hot (vedremo tra poco cosa significa).
Cold e hot Observables
Gli Observables cold creano una nuova sorgente di dati per ogni sottoscrittore.
Ogni subscribe riceve il proprio flusso indipendente. Un esempio tipico è un Observable
creato con of o con una richiesta HTTP.
Gli Observables hot condividono una sorgente comune tra più sottoscrittori. Gli Observer si connettono a un flusso già in corso, come nel caso degli eventi del DOM o dei WebSocket.
I Subject sono uno strumento chiave per trasformare un Observable cold in uno hot, permettendo il multicasting dei valori.
Operatori essenziali in RxJS
Creazione
of: crea un Observable a partire da una lista di valori.from: crea un Observable da Promise, array o iterable.interval: emette numeri interi a intervalli regolari.fromEvent: crea un Observable a partire da eventi del DOM o altri emitter.
import { fromEvent } from 'rxjs';
const button = document.querySelector('button');
if (button) {
const clicks$ = fromEvent<MouseEvent>(button, 'click');
clicks$.subscribe(event => console.log('Click', event.clientX, event.clientY));
}
Transformazione
map: trasforma ogni valore emesso.tap: esegue un effetto collaterale senza modificare i valori.scan: simile areducesugli array, ma nel tempo.
import { interval, map } from 'rxjs';
interval(1000)
.pipe(
map(i => `Secondi trascorsi: ${i}`)
)
.subscribe(text => console.log(text));
Filtraggio
filter: lascia passare solo i valori che soddisfano una condizione.debounceTime: ignora i valori emessi troppo rapidamente, utile per input utente.distinctUntilChanged: emette il valore solo se è diverso dal precedente.
import { fromEvent, map, debounceTime, distinctUntilChanged } from 'rxjs';
const searchInput = document.querySelector('input[type="search"]');
if (searchInput) {
const search$ = fromEvent<Event>(searchInput, 'input').pipe(
map(event => (event.target as HTMLInputElement).value),
debounceTime(300),
distinctUntilChanged()
);
search$.subscribe(term => {
console.log('Query di ricerca:', term);
// qui potresti chiamare un servizio HTTP
});
}
Combinazione
merge: unisce più Observables in uno, intercalando i valori.concat: concatena gli Observables uno dopo l'altro.combineLatest: emette una combinazione degli ultimi valori di più sorgenti.
import { combineLatest, map } from 'rxjs';
const prezzo$ = /* Observable<number> */;
const quantita$ = /* Observable<number> */;
const totale$ = combineLatest([prezzo$, quantita$]).pipe(
map(([prezzo, quantita]) => prezzo * quantita)
);
totale$.subscribe(totale => console.log('Totale:', totale));
Operatori di mapping ad alto livello
In RxJS esistono operatori che lavorano con Observables di Observables, ovvero flussi che producono a loro volta altri flussi. I principali sono:
mergeMap: appiattisce gli Observables interni in uno stream condiviso.switchMap: cancella il flusso precedente quando ne arriva uno nuovo.concatMap: accoda le richieste, gestendole una alla volta in ordine.
Un esempio classico è la ricerca con chiamate HTTP:
import { fromEvent, map, debounceTime, distinctUntilChanged, switchMap } from 'rxjs';
import { ajax } from 'rxjs/ajax';
const searchInput = document.querySelector('input[type="search"]');
if (searchInput) {
const search$ = fromEvent<Event>(searchInput, 'input').pipe(
map(event => (event.target as HTMLInputElement).value),
debounceTime(300),
distinctUntilChanged(),
switchMap(term =>
ajax.getJSON(`https://api.example.com/search?q=${encodeURIComponent(term)}`)
)
);
search$.subscribe(result => {
console.log('Risultati:', result);
});
}
Gestione degli errori
RxJS permette di trattare gli errori come parte del flusso di dati. Invece di bloccare
l'esecuzione, gli errori possono essere intercettati e gestiti tramite operatori
come catchError, retry, retryWhen.
import { ajax } from 'rxjs/ajax';
import { catchError, retry } from 'rxjs';
import { of } from 'rxjs';
ajax.getJSON('/api/dati')
.pipe(
retry(2),
catchError(err => {
console.error('Errore nella richiesta:', err);
return of({ fallback: true, dati: [] });
})
)
.subscribe(response => console.log('Risposta:', response));
Schedulazione
Gli Scheduler di RxJS controllano in quale contesto di esecuzione avvengono le emissioni e le sottoscrizioni (sincrono, asincrono, animazione, ecc.). Nella pratica quotidiana spesso non è necessario configurarli manualmente, ma è utile sapere che esistono per avere un controllo fine sulle performance e sull'ordine di esecuzione.
Esempio completo: autosalvataggio di un form
Supponiamo di avere un form che deve essere salvato automaticamente dopo che l'utente smette di scrivere per almeno 1 secondo. Inoltre vogliamo evitare salvataggi ridondanti se il contenuto non è cambiato.
import { fromEvent } from 'rxjs';
import { map, debounceTime, distinctUntilChanged, switchMap } from 'rxjs';
import { ajax } from 'rxjs/ajax';
const textarea = document.querySelector('textarea');
function saveDraft(content: string) {
return ajax.post('/api/draft', { content });
}
if (textarea) {
const draft$ = fromEvent<Event>(textarea, 'input').pipe(
map(event => (event.target as HTMLTextAreaElement).value),
debounceTime(1000),
distinctUntilChanged(),
switchMap(content => saveDraft(content))
);
draft$.subscribe({
next: () => console.log('Bozza salvata'),
error: err => console.error('Errore nel salvataggio:', err)
});
}
Best practice con RxJS
- Tipizza sempre i tuoi Observables per sfruttare al massimo TypeScript.
- Usa una naming convention chiara, ad esempio il suffisso
$per gli Observables (user$,clicks$). - Gestisci le Subscription per evitare memory leak, specialmente nei componenti frontend.
- Preferisci gli operatori dichiarativi a logica imperativa annidata.
- Tratta gli errori come dati, usando gli operatori di gestione degli errori invece di blocchi try/catch annidati.
- In framework come Angular, sfrutta l'async pipe e i pattern reattivi per ridurre il codice di gestione manuale delle Subscription.
Conclusione
RxJS è una libreria potente che richiede un certo cambio di mentalità: dalla programmazione imperativa tradizionale a una visione dichiarativa e basata su flussi di eventi. Una volta acquisiti i concetti fondamentali di Observable, operatori e Subject, diventa uno strumento estremamente efficace per gestire l'asincronia in applicazioni complesse.
Per imparare RxJS in profondità il suggerimento è di partire da piccoli esercizi
pratici (timer, input utente, richieste HTTP) e poi introdurre gradualmente pattern
più avanzati come l'uso combinato di switchMap, mergeMap e
Subject per orchestrare logiche complesse.