facebook

Blog

Resta aggiornato

Vediamo come implementare Actor Model in .NET
Scalabilità e resilienza con gli Akka Actors
mercoledì 10 Marzo 2021

Un nostro cliente, operante nell’ambito del gaming, ci ha commissionato una piattaforma per la gestione di un campionato di calcio virtuale, con classifiche sia di squadra che individuali.

Dopo aver analizzato il problema, ci siamo chiesti come realizzare un sistema che tenesse aggiornate le classifiche e le statistiche a fronte dei risultati di singole partite (o magari, ad esempio, del gol o dell’ammonizione di un calciatore), mantenendo il sistema scalabile e resiliente.

Una delle soluzioni migliori che abbiamo trovato è stata utilizzare gli Akka Actors, disponibili su ambiente .NET tramite il pacchetto Akka.NET (https://getakka.net/articles/intro/what-is-akka.html). Vengono utilizzati con particolare successo nei sistemi distribuiti e nelle architetture virtualizzate, in cui componenti possono talvolta bloccarsi senza dare alcuna risposta, i messaggi possono essere persi e c’è latenza di rete.

Per far fronte a queste esigenze, Akka.NET ci offre:

  • Comportamento multi-thread anche senza l’utilizzo di costrutti di concorrenza di basso livello come atomics o lock;
  • Comunicazione remota trasparente tra i sistemi e i loro componenti; 
  • Un’architettura clusterizzata, ad alta disponibilità, elastica, scalabile. 

Inoltre, vengono messi a disposizione degli sviluppatori pacchetti come Akka.Cluster e Akka.Cluster.Sharding, che consentono la configurazione di cluster per distribuire gli attori su più nodi.

Akka.NET si basa sul pattern Actor model (https://hal.archives-ouvertes.fr/hal-01163534v7/document) e fornisce un insieme di librerie per implementare correttamente questo pattern, in modo da ottenere un livello più alto di astrazione e rendere più facile la progettazione di sistemi concorrenti, paralleli e distribuiti.

Actor Model 

La principale caratteristica degli attori è quella di modellare il mondo come entità stateful che comunicano tra loro scambiandosi messaggi, per cui, l’Actor model fornisce un’astrazione che consente di progettare un’applicazione in termini di comunicazione tra i componenti. 

Come entità computazionali, gli attori hanno le seguenti caratteristiche: 

  1. Comunicano tra loro con messaggi asincroni
  2. Gestiscono il proprio stato 
  3. Quando rispondono a un messaggio, possono: 
    • Creare altri attori figli
    • Inviare messaggi ad altri attori 
    • Fermare sé stessi o gli attori figli 

Di conseguenza, nell’Actor model due attori non potranno mai condividere la stessa memoria, né modificare direttamente lo stato di un altro attore. Dovrà sempre mandare un messaggio all’attore che vuole modificare e sarà quindi il ricevente che si prenderà cura di recepire ed elaborare il messaggio.

Actor Model

Architettura applicazione 

Andiamo quindi a introdurre la nostra soluzione, con un occhio anche all’architettura degli Akka Actors.

Essa sarà composta da un applicazione front-end scritta in Angular, uno strato di Web API .NET Core actor based con supporto al long polling tramite l’utilizzo di un hub SignalR e, infine, un simulatore di eventi.

Application diagram

Quando si crea un attore in Akka.NET, questo ha sicuramente un genitore, poiché gli attori sono organizzati in una struttura ad albero, e ogni attore viene creato all’interno di un altro. In maniera gerarchica, l’attore creato diventa figlio dell’attore genitore da cui è stato creato.

Al vertice dell’albero, c’è un livello predefinito costituito da 3 attori: root guardian (/), genitore di tutti gli attori del sistema, che ha come figli user guardian (/user), che è genitore di tutti gli attori creati dall’utente, e il system guardian (/system), usato per supervisionare e assicurare la chiusura ordinata degli attori, preservando il logging degli eventi. I nomi degli attori di sistema contengono la parola guardian poiché essi fungono da supervisori per tutti i figli creati sotto di essi.

Akka tree

Nel nostro progetto abbiamo introdotto tre tipi di attori: due di primo livello, ovvero MatchesActor, che servirà per aggiornare i dati relativi a tutte le partite, e StandingsActor, per i dati della classifica, e uno di secondo livello, MatchActor, che servirà a gestire i dati della singola partita.

var firstRef = System.ActorOf(Props.Create(() => new MatchesActor()), "matches"); 

Console.WriteLine($"First: {firstRef}"); 

Che restituirà in output: 

First: Actor[akka://akkaActors/user/matches#1213311313] 

Nel nostro progetto, la configurazione degli attori di primo livello è stata realizzata nella classe ActorsEnvironment, in particolar modo nel metodo Configure, così realizzato: 

public static ActorsEnvironment Configure(IServiceProvider provider) 

{ 

        var hubContext = provider.GetService<IHubContext<Hub>>(); 

        var system = ActorSystem.Create("akkaActors"); 
 
        system.ActorOf(Props.Create(() => new MatchesActor()), "matches"); 
        system.ActorOf(Props.Create(() => new StandingsActor()), "standings"); 

        return new ActorsEnvironment(system); 

} 

Tale metodo viene richiamato nello Startup.cs, in ConfigureServices:

 services.AddSingleton<ActorsEnvironment>(context => ActorsEnvironment.CreateAndConfigure(context));

Quindi, nel nostro caso, l’attore di primo livello di tipo MatchesActor, avrà il path /user/matches.

Abbiamo inoltre definito delle costanti per i path dei nostri Actors:

public class ActorPaths 

{ 
        public const string MatchesActor = "/user/matches"; 
        public const string StandingsActor = "/user/standings"; 
        public const string SignalRActor = "/user/signalR"; 

        public const string AllActors = "/user/*/*"; 
 } 

Per creare attori di livello non superiore, invece, bisogna invocare Context.ActorOf() dall’interno dell’attore padre, come da codice seguente:

public class MatchActor : UntypedActor 
{ 
} 

public class MatchesActor : UntypedActor 
{ 

        protected override void OnReceive(object message) 
        { 
                switch (message) 
                { 
                        case "create-new": 
                        IActorRef secondRef = Context.ActorOf(Props.Create(() => new MatchActor()), $"first-match"); 
                        Console.WriteLine($"Second: {secondRef}"); 
                        break; 
                } 
        } 
} 

var firstRef = Sys.ActorOf(Props.Create<MatchesActor>(), "matches"); 
Console.WriteLine($"First: {firstRef}"); 
firstRef.Tell("create-new", ActorRefs.NoSender); 

In questo modo creiamo un attore di primo livello, ne stampiamo il riferimento, e gli inviamo un messaggio contenente l’id del figlio eventualmente da creare e di cui viene stampato il riferimento.

L’output generato è:

First : Actor[akka://akkaActors/user/matches#1213311313] 
Second: Actor[akka://akkaActors/user/matches/first-match#12261716] 

Ciclo di vita 

Un attore può essere stoppato tramite l’invocazione del metodo Context.Stop(actorRef), ma, come best practice, è preferibile inviare un messaggio all’attore che si intende stoppare e lasciare che sia lui a fermare sé stesso, tramite l’invocazione di Context.Stop(self).

Quando un attore viene stoppato, lo stesso avviene ricorsivamente per tutti i suoi figli, in modo da evitare perdita di risorse e semplificare la pulizia della memoria.

Le API degli actors permettono l’override dei metodi PreStart(), invocato dopo che l’attore è stato creato ma prima che legga il suo primo messaggio, e PostStop(), invocato dopo che l’attore è stato fermato.

Nel nostro caso:

public class MatchActor : ReceiveActor 
{ 
        protected override void PreStart() 
        { 
                Console.WriteLine($"Match actor {Self.Path.Name} started"); 
        } 

        protected override void PostStop() 
        { 
                Console.WriteLine($"Match actor {Self.Path.Name} stopped"); 
        } 

} 

public class MatchesActor : ReceiveActor 
{ 
        protected override void PreStart() 
        { 
                Console.WriteLine($"Matches actor {Self.Path.Name} started"); 
        } 

        protected override void PostStop() 
        { 
                Console.WriteLine($"Matches actor {Self.Path.Name} stopped"); 
        } 

        protected override void OnReceive(object message) 
        { 
                switch (message) 
                { 
                        case "create-new": 
                                IActorRef secondRef = Context.ActorOf(Props.Create(() => new MatchActor()), $"first-match"); 
                        
                                break;             
                        case "stop": 
                                Context.Stop(Self); 
                                break;             
                }   

        }
 
} 

var firstRef = Sys.ActorOf(Props.Create<MatchesActor>(), "matches"); 
firstRef.Tell("create-new", ActorRefs.NoSender); 
firstRef.Tell("stop"); 

L’output generato è: 

Matches actor matches started 
Match actor first-match started 
Match actor first-match stopped 
Matches actor matches stopped 

Da ciò si evince che l’invocazione del PostStop() per gli attori figli avviene prima di quella del padre.

Gestione degli errori 

Quando un attore va in errore, viene temporaneamente sospeso e l’informazione dell’errore viene propagata al genitore che deve decidere come gestirlo.  

Se non viene modificata la gestione degli errori, la strategia di default prevede di stoppare e far ripartire l’attore figlio, come si evince dall’esempio di seguito. 

public class MatchActor : ReceiveActor 
{ 
        protected override void PreStart() 
        { 
                Console.WriteLine($"Match actor {Self.Path.Name} started"); 
        } 

        protected override void PostStop() 
        { 
                Console.WriteLine($"Match actor {Self.Path.Name} stopped"); 
        } 

        protected override void OnReceive(object message) 
        { 
                switch (message) 
                { 
                        case "fail": 
                                Console.WriteLine("Match actor fails"); 
                                throw new Exception("I failed!"); 
                } 
        } 
} 

public class MatchesActor : ReceiveActor 
 { 
        protected override void PreStart() 
        { 
                Console.WriteLine($"Matches actor {Self.Path.Name} started"); 
        } 

        protected override void PostStop() 
        { 
                Console.WriteLine($"Matches actor {Self.Path.Name} stopped"); 
        } 


protected override void OnReceive(object message) 
        { 
                switch (message) 
                { 
                        case "fail-child": 
                                IActorRef child = Context.ActorOf(Props.Create(() => new MatchActor()), $"first-match"); 

                                child.Tell("fail"); 
                                break;             
                } 

        } 

  } 

var firstRef = Sys.ActorOf(Props.Create<MatchesActor>(), "matches"); 
firstRef.Tell("fail-child", ActorRefs.NoSender); 
firstRef.Tell("stop"); 

L’output generato è: 

Matches actor matches started 
Match actor first-match started 
Match actor fails 
Matches actor matches stopped 
Matches actor matches started 

[ERROR][11.02.2021 13:34:50][Thread 0003][akka://akkaActors/user/matches/first-match] I failed! 
Cause: System.Exception: I failed! 
   at AkkaActors.MatchActor.OnReceive(Object message) 
   at Akka.Actor.UntypedActor.Receive(Object message) 
   at Akka.Actor.ActorBase.AroundReceive(Receive receive, Object message) 
   at Akka.Actor.ActorCell.ReceiveMessage(Object message) 
   at Akka.Actor.ActorCell.Invoke(Envelope envelope) 

Nell’output notiamo che l’attore padre viene fermato e riparte immediatamente, e vengono richiamati in successione PostStop() e PreStart().
Per avere un comportamento diverso dalla prima esecuzione a quelle successive, possiamo fare l’override dei metodi PreRestart() e PostRestart().

Per quanto riguarda gli eventi gestiti, abbiamo 3 tipi per l’actor MatchesActor:

  • MatchStarted,
  • MatchEnded 
  • MatchScoreChanged. 

Questi metodi sono così definiti nel codice:

public class MatchStarted 
{ 
        public int MatchId { get; set; } 
} 

 public class MatchEnded 
{ 
        public int MatchId { get; set; } 
} 

 public class MatchScoreChanged 
{ 
        public int MatchId { get; set; } 
        public int Team1Score { get; set; } 
        public int Team2Score { get; set; } 
} 

Essi vengono gestiti registrando degli handler in MatchesActor, in questo modo: 

public MatchesActor() 
{ 
        ReceiveAsync<MatchStarted>(request => Handle(request)); 
        ReceiveAsync<MatchEnded>(request => Handle(request)); 
        ReceiveAsync<MatchScoreChanged>(request => Handle(request)); 
} 

private IActorRef GetMatchActor(int matchId) 
{ 
        var childActor = Context.Child($"{matchId}"); 

        if (childActor is Nobody) 
        { 
                childActor = Context.ActorOf(Props.Create(() => new MatchActor()), $"{matchId}"); 
        } 

         return childActor; 

} 

 private async Task Handle(MatchStarted request) 
{ 
        var child = GetMatchActor(request.MatchId); 
        child.Forward(request); 
} 
 // [...] 

In questo modo, i messaggi vengono inoltrati al MatchActor di riferimento, che li gestisce: 

public MatchActor() 
{ 
        ReceiveAsync<MatchStarted>(request => Handle(request)); 
        ReceiveAsync<MatchEnded>(request => Handle(request)); 
        ReceiveAsync<MatchScoreChanged>(request => Handle(request)); 
} 

 private async Task Handle(MatchStarted request) 
{ 
        CurrentMatch.Started = true; 
        CurrentMatch.Team1Score = 0; 
        CurrentMatch.Team2Score = 0; 

         Context.ActorSelection(ActorPaths.StandingsActor).Tell(new StandingsChanged() {  
                Teams = new Dictionary<string, int>() 
                { 
                        { CurrentMatch.Team1, 1 }, 
                        { CurrentMatch.Team2, 1 } 
                } 

        }); 
        Context.ActorSelection(ActorPaths.SignalRActor).Tell(request); 

} 
 // [...] 

Dove StandingsChanged è definito così: 

public class StandingsChanged 
{ 
        public Dictionary<string, int> Teams { get; set; } 
} 

MatchActor gestisce la logica di aggiornamento della classifica e inoltra il messaggio prima allo StandingsActor che aggiorna la classifica ed è così definito:

public class StandingsActor : ReceiveActor 
{ 
        private Standings CurrentStandings { get; set; } 
        public StandingsActor() 

        { 
                Receive<StandingsChanged>(request => Handle(request)); 
        } 
  
        private void Handle(StandingsChanged request) 
        { 
                CurrentStandings = new Standings() 
                { 
                        Teams = request.Teams 
                }; 

                Context.ActorSelection(ActorPaths.SignalRActor).Tell(request); 

        } 

 }  

Successivamente, se c’è necessità che gli gli eventi vengano aggiornati nella UI, possono essere inoltrati al SignalRActor, che si occuperà di fare il polling in broadcast dei dati da mostrare nella UI in base alla tipologia di messaggio da gestire:

public class SignalRActor : ReceiveActor 
{ 
        private readonly IHubContext<Hub> hubContext; 

        public SignalRActor(IHubContext<Hub> hubContext) 
        { 
                this.hubContext = hubContext; 

                Receive<MatchStarted>(message => Handle(message)); 
                Receive<MatchEnded>(message => Handle(message)); 
                Receive<MatchScoreChanged>(message => Handle(message)); 
                Receive<StandingsChanged>(message => Handle(message)); 

        } 

         private void Handle(MatchStarted message) 
        { 
                hubContext.Clients.All.SendAsync("BroadcastMatchStarted", message); 
        } 
  
        private void Handle(MatchEnded message) 
        { 
                hubContext.Clients.All.SendAsync("BroadcastMatchEnded", message); 
        } 

        private void Handle(MatchScoreChanged message) 
        { 
                hubContext.Clients.All.SendAsync("BroadcastMatchScoreChanged", message); 
        } 
  
        private void Handle(StandingsChanged message) 
        { 
                 hubContext.Clients.All.SendAsync("BroadcastStandingsChanged", message); 
        } 

} 

I nostri Web API controllers utilizzano gli attori ed effettuano delle operazioni tramite essi.

Ad esempio, il MatchesController contiene i 3 endpoint per la modifica dello stato dei match:

 [HttpPut] 
[Route("{matchId}/start")] 
public IActionResult StartMatch(int matchId) 
{ 
        environment 
                .SelectActor(ActorPaths.MatchesActor) 
                .Tell(new MatchStarted 
                { 
                        MatchId = matchId 
                }); 

         return Ok(); 

} 

[HttpPut] 
[Route("{matchId}/stop")] 
public IActionResult StopMatch(int matchId) 
{ 
        environment 
                .SelectActor(ActorPaths.MatchesActor) 
                .Tell(new MatchEnded 
                { 
                        MatchId = matchId 
                }); 

        return Ok(); 

} 
 
[HttpPut] 
[Route("{matchId}/changescore/{team1Score}/{team2Score}")] 
public IActionResult ChangeScore(int matchId, int team1Score, int team2Score) 
{ 
        environment 
                .SelectActor(ActorPaths.MatchesActor) 
                .Tell(new MatchScoreChanged 
                { 
                        MatchId = matchId, 
                        Team1Score = team1Score, 
                        Team2Score = team2Score 
                }); 
  
        return Ok(); 

} 

Lato applicazione front-end, è importante segnalare l’implementazione di un SignalRService, che serve a intercettare le comunicazioni con l’hub di SignalR: 

export class SignalRService { 
        private hub: HubConnection; 
        
        constructor() { 
                this.hub = new signalR.HubConnectionBuilder() 
                       	.withUrl(`${environment.websocketUrl}akkaActorsHub`) 
        		.build(); 

                this.hub.start().then(() => { 
               	        console.log(`Hub connection started`); 
    		        }).catch(err => document.write(err)); 

        } 

         public on<T>(name: string): Observable<T> { 
                const subject = new Subject<T>(); 

                this.hub.on(name, (data) => { 
                subject.next(data); 
                }); 

                return subject.asObservable(); 

        } 

} 

Questa classe viene poi iniettata nei component Angular in cui è necessaria, più precisamente nel metodo ngOnInit, in questo modo:

ngOnInit() { 
        const onMatchScoreChanged$: Observable<any> = this.signalR.on(`BroadcastMatchScoreChanged`); 
        onMatchScoreChanged$.subscribe((message: any) => { 
                //Do something 
        }); 
        } 
}); 

Per i nostri test abbiamo creato un simulatore di eventi che provvede a far iniziare due match e cambia i risultati, prima di fermarli definitivamente.

Gli eventi vengono inviati tramite protocollo REST alle Web API:

private static void Send(string endpoint, StringContent content) 
{ 
        _client.PostAsync(endpoint, content).Wait(); 
} 

 static void Main(string[] args) 
{ 
        _client = new HttpClient { BaseAddress = new Uri("http://localhost:5000") }; 

        _client.DefaultRequestHeaders.Accept.Add(new MediaTypeWithQualityHeaderValue("application/json")); 

        Send("api/matches/1/start", ""); 
        Send("api/matches/2/start", ""); 

        var content = new StringContent(JsonConvert.SerializeObject(new 
        { 
                Team1Score = 1, 
                Team2Score = 0 
        }), Encoding.UTF8, "application/json"); 

        Send("api/matches/2/updatescore", content); 

        // other events… 

        Send("api/matches/2/stop", ""); 
        Send("api/matches/1/stop", ""); 

}	 

Esecuzione progetto 

Avviando la nostra soluzione di test, possiamo vedere una dashboard con dei dati che si aggiornano ogni qual volta arriva un evento dal simulatore:

application running

Mi auguro di aver suscitato la vostra curiosità sull’argomento. Sul nostro blog, troverai altri articoli sull’architettura software.

Il codice sorgente utilizzato in questo articolo è disponibile qui

Al prossimo articolo.

Scopri di più da Blexin

Abbonati ora per continuare a leggere e avere accesso all'archivio completo.

Continue reading