Scopri come utilizzare flussi leggibili, scrivibili e trasformarli con l'API Streams.
L'API Streams consente di accedere in modo programmatico a flussi di dati ricevuti sulla rete o creati in locale con qualsiasi mezzo ed elaborarli con JavaScript. I flussi di dati richiedono la suddivisione di una risorsa che vuoi ricevere, inviare o trasformare
in piccoli blocchi, quindi elaborarli poco per volta. Sebbene lo streaming sia qualcosa che i browser fanno comunque quando ricevono asset come HTML o video da mostrare sulle pagine web, questa funzionalità non è mai stata disponibile per JavaScript prima dell'introduzione di fetch
con gli stream nel 2015.
In precedenza, se volessi elaborare una risorsa di qualche tipo (un video o un file di testo ecc.), dovevi scaricare l'intero file, attendere che venisse deserializzato in un formato adatto, quindi elaborarlo. Con gli stream disponibili per JavaScript, tutto cambia. Ora puoi elaborare i dati non elaborati con JavaScript progressivamente non appena sono disponibili sul client, senza dover generare buffer, stringa o blob. In questo modo è possibile accedere a una serie di casi d'uso, alcuni dei quali sono elencati di seguito:
- Effetti video:collega uno stream video leggibile attraverso uno stream di trasformazione che applica gli effetti in tempo reale.
- De)compressione dei dati: collega uno stream di file attraverso un flusso di trasformazione che lo (de)comprime in modo selettivo.
- Decodifica delle immagini:collega uno stream di risposta HTTP attraverso un flusso di trasformazione che decodifica i byte in dati bitmap, quindi attraverso un altro flusso di trasformazione che converte i bitmap in PNG. Se
installata all'interno del gestore
fetch
di un service worker, questa opzione consente di eseguire il polyfill trasparente dei nuovi formati di immagine come AVIF.
Supporto del browser
ReadableStream e WritableStream
TransformStream
Concetti principali
Prima di entrare nel dettaglio sui vari tipi di stream, vorrei introdurre alcuni concetti fondamentali.
Bocconcini
Un blocco è un singolo dato che viene scritto o letto da uno stream. Può essere di qualsiasi tipo;
i flussi possono anche contenere blocchi di diversi tipi. Il più delle volte, un blocco non è l'unità di dati più atomica
per un determinato flusso. Ad esempio, un flusso di byte potrebbe contenere blocchi da 16
unità Uint8Array
KiB, anziché singoli byte.
Flussi leggibili
Uno stream leggibile rappresenta una fonte di dati da cui puoi leggere. In altre parole, i dati escono da uno stream leggibile. Concretamente, uno stream leggibile è un'istanza della classe ReadableStream
.
Stream scrivibili
Un flusso accessibile in scrittura rappresenta una destinazione per i dati in cui puoi scrivere. In altre parole, i dati entrano in un flusso scrivibile. Concretamente, un flusso scrivibile è un'istanza della
classe WritableStream
.
Trasforma flussi
Un flusso di trasformazione è costituito da una coppia di flussi: uno stream accessibile in scrittura, noto come lato scrivibile, e uno stream leggibile, noto come lato leggibile.
Una metafora nel mondo reale sarebbe l'interprete simultaneo che traduce al volo da una lingua all'altra.
In modo specifico per il flusso di trasformazione, la scrittura
sul lato scrivibile comporta la disponibilità di nuovi dati per la lettura dal
lato leggibile. Concretamente, qualsiasi oggetto con una proprietà writable
e una proprietà readable
può fungere da flusso di trasformazione. Tuttavia, la classe TransformStream
standard semplifica la creazione di una coppia di questo tipo correttamente angolata.
Catene per tubi
Gli stream vengono utilizzati principalmente mediante associazione tra loro. Un flusso leggibile può essere reindirizzato direttamente
a uno stream scrivibile, utilizzando il metodo pipeTo()
dello stream leggibile, oppure può essere trasmesso prima attraverso
uno o più flussi di trasformazione, utilizzando il metodo pipeThrough()
dello stream leggibile. Un insieme di flussi collegati tra loro in questo modo è definito catena di tubature.
Contropressione
Una volta creata, una catena di tubi propaga gli indicatori relativi alla velocità con cui i blocchi devono passare attraverso questa catena. Se un passaggio nella catena non può ancora accettare blocchi, propaga un segnale all'indietro attraverso la catena di tubature, fino a quando alla sorgente originale non viene detto di interrompere la produzione dei blocchi così rapidamente. Questo processo di normalizzazione del flusso è chiamato contropressione.
Teeing
È possibile eseguire il tethering di uno stream leggibile (chiamato in base alla forma di una "T" maiuscola) utilizzando il relativo metodo tee()
.
Questa operazione bloccherà il flusso, ovvero non lo renderà più utilizzabile direttamente. Tuttavia, creerà due nuovi flussi, denominati rami, che possono essere utilizzati in modo indipendente.
Il teeing è importante anche perché gli stream non possono essere riavvolti o riavviati. Scoprirai di più in merito più avanti.
I meccanismi di uno stream leggibile
Un flusso leggibile è un'origine dati rappresentata in JavaScript da un oggetto ReadableStream
che scorre da un'origine sottostante. Il costruttore ReadableStream()
crea e restituisce un oggetto flusso leggibile dai gestori specificati. Esistono due tipi di origine di base:
- Le origini push eseguono costantemente il push dei dati quando li accedi e spetta a te avviare, sospendere o annullare l'accesso al flusso. Alcuni esempi sono video stream in diretta, eventi inviati dal server o WebSocket.
- Le origini pull richiedono di richiedere esplicitamente i dati alle origini una volta connesse. Tra gli esempi sono incluse le operazioni HTTP tramite chiamate
fetch()
oXMLHttpRequest
.
I dati di flusso vengono letti in sequenza in piccole parti chiamate blocchi. che si dice che i blocchi posizionati in uno stream siano in coda. Ciò significa che sono in attesa in coda pronte per essere lette. Una coda interna tiene traccia dei blocchi che non sono ancora stati letti.
Una strategia di coda è un oggetto che determina in che modo un flusso deve segnalare la contropressione in base allo stato della sua coda interna. La strategia di coda assegna una dimensione a ogni blocco e confronta la dimensione totale di tutti i blocchi in coda con un numero specificato, noto come filigrana elevata.
I blocchi all'interno dello stream vengono letti da un lettore. Questo lettore recupera i dati un blocco alla volta, permettendoti di eseguire qualsiasi tipo di operazione. Il lettore più l'altro codice di elaborazione associato viene definito consumatore.
Il costrutto successivo in questo contesto è chiamato controller. A ogni stream leggibile è associato un controller che, come suggerito dal nome, ti permette di controllare lo stream.
Uno stream può essere letto da un solo lettore alla volta. Quando un lettore viene creato e inizia a leggere lo stream (ossia, diventa un lettore attivo), il lettore viene bloccato. Se vuoi che un altro lettore assuma la lettura del tuo stream, in genere devi rilasciare il primo lettore prima di fare qualsiasi altra cosa (anche se puoi teneare gli stream).
Creazione di uno stream leggibile
Per creare uno stream leggibile, devi chiamare il suo costruttore
ReadableStream()
.
Il costruttore ha un argomento facoltativo underlyingSource
, che rappresenta un oggetto con metodi e proprietà che definiscono il comportamento dell'istanza del flusso creata.
underlyingSource
Puoi utilizzare i seguenti metodi facoltativi definiti dallo sviluppatore:
start(controller)
: richiamato immediatamente durante la creazione dell'oggetto. Il metodo può accedere all'origine dello streaming e fare qualsiasi altra operazione necessaria per configurare la funzionalità dello stream. Se questo processo deve essere eseguito in modo asincrono, il metodo può restituire una promessa di segnalare l'esito positivo o negativo. Il parametrocontroller
passato a questo metodo èReadableStreamDefaultController
.pull(controller)
: può essere utilizzato per controllare lo stream man mano che vengono recuperati più blocchi. Viene chiamato ripetutamente finché la coda interna di blocchi del flusso non è piena fino a quando la coda non raggiunge la filigrana. Se il risultato della chiamata apull()
è una promessa,pull()
non verrà richiamato fino al completamento di questa promessa. Se la promessa viene rifiutata, il flusso risulterà errato.cancel(reason)
: richiamato quando il consumatore annulla lo streaming.
const readableStream = new ReadableStream({
start(controller) {
/* … */
},
pull(controller) {
/* … */
},
cancel(reason) {
/* … */
},
});
ReadableStreamDefaultController
supporta i seguenti metodi:
ReadableStreamDefaultController.close()
chiude lo stream associato.ReadableStreamDefaultController.enqueue()
accoda un determinato blocco nello stream associato.ReadableStreamDefaultController.error()
causa l'errore di eventuali interazioni future con lo stream associato.
/* … */
start(controller) {
controller.enqueue('The first chunk!');
},
/* … */
queuingStrategy
Il secondo argomento, anch'esso facoltativo, del costruttore ReadableStream()
è queuingStrategy
.
È un oggetto che facoltativamente definisce una strategia di coda per il flusso che utilizza due
parametri:
highWaterMark
: un numero non negativo che indica la filigrana alta dello stream utilizzando questa strategia di coda.size(chunk)
: una funzione che calcola e restituisce la dimensione non negativa finita del valore del blocco specificato. Il risultato viene utilizzato per determinare la contropressione, che si manifesta tramite la proprietàReadableStreamDefaultController.desiredSize
appropriata. Controlla anche quando viene chiamato il metodopull()
dell'origine sottostante.
const readableStream = new ReadableStream({
/* … */
},
{
highWaterMark: 10,
size(chunk) {
return chunk.length;
},
},
);
I metodi getReader()
e read()
Per leggere da uno stream leggibile, è necessario un lettore, che sarà un
ReadableStreamDefaultReader
.
Il metodo getReader()
dell'interfaccia ReadableStream
crea un lettore a cui blocca lo stream. Mentre lo stream è bloccato, non è possibile acquisire altri lettori finché questo non viene rilasciato.
Il metodo read()
dell'interfaccia ReadableStreamDefaultReader
restituisce una promessa che fornisce l'accesso al blocco
successivo nella coda interna dello stream. Termina o rifiuta con un risultato che dipende dallo stato
del flusso. Le diverse possibilità sono le seguenti:
- Se è disponibile un blocco, la promessa verrà soddisfatta con un oggetto nel modulo
{ value: chunk, done: false }
. - Se il flusso viene chiuso, la promessa verrà soddisfatta con un oggetto nel modulo
{ value: undefined, done: true }
. - Se lo stream presenta un errore, la promessa verrà rifiutata con l'errore pertinente.
const reader = readableStream.getReader();
while (true) {
const { done, value } = await reader.read();
if (done) {
console.log('The stream is done.');
break;
}
console.log('Just read a chunk:', value);
}
La proprietà locked
Puoi verificare se uno stream leggibile è bloccato accedendo alla relativa proprietà ReadableStream.locked
.
const locked = readableStream.locked;
console.log(`The stream is ${locked ? 'indeed' : 'not'} locked.`);
Esempi di codice stream leggibili
L'esempio di codice riportato di seguito mostra tutti i passaggi in azione. Innanzitutto, devi creare un elemento ReadableStream
che nel suo
argomento underlyingSource
(ovvero la classe TimestampSource
) definisce un metodo start()
.
Questo metodo indica al valore controller
dello stream enqueue()
un timestamp ogni secondo durante dieci secondi.
Infine, comunica al controller di close()
lo stream. Utilizzi questo flusso creando un lettore con il metodo getReader()
e chiamando read()
fino a quando il flusso non raggiunge done
.
class TimestampSource {
#interval
start(controller) {
this.#interval = setInterval(() => {
const string = new Date().toLocaleTimeString();
// Add the string to the stream.
controller.enqueue(string);
console.log(`Enqueued ${string}`);
}, 1_000);
setTimeout(() => {
clearInterval(this.#interval);
// Close the stream after 10s.
controller.close();
}, 10_000);
}
cancel() {
// This is called if the reader cancels.
clearInterval(this.#interval);
}
}
const stream = new ReadableStream(new TimestampSource());
async function concatStringStream(stream) {
let result = '';
const reader = stream.getReader();
while (true) {
// The `read()` method returns a promise that
// resolves when a value has been received.
const { done, value } = await reader.read();
// Result objects contain two properties:
// `done` - `true` if the stream has already given you all its data.
// `value` - Some data. Always `undefined` when `done` is `true`.
if (done) return result;
result += value;
console.log(`Read ${result.length} characters so far`);
console.log(`Most recently read chunk: ${value}`);
}
}
concatStringStream(stream).then((result) => console.log('Stream complete', result));
Iterazione asincrona
Controllare a ogni iterazione di loop read()
se lo stream è done
potrebbe non essere l'API più comoda.
Fortunatamente, presto esiste un modo migliore per farlo: l'iterazione asincrona.
for await (const chunk of stream) {
console.log(chunk);
}
Una soluzione alternativa per utilizzare l'iterazione asincrona oggi è implementare il comportamento con un polyfill.
if (!ReadableStream.prototype[Symbol.asyncIterator]) {
ReadableStream.prototype[Symbol.asyncIterator] = async function* () {
const reader = this.getReader();
try {
while (true) {
const {done, value} = await reader.read();
if (done) {
return;
}
yield value;
}
}
finally {
reader.releaseLock();
}
}
}
Teeing di uno stream leggibile
Il metodo tee()
dell'interfaccia ReadableStream
restituisce il flusso leggibile corrente, restituendo un array di due elementi contenente i due rami risultanti come nuove istanze ReadableStream
. Ciò consente a due lettori di leggere uno stream contemporaneamente. Puoi farlo, ad esempio, in un service worker se vuoi recuperare una risposta dal server e trasmetterla in streaming al browser, ma anche per trasmetterla alla cache dei service worker. Poiché un corpo della risposta non può essere utilizzato più di una volta, sono necessarie due copie per farlo. Per annullare il flusso, devi annullare entrambi i rami risultanti. Il teing di uno stream in genere ne blocca l'intera durata, impedendo ad altri lettori di bloccarlo.
const readableStream = new ReadableStream({
start(controller) {
// Called by constructor.
console.log('[start]');
controller.enqueue('a');
controller.enqueue('b');
controller.enqueue('c');
},
pull(controller) {
// Called `read()` when the controller's queue is empty.
console.log('[pull]');
controller.enqueue('d');
controller.close();
},
cancel(reason) {
// Called when the stream is canceled.
console.log('[cancel]', reason);
},
});
// Create two `ReadableStream`s.
const [streamA, streamB] = readableStream.tee();
// Read streamA iteratively one by one. Typically, you
// would not do it this way, but you certainly can.
const readerA = streamA.getReader();
console.log('[A]', await readerA.read()); //=> {value: "a", done: false}
console.log('[A]', await readerA.read()); //=> {value: "b", done: false}
console.log('[A]', await readerA.read()); //=> {value: "c", done: false}
console.log('[A]', await readerA.read()); //=> {value: "d", done: false}
console.log('[A]', await readerA.read()); //=> {value: undefined, done: true}
// Read streamB in a loop. This is the more common way
// to read data from the stream.
const readerB = streamB.getReader();
while (true) {
const result = await readerB.read();
if (result.done) break;
console.log('[B]', result);
}
Flussi di byte leggibili
Per i flussi che rappresentano i byte, viene fornita una versione estesa dello stream leggibile per gestire i byte in modo efficiente, in particolare riducendo al minimo le copie. I flussi di byte consentono l'acquisizione dei lettori BYOB (Bring your own device, Porta il tuo buffer). L'implementazione predefinita può fornire una gamma di output diversi, ad esempio stringhe o buffer di array nel caso di WebSocket, mentre i flussi di byte garantiscono un output di byte. Inoltre, i lettori BYOB (Bring your own device, Porta il tuo dispositivo) offrono vantaggi di stabilità. Questo perché, se un buffer si scollega, può garantire che non venga scritto due volte nello stesso buffer, evitando quindi le condizioni di race. I lettori BYOB possono ridurre il numero di volte in cui il browser deve eseguire la garbage collection, perché può riutilizzare i buffer.
Creazione di un flusso di byte leggibile
Puoi creare un flusso di byte leggibile passando un parametro type
aggiuntivo al costruttore ReadableStream()
.
new ReadableStream({ type: 'bytes' });
underlyingSource
Alla sorgente sottostante di un flusso di byte leggibile viene assegnato un valore ReadableByteStreamController
da
manipolare. Il metodo ReadableByteStreamController.enqueue()
prende un argomento chunk
il cui valore
è ArrayBufferView
. La proprietà ReadableByteStreamController.byobRequest
restituisce la richiesta di pull BYOB corrente o null se non ne esiste nessuna. Infine, la proprietà ReadableByteStreamController.desiredSize
restituisce la dimensione desiderata per riempire la coda interna del flusso controllato.
queuingStrategy
Il secondo argomento, anch'esso facoltativo, del costruttore ReadableStream()
è queuingStrategy
.
È un oggetto che facoltativamente definisce una strategia di coda per il flusso che utilizza un
parametro:
highWaterMark
: un numero non negativo di byte che indica il livello alto del flusso con questa strategia di coda. Questo viene utilizzato per determinare la contropressione, che si manifesta tramite la proprietàReadableByteStreamController.desiredSize
appropriata. Controlla anche quando viene chiamato il metodopull()
dell'origine sottostante.
I metodi getReader()
e read()
Puoi quindi ottenere l'accesso a ReadableStreamBYOBReader
impostando di conseguenza il parametro mode
:
ReadableStream.getReader({ mode: "byob" })
. Questo consente un controllo più preciso sull'allocazione
del buffer per evitare copie. Per leggere dal flusso di byte, devi chiamare
ReadableStreamBYOBReader.read(view)
, dove view
è
ArrayBufferView
.
Esempio di codice di flusso di byte leggibile
const reader = readableStream.getReader({ mode: "byob" });
let startingAB = new ArrayBuffer(1_024);
const buffer = await readInto(startingAB);
console.log("The first 1024 bytes, or less:", buffer);
async function readInto(buffer) {
let offset = 0;
while (offset < buffer.byteLength) {
const { value: view, done } =
await reader.read(new Uint8Array(buffer, offset, buffer.byteLength - offset));
buffer = view.buffer;
if (done) {
break;
}
offset += view.byteLength;
}
return buffer;
}
La seguente funzione restituisce flussi di byte leggibili che consentono una lettura zero-copy efficiente di un array generato in modo casuale. Anziché utilizzare una dimensione predeterminata del blocco pari a 1024, prova a riempire il buffer fornito dallo sviluppatore, consentendo il controllo completo.
const DEFAULT_CHUNK_SIZE = 1_024;
function makeReadableByteStream() {
return new ReadableStream({
type: 'bytes',
pull(controller) {
// Even when the consumer is using the default reader,
// the auto-allocation feature allocates a buffer and
// passes it to us via `byobRequest`.
const view = controller.byobRequest.view;
view = crypto.getRandomValues(view);
controller.byobRequest.respond(view.byteLength);
},
autoAllocateChunkSize: DEFAULT_CHUNK_SIZE,
});
}
I meccanismi di uno stream accessibile in scrittura
Un flusso accessibile in scrittura è una destinazione in cui puoi scrivere dati, rappresentati in JavaScript da un oggetto WritableStream
. Questa funge da astrazione sulla parte superiore di un sink sottostante, un sink I/O di livello inferiore in cui vengono scritti i dati non elaborati.
I dati vengono scritti nello stream tramite uno writer, un blocco alla volta. Un blocco può assumere moltissime forme, proprio come i blocchi in un lettore. Puoi usare qualsiasi codice tu voglia per produrre i blocchi pronti per la scrittura; l'autore e il codice associato sono chiamati producer.
Quando un autore viene creato e inizia a scrivere in uno stream (un autore attivo), si dice che sia bloccato. In uno stream accessibile in scrittura è possibile scrivere un solo autore alla volta. Se vuoi che un altro autore inizi a scrivere nel tuo stream, in genere devi prima distribuirlo prima di collegarvi un altro autore.
Una coda interna tiene traccia dei blocchi che sono stati scritti nel flusso, ma non ancora elaborati dal sink sottostante.
Una strategia di coda è un oggetto che determina in che modo un flusso deve segnalare la contropressione in base allo stato della sua coda interna. La strategia di coda assegna una dimensione a ogni blocco e confronta la dimensione totale di tutti i blocchi in coda con un numero specificato, noto come filigrana elevata.
Il costrutto finale è chiamato controller. A ogni flusso accessibile in scrittura è associato un controller che ti consente di controllarlo (ad esempio per interromperlo).
Creazione di un flusso accessibile in scrittura
L'interfaccia WritableStream
dell'API Streams fornisce un'astrazione standard per la scrittura di flussi di dati in una destinazione, nota come sink. Questo oggetto dispone di contropressione e coda integrate. Per creare un flusso accessibile in scrittura, devi chiamare il costruttore WritableStream()
.
Ha un parametro underlyingSink
facoltativo, che rappresenta un oggetto con metodi e proprietà che definiscono il comportamento dell'istanza del flusso creata.
underlyingSink
underlyingSink
può includere i seguenti metodi facoltativi definiti dallo sviluppatore. Il parametro controller
passato ad alcuni dei metodi è
WritableStreamDefaultController
.
start(controller)
: questo metodo viene chiamato immediatamente durante la creazione dell'oggetto. I contenuti di questo metodo dovrebbero mirare ad accedere al sink sottostante. Se questo processo deve essere eseguito in modo asincrono, può restituire una promessa che indica l'esito positivo o negativo.write(chunk, controller)
: questo metodo viene chiamato quando un nuovo blocco di dati (specificato nel parametrochunk
) è pronto per essere scritto nel sink sottostante. Può restituire una promessa che segnala l'esito positivo o negativo dell'operazione di scrittura. Questo metodo verrà chiamato solo dopo l'esito positivo delle scritture precedenti e mai dopo la chiusura o l'interruzione del flusso.close(controller)
: questo metodo viene chiamato se l'app segnala che ha terminato la scrittura di blocchi nello stream. I contenuti dovrebbero eseguire tutte le operazioni necessarie per finalizzare le scritture nel sink sottostante e rilasciare l'accesso. Se questo processo è asincrono, può restituire una promessa che indica l'esito positivo o negativo. Questo metodo verrà chiamato solo dopo che tutte le scritture in coda sono andate a buon fine.abort(reason)
: questo metodo viene chiamato se l'app segnala l'intenzione di chiudere bruscamente lo stream e impostarlo in stato di errore. Può ripulire qualsiasi risorsa bloccata, comeclose()
, ma verrà chiamataabort()
anche se le scritture sono in coda. che verranno gettati via. Se questo processo è asincrono, può restituire una promessa che indica l'esito positivo o negativo. Il parametroreason
contiene un valoreDOMString
che descrive il motivo dell'interruzione dello streaming.
const writableStream = new WritableStream({
start(controller) {
/* … */
},
write(chunk, controller) {
/* … */
},
close(controller) {
/* … */
},
abort(reason) {
/* … */
},
});
L'interfaccia WritableStreamDefaultController
dell'API Streams rappresenta un controller che consente di controllare lo stato di WritableStream
durante la configurazione, man mano che più blocchi vengono inviati per la scrittura o alla fine della scrittura. Durante la creazione di un WritableStream
, al sink sottostante viene assegnata un'istanza WritableStreamDefaultController
corrispondente da manipolare. WritableStreamDefaultController
ha un solo metodo:
WritableStreamDefaultController.error()
,
che causa errori nelle interazioni future con lo stream associato.
WritableStreamDefaultController
supporta anche una proprietà signal
che restituisce un'istanza di
AbortSignal
,
consentendo l'interruzione di un'operazione WritableStream
, se necessario.
/* … */
write(chunk, controller) {
try {
// Try to do something dangerous with `chunk`.
} catch (error) {
controller.error(error.message);
}
},
/* … */
queuingStrategy
Il secondo argomento, anch'esso facoltativo, del costruttore WritableStream()
è queuingStrategy
.
È un oggetto che facoltativamente definisce una strategia di coda per il flusso che utilizza due
parametri:
highWaterMark
: un numero non negativo che indica la filigrana alta dello stream utilizzando questa strategia di coda.size(chunk)
: una funzione che calcola e restituisce la dimensione non negativa finita del valore del blocco specificato. Il risultato viene utilizzato per determinare la contropressione, che si manifesta tramite la proprietàWritableStreamDefaultWriter.desiredSize
appropriata.
I metodi getWriter()
e write()
Per scrivere in uno stream scrivibile, è necessario un writer, che sarà un
WritableStreamDefaultWriter
. Il metodo getWriter()
dell'interfaccia WritableStream
restituisce una nuova istanza di WritableStreamDefaultWriter
e blocca il flusso a quell'istanza. Mentre il flusso è bloccato, non è possibile acquisire altri autori finché non viene rilasciato quello attuale.
Il metodo write()
dell'interfaccia
WritableStreamDefaultWriter
scrive un blocco di dati passato in un WritableStream
e nel sink sottostante, quindi restituisce
una promessa che risolve il problema indicando l'esito positivo o negativo dell'operazione di scrittura. Tieni presente che ciò che significa "successo" dipende dal sink sottostante; potrebbe indicare che il blocco è stato accettato e non necessariamente che è stato salvato in modo sicuro nella sua destinazione finale.
const writer = writableStream.getWriter();
const resultPromise = writer.write('The first chunk!');
La proprietà locked
Puoi verificare se uno stream scrivibile è bloccato accedendo alla relativa proprietà WritableStream.locked
.
const locked = writableStream.locked;
console.log(`The stream is ${locked ? 'indeed' : 'not'} locked.`);
Esempio di codice stream scrivibile
L'esempio di codice riportato di seguito mostra tutti i passaggi in azione.
const writableStream = new WritableStream({
start(controller) {
console.log('[start]');
},
async write(chunk, controller) {
console.log('[write]', chunk);
// Wait for next write.
await new Promise((resolve) => setTimeout(() => {
document.body.textContent += chunk;
resolve();
}, 1_000));
},
close(controller) {
console.log('[close]');
},
abort(reason) {
console.log('[abort]', reason);
},
});
const writer = writableStream.getWriter();
const start = Date.now();
for (const char of 'abcdefghijklmnopqrstuvwxyz') {
// Wait to add to the write queue.
await writer.ready;
console.log('[ready]', Date.now() - start, 'ms');
// The Promise is resolved after the write finishes.
writer.write(char);
}
await writer.close();
Convertire uno stream leggibile in uno stream scrivibile
Uno stream leggibile può essere trasmesso a uno stream accessibile in scrittura tramite il metodo pipeTo()
dello stream leggibile.
ReadableStream.pipeTo()
indirizza l'attuale ReadableStream
a un determinato WritableStream
e restituisce una
promessa che si verifica quando la procedura di configurazione delle tubazioni viene completata correttamente oppure rifiuta
in caso di errori.
const readableStream = new ReadableStream({
start(controller) {
// Called by constructor.
console.log('[start readable]');
controller.enqueue('a');
controller.enqueue('b');
controller.enqueue('c');
},
pull(controller) {
// Called when controller's queue is empty.
console.log('[pull]');
controller.enqueue('d');
controller.close();
},
cancel(reason) {
// Called when the stream is canceled.
console.log('[cancel]', reason);
},
});
const writableStream = new WritableStream({
start(controller) {
// Called by constructor
console.log('[start writable]');
},
async write(chunk, controller) {
// Called upon writer.write()
console.log('[write]', chunk);
// Wait for next write.
await new Promise((resolve) => setTimeout(() => {
document.body.textContent += chunk;
resolve();
}, 1_000));
},
close(controller) {
console.log('[close]');
},
abort(reason) {
console.log('[abort]', reason);
},
});
await readableStream.pipeTo(writableStream);
console.log('[finished]');
Creazione di un flusso di trasformazione
L'interfaccia TransformStream
dell'API Streams rappresenta un set di dati trasformabili. Puoi
creare un flusso di trasformazione chiamando il suo costruttore TransformStream()
, che crea e restituisce
un oggetto Transform stream dai gestori specificati. Il costruttore TransformStream()
accetta come primo argomento un oggetto JavaScript facoltativo che rappresenta transformer
. Questi oggetti possono contenere uno dei seguenti metodi:
transformer
start(controller)
: questo metodo viene chiamato immediatamente durante la creazione dell'oggetto. In genere viene utilizzato per accodare i blocchi di prefisso, utilizzandocontroller.enqueue()
. Questi blocchi verranno letti dal lato leggibile, ma non dipendono da nessuna scrittura sul lato scrivibile. Se questo processo iniziale è asincrono, ad esempio perché richiede un po' di impegno per acquisire i blocchi del prefisso, la funzione può restituire una promessa che indica l'esito positivo o negativo; una promessa rifiutata causerà un errore nel flusso. Eventuali eccezioni generate verranno nuovamente generate dal costruttoreTransformStream()
.transform(chunk, controller)
: questo metodo viene chiamato quando un nuovo blocco originariamente scritto sul lato scrivibile è pronto per essere trasformato. L'implementazione del flusso garantisce che questa funzione venga chiamata solo dopo l'esito positivo delle trasformazioni precedenti e mai prima del completamento distart()
o dopo la chiamata diflush()
. Questa funzione esegue l'effettivo lavoro di trasformazione del flusso di trasformazione. Può accodare i risultati utilizzandocontroller.enqueue()
. In questo modo è consentito un singolo blocco scritto in scrittura sul lato scrivibile per generare zero o più blocchi sul lato leggibile, a seconda di quante volte viene chiamatocontroller.enqueue()
. Se il processo di trasformazione è asincrono, questa funzione può restituire una promessa che indica il successo o il fallimento della trasformazione. Una promessa rifiutata causa un errore di entrambi i lati leggibili e scrivibili del flusso di trasformazione. Se non viene fornito alcun metodotransform()
, viene utilizzata la trasformazione dell'identità, che accoda i blocchi non modificati dal lato scrivibile a quello leggibile.flush(controller)
: questo metodo viene chiamato dopo che tutti i blocchi scritti in un lato accessibile in scrittura sono stati trasformati passando correttamente atransform()
e il lato scrivibile sta per essere chiuso. In genere viene utilizzata per accodare i blocchi di suffissi in coda sul lato leggibile, prima che anch'esso venga chiuso. Se il processo di svuotamento è asincrono, la funzione può restituire una promessa di segnalare l'esito positivo o negativo; il risultato verrà comunicato al chiamante distream.writable.write()
. Inoltre, una promessa rifiutata genera un errore sia sui lati leggibili sia scrivibili dello stream. La richiesta di un'eccezione equivale alla restituzione di una promessa rifiutata.
const transformStream = new TransformStream({
start(controller) {
/* … */
},
transform(chunk, controller) {
/* … */
},
flush(controller) {
/* … */
},
});
Le strategie di coda writableStrategy
e readableStrategy
Il secondo e il terzo parametro facoltativi del costruttore TransformStream()
sono strategie di coda con writableStrategy
e readableStrategy
facoltativi. Vengono definiti come descritto rispettivamente nella sezione dei flussi leggibile e scrivibile.
Esempio di codice stream Transform
Il seguente esempio di codice mostra un semplice flusso di trasformazione in azione.
// Note that `TextEncoderStream` and `TextDecoderStream` exist now.
// This example shows how you would have done it before.
const textEncoderStream = new TransformStream({
transform(chunk, controller) {
console.log('[transform]', chunk);
controller.enqueue(new TextEncoder().encode(chunk));
},
flush(controller) {
console.log('[flush]');
controller.terminate();
},
});
(async () => {
const readStream = textEncoderStream.readable;
const writeStream = textEncoderStream.writable;
const writer = writeStream.getWriter();
for (const char of 'abc') {
writer.write(char);
}
writer.close();
const reader = readStream.getReader();
for (let result = await reader.read(); !result.done; result = await reader.read()) {
console.log('[value]', result.value);
}
})();
Creare uno stream leggibile tramite uno stream di trasformazione
Il metodo pipeThrough()
dell'interfaccia ReadableStream
offre un modo concatenato di collegare lo stream attuale
tramite uno stream di trasformazione o qualsiasi altra coppia scrivibile/leggibile. La tubazione di uno stream in genere lo blocca
per l'intera durata, impedendo ad altri lettori di bloccarlo.
const transformStream = new TransformStream({
transform(chunk, controller) {
console.log('[transform]', chunk);
controller.enqueue(new TextEncoder().encode(chunk));
},
flush(controller) {
console.log('[flush]');
controller.terminate();
},
});
const readableStream = new ReadableStream({
start(controller) {
// called by constructor
console.log('[start]');
controller.enqueue('a');
controller.enqueue('b');
controller.enqueue('c');
},
pull(controller) {
// called read when controller's queue is empty
console.log('[pull]');
controller.enqueue('d');
controller.close(); // or controller.error();
},
cancel(reason) {
// called when rs.cancel(reason)
console.log('[cancel]', reason);
},
});
(async () => {
const reader = readableStream.pipeThrough(transformStream).getReader();
for (let result = await reader.read(); !result.done; result = await reader.read()) {
console.log('[value]', result.value);
}
})();
Il successivo esempio di codice (un po' inventato) mostra come implementare una versione "shouting" di fetch()
in maiuscolo tutto il testo utilizzando la promessa di risposta restituita
come flusso
e blocco con maiuscolo per blocco. Il vantaggio di questo approccio è che non devi attendere il download dell'intero documento, il che può fare un'enorme differenza quando si gestiscono file di grandi dimensioni.
function upperCaseStream() {
return new TransformStream({
transform(chunk, controller) {
controller.enqueue(chunk.toUpperCase());
},
});
}
function appendToDOMStream(el) {
return new WritableStream({
write(chunk) {
el.append(chunk);
}
});
}
fetch('./lorem-ipsum.txt').then((response) =>
response.body
.pipeThrough(new TextDecoderStream())
.pipeThrough(upperCaseStream())
.pipeTo(appendToDOMStream(document.body))
);
Demo
La demo riportata di seguito mostra i flussi leggibili, scrivibili e trasformativi in azione. Include inoltre esempi di catene di tubi pipeThrough()
e pipeTo()
e dimostra inoltre tee()
. Facoltativamente, puoi eseguire demo in una propria finestra o visualizzare il codice sorgente.
Stream utili disponibili nel browser
Ci sono diversi stream utili integrati direttamente nel browser. Puoi creare facilmente un ReadableStream
da un BLOB. Il metodo stream() dell'interfaccia Blob
restituisce un ReadableStream
che, al momento della lettura, restituisce i dati contenuti all'interno del blob. Ricorda inoltre che un oggetto File
è un tipo specifico di Blob
e può essere utilizzato in qualsiasi contesto a cui può accedere un BLOB.
const readableStream = new Blob(['hello world'], { type: 'text/plain' }).stream();
Le varianti di streaming di TextDecoder.decode()
e TextEncoder.encode()
sono chiamate rispettivamente
TextDecoderStream
e
TextEncoderStream
.
const response = await fetch('https://streams.spec.whatwg.org/');
const decodedStream = response.body.pipeThrough(new TextDecoderStream());
È facile comprimere o decomprimere un file rispettivamente con i flussi di trasformazione di CompressionStream
e DecompressionStream
. Il codice di esempio riportato di seguito mostra come scaricare la specifica Streams, comprimerla (gzip)
direttamente nel browser e scrivere il file compresso direttamente su disco.
const response = await fetch('https://streams.spec.whatwg.org/');
const readableStream = response.body;
const compressedStream = readableStream.pipeThrough(new CompressionStream('gzip'));
const fileHandle = await showSaveFilePicker();
const writableStream = await fileHandle.createWritable();
compressedStream.pipeTo(writableStream);
Gli eventi dell'API File System Access
FileSystemWritableFileStream
e gli stream sperimentali di fetch()
richieste sono
esempi di flussi scrivibili in natura.
L'API Serial fa un uso intensivo degli stream sia leggibili sia scrivibili.
// Prompt user to select any serial port.
const port = await navigator.serial.requestPort();
// Wait for the serial port to open.
await port.open({ baudRate: 9_600 });
const reader = port.readable.getReader();
// Listen to data coming from the serial device.
while (true) {
const { value, done } = await reader.read();
if (done) {
// Allow the serial port to be closed later.
reader.releaseLock();
break;
}
// value is a Uint8Array.
console.log(value);
}
// Write to the serial port.
const writer = port.writable.getWriter();
const data = new Uint8Array([104, 101, 108, 108, 111]); // hello
await writer.write(data);
// Allow the serial port to be closed later.
writer.releaseLock();
Infine, l'API WebSocketStream
integra i flussi con l'API WebSocket.
const wss = new WebSocketStream(WSS_URL);
const { readable, writable } = await wss.connection;
const reader = readable.getReader();
const writer = writable.getWriter();
while (true) {
const { value, done } = await reader.read();
if (done) {
break;
}
const result = await process(value);
await writer.write(result);
}
Risorse utili
- Specifiche degli stream
- Demo di accompagnamento
- polyfill degli stream
- 2016: l'anno dei live streaming web
- ITeratori e generatori asincroni
- Visualizzatore di stream
Ringraziamenti
Questo articolo è stato rivisto da Jake Archibald, François Beaufort, Sam Dutton, Mattias Buelens, Surma, Joe Medley e Adam Rice. I post del blog di Jake Archibald mi hanno aiutato molto a comprendere gli stream. Alcuni esempi di codice si ispirano alle esplorazioni dell'utente GitHub @bellbind e parti della prosa si basano fortemente sui documenti web MDN sugli stream. Gli autori dello Streams Standard hanno lavorato molto nella stesura di questa specifica. Immagine hero di Ryan Lara su Unsplash.