Streams – der ultimative Leitfaden

Hier erfahren Sie, wie Sie mit der Streams API lesbare und beschreibbare Streams verwenden und transformieren können.

Mit der Streams API können Sie programmatisch auf Datenstreams zugreifen, die über das Netzwerk empfangen oder auf andere Weise lokal erstellt wurden, und sie mit JavaScript verarbeiten. Beim Streaming wird eine Ressource, die Sie empfangen, senden oder transformieren möchten, in kleine Blöcke unterteilt und diese dann Stück für Stück verarbeitet. Während Browser beim Empfang von Assets wie HTML oder Videos, die auf Webseiten angezeigt werden sollen, trotzdem etwas tun, war diese Funktion für JavaScript erst seit der Einführung von fetch mit Streams im Jahr 2015 für JavaScript verfügbar.

Wenn Sie bisher eine Ressource (z. B. ein Video oder eine Textdatei usw.) verarbeiten wollten, mussten Sie die gesamte Datei herunterladen, warten, bis sie in ein geeignetes Format deserialisiert wurde, und sie dann verarbeiten. Mit Streams, die für JavaScript verfügbar sind, ändert sich das. Sie können nun Rohdaten mit JavaScript verarbeiten, sobald sie auf dem Client verfügbar sind, ohne einen Puffer, String oder Blob generieren zu müssen. Dadurch wird eine Reihe von Anwendungsfällen genutzt, von denen ich einige nachfolgend auflistet:

  • Videoeffekte:Weiterleitung eines lesbaren Videostreams durch einen Umwandlungsstream, bei dem Effekte in Echtzeit angewendet werden.
  • Datenkomprimierung (De-)Komprimierung:Pipeline eines Dateistreams durch einen Transformationsstream, der ihn selektiv (de)komprimiert.
  • Bilddecodierung:Leiten eines HTTP-Antwortstreams durch einen Transformationsstream, der Bytes in Bitmapdaten decodiert, und anschließend über einen anderen Transformationsstream, der Bitmaps in PNGs übersetzt. Wenn dieser im fetch-Handler eines Service Workers installiert ist, können Sie neue Bildformate wie AVIF transparent mit Polyfill für neue Bildformate füllen.

Unterstützte Browser

ReadableStream und WritableStream

Unterstützte Browser

  • 43
  • 14
  • 65
  • 10.1

Quelle

TransformStream

Unterstützte Browser

  • 67
  • 79
  • 102
  • 14.1

Quelle

Wichtige Konzepte

Bevor ich näher auf die verschiedenen Arten von Streams eingehe, möchte ich zuerst einige grundlegende Konzepte vorstellen.

Stücke

Ein Chunk ist ein einzelnes Datenelement, das in einen Stream geschrieben oder aus diesem gelesen wird. Sie können einen beliebigen Typ haben. Streams können sogar Blöcke verschiedener Typen enthalten. In den meisten Fällen ist ein Block nicht die atomarste Dateneinheit für einen bestimmten Stream. Beispielsweise kann ein Bytestream Blöcke enthalten, die aus 16 KiB-Uint8Array-Einheiten anstelle von einzelnen Byte bestehen.

Lesbare Streams

Ein lesbarer Stream stellt eine Datenquelle dar, aus der Sie lesen können. Mit anderen Worten: Die Daten kommen aus einem lesbaren Stream. Konkret ist ein lesbarer Stream eine Instanz der ReadableStream-Klasse.

Beschreibbare Streams

Ein beschreibbarer Stream ist ein Ziel für Daten, in die Sie schreiben können. Mit anderen Worten, die Daten gehen in einen beschreibbaren Stream. Konkret ist ein beschreibbarer Stream eine Instanz der Klasse WritableStream.

Streams transformieren

Ein Transformationsstream besteht aus einem Paar von Streams: einem beschreibbaren Stream, der als beschreibbare Seite bezeichnet wird, und einem lesbaren Stream, der als lesbare Seite bezeichnet wird. Eine reale Metapher dafür wäre ein Simultandolmetscher, der spontan von einer Sprache in eine andere übersetzt. Wenn Sie auf die beschreibbare Seite schreiben, werden für den Transformationsstream neue Daten für das Lesen von der lesbaren Seite zur Verfügung gestellt. Konkret kann jedes Objekt mit den Attributen writable und readable als Transformationsstream dienen. Mit der Standardklasse TransformStream ist es jedoch einfacher, ein solches Paar zu erstellen, das richtig verkettet ist.

Rohrketten

Streams werden hauptsächlich über Weiterleitungen aneinander verwendet. Ein lesbarer Stream kann mithilfe der Methode pipeTo() des lesbaren Streams direkt an einen beschreibbaren Stream übergeben oder mit der Methode pipeThrough() des lesbaren Streams zuerst durch einen oder mehrere Transformationsstreams geleitet werden. Eine Gruppe von Streams, die auf diese Weise per Pipe verbunden werden, wird als Pipe-Kette bezeichnet.

Gegendruck

Sobald eine Pipe-Kette erstellt wurde, propagiert sie Signale darüber, wie schnell Blöcke durch sie fließen sollen. Wenn ein Schritt in der Kette noch keine Blöcke annehmen kann, breitet er ein Signal rückwärts durch die Pipe-Kette weiter, bis schließlich die ursprüngliche Quelle angewiesen wird, keine so schnellen Blöcke mehr zu erzeugen. Dieser Vorgang der Normalisierung des Flusses wird als Rückdruck bezeichnet.

Abschlag

Ein lesbarer Stream kann mithilfe der Methode tee() abgerufen werden (nach der Form eines Großbuchstabens „T“). Dadurch wird der Stream gesperrt, er ist also nicht mehr direkt nutzbar. Es werden jedoch zwei neue Streams, sogenannte Zweige, erstellt, die unabhängig voneinander verarbeitet werden können. Teeing ist auch wichtig, da Streams nicht zurückgespult oder neu gestartet werden können. Mehr dazu später.

Diagramm einer Pipe-Kette, bestehend aus einem lesbaren Stream, der aus einem Aufruf der Abruf-API stammt, durch einen Transformationsstream geleitet wird, dessen Ausgabe als T-Shirt abgerufen und dann an den Browser für den ersten resultierenden lesbaren Stream und an den Service Worker-Cache für den zweiten resultierenden lesbaren Stream gesendet wird.
Eine Pipe-Kette.

Die Mechanismen eines lesbaren Streams

Ein lesbarer Stream ist eine Datenquelle, die in JavaScript durch ein ReadableStream-Objekt dargestellt wird, das aus einer zugrunde liegenden Quelle stammt. Der Konstruktor ReadableStream() erstellt und gibt ein lesbares Streamobjekt von den angegebenen Handlern zurück. Es gibt zwei Arten zugrunde liegender Quelle:

  • Push-Quellen senden ständig Daten an Sie, wenn Sie darauf zugegriffen haben. Es liegt an Ihnen, den Zugriff auf den Stream zu starten, zu pausieren oder abzubrechen. Beispiele hierfür sind Live-Videostreams, vom Server gesendete Ereignisse oder WebSockets.
  • Bei Pull-Quellen müssen Sie Daten explizit anfordern, sobald eine Verbindung besteht. Beispiele hierfür sind HTTP-Vorgänge über fetch()- oder XMLHttpRequest-Aufrufe.

Streamdaten werden sequenziell in kleinen Stücken gelesen, die als Chunks bezeichnet werden. Die in einem Stream platzierten Blöcke werden angeblich in die Warteschlange gestellt. Sie befinden sich also in einer Warteschlange, die zum Lesen bereit ist. Eine interne Warteschlange erfasst die noch nicht gelesenen Blöcke.

Eine Warteschlangenstrategie ist ein Objekt, das bestimmt, wie ein Stream basierend auf dem Status seiner internen Warteschlange einen Rückdruck signalisieren soll. Die Warteschlangenstrategie weist jedem Block eine Größe zu und vergleicht die Gesamtgröße aller Blöcke in der Warteschlange mit einer bestimmten Zahl, die auch als High Watermark bezeichnet wird.

Die Blöcke im Stream werden von einem Reader gelesen. Dieser Reader ruft die Daten Block für Stück ab, sodass Sie jede Art von Operation ausführen können, mit der Sie arbeiten möchten. Das Lesegerät und der andere zugehörige Verarbeitungscode werden als Nutzer bezeichnet.

Das nächste Konstrukt in diesem Kontext wird als Controller bezeichnet. Jedem lesbaren Stream ist ein Controller zugeordnet, mit dem Sie den Stream steuern können.

Es kann immer nur ein Leser einen Stream auf einmal lesen. Wenn ein Leser erstellt wird und mit dem Lesen eines Streams beginnt (also zu einem aktiven Leser wird), ist er an diesen gesperrt. Wenn Sie möchten, dass ein anderer Leser das Lesen Ihres Streams übernimmt, müssen Sie in der Regel den ersten Reader freigeben, bevor Sie etwas anderes tun können. Sie können aber auch Streams abschlagen.

Einen lesbaren Stream erstellen

Sie erstellen einen lesbaren Stream, indem Sie dessen Konstruktor ReadableStream() aufrufen. Der Konstruktor hat das optionale Argument underlyingSource, das ein Objekt mit Methoden und Attributen darstellt, die definieren, wie sich die erstellte Streaminstanz verhält.

underlyingSource

Dabei können die folgenden optionalen, vom Entwickler definierten Methoden verwendet werden:

  • start(controller): Wird sofort bei der Erstellung des Objekts aufgerufen. Die Methode kann auf die Streamquelle zugreifen und andere Aktionen ausführen, die zum Einrichten der Streamfunktionalität erforderlich sind. Wenn dieser Prozess asynchron erfolgen soll, kann die Methode ein Versprechen über Erfolg oder Misserfolg zurückgeben. Der an diese Methode übergebene Parameter controller ist ReadableStreamDefaultController.
  • pull(controller): Kann verwendet werden, um den Stream zu steuern, wenn mehr Blöcke abgerufen werden. Sie wird wiederholt aufgerufen, solange die interne Warteschlange des Streams nicht voll ist, bis die Warteschlange ihren Höchstwert erreicht. Wenn das Ergebnis des Aufrufs von pull() ein Promise ist, wird pull() erst dann noch einmal aufgerufen, wenn das Versprechen erfüllt ist. Wenn das Promise abgelehnt wird, tritt ein Fehler im Stream auf.
  • cancel(reason): Wird aufgerufen, wenn der Streamnutzer den Stream abbricht.
const readableStream = new ReadableStream({
  start(controller) {
    /* … */
  },

  pull(controller) {
    /* … */
  },

  cancel(reason) {
    /* … */
  },
});

Das ReadableStreamDefaultController unterstützt die folgenden Methoden:

/* … */
start(controller) {
  controller.enqueue('The first chunk!');
},
/* … */

queuingStrategy

Das zweite, ebenfalls optionale Argument des ReadableStream()-Konstruktors ist queuingStrategy. Es ist ein Objekt, das optional eine Warteschlangenstrategie für den Stream definiert. Dafür sind zwei Parameter erforderlich:

  • highWaterMark: Eine positive Zahl, die die Hochwassermarke des Streams angibt, die auf diese Warteschlangenstrategie angewendet wird.
  • size(chunk): Eine Funktion, die die endliche, nicht negative Größe des gegebenen Chunk-Werts berechnet und zurückgibt. Mit dem Ergebnis wird der Rückdruck bestimmt, der sich über die entsprechende ReadableStreamDefaultController.desiredSize-Eigenschaft zeigt. Außerdem legt sie fest, wann die Methode pull() der zugrunde liegenden Quelle aufgerufen wird.
const readableStream = new ReadableStream({
    /* … */
  },
  {
    highWaterMark: 10,
    size(chunk) {
      return chunk.length;
    },
  },
);

Die Methoden getReader() und read()

Zum Lesen aus einem lesbaren Stream benötigen Sie einen Reader, der ein ReadableStreamDefaultReader ist. Die Methode getReader() der ReadableStream-Schnittstelle erstellt einen Reader und sperrt den Stream darauf. Solange der Stream gesperrt ist, kann kein anderes Lesegerät erworben werden, bis dieses Lesegerät freigegeben wird.

Die Methode read() der ReadableStreamDefaultReader-Schnittstelle gibt ein Versprechen zurück, das Zugriff auf den nächsten Block in der internen Warteschlange des Streams bietet. Er erfüllt oder lehnt ihn mit einem Ergebnis ab, das vom Status des Streams abhängt. Es gibt folgende Möglichkeiten:

  • Wenn ein Chunk verfügbar ist, wird das Versprechen mit einem Objekt der Form
    { value: chunk, done: false } erfüllt.
  • Wenn der Stream geschlossen wird, wird das Versprechen mit einem Objekt des Formats
    { value: undefined, done: true } erfüllt.
  • Wenn beim Stream ein Fehler auftritt, wird das Promise mit dem entsprechenden Fehler abgelehnt.
const reader = readableStream.getReader();
while (true) {
  const { done, value } = await reader.read();
  if (done) {
    console.log('The stream is done.');
    break;
  }
  console.log('Just read a chunk:', value);
}

Das Attribut locked

Sie können prüfen, ob ein lesbarer Stream gesperrt ist. Dazu verwenden Sie das Attribut ReadableStream.locked.

const locked = readableStream.locked;
console.log(`The stream is ${locked ? 'indeed' : 'not'} locked.`);

Lesbare Codebeispiele für Streams

Das folgende Codebeispiel zeigt alle Schritte in Aktion. Erstellen Sie zuerst eine ReadableStream, die in ihrem underlyingSource-Argument (d. h. der Klasse TimestampSource) eine start()-Methode definiert. Mit dieser Methode wird der controller des Streams angewiesen, während zehn Sekunden jede Sekunde einen enqueue()-Zeitstempel zu generieren. Schließlich wird der Controller angewiesen, den Stream zu close(). Sie können diesen Stream nutzen, indem Sie mit der Methode getReader() einen Reader erstellen und read() aufrufen, bis der Stream done ist.

class TimestampSource {
  #interval

  start(controller) {
    this.#interval = setInterval(() => {
      const string = new Date().toLocaleTimeString();
      // Add the string to the stream.
      controller.enqueue(string);
      console.log(`Enqueued ${string}`);
    }, 1_000);

    setTimeout(() => {
      clearInterval(this.#interval);
      // Close the stream after 10s.
      controller.close();
    }, 10_000);
  }

  cancel() {
    // This is called if the reader cancels.
    clearInterval(this.#interval);
  }
}

const stream = new ReadableStream(new TimestampSource());

async function concatStringStream(stream) {
  let result = '';
  const reader = stream.getReader();
  while (true) {
    // The `read()` method returns a promise that
    // resolves when a value has been received.
    const { done, value } = await reader.read();
    // Result objects contain two properties:
    // `done`  - `true` if the stream has already given you all its data.
    // `value` - Some data. Always `undefined` when `done` is `true`.
    if (done) return result;
    result += value;
    console.log(`Read ${result.length} characters so far`);
    console.log(`Most recently read chunk: ${value}`);
  }
}
concatStringStream(stream).then((result) => console.log('Stream complete', result));

Asynchrone Iteration

Die Prüfung der einzelnen read()-Schleifendurchläufe, ob der Stream done ist, ist möglicherweise nicht die bequemste API. Glücklicherweise gibt es dazu bald eine bessere Methode: die asynchrone Iteration.

for await (const chunk of stream) {
  console.log(chunk);
}

Eine Umgehung der heutigen Verwendung der asynchronen Iteration besteht in der Implementierung des Verhaltens mit einem Polyfill.

if (!ReadableStream.prototype[Symbol.asyncIterator]) {
  ReadableStream.prototype[Symbol.asyncIterator] = async function* () {
    const reader = this.getReader();
    try {
      while (true) {
        const {done, value} = await reader.read();
        if (done) {
          return;
          }
        yield value;
      }
    }
    finally {
      reader.releaseLock();
    }
  }
}

Einen lesbaren Stream abschicken

Die Methode tee() der ReadableStream-Schnittstelle schneidet den aktuell lesbaren Stream ab und gibt ein Array mit zwei Elementen zurück, das die beiden resultierenden Zweige als neue ReadableStream-Instanzen enthält. Dadurch können zwei Leser einen Stream gleichzeitig lesen. Sie können dies beispielsweise in einem Service Worker tun, wenn Sie eine Antwort vom Server abrufen und an den Browser streamen, aber auch in den Service Worker-Cache streamen möchten. Da ein Antworttext nicht mehr als einmal verarbeitet werden kann, benötigen Sie dazu zwei Kopien. Wenn Sie den Stream abbrechen möchten, müssen Sie dann beide resultierenden Zweige abbrechen. Durch das Teegen eines Streams wird dieser in der Regel für die Dauer gesperrt, sodass andere Leser ihn nicht sperren können.

const readableStream = new ReadableStream({
  start(controller) {
    // Called by constructor.
    console.log('[start]');
    controller.enqueue('a');
    controller.enqueue('b');
    controller.enqueue('c');
  },
  pull(controller) {
    // Called `read()` when the controller's queue is empty.
    console.log('[pull]');
    controller.enqueue('d');
    controller.close();
  },
  cancel(reason) {
    // Called when the stream is canceled.
    console.log('[cancel]', reason);
  },
});

// Create two `ReadableStream`s.
const [streamA, streamB] = readableStream.tee();

// Read streamA iteratively one by one. Typically, you
// would not do it this way, but you certainly can.
const readerA = streamA.getReader();
console.log('[A]', await readerA.read()); //=> {value: "a", done: false}
console.log('[A]', await readerA.read()); //=> {value: "b", done: false}
console.log('[A]', await readerA.read()); //=> {value: "c", done: false}
console.log('[A]', await readerA.read()); //=> {value: "d", done: false}
console.log('[A]', await readerA.read()); //=> {value: undefined, done: true}

// Read streamB in a loop. This is the more common way
// to read data from the stream.
const readerB = streamB.getReader();
while (true) {
  const result = await readerB.read();
  if (result.done) break;
  console.log('[B]', result);
}

Lesbare Bytestreams

Für Streams, die Byte darstellen, wird eine erweiterte Version des lesbaren Streams bereitgestellt, um Bytes effizient zu verarbeiten, insbesondere durch Minimieren von Kopien. Bytestreams ermöglichen den Erwerb eigener BYOB-Lesegeräte (Bring-your-own-buffer). Die Standardimplementierung kann einen Bereich verschiedener Ausgaben bereitstellen, wie z. B. Strings oder Array-Zwischenspeicher im Fall von WebSockets, während Bytestreams die Byteausgabe garantieren. Darüber hinaus bieten BYOB-Lesegeräte Stabilitätsvorteile. Dies liegt daran, dass beim Trennen eines Zwischenspeichers garantiert werden kann, dass nicht zweimal in denselben Zwischenspeicher geschrieben wird, wodurch Race-Bedingungen vermieden werden. BYOB-Leser können die Häufigkeit reduzieren, mit der der Browser die automatische Speicherbereinigung ausführen muss, da er Puffer wiederverwenden kann.

Lesbaren Bytestream erstellen

Sie können einen lesbaren Bytestream erstellen, indem Sie einen zusätzlichen type-Parameter an den ReadableStream()-Konstruktor übergeben.

new ReadableStream({ type: 'bytes' });

underlyingSource

Der zugrunde liegenden Quelle eines lesbaren Bytestreams wird eine ReadableByteStreamController zur Bearbeitung zugewiesen. Die Methode ReadableByteStreamController.enqueue() verwendet ein chunk-Argument, dessen Wert ein ArrayBufferView ist. Das Attribut ReadableByteStreamController.byobRequest gibt die aktuelle BYOB-Pull-Anfrage oder null zurück, wenn keine vorhanden ist. Schließlich gibt das Attribut ReadableByteStreamController.desiredSize die gewünschte Größe zurück, um die interne Warteschlange des kontrollierten Streams zu füllen.

queuingStrategy

Das zweite, ebenfalls optionale Argument des ReadableStream()-Konstruktors ist queuingStrategy. Es ist ein Objekt, das optional eine Warteschlangenstrategie für den Stream definiert. Es enthält einen Parameter:

  • highWaterMark: Eine positive Anzahl von Byte, die den Höchstwert des Streams angibt, der diese Warteschlangenstrategie verwendet. Damit lässt sich der Rückdruck über das entsprechende ReadableByteStreamController.desiredSize-Attribut ermitteln. Außerdem legt sie fest, wann die Methode pull() der zugrunde liegenden Quelle aufgerufen wird.

Die Methoden getReader() und read()

Wenn Sie dann Zugriff auf ein ReadableStreamBYOBReader erhalten, legen Sie den Parameter mode entsprechend fest: ReadableStream.getReader({ mode: "byob" }). Dies ermöglicht eine präzisere Kontrolle über die Zwischenspeicherzuweisung, um Kopien zu vermeiden. Zum Lesen aus dem Bytestream müssen Sie ReadableStreamBYOBReader.read(view) aufrufen, wobei view ein ArrayBufferView ist.

Codebeispiel für lesbaren Bytestream

const reader = readableStream.getReader({ mode: "byob" });

let startingAB = new ArrayBuffer(1_024);
const buffer = await readInto(startingAB);
console.log("The first 1024 bytes, or less:", buffer);

async function readInto(buffer) {
  let offset = 0;

  while (offset < buffer.byteLength) {
    const { value: view, done } =
        await reader.read(new Uint8Array(buffer, offset, buffer.byteLength - offset));
    buffer = view.buffer;
    if (done) {
      break;
    }
    offset += view.byteLength;
  }

  return buffer;
}

Die folgende Funktion gibt lesbare Bytestreams zurück, die ein effizientes Lesen ohne Kopie eines zufällig generierten Arrays ermöglichen. Anstatt eine vordefinierte Chunk-Größe von 1.024 zu verwenden,wird versucht, den vom Entwickler bereitgestellten Puffer zu füllen, um die volle Kontrolle zu ermöglichen.

const DEFAULT_CHUNK_SIZE = 1_024;

function makeReadableByteStream() {
  return new ReadableStream({
    type: 'bytes',

    pull(controller) {
      // Even when the consumer is using the default reader,
      // the auto-allocation feature allocates a buffer and
      // passes it to us via `byobRequest`.
      const view = controller.byobRequest.view;
      view = crypto.getRandomValues(view);
      controller.byobRequest.respond(view.byteLength);
    },

    autoAllocateChunkSize: DEFAULT_CHUNK_SIZE,
  });
}

Die Mechanismen eines beschreibbaren Streams

Ein beschreibbarer Stream ist ein Ziel, in das Sie Daten schreiben können. Dies wird in JavaScript durch ein WritableStream-Objekt dargestellt. Dies dient als Abstraktion über einer zugrunde liegenden Senke – einer untergeordneten E/A-Senke, in die Rohdaten geschrieben werden.

Die Daten werden Chunk für Stück über einen Writer in den Stream geschrieben. Ein Chunk kann eine Vielzahl von Formen annehmen, genau wie die Chunks in einem Reader. Sie können jeden beliebigen Code verwenden, um die schreibbaren Blöcke zu erzeugen. Der Autor und der zugehörige Code werden als Producer bezeichnet.

Wenn ein Writer erstellt wird und mit dem Schreiben in einen Stream beginnt (ein aktiver Autor), gilt er als gesperrt. Es kann immer nur ein Autor gleichzeitig in einen beschreibbaren Stream schreiben. Wenn ein anderer Autor mit dem Schreiben in Ihren Stream beginnen soll, müssen Sie den Stream zuerst freigeben, bevor Sie einen anderen Autor anhängen können.

Eine interne Warteschlange erfasst die Blöcke, die in den Stream geschrieben, aber noch nicht von der zugrunde liegenden Senke verarbeitet wurden.

Eine Warteschlangenstrategie ist ein Objekt, das bestimmt, wie ein Stream basierend auf dem Status seiner internen Warteschlange einen Rückdruck signalisieren soll. Die Warteschlangenstrategie weist jedem Block eine Größe zu und vergleicht die Gesamtgröße aller Blöcke in der Warteschlange mit einer bestimmten Zahl, die auch als High Watermark bezeichnet wird.

Das letzte Konstrukt wird als Controller bezeichnet. Jedem beschreibbaren Stream ist ein Controller zugeordnet, mit dem Sie den Stream steuern und ihn beispielsweise abbrechen können.

Beschreibbaren Stream erstellen

Die WritableStream-Schnittstelle der Streams API bietet eine Standardabstraktion zum Schreiben von Streamingdaten in ein Ziel, das als Senke bezeichnet wird. Dieses Objekt hat einen integrierten Rückdruck und eine Warteschlange. Sie erstellen einen beschreibbaren Stream, indem Sie dessen Konstruktor WritableStream() aufrufen. Der optionale Parameter underlyingSink steht für ein Objekt mit Methoden und Attributen, die das Verhalten der erstellten Streaminstanz definieren.

underlyingSink

Der underlyingSink kann die folgenden optionalen, vom Entwickler definierten Methoden enthalten. Der controller-Parameter, der an einige der Methoden übergeben wird, ist WritableStreamDefaultController.

  • start(controller): Diese Methode wird sofort beim Erstellen des Objekts aufgerufen. Der Inhalt dieser Methode sollte darauf abzielen, Zugriff auf die zugrunde liegende Senke zu erhalten. Wenn dieser Prozess asynchron ausgeführt werden soll, kann er ein Versprechen über Erfolg oder Misserfolg zurückgeben.
  • write(chunk, controller): Diese Methode wird aufgerufen, wenn ein neuer Datenblock (im Parameter chunk angegeben) für das Schreiben in die zugrunde liegende Senke bereit ist. Sie kann ein Versprechen über einen erfolgreichen oder fehlgeschlagenen Schreibvorgang zurückgeben. Diese Methode wird erst aufgerufen, nachdem vorherige Schreibvorgänge erfolgreich waren und nie, nachdem der Stream geschlossen oder abgebrochen wurde.
  • close(controller): Diese Methode wird aufgerufen, wenn die Anwendung signalisiert, dass sie das Schreiben von Blöcken in den Stream abgeschlossen hat. Der Inhalt sollte alles tun, was erforderlich ist, um Schreibvorgänge in der zugrunde liegenden Senke abzuschließen und den Zugriff darauf freizugeben. Wenn dieser Prozess asynchron ist, kann er ein Versprechen über Erfolg oder Misserfolg zurückgeben. Diese Methode wird erst aufgerufen, wenn alle Schreibvorgänge in der Warteschlange erfolgreich waren.
  • abort(reason): Diese Methode wird aufgerufen, wenn die Anwendung signalisiert, dass der Stream abrupt geschlossen und in einen fehlerhaften Zustand versetzt werden soll. Er kann alle zurückgehaltenen Ressourcen bereinigen, ähnlich wie close(), aber abort() wird auch dann aufgerufen, wenn Schreibvorgänge in der Warteschlange stehen. Diese Teile werden dann weggeworfen. Wenn dieser Prozess asynchron ist, kann er ein Versprechen über Erfolg oder Misserfolg zurückgeben. Der Parameter reason enthält einen DOMString-Wert, der beschreibt, warum der Stream abgebrochen wurde.
const writableStream = new WritableStream({
  start(controller) {
    /* … */
  },

  write(chunk, controller) {
    /* … */
  },

  close(controller) {
    /* … */
  },

  abort(reason) {
    /* … */
  },
});

Die WritableStreamDefaultController-Schnittstelle der Streams API stellt einen Controller dar, mit dem während der Einrichtung, wenn mehr Blöcke zum Schreiben gesendet werden, oder am Ende des Schreibvorgangs der Status einer WritableStream gesteuert werden kann. Beim Erstellen einer WritableStream wird der zugrunde liegenden Senke eine entsprechende WritableStreamDefaultController-Instanz zur Bearbeitung zugewiesen. Für WritableStreamDefaultController gibt es nur eine Methode: WritableStreamDefaultController.error(). Diese führt bei allen zukünftigen Interaktionen mit dem verknüpften Stream zu Fehlern. WritableStreamDefaultController unterstützt auch ein signal-Attribut, das eine Instanz von AbortSignal zurückgibt, sodass ein WritableStream-Vorgang bei Bedarf beendet werden kann.

/* … */
write(chunk, controller) {
  try {
    // Try to do something dangerous with `chunk`.
  } catch (error) {
    controller.error(error.message);
  }
},
/* … */

queuingStrategy

Das zweite, ebenfalls optionale Argument des WritableStream()-Konstruktors ist queuingStrategy. Es ist ein Objekt, das optional eine Warteschlangenstrategie für den Stream definiert. Dafür sind zwei Parameter erforderlich:

  • highWaterMark: Eine positive Zahl, die die Hochwassermarke des Streams angibt, die auf diese Warteschlangenstrategie angewendet wird.
  • size(chunk): Eine Funktion, die die endliche, nicht negative Größe des gegebenen Chunk-Werts berechnet und zurückgibt. Mit dem Ergebnis wird der Rückdruck bestimmt, der sich über die entsprechende WritableStreamDefaultWriter.desiredSize-Eigenschaft zeigt.

Die Methoden getWriter() und write()

Zum Schreiben in einen beschreibbaren Stream benötigen Sie einen Writer. Dieser ist ein WritableStreamDefaultWriter. Die Methode getWriter() der WritableStream-Schnittstelle gibt eine neue Instanz von WritableStreamDefaultWriter zurück und sperrt den Stream auf diese Instanz. Solange der Stream gesperrt ist, kann kein anderer Autor übernommen werden, bis der aktuelle Stream freigegeben wird.

Die Methode write() der WritableStreamDefaultWriter-Schnittstelle schreibt einen übergebenen Datenblock in einen WritableStream und die zugrunde liegende Senke und gibt dann ein Versprechen zurück, das aufgelöst wird, um den Erfolg oder Misserfolg des Schreibvorgangs anzuzeigen. Was „Erfolg“ bedeutet, hängt von der zugrunde liegenden Senke ab. Es kann darauf hindeuten, dass der Chunk akzeptiert und nicht unbedingt an seinem endgültigen Ziel gespeichert ist.

const writer = writableStream.getWriter();
const resultPromise = writer.write('The first chunk!');

Das Attribut locked

Mit dem Attribut WritableStream.locked können Sie prüfen, ob ein beschreibbarer Stream gesperrt ist.

const locked = writableStream.locked;
console.log(`The stream is ${locked ? 'indeed' : 'not'} locked.`);

Codebeispiel für beschreibbaren Stream

Das folgende Codebeispiel zeigt alle Schritte in Aktion.

const writableStream = new WritableStream({
  start(controller) {
    console.log('[start]');
  },
  async write(chunk, controller) {
    console.log('[write]', chunk);
    // Wait for next write.
    await new Promise((resolve) => setTimeout(() => {
      document.body.textContent += chunk;
      resolve();
    }, 1_000));
  },
  close(controller) {
    console.log('[close]');
  },
  abort(reason) {
    console.log('[abort]', reason);
  },
});

const writer = writableStream.getWriter();
const start = Date.now();
for (const char of 'abcdefghijklmnopqrstuvwxyz') {
  // Wait to add to the write queue.
  await writer.ready;
  console.log('[ready]', Date.now() - start, 'ms');
  // The Promise is resolved after the write finishes.
  writer.write(char);
}
await writer.close();

Einen lesbaren Stream an einen beschreibbaren Stream senden

Ein lesbarer Stream kann über die Methode pipeTo() des lesbaren Streams an einen beschreibbaren Stream übergeben werden. ReadableStream.pipeTo() leitet den aktuellen ReadableStream an eine bestimmte WritableStream weiter und gibt ein Versprechen zurück, das erfüllt ist, wenn der Pipe-Prozess erfolgreich abgeschlossen wurde, oder lehnt den Vorgang ab, wenn Fehler aufgetreten sind.

const readableStream = new ReadableStream({
  start(controller) {
    // Called by constructor.
    console.log('[start readable]');
    controller.enqueue('a');
    controller.enqueue('b');
    controller.enqueue('c');
  },
  pull(controller) {
    // Called when controller's queue is empty.
    console.log('[pull]');
    controller.enqueue('d');
    controller.close();
  },
  cancel(reason) {
    // Called when the stream is canceled.
    console.log('[cancel]', reason);
  },
});

const writableStream = new WritableStream({
  start(controller) {
    // Called by constructor
    console.log('[start writable]');
  },
  async write(chunk, controller) {
    // Called upon writer.write()
    console.log('[write]', chunk);
    // Wait for next write.
    await new Promise((resolve) => setTimeout(() => {
      document.body.textContent += chunk;
      resolve();
    }, 1_000));
  },
  close(controller) {
    console.log('[close]');
  },
  abort(reason) {
    console.log('[abort]', reason);
  },
});

await readableStream.pipeTo(writableStream);
console.log('[finished]');

Transformationsstream erstellen

Die Schnittstelle TransformStream der Streams API stellt einen Satz transformierbarer Daten dar. Um einen Transformationsstream zu erstellen, rufen Sie seinen Konstruktor TransformStream() auf, der ein Transformationsstreamobjekt aus den angegebenen Handlern erstellt und zurückgibt. Der TransformStream()-Konstruktor akzeptiert als erstes Argument ein optionales JavaScript-Objekt, das transformer darstellt. Solche Objekte können eine der folgenden Methoden enthalten:

transformer

  • start(controller): Diese Methode wird sofort beim Erstellen des Objekts aufgerufen. In der Regel wird dies verwendet, um Präfix-Chunks mit controller.enqueue() in die Warteschlange zu stellen. Diese Blöcke werden von der lesbaren Seite gelesen, sind aber nicht von Schreibvorgängen auf der beschreibbaren Seite abhängig. Wenn dieser anfängliche Prozess asynchron ist, z. B. weil etwas Aufwand zum Abrufen der Präfix-Chunks erforderlich ist, kann die Funktion ein Promise zurückgeben, um Erfolg oder Misserfolg zu signalisieren. Ein abgelehntes Promise führt zu einem Fehler im Stream. Alle ausgelösten Ausnahmen werden vom TransformStream()-Konstruktor noch einmal ausgelöst.
  • transform(chunk, controller): Diese Methode wird aufgerufen, wenn ein neuer Block, der ursprünglich auf die beschreibbare Seite geschrieben wurde, zur Transformation bereit ist. Die Streamimplementierung sorgt dafür, dass diese Funktion erst aufgerufen wird, nachdem vorherige Transformationen erfolgreich waren, und nie, bevor start() oder nachdem flush() aufgerufen wurde. Diese Funktion führt die eigentliche Transformation des Transformationsstreams aus. Die Ergebnisse können mit controller.enqueue() in eine Warteschlange gestellt werden. Dadurch kann ein einzelner auf die beschreibbarer Seite geschriebener Block zu null oder mehreren Blöcken auf der lesbaren Seite führen, je nachdem, wie oft controller.enqueue() aufgerufen wird. Wenn der Transformationsprozess asynchron ist, kann diese Funktion ein Versprechen über den Erfolg oder Misserfolg der Transformation zurückgeben. Ein abgelehntes Promise gibt sowohl auf der lesbaren als auch auf der beschreibbaren Seite des Transformationsstreams Fehler. Wenn keine transform()-Methode angegeben ist, wird die Identitätstransformation verwendet. Dabei werden Blöcke, die von der beschreibbaren Seite bis zur lesbaren Seite unverändert bleiben, in die Warteschlange gestellt.
  • flush(controller): Diese Methode wird aufgerufen, nachdem alle auf die beschreibbaren Seite geschriebenen Blöcke transformiert wurden, indem transform() erfolgreich durchlaufen wurde und die beschreibbare Seite bald geschlossen wird. In der Regel wird dies verwendet, um Suffix-Chunks für die lesbare Seite in die Warteschlange zu stellen, bevor auch diese geschlossen wird. Wenn der Leerungsprozess asynchron ist, kann die Funktion ein Versprechen über Erfolg oder Misserfolg zurückgeben. Das Ergebnis wird dem Aufrufer von stream.writable.write() mitgeteilt. Darüber hinaus gibt ein abgelehntes Promise sowohl auf der lesbaren als auch auf der beschreibbaren Seite des Streams einen Fehler zurück. Das Auslösen einer Ausnahme wird so behandelt wie das Zurückgeben eines abgelehnten Promise.
const transformStream = new TransformStream({
  start(controller) {
    /* … */
  },

  transform(chunk, controller) {
    /* … */
  },

  flush(controller) {
    /* … */
  },
});

Die Warteschlangenstrategien writableStrategy und readableStrategy

Der zweite und dritte optionale Parameter des TransformStream()-Konstruktors sind optionale writableStrategy- und readableStrategy-Warteschlangenstrategien. Sie sind wie in den Streamabschnitten Lesen bzw. Beschreibbar beschrieben definiert.

Codebeispiel für Transform-Stream

Das folgende Codebeispiel zeigt einen einfachen Transformationsstream in Aktion.

// Note that `TextEncoderStream` and `TextDecoderStream` exist now.
// This example shows how you would have done it before.
const textEncoderStream = new TransformStream({
  transform(chunk, controller) {
    console.log('[transform]', chunk);
    controller.enqueue(new TextEncoder().encode(chunk));
  },
  flush(controller) {
    console.log('[flush]');
    controller.terminate();
  },
});

(async () => {
  const readStream = textEncoderStream.readable;
  const writeStream = textEncoderStream.writable;

  const writer = writeStream.getWriter();
  for (const char of 'abc') {
    writer.write(char);
  }
  writer.close();

  const reader = readStream.getReader();
  for (let result = await reader.read(); !result.done; result = await reader.read()) {
    console.log('[value]', result.value);
  }
})();

Lesbaren Stream über einen Transformationsstream senden

Die Methode pipeThrough() der ReadableStream-Schnittstelle bietet eine verkettbare Methode, um den aktuellen Stream über einen Transformationsstream oder ein anderes beschreibbares/lesbares Paar zu leiten. Durch das Piping eines Streams wird dieser in der Regel für die Dauer der Pipe gesperrt, sodass andere Leser ihn nicht sperren können.

const transformStream = new TransformStream({
  transform(chunk, controller) {
    console.log('[transform]', chunk);
    controller.enqueue(new TextEncoder().encode(chunk));
  },
  flush(controller) {
    console.log('[flush]');
    controller.terminate();
  },
});

const readableStream = new ReadableStream({
  start(controller) {
    // called by constructor
    console.log('[start]');
    controller.enqueue('a');
    controller.enqueue('b');
    controller.enqueue('c');
  },
  pull(controller) {
    // called read when controller's queue is empty
    console.log('[pull]');
    controller.enqueue('d');
    controller.close(); // or controller.error();
  },
  cancel(reason) {
    // called when rs.cancel(reason)
    console.log('[cancel]', reason);
  },
});

(async () => {
  const reader = readableStream.pipeThrough(transformStream).getReader();
  for (let result = await reader.read(); !result.done; result = await reader.read()) {
    console.log('[value]', result.value);
  }
})();

Das nächste (Bit-konstruierte) Codebeispiel zeigt, wie Sie eine „Shouting“-Version von fetch() implementieren können, in der der gesamte Text in Großbuchstaben geschrieben wird, indem Sie das zurückgegebene Antwortversprechen als Stream auslesen und Chunk für Block in Großbuchstaben schreiben. Der Vorteil dieses Ansatzes besteht darin, dass Sie nicht warten müssen, bis das gesamte Dokument heruntergeladen ist, was bei großen Dateien einen großen Unterschied ausmachen kann.

function upperCaseStream() {
  return new TransformStream({
    transform(chunk, controller) {
      controller.enqueue(chunk.toUpperCase());
    },
  });
}

function appendToDOMStream(el) {
  return new WritableStream({
    write(chunk) {
      el.append(chunk);
    }
  });
}

fetch('./lorem-ipsum.txt').then((response) =>
  response.body
    .pipeThrough(new TextDecoderStream())
    .pipeThrough(upperCaseStream())
    .pipeTo(appendToDOMStream(document.body))
);

Demo

Die folgende Demo zeigt lesbare, beschreibbare und transformierende Streams in Aktion. Außerdem finden Sie hier Beispiele für pipeThrough()- und pipeTo()-Pip-Ketten und veranschaulicht tee(). Optional können Sie die Demo in einem eigenen Fenster ausführen oder den Quellcode ansehen.

Nützliche Streams, die im Browser verfügbar sind

In den Browser sind eine Reihe nützlicher Streams integriert. Sie können ein ReadableStream ganz einfach aus einem Blob erstellen. Die Methode stream() der Blob-Schnittstelle gibt einen ReadableStream zurück, der beim Lesen die im Blob enthaltenen Daten zurückgibt. Denken Sie auch daran, dass ein File-Objekt eine bestimmte Art von Blob ist und in jedem Kontext verwendet werden kann, den es für ein Blob gibt.

const readableStream = new Blob(['hello world'], { type: 'text/plain' }).stream();

Die Streamingvarianten von TextDecoder.decode() und TextEncoder.encode() werden TextDecoderStream bzw. TextEncoderStream genannt.

const response = await fetch('https://streams.spec.whatwg.org/');
const decodedStream = response.body.pipeThrough(new TextDecoderStream());

Dateien lassen sich mit den Transformationsstreams CompressionStream und DecompressionStream ganz einfach komprimieren oder dekomprimieren. Das folgende Codebeispiel zeigt, wie Sie die Streams-Spezifikation herunterladen, sie direkt im Browser komprimieren (gzip) und die komprimierte Datei direkt auf das Laufwerk schreiben können.

const response = await fetch('https://streams.spec.whatwg.org/');
const readableStream = response.body;
const compressedStream = readableStream.pipeThrough(new CompressionStream('gzip'));

const fileHandle = await showSaveFilePicker();
const writableStream = await fileHandle.createWritable();
compressedStream.pipeTo(writableStream);

Die FileSystemWritableFileStream der File System Access API und die experimentellen fetch()-Anfragestreams sind Beispiele für beschreibbare Streams.

Die Serial API nutzt in hohem Maße sowohl lesbare als auch beschreibbare Streams.

// Prompt user to select any serial port.
const port = await navigator.serial.requestPort();
// Wait for the serial port to open.
await port.open({ baudRate: 9_600 });
const reader = port.readable.getReader();

// Listen to data coming from the serial device.
while (true) {
  const { value, done } = await reader.read();
  if (done) {
    // Allow the serial port to be closed later.
    reader.releaseLock();
    break;
  }
  // value is a Uint8Array.
  console.log(value);
}

// Write to the serial port.
const writer = port.writable.getWriter();
const data = new Uint8Array([104, 101, 108, 108, 111]); // hello
await writer.write(data);
// Allow the serial port to be closed later.
writer.releaseLock();

Die WebSocketStream API bindet Streams in die WebSocket API ein.

const wss = new WebSocketStream(WSS_URL);
const { readable, writable } = await wss.connection;
const reader = readable.getReader();
const writer = writable.getWriter();

while (true) {
  const { value, done } = await reader.read();
  if (done) {
    break;
  }
  const result = await process(value);
  await writer.write(result);
}

Nützliches Infomaterial

Danksagungen

Dieser Artikel wurde von Jake Archibald, François Beaufort, Sam Dutton, Mattias Buelens, Surma, Joe Medley und Adam Rice verfasst. Die Blogposts von Jake Archibald haben mir sehr dabei geholfen, Streams zu verstehen. Einige der Codebeispiele sind von den explorativen Datenanalysen des GitHub-Nutzers @bellbind inspiriert und Teile der Beschreibung stützen sich stark auf die MDN-Webdokumente in Streams. Die Autoren des Streams-Standards haben beim Schreiben dieser Spezifikation hervorragende Arbeit geleistet. Hero-Image von Ryan Lara auf Unsplash