C++/STL/Thread

維基教科書,自由的教學讀本
< C++

threadw:C++標準程式庫中的一個w:頭文件,定義了w:C++11標準中的一些表示w:線程的類 、用於互斥訪問的類與方法等。

線程對象thread[編輯]

類thread表示一個線程。初始化時給出該線程的執行函數(或是可以調用的對象)。線程對象構造後即開始運行。默認情況下,C++11的如此創建的線程必須與父線程會合,即在父線程中調用thread::join()函數,或者與父線程分離,即調用thread::detach()函數。否則,如果子線程還在執行,父線程已經執行結束而撤銷,則會拋出異常。

如果子線程的執行函數需要參數,可把實參列表寫在thread對象構造函數的參數表中。

如果把可調用對象(callable object)作為參數傳給子線程的構造函數,則把該可調用對象複製一份給子線程。如果需要傳遞可調用對象的左值引用給子線程,則採用std::ref( )來產生對象的引用、然後把引用值再傳進去給子線程。

  • 類thread的構造函數:
    • thread() noexcept;缺省構造函數,線程不可執行
    • template <class Fn, class... Args> explicit thread (Fn&& fn, Args&&... args); 最常用的構造函數
    • thread (const thread&) = delete; 禁止複製構造
    • thread (thread&& x) noexcept; 允許移動構造
  • 類thread的運算符
    • thread& operator= (thread&& rhs) noexcept;
    • thread& operator= (const thread&) = delete;
  • 類thread的成員函數:
    • thread::hardware_concurrency(): 靜態成員函數,返回當前計算機最大的硬件並發線程數目。基本上可以視為處理器的核心數目。
    • thread::get_id(): 返回一個線程對象的id
    • thread::joinable():檢查thread對象是否標識一個活動(active)的可行性行線程。具體說,判斷表達式get_id() != std::thread::id()。缺省構造的thread對象、已經完成join的thread對象、已經detach的thread對象都不是joinable。
    • thread::join():阻塞調用者(caller)所在的線程直至被join的std::thread對象標識的線程執行結束。
    • thread::detach:執行線程與當前的std::thread對象分開,線程繼續獨立執行下去。線程執行結束時釋放分配的資源。
    • thread::swap:線程swap
    • thread::native_handle:返回native handle

this_thread命名空間[編輯]

在this_thread命名空間中定義了4個函數

  • this_thread::get_id() 返回當前線程的id
  • this_thread::yield() 暫時放棄一段 CPU 時間、讓給其他線程使用
  • this_thread::sleep_until() 設定一個絕對時刻、讓執行線程在指定的時刻後再繼續執行
  • this_thread::sleep_for() 停止一段指定的時長的執行

例子程序[編輯]

#include <iostream>
#include <thread>

using namespace std;

void test_func()
{
  // do something
  cout << "In thread." << endl;
}

int main( int argc, char** argv )
{
  // execute thread
  thread mThread( test_func );

  // do somthing
  cout << "main thread." << endl;

  // wait the thread stop
  mThread.join();

  return 0;
}

線程對象析構時,如果joinable()為真,將導致執行terminate()。所以應該按照RAII管理進程對象:

#include <iostream>
#include <thread>
class ThreadRAII
{
    std::thread & m_thread;
    public:
        ThreadRAII(std::thread  & threadObj) : m_thread(threadObj)
        {
            
        }
        ~ThreadRAII()
        {
            // Check if thread is joinable then detach the thread
            if(m_thread.joinable())
            {
                m_thread.detach();
            }
        }
};
void thread_function()
{
    for(int i = 0; i < 10000; i++);
        std::cout<<"thread_function Executing"<<std::endl;
}
 
int main()  
{
    std::thread threadObj(thread_function);
    
    // If we comment this Line, then program will crash
    ThreadRAII wrapperObj(threadObj);
    return 0;
}

構造線程對象時,函數的參數哪怕是引用型,實際上也是給其構造了一個副本。為了確實傳引用,需要使用std::ref():

#include <iostream>
#include <thread>
void threadCallback(int const & x)
{
    int & y = const_cast<int &>(x);
    y++;
    std::cout<<"Inside Thread x = "<<x<<std::endl;
}
int main()
{
    int x = 9;
    std::cout<<"In Main Thread : Before Thread Start x = "<<x<<std::endl;
    std::thread threadObj(threadCallback,std::ref(x));
    threadObj.join();
    std::cout<<"In Main Thread : After Thread Joins x = "<<x<<std::endl;
    return 0;
}

另外一個例子,構造了線程池:

#include<vector> 
#include<functional>
#include<memory>
#include <atomic> 
#include<list>
#include<mutex>
#include<thread>
#include<condition_variable>
#include <iostream>
using namespace std;

template<typename T>
class SyncQueue
{
public:
	SyncQueue(int maxSize) :m_maxSize(maxSize), m_needStop(false)
	{
	}

	void Put(const T&x)
	{
		Add(x);
	}

	void Put(T&&x)
	{
		Add(std::forward<T>(x));
	}

	void Take(std::list<T>& list)
	{
		std::unique_lock<std::mutex> locker(m_mutex);
		m_notEmpty.wait(locker, [this] {return m_needStop || NotEmpty(); });

		if (m_needStop)
			return;
		list = std::move(m_queue);
		m_notFull.notify_one();
	}

	void Take(T& t)
	{
		std::unique_lock<std::mutex> locker(m_mutex);
		m_notEmpty.wait(locker, [this] {return m_needStop || NotEmpty(); });

		if (m_needStop)
			return;
		t = m_queue.front();
		m_queue.pop_front();
		m_notFull.notify_one();
	}

	void Stop()
	{
		{
			std::lock_guard<std::mutex> locker(m_mutex);
			m_needStop = true;
		}
		m_notFull.notify_all();
		m_notEmpty.notify_all();
	}

	bool Empty()
	{
		std::lock_guard<std::mutex> locker(m_mutex);
		return m_queue.empty();
	}

	bool Full()
	{
		std::lock_guard<std::mutex> locker(m_mutex);
		return m_queue.size() == m_maxSize;
	}

	size_t Size()
	{
		std::lock_guard<std::mutex> locker(m_mutex);
		return m_queue.size();
	}

	int Count()
	{
		return m_queue.size();
	}
private:
	bool NotFull() const
	{
		bool full = m_queue.size() >= m_maxSize;
		if (full)
			cout << "full, waiting,thread id: " << this_thread::get_id() << endl;
		return !full;
	}

	bool NotEmpty() const
	{
		bool empty = m_queue.empty();
		if (empty)
			cout << "empty,waiting,thread id: " << this_thread::get_id() << endl;
		return !empty;
	}

	template<typename F>
	void Add(F&&x)
	{
		std::unique_lock< std::mutex> locker(m_mutex);
		m_notFull.wait(locker, [this] {return m_needStop || NotFull(); });
		if (m_needStop)
			return;

		m_queue.push_back(std::forward<F>(x));
		m_notEmpty.notify_one();
	}

private:
	std::list<T> m_queue; //缓冲区
	std::mutex m_mutex; //互斥量和条件变量结合起来使用
	std::condition_variable m_notEmpty;//不为空的条件变量
	std::condition_variable m_notFull; //没有满的条件变量
	int m_maxSize; //同步队列最大的size

	bool m_needStop; //停止的标志
};

 


const int MaxTaskCount = 100;
class ThreadPool
{
public:
	using Task = std::function<void()>;
	ThreadPool(int numThreads = std::thread::hardware_concurrency()) : m_queue(MaxTaskCount)
	{
		Start(numThreads);
	}

	~ThreadPool(void)
	{
		//如果没有停止时则主动停止线程池
		Stop();
	}

	void Stop()
	{
		std::call_once(m_flag, [this] {StopThreadGroup(); }); //保证多线程情况下只调用一次StopThreadGroup
	}

	void AddTask(Task&&task)
	{
		m_queue.Put(std::forward<Task>(task));
	}

	void AddTask(const Task& task)
	{
		m_queue.Put(task);
	}

private:
	void Start(int numThreads)
	{
		m_running = true;
		//创建线程组
		for (int i = 0; i <numThreads; ++i)
		{
			m_threadgroup.push_back(std::make_shared<std::thread>(&ThreadPool::RunInThread, this));
		}
	}

	void RunInThread()
	{
		while (m_running)
		{
			//取任务分别执行
			std::list<Task> list;
			m_queue.Take(list);

			for (auto& task : list)
			{
				if (!m_running)
					return;

				task();
			}
		}
	}

	void StopThreadGroup()
	{
		m_queue.Stop(); //让同步队列中的线程停止
		m_running = false; //置为false,让内部线程跳出循环并退出

		for (auto thread : m_threadgroup) //等待线程结束
		{
			if (thread)
				thread->join();
		}
		m_threadgroup.clear();
	}

	std::list<std::shared_ptr<std::thread>> m_threadgroup; //处理任务的线程组
	SyncQueue<Task> m_queue; //同步队列     
	atomic_bool m_running; //是否停止的标志
	std::once_flag m_flag;
};

void TestThdPool()
{
	ThreadPool pool; bool runing = true;

	std::thread thd1([&pool, &runing] {
		while (runing)
		{
			cout << "produce " << this_thread::get_id() << endl;

			pool.AddTask([] {
				std::cout << "consume " << this_thread::get_id() << endl;
			});
		}
	});


	this_thread::sleep_for(std::chrono::seconds(10));
	runing = false;
	pool.Stop();

	thd1.join();
	getchar();
}

int main()
{
	TestThdPool();
	 
}

參考文獻[編輯]