-
Notifications
You must be signed in to change notification settings - Fork 4
Expand file tree
/
Copy pathpriority_queue.py
More file actions
372 lines (309 loc) · 13.2 KB
/
priority_queue.py
File metadata and controls
372 lines (309 loc) · 13.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
# Copyright (C) 2025 Intel Corporation
# SPDX-License-Identifier: Apache-2.0
"""
Priority queue implementation with support for task prioritization, ordering, and updating.
This module provides a priority queue that allows tasks to be added with priorities,
and supports operations to update, remove, and retrieve tasks based on their priorities.
"""
import heapq
import bisect
import itertools
from typing import List, Dict, Optional, Tuple, Any
class PriorityQueue:
# pylint: disable=invalid-name
"""
A priority queue implementation that supports task prioritization and ordering.
This class allows tasks to be added with a specified priority, and supports
operations to update, remove, and retrieve tasks based on their priority.
"""
class __PriorityQueueIter:
"""
An iterator for the PriorityQueue class.
This iterator allows for iterating over the tasks in the priority queue
while ensuring that the queue's size does not change during iteration.
"""
def __init__(self, pq, removed):
"""
Initializes the iterator with the priority queue and removed marker.
Args:
pq: The priority queue to iterate over.
removed: The marker for removed tasks.
"""
self.__pq = pq if pq else []
self.__initial_len = len(self.__pq)
self.__removed = removed
self.__current = 0
def __next__(self):
"""
Returns the next task in the priority queue.
Returns:
tuple: The (priority, task) pair of the next task.
Raises:
RuntimeError: If the priority queue changes size during iteration.
StopIteration: If there are no more tasks to iterate over.
"""
if len(self.__pq) != self.__initial_len:
raise RuntimeError("PriorityQueue changed size during iteration.")
# Skip all removed tasks
while (
self.__current < len(self.__pq)
and self.__pq[self.__current][-1] is self.__removed
):
self.__current += 1
if self.__current >= len(self.__pq):
raise StopIteration
priority, _, task = self.__pq[self.__current]
self.__current += 1 # point to nex element
return (priority, task)
class __PriorityTracker:
"""
A helper class to track tasks by their priority.
This class maintains a mapping of priorities to tasks and supports
operations to add, find, and remove tasks based on their priority.
"""
def __init__(self):
"""
Initializes the priority tracker with empty mappings.
"""
self.__priority_dict = (
{}
) # dict(int, SortedList(task)): maps priority to unordered set of tasks with same priority
self.__priority_dict_set = (
{}
) # dict(int, set(task)): maps priority to unordered set of tasks with same priority
def find(self, priority: int) -> object:
"""
Finds a task with the specified priority.
Args:
priority (int): The priority to search for.
Returns:
object: A task with the specified priority, or None if not found.
"""
return (
next(iter(self.__priority_dict[priority]))[1]
if priority in self.__priority_dict
else None
)
def push(self, priority: int, tie_breaker: tuple, task: object):
"""
Adds a task with the specified priority and tie breaker.
Args:
priority (int): The priority of the task.
tie_breaker (tuple): A tuple used to break ties between tasks with the same priority.
task (object): The task to add.
Raises:
ValueError: If the task is None.
"""
if task is None:
raise ValueError("`task` cannot be `None`.")
if priority not in self.__priority_dict:
self.__priority_dict[priority] = []
assert priority not in self.__priority_dict_set
self.__priority_dict_set[priority] = set()
if task not in self.__priority_dict_set[priority]:
bisect.insort_right(self.__priority_dict[priority], (tie_breaker, task))
self.__priority_dict_set[priority].add(task)
def pop(self, priority: int, task=None) -> object:
"""
Removes a task with the specified priority.
Args:
priority (int): The priority of the task to remove.
task (object, optional): The specific task to remove. If None, the first task is removed.
Raises:
KeyError: If the priority is not found.
ValueError: If the specified task is not found in the priority.
Returns:
object: The task that was removed.
"""
if priority not in self.__priority_dict:
raise KeyError(str(priority))
retval = None
assert priority in self.__priority_dict_set
if task:
# Find index for task
idx = next(
(
i
for i, (_, contained_task) in enumerate(
self.__priority_dict[priority]
)
if contained_task == task
),
len(self.__priority_dict[priority]),
)
if idx >= len(self.__priority_dict[priority]):
raise ValueError("`task` not found in priority.")
_, retval = self.__priority_dict[priority].pop(idx)
assert retval == task
else:
# Remove first task
_, retval = self.__priority_dict[priority].pop(0)
self.__priority_dict_set[priority].remove(retval)
if len(self.__priority_dict[priority]) <= 0:
# Remove priority from dictionary if empty (we do not want to keep too many of these around)
self.__priority_dict.pop(priority)
assert len(self.__priority_dict_set[priority]) <= 0
self.__priority_dict_set.pop(priority)
return retval
__REMOVED = object() # Placeholder for a removed task
def __init__(self, queue: Optional[List[Tuple[int, Any]]] = None):
"""
Creates a new PriorityQueue object.
Args:
queue (Optional[List[Tuple[int, Any]]], optional): A list of (priority, task) tuples to initialize the queue.
This is an O(len(queue)) operation.
Raises:
ValueError: If any task in the queue is None.
"""
# entry: [priority: int, nonce: int, task: hashable_object]
self.__pq: List[List[Any]] = (
[]
) # list(entry) - List of entries arranged in a heap
self.__entry_finder: Dict[Any, List[Any]] = (
{}
) # dictionary(task: Hashable_object, entry) - mapping of tasks to entries
self.__priority_tracker = (
PriorityQueue.__PriorityTracker()
) # Tracks tasks by priority
self.__counter = itertools.count(1) # Unique sequence count
if queue:
for priority, task in queue:
if task is None:
raise ValueError("`queue`: tasks cannot be `None`.")
count = next(self.__counter)
entry = [priority, ((0,), count), task]
self.__entry_finder[task] = entry
self.__priority_tracker.push(priority, ((0,), count), task)
heapq.heappush(self.__pq, entry)
heapq.heapify(self.__pq)
def __bool__(self):
"""
Returns True if the priority queue is not empty, False otherwise.
Returns:
bool: True if it is not empty, False otherwise.
"""
return len(self) > 0
def __contains__(self, task: object):
"""
Checks if a task is in the priority queue.
Args:
task (object): The task to check for.
Returns:
bool: True if it is in the queue, False otherwise.
"""
return task in self.__entry_finder
def __iter__(self):
"""
Returns an iterator over the tasks in the priority queue.
Returns:
__PriorityQueueIter: An iterator over the tasks in the queue.
"""
return PriorityQueue.__PriorityQueueIter(self.__pq, PriorityQueue.__REMOVED)
def __len__(self):
"""
Returns the number of tasks in the priority queue.
Returns:
int: The number of tasks.
"""
return len(self.__entry_finder)
def __repr__(self):
"""
Returns a string representation of the priority queue.
Returns:
str: A string representation of the queue.
"""
return f"<{type(self).__name__} object at {hex(id(self))}>(len={len(self)}, pq={self.__pq})"
def push(
self, priority: int, task: object, tie_breaker: Optional[Tuple[int, ...]] = None
):
"""
Adds a new task or update the priority of an existing task.
Args:
priority (int): The priority of the task.
task (object): The task to add or update.
tie_breaker (Optional[Tuple[int, ...]], optional): A tuple of ints to use as a tie breaker for tasks
of the same priority. Defaults to (0,) if None.
Raises:
ValueError: If the task is None.
TypeError: If the tie_breaker is not a tuple of ints or None.
"""
if task is None:
raise ValueError("`task` cannot be `None`.")
if tie_breaker is not None and not all(isinstance(x, int) for x in tie_breaker):
raise TypeError("`tie_breaker` expected tuple of `int`s, or `None`.")
b_add_needed = True
if task in self.__entry_finder:
old_priority, (old_tie_breaker, _), _ = self.__entry_finder[task]
if tie_breaker is None:
tie_breaker = old_tie_breaker
if old_priority != priority or tie_breaker != old_tie_breaker:
self.remove(task)
else:
# same task without priority change detected: no need to add
b_add_needed = False
if tie_breaker is None:
tie_breaker = (0,)
if b_add_needed:
if len(self.__pq) == 0:
self.__counter = itertools.count(
1
) # restart sequence count when queue is empty
count = next(self.__counter)
entry = [priority, (tie_breaker, count), task]
self.__entry_finder[task] = entry
self.__priority_tracker.push(priority, (tie_breaker, count), task)
heapq.heappush(self.__pq, entry)
def remove(self, task: object):
"""
Removes an existing task from the priority queue.
Args:
task (object): The task to remove from the queue. It must exist.
Raises:
KeyError: If the task is not found in the queue.
"""
# mark an existing task as PriorityQueue.__REMOVED.
entry = self.__entry_finder.pop(task)
priority, *_ = entry
self.__priority_tracker.pop(
priority, task
) # remove it from the priority tracker
entry[-1] = PriorityQueue.__REMOVED
def peek(self) -> Optional[Tuple[int, Any]]:
"""
Returns the task with the lowest priority without removing it from the queue.
Returns:
Optional[Tuple[int, Any]]: The (priority, task) pair of the task with the lowest priority,
or None if the queue is empty.
"""
# make sure head is not a removed task
while self.__pq and self.__pq[0][-1] is PriorityQueue.__REMOVED:
heapq.heappop(self.__pq)
retval = None
if self.__pq:
priority, _, task = self.__pq[0]
retval = (priority, task)
return retval
def find(self, priority: int) -> object:
"""
Returns a task with the specified priority, if there is one.
The returned task is not removed from the priority queue.
Args:
priority (int): The priority of the task to find.
Returns:
object: The task with the specified priority, or None if no such task exists.
"""
return self.__priority_tracker.find(priority)
def pop(self) -> tuple:
"""
Removes and return the task with the lowest priority.
Returns:
tuple: The (priority, task) pair of the task that was removed.
Raises:
IndexError: If the queue is empty.
"""
task = PriorityQueue.__REMOVED
while task is PriorityQueue.__REMOVED: # make sure head is not a removed task
priority, _, task = heapq.heappop(self.__pq)
self.__entry_finder.pop(task)
self.__priority_tracker.pop(priority, task)
return (priority, task)