1. 線程創(chuàng)建與管理
省流:python多線程效率堪憂,想了解這方面的去看第2小節(jié)GIL,想繼續(xù)看看怎么使用的繼續(xù)接著看。
1.1 創(chuàng)建線程
Python提供了thread、threading等模塊來進(jìn)行線程的創(chuàng)建與管理,后者在線程管理能力上更進(jìn)一步,因此我們通常使用threading模塊。創(chuàng)建一個線程需要指定該線程執(zhí)行的任務(wù)(函數(shù)名)、以及該函數(shù)需要的參數(shù),示例代碼如下所示:
from threading import Thread, current_thread
def target01(args1, args2):
print("這里是{}".format(current_thread().name))
# 創(chuàng)建線程
thread01 = Thread(target=target01, args="參數(shù)", name="線程1")
# 設(shè)置守護(hù)線程【可選】
thread01.setDaemon(True)
# 啟動線程
thread01.start()
1.2 設(shè)置守護(hù)線程
線程是程序執(zhí)行的最小單位,Python在進(jìn)程啟動起來后,會自動創(chuàng)建一個主線程,之后使用多線程機(jī)制可以在此基礎(chǔ)上進(jìn)行分支,產(chǎn)生新的子線程。子線程啟動起來后,主線程默認(rèn)會等待所有線程執(zhí)行完成之后再退出。但是我們可以將子線程設(shè)置為守護(hù)線程,此時主線程任務(wù)一旦完成,所有子線程將會和主線程一起結(jié)束(就算子線程沒有執(zhí)行完也會退出)。
守護(hù)線程可以在線程啟動之前,通過setDaemon(True)的形式進(jìn)行設(shè)置,或者在創(chuàng)建子線程對象時,以參數(shù)的形式指定:
thread01 = Thread(target=target01, args="", name="線程1", daemon=True)
但是需要注意,如果希望主程序不等待任何線程直接退出,只有所有的線程都被設(shè)置為守護(hù)線程才有用。
1.3 設(shè)置線程阻塞
我們可以用join()方法使主線程陷入阻塞,以等待某個線程執(zhí)行完畢。因此這也是實(shí)現(xiàn)線程同步的一種方式。參數(shù) t i m e o u t timeouttimeout 可以用來設(shè)置主線程陷入阻塞的時間,如果線程不是守護(hù)線程,即沒有設(shè)置daemon為True,那么參數(shù) t i m e o u t timeouttimeout 是無效的,主線程會一直阻塞,直到子線程執(zhí)行結(jié)束。
測試代碼如下:
import time
from threading import Thread, current_thread
def target():
if current_thread().name == "1":
time.sleep(5)
else:
time.sleep(6)
print("線程{}已退出".format(current_thread().name))
thread01 = Thread(target=target, daemon=True, name="1")
thread02 = Thread(target=target, daemon=True, name="2")
thread01.start()
thread02.start()
print("程序因線程1陷入阻塞")
thread01.join(timeout=3)
print("程序因線程2陷入阻塞")
thread02.join(timeout=3)
print("主線程已退出")
1.4 線程間通信的方法
我們知道,線程之間共享同一塊內(nèi)存。子線程雖然可以通過指定target來執(zhí)行一個函數(shù),但是這個函數(shù)的返回值是沒有辦法直接傳回主線程的。我們使用多線程一般是用于并行執(zhí)行一些其他任務(wù),因此獲取子線程的執(zhí)行結(jié)果十分有必要。
直接使用全局變量雖然可行,但是資源的并發(fā)讀寫會引來線程安全問題。下面給出常用的兩種處理方式:
1.4.1 線程鎖
其一是可以考慮使用鎖來處理,當(dāng)多個線程對同一份資源進(jìn)行讀寫操作時,我們可以通過加鎖來確保數(shù)據(jù)安全。Python中給出了多種鎖的實(shí)現(xiàn),例如:同步鎖 Lock,遞歸鎖 RLock,條件鎖 Condition,事件鎖 Event,信號量鎖 Semaphore,這里只給出 Lock 的使用方式,其余的大家感興趣可以自己查閱。
可以通過threading.lock類來創(chuàng)建鎖對象,一旦一個線程獲得一個鎖,會阻塞之后所有嘗試獲得該鎖對象的線程,直到它被重新釋放。這里舉一個例子,通過加鎖來確保兩個線程在對同一個全局變量進(jìn)行讀寫時的數(shù)據(jù)安全:
from threading import Thread, Lock
from time import sleep
book_num = 100 # 圖書館最開始有100本圖書
bookLock = Lock()
def books_return():
global book_num
while True:
bookLock.acquire()
book_num += 1
print("歸還1本,現(xiàn)有圖書{}本".format(book_num))
bookLock.release()
sleep(1) # 模擬事件發(fā)生周期
def books_lease():
global book_num
while True:
bookLock.acquire()
book_num -= 1
print("借走1本,現(xiàn)有圖書{}本".format(book_num))
bookLock.release()
sleep(2) # 模擬事件發(fā)生周期
if __name__ == "__main__":
thread_lease = Thread(target=books_lease)
thread_return = Thread(target=books_return)
thread_lease.start()
thread_return.start()
從結(jié)果中可以看出,其中沒有出現(xiàn)由于讀寫沖突導(dǎo)致的數(shù)據(jù)錯誤。
1.4.2 queue模塊(同步隊列類)
或者,我們可以采用Python的queue模塊來實(shí)現(xiàn)線程通信。Python中的 q u e u e queuequeue 模塊實(shí)現(xiàn)了多生產(chǎn)者、多消費(fèi)者隊列,特別適用于在多線程間安全的進(jìn)行信息交換。該模塊提供了4種我們可以利用的隊列容器,分別 Q u e u e QueueQueue(先進(jìn)先出隊列)、L i f o Q u e u e LifoQueueLifoQueue(先進(jìn)后出隊列)、P r i o r t y Q u e u e PriortyQueuePriortyQueue(優(yōu)先級隊列)、S i m p l e Q u e u e SimpleQueueSimpleQueue(無界的先進(jìn)先出隊列,簡單實(shí)現(xiàn),缺少Q(mào)ueue中的任務(wù)跟蹤等高級功能)。下面我們以 Q u e u e QueueQueue 為例介紹其使用方法,其他容器請自行查閱。
Queue(maxsize=5) # 創(chuàng)建一個FIFO隊列,并制定隊列大小,若maxsize被指定為小于等于0,則隊列無限大
Queue.qsize() # 返回隊列的大致大小,注意并不是確切值,所以不能被用來當(dāng)做后續(xù)線程是否會被阻塞的依據(jù)
Queue.empty() # 判斷隊列為空是否成立,同樣不能作為阻塞依據(jù)
Queue.full() # 判斷隊列為滿是否成立,同樣不能作為阻塞依據(jù)
Queue.put(item, block=True, timeout=None) # 投放元素進(jìn)入隊列,block為True表示如果隊列滿了投放失敗,將阻塞該線程,timeout可用來設(shè)置線程阻塞的時間長短(秒);
# 注意,如果block為False,如果隊列為滿,則將直接引發(fā)Full異常,timeout將被忽略(在外界用try處理異常即可)
Queue.put_nowait(item) # 相當(dāng)于put(item, block=False)
Queue.get(block=True, timeout=False) # 從隊列中取出元素,block為False而隊列為空時,會引發(fā)Empty異常
Queue.get_nowait() # 相當(dāng)于get(block=False)
Queue.task_done() # 每個線程使用get方法從隊列中獲取一個元素,該線程通過調(diào)用task_done()表示該元素已處理完成。
Queue.join() # 阻塞至隊列中所有元素都被處理完成,即隊列中所有元素都已被接收,且接收線程全已調(diào)用task_done()。
下面給出一個例子,場景是3個廚師給4個客人上菜,這是對多生產(chǎn)者多消費(fèi)者場景的模擬:
import queue
from random import choice
from threading import Thread
q = queue.Queue(maxsize=5)
dealList = ["紅燒豬蹄", "鹵雞爪", "酸菜魚", "糖醋里脊", "九轉(zhuǎn)大腸", "陽春面", "烤鴨", "燒雞", "剁椒魚頭", "酸湯肥牛", "燉羊肉"]
def cooking(chefname: str):
for i in range(4):
deal = choice(dealList)
q.put(deal, block=True)
print("廚師{}給大家?guī)硪坏溃簕} ".format(chefname, deal))
def eating(custname: str):
for i in range(3):
deal = q.get(block=True)
print("顧客{}吃掉了:{} ".format(custname, deal))
q.task_done()
if __name__ == "__main__":
# 創(chuàng)建并啟動廚師ABC線程,創(chuàng)建并啟動顧客1234線程
threadlist_chef = [Thread(target=cooking, args=chefname).start() for chefname in ["A", "B", "C"]]
threadlist_cust = [Thread(target=eating, args=str(custname)).start() for custname in range(4)]
# 隊列阻塞,直到所有線程對每個元素都調(diào)用了task_done
q.join()
上述程序執(zhí)行結(jié)果如下圖所示:
1.5 殺死線程
在一些場景下,我們可能需要?dú)⑺滥硞€線程,但是在這之前,請仔細(xì)的考量代碼的上下文環(huán)境。強(qiáng)制殺死線程可能會帶來一些意想不到的結(jié)果,并且從程序設(shè)計來講,這本身就是不合理的。而且,鎖資源并不會因?yàn)楫?dāng)前線程的退出而釋放,這在程序運(yùn)行過程中,可能會成為典型的死鎖場景。所以殺死線程之前,請一定慎重。殺死線程的方法網(wǎng)上有好幾種,我這里給出一種我覺得比較穩(wěn)妥的方式。
前面我們提到過如何做線程通信,這里可以用全局變量給出一個flag,線程任務(wù)采用循環(huán)形式進(jìn)行,每次循環(huán)都會檢查該flag,外界可以通過修改這一flag來通知這一線程退出循環(huán),結(jié)束任務(wù),從而起到殺死線程的目的,但請注意,為了線程安全,退出前一定要釋放這一線程所占用的資源。下面給出一個示例程序:
from threading import Lock, Thread
from time import sleep
flag = True
lock = Lock()
def tar():
global flag, lock
while True:
lock.acquire()
"線程任務(wù)邏輯"
if flag is False:
break
lock.release()
lock.release()
if __name__ == "__main__":
thread = Thread(target=tar)
thread.start()
print("3秒后線程會被殺死")
sleep(3)
flag = False
print("線程已被殺死")
執(zhí)行結(jié)果如圖所示,如果需要其他的方法請自行查閱,網(wǎng)上有不少。
1.6 線程池的使用
在程序運(yùn)行過程之中,臨時創(chuàng)建一個線程需要耗費(fèi)不小的代價(包括與操作系統(tǒng)的交互部分),尤其是我們只對一個線程分配一個簡短的任務(wù),此時,頻繁的線程創(chuàng)建將會嚴(yán)重拖垮程序的執(zhí)行的效率。
因此,在這種情形下,我們可以選擇采用線程池技術(shù),即通過預(yù)先創(chuàng)建幾個空閑線程,在需要多線程來處理任務(wù)時,將任務(wù)分配給一個處于空閑狀態(tài)的線程,該線程在執(zhí)行完成后,將會回歸空閑狀態(tài),而不是直接銷毀;而如果申請從線程池中分配一個空閑線程時,遇到所有線程均處于運(yùn)行狀態(tài),則當(dāng)前線程可以選擇阻塞來等待線程資源的空閑。如此一來,程序?qū)τ诰€程的管理將會更加靈活。
Python從3.2開始,就將線程池作為內(nèi)置模塊包含了進(jìn)來,可以通過concurrent.futures.ThreadPoolExecutor來調(diào)用,使用方法也很簡單。下面給出線程池的程序例子:
from concurrent.futures import ThreadPoolExecutor
from time import sleep
tasklist = ["任務(wù)1", "任務(wù)2", "任務(wù)3", "任務(wù)4"]
def task(taskname: str):
sleep(5)
print(taskname + " 已完成\n")
return taskname + " 的執(zhí)行結(jié)果"
executor = ThreadPoolExecutor(max_workers=3) # 創(chuàng)建線程池(是一個ThreadPoolExecutor對象),線程數(shù)為3
future_a = executor.submit(task, tasklist[0]) # 通過submit方法向線程池提交任務(wù),返回一個對應(yīng)的Future對象
future_b = executor.submit(task, tasklist[1])
future_c = executor.submit(task, tasklist[2])
future_d = executor.submit(task, tasklist[3]) # 如果提交時,線程池中沒有空余線程,則該線程會進(jìn)入等待狀態(tài),主線程不會阻塞
print(future_a.result(), future_b.result()) # 通過Future對象的result()方法獲取任務(wù)的返回值,若沒有執(zhí)行完,則會陷入阻塞
有關(guān)于線程池的詳細(xì)使用方法,我后面還會出一篇文章,大家這里沒理解的可以去看一下。
2. GIL 全局解釋器鎖
2.1 GIL是什么?
G I L GILGIL(G l o b a l I n t e r p r e t e r L o c k Global Interpreter LockGlobalInterpreterLock,全局解釋器鎖)是CPython中采用的一種機(jī)制,它確保同一時刻只有一個線程在執(zhí)行Python字節(jié)碼。給整個解釋器加鎖使得解釋器多線程運(yùn)行更方便,而且開發(fā)的CPython也更易于維護(hù),但是代價是犧牲了在多處理器上的并行性。因此,在相當(dāng)多的場景中,CPython解釋器下的多線程機(jī)制的性能都不盡如人意。
2.2 GIL給Python帶來的影響?
上圖是David Beazley的UnderstandGIL幻燈片中的一張,用于描述GIL的執(zhí)行模型。
從這套幻燈篇的介紹中,我們可以得知,GIL本質(zhì)上是條件鎖與互斥鎖結(jié)合的一種二值信號量類的一個實(shí)例,在程序執(zhí)行過程中,一個線程通過acquire操作獲得GIL,從而執(zhí)行其字節(jié)碼,而當(dāng)其遇到IO操作時,他將會release釋放掉GIL鎖資源,GIL這時可以被其他線程獲得以執(zhí)行該線程的任務(wù),而原先線程的IO操作將會同時進(jìn)行。
由此我們可以看到,GIL使得Python多線程程序在計算密集的場景下,不能充分利用多核心并發(fā)的優(yōu)勢(因?yàn)闊o論機(jī)器有多少核心,并且無論有多少線程來執(zhí)行計算任務(wù),同時運(yùn)行的只有1個),而在IO密集的場景下,其性能受到的影響則較小。
2.3 如何繞過GIL?
那么如何避免GIL對多線程性能帶來的影響呢?
1. 繞過CPython,使用JPython(Java實(shí)現(xiàn)的)等別的Python解釋器
首先,GIL是CPython解釋器中的實(shí)現(xiàn),在JPython等其他解釋器中并沒有采用,因此我們可以考慮更換解釋器實(shí)現(xiàn)。我們現(xiàn)在從官網(wǎng)下載,或者通過Anaconda部署的解釋器普遍采用的是CPython,可以通過下面的方法安裝其它實(shí)現(xiàn):以JPython為例,去JPython官網(wǎng)下載其安裝包(jar包),然后用 java -jar (前提是你的電腦安裝了Java環(huán)境)去執(zhí)行它,最后再配置一下環(huán)境變量即可。
2. 把關(guān)鍵性能代碼,放到別的語言(一般是C++)中實(shí)現(xiàn)
這個是常用的一種方式,很多追求執(zhí)行性能的模塊例如Numpy、Pytorch等都是將自身性能代碼放在C語言擴(kuò)展中來完成的,如何開發(fā)Python模塊的C語言擴(kuò)展部分,可以參考這個鏈接http://t.csdn.cn/4YuDO。
3. 并行改并發(fā),在Python中,多進(jìn)程有時比多線程管用
Python程序的每個進(jìn)程都有自己的GIL鎖,互補(bǔ)干涉,因此我們也可以直接使用多線程來處理一些計算任務(wù),Python的多線程可以使用m u l t i p r o c e s s i n g multiprocessingmultiprocessing模塊來完成,示例程序如下。
from multiprocessing import Process
def task(procName: int):
print("這是線程{}".format(procName))
if __name__ == "__main__":
proc1 = Process(target=task, args=(1,))
proc2 = Process(target=task, args=(2,))
proc1.start()
proc2.start()
2.4 使用GIL并非絕對線程安全
前面講了這么多,總而言之就是Python的多線程機(jī)制很奇怪,就算有多個物理核心,在任何時刻只會有一個線程在執(zhí)行。那么有人就會問,那我還要給公共資源加鎖干什么?
各位記住,Python的GIL只是負(fù)責(zé)Python解釋器的線程安全,也只能保證同時只有一個線程在執(zhí)行字節(jié)碼,而Python程序本身的線程安全,Python一概不負(fù)責(zé)。想了解詳情的,可以參考下網(wǎng)上的討論,比如這個:https://www.zhihu.com/question/521650365