Swap-Aggregator-Subgraph/node_modules/pull-to-stream/index.js
Richa-iitr d211083153 Revert "Revert "added handler""
This reverts commit c36ee8c5ca.
2022-07-03 07:30:05 +05:30

124 lines
3.0 KiB
JavaScript

'use strict';
const { Duplex } = require('readable-stream');
class PullDuplexStream extends Duplex {
constructor(source, sink, options) {
super(options);
this.source = source;
this.drainingSource = false;
this.sink = sink;
this.input = [];
this.writeCallbacks = [];
this.internalSourceCallbacks = [];
if (this.sink) {
this.sink(this._internalSource.bind(this));
}
}
drainPull() {
const self = this;
this.drainingSource = true;
this.source(null, function next(end, data) {
if (end instanceof Error) {
return self.emit('error', end);
}
if (end) {
return self.push(null);
}
if (self.push(data)) {
self.source(null, next);
} else {
self.drainingSource = false;
}
});
}
_read() {
if (this.source && !this.drainingSource) {
this.drainPull();
}
}
_write(chunk, encoding, callback) {
if (this.internalSourceCallbacks.length > 0) {
this.internalSourceCallbacks.shift()(null, chunk);
callback();
} else {
this.input.push(chunk);
this.writeCallbacks.push(callback);
}
}
_internalSource(end, cb) {
if (end) {
if (this.writeCallbacks.length > 0) {
// call write callback with error
this.writeCallbacks.shift()(
end instanceof Error ? end : new Error('Aborted')
);
}
return cb(end);
}
if (this.input.length > 0) {
cb(null, this.input.shift());
this.writeCallbacks.shift()();
} else {
this.internalSourceCallbacks.push(cb);
}
}
_final(callback) {
// end the internal source
if (this.internalSourceCallbacks.length > 0) {
this.internalSourceCallbacks.shift()(true);
}
callback();
}
_destroy(err, cb) {
// abort the source
if (!this._readableState.ended && this.source) {
this.source(true, () => {
// do nothing
});
}
// propagate error to sink
if (this.internalSourceCallbacks.length > 0) {
this.internalSourceCallbacks.shift()(err);
}
cb(err);
}
}
function wrapper(source, sink, options) {
if (source && typeof source === 'object') {
source = source.source;
sink = source.sink;
}
return new PullDuplexStream(
source,
sink,
Object.assign(
{
readableObjectMode: true,
writableObjectMode: true
},
options
)
);
}
module.exports = {
duplex: wrapper,
readable: (source, options) => wrapper(source, null, options),
writeable: (sink, options) => wrapper(null, sink, options)
};