Getting Started With MassTransit And RabbitMQ

In this article, we explore the MassTransit library and use it to encapsulate RabbitMQ operations. MassTransit provides a layer of abstraction over commonly used message broker services.

What is a message broker?

Message brokers are software components or services that allow sharing of information between applications. They do this by translating messages using formal messaging protocols. Message brokers allow applications written in different languages and platforms to communicate with one another. Additionally, message brokers can store, validate and route messages to different clients.

The key components of a message broker are:

  • Producer: The source application that sends the message.
  • Consumer: The client application that receives the message.
  • Queue: The data structure used to store incoming messages.
  • Exchange: The routing agent that directs messages to different queues according to its bindings.
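To make the four roles concrete, here is a minimal in-memory sketch in C#. It is a toy model only and says nothing about how RabbitMQ is implemented; the `DirectExchange` class and the routing keys are hypothetical names chosen for illustration.

```csharp
using System;
using System.Collections.Generic;

// Toy in-memory model of the four roles; NOT how RabbitMQ works internally.
var exchange = new DirectExchange();
var orders = new Queue<string>();
var audit = new Queue<string>();

// Bindings tell the exchange which queues receive which routing key.
exchange.Bind("order.created", orders);
exchange.Bind("order.created", audit);
exchange.Bind("order.cancelled", audit);

// Producer: publishes to the exchange, never directly to a queue.
exchange.Publish("order.created", "order 42 created");
exchange.Publish("order.cancelled", "order 7 cancelled");

// Consumer: drains one queue at its own pace.
while (orders.Count > 0)
    Console.WriteLine($"orders consumer got: {orders.Dequeue()}");

// The audit queue keeps its messages until some consumer reads them.
Console.WriteLine($"audit queue still holds {audit.Count} message(s)");

// Hypothetical helper, loosely modeled on the idea of a "direct" exchange.
class DirectExchange
{
    private readonly Dictionary<string, List<Queue<string>>> _bindings = new();

    public void Bind(string routingKey, Queue<string> queue)
    {
        if (!_bindings.TryGetValue(routingKey, out var queues))
            _bindings[routingKey] = queues = new List<Queue<string>>();
        queues.Add(queue);
    }

    public void Publish(string routingKey, string message)
    {
        // In this sketch, messages without a matching binding are dropped.
        if (!_bindings.TryGetValue(routingKey, out var queues)) return;
        foreach (var queue in queues) queue.Enqueue(message);
    }
}
```

The point of the sketch is that the producer only knows the exchange and a routing key, while the bindings decide which queues, and therefore which consumers, see each message.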

Why RabbitMQ?

RabbitMQ is one of the most widely used message broker services. It is characterized by high availability and scalability. RabbitMQ is lightweight and supports several messaging protocols.

It is easy to integrate into applications for transmitting, receiving, and storing messages. With durable queues and persistent messages, it helps ensure that producer messages are not lost.

Why MassTransit?

The primary goal of MassTransit is to provide a consistent, friendly abstraction over commonly used message transports such as RabbitMQ, Azure Service Bus, ActiveMQ, and Amazon SQS. This is accomplished with an easy-to-use API that focuses on solving business problems.

MassTransit is asynchronous and leverages the Task Parallel Library (TPL) to consume messages concurrently. It also handles connection management, exception handling, and retries. For more on the advantages of using MassTransit over working directly with a transport like RabbitMQ, see the official MassTransit documentation.

Setting up RabbitMq (with Docker)

We will use Docker containers for RabbitMq for the demo. For the first step, we will get the container running.

$ docker run -d --hostname demo-rabbit --name demo-rabbit -e RABBITMQ_DEFAULT_USER=user -e RABBITMQ_DEFAULT_PASS=password -p 5672:5672 -p 15672:15672 rabbitmq:3-management

Producer Code Sample
 

Initialize Producer to use RabbitMq

The first step is to create our demo producer application. Let us create a sample Web API and install the required NuGet packages for MassTransit.

Install-Package MassTransit
Install-Package MassTransit.AspNetCore
Install-Package MassTransit.RabbitMQ

We will also add our configuration details to the appsettings.json.

"RabbitMqSettings": {
    "Uri": "rabbitmq://localhost",
    "UserName": "user",
    "Password": "password"
},

The section binds to a matching settings class:

public class RabbitMqSettings
{
    public string Uri { get; set; } = null!;
    public string UserName { get; set; } = null!;
    public string Password { get; set; } = null!;
}

Let us now start configuring MassTransit to use our desired RabbitMq installation for transport.

var rabbitMqSettings = builder.Configuration.GetSection(nameof(RabbitMqSettings)).Get<RabbitMqSettings>();
// Register MassTransit once and select RabbitMQ as the transport
builder.Services.AddMassTransit(mt => {
    mt.UsingRabbitMq((cntxt, cfg) => {
        cfg.Host(rabbitMqSettings.Uri, "/", c => {
            c.Username(rabbitMqSettings.UserName);
            c.Password(rabbitMqSettings.Password);
        });
    });
});

The above code adds RabbitMQ as the transport layer for MassTransit. It also reads the RabbitMQ configuration from the appsettings.json section that we set earlier.

Sending Message

We will now declare the message format to be sent across. Create a Class Library project and add the following record to it. This library is shared between the producer and the consumer so that both use the same message type definition.

public record CommandMessage(long Id, string MessageString);

Let us now create an endpoint in our producer Web API that publishes the message for the consumer.

app.MapPost("/sendmessage", async (long id, string message, IPublishEndpoint publishEndPoint) => {
    // Await the publish so that failures surface in the request
    await publishEndPoint.Publish(new CommandMessage(id, message));
});
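For orientation, the producer pieces above might fit together in a single Program.cs roughly as follows. This is a sketch under assumptions: a .NET 6+ minimal API project with implicit usings enabled, the `RabbitMqSettings` class in the same project, and the shared library containing `CommandMessage` referenced by the project. The null check, the `CancellationToken` parameter, and the `Results.Accepted()` response are additions for illustration and are not part of the original sample.

```csharp
using MassTransit;

var builder = WebApplication.CreateBuilder(args);

// Bind the "RabbitMqSettings" section; fail fast if it is missing (added check).
var rabbitMqSettings = builder.Configuration
    .GetSection(nameof(RabbitMqSettings))
    .Get<RabbitMqSettings>()
    ?? throw new InvalidOperationException("RabbitMqSettings section is missing.");

builder.Services.AddMassTransit(mt =>
{
    mt.UsingRabbitMq((context, cfg) =>
    {
        cfg.Host(rabbitMqSettings.Uri, "/", h =>
        {
            h.Username(rabbitMqSettings.UserName);
            h.Password(rabbitMqSettings.Password);
        });
    });
});

var app = builder.Build();

// id and message are bound from the query string,
// e.g. POST /sendmessage?id=1&message=Hello
app.MapPost("/sendmessage", async (long id, string message,
    IPublishEndpoint publishEndpoint, CancellationToken cancellationToken) =>
{
    // CommandMessage comes from the shared class library described above.
    await publishEndpoint.Publish(new CommandMessage(id, message), cancellationToken);
    return Results.Accepted();
});

app.Run();
```

Returning 202 Accepted is one reasonable choice here because the request only hands the message to the broker; the consumer processes it later.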

Consumer Code Sample

With the producer code in place, it is time to concentrate on the consumer application. The consumer application would read any messages from the designated queue and process them depending on the message types. Each message type that needs to be processed by the consumer application would require an implementation of IConsumer<TMessageType>. This would trigger the required actions that need to happen when a message of the particular type arrives.

The IConsumer<TMessageType> is defined as:

public interface IConsumer<in TMessage> : IConsumer
    where TMessage : class
{
    Task Consume(ConsumeContext<TMessage> context);
}

public interface IConsumer { }

As you can see, the interface exposes only one method: Consume. The Consume method receives the message wrapped in a ConsumeContext<TMessage>. This gives it access to the incoming message as well as the details surrounding it, including the headers.
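As a rough illustration of what the context exposes beyond the payload, the following sketch prints some of the message metadata. The class name `InspectingConsumer` is hypothetical, and the sketch assumes the shared `CommandMessage` record is referenced by the project; which headers are actually present depends on the transport and on the sender.

```csharp
using System;
using System.Threading.Tasks;
using MassTransit;

// Hypothetical consumer that only inspects the envelope around the message.
public class InspectingConsumer : IConsumer<CommandMessage>
{
    public Task Consume(ConsumeContext<CommandMessage> context)
    {
        // The payload itself.
        Console.WriteLine($"Payload: {context.Message.Id} / {context.Message.MessageString}");

        // Metadata carried alongside the payload; some values may be null.
        Console.WriteLine($"MessageId: {context.MessageId}");
        Console.WriteLine($"Sent at: {context.SentTime}");
        Console.WriteLine($"Source address: {context.SourceAddress}");

        // Transport headers, if any were set by the sender.
        foreach (var header in context.Headers.GetAll())
            Console.WriteLine($"Header {header.Key} = {header.Value}");

        return Task.CompletedTask;
    }
}
```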

Let us begin by writing our handler for the message type CommandMessage.

public class CommandMessageConsumer : IConsumer<CommandMessage> {
    public async Task Consume(ConsumeContext<CommandMessage> context) {
        var message = context.Message;
        await Console.Out.WriteLineAsync($"Message from Producer : {message.MessageString}");
        // Do something useful with the message
    }
}

As you can see from the code above, for demo purposes we are not doing anything useful with the incoming message. This keeps the example simple, but the code could perform any business logic required by the application.

Now that we have the CommandMessageConsumer ready, we need to register it with the MassTransit pipeline. This can be achieved with the following code:

var rabbitMqSettings = builder.Configuration.GetSection(nameof(RabbitMqSettings)).Get<RabbitMqSettings>();
builder.Services.AddMassTransit(mt => mt.UsingRabbitMq((cntxt, cfg) => {
    cfg.Host(rabbitMqSettings.Uri, "/", c => {
        c.Username(rabbitMqSettings.UserName);
        c.Password(rabbitMqSettings.Password);
    });
    cfg.ReceiveEndpoint("samplequeue", (c) => {
        c.Consumer<CommandMessageConsumer>();
    });
}));

In the above code sample, any message of type CommandMessage arriving on the samplequeue queue will be consumed by the implementation of CommandMessageConsumer.
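A common variation, sketched below, registers the consumer with the dependency injection container so that it can take constructor dependencies, and adds a retry policy to the endpoint. The fragment assumes that `builder` and `rabbitMqSettings` exist as in the snippet above; the retry count and interval are illustrative values, not recommendations.

```csharp
using System;
using MassTransit;

// Assumes `builder` and `rabbitMqSettings` are defined as in the article's snippet.
builder.Services.AddMassTransit(mt =>
{
    // Register the consumer with the container instead of requiring a default constructor.
    mt.AddConsumer<CommandMessageConsumer>();

    mt.UsingRabbitMq((context, cfg) =>
    {
        cfg.Host(rabbitMqSettings.Uri, "/", h =>
        {
            h.Username(rabbitMqSettings.UserName);
            h.Password(rabbitMqSettings.Password);
        });

        cfg.ReceiveEndpoint("samplequeue", e =>
        {
            // Retry a failing Consume call up to 3 times, 5 seconds apart (illustrative values).
            e.UseMessageRetry(r => r.Interval(3, TimeSpan.FromSeconds(5)));

            // Resolve the consumer registered above from the container.
            e.ConfigureConsumer<CommandMessageConsumer>(context);
        });
    });
});
```

With this setup, a message whose consumer keeps throwing after the retries are exhausted is moved by MassTransit to an error queue rather than being redelivered indefinitely.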

That's all that is required to run the first example of MassTransit with RabbitMq. Let us now test the sample application by running our Producer and Consumer applications.

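Producer: send a POST request to /sendmessage with the id and message parameters, for example with the message "Hello World !!".

Consumer output: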

info: MassTransit[0]
      Bus started: rabbitmq://localhost/
Message from Producer : Hello World !!

You can test the asynchronous nature of message delivery by taking your Consumer application offline and then sending a message from your Producer. The queued message can be verified in RabbitMQ using the management UI, which the container exposes on port 15672. Later, when the Consumer application is online again, it will receive the message.

Summary

As seen in the code examples above, MassTransit provides a simple, easy-to-use abstraction over different transport mechanisms. In this example we used RabbitMQ, but it would be just as simple to integrate Azure Service Bus or ActiveMQ. The abstraction also allows the developer to focus on the business logic irrespective of the underlying transport implementation.
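To illustrate how little changes when the transport is swapped, here is a sketch of the consumer registration against Azure Service Bus instead of RabbitMQ. It assumes the MassTransit.Azure.ServiceBus.Core package is installed and that `builder` exists as in the earlier snippets; the connection string name "AzureServiceBus" is a hypothetical configuration key.

```csharp
using MassTransit;

// Hypothetical configuration key; replace with wherever the connection string is stored.
var serviceBusConnection = builder.Configuration.GetConnectionString("AzureServiceBus");

builder.Services.AddMassTransit(mt =>
{
    // Only the transport selection and host setup differ from the RabbitMQ version.
    mt.UsingAzureServiceBus((context, cfg) =>
    {
        cfg.Host(serviceBusConnection);

        // The receive endpoint and consumer stay the same.
        cfg.ReceiveEndpoint("samplequeue", e =>
        {
            e.Consumer<CommandMessageConsumer>();
        });
    });
});
```

The message contract, the consumer class, and the publishing endpoint need no changes in this sketch, which is the practical benefit of the abstraction.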

