IOCP——精选推荐

IOCP——精选推荐

2023年6月22日发(作者:)

IO Completion Port, Windows上提供的最有效实现高性能server的方式(无论是file

server, web server还是别的任何类似大量并发io请求的server),IIS本身就是基于此的。可惜,到目前为止没有一个真正简单的示例。今日便让我打响这第一炮吧。

没有一个简明例程的根源,可以说是因为IoCompletionPort本身的API设计非常糟糕,一个CreateIoCompletionPort包含了太多功能,名称又很confusing,让人云里雾里。所有的的例程,为了便于理解,都把这些让人迷惑的API封装,构造自己的class,但是呢,这样虽然从软件设计角度来说清晰了,但是对于了解IoCompletionPort的使用来说,反而更迷惑了(因为调用被分散到各个class中)。

本文的目的是用最简明的例子来介绍如何使用IO Completion Port。

在此之前,先要说IO Completion Port到底是什么东西-----就是threads pool,一个由Windows自动管理的threads pool. 好,你就需要了解这么多,再说多了就违背了本文的宗旨---提供简明例程。

1. IO Completion Port的程序,大致上可以划分为以下步骤:

2. CreateIOCompletionPort (可以理解为初始化threads pool)

3. 建立threads (就是一般的CreateThread或者_beginthreadex,将第一步所得到的HANDLE作为参数传进去,这个跟一般的thread没任何差别)

4. 开始IO 操作,比如建立SOCKET,

5. 在第一个Async IO之前,将上一步建立的HANDLE(比如socket)绑定到第一步得到的IO Completion Port的HANDLE 上

6. 根据具体情况操作IO

好吧,还是用代码来看比较直接:

先来看主程序:

view plaincopy to clipboardprint?

1. int _tmain(int argc, _TCHAR* argv[])

2. {

3.

4. // argv[1]为ip, argv[2]为port

5. // CTcpServer只是一个对socket的简单封装,代码

6. // 后面给出

7. CTcpServer server(argv[1], argv[2]);

8. if (!istening())

9. {

10. printf("failed listeningn");

11. return 1; 12. }

13.

14. // initialize IoCompletionPort

15. SOCKET& listeningSocket = ();

16. //1. 初始化IO Completion Port

17. HANDLE hIocp = ::CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, NULL, 0);

18.

19. if(hIocp == NULL)

20. {

21. printf("cannot create io completion portn");

22. return 1;

23. }

24.

25. //2. 绑定socket到completion port

26. //这意思是说,后面所有在这个socket上的async

27. // io 操作,都会notify这个completion port的

28. //threads

29. if (0 == ::CreateIoCompletionPort((HANDLE)listeningSocket, hIocp,

(ULONG_PTR)0, 0))

30. {

31. printf("cannot associate listening socket to io completion portn");

32. return 1;

33. }

34.

35. //3. 创建threads,将io completion port

36. // HANDLE 作为参数传入,这样每个thread

37. // 都能在有IO请求时query其status,参见

38. // thread的具体代码

39. int threadPoolSize = 4;

40.

41. HANDLE hWorker;

42.

43. for(int i=0;i

44. {

45. hWorker = CreateThread(NULL, 0, WorkerThreadFunc, (LPVOID)hIocp, 0, NULL);

46. CloseHandle(hWorker);

47. }

48.

49. //4. 等待新连接,因为我们不要busy loop, 所以

50. // 每个TCP连接需要等待并检查

51. // FD_ACCEPT event,然后用AcceptEx 52. while(true)

53. {

54. printf("waiting for connectionn");

55. if (rAcceptEvent(10000))

56. {

57. // 只管Accept了,至于Recv/Send由上面建立的

58. // thread来负责,后面会说thread的功能

59. NewConnection();

60. }

61. }

62.

63. return 1;

64. }

然后先来看thread的实现. IO Completion Port的 thread因为是放入一个thread pool中,所以每个thread是“通用”的,换句话说,每个thread要能够完成多种功能,用伪代码来说是这样:

Wait For IO Notification; ---> 等待比如Socket上的一个event,至于是什么event先不管。无妨想象成interrupt,也比较类似WaitForSingleObject,总之thread在这时候是sleep的

Check IO status Operation; ----> 检查IO状态,更关键是看到底是什么Event

switch ()

{

case ACCEPT: ...

case READ: ...

case WRITE: ...

case WHATEVER: ...

}

所以要清楚,io completion port的thread并不是去给每个read或者write建一个thread(也不是不可以,不过就是画蛇添足多此一举),而是依靠自定义的Overlapped结构来判断到底对IO进行什么操作。还是看下面的源代码吧。

view plaincopy to clipboardprint?

1. DWORD WINAPI WorkerThreadFunc(LPVOID lpParam)

2. {

3. ULONG_PTR *PerHandleKey;

4. WSAOVERLAPPED *pOverlap; 5.

6. OVERLAPPEDPLUS *pOverlapPlus,

7. *newolp;

8. DWORD dwBytesXfered;

9.

10. int ret;

11.

12. HANDLE hIocp = (HANDLE)lpParam;

13.

14. while (true)

15. {

16. //这里查询IO 状态

17. ret = GetQueuedCompletionStatus(

18. hIocp,

19. &dwBytesXfered,

20. (PULONG_PTR)&PerHandleKey,

21. &pOverlap,

22. INFINITE);

23. if (ret == 0)

24. {

25. // Operation failed

26. printf("cannot get queued completion statusn");

27. continue;

28. }

29.

30. //OVERLAPPEDPLUS是我们自己定义的data structure,一般把正常的

31. //OVERLAPPED作为第一个field,所以指针是指向同样地址,后面会给出具

32. //体定义. CONTAINING_RECORD是一个标准win32 macro

33.

34. pOverlapPlus = CONTAINING_RECORD(pOverlap, OVERLAPPEDPLUS, overlapped);

35.

36. // OP_ACCEPT也是我们自己定义的 value,只是一个标识:

37. // #define OP_ACCEPT 1

38.

39. switch (pOverlapPlus->OpCode)

40. {

41. case OP_ACCEPT:

42. printf("acceptedn");

43. //根据OVERLAPPEDPLUS的定义,一般应该有专门

44. //function来释放,不过这里就简单一下

45. free(pOverlapPlus);

46. break;

47. } 48. }

49. }

上面的OVERLAPPEDPLUS是一个很重要的自定义结构,可以把你要的东西全部放里面:D

view plaincopy to clipboardprint?

1. #pragma once

2.

3. #include

4. #include

5. #include

6. #include

7. #include

8. #include

9. #include

10. #include

11. #include

12. #include

13. #include

14. #include

15.

16.

17. #define DATA_BUFSIZE 4096

18.

19. typedef struct _OVERLAPPEDPLUS {

20. WSAOVERLAPPED overlapped;

21. SOCKET serverSock;

22. SOCKET clientSock;

23. int OpCode;

24. WSABUF wbuf;

25. DWORD Bytes;

26. DWORD Flags;

27. // other useful information

28. } OVERLAPPEDPLUS;

29.

30. #define OP_READ 0

31. #define OP_WRITE 1

32. #define OP_ACCEPT 2

好,最后就是开始提到的CTcpServer这个对socket的封装,这个类并不复杂,除开封装socket之外(无非就是socket(...),bind,listen),最重要的是检查FD_ACCEPT event,然后调用AcceptEx(如果你去看MSDN,会发现AcceptEx的示例并没有用AcceptEx,汗...),当然,你可以用类似:while(true) { AcceptEx(...);}, 但是这种busy loop显然是极其恶劣的。 先来看.h

view plaincopy to clipboardprint?

1. #pragma once

2. #include "common.h"

3. #include

4. using namespace std;

5.

6. class CTcpServer

7. {

8. WSADATA wsd;

9. SOCKET m_ListeningSocket;

10. ADDRINFOW* m_pAddrInfo;

11. HANDLE m_AcceptEvent;

12.

13. public:

14. CTcpServer(PCWSTR pIPAddress, PCWSTR port);

15.

16. ~CTcpServer();

17.

18. SOCKET& Socket()

19. {

20. return m_ListeningSocket;

21. }

22.

23. bool StartListening();

24.

25. int AddressFamily()

26. {

27. return m_pAddrInfo->ai_family;

28. }

29.

30. int SocketType()

31. {

32. return m_pAddrInfo->ai_socktype;

33. }

34.

35. int Protocol()

36. {

37. return m_pAddrInfo->ai_protocol;

38. }

39.

40. BOOL WaitForAcceptEvent(DWORD timeout);

41. 42. BOOL AcceptNewConnection();

43.

44. };

没什么稀奇的,如我所说,关键在于WaitForAcceptEvent()上,下面的代码是具体实现:

view plaincopy to clipboardprint?

1. #include "stdafx.h"

2.

3. #include "TcpServer.h"

4. #include "common.h"

5.

6. //初始化socket,稍微改改可以支持ipv6,先不管它

7. CTcpServer::CTcpServer(PCWSTR pAddress, PCWSTR port)

8. :m_ListeningSocket(INVALID_SOCKET), m_pAddrInfo(NULL), m_AcceptEvent(NULL)

9. {

10. int rc = WSAStartup(MAKEWORD(2, 2), &wsd);

11. if (rc != 0) {

12. wprintf(L"WSAStartup failedn");

13. throw new exception();

14. }

15.

16. // Initialize the hints to retrieve the server address for IPv4

17. ADDRINFOW *result = NULL,

18. *ptr = NULL,

19. hints = {0};

20.

21. _family = AF_INET;

22. _socktype = SOCK_STREAM;

23. _protocol = IPPROTO_TCP;

24.

25. rc =::GetAddrInfoW(pAddress, port, &hints, &m_pAddrInfo);

26. if (rc != 0) {

27. printf("getaddrinfo failed: %dn", rc );

28. throw new exception();

29. }

30.

31. m_ListeningSocket = WSASocket(AF_INET,

32. SOCK_STREAM,

33. IPPROTO_TCP, NULL, 0, WSA_FLAG_OVERLAPPED);

34.

35. } 36.

37. // dtor负责cleanup,没什么特别

38. CTcpServer::~CTcpServer()

39. {

40. if (m_ListeningSocket != INVALID_SOCKET)

41. {

42. closesocket(m_ListeningSocket);

43. }

44.

45. if (m_AcceptEvent != NULL && m_AcceptEvent != INVALID_HANDLE_VALUE)

46. {

47. ::CloseHandle(m_AcceptEvent);

48. }

49.

50. WSACleanup();

51. }

52.

53. // bind和listen,然后构造FD_ACCEPT event

54. bool CTcpServer::StartListening()

55. {

56. if (bind(m_ListeningSocket, m_pAddrInfo->ai_addr, m_pAddrInfo->ai_addrlen) == SOCKET_ERROR)

57. {

58. return false;

59. }

60.

61. if (listen(m_ListeningSocket, 1 ) == SOCKET_ERROR)

62. {

63. printf("failed listening");

64. return false;

65. }

66.

67. //这个很重要,先创建event,然后用WSAEventSelect表示我们只对

68. // FD_ACCEPT感兴趣

69. m_AcceptEvent = WSACreateEvent();

70. if(SOCKET_ERROR == WSAEventSelect(m_ListeningSocket, m_AcceptEvent, FD_ACCEPT))

71. {

72. printf("WSAEventSelect failed: %dn", WSAGetLastError());

73. return false;

74. }

75.

76. return true; 77. }

78.

79. // 等待FD_ACCEPT event

80. // 注意,跟普通event不一样,我们无需用ResetEvent()/SetEvent()

81. // 这些API,这个类似于Pulse

82. BOOL CTcpServer::WaitForAcceptEvent(DWORD timeout)

83. {

84. DWORD ret = WSAWaitForMultipleEvents(1, &m_AcceptEvent, FALSE, timeout, FALSE);

85. if (WSA_WAIT_TIMEOUT == ret || WSA_WAIT_FAILED == ret)

86. {

87. printf("wait for accept failedn");

88. return false;

89. }

90. // 收到event

91. printf("test accept eventn");

92.

93. WSANETWORKEVENTS events;

94.

95. // 检查该event是否在正确socket上

96. int nRet = WSAEnumNetworkEvents(m_ListeningSocket, m_AcceptEvent,

&events);

97.

98. if (nRet == SOCKET_ERROR)

99. {

100. printf("error when enumerate network eventsn");

101. return false;

102. }

103.

104. // 确认是FD_ACCEPT event

105. if (rkEvents & FD_ACCEPT)

106. {

107. printf("accept event foundn");

108. return true;

109. }

110.

111. return false;

112. }

113.

114. // 接受新连接请求

115. BOOL CTcpServer::AcceptNewConnection()

116. {

117. DWORD dwBytes;

118. 119. // OVERLAPPEDPLUS的初始化比较繁琐,所以专门用一个function

120. OVERLAPPEDPLUS* pOverlapPlus = ::CreateOverlappedPlus();

121.

122. pOverlapPlus->serverSock = m_ListeningSocket;

123. pOverlapPlus->OpCode = OP_ACCEPT;

124.

125. pOverlapPlus->clientSock = socket(m_pAddrInfo->ai_family, m_pAddrInfo->ai_socktype, m_pAddrInfo->ai_protocol);

126.

127. // 注意 AcceptEx是立即返回的,而且一般都是FALSE,如果要确认

128. // 是否出错,需要再调用WSAGetLastError()看是否是IO_PENDING,如

129. // 果是pending就没关系

130. // 另外, AcceptEx把client address放在接收buffer的最后,所以下

131. // 面会看到有一些"magic number"

132. return AcceptEx(m_ListeningSocket, pOverlapPlus->clientSock,

pOverlapPlus->, pOverlapPlus-> - ((sizeof(sockaddr_in) + 16) * 2), sizeof(sockaddr_in) + 16,sizeof(sockaddr_in) + 16, &dwBytes, &pOverlapPlus->overlapped);

133. }

134.

135. //创建OVERLAPPEDPLUS

136. // 其实还有个Free的function,也就是多free一下mbuf,就

137. //不写了:D

138. OVERLAPPEDPLUS* CreateOverlappedPlus()

139. {

140. OVERLAPPEDPLUS* pOverlapPlus = (OVERLAPPEDPLUS *)malloc(sizeof(OVERLAPPEDPLUS));

141. memset(pOverlapPlus, 0, sizeof(OVERLAPPEDPLUS));

142.

143. char* pBuffer = (char*)malloc(DATA_BUFSIZE);

144. pOverlapPlus-> = pBuffer;

145. pOverlapPlus-> = DATA_BUFSIZE;

146.

147. return pOverlapPlus;

148. }

差不多就酱紫。另外说明在thread中,收到的Accept是带了第一个TCP payload的,所以如果你在AcceptEx之后去WSARecvFrom,是收不到东西的(因为已经收到了, 注意AcceptEx用pOverlapPlus-> 接收数据)

偶认为本人这篇是世界上最清晰易懂的IO Completion 教程:D

另外还有个BindIoCompletionPort,似乎要方便点,没try过...反正我这个是标准实现,呵呵。

发布者:admin,转转请注明出处:http://www.yc00.com/news/1687430307a9428.html

相关推荐

发表回复

评论列表(0条)

  • 暂无评论

联系我们

400-800-8888

在线咨询: QQ交谈

邮件:admin@example.com

工作时间:周一至周五,9:30-18:30,节假日休息

关注微信