How to use the merge function from rxjs
Find comprehensive JavaScript rxjs.merge code examples handpicked from public code repositories.
rxjs.merge is an RxJS creation function that combines multiple source observables into a single observable, forwarding every value from every source as it is emitted.
21 22 23 24 25 26 27 28 29 30
/**
 * Run `cmd` with `args` through execa and expose the child's output as a
 * single observable.
 *
 * stdout and stderr are each piped through `split()` and wrapped with
 * `streamToObservable`, then merged so values from either stream are emitted
 * as they arrive. Falsy values are dropped by `filter(Boolean)`.
 *
 * NOTE(review): `split()` presumably chunks the streams into lines, so the
 * filter would be removing empty lines. Confirm against the `split` import.
 *
 * @param {string} cmd - Command handed to `execa`.
 * @param {string[]} args - Arguments handed to `execa`.
 * @returns {Observable} Merged, filtered output of stdout and stderr.
 */
const exec = (cmd, args) => {
  // Use `Observable` support if merged https://github.com/sindresorhus/execa/pull/26
  const cp = execa(cmd, args);

  // `{await: cp}` hands the execa return value to `streamToObservable`
  // alongside each stream.
  // NOTE(review): presumably this ties each observable's completion or failure
  // to the child process itself. Confirm in the stream-to-observable docs.
  return merge(
    streamToObservable(cp.stdout.pipe(split()), {await: cp}),
    streamToObservable(cp.stderr.pipe(split()), {await: cp})
  ).pipe(filter(Boolean));
};
22 23 24 25 26 27 28 29 30 31
const scriptGrep = require('./plugin-release/script-grep') const exec = (cmd, args) => { const cp = execa(cmd, args) return merge(streamToObservable(cp.stdout.pipe(split())), streamToObservable(cp.stderr.pipe(split())), cp).pipe(filter(Boolean)) } const buildDeleteCommands = async (pluginId = '', currentVersion = '') => { const deleteCommands = []
How does rxjs.merge work?
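rxjs.merge is a creation function in the RxJS library that combines multiple observables into a single observable. It subscribes to all of the source observables at once and forwards each value as soon as its source emits it, so values from different sources are interleaved in the order they arrive rather than in the order the sources were subscribed to. The merged observable completes only when all the merged observables complete, and it errors as soon as any one of them errors. It should not be confused with the Promise.all method in JavaScript: Promise.all waits for every input and resolves once with an array of results, whereas merge passes values through individually; the closer RxJS counterpart to Promise.all is forkJoin.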
16 17 18 19 20 21 22 23 24 25
} commands.map(command => command.close.pipe( take(this.tries), takeWhile(({ exitCode }) => exitCode !== 0) )).map((failure, index) => Rx.merge( // Delay the emission (so that the restarts happen on time), // explicitly telling the subscriber that a restart is needed failure.pipe(delay(this.delay, this.scheduler), mapTo(true)), // Skip the first N emissions (as these would be duplicates of the above),
+ 5 other calls in file
58 59 60 61 62 63 64 65 66 67
retryWhenStream$.subscribe( data => console.log('retry when stream', data), err => console.error('retry when error', err) ) const streamCombined$ = merge( throwError('error'), // of(1,2,3), of(4,5,6), of(7,8,9) );
+ 3 other calls in file
AI Example
1 2 3 4 5 6 7 8 9 10
import { interval, merge } from "rxjs";
import { mapTo } from "rxjs/operators";

// Build a ticker that emits `label` once every `periodMs` milliseconds.
const labelledTicker = (periodMs, label) =>
  interval(periodMs).pipe(mapTo(label));

const observable1 = labelledTicker(1000, "First");
const observable2 = labelledTicker(2000, "Second");
const observable3 = labelledTicker(3000, "Third");

// Subscribe to all three tickers at once and log each label as it arrives.
const merged$ = merge(observable1, observable2, observable3);
merged$.subscribe((value) => {
  console.log(value);
});
In this example, rxjs.merge is used to combine three observables (observable1, observable2, and observable3) into a single observable. The interval function is used to create an observable that emits values at regular intervals. The mapTo operator is used to map each emitted value to a string that identifies the observable. The merge function is then called with the three observables as arguments, and the resulting observable is subscribed to. When the merged observable emits a value, the value is logged to the console. The output is an interleaved sequence of the strings First, Second, and Third, each identifying the observable that emitted the value: First is logged every second, Second every two seconds, and Third every three seconds.
GitHub: langhuihui/rx4rx
137 138 139 140 141 142 143 144 145 146
filter, scan } = rxOp var inc = source.pipe(filter(isPositive), map(returnPlus1)); var dec = source.pipe(filter(isNegative), map(returnMinus1)); var count = rxjs.merge(inc, dec).pipe(scan(addXY, 0)); var label = rxjs.of('initial', 'Count is '); var view = rxjs.combineLatest(label, count).pipe(map(renderWithArgs)); runners.runRx6(deferred, view); }, options)
+ 3 other calls in file
GitHub: lpaolini/AccentaG4
35 36 37 38 39 40 41 42 43 44
callback(buffer.toString('binary')) ) const mergeHeartbeat = (heartbeatDelay, heartbeatValue) => observable$ => merge( observable$, timer(heartbeatDelay, heartbeatDelay).pipe( mapTo(heartbeatValue) )
+ 3 other calls in file
96 97 98 99 100 101 102 103 104 105
), shareReplay() ) this.unhandledBookings = new Subject() this.manualDispatchedBookings = new Subject() this.dispatchedBookings = merge( this.manualDispatchedBookings, dispatch(this.cars, this.unhandledBookings) ).pipe(share()) }
GitHub: USU-Robosub/submarine
28 29 30 31 32 33 34 35 36 37
const command = { ...Command().action( (system, scheduler) => base.pipe( take(1), mergeMap(() => { return merge(...commands.map(command => { const instance = scheduler.run(command) if(instance.lockingFailed){ throw Error(instance) }
+ 11 other calls in file
87 88 89 90 91 92 93 94 95 96
topic: toAscii(result.topic) }))); const obsErr = fromEvent(emitter, 'error').pipe(mergeMap(throwError)); const obsSub = merge(obsData, obsErr); obsSub.shhSubscription = emitter; return obsSub; }
+ 3 other calls in file
GitHub: jeffbski/wait-on
152 153 154 155 156 157 158 159 160 161
} logWaitingForWDeps(resources); const resourcesCompleted$ = combineLatest(resources.map(createResourceWithDeps$)); merge(timeoutError$, resourcesCompleted$) .pipe(takeWhile((resourceStates) => resourceStates.some((x) => !x))) .subscribe({ next: (resourceStates) => { lastResourcesState = resourceStates;
+ 7 other calls in file
161 162 163 164 165 166 167 168 169 170
scan((acc, taxi) => acc.push(taxi) && acc, []), debounceTime(1000), tap((cars) => info('region taxis', cars.length)), filter((taxis) => taxis.length > 0), mergeMap((taxis) => merge(this.manualBookings, this.unhandledBookings).pipe( bufferTime(5000, null, 100), filter((bookings) => bookings.length > 0), tap((bookings) => info('Clustering taxi bookings', bookings.length, taxis.length)
+ 5 other calls in file
94 95 96 97 98 99 100 101 102 103
share() ) experiment.passengerUpdates = experiment.passengers.pipe( mergeMap(({ deliveredEvents, pickedUpEvents }) => merge(deliveredEvents, pickedUpEvents) ), catchError((err) => error('passengerUpdates', err)), share() )
+ 2 other calls in file
68 69 70 71 72 73 74 75 76 77
return {winner}; }), ); const roundEnds$ = merge(roundEndsWinner$, roundEndsOfficiallyWinner$); const get1vN = (rsh, winner) => { const ts = sh => (sh && sh.T && sh.T.length) || 0; const cts = sh => (sh && sh.CT && sh.CT.length) || 0;
+ 3 other calls in file
GitHub: hatchapp/game-server
55 56 57 58 59 60 61 62 63 64
const move$ = fromEvent(ring, 'move'); const err$ = fromEvent(ring, 'error').pipe( mergeMap((err) => throwError(err)) ); return merge(move$, err$); } async function main(){ // pick the strategy to choose swim base finding strategy
+ 5 other calls in file
GitHub: openforis/sepal
88 89 90 91 92 93 94 95 96 97 98
messageKey: 'tasks.status.canceled' }) task$.pipe( mergeMap(task => { const taskCancellation$ = merge( cancel$.pipe( filter(id => id === task.id), tap(() => log.debug(msg(task.id, 'cancelled by user'))), ),
137 138 139 140 141 142 143 144 145 146
.pipe(reduce((a, b) => a + b.population, 0)) .toPromise(), packageVolumes: packageVolumes.find((e) => name.startsWith(e.name)), commercialAreas: commercialAreas, unhandledBookings: name.startsWith('Helsingborg') ? merge(bookings.hm, bookings.ikea) : of(), citizens, }) return kommun
72 73 74 75 76 77 78 79 80 81
.pipe( tap(() => console.log(GRENADE_SHORT.DECOY)), map(e => formatGrenade(GRENADE_SHORT.DECOY, e)), ); const grenades$ = merge(hes$, flashbangs$, smokes$, molotovs$, decoys$); demo.parse(fs.readFileSync(DEMO_FILE)); return grenades$.pipe(
37 38 39 40 41 42 43 44 45 46
return observable.pipe(operators.successOp, operators.errorOp); }; var createMultiAjaxObservable = function createMultiAjaxObservable(ajax, ajaxConfigs, operators, action, ajaxBodyCreator, ajaxRequestInjector) { var observables = transformRequests(ajax, ajaxConfigs, action, ajaxBodyCreator, ajaxRequestInjector); if ((0, _typeUtil.exists)(operators.takeUntilOp)) { return _rxjs.merge.apply(void 0, _toConsumableArray(observables)).pipe(operators.successOp, operators.errorOp, operators.takeUntilOp); } return _rxjs.merge.apply(void 0, _toConsumableArray(observables)).pipe(operators.successOp, operators.errorOp); }; var createForkJoinAjaxObservable = function createForkJoinAjaxObservable(ajax, ajaxConfigs, operators, action, ajaxBodyCreator, ajaxRequestInjector) {
+ 7 other calls in file
GitHub: lbovet/makine
75 76 77 78 79 80 81 82 83 84
body: (constant) => constant ? () => of({ body: constant }) : body => of({ body }) }, serve: (port) => (...pipeline) => merge(start(port).pipe(skip(1)), ...pipeline).pipe( catchError((err, caught) => (console.error(err), caught)) ), start, stop : () => server.close()
GitHub: caosbad/api
53 54 55 56 57 58 59 60 61 62
exports.fromEvent = rxjs.fromEvent; exports.fromEventPattern = rxjs.fromEventPattern; exports.generate = rxjs.generate; exports.iif = rxjs.iif; exports.interval = rxjs.interval; exports.merge = rxjs.merge; exports.never = rxjs.never; exports.of = rxjs.of; exports.onErrorResumeNext = rxjs.onErrorResumeNext; exports.pairs = rxjs.pairs;