自己投資としてチャレンジしている内容を Blog で公開しています。今回は Python の並列処理、 Threading について紹介します。
————————————
▼1. Threading は何のために利用するのか?
————————————
Python でデータを処理する際、直列で処理してもいいですが、データロードなどを含む待ち時間がある処理は並列に進めた方が効率的です。そのような場合に Threading を利用します。
————————————
▼2. 事前準備
————————————
2-1. Python が実行できる環境を用意します。以下参考情報です。
(1) Python – Visual Studio Code の利用 No.34
(2) Apache Spark インストール – 3 ノード No.29
———————————–
▼3. 今回紹介する並列実行のパターン
————————————
並列実行の動作を視覚化するために以下の図を表示し、さらに Thread identifier を追記して並列処理の動作を確認します。
3-1. 並列実行状況を表す図
Let's draw a picture ----- ---------- --------------- -------------------- ------------------------- ------------------------------ ----------------------------------- ---------------------------------------- --------------------------------------------- --------------------------------------------- ---------------------------------------- ----------------------------------- ------------------------------ ------------------------- -------------------- --------------- ---------- -----
以降では、並列実行可能な以下のモジュールを紹介します。
—————————————–
(A). Threading を利用
A-1. Daemon を利用しない
A-2. Daemon を利用する
(B). Join を利用
(C). ThreadPoolExecutor を利用
(D). ProcessPoolExecutor を利用
—————————————–
(A) Threading を利用する場合
A-1, Daemon を利用しない場合
すべての処理が実行され完了しています。上記 3.1 の並列実行状況を表す図 が描けて います。 Threading.Thread の option daemon が False の場合の動作となります。
//thread_daemonf.py
import time
import threading
import os
def lineone(num):
msg="-"
msgplus=str(threading.current_thread())+"-"*num
print(msgplus)
time.sleep(10/num)
print(msgplus)
print("Let's draw a picture")
if __name__ == '__main__':
start = time.time()
for j in range(5,50,5):
t = threading.Thread(target=lineone,args=(j,),daemon=False)
t.start()
duration_time = time.time() - start
print("Duration time: {0}",format(duration_time) + "[sec]") 実行結果は以下となります。9 つの thread が利用されています。Main 関数は最後まで処理が進み実行時間が表示された後、非同期で動作した lineone 関数の2番めの print が継続されていることが分かります。
//thread_daemonf_out.txt
Let's draw a picture
<Thread(Thread-1, started 140217379768064)>-----
<Thread(Thread-2, started 140217371375360)>----------
<Thread(Thread-3, started 140217362982656)>---------------
<Thread(Thread-4, started 140217354589952)>--------------------
<Thread(Thread-5, started 140217346197248)>-------------------------
<Thread(Thread-6, started 140217337804544)>------------------------------
<Thread(Thread-7, started 140216992003840)>-----------------------------------
<Thread(Thread-8, started 140216983611136)>----------------------------------------
<Thread(Thread-9, started 140216975218432)>---------------------------------------------
Duration time: {0} 0.0060689449310302734[sec]
<Thread(Thread-9, started 140216975218432)>---------------------------------------------
<Thread(Thread-8, started 140216983611136)>----------------------------------------
<Thread(Thread-7, started 140216992003840)>-----------------------------------
<Thread(Thread-6, started 140217337804544)>------------------------------
<Thread(Thread-5, started 140217346197248)>-------------------------
<Thread(Thread-4, started 140217354589952)>--------------------
<Thread(Thread-3, started 140217362982656)>---------------
<Thread(Thread-2, started 140217371375360)>----------
<Thread(Thread-1, started 140217379768064)>-----A-2, Daemon を利用する場合
上記 3.1 の並列実行状況を表す図 が描けておらず、途中で処理が終わっている事がわかります。Threading.Thread の option daemon が True の場合の動作となります。
//thread_daemont.py
import time
import threading
import os
def lineone(num):
msg="-"
msgplus=str(threading.current_thread())+"-"*num
#msgplus="-"*num
print(msgplus)
time.sleep(10/num)
print(msgplus)
print("Let's draw a picture")
if __name__ == '__main__':
start = time.time()
for j in range(5,50,5):
t = threading.Thread(target=lineone,args=(j,),daemon=True)
t.start()
duration_time = time.time() - start
print("Duration time: {0}",format(duration_time) + "[sec]") 実行結果は以下となります。9 つの thread が利用されています。lineone 関数の 1 番目の print は終わり、関数は即終了しています。lineone の関数の2番めの print は継続して実行されていません。最初の print が終わった段階で後続の処理を待たず、lineone の関数が終了しています。Main 関数は最後まで処理が進み実行時間が表示され処理は完了しています。
//thread_daemont_out.txt
Let's draw a picture
<Thread(Thread-1, started daemon 139680454702848)>-----
<Thread(Thread-2, started daemon 139680446310144)>----------
<Thread(Thread-3, started daemon 139680437917440)>---------------
<Thread(Thread-4, started daemon 139680429524736)>--------------------
<Thread(Thread-5, started daemon 139680421132032)>-------------------------
<Thread(Thread-6, started daemon 139680412739328)>------------------------------
<Thread(Thread-7, started daemon 139680404346624)>-----------------------------------
<Thread(Thread-8, started daemon 139679919765248)>----------------------------------------
<Thread(Thread-9, started daemon 139679911372544)>---------------------------------------------
Duration time: {0} 0.007597923278808594[sec] (B). Join を利用
すべての処理が実行され完了しています。記 3.1 の並列実行状況を表す図 が描けて います。Join を利用し、すべての Thread の完了を待ってから main 関数を終了しています。そのため実行時間計測の print も最後に表示されています。
//thread_join.py
import time
import threading
import os
def lineone(num):
msg="-"
msgplus=str(threading.current_thread())+"#"+str(threading.get_ident())+"-"*num
print(msgplus)
time.sleep(10/num)
print(msgplus)
print("Let's draw a picture")
if __name__ == '__main__':
start = time.time()
threads=[]
for j in range(5,50,5):
t = threading.Thread(target=lineone,args=(j,),daemon=False)
t.start()
threads.append(t)
for thread in threads:
thread.join()
duration_time = time.time() - start
print("Duration_time:{0}".format(duration_time) + "[sec]") 実行結果は以下となります。
//thread_join_out.txt(抜粋) Let's draw a picture <Thread(Thread-1, started 140610526525184)>#xx----- <Thread(Thread-2, started 140610518132480)>#xx---------- <Thread(Thread-3, started 140610509739776)>#xx--------------- <Thread(Thread-4, started 140610501347072)>#xx-------------------- <Thread(Thread-5, started 140610492954368)>#xx------------------------- <Thread(Thread-6, started 140610484561664)>#xx------------------------------ <Thread(Thread-7, started 140610476168960)>#xx----------------------------------- <Thread(Thread-8, started 140610467776256)>#xx---------------------------------------- <Thread(Thread-9, started 140609981511424)>#xx--------------------------------------------- <Thread(Thread-9, started 140609981511424)>#xx--------------------------------------------- <Thread(Thread-8, started 140610467776256)>#xx---------------------------------------- <Thread(Thread-7, started 140610476168960)>#xx----------------------------------- <Thread(Thread-6, started 140610484561664)>#xx------------------------------ <Thread(Thread-5, started 140610492954368)>#xx------------------------- <Thread(Thread-4, started 140610501347072)>#xx-------------------- <Thread(Thread-3, started 140610509739776)>#xx--------------- <Thread(Thread-2, started 140610518132480)>#xx---------- <Thread(Thread-1, started 140610526525184)>#xx----- Duration_time: 2.0052225589752197[sec]
(C).ThreadPoolExecutor を利用
すべての処理が実行され完了しています。ただし、上記 3.1 の並列実行状況を表す図 とは少し異なります。ThreadPool を利用し lineone の関数の実行がすべて完了してから次に進んでいます。すべての Thread の完了をま待った後 main 関数を終了しています。そのため実行時間計測の print も最後に表示されています。
//thread_tpe.py
import time
import threading
import os
import concurrent.futures
def lineone(num):
msg="-"
msgplus=str(threading.current_thread())+"#"+str(threading.get_ident())+"-"*num
print(msgplus)
time.sleep(10/num)
print(msgplus)
return
print("Let's draw a picture")
if __name__ == '__main__':
start = time.time()
for j in range(5,50,5):
with concurrent.futures.ThreadPoolExecutor(max_workers=9) as executor:
executor.submit(lineone,j)
duration_time = time.time() - start
print("Duration_time:{0}".format(duration_time) + "[sec]")
executor.shutdown()実行結果は以下となります。Thread の名前は 9 個ありますが、thread identifier は 2 種類しかありません。Lineone の関数の 2 つの print が完了してから次の処理に進んでいるため、実行時間も 5.6 秒かかっています。上記 B Join の 2 秒と比べると時間がかかっています。
//thread_tpe_out.txt (抜粋) Let's draw a picture <Thread(ThreadPoolExecutor-0_0, started daemon 140280648816384)>#xx----- <Thread(ThreadPoolExecutor-0_0, started daemon 140280648816384)>#xx----- <Thread(ThreadPoolExecutor-1_0, started daemon 140280640423680)>#xx---------- <Thread(ThreadPoolExecutor-1_0, started daemon 140280640423680)>#xx---------- <Thread(ThreadPoolExecutor-2_0, started daemon 140280648816384)>#xx--------------- <Thread(ThreadPoolExecutor-2_0, started daemon 140280648816384)>#xx--------------- <Thread(ThreadPoolExecutor-3_0, started daemon 140280640423680)>#xx-------------------- <Thread(ThreadPoolExecutor-3_0, started daemon 140280640423680)>#xx-------------------- <Thread(ThreadPoolExecutor-4_0, started daemon 140280648816384)>#xx------------------------- <Thread(ThreadPoolExecutor-4_0, started daemon 140280648816384)>#xx------------------------- <Thread(ThreadPoolExecutor-5_0, started daemon 140280640423680)>#xx------------------------------ <Thread(ThreadPoolExecutor-5_0, started daemon 140280640423680)>#xx------------------------------ <Thread(ThreadPoolExecutor-6_0, started daemon 140280648816384)>#xx----------------------------------- <Thread(ThreadPoolExecutor-6_0, started daemon 140280648816384)>#xx----------------------------------- <Thread(ThreadPoolExecutor-7_0, started daemon 140280640423680)>#xx---------------------------------------- <Thread(ThreadPoolExecutor-7_0, started daemon 140280640423680)>#xx---------------------------------------- <Thread(ThreadPoolExecutor-8_0, started daemon 140280648816384)>#xx--------------------------------------------- <Thread(ThreadPoolExecutor-8_0, started daemon 140280648816384)>#xx--------------------------------------------- Duration_time:5.6826581954956055[sec]
(D). ProcessPoolExecutor を利用
すべての処理が実行され完了しています。ただし、上記 3.1 の並列実行状況を表す図 とは少し異なります。Thread ではなく、ProcessPool を利用し、lineone の関数の実行がすべて完了してから次に進んでいます。すべての Process の完了をま待った後 main 関数を終了しています。そのため実行時間計測の print も最後に表示されています。
//thread_ppe.py
import time
import threading
import os
import concurrent.futures
def lineone(num):
msg="-"
msgplus=str(threading.current_thread())+"#"+str(threading.get_ident())+"-"*num
print(msgplus)
time.sleep(10/num)
print(msgplus)
return
print("Let's draw a picture")
if __name__ == '__main__':
start = time.time()
for j in range(5,50,5):
with concurrent.futures.ProcessPoolExecutor(max_workers=9) as executor:
executor.submit(lineone,j)
duration_time = time.time() - start
print("Duration_time:{0}".format(duration_time) + "[sec]")
executor.shutdown()実行結果は以下となります。 ProcessPool の利用のため、利用している Thread は 1 つのため、トータル 7.1秒かかっています。C の ThreadPoolExecutor よりもさらに時間がかかっています。
//thread_ppe_out.txt Let's draw a picture <_MainThread(MainThread, started 139883787577152)>#xx----- <_MainThread(MainThread, started 139883787577152)>#xx----- <_MainThread(MainThread, started 139883787577152)>#xx---------- <_MainThread(MainThread, started 139883787577152)>#xx---------- <_MainThread(MainThread, started 139883787577152)>#xx--------------- <_MainThread(MainThread, started 139883787577152)>#xx--------------- <_MainThread(MainThread, started 139883787577152)>#xx-------------------- <_MainThread(MainThread, started 139883787577152)>#xx-------------------- <_MainThread(MainThread, started 139883787577152)>#xx------------------------- <_MainThread(MainThread, started 139883787577152)>#xx------------------------- <_MainThread(MainThread, started 139883787577152)>#xx------------------------------ <_MainThread(MainThread, started 139883787577152)>#xx------------------------------ <_MainThread(MainThread, started 139883787577152)>#xx----------------------------------- <_MainThread(MainThread, started 139883787577152)>#xx----------------------------------- <_MainThread(MainThread, started 139883787577152)>#xx---------------------------------------- <_MainThread(MainThread, started 139883787577152)>#xx---------------------------------------- <_MainThread(MainThread, started 139883787577152)>#xx--------------------------------------------- <_MainThread(MainThread, started 139883787577152)>#xx--------------------------------------------- Duration_time:7.1517040729522705[sec]
————————————
▼3. 参考情報
————————————
(1) threading — Thread-based parallelism — Python 3.10.0 documentation
(2) concurrent.futures — aunching parallel tasks — Python 3.10.0 documentation
(3) Threads and threading | Microsoft Docs
以上です。参考になりましたら幸いです。