Getting Started With SignalR Using ASP.NET Core - Streaming Data Using Angular

In this article, we'll learn how to stream data to clients with signalr using aspnet core and Angular 5. We will go through the channel reader /writer which helps in reading/writing into a channel. The channels are playing vital role in streaming data using signalR

Introduction

In this article, we'll learn how to stream data to clients with SignalR using ASP.NET Core and Angular 5. We will go through the channel reader /writer which helps in reading/writing into a channel. The channels play a vital role in streaming data using SignalR. Streaming data is the type of consumer /producer pattern. Let's say the producer is writing data to a channel and the consumer is reading data from the channel. You can choose how to write and read the data from channels so it is memory efficient and high performance. Let us say you are producing data on the channel but somehow a consumer is no longer available to consume the data. In such a case, the memory will increase so you can restrict the channel context to be bound to some limit. Will discuss each of the channels in details below.

The channels were introduced in DOTNET Core 2.1 in "System.Threading.Channels" namespace under the CoreFX (https://github.com/dotnet/corefx) project. The channels are better than data flow and pipeline stream. They are more elegant performers in consumer producer queue patterns and with simple and powerful APIs.

This article is a part of the series on SignalR using ASPNET Core.

  • Overview of New Stack SignalR on ASPNET Core here
  • Getting Started With SignalR Using ASP.NET Core And Angular 5 here
  • Working with Azure service
  • Working with xamarin
  • Working with dynamic hubs

This article demonstrates the following.

  • Channel reader/ writer
  • Bounded / unbounded/Un Buffered Channels
  • Creating angular service to read the stream
  • Streaming Demo

Prerequisite

You must have the following software,

The source code is available at GitHub.

Channels

The System.Threading.Tasks.Channels library provides a set of synchronization data structures for passing the data between producers and consumers. Whereas the existing System.Threading.Tasks.Dataflow library is focused on pipelining and connecting together dataflow "blocks" which encapsulates both storage and processing. System.Threading.Tasks.Channels is focused purely on the storage aspect with data structures used to provide the hand-offs between the participants explicitly coded to use the storage. The library is designed to be used with async/await in C#.

Core channels

  • Unbound Channel
    Used to create a buffered, unbound channel. The channel may be used concurrently by any number of readers and writers, and is unbounded in size, limited only by available memory.
  • Bound channel
    Used to create a buffered channel that stores at most a specified number of items. The channel may be used concurrently by any number of reads and writers.
  • UnBuffered Channel
    Used to create an unbuffered channel. Such a channel has no internal storage for T items, instead, it has the ability to pairing up writers and readers to rendezvous and directly hand off their data from one to the other. TryWrite operations will only succeed if there's currently an outstanding ReadAsync operation, and conversely, TryRead operations will only succeed if there's currently an outstanding WriteAsync operation.

At the end of this article , you will be able to achieve the below demo.

ASP.NET
If you haven't read about "How to get started with SignalR using ASPNET Core and Angular 5", click here

In the previous article , we have learned how to create Angular SPA template and we have also seen the start up configuration and npm package installation. The purpose of this article is to work with streaming data in SignalR.

SignalR Startup Configuration

You need to configure the SignalR service in "Configure Service" section and Map the hub in configure section. It automatically adds the SignalR reference for you.

Add SignalR service in Configure service method

  1. // This method gets called by the runtime. Use this method to add services to the container.  
  2.         public void ConfigureServices(IServiceCollection services)  
  3.         {  
  4.             services.AddMvc().SetCompatibilityVersion(CompatibilityVersion.Version_2_1);  
  5.   
  6.             // In production, the Angular files will be served from this directory  
  7.             services.AddSpaStaticFiles(configuration =>  
  8.             {  
  9.                 configuration.RootPath = "ClientApp/dist";  
  10.             });  
  11.   
  12.             services.AddSignalR();  
  13.   
  14.             services.AddSingleton<StockTicker>();  
  15.         }  

Configure Method

  1.  app.UseSignalR(route =>   
  2.     {  
  3.          route.MapHub<ChatHub>("/chathub");  
  4.          route.MapHub<StockTickerHub>("/stock");  
  5.     });  

After configuring SignalR in Startup we need to install SignalR Javascript library using NPM Package Manager. Run the below command in Command Prompt.

Command

  • npm init -y
  • npm install @aspnet/signalr
  • npm init to initialize or set the project npm package. -y | --yes to skip the questionnaire.

You have to run the above commands in Package manager console (Tools --> Nuget Package Manager --> Package Manager Console)

Now, we are going to create a separate hub for data streaming so I'm going to create a stock ticker hub. The Hub is taking care of communication between client and server.

Creating Stock Hub

We are going to create stock hub class which  inherits hub class. The hub has clients API to talk with clients so you can easily invoke. I've created a separate class for stock ticker which is take care about the list of stock and streaming data.

Stock Ticker Hub

  1. /// <summary>  
  2.     /// stock ticker hub   
  3.     /// </summary>  
  4.     public class StockTickerHub : Hub  
  5.     {  
  6.         private StockTicker _stockTicker;  
  7.   
  8.         public StockTickerHub(StockTicker stockTicker)  
  9.         {  
  10.             this._stockTicker = stockTicker;  
  11.         }  
  12.   
  13.         public IEnumerable<Stock> GetAllStocks()  
  14.         {  
  15.             return _stockTicker.GetAllStocks();  
  16.         }  
  17.   
  18.         public ChannelReader<Stock> StreamStocks()  
  19.         {  
  20.             return _stockTicker.StreamStocks().AsChannelReader(10);  
  21.         }  
  22.   
  23.         public string GetMarketState()  
  24.         {  
  25.             return _stockTicker.MarketState.ToString();  
  26.         }  
  27.   
  28.         public async Task OpenMarket()  
  29.         {  
  30.             await _stockTicker.OpenMarket();  
  31.         }  
  32.   
  33.         public async Task CloseMarket()  
  34.         {  
  35.             await _stockTicker.CloseMarket();  
  36.         }  
  37.   
  38.         public async Task Reset()  
  39.         {  
  40.             await _stockTicker.Reset();  
  41.         }  
  42.     }  

Stock Ticker Class

Which is taking care about loading stocks and writing data to stream

  1. public class StockTicker  
  2.     {  
  3.         private readonly SemaphoreSlim _marketStateLock = new SemaphoreSlim(1, 1);  
  4.         private readonly SemaphoreSlim _updateStockPricesLock = new SemaphoreSlim(1, 1);  
  5.   
  6.         private readonly ConcurrentDictionary<string, Stock> _stocks = new ConcurrentDictionary<string, Stock>();  
  7.   
  8.         private readonly Subject<Stock> _subject = new Subject<Stock>();  
  9.   
  10.         // Stock can go up or down by a percentage of this factor on each change  
  11.         private readonly double _rangePercent = 0.002;  
  12.   
  13.         private readonly TimeSpan _updateInterval = TimeSpan.FromMilliseconds(250);  
  14.         private readonly Random _updateOrNotRandom = new Random();  
  15.   
  16.         private Timer _timer;  
  17.         private volatile bool _updatingStockPrices;  
  18.         private volatile MarketState _marketState;  
  19.   
  20.         public StockTicker(IHubContext<StockTickerHub> hub)  
  21.         {  
  22.             Hub = hub;  
  23.             LoadDefaultStocks();  
  24.         }  
  25.   
  26.         private IHubContext<StockTickerHub> Hub  
  27.         {  
  28.             get;  
  29.             set;  
  30.         }  
  31.   
  32.         public MarketState MarketState  
  33.         {  
  34.             get { return _marketState; }  
  35.             private set { _marketState = value; }  
  36.         }  
  37.   
  38.         public IEnumerable<Stock> GetAllStocks()  
  39.         {  
  40.             return _stocks.Values;  
  41.         }  
  42.   
  43.         public IObservable<Stock> StreamStocks()  
  44.         {  
  45.             return _subject;  
  46.         }  
  47.   
  48.         public async Task OpenMarket()  
  49.         {  
  50.             await _marketStateLock.WaitAsync();  
  51.             try  
  52.             {  
  53.                 if (MarketState != MarketState.Open)  
  54.                 {  
  55.                     _timer = new Timer(UpdateStockPrices, null, _updateInterval, _updateInterval);  
  56.   
  57.                     MarketState = MarketState.Open;  
  58.   
  59.                     await BroadcastMarketStateChange(MarketState.Open);  
  60.                 }  
  61.             }  
  62.             finally  
  63.             {  
  64.                 _marketStateLock.Release();  
  65.             }  
  66.         }  
  67.   
  68.         public async Task CloseMarket()  
  69.         {  
  70.             await _marketStateLock.WaitAsync();  
  71.             try  
  72.             {  
  73.                 if (MarketState == MarketState.Open)  
  74.                 {  
  75.                     if (_timer != null)  
  76.                     {  
  77.                         _timer.Dispose();  
  78.                     }  
  79.   
  80.                     MarketState = MarketState.Closed;  
  81.   
  82.                     await BroadcastMarketStateChange(MarketState.Closed);  
  83.                 }  
  84.             }  
  85.             finally  
  86.             {  
  87.                 _marketStateLock.Release();  
  88.             }  
  89.         }  
  90.   
  91.         public async Task Reset()  
  92.         {  
  93.             await _marketStateLock.WaitAsync();  
  94.             try  
  95.             {  
  96.                 if (MarketState != MarketState.Closed)  
  97.                 {  
  98.                     throw new InvalidOperationException("Market must be closed before it can be reset.");  
  99.                 }  
  100.   
  101.                 LoadDefaultStocks();  
  102.                 await BroadcastMarketReset();  
  103.             }  
  104.             finally  
  105.             {  
  106.                 _marketStateLock.Release();  
  107.             }  
  108.         }  
  109.   
  110.         private void LoadDefaultStocks()  
  111.         {  
  112.             _stocks.Clear();  
  113.   
  114.             var stocks = new List<Stock>  
  115.             {  
  116.                 new Stock { Symbol = "HDFC Bank", Price = 2049.35m },  
  117.                 new Stock { Symbol = "Bharti Airtel", Price = 377.55m },  
  118.                 new Stock { Symbol = "SBI", Price = 273.00m },  
  119.                 new Stock { Symbol = "Reliance", Price = 984.35m }  
  120.             };  
  121.   
  122.             stocks.ForEach(stock => _stocks.TryAdd(stock.Symbol, stock));  
  123.         }  
  124.   
  125.         private async void UpdateStockPrices(object state)  
  126.         {  
  127.             // This function must be re-entrant as it's running as a timer interval handler  
  128.             await _updateStockPricesLock.WaitAsync();  
  129.             try  
  130.             {  
  131.                 if (!_updatingStockPrices)  
  132.                 {  
  133.                     _updatingStockPrices = true;  
  134.   
  135.                     foreach (var stock in _stocks.Values)  
  136.                     {  
  137.                         TryUpdateStockPrice(stock);  
  138.   
  139.                         _subject.OnNext(stock);  
  140.                     }  
  141.   
  142.                     _updatingStockPrices = false;  
  143.                 }  
  144.             }  
  145.             finally  
  146.             {  
  147.                 _updateStockPricesLock.Release();  
  148.             }  
  149.         }  
  150.   
  151.         private bool TryUpdateStockPrice(Stock stock)  
  152.         {  
  153.             // Randomly choose whether to udpate this stock or not  
  154.             var r = _updateOrNotRandom.NextDouble();  
  155.             if (r > 0.1)  
  156.             {  
  157.                 return false;  
  158.             }  
  159.   
  160.             // Update the stock price by a random factor of the range percent  
  161.             var random = new Random((int)Math.Floor(stock.Price));  
  162.             var percentChange = random.NextDouble() * _rangePercent;  
  163.             var pos = random.NextDouble() > 0.51;  
  164.             var change = Math.Round(stock.Price * (decimal)percentChange, 2);  
  165.             change = pos ? change : -change;  
  166.   
  167.             stock.Price += change;  
  168.             return true;  
  169.         }  
  170.   
  171.         private async Task BroadcastMarketStateChange(MarketState marketState)  
  172.         {  
  173.             switch (marketState)  
  174.             {  
  175.                 case MarketState.Open:  
  176.                     await Hub.Clients.All.SendAsync("marketOpened");  
  177.                     break;  
  178.                 case MarketState.Closed:  
  179.                     await Hub.Clients.All.SendAsync("marketClosed");  
  180.                     break;  
  181.                 default:  
  182.                     break;  
  183.             }  
  184.         }  
  185.   
  186.         private async Task BroadcastMarketReset()  
  187.         {  
  188.             await Hub.Clients.All.SendAsync("marketReset");  
  189.         }  
  190.     }  
  191.   
  192.     public enum MarketState  
  193.     {  
  194.         Closed,  
  195.         Open  
  196.     }  

Creating Stock Angular Service

Now, I'm going to create stock signalR service for the Angular client to subscribe the stream and call any method. This service has three components.

  • Create Connection - take care about creating connection with hub
  • Register Server Events - registering on Events like receive message
  • Start the connection and subscribe for stream

Stock.signalR.service.ts

  1. import { EventEmitter, Injectable } from '@angular/core';  
  2. import { HubConnection,HubConnectionBuilder, IStreamResult } from '@aspnet/signalr'  
  3.   
  4.   
  5. @Injectable()  
  6. export class stockSignalRService {  
  7.   connectionEstablished = new EventEmitter<Boolean>();  
  8.   marketOpened = new EventEmitter<Boolean>();  
  9.   marketClosed = new EventEmitter<Boolean>();  
  10.   
  11.   private connectionIsEstablished = false;  
  12.   private _stockHubConnection: HubConnection;  
  13.   
  14.   
  15.   constructor() {  
  16.     this.createConnection();  
  17.     this.registerOnServerEvents();  
  18.     this.startConnection();  
  19.   }  
  20.   
  21.   private createConnection() {  
  22.     this._stockHubConnection = new HubConnectionBuilder()  
  23.       .withUrl('/stock')  
  24.       .build();  
  25.   }  
  26.   
  27.   private startConnection(): void {  
  28.     this._stockHubConnection  
  29.       .start()  
  30.       .then(() => {  
  31.         this.connectionIsEstablished = true;  
  32.         console.log('stock connection started');  
  33.         this.connectionEstablished.emit(true);  
  34.       }).catch(err => {  
  35.         setTimeout(this.startConnection(), 5000);  
  36.       });  
  37.   }  
  38.   
  39.   private registerOnServerEvents(): void {  
  40.     this._stockHubConnection.on("marketOpened", () => {  
  41.       console.log("marketOpened");  
  42.       this.marketOpened.emit(true);  
  43.     });  
  44.   
  45.     this._stockHubConnection.on("marketClosed",() => {  
  46.       console.log("marketClosed");  
  47.       this.marketClosed.emit(true);  
  48.     });  
  49.   }  
  50.   
  51.   public startStreaming(): IStreamResult<any> {  
  52.     return this._stockHubConnection.stream("StreamStocks");  
  53.   }  
  54.   
  55.   public getAllStocks(): Promise<any> {  
  56.     return this._stockHubConnection.invoke("getAllStocks");  
  57.   }  
  58.   
  59.   public openMarket() {  
  60.     this._stockHubConnection.invoke("OpenMarket");  
  61.   }  
  62.   
  63.   public CloseMarket() {  
  64.     this._stockHubConnection.invoke("CloseMarket");  
  65.   }  
  66.   
  67.   public ResetMarket() {  
  68.     this._stockHubConnection.invoke("Reset");  
  69.   }  
  70.     
  71. }  

After creating the service, we have to register with the provider to access in components. Either register for a global provider in app module or for specific one at self.

Stock Component

The Component takes care of rendring the stream data.

stock.component.ts Class

  1. import { Component } from "@angular/core";  
  2.   
  3. import { stockSignalRService } from "../services/stock.signalR.service";  
  4. import { forEach } from "@angular/router/src/utils/collection";  
  5.   
  6.   
  7. @Component({  
  8.   templateUrl: './stock.component.html',  
  9.   selector:"app-stock"  
  10. })  
  11.   
  12. export class StockComponent {  
  13.   
  14.   stocks = [];  
  15.   marketStatus: string;  
  16.   
  17.   constructor(private stockService: stockSignalRService) {  
  18.     this.stocks = [];  
  19.     this.marketStatus = 'closed';  
  20.     //subscribe for connection eastablish  
  21.     //fetch the stocks details  
  22.     stockService.connectionEstablished.subscribe(() => {  
  23.       stockService.getAllStocks().then((data) => {  
  24.         this.stocks = data;  
  25.       });  
  26.     });  
  27.   
  28.     //subscribe for market open  
  29.     stockService.marketOpened.subscribe(() => {  
  30.       this.marketStatus = 'open';  
  31.       this.startStrearming();  
  32.     });  
  33.   
  34.     //subscribe for market close  
  35.     stockService.marketClosed.subscribe(() => {  
  36.       this.marketStatus = 'closed';  
  37.     });  
  38.       
  39.   }  
  40.   
  41.   openMarketClicked() {  
  42.     this.stockService.openMarket();  
  43.   }  
  44.   
  45.   startStrearming() {  
  46.     this.stockService.startStreaming().subscribe({  
  47.       next: (data) => {  
  48.         this.displayStock(data);  
  49.       },  
  50.       error: function (err) {  
  51.         console.log('Error:' + err);  
  52.       },  
  53.       complete: function () {  
  54.         console.log('completed');  
  55.       }  
  56.     });  
  57.   }  
  58.   
  59.   closeMarketClicked() {  
  60.     this.stockService.CloseMarket();  
  61.   }  
  62.   
  63.   resetClicked() {  
  64.     this.stockService.ResetMarket();  
  65.   }  
  66.   
  67.   displayStock(stock) {  
  68.     console.log("stock updated:" + stock.symbol);  
  69.     for (let i in this.stocks) {  
  70.       //console.log(i);  
  71.       if (this.stocks[i].symbol == stock.symbol) {  
  72.         this.stocks[i] = stock;  
  73.       }  
  74.     }  
  75.   }  
  76.   
  77. }  

stock.component.html 

  1. <div>  
  2.   <br />  
  3.   <button name="openmarket" class="btn btn-primary" (click)="openMarketClicked()">Open Market</button>  
  4.   <button name="" class="btn btn-danger" (click)="closeMarketClicked()">Close Market</button>  
  5.   <button name="" class="btn btn-default" (click)="resetClicked()">Reset</button>  
  6.   <p>Stock Count : {{stocks.length}}</p>  
  7.   <p>Market Status : {{marketStatus}}</p>  
  8.   
  9.   <table class="table table-response table-bordered">  
  10.     <thead>  
  11.     <th>Symbol</th>  
  12.     <th>Price</th>  
  13.     <th>Change %</th>  
  14.     </thead>  
  15.     <tr *ngFor="let stock of stocks">  
  16.       <td>{{stock.symbol}}</td>  
  17.       <td>{{stock.price}}</td>  
  18.       <td>{{stock.percentChange}} {{stock.percentChange >= 0 ? '▲' : '▼'}}</td>  
  19.     </tr>  
  20.   </table>  
  21. </div>  

Finally, we are finished with the streaming demo with SignalR using ASPNET Core and Angular 5.

Summary

In this article, we have seen the working demo of Streaming data with SignalR using ASP.NET Core and Angular 5. We have learned the following -

  • EventEmmiter: to emit the custom event. Read more here
  • HubConnection and HubConnectionBuilder : to represent SignalR hub connection and a builder for configuring HubConnection instances.
  • C# Channels : consumer / Producers based high performant channels.

References