-
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathdispatch_queue.cpp
More file actions
135 lines (112 loc) · 2.21 KB
/
dispatch_queue.cpp
File metadata and controls
135 lines (112 loc) · 2.21 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
#include "dispatch_queue.hpp"
namespace dispatch_queue {
namespace detail {
/// Destructor: runs shutdown() so the worker threads are stopped and joined
/// before the pool's members are destroyed.
worker_pool::~worker_pool() {
    this->shutdown();
}
int worker_pool::thread_count() const {
return worker_threads.size();
}
size_t worker_pool::size() {
std::lock_guard<std::mutex> lk(mutex);
return task_queue.size();
}
void worker_pool::enqueue_task(std::function<void()>&& task) {
{
std::lock_guard<std::mutex> lk(mutex);
task_queue.push_back(std::move(task));
}
condition_variable.notify_one();
}
void worker_pool::clear() {
std::lock_guard<std::mutex> lk(mutex);
task_queue.clear();
}
/// Stops and joins every worker thread, leaving worker_threads empty.
///
/// Returns immediately when there are no worker threads. Otherwise the
/// shutdown flag is raised, all waiting workers are woken, each joinable
/// thread is joined, and finally the flag is reset.
///
/// Tasks still sitting in task_queue are not executed: run_task_loop()
/// returns as soon as it observes the flag, without popping further tasks.
void worker_pool::shutdown() {
    if (worker_threads.empty()) {
        return;
    }
    {
        // Raise the flag under the mutex so the write is ordered with the
        // predicate check performed by workers inside condition_variable.wait().
        std::lock_guard<std::mutex> lk(mutex);
        is_shutting_down = true;
    }
    // Wake every waiting worker in one call instead of issuing one
    // notify_one() per thread; workers that are busy running a task will see
    // the flag the next time they evaluate the wait predicate.
    condition_variable.notify_all();
    for (auto& thread : worker_threads) {
        if (thread.joinable()) {
            thread.join();
        }
    }
    worker_threads.clear();
    // All workers have been joined, so nothing in this pool reads the flag
    // concurrently any more; reset it.
    is_shutting_down = false;
}
/// Worker loop: repeatedly takes the front task from task_queue and runs it.
///
/// Blocks on the condition variable until either the shutdown flag is set or
/// the queue is non-empty. Returns as soon as the shutdown flag is observed,
/// even if tasks remain queued.
void worker_pool::run_task_loop() {
    // Wake-up condition evaluated by wait() with the mutex held.
    const auto should_wake = [this]() {
        return is_shutting_down || !task_queue.empty();
    };

    for (;;) {
        std::function<void()> next;
        {
            std::unique_lock<std::mutex> guard(mutex);
            condition_variable.wait(guard, should_wake);
            if (is_shutting_down) {
                return;
            }
            next = std::move(task_queue.front());
            task_queue.pop_front();
        }
        // The lock has been released here, so the task runs without holding
        // the pool mutex.
        next();
    }
}
} // end namespace detail
// Default constructor: delegates to dispatch_queue(int thread_count) with a
// thread count of 0.
// NOTE(review): presumably a count of 0 yields a queue without a worker pool
// (is_threaded() == false) — confirm against the two-argument constructor.
dispatch_queue::dispatch_queue()
: dispatch_queue(0)
{
}
// Constructs a queue for the given thread count by delegating to the
// two-argument constructor with a lambda that takes an int and does nothing.
// NOTE(review): the second argument is presumably a per-thread start-up
// callback receiving the thread index — confirm against the header.
dispatch_queue::dispatch_queue(int thread_count)
: dispatch_queue(thread_count, [](int){})
{
}
/// Destructor: runs shutdown(), which clears pending tasks and deletes the
/// worker pool if one exists.
dispatch_queue::~dispatch_queue() {
    this->shutdown();
}
/// Returns true when this queue currently holds a worker pool (the
/// worker_pool pointer is non-null), false otherwise.
bool dispatch_queue::is_threaded() const {
    const bool has_pool = (nullptr != worker_pool);
    return has_pool;
}
/// Returns the worker pool's thread count, or 0 when there is no worker pool.
int dispatch_queue::thread_count() const {
    return worker_pool ? worker_pool->thread_count() : 0;
}
/// Returns the number of queued tasks.
///
/// With a worker pool present the count comes from the pool; otherwise it is
/// the size of this object's own task_queue.
size_t dispatch_queue::size() const {
    if (!worker_pool) {
        return task_queue.size();
    }
    return worker_pool->size();
}
/// Returns true when size() reports zero queued tasks.
bool dispatch_queue::empty() const {
    const size_t pending = size();
    return pending == 0;
}
/// Discards all queued tasks.
///
/// With a worker pool present the pool's queue is cleared; otherwise this
/// object's own task_queue is cleared.
void dispatch_queue::clear() {
    if (!worker_pool) {
        task_queue.clear();
        return;
    }
    worker_pool->clear();
}
/// Dispatches an empty task and blocks until the object returned by
/// dispatch() reports completion through its wait().
// NOTE(review): presumably this returns once previously queued work has been
// processed; that depends on dispatch(), which is not defined in this file —
// confirm, in particular for a queue without a worker pool.
void dispatch_queue::wait() {
    auto marker = dispatch([]() {});
    marker.wait();
}
/// Discards pending tasks and tears down the worker pool, if any.
///
/// clear() runs first. When a worker pool exists it is then deleted (its
/// destructor calls worker_pool::shutdown()) and the pointer is reset to
/// nullptr.
void dispatch_queue::shutdown() {
    clear();
    if (!worker_pool) {
        return;
    }
    delete worker_pool;
    worker_pool = nullptr;
}
}