Apache Kafka Word Count 実装 – Java No.44

自己投資の一つとしてチャレンジしている Java や Python などの独習状況を Blog で公開しています。

今回は Apache Kafka の環境で文字をカウントする Java コードを紹介したいと思います。(In English: Apache Kafka Word Count – Java No.44)

————————————
▼1. Apache Kafka の環境で文字をカウントする方法
————————————
Apache Kafka の環境で文字をカウントするために、以下の Step を実装します。

  1. 入力用の Topic を参照
  2. 文字を小文字に変換し、スペースで分割
  3. 文字でグループ分け
  4. 文字のカウント
  5. 出力用の topic に結果を出力

————————————
▼2. 準備
————————————
2-1. Apache Kafka の環境を用意します。
Apache Kafka インストール 4 ノード No.42

2-2. IntelliJ IDEA のインストールします。
IntelliJ IDEA のインストール on Ubuntu No.43

2-3. Apache Maven のインストール
Maven – Download Apache Maven

例)
wget https://downloads.apache.org/maven/maven-3/3.8.3/binaries/apache-maven-3.8.3-bin.tar.gz  -P /tmp/
sudo tar xzvf /tmp/apache-maven-*.tar.gz -C /opt/
/opt/apache-maven-3.8.3/bin/mvn -version

	Apache Maven 3.8.3 (xxxx)
	Maven home: /opt/apache-maven-3.8.3
	Java version: 1.8.0_312, vendor: Azul Systems, Inc., runtime: /usr/lib/jvm/zulu-8-azure-amd64/jre
	Default locale: en_US, platform encoding: UTF-8
	OS name: "linux", version: "5.11.0-40-generic", arch: "amd64", family: "unix"

#Apache-maven-3.8.3 の bin のディレクトリを環境変数に登録します。
例)
PATH=$PATH:/opt/apache-maven-3.8.3/bin/

2-4. 変数の指定

(例)
# KAFKA Broker の host 情報を持つ環境変数を作成
export KAFKABROKERS=10.0.0.2:9092,10.0.0.3:9092,10.0.0.4:9092
echo $KAFKABROKERS
10.0.0.2:9092,10.0.0.3:9092,10.0.0.4:9092

# KAFKA Zookeeper の host 情報を持つ環境変数を作成
export KAFKAZOOKEEPER=10.0.0.1:2181
echo $KAFKAZOOKEEPER
10.0.0.1:2181

2-5. 入力および出力用の topic の作成

(例)
# test-input4 の topic を作成します。
kafka-topics.sh --create --replication-factor 3 --partitions 3 --topic test-input4 --bootstarp-server $KAFKABROKERS
Created topic test

# test-output4 の topic を作成します。
kafka-topics.sh --create --replication-factor 3 --partitions 3 --topic test-output4 --bootstarp-server $KAFKABROKERS
Created topic test

————————————
▼3. Kafka で文字をカウントする Java コード
————————————
3-1. IntelliJ IDEA でプロジェクトを作成。
New Project をクリックし、新しいプロジェクトを作成します。

32. 左に表示されている Maven を選択し、Project SDK はインストールした 1.8 version 1.8.0_312 が選択されている事を確認します。

3-3. Next をクリックし、Artifact Cordinates に以下の項目を記載します。Finish をクリックします。New Window を選びます。

(例) 
GroupId org.example.kafka.streams
ArtifactId: kafka-stream-project
Version: 1.0-SNAPSHOT

3-4. 左に表示された src のフォルダ配下にある pom.xml をクリックして開き、以下の dependencies や plugin を追加します。Log4j のログも標準出力してみます。
(参考情報)
kafka-streams:Maven Repository: org.apache.kafka » kafka-streams » 3.0.0 (mvnrepository.com)
slf4j: Maven Repository: org.slf4j » slf4j-api » 1.7.32 (mvnrepository.com)
slf4j log4j binding: Maven Repository: org.slf4j » slf4j-log4j12 » 1.7.32 (mvnrepository.com)
org.apache.maven.plugins : Maven Repository: org.apache.maven.plugins » maven-compiler-plugin (mvnrepository.com)

(例) pom.xml の一部

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <kafka.version>3.0.0</kafka.version>
    </properties>

    <dependencies>
        <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-streams -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-streams</artifactId>
            <version>${kafka.version}</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-api -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.32</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-log4j12 -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.32</version>
            <!--scope>test</scope-->
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>

35. 左に表示された src のフォルダ配下にある main -> resources を右クリックし New -> File を選択し、log4j.properties のファイル名のファイルを作成し、以下を追記します。

log4j.rootLogger=INFO, stdout

log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n

36. 左に表示された src のフォルダ配下にある main -> java を右クリックし、New Java Class 任意のファイル名 (例) WordCountStream を入力し 、Class をダブルクリックします。

3-7. 表示された WordCountStream .java に以下のコードを記載します。
上記 1 で記載した step を実装しています。

(例)

package org.example.kafka.streams;

import java.util.Properties;
import java.util.Arrays;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Produced;

public class WordCountStream {
    public static void main(String[] args) {
        Properties kafkaStreamParameters = new Properties();
        kafkaStreamParameters.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-stream-wordCount4");
        kafkaStreamParameters.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, args[0]);
        kafkaStreamParameters.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        kafkaStreamParameters.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();
        // 1. Defining the source for the stream from Kafka
        KStream<String, String> topicRecords = builder.stream("test-input4");

        KTable<String, Long> wordCounts = topicRecords
                .mapValues(topicRecord -> topicRecord.toLowerCase())
                // 2. Split each text line, by whitespace, into words. The text lines are the message
                .flatMapValues(lowercaseTopicRecord -> Arrays.asList(lowercaseTopicRecord.split(" ")))
                // 3. Group by key before aggregation
                .groupBy((key,word) -> word)
                // 4. Count
                .count();

        // 5. to in order to write the results back to kafka
        wordCounts.toStream().to("test-output4", Produced.with(Serdes.String(),Serdes.Long()));

    	// Kafka stream is started.
        KafkaStreams streams = new KafkaStreams(builder.build(),kafkaStreamParameters);
        streams.start();

        // shutdown hook to correctly close the streams application
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}

3-8. 引数を指定します。
Alt+Shift+F10 を押してから、0 を押し、表示された Run/Defub congfigurations の Program argument にて、コードの引数として Broker の IP アドレス:port を指定します。

例) 10.0.0.2:9092,10.0.0.3:9092,10.0.0.4:9092

39. Build のメニューのプルダウンメニューにある “Build Project” をクリックし、Build します。

3-10. 文字カウント結果を表示させるため、Kafka consumer を起動。

(例)
kafka-console-consumer.sh --bootstrap-server $KAFKABROKERS --topic test-output4 --from-beginning --formatter kafka.tools.DefaultMessageFormatter --property print.key=true --property print.value=true --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer

3-11. Kafka Producer の起動。

kafka-console-producer.sh --broker-list $KAFKABROKERS --topic test-input4

3-12. IntelliJ IDEA で、Run のメニューのプルダウンメニューにある “Run” をクリックします。ポップアップされた WordCountStream をクリックします。

3-13. 3-10 で Kafka Producer にて、以下の文字を入力します

>KAFKA Kafka Kafka KAFKa kafkA Java Java JAVa JAVA Test TEST ABC

3-14. 以下のように文字のカウントができる事を確認します。

————————————
▼4. 参考情報
————————————
(1) Download IntelliJ IDEA: The Capable & Ergonomic Java IDE by JetBrains
(2) Download Azul Zulu Builds of OpenJDK | Azul
(3) Java – Visual Studio Code の利用 No.24
(4) Apache Kafka インストール 4 ノード No.42
(5) https://kafka.apache.org/documentation/streams/

以上です。参考になりましたら幸いです。



コメントを残す

メールアドレスが公開されることはありません。 が付いている欄は必須項目です