本篇博客见《unix环境高级编程》的进程通信那一节,看到那一节就把这个写到博客里面啦,加深记忆。方便以后复习用……
消息队列,信号量,共享内存这三种我们称为XSI IPC,它们之间有很多相似之处。
在了解这三个之前,我们先来了解标识符和键。
每个内核中的IPC结构(这里指消息队列,信号量和共享存储段)都用一个非负整数的标识符加以引用。例如,为了对一个消息队列发送或读消息时,只需要知道
其队列的标识符。
标识符是IPC对象的内部名。为使多个合作进程能够在同一IPC对象上会和,需要提供一个外部名,因此使用了键(key),每个IPC对象都与一个键相关联,于是键
就用作为该对象的外部名。
无论何时创建IPC结构(调用 msgget、semget 或shmget),都应指定一个键(key),键的数据类型由系统规定为 key_t,通常在头文件<sys/types.h>中被规定为长整型。
键由内核变换成标识符。
三个get函数(msgget、semget和shmget)都有两个类似的参数:一个key和一个整型flag。如若满足下列两个条件之一,则创建一个新的IPC结构(服务器进程创建)
(1)key是IPC_PRIVATE
(2)key当前未与特定类型的IPC结构相结合,并且flag中指定了IPC_CREAT位。
为访问现存的队列,key必须等于创建该队列时所指定的键,并且不应该指定IPC_CREAT。
注意,访问一个现存的队列,决不能指定IPC_PRIVATE作为键。因为这是一个特殊的键值,它总是用于创建一个新队列。为了访问一个用IPC_PRIVATE键创建的现存队列,
一定要知道与该队列相结合的标识符,然后在其他IPC调用中(如msgsnd和msgrcv)使用该标识符。
权限:: XSI IPC权限
接下来我们来看消息队列:
消息队列是消息的链接表,存放在内核中并由消息队列标识符的标识。
每个队列都有一个msqid_ds结构与其相关联: struct msqid_ds{ struct ipc_perm msg_perm; //ipc结构 msgqnum_t msg_qnum; //队列中消息的数量 msglen_t msg_qbytes; //队列中最大的字节数 pid_t msg_lspid; //最后调用msgsnd的pid pid_t msg_lrpid; //最后调用msgrcv的pid time_t msg_stime; //最后调用msgsnd函数的时间 time_t msgrtime; //最后调用msgrcv函数的时间 time_t msg_ctime; //最后change的时间 ...... };消息队列的函数:
#include <sys/msg.h>
(1)msgget用于创建新队列或打开一个现存的队列。
int msgget(key_t key, int flag);
返回值:成功返回消息队列的ID,出错返回-1;此后该值就可被用于其他三个消息队列的函数。
参数:key:键,可以看上面的介绍,这个我们一般给一个非负整数,
flag:权限位
(2)msgctl函数对队列执行多种操作。
int msgctl(int msqid,int cmd,struct msqid_ds *buf);
返回值:成功返回0,出错-1
参数:msqid:msgget的返回值
cmd
(3)调用msgsnd将数据放到消息队列中。
int msgsnd(int msqid,const void *ptr,size_t nbyte,int flag);成功返回0,出错-1
每个消息都有三部分组成,正长整型类型字段,非负长度(nbyte),以及实际数据字节(对应于长度)。消息总是放在队列尾端。
ptr参数指向一个长整型数,它包含了正的整型消息类型,在其后紧跟消息数据,若发送的最长消息是128字节,则可定义下列结构:
struct mymesg
{
long mtype; //正的整型消息类型
char mtext[128]; //消息数据
};
ptr就是指向mymesg结构的指针。接受者可以使用消息类型以非先进先出的次序去消息。
参数flag的值可以指定为IPC_NOWAIT.这类似于文件I/O的非阻塞I/O标志。一般给0的话,即忽略。
当msgsnd成功返回,与消息队列相关的msqid_ds结构得到更新,以标明发出该调用的进程ID(msg_lspid),
进行该调用的时间(msg_stime),并指示队列中加了一条消息(msg_qnum)
(4)msgrcv从队列中取消息;
ssize_t msgrcv(int msqid,void *ptr,size_t nbytes, long type , int flag);
成功返回消息的数据部分的长度,出错返回-1;
ptr和msgsnd的prt一样;
nbyte:数据缓冲区的长度
type:我们可以指定想要哪一种消息:type == 0 返回队列中的第一个消息。
type > 0 返回队列中消息类型为type的第一个消息
type < 0 返回队列中消息类型值小于或等于type绝对值的消息,如果这种消息有若干个,则取类型值最小的消息。
当msgrcv成功返回,内核更新与消息队列相关的msqid_ds结构,以指示调用者进程ID(msg_lspid),
和调用的时间(msg_stime),并将队列中的消息数减一(msg_qnum)
下面简单代码实现消息队列:
创建消息队列,写入消息:
#include <stdio.h> #include <stdlib.h> #include <unistd.h> #include <assert.h> #include <string.h> #include <sys/msg.h> struct mymesg //ptr指针指向的结构 { long mtype; char mtext[512]; }; void main() { int msgid = msgget((key_t)1234, 0664|IPC_CREAT); //打开消息队列,没有创建的话则创建 assert(msgid != -1); struct mymesg buf; buf.mtype = 1000; //消息类型1000 strcpy(buf.mtext, "hello"); //消息数据hello msgsnd(msgid, &buf, strlen(buf.mtext), 0); //将数据放到消息队列中 buf.mtype = 2000; //消息类型2000 strcpy(buf.mtext, "word");//消息数据word msgsnd(msgid, &buf, strlen(buf.mtext), 0);//将数据放到消息队列中 }从消息队列中取消息:
#include <stdio.h> #include <stdlib.h> #include <unistd.h> #include <assert.h> #include <string.h> #include <sys/msg.h> struct node { long type; char buff[128]; }; void main() { int msgid = msgget((key_t)1234, 0664|IPC_CREAT); assert(msgid != -1); struct node buf; memset(&buf, 0, sizeof(buf)); msgrcv(msgid, &buf, sizeof(buf) - 1, 2000, 0); //从消息队列中取类型为2000的消息 printf("buf.type: %d\n", buf.type); printf("buf.buff: %s\n", buf.buff); }执行结果:
接下来我们看看信号量::
信号量与其他IPC不同的是,它是一个计数器,用于多进程对共享数据对象的访问。
为了获得共享资源,进程需要执行的操作:
(1)测试控制该资源的信号量。
(2)信号量的值为正时,则进程可以使用该资源。使用了,就将信号量减一(p操作)。
(3)信号量的值为0,则进程进入休眠状态,直至信号量的大于0。进程被唤醒后,返回至第(1)步
当进程不再使用该资源时,信号量的值加一(v操作)。如果有进程正在休眠等待此信号量,则唤醒进程。
信号量的测试及加一减一操作都是原子操作,信号量通常是在内核中实现。
常用的信号量形式被称为二元信号量或双态信号量,控制单个资源,初始值为1,。但是一般而言,信号量的初值可以是任意正数值,这说明
有多少个共享资源单位可供共享应用。
内核为每个信号量集合设置一个semid_ds结构: struct semid_ds{ struct ipc_perm sem_perm; unsigned short sem_nsems; //信号量在信号量集中的编号 time_t sem_otime; 最后调用semop()的时间。 time_t sem_ctime; 最后进行change的时间。 .... } 每个信号量由一个无名结构表示,它至少包含下列成员。 struct{ unsigned short semval; //信号量值,>=0 pid_t sempid; //最后使用信号量的pid unsigned short semcnt; //等待semval变为大于其当前值的线程或进程数 unsigned short semzcnt; //等待semval变成0的线程或进程数 }
XSI的信号量要复杂一些:
(1)信号量并非是单个非负值,而必须将信号量定义为含有一个或多个信号量值的集合。当创建一个信号量时,要指定集合中信号量值的数量。
(2)创建信号量与对其赋初值是分开的。
(3)即使没有进程正在使用各种形式的XSI IPC,它们仍然是存在的。有些程序在终止时并没有释放已经分配给它的信号量。
关于信号量的函数::
(1)要获得一个信号量的ID,要调用的函数semget
#include <sys/sem.h>
int semget(key_t key, int nsems, int flag);
返回值:成功返回信号量的ID,若出错则返回-1
nsems是该集合中的信号量数。如果引用一个现存的集合,则将nsems指定为0。如果是创建新集合,则必须指定nsems
(2)semctl函数包含了多种信号量的操作
int semctl(int semid, int semnum ,int cmd , /* union semun arg */);
第四个参数是可选的,如果使用该参数,则其类型是semun,它是多个特定命令参数的联合(union);
union semun { int val; //for SETVAL struct semid_ds *buf; //for IPC_STAT and IPC_SET unsigned short *array; //for GETALL and SETALL };cmd参数指定下列10种命令的一种, 在semid指定的信号量集合上执行此命令,其中有5条命令是针对一个特定的信号量值
的,他们用semnum指定该信号量集合中的一个成员。semnum值在0和nsems-1之间。(包含0和nsems-1) IPC_STAT:对此集合取semid_ds结构,并存放在由arg.buff指向的结构中。 IPC_SET:按由arg.buf指向结构中的值设置与此集合相关结构中的下面三个字段值:sem_perm.uid、sem_perm.gid和 sem_perm.mode。此命令只能由下列两种进程执行:一种是其有效用户ID等于sem_perm.cuid或sem_perm.uid的进程, 另一种是具有超级用户特权的进程。 IPC_RMID:从系统中删除该信号量集合。这种删除是立即发生的。仍在使用此信号集合的其他进程在他们下次试图对此信号量 集合进行操作时,将出错返回EIDRM。此命令只能由下列两种进程执行:一种是其有效用户ID等于sem_perm.cuid或 sem_perm.uid的进程;另一种是具有超级用户特权的进程。 GETVAL:返回成员semnum的semval值。 SETVAL:设置成员semnum的semval值。该值由arg.val指定。 GETPID:返回成员semnum的sempid值。 GETNCNT:返回成员semnum的semncnt值。 GETZCNT:返回成员semnum的semzcnt值。 GETALL:取该集合中所有信号量的值,并将他们存放在arg.array指向的数组中。 SETALL:按arg.array指向的数组中的值,设置该集合中所有信号量的值。 除GETALL以外的所有GET命令semctl函数都返回相应的值。其他命令返回值为0.
(3)函数semop自动执行信号量集合上的操作数组,这是个原子操作。
int semop(int semid,struct sembuf semoparray[ ] ,size_t nops);
返回值:成功返回0,出错返回-1
参数semoparray是一个指针,它指向一个信号量操作数组,信号量操作由sembuf结构表示:
struct sembuf { unsigned short sem_num; //信号集中的成员号(0,1....,nsems-1) short sem_op; //加减操作 short sem_flg; //IPC_NOWAIT, SEM_UNDO }; 参数nops规定该数组中的操作的数量(元素数)。 简单代码实现::我把信号量封装了一下。 信号量的声明:sem.h文件 #include <stdio.h> #include <stdlib.h> #include <assert.h> #include <string.h> #include <unistd.h> #include <sys/sem.h> union semun { int val; }; void seminit(); //初始化 void semp(); //减一操作 void semv(); //加一操作 void semdel(); //删除操作 sem.c文件 #include "sem.h" int semid = 0; void seminit() //初始化操作 { semid = semget((key_t)1234, 1, 0666); if(semid == -1) //打开现有的失败,创建信号量 { semid = semget((key_t)1234, 1, 0666 | IPC_CREAT); if(semid == -1) { perror(""); exit(0); } union semun v; v.val = 0; //设定初始值为0 semctl(semid, 0, SETVAL, v); } } void semp() //减一操作 { struct sembuf buf; buf.sem_num = 0; buf.sem_op = -1; buf.sem_flg = SEM_UNDO; semop(semid, &buf, 1); } void semv() //加一操作 { struct sembuf buf; buf.sem_num = 0; buf.sem_op = 1; buf.sem_flg = SEM_UNDO; semop(semid, &buf, 1); } void semdel() //删除信号量 { semctl(semid, 0, IPC_RMID, NULL); } 一个进程给一个文件里面写东西,当写的时候,另一个进程不能读,利用信号量 #include <stdio.h> #include <stdlib.h> #include <unistd.h> #include <assert.h> #include <string.h> #include <fcntl.h> #include "sem.h" void main() { int fd = open("a.txt", O_CREAT | O_WRONLY | O_TRUNC, 0664); assert(fd != -1); seminit(); //初始化信号量 while(1) { char buff[128] = {0}; printf("please input data: "); fflush(stdout); fgets(buff, 128, stdin); buff[strlen(buff) - 1] = 0; write(fd, buff, strlen(buff)); semv(); //写完给信号量值加一 if(strncmp(buff, "end", 3) == 0) { break; } } close(fd); } 写完了之后,信号量值加一,这时候读的那个进程就可执行了,读之前要对信号量值减一 #include <stdio.h> #include <stdlib.h> #include <unistd.h> #include <assert.h> #include <string.h> #include <fcntl.h> #include "sem.h" void main() { int fd = open("a.txt", O_CREAT | O_RDONLY , 0664); assert(fd != -1); seminit(); //初始化信号量 while(1) { char buff[128] = {0}; int n = 1, flag = 0; semp(); //减一操作 while(1) { n = read(fd, buff, 10); if(n == 0) { break; } if(strncmp(buff, "end", 3) == 0) { flag = 1; break; } printf("n = %d, buff = %s\n", n, buff); memset(buff, 0, 128); } if(flag) { break; } } close(fd); }共享存储::
共享存储允许两个或更多进程共享一给定的存储区。因为数据不需要在客户进程和服务器进程之间复制,所以共享存储是进程间通讯最快的一种。
使用共享存储必须是多进程之间同步访问共享存储区
即就是当我有一A进程正在将数据放入共享存储区,这个时候另一B进程是不能从共享存储区中去取数据,直到进程A写这一操作执行完
这里我用了信号量来实现进程的同步,信号量的操作利用上面信号量的代码::
简单代码实现如下:
#include <stdio.h> #include <stdlib.h> #include <unistd.h> #include <assert.h> #include <string.h> #include <sys/shm.h> #include "sem.h" void main() { int shmid = shmget((key_t)1234, 128, 0664 | IPC_CREAT); //获得一个共享存储标识符 assert(shmid != -1); char *ptr = (char *)shmat(shmid, NULL, 0); //将共享存储连接到内核分配的一块空间上 assert(ptr != (char *)-1); seminit(); //初始化信号量 while(1) { printf("please input: "); fflush(stdout); fgets(ptr, 128, stdin); ptr[strlen(ptr) - 1] = 0; semv(); //信号量值加一 if(strncmp(ptr, "end", 3) == 0) { break; } } shmdt(ptr); //对共享存储的操作已结束 } #include <stdio.h> #include <stdlib.h> #include <unistd.h> #include <assert.h> #include <string.h> #include <sys/shm.h> #include "sem.h" void main() { int shmid = shmget((key_t)1234, 128, 0664 | IPC_CREAT); assert(shmid != -1); char *ptr = (char *)shmat(shmid, NULL, 0); assert(ptr != (char *)-1); seminit(); while(1) { semp(); if(strncmp(ptr, "end", 3) == 0) { break; } printf("s = %s\n", ptr); } shmdt(ptr); shmctl(shmid, IPC_RMID, NULL); //从系统中删除共享存储段 semdel(); //删除信号量 }