Java: Inserting CSV File Data into Azure Cosmos DB (SQL API) No.20

I publish on this blog what I am working on as an investment in my own skills.

This post shows how to read data from a CSV file and insert it into Cosmos DB (SQL API).

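▼1. How to insert CSV file data into Cosmos DB (SQL API)

This section outlines how to insert data from a CSV file into Azure Cosmos DB (SQL API).
First, the readLine method of the BufferedReader class reads the data from the CSV file one line at a time. Each line is split into an array, and the array elements are passed as arguments to the constructor of the Family class, which was created for inserting data into Cosmos DB. The resulting Family instance, family, is then inserted as an item into the Cosmos DB collection (container).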
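
The read-and-map step described above can be sketched on its own, without any Cosmos DB calls. This is only a minimal illustration under a few assumptions: the class name CsvLineSketch and the helpers parseLine and readAll are hypothetical, and the data comes from an in-memory string instead of a file so that the example runs anywhere.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: these names are illustrative and not part of the Cosmos DB SDK.
class CsvLineSketch {

    // Mirrors the shape of the Family class used later in this post.
    static class Family {
        final String id;
        final String lastName;
        final boolean isRegistered;

        Family(String id, String lastName, boolean isRegistered) {
            this.id = id;
            this.lastName = lastName;
            this.isRegistered = isRegistered;
        }
    }

    // Splits one CSV line into its three fields and maps them onto a Family.
    static Family parseLine(String line) {
        String[] data = line.split(",");
        if (data.length != 3) {
            throw new IllegalArgumentException(
                    "Expected 3 fields but got " + data.length + ": " + line);
        }
        return new Family(data[0].trim(), data[1].trim(),
                Boolean.parseBoolean(data[2].trim()));
    }

    // Reads every non-blank line from the reader and converts it to a Family.
    static List<Family> readAll(BufferedReader reader) throws IOException {
        List<Family> families = new ArrayList<>();
        String line;
        while ((line = reader.readLine()) != null) {
            if (!line.trim().isEmpty()) {
                families.add(parseLine(line));
            }
        }
        return families;
    }

    public static void main(String[] args) throws IOException {
        String csv = "1,Travase,true\n2,Ken,true\n4,Simon,false\n";
        try (BufferedReader reader = new BufferedReader(new StringReader(csv))) {
            for (Family f : readAll(reader)) {
                System.out.println(f.id + " " + f.lastName + " " + f.isRegistered);
            }
        }
    }
}
```

Keeping the parsing in a small method of its own makes it possible to check the CSV handling before any item is sent to Cosmos DB.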


▼2. Prerequisites

2-1. Using the Azure Cosmos DB trial

Set up a free Azure Cosmos DB account from the site below. You do not need to prepare an Azure subscription. https://azure.microsoft.com/ja-jp/try/cosmosdb/

2-2. Using Azure Cosmos DB Java SDK v4 4.8.0

Confirm that pom.xml specifies the following version.

        <dependency>
            <groupId>com.azure</groupId>
            <artifactId>azure-cosmos</artifactId>
            <version>4.8.0</version>
        </dependency>

Reference: Azure Cosmos DB Java SDK v4 for Core (SQL) API: release notes and resources
https://docs.microsoft.com/en-us/azure/cosmos-db/sql-api-sdk-java-v4

2-3. Creating the database and collection (container)

See this blog post for details.
Java: How to Create an Azure Cosmos DB (SQL API) Database No.14

2-4. Creating the CSV file "data.csv" with the vi editor

1,Travase,true
2,Ken,true
3,Satya,true
4,Simon,false
5,Jane,false
6,Kristy,true
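
If vi is not available, the same file could also be generated from Java. The following is a minimal sketch rather than the method used in this post: the class name WriteSampleCsv and its write helper are hypothetical, and the default output location (data.csv in the current working directory) is an assumption.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch: the class and method names are illustrative.
class WriteSampleCsv {

    // The six rows shown above, one "id,lastName,isRegistered" record per line.
    static final List<String> ROWS = Arrays.asList(
            "1,Travase,true",
            "2,Ken,true",
            "3,Satya,true",
            "4,Simon,false",
            "5,Jane,false",
            "6,Kristy,true");

    // Writes the rows to the given path, replacing any existing file.
    static Path write(Path target) throws IOException {
        return Files.write(target, ROWS, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) throws IOException {
        // Assumption: write to the working directory unless a path is passed as an argument.
        Path target = Paths.get(args.length > 0 ? args[0] : "data.csv");
        System.out.println("Wrote " + write(target).toAbsolutePath());
    }
}
```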

▼3. Code that inserts CSV file data into Cosmos DB (SQL API)

3-1. Writing the Java code

The code below reads the data from the CSV file and inserts it into a Cosmos DB (SQL API) collection (container). At the end, it queries the container to confirm the inserted data (items). Replace CosmosDBAccountName, DatabaseName, end_point, master_key, and containerName with the values for your environment.

(Example) The end_point parameter is a URL of the form https://YourCosmosDBAccountName.documents.azure.com:443
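
Instead of editing these values in the source, they could be supplied from outside the program. The sketch below is one possible approach and not part of the code in this post: the environment variable names COSMOS_ENDPOINT and COSMOS_KEY and the helpers envOrDefault and looksLikeEndpoint are hypothetical. The check only confirms that the endpoint is an https URL with a host; it does not contact Cosmos DB.

```java
import java.net.URI;

// Hypothetical sketch: the environment variable names and helper methods are
// illustrative assumptions, not part of the Cosmos DB SDK.
class CosmosSettingsSketch {

    // Returns the environment variable's value, or the fallback when it is unset or blank.
    static String envOrDefault(String name, String fallback) {
        String value = System.getenv(name);
        return (value == null || value.trim().isEmpty()) ? fallback : value.trim();
    }

    // Basic shape check: an absolute https URL that has a host part.
    static boolean looksLikeEndpoint(String endpoint) {
        if (endpoint == null) {
            return false;
        }
        try {
            URI uri = URI.create(endpoint);
            return "https".equalsIgnoreCase(uri.getScheme()) && uri.getHost() != null;
        } catch (IllegalArgumentException e) {
            // URI.create rejects strings that are not valid URIs, e.g. ones containing spaces.
            return false;
        }
    }

    public static void main(String[] args) {
        String endPoint = envOrDefault("COSMOS_ENDPOINT",
                "https://YourCosmosDBAccountName.documents.azure.com:443");
        String masterKey = envOrDefault("COSMOS_KEY", "");

        System.out.println("Endpoint looks valid: " + looksLikeEndpoint(endPoint));
        // Never print the key itself; only report whether one was supplied.
        System.out.println("Key provided: " + !masterKey.isEmpty());
    }
}
```

Reading the key from the environment keeps it out of source control, and a quick shape check catches a placeholder such as "Cosmos DB Endpoint" before the client is built.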

import com.azure.cosmos.*;
import com.azure.cosmos.models.CosmosItemRequestOptions;
import com.azure.cosmos.models.CosmosItemResponse;
import com.azure.cosmos.models.CosmosQueryRequestOptions;
import com.azure.cosmos.models.PartitionKey;
import com.azure.cosmos.util.CosmosPagedIterable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.util.stream.Collectors;
public class ReadCSVInsertCosmosDBv1 {
    protected static final Logger log = LoggerFactory.getLogger(ReadCSVInsertCosmosDBv1.class);
    public static class Family {
        public String id;
        public String lastName;
        public boolean isRegistered;
        public String getId() {
            return id;
        }
        public String getLastName() {
            return lastName;
        }
        public Family(String idv, String lastnamev, boolean isv) {
            id = idv;
            lastName = lastnamev;
            isRegistered = isv;
        }
        public Family() {
        }
    }
    public static void main(String... args) {
        String databaseName = "DatabaseName";
        String end_point = "Cosmos DB Endpoint";
        String master_key = "key";
        String containerName = "collection";
        BufferedReader reader = null;
        try (CosmosClient cosmosClient = new CosmosClientBuilder()
                .endpoint(end_point)
                .key(master_key)
                .directMode(DirectConnectionConfig.getDefaultConfig())
                .consistencyLevel(ConsistencyLevel.SESSION)
                .connectionSharingAcrossClientsEnabled(true)
                .contentResponseOnWriteEnabled(true)
                .userAgentSuffix("Application")
                .buildClient()) {
            log.info("The connection to Cosmos DB succeeded.");
            CosmosDatabase database = cosmosClient.getDatabase(databaseName);
            log.info("Create item");
            CosmosItemRequestOptions cosmosItemRequestOptions = new CosmosItemRequestOptions();
            log.info("Set Container");
            CosmosContainer container = database.getContainer(containerName);
            try {
                reader = new BufferedReader(new FileReader(new File("/home/UserA/Project/data.csv")));
                String line;
                String[] data;
                log.info("Insert data into items");
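                while ((line = reader.readLine()) != null) {
                    // Each CSV row holds three fields: id, lastName, isRegistered
                    data = line.split(",");
                    if (data.length < 3) {
                        // Skip blank or incomplete rows instead of aborting the whole import
                        log.warn("Skipping malformed line: {}", line);
                        continue;
                    }
                    Family family = new Family(data[0].trim(), data[1].trim(), Boolean.parseBoolean(data[2].trim()));
                    // lastName is passed as the partition key value, so the container's partition key path must match it
                    CosmosItemResponse<Family> item = container.createItem(family, new PartitionKey(family.getLastName()), cosmosItemRequestOptions);
                }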
            } catch (Exception e) {
                log.error("Something went wrong! ", e);
            } finally {
                if (reader != null) {
                    try {
                        reader.close();
                    } catch (IOException ioe) {
                        ioe.printStackTrace();
                    }
                }
            }
            log.info("Set some common query options");
            CosmosQueryRequestOptions queryOptions = new CosmosQueryRequestOptions();
            CosmosPagedIterable<Family> familiesPagedIterable = container.queryItems("select * from c",queryOptions,Family.class);
            log.info("Read data");
            familiesPagedIterable.iterableByPage(30).forEach(cosmosItemPropertiesFeedResponse -> {
                System.out.println("Got results " + cosmosItemPropertiesFeedResponse.getResults().size() + " items(s)");
                System.out.println("Item Ids " + cosmosItemPropertiesFeedResponse
                        .getResults()
                        .stream()
                        .map(Family::getId)
                        .collect(Collectors.toList())
                );
            });
        }catch(CosmosException e){
            e.printStackTrace();
            System.err.println(String.format("Read Item failed with %s",e));
        }
    }
}


3-2. Execution result

[main] INFO com.azure.cosmos.implementation.RxDocumentClientImpl - Initializing DocumentClient [0] with serviceEndpoint [https://YourCosmosDBAccountName.documents.azure.com:443], connectionPolicy [ConnectionPolicy{requestTimeout=PT5S, connectionMode=DIRECT, maxConnectionPoolSize=1000, idleHttpConnectionTimeout=null, idleTcpConnectionTimeout=PT0S, userAgentSuffix='My-Application-clienttest', throttlingRetryOptions=RetryOptions{maxRetryAttemptsOnThrottledRequests=9, maxRetryWaitTime=PT30S}, endpointDiscoveryEnabled=true, preferredRegions=null, multipleWriteRegionsEnabled=true, proxyType=null, inetSocketProxyAddress=null, readRequestsFallbackEnabled=true, connectTimeout=PT5S, idleTcpEndpointTimeout=PT1H, maxConnectionsPerEndpoint=130, maxRequestsPerConnection=30, tcpConnectionEndpointRediscoveryEnabled=false}], consistencyLevel [Session], directModeProtocol [Tcp]
[main] INFO com.azure.cosmos.implementation.http.SharedGatewayHttpClient - creating a new shared HttpClient
[parallel-1] INFO com.azure.cosmos.implementation.RxDocumentClientImpl - Getting database account endpoint from https://YourCosmosDBAccountName.documents.azure.com:443
[main] INFO com.azure.cosmos.implementation.directconnectivity.SharedTransportClient - creating a new shared RntbdTransportClient
[main] INFO com.azure.cosmos.implementation.directconnectivity.RntbdTransportClient - Using default Direct TCP options: azure.cosmos.directTcp.defaultOptions
[main] INFO com.azure.cosmos.implementation.directconnectivity.rntbd.RntbdServiceEndpoint$RntbdEndpointMonitoringProvider - Starting RntbdClientChannelPoolMonitoringProvider ...
[main] INFO ReadCSVCosmosDBv1 - The connection to Cosmos DB succeeded.
[main] INFO ReadCSVCosmosDBv1 - Create item
[main] INFO ReadCSVCosmosDBv1 - Set Container
[main] INFO ReadCSVCosmosDBv1 - Insert data into items
[cosmos-rntbd-nio-2-1] INFO com.azure.cosmos.implementation.SessionContainer - Registering a new collection resourceId [xxxxx] in SessionTokens
[main] INFO ReadCSVCosmosDBv1 - Set some common query options
[main] INFO ReadCSVCosmosDBv1 - Read data
Got results 21 items(s)
Item Ids [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21]
[main] INFO com.azure.cosmos.implementation.RxDocumentClientImpl - Attempting to close client 0
[main] INFO com.azure.cosmos.implementation.RxDocumentClientImpl - Shutting down ...
[main] INFO com.azure.cosmos.implementation.RxDocumentClientImpl - Closing Global Endpoint Manager ...
[main] INFO com.azure.cosmos.implementation.RxDocumentClientImpl - Closing StoreClientFactory ..
[main] INFO com.azure.cosmos.implementation.directconnectivity.SharedTransportClient - closing one reference to the shared RntbdTransportClient, the number of remaining references is 0
[main] INFO com.azure.cosmos.implementation.directconnectivity.SharedTransportClient - All references to shared RntbdTransportClient are closed. Closing the underlying RntbdTransportClient
[main] INFO com.azure.cosmos.implementation.directconnectivity.rntbd.RntbdServiceEndpoint$RntbdEndpointMonitoringProvider - Shutting down RntbdClientChannelPoolMonitoringProvider ...
[main] INFO com.azure.cosmos.implementation.RxDocumentClientImpl - Shutting down reactorHttpClient ...
[main] INFO com.azure.cosmos.implementation.http.SharedGatewayHttpClient - closing one reference to the shared HttpClient, the number of remaining references is 0
[main] INFO com.azure.cosmos.implementation.http.SharedGatewayHttpClient - All references to shared HttpClient are closed. Closing the underlying HttpClient
[main] INFO com.azure.cosmos.implementation.RxDocumentClientImpl - Shutting down CpuMonitor ...
[main] INFO com.azure.cosmos.implementation.RxDocumentClientImpl - Shutting down completed.

▼4. References

(1) Try Azure Cosmos DB for free https://docs.microsoft.com/ja-jp/azure/cosmos-db/optimize-dev-test#try-azure-cosmos-db-for-free
(2) Java: Reading and Printing CSV File Data No.19 https://www.kumoman.net/site/2021/01/06/javareadcsv-no19-2021-01/
(3) CosmosItemResponse<T> Class https://docs.microsoft.com/en-us/java/api/com.azure.cosmos.models.cosmositemresponse?view=azure-java-stable
(4) CosmosContainer.<T>createItem Method https://docs.microsoft.com/en-us/java/api/com.azure.cosmos.cosmoscontainer.createitem?view=azure-java-stable
(5) sample https://github.com/Azure-Samples/azure-cosmos-java-getting-started/blob/master/src/main/java/com/azure/cosmos/sample/sync/SyncMain.java


