Introduction To RxJava

Introduction

 
RxJava is one of the most talked libraries for enabling Reactive programming in Android development. RxJava is a widely used library for performing asynchronous tasks. It is very popular among developers because it eliminates some boilerplate code and raises the level of abstraction.
  

Definition

 
According to the definition: "RxJava is a Java VM implementation of Reactive Extensions."
 
So, What are Reactive Extensions? 
 
The official docs say that "Reactive Extension(ReactiveX) is a library for composing asynchronous and event-based programs by using observable sequences."
 
Now, according to the definition, let's understand what these terms are - asynchronous, event-based etc. 
  • Asynchronous
    It implies that the different parts of a program run simultaneously.
      
  • Event-Based
    The program executes the code based on the events generated while the program is running. For example, a button click triggers an event and then the program’s event handler receives this event and does some work accordingly.
     
  • Observable sequences
    Observable and Flowable take some items and pass onto their subscribers. So, these items are called as Observable sequences or Data Stream.
     
  • RxJava frees us from the callback hell by providing the composing style of programming. We can plug in various transformations that resemble the Functional programming.
RxJava uses Observer and observable pattern where the subject all the time maintains its Observers and if any change occurs, then it notifies them by calling one of their methods.
 

3 O's of RxJava

 
In this RxJava world, everything is treated as streams. A stream emits item(s) over time, and each emission can be consumed/observed. Although the stream is not a new concept, you might be aware of the fact that push notifications are also a stream. Location update is also a stream.
 
Introduction To RxJava
 
The stream abstraction is implemented through three main constituents - Observables, Observers, and Operators. Observables emit data and observers consume that emitted data. Emissions from Observable objects can further be modified, transformed, and manipulated by chaining Operator calls.
 
Before implementing some code, we must add a gradle in our build.gradle.
  1. // reactive  
  2.     implementation 'io.reactivex.rxjava2:rxandroid:2.0.2'  
  3.   
  4.     // Because RxAndroid releases are few and far between, it is recommended you also  
  5.     // explicitly depend on RxJava's latest version for bug fixes and new features.  
  6.     implementation 'io.reactivex.rxjava2:rxjava:2.1.7'  

Observable

 
Observable basically emits the data. Observable is the stream abstraction in RxJava. It resembles the iterator in Java and iterates through and produces those items in an orderly fashion. If we want to emit numbers 1,2,3 etc. in that order, then we can use Observable. For example, the code shown below.
  1. Observable<Integer> observable = Observable.create(new Observable.OnSubscribe<Integer>() {  
  2.    @Override public void call(Subscriber<? super Integer> subscriber) {  
  3.        subscriber.onNext(1);  
  4.        subscriber.onNext(2);  
  5.        subscriber.onNext(3);  
  6.        subscriber.onCompleted();  
  7.    }  
  8. });  
Invoking subscriber.onNext(Integer) emits an item in the stream and when the stream has finished emitting, subscriber.onCompleted() is invoked. There is a way to emit items using the just operator like shown below.
  1. Observable.just(123); // 1, 2, 3 will be emitted, respectively  

Observer

 
Observer is another component of RxJava. Observers are subscribed to the Observables whenever there is a change or an event of interest occurs it immediately notifies by the following events.
  • Observer#onNext(T) - invoked when an item is emitted from the stream
  • Observable#onError(Throwable) - invoked when an error has occurred within the stream
  • Observable#onCompleted() - invoked when the stream is finished emitting items.
  1. Observable<Integer> observable = Observable.just(123);  
  2. observable.subscribe(new Observer<Integer>() {  
  3.    @Override public void onCompleted() {  
  4.        Log.d("Test""In onCompleted()");  
  5.    }  
  6.   
  7.    @Override public void onError(Throwable e) {  
  8.        Log.d("Test""In onError()");  
  9.    }  
  10.   
  11.    @Override public void onNext(Integer integer) {  
  12.        Log.d("Test""In onNext():" + integer);  
  13.    }  
  14. });  
Output produced in logcat is as follows,
 
In onNext(): 1
In onNext(): 2
In onNext(): 3
In onNext(): 4
In onCompleted()
 

Operators

 
The items emitted by Observables are manipulated before notifying the subscribed Observer object(s). There are many operators available in RxJava including map, filter, reduce, flatmap etc. Let's look at the map example since we are modifying the above example.
  1. Observable.just(12345).map(new Func1<Integer, Integer>() {  
  2.    @Override public Integer call(Integer integer) {  
  3.        return integer * 3;  
  4.    }  
  5. }).subscribe(new Observer<Integer>() {  
  6.    @Override public void onCompleted() {  
  7.        // ...  
  8.    }  
  9.   
  10.    @Override public void onError(Throwable e) {  
  11.        // ...  
  12.    }  
  13.   
  14.    @Override public void onNext(Integer integer) {  
  15.        // get output here 
  16.    }  
  17. });  
Output produced is every number multiplied by 3 i.e, 3,6,9,12,15.
 
How it simplifying things.
 

Network Call - AsyncTask vs RxJava

 
Let's see the difference between these two approaches. Firstly, we have a very popular and old way to make a network call. AsyncTask has been the traditional way to make calls until fast networking libraries came into the picture. 
 
Create an inner class and extend to AsyncTask, make network operation, and in postExecute(), update the UI. This is the approach this network call uses. Everything seamlessly looks good until the phone rotates. Once the phone is rotated, the code gets blown up and the app gets crashes because the activity recreates itself.
 
Memory leaks occur in this approach because this inner class has a reference of the outer class. If we want to chain another long operation, then we have to nest the other tasks which will become a non-readable code.
  1. public class NetworkRequestCall extends AsyncTask<Void, Void, User> {  
  2.   
  3.     private final int userId;  
  4.   
  5.     public NetworkRequestCall(int userId) {  
  6.         this.userId = userId;  
  7.     }  
  8.   
  9.     @Override protected User doInBackground(Void... params) {  
  10.         return networkService.getUserDetails(userId);  
  11.     }  
  12.   
  13.     @Override protected void onPostExecute(User user) {  
  14.         nameTextView.setText(user.getName());  
  15.         // ...set other views  
  16.     }  
  17. }  
  18.      
  19. private void onButtonClicked(Button button) {  
  20.    new NetworkRequestCall(123).execute()  
  21. }  
However, in RxJava, this call looks different as shown below.
  1. private Subscription subscription;  
  2.   
  3. private void onButtonClicked(Button button) {  
  4.    subscription = networkService.getObservableUser(123)  
  5.                       .subscribeOn(Schedulers.io())  
  6.                       .observeOn(AndroidSchedulers.mainThread())  
  7.                       .subscribe(new Action1<User>() {  
  8.                           @Override public void call(User user) {  
  9.                               nameTextView.setText(user.getName());  
  10.                               // ... set other views  
  11.                           }  
  12.                       });  
  13. }  
  14.   
  15. @Override protected void onDestroy() {  
  16.    if (subscription != null && !subscription.isUnsubscribed()) {  
  17.        subscription.unsubscribe();  
  18.    }  
  19.    super.onDestroy();  
  20. }  
In this approach, when an activity is destroyed, we can unsubscribe to the activity. A call does not execute when an activity is destroyed so a potential crash may occur or memory/context leaks are avoided.
 

Conclusion

 
In this article, we learned how RxJava is simplifying things. We saw AsyncTask and how RxJava is overcoming the issues with the AsyncTask. There are many other uses cases of RxJava, like in the TimerTask. This Observer and Observable pattern of programming is good enough to deal with the network operations and other cases also.


Similar Articles