facebook

Blog

Stay updated

Let's discover how Rebus, besides abstracting from transport, let us managing processes based on bus messages
Orchestrate events with Rebus
Wednesday, December 25, 2019

In the previous article, we saw how the use of a service bus could benefit software systems where the decoupling between their parts is necessary. In particular, we have seen how to do that with Rebus.

Rebus makes available a very interesting feature: the ability to orchestrate processes with a long duration (long-running process), and whose evolution is not sequential but based on the unleashing of events.
For this type of process, in literature, we can find the Process Manager Pattern.

The process manager is triggered by an event (trigger message) and acts as a central control unit managing the status of the sequence of steps of the process and deciding the actions to be taken based on intermediate results.

With Rebus, it is possible to model such scenarios as it provides the so-called Sagas. Saga is just another name for Process Manager, and so a saga is nothing but the implementation of a state machine whose transitions between states are determined by messages.

To achieve this implementation, we need:

  • A representation of the state of the saga
  • A particular handler that represents a model of the transitions and actions that constitute the saga itself
  • A mapping between the message fields and those representing the status of the saga.

A concrete example is the best way to understand these aspects better. To show the use of the sagas, I imagine a simple use case: the purchase of a product.

We have a client (Order Client) by which a user can choose a product to order and confirm the purchase. The purchase confirmation involves a payment service (PaymentService). The product acquisition process ends when a third service, OrderHandler, that receives information about the order and payment confirmation.

Let’s start with OrderClient.
Its implementation does not differ much from the Sender seen in the previous article.

using (var activator = new BuiltinHandlerActivator())
{
   var sagaDbConnectionString = ConfigurationManager.ConnectionStrings["SagasDB"].ToString();
   var bus = Configure.With(activator)
         .Transport(t = > t.UseMsmqAsOneWayClient())
         .Subscriptions(s => s.StoreInSqlServer(sagaDbConnectionString, "SubscriptionsTable", isCentralized: true))
         .Start();

This time we add to the bus configurations also the one that allows a publisher to store the subscriptions to the events published by it.

In this example, we choose to store subscriptions with SQL Server (don’t forget to add the Rebus.SqlServer package to the Solution) and to use centralized storage to make it in common to both the publisher and its subscribers.

Once the configuration is completed, OrderClient just publishes messages on the bus depending on the user inputs. Two types of messages will be published:

  • bus.Publish (new Order (orderId, orderType)). Wait ();
    Where orderId is a randomly generated integer (we will understand its importance later) and orderType is a string that represents the type of order.
  • bus.Publish (new OrderConfirmation (orderId, confirmation)). Wait ();
    If the user chooses to confirm the order, this message is sent with the same orderId and with the confirmation parameter, a boolean that brings with it the user’s choice to confirm or not the order.

From the log, it is also possible to see that the message of type Order is addressed to OrderHandler and the message of type OrderConfirmation to PaymentService.

The implementation of the PaymentService is also very simple.

namespace PaymentService
{
    class Program
    {
        static void Main(string[] args)
        {
            using (var activator = new BuiltinHandlerActivator())
            {
                var db = ConfigurationManager.ConnectionStrings["SagasDB"].ToString();
                activator.Register((bus, context) => new Handler(bus));
 
                Configure.With(activator)
                    .Transport(t => t.UseMsmq("PaymentService"))
                    .Subscriptions(s => s.StoreInSqlServer(db, "SubscriptionsTable", isCentralized: true))
                    .Start();
 
                activator.Bus.Subscribe <OrderConfirmation>().Wait();
 
                Console.WriteLine("Press [enter] to exit.");
                Console.ReadLine();
            }
        }
    }
 
    class Handler : IHandleMessages <OrderConfirmation>
    {
        readonly IBus _bus;
 
        public Handler(IBus bus)
        {
            _bus = bus;
        }
 
        public async Task Handle(OrderConfirmation message)
        {
            if (message.Confirmation)
                Console.WriteLine("Payment accepted");
            var payment = true;
            _bus.Publish(new OrderPayment(message.Id, payment)).Wait();
        }
    }
}

Once we configure the bus, we subscribe to messages of type OrderConfirmation. The handler of these messages also publishes an OrderPayment message to which OrderHandler will be subscribed.

Finally, we arrive to the most interesting part, as OrderHandler will contain the implementation of the saga.

namespace OrderHandler
{
    class Program
    {
        static void Main(string[] args)
        {
            var sagaDbConnectionString = ConfigurationManager.ConnectionStrings["SagasDB"].ToString();
 
            using (var activator = new BuiltinHandlerActivator())
            {
                activator.Register((bus, context) => new DemoSaga(bus));
 
                Configure.With(activator)
                    .Subscriptions(s => s.StoreInSqlServer(sagaDbConnectionString, "SubscriptionsTable", isCentralized: true))
                    .Sagas(s => s.StoreInSqlServer(sagaDbConnectionString, "Sagas", "SagaIndex"))
                    .Transport(t => t.UseMsmq("OrderHandler"))
                    .Start();
 
                activator.Bus.Subscribe <Order>().Wait();
                activator.Bus.Subscribe <OrderPayment>().Wait();
 
                Console.WriteLine("Press ENTER to quit");
                Console.ReadLine();
            }
        }
    }
}

We added a further configuration of the bus where we are going to set where the information of the saga will be saved.

Sagas(s => s.StoreInSqlServer(sagaDbConnectionString, "Sagas", "SagaIndex"))

Then we subscribe to OrderHandler messages of type Order and OrderPayment.
But how is the DemoSaga handler made?

   class DemoSaga : Saga <DemoSagaData>,
        IAmInitiatedBy <Order>, IAmInitiatedBy <OrderPayment>
    {
...        

In his statement, we see that DemoSaga implements the abstract class Saga, which is made concrete by the definition of the type DemoSagaData.

class DemoSagaData : ISagaData
    {
         
        public Guid Id { get; set ; }
        public int Revision { get; set; }
 
        public int OrderId { get; set; }
        public string OrderType { get; set; }
        public bool OrderReceived { get; set; }
        public bool PaymentReceived { get; set; }
        public bool Complete()
        {
            return OrderReceived && PaymentReceived;
        }
    }

DemoSagaData implements the ISagaData interface. In this way, we are going to represent the state of the saga.

Moreover, in the declaration, we see that DemoSaga also implements IAmInitiatedBy<TMessage> an interface that allows indicating to a handler of messages which types are allowed to start a new saga.

protected override void
CorrelateMessages(ICorrelationConfig<DemoSagaData> config)
        {
            config.Correlate<Order>(m => m.Id, d => d.OrderId);
            config.Correlate<OrderPayment> (m => m.Id, d => d.OrderId);
        }

We make the override of the CorrelateMessages method (ICorrelationConfig<TSagaData> config), which allows us to correlate incoming messages with the status of the saga to avoid having multiple instances of the saga. In this example, we used ordered as the correlation ID between messages and saga.

        public async Task Handle(Order message)
        {
            if (!IsNew) return;
 
            Data.OrderType = message.Type;
            Data.OrderReceived = true;
 
            Console.WriteLine(@"
Order Received -> Type: " + message.Type);
 
            await CompleteSaga();
        }
 
        public async Task Handle(OrderPayment message)
        {
            if (IsNew) return;
 
            Data.PaymentReceived = message.Payment;
             
            Console.WriteLine(@"
Payment Received!
");
            await CompleteSaga();
 
        }
 
        async Task CompleteSaga()
        {
            if (Data.Complete())
            {
                Console.WriteLine(@"
Order Complete
");
                MarkAsComplete();
            }
        }

Then we implement the Handle() methods for the two incoming messages. The use of IsNew, a parameter of the abstract class Saga, allows us to verify if we are using a new saga. Therefore, once again, we are going to guarantee that we are using a single instance of the saga for the current orderId.

After saving the information received from the messages in the status of the saga, the CompleteSaga () method is called which verifies if we have received the order and if the payment was successful. If the outcome of the order is positive, the MarkAsComplete () method is invoked, which establishes the end of the saga by going to delete the related information from the chosen storage.

In the image, there are the Sagas and SagaIndex tables whose data will be removed upon completion of the saga, and we also show the SubscriptionTable table where messages with their subscribers are mapped.

I hope this article has intrigued you as it was for me to discuss this topic. I leave the link of the repository where you can find the implementation of the example in the sagas branch( https://github.com/intersect88/RebusDemo/tree/sagas ).

To the next article.

Let the saga continue…