Reactive Programming Using RxJS


The ReactiveX library is available in many languages.

For example,
  • JavaScript - RxJS
  • Java - RxJava
  • Go - RxGo
  • Dart - RxDart, etc.
What is RxJS?
  1. RxJS is a library for reactive programming using Observables, which makes it easier to compose asynchronous or callback-based code.
  2. RxJS is used when we need to work with streams of data instead of single values.
  3. RxJS has many operators to manipulate and transform the data received.
Example
  • Stream of data - Suppose a click handler is attached to a button. The user might click the button any number of times, and each click emits data. This sequence of emissions is called a stream of data.
  • Single value - When we make an API call, it returns either the data or a failure message. Once the API gives its response, the call is complete, so it produces a single value.
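
The difference can be sketched without RxJS, using Node's built-in EventEmitter as a stand-in for a button. This is only an illustration: the `button` emitter, the `click` event name, and the `fakeApiCall` function are made up for the example and make no real network request.

```javascript
var EventEmitter = require('events');

// Stream of data: the same source can deliver a value many times.
var button = new EventEmitter(); // hypothetical stand-in for a UI button
var clicks = [];
button.on('click', function (position) {
    clicks.push(position);
});
button.emit('click', { x: 10, y: 20 });
button.emit('click', { x: 11, y: 20 });
button.emit('click', { x: 12, y: 25 });

// Single value: the callback runs once, with either data or a failure.
function fakeApiCall(callback) { // hypothetical, no real request is made
    callback(null, { status: 200, body: 'ok' });
}
var responses = [];
fakeApiCall(function (error, data) {
    responses.push(error || data);
});

console.log(clicks.length);    // 3
console.log(responses.length); // 1
```

The click source stays open for further events, while the API-style call is finished as soon as its one response arrives.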

npm Command to Install RxJS

npm install rxjs

Observables

An Observable is a wrapper around a stream of data that emits values over time, usually asynchronously.

Data streams can be created from many things:

  1. UI events
  2. HTTP requests
  3. File systems
  4. Array-like objects
  5. Memory/cache
  • Observables are used to watch these streams and notify the Observer when a value, an error, or a completion signal arrives.
  • Observables can be subscribed to by an Observer.
  • An Observer is there to execute some piece of code whenever the Observable sends a value, emits an error, or reports that it is done.
  • Subscription means calling the subscribe method, which tells the Observable that someone (the Observer) is waiting for values or errors from the stream of data.
  • To receive data, the Observer implements 3 methods, which are passed to the subscribe function:

    • next()
    • error()
    • complete()
  • The next method is called whenever a value is emitted by the Observable.
  • The error method is called whenever an error is emitted by the Observable.
  • The complete method is called when the Observable is done, i.e. no more values will be emitted from the Observable in the future.
  • Some Observables might never finish. (Example: an Observable created for button clicks never ends, because the user can click the button at any time and at any coordinate.)
  • We can pass 3 functions to the subscribe function, or an object containing 3 functions.
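    // "observable" stands for any Observable instance

    // 1. Pass three callback functions
    observable.subscribe(
        (value) => {}, (error) => {}, () => {});

    // 2. Pass an observer object containing the three functions
    var ob = {
        next: function(value) {},
        error: function(error) {},
        complete: function() {}
    };
    observable.subscribe(ob);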

How does the Observer know which method (next, error or complete) to execute when receiving a value from the Observable?

There is a contract between the Observer and the Observable: the Observable signals each kind of event by calling the matching method on the Observer, so the appropriate method is always the one that runs.
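
One way to picture this contract is the hand-rolled sketch below. It is not how RxJS is implemented, and `createObservable` is a hypothetical helper rather than an RxJS API. The point it tries to show is that the producer decides which method to call, so the Observer never has to inspect what it receives.

```javascript
// Hypothetical helper for illustration only; not part of RxJS.
function createObservable(producer) {
    return {
        subscribe: function (observer) {
            var done = false; // set once error or complete has been signalled
            producer({
                next: function (value) {
                    if (!done && observer.next) { observer.next(value); }
                },
                error: function (err) {
                    if (done) { return; }
                    done = true;
                    if (observer.error) { observer.error(err); }
                },
                complete: function () {
                    if (done) { return; }
                    done = true;
                    if (observer.complete) { observer.complete(); }
                }
            });
        }
    };
}

var log = [];
var source = createObservable(function (observer) {
    observer.next('a');
    observer.next('b');
    observer.complete();
    observer.next('ignored'); // dropped: the stream already completed
});

source.subscribe({
    next: function (value) { log.push('next:' + value); },
    error: function (err) { log.push('error:' + err); },
    complete: function () { log.push('complete'); }
});

console.log(log); // [ 'next:a', 'next:b', 'complete' ]
```

The wrapper object handed to the producer is what enforces the contract in this sketch: it forwards each call to the matching Observer method and stops forwarding after the first terminal signal.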

Note

When creating an Observable, we call the next, complete and error methods of the observer to send these notifications.

Creating an Observable Using RxJS with the create Method

public static create(onSubscription: function(observer: Observer): TeardownLogic): Observable

  • Observables can be created using many methods; one of them is create. For more methods, visit
    http://reactivex.io/rxjs/class/es6/Observable.js~Observable.html 
  • create builds a new Observable that will execute the specified function when an Observer subscribes to it.
  • The observer passed to this function has 3 methods: next(), complete() and error().
  • A well-formed Observable can emit as many values as it needs via the next method, but only one of complete or error can be called, only once, and nothing else can be called thereafter.
  • Calling next with a value emits that value to the Observer. Calling complete means that the Observable has finished emitting and will not do anything else. Calling error means that something went wrong; the value passed to the error method should provide details on what exactly happened.
  • When using TypeScript, it is recommended to declare the type signature of the function passed to create as (observer: Observer) => TeardownLogic, where Observer and TeardownLogic are interfaces provided by the library.

Read the documentation carefully for JavaScript.

Steps to create an Observable and then subscribe to it.

Create a folder called RxjsSample, open cmd in this folder, then run: npm install rxjs

RxjsSample.js

    // Importing from the package root works in RxJS 5, 6 and 7
    var Observable = require('rxjs').Observable;

    // creating an observable
    var observable = Observable.create(function(ob) {
        ob.next(1); // emitted immediately on subscribe
        setTimeout(() => {
            ob.next("first message");
        }, 2000);
        setTimeout(() => {
            ob.next("second message");
        }, 4000);
        setTimeout(() => {
            ob.error("some error occurred");
        }, 6000);
    });

    // observer subscribing to the observable using the subscribe method
    observable.subscribe(
        (data) => {
            console.log(data);
        }, (error) => {
            console.log(error);
        }, () => {
            // not reached in this sample: the stream ends with an error
            console.log('completed');
        });

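The Observable created above emits 1 immediately on subscription, then emits "first message" after 2 seconds and "second message" after 4 seconds using the next method. After 6 seconds it signals an error, which ends the stream, so the complete callback is never called.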

Cmd

[Screenshot of the command-line output]

We can unsubscribe from the Observable by calling the unsubscribe method on the Subscription object that subscribe returns.

In the next articles, we will discuss the built-in Observables in Angular 2 and the operators available on Observables.