线程安全
# 线程互斥
基本概念:
- 临界资源:多线程执行流安全访问的共享的资源
- 临界区:每个线程内部,访问临界资源的代码
- 互斥:任何时刻,互斥保证有且只有一个执行流进入临界区,访问临界资源,通常对临界资源起保护作用
- 原子性:对一个资源进行访问的时候,要么不做、要么做完,并且对资源进行的操作,只用一条汇编即可完成
例如,下面的模拟抢票程序:
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <pthread.h>
int ticket = 100;
void *route(void *arg) {
char *id = (char *)arg;
while (1) {
if (ticket > 0) {
usleep(1000);
printf("%s sells ticket:%d\n", id, ticket);
ticket--;
}
else {
break;
}
}
return nullptr;
}
int main(void) {
pthread_t t1, t2, t3, t4;
pthread_create(&t1, NULL, route, (void*)"thread 1");
pthread_create(&t2, NULL, route, (void*)"thread 2");
pthread_create(&t3, NULL, route, (void*)"thread 3");
pthread_create(&t4, NULL, route, (void*)"thread 4");
pthread_join(t1, NULL);
pthread_join(t2, NULL);
pthread_join(t3, NULL);
pthread_join(t3, NULL);
pthread_join(t4, NULL);
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
运行程序结果如下:
thread 2 sells ticket:100
thread 3 sells ticket:99
thread 4 sells ticket:98
thread 1 sells ticket:100
...
thread 4 sells ticket:3
thread 3 sells ticket:2
thread 2 sells ticket:1
thread 4 sells ticket:0
thread 3 sells ticket:-1
thread 1 sells ticket:-2
2
3
4
5
6
7
8
9
10
11
我们发现了票数会重复并且出现了负数,原因如下:
- if 语句判断条件为真以后,代码可以并发的切换到其他线程
- usleep 这个模拟漫长业务的过程,在这个漫长的业务过程中,可能有很多个线程会进入该代码段
- --ticket 操作本身就不是一个原子操作,而是由 3 条汇编语言构成的:
- load :将共享变量 ticket 从内存加载到寄存器中
- update : 更新寄存器里面的值,执行-1 操作
- store :将新值,从寄存器写回共享变量 ticket 的内存地址
为了解决以上问题,就要做到以下三点:
- 代码必须要有互斥行为:当代码进入临界区执行时,不允许其他线程进入该临界区。
- 如果多个线程同时要求执行临界区的代码,并且临界区没有线程在执行,那么只能允许一个线程进入该临界区。
- 如果线程不在临界区中执行,那么该线程不能阻止其他线程进入临界区。
Linux 中,提供的方式就是(互斥量)锁

# 初始化互斥量
互斥量,即互斥锁,用于保护共享资源,确保在任意时刻只有一个线程可以访问该资源。它允许一个线程进入临界区执行操作,而其他线程必须等待当前线程释放锁后才能进入。互斥锁的作用是防止多个线程同时访问共享资源,从而避免竞争条件和数据不一致的问题。
- 静态分配
使用宏定义来初始化,其内部状态会在程序结束时由操作系统自动清理,无需手动调用 pthread_mutex_destroy()来销毁这种类型的互斥锁。
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER
#include <pthread.h>
#include <unistd.h>
#include <vector>
#include <iostream>
#include "Mutex.hpp"
using namespace std;
// 没有分号哦
#define UserNum 4
pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
// 共享资源:
int tickets = 1000;
// 抢票函数:
void* thread_run(void* args) {
char* threadname = (char*) args;
while (true) {
pthread_mutex_lock(&lock);
usleep(10000);
if(tickets > 0) {
cout << threadname << "正在抢票:" << tickets << endl;
tickets--;
// 解锁
pthread_mutex_unlock(&lock);
} else {
pthread_mutex_unlock(&lock);
break;
}
usleep(1000);
}
return nullptr;
}
int main() {
pthread_mutex_t lock;
vector<pthread_t> pids(UserNum);
pthread_mutex_init(&lock, nullptr);
for(int i = 0; i < UserNum; i++) {
char buffer[64];
snprintf(buffer, sizeof buffer, "thread-%d", i + 1);
pthread_create(&pids[i], nullptr, thread_run, buffer);
}
for(auto& pid : pids) {
pthread_join(pid, nullptr);
}
pthread_mutex_destroy(&lock);
return 0;
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
- 动态分配
int pthread_mutex_init(pthread_mutex_t *restrict mutex, const pthread_mutexattr_t *restrict attr);
# 互斥量的加锁和解锁
int pthread_mutex_lock(pthread_mutex_t *mutex);
int pthread_mutex_unlock(pthread_mutex_t *mutex);
// 返回值:成功返回0,失败返回错误号
2
3
调用 pthread_mutex_lock 时,可能会遇到以下情况:
- 互斥量处于未锁状态,该函数会将互斥量锁定,同时返回成功。
- 发起函数调用时,其他线程已经锁定互斥量,或者存在其他线程同时申请互斥量,但没有竞争到互斥量,那么 pthread_mutex_lock 调用会陷入阻塞(执行流被挂起),等待互斥量解锁。
将模拟抢票程序优化,加上互斥锁:
#include <pthread.h>
#include <unistd.h>
#include <vector>
#include <iostream>
using namespace std;
// 没有分号哦
#define UserNum 4
class ThreadData {
public:
ThreadData(const string& name, pthread_mutex_t* _mutex) : threadname(name), mutex(_mutex) {}
~ThreadData() {}
public:
string threadname;
pthread_mutex_t* mutex;
};
// 共享资源:
int tickets = 1000;
// 抢票函数:
void* thread_run(void* args) {
ThreadData* td = static_cast<ThreadData*>(args);
while (true) {
pthread_mutex_lock(td->mutex);
usleep(10000);
if(tickets > 0) {
cout << td->threadname << "正在抢票:" << tickets << endl;
tickets--;
// 解锁
pthread_mutex_unlock(td->mutex);
} else {
pthread_mutex_unlock(td->mutex);
break;
}
usleep(1000);
}
return nullptr;
}
int main() {
pthread_mutex_t lock;
vector<pthread_t> pids(UserNum);
pthread_mutex_init(&lock, nullptr);
for(int i = 0; i < UserNum; i++) {
char buffer[64];
snprintf(buffer, sizeof buffer, "thread-%d", i + 1);
ThreadData* td = new ThreadData(buffer, &lock);
pthread_create(&pids[i], nullptr, thread_run, td);
}
for(auto& pid : pids) {
pthread_join(pid, nullptr);
}
pthread_mutex_destroy(&lock);
return 0;
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
也可以使用 RAII 风格来封装:
// Mutex.hpp
#include <iostream>
#include <unistd.h>
#include <pthread.h>
using namespace std;
class Mutex {
public:
Mutex(pthread_mutex_t* lock_p_ = nullptr) : lock_p(lock_p_) {}
~Mutex() {}
void lock() {
if(lock_p) pthread_mutex_lock(lock_p);
}
void unlock() {
if(lock_p) pthread_mutex_unlock(lock_p);
}
private:
pthread_mutex_t* lock_p;
};
class LockGuard {
public:
LockGuard(pthread_mutex_t* lock_p_) : mutex(lock_p_) {
mutex.lock();
}
~LockGuard() {
mutex.unlock();
}
private:
Mutex mutex;
};
// ---------------------
#include <pthread.h>
#include <unistd.h>
#include <vector>
#include <iostream>
#include "Mutex.hpp"
using namespace std;
// 没有分号哦
#define UserNum 4
pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
// 共享资源:
int tickets = 1000;
// 抢票函数:
void* thread_run(void* args) {
char* threadname = (char*) args;
while (true) {
LockGuard lockguard(&lock);
usleep(10000);
if(tickets > 0) {
cout << threadname << "正在抢票:" << tickets << endl;
tickets--;
} else {
break;
}
usleep(1000);
}
return nullptr;
}
int main() {
pthread_mutex_t lock;
vector<pthread_t> pids(UserNum);
pthread_mutex_init(&lock, nullptr);
for(int i = 0; i < UserNum; i++) {
char buffer[64];
snprintf(buffer, sizeof buffer, "thread-%d", i + 1);
pthread_create(&pids[i], nullptr, thread_run, buffer);
}
for(auto& pid : pids) {
pthread_join(pid, nullptr);
}
pthread_mutex_destroy(&lock);
return 0;
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
# 互斥量的销毁
int pthread_mutex_destroy(pthread_mutex_t *mutex);
销毁互斥量需要注意:
- 使用 PTHREAD_MUTEX_INITIALIZER 初始化的互斥量不需要销毁
- 不能销毁一个已经加锁的互斥量
- 已经销毁的互斥量,要确保后面不会有线程再尝试加锁
# 互斥量实现原理
互斥锁通常是一个存储在内存中的变量(或更复杂的数据结构),它用于控制对共享资源的访问。
- 当一个线程尝试获取互斥锁时,首先,线程会检查互斥锁的状态,通常以某个变量或标志表示。假设该变量的值为 1 表示锁未被持有,为 0 表示锁已被持有。
- 如果线程发现锁的状态为 1,即表示锁未被其他线程持有,那么它将执行获取锁的操作,并将锁的状态修改为 0,标志自己已经成功获取了锁。
- 如果线程发现锁的状态为 0,即表示锁已经被其他线程持有,那么它将无法立即获取锁。在这种情况下,线程会被阻塞等待,直到锁的状态变为 1。
互斥锁的原子性是通过操作系统提供的原子操作或锁机制来实现的。这些机制确保了在任何时刻只有一个线程能够成功地修改锁的状态,以避免竞争条件和数据不一致性的问题。
因此,互斥锁的交换或修改操作本质上是在内存中修改锁的状态。这个过程确保了只有一个线程能够访问共享资源,从而避免了数据竞争和不一致性。
lock:
movb $0,%al
xchgb al mutex
if(a1寄存器的内容 > 0) {
return 0;
}
else
挂起等待;
goto lock;
unlock :
movb $1,mutex
唤醒等待Mutex的线程;
return 0;
2
3
4
5
6
7
8
9
10
11
12
13

# 死锁
概念
死锁是指在一组进程中的各个进程均占有不会释放的资源,但因互相申请被其他进程所站用不会释放的资源而处于的一种永久等待状态
必要条件
- 互斥条件:一个资源每次只能被一个执行流使用
- 请求与保持条件:一个执行流因请求资源而阻塞时,对已获得的资源保持不放
- 不剥夺条件:一个执行流已获得的资源,在末使用完之前,不能强行剥夺
- 循环等待条件:若干执行流之间形成一种头尾相接的循环等待资源的关系
避免死锁问题
- 破坏死锁的四个必要条件
- 加锁顺序一致
- 避免锁未释放的场景
- 资源一次性分配
# 线程同步
线程同步是多个线程协调彼此操作以完成特定任务的过程。它确保线程按照一定的顺序执行,以避免竞态条件和数据不一致性问题。在多线程环境中,线程同步涉及到控制线程的执行顺序和对共享资源的访问。
竞态条件
竞态条件指的是在多线程或并发环境下,程序的执行结果依赖于线程执行的具体时序或顺序。具体来说,竞态条件发生在多个线程试图同时访问共享资源,并且对资源的访问顺序未经过适当的同步控制时。在竞态条件下,由于线程执行顺序的不确定性,程序的行为可能变得不可预测,导致出现错误或不一致的结果。
# 条件变量
条件变量是一种线程同步的机制,用于实现线程间的条件等待和通知。它允许一个线程在等待某个特定条件为真时进入睡眠状态,同时允许其他线程在条件变量满足特定条件时唤醒等待的线程。
条件变量通常与互斥锁一起使用,以确保等待线程在检查条件和进入等待状态之间不会发生竞态条件。等待线程首先获取互斥锁,然后检查条件是否满足,如果条件不满足,则等待条件变量。当其他线程改变了条件并且发出信号时,等待线程被唤醒并重新检查条件。
条件变量的核心思想是提供一个"通知-等待"机制:
- 等待:当条件不满足时,线程在条件变量上阻塞(休眠),自动释放互斥锁,把资源让给其他线程。
- 通知:当条件满足时,其他线程通过条件变量唤醒等待的线程,被唤醒的线程在返回前会自动重新获得互斥锁。
# 条件变量初始化
- 动态分配:
int pthread_cond_init(pthread_cond_t *restrict cond, const pthread_condattr_t *restrict attr);
参数:
- cond:要初始化的条件变量
- attr:nullptr
- 静态分配
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
# 条件变量销毁
int pthread_cond_destroy(pthread_cond_t *cond);
# 等待条件变量满足
int pthread_cond_wait(pthread_cond_t *restrict cond, pthread_mutex_t *restrict mutex);
int pthread_cond_timedwait(pthread_cond_t *restrict cond,
pthread_mutex_t *restrict mutex,
const struct timespec *restrict abstime);
2
3
4
使线程等待一个条件变量变为真。在等待期间,线程会释放与之关联的互斥锁,以便其他线程可以修改条件。当条件变量被其他线程通过 pthread_cond_signal 或 pthread_cond_broadcast 唤醒时,pthread_cond_wait 会重新获取互斥锁并返回。
# 唤醒等待
// 1.
int pthread_cond_broadcast(pthread_cond_t *cond);
// 2.
int pthread_cond_signal(pthread_cond_t *cond);
2
3
4
区别:
- pthread_cond_signal 函数用于唤醒等待在指定条件变量上的一个线程。如果有多个线程正在等待该条件变量,那么具体唤醒哪个线程是由系统调度器决定的,通常是基于某种优先级或调度策略。
- pthread_cond_broadcast 函数用于唤醒等待队列中的全部线程。
使用规范:
// 等待端:
pthread_mutex_lock(&mutex);
while (条件为假) {
pthread_cond_wait(&cond, &mutex); // 阻塞并自动释放锁
}
// ... 条件满足,执行操作
pthread_mutex_unlock(&mutex);
// 通知端:
pthread_mutex_lock(&mutex);
// ... 修改共享数据,使条件成立
pthread_cond_signal(&cond); // 或者 broadcast
pthread_mutex_unlock(&mutex);
2
3
4
5
6
7
8
9
10
11
12
13
使用 while 循环是为了防止虚假唤醒:
虚假唤醒是指在没有收到信号的情况下,等待条件变量的线程被唤醒的情况。如果使用 if 语句,那么即使条件不满足,线程也可能在没有收到信号的情况下被唤醒,然后继续执行后续代码。使用 while 循环是当一个线程被唤醒后,它会重新检查条件,如果条件仍然不满足,则线程会继续等待,只有当条件满足时,线程才会继续执行后续代码。
虚假唤醒的产生原因:
- 在 POSIX 系统中,如果进程接收到了普通的 Unix 信号(比如 SIGUSR1),处于 pthread_cond_wait 阻塞的线程可能会被操作系统内核中断以执行信号处理函数。当中断返回后,系统为了简化实现逻辑,有时会直接让 wait 返回错误码或成功返回,而不是回到原来的深度睡眠状态。
- 为了追求更高的并发性能,一些底层硬件指令或内核调度策略会主动、提前唤醒某些休眠线程,以此减少真正事件到来时的唤醒延迟。
条件变量使用示例:
#include <pthread.h>
#include <unistd.h>
#include <iostream>
using namespace std;
// 定义条件变量和互斥锁
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
int tickets = 100;
void* start_routine(void* args) {
string name = static_cast<const char*>(args);
// what's pthread do
while (true) {
pthread_mutex_lock(&mutex);
pthread_cond_wait(&cond, &mutex);
cout << name << " -> " << tickets;
tickets--;
pthread_mutex_unlock(&mutex);
}
return nullptr;
}
int main() {
pthread_t p1, p2;
pthread_create(&p1, nullptr, start_routine, (void*)"thread1");
pthread_create(&p1, nullptr, start_routine, (void*)"thread2");
while (true) {
sleep(1);
pthread_cond_signal(&cond);
cout << "main thread wakeup a thread" << endl;
}
pthread_join(p1, nullptr);
pthread_join(p1, nullptr);
return 0;
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
运行结果:
thread1 -> 100main thread wakeup a thread
thread2 -> 99main thread wakeup a thread
thread1 -> 98main thread wakeup a thread
thread2 -> 97main thread wakeup a thread
thread1 -> 96main thread wakeup a thread
2
3
4
5
# 信号量
信号量是一种用于控制对共享资源的访问的同步原语。它通常是一个非负整数,用于表示可用资源的数量,并提供两个原子操作:
- P 操作(wait):将信号量的值减 1,如果结果值小于 0,则进程将被阻塞(放入等待队列),直到信号量值变为非负值。
- V 操作(signal):将信号量的值加 1,如果有任何进程在等待队列中等待该信号量,则其中一个进程将被唤醒。
信号量的主要作用是协调多个线程或进程对共享资源的访问,以避免竞争条件和提高系统效率。信号量常被用于实现互斥和同步:
- 互斥:互斥锁是信号量的一个特例,其值通常初始化为 1。当一个线程需要访问共享资源时,它会尝试获取这个信号量(即将其值减 1)。如果信号量的值大于 0,则获取成功,线程可以访问共享资源;如果信号量的值为 0,则线程必须等待,直到其他线程释放该信号量(即将其值加 1)。
- 同步:信号量还可以用于同步多个线程或进程的活动。例如,可以使用信号量来确保一个线程在另一个线程完成某个任务之前不会开始执行。或者,可以使用信号量来限制同时访问共享资源的线程数量。
pthread 的信号量函数
int sem_init(sem_t *sem, int pshared, unsigned int value)
// sem:指向要初始化的信号量对象的指针
// pshared:指定信号量是否应在进程间共享
// value:信号量的初始值
int sem_destroy(sem_t *sem)
int sem_wait(sem_t *sem); // P:信号量的值大于零,则减一并立即返回;信号量的值为零,则调用线程会被阻塞,直到信号量的值变为大于零
int sem_port(sem_t *sem); // V:如果有其他线程或进程正在 sem_wait 调用中等待该信号量,那么其中一个等待的线程或进程将被唤醒并继续执行
2
3
4
5
6
7
# 可重入与线程安全
# 概念
- 重入:同一个函数被不同的执行流调用,当前一个进程还没有执行完,就又其他的执行流再次进入,我们称之为重入。一个函数在被重入的情况下,运行结果不会出现任何不同或者任何问题,则该函数被称为可重入函数,否则是不可重入函数。可重入函数不依赖于全局或静态变量,而是通过参数传递和局部变量来保持线程安全。
- 线程安全是指在多线程环境下,共享资源能够被多个线程安全地访问和操作,而不会导致数据不一致或不可预料的结果。
# 常见情况
- 常见的线程不安全的情况:
- 不保护共享变量的函数
- 函数状态随着被调用,状态发生变化的函数
- 返回指向静态变量指针的函数
- 调用线程不安全函数的函数
- 常见的线程安全的情况:
- 每个线程对全局变量或者静态变量只有读取的权限,而没有写入的权限,一般来说这些线程是安全的
- 类或者接口对于线程来说都是原子操作
- 多个线程之间的切换不会导致该接口的执行结果存在二义性
- 常见不可重入的情况:
- 调用了 malloc/free 函数,因为 malloc 函数是用全局链表来管理堆的
- 调用了标准 I/O 库函数,标准 I/O 库的很多实现都以不可重入的方式使用全局数据结构
- 可重入函数体内使用了静态的数据结构
- 常见可重入的情况:
- 不使用全局变量或静态变量
- 不使用用 malloc 或者 new 开辟出的空间
- 不调用不可重入函数
- 不返回静态或全局数据,所有数据都有函数的调用者提供
- 使用本地数据,或者通过制作全局数据的本地拷贝来保护全局数据