To decouple communication in complex architectures, we can exchange messages and let RabbitMQ manage them.
Decoupling the communication with RabbitMQ
Wednesday, May 29, 2019

The project I am working on is built on an architecture made up of several applications that communicate with each other by exchanging messages. This communication usually follows the Publish/Subscribe pattern, in which an application communicates asynchronously with other entities without being coupled to them.

The entities involved are:

  • Publisher: the application that produces the message.
  • Message Broker: the MOM (Message-Oriented Middleware) that takes the message and routes it toward a consumer.
  • Consumer: the application that consumes the message.

For this project we used RabbitMQ, an open source message broker that supports several protocols and offers many features.

RabbitMQ implements AMQP 0-9-1 (Advanced Message Queuing Protocol), which specifies that messages are published to AMQP entities called Exchanges. An exchange distributes the messages it receives to other entities, Queues, according to rules called bindings; one or more Consumers can subscribe to each queue.

The routing algorithm, which determines how messages are routed to queues, depends on the exchange type. AMQP 0-9-1 defines four types, in addition to the default exchange, with the following characteristics:

  • Default: a pre-declared direct exchange with no name (an empty string). Its main feature is that every new queue is automatically bound to it, with a routing key identical to the queue name.
  • Direct: useful for unicast routing. A message is delivered to the queues whose binding key exactly matches the message’s routing key.
  • Fanout: ideal for broadcast routing, since it forwards messages to all the queues bound to it, ignoring the routing key.
  • Topic: used for multicast routing. Delivery is based on matching the routing key against the wildcard pattern used to bind each queue to the exchange.
  • Headers: here routing is based not on the routing key but on attributes expressed in the message headers.
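
To make the wildcard matching of topic exchanges more concrete, here is a minimal sketch in plain C#, with no broker involved. It assumes the AMQP 0-9-1 topic rules: the words of a routing key are separated by dots, * stands for exactly one word and # for zero or more words. TopicMatcher and its methods are hypothetical names of mine, not part of the RabbitMQ client, and the broker's real implementation is certainly different.

```csharp
using System;

// Hypothetical helper, for illustration only: it mimics how a topic
// exchange compares a binding pattern with a message's routing key.
static class TopicMatcher
{
    // Returns true when routingKey matches the binding pattern.
    public static bool Matches(string pattern, string routingKey)
    {
        return Match(SplitWords(pattern), 0, SplitWords(routingKey), 0);
    }

    // Splits "a.b.c" into words; an empty string has no words at all.
    private static string[] SplitWords(string s)
    {
        return s.Length == 0 ? new string[0] : s.Split('.');
    }

    private static bool Match(string[] p, int i, string[] k, int j)
    {
        // Pattern exhausted: it is a match only if the key is exhausted too.
        if (i == p.Length) return j == k.Length;

        if (p[i] == "#")
        {
            // '#' either absorbs zero words (move on in the pattern)
            // or absorbs one more word of the key (stay on '#').
            return Match(p, i + 1, k, j)
                || (j < k.Length && Match(p, i, k, j + 1));
        }

        // Any other pattern word needs a key word to compare with.
        if (j == k.Length) return false;

        // '*' accepts any single word; otherwise the words must be equal.
        return (p[i] == "*" || p[i] == k[j]) && Match(p, i + 1, k, j + 1);
    }
}
```

For instance, TopicMatcher.Matches("order.*.created", "order.eu.created") returns true, while the same pattern does not match "order.eu.paid"; the pattern "order.#" matches both keys, and also the bare key "order".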

Both queues and exchanges have properties that allow them to survive a broker restart (durable) and to be deleted automatically (auto-delete): a queue when its last consumer unsubscribes, an exchange when its last queue is unbound. Queues can also be exclusive, that is, tied to a single connection.
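
As a rough sketch of how these properties appear in code, the snippet below declares a durable topic exchange, a durable queue, and an exclusive, auto-delete queue, then binds both queues. It assumes the synchronous API of the RabbitMQ.Client package in its 5.x/6.x versions (newer major versions expose an async API instead) and a broker listening on localhost. The names demo.topic and orders.audit and the routing keys are made up for the example.

```csharp
using System;
using System.Text;
using RabbitMQ.Client;

class DeclareTopology
{
    static void Main()
    {
        var factory = new ConnectionFactory() { HostName = "localhost" };
        using (var connection = factory.CreateConnection())
        using (var channel = connection.CreateModel())
        {
            // Durable exchange: its definition survives a broker restart.
            channel.ExchangeDeclare(exchange: "demo.topic",
                                    type: ExchangeType.Topic,
                                    durable: true,
                                    autoDelete: false,
                                    arguments: null);

            // Durable queue that several connections can share.
            channel.QueueDeclare(queue: "orders.audit",
                                 durable: true,
                                 exclusive: false,
                                 autoDelete: false,
                                 arguments: null);

            // Exclusive, auto-delete queue: an empty name asks the broker
            // to generate one, and the queue is tied to this connection.
            var tempQueue = channel.QueueDeclare(queue: "",
                                                 durable: false,
                                                 exclusive: true,
                                                 autoDelete: true,
                                                 arguments: null).QueueName;

            // Bindings: the rules the exchange uses to route messages.
            channel.QueueBind(queue: "orders.audit", exchange: "demo.topic",
                              routingKey: "order.#", arguments: null);
            channel.QueueBind(queue: tempQueue, exchange: "demo.topic",
                              routingKey: "order.*.created", arguments: null);

            // This routing key matches both bindings declared above.
            var body = Encoding.UTF8.GetBytes("Demo Message");
            channel.BasicPublish(exchange: "demo.topic",
                                 routingKey: "order.eu.created",
                                 basicProperties: null,
                                 body: body);
            Console.WriteLine("Declared topology, temporary queue: {0}", tempQueue);
        }
    }
}
```

When the program exits and the connection closes, the exclusive queue should disappear from the management UI, while demo.topic and orders.audit remain.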

I preferred not to install RabbitMQ locally but to use a Docker container, both for convenience, since the Docker image is ready to use, and to convey the idea of a broker running in an environment separate from the publisher and the subscriber, which are defined below.

To create the container, we run the following command, specifying the hostname, the container name, and the RabbitMQ image we want to run:

docker run -d --hostname my-rabbit --name rabbit1 -p "5672:5672" -p "15672:15672" rabbitmq:3-management

I mapped the default AMQP port (5672) and the management web UI port (15672) from the container to localhost, so that I can access both as needed. I chose the management image specifically because it includes a web interface, reachable at http://localhost:15672, through which we can inspect and control the broker’s entities: a more intuitive option than the CLI (rabbitmqctl).

Following the examples available on the RabbitMQ website, I created two .NET Core console applications: one is the Publisher, the other the Subscriber. RabbitMQ provides, among others, a client for .NET (the RabbitMQ.Client package), which can be installed through the NuGet package manager.

Let’s now look at the producer of the message, which I called Sender.

using System;
using System.Text;
using RabbitMQ.Client;

class Sender
{
  static void Main(string[] args)
  {
    // Connect to the broker running on localhost (default port 5672).
    var factory = new ConnectionFactory() { HostName = "localhost" };
    using (var connection = factory.CreateConnection())
    using (var channel = connection.CreateModel())
    {
      // Declare the queue; it is created only if it does not exist yet.
      channel.QueueDeclare(queue: "QueueDemo",
                 durable: false,
                 exclusive: false,
                 autoDelete: false,
                 arguments: null);

      string message = "Demo Message";
      var body = Encoding.UTF8.GetBytes(message);

      // An empty exchange name selects the default exchange, which
      // routes to the queue whose name equals the routing key.
      channel.BasicPublish(exchange: "",
                 routingKey: "QueueDemo",
                 basicProperties: null,
                 body: body);
      Console.WriteLine("Sent {0}", message);
    }

    Console.WriteLine(" Press [enter] to exit.");
    Console.ReadLine();
  }
}

We create a connection to the endpoint, in our case localhost, by instantiating a ConnectionFactory. Running the program in debug mode, we can see that a connection to the endpoint has been created, using the AMQP protocol and the RabbitMQ default port (5672).

We can now declare a queue, giving it a name (QueueDemo). We create the message to send (a simple string) and, in the BasicPublish method, we specify the default exchange (an empty string) and the routing key (identical to the queue name), and pass the message to publish as the body.

We can now run the Sender application.

In the Overview screen of the web UI, we can see that the number of queues has increased: the queue QueueDemo has been created, and it holds one message.

In the detailed information for QueueDemo, we can see the exchange it is bound to.

Clicking the Get Message(s) button displays the payload of our message, which corresponds to the string created in Sender.

The message, then, has been properly queued and is waiting to be consumed.

Let’s now look at the Subscriber application, which I called Receiver. Here too we create a connection and declare the queue QueueDemo. We then create the actual consumer with the EventingBasicConsumer class and, through its Received event, define the handler that runs when a message arrives. The handler receives a BasicDeliverEventArgs argument (ea), which contains all the properties of the message delivered by the broker. Finally, the BasicConsume method starts the consumer we just defined.

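using System;
using System.Text;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

class Receiver
{
  static void Main(string[] args)
  {
    var factory = new ConnectionFactory() { HostName = "localhost" };
    using (var connection = factory.CreateConnection())
    {
      using (var channel = connection.CreateModel())
      {
        // Declaring the same queue again is harmless and guarantees
        // that it exists even if Receiver starts before Sender.
        channel.QueueDeclare(queue: "QueueDemo",
                   durable: false,
                   exclusive: false,
                   autoDelete: false,
                   arguments: null);
        var consumer = new EventingBasicConsumer(channel);
        // The handler runs every time the broker delivers a message.
        consumer.Received += (model, ea) =>
        {
          // Body is a byte[] in RabbitMQ.Client 5.x; from 6.x it is a
          // ReadOnlyMemory<byte>, so use ea.Body.ToArray() there.
          var body = ea.Body;
          var message = Encoding.UTF8.GetString(body);
          Console.WriteLine("Received {0}", message);
        };
        // autoAck: true means a message counts as acknowledged
        // as soon as it has been delivered.
        channel.BasicConsume(queue: "QueueDemo",
                   autoAck: true,
                   consumer: consumer);
        Console.WriteLine("Press [enter] to exit.");
        Console.ReadLine();
      }
    }
  }
}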

Running the application, the message is consumed as expected.

With many messages in the queue and several consumers waiting, the load is balanced by default with round-robin scheduling, so that no consumer takes priority over the others. To make sure that a message has actually been handled by the consumer, we can change how messages are acknowledged. Inside the consumer’s Received handler we call the BasicAck method, which acknowledges the message. One of its arguments is the delivery tag, which uniquely identifies the delivery on the channel. For this to work, autoAck must be set to false in BasicConsume.

channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);

On the web UI we can see that the message has been properly consumed and that an ack has been sent by the consumer.
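
Putting the pieces together, a Receiver with manual acknowledgement might look like the sketch below. It reuses the QueueDemo queue from the article and assumes the synchronous RabbitMQ.Client API (5.x/6.x) and a broker on localhost. The class name AckReceiver is mine, and the System.Linq import is only there so that ea.Body.ToArray() compiles whether Body is a byte[] (5.x) or a ReadOnlyMemory<byte> (6.x).

```csharp
using System;
using System.Linq;
using System.Text;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

class AckReceiver
{
    static void Main(string[] args)
    {
        var factory = new ConnectionFactory() { HostName = "localhost" };
        using (var connection = factory.CreateConnection())
        using (var channel = connection.CreateModel())
        {
            channel.QueueDeclare(queue: "QueueDemo",
                                 durable: false,
                                 exclusive: false,
                                 autoDelete: false,
                                 arguments: null);

            var consumer = new EventingBasicConsumer(channel);
            consumer.Received += (model, ea) =>
            {
                var message = Encoding.UTF8.GetString(ea.Body.ToArray());
                Console.WriteLine("Received {0}", message);

                // Acknowledge only after the message has been handled.
                // If the consumer goes away before this call, the broker
                // re-queues the message and delivers it again.
                channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
            };

            // autoAck: false switches to manual acknowledgement.
            channel.BasicConsume(queue: "QueueDemo",
                                 autoAck: false,
                                 consumer: consumer);

            Console.WriteLine("Press [enter] to exit.");
            Console.ReadLine();
        }
    }
}
```

With autoAck set to false and the BasicAck call commented out, the message should stay listed as unacknowledged in the web UI until the Receiver exits, at which point it returns to the ready state.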

RabbitMQ can be a great choice for communication between applications, microservices, and any software solution that requires a distributed architecture. As open source middleware, it offers many advantages, among them the decoupling of the different entities, which can be deployed separately, and asynchronous communication, which lets an application continue its execution flow without blocking.

Compared with HTTP communication through a REST API, for example, it offers better reliability in the exchange of information between applications, thanks both to its acknowledgement and message redelivery mechanisms and to its robustness and redundancy mechanisms (clusters of nodes).

Obviously, in this article I have only described the basic features of RabbitMQ, the ones that helped me get started with this way of communicating.

While waiting for a more technical article, here is the link to the GitHub repository created for this one: https://github.com/intersect88/IntroToRabbitMQ.

See you next time.

Written by

Genny Paudice