摘要:上篇文章《Windows Sockets网络编程(4)套接字重叠IO模型》中,讲到了重叠IO的模型,同时也提到了APC函数,重叠IO是通过注册APC函数让线程调用来实现的,细心的你会发现这里有个严重的性能问题——那就是,只存在一个服务处理线程。为了解决该问题,Windows提出了IOCP模型(也叫完成端口模型),该模型主要增加了线程池。完成端口模型是一种真正意义上的异步模型,当应用程序需要管理成百上万的套接字,并且希望随着系统安装的CPU数量的增加,应用程序性能得到显著的提升,那么,完成端口是最好的选择。
按照大多数课本上的说法,IOCP模型的线程池大小设置为: CPU数量 × 2。
那么IOCP模型要如何实现呢?其实实现起来并不复杂。完成端口顾名思义,存在端口的概念,同时存在一个异步事件完成通知的概念。
Step 1 首先,你需要创建一个端口,
//创建完成端口 if ((hComPort = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0)) == NULL){ return -1; }
Step 2 端口,这在两个地方会用到。第一,创建线程池的时候,需要①将线程池中的每个线程都绑定到某一个端口上。
//线程池 for (unsigned int i = 0; i < cpuNum * 2; ++i){ if ((hThreadHandle[i] = (HANDLE)_beginthreadex(NULL, 0, service_work_thread, hComPort, 0, NULL)) == NULL){ return -1; } }
Step 3 第二个地方在哪里呢?是的,你需要②将每个套接字也绑定到某一个端口上,不然如何完成“线程<->套接字<->通知”这一些列的操作?没错,端口就是其中的桥梁。③完成端口是一个贯穿线程池、套接字的桥梁。
//绑定完成端口 if (CreateIoCompletionPort((HANDLE)sAccept, hComPort, (DWORD)iocp, 0) == NULL){ return -1; }
这里一直在用到CreateIoCompletionPort函数,但却没有介绍它,只知道它是一个系统API,那么是怎么使用它的呢?
CreateIoCompletionPort( _In_ HANDLE FileHandle, _In_opt_ HANDLE ExistingCompletionPort, _In_ ULONG_PTR CompletionKey, _In_ DWORD NumberOfConcurrentThreads );
FileHandle:文件句柄(在IOCP模型中就是套接字句柄);
ExistingCompletionPort:完成端口句柄;
CompletionKey:一般用于传入上下文,等到异步事件发生时可能通过该传入的上下文获取必须的信息;
NumberOfConcurrentThreads:完成端口并发线程的数量,如果=0则默认为CPU数量。
注意:由于创建完成端口使用的也是该函数(文章开头提到了),而此时并不存在套接字、完成端口,这时候第一个参数与第二个参数必须传入“INVALID_HANDLE_VALUE与NULL”。
说道这里可能会很疑惑,异步完成端口事件发生之后,线程池是如何工作的?其实,是由系统函数GetQueuedCompletionStatus来完成的。一般如何使用呢?下文章最后的DEMO中是这样使用的,
BOOL ret = GetQueuedCompletionStatus(hComPort, &dwIoSize, (LPDWORD)&iocp, &lpoverlapped, INFINITE);
函数原型,
GetQueuedCompletionStatus( _In_ HANDLE CompletionPort, _Out_ LPDWORD lpNumberOfBytesTransferred, _Out_ PULONG_PTR lpCompletionKey, _Out_ LPOVERLAPPED * lpOverlapped, _In_ DWORD dwMilliseconds );
CompletionPort:完成端口句柄;
lpNumberOfBytesTransferred:IO操作完成后,实际传输的字节数;
lpCompletionKey:上文中讲到的,利用它传输上下文;
lpOverlapped:指向重叠IO操作结构体的一个指针,可以通过它获得IO操作结果;
dwMilliseconds:在完成端口上的等待时间。
其实,GetQueuedCompletionStatus暂且就相当于select函数,都是阻塞式的,不同的是:当完成端口上有异步IO操作完成时,GetQueuedCompletionStatus会根据FIFO调度算法唤醒线程池中的一个线程,对该完成事件服务。
最后一个疑惑点,既然GetQueuedCompletionStatus是阻塞的,又只能被完成事件触发,那么如果完成端口一直没有事件被触发,岂不是程序无法退出的?系统的巨轮依然完美的行驶,说明早已解决了该疑惑。
引出本文最后一个介绍的函数PostQueuedCompletionStatus,它与GetQueuedCompletionStatus函数相得益彰,参数也几乎一样,你能通过以下这段代码猜到它的功能吗?
//销毁线程池 for (unsigned int i = 0; i < cpuNum * 2; ++i){ PostQueuedCompletionStatus(hComPort, 0, NULL, NULL); CloseHandle(hThreadHandle[i]); } WaitForMultipleObjects(cpuNum * 2, hThreadHandle, TRUE, INFINITE);
没错,PostQueuedCompletionStatus就是用来投递一个完成端口事件,用以触发GetQueuedCompletionStatus函数。下面来看一下PostQueuedCompletionStatus的原型,
PostQueuedCompletionStatus( _In_ HANDLE CompletionPort, _In_ DWORD dwNumberOfBytesTransferred, _In_ ULONG_PTR dwCompletionKey, _In_opt_ LPOVERLAPPED lpOverlapped );
与GetQueuedCompletionStatus函数原型一对比,只是少了个dwMilliseconds参数。这里,PostQueuedCompletionStatus与GetQueuedCompletionStatus函数的参数其实是一一映射关系,Post什么Get哪块就会接收到相应的内容。所以PostQueuedCompletionStatus一般用于完成端口间传递消息,那么通知完成端口退出,销毁线程池,当然完全不在话下。
文章最后,附上完整DEMO:
iocp.h
#pragma once #include <WinSock2.h> #pragma comment(lib,"ws2_32.lib") #define DATA_BUFSIZE 512 enum { READ, WRITE }; typedef struct io_operation_data { OVERLAPPED overlapped; WSABUF databuf; CHAR buffer[DATA_BUFSIZE]; BYTE type; DWORD length; }IO_OPERATION_DATA; class IOCP { public: IOCP(SOCKET s, SOCKADDR_IN sin); virtual ~IOCP(); BOOL recvAsynData(); BOOL sendAsynData(); public: SOCKET m_s; SOCKADDR_IN m_sin; IO_OPERATION_DATA m_io; };iocp.cpp
#include "IOCP.h" #include <process.h> #include <list> std::list<IOCP*> iocpList; HANDLE hComPort = NULL; IOCP::IOCP(SOCKET s, SOCKADDR_IN sin) { m_s = s; m_sin = sin; } IOCP::~IOCP() { } BOOL IOCP::recvAsynData() { DWORD flags = 0L; DWORD length = 0L; ZeroMemory(&m_io, sizeof (IO_OPERATION_DATA)); m_io.type = READ; m_io.databuf.buf = m_io.buffer; m_io.databuf.len = DATA_BUFSIZE; if (WSARecv(m_s, &m_io.databuf, 1, &length, &flags, &m_io.overlapped, NULL) == SOCKET_ERROR){ if (ERROR_IO_PENDING != WSAGetLastError()){ return FALSE; } } return TRUE; } BOOL IOCP::sendAsynData() { DWORD flags = 0L; DWORD length = 0L; m_io.type = WRITE; m_io.databuf.buf = m_io.buffer; m_io.databuf.len = strlen(m_io.buffer); if (WSASend(m_s, &m_io.databuf, 1, &length, flags, (LPOVERLAPPED)&m_io, NULL) == SOCKET_ERROR){ if (ERROR_IO_PENDING != WSAGetLastError()){ return FALSE; } } return TRUE; } void eraseIocp(IOCP* iocp) { std::list<IOCP*>::iterator iter = iocpList.begin(); for (; iter != iocpList.end(); ++iter){ if ((*iter) == iocp){ iocpList.erase(iter); break; } } } unsigned int __stdcall service_work_thread(void* context) { DWORD dwIoSize; IOCP* iocp; LPOVERLAPPED lpoverlapped; while (true){ Sleep(2000); dwIoSize = -1; lpoverlapped = NULL; iocp = NULL; BOOL ret = GetQueuedCompletionStatus(hComPort, &dwIoSize, (LPDWORD)&iocp, &lpoverlapped, INFINITE); if (iocp == NULL && lpoverlapped == NULL){//PostQueuedCompletionStatus break; } if (!ret){ printf("err\n"); DWORD dwIoErr = GetLastError(); if (dwIoErr == WAIT_TIMEOUT){ continue; } else if (NULL != lpoverlapped){ eraseIocp(iocp); } else{ break; } } else{ if (0 == dwIoSize){ eraseIocp(iocp); continue; } IO_OPERATION_DATA* pIO = CONTAINING_RECORD(lpoverlapped, IO_OPERATION_DATA, overlapped); switch (pIO->type){ case READ: iocp->m_io.length = dwIoSize; iocp->m_io.buffer[iocp->m_io.length] = '\0'; //printf("read:%s\n", iocp->m_io.buffer); char buffer[64]; sprintf_s(buffer, 64, "hi iocp msg.(%d)", iocp->m_s); strcpy_s(iocp->m_io.buffer, 64, buffer); iocp->sendAsynData(); break; case WRITE: //printf("write:%s\n",iocp->m_io.buffer); pIO->length = 0; if (FALSE == iocp->recvAsynData()){ eraseIocp(iocp); } break; default: break; } } } return 0; } int createIoCompletionPort(int port) { SYSTEM_INFO systemInfo; HANDLE hThreadHandle[64]; //创建完成端口 if ((hComPort = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0)) == NULL){ return -1; } GetSystemInfo(&systemInfo); DWORD cpuNum = systemInfo.dwNumberOfProcessors; //线程池 for (unsigned int i = 0; i < cpuNum * 2; ++i){ if ((hThreadHandle[i] = (HANDLE)_beginthreadex(NULL, 0, service_work_thread, hComPort, 0, NULL)) == NULL){ return -1; } } int ret = 0; WSADATA wsaData; if ((ret = WSAStartup(0x0202, &wsaData)) != 0){ return -1; } SOCKET sListen = INVALID_SOCKET; if ((sListen = WSASocket(AF_INET, SOCK_STREAM, 0, NULL, 0, WSA_FLAG_OVERLAPPED)) == INVALID_SOCKET){ WSACleanup(); return -1; } SOCKADDR_IN sin; sin.sin_family = AF_INET; sin.sin_addr.S_un.S_addr = htonl(INADDR_ANY); sin.sin_port = htons(port); if (bind(sListen, (SOCKADDR*)&sin, sizeof(sin)) == SOCKET_ERROR){ closesocket(sListen); WSACleanup(); return -1; } if (listen(sListen, SOMAXCONN)){ closesocket(sListen); WSACleanup(); return -1; } SOCKET sAccept = INVALID_SOCKET; SOCKADDR_IN saddr_in; int length = sizeof(saddr_in); int total = 0; while (true){ if ((sAccept = WSAAccept(sListen, (SOCKADDR*)&saddr_in, &length, NULL, 0)) == SOCKET_ERROR){ break; } ++total; if (total % 10 == 0){ printf("accept client count(%d)\n", total); } IOCP* iocp = new IOCP(sAccept, saddr_in); iocpList.push_back(iocp); //绑定完成端口 if (CreateIoCompletionPort((HANDLE)sAccept, hComPort, (DWORD)iocp, 0) == NULL){ return -1; } if (!iocp->recvAsynData()){ eraseIocp(iocp); } } //销毁线程池 for (unsigned int i = 0; i < cpuNum * 2; ++i){ PostQueuedCompletionStatus(hComPort, 0, NULL, NULL); CloseHandle(hThreadHandle[i]); } WaitForMultipleObjects(cpuNum * 2, hThreadHandle, TRUE, INFINITE); return 0; } int main(int argc, char* argv[]) { printf("io completion port.\n"); createIoCompletionPort(8086); return 0; }TCPClient.cpp
#include <WinSock2.h> #pragma comment(lib,"ws2_32.lib") #include <stdio.h> /* Date |Change ----------------------------------------- 2017-7-31 |SOCKET TCP测试客户端 */ void tcp_client() { SOCKET sClient = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP); sockaddr_in sin; sin.sin_family = AF_INET; sin.sin_port = htons(8086); sin.sin_addr.S_un.S_addr = inet_addr("127.0.0.1"); if (connect(sClient, (sockaddr *)&sin, sizeof(sin)) == SOCKET_ERROR){ closesocket(sClient); return; } char buffer[1024]; sprintf_s(buffer, 1024, "hi client msg.(%d)", sClient); //Sleep(1000); int ret = send(sClient, buffer, strlen(buffer), 0); ret = recv(sClient, buffer, sizeof(buffer), 0); buffer[ret] = '\0'; //printf("%s\n", buffer); //closesocket(sClient); } int main(int argc, char* argv[]) { printf("tcp client.\n"); WSADATA wsa; WSAStartup(MAKEWORD(2, 2), &wsa); int i = 500000; while (i--){ tcp_client(); //Sleep(3); } WSACleanup(); return 0; }
在文章开始之前,先讲解一下电脑配置问题,如 i7-3720QM 16G 256G(ssd)+1T,16G很明显表示的是运行内存,256G+1T都是机械硬盘(用来存储),i7-3720QM是CPU型号,表示什么呢?M表示的是笔记本CPU,Q表示是4核的CPU(i7出现后才有此表示法)。如果去网上查询一下,就可以知道,该款CPU是“四核八线程”的。
这里需要再次解释一下什么是四核八线程?四核其实就是表示CPU里集成了四个CPU,那么四个CPU为何可以同时运行八个线程呢?《计算机组成原理》一书中讲到,CPU为了提高并发,里面设置了超流水。按照这里的八线程,就是每个逻辑CPU中有两条超流水(一个CPU内部可以2路并行)。当然,四个八线程和真正的8个独立CPU性能上肯定还存在较大区别。
完成端口(IOCP)由于是真正的异步模型,同时和线程池有莫大的联系,那么线程池的大小就至关重要了。一般而言,在古老的70年代,基本上都是单CPU系统,也不存在并行执行的问题。在那个时代,你无法想象一个人竟然可以一边听音乐一边QQ聊天(这运行的是两个软件)。在那个时代里,电脑必须执行完一件事才能去执行第二件事(听完音乐,才能去聊天),单CPU系统,资源不需要调度,运行速度极快。
但是,它却慢慢的不能满足时代的需求,80年代推出了——分时系统。这个系统,主要是通过时间片轮转算法来调度的,然后外加多队列缓冲机制。开始了真正的宏观上并行微观上串行格局,这时候也拓展了进程概念,同时也引入了线程,甚至后来出现了纤程。
一般而言,一个运行的程序就被视为一个进程,进程是系统分配资源的主体。在并发中,如果创建大量的进程的话,会造成系统资源的快速消耗,为此,推出了一种轻量级的“线程”,一个进程当中,存在1-N个线程(每个线程基本上只需要维护一个线程栈,系统默认分配4M大小),操作系统调度的实际上是线程(进程中必有一个主线程)。那么,什么是纤程呢?这是一个更加轻量级的概念,纤程的主体是线程,也就是说,纤程是依赖于线程而存在,在具体开发中是这样的,一个线程当中可以创建若干个纤程,线程本身是由操作系统调度,但是线程中的1-N个纤程却是由程序猿来管理运行的。
扯得有的远了,回到完成端口(IOCP)模型,由于引入了线程池,那么就需要控制线程池的大小。一般而言,四核八线程的CPU就只能并行运行8个线程。(注意:并行是指大家肩并肩一起前行,是同一时刻;并发是指某一段时间间隔内表现出一起前进的假象)。但是,并是不CPU同一时刻只能运行8个线程,就让线程池大小为8。这里需要考虑一个性能的问题,就是每个线程都可能进行IO操作,或者进入阻塞(等待其他事件发生)。这时候,CPU就闲下来了,虽然此时有8个线程存在。针对这种现象,为了充分发挥CPU的性能,需要一些后备线程,一旦CPU空闲,马上调度起来。
@qingdujun
2017-8-6 in Xi'An