How to use the from function from rxjs

Find comprehensive JavaScript rxjs.from code examples handpicked from public code repositorys.

38
39
40
41
42
43
44
45
46
47
  obs$.error('INVALID_DATA');
  return;
}

writeStream = _fs.createWriteStream(this._targetPath, { encoding: 'utf8' });
const sourceStream = from(data).pipe(
  skipWhile(d => !d || Object.prototype.toString.call(d) !== '[object Object]'),
  catchError(e => destroy$.next()),
  takeUntil(destroy$)
);
fork icon77
star icon162
watch icon36

60
61
62
63
64
65
66
67
68
69
    headers: {[SEPAL_USER_HEADER]: JSON.stringify(user)}
}).pipe(
    map((({body}) => JSON.parse(body))),
    switchMap(user => {
        log.isDebug() && log.debug(`${usernameTag(user.username)} ${urlTag(req.url)} Updated user in user store, connected to Google: ${!!user.googleTokens}`)
        return from(setUser(user))
    }),
    catchError(error => {
        log.error(`${usernameTag(user.username)} ${urlTag(req.url)} Failed to load current user`, error)
        return EMPTY
fork icon45
star icon178
watch icon34

30
31
32
33
34
35
36
37
38
39
switchMap(({body: googleTokens}) => {
    if (googleTokens) {
        log.debug(() => `${usernameTag(user.username)} ${urlTag(req.originalUrl)} Refreshed Google tokens`)
        const updatedUser = {...user, googleTokens: JSON.parse(googleTokens)}
        res.set(SEPAL_USER_HEADER, JSON.stringify(updatedUser))
        return from(userStore.setUser(updatedUser))
    } else {
        log.warn(`${usernameTag(user.username)} ${urlTag(req.originalUrl)} Google tokens not refreshed - missing from response`)
    }
    return of(true)
fork icon45
star icon178
watch icon34

+ 3 other calls in file

76
77
78
79
80
81
82
83
84
85
        emitsOne(dirPath => expect(Path.basename(dirPath)).toEqual('some_dir_2'))
    )
})

const mkTmpDir$ = () =>
    from(fs.promises.mkdtemp(Path.join(os.tmpdir(), 'test-')))

beforeEach(() =>
    firstValueFrom(
        mkTmpDir$().pipe(
fork icon45
star icon178
watch icon34

215
216
217
218
219
220
221
222
223
224
    args$.subscribe(
        msg => processMessage(msg)
    )
}

return from(init()).pipe(
    switchMap(() => out$.pipe(
        finalize(
            () => stop$.next()
        )
fork icon45
star icon177
watch icon0

+ 5 other calls in file

15
16
17
18
19
20
21
22
23
24
25
        complete: () => log.fatal('Monitor unexpectedly completed')
    })


const apps$ = () =>
    fileToJson$('/var/lib/sepal/app-manager/apps.json').pipe(
        switchMap(({apps}) => from(apps)),
        filter(({repository}) => repository),
        map(({endpoint = 'shiny', label, repository, branch}) => {
            const name = basename(repository)
            return {
fork icon45
star icon177
watch icon0

62
63
64
65
66
67
68
69
70
71
  password: 'doNoUseThisSecret!',
  username: 'postgres'
};

const dataSource$ = defer(() =>
  from(
    (async () => {
      const dataSource = createDataSource({
        connectionConfig,
        devOptions: process.argv.includes('--drop')
fork icon41
star icon175
watch icon38

16
17
18
19
20
21
22
23
24
25
  log('')
  return
}

return await new Promise(resolve =>
  from(db.movies.allDocs({ include_docs: true })).pipe(
    pluck('rows'),
    map(entities => entities.map(entity => ({ id: entity.id, ...entity.doc }))),
    tap(movies => movies.filter(movie => ['wished', 'missing'].includes(movie.state)).length ? '' : log('👏', 'Up to date, no more wished movies !')),
    map(movies => movies.sort((a, b) => a.time - b.time)),
fork icon6
star icon98
watch icon0

+ 11 other calls in file

103
104
105
106
107
108
109
110
111
112
    return { recordsMap, records }
  },
  { recordsMap: new Map(), records: [] }
),
mergeMap(({ records }) =>
  from(model.save(records)).pipe(
    mergeMap(({ saved, removedIds }) =>
      concat(
        from(saved).pipe(
          tap(record =>
fork icon3
star icon54
watch icon3

+ 9 other calls in file

129
130
131
132
133
134
135
136
137
138
    var label = most.from(['initial', 'Count is ']);
    var view = most.combine(renderWithArgs, label, count);
    runners.runMost(deferred, view.drain());
}, options)
.add('rx 6', function (deferred) {
    var source = rxjs.from(a);
    var {
        map,
        filter,
        scan
fork icon1
star icon49
watch icon6

+ 3 other calls in file

110
111
112
113
114
115
116
117
118
119
  complete() {
    onComplete();
  },
};
this._latestSnapshotObservable = interval(this._options.snapshotInterval || 5000).pipe(
  switchMap(() => from(this.getLatestSnapshot())),
  multicast(new Subject()),
  refCount(),
);
this._subscribedSnapshotObservable = this._latestSnapshotObservable.subscribe(observer);
fork icon7
star icon18
watch icon3

+ 7 other calls in file

58
59
60
61
62
63
64
65
66
67
);

const urlAndDOM$ = uniqueUrl$.pipe(
  mergeMap(
    url => {
      return from(rp(url)).pipe(
        retryWhen(
          genericRetryStrategy({
            maxRetryAttempts: maxRetries,
            scalingDuration: 3000,
fork icon6
star icon12
watch icon3

+ 3 other calls in file

124
125
126
127
128
129
130
131
132
133
filter(({ buses, trips }) => buses.length && trips.length),
mergeMap(({ buses, trips }) => busDispatch(buses, trips), 1), // try to find optimal plan x kommun at a time
retryWhen((errors) => errors.pipe(delay(1000), take(10))),
mergeAll(),
mergeMap(({ bus, stops }) =>
  from(stops).pipe(
    pairwise(),
    map(stopsToBooking),
    map((booking) => ({ bus, booking }))
  )
fork icon1
star icon16
watch icon0

+ 5 other calls in file

10
11
12
13
14
15
16
17
18
19
20
21
const range = (length) => Array.from({ length }).map((_, i) => i)


describe('A kommun', () => {
  const arjeplog = { lon: 17.886855, lat: 66.041054 }
  const ljusdal = { lon: 14.44681991219, lat: 61.59465992477 }
  const squares = from([])
  let fleets
  let kommun


  let testBooking = new Booking({
fork icon1
star icon16
watch icon0

75
76
77
78
79
80
81
82
83
84
85
  return adresses.map((a) => ({ ...a, position: new Position(a.position) }))
}


// function read() {
function read({ fleets }) {
  return from(data).pipe(
    filter(({ namn }) =>
      includedMunicipalities.some((name) => namn.startsWith(name))
    ),
    map((kommun) => {
fork icon1
star icon16
watch icon0

+ 2 other calls in file

69
70
71
72
73
74
75
76
77
78
this.hub = { position: new Position(hub) }

this.percentageHomeDelivery = (percentageHomeDelivery || 0) / 100 || 0.15 // based on guestimates from workshop with transport actors in oct 2021
this.percentageReturnDelivery = 0.1
this.kommun = kommun
this.cars = from(Object.entries(vehicles)).pipe(
  mergeMap(([type, count]) =>
    range(0, count).pipe(
      mergeMap(() => {
        const Vehicle = vehicleTypes[type].class
fork icon1
star icon16
watch icon0

65
66
67
68
69
70
71
72
73

function publicTransport(operator) {
  // stop_times.trip_id -> trips.service_id -> calendar_dates.service_id
  const todaysDate = moment().format('YYYYMMDD')

  const todaysStops = from(getStopsForDate(todaysDate, operator)).pipe(
    mergeAll(),
    shareReplay()
  )
fork icon1
star icon16
watch icon0

4135
4136
4137
4138
4139
4140
4141
4142
4143
4144
console.log(email);

let sayReturn = await getUserNotPost();
const response = await axios.get(apiEmailSorted);
const data = await response.data.records;
const data$ = from(data)
  .pipe(
    tap((data) => {
      console.log("Debug");
      //console.log(data)
fork icon0
star icon14
watch icon0

+ 3 other calls in file

31
32
33
34
35
36
37
38
39
40
 */
function readPackageJson() {
  return (pkgJsonPath$) =>
    pkgJsonPath$.pipe(
      mergeMap((pkgJsonPath) =>
        from(readPkg({cwd: path.dirname(pkgJsonPath), normalize: false})).pipe(
          map((pkg) =>
            Object.freeze({
              pkgPath: pkgJsonPath,
              pkg,
fork icon0
star icon9
watch icon1

+ 7 other calls in file

227
228
229
230
231
232
233
    );

return timer(delay, interval).pipe(
  mergeMap(() => {
    output(`checking file stat for file:${filePath} ...`);
    return from(getFileSize(filePath));
  }, simultaneous),
fork icon85
star icon0
watch icon11

+ 7 other calls in file