How to use the of function from rxjs

Find comprehensive JavaScript rxjs.of code examples handpicked from public code repositorys.

rxjs.of creates an Observable that emits the values provided as arguments and then completes.

56
57
58
59
60
61
62
63
64
65
            log.isTrace() && log.trace(`${usernameTag(user.username)} ${urlTag(req.originalUrl)} No need to refresh Google tokens for user - more than ${REFRESH_IF_EXPIRES_IN_MINUTES} minutes left until expiry`)
            return of(true)
        }
    } else {
        log.isTrace() && log.trace(`${usernameTag(user.username)} ${urlTag(req.originalUrl)} No Google tokens to verify for user`)
        return of(true)
    }
})

const authenticated$ = (username, response) => {
fork icon45
star icon178
watch icon34

+ 13 other calls in file

46
47
48
49
50
51
52
53
54
55
interval(MONITORING_FREQUENCY).pipe(
    exhaustMap(() => status$(taskId)),
    switchMap(({state, error_message: error}) =>
        (error || state === FAILED)
            ? throwError(() => new Error(error))
            : of(state)
    ),
    distinctUntilChanged(),
    takeWhile(state => isRunning(state)),
    map(toProgress)
fork icon45
star icon178
watch icon34

+ 5 other calls in file

How does rxjs.of work?

rxjs.of() is a static operator method that creates an observable that emits a sequence of values that are passed as arguments, and then completes. It can emit any number of values and types, and can be used to create an observable from an array or multiple arguments. The emitted values are synchronous by default, but can be made asynchronous with the use of a scheduler.

10
11
12
13
14
15
16
17
18
19

const RETRIES = 5

const do$ = (description, promise) => defer(() => {
    log.debug(description)
    return of(true).pipe(
        switchMap(() => fromPromise(promise)),
        retry(RETRIES)
    )
})
fork icon45
star icon178
watch icon34

+ 7 other calls in file

82
83
84
85
86
87
88
89
90
    log.debug(msg(instance, 'released'))
    onRelease && onRelease({name, instanceId: instance.id})
}

const hot$ = instance =>
    of(instance).pipe(
        tap(instance => log.debug(msg(instance, 'recycled existing'))),
        tap(instance => onHot && onHot({name, instanceId: instance.id}))
    )
fork icon45
star icon178
watch icon34

Ai Example

1
2
3
4
import { of } from "rxjs";

const observable = of(1, 2, 3);
observable.subscribe((value) => console.log(value));

In this example, the of function from the rxjs library is used to create an observable that emits the values 1, 2, and 3. The subscribe method is then used to subscribe to the observable and log each emitted value to the console. When the above code is run, the output will be: Copy code

74
75
76
77
78
79
80
81
82
83
)

const finalState$ = new BehaviorSubject(completedState)

return concat(
    of(initialState),
    progressState$.pipe(
        takeUntil(cancel$.pipe(
            tap(() => finalState$.next(cancelState))
        )),
fork icon45
star icon178
watch icon34

115
116
117
118
119
120
121
122
123
124
})
return request$.pipe(
    map(response => validateResponse(response, validStatuses)),
    catchError(e => {
        if (validStatuses && validStatuses.includes(e.statusCode)) {
            return of(e)
        } else {
            return throwError(() => e)
        }
    }),
fork icon45
star icon178
watch icon34

43
44
45
46
47
48
49
50
51
52
                .clip(geometry)
        )
    )
},
getBands$() {
    return of(BANDS)
},
getGeometry$() {
    return of(geometry)
}
fork icon45
star icon177
watch icon0

44
45
46
47
48
49
50
51
52
53
  pgBoss: true
};

const cardanoNode = new OgmiosObservableCardanoNode(
  {
    connectionConfig$: of({
      port: 1339
    })
  },
  { logger }
fork icon41
star icon175
watch icon38

33
34
35
36
37
38
39
40
41
42
mockSettingsService.getLastSynchronizationDate.mockReturnValue(
    new Date('1970-01-01'),
);

mockFlathubSynchronizer.startSynchronization.mockReturnValueOnce(
    of(true),
);

mockAppImageHubSynchronizer.startSynchronization.mockReturnValueOnce(
    of(true),
fork icon29
star icon360
watch icon7

+ 7 other calls in file

81
82
83
84
85
86
87
88
89
90
    } else {
      log.error(`Update check aborted: non-existent url`);
    }
  }

  return of('');
}),

concatMap(() => {
  return timer(UPDATE_TIMEOUT).pipe(
fork icon77
star icon162
watch icon0

162
163
164
165
166
167
168
169
170
171
const ccdcArgs = {selection: [band]}

const {monitoringEnd, monitoringStart, calibrationStart} = toDates(recipe)

const selectedBands$ = selectedBands && selectedBands.length
    ? of(selectedBands)
    : imageFactory(model.reference, ccdcArgs).getBands$()

const bands$ = selectedBands$.pipe(
    map(selectedBands => ({selectedBands, baseBands}))
fork icon45
star icon177
watch icon0

105
106
107
108
109
110
111
112
113
114
switchMap(progress =>
    progress.state === 'COMPLETED'
        ? taskCompleted$(task.id)
        : progress.state === 'CANCELED'
            ? taskCanceled$(task.id)
            : of(progress)
),
catchError(error =>
    taskFailed$(task.id, error)
),
fork icon45
star icon177
watch icon0

17
18
19
20
21
22
23
24
25
26
const LOCAL_PACKAGE_PREFIX = '@relate/';
const CWD = path.resolve(process.cwd());
const PACKAGES = path.join(CWD, '/packages');
const allPackages = fs.readdirSync(PACKAGES).map((p) => path.join(PACKAGES, p));

module.exports = of(allPackages).pipe(
    flatMap((packages) => [CWD, ...packages]),
    filter((p) => fs.statSync(p).isDirectory()),
    map(getDepLicenses),
    combineAll(),
fork icon8
star icon18
watch icon33

+ 13 other calls in file

29
30
31
32
33
34
35
36
37
38
  this.status = err ? 'off' : status
  this.emit('status', err)
}

listen() {
  return of(new PlexService(Config.payload.plex)).pipe(
    mergeMap((plex) => interval(5000).pipe(
      mergeMap(() => plex.status()),
      tap(status => this.state(status)),
      takeWhile(status => status === 'waiting', true),
fork icon6
star icon98
watch icon6

70
71
72
73
74
75
76
77
78
79
),
catchError(error => {
  const { uri } = error.options;
  console.log(`Error requesting ${uri} after ${maxRetries} retries.`);
  // return null on error
  return of(null);
}),
// filter out errors
filter(v => v),
// get the cheerio function $
fork icon6
star icon12
watch icon3

+ 3 other calls in file

-1
fork icon30
star icon10
watch icon2

+ 5 other calls in file

5
6
7
8
9
10
11
12
13
14
  observer.next(2);
  observer.next(3);
  throw new Error('stream crashed');
  observer.complete();
}).pipe(
  catchError(err => of('patched error'))
);

stream$.subscribe(
  data => console.log('data emitted', data),
fork icon9
star icon10
watch icon5

+ 11 other calls in file

122
123
124
125
126
127
128
129
130
131
function grab(movie, release, context = {}) {
  log('🎟️ ', `Grabbing ${chalk.inverse(release.title)} from ${chalk.gray(release.site)}`)
  logger.fetch(`🎟️ Grabbing **${release.title}** from **_${release.site}_**`, { context, success: true, release })

  return of(null).pipe(
    mergeMap(() => of(sensorr.config.blackhole).pipe(
      mergeMap(blackhole => bindNodeCallback(fs.access)(blackhole, fs.constants.W_OK).pipe(
        map(err => !err),
        mergeMap(exist => exist ? of(null) : bindNodeCallback(fs.mkdir)(blackhole, { recursive: true })),
        mergeMap(err => err ? throwError(err) : of(null)),
fork icon6
star icon98
watch icon6

+ 35 other calls in file

18
19
20
21
22
23
24
25
26
27
{
  title: `Checkout branch ${config.develop}`,
  task: (ctx, task) => git.checkout('-b', config.develop, `${config.remote}/${config.develop}`)
    .pipe(catchError(() => {
      task.skip(`Local branch ${config.develop} exists`);
      return of(undefined);
    }))
},
{
  title: `Checkout branch ${config.develop}`,
fork icon5
star icon9
watch icon5

+ 7 other calls in file

112
113
114
115
116
117
118
119
120
121
  tap(record =>
    messages$.next({ type: `${property}-changes`, data: record })
  )
),
// removals must be delays to that change are transfered before
of(null).pipe(delay(100)),
from(removedIds).pipe(
  tap(id =>
    messages$.next({ type: `${property}-removals`, data: id })
  )
fork icon3
star icon54
watch icon3

+ 5 other calls in file