自己投資としてチャレンジしている内容を Blog で公開しています。
今回は Apache Kafka で Broker からデータを受信する Consumer の Java アプリケーションを実装してみたいと思います。
▼1. Apache Kafka Topic について
以下のコマンド kafka-topics.sh で Topic の情報が確認できます。ここで表示された Topic の情報について説明します。
kafka-topics.sh --describe --bootstrap-server $KAFKABROKERS --topic test
(結果)
Topic: test TopicId: C6c29UKLQXxxXx2G6EmuyA PartitionCount: 3 ReplicationFactor: 3 Configs: min.insync.replicas=2,segment.bytes=1073741824
Topic: test Partition: 0 Leader: 1 Replicas: 1,2,0 Isr: 1,2,0
Topic: test Partition: 1 Leader: 2 Replicas: 2,0,1 Isr: 2,0,1
Topic: test Partition: 2 Leader: 0 Replicas: 0,1,2 Isr: 0,1,2
- Topic: メッセージが保存され、整理される場所。クライアントは指定した topic に対してメッセージを書き込み、そのメッセージを参照する。
- Partition : Topic 作成時に指定した partition 数に応じて Topic が分けられる。分けられた Partition に Broker が割り当てられる。
- Broker : Kafka を構成するサーバーの中で Storage 機能を持つサーバー。Broker にデータが保存され、Broker のデータが参照される。
- ReplicationFactor: Replica の数を決める。
- Replicas: Partition に割り当てられた複数の Broker の id 。Partition に関わるログをそれぞれの Broker で複製している。
- Leader: Partition に割り当てられた複数の Broker (Replicas) の中で、現在、読み書きしている broker の id 。Leader が利用できなくなると、他の broker に代わる。(***)
- Isr: “In-Sync Replicas” の略。同期中の Replica を表示。
設定の部分
- min.insync.replicas : 複製可能な同期している状態で Commit されたデータが書き込まれる最小の replica 数。指定した数よりも replica が少ないと、クライアントの書き込みを受け付けず、書き込みができなくなる。
- segment.bytes: 既定では 1 GB (1,073,741,824 Byte) , 単位 Byte 。ログのセグメントのサイズ。ログの保持期間 (retention) や削除 (cleaning) がファイルに対して一度で行われるため、大きいセグメントファイルを設定すると、削除対象のファイルは少なくて済むが、保持期間に関して細かく設定できない。ログのセグメントファイルのサイズを超えると、別のログファイルが作成される。
(***) Leader の 0,1,2 は broker の id となります。Broker の id は以下のコマンド zookeeper-shell.sh で確認できます。
zookeeper-shell.sh $KAFKAZOOKEEPER ls /brokers/ids
Connecting to 10.0.0.1:2181
WATCHER::
WatchedEvent state:SyncConnected type:None path:null
[0,1,2](参考) Apache Kafka, ReplicationFactor vs replicas in kafka – Stack Overflow, Apache Kafka ”Topic-level configuration”
▼2. 事前準備
Kafka の環境を準備します。Apache Kafka インストール 4 ノード No.42
また、データを kafka に書き込む Producer も用意します。Apache Kafka – Producer Java アプリケーションの作成 No.66
▼3. Consumer アプリケーションの作成
指定した Topic のデータを参照する Consumer の Java アプリを作成します。
3-1. Consumer アプリケーションの作成
30 秒間で指定した Topic に書き込まれたデータを参照する Consumer を作成します。
package org.example.kafka.streams;
import org.apache.kafka.clients.consumer.*;
import java.time.Duration;
import java.util.*;
public class kafkaAppConsumer {
private static String topicName = "test01";
/**
* @param args
*/
public static void main(String[] args) {
// setting for KafkaConsumer
Properties props = new Properties();
String kafkabrokers =System.getenv("KAFKABROKERS");
props.put("bootstrap.servers",kafkabrokers);
props.put("group.id", "test");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "30000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
// create the object to consume and receive the message.
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
// subscribe the topic.
consumer.subscribe(Arrays.asList(topicName));
for(int count=0;count<300;count++){
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for(ConsumerRecord<String, String> record: records){
System.out.printf("partition= %d,offset = %d, key = %s, value = %s%n",record.partition(), record.offset(), record.key(), record.value());
}
}
// close the consumer.
consumer.close();
}
}3-2. Producer アプリケーションの実行し Topic にデータを書き込みます。
$ java -jar ./kafka-simple-app.jar
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
we successfully sent is: partition:2 ,offset: 49
we successfully sent is: partition:2 ,offset: 50
we successfully sent is: partition:2 ,offset: 51
we successfully sent is: partition:2 ,offset: 52
xxx
we successfully sent is: partition:2 ,offset: 69
we successfully sent is: partition:2 ,offset: 70
we successfully sent is: partition:0 ,offset: 91
we successfully sent is: partition:0 ,offset: 92
xxx
we successfully sent is: partition:0 ,offset: 134
we successfully sent is: partition:0 ,offset: 135
we successfully sent is: partition:1 ,offset: 82
we successfully sent is: partition:1 ,offset: 83
xxx
we successfully sent is: partition:1 ,offset: 111
we successfully sent is: partition:1 ,offset: 112
we successfully sent is: partition:1 ,offset: 113
we successfully sent is: partition:1 ,offset: 1143-3. 3-1 で作成した Comsumer を以下のように実行し、Topic に書き込まれたデータを参照します。

(結果)一部データのみ記載
partition= 2,offset = 10, key = 0, value = 0
partition= 2,offset = 11, key = 2, value = 2
partition= 2,offset = 12, key = 3, value = 3
partition= 2,offset = 13, key = 9, value = 9
xxx
partition= 2,offset = 22, key = 57, value = 57
partition= 2,offset = 23, key = 69, value = 69
partition= 0,offset = 10, key = 1, value = 1
partition= 0,offset = 11, key = 5, value = 5
xxx
partition= 0,offset = 53, key = 98, value = 98
partition= 0,offset = 54, key = 99, value = 99
partition= 1,offset = 10, key = 4, value = 4
partition= 1,offset = 11, key = 6, value = 6
xxx
partition= 1,offset = 41, key = 94, value = 94
partition= 1,offset = 42, key = 95, value = 95
partition= 2,offset = 24, key = 73, value = 73
partition= 2,offset = 25, key = 74, value = 74
xxx
partition= 2,offset = 30, key = 91, value = 91
partition= 2,offset = 31, key = 97, value = 97▼4. 参考情報
1. Apache Kafka
2. Apache Kafka インストール 4 ノード No.42
3. Apache Kafka Word Count 実装 – Java No.44
4.. Apache Kafka – Producer Java アプリケーションの作成 No.66
以上です。参考になりましたら幸いです。