
Nei miei precedenti articoli, abbiamo visto come sia utile utilizzare tecnologie e soluzioni per la comunicazione come RabbitMQ o un ESB (Enterprise Service Bus) come Rebus, siano essi on premise o cloud, per disaccoppiare la comunicazione nei sistemi software.
Non molti anni fa, in Linkedin provarono ad utilizzare ActiveMQ per tentare di risolvere un problema relativo all’accesso in tempo reale a dati di monitoring e di tracciabilità delle attività degli utenti, che, peraltro, presentavano anche delle correlazioni tra essi. La prova non andò a buon fine, in quanto, con ActiveMQ, non riuscivano a scalare per gestire l’enorme quantità di traffico. Decisero, quindi, di sviluppare un sistema di messaggistica che prevedesse sia la persistenza dei messaggi, in modo da renderli fruibili a più applicazioni, sia la possibilità di scalare orizzontalmente in modo da gestire una grossa quantità di dati. Infine decisero di dargli un nome di uno scrittore che suonasse bene e fu così che nacque Kafka (Kafka: The Definitive Guide).
Concetti fondamentali
Inizialmente, è stato dunque creato come un “distributed publish/subscribe messaging system” fino ad evolversi in una piattaforma di streaming distribuita. Di base, avremo un Publisher (Producer) che pubblica messaggi su di un Broker (Kafka Broker) e un Subscriber (Consumer) che si sottoscrive ad essi.

Il messaggio è l’unità di informazione che viaggia in Kafka e non è altro che un array di byte che, opzionalmente, può essere accompagnato da una key utilizzata per gestire la scrittura del messaggio. In genere, per motivi di efficienza, in Kafka si preferisce trasmettere messaggi in batch.
Quando un broker riceve un messaggio da un producer, lo categorizza in un Topic. Di fatto, il topic è la rappresentazione effettiva dello stream di dati che passa dal producer al consumer. I topic sono a loro volta suddivisi in partitions. Una partizione è una sequenza ordinata ed immutabile di messaggi che vengono immagazzinati uno dopo l’altro e che può essere letta solo nell’ordine in cui sono stati scritti. Infatti, i messaggi non saranno ordinati all’interno del topic ma lo saranno a livello di partizioni, assegnando ad ogni messaggio un id univoco detto Offset. Le partizioni, oltre a garantire l’ordine dei messaggi, sono anche uno strumento di ridondanza e scalabilità, in quanto possono essere replicati all’interno di Kafka.

Infatti, l’idea di base è che un broker Kafka operi come parte di un Cluster di broker eseguiti su uno o più server. Un cluster ha il compito di ricevere i messaggi, salvarli nei topic per un retention period configurabile e assegnare gli offset all’interno delle partizioni. Le partizioni possono essere replicate all’interno del cluster e per ogni partizione ci sarà un solo broker che farà da leader, ovvero gestirà i messaggi per quella partizioni, mentre gli altri broker, su cui la partizione è replicata, agiranno da followers, salvando copie in maniera passiva rispetto al leader. Questo è molto importante, perché se il broker leader fallisce, uno dei followers può essere eletto come tale e quindi permettere al cluster di funzionare correttamente.

Come ulteriore meccanismo di fault-tolerance, uno dei broker all’interno del cluster svolgerà il ruolo di controller e si occuperà di assegnare le partizioni agli altri broker e di vigilare sullo stato di salute dei componenti del cluster.
Abbiamo detto che i messaggi sono pubblicati dai Producer. Un Producer, oltre a creare lo stream di dati, ha la facoltà di scegliere in quale partizione scrivere un determinato messaggio mediante un partitioner. La scelta di solito è basata sulla key del messaggio: se la chiave non è specificata il partitioner sceglierà la partizione secondo uno scheduling round-robin (RR) per bilanciare il carico; altrimenti, se la chiave non è nulla, il partitioner potrà instradare i messaggi verso partizioni specifiche, ad esempio, facendo l’hash della chiave in modo che i messaggi con lo stesso valore di hash siano scritti nella stessa partizione.
Lo stream di messaggi sarà consumato da quei software, i consumer, che si sottoscrivono ad un determinato topic e ne prelevano i messaggi ordinati grazie all’utilizzo dell’offset di cui abbiamo parlato. Infatti, quando un consumer inizia a consumare i messaggi di una partizione, viene creato un topic particolare chiamato _consumer_offsets, dove salva l’offset dell’ultimo messaggio consumato e mantiene quindi uno stato della sua lettura.
Anche i consumer sono pensati per operare come parte di un un insieme, chiamato Consumer Group. Un Consumer Group è formato da uno o più consumer che lavorano insieme per consumare in maniera più efficiente i messaggi all’interno di un topic dividendosi tra loro le partizioni in maniera equa. Infatti, se uno dei consumer del gruppo fallisce, la sua parte viene distribuita tra gli altri consumers. Oltre ad avere più consumer all’interno di un gruppo, si possono avere anche più gruppi che consumano messaggi di un topic. Questo è molto utile quando ci sono più applicazioni interessate ai messaggi di un determinato topic.
Il concetto di Consumer Group consente di generalizzare i vantaggi dei sistemi tradizionali di messaggistica: come in un sistema a coda è possibile dividere il processamento tra vari consumer e ciò viene fatto attraverso le varie istanze di consumer all’interno del gruppo; come in un sistema publish/subscribe i messaggi possono essere letti da più Consumer Group.
In questo modo, è possibile garantire sia la scalabilità per l’elaborazione sia l’affidabilità per la consegna a più User Group e quindi a più applicazioni interessate. Inoltre, con il concetto di partizione, Kafka garantisce l’ordine dei messaggi anche con più consumer, a differenza di un sistema più tradizionale come quello a coda, in cui i messaggi sono consegnati in maniera asincrona e quindi in presenza di più consumer non è possibile garantire l’ordinamento.
Facciamo un esempio
Introdotti tutti i concetti chiave di Kafka, vediamo un esempio di publish/subscribe realizzato in .NET Core. Per il server kafka, utilizzeremo un’ immagine istanziata su un container Docker.
Per fare in modo che un server kafka funzioni correttamente, dobbiamo innanzitutto assicurarci che sia collegato ad un’istanza Zookeeper. Zookeeper è un servizio centralizzato che gestisce le configurazioni e la sincronizzazioni in applicazioni distribuite. Kafka utilizza Zookeeper per salvare metadati relativi ai broker, ai topics e alle partizioni.
Quindi creiamo un container docker per Zookeeper lanciando il seguente comando da terminale dove andiamo a specificare la porta ed un server id:
docker run --net=kafka \
-d \
--name=zookeeper \
-e ZOOKEEPER_CLIENT_PORT=2181 \
-e ZOOKEEPER_SERVER_ID=1 \
confluentinc/cp-zookeeper:5.4.1
Successivamente, creiamo il container per il server Kafka specificando l’indirizzo di Zookeeper, un indirizzo dove raggiungere Kafka al di fuori del container e il valore di replicazione dei topic da specificare in caso di cluster single-node:
docker run --net=kafka \
-d \
-p 9092:9092 \
--name=kafka \
-e KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 \
-e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092 \
-e KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 \
confluentinc/cp-kafka:5.4.
Creiamo un topic con tre partizioni con il seguente comando:
docker exec -it kafka kafka-topics --bootstrap-server localhost:9092 --create --topic demo-topic --partitions 3
per vedere il risultato utilizziamo invece:
docker exec -it kafka kafka-topics --describe --bootstrap-server localhost:9092

Una volta creato il topic, realizziamo una console application che svolgerà il compito di Producer. Utilizzeremo il client di Kafka per .NET ,Confluent.Kafka, messo a disposizione da Confluent.
using System;
using System.Threading.Tasks;
using Confluent.Kafka;
namespace Kafka.Producer
{
class Program
{
static async Task Main(string[] args)
{
var config = new ProducerConfig { BootstrapServers = "localhost:9092", Partitioner = Partitioner.Random };
using (var producer = new ProducerBuilder<Null, string>(config).Build())
{
try
{
for (int i = 0; i < 5; i++)
{
var deliveryResult = await producer.ProduceAsync("demo-topic", new Message<Null, string> { Value = $"DemoMessage '{i}'" });
Console.WriteLine($"Published message to '{deliveryResult.TopicPartitionOffset}'");
}
}
catch (ProduceException<Null, string> e)
{
Console.WriteLine($"Error publishing message: {e.Error.Reason}");
}
}
}
}
}
Configuriamo il Producer indicando l’indirizzo del server Kafka ed il tipo di Partitioner (in questo caso Random attuerà uno scheduling RR ). Creiamo quindi un Producer mediante il ProducerBuilder con chiave nulla, avendo scelto un partitioner non basato su chiave.
Tramite il metodo ProduceAsync() inviamo i messaggi su “demo-topic” e stampiamo in console il risultato della scrittura, in modo da visualizzare in quale partizione e in quale offset sono stati salvati i messaggi.
Eseguendo il programma otteniamo il seguente risultato :

Con il comando seguente andiamo ad utilizzare il console consumer di Kafka, un tool che legge i messaggi all’interno del topic e li riporta sulla console:
docker exec -it kafka kafka-console-consumer --bootstrap-server localhost:9092 --topic demo-topic --from-beginning

Come possiamo vedere, i messaggi non sono mostrati in ordine perché, come abbiamo detto, l’ordine è a livello di partizione, non di topic. Possiamo vedere ciò con il seguente comando relativo alla partizione 2:
docker exec -it kafka kafka-console-consumer --bootstrap-server localhost:9092 --topic demo-topic --from-beginning --partition 2

Come possiamo vedere risultano ordinati. Creiamo adesso l’applicazione consumer:
using System;
using Confluent.Kafka;
class Program
{
public static void Main(string[] args)
{
var conf = new ConsumerConfig
{
GroupId = "demo-consumer-group",
BootstrapServers = "localhost:9092",
AutoOffsetReset = AutoOffsetReset.Earliest
};
using (var consumer = new ConsumerBuilder<Ignore, string>(conf).Build())
{
consumer.Subscribe("demo-topic");
var readingMessage = true;
try
{
while (readingMessage)
{
try
{
var consumeResult = consumer.Consume(10000);
if (consumeResult == null)
{
Console.WriteLine("There are no messages in topic");
readingMessage = false;
}
else
{
Console.WriteLine($"Consumed message '{consumeResult.Message.Value}' at: '{consumeResult.TopicPartitionOffset}'.");
}
}
catch (ConsumeException e)
{
Console.WriteLine($"Error occured: {e.Error.Reason}");
}
}
}
catch (OperationCanceledException)
{
consumer.Close();
}
}
}
}
Configuriamo il consumer indicando il Consumer Group, l’indirizzo del server Kafka e specificando quale azione intraprendere quando non c’è nessun offset iniziale. Creiamo il consumer e sottoscriviamolo al nostro topic, poi iniziamo la lettura del topic fino al timeout specificato tramite il metodo Consume. Stampiamo il risultato della lettura in console e infine con il metodo Close committiamo gli offset su Kafka, avvertendo che il consumer sta lasciando il gruppo.
Eseguendo il Consumer possiamo vedere che tutti i messaggi pubblicati sono stati consumati correttamente.

L’ecosistema Kafka
Questa è solo una panoramica sulle funzionalità core di Kafka, che negli anni è diventato un vero e proprio ecosistema. Infatti, oltre allo scambio di messaggi, tra producer e consumer esistono due ulteriori feature: Kafka Connect e Kafka Stream.

La prima è un framework per il trasferimento di dati di sistemi eterogenei attraverso Kafka che mette a disposizione delle API ben definite per implementare dei cosiddetti connectors. Si pensi alla variazione del campo di una tabella di un database notificata tramite Kafka: ciò è possibile mediante un connettore del database verso Kafka.
La seconda feature, invece, è una libreria per la creazione di applicazioni di streaming, ovvero applicazioni che trasformano topic di input di Kafka in topic di output.
Conclusioni
Sperando che questa introduzione sia stata interessante per entrare nell’ecosistema Kafka, vi lascio il link del repository dove potete trovare l’implementazione del producer e del consumer, nonché i comandi per creare i container per Kafka e Zookeeper.
Alla prossima!