线程安全
# 线程互斥
- 临界资源:多线程执行流安全访问的共享的资源
- 临界区:每个线程内部,访问临界资源的代码
- 互斥:任何时刻,互斥保证有且只有一个执行流进入临界区,访问临界资源,通常对临界资源起保护作用
- 原子性:对一个资源进行访问的时候,要么不做、要么做完,并且对资源进行的操作,只用一条汇编即可完成
例如,下面的模拟抢票程序:
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <pthread.h>
int ticket = 100;
void *route(void *arg) {
char *id = (char *)arg;
while (1) {
if (ticket > 0) {
usleep(1000);
printf("%s sells ticket:%d\n", id, ticket);
ticket--;
}
else {
break;
}
}
return nullptr;
}
int main(void) {
pthread_t t1, t2, t3, t4;
pthread_create(&t1, NULL, route, (void*)"thread 1");
pthread_create(&t2, NULL, route, (void*)"thread 2");
pthread_create(&t3, NULL, route, (void*)"thread 3");
pthread_create(&t4, NULL, route, (void*)"thread 4");
pthread_join(t1, NULL);
pthread_join(t2, NULL);
pthread_join(t3, NULL);
pthread_join(t3, NULL);
pthread_join(t4, NULL);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
运行程序结果如下:
thread 2 sells ticket:100
thread 3 sells ticket:99
thread 4 sells ticket:98
thread 1 sells ticket:100
...
thread 4 sells ticket:3
thread 3 sells ticket:2
thread 2 sells ticket:1
thread 4 sells ticket:0
thread 3 sells ticket:-1
thread 1 sells ticket:-2
1
2
3
4
5
6
7
8
9
10
11
2
3
4
5
6
7
8
9
10
11
我们发现了票数会重复并且出现了负数,原因如下:
- if 语句判断条件为真以后,代码可以并发的切换到其他线程
- usleep 这个模拟漫长业务的过程,在这个漫长的业务过程中,可能有很多个线程会进入该代码段
- --ticket 操作本身就不是一个原子操作,而是由 3 条汇编语言构成的:
- load :将共享变量 ticket 从内存加载到寄存器中
- update : 更新寄存器里面的值,执行-1 操作
- store :将新值,从寄存器写回共享变量 ticket 的内存地址
为了解决以上问题,就要做到以下三点:
- 代码必须要有互斥行为:当代码进入临界区执行时,不允许其他线程进入该临界区。
- 如果多个线程同时要求执行临界区的代码,并且临界区没有线程在执行,那么只能允许一个线程进入该临界区。
- 如果线程不在临界区中执行,那么该线程不能阻止其他线程进入临界区。
Linux 中,提供的方式就是(互斥量)锁
# 初始化互斥量
- 静态分配
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER
1
#include <pthread.h>
#include <unistd.h>
#include <vector>
#include <iostream>
#include "Mutex.hpp"
using namespace std;
// 没有分号哦
#define UserNum 4
pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
// 共享资源:
int tickets = 1000;
// 抢票函数:
void* thread_run(void* args) {
char* threadname = (char*) args;
while (true) {
pthread_mutex_lock(&lock);
usleep(10000);
if(tickets > 0) {
cout << threadname << "正在抢票:" << tickets << endl;
tickets--;
// 解锁
pthread_mutex_unlock(&lock);
} else {
pthread_mutex_unlock(&lock);
break;
}
usleep(1000);
}
return nullptr;
}
int main() {
pthread_mutex_t lock;
vector<pthread_t> pids(UserNum);
pthread_mutex_init(&lock, nullptr);
for(int i = 0; i < UserNum; i++) {
char buffer[64];
snprintf(buffer, sizeof buffer, "thread-%d", i + 1);
pthread_create(&pids[i], nullptr, thread_run, buffer);
}
for(auto& pid : pids) {
pthread_join(pid, nullptr);
}
pthread_mutex_destroy(&lock);
return 0;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
- 动态分配
int pthread_mutex_init(pthread_mutex_t *restrict mutex, const pthread_mutexattr_t *restrict attr);
1
# 互斥量的加锁和解锁
int pthread_mutex_lock(pthread_mutex_t *mutex);
int pthread_mutex_unlock(pthread_mutex_t *mutex);
// 返回值:成功返回0,失败返回错误号
1
2
3
2
3
调用 pthread_mutex_lock 时,可能会遇到以下情况:
- 互斥量处于未锁状态,该函数会将互斥量锁定,同时返回成功。
- 发起函数调用时,其他线程已经锁定互斥量,或者存在其他线程同时申请互斥量,但没有竞争到互斥量,那么 pthread_mutex_lock 调用会陷入阻塞(执行流被挂起),等待互斥量解锁。
将模拟抢票程序优化,加上互斥锁:
#include <pthread.h>
#include <unistd.h>
#include <vector>
#include <iostream>
using namespace std;
// 没有分号哦
#define UserNum 4
class ThreadData {
public:
ThreadData(const string& name, pthread_mutex_t* _mutex) : threadname(name), mutex(_mutex) {}
~ThreadData() {}
public:
string threadname;
pthread_mutex_t* mutex;
};
// 共享资源:
int tickets = 1000;
// 抢票函数:
void* thread_run(void* args) {
ThreadData* td = static_cast<ThreadData*>(args);
while (true) {
pthread_mutex_lock(td->mutex);
usleep(10000);
if(tickets > 0) {
cout << td->threadname << "正在抢票:" << tickets << endl;
tickets--;
// 解锁
pthread_mutex_unlock(td->mutex);
} else {
pthread_mutex_unlock(td->mutex);
break;
}
usleep(1000);
}
return nullptr;
}
int main() {
pthread_mutex_t lock;
vector<pthread_t> pids(UserNum);
pthread_mutex_init(&lock, nullptr);
for(int i = 0; i < UserNum; i++) {
char buffer[64];
snprintf(buffer, sizeof buffer, "thread-%d", i + 1);
ThreadData* td = new ThreadData(buffer, &lock);
pthread_create(&pids[i], nullptr, thread_run, td);
}
for(auto& pid : pids) {
pthread_join(pid, nullptr);
}
pthread_mutex_destroy(&lock);
return 0;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
也可以使用 RAII 风格来封装:
// Mutex.hpp
#include <iostream>
#include <unistd.h>
#include <pthread.h>
using namespace std;
class Mutex {
public:
Mutex(pthread_mutex_t* lock_p_ = nullptr) : lock_p(lock_p_) {}
~Mutex() {}
void lock() {
if(lock_p) pthread_mutex_lock(lock_p);
}
void unlock() {
if(lock_p) pthread_mutex_unlock(lock_p);
}
private:
pthread_mutex_t* lock_p;
};
class LockGuard {
public:
LockGuard(pthread_mutex_t* lock_p_) : mutex(lock_p_) {
mutex.lock();
}
~LockGuard() {
mutex.unlock();
}
private:
Mutex mutex;
};
// ---------------------
#include <pthread.h>
#include <unistd.h>
#include <vector>
#include <iostream>
#include "Mutex.hpp"
using namespace std;
// 没有分号哦
#define UserNum 4
pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
// 共享资源:
int tickets = 1000;
// 抢票函数:
void* thread_run(void* args) {
char* threadname = (char*) args;
while (true) {
LockGuard lockguard(&lock);
usleep(10000);
if(tickets > 0) {
cout << threadname << "正在抢票:" << tickets << endl;
tickets--;
} else {
break;
}
usleep(1000);
}
return nullptr;
}
int main() {
pthread_mutex_t lock;
vector<pthread_t> pids(UserNum);
pthread_mutex_init(&lock, nullptr);
for(int i = 0; i < UserNum; i++) {
char buffer[64];
snprintf(buffer, sizeof buffer, "thread-%d", i + 1);
pthread_create(&pids[i], nullptr, thread_run, buffer);
}
for(auto& pid : pids) {
pthread_join(pid, nullptr);
}
pthread_mutex_destroy(&lock);
return 0;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
# 互斥量的销毁
int pthread_mutex_destroy(pthread_mutex_t *mutex);
1
销毁互斥量需要注意:
- 使用 PTHREAD* MUTEX* INITIALIZER 初始化的互斥量不需要销毁
- 不能销毁一个已经加锁的互斥量
- 已经销毁的互斥量,要确保后面不会有线程再尝试加锁
# 互斥量实现原理
lock:
movb $0,%al
xchgb al mutex
if(a1寄存器的内容 > 0) {
return 0;
}
else
挂起等待;
goto lock;
unlock :
movb $1,mutex
唤醒等待Mutex的线程;
return 0;
1
2
3
4
5
6
7
8
9
10
11
12
13
2
3
4
5
6
7
8
9
10
11
12
13
# 死锁
概念
死锁是指在一组进程中的各个进程均占有不会释放的资源,但因互相申请被其他进程所站用不会释放的资源而处于的一种永久等待状态
必要条件
- 互斥条件:一个资源每次只能被一个执行流使用
- 请求与保持条件:一个执行流因请求资源而阻塞时,对已获得的资源保持不放
- 不剥夺条件:一个执行流已获得的资源,在末使用完之前,不能强行剥夺
- 循环等待条件:若干执行流之间形成一种头尾相接的循环等待资源的关系
避免死锁问题
- 破坏死锁的四个必要条件
- 加锁顺序一致
- 避免锁未释放的场景
- 资源一次性分配
# Linux 线程同步
# 同步和竞态条件
- 同步:在保证数据安全的前提下,让线程能够按照某种特定的顺序访问临界资源,从而避免死锁
- 竞态条件:因为时序问题,导致程序异常
# 条件变量
当一个线程互斥地访问某个变量时,它可能发现在其它线程改变状态之前,它什么也做不了
# 条件变量初始化
- 动态分配:
int pthread_cond_init(pthread_cond_t *restrict cond, const pthread_condattr_t *restrict attr);
1
参数:
- cond:要初始化的条件变量
- attr:nullptr
- 静态分配
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
1
# 条件变量销毁
int pthread_cond_destroy(pthread_cond_t *cond);
1
# 等待条件变量满足
int pthread_cond_wait(pthread_cond_t *restrict cond, pthread_mutex_t *restrict mutex);
1
# 唤醒等待
// 1.
int pthread_cond_broadcast(pthread_cond_t *cond);
// 2.
int pthread_cond_signal(pthread_cond_t *cond);
1
2
3
4
2
3
4
区别:
- pthread_cond_signal 函数用于唤醒等待队列中首个线程。
- pthread_cond_broadcast 函数用于唤醒等待队列中的全部线程。
示例:
#include <pthread.h>
#include <unistd.h>
#include <iostream>
using namespace std;
// 定义条件变量和互斥锁
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
int tickets = 100;
void* start_routine(void* args) {
string name = static_cast<const char*>(args);
// what's pthread do
while (true) {
pthread_mutex_lock(&mutex);
pthread_cond_wait(&cond, &mutex);
cout << name << " -> " << tickets;
tickets--;
pthread_mutex_unlock(&mutex);
}
return nullptr;
}
int main() {
pthread_t p1, p2;
pthread_create(&p1, nullptr, start_routine, (void*)"thread1");
pthread_create(&p1, nullptr, start_routine, (void*)"thread2");
while (true) {
sleep(1);
pthread_cond_signal(&cond);
cout << "main thread wakeup a thread" << endl;
}
pthread_join(p1, nullptr);
pthread_join(p1, nullptr);
return 0;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
运行结果:
thread1 -> 100main thread wakeup a thread
thread2 -> 99main thread wakeup a thread
thread1 -> 98main thread wakeup a thread
thread2 -> 97main thread wakeup a thread
thread1 -> 96main thread wakeup a thread
1
2
3
4
5
2
3
4
5
上次更新: 2025/09/03, 18:26:17