C++并发编程 等待与唤醒
条件变量
条件变量, 包括(std::condition_variable 和 std::condition_variable_any)
定义在 condition_variable 头文件中, 它们都需要与互斥量(作为同步工具)一起才能工作. std::condition_variable 允许阻塞一个线程, 直到条件达成.成员函数void wait(std::unique_lock<std::mutex>& lock);
等待, 通过 notify_one(), notify_all()或伪唤醒结束等待
void wait(std::unique_lock<std::mutex>& lock, Predicate pred);
等待, 通过 notify_one(), notify_all()被调用, 并且谓词为 true 时结束等待.
pred 谓词必须是合法的, 并且需要返回一个值, 这个值可以和bool互相转化
cv_status wait_until(std::unique_lock<std::mutex>& lock, const std::chrono::time_point<Clock, Duration>& absolute_time);
调用 notify_one(), notify_all(), 超时或线程伪唤醒时, 结束等待.
返回值标识了是否超时.
bool wait_until(std::unique_lock<std::mutex>& lock, const std::chrono::time_point<Clock, Duration>& absolute_time, Predicate pred);
等待, 通过 notify_one(), notify_all(), 超时, 线程伪唤醒, 并且谓词为 true 时结束等待.
cv_status wait_for(std::unique_lock<std::mutex>& lock, const std::chrono::duration<Rep, Period>& relative_time);
调用 notify_one(), notify_all(), 指定时间内达成条件或线程伪唤醒时,结束等待
bool wait_for(std::unique_lock<std::mutex>& lock, const std::chrono::duration<Rep, Period>& relative_time, Predicate pred);
调用 notify_one(), notify_all(), 指定时间内达成条件或线程伪唤醒时,并且谓词为 true 时结束等待.
void notify_one() noexcept; 唤醒一个等待当前 std::condition_variable 实例的线程
void notify_all() noexcept; 唤醒所有等待当前 std::condition_variable 实例的线程 一个线程安全的队列设计:#ifndef _THREAD_SAFE_QUEUE_#define _THREAD_SAFE_QUEUE_#include <condition_variable>#include <mutex>#include <queue>#include <memory>template<typename Ty, typename ConditionVar = std::condition_variable, typename Mutex = std::mutex>class ThreadSafeQueue{typedef std::queue<Ty>Queue;typedef std::shared_ptr<Ty> SharedPtr;typedef std::lock_guard<Mutex>MutexLockGuard;typedef std::unique_lock<Mutex> MutexUniqueLock;public:explicit ThreadSafeQueue() {}~ThreadSafeQueue() {}ThreadSafeQueue(const ThreadSafeQueue&) = delete;ThreadSafeQueue& operator=(const ThreadSafeQueue&) = delete;bool IsEmpty() const{MutexLockGuard lk(mMutex);return mQueue.empty();}void WaitAndPop(Ty& value){MutexUniqueLock lk(mMutex);mConVar.wait(lk, [this] {return !mQueue.empty(); });value = mQueue.front();mQueue.pop();}SharedPtr WaitAndPop(){MutexUniqueLock lk(mMutex);mConVar.wait(lk, [this] {return !mQueue.empty();});SharedPtr sp = std::make_shared<Ty>(mQueue.front());mQueue.pop();return sp;}bool TryPop(Ty& value){MutexLockGuard lk(mMutex);if (mQueue.empty())return false;value = mQueue.front();mQueue.pop();return true;}SharedPtr TryPop(){MutexLockGuard lk(mMutex);if (mQueue.empty())return false;SharedPtr sp = std::make_shared<Ty>(mQueue.front());mQueue.pop();return sp;}void Push(const Ty& value){MutexLockGuard lk(mMutex);mQueue.push(value);mConVar.notify_all();}private:mutable Mutex mMutex;ConditionVarmConVar;Queue mQueue;};#endif // _THREAD_SAFE_QUEUE_另一个版本, 使用 shared_ptr 作为成员对队列的性能有很大的提升, 其在push时减少了互斥量持有的时间, 允许其它线程在分配内存的同时,对队列进行其它操作.template<typename Ty, typename ConditionVar = std::condition_variable, typename Mutex = std::mutex>class ThreadSafeQueue{typedef std::shared_ptr<Ty> SharedPtr;typedef std::queue<SharedPtr> Queue;typedef std::shared_ptr<Ty> SharedPtr;typedef std::lock_guard<Mutex>MutexLockGuard;typedef std::unique_lock<Mutex> MutexUniqueLock;public:explicit ThreadSafeQueue() {}~ThreadSafeQueue() {}ThreadSafeQueue(const ThreadSafeQueue&) = delete;ThreadSafeQueue& operator=(const ThreadSafeQueue&) = delete;bool IsEmpty() const{MutexLockGuard lk(mMutex);return mQueue.empty();}void WaitAndPop(Ty& value){MutexUniqueLock lk(mMutex);mConVar.wait(lk, [this] {return !mQueue.empty();});value = std::move(*mQueue.front());mQueue.pop();}SharedPtr WaitAndPop(){MutexUniqueLock lk(mMutex);mConVar.wait(lk, [this] {return !mQueue.empty();});SharedPtr sp = mQueue.front();mQueue.pop();return sp;}bool TryPop(Ty& value){MutexLockGuard lk(mMutex);if (mQueue.empty())return false;value = std::move(*mQueue.front());mQueue.pop();return true;}SharedPtr TryPop(){MutexLockGuard lk(mMutex);if (mQueue.empty())return false;SharedPtr sp = mQueue.front();mQueue.pop();return sp;}void Push(const Ty& value){SharedPtr p = std::make_shared<Ty>(value);MutexLockGuard lk(mMutex);mQueue.push(p);mConVar.notify_all();}private:mutable Mutex mMutex;ConditionVarmConVar;Queue mQueue;}; std::future
期望(std::future)可以用来等待其他线程上的异步结果, 其实例可以在任意时间引用异步结果.
C++包括两种期望, std::future(唯一期望) 和 std::shared_future(共享期望)
std::future 的实例只能与一个指定事件相关联.
std::shared_future 的实例能关联多个事件, 它们同时变为就绪状态, 并且可以访问与事件相关的任何数据.
在与数据无关的地方,可以使用 std::future<void> 与 std::shared_future<void> 的特化模板.
期望对象本身并不提供同步访问, 如果多个线程要访问一个独立的期望对象, 需要使用互斥体进行保护.std::packaged_task
std::packaged_task 可包装一个函数或可调用的对象, 并且允许异步获取该可调用对象产生的结果, 返回值通过 get_future 返回的 std::future 对象取得, 其返回的 std::future 的模板类型为 std::packaged_task 模板函数签名中的返回值类型.
std::packaged_task 对象被调用时, 就会调用相应的函数或可调用对象, 将期望置为就绪, 并存储返回值.
std::packaged_task 的模板参数是一个函数签名, 如 int(std::string&, double*), 构造 std::packaged_task 实例时必须传入一个可以匹配的函数或可调用对象, 也可以是隐藏转换能匹配的.std::packaged_task<std::string(const std::string&)> task([](std::string str) {std::stringstream stm;stm << "tid:" << std::this_thread::get_id() << ", str:" << str << std::endl;std::cout << stm.str();std::this_thread::sleep_for(std::chrono::seconds(1));return std::string("MSG:Hello");});std::future<std::string> f = task.get_future();std::thread t(std::move(task), std::string("package task test"));t.detach();// 调用 f.get 返回结果, 但是须阻塞等到任务执行完成std::cout << "main tid:" << std::this_thread::get_id() << ", result: " << f.get() << std::endl; std::promise
std::promise 类型模板提供设置异步结果的方法, 这样其他线程就可以通过 std::future 实例来索引该结果.class SquareRoot{std::promise<double>& prom;public: SquareRoot(std::promise<double>& p) : prom(p) {}~SquareRoot() {}void operator()(double x){if (x < 0){prom.set_exception(std::make_exception_ptr(std::out_of_range("x<0")));}else{double result = std::sqrt(x);prom.set_value(result);}}};std::promise<double> prom;SquareRoot p(prom);std::thread t(std::bind(&SquareRoot::operator(), &p, 1));//std::thread t(std::bind(&SquareRoot::operator(), &p, -1));std::future<double> f = prom.get_future();try{double v = f.get();std::cout << "value:" << v << std::endl;}catch (std::exception& e){std::cout << "exception:" << e.what() << std::endl;}t.join();本文永久更新链接地址:http://www.linuxidc.com/Linux/2016-12/138004.htm