Strumienie – kompleksowy przewodnik

Dowiedz się, jak korzystać z czytelnych i możliwych do zapisu strumieni oraz jak je przekształcać za pomocą interfejsu Streams API.

Interfejs Streams API umożliwia programowy dostęp do strumieni danych otrzymanych przez sieć lub utworzonych w dowolny sposób lokalnie i przetwarzania ich za pomocą JavaScriptu. Strumieniowanie obejmuje podział zasobu, który chcesz otrzymywać, wysyłać lub przekształcać na małe fragmenty, a następnie przetwarzać te fragmenty krok po kroku. Strumieniowe przesyłanie danych i tak działa przy odbieraniu zasobów takich jak HTML czy filmy do stron internetowych, ale ta funkcja nie była dostępna w języku JavaScript przed wprowadzeniem funkcji fetch w strumieniach w 2015 roku.

Wcześniej, aby przetworzyć jakiś zasób (np. film, plik tekstowy itp.), trzeba było pobrać cały plik, poczekać na jego deserializację, a potem przetworzyć. Wszystko się zmienia, gdy strumienie będą dostępne w języku JavaScript. Nieprzetworzone dane można teraz przetwarzać za pomocą JavaScriptu stopniowo, w miarę jak będzie on dostępny na kliencie, bez konieczności generowania bufora, ciągu znaków czy obiektu blob. Zapewnia to wiele korzyści, w tym wymienię niektóre z nich:

  • Efekty wideo: uporządkowanie czytelnego strumienia wideo przez strumień przekształceń, który stosuje efekty w czasie rzeczywistym.
  • Kompresja (de)kompresja danych: przesyłanie strumienia pliku przez strumień przekształcania, który selektywnie (de)kompresuje dane.
  • Dekodowanie obrazu: uporządkowanie strumienia odpowiedzi HTTP przez strumień przekształcania, który dekoduje bajty do danych bitmapy, a następnie przez inny strumień przekształcania, który przekształca mapy bitowe na pliki PNG. Jeśli jest instalowany w ramach modułu obsługi fetch skryptu service worker, pozwala to na przejrzyste wypełnianie nowym formatem obrazów, np. AVIF.

Obsługiwane przeglądarki

ReadableStream i WritableStream

Obsługa przeglądarek

  • 43
  • 14
  • 65
  • 10.1

Źródło

TransformStream

Obsługa przeglądarek

  • 67
  • 79
  • 102
  • 14.1

Źródło

Podstawowe pojęcia

Zanim przejdziemy do szczegółów różnych typów transmisji, pozwólcie, że przedstawię kilka podstawowych pojęć.

Kawałki

Fragment to pojedynczy fragment danych zapisywany w strumieniu lub z niego odczytywany. Strumienie mogą zawierać dowolny rodzaj – nawet fragmenty różnych typów. W większości przypadków fragment nie będzie najbardziej atomową jednostką danych w danym strumieniu. Na przykład strumień bajtów może zawierać fragmenty składające się z 16 KiB jednostek Uint8Array, a nie pojedynczych bajtów.

Czytelne strumienie

Czytelny strumień stanowi źródło danych, z którego można odczytywać dane. Inaczej mówiąc, dane wydobywają się z czytelnego strumienia. Czytelnym strumieniem jest konkretnie instancja klasy ReadableStream.

Strumienie z możliwością zapisu

Strumień z możliwością zapisu reprezentuje miejsce docelowe danych, w którym można zapisywać. Inaczej mówiąc, dane przekazują się do strumienia z możliwością zapisu. Strumień z możliwością zapisu jest instancją klasy WritableStream.

Przekształcanie strumieni

Strumień przekształcania składa się z pary strumieni: strumienia z możliwością zapisu (strona z możliwością zapisu) i czytelnego strumienia, czyli strony, którą można odczytać. Prawdziwą metaforą takiej sytuacji mógłby być tłumacz symultaniczny, który na bieżąco tłumaczy z jednego języka na inny. W sposób charakterystyczny dla strumienia przekształcania zapis po stronie możliwej do zapisu powoduje udostępnienie nowych danych do odczytu od strony czytelnej. Ogólnie rzecz biorąc, jako strumień przekształcania może służyć każdy obiekt z właściwością writable i właściwością readable. Jednak standardowa klasa TransformStream ułatwia utworzenie takiej pary, która jest prawidłowo splątana.

Łańcuchy rur

Strumienie są używane głównie przez przenoszenie ich do siebie. Czytelny strumień może być przekazywany bezpośrednio do strumienia z możliwością zapisu za pomocą metody pipeTo() czytelnego strumienia lub może być najpierw przekazywany przez co najmniej 1 strumień przekształcenia z użyciem metody pipeThrough() czytelnego strumienia. Zbiór strumieni połączonych w ten sposób jest nazywany łańcuchem pionowym.

Ciśnienie wsteczne

Po utworzeniu łańcucha potoku będzie ono przekazywać sygnały wskazujące, jak szybko mają być przez niego przepływane. Jeśli dowolny krok w łańcuchu nie może jeszcze przyjmować fragmentów, przekazuje sygnał wstecz w łańcuchu potoku aż do momentu, w którym pierwotne źródło nie może już tak szybko produkować fragmentów. Ten proces normalizowania przepływu jest nazywany ciśnieniem wstecznym.

Teeing

Czytelny strumień można zmodyfikować (nazwany na podstawie kształtu wielkiego litery „T”) przy użyciu metody tee(). Spowoduje to zablokowanie strumienia, czyli sprawi, że nie będzie już można go używać bezpośrednio. Spowoduje to jednak utworzenie 2 nowych strumieni, zwanych gałęziami, które można wykorzystywać niezależnie. Rozgrywka na początku jest też ważna, ponieważ transmisji nie można przewijać ani uruchamiać ponownie. Więcej o tym dowiesz się później.

Schemat łańcucha potoku składającego się z czytelnego strumienia pochodzącego z wywołania interfejsu API pobierania, który jest następnie przekazywany przez strumień przekształcania, którego dane wyjściowe są przetwarzane, a następnie wysyłane do przeglądarki dla pierwszego wynikowego czytelnego strumienia i do pamięci podręcznej skryptu service worker dla drugiego uzyskanego do odczytu strumienia.
Łańcuch pionowy.

Mechanika czytelnego strumienia

Czytelny strumień to źródło danych reprezentowane w kodzie JavaScript przez obiekt ReadableStream, który przepływa ze źródła. Konstruktor ReadableStream() tworzy i zwraca czytelny obiekt strumienia z podanych modułów obsługi. Wyróżniamy 2 rodzaje źródeł:

  • Źródła push nieustannie przesyłają Ci dane, gdy już je otworzysz. To Ty decydujesz, czy chcesz rozpocząć, wstrzymać lub anulować dostęp do strumienia. Przykładami są strumienie wideo na żywo, zdarzenia wysyłane przez serwer i WebSockets.
  • Po nawiązaniu połączenia ze źródłami pobierania musisz wyraźnie poprosić o dostęp do danych z tych źródeł. Mogą to być na przykład operacje HTTP przeprowadzane przy użyciu wywołań fetch() lub XMLHttpRequest.

Dane ze strumienia są odczytywane sekwencyjnie w małych porcjach zwanych fragmentami. Mówi się, że fragmenty umieszczone w strumieniu są umieszczone w kolejce. Oznacza to, że czekają one w kolejce gotowej do odczytania. Wewnętrzna kolejka śledzi fragmenty, które nie zostały jeszcze przeczytane.

Strategia kolejkowania to obiekt, który na podstawie stanu kolejki wewnętrznej określa, w jaki sposób strumień powinien sygnalizować wsteczne działanie. Strategia kolejkowania przypisuje rozmiar do każdego fragmentu i porównuje całkowity rozmiar wszystkich fragmentów w kolejce z określoną liczbą nazywaną wysokim znakiem wody.

Fragmenty strumienia są odczytywane przez czytnika. Odczytuje on dane pojedynczo, dzięki czemu możesz wykonywać na nich dowolne operacje. Czytnik oraz powiązany z nim kod przetwarzania są nazywane konsumentem.

Następna konstrukcja w tym kontekście jest nazywana kontrolerem. Z każdym czytelnym strumieniem powiązany jest kontroler, który, jak sama nazwa wskazuje, umożliwia kontrolowanie strumienia.

Strumień może czytać tylko 1 czytelnik w danym momencie. Gdy zostanie utworzony czytelnik i zacznie on czytać strumień (czyli stanie się aktywnym czytnikiem), zostanie na nim zablokowany. Jeśli chcesz, aby inny czytelnik przejął czytanie Twojego strumienia, zwykle musisz zwolnić pierwszy czytelnik, zanim cokolwiek zrobisz (choć możesz pisać).

Tworzenie czytelnego strumienia

Aby utworzyć czytelny strumień, wywołaj jego konstruktor ReadableStream(). Konstruktor ma opcjonalny argument underlyingSource, który reprezentuje obiekt z metodami i właściwościami definiującymi działanie instancji utworzonego strumienia.

underlyingSource

Mogą to być te opcjonalne metody zdefiniowane przez programistę:

  • start(controller): wywoływane natychmiast po utworzeniu obiektu. Może ona uzyskać dostęp do źródła strumienia i wykonywać inne czynności wymagane do skonfigurowania funkcji transmisji. Jeśli ten proces ma zostać przeprowadzony asynchronicznie, metoda może zwrócić komunikat o powodzeniu lub niepowodzeniu. Parametr controller przekazywany do tej metody to ReadableStreamDefaultController.
  • pull(controller): umożliwia sterowanie transmisją w miarę pobierania kolejnych fragmentów. Jest on wywoływany wielokrotnie, dopóki wewnętrzna kolejka fragmentów nie będzie pełna, aż do osiągnięcia limitu wody. Jeśli wywołanie funkcji pull() jest obietnicą, usługa pull() nie zostanie wywołana ponownie, dopóki ta obietnica nie zostanie zrealizowana. Jeśli obietnica zostanie odrzucona, strumień wystąpi błąd.
  • cancel(reason): wywoływane, gdy konsument anuluje transmisję.
const readableStream = new ReadableStream({
  start(controller) {
    /* … */
  },

  pull(controller) {
    /* … */
  },

  cancel(reason) {
    /* … */
  },
});

ReadableStreamDefaultController obsługuje te metody:

/* … */
start(controller) {
  controller.enqueue('The first chunk!');
},
/* … */

queuingStrategy

Drugim, również opcjonalnym argumentem konstruktora ReadableStream(), jest queuingStrategy. Jest to obiekt, który opcjonalnie definiuje strategię kolejkowania na potrzeby strumienia, która przyjmuje 2 parametry:

  • highWaterMark: nieujemna liczba wskazująca wysoki znak wody w strumieniu, w którym zastosowano tę strategię kolejkowania.
  • size(chunk): funkcja, która oblicza i zwraca skończony nieujemny rozmiar danego fragmentu. Ten wynik służy do określania ciśnienia wstecznego ujawnianego za pomocą odpowiedniej właściwości ReadableStreamDefaultController.desiredSize. Reguluje też, kiedy wywoływana jest metoda pull() źródłowego źródła.
const readableStream = new ReadableStream({
    /* … */
  },
  {
    highWaterMark: 10,
    size(chunk) {
      return chunk.length;
    },
  },
);

Metody getReader() i read()

Aby czytać ze czytelnego strumienia, potrzebujesz czytnika w postaci ReadableStreamDefaultReader. Metoda getReader() w interfejsie ReadableStream tworzy czytnik i blokuje na nim strumień. Gdy strumień jest zablokowany, do czasu jego opublikowania nie można pozyskać innego czytnika.

Metoda read() interfejsu ReadableStreamDefaultReader zwraca obietnicę zapewniającą dostęp do następnego fragmentu w wewnętrznej kolejce strumienia. Jest wypełniany lub odrzucany z odpowiednim wynikiem w zależności od stanu strumienia. Dostępne są następujące możliwości:

  • Jeśli fragment jest dostępny, obietnica zostanie zrealizowana za pomocą obiektu formularza
    { value: chunk, done: false }.
  • Jeśli strumień zostanie zamknięty, obietnica zostanie zrealizowana za pomocą obiektu formularza:
    { value: undefined, done: true }.
  • Jeśli w strumieniu wystąpi błąd, obietnica zostanie odrzucona z odpowiednim błędem.
const reader = readableStream.getReader();
while (true) {
  const { done, value } = await reader.read();
  if (done) {
    console.log('The stream is done.');
    break;
  }
  console.log('Just read a chunk:', value);
}

Właściwość locked

Aby sprawdzić, czy czytelny strumień jest zablokowany, przejdź do jego właściwości ReadableStream.locked.

const locked = readableStream.locked;
console.log(`The stream is ${locked ? 'indeed' : 'not'} locked.`);

Przykładowe czytelne fragmenty kodu strumienia

Przykładowy kod poniżej pokazuje wszystkie kroki w praktyce. Najpierw tworzysz ReadableStream, który w argumencie underlyingSource (klasa TimestampSource) definiuje metodę start(). Ta metoda informuje controller strumienia o tym, że enqueue() ma sygnaturę czasową co sekundę w ciągu 10 sekund. Na koniec informuje kontroler o tym, że ma close() pobrać strumień. Korzystasz z tego strumienia, tworząc czytnik za pomocą metody getReader() i wywołując read() do momentu, gdy strumień zostanie określony done.

class TimestampSource {
  #interval

  start(controller) {
    this.#interval = setInterval(() => {
      const string = new Date().toLocaleTimeString();
      // Add the string to the stream.
      controller.enqueue(string);
      console.log(`Enqueued ${string}`);
    }, 1_000);

    setTimeout(() => {
      clearInterval(this.#interval);
      // Close the stream after 10s.
      controller.close();
    }, 10_000);
  }

  cancel() {
    // This is called if the reader cancels.
    clearInterval(this.#interval);
  }
}

const stream = new ReadableStream(new TimestampSource());

async function concatStringStream(stream) {
  let result = '';
  const reader = stream.getReader();
  while (true) {
    // The `read()` method returns a promise that
    // resolves when a value has been received.
    const { done, value } = await reader.read();
    // Result objects contain two properties:
    // `done`  - `true` if the stream has already given you all its data.
    // `value` - Some data. Always `undefined` when `done` is `true`.
    if (done) return result;
    result += value;
    console.log(`Read ${result.length} characters so far`);
    console.log(`Most recently read chunk: ${value}`);
  }
}
concatStringStream(stream).then((result) => console.log('Stream complete', result));

Iteracja asynchroniczna

Sprawdzenie po każdym powtórzeniu pętli read(), czy strumień ma wartość done, może nie być najwygodniejszym interfejsem API. Na szczęście wkrótce pojawi się lepsza metoda: iteracja asynchroniczna.

for await (const chunk of stream) {
  console.log(chunk);
}

Obejściem umożliwiającym obecnie korzystanie z iteracji asynchronicznego jest wdrożenie działania z użyciem kodu polyfill.

if (!ReadableStream.prototype[Symbol.asyncIterator]) {
  ReadableStream.prototype[Symbol.asyncIterator] = async function* () {
    const reader = this.getReader();
    try {
      while (true) {
        const {done, value} = await reader.read();
        if (done) {
          return;
          }
        yield value;
      }
    }
    finally {
      reader.releaseLock();
    }
  }
}

Rozpoczynanie czytelnego strumienia

Metoda tee() interfejsu ReadableStream łączy bieżący czytelny strumień, zwracając tablicę dwuelementową zawierającą 2 powstałe gałęzie jako nowe instancje ReadableStream. Dzięki temu 2 czytelników może czytać strumień jednocześnie. Możesz to zrobić na przykład w skrypcie service worker, jeśli chcesz pobrać odpowiedź z serwera i przesłać ją strumieniowo do przeglądarki, a także przesłać ją do pamięci podręcznej skryptu service worker. Ponieważ treść odpowiedzi nie może być wykorzystana więcej niż raz, potrzebujesz 2 kopii. Aby anulować przesyłanie strumieniowe, musisz anulować obie gałęzie wynikowe. Rozpoczęcie poprzedzania strumienia jest zazwyczaj blokowane na czas określony, przez co inni czytelnicy nie mogą go zablokować.

const readableStream = new ReadableStream({
  start(controller) {
    // Called by constructor.
    console.log('[start]');
    controller.enqueue('a');
    controller.enqueue('b');
    controller.enqueue('c');
  },
  pull(controller) {
    // Called `read()` when the controller's queue is empty.
    console.log('[pull]');
    controller.enqueue('d');
    controller.close();
  },
  cancel(reason) {
    // Called when the stream is canceled.
    console.log('[cancel]', reason);
  },
});

// Create two `ReadableStream`s.
const [streamA, streamB] = readableStream.tee();

// Read streamA iteratively one by one. Typically, you
// would not do it this way, but you certainly can.
const readerA = streamA.getReader();
console.log('[A]', await readerA.read()); //=> {value: "a", done: false}
console.log('[A]', await readerA.read()); //=> {value: "b", done: false}
console.log('[A]', await readerA.read()); //=> {value: "c", done: false}
console.log('[A]', await readerA.read()); //=> {value: "d", done: false}
console.log('[A]', await readerA.read()); //=> {value: undefined, done: true}

// Read streamB in a loop. This is the more common way
// to read data from the stream.
const readerB = streamB.getReader();
while (true) {
  const result = await readerB.read();
  if (result.done) break;
  console.log('[B]', result);
}

Czytelne strumienie bajtów

W przypadku strumieni reprezentujących bajty dostępna jest rozszerzona wersja czytelnego strumienia, pozwalająca wydajnie obsługiwać bajty, w szczególności przez zminimalizowanie liczby kopii. Strumienie bajtów umożliwiają pozyskiwanie własnych czytników buforów (BYOB). Domyślna implementacja może zapewniać szereg różnych danych wyjściowych, np. w przypadku WebSockets: ciągi tekstowe lub bufory tablic, natomiast strumienie bajtów gwarantują dane wyjściowe w bajtach. Czytniki BYOB mają też zalety związane ze stabilnością. Dzieje się tak dlatego, że po odłączeniu bufora gwarantuje to, że nie zapisze się on dwukrotnie w tym samym buforze, co pozwala uniknąć wyścigu. Czytniki BYOB mogą zmniejszyć liczbę uruchomień odśmiecania pamięci przez przeglądarkę, ponieważ może używać buforów ponownie.

Tworzenie czytelnego strumienia bajtów

Możesz utworzyć czytelny strumień bajtów, przekazując do konstruktora ReadableStream() dodatkowy parametr type.

new ReadableStream({ type: 'bytes' });

underlyingSource

Podstawowym źródłem czytelnego strumienia bajtów jest ReadableByteStreamController, którym można manipulować. Metoda ReadableByteStreamController.enqueue() przyjmuje argument chunk, którego wartość to ArrayBufferView. Właściwość ReadableByteStreamController.byobRequest zwraca bieżące żądanie pull BYOB lub wartość null, jeśli nie ma takiego żądania. I wreszcie, właściwość ReadableByteStreamController.desiredSize zwraca odpowiedni rozmiar, aby wypełnić wewnętrzną kolejkę kontrolowanego strumienia.

queuingStrategy

Drugim, również opcjonalnym argumentem konstruktora ReadableStream(), jest queuingStrategy. Jest to obiekt, który opcjonalnie definiuje strategię kolejkowania na potrzeby strumienia, która przyjmuje 1 parametr:

  • highWaterMark: nieujemna liczba bajtów wskazująca wysoki znak wody w strumieniu, w którym zastosowano tę strategię kolejkowania. Służy ona do określania ciśnienia wstecznego ujawnianego za pomocą odpowiedniej właściwości ReadableByteStreamController.desiredSize. Reguluje też, kiedy wywoływana jest metoda pull() źródłowego źródła.

Metody getReader() i read()

Następnie możesz uzyskać dostęp do ReadableStreamBYOBReader, ustawiając odpowiednio parametr mode: ReadableStream.getReader({ mode: "byob" }). Pozwala to dokładniej kontrolować przydzielanie bufora w celu uniknięcia tworzenia kopii. Aby odczytać dane ze strumienia bajtów, musisz wywołać właściwość ReadableStreamBYOBReader.read(view), gdzie view to wartość ArrayBufferView.

Przykładowy czytelny kod strumienia bajtów

const reader = readableStream.getReader({ mode: "byob" });

let startingAB = new ArrayBuffer(1_024);
const buffer = await readInto(startingAB);
console.log("The first 1024 bytes, or less:", buffer);

async function readInto(buffer) {
  let offset = 0;

  while (offset < buffer.byteLength) {
    const { value: view, done } =
        await reader.read(new Uint8Array(buffer, offset, buffer.byteLength - offset));
    buffer = view.buffer;
    if (done) {
      break;
    }
    offset += view.byteLength;
  }

  return buffer;
}

Ta funkcja zwraca czytelne strumienie bajtów, co umożliwia sprawny odczyt losowo wygenerowanej tablicy bez konieczności kopiowania. Zamiast wykorzystać wstępnie ustalony fragment o rozmiarze 1024, próbuje on wypełnić bufor dostarczony przez programistę, zapewniając pełną kontrolę.

const DEFAULT_CHUNK_SIZE = 1_024;

function makeReadableByteStream() {
  return new ReadableStream({
    type: 'bytes',

    pull(controller) {
      // Even when the consumer is using the default reader,
      // the auto-allocation feature allocates a buffer and
      // passes it to us via `byobRequest`.
      const view = controller.byobRequest.view;
      view = crypto.getRandomValues(view);
      controller.byobRequest.respond(view.byteLength);
    },

    autoAllocateChunkSize: DEFAULT_CHUNK_SIZE,
  });
}

Mechanika strumienia możliwego do zapisu

Strumień z możliwością zapisu to miejsce docelowe, w którym możesz zapisywać dane reprezentowane w JavaScripcie przez obiekt WritableStream. Pełni to rolę abstrakcji nad podstawowym ujściem, czyli ujściem wejścia-wyjścia niższego poziomu, w którym są zapisywane nieprzetworzone dane.

Dane są zapisywane w strumieniu za pomocą scenarzysty etapami. Fragmenty mogą mieć różne formy, tak jak fragmenty w czytniku. Fragmenty gotowe do napisania możesz utworzyć za pomocą dowolnego kodu. Autor wraz z powiązanym kodem jest nazywany producentem.

Gdy twórca zostaje utworzony i zaczyna pisać w strumieniu (aktywny autor), mówi się, że jest zablokowany. W strumieniu z możliwością zapisu może jednocześnie napisać tylko 1 autor. Jeśli chcesz, aby inny twórca zaczął pisać do Twojej transmisji, zwykle musisz opublikować ją przed dołączeniem do niej kolejnego autora.

Wewnętrzna kolejka śledzi fragmenty, które zostały zapisane w strumieniu, ale nie zostały jeszcze przetworzone przez bazowe ujście.

Strategia kolejkowania to obiekt, który na podstawie stanu kolejki wewnętrznej określa, w jaki sposób strumień powinien sygnalizować wsteczne działanie. Strategia kolejkowania przypisuje rozmiar do każdego fragmentu i porównuje całkowity rozmiar wszystkich fragmentów w kolejce z określoną liczbą nazywaną wysokim znakiem wody.

Ostatnim elementem jest kontroler. Każdy strumień z możliwością zapisu ma powiązany kontroler, który umożliwia kontrolowanie strumienia (np. jego przerwanie).

Tworzę strumień z możliwością zapisu

Interfejs WritableStream interfejsu Streams API udostępnia standardową abstrakcję zapisywania danych strumieniowanych w miejscu docelowym, które nazywa się ujściem. Ten obiekt ma wbudowane tło i kolejkowanie. Strumień możliwy do zapisu tworzysz, wywołując jego konstruktor WritableStream(). Zawiera opcjonalny parametr underlyingSink, który reprezentuje obiekt z metodami i właściwościami definiującymi działanie instancji utworzonego strumienia.

underlyingSink

underlyingSink może zawierać następujące opcjonalne metody zdefiniowane przez programistę. Parametr controller przekazywany do niektórych metod to WritableStreamDefaultController.

  • start(controller): ta metoda jest wywoływana natychmiast po utworzeniu obiektu. Zawartość tej metody powinna umożliwiać dostęp do bazowego ujścia. Jeśli ten proces zostanie wykonany asynchronicznie, może zwrócić komunikat o powodzeniu lub porażce.
  • write(chunk, controller): ta metoda jest wywoływana, gdy nowy fragment danych (określony w parametrze chunk) będzie gotowy do zapisania w bazowym ujściu. Może zwrócić obietnicę sygnalizowania powodzenia lub niepowodzenia operacji zapisu. Ta metoda jest wywoływana dopiero po udanym zapisie i nigdy po zamknięciu lub przerwaniu strumienia.
  • close(controller): ta metoda zostanie wywołana, gdy aplikacja zasygnalizuje, że zakończyła zapisywanie fragmentów w strumieniu. Treść powinna robić wszystko, co jest konieczne do sfinalizowania zapisów w podstawowym ujściu i zwolnienia do niego dostępu. Jeśli ten proces jest asynchroniczny, może zwrócić komunikat o powodzeniu lub porażce. Ta metoda jest wywoływana dopiero po pomyślnym zakończeniu wszystkich znajdujących się w kolejce zapisów.
  • abort(reason): ta metoda jest wywoływana, gdy aplikacja zasygnalizuje, że chce nagle zamknąć strumień, i wywoła w jej przypadku błąd. Może wyczyścić wszystkie wstrzymane zasoby, np. close(), ale usługa abort() będzie wywoływana nawet wtedy, gdy zapisy znajdują się w kolejce. Te fragmenty zostaną odrzucone. Jeśli ten proces jest asynchroniczny, może zwrócić komunikat o powodzeniu lub niepowodzeniu. Parametr reason zawiera parametr DOMString, który określa, dlaczego strumień został przerwany.
const writableStream = new WritableStream({
  start(controller) {
    /* … */
  },

  write(chunk, controller) {
    /* … */
  },

  close(controller) {
    /* … */
  },

  abort(reason) {
    /* … */
  },
});

Interfejs WritableStreamDefaultController interfejsu Streams API reprezentuje kontroler, który umożliwia kontrolę stanu elementu WritableStream podczas konfiguracji, gdy jest przesyłanych więcej fragmentów do zapisu lub na końcu zapisu. Podczas tworzenia elementu WritableStream bazowe ujście otrzymuje odpowiednie wystąpienie WritableStreamDefaultController do manipulowania. Element WritableStreamDefaultController ma tylko 1 metodę: WritableStreamDefaultController.error(), która powoduje błędy przy wszystkich przyszłych interakcjach z powiązanym strumieniem. WritableStreamDefaultController obsługuje też właściwość signal, która zwraca wystąpienie AbortSignal, co umożliwia zatrzymanie operacji WritableStream w razie potrzeby.

/* … */
write(chunk, controller) {
  try {
    // Try to do something dangerous with `chunk`.
  } catch (error) {
    controller.error(error.message);
  }
},
/* … */

queuingStrategy

Drugim, również opcjonalnym argumentem konstruktora WritableStream(), jest queuingStrategy. Jest to obiekt, który opcjonalnie definiuje strategię kolejkowania na potrzeby strumienia, która przyjmuje 2 parametry:

  • highWaterMark: nieujemna liczba wskazująca wysoki znak wody w strumieniu, w którym zastosowano tę strategię kolejkowania.
  • size(chunk): funkcja, która oblicza i zwraca skończony nieujemny rozmiar danego fragmentu. Ten wynik służy do określania ciśnienia wstecznego ujawnianego za pomocą odpowiedniej właściwości WritableStreamDefaultWriter.desiredSize.

Metody getWriter() i write()

Aby zapisywać tekst w strumieniu z możliwością zapisu, potrzebujesz autora: WritableStreamDefaultWriter. Metoda getWriter() interfejsu WritableStream zwraca nową instancję WritableStreamDefaultWriter i blokuje strumień do tej instancji. Gdy strumień jest zablokowany, nie można pozyskać innego zapisującego, dopóki bieżący strumień nie zostanie udostępniony.

Metoda write() interfejsu WritableStreamDefaultWriter zapisuje przekazany fragment danych do WritableStream i jego bazowego ujścia, a następnie zwraca obietnicę, która zwraca wartość wskazującą na sukces lub błąd operacji zapisu. Pamiętaj, że „sukces” zależy od ujścia bazowego. Może to oznaczać, że fragment został zaakceptowany, a niekoniecznie to, że został bezpiecznie zapisany w miejscu docelowym.

const writer = writableStream.getWriter();
const resultPromise = writer.write('The first chunk!');

Właściwość locked

Aby sprawdzić, czy strumień z możliwością zapisu jest zablokowany, otwórz jego właściwość WritableStream.locked.

const locked = writableStream.locked;
console.log(`The stream is ${locked ? 'indeed' : 'not'} locked.`);

Przykładowy kod strumienia z możliwością zapisu

Przykładowy kod poniżej pokazuje wszystkie kroki w praktyce.

const writableStream = new WritableStream({
  start(controller) {
    console.log('[start]');
  },
  async write(chunk, controller) {
    console.log('[write]', chunk);
    // Wait for next write.
    await new Promise((resolve) => setTimeout(() => {
      document.body.textContent += chunk;
      resolve();
    }, 1_000));
  },
  close(controller) {
    console.log('[close]');
  },
  abort(reason) {
    console.log('[abort]', reason);
  },
});

const writer = writableStream.getWriter();
const start = Date.now();
for (const char of 'abcdefghijklmnopqrstuvwxyz') {
  // Wait to add to the write queue.
  await writer.ready;
  console.log('[ready]', Date.now() - start, 'ms');
  // The Promise is resolved after the write finishes.
  writer.write(char);
}
await writer.close();

Przesyłanie czytelnego strumienia do strumienia z możliwością zapisu

Czytelny strumień może zostać przekazany do strumienia z możliwością zapisu za pomocą metody pipeTo() strumienia. ReadableStream.pipeTo() dodaje potok ReadableStream do danego elementu WritableStream i zwraca obietnicę, która realizuje się po pomyślnym zakończeniu procesu potoku lub odrzuca, jeśli wystąpiły błędy.

const readableStream = new ReadableStream({
  start(controller) {
    // Called by constructor.
    console.log('[start readable]');
    controller.enqueue('a');
    controller.enqueue('b');
    controller.enqueue('c');
  },
  pull(controller) {
    // Called when controller's queue is empty.
    console.log('[pull]');
    controller.enqueue('d');
    controller.close();
  },
  cancel(reason) {
    // Called when the stream is canceled.
    console.log('[cancel]', reason);
  },
});

const writableStream = new WritableStream({
  start(controller) {
    // Called by constructor
    console.log('[start writable]');
  },
  async write(chunk, controller) {
    // Called upon writer.write()
    console.log('[write]', chunk);
    // Wait for next write.
    await new Promise((resolve) => setTimeout(() => {
      document.body.textContent += chunk;
      resolve();
    }, 1_000));
  },
  close(controller) {
    console.log('[close]');
  },
  abort(reason) {
    console.log('[abort]', reason);
  },
});

await readableStream.pipeTo(writableStream);
console.log('[finished]');

Tworzenie strumienia przekształceń

Interfejs TransformStream interfejsu Streams API reprezentuje zbiór danych, które można przekształcić. Strumień przekształcenia tworzysz, wywołując jego konstruktor TransformStream(), który tworzy i zwraca obiekt strumienia przekształcania z podanych modułów obsługi. Konstruktor TransformStream() przyjmuje jako swój pierwszy argument opcjonalny obiekt JavaScript reprezentujący element transformer. Takie obiekty mogą zawierać dowolne z tych metod:

transformer

  • start(controller): ta metoda jest wywoływana natychmiast po utworzeniu obiektu. Zwykle jest to używane do umieszczania fragmentów prefiksu w kolejce za pomocą funkcji controller.enqueue(). Są one odczytywane od strony czytelnej, ale nie zależą od żadnych zapisów dostępnych do zapisu. Jeśli ten początkowy proces jest asynchroniczny, np. dlatego, że uzyskanie fragmentów prefiksu wymaga pewnego wysiłku, funkcja może zwrócić obietnicę zasygnalizowania powodzenia lub niepowodzenia. Odrzucona obietnica spowoduje błąd strumienia. Wszystkie zgłoszone wyjątki zostaną przesłane ponownie przez konstruktor TransformStream().
  • transform(chunk, controller): ta metoda jest wywoływana, gdy nowy fragment pierwotnie zapisany po stronie możliwej do zapisu jest gotowy do przekształcenia. Implementacja strumienia gwarantuje, że ta funkcja zostanie wywołana dopiero po udanym przekształceniu poprzednich i nigdy przed zakończeniem start() ani po wywołaniu funkcji flush(). Ta funkcja wykonuje faktyczne zadanie przekształcania strumienia przekształceń. Może umieścić wyniki w kolejce za pomocą funkcji controller.enqueue(). Zezwala to na użycie jednego fragmentu zapisanego po stronie możliwej do zapisu w celu generowania 0 lub wielu fragmentów po stronie czytelnej w zależności od tego, ile razy funkcja controller.enqueue() jest wywoływana. Jeśli proces przekształcania jest asynchroniczny, funkcja ta może zwrócić obietnicę zasygnalizowania powodzenia lub niepowodzenia przekształcenia. Odrzucona obietnica spowoduje błąd zarówno po stronie strumienia przekształcania, jak i po jej stronie. Jeśli nie jest podana żadna metoda transform(), używane jest przekształcenie tożsamości, które umieszcza fragmenty w kolejce niezmienionej od strony możliwej do zapisu do strony czytelnej.
  • flush(controller): ta metoda jest wywoływana po przekształceniu wszystkich fragmentów zapisanych po stronie możliwej do zapisu przez interfejs transform(), a strona dostępna do zapisu zostanie wkrótce zamknięta. Zwykle służy to do umieszczania fragmentów sufiksu w kolejce po czytelnej stronie, zanim je zamkniemy. Jeśli proces czyszczenia jest asynchroniczny, funkcja może zwrócić obietnicę zasygnalizowania powodzenia lub niepowodzenia. Wynik zostanie przekazany do elementu wywołującego stream.writable.write(). Dodatkowo odrzucona obietnica spowoduje błąd zarówno po stronie strumienia, jak i po jej stronie. Złożenie wyjątku jest traktowane tak samo jak zwrócenie odrzuconej obietnicy.
const transformStream = new TransformStream({
  start(controller) {
    /* … */
  },

  transform(chunk, controller) {
    /* … */
  },

  flush(controller) {
    /* … */
  },
});

Strategie kolejkowania writableStrategy i readableStrategy

Drugi i trzeci opcjonalny parametr konstruktora TransformStream() to opcjonalne strategie kolejkowania writableStrategy i readableStrategy. Są one zdefiniowane odpowiednio w sekcjach strumieni do odczytu i zapisu.

Przykładowy kod strumienia przekształcania

Poniżej znajduje się przykładowy kod pokazujący, jak działa prosty strumień przekształceń.

// Note that `TextEncoderStream` and `TextDecoderStream` exist now.
// This example shows how you would have done it before.
const textEncoderStream = new TransformStream({
  transform(chunk, controller) {
    console.log('[transform]', chunk);
    controller.enqueue(new TextEncoder().encode(chunk));
  },
  flush(controller) {
    console.log('[flush]');
    controller.terminate();
  },
});

(async () => {
  const readStream = textEncoderStream.readable;
  const writeStream = textEncoderStream.writable;

  const writer = writeStream.getWriter();
  for (const char of 'abc') {
    writer.write(char);
  }
  writer.close();

  const reader = readStream.getReader();
  for (let result = await reader.read(); !result.done; result = await reader.read()) {
    console.log('[value]', result.value);
  }
})();

Przekierowywanie czytelnego strumienia przez strumień przekształcania

Metoda pipeThrough() interfejsu ReadableStream udostępnia łańcuchowy sposób przesyłania bieżącego strumienia przez strumień przekształcania lub dowolną inną parę, którą można zapisać. Przekierowywanie strumienia za pomocą potoku powoduje jego zablokowanie na czas trwania potoku, uniemożliwiając innym czytelnikom zablokowanie go.

const transformStream = new TransformStream({
  transform(chunk, controller) {
    console.log('[transform]', chunk);
    controller.enqueue(new TextEncoder().encode(chunk));
  },
  flush(controller) {
    console.log('[flush]');
    controller.terminate();
  },
});

const readableStream = new ReadableStream({
  start(controller) {
    // called by constructor
    console.log('[start]');
    controller.enqueue('a');
    controller.enqueue('b');
    controller.enqueue('c');
  },
  pull(controller) {
    // called read when controller's queue is empty
    console.log('[pull]');
    controller.enqueue('d');
    controller.close(); // or controller.error();
  },
  cancel(reason) {
    // called when rs.cancel(reason)
    console.log('[cancel]', reason);
  },
});

(async () => {
  const reader = readableStream.pipeThrough(transformStream).getReader();
  for (let result = await reader.read(); !result.done; result = await reader.read()) {
    console.log('[value]', result.value);
  }
})();

Następny przykładowy kod (nieco złożony) pokazuje, jak można wdrożyć „krzyczącą” wersję elementu fetch(), która zapisuje cały tekst wielką literą, wykorzystując zwrócone obiecane odpowiedzi jako strumień i stosując każdą z nich wielką literą. Zaletą tego podejścia jest to, że nie musisz czekać na pobranie całego dokumentu, co może mieć ogromne znaczenie w przypadku dużych plików.

function upperCaseStream() {
  return new TransformStream({
    transform(chunk, controller) {
      controller.enqueue(chunk.toUpperCase());
    },
  });
}

function appendToDOMStream(el) {
  return new WritableStream({
    write(chunk) {
      el.append(chunk);
    }
  });
}

fetch('./lorem-ipsum.txt').then((response) =>
  response.body
    .pipeThrough(new TextDecoderStream())
    .pipeThrough(upperCaseStream())
    .pipeTo(appendToDOMStream(document.body))
);

Pokaz

Poniższa demonstracja przedstawia czytelne i zapisywalne strumienie oraz przekształcanie ich w praktyce. Obejmuje też przykłady łańcuchów potoków pipeThrough() i pipeTo(), a także pokazuje tee(). Możesz opcjonalnie uruchomić prezentację w osobnym oknie lub wyświetlić kod źródłowy.

Przydatne strumienie dostępne w przeglądarce

W przeglądarce możesz znaleźć wiele przydatnych strumieni. Możesz łatwo utworzyć ReadableStream z obiektu blob. Metoda stream() interfejsu Blob zwraca wartość ReadableStream, która po odczytaniu zwraca dane zawarte w obiekcie bloba. Pamiętaj też, że obiekt File to konkretny rodzaj obiektu Blob i można go używać w każdym kontekście dostępnym dla blobów.

const readableStream = new Blob(['hello world'], { type: 'text/plain' }).stream();

Warianty strumieniowego przesyłania danych TextDecoder.decode() i TextEncoder.encode() są nazywane odpowiednio TextDecoderStream i TextEncoderStream.

const response = await fetch('https://streams.spec.whatwg.org/');
const decodedStream = response.body.pipeThrough(new TextDecoderStream());

Kompresowanie i dekompresowanie plików jest proste dzięki zastosowaniu CompressionStream i DecompressionStream odpowiednio przekształcania strumieni. Przykładowy kod poniżej pokazuje, jak pobrać specyfikację strumieni, skompresować (gzip) bezpośrednio w przeglądarce i zapisać skompresowany plik bezpośrednio na dysku.

const response = await fetch('https://streams.spec.whatwg.org/');
const readableStream = response.body;
const compressedStream = readableStream.pipeThrough(new CompressionStream('gzip'));

const fileHandle = await showSaveFilePicker();
const writableStream = await fileHandle.createWritable();
compressedStream.pipeTo(writableStream);

Przykłady interfejsu File System Access APIFileSystemWritableFileStream oraz eksperymentalnych fetch()strumieni żądań w strumieniach writable

Interfejs Serial API intensywnie wykorzystuje zarówno strumienie czytelne, jak i zapisy.

// Prompt user to select any serial port.
const port = await navigator.serial.requestPort();
// Wait for the serial port to open.
await port.open({ baudRate: 9_600 });
const reader = port.readable.getReader();

// Listen to data coming from the serial device.
while (true) {
  const { value, done } = await reader.read();
  if (done) {
    // Allow the serial port to be closed later.
    reader.releaseLock();
    break;
  }
  // value is a Uint8Array.
  console.log(value);
}

// Write to the serial port.
const writer = port.writable.getWriter();
const data = new Uint8Array([104, 101, 108, 108, 111]); // hello
await writer.write(data);
// Allow the serial port to be closed later.
writer.releaseLock();

I wreszcie WebSocketStream API integruje strumienie z interfejsem WebSocket API.

const wss = new WebSocketStream(WSS_URL);
const { readable, writable } = await wss.connection;
const reader = readable.getReader();
const writer = writable.getWriter();

while (true) {
  const { value, done } = await reader.read();
  if (done) {
    break;
  }
  const result = await process(value);
  await writer.write(result);
}

Przydatne materiały

Podziękowania

Ten artykuł napisali Jake Archibald, François Beaufort, Sam Dutton, Mattias Buelens, Surma, Joe Medley oraz Adam Rice. Posty na blogu Jake'a Archibalda bardzo pomogły mi w zrozumieniu strumieni. Niektóre przykłady kodu są inspirowane eksploracjami użytkownika GitHuba @bellbind i fragmentami prozy w dużym stopniu bazują na dokumentach internetowych MDN w strumieniach. Autorzy kanału Streams Standard wykonali świetną robotę przy opracowaniu tej specyfikacji. Baner powitalny autorstwa Ryana Lary w filmie Unsplash.