協(xié)程的異步隊列。 這些類與標準庫的 ?asyncio
包中提供的類非常相似。
注意:
與標準庫的 ?queue
模塊不同,這里定義的類不是線程安全的。 要從另一個線程使用這些隊列,請在調用任何隊列方法之前使用 ?IOLoop.add_callback
? 將控制權轉移到 ?IOLoop
線程。
協(xié)調生產者和消費者協(xié)程。
如果 ?maxsize
為 0(默認值),則隊列大小是無限的。
from tornado import gen
from tornado.ioloop import IOLoop
from tornado.queues import Queue
q = Queue(maxsize=2)
async def consumer():
async for item in q:
try:
print('Doing work on %s' % item)
await gen.sleep(0.01)
finally:
q.task_done()
async def producer():
for item in range(5):
await q.put(item)
print('Put %s' % item)
async def main():
# Start consumer without waiting (since it never finishes).
IOLoop.current().spawn_callback(consumer)
await producer() # Wait for producer to put all tasks.
await q.join() # Wait for consumer to finish all tasks.
print('Done')
IOLoop.current().run_sync(main)
結果如下:
Put 0
Put 1
Doing work on 0
Put 2
Doing work on 1
Put 3
Doing work on 2
Put 4
Doing work on 3
Doing work on 4
Done
在沒有原生協(xié)程的 Python 版本中(3.5 之前),?consumer()
? 可以寫成:
@gen.coroutine
def consumer():
while True:
item = yield q.get()
try:
print('Doing work on %s' % item)
yield gen.sleep(0.01)
finally:
q.task_done()
隊列中允許的項目數。
隊列中的項目數。
將一個項目放入隊列中,也許等到有空間。
返回一個 ?Future
?,它會在超時后引發(fā) ?tornado.util.TimeoutError
?。
?timeout
可以是一個表示時間的數字(與 ?tornado.ioloop.IOLoop.time
? 的比例相同,通常是 ?time.time
?),或者是相對于當前時間的截止日期的 ?datetime.timedelta
? 對象。
將一個項目放入隊列而不阻塞。
如果沒有立即可用的空閑槽,則提高 ?QueueFull
?。
從隊列中移除并返回一個項目。
返回一個等待項目,一旦項目可用就解決,或在超時后引發(fā) ?tornado.util.TimeoutError
?。
?timeout
可以是一個表示時間的數字(與 ?tornado.ioloop.IOLoop.time
? 的比例相同,通常是?time.time
?),或者是相對于當前時間的截止日期的 ?datetime.timedelta
? 對象。
注意:
該方法的 ?timeout
參數與標準庫的 ?queue.Queue.get
? 不同。 該方法將數值解釋為相對超時; 這將它們解釋為絕對截止日期,并且需要 ?timedelta
對象用于相對超時(與 Tornado 中的其他超時一致)。
從隊列中移除并返回一個項目而不阻塞。
如果一個項目立即可用,則返回一個項目,否則引發(fā) ?QueueEmpty
?。
指示以前排隊的任務已完成。
由隊列消費者使用。 對用于獲取任務的每個 ?get
?,對 ?task_done
的后續(xù)調用會告訴隊列該任務的處理已完成。
如果一個連接被阻塞,它會在所有項目都被處理后恢復; 也就是說,當每個 ?put
?都與 ?task_done
?匹配時。
如果調用次數多于 ?put
?,則引發(fā) ?ValueError
?。
阻塞直到隊列中的所有項目都處理完畢。
返回一個 ?awaitable
?,它在超時后引發(fā) ?tornado.util.TimeoutError
?。
按優(yōu)先級順序檢索條目的隊列,最低優(yōu)先。
條目通常是元組,如(?priority number
?, ?data
?)。
from tornado.queues import PriorityQueue
q = PriorityQueue()
q.put((1, 'medium-priority item'))
q.put((0, 'high-priority item'))
q.put((10, 'low-priority item'))
print(q.get_nowait())
print(q.get_nowait())
print(q.get_nowait())
結果如下:
(0, 'high-priority item')
(1, 'medium-priority item')
(10, 'low-priority item')
隊列首先檢索最近放置的項目。
from tornado.queues import LifoQueue
q = LifoQueue()
q.put(3)
q.put(2)
q.put(1)
print(q.get_nowait())
print(q.get_nowait())
print(q.get_nowait())
結果如下:
1
2
3
當隊列沒有項目時由 ?Queue.get_nowait
? 引發(fā)。
當隊列達到最大大小時由 ?Queue.put_nowait
? 引發(fā)。
更多建議: