瞭解如何透過 Streams API 使用可讀取、可寫入及轉換串流。
Streams API 可讓您以程式輔助方式存取透過網路接收的資料串流,或以任何方式在本機建立的資料串流,並使用 JavaScript 處理資料串流。「串流」是指將您要接收、傳送或轉換為小型資源的資源進行細分,然後逐步處理這些區塊。雖然瀏覽器在接收要顯示在網頁上的 HTML 或影片等素材資源時,還是會像串流一樣運作,但這種功能在 2015 年推出串流 fetch
前就未曾讓 JavaScript 使用。
過去,如要處理某個種類的資源 (例如影片或文字檔等),您必須下載整個檔案,等待檔案還原為合適的格式,然後再進行處理。由於可以將串流提供給 JavaScript 而上述改變現在,您可以在用戶端有可用的 JavaScript 時,逐步使用 JavaScript 處理原始資料,而不必產生緩衝區、字串或 blob。我可以解鎖多種用途,以下列出其中幾項:
- 影片效果:透過可即時套用效果的轉換串流,傳送可讀取的影片串流。
- 資料 (解壓縮):透過選擇性 (解壓縮) 轉換串流傳輸檔案串流。
- 圖片解碼:透過轉換串流傳輸 HTTP 回應串流,該串流會將位元組解碼為點陣圖資料,然後再透過其他轉換串流將點陣圖轉譯為 PNG。如果該元件安裝在 Service Worker 的
fetch
處理常式中,您就能以透明化的方式聚填 AVIF 等新圖片格式。
瀏覽器支援
ReadableStream 和 WritableStream
TransformStream
核心概念
在詳細說明各種訊息串類型前,先來介紹一些核心概念。
塊狀
區塊是指從串流寫入或讀取的單一資料片段。串流可以是任何類型,甚至可以包含不同類型的區塊。大多數情況下,區塊不是特定串流最不可分割的資料單位。例如,位元組串流可能包含由 16 KiB Uint8Array
單位 (而非單位元組) 的區塊。
可讀取串流
可讀取的串流代表您可以讀取的資料來源。換句話說,資料會從可讀取的串流「傳出」。具體而言,可讀取的串流是 ReadableStream
類別的執行個體。
可寫入串流
可寫入的串流代表可寫入的資料目的地。換句話說,資料會「加入」可寫入的串流。具體而言,可寫入串流是 WritableStream
類別的執行個體。
轉換串流
轉換串流是由「兩組串流」組成:一個可寫入的串流 (稱為其可寫入側),以及可讀取的串流 (稱為可讀取的側邊)。在現實生活中比喻是同時翻譯,能即時將某種語言翻譯成另一種語言。以轉換串流特有的方式寫入可寫入的側邊,新資料就可供從可讀取的端讀取。簡言之,任何具有 writable
屬性和 readable
屬性的物件都能做為轉換串流。不過,標準 TransformStream
類別可讓您更輕鬆地建立這類正確傾斜的組合。
管線鏈
訊息串主要用於「管道」。您可以使用可讀取串流的 pipeTo()
方法,將可讀取的串流直接傳輸至可寫入的串流,也可以使用可讀取串流的 pipeThrough()
方法,先透過一或多個轉換串流傳輸該串流。以這種方式連接的一組串流稱為管道鏈。
背壓
管道鏈建構完成後,就會傳播有關區塊通過速度的信號。如果鏈結中的某個步驟目前無法接受區塊,則該程序會透過管道鏈向後傳播信號,直到最終收到原始來源的通知,會停止迅速產生區塊。這個正規化資料流的程序稱為背壓。
開球
可讀取的串流可以使用其 tee()
方法對齊 (以大寫「T」形狀命名)。這會「鎖定」串流,使串流無法再直接使用;但系統會建立兩個新的串流,稱為分支,可以獨立使用。Teat 設定也很重要,因為串流無法倒轉或重新開始,稍後將進一步說明。
可讀取串流的機制
可讀取的串流是透過從基礎來源傳輸的 ReadableStream
物件以 JavaScript 表示的資料來源。ReadableStream()
建構函式會從指定的處理常式建立及傳回可讀取的串流物件。基礎來源有兩種:
- 「推送來源」會在您存取資料後持續推送資料,您隨時可以開始、暫停或取消存取串流。例如直播影片串流、伺服器傳送事件或 WebSocket。
- 使用提取來源後,您必須明確要求從來源取得資料。例如透過
fetch()
或XMLHttpRequest
呼叫提供 HTTP 作業。
串流資料會在名為「區塊」的小片段中依序讀取。系統會將串流中的區塊顯示為「排入佇列」。也就是說,這些指令已在佇列中等候可供讀取。內部佇列會追蹤尚未讀取的區塊。
「佇列策略」是一種物件,可決定串流應如何根據內部佇列的狀態傳送背壓信號。佇列策略會為每個區塊指派大小,並將佇列中所有區塊的總大小與指定的數字進行比較,稱為「高水標」。
「讀取器」會讀取串流中的區塊。這個讀取器一次擷取資料區塊後,方便您執行想對這類資料的任何操作。讀取器以及其他與其一起處理的程式碼,亦稱為「消費者」。
在此情況下,下一個建構項目稱為「控制器」。顧名思義,每個可讀取的串流都有相關聯的控制器,方便您控管串流。
一次只有一個讀取器可以讀取串流。當讀取器建立並開始讀取串流 (也就是成為「有效讀取器」) 後,該串流就會「鎖定」。如要讓其他讀取器接管串流,通常需要先放開第一個讀取器,才能執行任何動作 (但您可以 tee 串流)。
建立可讀取的串流
您可以呼叫其建構函式 ReadableStream()
,建立可讀取的串流。建構函式含有選用的引數 underlyingSource
,代表包含方法和屬性的物件,用於定義建構的串流執行個體行為方式。
underlyingSource
您可以使用下列由開發人員定義的選用方法:
start(controller)
:建構物件時立即呼叫。這個方法可以存取串流來源,以及設定串流功能所需的任何其他作業。如果這項程序以非同步方式完成,這個方法可能會傳回承諾,表示成功或失敗。傳遞至這個方法的controller
參數為ReadableStreamDefaultController
。pull(controller)
:隨著擷取到的區塊更多,可用於控制串流。只要串流內部區塊的佇列並未已滿,直到佇列達到高水標前,系統就會重複呼叫此方法。如果呼叫pull()
的結果是承諾,則只有在上述 promise 符合要求時,才會再次呼叫pull()
。如果承諾遭拒,串流就會發生錯誤。cancel(reason)
:在串流消費者取消串流時呼叫。
const readableStream = new ReadableStream({
start(controller) {
/* … */
},
pull(controller) {
/* … */
},
cancel(reason) {
/* … */
},
});
ReadableStreamDefaultController
支援下列方法:
ReadableStreamDefaultController.close()
會關閉相關聯的串流。ReadableStreamDefaultController.enqueue()
會將相關聯的串流中的特定區塊排入佇列。ReadableStreamDefaultController.error()
會導致日後與相關聯串流的任何互動發生錯誤。
/* … */
start(controller) {
controller.enqueue('The first chunk!');
},
/* … */
queuingStrategy
第二項選用屬性,ReadableStream()
建構函式的引數則為 queuingStrategy
。此物件可視需要定義串流的佇列策略,該策略會採用兩個參數:
highWaterMark
:以非負數表示使用這項佇列策略的串流高水標誌。size(chunk)
:這個函式會計算並傳回指定區塊值的有限非負數大小。該結果會用來判斷背壓,並透過適當的ReadableStreamDefaultController.desiredSize
屬性資訊清單。也會控管呼叫基礎來源pull()
方法的時機。
const readableStream = new ReadableStream({
/* … */
},
{
highWaterMark: 10,
size(chunk) {
return chunk.length;
},
},
);
getReader()
和 read()
方法
如要讀取可讀取的串流,您需要讀取器,格式為 ReadableStreamDefaultReader
。ReadableStream
介面的 getReader()
方法會建立讀取器,並將串流鎖定。串流鎖定後,必須等到這部串流釋出後,才能取得其他讀者。
ReadableStreamDefaultReader
介面的 read()
方法會傳回承諾,可提供串流內部佇列中下一個區塊的存取權。這會根據串流狀態來執行或拒絕結果。不同的可能如下:
- 如果有區塊,則承諾項目會以下列格式的物件完成:
{ value: chunk, done: false }
。 - 如果串流關閉,承諾項目會以下列形式的物件執行:
{ value: undefined, done: true }
。 - 如果串流發生錯誤,承諾會遭到拒絕,並附上相關錯誤。
const reader = readableStream.getReader();
while (true) {
const { done, value } = await reader.read();
if (done) {
console.log('The stream is done.');
break;
}
console.log('Just read a chunk:', value);
}
locked
屬性
存取其 ReadableStream.locked
屬性,即可檢查可讀取串流是否已鎖定。
const locked = readableStream.locked;
console.log(`The stream is ${locked ? 'indeed' : 'not'} locked.`);
可讀取串流程式碼範例
以下程式碼範例顯示所有操作步驟。您必須先建立 ReadableStream
,並在其 underlyingSource
引數 (即 TimestampSource
類別) 中定義 start()
方法。這個方法會在十秒期間,每秒通知串流的 controller
到 enqueue()
時間戳記。最後,它會通知控制器 close()
串流。如要使用這個串流,請透過 getReader()
方法建立讀取器,並呼叫 read()
,直到串流變成 done
為止。
class TimestampSource {
#interval
start(controller) {
this.#interval = setInterval(() => {
const string = new Date().toLocaleTimeString();
// Add the string to the stream.
controller.enqueue(string);
console.log(`Enqueued ${string}`);
}, 1_000);
setTimeout(() => {
clearInterval(this.#interval);
// Close the stream after 10s.
controller.close();
}, 10_000);
}
cancel() {
// This is called if the reader cancels.
clearInterval(this.#interval);
}
}
const stream = new ReadableStream(new TimestampSource());
async function concatStringStream(stream) {
let result = '';
const reader = stream.getReader();
while (true) {
// The `read()` method returns a promise that
// resolves when a value has been received.
const { done, value } = await reader.read();
// Result objects contain two properties:
// `done` - `true` if the stream has already given you all its data.
// `value` - Some data. Always `undefined` when `done` is `true`.
if (done) return result;
result += value;
console.log(`Read ${result.length} characters so far`);
console.log(`Most recently read chunk: ${value}`);
}
}
concatStringStream(stream).then((result) => console.log('Stream complete', result));
非同步疊代
檢查每個 read()
迴圈疊代時,如果串流為 done
並非最便利的 API,幸好,我們很快就會提供更好的方法:非同步疊代。
for await (const chunk of stream) {
console.log(chunk);
}
目前使用非同步疊代的解決方法,是使用 polyfill 實作行為。
if (!ReadableStream.prototype[Symbol.asyncIterator]) {
ReadableStream.prototype[Symbol.asyncIterator] = async function* () {
const reader = this.getReader();
try {
while (true) {
const {done, value} = await reader.read();
if (done) {
return;
}
yield value;
}
}
finally {
reader.releaseLock();
}
}
}
鎖定可讀取的串流
ReadableStream
介面的 tee()
方法會中斷目前可讀取的串流,並傳回兩個元素陣列,其中包含兩個產生的分支,做為新的 ReadableStream
執行個體。這可讓兩位讀者同時讀取串流。例如,您可以在 Service Worker 中進行這項操作,例如從伺服器擷取回應,並將回應串流至瀏覽器,同時將回應串流至 Service Worker 快取。因為回應主體不能超過一次使用,所以您需要兩個副本才能完成這項工作。如要取消串流,必須同時取消兩個產生的分支版本。選擇串流直播時,通常會先鎖定串流,避免其他讀者鎖定。
const readableStream = new ReadableStream({
start(controller) {
// Called by constructor.
console.log('[start]');
controller.enqueue('a');
controller.enqueue('b');
controller.enqueue('c');
},
pull(controller) {
// Called `read()` when the controller's queue is empty.
console.log('[pull]');
controller.enqueue('d');
controller.close();
},
cancel(reason) {
// Called when the stream is canceled.
console.log('[cancel]', reason);
},
});
// Create two `ReadableStream`s.
const [streamA, streamB] = readableStream.tee();
// Read streamA iteratively one by one. Typically, you
// would not do it this way, but you certainly can.
const readerA = streamA.getReader();
console.log('[A]', await readerA.read()); //=> {value: "a", done: false}
console.log('[A]', await readerA.read()); //=> {value: "b", done: false}
console.log('[A]', await readerA.read()); //=> {value: "c", done: false}
console.log('[A]', await readerA.read()); //=> {value: "d", done: false}
console.log('[A]', await readerA.read()); //=> {value: undefined, done: true}
// Read streamB in a loop. This is the more common way
// to read data from the stream.
const readerB = streamB.getReader();
while (true) {
const result = await readerB.read();
if (result.done) break;
console.log('[B]', result);
}
可讀取的位元組串流
針對代表位元組的串流,系統會提供可讀取串流的擴充版本,以有效處理位元組,特別是透過最小化副本處理。位元串流可讓取得自己的緩衝區 (BYOB) 讀取器。預設的實作方式可以提供一系列不同的輸出內容 (例如使用 WebSocket 時,字串或陣列緩衝區),而位元組串流則能保證位元組輸出。此外,BYOB 讀取者也有穩定性方面的優勢。這是因為如果緩衝區卸離,就能確保一個緩衝區不會寫入相同緩衝區兩次,進而避免競爭狀況。BYOB 讀取器可以減少瀏覽器執行垃圾收集所需的次數,因為 BYOB 讀取器可以重複使用緩衝區。
建立可讀取的位元組串流
將額外的 type
參數傳遞至 ReadableStream()
建構函式,即可建立可讀取的位元組串流。
new ReadableStream({ type: 'bytes' });
underlyingSource
為可讀取的位元組串流的基礎來源會給予 ReadableByteStreamController
來執行操作。其 ReadableByteStreamController.enqueue()
方法使用值為 ArrayBufferView
的 chunk
引數。屬性 ReadableByteStreamController.byobRequest
會傳回目前的 BYOB 提取要求;如果沒有,屬性則傳回空值。最後,ReadableByteStreamController.desiredSize
屬性會傳回所需的大小,以填滿控制串流的內部佇列。
queuingStrategy
第二項選用屬性,ReadableStream()
建構函式的引數則為 queuingStrategy
。這個物件可視需要定義串流的佇列策略,該策略會採用一個參數:
highWaterMark
:非負數的位元組,表示使用這項佇列策略的串流高水標誌。這會用來判斷背壓,並透過適當的ReadableByteStreamController.desiredSize
屬性資訊清單。也會控管呼叫基礎來源pull()
方法的時機。
getReader()
和 read()
方法
接著,您就可以透過設定 mode
參數來存取 ReadableStreamBYOBReader
:
ReadableStream.getReader({ mode: "byob" })
。這樣一來,您就能更精準地控制緩衝區分配,藉此避免複製作業。如要讀取位元組串流,您必須呼叫 ReadableStreamBYOBReader.read(view)
,其中 view
為 ArrayBufferView
。
可讀取位元組串流程式碼範例
const reader = readableStream.getReader({ mode: "byob" });
let startingAB = new ArrayBuffer(1_024);
const buffer = await readInto(startingAB);
console.log("The first 1024 bytes, or less:", buffer);
async function readInto(buffer) {
let offset = 0;
while (offset < buffer.byteLength) {
const { value: view, done } =
await reader.read(new Uint8Array(buffer, offset, buffer.byteLength - offset));
buffer = view.buffer;
if (done) {
break;
}
offset += view.byteLength;
}
return buffer;
}
下列函式會傳回可讀取的位元組串流,允許有效率地讀取隨機產生的陣列的零複製作業。系統不會使用 1,024 的預先決定區塊大小,而是嘗試填滿開發人員提供的緩衝區,以獲得完整控制。
const DEFAULT_CHUNK_SIZE = 1_024;
function makeReadableByteStream() {
return new ReadableStream({
type: 'bytes',
pull(controller) {
// Even when the consumer is using the default reader,
// the auto-allocation feature allocates a buffer and
// passes it to us via `byobRequest`.
const view = controller.byobRequest.view;
view = crypto.getRandomValues(view);
controller.byobRequest.respond(view.byteLength);
},
autoAllocateChunkSize: DEFAULT_CHUNK_SIZE,
});
}
可寫入串流的機制
可寫入串流是您可以寫入資料的目的地,以 JavaScript 代表 WritableStream
物件。這可做為基礎接收器頂端的抽象層,也就是寫入原始資料的較低層級 I/O 接收器。
資料會透過寫入器寫入串流,一次一個區塊。區塊可能有多種形式,就像讀取器中的區塊一樣。您可以使用任何想要的程式碼產生可供寫入的區塊;寫入者加上相關程式碼稱為「生產端」。
建立寫入者並開始寫入串流 (「進行中的寫入者」) 時,代表該寫入作業會「鎖定」。一次只有一位寫入者可以在可寫入的串流中寫入資料。如要讓其他寫入者開始寫入串流,通常需要先釋放串流,然後再將其他寫入者附加至串流。
內部佇列可追蹤已寫入串流,但尚未由基礎接收器處理的區塊。
「佇列策略」是一種物件,可決定串流應如何根據內部佇列的狀態傳送背壓信號。佇列策略會為每個區塊指派大小,並將佇列中所有區塊的總大小與指定的數字進行比較,稱為「高水標」。
最後一個建構項目稱為「控制器」。每個可寫入串流都有相關聯的控制器,可讓您控制串流 (例如取消操作)。
建立可寫入的串流
Streams API 的 WritableStream
介面提供將串流資料寫入目的地 (稱為接收器) 的標準抽象化機制。這個物件具有內建背壓與佇列。只要呼叫其建構函式 WritableStream()
,即可建立可寫入的串流。這個元素包含選用的 underlyingSink
參數,代表包含方法和屬性的物件,可用於定義建構的串流執行個體行為。
underlyingSink
underlyingSink
可包含下列由開發人員定義的選用方法。傳遞至部分方法的 controller
參數是 WritableStreamDefaultController
。
start(controller)
:建構物件後,會立即呼叫這個方法。此方法的內容應旨在取得基礎接收器的存取權。如果這項程序以非同步方式完成,系統可能會傳回承諾,表示成功或失敗。write(chunk, controller)
:當新的資料區塊 (在chunk
參數中指定) 準備好寫入基礎接收器時,系統就會呼叫這個方法。它會傳回承諾,指出寫入作業成功或失敗。系統只會在先前的寫入成功後呼叫這個方法,且不會在串流關閉或取消後呼叫此方法。close(controller)
:如果應用程式表示應用程式已將區塊寫入串流,系統就會呼叫這個方法。內容應執行任何必要工作,以完成對基礎接收器的寫入作業,並釋出對該接收器的存取權。如果這項程序為非同步性質,系統可能會傳回承諾,表示成功或失敗。只有在所有排入佇列的寫入作業都成功後,系統才會呼叫這個方法。abort(reason)
:如果應用程式表示希望突然關閉串流,並設為錯誤狀態,系統就會呼叫這個方法。它可以清理任何保留的資源 (與close()
類似),但即使寫入作業排入佇列,系統仍會呼叫abort()
。這些區塊會丟掉。如果此程序為非同步,可能會傳回承諾,表示成功或失敗。reason
參數包含DOMString
,說明串流取消的原因。
const writableStream = new WritableStream({
start(controller) {
/* … */
},
write(chunk, controller) {
/* … */
},
close(controller) {
/* … */
},
abort(reason) {
/* … */
},
});
Streams API 的 WritableStreamDefaultController
介面代表一種控制器,可在設定期間控制 WritableStream
的狀態,因為在寫入期間或寫入結束時會提交更多區塊。建構 WritableStream
時,系統會為基礎接收器指派對應的 WritableStreamDefaultController
執行個體來操控。WritableStreamDefaultController
只有一個方法:WritableStreamDefaultController.error()
,會導致日後與相關聯串流的任何互動發生錯誤。WritableStreamDefaultController
也支援 signal
屬性,該屬性會傳回 AbortSignal
的執行個體,讓您可以視需要停止 WritableStream
作業。
/* … */
write(chunk, controller) {
try {
// Try to do something dangerous with `chunk`.
} catch (error) {
controller.error(error.message);
}
},
/* … */
queuingStrategy
第二項選用屬性,WritableStream()
建構函式的引數則為 queuingStrategy
。此物件可視需要定義串流的佇列策略,該策略會採用兩個參數:
highWaterMark
:以非負數表示使用這項佇列策略的串流高水標誌。size(chunk)
:這個函式會計算並傳回指定區塊值的有限非負數大小。該結果會用來判斷背壓,並透過適當的WritableStreamDefaultWriter.desiredSize
屬性資訊清單。
getWriter()
和 write()
方法
如要寫入可寫入的串流,您需要寫入者,這會是 WritableStreamDefaultWriter
。WritableStream
介面的 getWriter()
方法會傳回 WritableStreamDefaultWriter
的新例項,並將串流鎖定至該執行個體。在串流遭到鎖定的情況下,必須等到目前串流釋出,才能取得其他寫入者。
WritableStreamDefaultWriter
介面的 write()
方法會將傳遞的資料區塊寫入 WritableStream
及其基礎接收器,然後傳回承諾解析,表示寫入作業成功或失敗。請注意,「成功」代表到基礎接收器;可能表示區塊已接受,但不一定能安全儲存至最終目的地。
const writer = writableStream.getWriter();
const resultPromise = writer.write('The first chunk!');
locked
屬性
您可以存取其 WritableStream.locked
屬性,檢查可寫入的串流是否已鎖定。
const locked = writableStream.locked;
console.log(`The stream is ${locked ? 'indeed' : 'not'} locked.`);
可寫入串流程式碼範例
以下程式碼範例呈現了所有的實際操作步驟。
const writableStream = new WritableStream({
start(controller) {
console.log('[start]');
},
async write(chunk, controller) {
console.log('[write]', chunk);
// Wait for next write.
await new Promise((resolve) => setTimeout(() => {
document.body.textContent += chunk;
resolve();
}, 1_000));
},
close(controller) {
console.log('[close]');
},
abort(reason) {
console.log('[abort]', reason);
},
});
const writer = writableStream.getWriter();
const start = Date.now();
for (const char of 'abcdefghijklmnopqrstuvwxyz') {
// Wait to add to the write queue.
await writer.ready;
console.log('[ready]', Date.now() - start, 'ms');
// The Promise is resolved after the write finishes.
writer.write(char);
}
await writer.close();
將可讀取的串流移至可寫入的串流
您可透過可讀取串流的 pipeTo()
方法,將可讀取的串流傳輸至可寫入的串流。ReadableStream.pipeTo()
會將目前的 ReadableStream
傳遞給指定的 WritableStream
,並傳回能在管道程序成功完成時顯示的承諾,或拒絕任何錯誤。
const readableStream = new ReadableStream({
start(controller) {
// Called by constructor.
console.log('[start readable]');
controller.enqueue('a');
controller.enqueue('b');
controller.enqueue('c');
},
pull(controller) {
// Called when controller's queue is empty.
console.log('[pull]');
controller.enqueue('d');
controller.close();
},
cancel(reason) {
// Called when the stream is canceled.
console.log('[cancel]', reason);
},
});
const writableStream = new WritableStream({
start(controller) {
// Called by constructor
console.log('[start writable]');
},
async write(chunk, controller) {
// Called upon writer.write()
console.log('[write]', chunk);
// Wait for next write.
await new Promise((resolve) => setTimeout(() => {
document.body.textContent += chunk;
resolve();
}, 1_000));
},
close(controller) {
console.log('[close]');
},
abort(reason) {
console.log('[abort]', reason);
},
});
await readableStream.pipeTo(writableStream);
console.log('[finished]');
建立轉換串流
Streams API 的 TransformStream
介面代表一組可轉換的資料。您可以呼叫其建構函式 TransformStream()
來建立轉換串流,從指定的處理常式建立並傳回轉換串流物件。TransformStream()
建構函式接受其第一個引數,也就是代表 transformer
的選用 JavaScript 物件。這類物件可包含以下任何方法:
transformer
start(controller)
:建構物件後,會立即呼叫這個方法。一般而言,這會使用controller.enqueue()
將前置字串區塊加入佇列。這些區塊會從可讀取的端讀取,但並不仰賴寫入可寫入端的任何資料。如果這個初始程序是非同步的,例如因為取得前置字串區塊需要花費一些心力,函式可以傳回承諾,指出成功或失敗;遭拒的承諾將導致串流錯誤。任何擲回的例外狀況都會由TransformStream()
建構函式重新擲回。transform(chunk, controller)
:如果新區塊原本寫入可寫入端,而可供轉換,系統就會呼叫這個方法。串流實作會保證,只有在先前的轉換成功後,且在start()
完成或呼叫flush()
之後,才會呼叫此函式。這個函式會執行轉換串流的實際轉換工作。可以使用controller.enqueue()
將結果排入佇列。這樣就能確保寫入可寫入端的單一區塊,依據呼叫controller.enqueue()
的次數,在可讀取端產生零或多個區塊。如果轉換作業是非同步進行的,這個函式可以傳回承諾,表示轉換成功或失敗。遭拒的承諾會同時發生錯誤轉換串流可讀取和可寫入的那面。如未提供transform()
方法,則會使用身分轉換功能,將未變更的區塊排入可寫入端的區塊。flush(controller)
:所有寫入可寫入端的區塊都已透過transform()
成功轉換且即將關閉,然後呼叫此方法。通常用於將後置字串區塊排入可讀取的端,以免這個區塊關閉。如果清除程序為非同步性質,函式可以傳回承諾,以表示成功或失敗,並將結果傳達給stream.writable.write()
的呼叫端。此外,遭拒的承諾會同時錯誤可讀取和可寫入的兩側。擲回例外狀況與傳回遭拒承諾視為相同。
const transformStream = new TransformStream({
start(controller) {
/* … */
},
transform(chunk, controller) {
/* … */
},
flush(controller) {
/* … */
},
});
writableStrategy
和 readableStrategy
佇列策略
TransformStream()
建構函式的第二和第三個選用參數是選用的 writableStrategy
和 readableStrategy
佇列策略。如可讀取和可寫入串流部分所述。
轉換串流程式碼範例
以下程式碼範例顯示簡易轉換串流的實際運作情形。
// Note that `TextEncoderStream` and `TextDecoderStream` exist now.
// This example shows how you would have done it before.
const textEncoderStream = new TransformStream({
transform(chunk, controller) {
console.log('[transform]', chunk);
controller.enqueue(new TextEncoder().encode(chunk));
},
flush(controller) {
console.log('[flush]');
controller.terminate();
},
});
(async () => {
const readStream = textEncoderStream.readable;
const writeStream = textEncoderStream.writable;
const writer = writeStream.getWriter();
for (const char of 'abc') {
writer.write(char);
}
writer.close();
const reader = readStream.getReader();
for (let result = await reader.read(); !result.done; result = await reader.read()) {
console.log('[value]', result.value);
}
})();
透過轉換串流找出可讀取的串流
ReadableStream
介面的 pipeThrough()
方法提供鏈結方式,可讓您透過轉換串流或任何其他可寫入/可讀取的組合,傳輸目前的串流。傳輸串流時,通常會在管道期間鎖定,避免其他讀者鎖定串流。
const transformStream = new TransformStream({
transform(chunk, controller) {
console.log('[transform]', chunk);
controller.enqueue(new TextEncoder().encode(chunk));
},
flush(controller) {
console.log('[flush]');
controller.terminate();
},
});
const readableStream = new ReadableStream({
start(controller) {
// called by constructor
console.log('[start]');
controller.enqueue('a');
controller.enqueue('b');
controller.enqueue('c');
},
pull(controller) {
// called read when controller's queue is empty
console.log('[pull]');
controller.enqueue('d');
controller.close(); // or controller.error();
},
cancel(reason) {
// called when rs.cancel(reason)
console.log('[cancel]', reason);
},
});
(async () => {
const reader = readableStream.pipeThrough(transformStream).getReader();
for (let result = await reader.read(); !result.done; result = await reader.read()) {
console.log('[value]', result.value);
}
})();
下一個程式碼範例 (有點複雜) 說明如何實作 fetch()
的「大聲公」版本,透過使用傳回的回應承諾「做為串流」,並按區塊的大寫區塊,將所有文字大寫。這種做法的好處是,您不需要等待完整文件下載完成,這在處理大型檔案時可產生巨大變化。
function upperCaseStream() {
return new TransformStream({
transform(chunk, controller) {
controller.enqueue(chunk.toUpperCase());
},
});
}
function appendToDOMStream(el) {
return new WritableStream({
write(chunk) {
el.append(chunk);
}
});
}
fetch('./lorem-ipsum.txt').then((response) =>
response.body
.pipeThrough(new TextDecoderStream())
.pipeThrough(upperCaseStream())
.pipeTo(appendToDOMStream(document.body))
);
操作示範
下方示範呈現了可讀取、可寫入及轉換串流的實際運作情形。其中也包含 pipeThrough()
和 pipeTo()
管道鏈的範例,以及說明 tee()
。您可以選擇在專屬的視窗中執行 demo,或是查看原始碼。
瀏覽器提供的實用訊息串
瀏覽器中內建許多實用的訊息串。您可以從 blob 輕鬆建立 ReadableStream
。Blob
介面的 stream() 方法會在讀取 blob 時傳回 ReadableStream
。另請注意,File
物件是 Blob
的特定種類,可用於 blob 的任何結構定義。
const readableStream = new Blob(['hello world'], { type: 'text/plain' }).stream();
TextDecoder.decode()
和 TextEncoder.encode()
的串流變數分別稱為 TextDecoderStream
和 TextEncoderStream
。
const response = await fetch('https://streams.spec.whatwg.org/');
const decodedStream = response.body.pipeThrough(new TextDecoderStream());
您可以使用 CompressionStream
和 DecompressionStream
轉換串流分別壓縮或解壓縮檔案。下列程式碼範例說明如何下載串流規格、直接在瀏覽器中壓縮 (gzip),以及將壓縮檔直接寫入磁碟。
const response = await fetch('https://streams.spec.whatwg.org/');
const readableStream = response.body;
const compressedStream = readableStream.pipeThrough(new CompressionStream('gzip'));
const fileHandle = await showSaveFilePicker();
const writableStream = await fileHandle.createWritable();
compressedStream.pipeTo(writableStream);
File System Access API 的 FileSystemWritableFileStream
和實驗性 fetch()
要求串流都是可寫入的串流示例。
Serial API 會大量使用可讀取和可寫入的串流。
// Prompt user to select any serial port.
const port = await navigator.serial.requestPort();
// Wait for the serial port to open.
await port.open({ baudRate: 9_600 });
const reader = port.readable.getReader();
// Listen to data coming from the serial device.
while (true) {
const { value, done } = await reader.read();
if (done) {
// Allow the serial port to be closed later.
reader.releaseLock();
break;
}
// value is a Uint8Array.
console.log(value);
}
// Write to the serial port.
const writer = port.writable.getWriter();
const data = new Uint8Array([104, 101, 108, 108, 111]); // hello
await writer.write(data);
// Allow the serial port to be closed later.
writer.releaseLock();
最後,WebSocketStream
API 會將串流與 WebSocket API 整合。
const wss = new WebSocketStream(WSS_URL);
const { readable, writable } = await wss.connection;
const reader = readable.getReader();
const writer = writable.getWriter();
while (true) {
const { value, done } = await reader.read();
if (done) {
break;
}
const result = await process(value);
await writer.write(result);
}
實用資源
特別銘謝
本文由 Jake Archibald、François Beaufort、Sam Dutton、Mattias Buelens、Surma、Joe Medley 和 Adam Rice 審查。Jake Archibald 的網誌文章讓我進一步瞭解串流。部分程式碼範例的靈感是來自 GitHub 使用者 @bellbind 的探索,以及 MDN 網路文件串流中大量建構項目。Streams Standard 的作者在編寫這個規格方面有相當出色的工作。主頁橫幅由 Ryan Lara 在 Unsplash 上完成。