Apache Kafka – Consumer Java アプリケーションの作成 No.67

自己投資としてチャレンジしている内容を Blog で公開しています。
今回は Apache Kafka で Broker からデータを受信する Consumer の Java アプリケーションを実装してみたいと思います。

▼1. Apache Kafka Topic について

以下のコマンド kafka-topics.sh で Topic の情報が確認できます。ここで表示された Topic の情報について説明します。

kafka-topics.sh --describe --bootstrap-server $KAFKABROKERS --topic test

(結果)
Topic: test	TopicId: C6c29UKLQXxxXx2G6EmuyA	PartitionCount: 3	ReplicationFactor: 3	Configs: min.insync.replicas=2,segment.bytes=1073741824
	Topic: test	Partition: 0	Leader: 1	Replicas: 1,2,0	Isr: 1,2,0
	Topic: test	Partition: 1	Leader: 2	Replicas: 2,0,1	Isr: 2,0,1
	Topic: test	Partition: 2	Leader: 0	Replicas: 0,1,2	Isr: 0,1,2
  • Topic: メッセージが保存され、整理される場所。クライアントは指定した topic に対してメッセージを書き込み、そのメッセージを参照する。
  • Partition  : Topic 作成時に指定した partition 数に応じて Topic が分けられる。分けられた Partition に Broker が割り当てられる。
  • Broker : Kafka を構成するサーバーの中で Storage 機能を持つサーバー。Broker にデータが保存され、Broker のデータが参照される。
  • ReplicationFactor:  Replica の数を決める。
  • Replicas: Partition に割り当てられた複数の Broker の id 。Partition に関わるログをそれぞれの Broker で複製している。
  • Leader: Partition に割り当てられた複数の Broker (Replicas) の中で、現在、読み書きしている broker の id 。Leader が利用できなくなると、他の broker に代わる。(***)
  • Isr:  “In-Sync Replicas” の略。同期中の Replica を表示。

設定の部分

  • min.insync.replicas : 複製可能な同期している状態で Commit されたデータが書き込まれる最小の replica 数。指定した数よりも replica が少ないと、クライアントの書き込みを受け付けず、書き込みができなくなる。
  • segment.bytes: 既定では 1 GB (1,073,741,824 Byte) , 単位 Byte 。ログのセグメントのサイズ。ログの保持期間 (retention) や削除 (cleaning) がファイルに対して一度で行われるため、大きいセグメントファイルを設定すると、削除対象のファイルは少なくて済むが、保持期間に関して細かく設定できない。ログのセグメントファイルのサイズを超えると、別のログファイルが作成される。

(***) Leader の 0,1,2 は broker の id となります。Broker の id は以下のコマンド zookeeper-shell.sh で確認できます。

zookeeper-shell.sh $KAFKAZOOKEEPER ls /brokers/ids

Connecting to 10.0.0.1:2181
WATCHER::
WatchedEvent state:SyncConnected type:None path:null
[0,1,2]

(参考) Apache Kafka, ReplicationFactor vs replicas in kafka – Stack Overflow, Apache Kafka ”Topic-level configuration”


▼2. 事前準備

Kafka の環境を準備します。Apache Kafka インストール 4 ノード No.42
また、データを kafka に書き込む Producer も用意します。Apache Kafka – Producer Java アプリケーションの作成 No.66


▼3. Consumer アプリケーションの作成

指定した Topic のデータを参照する Consumer の Java アプリを作成します。

3-1. Consumer アプリケーションの作成

30 秒間で指定した Topic に書き込まれたデータを参照する Consumer を作成します。

package org.example.kafka.streams;

import org.apache.kafka.clients.consumer.*;

import java.time.Duration;
import java.util.*;

public class kafkaAppConsumer {
    private static String topicName = "test01";

    /**
     * @param args
     */
    public static void main(String[] args) {
        // setting for KafkaConsumer
        Properties props = new Properties();
        String kafkabrokers =System.getenv("KAFKABROKERS");
        props.put("bootstrap.servers",kafkabrokers);
        props.put("group.id", "test");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        // create the object to consume and receive the message.
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);

        // subscribe the topic.
        consumer.subscribe(Arrays.asList(topicName));
        for(int count=0;count<300;count++){
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for(ConsumerRecord<String, String> record: records){
                System.out.printf("partition= %d,offset = %d, key = %s, value = %s%n",record.partition(), record.offset(), record.key(), record.value());
            }
        }
        // close the consumer.      
        consumer.close();
    }
}

3-2. Producer アプリケーションの実行し Topic にデータを書き込みます。

$ java -jar ./kafka-simple-app.jar 

SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
we successfully sent is: partition:2 ,offset: 49
we successfully sent is: partition:2 ,offset: 50
we successfully sent is: partition:2 ,offset: 51
we successfully sent is: partition:2 ,offset: 52
xxx
we successfully sent is: partition:2 ,offset: 69
we successfully sent is: partition:2 ,offset: 70
we successfully sent is: partition:0 ,offset: 91
we successfully sent is: partition:0 ,offset: 92
xxx
we successfully sent is: partition:0 ,offset: 134
we successfully sent is: partition:0 ,offset: 135
we successfully sent is: partition:1 ,offset: 82
we successfully sent is: partition:1 ,offset: 83
xxx
we successfully sent is: partition:1 ,offset: 111
we successfully sent is: partition:1 ,offset: 112
we successfully sent is: partition:1 ,offset: 113
we successfully sent is: partition:1 ,offset: 114

3-3. 3-1 で作成した Comsumer を以下のように実行し、Topic に書き込まれたデータを参照します。

(結果)一部データのみ記載

partition= 2,offset = 10, key = 0, value = 0
partition= 2,offset = 11, key = 2, value = 2
partition= 2,offset = 12, key = 3, value = 3
partition= 2,offset = 13, key = 9, value = 9
xxx
partition= 2,offset = 22, key = 57, value = 57
partition= 2,offset = 23, key = 69, value = 69
partition= 0,offset = 10, key = 1, value = 1
partition= 0,offset = 11, key = 5, value = 5
xxx
partition= 0,offset = 53, key = 98, value = 98
partition= 0,offset = 54, key = 99, value = 99
partition= 1,offset = 10, key = 4, value = 4
partition= 1,offset = 11, key = 6, value = 6
xxx
partition= 1,offset = 41, key = 94, value = 94
partition= 1,offset = 42, key = 95, value = 95
partition= 2,offset = 24, key = 73, value = 73
partition= 2,offset = 25, key = 74, value = 74
xxx
partition= 2,offset = 30, key = 91, value = 91
partition= 2,offset = 31, key = 97, value = 97

▼4. 参考情報

1. Apache Kafka
2. Apache Kafka インストール 4 ノード No.42
3. Apache Kafka Word Count 実装 – Java No.44
4.. Apache Kafka – Producer Java アプリケーションの作成 No.66

以上です。参考になりましたら幸いです。

コメントを残す

メールアドレスが公開されることはありません。 が付いている欄は必須項目です