-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathTaskController.cpp
More file actions
130 lines (112 loc) · 3.13 KB
/
TaskController.cpp
File metadata and controls
130 lines (112 loc) · 3.13 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
/*
* Copyright (C) 2021 Ilya Entin
*/
#include "TaskController.h"
#include "Logger.h"
#include "ServerOptions.h"
#include "Task.h"
// The single controller instance; assigned in create() and released in destroy().
TaskControllerPtr TaskController::_instance;
// Current processing phase. Starts at PREPROCESSTASK and is switched
// between PREPROCESSTASK and PROCESSTASK in onCompletion().
TaskController::Phase TaskController::_phase = PREPROCESSTASK;
// Locked in create() while the instance is being set up.
std::mutex TaskController::_mutex;
// Constructs the controller: the barrier and the thread pool are both
// sized by ServerOptions::_numberWorkThreads, and onTaskCompletion is
// handed to the barrier as its phase-completion callback.
TaskController::TaskController() :
_barrier(ServerOptions::_numberWorkThreads, onTaskCompletion),
_threadPool(ServerOptions::_numberWorkThreads) {
// Start with a default-constructed task so _task is never null
// before the first real task is taken from the queue.
_task = std::make_shared<Task>();
}
// Returns a non-owning handle to the controller instance.
// If no instance exists (create() has not been called yet, or destroy()
// has already run) an empty weak pointer is returned instead of
// dereferencing a null _instance; callers detect this via lock()/expired().
TaskControllerWeakPtr TaskController::getWeakPtr() {
  if (!_instance)
    return TaskControllerWeakPtr();
  return _instance->weak_from_this();
}
// Barrier completion callback: invoked by one of the threads
// when the current barrier phase completes. Forwards to the
// instance, doing nothing if the instance is not set.
void TaskController::onTaskCompletion() noexcept {
  if (!_instance)
    return;
  _instance->onCompletion();
}
// Advances the two-phase cycle after a phase has completed.
// PREPROCESSTASK -> PROCESSTASK: optionally sorts and rewinds the task.
// PROCESSTASK -> PREPROCESSTASK: finishes the task and installs the next one.
void TaskController::onCompletion() {
  const auto completedPhase = _phase;
  if (completedPhase == PREPROCESSTASK) {
    if (Task::_preprocessRequest) {
      _task->sortIndices();
      _task->resetIndex();
    }
    _phase = PROCESSTASK;
  }
  else if (completedPhase == PROCESSTASK) {
    _task->finish();
    // Blocks until the new task is available (or the controller is stopped).
    setNextTask();
    _task->resetIndex();
    _phase = PREPROCESSTASK;
  }
}
bool TaskController::start() {
for (int i = 0; i < ServerOptions::_numberWorkThreads; ++i) {
auto worker = std::make_shared<Worker>(_instance);
_threadPool.push(worker);
}
return true;
}
// Appends a task to the pending queue and wakes one thread
// waiting on the queue condition. The notification is issued
// while the queue mutex is still held.
void TaskController::push(TaskPtr task) {
  std::lock_guard queueGuard(_queueMutex);
  _queue.push(task);
  _queueCondition.notify_one();
}
// Queues the task and blocks the caller until the task's promise
// is satisfied (presumably by Task::finish() -- confirm in Task).
void TaskController::processTask(TaskPtr task) {
  // The future is obtained before the task is queued.
  auto completion = task->getPromise().get_future();
  push(task);
  completion.get();
}
// Waits until a task is queued or the controller is stopped.
// On stop, _task is left unchanged; otherwise the task at the
// front of the queue becomes the current task.
void TaskController::setNextTask() {
  std::unique_lock queueLock(_queueMutex);
  while (_queue.empty() && !_stopped)
    _queueCondition.wait(queueLock);
  if (!_stopped) {
    _task = _queue.front();
    _queue.pop();
  }
}
// Creates the controller instance and starts its workers.
// Safe to call more than once: if the instance already exists this is a
// no-op returning true. Previously a repeated call re-ran start() on the
// existing instance, pushing another full set of workers even though the
// barrier was constructed for exactly ServerOptions::_numberWorkThreads
// participants.
// Returns the result of start() on first creation, true otherwise.
bool TaskController::create() {
  std::lock_guard lock(_mutex);
  if (_instance)
    return true;
  _instance = std::make_shared<TaskController>();
  return _instance->start();
}
// Signals shutdown and stops the thread pool.
void TaskController::stop() {
  {
    // Set the flag under the queue mutex so that a thread waiting in
    // setNextTask() cannot miss the wake-up.
    std::lock_guard queueGuard(_queueMutex);
    _stopped.store(true);
    _queueCondition.notify_one();
  }
  // The queue mutex is released before the pool is stopped.
  _threadPool.stop();
}
// Stops the controller, if one exists, and releases the
// static reference to it.
void TaskController::destroy() {
  if (_instance) {
    _instance->stop();
  }
  // Drop the static ownership of the controller.
  _instance.reset();
}
// Constructs a worker that refers to its controller through a weak
// pointer; run() locks it before use.
// The Runnable base receives ServerOptions::_numberWorkThreads
// (NOTE(review): presumably a thread-count limit -- confirm in Runnable).
TaskController::Worker::Worker(TaskControllerWeakPtr taskController) :
Runnable(ServerOptions::_numberWorkThreads),
_taskController(taskController) {}
// Worker thread body. All workers cooperate on the current task (batch
// of requests): each one drains work for the current phase, then arrives
// at the barrier and waits there until the phase is complete.
// The loop ends once the controller's stop flag is observed.
// NOTE(review): a worker that sees the stop flag leaves without arriving
// at the barrier -- confirm the remaining workers cannot be left waiting.
void TaskController::Worker::run() noexcept {
  auto taskController = _taskController.lock();
  if (!taskController)
    return;
  auto& stopped = taskController->_stopped;
  auto& task = taskController->_task;
  auto& barrier = taskController->_barrier;
  while (!stopped) {
    if (_phase == PROCESSTASK) {
      while (task->processNext()) {
      }
    }
    else if (_phase == PREPROCESSTASK) {
      if (Task::_preprocessRequest) {
        while (task->preprocessNext()) {
        }
      }
    }
    else {
      // Unknown phase: nothing to do, re-check the stop flag.
      continue;
    }
    // Sync point shared by both phases.
    barrier.arrive_and_wait();
  }
}