How to use the drain function from pull-stream

Find comprehensive JavaScript pull-stream.drain code examples handpicked from public code repositories.

pull-stream.drain is a sink function that pulls every item from a source stream, passes each item to an optional operation function (discarding items if none is given), and calls an optional completion callback when the stream ends or errors.

1171
1172
1173
1174
1175
1176
1177
1178
1179
1180
          })
        )
      ),
      toPullStream()
    ),
    (drainer = pull.drain((msg) => {
      t.fail('should not drain yet')
    }))
  )
})
fork icon6
star icon48
watch icon8

+ 6 other calls in file

190
191
192
193
194
195
196
197
198
199
  getMsgByOffset(latest.value.offset, (err, kvt) => {
    if (err) cb(err)
    else cb(null, kvt)
  })
}, 8),
pull.drain(
  (kvt) => {
    state.updateFromKVT(PrivateIndex.reEncrypt(kvt))
  },
  (err) => {
fork icon6
star icon41
watch icon6

+ 24 other calls in file

How does pull-stream.drain work?

pull-stream.drain is a helper used to fully consume a stream — it keeps pulling until the source is exhausted, which avoids backpressure stalls and leaked resources. It takes up to two arguments: an operation callback invoked for each item in the stream (returning false from it aborts the stream early), and an optional completion callback invoked once with an error if consumption fails, or with null when the stream ends normally.

84
85
86
87
88
89
90
91
92
93
          groupMemberId,
        }))
      )
    ),
    pull.flatten(),
    (groupMemberStream = pull.drain(({ groupMemberId, groupSecret }) => {
      requestManager.addGroupMember(groupMemberId, groupSecret)
    }))
  )
}
fork icon1
star icon8
watch icon5

+ 221 other calls in file

816
817
818
819
820
821
822
823
824
    if (err) return cb(null, null)
    return cb(null, msg_.value)
  })
}),
pull.filter( (x) => x !== null ),
pull.drain(function (msgValue) {
  var author = msgValue.author
  var op = msgValue.content
  var tokenType = op.tokenType
fork icon0
star icon4
watch icon2

+ 139 other calls in file

Ai Example

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// Example: consuming a pull-stream source with pull.drain.
const pull = require("pull-stream");
const values = [1, 2, 3, 4, 5];

// Create a source ("readable") stream from an array of values.
const readable = pull.values(values);

// Use drain to consume the stream.
// NOTE: pull.drain takes exactly TWO arguments:
//   1. op(value) — called once per item pulled from the source
//                  (return false from it to abort the stream early)
//   2. done(err) — called exactly once when the stream finishes:
//                  with an Error on failure, or null on a normal end.
// Passing a third callback has no effect — it is silently ignored.
pull(
  readable,
  pull.drain(
    (value) => {
      console.log(`Received value: ${value}`);
    },
    (err) => {
      if (err) {
        console.error(`Error: ${err}`);
      } else {
        console.log("Stream ended");
      }
    }
  )
);

In this example, pull.values creates a source stream from an array of values, and pull.drain consumes it. Note that pull.drain accepts exactly two arguments: a function called for each value received from the stream, and a single completion callback that pull-stream invokes once when the stream finishes — with an Error if something went wrong, or with null when the stream ends normally. There is no separate third "end" callback; end-of-stream handling belongs inside the completion callback's null-error branch.

46
47
48
49
50
51
52
53
54
55

var print = null
if (args['only-id']) {
  print = pull.drain((obj) => console.log(obj.id), cb) 
} else if (args.csv) {
  print = pull.drain((obj) => console.log((obj.name ? [obj.id, obj.name] : [obj.id]).join(',')), cb)
} else if (typeof args.json === "undefined" || args.json) {
  print = pull.drain((obj) => console.log(JSON.stringify(obj)), cb) 
} else {
  return cb(new Error("Invalid printing option"))
fork icon0
star icon4
watch icon2

+ 209 other calls in file

48
49
50
51
52
53
54
55
56
57
var util = require('./util')(ssb)

if (args['graph-easy']) {
  pull(
    pull.values(keys),
    pull.drain(function (k) {
      if (Object.keys(tangle[k]).length > 0) 
        Object.keys(tangle[k]).forEach(function (k2) {
          var label = tangle[k][k2] || 'source'
          console.log("[" + prefix(k) + "] - " + label + " -> " + "[ " + prefix(k2) + "]")
fork icon0
star icon4
watch icon2

+ 236 other calls in file

277
278
279
280
281
282
283
284
285
    }),
    pw.recent(opts.windowSize, opts.windowTime),
    pull.asyncMap(function (batch, cb) {
      db.batch(batch, cb)
    }),
    pull.drain(null, done)
  )
}

fork icon0
star icon1
watch icon3

+ 5 other calls in file

338
339
340
341
342
343
344
345
346
347
  pull.through(null, function() {
    queue = createQueue();
    readyState = RS_DISCONNECTED;
    signaller('disconnected');
  }),
  pull.drain(signaller._process)
);

// pass the queue to the sink
pull(queue, sink);
fork icon19
star icon0
watch icon3

+ 3 other calls in file

381
382
383
384
385
386
387
388
389
      pull.map(updateMigratedSizeAndPluck),
      pull.map(toBIPF),
      pull.asyncMap(writeToNewLog),
      (drainAborter = config.db2.maxCpu
        ? drainGently(tooHotOpts(config), op, opDone)
        : pull.drain(op, opDone))
    )
  })
}
fork icon6
star icon41
watch icon0

52
53
54
55
56
57
58
59
60
61
function fetchBottomBy (amt) {
  var lastmsg
  var renderedKeys = []
  pull(
    opts.feed({ reverse: true, limit: amt||30, lt: cursor(botcursor) }),
    pull.drain(function (msg) {
      lastmsg = msg

      // filter
      if (opts.filter && !opts.filter(msg))
fork icon378
star icon0
watch icon2

+ 3 other calls in file

91
92
93
94
95
96
97
98
99
100

module.exports = {
  createPersistentLonpgollStream: createPersistentLonpgollStream,
  init(api, rp) {
    const sink = new EventEmitter();
    const drain = pull.drain(function(data) {
      sink.emit('data', data);
    });
    pull(
      createPersistentLonpgollStream(api, rp),
fork icon10
star icon34
watch icon15

565
566
567
568
569
570
571
572
573
574
pull.filter((progress) => {
  console.log(progress)
  return progress === 1
}),
pull.take(1),
pull.drain(async () => {
  endMeasure(t, 'migrate (+db1)')
  await sleep(2000) // wait for new log FS writes to finalize
  sbot.close(true, () => ended.resolve())
})
fork icon6
star icon41
watch icon0

+ 5 other calls in file

27
28
29
30
31
32
33
34
35
36
37
  queue,
  pullParaMap(
    (continuable, done) => continuable(done),
    10 // width of parallel processing
  ),
  pull.drain()
)


export default function (apollo) {
  const state = {
fork icon1
star icon11
watch icon0

146
147
148
149
150
151
152
153
154
155
  }
}

pull(
  sbot.metafeeds.branchStream({ old: false, live: true }),
  pull.drain((branch) => {
    const dentalFeed = branch.find((feed) => feed.purpose === 'dental')
    if (!dentalFeed) return

    t.equal(dentalFeed.purpose, 'dental', 'finds encrypted feed (live)')
fork icon0
star icon10
watch icon0

+ 2 other calls in file

98
99
100
101
102
103
104
105
106
pull.filter(m => (
  m.value.author === m.value.content.about &&
  m.value.content.image
)),
pull.asyncMap((m, cb) => ssb.aboutSelf.get(m.value.author, cb)),
pull.drain(about => {
  if (about.image) {
    ssb.blobs.has(about.image, (err, hasBlob) => {
      if (err) return console.warn(err)
fork icon2
star icon3
watch icon0

172
173
174
175
176
177
178
179
180
181
  old: true,
  dest: rootMessage.key,
  useBlocksFrom: rootMessage.value.author,
  types: ['post', 'about']
})),
pull.drain(msg => {
  if (msg.sync) {
    // actually add container to DOM when we get sync on thread
    sync = true
    result.set(container)
fork icon0
star icon0
watch icon1

+ 5 other calls in file