How to use the asyncMap function from pull-stream

Find comprehensive JavaScript pull-stream.asyncMap code examples handpicked from public code repositories.

pull-stream.asyncMap is a function in the pull-stream library that applies an asynchronous transformation to each item in a pull-stream.

1547
1548
1549
1550
1551
1552
1553
1554
1555
1556

let recordStream
if (seqStream) {
  recordStream = pull(
    seqStream,
    pull.asyncMap((seq, cb) => {
      ensureIndexSync({ data: { indexName: 'seq' } }, () => {
        getRecord(seq, cb)
      })
    })
fork icon6
star icon48
watch icon8

775
776
777
778
779
780
781
782
783
  where(author(feedId)),
  asOffsets(),
  batch(1000),
  toPullStream()
),
pull.asyncMap(log.del),
pull.onEnd((err) => {
  // prettier-ignore
  if (err) return cb(new Error('deleteFeed() failed for feed ' + feedId, {cause: err}))
fork icon6
star icon41
watch icon6

+ 49 other calls in file

How does pull-stream.asyncMap work?

pull-stream.asyncMap works by taking a single argument: a function that performs an asynchronous transformation on each item in the pull-stream. It returns a pull-stream through stream that, when placed in a pipeline after a source, applies the asynchronous transformation to each item before passing it on to the next stage. The asynchronous transformation function takes two arguments: the item from the original source stream, and a callback function that is used to pass the transformed item to the next stage of the pull-stream. The function must call the callback function when it is finished transforming the item, or else the pull-stream will become stuck and not progress. pull-stream.asyncMap is useful when you need to apply an asynchronous transformation to each item in a pull-stream. For example, you might use pull-stream.asyncMap to transform a stream of text data into a stream of parsed JSON objects, or to make an API call for each item in the stream and return the results. Note that pull-stream.asyncMap is a part of the pull-stream library, which is a streaming library for Node.js and the browser. It is designed to work with other pull-stream functions to create powerful and flexible stream processing pipelines.

378
379
380
381
382
383
384
385
386
387

pull(
  oldLog.getStream({ gt: migratedOffset }),
  pull.map(updateMigratedSizeAndPluck),
  pull.map(toBIPF),
  pull.asyncMap(writeToNewLog),
  (drainAborter = config.db2.maxCpu
    ? drainGently(tooHotOpts(config), op, opDone)
    : pull.drain(op, opDone))
)
fork icon6
star icon41
watch icon0

300
301
302
303
304
305
306
307
308
}

startMeasure(t, 'add 1000 box1 msgs')
pull(
  pull.values(contents),
  pull.asyncMap(sbot.db.publish),
  pull.collect((err, msgs) => {
    endMeasure(t, 'add 1000 box1 msgs')
    if (err) t.fail(err)
fork icon6
star icon41
watch icon0

+ 3 other calls in file

Ai Example

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
const pull = require("pull-stream");
const fetch = require("node-fetch");

// Define a pull-stream source
const source = pull.values(["apple", "banana", "orange"]);

// Define an asynchronous transformation function.
// pull.asyncMap invokes the mapper as fn(item, cb) and ignores returned
// promises, so the mapper MUST call cb — either cb(err) on failure or
// cb(null, result) on success — or the stream will stall forever.
function getLength(item, cb) {
  fetch(`https://api.example.com/length/${item}`)
    .then((response) => response.json())
    .then((json) => cb(null, json.length))
    .catch(cb); // propagate errors so the pipeline can abort cleanly
}

// Create a new pull-stream with the asynchronous transformation applied
const transformedStream = pull(source, pull.asyncMap(getLength));

// Consume the transformed stream
pull(transformedStream, pull.log());

In this example, we start by importing the pull-stream library and the node-fetch library, which is used to make HTTP requests. We then define a pull-stream source called source that contains three strings: 'apple', 'banana', and 'orange'. Next, we define an asynchronous transformation function called getLength. The function receives an item from the source stream together with a callback, makes an API call to retrieve the length of the item from a hypothetical API, and passes the result to the callback. Note that pull.asyncMap invokes the mapper with (item, cb) and does not consume returned promises, so the mapper must call cb(err) or cb(null, result) when it is done — otherwise the stream will stall. We then use pull.asyncMap to create a new pull-stream called transformedStream that applies the getLength function to each item in the source stream. Finally, we use pull.log to consume the transformedStream and log each transformed item to the console. The output shows the length of each string in the source stream, as retrieved from the hypothetical API. Note that this is just an example, and the actual transformation function will depend on the specific use case. However, the basic pattern of using pull.asyncMap to apply an asynchronous transformation to each item in a pull-stream is the same.

205
206
207
208
209
210
211
212
213
}

function listInvites() {
  return pull(
    pull.values([0]), // dummy value used to kickstart the stream
    pull.asyncMap((n, cb) => {
      ssb.metafeeds.findOrCreate((err, myRoot) => {
        // prettier-ignore
        if (err) return cb(clarify(err, 'Failed to get root metafeed when listing invites'))
fork icon1
star icon7
watch icon4

+ 32 other calls in file

272
273
274
275
276
277
278
279
280
281
      value : e.value,
      type  : e.value == null ? 'del' : 'put'
    }
  }),
  pw.recent(opts.windowSize, opts.windowTime),
  pull.asyncMap(function (batch, cb) {
    db.batch(batch, cb)
  }),
  pull.drain(null, done)
)
fork icon1
star icon7
watch icon4

+ 3 other calls in file

97
98
99
100
101
102
103
104
105
106
),
pull.filter(m => (
  m.value.author === m.value.content.about &&
  m.value.content.image
)),
pull.asyncMap((m, cb) => ssb.aboutSelf.get(m.value.author, cb)),
pull.drain(about => {
  if (about.image) {
    ssb.blobs.has(about.image, (err, hasBlob) => {
      if (err) return console.warn(err)
fork icon2
star icon3
watch icon0

187
188
189
190
191
192
193
194
195
196
pull.filter(
  (rootMsg) => rootMsg.value.content?.tangles?.group?.root === null
),

// see if there are any members added
pull.asyncMap((rootMsg, cb) => {
  // return rootMsg if empty group, otherwise return false
  let foundMember = false
  pull(
    ssb.db.query(
fork icon1
star icon7
watch icon0

+ 2 other calls in file

135
136
137
138
139
140
141
142
143
144
}

function listGroupIds(opts = {}) {
  return pull(
    pull.values([0]),
    pull.asyncMap((_, cb) => {
      keyringReady.onReady(() => {
        return cb(null, keyring.group.list({ live: !!opts.live }))
      })
    }),
fork icon0
star icon5
watch icon4

+ 81 other calls in file

66
67
68
69
70
71
72
73
74
75
  })
  console.log(output)
} else if (args.json) {
  pull(
    pull.values(keys),
    pull.asyncMap((msgId,cb) => ssb.get({ id: msgId, meta: true}, cb)),
    pull.collect(function (err, msgs) {
      var output = {}
      msgs.forEach(function (msg) {
        output[msg.key] = msg
fork icon0
star icon4
watch icon2

+ 236 other calls in file

484
485
486
487
488
489
490
491
492
493

// Get and check all sources
pull(
  pull.values(op.sources),
  pull.asyncMap(get),
  pull.asyncMap((src, cb) => {
    var sId = src.msg.key
    var sOp = src.msg.value.content

    // Valid
fork icon0
star icon4
watch icon2

+ 839 other calls in file

51
52
53
54
55
56
57
58
59
60
                      Object.keys(types).length + 
                      " results"))

pull(
  pull.values(types),
  pull.asyncMap(function (tok, cb) {
    ssb.about.socialValue(
      { key: 'name', dest: tok.author}, function (err, name) {
        tok.author = { 
          id: tok.author
fork icon0
star icon4
watch icon2

+ 69 other calls in file

8
9
10
11
12
13
14
15
16
17
pull(
  pull.infinite(function () {
    return 'hello @ ' + Date.now()
  }),
  // throttle so it doesn't go nuts
  pull.asyncMap(function (value, cb) {
    setTimeout(function () {
      cb(null, value)
    }, 100)
  }),
fork icon27
star icon0
watch icon2

36
37
38
39
40
41
42
43
44
45
pull.infinite(),
pull.map(function (e) {
  c((c()||0)+1)
  return {random: e, count: c(), top: top}
}),
pull.asyncMap(function (e, cb) {
  var delay = 100 + 200*Math.random()
  setTimeout(function () {
    e.delay = delay
    cb(null, e)
fork icon3
star icon0
watch icon2