facebook

Blog

Resta aggiornato

Vediamo come Rebus, oltre ad astrarci dal trasporto, ci permetta di gestire processi sulla base dei messaggi sul bus
Orchestrare gli eventi con Rebus
mercoledì 25 Dicembre 2019

Nel precedente articolo, abbiamo visto come l’utilizzo di un service bus possa apportare benefici in sistemi software dove è necessario il disaccoppiamento tra le parti che li compongono. In particolare, abbiamo parlato di Rebus.

Rebus mette a disposizione una funzionalità molto interessante: poter orchestrare processi che hanno una lunga durata (long running process), e la cui evoluzione non è sequenziale, ma basata sullo scatenarsi di eventi.
Per questo tipo di processi, esiste in letteratura il Process Manager Pattern.

Il process manager viene attivato da un evento (trigger message) ed opera come unità centrale di controllo, gestendo lo stato della sequenza di step che compongono il processo e decidendo le azioni da intraprendere in base a risultati intermedi.

Con Rebus, è possibile modellare scenari del genere, in quanto mette a disposizione le cosiddette Sagas. Saga è solo un altro nome per chiamare un Process Manager e quindi una saga non è altro che l’implementazione di una macchina a stati le cui transizioni tra stati sono determinate da messaggi.

Per ottenere tale implementazione abbiamo bisogno di:

  • Una rappresentazione dello stato della saga;
  • Un particolare handler che rappresenti un modello delle transizioni e azioni che costituiscano la saga stessa;
  • Una mappatura tra i campi dei messaggi e quelli che rappresentano lo stato della saga.

Per capire meglio questi aspetti, è opportuno cimentarsi con un esempio concreto. Ho pensato ad un caso d’uso molto semplice: l’acquisto di un prodotto.

Avremo un client (OrderClient), attraverso il quale un utente può scegliere un prodotto da ordinare e confermarne l’acquisto. La conferma dell’acquisto coinvolge un servizio di pagamento (PaymentService). Il processo di acquisizione del prodotto termina quando un terzo servizio, OrderHandler, riceve informazioni sull’ordine e sulla conferma di pagamento.

Partiamo da OrderClient.
La sua implementazione non si discosta molto dal Sender visto nell’articolo precedente.

using (var activator = new BuiltinHandlerActivator())
{
   var sagaDbConnectionString = ConfigurationManager.ConnectionStrings["SagasDB"].ToString();
   var bus = Configure.With(activator)
         .Transport(t = > t.UseMsmqAsOneWayClient())
         .Subscriptions(s => s.StoreInSqlServer(sagaDbConnectionString, "SubscriptionsTable", isCentralized: true))
         .Start();

Questa volta, però, aggiungiamo alle configurazioni del bus anche quella che consente ad un publisher di memorizzare le sottoscrizioni agli eventi da esso pubblicati.

In questo esempio, abbiamo scelto di memorizzare le sottoscrizioni con SQL Server (ricordate di aggiungere il pacchetto Rebus.SqlServer alla Solution) e di utilizzare uno storage centralizzato, per fare in modo che possa essere comune tra il publisher e i suoi subscribers.

Terminata la configurazione, OrderClient non fa altro che pubblicare dei messaggi sul bus a seconda degli input dell’utente. In particolare, ne saranno pubblicati due tipi:

  • bus.Publish(new Order(orderId, orderType)).Wait();
    Dove orderId è un intero generato casualmente (in seguito ne capiremo l’importanza) e orderType è una stringa che rappresenta il tipo di ordine;
  • bus.Publish(new OrderConfirmation(orderId, confirmation)).Wait();
    Se l’utente sceglie di confermare l’ordine, viene inviato questo messaggio con lo stesso orderId e con il parametro confirmation, un booleano che porta con sé la scelta dell’utente di confermare o meno l’ordine.

Dal log è possibile vedere inoltre che il messaggio di tipo Order è indirizzato a OrderHandler mentre il messaggio di tipo OrderConfirmation a PaymentService.

Anche l’implementazione del PaymentService è molto semplice.

namespace PaymentService
{
    class Program
    {
        static void Main(string[] args)
        {
            using (var activator = new BuiltinHandlerActivator())
            {
                var db = ConfigurationManager.ConnectionStrings["SagasDB"].ToString();
                activator.Register((bus, context) => new Handler(bus));
 
                Configure.With(activator)
                    .Transport(t => t.UseMsmq("PaymentService"))
                    .Subscriptions(s => s.StoreInSqlServer(db, "SubscriptionsTable", isCentralized: true))
                    .Start();
 
                activator.Bus.Subscribe <OrderConfirmation>().Wait();
 
                Console.WriteLine("Press [enter] to exit.");
                Console.ReadLine();
            }
        }
    }
 
    class Handler : IHandleMessages <OrderConfirmation>
    {
        readonly IBus _bus;
 
        public Handler(IBus bus)
        {
            _bus = bus;
        }
 
        public async Task Handle(OrderConfirmation message)
        {
            if (message.Confirmation)
                Console.WriteLine("Payment accepted");
            var payment = true;
            _bus.Publish(new OrderPayment(message.Id, payment)).Wait();
        }
    }
}

Dopo aver configurato il bus, effettuiamo la sottoscrizione ai messaggi di tipo OrderConfirmation. L’handler di tali messaggi si occupa inoltre di pubblicare un messaggio di tipo OrderPayment a cui sarà sottoscritto OrderHandler.

Veniamo infine alla parte più interessante in quanto OrderHandler conterrà l’implementazione della saga.

namespace OrderHandler
{
    class Program
    {
        static void Main(string[] args)
        {
            var sagaDbConnectionString = ConfigurationManager.ConnectionStrings["SagasDB"].ToString();
 
            using (var activator = new BuiltinHandlerActivator())
            {
                activator.Register((bus, context) => new DemoSaga(bus));
 
                Configure.With(activator)
                    .Subscriptions(s => s.StoreInSqlServer(sagaDbConnectionString, "SubscriptionsTable", isCentralized: true))
                    .Sagas(s => s.StoreInSqlServer(sagaDbConnectionString, "Sagas", "SagaIndex"))
                    .Transport(t => t.UseMsmq("OrderHandler"))
                    .Start();
 
                activator.Bus.Subscribe <Order>().Wait();
                activator.Bus.Subscribe <OrderPayment>().Wait();
 
                Console.WriteLine("Press ENTER to quit");
                Console.ReadLine();
            }
        }
    }
}

Abbiamo aggiunto un’ulteriore configurazione del bus, dove andiamo a impostare dove andranno salvate le informazioni della saga.

Sagas(s => s.StoreInSqlServer(sagaDbConnectionString, "Sagas", "SagaIndex"))

Successivamente, sottoscriviamo OrderHandler ai messaggi di tipo Order e OrderPayment.
Ma com’è fatto l’handler di tali messaggi DemoSaga?

   class DemoSaga : Saga <DemoSagaData>,
        IAmInitiatedBy <Order>, IAmInitiatedBy <OrderPayment>
    {
...        

Nella sua dichiarazione, vediamo che DemoSaga implementa la classe astratta Saga che viene resa concreta dalla definizione del tipo DemoSagaData.

class DemoSagaData : ISagaData
    {
         
        public Guid Id { get; set ; }
        public int Revision { get; set; }
 
        public int OrderId { get; set; }
        public string OrderType { get; set; }
        public bool OrderReceived { get; set; }
        public bool PaymentReceived { get; set; }
        public bool Complete()
        {
            return OrderReceived && PaymentReceived;
        }
    }

DemoSagaData implementa l’interfaccia ISagaData: in questo modo andiamo a rappresentare lo stato della saga.

Inoltre, sempre nella dichiarazione, vediamo che DemoSaga implementa anche IAmInitiatedBy<TMessage> un’interfaccia che consente di indicare ad un handler di messaggi quali tipi hanno il permesso di iniziare una nuova saga.

protected override void
CorrelateMessages(ICorrelationConfig<DemoSagaData> config)
        {
            config.Correlate<Order>(m => m.Id, d => d.OrderId);
            config.Correlate<OrderPayment> (m => m.Id, d => d.OrderId);
        }

L’override del metodo CorrelateMessages (ICorrelationConfig<TSagaData> config) ci consentirà di correlare i messaggi in ingresso con lo stato della saga, per evitare di avere sue istanze multiple. In questo esempio, abbiamo usato orderId come ID di correlazione tra messaggi e saga.

        public async Task Handle(Order message)
        {
            if (!IsNew) return;
 
            Data.OrderType = message.Type;
            Data.OrderReceived = true;
 
            Console.WriteLine(@"
Order Received -> Type: " + message.Type);
 
            await CompleteSaga();
        }
 
        public async Task Handle(OrderPayment message)
        {
            if (IsNew) return;
 
            Data.PaymentReceived = message.Payment;
             
            Console.WriteLine(@"
Payment Received!
");
            await CompleteSaga();
 
        }
 
        async Task CompleteSaga()
        {
            if (Data.Complete())
            {
                Console.WriteLine(@"
Order Complete
");
                MarkAsComplete();
            }
        }

Successivamente, implementiamo i metodi Handle() per i due messaggi in ingresso. L’utilizzo di IsNew, un parametro della classe astratta Saga, consente di verificare se stiamo utilizzando un’unica istanza della saga per l’orderId corrente.

Dopo aver salvato le informazioni ricevute dai messaggi nello stato della saga, viene chiamato il metodo CompleteSaga() che verifica la ricezione dell’ ordine e il buon fine del pagamento. Se l’esito dell’ordine è positivo, viene invocato il metodo MarkAsComplete() che sancisce la fine della saga andando ad eliminare le relative informazioni dallo storage scelto.

Nell’immagine sono presenti, appunto, le tabelle Sagas e SagaIndex i cui dati verranno rimossi al completamento della saga e mostriamo inoltre la tabella SubscriptionTable dove sono mappati i messaggi con i relativi subscribers.

Spero che questo articolo vi abbia incuriosito com’è stato per me trattare questo argomento.
Lascio il link del repository dove potete trovare l’implementazione dell’esempio nel branch sagas ( https://github.com/intersect88/RebusDemo/tree/sagas ).

Al prossimo articolo.

E che la saga continui…