Javascript : RxJs

Rx is a library for composing asynchronous and event-based programs using observable collections.

The basic building blocks of RxJS are observables (producers) and observers (consumers). We already mentioned the two types of observables:

  • Hot observables : are pushing even when we are not subscribed to them (e.g., UI events).

  • Cold observables : start pushing only when we subscribe. They start over if we subscribe again.

Creating and Subscribing to Simple Observable Sequences

You do not need to implement the Observable class manually to create an observable sequence. Similarly, you do not need to implement Observer either to subscribe to a sequence. By installing the Reactive Extension libraries, you can take advantage of the Observable type which provides many operators for you to create a simple sequence with zero, one or more elements. In addition, RxJS provides an overloaded subscribe method which allows you to pass in onNext, onError and onCompleted function handlers.

Creating and subscribing to a simple sequence

Before getting into many operators, let’s look at how to create an Observable
from scratch using the Rx.Observable.create method.

First, we need to ensure we reference the core rx.js file.

<script src="rx.js"></script>

Or if we’re using Node.js, we can reference it as such:

var Rx = require('rx');

In this example, we will simply yield a single value of 42 and then mark it as completed. The return value is completely optional if no cleanup is required.


var source = Rx.Observable.create(observer => {
  // Yield a single value and complete
  observer.onNext(42);
  observer.onCompleted();

  // Any cleanup logic might go here
  return () => console.log('disposed')
});

var subscription = source.subscribe(
  x => console.log('onNext: %s', x),
  e => console.log('onError: %s', e),
  () => console.log('onCompleted'));

// => onNext: 42
// => onCompleted

subscription.dispose();
// => disposed

For most operations, this is completely overkill, but shows the very basics of how most RxJS operators work.

Example

Arrays are in memory values, Event Stream is similar to array , here events might happen over time. Whenever event happen the subscribe block will get execute.

Now, what is an event stream? It’s a sequence of events happening over time. You can kind of think of it as an asynchronous array.

Events happen over time, and we can add an event listener to this whole sequence. Whenever an event happens, we can react to it by doing something. That is the main idea.

Another type of sequence that you see in JavaScript, just, for instance, arrays. How does event streams relate to arrays? Well, arrays are sequences in space. All of these items in this array exist now in memory.


var source = ['1','1', 'foo', '2', '3', '5', 'bar', '8', '13'];

var result = source
		.map(x=>parseInt(x))
		.filter(x=>!isNaN(x))
		.reduce((x,y) => x+y);
console.log(result)

On the other hand, event streams don’t have that property. So, the events might happen over time, and you don’t even know what are the items that might happen.

Here we have those same items that happened in the array, but we have them happening over time every 400 milliseconds, like this. We add an event listener to the source, event stream, by calling subscribe. Whenever an event happens, we just console log that out, and we saw it happening every 400 milliseconds.


var source = Rx.Observable.interval(400).take(9)
		.map(i => ['1','1','foo','2','3','5','bar','8','13'] [i]]);

var result = source
		.map(x => parseInt(x))
		.filter(x=> !isNaN(x))
		.reduce(x,y) => x+y);

result.subscribe(x => console.log(x));

How does the reduce function know that you’ve reached the end of the event stream?

A stream can emit 3 types of events: normal data to the next callback, errors to the error callback, and end of stream with the complete callback. This is how the reduce operator can conclude with one value: as soon as the source completes, it outputs the reduced value.

What is RxJs

RxJS is a library for reactive programming using Observables, to make it easier to compose asynchronous or callback-based code.

The essential concepts in RxJS which solve async event management are:

  1. Observable: represents the idea of an invokable collection of future values or events.

  2. Observer: is a collection of callbacks that knows how to listen to values delivered by the Observable.

  3. Subscription: represents the execution of an Observable, is primarily useful for cancelling the execution.

  4. Operators: are pure functions that enable a functional programming style of dealing with collections with operations like map, filter, concat, flatMap, etc.

  5. Subject: is the equivalent to an EventEmitter, and the only way of multicasting a value or event to multiple Observers.

  6. Schedulers: are centralized dispatchers to control concurrency, allowing us to coordinate when computation happens on e.g. setTimeout or requestAnimationFrame or others.

Example

mport { Observable } from 'rxjs/Observable';

var observable = Observable.create(function subscribe(observer) {
  var i = 0;
  setInterval(() => { observer.next(i++); }, 1000);
})

var subscription = observable.subscribe(function next(value) {
  console.log(value);
})

Example 2

import { of } from 'rxjs'; 
import { map } from 'rxjs/operators';


const source = of('World').pipe(
  map(x => `Hello ${x}!`)
);

source.subscribe(x => console.log(x));

Click For Angular NgRx Video

Reference

  1. Rxjs Observable Sequence

  2. Array Extras

  3. RxJs Functions

  4. Tntro To Reactive Programming with RxJS

  5. RxJs Operators

  6. Why RxJs

  7. Angular with RxJs

  8. RxJs SitePoint

  9. Javascript Array Methods

  10. Observable Methods

  11. Rxjs Version Changes
  12. ngRx

Leave a Reply

Your email address will not be published. Required fields are marked *