diff --git a/uasyncio.queues/uasyncio/queues.py b/uasyncio.queues/uasyncio/queues.py index 04918ae5c1..53c20fa7a6 100644 --- a/uasyncio.queues/uasyncio/queues.py +++ b/uasyncio.queues/uasyncio/queues.py @@ -1,5 +1,6 @@ from collections.deque import deque -from uasyncio.core import sleep + +from uasyncio import core class QueueEmpty(Exception): @@ -21,14 +22,18 @@ class Queue: with qsize(), since your single-threaded uasyncio application won't be interrupted between calling qsize() and doing an operation on the Queue. """ - _attempt_delay = 0.1 def __init__(self, maxsize=0): self.maxsize = maxsize self._queue = deque() + self._full = core.TaskQueue() + self._empty = core.TaskQueue() def _get(self): - return self._queue.popleft() + res = self._queue.popleft() + if self._full.peek(): + core._task_queue.push(self._full.pop()) + return res def get(self): """Returns generator, which can be used for getting (and removing) @@ -36,10 +41,13 @@ def get(self): Usage:: - item = yield from queue.get() + item = await queue.get() """ - while not self._queue: - yield from sleep(self._attempt_delay) + if not self._queue: + self._empty.push(core.cur_task) + core.cur_task.data = self._empty + yield + return self._get() def get_nowait(self): @@ -52,6 +60,8 @@ def get_nowait(self): return self._get() def _put(self, val): + if self._empty.peek(): + core._task_queue.push(self._empty.pop()) self._queue.append(val) def put(self, val): @@ -59,10 +69,12 @@ def put(self, val): Usage:: - yield from queue.put(item) + await queue.put(item) """ - while self.qsize() >= self.maxsize and self.maxsize: - yield from sleep(self._attempt_delay) + if self.maxsize and self.qsize() >= self.maxsize: + self._full.push(core.cur_task) + core.cur_task.data = self._full + yield self._put(val) def put_nowait(self, val): @@ -70,7 +82,7 @@ def put_nowait(self, val): If no free slot is immediately available, raise QueueFull. """ - if self.qsize() >= self.maxsize and self.maxsize: + if self.maxsize and self.qsize() >= self.maxsize: raise QueueFull() self._put(val)