const Minipass = require('minipass')
const EE = require('events')

// Duck-type check for "is this a stream?": the value must be an
// EventEmitter instance and expose either a readable-style pipe() or a
// writable-style write()/end() pair.  Returns a truthy/falsy value, not
// necessarily a strict boolean.
const isStream = s => s && s instanceof EE && (
  typeof s.pipe === 'function' || // readable
  (typeof s.write === 'function' && typeof s.end === 'function') // writable
)

// Symbol keys keep the internal state and helper methods off the public
// property namespace of the stream.
const _head = Symbol('_head')
const _tail = Symbol('_tail')
const _linkStreams = Symbol('_linkStreams')
const _setHead = Symbol('_setHead')
const _setTail = Symbol('_setTail')
const _onError = Symbol('_onError')
const _onData = Symbol('_onData')
const _onEnd = Symbol('_onEnd')
const _onDrain = Symbol('_onDrain')
const _streams = Symbol('_streams')

// A Minipass stream that wraps a chain of streams piped into one another.
// Writes are forwarded to the first stream in the chain (the "head");
// data, end/finish and error events from the last stream (the "tail") are
// surfaced on the Pipeline itself.
class Pipeline extends Minipass {
  // new Pipeline(opts, ...streams) or new Pipeline(...streams)
  //
  // If the first argument looks like a stream it is treated as the first
  // member of the chain and the options default to {}.
  constructor (opts, ...streams) {
    if (isStream(opts)) {
      streams.unshift(opts)
      opts = {}
    }

    super(opts)
    this[_streams] = []
    if (streams.length)
      this.push(...streams)
  }

  // Pipe each stream in the (non-empty) list into the next one, forwarding
  // 'error' events downstream as well.  Returns the last stream in the list.
  [_linkStreams] (streams) {
    // reduce takes (left,right), and we return right to make it the
    // new left value.
    return streams.reduce((src, dest) => {
      src.on('error', er => dest.emit('error', er))
      src.pipe(dest)
      return dest
    })
  }

  // Append streams to the end of the chain.  The current tail (if any) is
  // piped into the first new stream, and the last new stream becomes the
  // tail.  If there was no head yet, the first new stream becomes the head.
  push (...streams) {
    // Nothing to add.  Without this guard an empty pipeline would throw
    // (reduce() of an empty array with no initial value), and a non-empty
    // one would re-register the tail listeners on the existing tail so
    // that every chunk is forwarded twice.
    if (!streams.length)
      return

    this[_streams].push(...streams)
    if (this[_tail])
      streams.unshift(this[_tail])

    const linkRet = this[_linkStreams](streams)

    this[_setTail](linkRet)
    if (!this[_head])
      this[_setHead](streams[0])
  }

  // Prepend streams to the start of the chain.  The last new stream is
  // piped into the current head (if any), and the first new stream becomes
  // the head.  If there was no tail yet, the last new stream becomes the
  // tail.
  unshift (...streams) {
    // Nothing to add.  Same hazard as in push(): an empty pipeline would
    // throw in reduce(), and a non-empty one would attach a duplicate
    // 'drain' listener to the existing head.
    if (!streams.length)
      return

    this[_streams].unshift(...streams)
    if (this[_head])
      streams.push(this[_head])

    const linkRet = this[_linkStreams](streams)
    this[_setHead](streams[0])
    if (!this[_tail])
      this[_setTail](linkRet)
  }

  // Destroy every member stream that has a destroy() method, then destroy
  // the pipeline itself.  Note that `er` is only handed to the base class,
  // not to the member streams.
  destroy (er) {
    // set fire to the whole thing.
    this[_streams].forEach(s =>
      typeof s.destroy === 'function' && s.destroy())
    return super.destroy(er)
  }

  // readable interface -> tail
  //
  // Listeners stay attached to a stream even after it stops being the
  // tail; the identity checks in the _on* handlers make stale ones no-ops.
  [_setTail] (stream) {
    this[_tail] = stream
    stream.on('error', er => this[_onError](stream, er))
    stream.on('data', chunk => this[_onData](stream, chunk))
    // both 'end' and 'finish' from the tail end the pipeline
    stream.on('end', () => this[_onEnd](stream))
    stream.on('finish', () => this[_onEnd](stream))
  }

  // errors proxied down the pipeline
  // they're considered part of the "read" interface
  [_onError] (stream, er) {
    if (stream === this[_tail])
      this.emit('error', er)
  }

  // Data from the tail goes through the base class write(), bypassing the
  // overridden write() below (which targets the head).
  [_onData] (stream, chunk) {
    if (stream === this[_tail])
      super.write(chunk)
  }

  // Likewise, ending uses the base class end(), not the override below.
  [_onEnd] (stream) {
    if (stream === this[_tail])
      super.end()
  }

  // Pause the pipeline and, when the tail has a pause() method, the tail
  // too.  Returns whatever the tail's pause() returns, or a falsy value
  // when there is no tail / no pause() on it.
  pause () {
    super.pause()
    return this[_tail] && this[_tail].pause && this[_tail].pause()
  }

  // NB: Minipass calls its internal private [RESUME] method during
  // pipe drains, to avoid hazards where stream.resume() is overridden.
  // Thus, we need to listen to the resume *event*, not override the
  // resume() method, and proxy *that* to the tail.
  emit (ev, ...args) {
    if (ev === 'resume' && this[_tail] && this[_tail].resume)
      this[_tail].resume()
    return super.emit(ev, ...args)
  }

  // writable interface -> head
  //
  // As with the tail, a stale listener on a former head is filtered out by
  // the identity check in _onDrain.
  [_setHead] (stream) {
    this[_head] = stream
    stream.on('drain', () => this[_onDrain](stream))
  }

  [_onDrain] (stream) {
    if (stream === this[_head])
      this.emit('drain')
  }

  // Write to the head of the chain.  The result is truthy only when the
  // head accepted the write AND the pipeline's own read side is flowing or
  // has nothing buffered.  Throws a TypeError if no stream has been added
  // yet, since there is no head to write to.
  write (chunk, enc, cb) {
    return this[_head].write(chunk, enc, cb) &&
      (this.flowing || this.buffer.length === 0)
  }

  // End the head of the chain; the pipeline itself ends once the tail
  // emits 'end' or 'finish'.  Returns this for chaining.  Throws a
  // TypeError if no stream has been added yet.
  end (chunk, enc, cb) {
    this[_head].end(chunk, enc, cb)
    return this
  }
}

module.exports = Pipeline