自己投資としてチャレンジしている内容を Blog で公開しています。
今回は前回お伝えした Apache Kafka で Broker からデータを受信する Consumer の Java アプリケーションの enable.auto.commit を false にし、メッセージの重複を意図的に発生させたいと思います。(In English Apache Kafka – Consumer Java – Duplicate Messages No.78)
▼1. enable.auto.commit とは
Consumer で設定可能な enable.auto.commit は、Offset を自動で commit するか否かを決めることができ、True もしくは False を指定します。
Offset を commit することで、どこまで処理が完了したか記録することができます。Consumer 側で Offset を commit しないと、どこまで読んだか記録されず、Consumer が再度同じメッセージを読む、メッセージの重複が発生します。
(参考) KafkaConsumer (clients 0.9.0.1 API) (apache.org) ”Manual Offset Control” の部分
▼2. 事前準備
- Apache Kafka インストール 4 ノード No.42
- Apache Kafka – Producer Java アプリケーションの作成 No.66
- Apache Kafka – Consumer Java アプリケーションの作成 No.67
- Apache Kafka – Producer Java アプリケーションの作成 v2 No.68
▼3. Consumer が再度同じメッセージを読む、メッセージの重複を再現
Auto Commit を False にし、Consumer 側で Offset を手動で Commit する Java コードを作成し、Offset を Commit しなかった場合の、次の Consumer の実行時、同じデータを読んでいる状況 (メッセージの重複) を確認します。
3-1. Consumer アプリケーションの作成
30 秒間で指定した Topic に書き込まれたデータを参照する Consumer を作成します。enable.auto.commit は false にし、commit 処理を記載します。
package org.example.kafka.app;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import java.time.Duration;
import java.util.*;
public class kafkaAppConsumerCom {
private static String topicName = "test1116";
public static void main(String[] args) {
// setting for KafkaConsumer
Properties props = new Properties();
String kafkabrokers =System.getenv("KAFKABROKERS");
props.put("bootstrap.servers",kafkabrokers);
props.put("group.id", "test");
props.put("enable.auto.commit", "false");
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "30000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
// create the object to consume and receive the message.
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
// subscribe the topic.
consumer.subscribe(Arrays.asList(topicName));
for(int count=0;count<300;count++){
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for(ConsumerRecord<String, String> record: records){
System.out.printf("partition= %d,offset = %d, key = %s, value = %s%n",record.partition(), record.offset(), record.key(), record.value());
//commit offset
TopicPartition topicPartition = new TopicPartition(record.topic(), record.partition());
OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(record.offset() + 1);
Map<TopicPartition, OffsetAndMetadata> map = new HashMap<TopicPartition, OffsetAndMetadata>();
map.put(topicPartition, offsetAndMetadata);
consumer.commitSync(map);
}
}
// close the consumer.
consumer.close();
}
}3-2. Producer アプリケーションの実行し Topic にデータを書き込みます。
Apache Kafka – Producer Java アプリケーションの作成 No.66 を基に、テストのため 5 件のみデータを書き込みます。
we successfully sent is: partition:1 ,offset: 48 we successfully sent is: partition:1 ,offset: 49 we successfully sent is: partition:1 ,offset: 50 we successfully sent is: partition:2 ,offset: 47 we successfully sent is: partition:0 ,offset: 36
3-3. 3-1 で作成した Comsumer の 39 行目の consumer.commitSync(map); をコメントアウトします。

3-4. 3-3 のコードを実行します。
以下の結果が得られます。
hadoop@nodemaster:~/kafkatest/kafka-consumer2/kafka-cons$ cd /home/hadoop/kafkatest/kafka-consumer2/kafka-cons ; /usr/bin/env /usr/lib/jvm/zulu-8-amd64/bin/java -cp /tmp/cp_5m27n4dvieqftfil1ysifhkxj.jar org.example.kafka.app.kafkaAppConsumerCom SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
partition= 2,offset = 47, key = null, value = {"name":"Alice", "age":36", city":"Beijing","datetime":2022-11-16T07:33:04.311Z}
partition= 1,offset = 48, key = null, value = {"name":"Jane", "age":30", city":"Singapore","datetime":2022-11-16T07:33:00.776Z}
partition= 1,offset = 49, key = null, value = {"name":"Sue", "age":32", city":"Sydney","datetime":2022-11-16T07:33:03.982Z}
partition= 1,offset = 50, key = null, value = {"name":"Bob", "age":34", city":"Melbourne","datetime":2022-11-16T07:33:04.143Z}
partition= 0,offset = 36, key = null, value = {"name":"Tom", "age":38", city":"Shanghai","datetime":2022-11-16T07:33:04.390Z}3-5. メッセージの重複が確認できます。
再度 3-3 のコードを実行すると以下、3-4 と同じ結果が得られます。メッセージの重複が確認できます。
xxxx
partition= 2,offset = 47, key = null, value = {"name":"Alice", "age":36", city":"Beijing","datetime":2022-11-16T07:33:04.311Z}
partition= 1,offset = 48, key = null, value = {"name":"Jane", "age":30", city":"Singapore","datetime":2022-11-16T07:33:00.776Z}
partition= 1,offset = 49, key = null, value = {"name":"Sue", "age":32", city":"Sydney","datetime":2022-11-16T07:33:03.982Z}
partition= 1,offset = 50, key = null, value = {"name":"Bob", "age":34", city":"Melbourne","datetime":2022-11-16T07:33:04.143Z}
partition= 0,offset = 36, key = null, value = {"name":"Tom", "age":38", city":"Shanghai","datetime":2022-11-16T07:33:04.390Z}3-6. 3-3 で設定したコメントアウトを外しコードを再度実行します。
Comsumer 側で OffSet を Commit し、再度実行し結果を得ます。
3-7. メッセージの重複読み取りが発生していないことが確認できます。
Offset Commit 後、再度 Consumer のアプリを実行すると、同じ結果は得られず、メッセージの重複読み取りが発生していないことが確認できます。
hadoop@nodemaster:~/kafkatest/kafka-consumer2/kafka-cons$ cd /home/hadoop/kafkatest/kafka-consumer2/kafka-cons ; /usr/bin/env /usr/lib/jvm/zulu-8-amd64/bin/java -cp /tmp/cp_5m27n4dvieqftfil1ysifhkxj.jar org.example.kafka.app.kafkaAppConsumerCom SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder". SLF4J: Defaulting to no-operation (NOP) logger implementation SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
▼4. 参考情報
- Apache Kafka
- Apache Kafka インストール 4 ノード No.42
- Apache Kafka Word Count 実装 – Java No.44
- Apache Kafka – Producer Java アプリケーションの作成 No.66
- Apache Kafka – Consumer Java アプリケーションの作成 No.67
以上です。参考になりましたら幸いです。