C++:并发编程入门笔记
标准库
下面介绍一些常用的标准库,包括 C 的库,涵盖了多线程及其通信方式。
请勿试图记住他们,先粗略看一下有哪些 API,然后马上开始实战环节,用到再上来查。
pthread(POSIX 线程库)
考虑到 std::thread
是对 POSIX 标准的 pthread
库的封装。故从后者讲起。
头文件:#include <pthread.h>
链接参数:-pthread
创建线程
- pthread_create
-
作用:创建一个线程并开始执行
-
签名:
1int pthread_create(pthread_t *restrict thread, 2 const pthread_attr_t *restrict attr, 3 void *(*start_routine)(void *), 4 void *restrict arg);
注:restrict 告诉编译器只有这个指针,或者从它派生的指针(指针+ 1)才能访问指针指向的内容。
-
thread 是线程句柄。我们不需要自己生成,调用完会填充进去。
-
attr 是线程属性。一般填 NULL
-
start_routine 是线程所执行的函数地址
-
arg 是线程所执行的函数的参数
-
等待线程
- pthread_join
-
作用:阻塞当前线程,直到指定的线程结束
-
签名:
1int pthread_join(pthread_t thread, void **retval);
-
thread 是被等待线程句柄。
-
retval 是被等待线程返回值的指针。填 NULL 表示不需要此返回值。
-
返回值:0 成功,非 0 失败
-
退出线程
- pthread_exit
-
作用:结束当前线程
-
签名:
1void pthread_exit(void *retval);
-
retval 是线程返回值。
-
自取线程句柄
- pthread_self
-
作用:获取当前线程句柄
-
签名:
1pthread_t pthread_self(void);
-
返回值:当前线程句柄
-
让线程脱管
- pthread_detach
-
作用:让线程自动结束
-
(join 可以近似理解为 wait)线程默认的状态是 joinable,如果一个线程结束运行但没有被 join,则它的状态类似僵尸进程,因为退出状态码尚未被回收,所以创建线程者应该调用 pthread_join 来等待线程运行结束,并可得到线程的退出代码,回收其资源(类似于wait, waitpid)
-
如果不希望子线程阻塞当前线程,可以在当前线程调用
pthread_detach(child_thread_id)
将子线程设置为 detached 状态,子线程将在运行后自动释放所有资源。(也可以在子线程中调用pthread_detach(pthread_self())
,效果等同) -
签名:
1int pthread_detach(pthread_t thread);
-
thread 是线程句柄。
-
返回值:0 成功,非 0 失败
-
清理
void pthread_cleanup_push(void (*callback)(void *), void *arg); void pthread_cleanup_pop(int execute);
shm(共享内存)
创建共享内存
1#include <sys/ipc.h>
2#include <sys/shm.h>
3
4int shmget(key_t key, size_t size, int shmflg);
-
作用:创建共享内存区域
-
参数:
-
key 是共享内存的 key。输出参数。
-
size 是共享内存的大小,单位是字节。
-
shmflg 是共享内存的标志,可以是 0 或者 IPC_CREAT。
-
-
返回值:如果成功,返回共享内存的 id,如果失败,返回 -1。
解除共享内存
1#include <sys/types.h>
2#include <sys/shm.h>
3
4int shmdt(const void *shmaddr);
-
作用:解除共享内存区域
-
参数:
- shmaddr 是共享内存的起始地址。
mmap(共享内存)
mmap
除了可以映射文件,也可以将内存区域映射。实际上它是 shm 的底层实现。
1void *mmap(void *addr, size_t length, int prot, int flags, int fd, off_t offset);
-
作用:映射内存区域
-
参数:
-
addr 是映射区域的起始地址。如果为 NULL,系统将自动分配。
-
length 是映射区域的大小,单位是字节。
-
prot 是映射区域的保护属性。可以是 PROT_READ、PROT_WRITE、PROT_EXEC。(读、写、执行)
- 也可以设置为 PROT_NONE。(不可访问)
-
flags 是映射区域的标志。
flags参数 说明 MAP_SHARED 进程间共享内存 MAP_PRIVATE 调用进程所私有。对该内存段的修改不会反映到映射文件 MAP_ANNOYMOUS 匿名映射 MAP_FIXED 内存段必须位于start参数指定的地址处,start必须是页大小的整数倍(4K整数倍) MAP_HUGETLB 按照大内存页面来分配内存空间 -
fd 是映射区域所在的文件的描述符。如果为 -1,则表示映射区域为内存区域。
-
offset 是映射区域所在的文件的偏移量。
-
匿名映射可以表示为全 0 的虚拟文件。匿名映射只是大型的、零填充的内存块,随时可供使用。这些映射驻留在堆之外,因此不会造成数据段碎片。详见 linux - What is the purpose of MAP_ANONYMOUS flag in mmap system call? - Stack Overflow
使用方法例子:
-
open 打开一个文件,然后 mmap 得到共享内存。这样进程间可以通过相同的文件名得到相同的共享内存。
-
匿名映射,用于有直接或间接父子关系的进程间。
pipe
对于父子进程,可以使用匿名管道通信。对于不相干的进程,可以采用命名管道的通信。
1#include <unistd.h>
2int pipe(int pipefd[2]);
3#define _GNU_SOURCE /* See feature_test_macros(7) */
4#include <fcntl.h> /* Definition of O_* constants */
5#include <unistd.h>
6int pipe2(int pipefd[2], int flags);
-
作用:创建管道
-
参数:
-
pipefd 是管道的句柄。输出参数。
-
flags 是管道的标志
-
O_CLOEXEC:设置管道的文件描述符为 close-on-exec 标志。意思是,当进程 exec 为另一个进程后,管道的文件描述符将被关闭。
-
O_NONBLOCK:设置管道的文件描述符为 non-blocking (非阻塞)标志。意思是,当进程读取管道时,如果管道中没有数据,则读取将立即返回,而不会等待。
-
O_RDONLY:设置管道的文件描述符为只读标志。
-
-
#include <sys/types.h>
#include <sys/stat.h>
int mkfifo(const char *pathname, mode_t mode);
-
作用:创建 FIFO 管道(命名管道)
-
参数:
-
pathname 是管道的路径名。
-
mode 是管道的权限。
-
-
返回值:0 成功,非 0 失败
此外,也可以用万能的 mknod(创建 inode)来创建 FIFO 管道。
需要注意,close 并不能关闭管道,而是关闭管道的文件描述符。必须通过 unlink 删除管道。
管道的读写:用 read 和 write 函数即可读写管道。其中,读写端分别是 fd[0]
和 fd[1]
。
管道的关闭:用 close 函数即可关闭管道。
管道的缺点:
-
只能承载字节流,需要自行设计协议来实现高级的通信。
-
管道默认大小比较小。不过新的 Linux 内核已经允许很大的容量。
消息队列
创建和获取
1#include <sys/msg.h>
2int msgget(key_t key, int msgflg);
-
作用:创建/获取消息队列
-
参数:
-
key 是消息队列的 key。若作为输出参数,则会生成一个新的消息队列。若作为输入参数,则会获取已有的消息队列。
-
msgflg 是消息队列的标志。
- IPC_CREAT 表示如果给定的 key 不存在,则创建一个新的消息队列。存在则忽略。
-
-
返回值:返回一个以 key 命名的消息队列句柄
-
例子:
1auto msgqid = msgget(IPC_PRIVATE, IPC_CREAT | 0600);
收发消息
1#include <sys/msg.h>
2int msgsnd(int msqid, const void *msgp, size_t msgsz, int msgflg);
-
作用:发送消息
-
参数:
-
msqid 是消息队列的句柄。
-
msgp 是消息的内容。
-
msgsz 是消息的大小。
-
msgflg 是消息的标志。
-
0:阻塞式发送。(如队满)
-
IPC_NOWAIT:非阻塞式发送。(如队满,直接返回错误)
-
IPC_NOERROR:消息超过 msgsz 直接截断。
-
-
-
返回值:0 成功,非 0 失败
1ssize_t msgrcv(int msqid, void *msgp, size_t msgsz, long msgtyp,
2 int msgflg);
-
作用:接收消息
-
参数:同上。
- msgflg
-
0:阻塞式接收。直到等待的类型消息到来。
-
IPC_NOWAIT:如果队列中没有请求类型的消息,则立即返回。系统调用失败,错误设置为 ENOMSG。
-
IPC_EXCEPT:与 msgtyp 一起使用,用于读取队列中消息类型与 msgtyp 不同的第一条消息。
-
IPC_NOERROR:如果长度超过 msgsz 字节,则截断消息文本。
-
- msgflg
-
返回值:0 成功,非 0 失败
msgp
的通行格式:
1struct msgbuf {
2 long mtype; /* message type, must be > 0 */
3 char mtext[1]; /* message data */
4};
删除消息
下面的代码删除一个消息队列:
1msgctl(msqid, IPC_RMID, 0)
函数原型:
1int msgctl(int msgid, int command, struct msgid_ds *buf);
-
作用:控制消息队列
-
参数:
-
msgid 是消息队列的句柄。
-
command 是控制命令。
- IPC_RMID 表示删除消息队列。
-
buf 是控制命令的参数。指向 msgid_ds 结构的指针,它指向消息队列模式和访问权限的结构。msgid_ds 结构至少包括以下成员:
-
-
返回值:0 成功,非 0 失败
1struct msgid_ds
2{
3 uid_t shm_perm.uid;
4 uid_t shm_perm.gid;
5 mode_t shm_perm.mode;
6};
线程同步
Mutex
1int pthread_mutex_init(
2 pthread_mutex_t *restrict mutex,
3 const pthread_mutexattr_t *restrict attr
4);
-
作用:初始化互斥锁
-
参数:
-
mutex 是互斥锁的指针。
-
attr 是互斥锁的属性。
-
-
返回值:0 成功,非 0 失败
1int pthread_mutex_destroy(pthread_mutex_t *mutex);
-
作用:销毁互斥锁
-
参数:
- mutex 是互斥锁的指针。
-
返回值:0 成功,非 0 失败
1int pthread_mutex_lock(pthread_mutex_t *mutex);
-
作用:锁定互斥锁
-
参数:
- mutex 是互斥锁的指针。
-
返回值:0 成功,非 0 失败
1int pthread_mutex_unlock(pthread_mutex_t *mutex);
-
作用:解锁互斥锁
-
参数:
- mutex 是互斥锁的指针。
-
返回值:0 成功,非 0 失败
1int pthread_mutex_trylock(pthread_mutex_t *mutex);
-
作用:尝试锁定互斥锁然后解锁
-
参数:
- mutex 是互斥锁的指针。
-
返回值:0 成功,非 0 失败
RWLock (SXLock)
读写锁,也称共享-独占锁。允许多个线程同时读取,但是只允许一个线程写入。
递归锁(Recursive Lock)也称为可重入互斥锁(reentrant mutex),是互斥锁的一种,同一线程对其多次加锁不会产生死锁。
1int pthread_rwlock_init(pthread_rwlock_t *restrict rwlock, const pthread_rwlockattr_t *restrict attr);
2int pthread_rwlock_destroy(pthread_rwlock_t *rwlock);
3int pthread_rwlock_rdlock(pthread_rwlock_t *rwlock);
4int pthread_rwlock_tryrdlock(pthread_rwlock_t *rwlock);
5int pthread_rwlock_wrlock(pthread_rwlock_t *rwlock);
6int pthread_rwlock_trywrlock(pthread_rwlock_t *rwlock);
7int pthread_rwlock_unlock(pthread_rwlock_t *rwlock);
1enum
2{
3 PTHREAD_RWLOCK_PREFER_READER_NP, // 读者优先,即必须所有读者都解锁,才能写入。且会被信新来的写者插队。(只要有读者,写者就会饥饿)
4 PTHREAD_RWLOCK_PREFER_WRITER_NP, // 写者优先,和读者优先基本一样,只不过新来的写者不能插队。
5 PTHREAD_RWLOCK_PREFER_WRITER_NONRECURSIVE_NP, // 写者优先,和读者优先基本一样,只不过新来的写者不能插队,并且不能重入。(一般设置为此值避免饥饿)
6 PTHREAD_RWLOCK_DEFAULT_NP = PTHREAD_RWLOCK_PREFER_READER_NP
7};
spinlock
在申请加锁的时候,会使得线程阻塞,阻塞的过程又分两个阶段,第一阶段是会先自旋,即不断地去申请锁,失败一定次数(spin count),线程会进入等待队列,并休眠。当锁可用时被唤醒。
适合实时性较高的场合。
1 int pthread_spin_init (__pthread_spinlock_t *__lock, int __pshared);
2 int pthread_spin_destroy (__pthread_spinlock_t *__lock);
3 int pthread_spin_trylock (__pthread_spinlock_t *__lock);
4 int pthread_spin_unlock (__pthread_spinlock_t *__lock);
5 int pthread_spin_lock (__pthread_spinlock_t *__lock);
Conditional Variable
条件变量用于等待变量满足条件后唤醒线程。
1 int pthread_cond_init(pthread_cond_t *restrict cond, const pthread_condattr_t *restrict attr);
2 int pthread_cond_destroy(pthread_cond_t *cond);
3
4 int pthread_cond_signal(pthread_cond_t *cond);
5 int pthread_cond_broadcast(pthread_cond_t *cond);
6
7 int pthread_cond_wait(pthread_cond_t *restrict cond, pthread_mutex_t *restrict mutex);
8 int pthread_cond_timedwait(pthread_cond_t *restrict cond, pthread_mutex_t *restrict mutex, const struct timespec *restrict abstime);
条件信号必须在临界区中抛出。
Semaphore
1 int sem_destroy(sem_t *sem);
2 int sem_init(sem_t *sem, int pshared, unsigned int value);
3
4 int sem_wait(sem_t *sem); // sem -= 1
5 int sem_post(sem_t *sem); // sem += 1
6
7 int sem_getvalue(sem_t *sem, int *valp); // valp 是输出参数
Barrier
1int pthread_barrier_init(pthread_barrier_t *barrier, const pthread_barrierattr_t *restrict attr, unsigned count);
2int pthread_barrier_destroy(pthread_barrier_t *barrier);
3int pthread_barrier_wait(pthread_barrier_t *barrier);
std::thread
线程对象
std::thread
表示一个线程对象。一旦构造就启动线程并执行:
1 std::thread t1(
2 [](const char* name) {
3 std::cout << "Hello, " << name << "!" << std::endl;
4 },
5 "world");
6 t1.join();
除了可以传入 lambda 表达式,还可以传入:
-
仿函数对象。
-
函数指针。
-
成员函数。(则第一个参数必须是对象指针)
推荐阅读 C++并发:从std::thread()开始 | xinlu’s blog
如何获取线程的ID?
1 std::thread::id id = std::this_thread::get_id();
如何获取线程的返回值?
这就需要用 future 了
future
1 std::future<int> f = std::async(std::launch::async, []() {
2 return 42;
3 });
4 int result = f.get();
-
std::async
用于异步执行任务,并返回一个std::future
对象。 -
std::lauch::async
表示异步启动策略 -
std::launch::deferred
表示延迟启动策略,即等待调用std::future::get
才启动线程。 -
future_obj.get
用于获取异步执行的结果。 -
future_obj.wait
用于等待异步执行的结果。
mutex
C++ 提供这几种互斥锁:
-
std::mutex: 不可重入的互斥锁。
-
std::recursive_mutex: 可重入的互斥锁。
-
std::timed_mutex: 带有超时的互斥锁。
- 例子:
1{
2 std::timed_mutex m;
3 if(!m.try_lock_for(std::chrono::seconds(1)))
4 throw std::runtime_error("timeout");
5}
- std::recursive_timed_mutex: 带有超时的可重入的互斥锁。
相关辅助的类型:
-
std::lock_guard: 提供了基于作用域的上锁和自动释放。
- 例子:
1{ 2 std::mutex m; 3 std::lock_guard<std::mutex> lk(m); 4 // do something 5} 6// lk goes out of scope, m is unlocked
-
std::unique_lock: 允许延迟锁定、时间受限的锁定尝试、递归锁定、锁定所有权转移以及与条件变量一起使用。
- 与 lock_guard 相比,unique_lock 必须指定泛型参数。
-
std::try_lock: 尝试锁定。如果锁定失败,则返回 false。
- 例子:
1{ 2 std::mutex m; 3 std::unique_lock<std::mutex> lk(m); 4 if(!lk.try_lock()) 5 throw std::runtime_error("failed"); 6 // do something 7}
-
std::lock
尝试锁定,如果锁定失败,则抛出异常。- 例子:
1{ 2 std::mutex m; 3 std::unique_lock<std::mutex> lk(m); 4 lk.lock(); 5 // do something 6}
-
std::call_once
: 只执行一次的函数。- 例子:
1{ 2 std::once_flag flag; 3 std::call_once(flag, []() { 4 // do something 5 }); 6}
-
once_flag: 一个辅助结构,相当于一个锁,共用这个标志的线程,只有其中一个能成功执行。
-
如果一个线程异常,其它等待的线程会唤醒一个执行。
-
实现原理:互斥锁+条件变量+广播唤醒
unique_lock + timed_mutex 的例子
1{
2 std::timed_mutex m;
3 std::unique_lock<std::timed_mutex> lk(m, std::chrono::seconds(1));
4 if (lk.owns_lock()) {
5 // do something
6 }
7}
8// lk goes out of scope, m is unlocked
condition_variable
-
std::condition_variable: 一个条件变量。
-
std::condition_variable_any: 一个可以包含多个条件变量的条件变量。
-
cv.wait (lk): 在 lk 上阻塞,直到有其它线程调用 cv.notify_one() 或 cv.notify_all()。
- 例子:
1{ 2 std::condition_variable cv; 3 std::mutex m; 4 std::unique_lock<std::mutex> lk(m); 5 cv.wait(lk); 6 // do something 7}
-
cv.wait_for (lk, std::chrono::duration): 在 lk 上阻塞,直到有其它线程调用 cv.notify_one() 或 cv.notify_all()或超时。
-
cv.wait_until (lk, std::chrono::time_point): 在 lk 上阻塞,直到有其它线程调用 cv.notify_one() 或 cv.notify_all()或超时。
- 例子:
1{ 2 std::condition_variable cv; 3 std::mutex m; 4 std::unique_lock<std::mutex> lk(m); 5 cv.wait_until(lk, std::chrono::system_clock::now() + std::chrono::seconds(1)); 6 // do something 7}
-
cv.notify_one(): 唤醒一个线程。
- 例子:
1{ 2 std::condition_variable cv; 3 std::mutex m; 4 std::unique_lock<std::mutex> lk(m); 5 cv.notify_one(); 6}
-
cv.notify_all(): 唤醒所有线程。
- 例子:
1{ 2 std::condition_variable cv; 3 std::mutex m; 4 std::unique_lock<std::mutex> lk(m); 5 cv.notify_all(); 6}
semaphore
std::counting_semaphore: 一个计数信号量。信号量能够让最多 LeastMaxValue 个线程同时访问。
- 例子:
1{
2 std::counting_semaphore s(1);
3 std::thread t1([&s]() {
4 s.wait();
5 // do something
6 s.signal();
7 });
8 std::thread t2([&s]() {
9 s.wait();
10 // do something
11 s.signal();
12 });
13 t1.join();
14 t2.join();
15}
由于 C++ 11 没有标准库信号量,可以用 std::mutex + std::condition_variable 来实现一个:
1#include <mutex>
2#include <condition_variable>
3
4class semaphore {
5 std::mutex mutex_;
6 std::condition_variable condition_;
7 unsigned long count_ = 0; // Initialized as locked.
8
9public:
10 void release() { // v
11 std::lock_guard<decltype(mutex_)> lock(mutex_);
12 ++count_;
13 condition_.notify_one();
14 }
15
16 void acquire() { // P
17 std::unique_lock<decltype(mutex_)> lock(mutex_);
18 while(!count_) // Handle spurious wake-ups.
19 condition_.wait(lock);
20 --count_;
21 }
22
23 bool try_acquire() {
24 std::lock_guard<decltype(mutex_)> lock(mutex_);
25 if(count_) {
26 --count_;
27 return true;
28 }
29 return false;
30 }
31};
以及使用 pthread 提供的信号量:
1int sem_init(sem_t *sem, int pshared, unsigned int value);
-
sem_init: 初始化信号量。
- 例子:
1{ 2 sem_t s; 3 sem_init(&s, 0, 1); 4 sem_wait(&s); 5 // do something 6 sem_post(&s); 7}
- pshared 我们填 0 就行。true 表示进程间共享,false 表示进程内线程间共享。
-
sem_post:用于唤醒一个等待的线程。(对应 PV 操作中的 V)
-
sem_wait:用于阻塞当前线程,直到获得访问机会。(对应 PV 操作中的 P)
锁的原理
任务 1:写两个线程,分别能打印一次 a 和 b。
参考代码:
1#include <pthread.h>
2#include <stdio.h>
3struct args_t {
4 char out;
5};
6int print_thread(struct args_t* args) {
7 printf("%c", args->out);
8 return 0;
9}
10int main() {
11 pthread_t ha, hb;
12 struct args_t a = {.out = 'a'};
13 struct args_t b = {.out = 'b'};
14 pthread_create(&ha, NULL, (void* (*)(void*))(print_thread), (void*)(&a));
15 pthread_create(&hb, NULL, (void* (*)(void*))(print_thread), (void*)(&b));
16 pthread_join(ha, NULL);
17 pthread_join(hb, NULL);
18 return 0;
19}
参考代码2:
1#include <algorithm>
2#include <iostream>
3#include <thread>
4#include <vector>
5using namespace std;
6
7int main() {
8 vector<thread> ts;
9 ts.push_back(thread{[]() { cout << 'a'; }});
10 ts.push_back(thread{[]() { cout << 'b'; }});
11 for (auto&& t : ts) {
12 t.join();
13 }
14 return 0;
15}
Peterson 算法的原理
1int turn, x = 0, y = 0;
2
3void thread1() {
4 [1] x = 1;
5 [2] turn = T2;
6 [3] while (y && turn == T2) ;
7 [4] // critical section
8 [5] x = 0;
9}
10
11void thread2() {
12 [1] y = 1;
13 [2] turn = T1;
14 [3] while (x && turn == T1) ;
15 [4] // critical section
16 [5] y = 0;
17}
方括号表示 PC 指针。peterson 算法可以理解为:两个同学(x, y)想上厕所。
每个同学的策略都是:
-
先举旗子(表明自己要上厕所),然后在厕所门上写上对方的名字。
-
只要对方想上厕所,且门上写者对方的名字,就一直等待。
假设每行代码的执行都是顺序、原子的,则这个算法不会有任何问题。
可以用状态树来证明。此程序的状态初始状态是 <pc1, pc2, x, y, turn>
。穷举所有调度序列可知不可能存在 pc1 = pc2 = 4
的情况。详见 jyy 视频。
而我提供一个自己的理解:
Peterson 算法之所以有效,举个例子,假设 T2 进入临界区,则一定有
-
turn != T1,则说明 y = 1 被执行,则说明 T1 未进入临界区。
-
或者 x = 0,则 thread1 不位于临界区。
实现互斥的根本困难
实现互斥的根本困难是:无法做到同时读、写共享内存。
LOAD:读取时,只能看到一瞬间的状态。而此状态可能看到之后就马上变了。
STORE:写入时,无法知道写入之前里面是什么。
Peterson 算法之所以有效,其根本原因在于 LOAD 时通过 x 判断 t1 是否在临界区。如果 x = 0,则绝无可能在临界区。这是对 LOAD 前瞬间的保证。
同时如果 turn != T1,也能判断出 T2 绝无可能在 LOAD 之后改变 turn 的值。因为被 while 循环堵住了。这是对 LOAD 后瞬间的保证。
因此,要解决互斥,硬件层面最好能提供同时结合 LOAD 和 STORE 功能的指令。
自旋锁的实现原理
x86 提供了 lock
前缀。
-
lock addq
实现对某个值原子增加一个值。-
L步骤:读取旧值 val
-
S步骤:存放 val + incr
-
-
(lock) xchg
实现原子交换两个状态。-
L步骤:读取旧值 val
-
S步骤:存放 newval
-
返回 val
-
其它原子指令
-
tas
(test and set) 实现用 1 交换旧值-
L步骤:读取旧值 val
-
S步骤:存放 1
-
返回 val
-
-
cas
(compare and swap) 实现条件交换-
L步骤:读取旧值 val
-
S步骤:比较 val == expect,如果相等,存放 newval
-
返回 val
-
用 XCHG 实现互斥的原理:
-
初始状态:桌子上一把钥匙
-
大家去抢钥匙,具体方式是用手上的东西换
-
如果换出来一看是把钥匙,那就得到了锁。
-
如果换出来一看是不是要是,说明没抢到,重新抢。
-
用好钥匙的同学把钥匙还回去。
1int xchg(volatile int *addr, int newval) {
2 int result;
3 asm volatile ("lock xchg %0, %1"
4 : "+m"(*addr), "=a"(result) : "1"(newval));
5 return result;
6}
1int table = YES;
2
3void lock() {
4retry:
5 int got = xchg(&table, NOPE);
6 if (got == NOPE)
7 goto retry;
8 assert(got == YES);
9}
10
11void unlock() {
12 xchg(&table, YES)
13}
自旋锁的优点:
- xchg 成功,立即进入临界区,低开销
自旋锁的缺陷:
-
从硬件层面:xchg 指令自旋,会强制处理期间的缓存同步
-
僧多肉少:太多空转线程,争抢锁的处理器越多,利用率越低
-
调度灾难:获得自旋锁的线程可能被操作系统切换出去
自旋锁的使用场景
-
只有少量情况需要少量线程抢锁
-
持有锁时不会被切出
典型场景:操作系统内核的并发数据结构 (短临界区)
改进思路——睡眠锁
-
方法:把锁的管理放到内核。
-
原理:让没有获得锁的人进入等待队列。
-
效果:
-
上锁失败就休眠,不再浪费 CPU
-
缺点:即使上锁成功也需要进出内核
-
改进思路——Futex(Fast Userspace muTexes)
-
效果:
-
上锁时成功立即返回
-
失败时休眠
-
-
原理:
-
在用户空间抢锁
-
未获得锁时,把当前线程放到等待队列
-
锁的用途
生产者-消费者问题
一个线程生产左括号,另一个线程消费右括号。
一旦生成的括号序列不是合法括号序列的前缀,说明出了问题。
检验程序:
1#include <iostream>
2#include <stack>
3
4int main() {
5 std::stack<int> pairs;
6 for (int ch; (ch = std::getchar()) != EOF;) {
7 if (ch == '(') {
8 pairs.push(ch);
9 } else if (ch == ')') {
10 if (pairs.size() == 0) {
11 std::cerr << "failed!" << std::endl;
12 exit(1);
13 }
14 pairs.pop();
15 }
16 }
17 return 0;
18}
使用互斥锁:
1#include <iostream>
2#include <mutex>
3#include <thread>
4#include <vector>
5
6int count = 0;
7int N = 100;
8// 注意,mutex 不能 const
9std::mutex m;
10void t1() {
11 std::cout << "t1";
12 for (;;) {
13 std::lock_guard<std::mutex> lk(m);
14 if (count != N) {
15 count++;
16 std::cout << "(";
17 }
18 }
19}
20void t2() {
21 std::cout << "t1";
22 for (;;) {
23 std::lock_guard<std::mutex> lk(m);
24 if (count != 0) {
25 std::cout << ")";
26 count--;
27 }
28 }
29}
30
31int main() {
32 std::vector<std::thread> ts;
33 ts.push_back(std::thread(t1));
34 ts.push_back(std::thread(t2));
35 for (auto&& t : ts) {
36 t.join();
37 }
38 return 0;
39}
问题:有大量的时间浪费在不停询问锁是否可用的过程中。
分析:T1 一直在等待 count != N 的条件。T2 一直在等待 count != 0 的条件。
所以我们可以把这种等待条件的自旋变成睡眠。
1void t1() {
2 std::cout << "t1";
3 for (;;) {
4 std::lock_guard<std::mutex> lk(m);
5 while(count == N) {
6 std::cout << ".";
7 unlock_and_sleep();
8 }
9 count++;
10 wake_up();
11 std::cout << "(";
12 }
13}
14void t2() {
15 std::cout << "t1";
16 for (;;) {
17 std::lock_guard<std::mutex> lk(m);
18 if(count == 0) {
19 unlock_and_sleep();
20 }
21 std::cout << ")";
22 count--;
23 wake_up();
24 }
25}
这正是条件变量的思想:
1std::mutex m;
2std::condition_variable cv;
3void t1() {
4 std::cout << "t1";
5 for (;;) {
6 std::unique_lock<std::mutex> lk(m);
7 if(count == N) {
8 cv.wait(lk);
9 }
10 std::cout << "(";
11 count++;
12 cv.notify_all();
13 }
14}
15void t2() {
16 std::cout << "t1";
17 for (;;) {
18 std::unique_lock<std::mutex> lk(m);
19 if(count == 0) {
20 cv.wait(lk);
21 }
22 std::cout << ")";
23 count--;
24 cv.notify_all();
25
26 }
27}
上面代码是有 BUG 的。主要问题,就是虚假唤醒。只有一个条件百度,它并不区分我唤醒的是生产者还是消费者。唤醒之后,如果不重新判断条件是否满足,就会产生错误的结果。比如假设 1 个生产者,两个消费者,则很可能一个消费者唤醒了另一个消费者。
不能同类唤醒。应带采用两个条件变量的方式。
一个常见的错误如下(为什么?):
1std::mutex m;
2std::condition_variable can_produce;
3std::condition_variable can_comsume;
4void t1() {
5 std::cout << "t1";
6 for (;;) {
7 std::unique_lock<std::mutex> lk(m);
8 if (count == N) {
9 can_produce.wait(lk);
10 }
11 count++;
12 std::cout << "(";
13 can_comsume.notify_all();
14 }
15}
16void t2() {
17 std::cout << "t1";
18 for (;;) {
19 std::unique_lock<std::mutex> lk(m);
20 if (count == 0) {
21 can_comsume.wait(lk);
22 }
23 std::cout << ")";
24 count--;
25 can_produce.notify_all();
26 }
27}
我们可以用 gdb 的线程调试(详见我的另一篇文章)找出 bug。我们发现 count 竟然会变成负数。
原来,生产者唤醒了多个消费者,导致消费者不加检查就进行 count--
一个解决方法是醒来之后检查:
1std::mutex m;
2std::condition_variable can_produce;
3std::condition_variable can_comsume;
4void t1() {
5 std::cout << "t1";
6 for (;;) {
7 std::unique_lock<std::mutex> lk(m);
8 while (count == N) {
9 can_produce.wait(lk);
10 }
11 count++;
12 std::cout << "(";
13 can_comsume.notify_all();
14 }
15}
16void t2() {
17 std::cout << "t1";
18 for (;;) {
19 std::unique_lock<std::mutex> lk(m);
20 while (count == 0) {
21 can_comsume.wait(lk);
22 }
23 std::cout << ")";
24 count--;
25 can_produce.notify_all();
26 }
27}
条件变量的含义是:
-
wait 表示:把自己加入等待队列,当条件满足时自己被唤醒,然后重新尝试取得锁,并继续执行。
-
notify_all 表示:唤醒所有等待的线程。让它们争抢锁,得到锁的执行。
信号量
信号量是一个令牌,得到令牌的可以进入临界区。
P 表示得到一把钥匙,信号量计数器减 1,如果计数器为 0,则不能进入临界区,在队列等待。
V 表示释放一把钥匙,信号量计数器加 1,如果有线程等待,则相当于把令牌直接交给等待的一个线程。
1void producer(){
2 for(;;){
3 P(&empty);
4 print("(");
5 V(&fill);
6 }
7}
8
9void consumer(){
10 for(;;){
11 P(&fill);
12 print(")");
13 V(&empty);
14 }
15}
上面的代码定义了两个线程,一个生产者,一个消费者。两个信号量,empty
表示空计数器。fill
表示有计数器。
P(&empty)
的意思是,如果空闲计数器为 0,则不能进入临界区,在队列等待。进入后消耗一个空位。 V(&fill)
的意思是,将满计数器加 1。增加一个满位。
(可以结合上面我们自己实现的信号量的代码来理解)
哲学家吃饭问题
BUG 的复现
1#include <semaphore.h>
2#include <iostream>
3#include <thread>
4#include <vector>
5
6auto thread_phi(int N, int id, std::vector<sem_t*>* sems) {
7 int lhs = (id - 1) % N;
8 int rhs = id % N;
9 std::cout << "phi " << id << " started, left " << lhs << " right " << rhs
10 << std::endl;
11 std::this_thread::sleep_for(std::chrono::microseconds(10));
12 for (;;) {
13 sem_wait((*sems)[lhs]);
14 std::cout << "T" << id << "got" << lhs << std::endl;
15 sem_wait((*sems)[rhs]);
16 std::cout << "T" << id << "got" << rhs << std::endl;
17 // eat
18 sem_post((*sems)[lhs]);
19 sem_post((*sems)[rhs]);
20 }
21}
22int main(int argc, char const* argv[]) {
23 if (argc != 2) {
24 std::cerr << "need number of philosopher" << std::endl;
25 exit(EXIT_FAILURE);
26 }
27 int N = atoi(argv[1]);
28 if (N < 2) {
29 std::cerr << "need positive number" << std::endl;
30 exit(EXIT_FAILURE);
31 }
32 std::vector<std::thread> ts;
33 std::vector<sem_t*> sems;
34 for (auto i = 0; i < N; ++i) {
35 auto sem = new sem_t;
36 sem_init(sem, false, 1);
37 sems.push_back(sem);
38 }
39 for (auto i = 0; i < N; ++i) {
40 // 注意 id 从 1 开始,否则 lhs 会出现负数
41 ts.push_back(std::thread(thread_phi, N, i + 1, &sems));
42 }
43 for (auto&& t : ts) {
44 t.join();
45 }
46 for (auto&& s : sems) {
47 delete s;
48 }
49
50 return 0;
51}
运行后发现很快死锁。
为什么呢?举个例子:每个人同时举起左手,那么每个人都在等待别人的右手放下,从而卡住。
解决方法:
-
万能方法:使用条件变量。如果不满足左右都有空,就睡眠。
-
更好的方法:把资源管理放到 Supervisor 中。
-
对于每个哲学家:
-
发送 EAT 信号,表示想吃饭
-
等待许可
-
吃饭
-
发送 DONE 信号
-
-
而 Supervisor 则管理所有哲学家的这些信号。使用消息队列。
- 这需要我们自己设计如何调度和管理资源,计划经济,非常复杂。
死锁检测和处理
静态分析/动态分析
AA-Deadlock:使用防御性编程,assert
ABBA-Deadlock:
- Locker-ordering 确保所有锁的获得顺序都是一样的。在任意时刻,总有获得最后一个锁的进程可以继续前进。
使用状态空间模型检测。如果一个状态,被两条不同颜色的边进入,则此状态是非法的。
数据竞争:两种常见情况
-
忘记上锁——原子性违反(AV)
-
忘记同步——顺序违反(OV)
Lockdep:运行时死锁检查。我们有每一个锁的分配日志,从而构建有向图,一旦发现环就说明有可能出现死锁。
ThreadSanitizer:运行时数据竞争检查。记录内存访问和 lock/unlock 的日志,为所有事件建立 happens-before 关系图。
常用的动态分析工具:
-
AddressSanitizer:检查内存访问是否合法。
-
ThreadSanitizer:检查线程间的数据竞争。
-
MemorySanitizer:检查内存访问是否合法。
-
UBSanitizer:检查代码是否有未定义行为。
kasan ktsan
Canary:牺牲一些内存单元,用来预警memory error
各种锁的实现
互斥锁
要实现不饥饿的互斥锁需要操作系统的帮助(调度、队列化)。但如果只是玩玩,可以用原子变量。这样的话一般只能做成自旋锁,不适合竞争激烈的情景。
用 compare_exchange_strong 实现互斥锁:
1class Mutex {
2 std::atomic<int> _lock{0};
3
4 public:
5 void lock() {
6 int expect = 0;
7 while (!_lock.compare_exchange_strong(expect, 1,
8 std::memory_order_acquire)) {
9 expect = false;
10 }
11 }
12 void unlock() { _lock.store(0, std::memory_order_release); }
13};
更加省事儿的做法:
1class Mutex {
2 std::atomic_flag _lock{ATOMIC_FLAG_INIT};
3
4 public:
5 void lock() {
6 while (_lock.test_and_set(std::memory_order_acquire)) {
7 };
8 }
9 void unlock() { _lock.clear(std::memory_order_release); }
10};
读写锁
读写锁的特点是写与「读、写」都互斥。典型例子:计数器。
我们使用条件变量配合 mutex 实现一个读写锁:
1class RWLock {
2 std::mutex _mutex;
3 std::condition_variable _cond;
4 int _stat; // +N, for reader count. -N for writer count
5 public:
6 void RLock() {
7 std::unique_lock lk(_mutex);
8 while (_stat < 0) {
9 _cond.wait(lk); // 暂时解锁,直到被通知条件满足后获得并上锁
10 }
11 // std::cout << "RLock" << std::endl;
12 _stat++;
13 }
14 void WLock() {
15 std::unique_lock lk(_mutex);
16 while (_stat != 0) {
17 _cond.wait(lk);
18 }
19 // std::cout << "WLock" << std::endl;
20 _stat++;
21 }
22 void RUnlock() {
23 std::unique_lock lk(_mutex);
24 // std::cout << "RUnlock" << std::endl;
25 _stat--;
26 _cond.notify_one(); // 由于只允许一个写,所以用 notify_one
27 }
28 void WUnlock() {
29 std::unique_lock lk(_mutex);
30 // std::cout << "WUnlock" << std::endl;
31 _stat--;
32 _cond.notify_all();
33 }
34};
另一种方式:
1class RWLock {
2 std::mutex _read_mtx;
3 std::mutex _write_mtx;
4 int _nread = 0;
5
6 public:
7 void RLock() {
8 std::lock_guard g(_read_mtx);
9 _nread++;
10 if (_nread == 1) {
11 _write_mtx.lock();
12 }
13 }
14 void WLock() { _write_mtx.lock(); }
15 void RUnlock() {
16 std::lock_guard g(_read_mtx);
17 _nread--;
18 if (_nread == 0) {
19 _write_mtx.unlock();
20 }
21 }
22 void WUnlock() { _write_mtx.unlock(); }
23};
参考
感谢南京大小 jyy 老师的 Bilibili 开源课程。