自己投資としてチャレンジしている内容を Blog で公開しています。今回は Apache Spark の環境で利用できる PySpark を利用し Azure SQL Database へ接続後、クエリを実行し結果を CSV ファイルへ保存する方法を紹介します。
————————————
▼1. pyodbc を利用した Azure SQL Database の接続
————————————
Azure SQL Database (SQL DB) への接続には Python SQL Driver である pyodbc を利用します。
今回案内する手順は、Apach Spark 上での PySpark を利用した実装となりますが、python が利用できる環境であれば同様の pyodbc を利用する手順で SQL DB へ接続することも可能です。
————————————
▼2. 事前準備
————————————
2-1. Apache Spark クラスターを作成します。詳細は以前 blog で紹介した下記サイトを参照ください。Apache Spark インストール – 3 ノード No.29
————————————
▼3. PySpark を利用し Azure SQL Database へ接続後、クエリを実行し結果を CSV ファイルへ保存する方法
————————————
| 3-1. SQL DB への接続に必要な Driver をインストールします。 3-2. PySpark の起動します。 3-3. SQL DB に接続し Data を select 後、結果を csv ファイルとして hdfs に保存します。 3-4. hdfs に保存したデータファイルを確認します。 |
3-1. SQL DB への接続に必要な Driver をインストールします。
———–
a) Microsoft ODBC Driver for SQL Server on Linux をインストール
b) pyodbc のインストール (pandas もインストール)
———–
a) Microsoft ODBC Driver for SQL Server on Linux をインストール
こちらのドキュメントを参考に Microsoft ODBC Driver for SQL Server on Linux をインストールします。
#Install curl command
sudo apt update
sudo apt install curl
curl --version
#Install Microsoft ODBC Driver for SQL Server on Linux
sudo su
curl https://packages.microsoft.com/keys/microsoft.asc | apt-key add -
#Ubuntu 20.04
curl https://packages.microsoft.com/config/ubuntu/20.04/prod.list > /etc/apt/sources.list.d/mssql-release.list
exit
sudo apt-get update
sudo ACCEPT_EULA=Y apt-get install -y msodbcsql17
# optional, not mandatory: for bcp and sqlcmd
sudo ACCEPT_EULA=Y apt-get install -y mssql-tools
echo 'export PATH="$PATH:/opt/mssql-tools/bin"' >> ~/.bashrc
source ~/.bashrc
# optional, not mandatory: for unixODBC development headers
sudo apt-get install -y unixodbc-devb) pyodbc のインストール (pandas もインストール)
pyodbc をインストールし、インストールされた pyodbc のバージョンを確認します。
sudo -H pip install pyodbc
pip show pyodbcSQL DB から取得したデータを DataFrame に入れるため pandas もインストールします。
sudo apt-get update
sudo apt-get install python3-pip
sudo python3 -m pip install pandasPandas の詳細はこちらを参照ください。Getting started — pandas 1.3.3 documentation (pydata.org)
3-2. PySpark の起動します。
pyspark
(標準出力例)
Python 3.8.10 (default, Jun 2 2021, 10:49:15)
[GCC 9.4.0] on linux
Type "help", "copyright", "credits" or "license" for more information.
2021-10-xx 12:22:30,989 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
2021-10-xx 12:22:34,184 WARN util.Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/__ / .__/\_,_/_/ /_/\_\ version 3.1.2
/_/
Using Python version 3.8.10 (default, Jun 2 2021 10:49:15)
Spark context Web UI available at http://masternodetsu:4041
Spark context available as 'sc' (master = local[*], app id = local-xxxx).
SparkSession available as 'spark'.3-3. SQL DB に接続し Data を select 後、結果を csv ファイルとして hdfs に保存します。
>>># Connect to SQL DB and select data from the table.
>>> import sys
>>> sys.path.append('/usr/local/lib/python3.8/dist-packages')
>>> import pyodbc
>>> cnxn=pyodbc.connect("Driver={ODBC Driver 17 for SQL Server};Server=tcp:xxx.database.windows.net,1433;Database=dbname;Uid=username;Pwd=password")
>>> cursor=cnxn.cursor()
>>> cursor.execute("select * from [dbo].[tableb]")
<pyodbc.Cursor object at 0x7exxxxx0>
>>> rows=cursor.fetchall()
>>> print(rows)
[('Travase', '7'), ('Ken', '8'), ('Haru', '12'), ('Hugo', '22')]
>>># Insert a new record and confirm the number of the inserted records
>>> count=cursor.execute("""insert into [dbo].[tableb] (LastName,id) values(?,?)""",'Nancy',30).rowcount
>>> cnxn.commit()
>>> print('Rows inserted: ' + str(count))
Rows inserted: 1
>>># show records again.
>>> rows=cursor.fetchall()
>>> print(rows)
[('Ken', '8'), ('Haru', '12'), ('Hugo', '22'), ('Nancy', '30')]
>>># create the dataframe using pandas library
>>> import pandas as pd
>>> query=f"select * from [dbo].[tableb]"
>>> pdf=pd.read_sql(query,cnxn)
>>> sparkdf = spark.createDataFrame(pdf)
>>> sparkdf.show()
+--------+---+
|LastName| id|
+--------+---+
| Travase| 7|
| Ken| 8|
| Haru| 12|
| Hugo| 22|
| Nancy| 30|
+--------+---+
>>># save data into the csv file in hdfs path
>>> sparkdf.write.format("csv").save("appbox/sparkdf",header="true")
>>> spark.read.option("header","true").csv("appbox/sparkdf").show()
+--------+---+
|LastName| id|
+--------+---+
| Travase| 7|
| Ken| 8|
| Haru| 12|
| Hugo| 22|
| Nancy| 30|
+--------+---+3-4. hdfs に保存したデータファイルを確認します。
$ hdfs dfs -ls appbox/sparkdf
Found 2 items
-rw-r--r-- 1 hadoop supergroup 0 2021-10-10 23:37 appbox/sparkdf/_SUCCESS
-rw-r--r-- 1 hadoop supergroup 53 2021-10-10 23:37 appbox/sparkdf/part-00000-27c7d2c1-e668-4bb8-98df-f046a580cce1-c000.csv3-5. 最後に、上記 Spark のジョブ実行履歴は以下のように 4040 もしくは 4041 ポートの Spark UI で確認できます。

————————————
▼4. 参考情報
————————————
(1) Azure 無料アカウント FAQ | Microsoft Azure
(2) Step 1: Configure pyodbc Python environment – Python driver for SQL Server | Microsoft Docs
(3) Install the Microsoft ODBC driver for SQL Server (Linux) – ODBC Driver for SQL Server | Microsoft Docs
(4) Step 3: Connecting to SQL using pyodbc – Python driver for SQL Server | Microsoft Docs
以上です。参考になりましたら幸いです。