How to use the forkJoin function from rxjs

Find comprehensive JavaScript rxjs.forkJoin code examples handpicked from public code repositories.

rxjs.forkJoin is a creation function in RxJS that combines multiple observables and waits for them all to complete before emitting their last emitted values as an array.

172
173
174
175
176
177
178
179
180
181
        rxop.reduce((acc, r) => acc.concat(r), []),
        rxop.tap(() => demo.cancel()),
        rxop.catchError((e) => console.error(e)),
    );
});
return forkJoin(parsers).pipe(
    rxop.map(results => ({
        results,
        UNPARSED_DEMOS_BATCH,
        demoBatchIdx,
fork icon0
star icon10
watch icon2

46
47
48
49
50
51
52
53
54
55
// Re-export selected members of the `rxjs` object on this module's `exports`, each under its original name.
exports.bindNodeCallback = rxjs.bindNodeCallback;
exports.combineLatest = rxjs.combineLatest;
exports.concat = rxjs.concat;
exports.defer = rxjs.defer;
exports.empty = rxjs.empty;
exports.forkJoin = rxjs.forkJoin;
exports.from = rxjs.from;
exports.fromEvent = rxjs.fromEvent;
exports.fromEventPattern = rxjs.fromEventPattern;
exports.generate = rxjs.generate;
fork icon299
star icon0
watch icon1

How does rxjs.forkJoin work?

rxjs.forkJoin is a creation function in RxJS that takes multiple source observables, subscribes to all of them immediately, and waits until every one of them has completed. Once all the source observables complete, it emits a single array containing the last emitted value from each source observable, and then completes. If any of the source observables throws an error, forkJoin will immediately terminate and emit that error. If the source observables emit values at different times, forkJoin keeps only the most recent value from each one and does not emit anything until all of them have completed. If any source observable completes without having emitted a value, forkJoin completes at that moment without emitting anything, because there is no last value to report for that source. forkJoin can also accept an object instead of an array of observables, where the keys of the object are used as the keys of the resulting object. In this case, the object values should be observables. forkJoin will only emit a value once, so it's not suitable for observing changes to the source observables over time. It's best used when you want to wait for multiple observables to complete before performing a task that requires their results.

197
198
199
200
201
202
203
204
205
206

return {
    getImage$: function () {
        return ccdc$.pipe(
            switchMap(({ccdc}) =>
                forkJoin({
                    segmentsImage: ccdc.getImage$(),
                    geometry: ccdc.getGeometry$()
                }).pipe(
                    switchMap(({segmentsImage, geometry}) => getObservations$(geometry).pipe(
fork icon45
star icon177
watch icon0

578
579
580
581
582
583
584
585
586
587
  }
}

textbookLevelContentMetrics(collectedData) {
  return new Promise((resolve, reject) => {
    forkJoin(..._.map(collectedData, data => this.getProgramDetails(data.program_id))).subscribe(details => {
    try {
      const contentTypes = details.length ? _.uniq(_.compact(..._.map(details, (model) => {
        if (model && (!_.isEmpty(model.dataValues.content_types)))
            return  model.content_types
fork icon35
star icon0
watch icon6

+ 26 other calls in file

Ai Example

1
2
3
4
5
6
7
8
9
10
11
// Demonstrates forkJoin: wait for two delayed observables, then log their values together.
import { forkJoin, of } from "rxjs";
import { delay } from "rxjs/operators";

// Each source emits a single string; delay() postpones that emission by the given number of milliseconds.
const observable1$ = of("Hello").pipe(delay(3000));
const observable2$ = of("RxJS!").pipe(delay(1000));

// Combine both sources; the result array follows the order of this input array.
const forkJoin$ = forkJoin([observable1$, observable2$]);

// Destructure the emitted array into its two values and print them on one line.
forkJoin$.subscribe(([res1, res2]) => {
  console.log(`${res1} ${res2}`);
});

In this example, we create two observables observable1$ and observable2$ that each emit a string after a specified delay using the delay operator. We then pass an array containing these observables to the forkJoin function, which returns a new observable forkJoin$. When we subscribe to forkJoin$, we use array destructuring to extract the results of the two observables in the array passed to forkJoin. In this case, observable2$ emits its value first (after 1 second), and observable1$ emits its value after a longer delay (3 seconds). Once both observables have completed, the forkJoin$ observable emits an array containing both values, in the same order as the observables were passed in, and we log the concatenated string "Hello RxJS!" to the console.

182
183
184
185
186
187
188
189
190
191
const tasks = lines.map(l => {

    return of(createLineObj(l)).pipe(
        concatMap(line => {

            return forkJoin([
                getVehicleDescriptionByPlateID(line.PLATE_ID, pgClient),
                getViolationByViolationCode(line.VIOLATION_CODE, pgClient),
                getTicketBySummonsNumberAndYear(line.SUMMONS_NUMBER, line.YEAR, pgClient)
            ]).pipe(
fork icon1
star icon0
watch icon1

+ 3 other calls in file

83
84
85
86
87
88
89
90
91
92
        takeUntil(gameEnds$),
        reduce((acc, v) => acc.concat(v), []),
    );
});

forkJoin(parsers$).subscribe(
    results => {
        let allGrenades = results.reduce((acc, r) => acc.concat(r), []);

        let data = _.mapValues(_.groupBy(allGrenades, 'type'), gs => gs.map(g => {
fork icon0
star icon10
watch icon2

631
632
633
634
635
636
637
638
639
640
        "timestamp": 1, "requestedFeatures": 1, "pickUp": 1, "dropOff": 1, "tripCost": 1,
        "verificationCode": 1, "fareDiscount": 1, "fare": 1, "state": 1, "tip": 1, "client": 1,
        "driver": 1, "businessId": 1, "shiftId": 1, "request": 1
    })
),
mergeMap(dbService => forkJoin(
    of(dbService),
    this.payClientAgreement$(dbService, timestamp, aid, av),
    //this.payPlatformClientAgreement$(dbService, timestamp),
    this.payAppClientAgreement$(dbService, timestamp, aid, av),
fork icon0
star icon0
watch icon5

+ 11 other calls in file

176
177
178
179
180
181
182
183
184
185
    $call.push(build.maximo(xmls, dir, path));
  } else {
    $call.push(build.tfjs(xmls, dir, path));
  }
})
forkJoin($call)
.subscribe((data) => {
  writeFileSync(`${path}/train/labels.csv`, data[0]);
  writeFileSync(`${path}/test/labels.csv`, data[1]);
  console.log('generating label.csv for train and test');
fork icon0
star icon0
watch icon0

266
267
268
269
270
271
272
273
274
275
  //writeFileSync(`${path}/labels/${label}`, txt)
  //copyFileSync(`${path}/${filename}`, `${path}/images/${filename}`)
  $save[label] = new Observable((obs) => {writeFile(`${path}/labels/${label}`, txt, (err) => {obs.next('done'); obs.complete();})})
  $save[filename] = new Observable((obs) => {copyFile(`${path}/${filename}`, `${path}/images/${filename}`, err => {obs.next('done'); obs.complete();})})
})
forkJoin($save)
.subscribe({
  next: (save) => console.log(Object.keys(save).length),
  complete: () => {
    observer.next()
fork icon0
star icon0
watch icon0

+ 4 other calls in file

163
164
165
166
167
168
169
170
171
172
  const apmPositionElements = apmTrans === null || apmTrans === void 0 ? void 0 : apmTrans.startSpan('position-elements', 'correction'); // position panel elements for print layout

  await ((_layout$positionEleme = layout.positionElements) === null || _layout$positionEleme === void 0 ? void 0 : _layout$positionEleme.call(layout, driver, logger));
  apmPositionElements === null || apmPositionElements === void 0 ? void 0 : apmPositionElements.end();
  await (0, _wait_for_render.waitForRenderComplete)(this.timeouts.loadDelay, driver, layout, logger);
}).pipe((0, _operators.mergeMap)(() => Rx.forkJoin({
  timeRange: (0, _get_time_range.getTimeRange)(driver, layout, logger),
  elementsPositionAndAttributes: (0, _get_element_position_data.getElementPositionAndAttributes)(driver, layout, logger),
  renderErrors: (0, _get_render_errors.getRenderErrors)(driver, layout, logger)
})), this.waitUntil(this.timeouts.renderComplete));
fork icon0
star icon0
watch icon0