RxJS Operators In Angular

In this blog, I'm going to explore the use of RxJS operators. As beginner developers, we hear a lot about Promises, Observables, and Subscriptions for calling asynchronous services, and we often process the returned data by traditional means such as loops, custom property mappers, and class models. Instead, we can use RxJS operators, which are simple to write. In this blog, I will demonstrate some real-world use cases from our day-to-day work and show how to handle complex responses in an easy way.

of() converts values such as strings or objects into an Observable that emits them.

  import { Observable, of } from 'rxjs';

  ngOnInit() {
    const employee = {
      name: 'Rajendra'
    };
    // of() wraps the object in an Observable that emits it once and then completes.
    const obsEmployee: Observable<any> = of(employee);
    obsEmployee.subscribe((data) => { console.log(data); });
  }
The same works with a plain string:

  ngOnInit() {
    const obsEmployee: Observable<any> = of('Rajendra Taradale');
    obsEmployee.subscribe((data) => { console.log(data); });
  }
map() transforms each value emitted by an observable and returns a new observable that emits the transformed values.
  import { of } from 'rxjs';
  import { map } from 'rxjs/operators';

  ngOnInit() {
    const data = of('Rajendra Taradale');
    data
      .pipe(map(x => x.toUpperCase())) // transform each emitted value
      .subscribe((d) => { console.log(d); });
  }
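As a further illustration of replacing a hand-written loop or property mapper with map(), here is a minimal sketch. The User interface and the sample data are assumptions made up for this example and are not part of the original service.

```typescript
import { of } from 'rxjs';
import { map } from 'rxjs/operators';

// Hypothetical response shape, used only for this illustration.
interface User {
  id: number;
  name: string;
}

const users: User[] = [
  { id: 1, name: 'Rajendra' },
  { id: 2, name: 'Taradale' },
];

let names: string[] = [];

of(users)
  // map() receives the whole emitted array; Array.prototype.map then
  // picks the property we need from every element.
  .pipe(map(list => list.map(user => user.name.toUpperCase())))
  .subscribe(result => {
    names = result;
    console.log(names);
  });
```

The operator keeps the transformation inside the pipeline, so the subscriber only receives data in the shape it needs.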
 
share() multicasts the source observable: no matter how many subscribers are active at the same time, they all share a single subscription to the source, so the underlying HTTP request is sent only once. Let us see this in code. Here, I set a loader flag to true while the HTTP call is in progress and reset it to false once I receive the data from the service. This is just an example to show the use of the share() operator.
  import { share } from 'rxjs/operators';

Here is a sample API method that returns a list of users.

  getPosts(): Observable<any[]> {
    return this.http.get<any[]>('https://jsonplaceholder.typicode.com/users');
  }

This function sets the loader to true and resets it once the observable emits.

  setLoading(obs: Observable<any>) {
    this.loading = true;
    obs.subscribe(() => this.loading = false);
  }

Let's check the calling method without share().

  ngOnInit() {
    const request = this.getPosts();
    this.setLoading(request);                      // first subscription
    request.subscribe(data => console.log(data));  // second subscription
  }
If you observe the above code, you will find that .subscribe is called both inside setLoading() and directly on request, which invokes our service twice, so two HTTP requests are sent.
 
Now, let us check with the share() operator.
  getPosts(): Observable<any[]> {
    return this.http
      .get<any[]>('https://jsonplaceholder.typicode.com/users')
      .pipe(share());
  }
With share(), both subscriptions reuse the same request, so the service is invoked only once.
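The effect of share() can also be checked without a real HTTP call. The following is only a sketch: the defer() wrapper, the response Subject, and the requestCount counter are stand-ins invented for this example to simulate a request that has not answered yet.

```typescript
import { defer, Subject } from 'rxjs';
import { share, take } from 'rxjs/operators';

// Stand-in for the HTTP call: every new subscription to request$
// counts as one "request" (requestCount is a hypothetical counter).
let requestCount = 0;
const response = new Subject<string[]>();
const request$ = defer(() => {
  requestCount++;
  return response.pipe(take(1));
});

const shared$ = request$.pipe(share());

const loader = { loading: true };
const received: string[][] = [];

// Two subscribers, as in setLoading() plus the data subscription.
shared$.subscribe(() => (loader.loading = false));
shared$.subscribe(users => received.push(users));

// The simulated response arrives after both subscriptions exist.
response.next(['Rajendra']);

console.log(requestCount); // 1
```

Subscribing to request$ directly instead of shared$ would start the simulated request once per subscriber.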
 
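switchMap()
 
switchMap() maps each value from the source observable to a new inner observable and subscribes to it. When the source emits again, it unsubscribes from (cancels) the previous inner observable and switches to the new one.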
  getUsers(): Observable<any[]> {
    return this.http.get<any[]>('https://jsonplaceholder.typicode.com/users');
  }

  getPosts(): Observable<any[]> {
    return this.http.get<any[]>('http://jsonplaceholder.typicode.com/posts');
  }
Let's see switchMap() in action.
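  import { switchMap, tap } from 'rxjs/operators';

  ngOnInit() {
    const reqPosts = this.getPosts();
    const reqUsers = this.getUsers();

    const reqPostsUser = reqPosts.pipe(
      switchMap(posts => {
        // Once the posts arrive, switch to the users request.
        return reqUsers.pipe(tap(users => {
          console.log('Posts List ', posts);
          console.log('User List ', users);
        }));
      })
    );

    // Nothing runs until the observable is subscribed to.
    reqPostsUser.subscribe();
  }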
Again, check the output: once the posts arrive, switchMap() switches to the users request, and both lists are logged to the console.
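The cancelling behaviour is easier to see when the source emits more than once. Below is a minimal sketch, assuming a hypothetical fakeSearch() helper that returns a Subject per search term so the "responses" can be delivered by hand; none of these names come from the original service.

```typescript
import { Subject } from 'rxjs';
import { switchMap } from 'rxjs/operators';

const searchTerms = new Subject<string>();

// Hypothetical stand-in for an HTTP search: each term gets its own
// Subject so that we control when its "response" arrives.
const pending = new Map<string, Subject<string>>();
function fakeSearch(term: string): Subject<string> {
  const response = new Subject<string>();
  pending.set(term, response);
  return response;
}

const results: string[] = [];
searchTerms
  .pipe(switchMap(term => fakeSearch(term)))
  .subscribe(result => results.push(result));

searchTerms.next('raj');
searchTerms.next('rajendra'); // switchMap unsubscribes from the 'raj' search

pending.get('raj')!.next('results for raj');           // ignored: already cancelled
pending.get('rajendra')!.next('results for rajendra'); // delivered

console.log(results); // ['results for rajendra']
```

Only the response for the latest term reaches the subscriber, which is why switchMap() is a common choice for search boxes.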

debounceTime() and distinctUntilChanged()

Both are very useful for search-as-you-type scenarios, where every change of the search text triggers an HTTP call that returns a response. debounceTime() emits a value from the source Observable only after the specified time has passed without another emission, and distinctUntilChanged() emits a value only when it differs from the previous one.
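  import { debounceTime, distinctUntilChanged } from 'rxjs/operators';

  // Emit only after the user has stopped typing for 500 ms.
  this.personalForm.get('firstName').valueChanges.pipe(debounceTime(500)).subscribe(
    value => {
      console.log(value);
    }
  );

  // Emit only when the value differs from the previous one.
  this.personalForm.get('firstName').valueChanges.pipe(distinctUntilChanged()).subscribe(
    value => {
      console.log(value);
    }
  );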
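To see what distinctUntilChanged() filters out, here is a small sketch that replaces the form control with a fixed list of values; the typed values are made up for illustration. In a real search box the two operators are typically combined in one pipe, for example pipe(debounceTime(500), distinctUntilChanged()), but debounceTime() is left out here so the example runs synchronously.

```typescript
import { of } from 'rxjs';
import { distinctUntilChanged } from 'rxjs/operators';

// Simulated sequence of search texts, including consecutive duplicates.
const typed$ = of('r', 'ra', 'ra', 'raj', 'raj', 'ra');

const emitted: string[] = [];
typed$
  .pipe(distinctUntilChanged()) // drop a value if it equals the previous one
  .subscribe(value => emitted.push(value));

console.log(emitted); // ['r', 'ra', 'raj', 'ra']
```

Note that the final 'ra' is emitted again: only consecutive duplicates are dropped, not every value seen before.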
unsubscribe() does what its name says: it stops the data flow, much like un-following. In Angular, we call it in the ngOnDestroy() lifecycle hook to make sure all subscriptions are closed and no more data is received or processed.
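  import { Subscription } from 'rxjs';

  request: Subscription;

  callService() {
    // Cancel the previous call if it is still in flight.
    if (this.request != null && !this.request.closed) {
      this.request.unsubscribe();
    }
    this.request = this.getUsers().subscribe();
  }

  ngOnDestroy() {
    // Close the subscription when the component is destroyed.
    if (this.request) { this.request.unsubscribe(); }
  }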
Let us check the output now. I intentionally clicked multiple times and, yes, the existing calls were cancelled.
 
Note
The ngx-auto-unsubscribe package is a ready-made way to unsubscribe from all observables automatically.
 
https://www.npmjs.com/package/ngx-auto-unsubscribe
 
take()/first()/takeUntil()/takeWhile()/takeLast()

These operators are another way to manage the data of your observables: they decide which emitted values to take and which to ignore.

We will use the code below to demonstrate these operators.

  import { Observable, of, from, Subscriber, Subscription, fromEvent, Subject } from 'rxjs';
  import { map, share, switchMap, tap, count, first, takeUntil, takeWhile, takeLast } from 'rxjs/operators';

  const eventSource = fromEvent(document, 'click');
  eventSource.subscribe(() => {
    console.log('clicked ', this.count);
    this.count++;
  });
On every DOM click, a message is logged to the console.
 
first()
 
No matter how many times I click, only the first click is handled.

  const eventSource = fromEvent(document, 'click');
  eventSource.pipe(first()).subscribe(() => {
    console.log('clicked ', this.count);
    this.count++;
  });

We can see only the first click printed in the console.
 
takeWhile() - In this case, values are taken only while a condition holds; as soon as the condition becomes false, the observable completes.

  const eventSource = fromEvent(document, 'click');
  eventSource.pipe(takeWhile(() => this.count < 3)).subscribe(() => {
    console.log('clicked ', this.count);
    this.count++;
  });

takeLast() - In this case, only the last values are emitted, once the source has completed. With takeLast(2), the last two values are emitted.

  const eventSource = of(1, 2, 3, 4, 5);
  eventSource.pipe(takeLast(2)).subscribe((d) => {
    console.log('Get last Value ', d);
  });

takeUntil() is useful when one observable should control another: values from the source are emitted until a second (notifier) observable emits, and at that point the stream completes.

  startClick = new Subject<void>();

  const eventSource = fromEvent(document, 'click');
  // Log clicks until startClick emits.
  eventSource.pipe(takeUntil(this.startClick)).subscribe(() => {
    console.log('clicked ');
  });

  stopClick() {
    this.startClick.next();     // notifies takeUntil, which completes the click stream
    this.startClick.complete();
  }
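The same pattern is often used as an alternative to calling unsubscribe() by hand in ngOnDestroy(). The following is only a sketch under stated assumptions: ClickLogger and destroy$ are hypothetical names, the class is a plain TypeScript class rather than a real Angular component, and a Subject of numbers stands in for the DOM click events.

```typescript
import { Observable, Subject } from 'rxjs';
import { takeUntil } from 'rxjs/operators';

// Hypothetical component-like class; no Angular decorators are used.
class ClickLogger {
  private destroy$ = new Subject<void>();
  readonly seen: number[] = [];

  constructor(clicks$: Observable<number>) {
    // Every subscription piped through takeUntil(destroy$) ends
    // as soon as destroy$ emits.
    clicks$.pipe(takeUntil(this.destroy$)).subscribe(n => this.seen.push(n));
  }

  ngOnDestroy(): void {
    this.destroy$.next();
    this.destroy$.complete();
  }
}

const clicks = new Subject<number>(); // stand-in for fromEvent(document, 'click')
const logger = new ClickLogger(clicks);

clicks.next(1);
clicks.next(2);
logger.ngOnDestroy();
clicks.next(3); // ignored: the subscription has completed

console.log(logger.seen); // [1, 2]
```

One notifier can end any number of subscriptions, so the class does not need to keep a Subscription field for each of them.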
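mergeMap()/flatMap()
 
mergeMap() maps each value from the source observable to an inner observable and merges the values of all inner observables into one output observable; flatMap() is an alias for the same operator. Here it is used to combine the results of two requests into one object.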
  const reqPosts: Observable<any> = this.getPosts();
  const reqUsers: Observable<any> = this.getUsers();

  const dt: Observable<any> = reqPosts.pipe(
    mergeMap(post => {
      return reqUsers.pipe(
        map(user => {
          const allData = {
            rpost: post,
            ruser: user
          };
          return allData;
        })
      );
    })
  );

  dt.subscribe((data) => console.log(data));
In the output, each emitted value is an object that holds both the posts (rpost) and the users (ruser).
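Unlike switchMap(), mergeMap() does not cancel earlier inner observables when the source emits again. A minimal sketch, assuming a hypothetical loadPosts() helper that returns two fake posts per user id instead of calling the API:

```typescript
import { Observable, of } from 'rxjs';
import { mergeMap } from 'rxjs/operators';

const userIds$ = of(1, 2, 3);

// Hypothetical stand-in for a request that loads the posts of one user.
function loadPosts(userId: number): Observable<string> {
  return of(`post A of user ${userId}`, `post B of user ${userId}`);
}

const merged: string[] = [];
userIds$
  .pipe(mergeMap(id => loadPosts(id))) // every inner observable is kept
  .subscribe(post => merged.push(post));

console.log(merged.length); // 6
```

All three inner observables contribute their values, so the subscriber receives six posts in total.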
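concat() is another way to combine two observables. It subscribes to them one after another: the subscriber first receives the values of the first observable and, once that one completes, the values of the second. With two HTTP requests, the subscribe callback therefore runs two times.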
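A minimal sketch of concat(), using of() with made-up values in place of the two HTTP requests:

```typescript
import { concat, of } from 'rxjs';

// Stand-ins for the posts and users requests.
const posts$ = of('post 1', 'post 2');
const users$ = of('user 1');

const order: string[] = [];
// users$ is subscribed to only after posts$ has completed.
concat(posts$, users$).subscribe(value => order.push(value));

console.log(order); // ['post 1', 'post 2', 'user 1']
```

The order of the arguments decides the order of the output, which makes concat() a fit when the second call must not start before the first one has finished.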
 
forkJoin() runs the observables in parallel and emits a single array with the last value of each one, but only after all of them have completed.
  import { forkJoin } from 'rxjs';

  const reqPosts: Observable<any> = this.getPosts();
  const reqUsers: Observable<any> = this.getUsers();

  // Both requests run in parallel; the result is [posts, users].
  const combinedData = forkJoin([reqPosts, reqUsers]);
  combinedData.subscribe(dt => console.log(dt));
The output is a single array that contains the posts list followed by the users list.
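The array result can be destructured directly in the subscriber. A minimal sketch, again with of() and invented sample data standing in for the two requests:

```typescript
import { forkJoin, of } from 'rxjs';

// Stand-ins for the posts and users requests.
const posts$ = of(['post 1', 'post 2']);
const users$ = of(['Rajendra']);

let combined: string[][] = [];
forkJoin([posts$, users$]).subscribe(([posts, users]) => {
  // Emitted once, after both observables have completed.
  combined = [posts, users];
  console.log(posts.length, users.length);
});
```

The results arrive in the same order as the observables in the input array, regardless of which one finishes first.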
To conclude: never jump straight to manually modifying observable data, because that is a tedious and time-consuming task. Whenever you work with async services and observables, look at the RxJS operators first and pick the one that suits your task.