44#include < thread_pool/mpmc_bounded_queue.hpp>
55#include < thread_pool/slotted_bag.hpp>
66#include < thread_pool/thread_pool_options.hpp>
7+ #include < thread_pool/thread_pool_state.hpp>
78#include < thread_pool/worker.hpp>
89#include < thread_pool/rouser.hpp>
910
@@ -53,15 +54,11 @@ class GenericThreadPool final
5354
5455 /* *
5556 * @brief Move ctor implementation.
56- * @note Be very careful when invoking this while the thread pool is
57- * active, or in an otherwise undefined state.
5857 */
5958 GenericThreadPool (GenericThreadPool&& rhs) noexcept ;
6059
6160 /* *
6261 * @brief Move assignment implementation.
63- * @note Be very careful when invoking this while the thread pool is
64- * active, or in an otherwise undefined state.
6562 */
6663 GenericThreadPool& operator =(GenericThreadPool&& rhs) noexcept ;
6764
@@ -109,35 +106,31 @@ class GenericThreadPool final
109106 */
110107 size_t getWorkerId ();
111108
112- SlottedBag<Queue> m_idle_workers;
113- WorkerVector m_workers;
114- Rouser m_rouser;
115109 size_t m_failed_wakeup_retry_cap;
116110 std::atomic<size_t > m_next_worker;
117- std::atomic<size_t > m_num_busy_waiters;
111+ std::shared_ptr<Rouser> m_rouser;
112+ std::shared_ptr<ThreadPoolState<Task, Queue>> m_state;
118113};
119114
120115
121116// / Implementation
122117
123118template <typename Task, template <typename > class Queue >
124119inline GenericThreadPool<Task, Queue>::GenericThreadPool(ThreadPoolOptions options)
125- : m_idle_workers(options.threadCount())
126- , m_workers(options.threadCount())
127- , m_rouser(options.rousePeriod())
128- , m_failed_wakeup_retry_cap(options.failedWakeupRetryCap())
120+ : m_failed_wakeup_retry_cap(options.failedWakeupRetryCap())
129121 , m_next_worker(0 )
130- , m_num_busy_waiters(0 )
122+ , m_rouser(std::make_shared<Rouser>(options.rousePeriod()))
123+ , m_state(ThreadPoolState<Task, Queue>::create(options))
131124{
132125 // Instantiate all workers.
133- for (auto it = m_workers .begin (); it != m_workers .end (); ++it)
126+ for (auto it = m_state-> workers () .begin (); it != m_state-> workers () .end (); ++it)
134127 it->reset (new Worker<Task, Queue>(options.busyWaitOptions (), options.queueSize ()));
135128
136129 // Initialize all worker threads.
137- for (size_t i = 0 ; i < m_workers .size (); ++i)
138- m_workers [i]->start (i, m_workers, m_idle_workers, m_num_busy_waiters );
130+ for (size_t i = 0 ; i < m_state-> workers () .size (); ++i)
131+ m_state-> workers () [i]->start (i, m_state );
139132
140- m_rouser. start (m_workers, m_idle_workers, m_num_busy_waiters );
133+ m_rouser-> start (m_state );
141134}
142135
143136template <typename Task, template <typename > class Queue >
@@ -151,12 +144,10 @@ inline GenericThreadPool<Task, Queue>& GenericThreadPool<Task, Queue>::operator=
151144{
152145 if (this != &rhs)
153146 {
154- m_idle_workers = std::move (rhs.m_idle_workers );
155- m_workers = std::move (rhs.m_workers );
156- m_rouser = std::move (rhs.m_rouser );
157147 m_failed_wakeup_retry_cap = rhs.m_failed_wakeup_retry_cap ;
158148 m_next_worker = rhs.m_next_worker .load ();
159- m_num_busy_waiters = rhs.m_num_busy_waiters .load ();
149+ m_rouser = std::move (rhs.m_rouser );
150+ m_state = std::move (rhs.m_state );
160151 }
161152
162153 return *this ;
@@ -165,16 +156,22 @@ inline GenericThreadPool<Task, Queue>& GenericThreadPool<Task, Queue>::operator=
165156template <typename Task, template <typename > class Queue >
166157inline GenericThreadPool<Task, Queue>::~GenericThreadPool ()
167158{
168- m_rouser.stop ();
159+ if (!m_state || !m_rouser)
160+ return ;
169161
170- for (auto & worker_ptr : m_workers)
162+ m_rouser->stop ();
163+
164+ for (auto & worker_ptr : m_state->workers ())
171165 worker_ptr->stop ();
172166}
173167
174168template <typename Task, template <typename > class Queue >
175169template <typename Handler>
176170inline bool GenericThreadPool<Task, Queue>::tryPost(Handler&& handler)
177171{
172+ if (!m_state || !m_rouser)
173+ throw std::runtime_error (" Attempting to invoke post on a moved object." );
174+
178175 return tryPostImpl (std::forward<Handler>(handler), m_failed_wakeup_retry_cap);
179176}
180177
@@ -195,13 +192,13 @@ inline bool GenericThreadPool<Task, Queue>::tryPostImpl(Handler&& handler, size_
195192 // is fully utilized (num active workers = argmin(num tasks, num total workers)).
196193 // If there aren't busy waiters, let's see if we have any idling threads.
197194 // These incur higher overhead to wake up than the busy waiters.
198- if (m_num_busy_waiters .load (std::memory_order_acquire) == 0 )
195+ if (m_state-> numBusyWaiters () .load (std::memory_order_acquire) == 0 )
199196 {
200- auto result = m_idle_workers .tryEmptyAny ();
197+ auto result = m_state-> idleWorkers () .tryEmptyAny ();
201198 if (result.first )
202199 {
203- auto success = m_workers [result.second ]->tryPost (std::forward<Handler>(handler));
204- m_workers [result.second ]->wake ();
200+ auto success = m_state-> workers () [result.second ]->tryPost (std::forward<Handler>(handler));
201+ m_state-> workers () [result.second ]->wake ();
205202
206203 // The above post will only fail if the idle worker's queue is full, which is an extremely
207204 // low probability scenario. In that case, we wake the worker and let it get to work on
@@ -219,24 +216,24 @@ inline bool GenericThreadPool<Task, Queue>::tryPostImpl(Handler&& handler, size_
219216 auto initialWorkerId = id;
220217 do
221218 {
222- if (m_workers [id]->tryPost (std::forward<Handler>(handler)))
219+ if (m_state-> workers () [id]->tryPost (std::forward<Handler>(handler)))
223220 {
224221 // The following section increases the probability that tasks will not be dropped.
225222 // This is a soft constraint, the strict task dropping bound is covered by the Rouser
226223 // thread's functionality. This code experimentally lowers task response time under
227224 // low thread pool utilization without incurring significant performance penalties at
228225 // high thread pool utilization.
229- if (m_num_busy_waiters .load (std::memory_order_acquire) == 0 )
226+ if (m_state-> numBusyWaiters () .load (std::memory_order_acquire) == 0 )
230227 {
231- auto result = m_idle_workers .tryEmptyAny ();
228+ auto result = m_state-> idleWorkers () .tryEmptyAny ();
232229 if (result.first )
233- m_workers [result.second ]->wake ();
230+ m_state-> workers () [result.second ]->wake ();
234231 }
235232
236233 return true ;
237234 }
238235
239- ++id %= m_workers .size ();
236+ ++id %= m_state-> workers () .size ();
240237 } while (id != initialWorkerId);
241238
242239 // All Queues in our thread pool are full during one whole iteration.
@@ -249,8 +246,8 @@ inline size_t GenericThreadPool<Task, Queue>::getWorkerId()
249246{
250247 auto id = Worker<Task, Queue>::getWorkerIdForCurrentThread ();
251248
252- if (id > m_workers .size ())
253- id = m_next_worker.fetch_add (1 , std::memory_order_relaxed) % m_workers .size ();
249+ if (id > m_state-> workers () .size ())
250+ id = m_next_worker.fetch_add (1 , std::memory_order_relaxed) % m_state-> workers () .size ();
254251
255252 return id;
256253}
0 commit comments