How to use the mergeMap function from rxjs

Find comprehensive JavaScript rxjs.mergeMap code examples handpicked from public code repositories.

42
43
44
45
46
47
48
49
50
51
limiterService,
/**
 * Runs observable$ behind a request submitted to limiterService.
 *
 * A request carrying `id` and `username` (falling back to 'ANON' when the
 * username is falsy) is submitted through service.submit$. Each value that
 * submission emits triggers a subscription to observable$. When that inner
 * subscription is finalized, stop$ emits, which ends the submission stream
 * through takeUntil.
 * NOTE(review): presumably ending the submission is what frees the limiter
 * slot - confirm against the limiter service implementation.
 *
 * @param observable$ the observable to run once the submission emits
 * @param id request id; defaults to a freshly generated uuid()
 * @param username request username; 'ANON' is used when falsy
 * @returns the piped observable emitting whatever observable$ emits
 */
limiter$: (observable$, id = uuid(), username) => {
    const stop$ = new Subject()
    const request = {id, username: username || 'ANON'}
    const signalStop = () => stop$.next()
    const runLimited = () => observable$.pipe(finalize(signalStop))
    const submission$ = service.submit$(limiterService, request)
    return submission$.pipe(
        takeUntil(stop$),
        mergeMap(runLimited)
    )
}
fork icon45
star icon178
watch icon34

73
74
75
76
77
78
79
80
81
82
// Progress value for the given files before any per-file progress exists:
// getProgress is called with only `files` set.
const initialState = files => {
    return getProgress({files})
}

const downloadFiles$ = ({files, prefix, downloadDir, deleteAfterDownload}) => {
    return of(files).pipe(
        switchMap(files => of(...files)),
        mergeMap(file => downloadFile$({file, prefix, downloadDir, deleteAfterDownload}), CONCURRENT_FILE_DOWNLOAD),
        scan((currentProgress, fileProgress) => getProgress({
            files,
            currentProgress,
            fileProgress
fork icon45
star icon178
watch icon34

28
29
30
31
32
33
34
35
36
37
    complete: () => log.fatal('Pool lock stream completed')
})

unlock$.pipe(
    tap(instance => instance.locked = false),
    mergeMap(instance =>
        timer(maxIdleMilliseconds).pipe(
            takeUntil(lock$.pipe(
                filter(currentInstance => currentInstance === instance)
            )),
fork icon45
star icon178
watch icon34

+ 5 other calls in file

5
6
7
8
9
10
11
12
13
14
// Retry delay in milliseconds (per the name), i.e. 10 seconds.
// NOTE(review): presumably the value handed to retry() as `delay` - the call site is not visible here, confirm.
const RETRY_DELAY_MS = 10000

const retry = delay => pipe(
    retryWhen(error$ =>
        error$.pipe(
            mergeMap(
                error => {
                    log.warn(`Reconnecting in ${delay}ms after error: ${error}`)
                    return timer(delay)
                }
fork icon45
star icon178
watch icon34

87
88
89
90
91
92
93
94
95
96
97
        defaultMessage: 'Stopped',
        messageKey: 'tasks.status.canceled'
    })


task$.pipe(
    mergeMap(task => {
        const taskCancellation$ = merge(
            cancel$.pipe(
                filter(id => id === task.id),
                tap(() => log.debug(msg(task.id, 'cancelled by user'))),
fork icon45
star icon177
watch icon0

64
65
66
67
68
69
70
71
72
73
  op.filter(tx => {
    console.log(tx);
    return tx.transactionInfo !== undefined && tx.transactionInfo.hash === signedLockTx.hash;
  }),
  op.delay(5000),
  op.mergeMap(_ => {
    return transactionHttp.announceAggregateBonded(signedAggregateTx);
  })
).subscribe(
  (x) => {
fork icon5
star icon5
watch icon3

79
80
81
82
83
84
85
86
87
/**
 * Static map objects.
 */

// Stream built by flattening each kommun's `measureStations` with mergeMap.
// NOTE(review): assumes kommun.measureStations is something mergeMap can
// flatten (observable, array, promise, ...) - its type is not visible here.
this.measureStations = kommuner.pipe(mergeMap((k) => k.measureStations))

// Same flattening, applied to each kommun's `postombud`.
this.postombud = kommuner.pipe(
  mergeMap((k) => k.postombud)
)
fork icon1
star icon16
watch icon0

+ 41 other calls in file

15
16
17
18
19
20
21
22
23
24
25
    retry(1),
    )


range(10)
.pipe(
    mergeMap(f, 2),
)
.subscribe({
    next: v => console.info('next value ', v),
    error: e => console.error('error happened', e),
fork icon0
star icon0
watch icon0

+ 2 other calls in file

5
6
7
8
9
10
11
12
13
14
15


return of(1) // like Bacon.once()
.pipe(
    mergeMap(_ => from(fetch(`https://httpstat.us/${status}`))),
    tap(a => console.info(a.url, status)),
    mergeMap(e => {
        if (e.status === 401) {
            // throw new Error(e.statusText)
            return of(1)
                .pipe(
fork icon0
star icon0
watch icon0