Apache Kafka – Consumer Java – メッセージの重複 No.78

自己投資としてチャレンジしている内容を Blog で公開しています。

今回は前回お伝えした Apache Kafka で Broker からデータを受信する Consumer の Java アプリケーションの enable.auto.commit を false にし、メッセージの重複を意図的に発生させたいと思います。(In English Apache Kafka – Consumer Java – Duplicate Messages No.78)

▼1. enable.auto.commit とは

Consumer で設定可能な enable.auto.commit は、Offset を自動で commit するか否かを決めることができ、True もしくは False を指定します。

Offset を commit することで、どこまで処理が完了したか記録することができます。Consumer 側で Offset を commit しないと、どこまで読んだか記録されず、Consumer が再度同じメッセージを読む、メッセージの重複が発生します。

(参考) KafkaConsumer (clients 0.9.0.1 API) (apache.org) ”Manual Offset Control” の部分


▼2. 事前準備


▼3. Consumer が再度同じメッセージを読む、メッセージの重複を再現

Auto Commit を False にし、Consumer 側で Offset を手動で Commit する Java コードを作成し、Offset を Commit しなかった場合の、次の Consumer の実行時、同じデータを読んでいる状況 (メッセージの重複) を確認します。

3-1. Consumer アプリケーションの作成

30 秒間で指定した Topic に書き込まれたデータを参照する Consumer を作成します。enable.auto.commit は false にし、commit 処理を記載します。

package org.example.kafka.app;

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;

import java.time.Duration;
import java.util.*;

public class kafkaAppConsumerCom {
    private static String topicName = "test1116";

    public static void main(String[] args) {
        // setting for KafkaConsumer
        Properties props = new Properties();
        String kafkabrokers =System.getenv("KAFKABROKERS");
        props.put("bootstrap.servers",kafkabrokers);
        props.put("group.id", "test");
        props.put("enable.auto.commit", "false");
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        // create the object to consume and receive the message.
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);

        // subscribe the topic.
        consumer.subscribe(Arrays.asList(topicName));
        for(int count=0;count<300;count++){
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for(ConsumerRecord<String, String> record: records){
                System.out.printf("partition= %d,offset = %d, key = %s, value = %s%n",record.partition(), record.offset(), record.key(), record.value());

                //commit offset
                TopicPartition topicPartition = new TopicPartition(record.topic(), record.partition());
                OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(record.offset() + 1);
                Map<TopicPartition, OffsetAndMetadata> map = new HashMap<TopicPartition, OffsetAndMetadata>();
                map.put(topicPartition, offsetAndMetadata);
                consumer.commitSync(map);
            }
        }
        // close the consumer.      
        consumer.close();
    }
}

3-2. Producer アプリケーションの実行し Topic にデータを書き込みます。

Apache Kafka – Producer Java アプリケーションの作成 No.66 を基に、テストのため 5 件のみデータを書き込みます。

we successfully sent is: partition:1 ,offset: 48
we successfully sent is: partition:1 ,offset: 49
we successfully sent is: partition:1 ,offset: 50
we successfully sent is: partition:2 ,offset: 47
we successfully sent is: partition:0 ,offset: 36

3-3. 3-1 で作成した Comsumer の 39 行目の consumer.commitSync(map); をコメントアウトします。

CommentOutconsumer.commitSync

3-4. 3-3 のコードを実行します。

以下の結果が得られます。

hadoop@nodemaster:~/kafkatest/kafka-consumer2/kafka-cons$  cd /home/hadoop/kafkatest/kafka-consumer2/kafka-cons ; /usr/bin/env /usr/lib/jvm/zulu-8-amd64/bin/java -cp /tmp/cp_5m27n4dvieqftfil1ysifhkxj.jar org.example.kafka.app.kafkaAppConsumerCom SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
partition= 2,offset = 47, key = null, value = {"name":"Alice", "age":36", city":"Beijing","datetime":2022-11-16T07:33:04.311Z}
partition= 1,offset = 48, key = null, value = {"name":"Jane", "age":30", city":"Singapore","datetime":2022-11-16T07:33:00.776Z}
partition= 1,offset = 49, key = null, value = {"name":"Sue", "age":32", city":"Sydney","datetime":2022-11-16T07:33:03.982Z}
partition= 1,offset = 50, key = null, value = {"name":"Bob", "age":34", city":"Melbourne","datetime":2022-11-16T07:33:04.143Z}
partition= 0,offset = 36, key = null, value = {"name":"Tom", "age":38", city":"Shanghai","datetime":2022-11-16T07:33:04.390Z}

3-5. メッセージの重複が確認できます。

再度 3-3 のコードを実行すると以下、3-4 と同じ結果が得られます。メッセージの重複が確認できます。

xxxx
partition= 2,offset = 47, key = null, value = {"name":"Alice", "age":36", city":"Beijing","datetime":2022-11-16T07:33:04.311Z}
partition= 1,offset = 48, key = null, value = {"name":"Jane", "age":30", city":"Singapore","datetime":2022-11-16T07:33:00.776Z}
partition= 1,offset = 49, key = null, value = {"name":"Sue", "age":32", city":"Sydney","datetime":2022-11-16T07:33:03.982Z}
partition= 1,offset = 50, key = null, value = {"name":"Bob", "age":34", city":"Melbourne","datetime":2022-11-16T07:33:04.143Z}
partition= 0,offset = 36, key = null, value = {"name":"Tom", "age":38", city":"Shanghai","datetime":2022-11-16T07:33:04.390Z}

3-6. 3-3 で設定したコメントアウトを外しコードを再度実行します。

Comsumer 側で OffSet を Commit し、再度実行し結果を得ます。

3-7. メッセージの重複読み取りが発生していないことが確認できます。

Offset Commit 後、再度 Consumer のアプリを実行すると、同じ結果は得られず、メッセージの重複読み取りが発生していないことが確認できます。

hadoop@nodemaster:~/kafkatest/kafka-consumer2/kafka-cons$  cd /home/hadoop/kafkatest/kafka-consumer2/kafka-cons ; /usr/bin/env /usr/lib/jvm/zulu-8-amd64/bin/java -cp /tmp/cp_5m27n4dvieqftfil1ysifhkxj.jar org.example.kafka.app.kafkaAppConsumerCom SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.

▼4. 参考情報

  1. Apache Kafka
  2. Apache Kafka インストール 4 ノード No.42
  3. Apache Kafka Word Count 実装 – Java No.44
  4. Apache Kafka – Producer Java アプリケーションの作成 No.66
  5. Apache Kafka – Consumer Java アプリケーションの作成 No.67

以上です。参考になりましたら幸いです。



コメントを残す

メールアドレスが公開されることはありません。 が付いている欄は必須項目です