How to use the Subject function from rxjs

Find comprehensive JavaScript rxjs.Subject code examples handpicked from public code repositorys.

71
72
73
74
75
76
77
78
79
  test: networkConfigSchema,
  regtest: networkConfigSchema,
};

let stateRef;
let destroy$ = new Subject();


const getParticlCoreChain = () => {
fork icon77
star icon162
watch icon36

+ 5 other calls in file

89
90
91
92
93
94
95
96
97
98
const toFilePath = prefix.endsWith('/')
    ? path.join(downloadDir, relativePath)
    : path.join(downloadDir, path.basename(prefix), relativePath)
const downloadChunk$ = start => {
    const end = start + CHUNK_SIZE
    const chunkSubject$ = new Subject()
    const chunk$ = chunkSubject$.pipe(retry(RETRIES))
    const startTime = new Date().getTime()
    let next
    file.createReadStream({start, end})
fork icon45
star icon178
watch icon34

16
17
18
19
20
21
22
23
24
25
const poolTag = (name, instanceId) => tag('Pool', name, instanceId)

const Pool = ({name, maxIdleMilliseconds = 1000, minIdleCount = 0, create$, onCold, onHot, onRelease, onDispose, onKeep, onMsg}) => {
    const pool = []
    const lock$ = new Subject()
    const unlock$ = new Subject()

    log.info(name)

    lock$.subscribe({
fork icon45
star icon178
watch icon34

+ 3 other calls in file

40
41
42
43
44
45
46
47
48
49
        handleError(ctx, error)
    }
}

const handleHttp = async ctx => {
    const close$ = new Subject()
    ctx.req.on('close', () => close$.next())
    const body$ = ctx.result$.pipe(
        takeUntil(close$)
    )
fork icon45
star icon178
watch icon34

+ 5 other calls in file

19
20
21
22
23
24
25
26
27
28

const messageQueue$ = amqpUri => {
    if (!amqpUri) {
        throw new Error('Missing amqpUri')
    }
    const connection$ = new Subject()
    const connect = () => {
        defer(async () => {
            log.debug((`Connecting to message broker: ${amqpUri}`))
            return await amqp.connect(amqpUri)
fork icon45
star icon178
watch icon34

84
85
86
87
88
89
90
91
92
93
if (username || password)
    headers = {
        'Authorization': `Basic ${base64.encode(`${username}:${password}`)}`,
        ...headers
    }
const request$ = new Subject()
const options = {url: urlWithQuery, method, headers, ...args}
log.trace(() => `${method} ${url}`)
const start = new Date()
request(options, (error, response, body) => {
fork icon45
star icon178
watch icon34

23
24
25
26
27
28
29
30
31
32
const log = require('#sepal/log').getLogger('ws')

const REMOVE_COMFORT_DELAY_MS = 1000

const out$ = new Subject()
const stop$ = new Subject()

const createWatcher = async ({out$, stop$, userHomeDir, pollIntervalMilliseconds}) => {
    const monitoredPaths = []
    const trigger$ = new Subject()
fork icon45
star icon177
watch icon0

+ 5 other calls in file

11
12
13
14
15
16
17
18
19
20
21
22
23
const MIN_TIME_BETWEEN_NOTIFICATIONS = 1 * 1000
const MAX_TIME_BETWEEN_NOTIFICATIONS = 60 * 1000


const {sepalEndpoint, sepalUsername, sepalPassword} = getConfig()


const task$ = new Subject()
const cancel$ = new Subject()


const msg = (id, msg) => `${taskTag(id)}: ${msg}`

fork icon45
star icon177
watch icon0

1365
1366
1367
1368
1369
1370
1371
1372
1373
1374

#setupZmqDataSubjects(zmqChannels) {
  this.#teardownZmqDataSubjects();

  zmqChannels.forEach(channel => {
    this.#ZMQ_SUBJECTS[channel] = new Subject();
    this.#ZMQ_SUBJECTS[channel].pipe(
      auditTime(2_000)
    ).subscribe({
      next: (data) => {
fork icon76
star icon162
watch icon36

22
23
24
25
26
27
28
29
30
31
class StateMachine {

        //subject = null

        constructor (options) {
                this.subject = new Subject()

                if (!_.isObject(options.state)) {
                        throw new StatemachineError('No inital state specified.', 1);
                }
fork icon6
star icon15
watch icon4

+ 13 other calls in file

8
9
10
11
12
13
14
15
16
17
18
19
  APIKEY: process.APIKEY,
  APISECRET: process.APISECRET,
});
const PAIR = "BTCBUSD";


let subject = new Subject();
done$ = new Subject();


async function start(endTime = undefined) {
  count++;
fork icon2
star icon2
watch icon1

+ 69 other calls in file

124
125
126
127
128
129
130
131
132
133
    }
}

setValue(value) {
    return new Promise((resolve, reject) => {
        let obs = new Subject();
        if (!value) {
            reject("Can't set null")
            return
        }
fork icon2
star icon2
watch icon1

90
91
92
93
94
95
96
97
98
99

return {
    getInstance$: () =>
        getInstance$().pipe(
            switchMap(instance =>
                concat(of(instance), new Subject()).pipe(
                    tap(instance => lock(instance)),
                    map(({item}) => item),
                    finalize(() => release(instance))
                )
fork icon45
star icon0
watch icon0

+ 17 other calls in file

13
14
15
16
17
18
19
20
21
22
    delimiter: '\r\n'
})

serial.pipe(parser)
        
const send$ = new Subject()

// handle opening
serial.on('open', err => {
    if (err) {
fork icon5
star icon12
watch icon8

+ 3 other calls in file

94
95
96
97
98
99
100
101
102
103
      })
    )
  ),
  shareReplay()
)
this.unhandledBookings = new Subject()
this.manualDispatchedBookings = new Subject()
this.dispatchedBookings = merge(
  this.manualDispatchedBookings,
  dispatch(this.cars, this.unhandledBookings)
fork icon1
star icon16
watch icon0

+ 3 other calls in file

27
28
29
30
31
32
33
34
35
36

const logger = getLogger('services/tracks')

// Because synchronizing folders may broadcast a lot of messages to the UI,
// this queue is a buffer to only broadcast once every second (see listen())
const messages$ = new Subject()

const sorters = {
  /**
   * Sort tracks by their track number (in tags): consider disk number, then track number
fork icon3
star icon54
watch icon3

0
1
2
3
4
5
6
7
8
9
#!/usr/bin/env node

const inquirer = require('inquirer');
const rx = require('rxjs');

const prompts = new rx.Subject();
const inquirerPrompts = inquirer.prompt(prompts).ui.process;

const InquirerPromptTypes = Object.freeze({
  LIST: 'list',
fork icon1
star icon26
watch icon1

8
9
10
11
12
13
14
15
16
17
const outputFolder = path.join(__dirname, '../../apps/client/src/app/core/data/sources/');
const assetOutputFolder = path.join(__dirname, '../../apps/client/src/assets/data/');

const queue = [];

const emptyQueue$ = new Subject();

const stopInterval$ = emptyQueue$.pipe(
  distinctUntilChanged(),
  debounceTime(30000),
fork icon196
star icon6
watch icon1

+ 19 other calls in file

10
11
12
13
14
15
16
17
18
19
const UPDATE_TIMEOUT = 1_800_000; // 30 minutes


exports.init = () => {
  if (!checkerSubject) {
    checkerSubject = new Subject();

    checkerSubject.pipe(
      auditTime(30_000),
      concatMap(() => {
fork icon77
star icon162
watch icon0

86
87
88
89
90
91
92
93
94
95
  headers: { 'Access-Control-Allow-Origin': '*' },
  publicPath: '/_assets/',
  stats: Object.assign({ colors: true }, devServerConfig.devServer.stats),
});

const notifications$ = new Subject();

const sockWrite = devServer.sockWrite;

devServer.sockWrite = (sockets, type, data) => {
fork icon3
star icon52
watch icon8

+ 3 other calls in file