This post shows how to count the words in a Kafka topic with Java (Kafka Streams).
- ▼1. Steps for counting words in a topic with Java
- ▼2. Prerequisites
- ▼3. Counting words in a topic with Java
- 3-1. Creating an IntelliJ IDEA project
- 3-2. Confirming that JDK 1.8 is used
- 3-3. Entering the following information while creating the project
- 3-4. Updating pom.xml as follows
- 3-5. Creating a log4j.properties file in src/main/resources and adding the following
- 3-6. Creating WordCountStream.java in src/main/java
- 3-7. Writing the following code in WordCountStream.java
- 3-8. Adding the program argument
- 3-9. Building the project
- 3-10. Starting a Kafka consumer to show the word counts
- 3-11. Starting a Kafka producer
- 3-12. Running WordCountStream in IntelliJ IDEA
- 3-13. Entering the following words in the Kafka producer from 3-11
- 3-14. Confirming the output in the consumer from 3-10
- ▼4. References
▼1. Steps for counting words in a topic with Java
- Read the records from the input topic
- Convert the text to lowercase and split it into words on spaces
- Group the records by word
- Count the occurrences of each word
- Write the results to the output topic
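The middle steps above (lowercase, split, group, count) can be sketched without Kafka using plain Java streams over a single line of text. This is only an illustration of the logic under the assumption that one line stands in for the input topic and System.out for the output topic; the class name WordCountSketch is hypothetical, and the real application in 3-7 uses Kafka Streams and keeps the counts in a KTable.

```java
import java.util.Arrays;
import java.util.Locale;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Function;
import java.util.stream.Collectors;

public class WordCountSketch {

    // Steps 2-4 for one line of text: lowercase, split on spaces, group by word, count.
    // Steps 1 and 5 (reading from and writing to a topic) are replaced by a String and System.out here.
    static Map<String, Long> countWords(String line) {
        return Arrays.stream(line.toLowerCase(Locale.ROOT).split(" "))
                .collect(Collectors.groupingBy(Function.identity(), TreeMap::new, Collectors.counting()));
    }

    public static void main(String[] args) {
        // Prints each word and its count in alphabetical order (TreeMap keeps keys sorted)
        countWords("Kafka Streams kafka java")
                .forEach((word, count) -> System.out.println(word + " " + count));
    }
}
```

For the input "Kafka Streams kafka java" this prints java 1, kafka 2 and streams 1, one per line. Splitting on a single space mirrors the split(" ") call used in 3-7.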
▼2. Prerequisites
2-1. Installing an Apache Kafka cluster
See "Install Apache Kafka 4 nodes No.42".
2-2. Installing IntelliJ IDEA
See "Install IntelliJ IDEA on Ubuntu No.43".
2-3. Installing Apache Maven
(e.g.)
wget https://downloads.apache.org/maven/maven-3/3.8.3/binaries/apache-maven-3.8.3-bin.tar.gz -P /tmp/
sudo tar xzvf /tmp/apache-maven-*.tar.gz -C /opt/
/opt/apache-maven-3.8.3/bin/mvn -version
Apache Maven 3.8.3 (xxxx)
Maven home: /opt/apache-maven-3.8.3
Java version: 1.8.0_312, vendor: Azul Systems, Inc., runtime: /usr/lib/jvm/zulu-8-azure-amd64/jre
Default locale: en_US, platform encoding: UTF-8
OS name: "linux", version: "5.11.0-40-generic", arch: "amd64", family: "unix"
# Add the bin/ directory of apache-maven-3.8.3 to the PATH environment variable
(e.g.)
export PATH=$PATH:/opt/apache-maven-3.8.3/bin/
2-4. Setting the environment variables $KAFKABROKERS and $KAFKAZOOKEEPER
(e.g.)
# Set $KAFKABROKERS
export KAFKABROKERS=10.0.0.2:9092,10.0.0.3:9092,10.0.0.4:9092
echo $KAFKABROKERS
10.0.0.2:9092,10.0.0.3:9092,10.0.0.4:9092
# Set $KAFKAZOOKEEPER
export KAFKAZOOKEEPER=10.0.0.1:2181
echo $KAFKAZOOKEEPER
10.0.0.1:2181
2-5. Creating the input and output topics
(e.g.)
# Create test-input4 for input
kafka-topics.sh --create --replication-factor 3 --partitions 3 --topic test-input4 --bootstrap-server $KAFKABROKERS
Created topic test-input4.
# Create test-output4 for output
kafka-topics.sh --create --replication-factor 3 --partitions 3 --topic test-output4 --bootstrap-server $KAFKABROKERS
Created topic test-output4.
▼3. Counting words in a topic with Java
3-1. Creating an IntelliJ IDEA project
Create a new Maven project (File > New > Project...).
3-2. Confirming that JDK 1.8 is used
3-3. Entering the following information while creating the project
(e.g.)
GroupId: org.example.kafka.streams
ArtifactId: kafka-stream-project
Version: 1.0-SNAPSHOT
3-4. Updating pom.xml as follows
Refs:
kafka-streams: Maven Repository: org.apache.kafka » kafka-streams » 3.0.0 (mvnrepository.com)
slf4j: Maven Repository: org.slf4j » slf4j-api » 1.7.32 (mvnrepository.com)
slf4j log4j binding: Maven Repository: org.slf4j » slf4j-log4j12 » 1.7.32 (mvnrepository.com)
maven-compiler-plugin: Maven Repository: org.apache.maven.plugins » maven-compiler-plugin (mvnrepository.com)
(e.g.) Part of pom.xml
<properties>
    <maven.compiler.source>8</maven.compiler.source>
    <maven.compiler.target>8</maven.compiler.target>
    <kafka.version>3.0.0</kafka.version>
</properties>
<dependencies>
    <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-streams -->
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-streams</artifactId>
        <version>${kafka.version}</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-api -->
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <version>1.7.32</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-log4j12 -->
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <version>1.7.32</version>
        <!--scope>test</scope-->
    </dependency>
</dependencies>
<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.8.1</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
            </configuration>
        </plugin>
    </plugins>
</build>
3-5. Creating a log4j.properties file in src/main/resources and adding the following
log4j.rootLogger=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n
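3-6. Creating WordCountStream.java in src/main/java
Create the class in the package org.example.kafka.streams so that it matches the package declaration in 3-7.
3-7. Writing the following code in WordCountStream.java
(e.g.)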
package org.example.kafka.streams;

import java.util.Arrays;
import java.util.Locale;
import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Produced;

public class WordCountStream {

    public static void main(String[] args) {
        // The broker list (host:port,host:port,...) is passed as the first program argument (see 3-8)
        if (args.length < 1) {
            System.err.println("Usage: WordCountStream <bootstrap servers, e.g. 10.0.0.2:9092,10.0.0.3:9092>");
            System.exit(1);
        }

        Properties kafkaStreamParameters = new Properties();
        kafkaStreamParameters.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-stream-wordCount4");
        kafkaStreamParameters.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, args[0]);
        // Keys and values are read as strings by default
        kafkaStreamParameters.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        kafkaStreamParameters.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();

        // 1. Read the records of the input topic as a stream
        KStream<String, String> topicRecords = builder.stream("test-input4");

        KTable<String, Long> wordCounts = topicRecords
                // 2. Convert each message to lowercase and split it on spaces into words
                .mapValues(topicRecord -> topicRecord.toLowerCase(Locale.ROOT))
                .flatMapValues(lowercaseTopicRecord -> Arrays.asList(lowercaseTopicRecord.split(" ")))
                // 3. Re-key each record by the word and group records with the same word
                .groupBy((key, word) -> word)
                // 4. Count the occurrences of each word (the result is a KTable)
                .count();

        // 5. Write the counts to the output topic with String keys and Long values
        wordCounts.toStream().to("test-output4", Produced.with(Serdes.String(), Serdes.Long()));

        // Build the topology and start the Kafka Streams application
        KafkaStreams streams = new KafkaStreams(builder.build(), kafkaStreamParameters);
        streams.start();

        // Shutdown hook that closes the Streams application cleanly (e.g. on Ctrl+C)
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}
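In the code above, each line is split with split(" "), so two consecutive spaces produce an empty string and a tab is not treated as a separator; both end up being counted as "words". The stdlib-only sketch below compares that tokenization with a trim plus split("\\s+") variant. The class SplitComparison is hypothetical, and whether you want the stricter splitting is an assumption about your data, not part of the original code.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.stream.Collectors;

public class SplitComparison {

    // Tokenization used in 3-7: lowercase, then split on a single space character.
    static List<String> splitOnSpace(String line) {
        return Arrays.asList(line.toLowerCase(Locale.ROOT).split(" "));
    }

    // Alternative: trim, split on any run of whitespace, and drop empty tokens
    // (a blank line would otherwise still yield one empty string).
    static List<String> splitOnWhitespace(String line) {
        return Arrays.stream(line.toLowerCase(Locale.ROOT).trim().split("\\s+"))
                .filter(word -> !word.isEmpty())
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        String line = "Kafka  Java\tTest"; // two spaces, then a tab
        // The double space yields an empty token and the tab is not split at all
        System.out.println(splitOnSpace(line).size());  // 3
        System.out.println(splitOnWhitespace(line));     // [kafka, java, test]
    }
}
```

To apply the same idea in 3-7, the flatMapValues lambda could use lowercaseTopicRecord.trim().split("\\s+") and be followed by .filter((key, word) -> !word.isEmpty()) before groupBy.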
3-8. Adding the program argument
Press Alt+Shift+F10, then press 0 to open the Run/Debug Configurations dialog, and enter the brokers' IP address:port list in the Program arguments field.
(e.g.) 10.0.0.2:9092,10.0.0.3:9092,10.0.0.4:9092
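The code in 3-7 reads the broker list only from args[0]. A minimal sketch of a more forgiving lookup follows, assuming you also want to fall back to the KAFKABROKERS environment variable from 2-4; the class BootstrapServers and its methods resolve and looksLikeBrokerList are hypothetical helpers, and the format check is deliberately rough.

```java
import java.util.Map;

public class BootstrapServers {

    // Returns the broker list from the first program argument if present,
    // otherwise from the KAFKABROKERS entry of the given environment map.
    static String resolve(String[] args, Map<String, String> env) {
        if (args.length > 0 && !args[0].trim().isEmpty()) {
            return args[0].trim();
        }
        String fromEnv = env.get("KAFKABROKERS");
        if (fromEnv != null && !fromEnv.trim().isEmpty()) {
            return fromEnv.trim();
        }
        throw new IllegalArgumentException(
                "Pass host:port[,host:port...] as the first argument or set KAFKABROKERS");
    }

    // Rough format check: every comma-separated entry must look like host:port with a numeric port.
    static boolean looksLikeBrokerList(String servers) {
        for (String entry : servers.split(",")) {
            int colon = entry.lastIndexOf(':');
            if (colon <= 0 || colon == entry.length() - 1) {
                return false;
            }
            String port = entry.substring(colon + 1);
            if (!port.chars().allMatch(Character::isDigit)) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        String servers = resolve(args, System.getenv());
        System.out.println(servers + " valid=" + looksLikeBrokerList(servers));
    }
}
```

Note that a program started from IntelliJ IDEA only sees KAFKABROKERS if IntelliJ was launched from a shell where the variable is exported, or if the variable is added under Environment variables in the same Run/Debug Configurations dialog.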
3-9. Building the project
3-10. Starting a Kafka consumer to show the word counts
(e.g.)
kafka-console-consumer.sh --bootstrap-server $KAFKABROKERS --topic test-output4 --from-beginning --formatter kafka.tools.DefaultMessageFormatter --property print.key=true --property print.value=true --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer
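The consumer needs value.deserializer=LongDeserializer because the application writes each count with Serdes.Long() (see Produced.with in 3-7), which stores the count as 8 big-endian bytes rather than as text. The stdlib sketch below mimics that byte layout with ByteBuffer to show why the value cannot be read as a string; it is an illustration (the class LongEncodingDemo is hypothetical), not Kafka's own serializer code.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class LongEncodingDemo {

    // Encodes a long as 8 big-endian bytes, the same layout Kafka's LongSerializer produces.
    static byte[] encode(long value) {
        return ByteBuffer.allocate(Long.BYTES).putLong(value).array();
    }

    // Decodes 8 big-endian bytes back into a long, as LongDeserializer does.
    static long decode(byte[] bytes) {
        return ByteBuffer.wrap(bytes).getLong();
    }

    public static void main(String[] args) {
        byte[] bytes = encode(5L);
        System.out.println(bytes.length);  // 8
        System.out.println(decode(bytes)); // 5
        // Reading the same bytes as UTF-8 text gives control characters, not "5"
        String asText = new String(bytes, StandardCharsets.UTF_8);
        System.out.println(asText.equals("5")); // false
    }
}
```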
3-11. Starting a Kafka producer
kafka-console-producer.sh --bootstrap-server $KAFKABROKERS --topic test-input4
3-12. Running WordCountStream in IntelliJ IDEA
3-13. Entering the following words in the Kafka producer from 3-11
>KAFKA Kafka Kafka KAFKa kafkA Java Java JAVa JAVA Test TEST ABC

3-14. Confirming the output in the consumer from 3-10

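Because count() produces a KTable, every incoming word updates a running total, and the consumer from 3-10 reads those updates from test-output4. Depending on record caching (cache.max.bytes.buffering) and the commit interval (commit.interval.ms), you may see only the latest count per word or several intermediate counts. The stdlib sketch below models the per-record updates roughly as a KTable forwards them when caching is disabled; it is not Kafka code, and the class RunningWordCount is hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;

public class RunningWordCount {

    // Current total per word; survives across lines like the KTable's state store.
    private final Map<String, Long> counts = new HashMap<>();

    // Processes one line and returns the "word count" updates it causes, in order.
    List<String> process(String line) {
        List<String> updates = new ArrayList<>();
        for (String word : line.toLowerCase(Locale.ROOT).split(" ")) {
            long updated = counts.merge(word, 1L, Long::sum);
            updates.add(word + " " + updated);
        }
        return updates;
    }

    long countOf(String word) {
        return counts.getOrDefault(word, 0L);
    }

    public static void main(String[] args) {
        RunningWordCount wordCount = new RunningWordCount();
        // Same line as in 3-13, without the producer's ">" prompt
        wordCount.process("KAFKA Kafka Kafka KAFKa kafkA Java Java JAVa JAVA Test TEST ABC")
                .forEach(System.out::println);
    }
}
```

For the line from 3-13 the final totals are kafka 5, java 4, test 2 and abc 1; a later line continues from those totals instead of starting again at zero.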
▼4. References
1. Download IntelliJ IDEA: The Capable & Ergonomic Java IDE by JetBrains
2. Download Azul Zulu Builds of OpenJDK | Azul
3. Java – Visual Studio Code on Ubuntu No.24
4. Install Apache Kafka 4 nodes No.42
5. Kafka Streams documentation: https://kafka.apache.org/documentation/streams/
That’s all. Have a nice day ahead!!!