Exploring Subject<T> In Reactive Extensions For .Net

Reactive Extensions for .Net gives developers a set of tools for implementing reactive programming models, making event handling simpler and more expressive through declarative operations. While the cornerstones of Reactive Extensions are the IObserver<T> and IObservable<T> interfaces, you rarely need to implement them yourself. The library provides a built-in type, Subject<T>, which implements both interfaces and supports a host of functions.

Subject<T> is the most basic of the subjects available in the library. There are others too: ReplaySubject<T>, BehaviorSubject<T>, and AsyncSubject<T>. It is useful to understand the essential differences between them so that you can make better use of the library.

In this article, we will compare Subject<T> and its siblings to illustrate the differences in their behavior.

Subject<T>

As mentioned earlier, Subject<T> is the most basic of the available subjects, and it provides an easy way to work with the library without having to implement the IObservable<T> and IObserver<T> interfaces yourself. A simple demo of the Subject<T> type is shown below.

using System;
using System.Reactive.Subjects;

var subject = new Subject<int>();
subject.Subscribe(Console.Write); // print each published value
subject.OnNext(1);
subject.OnNext(2);
subject.OnNext(3);
subject.OnNext(4);

In the code above, we created an instance of Subject<T>. Since it implements both IObserver<T> and IObservable<T>, we used the same instance both to subscribe and to publish values. Another important point is that we used an overload of the Subscribe method that accepts an action as input. The action is executed for each published value; in this case, it prints the number to the console.

Let us display the published values and the values printed to the console by the observer (in this case, an Action<T>) in the following image. This will help us compare the remaining siblings and variations easily.

[Image: marble diagram. Published: 1, 2, 3, 4. Subscription: before 1. Received: 1, 2, 3, 4.]

The first line indicates the published values, while the second line indicates the values received by the observer. We have also added a dotted vertical line to indicate the point in execution at which the observer subscribes to the stream of data.

In the code above, we notice that the observer subscribes to the stream of data before the first value is published. This is illustrated in the image as the subscriber line is placed before the first element. As seen in the output line, this makes no difference (at this point) to the output.
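The Subscribe overload used so far takes only an action for published values. As a minimal sketch, assuming the System.Reactive package is referenced, the overload that also accepts error and completion callbacks makes the rest of the observer contract visible. The `log` list is a name introduced here purely for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Subjects;

// Illustrative log of every notification the observer receives.
var log = new List<string>();
var subject = new Subject<int>();

// Overload with three callbacks: onNext, onError, onCompleted.
subject.Subscribe(
    value => log.Add($"OnNext({value})"),
    error => log.Add($"OnError({error.Message})"),
    () => log.Add("OnCompleted"));

subject.OnNext(1);
subject.OnNext(2);
subject.OnCompleted();
subject.OnNext(3); // ignored: the sequence has already completed

Console.WriteLine(string.Join(", ", log));
// prints "OnNext(1), OnNext(2), OnCompleted"
```

The completion callback becomes relevant later in the article, where the subjects differ in what a subscriber sees once the sequence has completed.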

But what if the Observer subscribes to the data only after some of the values are already published? Would that make a difference to the data received by the Observer? Let us first write the code for the same before looking at the output.

var subject = new Subject<int>();
subject.OnNext(1);
subject.OnNext(2);
subject.Subscribe(Console.Write);
subject.OnNext(3);
subject.OnNext(4);

In the code shown above, the observer subscribes to the stream of data only after two values (1 and 2) have been published. As one would expect, the observer does not receive the values that were published before the call to the Subscribe method. This is illustrated in the image below.

[Image: marble diagram. Published: 1, 2, 3, 4. Subscription: after 2. Received: 3, 4.]

What if you wanted to read all the published values even if the observer had subscribed late? This is where ReplaySubject<T> comes into play.

ReplaySubject<T>

ReplaySubject<T> caches the values and replays them for late subscribers. This is useful in avoiding race conditions. Let us change the previous code to use ReplaySubject<T> and see how it impacts what the observer receives.

var subject = new ReplaySubject<int>();
subject.OnNext(1);
subject.OnNext(2);
subject.Subscribe(Console.Write);
subject.OnNext(3);
subject.OnNext(4);

As seen in the code above, there is almost no change in the code, except the fact that we are now using ReplaySubject<T> instead of Subject<T>. The following diagram illustrates the impact on the data received by the Observer.

[Image: marble diagram. Published: 1, 2, 3, 4. Subscription: after 2. Received: 1, 2 (replayed), 3, 4.]

As illustrated, the cached values are now replayed to the subscriber even if it subscribes late. This useful functionality, of course, comes at a cost. This implementation caches each and every value published to the subject, which could lead to undesirable memory issues when the volume of data is large.

However, ReplaySubject<T> offers more than one way to address the problem. We will look at two of them, which use size and time constraints to limit the cached values.

As the first case, we will limit the cached values using the size of the cache. The constructor of ReplaySubject<T> provides an overload that accepts an integer which signifies the size (maximum element count) of the cache buffer. Let us change the code to limit the cache size to 1 in our example.

var subject = new ReplaySubject<int>(1);
subject.OnNext(1);
subject.OnNext(2);
subject.Subscribe(Console.Write);
subject.OnNext(3);
subject.OnNext(4);

Notice how we have supplied the cache size as 1 using the constructor overload of ReplaySubject<T>. This ensures that only one element is cached, and that it is replaced by each new element as soon as that element is published. The impact of the change is illustrated below.

[Image: marble diagram. Published: 1, 2, 3, 4. Subscription: after 2. Received: 2 (replayed), 3, 4.]

Another approach to restricting the cache is to limit the time for which an item is cached, or in other words, to provide an expiry time for each cached item.

Let us write the code to illustrate the example.

var subject = new ReplaySubject<int>(TimeSpan.FromMilliseconds(1000));
subject.OnNext(1);
Thread.Sleep(500);
subject.OnNext(2);
Thread.Sleep(200);
subject.OnNext(3);
Thread.Sleep(500);
subject.Subscribe(Console.Write);
subject.OnNext(4);
Thread.Sleep(500);

Similar to the previous code, we have used an overload of the ReplaySubject<T> constructor to specify the expiry for items in the cache. To demonstrate our case, we have introduced delays between the publishing of values.

Since a total of 1200 ms passes before the observer subscribes, any element older than the expiry duration of 1000 ms is removed from the cache. In this example, the value 1 is removed from the cache and is not replayed to the late subscriber, while 2 (700 ms old) and 3 (500 ms old) are still replayed. This is illustrated in the image below.

[Image: marble diagram. Published: 1, 2, 3, 4. Subscription: after 3, 1200 ms in. Received: 2, 3 (replayed), 4.]

There are other constructor overloads of ReplaySubject<T> that provide more flexibility for fine-tuning the cached values, but for the sake of this article we will limit the illustration to the two covered above.
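One of those overloads combines both constraints by taking a buffer size and a time window together. The following is a rough sketch of how the two limits interact, assuming the System.Reactive package and the default scheduler; the list names and the delay are chosen only for illustration, and the result depends on timing.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Subjects;
using System.Threading;

var first = new List<int>();
var second = new List<int>();

// Keep at most 2 items, and only items younger than 1 second.
var subject = new ReplaySubject<int>(2, TimeSpan.FromSeconds(1));

subject.OnNext(1);
subject.OnNext(2);
subject.OnNext(3);

// Subscribes right away: the size limit has already evicted 1.
subject.Subscribe(value => first.Add(value));

// Wait well past the 1-second window so that 2 and 3 expire.
Thread.Sleep(1500);
subject.Subscribe(value => second.Add(value));

subject.OnNext(4);

Console.WriteLine("first:  " + string.Join(", ", first));
Console.WriteLine("second: " + string.Join(", ", second));
```

With these delays, the first subscriber should see 2, 3 and 4, while the second should see only 4, because everything cached before it subscribed has expired.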

BehaviorSubject<T>

BehaviorSubject<T> is quite similar to ReplaySubject<T> in that it caches values. But there is a significant difference: BehaviorSubject<T> caches only the last value that was published. Let us write some code before we discuss this further.

var subject = new BehaviorSubject<int>(0);
subject.OnNext(1);
subject.OnNext(2);
subject.Subscribe(Console.Write);
subject.OnNext(3);
subject.OnNext(4);

If BehaviorSubject<T> caches only a single value (the last known one), how is it different from a ReplaySubject<T> with size 1? For the code above, the diagram below does indeed look the same as it would for a ReplaySubject<T> with size 1.

[Image: marble diagram. Published: 1, 2, 3, 4. Subscription: after 2. Received: 2 (cached), 3, 4.]

However, the two are not entirely equivalent. There are two significant differences to understand here. The first is the presence of a default value. Note that in the code above we supplied 0 as the default value in the constructor of BehaviorSubject<T>. If no value is present in the cache (in other words, no data is published before the observer subscribes), the default value is delivered to the observer. This differs from a ReplaySubject<T> with size 1, which would have no value to replay. This behavior is demonstrated in the code and the visual representation of the sequence below.

var subject = new BehaviorSubject<int>(0);
subject.Subscribe(Console.Write);
subject.OnNext(1);
subject.OnNext(2);
subject.OnNext(3);
subject.OnNext(4);

[Image: marble diagram. Published: 1, 2, 3, 4. Subscription: before 1. Received: 0 (default), 1, 2, 3, 4.]

The second difference is how BehaviorSubject<T> and ReplaySubject<T> behave when an observer subscribes to a completed sequence. BehaviorSubject<T> delivers no values to an observer that subscribes after completion, as shown in the code below.

var subject = new BehaviorSubject<int>(0);
subject.OnNext(1);
subject.OnNext(2);
subject.OnNext(3);
subject.OnNext(4);
subject.OnCompleted();
subject.Subscribe(Console.Write);

The subscriber is guaranteed not to receive any values, since the subscription occurs after completion; it receives only the completion notification.

[Image: marble diagram. Published: 1, 2, 3, 4, then completed. Subscription: after completion. Received: no values.]

However, this is not the case with ReplaySubject<T>. An observer that subscribes after completion can still receive the cached values, as illustrated in the code below.

var subject = new ReplaySubject<int>(1);
subject.OnNext(1);
subject.OnNext(2);
subject.OnNext(3);
subject.OnNext(4);
subject.OnCompleted();
subject.Subscribe(Console.Write);

As seen in the code above, the size of the cache is 1. Even though the subscription happens after the completion call, the cache is retained (until any expiry condition is met), so the observer in this case receives the last published value, 4.

[Image: marble diagram. Published: 1, 2, 3, 4, then completed. Subscription: after completion. Received: 4 (replayed).]
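Because Console.Write shows only values, the completion notification itself is invisible in the two snippets above. The sketch below, assuming the System.Reactive package, runs both subjects side by side and records completion through the Subscribe overload that also takes a completion callback. The log lists are illustrative names, not part of the library.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Subjects;

var behaviorLog = new List<string>();
var replayLog = new List<string>();

var behavior = new BehaviorSubject<int>(0);
var replay = new ReplaySubject<int>(1);

// Publish the same values to both subjects, then complete both.
foreach (var value in new[] { 1, 2, 3, 4 })
{
    behavior.OnNext(value);
    replay.OnNext(value);
}
behavior.OnCompleted();
replay.OnCompleted();

// Both observers subscribe only after completion.
behavior.Subscribe(v => behaviorLog.Add(v.ToString()), () => behaviorLog.Add("completed"));
replay.Subscribe(v => replayLog.Add(v.ToString()), () => replayLog.Add("completed"));

Console.WriteLine("BehaviorSubject: " + string.Join(", ", behaviorLog)); // prints "BehaviorSubject: completed"
Console.WriteLine("ReplaySubject:   " + string.Join(", ", replayLog));   // prints "ReplaySubject:   4, completed"
```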

AsyncSubject<T>

AsyncSubject<T>, the last of the siblings of Subject<T> we will explore in this article, is quite similar to the preceding two (ReplaySubject<T> and BehaviorSubject<T>) in that it too caches values. But once again there is a significant difference. AsyncSubject<T> caches only one value, the last one, and publishes it only when the sequence is marked as completed.

Consider the following code.

var subject = new AsyncSubject<int>();
subject.Subscribe(Console.Write);
subject.OnNext(1);
subject.OnNext(2);
subject.OnNext(3);
subject.OnNext(4);
subject.OnCompleted();

This produces a single value for the observer: the last value published before the sequence was marked as completed, which is 4. This is illustrated in the diagram below.

[Image: marble diagram. Published: 1, 2, 3, 4, then completed. Subscription: before 1. Received: 4, on completion.]

But what would have happened if we had skipped the call that marks the sequence as completed? Let us remove that line and try again.

var subject = new AsyncSubject<int>();
subject.Subscribe(Console.Write);
subject.OnNext(1);
subject.OnNext(2);
subject.OnNext(3);
subject.OnNext(4);

This does not produce any data for the observer, as AsyncSubject<T> publishes its result only after the sequence is marked as completed.

[Image: marble diagram. Published: 1, 2, 3, 4, never completed. Subscription: before 1. Received: nothing.]

This is a significant difference and anyone who works with AsyncSubject<T> should keep this in mind.
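Since AsyncSubject<T> holds on to its last value, an observer that subscribes after completion still receives it. The sketch below, assuming the System.Reactive package, contrasts an early and a late subscriber; the `early` and `late` lists are introduced here only to capture what each observer sees.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Subjects;

var early = new List<int>();
var late = new List<int>();

var subject = new AsyncSubject<int>();
subject.Subscribe(value => early.Add(value)); // subscribes before any value

subject.OnNext(1);
subject.OnNext(2);
subject.OnNext(3);
subject.OnNext(4);
Console.WriteLine("Before completion: " + early.Count + " value(s)"); // prints "Before completion: 0 value(s)"

subject.OnCompleted(); // only now is the last value (4) delivered
subject.Subscribe(value => late.Add(value)); // subscribes after completion

Console.WriteLine("early: " + string.Join(", ", early)); // prints "early: 4"
Console.WriteLine("late:  " + string.Join(", ", late));  // prints "late:  4"
```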

Conclusion

This article demonstrated the differences between Subject<T> and its siblings, along with some of their variants. It is useful to understand these seemingly minor differences, because a subject may otherwise behave differently from what you expect.
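As a recap, the same publish/subscribe script can be run against all four subjects. This is only a sketch, assuming the System.Reactive package; `Run` is a hypothetical helper written for this comparison, not a library function.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Subjects;

// Hypothetical helper: publishes 1 and 2, subscribes, publishes 3 and 4,
// then completes. Returns everything the late subscriber received.
static string Run(ISubject<int> subject)
{
    var received = new List<int>();
    subject.OnNext(1);
    subject.OnNext(2);
    subject.Subscribe(value => received.Add(value));
    subject.OnNext(3);
    subject.OnNext(4);
    subject.OnCompleted();
    return string.Join(", ", received);
}

var results = new (string Name, string Received)[]
{
    ("Subject<T>", Run(new Subject<int>())),
    ("ReplaySubject<T>", Run(new ReplaySubject<int>())),
    ("BehaviorSubject<T>", Run(new BehaviorSubject<int>(0))),
    ("AsyncSubject<T>", Run(new AsyncSubject<int>())),
};

foreach (var result in results)
{
    Console.WriteLine(result.Name + ": " + result.Received);
}
// prints:
// Subject<T>: 3, 4
// ReplaySubject<T>: 1, 2, 3, 4
// BehaviorSubject<T>: 2, 3, 4
// AsyncSubject<T>: 4
```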

