Apache Kafka – Develop Producer Java application No.66

How to develop Producer application by Java on Visual Studio Code is shown in this blog.

▼1. Apache Kafka

  • Event : record
  • Topic: it is similar to a folder in a filesystem, and the events are the files in that folder.
  • Partition : Topics are partitioned, meaning a topic is spread over a number of “buckets” located on different Kafka brokers.
  • Broker : Kafka is run as a cluster of one or more servers that can span multiple datacenters or cloud regions. Some of these servers form the storage layer, called the brokers.
  • Producer :  it is a client application that publish (write) events to Kafka.
  • Consumer : it is one that subscribe to (read and process) events.
  • Offset: sequence number of message. it shows the committed sequence number where consumer has read.

Ref: Apache Kafka


▼2. Prerequisites

Install Apache Kafka 4 nodes No.42

Apache Kafka Word Count – Java No.44
“2-3. Installing Apache Maven”

▼3. Develop Producer Java application

This producer Java application write events from 1 to 100 into a topic.

3-1. Make a directory
Make a directory to develop an application.

mkidr kafka-simple-app
cd kafka-simple-app

3-2. Create a project using Apache Maven

mvn archetype:generate -DarchetypeGroupId=org.apache.maven.archetypes -DarchetypeArtifactId=maven-archetype-simple -DgroupId=org.example.kafka.streams -DartifactId=kafka-simple-app -Dversion=1.0-SNAPSHOT -DinteractiveMode=false

(一部の出力)
xxx
[INFO] ----------------------------------------------------------------------------
[INFO] Using following parameters for creating project from Archetype: maven-archetype-simple:1.4
[INFO] ----------------------------------------------------------------------------
[INFO] Parameter: groupId, Value: org.example.kafka.streams
[INFO] Parameter: artifactId, Value: kafka-simple-app
[INFO] Parameter: version, Value: 1.0-SNAPSHOT
[INFO] Parameter: package, Value: org.example.kafka.streams
[INFO] Parameter: packageInPathFormat, Value: org/example/kafka/streams
[INFO] Parameter: package, Value: org.example.kafka.streams
[INFO] Parameter: version, Value: 1.0-SNAPSHOT
[INFO] Parameter: groupId, Value: org.example.kafka.streams
[INFO] Parameter: artifactId, Value: kafka-simple-app
[INFO] Project created from Archetype in dir: /home/hadoop/kafka-simple-app
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time:  40.597 s
[INFO] Finished at: 2022-11-05T08:16:52+09:00
[INFO] ------------------------------------------------------------------------

3-3. Remove the App.java and AppTest.java files

rm ./kafka-simple-app/src/main/java/org/example/kafka/streams/App.java
rm ./kafka-simple-app/src/test/java/org/example/kafka/streams/AppTest.java

3-4. Update pom.xml file

Adding below in pom.xml
(a part of pom.xml)

xxx  
<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
  </properties>

  <dependencies>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>3.8.1</version>
    </dependency>
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka_2.13</artifactId>
      <version>3.3.1</version>
    </dependency>
  </dependencies>
xxx  

Dependency Analytics Extension need to be installed on VS Code to check and update dependencies automatically.

InstallDevendencyAnalyticsExtension

3-5. Develop Producer application

Develop kafkaAppProducer below. it write events from 1 to 100 into a topic.

package org.example.kafka.streams;

import org.apache.kafka.clients.producer.*;
import java.util.Properties;

public class kafkaAppProducer {
    private static String topicName = "test";

    public static void main(String[] args) {
        
        // setting for KafkaProducer
        Properties props = new Properties();
        String kafkabrokers =System.getenv("KAFKABROKERS");
        props.put("bootstrap.servers",kafkabrokers);
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // create the object to produce and send the message.
        Producer<String, String> producer = new KafkaProducer<String, String>(props);
        for (int i = 0; i < 100; i++)
            producer.send(new ProducerRecord<String, String>(topicName, Integer.toString(i), Integer.toString(i)), new Callback() {
                @Override
                public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                    if (e != null) {
                        e.printStackTrace();
                    }else{
                        String infmsg = String.format("we successfully sent is: partition:%d ,offset: %d" ,recordMetadata.partition(),recordMetadata.offset());
                        System.out.println(infmsg);
                    }
                }
            });
        
        // close the producer.
        producer.close();
    }   
}

3-6. Execute Producer application

Click “Ctrl+Shift+p Java: Export Jar” and find the generated “kafka-simple-app.jar” file.

createJarFile

Run the following command. the number from 1 to 100 are written into 3 partitions.

$ java -jar ./kafka-simple-app.jar 

SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
we successfully sent is: partition:2 ,offset: 49
we successfully sent is: partition:2 ,offset: 50
we successfully sent is: partition:2 ,offset: 51
we successfully sent is: partition:2 ,offset: 52
xxx
we successfully sent is: partition:2 ,offset: 69
we successfully sent is: partition:2 ,offset: 70
we successfully sent is: partition:0 ,offset: 91
we successfully sent is: partition:0 ,offset: 92
xxx
we successfully sent is: partition:0 ,offset: 134
we successfully sent is: partition:0 ,offset: 135
we successfully sent is: partition:1 ,offset: 82
we successfully sent is: partition:1 ,offset: 83
xxx
we successfully sent is: partition:1 ,offset: 111
we successfully sent is: partition:1 ,offset: 112
we successfully sent is: partition:1 ,offset: 113
we successfully sent is: partition:1 ,offset: 114

3-7. Retrieve the generated data

Kafka-console-consumer.sh --bootstrap-server $KAFKABROKERS --topic test --from-beginning

▼4. Reference

That’s all. Have a nice day ahead !!!

Leave a Reply

Your email address will not be published. Required fields are marked *