自己投資としてチャレンジしている内容を Blog で公開しています。
今回は Apache Spark の環境で利用できる PySpark を利用し Azure Cosmos DB への接続、クエリの実行、実行結果を Parquet ファイルへ保存する方法を紹介します。
————————————
▼1. Apache Spark 3 の環境から Azure Cosmos DB へ接続する方法
————————————
Apache Spark 3 の環境で Azure Cosmos DB に接続するには、Azure Cosmos DB Connector for Apache Spark 3 が必要です。以下のサイトからダウンロードします。
例) com.azure.cosmos.spark:azure-cosmos-spark_3-1_2-12 の場合、2020年10月中旬の時点で最新の version は 4.3.1 となります。
com.azure.cosmos.spark : azure-cosmos-spark_3-1_2-12 – Maven Central Repository Search
————————————
▼2. 事前準備
————————————
2-1. Apache Spark クラスターを作成します。詳細は以前 blog で紹介した下記サイトを参照ください。
【最新】【プログラミング学習】Apache Spark インストール – 3 ノード No.29 – 2021/07 – 【挑戦する人を応援するサイト】Kumoman Blog
2-2. 無料で利用できる Azure Cosmos DB を以下のサイトから用意します。Azure を利用するために必要な Subscription を用意する必要もありません。
https://azure.microsoft.com/ja-jp/try/cosmosdb/
2-3. Azure Cosmos DB Connector for Apache Spark 3 をダウンロードします。
例) azure-cosmos-spark_3-1_2-12-4.3.1.jar の場合
com.azure.cosmos.spark : azure-cosmos-spark_3-1_2-12 : 4.3.1 – Maven Central Repository Search
————————————
▼3. PySpark を使い、Azure Cosmos DB へ接続後、クエリを実行し、結果を Parquet ファイルへ保存する方法
————————————
以下の順に進めます。
————–
3-1. Azure Cosmos DB Connector for Apache Spark 3 の jar ファイルを指定して pyspark を起動。
3-2. Cosmos DB の Database および Container を作成します。
3-3. Container にデータを入れます。
3-4. Container のデータを読み込み出力します。また Container のスキーマーを確認します。
3-5. 読み込んだデータを parquet ファイルに保存します。保存後、hdfs dfs -ls のコマンドで保存した parquet ファイルを確認します。
3-6. Spark UI で Spark Job の実行履歴を確認します。
————–
3-1. Azure Cosmos DB Connector for Apache Spark 3 の jar ファイルを指定して pyspark を起動。
$ pyspark --jars azure-cosmos-spark_3-1_2-12-4.3.1.jar
Python 3.8.10 (default, Sep 28 2021, 16:10:42)
[GCC 9.3.0] on linux
Type "help", "copyright", "credits" or "license" for more information.
2021-10-19 22:26:57,756 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/__ / .__/\_,_/_/ /_/\_\ version 3.1.2
/_/
Using Python version 3.8.10 (default, Sep 28 2021 16:10:42)
Spark context Web UI available at http://xxxxx:4040
Spark context available as 'sc' (master = local[*], app id = local-1634650023016).
SparkSession available as 'spark'.
>>> 3-2. Cosmos DB の Database および Container を作成します。
最初に Cosmos DB の接続情報を指定し、データベース名、コンテナを作成します。
>>> # Define connection information, Database name and Container/Collection name.
>>> cosmosEndpoint = "https://COSMOSDBACCOUNT.documents.azure.com:443/"
>>> cosmosMasterKey = "COSMOSDBACCOUNTKEY"
>>> cosmosDatabaseName = "testdb"
>>> cosmosContainerName = "data"
>>>
>>> cfg3 = {
... "spark.cosmos.accountEndpoint" : cosmosEndpoint,
... "spark.cosmos.accountKey" : cosmosMasterKey,
... "spark.cosmos.database" : cosmosDatabaseName,
... "spark.cosmos.container" : cosmosContainerName,
... }
>>>
>>> # Configure Catalog Api to be used
>>> spark.conf.set("spark.sql.catalog.cosmosCatalog", "com.azure.cosmos.spark.CosmosCatalog")
>>> spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.accountEndpoint", cosmosEndpoint)
>>> spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.accountKey", cosmosMasterKey)
>>>
>>> # create a cosmos database using catalog api
>>> spark.sql("CREATE DATABASE IF NOT EXISTS cosmosCatalog.{};".format(cosmosDatabaseName))
DataFrame[]
>>>
>>> # create a cosmos container/collection using catalog api
>>> spark.sql("CREATE TABLE IF NOT EXISTS cosmosCatalog.{}.{} using cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/num', manualThroughput = '400')".format(cosmosDatabaseName, cosmosContainerName))
DataFrame[]3-3. Container にデータを入れます。
>>> # insert records into the
>>> spark.createDataFrame((("10","A1", "cat", 2, True), ("20","A2","dog", 4, False)))\
... .toDF("id","num","name","age","isAlive") \
... .write\
... .format("cosmos.oltp")\
... .options(**cfg3)\
... .mode("APPEND")\
... .save()(結果)

3-4. Container のデータを読み込み出力します。また Container のスキーマーも確認します。Container のデータを読み込みは、2 通りあり、自動的にスキーマを認識する ”spark.cosmos.read.inferSchema.enabled” を true にした方法と、spark.read.schema により手動でスキーマを指定する方法を記載しています。
>>> # show schema and records
>>> df = spark.read.format("cosmos.oltp").options(**cfg3)\
... .option("spark.cosmos.read.inferSchema.enabled", "true")\
... .load()
>>> df.printSchema()
root
|-- name: string (nullable = true)
|-- isAlive: boolean (nullable = true)
|-- age: integer (nullable = true)
|-- num: string (nullable = true)
|-- id: string (nullable = false)
>>> df.show()
2021-10-19 22:33:45,387 WARN util.CaseInsensitiveStringMap: Converting duplicated key spark.cosmos.read.inferSchema.enabled into CaseInsensitiveStringMap.
2021-10-19 22:33:45,387 WARN util.CaseInsensitiveStringMap: Converting duplicated key spark.cosmos.accountkey into CaseInsensitiveStringMap.
2021-10-19 22:33:45,388 WARN util.CaseInsensitiveStringMap: Converting duplicated key spark.cosmos.accountendpoint into CaseInsensitiveStringMap.
+----+-------+---+---+---+
|name|isAlive|age|num| id|
+----+-------+---+---+---+
| dog| false| 4| A2| 20|
| cat| true| 2| A1| 10|
+----+-------+---+---+---+
>>> # After defining a schema in pyspark, show the schema
>>> from pyspark.sql.types import *
>>> schema = StructType([
... StructField("id", StringType()),
... StructField("num", StringType()),
... StructField("name", StringType()),
... StructField("type", StringType()),
... StructField("age", IntegerType()),
... StructField("isAlive", BooleanType())
... ])
>>>
>>> df = spark.read.schema(schema).format("cosmos.oltp").options(**cfg3)\
... .load()
>>> df.printSchema()
root
|-- id: string (nullable = true)
|-- num: string (nullable = true)
|-- name: string (nullable = true)
|-- type: string (nullable = true)
|-- age: integer (nullable = true)
|-- isAlive: boolean (nullable = true)
3-5. 読み込んだデータを parquet ファイルに保存します。保存後、hdfs dfs -ls のコマンドで保存した parquet ファイルを確認します。
>>> df1.write.mode("overwrite").parquet("appbox/df1cosmos")
2021-10-19 23:34:03,274 WARN util.CaseInsensitiveStringMap: Converting duplicated key spark.cosmos.accountkey into CaseInsensitiveStringMap.
2021-10-19 23:34:03,274 WARN util.CaseInsensitiveStringMap: Converting duplicated key spark.cosmos.accountendpoint into CaseInsensitiveStringMap.
>>>
>>> spark.read.parquet("appbox/df1cosmos").show()
+---+---+----+----+---+-------+
| id|num|name|type|age|isAlive|
+---+---+----+----+---+-------+
| 20| A2| dog|null| 4| false|
| 10| A1| cat|null| 2| true|
+---+---+----+----+---+-------+別途 Terminal を立ち上げて、hdfs dfs -ls のコマンドで保存した parquet ファイルを確認します。
$ hdfs dfs -ls appbox/df1cosmos
Found 3 items
-rw-r--r-- 1 hadoop supergroup 0 2021-10-19 23:34 appbox/df1cosmos/_SUCCESS
-rw-r--r-- 1 hadoop supergroup 1332 2021-10-19 23:34 appbox/df1cosmos/part-00000-349aca57-60f9-4fb8-abe7-c20e483c52b6-c000.snappy.parquet
-rw-r--r-- 1 hadoop supergroup 1332 2021-10-19 23:34 appbox/df1cosmos/part-00001-349aca57-60f9-4fb8-abe7-c20e483c52b6-c000.snappy.parquet3-6. 最後に、Spark UI で Spark のジョブ実行履歴を確認します。

————————————
▼4. 参考情報
————————————
(1) Quickstart – Manage data with Azure Cosmos DB Spark 3 OLTP Connector for SQL API | Microsoft Docs
(2) GitHub – Azure/azure-cosmosdb-spark: Apache Spark Connector for Azure Cosmos DB
(3) azure-sdk-for-java/configuration-reference.md at main · Azure/azure-sdk-for-java · GitHub
以上です。参考になりましたら幸いです。