How to use event-stream

Comprehensive event-stream code examples:

How to use event-stream.readable:

12
13
14
15
16
17
18
19
20
21
var key = new Buffer(options.key),
    client = options.client,
    maxReadLength = options.maxReadLength || DEFAULT_READ_LENGTH,
    current = 0;

return es.readable(function(count, callback) {
  var that = this;
  client.getrange(key, current, current + maxReadLength - 1, function(err, data) {
    if (err) {
      that.emit('error', err);

How to use event-stream.resume:

119
120
121
122
123
124
125
126
127
128
    logger.error(`${date} [${count}] (${data.id}): ${e.response.text}`);
    failedRecs.push(data);
    fail++;
  }
  await wait(config.delay);
  es.resume();
}

const stream = fs.createReadStream(inFile, { encoding: "utf8" });
let streamCount = 0;

How to use event-stream.replace:

113
114
115
116
117
118
119
120
121
122
  const originalPipe = stream.pipe;
  stream.pipe = function (resp) {
    originalPipe
      .call(
        stream,
        es.replace(new RegExp(injectTag, 'i'), INJECTED_CODE + injectTag)
      )
      .pipe(resp);
  };
}

How to use event-stream.wait:

24
25
26
27
28
29
30
31
32
33
  sample: 0.25,
});

// Explicitly end the stream since we're not going to write any additional users to it.
gate.toStream({ end: true }).pipe(
  es.wait(function(err, jsonString) {
    if (err) {
      done.fail(err);
    } else {
      expect(JSON.parse(jsonString)).toEqual({

How to use event-stream.readArray:

22
23
24
25
26
27
28
29
30
31

it('should prepend text', function(done) {

  // 创建伪文件
  var fakeFile = new File({
    contents: es.readArray(['stream', 'with', 'those', 'contents'])
  });

  // 创建一个 prefixer 流(stream)
  var myPrefixer = prefixer('prependthis');

How to use event-stream.concat:

43
44
45
46
47
48
49
50
51
52
      cacheBust('server/pages/_includes/', 'head.html'),
      cacheBust('server/pages/_includes/', 'scripts.html'),
    ];
  };

  return es.concat(bustArray());
}

function bustCacheAndReload(done) {
  bustCache().on('end', function () {

How to use event-stream.duplex:

63
64
65
66
67
68
69
70
71
72
        if (state === 'idle') {
            eventuallyRun();
        }
    });

    return es.duplex(input, output);
};

exports.fixWin32DirectoryPermissions = () => {
    if (!/win32/.test(process.platform)) {

How to use event-stream.mapSync:

81
82
83
84
85
86
87
88
89
        return f;
    });
};

exports.setExecutableBit = pattern => {
    var setBit = es.mapSync(f => {
        f.stat.mode = /* 100755 */ 33261;
        return f;
    });

How to use event-stream.pipeline:

47
48
49
50
51
52
53
54
55
56
child.stdin.write(allFiles + '\n')
child.stdin.end()

// stream <stdout> into a JSON parser
// parse every top-level object and emit it on the stream
return es.pipeline(
  child.stdout,
  through2(chunkToString),
  JSONStream.parse([true])
)

How to use event-stream.map:

95
96
97
98
99
100
101
102
103
104
        }

        this.emit('data', file);
});

const formatting = es.map(function (file, cb) {
        tsfmt
                .processString(file.path, file.contents.toString('utf8'), {
                        verify: false,
                        tsfmt: true,

How to use event-stream.through:

153
154
155
156
157
158
159
160
161
162

watch('build/monaco/*').pipe(es.through(function() {
    runSoon(5000);
}));

resultStream = es.through(function(data) {
    const filePath = path.normalize(data.path);
    if (filesToWatchMap[filePath]) {
        runSoon(5000);
    }

How to use event-stream.merge:

31
32
33
34
35
36
37
38
39
40
    filename: ele
  }));

  // 映射与合并js流
  let streams = bundleTasks.map(jsTask);
  return es.merge(streams);
});

// [删除]发布目录下的js文件
gulp.task('del:js', () => del.sync([`${DIST_JS_PATH}/*`]));