C++/STL/Thread
外觀
< C++
thread 是w:C++標準程式庫中的一個w:頭文件,定義了w:C++11標準中的一些表示w:執行緒的類 、用於互斥訪問的類與方法等。
執行緒對象thread
[編輯]類thread表示一個執行緒。初始化時給出該執行緒的執行函數(或是可以調用的對象)。執行緒對象構造後即開始運行。默認情況下,C++11的如此創建的執行緒必須與父執行緒會合,即在父執行緒中調用thread::join()函數,或者與父執行緒分離,即調用thread::detach()函數。否則,如果子執行緒還在執行,父執行緒已經執行結束而撤銷,則會拋出異常。
如果子執行緒的執行函數需要參數,可把實參列表寫在thread對象構造函數的參數表中。
如果把可調用對象(callable object)作為參數傳給子執行緒的構造函數,則把該可調用對象複製一份給子執行緒。如果需要傳遞可調用對象的左值引用給子執行緒,則採用std::ref( )來產生對象的引用、然後把引用值再傳進去給子執行緒。
- 類thread的構造函數:
- thread() noexcept;預設構造函數,執行緒不可執行
- template <class Fn, class... Args> explicit thread (Fn&& fn, Args&&... args); 最常用的構造函數
- thread (const thread&) = delete; 禁止複製構造
- thread (thread&& x) noexcept; 允許移動構造
- 類thread的運算符
- thread& operator= (thread&& rhs) noexcept;
- thread& operator= (const thread&) = delete;
- 類thread的成員函數:
- thread::hardware_concurrency(): 靜態成員函數,返回當前計算機最大的硬體並發執行緒數目。基本上可以視為處理器的核心數目。
- thread::get_id(): 返回一個執行緒對象的id
- thread::joinable():檢查thread對象是否標識一個活動(active)的可行性行執行緒。具體說,判斷表達式
get_id() != std::thread::id()
。預設構造的thread對象、已經完成join的thread對象、已經detach的thread對象都不是joinable。 - thread::join():阻塞調用者(caller)所在的執行緒直至被join的std::thread對象標識的執行緒執行結束。
- thread::detach:執行執行緒與當前的std::thread對象分開,執行緒繼續獨立執行下去。執行緒執行結束時釋放分配的資源。
- thread::swap:執行緒swap
- thread::native_handle:返回native handle
this_thread命名空間
[編輯]在this_thread命名空間中定義了4個函數
- this_thread::get_id() 返回當前執行緒的id
- this_thread::yield() 暫時放棄一段 CPU 時間、讓給其他執行緒使用
- this_thread::sleep_until() 設定一個絕對時刻、讓執行執行緒在指定的時刻後再繼續執行
- this_thread::sleep_for() 停止一段指定的時長的執行
例子程序
[編輯]#include <iostream>
#include <thread>
using namespace std;
void test_func()
{
// do something
cout << "In thread." << endl;
}
int main( int argc, char** argv )
{
// execute thread
thread mThread( test_func );
// do somthing
cout << "main thread." << endl;
// wait the thread stop
mThread.join();
return 0;
}
執行緒對象析構時,如果joinable()為真,將導致執行terminate()。所以應該按照RAII管理進程對象:
#include <iostream>
#include <thread>
class ThreadRAII
{
std::thread & m_thread;
public:
ThreadRAII(std::thread & threadObj) : m_thread(threadObj)
{
}
~ThreadRAII()
{
// Check if thread is joinable then detach the thread
if(m_thread.joinable())
{
m_thread.detach();
}
}
};
void thread_function()
{
for(int i = 0; i < 10000; i++);
std::cout<<"thread_function Executing"<<std::endl;
}
int main()
{
std::thread threadObj(thread_function);
// If we comment this Line, then program will crash
ThreadRAII wrapperObj(threadObj);
return 0;
}
構造執行緒對象時,函數的參數哪怕是引用型,實際上也是給其構造了一個副本。為了確實傳引用,需要使用std::ref():
#include <iostream>
#include <thread>
void threadCallback(int const & x)
{
int & y = const_cast<int &>(x);
y++;
std::cout<<"Inside Thread x = "<<x<<std::endl;
}
int main()
{
int x = 9;
std::cout<<"In Main Thread : Before Thread Start x = "<<x<<std::endl;
std::thread threadObj(threadCallback,std::ref(x));
threadObj.join();
std::cout<<"In Main Thread : After Thread Joins x = "<<x<<std::endl;
return 0;
}
另外一個例子,構造了執行緒池:
#include<vector>
#include<functional>
#include<memory>
#include <atomic>
#include<list>
#include<mutex>
#include<thread>
#include<condition_variable>
#include <iostream>
using namespace std;
template<typename T>
class SyncQueue
{
public:
SyncQueue(int maxSize) :m_maxSize(maxSize), m_needStop(false)
{
}
void Put(const T&x)
{
Add(x);
}
void Put(T&&x)
{
Add(std::forward<T>(x));
}
void Take(std::list<T>& list)
{
std::unique_lock<std::mutex> locker(m_mutex);
m_notEmpty.wait(locker, [this] {return m_needStop || NotEmpty(); });
if (m_needStop)
return;
list = std::move(m_queue);
m_notFull.notify_one();
}
void Take(T& t)
{
std::unique_lock<std::mutex> locker(m_mutex);
m_notEmpty.wait(locker, [this] {return m_needStop || NotEmpty(); });
if (m_needStop)
return;
t = m_queue.front();
m_queue.pop_front();
m_notFull.notify_one();
}
void Stop()
{
{
std::lock_guard<std::mutex> locker(m_mutex);
m_needStop = true;
}
m_notFull.notify_all();
m_notEmpty.notify_all();
}
bool Empty()
{
std::lock_guard<std::mutex> locker(m_mutex);
return m_queue.empty();
}
bool Full()
{
std::lock_guard<std::mutex> locker(m_mutex);
return m_queue.size() == m_maxSize;
}
size_t Size()
{
std::lock_guard<std::mutex> locker(m_mutex);
return m_queue.size();
}
int Count()
{
return m_queue.size();
}
private:
bool NotFull() const
{
bool full = m_queue.size() >= m_maxSize;
if (full)
cout << "full, waiting,thread id: " << this_thread::get_id() << endl;
return !full;
}
bool NotEmpty() const
{
bool empty = m_queue.empty();
if (empty)
cout << "empty,waiting,thread id: " << this_thread::get_id() << endl;
return !empty;
}
template<typename F>
void Add(F&&x)
{
std::unique_lock< std::mutex> locker(m_mutex);
m_notFull.wait(locker, [this] {return m_needStop || NotFull(); });
if (m_needStop)
return;
m_queue.push_back(std::forward<F>(x));
m_notEmpty.notify_one();
}
private:
std::list<T> m_queue; //缓冲区
std::mutex m_mutex; //互斥量和条件变量结合起来使用
std::condition_variable m_notEmpty;//不为空的条件变量
std::condition_variable m_notFull; //没有满的条件变量
int m_maxSize; //同步队列最大的size
bool m_needStop; //停止的标志
};
const int MaxTaskCount = 100;
class ThreadPool
{
public:
using Task = std::function<void()>;
ThreadPool(int numThreads = std::thread::hardware_concurrency()) : m_queue(MaxTaskCount)
{
Start(numThreads);
}
~ThreadPool(void)
{
//如果没有停止时则主动停止线程池
Stop();
}
void Stop()
{
std::call_once(m_flag, [this] {StopThreadGroup(); }); //保证多线程情况下只调用一次StopThreadGroup
}
void AddTask(Task&&task)
{
m_queue.Put(std::forward<Task>(task));
}
void AddTask(const Task& task)
{
m_queue.Put(task);
}
private:
void Start(int numThreads)
{
m_running = true;
//创建线程组
for (int i = 0; i <numThreads; ++i)
{
m_threadgroup.push_back(std::make_shared<std::thread>(&ThreadPool::RunInThread, this));
}
}
void RunInThread()
{
while (m_running)
{
//取任务分别执行
std::list<Task> list;
m_queue.Take(list);
for (auto& task : list)
{
if (!m_running)
return;
task();
}
}
}
void StopThreadGroup()
{
m_queue.Stop(); //让同步队列中的线程停止
m_running = false; //置为false,让内部线程跳出循环并退出
for (auto thread : m_threadgroup) //等待线程结束
{
if (thread)
thread->join();
}
m_threadgroup.clear();
}
std::list<std::shared_ptr<std::thread>> m_threadgroup; //处理任务的线程组
SyncQueue<Task> m_queue; //同步队列
atomic_bool m_running; //是否停止的标志
std::once_flag m_flag;
};
void TestThdPool()
{
ThreadPool pool; bool runing = true;
std::thread thd1([&pool, &runing] {
while (runing)
{
cout << "produce " << this_thread::get_id() << endl;
pool.AddTask([] {
std::cout << "consume " << this_thread::get_id() << endl;
});
}
});
this_thread::sleep_for(std::chrono::seconds(10));
runing = false;
pool.Stop();
thd1.join();
getchar();
}
int main()
{
TestThdPool();
}