cen's blog cen's blog
首页
  • 编程文章

    • markdown使用
  • 学习笔记

    • C++学习
    • C++数据结构
    • MySQL
    • Linux
    • 网络编程
算法
  • CLion
  • 技术文档
  • GitHub技巧
  • Nodejs
  • 博客搭建
  • 分类
  • 标签
  • 归档
关于
GitHub (opens new window)

cen

十年饮冰,难凉热血
首页
  • 编程文章

    • markdown使用
  • 学习笔记

    • C++学习
    • C++数据结构
    • MySQL
    • Linux
    • 网络编程
算法
  • CLion
  • 技术文档
  • GitHub技巧
  • Nodejs
  • 博客搭建
  • 分类
  • 标签
  • 归档
关于
GitHub (opens new window)
  • Linux环境搭建
  • 基本指令
  • 权限
  • Linux基础开发工具
  • 进程概念
  • 进程控制
  • 基础IO流
  • 动态库和静态库
  • 进程通信
  • 进程信号
  • 多线程
  • 线程安全
  • 生产者消费者
    • 介绍
    • 特点
    • Blockingqueue 实现
    • 环形队列 实现
      • 单生产单消费
      • 多成产多消费
  • 线程池
  • Linux
cen
2025-07-30
目录

生产者消费者

# 介绍

生产者消费者模型是通过共享缓冲区协调生产数据和使用数据的两类线程/进程的并发协作模式。

# 特点

3-2-1:

  • 三种关系: 生产者和生产者(互斥关系)、消费者和消费者(互斥关系)、生产者和消费者(互斥关系、同步关系)。
  • 两种角色: 生产者和消费者。(通常由进程或线程承担)
  • 一个交易场所: 通常指的是内存中的一段缓冲区。(可以自己通过某种方式组织起来)

# Blockingqueue 实现

在多线程编程中,阻塞队列(Blocking Queue)是一种常用于实现生产者和消费者模型的数据结构。

其与普通的队列的区别在于:

当队列为空时,从队列获取元素的操作将会被阻塞,直到队列中放入了元素。 当队列满时,往队列里存放元素的操作会被阻塞,直到有元素从队列中取出。

Blockqueue.hpp

#include <pthread.h>
#include <iostream>
#include <queue>

using namespace std;
/**
 * tip:
 * 输入型参数:const &
 * 输出型参数:*
 * 输入输出型参数:&
 */
template <class T>
class BlockQueue {
public:
    BlockQueue(int _maxcap) : maxcap(_maxcap) {
        pthread_mutex_init(&mutex, nullptr);
        pthread_cond_init(&p_cond, nullptr);
        pthread_cond_init(&c_cond, nullptr);
    }

    void push(const T& in) {
        // productor pushes data
        pthread_mutex_lock(&mutex);
        while (isFull()) {
            // full?productor waiting...
            pthread_cond_wait(&p_cond, &mutex);
        }
        q.push(in);
        // wakeup consumer
        pthread_cond_signal(&c_cond);
        pthread_mutex_unlock(&mutex);
    }

    void pop(T* out) {
        // consumer pops data
        pthread_mutex_lock(&mutex);
        if (isEmpty()) {
            pthread_cond_wait(&c_cond, &mutex);
        }
        *out = q.front();
        q.pop();
        pthread_cond_signal(&p_cond);
        pthread_mutex_unlock(&mutex);
    }

    ~BlockQueue() {
        pthread_mutex_destroy(&mutex);
        pthread_cond_destroy(&p_cond);
        pthread_cond_destroy(&c_cond);
    }

private:
    bool isFull() { return q.size() == maxcap; }
    bool isEmpty() { return q.empty(); }

private:
    queue<T> q;
    int maxcap;
    pthread_mutex_t mutex;
    pthread_cond_t p_cond;
    pthread_cond_t c_cond;
};
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62

Main.cpp

#include <unistd.h>
#include "Blockqueue.hpp"

void* consumer(void* args) {
    while (true) {
        BlockQueue<int>* bq = static_cast<BlockQueue<int>*>(args);
        // consumer activities:
        int data;
        bq->pop(&data);
        cout << "consumer data:" << data << endl;
    }
    return nullptr;
}

void* productor(void* args) {
    while (true) {
        BlockQueue<int>* bq = static_cast<BlockQueue<int>*>(args);
        // productor activities:
        int data = rand() % 10 + 1;
        bq->push(data);
        cout << "productor data:" << data << endl;
    }
    return nullptr;
}

int main() {
    srand((unsigned long)time(nullptr) ^ getpid());
    BlockQueue<int>* bq = new BlockQueue<int>(5);

    pthread_t c, p;
    pthread_create(&c, nullptr, consumer, bq);
    pthread_create(&p, nullptr, productor, bq);

    pthread_join(c, nullptr);
    pthread_join(p, nullptr);

    delete bq;
    return 0;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39

# 环形队列 实现

# 单生产单消费

CircleQueue.hpp

#include <pthread.h>
#include <semaphore.h>
#include <iostream>
#include <vector>

using namespace std;

template <class T>
class CircleQueue {
private:
    void P(sem_t& sem) { sem_wait(&sem); }

    void V(sem_t& sem) { sem_post(&sem); }

public:
    CircleQueue(int _maxcap) : circlequeue(_maxcap), maxcap(_maxcap) {
        sem_init(&p_sem, 0, maxcap);
        sem_init(&c_sem, 0, 0);
        consumerstep = productorstep = 0;
    }

    void push(const T& in) {
        sleep(1);
        P(p_sem);
        // success
        circlequeue[productorstep++] = in;
        productorstep %= maxcap;
        V(c_sem);
        // fail
    }

    void pop(T* out) {
        P(c_sem);
        *out = circlequeue[consumerstep++];
        consumerstep %= maxcap;
        V(p_sem);
    }

    ~CircleQueue() {
        sem_destroy(&p_sem);
        sem_destroy(&c_sem);
    }

private:
    vector<T> circlequeue;
    int maxcap;
    sem_t p_sem;  // 生产者看重空间资源
    sem_t c_sem;  // 消费者看重数据资源
    int consumerstep;
    int productorstep;
};
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51

Main.cpp

#include <unistd.h>
#include <cstdlib>
#include <ctime>
#include "Circlequeue.hpp"

void* consumer(void* args) {
    CircleQueue<int>* cq = static_cast<CircleQueue<int>*>(args);
    while (true) {
        int data;
        cq->pop(&data);
        cout << "consumer -- data:" << data << endl;
    }
}

void* productor(void* args) {
    CircleQueue<int>* cq = static_cast<CircleQueue<int>*>(args);
    while (true) {
        int data = rand() % 10 + 1;
        cq->push(data);
        cout << "productor -- data:" << data << endl;
    }
}

// single consumer + single productor:
int main() {
    srand((unsigned long)time(nullptr) ^ getpid());
    CircleQueue<int>* cq = new CircleQueue<int>(5);

    pthread_t p, c;

    pthread_create(&c, nullptr, consumer, cq);
    pthread_create(&p, nullptr, productor, cq);

    pthread_join(c, nullptr);
    pthread_join(p, nullptr);

    delete cq;
    return 0;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39

# 多成产多消费

CircleQueue.hpp

#include <pthread.h>
#include <semaphore.h>
#include <iostream>
#include <vector>

using namespace std;

template <class T>
class CircleQueue {
private:
    void P(sem_t& sem) { sem_wait(&sem); }

    void V(sem_t& sem) { sem_post(&sem); }

public:
    CircleQueue(int _maxcap) : circlequeue(_maxcap), maxcap(_maxcap) {
        sem_init(&p_sem, 0, maxcap);
        sem_init(&c_sem, 0, 0);
        pthread_mutex_init(&pmutex, nullptr);
        pthread_mutex_init(&cmutex, nullptr);
        consumerstep = productorstep = 0;
    }

    void push(const T& in) {
        P(p_sem);
        pthread_mutex_lock(&pmutex);
        // success
        circlequeue[productorstep++] = in;
        productorstep %= maxcap;
        pthread_mutex_unlock(&pmutex);
        V(c_sem);
    }

    void pop(T* out) {
        P(c_sem);
        pthread_mutex_lock(&cmutex);
        *out = circlequeue[consumerstep++];
        consumerstep %= maxcap;
        pthread_mutex_unlock(&cmutex);
        V(p_sem);
    }

    ~CircleQueue() {
        sem_destroy(&p_sem);
        sem_destroy(&c_sem);
        pthread_mutex_destroy(&pmutex);
        pthread_mutex_destroy(&cmutex);
    }

private:
    vector<T> circlequeue;
    int maxcap;
    sem_t p_sem;  // 生产者看重空间资源
    sem_t c_sem;  // 消费者看重数据资源
    int consumerstep;
    int productorstep;
    pthread_mutex_t cmutex;
    pthread_mutex_t pmutex;
};
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59

Main.cpp

#include <unistd.h>
#include <cstdlib>
#include <ctime>
#include "Circlequeue.hpp"

string setName() {
    char buffer[32];
    snprintf(buffer, sizeof(buffer), "thread[0x%x]", pthread_self());
    return buffer;
}

void* consumer(void* args) {
    CircleQueue<int>* cq = static_cast<CircleQueue<int>*>(args);
    while (true) {
        int data;
        cq->pop(&data);
        cout << setName() << "consumer -- data:" << data << endl;
    }
}

void* productor(void* args) {
    CircleQueue<int>* cq = static_cast<CircleQueue<int>*>(args);
    while (true) {
        int data = rand() % 10 + 1;
        cq->push(data);
        cout << setName() << "productor -- data:" << data << endl;
    }
}

// multiple consumer + multiple productor:
int main() {
    srand((unsigned long)time(nullptr) ^ getpid());
    CircleQueue<int>* cq = new CircleQueue<int>(5);

    pthread_t p[3], c[3];
    for (auto& i : p) {
        pthread_create(&i, nullptr, productor, cq);
    }

    for (auto& i : c) {
        pthread_create(&i, nullptr, consumer, cq);
    }

    for (auto& i : p)
        pthread_join(i, nullptr);

    for (auto& i : c)
        pthread_join(i, nullptr);

    delete cq;
    return 0;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
上次更新: 2025/09/03, 18:26:17
线程安全
线程池

← 线程安全 线程池→

最近更新
01
网络协议
09-03
02
套接字和TCP
08-26
03
常用数据结构
08-23
更多文章>
Theme by Vdoing | Copyright © 2024-2025 京ICP备2020044002号-3 京公网安备11010502056119号
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式