-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmain.cpp
More file actions
126 lines (112 loc) · 3.87 KB
/
main.cpp
File metadata and controls
126 lines (112 loc) · 3.87 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
#include <iostream>
#include <thread>
#include <vector>
#include <string>
#include <queue>
#include <stdexcept>
#include <functional>
#include <mutex>
#include <condition_variable>
#include <atomic>
// lvalue - existing objects
// rvalue - temp objects
class ThreadPool {
private:
int pool_size_;
std::vector<std::thread> thread_pool_;
std::queue<std::function<void()>> tasks_;
std::mutex mtx;
std::condition_variable cv;
// Using atomic for thread-safe flag operations
std::atomic<bool> stop_;
public:
ThreadPool(int pool_size) : pool_size_{ pool_size }, stop_(false) {
try {
for (int i = 0; i < pool_size_; i++) {
// using emplace_back instead of push_back(copying operation) because it's moving operation
// [this] - lambda capture clause
thread_pool_.emplace_back([this] {
while(true) {
std::function<void()> task;
{
std::unique_lock<std::mutex> lock(mtx);
// - on false we wait, releasing lock and go to sleep
// and wait on notify to check condition again;
// - on true we process the task
cv.wait(lock, [this] {return !tasks_.empty() || stop_; });
if (stop_ || tasks_.empty()) {
return; // exit thread
}
// move from ownership of reference from
// queue to local variable
task = std::move(tasks_.front());
tasks_.pop();
}
// cv.notify_one(); // notify any thread
try {
task(); // Execute the task
} catch (const std::exception& e) {
// Handle the exception (e.g., log it)
std::cerr << "Task execution error: " << e.what() << std::endl;
}
}
});
}
// Catch all exception no matter the type
} catch (...) {
shutdown();
// without argument re-throws the original exception
throw;
}
}
~ThreadPool() {
shutdown();
}
void shutdown() {
{
std::unique_lock<std::mutex> lock(mtx);
stop_ = true;
}
cv.notify_all(); // Wake all threads to check for stop condition
for (std::thread &th : thread_pool_) {
if (th.joinable()) {
th.join(); // Wait for all threads to finish
}
}
}
void enqueue(std::function<void()> task) {
{
std::unique_lock<std::mutex> lock(mtx);
if (stop_) {
throw std::runtime_error("Cannot enqueue on stopped ThreadPool");
}
tasks_.push(std::move(task));
}
cv.notify_one();
}
};
void print_hw() {
std::cout << "Hello world" << std::endl;
}
void print_hw_i(int i) {
try {
std::string text = "Hello world " + std::to_string(i) + "\r\n";
std::cout << text;
} catch (const std::exception& e) {
std::cerr << " Caught an exception: " << e.what() << std::endl;
} catch (...) {
std::cout << "Caught an unknown exception" << std::endl;
}
}
int main() {
ThreadPool tp(4);
tp.enqueue(print_hw);
// lambda function
tp.enqueue([](){ print_hw_i(1); });
tp.enqueue([](){ print_hw_i(2); });
tp.enqueue([](){ print_hw_i(3); });
// wait a bit for tasks to proceed or race condition happens with shutdown
std::this_thread::sleep_for(std::chrono::milliseconds(100));
tp.shutdown();
return 0;
}