C++怎么实现一个阻塞队列_C++多线程生产者-消费者模型的关键数据结构

阻塞队列通过std::queue、std::mutex和std::condition_variable实现线程安全的生产者-消费者模型,支持队列满时生产者阻塞、队列空时消费者阻塞,利用wait与notify机制实现高效同步。

阻塞队列是实现 C++ 多线程生产者-消费者模型的核心数据结构。它允许一个或多个生产者线程向队列中添加元素,同时一个或多个消费者线程从队列中取出元素。当队列为空时,消费者线程会被阻塞,直到有新数据到来;当队列满时,生产者线程也会被阻塞,直到有空位可用。

在 C++ 中,可以通过 std::queuestd::mutexstd::condition_variable 来实现一个线程安全的阻塞队列。

基本组件说明

实现阻塞队列需要以下几个关键部分:

  • std::queue:用于存储数据的底层容器。
  • std::mutex:保护共享队列,防止多线程竞争。
  • std::condition_variable:用于线程间通信,实现“等待-通知”机制。
  • 最大容量(可选):限制队列大小,实现有界阻塞队列。

阻塞队列代码实现

下面是一个简单的有界阻塞队列实现:

#include 
#include 
#include 
#include 

template 
class BlockingQueue {
private:
    std::queue data_queue_;
    std::mutex mtx_;
    std::condition_variable not_full_;
    std::condition_variable not_empty_;
    size_t max_size_;

public:
    explicit BlockingQueue(size_t max_size = 1000) : max_size_(max_size) {}

    void put(T item) {
        std::unique_lock lock(mtx_);
        // 队列满时等待
        not_full_.wait(lock, [this] { return data_queue_.size() < max_size_; });
        data_queue_.push(std::move(item));
        not_empty_.notify_one(); // 通知消费者
    }

    T take() {
        std::unique_lock lock(mtx_);
        // 队列空时等待
        not_empty_.wait(lock, [this] { return !data_queue_.empty(); });
        T item = std::move(data_queue_.front());
        data_queue_.pop();
        not_full_.notify_one(); // 通知生产者
        return item;
    }

    bool empty() const {
        std::lock_guard lock(mtx_);
        return data_queue_.empty();
    }

    size_t size() const {
        std::lock_guard lock(mtx_);
        return data_queue_.size();
    }
};

生产者-消费者示例

使用上面的阻塞队列实现一个简单的生产者-消费者模型:

#include 

void producer(BlockingQueue& queue) {
    for (int i = 0; i < 5; ++i) {
        queue.put(i);
        std::cout << "Produced: " << i << "\n";
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
    }
}

void consumer(BlockingQueue& queue) {
    for (int i = 0; i < 5; ++i) {
        int value = queue.take();
        std::cout << "Consumed: " << value << "\n";
    }
}

int main() {
    BlockingQueue queue(3); // 容量为3

    std::thread p1(producer, std::ref(queue));
    std::thread c1(consumer, std::ref(queue));

    p1.join();
    c1.join();

    return 0;
}

关键点总结

实现阻塞队列需要注意以下几点:

  • 使用 unique_lock 配合 condition_variable 的 wait 方法,避免忙等。
  • wait 的谓词应使用 lambda 表达式检查条件,防止虚假唤醒。
  • 每次修改队列后,通过 notify_one 或 notify_all 唤醒等待线程。
  • 考虑是否需要支持超时操作(如 put_with_timeout),可根据实际需求扩展接口。

基本上就这些。这个阻塞队列足够支撑大多数生产者-消费者场景,且线程安全、易于使用。