How to use the defer function from rxjs

Find comprehensive JavaScript rxjs.defer code examples handpicked from public code repositorys.

rxjs.defer is a function provided by the RxJS library that defers the creation of an observable until an observer is ready to subscribe to it, allowing for asynchronous operations to be managed and handled more efficiently.

1
2
3
4
5
6
7
8
9
10
const {swallow} = require('#sepal/rxjs')
const log = require('#sepal/log').getLogger('gdal')
const {terminal$} = require('#sepal/terminal')
const _ = require('lodash')

const createVrt$ = ({inputPaths, outputPath, args = []}) => defer(() => {
    const inputPathList = _.isArray(inputPaths) ? inputPaths : [inputPaths]
    return execute$(
        'gdalbuildvrt',
        'create VRT',
fork icon45
star icon180
watch icon34

+ 5 other calls in file

45
46
47
48
49
50
51
52
53
54
    const expiresInMinutes = (user.googleTokens.accessTokenExpiryDate - new Date().getTime()) / 60 / 1000
    log.trace(() => `${usernameTag(user.username)} ${urlTag(req.originalUrl)} Google tokens expires in ${expiresInMinutes} minutes`)
    return expiresInMinutes < REFRESH_IF_EXPIRES_IN_MINUTES
}

const verifyGoogleTokens$ = defer(() => {
    const user = getRequestUser(req)
    if (user?.googleTokens?.accessTokenExpiryDate) {
        if (shouldRefreshGoogleTokens(user)) {
            return refreshGoogleTokens$
fork icon45
star icon178
watch icon34

+ 7 other calls in file

How does rxjs.defer work?

rxjs.defer works by creating a function that returns an observable that is not actually created until an observer subscribes to it.

This allows you to set up an observable pipeline before you have all of the data or resources needed to create the observable, which can then be created on-demand once the pipeline is ready.

The deferred observable is represented as a function that takes no arguments and returns an observable.

When the observable is finally created, it will emit values to the observer, and the observer can use its methods like .next(), .error(), and .complete() to handle those values.

This makes it useful for situations where you need to set up an observable pipeline in advance, but don't have all of the data or resources you need to create the observable until later.

21
22
23
24
25
26
27
28
29
30
if (!amqpUri) {
    throw new Error('Missing amqpUri')
}
const connection$ = new Subject()
const connect = () => {
    defer(async () => {
        log.debug((`Connecting to message broker: ${amqpUri}`))
        return await amqp.connect(amqpUri)
    }).pipe(
        retry(RETRY_DELAY_MS)
fork icon45
star icon178
watch icon34

61
62
63
64
65
66
67
68
69
70
  host: 'localhost',
  password: 'doNoUseThisSecret!',
  username: 'postgres'
};

const dataSource$ = defer(() =>
  from(
    (async () => {
      const dataSource = createDataSource({
        connectionConfig,
fork icon41
star icon175
watch icon38

Ai Example

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
const { defer } = require("rxjs");

// Define a deferred observable that will eventually emit some data
function deferredObservable() {
  console.log("Creating deferred observable...");

  return defer(() => {
    console.log("Emitting data...");
    return [1, 2, 3];
  });
}

// Use the deferred observable in an observable pipeline
deferredObservable().subscribe(
  (value) => console.log(`Received value: ${value}`),
  (error) => console.error(`Error: ${error.message}`),
  () => console.log("Completed")
);

In this example, we first define a deferredObservable function that will eventually emit some data. We use console.log to log messages indicating when the deferred observable is being created and when it is emitting data. We then use rxjs.defer to create a deferred observable that will be created on-demand when an observer subscribes to it. In the observable pipeline, we use the .subscribe() method to attach an observer that will receive values emitted by the observable. We use the observer's methods like .next(), .error(), and .complete() to handle those values. When the code is run, the output will be: yaml Copy code

101
102
103
104
105
106
107
108
109
110
  }
}


write(data /* Array of JSON objects to be written */) {
  return defer(() => {
    let writeStream;

    return new Observable(obs$ => {
      if (!Array.isArray(data)) {
fork icon77
star icon162
watch icon36

91
92
93
94
95
96
97
98
99
100

/**
 * refresh keycloak token
 */
refreshToken$ () {
  return defer(() => refreshKeycloakToken(this.client, {
    refresh_token: this.data.token.refresh_token,
    grant_type: 'refresh_token',
    client_id: this.settings.client_id,
    realmName: this.settings.realmName
fork icon52
star icon1
watch icon4

+ 7 other calls in file

8
9
10
11
12
13
14
15
16
17
const CHUNK_SIZE = 10 * 1024 * 1024
const CONCURRENT_FILE_DOWNLOAD = 1

const RETRIES = 5

const do$ = (description, promise) => defer(() => {
    log.debug(description)
    return of(true).pipe(
        switchMap(() => fromPromise(promise)),
        retry(RETRIES)
fork icon45
star icon178
watch icon34

119
120
121
122
123
124
125
126
        }, interval);
    });
};
var waitElement = function (id, timeout) {
    if (timeout === void 0) { timeout = 1000; }
    return rxjs_1.defer(function () { return waitElementPromise(id, timeout); });
};
exports.waitElement = waitElement;
fork icon1
star icon17
watch icon0

89
90
91
92
93
94
95
96
97
98
}),
mergeMap((pkgChange) =>
  iif(
    () => dryRun,
    of(pkgChange),
    defer(() =>
      from(
        writePkg(
          pkgChange.pkgPath,
          /**
fork icon0
star icon9
watch icon1

+ 3 other calls in file

44
45
46
47
48
49
50
51
52
53
exports.TimeoutError = rxjs.TimeoutError;
exports.bindCallback = rxjs.bindCallback;
exports.bindNodeCallback = rxjs.bindNodeCallback;
exports.combineLatest = rxjs.combineLatest;
exports.concat = rxjs.concat;
exports.defer = rxjs.defer;
exports.empty = rxjs.empty;
exports.forkJoin = rxjs.forkJoin;
exports.from = rxjs.from;
exports.fromEvent = rxjs.fromEvent;
fork icon299
star icon0
watch icon1

30
31
32
33
34
35
36
37
38
39
    log.debug(() => message)
    return operation$
}

const auth$ = () =>
    defer(getCurrentContext$).pipe(
        switchMap(({userCredentials}) => {
            const oAuth2Client = new google.auth.OAuth2()
            const expiration = moment(userCredentials['access_token_expiry_date'])
            log.debug(() => `Authenticating with token expiring ${expiration.fromNow()} (${expiration})`)
fork icon45
star icon0
watch icon0

8
9
10
11
12
13
14
15
16
17
18
19


const Biz = (init = {}) => {
  const target = { ...init };


  target.api = (x) =>
    defer(() => {
      const method = x.body ? "POST" : "GET";
      const headers = {
        Authorization: `token ${target.token}`,
        "User-Agent": "Muranode/1.0.0",
fork icon0
star icon1
watch icon1

+ 21 other calls in file

21
22
23
24
25
26
27
28
29
30
if (!amqpUri) {
    throw new Error('Missing amqpUri')
}
const connection$ = new Subject()
const connect = () => {
    defer(async () => await amqp.connect(amqpUri)).pipe(
        retry(RETRY_DELAY_MS)
    ).subscribe(
        connection => {
            log.info(`Connected to message broker: ${amqpUri}`)
fork icon45
star icon0
watch icon0

39
40
41
42
43
44
45
46
47
48
  };
  if(businessId){
    query.businessId = businessId;
  }

  return defer(() => collection.findOne(query));
}

/**
 * get services from the satellite
fork icon0
star icon0
watch icon5

+ 165 other calls in file

95
96
97
98
99
100
101
102
103
104
}

fetchAnswer(question) {
  var Prompt = this.prompts[question.type];
  this.activePrompt = new Prompt(question, this.rl, this.answers);
  return defer(() =>
    from(
      this.activePrompt
        .run()
        .then((answer) => ({ name: question.name, answer: answer }))
fork icon0
star icon0
watch icon1

+ 123 other calls in file

26
27
28
29
30
31
32
33
34
  const collection = mongoDB.db.collection(CollectionName);

  const query = {
    "_id": id
  };
  return defer(() => collection.findOne(query, projection));
}


fork icon0
star icon0
watch icon5

+ 5 other calls in file

152
153
154
155
156
157
158
159
160

completeRender(apmTrans) {
  const driver = this.driver;
  const layout = this.layout;
  const logger = this.logger;
  return Rx.defer(async () => {
    var _layout$positionEleme; // Waiting till _after_ elements have rendered before injecting our CSS
    // allows for them to be displayed properly in many cases

fork icon0
star icon0
watch icon0

+ 2 other calls in file