How to use the Subject function from rxjs
Find comprehensive JavaScript rxjs.Subject code examples handpicked from public code repositorys.
71 72 73 74 75 76 77 78 79
test: networkConfigSchema, regtest: networkConfigSchema, }; let stateRef; let destroy$ = new Subject(); const getParticlCoreChain = () => {
77
162
36
+ 5 other calls in file
89 90 91 92 93 94 95 96 97 98
const toFilePath = prefix.endsWith('/') ? path.join(downloadDir, relativePath) : path.join(downloadDir, path.basename(prefix), relativePath) const downloadChunk$ = start => { const end = start + CHUNK_SIZE const chunkSubject$ = new Subject() const chunk$ = chunkSubject$.pipe(retry(RETRIES)) const startTime = new Date().getTime() let next file.createReadStream({start, end})
45
178
34
GitHub: openforis/sepal
16 17 18 19 20 21 22 23 24 25
const poolTag = (name, instanceId) => tag('Pool', name, instanceId) const Pool = ({name, maxIdleMilliseconds = 1000, minIdleCount = 0, create$, onCold, onHot, onRelease, onDispose, onKeep, onMsg}) => { const pool = [] const lock$ = new Subject() const unlock$ = new Subject() log.info(name) lock$.subscribe({
45
178
34
+ 3 other calls in file
GitHub: openforis/sepal
40 41 42 43 44 45 46 47 48 49
handleError(ctx, error) } } const handleHttp = async ctx => { const close$ = new Subject() ctx.req.on('close', () => close$.next()) const body$ = ctx.result$.pipe( takeUntil(close$) )
45
178
34
+ 5 other calls in file
GitHub: openforis/sepal
19 20 21 22 23 24 25 26 27 28
const messageQueue$ = amqpUri => { if (!amqpUri) { throw new Error('Missing amqpUri') } const connection$ = new Subject() const connect = () => { defer(async () => { log.debug((`Connecting to message broker: ${amqpUri}`)) return await amqp.connect(amqpUri)
45
178
34
GitHub: openforis/sepal
84 85 86 87 88 89 90 91 92 93
if (username || password) headers = { 'Authorization': `Basic ${base64.encode(`${username}:${password}`)}`, ...headers } const request$ = new Subject() const options = {url: urlWithQuery, method, headers, ...args} log.trace(() => `${method} ${url}`) const start = new Date() request(options, (error, response, body) => {
45
178
34
GitHub: openforis/sepal
23 24 25 26 27 28 29 30 31 32
const log = require('#sepal/log').getLogger('ws') const REMOVE_COMFORT_DELAY_MS = 1000 const out$ = new Subject() const stop$ = new Subject() const createWatcher = async ({out$, stop$, userHomeDir, pollIntervalMilliseconds}) => { const monitoredPaths = [] const trigger$ = new Subject()
45
177
0
+ 5 other calls in file
GitHub: openforis/sepal
11 12 13 14 15 16 17 18 19 20 21 22 23
const MIN_TIME_BETWEEN_NOTIFICATIONS = 1 * 1000 const MAX_TIME_BETWEEN_NOTIFICATIONS = 60 * 1000 const {sepalEndpoint, sepalUsername, sepalPassword} = getConfig() const task$ = new Subject() const cancel$ = new Subject() const msg = (id, msg) => `${taskTag(id)}: ${msg}`
45
177
0
1365 1366 1367 1368 1369 1370 1371 1372 1373 1374
#setupZmqDataSubjects(zmqChannels) { this.#teardownZmqDataSubjects(); zmqChannels.forEach(channel => { this.#ZMQ_SUBJECTS[channel] = new Subject(); this.#ZMQ_SUBJECTS[channel].pipe( auditTime(2_000) ).subscribe({ next: (data) => {
76
162
36
22 23 24 25 26 27 28 29 30 31
class StateMachine { //subject = null constructor (options) { this.subject = new Subject() if (!_.isObject(options.state)) { throw new StatemachineError('No inital state specified.', 1); }
6
15
4
+ 13 other calls in file
8 9 10 11 12 13 14 15 16 17 18 19
APIKEY: process.APIKEY, APISECRET: process.APISECRET, }); const PAIR = "BTCBUSD"; let subject = new Subject(); done$ = new Subject(); async function start(endTime = undefined) { count++;
2
2
1
+ 69 other calls in file
124 125 126 127 128 129 130 131 132 133
} } setValue(value) { return new Promise((resolve, reject) => { let obs = new Subject(); if (!value) { reject("Can't set null") return }
2
2
1
GitHub: VEGMQAZ/sepal
90 91 92 93 94 95 96 97 98 99
return { getInstance$: () => getInstance$().pipe( switchMap(instance => concat(of(instance), new Subject()).pipe( tap(instance => lock(instance)), map(({item}) => item), finalize(() => release(instance)) )
45
0
0
+ 17 other calls in file
GitHub: lpaolini/AccentaG4
13 14 15 16 17 18 19 20 21 22
delimiter: '\r\n' }) serial.pipe(parser) const send$ = new Subject() // handle opening serial.on('open', err => { if (err) {
5
12
8
+ 3 other calls in file
94 95 96 97 98 99 100 101 102 103
}) ) ), shareReplay() ) this.unhandledBookings = new Subject() this.manualDispatchedBookings = new Subject() this.dispatchedBookings = merge( this.manualDispatchedBookings, dispatch(this.cars, this.unhandledBookings)
1
16
0
+ 3 other calls in file
GitHub: feugy/melodie
27 28 29 30 31 32 33 34 35 36
const logger = getLogger('services/tracks') // Because synchronizing folders may broadcast a lot of messages to the UI, // this queue is a buffer to only broadcast once every second (see listen()) const messages$ = new Subject() const sorters = { /** * Sort tracks by their track number (in tags): consider disk number, then track number
3
54
3
GitHub: joshuatvernon/cbf
0 1 2 3 4 5 6 7 8 9
#!/usr/bin/env node const inquirer = require('inquirer'); const rx = require('rxjs'); const prompts = new rx.Subject(); const inquirerPrompts = inquirer.prompt(prompts).ui.process; const InquirerPromptTypes = Object.freeze({ LIST: 'list',
1
26
1
GitHub: Supamiu/ffxiv-teamcraft
8 9 10 11 12 13 14 15 16 17
const outputFolder = path.join(__dirname, '../../apps/client/src/app/core/data/sources/'); const assetOutputFolder = path.join(__dirname, '../../apps/client/src/assets/data/'); const queue = []; const emptyQueue$ = new Subject(); const stopInterval$ = emptyQueue$.pipe( distinctUntilChanged(), debounceTime(30000),
196
6
1
+ 19 other calls in file
10 11 12 13 14 15 16 17 18 19
const UPDATE_TIMEOUT = 1_800_000; // 30 minutes exports.init = () => { if (!checkerSubject) { checkerSubject = new Subject(); checkerSubject.pipe( auditTime(30_000), concatMap(() => {
77
162
0
86 87 88 89 90 91 92 93 94 95
headers: { 'Access-Control-Allow-Origin': '*' }, publicPath: '/_assets/', stats: Object.assign({ colors: true }, devServerConfig.devServer.stats), }); const notifications$ = new Subject(); const sockWrite = devServer.sockWrite; devServer.sockWrite = (sockets, type, data) => {
3
52
8
+ 3 other calls in file