跳转到内容

C++/STL/Thread

维基教科书,自由的教学读本
< C++

threadw:C++标准程式库中的一个w:头文件,定义了w:C++11标准中的一些表示w:线程的类 、用于互斥访问的类与方法等。

线程对象thread

[编辑]

类thread表示一个线程。初始化时给出该线程的执行函数(或是可以调用的对象)。线程对象构造后即开始运行。默认情况下,C++11的如此创建的线程必须与父线程会合,即在父线程中调用thread::join()函数,或者与父线程分离,即调用thread::detach()函数。否则,如果子线程还在执行,父线程已经执行结束而撤销,则会抛出异常。

如果子线程的执行函数需要参数,可把实参列表写在thread对象构造函数的参数表中。

如果把可调用对象(callable object)作为参数传给子线程的构造函数,则把该可调用对象复制一份给子线程。如果需要传递可调用对象的左值引用给子线程,则采用std::ref( )来产生对象的引用、然后把引用值再传进去给子线程。

  • 类thread的构造函数:
    • thread() noexcept;缺省构造函数,线程不可执行
    • template <class Fn, class... Args> explicit thread (Fn&& fn, Args&&... args); 最常用的构造函数
    • thread (const thread&) = delete; 禁止复制构造
    • thread (thread&& x) noexcept; 允许移动构造
  • 类thread的运算符
    • thread& operator= (thread&& rhs) noexcept;
    • thread& operator= (const thread&) = delete;
  • 类thread的成员函数:
    • thread::hardware_concurrency(): 静态成员函数,返回当前计算机最大的硬件并发线程数目。基本上可以视为处理器的核心数目。
    • thread::get_id(): 返回一个线程对象的id
    • thread::joinable():检查thread对象是否标识一个活动(active)的可行性行线程。具体说,判断表达式get_id() != std::thread::id()。缺省构造的thread对象、已经完成join的thread对象、已经detach的thread对象都不是joinable。
    • thread::join():阻塞调用者(caller)所在的线程直至被join的std::thread对象标识的线程执行结束。
    • thread::detach:执行线程与当前的std::thread对象分开,线程继续独立执行下去。线程执行结束时释放分配的资源。
    • thread::swap:线程swap
    • thread::native_handle:返回native handle

this_thread命名空间

[编辑]

在this_thread命名空间中定义了4个函数

  • this_thread::get_id() 返回当前线程的id
  • this_thread::yield() 暂时放弃一段 CPU 时间、让给其他线程使用
  • this_thread::sleep_until() 设定一个绝对时刻、让执行线程在指定的时刻后再继续执行
  • this_thread::sleep_for() 停止一段指定的时长的执行

例子程序

[编辑]
#include <iostream>
#include <thread>

using namespace std;

void test_func()
{
  // do something
  cout << "In thread." << endl;
}

int main( int argc, char** argv )
{
  // execute thread
  thread mThread( test_func );

  // do somthing
  cout << "main thread." << endl;

  // wait the thread stop
  mThread.join();

  return 0;
}

线程对象析构时,如果joinable()为真,将导致执行terminate()。所以应该按照RAII管理进程对象:

#include <iostream>
#include <thread>
class ThreadRAII
{
    std::thread & m_thread;
    public:
        ThreadRAII(std::thread  & threadObj) : m_thread(threadObj)
        {
            
        }
        ~ThreadRAII()
        {
            // Check if thread is joinable then detach the thread
            if(m_thread.joinable())
            {
                m_thread.detach();
            }
        }
};
void thread_function()
{
    for(int i = 0; i < 10000; i++);
        std::cout<<"thread_function Executing"<<std::endl;
}
 
int main()  
{
    std::thread threadObj(thread_function);
    
    // If we comment this Line, then program will crash
    ThreadRAII wrapperObj(threadObj);
    return 0;
}

构造线程对象时,函数的参数哪怕是引用型,实际上也是给其构造了一个副本。为了确实传引用,需要使用std::ref():

#include <iostream>
#include <thread>
void threadCallback(int const & x)
{
    int & y = const_cast<int &>(x);
    y++;
    std::cout<<"Inside Thread x = "<<x<<std::endl;
}
int main()
{
    int x = 9;
    std::cout<<"In Main Thread : Before Thread Start x = "<<x<<std::endl;
    std::thread threadObj(threadCallback,std::ref(x));
    threadObj.join();
    std::cout<<"In Main Thread : After Thread Joins x = "<<x<<std::endl;
    return 0;
}

另外一个例子,构造了线程池:

#include<vector> 
#include<functional>
#include<memory>
#include <atomic> 
#include<list>
#include<mutex>
#include<thread>
#include<condition_variable>
#include <iostream>
using namespace std;

template<typename T>
class SyncQueue
{
public:
	SyncQueue(int maxSize) :m_maxSize(maxSize), m_needStop(false)
	{
	}

	void Put(const T&x)
	{
		Add(x);
	}

	void Put(T&&x)
	{
		Add(std::forward<T>(x));
	}

	void Take(std::list<T>& list)
	{
		std::unique_lock<std::mutex> locker(m_mutex);
		m_notEmpty.wait(locker, [this] {return m_needStop || NotEmpty(); });

		if (m_needStop)
			return;
		list = std::move(m_queue);
		m_notFull.notify_one();
	}

	void Take(T& t)
	{
		std::unique_lock<std::mutex> locker(m_mutex);
		m_notEmpty.wait(locker, [this] {return m_needStop || NotEmpty(); });

		if (m_needStop)
			return;
		t = m_queue.front();
		m_queue.pop_front();
		m_notFull.notify_one();
	}

	void Stop()
	{
		{
			std::lock_guard<std::mutex> locker(m_mutex);
			m_needStop = true;
		}
		m_notFull.notify_all();
		m_notEmpty.notify_all();
	}

	bool Empty()
	{
		std::lock_guard<std::mutex> locker(m_mutex);
		return m_queue.empty();
	}

	bool Full()
	{
		std::lock_guard<std::mutex> locker(m_mutex);
		return m_queue.size() == m_maxSize;
	}

	size_t Size()
	{
		std::lock_guard<std::mutex> locker(m_mutex);
		return m_queue.size();
	}

	int Count()
	{
		return m_queue.size();
	}
private:
	bool NotFull() const
	{
		bool full = m_queue.size() >= m_maxSize;
		if (full)
			cout << "full, waiting,thread id: " << this_thread::get_id() << endl;
		return !full;
	}

	bool NotEmpty() const
	{
		bool empty = m_queue.empty();
		if (empty)
			cout << "empty,waiting,thread id: " << this_thread::get_id() << endl;
		return !empty;
	}

	template<typename F>
	void Add(F&&x)
	{
		std::unique_lock< std::mutex> locker(m_mutex);
		m_notFull.wait(locker, [this] {return m_needStop || NotFull(); });
		if (m_needStop)
			return;

		m_queue.push_back(std::forward<F>(x));
		m_notEmpty.notify_one();
	}

private:
	std::list<T> m_queue; //缓冲区
	std::mutex m_mutex; //互斥量和条件变量结合起来使用
	std::condition_variable m_notEmpty;//不为空的条件变量
	std::condition_variable m_notFull; //没有满的条件变量
	int m_maxSize; //同步队列最大的size

	bool m_needStop; //停止的标志
};

 


const int MaxTaskCount = 100;
class ThreadPool
{
public:
	using Task = std::function<void()>;
	ThreadPool(int numThreads = std::thread::hardware_concurrency()) : m_queue(MaxTaskCount)
	{
		Start(numThreads);
	}

	~ThreadPool(void)
	{
		//如果没有停止时则主动停止线程池
		Stop();
	}

	void Stop()
	{
		std::call_once(m_flag, [this] {StopThreadGroup(); }); //保证多线程情况下只调用一次StopThreadGroup
	}

	void AddTask(Task&&task)
	{
		m_queue.Put(std::forward<Task>(task));
	}

	void AddTask(const Task& task)
	{
		m_queue.Put(task);
	}

private:
	void Start(int numThreads)
	{
		m_running = true;
		//创建线程组
		for (int i = 0; i <numThreads; ++i)
		{
			m_threadgroup.push_back(std::make_shared<std::thread>(&ThreadPool::RunInThread, this));
		}
	}

	void RunInThread()
	{
		while (m_running)
		{
			//取任务分别执行
			std::list<Task> list;
			m_queue.Take(list);

			for (auto& task : list)
			{
				if (!m_running)
					return;

				task();
			}
		}
	}

	void StopThreadGroup()
	{
		m_queue.Stop(); //让同步队列中的线程停止
		m_running = false; //置为false,让内部线程跳出循环并退出

		for (auto thread : m_threadgroup) //等待线程结束
		{
			if (thread)
				thread->join();
		}
		m_threadgroup.clear();
	}

	std::list<std::shared_ptr<std::thread>> m_threadgroup; //处理任务的线程组
	SyncQueue<Task> m_queue; //同步队列     
	atomic_bool m_running; //是否停止的标志
	std::once_flag m_flag;
};

void TestThdPool()
{
	ThreadPool pool; bool runing = true;

	std::thread thd1([&pool, &runing] {
		while (runing)
		{
			cout << "produce " << this_thread::get_id() << endl;

			pool.AddTask([] {
				std::cout << "consume " << this_thread::get_id() << endl;
			});
		}
	});


	this_thread::sleep_for(std::chrono::seconds(10));
	runing = false;
	pool.Stop();

	thd1.join();
	getchar();
}

int main()
{
	TestThdPool();
	 
}

参考文献

[编辑]