自己投資の一つとしてチャレンジしている Java や Python などの独習状況を Blog で公開しています。
今回は Apache Kafka の環境で文字をカウントする Java コードを紹介したいと思います。(In English: Apache Kafka Word Count – Java No.44)
————————————
▼1. Apache Kafka の環境で文字をカウントする方法
————————————
Apache Kafka の環境で文字をカウントするために、以下の Step を実装します。
- 入力用の Topic を参照
- 文字を小文字に変換し、スペースで分割
- 文字でグループ分け
- 文字のカウント
- 出力用の topic に結果を出力
————————————
▼2. 準備
————————————
2-1. Apache Kafka の環境を用意します。
Apache Kafka インストール 4 ノード No.42
2-2. IntelliJ IDEA のインストールします。
IntelliJ IDEA のインストール on Ubuntu No.43
2-3. Apache Maven のインストール
Maven – Download Apache Maven
例)
wget https://downloads.apache.org/maven/maven-3/3.8.3/binaries/apache-maven-3.8.3-bin.tar.gz -P /tmp/
sudo tar xzvf /tmp/apache-maven-*.tar.gz -C /opt/
/opt/apache-maven-3.8.3/bin/mvn -version
Apache Maven 3.8.3 (xxxx)
Maven home: /opt/apache-maven-3.8.3
Java version: 1.8.0_312, vendor: Azul Systems, Inc., runtime: /usr/lib/jvm/zulu-8-azure-amd64/jre
Default locale: en_US, platform encoding: UTF-8
OS name: "linux", version: "5.11.0-40-generic", arch: "amd64", family: "unix"
#Apache-maven-3.8.3 の bin のディレクトリを環境変数に登録します。
例)
PATH=$PATH:/opt/apache-maven-3.8.3/bin/2-4. 変数の指定
(例)
# KAFKA Broker の host 情報を持つ環境変数を作成
export KAFKABROKERS=10.0.0.2:9092,10.0.0.3:9092,10.0.0.4:9092
echo $KAFKABROKERS
10.0.0.2:9092,10.0.0.3:9092,10.0.0.4:9092
# KAFKA Zookeeper の host 情報を持つ環境変数を作成
export KAFKAZOOKEEPER=10.0.0.1:2181
echo $KAFKAZOOKEEPER
10.0.0.1:2181
2-5. 入力および出力用の topic の作成
(例)
# test-input4 の topic を作成します。
kafka-topics.sh --create --replication-factor 3 --partitions 3 --topic test-input4 --bootstarp-server $KAFKABROKERS
Created topic test
# test-output4 の topic を作成します。
kafka-topics.sh --create --replication-factor 3 --partitions 3 --topic test-output4 --bootstarp-server $KAFKABROKERS
Created topic test————————————
▼3. Kafka で文字をカウントする Java コード
————————————
3-1. IntelliJ IDEA でプロジェクトを作成。
New Project をクリックし、新しいプロジェクトを作成します。
3–2. 左に表示されている Maven を選択し、Project SDK はインストールした 1.8 version 1.8.0_312 が選択されている事を確認します。
3-3. Next をクリックし、Artifact Cordinates に以下の項目を記載します。Finish をクリックします。New Window を選びます。
(例)
GroupId org.example.kafka.streams
ArtifactId: kafka-stream-project
Version: 1.0-SNAPSHOT3-4. 左に表示された src のフォルダ配下にある pom.xml をクリックして開き、以下の dependencies や plugin を追加します。Log4j のログも標準出力してみます。
(参考情報)
kafka-streams:Maven Repository: org.apache.kafka » kafka-streams » 3.0.0 (mvnrepository.com)
slf4j: Maven Repository: org.slf4j » slf4j-api » 1.7.32 (mvnrepository.com)
slf4j log4j binding: Maven Repository: org.slf4j » slf4j-log4j12 » 1.7.32 (mvnrepository.com)
org.apache.maven.plugins : Maven Repository: org.apache.maven.plugins » maven-compiler-plugin (mvnrepository.com)
(例) pom.xml の一部
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<kafka.version>3.0.0</kafka.version>
</properties>
<dependencies>
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-streams -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>${kafka.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-api -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.32</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-log4j12 -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.32</version>
<!--scope>test</scope-->
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
</plugins>
</build>
3–5. 左に表示された src のフォルダ配下にある main -> resources を右クリックし New -> File を選択し、log4j.properties のファイル名のファイルを作成し、以下を追記します。
log4j.rootLogger=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n3–6. 左に表示された src のフォルダ配下にある main -> java を右クリックし、New Java Class 任意のファイル名 (例) WordCountStream を入力し 、Class をダブルクリックします。
3-7. 表示された WordCountStream .java に以下のコードを記載します。
上記 1 で記載した step を実装しています。
(例)
package org.example.kafka.streams;
import java.util.Properties;
import java.util.Arrays;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Produced;
public class WordCountStream {
public static void main(String[] args) {
Properties kafkaStreamParameters = new Properties();
kafkaStreamParameters.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-stream-wordCount4");
kafkaStreamParameters.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, args[0]);
kafkaStreamParameters.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
kafkaStreamParameters.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
StreamsBuilder builder = new StreamsBuilder();
// 1. Defining the source for the stream from Kafka
KStream<String, String> topicRecords = builder.stream("test-input4");
KTable<String, Long> wordCounts = topicRecords
.mapValues(topicRecord -> topicRecord.toLowerCase())
// 2. Split each text line, by whitespace, into words. The text lines are the message
.flatMapValues(lowercaseTopicRecord -> Arrays.asList(lowercaseTopicRecord.split(" ")))
// 3. Group by key before aggregation
.groupBy((key,word) -> word)
// 4. Count
.count();
// 5. to in order to write the results back to kafka
wordCounts.toStream().to("test-output4", Produced.with(Serdes.String(),Serdes.Long()));
// Kafka stream is started.
KafkaStreams streams = new KafkaStreams(builder.build(),kafkaStreamParameters);
streams.start();
// shutdown hook to correctly close the streams application
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
}
}
3-8. 引数を指定します。
Alt+Shift+F10 を押してから、0 を押し、表示された Run/Defub congfigurations の Program argument にて、コードの引数として Broker の IP アドレス:port を指定します。
例) 10.0.0.2:9092,10.0.0.3:9092,10.0.0.4:90923–9. Build のメニューのプルダウンメニューにある “Build Project” をクリックし、Build します。
3-10. 文字カウント結果を表示させるため、Kafka consumer を起動。
(例)
kafka-console-consumer.sh --bootstrap-server $KAFKABROKERS --topic test-output4 --from-beginning --formatter kafka.tools.DefaultMessageFormatter --property print.key=true --property print.value=true --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer3-11. Kafka Producer の起動。
kafka-console-producer.sh --broker-list $KAFKABROKERS --topic test-input43-12. IntelliJ IDEA で、Run のメニューのプルダウンメニューにある “Run” をクリックします。ポップアップされた WordCountStream をクリックします。
3-13. 3-10 で Kafka Producer にて、以下の文字を入力します
>KAFKA Kafka Kafka KAFKa kafkA Java Java JAVa JAVA Test TEST ABC
3-14. 以下のように文字のカウントができる事を確認します。

————————————
▼4. 参考情報
————————————
(1) Download IntelliJ IDEA: The Capable & Ergonomic Java IDE by JetBrains
(2) Download Azul Zulu Builds of OpenJDK | Azul
(3) Java – Visual Studio Code の利用 No.24
(4) Apache Kafka インストール 4 ノード No.42
(5) https://kafka.apache.org/documentation/streams/
以上です。参考になりましたら幸いです。