自己投資としてチャレンジしている内容を Blog で公開しています。
今回は Apache Kafka で Broker へデータを送信する Producer の Java アプリケーションを実装してみたいと思います。(In English, Apache Kafka – Develop Producer Java application No.66)
Summary
▼1. Apache Kafka の用語の説明
- Event : 書き込みや参照の対象となるレコード、メッセージ
- Topic: Event が保存され、整理される場所。ファイルシステムにあるフォルダのようなもの。クライアントは指定した topic に対してイベントを書き込み、データを参照する。
- Partition : Topic 作成時に指定した partition の数に Topic が分けられる。分けられた Partition に Broker が割り当てられる。
- Broker : Kafka を構成するサーバーの中で Storage の機能を持つサーバー。Broker にデータが保存され (write)、Broker のあるデータが参照される (read)。
- Producer : topic への Event の write (Message 送信) (Publish) を行うクライアントアプリケーション
- Consumer : Topic にある Event の read (Message 受信) (Subscribe) を行うクライアントアプリケーション
- Offset: 各 Partition に保存された Event には連番が付与される。Partition 単位で Event の位置を示す目的として、Offset が利用される。
(参考) Apache Kafka
▼2. 事前準備
Kafka の環境を準備します。
Apache Kafka インストール 4 ノード No.42
Maven のインストール
Apache Kafka Word Count 実装 – Java No.44
“2-3. Apache Maven のインストール” を参照
▼3. Producer アプリケーションの作成
1 から 100 の文字を Event として Topic に書き込む Producer の Java アプリを作成します。
3-1. ディレクトリ作成
アプリケーションを作成するため、ディレクトリを作成し、VS Code を起動します。
mkidr kafka-simple-app
cd kafka-simple-app3-2. Apache Maven によるプロジェクトの作成
mvn archetype:generate -DarchetypeGroupId=org.apache.maven.archetypes -DarchetypeArtifactId=maven-archetype-simple -DgroupId=org.example.kafka.streams -DartifactId=kafka-simple-app -Dversion=1.0-SNAPSHOT -DinteractiveMode=false
(一部の出力)
xxx
[INFO] ----------------------------------------------------------------------------
[INFO] Using following parameters for creating project from Archetype: maven-archetype-simple:1.4
[INFO] ----------------------------------------------------------------------------
[INFO] Parameter: groupId, Value: org.example.kafka.streams
[INFO] Parameter: artifactId, Value: kafka-simple-app
[INFO] Parameter: version, Value: 1.0-SNAPSHOT
[INFO] Parameter: package, Value: org.example.kafka.streams
[INFO] Parameter: packageInPathFormat, Value: org/example/kafka/streams
[INFO] Parameter: package, Value: org.example.kafka.streams
[INFO] Parameter: version, Value: 1.0-SNAPSHOT
[INFO] Parameter: groupId, Value: org.example.kafka.streams
[INFO] Parameter: artifactId, Value: kafka-simple-app
[INFO] Project created from Archetype in dir: /home/hadoop/kafka-simple-app
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 40.597 s
[INFO] Finished at: 2022-11-05T08:16:52+09:00
[INFO] ------------------------------------------------------------------------3-3. デフォルトで作成された App.java および AppTest.java の削除
rm ./kafka-simple-app/src/main/java/org/example/kafka/streams/App.java
rm ./kafka-simple-app/src/test/java/org/example/kafka/streams/AppTest.java3-4. pom.xml の追記
以下を追記し保存します。
(pom.xml の一部)
xxx
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>3.8.1</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.13</artifactId>
<version>3.3.1</version>
</dependency>
</dependencies>
xxx VS Code 上で保存すると、自動で dependencies を分析してくれる Dependency Analytics Extension のインストールが聞かれるのでインストールします。

3-5. Producer アプリケーションの作成
以下では kafkaAppProducer を作成し、1 から 100 の文字を Event として Topic に書き込む Producer を作成します。
package org.example.kafka.streams;
import org.apache.kafka.clients.producer.*;
import java.util.Properties;
public class kafkaAppProducer {
private static String topicName = "test";
public static void main(String[] args) {
// setting for KafkaProducer
Properties props = new Properties();
String kafkabrokers =System.getenv("KAFKABROKERS");
props.put("bootstrap.servers",kafkabrokers);
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// create the object to produce and send the message.
Producer<String, String> producer = new KafkaProducer<String, String>(props);
for (int i = 0; i < 100; i++)
producer.send(new ProducerRecord<String, String>(topicName, Integer.toString(i), Integer.toString(i)), new Callback() {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if (e != null) {
e.printStackTrace();
}else{
String infmsg = String.format("we successfully sent is: partition:%d ,offset: %d" ,recordMetadata.partition(),recordMetadata.offset());
System.out.println(infmsg);
}
}
});
// close the producer.
producer.close();
}
}3-6. Producer アプリケーションの実行
Ctrl+Shift+p Java: Export Jar をクリックし、kafka-simple-app.jar を作成。

以下のコマンドを実行します。1 から 100 の数字を変換した文字列が 3 つのパーティションに分散され書き込まれました。
$ java -jar ./kafka-simple-app.jar
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
we successfully sent is: partition:2 ,offset: 49
we successfully sent is: partition:2 ,offset: 50
we successfully sent is: partition:2 ,offset: 51
we successfully sent is: partition:2 ,offset: 52
xxx
we successfully sent is: partition:2 ,offset: 69
we successfully sent is: partition:2 ,offset: 70
we successfully sent is: partition:0 ,offset: 91
we successfully sent is: partition:0 ,offset: 92
xxx
we successfully sent is: partition:0 ,offset: 134
we successfully sent is: partition:0 ,offset: 135
we successfully sent is: partition:1 ,offset: 82
we successfully sent is: partition:1 ,offset: 83
xxx
we successfully sent is: partition:1 ,offset: 111
we successfully sent is: partition:1 ,offset: 112
we successfully sent is: partition:1 ,offset: 113
we successfully sent is: partition:1 ,offset: 1143-7. 生成したデータを参照します。
Kafka-console-consumer.sh --bootstrap-server $KAFKABROKERS --topic test --from-beginning▼4. 参考情報
以上です。参考になりましたら幸いです。