As part of my self-investment, I have been teaching myself programming in Java and Python and sharing my progress on this blog. This time I try out Delta Lake with PySpark on an Apache Spark environment.
————————————
▼1. What is Delta Lake?
————————————
Delta Lake is an open-source project that implements a Lakehouse. A Lakehouse has the characteristics listed below; Lakehouse functionality is also available in services such as Azure Synapse Analytics, Databricks, BigQuery, and Redshift Spectrum.
- Transaction support
- Schema enforcement and governance: applying and evolving schemas (see the sketch after this list)
- BI support: direct use from BI tools
- Storage is decoupled from compute
- Openness: standardized, open storage formats
- Support for diverse data types ranging from unstructured to structured data: images, video, audio, semi-structured data, and text
- Support for diverse workloads: machine learning, SQL, and analytics
- End-to-end streaming: real-time reporting
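As a small illustration of schema enforcement, appending a DataFrame whose schema does not match an existing Delta table is rejected unless schema evolution is explicitly requested. This is only a minimal sketch, assuming a Spark session already configured for Delta Lake; the path appbox/enforce_demo is a hypothetical example.
(Example)
# Hedged sketch of schema enforcement; the path is hypothetical.
from pyspark.sql.functions import lit
spark.range(0, 3).write.format("delta").save("appbox/enforce_demo")
# Appending a DataFrame with an extra column raises an AnalysisException (schema mismatch) ...
bad = spark.range(3, 6).withColumn("extra", lit("x"))
# bad.write.format("delta").mode("append").save("appbox/enforce_demo")
# ... unless schema evolution is allowed explicitly:
bad.write.format("delta").mode("append").option("mergeSchema", "true").save("appbox/enforce_demo")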
————————————
▼2. Prerequisites
————————————
2-1. Create an Apache Spark cluster. For details, see the following post introduced earlier on this blog: Apache Spark installation – 3 nodes, No.29
————————————
▼3. Spark processing with Delta Lake
————————————
We will proceed in the following order.
————–
3-1. Start pyspark with the Delta Lake package.
3-2. Create a Delta table, then read data, update and delete rows, upsert (merge), and read older versions of the data with Time Travel.
3-3. Write and read streaming data.
3-4. Batch reads and batch writes against tables supported by Delta Lake.
————–
3-1. Start pyspark with the Delta Lake package.
Reference: Quickstart — Delta Lake Documentation
(Example)
$ pyspark --packages io.delta:delta-core_2.12:1.0.0 --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog"
Python 3.8.10 (default, Sep 28 2021, 16:10:42)
[GCC 9.3.0] on linux
Type "help", "copyright", "credits" or "license" for more information.
:: loading settings :: url = jar:file:/home/hadoop/spark/jars/ivy-2.4.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
Ivy Default Cache set to: /home/hadoop/.ivy2/cache
The jars for the packages stored in: /home/hadoop/.ivy2/jars
io.delta#delta-core_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-1661901a-c468-4c0c-a329-b55cab6eef17;1.0
confs: [default]
found io.delta#delta-core_2.12;1.0.0 in central
found org.antlr#antlr4;4.7 in central
found org.antlr#antlr4-runtime;4.7 in central
found org.antlr#antlr-runtime;3.5.2 in central
found org.antlr#ST4;4.0.8 in central
found org.abego.treelayout#org.abego.treelayout.core;1.0.3 in central
found org.glassfish#javax.json;1.0.4 in central
found com.ibm.icu#icu4j;58.2 in central
:: resolution report :: resolve 832ms :: artifacts dl 35ms
:: modules in use:
com.ibm.icu#icu4j;58.2 from central in [default]
io.delta#delta-core_2.12;1.0.0 from central in [default]
org.abego.treelayout#org.abego.treelayout.core;1.0.3 from central in [default]
org.antlr#ST4;4.0.8 from central in [default]
org.antlr#antlr-runtime;3.5.2 from central in [default]
org.antlr#antlr4;4.7 from central in [default]
org.antlr#antlr4-runtime;4.7 from central in [default]
org.glassfish#javax.json;1.0.4 from central in [default]
---------------------------------------------------------------------
| | modules || artifacts |
| conf | number| search|dwnlded|evicted|| number|dwnlded|
---------------------------------------------------------------------
| default | 8 | 0 | 0 | 0 || 8 | 0 |
---------------------------------------------------------------------
:: retrieving :: org.apache.spark#spark-submit-parent-1661901a-c468-4c0c-a329-b55cab6eef17
confs: [default]
0 artifacts copied, 8 already retrieved (0kB/50ms)
2021-10-26 11:22:24,546 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/__ / .__/\_,_/_/ /_/\_\ version 3.1.2
/_/
Using Python version 3.8.10 (default, Sep 28 2021 16:10:42)
Spark context Web UI available at http://xxxx:4040
Spark context available as 'sc' (master = local[*], app id = local-1635214948684).
SparkSession available as 'spark'.
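As an alternative to passing the --packages flag on the command line, the same Delta configuration can be set up programmatically, which is convenient for scripts run with spark-submit. The following is only a minimal sketch and assumes the delta-spark pip package (matching Delta Lake 1.0.0) is installed.
(Example)
# Hedged sketch: build a Delta-enabled SparkSession in code (assumes `pip install delta-spark==1.0.0`).
from pyspark.sql import SparkSession
from delta import configure_spark_with_delta_pip
builder = SparkSession.builder.appName("delta-quickstart") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
# configure_spark_with_delta_pip adds the matching delta-core jar to spark.jars.packages
spark = configure_spark_with_delta_pip(builder).getOrCreate()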
3-2. Create a Delta table, then read data, update and delete rows, upsert (merge), and read older versions of the data with Time Travel.
(Example)
# Create a table
data = spark.range(0, 5)
data.write.format("delta").save("appbox/deltat211026")
# Read data
df = spark.read.format("delta").load("appbox/deltat211026")
df.show()
+---+
| id|
+---+
| 0|
| 1|
| 2|
| 3|
| 4|
+---+
# Update table data
data = spark.range(5, 10)
data.write.format("delta").mode("overwrite").save("appbox/deltat211026")
df.show()
+---+
| id|
+---+
| 5|
| 6|
| 7|
| 8|
| 9|
+---+
# Conditional update without overwrite
from delta.tables import *
from pyspark.sql.functions import *
deltaTable = DeltaTable.forPath(spark, "appbox/deltat211026")
# Update every even value by adding 100 to it
deltaTable.update(
    condition = expr("id % 2 == 0"),
    set = { "id": expr("id + 100") })
deltaTable.toDF().show()
+---+
| id|
+---+
| 5|
|106|
| 7|
|108|
| 9|
+---+
# Delete every even value
deltaTable.delete(condition = expr("id % 2 == 0"))
deltaTable.toDF().show()
+---+
| id|
+---+
| 5|
| 7|
| 9|
+---+
# Upsert (merge) new data
newData = spark.range(0, 10)
deltaTable.alias("oldData") \
.merge(
newData.alias("newData"),
"oldData.id = newData.id") \
.whenMatchedUpdate(set = { "id": col("newData.id") }) \
.whenNotMatchedInsert(values = { "id": col("newData.id") }) \
.execute()
deltaTable.toDF().show()
+---+
| id|
+---+
| 1|
| 7|
| 8|
| 5|
| 3|
| 2|
| 9|
| 4|
| 6|
| 0|
+---+
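The merge builder also accepts an optional condition on each clause, so matched rows can be updated selectively. The following is only a minimal sketch reusing the deltaTable and newData objects above; the condition itself is just an illustration.
(Example)
# Hedged sketch: conditional merge (the condition is illustrative only).
deltaTable.alias("oldData") \
    .merge(newData.alias("newData"), "oldData.id = newData.id") \
    .whenMatchedUpdate(condition = "newData.id % 2 == 0", set = { "id": col("newData.id") }) \
    .whenNotMatchedInsert(values = { "id": col("newData.id") }) \
    .execute()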
# Read older versions of data using time travel
df = spark.read.format("delta").option("versionAsOf", 0).load("appbox/deltat211026")
df.show()
+---+
| id|
+---+
| 0|
| 1|
| 2|
| 3|
| 4|
+---+
3-3. Write and read streaming data.
(Example)
# Write a stream of data to a table
streamingDf = spark.readStream.format("rate").load()
stream = streamingDf.selectExpr("value as id").writeStream.format("delta").option("checkpointLocation", "appbox/chk211026").start("appbox/deltat211026")
# Read a stream of changes from a table in another terminal
stream2 = spark.readStream.format("delta").load("appbox/deltat211026").writeStream.format("console").start()
[Stage 0:> (0 + 1) / 2
[Stage 0:> (0 + 2) / 2
[Stage 0:=============================> (1 + 1) / 2
[Stage 1:> (0 + 1) / 50
[Stage 1:=> (1 + 1) / 50
[Stage 1:==> (2 + 1) / 50
-------------------------------------------
Batch: 0
-------------------------------------------
+---+
| id|
+---+
|117|
|118|
|119|
|120|
|121|
|122|
|123|
|124|
| 11|
| 12|
| 13|
| 14|
| 15|
| 16|
| 17|
| 52|
| 53|
| 54|
| 55|
| 56|
+---+
only showing top 20 rows
-------------------------------------------
Batch: 1
-------------------------------------------
+---+
| id|
+---+
|132|
|133|
xxxx
xxxx
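The two streaming queries keep running in the background until they are stopped. Each one can be stopped, in the session where it was started, via the handle returned by start():
(Example)
# Stop the streaming queries started above.
stream.stop()
stream2.stop()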
3-4. Batch reads and batch writes against tables supported by Delta Lake.
(Example)
# Create Database
spark.sql("create Database delta")
spark.sql("show databases").show()
+---------+
|namespace|
+---------+
| default|
| delta|
+---------+
# Delta Lake supports creating two types of tables
# -1 tables defined in the metastore
# -2 tables defined by path
# Create a table using SQL DDL Commands
from pyspark.sql.types import *
spark.sql("CREATE OR REPLACE TABLE delta.events2110262 (\
date DATE,\
eventId STRING,\
eventType STRING,\
data STRING)\
USING DELTA")
spark.sql("use delta")
spark.sql("show tables").show()
+--------+-------------+-----------+
|database| tableName|isTemporary|
+--------+-------------+-----------+
| delta|events2110262| false|
+--------+-------------+-----------+
# Create table in the metastore using DataFrame's schema and write data to it
df.write.format("delta").saveAsTable("events211026mg")
# Create or replace table with path using DataFrame's schema and write/overwrite data to it
df.write.format("delta").mode("overwrite").save("appbox/events211026mg")
df.show()
+---+
| id|
+---+
| 0|
| 1|
| 2|
| 3|
| 4|
+---+
# Create table in the metastore
DeltaTable.createIfNotExists(spark) \
.tableName("event21a") \
.addColumn("date", DateType()) \
.addColumn("eventId", "STRING") \
.addColumn("eventType", StringType()) \
.addColumn("data", "STRING", comment = "event data") \
.execute()
# Create or replace table with path and add properties
DeltaTable.createOrReplace(spark) \
.addColumn("date", DateType()) \
.addColumn("eventId", "STRING") \
.addColumn("eventType", StringType()) \
.addColumn("data", "STRING", comment = "event data") \
.property("description", "table with event data") \
.location("events211026mg") \
.execute()
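Whether a given path already holds a Delta table can be checked with DeltaTable.isDeltaTable, which is handy before creating or replacing a path-based table:
(Example)
# Returns True if the path contains a Delta table.
DeltaTable.isDeltaTable(spark, "events211026mg")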
# Create partition data, managed table
spark.sql("CREATE TABLE delta.events2110263 (\
date DATE,\
eventId STRING,\
eventType STRING,\
data STRING) USING DELTA PARTITIONED BY (date);")
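When writing by path instead of using SQL DDL, the same partitioning can be requested on the DataFrameWriter with partitionBy. This is only a minimal sketch; the sample rows and the path appbox/events_part are hypothetical.
(Example)
# Hedged sketch: write a partitioned Delta table by path (data and path are illustrative).
events = spark.createDataFrame(
    [("2021-10-26", "e1", "click", "d1")],
    ["date", "eventId", "eventType", "data"]).withColumn("date", col("date").cast("date"))
events.write.format("delta").partitionBy("date").save("appbox/events_part")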
# Control data location, Create unmanaged table
spark.sql("CREATE TABLE deltaevents( \
date DATE,\
eventId STRING,\
eventType STRING,\
data STRING)\
USING DELTA LOCATION 'appbox/deltaevents';")
spark.sql("show tables").show()
+--------+--------------+-----------+
|database| tableName|isTemporary|
+--------+--------------+-----------+
| delta| deltaevents| false|
| delta| event21a| false|
| delta| events2110262| false|
| delta| events2110263| false|
| delta|events211026mg| false|
+--------+--------------+-----------+
# Read a table
spark.table("delta.events2110263") # query table in the metastore
DataFrame[date: date, eventId: string, eventType: string, data: string]
spark.read.format("delta").load("appbox/deltaevents") # query table by path
DataFrame[date: date, eventId: string, eventType: string, data: string]
timestamp_string='2021-10-26 13:36:32.215'
df1 = spark.read.format("delta").option("timestampAsOf",timestamp_string).load("appbox/deltaevents")
df1.show()
+----+-------+---------+----+
|date|eventId|eventType|data|
+----+-------+---------+----+
+----+-------+---------+----+
version=0
df2 = spark.read.format("delta").option("versionAsOf", version).load("appbox/deltaevents")
df2.show()
+----+-------+---------+----+
|date|eventId|eventType|data|
+----+-------+---------+----+
+----+-------+---------+----+
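The versions and timestamps available for time travel can be looked up from the table's commit history:
(Example)
# List the table's commit history (version, timestamp, operation, ...).
DeltaTable.forPath(spark, "appbox/deltaevents").history().select("version", "timestamp", "operation").show()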
# Write to a table (append)
df.write.format("delta").option("mergeSchema","true").mode("append").save("appbox/deltaevents")
spark.read.format("delta").load("appbox/deltaevents").show()
+----+-------+---------+----+---+
|date|eventId|eventType|data| id|
+----+-------+---------+----+---+
|null| null| null|null| 0|
|null| null| null|null| 1|
|null| null| null|null| 2|
|null| null| null|null| 3|
|null| null| null|null| 4|
+----+-------+---------+----+---+
df.write.format("delta").mode("append").saveAsTable("deltaeventst")
spark.sql("select * from deltaeventst").show()
+---+
| id|
+---+
| 0|
| 1|
| 2|
| 3|
| 4|
+---+
# print schema
spark.sql("select * from deltaeventst").printSchema()
root
|-- id: long (nullable = true)
# Overwrite
df.write.format("delta").option("mergeSchema","true").mode("overwrite").save("appbox/deltaevents")————————————
————————————
▼4. References
————————————
(1) Quickstart — Delta Lake Documentation
(2) What Is a Lakehouse? – The Databricks Blog
That's all. I hope you find this helpful.