How to use the defer function from rxjs
Find comprehensive JavaScript rxjs.defer code examples handpicked from public code repositories.
rxjs.defer is a function provided by the RxJS library that defers the creation of an observable until an observer is ready to subscribe to it, allowing for asynchronous operations to be managed and handled more efficiently.
GitHub: openforis/sepal
1 2 3 4 5 6 7 8 9 10
const {swallow} = require('#sepal/rxjs') const log = require('#sepal/log').getLogger('gdal') const {terminal$} = require('#sepal/terminal') const _ = require('lodash') const createVrt$ = ({inputPaths, outputPath, args = []}) => defer(() => { const inputPathList = _.isArray(inputPaths) ? inputPaths : [inputPaths] return execute$( 'gdalbuildvrt', 'create VRT',
+ 5 other calls in file
GitHub: openforis/sepal
45 46 47 48 49 50 51 52 53 54
const expiresInMinutes = (user.googleTokens.accessTokenExpiryDate - new Date().getTime()) / 60 / 1000 log.trace(() => `${usernameTag(user.username)} ${urlTag(req.originalUrl)} Google tokens expires in ${expiresInMinutes} minutes`) return expiresInMinutes < REFRESH_IF_EXPIRES_IN_MINUTES } const verifyGoogleTokens$ = defer(() => { const user = getRequestUser(req) if (user?.googleTokens?.accessTokenExpiryDate) { if (shouldRefreshGoogleTokens(user)) { return refreshGoogleTokens$
+ 7 other calls in file
How does rxjs.defer work?
rxjs.defer
works by taking a factory function and returning an observable; the factory is not called, and the underlying observable is not actually created, until an observer subscribes to it.
This allows you to set up an observable pipeline before you have all of the data or resources needed to create the observable, which can then be created on-demand once the pipeline is ready.
The factory passed to defer is a function that takes no arguments and returns an observable (or something that can be converted to one, such as a promise or an array).
When the observable is finally created, it will emit values to the observer, and the observer can use its methods like .next(), .error(), and .complete() to handle those values.
This makes it useful for situations where you need to set up an observable pipeline in advance, but don't have all of the data or resources you need to create the observable until later.
GitHub: openforis/sepal
21 22 23 24 25 26 27 28 29 30
if (!amqpUri) { throw new Error('Missing amqpUri') } const connection$ = new Subject() const connect = () => { defer(async () => { log.debug((`Connecting to message broker: ${amqpUri}`)) return await amqp.connect(amqpUri) }).pipe( retry(RETRY_DELAY_MS)
61 62 63 64 65 66 67 68 69 70
host: 'localhost', password: 'doNoUseThisSecret!', username: 'postgres' }; const dataSource$ = defer(() => from( (async () => { const dataSource = createDataSource({ connectionConfig,
AI Example
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
const { defer } = require("rxjs");

/**
 * Define a deferred observable that will eventually emit some data.
 *
 * Calling this function only logs the creation message and wraps the factory
 * with `defer`; the factory itself is not invoked here.
 *
 * @returns {import("rxjs").Observable<number>} Observable that, on
 *   subscription, logs "Emitting data..." and then emits 1, 2 and 3.
 */
function deferredObservable() {
  console.log("Creating deferred observable...");
  return defer(() => {
    // `defer` calls this factory on subscription, so this log appears only
    // after `.subscribe()` is called below.
    console.log("Emitting data...");
    // An array is a valid observable input: each element is emitted in order
    // and the observable then completes.
    return [1, 2, 3];
  });
}

// Use the deferred observable in an observable pipeline.
// An observer object is passed instead of three positional callbacks, because
// the multi-callback `subscribe(next, error, complete)` form is deprecated.
deferredObservable().subscribe({
  next: (value) => console.log(`Received value: ${value}`),
  error: (error) => console.error(`Error: ${error.message}`),
  complete: () => console.log("Completed"),
});
In this example, we first define a deferredObservable function that will eventually emit some data. We use console.log to log messages indicating when the deferred observable is being created and when it is emitting data. We then use rxjs.defer to create a deferred observable that will be created on-demand when an observer subscribes to it. In the observable pipeline, we use the .subscribe() method to attach an observer that will receive values emitted by the observable. The observer's next, error, and complete callbacks handle those values. When the code is run, the output will be:
Creating deferred observable...
Emitting data...
Received value: 1
Received value: 2
Received value: 3
Completed
101 102 103 104 105 106 107 108 109 110
} } write(data /* Array of JSON objects to be written */) { return defer(() => { let writeStream; return new Observable(obs$ => { if (!Array.isArray(data)) {
91 92 93 94 95 96 97 98 99 100
/** * refresh keycloak token */ refreshToken$ () { return defer(() => refreshKeycloakToken(this.client, { refresh_token: this.data.token.refresh_token, grant_type: 'refresh_token', client_id: this.settings.client_id, realmName: this.settings.realmName
+ 7 other calls in file
8 9 10 11 12 13 14 15 16 17
const CHUNK_SIZE = 10 * 1024 * 1024 const CONCURRENT_FILE_DOWNLOAD = 1 const RETRIES = 5 const do$ = (description, promise) => defer(() => { log.debug(description) return of(true).pipe( switchMap(() => fromPromise(promise)), retry(RETRIES)
GitHub: lucasteles/reactivehooks
119 120 121 122 123 124 125 126
}, interval); }); }; var waitElement = function (id, timeout) { if (timeout === void 0) { timeout = 1000; } return rxjs_1.defer(function () { return waitElementPromise(id, timeout); }); }; exports.waitElement = waitElement;
89 90 91 92 93 94 95 96 97 98
}), mergeMap((pkgChange) => iif( () => dryRun, of(pkgChange), defer(() => from( writePkg( pkgChange.pkgPath, /**
+ 3 other calls in file
GitHub: caosbad/api
44 45 46 47 48 49 50 51 52 53
exports.TimeoutError = rxjs.TimeoutError; exports.bindCallback = rxjs.bindCallback; exports.bindNodeCallback = rxjs.bindNodeCallback; exports.combineLatest = rxjs.combineLatest; exports.concat = rxjs.concat; exports.defer = rxjs.defer; exports.empty = rxjs.empty; exports.forkJoin = rxjs.forkJoin; exports.from = rxjs.from; exports.fromEvent = rxjs.fromEvent;
GitHub: VEGMQAZ/sepal
30 31 32 33 34 35 36 37 38 39
log.debug(() => message) return operation$ } const auth$ = () => defer(getCurrentContext$).pipe( switchMap(({userCredentials}) => { const oAuth2Client = new google.auth.OAuth2() const expiration = moment(userCredentials['access_token_expiry_date']) log.debug(() => `Authenticating with token expiring ${expiration.fromNow()} (${expiration})`)
GitHub: QimatLuo/muranode
8 9 10 11 12 13 14 15 16 17 18 19
const Biz = (init = {}) => { const target = { ...init }; target.api = (x) => defer(() => { const method = x.body ? "POST" : "GET"; const headers = { Authorization: `token ${target.token}`, "User-Agent": "Muranode/1.0.0",
+ 21 other calls in file
GitHub: VEGMQAZ/sepal
21 22 23 24 25 26 27 28 29 30
if (!amqpUri) { throw new Error('Missing amqpUri') } const connection$ = new Subject() const connect = () => { defer(async () => await amqp.connect(amqpUri)).pipe( retry(RETRY_DELAY_MS) ).subscribe( connection => { log.info(`Connected to message broker: ${amqpUri}`)
39 40 41 42 43 44 45 46 47 48
}; if(businessId){ query.businessId = businessId; } return defer(() => collection.findOne(query)); } /** * get services from the satellite
+ 165 other calls in file
95 96 97 98 99 100 101 102 103 104
} fetchAnswer(question) { var Prompt = this.prompts[question.type]; this.activePrompt = new Prompt(question, this.rl, this.answers); return defer(() => from( this.activePrompt .run() .then((answer) => ({ name: question.name, answer: answer }))
+ 123 other calls in file
26 27 28 29 30 31 32 33 34
const collection = mongoDB.db.collection(CollectionName); const query = { "_id": id }; return defer(() => collection.findOne(query, projection)); }
+ 5 other calls in file
152 153 154 155 156 157 158 159 160
completeRender(apmTrans) { const driver = this.driver; const layout = this.layout; const logger = this.logger; return Rx.defer(async () => { var _layout$positionEleme; // Waiting till _after_ elements have rendered before injecting our CSS // allows for them to be displayed properly in many cases
+ 2 other calls in file