I publish on this blog what I take on as an investment in myself.
This time I show how to read the data in a CSV file and insert it into Cosmos DB (SQL API).
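▼1. How to insert CSV file data into Cosmos DB (SQL API)
This section outlines how the data in a CSV file gets inserted into Azure Cosmos DB (SQL API).
First, the readLine method of the BufferedReader class reads the CSV file one line at a time. Each line is split into an array, and the array elements are passed as arguments to the constructor of a Family class created for inserting data into Cosmos DB. The resulting Family instance, family, is then added as an item to the Cosmos DB collection (container).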
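The read-and-convert step described above can be sketched without any Cosmos DB dependency. This is a minimal illustration, not the code used later in this post: the class name CsvFamilyReader and the methods parseLine and readAll are hypothetical names introduced here, and the example assumes every row has exactly three comma-separated fields with no quoting.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: reads CSV rows of the form "id,lastName,isRegistered".
public class CsvFamilyReader {

    // Plain holder mirroring the three CSV columns.
    public static class Family {
        public final String id;
        public final String lastName;
        public final boolean isRegistered;

        public Family(String id, String lastName, boolean isRegistered) {
            this.id = id;
            this.lastName = lastName;
            this.isRegistered = isRegistered;
        }
    }

    // Converts one line such as "1,Travase,true" into a Family.
    public static Family parseLine(String line) {
        String[] data = line.split(",");
        if (data.length != 3) {
            throw new IllegalArgumentException(
                    "Expected 3 fields but got " + data.length + ": " + line);
        }
        return new Family(data[0].trim(), data[1].trim(),
                Boolean.parseBoolean(data[2].trim()));
    }

    // Reads every non-blank line with BufferedReader.readLine and parses it.
    public static List<Family> readAll(Reader source) throws IOException {
        List<Family> families = new ArrayList<>();
        try (BufferedReader reader = new BufferedReader(source)) {
            String line;
            while ((line = reader.readLine()) != null) {
                if (line.trim().isEmpty()) {
                    continue; // ignore blank lines
                }
                families.add(parseLine(line));
            }
        }
        return families;
    }

    public static void main(String[] args) throws IOException {
        // In-memory sample so the sketch runs without a file on disk.
        String csv = "1,Travase,true\n2,Ken,true\n4,Simon,false\n";
        for (Family f : readAll(new StringReader(csv))) {
            System.out.println(f.id + " " + f.lastName + " " + f.isRegistered);
        }
    }
}
```

Passing a Reader instead of a file path keeps the parsing logic easy to try with in-memory data; for a real file, a FileReader can be passed in the same way.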
▼2. Prerequisites
2-1. Use the trial version of Azure Cosmos DB
Prepare a free Azure Cosmos DB account from the site below. You do not need an Azure subscription for this. https://azure.microsoft.com/ja-jp/try/cosmosdb/
2-2. Use Azure Cosmos DB Java SDK v4 4.8.0
Confirm that pom.xml specifies the following version.
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-cosmos</artifactId>
<version>4.8.0</version>
</dependency>
Reference: Azure Cosmos DB Java SDK v4 for Core (SQL) API: release notes and resources
https://docs.microsoft.com/en-us/azure/cosmos-db/sql-api-sdk-java-v4
2-3. Create the database and collection (container)
See this blog post for details.
Java: How to create an Azure Cosmos DB (SQL API) database No.14
2-4. Create the CSV file "data.csv" with the vi editor
1,Travase,true
2,Ken,true
3,Satya,true
4,Simon,false
5,Jane,false
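6,Kristy,true
▼3. Code that inserts CSV file data into Cosmos DB (SQL API)
3-1. Create the Java code
The following code reads the data in the CSV file and inserts it into a Cosmos DB (SQL API) collection (container). At the end it queries the container to check the inserted data (items). Replace databaseName, end_point, master_key, and containerName with the values for your environment; YourCosmosDBAccountName in the endpoint URL is the name of your Cosmos DB account.
(Example) The end_point parameter is a URL such as https://YourCosmosDBAccountName.documents.azure.com:443.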
import com.azure.cosmos.*;
import com.azure.cosmos.models.CosmosItemRequestOptions;
import com.azure.cosmos.models.CosmosQueryRequestOptions;
import com.azure.cosmos.models.PartitionKey;
import com.azure.cosmos.util.CosmosPagedIterable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.util.stream.Collectors;

public class ReadCSVInsertCosmosDBv1 {
    protected static final Logger log = LoggerFactory.getLogger(ReadCSVInsertCosmosDBv1.class);

    // Item type stored in the container; one instance per CSV row.
    public static class Family {
        public String id;
        public String lastName;
        public boolean isRegistered;

        // No-argument constructor, used when query results are deserialized into Family.
        public Family() {
        }

        public Family(String idv, String lastnamev, boolean isv) {
            id = idv;
            lastName = lastnamev;
            isRegistered = isv;
        }

        public String getId() {
            return id;
        }

        public String getLastName() {
            return lastName;
        }
    }

    public static void main(String... args) {
        // Replace these placeholders with your own values.
        String databaseName = "DatabaseName";
        String end_point = "Cosmos DB Endpoint";
        String master_key = "key";
        String containerName = "collection";

        // try-with-resources closes the client when main finishes.
        try (CosmosClient cosmosClient = new CosmosClientBuilder()
                .endpoint(end_point)
                .key(master_key)
                .directMode(DirectConnectionConfig.getDefaultConfig())
                .consistencyLevel(ConsistencyLevel.SESSION)
                .connectionSharingAcrossClientsEnabled(true)
                .contentResponseOnWriteEnabled(true)
                .userAgentSuffix("Application")
                .buildClient()) {
            log.info("The connection to Cosmos DB succeeded.");
            CosmosDatabase database = cosmosClient.getDatabase(databaseName);

            log.info("Create item");
            CosmosItemRequestOptions cosmosItemRequestOptions = new CosmosItemRequestOptions();

            log.info("Set Container");
            CosmosContainer container = database.getContainer(containerName);

            // The reader is closed automatically, even if an insert fails.
            try (BufferedReader reader = new BufferedReader(
                    new FileReader(new File("/home/UserA/Project/data.csv")))) {
                String line;
                log.info("Insert data into items");
                while ((line = reader.readLine()) != null) {
                    // Each row is "id,lastName,isRegistered".
                    String[] data = line.split(",");
                    if (data.length < 3) {
                        log.warn("Skipping malformed line: {}", line);
                        continue;
                    }
                    Family family = new Family(data[0], data[1], Boolean.parseBoolean(data[2]));
                    // lastName is passed as the partition key value, so the container's
                    // partition key path is assumed to be /lastName.
                    container.createItem(family, new PartitionKey(family.getLastName()), cosmosItemRequestOptions);
                }
            } catch (Exception e) {
                log.error("Something went wrong! ", e);
            }

            log.info("Set some common query options");
            CosmosQueryRequestOptions queryOptions = new CosmosQueryRequestOptions();
            CosmosPagedIterable<Family> familiesPagedIterable =
                    container.queryItems("select * from c", queryOptions, Family.class);

            log.info("Read data");
            // Fetch the results in pages of up to 30 items and print the ids in each page.
            familiesPagedIterable.iterableByPage(30).forEach(cosmosItemPropertiesFeedResponse -> {
                System.out.println("Got results " + cosmosItemPropertiesFeedResponse.getResults().size() + " items(s)");
                System.out.println("Item Ids " + cosmosItemPropertiesFeedResponse
                        .getResults()
                        .stream()
                        .map(Family::getId)
                        .collect(Collectors.toList()));
            });
        } catch (CosmosException e) {
            e.printStackTrace();
            System.err.println(String.format("Read Item failed with %s", e));
        }
    }
}
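The code above keeps end_point and master_key as string literals in main. As a possible alternative, the sketch below reads them from environment variables so the key stays out of the source file. The class name CosmosSettings and the variable names COSMOS_ENDPOINT and COSMOS_KEY are assumptions made for this illustration; they are not defined by the Azure Cosmos DB SDK.

```java
import java.util.Map;

// Hypothetical holder for the connection settings used by the sample.
public class CosmosSettings {
    public final String endpoint;
    public final String key;

    private CosmosSettings(String endpoint, String key) {
        this.endpoint = endpoint;
        this.key = key;
    }

    // Builds the settings from a map such as System.getenv().
    // COSMOS_ENDPOINT and COSMOS_KEY are names assumed for this sketch.
    public static CosmosSettings fromEnvironment(Map<String, String> env) {
        return new CosmosSettings(
                require(env, "COSMOS_ENDPOINT"),
                require(env, "COSMOS_KEY"));
    }

    // Fails fast with a clear message when a setting is missing or blank.
    private static String require(Map<String, String> env, String name) {
        String value = env.get(name);
        if (value == null || value.trim().isEmpty()) {
            throw new IllegalStateException("Missing required setting: " + name);
        }
        return value.trim();
    }

    public static void main(String[] args) {
        try {
            CosmosSettings settings = fromEnvironment(System.getenv());
            // Print only the endpoint; the key should never be logged.
            System.out.println("Endpoint: " + settings.endpoint);
        } catch (IllegalStateException e) {
            System.err.println(e.getMessage());
        }
    }
}
```

With this in place, settings.endpoint and settings.key could be passed to the endpoint and key calls on CosmosClientBuilder instead of the literals.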
3-2. Execution result
[main] INFO com.azure.cosmos.implementation.RxDocumentClientImpl - Initializing DocumentClient [0] with serviceEndpoint [https://YourCosmosDBAccountName.documents.azure.com:443], connectionPolicy [ConnectionPolicy{requestTimeout=PT5S, connectionMode=DIRECT, maxConnectionPoolSize=1000, idleHttpConnectionTimeout=null, idleTcpConnectionTimeout=PT0S, userAgentSuffix='My-Application-clienttest', throttlingRetryOptions=RetryOptions{maxRetryAttemptsOnThrottledRequests=9, maxRetryWaitTime=PT30S}, endpointDiscoveryEnabled=true, preferredRegions=null, multipleWriteRegionsEnabled=true, proxyType=null, inetSocketProxyAddress=null, readRequestsFallbackEnabled=true, connectTimeout=PT5S, idleTcpEndpointTimeout=PT1H, maxConnectionsPerEndpoint=130, maxRequestsPerConnection=30, tcpConnectionEndpointRediscoveryEnabled=false}], consistencyLevel [Session], directModeProtocol [Tcp]
[main] INFO com.azure.cosmos.implementation.http.SharedGatewayHttpClient - creating a new shared HttpClient
[parallel-1] INFO com.azure.cosmos.implementation.RxDocumentClientImpl - Getting database account endpoint from https://YourCosmosDBAccountName.documents.azure.com:443
[main] INFO com.azure.cosmos.implementation.directconnectivity.SharedTransportClient - creating a new shared RntbdTransportClient
[main] INFO com.azure.cosmos.implementation.directconnectivity.RntbdTransportClient - Using default Direct TCP options: azure.cosmos.directTcp.defaultOptions
[main] INFO com.azure.cosmos.implementation.directconnectivity.rntbd.RntbdServiceEndpoint$RntbdEndpointMonitoringProvider - Starting RntbdClientChannelPoolMonitoringProvider ...
[main] INFO ReadCSVCosmosDBv1 - The connection to Cosmos DB succeeded.
[main] INFO ReadCSVCosmosDBv1 - Create item
[main] INFO ReadCSVCosmosDBv1 - Set Container
[main] INFO ReadCSVCosmosDBv1 - Insert data into items
[cosmos-rntbd-nio-2-1] INFO com.azure.cosmos.implementation.SessionContainer - Registering a new collection resourceId [xxxxx] in SessionTokens
[main] INFO ReadCSVCosmosDBv1 - Set some common query options
[main] INFO ReadCSVCosmosDBv1 - Read data
Got results 21 items(s)
Item Ids [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21]
[main] INFO com.azure.cosmos.implementation.RxDocumentClientImpl - Attempting to close client 0
[main] INFO com.azure.cosmos.implementation.RxDocumentClientImpl - Shutting down ...
[main] INFO com.azure.cosmos.implementation.RxDocumentClientImpl - Closing Global Endpoint Manager ...
[main] INFO com.azure.cosmos.implementation.RxDocumentClientImpl - Closing StoreClientFactory ..
[main] INFO com.azure.cosmos.implementation.directconnectivity.SharedTransportClient - closing one reference to the shared RntbdTransportClient, the number of remaining references is 0
[main] INFO com.azure.cosmos.implementation.directconnectivity.SharedTransportClient - All references to shared RntbdTransportClient are closed. Closing the underlying RntbdTransportClient
[main] INFO com.azure.cosmos.implementation.directconnectivity.rntbd.RntbdServiceEndpoint$RntbdEndpointMonitoringProvider - Shutting down RntbdClientChannelPoolMonitoringProvider ...
[main] INFO com.azure.cosmos.implementation.RxDocumentClientImpl - Shutting down reactorHttpClient ...
[main] INFO com.azure.cosmos.implementation.http.SharedGatewayHttpClient - closing one reference to the shared HttpClient, the number of remaining references is 0
[main] INFO com.azure.cosmos.implementation.http.SharedGatewayHttpClient - All references to shared HttpClient are closed. Closing the underlying HttpClient
[main] INFO com.azure.cosmos.implementation.RxDocumentClientImpl - Shutting down CpuMonitor ...
[main] INFO com.azure.cosmos.implementation.RxDocumentClientImpl - Shutting down completed.
▼4. References
(1) Try Azure Cosmos DB for free https://docs.microsoft.com/ja-jp/azure/cosmos-db/optimize-dev-test#try-azure-cosmos-db-for-free
(2) Java: Read and print the data in a CSV file No.19 https://www.kumoman.net/site/2021/01/06/javareadcsv-no19-2021-01/
(3) CosmosItemResponse<T> Class https://docs.microsoft.com/en-us/java/api/com.azure.cosmos.models.cosmositemresponse?view=azure-java-stable
(4) CosmosContainer.<T>createItem Method https://docs.microsoft.com/en-us/java/api/com.azure.cosmos.cosmoscontainer.createitem?view=azure-java-stable
(5) sample https://github.com/Azure-Samples/azure-cosmos-java-getting-started/blob/master/src/main/java/com/azure/cosmos/sample/sync/SyncMain.java