PySpark on Apache Spark – Azure Cosmos DB への接続 No.40

自己投資としてチャレンジしている内容を Blog で公開しています。

今回は Apache Spark の環境で利用できる PySpark を利用し Azure Cosmos DB への接続、クエリの実行、実行結果を Parquet ファイルへ保存する方法を紹介します。

————————————
▼1. Apache Spark 3 の環境から Azure Cosmos DB へ接続する方法
————————————
Apache Spark 3 の環境で Azure Cosmos DB に接続するには、Azure Cosmos DB Connector for Apache Spark 3 が必要です。以下のサイトからダウンロードします。

例) com.azure.cosmos.spark:azure-cosmos-spark_3-1_2-12 の場合、2020年10月中旬の時点で最新の version は 4.3.1 となります。
com.azure.cosmos.spark : azure-cosmos-spark_3-1_2-12 – Maven Central Repository Search

————————————
▼2. 事前準備
————————————
2-1. Apache Spark クラスターを作成します。詳細は以前 blog で紹介した下記サイトを参照ください。
【最新】【プログラミング学習】Apache Spark インストール – 3 ノード No.29 – 2021/07 – 【挑戦する人を応援するサイト】Kumoman Blog

2-2. 無料で利用できる Azure Cosmos DB を以下のサイトから用意します。Azure を利用するために必要な Subscription を用意する必要もありません。
https://azure.microsoft.com/ja-jp/try/cosmosdb/

2-3. Azure Cosmos DB Connector for Apache Spark 3 をダウンロードします。
例) azure-cosmos-spark_3-1_2-12-4.3.1.jar の場合
com.azure.cosmos.spark : azure-cosmos-spark_3-1_2-12 : 4.3.1 – Maven Central Repository Search

————————————
▼3. PySpark を使い、Azure Cosmos DB へ接続後、クエリを実行し、結果を Parquet ファイルへ保存する方法
————————————
以下の順に進めます。
————–
3-1. Azure Cosmos DB Connector for Apache Spark 3 の jar ファイルを指定して pyspark を起動。
3-2. Cosmos DB の Database および Container を作成します。
3-3. Container にデータを入れます。
3-4. Container のデータを読み込み出力します。また Container のスキーマーを確認します。
3-5. 読み込んだデータを parquet ファイルに保存します。保存後、hdfs dfs -ls のコマンドで保存した parquet ファイルを確認します。
3-6. Spark UI で Spark Job の実行履歴を確認します。
————–

3-1. Azure Cosmos DB Connector for Apache Spark 3 の jar ファイルを指定して pyspark を起動。

$ pyspark --jars azure-cosmos-spark_3-1_2-12-4.3.1.jar
Python 3.8.10 (default, Sep 28 2021, 16:10:42) 
[GCC 9.3.0] on linux
Type "help", "copyright", "credits" or "license" for more information.
2021-10-19 22:26:57,756 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 3.1.2
      /_/

Using Python version 3.8.10 (default, Sep 28 2021 16:10:42)
Spark context Web UI available at http://xxxxx:4040
Spark context available as 'sc' (master = local[*], app id = local-1634650023016).
SparkSession available as 'spark'.
>>> 

3-2. Cosmos DB の Database および Container を作成します。
最初に Cosmos DB の接続情報を指定し、データベース名、コンテナを作成します。

>>> # Define connection information, Database name and Container/Collection name.
>>> cosmosEndpoint = "https://COSMOSDBACCOUNT.documents.azure.com:443/"
>>> cosmosMasterKey = "COSMOSDBACCOUNTKEY"
>>> cosmosDatabaseName = "testdb"
>>> cosmosContainerName = "data"
>>> 
>>> cfg3 = {
...   "spark.cosmos.accountEndpoint" : cosmosEndpoint,
...   "spark.cosmos.accountKey" : cosmosMasterKey,
...   "spark.cosmos.database" : cosmosDatabaseName,
...   "spark.cosmos.container" : cosmosContainerName,
... }
>>> 
>>> # Configure Catalog Api to be used
>>> spark.conf.set("spark.sql.catalog.cosmosCatalog", "com.azure.cosmos.spark.CosmosCatalog")
>>> spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.accountEndpoint", cosmosEndpoint)
>>> spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.accountKey", cosmosMasterKey)
>>> 
>>> # create a cosmos database using catalog api
>>> spark.sql("CREATE DATABASE IF NOT EXISTS cosmosCatalog.{};".format(cosmosDatabaseName))
DataFrame[]
>>> 
>>> # create a cosmos container/collection using catalog api
>>> spark.sql("CREATE TABLE IF NOT EXISTS cosmosCatalog.{}.{} using cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/num', manualThroughput = '400')".format(cosmosDatabaseName, cosmosContainerName))
DataFrame[]

3-3. Container にデータを入れます。

>>> # insert records into the
>>> spark.createDataFrame((("10","A1", "cat", 2, True), ("20","A2","dog", 4, False)))\
...   .toDF("id","num","name","age","isAlive") \
...    .write\
...    .format("cosmos.oltp")\
...    .options(**cfg3)\
...    .mode("APPEND")\
...    .save()

(結果)

3-4. Container のデータを読み込み出力します。また Container のスキーマーも確認します。Container のデータを読み込みは、2 通りあり、自動的にスキーマを認識する ”spark.cosmos.read.inferSchema.enabled” を true にした方法と、spark.read.schema により手動でスキーマを指定する方法を記載しています。

>>> # show schema and records
>>> df = spark.read.format("cosmos.oltp").options(**cfg3)\
...  .option("spark.cosmos.read.inferSchema.enabled", "true")\
...  .load()
>>> df.printSchema()
root
 |-- name: string (nullable = true)
 |-- isAlive: boolean (nullable = true)
 |-- age: integer (nullable = true)
 |-- num: string (nullable = true)
 |-- id: string (nullable = false)

>>> df.show()
2021-10-19 22:33:45,387 WARN util.CaseInsensitiveStringMap: Converting duplicated key spark.cosmos.read.inferSchema.enabled into CaseInsensitiveStringMap.
2021-10-19 22:33:45,387 WARN util.CaseInsensitiveStringMap: Converting duplicated key spark.cosmos.accountkey into CaseInsensitiveStringMap.
2021-10-19 22:33:45,388 WARN util.CaseInsensitiveStringMap: Converting duplicated key spark.cosmos.accountendpoint into CaseInsensitiveStringMap.
+----+-------+---+---+---+                                                      
|name|isAlive|age|num| id|
+----+-------+---+---+---+
| dog|  false|  4| A2| 20|
| cat|   true|  2| A1| 10|
+----+-------+---+---+---+

>>> # After defining a schema in pyspark, show the schema
>>> from pyspark.sql.types import *
>>> schema = StructType([
...       StructField("id", StringType()),
...       StructField("num", StringType()),
...       StructField("name", StringType()),
...       StructField("type", StringType()),
...       StructField("age", IntegerType()),
...       StructField("isAlive", BooleanType())
...     ])
>>> 
>>> df = spark.read.schema(schema).format("cosmos.oltp").options(**cfg3)\
...  .load()
>>> df.printSchema()
root
 |-- id: string (nullable = true)
 |-- num: string (nullable = true)
 |-- name: string (nullable = true)
 |-- type: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- isAlive: boolean (nullable = true)

3-5. 読み込んだデータを parquet ファイルに保存します。保存後、hdfs dfs -ls のコマンドで保存した parquet ファイルを確認します。

>>> df1.write.mode("overwrite").parquet("appbox/df1cosmos")
2021-10-19 23:34:03,274 WARN util.CaseInsensitiveStringMap: Converting duplicated key spark.cosmos.accountkey into CaseInsensitiveStringMap.
2021-10-19 23:34:03,274 WARN util.CaseInsensitiveStringMap: Converting duplicated key spark.cosmos.accountendpoint into CaseInsensitiveStringMap.
>>>                                                                             
>>> spark.read.parquet("appbox/df1cosmos").show()
+---+---+----+----+---+-------+
| id|num|name|type|age|isAlive|
+---+---+----+----+---+-------+
| 20| A2| dog|null|  4|  false|
| 10| A1| cat|null|  2|   true|
+---+---+----+----+---+-------+

別途 Terminal を立ち上げて、hdfs dfs -ls のコマンドで保存した parquet ファイルを確認します。

$ hdfs dfs -ls appbox/df1cosmos
Found 3 items
-rw-r--r--   1 hadoop supergroup          0 2021-10-19 23:34 appbox/df1cosmos/_SUCCESS
-rw-r--r--   1 hadoop supergroup       1332 2021-10-19 23:34 appbox/df1cosmos/part-00000-349aca57-60f9-4fb8-abe7-c20e483c52b6-c000.snappy.parquet
-rw-r--r--   1 hadoop supergroup       1332 2021-10-19 23:34 appbox/df1cosmos/part-00001-349aca57-60f9-4fb8-abe7-c20e483c52b6-c000.snappy.parquet

3-6. 最後に、Spark UI で Spark のジョブ実行履歴を確認します。

————————————
▼4. 参考情報
————————————
(1) Quickstart – Manage data with Azure Cosmos DB Spark 3 OLTP Connector for SQL API | Microsoft Docs
(2) GitHub – Azure/azure-cosmosdb-spark: Apache Spark Connector for Azure Cosmos DB
(3) azure-sdk-for-java/configuration-reference.md at main · Azure/azure-sdk-for-java · GitHub

以上です。参考になりましたら幸いです。



コメントを残す

メールアドレスが公開されることはありません。 が付いている欄は必須項目です