2023年6月22日发(作者:)
IO Completion Port, Windows上提供的最有效实现高性能server的方式(无论是file
server, web server还是别的任何类似大量并发io请求的server),IIS本身就是基于此的。可惜,到目前为止没有一个真正简单的示例。今日便让我打响这第一炮吧。
没有一个简明例程的根源,可以说是因为IoCompletionPort本身的API设计非常糟糕,一个CreateIoCompletionPort包含了太多功能,名称又很confusing,让人云里雾里。所有的的例程,为了便于理解,都把这些让人迷惑的API封装,构造自己的class,但是呢,这样虽然从软件设计角度来说清晰了,但是对于了解IoCompletionPort的使用来说,反而更迷惑了(因为调用被分散到各个class中)。
本文的目的是用最简明的例子来介绍如何使用IO Completion Port。
在此之前,先要说IO Completion Port到底是什么东西-----就是threads pool,一个由Windows自动管理的threads pool. 好,你就需要了解这么多,再说多了就违背了本文的宗旨---提供简明例程。
1. IO Completion Port的程序,大致上可以划分为以下步骤:
2. CreateIOCompletionPort (可以理解为初始化threads pool)
3. 建立threads (就是一般的CreateThread或者_beginthreadex,将第一步所得到的HANDLE作为参数传进去,这个跟一般的thread没任何差别)
4. 开始IO 操作,比如建立SOCKET,
5. 在第一个Async IO之前,将上一步建立的HANDLE(比如socket)绑定到第一步得到的IO Completion Port的HANDLE 上
6. 根据具体情况操作IO
好吧,还是用代码来看比较直接:
先来看主程序:
view plaincopy to clipboardprint?
1. int _tmain(int argc, _TCHAR* argv[])
2. {
3.
4. // argv[1]为ip, argv[2]为port
5. // CTcpServer只是一个对socket的简单封装,代码
6. // 后面给出
7. CTcpServer server(argv[1], argv[2]);
8. if (!istening())
9. {
10. printf("failed listeningn");
11. return 1; 12. }
13.
14. // initialize IoCompletionPort
15. SOCKET& listeningSocket = ();
16. //1. 初始化IO Completion Port
17. HANDLE hIocp = ::CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, NULL, 0);
18.
19. if(hIocp == NULL)
20. {
21. printf("cannot create io completion portn");
22. return 1;
23. }
24.
25. //2. 绑定socket到completion port
26. //这意思是说,后面所有在这个socket上的async
27. // io 操作,都会notify这个completion port的
28. //threads
29. if (0 == ::CreateIoCompletionPort((HANDLE)listeningSocket, hIocp,
(ULONG_PTR)0, 0))
30. {
31. printf("cannot associate listening socket to io completion portn");
32. return 1;
33. }
34.
35. //3. 创建threads,将io completion port
36. // HANDLE 作为参数传入,这样每个thread
37. // 都能在有IO请求时query其status,参见
38. // thread的具体代码
39. int threadPoolSize = 4;
40.
41. HANDLE hWorker;
42.
43. for(int i=0;i 44. { 45. hWorker = CreateThread(NULL, 0, WorkerThreadFunc, (LPVOID)hIocp, 0, NULL); 46. CloseHandle(hWorker); 47. } 48. 49. //4. 等待新连接,因为我们不要busy loop, 所以 50. // 每个TCP连接需要等待并检查 51. // FD_ACCEPT event,然后用AcceptEx 52. while(true) 53. { 54. printf("waiting for connectionn"); 55. if (rAcceptEvent(10000)) 56. { 57. // 只管Accept了,至于Recv/Send由上面建立的 58. // thread来负责,后面会说thread的功能 59. NewConnection(); 60. } 61. } 62. 63. return 1; 64. } 然后先来看thread的实现. IO Completion Port的 thread因为是放入一个thread pool中,所以每个thread是“通用”的,换句话说,每个thread要能够完成多种功能,用伪代码来说是这样: Wait For IO Notification; ---> 等待比如Socket上的一个event,至于是什么event先不管。无妨想象成interrupt,也比较类似WaitForSingleObject,总之thread在这时候是sleep的 Check IO status Operation; ----> 检查IO状态,更关键是看到底是什么Event switch () { case ACCEPT: ... case READ: ... case WRITE: ... case WHATEVER: ... } 所以要清楚,io completion port的thread并不是去给每个read或者write建一个thread(也不是不可以,不过就是画蛇添足多此一举),而是依靠自定义的Overlapped结构来判断到底对IO进行什么操作。还是看下面的源代码吧。 view plaincopy to clipboardprint? 1. DWORD WINAPI WorkerThreadFunc(LPVOID lpParam) 2. { 3. ULONG_PTR *PerHandleKey; 4. WSAOVERLAPPED *pOverlap; 5. 6. OVERLAPPEDPLUS *pOverlapPlus, 7. *newolp; 8. DWORD dwBytesXfered; 9. 10. int ret; 11. 12. HANDLE hIocp = (HANDLE)lpParam; 13. 14. while (true) 15. { 16. //这里查询IO 状态 17. ret = GetQueuedCompletionStatus( 18. hIocp, 19. &dwBytesXfered, 20. (PULONG_PTR)&PerHandleKey, 21. &pOverlap, 22. INFINITE); 23. if (ret == 0) 24. { 25. // Operation failed 26. printf("cannot get queued completion statusn"); 27. continue; 28. } 29. 30. //OVERLAPPEDPLUS是我们自己定义的data structure,一般把正常的 31. //OVERLAPPED作为第一个field,所以指针是指向同样地址,后面会给出具 32. //体定义. CONTAINING_RECORD是一个标准win32 macro 33. 34. pOverlapPlus = CONTAINING_RECORD(pOverlap, OVERLAPPEDPLUS, overlapped); 35. 36. // OP_ACCEPT也是我们自己定义的 value,只是一个标识: 37. // #define OP_ACCEPT 1 38. 39. switch (pOverlapPlus->OpCode) 40. { 41. case OP_ACCEPT: 42. printf("acceptedn"); 43. //根据OVERLAPPEDPLUS的定义,一般应该有专门 44. //function来释放,不过这里就简单一下 45. free(pOverlapPlus); 46. break; 47. } 48. } 49. } 上面的OVERLAPPEDPLUS是一个很重要的自定义结构,可以把你要的东西全部放里面:D view plaincopy to clipboardprint? 1. #pragma once 2. 3. #include 4. #include 5. #include 6. #include 7. #include 8. #include 9. #include 10. #include 11. #include 12. #include 13. #include 14. #include 15. 16. 17. #define DATA_BUFSIZE 4096 18. 19. typedef struct _OVERLAPPEDPLUS { 20. WSAOVERLAPPED overlapped; 21. SOCKET serverSock; 22. SOCKET clientSock; 23. int OpCode; 24. WSABUF wbuf; 25. DWORD Bytes; 26. DWORD Flags; 27. // other useful information 28. } OVERLAPPEDPLUS; 29. 30. #define OP_READ 0 31. #define OP_WRITE 1 32. #define OP_ACCEPT 2 好,最后就是开始提到的CTcpServer这个对socket的封装,这个类并不复杂,除开封装socket之外(无非就是socket(...),bind,listen),最重要的是检查FD_ACCEPT event,然后调用AcceptEx(如果你去看MSDN,会发现AcceptEx的示例并没有用AcceptEx,汗...),当然,你可以用类似:while(true) { AcceptEx(...);}, 但是这种busy loop显然是极其恶劣的。 先来看.h view plaincopy to clipboardprint? 1. #pragma once 2. #include "common.h" 3. #include 4. using namespace std; 5. 6. class CTcpServer 7. { 8. WSADATA wsd; 9. SOCKET m_ListeningSocket; 10. ADDRINFOW* m_pAddrInfo; 11. HANDLE m_AcceptEvent; 12. 13. public: 14. CTcpServer(PCWSTR pIPAddress, PCWSTR port); 15. 16. ~CTcpServer(); 17. 18. SOCKET& Socket() 19. { 20. return m_ListeningSocket; 21. } 22. 23. bool StartListening(); 24. 25. int AddressFamily() 26. { 27. return m_pAddrInfo->ai_family; 28. } 29. 30. int SocketType() 31. { 32. return m_pAddrInfo->ai_socktype; 33. } 34. 35. int Protocol() 36. { 37. return m_pAddrInfo->ai_protocol; 38. } 39. 40. BOOL WaitForAcceptEvent(DWORD timeout); 41. 42. BOOL AcceptNewConnection(); 43. 44. }; 没什么稀奇的,如我所说,关键在于WaitForAcceptEvent()上,下面的代码是具体实现: view plaincopy to clipboardprint? 1. #include "stdafx.h" 2. 3. #include "TcpServer.h" 4. #include "common.h" 5. 6. //初始化socket,稍微改改可以支持ipv6,先不管它 7. CTcpServer::CTcpServer(PCWSTR pAddress, PCWSTR port) 8. :m_ListeningSocket(INVALID_SOCKET), m_pAddrInfo(NULL), m_AcceptEvent(NULL) 9. { 10. int rc = WSAStartup(MAKEWORD(2, 2), &wsd); 11. if (rc != 0) { 12. wprintf(L"WSAStartup failedn"); 13. throw new exception(); 14. } 15. 16. // Initialize the hints to retrieve the server address for IPv4 17. ADDRINFOW *result = NULL, 18. *ptr = NULL, 19. hints = {0}; 20. 21. _family = AF_INET; 22. _socktype = SOCK_STREAM; 23. _protocol = IPPROTO_TCP; 24. 25. rc =::GetAddrInfoW(pAddress, port, &hints, &m_pAddrInfo); 26. if (rc != 0) { 27. printf("getaddrinfo failed: %dn", rc ); 28. throw new exception(); 29. } 30. 31. m_ListeningSocket = WSASocket(AF_INET, 32. SOCK_STREAM, 33. IPPROTO_TCP, NULL, 0, WSA_FLAG_OVERLAPPED); 34. 35. } 36. 37. // dtor负责cleanup,没什么特别 38. CTcpServer::~CTcpServer() 39. { 40. if (m_ListeningSocket != INVALID_SOCKET) 41. { 42. closesocket(m_ListeningSocket); 43. } 44. 45. if (m_AcceptEvent != NULL && m_AcceptEvent != INVALID_HANDLE_VALUE) 46. { 47. ::CloseHandle(m_AcceptEvent); 48. } 49. 50. WSACleanup(); 51. } 52. 53. // bind和listen,然后构造FD_ACCEPT event 54. bool CTcpServer::StartListening() 55. { 56. if (bind(m_ListeningSocket, m_pAddrInfo->ai_addr, m_pAddrInfo->ai_addrlen) == SOCKET_ERROR) 57. { 58. return false; 59. } 60. 61. if (listen(m_ListeningSocket, 1 ) == SOCKET_ERROR) 62. { 63. printf("failed listening"); 64. return false; 65. } 66. 67. //这个很重要,先创建event,然后用WSAEventSelect表示我们只对 68. // FD_ACCEPT感兴趣 69. m_AcceptEvent = WSACreateEvent(); 70. if(SOCKET_ERROR == WSAEventSelect(m_ListeningSocket, m_AcceptEvent, FD_ACCEPT)) 71. { 72. printf("WSAEventSelect failed: %dn", WSAGetLastError()); 73. return false; 74. } 75. 76. return true; 77. } 78. 79. // 等待FD_ACCEPT event 80. // 注意,跟普通event不一样,我们无需用ResetEvent()/SetEvent() 81. // 这些API,这个类似于Pulse 82. BOOL CTcpServer::WaitForAcceptEvent(DWORD timeout) 83. { 84. DWORD ret = WSAWaitForMultipleEvents(1, &m_AcceptEvent, FALSE, timeout, FALSE); 85. if (WSA_WAIT_TIMEOUT == ret || WSA_WAIT_FAILED == ret) 86. { 87. printf("wait for accept failedn"); 88. return false; 89. } 90. // 收到event 91. printf("test accept eventn"); 92. 93. WSANETWORKEVENTS events; 94. 95. // 检查该event是否在正确socket上 96. int nRet = WSAEnumNetworkEvents(m_ListeningSocket, m_AcceptEvent, &events); 97. 98. if (nRet == SOCKET_ERROR) 99. { 100. printf("error when enumerate network eventsn"); 101. return false; 102. } 103. 104. // 确认是FD_ACCEPT event 105. if (rkEvents & FD_ACCEPT) 106. { 107. printf("accept event foundn"); 108. return true; 109. } 110. 111. return false; 112. } 113. 114. // 接受新连接请求 115. BOOL CTcpServer::AcceptNewConnection() 116. { 117. DWORD dwBytes; 118. 119. // OVERLAPPEDPLUS的初始化比较繁琐,所以专门用一个function 120. OVERLAPPEDPLUS* pOverlapPlus = ::CreateOverlappedPlus(); 121. 122. pOverlapPlus->serverSock = m_ListeningSocket; 123. pOverlapPlus->OpCode = OP_ACCEPT; 124. 125. pOverlapPlus->clientSock = socket(m_pAddrInfo->ai_family, m_pAddrInfo->ai_socktype, m_pAddrInfo->ai_protocol); 126. 127. // 注意 AcceptEx是立即返回的,而且一般都是FALSE,如果要确认 128. // 是否出错,需要再调用WSAGetLastError()看是否是IO_PENDING,如 129. // 果是pending就没关系 130. // 另外, AcceptEx把client address放在接收buffer的最后,所以下 131. // 面会看到有一些"magic number" 132. return AcceptEx(m_ListeningSocket, pOverlapPlus->clientSock, pOverlapPlus->, pOverlapPlus-> - ((sizeof(sockaddr_in) + 16) * 2), sizeof(sockaddr_in) + 16,sizeof(sockaddr_in) + 16, &dwBytes, &pOverlapPlus->overlapped); 133. } 134. 135. //创建OVERLAPPEDPLUS 136. // 其实还有个Free的function,也就是多free一下mbuf,就 137. //不写了:D 138. OVERLAPPEDPLUS* CreateOverlappedPlus() 139. { 140. OVERLAPPEDPLUS* pOverlapPlus = (OVERLAPPEDPLUS *)malloc(sizeof(OVERLAPPEDPLUS)); 141. memset(pOverlapPlus, 0, sizeof(OVERLAPPEDPLUS)); 142. 143. char* pBuffer = (char*)malloc(DATA_BUFSIZE); 144. pOverlapPlus-> = pBuffer; 145. pOverlapPlus-> = DATA_BUFSIZE; 146. 147. return pOverlapPlus; 148. } 差不多就酱紫。另外说明在thread中,收到的Accept是带了第一个TCP payload的,所以如果你在AcceptEx之后去WSARecvFrom,是收不到东西的(因为已经收到了, 注意AcceptEx用pOverlapPlus-> 接收数据) 偶认为本人这篇是世界上最清晰易懂的IO Completion 教程:D 另外还有个BindIoCompletionPort,似乎要方便点,没try过...反正我这个是标准实现,呵呵。
发布者:admin,转转请注明出处:http://www.yc00.com/news/1687430307a9428.html
评论列表(0条)