定义:
线程是比进程更小的程序执行单位多个线程可共享全局数据,也可使用专有数据线程各自有独立的栈,但共享进程的堆;内核线程
操作系统内核支持多线程调度与执行内核线程使用资源较少,仅包括内核栈和上下文切换时需要的保存寄存器内容的空间轻量级进程(lightweight process , LWP)
由内核支持的独立调度单元,调度开销小于普通的进程系统支持多个轻量级进程同时进行,每个都与特定的内核线程相关联用户线程
建立在用户空间的多个用户级线程,映射到轻量级进程后执行用户线程在用户空间创建、同步和销毁,开销较低每个线程具有独特的ID使用说明
线程功能本身不属于C/C++标准库,连接时需要**-pthread**选项线程功能属于C++11标准库,可用C++11提供的thread类定义线程对象,C++11标准库同时提供基本的线程同步机制进程与线程的比较
线程空间不独立,有问题的线程会影响其他线程;进程空间独立,有问题的进程一般不会影响其他的进程创建进程需要额外的性能线程用于开发细颗粒度并行性,进程用于开发粗颗粒度并行性线程容易共享数据,进程共享数据必须使用进程间通讯机制线程创建函数
头文件:“pthread.h”原型:int pthread(pthread_t* thread ,const pthread_attr_t* attr , void* (*start_routine)(void*) , void* arg);线程创建流程
定义指向pthread_t对象的指针对象,pthread_t 用于存储新线程的ID定义指向线程属性 pthread_attr_t 对象的指针对象;线程属性对象控制线程与程序其他部分(可能是其他线程)的交互;如果传递NULL,则使用缺省属性构造新线程定义指向线程函数的指针对象,使其指向固定格式的线程函数实现线程函数;线程函数参数和返回值均为哑型指针;需要传递多个参数时,打包成单个 void* 型的指针对象线程退出时使用返回值将数据传递给主调线程;多个结果同样可以打包传递线程创建说明
pthread_create( ) 函数在线程创建完毕后立即返回,并不等待线程结束原线程与新线程如何执行与调度有关,程序不得依赖线程先后执行的关系可以使用同步机制确定线程的先后执行关系线程退出方式
线程函数结束执行调用pthread_exit() 显式结束被其他线程撤销 #include <pthread.h> #include <iostream> void* PrintAs(void* unused) { while(true) std::cerr << 'a' ; return NULL; } void* PrintZs(void* unused) { while(true) std::cerr << 'z'; return NULL; } int main() { pthread_t thread_id; //定义线程ID pthread_create(&thread_id , NULL , &PrintAs ,NULL); PrintZs(NULL); //主函数打印'z',子线程打印'a' return 0; }线程函数参数
#include <pthread.h> #include <iostream> cladd InfoPrinted { public: InfoPrinted(char c,int n):_c(c),_n(n) {}; void Show() const{for(int i = 0;i<_n;i++) std::cerr << _c;} private: char _c; int _n; }; //reinterpret_cast 将一个类型指针转换为另一个类型指针 //const_cast 用于去除指针变量的常属性,将它转换为一个对应指针类型的普通变量,反过来也可以将一个非常量指针转换为一个常量指针变量 //static_cast 用于转换基本类型和具有继承关系的类新之间转换,不太用于指针类型的之间的转换 //dynamic_cast 只能在继承类对象的指针之间或引用之间进行类型转换 //以上只有dynamic_cast这种转换并非在编译时,而是在运行时,动态的。其它均在编译时 void* PrintInfo(void* info) { InfoPrinted* p = reinterpret_cast<InfoPrinted*>(info); if(p) p->Show(); return NULL; } int main() { pthread_t tid1,tid2; //构造InfoPrinted类的动态对象,作为线程函数参数传递给 tid1; //输出100个 'a' InfoPrinted* p = new InfoPrinted('a',100); pthread_create(&tid1,NULL,&PrintInfo,reinterpret_cast<void*>(p));//reinterpret_cast<void*>(p)线程执行函数的参数 //构造InfoPrinted类的动态对象,作为线程函数参数传递给 tid2; //输出100个 'z' InfoPrinted* q = new InfoPrinted('z',100); pthread_create(&tid2,NULL,&PrintInfo,reinterpret_cast<void*>(q)); // 子线程需要使用主线程数据,如果主线程结束,子线程就访问不到了。使用 pthread_join 函数 pthread_join(tid1,NULL); pthread_join(tid2,NULL); return 0; }**pthread_join()**函数:等待子进程结束
原型: int pthread_join(pthread_t thread,void** retval);参数:thread为pthread_t类型的线程ID;retval接收线程返回值,不需要接收返回值时传递NULL线程函数返回值
#include <pthread.h> #include <cmath> #include <iostream> //判断素数 //参数传递和返回值都为哑型指针,所以作转型 void* IsPrime(void* n) { unsigned int p = reinterpret_cast<unsigned int>(n); unsigned int i = 3u, t = (unsigned int)sqrt(p) + 1u; if(p == 2u) return reinterpret_cast<void*>(true); if(p % 2u == 0u) return reinterpret_cast<void*>(false); while(i<=t) { if(p % i == 0u) return reinterpret_cast<void*>(false); i += 2u; } return reinterpret_cast<void*>(true); } //使用g++ main.cpp -pthread -lm -fpermissive编译 //以防止编译器将void*到int的转型当作错误 int main() { pthread_t tids[8]; bool primalities[8]; int i; for(i=0;i<8;i++) pthread_create(&tids[i]),NULL,&IsPrimem,reinterpret_cast<void*>(i+2)); for(i=0;i<8;i++) pthread_join(tids[i];reinterpret_cast<void**>(&primalities[i])); for(i=0;i<8;i++) std::cout<<primalities[i]<<""; std::cout<<std::endl; return 0; }线程ID pthread_equal()函数:确认两个线程是否相同
原型: int pthread_equal(pthread_t t1,pthread_t t2);pthread_self()函数:返回当前线程的ID
原型:pthread_t pthread_self();示例:if(!pthread_equal(pthread_self(),other_tid)) pthread_join(other_tid,NULL)线程属性 线程属性:精细调整线程的行为
设置线程属性的流程:
创建 pthread_attr_t 类型的对象调用 pthread_attr_init() 函数初始化线程的缺省属性,传递指向该线程属性对象的指针。 int pthread_attr_init(pthread_attr_t* attr);对线程属性进行必要的修改调用 pthread_creat() 函数时传递指向线程属性对象的指针调用 pthread_attr_destroy() 函数清除线程属性对象,pthread_attr_t 对象本身并没有被销毁,因而可以调用 pthread_attr_init() 函数再次初始化int pthread_attr_destroy(pthread_attr_t* attr)线程属性说明
单一线程属性对象可以用于创建多个线程线程创建后,继续保留线程属性对象本身并没有意义线程最重要的属性为 分离状态(detach state)线程分类
可连线程(joinable thread):缺省设置,终止时并不自动清除(类似僵尸进程),主线程必须调用 pthread_join() 获取其返回值,此后才能清除分离线程(detached thread):结束时自动清除,不能调用 pthread_join() 进行线程同步可连线程可通过 pthread_detach() 函数分离,分离线程不能再次联结int pthread_detach(pthread_t thread);pthread_attr_setdetachstate() 函数:设置线程分离属性
原型:int pthread_attr_setdetachstate(pthread_attr_t* attr,int detachstate)传递线程属性对象指针和分离线程设置参数 PTHREAD_CREATE_DETACHEDpthread_attr_getdetachstate() 函数:获取线程分离属性
原型:int pthread_attr_getdetachstate(pthread_attr_t* attr,int* detachstate); #include <pthread.h> //线程函数 void* ThreadFunc(void* arg) { ... } int main() { pthread_attr_t thread; pthread_t thread; //设置线程初始化属性 pthread_attr_init(&attr); //设置线程属性的分离状态 pthread_attr_setdetachstate(&attr,PTHREAD_CREATE_DETACHED); //创建线程 pthread_create(&thread,&attr,&ThreadFunc,NULL); //清除线程属性对象 pthread_attr_destroy(&attr); //无需连接该线程 return 0; }pthread_setcancletype() 函数:设置线程的撤销类型
原型:int pthread_setcancletype(int type,int* oldtype)参数: type 为撤销类型, oldtype 用于保存原始线程撤销类型,NULL表示不保存PTHREAD_CANCLE_ASYNCHRONOUS: 线程异步可撤PTHREAD_CANCLE_DEFERRED:线程同步可撤销,即延迟到下一撤销点撤销pthread_setcanclestate() 函数:设置线程的撤销状态
原型: int pthread_setcanclestate(int state,int* oldstate);第一个参数 state 为可撤销状态,第二个参数 oldstate 用于保存原始线程可撤销状态,NULL表示不保存PTHREAD_CANCLE_ENANLE: 线程可撤销PTHREAD_CANCLE_DISABLE: 线程不可撤销线程的撤销状态可多次设置pthread_testcancel() 函数:设置撤销点
原型: void pthread_testcancel() ;在线程函数中调用 pthread_testcancel() 函数设置撤销点建议:周期性地设置撤销点,保证线程函数内部每隔一些代码就有一个撤销点,以保证资源能够正确释放**使用撤销状态构造临界区(critical section)
临界区:要么全部执行,要么都不执行的代码设置线程的撤销状态,线程一旦进入临界区,就必须等到离开临界区,才可以被撤销。 //账户转账 void Transfer(double* accounts, int from,int to,double amount) { int ocs; //数据有效性代码在此,确保转账操作合法有效 //将线程设置为不可撤销的,进入临界区 pthread_setcancelstate(PTHREAD_CANCEL_DISABLE,&ocs); accounts[to] += amount; accounts[from] -= amount; //恢复线程的撤销状态,离开临界区 pthread_setcancelstatee(ocs,NULL); }线程局部存储 线程局部存储(thread local storage , TSL) : 每个线程的独有数据:
线程特定数据(thread-specific data)进程的多个线程通过全局堆共享全局数据对象让线程拥有数据的独立副本:不能简单赋值或读取
pthread_key_create()函数:为线程特定数据创建一个键参数:第一个为指向 pthread_key_t 类型变量的指针(每个线程都可以使用它访问自己的独立数据副本);第二个为指向线程清除函数的指针,若不存在未NULLpthread_setspecific()函数:设置对应键的值pthread_getspecific()函数:读取对应键的值 #include <pthread.h> #include <stdio.h> static pthread_key_t tlk; void WriteToThreadLog(const char* msg) { FILE* fp = (FILE*)pthread_getspecific(tlk); fprintf(fp,"%d:%s\n",(int)pthread_self(),msg); } void CloseThreadLog(void* fp) { fclose((FILE*)fp); } void* ThreadFunc(void* args) { char filename[255]; FILE* fp; //生成与线程ID配套的日志文件名 sprintf(filename,"thread%d.log",(int)pthread_self()); fp = fopen(filename,"w") //设置线程日志文件指针与键的局部存储关联 pthread_setspecific(tlk,fp); //向日志中写入数据,不同的线程会写入不同的文件 WriteToThreadLog("Thread starting..."); return NULL; } int main() { int i; pthread_t thread[8]; //创建键,使用CloseThreadLog()函数作为其清除程序 pthread_key_create(&tlk,CloseThreadLog); for(i = 0;i < 8;++i) pthread_create(&thread[i],NULL,ThreadFunc,NULL); for(i = 0;i < 8;++i) pthread_join(thread[i],NULL); pthread_key_delete(tlk); return 0; }线程清除 线程清除函数:回调函数,单 void* 参数,无返回值
目的:销毁线程退出或被撤销时未释放的资源pthread_cleanup_push() 函数:注册线程清除函数
原型:void pthread_cleanup_push(void* (*routine)(void*),void* arg);参数: routine 为指向线程清除函数的函数指针,arg 为传递给回调函数的附加数据对象pthread_cleanup_pop() 函数:取消线程清除函数注册
原型:void pthread_clean_pop(int execute);参数:整型值,非0调用回调函数释放资源,0不释放 #include <malloc.h> #include <pthread.h> void* AllocateBuffer(size_t size) { return malloc(size); } void DeallocateBuffer(void* buffer) { free(buffer); } void DoSomeWork() { void* temp_buffer = AllocateBuffer(1024); //注册清除处理函数 pthread_cleanup_push(DeallocateBuffer,temp_buffer); //此处可调用pthread_exit()退出线程或者撤销线程 //取消注册,传递非0值,实施清除任务 pthread_cleanup_pop(1); }C++的问题
对象的析构函数在线程退出时可能没有机会被调用,因而线程栈上的数据未清除解决方法:保证线程资源被正确释放
定义异常类,线程在准备退出时引发异常,然后在异常处理中退出线程执行引发异常类,C++确保析构函数被调用 #include <pthread.h> class EThreadExit{ public: EThreadExit(void* ret_val):_thread_ret_val(ret_val) {} //实际退出线程,使用对象构造时的返回值 void* DoThreadExit() { pthread_exit(_thread_ret_val);} private: void* _thread_ret_val; } void* ThreadFunc(void* arg) { try{ if(线程需要立即退出) throw EThreadExit(线程返回值); } catch(const EThreadExit &e){ e.DoThreadExit(); //执行线程实际退出动作 } return NULL; }资源竞争
编程任务:
存在一个任务队列,多个并发线程同时处理这些任务,每个线程在完成某项任务后,检查任务队列中是否有新任务,如果有,就处理,并将该任务从任务队列中删除假设:两个线程碰巧完成各自任务,但队列中只有一个任务可能情况:第一个线程发现任务队列非空,准备接收该任务,但未完成全部设置。此时,操作系统碰巧中断该进程。第二个线程获得了执行,也发现任务队列非空,同样准备接收该任务,但发现已经无法正确设置任务队列最坏情况:第一个线程已经从队列中摘取了任务,但还没有将任务队列设置为空,第二个线程对任务队列的访问导致段错误,系统崩溃。互斥
互斥(mutex)定义与性质:MUTial EXclusion
相互独占锁,与二元信号量类似一次只有一个线程可以锁定数据对象,并访问只有该线程释放锁定,其他线程才能访问该数据对象pthread_mutex_init()函数:初始化互斥
原型:int pthread_mutex_init(pthread_mutex_t* mutex,const pthread_mutex_t* mutexattr);参数: mutex 为互斥对象,mutexattr 为互斥对象属性,NULL 表示使用缺省属性可以使用预定义宏 PTHREAD_MUTEX_INITIALIZER 初始化互斥pthread_mutex_destroy()函数:销毁互斥
原型:int pthread_mutex_destroy(pthread_mutex_t* mutex);pthread_mutex_lock()函数:互斥加锁
原型:int pthread_mutex_lock(pthread_mutex_t* mutex);如果无法锁定,则调用将阻塞,至该互斥被解除锁定状态pthread_mutex_trylock()函数:互斥加锁
原型:int pthread_mutex_trylock(pthread_mutex_t* mutex)如果无法锁定,则立即返回,不阻塞pthread_mutex_unlock()函数:互斥解锁
原型:int pthread_mutex_unlock(pthread_mutex_t* mutex);使用互斥流程:
定义 pthread_mutex_t 类型的变量,将其地址作为第一个参数传给 pthread_mutex_init() 函数;初始化函数只需调用一次锁定或尝试锁定该互斥;获得访问权后,执行正常程序代码;并在执行完毕后解锁互斥属性:
pshared 属性:进程共享属性; 取值:PTHREAD_PROCESS_PRIVATE(本进程内部共享) type 属性:互斥类型 //完成程序代码 #include <pthread.h> #include <iostream.h> #include <list> struct Job{ Job(int x = 0,int y = 0):x(x),y(y) { } int x,y; } //一般要求临界区代码越短越好,执行时间越短越好,使用C++ STL可能并不是好选择 std::list<Job*> job_queue; pthread_mutex_t job_queue_mutex = PTHREAD_MUTEX_INITIALIZER; //控制作业数目的信号量 sem_t job_queue_count; //此处作业处理工作仅为示例,简单输出线程ID和作业内容信息 void ProcessJob(Job* job) { std::cout << "Thread" <<(int)pthread_self(); std::cout << "process("<<job->x<<","<<job->y<<")\n"; } //处理作业时需要加锁 void* DequeueJob(void* arg) { while (true) { Job* job = NULL; sem_wait(&job_queue_count); //等待作业队列中有新作业 pthread_mutex_lock(&job_queue_mutex); if(!job_queue.empty()){ job = job_queue.front();//获取表头元素 job_queue.pop_front();//删除表头元素 } pthread_mutex_unlock(&job_queue_mutex); if(!job) break; ProcessJob(job); delete job,job = NULL; } return NULL; } // 作业入队时需要加锁 void* EnqueueJob(void* arg) { Job* job = reinterpret_cast<Job*>(arg); pthread_mutex_lock(&job_queue_mutex); //锁定互斥 job_queue.push_back(job); //新作业入队,递增信号量 sem_post(&job_queue_count); //入队时也输出线程ID和作业内容信息 std::cout << "Thread" <<(int)pthread_self(); std::cout <<"enqueueing("<<job->x<<","<<job->y<<")\n"; pthread_mutex_unlock(&job_queue_mutex); //解锁 return NULL; } int main() { int i; pthread_t thread[8]; if(!job_queue.empty()) job_queue.clear(); sem_init(&job_queue_count,0,0); //初始化,非进程共享,初始值0 for(i=0;i<5;++i) { Job* job = new Job(i+1,(i+1)*2); pthread_create(&thread[i],NULL,EnqueueJob,job); } for(i=5,i<8,++i) { pthread_create(&thread[i],NULL,DequeueJob,NULL); } for(i=0,i<8,++i) pthread_join(thread[i],NULL); //等待线程终止,无作业时线程被阻塞 sem_destroy(&job_queue_count); //销毁作业信号量 return 0; }死锁
定义:资源被竞争占用,且无法释放 处理策略:更改互斥类型
创建互斥实行 pthread_mutexattr_t 型的对象调用pthread_mutexattr_init() 函数初始化互斥属性对象,传递其地址调用 pthread_mutexattr_setkind_np() 函数设置互斥类型,函数第一个参数为指向互斥属性对象的指针,第二个参数为PTHREAD_MUTEX_RECURSIVE_NP (递归互斥) 或PTHREAD_MUTEX_ERRORCHECK_NP(检错互斥)调用 pthread_mutexattr_destroy()函数销毁互斥属性对象信号量
问题:如何确保任务队列中有任务可以做?
如果队列中没有任务,线程可能退出,后续任务出现时,没有线程可以执行它POSIX标准信号量:头文件 semaphore.h
用于多个线程的同步操作操作方法比进程信号量简单初始化信号量
原型:int sem_init(sem_t* sem, int pshared, unsigned int value);原型: sem为信号量对象,pshared 为共享属性,value 为信号量初始值等待信号量:P 操作
原型: int sem_wait(sem_t* sem);原型: int sem_trywait(sem_t* sem);原型: int sem_timewait(sem_t* sem,const struct timespec* abs_timeout);说明:sem_wait() 在无法操作时阻塞, sem_trywait()则立即返回,sem_timewait() 与 sem_wait() 类似,但有时间限制发布信号量: V 操作
原型: int sem_post(sem_t* sem);销毁信号量
原型: int sem_destroy(sem_t* sem);条件变量
条件变量的功能与目的
互斥用于同步线程对共享数据对象的访问条件变量用于在线程间公布数据共享数据对象的值初始化条件变量
原型:int pthread_cond_init(pthread_cond_t* cond,const pthread_condattr_t* cond_attr);可使用宏 PTHREAD_COND_INITIALIZER 代替销毁条件变量
原型:int pthread_cond_destroy(pthread_cond_t* cond);广播条件变量
原型:int pthread_cond_broadcast(pthread_cond_t* cond);以广播方式唤醒所有等待目标条件变量的线程唤醒条件变量
原型:int pthread_cond_signal(pthread_cond_t* cont);等待条件变量
原型:int pthread_cond_wait(pthread_cond_t* cond,pthread_mutex_t* mutex);参数:mutex 为互斥,以确保函数操作的原子性支持平台无关的并行程序开发 库 :atomic thread mutex condition_variable future
thread:std::thread 类与 std::this_thread 名空间mutex :互斥相关类,包括std::mutex系列类,std::lock_guard类,std::unique_lock类及其他型式和函数condition_variable: 条件变量类,包括 std::condition_variable 类与std::condition_variable_any 类atomic: std::atomic 类与std::atomic_flag 类,另外还有一套C风格的原子型式和原子操作函数future:包含两个承诺类(std::promise 类,std::package_task类),两个期许类(std::future类,std::shared_future类)及相关型式和函数线程类:thread
支持的线程函数无参数和返回值型式的特别要求,有无参数均可,返回值有无亦可与Linux线程机制相比,C++11线程类更易用线程局部存储使用 thread_local 关键字可派生自己的 thread 类,但实现上需特别注意线程类应支持移动语义,但不应支持拷贝语义常用线程类成员函数
判断线程是否可连:bool thread::joinanle()等待线程结束: void thread::join()分离线程: void thread::detach()定义于名空间 this_thread 的线程管理函数
获取线程ID: thread::id get_id();在处于等待状态时,让调度器选择其他线程执行: void yield();阻塞当前线程制定时长: template<typename_Rep,typename_Period> void sleep_for(const chrono::duration<_Rep,_Period>&_rtime)阻塞当前线程至指定时点: template<typename_Clock,typename_Duration> void sleep_until(const chrono::time_point<_Clock,_Duration>&_atime); // 无参数线程函数 #include <iostream> #include <thread> void ThreadFunc() { std::cout<<"Thread ID:"<<std::this_thread::get_id()<<std::endl; } int main() { std::thread t(&ThreadFunc); //创建线程对象并运行 t.join(); // 等待线程结束 return 0; } // 带双参数的线程函数 #include <iostream> #include <thread> void ThreadFunc(int a,int b) { std::cout<<"Thread ID:"<<std::this_thread::get_id()<<std::endl; std::cout<<a<<"+"<<b<<"="<<a+b<<std::endl; } int main() { int m = 10,n = 20;//C++11标准库使用可变参数的模板形式参数列表,线程函数参数个数任意 std::thread t(&ThreadFunc,m,n); //创建线程对象并运行 t.join(); // 等待线程结束 return 0; } // 带双参数的函子对象 #include <iostream> #include <thread> class Functor{ public: void operator()(int a,int b){ std::cout<<"Thread ID:"<<std::this_thread::get_id()<<std::endl; std::cout<<a<<"+"<<b<<"="<<a+b<<std::endl; } }; int main() { int m = 10,n = 20;//C++11标准库使用可变参数的模板形式参数列表,线程函数参数个数任意 std::thread t(Functor(),m,n); //创建线程对象并运行 t.join(); // 等待线程结束 return 0; } // 使用std::bind()函数绑定对象及其普通成员函数 #include <iostream> #include <thread> class Worker(){ public: Worker(int a = 0,int b = 0):_a(a),_b(b){ } void ThreadFunc(){...} private: int _a,b; }; int main() { Worker worker(10,20); std::thread t(std::bind(&Worker::ThreadFunc,&worker)); t.join(); return 0; }基本互斥: mutex 类
核心成员函数: lock(), try_lock()和 unlock()上述成员函数无参数,无返回值递归互斥: recursive_mutex 类
允许单个线程对互斥进行多次加锁与解锁处理定时互斥: timed_mutex 类
在某个时段或者某个时刻前获取互斥当线程在临界区操作的时间非常长,可以用定时锁指定时间定时递归互斥: recursive_timed_mutex 类
综合 timed_mutex 和 recursive_mutex #include <iostream> #include <thread> #include <mutex> #include <vector> std::mutex x; void ThreadFunc() { x.lock(); std::cout << std::this_thread::get_id()<< "is entering...">>std::endl; std::this_thread::sleep_for(std::chrono::seconds(3)); std::cout << std::this_thread::get_id() << "is leaving" << std::endl; x.unlock; } int main() { std::vector<std::thread*> v(8); for(int i = 0;i < 8;i++) v[i] = new std::thread(ThreadFunc); for(int i = 0;i < 8;i++) v[i]->join(); }互斥的问题: 容易导致死锁
若某个线程在临界区内操作异常,有可能无法解锁,导致其他线程被永久阻塞若临界区代码都多路分支,其中部分分支提前结束,但没有执行解锁操作,其他线程依然被永久阻塞当多个线程同时申请多个资源时,加锁次序不同也可能导致死锁资源获取即初始化(resource acquisition is initialization,RAII)
使用互斥对象管理类模板自动管理资源基于作用域的锁管理类模板: std::lock_guard
构造时是否加锁可选,不加锁时假定当前线程已获得锁的所有权,析构时自动解锁,所有权不可转移,对象生存期内不允许手动加锁和解锁 void some_operation(const std::string &message) { static std::mutex mutex; std::lock_guard<std::mutex> lock(mutex); // ...操作 // 当离开这个作用域的时候,互斥锁会被析构,同时unlock互斥锁 // 因此这个函数内部的可以认为是临界区 }由于 C++ 保证了所有栈对象在声明周期结束时会被销毁,所以这样的代码也是异常安全的。无论 some_operation() 正常返回、还是在中途抛出异常,都会引发堆栈回退,也就自动调用了 unlock()。
独一锁管理类模板:std::unique_lock
构造时是否加锁可选,对象析构时如果持有锁会自动解锁,所有权可转移,对象生存期内允许手动加锁和解锁std::unique_lock 则相对于 std::lock_guard 出现的,std::unique_lock 更加灵活,std::unique_lock 的对象会以独占所有权(没有其他的 unique_lock 对象同时拥有某个 mutex 对象的所有权)的方式管理 mutex 对象上的上锁和解锁的操作。所以在并发编程中,推荐使用 std::unique_lock #include <iostream> #include <thread> #include <mutex> std::mutex mtx; void block_area() { std::unique_lock<std::mutex> lock(mtx); //...临界区 } int main() { std::thread thd1(block_area); thd1.join(); return 0; }互斥管理策略
延迟: std::defer_lock ,构造互斥管理对象时延迟加锁操作尝试: std::try_to_lock ,构造互斥管理对象时尝试加锁操作,但不阻塞线程,互斥不可用时立即返回接收: std::adopt_lock ,假定当前进程已获得互斥所有权,不再加锁缺省行为:构造互斥管理对象时没有传递管理策略标签参数,阻塞当期线程至成功获得互斥互斥的解锁时机
当使用C++的互斥自动管理策略时,只有析构互斥管理对象时才自动释放互斥,因此要特别注意互斥的持有时间;若线程持有互斥的时间过长,有可能极大降低程序效率解决方案:使用复合语句块或选用辅助函数封装临界区操作;动态创建互斥管理对象,并尽早动态释放多个互斥的竞争访问
多个线程对多个互斥加锁时顺序保持一致,以避免可能的死锁使用 std::lock() 或 std::try_lock() //使用互斥管理策略类重新实现线程函数 template<typename T> class Worker { public: explicit Worker(int no,T a = 0,T b = 0 ): _no(no), _a(a), _b(b) { } void ThreadFunc(T* r) { { //使用复合语句块封装临界区操作,块结束时即释放局部对象 std::lock_guard<std::mutex> locker(x) //构造对象的同时加锁 *r = _x + _y; }//无需手工解锁,locker对象在析构时自动解锁 } private: int _no; T _a,_b; }; #include <iostream> #include <mutex> #include <thread> class Account { public: explicit Account(double balance):_balance(balance) { } double GetBlance() {return _balance;} void Increase(double amount) {_balance += amount;} void Decrease(double amount) {_balance -= amount;} std::mutex & GetMutex() {return _x;} private: double _balance; std::mutex x; }; //避免死锁,使用 std::lock() 函数锁定多个互斥,不同的锁定顺序不会导致死锁 //加锁时有可能引发异常,std::lock 函数会处理该异常 //将解锁此前已加锁的部分互斥,然后重新引发该异常 void Transfer(Account & from ,Account & to,double amount) { std::unique_lock<std::mutex> locker1(from.GetMutex(),std::adopt_lock); std::unique_lock<std::mutex> locker1(to.GetMutex(),std::adopt_lock); from.Decrease(amount); to.Increase(amount); } int main() { Account a1(100.0) , a2(200.0); //线程参数采用值传递机制,如果要传递引用,调用 std::ref() 函数 std::thread t1(Transfer,std::ref(a1),std::ref(a2),10.0); std::thread t2(Transfer,std::ref(a2),std::ref(a1),20.0); t1.join(); t2.join(); return 0; }std::condition_variable 类
必须与 std::unique_lock 配合使用std::condition_variable_any 类
更加通用的条件变量,可以与任意型式的互斥锁配合使用,相比前者使用时会有额外的开销多线程通信同步原语
阻塞一个或多个线程至收到来自其他线程的通知,超时或发生虚假唤醒两者具有同样的成员函数,且在等待条件变量前都必须要获得相应的锁成员函数 notify_one(): 通知一个等待线程
原型: void notify_one() noexcept;成员函数 notify_all() :通知全部线程等待
原型: void motify_all() noexcept;成员函数 wait() : 阻塞当前线程至被唤醒
原型: template<typename Lock> void wait(Lock &lock);原型:template<typename Lock,typename Predicate> void wait(Lock &lock ,Predicate p);成员函数 waitfor() :阻塞至被唤醒或超过指定时长
成员函数 waituntil() :阻塞至被唤醒或到达指定时点
例1:
#include <iostream> #include <mutex> #include <thread> #include <string> #include <condition_variable> #include <chrono> //两个线程共享的全局变量 std::mutex mutex; std::condition_variable cv; std::string data; bool ready = false; bool processed = false; //工作线程 void Worker(){ std::unique_lock<std::mutex> lock(mutex); cv.wait(lock,[]{return ready}); //等待主线程发送数据 std::cout<<"工作线程正在处理数据。。。"<<std::endl;//等待后继续拥有线程 std::this_thread::sleep_for(std::chrono::seconds(1));//睡眠一秒以模拟数据处理 data += "已处理"; processed = true;//数据发回主线程; std::cout<<"工作线程通知数据已经处理完毕。"<<std::endl; lock.unlock();//通知前,手动解锁以防正在等待的线程被唤醒后又立即被阻塞; cv.notify_one(); } //主线程 int main(){ std::thread worker(Worker); //把数据发给工作线程 { std::lock_guard<std::mutex> lock(mutex); std::cout<<"主线程正在准备数据..."<<std::endl; std::this_thread::sleep_for(std::chrono::seconds(1)); data = "样本数据"; ready = true; std::cout<<"主线程通知数据已经准备完毕"<<std::endl; } cv.notify(); //等待工作线程处理数据 { std::unique_lock<std::mutex> lock(mutex); cv.wait(lock,[]{return processed;}; } std::cout<<<"回到主线程,数据="<<data<<std::endl; worker.join(); return 0; } /* 输出: 主线程正在准备数据... 主线程通知数据已经准备完毕。 工作线程正在处理数据... 工作线程通知数据已经处理完毕。 回到主线程,数据 = 样本数据 已处理 */注意:
与条件变量搭配使用的「锁」,必须是 unique_lock,不能用 lock_guard。等待前先加锁。等待时,如果条件不满足,wait 会原子性地解锁并把线程挂起。条件变量被通知后,挂起的线程就被唤醒,但是唤醒也有可能是假唤醒,或者是因为超时等异常情况,所以被唤醒的线程仍要检查条件是否满足,所以 wait 是放在条件循环里面。cv.wait(lock, [] { return ready; }); 相当于:while (!ready) { cv.wait(lock); }例2:
#include <iostream> #include <mutex> #include <thread> #include <condition_variable> std::mutex x; std::condition_variable cond; bool ready = false; bool IsReady() {return ready;} void Run(int no) { std::unique_lock<std::mutex> locker(x); while (!ready) // 若标志位非 true,阻塞当前进程 cond.wait(locker); //解锁并睡眠,被唤醒后重新加锁 // 以上两行代码等价于 cond.wait(locker,&IsReady); // 第二个参数为谓词,亦可使用函子实现 std::std::cout << "thread"<<no<< '\n'; } int main() { std::thread threads[8]; for(i = 0;i < 8; ++i) threads[i] = std::thread(Run,i); std::cout<<"8 threads ready...\n"; { std::unique_lock<std::mutex> locker(x); //互斥加锁 ready = true; // 设置全局标志位为 true cond.notify_all(); //唤醒所有线程 } //离开作用域,自动解锁;可将此复合语句块实现为函数 // 基于区间的循环结构,对属于 threads 数组的所有元素t,执行循环体 for(auto &t:threads) t.join(); return 0; }使用方法
使用atomic模板定义原子对象使用预定义标准原子型式 :atomic_bool ,atomic_char ,atomic_int, atomic_uint, atomic_long, atomic_wchar_t 等等意义:轻量级,支持单变量上的原子操作
线程返回值
为支持跨平台,thread 类无属性字段保存线程函数的返回值解决方案
使用指针型式的函数参数使用期许: std::future 类模板使用承诺: std::promise 类模板指针型式参数
// 使用指针作为函数参数,获取线程计算结果 #include <iostream> #include <mutex> #include <thread> #include <vector> #include <tuple> std::mutex x; // 劳工线程类模板,处理T型数据对象 template<typename T> class Worker { public: explicit Worker(int no,T a = 0, T b = 0) : _no(no),_a(a),_b(b) {} void ThreadFunc(T *r) {x.lock();*r = _a + _b; x.unlock();} private: int _no; // 线程编号 T _a,_b; // 保存在线程中的待处理数据 }; int main() { // 定义能够存储8个三元组的向量V,元组首元素为指向劳工对象的指针, // 次元素保存该劳工对象计算后的结果数据,尾元素为指向劳工线程对象的指针 // 向量中的每个元素都表示一个描述线程运行的线程对象 // 该线程对象对应的执行具体任务的劳工对象,及该劳工对象运算后的返回值 std::vector<std::tuple<Worker<int>*,int,std::thread*>> v(8); //构造三元组向量,三元编号依次为 0,1,2 for(int i=0;i<8;i++) v[i] = std::make_tuple(new Worker<int>(i,i+1,i+2),0,nullptr); // 输出处理前结果;使用 std::get<n>(v[i]) 获取向量的第i个元组的第n个元素 // 三元编号为0,1,2,因而1号元保存的将是劳工对象运算后的结果 for(int i=0;i<8;i++) std::cout<<"No."<<i<<":result = "<<std::get<1>(v[i])<<std::endl; // 创建8个线程分别计算 for(int i=0;i<8;i++) { // 将劳工类成员函数绑定为线程函数,对应劳工对象绑定为执行对象 // 将构造线程对象时传递的附加参数作为被绑定的线程函数的第一个参数 // auto 表示由编译器自动推断f的型式 auto f = std::bind(&Worker<int>::ThreadFunc,std::get<0>(v[i]),std::placeholders::_1)=) // 动态构造线程对象,并保存到向量的第i个三元组中 // 传递三元组的1号元地址,即将该地址作为线程函数的参数 // 线程将在执行时将结果写入该地址 // 此性质由绑定函数 std::bind() 使用占位符 std::placeholders::_1 指定 // 线程对象为2号元,即三元组的最后一个元素 std::get<2>(v[i]) = new std::thread(f,&std::get<1>(v[i])); } for(int i=0;i<8;i++) { // 等待线程结束 std::get<2>(v[i]) ->join(); //销毁劳工对象 delete std::get<0>(v[i]), std::get<0>(v[i])=nullptr; //销毁线程对象 delete std::get<2>(v[i]), std::get<2>(v[i])=nullptr; } //输出线程计算后的结果 for(int i=0;i<8;i++) std::cout<<"No.>"<<i<<":result = "<< std::get<1>(v[i])<<std::endl; return 0; }期许
std::future类模板
用途:
获取异步操作结果,提供了一个访问异步操作结果的途径延迟引发线程一个异步操作异常使用方法
定义期许模板类的期许对象使用std::async()函数的返回值初始化调用期许对象的成员函数get()获取线程返回值试想,如果我们的主线程 A 希望新开辟一个线程 B 去执行某个我们预期的任务,并返回我一个结果。而这时候,线程 A 可能正在忙其他的事情,无暇顾及 B 的记过,所以我们会很自然的希望能够在某个特定的时间获得线程 B 的结果。
在 C++11 的 std::future 被引入之前,通常的做法是:创建一个线程A,在线程A里启动任务 B,当准备完毕后发送一个事件,并将结果保存在全局变量中。而主函数线程 A 里正在做其他的事情,当需要结果的时候,调用一个线程等待函数来获得执行的结果。
而 C++11 提供的 std::future 简化了这个流程,可以用来获取异步任务的结果。自然地,我们很容易能够想象到把它作为一种简单的线程同步手段。
此外,std::packaged_task 可以用来封装任何可以调用的目标,从而用于实现异步的调用。例如:
#include <iostream> #include <future> #include <thread> int main() { // 将一个返回值为7的 lambda 表达式封装到 task 中 // std::packaged_task 的模板参数为要封装函数的类型 std::packaged_task<int()> task([](){return 7;});//类模板 std::packaged_task 包装任何可调用 目标(函数、 lambda 表达式、 bind 表达式或其他函数对象),使得能异步调用它。其返回值或所抛异常被存储于能通过 std::future 对象访问的共享状态中。 // 获得 task 的 future std::future<int> result = task.get_future(); // 在一个线程中执行 task std::thread(std::move(task)).detach(); std::cout << "Waiting..."; result.wait();//等待结果变得可用 // 输出执行结果 std::cout << "Done!" << std:: endl << "Result is " << result.get() << '\n'; } //在封装好要调用的目标后,可以使用 get_future() 来获得一个 std::future 对象,以便之后事实线程同步。 // 使用期许对象获取线程返回值 #include <iostream> #include <exception> #include <thread> #include <future> unsigned long int CalculateFactorial(short int n) { unsigned long int r = 1; if(n>20) throw std::range_error("The number is too big"); for(short int i = 1;i < 20;i++) r* = i; return r; } int main() { short int n = 20; // 启动异步线程,执行后台计算任务,并返回 std::future 对象 std::future<unsigned long int> f = std::async(CalculateFactorial,n); try { // 获取线程返回值,若线程已结束,立即返回,否则等待该线程计算完毕 // 若线程引发异常,则延迟到 std::future::get() 或 std::future::wait() 调用时引发 unsigned long int r = f.get(); std::cout<<n<<"i="<<r<<std::endl; } catch(const std::range_error &e){ std::cerr<<e.what()<<std::endl; } return 0; }std::promise 类模板
目的:承诺对象允许期许对象获取线程对象创建的线程返回值使用方法:
创建承诺 std::promise对象获取该承诺对象的相关期许 std::future 对象创建线程对象,并传递承诺对象线程函数内部通过承诺模板类的成员函数 set_value(),set_value_at_thread_exit(),set_exception() 或set_exception_at_thread_exit() 设置值或异常通过期许对象等待并获取异步操作结果 // 使用期许对象获取线程返回值 #include <iostream> #include <exception> #include <thread> #include <future> unsigned long int CalculateFactorial(short int n) { unsigned long int r = 1; if(n>20) throw std::range_error("The number is too big"); for(short int i = 1;i < 20;i++) r* = i; return r; } // CalculateFactorial()函数的包装函数原型 void DoCalculateFactorial(std::promise<unsigned long int> && promise,short int n) { try { // 设置线程返回值,供期许对象获取 promise.set_value(CalculateFactorial(n)); } catch(...) { //捕获全部异常,并在期许获取线程返回值时重新引发 promise.set_exception(std::current_exception()); } } int main() { short int n = 6; std::promise<unsigned long int> p; //创建承诺对象 std::future<unsigned long int> f = p.get_future; // 获取相关期许对象 std::thread t(DoCalculateFactorial,std::move(p),n); t.detach(); try{ // 获取线程返回值,若线程已结束,立即返回,否则等待该线程计算完毕 // 若线程引发异常,则延迟到 std::future::get() 或 std::future::wait() 调用时引发 unsigned long int r = f.get(); std::cout<<n<<"i="<<r<<std::endl; } catch(const std::range_error &e){ std::cerr<<e.what()<<std::endl; } return 0; }