How to use the merge function from rxjs

Find comprehensive JavaScript rxjs.merge code examples handpicked from public code repositories.

rxjs.merge is an RxJS creation function that combines multiple source observables into a single observable.

21
22
23
24
25
26
27
28
29
30

/**
 * Spawn `cmd` with `args` and expose its output as one observable of lines.
 *
 * stdout and stderr are each piped through `split()`, converted with
 * `streamToObservable` (passing the child process as the `await` option),
 * merged into a single stream, and falsy emissions are filtered out.
 *
 * @param {string} cmd - Command handed to `execa`.
 * @param {string[]} args - Arguments handed to `execa`.
 * @returns {Observable} Merged, `Boolean`-filtered output of both streams.
 */
const exec = (cmd, args) => {
        // Use `Observable` support if merged https://github.com/sindresorhus/execa/pull/26
        const child = execa(cmd, args);

        // Same conversion for either output stream of the child process.
        const linesOf = stream => streamToObservable(stream.pipe(split()), {await: child});

        const combined = merge(linesOf(child.stdout), linesOf(child.stderr));

        return combined.pipe(filter(Boolean));
};
fork icon339
star icon0
watch icon2

22
23
24
25
26
27
28
29
30
31
const scriptGrep = require('./plugin-release/script-grep')

/**
 * Run `cmd` with `args` via `execa` and return its output as one observable.
 *
 * The merged stream has three sources: stdout lines, stderr lines (both
 * piped through `split()` and wrapped with `streamToObservable`), and the
 * `execa` return value itself. Falsy emissions are filtered out.
 *
 * @param {string} cmd - Command handed to `execa`.
 * @param {string[]} args - Arguments handed to `execa`.
 * @returns {Observable} Merged, `Boolean`-filtered stream.
 */
const exec = (cmd, args) => {
  const child = execa(cmd, args)

  // Same conversion for either output stream of the child process.
  const linesOf = (stream) => streamToObservable(stream.pipe(split()))

  const combined = merge(linesOf(child.stdout), linesOf(child.stderr), child)

  return combined.pipe(filter(Boolean))
}

const buildDeleteCommands = async (pluginId = '', currentVersion = '') => {
  const deleteCommands = []
fork icon41
star icon115
watch icon9

How does rxjs.merge work?

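rxjs.merge is a creation function in the RxJS library that combines multiple observables into a single observable. It subscribes to the sources and forwards each value as soon as any source emits it, so values from different sources are interleaved by arrival time rather than grouped by source. The merged observable completes only when all of the merged observables have completed, and it errors as soon as any one of them errors. It resembles Promise.all only in that it waits for every input to finish before completing; unlike Promise.all it does not collect results into an array but passes every value through as it is produced (the closer RxJS analogue of Promise.all is forkJoin).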

16
17
18
19
20
21
22
23
24
25
}

commands.map(command => command.close.pipe(
    take(this.tries),
    takeWhile(({ exitCode }) => exitCode !== 0)
)).map((failure, index) => Rx.merge(
    // Delay the emission (so that the restarts happen on time),
    // explicitly telling the subscriber that a restart is needed
    failure.pipe(delay(this.delay, this.scheduler), mapTo(true)),
    // Skip the first N emissions (as these would be duplicates of the above),
fork icon233
star icon0
watch icon1

+ 5 other calls in file

58
59
60
61
62
63
64
65
66
67
// Subscribe to the retry stream: log each value, and log the error if the
// stream fails.
retryWhenStream$.subscribe(
  data => console.log('retry when stream', data),
  err => console.error('retry when error', err)
)

// Merge three sources into one stream. The first source is an observable that
// errors with 'error'; the trailing comment on that line keeps the
// non-failing alternative, of(1,2,3), at hand for swapping back in.
const streamCombined$ = merge(
  throwError('error'), // of(1,2,3),
  of(4,5,6),
  of(7,8,9)
);
fork icon9
star icon10
watch icon5

+ 3 other calls in file

Ai Example

1
2
3
4
5
6
7
8
9
10
// Minimal demonstration of `merge`: three timers with different periods are
// combined into one stream and every emission is logged.
import { merge, interval } from "rxjs";
import { mapTo } from "rxjs/operators";

// Each source ticks on its own period (1 s, 2 s, 3 s); mapTo replaces the
// tick value with a constant label identifying the source.
const observable1 = interval(1000).pipe(mapTo("First"));
const observable2 = interval(2000).pipe(mapTo("Second"));
const observable3 = interval(3000).pipe(mapTo("Third"));

// Labels are logged as each source emits. The subscription returned by
// subscribe() is not kept, so this snippet never unsubscribes.
merge(observable1, observable2, observable3).subscribe((value) =>
  console.log(value)
);

In this example, rxjs.merge is used to combine three observables (observable1, observable2, and observable3) into a single observable. The interval creation function is used to create observables that emit values at regular intervals (every 1, 2 and 3 seconds respectively). The mapTo operator is used to map each emitted value to a string that identifies the observable. The merge function is then called with the three observables as arguments, and the resulting observable is subscribed to. When the merged observable emits a value, the value is logged to the console. The output is an interleaved sequence of the strings First, Second and Third, each one logged whenever its source observable emits: First appears every second, Second every two seconds, and Third every three seconds.

137
138
139
140
141
142
143
144
145
146
        filter,
        scan
    } = rxOp
    var inc = source.pipe(filter(isPositive), map(returnPlus1));
    var dec = source.pipe(filter(isNegative), map(returnMinus1));
    var count = rxjs.merge(inc, dec).pipe(scan(addXY, 0));
    var label = rxjs.of('initial', 'Count is ');
    var view = rxjs.combineLatest(label, count).pipe(map(renderWithArgs));
    runners.runRx6(deferred, view);
}, options)
fork icon1
star icon49
watch icon6

+ 3 other calls in file

35
36
37
38
39
40
41
42
43
44
        callback(buffer.toString('binary'))
    )

const mergeHeartbeat = (heartbeatDelay, heartbeatValue) =>
    observable$ =>
        merge(
            observable$,
            timer(heartbeatDelay, heartbeatDelay).pipe(
                mapTo(heartbeatValue)
            )
fork icon5
star icon12
watch icon8

+ 3 other calls in file

96
97
98
99
100
101
102
103
104
105
    ),
    shareReplay()
  )
  this.unhandledBookings = new Subject()
  this.manualDispatchedBookings = new Subject()
  this.dispatchedBookings = merge(
    this.manualDispatchedBookings,
    dispatch(this.cars, this.unhandledBookings)
  ).pipe(share())
}
fork icon1
star icon16
watch icon0

28
29
30
31
32
33
34
35
36
37
const command = {
  ...Command().action(
    (system, scheduler) => base.pipe(
      take(1),
      mergeMap(() => {
        return merge(...commands.map(command => {
          const instance = scheduler.run(command)
          if(instance.lockingFailed){
            throw Error(instance)
          }
fork icon0
star icon1
watch icon7

+ 11 other calls in file

87
88
89
90
91
92
93
94
95
96
    topic: toAscii(result.topic)
  })));

  const obsErr = fromEvent(emitter, 'error').pipe(mergeMap(throwError));

  const obsSub = merge(obsData, obsErr);
  obsSub.shhSubscription = emitter;

  return obsSub;
}
fork icon533
star icon0
watch icon132

+ 3 other calls in file

152
153
154
155
156
157
158
159
160
161
}
logWaitingForWDeps(resources);

const resourcesCompleted$ = combineLatest(resources.map(createResourceWithDeps$));

merge(timeoutError$, resourcesCompleted$)
  .pipe(takeWhile((resourceStates) => resourceStates.some((x) => !x)))
  .subscribe({
    next: (resourceStates) => {
      lastResourcesState = resourceStates;
fork icon85
star icon0
watch icon11

+ 7 other calls in file

161
162
163
164
165
166
167
168
169
170
scan((acc, taxi) => acc.push(taxi) && acc, []),
debounceTime(1000),
tap((cars) => info('region taxis', cars.length)),
filter((taxis) => taxis.length > 0),
mergeMap((taxis) =>
  merge(this.manualBookings, this.unhandledBookings).pipe(
    bufferTime(5000, null, 100),
    filter((bookings) => bookings.length > 0),
    tap((bookings) =>
      info('Clustering taxi bookings', bookings.length, taxis.length)
fork icon1
star icon16
watch icon0

+ 5 other calls in file

94
95
96
97
98
99
100
101
102
103
  share()
)

// For every emitted passenger, merge its deliveredEvents and pickedUpEvents
// streams into one flat stream of updates, shared between subscribers.
experiment.passengerUpdates = experiment.passengers.pipe(
  mergeMap(({ deliveredEvents, pickedUpEvents }) =>
    merge(deliveredEvents, pickedUpEvents)
  ),
  // NOTE(review): catchError uses the handler's return value as the
  // replacement observable. `error` is defined outside this excerpt —
  // confirm it returns an observable and is not just a logger.
  catchError((err) => error('passengerUpdates', err)),
  share()
)
fork icon1
star icon16
watch icon0

+ 2 other calls in file

68
69
70
71
72
73
74
75
76
77

        return {winner};
    }),
);

const roundEnds$ = merge(roundEndsWinner$, roundEndsOfficiallyWinner$);

const get1vN = (rsh, winner) => {
    const ts = sh => (sh && sh.T && sh.T.length) || 0;
    const cts = sh => (sh && sh.CT && sh.CT.length) || 0;
fork icon0
star icon10
watch icon2

+ 3 other calls in file

55
56
57
58
59
60
61
62
63
64
        const move$ = fromEvent(ring, 'move');
        const err$ = fromEvent(ring, 'error').pipe(
                mergeMap((err) => throwError(err))
        );

        return merge(move$, err$);
}

async function main(){
        // pick the strategy to choose swim base finding strategy
fork icon0
star icon1
watch icon5

+ 5 other calls in file

88
89
90
91
92
93
94
95
96
97
98
        messageKey: 'tasks.status.canceled'
    })


task$.pipe(
    mergeMap(task => {
        const taskCancellation$ = merge(
            cancel$.pipe(
                filter(id => id === task.id),
                tap(() => log.debug(msg(task.id, 'cancelled by user'))),
            ),
fork icon45
star icon177
watch icon0

137
138
139
140
141
142
143
144
145
146
    .pipe(reduce((a, b) => a + b.population, 0))
    .toPromise(),
  packageVolumes: packageVolumes.find((e) => name.startsWith(e.name)),
  commercialAreas: commercialAreas,
  unhandledBookings: name.startsWith('Helsingborg')
    ? merge(bookings.hm, bookings.ikea)
    : of(),
  citizens,
})
return kommun
fork icon1
star icon16
watch icon0

72
73
74
75
76
77
78
79
80
81
    .pipe(
        tap(() => console.log(GRENADE_SHORT.DECOY)),
        map(e => formatGrenade(GRENADE_SHORT.DECOY, e)),
    );

const grenades$ = merge(hes$, flashbangs$, smokes$, molotovs$, decoys$);

demo.parse(fs.readFileSync(DEMO_FILE));

return grenades$.pipe(
fork icon0
star icon10
watch icon2

37
38
39
40
41
42
43
44
45
46
  return observable.pipe(operators.successOp, operators.errorOp);
};
/**
 * Build one observable that merges every request produced by
 * `transformRequests` and pipes the result through the configured operators.
 *
 * `successOp` and `errorOp` are always applied, in that order; `takeUntilOp`
 * is appended as a third stage only when `exists` reports it as present.
 *
 * @param ajax - Forwarded to `transformRequests`.
 * @param ajaxConfigs - Forwarded to `transformRequests`.
 * @param operators - Object carrying `successOp`, `errorOp` and, optionally, `takeUntilOp`.
 * @param action - Forwarded to `transformRequests`.
 * @param ajaxBodyCreator - Forwarded to `transformRequests`.
 * @param ajaxRequestInjector - Forwarded to `transformRequests`.
 * @returns The merged observable with the operator stages applied.
 */
var createMultiAjaxObservable = function createMultiAjaxObservable(ajax, ajaxConfigs, operators, action, ajaxBodyCreator, ajaxRequestInjector) {
  var requests = transformRequests(ajax, ajaxConfigs, action, ajaxBodyCreator, ajaxRequestInjector);
  var includeTakeUntil = (0, _typeUtil.exists)(operators.takeUntilOp);
  var merged = _rxjs.merge.apply(void 0, _toConsumableArray(requests));
  // Both variants share the first two stages; only the optional third differs.
  var stages = [operators.successOp, operators.errorOp];
  if (includeTakeUntil) {
    stages.push(operators.takeUntilOp);
  }
  return merged.pipe.apply(merged, stages);
};
var createForkJoinAjaxObservable = function createForkJoinAjaxObservable(ajax, ajaxConfigs, operators, action, ajaxBodyCreator, ajaxRequestInjector) {
fork icon0
star icon4
watch icon0

+ 7 other calls in file

75
76
77
78
79
80
81
82
83
84
  body: (constant) =>
    constant ? () => of({ body: constant }) :
      body => of({ body })
},
// serve(port)(...pipeline): merges the stream returned by start(port), minus
// its first emission, with every observable passed in `pipeline`.
serve: (port) => (...pipeline) =>
  merge(start(port).pipe(skip(1)), ...pipeline).pipe(
    // Log the error, then return `caught` (the source observable that
    // catchError passes as its second argument) so the merged stream is
    // re-subscribed instead of terminating.
    catchError((err, caught) => (console.error(err), caught))
  ),
start,
stop : () => server.close()
fork icon0
star icon2
watch icon3

53
54
55
56
57
58
59
60
61
62
exports.fromEvent = rxjs.fromEvent;
exports.fromEventPattern = rxjs.fromEventPattern;
exports.generate = rxjs.generate;
exports.iif = rxjs.iif;
exports.interval = rxjs.interval;
exports.merge = rxjs.merge;
exports.never = rxjs.never;
exports.of = rxjs.of;
exports.onErrorResumeNext = rxjs.onErrorResumeNext;
exports.pairs = rxjs.pairs;
fork icon299
star icon0
watch icon1