Stream: la guida definitiva

Scopri come utilizzare flussi leggibili, scrivibili e trasformarli con l'API Streams.

L'API Streams consente di accedere in modo programmatico a flussi di dati ricevuti sulla rete o creati in locale con qualsiasi mezzo ed elaborarli con JavaScript. I flussi di dati richiedono la suddivisione di una risorsa che vuoi ricevere, inviare o trasformare in piccoli blocchi, quindi elaborarli poco per volta. Sebbene lo streaming sia qualcosa che i browser fanno comunque quando ricevono asset come HTML o video da mostrare sulle pagine web, questa funzionalità non è mai stata disponibile per JavaScript prima dell'introduzione di fetch con gli stream nel 2015.

In precedenza, se volessi elaborare una risorsa di qualche tipo (un video o un file di testo ecc.), dovevi scaricare l'intero file, attendere che venisse deserializzato in un formato adatto, quindi elaborarlo. Con gli stream disponibili per JavaScript, tutto cambia. Ora puoi elaborare i dati non elaborati con JavaScript progressivamente non appena sono disponibili sul client, senza dover generare buffer, stringa o blob. In questo modo è possibile accedere a una serie di casi d'uso, alcuni dei quali sono elencati di seguito:

  • Effetti video:collega uno stream video leggibile attraverso uno stream di trasformazione che applica gli effetti in tempo reale.
  • De)compressione dei dati: collega uno stream di file attraverso un flusso di trasformazione che lo (de)comprime in modo selettivo.
  • Decodifica delle immagini:collega uno stream di risposta HTTP attraverso un flusso di trasformazione che decodifica i byte in dati bitmap, quindi attraverso un altro flusso di trasformazione che converte i bitmap in PNG. Se installata all'interno del gestore fetch di un service worker, questa opzione consente di eseguire il polyfill trasparente dei nuovi formati di immagine come AVIF.

Supporto del browser

ReadableStream e WritableStream

Supporto dei browser

  • 43
  • 14
  • 65
  • 10.1

Fonte

TransformStream

Supporto dei browser

  • 67
  • 79
  • 102
  • 14.1

Fonte

Concetti principali

Prima di entrare nel dettaglio sui vari tipi di stream, vorrei introdurre alcuni concetti fondamentali.

Bocconcini

Un blocco è un singolo dato che viene scritto o letto da uno stream. Può essere di qualsiasi tipo; i flussi possono anche contenere blocchi di diversi tipi. Il più delle volte, un blocco non è l'unità di dati più atomica per un determinato flusso. Ad esempio, un flusso di byte potrebbe contenere blocchi da 16 unità Uint8Array KiB, anziché singoli byte.

Flussi leggibili

Uno stream leggibile rappresenta una fonte di dati da cui puoi leggere. In altre parole, i dati escono da uno stream leggibile. Concretamente, uno stream leggibile è un'istanza della classe ReadableStream.

Stream scrivibili

Un flusso accessibile in scrittura rappresenta una destinazione per i dati in cui puoi scrivere. In altre parole, i dati entrano in un flusso scrivibile. Concretamente, un flusso scrivibile è un'istanza della classe WritableStream.

Trasforma flussi

Un flusso di trasformazione è costituito da una coppia di flussi: uno stream accessibile in scrittura, noto come lato scrivibile, e uno stream leggibile, noto come lato leggibile. Una metafora nel mondo reale sarebbe l'interprete simultaneo che traduce al volo da una lingua all'altra. In modo specifico per il flusso di trasformazione, la scrittura sul lato scrivibile comporta la disponibilità di nuovi dati per la lettura dal lato leggibile. Concretamente, qualsiasi oggetto con una proprietà writable e una proprietà readable può fungere da flusso di trasformazione. Tuttavia, la classe TransformStream standard semplifica la creazione di una coppia di questo tipo correttamente angolata.

Catene per tubi

Gli stream vengono utilizzati principalmente mediante associazione tra loro. Un flusso leggibile può essere reindirizzato direttamente a uno stream scrivibile, utilizzando il metodo pipeTo() dello stream leggibile, oppure può essere trasmesso prima attraverso uno o più flussi di trasformazione, utilizzando il metodo pipeThrough() dello stream leggibile. Un insieme di flussi collegati tra loro in questo modo è definito catena di tubature.

Contropressione

Una volta creata, una catena di tubi propaga gli indicatori relativi alla velocità con cui i blocchi devono passare attraverso questa catena. Se un passaggio nella catena non può ancora accettare blocchi, propaga un segnale all'indietro attraverso la catena di tubature, fino a quando alla sorgente originale non viene detto di interrompere la produzione dei blocchi così rapidamente. Questo processo di normalizzazione del flusso è chiamato contropressione.

Teeing

È possibile eseguire il tethering di uno stream leggibile (chiamato in base alla forma di una "T" maiuscola) utilizzando il relativo metodo tee(). Questa operazione bloccherà il flusso, ovvero non lo renderà più utilizzabile direttamente. Tuttavia, creerà due nuovi flussi, denominati rami, che possono essere utilizzati in modo indipendente. Il teeing è importante anche perché gli stream non possono essere riavvolti o riavviati. Scoprirai di più in merito più avanti.

Diagramma di una catena di tubi costituita da un flusso leggibile proveniente da una chiamata all'API di recupero, che viene poi condotto attraverso un flusso di trasformazione il cui output viene trasmesso al primo flusso leggibile risultante e alla cache del service worker per il secondo flusso leggibile risultante.
Una catena di tubature.

I meccanismi di uno stream leggibile

Un flusso leggibile è un'origine dati rappresentata in JavaScript da un oggetto ReadableStream che scorre da un'origine sottostante. Il costruttore ReadableStream() crea e restituisce un oggetto flusso leggibile dai gestori specificati. Esistono due tipi di origine di base:

  • Le origini push eseguono costantemente il push dei dati quando li accedi e spetta a te avviare, sospendere o annullare l'accesso al flusso. Alcuni esempi sono video stream in diretta, eventi inviati dal server o WebSocket.
  • Le origini pull richiedono di richiedere esplicitamente i dati alle origini una volta connesse. Tra gli esempi sono incluse le operazioni HTTP tramite chiamate fetch() o XMLHttpRequest.

I dati di flusso vengono letti in sequenza in piccole parti chiamate blocchi. che si dice che i blocchi posizionati in uno stream siano in coda. Ciò significa che sono in attesa in coda pronte per essere lette. Una coda interna tiene traccia dei blocchi che non sono ancora stati letti.

Una strategia di coda è un oggetto che determina in che modo un flusso deve segnalare la contropressione in base allo stato della sua coda interna. La strategia di coda assegna una dimensione a ogni blocco e confronta la dimensione totale di tutti i blocchi in coda con un numero specificato, noto come filigrana elevata.

I blocchi all'interno dello stream vengono letti da un lettore. Questo lettore recupera i dati un blocco alla volta, permettendoti di eseguire qualsiasi tipo di operazione. Il lettore più l'altro codice di elaborazione associato viene definito consumatore.

Il costrutto successivo in questo contesto è chiamato controller. A ogni stream leggibile è associato un controller che, come suggerito dal nome, ti permette di controllare lo stream.

Uno stream può essere letto da un solo lettore alla volta. Quando un lettore viene creato e inizia a leggere lo stream (ossia, diventa un lettore attivo), il lettore viene bloccato. Se vuoi che un altro lettore assuma la lettura del tuo stream, in genere devi rilasciare il primo lettore prima di fare qualsiasi altra cosa (anche se puoi teneare gli stream).

Creazione di uno stream leggibile

Per creare uno stream leggibile, devi chiamare il suo costruttore ReadableStream(). Il costruttore ha un argomento facoltativo underlyingSource, che rappresenta un oggetto con metodi e proprietà che definiscono il comportamento dell'istanza del flusso creata.

underlyingSource

Puoi utilizzare i seguenti metodi facoltativi definiti dallo sviluppatore:

  • start(controller): richiamato immediatamente durante la creazione dell'oggetto. Il metodo può accedere all'origine dello streaming e fare qualsiasi altra operazione necessaria per configurare la funzionalità dello stream. Se questo processo deve essere eseguito in modo asincrono, il metodo può restituire una promessa di segnalare l'esito positivo o negativo. Il parametro controller passato a questo metodo è ReadableStreamDefaultController.
  • pull(controller): può essere utilizzato per controllare lo stream man mano che vengono recuperati più blocchi. Viene chiamato ripetutamente finché la coda interna di blocchi del flusso non è piena fino a quando la coda non raggiunge la filigrana. Se il risultato della chiamata a pull() è una promessa, pull() non verrà richiamato fino al completamento di questa promessa. Se la promessa viene rifiutata, il flusso risulterà errato.
  • cancel(reason): richiamato quando il consumatore annulla lo streaming.
const readableStream = new ReadableStream({
  start(controller) {
    /* … */
  },

  pull(controller) {
    /* … */
  },

  cancel(reason) {
    /* … */
  },
});

ReadableStreamDefaultController supporta i seguenti metodi:

/* … */
start(controller) {
  controller.enqueue('The first chunk!');
},
/* … */

queuingStrategy

Il secondo argomento, anch'esso facoltativo, del costruttore ReadableStream() è queuingStrategy. È un oggetto che facoltativamente definisce una strategia di coda per il flusso che utilizza due parametri:

  • highWaterMark: un numero non negativo che indica la filigrana alta dello stream utilizzando questa strategia di coda.
  • size(chunk): una funzione che calcola e restituisce la dimensione non negativa finita del valore del blocco specificato. Il risultato viene utilizzato per determinare la contropressione, che si manifesta tramite la proprietà ReadableStreamDefaultController.desiredSize appropriata. Controlla anche quando viene chiamato il metodo pull() dell'origine sottostante.
const readableStream = new ReadableStream({
    /* … */
  },
  {
    highWaterMark: 10,
    size(chunk) {
      return chunk.length;
    },
  },
);

I metodi getReader() e read()

Per leggere da uno stream leggibile, è necessario un lettore, che sarà un ReadableStreamDefaultReader. Il metodo getReader() dell'interfaccia ReadableStream crea un lettore a cui blocca lo stream. Mentre lo stream è bloccato, non è possibile acquisire altri lettori finché questo non viene rilasciato.

Il metodo read() dell'interfaccia ReadableStreamDefaultReader restituisce una promessa che fornisce l'accesso al blocco successivo nella coda interna dello stream. Termina o rifiuta con un risultato che dipende dallo stato del flusso. Le diverse possibilità sono le seguenti:

  • Se è disponibile un blocco, la promessa verrà soddisfatta con un oggetto nel modulo
    { value: chunk, done: false }.
  • Se il flusso viene chiuso, la promessa verrà soddisfatta con un oggetto nel modulo
    { value: undefined, done: true }.
  • Se lo stream presenta un errore, la promessa verrà rifiutata con l'errore pertinente.
const reader = readableStream.getReader();
while (true) {
  const { done, value } = await reader.read();
  if (done) {
    console.log('The stream is done.');
    break;
  }
  console.log('Just read a chunk:', value);
}

La proprietà locked

Puoi verificare se uno stream leggibile è bloccato accedendo alla relativa proprietà ReadableStream.locked.

const locked = readableStream.locked;
console.log(`The stream is ${locked ? 'indeed' : 'not'} locked.`);

Esempi di codice stream leggibili

L'esempio di codice riportato di seguito mostra tutti i passaggi in azione. Innanzitutto, devi creare un elemento ReadableStream che nel suo argomento underlyingSource (ovvero la classe TimestampSource) definisce un metodo start(). Questo metodo indica al valore controller dello stream enqueue() un timestamp ogni secondo durante dieci secondi. Infine, comunica al controller di close() lo stream. Utilizzi questo flusso creando un lettore con il metodo getReader() e chiamando read() fino a quando il flusso non raggiunge done.

class TimestampSource {
  #interval

  start(controller) {
    this.#interval = setInterval(() => {
      const string = new Date().toLocaleTimeString();
      // Add the string to the stream.
      controller.enqueue(string);
      console.log(`Enqueued ${string}`);
    }, 1_000);

    setTimeout(() => {
      clearInterval(this.#interval);
      // Close the stream after 10s.
      controller.close();
    }, 10_000);
  }

  cancel() {
    // This is called if the reader cancels.
    clearInterval(this.#interval);
  }
}

const stream = new ReadableStream(new TimestampSource());

async function concatStringStream(stream) {
  let result = '';
  const reader = stream.getReader();
  while (true) {
    // The `read()` method returns a promise that
    // resolves when a value has been received.
    const { done, value } = await reader.read();
    // Result objects contain two properties:
    // `done`  - `true` if the stream has already given you all its data.
    // `value` - Some data. Always `undefined` when `done` is `true`.
    if (done) return result;
    result += value;
    console.log(`Read ${result.length} characters so far`);
    console.log(`Most recently read chunk: ${value}`);
  }
}
concatStringStream(stream).then((result) => console.log('Stream complete', result));

Iterazione asincrona

Controllare a ogni iterazione di loop read() se lo stream è done potrebbe non essere l'API più comoda. Fortunatamente, presto esiste un modo migliore per farlo: l'iterazione asincrona.

for await (const chunk of stream) {
  console.log(chunk);
}

Una soluzione alternativa per utilizzare l'iterazione asincrona oggi è implementare il comportamento con un polyfill.

if (!ReadableStream.prototype[Symbol.asyncIterator]) {
  ReadableStream.prototype[Symbol.asyncIterator] = async function* () {
    const reader = this.getReader();
    try {
      while (true) {
        const {done, value} = await reader.read();
        if (done) {
          return;
          }
        yield value;
      }
    }
    finally {
      reader.releaseLock();
    }
  }
}

Teeing di uno stream leggibile

Il metodo tee() dell'interfaccia ReadableStream restituisce il flusso leggibile corrente, restituendo un array di due elementi contenente i due rami risultanti come nuove istanze ReadableStream. Ciò consente a due lettori di leggere uno stream contemporaneamente. Puoi farlo, ad esempio, in un service worker se vuoi recuperare una risposta dal server e trasmetterla in streaming al browser, ma anche per trasmetterla alla cache dei service worker. Poiché un corpo della risposta non può essere utilizzato più di una volta, sono necessarie due copie per farlo. Per annullare il flusso, devi annullare entrambi i rami risultanti. Il teing di uno stream in genere ne blocca l'intera durata, impedendo ad altri lettori di bloccarlo.

const readableStream = new ReadableStream({
  start(controller) {
    // Called by constructor.
    console.log('[start]');
    controller.enqueue('a');
    controller.enqueue('b');
    controller.enqueue('c');
  },
  pull(controller) {
    // Called `read()` when the controller's queue is empty.
    console.log('[pull]');
    controller.enqueue('d');
    controller.close();
  },
  cancel(reason) {
    // Called when the stream is canceled.
    console.log('[cancel]', reason);
  },
});

// Create two `ReadableStream`s.
const [streamA, streamB] = readableStream.tee();

// Read streamA iteratively one by one. Typically, you
// would not do it this way, but you certainly can.
const readerA = streamA.getReader();
console.log('[A]', await readerA.read()); //=> {value: "a", done: false}
console.log('[A]', await readerA.read()); //=> {value: "b", done: false}
console.log('[A]', await readerA.read()); //=> {value: "c", done: false}
console.log('[A]', await readerA.read()); //=> {value: "d", done: false}
console.log('[A]', await readerA.read()); //=> {value: undefined, done: true}

// Read streamB in a loop. This is the more common way
// to read data from the stream.
const readerB = streamB.getReader();
while (true) {
  const result = await readerB.read();
  if (result.done) break;
  console.log('[B]', result);
}

Flussi di byte leggibili

Per i flussi che rappresentano i byte, viene fornita una versione estesa dello stream leggibile per gestire i byte in modo efficiente, in particolare riducendo al minimo le copie. I flussi di byte consentono l'acquisizione dei lettori BYOB (Bring your own device, Porta il tuo buffer). L'implementazione predefinita può fornire una gamma di output diversi, ad esempio stringhe o buffer di array nel caso di WebSocket, mentre i flussi di byte garantiscono un output di byte. Inoltre, i lettori BYOB (Bring your own device, Porta il tuo dispositivo) offrono vantaggi di stabilità. Questo perché, se un buffer si scollega, può garantire che non venga scritto due volte nello stesso buffer, evitando quindi le condizioni di race. I lettori BYOB possono ridurre il numero di volte in cui il browser deve eseguire la garbage collection, perché può riutilizzare i buffer.

Creazione di un flusso di byte leggibile

Puoi creare un flusso di byte leggibile passando un parametro type aggiuntivo al costruttore ReadableStream().

new ReadableStream({ type: 'bytes' });

underlyingSource

Alla sorgente sottostante di un flusso di byte leggibile viene assegnato un valore ReadableByteStreamController da manipolare. Il metodo ReadableByteStreamController.enqueue() prende un argomento chunk il cui valore è ArrayBufferView. La proprietà ReadableByteStreamController.byobRequest restituisce la richiesta di pull BYOB corrente o null se non ne esiste nessuna. Infine, la proprietà ReadableByteStreamController.desiredSize restituisce la dimensione desiderata per riempire la coda interna del flusso controllato.

queuingStrategy

Il secondo argomento, anch'esso facoltativo, del costruttore ReadableStream() è queuingStrategy. È un oggetto che facoltativamente definisce una strategia di coda per il flusso che utilizza un parametro:

  • highWaterMark: un numero non negativo di byte che indica il livello alto del flusso con questa strategia di coda. Questo viene utilizzato per determinare la contropressione, che si manifesta tramite la proprietà ReadableByteStreamController.desiredSize appropriata. Controlla anche quando viene chiamato il metodo pull() dell'origine sottostante.

I metodi getReader() e read()

Puoi quindi ottenere l'accesso a ReadableStreamBYOBReader impostando di conseguenza il parametro mode: ReadableStream.getReader({ mode: "byob" }). Questo consente un controllo più preciso sull'allocazione del buffer per evitare copie. Per leggere dal flusso di byte, devi chiamare ReadableStreamBYOBReader.read(view), dove view è ArrayBufferView.

Esempio di codice di flusso di byte leggibile

const reader = readableStream.getReader({ mode: "byob" });

let startingAB = new ArrayBuffer(1_024);
const buffer = await readInto(startingAB);
console.log("The first 1024 bytes, or less:", buffer);

async function readInto(buffer) {
  let offset = 0;

  while (offset < buffer.byteLength) {
    const { value: view, done } =
        await reader.read(new Uint8Array(buffer, offset, buffer.byteLength - offset));
    buffer = view.buffer;
    if (done) {
      break;
    }
    offset += view.byteLength;
  }

  return buffer;
}

La seguente funzione restituisce flussi di byte leggibili che consentono una lettura zero-copy efficiente di un array generato in modo casuale. Anziché utilizzare una dimensione predeterminata del blocco pari a 1024, prova a riempire il buffer fornito dallo sviluppatore, consentendo il controllo completo.

const DEFAULT_CHUNK_SIZE = 1_024;

function makeReadableByteStream() {
  return new ReadableStream({
    type: 'bytes',

    pull(controller) {
      // Even when the consumer is using the default reader,
      // the auto-allocation feature allocates a buffer and
      // passes it to us via `byobRequest`.
      const view = controller.byobRequest.view;
      view = crypto.getRandomValues(view);
      controller.byobRequest.respond(view.byteLength);
    },

    autoAllocateChunkSize: DEFAULT_CHUNK_SIZE,
  });
}

I meccanismi di uno stream accessibile in scrittura

Un flusso accessibile in scrittura è una destinazione in cui puoi scrivere dati, rappresentati in JavaScript da un oggetto WritableStream. Questa funge da astrazione sulla parte superiore di un sink sottostante, un sink I/O di livello inferiore in cui vengono scritti i dati non elaborati.

I dati vengono scritti nello stream tramite uno writer, un blocco alla volta. Un blocco può assumere moltissime forme, proprio come i blocchi in un lettore. Puoi usare qualsiasi codice tu voglia per produrre i blocchi pronti per la scrittura; l'autore e il codice associato sono chiamati producer.

Quando un autore viene creato e inizia a scrivere in uno stream (un autore attivo), si dice che sia bloccato. In uno stream accessibile in scrittura è possibile scrivere un solo autore alla volta. Se vuoi che un altro autore inizi a scrivere nel tuo stream, in genere devi prima distribuirlo prima di collegarvi un altro autore.

Una coda interna tiene traccia dei blocchi che sono stati scritti nel flusso, ma non ancora elaborati dal sink sottostante.

Una strategia di coda è un oggetto che determina in che modo un flusso deve segnalare la contropressione in base allo stato della sua coda interna. La strategia di coda assegna una dimensione a ogni blocco e confronta la dimensione totale di tutti i blocchi in coda con un numero specificato, noto come filigrana elevata.

Il costrutto finale è chiamato controller. A ogni flusso accessibile in scrittura è associato un controller che ti consente di controllarlo (ad esempio per interromperlo).

Creazione di un flusso accessibile in scrittura

L'interfaccia WritableStream dell'API Streams fornisce un'astrazione standard per la scrittura di flussi di dati in una destinazione, nota come sink. Questo oggetto dispone di contropressione e coda integrate. Per creare un flusso accessibile in scrittura, devi chiamare il costruttore WritableStream(). Ha un parametro underlyingSink facoltativo, che rappresenta un oggetto con metodi e proprietà che definiscono il comportamento dell'istanza del flusso creata.

underlyingSink

underlyingSink può includere i seguenti metodi facoltativi definiti dallo sviluppatore. Il parametro controller passato ad alcuni dei metodi è WritableStreamDefaultController.

  • start(controller): questo metodo viene chiamato immediatamente durante la creazione dell'oggetto. I contenuti di questo metodo dovrebbero mirare ad accedere al sink sottostante. Se questo processo deve essere eseguito in modo asincrono, può restituire una promessa che indica l'esito positivo o negativo.
  • write(chunk, controller): questo metodo viene chiamato quando un nuovo blocco di dati (specificato nel parametro chunk) è pronto per essere scritto nel sink sottostante. Può restituire una promessa che segnala l'esito positivo o negativo dell'operazione di scrittura. Questo metodo verrà chiamato solo dopo l'esito positivo delle scritture precedenti e mai dopo la chiusura o l'interruzione del flusso.
  • close(controller): questo metodo viene chiamato se l'app segnala che ha terminato la scrittura di blocchi nello stream. I contenuti dovrebbero eseguire tutte le operazioni necessarie per finalizzare le scritture nel sink sottostante e rilasciare l'accesso. Se questo processo è asincrono, può restituire una promessa che indica l'esito positivo o negativo. Questo metodo verrà chiamato solo dopo che tutte le scritture in coda sono andate a buon fine.
  • abort(reason): questo metodo viene chiamato se l'app segnala l'intenzione di chiudere bruscamente lo stream e impostarlo in stato di errore. Può ripulire qualsiasi risorsa bloccata, come close(), ma verrà chiamata abort() anche se le scritture sono in coda. che verranno gettati via. Se questo processo è asincrono, può restituire una promessa che indica l'esito positivo o negativo. Il parametro reason contiene un valore DOMString che descrive il motivo dell'interruzione dello streaming.
const writableStream = new WritableStream({
  start(controller) {
    /* … */
  },

  write(chunk, controller) {
    /* … */
  },

  close(controller) {
    /* … */
  },

  abort(reason) {
    /* … */
  },
});

L'interfaccia WritableStreamDefaultController dell'API Streams rappresenta un controller che consente di controllare lo stato di WritableStream durante la configurazione, man mano che più blocchi vengono inviati per la scrittura o alla fine della scrittura. Durante la creazione di un WritableStream, al sink sottostante viene assegnata un'istanza WritableStreamDefaultController corrispondente da manipolare. WritableStreamDefaultController ha un solo metodo: WritableStreamDefaultController.error(), che causa errori nelle interazioni future con lo stream associato. WritableStreamDefaultController supporta anche una proprietà signal che restituisce un'istanza di AbortSignal, consentendo l'interruzione di un'operazione WritableStream, se necessario.

/* … */
write(chunk, controller) {
  try {
    // Try to do something dangerous with `chunk`.
  } catch (error) {
    controller.error(error.message);
  }
},
/* … */

queuingStrategy

Il secondo argomento, anch'esso facoltativo, del costruttore WritableStream() è queuingStrategy. È un oggetto che facoltativamente definisce una strategia di coda per il flusso che utilizza due parametri:

  • highWaterMark: un numero non negativo che indica la filigrana alta dello stream utilizzando questa strategia di coda.
  • size(chunk): una funzione che calcola e restituisce la dimensione non negativa finita del valore del blocco specificato. Il risultato viene utilizzato per determinare la contropressione, che si manifesta tramite la proprietà WritableStreamDefaultWriter.desiredSize appropriata.

I metodi getWriter() e write()

Per scrivere in uno stream scrivibile, è necessario un writer, che sarà un WritableStreamDefaultWriter. Il metodo getWriter() dell'interfaccia WritableStream restituisce una nuova istanza di WritableStreamDefaultWriter e blocca il flusso a quell'istanza. Mentre il flusso è bloccato, non è possibile acquisire altri autori finché non viene rilasciato quello attuale.

Il metodo write() dell'interfaccia WritableStreamDefaultWriter scrive un blocco di dati passato in un WritableStream e nel sink sottostante, quindi restituisce una promessa che risolve il problema indicando l'esito positivo o negativo dell'operazione di scrittura. Tieni presente che ciò che significa "successo" dipende dal sink sottostante; potrebbe indicare che il blocco è stato accettato e non necessariamente che è stato salvato in modo sicuro nella sua destinazione finale.

const writer = writableStream.getWriter();
const resultPromise = writer.write('The first chunk!');

La proprietà locked

Puoi verificare se uno stream scrivibile è bloccato accedendo alla relativa proprietà WritableStream.locked.

const locked = writableStream.locked;
console.log(`The stream is ${locked ? 'indeed' : 'not'} locked.`);

Esempio di codice stream scrivibile

L'esempio di codice riportato di seguito mostra tutti i passaggi in azione.

const writableStream = new WritableStream({
  start(controller) {
    console.log('[start]');
  },
  async write(chunk, controller) {
    console.log('[write]', chunk);
    // Wait for next write.
    await new Promise((resolve) => setTimeout(() => {
      document.body.textContent += chunk;
      resolve();
    }, 1_000));
  },
  close(controller) {
    console.log('[close]');
  },
  abort(reason) {
    console.log('[abort]', reason);
  },
});

const writer = writableStream.getWriter();
const start = Date.now();
for (const char of 'abcdefghijklmnopqrstuvwxyz') {
  // Wait to add to the write queue.
  await writer.ready;
  console.log('[ready]', Date.now() - start, 'ms');
  // The Promise is resolved after the write finishes.
  writer.write(char);
}
await writer.close();

Convertire uno stream leggibile in uno stream scrivibile

Uno stream leggibile può essere trasmesso a uno stream accessibile in scrittura tramite il metodo pipeTo() dello stream leggibile. ReadableStream.pipeTo() indirizza l'attuale ReadableStream a un determinato WritableStream e restituisce una promessa che si verifica quando la procedura di configurazione delle tubazioni viene completata correttamente oppure rifiuta in caso di errori.

const readableStream = new ReadableStream({
  start(controller) {
    // Called by constructor.
    console.log('[start readable]');
    controller.enqueue('a');
    controller.enqueue('b');
    controller.enqueue('c');
  },
  pull(controller) {
    // Called when controller's queue is empty.
    console.log('[pull]');
    controller.enqueue('d');
    controller.close();
  },
  cancel(reason) {
    // Called when the stream is canceled.
    console.log('[cancel]', reason);
  },
});

const writableStream = new WritableStream({
  start(controller) {
    // Called by constructor
    console.log('[start writable]');
  },
  async write(chunk, controller) {
    // Called upon writer.write()
    console.log('[write]', chunk);
    // Wait for next write.
    await new Promise((resolve) => setTimeout(() => {
      document.body.textContent += chunk;
      resolve();
    }, 1_000));
  },
  close(controller) {
    console.log('[close]');
  },
  abort(reason) {
    console.log('[abort]', reason);
  },
});

await readableStream.pipeTo(writableStream);
console.log('[finished]');

Creazione di un flusso di trasformazione

L'interfaccia TransformStream dell'API Streams rappresenta un set di dati trasformabili. Puoi creare un flusso di trasformazione chiamando il suo costruttore TransformStream(), che crea e restituisce un oggetto Transform stream dai gestori specificati. Il costruttore TransformStream() accetta come primo argomento un oggetto JavaScript facoltativo che rappresenta transformer. Questi oggetti possono contenere uno dei seguenti metodi:

transformer

  • start(controller): questo metodo viene chiamato immediatamente durante la creazione dell'oggetto. In genere viene utilizzato per accodare i blocchi di prefisso, utilizzando controller.enqueue(). Questi blocchi verranno letti dal lato leggibile, ma non dipendono da nessuna scrittura sul lato scrivibile. Se questo processo iniziale è asincrono, ad esempio perché richiede un po' di impegno per acquisire i blocchi del prefisso, la funzione può restituire una promessa che indica l'esito positivo o negativo; una promessa rifiutata causerà un errore nel flusso. Eventuali eccezioni generate verranno nuovamente generate dal costruttore TransformStream().
  • transform(chunk, controller): questo metodo viene chiamato quando un nuovo blocco originariamente scritto sul lato scrivibile è pronto per essere trasformato. L'implementazione del flusso garantisce che questa funzione venga chiamata solo dopo l'esito positivo delle trasformazioni precedenti e mai prima del completamento di start() o dopo la chiamata di flush(). Questa funzione esegue l'effettivo lavoro di trasformazione del flusso di trasformazione. Può accodare i risultati utilizzando controller.enqueue(). In questo modo è consentito un singolo blocco scritto in scrittura sul lato scrivibile per generare zero o più blocchi sul lato leggibile, a seconda di quante volte viene chiamato controller.enqueue(). Se il processo di trasformazione è asincrono, questa funzione può restituire una promessa che indica il successo o il fallimento della trasformazione. Una promessa rifiutata causa un errore di entrambi i lati leggibili e scrivibili del flusso di trasformazione. Se non viene fornito alcun metodo transform(), viene utilizzata la trasformazione dell'identità, che accoda i blocchi non modificati dal lato scrivibile a quello leggibile.
  • flush(controller): questo metodo viene chiamato dopo che tutti i blocchi scritti in un lato accessibile in scrittura sono stati trasformati passando correttamente a transform() e il lato scrivibile sta per essere chiuso. In genere viene utilizzata per accodare i blocchi di suffissi in coda sul lato leggibile, prima che anch'esso venga chiuso. Se il processo di svuotamento è asincrono, la funzione può restituire una promessa di segnalare l'esito positivo o negativo; il risultato verrà comunicato al chiamante di stream.writable.write(). Inoltre, una promessa rifiutata genera un errore sia sui lati leggibili sia scrivibili dello stream. La richiesta di un'eccezione equivale alla restituzione di una promessa rifiutata.
const transformStream = new TransformStream({
  start(controller) {
    /* … */
  },

  transform(chunk, controller) {
    /* … */
  },

  flush(controller) {
    /* … */
  },
});

Le strategie di coda writableStrategy e readableStrategy

Il secondo e il terzo parametro facoltativi del costruttore TransformStream() sono strategie di coda con writableStrategy e readableStrategy facoltativi. Vengono definiti come descritto rispettivamente nella sezione dei flussi leggibile e scrivibile.

Esempio di codice stream Transform

Il seguente esempio di codice mostra un semplice flusso di trasformazione in azione.

// Note that `TextEncoderStream` and `TextDecoderStream` exist now.
// This example shows how you would have done it before.
const textEncoderStream = new TransformStream({
  transform(chunk, controller) {
    console.log('[transform]', chunk);
    controller.enqueue(new TextEncoder().encode(chunk));
  },
  flush(controller) {
    console.log('[flush]');
    controller.terminate();
  },
});

(async () => {
  const readStream = textEncoderStream.readable;
  const writeStream = textEncoderStream.writable;

  const writer = writeStream.getWriter();
  for (const char of 'abc') {
    writer.write(char);
  }
  writer.close();

  const reader = readStream.getReader();
  for (let result = await reader.read(); !result.done; result = await reader.read()) {
    console.log('[value]', result.value);
  }
})();

Creare uno stream leggibile tramite uno stream di trasformazione

Il metodo pipeThrough() dell'interfaccia ReadableStream offre un modo concatenato di collegare lo stream attuale tramite uno stream di trasformazione o qualsiasi altra coppia scrivibile/leggibile. La tubazione di uno stream in genere lo blocca per l'intera durata, impedendo ad altri lettori di bloccarlo.

const transformStream = new TransformStream({
  transform(chunk, controller) {
    console.log('[transform]', chunk);
    controller.enqueue(new TextEncoder().encode(chunk));
  },
  flush(controller) {
    console.log('[flush]');
    controller.terminate();
  },
});

const readableStream = new ReadableStream({
  start(controller) {
    // called by constructor
    console.log('[start]');
    controller.enqueue('a');
    controller.enqueue('b');
    controller.enqueue('c');
  },
  pull(controller) {
    // called read when controller's queue is empty
    console.log('[pull]');
    controller.enqueue('d');
    controller.close(); // or controller.error();
  },
  cancel(reason) {
    // called when rs.cancel(reason)
    console.log('[cancel]', reason);
  },
});

(async () => {
  const reader = readableStream.pipeThrough(transformStream).getReader();
  for (let result = await reader.read(); !result.done; result = await reader.read()) {
    console.log('[value]', result.value);
  }
})();

Il successivo esempio di codice (un po' inventato) mostra come implementare una versione "shouting" di fetch() in maiuscolo tutto il testo utilizzando la promessa di risposta restituita come flusso e blocco con maiuscolo per blocco. Il vantaggio di questo approccio è che non devi attendere il download dell'intero documento, il che può fare un'enorme differenza quando si gestiscono file di grandi dimensioni.

function upperCaseStream() {
  return new TransformStream({
    transform(chunk, controller) {
      controller.enqueue(chunk.toUpperCase());
    },
  });
}

function appendToDOMStream(el) {
  return new WritableStream({
    write(chunk) {
      el.append(chunk);
    }
  });
}

fetch('./lorem-ipsum.txt').then((response) =>
  response.body
    .pipeThrough(new TextDecoderStream())
    .pipeThrough(upperCaseStream())
    .pipeTo(appendToDOMStream(document.body))
);

Demo

La demo riportata di seguito mostra i flussi leggibili, scrivibili e trasformativi in azione. Include inoltre esempi di catene di tubi pipeThrough() e pipeTo() e dimostra inoltre tee(). Facoltativamente, puoi eseguire demo in una propria finestra o visualizzare il codice sorgente.

Stream utili disponibili nel browser

Ci sono diversi stream utili integrati direttamente nel browser. Puoi creare facilmente un ReadableStream da un BLOB. Il metodo stream() dell'interfaccia Blob restituisce un ReadableStream che, al momento della lettura, restituisce i dati contenuti all'interno del blob. Ricorda inoltre che un oggetto File è un tipo specifico di Blob e può essere utilizzato in qualsiasi contesto a cui può accedere un BLOB.

const readableStream = new Blob(['hello world'], { type: 'text/plain' }).stream();

Le varianti di streaming di TextDecoder.decode() e TextEncoder.encode() sono chiamate rispettivamente TextDecoderStream e TextEncoderStream.

const response = await fetch('https://streams.spec.whatwg.org/');
const decodedStream = response.body.pipeThrough(new TextDecoderStream());

È facile comprimere o decomprimere un file rispettivamente con i flussi di trasformazione di CompressionStream e DecompressionStream. Il codice di esempio riportato di seguito mostra come scaricare la specifica Streams, comprimerla (gzip) direttamente nel browser e scrivere il file compresso direttamente su disco.

const response = await fetch('https://streams.spec.whatwg.org/');
const readableStream = response.body;
const compressedStream = readableStream.pipeThrough(new CompressionStream('gzip'));

const fileHandle = await showSaveFilePicker();
const writableStream = await fileHandle.createWritable();
compressedStream.pipeTo(writableStream);

Gli eventi dell'API File System Access FileSystemWritableFileStream e gli stream sperimentali di fetch() richieste sono esempi di flussi scrivibili in natura.

L'API Serial fa un uso intensivo degli stream sia leggibili sia scrivibili.

// Prompt user to select any serial port.
const port = await navigator.serial.requestPort();
// Wait for the serial port to open.
await port.open({ baudRate: 9_600 });
const reader = port.readable.getReader();

// Listen to data coming from the serial device.
while (true) {
  const { value, done } = await reader.read();
  if (done) {
    // Allow the serial port to be closed later.
    reader.releaseLock();
    break;
  }
  // value is a Uint8Array.
  console.log(value);
}

// Write to the serial port.
const writer = port.writable.getWriter();
const data = new Uint8Array([104, 101, 108, 108, 111]); // hello
await writer.write(data);
// Allow the serial port to be closed later.
writer.releaseLock();

Infine, l'API WebSocketStream integra i flussi con l'API WebSocket.

const wss = new WebSocketStream(WSS_URL);
const { readable, writable } = await wss.connection;
const reader = readable.getReader();
const writer = writable.getWriter();

while (true) {
  const { value, done } = await reader.read();
  if (done) {
    break;
  }
  const result = await process(value);
  await writer.write(result);
}

Risorse utili

Ringraziamenti

Questo articolo è stato rivisto da Jake Archibald, François Beaufort, Sam Dutton, Mattias Buelens, Surma, Joe Medley e Adam Rice. I post del blog di Jake Archibald mi hanno aiutato molto a comprendere gli stream. Alcuni esempi di codice si ispirano alle esplorazioni dell'utente GitHub @bellbind e parti della prosa si basano fortemente sui documenti web MDN sugli stream. Gli autori dello Streams Standard hanno lavorato molto nella stesura di questa specifica. Immagine hero di Ryan Lara su Unsplash.