基于消息队列的多进程服务器

xiaoxiao2025-06-04  51

目录

一、思路二、实现1. 原始代码2. 修改2.1 思路2.2 代码

一、思路

参考:Linux进程间通信(IPC)编程实践(五)消息队列实现回射客户/服务器

1)server进程接收时, 指定msgtyp为0, 从队首不断接收消息; 2)server进程发送时, 将mtype指定为接收到的client进程的pid; 3)client进程发送的时候, mtype指定为自己进程的pid; 4)client进程接收时, 需要将msgtyp指定为自己进程的pid, 只接收消息类型为自己pid的消息。

问题:

但上述程序是存在死锁的风险的,当同时开了多个客户端,将队列写满了,此时服务器端想要写入就会阻塞,而因为客户端一旦发送了数据就阻塞等待服务器端回射类型为pid的消息,即队列的消息不会减少,此时就会形成死锁,互相等待。非阻塞方式发送也不行,因为队列已满,会发生EAGAIN错误。

改进:

某个客户端先创建一个私有消息队列,然后将私有消息队列标识符和具体数据通过共享的队列发送给Server,服务器fork 出一个子进程,此时根据私有队列标识符就可以将数据回射到这个队列,这个客户端就可以从私有队列读取到回射的数据。

二、实现

1. 原始代码

server.c

/* Server */ #include <stdlib.h> #include <sys/ipc.h> #include <sys/msg.h> #include <sys/types.h> #include <unistd.h> #include <errno.h> #include <string.h> #define ERR_EXIT(m) \ do { \ perror(m); \ exit(EXIT_FAILURE); \ } while(0) #define MSGMAX 8192 struct msgbuf { long mtype; char mtext[MSGMAX]; }; void echo_ser(int msgid) { struct msgbuf msg; memset(&msg, 0, sizeof(msg)); int nrcv ; while (1) { if ((nrcv = msgrcv(msgid, &msg, sizeof(msg.mtext), 1, 0)) < 0); //指定接受优先级为1的(msgtyp) int pid = *((int *)msg.mtext); fputs(msg.mtext + 4, stdout); msg.mtype = pid; msgsnd(msgid, &msg, nrcv, 0); memset(&msg, 0, sizeof(msg)); } } int main(int argc, char *argv[]) { int msgid; msgid = msgget(1234, IPC_CREAT | 0666); //创建一个消息队列 if (msgid == -1) ERR_EXIT("msgget"); echo_ser(msgid); return 0; } //--------------------- //作者:NK_test //来源: //原文:https://blog.csdn.net/NK_test/article/details/49866113 //版权声明:本文为博主原创文章,转载请附上博文链接!

client.c

#include <stdio.h> #include <stdlib.h> #include <sys/ipc.h> #include <sys/msg.h> #include <sys/types.h> #include <unistd.h> #include <errno.h> #include <string.h> #define ERR_EXIT(m) \ do { \ perror(m); \ exit(EXIT_FAILURE); \ } while(0) #define MSGMAX 8192 struct msgbuf { long mtype; char mtext[MSGMAX]; }; void echo_cli(int msgid) { int nrcv; int pid = getpid(); struct msgbuf msg; memset(&msg, 0, sizeof(msg)); msg.mtype = 1; *((int *)msg.mtext) = pid; while (fgets(msg.mtext + 4, MSGMAX, stdin) != NULL) //客户端输入信息 { if (msgsnd(msgid, &msg, MSGMAX, IPC_NOWAIT) < 0) ERR_EXIT("msgsnd"); memset(msg.mtext + 4, 0, MSGMAX - 4); if ((nrcv = msgrcv(msgid, &msg, MSGMAX, pid, 0)) < 0) ERR_EXIT("msgsnd"); fputs(msg.mtext + 4, stdout); memset(msg.mtext + 4, 0, MSGMAX - 4); } } int main(int argc, char *argv[]) { int msgid; msgid = msgget(1234, 0); //打开名为1234的消息队列 if (msgid == -1) ERR_EXIT("msgget"); echo_cli(msgid); return 0; } //--------------------- //作者:NK_test //来源: //原文:https://blog.csdn.net/NK_test/article/details/49866113 //版权声明:本文为博主原创文章,转载请附上博文链接!

2. 修改

2.1 思路

2.2 代码

server.c

/* Server */ #include <stdlib.h> #include <stdio.h> #include <sys/ipc.h> #include <sys/msg.h> #include <sys/types.h> #include <unistd.h> #include <errno.h> #include <string.h> #include <signal.h> #define MAX 512 #define ERR_EXIT(m) \ do \ { \ perror(m); \ exit(EXIT_FAILURE); \ } while (0) struct msgbuf { long mtype; char mtext[1]; }; void echo_ser(int pmsgid) { int nrcv; struct msgbuf *pmsg_snd; struct msgbuf *pmsg_rcv; char text[MAX]; while (1) { pmsg_rcv = (struct msgbuf *)malloc(sizeof(struct msgbuf) + MAX); nrcv = msgrcv(pmsgid, pmsg_rcv, MAX, 0, 0); if (nrcv == -1) { ERR_EXIT("pmsg_rcv error"); } printf("Client: %s", pmsg_rcv->mtext); free(pmsg_rcv); strcpy(text,"Received. "); pmsg_snd = (struct msgbuf *)malloc(sizeof(struct msgbuf) + sizeof(text)); pmsg_snd->mtype = 2; strcpy(pmsg_snd->mtext, text); nrcv = msgsnd(pmsgid, pmsg_snd, sizeof(text), IPC_NOWAIT); if (nrcv == -1) ERR_EXIT("pmsg_snd error"); free(pmsg_snd); } } int main(int argc, char *argv[]) { struct msgbuf *msg; int nrcv; int pid; int pmsgid; //创建一个消息队列 int msgid; msgid = msgget(1234, IPC_CREAT | 0666); printf("%d\n",msgid); if (msgid == -1) ERR_EXIT("msgget error"); //忽略子进程退出的细节 signal(SIGCHLD, SIG_IGN); while (1) //接收到新的连接消息 { msg = (struct msgbuf *)malloc(sizeof(struct msgbuf) + MAX); //接受连接, mtype=1 nrcv = msgrcv(msgid, msg, MAX, 1, 0); if (nrcv == -1){ ERR_EXIT("msgrcv error"); } pmsgid = *((int *)msg->mtext); printf("\npmsgid = %d \n", pmsgid); pid = fork(); if (pid == 0) //子进程 { echo_ser(pmsgid); } } return 0; }

client.c

#include <stdio.h> #include <stdlib.h> #include <sys/ipc.h> #include <sys/msg.h> #include <sys/types.h> #include <unistd.h> #include <errno.h> #include <string.h> #define MAX 512 #define ERR_EXIT(m) \ do \ { \ perror(m); \ exit(EXIT_FAILURE); \ } while (0) struct msgbuf { long mtype; char mtext[1]; }; void echo_cli(int pmsgid) { int nrcv; struct msgbuf *pmsg_snd; struct msgbuf *pmsg_rcv; char text[MAX]; printf("Enter some text: "); while (fgets(text, MAX, stdin) != NULL) //客户端输入信息 { pmsg_snd = (struct msgbuf *)malloc(sizeof(struct msgbuf) + sizeof(text)); pmsg_snd->mtype = 2; strcpy(pmsg_snd->mtext, text); nrcv = msgsnd(pmsgid, pmsg_snd, sizeof(text), IPC_NOWAIT); if (nrcv == -1) ERR_EXIT("pmsg_snd error"); free(pmsg_snd); pmsg_rcv = (struct msgbuf *)malloc(sizeof(struct msgbuf) + MAX); nrcv = msgrcv(pmsgid, pmsg_rcv, MAX, 0, 0); if (nrcv == -1) { ERR_EXIT("pmsg_rcv error"); } printf("Server: %s\nEnter some text:", pmsg_rcv->mtext); free(pmsg_rcv); } } int main(int argc, char *argv[]) { int pmsgid; int msgid; int ret; struct msgbuf *msg; //创建一个私有消息队列 pmsgid = msgget(IPC_PRIVATE, IPC_CREAT | 0666); if (pmsgid == -1) ERR_EXIT("msgget error"); printf("message %d queue created!\n", pmsgid); //将私有消息队列标识符和具体数据通过共享的队列发送给Server //打开名为1234的消息队列 msgid = msgget(1234, 0); if (msgid == -1) ERR_EXIT("msgget error"); msg = (struct msgbuf *)malloc(sizeof(struct msgbuf) + sizeof(pmsgid)); msg->mtype = 1; *((int *)msg->mtext) = pmsgid; printf("%d\n", *((int *)msg->mtext)); //发送消息 ret = msgsnd(msgid, msg, sizeof(pmsgid), IPC_NOWAIT); if (ret == -1) { ERR_EXIT("msgsnd error"); } free(msg); //发送完毕,释放内存 printf("msg send.\n"); echo_cli(pmsgid); ret = msgctl(pmsgid, IPC_RMID, NULL); if (ret == -1) { ERR_EXIT("del error"); } return 0; }
转载请注明原文地址: https://www.6miu.com/read-5031254.html

最新回复(0)