
In my previous articles, we saw how important it is to decouple communications in enterprise applications, and we used on-premise solutions such as RabbitMQ or transport abstractions such as Rebus. The cloud, however, also offers native tools for managing messages. In Azure specifically, we have three services at our disposal whose purpose is to help us achieve this decoupling in different situations: Azure Service Bus, Azure Event Hubs, and Azure Event Grid.
Azure Service Bus is best suited to enterprise scenarios in which high-value messages must be exchanged and the sender expects a specific recipient to handle them. Here it is worth distinguishing between a message that expresses a request (often called a command) and an event, which can be subscribed to by any interested part of the system that wants to know that a given “fact” has happened (a model that fits CQRS scenarios well).
Event Hubs and Event Grid, instead, focus on events. Azure Event Hubs is designed to handle series of events or streams in real time, with a strong emphasis on communication performance, which makes it particularly suitable for analytics and telemetry applications. Event Grid is designed for event-based applications or services whose processing is triggered by events that notify a change of state, making it ideal for integrating different services in the cloud or in hybrid scenarios.
Let’s analyze them individually to get an idea of their functionality.
Azure Service Bus
ASB is a message broker for enterprise integration whose purpose is to decouple applications and services and to offer a reliable channel for exchanging information.
Messages can be sent using queues or topics. Queues are used for point-to-point communication: messages are stored in order on arrival and delivered only when a receiver actually requests them. Topics are used to create publish/subscribe communication: messages are sent to the topic by a publisher, and each subscription receives a copy of the message.
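For comparison with the topic-based example developed later, here is a minimal sketch of point-to-point messaging over a queue with the same Microsoft.Azure.ServiceBus package; the connection string and queue name are placeholders, not values used elsewhere in this article.

using System;
using System.Text;
using System.Threading.Tasks;
using Microsoft.Azure.ServiceBus;

class QueueQuickstart
{
    // Placeholder values: replace with your own namespace connection string and queue name.
    const string ConnectionString = "MyConnectionString";
    const string QueueName = "demoqueue";

    static async Task Main()
    {
        var queueClient = new QueueClient(ConnectionString, QueueName);

        // Point-to-point: the message stays in the queue until a single receiver completes it.
        await queueClient.SendAsync(new Message(Encoding.UTF8.GetBytes("Hello from the queue")));

        queueClient.RegisterMessageHandler(
            async (message, token) =>
            {
                Console.WriteLine("Received -> " + Encoding.UTF8.GetString(message.Body));
                await queueClient.CompleteAsync(message.SystemProperties.LockToken);
            },
            new MessageHandlerOptions(args => Task.CompletedTask) { AutoComplete = false });

        Console.ReadKey();
        await queueClient.CloseAsync();
    }
}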
Azure Service Bus offers several advanced features including:
- Dead-letter queue, where messages that cannot be processed are stored.
- Scheduled delivery, the ability to plan when messages are delivered (see the sketch after this list).
- Transaction support: ASB is a transactional broker and allows operations against a single messaging entity, such as a queue or topic, to be grouped into a transaction. The transactional nature of the broker guarantees that messages are neither lost nor duplicated.
- Support for batch sending: you can delay the sending of messages, and if new messages arrive from the sender during that period, they are all sent in a single batch.
- Automatic deletion of a queue after a period of inactivity.
- Duplicate detection.
- Geographical disaster recovery: in case of downtime, this option allows processing to continue in a data center in a different geographical region.
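As a rough sketch of the scheduled delivery and batch sending features, assuming the Microsoft.Azure.ServiceBus package used later in this article and an already created ITopicClient, the code could look like this:

using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;
using Microsoft.Azure.ServiceBus;

class ServiceBusFeatureSketch
{
    static async Task Demo(ITopicClient topicClient)
    {
        // Scheduled delivery: the message is enqueued now but becomes visible only in five minutes.
        var delayed = new Message(Encoding.UTF8.GetBytes("Delayed message"));
        long sequenceNumber = await topicClient.ScheduleMessageAsync(delayed, DateTimeOffset.UtcNow.AddMinutes(5));

        // The returned sequence number can be used to cancel the scheduled message.
        await topicClient.CancelScheduledMessageAsync(sequenceNumber);

        // Batch sending: several messages are handed to the broker in a single call.
        var batch = new List<Message>
        {
            new Message(Encoding.UTF8.GetBytes("Batch message 1")),
            new Message(Encoding.UTF8.GetBytes("Batch message 2"))
        };
        await topicClient.SendAsync(batch);
    }
}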
Let’s see how we can send and receive messages with ASB using topics. In the example, we have a publisher that publishes messages to an ASB topic and a subscriber that subscribes to it.

We now connect to the Azure portal and create a new Azure Service Bus resource.

By clicking on Create we reach the screen for creating the namespace, which is a container for messaging components.

We choose the Standard or Premium tier, since topic-based publish/subscribe communication is not available in the Basic tier. The Make this namespace zone redundant option replicates the namespace across availability zones for redundancy. Once the namespace has been created, we create a topic by choosing the Topics item from the available Entities.

We choose the name of the topic and its settings, such as its maximum size, the time-to-live of messages, and duplicate detection, and click on Create:

Once the topic has been created, we create a subscription to it using the Subscriptions entry in the Entities section of the topic. Among the creation options, we can choose to move expired messages to the dead-letter queue, or enable sessions for FIFO (First-In-First-Out) message handling.
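The topic and the subscription can also be provisioned from code instead of the portal; a minimal sketch using the ManagementClient class from the Microsoft.Azure.ServiceBus package, with the entity names chosen above, might look like this:

using System.Threading.Tasks;
using Microsoft.Azure.ServiceBus.Management;

class ProvisioningSketch
{
    static async Task CreateEntities(string connectionString)
    {
        var managementClient = new ManagementClient(connectionString);

        // Create the topic and the subscription only if they do not exist yet.
        if (!await managementClient.TopicExistsAsync("demotopic"))
            await managementClient.CreateTopicAsync("demotopic");

        if (!await managementClient.SubscriptionExistsAsync("demotopic", "demoTopicSubscription"))
            await managementClient.CreateSubscriptionAsync("demotopic", "demoTopicSubscription");

        await managementClient.CloseAsync();
    }
}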

In the topic overview, we can see the newly created subscription and, in particular, that its message count is currently zero:

Now let’s create a console application that will act as our Publisher and add the NuGet package Microsoft.Azure.ServiceBus. We instantiate a TopicClient by passing two parameters: the connection string of the Service Bus, which we can obtain on the Azure portal via Settings > Shared access policies of the namespace by clicking on the policy, and the name of the topic we created previously.
After entering the number of messages we want to publish, we invoke the SendAsync() method of the TopicClient for each message, which is sent asynchronously to the topic.
using System;
using System.Text;
using System.Threading.Tasks;
using Microsoft.Azure.ServiceBus;

namespace AzureServiceBusPublisher
{
    class Program
    {
        // Replace with the connection string of your Service Bus namespace.
        const string AzureServiceBusConnectionString = "MyConnectionString";
        const string AzureServiceBusTopic = "demotopic";

        static ITopicClient topicClient;

        public static async Task Main(string[] args)
        {
            Console.WriteLine("Type the number of messages that you want to publish -> ");
            int numberOfMessage = int.Parse(Console.ReadLine());

            topicClient = new TopicClient(AzureServiceBusConnectionString, AzureServiceBusTopic);

            for (int i = 0; i < numberOfMessage; i++)
            {
                // Each message body is sent to the topic as a UTF-8 encoded byte array.
                string messageBody = $"Demo Message {i}";
                var message = new Message(Encoding.UTF8.GetBytes(messageBody));
                await topicClient.SendAsync(message);
            }

            await topicClient.CloseAsync();
        }
    }
}


By connecting to the DemoTopic Overview, we notice that the message count is equal to the number of messages we sent.
We now build the application that will act as Subscriber, which is slightly more complex than the one created for the publisher. In this case, we instantiate a SubscriptionClient which receives as parameters the connection string of the Service Bus, the name of the Topic, and the name of the Subscription to which we want to subscribe.
We need to register a message handler, that is, a function that processes incoming messages: in the example we called it ProcessMessages. This method decodes the message, prints its content, and completes it, which removes it from the subscription.
In addition to the handler method, subscriptionClient.RegisterMessageHandler() also requires a MessageHandlerOptions parameter, in which some properties of the message handler are defined.
using System;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.ServiceBus;

namespace AzureServiceBusSubscriber
{
    class Program
    {
        // Replace with the connection string of your Service Bus namespace.
        const string AzureServiceBusConnectionString = "MyConnectionString";
        const string AzureServiceBusTopic = "demotopic";
        const string AzureServiceBusSubscription = "demoTopicSubscription";

        static ISubscriptionClient subscriptionClient;

        static async Task Main(string[] args)
        {
            subscriptionClient = new SubscriptionClient(AzureServiceBusConnectionString, AzureServiceBusTopic, AzureServiceBusSubscription);

            var messageHandlerOptions = new MessageHandlerOptions(ExceptionReceivedHandler)
            {
                // Messages are completed explicitly in ProcessMessages, one at a time.
                AutoComplete = false,
                MaxConcurrentCalls = 1,
            };
            subscriptionClient.RegisterMessageHandler(ProcessMessages, messageHandlerOptions);

            Console.ReadKey();
            await subscriptionClient.CloseAsync();
        }

        static async Task ProcessMessages(Message message, CancellationToken token)
        {
            string messageReceived = Encoding.UTF8.GetString(message.Body);
            Console.WriteLine("Received -> " + messageReceived);

            // Completing the message removes it from the subscription.
            await subscriptionClient.CompleteAsync(message.SystemProperties.LockToken);
        }

        static Task ExceptionReceivedHandler(ExceptionReceivedEventArgs exceptionReceivedEventArgs)
        {
            Console.WriteLine($"Exception: {exceptionReceivedEventArgs.Exception}");
            return Task.CompletedTask;
        }
    }
}
We launch the Subscriber program and get the following result:

The message count of DemoTopicSubscription has returned to zero, confirming that we consumed all the published messages.

Azure Event Hubs
Azure Event Hubs is described as a big data streaming platform and an event ingestion service: a platform because it is offered as a Platform-as-a-Service (PaaS), and an event ingestor because it decouples the producer of an event stream from its consumers.
Event Hubs can receive and process millions of events per second, and it can be used for real-time analytics and telemetry processing as well as for capturing data in batches or storing it (e.g., in Azure Blob Storage).
It allows the simultaneous processing of the data stream by different applications using a partitioned consumer model in which each consumer reads only a specific subset of the event stream.
The architecture of Event Hubs is as follows:

Event producers are any entities that can send data to the event hub via HTTP, AMQP, or the Apache Kafka protocol (which lets Kafka-based applications work with Event Hubs). The partitions within the event hub are ordered sequences of events, and each consumer group reads the event stream independently, providing a separate view of the stream for each interested application. Finally, there are the event receivers, the entities that read the data from the event hub. Let’s see an example of how to send and receive events with Event Hubs.
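To give an idea of the partitioned consumer model, here is a minimal sketch, assuming the Azure.Messaging.EventHubs package used later in this article and placeholder connection values, that reads only the events held by a single partition:

using System;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Azure.Messaging.EventHubs.Consumer;

class PartitionReaderSketch
{
    // Reads only the subset of the stream held by the first partition.
    static async Task ReadOnePartition(string eventHubsConnectionString, string eventHubName)
    {
        await using var consumer = new EventHubConsumerClient(
            EventHubConsumerClient.DefaultConsumerGroupName, eventHubsConnectionString, eventHubName);

        string[] partitionIds = await consumer.GetPartitionIdsAsync();
        using var cancellation = new CancellationTokenSource(TimeSpan.FromSeconds(10));

        try
        {
            await foreach (PartitionEvent partitionEvent in consumer.ReadEventsFromPartitionAsync(
                partitionIds[0], EventPosition.Earliest, cancellation.Token))
            {
                Console.WriteLine("Partition " + partitionEvent.Partition.PartitionId + " -> "
                    + Encoding.UTF8.GetString(partitionEvent.Data.Body.ToArray()));
            }
        }
        catch (OperationCanceledException)
        {
            // Expected when the ten-second reading window expires.
        }
    }
}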
On the Azure portal we create a new Event Hubs resource:


We click on Create and go to the newly created namespace to add an event hub.

We name the event hub, choose the number of partitions we want to use, and click on Create.

We now build a console application that will act as the event producer. After creating the project, we add the NuGet package Azure.Messaging.EventHubs. As in the Azure Service Bus example, the user is asked for the number of events to send.
To publish the events, we instantiate an EventHubProducerClient that takes the connection string of the namespace and the name of the event hub as parameters.
We then build a batch of events, which is sent asynchronously to the event hub. Since we do not specify a partition, the service distributes the events across the available partitions automatically.
using System;
using System.Text;
using System.Threading.Tasks;
using Azure.Messaging.EventHubs;
using Azure.Messaging.EventHubs.Producer;

namespace EventHubsProducer
{
    class Program
    {
        // Replace with the connection string of your Event Hubs namespace.
        const string EventHubsConnectionString = "MyConnectionString";
        const string EventHub = "eventhubdemo";

        public static async Task Main(string[] args)
        {
            Console.WriteLine("Type the number of events that you want to publish -> ");
            int numberOfEvent = int.Parse(Console.ReadLine());

            await using var eventHubProducerClient = new EventHubProducerClient(EventHubsConnectionString, EventHub);

            // Group the events into a single batch and send it in one call.
            using EventDataBatch eventDataBatch = await eventHubProducerClient.CreateBatchAsync();
            for (int i = 0; i < numberOfEvent; i++)
            {
                string eventBody = $"Demo Message {i}";
                eventDataBatch.TryAdd(new EventData(Encoding.UTF8.GetBytes(eventBody)));
            }
            await eventHubProducerClient.SendAsync(eventDataBatch);
        }
    }
}


As we can see from the metrics section of the overview of the EventHubs-Demo namespace, we actually sent five messages.
Let’s create an application that will consume these events. In addition to installing the Azure.Messaging.EventHubs NuGet package, we must also add Azure.Messaging.EventHubs.Processor, as we will instantiate an EventProcessorClient. To be resilient, this type of client requires a storage account in which it saves checkpoints for the events processed within each partition, so that processing can be resumed from that point.
So let’s create a Storage Account on Azure:

Once we have chosen the name, we can continue with the basic settings and create the Storage Account.

We then create a container from the Blob service menu, which we will call eventhubcontainer. Azure Blob Storage is a type of storage for unstructured data; this container will be used by the EventProcessorClient mentioned earlier.
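If you prefer to create the container from code rather than from the portal, a minimal sketch with the Azure.Storage.Blobs package (the connection string is a placeholder) could be:

using System.Threading.Tasks;
using Azure.Storage.Blobs;

class BlobContainerSketch
{
    static async Task EnsureContainer(string storageConnectionString)
    {
        // The same container that the EventProcessorClient will use for its checkpoints.
        var containerClient = new BlobContainerClient(storageConnectionString, "eventhubcontainer");
        await containerClient.CreateIfNotExistsAsync();
    }
}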
To create an instance of the EventProcessorClient we must also provide the consumer group as a parameter (in this case the default one), the connection string of the Event Hubs namespace, and the name of the event hub.
Before processing can start, we must register handlers for the ProcessEventAsync and ProcessErrorAsync events: the first handles events received from the event hub, the second handles errors.
With StartProcessingAsync() we start the processing, and we then wait a few seconds so that the processor has time to receive and handle the events before the program exits.
Finally, we stop processing with the StopProcessingAsync() method.
using System;
using System.Text;
using System.Threading.Tasks;
using Azure.Messaging.EventHubs;
using Azure.Messaging.EventHubs.Consumer;
using Azure.Messaging.EventHubs.Processor;
using Azure.Storage.Blobs;

namespace EventHubsReceiver
{
    class Program
    {
        // Replace with your Event Hubs namespace and Blob Storage connection strings.
        const string EventHubsConnectionString = "EventHubConnectionString";
        const string EventHub = "eventhubdemo";
        const string AzureBlobStorageConnectionString = "AzureBlobStorageConnectionString";
        const string AzureBlobStorageContainer = "eventhubcontainer";

        public static async Task Main(string[] args)
        {
            string consumerGroup = EventHubConsumerClient.DefaultConsumerGroupName;

            // The blob container is used by the processor to store checkpoints and partition ownership.
            BlobContainerClient blobContainerClient = new BlobContainerClient(AzureBlobStorageConnectionString, AzureBlobStorageContainer);
            EventProcessorClient eventProcessorClient = new EventProcessorClient(blobContainerClient, consumerGroup, EventHubsConnectionString, EventHub);

            eventProcessorClient.ProcessEventAsync += ProcessingEvent;
            eventProcessorClient.ProcessErrorAsync += ProcessingError;

            await eventProcessorClient.StartProcessingAsync();

            // Give the processor a few seconds to receive and handle the events.
            await Task.Delay(TimeSpan.FromSeconds(10));
            await eventProcessorClient.StopProcessingAsync();
        }

        private static Task ProcessingEvent(ProcessEventArgs processEventArgs)
        {
            Console.WriteLine("Received -> " + Encoding.UTF8.GetString(processEventArgs.Data.Body.ToArray()));
            return Task.CompletedTask;
        }

        private static Task ProcessingError(ProcessErrorEventArgs processErrorEventArgs)
        {
            Console.WriteLine(processErrorEventArgs.Exception.Message);
            return Task.CompletedTask;
        }
    }
}
The result we get is the following:


We have indeed consumed the previously produced messages. In addition, if we look at the blob container created earlier, we notice the checkpoint data related to the processing of the events.
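The receiver above never writes a checkpoint explicitly; a common variation, sketched here as an alternative ProcessEventAsync handler (not part of the code above), is to call UpdateCheckpointAsync after handling each event, so that a restarted processor resumes from the last processed position.

using System;
using System.Text;
using System.Threading.Tasks;
using Azure.Messaging.EventHubs.Processor;

class CheckpointingSketch
{
    // Alternative ProcessEventAsync handler that records a checkpoint after each event.
    private static async Task ProcessingEventWithCheckpoint(ProcessEventArgs processEventArgs)
    {
        Console.WriteLine("Received -> " + Encoding.UTF8.GetString(processEventArgs.Data.Body.ToArray()));

        // Persists the current position to the blob container used by the EventProcessorClient.
        await processEventArgs.UpdateCheckpointAsync();
    }
}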

Azure Event Grid
Event Grid is an event routing service that allows you to build event-based architectures using a publish/subscribe communication model.
Event Grid can receive events from various Azure resources or from custom topics (the event sources). These events are then routed to event handlers that manage or process them.

In addition to the features mentioned, it supports a large number of subscribers and makes it possible to filter events, for example by type.
Let’s see how to create a publish/subscribe communication with Event Grid.
From the Azure portal, we create a new Event Grid Topic and call it EventGridDemo.


Unlike the other two services, with Event Grid we will start from the subscriber, which will be an Azure Function deployed on Azure. Azure Functions let you run code without worrying about the infrastructure, enabling serverless applications. Moreover, functions are event-driven, as they are triggered by the occurrence of different types of events.
So let’s create a new project in Visual Studio and choose Azure Functions as the template. Once the name of the application has been chosen, we are asked to select the type of trigger that will activate the function. In this example, we use an HTTP trigger.

We create an EventGridSubscriber and register a mapping between the custom event type Demo.Message and our DemoEvent class. We then use the DeserializeEventGridEvents() method to deserialize the Event Grid events contained in the request.
Next, we iterate over each event and, when we encounter one whose data is of type DemoEvent, we log its EventMessage. Once the events have been processed, we return an HTTP 200 OK.
It is important to return a SubscriptionValidationResponse for events whose data is of type SubscriptionValidationEventData, in order to complete the event subscription validation handshake.
using System.Net;
using System.Net.Http;
using System.Threading.Tasks;
using Microsoft.Azure.EventGrid;
using Microsoft.Azure.EventGrid.Models;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Microsoft.Azure.WebJobs.Host;
using Newtonsoft.Json;

namespace EventGridDemoSubscriber
{
    // The shape of the custom event payload published to the topic.
    class DemoEvent
    {
        [JsonProperty(PropertyName = "eventMessage")]
        public string EventMessage { get; set; }
    }

    public static class Function1
    {
        [FunctionName("Function1")]
        public static async Task<HttpResponseMessage> Run(
            [HttpTrigger(AuthorizationLevel.Function, "get", "post", Route = null)] HttpRequestMessage req,
            TraceWriter log)
        {
            const string CustomTopicEvent = "Demo.Message";
            log.Info("C# HTTP trigger function processed a request.");

            string requestEvent = await req.Content.ReadAsStringAsync();

            // Map the custom event type to the DemoEvent class so its data is deserialized correctly.
            EventGridSubscriber eventGridSubscriber = new EventGridSubscriber();
            eventGridSubscriber.AddOrUpdateCustomEventMapping(CustomTopicEvent, typeof(DemoEvent));
            EventGridEvent[] eventGridEvents = eventGridSubscriber.DeserializeEventGridEvents(requestEvent);

            foreach (EventGridEvent eventGridEvent in eventGridEvents)
            {
                if (eventGridEvent.Data is SubscriptionValidationEventData)
                {
                    // Complete the subscription validation handshake by echoing back the validation code.
                    var eventData = (SubscriptionValidationEventData)eventGridEvent.Data;
                    var responseData = new SubscriptionValidationResponse()
                    {
                        ValidationResponse = eventData.ValidationCode
                    };
                    return req.CreateResponse(HttpStatusCode.OK, responseData);
                }
                else if (eventGridEvent.Data is DemoEvent)
                {
                    var eventData = (DemoEvent)eventGridEvent.Data;
                    log.Info("Received -> " + eventData.EventMessage);
                }
            }
            return req.CreateResponse(HttpStatusCode.OK);
        }
    }
}
In Solution Explorer, we right-click on the project and choose the Publish option so that we can publish our Azure Function to Azure.

Once the function has been published to Azure, we go to the Event Grid Topic created previously and create an Event Subscription.

We choose a name and select WebHook as the endpoint type, i.e., an HTTP endpoint, which corresponds to the URL of the Azure Function published earlier.

We have now created a subscription that we can see on the Event Grid Topic overview page.

We create a console application that will be our publisher. We instantiate an EventGridClient and call its PublishEventsAsync method, which takes the hostname of the topic endpoint and a list of EventGridEvent objects (here built by a helper method) as parameters.
using System;
using System.Collections.Generic;
using Microsoft.Azure.EventGrid;
using Microsoft.Azure.EventGrid.Models;
using Newtonsoft.Json;

namespace EventGridDemoPublisher
{
    // The shape of the custom event payload, matching the one expected by the subscriber.
    class DemoEvent
    {
        [JsonProperty(PropertyName = "eventMessage")]
        public string EventMessage { get; set; }
    }

    class Program
    {
        static void Main(string[] args)
        {
            // Replace with the endpoint and access key of your Event Grid topic.
            string topicEndpoint = "EndpointAddress";
            string topicKey = "topickey";

            string topicEndpointHost = new Uri(topicEndpoint).Host;
            TopicCredentials topicCredentials = new TopicCredentials(topicKey);
            EventGridClient eventGridClient = new EventGridClient(topicCredentials);

            eventGridClient.PublishEventsAsync(topicEndpointHost, CreateEventList()).GetAwaiter().GetResult();

            Console.Write("Published events to Event Grid topic.");
            Console.ReadLine();
        }

        static IList<EventGridEvent> CreateEventList()
        {
            Console.WriteLine("Type the number of events that you want to publish -> ");
            int numberOfEvent = int.Parse(Console.ReadLine());

            List<EventGridEvent> eventGridEventsList = new List<EventGridEvent>();
            for (int i = 0; i < numberOfEvent; i++)
            {
                eventGridEventsList.Add(new EventGridEvent()
                {
                    Id = Guid.NewGuid().ToString(),
                    EventType = "Demo.Message",
                    Data = new DemoEvent()
                    {
                        EventMessage = $"Demo message {i}"
                    },
                    EventTime = DateTime.Now
                });
            }
            return eventGridEventsList;
        }
    }
}
Let’s launch the Publisher:

In the same way we launch the Azure Function:

As we can see, the five messages published to the Event Grid Topic have been received.

Conclusions
Our overview of message and event management in Azure ends here. We have only scratched the surface, but my aim was to give you the basic information to choose the service that best suits your needs and to start from there. I suggest you try all three services at least once, perhaps with the code used in the examples, which you can find here: https://github.com/intersect88/AzureMessagingService
See you in the next article!