How to use the Observable class from rxjs

Find comprehensive JavaScript rxjs.Observable code examples handpicked from public code repositories.

rxjs.Observable is a class in the RxJS library that represents a stream of events over time, which can be observed and acted upon with various operators.

213
214
215
216
217
218
219
220
221
  }
},

invoke: {
  'wallet-settings': (_, walletName) => {
    return new Observable(observer => {

      if (!stateRef || (typeof walletName !== 'string')) {
        observer.complete();
fork icon77
star icon162
watch icon36

+ 3 other calls in file

104
105
106
107
108
109
110
111
112
113
  assert.strictEqual(count++, 7)
})

// Asynchronous case: the test body returns an Observable built from a promise
// that resolves after 100 ms. The shared `count` assertions pin the order in
// which the test body and the promise executor run.
tman.it('Rx.Observable asynchronous test', function () {
  assert.strictEqual(count++, 8)

  // A Promise executor runs synchronously during construction, so this
  // assertion fires before the Observable is created and returned.
  const resolvedLater = new Promise((resolve) => {
    assert.strictEqual(count++, 9)
    setTimeout(resolve, 100)
  })

  return Rx.Observable.fromPromise(resolvedLater)
})
fork icon9
star icon82
watch icon10

+ 9 other calls in file

How does rxjs.Observable work?

rxjs.Observable is a class that represents a stream of data that can be subscribed to, with support for a wide range of operators and transformations that can be applied to the data stream. When data is emitted by an Observable, subscribers can receive and handle it through the use of callbacks or other means, depending on the desired behavior. The Observable class provides a way to create and manipulate these streams of data in a flexible and reactive manner, allowing developers to write complex applications that respond to user actions, network events, and other stimuli.

42
43
44
45
46
47
48
49
50
51
// Take the first ten 'ChannelNew' events emitted by `monitoring`, convert each
// event's netting_channel to a 0x-prefixed hex string, and pass the addresses
// to monitoring.subscribeAddress one after another, each behind a 2500 ms timer.
Rx.Observable.fromEvent(monitoring, 'ChannelNew')
  .take(10)
  .map((event) => {
    return event.netting_channel.toString('hex')
  })
  .map((hexAddress) => {
    return `0x${hexAddress}`
  })
  .concatMap((prefixedAddress) => {
    // concatMap subscribes to the inner timers one at a time, so the
    // addresses are processed sequentially rather than all at once.
    const delayedAddress$ = Rx.Observable.timer(2500).mapTo(prefixedAddress)

    // NOTE(review): subscribeAddress is passed unbound; if it relies on
    // `this` being `monitoring`, it needs binding — confirm.
    return delayedAddress$.do(monitoring.subscribeAddress)
  })
  .subscribe()
fork icon10
star icon28
watch icon15

+ 3 other calls in file

49
50
51
52
53
54
55
56
57
58
title: 'Zipping',
task: () => {
  const tasks = [
    {
      title: 'Creating archive with fonts',
      task: () => new Observable(ob => {
        assert(paths.length === 1);
        const p = paths[0];

        if(p !== targetdir) {
fork icon6
star icon23
watch icon5

+ 27 other calls in file

Ai Example

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
import { Observable } from "rxjs";

// Emits 0, 1, 2, ... to the observer once per second until unsubscribed.
const observable = new Observable((observer) => {
  let nextValue = 0;

  const emit = () => {
    const current = nextValue;
    nextValue += 1;
    observer.next(current);
  };
  const timerId = setInterval(emit, 1000);

  // Teardown logic: stop the interval when the subscription is closed.
  return function stopTimer() {
    clearInterval(timerId);
  };
});

// Log every notification kind; only `next` is ever triggered by this source.
const subscription = observable.subscribe({
  next(value) {
    console.log(value);
  },
  error(err) {
    console.error(err);
  },
  complete() {
    console.log("complete");
  },
});

// After 5 seconds, unsubscribe from the observable.
setTimeout(function stopListening() {
  subscription.unsubscribe();
}, 5000);

In this example, we create an observable by passing a function to the Observable constructor. This function takes an observer object, which we use to emit values to subscribers. We use setInterval to emit an increasing number every second, and the function we pass returns a teardown function that will be called when the observable is unsubscribed from. We subscribe to the observable using the subscribe method and provide an object with functions that will be called when the observable emits a value (next), encounters an error (error), or completes (complete). In this particular example the observable never errors or completes, so only next is ever invoked. Finally, we unsubscribe from the observable after 5 seconds using the unsubscribe method on the subscription object.

70
71
72
73
74
75
76
77
78
79

// Expose Rx.Observable.range under the export name `rangeImpl`.
// NOTE(review): the function reference is exported unbound — confirm it does
// not depend on `this` being Rx.Observable.
exports.rangeImpl = Rx.Observable.range;

// Expose Rx.Observable.throw under the export name `throw`.
exports.throw = Rx.Observable.throw;

// Expose Rx.Observable.timer under the export name `timerImpl`.
exports.timerImpl = Rx.Observable.timer;

/**** Transformation Operators ****/

exports.buffer = function(obs1){
fork icon5
star icon21
watch icon2

+ 319 other calls in file

93
94
95
96
97
98
99
100
101
102
  delete manifest.action
  delete manifest.key
}

function buildFrontend() {
  return new Observable((observer) => {
    exec('cd frontend && yarn build', (error) => {
      if (error) observer.error()

      observer.complete()
fork icon0
star icon11
watch icon1

+ 27 other calls in file

72
73
74
75
76
77
78
79
80
81
},
npmRelease: {
    title: "📢 Publish on npm",
    enabled: () => config.npm === true,
    task: async () => {
        return new Observable(async (observer) => {
            let packagesCount = Number(0)

            for await (const [index, pkg] of paths.entries()) {
                let lastError = null
fork icon0
star icon6
watch icon2

+ 13 other calls in file

47
48
49
50
51
52
53
54
55
56
    params: params,
  };
}

function send(mes) {
  let resp = rx.Observable.create((observer) => {
    messages[mes.id] = observer;
  });
  waitForConnection(() => {
    return WS.send(JSON.stringify(mes));
fork icon2
star icon4
watch icon7

+ 27 other calls in file

78
79
80
81
82
83
84
85
86
87
        setEditing(true)
)),
// Basic interactions with dashboard editor
closeDashboardWidgetEditorOnFinish: (action$, {getState = () => {}} = {}) => action$.ofType(INSERT)
    .filter( () => isDashboardAvailable(getState()))
    .switchMap(() => Rx.Observable.of(setEditing(false))),

// Basic interactions with dashboard editor
initDashboardEditorOnNew: (action$, {getState = () => {}} = {}) => action$.ofType(NEW)
    .filter( () => isDashboardAvailable(getState()))
fork icon344
star icon0
watch icon2

+ 27 other calls in file

89
90
91
92
93
94
95
96
97
98
            values.push(currentTime.toISOString());
        } else {
            break;
        }
    }
    return Rx.Observable.of(values);
};

/**
 * Gets the static list of times to animate
fork icon344
star icon0
watch icon0

+ 13 other calls in file

93
94
95
96
97
98
99
100
101
102
} else {
    actions.push(basicError({message: "maps.feedback.errorSizeExceeded"}));
}
if (!detailsUri) {
    actions.push(setDetailsChanged(a.detailsText !== "<p><br></p>"));
    return Rx.Observable.from(actions);
}
const originalDetails = currentMapOriginalDetailsTextSelector(state);
const currentDetails = currentMapDetailsTextSelector(state);
actions.push(setDetailsChanged(originalDetails !== currentDetails));
fork icon344
star icon0
watch icon0

+ 21 other calls in file

33
34
35
36
37
38
39
40
41
42
 * @param {Function} mode declare mode to display the mask (map or dashboard)
 * @memberof epics.feedbackMask
 * @return {Observable}
 */
const updateVisibility = (action$, loadActions, isEnabled = () => {}, mode) =>
    Rx.Observable.concat(
        Rx.Observable.of(feedbackMaskLoading(mode)),
        action$.ofType(...loadActions)
            .switchMap(action => {
                return Rx.Observable.of(
fork icon344
star icon0
watch icon0

+ 23 other calls in file

20
21
22
23
24
25
26
27
28
29
 * @memberof observables.autocomplete
 * @return {external:Observable} the stream used for fetching data for the autocomplete editor
*/
const createPagedUniqueAutompleteStream = (props$) => props$
    .distinctUntilChanged( ({value, currentPage}, newProps = {}) => !(newProps.value !== value || newProps.currentPage !== currentPage))
    .throttle(props => Rx.Observable.timer(props.delayDebounce || 0))
    .merge(props$.debounce(props => Rx.Observable.timer(props.delayDebounce || 0))).distinctUntilChanged()
    .switchMap((p) => {
        if (p.performFetch) {
            const data = getWpsPayload({
fork icon344
star icon0
watch icon2

+ 9 other calls in file

162
163
164
165
166
167
168
169
170
171
    observer.complete();
  });
},


'setSetting': (_, key, newValue, oldValue) => new Observable(observer => {
  let success = false;

  if (key === 'DEBUGGING_LEVEL' && log.levels.includes(newValue)) {
    // temporary (for the current session) change the default debug level
fork icon77
star icon162
watch icon0

+ 3 other calls in file

172
173
174
175
176
177
178
179
180
181
const externalSourceFields = this.parseArgs.externalSourceFields || [];
let parserConfig = this.omitKey(this.parseArgs, 'sourceEncoding');
parserConfig = this.omitKey(this.parseArgs, 'externalSourceFields');
let rStream;

return new Observable(subs => {
  (async () => {
    const results = [];
    rStream = _fs.createReadStream(source);
    rStream
fork icon77
star icon162
watch icon36

+ 3 other calls in file

91
92
93
94
95
96
97
98
99
100
    ctx.coreService = await startCore(config, { wallet: true, addressIndex: true });
  },
},
{
  title: 'Activating DIP3',
  task: () => new Observable(async (observer) => {
    const dip3ActivationHeight = 1000;
    const blocksToGenerateInOneStep = 10;

    let blocksGenerated = 0;
fork icon23
star icon32
watch icon13

+ 627 other calls in file

54
55
56
57
58
59
60
61
62
63
const healthCheck = new HealthCheck(packageJson, [
  getSubServiceHealthBroker
])
await createHealthCheckServer(Config.PORT, defaultHealthHandler(healthCheck))

const topicObservable = Rx.Observable.create((observer) => {
  consumer.on('message', async (message) => {
    Logger.debug(`Central-Event-Processor :: Topic ${topicName} :: Payload: \n${JSON.stringify(message.value, null, 2)}`)
    observer.next({ message })
    if (!Consumer.isConsumerAutoCommitEnabled(topicName)) {
fork icon10
star icon1
watch icon11

+ 13 other calls in file

-1
fork icon0
star icon13
watch icon3

+ 12 other calls in file

16
17
18
19
20
21
22
23
24
25
    retryWhen
} = require('rxjs/operators');

const watchDir = directory => {
    console.log(`Wating for changes at ${directory}`);
    return Rx.Observable.create(observer => {
        watch(directory, (eventType, filename) => {
            console.log(`Event raised! type: ${eventType}`);
            return observer.next({
                eventType,
fork icon0
star icon11
watch icon3

+ 3 other calls in file