Hier erfahren Sie, wie Sie mit der Streams API lesbare und beschreibbare Streams verwenden und transformieren können.
Mit der Streams API können Sie programmatisch auf Datenstreams zugreifen, die über das Netzwerk empfangen oder auf andere Weise lokal erstellt wurden, und sie mit JavaScript verarbeiten. Beim Streaming wird eine Ressource, die Sie empfangen, senden oder transformieren möchten, in kleine Blöcke unterteilt und diese dann Stück für Stück verarbeitet. Während Browser beim Empfang von Assets wie HTML oder Videos, die auf Webseiten angezeigt werden sollen, trotzdem etwas tun, war diese Funktion für JavaScript erst seit der Einführung von fetch
mit Streams im Jahr 2015 für JavaScript verfügbar.
Wenn Sie bisher eine Ressource (z. B. ein Video oder eine Textdatei usw.) verarbeiten wollten, mussten Sie die gesamte Datei herunterladen, warten, bis sie in ein geeignetes Format deserialisiert wurde, und sie dann verarbeiten. Mit Streams, die für JavaScript verfügbar sind, ändert sich das. Sie können nun Rohdaten mit JavaScript verarbeiten, sobald sie auf dem Client verfügbar sind, ohne einen Puffer, String oder Blob generieren zu müssen. Dadurch wird eine Reihe von Anwendungsfällen genutzt, von denen ich einige nachfolgend auflistet:
- Videoeffekte:Weiterleitung eines lesbaren Videostreams durch einen Umwandlungsstream, bei dem Effekte in Echtzeit angewendet werden.
- Datenkomprimierung (De-)Komprimierung:Pipeline eines Dateistreams durch einen Transformationsstream, der ihn selektiv (de)komprimiert.
- Bilddecodierung:Leiten eines HTTP-Antwortstreams durch einen Transformationsstream, der Bytes in Bitmapdaten decodiert, und anschließend über einen anderen Transformationsstream, der Bitmaps in PNGs übersetzt. Wenn dieser im
fetch
-Handler eines Service Workers installiert ist, können Sie neue Bildformate wie AVIF transparent mit Polyfill für neue Bildformate füllen.
Unterstützte Browser
ReadableStream und WritableStream
TransformStream
Wichtige Konzepte
Bevor ich näher auf die verschiedenen Arten von Streams eingehe, möchte ich zuerst einige grundlegende Konzepte vorstellen.
Stücke
Ein Chunk ist ein einzelnes Datenelement, das in einen Stream geschrieben oder aus diesem gelesen wird. Sie können einen beliebigen Typ haben. Streams können sogar Blöcke verschiedener Typen enthalten. In den meisten Fällen ist ein Block nicht die atomarste Dateneinheit für einen bestimmten Stream. Beispielsweise kann ein Bytestream Blöcke enthalten, die aus 16 KiB-Uint8Array
-Einheiten anstelle von einzelnen Byte bestehen.
Lesbare Streams
Ein lesbarer Stream stellt eine Datenquelle dar, aus der Sie lesen können. Mit anderen Worten: Die Daten kommen aus einem lesbaren Stream. Konkret ist ein lesbarer Stream eine Instanz der ReadableStream
-Klasse.
Beschreibbare Streams
Ein beschreibbarer Stream ist ein Ziel für Daten, in die Sie schreiben können. Mit anderen Worten, die Daten gehen in einen beschreibbaren Stream. Konkret ist ein beschreibbarer Stream eine Instanz der Klasse WritableStream
.
Streams transformieren
Ein Transformationsstream besteht aus einem Paar von Streams: einem beschreibbaren Stream, der als beschreibbare Seite bezeichnet wird, und einem lesbaren Stream, der als lesbare Seite bezeichnet wird.
Eine reale Metapher dafür wäre ein Simultandolmetscher, der spontan von einer Sprache in eine andere übersetzt.
Wenn Sie auf die beschreibbare Seite schreiben, werden für den Transformationsstream neue Daten für das Lesen von der lesbaren Seite zur Verfügung gestellt. Konkret kann jedes Objekt mit den Attributen writable
und readable
als Transformationsstream dienen. Mit der Standardklasse TransformStream
ist es jedoch einfacher, ein solches Paar zu erstellen, das richtig verkettet ist.
Rohrketten
Streams werden hauptsächlich über Weiterleitungen aneinander verwendet. Ein lesbarer Stream kann mithilfe der Methode pipeTo()
des lesbaren Streams direkt an einen beschreibbaren Stream übergeben oder mit der Methode pipeThrough()
des lesbaren Streams zuerst durch einen oder mehrere Transformationsstreams geleitet werden. Eine Gruppe von Streams, die auf diese Weise per Pipe verbunden werden, wird als Pipe-Kette bezeichnet.
Gegendruck
Sobald eine Pipe-Kette erstellt wurde, propagiert sie Signale darüber, wie schnell Blöcke durch sie fließen sollen. Wenn ein Schritt in der Kette noch keine Blöcke annehmen kann, breitet er ein Signal rückwärts durch die Pipe-Kette weiter, bis schließlich die ursprüngliche Quelle angewiesen wird, keine so schnellen Blöcke mehr zu erzeugen. Dieser Vorgang der Normalisierung des Flusses wird als Rückdruck bezeichnet.
Abschlag
Ein lesbarer Stream kann mithilfe der Methode tee()
abgerufen werden (nach der Form eines Großbuchstabens „T“).
Dadurch wird der Stream gesperrt, er ist also nicht mehr direkt nutzbar. Es werden jedoch zwei neue Streams, sogenannte Zweige, erstellt, die unabhängig voneinander verarbeitet werden können.
Teeing ist auch wichtig, da Streams nicht zurückgespult oder neu gestartet werden können. Mehr dazu später.
Die Mechanismen eines lesbaren Streams
Ein lesbarer Stream ist eine Datenquelle, die in JavaScript durch ein ReadableStream
-Objekt dargestellt wird, das aus einer zugrunde liegenden Quelle stammt. Der Konstruktor ReadableStream()
erstellt und gibt ein lesbares Streamobjekt von den angegebenen Handlern zurück. Es gibt zwei Arten zugrunde liegender Quelle:
- Push-Quellen senden ständig Daten an Sie, wenn Sie darauf zugegriffen haben. Es liegt an Ihnen, den Zugriff auf den Stream zu starten, zu pausieren oder abzubrechen. Beispiele hierfür sind Live-Videostreams, vom Server gesendete Ereignisse oder WebSockets.
- Bei Pull-Quellen müssen Sie Daten explizit anfordern, sobald eine Verbindung besteht. Beispiele hierfür sind HTTP-Vorgänge über
fetch()
- oderXMLHttpRequest
-Aufrufe.
Streamdaten werden sequenziell in kleinen Stücken gelesen, die als Chunks bezeichnet werden. Die in einem Stream platzierten Blöcke werden angeblich in die Warteschlange gestellt. Sie befinden sich also in einer Warteschlange, die zum Lesen bereit ist. Eine interne Warteschlange erfasst die noch nicht gelesenen Blöcke.
Eine Warteschlangenstrategie ist ein Objekt, das bestimmt, wie ein Stream basierend auf dem Status seiner internen Warteschlange einen Rückdruck signalisieren soll. Die Warteschlangenstrategie weist jedem Block eine Größe zu und vergleicht die Gesamtgröße aller Blöcke in der Warteschlange mit einer bestimmten Zahl, die auch als High Watermark bezeichnet wird.
Die Blöcke im Stream werden von einem Reader gelesen. Dieser Reader ruft die Daten Block für Stück ab, sodass Sie jede Art von Operation ausführen können, mit der Sie arbeiten möchten. Das Lesegerät und der andere zugehörige Verarbeitungscode werden als Nutzer bezeichnet.
Das nächste Konstrukt in diesem Kontext wird als Controller bezeichnet. Jedem lesbaren Stream ist ein Controller zugeordnet, mit dem Sie den Stream steuern können.
Es kann immer nur ein Leser einen Stream auf einmal lesen. Wenn ein Leser erstellt wird und mit dem Lesen eines Streams beginnt (also zu einem aktiven Leser wird), ist er an diesen gesperrt. Wenn Sie möchten, dass ein anderer Leser das Lesen Ihres Streams übernimmt, müssen Sie in der Regel den ersten Reader freigeben, bevor Sie etwas anderes tun können. Sie können aber auch Streams abschlagen.
Einen lesbaren Stream erstellen
Sie erstellen einen lesbaren Stream, indem Sie dessen Konstruktor ReadableStream()
aufrufen.
Der Konstruktor hat das optionale Argument underlyingSource
, das ein Objekt mit Methoden und Attributen darstellt, die definieren, wie sich die erstellte Streaminstanz verhält.
underlyingSource
Dabei können die folgenden optionalen, vom Entwickler definierten Methoden verwendet werden:
start(controller)
: Wird sofort bei der Erstellung des Objekts aufgerufen. Die Methode kann auf die Streamquelle zugreifen und andere Aktionen ausführen, die zum Einrichten der Streamfunktionalität erforderlich sind. Wenn dieser Prozess asynchron erfolgen soll, kann die Methode ein Versprechen über Erfolg oder Misserfolg zurückgeben. Der an diese Methode übergebene Parametercontroller
istReadableStreamDefaultController
.pull(controller)
: Kann verwendet werden, um den Stream zu steuern, wenn mehr Blöcke abgerufen werden. Sie wird wiederholt aufgerufen, solange die interne Warteschlange des Streams nicht voll ist, bis die Warteschlange ihren Höchstwert erreicht. Wenn das Ergebnis des Aufrufs vonpull()
ein Promise ist, wirdpull()
erst dann noch einmal aufgerufen, wenn das Versprechen erfüllt ist. Wenn das Promise abgelehnt wird, tritt ein Fehler im Stream auf.cancel(reason)
: Wird aufgerufen, wenn der Streamnutzer den Stream abbricht.
const readableStream = new ReadableStream({
start(controller) {
/* … */
},
pull(controller) {
/* … */
},
cancel(reason) {
/* … */
},
});
Das ReadableStreamDefaultController
unterstützt die folgenden Methoden:
ReadableStreamDefaultController.close()
schließt den zugehörigen Stream.- Mit
ReadableStreamDefaultController.enqueue()
wird ein bestimmter Block in den zugehörigen Stream der Warteschlange gestellt. ReadableStreamDefaultController.error()
führt dazu, dass bei allen zukünftigen Interaktionen mit dem zugehörigen Stream Fehler auftreten.
/* … */
start(controller) {
controller.enqueue('The first chunk!');
},
/* … */
queuingStrategy
Das zweite, ebenfalls optionale Argument des ReadableStream()
-Konstruktors ist queuingStrategy
.
Es ist ein Objekt, das optional eine Warteschlangenstrategie für den Stream definiert. Dafür sind zwei Parameter erforderlich:
highWaterMark
: Eine positive Zahl, die die Hochwassermarke des Streams angibt, die auf diese Warteschlangenstrategie angewendet wird.size(chunk)
: Eine Funktion, die die endliche, nicht negative Größe des gegebenen Chunk-Werts berechnet und zurückgibt. Mit dem Ergebnis wird der Rückdruck bestimmt, der sich über die entsprechendeReadableStreamDefaultController.desiredSize
-Eigenschaft zeigt. Außerdem legt sie fest, wann die Methodepull()
der zugrunde liegenden Quelle aufgerufen wird.
const readableStream = new ReadableStream({
/* … */
},
{
highWaterMark: 10,
size(chunk) {
return chunk.length;
},
},
);
Die Methoden getReader()
und read()
Zum Lesen aus einem lesbaren Stream benötigen Sie einen Reader, der ein ReadableStreamDefaultReader
ist.
Die Methode getReader()
der ReadableStream
-Schnittstelle erstellt einen Reader und sperrt den Stream darauf. Solange der Stream gesperrt ist, kann kein anderes Lesegerät erworben werden, bis dieses Lesegerät freigegeben wird.
Die Methode read()
der ReadableStreamDefaultReader
-Schnittstelle gibt ein Versprechen zurück, das Zugriff auf den nächsten Block in der internen Warteschlange des Streams bietet. Er erfüllt oder lehnt ihn mit einem Ergebnis ab, das vom Status des Streams abhängt. Es gibt folgende Möglichkeiten:
- Wenn ein Chunk verfügbar ist, wird das Versprechen mit einem Objekt der Form
{ value: chunk, done: false }
erfüllt. - Wenn der Stream geschlossen wird, wird das Versprechen mit einem Objekt des Formats
{ value: undefined, done: true }
erfüllt. - Wenn beim Stream ein Fehler auftritt, wird das Promise mit dem entsprechenden Fehler abgelehnt.
const reader = readableStream.getReader();
while (true) {
const { done, value } = await reader.read();
if (done) {
console.log('The stream is done.');
break;
}
console.log('Just read a chunk:', value);
}
Das Attribut locked
Sie können prüfen, ob ein lesbarer Stream gesperrt ist. Dazu verwenden Sie das Attribut ReadableStream.locked
.
const locked = readableStream.locked;
console.log(`The stream is ${locked ? 'indeed' : 'not'} locked.`);
Lesbare Codebeispiele für Streams
Das folgende Codebeispiel zeigt alle Schritte in Aktion. Erstellen Sie zuerst eine ReadableStream
, die in ihrem underlyingSource
-Argument (d. h. der Klasse TimestampSource
) eine start()
-Methode definiert.
Mit dieser Methode wird der controller
des Streams angewiesen, während zehn Sekunden jede Sekunde einen enqueue()
-Zeitstempel zu generieren.
Schließlich wird der Controller angewiesen, den Stream zu close()
. Sie können diesen Stream nutzen, indem Sie mit der Methode getReader()
einen Reader erstellen und read()
aufrufen, bis der Stream done
ist.
class TimestampSource {
#interval
start(controller) {
this.#interval = setInterval(() => {
const string = new Date().toLocaleTimeString();
// Add the string to the stream.
controller.enqueue(string);
console.log(`Enqueued ${string}`);
}, 1_000);
setTimeout(() => {
clearInterval(this.#interval);
// Close the stream after 10s.
controller.close();
}, 10_000);
}
cancel() {
// This is called if the reader cancels.
clearInterval(this.#interval);
}
}
const stream = new ReadableStream(new TimestampSource());
async function concatStringStream(stream) {
let result = '';
const reader = stream.getReader();
while (true) {
// The `read()` method returns a promise that
// resolves when a value has been received.
const { done, value } = await reader.read();
// Result objects contain two properties:
// `done` - `true` if the stream has already given you all its data.
// `value` - Some data. Always `undefined` when `done` is `true`.
if (done) return result;
result += value;
console.log(`Read ${result.length} characters so far`);
console.log(`Most recently read chunk: ${value}`);
}
}
concatStringStream(stream).then((result) => console.log('Stream complete', result));
Asynchrone Iteration
Die Prüfung der einzelnen read()
-Schleifendurchläufe, ob der Stream done
ist, ist möglicherweise nicht die bequemste API.
Glücklicherweise gibt es dazu bald eine bessere Methode: die asynchrone Iteration.
for await (const chunk of stream) {
console.log(chunk);
}
Eine Umgehung der heutigen Verwendung der asynchronen Iteration besteht in der Implementierung des Verhaltens mit einem Polyfill.
if (!ReadableStream.prototype[Symbol.asyncIterator]) {
ReadableStream.prototype[Symbol.asyncIterator] = async function* () {
const reader = this.getReader();
try {
while (true) {
const {done, value} = await reader.read();
if (done) {
return;
}
yield value;
}
}
finally {
reader.releaseLock();
}
}
}
Einen lesbaren Stream abschicken
Die Methode tee()
der ReadableStream
-Schnittstelle schneidet den aktuell lesbaren Stream ab und gibt ein Array mit zwei Elementen zurück, das die beiden resultierenden Zweige als neue ReadableStream
-Instanzen enthält. Dadurch können zwei Leser einen Stream gleichzeitig lesen. Sie können dies beispielsweise in einem Service Worker tun, wenn Sie eine Antwort vom Server abrufen und an den Browser streamen, aber auch in den Service Worker-Cache streamen möchten. Da ein Antworttext nicht mehr als einmal verarbeitet werden kann, benötigen Sie dazu zwei Kopien. Wenn Sie den Stream abbrechen möchten, müssen Sie dann beide resultierenden Zweige abbrechen. Durch das Teegen eines Streams wird dieser in der Regel für die Dauer gesperrt, sodass andere Leser ihn nicht sperren können.
const readableStream = new ReadableStream({
start(controller) {
// Called by constructor.
console.log('[start]');
controller.enqueue('a');
controller.enqueue('b');
controller.enqueue('c');
},
pull(controller) {
// Called `read()` when the controller's queue is empty.
console.log('[pull]');
controller.enqueue('d');
controller.close();
},
cancel(reason) {
// Called when the stream is canceled.
console.log('[cancel]', reason);
},
});
// Create two `ReadableStream`s.
const [streamA, streamB] = readableStream.tee();
// Read streamA iteratively one by one. Typically, you
// would not do it this way, but you certainly can.
const readerA = streamA.getReader();
console.log('[A]', await readerA.read()); //=> {value: "a", done: false}
console.log('[A]', await readerA.read()); //=> {value: "b", done: false}
console.log('[A]', await readerA.read()); //=> {value: "c", done: false}
console.log('[A]', await readerA.read()); //=> {value: "d", done: false}
console.log('[A]', await readerA.read()); //=> {value: undefined, done: true}
// Read streamB in a loop. This is the more common way
// to read data from the stream.
const readerB = streamB.getReader();
while (true) {
const result = await readerB.read();
if (result.done) break;
console.log('[B]', result);
}
Lesbare Bytestreams
Für Streams, die Byte darstellen, wird eine erweiterte Version des lesbaren Streams bereitgestellt, um Bytes effizient zu verarbeiten, insbesondere durch Minimieren von Kopien. Bytestreams ermöglichen den Erwerb eigener BYOB-Lesegeräte (Bring-your-own-buffer). Die Standardimplementierung kann einen Bereich verschiedener Ausgaben bereitstellen, wie z. B. Strings oder Array-Zwischenspeicher im Fall von WebSockets, während Bytestreams die Byteausgabe garantieren. Darüber hinaus bieten BYOB-Lesegeräte Stabilitätsvorteile. Dies liegt daran, dass beim Trennen eines Zwischenspeichers garantiert werden kann, dass nicht zweimal in denselben Zwischenspeicher geschrieben wird, wodurch Race-Bedingungen vermieden werden. BYOB-Leser können die Häufigkeit reduzieren, mit der der Browser die automatische Speicherbereinigung ausführen muss, da er Puffer wiederverwenden kann.
Lesbaren Bytestream erstellen
Sie können einen lesbaren Bytestream erstellen, indem Sie einen zusätzlichen type
-Parameter an den ReadableStream()
-Konstruktor übergeben.
new ReadableStream({ type: 'bytes' });
underlyingSource
Der zugrunde liegenden Quelle eines lesbaren Bytestreams wird eine ReadableByteStreamController
zur Bearbeitung zugewiesen. Die Methode ReadableByteStreamController.enqueue()
verwendet ein chunk
-Argument, dessen Wert ein ArrayBufferView
ist. Das Attribut ReadableByteStreamController.byobRequest
gibt die aktuelle BYOB-Pull-Anfrage oder null zurück, wenn keine vorhanden ist. Schließlich gibt das Attribut ReadableByteStreamController.desiredSize
die gewünschte Größe zurück, um die interne Warteschlange des kontrollierten Streams zu füllen.
queuingStrategy
Das zweite, ebenfalls optionale Argument des ReadableStream()
-Konstruktors ist queuingStrategy
.
Es ist ein Objekt, das optional eine Warteschlangenstrategie für den Stream definiert. Es enthält einen Parameter:
highWaterMark
: Eine positive Anzahl von Byte, die den Höchstwert des Streams angibt, der diese Warteschlangenstrategie verwendet. Damit lässt sich der Rückdruck über das entsprechendeReadableByteStreamController.desiredSize
-Attribut ermitteln. Außerdem legt sie fest, wann die Methodepull()
der zugrunde liegenden Quelle aufgerufen wird.
Die Methoden getReader()
und read()
Wenn Sie dann Zugriff auf ein ReadableStreamBYOBReader
erhalten, legen Sie den Parameter mode
entsprechend fest: ReadableStream.getReader({ mode: "byob" })
. Dies ermöglicht eine präzisere Kontrolle über die Zwischenspeicherzuweisung, um Kopien zu vermeiden. Zum Lesen aus dem Bytestream müssen Sie ReadableStreamBYOBReader.read(view)
aufrufen, wobei view
ein ArrayBufferView
ist.
Codebeispiel für lesbaren Bytestream
const reader = readableStream.getReader({ mode: "byob" });
let startingAB = new ArrayBuffer(1_024);
const buffer = await readInto(startingAB);
console.log("The first 1024 bytes, or less:", buffer);
async function readInto(buffer) {
let offset = 0;
while (offset < buffer.byteLength) {
const { value: view, done } =
await reader.read(new Uint8Array(buffer, offset, buffer.byteLength - offset));
buffer = view.buffer;
if (done) {
break;
}
offset += view.byteLength;
}
return buffer;
}
Die folgende Funktion gibt lesbare Bytestreams zurück, die ein effizientes Lesen ohne Kopie eines zufällig generierten Arrays ermöglichen. Anstatt eine vordefinierte Chunk-Größe von 1.024 zu verwenden,wird versucht, den vom Entwickler bereitgestellten Puffer zu füllen, um die volle Kontrolle zu ermöglichen.
const DEFAULT_CHUNK_SIZE = 1_024;
function makeReadableByteStream() {
return new ReadableStream({
type: 'bytes',
pull(controller) {
// Even when the consumer is using the default reader,
// the auto-allocation feature allocates a buffer and
// passes it to us via `byobRequest`.
const view = controller.byobRequest.view;
view = crypto.getRandomValues(view);
controller.byobRequest.respond(view.byteLength);
},
autoAllocateChunkSize: DEFAULT_CHUNK_SIZE,
});
}
Die Mechanismen eines beschreibbaren Streams
Ein beschreibbarer Stream ist ein Ziel, in das Sie Daten schreiben können. Dies wird in JavaScript durch ein WritableStream
-Objekt dargestellt. Dies dient als Abstraktion über einer zugrunde liegenden Senke – einer untergeordneten E/A-Senke, in die Rohdaten geschrieben werden.
Die Daten werden Chunk für Stück über einen Writer in den Stream geschrieben. Ein Chunk kann eine Vielzahl von Formen annehmen, genau wie die Chunks in einem Reader. Sie können jeden beliebigen Code verwenden, um die schreibbaren Blöcke zu erzeugen. Der Autor und der zugehörige Code werden als Producer bezeichnet.
Wenn ein Writer erstellt wird und mit dem Schreiben in einen Stream beginnt (ein aktiver Autor), gilt er als gesperrt. Es kann immer nur ein Autor gleichzeitig in einen beschreibbaren Stream schreiben. Wenn ein anderer Autor mit dem Schreiben in Ihren Stream beginnen soll, müssen Sie den Stream zuerst freigeben, bevor Sie einen anderen Autor anhängen können.
Eine interne Warteschlange erfasst die Blöcke, die in den Stream geschrieben, aber noch nicht von der zugrunde liegenden Senke verarbeitet wurden.
Eine Warteschlangenstrategie ist ein Objekt, das bestimmt, wie ein Stream basierend auf dem Status seiner internen Warteschlange einen Rückdruck signalisieren soll. Die Warteschlangenstrategie weist jedem Block eine Größe zu und vergleicht die Gesamtgröße aller Blöcke in der Warteschlange mit einer bestimmten Zahl, die auch als High Watermark bezeichnet wird.
Das letzte Konstrukt wird als Controller bezeichnet. Jedem beschreibbaren Stream ist ein Controller zugeordnet, mit dem Sie den Stream steuern und ihn beispielsweise abbrechen können.
Beschreibbaren Stream erstellen
Die WritableStream
-Schnittstelle der Streams API bietet eine Standardabstraktion zum Schreiben von Streamingdaten in ein Ziel, das als Senke bezeichnet wird. Dieses Objekt hat einen integrierten Rückdruck und eine Warteschlange. Sie erstellen einen beschreibbaren Stream, indem Sie dessen Konstruktor WritableStream()
aufrufen.
Der optionale Parameter underlyingSink
steht für ein Objekt mit Methoden und Attributen, die das Verhalten der erstellten Streaminstanz definieren.
underlyingSink
Der underlyingSink
kann die folgenden optionalen, vom Entwickler definierten Methoden enthalten. Der controller
-Parameter, der an einige der Methoden übergeben wird, ist WritableStreamDefaultController
.
start(controller)
: Diese Methode wird sofort beim Erstellen des Objekts aufgerufen. Der Inhalt dieser Methode sollte darauf abzielen, Zugriff auf die zugrunde liegende Senke zu erhalten. Wenn dieser Prozess asynchron ausgeführt werden soll, kann er ein Versprechen über Erfolg oder Misserfolg zurückgeben.write(chunk, controller)
: Diese Methode wird aufgerufen, wenn ein neuer Datenblock (im Parameterchunk
angegeben) für das Schreiben in die zugrunde liegende Senke bereit ist. Sie kann ein Versprechen über einen erfolgreichen oder fehlgeschlagenen Schreibvorgang zurückgeben. Diese Methode wird erst aufgerufen, nachdem vorherige Schreibvorgänge erfolgreich waren und nie, nachdem der Stream geschlossen oder abgebrochen wurde.close(controller)
: Diese Methode wird aufgerufen, wenn die Anwendung signalisiert, dass sie das Schreiben von Blöcken in den Stream abgeschlossen hat. Der Inhalt sollte alles tun, was erforderlich ist, um Schreibvorgänge in der zugrunde liegenden Senke abzuschließen und den Zugriff darauf freizugeben. Wenn dieser Prozess asynchron ist, kann er ein Versprechen über Erfolg oder Misserfolg zurückgeben. Diese Methode wird erst aufgerufen, wenn alle Schreibvorgänge in der Warteschlange erfolgreich waren.abort(reason)
: Diese Methode wird aufgerufen, wenn die Anwendung signalisiert, dass der Stream abrupt geschlossen und in einen fehlerhaften Zustand versetzt werden soll. Er kann alle zurückgehaltenen Ressourcen bereinigen, ähnlich wieclose()
, aberabort()
wird auch dann aufgerufen, wenn Schreibvorgänge in der Warteschlange stehen. Diese Teile werden dann weggeworfen. Wenn dieser Prozess asynchron ist, kann er ein Versprechen über Erfolg oder Misserfolg zurückgeben. Der Parameterreason
enthält einenDOMString
-Wert, der beschreibt, warum der Stream abgebrochen wurde.
const writableStream = new WritableStream({
start(controller) {
/* … */
},
write(chunk, controller) {
/* … */
},
close(controller) {
/* … */
},
abort(reason) {
/* … */
},
});
Die WritableStreamDefaultController
-Schnittstelle der Streams API stellt einen Controller dar, mit dem während der Einrichtung, wenn mehr Blöcke zum Schreiben gesendet werden, oder am Ende des Schreibvorgangs der Status einer WritableStream
gesteuert werden kann. Beim Erstellen einer WritableStream
wird der zugrunde liegenden Senke eine entsprechende WritableStreamDefaultController
-Instanz zur Bearbeitung zugewiesen. Für WritableStreamDefaultController
gibt es nur eine Methode: WritableStreamDefaultController.error()
. Diese führt bei allen zukünftigen Interaktionen mit dem verknüpften Stream zu Fehlern.
WritableStreamDefaultController
unterstützt auch ein signal
-Attribut, das eine Instanz von AbortSignal
zurückgibt, sodass ein WritableStream
-Vorgang bei Bedarf beendet werden kann.
/* … */
write(chunk, controller) {
try {
// Try to do something dangerous with `chunk`.
} catch (error) {
controller.error(error.message);
}
},
/* … */
queuingStrategy
Das zweite, ebenfalls optionale Argument des WritableStream()
-Konstruktors ist queuingStrategy
.
Es ist ein Objekt, das optional eine Warteschlangenstrategie für den Stream definiert. Dafür sind zwei Parameter erforderlich:
highWaterMark
: Eine positive Zahl, die die Hochwassermarke des Streams angibt, die auf diese Warteschlangenstrategie angewendet wird.size(chunk)
: Eine Funktion, die die endliche, nicht negative Größe des gegebenen Chunk-Werts berechnet und zurückgibt. Mit dem Ergebnis wird der Rückdruck bestimmt, der sich über die entsprechendeWritableStreamDefaultWriter.desiredSize
-Eigenschaft zeigt.
Die Methoden getWriter()
und write()
Zum Schreiben in einen beschreibbaren Stream benötigen Sie einen Writer. Dieser ist ein WritableStreamDefaultWriter
. Die Methode getWriter()
der WritableStream
-Schnittstelle gibt eine neue Instanz von WritableStreamDefaultWriter
zurück und sperrt den Stream auf diese Instanz. Solange der Stream gesperrt ist, kann kein anderer Autor übernommen werden, bis der aktuelle Stream freigegeben wird.
Die Methode write()
der WritableStreamDefaultWriter
-Schnittstelle schreibt einen übergebenen Datenblock in einen WritableStream
und die zugrunde liegende Senke und gibt dann ein Versprechen zurück, das aufgelöst wird, um den Erfolg oder Misserfolg des Schreibvorgangs anzuzeigen. Was „Erfolg“ bedeutet, hängt von der zugrunde liegenden Senke ab. Es kann darauf hindeuten, dass der Chunk akzeptiert und nicht unbedingt an seinem endgültigen Ziel gespeichert ist.
const writer = writableStream.getWriter();
const resultPromise = writer.write('The first chunk!');
Das Attribut locked
Mit dem Attribut WritableStream.locked
können Sie prüfen, ob ein beschreibbarer Stream gesperrt ist.
const locked = writableStream.locked;
console.log(`The stream is ${locked ? 'indeed' : 'not'} locked.`);
Codebeispiel für beschreibbaren Stream
Das folgende Codebeispiel zeigt alle Schritte in Aktion.
const writableStream = new WritableStream({
start(controller) {
console.log('[start]');
},
async write(chunk, controller) {
console.log('[write]', chunk);
// Wait for next write.
await new Promise((resolve) => setTimeout(() => {
document.body.textContent += chunk;
resolve();
}, 1_000));
},
close(controller) {
console.log('[close]');
},
abort(reason) {
console.log('[abort]', reason);
},
});
const writer = writableStream.getWriter();
const start = Date.now();
for (const char of 'abcdefghijklmnopqrstuvwxyz') {
// Wait to add to the write queue.
await writer.ready;
console.log('[ready]', Date.now() - start, 'ms');
// The Promise is resolved after the write finishes.
writer.write(char);
}
await writer.close();
Einen lesbaren Stream an einen beschreibbaren Stream senden
Ein lesbarer Stream kann über die Methode pipeTo()
des lesbaren Streams an einen beschreibbaren Stream übergeben werden.
ReadableStream.pipeTo()
leitet den aktuellen ReadableStream
an eine bestimmte WritableStream
weiter und gibt ein Versprechen zurück, das erfüllt ist, wenn der Pipe-Prozess erfolgreich abgeschlossen wurde, oder lehnt den Vorgang ab, wenn Fehler aufgetreten sind.
const readableStream = new ReadableStream({
start(controller) {
// Called by constructor.
console.log('[start readable]');
controller.enqueue('a');
controller.enqueue('b');
controller.enqueue('c');
},
pull(controller) {
// Called when controller's queue is empty.
console.log('[pull]');
controller.enqueue('d');
controller.close();
},
cancel(reason) {
// Called when the stream is canceled.
console.log('[cancel]', reason);
},
});
const writableStream = new WritableStream({
start(controller) {
// Called by constructor
console.log('[start writable]');
},
async write(chunk, controller) {
// Called upon writer.write()
console.log('[write]', chunk);
// Wait for next write.
await new Promise((resolve) => setTimeout(() => {
document.body.textContent += chunk;
resolve();
}, 1_000));
},
close(controller) {
console.log('[close]');
},
abort(reason) {
console.log('[abort]', reason);
},
});
await readableStream.pipeTo(writableStream);
console.log('[finished]');
Transformationsstream erstellen
Die Schnittstelle TransformStream
der Streams API stellt einen Satz transformierbarer Daten dar. Um einen Transformationsstream zu erstellen, rufen Sie seinen Konstruktor TransformStream()
auf, der ein Transformationsstreamobjekt aus den angegebenen Handlern erstellt und zurückgibt. Der TransformStream()
-Konstruktor akzeptiert als erstes Argument ein optionales JavaScript-Objekt, das transformer
darstellt. Solche Objekte können eine der folgenden Methoden enthalten:
transformer
start(controller)
: Diese Methode wird sofort beim Erstellen des Objekts aufgerufen. In der Regel wird dies verwendet, um Präfix-Chunks mitcontroller.enqueue()
in die Warteschlange zu stellen. Diese Blöcke werden von der lesbaren Seite gelesen, sind aber nicht von Schreibvorgängen auf der beschreibbaren Seite abhängig. Wenn dieser anfängliche Prozess asynchron ist, z. B. weil etwas Aufwand zum Abrufen der Präfix-Chunks erforderlich ist, kann die Funktion ein Promise zurückgeben, um Erfolg oder Misserfolg zu signalisieren. Ein abgelehntes Promise führt zu einem Fehler im Stream. Alle ausgelösten Ausnahmen werden vomTransformStream()
-Konstruktor noch einmal ausgelöst.transform(chunk, controller)
: Diese Methode wird aufgerufen, wenn ein neuer Block, der ursprünglich auf die beschreibbare Seite geschrieben wurde, zur Transformation bereit ist. Die Streamimplementierung sorgt dafür, dass diese Funktion erst aufgerufen wird, nachdem vorherige Transformationen erfolgreich waren, und nie, bevorstart()
oder nachdemflush()
aufgerufen wurde. Diese Funktion führt die eigentliche Transformation des Transformationsstreams aus. Die Ergebnisse können mitcontroller.enqueue()
in eine Warteschlange gestellt werden. Dadurch kann ein einzelner auf die beschreibbarer Seite geschriebener Block zu null oder mehreren Blöcken auf der lesbaren Seite führen, je nachdem, wie oftcontroller.enqueue()
aufgerufen wird. Wenn der Transformationsprozess asynchron ist, kann diese Funktion ein Versprechen über den Erfolg oder Misserfolg der Transformation zurückgeben. Ein abgelehntes Promise gibt sowohl auf der lesbaren als auch auf der beschreibbaren Seite des Transformationsstreams Fehler. Wenn keinetransform()
-Methode angegeben ist, wird die Identitätstransformation verwendet. Dabei werden Blöcke, die von der beschreibbaren Seite bis zur lesbaren Seite unverändert bleiben, in die Warteschlange gestellt.flush(controller)
: Diese Methode wird aufgerufen, nachdem alle auf die beschreibbaren Seite geschriebenen Blöcke transformiert wurden, indemtransform()
erfolgreich durchlaufen wurde und die beschreibbare Seite bald geschlossen wird. In der Regel wird dies verwendet, um Suffix-Chunks für die lesbare Seite in die Warteschlange zu stellen, bevor auch diese geschlossen wird. Wenn der Leerungsprozess asynchron ist, kann die Funktion ein Versprechen über Erfolg oder Misserfolg zurückgeben. Das Ergebnis wird dem Aufrufer vonstream.writable.write()
mitgeteilt. Darüber hinaus gibt ein abgelehntes Promise sowohl auf der lesbaren als auch auf der beschreibbaren Seite des Streams einen Fehler zurück. Das Auslösen einer Ausnahme wird so behandelt wie das Zurückgeben eines abgelehnten Promise.
const transformStream = new TransformStream({
start(controller) {
/* … */
},
transform(chunk, controller) {
/* … */
},
flush(controller) {
/* … */
},
});
Die Warteschlangenstrategien writableStrategy
und readableStrategy
Der zweite und dritte optionale Parameter des TransformStream()
-Konstruktors sind optionale writableStrategy
- und readableStrategy
-Warteschlangenstrategien. Sie sind wie in den Streamabschnitten Lesen bzw. Beschreibbar beschrieben definiert.
Codebeispiel für Transform-Stream
Das folgende Codebeispiel zeigt einen einfachen Transformationsstream in Aktion.
// Note that `TextEncoderStream` and `TextDecoderStream` exist now.
// This example shows how you would have done it before.
const textEncoderStream = new TransformStream({
transform(chunk, controller) {
console.log('[transform]', chunk);
controller.enqueue(new TextEncoder().encode(chunk));
},
flush(controller) {
console.log('[flush]');
controller.terminate();
},
});
(async () => {
const readStream = textEncoderStream.readable;
const writeStream = textEncoderStream.writable;
const writer = writeStream.getWriter();
for (const char of 'abc') {
writer.write(char);
}
writer.close();
const reader = readStream.getReader();
for (let result = await reader.read(); !result.done; result = await reader.read()) {
console.log('[value]', result.value);
}
})();
Lesbaren Stream über einen Transformationsstream senden
Die Methode pipeThrough()
der ReadableStream
-Schnittstelle bietet eine verkettbare Methode, um den aktuellen Stream über einen Transformationsstream oder ein anderes beschreibbares/lesbares Paar zu leiten. Durch das Piping eines Streams wird dieser in der Regel für die Dauer der Pipe gesperrt, sodass andere Leser ihn nicht sperren können.
const transformStream = new TransformStream({
transform(chunk, controller) {
console.log('[transform]', chunk);
controller.enqueue(new TextEncoder().encode(chunk));
},
flush(controller) {
console.log('[flush]');
controller.terminate();
},
});
const readableStream = new ReadableStream({
start(controller) {
// called by constructor
console.log('[start]');
controller.enqueue('a');
controller.enqueue('b');
controller.enqueue('c');
},
pull(controller) {
// called read when controller's queue is empty
console.log('[pull]');
controller.enqueue('d');
controller.close(); // or controller.error();
},
cancel(reason) {
// called when rs.cancel(reason)
console.log('[cancel]', reason);
},
});
(async () => {
const reader = readableStream.pipeThrough(transformStream).getReader();
for (let result = await reader.read(); !result.done; result = await reader.read()) {
console.log('[value]', result.value);
}
})();
Das nächste (Bit-konstruierte) Codebeispiel zeigt, wie Sie eine „Shouting“-Version von fetch()
implementieren können, in der der gesamte Text in Großbuchstaben geschrieben wird, indem Sie das zurückgegebene Antwortversprechen als Stream auslesen und Chunk für Block in Großbuchstaben schreiben. Der Vorteil dieses Ansatzes besteht darin, dass Sie nicht warten müssen, bis das gesamte Dokument heruntergeladen ist, was bei großen Dateien einen großen Unterschied ausmachen kann.
function upperCaseStream() {
return new TransformStream({
transform(chunk, controller) {
controller.enqueue(chunk.toUpperCase());
},
});
}
function appendToDOMStream(el) {
return new WritableStream({
write(chunk) {
el.append(chunk);
}
});
}
fetch('./lorem-ipsum.txt').then((response) =>
response.body
.pipeThrough(new TextDecoderStream())
.pipeThrough(upperCaseStream())
.pipeTo(appendToDOMStream(document.body))
);
Demo
Die folgende Demo zeigt lesbare, beschreibbare und transformierende Streams in Aktion. Außerdem finden Sie hier Beispiele für pipeThrough()
- und pipeTo()
-Pip-Ketten und veranschaulicht tee()
. Optional können Sie die Demo in einem eigenen Fenster ausführen oder den Quellcode ansehen.
Nützliche Streams, die im Browser verfügbar sind
In den Browser sind eine Reihe nützlicher Streams integriert. Sie können ein ReadableStream
ganz einfach aus einem Blob erstellen. Die Methode stream() der Blob
-Schnittstelle gibt einen ReadableStream
zurück, der beim Lesen die im Blob enthaltenen Daten zurückgibt. Denken Sie auch daran, dass ein File
-Objekt eine bestimmte Art von Blob
ist und in jedem Kontext verwendet werden kann, den es für ein Blob gibt.
const readableStream = new Blob(['hello world'], { type: 'text/plain' }).stream();
Die Streamingvarianten von TextDecoder.decode()
und TextEncoder.encode()
werden TextDecoderStream
bzw. TextEncoderStream
genannt.
const response = await fetch('https://streams.spec.whatwg.org/');
const decodedStream = response.body.pipeThrough(new TextDecoderStream());
Dateien lassen sich mit den Transformationsstreams CompressionStream
und DecompressionStream
ganz einfach komprimieren oder dekomprimieren. Das folgende Codebeispiel zeigt, wie Sie die Streams-Spezifikation herunterladen, sie direkt im Browser komprimieren (gzip) und die komprimierte Datei direkt auf das Laufwerk schreiben können.
const response = await fetch('https://streams.spec.whatwg.org/');
const readableStream = response.body;
const compressedStream = readableStream.pipeThrough(new CompressionStream('gzip'));
const fileHandle = await showSaveFilePicker();
const writableStream = await fileHandle.createWritable();
compressedStream.pipeTo(writableStream);
Die FileSystemWritableFileStream
der File System Access API und die experimentellen fetch()
-Anfragestreams sind Beispiele für beschreibbare Streams.
Die Serial API nutzt in hohem Maße sowohl lesbare als auch beschreibbare Streams.
// Prompt user to select any serial port.
const port = await navigator.serial.requestPort();
// Wait for the serial port to open.
await port.open({ baudRate: 9_600 });
const reader = port.readable.getReader();
// Listen to data coming from the serial device.
while (true) {
const { value, done } = await reader.read();
if (done) {
// Allow the serial port to be closed later.
reader.releaseLock();
break;
}
// value is a Uint8Array.
console.log(value);
}
// Write to the serial port.
const writer = port.writable.getWriter();
const data = new Uint8Array([104, 101, 108, 108, 111]); // hello
await writer.write(data);
// Allow the serial port to be closed later.
writer.releaseLock();
Die WebSocketStream
API bindet Streams in die WebSocket API ein.
const wss = new WebSocketStream(WSS_URL);
const { readable, writable } = await wss.connection;
const reader = readable.getReader();
const writer = writable.getWriter();
while (true) {
const { value, done } = await reader.read();
if (done) {
break;
}
const result = await process(value);
await writer.write(result);
}
Nützliches Infomaterial
- Streams-Spezifikation
- Begleitende Demos
- Streamt Polyfill
- 2016 – das Jahr der Webstreams
- Asynchrone Iterationen und Generatoren
- Stream Visualizer
Danksagungen
Dieser Artikel wurde von Jake Archibald, François Beaufort, Sam Dutton, Mattias Buelens, Surma, Joe Medley und Adam Rice verfasst. Die Blogposts von Jake Archibald haben mir sehr dabei geholfen, Streams zu verstehen. Einige der Codebeispiele sind von den explorativen Datenanalysen des GitHub-Nutzers @bellbind inspiriert und Teile der Beschreibung stützen sich stark auf die MDN-Webdokumente in Streams. Die Autoren des Streams-Standards haben beim Schreiben dieser Spezifikation hervorragende Arbeit geleistet. Hero-Image von Ryan Lara auf Unsplash