ストリーム - 決定版ガイド

Streams API で、読み取り、書き込み、変換を行うストリームを使用する方法について説明します。

Streams API を使用すると、ネットワーク経由で受信したデータ ストリーム、またはローカルで作成されたデータ ストリームにプログラムでアクセスし、JavaScript で処理できます。ストリーミングでは、受信、送信、または変換するリソースを小さなチャンクに分割し、これらのチャンクを少しずつ処理します。ストリーミングは、ウェブページに表示する HTML や動画などのアセットを受信する際にブラウザが行う処理ですが、2015 年にストリームによる fetch が導入されるまでは、この機能を JavaScript で利用することはできませんでした。

これまでは、なんらかの種類のリソース(動画やテキスト ファイルなど)を処理する場合、ファイル全体をダウンロードし、適切な形式にシリアル化解除されるのを待ってから処理する必要がありました。ストリームを JavaScript で利用できるようになったことで このすべてが変わりましたこれにより、バッファ、文字列、blob を生成する必要なく、クライアントで利用可能になり次第、JavaScript で元データを段階的に処理できるようになりました。これにより、さまざまなユースケースが可能になります。その一部を以下に示します。

  • 動画エフェクト: 読み取り可能な動画ストリームを変換ストリームにパイピングし、リアルタイムでエフェクトを適用します。
  • データの(解凍)圧縮: ファイル ストリームを選択的に(解凍)する変換ストリームを介してファイル ストリームをパイピングします。
  • 画像のデコード: バイトをビットマップ データにデコードする変換ストリームを介して HTTP レスポンス ストリームをパイプし、次にビットマップを PNG に変換する別の変換ストリームをパイプします。Service Worker の fetch ハンドラ内にインストールされていれば、AVIF などの新しい画像形式を透過的にポリフィルできます。

ブラウザ サポート

ReadableStream と WritableStream

対応ブラウザ

  • 43
  • 14
  • 65
  • 10.1

ソース

TransformStream

対応ブラウザ

  • 67
  • 79
  • 102
  • 14.1

ソース

基本コンセプト

さまざまな種類のストリームの詳細を説明する前に、基本的なコンセプトをいくつか紹介しましょう。

チャンク

チャンクとは、ストリームに対して読み書きされる単一のデータのことです。どのようなタイプでもかまいません。さらに、ストリームにさまざまなタイプのチャンクを含めることもできます。ほとんどの場合、チャンクは特定のストリームの最もアトミックな単位ではありません。たとえば、バイト ストリームには、1 バイトではなく、16 KiB の Uint8Array ユニットで構成されるチャンクが含まれることがあります。

読み取り可能なストリーム

読み取り可能なストリームとは、読み取り可能なデータのソースを表します。つまり、データは読み取り可能なストリームから出力されます。具体的には、読み取り可能なストリームは ReadableStream クラスのインスタンスです。

書き込み可能なストリーム

書き込み可能なストリームとは、書き込み可能なデータの宛先を表します。つまり、データは書き込み可能なストリームに入ります。具体的には、書き込み可能なストリームは WritableStream クラスのインスタンスです。

ストリームを変換する

変換ストリームは、ストリームのペア(書き込み可能側と呼ばれる書き込み可能なストリームと読み取り側と呼ばれる読み取り可能なストリーム)で構成されます。現実世界でこれを表現する例は、ある言語から別の言語に臨機応変に翻訳する同時通訳です。変換ストリームに固有の方法で、書き込み可能側に書き込むと、新しいデータを読み取り側から読み取れるようになります。具体的には、writable プロパティと readable プロパティを持つオブジェクトは変換ストリームとして機能します。ただし、標準の TransformStream クラスを使用すると、適切にもつれたペアを簡単に作成できます。

パイプ チェーン

ストリームは主に、ストリームを相互にパイプして使用します。読み取り可能なストリームは、読み取り可能なストリームの pipeTo() メソッドを使用して書き込み可能なストリームに直接パイプできます。また、読み取り可能ストリームの pipeThrough() メソッドを使用して、最初に 1 つ以上の変換ストリームを介してパイプ処理することもできます。このようにパイプされた複数のストリームの集合をパイプライン チェーンと呼びます。

バックプレッシャー

パイプ チェーンが構築されると、チャンクがどれだけ速く流れるかに関するシグナルが伝播されます。チェーン内のいずれかのステップでチャンクがまだ受け入れられない場合、パイプライン チェーンを介して信号を逆方向に伝播し、最終的に元のソースにはチャンクの生成を急いで停止するように指示されるまで伝播します。フローを正規化するこのプロセスは、バックプレッシャーと呼ばれます。

ティーイング

読み取り可能なストリームは、tee() メソッドを使用して(大文字の「T」の形にちなんで名付けられます)ティードできます。これにより、ストリームがロックされます。つまり、直接使用できなくなります。ただし、ブランチと呼ばれる 2 つの新しいストリームが作成され、個別に使用できます。ティーイングも重要です。ストリームを巻き戻したり再開したりすることはできません。詳しくは後ほど説明します。

フェッチ API の呼び出しから来る読み取り可能なストリームで構成されるパイプ チェーンの図。このストリームは変換ストリームに送られ、その出力が取り出され、最初に生成される読み取り可能なストリームはブラウザに送信され、2 番目の読み取り可能なストリームは Service Worker のキャッシュに送信されます。
パイプチェーン。

読み取り可能なストリームの仕組み

読み取り可能なストリームとは、JavaScript で表されるデータソースであり、基盤となるソースから流れる ReadableStream オブジェクトによって示されます。ReadableStream() コンストラクタは、指定されたハンドラから読み取り可能なストリーム オブジェクトを作成して返します。基盤となるソースには次の 2 種類があります。

  • push ソースは、アクセスすると常にデータを push します。ストリームへのアクセスを開始、一時停止、キャンセルするかどうかはユーザー次第です。たとえば、ライブ動画ストリーム、サーバー送信イベント、WebSocket などです。
  • pull ソースでは、接続後にソースのデータを明示的にリクエストする必要があります。たとえば、fetch()XMLHttpRequest の呼び出しによる HTTP オペレーションなどです。

ストリーム データは、チャンクと呼ばれる小さな断片で順次読み取られます。ストリームに配置されたチャンクは「エンキュー」と呼ばれます。これは読み取り可能なキューで 待機していることを意味します内部キューは、まだ読み取られていないチャンクを追跡します。

キューイング戦略とは、内部キューの状態に基づいてストリームがバックプレッシャーを通知する方法を決定するオブジェクトです。キューイング戦略では、各チャンクにサイズを割り当て、キュー内のすべてのチャンクの合計サイズを指定した数(ハイ ウォーターマーク)と比較します。

ストリーム内のチャンクは、リーダーによって読み取られます。このリーダーはデータを一度に 1 チャンクずつ取得するため、どのようなオペレーションでも実行できます。リーダーとそれに伴う他の処理コードを合わせたものは、コンシューマと呼ばれます。

このコンテキストの次の構造体は、コントローラと呼ばれます。読み取り可能な各ストリームには、その名前が示すように、ストリームを制御できるコントローラが関連付けられています。

ストリームを読み取ることができるリーダーは一度に 1 つのみです。リーダーが作成されてストリームの読み取りを開始すると(つまり、アクティブなリーダーになると)、そのストリームにロックされます。別のリーダーにストリームの読み取りを引き継ぐ場合は、通常、他の操作を行う前に最初のリーダーを解放する必要があります(ただし、ストリームを開始することはできます)。

読み取り可能なストリームを作成する

読み取り可能なストリームを作成するには、そのコンストラクタ ReadableStream() を呼び出します。このコンストラクタには、オプションの引数 underlyingSource があります。これは、作成されたストリーム インスタンスの動作を定義するメソッドとプロパティを持つオブジェクトを表します。

underlyingSource

これには、デベロッパーが定義した以下のオプションのメソッドを使用できます。

  • start(controller): オブジェクトが作成されるとすぐに呼び出されます。このメソッドでは、ストリーム ソースにアクセスしたり、ストリーム機能を設定するために必要なその他の操作を実行したりできます。このプロセスを非同期で行う場合、このメソッドは、成功または失敗を通知する Promise を返すことができます。このメソッドに渡される controller パラメータは ReadableStreamDefaultController です。
  • pull(controller): 追加のチャンクが取得されたときにストリームの制御に使用できます。ストリームの内部チャンクのチャンクがいっぱいでない限り、キューがハイ ウォーターマークに達するまで繰り返し呼び出されます。pull() の呼び出し結果が Promise の場合、その Promise が解決されるまで pull() が再度呼び出されることはありません。Promise が拒否された場合、ストリームはエラーになります。
  • cancel(reason): ストリーム コンシューマがストリームをキャンセルしたときに呼び出されます。
const readableStream = new ReadableStream({
  start(controller) {
    /* … */
  },

  pull(controller) {
    /* … */
  },

  cancel(reason) {
    /* … */
  },
});

ReadableStreamDefaultController は、次のメソッドをサポートしています。

/* … */
start(controller) {
  controller.enqueue('The first chunk!');
},
/* … */

queuingStrategy

ReadableStream() コンストラクタの 2 番目の引数(同様にオプションの引数)は queuingStrategy です。これはオプションでストリームのキューイング戦略を定義するオブジェクトであり、次の 2 つのパラメータを取ります。

  • highWaterMark: このキューイング戦略を使用するストリームのハイ ウォーター マークを示す 0 以上の数値。
  • size(chunk): 指定されたチャンク値の負でない有限サイズを計算して返す関数。その結果を使用してバックプレッシャーを判断し、適切な ReadableStreamDefaultController.desiredSize プロパティを通じて明示します。また、基盤となるソースの pull() メソッドがいつ呼び出されるかも制御します。
const readableStream = new ReadableStream({
    /* … */
  },
  {
    highWaterMark: 10,
    size(chunk) {
      return chunk.length;
    },
  },
);

getReader() メソッドと read() メソッド

読み取り可能なストリームから読み取るには、ReadableStreamDefaultReader であるリーダーが必要です。ReadableStream インターフェースの getReader() メソッドは、リーダーを作成し、ストリームをロックします。ストリームがロックされている間は、このリーダーが解放されるまで他のリーダーを取得できません。

ReadableStreamDefaultReader インターフェースの read() メソッドは、ストリームの内部キューにある次のチャンクへのアクセスを提供する Promise を返します。ストリームの状態に応じて、解決または拒否の結果を返します。次のようなさまざまな方法があります。

  • チャンクが利用可能な場合、Promise は
    { value: chunk, done: false } という形式のオブジェクトで処理されます。
  • ストリームが閉じられると、Promise は
    { value: undefined, done: true } という形式のオブジェクトで処理されます。
  • ストリームでエラーが発生すると、Promise は関連するエラーで拒否されます。
const reader = readableStream.getReader();
while (true) {
  const { done, value } = await reader.read();
  if (done) {
    console.log('The stream is done.');
    break;
  }
  console.log('Just read a chunk:', value);
}

locked プロパティ

読み取り可能なストリームがロックされているかどうかを確認するには、ReadableStream.locked プロパティにアクセスします。

const locked = readableStream.locked;
console.log(`The stream is ${locked ? 'indeed' : 'not'} locked.`);

読み取り可能なストリーム コードサンプル

以下のコードサンプルは、実際の手順をすべて示しています。まず、underlyingSource 引数(つまり、TimestampSource クラス)で start() メソッドを定義する ReadableStream を作成します。このメソッドは、10 秒間の間、controller 1 秒ごとにenqueue()ストリームのタイムスタンプを通知します。 最後に、ストリームを close() するようコントローラに指示します。このストリームを消費するには、getReader() メソッドでリーダーを作成し、ストリームが done になるまで read() を呼び出します。

class TimestampSource {
  #interval

  start(controller) {
    this.#interval = setInterval(() => {
      const string = new Date().toLocaleTimeString();
      // Add the string to the stream.
      controller.enqueue(string);
      console.log(`Enqueued ${string}`);
    }, 1_000);

    setTimeout(() => {
      clearInterval(this.#interval);
      // Close the stream after 10s.
      controller.close();
    }, 10_000);
  }

  cancel() {
    // This is called if the reader cancels.
    clearInterval(this.#interval);
  }
}

const stream = new ReadableStream(new TimestampSource());

async function concatStringStream(stream) {
  let result = '';
  const reader = stream.getReader();
  while (true) {
    // The `read()` method returns a promise that
    // resolves when a value has been received.
    const { done, value } = await reader.read();
    // Result objects contain two properties:
    // `done`  - `true` if the stream has already given you all its data.
    // `value` - Some data. Always `undefined` when `done` is `true`.
    if (done) return result;
    result += value;
    console.log(`Read ${result.length} characters so far`);
    console.log(`Most recently read chunk: ${value}`);
  }
}
concatStringStream(stream).then((result) => console.log('Stream complete', result));

非同期反復処理

read() ループの反復処理ごとにストリームが done かどうかをチェックするのは、最も便利な API ではない場合があります。幸いなことに、これより優れた方法がまもなく提供され、非同期イテレーションが実現するでしょう。

for await (const chunk of stream) {
  console.log(chunk);
}

現在非同期イテレーションを使用するための回避策は、ポリフィルで動作を実装することです。

if (!ReadableStream.prototype[Symbol.asyncIterator]) {
  ReadableStream.prototype[Symbol.asyncIterator] = async function* () {
    const reader = this.getReader();
    try {
      while (true) {
        const {done, value} = await reader.read();
        if (done) {
          return;
          }
        yield value;
      }
    }
    finally {
      reader.releaseLock();
    }
  }
}

読み取り可能なストリームをティーンする

ReadableStream インターフェースの tee() メソッドは、現在読み取り可能なストリームに先行し、結果の 2 つのブランチを含む 2 要素配列を新しい ReadableStream インスタンスとして返します。これにより、2 人のリーダーが同時にストリームを読み取ることができます。たとえば、サーバーからレスポンスを取得してブラウザにストリーミングするだけでなく、Service Worker のキャッシュにもストリーミングする場合に、Service Worker でこれを行うことができます。レスポンスの本文は複数回使用できないため、これを行うには 2 つのコピーが必要です。ストリームをキャンセルするには、生成されるブランチの両方をキャンセルする必要があります。通常、ストリームをティーンすると、そのストリームがロックされ、他のリーダーがロックできなくなります。

const readableStream = new ReadableStream({
  start(controller) {
    // Called by constructor.
    console.log('[start]');
    controller.enqueue('a');
    controller.enqueue('b');
    controller.enqueue('c');
  },
  pull(controller) {
    // Called `read()` when the controller's queue is empty.
    console.log('[pull]');
    controller.enqueue('d');
    controller.close();
  },
  cancel(reason) {
    // Called when the stream is canceled.
    console.log('[cancel]', reason);
  },
});

// Create two `ReadableStream`s.
const [streamA, streamB] = readableStream.tee();

// Read streamA iteratively one by one. Typically, you
// would not do it this way, but you certainly can.
const readerA = streamA.getReader();
console.log('[A]', await readerA.read()); //=> {value: "a", done: false}
console.log('[A]', await readerA.read()); //=> {value: "b", done: false}
console.log('[A]', await readerA.read()); //=> {value: "c", done: false}
console.log('[A]', await readerA.read()); //=> {value: "d", done: false}
console.log('[A]', await readerA.read()); //=> {value: undefined, done: true}

// Read streamB in a loop. This is the more common way
// to read data from the stream.
const readerB = streamB.getReader();
while (true) {
  const result = await readerB.read();
  if (result.done) break;
  console.log('[B]', result);
}

読み取り可能なバイト ストリーム

バイトを表すストリームの場合は、特にコピーを最小限に抑えることで、バイトを効率的に処理するために読み取り可能なストリームの拡張バージョンが提供されます。バイト ストリームを使用すると、Bring Your Own Buffer(BYOB)リーダーを取得できます。デフォルトの実装では、WebSocket の場合に文字列バッファや配列バッファなど、さまざまな出力範囲を提供できますが、バイト ストリームはバイト出力を保証します。さらに、BYOB リーダーには安定性という利点があります。バッファがデタッチされると、1 つのバッファが同じバッファに 2 回書き込まないことが保証され、競合状態を回避できるためです。BYOB リーダーはバッファを再利用できるため、ブラウザでガベージ コレクションを実行する必要がある回数を減らすことができます。

読み取り可能なバイト ストリームの作成

読み取り可能なバイト ストリームを作成するには、追加の type パラメータを ReadableStream() コンストラクタに渡します。

new ReadableStream({ type: 'bytes' });

underlyingSource

読み取り可能なバイト ストリームの基盤となるソースには、操作する ReadableByteStreamController が与えられます。その ReadableByteStreamController.enqueue() メソッドは、値が ArrayBufferViewchunk 引数を取ります。プロパティ ReadableByteStreamController.byobRequest は、現在の BYOB pull リクエストを返します。存在しない場合は null を返します。最後に、ReadableByteStreamController.desiredSize プロパティは、制御対象ストリームの内部キューを満たすのに必要なサイズを返します。

queuingStrategy

ReadableStream() コンストラクタの 2 番目の引数(同様にオプションの引数)は queuingStrategy です。これはオプションでストリームのキューイング戦略を定義するオブジェクトであり、次の 1 つのパラメータを取ります。

  • highWaterMark: このキューイング戦略を使用するストリームのハイ ウォーター マークを示す、負でないバイト数。これはバックプレッシャーの決定に使用され、適切な ReadableByteStreamController.desiredSize プロパティを通じて明示されます。また、基盤となるソースの pull() メソッドがいつ呼び出されるかも制御します。

getReader() メソッドと read() メソッド

その後、mode パラメータを ReadableStream.getReader({ mode: "byob" }) のように設定することで、ReadableStreamBYOBReader にアクセスできます。これにより、バッファの割り当てをより詳細に制御してコピーを回避できます。バイト ストリームから読み取るには、ReadableStreamBYOBReader.read(view) を呼び出す必要があります。ここで、viewArrayBufferView です。

読み取り可能なバイト ストリームのコードサンプル

const reader = readableStream.getReader({ mode: "byob" });

let startingAB = new ArrayBuffer(1_024);
const buffer = await readInto(startingAB);
console.log("The first 1024 bytes, or less:", buffer);

async function readInto(buffer) {
  let offset = 0;

  while (offset < buffer.byteLength) {
    const { value: view, done } =
        await reader.read(new Uint8Array(buffer, offset, buffer.byteLength - offset));
    buffer = view.buffer;
    if (done) {
      break;
    }
    offset += view.byteLength;
  }

  return buffer;
}

次の関数は、ランダムに生成された配列の効率的なゼロコピー読み取りを可能にする読み取り可能なバイト ストリームを返します。あらかじめ決められた 1,024 のチャンクサイズを使用する代わりに、デベロッパー提供のバッファを埋めようとし、完全な制御を可能にします。

const DEFAULT_CHUNK_SIZE = 1_024;

function makeReadableByteStream() {
  return new ReadableStream({
    type: 'bytes',

    pull(controller) {
      // Even when the consumer is using the default reader,
      // the auto-allocation feature allocates a buffer and
      // passes it to us via `byobRequest`.
      const view = controller.byobRequest.view;
      view = crypto.getRandomValues(view);
      controller.byobRequest.respond(view.byteLength);
    },

    autoAllocateChunkSize: DEFAULT_CHUNK_SIZE,
  });
}

書き込み可能なストリームの仕組み

書き込み可能なストリームはデータを書き込むことができる宛先であり、JavaScript では WritableStream オブジェクトで表されます。これは、基盤となるシンク(元データが書き込まれる下位レベルの I/O シンク)を抽象化したものです。

データは、ライターを介してストリームに一度に 1 つのチャンクで書き込まれます。チャンクには、リーダーのチャンクと同様に、さまざまな形態があります。書き込み可能なチャンクを生成するには、どのようなコードでもかまいません。ライターと関連コードをプロデューサーと呼びます。

ライターが作成され、ストリームへの書き込みを開始すると(アクティブなライター)、そのライターは「ロックされた」と言われます。書き込み可能なストリームに一度に書き込めるライターは 1 つだけです。別のライターがストリームへの書き込みを開始したい場合は、通常、別のライターをアタッチする前にそのライターを解放する必要があります。

内部キューは、ストリームに書き込まれたものの、基盤となるシンクでまだ処理されていないチャンクを追跡します。

キューイング戦略とは、内部キューの状態に基づいてストリームがバックプレッシャーを通知する方法を決定するオブジェクトです。キューイング戦略では、各チャンクにサイズを割り当て、キュー内のすべてのチャンクの合計サイズを指定した数(ハイ ウォーターマーク)と比較します。

最終的な構造はコントローラと呼ばれます。書き込み可能な各ストリームにはコントローラが関連付けられており、これによりストリームの制御(中止など)を行うことができます。

書き込み可能なストリームの作成

Streams API の WritableStream インターフェースは、シンクと呼ばれる宛先にストリーミング データを書き込むための標準的な抽象化を提供します。このオブジェクトには、バックプレッシャーとキューイングが組み込まれています。書き込み可能なストリームを作成するには、そのコンストラクタ WritableStream() を呼び出します。オプションの underlyingSink パラメータは、作成されたストリーム インスタンスの動作を定義するメソッドとプロパティを持つオブジェクトを表します。

underlyingSink

underlyingSink には、デベロッパーが定義した以下のオプションのメソッドを含めることができます。一部のメソッドに渡される controller パラメータは WritableStreamDefaultController です。

  • start(controller): このメソッドは、オブジェクトが作成されるとすぐに呼び出されます。このメソッドの内容は、基盤となるシンクにアクセスすることを目的としています。このプロセスを非同期で行う場合は、成功または失敗を通知する Promise を返すことができます。
  • write(chunk, controller): このメソッドは、新しいデータチャンク(chunk パラメータで指定)を基盤となるシンクに書き込む準備ができたときに呼び出されます。書き込みオペレーションの成功または失敗を通知する Promise を返すことができます。このメソッドは、以前の書き込みが成功した後にのみ呼び出されます。ストリームが閉じたり中止されたりした後は、決して呼び出されません。
  • close(controller): このメソッドは、ストリームへのチャンクの書き込みが完了したことをアプリが通知した場合に呼び出されます。コンテンツは、基盤となるシンクへの書き込みを確定するために必要なすべての処理を行い、書き込みアクセス権を解放する必要があります。このプロセスが非同期の場合は、成功または失敗を通知する Promise を返すことができます。このメソッドは、キューに格納された書き込みがすべて成功した後にのみ呼び出されます。
  • abort(reason): このメソッドは、ストリームを突然閉じてエラー状態にすることをアプリが通知した場合に呼び出されます。close() と同様に、保持されているリソースをクリーンアップできますが、書き込みがキューに入っている場合でも abort() が呼び出されます。これらのチャンクは破棄されます。このプロセスが非同期の場合は、成功または失敗を通知する Promise を返すことができます。reason パラメータには、ストリームが中止された理由を示す DOMString が含まれます。
const writableStream = new WritableStream({
  start(controller) {
    /* … */
  },

  write(chunk, controller) {
    /* … */
  },

  close(controller) {
    /* … */
  },

  abort(reason) {
    /* … */
  },
});

Streams API の WritableStreamDefaultController インターフェースは、設定時、書き込み用に送信されたチャンク、または書き込みの終了時に WritableStream の状態を制御できるコントローラを表します。WritableStream を作成すると、基になるシンクに操作対象の WritableStreamDefaultController インスタンスが与えられます。WritableStreamDefaultController のメソッドは WritableStreamDefaultController.error() だけです。これにより、関連するストリームとの以降のインタラクションはすべてエラーになります。WritableStreamDefaultController は、AbortSignal のインスタンスを返す signal プロパティもサポートしています。これにより、必要に応じて WritableStream オペレーションを停止できます。

/* … */
write(chunk, controller) {
  try {
    // Try to do something dangerous with `chunk`.
  } catch (error) {
    controller.error(error.message);
  }
},
/* … */

queuingStrategy

WritableStream() コンストラクタの 2 番目の引数(同様にオプションの引数)は queuingStrategy です。これはオプションでストリームのキューイング戦略を定義するオブジェクトであり、次の 2 つのパラメータを取ります。

  • highWaterMark: このキューイング戦略を使用するストリームのハイ ウォーター マークを示す 0 以上の数値。
  • size(chunk): 指定されたチャンク値の負でない有限サイズを計算して返す関数。その結果を使用してバックプレッシャーを判断し、適切な WritableStreamDefaultWriter.desiredSize プロパティを通じて明らかにします。

getWriter() メソッドと write() メソッド

書き込み可能なストリームに書き込むには、ライター(WritableStreamDefaultWriter)が必要です。WritableStream インターフェースの getWriter() メソッドは、WritableStreamDefaultWriter の新しいインスタンスを返し、そのインスタンスにストリームをロックします。ストリームがロックされている間は、現在のライターが解放されるまで他のライターは取得できません。

WritableStreamDefaultWriter インターフェースの write() メソッドは、渡されたデータチャンクを WritableStream とその基盤となるシンクに書き込み、書き込みオペレーションの成功または失敗を示す Promise を返します。「成功」の意味は基盤となるシンクによって異なります。これはチャンクが受け入れられたことを示している可能性があり、必ずしも最終的な宛先に安全に保存されているわけではありません。

const writer = writableStream.getWriter();
const resultPromise = writer.write('The first chunk!');

locked プロパティ

書き込み可能なストリームがロックされているかどうかを確認するには、その WritableStream.locked プロパティにアクセスします。

const locked = writableStream.locked;
console.log(`The stream is ${locked ? 'indeed' : 'not'} locked.`);

書き込み可能なストリームのコードサンプル

以下のコードサンプルは、実際の手順をすべて示しています。

const writableStream = new WritableStream({
  start(controller) {
    console.log('[start]');
  },
  async write(chunk, controller) {
    console.log('[write]', chunk);
    // Wait for next write.
    await new Promise((resolve) => setTimeout(() => {
      document.body.textContent += chunk;
      resolve();
    }, 1_000));
  },
  close(controller) {
    console.log('[close]');
  },
  abort(reason) {
    console.log('[abort]', reason);
  },
});

const writer = writableStream.getWriter();
const start = Date.now();
for (const char of 'abcdefghijklmnopqrstuvwxyz') {
  // Wait to add to the write queue.
  await writer.ready;
  console.log('[ready]', Date.now() - start, 'ms');
  // The Promise is resolved after the write finishes.
  writer.write(char);
}
await writer.close();

読み取り可能なストリームを書き込み可能なストリームにパイプする

読み取り可能なストリームは、読み取り可能なストリームの pipeTo() メソッドを使用して書き込み可能なストリームにパイプできます。ReadableStream.pipeTo() は、現在の ReadableStream を指定された WritableStream にパイプし、パイププロセスが正常に完了すると履行され、エラーが発生した場合は拒否される Promise を返します。

const readableStream = new ReadableStream({
  start(controller) {
    // Called by constructor.
    console.log('[start readable]');
    controller.enqueue('a');
    controller.enqueue('b');
    controller.enqueue('c');
  },
  pull(controller) {
    // Called when controller's queue is empty.
    console.log('[pull]');
    controller.enqueue('d');
    controller.close();
  },
  cancel(reason) {
    // Called when the stream is canceled.
    console.log('[cancel]', reason);
  },
});

const writableStream = new WritableStream({
  start(controller) {
    // Called by constructor
    console.log('[start writable]');
  },
  async write(chunk, controller) {
    // Called upon writer.write()
    console.log('[write]', chunk);
    // Wait for next write.
    await new Promise((resolve) => setTimeout(() => {
      document.body.textContent += chunk;
      resolve();
    }, 1_000));
  },
  close(controller) {
    console.log('[close]');
  },
  abort(reason) {
    console.log('[abort]', reason);
  },
});

await readableStream.pipeTo(writableStream);
console.log('[finished]');

変換ストリームの作成

Streams API の TransformStream インターフェースは、変換可能な一連のデータを表します。変換ストリームを作成するには、そのコンストラクタ TransformStream() を呼び出します。このコンストラクタは、指定されたハンドラから変換ストリーム オブジェクトを作成して返します。TransformStream() コンストラクタは、最初の引数として、transformer を表す JavaScript オブジェクト(省略可)を受け取ります。このようなオブジェクトには、次のいずれかのメソッドを含めることができます。

transformer

  • start(controller): このメソッドは、オブジェクトが作成されるとすぐに呼び出されます。これは通常、controller.enqueue() を使用して接頭辞チャンクをキューに追加するために使用されます。これらのチャンクは読み取り側から読み取られますが、書き込み可能側への書き込みには依存しません。この初期プロセスが非同期である場合(たとえば、接頭辞のチャンクの取得にある程度の時間がかかるため)、関数は成功または失敗を通知する Promise を返すことができます。拒否された Promise はストリームでエラーになります。スローされた例外は、TransformStream() コンストラクタによって再度スローされます。
  • transform(chunk, controller): このメソッドは、最初に書き込み可能側に書き込まれた新しいチャンクの変換準備が整うと呼び出されます。ストリーム実装では、この関数が前の変換が成功した後にのみ呼び出されることが保証されます。start() が完了する前や flush() が呼び出された後には呼び出されません。この関数は変換ストリームの実際の変換作業を実行しますcontroller.enqueue() を使用して結果をキューに追加できます。これにより、書き込み可能側に書き込まれた単一のチャンクに対して、controller.enqueue() が呼び出される回数に応じて、読み取り側でゼロまたは複数のチャンクが生成されます。変換プロセスが非同期の場合、この関数は変換の成功または失敗を通知する Promise を返すことができます。Promise が拒否された場合、変換ストリームの読み取り側と書き込み可能側の両方でエラーが発生します。transform() メソッドが指定されていない場合は、ID 変換が使用されます。これにより、チャンクが書き込み可能側から読み取り側に変更されることなくキューに追加されます。
  • flush(controller): このメソッドは、書き込み可能側に書き込まれたすべてのチャンクが transform() を正常に渡すことで変換され、書き込み可能側が閉じられるときに呼び出されます。これは通常、読み取り側に接尾辞のチャンクをキューに追加するために使用され、その後、チャンクが閉じられることはありません。フラッシュ プロセスが非同期の場合、関数は成功または失敗を通知する Promise を返すことができます。結果は stream.writable.write() の呼び出し元に通知されます。さらに、Promise が拒否された場合、ストリームの読み取り側と書き込み可能側の両方でエラーが発生します。例外のスローは、拒否された Promise を返す場合と同様に扱われます。
const transformStream = new TransformStream({
  start(controller) {
    /* … */
  },

  transform(chunk, controller) {
    /* … */
  },

  flush(controller) {
    /* … */
  },
});

writableStrategyreadableStrategy のキューイング戦略

TransformStream() コンストラクタの 2 番目と 3 番目の省略可能なパラメータは、オプションの writableStrategyreadableStrategy のキューイング戦略です。これらは、読み取り可能書き込み可能なストリーム セクションでそれぞれ概説されているように定義されています。

変換ストリームのコードサンプル

次のコードサンプルは、単純な変換ストリームの動作を示しています。

// Note that `TextEncoderStream` and `TextDecoderStream` exist now.
// This example shows how you would have done it before.
const textEncoderStream = new TransformStream({
  transform(chunk, controller) {
    console.log('[transform]', chunk);
    controller.enqueue(new TextEncoder().encode(chunk));
  },
  flush(controller) {
    console.log('[flush]');
    controller.terminate();
  },
});

(async () => {
  const readStream = textEncoderStream.readable;
  const writeStream = textEncoderStream.writable;

  const writer = writeStream.getWriter();
  for (const char of 'abc') {
    writer.write(char);
  }
  writer.close();

  const reader = readStream.getReader();
  for (let result = await reader.read(); !result.done; result = await reader.read()) {
    console.log('[value]', result.value);
  }
})();

読み取り可能なストリームを変換ストリームでパイプする

ReadableStream インターフェースの pipeThrough() メソッドは、変換ストリームまたはその他の書き込み可能/読み取り可能なペアを介して現在のストリームをパイプする、連鎖可能な方法を提供します。ストリームのパイプ処理は通常、パイプ処理の間だけストリームをロックし、他のリーダーがストリームをロックできないようにします。

const transformStream = new TransformStream({
  transform(chunk, controller) {
    console.log('[transform]', chunk);
    controller.enqueue(new TextEncoder().encode(chunk));
  },
  flush(controller) {
    console.log('[flush]');
    controller.terminate();
  },
});

const readableStream = new ReadableStream({
  start(controller) {
    // called by constructor
    console.log('[start]');
    controller.enqueue('a');
    controller.enqueue('b');
    controller.enqueue('c');
  },
  pull(controller) {
    // called read when controller's queue is empty
    console.log('[pull]');
    controller.enqueue('d');
    controller.close(); // or controller.error();
  },
  cancel(reason) {
    // called when rs.cancel(reason)
    console.log('[cancel]', reason);
  },
});

(async () => {
  const reader = readableStream.pipeThrough(transformStream).getReader();
  for (let result = await reader.read(); !result.done; result = await reader.read()) {
    console.log('[value]', result.value);
  }
})();

次のコードサンプル(少し不自然)は、fetch() の「シャウト」バージョンを実装する方法を示しています。このバージョンでは、返されたレスポンス Promise をストリームとして消費し、チャンクごとに大文字化することで、すべてのテキストを大文字にします。このアプローチの利点は、ドキュメント全体がダウンロードされるのを待つ必要がないことです。このことは、大きなファイルを扱う場合に大きな違いをもたらします。

function upperCaseStream() {
  return new TransformStream({
    transform(chunk, controller) {
      controller.enqueue(chunk.toUpperCase());
    },
  });
}

function appendToDOMStream(el) {
  return new WritableStream({
    write(chunk) {
      el.append(chunk);
    }
  });
}

fetch('./lorem-ipsum.txt').then((response) =>
  response.body
    .pipeThrough(new TextDecoderStream())
    .pipeThrough(upperCaseStream())
    .pipeTo(appendToDOMStream(document.body))
);

デモ

以下のデモは、読み取り、書き込み、変換の各ストリームの動作を示しています。また、pipeThrough()pipeTo() のパイプチェーンの例と、tee() も示します。必要に応じて、デモを専用のウィンドウで実行したり、ソースコードを表示したりできます。

ブラウザで利用できる便利なストリーミング

ブラウザには便利なストリームが数多く組み込まれています。ReadableStream は blob から簡単に作成できます。Blob インターフェースの stream() メソッドは、読み取り時に blob に含まれるデータを返す ReadableStream を返します。また、File オブジェクトは特定の種類の Blob であり、blob が使用できるすべてのコンテキストで使用できます。

const readableStream = new Blob(['hello world'], { type: 'text/plain' }).stream();

TextDecoder.decode()TextEncoder.encode() のストリーミング バリアントはそれぞれ TextDecoderStreamTextEncoderStream と呼ばれます。

const response = await fetch('https://streams.spec.whatwg.org/');
const decodedStream = response.body.pipeThrough(new TextDecoderStream());

ファイルの圧縮または解凍は、それぞれ CompressionStream 変換ストリームと DecompressionStream 変換ストリームを使用して簡単に行えます。次のコードサンプルは、Streams 仕様をダウンロードし、ブラウザで直接圧縮(gzip)して、圧縮ファイルを直接ディスクに書き込む方法を示しています。

const response = await fetch('https://streams.spec.whatwg.org/');
const readableStream = response.body;
const compressedStream = readableStream.pipeThrough(new CompressionStream('gzip'));

const fileHandle = await showSaveFilePicker();
const writableStream = await fileHandle.createWritable();
compressedStream.pipeTo(writableStream);

File System Access APIFileSystemWritableFileStream と試験運用版の fetch() リクエスト ストリームは、実際の書き込み可能なストリームの例です。

Serial API は、読み取り可能なストリームと書き込み可能なストリームの両方を多用します。

// Prompt user to select any serial port.
const port = await navigator.serial.requestPort();
// Wait for the serial port to open.
await port.open({ baudRate: 9_600 });
const reader = port.readable.getReader();

// Listen to data coming from the serial device.
while (true) {
  const { value, done } = await reader.read();
  if (done) {
    // Allow the serial port to be closed later.
    reader.releaseLock();
    break;
  }
  // value is a Uint8Array.
  console.log(value);
}

// Write to the serial port.
const writer = port.writable.getWriter();
const data = new Uint8Array([104, 101, 108, 108, 111]); // hello
await writer.write(data);
// Allow the serial port to be closed later.
writer.releaseLock();

最後に、WebSocketStream API はストリームを WebSocket API と統合します。

const wss = new WebSocketStream(WSS_URL);
const { readable, writable } = await wss.connection;
const reader = readable.getReader();
const writer = writable.getWriter();

while (true) {
  const { value, done } = await reader.read();
  if (done) {
    break;
  }
  const result = await process(value);
  await writer.write(result);
}

関連リソース

謝辞

この記事は、Jake ArchibaldFrançois BeaufortSam DuttonMattias BuelensSurmaJoe MedleyAdam Rice によってレビューされました。Jake Archibald のブログ投稿は、ストリームを理解するうえで非常に役立ちました。コードサンプルの一部は GitHub ユーザー @bellbind の探索から着想を得たものです。散文の一部は、ストリーム上の MDN ウェブ ドキュメント上に大きく構築されています。この仕様は、Streams Standard作成者によって多大な成果を上げています(ヒーロー画像、Ryan LaraUnsplash より)。