自己投資としてチャレンジしている内容を Blog で公開しています。
今回は以前紹介した Apache Kafka で Broker へデータを送信するための Producer の Java アプリケーションに関連して、生成したJSON オブジェクトを Broker へ送信する Producer を作成したいと思います。
Summary
▼1. Broker のログの保持期間
クライアントが Topic に書き込んだデータ (ログ) は永遠に保持されるわけではなく、保持期間は以下の 2 つの設定のいずれかに該当した場合に古いものから削除されます。
| Topic level Property | Server Default Property | 説明 |
| retention.ms | log.retention.minutes | 既定は 10080 分 (7 日)。ログの保持期間を設定することができます。この期間を過ぎると古いデータは削除されます。 |
| retention.bytes | log.retention.bytes | 既定値は無し。ログのサイズが、この値を超えると古いデータが削除されます。 |
(参考) Apache Kafka
▼2. 事前準備
Kafka の環境を準備します。
Apache Kafka インストール 4 ノード No.42
Apache Kafka – Producer Java アプリケーションの作成 No.66
Apache Kafka – Consumer Java アプリケーションの作成 No.67
▼3. Producer アプリケーションの作成
生成したJSON オブジェクトを Broker へ送信する Producer の Java アプリを作成します。
3-1. Producer アプリケーションの作成
以下のような構成の JSON ファイルを生成し、Kafka の Broker へ送信します。
{
"name": "Tom",
"age": 30,
"city": "Shanghai",
"datetime": 2022-11-06T07: 20: 05.308Z
}package org.example.kafka.streams;
import org.apache.kafka.clients.producer.*;
import java.time.Instant;
import java.util.Properties;
public class kafkaAppPro2 {
// create json message and send to kafka
public static void main(String[] args) {
String topic = "test02";
Properties props = new Properties();
String kafkabrokers =System.getenv("KAFKABROKERS");
props.put("bootstrap.servers",kafkabrokers);
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// create json message in table
for (int i = 0; i < 100; i++) {
// create json message
// name is random and city is random
String[] nalist = {"John", "Mary", "Peter", "Paul", "Mark", "Jane", "Sue", "Bob", "Alice", "Tom"};
String[] clist = {"New York", "Paris", "London", "Tokyo", "Hong Kong", "Singapore", "Sydney", "Melbourne", "Beijing", "Shanghai"};
String city = clist[i%10];
String name = nalist[i%10];
String json = "{\"name\":\"" + name + "\", \"age\":30, \"city\":\"" + city + "\",\"datetime\":" + Instant.now() + "}";
// create the object to produce and send the message.
Producer<String, String> producer = new KafkaProducer<String, String>(props);
producer.send(new ProducerRecord<String, String>(topic, json), new Callback() {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if (e != null) {
e.printStackTrace();
} else {
String infmsg = String.format("we successfully sent is: partition:%d ,offset: %d", recordMetadata.partition(), recordMetadata.offset());
System.out.println(infmsg);
}
}
});
// close the producer.
producer.close();
}
}
}3-2. 3-1 で作成した Producer 、kafkaAppPro2 を実行し、JSON オブジェクトを Topic に出力します。
3-3. Consumer を起動し、指定した Topic のデータの読み込みを開始し、Producer によって書き込まれたデータをリアルタイムで確認します。
(出力)データの一部
xxxx
partition= 0,offset = 413, key = null, value = {"name":"Tom", "age":30, "city":"Shanghai","datetime":2022-11-06T07:20:05.308Z}
partition= 0,offset = 414, key = null, value = {"name":"John", "age":30, "city":"New York","datetime":2022-11-06T07:20:05.361Z}
partition= 0,offset = 415, key = null, value = {"name":"Mary", "age":30, "city":"Paris","datetime":2022-11-06T07:20:05.424Z}
partition= 0,offset = 416, key = null, value = {"name":"Peter", "age":30, "city":"London","datetime":2022-11-06T07:20:05.479Z}
partition= 0,offset = 417, key = null, value = {"name":"Paul", "age":30, "city":"Tokyo","datetime":2022-11-06T07:20:05.532Z}
partition= 0,offset = 418, key = null, value = {"name":"Mark", "age":30, "city":"Hong Kong","datetime":2022-11-06T07:20:05.593Z}
partition= 0,offset = 419, key = null, value = {"name":"Jane", "age":30, "city":"Singapore","datetime":2022-11-06T07:20:05.656Z}
partition= 0,offset = 420, key = null, value = {"name":"Sue", "age":30, "city":"Sydney","datetime":2022-11-06T07:20:05.720Z}
partition= 0,offset = 421, key = null, value = {"name":"Bob", "age":30, "city":"Melbourne","datetime":2022-11-06T07:20:05.773Z}
partition= 0,offset = 422, key = null, value = {"name":"Alice", "age":30, "city":"Beijing","datetime":2022-11-06T07:20:05.830Z}
partition= 0,offset = 423, key = null, value = {"name":"Tom", "age":30, "city":"Shanghai","datetime":2022-11-06T07:20:05.898Z}▼4. 参考情報
- Apache Kafka
- Apache Kafka インストール 4 ノード No.42
- Apache Kafka – Producer Java アプリケーションの作成 No.66
- Apache Kafka – Consumer Java アプリケーションの作成 No.67
以上です。参考になりましたら幸いです。