Apache Kafka Tutorial For Beginners

There are plenty of Apache Kafka Tutorials on the internet. Why another one?

While other tutorials are valuable, I see room for improvement. As a visual learner (yes, it’s a thing), I prefer tutorials with plenty of videos and images.

Thus I created this one to help you quickly get started with Apache Kafka, see how it works, and start producing and consuming messages.

This post is the first part of an Apache Kafka tutorial series. As the first part, it covers only the basic concepts; in-depth tutorials are reserved for later posts.

Code Repository

All the code in this tutorial is available at this GitHub repository: https://github.com/datmt/Kafka-Tutorials

Set up Apache Kafka using Docker

Previously, installing Kafka on a VM was a serious task. Thanks to Docker and docker-compose, setting up a cluster of Kafka brokers has never been easier.

The docker-compose.yaml file is under resources/deployment in the Git repository above. Here is its content:

version: '3'

services:
  zoo1:
    image: confluentinc/cp-zookeeper:7.2.1
    hostname: zoo1
    container_name: zoo1
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_SERVER_ID: 1
      ZOOKEEPER_SERVERS: zoo1:2888:3888;zoo2:2888:3888;zoo3:2888:3888

  zoo2:
    image: confluentinc/cp-zookeeper:7.2.1
    hostname: zoo2
    container_name: zoo2
    ports:
      - "2182:2182"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2182
      ZOOKEEPER_SERVER_ID: 2
      ZOOKEEPER_SERVERS: zoo1:2888:3888;zoo2:2888:3888;zoo3:2888:3888

  zoo3:
    image: confluentinc/cp-zookeeper:7.2.1
    hostname: zoo3
    container_name: zoo3
    ports:
      - "2183:2183"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2183
      ZOOKEEPER_SERVER_ID: 3
      ZOOKEEPER_SERVERS: zoo1:2888:3888;zoo2:2888:3888;zoo3:2888:3888


  kafka1:
    image: confluentinc/cp-kafka:7.2.1
    hostname: kafka1
    container_name: kafka1
    ports:
      - "9092:9092"
      - "29092:29092"
    environment:
      KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka1:19092,EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9092,DOCKER://host.docker.internal:29092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT,DOCKER:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
      KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181,zoo2:2182,zoo3:2183"
      KAFKA_BROKER_ID: 1
      KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
      KAFKA_AUTHORIZER_CLASS_NAME: kafka.security.authorizer.AclAuthorizer
      KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: "true"
    depends_on:
      - zoo1
      - zoo2
      - zoo3

  kafka2:
    image: confluentinc/cp-kafka:7.2.1
    hostname: kafka2
    container_name: kafka2
    ports:
      - "9093:9093"
      - "29093:29093"
    environment:
      KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka2:19093,EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9093,DOCKER://host.docker.internal:29093
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT,DOCKER:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
      KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181,zoo2:2182,zoo3:2183"
      KAFKA_BROKER_ID: 2
      KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
      KAFKA_AUTHORIZER_CLASS_NAME: kafka.security.authorizer.AclAuthorizer
      KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: "true"
    depends_on:
      - zoo1
      - zoo2
      - zoo3

  kafka3:
    image: confluentinc/cp-kafka:7.2.1
    hostname: kafka3
    container_name: kafka3
    ports:
      - "9094:9094"
      - "29094:29094"
    environment:
      KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka3:19094,EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9094,DOCKER://host.docker.internal:29094
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT,DOCKER:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
      KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181,zoo2:2182,zoo3:2183"
      KAFKA_BROKER_ID: 3
      KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
      KAFKA_AUTHORIZER_CLASS_NAME: kafka.security.authorizer.AclAuthorizer
      KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: "true"
    depends_on:
      - zoo1
      - zoo2
      - zoo3

  kafka-schema-registry:
    image: confluentinc/cp-schema-registry:7.2.1
    hostname: kafka-schema-registry
    container_name: kafka-schema-registry
    depends_on:
      - zoo1
      - zoo2
      - zoo3
      - kafka1
      - kafka2
      - kafka3
    ports:
      - "8081:8081"
    environment:
      SCHEMA_REGISTRY_HOST_NAME: kafka-schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'PLAINTEXT://kafka1:19092,PLAINTEXT://kafka2:19093,PLAINTEXT://kafka3:19094'
      SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081


  kafka-ui:
    image: provectuslabs/kafka-ui:052d8af45632265cc88192663b14fec5aff713f5
    container_name: kafka-ui
    depends_on:
      - kafka1
    ports:
      - "8080:8080"
    env_file:
      - ./env_file

Here is the environment file for Kafka UI:

KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=kafka1:29092
KAFKA_CLUSTERS_0_NAME=local_kafka

What does Kafka UI do? It provides a nice web interface for viewing the current status of the cluster, so you don’t need to type commands in a shell console to inspect it. Isn’t that cool?

Let’s talk a bit about the docker-compose.yaml file. The first part of the file sets up the cluster with three Kafka brokers and three ZooKeeper instances. I’ll discuss these components in more detail later.

The schema registry is a standalone server outside of the Kafka cluster. It maintains the schemas that have been written to the cluster’s topics. More on the schema registry here: https://developer.confluent.io/learn-kafka/apache-kafka/schema-registry/
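
If you want to verify the registry is reachable once the cluster is running, you can query its REST API. Here is a minimal sketch in Java (the class name is mine; the URL is the one exposed by the compose file above). On a fresh cluster it simply prints [], since no schemas have been registered yet:

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class SchemaRegistryCheck {
    public static void main(String[] args) throws Exception {
        // GET /subjects lists the subjects (schema names) registered so far.
        HttpRequest request = HttpRequest.newBuilder(URI.create("http://localhost:8081/subjects")).build();
        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println(response.body()); // prints [] on a fresh cluster
    }
}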

Let’s create the cluster by using docker-compose:

docker-compose up -d 

Wait a while for all the containers to start, then check that they are all up and running:

docker ps --format "table {{.ID}}\t{{.Image}}\t{{.Ports}}"

The --format flag shows only the columns I need (to keep the text from spanning multiple lines).

As you can see, all the containers are up.

Accessing Kafka UI

Now I have the Kafka UI available (the first container, exposed at port 8080 in the screenshot above). I can access it at http://localhost:8080

Access Kafka Management UI

As you can see, Kafka UI has successfully connected to the cluster and displayed the number of brokers correctly.

Let’s start writing some code.

Create a Producer in Kafka

I’m going to use Java in this series. However, with the cluster up and running, you can use any programming language to connect to it.

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class StringProducer {

    private static final String SIMPLE_STRING_TOPIC = "string_message_topic";

    public static void main(String[] args) {
        sendMessage();
    }

    private static void sendMessage() {
        Properties properties = new Properties();
        // One bootstrap server is enough: the client discovers the rest of the cluster from it.
        properties.put("bootstrap.servers", "localhost:9092");
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("schema.registry.url", "http://localhost:8081");

        // Closing the producer (via try-with-resources) flushes any pending messages.
        try (Producer<String, String> producer = new KafkaProducer<>(properties)) {
            for (int i = 0; i < 4; i++) {
                ProducerRecord<String, String> producerRecord =
                        new ProducerRecord<>(SIMPLE_STRING_TOPIC, "Message key: " + i, "Hello simple message");
                producer.send(producerRecord);
            }
        }
    }
}

Let’s have a look at the most important points in the code above.

  • bootstrap.servers specifies the bootstrap servers. Though I started three brokers, I only need to specify one here. The client can discover the rest once it connects successfully to a broker.
  • key.serializer and value.serializer specify the serializers for the message’s key and value. Since I send only strings, I use StringSerializer for both.
  • schema.registry.url registers the schema registry (mentioned above).
  • The for loop sends 4 messages, each with a different key (a variant with a delivery callback is sketched below).
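
One thing to keep in mind: producer.send() is asynchronous and returns immediately. If you want to confirm that each message actually reached the broker, you can pass a callback as the second argument. A minimal sketch, replacing the send() call inside the loop above:

// The callback runs once the broker acknowledges (or rejects) the record.
producer.send(producerRecord, (metadata, exception) -> {
    if (exception != null) {
        System.err.println("Failed to send message: " + exception.getMessage());
    } else {
        System.out.printf("Sent to partition %d at offset %d%n", metadata.partition(), metadata.offset());
    }
});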

Let’s run the code and check if I can see the messages in Kafka UI:

Send message in Apache Kafka

As you can see, a topic named string_message_topic was created and it contains 4 messages. That means the code executed successfully and 4 messages were written to the cluster.
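
Note that I never created this topic myself: the broker auto-created it on the first write because auto.create.topics.enable defaults to true. In production you would usually create topics explicitly so you control the partition and replica counts. Here is a minimal sketch using the AdminClient (the class name and the counts are just illustrative):

import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;

public class TopicCreator {
    public static void main(String[] args) throws Exception {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "localhost:9092");

        try (AdminClient admin = AdminClient.create(properties)) {
            // 3 partitions, replication factor 3: one replica on each broker in this cluster.
            NewTopic topic = new NewTopic("string_message_topic", 3, (short) 3);
            // Waits for the brokers to confirm; fails if the topic already exists.
            admin.createTopics(List.of(topic)).all().get();
        }
    }
}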

If I click on the topic and switch to the Messages tab, I can see the messages here:

Viewing Kafka messages

Sending messages to the Kafka cluster is simple, isn’t it?

Consuming messages in Apache Kafka

Consuming messages in Kafka is as simple as producing them.

Let’s create a consumer to consume the messages:

import java.time.Duration;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class StringConsumer {

    public static void main(String[] args) {
        consumeMessage();
    }

    private static void consumeMessage() {
        Properties properties = new Properties();

        properties.put("bootstrap.servers", "localhost:9092");
        // Every Kafka consumer must belong to a consumer group.
        properties.put("group.id", "kafka.simple_string_consumer");
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("schema.registry.url", "http://localhost:8081");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties)) {
            consumer.subscribe(List.of(Constants.SIMPLE_STRING_TOPIC));
            while (true) {
                // poll() waits at most 3 seconds; it returns immediately if messages are available.
                ConsumerRecords<String, String> messages = consumer.poll(Duration.ofSeconds(3));

                for (var message : messages) {
                    System.out.printf("Consuming message with key: %s and value: %s%n", message.key(), message.value());
                }
            }
        }
    }
}

This code looks quite similar to the producer code. Let’s focus on the message-consuming mechanism:

  • group.id specifies a consumer group id. This is important since consumers in Kafka must belong to a group; they don’t exist on their own. Even if the group has only one consumer, you still need to specify a group id.
  • new KafkaConsumer<>(properties) creates a consumer with those properties.
  • consumer.subscribe(...) subscribes the consumer to the topic.
  • consumer.poll(...) polls the brokers for messages. The timeout is the maximum time the consumer will wait; if messages are available, the method returns immediately.
  • The for loop displays the messages, if any. Note that the while (true) loop never exits on its own (a shutdown-aware variant is sketched after this list).
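
Because the while (true) loop never exits, the consumer above is only closed when the JVM dies. A common pattern, sketched below, is to call consumer.wakeup() from another thread (here a shutdown hook): the blocked poll() then throws a WakeupException, which you catch to leave the loop and let try-with-resources close the consumer:

// Shutdown-aware variant of the poll loop inside the try-with-resources block above.
// wakeup() is the one KafkaConsumer method that is safe to call from another thread.
final Thread mainThread = Thread.currentThread();
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
    consumer.wakeup();
    try {
        mainThread.join(); // give the main thread time to close the consumer
    } catch (InterruptedException ignored) {
    }
}));

try {
    while (true) {
        ConsumerRecords<String, String> messages = consumer.poll(Duration.ofSeconds(3));
        for (var message : messages) {
            System.out.printf("Consuming message with key: %s and value: %s%n", message.key(), message.value());
        }
    }
} catch (org.apache.kafka.common.errors.WakeupException e) {
    // Expected on shutdown: fall through so the consumer gets closed cleanly.
}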

Now let’s run the consumer and see if we can get any messages:

Consumer waiting for messages

You may wonder why there isn’t any message, since I sent 4 messages before. By default, a new Kafka consumer group only reads messages produced after the consumer starts. If you want to consume earlier messages, a couple of settings need to change, as sketched below.
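
For example, setting auto.offset.reset to earliest makes a consumer group that has no committed offsets start from the beginning of the topic. A minimal sketch, added to the consumer’s Properties (the new group id is just an illustrative name):

// Start from the earliest available offset when the group has no committed offsets.
properties.put("auto.offset.reset", "earliest");
// Use a fresh group id: a group that already committed offsets resumes from them instead.
properties.put("group.id", "kafka.simple_string_consumer_from_start");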

To keep things simple, let’s go back to the producer and run it again to produce new messages. When the producer runs, the messages appear in the consumer console as expected:

Consuming messages in Apache Kafka

If you go back to Kafka UI, you can see that there are new messages available:

New messages in Kafka UI

Conclusion

In this post, I’ve discussed how to set up a Kafka cluster and Kafka UI with Docker, and how to produce and consume messages. The code for this post is available here: https://github.com/datmt/Kafka-Tutorials
