Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
140 changes: 86 additions & 54 deletions lib/StreamCache.js
Original file line number Diff line number Diff line change
@@ -1,57 +1,89 @@
var Util = require('util');
var Stream = require('stream').Stream;
"use strict";

module.exports = StreamCache;
Util.inherits(StreamCache, Stream);
function StreamCache() {
Stream.call(this);

this.writable = true;
this.readable = true;

this._buffers = [];
this._dests = [];
this._ended = false;
class StreamCache extends require('stream').Stream{
constructor(){
super();

this.writable=true;
this.readable=true;

this.length=0;

//contains just one buffer if ended
this._buffers=[];
//is set to null if ended
this._dests=[];
}

write(buffer){
const dests=this._dests;
if(dests!==null){}
else
throw Error('stream already ended');

if(buffer.constructor===Buffer){}
else
throw Error('buffer expected');

this._buffers.push(buffer);
this.length+=buffer.length;

var i=dests.length;
while(i--)
dests[i].write(buffer);

return true;
}

end(buffer){
const dests=this._dests;
if(buffer===undefined){
if(dests!==null){}
else
throw Error('stream already ended');
}
else
this.write(buffer);

var i=dests.length;
while(i--)
dests[i].end();

this._dests=null;

//merge all buffers into one since there will be no more being added
//(saves memory and loopings in future pipe calls)
this._buffers=[
Buffer.concat(this._buffers)
];

return this;
}

pipe(dest,options){
if(options===undefined){}
else
throw Error('options not supported');

//TODO try to asynchronize buffer flow, since with this implementation,
// it blocks execution until all data is written out

const buffers=this._buffers,dests=this._dests;
if(dests===null)
return dest.end(buffers[0]);

for(var i=0,l=buffers.length;i<l;++i)
dest.write(buffers[i]);

dests.push(dest);

return dest;
}

//deprecated
getLength(){
return this.length;
}
}

StreamCache.prototype.write = function(buffer) {
this._buffers.push(buffer);

this._dests.forEach(function(dest) {
dest.write(buffer);
});
};

StreamCache.prototype.pipe = function(dest, options) {
if (options) {
throw Error('StreamCache#pipe: options are not supported yet.');
}

this._buffers.forEach(function(buffer) {
dest.write(buffer);
});

if (this._ended) {
dest.end();
return dest;
}

this._dests.push(dest);

return dest;
};

StreamCache.prototype.getLength = function() {
return this._buffers.reduce(function(totalLength, buffer) {
return totalLength + buffer.length;
}, 0);
};

StreamCache.prototype.end = function() {
this._dests.forEach(function(dest) {
dest.end();
});

this._ended = true;
this._dests = [];
};
module.exports=StreamCache;