Apache Kafka Word Count – Java No.44

This post shows how to count the words in a Kafka topic with a Java application built on Kafka Streams.

▼1. Steps for counting the words in a topic with Java

  1. Read the records from the input topic
  2. Convert each record to lowercase and split it into words on spaces
  3. Group the records by word
  4. Count the occurrences of each word
  5. Write the results to the output topic
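To make the five steps concrete without a running cluster, they can be imitated as an in-memory pipeline over a list of lines using java.util.stream (Java 8). This is only an illustration of the logic; the class WordCountSketch and its countWords method are hypothetical, and the real application in 3-7 uses the Kafka Streams API instead of a local list.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Function;
import java.util.stream.Collectors;

// Hypothetical in-memory version of the five steps (no Kafka involved).
class WordCountSketch {

    static Map<String, Long> countWords(List<String> lines) {
        return lines.stream()                                    // 1. read the input "records"
                .map(String::toLowerCase)                        // 2a. lowercase each line
                .flatMap(line -> Arrays.stream(line.split(" "))) // 2b. split on spaces
                .filter(word -> !word.isEmpty())                 // skip empty tokens from repeated spaces
                .collect(Collectors.groupingBy(                  // 3. group by the word
                        Function.identity(),
                        TreeMap::new,                            // sorted keys for readable output
                        Collectors.counting()));                 // 4. count each group
    }

    public static void main(String[] args) {
        List<String> input = Arrays.asList("KAFKA Kafka Kafka KAFKa kafkA Java Java JAVa JAVA Test TEST ABC");
        // 5. "write" the results; here they are simply printed
        countWords(input).forEach((word, count) -> System.out.println(word + " " + count));
    }
}
```

Running main with the sample line used later in 3-13 prints abc 1, java 4, kafka 5 and test 2, one per line.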

▼2. Prerequisites

2-1. Installing an Apache Kafka cluster

Install Apache Kafka 4 nodes No.42

2-2. Installing IntelliJ IDEA

Install IntelliJ IDEA on Ubuntu No.43

2-3. Installing Apache Maven

Maven – Download Apache Maven

(e.g)
wget https://downloads.apache.org/maven/maven-3/3.8.3/binaries/apache-maven-3.8.3-bin.tar.gz  -P /tmp/
sudo tar xzvf /tmp/apache-maven-*.tar.gz -C /opt/
/opt/apache-maven-3.8.3/bin/mvn -version

	Apache Maven 3.8.3 (xxxx)
	Maven home: /opt/apache-maven-3.8.3
	Java version: 1.8.0_312, vendor: Azul Systems, Inc., runtime: /usr/lib/jvm/zulu-8-azure-amd64/jre
	Default locale: en_US, platform encoding: UTF-8
	OS name: "linux", version: "5.11.0-40-generic", arch: "amd64", family: "unix"

# Add the bin directory of apache-maven-3.8.3 to the PATH environment variable
(e.g)
export PATH=$PATH:/opt/apache-maven-3.8.3/bin/

2-4. Setting the $KAFKABROKERS and $KAFKAZOOKEEPER environment variables

(e.g)
# Set $KAFKABROKERS
export KAFKABROKERS=10.0.0.2:9092,10.0.0.3:9092,10.0.0.4:9092
echo $KAFKABROKERS
10.0.0.2:9092,10.0.0.3:9092,10.0.0.4:9092

# Set $KAFKAZOOKEEPER
export KAFKAZOOKEEPER=10.0.0.1:2181
echo $KAFKAZOOKEEPER
10.0.0.1:2181

2-5. Creating the input and output topics

(e.g)
# Creating test-input4 for input
kafka-topics.sh --create --replication-factor 3 --partitions 3 --topic test-input4 --bootstrap-server $KAFKABROKERS
Created topic test-input4.

# Creating test-output4 for output
kafka-topics.sh --create --replication-factor 3 --partitions 3 --topic test-output4 --bootstrap-server $KAFKABROKERS
Created topic test-output4.
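Both topics have 3 partitions. For a record with a key, the producer picks the partition from a hash of the key, so records with equal keys always go to the same partition; this is also why the groupBy step in 3-7 makes Kafka Streams repartition the data by word before counting. The sketch below only illustrates the "hash modulo number of partitions" rule and rests on a stated simplification: Kafka's default partitioner applies murmur2 to the serialized key bytes, while the hypothetical partitionFor helper uses String.hashCode() as a stand-in, so its partition numbers will not match Kafka's.

```java
// Simplified illustration of key-based partitioning (NOT Kafka's actual hash function).
class PartitionSketch {

    // Kafka hashes the serialized key with murmur2; String.hashCode() is only a stand-in here.
    static int partitionFor(String key, int numPartitions) {
        // Clear the sign bit so the hash is non-negative, then take the modulo.
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        int partitions = 3; // matches --partitions 3 used for test-input4 and test-output4
        for (String word : new String[] {"kafka", "java", "test", "kafka"}) {
            // The two "kafka" lines always report the same partition.
            System.out.println(word + " -> partition " + partitionFor(word, partitions));
        }
    }
}
```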

▼3. Counting words in a topic by Java

3-1. Creating an IntelliJ IDEA project

Create a new Maven project (File > New > Project).

3-2. Confirming that JDK 1.8 is selected as the project SDK
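To double-check at run time which JDK actually executes the project, a small class can read the standard java.version system property, which reports strings such as 1.8.0_312 on Java 8 (as in the Maven output in 2-3) and 11.0.13 on later releases. The class JavaVersionCheck and its majorVersion helper are hypothetical; this is a sketch, not part of the original project.

```java
// Hypothetical helper to confirm which Java version runs the project.
class JavaVersionCheck {

    // "1.8.0_312" -> 8 (legacy 1.x scheme), "11.0.13" -> 11, "17" -> 17
    static int majorVersion(String version) {
        String[] parts = version.split("[._-]");
        int first = Integer.parseInt(parts[0]);
        return first == 1 ? Integer.parseInt(parts[1]) : first;
    }

    public static void main(String[] args) {
        String version = System.getProperty("java.version");
        int major = majorVersion(version);
        System.out.println("java.version = " + version + " (major " + major + ")");
        if (major != 8) {
            System.out.println("Warning: this project is configured for Java 8 (1.8).");
        }
    }
}
```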

3-3. Entering the following information while creating the project

(e.g) 
GroupId: org.example.kafka.streams
ArtifactId: kafka-stream-project
Version: 1.0-SNAPSHOT

3-4. Updating pom.xml as below
Ref:
kafka-streams: Maven Repository: org.apache.kafka » kafka-streams » 3.0.0 (mvnrepository.com)
slf4j-api: Maven Repository: org.slf4j » slf4j-api » 1.7.32 (mvnrepository.com)
slf4j-log4j12 (SLF4J binding for log4j): Maven Repository: org.slf4j » slf4j-log4j12 » 1.7.32 (mvnrepository.com)
maven-compiler-plugin: Maven Repository: org.apache.maven.plugins » maven-compiler-plugin (mvnrepository.com)

(e.g) a part of pom.xml

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <kafka.version>3.0.0</kafka.version>
    </properties>

    <dependencies>
        <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-streams -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-streams</artifactId>
            <version>${kafka.version}</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-api -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.32</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-log4j12 -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.32</version>
            <!--scope>test</scope-->
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>

3-5. Creating a log4j.properties file in src/main/resources and adding the following

log4j.rootLogger=INFO, stdout

log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n
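The ConversionPattern above prints the date, the level left-aligned in a five-character field (%-5p), the last component of the logger name (%c{1}), the source line number (%L), the message (%m) and a newline (%n). The sketch below imitates that layout with String.format and java.time so the shape of a log line is easy to see; it does not use log4j itself (log4j 1.x formats %d{...} with SimpleDateFormat). The class name, the line number 42 and the message text are hypothetical sample values.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

// Imitates "%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n" without log4j (illustration only).
class Log4jPatternSketch {

    private static final DateTimeFormatter DATE = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    static String format(LocalDateTime time, String level, String loggerName, int line, String message) {
        // %c{1}: keep only the part of the logger name after the last dot
        String shortLogger = loggerName.substring(loggerName.lastIndexOf('.') + 1);
        // %-5p: level left-justified in 5 characters; %n: platform line separator
        return String.format("%s %-5s %s:%d - %s%n", time.format(DATE), level, shortLogger, line, message);
    }

    public static void main(String[] args) {
        System.out.print(format(LocalDateTime.of(2021, 1, 2, 3, 4, 5),
                "INFO", "org.example.kafka.streams.WordCountStream", 42, "sample message"));
        // prints: 2021-01-02 03:04:05 INFO  WordCountStream:42 - sample message
    }
}
```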

3-6. Creating WordCountStream.java in src/main/java under the package org.example.kafka.streams

3-7. Writing the following code in WordCountStream.java

(e.g)

package org.example.kafka.streams;

import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Produced;

public class WordCountStream {
    public static void main(String[] args) {
        // The broker list (host:port,host:port,...) is passed as the first program argument (see 3-8)
        if (args.length < 1) {
            System.err.println("Usage: WordCountStream <bootstrap-servers>");
            System.exit(1);
        }

        Properties kafkaStreamParameters = new Properties();
        kafkaStreamParameters.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-stream-wordCount4");
        kafkaStreamParameters.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, args[0]);
        kafkaStreamParameters.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        kafkaStreamParameters.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();
        // 1. Read the records of the input topic as a stream
        KStream<String, String> topicRecords = builder.stream("test-input4");

        KTable<String, Long> wordCounts = topicRecords
                // 2. Convert each message (a line of text) to lowercase...
                .mapValues(topicRecord -> topicRecord.toLowerCase())
                // ...and split it into words on whitespace
                .flatMapValues(lowercaseTopicRecord -> Arrays.asList(lowercaseTopicRecord.split("\\s+")))
                // Drop the empty string that leading whitespace would produce
                .filter((key, word) -> !word.isEmpty())
                // 3. Re-key each record by the word itself and group by it
                .groupBy((key, word) -> word)
                // 4. Count the occurrences of each word
                .count();

        // 5. Write every updated count to the output topic (key: String word, value: Long count)
        wordCounts.toStream().to("test-output4", Produced.with(Serdes.String(), Serdes.Long()));

        // Build the topology and create the Kafka Streams application
        KafkaStreams streams = new KafkaStreams(builder.build(), kafkaStreamParameters);

        // Shutdown hook so that the application is closed cleanly (e.g. on Ctrl+C)
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
        streams.start();
    }
}
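count() returns a KTable, and toStream() turns every update of that table into a record, so the output topic receives a new (word, count) record each time a count changes rather than one final total. The sketch below imitates that behaviour with a HashMap in plain Java; it is a simplified model under that assumption, not Kafka Streams code, and the class ChangelogSketch and its countUpdates method are hypothetical. In the real application, record caching (enabled by default) can merge several updates to the same word before they are forwarded, so fewer intermediate counts may appear.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified model of count() + toStream(): each input word emits its updated count.
class ChangelogSketch {

    static List<Map.Entry<String, Long>> countUpdates(List<String> words) {
        Map<String, Long> table = new HashMap<>();            // plays the role of the KTable's state store
        List<Map.Entry<String, Long>> updates = new ArrayList<>();
        for (String word : words) {
            long newCount = table.merge(word, 1L, Long::sum); // increment the stored count
            updates.add(new SimpleEntry<>(word, newCount));   // forward the update downstream
        }
        return updates;
    }

    public static void main(String[] args) {
        for (Map.Entry<String, Long> update : countUpdates(Arrays.asList("kafka", "java", "kafka"))) {
            System.out.println(update.getKey() + "\t" + update.getValue());
        }
    }
}
```

Here main prints kafka 1, java 1 and then kafka 2: the second "kafka" produces a second record for the same key instead of replacing the first one in the output.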

3-8. Adding the program argument

Press Alt+Shift+F10 and then 0 (Edit Configurations...) to open the Run/Debug Configurations dialog. Then enter the brokers' IP address:port pairs, separated by commas, in the Program arguments field.

(e.g) 10.0.0.2:9092,10.0.0.3:9092,10.0.0.4:9092
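The program argument is read as args[0] and passed unchanged to bootstrap.servers, which expects a comma-separated list of host:port pairs. If retyping the list is inconvenient, one option is to fall back to the KAFKABROKERS variable from 2-4 when no argument is given. The helper below is a hypothetical sketch of that idea with a basic format check (IPv6 literals are not handled). Note that a program started from IntelliJ IDEA may only see KAFKABROKERS if the IDE was launched from a shell where it was exported, or if the variable is added to the run configuration's Environment variables field.

```java
import java.util.Map;

// Hypothetical helper: prefer the program argument, else the KAFKABROKERS environment variable.
class BootstrapServers {

    static String resolve(String[] args, Map<String, String> env) {
        String servers = args.length > 0 ? args[0] : env.get("KAFKABROKERS");
        if (servers == null || servers.trim().isEmpty()) {
            throw new IllegalArgumentException(
                    "Pass host:port[,host:port...] as the first argument or set KAFKABROKERS");
        }
        // Basic sanity check: every comma-separated entry must look like host:port (no IPv6 support).
        for (String entry : servers.split(",")) {
            String[] hostPort = entry.trim().split(":");
            if (hostPort.length != 2 || hostPort[0].isEmpty() || !hostPort[1].matches("\\d+")) {
                throw new IllegalArgumentException("Not a host:port pair: " + entry);
            }
        }
        return servers;
    }

    public static void main(String[] args) {
        // In WordCountStream this value would be put under StreamsConfig.BOOTSTRAP_SERVERS_CONFIG.
        System.out.println(resolve(args, System.getenv()));
    }
}
```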

3-9. Building the project

3-10. Starting a Kafka console consumer to display the word counts

(e.g)
kafka-console-consumer.sh --bootstrap-server $KAFKABROKERS --topic test-output4 --from-beginning \
  --formatter kafka.tools.DefaultMessageFormatter \
  --property print.key=true \
  --property print.value=true \
  --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
  --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer
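The value.deserializer property matters because the counts are written with Serdes.Long() (see Produced.with in 3-7), which stores each count as 8 raw bytes rather than as text; printing them as strings would show unreadable characters. The sketch below shows that encoding with java.nio.ByteBuffer, assuming the 8-byte big-endian layout used by Kafka's LongSerializer. It is a plain-Java illustration, and the class LongBytesSketch is hypothetical.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Illustration of how a long count travels as 8 big-endian bytes.
class LongBytesSketch {

    static byte[] encode(long value) {
        return ByteBuffer.allocate(Long.BYTES).putLong(value).array(); // ByteBuffer is big-endian by default
    }

    static long decode(byte[] bytes) {
        return ByteBuffer.wrap(bytes).getLong();
    }

    public static void main(String[] args) {
        byte[] bytes = encode(5L);
        System.out.println(Arrays.toString(bytes)); // [0, 0, 0, 0, 0, 0, 0, 5]
        System.out.println(decode(bytes));          // 5
        // Reading the same bytes as UTF-8 text gives control characters, not "5".
        System.out.println(new String(bytes, StandardCharsets.UTF_8).equals("5")); // false
    }
}
```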

3-11. Starting a Kafka console producer

kafka-console-producer.sh --bootstrap-server $KAFKABROKERS --topic test-input4

3-12. Running WordCountStream in IntelliJ IDEA

3-13. Typing the following words into the Kafka producer started in 3-11

>KAFKA Kafka Kafka KAFKa kafkA Java Java JAVa JAVA Test TEST ABC

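3-14. Confirming the output in the consumer started in 3-10

For the input above, the final counts are kafka 5, java 4, test 2 and abc 1. Depending on record caching, intermediate counts for the same word may also be printed, and the results can take up to the commit interval (30 seconds by default) to appear.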


▼4. References

1. Download IntelliJ IDEA: The Capable & Ergonomic Java IDE by JetBrains
2. Download Azul Zulu Builds of OpenJDK | Azul
3. Java – Visual Studio Code on Ubuntu No.24
4. Install Apache Kafka 4 nodes No.42
5. https://kafka.apache.org/documentation/streams/

That’s all. Have a nice day ahead!!!
