How to use the drain function from pull-stream
Find comprehensive JavaScript pull-stream.drain code examples handpicked from public code repositories.
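pull-stream.drain is a function that creates a sink which pulls all data from a readable pull-stream source, passes each item to an optional per-item function (discarding it if none is given), and calls an optional callback function upon completion.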
GitHub: ssbc/jitdb
1171 1172 1173 1174 1175 1176 1177 1178 1179 1180
}) ) ), toPullStream() ), (drainer = pull.drain((msg) => { t.fail('should not drain yet') })) ) })
+ 6 other calls in file
GitHub: ssbc/ssb-db2
190 191 192 193 194 195 196 197 198 199
getMsgByOffset(latest.value.offset, (err, kvt) => { if (err) cb(err) else cb(null, kvt) }) }, 8), pull.drain( (kvt) => { state.updateFromKVT(PrivateIndex.reEncrypt(kvt)) }, (err) => {
+ 24 other calls in file
How does pull-stream.drain work?
pull-stream.drain is a helper function used to consume a stream until it ends, ensuring that the stream is completely consumed, without backpressure issues or memory leaks. It takes an optional callback function that is called for each chunk of data in the stream until it is completely consumed (if that function returns false, the stream is aborted early), and it can also take an optional completion callback that is called once when the stream finishes: it receives the error if one occurred during consumption of the stream, or null if the stream ended normally.
84 85 86 87 88 89 90 91 92 93
groupMemberId, })) ) ), pull.flatten(), (groupMemberStream = pull.drain(({ groupMemberId, groupSecret }) => { requestManager.addGroupMember(groupMemberId, groupSecret) })) ) }
+ 221 other calls in file
GitHub: elavoie/ssb-tokens
816 817 818 819 820 821 822 823 824
if (err) return cb(null, null) return cb(null, msg_.value) }) }), pull.filter( (x) => x !== null ), pull.drain(function (msgValue) { var author = msgValue.author var op = msgValue.content var tokenType = op.tokenType
+ 139 other calls in file
Ai Example
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
const pull = require("pull-stream");

const values = [1, 2, 3, 4, 5];

// Create a readable stream from an array of values
const readable = pull.values(values);

// Use drain to consume the stream. drain takes exactly two callbacks:
// op(value), called for each item, and done(err), called once when the
// stream finishes. A third callback would be silently ignored.
pull(
  readable,
  pull.drain(
    (value) => {
      console.log(`Received value: ${value}`);
    },
    (err) => {
      // done receives null on a normal end and the error otherwise, so the
      // argument must be checked before reporting a failure.
      if (err) {
        console.error(`Error: ${err}`);
        return;
      }
      console.log("Stream ended");
    }
  )
);
In this example, pull.values is used to create a readable stream from an array of values. The pull.drain function is used to consume the stream. pull.drain accepts two arguments: a function to call for each value received from the stream, and an optional function that is called once when the stream finishes, receiving the error if one occurred or null when the stream ends normally. There is no separate third "stream ended" argument; anything passed after the second argument is ignored. The first function simply logs each value to the console, while the completion function should check its argument to decide whether to log an error or a message that the stream ended.
GitHub: elavoie/ssb-tokens
46 47 48 49 50 51 52 53 54 55
var print = null if (args['only-id']) { print = pull.drain((obj) => console.log(obj.id), cb) } else if (args.csv) { print = pull.drain((obj) => console.log((obj.name ? [obj.id, obj.name] : [obj.id]).join(',')), cb) } else if (typeof args.json === "undefined" || args.json) { print = pull.drain((obj) => console.log(JSON.stringify(obj)), cb) } else { return cb(new Error("Invalid printing option"))
+ 209 other calls in file
GitHub: elavoie/ssb-tokens
48 49 50 51 52 53 54 55 56 57
var util = require('./util')(ssb) if (args['graph-easy']) { pull( pull.values(keys), pull.drain(function (k) { if (Object.keys(tangle[k]).length > 0) Object.keys(tangle[k]).forEach(function (k2) { var label = tangle[k][k2] || 'source' console.log("[" + prefix(k) + "] - " + label + " -> " + "[ " + prefix(k2) + "]")
+ 236 other calls in file
GitHub: juliangruber/munich-js
277 278 279 280 281 282 283 284 285
}), pw.recent(opts.windowSize, opts.windowTime), pull.asyncMap(function (batch, cb) { db.batch(batch, cb) }), pull.drain(null, done) ) }
+ 5 other calls in file
338 339 340 341 342 343 344 345 346 347
pull.through(null, function() { queue = createQueue(); readyState = RS_DISCONNECTED; signaller('disconnected'); }), pull.drain(signaller._process) ); // pass the queue to the sink pull(queue, sink);
+ 3 other calls in file
GitHub: ssbc/ssb-db2
381 382 383 384 385 386 387 388 389
pull.map(updateMigratedSizeAndPluck), pull.map(toBIPF), pull.asyncMap(writeToNewLog), (drainAborter = config.db2.maxCpu ? drainGently(tooHotOpts(config), op, opDone) : pull.drain(op, opDone)) ) }) }
52 53 54 55 56 57 58 59 60 61
function fetchBottomBy (amt) { var lastmsg var renderedKeys = [] pull( opts.feed({ reverse: true, limit: amt||30, lt: cursor(botcursor) }), pull.drain(function (msg) { lastmsg = msg // filter if (opts.filter && !opts.filter(msg))
+ 3 other calls in file
91 92 93 94 95 96 97 98 99 100
module.exports = { createPersistentLonpgollStream: createPersistentLonpgollStream, init(api, rp) { const sink = new EventEmitter(); const drain = pull.drain(function(data) { sink.emit('data', data); }); pull( createPersistentLonpgollStream(api, rp),
GitHub: ssbc/ssb-db2
565 566 567 568 569 570 571 572 573 574
pull.filter((progress) => { console.log(progress) return progress === 1 }), pull.take(1), pull.drain(async () => { endMeasure(t, 'migrate (+db1)') await sleep(2000) // wait for new log FS writes to finalize sbot.close(true, () => ended.resolve()) })
+ 5 other calls in file
GitHub: Ahau-NZ/Ahau
27 28 29 30 31 32 33 34 35 36 37
queue, pullParaMap( (continuable, done) => continuable(done), 10 // width of parallel processing ), pull.drain() ) export default function (apollo) { const state = {
146 147 148 149 150 151 152 153 154 155
} } pull( sbot.metafeeds.branchStream({ old: false, live: true }), pull.drain((branch) => { const dentalFeed = branch.find((feed) => feed.purpose === 'dental') if (!dentalFeed) return t.equal(dentalFeed.purpose, 'dental', 'finds encrypted feed (live)')
+ 2 other calls in file
98 99 100 101 102 103 104 105 106
pull.filter(m => ( m.value.author === m.value.content.about && m.value.content.image )), pull.asyncMap((m, cb) => ssb.aboutSelf.get(m.value.author, cb)), pull.drain(about => { if (about.image) { ssb.blobs.has(about.image, (err, hasBlob) => { if (err) return console.warn(err)
GitHub: DiegoT2k/provasocial
172 173 174 175 176 177 178 179 180 181
old: true, dest: rootMessage.key, useBlocksFrom: rootMessage.value.author, types: ['post', 'about'] })), pull.drain(msg => { if (msg.sync) { // actually add container to DOM when we get sync on thread sync = true result.set(container)
+ 5 other calls in file
pull-stream.asyncMap is the most popular function in pull-stream (1458 examples)