今天我們學習下 Queue 的進階用法。
生產者消費者模型
在并發編程中,比如爬蟲,有的線程負責爬取數據,有的線程負責對爬取到的數據做處理(清洗、分類和入庫)。假如他們是直接交互的,那么當二者的速度不匹配時勢必出現等待現象,這也就產生了資源的浪費。
抽象是一種很重要的通用能力,而生產者消費者模型是前人將一系列同類型的具體的問題抽象出來的一個一致的最佳解決方案。
該模型有三個重要角色,容器,生產者和消費者,顧名思義,生產者就是負責生產數據或任務的,消費者就是負責消費數據或者任務的(下文統稱為任務),容器是二者進行通訊的媒介。在該模型中,生產者和消費者不在直接進行通訊,而是通過引入一個第三者容器(通常都是用阻塞隊列)來達到解耦的目的。這樣生產者不必在因為消費者速度過慢而等待,直接將任務放入容器即可,消費者也不必因生產者生產速度過慢而等待,直接從容器中獲取任務,以此達到了資源的最大利用。
使用該模型可以解決并發編程中的絕大部分并發問題。
簡易版
我們先寫一個單生產者和單消費者的簡易版生產者消費者模型。
import threading
import time
import queue
def consume(thread_name, q):
while True:
time.sleep(2)
product = q.get()
print("%s consume %s" % (thread_name, product))
def produce(thread_name, q):
for i in range(3):
product = 'product-' + str(i)
q.put(product)
print("%s produce %s" % (thread_name, product))
time.sleep(1)
q = queue.Queue()
p = threading.Thread(target=produce, args=("producer",q))
c = threading.Thread(target=consume, args=("consumer",q))
p.start()
c.start()
p.join()
# 輸出如下
producer produce product-0
producer produce product-1
consumer consume product-0
producer produce product-2
consumer consume product-1
consumer consume product-2
...
以上就是最簡單的生產者消費者模型了,生產者生產三個任務供消費者消費。但是上面的寫法有個問題,就是生產者將任務生產完畢之后就和主線程一起退出了,但是消費者將所有的任務消費完之后還沒停止,一直處于阻塞狀態。
那可不可以將 while True 的判斷改為 while not q.empty()呢,肯定是不行的。因為 empty() 返回 False ,不保證后續調用的 get()不被阻塞。同時,如果用 empty() 函數來做判斷的話,那么就要保證消費者線程開啟之時生產者一定至少生產了一個任務,否則消費者線程就會因條件不滿足直接退出程序;同時如果生產者生產速度比較慢,一旦消費者將任務消費完且下次判斷時還沒有新的任務入隊,那么消費者線程也會因條件不滿足直接退出程序。自此以后,生產者生產的任務就永遠不會被消費了。
那我們可以做一個約定,當生產者生產完任務之后,放入一個標志,類似于 q.put(None),一旦消費者接收到為 None 的任務時就意味著結束,直接退出程序即可。這種做法在上面的程序中是沒有問題的,唯一的缺點就是有 N 個消費者線程就需要放入 N 個 None 標志,這對于多消費者類型的程序顯然是很不友好的。
最佳實踐
我們可以結合隊列的內置函數 task_done() 和 join() 來達到我們的目的。
join() 函數是阻塞的。當消費者通過 get() 從隊列獲取一項任務并處理完成之后,需要調用且只可以調用一次 task_done(),該方法會給隊列發送一個信號,join()函數則在監聽這個信號。可以簡單理解為隊列內部維護了一個計數器,該計數器標識未完成的任務數,每當添加任務時,計數器會增加,調用 task_done()時計數器則會減少,直到隊列為空。而 join() 就是在監聽隊列是否為空,一旦條件滿足則結束阻塞狀態。
import threading
import time
import queue
def consume(thread_name, q):
while True:
time.sleep(2)
product = q.get()
print("%s consume %s" % (thread_name, product))
q.task_done()
def produce(thread_name, q):
for i in range(3):
product = 'product-' + str(i)
q.put(product)
print("%s produce %s" % (thread_name, product))
time.sleep(1)
q.join()
q = queue.Queue()
p = threading.Thread(target=produce, args=("producer",q))
c = threading.Thread(target=consume, args=("consumer",q))
c1 = threading.Thread(target=consume, args=("consumer-1",q))
c.setDaemon(True)
c1.setDaemon(True)
p.start()
c.start()
c1.start()
p.join()
# 輸出如下
producer produce product-0
producer produce product-1
consumer-1 consume product-0
consumer consume product-1
producer produce product-2
consumer consume product-2
上述示例中,我們將消費者線程設置為守護線程,這樣當主線程結束時消費者線程也會一并結束。然后主線程最后一句 p.join() 又表示主線程必須等待生產者線程結束后才可以結束。
再細看生產者線程的主函數 produce(),該函數中出現了我們上面說過的 q.join() 函數。而 task_done 則是在消費者線程的主函數中調用的。故當生產者線程生產完所有任務后就會被阻塞,只有當消費者線程處理完所有任務后生產者才會阻塞結束。隨著生產者線程的結束,主線程也一并結束,守護線程消費者線程也一并結束,自此所有線程均安全退出。
Queue 總結
本章節介紹了隊列的高級應用,從簡易版的示例到最佳實踐,介紹了生產者消費者模型的基本用法,在該模型中,隊列扮演了非常重要的角色,起到了解耦的目的。
本模型有固定的步驟,其中最重要的就是通過 task_done() 和 join() 來互相通信。 task_done() 僅僅用來通知隊列消費者已完成一個任務,至于任務是什么它毫不關心,它只關心隊列中未完成的任務數量。
注意:task_done() 不可以在 put() 之前調用,否則會引發 ValueError: task_done() called too many times。同時在處理完任務后只可以調用一次該函數,否則隊列將不能準確計算未完成任務數量。