Skip to content

Commit 8990d50

Browse files
feat(datastructures): add thread-safe bounded queue implementation
Implements a thread-safe blocking queue using ReentrantLock and Condition variables for producer-consumer synchronization. ### What This Adds **ThreadSafeQueue.java** - Thread-safe bounded queue: - `enqueue()` - Blocking add to tail, waits when queue is full - `dequeue()` - Blocking remove from head, waits when queue is empty - `offer()` - Non-blocking add, returns false when full - `poll()` - Non-blocking remove, returns null when empty - `size()`, `isEmpty()`, `isFull()`, `capacity()` - State queries - Uses circular buffer for O(1) enqueue/dequeue operations - Supports multiple concurrent producers and consumers **ThreadSafeQueueTest.java** - Comprehensive test suite: - Basic enqueue/dequeue operations - Offer/poll non-blocking behavior - Null rejection validation - Invalid capacity rejection - Circular buffer wrap-around - Multiple producers single consumer concurrency - Single producer multiple consumers concurrency - Blocking behavior verification - Stress test with 8 concurrent threads ### Algorithm Uses a circular buffer with ReentrantLock and two Condition variables: - `notFull` - signaled when space becomes available - `notEmpty` - signaled when items are added - Producers await notFull when buffer is full - Consumers await notEmpty when buffer is empty - Signal opposite condition after each operation Time: O(1) enqueue/dequeue | Space: O(n) bounded buffer ### Reference https://en.wikipedia.org/wiki/Producer%E2%80%93consumer_problem
1 parent 4b8099c commit 8990d50

2 files changed

Lines changed: 486 additions & 0 deletions

File tree

Lines changed: 186 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,186 @@
1+
package com.thealgorithms.datastructures.queues;
2+
3+
import java.util.concurrent.locks.Condition;
4+
import java.util.concurrent.locks.ReentrantLock;
5+
6+
/**
7+
* @brief Thread-safe bounded queue implementation using ReentrantLock and Condition variables
8+
* @details A blocking queue that supports multiple producers and consumers.
9+
* Uses a circular buffer internally with lock-based synchronization to ensure
10+
* thread safety. Producers block when the queue is full, and consumers block
11+
* when the queue is empty.
12+
* @see <a href="https://en.wikipedia.org/wiki/Producer%E2%80%93consumer_problem">Producer-Consumer Problem</a>
13+
*/
14+
public class ThreadSafeQueue<T> {
15+
16+
private final Object[] buffer;
17+
private final int capacity;
18+
private int head;
19+
private int tail;
20+
private int count;
21+
private final ReentrantLock lock;
22+
private final Condition notFull;
23+
private final Condition notEmpty;
24+
25+
/**
26+
* @brief Constructs a ThreadSafeQueue with the specified capacity
27+
* @param capacity the maximum number of elements the queue can hold
28+
* @throws IllegalArgumentException if capacity is less than or equal to zero
29+
*/
30+
public ThreadSafeQueue(int capacity) {
31+
if (capacity <= 0) {
32+
throw new IllegalArgumentException("Capacity must be greater than zero.");
33+
}
34+
this.capacity = capacity;
35+
this.buffer = new Object[capacity];
36+
this.head = 0;
37+
this.tail = 0;
38+
this.count = 0;
39+
this.lock = new ReentrantLock();
40+
this.notFull = lock.newCondition();
41+
this.notEmpty = lock.newCondition();
42+
}
43+
44+
/**
45+
* @brief Adds an element to the tail of the queue, blocking if full
46+
* @param item the element to add
47+
* @throws InterruptedException if the thread is interrupted while waiting
48+
* @throws IllegalArgumentException if the item is null
49+
*/
50+
public void enqueue(T item) throws InterruptedException {
51+
if (item == null) {
52+
throw new IllegalArgumentException("Cannot enqueue null item.");
53+
}
54+
55+
lock.lock();
56+
try {
57+
while (count == capacity) {
58+
notFull.await();
59+
}
60+
buffer[tail] = item;
61+
tail = (tail + 1) % capacity;
62+
count++;
63+
notEmpty.signal();
64+
} finally {
65+
lock.unlock();
66+
}
67+
}
68+
69+
/**
70+
* @brief Removes and returns the element at the head of the queue, blocking if empty
71+
* @return the element at the head of the queue
72+
* @throws InterruptedException if the thread is interrupted while waiting
73+
*/
74+
@SuppressWarnings("unchecked")
75+
public T dequeue() throws InterruptedException {
76+
lock.lock();
77+
try {
78+
while (count == 0) {
79+
notEmpty.await();
80+
}
81+
T item = (T) buffer[head];
82+
buffer[head] = null;
83+
head = (head + 1) % capacity;
84+
count--;
85+
notFull.signal();
86+
return item;
87+
} finally {
88+
lock.unlock();
89+
}
90+
}
91+
92+
/**
93+
* @brief Adds an element to the tail of the queue without blocking
94+
* @param item the element to add
95+
* @return true if the element was added, false if the queue was full
96+
* @throws IllegalArgumentException if the item is null
97+
*/
98+
public boolean offer(T item) {
99+
if (item == null) {
100+
throw new IllegalArgumentException("Cannot enqueue null item.");
101+
}
102+
103+
lock.lock();
104+
try {
105+
if (count == capacity) {
106+
return false;
107+
}
108+
buffer[tail] = item;
109+
tail = (tail + 1) % capacity;
110+
count++;
111+
notEmpty.signal();
112+
return true;
113+
} finally {
114+
lock.unlock();
115+
}
116+
}
117+
118+
/**
119+
* @brief Removes and returns the element at the head without blocking
120+
* @return the element at the head, or null if the queue is empty
121+
*/
122+
@SuppressWarnings("unchecked")
123+
public T poll() {
124+
lock.lock();
125+
try {
126+
if (count == 0) {
127+
return null;
128+
}
129+
T item = (T) buffer[head];
130+
buffer[head] = null;
131+
head = (head + 1) % capacity;
132+
count--;
133+
notFull.signal();
134+
return item;
135+
} finally {
136+
lock.unlock();
137+
}
138+
}
139+
140+
/**
141+
* @brief Returns the number of elements in the queue
142+
* @return the current size of the queue
143+
*/
144+
public int size() {
145+
lock.lock();
146+
try {
147+
return count;
148+
} finally {
149+
lock.unlock();
150+
}
151+
}
152+
153+
/**
154+
* @brief Checks if the queue is empty
155+
* @return true if the queue contains no elements
156+
*/
157+
public boolean isEmpty() {
158+
lock.lock();
159+
try {
160+
return count == 0;
161+
} finally {
162+
lock.unlock();
163+
}
164+
}
165+
166+
/**
167+
* @brief Checks if the queue is full
168+
* @return true if the queue has reached its capacity
169+
*/
170+
public boolean isFull() {
171+
lock.lock();
172+
try {
173+
return count == capacity;
174+
} finally {
175+
lock.unlock();
176+
}
177+
}
178+
179+
/**
180+
* @brief Returns the maximum capacity of the queue
181+
* @return the capacity
182+
*/
183+
public int capacity() {
184+
return capacity;
185+
}
186+
}

0 commit comments

Comments
 (0)