线程同步与互斥
mutex(互斥量)
大部分线程,线程使用的数据都是局部变量,变量的地址空间在线程空间内,这种情况,变量归属单个线程,其他线程无法获得这种变量。
但有时候,很多变量需要在线程空间内共享,这样的变量称为共享变量,可以通过数据的共享,完成线程之间的交互。
多个线程并发的操作共享变量,会带来一些问题。
例如
//操作共享变量会有问题的售票代码 #include <stdio.h> #include <stdlib.h> #include <string.h> #include <unistd.h> #include <pthread.h> int ticket = 100;//全局变量,都可以看到并修改 void *route(void *arg){ char *id = (char*)arg; while(1){ if(ticket > 0){ usleep(1000); printf("%s sells ticket:%d\n",id,ticket); --ticket; }else{ break; } } return NULL; } int main(){ pthread_t t1,t2,t3,t4; pthread_create(&t1,NULL,route,(void*)"thread 1"); pthread_create(&t2,NULL,route,(void*)"thread 2"); pthread_create(&t3,NULL,route,(void*)"thread 3"); pthread_create(&t4,NULL,route,(void*)"thread 4"); pthread_join(t1,NULL); pthread_join(t2,NULL); pthread_join(t3,NULL); pthread_join(t4,NULL); return 0; }运行结果是有问题的:
thread 2 sells ticket:5 thread 3 sells ticket:4 thread 4 sells ticket:3 thread 2 sells ticket:2 thread 1 sells ticket:1 thread 4 sells ticket:0 thread 3 sells ticket:-1 thread 2 sells ticket:-2//出现了负数产生这个问题的的原因:
if语句判断条件为真以后,代码可以并发的切换到其他进程。
usleep这个模拟漫长业务的过程,在这个漫长的业务过程中,可能有很多个线程会进入该代码段
--ticket操作本身就不是一个原子操作。
解决方法:
代码必须要有互斥行为:当代码进入临界区执行时,不允许其他线程进入该临界区。
如果多个线程同时要求执行临界区的代码,并且临界区没有线程在执行,那么只能允许一个线程进入该临界区。
如果线程不在临界区中执行,那么该新城不能阻止其他线程进入临界区
做到这三点的本质上是需要一把锁。Linux上提供了一把锁叫做互斥量。
互斥量的接口
初始化信号量:
方法1:静态分配:
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER方法2:动态分配:
int pthread_mutex_init(pthread_mutex_t *restrict mutex,const pthread_mutexattr_t *restrict attr)参数:mutex:要初始化的信号量 attr:NULL
销毁互斥量:
使用PTHREAD_MUTEX_INITIALIZER初始化的互斥量不需要销毁
不要销毁一个已经加锁的互斥量
已经销毁的互斥量,要确保后面不会有线程在尝试加速
int pthread_mutex_destroy(pthread_mutex_t *mutex)互斥量加锁和解锁
int pthread_mutex_lock(pthread_mutex_t *mutex); int pthread_mutex_unlock(pthread_mutex_t *mutex);都是成功返回0,失败返回错误号调用pthread_lock时,可能会遇到如下情况:
互斥量处于未锁状态,该函数会将互斥量锁定,同时返回成功
发起函数调用时,其他线程已经锁定互斥量,或者存在其他线程同时申请互斥量,但没有竞争到互斥量,那么pthread_lock就会陷入阻塞,等待互斥量解锁
改进版售票系统:
//改进版售票系统 #include <stdio.h> #include <stdlib.h> #include <string.h> #include <unistd.h> #include <pthread.h> int ticket = 100; pthread_mutex_t mutex; void *route(void* arg){ char* id = (char*)arg; while(1){ pthread_mutex_lock(&mutex);//申请 if(ticket > 0){ usleep(1000); printf("%s sells ticket:%d\n",id,ticket); --ticket; pthread_mutex_unlock(&mutex);//释放 }else { pthread_mutex_unlock(&mutex);//释放 break; } } return NULL; } int main(){ pthread_t t1,t2,t3,t4; pthread_mutex_init(&mutex,NULL); pthread_create(&t1,NULL,route,(void*)"thread 1"); pthread_create(&t2,NULL,route,(void*)"thread 2"); pthread_create(&t3,NULL,route,(void*)"thread 3"); pthread_create(&t4,NULL,route,(void*)"thread 4"); pthread_join(t1,NULL); pthread_join(t2,NULL); pthread_join(t3,NULL); pthread_join(t4,NULL); pthread_mutex_destroy(&mutex);//销毁 }结果正确:
thread 1 sells ticket:8 thread 1 sells ticket:7 thread 1 sells ticket:6 thread 1 sells ticket:5 thread 1 sells ticket:4 thread 1 sells ticket:3 thread 1 sells ticket:2 thread 1 sells ticket:1 [syf@dreame testlinux]$ cat newtrain.c条件变量
当一个线程互斥的访问某个变量时,他可能发现在其他线程改变状态之前自己什么也做不了
例如一个线程访问队列时,发现队列为空,他只能等待,直到其他线程将一个节点添加到队列中。这时候可以使用到条件变量
条件变量函数
初始化
int pthread_cond_init(pthread_cond_t *restrict cond,const pthread_condattr_t *restrict attr);参数:cond:要初始化的条件变量 attr:NULL
销毁
int pthread_cond_destroy(pthread_cond_t *cond)等待条件满足
int pthread_cond_wait(pthread_cond_t *restrict cond,pthread_mutex_t *restrict mutex);参数:cond:要在这个条件变量上等待 mutex:互斥量
唤醒等待
int pthread_cond_brodacast(pthread_cond_t *cond); int pthread_cond_signal(pthread_cond_t *cond);栗子:
//条件变量 #include <stdio.h> #include <stdlib.h> #include <string.h> #include <unistd.h> #include <pthread.h> pthread_cond_t cond; pthread_mutex_t mutex; void *r1(void *arg){ (void)arg; while(1){ pthread_cond_wait(&cond,&mutex); printf("活动\n"); } return NULL; } void *r2(void *arg){ (void)arg; while(1){ pthread_cond_signal(&cond); sleep(1); } } int main(){ pthread_t t1,t2; pthread_cond_init(&cond,NULL); pthread_mutex_init(&mutex,NULL); pthread_create(&t1,NULL,r1,NULL); pthread_create(&t2,NULL,r2,NULL); pthread_join(t1,NULL); pthread_join(t2,NULL); pthread_mutex_destroy(&mutex); pthread_cond_destroy(&cond); }pthread_cond_wait需要互斥量的原因
条件等待是线程间同步的一种手段,如果只有一个线程,条件不满足,一直等下去都不会满足,所以必须要有一个线程通过某些操作,改变共享变量,使原先不满足的条件变的满足,并且友好的通知等待在条件变量上的线程
条件不会无缘无故突然满足了,必然会牵扯到共享数据的变化。所以一定要用互斥锁来保护。没有互斥锁就无法安全的获取和修改共享数据。
生产者消费者模型
三二一:三种关系,两个角色,一个交易场所。
栗子:
//生产消费模型 #include <stdio.h> #include <stdlib.h> #include <unistd.h> #include <pthread.h> #define CONSUMERS_COUNT 2 #define PRODUCERS_COUNT 2 struct msg{ struct msg *next; int num; }; struct msg *head = NULL; pthread_cond_t cond; pthread_mutex_t mutex; pthread_t threads[CONSUMERS_COUNT+PRODUCERS_COUNT]; void *consumer(void *p){ int num = *(int*)p; free(p); struct msg *mp; for(; ;){ pthread_mutex_lock(&mutex); while(head == NULL){ printf("%d begin wait a condition...\n",num); pthread_cond_wait(&cond,&mutex); } printf("%d end wait a condition...\n",num); printf("%d begin consume product...\n",num); mp = head; head = mp->next; pthread_mutex_unlock(&mutex); printf("Consume %d\n",mp->num); free(mp); printf("%d end consume product...\n",num); sleep(rand()%5); } } void *producer(void *p){ struct msg *mp; int num = *(int*)p; free(p); for(; ;){ printf("%d begin producer product...\n",num); mp = (struct msg*)malloc(sizeof(struct msg)); mp->num = rand()00+1; printf("produce %d\n",mp->num); pthread_mutex_lock(&mutex); mp->next = head; head = mp; printf("%d end prodice product...\n",num); pthread_cond_signal(&cond); pthread_mutex_unlock(&mutex); sleep(rand()%5); } } int main() { srand((unsigned)time(NULL)); pthread_cond_init(&cond,NULL); pthread_mutex_init(&mutex,NULL); int i; for(i = 0;i<CONSUMERS_COUNT;++i){ int *p = (int*)malloc(sizeof(int)); *p = i; pthread_create(&threads[i],NULL,consumer,(void*)p); } for(i = 0;i<PRODUCERS_COUNT;++i){ int *p = (int*)malloc(sizeof(int)); *p = i; pthread_create(&threads[CONSUMERS_COUNT+i],NULL,producer,(void*)p); } for(i = 0;i<CONSUMERS_COUNT+PRODUCERS_COUNT;++i) pthread_join(threads[i],NULL); pthread_mutex_destroy(&mutex); pthread_cond_destroy(&cond); return 0; }POSIX信号量
POSIX信号量和SystemV信号量作用相同,都是用于同步操作,达到无冲突访问共享资源的目的。但POSIX可以用于线程间同步
初始化信号量
#include <semaphore.h> int sem_init(sem_t *sem,int pshared,unsigned int value);参数: pshared:0表示线程间共享,非零表示进程间共享 value:信号量初始值
销毁信号量
int sem_destroy(sem_t *sem)等待信号量:等待信号量会将信号量的值减一
int sem_wait(sem_t *sem);发布信号量:表示资源使用完毕,可以归还资源,将信号量值加一
int sem_post(sem_t *sem);
读写锁:
在多线程中,有一种情况是十分常见的,有些公共数据修改的机会比较少。相比较改写,他们读的机会反而高得多。通常而言,在读的过程中,往往会伴随着查找的操作,中间耗时较长。如果给这种代码加锁,会极大地降低效率。所以引入了读写锁。读写锁本质上是一种自旋锁。
写独占,读共享,写锁优先级高
读写锁接口:
初始化:
int pthread_rwlock_init(pthread_rwlock_t *restrict rwlock,const pthread_rwlockattr_t *restrict attr)销毁:
int pthread_rwlock_destroy(pthread_rwlock_t *rwlock)加锁和解锁 int pthread_rwlock_rdlock(pthread_rwlock_t *rwlock); int pthread_rwlock_wrlock(pthread_rwlock_t *rwlock); int pthread_rwlock_unlock(pthread_rwlock_t *rwlock);