c11_ThreadAndPool(线程和线程池)

c11_ThreadAndPool(线程和线程池)

2023年6月22日发(作者:)

windows系统中,需要vs2012才支持。

1.线程的创建

C++11线程类std::thread,头文件include

首先,看一个最简单的例子:

[cpp] view plaincopy

1. void my_thread()

2. {

3. puts("hello, world");

4. }

5.

6. int main(int argc, char *argv[])

7. {

8. std::thread t(my_thread);

9. ();

10.

11. system("pause");

12. return 0;

13. }

实例化一个线程对象t,参数my_thread是一个函数,在线程创建完成后将被执行,

()等待子线程my_thread执行完之后,主线程才可以继续执行下去,此时主线程会

释放掉执行完后的子线程资源。

当然,如果不想等待子线程,可以在主线程里面执行()将子线程从主线程里分离,

子线程执行完成后会自己释放掉资源。分离后的线程,主线程将对它没有控制权了。

相对于以前使用过的beginthread传多个参数需要传入struct地址,

boost::thread传参需要bind,std::thread传参真的非常方便,而且可读性也很好。

下面例子在实例化线程对象的时候,在线程函数my_thread后面紧接着传入两个参数。

[cpp] view plaincopy

1. #include

2. #include

3. #include

4. #include

5.

6. void my_thread(int num, const std::string& str)

7. {

8. std::cout << "num:" << num << ",name:" << str << std::endl; 9. }

10.

11. int main(int argc, char *argv[])

12. {

13. int num = 1234;

14. std::string str = "tujiaw";

15. std::thread t(my_thread, num, str);

16. ();

17.

18. system("pause");

19. return 0;

20. }

2.互斥量

多个线程同时访问共享资源的时候需要需要用到互斥量,当一个线程锁住了互斥量后,其他线程必须等待这个互斥量解锁后才能访问它。thread提供了四种不同的互斥量:

独占式互斥量non-recursive (std::mutex)

递归式互斥量recursive (std::recursive_mutex)

允许超时的独占式互斥量non-recursive that allows timeouts on the lock

functions(std::timed_mutex)

允许超时的递归式互斥量recursive mutex that allows timeouts on the lock functions

(std::recursive_timed_mutex)

独占式互斥量

独占式互斥量加解锁是成对的,同一个线程内独占式互斥量在没有解锁的情况下,再次对它进行加锁这是不对的,会得到一个未定义行为。

如果你想thread1输出10次10,thread2输出10次20,如果你想看到一个正确的显示效果,下面程序是做不到的,因为在thread1输出的时候,

thread2也会执行,输出的结果看起来有点乱(std::cout不是线程安全的),所以我们需要在它们访问共享资源的时候使用互斥量加锁。打开代码里面的三行注释就可以得到正确的结果了。在线程1中std::mutex使用成员函数lock加锁unlock解锁,看起来工作的很好,但这样是不安全的,你得始终记住lock之后一定要unlock,但是如果在它们中间出现了异常或者线程直接退出了unlock就没有执行,因为这个互斥量是独占式的,所以在thread1没有解锁之前,其他使用这个互斥量加锁的线程会一直处于等待状态得不到执行。lock_guard模板类使用RAII手法封装互斥量,在实例化对象的时候帮你加锁,并且能保证在离开作用域的时候自动解锁,所以你应该用lock_guard来帮你加解锁。

[cpp] view plaincopy

1. #include

2. #include

3. #include

4. #include 5. #include

6.

7. int g_num = 0;

8. std::mutex g_mutex;

9.

10. void thread1()

11. {

12. //g_();

13. g_num = 10;

14. for (int i=0; i<10; i++){

15. std::cout << "thread1:" << g_num << std::endl;

16. }

17. //g_();

18. }

19.

20. void thread2()

21. {

22. //std::lock_guard lg(g_mutex);

23. g_num = 20;

24. for (int i=0; i<10; i++){

25. std::cout << "thread2:" << g_num << std::endl;

26. }

27. }

28.

29. int main(int argc, char *argv[])

30. {

31. std::thread t1(thread1);

32. std::thread t2(thread2);

33. ();

34. ();

35.

36. system("pause");

37. return 0;

38. }

递归式互斥量

与独占式互斥量不同的是,同一个线程内在互斥量没有解锁的情况下可以再次进行加锁,不过他们的加解锁次数需要一致,递归式互斥量我们平时可能用得比较少些。

允许超时的互斥量 如果线程1对共享资源的访问时间比较长,这时线程2可能等不了那么久,所以设置一个超时时间,在超时时间内如果线程1中的互斥量还没有解锁,线程2就不等了,继续往下执行。

lock_guard只是提供了对互斥量最基本的加解锁封装,而unique_lock提供了多种构造方法,使用起来更加灵活,对于允许超时的互斥量需要使用unnique_lock来包装。

[cpp] view plaincopy

1. std::timed_mutex g_timed_mutex;

2. void thread1()

3. {

4. std::unique_lock tl(g_timed_mutex);

5. ::Sleep(3000); // 睡眠3秒

6. puts("thread1");

7. }

8.

9. void thread2()

10. {

11. std::unique_lock tl(g_timed_mutex, std::chrono::milliseconds(1000)); // 超时时间1秒

12. puts("thread2");

13. }

14.

15. int main(int argc, char *argv[])

16. {

17. std::thread t1(thread1);

18. ::Sleep(100); // 让线程1先启动

19. std::thread t2(thread2);

20. ();

21. ();

22.

23. system("pause");

24. return 0;

25. }

注意死锁

有时,一个操作需要对一个以上的mutex加锁,这时请注意了,这样很可能造成死锁。

[cpp] view plaincopy

1. struct Widget

2. {

3. std::mutex mutex_;

4. std::string str_;

5. }; 6.

7. void foo(Widget& w1, Widget& w2)

8. {

9. std::unique_lock t1(_);

10. std::unique_lock t2(_);

11. // do something

12. }

13. Widget g_w1, g_w2;

当一个线程调用foo(g_w1, g_w2),另外一个线程调用foo(g_w2, g_w1)的时候,

线程1: 线程2:

_.lock ...

... _.lock

... ...

_.lock等待 ...

_lock等待

可能的执行顺序:

线程1中的w1上锁;

线程2中的w2上锁;

线程1中的w2上锁,此时由于w2已经在线程2中上过锁了,所以必须等待;

线程2中的w1上锁,此时由于w1已经在线程1中上过锁了,所以必须等待;

这样两个线程都等不到对方释放锁,都处于等待状态造成了死锁。

thread提供了一个std::lock函数可以对多个互斥量同时加锁,每个线程里面的

w1和w2会同时上锁,他们之间就没有间隙了,如上将foo函数改为如下形式就可以了:

[cpp] view plaincopy

1. void foo(Widget& w1, Widget& w2)

2. {

3. std::unique_lock t1(_, std::defer_lock);

4. std::unique_lock t2(_, std::defer_lock);

5. std::lock(t1, t2);

6. // do something

7. }

在实例化的时候先不要加锁,等到两个对象都创建完成之后再一起加锁。

在初始化的时候保护数据

如果你的数据仅仅只在初始化的时候进行保护,使用一个互斥量是不行的,在初始化完成后会导致没必要的同步,C++11提供了一些方法来解决这个问题。 3.线程间同步,条件变量

如果我们在线程间共享数据,经常会存在一个线程等待另外一个线程的情况,它们之间存在先后关系。

这个与互斥量不同,互斥量是保证多个线程的时候当前只有一个线程访问加锁的代码块,它们之间是不存在先后关系的。

如下例子:线程1需要等到线程2将flag设置为非0才进行打印

[cpp] view plaincopy

1. #include

2. #include

3. #include

4. #include

5. #include

6.

7. class Foo

8. {

9. public:

10. Foo()

11. : flag_(0)

12. , thread1_(std::bind(&Foo::threadFunc1, this))

13. , thread2_(std::bind(&Foo::threadFunc2, this))

14. {

15. }

16.

17. ~Foo()

18. {

19. thread1_.join();

20. thread2_.join();

21. }

22.

23. private:

24. void threadFunc1()

25. {

26. {

27. std::unique_lock ul(mutex_);

28. while (0 == flag_) {

29. cond_.wait(ul); 30. }

31. std::cout << flag_ << std::endl;

32. }

33. }

34.

35. void threadFunc2()

36. {

37. // 为了测试,等待3秒

38. std::this_thread::sleep_for((std::chrono::milliseconds(3000)));

39. std::unique_lock ul(mutex_);

40. flag_ = 100;

41. cond_.notify_one();

42. }

43.

44. int flag_;

45. std::mutex mutex_;

46. std::condition_variable cond_;

47. std::thread thread1_;

48. std::thread thread2_;

49. };

50.

51. int _tmain(int argc, _TCHAR* argv[])

52. {

53. Foo f;

54.

55. system("pause");

56. return 0;

57. }

从代码可以看出,虽然线程1明显比线程2快些(人为制造等待3秒),但是线程1还是会等待线程2将flag设置为非0后才进行打印的。

这里有几个地方需要注意:

1> Foo类成员变量定义的顺序,mutex和cond必须在thread的前面。

原因是:如果线程的定义在前面,线程初始化完成之后立马会执行线程函数,而线程函数里用到了mutex和cond,此时如果mutex和cond还没初始化完成,就会出现内存错误。

2>由于同时有两个线程需要操作flag变量,所以在读写的时候要加锁,std::unique_lock会保证构造的时候加锁,离开作用域调用析构的时候解锁,所以不用担心加解锁不匹配。 3>threadFunc1中的while (0 == flag_),必须这样写不能写成if (0 == flag_),因为在多核CPU下会存在虚假唤醒( spurious wakes)的情况。

4>cond_.wait(ul);条件变量在wait的时候会释放锁的,所以其他线程可以继续执行。

4.线程池

[cpp] view plaincopy

1. #include

2. #include

3. #include

4. #include

5. #include

6. #include

7. #include

8. #include

9. #include

10. #include

11. #include

12. #include

13. #include

14. #include

15.

16. class ThreadPool

17. {

18. public:

19. typedef std::function Task;

20.

21. ThreadPool(int num)

22. : num_(num)

23. , maxQueueSize_(0)

24. , running_(false)

25. {

26. }

27.

28. ~ThreadPool()

29. {

30. if (running_) {

31. stop();

32. }

33. } 34.

35. ThreadPool(const ThreadPool&) = delete;

36. void operator=(const ThreadPool&) = delete;

37.

38. void setMaxQueueSize(int maxSize)

39. {

40. maxQueueSize_ = maxSize;

41. }

42.

43. void start()

44. {

45. assert(threads_.empty());

46. running_ = true;

47. threads_.reserve(num_);

48. for (int i = 0; i

49. threads_.push_back(std::thread(std::bind(&ThreadPool::threadFunc, this)));

50. }

51. }

52.

53. void stop()

54. {

55. {

56. std::unique_lock ul(mutex_);

57. running_ = false;

58. notEmpty_.notify_all();

59. }

60.

61. for (auto &iter : threads_) {

62. ();

63. }

64. }

65.

66. void run(const Task &t)

67. {

68. if (threads_.empty()) {

69. t();

70. }

71. else {

72. std::unique_lock ul(mutex_);

73. while (isFull()) {

74. notFull_.wait(ul);

75. }

76. assert(!isFull()); 77. queue_.push_back(t);

78. notEmpty_.notify_one();

79. }

80. }

81.

82. private:

83. bool isFull() const

84. {

85. return maxQueueSize_ > 0 && queue_.size() >= maxQueueSize_;

86. }

87.

88. void threadFunc()

89. {

90. printf("create id:%dn", ::GetCurrentThreadId());

91. while (running_) {

92. Task task(take());

93. if (task) {

94. task();

95. }

96. }

97. printf("thread id:%dn", ::GetCurrentThreadId());

98. }

99.

100. Task take()

101. {

102. std::unique_lock ul(mutex_);

103. while (queue_.empty() && running_) {

104. notEmpty_.wait(ul);

105. }

106. Task task;

107. if (!queue_.empty()) {

108. task = queue_.front();

109. queue_.pop_front();

110. if (maxQueueSize_ > 0) {

111. notFull_.notify_one();

112. }

113. }

114. return task;

115. }

116.

117. private:

118. int num_;

119. std::mutex mutex_;

120. std::condition_variable notEmpty_; 121. std::condition_variable notFull_;

122. std::vector threads_;

123. std::deque queue_;

124. size_t maxQueueSize_;

125. bool running_;

126. };

127.

128. void fun()

129. {

130. printf("[id:%d] hello, world!n", ::GetCurrentThreadId());

131. }

132.

133. int _tmain(int argc, _TCHAR* argv[])

134. {

135. {

136. printf("main thread id:%dn", ::GetCurrentThreadId());

137. ThreadPool pool(3);

138. QueueSize(100);

139. ();

140. //std::this_thread::sleep_for(std::chrono::milliseconds(3000));

141.

142. for (int i = 0; i < 1000; i++) {

143. (fun);

144. }

145. std::this_thread::sleep_for(std::chrono::milliseconds(3000));

146. }

147.

148. system("pause");

149. return 0;

150. }

发布者:admin,转转请注明出处:http://www.yc00.com/news/1687425006a9099.html

相关推荐

发表回复

评论列表(0条)

  • 暂无评论

联系我们

400-800-8888

在线咨询: QQ交谈

邮件:admin@example.com

工作时间:周一至周五,9:30-18:30,节假日休息

关注微信