“The Metamorphosis” of communication with Kafka
Let’s discover Kafka by introducing the fundamental concepts and showing an example in .NET Core
Wednesday, April 22, 2020

In my previous articles, we have seen how useful messaging technologies such as RabbitMQ, or an ESB (Enterprise Service Bus) such as Rebus, can be for decoupling communication in software systems, whether on-premise or in the cloud.

Not many years ago, LinkedIn used ActiveMQ to try to solve a problem related to real-time access to monitoring data and user activity tracking data, which were also related to each other. The attempt was not successful because ActiveMQ could not scale out to handle the huge amount of traffic. They therefore decided to develop a messaging system that would provide both message persistence, so that messages could be consumed by multiple applications, and the ability to scale horizontally in order to handle large amounts of data. Finally, they decided to give it the name of a writer that sounded good, and that is how Kafka was born (Kafka: The Definitive Guide).

Fundamental concepts

Kafka was initially created as a “distributed publish/subscribe messaging system” and has since evolved into a distributed streaming platform. Basically, we have a Publisher (Producer) that publishes messages to a Broker (the Kafka Broker) and a Subscriber (Consumer) that subscribes to them.

The message is the unit of information that travels through Kafka and is nothing more than an array of bytes, optionally accompanied by a key that is used to control how the message is written. For efficiency, messages in Kafka are generally transmitted in batches.

When a broker receives a message from a producer, it categorizes it into a Topic. The topic is the actual representation of the data stream flowing from the producer to the consumer. Topics are, in turn, divided into partitions. A partition is an ordered and immutable sequence of messages that are stored one after the other and can only be read in the order in which they were written. Messages are therefore not ordered within the topic but at the partition level, where each message is assigned a unique id called the Offset. Besides ensuring the order of the messages, partitions are also a tool for redundancy and scalability, as they can be replicated within Kafka.

In fact, the basic idea is that a Kafka broker operates as part of a Cluster of brokers running on one or more servers. A cluster is responsible for receiving messages, saving them in topics for a configurable retention period and assigning offsets within partitions. Partitions can be replicated within the cluster, and for each partition there is a single broker acting as leader. The leader handles all the messages for that partition, while the other brokers on which the partition is replicated act as followers, passively keeping copies of the leader's data. This is very important because if the leader fails, one of the followers can be elected leader and thus allow the cluster to keep running properly.

We said that messages are published by Producers. In addition to creating the data stream, a Producer can choose which partition to write a particular message to by means of a partitioner. The choice is usually based on the message key: if no key is specified, the partitioner chooses the partition according to a round-robin (RR) scheduling to balance the load; otherwise, if there is a key, the partitioner can route messages to specific partitions, for example by hashing the key so that messages with the same hash value are written to the same partition.

As an additional fault-tolerance mechanism, one of the brokers within the cluster plays the role of controller and takes care of assigning partitions to the other brokers and supervising the health of the cluster components.

The stream of messages is consumed by the software components, the consumers, that subscribe to a specific topic and read its messages in order thanks to the offset we talked about. In fact, when a consumer starts consuming the messages of a partition, the offset of the last message consumed is saved in a special topic called __consumer_offsets, so that the consumer keeps track of its reading position.

Consumers are also designed to operate as part of a whole, called the Consumer Group.

A Consumer Group is made up of one or more consumers that work together to consume the messages of a topic more efficiently by dividing its partitions among themselves. If one of the group’s consumers fails, its partitions are redistributed among the remaining consumers. In addition to having multiple consumers within a group, you can also have multiple groups consuming messages from the same topic. This is very useful when several applications are interested in the messages of a certain topic. The Consumer Group concept lets you generalize the advantages of traditional messaging systems. As in a queue system, processing can be divided among several consumers: the consumer instances within the group do exactly that. As in a publish/subscribe system, messages can be read by multiple Consumer Groups.

In this way, it is possible to guarantee both scalability of processing and reliable delivery to multiple Consumer Groups, and therefore to multiple interested applications. In addition, thanks to partitions, Kafka guarantees the order of messages even with multiple consumers, unlike a more traditional queue system in which messages are delivered asynchronously and, in the presence of multiple consumers, ordering cannot be guaranteed.

Let’s take an example

Having introduced all the key concepts of Kafka, let’s now look at a publish/subscribe example developed in .NET Core. For the Kafka server, we will use an image instantiated in a Docker container.
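All the containers in this walkthrough communicate over a user-defined Docker network named kafka (referenced by the --net=kafka flag in the commands below); if it does not exist yet, it can be created first with:

docker network create kafka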

In order for a Kafka server to run properly, we must first make sure that it is connected to a Zookeeper instance. Zookeeper is a centralized service that manages configuration and synchronization in distributed applications. Kafka uses Zookeeper to store metadata about brokers, topics and partitions. So let’s create a Docker container for Zookeeper by running the following command from the terminal, specifying the client port and a server id:

docker run --net=kafka \
-d \
--name=zookeeper \
-e ZOOKEEPER_CLIENT_PORT=2181 \
-e ZOOKEEPER_SERVER_ID=1 \
confluentinc/cp-zookeeper:5.4.1

After that, we create the container for the Kafka server, specifying the Zookeeper address, the address at which Kafka can be reached from outside the container and the replication factor of the offsets topic, which must be set to 1 in the case of a single-node cluster:

docker run --net=kafka \
-d \
-p 9092:9092 \
--name=kafka \
-e KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 \
-e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092 \
-e KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 \
confluentinc/cp-kafka:5.4.1
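Before moving on, we can quickly check that both containers are up and running:

docker ps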

Let’s create a topic with three partitions using the following command:

docker exec -it kafka kafka-topics --bootstrap-server localhost:9092 --create --topic demo-topic --partitions 3

To see the result, we can instead use:

docker exec -it kafka kafka-topics --describe --bootstrap-server localhost:9092

Once the topic has been created, we create a console application that will play the role of Producer. We will use the Kafka client for .NET, Confluent.Kafka, made available by Confluent.

using System;
using System.Threading.Tasks;
using Confluent.Kafka;
 
namespace Kafka.Producer
{
    class Program
    {
        static async Task Main(string[] args)
        {
            // Broker address and a non-key-based partitioner
            var config = new ProducerConfig { BootstrapServers = "localhost:9092", Partitioner = Partitioner.Random };
 
            using (var producer = new ProducerBuilder<Null, string>(config).Build())
            {
                try
                {
                    for (int i = 0; i < 5; i++)
                    {
                        // Publish the message and wait for the delivery report
                        var deliveryResult = await producer.ProduceAsync("demo-topic", new Message<Null, string> { Value = $"DemoMessage '{i}'" });
                        Console.WriteLine($"Published message to '{deliveryResult.TopicPartitionOffset}'");
                    }
                }
                catch (ProduceException<Null, string> e)
                {
                    Console.WriteLine($"Error publishing message: {e.Error.Reason}");
                }
            }
        }
    }
}

We configure the Producer by indicating the address of the Kafka server and the type of Partitioner (in this case Random, which spreads keyless messages across the partitions to balance the load). Then we build the Producer using the ProducerBuilder with a Null key, since we chose a non-key-based partitioner. Using the ProduceAsync() method we send the messages to “demo-topic” and print the result of each write to the console, showing in which partition and at which offset each message has been saved.

Running the program, the console shows, for each published message, the TopicPartitionOffset, i.e. the partition and the offset at which it was saved.
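As a quick aside, here is a minimal sketch of a key-based producer (assuming the same broker and topic as above): with a string key and the client’s default key-hashing partitioner, all messages sharing the same key end up in the same partition and therefore keep their relative order.

using System;
using System.Threading.Tasks;
using Confluent.Kafka;
 
namespace Kafka.Producer.Keyed
{
    class Program
    {
        static async Task Main(string[] args)
        {
            // No explicit Partitioner: the default one hashes the message key
            var config = new ProducerConfig { BootstrapServers = "localhost:9092" };
 
            using (var producer = new ProducerBuilder<string, string>(config).Build())
            {
                for (int i = 0; i < 3; i++)
                {
                    // Every message with key "order-42" lands in the same partition
                    var result = await producer.ProduceAsync("demo-topic",
                        new Message<string, string> { Key = "order-42", Value = $"KeyedMessage '{i}'" });
                    Console.WriteLine($"Published message to '{result.TopicPartitionOffset}'");
                }
            }
        }
    }
}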

With the following command we use the Kafka console consumer, a tool that reads the messages in a topic and prints them to the console:

docker exec -it kafka kafka-console-consumer --bootstrap-server localhost:9092 --topic demo-topic --from-beginning

As we can see, the messages are not shown in order because, as we said, ordering is guaranteed at the partition level, not at the topic level. We can verify this with the following command, which reads only partition 2:

docker exec -it kafka kafka-console-consumer --bootstrap-server localhost:9092 --topic demo-topic --from-beginning --partition 2

As we can see, within the single partition the messages are ordered. Let’s now create the consumer application:

using System;
using Confluent.Kafka;
 
class Program
{
    public static void Main(string[] args)
    {
        var conf = new ConsumerConfig
        {
            GroupId = "demo-consumer-group",
            BootstrapServers = "localhost:9092",
            // Start from the earliest message when there is no committed offset yet
            AutoOffsetReset = AutoOffsetReset.Earliest
        };
 
        using (var consumer = new ConsumerBuilder<Ignore, string>(conf).Build())
        {
            consumer.Subscribe("demo-topic");
 
            var readingMessage = true;
            try
            {
                while (readingMessage)
                {
                    try
                    {
                        // Wait up to 10 seconds for a message; null means the timeout expired
                        var consumeResult = consumer.Consume(10000);
                        if (consumeResult == null)
                        {
                            Console.WriteLine("There are no messages in topic");
                            readingMessage = false;
                        }
                        else
                        {
                            Console.WriteLine($"Consumed message '{consumeResult.Message.Value}' at: '{consumeResult.TopicPartitionOffset}'.");
                        }
                    }
                    catch (ConsumeException e)
                    {
                        Console.WriteLine($"Error occurred: {e.Error.Reason}");
                    }
                }
            }
            finally
            {
                // Commit the final offsets and leave the consumer group cleanly
                consumer.Close();
            }
        }
    }
}

We configure the consumer by indicating the Consumer Group, the address of the Kafka server and what to do when there is no initial committed offset. We then create the consumer, subscribe it to our topic and keep reading messages with the Consume method until the specified timeout expires with no new messages. We print the result of each read to the console and, finally, with the Close method we commit the offsets to Kafka and notify the group that the consumer is leaving.
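If more control over offsets is needed, a minimal alternative sketch (assuming the same topic and group as above) is to disable auto-commit, which the client otherwise performs in the background, and commit explicitly only after a message has actually been processed:

using System;
using Confluent.Kafka;
 
class ManualCommitConsumer
{
    public static void Main(string[] args)
    {
        var conf = new ConsumerConfig
        {
            GroupId = "demo-consumer-group",
            BootstrapServers = "localhost:9092",
            AutoOffsetReset = AutoOffsetReset.Earliest,
            // Disable background auto-commit so we decide when offsets are stored
            EnableAutoCommit = false
        };
 
        using (var consumer = new ConsumerBuilder<Ignore, string>(conf).Build())
        {
            consumer.Subscribe("demo-topic");
 
            ConsumeResult<Ignore, string> consumeResult;
            // Read until the 10-second timeout expires with no new messages
            while ((consumeResult = consumer.Consume(10000)) != null)
            {
                Console.WriteLine($"Handled '{consumeResult.Message.Value}' at '{consumeResult.TopicPartitionOffset}'.");
                // Commit the offset only after the message has actually been processed
                consumer.Commit(consumeResult);
            }
 
            consumer.Close();
        }
    }
}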

By running the Consumer, we can see that all published messages have been consumed correctly.
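To check which partitions of demo-topic were assigned to our consumer and which offsets the group has committed, we can use the kafka-consumer-groups tool included in the broker image:

docker exec -it kafka kafka-consumer-groups --bootstrap-server localhost:9092 --describe --group demo-consumer-group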

The Kafka ecosystem

This is just an overview of the core features of Kafka, which has become a real ecosystem over the years. In fact, in addition to the exchange of messages between producers and consumers, there are two further components: Kafka Connect and Kafka Streams.

The first is a framework for transferring data between Kafka and heterogeneous systems, providing well-defined APIs to implement so-called connectors. Think of a change to a field of a database table being notified through Kafka: this is made possible by a database connector for Kafka.

The second is a library for building streaming applications, that is, applications that transform Kafka input topics into output topics.

Conclusions

Hoping that this introduction has been an interesting way to enter the Kafka ecosystem, I leave you the link to the repository where you can find the producer and consumer implementations, as well as the commands to create the containers for Kafka and Zookeeper.

See you next time!