-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathWaitQueue.h
More file actions
103 lines (86 loc) · 3.12 KB
/
Copy pathWaitQueue.h
File metadata and controls
103 lines (86 loc) · 3.12 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
#pragma once
#include "SpinLock.h"
#include <future>
#include <deque>
#include <thread>
#include <utility>
#include <stdexcept>
namespace Utility
{
/**
* WaitQueue.
* A re-entrant mutex with enqueued clients (waiters).
* The promise at the front is fulfilled for acquire(), and
* remains on the queue as a placeholder until release(), at
* which point it is dismissed and the successor is fulfilled.
*/
class WaitQueue
{
public:
WaitQueue() = default;
// promises still on the queue will set_exception() on their futures
~WaitQueue() noexcept = default;
void lock() { append( std::this_thread::get_id() ).wait(); }
void unlock() { remove( std::this_thread::get_id() ); }
void acquire() { lock(); }
void release() { unlock(); }
private:
using ThdId = std::thread::id;
using Promise = std::promise<void>; // producer of ready signal
using Future = std::future<void>; // consumer of ready signal
using Pair = std::pair<ThdId, Promise>;
using Queue = std::deque<Pair>; // acquisition order
using Flag = std::atomic_flag;
Queue queue_;
ThdId curr_; // current holder of "lock"
Promise rpt_; // non-queue placeholder for re-entrancy
unsigned count_; // re-entrancy count
Flag flag_{ATOMIC_FLAG_INIT}; // mutex
// enable front of queue and mark its id
void fulfill( Pair& pr )
{
curr_ = pr.first;
count_ = 1;
pr.second.set_value();
}
// re-entrancy is not cheap!
Future re_enter()
{
rpt_ = Promise();
++count_;
rpt_.set_value();
return rpt_.get_future();
}
// new promise, fulfilled immediately if the only one
Future append( ThdId id )
{
SpinLock _lock(flag_);
if ( curr_ == id ) { return re_enter(); }
queue_.emplace_back( std::make_pair( id, Promise() ) );
// check empty -> non-empty transition
if ( queue_.size() == 1 ) { fulfill( queue_.front() ); }
return queue_.back().second.get_future();
}
// unblock placeholder, fulfill next promise if present
void remove( ThdId id )
{
SpinLock _lock(flag_);
if ( curr_ != id ) { throw std::runtime_error("release by wrong thread"); }
if ( --count_ == 0 )
{
queue_.pop_front();
if ( queue_.empty() ) { curr_ = ThdId(); }
else { fulfill( queue_.front() ); }
}
}
WaitQueue(WaitQueue const&) = delete;
WaitQueue& operator=( WaitQueue ) = delete;
};
class Ticket
{
WaitQueue& wq_;
public:
explicit Ticket(WaitQueue& wq) : wq_(wq) { wq.lock(); }
~Ticket() noexcept { wq_.unlock(); }
};
} // namespace Utility