How to use readable-stream

Comprehensive readable-stream code examples:

How to use readable-stream.setEncoding:

3109
3110
3111
3112
3113
3114
3115
3116
3117
3118
        destroyArgs = args;
    }
};
stream.text = () => new Promise((resolve, reject) => {
    let body = '';
    stream.setEncoding('utf8');
    stream.on('data', chunk => body += chunk);
    stream.on('end', () => resolve(body));
    stream.on('error', reject);
});

How to use readable-stream.Stream:

123
124
125
126
127
128
129
130
131
132
 * Returns a log stream for this transport. Options object is optional.
 * @param {Object} options - Stream options for this instance.
 * @returns {Stream} - TODO: add return description
 */
stream(options = {}) {
  const stream = new Stream();
  options = {
    method: 'stream',
    params: options
  };

How to use readable-stream.loadavg:

122
123
124
125
126
127
128
129
130
131
    return this;
};
Monitor.prototype._cycle = function () {
    var _a;
    var info = {
        loadavg: os.loadavg(),
        uptime: os.uptime(),
        freemem: os.freemem(),
        totalmem: os.totalmem()
    }, config = this.config();

How to use readable-stream.freemem:

124
125
126
127
128
129
130
131
132
133
Monitor.prototype._cycle = function () {
    var _a;
    var info = {
        loadavg: os.loadavg(),
        uptime: os.uptime(),
        freemem: os.freemem(),
        totalmem: os.totalmem()
    }, config = this.config();
    if (fs.statfsSync && config.diskfree && Object.keys(config.diskfree).length) {
        info.diskfree = info.diskfree || {};

How to use readable-stream.uptime:

123
124
125
126
127
128
129
130
131
132
};
Monitor.prototype._cycle = function () {
    var _a;
    var info = {
        loadavg: os.loadavg(),
        uptime: os.uptime(),
        freemem: os.freemem(),
        totalmem: os.totalmem()
    }, config = this.config();
    if (fs.statfsSync && config.diskfree && Object.keys(config.diskfree).length) {

How to use readable-stream.cpus:

32
33
34
35
36
37
38
39
40
41
// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
// NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
// LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
// OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
// WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
var os = require('os'), fs = require('fs'), events = require('events'), stream = require('readable-stream'), _ = require('underscore'), version = require('./package.json').version, critical = os.cpus().length;
var EventType;
(function (EventType) {
    EventType["MONITOR"] = "monitor";
    EventType["UPTIME"] = "uptime";

How to use readable-stream.totalmem:

125
126
127
128
129
130
131
132
133
134
var _a;
var info = {
    loadavg: os.loadavg(),
    uptime: os.uptime(),
    freemem: os.freemem(),
    totalmem: os.totalmem()
}, config = this.config();
if (fs.statfsSync && config.diskfree && Object.keys(config.diskfree).length) {
    info.diskfree = info.diskfree || {};
    for (var path in config.diskfree) {

How to use readable-stream.finished:

105
106
107
108
109
110
111
112
113
114

  self[kFlushed]()
})

const proxy = Duplex.from({ writable: decode, readable: encode })
finished(proxy, cleanup)
this[kRpcStream] = proxy
return proxy

function cleanup () {

How to use readable-stream.getTracks:

5631
5632
5633
5634
5635
5636
5637
5638
5639
5640
5641
5642
Peer.prototype.addStream = function (stream) {
  var self = this


  self._debug('addStream()')


  stream.getTracks().forEach(function (track) {
    self.addTrack(track, stream)
  })
}

How to use readable-stream.readable:

7757
7758
7759
7760
7761
7762
7763
7764
7765
7766
7767
7768


function eos(stream, opts, callback) {
  if (typeof opts === 'function') return eos(stream, null, opts);
  if (!opts) opts = {};
  callback = once(callback || noop);
  var readable = opts.readable || opts.readable !== false && stream.readable;
  var writable = opts.writable || opts.writable !== false && stream.writable;


  var onlegacyfinish = function onlegacyfinish() {
    if (!stream.writable) onfinish();

How to use readable-stream.destroyed:

6950
6951
6952
6953
6954
6955
6956
6957
6958
6959
var cb = state.writecb;
if (typeof cb !== 'function') throw new ERR_MULTIPLE_CALLBACK();
onwriteStateUpdate(state);
if (er) onwriteError(stream, state, sync, er, cb);else {
  // Check if we're actually ready to finish, but don't emit yet
  var finished = needFinish(state) || stream.destroyed;

  if (!finished && !state.corked && !state.bufferProcessing && state.bufferedRequest) {
    clearBuffer(stream, state);
  }

How to use readable-stream.writable:

7758
7759
7760
7761
7762
7763
7764
7765
7766
7767
7768
function eos(stream, opts, callback) {
  if (typeof opts === 'function') return eos(stream, null, opts);
  if (!opts) opts = {};
  callback = once(callback || noop);
  var readable = opts.readable || opts.readable !== false && stream.readable;
  var writable = opts.writable || opts.writable !== false && stream.writable;


  var onlegacyfinish = function onlegacyfinish() {
    if (!stream.writable) onfinish();
  };

How to use readable-stream.abort:

7886
7887
7888
7889
7890
7891
7892
7893
7894
7895
7896
  return function (err) {
    if (closed) return;
    if (destroyed) return;
    destroyed = true; // request.destroy just do .end - .abort is what we want


    if (isRequest(stream)) return stream.abort();
    if (typeof stream.destroy === 'function') return stream.destroy();
    callback(err || new ERR_STREAM_DESTROYED('pipe'));
  };
}

How to use readable-stream.end:

54
55
56
57
58
59
60
61
62
63

  this._buf = this._cb = this._str = this._ondrain = null
  drained = true

  this._ignoreEmpty = false
  if (stream) stream.end()
  if (cb) cb(buf)
}

data = consumed === data.length ? EMPTY : data.slice(consumed)

How to use readable-stream._writev:

6907
6908
6909
6910
6911
6912
6913
6914
6915
6916
6917
function doWrite(stream, state, writev, len, chunk, encoding, cb) {
  state.writelen = len;
  state.writecb = cb;
  state.writing = true;
  state.sync = true;
  if (state.destroyed) state.onwrite(new ERR_STREAM_DESTROYED('write'));else if (writev) stream._writev(chunk, state.onwrite);else stream._write(chunk, encoding, state.onwrite);
  state.sync = false;
}


function onwriteError(stream, state, sync, er, cb) {

How to use readable-stream.pause:

6141
6142
6143
6144
6145
6146
6147
6148
6149

  var ret = _this.push(chunk);

  if (!ret) {
    paused = true;
    stream.pause();
  }
}); // proxy all the other methods.
// important when wrapping filters and duplexes.

How to use readable-stream.once:

7144
7145
7146
7147
7148
7149
7150
7151
7152
7153
7154
7155
function endWritable(stream, state, cb) {
  state.ending = true;
  finishMaybe(stream, state);


  if (cb) {
    if (state.finished) process.nextTick(cb);else stream.once('finish', cb);
  }


  state.ended = true;
  stream.writable = false;

How to use readable-stream._final:

7088
7089
7090
7091
7092
7093
7094
7095
7096
7097
7098
7099
function needFinish(state) {
  return state.ending && state.length === 0 && state.bufferedRequest === null && !state.finished && !state.writing;
}


function callFinal(stream, state) {
  stream._final(function (err) {
    state.pendingcb--;


    if (err) {
      errorOrDestroy(stream, err);

How to use readable-stream.resume:

6168
6169
6170
6171
6172
6173
6174
6175
6176
6177
this._read = function (n) {
  debug('wrapped _read', n);

  if (paused) {
    paused = false;
    stream.resume();
  }
};

return this;

How to use readable-stream.prototype:

38
39
40
41
42
43
44
45
46
    this._receiverCapacity += size;
    this._push();
};

DataStream.prototype.end = function () {
    Duplex.prototype.end.apply(this, arguments);
    this._finished = true;
    this._push();
};

How to use readable-stream.req:

7799
7800
7801
7802
7803
7804
7805
7806
7807
7808
    return callback.call(stream, err);
  }
};

var onrequest = function onrequest() {
  stream.req.on('finish', onfinish);
};

if (isRequest(stream)) {
  stream.on('complete', onfinish);

How to use readable-stream.pipeline:

22
23
24
25
26
27
28
29
30
31
const ejs = require('gulp-ejs');
const fg = require('fast-glob');

// 从public压缩复制文件到build
function minifyJs() {
  return pipeline(src('public/**/*.js'), uglifyjs(), dest('build'));
}

function minifyCss() {
  return src('public/**/*.css').pipe(cleanCSS()).pipe(dest('build'));

How to use readable-stream.push:

6489
6490
6491
6492
6493
6494
6495
6496
6497
6498
6499
6500
};


function done(stream, er, data) {
  if (er) return stream.emit('error', er);
  if (data != null) // single equals check for both `null` and `undefined`
    stream.push(data); // TODO(BridgeAR): Write a test for these two error cases
  // if there's nothing in the write buffer, then that means
  // that nothing more will ever be provided


  if (stream._writableState.length) throw new ERR_TRANSFORM_WITH_LENGTH_0();

How to use readable-stream.destroy:

3089
3090
3091
3092
3093
3094
3095
3096
3097
3098
};
stream.abort = (err) => {
    console.warn('`MinigetStream#abort()` has been deprecated in favor of `MinigetStream#destroy()`');
    stream.aborted = true;
    stream.emit('abort');
    stream.destroy(err);
};
let destroyArgs;
const streamDestroy = (err) => {
    activeRequest.destroy(err);

How to use readable-stream.Duplex:

152
153
154
155
156
157
158
159
160
161
- __'finish'__:
- __'pipe'__:
- __'unpipe'__:
- __'error'__:

### stream.Duplex
- __'readable'__:
- __'data'__:
- __'end'__:
- __'close'__:

How to use readable-stream.read:

5761
5762
5763
5764
5765
5766
5767
5768
5769
//   read()s. The execution ends in this method again after the _read() ends
//   up calling push() with more data.
while (!state.reading && !state.ended && (state.length < state.highWaterMark || state.flowing && state.length === 0)) {
  var len = state.length;
  debug('maybeReadMore read 0');
  stream.read(0);
  if (len === state.length) // didn't get any data, stop spinning.
    break;
}

How to use readable-stream.Transform:

8
9
10
11
12
13
14
15
16
  createTransformFunction,
  parseOptions,
} = require('../src/pino-mozlog/index');

const options = parseOptions(process.argv.slice(2));
const mozlogTransport = new Transform({
  objectMode: true,
  transform: createTransformFunction({ options }),
});

How to use readable-stream._writableState:

6268
6269
6270
6271
6272
6273
6274
6275
6276
6277
stream.emit('end');

if (state.autoDestroy) {
  // In case of duplex streams we need a way to detect
  // if the writable side is ready for autoDestroy as well
  var wState = stream._writableState;

  if (!wState || wState.autoDestroy && wState.finished) {
    stream.destroy();
  }

How to use readable-stream.Readable:

3
4
5
6
7
8
9
10
11

var hasOwnProp = require('has-own-prop');
var toArray = require('stream-to-array');

var readableStream = require('readable-stream');
var Readable = readableStream.Readable;
var Writable = readableStream.Writable;
var Transform = readableStream.Transform;
var PassThrough = readableStream.PassThrough;