-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathqueue.py
More file actions
298 lines (243 loc) · 7.88 KB
/
queue.py
File metadata and controls
298 lines (243 loc) · 7.88 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
__all__ = (
'QueueException', 'WouldBlock', 'Closed',
'Queue', 'Order', 'QueueState',
)
import typing as T
import enum
import heapq
from functools import partial
from collections import deque
from asyncgui import ExclusiveEvent
class QueueState(enum.Enum):
'''
Enum class that represents the state of the Queue.
'''
OPENED = enum.auto()
'''
All operations are allowed.
:meta hide-value:
'''
HALF_CLOSED = enum.auto()
'''
Putting an item is not allowed.
:meta hide-value:
'''
CLOSED = enum.auto()
'''
Putting or getting an item is not allowed.
:meta hide-value:
'''
class QueueException(Exception):
'''Base class of all the queue-related exceptions.'''
class WouldBlock(QueueException):
'''Raised by X_nowait functions if X would block.'''
class Closed(QueueException):
'''
Occurs when:
* trying to **get** an item from a queue that is in the ``CLOSED`` state.
* trying to **get** an item from an **empty** queue that is in the ``HALF_CLOSED`` state.
* trying to **put** an item into a queue that is in the ``CLOSED`` or ``HALF_CLOSED`` state.
'''
Item: T.TypeAlias = T.Any
Order = T.Literal['fifo', 'lifo', 'small-first']
'''
* ``'fifo'``: First In First Out
* ``'lifo'``: Last In First Out
* ``'small-first'``: Smallest One First Out
'''
def _do_nothing(*_unused_args, **_unused_kwargs):
pass
class Queue:
'''
:param capacity: Cannot be zero. Unlimited if None.
'''
def __init__(self, *, capacity: int | None=None, order: Order='fifo'):
if capacity is None:
pass
elif (not isinstance(capacity, int)) or capacity < 1:
raise ValueError(f"'capacity' must be either a positive integer or None. (was {capacity!r})")
self._init_container(capacity, order)
self._state = QueueState.OPENED
self._putters = deque[tuple[ExclusiveEvent, Item]]()
self._getters = deque[ExclusiveEvent]()
self._capacity = capacity
self._order = order
self._is_transferring = False # A flag to avoid recursive calls of transfer_items.
def _init_container(self, capacity, order):
# If the capacity is 1, there is no point in reordering items.
# Therefore, for performance reasons, treat the order as 'lifo'.
if capacity == 1 or order == 'lifo':
c = []
c_get = c.pop
c_put = c.append
elif order == 'fifo':
c = deque(maxlen=capacity)
c_get = c.popleft
c_put = c.append
elif order == 'small-first':
c = []
c_get = partial(heapq.heappop, c)
c_put = partial(heapq.heappush, c)
else:
raise ValueError(f"'order' must be one of 'lifo', 'fifo' or 'small-first'. (was {order!r})")
self._c = c
self._c_get = c_get
self._c_put = c_put
def __len__(self) -> int:
return len(self._c)
size = property(__len__)
'''Number of items in the queue. This equals to ``len(queue)``. '''
@property
def capacity(self) -> int | None:
'''Number of items allowed in the queue. None if unlimited.'''
return self._capacity
@property
def is_empty(self) -> bool:
return not self._c
@property
def is_full(self) -> bool:
return len(self._c) == self._capacity
@property
def order(self) -> Order:
return self._order
async def get(self) -> T.Awaitable[Item]:
'''
.. code-block::
item = await queue.get()
'''
if self._state is QueueState.CLOSED:
raise Closed
if self._state is QueueState.HALF_CLOSED and self.is_empty:
raise Closed
if self._is_transferring or self.is_empty:
event = ExclusiveEvent()
self._getters.append(event)
exc, item = (await event.wait())[0]
if exc is not None:
raise exc
return item
item = self._c_get()
if self._putters:
self.transfer_items()
return item
def get_nowait(self) -> Item:
'''
.. code-block::
item = queue.get_nowait()
'''
if self._state is QueueState.CLOSED:
raise Closed
if self.is_empty:
if self._state is QueueState.HALF_CLOSED:
raise Closed
raise WouldBlock
item = self._c_get()
if self._putters:
self.transfer_items()
return item
async def put(self, item) -> T.Awaitable:
'''
.. code-block::
await queue.put(item)
'''
if self._state is not QueueState.OPENED:
raise Closed
if self._is_transferring or self.is_full:
event = ExclusiveEvent()
self._putters.append((event, item, ))
exc = (await event.wait())[0][0]
if exc is not None:
raise exc
return
self._c_put(item)
if self._getters:
self.transfer_items()
def put_nowait(self, item):
'''
.. code-block::
queue.put_nowait(item)
'''
if self._state is not QueueState.OPENED:
raise Closed
if self.is_full:
raise WouldBlock
self._c_put(item)
if self._getters:
self.transfer_items()
def half_close(self):
'''
Partially closes the queue.
Putting an item is no longer allowed.
'''
if self._state is not QueueState.OPENED:
return
self._state = QueueState.HALF_CLOSED
Closed_ = Closed
for putter, __ in self._putters:
putter.fire(Closed_)
if not self.is_empty:
return
for getter in self._getters:
getter.fire(Closed_, None)
def close(self):
'''
Fully closes the queue.
Putting or getting items are no longer allowed, and any items currently held will be discarded.
'''
if self._state is QueueState.CLOSED:
return
self._state = QueueState.CLOSED
self._c.clear()
Closed_ = Closed
for putter, __ in self._putters:
putter.fire(Closed_)
for getter in self._getters:
getter.fire(Closed_, None)
async def __aiter__(self):
'''
Repeats getting an item from the queue until it gets closed.
.. code-block::
async for item in queue:
...
This is equivalent to:
.. code-block::
try:
while True:
item = await queue.get()
...
except Closed:
pass
'''
try:
while True:
yield await self.get()
except Closed:
pass
def transfer_items(self, *_unused):
if self._is_transferring:
return
self._is_transferring = True
try:
# LOAD_FAST
c_put = self._c_put
c_get = self._c_get
putters = self._putters
getters = self._getters
next_putter = putters.popleft
next_getter = getters.popleft
while True:
while (not self.is_full) and putters:
putter, item = next_putter()
if (task := putter._waiting_task) is not None:
c_put(item)
task._step(None)
if (not getters) or self.is_empty:
break
while (not self.is_empty) and getters:
getter = next_getter()
if (task := getter._waiting_task) is not None:
task._step(None, c_get())
if (not putters) or self.is_full:
break
finally:
self._is_transferring = False