How to use the filter function from pull-stream

Find comprehensive JavaScript pull-stream.filter code examples handpicked from public code repositorys.

203
204
205
206
207
208
209
210
211
212
var downScrollAborter

function filterDownThrough () {
  return pull(
    downScrollAborter,
    pull.filter(msg => msg),
    pull.filter(followFilter),
    pull.filter(userFilter),
    pull.filter(rootFilter),
    pull.filter(channelFilter),
fork icon78
star icon382
watch icon30

+ 11 other calls in file

1566
1567
1568
1569
1570
1571
1572
1573
1574
1575
  }

  return recordStream
}),
pull.flatten(),
pull.filter((record) => isValueOk([op], record.value)),
pull.through(() => {
  if (debugQuery.enabled)
    debugQuery(
      `live(${getNameFromOperation(op)}): 1 new msg`.replace(/%/g, '%% ')
fork icon6
star icon48
watch icon8

591
592
593
594
595
596
597
598
599
600
startMeasure(t, 'migrate (alone)')
sbot.db2migrate.start()

pull(
  sbot.db2migrate.progress(),
  pull.filter((progress) => progress === 1),
  pull.take(1),
  pull.drain(async () => {
    endMeasure(t, 'migrate (alone)')
    await sleep(2000) // wait for new log FS writes to finalize
fork icon6
star icon41
watch icon0

+ 5 other calls in file

245
246
247
248
249
250
251
252
253
254
  pull(
    pull.values(keystore.group.list()),
    paraMap(tribeGet, 4),
    opts.subtribes
      ? null
      : pull.filter(tribe => tribe.parentGroupId === undefined),
    pull.map(tribe => tribe.groupId),
    pull.collect(cb)
  )
})
fork icon4
star icon27
watch icon16

78
79
80
81
82
83
84
85
86
87
  pullParaMap((feedId, cb) => {
    getProfile(feedId)
      .then(profile => cb(null, profile))
      .catch(cb)
  }, 5),
  pull.filter(Boolean),
  // TODO: This removes the profiles that came back as null, we might want to show something in place of that
  // e.g. someone who hasnt opted in to publicWebHosting
  pull.collect((err, res) => err ? reject(err) : resolve(res))
)
fork icon2
star icon3
watch icon0

+ 20 other calls in file

93
94
95
96
97
98
99
100
101
102
ssb.db.query(
  where(type('about')),
  live({ old: true }),
  toPullStream()
),
pull.filter(m => (
  m.value.author === m.value.content.about &&
  m.value.content.image
)),
pull.asyncMap((m, cb) => ssb.aboutSelf.get(m.value.author, cb)),
fork icon2
star icon3
watch icon0

195
196
197
198
199
200
201
202
203
204
    where(and(isDecrypted('box2'), type('group/add-member'))),
    opts.live ? live({ old: true }) : null,
    toPullStream()
  ),
  pull.map((msg) => lodashGet(msg, 'value.content.recps', [])),
  pull.filter((recps) => recps.length > 1 && recps[0] === groupId),
  pull.map((recps) => recps.slice(1)),
  pull.flatten(),
  pull.unique()
)
fork icon1
star icon7
watch icon4

+ 131 other calls in file

60
61
62
63
64
65
66
67
68
69
        return cb()
      }
    })
  )
}),
pull.filter(),
pull.unique('id'),
pull.take(1),
pull.drain(
  (emptyFeed) => {
fork icon1
star icon7
watch icon0

+ 17 other calls in file

4
5
6
7
8
9
10
11
12
13
14
const pull = require('pull-stream')


module.exports = function countGroupFeeds(server, cb) {
  pull(
    server.metafeeds.branchStream({ old: true, live: false }),
    pull.filter((branch) => branch.length === 4),
    pull.map((branch) => branch[3]),
    pull.filter((feed) => feed.recps),
    pull.collect((err, feeds) => {
      if (err) return cb(err)
fork icon1
star icon7
watch icon0

96
97
98
99
100
101
102
103
104
105

pull(
  rpc.room.members({}),
  pullFlatMap(arr => arr),

  pull.filter(member => member.id !== ssb.id),

  /* Follow members (privately) */
  pullParaMap((member, cb) => {
    ssb.friends.isFollowing({ source: ssb.id, dest: member.id }, (err, isFollowing) => {
fork icon2
star icon3
watch icon6

815
816
817
818
819
820
821
822
823
824
  api.valid(msg_, function (err, msg_) {
    if (err) return cb(null, null)
    return cb(null, msg_.value)
  })
}),
pull.filter( (x) => x !== null ),
pull.drain(function (msgValue) {
  var author = msgValue.author
  var op = msgValue.content
  var tokenType = op.tokenType
fork icon0
star icon4
watch icon2

+ 489 other calls in file

21
22
23
24
25
26
27
28
29
30
      type: 'about', 
      name: { $prefix: author }
    } }
  } }]
}),
pull.filter((msg) => {
  return ref.isFeedId(msg.value.content.about) &&
        (typeof msg.value.content.name === "string")
}),
pull.collect(function (err, ary) {
fork icon0
star icon4
watch icon2

+ 69 other calls in file

36
37
38
39
40
41
42
43
44
45
          else if (args['alias-prefix'] && !value.startsWith(args['alias-prefix']))
            return cb(null, null)
          else return cb(null, { id: id, name: value })
        })
    }),
    pull.filter((obj) => obj !== null)
  )
} else {
  list = pull(pull.values(ids), pull.map((id) => ({ id: id })))
}
fork icon0
star icon4
watch icon2

+ 69 other calls in file

47
48
49
50
51
52
53
54
55
56
),
pull.map((pair) => ({
  path: pair[0],
  isDirectory: pair[1].isDirectory()
})),
pull.filter((file) => !file.isDirectory),
pull.map((file) => ({
  path: file.path.substring(index, file.path.length),
  originalPath: file.path
})),
fork icon0
star icon3
watch icon3

+ 3 other calls in file

177
178
179
180
181
182
183
184
185
186
  }

  // @ts-ignore
  return pull(
    chunkedStream,
    pull.filter((chunk) => chunk.length > 0),
    pull.map(pull.values),
    pull.flatten()
  )
}
fork icon0
star icon2
watch icon1

60
61
62
63
64
65
66
67
68
getResume: (item) => {
  return item.timestamp
},
filterMap: pull(
  // CHECK IF IS MENTION
  pull.filter(bumpFilter),

  // LOOKUP AND ADD ROOTS
  LookupRoots({ ssb, cache }),
fork icon0
star icon0
watch icon1

+ 11 other calls in file

25
26
27
28
29
30
31
32
33
34
latest: function () {
  return pull(
    ssb.createFeedStream({ live: true, old: false, awaitReady: false }),

    ApplyFilterResult({ ssb }),
    pull.filter(msg => !!msg.filterResult),

    LookupRoots({ ssb, cache }),

    FilterPrivateRoots(),
fork icon0
star icon0
watch icon1

+ 29 other calls in file

25
26
27
28
29
30
31
32
33
34
    pull(
      // send truncated marker for resuming search
      pull.values([marker]),

      // don't emit the resume if we're at the end of the stream
      pull.filter(() => count === limit)
    )
  ])
} else {
  return pull(
fork icon0
star icon0
watch icon1

+ 11 other calls in file