RxJS è una libreria per la programmazione reattiva basata su Observable, perfetta per modellare azioni asincrone come l'upload di file. In questo articolo vedremo come usare RxJS per costruire una pipeline di caricamento robusta, estendibile e facile da testare, partendo da esempi semplici fino a scenari con progresso, annullamento e gestione degli errori.
Perché usare RxJS per l'upload di file
L'upload di file è un problema apparentemente semplice, ma che diventa rapidamente complesso quando aggiungiamo:
- barra di avanzamento e feedback in tempo reale;
- gestione degli errori con tentativi di ritentare;
- possibilità di annullare il caricamento;
- caricamento multiplo e concorrenza controllata;
- integrazione con altri eventi dell'interfaccia (click, cambi di stato, ecc.).
Gli Observable di RxJS sono perfetti per esprimere questi flussi come composizione di operatori, invece di avere una giungla di callback e listener difficili da mantenere.
Setup di base: catturare l'evento di selezione file
Immaginiamo di avere un input file e un pulsante "Carica":
<input type="file" id="fileInput" multiple>
<button id="uploadBtn">Carica</button>
<div id="status"></div>
Con RxJS possiamo trasformare gli eventi DOM in Observable usando fromEvent:
import { fromEvent } from 'rxjs';
import { map, filter } from 'rxjs/operators';
const fileInput = document.getElementById('fileInput');
const uploadBtn = document.getElementById('uploadBtn');
const statusEl = document.getElementById('status');
// Stream dei file selezionati
const files$ = fromEvent(fileInput, 'change').pipe(
map(event => /** @type {HTMLInputElement} */ (event.target).files),
filter(files => !!files && files.length > 0)
);
// Stream dei click sul pulsante upload
const uploadClick$ = fromEvent(uploadBtn, 'click');
Abbiamo ora due flussi distinti:
files$: emette unaFileListogni volta che l'utente seleziona dei file;uploadClick$: emette un valore ogni volta che il pulsante "Carica" viene premuto.
Creare un Observable per l'upload tramite XMLHttpRequest
Per tracciare il progresso dell'upload abbiamo bisogno degli eventi nativi di XMLHttpRequest,
in particolare quelli di xhr.upload. Incapsuliamo tutto in un Observable custom:
import { Observable } from 'rxjs';
/**
* Esegue l'upload di un singolo file e restituisce un Observable
* che emette oggetti di stato: { type, progress, response, error }.
*/
function uploadFile$(url, file) {
return new Observable(subscriber => {
const xhr = new XMLHttpRequest();
const formData = new FormData();
formData.append('file', file);
// Eventi di progresso
xhr.upload.addEventListener('progress', event => {
if (event.lengthComputable) {
const percent = Math.round((event.loaded / event.total) * 100);
subscriber.next({
type: 'progress',
progress: percent
});
}
});
// Completamento
xhr.addEventListener('load', () => {
if (xhr.status >= 200 && xhr.status < 300) {
subscriber.next({
type: 'success',
response: xhr.response
});
subscriber.complete();
} else {
subscriber.error({
type: 'error',
status: xhr.status,
message: xhr.statusText || 'Errore di upload'
});
}
});
// Errori di rete
xhr.addEventListener('error', () => {
subscriber.error({
type: 'error',
status: xhr.status,
message: 'Errore di rete'
});
});
// Annullamento
xhr.addEventListener('abort', () => {
subscriber.next({ type: 'cancelled' });
subscriber.complete();
});
xhr.open('POST', url);
xhr.send(formData);
// Funzione di cleanup chiamata su unsubscribe
return () => {
if (xhr.readyState !== XMLHttpRequest.DONE) {
xhr.abort();
}
};
});
}
Questo Observable è molto potente:
- emette eventi di progresso durante il caricamento;
- emette un evento di successo alla fine;
- propaga gli errori tramite
subscriber.error; - supporta l'annullamento con
unsubscribe.
Collegare selezione file, click e upload
Ora vogliamo che al click sul pulsante parta l'upload dei file selezionati. Usiamo gli operatori RxJS per combinare i flussi:
import { withLatestFrom, mergeMap, tap } from 'rxjs/operators';
const uploadUrl = '/api/upload';
// Ogni click prende l'ultima FileList disponibile
const upload$ = uploadClick$.pipe(
withLatestFrom(files$),
map(([, files]) => Array.from(files)), // FileList -> File[]
tap(() => {
statusEl.textContent = 'Inizio upload...';
}),
// Carica i file in sequenza (concorrenza 1)
mergeMap(
files => files,
(files, file) => file
),
mergeMap(file => {
statusEl.textContent = `Carico: ${file.name}`;
return uploadFile$(uploadUrl, file).pipe(
tap(event => {
if (event.type === 'progress') {
statusEl.textContent = `Carico ${file.name}: ${event.progress}%`;
}
})
);
})
);
upload$.subscribe({
next: event => {
if (event.type === 'success') {
statusEl.textContent = 'Upload completato con successo';
}
},
error: err => {
statusEl.textContent = `Errore: ${err.message || 'Upload fallito'}`;
},
complete: () => {
statusEl.textContent = 'Tutti i file sono stati caricati';
}
});
In questo esempio i file vengono caricati in sequenza. Possiamo facilmente cambiare strategia di concorrenza regolando gli operatori.
Caricamento multiplo con concorrenza controllata
Se i file sono molti, conviene caricarne alcuni in parallelo, ma non tutti, per non saturare la banda
o il server. mergeMap accetta un parametro di concorrenza:
import { from } from 'rxjs';
import { mergeMap } from 'rxjs/operators';
const MAX_CONCURRENCY = 3;
const parallelUpload$ = uploadClick$.pipe(
withLatestFrom(files$),
map(([, files]) => Array.from(files)),
mergeMap(files => from(files)),
mergeMap(file => uploadFile$(uploadUrl, file), MAX_CONCURRENCY)
);
Con MAX_CONCURRENCY = 3, al massimo tre upload saranno attivi contemporaneamente. È un
esempio classico di come RxJS permetta di esprimere policy complesse con poche righe.
Gestione degli errori e retry
RxJS offre operatori avanzati per la gestione degli errori, come retry e
retryWhen. Possiamo ad esempio ritentare l'upload di un file alcune volte, con
attesa crescente tra un tentativo e l'altro.
import { retryWhen, scan, delay } from 'rxjs/operators';
function uploadFileWithRetry$(url, file) {
const maxRetries = 3;
const baseDelay = 1000; // ms
return uploadFile$(url, file).pipe(
retryWhen(errors$ =>
errors$.pipe(
scan((acc, error) => {
if (acc.attempts >= maxRetries) {
throw error;
}
return { attempts: acc.attempts + 1 };
}, { attempts: 0 }),
delay(acc => acc.attempts * baseDelay)
)
)
);
}
Possiamo sostituire uploadFile$ con uploadFileWithRetry$ nella nostra pipeline
per ottenere automaticamente un comportamento più resiliente, senza cambiare la logica di orchestrazione.
Annullare un upload in corso
Poiché il nostro Observable di upload supporta unsubscribe, annullare è semplice: basta
conservare la Subscription e chiamare unsubscribe quando l'utente preme un pulsante
"Annulla".
<button id="cancelBtn">Annulla upload</button>
const cancelBtn = document.getElementById('cancelBtn');
let currentUploadSub = null;
const cancellableUpload$ = uploadClick$.pipe(
withLatestFrom(files$),
map(([, files]) => Array.from(files)),
mergeMap(file => uploadFile$(uploadUrl, file))
);
currentUploadSub = cancellableUpload$.subscribe({
next: event => {
// gestisci progresso / successo
},
error: err => {
statusEl.textContent = `Errore: ${err.message}`;
},
complete: () => {
statusEl.textContent = 'Upload completato o annullato';
}
});
fromEvent(cancelBtn, 'click').subscribe(() => {
if (currentUploadSub) {
currentUploadSub.unsubscribe();
statusEl.textContent = 'Upload annullato dall'utente';
}
});
In un'applicazione reale potremmo modellare meglio il ciclo di vita della subscription (ad esempio creando un nuovo stream per ogni click), ma il concetto chiave è che l'Observable incapsula anche la logica di abort dell'XHR.
Combinare stato dell'interfaccia e upload
Una delle forze di RxJS è la possibilità di combinare facilmente upload, input dell'utente e stato dell'interfaccia. Ad esempio possiamo disabilitare il pulsante di upload quando non ci sono file selezionati, o quando è già in corso un caricamento.
import { combineLatest, BehaviorSubject } from 'rxjs';
import { map, startWith } from 'rxjs/operators';
const hasFiles$ = files$.pipe(
map(files => files.length > 0),
startWith(false)
);
const isUploading$ = new BehaviorSubject(false);
combineLatest([hasFiles$, isUploading$]).pipe(
map(([hasFiles, isUploading]) => hasFiles && !isUploading)
).subscribe(canUpload => {
uploadBtn.disabled = !canUpload;
});
// Aggiorna lo stato durante l'upload
uploadClick$.pipe(
withLatestFrom(files$),
map(([, files]) => Array.from(files)),
mergeMap(file => {
isUploading$.next(true);
return uploadFile$(uploadUrl, file);
})
).subscribe({
next: () => {},
error: () => {
isUploading$.next(false);
},
complete: () => {
isUploading$.next(false);
}
});
In questo modo la UI rimane coerente con lo stato del sistema senza dover gestire manualmente mille flag.
Consigli di progettazione
-
Incapsula la logica di upload in funzioni come
uploadFile$, invece di spargereXMLHttpRequestnel codice dell'interfaccia. -
Usa tipi di evento espliciti (es.
{ type: 'progress' | 'success' | 'error' }) così da poterli combinare e filtrare facilmente. - Separa orchestrazione e rendering: la pipeline RxJS dovrebbe solo emettere stati, e un layer superiore dovrebbe occuparsi di trasformarli in aggiornamenti del DOM.
-
Gestisci il backpressure: se il backend ha limiti stretti, usa la concorrenza di
mergeMapo altri operatori per limitare il numero di upload paralleli. - Testa le pipeline usando i test di marble di RxJS, così puoi verificare facilmente comportamenti complessi (retry, annullamento, ecc.).
Conclusioni
Usare RxJS per l'upload di file in JavaScript permette di modellare l'intero flusso come una combinazione di Observable e operatori: selezione dei file, click del pulsante, avanzamento, errori, ritentativi, annullamento, stato dell'interfaccia. Il risultato è un codice più dichiarativo, modulare e manutenibile.
A partire dagli esempi visti in questo articolo puoi adattare la pipeline alle tue esigenze: supporto al drag&drop, caricamento chunked, autenticazione, integrazione con framework come React o Angular, e molto altro, mantenendo sempre la stessa filosofia reattiva.