Using WebSocket To Build Real-Time Application Via ASP.NET Core

Introduction

 
WebSocket is a protocol providing full-duplex communication channels over a single TCP connection that makes more interaction between a browser and a web server possible, facilitating the real-time data transfer from and to the server.
 
In this article, I will show you a sample that uses WebSocket to build a real-time application about distributing tasks.
 
Suppose we have a task center, there are some tasks will be published to the task center, and when the center receives some tasks, it should distribute the tasks to workers.
 
Let's take a look at the result at first.
 
Using WebSocket To Build Real-Time Application Via ASP.NET Core
 
There are four clients connected to the WebSocket Server, and after publishing messages via RabbitMQ management, the messages will be distributed to different clients in time.
 
Note
Clients means the workers, WebSocket Server means the task center and RabbitMQ management simulates sending task to the task center, and the messages means the tasks that should be handled.
 
And here is the architecture diagram that shows how it works.
 
Using WebSocket To Build Real-Time Application Via ASP.NET Core
 
Let's take a look on how to accomplish it.
 

Setup RabbitMQ

 
Before writing some code, we should run up the RabbitMQ server at first. The fastest way is to use Docker.
  1. docker run -p 5672:5672 -p 15672:15672 rabbitmq:management  

WebSocket Server

 
This is the most important section!
 
At first, we should configure the WebSocket in Startup class.
  1. public class Startup  
  2. {  
  3.     // other ...  
  4.       
  5.     public void ConfigureServices(IServiceCollection services)  
  6.     {  
  7.         services.AddSingleton<Handlers.IDisHandler, Handlers.DisHandler>();  
  8.         services.AddControllers();  
  9.     }  
  10.   
  11.     public void Configure(IApplicationBuilder app, IWebHostEnvironment env)  
  12.     {  
  13.         // other ..  
  14.       
  15.         var webSocketOptions = new WebSocketOptions()  
  16.         {  
  17.             KeepAliveInterval = TimeSpan.FromSeconds(120),  
  18.             ReceiveBufferSize = 4 * 1024  
  19.         };  
  20.   
  21.         app.UseWebSockets(webSocketOptions);  
  22.   
  23.         app.Use(async (context, next) =>  
  24.         {  
  25.             // ws://www.yourdomian.com/push  
  26.             if (context.Request.Path == "/push")  
  27.             {  
  28.                 if (context.WebSockets.IsWebSocketRequest)  
  29.                 {  
  30.                     WebSocket webSocket = await context.WebSockets.AcceptWebSocketAsync();  
  31.   
  32.                     try  
  33.                     {  
  34.                         var handler = app.ApplicationServices.GetRequiredService<Handlers.IDisHandler>();  
  35.                         await handler.PushAsync(context, webSocket);  
  36.                     }  
  37.                     catch (Exception ex)  
  38.                     {  
  39.                         Console.WriteLine(ex.Message);  
  40.                     }                         
  41.                 }  
  42.                 else  
  43.                 {  
  44.                     context.Response.StatusCode = 400;  
  45.                 }  
  46.             }  
  47.             else  
  48.             {  
  49.                 await next();  
  50.             }  
  51.         });  
  52.     }  
  53. }  
Next step is to handle the logic about how to push messages to clients. Here create a class named DisHandler to do it.
 
Creating RabbitMQ connection, channel and consumer on the constructor, and the important part of the Received event of consumer.
 
Let's take a look at the Received event of consumer.
  1. consumer.Received += async (ch, ea) =>  
  2. {  
  3.     var content = Encoding.UTF8.GetString(ea.Body);  
  4.     Console.WriteLine($"received content = {content}");  
  5.   
  6.     // {"TaskName":"Demo", "TaskType":1}  
  7.     var msg = Newtonsoft.Json.JsonConvert.DeserializeObject<MqMsg>(content);  
  8.   
  9.     var workIds = Worker.GetByTaskType(msg.TaskType);  
  10.     var onlineWorkerIds = _sockets.Keys.Intersect(workIds).ToList();                  
  11.     if (onlineWorkerIds == null || !onlineWorkerIds.Any())  
  12.     {  
  13.         if (!ea.Redelivered)  
  14.         {  
  15.             Console.WriteLine("No online worker, reject the message and requeue");  
  16.             // should requeue here  
  17.             _channel.BasicReject(ea.DeliveryTag, true);  
  18.         }  
  19.         else  
  20.         {       
  21.             // should not requeue here, but this message will be discarded  
  22.             _channel.BasicReject(ea.DeliveryTag, false);  
  23.         }  
  24.     }  
  25.     else  
  26.     {  
  27.         // free or busy  
  28.         var randomNumberBuffer = new byte[10];  
  29.         new RNGCryptoServiceProvider().GetBytes(randomNumberBuffer);  
  30.         var rd = new Random(BitConverter.ToInt32(randomNumberBuffer, 0));                                          
  31.         var index = rd.Next(0, 9999) % onlineWorkerIds.Count;  
  32.         var workerId = onlineWorkerIds[index];  
  33.   
  34.         if (_sockets.TryGetValue(workerId, out var ws) && ws.State == WebSocketState.Open)  
  35.         {  
  36.             // simulating handle the message an get the result.  
  37.             // put your own logic here  
  38.             var val = msg.TaskName;  
  39.             if (msg.TaskType != 1) val = $"Special-{msg.TaskName}";  
  40.   
  41.             var task = Encoding.UTF8.GetBytes(val);  
  42.   
  43.             Console.WriteLine($"send to {workerId}-{val}");  
  44.   
  45.             // should ack here? or when to ack is better?  
  46.             _channel.BasicAck(ea.DeliveryTag, false);  
  47.   
  48.             // sending message to specify client  
  49.             await ws.SendAsync(  
  50.             new ArraySegment<byte>(task, 0, task.Length),   
  51.             WebSocketMessageType.Text,   
  52.             true,   
  53.             CancellationToken.None);  
  54.         }  
  55.         else  
  56.         {  
  57.             Console.WriteLine("Not found a worker");  
  58.         }  
  59.     }  
  60. };  
When we receive  a message, we should select a worker that can handle this task.
 
If there are no online workers, the server should reject the message and make it re-queue one more time.
 
If there are some online workers, the server will select a worker that can handle this task, here use a random number to simulate this scenario.
 
We also should ensure the connection of this worker is still open, so that the server can send messages to it.
 
The entry of handling WebSocket request is PushAsync method, it just maintains connections of clients, when client sends its client id to the server, it will record them, and when the client disconnects from the server, it will remove the client.
  1. public async Task PushAsync(HttpContext context, WebSocket webSocket)  
  2. {  
  3.     var buffer = new byte[1024 * 4];  
  4.     WebSocketReceiveResult result =   
  5.         await webSocket.ReceiveAsync(new ArraySegment<byte>(buffer), CancellationToken.None);  
  6.     string clientId = Encoding.UTF8.GetString(buffer, 0, result.Count);  
  7.   
  8.     // record the client id and it's websocket instance  
  9.     if (_sockets.TryGetValue(clientId, out var wsi))  
  10.     {                  
  11.         if (wsi.State == WebSocketState.Open)  
  12.         {  
  13.             Console.WriteLine($"abort the before clientId named {clientId}");  
  14.             await wsi.CloseAsync(WebSocketCloseStatus.InternalServerError,   
  15.                 "A new client with same id was connected!",   
  16.                 CancellationToken.None);   
  17.         }  
  18.   
  19.         _sockets.AddOrUpdate(clientId, webSocket, (x, y) => webSocket);  
  20.     }  
  21.     else  
  22.     {  
  23.         Console.WriteLine($"add or update {clientId}");  
  24.         _sockets.AddOrUpdate(clientId, webSocket, (x, y) => webSocket);  
  25.     }  
  26.   
  27.     while (!result.CloseStatus.HasValue)  
  28.     {  
  29.         result = await webSocket.ReceiveAsync(new ArraySegment<byte>(buffer), CancellationToken.None);  
  30.     }              
  31.   
  32.     await webSocket.CloseAsync(result.CloseStatus.Value, result.CloseStatusDescription, CancellationToken.None);  
  33.     Console.WriteLine("close=" + clientId);  
  34.   
  35.     _sockets.TryRemove(clientId, out _);  
  36. }  

Client

 
The code of client just uses the sample in the document of ASP.NET Core WebSocket.
 
Here is the source code you can find in my GitHub page.

Summary

 
This article showed you a sample that uses WebSocket to build a real-time application via ASP.NET Core.
 
I hope this will help you!