-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy paththread_safe_data_structure.py
More file actions
59 lines (51 loc) · 1.61 KB
/
thread_safe_data_structure.py
File metadata and controls
59 lines (51 loc) · 1.61 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
import threading
class WindowSlots:
    """Lock-protected ring buffer that retains the most recently pushed items.

    The buffer is a circular array indexed by ``front`` (oldest retained item)
    and ``rear`` (next slot to write).  ``front == rear`` means "empty", so one
    slot is always kept free: at most ``capacity - 1`` items are retained, and
    pushing into a full buffer discards the oldest item.

    No method removes items, so once the buffer becomes non-empty it stays
    non-empty and ``event`` is never cleared.
    """

    def __init__(self, capacity: int) -> None:
        """Create an empty buffer with ``capacity`` slots.

        Args:
            capacity: Number of slots in the backing list.  Because one slot is
                kept free, a capacity of 1 yields a buffer that is always
                empty, and a capacity of 0 makes ``push`` raise
                ``ZeroDivisionError``.
        """
        self.capacity = capacity
        self.data = [None] * self.capacity
        self.front = 0
        self.rear = 0
        # Guards front/rear/data so readers never observe a half-done push.
        self.lock = threading.Lock()
        # Set once the buffer holds at least one item; wakes blocked top().
        self.event = threading.Event()

    def push(self, item):
        """Append ``item``, discarding the oldest item if the buffer is full.

        The slot is written while the lock is held, so a concurrent reader
        cannot see the advanced ``rear`` before the item is actually stored.
        """
        with self.lock:
            next_rear = (self.rear + 1) % self.capacity
            if next_rear == self.front:
                # Full: drop the oldest item to make room.
                self.front = (self.front + 1) % self.capacity
            self.data[self.rear] = item
            self.rear = next_rear
            has_items = self.front != self.rear
        # Signal whenever the buffer is non-empty after the push, including
        # the very first push, so a thread blocked in top() is woken.
        if has_items:
            self.event.set()

    def top(self):
        """Return the oldest retained item, blocking while the buffer is empty.

        If the stored item is a tuple, its element at index 1 is returned
        instead of the tuple itself; any other item is returned unchanged.
        The item is not removed from the buffer.
        """
        while True:
            with self.lock:
                if self.front != self.rear:
                    item = self.data[self.front]
                    break
            # Empty: wait (outside the lock) until a push signals data.
            self.event.wait()
        if isinstance(item, tuple):
            return item[1]
        return item

    def top_unblock(self):
        """Return the oldest retained item without blocking.

        Returns ``None`` when the buffer is empty.  Unlike ``top``, the stored
        item is returned as-is (tuples are not unwrapped).  The item is not
        removed from the buffer.
        """
        with self.lock:
            if self.front == self.rear:
                return None
            return self.data[self.front]
# def transform(self, func):
# while True:
# self.lock.acquire()
# if self.front == self.rear:
# self.lock.release()
# self.event.wait()
# else:
# self.data[self.front]
# self.lock.release()
# return item