Asynchronous RPC Communication Using RabbitMQ

If you have been a developer for at least a couple of years, chances are high that you have heard of message brokers. Message brokers are software applications that enable services and systems to communicate with each other and exchange information reliably. RabbitMQ is one of the most widely used message brokers.

In this article, we will use RabbitMQ to implement an RPC (remote procedure call). The architecture revolves around a shared request queue and a response queue (each client has its own response queue). We will use the CorrelationId to match each response to the individual call that produced it. The idea is illustrated in the diagram below.

Figure: RPC Communication with Message Broker

To illustrate the idea, we will develop a client and a server. Their responsibilities can be summarized as follows.

  • Client
    Sends a number to the server indicating how many usernames to return, and displays the list of usernames (plain strings) received from the server.
     
  • Server
    Listens for requests from different clients. Generates and returns a collection of strings denoting usernames, based on the count the client indicated in the request message.

The Server

We will begin by creating the server. The server receives a message on the request queue. The message also carries meta-information that tells the server which response queue to use. Each request contains:

  • The actual message
  • Response Queue Name
  • Correlation Id (uniquely identifies each request)

Let us begin by defining and initializing our queue and listener.

// Rpc Client on Server
// Requires: using RabbitMQ.Client; using RabbitMQ.Client.Events;
internal class RpcClient : IDisposable {
    private const string QueueName = "UserRpcQueue";
    private IConnection _connection;
    private IModel _channel;
    private bool _isDisposed;
    public void InitializeAndRun() {
        var factory = new ConnectionFactory {
            HostName = "localhost",
        };
        _connection = factory.CreateConnection();
        _channel = _connection.CreateModel();
        // Declare the shared request queue that all clients publish to.
        _channel.QueueDeclare(queue: QueueName, durable: false, exclusive: false, autoDelete: false, arguments: null);
        // Deliver at most one unacknowledged message to this consumer at a time.
        _channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
        var consumer = new EventingBasicConsumer(_channel);
        // Attach the handler before consuming so that no delivery is missed.
        consumer.Received += Consumer_Received; // we will come to this in a bit
        _channel.BasicConsume(queue: QueueName, autoAck: false, consumer: consumer);
    }
}

In the above code, we declare a queue named "UserRpcQueue" and start consuming from it. The BasicQos call with a prefetch count of 1 asks the broker to hand this consumer one message at a time. Note that we have set autoAck to false. This is important because we want to send the acknowledgment only after executing the method and sending the response back.

Let us now expand the Consumer_Received method and decide what we need to do with each incoming request and how we respond to it. Remember, we will need to send the acknowledgment for the incoming request.

private void Consumer_Received(object? sender, BasicDeliverEventArgs ea) {
    LogMessage?.Invoke("Received Request from client..");
    var body = ea.Body.ToArray();
    var message = Encoding.UTF8.GetString(body);
    // Copy the CorrelationId into the reply so the client can match it to its request.
    var replyProps = _channel.CreateBasicProperties();
    replyProps.CorrelationId = ea.BasicProperties.CorrelationId;
    replyProps.ReplyTo = ea.BasicProperties.ReplyTo;
    var value = int.Parse(message);
    LogMessage?.Invoke($"Preparing to generate {value} User Names");
    var userNames = GenerateUserNames(value); // the Worker method, which needs to be executed by the Server.
    foreach (var user in userNames) {
        LogMessage?.Invoke(user);
    }
    var response = JsonSerializer.Serialize<IEnumerable<string>>(userNames);
    var responseBody = Encoding.UTF8.GetBytes(response);
    // Publish through the default exchange, routed straight to the client's reply queue.
    _channel.BasicPublish(exchange: "", routingKey: replyProps.ReplyTo, basicProperties: replyProps, body: responseBody);
    // Acknowledge the request only after the response has been published.
    _channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
}

What the server does is actually pretty simple (yes, the real trick is in the client). It receives a request from one of the clients on a specific queue, in this case named "UserRpcQueue". It then reads the message properties to retrieve the queue to use for the response and the correlation id, which the client uses to uniquely identify each request (and its subsequent response).

Once it has this information, it processes the actual message and executes the required operation. In this case, that is the generation of a few strings (usernames), where the number of strings to generate is the message sent by the client.

The result of the operation is then sent back to the client through the reply queue whose name the client supplied. The server also copies the CorrelationId it received in the request into the response. The client uses the correlation id to match the response to the individual request (we will get to that in a moment). At this point, the server acknowledges the request to the broker, confirming that it has been received and processed.
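The broker-independent part of that flow (decode the request, generate the names, encode the response) can be sketched on its own. This is a minimal sketch and not the article's source: the class name UserNameRequestHandler, the MaxCount limit, and the choice to answer invalid input with an empty list are all assumptions made for illustration. It uses int.TryParse so that a malformed request does not throw inside the consumer callback.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Text.Json;

// Hypothetical helper (not in the original sample): the part of the server's
// work that does not touch RabbitMQ, from request bytes to response bytes.
public static class UserNameRequestHandler
{
    // Assumption: an upper bound on the requested count; the original has none.
    private const int MaxCount = 1000;

    public static byte[] Handle(byte[] requestBody)
    {
        var text = Encoding.UTF8.GetString(requestBody);

        // int.TryParse returns false instead of throwing on malformed input.
        // Assumption: invalid or out-of-range requests produce an empty list.
        var isValid = int.TryParse(text, out var count) && count >= 0 && count <= MaxCount;

        IEnumerable<string> userNames = isValid
            ? Enumerable.Range(1, count).Select(x => $"UserName {x}")
            : Enumerable.Empty<string>();

        // Same wire format as the article: a JSON array of strings, UTF-8 encoded.
        var json = JsonSerializer.Serialize(userNames);
        return Encoding.UTF8.GetBytes(json);
    }
}
```

With a helper like this, the consumer callback would only need to pass ea.Body.ToArray() in, publish the returned bytes, and acknowledge the delivery.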

The full source code for the RpcClient on the server is as follows.

// Rpc Client on Server
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Text.Json;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

internal class RpcClient : IDisposable {
    private const string QueueName = "UserRpcQueue";
    private IConnection _connection;
    private IModel _channel;
    private bool _isDisposed;
    public void InitializeAndRun() {
        var factory = new ConnectionFactory {
            HostName = "localhost",
        };
        _connection = factory.CreateConnection();
        _channel = _connection.CreateModel();
        _channel.QueueDeclare(queue: QueueName, durable: false, exclusive: false, autoDelete: false, arguments: null);
        _channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
        var consumer = new EventingBasicConsumer(_channel);
        // Attach the handler before consuming so that no delivery is missed.
        consumer.Received += Consumer_Received;
        _channel.BasicConsume(queue: QueueName, autoAck: false, consumer: consumer);
    }
    private void Consumer_Received(object? sender, BasicDeliverEventArgs ea) {
        LogMessage?.Invoke("Received Request from client..");
        var body = ea.Body.ToArray();
        var message = Encoding.UTF8.GetString(body);
        var replyProps = _channel.CreateBasicProperties();
        replyProps.CorrelationId = ea.BasicProperties.CorrelationId;
        replyProps.ReplyTo = ea.BasicProperties.ReplyTo;
        var value = int.Parse(message);
        LogMessage?.Invoke($"Preparing to generate {value} User Names");
        var userNames = GenerateUserNames(value);
        foreach (var user in userNames) {
            LogMessage?.Invoke(user);
        }
        var response = JsonSerializer.Serialize<IEnumerable<string>>(userNames);
        var responseBody = Encoding.UTF8.GetBytes(response);
        _channel.BasicPublish(exchange: "", routingKey: replyProps.ReplyTo, basicProperties: replyProps, body: responseBody);
        // Acknowledge the request only after the response has been published.
        _channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
    }
    public IEnumerable<string> GenerateUserNames(int count) {
        return Enumerable.Range(1, count).Select(x => $"UserName {x}");
    }
    public delegate void LogMessageHandler(string message);
    public event LogMessageHandler LogMessage;
    private void Dispose(bool disposing) {
        if (_isDisposed) return;
        if (disposing) {
            // Close the channel first, then the connection that owns it.
            _channel?.Close();
            _connection?.Close();
        }
        _isDisposed = true;
    }
    public void Dispose() {
        Dispose(true);
        GC.SuppressFinalize(this);
    }
    ~RpcClient() {
        Dispose(false);
    }
}

The Client

At this point, we are ready to set up our client. The client should be able to send a request message, which is a number indicating how many usernames the server needs to send back. As we saw in the server code, the client also sends some additional information. We will now see why the client sends that information and how it uses it to process the response from the server.

The first step, of course, is to initialize the queues we require. Requests are sent on a common queue named "UserRpcQueue". The response queue, however, is dedicated to each client: every client maintains its own uniquely named response queue.

// Rpc Client on Client
// Requires: using RabbitMQ.Client; using RabbitMQ.Client.Events;
internal class RpcClient : IDisposable {
    private IConnection _connection;
    private IModel _channel;
    private string _responseQueueName;
    private const string QueueName = "UserRpcQueue";
    private bool _isDisposed;
    public void Initialiaze() {
        var factory = new ConnectionFactory {
            HostName = "localhost",
        };
        _connection = factory.CreateConnection();
        _channel = _connection.CreateModel();
        // QueueDeclare() without arguments creates a queue with a broker-generated name.
        _responseQueueName = _channel.QueueDeclare().QueueName;
        var consumer = new EventingBasicConsumer(_channel);
        consumer.Received += Consumer_Received;
        // Listen for responses on this client's own queue.
        _channel.BasicConsume(queue: _responseQueueName, consumer: consumer, autoAck: true);
    }
}

As you can see in the code above, we set up the connection and the channel, and then create a queue with a unique, broker-generated name, stored in _responseQueueName. This queue is used for receiving responses from the server for this particular instance of the client, and the consumer we register listens on it. Requests, in contrast, are published to the shared "UserRpcQueue". Before we add the code for the Received handler (in this case Consumer_Received), let us add the method to send the message.

private ConcurrentDictionary<string, TaskCompletionSource<string>> _activeTaskQueue = new ConcurrentDictionary<string, TaskCompletionSource<string>>();
public Task<string> SendAsync(string message) {
    var basicProperties = _channel.CreateBasicProperties();
    basicProperties.ReplyTo = _responseQueueName;
    var messageId = Guid.NewGuid().ToString();
    basicProperties.CorrelationId = messageId;
    // Run continuations asynchronously so awaiting code does not execute inside the consumer callback.
    var taskCompletionSource = new TaskCompletionSource<string>(TaskCreationOptions.RunContinuationsAsynchronously);
    var messageToSend = Encoding.UTF8.GetBytes(message);
    // Register the pending request before publishing, so that even a very fast response finds its entry.
    _activeTaskQueue.TryAdd(messageId, taskCompletionSource);
    _channel.BasicPublish(exchange: "", routingKey: QueueName, basicProperties: basicProperties, body: messageToSend);
    return taskCompletionSource.Task;
}

The SendAsync method is used to send the message to the server. However, it does a bit more than that. First, it attaches some meta-information about the client to the message properties (not to the message body itself). This includes the unique response queue name and a Guid, passed as the CorrelationId.

For every request it publishes to the request queue, the method also adds an entry to a dictionary. This dictionary, _activeTaskQueue, tracks all the active requests that the client has sent to the server. The entry is added just before publishing, so that the response can never arrive before its entry exists. We want the client to be able to send multiple requests to the server without having to wait for each response from the server, while still associating each request with its own response. This is where the correlation id comes in handy. The unique Guid we created and passed as the CorrelationId is the key under which the entry is stored in the dictionary. Once we receive the response, we use the associated TaskCompletionSource instance to set the result and complete the task.
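The bookkeeping described above can be tried without a broker. The following is a minimal sketch, assuming a hypothetical PendingRequests class (not part of the article's source) that holds the same ConcurrentDictionary of TaskCompletionSource instances; Register plays the role of the send side and Complete the role of the reply handler.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

// Hypothetical class for illustration: the correlation bookkeeping shared by
// the sending method and the reply handler, with no RabbitMQ involved.
public sealed class PendingRequests
{
    private readonly ConcurrentDictionary<string, TaskCompletionSource<string>> _pending =
        new ConcurrentDictionary<string, TaskCompletionSource<string>>();

    // Send side: creates a correlation id and the task that completes
    // when the reply carrying that id arrives.
    public (string CorrelationId, Task<string> Reply) Register()
    {
        var correlationId = Guid.NewGuid().ToString();
        var tcs = new TaskCompletionSource<string>(TaskCreationOptions.RunContinuationsAsynchronously);
        _pending.TryAdd(correlationId, tcs);
        return (correlationId, tcs.Task);
    }

    // Reply side: completes the matching task. Returns false when the
    // correlation id is unknown or its reply was already delivered.
    public bool Complete(string correlationId, string payload)
    {
        return _pending.TryRemove(correlationId, out var tcs) && tcs.TrySetResult(payload);
    }
}
```

If two requests are registered and the reply to the second one is completed first, only the second task finishes; the first stays pending until its own correlation id shows up. That is the property that lets a client keep several requests in flight at once.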

How do we do it? Let us add the Consumer_Received handler which we had skipped earlier.

private void Consumer_Received(object? sender, BasicDeliverEventArgs args) {
    var body = args.Body.ToArray();
    var message = Encoding.UTF8.GetString(body);
    // Look up the pending request by CorrelationId and complete its task.
    if (_activeTaskQueue.TryRemove(args.BasicProperties.CorrelationId, out var taskCompletionSource)) {
        taskCompletionSource.TrySetResult(message);
    }
}

After receiving the response, we remove the associated entry from the active task dictionary and set the result using the TaskCompletionSource instance. A response whose CorrelationId is not in the dictionary is simply ignored.
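One consequence of this design is that a task completes only when a reply arrives; if the server never answers, the task stays pending and its entry stays in the dictionary. The sample does not handle this case. A minimal sketch of one way to bound the wait follows. The ReplyTimeout class and its parameters are hypothetical, and using it would require the sending method to expose the correlation id alongside the task, which the sample's SendAsync does not do.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

// Hypothetical helper, not part of the sample: waits for a reply task,
// but gives up and forgets the pending entry after a timeout.
public static class ReplyTimeout
{
    public static async Task<string> WaitForReplyAsync(
        ConcurrentDictionary<string, TaskCompletionSource<string>> pending,
        string correlationId,
        Task<string> reply,
        TimeSpan timeout)
    {
        // Whichever finishes first wins: the reply or the delay.
        var finished = await Task.WhenAny(reply, Task.Delay(timeout));
        if (finished == reply)
        {
            return await reply;
        }

        // Drop the entry so a late reply is ignored and the dictionary does not grow.
        pending.TryRemove(correlationId, out _);
        throw new TimeoutException($"No reply for request {correlationId} within {timeout}.");
    }
}
```

The sketch keeps the Task.Delay timer alive until it elapses even when the reply arrives early, which is acceptable for illustration; a production version would likely cancel it.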

Let us look at the entire code of the RpcClient for Client.

using System;
using System.Collections.Concurrent;
using System.Text;
using System.Threading.Tasks;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

internal class RpcClient : IDisposable {
    private IConnection _connection;
    private IModel _channel;
    private string _responseQueueName;
    private const string QueueName = "UserRpcQueue";
    private bool _isDisposed;
    private ConcurrentDictionary<string, TaskCompletionSource<string>> _activeTaskQueue = new ConcurrentDictionary<string, TaskCompletionSource<string>>();
    public void Initialiaze() {
        var factory = new ConnectionFactory {
            HostName = "localhost",
        };
        _connection = factory.CreateConnection();
        _channel = _connection.CreateModel();
        _responseQueueName = _channel.QueueDeclare().QueueName;
        var consumer = new EventingBasicConsumer(_channel);
        consumer.Received += Consumer_Received;
        _channel.BasicConsume(queue: _responseQueueName, consumer: consumer, autoAck: true);
    }
    private void Consumer_Received(object? sender, BasicDeliverEventArgs args) {
        var body = args.Body.ToArray();
        var message = Encoding.UTF8.GetString(body);
        if (_activeTaskQueue.TryRemove(args.BasicProperties.CorrelationId, out var taskCompletionSource)) {
            taskCompletionSource.TrySetResult(message);
        }
    }
    public Task<string> SendAsync(string message) {
        var basicProperties = _channel.CreateBasicProperties();
        basicProperties.ReplyTo = _responseQueueName;
        var messageId = Guid.NewGuid().ToString();
        basicProperties.CorrelationId = messageId;
        var taskCompletionSource = new TaskCompletionSource<string>(TaskCreationOptions.RunContinuationsAsynchronously);
        var messageToSend = Encoding.UTF8.GetBytes(message);
        // Register the pending request before publishing, so that even a very fast response finds its entry.
        _activeTaskQueue.TryAdd(messageId, taskCompletionSource);
        _channel.BasicPublish(exchange: "", routingKey: QueueName, basicProperties: basicProperties, body: messageToSend);
        return taskCompletionSource.Task;
    }
    private void Dispose(bool disposing) {
        if (_isDisposed) return;
        if (disposing) {
            // Close the channel first, then the connection that owns it.
            _channel?.Close();
            _connection?.Close();
        }
        _isDisposed = true;
    }
    public void Dispose() {
        Dispose(true);
        GC.SuppressFinalize(this);
    }
    ~RpcClient() {
        Dispose(false);
    }
}

You can now use the RpcClient as follows.

public async Task ExecuteFetchCommand()
{
    LogMessages.Add($"Generating {Count} UserNames");
    // The request body is simply the count, sent as text.
    var response = await _rpcClient.SendAsync(Count.ToString());
    // DeserializeStreaming is a helper that is not shown in this article.
    await foreach (var userName in DeserializeStreaming<string>(response))
    {
        LogMessages.Add(userName);
    }
}
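Since the server replies with a JSON array of strings, the response can also be turned back into a list in a single call. The sketch below is one simple alternative to the streaming helper used above, not the article's implementation; the UserNameResponse class and its Parse method are hypothetical names.

```csharp
using System;
using System.Collections.Generic;
using System.Text.Json;

// Hypothetical helper for illustration: parses the server's JSON reply
// (an array of strings) into a list in one step.
public static class UserNameResponse
{
    public static IReadOnlyList<string> Parse(string responseJson)
    {
        // Deserialize returns null for the JSON literal "null";
        // fall back to an empty list in that case.
        return JsonSerializer.Deserialize<List<string>>(responseJson) ?? new List<string>();
    }
}
```

A caller could then loop over UserNameResponse.Parse(response) with an ordinary foreach and add each name to the log.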

So that was pretty easy to implement, right? That is the strength of RabbitMQ: it is simple and easy to use, and you can still do a lot with it. Please note that although we have used a client-server architecture to illustrate our example, real-life usage does not have to look like that. It could be any two services that want to query each other in an RPC fashion.

If you are interested in the complete source code discussed in this article, it has been included with the article. Do note that the sample applications are built as WPF applications.

