Tornado 協(xié)程隊列

2022-03-10 14:59 更新

協(xié)程的異步隊列。 這些類與標準庫的 ?asyncio包中提供的類非常相似。

注意:

與標準庫的 ?queue模塊不同,這里定義的類不是線程安全的。 要從另一個線程使用這些隊列,請在調用任何隊列方法之前使用 ?IOLoop.add_callback? 將控制權轉移到 ?IOLoop線程。

隊列

class tornado.queues.Queue(maxsize: int = 0)

協(xié)調生產者和消費者協(xié)程。

如果 ?maxsize為 0(默認值),則隊列大小是無限的。

from tornado import gen
from tornado.ioloop import IOLoop
from tornado.queues import Queue

q = Queue(maxsize=2)

async def consumer():
    async for item in q:
        try:
            print('Doing work on %s' % item)
            await gen.sleep(0.01)
        finally:
            q.task_done()

async def producer():
    for item in range(5):
        await q.put(item)
        print('Put %s' % item)

async def main():
    # Start consumer without waiting (since it never finishes).
    IOLoop.current().spawn_callback(consumer)
    await producer()     # Wait for producer to put all tasks.
    await q.join()       # Wait for consumer to finish all tasks.
    print('Done')

IOLoop.current().run_sync(main)

結果如下:

Put 0
Put 1
Doing work on 0
Put 2
Doing work on 1
Put 3
Doing work on 2
Put 4
Doing work on 3
Doing work on 4
Done

在沒有原生協(xié)程的 Python 版本中(3.5 之前),?consumer()? 可以寫成:

@gen.coroutine
def consumer():
    while True:
        item = yield q.get()
        try:
            print('Doing work on %s' % item)
            yield gen.sleep(0.01)
        finally:
            q.task_done()

maxsize

隊列中允許的項目數。

qsize() → int

隊列中的項目數。

put(item: _T, timeout: Union[float, datetime.timedelta, None] = None) → Future[None]

將一個項目放入隊列中,也許等到有空間。

返回一個 ?Future?,它會在超時后引發(fā) ?tornado.util.TimeoutError?。

?timeout可以是一個表示時間的數字(與 ?tornado.ioloop.IOLoop.time? 的比例相同,通常是 ?time.time?),或者是相對于當前時間的截止日期的 ?datetime.timedelta? 對象。

put_nowait(item: _T) → None

將一個項目放入隊列而不阻塞。

如果沒有立即可用的空閑槽,則提高 ?QueueFull?。

get(timeout: Union[float, datetime.timedelta, None] = None) → Awaitable[_T]

從隊列中移除并返回一個項目。

返回一個等待項目,一旦項目可用就解決,或在超時后引發(fā) ?tornado.util.TimeoutError?。

?timeout可以是一個表示時間的數字(與 ?tornado.ioloop.IOLoop.time? 的比例相同,通常是?time.time?),或者是相對于當前時間的截止日期的 ?datetime.timedelta? 對象。

注意:

該方法的 ?timeout參數與標準庫的 ?queue.Queue.get? 不同。 該方法將數值解釋為相對超時; 這將它們解釋為絕對截止日期,并且需要 ?timedelta對象用于相對超時(與 Tornado 中的其他超時一致)。

get_nowait() → _T

從隊列中移除并返回一個項目而不阻塞。

如果一個項目立即可用,則返回一個項目,否則引發(fā) ?QueueEmpty?。

task_done() → None

指示以前排隊的任務已完成。

由隊列消費者使用。 對用于獲取任務的每個 ?get ?,對 ?task_done的后續(xù)調用會告訴隊列該任務的處理已完成。

如果一個連接被阻塞,它會在所有項目都被處理后恢復; 也就是說,當每個 ?put?都與 ?task_done?匹配時。

如果調用次數多于 ?put?,則引發(fā) ?ValueError?。

join(timeout: Union[float, datetime.timedelta, None] = None) → Awaitable[None]

阻塞直到隊列中的所有項目都處理完畢。

返回一個 ?awaitable?,它在超時后引發(fā) ?tornado.util.TimeoutError?。

優(yōu)先隊列

class tornado.queues.PriorityQueue(maxsize: int = 0)

按優(yōu)先級順序檢索條目的隊列,最低優(yōu)先。

條目通常是元組,如(?priority number?, ?data?)。

from tornado.queues import PriorityQueue

q = PriorityQueue()
q.put((1, 'medium-priority item'))
q.put((0, 'high-priority item'))
q.put((10, 'low-priority item'))

print(q.get_nowait())
print(q.get_nowait())
print(q.get_nowait())

結果如下:

(0, 'high-priority item')
(1, 'medium-priority item')
(10, 'low-priority item')

隊列

class tornado.queues.LifoQueue(maxsize: int = 0)

隊列首先檢索最近放置的項目。

from tornado.queues import LifoQueue

q = LifoQueue()
q.put(3)
q.put(2)
q.put(1)

print(q.get_nowait())
print(q.get_nowait())
print(q.get_nowait())

結果如下:

1
2
3

隊列空

exception tornado.queues.QueueEmpty

當隊列沒有項目時由 ?Queue.get_nowait? 引發(fā)。

隊列滿

exception tornado.queues.QueueFull

當隊列達到最大大小時由 ?Queue.put_nowait? 引發(fā)。


以上內容是否對您有幫助:
在線筆記
App下載
App下載

掃描二維碼

下載編程獅App

公眾號
微信公眾號

編程獅公眾號