facebook

Blog

Resta aggiornato

Astraiamo il trasporto e creiamo applicazioni a microservizi che interagiscono in maniera asincrona
Mass Transit: un caso d’uso reale
martedì 25 Maggio 2021
mass transit

In questo ultimo anno e mezzo, ho avuto l’opportunità di lavorare per un cliente che ha un’architettura fortemente orientata ai microservizi per supportare il proprio sistema composto da svariate applicazione e servizi tra loro indipendenti, che interoperano in maniera asincrona sfruttando un Service Bus.

Un Service Bus rappresenta un servizio di integrazione in grado di permettere l’interoperabilità tra componenti eterogenei dislocati in un sistema distribuito. Quando un’applicazione necessita di inviare un messaggio, lo farà semplicemente utilizzando un apposito metodo del Service Bus che, internamente, si occuperà di dirottarlo verso uno o più endpoints in grado di gestirlo. Dal punto di vista dell’applicazione che invece deve gestire il messaggio, il Service Bus dovrà essere in grado di prelevare i messaggi e innescare dei processi atti a elaborarli e, in caso di messaggio gestito correttamente, segnalare al broker che il messaggio può essere cancellato; oppure, in caso l’elaborazione non vada a buon fine, in base alla configurazione delle policy di gestione degli errori, applicare delle strategie per la rielaborazione del messaggio.

Il cliente in questione usa come implementazione di Service Bus il framework MassTransit. Nello specifico, nei progetti su cui abbiamo lavorato, lo abbiamo utilizzato per integrare un’app web con un windows service affinché alcune operazioni molto lente venissero gestite da quest’ultimo senza impattare sulla web application.

Questo framework ci offre, oltre alle funzionalità di base che ogni Service Bus dovrebbe mettere a disposizione, altre importanti funzionalità come:

  • Gestione di eccezioni, retries e messaggi poisoned in modo trasparente;
  • Consumatori concorrenti (competing consumer pattern);
  • Gestione delle saghe.

Per un elenco completo, vi consiglio di dare un’occhiata a questo link.

Un caso d’uso reale

Nell’esempio che vi propongo, avremo una web app implementata con Razor Pages, che darà la possibilità all’utente di creare dei meetings. Creato il meeting, vogliamo inviare delle emails e delle notifiche SMS agli invitati, ma vogliamo che queste operazioni non impattino sulla web app e che vengano invece effettuate da servizi creati appositamente per questo tipo di task. La web application invierà un messaggio sfruttando MassTransit. Sarà poi compito del framework instradare il messaggio attraverso l’infrastruttura di trasporto (Azure Service Bus, Amazon SQS, RabbitMQ). I servizi che consumeranno questi messaggi saranno due applicazioni console: una simulerà l’invio di emails e l’altra l’invio di sms. Attraverso MassTransit, creeremo quindi due consumers che potranno gestire il tipo di messaggio pubblicato dalla web app. Nell’articolo troverete solamente il codice relativo a MassTransit, verrà quindi omesso il codice relativo alla UI; inoltre, dato che i consumers saranno pressoché simili, ci focalizzeremo solamente sul codice dell’EmailSender. Potete trovare però il codice completo sul mio repository.

Web Application (Publisher)

La nostra web app sarà responsabile dell’invio di un messaggio attraverso il nostro service bus.

L’istanza del bus verrà iniettata nel costruttore e sarà di tipo IBus. Questa interfaccia contiene le definizioni dei metodi necessari per poter inviare messaggi attraverso il bus.

public IndexModel(IBus bus){    _bus = bus;} 

Dopo aver cliccato un bottone della UI, verrà triggerato un handler method che invia il messagio.

public async Task<IActionResult> OnPostCreateMeetingAsync()
{
        var message = new MeetingCreatedMessage(Title, Creator, Participants?.Split(','), StartsOn, EndsOn); 
        await SendMessage(message);    
        return Page();
}

private async Task SendMessage(MeetingCreatedMessage message)
{
        try
        {
                await _bus.Publish(message);
                MessageSent = true;
        }
        catch
        {
                MessageSent = false;    
        }
}

Il messaggio che spediremo sarà di tipo MeetingCreatedMessage e verrà pubblicato; ciò significa che utilizzeremo il modello di comunicazione publish/subscibe, ideale per il nostro caso d’uso, poiché il messaggio avrà più consumers che dovranno gestirlo.

EmailSender Console Application (Consumer1)

La nostra prima console application, sarà il consumer responsabile di “inviare” le mail ai partecipanti del meeting. In questo progetto avremo una classe che si occuperà del consumo del messaggio.

public class MeetingCreatedMessageConsumer : IConsumer<MeetingCreatedMessage> 
{  ...  }

Come vedete, la classe implementa IConsumer<MeetingCreatedMessage>, un’interfaccia di MassTransit che indica che il nostro consumer sarà in grado di elaborare i messaggi del type parameter specificato.

public async Task Consume(ConsumeContext<MeetingCreatedMessage> context)
{
        var meeting = context.Message; 
        
        if (IsMeetingAlreadyEnded(meeting))    
        {
                return;
        }
        
        var emails = CreateEmails(meeting);
        await SendEmails(emails);
}

Quando un messaggio sarà disponibile nella coda a cui l’applicazione è agganciata, il middleware di MassTransit, leggendone il tipo nei metadati, individuerà il consumer che può gestire il tipo di messaggio e richiamerà il metodo Consume.

Configuriamo MassTransit

Negli snippets precedenti, abbiamo visto come pubblicare e come consumare un messaggio con MassTransit, ma non è finita qui: ciò che abbiamo visto finora è il codice che sfrutta il middleware per pubblicare/consumare messaggi ed è il codice indipendente dalla tecnologia di trasporto che utilizziamo. Ciò che ci manca è configurare il middleware di MassTransit per specificare il tipo di trasporto, i suoi settaggi e gli endpoints.

Notate che per ogni trasporto che volete utilizzare, dovrete installare l’apposito Nuget package di Masstransit, altrimenti non troverete gli extension methods che vi permettono di costruire l’istanza del bus.

Trasporto con Azure Service Bus

Creiamo un gruppo di risorse e una risorsa di tipo Service Bus selezionando il piano Standard, essenziale per poter creare i topics e sfruttare il modello publish/subscribe.

Nella sezione delle shared access policies, selezioniamo RootManageSharedAccessKey, clicchiamo e copiamo la Primary Connection String che dovremo passare a MassTransit per accedere e gestire le risorse del bus.

public void ConfigureServices(IServiceCollection services)
{
        services.AddRazorPages();

        var bus = Bus.Factory.CreateUsingAzureServiceBus(cfg => 
        {
                cfg.Host(Configuration["AzureServiceBus:ConnectionString"]);
        });
        services.AddSingleton<IBus>(bus);
}

La creazione del bus è davvero semplice, lo costruiamo utilizzando il metodo CreateUsingAzureServiceBus e specificando la connection string precedentemente copiata.

static async Task Main()
{
        var emailSender = new FakeEmailSender();

        var bus = Bus.Factory.CreateUsingAzureServiceBus(cfg =>
        {
                cfg.Host(Configuration["AzureServiceBus:ConnectionString"]);         
                cfg.ReceiveEndpoint(Configuration["AzureServiceBus:Queue"], endpoint =>
                {
                        endpoint.Consumer(() => new MeetingCreatedMessageConsumer(emailSender));
                });
        });
        await bus.StartAsync();
        Console.ReadKey();
        await bus.StopAsync();
} 

La configurazione del bus lato consumer è simile ma in aggiunta vanno specificati gli endpoints. Questi ultimi rappresentano l’astrazione di un percorso dal quale i messaggi verranno letti dalla nostra app. Con la definizione di un endpoint, se non esiste già, verrà creata una coda.

Specificare l’endpoint non basta: dobbiamo registrare i consumers che verranno richiamati alla ricezione dei messaggi. Quest’ultima operazione creerà, se non esiste già, un topic che avrà lo stesso nome del messaggio che il consumer può gestire.

Mandiamo in esecuzione i progetti e vediamo cosa accade sul nostro bus.

Andando nella sezione Topics, vedremo che MassTransit ha creato un nuovo topic a cui sono associate due subscription, una per ogni coda che deve ricevere una copia del messaggio.

Andando nella sezione Subscriptions vediamo quali code sono collegate al topic.

Infine, nella sezione Queues possiamo vedere le nostre code.

Inviamo dei messaggi attraverso la web app, scegliamo una coda qualunque, e vediamo nella sezione delle metriche che il numero di messaggi in entrata e in uscita aumenta. Notate che le metriche non vengono aggiornate in real-time, quindi dovreste attendere qualche minuto prima di poter osservare l’incremento dei messaggi. Per accertarci che i messaggi arrivino, possiamo vedere sulla console dei consumer alcuni messaggi indicanti che la mail e gli SMS sono stati inviati. Questo vuol dire che i nostri consumers hanno ricevuto ed elaborato i messaggi arrivati sulle relative code.

Trasporto con Amazon SQS/SNS

Il primo passo consiste nel creare un utente nella sezione Identity and Access Management (IAM) e genereare le chiavi di accesso programmatiche.

Assegnamogli le autorizzazioni AmazonSQSFullAccess e AmazonSNSFullAccess e copiamo le chiavi di accesso.

public void ConfigureServices(IServiceCollection services)
{
        services.AddRazorPages();

        var bus = Bus.Factory.CreateUsingAmazonSqs(cfg =>
        {
                cfg.Host(Configuration["AmazonSqs:Region"], host =>
                {
                        host.AccessKey(Configuration["AmazonSqs:AccessKey"]);            
                        host.SecretKey(Configuration["AmazonSqs:SecretKey"]);
                });
        });
        services.AddSingleton<IBus>(bus);
} 
static async Task Main()
{
        var emailSender = new FakeEmailSender();
        
        var bus = Bus.Factory.CreateUsingAmazonSqs(cfg =>
        {
                cfg.Host(Configuration["AmazonSqs:Region"], host =>
                {
                        host.AccessKey(Configuration["AmazonSqs:AccessKey"]);            
                        host.SecretKey(Configuration["AmazonSqs:SecretKey"]);
                });

                cfg.ReceiveEndpoint(Configuration["AmazonSqs:Queue"], endpoint =>
                {
                        endpoint.Consumer(() => new MeetingCreatedMessageConsumer(emailSender));
                });
        });
        await bus.StartAsync();
        Console.ReadKey();
        await bus.StopAsync();
}

Anche in questo caso la creazione e configurazione del bus è davvero molto semplice sia per il publisher che per il consumer. La differenza è che usiamo un differente metodo per creare un bus per il trasporto AWS SQS/SNS.

Mandiamo ora in esecuzione i progetti.

Come nel caso di ASB, viene creato un topic con una subscription per ogni coda che dovrà ricevere una copia del messaggio.

Ecco le code:

Andiamo nel tab Monitoring di una coda qualunque e, inviati alcuni messaggi, potremo vedere come la metrica “Number Of Messages Received” salga. Come nel caso di ASB, le metriche non sono aggiornate in real-time e bisogna attendere un po’ prima di osservarle.

Trasporto con RabbitMQ

In quest’ultima sezione vedremo come utilizzare RabbitMQ, il più popolare message broker attualmente in circolazione.

public void ConfigureServices(IServiceCollection services)
{
        services.AddRazorPages();

        var bus = Bus.Factory.CreateUsingRabbitMq(cfg =>
        {
                cfg.Host(Configuration["RabbitMQ:Host"], host =>
                {
                        host.Username(Configuration["RabbitMQ:Username"]);            
                        host.Password(Configuration["RabbitMQ:Password"]);
                });
        });
        services.AddSingleton<IBus>(bus);
} 
static async Task Main()
{
        var emailSender = new FakeEmailSender();

        var bus = Bus.Factory.CreateUsingRabbitMq(cfg =>
        {
                cfg.Host(Configuration["RabbitMQ:Host"], host =>
                {
                        host.Username(Configuration["RabbitMQ:Username"]);            
                        host.Password(Configuration["RabbitMQ:Password"]);
                });

                cfg.ReceiveEndpoint(Configuration["RabbitMQ:Queue"], endpoint =>
                {
                        endpoint.Consumer(() => new MeetingCreatedMessageConsumer(emailSender));
                });
        });
        await bus.StartAsync();
        Console.ReadKey();
        await bus.StopAsync();
}

Come vedete negli snippets, la configurazione è simile a quelle viste in precedenza, La differenza sta nel metodo della factory utilizzato per costruire il bus e nel modo in cui configuriamo l’host.

Mandiamo anche questa volta in esecuzione i tre progetti.

Nel tab Exchanges, vedrete che MassTransit creerà un exchange che ha il nome del tipo di messaggio da inviare e due exchanges che hanno lo stesso nome della coda a cui sono rispettivamente collegati.

Ecco le due code. 

Inviando dei messaggi e cliccando su una coda qualunque, vedremo che la frequenza dei messaggi aumenta.

Conclusioni

Abbiamo visto come MassTransit ci dia la possibilità di creare applicazioni che interoperano tra loro attraverso un Service Bus, in maniera agnostica rispetto al trasporto utilizzato, rendendo possibile in maniera trasparente intercambiare le diverse tecnologie di trasporto senza apportare quasi alcuna modifica al codice. Questo è solo uno dei tanti vantaggi che MassTransit ci permette di sfruttare, per approfondimenti vi consiglio di andare al sito ufficiale del progetto.

Se volete conoscere un’alternativa a MassTransit, vi consiglio di dare un’occhiata a questo articolo scritto da Genny che parla di Rebus.

Spero che la lettura sia stata di vostro gradimento e che vi possa essere utile.

Alla prossima! 

Scritto da

Claudio Manniti