可读流的功能是作为上游,提供数据给下游。
本文中用readable来指代一个Readable实例。
可读流通过push方法产生数据,存入readable的缓存中。
当调用push(null)时,便宣告了流的数据产生的结束。
正常情况下,需要为流实例提供一个_read方法,在这个方法中调用push产生数据。
既可以在同一个tick中(同步)调用push,也可以异步的调用(通常如此)。
在需要数据时,流内部会自动调用_read方法来往缓存中添加数据。
var Stream = require('stream')
var readable = Stream.Readable()
var source = ['a', 'b', 'c']
readable._read = function () {
this.push(source.shift() || null)
}或
var Stream = require('stream')
var source = ['a', 'b', 'c']
var readable = Stream.Readable({
read: function () {
this.push(source.shift() || null)
},
})在数据被消耗完时,会触发end事件。
所谓“消耗完”,需要满足两个条件:
- 已经调用
push(null),声明不会再有任何新的数据产生 - 缓存中的数据也被读取完
var Stream = require('stream')
var source = ['a', 'b', 'c']
var readable = Stream.Readable({
read: function () {
var data = source.shift() || null
console.log('push', data)
this.push(data)
},
})
readable.on('end', function () {
console.log('end')
})
readable.on('data', function (data) {
console.log('data', data)
})输出:
⌘ node example/event-end.js
push a
push b
data <Buffer 61>
push c
data <Buffer 62>
push null
data <Buffer 63>
end
下游通过监听data事件(flowing模式)或通过调用read方法(paused模式),
从缓存中获取数据进行消耗。
在flowing模式下,readable的数据会持续不断的生产出来,
每个数据都会触发一次data事件,通过监听该事件来获得数据。
以下条件均可以使readable进入flowing模式:
- 调用
resume方法 - 如果之前未调用
pause方法进入paused模式,则监听data事件也会调用resume方法。 readable.pipe(writable)。pipe中会监听data事件。
var Stream = require('stream')
var source = ['a', 'b', 'c']
var readable = Stream.Readable({
read: function () {
this.push(source.shift() || null)
},
})
readable.on('data', function (data) {
console.log(data)
})输出结果:
⌘ node example/flowing-mode.js
<Buffer 61>
<Buffer 62>
<Buffer 63>
通常,并不直接监听data事件去消耗流,而是通过pipe方法去消耗。
在paused模式下,通过readable.read去获取数据。
以下条件均可使readable进入paused模式:
- 流创建完成,即初始状态
- 在
flowing模式下调用pause方法 - 通过
unpipe移除所有下游
除了初始状态外,很少会在paused模式下使用流。
一般不会调用pause方法或是unpipe方法从flowing模式切换至paused模式。
var Stream = require('stream')
var source = ['a', 'b', 'c']
var readable = Stream.Readable({
read: function () {
this.push(source.shift() || null)
},
})
readable.on('readable', function () {
var data
while (data = this.read()) {
console.log(data)
}
})输出结果:
⌘ node example/paused-mode.js
<Buffer 61 62>
<Buffer 63>
readable事件表示流中产生了新数据,或是到了流的尽头。
read(n)时,会从缓存中试图读取相应的字节数。
当n未指定时,会一次将缓存中的字节全部读取。
如果在flowing模式下调用read(),不指定n时,会逐个元素地将缓存读完,
而不像paused模式会一次全部读取。
- 使用
push来产生数据。 - 必须调用
push(null)来结束流,否则下游会一直等待。 push可以同步调用,也可异步调用。end事件表示可读流中的数据已被完全消耗。- 通常在flowing模式下使用可读流。