訊息串:這份指南

瞭解如何透過 Streams API 使用可讀取、可寫入及轉換串流。

Streams API 可讓您以程式輔助方式存取透過網路接收的資料串流,或以任何方式在本機建立的資料串流,並使用 JavaScript 處理資料串流。「串流」是指將您要接收、傳送或轉換為小型資源的資源進行細分,然後逐步處理這些區塊。雖然瀏覽器在接收要顯示在網頁上的 HTML 或影片等素材資源時,還是會像串流一樣運作,但這種功能在 2015 年推出串流 fetch前就未曾讓 JavaScript 使用。

過去,如要處理某個種類的資源 (例如影片或文字檔等),您必須下載整個檔案,等待檔案還原為合適的格式,然後再進行處理。由於可以將串流提供給 JavaScript 而上述改變現在,您可以在用戶端有可用的 JavaScript 時,逐步使用 JavaScript 處理原始資料,而不必產生緩衝區、字串或 blob。我可以解鎖多種用途,以下列出其中幾項:

  • 影片效果:透過可即時套用效果的轉換串流,傳送可讀取的影片串流。
  • 資料 (解壓縮):透過選擇性 (解壓縮) 轉換串流傳輸檔案串流。
  • 圖片解碼:透過轉換串流傳輸 HTTP 回應串流,該串流會將位元組解碼為點陣圖資料,然後再透過其他轉換串流將點陣圖轉譯為 PNG。如果該元件安裝在 Service Worker 的 fetch 處理常式中,您就能以透明化的方式聚填 AVIF 等新圖片格式。

瀏覽器支援

ReadableStream 和 WritableStream

瀏覽器支援

  • 43
  • 14
  • 65
  • 10.1

資料來源

TransformStream

瀏覽器支援

  • 67
  • 79
  • 102
  • 14.1

資料來源

核心概念

在詳細說明各種訊息串類型前,先來介紹一些核心概念。

塊狀

區塊是指從串流寫入或讀取的單一資料片段。串流可以是任何類型,甚至可以包含不同類型的區塊。大多數情況下,區塊不是特定串流最不可分割的資料單位。例如,位元組串流可能包含由 16 KiB Uint8Array 單位 (而非單位元組) 的區塊。

可讀取串流

可讀取的串流代表您可以讀取的資料來源。換句話說,資料會從可讀取的串流「傳出」。具體而言,可讀取的串流是 ReadableStream 類別的執行個體。

可寫入串流

可寫入的串流代表可寫入的資料目的地。換句話說,資料會「加入」可寫入的串流。具體而言,可寫入串流是 WritableStream 類別的執行個體。

轉換串流

轉換串流是由「兩組串流」組成:一個可寫入的串流 (稱為其可寫入側),以及可讀取的串流 (稱為可讀取的側邊)。在現實生活中比喻是同時翻譯,能即時將某種語言翻譯成另一種語言。以轉換串流特有的方式寫入可寫入的側邊,新資料就可供從可讀取的端讀取。簡言之,任何具有 writable 屬性和 readable 屬性的物件都能做為轉換串流。不過,標準 TransformStream 類別可讓您更輕鬆地建立這類正確傾斜的組合。

管線鏈

訊息串主要用於「管道」。您可以使用可讀取串流的 pipeTo() 方法,將可讀取的串流直接傳輸至可寫入的串流,也可以使用可讀取串流的 pipeThrough() 方法,先透過一或多個轉換串流傳輸該串流。以這種方式連接的一組串流稱為管道鏈。

背壓

管道鏈建構完成後,就會傳播有關區塊通過速度的信號。如果鏈結中的某個步驟目前無法接受區塊,則該程序會透過管道鏈向後傳播信號,直到最終收到原始來源的通知,會停止迅速產生區塊。這個正規化資料流的程序稱為背壓。

開球

可讀取的串流可以使用其 tee() 方法對齊 (以大寫「T」形狀命名)。這會「鎖定」串流,使串流無法再直接使用;但系統會建立兩個新的串流,稱為分支,可以獨立使用。Teat 設定也很重要,因為串流無法倒轉或重新開始,稍後將進一步說明。

管道鏈圖,當中包含可讀取的串流 (來自擷取 API 的呼叫)。接著,透過轉換串流傳輸輸出內容,再將輸出內容傳遞至瀏覽器,以取得第一個產生的可讀取串流,再將第二個產生的可讀取串流傳送至 Service Worker 快取。
管道鏈。

可讀取串流的機制

可讀取的串流是透過從基礎來源傳輸的 ReadableStream 物件以 JavaScript 表示的資料來源。ReadableStream() 建構函式會從指定的處理常式建立及傳回可讀取的串流物件。基礎來源有兩種:

  • 「推送來源」會在您存取資料後持續推送資料,您隨時可以開始、暫停或取消存取串流。例如直播影片串流、伺服器傳送事件或 WebSocket。
  • 使用提取來源後,您必須明確要求從來源取得資料。例如透過 fetch()XMLHttpRequest 呼叫提供 HTTP 作業。

串流資料會在名為「區塊」的小片段中依序讀取。系統會將串流中的區塊顯示為「排入佇列」。也就是說,這些指令已在佇列中等候可供讀取。內部佇列會追蹤尚未讀取的區塊。

「佇列策略」是一種物件,可決定串流應如何根據內部佇列的狀態傳送背壓信號。佇列策略會為每個區塊指派大小,並將佇列中所有區塊的總大小與指定的數字進行比較,稱為「高水標」

「讀取器」會讀取串流中的區塊。這個讀取器一次擷取資料區塊後,方便您執行想對這類資料的任何操作。讀取器以及其他與其一起處理的程式碼,亦稱為「消費者」

在此情況下,下一個建構項目稱為「控制器」。顧名思義,每個可讀取的串流都有相關聯的控制器,方便您控管串流。

一次只有一個讀取器可以讀取串流。當讀取器建立並開始讀取串流 (也就是成為「有效讀取器」) 後,該串流就會「鎖定」。如要讓其他讀取器接管串流,通常需要先放開第一個讀取器,才能執行任何動作 (但您可以 tee 串流)。

建立可讀取的串流

您可以呼叫其建構函式 ReadableStream(),建立可讀取的串流。建構函式含有選用的引數 underlyingSource,代表包含方法和屬性的物件,用於定義建構的串流執行個體行為方式。

underlyingSource

您可以使用下列由開發人員定義的選用方法:

  • start(controller):建構物件時立即呼叫。這個方法可以存取串流來源,以及設定串流功能所需的任何其他作業。如果這項程序以非同步方式完成,這個方法可能會傳回承諾,表示成功或失敗。傳遞至這個方法的 controller 參數為 ReadableStreamDefaultController
  • pull(controller):隨著擷取到的區塊更多,可用於控制串流。只要串流內部區塊的佇列並未已滿,直到佇列達到高水標前,系統就會重複呼叫此方法。如果呼叫 pull() 的結果是承諾,則只有在上述 promise 符合要求時,才會再次呼叫 pull()。如果承諾遭拒,串流就會發生錯誤。
  • cancel(reason):在串流消費者取消串流時呼叫。
const readableStream = new ReadableStream({
  start(controller) {
    /* … */
  },

  pull(controller) {
    /* … */
  },

  cancel(reason) {
    /* … */
  },
});

ReadableStreamDefaultController 支援下列方法:

/* … */
start(controller) {
  controller.enqueue('The first chunk!');
},
/* … */

queuingStrategy

第二項選用屬性,ReadableStream() 建構函式的引數則為 queuingStrategy。此物件可視需要定義串流的佇列策略,該策略會採用兩個參數:

  • highWaterMark:以非負數表示使用這項佇列策略的串流高水標誌。
  • size(chunk):這個函式會計算並傳回指定區塊值的有限非負數大小。該結果會用來判斷背壓,並透過適當的 ReadableStreamDefaultController.desiredSize 屬性資訊清單。也會控管呼叫基礎來源 pull() 方法的時機。
const readableStream = new ReadableStream({
    /* … */
  },
  {
    highWaterMark: 10,
    size(chunk) {
      return chunk.length;
    },
  },
);

getReader()read() 方法

如要讀取可讀取的串流,您需要讀取器,格式為 ReadableStreamDefaultReaderReadableStream 介面的 getReader() 方法會建立讀取器,並將串流鎖定。串流鎖定後,必須等到這部串流釋出後,才能取得其他讀者。

ReadableStreamDefaultReader 介面的 read() 方法會傳回承諾,可提供串流內部佇列中下一個區塊的存取權。這會根據串流狀態來執行或拒絕結果。不同的可能如下:

  • 如果有區塊,則承諾項目會以下列格式的物件完成:
    { value: chunk, done: false }
  • 如果串流關閉,承諾項目會以下列形式的物件執行:
    { value: undefined, done: true }
  • 如果串流發生錯誤,承諾會遭到拒絕,並附上相關錯誤。
const reader = readableStream.getReader();
while (true) {
  const { done, value } = await reader.read();
  if (done) {
    console.log('The stream is done.');
    break;
  }
  console.log('Just read a chunk:', value);
}

locked 屬性

存取其 ReadableStream.locked 屬性,即可檢查可讀取串流是否已鎖定。

const locked = readableStream.locked;
console.log(`The stream is ${locked ? 'indeed' : 'not'} locked.`);

可讀取串流程式碼範例

以下程式碼範例顯示所有操作步驟。您必須先建立 ReadableStream,並在其 underlyingSource 引數 (即 TimestampSource 類別) 中定義 start() 方法。這個方法會在十秒期間,每秒通知串流的 controllerenqueue() 時間戳記。最後,它會通知控制器 close() 串流。如要使用這個串流,請透過 getReader() 方法建立讀取器,並呼叫 read(),直到串流變成 done 為止。

class TimestampSource {
  #interval

  start(controller) {
    this.#interval = setInterval(() => {
      const string = new Date().toLocaleTimeString();
      // Add the string to the stream.
      controller.enqueue(string);
      console.log(`Enqueued ${string}`);
    }, 1_000);

    setTimeout(() => {
      clearInterval(this.#interval);
      // Close the stream after 10s.
      controller.close();
    }, 10_000);
  }

  cancel() {
    // This is called if the reader cancels.
    clearInterval(this.#interval);
  }
}

const stream = new ReadableStream(new TimestampSource());

async function concatStringStream(stream) {
  let result = '';
  const reader = stream.getReader();
  while (true) {
    // The `read()` method returns a promise that
    // resolves when a value has been received.
    const { done, value } = await reader.read();
    // Result objects contain two properties:
    // `done`  - `true` if the stream has already given you all its data.
    // `value` - Some data. Always `undefined` when `done` is `true`.
    if (done) return result;
    result += value;
    console.log(`Read ${result.length} characters so far`);
    console.log(`Most recently read chunk: ${value}`);
  }
}
concatStringStream(stream).then((result) => console.log('Stream complete', result));

非同步疊代

檢查每個 read() 迴圈疊代時,如果串流為 done 並非最便利的 API,幸好,我們很快就會提供更好的方法:非同步疊代。

for await (const chunk of stream) {
  console.log(chunk);
}

目前使用非同步疊代的解決方法,是使用 polyfill 實作行為。

if (!ReadableStream.prototype[Symbol.asyncIterator]) {
  ReadableStream.prototype[Symbol.asyncIterator] = async function* () {
    const reader = this.getReader();
    try {
      while (true) {
        const {done, value} = await reader.read();
        if (done) {
          return;
          }
        yield value;
      }
    }
    finally {
      reader.releaseLock();
    }
  }
}

鎖定可讀取的串流

ReadableStream 介面的 tee() 方法會中斷目前可讀取的串流,並傳回兩個元素陣列,其中包含兩個產生的分支,做為新的 ReadableStream 執行個體。這可讓兩位讀者同時讀取串流。例如,您可以在 Service Worker 中進行這項操作,例如從伺服器擷取回應,並將回應串流至瀏覽器,同時將回應串流至 Service Worker 快取。因為回應主體不能超過一次使用,所以您需要兩個副本才能完成這項工作。如要取消串流,必須同時取消兩個產生的分支版本。選擇串流直播時,通常會先鎖定串流,避免其他讀者鎖定。

const readableStream = new ReadableStream({
  start(controller) {
    // Called by constructor.
    console.log('[start]');
    controller.enqueue('a');
    controller.enqueue('b');
    controller.enqueue('c');
  },
  pull(controller) {
    // Called `read()` when the controller's queue is empty.
    console.log('[pull]');
    controller.enqueue('d');
    controller.close();
  },
  cancel(reason) {
    // Called when the stream is canceled.
    console.log('[cancel]', reason);
  },
});

// Create two `ReadableStream`s.
const [streamA, streamB] = readableStream.tee();

// Read streamA iteratively one by one. Typically, you
// would not do it this way, but you certainly can.
const readerA = streamA.getReader();
console.log('[A]', await readerA.read()); //=> {value: "a", done: false}
console.log('[A]', await readerA.read()); //=> {value: "b", done: false}
console.log('[A]', await readerA.read()); //=> {value: "c", done: false}
console.log('[A]', await readerA.read()); //=> {value: "d", done: false}
console.log('[A]', await readerA.read()); //=> {value: undefined, done: true}

// Read streamB in a loop. This is the more common way
// to read data from the stream.
const readerB = streamB.getReader();
while (true) {
  const result = await readerB.read();
  if (result.done) break;
  console.log('[B]', result);
}

可讀取的位元組串流

針對代表位元組的串流,系統會提供可讀取串流的擴充版本,以有效處理位元組,特別是透過最小化副本處理。位元串流可讓取得自己的緩衝區 (BYOB) 讀取器。預設的實作方式可以提供一系列不同的輸出內容 (例如使用 WebSocket 時,字串或陣列緩衝區),而位元組串流則能保證位元組輸出。此外,BYOB 讀取者也有穩定性方面的優勢。這是因為如果緩衝區卸離,就能確保一個緩衝區不會寫入相同緩衝區兩次,進而避免競爭狀況。BYOB 讀取器可以減少瀏覽器執行垃圾收集所需的次數,因為 BYOB 讀取器可以重複使用緩衝區。

建立可讀取的位元組串流

將額外的 type 參數傳遞至 ReadableStream() 建構函式,即可建立可讀取的位元組串流。

new ReadableStream({ type: 'bytes' });

underlyingSource

為可讀取的位元組串流的基礎來源會給予 ReadableByteStreamController 來執行操作。其 ReadableByteStreamController.enqueue() 方法使用值為 ArrayBufferViewchunk 引數。屬性 ReadableByteStreamController.byobRequest 會傳回目前的 BYOB 提取要求;如果沒有,屬性則傳回空值。最後,ReadableByteStreamController.desiredSize 屬性會傳回所需的大小,以填滿控制串流的內部佇列。

queuingStrategy

第二項選用屬性,ReadableStream() 建構函式的引數則為 queuingStrategy。這個物件可視需要定義串流的佇列策略,該策略會採用一個參數:

  • highWaterMark:非負數的位元組,表示使用這項佇列策略的串流高水標誌。這會用來判斷背壓,並透過適當的 ReadableByteStreamController.desiredSize 屬性資訊清單。也會控管呼叫基礎來源 pull() 方法的時機。

getReader()read() 方法

接著,您就可以透過設定 mode 參數來存取 ReadableStreamBYOBReaderReadableStream.getReader({ mode: "byob" })。這樣一來,您就能更精準地控制緩衝區分配,藉此避免複製作業。如要讀取位元組串流,您必須呼叫 ReadableStreamBYOBReader.read(view),其中 viewArrayBufferView

可讀取位元組串流程式碼範例

const reader = readableStream.getReader({ mode: "byob" });

let startingAB = new ArrayBuffer(1_024);
const buffer = await readInto(startingAB);
console.log("The first 1024 bytes, or less:", buffer);

async function readInto(buffer) {
  let offset = 0;

  while (offset < buffer.byteLength) {
    const { value: view, done } =
        await reader.read(new Uint8Array(buffer, offset, buffer.byteLength - offset));
    buffer = view.buffer;
    if (done) {
      break;
    }
    offset += view.byteLength;
  }

  return buffer;
}

下列函式會傳回可讀取的位元組串流,允許有效率地讀取隨機產生的陣列的零複製作業。系統不會使用 1,024 的預先決定區塊大小,而是嘗試填滿開發人員提供的緩衝區,以獲得完整控制。

const DEFAULT_CHUNK_SIZE = 1_024;

function makeReadableByteStream() {
  return new ReadableStream({
    type: 'bytes',

    pull(controller) {
      // Even when the consumer is using the default reader,
      // the auto-allocation feature allocates a buffer and
      // passes it to us via `byobRequest`.
      const view = controller.byobRequest.view;
      view = crypto.getRandomValues(view);
      controller.byobRequest.respond(view.byteLength);
    },

    autoAllocateChunkSize: DEFAULT_CHUNK_SIZE,
  });
}

可寫入串流的機制

可寫入串流是您可以寫入資料的目的地,以 JavaScript 代表 WritableStream 物件。這可做為基礎接收器頂端的抽象層,也就是寫入原始資料的較低層級 I/O 接收器。

資料會透過寫入器寫入串流,一次一個區塊。區塊可能有多種形式,就像讀取器中的區塊一樣。您可以使用任何想要的程式碼產生可供寫入的區塊;寫入者加上相關程式碼稱為「生產端」

建立寫入者並開始寫入串流 (「進行中的寫入者」) 時,代表該寫入作業會「鎖定」。一次只有一位寫入者可以在可寫入的串流中寫入資料。如要讓其他寫入者開始寫入串流,通常需要先釋放串流,然後再將其他寫入者附加至串流。

內部佇列可追蹤已寫入串流,但尚未由基礎接收器處理的區塊。

「佇列策略」是一種物件,可決定串流應如何根據內部佇列的狀態傳送背壓信號。佇列策略會為每個區塊指派大小,並將佇列中所有區塊的總大小與指定的數字進行比較,稱為「高水標」

最後一個建構項目稱為「控制器」。每個可寫入串流都有相關聯的控制器,可讓您控制串流 (例如取消操作)。

建立可寫入的串流

Streams API 的 WritableStream 介面提供將串流資料寫入目的地 (稱為接收器) 的標準抽象化機制。這個物件具有內建背壓與佇列。只要呼叫其建構函式 WritableStream(),即可建立可寫入的串流。這個元素包含選用的 underlyingSink 參數,代表包含方法和屬性的物件,可用於定義建構的串流執行個體行為。

underlyingSink

underlyingSink 可包含下列由開發人員定義的選用方法。傳遞至部分方法的 controller 參數是 WritableStreamDefaultController

  • start(controller):建構物件後,會立即呼叫這個方法。此方法的內容應旨在取得基礎接收器的存取權。如果這項程序以非同步方式完成,系統可能會傳回承諾,表示成功或失敗。
  • write(chunk, controller):當新的資料區塊 (在 chunk 參數中指定) 準備好寫入基礎接收器時,系統就會呼叫這個方法。它會傳回承諾,指出寫入作業成功或失敗。系統只會在先前的寫入成功後呼叫這個方法,且不會在串流關閉或取消後呼叫此方法。
  • close(controller):如果應用程式表示應用程式已將區塊寫入串流,系統就會呼叫這個方法。內容應執行任何必要工作,以完成對基礎接收器的寫入作業,並釋出對該接收器的存取權。如果這項程序為非同步性質,系統可能會傳回承諾,表示成功或失敗。只有在所有排入佇列的寫入作業都成功後,系統才會呼叫這個方法。
  • abort(reason):如果應用程式表示希望突然關閉串流,並設為錯誤狀態,系統就會呼叫這個方法。它可以清理任何保留的資源 (與 close() 類似),但即使寫入作業排入佇列,系統仍會呼叫 abort()。這些區塊會丟掉。如果此程序為非同步,可能會傳回承諾,表示成功或失敗。reason 參數包含 DOMString,說明串流取消的原因。
const writableStream = new WritableStream({
  start(controller) {
    /* … */
  },

  write(chunk, controller) {
    /* … */
  },

  close(controller) {
    /* … */
  },

  abort(reason) {
    /* … */
  },
});

Streams API 的 WritableStreamDefaultController 介面代表一種控制器,可在設定期間控制 WritableStream 的狀態,因為在寫入期間或寫入結束時會提交更多區塊。建構 WritableStream 時,系統會為基礎接收器指派對應的 WritableStreamDefaultController 執行個體來操控。WritableStreamDefaultController 只有一個方法:WritableStreamDefaultController.error(),會導致日後與相關聯串流的任何互動發生錯誤。WritableStreamDefaultController 也支援 signal 屬性,該屬性會傳回 AbortSignal 的執行個體,讓您可以視需要停止 WritableStream 作業。

/* … */
write(chunk, controller) {
  try {
    // Try to do something dangerous with `chunk`.
  } catch (error) {
    controller.error(error.message);
  }
},
/* … */

queuingStrategy

第二項選用屬性,WritableStream() 建構函式的引數則為 queuingStrategy。此物件可視需要定義串流的佇列策略,該策略會採用兩個參數:

  • highWaterMark:以非負數表示使用這項佇列策略的串流高水標誌。
  • size(chunk):這個函式會計算並傳回指定區塊值的有限非負數大小。該結果會用來判斷背壓,並透過適當的 WritableStreamDefaultWriter.desiredSize 屬性資訊清單。

getWriter()write() 方法

如要寫入可寫入的串流,您需要寫入者,這會是 WritableStreamDefaultWriterWritableStream 介面的 getWriter() 方法會傳回 WritableStreamDefaultWriter 的新例項,並將串流鎖定至該執行個體。在串流遭到鎖定的情況下,必須等到目前串流釋出,才能取得其他寫入者。

WritableStreamDefaultWriter 介面的 write() 方法會將傳遞的資料區塊寫入 WritableStream 及其基礎接收器,然後傳回承諾解析,表示寫入作業成功或失敗。請注意,「成功」代表到基礎接收器;可能表示區塊已接受,但不一定能安全儲存至最終目的地。

const writer = writableStream.getWriter();
const resultPromise = writer.write('The first chunk!');

locked 屬性

您可以存取其 WritableStream.locked 屬性,檢查可寫入的串流是否已鎖定。

const locked = writableStream.locked;
console.log(`The stream is ${locked ? 'indeed' : 'not'} locked.`);

可寫入串流程式碼範例

以下程式碼範例呈現了所有的實際操作步驟。

const writableStream = new WritableStream({
  start(controller) {
    console.log('[start]');
  },
  async write(chunk, controller) {
    console.log('[write]', chunk);
    // Wait for next write.
    await new Promise((resolve) => setTimeout(() => {
      document.body.textContent += chunk;
      resolve();
    }, 1_000));
  },
  close(controller) {
    console.log('[close]');
  },
  abort(reason) {
    console.log('[abort]', reason);
  },
});

const writer = writableStream.getWriter();
const start = Date.now();
for (const char of 'abcdefghijklmnopqrstuvwxyz') {
  // Wait to add to the write queue.
  await writer.ready;
  console.log('[ready]', Date.now() - start, 'ms');
  // The Promise is resolved after the write finishes.
  writer.write(char);
}
await writer.close();

將可讀取的串流移至可寫入的串流

您可透過可讀取串流的 pipeTo() 方法,將可讀取的串流傳輸至可寫入的串流。ReadableStream.pipeTo() 會將目前的 ReadableStream 傳遞給指定的 WritableStream,並傳回能在管道程序成功完成時顯示的承諾,或拒絕任何錯誤。

const readableStream = new ReadableStream({
  start(controller) {
    // Called by constructor.
    console.log('[start readable]');
    controller.enqueue('a');
    controller.enqueue('b');
    controller.enqueue('c');
  },
  pull(controller) {
    // Called when controller's queue is empty.
    console.log('[pull]');
    controller.enqueue('d');
    controller.close();
  },
  cancel(reason) {
    // Called when the stream is canceled.
    console.log('[cancel]', reason);
  },
});

const writableStream = new WritableStream({
  start(controller) {
    // Called by constructor
    console.log('[start writable]');
  },
  async write(chunk, controller) {
    // Called upon writer.write()
    console.log('[write]', chunk);
    // Wait for next write.
    await new Promise((resolve) => setTimeout(() => {
      document.body.textContent += chunk;
      resolve();
    }, 1_000));
  },
  close(controller) {
    console.log('[close]');
  },
  abort(reason) {
    console.log('[abort]', reason);
  },
});

await readableStream.pipeTo(writableStream);
console.log('[finished]');

建立轉換串流

Streams API 的 TransformStream 介面代表一組可轉換的資料。您可以呼叫其建構函式 TransformStream() 來建立轉換串流,從指定的處理常式建立並傳回轉換串流物件。TransformStream() 建構函式接受其第一個引數,也就是代表 transformer 的選用 JavaScript 物件。這類物件可包含以下任何方法:

transformer

  • start(controller):建構物件後,會立即呼叫這個方法。一般而言,這會使用 controller.enqueue() 將前置字串區塊加入佇列。這些區塊會從可讀取的端讀取,但並不仰賴寫入可寫入端的任何資料。如果這個初始程序是非同步的,例如因為取得前置字串區塊需要花費一些心力,函式可以傳回承諾,指出成功或失敗;遭拒的承諾將導致串流錯誤。任何擲回的例外狀況都會由 TransformStream() 建構函式重新擲回。
  • transform(chunk, controller):如果新區塊原本寫入可寫入端,而可供轉換,系統就會呼叫這個方法。串流實作會保證,只有在先前的轉換成功後,且在 start() 完成或呼叫 flush() 之後,才會呼叫此函式。這個函式會執行轉換串流的實際轉換工作。可以使用 controller.enqueue() 將結果排入佇列。這樣就能確保寫入可寫入端的單一區塊,依據呼叫 controller.enqueue() 的次數,在可讀取端產生零或多個區塊。如果轉換作業是非同步進行的,這個函式可以傳回承諾,表示轉換成功或失敗。遭拒的承諾會同時發生錯誤轉換串流可讀取和可寫入的那面。如未提供 transform() 方法,則會使用身分轉換功能,將未變更的區塊排入可寫入端的區塊。
  • flush(controller):所有寫入可寫入端的區塊都已透過 transform() 成功轉換且即將關閉,然後呼叫此方法。通常用於將後置字串區塊排入可讀取的端,以免這個區塊關閉。如果清除程序為非同步性質,函式可以傳回承諾,以表示成功或失敗,並將結果傳達給 stream.writable.write() 的呼叫端。此外,遭拒的承諾會同時錯誤可讀取和可寫入的兩側。擲回例外狀況與傳回遭拒承諾視為相同。
const transformStream = new TransformStream({
  start(controller) {
    /* … */
  },

  transform(chunk, controller) {
    /* … */
  },

  flush(controller) {
    /* … */
  },
});

writableStrategyreadableStrategy 佇列策略

TransformStream() 建構函式的第二和第三個選用參數是選用的 writableStrategyreadableStrategy 佇列策略。如可讀取可寫入串流部分所述。

轉換串流程式碼範例

以下程式碼範例顯示簡易轉換串流的實際運作情形。

// Note that `TextEncoderStream` and `TextDecoderStream` exist now.
// This example shows how you would have done it before.
const textEncoderStream = new TransformStream({
  transform(chunk, controller) {
    console.log('[transform]', chunk);
    controller.enqueue(new TextEncoder().encode(chunk));
  },
  flush(controller) {
    console.log('[flush]');
    controller.terminate();
  },
});

(async () => {
  const readStream = textEncoderStream.readable;
  const writeStream = textEncoderStream.writable;

  const writer = writeStream.getWriter();
  for (const char of 'abc') {
    writer.write(char);
  }
  writer.close();

  const reader = readStream.getReader();
  for (let result = await reader.read(); !result.done; result = await reader.read()) {
    console.log('[value]', result.value);
  }
})();

透過轉換串流找出可讀取的串流

ReadableStream 介面的 pipeThrough() 方法提供鏈結方式,可讓您透過轉換串流或任何其他可寫入/可讀取的組合,傳輸目前的串流。傳輸串流時,通常會在管道期間鎖定,避免其他讀者鎖定串流。

const transformStream = new TransformStream({
  transform(chunk, controller) {
    console.log('[transform]', chunk);
    controller.enqueue(new TextEncoder().encode(chunk));
  },
  flush(controller) {
    console.log('[flush]');
    controller.terminate();
  },
});

const readableStream = new ReadableStream({
  start(controller) {
    // called by constructor
    console.log('[start]');
    controller.enqueue('a');
    controller.enqueue('b');
    controller.enqueue('c');
  },
  pull(controller) {
    // called read when controller's queue is empty
    console.log('[pull]');
    controller.enqueue('d');
    controller.close(); // or controller.error();
  },
  cancel(reason) {
    // called when rs.cancel(reason)
    console.log('[cancel]', reason);
  },
});

(async () => {
  const reader = readableStream.pipeThrough(transformStream).getReader();
  for (let result = await reader.read(); !result.done; result = await reader.read()) {
    console.log('[value]', result.value);
  }
})();

下一個程式碼範例 (有點複雜) 說明如何實作 fetch() 的「大聲公」版本,透過使用傳回的回應承諾「做為串流」,並按區塊的大寫區塊,將所有文字大寫。這種做法的好處是,您不需要等待完整文件下載完成,這在處理大型檔案時可產生巨大變化。

function upperCaseStream() {
  return new TransformStream({
    transform(chunk, controller) {
      controller.enqueue(chunk.toUpperCase());
    },
  });
}

function appendToDOMStream(el) {
  return new WritableStream({
    write(chunk) {
      el.append(chunk);
    }
  });
}

fetch('./lorem-ipsum.txt').then((response) =>
  response.body
    .pipeThrough(new TextDecoderStream())
    .pipeThrough(upperCaseStream())
    .pipeTo(appendToDOMStream(document.body))
);

操作示範

下方示範呈現了可讀取、可寫入及轉換串流的實際運作情形。其中也包含 pipeThrough()pipeTo() 管道鏈的範例,以及說明 tee()。您可以選擇在專屬的視窗中執行 demo,或是查看原始碼

瀏覽器提供的實用訊息串

瀏覽器中內建許多實用的訊息串。您可以從 blob 輕鬆建立 ReadableStreamBlob 介面的 stream() 方法會在讀取 blob 時傳回 ReadableStream。另請注意,File 物件是 Blob 的特定種類,可用於 blob 的任何結構定義。

const readableStream = new Blob(['hello world'], { type: 'text/plain' }).stream();

TextDecoder.decode()TextEncoder.encode() 的串流變數分別稱為 TextDecoderStreamTextEncoderStream

const response = await fetch('https://streams.spec.whatwg.org/');
const decodedStream = response.body.pipeThrough(new TextDecoderStream());

您可以使用 CompressionStreamDecompressionStream 轉換串流分別壓縮或解壓縮檔案。下列程式碼範例說明如何下載串流規格、直接在瀏覽器中壓縮 (gzip),以及將壓縮檔直接寫入磁碟。

const response = await fetch('https://streams.spec.whatwg.org/');
const readableStream = response.body;
const compressedStream = readableStream.pipeThrough(new CompressionStream('gzip'));

const fileHandle = await showSaveFilePicker();
const writableStream = await fileHandle.createWritable();
compressedStream.pipeTo(writableStream);

File System Access APIFileSystemWritableFileStream 和實驗性 fetch() 要求串流都是可寫入的串流示例。

Serial API 會大量使用可讀取和可寫入的串流。

// Prompt user to select any serial port.
const port = await navigator.serial.requestPort();
// Wait for the serial port to open.
await port.open({ baudRate: 9_600 });
const reader = port.readable.getReader();

// Listen to data coming from the serial device.
while (true) {
  const { value, done } = await reader.read();
  if (done) {
    // Allow the serial port to be closed later.
    reader.releaseLock();
    break;
  }
  // value is a Uint8Array.
  console.log(value);
}

// Write to the serial port.
const writer = port.writable.getWriter();
const data = new Uint8Array([104, 101, 108, 108, 111]); // hello
await writer.write(data);
// Allow the serial port to be closed later.
writer.releaseLock();

最後,WebSocketStream API 會將串流與 WebSocket API 整合。

const wss = new WebSocketStream(WSS_URL);
const { readable, writable } = await wss.connection;
const reader = readable.getReader();
const writer = writable.getWriter();

while (true) {
  const { value, done } = await reader.read();
  if (done) {
    break;
  }
  const result = await process(value);
  await writer.write(result);
}

實用資源

特別銘謝

本文由 Jake ArchibaldFrançois BeaufortSam DuttonMattias BuelensSurmaJoe MedleyAdam Rice 審查。Jake Archibald 的網誌文章讓我進一步瞭解串流。部分程式碼範例的靈感是來自 GitHub 使用者 @bellbind 的探索,以及 MDN 網路文件串流中大量建構項目。Streams Standard作者在編寫這個規格方面有相當出色的工作。主頁橫幅由 Ryan LaraUnsplash 上完成。