How to use pull-stream

Comprehensive pull-stream code examples:

How to use pull-stream.empty:

125
126
127
128
129
130
131
132
133
```txt
        empty()
------------------------
```
```js
const source = pull.empty()
const sink = pull.log()
pull(source, sink)
```

How to use pull-stream.error:

137
138
139
140
141
142
143
144
145
```txt
        error()
--e---------------------
```
```js
const source = pull.error()
const sink = pull.log()
pull(source, sink)
```

How to use pull-stream.addPipe:

235
236
237
238
239
240
241
242
243
244
    read(ended = true, cbs.length ? cbs.shift() : function () {})
  else if(cbs.length)
    cbs.shift()(true)      
}

s.source = pull.addPipe(function (end, cb) {
  if(input.length) {
    cb(null, input.shift())
    if(!input.length)
      s.emit('drain')

How to use pull-stream.concat:

8
9
10
11
12
13
14
15
16
17
18
const goTestData = 'test data from go'


function readWrite (stream) {
  pull(
    stream,
    pull.concat((err, data) => {
      if (err) {
        throw err
      }
      let offset = 0

How to use pull-stream.Source:

182
183
184
185
186
187
188
189
190
191
    cb(abort)
  }
  if(!n) next()
}

module.exports = pull.Source(function (streams) {
  return function (abort, cb) {
    ;(function next () {
      if(abort)
        all(streams, abort, cb)

How to use pull-stream.count:

113
114
115
116
117
118
119
120
121
```txt
        count(6)
0---1---2---3---4---5---6
```
```js
const source = pull.count(6)
const sink = pull.log()
pull(source, sink)
```

How to use pull-stream.infinite:

151
152
153
154
155
156
157
158
159
160
```txt
    infinite(() => i++)
--0--1--2--3--4--5--6--7--
```
```js
var i = 0
const source = pull.infinite(() => i++)
const sink = pull.log()
pull(source, sink)
```
#### keys

How to use pull-stream.log:

114
115
116
117
118
119
120
121
122
123
```txt
        count(6)
0---1---2---3---4---5---6
```
```js
const source = pull.count(6)
const sink = pull.log()
pull(source, sink)
```

#### empty

How to use pull-stream.read:

140
141
142
143
144
145
146
147
148
149
    return toStream(null, pull.live(db, opts))

  opts.onSync = function () {
    ts.emit('sync')
  }
  return ts = toStream(null, pull.read(db, opts))
}

module.exports.install = function (db) {
  db.methods = db.methods || {}

How to use pull-stream.live:

135
136
137
138
139
140
141
142
143
144
var liveStream = module.exports = function (db, opts) {
  var ts
  opts = opts || {}
  opts.tail = opts.tail !== false
  if(opts.old === false)
    return toStream(null, pull.live(db, opts))

  opts.onSync = function () {
    ts.emit('sync')
  }

How to use pull-stream.through:

239
240
241
242
243
244
245
246
247
248
      l.push(ch.key)
    else
      l.push(ch)
  })

  return pull(l, pull.through(null, cleanup))

}

exports.read =

How to use pull-stream.defer:

3
4
5
6
7
8
9
10
11
12
13
14
// Currently this uses pull streams,
// and not levelup's readstream, but in theory
// I should be able pretty much just drop that in.


module.exports = function pullReadStream (options, makeData) {
  var stream = pull.defer()


  stream.setIterator = function (iterator) {
    stream.resolve(function (end, cb) {
      if(!end) iterator.next(function (err, key, value) {

How to use pull-stream.pull:

355
356
357
358
359
360
361
362
363
364
let decoder
let cbp

source.first = () => {
        let result
        return pull(
                source,
                drain((i) => {
                        result = i
                        return false

How to use pull-stream.reduce:

926
927
928
929
930
931
932
933
934
935
    } 
    return cb(null, null)
  })
}),
pull.filter( (x) => x !== null ),
pull.reduce(
  (unspent, s_) => unspent.sub(Decimal(s_.amount)),
  Decimal(op.amount), 
  function (err, unspent) {
    if (err) return cb(err)

How to use pull-stream.take:

208
209
210
211
212
213
214
215
216
```txt
   take(3)
--1---2--3|
```
```js
const source = pull.count()
const through = pull.take(3)
const sink = pull.log()
pull(source, through, sink)
```

How to use pull-stream.unique:

98
99
100
101
102
103
104
105
106
107
  descending(), // latest => oldest
  toPullStream()
),
pull.filter(m => m.value.author === m.value.content.about),
pull.map(m => m.value.author),
pull.unique(),
pullParaMap((feedId, cb) => {
  getProfile(feedId)
    .then(profile => cb(null, profile))
    .catch(err => cb(err))

How to use pull-stream.onEnd:

776
777
778
779
780
781
782
783
784
785
  asOffsets(),
  batch(1000),
  toPullStream()
),
pull.asyncMap(log.del),
pull.onEnd((err) => {
  // prettier-ignore
  if (err) return cb(new Error('deleteFeed() failed for feed ' + feedId, {cause: err}))

  state.delete(feedId)

How to use pull-stream.flatten:

1565
1566
1567
1568
1569
1570
1571
1572
1573
1574
    recordStream = toPull(log.stream(opts))
  }

  return recordStream
}),
pull.flatten(),
pull.filter((record) => isValueOk([op], record.value)),
pull.through(() => {
  if (debugQuery.enabled)
    debugQuery(

How to use pull-stream.collect:

634
635
636
637
638
639
640
641
642
643
  fromDB(db),
  where(not(slowEqual('value.author', alice.id))),
  paginate(2),
  toPullStream()
),
pull.collect((err, pages) => {
  t.error(err, 'toPullStream got no error')
  t.equal(pages.length, 1, 'toPullStream got one page')
  const msgs = pages[0]
  t.equal(msgs.length, 1, 'page has one messages')

How to use pull-stream.map:

190
191
192
193
194
195
196
197
198
```txt
   map((i) => i * 10)
--10--30-50----70-----
```
```js
const source = pull.values([ 1, 3, 5, 7])
const through = pull.map((i) => i * 10)
const sink = pull.log()
pull(source, through, sink)
```

How to use pull-stream.drain:

1171
1172
1173
1174
1175
1176
1177
1178
1179
1180
          })
        )
      ),
      toPullStream()
    ),
    (drainer = pull.drain((msg) => {
      t.fail('should not drain yet')
    }))
  )
})

How to use pull-stream.values:

172
173
174
175
176
177
178
179
180
181
```txt
-----1--2----3----4--------
         flatten()
-----1--2----3-1--4-2---3--
```
```js
const source = pull.values([
  pull.values([ 1, 2, 3, 4 ]),
  pull.values([ 1, 2, 3 ])
])
const sink = pull.log()
```

How to use pull-stream.filter:

203
204
205
206
207
208
209
210
211
212
var downScrollAborter

function filterDownThrough () {
  return pull(
    downScrollAborter,
    pull.filter(msg => msg),
    pull.filter(followFilter),
    pull.filter(userFilter),
    pull.filter(rootFilter),
    pull.filter(channelFilter),

How to use pull-stream.asyncMap:

1547
1548
1549
1550
1551
1552
1553
1554
1555
1556

let recordStream
if (seqStream) {
  recordStream = pull(
    seqStream,
    pull.asyncMap((seq, cb) => {
      ensureIndexSync({ data: { indexName: 'seq' } }, () => {
        getRecord(seq, cb)
      })
    })