IO CP完全解析


#pragma comment(lib,"ws2_32.lib") 从本质上说,完成端口模型要求创建一个 windows 完成端口对象,该对象通过指定数量的线程,对重叠 I/O 请求进 行管理,以便为已经完成的重叠 I/O 请求提供服务。 首先要创建一个 I/O 完成端口对象,用它面向任意数量的套接字句柄,管理多个 I/O 请求。调用以下函数创建 完成端口对象: HANDLE CreateIoCompletionPort( HANDLE FileHandle,// 同 IOCP 关联在一起的套接字句柄 HANDLE ExistingCompletionPort,// IOCP 句柄 ULONG_PTR CompletionKey, // 完成健 DWORD NumberOfConcurrentThreads // 在 IOCP 上,同时允许执行的线程数量 ); 该函数有两个作用: (1)创建一个完成端口对象 (2)将一个句柄同完成端口关联到一起 然后就要创建一定数量的工作者线程,以便在套接字的 I/O 请求投递给完成端口后,为完成端口提供服务。写 文字描述很烦,还是看代码吧: // NetServer3.cpp : Defines the entry point for the console application. // #include "stdafx.h" #include "NetServer3.h" #include #pragma comment(lib, "ws2_32.lib") #include using namespace std; /**/////////////////////////////////////////////////////////////////////////// #ifdef _DEBUG #define new DEBUG_NEW #undef THIS_FILE static char THIS_FILE[] = __FILE__; #endif /**/////////////////////////////////////////////////////////////////////////// // 单句柄数据 typedef struct tagPER_HANDLE_DATA { SOCKET Socket; SOCKADDR_STORAGE ClientAddr; // 将和这个句柄关联的其他有用信息,尽管放在这里面吧 }PER_HANDLE_DATA, *LPPER_HANDLE_DATA; // 但 I/O 操作数据 typedef struct tagPER_IO_DATA { OVERLAPPED Overlapped; WSABUF DataBuf; char buffer[1024]; int BufferLen; int OperationType; // 可以作为读写的标志,为简单,我忽略了 }PER_IO_DATA, *LPPER_IO_DATA; DWORD WINAPI ServerWorkerThread(LPVOID lpParam); /**////////////////////////////////////////////////////////////////////////////// // The one and only application object CWinApp theApp; using namespace std; int _tmain(int argc, TCHAR* argv[], TCHAR* envp[]) { int nRetCode = 0; // initialize MFC and print and error on failure if (!AfxWinInit(::GetModuleHandle(NULL), NULL, ::GetCommandLine(), 0)) { // TODO: change error code to suit your needs cerr << _T("Fatal Error: MFC initialization failed") << endl; nRetCode = 1; } else { // TODO: code your application's behavior here. CString strHello; strHello.LoadString(IDS_HELLO); cout << (LPCTSTR)strHello << endl; } /**/////////////////////////////////////////////////////////////////////////// HANDLE CompletionPort; WSADATA wsd; SYSTEM_INFO SystemInfo; SOCKADDR_IN InternetAddr; SOCKET Listen; // 加载 WinSock2.2 WSAStartup(MAKEWORD(2, 2), &wsd); // 1.创建一个 I/O 完成端口 CompletionPort = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0); // 2.确定系统中有多少个处理器 GetSystemInfo(&SystemInfo); // 3.基于系统中可用的处理器数量创建工作器线程 for (int i = 0; i < SystemInfo.dwNumberOfProcessors * 2; ++i) { HANDLE ThreadHandle; // 创建一个服务器的工作器线程,并将完成端口传递到该线程 ThreadHandle = CreateThread(NULL, 0, ServerWorkerThread, CompletionPort, 0, NULL); CloseHandle(ThreadHandle); } // 4.创建一个监听套接字,以下的套路都是固定的。 Listen = WSASocket(AF_INET, SOCK_STREAM, 0, NULL, 0, WSA_FLAG_OVERLAPPED); InternetAddr.sin_family = PF_INET; InternetAddr.sin_port = htons(5000); InternetAddr.sin_addr.s_addr = htonl(INADDR_ANY); bind(Listen, (SOCKADDR*)&InternetAddr, sizeof(InternetAddr)); listen(Listen, 5); BOOL b = TRUE; while (b) { PER_HANDLE_DATA * PerHandleData = NULL; SOCKADDR_IN saRemote; SOCKET Accept; int RemoteLen; // 5.接收连接,并分配完成端口,这儿可以用 AcceptEx 来代替,以创 // 建可伸缩的 Winsock 应用程序。 RemoteLen = sizeof(saRemote); Accept = accept(Listen, (SOCKADDR*)&saRemote, &RemoteLen); // 6.创建用来和套接字关联的单句柄数据信息结构 PerHandleData = (LPPER_HANDLE_DATA)GlobalAlloc(GPTR, sizeof(PER_HANDLE_DATA)); cout << "Socket number " << Accept << " connected" << endl; PerHandleData->Socket = Accept; memcpy(&PerHandleData->ClientAddr, &saRemote, RemoteLen); // 7.将接受套接字和完成端口关联起来 CreateIoCompletionPort((HANDLE)Accept, CompletionPort, (DWORD)PerHandleData, 0); // 开始在接受套接字上处理 I/O // 使用重叠 I/O 机制,在新建的套接字上投递一个或多个异步 // WSARecv 或 WSASend 请求。这些 I/O 请求完成后,工作者线程 // 会为 I/O 请求提供服务,之后就可以坐享其成了 static int const DATA_BUFSIZE = 4096; // DWORD RecvBytes = 0; DWORD Flags = 0; // 单 I/O 操作数据 LPPER_IO_DATA PerIoData = NULL; PerIoData = (LPPER_IO_DATA)GlobalAlloc(GPTR, sizeof(PER_IO_DATA)); ZeroMemory(&(PerIoData->Overlapped), sizeof(OVERLAPPED)); PerIoData->DataBuf.len = 1024; PerIoData->DataBuf.buf = PerIoData->buffer; PerIoData->OperationType = 0; // read WSARecv(PerHandleData->Socket, &(PerIoData->DataBuf), 1, &RecvBytes, &Flags, &(PerIoData->Overlapped), NULL); } /**/////////////////////////////////////////////////////////////////////////// return nRetCode; } /**/////////////////////////////////////////////////////////////////////////// DWORD WINAPI ServerWorkerThread(LPVOID lpParam) { HANDLE CompletionPort = (HANDLE)lpParam; DWORD BytesTransferred; LPOVERLAPPED lpOverlapped; LPPER_HANDLE_DATA PerHandleData = NULL; LPPER_IO_DATA PerIoData = NULL; DWORD SendBytes; DWORD RecvBytes; DWORD Flags; BOOL bRet = FALSE; while (TRUE) { bRet = GetQueuedCompletionStatus(CompletionPort, &BytesTransferred, (PULONG_PTR) &PerHandleData, (LPOVERLAPPED*) &lpOverlapped, INFINITE); // 检查成功的返回,这儿要注意使用这个宏 CONTAINING_RECORD PerIoData = (LPPER_IO_DATA)CONTAINING_RECORD(lpOverlapped, PER_IO_DATA, Overlapped); // 先检查一下,看看是否在套接字上已有错误发生 if (0 == BytesTransferred) { closesocket(PerHandleData->Socket); GlobalFree(PerHandleData); GlobalFree(PerIoData); continue; } // 数据处理 // 成功了!!!这儿就收到了来自客户端的数据 cout << PerIoData->DataBuf.buf << endl; Flags = 0; // 为下一个重叠调用建立单 I/O 操作数据 ZeroMemory(&(PerIoData->Overlapped), sizeof(OVERLAPPED)); PerIoData->DataBuf.len = 1024; PerIoData->DataBuf.buf = PerIoData->buffer; PerIoData->OperationType = 0; // read WSARecv(PerHandleData->Socket, &(PerIoData->DataBuf), 1, &RecvBytes, &Flags, &(PerIoData->Overlapped), NULL); } return 0; } 通常要开发网络应用程序并不是一件轻松的事情,不过,实际上只要掌握几个关键的原则也就可以了——创建和连 接一个套接字,尝试进行连接,然后收发数据。真 正难的是要写出一个可以接纳少则一个,多则数千个连接的网 络应用程序。本文将讨论如何通过 Winsock2 在 Windows NT 和 Windows 2000 上开发高扩展能力的 Winsock 应用程 序。文章主要的焦点在客户机/服务器模型的服务器这一方,当然,其中的许多要点对模型的双方都适用。 API 与响应规模 通过 Win32 的重叠 I/O 机制,应用程序可以提请一项 I/O 操作,重叠的操作请求在后台完成,而同一时间提请操作 的线程去做其他的事情。等重叠操作完成后线程收到有关的通知。这种机制对那些耗时的操作而言特别有用。不过, 像 Windows 3.1 上的 WSAAsyncSelect()及 Unix 下的 select()那样的函数虽然易于使用,但是它们不能满足响应规 模的需要。而完成端口机制是针对操作系统内部进行了优化,在 Windows NT 和 Windows 2000 上,使用了完成端 口的重叠 I/O 机制才能够真正扩大系统的响应规模。 完成端口 一个完成端口其实就是一个通知队列,由操作系统把已经完成的重叠 I/O 请求的通知放入其中。当某项 I/O 操作一 旦完成,某个可以对该操作结果进行处理的工作者线程就会收到一则通知。而套接字在被创建后,可以在任何时候 与某个完成端口进行关联。 通常情况下,我们会在应用程序中创建一定数量的工作者线程来处理这些通知。线程数量取决于应用程序的特定需 要。理想的情况是,线程数量等于处理器的数量,不过这也要求任何线程都不应该执行诸如同步读写、等待事件通 知等阻塞型的操作,以免线程阻塞。每个线程都将分到一定的 CPU 时间,在此期间该线程可以运行,然后另一个线 程将分到一个时间片并开始执行。如果某个线程执行了阻塞型的操作,操作系统将剥夺其未使用的剩余时间片并让 其它线程开始执行。也就是说,前一个线程没有充分使用其时间片,当发生这样的情况时,应用程序应该准备其它 线程来充分利用这些时间片。 完成端口的使用分为两步。首先创建完成端口,如以下代码所示: HANDLE hIocp; hIocp = CreateIoCompletionPort( INVALID_HANDLE_VALUE, NULL, (ULONG_PTR)0, 0); if (hIocp == NULL) { // Error } 完成端口创建后,要把将使用该完成端口的套接字与之关联起来。方法是再次调用 CreateIoCompletionPort ()函 数,第一个参数 FileHandle 设为套接字的句柄,第二个参数 ExistingCompletionPort 设为刚刚创建的那个完成 端口的句柄。 以下代码创建了一个套接字,并把它和前面创建的完成端口关联起来: SOCKET s; s = socket(AF_INET, SOCK_STREAM, 0); if (s == INVALID_SOCKET) { // Error if (CreateIoCompletionPort((HANDLE)s, hIocp, (ULONG_PTR)0, 0) == NULL) { // Error } ... } 这时就完成了套接字与完成端口的关联操作。在这个套接字上进行的任何重叠操作都将通过完成端口发出完成通 知。注意,CreateIoCompletionPort()函数中的第三个参数用来设置一个与该套接字相关的“完成键(completion key)”(译者注:完成键可以是任何数据类型)。每当完成通知到来时,应用程序可以读取相应的完成键,因此,完 成键可用来给套接字传递一些背景信息。 在创建了完成端口、将一个或多个套接字与之相关联之后,我们就要创建若干个线程来处理完成通知。这些线程不 断循环调用 GetQueuedCompletionStatus ()函数并返回完成通知。 下面,我们先来看看应用程序如何跟踪这些重叠操作。当应用程序调用一个重叠操作函数时,要把指向一个 overlapped 结构的指针包括在其参数中。当操作完成后,我们可以通过 GetQueuedCompletionStatus()函数中拿回 这个指针。不过,单是根据这个指针所指向的 overlapped 结构,应用程序并不能分辨究竟完成的是哪个操作。要 实现对操作的跟踪,你可以自己定义一个 OVERLAPPED 结构,在其中加入所需的跟踪信息。 无论何时调用重叠操作函数时,总是会通过其 lpOverlapped 参数传递一个 OVERLAPPEDPLUS 结构(例如 WSASend、 WSARecv 等函数)。这就允许你为每一个重叠调用操作设置某些操作状态信息,当操作结束后,你可以通过 GetQueuedCompletionStatus()函数获得你自定义结构的指针。注意 OVERLAPPED 字段不要求一定是这个扩展后的结 构的第一个字段。当得到了指向 OVERLAPPED 结构的指针以后,可以用 CONTAINING_RECORD 宏取出其中指向扩展结 构的指针。 OVERLAPPED 结构的定义如下: typedef struct _OVERLAPPEDPLUS { OVERLAPPED ol; SOCKET s, sclient; int OpCode; WSABUF wbuf; DWORD dwBytes, dwFlags; // 其它有用的信息 } OVERLAPPEDPLUS; #define OP_READ 0 #define OP_WRITE 1 #define OP_ACCEPT 2 下面让我们来看看工作者线程的情况。 工作线程 WorkerThread 代码: DWORD WINAPI WorkerThread(LPVOID lpParam) { ULONG_PTR *PerHandleKey; OVERLAPPED *Overlap; OVERLAPPEDPLUS *OverlapPlus, *newolp; DWORD dwBytesXfered; while (1) { ret = GetQueuedCompletionStatus( hIocp, &dwBytesXfered, (PULONG_PTR)&PerHandleKey, &Overlap, INFINITE); if (ret == 0) { // Operation failed continue; } OverlapPlus = CONTAINING_RECORD(Overlap, OVERLAPPEDPLUS, ol); switch (OverlapPlus->OpCode) { case OP_ACCEPT: // Client socket is contained in OverlapPlus.sclient // Add client to completion port CreateIoCompletionPort( (HANDLE)OverlapPlus->sclient, hIocp, (ULONG_PTR)0, 0); // Need a new OVERLAPPEDPLUS structure // for the newly accepted socket. Perhaps // keep a look aside list of free structures. newolp = AllocateOverlappedPlus(); if (!newolp) { // Error } newolp->s = OverlapPlus->sclient; newolp->OpCode = OP_READ; // This function prepares the data to be sent PrepareSendBuffer(&newolp->wbuf); ret = WSASend( newolp->s, &newolp->wbuf, 1, &newolp->dwBytes, 0, &newolp.ol, NULL); if (ret == SOCKET_ERROR) { if (WSAGetLastError() != WSA_IO_PENDING) { // Error } } // Put structure in look aside list for later use FreeOverlappedPlus(OverlapPlus); // Signal accept thread to issue another AcceptEx SetEvent(hAcceptThread); break; case OP_READ: // Process the data read // ... // Repost the read if necessary, reusing the same // receive buffer as before memset(&OverlapPlus->ol, 0, sizeof(OVERLAPPED)); ret = WSARecv( OverlapPlus->s, &OverlapPlus->wbuf, 1, &OverlapPlus->dwBytes, &OverlapPlus->dwFlags, &OverlapPlus->ol, NULL); if (ret == SOCKET_ERROR) { if (WSAGetLastError() != WSA_IO_PENDING) { // Error } } break; case OP_WRITE: // Process the data sent, etc. break; } // switch } // while } // WorkerThread 其中每句柄键(PerHandleKey)变量的内容,是在把完成端口与套接字进行关联时所设置的完成键参数;Overlap 参 数返回的是一个指向发出重叠操作时所使用的那个 OVERLAPPEDPLUS 结构的指针。 要记住,如果重叠操作调用失败时(也就是说,返回值是 SOCKET_ERROR,并且错误原因不是 WSA_IO_PENDING),那 么完成端口将不会收到任何完成通知。如果重叠操作调用成功,或者发生原因是 WSA_IO_PENDING 的错误时,完成 端口将总是能够收到完成通知。 Windows NT 和 Windows 2000 的套接字架构 对于开发大响应规模的 Winsock 应用程序而言,对 Windows NT 和 Windows 2000 的套接字架构有基本的了解是很有 帮助的。下图是 Windows 2000 中的 Winsock 架构: 与其它类型操作系统不同,Windows NT 和 Windows 2000 的传输协议没有一种风格像套接字那样的、可以和应用程 序直接交谈的界面,而是采用了一种更为底层的 API,叫做传输驱动程序界面 (Transport Driver Interface,TDI)。 Winsock 的核心模式驱动程序负责连接和缓冲区管理,以便向应用程序提供套接字仿真(在 AFD.SYS 文件中实现), 同时负责与底层传输驱动程序对话。 谁来负责管理缓冲区? 正如上面所说的,应用程序通过 Winsock 来和传输协议驱动程序交谈,而 AFD.SYS 负责为应用程序进行缓冲区管理。 也就是说,当应用程序调用 send()或 WSASend()函数来发送数据时,AFD.SYS 将把数据拷贝进它自己的内部缓冲区 (取决于 SO_SNDBUF 设定值),然后 send ()或 WSASend()函数立即返回。也可以这么说,AFD.SYS 在后台负责把数 据发送出去。不过,如果应用程序要求发出的数据超过了 SO_SNDBUF 设定的缓冲区大小,那么 WSASend()函数会阻 塞,直至所有数据发送完毕。 从远程客户端接收数据的情况也类似。只要不用从应用程序那里接收大量的数据,而且没有超出 SO_RCVBUF 设定的 值,AFD.SYS 将把数据先拷贝到其内部缓冲区中。当应用程序调用 recv()或 WSARecv()函数时,数据将从内部缓冲 拷贝到应用程序提供的缓冲区。 多数情况下,这样的架构运行良好,特别在是应用程序采用传统的套接字下非重叠的 send()和 receive()模式编写 的时候。不过程序员要小心的是,尽管可以通过 setsockopt()这个 API 来把 SO_SNDBUF 和 SO_RCVBUF 选项值设成 0(关闭内部缓冲区),但是程序员必须十分清楚把 AFD.SYS 的内部缓冲区关掉会造成什么后果,避免收发数据时有 关的缓冲区拷贝可能引起的系统崩溃。 举例来说,一个应用程序通过设定 SO_SNDBUF 为 0 把缓冲区关闭,然后发出一个阻塞 send()调用。在这样的情况 下,系统内核会把应用程序的缓冲区锁定,直到接收方确认收到了整个缓冲区后 send()调用才返回。似乎这是一 种判定你的数据是否已经为对方全部收到的简洁的方法,实际上却并非如此。想想看,即使远端 TCP 通知数据已 经收到,其实也根本不代表数据已经成功送给客户端应用程序,比如对方可能发生资源不足的情况,导致 AFD.SYS 不能把数据拷贝给应用程序。另一个更要紧的问题是,在每个线程中每次只能进行一次发送调用,效率极其低下。 把 SO_RCVBUF 设为 0,关闭 AFD.SYS 的接收缓冲区也不能让性能得到提升,这只会迫使接收到的数据在比 Winsock 更低的层次进行缓冲,当你发出 receive 调用时,同样要进行缓冲区拷贝,因此你本来想避免缓冲区拷贝的阴谋不 会得逞。 现在我们应该清楚了,关闭缓冲区对于多数应用程序而言并不是什么好主意。只要要应用程序注意随时在某个连接 上保持几个 WSARecvs 重叠调用,那么通常没有必要关闭接收缓冲区。如果 AFD.SYS 总是有由应用程序提供的缓冲 区可用,那么它将没有必要使用内部缓冲区。 高性能的服务器应用程序可以关闭发送缓冲区,同时不会损失性能。不过,这样的应用程序必须十分小心,保证它 总是发出多个重叠发送调用,而不是等待某个重叠发送结束了才发出下一个。如果应用程序是按一个发完再发下一 个的顺序来操作,那浪费掉两次发送中间的空档时间,总之是要保证传输驱动程序在发送完一个缓冲区后,立刻可 以转向另一个缓冲区。 资源的限制条件 在设计任何服务器应用程序时,其强健性是主要的目标。也就是说, 你的应用程序要能够应对任何突发的问题,例如并发客户请求数达到峰值、可用内存临时出现不足、以及其它短时 间的现象。这就要求程序的设计者注意 Windows NT 和 2000 系统下的资源限制条件的问题,从容地处理突发性事件。 你可以直接控制的、最基本的资源就是网络带宽。通常,使用用户数据报协议(UDP)的应用程序都可能会比较注意 带宽方面的限制,以最大限度地减少包的丢失。然而,在使用 TCP 连接时,服务器必须十分小心地控制好,防止网 络带宽过载超过一定的时间,否则将需要重发大量的包或造成大量连接中断。关于带宽管理的方法应根据不同的应 用程序而定,这超出了本文讨论的范围。 虚拟内存的使用也必须很小心地管理。通过谨慎地申请和释放内存,或 者应用 lookaside lists(一种高速缓存) 技术来重新使用已分配的内存,将有助于控制服务器应用程序的内存开销(原文为“让服务器应用程序留下的脚印 小一点”),避免操作系统频繁地将应用程序申请的物理内存交换到虚拟内存中(原文为“让操作系统能够总是把更 多的应用程序地址空间更多地保留在内存中”)。你也可以通过 SetWorkingSetSize()这个 Win32 API 让操作系统 分配给你的应用程序更多的物理内存。 在使用 Winsock 时还可能碰到另外两个非直接的资源不足情况。一个是被锁定的内存页面的极限。如果你把 AFD.SYS 的缓冲关闭,当应用程序收发数据时,应用程序缓冲区的所有页面将被锁定到物理内存中。这是因为内核驱动程序 需要访问这些内存,在此期间这些页面不能交换出去。如果操作系统需要给其它应用程序分配一些可分页的物理内 存,而又没有足够的内存时就会发生问题。我们的目标是要防止写出一个病态的、锁定所有物理内存、让系统崩溃 的程序。也就是说,你的程序锁定内存时,不要超出系统规定的内存分页极限。 在 Windows NT 和 2000 系统上,所有应用程序总共可以锁定的内存大约是物理内存的 1/8(不过这只是一个大概的 估计,不是你计算内存的依据)。如果你的应用程序不注意这一点,当你的发出太多的重叠收发调用,而且 I/O 没 来得及完成时,就可能偶尔发生 ERROR_INSUFFICIENT_RESOURCES 的错误。在这种情况下你要避免过度锁定内存。 同时要注意,系统会锁定包含你的缓冲区所在的整个内存页面,因此缓冲区靠近页边界时是有代价的(译者理解, 缓冲区如果正好超过页面边界,那怕是 1 个字节,超出的这个字节所在的页面也会被锁定)。 另外一个限制是你的程序可能会遇到系统未分页池资源不足的情况。所谓未分页池是一块永远不被交换出去的内存 区域,这块内存用来存储一些供各种内核组件访问的数据,其中有的内核组件是不能访问那些被交换出去的页面空 间的。Windows NT 和 2000 的驱动程序能够从这个特定的未分页池分配内存。 当应用程序创建一个套接字(或者是类似的打开某个文件)时,内核会从未分页池中分配一定数量的内存,而且在绑 定、连接套接字时,内核又会从未分页池中再分配一些内存。当你注意观察这种行为时你将发现,如果你发出某些 I/O 请求时(例如收发数据),你会从未分页池里再分配多一些内存(比如要追踪某个待决的 I/O 操作,你可能需要 给这个操作添加一个自定义结构,如前文所提及的)。最后这就可能会造成一定的问题,操作系统会限制未分页内 存的用量。 在 Windows NT 和 2000 这两种操作系统上,给每个连接分配的未分页内存的具体数量是不同的,未来版本的 Windows 很可能也不同。为了使应用程序的生命期更长,你就不应该计算对未分页池内存的具体需求量。 你的程序必须防止消耗到未分页池的极限。当系统中未分页池剩余空间太小时,某些与你的应用程序毫无关系的内 核驱动就会发疯,甚至造成系统崩溃,特别是当系统中有第三方设备或驱动程序时,更容易发生这样的惨剧(而且 无法预测)。同时你还要记住,同一台电脑上还可能运行有其它同样消耗未分页池的其它应用程序,因此在设计你 的应用程序时,对资源量的预估要特别保守和谨慎。 处理资源不足的问题是十分复杂的,因为发生上述情况时你不会收到特别的错误代码,通常你只能收到一般性的 WSAENOBUFS 或者 ERROR_INSUFFICIENT_RESOURCES 错误。要处理这些错误,首先,把你的应用程序工作配置调整 到 合 理 的 最 大 值 ( 译 者 注 : 所 谓 工 作 配 置 , 是指应用程序各部分运行中所需的内存用量 , 请 参 考 http://msdn.microsoft.com/msdnmag/issues/1000/Bugslayer/Bugslayer1000.asp ,关于内存优化,译者另有译 文),如果错误继续出现,那么注意检查是否是网络带宽不足的问题。之后,请确认你没有同时发出太多的收发调 用。最后,如果还是收到资源不足的错误,那就很可能是遇到了未分页内存池不足的问题了。要释放未分页内存池 空间,请关闭应用程序中相当部分的连接,等待系统自行渡过和修正这个瞬时的错误。 接受连接请求 服务器要做的最普通的事情之一就是接受来自客户端的连接请求。在套接字上使用重叠 I/O 接受连接的惟一 API 就 是 AcceptEx()函数。有趣的是,通常的同步接受函数 accept()的返回值是一个新的套接字,而 AcceptEx()函数则 需要另外一个套接字作为它的参数之一。这是因为 AcceptEx()是一个重叠操作,所以你需要事先创建一个套接字 (但不要绑定或连接它),并把这个套接字通过参数传给 AcceptEx()。以下是一小段典型的使用 AcceptEx()的伪代 码: do { -等待上一个 AcceptEx 完成 -创建一个新套接字并与完成端口进行关联 -设置背景结构等等 -发出一个 AcceptEx 请求 }while(TRUE); 作为一个高响应能力的服务器,它必须发出足够的 AcceptEx 调用,守候着,一旦出现客户端连接请求就立刻响应。 至于发出多少个 AcceptEx 才够,就取决于你的服务器程序所期待的通信交通类型。比如,如果进入连接率高的情 况(因为连接持续时间较短,或者出现交通高峰),那么所需要守候的 AcceptEx 当然要比那些偶尔进入的客户端连 接的情况要多。聪明的做法是,由应用程序来分析交通状况,并调整 AcceptEx 守候的数量,而不是固定在某个数 量上。 对于 Windows2000,Winsock 提供了一些机制,帮助你判定 AcceptEx 的数量是否足够。这就是,在创建监听套接字 时创建一个事件,通过 WSAEventSelect()这个 API 并注册 FD_ACCEPT 事件通知来把套接字和这个事件关联起来。 一旦系统收到一个连接请求,如果系统中没有 AcceptEx()正在等待接受连接,那么上面的事件将收到一个信号。 通过这个事件,你就可以判断你有没有发出足够的 AcceptEx(),或者检测出一个非正常的客户请求(下文述)。这 种机制对 Windows NT 4.0 不适用。 使用 AcceptEx()的一大好处是,你可以通过一次调用就完成接受客户端连接请求和接受数据(通过传送 lpOutputBuffer 参数)两件事情。也就是说,如果客户端在发出连接的同时传输数据,你的 AcceptEx()调用在连接 创建并接收了客户端数据后就可以立刻返回。这样可能是很有用的,但是也可能会引发问题,因为 AcceptEx()必 须等全部客户端数据都收到了才返回。具体来说,如果你在发出 AcceptEx()调用的同时传递了 lpOutputBuffer 参 数,那么 AcceptEx()不再是一项原子型的操作,而是分成了两步:接受客户连接,等待接收数据。当缺少一种机 制来通知你的应用程序所发生的这种情况:“连接已经建立了,正在等待客户端数据”,这将意味着有可能出现客户 端只发出连接请求,但是不发送数据。如果你的服务器收到太多这种类型的连接时,它将拒绝连接更多的合法客户 端请求。这就是黑客进行“拒绝服务”攻击的常见手法。 要预防此类攻击,接受连接的线程应该不时地通过调用 getsockopt()函数(选项参数为 SO_CONNECT_TIME)来检查 AcceptEx()里守候的套接字。getsockopt()函数的选项值将被设置为套接字被连接的时间,或者设置为-1(代表套 接字尚未建立连接)。这时,WSAEventSelect()的特性就可以很好地利用来做这种检查。如果发现连接已经建立, 但是很久都没有收到数据的情况,那么就应该终止连接,方法就是关闭作为参数提供给 AcceptEx()的那个套接字。 注意,在多数非紧急情况下,如果套接字已经传递给 AcceptEx()并开始守候,但还未建立连接,那么你的应用程 序不应该关闭它们。这是因为即使关闭了这些套接字,出于提高系统性能的考虑,在连接进入之前,或者监听套接 字自身被关闭之前,相应的内核模式的数据结构也不会被干净地清除。 发出 AcceptEx()调用的线程,似乎与那个进行完成端口关联操作、处理其它 I/O 完成通知的线程是同一个,但是, 别忘记线程里应该尽力避免执行阻塞型的操作。Winsock2 分层结构的一个副作用是调用 socket()或 WSASocket() API 的上层架构可能很重要(译者不太明白原文意思,抱歉)。每个 AcceptEx()调用都需要创建一个新套接字,所以 最好有一个独立的线程专门调用 AcceptEx(),而不参与其它 I/O 处理。你也可以利用这个线程来执行其它任务, 比如事件记录。 有关 AcceptEx()的最后一个注意事项:要实现这些 API,并不需要其它提供商提供的 Winsock2 实现。这一点对微 软特有的其它 API 也同样适用,比如 TransmitFile()和 GetAcceptExSockAddrs(),以及其它可能会被加入到新版 Windows 的 API. 在 Windows NT 和 2000 上,这些 API 是在微软的底层提供者 DLL(mswsock.dll)中实现的,可通过 与 mswsock.lib 编译连接进行调用,或者通过 WSAIoctl() (选项参数为 SIO_GET_EXTENSION_FUNCTION_POINTER) 动态获得函数的指针。 如果在没有事先获得函数指针的情况下直接调用函数(也就是说,编译时静态连接 mswsock.lib,在程序中直接调 用函数),那么性能将很受影响。因为 AcceptEx()被置于 Winsock2 架构之外,每次调用时它都被迫通过 WSAIoctl() 取得函数指针。要避免这种性能损失,需要使用这些 API 的应用程序应该通过调用 WSAIoctl()直接从底层的提供 者那里取得函数的指针。 参见下图套接字架构: TransmitFile 和 TransmitPackets Winsock 提供两个专门为文件和内存数据传输进行了优化的函数。其中 TransmitFile()这个 API 函数在 Windows NT 4.0 和 Windows 2000 上都可以使用,而 TransmitPackets()则将在未来版本的 Windows 中实现。 TransmitFile ()用来把文件内容通过 Winsock 进行传输。通常发送文件的做法是,先调用 CreateFile()打开一个 文件,然后不断循环调用 ReadFile () 和 WSASend ()直至数据发送完毕。但是这种方法很没有效率,因为每次调 用 ReadFile() 和 WSASend ()都会涉及一次从用户模式到内核模式的转换。如果换成 TransmitFile(),那么只需 要给它一个已打开文件的句柄和要发送的字节数,而所涉及的模式转换操作将只在调用 CreateFile()打开文件时 发生一次,然后 TransmitFile()时再发生一次。这样效率就高多了。 TransmitPackets()比 TransmitFile()更进一步,它允许用户只调用一次就可以发送指定的多个文件和内存缓冲区。 函数原型如下: BOOL TransmitPackets( SOCKET hSocket, LPTRANSMIT_PACKET_ELEMENT lpPacketArray, DWORD nElementCount, DWORD nSendSize, LPOVERLAPPED lpOverlapped, DWORD dwFlags ); 其中,lpPacketArray 是一个结构的数组,其中的每个元素既可以是一个文件句柄或者内存缓冲区,该结构定义如 下: typedef struct _TRANSMIT_PACKETS_ELEMENT { DWORD dwElFlags; DWORD cLength; union { struct { LARGE_INTEGER nFileOffset; HANDLE hFile; }; PVOID pBuffer; }; } TRANSMIT_FILE_BUFFERS; 其中各字段是自描述型的(self explanatory)。 dwElFlags 字 段 : 指定当前元素是一个文件句柄还是内存缓冲区 ( 分 别 通 过 常 量 TF_ELEMENT_FILE 和 TF_ELEMENT_MEMORY 指定); cLength 字段:指定将从数据源发送的字节数(如果是文件,这个字段值为 0 表示发送整个文件); 结构中的无名联合体:包含文件句柄的内存缓冲区(以及可能的偏移量)。 使用这两个 API 的另一个好处,是可以通过指定 TF_REUSE_SOCKET 和 TF_DISCONNECT 标志来重用套接字句柄。每当 API 完成数据的传输工作后,就会在传输层级别断开连接,这样这个套接字就又可以重新提供给 AcceptEx()使用。 采用这种优化的方法编程,将减轻那个专门做接受操作的线程创建套接字的压力(前文述及)。 这两个 API 也都有一个共同的弱点:Windows NT Workstation 或 Windows 2000 专业版中,函数每次只能处理两 个调用请求,只有在 Windows NT、Windows 2000 服务器版、Windows 2000 高级服务器版或 Windows 2000 Data Center 中才获得完全支持。 放在一起看看 以上各节中,我们讨论了开发高性能的、大响应规模的应用程序所需的函数、方法和可能遇到的资源瓶颈问题。这 些对你意味着什么呢?其实,这取决于你如何构造你的服务器和客户端。当你能够在服务器和客户端设计上进行更 好地控制时,那么你越能够避开瓶颈问题。 来看一个示范的环境。我们要设计一个服务器来响应客户端的连接、发送请求、接收数据以及断开连接。那么,服 务器将需要创建一个监听套接字,把它与某个完成端口进行关联,为每颗 CPU 创建一个工作线程。再创建一个线程 专门用来发出 AcceptEx()。我们知道客户端会在发出连接请求后立刻传送数据,所以如果我们准备好接收缓冲区 会使事情变得更为容易。当然,不要忘记不时地轮询 AcceptEx()调用中使用的套接字(使用 SO_CONNECT_TIME 选项 参数)来确保没有恶意超时的连接。 该设计中有一个重要的问题要考虑,我们应该允许多少个 AcceptEx()进行守候。这是因为,每发出一个 AcceptEx() 时我们都同时需要为它提供一个接收缓冲区,那么内存中将会出现很多被锁定的页面(前文说过了,每个重叠操作 都会消耗一小部分未分页内存池,同时还会锁定所有涉及的缓冲区)。这个问题很难回答,没有一个确切的答案。 最好的方法是把这个值做成可以调整的,通过反复做性能测试,你就可以得出在典型应用环境中最佳的值。 好了,当你测算清楚后,下面就是发送数据的问题了,考虑的重点是你希望服务器同时处理多少个并发的连接。通 常情况下,服务器应该限制并发连接的数量以及等候处理的发送调用。因为并发连接数量越多,所消耗的未分页内 存池也越多;等候处理的发送调用越多,被锁定的内存页面也越多(小心别超过了极限)。这同样也需要反复测试才 知道答案。 对于上述环境,通常不需要关闭单个套接字的缓冲区,因为只在 AcceptEx()中有一次接收数据的操作,而要保证 给每个到来的连接提供接收缓冲区并不是太难的事情。但是,如果客户机与服务器交互的方式变一变,客户机在发 送了一次数据之后,还需要发送更多的数据,在这种情况下关闭接收缓冲就不太妙了,除非你想办法保证在每个连 接上都发出了重叠接收调用来接收更多的数据。 结论 开发大响应规模的 Winsock 服务器并不是很可怕,其实也就是设置一个监听套接字、接受连接请求和进行重叠收发 调用。通过设置合理的进行守候的重叠调用的数量,防止出现未分页内存池被耗尽,这才是最主要的挑战。按照我 们前面讨论的一些原则,你就可以开发出大响应规模的服务器应用程序。 另一篇 程序运行效果截图: 在 WINDOWS 下进行网络服务端程序开发,毫无疑问,Winsock 完成端口模型是最高效的。Winsock 的完成端口模型 借助 Widnows 的重叠 IO 和完成端口来实现,完成端口模型懂了之后是比较简单的,但是要想掌握 Winsock 完成端 口模型,需要对 WINDOWS 下的线程、线程同步,Winsock API 以及 WINDOWS IO 机制有一定的了解。如果不了解, 推荐几本书:《Inside Windows 2000,《WINDOWS 核心编程》,《WIN32 多线程程序设计》、《WINDOWS 网络编程技术》。 在去年,我在 C 语言下用完成端口模型写了一个 WEBSERVER,前些天,我决定用 C++重写这个 WEBSERVER,给这个 WEBSERVER 增加了一些功能,并改进完成端口操作方法,比如采用 AcceptEx 来代替 accept 和使用 LOOKASIDE LIST 来管理内存,使得 WEBSERVER 的性能有了比较大的提高。 在重写的开始,我决定把完成端口模型封装成一个比较通用的 C++类,针对各种网络服务端程序的开发,只要简单 地继承这个类,改写其中两个虚拟函数就能满足各种需要。到昨天为止,WEBSERVER 重写完毕,我就写了这篇文章 对完成端口模型做一个总结,并介绍一下我的这个类。 一:完成端口模型 至于完成端口和 Winsock 完成端口模型的详细介绍,请参见我上面介绍的那几本书,这里只是我个人对完成端口模 型理解的一点心得。 首先我们要抽象出一个完成端口大概的处理流程: 1:创建一个完成端口。 2:创建一个线程 A。 3:A 线程循环调用 GetQueuedCompletionStatus()函数来得到 IO 操作结果,这个函数是个阻塞函数。 4:主线程循环里调用 accept 等待客户端连接上来。 5:主线程里 accept 返回新连接建立以后,把这个新的套接字句柄用 CreateIoCompletionPort 关联到完成端口, 然后发出一个异步的 WSASend 或者 WSARecv 调用,因为是异步函数,WSASend/WSARecv 会马上返回,实际的发送或 者接收数据的操作由 WINDOWS 系统去做。 6:主线程继续下一次循环,阻塞在 accept 这里等待客户端连接。 7:WINDOWS 系统完成 WSASend 或者 WSArecv 的操作,把结果发到完成端口。 8:A 线程里的 GetQueuedCompletionStatus()马上返回,并从完成端口取得刚完成的 WSASend/WSARecv 的结果。 9:在 A 线程里对这些数据进行处理(如果处理过程很耗时,需要新开线程处理),然后接着发出 WSASend/WSARecv, 并继续下一次循环阻塞在 GetQueuedCompletionStatus()这里。 具体的流程请看附图,其中红线表示是 WINDOWS 系统进行的处理,不需要我们程序干预。 归根到底概括完成端口模型一句话: 我们不停地发出异步的 WSASend/WSARecv IO 操作,具体的 IO 处理过程由 WINDOWS 系统完成,WINDOWS 系统完成实 际的 IO 处理后,把结果送到完成端口上(如果有多个 IO 都完成了,那么就在完成端口那里排成一个队列)。我们 在另外一个线程里从完成端口不断地取出 IO 操作结果,然后根据需要再发出 WSASend/WSARecv IO 操作。 二:提高完成端口效率的几种有效方法 1:使用 AcceptEx 代替 accept。 AcceptEx 函数是微软的 Winsosk 扩展函数,这个函数和 accept 的区别就是:accept 是阻塞的,一直要到有客户端连接上来后 accept 才返回,而 AcceptEx 是异步的,直接就返回了,所以我们利用 AcceptEx 可以发出多个 AcceptEx 调用 等待客户端连接。另外,如果我们可以预见到客户端一连接上来后就会发送数据(比如 WEBSERVER 的客户端浏览器), 那么可以随着 AcceptEx 投递一个 BUFFER 进去,这样连接一建立成功,就可以接收客户端发出的数据到 BUFFER 里, 这样使用的话,一次 AcceptEx 调用相当于 accpet 和 recv 的一次连续调用。同时,微软的几个扩展函数针对操作 系统优化过,效率优于 WINSOCK 的标准 API 函数。 2:在套接字上使用 SO_RCVBUF 和 SO_SNDBUF 选项来关闭系统缓冲区。这个方法见仁见智,详细的介绍可以参考 《WINDOWS 核心编程》第 9 章。这里不做详细介绍,我封装的类中也没有使用这个方法。 3:内存分配方法。因为每次为一个新建立的套接字都要动态分配一个“单 IO 数据”和“单句柄数据”的数据结构, 然后在套接字关闭的时候释放,这样如果有成千上万个客户频繁连接时候,会使得程序很多开销花费在内存分配和 释放上。这里我们可以使用 lookaside list。开始在微软的 platform sdk 里的 SAMPLE 里看到 lookaside list, 我一点不明白,MSDN 里有没有。后来还是在 DDK 的文档中找到了,, lookaside list A system-managed queue from which entries of a fixed size can be allocated and into which entries can be deallocated dynamically. Callers of the Ex(ecutive) Support lookaside list routines can use a lookaside list to manage any dynamically sized set of fixed-size buffers or structures with caller-determined contents. For example, the I/O Manager uses a lookaside for fast allocation and deallocation of IRPs and MDLs. As another example, some of the system-supplied SCSI class drivers use lookaside lists to allocate and release memory for SRBs. lookaside list 名字比较古怪(也许是我孤陋寡闻,第一次看到),其实就是一种内存管理方法,和内存池使用方 法类似。我个人的理解:就是一个单链表。每次要分配内存前,先查看这个链表是否为空,如果不为空,就从这个 链表中解下一个结点,则不需要新分配。如果为空,再动态分配。使用完成后,把这个数据结构不释放,而是把它 插入到链表中去,以便下一次使用。这样相比效率就高了很多。在我的程序中,我就使用了这种单链表来管理。 在我们使用 AcceptEx 并随着 AcceptEx 投递一个 BUFFER 后会带来一个副作用:比如某个客户端只执行一个 connect 操作,并不执行 send 操作,那么 AcceptEx 这个请求不会完成,相应的,我们用 GetQueuedCompletionStatus 在 完成端口中得不到操作结果,这样,如果有很多个这样的连接,对程序性能会造成巨大的影响,我们需要用一种方 法来定时检测,当某个连接已经建立并且连接时间超过我们规定的时间而且没有收发过数据,那么我们就把它关闭。 检测连接时间可以用 SO_CONNECT_TIME 来调用 getsockopt 得到。 还有一个值得注意的地方:就是我们不能一下子发出很多 AcceptEx 调用等待客户连接,这样对程序的性能有影响, 同时,在我们发出的 AcceptEx 调用耗尽的时候需要新增加 AcceptEx 调用,我们可以把 FD_ACCEPT 事件和一个 EVENT 关联起来,然后 用 WaitForSingleObject 等待这个 Event,当已经发出 AccpetEx 调用数目耗尽而又有新的客户端需要连接上来, FD_ACCEPT 事件将被触发,EVENT 变为已传信状态, WaitForSingleObject 返回,我们就重新发出足够的 AcceptEx 调用。 在类 CCPM 的构造函数调用过程中,完成了大部分的事情. 将 Init()和 SetEvent()分别在后面展开 CCPM_Server::CCPM_Server(unsigned short port,CDataQueue* pDataQueue ,CServerEventProcessor* pServerEventProcessor):\ m_Port(port),m_pInDataQueue(pDataQueue),\ m_pServerEventProcessor(pServerEventProcessor) { m_lAcceptCount = 0 ; for (int i=0; i< MAXTHREAD_COUNT; i++) { m_hThreadArray[i] = INVALID_HANDLE_VALUE; } m_ListenSocket = INVALID_SOCKET ; m_hAccept = INVALID_HANDLE_VALUE ; m_ConnManager.Set_DataQueue(m_pInDataQueue); m_ConnManager.Set_ServerEventProcessor(pServerEventProcessor); m_pConsumer = new CConsumer(m_pInDataQueue,pServerEventProcessor,DWORD(this)); m_bStopped = true ; m_pListenClient = NULL ; m_pAfterClientDisconnected = NULL ; m_pAfterClientCheckedIn = NULL ; m_pPacketProcess = NULL ; m_pPacketPreProcess = NULL ; if (false == Init()) { #ifdef _DEBUG TRACE("Init() failed!\n"); #endif return ; } if (false == SetEvent()) { #ifdef _DEBUG TRACE("SetEvent() failed!\n"); #endif return ; } } bool CCPM_Server::Init() /*++ 函数描述: 初始化,创建完成端口、创建完成端口线程,并调用类成员函数 InitWinsock 初始化 Winsock、 建立一个监听套接字 m_ListenSocket,并将此套接字同完成端口关联起来,获取 AcceptEx 指针。 Arguments: 无。 Return Value: 函数调用成功返回 TRUE,失败返回 FALSE。--*/ { bool bSuccess ; m_hCOP = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, NULL, 0); if (NULL == m_hCOP) { #ifdef _DEBUG TRACE("CreateIoCompletionPort() failed: %d\n",GetLastError()); #endif return false; } // //取得系统中 CPU 的数目,创建和 CPU 数目相等的线程,如果事先估计到完成端口处理线程会堵塞, //可以考虑创建 SysInfo.dwNumberOfProcessors*2 个线程。一般在单处理器上创建和 CPU 数目相等//的线程 就可以了 // SYSTEM_INFO SysInfo; GetSystemInfo(&SysInfo); /* if (MAXTHREAD_COUNT < SysInfo.dwNumberOfProcessors) { SysInfo.dwNumberOfProcessors = MAXTHREAD_COUNT; }*/ DWORD dwID ; //for (int i=0; i<(int)SysInfo.dwNumberOfProcessors*2; i++) for (int i=0;i < MAXTHREAD_COUNT;i++) { m_hThreadArray[i] = CreateThread(NULL, 0, (LPTHREAD_START_ROUTINE)CompletionPortRoutine, (LPVOID)this, 0, &dwID); #ifdef _DEBUG TRACE("completion port THREAD %x Created!\n",dwID); #endif if (NULL == m_hThreadArray[i]) { while (i>0) { CloseHandle(m_hThreadArray[i-1]); m_hThreadArray[i-1] = INVALID_HANDLE_VALUE; i--; }//end of while #ifdef _DEBUG TRACE ("CreateThread() failed: %d\n", GetLastError()) ; #endif CloseHandle(m_hCOP); m_hCOP = NULL ; return false; } else { //CloseHandle(m_hThreadArray[i]); // //std::cout << "thread " << i <<": " << dwID << " created!" << std::endl ; } }//end of for //CloseThreadHandles(); // //调用 InitWinsock 函数初始化 Winsock、建立一个监听套接字 m_ListenSocket, //并将此套接字同完成端口关联起来,获取 AcceptEx 指针。 // bSuccess = InitSocket(); if (!bSuccess) { // //给完成端口线程发送消息,指示线程退出。 // PostQueuedCompletionStatus(m_hCOP, 0, NULL, NULL); CloseHandle(m_hCOP); m_hCOP = NULL ; return false; } // //调用 BindAndListenSocket()绑定套接字并将套接字置于监听状态 // bSuccess = BindAndListenSocket(); if (!bSuccess) { PostQueuedCompletionStatus(m_hCOP, 0, NULL, NULL); CloseHandle(m_hCOP); m_hCOP = NULL ; return false; } return true; }//end of Init() bool CCPM_Server::InitSocket() /*++函数描述: 初始化 Winsock,创建一个监听套接字,获取 AcceptEx 函数指针,为监听套接字分配一个单句柄数据,并将监 听套接字与完成端口 hCOP 关联。 Arguments: 无。 Return Value: 函数调用成功返回 TRUE,失败返回 FALSE。--*/ { WSADATA wsd; int nResult = WSAStartup(MAKEWORD(2,2), &wsd); if (0 != nResult) { #ifdef _DEBUG TRACE("WSAStartup() failed!\n"); #endif return false; } if (m_ListenSocket == INVALID_SOCKET) m_ListenSocket = WSASocket(AF_INET, SOCK_STREAM, IPPROTO_IP, NULL, 0, WSA_FLAG_OVERLAPPED); if (INVALID_SOCKET == m_ListenSocket) { #ifdef _DEBUG TRACE("WSASocket() failed:%d\n ", WSAGetLastError()); #endif WSACleanup(); return false; } struct linger linger1; linger1.l_onoff = 1; linger1.l_linger = 0 ; int nError = setsockopt(m_ListenSocket,SOL_SOCKET,SO_LINGER,(const char*)&linger1,sizeof(linger1)); if (nError == SOCKET_ERROR) { #ifdef _DEBUG TRACE("setsockopt linger error : %d\n! ",WSAGetLastError()); #endif closesocket(m_ListenSocket); m_ListenSocket = INVALID_SOCKET ; WSACleanup(); return false; } DWORD dwResult; // //获取微软 SOCKET 扩展函数指针 // GUID GUIDAcceptEx = WSAID_ACCEPTEX; nResult = WSAIoctl(m_ListenSocket, SIO_GET_EXTENSION_FUNCTION_POINTER, &GUIDAcceptEx, sizeof(GUID), &lpAcceptEx, sizeof(lpAcceptEx), &dwResult, NULL, NULL ); if (SOCKET_ERROR == nResult) { #ifdef _DEBUG TRACE("Get AcceptEx failed: %d\n", WSAGetLastError()); #endif closesocket(m_ListenSocket); m_ListenSocket = INVALID_SOCKET ; WSACleanup(); return false; } // GUID GUIDTransmitFile = WSAID_TRANSMITFILE; nResult = WSAIoctl(m_ListenSocket, SIO_GET_EXTENSION_FUNCTION_POINTER, &GUIDTransmitFile, sizeof(GUID), &lpTransmitFile, sizeof(lpTransmitFile), &dwResult, NULL, NULL ); if (SOCKET_ERROR == nResult) { #ifdef _DEBUG TRACE("Get TransmitFile failed: %d\n",WSAGetLastError()); #endif closesocket(m_ListenSocket); m_ListenSocket = INVALID_SOCKET ; WSACleanup(); return false; } // //为监听套接字分配一个客户对象作为单句柄数据 // if(m_pListenClient == NULL) m_pListenClient=new CSock_Stream; m_pListenClient->set_socket(m_ListenSocket); if (NULL == m_pListenClient) { closesocket(m_ListenSocket); m_ListenSocket = INVALID_SOCKET ; WSACleanup(); #ifdef _DEBUG TRACE("InitSocket failed!\n ") ; #endif return false; } // //将监听套接字 m_ListenSocket 和已经建立的完成端口关联起来 // HANDLE hrc = CreateIoCompletionPort( (HANDLE)m_ListenSocket, m_hCOP, (ULONG_PTR)m_pListenClient, 0 ); if (NULL == hrc) { closesocket(m_ListenSocket); m_ListenSocket = INVALID_SOCKET ; WSACleanup(); #ifdef _DEBUG TRACE("CreateIoCompletionPort failed:%d\n ", GetLastError()); #endif return false; } return true; } bool CCPM_Server::BindAndListenSocket() /*++函数描述: private 函数,供 Init 调用。 将监听套接字 m_ListenSocket 绑定到本地 IP 地址,并置于监听模式。 Arguments: 无。 Return Value: 函数调用成功返回 TRUE,失败返回 FALSE。--*/ { SOCKADDR_IN InternetAddr; memset(&InternetAddr,0,sizeof(InternetAddr)); InternetAddr.sin_family = AF_INET; // InternetAddr.sin_addr.s_addr = htonl(ADDR_ANY); InternetAddr.sin_port = htons(m_Port); int nResult = bind(m_ListenSocket, (PSOCKADDR)&InternetAddr, sizeof(InternetAddr)); if (SOCKET_ERROR == nResult) { WSACleanup(); closesocket(m_ListenSocket); #ifdef _DEBUG TRACE("bind() failed: %d\n",WSAGetLastError()); #endif return false; } nResult = listen(m_ListenSocket, DEFAULT_BACKLOG); if (SOCKET_ERROR == nResult) { WSACleanup(); closesocket(m_ListenSocket); #ifdef _DEBUG TRACE("listen() failed:%d\n ",WSAGetLastError()); #endif return false; } return true; } bool CCPM_Server::SetEvent() /*++将 FD_ACCEPT 事件注册到 m_hEvent,这样当可用 AcceptEx 调用被耗尽的时候,就会触发 FD_ACCEPT 事件,然后 DoAccept 里的 WaitForSingleObject 就会成功返回,导致 PostAcceptEx 被调用。 Arguments: 无。 Return Value:函数调用成功返回 TRUE,失败返回 FALSE。 --*/ { m_hEvent = CreateEvent(NULL, FALSE, FALSE, NULL); if (NULL == m_hEvent) { m_hEvent = NULL ; PostQueuedCompletionStatus(m_hCOP, 0, NULL, NULL); CloseHandle(m_hCOP); m_hCOP = NULL ; #ifdef _DEBUG TRACE("CreateEvent() failed: %d\n",GetLastError()); #endif return false; } int nResult = WSAEventSelect(m_ListenSocket, m_hEvent, FD_ACCEPT); if (SOCKET_ERROR == nResult) { PostQueuedCompletionStatus(m_hCOP, 0, NULL, NULL); CloseHandle(m_hEvent); m_hEvent = NULL ; #ifdef _DEBUG TRACE("WSAEventSeclet() failed: %d\n",WSAGetLastError()); #endif return false; } return true; } 这个函数是核心服务器的主要起始函数,当按下”start”按钮时最终就会调用到这个函数,函数在初始化 CTS_TcpServer(3333,10,this);时调用了 CCPM_Server::CCPM_Server()从而完成了网络初始化和 FD_ACCEPT 事件 设置等等大部分的工作 void CSimulator::Start() { TRY_START Stop(); //就是因为这个函数的调用,使得//CCPM_Server::CCPM_Server()被调用 m_pNetServer = new CTS_TcpServer(3333, 10, this); m_pNetServer->Set_AfterClientCheckedIn(::AfterClientCheckIn); m_pNetServer->Set_AfterClientDisconnected(::AfterClientDisconnected); m_pNetServer->Set_PacketPreProcess(::PreProcessPacket); m_pNetServer->Set_PacketProcess(::ProcessPacket); m_pNetServer->Start(); //开启主线程,其中主要是 AccpetEx //??? for debug char datebuf[128], timebuf[128]; _tzset(); /* Display operating system-style date and time. */ _strdate(datebuf); _strtime(timebuf); CErrorLog::Instance()->WriteLog("Start the net server, can receive/send data packets, start at %s %s\n", datebuf, timebuf); //??? for debug m_pCalculateThread = new CCalculateThread(m_pNetServer); m_pCalculateThread->Start(); TRYEND_NORET } CTS_TcpServer::CTS_TcpServer(unsigned short uPort ,int iQueueSize,CserverEventProcessor *pServerEventProcessor):CCPM_Server(uPort,iQueueSize,pServerEventProcessor) { m_pOutDataQueue = new CDataQueue(GET_RUNTIME_BUFFER(CBuffer),10); m_pSendThread = new CSendThread(this,m_pOutDataQueue); m_pSendThread->Start(); } void CCPM_Server::Start() { m_bStopped = false ; m_pConsumer->Start(); // if(m_hAccept == INVALID_HANDLE_VALUE) { DWORD dwThreadId ; m_hAccept = CreateThread( NULL,0,(LPTHREAD_START_ROUTINE)AcceptThreadRoutine,this,THREAD_PRIORITY_NORMAL,&dwThreadId); #ifdef _DEBUG TRACE("ACCEPT THREAD %x Created!\n",dwThreadId); #endif } if (INVALID_HANDLE_VALUE == m_hAccept) { m_bStopped = true ; #ifdef _DEBUG TRACE("Create Accept Thread failed!\n"); #endif return ; } return ; } AcceptThreadRoutine 的实现 DWORD CCPM_Server::AcceptThreadRoutine(LPVOID param) { CCPM_Server* pThis = (CCPM_Server*) param ; return pThis->DoAccept(param); } DoAccept 的实现 DWORD CCPM_Server::DoAccept(LPVOID param) /*++Fucntion Description: 主线程循环,用 WaitForSigleObject 等待 m_hEvent,已经发出的 AcceptEx()调用耗尽,FD_ACCEPT 事件将被触发,WaitForSigleObject 成功返回,然后调用 MakeAcceptEx()来发出 AcceptEx()调用。 WaitForSigleObject 每次等待 10 秒,超时返回后,对系统中已经建立成功了的并且还没有收发过数据的 SOCKET 连接进行检测,如果某个连接已经建立了 30 秒,并且还没收发过数据,则强制关闭。 Arguments: 无。 Return Value: 函数调用成功返回 0,调用失败返回-1;--*/ { DWORD dwResult; int nCounter = 0; #ifdef _DEBUG int nTimeOut = 0; #endif while (!m_bStopped) { dwResult = WaitForSingleObject(m_hEvent, 1000); if (WAIT_FAILED == dwResult) { PostQueuedCompletionStatus(m_hCOP, 0, NULL, NULL); CloseHandle(m_hCOP); m_hCOP = NULL ; //ADDED ON 2006-05-19 CloseHandle(m_hEvent); m_hEvent = NULL ; char szError[255]; sprintf(szError,"WSAWaitForSingleEvents() failed: %d\n",WSAGetLastError() ); ::MessageBox(NULL,szError,"error",MB_OK|MB_ICONERROR); return -1; } if (WAIT_TIMEOUT == dwResult) { nCounter++; } else { if (WAIT_TIMEOUT != dwResult) { if (false == MakeAcceptEx()) { PostQueuedCompletionStatus(m_hCOP, 0, NULL, NULL); CloseHandle(m_hCOP); m_hCOP = NULL ; CloseHandle(m_hEvent); m_hEvent = NULL ; #ifdef _DEBUG TRACE("makeaccept error!\n"); #endif return -1; } } } } return 0; } MakeAcceptEx() 的实现 bool CCPM_Server::MakeAcceptEx() /*++Fucntion Description: 连续发出 AcceptEx 调用。 Arguments: Return Value:函数调用成功返回 TRUE,失败返回 FALSE。--*/ { int nZero = 0; while (m_lAcceptCount < DEFAULT_ACCEPT_SIZE) { SOCKET AcceptSocket = WSASocket(AF_INET, SOCK_STREAM, IPPROTO_IP, \ NULL, 0, WSA_FLAG_OVERLAPPED); if (INVALID_SOCKET == AcceptSocket) { char szError[255]; sprintf(szError,"WSASocket failed: %d\n",WSAGetLastError() ); ::MessageBox(NULL,szError,"error",MB_OK|MB_ICONERROR); return false; } CSock_Stream* pClient = m_ConnManager.CreateClient(AcceptSocket); //pClient->Set_DataQueue(m_pInDataQueue); if (NULL == pClient) { char szError[255]; sprintf(szError,"sCreateClient failed: %d\n" ); ::MessageBox(NULL,szError,"error",MB_OK|MB_ICONERROR); closesocket(AcceptSocket); return false; } DWORD dwBytes; BOOL bSuccess = lpAcceptEx( m_ListenSocket, AcceptSocket, pClient->m_ovRecv.szBuffer, pClient->m_ovRecv.wsaBuffer.len - ((sizeof(SOCKADDR_IN) + 16) * 2), sizeof(SOCKADDR_IN) + 16, sizeof(SOCKADDR_IN) + 16, &dwBytes, &(pClient->m_ovRecv.ol)); if (FALSE == bSuccess) { int nResult = WSAGetLastError(); if (nResult != ERROR_IO_PENDING) { char szError[255]; sprintf(szError,"AcceptEx() failed : %d\n" ); ::MessageBox(NULL,szError,"error",MB_OK|MB_ICONERROR); closesocket(AcceptSocket); m_ConnManager.ReleaseClient(pClient); return false; } InterlockedExchangeAdd(&m_lAcceptCount, 1); } } InterlockedExchangeAdd(&m_lAcceptCount,-(DEFAULT_ACCEPT_SIZE)); return true; }//end of PostAccetExRoutine() 完成端口处理线程的回调函数 DWORD CCPM_Server::CompletionPortRoutine(LPVOID param)/*++函数描述: 完成端口处理线程,循环调用 GetQueuedCompletionStatus 来获取 IO 操作结果。 Arguments: Return Value: 线程退出代码。--*/ { CCPM_Server* pThis = (CCPM_Server*)param; DWORD dwNumberBytes; PPER_HANDLE_CONTEXT lpCompletionKey = NULL; LPWSAOVERLAPPED lpOverlapped = NULL; int nResult; BOOL bSuccess; CSock_Stream* pClient = NULL; while (true) { bSuccess = GetQueuedCompletionStatus( pThis->m_hCOP, &dwNumberBytes, (PULONG_PTR )&lpCompletionKey, &lpOverlapped, //通过这个重叠 IO 获取到完成端 //口上的数据 INFINITE ); if (NULL == lpCompletionKey) { pThis->m_ConnManager.ReleaseAllActiveClients(); #ifdef _DEBUG TRACE("COMPLETION PORT WORKER THREAD %x EXIT!\n",GetCurrentThreadId()); #endif return 0; } if (FALSE == bSuccess) { #ifdef _DEBUG TRACE("GetQueuedCompletionStatus() failed: %d\n ", GetLastError()); #endif pClient = (CSock_Stream*)lpCompletionKey ; pThis->m_ConnManager.ReleaseClient(pClient); continue; } POVERLAPPED_PLUS lp = (POVERLAPPED_PLUS) lpOverlapped; //获取到了完成端口 I/O 完成队列中的出队数据 后面的内容就是根据你的程序的具体要实现的目录,对这个重叠 IO 的数据进行相应的处理. pClient = (CSock_Stream*)lp->dwClient ; if(IoAccept != lp->IoOperation) { if((!bSuccess) || (bSuccess && (0 == dwNumberBytes))) { pThis->m_ConnManager.ReleaseClient(pClient); continue ; } } //*/ if(IoAccept == lp->IoOperation )//建立链接请求 { nResult = setsockopt( pClient->get_socket(), SOL_SOCKET, SO_UPDATE_ACCEPT_CONTEXT, (char*)&pThis->m_ListenSocket, sizeof(pThis->m_ListenSocket)); if (SOCKET_ERROR == nResult) { pThis->m_ConnManager.ReleaseClient(pClient); #ifdef _DEBUG TRACE("SO_UPDATE_ACCEPT_CONTEXT failed to update accept socket!\n" ); #endif continue ; } //取本地和客户端地址 LPFN_GETACCEPTEXSOCKADDRS lpGetAcceptExSockAddr; GUID GUIDGetAcceptExSockAddrs = WSAID_GETACCEPTEXSOCKADDRS ; DWORD dwResult ; int nResult = WSAIoctl( pClient->get_socket(), SIO_GET_EXTENSION_FUNCTION_POINTER, &GUIDGetAcceptExSockAddrs, sizeof(GUID), &lpGetAcceptExSockAddr, sizeof(lpGetAcceptExSockAddr), &dwResult, NULL, NULL ); // int nLoal,nRemote ; DWORD dwAddrLen = sizeof(SOCKADDR_IN) + 16 ; SOCKADDR_IN Addr ; sockaddr* pLocal_Addr; sockaddr* pRemote_Addr ; lpGetAcceptExSockAddr(lp->wsaBuffer.buf, lp->wsaBuffer.len - 2*dwAddrLen, dwAddrLen, dwAddrLen, &pLocal_Addr, &nLoal, &pRemote_Addr, &nRemote); memcpy(&Addr,(const void*)pLocal_Addr,sizeof(sockaddr)); #ifdef _DEBUG TRACE("LOACAL ADDR %s \n",inet_ntoa(Addr.sin_addr)); #endif pClient->SetLocalAddr(Addr); memcpy(&Addr,(const void*)pRemote_Addr,sizeof(sockaddr)); #ifdef _DEBUG TRACE("REMOTE ADDR %s \n",inet_ntoa(Addr.sin_addr)); #endif pClient->SetRemoteAddr(Addr); BOOL bKeepAlive = TRUE ; nResult = setsockopt(pClient->get_socket(), SOL_SOCKET, SO_KEEPALIVE, (const char*)&bKeepAlive, sizeof(bKeepAlive) ); if (SOCKET_ERROR == nResult) { pThis->m_ConnManager.ReleaseClient(pClient); #ifdef _DEBUG TRACE("SO_KEEPALIVE failed to update accept socket!\n" ); #endif continue ; } HANDLE hResult ; hResult = CreateIoCompletionPort( (HANDLE)pClient->get_socket(),pThis->m_hCOP,(ULONG_PTR)pClient,0); if( NULL == hResult) { #ifdef _DEBUG TRACE("CreateIoCompletionPort() failed: %d\n ",GetLastError()); #endif pThis->m_ConnManager.ReleaseClient(pClient); continue ; } } //*/ if(lp->IoOperation == IoAccept || lp->IoOperation == IoRead) { if(dwNumberBytes > 0) { //连接后收到的第一个数据包为 CHECKIN 包 if (pClient->m_siteID.eSiteType == SITE_UNKNOWN) { CSerial_Base* pkt = CPacketFactory::CreatePacketFromBuffer(lp->wsaBuffer.buf + sizeof(int) + sizeof(ST_CLIENT_ID)); if (pkt && strcmp(pkt->GetPacketClassInfo()->m_lpszPacketName,"CCheckInPacket") == 0 ) { CCheckInPacket* chk = (CCheckInPacket*)pkt; // // // ST_CLIENT_ID siteID; siteID.eSiteType = chk->m_eSiteType ; siteID.iSiteID = chk->m_iSiteID ; if (true == pThis->m_ConnManager.BindClient(pClient,siteID)) { pClient->Read(lp->wsaBuffer.buf,dwNumberBytes); // if (pThis->m_pServerEventProcessor != NULL) // { // pThis->m_pServerEventProcessor->AfterClientCheckIn(siteID,DWORD(pThis)); // } if(pThis->m_pAfterClientCheckedIn) pThis->m_pAfterClientCheckedIn(pThis->m_dwParam,siteID); } else { //席位绑定失败则释放连接 pThis->m_ConnManager.ReleaseClient(pClient); continue ; } } } else { lp->dwNumberBytes += dwNumberBytes ; pClient->Read(lp->wsaBuffer.buf,dwNumberBytes); } //发出新的接收请求 if (false == pClient->IssueRead()) { pThis->m_ConnManager.ReleaseClient(pClient); continue ; } } } //发送数据 else { //do nothing } } return 0; }//end of CompletionRoutine()
还剩36页未读

继续阅读

下载pdf到电脑,查找使用更方便

pdf的实际排版效果,会与网站的显示效果略有不同!!

需要 5 金币 [ 分享pdf获得金币 ] 0 人已下载

下载pdf

pdf贡献者

btymaster

贡献于2011-11-27

下载需要 5 金币 [金币充值 ]
亲,您也可以通过 分享原创pdf 来获得金币奖励!
下载pdf