生产者消费者
# 介绍
生产者消费者模型是通过共享缓冲区协调生产数据和使用数据的两类线程/进程的并发协作模式。
# 特点
3-2-1:
- 三种关系: 生产者和生产者(互斥关系)、消费者和消费者(互斥关系)、生产者和消费者(互斥关系、同步关系)。
- 两种角色: 生产者和消费者。(通常由进程或线程承担)
- 一个交易场所: 通常指的是内存中的一段缓冲区。(可以自己通过某种方式组织起来)
# Blockingqueue 实现
在多线程编程中,阻塞队列(Blocking Queue)是一种常用于实现生产者和消费者模型的数据结构。
其与普通的队列的区别在于:
当队列为空时,从队列获取元素的操作将会被阻塞,直到队列中放入了元素。 当队列满时,往队列里存放元素的操作会被阻塞,直到有元素从队列中取出。
Blockqueue.hpp
#include <pthread.h>
#include <iostream>
#include <queue>
using namespace std;
/**
* tip:
* 输入型参数:const &
* 输出型参数:*
* 输入输出型参数:&
*/
/**
 * Bounded blocking FIFO queue for the producer/consumer pattern.
 *
 * push() blocks while the queue already holds maxcap elements and pop()
 * blocks while it is empty. A single mutex protects the underlying
 * std::queue; producers wait on p_cond and consumers wait on c_cond.
 *
 * The object owns POSIX synchronization primitives, so it is not copyable.
 */
template <class T>
class BlockQueue {
 public:
  /** @param _maxcap number of elements the queue may hold before push() blocks. */
  BlockQueue(int _maxcap) : maxcap(_maxcap) {
    pthread_mutex_init(&mutex, nullptr);
    pthread_cond_init(&p_cond, nullptr);
    pthread_cond_init(&c_cond, nullptr);
  }

  // Copying would duplicate the mutex/condvar state and destroy it twice.
  BlockQueue(const BlockQueue&) = delete;
  BlockQueue& operator=(const BlockQueue&) = delete;

  /**
   * Appends a copy of `in`, blocking while the queue is full.
   * Wakes one waiting consumer afterwards.
   */
  void push(const T& in) {
    pthread_mutex_lock(&mutex);
    // Re-check the predicate after every wakeup: condition variables may wake
    // spuriously, and another producer may have refilled the queue first.
    while (isFull()) {
      pthread_cond_wait(&p_cond, &mutex);
    }
    q.push(in);
    pthread_cond_signal(&c_cond);
    pthread_mutex_unlock(&mutex);
  }

  /**
   * Removes the oldest element and stores it in `*out`, blocking while the
   * queue is empty. Wakes one waiting producer afterwards.
   */
  void pop(T* out) {
    pthread_mutex_lock(&mutex);
    // Must be a loop, not an `if`: after a spurious wakeup, or when another
    // consumer drained the queue first, front() on an empty queue is undefined.
    while (isEmpty()) {
      pthread_cond_wait(&c_cond, &mutex);
    }
    *out = q.front();
    q.pop();
    pthread_cond_signal(&p_cond);
    pthread_mutex_unlock(&mutex);
  }

  ~BlockQueue() {
    pthread_mutex_destroy(&mutex);
    pthread_cond_destroy(&p_cond);
    pthread_cond_destroy(&c_cond);
  }

 private:
  // Both helpers must be called with `mutex` held.
  bool isFull() const {
    return q.size() >= static_cast<typename std::queue<T>::size_type>(maxcap);
  }
  bool isEmpty() const { return q.empty(); }

 private:
  std::queue<T> q;        // stored elements, oldest at the front
  int maxcap;             // capacity limit enforced by push()
  pthread_mutex_t mutex;  // guards q
  pthread_cond_t p_cond;  // producers wait here while the queue is full
  pthread_cond_t c_cond;  // consumers wait here while the queue is empty
};
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
Main.cpp
#include <unistd.h>
#include "Blockqueue.hpp"
void* consumer(void* args) {
while (true) {
BlockQueue<int>* bq = static_cast<BlockQueue<int>*>(args);
// consumer activities:
int data;
bq->pop(&data);
cout << "consumer data:" << data << endl;
}
return nullptr;
}
void* productor(void* args) {
while (true) {
BlockQueue<int>* bq = static_cast<BlockQueue<int>*>(args);
// productor activities:
int data = rand() % 10 + 1;
bq->push(data);
cout << "productor data:" << data << endl;
}
return nullptr;
}
/**
 * Demo driver: one consumer and one producer sharing a BlockQueue<int> of
 * capacity 5.
 *
 * Returns 1 if a thread cannot be created; otherwise it joins both threads
 * (which loop forever) before releasing the queue.
 */
int main() {
  // Same seed as before: low 32 bits of time XOR pid.
  srand(static_cast<unsigned int>(time(nullptr)) ^
        static_cast<unsigned int>(getpid()));
  BlockQueue<int>* bq = new BlockQueue<int>(5);
  pthread_t c, p;

  int rc = pthread_create(&c, nullptr, consumer, bq);
  if (rc != 0) {
    std::cerr << "pthread_create(consumer) failed: " << rc << std::endl;
    delete bq;  // no thread is using the queue yet
    return 1;
  }
  rc = pthread_create(&p, nullptr, productor, bq);
  if (rc != 0) {
    std::cerr << "pthread_create(productor) failed: " << rc << std::endl;
    // The consumer thread still references *bq, so it is deliberately not
    // deleted here; process exit reclaims it.
    return 1;
  }

  // Joining an id that was never created is undefined, hence the checks above.
  pthread_join(c, nullptr);
  pthread_join(p, nullptr);
  delete bq;
  return 0;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
# 环形队列 实现
# 单生产单消费
Circlequeue.hpp
#include <pthread.h>
#include <semaphore.h>
#include <iostream>
#include <vector>
using namespace std;
/**
 * Fixed-capacity ring buffer for ONE producer thread and ONE consumer thread,
 * synchronized with two counting semaphores.
 *
 * p_sem counts free slots (starts at maxcap) and c_sem counts stored elements
 * (starts at 0). The write and read indices are not protected by a lock, so
 * this version must not be shared by several producers or several consumers.
 *
 * push() no longer sleeps: pacing a producer is the caller's job, and the
 * header does not include <unistd.h>. The object owns POSIX semaphores, so it
 * is not copyable.
 */
template <class T>
class CircleQueue {
 private:
  // Classic semaphore operations: P acquires (may block), V releases.
  void P(sem_t& sem) { sem_wait(&sem); }
  void V(sem_t& sem) { sem_post(&sem); }

 public:
  /** @param _maxcap number of slots in the ring. */
  CircleQueue(int _maxcap)
      : circlequeue(_maxcap), maxcap(_maxcap), consumerstep(0), productorstep(0) {
    sem_init(&p_sem, 0, maxcap);  // every slot is free initially
    sem_init(&c_sem, 0, 0);       // nothing to consume initially
  }

  // Copying would duplicate the semaphore state and destroy it twice.
  CircleQueue(const CircleQueue&) = delete;
  CircleQueue& operator=(const CircleQueue&) = delete;

  /** Stores a copy of `in` in the next free slot, blocking while the ring is full. */
  void push(const T& in) {
    P(p_sem);  // reserve a free slot
    circlequeue[productorstep++] = in;
    productorstep %= maxcap;  // wrap around
    V(c_sem);                 // publish one element to the consumer
  }

  /** Copies the oldest element into `*out`, blocking while the ring is empty. */
  void pop(T* out) {
    P(c_sem);  // wait for an element
    *out = circlequeue[consumerstep++];
    consumerstep %= maxcap;  // wrap around
    V(p_sem);                // hand the slot back to the producer
  }

  ~CircleQueue() {
    sem_destroy(&p_sem);
    sem_destroy(&c_sem);
  }

 private:
  std::vector<T> circlequeue;  // ring storage, size maxcap
  int maxcap;                  // number of slots
  sem_t p_sem;                 // free slots: the resource producers wait for
  sem_t c_sem;                 // stored elements: the resource consumers wait for
  int consumerstep;            // index of the next slot to read
  int productorstep;           // index of the next slot to write
};
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
Main.cpp
#include <unistd.h>
#include <cstdlib>
#include <ctime>
#include "Circlequeue.hpp"
void* consumer(void* args) {
CircleQueue<int>* cq = static_cast<CircleQueue<int>*>(args);
while (true) {
int data;
cq->pop(&data);
cout << "consumer -- data:" << data << endl;
}
}
void* productor(void* args) {
CircleQueue<int>* cq = static_cast<CircleQueue<int>*>(args);
while (true) {
int data = rand() % 10 + 1;
cq->push(data);
cout << "productor -- data:" << data << endl;
}
}
// single consumer + single producer:
/**
 * Demo driver: one consumer and one producer sharing a CircleQueue<int> with
 * 5 slots.
 *
 * Returns 1 if a thread cannot be created; otherwise it joins both threads
 * (which loop forever) before releasing the queue.
 */
int main() {
  // Same seed as before: low 32 bits of time XOR pid.
  srand(static_cast<unsigned int>(time(nullptr)) ^
        static_cast<unsigned int>(getpid()));
  CircleQueue<int>* cq = new CircleQueue<int>(5);
  pthread_t p, c;

  int rc = pthread_create(&c, nullptr, consumer, cq);
  if (rc != 0) {
    std::cerr << "pthread_create(consumer) failed: " << rc << std::endl;
    delete cq;  // no thread is using the queue yet
    return 1;
  }
  rc = pthread_create(&p, nullptr, productor, cq);
  if (rc != 0) {
    std::cerr << "pthread_create(productor) failed: " << rc << std::endl;
    // The consumer thread still references *cq, so it is deliberately not
    // deleted here; process exit reclaims it.
    return 1;
  }

  // Joining an id that was never created is undefined, hence the checks above.
  pthread_join(c, nullptr);
  pthread_join(p, nullptr);
  delete cq;
  return 0;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
# 多生产多消费
Circlequeue.hpp
#include <pthread.h>
#include <semaphore.h>
#include <iostream>
#include <vector>
using namespace std;
/**
 * Fixed-capacity ring buffer for MULTIPLE producers and MULTIPLE consumers.
 *
 * Two counting semaphores track resources: p_sem counts free slots (starts at
 * maxcap) and c_sem counts stored elements (starts at 0). pmutex serializes
 * producers on the write index and cmutex serializes consumers on the read
 * index, so one producer and one consumer can still work at the same time.
 *
 * The object owns POSIX semaphores and mutexes, so it is not copyable.
 */
template <class T>
class CircleQueue {
 private:
  // Classic semaphore operations: P acquires (may block), V releases.
  void P(sem_t& sem) { sem_wait(&sem); }
  void V(sem_t& sem) { sem_post(&sem); }

 public:
  /** @param _maxcap number of slots in the ring. */
  CircleQueue(int _maxcap)
      : circlequeue(_maxcap), maxcap(_maxcap), consumerstep(0), productorstep(0) {
    sem_init(&p_sem, 0, maxcap);  // every slot is free initially
    sem_init(&c_sem, 0, 0);       // nothing to consume initially
    pthread_mutex_init(&pmutex, nullptr);
    pthread_mutex_init(&cmutex, nullptr);
  }

  // Copying would duplicate semaphore/mutex state and destroy it twice.
  CircleQueue(const CircleQueue&) = delete;
  CircleQueue& operator=(const CircleQueue&) = delete;

  /** Stores a copy of `in` in the next free slot, blocking while the ring is full. */
  void push(const T& in) {
    // Take the semaphore before the mutex so a producer that has to wait for a
    // free slot does not hold pmutex while it is blocked.
    P(p_sem);
    pthread_mutex_lock(&pmutex);
    circlequeue[productorstep++] = in;
    productorstep %= maxcap;  // wrap around
    pthread_mutex_unlock(&pmutex);
    V(c_sem);  // publish one element to the consumers
  }

  /** Copies the oldest element into `*out`, blocking while the ring is empty. */
  void pop(T* out) {
    // Same ordering as push(): wait for data first, then lock the read index.
    P(c_sem);
    pthread_mutex_lock(&cmutex);
    *out = circlequeue[consumerstep++];
    consumerstep %= maxcap;  // wrap around
    pthread_mutex_unlock(&cmutex);
    V(p_sem);  // hand the slot back to the producers
  }

  ~CircleQueue() {
    sem_destroy(&p_sem);
    sem_destroy(&c_sem);
    pthread_mutex_destroy(&pmutex);
    pthread_mutex_destroy(&cmutex);
  }

 private:
  std::vector<T> circlequeue;  // ring storage, size maxcap
  int maxcap;                  // number of slots
  sem_t p_sem;                 // free slots: the resource producers wait for
  sem_t c_sem;                 // stored elements: the resource consumers wait for
  int consumerstep;            // index of the next slot to read (guarded by cmutex)
  int productorstep;           // index of the next slot to write (guarded by pmutex)
  pthread_mutex_t cmutex;      // serializes consumers
  pthread_mutex_t pmutex;      // serializes producers
};
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
Main.cpp
#include <unistd.h>
#include <cstdlib>
#include <ctime>
#include "Circlequeue.hpp"
/**
 * Builds a printable label for the calling thread, of the form
 * "thread[0x<hex id>]".
 *
 * The id is converted to unsigned long and printed with %lx so that the
 * conversion specifier matches the argument type; passing a pthread_t to %x
 * is a format/argument mismatch where pthread_t is wider than unsigned int.
 * NOTE(review): assumes pthread_t is an integer type (true on Linux/glibc) --
 * confirm before building on other platforms.
 *
 * @return the label as a string (at most 31 characters).
 */
std::string setName() {
  char buffer[32];
  snprintf(buffer, sizeof(buffer), "thread[0x%lx]",
           static_cast<unsigned long>(pthread_self()));
  return buffer;
}
void* consumer(void* args) {
CircleQueue<int>* cq = static_cast<CircleQueue<int>*>(args);
while (true) {
int data;
cq->pop(&data);
cout << setName() << "consumer -- data:" << data << endl;
}
}
void* productor(void* args) {
CircleQueue<int>* cq = static_cast<CircleQueue<int>*>(args);
while (true) {
int data = rand() % 10 + 1;
cq->push(data);
cout << setName() << "productor -- data:" << data << endl;
}
}
// multiple consumers + multiple producers:
/**
 * Demo driver: three producers and three consumers sharing one
 * CircleQueue<int> with 5 slots.
 *
 * Returns 1 as soon as a thread cannot be created; otherwise it joins every
 * thread (they loop forever) before releasing the queue.
 */
int main() {
  // Same seed as before: low 32 bits of time XOR pid.
  srand(static_cast<unsigned int>(time(nullptr)) ^
        static_cast<unsigned int>(getpid()));
  CircleQueue<int>* cq = new CircleQueue<int>(5);
  pthread_t p[3], c[3];

  // On a creation failure, threads started earlier may already be using *cq,
  // so it is deliberately not deleted on the error paths; process exit
  // reclaims it. Bailing out also avoids joining uninitialised thread ids.
  for (auto& tid : p) {
    const int rc = pthread_create(&tid, nullptr, productor, cq);
    if (rc != 0) {
      std::cerr << "pthread_create(productor) failed: " << rc << std::endl;
      return 1;
    }
  }
  for (auto& tid : c) {
    const int rc = pthread_create(&tid, nullptr, consumer, cq);
    if (rc != 0) {
      std::cerr << "pthread_create(consumer) failed: " << rc << std::endl;
      return 1;
    }
  }

  for (auto& tid : p) {
    pthread_join(tid, nullptr);
  }
  for (auto& tid : c) {
    pthread_join(tid, nullptr);
  }
  delete cq;
  return 0;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
上次更新: 2025/09/03, 18:26:17