Streams 权威指南
(给大前端技术之路加星标,提升前端技能)
Streams API 让你可以用 JavaScript 以编程的方式处理网络接收的,或通过任何本地方式创建的数据流。流式处理包括将要接收、发送或转换的资源分解为小块(chunk),然后逐块处理这些小块。
尽管浏览器在在接收 HTML 或视频等资源时就在做流处理了,但在 2015 年推出 Streams API 之前,JavaScript 无法使用这些功能.
从技术上讲,XMLHttpRequest是可以实现流媒体的,但 实在不好用[1].
以前,如果要处理某种资源(视频或文本文件等),必须下载整个文件,等待将其反序列化为合适的格式,然后进行处理。随着JavaScript可以使用流,这一切都发生了变化。现在,原始数据一旦可用,就可以使用JavaScript进行逐步处理,而不需要生成buffer、string或blob。这将解锁许多用例,比如:
视频特效: 用管道(pipe)把一个可读的视频流连接到一个可以添加实时特效的转换流 数据解压/加压缩: 用管道(pipe)把一个文件流连接到一个可以有选择地(解)压缩的转换流。 图片解码: 用管道把一个HTTP响应流连接到一个可以将字节解码为位图数据的转换流,然后在连接到一个可以将位图转换为PNG转换流。如果是安装在service worker的“fetch”中,这允许你透明地填充新的图像格式,如AVIF。
核心概念
在详细介绍各种类型的流之前,让我先介绍一些核心概念。
Chunks
Chunks 是写入流或从流中读取的单个数据。它可以是任何类型;流甚至可以包含很多不同类型的Chunk。大多数情况下,对一个流,chunk不是最原子的数据单元。例如,字节流可能包含由16 KiBUint8Array
单元组成的块,而不是单个字节。
可读流
可读流表示一个可以从中读取的数据源。换句话说,数据来自可读流。具体地说,可读流是ReadableStream类的实例。
转换流
转换流由一对流组成:可写流(称为其可写端)和可读流(称为其可读端)。一个真实的比喻是,一个即时从一种语言翻译到另一种语言的同声传译员。对于转换流来说就是,向可写侧写入导致新数据可用于从可读侧读取。具体地说,任何具有可写属性和可读属性的对象都可以用作转换流。然而,标准的TransformStream类使得创建这样一对象变得更容易。
管道链(Pipe chains)
流主要通过管道相互连接来使用。可读流可以使用其pipeTo()
方法直接通过管道传输到可写流,也可以使用可读流的pipeThrough()
方法通过一个或多个转换流进行管道传输。以这种方式连接在一起的一组流称为管道链。
背压(Backpressure)
一旦一个管道链被构建,它将传播关于Chunks应该以多快的速度流过的信号。如果链中的任何一步还不能接收块,它就会通过管道链向后传播一个信号,直到最终原始源被告知停止这么快地生成chunks。这种normalizing flow的过程称为背压。
Teeing
可读流可以使用tee()
方法进行tee操作(以大写“T”的形状命名,一个入口两个出口)。这将锁定流,使其不再直接可用;但是,它将创建两个新流,称为分支,可以独立使用。Teeing也很重要,因为流不能倒带或重新启动,稍后将详细介绍。
可读流的机制
一个可读流是一个数据源,在JavaScript中由一个从底层源流出的`ReadableStream`[2]对象表示。ReadableStream
构造函数从给定的handlers中创建并返回一个可读的流对象。有两种类型的底层源:
推送(push)源当你访问它是,会不断的向你推送数据,你可以开始、暂停或取消对流的访问。示例包括实时视频流、server-sent events或WebSockets。 拉取(pull)源 要求你在连接到它们时显式地请求数据。例子包括通过 fetch()
或XMLHttpRequest
调用的HTTP操作。
流数据以称为chunk的小块顺序读取。放置在流中的块称为进入队列。这意味着它们正在队列中等待被读取。内部队列会跟踪尚未读取的数据块。
队列策略是一个能流内部队列的状态,决定流应该如何发出Backpressure信号的对象。排队策略为每个chunk分配一个大小,并将队列中所有chunk的总大小与一个指定的数字(称为高水位标记high water mark)进行比较。
流中的块由reader读取。这个reader一次检索一个chunk,允许你对数据进行各种类型的操作。reader加上与之相伴的处理逻辑即被称为消费者(consumer)。
还有一个构件叫做控制器(controller)。每个可读流都有一个关联的控制器,顾名思义,它允许你控制流。
一次只能有一个reader读取流;当一个reader被创建并开始读取一个流(也就是说,成为一个活动的reader)时,它就被锁定在这个流上。如果你想让另一个reader接管你的流,你通常需要先释放第一个reader(尽管你可以tee流)。
创建一个可读流
你可以通过调用 `ReadableStream()`[3]构造函数创建一个可读流。这个构造函数有一个可选参数 underlyingSource
, 它表示一个流实例将如何表现的方法和属性。
The underlyingSource
可以使用以下的可选方法,由开发人员自定义:
start(controller)
: 在对象被构造的时立刻被调用。该方法可以访问流的源,并执行设置流功能所需的任何其他操作。如果这个过程是异步完成的,该方法可以返回一个promise来表示成功或失败.controller
参数是一个`ReadableStreamDefaultController`[4].pull(controller)
: 可以用于在获取chunks时控制流。只要流的内部chunks队列没有满,它就会被反复调用,直到队列达到最高水位。如果调用pull()
的结果是一个promise,那么pull()
将不会被再次调用,直到该promise fulfills。如果promise被reject,流就会出错。cancel(reason)
: 当流的消费者取消流时被调用.
const readableStream = new ReadableStream({
start(controller) {
/* … */
},
pull(controller) {
/* … */
},
cancel(reason) {
/* … */
},
});
ReadableStreamDefaultController
支持下面的方法:
`ReadableStreamDefaultController.close()`[5] 关闭相关的流。 `ReadableStreamDefaultController.enqueue()`[6] 在关联的流中入队一个给定的块 `ReadableStreamDefaultController.error()`[7]让之后与相关流的任何交互出错
/* … */
start(controller) {
controller.enqueue('The first chunk!');
},
/* … */
The queuingStrategy
queuingStrategy
是`ReadableStream()`[8]构造函数的第二个参数,也是可以选的。它是一个对象,可选地定义流的队列策略,它有两个属性:
highWaterMark
: 一个非负数,表示使用此队列策略的流的高水位标记。size(chunk)
: 计算并返回给定chunk大小的函数。结果用于确定backpressure,通过ReadableStreamDefaultController.desiredSize属性显示。它还控制何时调用underlying source的pull()
方法。
const readableStream = new ReadableStream({
/* … */
},
{
highWaterMark: 10,
size(chunk) {
return chunk.length;
},
},
);
你可以自定义
queuingStrategy
, 或者使用 `ByteLengthQueuingStrategy`[9] 或 `CountQueuingStrategy`[10] 的实例. 如果没有queuingStrategy
提供,默认使用highWaterMark
为1
的CountQueuingStrategy
.
getReader()
和 getReader()
方法
要从可读流中读取,你需要一个reader,它将是一个reader`ReadableStreamDefaultReader`[11]。ReadableStream
接口的getReader()
方法方法创建一个reader并将流锁定到它。当流被锁定时,在释放之前不能获取其他reader。
ReadableStreamDefaultReader
的`read()`[12]方法返回一个提供访问内部队列中下一个chunk的promise。它会根据stream的状态 fulfill或者reject。会有以下几种可能:
chunk 可有,promise 会成功返回一个对象: { value: chunk, done: false }
.如果流已经关闭, promise 会成功返回一个对象: { value: undefined, done: true }
.如果流出错了, promise 会以一个相关的error被reject。
const reader = readableStream.getReader();
while (true) {
const { done, value } = await reader.read();
if (done) {
console.log('The stream is done.');
break;
}
console.log('Just read a chunk:', value);
}
locked
属性
你可以访问`ReadableStream.locked`[13] 属性来检查可读流是否被锁了。
const locked = readableStream.locked;
console.log(`The stream is ${locked ? 'indeed' : 'not'} locked.`);
可读流代码示例
下面的代码示例展示了所有的实际操作步骤。首先创建一个ReadableStream
,其underlyingSource
参数(即TimestampSource
类)中定义了一个start()方法。这个方法通知controller
在十秒内每秒钟enqueue()一个时间戳(到队列中)。最后,通知控制器关闭流。你可以用getReader()
创建一个reader,然后一直调用read()
直到流已经done
。
class TimestampSource {
#interval
start(controller) {
this.#interval = setInterval(() => {
const string = new Date().toLocaleTimeString();
// Add the string to the stream.
controller.enqueue(string);
console.log(`Enqueued ${string}`);
}, 1_000);
setTimeout(() => {
clearInterval(this.#interval);
// Close the stream after 10s.
controller.close();
}, 10_000);
}
cancel() {
// This is called if the reader cancels.
clearInterval(this.#interval);
}
}
const stream = new ReadableStream(new TimestampSource());
async function concatStringStream(stream) {
let result = '';
const reader = stream.getReader();
while (true) {
// The `read()` method returns a promise that
// resolves when a value has been received.
const { done, value } = await reader.read();
// Result objects contain two properties:
// `done` - `true` if the stream has already given you all its data.
// `value` - Some data. Always `undefined` when `done` is `true`.
if (done) return result;
result += value;
console.log(`Read ${result.length} characters so far`);
console.log(`Most recently read chunk: ${value}`);
}
}
concatStringStream(stream).then((result) => console.log('Stream complete', result));
异步迭代
在一个循环迭代中检测每个read()
的done
并不太方便。幸运的是讲有一个更好的方法做这:异步迭代
for await (const chunk of stream) {
console.log(chunk);
}
使用异步迭代的一种变通方法是使用helper函数实现特定功能。下面的代码就能让你使用这个特性:
function streamAsyncIterator(stream) {
// Get a lock on the stream:
const reader = stream.getReader();
return {
next() {
// Stream reads already resolve with {done, value}, so
// we can just call read:
return reader.read();
},
return() {
// Release the lock if the iterator terminates.
reader.releaseLock();
return {};
},
// for-await calls this on whatever it's passed, so
// iterators tend to return themselves.
[Symbol.asyncIterator]( "Symbol.asyncIterator") {
return this;
},
};
}
async function example() {
const response = await fetch(url);
for await (const chunk of streamAsyncIterator(response.body)) {
console.log(chunk);
}
}
Tee一个可读流
ReadableStream
接口的`tee()`[14] 方法可以对当前流进行tee操作,返回一个长度为2的数组,其中表示新分支的两个ReadableStream
实例。这允许两个reader同时读取一个流。比如,在service worker中可以这么用,你想从服务器获取响应并将其流发送到浏览器,但也可以将其流发送到service worker缓存。由于response 的body不能被重复消费,因此需要两个副本来完成此操作。要取消流,你得要取消两个产生的分支。Tee操作在此期间会产生一个锁,防止其他reader锁定它。
const readableStream = new ReadableStream({
start(controller) {
// Called by constructor.
console.log('[start]');
controller.enqueue('a');
controller.enqueue('b');
controller.enqueue('c');
},
pull(controller) {
// Called `read()` when the controller's queue is empty.
console.log('[pull]');
controller.enqueue('d');
controller.close();
},
cancel(reason) {
// Called when the stream is canceled.
console.log('[cancel]', reason);
},
});
// Create two `ReadableStream`s.
const [streamA, streamB] = readableStream.tee();
// Read streamA iteratively one by one. Typically, you
// would not do it this way, but you certainly can.
const readerA = streamA.getReader();
console.log('[A]', await readerA.read()); //=> {value: "a", done: false}
console.log('[A]', await readerA.read()); //=> {value: "b", done: false}
console.log('[A]', await readerA.read()); //=> {value: "c", done: false}
console.log('[A]', await readerA.read()); //=> {value: "d", done: false}
console.log('[A]', await readerA.read()); //=> {value: undefined, done: true}
// Read streamB in a loop. This is the more common way
// to read data from the stream.
const readerB = streamB.getReader();
while (true) {
const result = await readerB.read();
if (result.done) break;
console.log('[B]', result);
}
创建一个可读字节流
在ReadableStream()
构造函数中传递一个type
参数既可以创建一个字节流
new ReadableStream({ type: 'bytes' });
The underlyingSource
可读字节流的底层源被給予一个ReadableByteStreamController
用来操作。ReadableByteStreamController.enqueue()
方法接受一个ArrayBufferView
类的chunk
作为参数。ReadableByteStreamController.byobRequest
返回当前的BYOB("bring your own buffer") pull 请求,如果没有则为null。最后,ReadableByteStreamController.desiredSize
属性返回填充stream内部队列的期望大小。
The queuingStrategy
ReadableStream()
构造函数的第二个同样是可选的参数是queuingStrategy
。它是一个对象,可选地定义流的排队策略,它有一个参数:
highWaterMark
:个非负的字节数,表示使用此排队策略的流的高水位标记。这用于判定backpressure,通过ReadableByteStreamController.desiredSize
属性显示。它还控制何时调用底层源的pull()方法
与其他流类型的排队策略不同,可读字节流的排队策略没有
size(chunk)
函数。每个块的大小总是由其byteLength
属性决定的。
如果没有提供queuingStrategy,则默认使用highWaterMark为0的策略。
getReader()
and read()
你可以通过相应地设置mode参数来访问ReadableStreamBYOBReader
: ReadableStream.getReader({mode: "byob"})
。这允许对缓冲区分配进行更精确的控制,以避免复制。要从字节流中读取,你需要调用ReadableStreamBYOBReader.read(view)
,其中view是一个ArrayBufferView
。
可读字节流示例
const reader = readableStream.getReader({ mode: "byob" });
let startingAB = new ArrayBuffer(1_024);
const buffer = await readInto(startingAB);
console.log("The first 1024 bytes, or less:", buffer);
async function readInto(buffer) {
let offset = 0;
while (offset < buffer.byteLength) {
const { value: view, done } =
await reader.read(new Uint8Array(buffer, offset, buffer.byteLength - offset));
buffer = view.buffer;
if (done) {
break;
}
offset += view.byteLength;
}
return buffer;
}
下面的函数返回可读的字节流,它允许对随机生成的数组进行有效的零拷贝读取。它没有使用预先确定的1024 chunk 大小,而是尝试填充开发人员提供的buffer,从而允许完全的控制。
const DEFAULT_CHUNK_SIZE = 1_024;
function makeReadableByteStream() {
return new ReadableStream({
type: 'bytes',
pull(controller) {
// Even when the consumer is using the default reader,
// the auto-allocation feature allocates a buffer and
// passes it to us via `byobRequest`.
const view = controller.byobRequest.view;
view = crypto.getRandomValues(view);
controller.byobRequest.respond(view.byteLength);
},
autoAllocateChunkSize: DEFAULT_CHUNK_SIZE,
});
}
可写流的机制
可写流是一个可以写入数据的目的地,在JavaScript中由一个`WritableStream`[15] 对象表示。其作为一个写入原始数据的底层IO sink的抽象。
数据通过writer一次一个chunk写入流。chunk可以有多种形式,就像reader中的一样。你可以使用任何逻辑来生成准备编写的chunk。writer加上相关逻辑称为生产者(producer)。
当一个writer被创建并开始写入流(即一个活动的writer)时,它就被锁定在流上。一次只能有一个writer向可写流写入数据。如果你希望另一个writer开始写入流,通常需要先释放当前的writer。
一个内部队列跟踪已写入流但底层接收器尚未处理的chunk。
最后一个组件是控制器(controller),每个可写流都有一个关联的控制器,允许你控制流(例如,中止流)。
创建一个可写流
Streams API的 WritableStream
接口提供了将流数据写入目标(称为sink)的标准抽象。这个对象中内置背压与队列。你可以通过调用 WritableStream()
来创建一个写流。它有一个可选的underlyingSink参数,一个定义流实例将如何表现的对象。
The underlyingSink
underlyingSink
包括下面几个可选的、用户自定义的方法。`WritableStreamDefaultController`[16]类型的controller
会被作为参数传入部分方法中。
start(controller)
: 在对象被构造时立即调用此方法。该方法的内容应该以访问底层sink为目标。如果这个过程是异步完成的,它可以返回一个promise来表示成功或失败。write(chunk, controller)
: 这个方法会在有新的chunk可被写入底层sink时被调用。它可以返回一个promise来表示写操作的成功或失败。此方法只在先前的写操作成功后才被调用,并且不会在流关闭或中止后调用。close(controller)
: 这个方法会在已经完成了流的写入后被调用。这个方法应该完成对底层sink写操作收尾工作,并释放对它的访问。如果这个进程是异步的,它可以返回一个promise来表示成功或失败。这个方法只有在队列中所有写操作都成功之后才会被调用。abort(reason)
: 应用程序希望突然关闭流并将其置于出错状态时,该方法会被调用。它可以清理任何持有的资源,这很像close(),但是队列中还有写操作排队,abort()也会被调用。这些队列中的chunks就会被扔掉。如果这个进程是异步的,它可以返回一个promise来表示成功或失败。reason参数包含一个DOMString
,描述为什么流被中止。
const writableStream = new WritableStream({
start(controller) {
/* … */
},
write(chunk, controller) {
/* … */
},
close(controller) {
/* … */
},
abort(reason) {
/* … */
},
});
Streams API的 WritableStreamDefaultController
接口表示一个在设置期间、chunk被提交写入期间,或在写入结束期间控制WritableStream
的状态的控制器。在构造WritableStream
时,会给底层sink一个相应的WritableStreamDefaultController
实例以进行操作。WritableStreamDefaultController
只有一个方法 WritableStreamDefaultController.error()
会让以后的流操作出错:
/* … */
write(chunk, controller) {
try {
// Try to do something dangerous with `chunk`.
} catch (error) {
controller.error(error.message);
}
},
/* … */
The queuingStrategy
WritableStream()
构造函数第二个参数同样也是可选的。它是一个对象,可选地定义流的排队策略,它有两个参数:
highWaterMark
:一个非负数,表示使用此排队策略的流的高水位标记。size(chunk)
: 一个计算并返回一个给定chunk的大小。这个结果会被用来判断背压(backpressure),通过的WritableStreamDefaultWriter.desiredSize
属性暴露。
你可以自定义一个
queuingStrategy
, 或者使用 `ByteLengthQueuingStrategy`[17] 或 `CountQueuingStrategy`[18] 的实例. 如果没有queuingStrategy
提供,默认使用highWaterMark
为1
的CountQueuingStrategy
getWriter()
和write()
方法
要写入可写流,你需要一个writer,它将是一个WritableStreamDefaultWriter
。WritableStream
接口的getWriter()方法返回一个WritableStreamDefaultWriter的新实例,并锁定该实例的流。当流被锁定时,在释放当前的writer之前,不能获取其他writer。
`WritableStreamDefaultWriter`[19] 接口的`write()`[20]方法将传递的数据块写入WritableStream及其底层sink,然后返回一个promise,该promise将解析以指示写操作的成功或失败。请注意,“成功”的含义取决于底层sink,它可能只表示chunk已被接受,而不一定是chunk已安全保存到最终目的地。
const writer = writableStream.getWriter();
const resultPromise = writer.write('The first chunk!');
locked
属性
你可以通过访问可写流的`WritableStream.locked`[21]来检查可写流是否被锁定。
const locked = writableStream.locked;
console.log(`The stream is ${locked ? 'indeed' : 'not'} locked.`);
可写流代码示例
这个代码暂时了所有的实际操作。
const writableStream = new WritableStream({
start(controller) {
console.log('[start]');
},
async write(chunk, controller) {
console.log('[write]', chunk);
// Wait for next write.
await new Promise((resolve) => setTimeout(() => {
document.body.textContent += chunk;
resolve();
}, 1_000));
},
close(controller) {
console.log('[close]');
},
abort(reason) {
console.log('[abort]', reason);
},
});
const writer = writableStream.getWriter();
const start = Date.now();
for (const char of 'abcdefghijklmnopqrstuvwxyz') {
// Wait to add to the write queue.
await writer.ready;
console.log('[ready]', Date.now() - start, 'ms');
// The Promise is resolved after the write finishes.
writer.write(char);
}
await writer.close();
Pipe可读流到可写流
通过`pipeTo()`[22]可以将可读流pipe到可写流。ReadableStream.pipeTo()
方法把当前的``ReadableStreampipe到一个指定的
WritableStream`并且返回一个promise来表明成功或者失败。
const readableStream = new ReadableStream({
start(controller) {
// Called by constructor.
console.log('[start readable]');
controller.enqueue('a');
controller.enqueue('b');
controller.enqueue('c');
},
pull(controller) {
// Called when controller's queue is empty.
console.log('[pull]');
controller.enqueue('d');
controller.close();
},
cancel(reason) {
// Called when the stream is canceled.
console.log('[cancel]', reason);
},
});
const writableStream = new WritableStream({
start(controller) {
// Called by constructor
console.log('[start writable]');
},
async write(chunk, controller) {
// Called upon writer.write()
console.log('[write]', chunk);
// Wait for next write.
await new Promise((resolve) => setTimeout(() => {
document.body.textContent += chunk;
resolve();
}, 1_000));
},
close(controller) {
console.log('[close]');
},
abort(reason) {
console.log('[abort]', reason);
},
});
await readableStream.pipeTo(writableStream);
console.log('[finished]');
创建一个转换流
Streams API的TransformStream
接口表示一组可转换的数据。通过调用转换流的构造函数TransformStream()
来创建转换流,该构造函数从给定的handler中创建并返回转换流对象。TransformStream()
接受一个可选的transformer
对象。该对象可以包含以下任何一种方法:
The transformer
start(controller)
: 在构造对象时立即调用此方法。通常使用controller.enqueue()
来入队一些前缀chunk。这些块将从读端读取,但不依赖于对写端的任何写入。如果这个初始过程是异步的,例如,因为需要花费一些步骤来获取前缀chunk,函数可以返回一个promise来表示成功或失败。任何抛出的异常都将由TransformStream()
构造函数重新抛出。transform(chunk, controller)
:当新的chunk准备转换时,该方法会被调用。流的实现保证这个函数只会在前面的转换成功之后被调用,并且不会在start()完成之前或flush()被调用之后被调用。这个函数执行转换流的实际转换工作。它可以使用controller.enqueue()把结果进行入队。这就允许了从写端单个chunk可能会在读端参数0个或者多个chunk,取决于你掉了``controller.enqueue()多少次。如果转换的过程是异步的,这个函数可以返回一个promise来表示转换的成功或失败。被reject的promise会让转换流的可读和可写端都出错。如果没有提供
transform()`方法,则使用identity转换,chunk不变的从可写端队列到可读端。flush(controller)
:这个方法会在所有的chunk都成功的通过transform()
方法处理后,写端也准备关闭时被调用。通常,这被用于在读端关闭之前将队列后缀chunks插入到可读端。如果flush过程是异步的,函数可以返回一个promise来表示成功或失败;其结果会被通知到stream.writable.write()
的调用者。被reject的promise会让转换流的可读和可写端都出错。抛出异常被视为同等于返回被拒绝的promise。
const transformStream = new TransformStream({
start(controller) {
/* … */
},
transform(chunk, controller) {
/* … */
},
flush(controller) {
/* … */
},
});
writableStrategy
和readableStrategy
TransformStream()
构造函数的第二个和第三个参数也都是可选的,分别是writableStrategy
和readableStrategy
。这2个定于已经在可读流与可写流的章节中描述过了。
转换流代码示例
// Note that `TextEncoderStream` and `TextDecoderStream` exist now.
// This example shows how you would have done it before.
const textEncoderStream = new TransformStream({
transform(chunk, controller) {
console.log('[transform]', chunk);
controller.enqueue(new TextEncoder().encode(chunk));
},
flush(controller) {
console.log('[flush]');
controller.terminate();
},
});
(async () => {
const readStream = textEncoderStream.readable;
const writeStream = textEncoderStream.writable;
const writer = writeStream.getWriter();
for (const char of 'abc') {
writer.write(char);
}
writer.close();
const reader = readStream.getReader();
for (let result = await reader.read(); !result.done; result = await reader.read()) {
console.log('[value]', result.value);
}
})();
Pipe可读流到转换流
ReadableStream
接口的 `pipeThrough()`[23] 方法提供了一种链式的pipe可读流到转换流的功能。pipe操作通常会在管道运行期间锁定它,以防止其他reader锁定流。
const transformStream = new TransformStream({
transform(chunk, controller) {
console.log('[transform]', chunk);
controller.enqueue(new TextEncoder().encode(chunk));
},
flush(controller) {
console.log('[flush]');
controller.terminate();
},
});
const readableStream = new ReadableStream({
start(controller) {
// called by constructor
console.log('[start]');
controller.enqueue('a');
controller.enqueue('b');
controller.enqueue('c');
},
pull(controller) {
// called read when controller's queue is empty
console.log('[pull]');
controller.enqueue('d');
controller.close(); // or controller.error();
},
cancel(reason) {
// called when rs.cancel(reason)
console.log('[cancel]', reason);
},
});
(async () => {
const reader = readableStream.pipeThrough(transformStream).getReader();
for (let result = await reader.read(); !result.done; result = await reader.read()) {
console.log('[value]', result.value);
}
})();
下一个代码示例(有点人为设计的)展示了如何实现fetch()的“shouting”版本,通过将返回的响应promise作为一个流[24] ,一个chunk一个chunk地将所有字母变成大写。这种方法的优点是,你不需要等待整个文档被下载,这在处理大文件时可能会产生巨大的差异。
function upperCaseStream() {
return new TransformStream({
transform(chunk, controller) {
controller.enqueue(chunk.toUpperCase());
},
});
}
function appendToDOMStream(el) {
return new WritableStream({
write(chunk) {
el.append(chunk);
}
});
}
fetch('./lorem-ipsum.txt').then((response) =>
response.body
.pipeThrough(new TextDecoderStream())
.pipeThrough(upperCaseStream())
.pipeTo(appendToDOMStream(document.body))
);
浏览器的支持和polyfill
浏览器对Streams API的支持各不相同。请确保查看 Can I use[25] 的详细兼容性数据。注意,有些浏览器只实现了某些特性的部分实现,所以一定要彻底检查清楚。
好消息是,有一个可用的 参考实现[26]和一个针对生产环境的polyfill[27] 。
Demo
下面的demo展示了可读、可写和转换流的实际应用。它还包括了pipeThrough()
和pipeTo()
管道链的例子,还演示了tee()
。你可以在新窗口中运行这个demo[28] 或查看source code[29].
浏览器中的流
浏览器中内置了许多有用的流。你可以轻松地从`Blob`[30] 创建一个可读流。Blob接口的stream()[31]方法返回一个可读流,读取时返回Blob中包含的数据。还记得吗,`File`[32]对象是一种特定类型的Blob,可以在Blob可以使用的任何上下文中使用。
const readableStream = new Blob(['hello world'], { type: 'text/plain' }).stream();
TextDecoder.decode()
和TextEncoder.encode()
的流的变体分别被称为`TextDecoderStream`[33]和`TextEncoderStream`[34]。
const response = await fetch('https://streams.spec.whatwg.org/');
const decodedStream = response.body.pipeThrough(new TextDecoderStream())
使用`CompressionStream`[35]和`DecompressionStream`[36]转换流可以轻松地压缩或解压缩文件。下面的代码示例展示了如何下载Streams spec,在浏览器中对其进行压缩(gzip),并将压缩文件直接写入硬盘。
const response = await fetch('https://streams.spec.whatwg.org/');
const readableStream = response.body;
const compressedStream = readableStream.pipeThrough(new CompressionStream('gzip'));
const fileHandle = await showSaveFilePicker();
const writableStream = await fileHandle.createWritable();
compressedStream.pipeTo(writableStream);
File System Access API[37]'s `FileSystemWritableFileStream`[38] 和实验性的 `fetch()` 请求流[39] 都是可写流的例子。
Serial API[40]大量使用了可读流和可写流。
// Prompt user to select any serial port.
const port = await navigator.serial.requestPort();
// Wait for the serial port to open.
await port.open({ baudRate: 9_600 });
const reader = port.readable.getReader();
// Listen to data coming from the serial device.
while (true) {
const { value, done } = await reader.read();
if (done) {
// Allow the serial port to be closed later.
reader.releaseLock();
break;
}
// value is a Uint8Array.
console.log(value);
}
// Write to the serial port.
const writer = port.writable.getWriter();
const data = new Uint8Array([104, 101, 108, 108, 111]); // hello
await writer.write(data);
// Allow the serial port to be closed later.
writer.releaseLock();
最后,`WebSocketStream`[41] 将流与WebSocket API集成。
const wss = new WebSocketStream(WSS_URL);
const { readable, writable } = await wss.connection;
const reader = readable.getReader();
const writer = writable.getWriter();
while (true) {
const { value, done } = await reader.read();
if (done) {
break;
}
const result = await process(value);
await writer.write(result);
}
资源
Streams specification[42] Accompanying demos[43] Streams polyfill[44] 2016—the year of web streams[45] Async iterators and generators[46] Stream Visualizer[47]
参考资料
实在不好用: https://gist.github.com/igrigorik/5736866
[2]ReadableStream
: https://developer.mozilla.org/en-US/docs/Web/API/ReadableStream
ReadableStream()
: https://developer.mozilla.org/en-US/docs/Web/API/ReadableStream/ReadableStream
ReadableStreamDefaultController
: https://developer.mozilla.org/en-US/docs/Web/API/ReadableStreamDefaultController
ReadableStreamDefaultController.close()
: https://developer.mozilla.org/en-US/docs/Web/API/ReadableStreamDefaultController/close
ReadableStreamDefaultController.enqueue()
: https://developer.mozilla.org/en-US/docs/Web/API/ReadableStreamDefaultController/enqueue
ReadableStreamDefaultController.error()
: https://developer.mozilla.org/en-US/docs/Web/API/ReadableStreamDefaultController/error
ReadableStream()
: https://developer.mozilla.org/en-US/docs/Web/API/ReadableStream/ReadableStream
ByteLengthQueuingStrategy
: https://developer.mozilla.org/en-US/docs/Web/API/ByteLengthQueuingStrategy
CountQueuingStrategy
: https://developer.mozilla.org/en-US/docs/Web/API/CountQueuingStrategy
ReadableStreamDefaultReader
: https://developer.mozilla.org/en-US/docs/Web/API/ReadableStreamDefaultReader
read()
: https://developer.mozilla.org/en-US/docs/Web/API/ReadableStreamDefaultReader/read
ReadableStream.locked
: https://developer.mozilla.org/en-US/docs/Web/API/ReadableStream/locked
tee()
: https://developer.mozilla.org/en-US/docs/Web/API/ReadableStream/tee
WritableStream
: https://developer.mozilla.org/en-US/docs/Web/API/WritableStream
WritableStreamDefaultController
: https://developer.mozilla.org/en-US/docs/Web/API/WritableStreamDefaultController
ByteLengthQueuingStrategy
: https://developer.mozilla.org/en-US/docs/Web/API/ByteLengthQueuingStrategy
CountQueuingStrategy
: https://developer.mozilla.org/en-US/docs/Web/API/CountQueuingStrategy
WritableStreamDefaultWriter
: https://developer.mozilla.org/en-US/docs/Web/API/WritableStreamDefaultWriter
write()
: https://developer.mozilla.org/en-US/docs/Web/API/WritableStreamDefaultWriter/write
WritableStream.locked
: https://developer.mozilla.org/en-US/docs/Web/API/WritableStream/locked
pipeTo()
: https://developer.mozilla.org/en-US/docs/Web/API/ReadableStream/pipeTo
pipeThrough()
: https://developer.mozilla.org/en-US/docs/Web/API/ReadableStream/pipeThrough
作为一个流: https://developer.mozilla.org/en-US/docs/Web/API/Streams_API/Using_readable_streams#consuming_a_fetch_as_a_stream
[25]Can I use: https://caniuse.com/streams
[26]参考实现: https://github.com/whatwg/streams/tree/master/reference-implementation
[27]polyfill: https://github.com/MattiasBuelens/web-streams-polyfill
[28]demo: https://streams-demo.glitch.me/
[29]source code: https://glitch.com/edit/#!/streams-demo?path=script.js
[30]Blob
: https://developer.mozilla.org/en-US/docs/Web/API/Blob
stream(): https://developer.mozilla.org/en-US/docs/Web/API/Blob/stream
[32]File
: https://developer.mozilla.org/en-US/docs/Web/API/File
TextDecoderStream
: https://encoding.spec.whatwg.org/#interface-textdecoderstream
TextEncoderStream
: https://encoding.spec.whatwg.org/#interface-textencoderstream
CompressionStream
: https://wicg.github.io/compression/#compression-stream
DecompressionStream
: https://wicg.github.io/compression/#decompression-stream
File System Access API: https://wicg.github.io//file-system-access/
[38]FileSystemWritableFileStream
: https://wicg.github.io/file-system-access/#filesystemwritablefilestream
fetch()
请求流: https://web.dev/fetch-upload-streaming/#writable-streams
Serial API: https://web.dev/serial/
[41]WebSocketStream
: https://web.dev/websocketstream/
Streams specification: https://streams.spec.whatwg.org/
[43]Accompanying demos: https://streams.spec.whatwg.org/demos/
[44]Streams polyfill: https://github.com/MattiasBuelens/web-streams-polyfill
[45]2016—the year of web streams: https://jakearchibald.com/2016/streams-ftw/
[46]Async iterators and generators: https://jakearchibald.com/2017/async-iterators-and-generators/
[47]Stream Visualizer: https://surma.dev/lab/whatwg-stream-visualizer/lab.html
- EOF -
觉得本文对你有帮助?请分享给更多人
关注「大前端技术之路」加星标,提升前端技能
点赞和在看就是最大的支持❤️