Apache Kafka – Producer Java アプリケーションの作成 v2 No.68

自己投資としてチャレンジしている内容を Blog で公開しています。

今回は以前紹介した Apache Kafka で Broker へデータを送信するための Producer の Java アプリケーションに関連して、生成したJSON オブジェクトを Broker へ送信する Producer を作成したいと思います。


▼1. Broker のログの保持期間

クライアントが Topic に書き込んだデータ (ログ) は永遠に保持されるわけではなく、保持期間は以下の 2 つの設定のいずれかに該当した場合に古いものから削除されます。

Topic level PropertyServer Default Property説明
retention.mslog.retention.minutes既定は 10080 分 (7 日)。ログの保持期間を設定することができます。この期間を過ぎると古いデータは削除されます。
retention.byteslog.retention.bytes既定値は無し。ログのサイズが、この値を超えると古いデータが削除されます。

(参考) Apache Kafka


▼2. 事前準備

Kafka の環境を準備します。
Apache Kafka インストール 4 ノード No.42
Apache Kafka – Producer Java アプリケーションの作成 No.66
Apache Kafka – Consumer Java アプリケーションの作成 No.67


▼3. Producer アプリケーションの作成

生成したJSON オブジェクトを Broker へ送信する Producer の Java アプリを作成します。

3-1. Producer アプリケーションの作成

以下のような構成の JSON ファイルを生成し、Kafka の Broker へ送信します。

{
    "name": "Tom",
    "age": 30,
    "city": "Shanghai",
    "datetime": 2022-11-06T07: 20: 05.308Z
}
package org.example.kafka.streams;

import org.apache.kafka.clients.producer.*;

import java.time.Instant;
import java.util.Properties;

public class kafkaAppPro2 {
    // create json message and send to kafka
    public static void main(String[] args) {
        String topic = "test02";

        Properties props = new Properties();
        String kafkabrokers =System.getenv("KAFKABROKERS");
        props.put("bootstrap.servers",kafkabrokers);
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // create json message in table

        for (int i = 0; i < 100; i++) {
            // create json message
            // name is random and city is random
              String[] nalist = {"John", "Mary", "Peter", "Paul", "Mark", "Jane", "Sue", "Bob", "Alice", "Tom"};
            String[] clist = {"New York", "Paris", "London", "Tokyo", "Hong Kong", "Singapore", "Sydney", "Melbourne", "Beijing", "Shanghai"};
            String city = clist[i%10];
            String name = nalist[i%10];
            String json = "{\"name\":\"" + name + "\", \"age\":30, \"city\":\"" + city + "\",\"datetime\":" + Instant.now() + "}";

            // create the object to produce and send the message.
            Producer<String, String> producer = new KafkaProducer<String, String>(props);
            producer.send(new ProducerRecord<String, String>(topic, json), new Callback() {
                @Override
                public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                    if (e != null) {
                        e.printStackTrace();
                    } else {
                        String infmsg = String.format("we successfully sent is: partition:%d ,offset: %d", recordMetadata.partition(), recordMetadata.offset());
                        System.out.println(infmsg);
                    }
                }
            });

            // close the producer.
            producer.close();
        }
    }
}

3-2. 3-1 で作成した Producer 、kafkaAppPro2 を実行し、JSON オブジェクトを Topic に出力します。

3-3. Consumer を起動し、指定した Topic のデータの読み込みを開始し、Producer によって書き込まれたデータをリアルタイムで確認します。

(出力)データの一部
xxxx
partition= 0,offset = 413, key = null, value = {"name":"Tom", "age":30, "city":"Shanghai","datetime":2022-11-06T07:20:05.308Z}
partition= 0,offset = 414, key = null, value = {"name":"John", "age":30, "city":"New York","datetime":2022-11-06T07:20:05.361Z}
partition= 0,offset = 415, key = null, value = {"name":"Mary", "age":30, "city":"Paris","datetime":2022-11-06T07:20:05.424Z}
partition= 0,offset = 416, key = null, value = {"name":"Peter", "age":30, "city":"London","datetime":2022-11-06T07:20:05.479Z}
partition= 0,offset = 417, key = null, value = {"name":"Paul", "age":30, "city":"Tokyo","datetime":2022-11-06T07:20:05.532Z}
partition= 0,offset = 418, key = null, value = {"name":"Mark", "age":30, "city":"Hong Kong","datetime":2022-11-06T07:20:05.593Z}
partition= 0,offset = 419, key = null, value = {"name":"Jane", "age":30, "city":"Singapore","datetime":2022-11-06T07:20:05.656Z}
partition= 0,offset = 420, key = null, value = {"name":"Sue", "age":30, "city":"Sydney","datetime":2022-11-06T07:20:05.720Z}
partition= 0,offset = 421, key = null, value = {"name":"Bob", "age":30, "city":"Melbourne","datetime":2022-11-06T07:20:05.773Z}
partition= 0,offset = 422, key = null, value = {"name":"Alice", "age":30, "city":"Beijing","datetime":2022-11-06T07:20:05.830Z}
partition= 0,offset = 423, key = null, value = {"name":"Tom", "age":30, "city":"Shanghai","datetime":2022-11-06T07:20:05.898Z}

▼4. 参考情報

  1. Apache Kafka
  2. Apache Kafka インストール 4 ノード No.42
  3. Apache Kafka – Producer Java アプリケーションの作成 No.66
  4. Apache Kafka – Consumer Java アプリケーションの作成 No.67

以上です。参考になりましたら幸いです。

コメントを残す

メールアドレスが公開されることはありません。 が付いている欄は必須項目です