介紹
Python 不乏并發(fā)選項,標(biāo)準(zhǔn)庫包括對線程、進(jìn)程和異步 I/O 的支持。在許多情況下,Python 通過創(chuàng)建異步、線程和子進(jìn)程等高級模塊,消除了使用這些各種并發(fā)方法的困難。在標(biāo)準(zhǔn)庫之外,還有第三種解決方案,例如twisted、stackless 和處理模塊,僅舉幾例。本文使用實踐示例專門關(guān)注 Python 中的線程處理。網(wǎng)上有很多很好的資源來記錄線程 API,但本文試圖提供常見線程使用模式的實踐示例。
首先定義進(jìn)程和線程之間的區(qū)別很重要。線程與進(jìn)程的不同之處在于它們共享狀態(tài)、內(nèi)存和資源。這個簡單的區(qū)別對于線程來說既是優(yōu)點也是缺點。一方面,線程是輕量級的并且易于通信,但另一方面,它們帶來了一系列問題,包括死鎖、競爭條件和純粹的復(fù)雜性。幸運(yùn)的是,由于 GIL 和排隊模塊,Python 中的線程實現(xiàn)起來比其他語言要簡單得多。
你好 Python 線程
接下來,我假設(shè)你已經(jīng)安裝了 Python 2.5 或更高版本,因為許多示例將使用 Python 語言的更新功能,這些功能至少出現(xiàn)在 Python2.5 中。要開始使用 Python 中的線程,我們將從一個簡單的“Hello World”示例開始:
清單 1. hello_threads_example
import threading
import datetime
class ThreadClass(threading.Thread):
def run(self):
now = datetime.datetime.now()
print "%s says Hello World at time: %s" %
(self.getName(), now)
for i in range(2):
t = ThreadClass()
t.start()
如果你運(yùn)行這個例子,你會得到以下輸出:
#python hello_threads.py
Thread?1 says Hello World at time: 2008?05?13 13:22:50.252069
Thread?2 says Hello World at time: 2008?05?13 13:22:50.252576
查看此輸出,你可以看到你收到了來自兩個帶有日期戳的線程的 Hello World 語句。如果你查看實際代碼,會發(fā)現(xiàn)有兩個 import 語句;一個導(dǎo)入 datetime 模塊,另一個導(dǎo)入 threading 模塊。該類ThreadClass繼承自threading.Thread,因此,您需要定義一個 run 方法來執(zhí)行您在線程內(nèi)運(yùn)行的代碼。在 run 方法中唯一需要注意的重要事項self.getName()是該方法將標(biāo)識線程的名稱。
最后三行代碼實際上調(diào)用了類并啟動了線程。如果您注意到,t.start()實際上是啟動線程的。線程模塊在設(shè)計時就考慮到了繼承性,實際上是建立在較低級別的線程模塊之上的。在大多數(shù)情況下,繼承自 被認(rèn)為是最佳實踐threading.Thread,因為它為線程編程創(chuàng)建了一個非常自然的 API。
使用帶線程的隊列
正如我之前提到的,當(dāng)線程需要共享數(shù)據(jù)或資源時,線程處理可能會很復(fù)雜。線程模塊確實提供了許多同步原語,包括信號量、條件變量、事件和鎖。雖然存在這些選項,但最好的做法是專注于使用隊列。隊列更容易處理,并使線程編程更加安全,因為它們有效地將所有對資源的訪問集中到單個線程,并允許更清晰、更易讀的設(shè)計模式。
在下一個示例中,你將首先創(chuàng)建一個程序,該程序?qū)⒁来位蛞粋€接一個地獲取網(wǎng)站的 URL,并打印出頁面的前 1024 個字節(jié)。這是使用線程可以更快地完成某些事情的經(jīng)典示例。首先,讓我們使用urllib2模塊一次抓取這些頁面,并對代碼進(jìn)行計時:
清單 2. URL 獲取序列
import urllib2
import time
hosts = "http://yahoo.com", "http://google.com", "http://amazon.com",
"http://ibm.com", "http://apple.com"
start = time.time()
#grabs urls of hosts and prints first 1024 bytes of page
for host in hosts:
url = urllib2.urlopen(host)
print url.read(1024)
print "Elapsed Time: %s" % (time.time() ? start)
當(dāng)你運(yùn)行它時,你會得到大量輸出到標(biāo)準(zhǔn)輸出,因為頁面被部分打印。但你會在最后得到這個:
Elapsed Time: 2.40353488922
讓我們稍微看一下這段代碼。你只導(dǎo)入兩個模塊。首先,urllib2模塊是承擔(dān)重任并抓取網(wǎng)頁的東西。其次,你通過調(diào)用?time.time()
?創(chuàng)建一個開始時間值,然后再次調(diào)用它并減去初始值以確定程序執(zhí)行所需的時間。最后,從程序的速度來看,“兩秒半”的結(jié)果并不可怕,但如果你有數(shù)百個網(wǎng)頁要檢索,考慮到當(dāng)前的平均值,大約需要 50 秒??纯磩?chuàng)建線程版本如何加快速度:
清單 3. URL 獲取線程
#!/usr/bin/env python
import Queue
import threading
import urllib2
import time
hosts = "http://yahoo.com", "http://google.com", "http://amazon.com",
"http://ibm.com", "http://apple.com"
queue = Queue.Queue()
class ThreadUrl(threading.Thread):
"""Threaded Url Grab"""
def init(self, queue):
threading.Thread.init(self)
self.queue = queue
def run(self):
while True:
#grabs host from queue
host = self.queue.get()
#grabs urls of hosts and prints first 1024 bytes of page
url = urllib2.urlopen(host)
print url.read(1024)
#signals to queue job is done
self.queue.task_done()
start = time.time()
def main():
#spawn a pool of threads, and pass them queue instance
for i in range(5):
t = ThreadUrl(queue)
t.setDaemon(True)
t.start()
#populate queue with data
for host in hosts:
queue.put(host)
#wait on the queue until everything has been processed
queue.join()
main()
print "Elapsed Time: %s" % (time.time() ? start)
這個例子有更多的代碼需要解釋,但由于使用了排隊模塊,它并沒有比第一個線程示例復(fù)雜多少。這種模式是在 Python 中使用線程的一種非常常見且推薦的方式。步驟描述如下:
- 創(chuàng)建一個?
Queue.Queue()
?實例,然后用數(shù)據(jù)填充它。 - 將填充數(shù)據(jù)的實例傳遞到從?
threading.Thread
?繼承而創(chuàng)建的?Thread
?類中。 - 產(chǎn)生一個守護(hù)線程池。
- 一次從隊列中拉出一項,并在線程內(nèi)部使用該數(shù)據(jù)(即 run 方法)來完成這項工作。
- 工作完成后,向?
queue.task_done()
?隊列發(fā)送任務(wù)已完成的信號。 - 加入隊列,這實際上意味著等到隊列為空,然后退出主程序。
關(guān)于此模式的注意事項:通過將守護(hù)線程設(shè)置為 true,它允許主線程或程序在只有守護(hù)線程處于活動狀態(tài)時退出。這創(chuàng)建了一種控制程序流程的簡單方法,因為你可以在退出之前加入隊列,或等到隊列為空。確切的過程在隊列模塊的文檔中得到了最好的描述,如右側(cè)的資源部分所示:
join()
阻塞,直到隊列中的所有項目都被獲取和處理。每當(dāng)將項目添加到隊列時,未完成任務(wù)的計數(shù)就會增加。每當(dāng)使用者線程調(diào)用 task_done() 以指示該項目已被檢索并且其上的所有工作已完成時,未完成任務(wù)的計數(shù)就會下降。當(dāng)未完成任務(wù)的數(shù)量降至零時, join()解鎖。
使用多個隊列
因為上面演示的模式非常有效,所以通過將額外的線程池與隊列鏈接來擴(kuò)展它是相對簡單的。在上面的示例中,你只是打印出網(wǎng)頁的第一部分。下一個示例返回每個線程抓取的整個網(wǎng)頁,然后將其放入另一個隊列。然后設(shè)置另一個加入第二個隊列的線程池,然后在網(wǎng)頁上工作。本示例中執(zhí)行的工作涉及使用名為 Beautiful Soup 的第三方 Python 模塊解析網(wǎng)頁。僅使用幾行代碼,使用此模塊,你將提取標(biāo)題標(biāo)簽并為你訪問的每個頁面打印出來。
清單 4. 多隊列數(shù)據(jù)挖掘網(wǎng)站
import Queue
import threading
import urllib2
import time
from BeautifulSoup import BeautifulSoup
hosts = "http://yahoo.com", "http://google.com", "http://amazon.com",
"http://ibm.com", "http://apple.com"
queue = Queue.Queue()
outqueue = Queue.Queue()
class ThreadUrl(threading.Thread):
"""Threaded Url Grab"""
def init(self, queue, outqueue):
threading.Thread.init(self)
self.queue = queue
self.outqueue = outqueue
def run(self):
while True:
#grabs host from queue
host = self.queue.get()
#grabs urls of hosts and then grabs chunk of webpage
url = urllib2.urlopen(host)
chunk = url.read()
#place chunk into out queue
self.out_queue.put(chunk)
#signals to queue job is done
self.queue.task_done()
class DatamineThread(threading.Thread):
"""Threaded Url Grab"""
def __init(self, out_queue):
threading.Thread.__init(self)
self.out_queue = out_queue
def run(self):
while True:
#grabs host from queue
chunk = self.out_queue.get()
#parse the chunk
soup = BeautifulSoup(chunk)
print soup.findAll(['title'])
#signals to queue job is done
self.out_queue.task_done()
start = time.time()
def main():
#spawn a pool of threads, and pass them queue instance
for i in range(5):
t = ThreadUrl(queue, out_queue)
t.setDaemon(True)
t.start()
#populate queue with data
for host in hosts:
queue.put(host)
for i in range(5):
dt = DatamineThread(out_queue)
dt.setDaemon(True)
dt.start()
#wait on the queue until everything has been processed
queue.join()
out_queue.join()
main()
print "Elapsed Time: %s" % (time.time() ? start)
如果你運(yùn)行此版本的腳本,你將獲得以下輸出:
#python url_fetch_threaded_part2.py
<title>Google</title> <title>Yahoo!</title> <title>Apple</title> <title>IBM United States</title> <title>Amazon.com: Online Shopping for Electronics, Apparel,
Computers, Books, DVDs & more</title> Elapsed Time: 3.75387597084
在查看代碼時,你可以看到我們添加了另一個隊列實例,然后將該隊列傳遞給第一個線程池類ThreadURL. 接下來,你幾乎為下一個線程池類復(fù)制了完全相同的結(jié)構(gòu)DatamineThread。在這個類的run方法中,從每個線程的隊列中抓取網(wǎng)頁,chunk,然后用Beautiful Soup處理這個chunk。在這種情況下, 你可以使用 Beautiful Soup 來簡單地從每個頁面中提取標(biāo)題標(biāo)簽并打印出來。這個例子可以很容易地變成更有用的東西,因為你擁有基本搜索引擎或數(shù)據(jù)挖掘工具的核心。一個想法是使用 Beautiful Soup 從每個頁面中提取鏈接,然后關(guān)注它們。
總結(jié)
本文探討了 Python 中的線程,并展示了使用隊列來減輕復(fù)雜性和細(xì)微錯誤以及提高可讀代碼的最佳實踐。雖然這個基本模式相對簡單,但它可以通過將隊列和線程池鏈接在一起來解決大量問題。在最后一部分,您開始探索創(chuàng)建一個更復(fù)雜的處理管道,作為未來項目的模型。在資源部分有很多關(guān)于并發(fā)和線程的優(yōu)秀資源。
最后,重要的是要指出線程并不是所有問題的解決方案,而且進(jìn)程可以非常適合許多情況。如果你只需要分叉多個進(jìn)程并監(jiān)聽響應(yīng),那么標(biāo)準(zhǔn)庫 ??subprocess 模塊尤其可以更簡單地處理。