Apache Kafka – Producer Java アプリケーションの作成 No.66

自己投資としてチャレンジしている内容を Blog で公開しています。

今回は Apache Kafka で Broker へデータを送信する Producer の Java アプリケーションを実装してみたいと思います。(In English, Apache Kafka – Develop Producer Java application No.66)


▼1. Apache Kafka の用語の説明

  • Event : 書き込みや参照の対象となるレコード、メッセージ
  • Topic: Event が保存され、整理される場所。ファイルシステムにあるフォルダのようなもの。クライアントは指定した topic に対してイベントを書き込み、データを参照する。
  • Partition : Topic 作成時に指定した partition の数に Topic が分けられる。分けられた Partition に Broker が割り当てられる。
  • Broker : Kafka を構成するサーバーの中で Storage の機能を持つサーバー。Broker にデータが保存され (write)、Broker のあるデータが参照される (read)。
  • Producer :  topic への Event の write (Message 送信) (Publish) を行うクライアントアプリケーション
  • Consumer : Topic にある Event の read (Message 受信) (Subscribe) を行うクライアントアプリケーション
  • Offset: 各 Partition に保存された Event には連番が付与される。Partition 単位で Event の位置を示す目的として、Offset が利用される。

(参考) Apache Kafka


▼2. 事前準備

Kafka の環境を準備します。
Apache Kafka インストール 4 ノード No.42

Maven のインストール
Apache Kafka Word Count 実装 – Java No.44
2-3. Apache Maven のインストール” を参照


▼3. Producer アプリケーションの作成

1 から 100 の文字を Event として Topic に書き込む Producer の Java アプリを作成します。

3-1. ディレクトリ作成
アプリケーションを作成するため、ディレクトリを作成し、VS Code を起動します。

mkidr kafka-simple-app
cd kafka-simple-app

3-2. Apache Maven によるプロジェクトの作成

mvn archetype:generate -DarchetypeGroupId=org.apache.maven.archetypes -DarchetypeArtifactId=maven-archetype-simple -DgroupId=org.example.kafka.streams -DartifactId=kafka-simple-app -Dversion=1.0-SNAPSHOT -DinteractiveMode=false

(一部の出力)
xxx
[INFO] ----------------------------------------------------------------------------
[INFO] Using following parameters for creating project from Archetype: maven-archetype-simple:1.4
[INFO] ----------------------------------------------------------------------------
[INFO] Parameter: groupId, Value: org.example.kafka.streams
[INFO] Parameter: artifactId, Value: kafka-simple-app
[INFO] Parameter: version, Value: 1.0-SNAPSHOT
[INFO] Parameter: package, Value: org.example.kafka.streams
[INFO] Parameter: packageInPathFormat, Value: org/example/kafka/streams
[INFO] Parameter: package, Value: org.example.kafka.streams
[INFO] Parameter: version, Value: 1.0-SNAPSHOT
[INFO] Parameter: groupId, Value: org.example.kafka.streams
[INFO] Parameter: artifactId, Value: kafka-simple-app
[INFO] Project created from Archetype in dir: /home/hadoop/kafka-simple-app
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time:  40.597 s
[INFO] Finished at: 2022-11-05T08:16:52+09:00
[INFO] ------------------------------------------------------------------------

3-3. デフォルトで作成された App.java および AppTest.java の削除

rm ./kafka-simple-app/src/main/java/org/example/kafka/streams/App.java
rm ./kafka-simple-app/src/test/java/org/example/kafka/streams/AppTest.java

3-4. pom.xml の追記

以下を追記し保存します。
(pom.xml の一部)

xxx  
<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
  </properties>

  <dependencies>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>3.8.1</version>
    </dependency>
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka_2.13</artifactId>
      <version>3.3.1</version>
    </dependency>
  </dependencies>
xxx  

VS Code 上で保存すると、自動で dependencies を分析してくれる Dependency Analytics Extension のインストールが聞かれるのでインストールします。

InstallDevendencyAnalyticsExtension

3-5. Producer アプリケーションの作成

以下では kafkaAppProducer を作成し、1 から 100 の文字を Event として Topic に書き込む Producer を作成します。

package org.example.kafka.streams;

import org.apache.kafka.clients.producer.*;
import java.util.Properties;

public class kafkaAppProducer {
    private static String topicName = "test";

    public static void main(String[] args) {
        
        // setting for KafkaProducer
        Properties props = new Properties();
        String kafkabrokers =System.getenv("KAFKABROKERS");
        props.put("bootstrap.servers",kafkabrokers);
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // create the object to produce and send the message.
        Producer<String, String> producer = new KafkaProducer<String, String>(props);
        for (int i = 0; i < 100; i++)
            producer.send(new ProducerRecord<String, String>(topicName, Integer.toString(i), Integer.toString(i)), new Callback() {
                @Override
                public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                    if (e != null) {
                        e.printStackTrace();
                    }else{
                        String infmsg = String.format("we successfully sent is: partition:%d ,offset: %d" ,recordMetadata.partition(),recordMetadata.offset());
                        System.out.println(infmsg);
                    }
                }
            });
        
        // close the producer.
        producer.close();
    }   
}

3-6. Producer アプリケーションの実行

Ctrl+Shift+p Java: Export Jar をクリックし、kafka-simple-app.jar を作成。

createJarFile

以下のコマンドを実行します。1 から 100 の数字を変換した文字列が 3 つのパーティションに分散され書き込まれました。

$ java -jar ./kafka-simple-app.jar 

SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
we successfully sent is: partition:2 ,offset: 49
we successfully sent is: partition:2 ,offset: 50
we successfully sent is: partition:2 ,offset: 51
we successfully sent is: partition:2 ,offset: 52
xxx
we successfully sent is: partition:2 ,offset: 69
we successfully sent is: partition:2 ,offset: 70
we successfully sent is: partition:0 ,offset: 91
we successfully sent is: partition:0 ,offset: 92
xxx
we successfully sent is: partition:0 ,offset: 134
we successfully sent is: partition:0 ,offset: 135
we successfully sent is: partition:1 ,offset: 82
we successfully sent is: partition:1 ,offset: 83
xxx
we successfully sent is: partition:1 ,offset: 111
we successfully sent is: partition:1 ,offset: 112
we successfully sent is: partition:1 ,offset: 113
we successfully sent is: partition:1 ,offset: 114

3-7. 生成したデータを参照します。

Kafka-console-consumer.sh --bootstrap-server $KAFKABROKERS --topic test --from-beginning

▼4. 参考情報

以上です。参考になりましたら幸いです。

コメントを残す

メールアドレスが公開されることはありません。 が付いている欄は必須項目です