In this blog post, duplicate messages in a Consumer are reproduced using Java with the enable.auto.commit property set to false.
Contents
- ▼1. What is enable.auto.commit property?
- ▼2. Prerequisites
- ▼3. Reproducing duplicate messages in Consumer
- 3-1. Deploying Consumer Java application
- 3-2. Running the Producer and writing 5 messages to a topic
- 3-3. Commenting out consumer.commitSync(map) in the Consumer from 3-1
- 3-4. Running the consumer updated in 3-3 to get messages
- 3-5. Finding duplicate messages
- 3-6. Restoring the line commented out in 3-3 and re-running the consumer
- ▼4. Reference
▼1. What is enable.auto.commit property?
When enable.auto.commit is set to true, offsets are committed automatically at a regular interval. When it is set to false, the consumer has to commit the offsets itself, for example with commitSync().
See "Manual Offset Control" in KafkaConsumer (clients 0.9.0.1 API) (apache.org).
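For comparison, here is a minimal sketch of the auto-commit configuration, showing only the consumer properties that differ from the manual-commit consumer in 3-1 (this fragment is an assumption for illustration, not code from the original posts):

// auto-commit variant: with these two properties the consumer commits offsets
// in the background and no commitSync() call is needed in the poll loop.
Properties props = new Properties();
props.put("bootstrap.servers", System.getenv("KAFKABROKERS"));
props.put("group.id", "test");
props.put("enable.auto.commit", "true");          // commit automatically
props.put("auto.commit.interval.ms", "1000");     // every 1 second
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");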
▼2. Prerequisites
- Install Apache Kafka 4 nodes No.42
- Apache Kafka – Develop Producer Java application No.66
- Apache Kafka – Creating a Consumer Java application No.67
- Apache Kafka – Creating a Producer Java application v2 No.68
▼3. Reproducing duplicate messages in Consumer
To read messages reliably in Java, you normally set enable.auto.commit to false and commit the offset explicitly in the Consumer. In this post, to reproduce duplicate messages, I comment out the commitSync() call.
3-1. Deploying Consumer Java application
Setting enable.auto.commit to false.
package org.example.kafka.app;

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;

import java.time.Duration;
import java.util.*;

public class kafkaAppConsumerCom {
    private static String topicName = "test1116";

    public static void main(String[] args) {
        // settings for KafkaConsumer
        Properties props = new Properties();
        String kafkabrokers = System.getenv("KAFKABROKERS");
        props.put("bootstrap.servers", kafkabrokers);
        props.put("group.id", "test");
        props.put("enable.auto.commit", "false");
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        // create the object to consume and receive the messages
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);

        // subscribe to the topic
        consumer.subscribe(Arrays.asList(topicName));

        for (int count = 0; count < 300; count++) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("partition= %d,offset = %d, key = %s, value = %s%n",
                        record.partition(), record.offset(), record.key(), record.value());

                // commit the offset of the record just processed (offset + 1 = next offset to read)
                TopicPartition topicPartition = new TopicPartition(record.topic(), record.partition());
                OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(record.offset() + 1);
                Map<TopicPartition, OffsetAndMetadata> map = new HashMap<TopicPartition, OffsetAndMetadata>();
                map.put(topicPartition, offsetAndMetadata);
                consumer.commitSync(map);
            }
        }

        // close the consumer
        consumer.close();
    }
}
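Committing after every single record keeps the committed offset as close as possible to what has actually been processed, at the cost of one commit round-trip per message. As a variation (my own sketch, not part of the original post), you can commit once per poll() batch instead, which reduces commit traffic but can replay a whole batch if the consumer crashes before the commit. The loop below reuses the consumer and properties from the class above:

// Variation (assumption, not from the original post): commit once per poll() batch.
for (int count = 0; count < 300; count++) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        System.out.printf("partition= %d,offset = %d, key = %s, value = %s%n",
                record.partition(), record.offset(), record.key(), record.value());
    }
    if (!records.isEmpty()) {
        // commits the offsets returned by the last poll() for all assigned partitions
        consumer.commitSync();
    }
}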
3-2. Running the Producer and writing 5 messages to a topic
Apache Kafka – Develop Producer Java application No.66
(e.g.)
we successfully sent is: partition:1 ,offset: 48
we successfully sent is: partition:1 ,offset: 49
we successfully sent is: partition:1 ,offset: 50
we successfully sent is: partition:2 ,offset: 47
we successfully sent is: partition:0 ,offset: 36
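The producer from post No.66 is not reproduced here. For readers who do not have it at hand, the following is only a minimal sketch of a producer that sends a few string messages to the test1116 topic, assuming the same KAFKABROKERS environment variable; the class name, message values, and callback wording are illustrative, not the exact code from No.66:

package org.example.kafka.app;

import org.apache.kafka.clients.producer.*;

import java.util.Properties;

public class kafkaAppProducerSketch {
    public static void main(String[] args) {
        // basic producer settings (assumed to match the cluster used in this post)
        Properties props = new Properties();
        props.put("bootstrap.servers", System.getenv("KAFKABROKERS"));
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
        for (int i = 0; i < 5; i++) {
            String value = "{\"name\":\"user" + i + "\"}";   // illustrative payload
            producer.send(new ProducerRecord<String, String>("test1116", value),
                    (metadata, exception) -> {
                        if (exception == null) {
                            // print where the record landed, similar to the output above
                            System.out.printf("we successfully sent is: partition:%d ,offset: %d%n",
                                    metadata.partition(), metadata.offset());
                        } else {
                            exception.printStackTrace();
                        }
                    });
        }
        producer.close();
    }
}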
3-3. Commenting out consumer.commitSync(map) in the Consumer from 3-1
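The only change is the single commented-out line; the rest of the poll loop from 3-1 stays as it is:

                // commit the offset of the record just processed
                TopicPartition topicPartition = new TopicPartition(record.topic(), record.partition());
                OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(record.offset() + 1);
                Map<TopicPartition, OffsetAndMetadata> map = new HashMap<TopicPartition, OffsetAndMetadata>();
                map.put(topicPartition, offsetAndMetadata);
                // consumer.commitSync(map);   // <- commented out, so the offset is never committed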

3-4. Running the consumer updated in 3-3 to get messages
(Output)
hadoop@nodemaster:~/kafkatest/kafka-consumer2/kafka-cons$ cd /home/hadoop/kafkatest/kafka-consumer2/kafka-cons ; /usr/bin/env /usr/lib/jvm/zulu-8-amd64/bin/java -cp /tmp/cp_5m27n4dvieqftfil1ysifhkxj.jar org.example.kafka.app.kafkaAppConsumerCom
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
partition= 2,offset = 47, key = null, value = {"name":"Alice", "age":36", city":"Beijing","datetime":2022-11-16T07:33:04.311Z}
partition= 1,offset = 48, key = null, value = {"name":"Jane", "age":30", city":"Singapore","datetime":2022-11-16T07:33:00.776Z}
partition= 1,offset = 49, key = null, value = {"name":"Sue", "age":32", city":"Sydney","datetime":2022-11-16T07:33:03.982Z}
partition= 1,offset = 50, key = null, value = {"name":"Bob", "age":34", city":"Melbourne","datetime":2022-11-16T07:33:04.143Z}
partition= 0,offset = 36, key = null, value = {"name":"Tom", "age":38", city":"Shanghai","datetime":2022-11-16T07:33:04.390Z}
3-5. Finding duplicate messages
Re-running the consumer updated in 3-3 produced the same messages again: because commitSync() was commented out, no offsets were ever committed, so the consumer group restarted from the previously committed positions and re-read the same records.
xxxx
partition= 2,offset = 47, key = null, value = {"name":"Alice", "age":36", city":"Beijing","datetime":2022-11-16T07:33:04.311Z}
partition= 1,offset = 48, key = null, value = {"name":"Jane", "age":30", city":"Singapore","datetime":2022-11-16T07:33:00.776Z}
partition= 1,offset = 49, key = null, value = {"name":"Sue", "age":32", city":"Sydney","datetime":2022-11-16T07:33:03.982Z}
partition= 1,offset = 50, key = null, value = {"name":"Bob", "age":34", city":"Melbourne","datetime":2022-11-16T07:33:04.143Z}
partition= 0,offset = 36, key = null, value = {"name":"Tom", "age":38", city":"Shanghai","datetime":2022-11-16T07:33:04.390Z}
3-6. Restoring the line commented out in 3-3 and re-running the consumer
No duplicate messages appeared this time. The 5 messages had been committed, so re-running the consumer did not read them again.
hadoop@nodemaster:~/kafkatest/kafka-consumer2/kafka-cons$ cd /home/hadoop/kafkatest/kafka-consumer2/kafka-cons ; /usr/bin/env /usr/lib/jvm/zulu-8-amd64/bin/java -cp /tmp/cp_5m27n4dvieqftfil1ysifhkxj.jar org.example.kafka.app.kafkaAppConsumerCom
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
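If you want to confirm what the group has actually committed, one way (not part of the original post) is the AdminClient's listConsumerGroupOffsets() API. A minimal sketch, assuming the same KAFKABROKERS environment variable, the group id test, and a kafka-clients version that includes the AdminClient; the class name is hypothetical:

package org.example.kafka.app;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

import java.util.Map;
import java.util.Properties;

public class kafkaAppCheckOffsets {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, System.getenv("KAFKABROKERS"));

        try (AdminClient admin = AdminClient.create(props)) {
            // fetch the offsets committed by the consumer group "test"
            Map<TopicPartition, OffsetAndMetadata> offsets =
                    admin.listConsumerGroupOffsets("test").partitionsToOffsetAndMetadata().get();
            offsets.forEach((tp, om) ->
                    System.out.printf("%s -> committed offset %d%n", tp, om.offset()));
        }
    }
}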
▼4. Reference
- Apache Kafka
- Install Apache Kafka 4 nodes No.42
- Apache Kafka Word Count – Java No.44
- Apache Kafka – Develop Producer Java application No.66
- Apache Kafka – Creating a Consumer Java application No.67
That’s all. Have a nice day ahead!!!