C++怎么实现一个线程安全的队列_C++使用互斥锁和条件变量构建并发队列

线程安全队列通过互斥锁保护共享数据、条件变量实现线程同步,确保生产者-消费者模型中多线程并发访问时的数据一致性与高效协作。

在C++多线程编程中,实现一个线程安全的队列是常见的需求。比如生产者-消费者模型就需要多个线程同时访问同一个队列,而不会引发数据竞争或状态不一致的问题。通过使用互斥锁(std::mutex)和条件变量(std::condition_variable),我们可以构建一个高效且安全的并发队列。

基本设计思路

线程安全队列的核心目标是:确保任意数量的生产者和消费者可以并发地入队和出队,而不会破坏内部数据结构或导致竞态条件。

关键点包括:

  • 使用互斥锁保护对队列的读写操作,防止多个线程同时修改队列。
  • 使用条件变量通知等待的线程——当队列为空时,消费者等待;当有新元素加入时,唤醒消费者。
  • 避免忙等待,提高效率。

使用 std::queue + std::mutex + std::condition_variable 实现

下面是一个完整的线程安全队列实现示例:

#include 
#include 
#include 
#include 

template
class ThreadSafeQueue {
private:
    std::queue data_queue;
    mutable std::mutex mtx;
    std::condition_variable cv;

public:
    ThreadSafeQueue() = default;

    void push(T value) {
        std::lock_guard lock(mtx);
        data_queue.push(std::move(value));
        cv.notify_one(); // 唤醒一个等待的消费者
    }

    bool try_pop(T& value) {
        std::lock_guard lock(mtx);
        if (data_queue.empty()) {
            return false;
        }
        value = std::move(data_queue.front());
        data_queue.pop();
        return true;
    }

    void wait_and_pop(T& value) {
        std::unique_lock lock(mtx);
        cv.wait(lock, [this] { return !data_queue.empty(); });
        value = std::move(data_queue.front());
        data_queue.pop();
    }

    bool empty() const {
        std::lock_guard lock(mtx);
        return data_queue.empty();
    }

    size_t size() const {
        std::lock_guard lock(mtx);
        return data_queue.size();
    }
};

各方法说明与使用建议

push(T value):将元素加入队列。加锁后插入,并调用 notify_one() 唤醒一个正在等待的消费者线程。

try_pop(T& value):非阻塞尝试弹出元素。如果队列为空返回 false,否则赋值并返回 true。适合不想阻塞的场景。

wait_and_pop(T& value):阻塞式出队。若队列为空,则等待直到有元素可用。内部使用 unique_lock 配合条件变量实现高效等待。

empty() 和 size():提供只读查询功能。注意这些值可能在调用后立即变化,仅作参考。

使用示例:生产者-消费者模型

以下是一个简单的多线程测试例子:

#include 

void producer(ThreadSafeQueue& queue) {
    for (int i = 0; i < 5; ++i) {
        queue.push(i);
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
    }
}

void consumer(ThreadSafeQueue& queue) {
    for (int i = 0; i < 5; ++i) {
        int value;
        queue.wait_and_pop(value);
        std::cout << "Consumed: " << value << '\n';
    }
}

int main() {
    ThreadSafeQueue queue;
    std::thread p(producer, std::ref(queue));
    std::thread c(consumer, std::ref(queue));

    p.join();
    c.join();

    return 0;
}

这个例子中,生产者每100ms放入一个数,消费者同步取出并打印。由于使用了条件变量,消费者不会占用CPU空转,而是被唤醒后处理数据。

基本上就这些。只要正确使用互斥锁保护共享状态,配合条件变量实现线程间通信,就能写出稳定高效的并发队列。实际项目中还可以扩展支持超时弹出(wait_for)、多消费者唤醒等特性。