Threads & Tasks
多线程是指在一个进程中同时有多个线程运行, 这些线程间可能是独立的, 也可能进行通信. C++多线程有2种基本方式:
- Threads: 用
std::thread
创建, 没有返回值 - Tasks: 用
std::async
创建, 有返回值
无论哪种方式, 都可以使用以下3种方法传参:
- pointer to function
- functors
- lambda function
例如有一个求和函数sum(int& s, int l, int r)
,
单线程可能耗时很久, 因此可以用多个线程同时计算多个部分和,
最终累加部分和即可.
1 | // create and start each thread |
如果需要很多线程, 可以用数组: 1
2
3
4
5
6
7
8for (int i = 0; i < 10; ++i) {
threads.push_back(thread(sum, std::ref(partialSum[i]), i * step, (i + 1) * step));
}
for (thread &t : threads) {
if (t.joinable()) {
t.join();
}
}
除了函数指针, 还可以用Functor Object.
仿函数是实现了operator()
的class/struct
:
1 | struct cmp { |
通过仿函数传参: 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15class sumFunctor {
public:
void operator() (int l, int r) {
_sum = 0;
for (int i = l; i < r; ++i) {
_sum += i;
}
}
int _sum;
};
for (int i = 0; i < 10; ++i) {
sumFunctor* functor = new sumFunctor();
threads.push_back(thread(std::ref(*functor), i * step, (i + 1) * step));
}
thread()
构造函数的第一个参数可以有2种选择:
- functor object: 后续无需使用仿函数的成员变量
- ref(functor obj): 后续需要使用成员变量
除了用引用搞定返回值, 还可以将其存储到成员变量中供后续使用.
第3种方式是用lambda function传参: 1
2
3
4
5
6
7for (int i = 0; i < 10; ++i) {
threads.push_back(thread([i, &partialSum, step] {
for (int j = i * step; j < (i + 1) * step; ++j) {
partialSum[i] += j;
}
}));
}
如果sum
确实需要返回值T
,
可以使用std::async
生成返回值future<T>
:
1
2
3
4
5
6
7
8
9vector<future<int>> tasks;
for (int i = 0; i < 10; ++i) {
// create, start and push each task
tasks.push_back(std::async(sum, i * step, (i + 1) * step));
}
// if future value not ready, main() blocks. similar to join()
for (auto& t : tasks) {
total += t.get(); // wait for tasks to end and read return values
}
Mutex & Conditional Variables
由于多个线程可能需要同时访问共享变量, 因此会产生race condition,
对最终结果造成不确定性. 最经典的例子便是计数器: 1
2
3
4
5
6
7
8
9
10int cnt = 0;
void increase() {
for (int i = 0; i < 100; ++i) {
++cnt;
}
}
for (int i = 0; i < 100; ++i) {
threads.push_back(thread(increase));
}cnt
增加100, 但是最终结果却未必是10000.
核心原因在于++cnt
不是原子操作, 可以粗略拆为3个阶段: 读取,
加一, 写入. 因此2个线程的执行顺序可以是: 1
2t1: read(0) inc write(1)
t2: read(0) inc write(1)1
2t1: read1 read2 inc write
t2: read inc write
可以发现: 核心在于多个线程需要竞争访问共享资源, 并且线程间执行顺序不合法, 导致undefined behavior. 解决方案有3种:
- mutex and locks: mutex lock/unlock, lock_guard, unique_lock, shared_lock, scoped_lock
std::atomic
: memory models- abstraction: CSP, Actors, Map-Reduce
所谓mutex(mutual exclusion), 本质上还是为了在同一时间只有单个线程访问共享资源, 使得共享资源的访问原子化.
所以计数器程序可以更正为: 1
2
3
4
5
6
7
8
9std::mutex mtx; // mutex is shared by all threads
int cnt; // shared memory
void increase() {
for (int i = 0; i < 100; ++i) {
mtx.lock();
++cnt; // critical section
mtx.unlock();
}
}
- 忘记
unlock
, 其他线程永远无法进入临界区 - 如果临界区抛出异常, 无法调用
unlock
需要注意的是:
lock()
两次或unlock()
两次都是未定义行为,
应当避免.
为了减少意外,
C++提供了lock_guard
可以自动lock/unlock
:
1
2
3
4
5
6
7
8std::mutex mtx; // mutex is shared by all threads
int cnt; // shared memory
void increase() {
for (int i = 0; i < 100; ++i) {
lock_guard<mutex> guard(mtx);
++cnt;
}
}
lock_guard
没有提供lock/unlock
接口,
不够灵活. 因此C++引入了unique_lock
: lock_guard
+ lock/unlock
接口. 1
2
3
4
5
6
7
8
9
10std::mutex mtx; // mutex is shared by all threads
int cnt; // shared memory
void increase() {
for (int i = 0; i < 100; ++i) {
unique_lock<mutex> ul(mtx);
++cnt;
ul.unlock();
ul.lock();
}
}
为了提高并发效率, 可以让多个线程同时读取,
于是就有了shared_lock
: 1
2
3
4
5
6
7
8
9
10
11
12
13
14std::shared_mutex mtx;
int cnt; // shared memory
void increase() {
for (int i = 0; i < 100; ++i) {
unique_lock<shared_mutex> ul(mtx); // unique_lock for writers
++cnt;
}
}
void reader() {
for (int i = 0; i < 100; ++i) {
shared_lock<shared_mutex> ul(mtx); // shared_lock for readers
cout << cnt;
}
}
如果需要多个互斥量, 那么要注意避免死锁: 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16std::mutex mtx1, mtx2;
int cnt;
void increase1() {
for (int i = 0; i < 100; ++i) {
lock_guard<mutex> lock1(mtx1);
lock_guard<mutex> lock2(mtx2);
++cnt;
}
}
void increase2() {
for (int i = 0; i < 100; ++i) {
lock_guard<mutex> lock2(mtx2);
lock_guard<mutex> lock1(mtx1);
++cnt;
}
}lock()
, 通过all or nothing避免死锁:
1
2
3
4
5
6
7
8
9void increase() {
for (int i = 0; i < 100; ++i) {
lock(mtx1, mtx2); // lock both mutexes without deadlock
// make sure the locked mutexes unlocked at the end of scope
lock_guard<mutex> lock1(mtx1, std::adopt_lock);
lock_guard<mutex> lock2(mtx2, std::adopt_lock);
++cnt;
}
}scoped_lock
, 仍然通过RAII的方式避免手动解锁:
1
2
3
4
5
6void increase() {
for (int i = 0; i < 100; ++i) {
scoped_lock lck(mtx1, mtx2);
++cnt;
}
}
ok, 到此为止解决了互斥访问的问题, 接下来解决线程间通信的问题, 由此引入条件变量. 线程间通信最经典的例子就是生产者-消费者问题, 当准备好数据后, 生产者需要某种方式通知消费者:
- 共享内存: 设置共享变量
data
和ready
. 生产者准备好数据后, 将ready=true
; 消费者忙等监测ready
的状态. - 条件变量: 为了提高效率, 生产者准备好数据后通过条件变量发送通知; 消费者接到通知后才唤醒. It is all about sending a message.
1 | mutex mtx; |
注意到wait
是需要2件事才能唤醒的:
- 收到
cv
的通知:cv.notify_one()
ready == true
之所以仍然需要ready
变量,
是因为wait
可能出现虚假唤醒的情况.
生产者消费者
本质上是通过引入缓冲区来平衡生产速度和消费速度,包括单生产者-单消费者,单生产者-多消费者,多生产者-单消费者和多生产者-多消费者。
有几个需要注意的地方: 1
2
3while (q.empty()) {
cv.wait(lock);
}while
而不是if
,因为wait
从阻塞到返回并不一定是由notify_one()
造成的,还可能由于其他原因导致,即伪唤醒,这样就会导致后续执行出错。
由于单独使用mutex
可能导致死锁,因此选择使用unique_lock
管理互斥锁。之所以不能用lock_guard
,因为lock_guard
只能是在构造时自动调用lock()
上锁,析构时自动释放锁,即所谓的RAII(资源分配即初始化),没有lock()
和unlock()
接口供程序员使用。
多线程顺序打印
1 |
|
线程安全的队列
STL的std::queue<T>
并不是线程安全的,举个栗子:
1 | if (!queue.empty()) { |
1 |
|
Ref
The Producer
Consumer Problem in C++
producer-consumer
生产者消费者模型
C++
multithread
使用C++手写线程安全队列