Let’s abstract the transport and create microservices applications that interoperate asynchronously
MassTransit: a real use case
Tuesday, May 25, 2021

Over the last year and a half, I have had the opportunity to work for a client with a strongly microservice-oriented architecture. Their system consists of several applications and services that interoperate asynchronously through a Service Bus.

A Service Bus is an integration service that allows interoperability among heterogeneous components in a distributed system. When an application needs to send a message, it does so through a dedicated Service Bus method that, internally, is responsible for routing the message to one or more endpoints that can handle it. On the receiving side, the Service Bus must be able to pick up messages and trigger the processes that handle them. When a message is processed successfully, the Service Bus tells the broker that the message can be deleted. If processing fails, it applies a reprocessing strategy that depends on the configured error-handling policies.
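To make the send, route, acknowledge, and retry cycle concrete, here is a toy in-memory sketch. It is not how MassTransit or any real broker is implemented: the `ToyBus` class, its method names, and the `Errors` list are hypothetical names invented for this illustration, and everything runs synchronously in a single process.

```csharp
using System;
using System.Collections.Generic;

// Toy in-memory bus (hypothetical): illustrates routing, acknowledgement and retry only.
public class ToyBus
{
    private readonly Dictionary<Type, List<Action<object>>> _handlers =
        new Dictionary<Type, List<Action<object>>>();
    private readonly int _maxAttempts;

    public ToyBus(int maxAttempts = 3)
    {
        _maxAttempts = maxAttempts;
    }

    // Messages whose handler kept failing; a real broker would move them to an error queue.
    public List<object> Errors { get; } = new List<object>();

    // Register a handler for messages of type T.
    public void Subscribe<T>(Action<T> handler)
    {
        if (!_handlers.TryGetValue(typeof(T), out var list))
        {
            list = new List<Action<object>>();
            _handlers[typeof(T)] = list;
        }
        list.Add(message => handler((T)message));
    }

    // Route the message to every handler subscribed to its type.
    public void Publish<T>(T message)
    {
        if (!_handlers.TryGetValue(typeof(T), out var list))
        {
            return; // no subscriber: in this toy bus the message is simply dropped
        }
        foreach (var handler in list)
        {
            Deliver(handler, message);
        }
    }

    private void Deliver(Action<object> handler, object message)
    {
        for (var attempt = 1; attempt <= _maxAttempts; attempt++)
        {
            try
            {
                handler(message);
                return; // success: the message is considered acknowledged
            }
            catch (Exception)
            {
                // failure: loop around and try again
            }
        }
        Errors.Add(message); // retries exhausted
    }
}
```

Each subscriber receives the message independently, so one failing handler does not prevent the others from processing it.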

The client uses the MassTransit framework as its Service Bus implementation. In particular, in the projects we worked on, we used it to integrate a web app with a Windows service, so that long-running operations could be handled without impacting the web application.

This framework offers some important features in addition to the basic ones that every Service Bus should provide. These features are:

  • Transparent management of exceptions, retries and poisoned messages;
  • Competing consumers pattern;
  • Sagas management.

For a complete list of features, take a look at this link.
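As an illustration of the first feature, here is a minimal sketch of a retry policy declared on a receive endpoint. It assumes MassTransit 7, where the `Interval` policy lives in the `GreenPipes` namespace (in MassTransit 8 that `using` must be removed). It uses the in-memory transport so that no broker is needed, and `PingMessage`, `PingConsumer`, and the `ping-queue` name are hypothetical and not part of the project described in this article.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using GreenPipes;   // MassTransit 7 only: provides the Interval(...) retry policy
using MassTransit;

namespace RetrySketch
{
    // Hypothetical message type, used only for this illustration.
    public class PingMessage
    {
        public string Text { get; set; }
    }

    // Hypothetical consumer that fails once to make the retry visible.
    public class PingConsumer : IConsumer<PingMessage>
    {
        private static int _attempts;

        public Task Consume(ConsumeContext<PingMessage> context)
        {
            if (Interlocked.Increment(ref _attempts) == 1)
            {
                throw new InvalidOperationException("Simulated transient failure");
            }
            Console.WriteLine($"Processed: {context.Message.Text}");
            return Task.CompletedTask;
        }
    }

    public static class Program
    {
        public static async Task Main()
        {
            var bus = Bus.Factory.CreateUsingInMemory(cfg =>
            {
                cfg.ReceiveEndpoint("ping-queue", endpoint =>
                {
                    // Retry a failing message up to 3 times, 1 second apart.
                    endpoint.UseMessageRetry(r => r.Interval(3, TimeSpan.FromSeconds(1)));
                    endpoint.Consumer<PingConsumer>();
                });
            });

            await bus.StartAsync();
            await bus.Publish(new PingMessage { Text = "hello" });
            await Task.Delay(TimeSpan.FromSeconds(3)); // leave time for the retry
            await bus.StopAsync();
        }
    }
}
```

The consumer itself contains no retry logic: the policy is part of the endpoint configuration, which is what makes exception handling transparent to the message-handling code.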

A real use case

In this example, a web app implemented with Razor Pages lets the user create meetings. Once a meeting is created, we want to send email and SMS notifications to the guests, but these operations should not burden the web app; instead, they are carried out by services created specifically for this task. The web application sends a message using MassTransit, and the framework then routes it through the transport infrastructure (Azure Service Bus, Amazon SQS, RabbitMQ). Two console applications consume these messages: one simulates sending emails, the other sending SMS. With MassTransit, we will create two consumers that can handle the type of message published by the web app.

This article shows only the code related to MassTransit and omits the UI code. Moreover, since the two consumers are almost identical, we will focus only on the code of the EmailSender. You can find the complete code in my repository.

Web Application (Publisher)

Our web app will be responsible for sending a message through our service bus.

The instance of the bus will be injected into the constructor and is of type IBus. This interface contains definitions of the methods needed to send messages through the bus.

public IndexModel(IBus bus)
{
        _bus = bus;
}

After clicking a UI button, a handler method will be triggered, which sends a message.

public async Task<IActionResult> OnPostCreateMeetingAsync()
{
        var message = new MeetingCreatedMessage(Title, Creator, Participants?.Split(','), StartsOn, EndsOn); 
        await SendMessage(message);    
        return Page();
}

private async Task SendMessage(MeetingCreatedMessage message)
{
        try
        {
                await _bus.Publish(message);
                MessageSent = true;
        }
        catch
        {
                MessageSent = false;    
        }
}

The message we send is of type MeetingCreatedMessage and is published: this means we use the publish/subscribe communication model, which is ideal for our use case because the message has multiple consumers that can handle it.
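The definition of MeetingCreatedMessage is not shown in this article. The following is only a sketch of what such a contract could look like, inferred from the constructor call in OnPostCreateMeetingAsync: the property types (in particular `DateTime` for the dates) and the extra parameterless constructor are assumptions, and the class in the repository may differ.

```csharp
using System;

// Hypothetical shape of the message contract; the real class may differ.
public class MeetingCreatedMessage
{
    // Parameterless constructor, so that a serializer can rebuild the message on the consumer side.
    public MeetingCreatedMessage()
    {
        Participants = Array.Empty<string>();
    }

    public MeetingCreatedMessage(string title, string creator, string[] participants,
                                 DateTime startsOn, DateTime endsOn)
    {
        Title = title;
        Creator = creator;
        // Participants?.Split(',') yields null when no guests were entered.
        Participants = participants ?? Array.Empty<string>();
        StartsOn = startsOn;
        EndsOn = endsOn;
    }

    public string Title { get; set; }
    public string Creator { get; set; }
    public string[] Participants { get; set; }
    public DateTime StartsOn { get; set; }
    public DateTime EndsOn { get; set; }
}
```

Keeping the contract a plain data class with no behavior means the publisher and the consumers only need to agree on its shape.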

EmailSender Console Application (Consumer1)

Our first console application will be the consumer responsible for “sending” mail to the meeting participants. In this project, we have a class that deals with the consumption of the message.

public class MeetingCreatedMessageConsumer : IConsumer<MeetingCreatedMessage> 
{  ...  }

The class implements IConsumer<MeetingCreatedMessage>, a MassTransit interface indicating that our consumer can process messages of the specified type parameter.

public async Task Consume(ConsumeContext<MeetingCreatedMessage> context)
{
        var meeting = context.Message; 
        
        if (IsMeetingAlreadyEnded(meeting))    
        {
                return;
        }
        
        var emails = CreateEmails(meeting);
        await SendEmails(emails);
}

When a message is available in the queue to which the application is subscribed, the MassTransit middleware reads the message type from the metadata, identifies the consumer that can handle that type, and calls its Consume method.
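The helpers called inside Consume (IsMeetingAlreadyEnded, CreateEmails) are not shown here. Purely as an illustration, they might look like the sketch below; the `MeetingEmailHelpers` class, the signatures, and the email text format are all assumptions, not the code from the repository.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical helpers; names mirror the calls in Consume, implementations are assumed.
public static class MeetingEmailHelpers
{
    // A meeting has already ended when its end time is not in the future.
    public static bool IsMeetingAlreadyEnded(DateTime endsOnUtc, DateTime nowUtc)
    {
        return endsOnUtc <= nowUtc;
    }

    // Build one (fake) email per distinct, non-empty participant address.
    public static IReadOnlyList<string> CreateEmails(string title, IEnumerable<string> participants)
    {
        return (participants ?? Enumerable.Empty<string>())
            .Select(p => p.Trim())
            .Where(p => p.Length > 0)
            .Distinct(StringComparer.OrdinalIgnoreCase)
            .Select(p => $"To: {p} | Subject: Invitation to '{title}'")
            .ToList();
    }
}
```

Passing the current time in as a parameter keeps the check deterministic, which matters because a message may be consumed well after it was published.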

MassTransit Configuration

In the previous snippets, we have seen how to publish and consume a message with MassTransit. But that’s not all: the code we’ve seen so far, which uses the middleware to publish and consume messages, is independent of the transport technology. We still need to configure the MassTransit middleware to specify the type of transport, its settings, and the endpoints.

Note that for each transport you want to use, you will need to install the appropriate MassTransit NuGet package; otherwise, you will not find the extension methods that allow you to build the bus instance.

Transport with Azure Service Bus

We create a resource group and a Service Bus namespace, selecting the Standard tier, which is required to create topics and subscriptions.

In the shared access policies section, click RootManageSharedAccessKey and copy the Primary Connection String. We will use this key to grant MassTransit permissions to access and manage resources on the Bus. 

public void ConfigureServices(IServiceCollection services)
{
        services.AddRazorPages();

        var bus = Bus.Factory.CreateUsingAzureServiceBus(cfg => 
        {
                cfg.Host(Configuration["AzureServiceBus:ConnectionString"]);
        });
        services.AddSingleton<IBus>(bus);
}

Creating the bus is straightforward: we build it with the CreateUsingAzureServiceBus method, passing the connection string copied earlier.

static async Task Main()
{
        var emailSender = new FakeEmailSender();

        var bus = Bus.Factory.CreateUsingAzureServiceBus(cfg =>
        {
                cfg.Host(Configuration["AzureServiceBus:ConnectionString"]);         
                cfg.ReceiveEndpoint(Configuration["AzureServiceBus:Queue"], endpoint =>
                {
                        endpoint.Consumer(() => new MeetingCreatedMessageConsumer(emailSender));
                });
        });
        await bus.StartAsync();
        Console.ReadKey();
        await bus.StopAsync();
} 

The bus configuration on the consumer side is similar, but you also need to specify the endpoints. An endpoint is an abstraction of a path from which our app reads messages. When an endpoint is defined, the corresponding queue is created if it does not already exist.

Specifying the endpoint is not enough: we also have to register the consumers that are called when messages are received. This registration creates, if it does not already exist, a topic with the same name as the message type that the consumer can handle.

Let’s run the projects and see what happens on our bus.

In the Topics section, we can see that MassTransit has created a new topic with two subscriptions, one for each queue that must receive a copy of the message.

In the Subscriptions section, we can see which queues are subscribed to the topic.

Finally, in the Queues section we can see our queues.

Let’s send some messages using the web app, then choose any queue on our Azure Service Bus and check in the metrics section that the number of incoming and outgoing messages increases. Note that the metrics are not updated in real time, so you may have to wait a few minutes before observing the increase. To confirm that messages arrive, we can check the consumer consoles for log entries indicating that the emails and SMS were sent. This means that our consumers have received and processed the messages.

Transport with Amazon SQS/SNS

The first step is to create a user in Identity and Access Management (IAM) and generate programmatic access keys.

Grant the AmazonSQSFullAccess and AmazonSNSFullAccess permissions, then copy the access keys.

public void ConfigureServices(IServiceCollection services)
{
        services.AddRazorPages();

        var bus = Bus.Factory.CreateUsingAmazonSqs(cfg =>
        {
                cfg.Host(Configuration["AmazonSqs:Region"], host =>
                {
                        host.AccessKey(Configuration["AmazonSqs:AccessKey"]);            
                        host.SecretKey(Configuration["AmazonSqs:SecretKey"]);
                });
        });
        services.AddSingleton<IBus>(bus);
}

static async Task Main()
{
        var emailSender = new FakeEmailSender();
        
        var bus = Bus.Factory.CreateUsingAmazonSqs(cfg =>
        {
                cfg.Host(Configuration["AmazonSqs:Region"], host =>
                {
                        host.AccessKey(Configuration["AmazonSqs:AccessKey"]);            
                        host.SecretKey(Configuration["AmazonSqs:SecretKey"]);
                });

                cfg.ReceiveEndpoint(Configuration["AmazonSqs:Queue"], endpoint =>
                {
                        endpoint.Consumer(() => new MeetingCreatedMessageConsumer(emailSender));
                });
        });
        await bus.StartAsync();
        Console.ReadKey();
        await bus.StopAsync();
}

In this case too, creating and configuring the bus is straightforward for both the publisher and the consumer. The difference is that we use a different factory method, CreateUsingAmazonSqs, and configure the host with a region and the access keys.

Now let’s run the projects.

As with Azure Service Bus, a topic is created, with a subscription for each queue that should receive a copy of the message.

Here are the queues:

Let’s go to the Monitoring tab of any queue: after sending some messages, we can see the “Number Of Messages Received” metric go up. As with Azure Service Bus, the metrics don’t update in real time, and you have to wait a while before observing the change.

Transport with RabbitMQ

In this last section, we will see how to use RabbitMQ, one of the most popular open-source message brokers.

public void ConfigureServices(IServiceCollection services)
{
        services.AddRazorPages();

        var bus = Bus.Factory.CreateUsingRabbitMq(cfg =>
        {
                cfg.Host(Configuration["RabbitMQ:Host"], host =>
                {
                        host.Username(Configuration["RabbitMQ:Username"]);            
                        host.Password(Configuration["RabbitMQ:Password"]);
                });
        });
        services.AddSingleton<IBus>(bus);
}

static async Task Main()
{
        var emailSender = new FakeEmailSender();

        var bus = Bus.Factory.CreateUsingRabbitMq(cfg =>
        {
                cfg.Host(Configuration["RabbitMQ:Host"], host =>
                {
                        host.Username(Configuration["RabbitMQ:Username"]);            
                        host.Password(Configuration["RabbitMQ:Password"]);
                });

                cfg.ReceiveEndpoint(Configuration["RabbitMQ:Queue"], endpoint =>
                {
                        endpoint.Consumer(() => new MeetingCreatedMessageConsumer(emailSender));
                });
        });
        await bus.StartAsync();
        Console.ReadKey();
        await bus.StopAsync();
}

As you can see in the snippets, the configuration is similar to the previous ones; the differences lie in the factory method used to build the bus and in the way we configure the host.

Once again, let’s run the three projects.

In the Exchanges tab, you will see that MassTransit created an exchange with the same name as the message type, plus two exchanges with the same names as the queues to which they are respectively bound.

Here are the two queues. 

By sending messages and clicking on any queue, we will see the message rate increase.

Conclusions

We have seen how MassTransit helps us create applications that interoperate through a Service Bus in a transport-agnostic fashion, making it possible to swap transport technologies with almost no changes to the code. This is just one of the many advantages that MassTransit offers. For further information, I recommend visiting the official website of the project.

If you want to know about a library similar to MassTransit, I suggest you take a look at this article written by Genny about Rebus.

I hope you enjoyed this article and that you find it helpful.

See you in the next article!

Written by

Claudio Manniti