//
// detail/impl/scheduler.ipp
// ~~~~~~~~~~~~~~~~~~~~~~~~~
//
// Copyright (c) 2003-2018 Christopher M. Kohlhoff (chris at kohlhoff dot com)
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//

#ifndef BOOST_ASIO_DETAIL_IMPL_SCHEDULER_IPP
#define BOOST_ASIO_DETAIL_IMPL_SCHEDULER_IPP

#if defined(_MSC_VER) && (_MSC_VER >= 1200)
# pragma once
#endif // defined(_MSC_VER) && (_MSC_VER >= 1200)

#include <boost/asio/detail/config.hpp>

#include <boost/asio/detail/concurrency_hint.hpp>
#include <boost/asio/detail/event.hpp>
#include <boost/asio/detail/limits.hpp>
#include <boost/asio/detail/reactor.hpp>
#include <boost/asio/detail/scheduler.hpp>
#include <boost/asio/detail/scheduler_thread_info.hpp>

#include <boost/asio/detail/push_options.hpp>

namespace boost {
namespace asio {
namespace detail {
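
// Helper class: on block exit, folds the thread's privately counted work into
// the scheduler's global work count, moves any privately queued operations on
// to the main queue, and reinserts the reactor task at the end of that queue.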
struct scheduler::task_cleanup
{
  ~task_cleanup()
  {
    if (this_thread_->private_outstanding_work > 0)
    {
      boost::asio::detail::increment(
          scheduler_->outstanding_work_,
          this_thread_->private_outstanding_work);
    }
    this_thread_->private_outstanding_work = 0;

    // Enqueue the completed operations and reinsert the task at the end of
    // the operation queue.
    lock_->lock();
    scheduler_->task_interrupted_ = true;
    scheduler_->op_queue_.push(this_thread_->private_op_queue);
    scheduler_->op_queue_.push(&scheduler_->task_operation_);
  }

  scheduler* scheduler_;
  mutex::scoped_lock* lock_;
  thread_info* this_thread_;
};
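
// Helper class: on block exit, accounts for the unit of work consumed by the
// handler that just ran. Surplus private work is folded into the global count;
// a count of zero means the handler was the last outstanding work, in which
// case work_finished() is called to stop the scheduler.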
struct scheduler::work_cleanup
{
  ~work_cleanup()
  {
    if (this_thread_->private_outstanding_work > 1)
    {
      boost::asio::detail::increment(
          scheduler_->outstanding_work_,
          this_thread_->private_outstanding_work - 1);
    }
    else if (this_thread_->private_outstanding_work < 1)
    {
      scheduler_->work_finished();
    }
    this_thread_->private_outstanding_work = 0;

#if defined(BOOST_ASIO_HAS_THREADS)
    if (!this_thread_->private_op_queue.empty())
    {
      lock_->lock();
      scheduler_->op_queue_.push(this_thread_->private_op_queue);
    }
#endif // defined(BOOST_ASIO_HAS_THREADS)
  }

  scheduler* scheduler_;
  mutex::scoped_lock* lock_;
  thread_info* this_thread_;
};
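
// The concurrency hint determines whether the scheduler may assume it is run
// from a single thread (one_thread_): if so, thread wakeups can be elided,
// operations kept on thread-private queues, and the mutex constructed in
// non-locking mode.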
scheduler::scheduler(
    boost::asio::execution_context& ctx, int concurrency_hint)
  : boost::asio::detail::execution_context_service_base<scheduler>(ctx),
    one_thread_(concurrency_hint == 1
        || !BOOST_ASIO_CONCURRENCY_HINT_IS_LOCKING(
          SCHEDULER, concurrency_hint)
        || !BOOST_ASIO_CONCURRENCY_HINT_IS_LOCKING(
          REACTOR_IO, concurrency_hint)),
    mutex_(BOOST_ASIO_CONCURRENCY_HINT_IS_LOCKING(
          SCHEDULER, concurrency_hint)),
    task_(0),
    task_interrupted_(true),
    outstanding_work_(0),
    stopped_(false),
    shutdown_(false),
    concurrency_hint_(concurrency_hint)
{
  BOOST_ASIO_HANDLER_TRACKING_INIT;
}
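
// Destroy all unfinished operations without invoking their handlers. Called
// when the owning execution_context shuts down its services.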
void scheduler::shutdown()
{
  mutex::scoped_lock lock(mutex_);
  shutdown_ = true;
  lock.unlock();

  // Destroy handler objects.
  while (!op_queue_.empty())
  {
    operation* o = op_queue_.front();
    op_queue_.pop();
    if (o != &task_operation_)
      o->destroy();
  }

  // Reset to initial state.
  task_ = 0;
}
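
// Lazily obtain the reactor and enqueue its sentinel operation so that one of
// the scheduler's threads will start running the reactor task.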
void scheduler::init_task()
{
  mutex::scoped_lock lock(mutex_);
  if (!shutdown_ && !task_)
  {
    task_ = &use_service<reactor>(this->context());
    op_queue_.push(&task_operation_);
    wake_one_thread_and_unlock(lock);
  }
}
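
// Run the event loop on the calling thread until the scheduler is stopped or
// runs out of work. Returns the number of handlers executed, saturating at
// the maximum value representable by std::size_t.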
std::size_t scheduler::run(boost::system::error_code& ec)
{
  ec = boost::system::error_code();
  if (outstanding_work_ == 0)
  {
    stop();
    return 0;
  }

  thread_info this_thread;
  this_thread.private_outstanding_work = 0;
  thread_call_stack::context ctx(this, this_thread);

  mutex::scoped_lock lock(mutex_);

  std::size_t n = 0;
  for (; do_run_one(lock, this_thread, ec); lock.lock())
    if (n != (std::numeric_limits<std::size_t>::max)())
      ++n;
  return n;
}
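
// Run the event loop until exactly one handler has been executed, or until
// the scheduler is stopped or runs out of work.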
std::size_t scheduler::run_one(boost::system::error_code& ec)
{
  ec = boost::system::error_code();
  if (outstanding_work_ == 0)
  {
    stop();
    return 0;
  }

  thread_info this_thread;
  this_thread.private_outstanding_work = 0;
  thread_call_stack::context ctx(this, this_thread);

  mutex::scoped_lock lock(mutex_);

  return do_run_one(lock, this_thread, ec);
}
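
// As run_one(), but waits no longer than the specified number of microseconds
// for a handler to become ready.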
std::size_t scheduler::wait_one(long usec, boost::system::error_code& ec)
{
  ec = boost::system::error_code();
  if (outstanding_work_ == 0)
  {
    stop();
    return 0;
  }

  thread_info this_thread;
  this_thread.private_outstanding_work = 0;
  thread_call_stack::context ctx(this, this_thread);

  mutex::scoped_lock lock(mutex_);

  return do_wait_one(lock, this_thread, usec, ec);
}
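
// Run all handlers that are ready to execute, without blocking, until the
// queue is drained or the scheduler is stopped.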
std::size_t scheduler::poll(boost::system::error_code& ec)
{
  ec = boost::system::error_code();
  if (outstanding_work_ == 0)
  {
    stop();
    return 0;
  }

  thread_info this_thread;
  this_thread.private_outstanding_work = 0;
  thread_call_stack::context ctx(this, this_thread);

  mutex::scoped_lock lock(mutex_);

#if defined(BOOST_ASIO_HAS_THREADS)
  // We want to support nested calls to poll() and poll_one(), so any handlers
  // that are already on a thread-private queue need to be put on to the main
  // queue now.
  if (one_thread_)
    if (thread_info* outer_info = static_cast<thread_info*>(ctx.next_by_key()))
      op_queue_.push(outer_info->private_op_queue);
#endif // defined(BOOST_ASIO_HAS_THREADS)

  std::size_t n = 0;
  for (; do_poll_one(lock, this_thread, ec); lock.lock())
    if (n != (std::numeric_limits<std::size_t>::max)())
      ++n;
  return n;
}
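
// As poll(), but executes at most one ready handler.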
std::size_t scheduler::poll_one(boost::system::error_code& ec)
{
  ec = boost::system::error_code();
  if (outstanding_work_ == 0)
  {
    stop();
    return 0;
  }

  thread_info this_thread;
  this_thread.private_outstanding_work = 0;
  thread_call_stack::context ctx(this, this_thread);

  mutex::scoped_lock lock(mutex_);

#if defined(BOOST_ASIO_HAS_THREADS)
  // We want to support nested calls to poll() and poll_one(), so any handlers
  // that are already on a thread-private queue need to be put on to the main
  // queue now.
  if (one_thread_)
    if (thread_info* outer_info = static_cast<thread_info*>(ctx.next_by_key()))
      op_queue_.push(outer_info->private_op_queue);
#endif // defined(BOOST_ASIO_HAS_THREADS)

  return do_poll_one(lock, this_thread, ec);
}
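
// Stop the scheduler, causing any threads blocked in run() and related calls
// to return as soon as possible.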
void scheduler::stop()
{
  mutex::scoped_lock lock(mutex_);
  stop_all_threads(lock);
}

bool scheduler::stopped() const
{
  mutex::scoped_lock lock(mutex_);
  return stopped_;
}

void scheduler::restart()
{
  mutex::scoped_lock lock(mutex_);
  stopped_ = false;
}
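
// Record an extra unit of work on the calling scheduler thread's private
// count, compensating for a handler that is about to consume one.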
void scheduler::compensating_work_started()
{
  thread_info_base* this_thread = thread_call_stack::contains(this);
  ++static_cast<thread_info*>(this_thread)->private_outstanding_work;
}
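
// Post an operation for invocation, counting it as new work. When running
// single-threaded, or when posting a continuation from a scheduler thread,
// the operation goes on the thread-private queue and no locking is needed.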
void scheduler::post_immediate_completion(
    scheduler::operation* op, bool is_continuation)
{
#if defined(BOOST_ASIO_HAS_THREADS)
  if (one_thread_ || is_continuation)
  {
    if (thread_info_base* this_thread = thread_call_stack::contains(this))
    {
      ++static_cast<thread_info*>(this_thread)->private_outstanding_work;
      static_cast<thread_info*>(this_thread)->private_op_queue.push(op);
      return;
    }
  }
#else // defined(BOOST_ASIO_HAS_THREADS)
  (void)is_continuation;
#endif // defined(BOOST_ASIO_HAS_THREADS)

  work_started();
  mutex::scoped_lock lock(mutex_);
  op_queue_.push(op);
  wake_one_thread_and_unlock(lock);
}
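
// Post an operation whose associated work was already counted when the
// asynchronous operation started.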
void scheduler::post_deferred_completion(scheduler::operation* op)
{
#if defined(BOOST_ASIO_HAS_THREADS)
  if (one_thread_)
  {
    if (thread_info_base* this_thread = thread_call_stack::contains(this))
    {
      static_cast<thread_info*>(this_thread)->private_op_queue.push(op);
      return;
    }
  }
#endif // defined(BOOST_ASIO_HAS_THREADS)

  mutex::scoped_lock lock(mutex_);
  op_queue_.push(op);
  wake_one_thread_and_unlock(lock);
}
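
// As post_deferred_completion(), but for a whole queue of operations.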
void scheduler::post_deferred_completions(
    op_queue<scheduler::operation>& ops)
{
  if (!ops.empty())
  {
#if defined(BOOST_ASIO_HAS_THREADS)
    if (one_thread_)
    {
      if (thread_info_base* this_thread = thread_call_stack::contains(this))
      {
        static_cast<thread_info*>(this_thread)->private_op_queue.push(ops);
        return;
      }
    }
#endif // defined(BOOST_ASIO_HAS_THREADS)

    mutex::scoped_lock lock(mutex_);
    op_queue_.push(ops);
    wake_one_thread_and_unlock(lock);
  }
}
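
// Count one unit of work and post the operation to the main queue, waking a
// thread to run it. Used when an operation cannot be executed immediately on
// the calling thread.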
void scheduler::do_dispatch(
    scheduler::operation* op)
{
  work_started();
  mutex::scoped_lock lock(mutex_);
  op_queue_.push(op);
  wake_one_thread_and_unlock(lock);
}
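
// Take ownership of operations that can no longer be invoked; they are
// destroyed, without being run, when the local queue goes out of scope.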
void scheduler::abandon_operations(
    op_queue<scheduler::operation>& ops)
{
  op_queue<scheduler::operation> ops2;
  ops2.push(ops);
}
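
// The core of run() and run_one(): dequeue and execute a single operation.
// When the task sentinel reaches the front of the queue the reactor is run
// in-line, and when the queue is empty the thread blocks on the wakeup event.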
std::size_t scheduler::do_run_one(mutex::scoped_lock& lock,
    scheduler::thread_info& this_thread,
    const boost::system::error_code& ec)
{
  while (!stopped_)
  {
    if (!op_queue_.empty())
    {
      // Prepare to execute first handler from queue.
      operation* o = op_queue_.front();
      op_queue_.pop();
      bool more_handlers = (!op_queue_.empty());

      if (o == &task_operation_)
      {
        task_interrupted_ = more_handlers;

        if (more_handlers && !one_thread_)
          wakeup_event_.unlock_and_signal_one(lock);
        else
          lock.unlock();

        task_cleanup on_exit = { this, &lock, &this_thread };
        (void)on_exit;

        // Run the task. May throw an exception. Only block if the operation
        // queue is empty and we're not polling, otherwise we want to return
        // as soon as possible.
        task_->run(more_handlers ? 0 : -1, this_thread.private_op_queue);
      }
      else
      {
        std::size_t task_result = o->task_result_;

        if (more_handlers && !one_thread_)
          wake_one_thread_and_unlock(lock);
        else
          lock.unlock();

        // Ensure the count of outstanding work is decremented on block exit.
        work_cleanup on_exit = { this, &lock, &this_thread };
        (void)on_exit;

        // Complete the operation. May throw an exception. Deletes the object.
        o->complete(this, ec, task_result);

        return 1;
      }
    }
    else
    {
      wakeup_event_.clear(lock);
      wakeup_event_.wait(lock);
    }
  }

  return 0;
}
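
// As do_run_one(), but gives up after waiting at most usec microseconds for
// an operation to become available.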
std::size_t scheduler::do_wait_one(mutex::scoped_lock& lock,
    scheduler::thread_info& this_thread, long usec,
    const boost::system::error_code& ec)
{
  if (stopped_)
    return 0;

  operation* o = op_queue_.front();
  if (o == 0)
  {
    wakeup_event_.clear(lock);
    wakeup_event_.wait_for_usec(lock, usec);
    usec = 0; // Wait at most once.
    o = op_queue_.front();
  }

  if (o == &task_operation_)
  {
    op_queue_.pop();
    bool more_handlers = (!op_queue_.empty());

    task_interrupted_ = more_handlers;

    if (more_handlers && !one_thread_)
      wakeup_event_.unlock_and_signal_one(lock);
    else
      lock.unlock();

    {
      task_cleanup on_exit = { this, &lock, &this_thread };
      (void)on_exit;

      // Run the task. May throw an exception. Only block if the operation
      // queue is empty and we're not polling, otherwise we want to return
      // as soon as possible.
      task_->run(more_handlers ? 0 : usec, this_thread.private_op_queue);
    }

    o = op_queue_.front();
    if (o == &task_operation_)
    {
      if (!one_thread_)
        wakeup_event_.maybe_unlock_and_signal_one(lock);
      return 0;
    }
  }

  if (o == 0)
    return 0;

  op_queue_.pop();
  bool more_handlers = (!op_queue_.empty());

  std::size_t task_result = o->task_result_;

  if (more_handlers && !one_thread_)
    wake_one_thread_and_unlock(lock);
  else
    lock.unlock();

  // Ensure the count of outstanding work is decremented on block exit.
  work_cleanup on_exit = { this, &lock, &this_thread };
  (void)on_exit;

  // Complete the operation. May throw an exception. Deletes the object.
  o->complete(this, ec, task_result);

  return 1;
}
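
// As do_run_one(), but never blocks: the reactor is run with a zero timeout,
// and an empty queue causes an immediate return.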
std::size_t scheduler::do_poll_one(mutex::scoped_lock& lock,
    scheduler::thread_info& this_thread,
    const boost::system::error_code& ec)
{
  if (stopped_)
    return 0;

  operation* o = op_queue_.front();
  if (o == &task_operation_)
  {
    op_queue_.pop();
    lock.unlock();

    {
      task_cleanup c = { this, &lock, &this_thread };
      (void)c;

      // Run the task. May throw an exception. Only block if the operation
      // queue is empty and we're not polling, otherwise we want to return
      // as soon as possible.
      task_->run(0, this_thread.private_op_queue);
    }

    o = op_queue_.front();
    if (o == &task_operation_)
    {
      wakeup_event_.maybe_unlock_and_signal_one(lock);
      return 0;
    }
  }

  if (o == 0)
    return 0;

  op_queue_.pop();
  bool more_handlers = (!op_queue_.empty());

  std::size_t task_result = o->task_result_;

  if (more_handlers && !one_thread_)
    wake_one_thread_and_unlock(lock);
  else
    lock.unlock();

  // Ensure the count of outstanding work is decremented on block exit.
  work_cleanup on_exit = { this, &lock, &this_thread };
  (void)on_exit;

  // Complete the operation. May throw an exception. Deletes the object.
  o->complete(this, ec, task_result);

  return 1;
}
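
// Mark the scheduler as stopped, wake all threads waiting on the event, and
// interrupt the reactor task if it is currently running.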
void scheduler::stop_all_threads(
    mutex::scoped_lock& lock)
{
  stopped_ = true;
  wakeup_event_.signal_all(lock);

  if (!task_interrupted_ && task_)
  {
    task_interrupted_ = true;
    task_->interrupt();
  }
}
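
// Wake a single idle thread if one is waiting on the event; otherwise
// interrupt the reactor task so its thread can pick up the new work. The
// lock is released in either case.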
void scheduler::wake_one_thread_and_unlock(
    mutex::scoped_lock& lock)
{
  if (!wakeup_event_.maybe_unlock_and_signal_one(lock))
  {
    if (!task_interrupted_ && task_)
    {
      task_interrupted_ = true;
      task_->interrupt();
    }
    lock.unlock();
  }
}

} // namespace detail
} // namespace asio
} // namespace boost

#include <boost/asio/detail/pop_options.hpp>

#endif // BOOST_ASIO_DETAIL_IMPL_SCHEDULER_IPP