Getting Started With Reactive Extensions

In Today's section, we will talk about the Rx library. Basically, Rx is a library for composing asynchronous and event-based programs using observable collections. This is very useful in the case wherein you are pulling data asynchronously from different sources and then manipulating the same and finally printing the result. In these kinds of scenarios, you have to write lots of glue code and of course, these codes will be error-prone. Let's say one of the sources just throws an error, then what will happen?
 
This way, you really need to do lots of stuff for the things working fine. Hence, Rx is an answer to this kind of situation, which keeps the thing simple yet lightweight. Rx also uses a LINQ query on the observable collections.
 
But, it would be nice to talk a little about collections before starting Rx. IEnumerables is one of the most widely used Pull Based collection which is synchronous in nature. The following is the sample snippet for the same.
  1. interface IEnumerable<out T>    
  2. {    
  3.     IEnumerator<T> GetEnumerator();    
  4. }    
  5.   
  6. interface IEnumerator<out T>:IDisposable    
  7. {    
  8.     bool moveNext();    
  9.     T currennt { get; }    
  10.     void Reset();    
  11. }    
And let us suppose due to any reason, your datasource went down for some time, then what will happen? It will keep waiting until the database comes online. And, you will land in an embarrassing situation something like shown below.
 
waiting until database comes online
 
And, you will end up doing something like this!
 
doing something like this
 
However, you can convert the same Pull based interface to Push based interface. The following is the sample for the same.
  1. //Observables:- Push based     
  2. interface IObservable<out T>    
  3. {    
  4.     IDisposable subscribe(IObserver<T> observer);    
  5. }    
  6.   
  7. interface IObserver<in T>    
  8. {    
  9.     void onNext(T value);    
  10.     void onError(Exception ex);    
  11.     void onCompleted();    
  12. }    
Very complete, precise, and stable. We will see a few examples around the same in a moment. The following is the sample console app, you can see that IObservable and IObserver are available by default in .NET 4.0 and higher.
 
 
Now, let me go ahead and install the Rx extension from Nuget.
 
install Rx extension from nuget
 
On successful installation, you can verify the assemblies.
 
reference
 
Now, these are three phases of getting observables.
 
three phases of getting observables
 
Let me explain the same with a simple demo. Now, as you can see in the following screenshot, as soon as I started creating observable, it started giving a bunch of overloads which I can make use of:
 
overloads
  1. using System;    
  2. using System.Reactive.Linq;    
  3.     
  4. namespace ReactiveExtensions    
  5. {    
  6.     internal class Program    
  7.     {    
  8.         private static void Main(string[] args)    
  9.         {    
  10.             IObservable<string> obj = Observable.Generate(    
  11.                 0, //Sets the initial value like for loop    
  12.                 _ => true//Don't stop till i say so, infinite loop    
  13.                 i => i + 1, //Increment the counter by 1 everytime    
  14.                 i => new string('#', i), //Append #    
  15.                 i => TimeSelector(i)); //delegated this to private method which just calculates time    
  16.     
  17.             //Subscribe here    
  18.             using (obj.Subscribe(Console.WriteLine))    
  19.             {    
  20.                 Console.WriteLine("Press any key to exit!!!");    
  21.                 Console.ReadLine();    
  22.             }    
  23.         }    
  24.     
  25.         //Returns TimeSelector    
  26.         private static TimeSpan TimeSelector(int i)    
  27.         {    
  28.             return TimeSpan.FromSeconds(i);    
  29.         }    
  30.     }    
  31. }   
In the above snippet, I have mentioned comments on each line that what it is doing. Best thing with this is its asynchronous nature which you can see in the following output:
 
output
 
Here, my program is executing on one thread and I am also typing there on the same console, which means UI is not blocked. It's free for any activity. I hope, you would have liked this small example around Observables.
 
We will deliver more in the coming sections. Till then, stay tuned! Happy Coding!


Similar Articles