Pelajari cara menggunakan stream yang dapat dibaca, ditulis, dan mentransformasi dengan Streams API.
Dengan Streams API, Anda dapat mengakses secara terprogram aliran data yang diterima melalui jaringan atau dibuat dengan cara apa pun secara lokal dan memprosesnya dengan JavaScript. Streaming berarti memecah resource yang ingin Anda terima, kirim, atau ubah menjadi potongan-potongan kecil, lalu pemrosesan potongan ini sedikit demi sedikit. Meskipun streaming adalah sesuatu yang dilakukan browser saat menerima aset seperti HTML atau video untuk ditampilkan di halaman web, kemampuan ini belum pernah tersedia untuk JavaScript sebelum fetch
dengan streaming diperkenalkan pada tahun 2015.
Sebelumnya, jika ingin memproses jenis resource (baik itu video atau file teks, dll.), Anda harus mendownload seluruh file, menunggu hingga dideserialisasi ke dalam format yang sesuai, lalu memprosesnya. Dengan stream yang tersedia untuk JavaScript, ini semua akan berubah. Anda sekarang dapat memproses data mentah dengan JavaScript secara bertahap segera setelah tersedia di klien, tanpa perlu membuat buffer, string, atau blob. Tindakan ini akan membuka sejumlah kasus penggunaan, beberapa di antaranya saya cantumkan di bawah:
- Efek video: menyisipkan streaming video yang dapat dibaca melalui streaming transformasi yang menerapkan efek secara real time.
- Data (de)kompresi: menghubungkan aliran file melalui stream transformasi yang secara selektif (mende) mengompresinya.
- Dekode gambar: menghubungkan aliran respons HTTP melalui aliran transformasi yang mendekode byte
menjadi data bitmap, lalu melalui aliran transformasi lain yang menerjemahkan bitmap menjadi PNG. Jika diinstal di dalam pengendali
fetch
dari pekerja layanan, Anda dapat mem-polyfill format gambar baru seperti AVIF secara transparan.
Dukungan browser
ReadableStream dan WritableStream
TransformStream
Konsep inti
Sebelum membahas berbagai jenis streaming secara mendetail, saya akan memperkenalkan beberapa konsep inti.
Potongan
Potongan adalah satu bagian dari data yang ditulis atau dibaca dari aliran data. Jenis apa pun; {i>stream<i} bahkan dapat berisi potongan-potongan dari jenis yang berbeda. Sering kali, potongan tidak akan menjadi unit data paling atomik untuk aliran data tertentu. Misalnya, aliran byte mungkin berisi potongan yang terdiri dari 16
unit Uint8Array
KiB, bukan byte tunggal.
Streaming yang dapat dibaca
Aliran data yang dapat dibaca mewakili sumber data yang dapat Anda baca. Dengan kata lain, data keluar dari aliran yang dapat dibaca. Konkretnya, aliran yang dapat dibaca adalah instance class
ReadableStream
.
Feed yang dapat ditulis
Aliran yang dapat ditulis mewakili tujuan untuk data yang dapat Anda tulis. Dengan kata lain, data
masuk ke aliran yang dapat ditulis. Secara konkret, stream yang dapat ditulis adalah instance dari
class WritableStream
.
Mentransformasi streaming
Aliran transformasi terdiri dari sepasang aliran: aliran yang dapat ditulis, yang dikenal sebagai sisi yang dapat ditulis,
dan aliran yang dapat dibaca, yang dikenal sebagai sisi yang dapat dibaca.
Metafora di dunia nyata untuk hal ini adalah penerjemah simultan yang menerjemahkan dari satu bahasa ke bahasa lain dengan cepat.
Dengan cara yang khusus untuk aliran transformasi, menulis
ke sisi yang dapat ditulis akan membuat data baru tersedia untuk dibaca dari
sisi yang dapat dibaca. Secara konkret, objek apa pun dengan properti writable
dan properti readable
dapat berfungsi sebagai aliran transformasi. Namun, class TransformStream
standar mempermudah pembuatan
pasangan yang terjerat dengan tepat.
Rantai pipa
Streaming terutama digunakan dengan menghubungkan streaming tersebut ke satu sama lain. Stream yang dapat dibaca dapat diarahkan langsung ke stream yang dapat ditulis, menggunakan metode pipeTo()
stream yang dapat dibaca, atau dapat disalurkan melalui satu atau beberapa stream transformasi terlebih dahulu, menggunakan metode pipeThrough()
stream yang dapat dibaca tersebut. Kumpulan
aliran yang disalurkan bersama-sama dengan cara ini disebut sebagai rantai pipe.
Tekanan Balik
Setelah rantai pipa dibuat, rantai tersebut akan menyebarkan sinyal terkait seberapa cepat potongan harus mengalir melaluinya. Jika salah satu langkah dalam rantai tersebut belum dapat menerima potongan, langkah tersebut akan menyebarkan sinyal ke belakang melalui rantai pipa, hingga akhirnya sumber asli diberi tahu untuk berhenti menghasilkan potongan dengan begitu cepat. Proses normalisasi aliran ini disebut tekanan balik.
Kaus
Stream yang dapat dibaca dapat di-teed (dinamai berdasarkan bentuk huruf besar 'T') menggunakan metode tee()
.
Tindakan ini akan mengunci aliran data, sehingga tidak dapat digunakan lagi secara langsung; namun, tindakan ini akan membuat dua aliran data baru, yang disebut cabang, yang dapat digunakan secara terpisah.
Teeing juga penting karena streaming tidak dapat diputar ulang atau dimulai ulang, hal ini akan dibahas nanti.
Mekanisme streaming yang dapat dibaca
Aliran yang dapat dibaca adalah sumber data yang direpresentasikan dalam JavaScript oleh objek
ReadableStream
yang
mengalir dari sumber pokok. Konstruktor
ReadableStream()
membuat dan menampilkan objek streaming yang dapat dibaca dari pengendali yang diberikan. Ada dua
jenis sumber dasar:
- Sumber push terus-menerus mengirimkan data saat Anda mengaksesnya, dan terserah Anda untuk memulai, menjeda, atau membatalkan akses ke aliran data. Contohnya mencakup streaming video live, peristiwa yang dikirim server, atau WebSocket.
- Sumber pengambilan mengharuskan Anda untuk secara eksplisit meminta data dari sumber tersebut setelah terhubung. Contohnya mencakup operasi HTTP melalui panggilan
fetch()
atauXMLHttpRequest
.
Data streaming dibaca secara berurutan dalam potongan-potongan kecil yang disebut potongan. Potongan yang ditempatkan dalam aliran data dikatakan diantrekan. Ini berarti mereka sedang menunggu dalam antrean siap untuk dibaca. Antrean internal melacak potongan yang belum dibaca.
Strategi antrean adalah objek yang menentukan cara streaming seharusnya memberikan sinyal tekanan balik berdasarkan status antrean internalnya. Strategi antrean menetapkan ukuran ke setiap bagian, dan membandingkan ukuran total bagian dalam antrean dengan angka yang ditentukan, yang dikenal sebagai tanda air tinggi.
Potongan-potongan di dalam streaming tersebut dibaca oleh pembaca. Pembaca ini mengambil data satu per satu, sehingga Anda dapat melakukan jenis operasi apa pun yang ingin dilakukan pada data tersebut. Pembaca dan kode pemrosesan lain yang menyertainya disebut konsumen.
Konstruksi berikutnya dalam konteks ini disebut pengontrol. Setiap aliran data yang dapat dibaca memiliki pengontrol terkait yang, seperti namanya, memungkinkan Anda untuk mengontrol aliran.
Hanya satu pembaca yang dapat membaca streaming pada satu waktu; saat pembaca dibuat dan mulai membaca streaming (yaitu, menjadi pembaca aktif), pembaca akan terkunci pada streaming tersebut. Jika ingin pembaca lain mengambil alih membaca streaming, Anda biasanya perlu melepaskan pembaca pertama sebelum melakukan hal lainnya (meskipun Anda dapat melakukan tee streaming).
Membuat feed yang dapat dibaca
Anda membuat aliran yang dapat dibaca dengan memanggil konstruktornya,
ReadableStream()
.
Konstruktor ini memiliki argumen opsional underlyingSource
, yang mewakili sebuah objek
dengan metode dan properti yang menentukan perilaku dari instance streaming yang dibuat.
underlyingSource
Hal ini dapat menggunakan metode opsional yang ditentukan developer berikut:
start(controller)
: Dipanggil langsung saat objek dibuat. Metode ini dapat mengakses sumber streaming, dan melakukan hal lain yang diperlukan untuk menyiapkan fungsi streaming. Jika proses ini dilakukan secara asinkron, metode ini dapat menampilkan promise untuk menandakan keberhasilan atau kegagalan. Parametercontroller
yang diteruskan ke metode ini adalahReadableStreamDefaultController
.pull(controller)
: Dapat digunakan untuk mengontrol streaming saat lebih banyak potongan diambil. Metode ini akan dipanggil berulang kali selama antrean internal potongan streaming tidak penuh, hingga antrean mencapai tanda air tinggi. Jika hasil pemanggilanpull()
adalah promise,pull()
tidak akan dipanggil lagi hingga promise tersebut terpenuhi. Jika promise ditolak, streaming akan mengalami error.cancel(reason)
: Dipanggil saat konsumen streaming membatalkan streaming.
const readableStream = new ReadableStream({
start(controller) {
/* … */
},
pull(controller) {
/* … */
},
cancel(reason) {
/* … */
},
});
ReadableStreamDefaultController
mendukung metode berikut:
ReadableStreamDefaultController.close()
menutup aliran data yang terkait.ReadableStreamDefaultController.enqueue()
mengantrekan potongan tertentu dalam aliran data terkait.ReadableStreamDefaultController.error()
akan menyebabkan interaksi mendatang dengan aliran data yang terkait hingga error.
/* … */
start(controller) {
controller.enqueue('The first chunk!');
},
/* … */
queuingStrategy
Argumen kedua, yang juga bersifat opsional, dari konstruktor ReadableStream()
adalah queuingStrategy
.
Objek ini secara opsional menentukan strategi antrean untuk streaming, yang menggunakan dua
parameter:
highWaterMark
: Angka non-negatif yang menunjukkan tanda air tinggi pada aliran menggunakan strategi antrean ini.size(chunk)
: Fungsi yang menghitung dan menampilkan ukuran non-negatif terbatas dari nilai potongan yang diberikan. Hasilnya digunakan untuk menentukan tekanan balik, yang diwujudkan melalui propertiReadableStreamDefaultController.desiredSize
yang sesuai. Ini juga mengatur kapan metodepull()
sumber dasar dipanggil.
const readableStream = new ReadableStream({
/* … */
},
{
highWaterMark: 10,
size(chunk) {
return chunk.length;
},
},
);
Metode getReader()
dan read()
Untuk membaca dari aliran yang dapat dibaca, Anda memerlukan alat pembaca, yang akan berupa
ReadableStreamDefaultReader
.
Metode getReader()
antarmuka ReadableStream
membuat pembaca dan mengunci aliran data ke
pembaca. Saat streaming dikunci, tidak ada pembaca lain yang dapat diperoleh hingga pembaca ini dirilis.
Metode read()
dari antarmuka ReadableStreamDefaultReader
menampilkan promise yang memberikan akses ke bagian berikutnya dalam antrean internal aliran data. Fungsi ini memenuhi atau menolak dengan hasil, bergantung pada status
streaming. Berikut ini beberapa kemungkinan yang tersedia:
- Jika ada bagian yang tersedia, promise tersebut akan dipenuhi dengan objek berbentuk
{ value: chunk, done: false }
. - Jika aliran data ditutup, promise akan dipenuhi dengan objek berbentuk
{ value: undefined, done: true }
. - Jika streaming tersebut mengalami error, promise akan ditolak dengan error yang relevan.
const reader = readableStream.getReader();
while (true) {
const { done, value } = await reader.read();
if (done) {
console.log('The stream is done.');
break;
}
console.log('Just read a chunk:', value);
}
Properti locked
Anda dapat memeriksa apakah streaming yang dapat dibaca dikunci dengan mengakses
properti
ReadableStream.locked
.
const locked = readableStream.locked;
console.log(`The stream is ${locked ? 'indeed' : 'not'} locked.`);
Contoh kode streaming yang dapat dibaca
Contoh kode di bawah ini menunjukkan semua langkah yang dilakukan. Pertama-tama, buat ReadableStream
bahwa dalam argumen underlyingSource
-nya (yaitu, class TimestampSource
) menentukan metode start()
.
Metode ini memberi tahu controller
aliran data ke
enqueue()
stempel waktu setiap detik selama sepuluh detik.
Terakhir, kode ini akan memberi tahu pengontrol untuk melakukan close()
aliran data. Gunakan streaming ini
dengan membuat pembaca melalui metode getReader()
, lalu memanggil read()
hingga streaming tersebut
menjadi done
.
class TimestampSource {
#interval
start(controller) {
this.#interval = setInterval(() => {
const string = new Date().toLocaleTimeString();
// Add the string to the stream.
controller.enqueue(string);
console.log(`Enqueued ${string}`);
}, 1_000);
setTimeout(() => {
clearInterval(this.#interval);
// Close the stream after 10s.
controller.close();
}, 10_000);
}
cancel() {
// This is called if the reader cancels.
clearInterval(this.#interval);
}
}
const stream = new ReadableStream(new TimestampSource());
async function concatStringStream(stream) {
let result = '';
const reader = stream.getReader();
while (true) {
// The `read()` method returns a promise that
// resolves when a value has been received.
const { done, value } = await reader.read();
// Result objects contain two properties:
// `done` - `true` if the stream has already given you all its data.
// `value` - Some data. Always `undefined` when `done` is `true`.
if (done) return result;
result += value;
console.log(`Read ${result.length} characters so far`);
console.log(`Most recently read chunk: ${value}`);
}
}
concatStringStream(stream).then((result) => console.log('Stream complete', result));
Iterasi asinkron
Memeriksa setiap iterasi loop read()
jika stream-nya adalah done
mungkin bukan API yang paling praktis.
Untungnya, akan segera ada cara yang lebih baik untuk melakukan ini: iterasi asinkron.
for await (const chunk of stream) {
console.log(chunk);
}
Solusi untuk menggunakan iterasi asinkron saat ini adalah dengan menerapkan perilaku dengan polyfill.
if (!ReadableStream.prototype[Symbol.asyncIterator]) {
ReadableStream.prototype[Symbol.asyncIterator] = async function* () {
const reader = this.getReader();
try {
while (true) {
const {done, value} = await reader.read();
if (done) {
return;
}
yield value;
}
}
finally {
reader.releaseLock();
}
}
}
Menyesuaikan feed untuk feed yang dapat dibaca
Metode tee()
dari antarmuka ReadableStream
mengarahkan aliran yang dapat dibaca saat ini, yang menampilkan array dua elemen
yang berisi dua cabang yang dihasilkan sebagai instance ReadableStream
baru. Hal ini memungkinkan
dua pembaca membaca streaming secara bersamaan. Anda dapat melakukan hal ini, misalnya, pada pekerja layanan jika ingin mengambil respons dari server dan menstreamingnya ke browser, serta melakukan streaming ke cache pekerja layanan. Karena isi respons tidak dapat digunakan lebih dari sekali, Anda memerlukan dua salinan
untuk melakukannya. Untuk membatalkan aliran data, Anda harus membatalkan kedua cabang yang dihasilkan. Pemberian rating pada aliran
umumnya akan menguncinya selama durasi tersebut, sehingga pembaca lain tidak dapat menguncinya.
const readableStream = new ReadableStream({
start(controller) {
// Called by constructor.
console.log('[start]');
controller.enqueue('a');
controller.enqueue('b');
controller.enqueue('c');
},
pull(controller) {
// Called `read()` when the controller's queue is empty.
console.log('[pull]');
controller.enqueue('d');
controller.close();
},
cancel(reason) {
// Called when the stream is canceled.
console.log('[cancel]', reason);
},
});
// Create two `ReadableStream`s.
const [streamA, streamB] = readableStream.tee();
// Read streamA iteratively one by one. Typically, you
// would not do it this way, but you certainly can.
const readerA = streamA.getReader();
console.log('[A]', await readerA.read()); //=> {value: "a", done: false}
console.log('[A]', await readerA.read()); //=> {value: "b", done: false}
console.log('[A]', await readerA.read()); //=> {value: "c", done: false}
console.log('[A]', await readerA.read()); //=> {value: "d", done: false}
console.log('[A]', await readerA.read()); //=> {value: undefined, done: true}
// Read streamB in a loop. This is the more common way
// to read data from the stream.
const readerB = streamB.getReader();
while (true) {
const result = await readerB.read();
if (result.done) break;
console.log('[B]', result);
}
Streaming byte yang dapat dibaca
Untuk streaming yang merepresentasikan byte, versi panjang dari streaming yang dapat dibaca disediakan untuk menangani byte secara efisien, khususnya dengan meminimalkan salinan. Dengan streaming byte, pembaca bring your own buffer (BYOB) dapat diperoleh. Implementasi default dapat memberikan berbagai output yang berbeda, seperti string atau buffer array dalam kasus WebSockets, sedangkan streaming byte menjamin output byte. Selain itu, pembaca BYOB memiliki manfaat stabilitas. Hal ini karena jika buffer terlepas, ada jaminan bahwa buffer tersebut tidak akan ditulis ke buffer yang sama dua kali, sehingga menghindari kondisi race. Pembaca BYOB dapat mengurangi jumlah waktu yang diperlukan browser untuk menjalankan pembersihan sampah memori, karena dapat menggunakan kembali buffer.
Membuat aliran byte yang dapat dibaca
Anda dapat membuat aliran byte yang dapat dibaca dengan meneruskan parameter type
tambahan ke
konstruktor ReadableStream()
.
new ReadableStream({ type: 'bytes' });
underlyingSource
Sumber dasar aliran byte yang dapat dibaca diberi ReadableByteStreamController
untuk
dimanipulasi. Metode ReadableByteStreamController.enqueue()
mengambil argumen chunk
yang nilainya
adalah ArrayBufferView
. Properti ReadableByteStreamController.byobRequest
menampilkan permintaan pull BYOB saat ini, atau null jika tidak ada. Terakhir, properti ReadableByteStreamController.desiredSize
akan menampilkan ukuran yang diinginkan untuk mengisi antrean internal aliran data yang dikontrol.
queuingStrategy
Argumen kedua, yang juga bersifat opsional, dari konstruktor ReadableStream()
adalah queuingStrategy
.
Ini adalah objek yang secara opsional menentukan strategi antrean untuk aliran data, yang memerlukan satu
parameter:
highWaterMark
: Jumlah byte non-negatif yang menunjukkan tanda air tinggi pada aliran menggunakan strategi antrean ini. Ini digunakan untuk menentukan tekanan balik, yang diwujudkan melalui propertiReadableByteStreamController.desiredSize
yang sesuai. Ini juga mengatur kapan metodepull()
sumber dasar dipanggil.
Metode getReader()
dan read()
Anda kemudian bisa mendapatkan akses ke ReadableStreamBYOBReader
dengan menetapkan parameter mode
yang sesuai:
ReadableStream.getReader({ mode: "byob" })
. Hal ini memungkinkan kontrol alokasi buffer yang lebih akurat untuk menghindari salinan. Untuk membaca dari aliran byte, Anda perlu memanggil
ReadableStreamBYOBReader.read(view)
, dengan view
sebagai
ArrayBufferView
.
Contoh kode aliran byte yang dapat dibaca
const reader = readableStream.getReader({ mode: "byob" });
let startingAB = new ArrayBuffer(1_024);
const buffer = await readInto(startingAB);
console.log("The first 1024 bytes, or less:", buffer);
async function readInto(buffer) {
let offset = 0;
while (offset < buffer.byteLength) {
const { value: view, done } =
await reader.read(new Uint8Array(buffer, offset, buffer.byteLength - offset));
buffer = view.buffer;
if (done) {
break;
}
offset += view.byteLength;
}
return buffer;
}
Fungsi berikut menampilkan aliran byte yang dapat dibaca yang memungkinkan pembacaan zero-copy yang efisien dari array yang dihasilkan secara acak. Alih-alih menggunakan ukuran potongan 1.024 yang telah ditentukan, fungsi ini mencoba mengisi buffer yang disediakan developer, sehingga memungkinkan kontrol penuh.
const DEFAULT_CHUNK_SIZE = 1_024;
function makeReadableByteStream() {
return new ReadableStream({
type: 'bytes',
pull(controller) {
// Even when the consumer is using the default reader,
// the auto-allocation feature allocates a buffer and
// passes it to us via `byobRequest`.
const view = controller.byobRequest.view;
view = crypto.getRandomValues(view);
controller.byobRequest.respond(view.byteLength);
},
autoAllocateChunkSize: DEFAULT_CHUNK_SIZE,
});
}
Mekanisme streaming yang dapat ditulis
Stream yang dapat ditulis adalah tujuan tempat Anda dapat menulis data, yang diwakili dalam JavaScript oleh objek WritableStream
. Fungsi ini
berfungsi sebagai abstraksi di bagian atas sink yang mendasarinya—sink I/O tingkat rendah tempat
data mentah ditulis.
Data ditulis ke streaming melalui writer, satu bagian per satu bagian. Sebuah potongan data dapat memiliki banyak bentuk, sama seperti potongan di pembaca. Anda dapat menggunakan kode apa pun yang diinginkan untuk menghasilkan potongan yang siap ditulis; penulis beserta kode terkait disebut produser.
Saat penulis dibuat dan mulai menulis ke aliran data (penulis aktif), penulis dianggap terkunci pada aliran tersebut. Hanya satu penulis yang dapat menulis ke aliran yang dapat ditulis dalam satu waktu. Jika ingin penulis lain mulai menulis ke streaming, Anda biasanya perlu melepaskannya, sebelum melampirkan penulis lain ke penulis tersebut.
Antrean internal melacak potongan yang telah ditulis ke aliran data, tetapi belum diproses oleh sink yang mendasarinya.
Strategi antrean adalah objek yang menentukan cara streaming seharusnya memberikan sinyal tekanan balik berdasarkan status antrean internalnya. Strategi antrean menetapkan ukuran ke setiap bagian, dan membandingkan ukuran total bagian dalam antrean dengan angka yang ditentukan, yang dikenal sebagai tanda air tinggi.
Konstruksi akhir disebut pengontrol. Setiap streaming yang dapat ditulis memiliki pengontrol terkait yang memungkinkan Anda mengontrol streaming (misalnya, untuk membatalkannya).
Membuat streaming yang dapat ditulis
Antarmuka WritableStream
Streams API menyediakan abstraksi standar untuk menulis data streaming ke tujuan, yang dikenal
sebagai sink. Objek ini dilengkapi dengan backpressure dan antrean bawaan. Anda akan membuat stream yang dapat ditulis dengan
memanggil konstruktornya
WritableStream()
.
Class ini memiliki parameter underlyingSink
opsional, yang mewakili sebuah objek
dengan metode dan properti yang menentukan perilaku instance aliran data yang dibuat.
underlyingSink
underlyingSink
dapat menyertakan metode opsional berikut yang ditentukan developer. Parameter controller
yang diteruskan ke beberapa metode adalah
WritableStreamDefaultController
.
start(controller)
: Metode ini langsung dipanggil saat objek dibuat. Konten metode ini harus ditujukan untuk mendapatkan akses ke sink yang mendasarinya. Jika proses ini dilakukan secara asinkron, proses ini dapat menampilkan promise untuk menandakan keberhasilan atau kegagalan.write(chunk, controller)
: Metode ini akan dipanggil saat potongan data baru (yang ditentukan dalam parameterchunk
) siap ditulis ke sink yang mendasarinya. Fungsi ini dapat menampilkan promise untuk menandakan keberhasilan atau kegagalan operasi tulis. Metode ini hanya akan dipanggil setelah penulisan sebelumnya berhasil, dan tidak pernah setelah streaming ditutup atau dibatalkan.close(controller)
: Metode ini akan dipanggil jika aplikasi menandakan bahwa aplikasi telah selesai menulis potongan ke aliran data. Konten harus melakukan apa pun yang diperlukan untuk menyelesaikan penulisan ke sink yang mendasarinya, dan melepaskan akses ke dalamnya. Jika proses ini asinkron, proses dapat menampilkan promise untuk menandakan keberhasilan atau kegagalan. Metode ini hanya akan dipanggil setelah semua operasi tulis yang diantrekan berhasil.abort(reason)
: Metode ini akan dipanggil jika aplikasi memberikan sinyal bahwa aplikasi ingin menutup aliran data secara tiba-tiba dan menempatkannya dalam status error. Tindakan ini dapat membersihkan resource yang ditahan, seperticlose()
, tetapiabort()
akan dipanggil meskipun operasi tulis diantrekan. Potongan tersebut akan dibuang. Jika proses ini asinkron, proses dapat menampilkan promise untuk menandakan keberhasilan atau kegagalan. Parameterreason
berisiDOMString
yang menjelaskan alasan aliran data dibatalkan.
const writableStream = new WritableStream({
start(controller) {
/* … */
},
write(chunk, controller) {
/* … */
},
close(controller) {
/* … */
},
abort(reason) {
/* … */
},
});
Antarmuka
WritableStreamDefaultController
dari Streams API mewakili pengontrol yang memungkinkan kontrol atas status WritableStream
selama penyiapan, saat lebih banyak potongan dikirimkan untuk ditulis, atau di akhir penulisan. Saat membuat
WritableStream
, sink yang mendasarinya diberi instance WritableStreamDefaultController
yang sesuai untuk dimanipulasi. WritableStreamDefaultController
hanya memiliki satu metode:
WritableStreamDefaultController.error()
,
yang menyebabkan error pada interaksi mendatang dengan aliran data yang terkait.
WritableStreamDefaultController
juga mendukung properti signal
yang menampilkan instance
AbortSignal
,
sehingga operasi WritableStream
dapat dihentikan jika diperlukan.
/* … */
write(chunk, controller) {
try {
// Try to do something dangerous with `chunk`.
} catch (error) {
controller.error(error.message);
}
},
/* … */
queuingStrategy
Argumen kedua, yang juga bersifat opsional, dari konstruktor WritableStream()
adalah queuingStrategy
.
Objek ini secara opsional menentukan strategi antrean untuk streaming, yang menggunakan dua
parameter:
highWaterMark
: Angka non-negatif yang menunjukkan tanda air tinggi pada aliran menggunakan strategi antrean ini.size(chunk)
: Fungsi yang menghitung dan menampilkan ukuran non-negatif terbatas dari nilai potongan yang diberikan. Hasilnya digunakan untuk menentukan tekanan balik, yang diwujudkan melalui propertiWritableStreamDefaultWriter.desiredSize
yang sesuai.
Metode getWriter()
dan write()
Untuk menulis ke aliran yang dapat ditulis, Anda memerlukan penulis, yang akan berupa
WritableStreamDefaultWriter
. Metode getWriter()
dari antarmuka WritableStream
akan menampilkan
instance baru WritableStreamDefaultWriter
dan mengunci aliran ke instance tersebut. Saat streaming dikunci, tidak ada penulis lain yang dapat diperoleh hingga penulis saat ini dirilis.
Metode write()
dari antarmuka WritableStreamDefaultWriter
menulis potongan data yang diteruskan ke WritableStream
dan sink yang mendasarinya, lalu menampilkan promise yang di-resolve untuk menunjukkan keberhasilan atau kegagalan operasi tulis. Perlu diperhatikan bahwa
arti "berhasil" bergantung pada sink yang mendasarinya; hal ini mungkin menunjukkan bahwa potongan telah diterima,
dan belum tentu bahwa potongan tersebut telah disimpan dengan aman ke tujuan akhirnya.
const writer = writableStream.getWriter();
const resultPromise = writer.write('The first chunk!');
Properti locked
Anda dapat memeriksa apakah stream yang dapat ditulis dikunci dengan mengakses
properti
WritableStream.locked
-nya.
const locked = writableStream.locked;
console.log(`The stream is ${locked ? 'indeed' : 'not'} locked.`);
Contoh kode streaming yang dapat ditulis
Contoh kode di bawah ini menunjukkan semua langkah yang dilakukan.
const writableStream = new WritableStream({
start(controller) {
console.log('[start]');
},
async write(chunk, controller) {
console.log('[write]', chunk);
// Wait for next write.
await new Promise((resolve) => setTimeout(() => {
document.body.textContent += chunk;
resolve();
}, 1_000));
},
close(controller) {
console.log('[close]');
},
abort(reason) {
console.log('[abort]', reason);
},
});
const writer = writableStream.getWriter();
const start = Date.now();
for (const char of 'abcdefghijklmnopqrstuvwxyz') {
// Wait to add to the write queue.
await writer.ready;
console.log('[ready]', Date.now() - start, 'ms');
// The Promise is resolved after the write finishes.
writer.write(char);
}
await writer.close();
Menambahkan stream yang dapat dibaca ke stream yang dapat ditulis
Stream yang dapat dibaca dapat diarahkan ke stream yang dapat ditulis melalui metode
pipeTo()
stream yang dapat dibaca.
ReadableStream.pipeTo()
menyalurkan ReadableStream
saat ini ke WritableStream
yang ditentukan dan menampilkan promise yang terpenuhi saat proses pemipaan berhasil diselesaikan, atau menolak jika terjadi error.
const readableStream = new ReadableStream({
start(controller) {
// Called by constructor.
console.log('[start readable]');
controller.enqueue('a');
controller.enqueue('b');
controller.enqueue('c');
},
pull(controller) {
// Called when controller's queue is empty.
console.log('[pull]');
controller.enqueue('d');
controller.close();
},
cancel(reason) {
// Called when the stream is canceled.
console.log('[cancel]', reason);
},
});
const writableStream = new WritableStream({
start(controller) {
// Called by constructor
console.log('[start writable]');
},
async write(chunk, controller) {
// Called upon writer.write()
console.log('[write]', chunk);
// Wait for next write.
await new Promise((resolve) => setTimeout(() => {
document.body.textContent += chunk;
resolve();
}, 1_000));
},
close(controller) {
console.log('[close]');
},
abort(reason) {
console.log('[abort]', reason);
},
});
await readableStream.pipeTo(writableStream);
console.log('[finished]');
Membuat stream transformasi
Antarmuka TransformStream
Streams API mewakili sekumpulan data yang dapat ditransformasi. Anda
akan membuat aliran transformasi dengan memanggil TransformStream()
konstruktornya, yang membuat dan menampilkan
objek aliran transformasi dari pengendali yang diberikan. Konstruktor TransformStream()
menerima objek JavaScript opsional yang mewakili transformer
sebagai argumen pertamanya. Objek tersebut dapat
berisi salah satu metode berikut:
transformer
start(controller)
: Metode ini langsung dipanggil saat objek dibuat. Biasanya ini digunakan untuk mengantrekan potongan awalan, menggunakancontroller.enqueue()
. Potongan tersebut akan dibaca dari sisi yang dapat dibaca, tetapi tidak bergantung pada penulisan ke sisi yang dapat ditulis. Jika proses awal ini asinkron, misalnya karena memerlukan upaya untuk memperoleh potongan awalan, fungsi ini dapat menampilkan promise untuk menandakan keberhasilan atau kegagalan; promise yang ditolak akan melakukan error pada streaming. Setiap pengecualian yang ditampilkan akan ditampilkan ulang oleh konstruktorTransformStream()
.transform(chunk, controller)
: Metode ini dipanggil saat potongan baru yang awalnya ditulis ke sisi yang dapat ditulis siap untuk diubah. Implementasi streaming menjamin bahwa fungsi ini hanya akan dipanggil setelah transformasi sebelumnya berhasil, dan belum pernah sebelumstart()
selesai atau setelahflush()
dipanggil. Fungsi ini melakukan tugas transformasi yang sebenarnya dari aliran transformasi. Dapat mengantrekan hasil menggunakancontroller.enqueue()
. Tindakan ini mengizinkan satu bagian yang ditulis ke sisi yang dapat ditulis menghasilkan nol atau beberapa potongan pada sisi yang dapat dibaca, bergantung pada frekuensicontroller.enqueue()
dipanggil. Jika proses transformasi bersifat asinkron, fungsi ini dapat menampilkan promise untuk menandakan keberhasilan atau kegagalan transformasi. Promise yang ditolak akan menimbulkan error pada sisi aliran transformasi yang dapat dibaca dan ditulis. Jika tidak ada metodetransform()
yang diberikan, transformasi identitas akan digunakan, yang mengantrekan potongan yang tidak berubah dari sisi yang dapat ditulis ke sisi yang dapat dibaca.flush(controller)
: Metode ini dipanggil setelah semua bagian yang ditulis ke sisi yang dapat ditulis diubah dengan berhasil melewatitransform()
, dan sisi yang dapat ditulis akan ditutup. Biasanya digunakan untuk mengantrekan potongan akhiran ke sisi yang dapat dibaca, sebelum itu juga ditutup. Jika proses penghapusan bersifat asinkron, fungsi dapat menampilkan promise untuk menandakan keberhasilan atau kegagalan; hasilnya akan disampaikan kepada pemanggilstream.writable.write()
. Selain itu, promise yang ditolak akan menimbulkan error pada sisi streaming yang dapat dibaca dan ditulis. Menampilkan pengecualian akan diperlakukan sama seperti menampilkan promise yang ditolak.
const transformStream = new TransformStream({
start(controller) {
/* … */
},
transform(chunk, controller) {
/* … */
},
flush(controller) {
/* … */
},
});
Strategi antrean writableStrategy
dan readableStrategy
Parameter opsional kedua dan ketiga dari konstruktor TransformStream()
adalah strategi antrean writableStrategy
dan readableStrategy
opsional. Semuanya didefinisikan seperti yang diuraikan dalam
bagian aliran yang dapat dibaca dan
dapat ditulis.
Mentransformasi contoh kode streaming
Contoh kode berikut menunjukkan cara kerja streaming transformasi sederhana.
// Note that `TextEncoderStream` and `TextDecoderStream` exist now.
// This example shows how you would have done it before.
const textEncoderStream = new TransformStream({
transform(chunk, controller) {
console.log('[transform]', chunk);
controller.enqueue(new TextEncoder().encode(chunk));
},
flush(controller) {
console.log('[flush]');
controller.terminate();
},
});
(async () => {
const readStream = textEncoderStream.readable;
const writeStream = textEncoderStream.writable;
const writer = writeStream.getWriter();
for (const char of 'abc') {
writer.write(char);
}
writer.close();
const reader = readStream.getReader();
for (let result = await reader.read(); !result.done; result = await reader.read()) {
console.log('[value]', result.value);
}
})();
Menambahkan stream yang dapat dibaca melalui stream transformasi
Metode pipeThrough()
dari antarmuka ReadableStream
menyediakan cara berantai untuk menyalurkan aliran saat ini
melalui aliran transformasi atau pasangan lainnya yang dapat ditulis/dibaca. Penyertaan aliran data umumnya akan menguncinya
selama durasi pipe, sehingga pembaca lain tidak akan menguncinya.
const transformStream = new TransformStream({
transform(chunk, controller) {
console.log('[transform]', chunk);
controller.enqueue(new TextEncoder().encode(chunk));
},
flush(controller) {
console.log('[flush]');
controller.terminate();
},
});
const readableStream = new ReadableStream({
start(controller) {
// called by constructor
console.log('[start]');
controller.enqueue('a');
controller.enqueue('b');
controller.enqueue('c');
},
pull(controller) {
// called read when controller's queue is empty
console.log('[pull]');
controller.enqueue('d');
controller.close(); // or controller.error();
},
cancel(reason) {
// called when rs.cancel(reason)
console.log('[cancel]', reason);
},
});
(async () => {
const reader = readableStream.pipeThrough(transformStream).getReader();
for (let result = await reader.read(); !result.done; result = await reader.read()) {
console.log('[value]', result.value);
}
})();
Contoh kode berikutnya (sedikit rumit) menunjukkan cara menerapkan versi "teriakan" fetch()
yang menulis semua teks dalam huruf besar dengan menggunakan promise respons yang ditampilkan
sebagai stream
dan menuliskan huruf besar per bagian. Keuntungan dari pendekatan ini adalah Anda tidak perlu menunggu seluruh dokumen didownload, sehingga dapat membuat perbedaan besar saat menangani file berukuran besar.
function upperCaseStream() {
return new TransformStream({
transform(chunk, controller) {
controller.enqueue(chunk.toUpperCase());
},
});
}
function appendToDOMStream(el) {
return new WritableStream({
write(chunk) {
el.append(chunk);
}
});
}
fetch('./lorem-ipsum.txt').then((response) =>
response.body
.pipeThrough(new TextDecoderStream())
.pipeThrough(upperCaseStream())
.pipeTo(appendToDOMStream(document.body))
);
Demo
Demo di bawah ini menunjukkan cara kerja stream yang dapat dibaca, ditulis, dan mentransformasikan. Contoh ini juga menyertakan contoh
rantai pipe pipeThrough()
dan pipeTo()
, serta menunjukkan tee()
. Jika ingin, Anda dapat menjalankan demo di jendelanya sendiri atau melihat kode sumber.
Streaming bermanfaat tersedia di browser
Ada sejumlah streaming berguna yang dibangun langsung ke dalam browser. Anda dapat dengan mudah membuat
ReadableStream
dari blob. Metode stream() antarmuka Blob
menampilkan ReadableStream
yang, setelah dibaca, menampilkan data yang terdapat dalam blob. Ingat juga bahwa objek File
adalah jenis Blob
tertentu, dan dapat digunakan dalam konteks apa pun yang dapat dilakukan oleh blob.
const readableStream = new Blob(['hello world'], { type: 'text/plain' }).stream();
Varian streaming TextDecoder.decode()
dan TextEncoder.encode()
masing-masing disebut
TextDecoderStream
dan
TextEncoderStream
.
const response = await fetch('https://streams.spec.whatwg.org/');
const decodedStream = response.body.pipeThrough(new TextDecoderStream());
Mengompresi atau mendekompresi file dapat dilakukan dengan mudah menggunakan
aliran transformasi
CompressionStream
dan
DecompressionStream
. Contoh kode di bawah ini menunjukkan cara mendownload spesifikasi Streams, mengompresi (gzip) langsung di browser, dan menulis file yang dikompresi langsung ke disk.
const response = await fetch('https://streams.spec.whatwg.org/');
const readableStream = response.body;
const compressedStream = readableStream.pipeThrough(new CompressionStream('gzip'));
const fileHandle = await showSaveFilePicker();
const writableStream = await fileHandle.createWritable();
compressedStream.pipeTo(writableStream);
FileSystemWritableFileStream
File System Access API
dan streaming permintaan fetch()
eksperimental adalah
contoh streaming yang dapat ditulis di mana pun.
Serial API memanfaatkan banyak streaming yang dapat dibaca dan ditulis.
// Prompt user to select any serial port.
const port = await navigator.serial.requestPort();
// Wait for the serial port to open.
await port.open({ baudRate: 9_600 });
const reader = port.readable.getReader();
// Listen to data coming from the serial device.
while (true) {
const { value, done } = await reader.read();
if (done) {
// Allow the serial port to be closed later.
reader.releaseLock();
break;
}
// value is a Uint8Array.
console.log(value);
}
// Write to the serial port.
const writer = port.writable.getWriter();
const data = new Uint8Array([104, 101, 108, 108, 111]); // hello
await writer.write(data);
// Allow the serial port to be closed later.
writer.releaseLock();
Terakhir, WebSocketStream
API mengintegrasikan stream dengan WebSocket API.
const wss = new WebSocketStream(WSS_URL);
const { readable, writable } = await wss.connection;
const reader = readable.getReader();
const writer = writable.getWriter();
while (true) {
const { value, done } = await reader.read();
if (done) {
break;
}
const result = await process(value);
await writer.write(result);
}
Referensi yang berguna
- Spesifikasi stream
- Demo pengiring
- Polyfill stream
- 2016—tahun aliran data web
- Ierator dan generator asinkron
- Visualizer Streaming
Ucapan terima kasih
Artikel ini ditinjau oleh Jake Archibald, François Beaufort, Sam Dutton, Mattias Buelens, Surma, Joe Medley, dan Adam Rice. Postingan blog Jake Archibald telah banyak membantu saya dalam memahami aliran data. Beberapa contoh kode terinspirasi oleh eksplorasi @bellbind pengguna GitHub dan bagian dari prosa yang banyak dibangun di MDN Web Docs on Streams. Penulis Streams Standard telah melakukan pekerjaan yang luar biasa dalam menulis spesifikasi ini. Banner besar oleh Ryan Lara di Unsplash.