Using PySpark Delta Tables (Spark cluster, 3 nodes) No.41

As one of my self-investment challenges, I am publishing my progress in self-studying Java and Python programming on this blog. This time, I try out Delta Lake with PySpark in an Apache Spark environment.

————————————
▼1. What is Delta Lake
————————————
Delta Lake is an open-source project that implements the Lakehouse architecture. A Lakehouse has the characteristics listed below; Lakehouse functionality is also available in Azure Synapse Analytics, Databricks, BigQuery, Redshift Spectrum, and others.

 - Transaction support: support for ACID transactions
 - Schema enforcement and governance: schema enforcement and schema evolution (a sketch follows this list)
 - BI support: use of BI tools
 - Storage is decoupled from compute: separation of storage and compute
 - Openness: use of standardized, open storage formats
 - Support for diverse data types ranging from unstructured to structured data: handling of images, video, audio, semi-structured data, and text
 - Support for diverse workloads: machine learning, SQL, analytics, and more
 - End-to-end streaming: real-time reporting
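
As a minimal sketch of schema enforcement (assuming the Delta-enabled pyspark session set up in section 3-1 below; the path appbox/enforce211026 is an arbitrary example), appending a DataFrame whose schema does not match the existing table fails:

(Example)
spark.range(0, 3).write.format("delta").save("appbox/enforce211026")  # table schema: id long

bad = spark.createDataFrame([("a",)], ["name"])  # a different schema
try:
    bad.write.format("delta").mode("append").save("appbox/enforce211026")
except Exception as e:
    print("rejected by schema enforcement:", type(e).__name__)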

————————————
▼2. Prerequisites
————————————
2-1. Create an Apache Spark cluster. For details, see the following post introduced earlier on this blog: Apache Spark Installation – 3 Nodes No.29

————————————
▼3. Spark Processing with Delta Lake
————————————
We will proceed in the following order.
————–
3-1. Start pyspark with the Delta Lake package.
3-2. Create a Delta table; read data; update, delete, and Upsert (Merge) data; and read older data using Time Travel.
3-3. Write and read streaming data.
3-4. Batch reads and batch writes against tables, as supported by Delta Lake.
————–

3-1. Start pyspark with the Delta Lake package.
Reference: Quickstart — Delta Lake Documentation

(Example)
$ pyspark --packages io.delta:delta-core_2.12:1.0.0 --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog"
Python 3.8.10 (default, Sep 28 2021, 16:10:42) 
[GCC 9.3.0] on linux
Type "help", "copyright", "credits" or "license" for more information.
:: loading settings :: url = jar:file:/home/hadoop/spark/jars/ivy-2.4.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
Ivy Default Cache set to: /home/hadoop/.ivy2/cache
The jars for the packages stored in: /home/hadoop/.ivy2/jars
io.delta#delta-core_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-1661901a-c468-4c0c-a329-b55cab6eef17;1.0
	confs: [default]
	found io.delta#delta-core_2.12;1.0.0 in central
	found org.antlr#antlr4;4.7 in central
	found org.antlr#antlr4-runtime;4.7 in central
	found org.antlr#antlr-runtime;3.5.2 in central
	found org.antlr#ST4;4.0.8 in central
	found org.abego.treelayout#org.abego.treelayout.core;1.0.3 in central
	found org.glassfish#javax.json;1.0.4 in central
	found com.ibm.icu#icu4j;58.2 in central
:: resolution report :: resolve 832ms :: artifacts dl 35ms
	:: modules in use:
	com.ibm.icu#icu4j;58.2 from central in [default]
	io.delta#delta-core_2.12;1.0.0 from central in [default]
	org.abego.treelayout#org.abego.treelayout.core;1.0.3 from central in [default]
	org.antlr#ST4;4.0.8 from central in [default]
	org.antlr#antlr-runtime;3.5.2 from central in [default]
	org.antlr#antlr4;4.7 from central in [default]
	org.antlr#antlr4-runtime;4.7 from central in [default]
	org.glassfish#javax.json;1.0.4 from central in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts   |
	|       conf       | number| search|dwnlded|evicted|| number|dwnlded|
	---------------------------------------------------------------------
	|      default     |   8   |   0   |   0   |   0   ||   8   |   0   |
	---------------------------------------------------------------------
:: retrieving :: org.apache.spark#spark-submit-parent-1661901a-c468-4c0c-a329-b55cab6eef17
	confs: [default]
	0 artifacts copied, 8 already retrieved (0kB/50ms)
2021-10-26 11:22:24,546 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 3.1.2
      /_/

Using Python version 3.8.10 (default, Sep 28 2021 16:10:42)
Spark context Web UI available at http://xxxx:4040
Spark context available as 'sc' (master = local[*], app id = local-1635214948684).
SparkSession available as 'spark'.
>>> 
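
The same configuration can also be applied from a standalone Python script instead of the pyspark shell. A minimal sketch (the application name is an arbitrary example):

(Example)
from pyspark.sql import SparkSession

# Build a Delta-enabled session with the same three settings passed to pyspark above;
# spark.jars.packages pulls the Delta jar at session startup.
spark = (
    SparkSession.builder
    .appName("delta-quickstart")
    .config("spark.jars.packages", "io.delta:delta-core_2.12:1.0.0")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog",
            "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .getOrCreate()
)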

3-2. Create a Delta table; read data; update, delete, and Upsert (Merge) data; and read older data using Time Travel.

(Example)
# Create a table
data = spark.range(0, 5)
data.write.format("delta").save("appbox/deltat211026")


# Read data
df = spark.read.format("delta").load("appbox/deltat211026")
df.show()

+---+                                                                           
| id|
+---+
|  0|
|  1|
|  2|
|  3|
|  4|
+---+

# Update table data (overwrite)
data = spark.range(5, 10)
data.write.format("delta").mode("overwrite").save("appbox/deltat211026")
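# Note: df still points at the Delta path, so show() re-reads the latest
# snapshot and now displays the overwritten values.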
df.show()

+---+                                                                           
| id|
+---+
|  5|
|  6|
|  7|
|  8|
|  9|
+---+

# Conditional update without overwrite
from delta.tables import *
from pyspark.sql.functions import *
deltaTable = DeltaTable.forPath(spark, "appbox/deltat211026")


# Update every even value by adding 100 to it
deltaTable.update(
  condition = expr("id % 2 == 0"),
  set = { "id": expr("id + 100") })
deltaTable.toDF().show()

+---+                                                                           
| id|
+---+
|  5|
|106|
|  7|
|108|
|  9|
+---+

# Delete every even value
deltaTable.delete(condition = expr("id % 2 == 0"))
deltaTable.toDF().show()

+---+                                                                           
| id|
+---+
|  5|
|  7|
|  9|
+---+

# Upsert (merge) new data
newData = spark.range(0, 10)
deltaTable.alias("oldData") \
  .merge(
    newData.alias("newData"),
    "oldData.id = newData.id") \
  .whenMatchedUpdate(set = { "id": col("newData.id") }) \
  .whenNotMatchedInsert(values = { "id": col("newData.id") }) \
  .execute()
deltaTable.toDF().show()

+---+
| id|
+---+
|  1|
|  7|
|  8|
|  5|
|  3|
|  2|
|  9|
|  4|
|  6|
|  0|
+---+

# Read older versions of data using time travel
df = spark.read.format("delta").option("versionAsOf", 0).load("appbox/deltat211026")
df.show()

+---+
| id|
+---+
|  0|
|  1|
|  2|
|  3|
|  4|
+---+
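
To see which versions are available for time travel, you can inspect the table's commit history with DeltaTable.history(), part of the same delta.tables API imported above:

(Example)
# Each successful write produces a new commit; history() returns them as a DataFrame.
deltaTable.history().select("version", "timestamp", "operation").show(truncate=False)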

3-3. Write and read streaming data.

(Example)
# Write a stream of data to a table
streamingDf = spark.readStream.format("rate").load()
stream = streamingDf.selectExpr("value as id").writeStream.format("delta").option("checkpointLocation", "appbox/chk211026").start("appbox/deltat211026")

# Read a stream of changes from a table in another terminal
stream2 = spark.readStream.format("delta").load("appbox/deltat211026").writeStream.format("console").start()



-------------------------------------------
Batch: 0
-------------------------------------------
+---+
| id|
+---+
|117|
|118|
|119|
|120|
|121|
|122|
|123|
|124|
| 11|
| 12|
| 13|
| 14|
| 15|
| 16|
| 17|
| 52|
| 53|
| 54|
| 55|
| 56|
+---+
only showing top 20 rows

-------------------------------------------
Batch: 1
-------------------------------------------
+---+
| id|
+---+
|132|
|133|
xxxx
xxxx
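
Once enough micro-batches have been observed, both queries can be stopped with the standard Structured Streaming API:

(Example)
# Stop the write stream and the console read stream started above.
stream.stop()
stream2.stop()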

3-4. Batch reads and batch writes against tables, as supported by Delta Lake.

(Example)
# Create a database
spark.sql("CREATE DATABASE delta")
spark.sql("show databases").show()

+---------+
|namespace|
+---------+
|  default|
|    delta|
+---------+

# Delta Lake supports creating two types of tables:
#   1. tables defined in the metastore
#   2. tables defined by path
# Create a table using SQL DDL Commands

from pyspark.sql.types import *
spark.sql("CREATE OR REPLACE TABLE delta.events2110262 (\
  date DATE,\
  eventId STRING,\
  eventType STRING,\
  data STRING)\
USING DELTA")

spark.sql("use delta")
spark.sql("show tables").show()

+--------+-------------+-----------+
|database|    tableName|isTemporary|
+--------+-------------+-----------+
|   delta|events2110262|      false|
+--------+-------------+-----------+
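
As a quick check that the SQL-defined table accepts rows, here is a hedged sketch that inserts one arbitrary sample record and reads it back:

(Example)
# Insert a sample row into the metastore table created above, then read it back.
spark.sql("INSERT INTO delta.events2110262 VALUES (DATE'2021-10-26', 'ev-001', 'click', 'sample payload')")
spark.sql("SELECT * FROM delta.events2110262").show()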

# Create table in the metastore using DataFrame's schema and write data to it
df.write.format("delta").saveAsTable("events211026mg")

# Create or replace table with path using DataFrame's schema and write/overwrite data to it
df.write.format("delta").mode("overwrite").save("appbox/events211026mg")
df.show()

+---+                                                                           
| id|
+---+
|  0|
|  1|
|  2|
|  3|
|  4|
+---+

# Create table in the metastore
DeltaTable.createIfNotExists(spark) \
    .tableName("event21a") \
    .addColumn("date", DateType()) \
    .addColumn("eventId", "STRING") \
    .addColumn("eventType", StringType()) \
    .addColumn("data", "STRING", comment = "event data") \
    .execute()

# Create or replace table with path and add properties
DeltaTable.createOrReplace(spark) \
    .addColumn("date", DateType()) \
    .addColumn("eventId", "STRING") \
    .addColumn("eventType", StringType()) \
    .addColumn("data", "STRING", comment = "event data") \
    .property("description", "table with event data") \
    .location("events211026mg") \
    .execute()

# Create a partitioned, managed table
spark.sql("CREATE TABLE delta.events2110263 (\
  date DATE,\
  eventId STRING,\
  eventType STRING,\
  data STRING) USING DELTA PARTITIONED BY (date);")
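
On the DataFrame side, partitioned Delta data can be written with partitionBy. A minimal sketch (the path appbox/events_part211026 and the sample row are illustrative):

(Example)
import datetime
from pyspark.sql import Row

# Write a small partitioned Delta table; files are laid out in date=... directories.
rows = [Row(date=datetime.date(2021, 10, 26), eventId="e1", eventType="click", data="d1")]
spark.createDataFrame(rows).write.format("delta").partitionBy("date").mode("overwrite").save("appbox/events_part211026")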

# Control the data location: create an unmanaged (external) table
spark.sql("CREATE TABLE deltaevents( \
  date DATE,\
  eventId STRING,\
  eventType STRING,\
  data STRING)\
USING DELTA LOCATION 'appbox/deltaevents';")

spark.sql("show tables").show()

+--------+--------------+-----------+
|database|     tableName|isTemporary|
+--------+--------------+-----------+
|   delta|   deltaevents|      false|
|   delta|      event21a|      false|
|   delta| events2110262|      false|
|   delta| events2110263|      false|
|   delta|events211026mg|      false|
+--------+--------------+-----------+

# Read a table
spark.table("delta.events2110263")    # query table in the metastore
DataFrame[date: date, eventId: string, eventType: string, data: string]

spark.read.format("delta").load("appbox/deltaevents")  # query table by path
DataFrame[date: date, eventId: string, eventType: string, data: string]

timestamp_string='2021-10-26 13:36:32.215'
df1 = spark.read.format("delta").option("timestampAsOf",timestamp_string).load("appbox/deltaevents")
df1.show()

+----+-------+---------+----+                                                   
|date|eventId|eventType|data|
+----+-------+---------+----+
+----+-------+---------+----+

version=0
df2 = spark.read.format("delta").option("versionAsOf", version).load("appbox/deltaevents")
df2.show()

+----+-------+---------+----+                                                   
|date|eventId|eventType|data|
+----+-------+---------+----+
+----+-------+---------+----+

# Append to a table, merging any new columns into the schema
df.write.format("delta").option("mergeSchema","true").mode("append").save("appbox/deltaevents")
spark.read.format("delta").load("appbox/deltaevents").show()

+----+-------+---------+----+---+                                               
|date|eventId|eventType|data| id|
+----+-------+---------+----+---+
|null|   null|     null|null|  0|
|null|   null|     null|null|  1|
|null|   null|     null|null|  2|
|null|   null|     null|null|  3|
|null|   null|     null|null|  4|
+----+-------+---------+----+---+

df.write.format("delta").mode("append").saveAsTable("deltaeventst")
spark.sql("select * from deltaeventst").show()

+---+                                                                           
| id|
+---+
|  0|
|  1|
|  2|
|  3|
|  4|
+---+

# print schema
spark.sql("select * from deltaeventst").printSchema()

root
 |-- id: long (nullable = true)

# Overwrite
df.write.format("delta").option("mergeSchema","true").mode("overwrite").save("appbox/deltaevents")

————————————
▼4. References
————————————
(1) Quickstart — Delta Lake Documentation
(2) What Is a Lakehouse? – The Databricks Blog

That's all. I hope you find it helpful.


