-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathThreadPool.h
More file actions
69 lines (56 loc) · 1.65 KB
/
ThreadPool.h
File metadata and controls
69 lines (56 loc) · 1.65 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
#pragma once
#include "TSQueue.h"
#include <atomic>
#include <functional>
#include <memory>
#include <thread>
#include <vector>
/// Fixed-size thread pool in which every worker thread owns one queue.
///
/// The constructor starts one worker per hardware thread (at least one).
/// Tasks are spread over the per-worker queues in round-robin order, and each
/// worker invokes the tasks it pops from its own queue. The destructor closes
/// every queue and joins every worker.
///
/// @tparam T Task type stored in the queues. It must be default-constructible
///           and callable with no arguments (see worker_loop). addTask hands a
///           std::function<void()> to TSQueue<T>::insert, so T is presumably
///           std::function<void()> or constructible from it -- verify against
///           TSQueue.h.
template <typename T> class ThreadPool {
private:
    // One queue per worker; queueList[i] is consumed only by workers[i].
    std::unique_ptr<TSQueue<T>[]> queueList;
    std::vector<std::thread> workers;
    // Number of queues/workers; always >= 1 once the constructor has run.
    int maxThreads{0};
    // Index of the queue that receives the next task (round-robin cursor).
    std::atomic<int> pos{0};

    /// Body of worker number `ind`: runs tasks until pop() returns false.
    /// NOTE(review): pop() presumably blocks until a task is available and
    /// returns false once the queue has been closed -- confirm in TSQueue.h.
    void worker_loop(int ind) {
        T task;
        while (queueList[ind].pop(task)) {
            task();
        }
    }

    /// Closes every queue, then joins every worker that is still joinable.
    void shutdown() {
        for (int i = 0; i < maxThreads; i++) {
            queueList[i].close();
        }
        for (auto &th : workers) {
            if (th.joinable()) {
                th.join();
            }
        }
    }

public:
    /// Creates the queues and starts the worker threads.
    ///
    /// If starting a thread throws, the workers started so far are shut down
    /// and joined before the exception is rethrown.
    ThreadPool() {
        // hardware_concurrency() is only a hint and may return 0 when the value
        // cannot be determined. Falling back to one worker keeps the pool
        // usable and keeps the modulo in addTask well defined.
        const unsigned int hardwareThreads = std::thread::hardware_concurrency();
        maxThreads = hardwareThreads == 0 ? 1 : static_cast<int>(hardwareThreads);

        queueList = std::make_unique<TSQueue<T>[]>(maxThreads);
        workers.reserve(maxThreads);
        try {
            for (int i = 0; i < maxThreads; i++) {
                // Constructs the std::thread in place; it runs
                // this->worker_loop(i).
                workers.emplace_back(&ThreadPool::worker_loop, this, i);
            }
        } catch (...) {
            // The destructor does not run for a partially constructed object,
            // and destroying a joinable std::thread calls std::terminate, so
            // the already-started workers must be stopped here.
            shutdown();
            throw;
        }
    }

    // The worker threads hold a pointer to this object, so it must never be
    // copied or moved.
    ThreadPool(const ThreadPool &) = delete;
    ThreadPool &operator=(const ThreadPool &) = delete;
    ThreadPool(ThreadPool &&) = delete;
    ThreadPool &operator=(ThreadPool &&) = delete;

    /// Enqueues `task` on the next queue in round-robin order.
    ///
    /// @param task Callable to run on a worker thread; an empty
    ///             std::function is ignored.
    void addTask(std::function<void()> task) {
        if (!task) {
            return;
        }
        // Claim the current slot and advance the cursor atomically. On failure
        // compare_exchange_strong reloads currentPos with the value another
        // thread stored, so the next iteration recomputes nextPos from it.
        int currentPos = pos.load();
        int nextPos;
        do {
            nextPos = (currentPos + 1) % maxThreads;
        } while (!pos.compare_exchange_strong(currentPos, nextPos));
        queueList[currentPos].insert(std::move(task));
    }

    /// Closes every queue and waits for all workers to finish.
    ~ThreadPool() {
        shutdown();
    }
};