win_iocp_io_context.ipp 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556
  1. //
  2. // detail/impl/win_iocp_io_context.ipp
  3. // ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
  4. //
  5. // Copyright (c) 2003-2018 Christopher M. Kohlhoff (chris at kohlhoff dot com)
  6. //
  7. // Distributed under the Boost Software License, Version 1.0. (See accompanying
  8. // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
  9. //
  10. #ifndef BOOST_ASIO_DETAIL_IMPL_WIN_IOCP_IO_CONTEXT_IPP
  11. #define BOOST_ASIO_DETAIL_IMPL_WIN_IOCP_IO_CONTEXT_IPP
  12. #if defined(_MSC_VER) && (_MSC_VER >= 1200)
  13. # pragma once
  14. #endif // defined(_MSC_VER) && (_MSC_VER >= 1200)
  15. #include <boost/asio/detail/config.hpp>
  16. #if defined(BOOST_ASIO_HAS_IOCP)
  17. #include <boost/asio/error.hpp>
  18. #include <boost/asio/detail/cstdint.hpp>
  19. #include <boost/asio/detail/handler_alloc_helpers.hpp>
  20. #include <boost/asio/detail/handler_invoke_helpers.hpp>
  21. #include <boost/asio/detail/limits.hpp>
  22. #include <boost/asio/detail/throw_error.hpp>
  23. #include <boost/asio/detail/win_iocp_io_context.hpp>
  24. #include <boost/asio/detail/push_options.hpp>
  25. namespace boost {
  26. namespace asio {
  27. namespace detail {
  28. struct win_iocp_io_context::work_finished_on_block_exit
  29. {
  30. ~work_finished_on_block_exit()
  31. {
  32. io_context_->work_finished();
  33. }
  34. win_iocp_io_context* io_context_;
  35. };
  36. struct win_iocp_io_context::timer_thread_function
  37. {
  38. void operator()()
  39. {
  40. while (::InterlockedExchangeAdd(&io_context_->shutdown_, 0) == 0)
  41. {
  42. if (::WaitForSingleObject(io_context_->waitable_timer_.handle,
  43. INFINITE) == WAIT_OBJECT_0)
  44. {
  45. ::InterlockedExchange(&io_context_->dispatch_required_, 1);
  46. ::PostQueuedCompletionStatus(io_context_->iocp_.handle,
  47. 0, wake_for_dispatch, 0);
  48. }
  49. }
  50. }
  51. win_iocp_io_context* io_context_;
  52. };
  53. win_iocp_io_context::win_iocp_io_context(
  54. boost::asio::execution_context& ctx, int concurrency_hint)
  55. : execution_context_service_base<win_iocp_io_context>(ctx),
  56. iocp_(),
  57. outstanding_work_(0),
  58. stopped_(0),
  59. stop_event_posted_(0),
  60. shutdown_(0),
  61. gqcs_timeout_(get_gqcs_timeout()),
  62. dispatch_required_(0),
  63. concurrency_hint_(concurrency_hint)
  64. {
  65. BOOST_ASIO_HANDLER_TRACKING_INIT;
  66. iocp_.handle = ::CreateIoCompletionPort(INVALID_HANDLE_VALUE, 0, 0,
  67. static_cast<DWORD>(concurrency_hint >= 0 ? concurrency_hint : DWORD(~0)));
  68. if (!iocp_.handle)
  69. {
  70. DWORD last_error = ::GetLastError();
  71. boost::system::error_code ec(last_error,
  72. boost::asio::error::get_system_category());
  73. boost::asio::detail::throw_error(ec, "iocp");
  74. }
  75. }
  76. void win_iocp_io_context::shutdown()
  77. {
  78. ::InterlockedExchange(&shutdown_, 1);
  79. if (timer_thread_.get())
  80. {
  81. LARGE_INTEGER timeout;
  82. timeout.QuadPart = 1;
  83. ::SetWaitableTimer(waitable_timer_.handle, &timeout, 1, 0, 0, FALSE);
  84. }
  85. while (::InterlockedExchangeAdd(&outstanding_work_, 0) > 0)
  86. {
  87. op_queue<win_iocp_operation> ops;
  88. timer_queues_.get_all_timers(ops);
  89. ops.push(completed_ops_);
  90. if (!ops.empty())
  91. {
  92. while (win_iocp_operation* op = ops.front())
  93. {
  94. ops.pop();
  95. ::InterlockedDecrement(&outstanding_work_);
  96. op->destroy();
  97. }
  98. }
  99. else
  100. {
  101. DWORD bytes_transferred = 0;
  102. dword_ptr_t completion_key = 0;
  103. LPOVERLAPPED overlapped = 0;
  104. ::GetQueuedCompletionStatus(iocp_.handle, &bytes_transferred,
  105. &completion_key, &overlapped, gqcs_timeout_);
  106. if (overlapped)
  107. {
  108. ::InterlockedDecrement(&outstanding_work_);
  109. static_cast<win_iocp_operation*>(overlapped)->destroy();
  110. }
  111. }
  112. }
  113. if (timer_thread_.get())
  114. timer_thread_->join();
  115. }
  116. boost::system::error_code win_iocp_io_context::register_handle(
  117. HANDLE handle, boost::system::error_code& ec)
  118. {
  119. if (::CreateIoCompletionPort(handle, iocp_.handle, 0, 0) == 0)
  120. {
  121. DWORD last_error = ::GetLastError();
  122. ec = boost::system::error_code(last_error,
  123. boost::asio::error::get_system_category());
  124. }
  125. else
  126. {
  127. ec = boost::system::error_code();
  128. }
  129. return ec;
  130. }
  131. size_t win_iocp_io_context::run(boost::system::error_code& ec)
  132. {
  133. if (::InterlockedExchangeAdd(&outstanding_work_, 0) == 0)
  134. {
  135. stop();
  136. ec = boost::system::error_code();
  137. return 0;
  138. }
  139. win_iocp_thread_info this_thread;
  140. thread_call_stack::context ctx(this, this_thread);
  141. size_t n = 0;
  142. while (do_one(INFINITE, ec))
  143. if (n != (std::numeric_limits<size_t>::max)())
  144. ++n;
  145. return n;
  146. }
  147. size_t win_iocp_io_context::run_one(boost::system::error_code& ec)
  148. {
  149. if (::InterlockedExchangeAdd(&outstanding_work_, 0) == 0)
  150. {
  151. stop();
  152. ec = boost::system::error_code();
  153. return 0;
  154. }
  155. win_iocp_thread_info this_thread;
  156. thread_call_stack::context ctx(this, this_thread);
  157. return do_one(INFINITE, ec);
  158. }
  159. size_t win_iocp_io_context::wait_one(long usec, boost::system::error_code& ec)
  160. {
  161. if (::InterlockedExchangeAdd(&outstanding_work_, 0) == 0)
  162. {
  163. stop();
  164. ec = boost::system::error_code();
  165. return 0;
  166. }
  167. win_iocp_thread_info this_thread;
  168. thread_call_stack::context ctx(this, this_thread);
  169. return do_one(usec < 0 ? INFINITE : ((usec - 1) / 1000 + 1), ec);
  170. }
  171. size_t win_iocp_io_context::poll(boost::system::error_code& ec)
  172. {
  173. if (::InterlockedExchangeAdd(&outstanding_work_, 0) == 0)
  174. {
  175. stop();
  176. ec = boost::system::error_code();
  177. return 0;
  178. }
  179. win_iocp_thread_info this_thread;
  180. thread_call_stack::context ctx(this, this_thread);
  181. size_t n = 0;
  182. while (do_one(0, ec))
  183. if (n != (std::numeric_limits<size_t>::max)())
  184. ++n;
  185. return n;
  186. }
  187. size_t win_iocp_io_context::poll_one(boost::system::error_code& ec)
  188. {
  189. if (::InterlockedExchangeAdd(&outstanding_work_, 0) == 0)
  190. {
  191. stop();
  192. ec = boost::system::error_code();
  193. return 0;
  194. }
  195. win_iocp_thread_info this_thread;
  196. thread_call_stack::context ctx(this, this_thread);
  197. return do_one(0, ec);
  198. }
  199. void win_iocp_io_context::stop()
  200. {
  201. if (::InterlockedExchange(&stopped_, 1) == 0)
  202. {
  203. if (::InterlockedExchange(&stop_event_posted_, 1) == 0)
  204. {
  205. if (!::PostQueuedCompletionStatus(iocp_.handle, 0, 0, 0))
  206. {
  207. DWORD last_error = ::GetLastError();
  208. boost::system::error_code ec(last_error,
  209. boost::asio::error::get_system_category());
  210. boost::asio::detail::throw_error(ec, "pqcs");
  211. }
  212. }
  213. }
  214. }
  215. void win_iocp_io_context::post_deferred_completion(win_iocp_operation* op)
  216. {
  217. // Flag the operation as ready.
  218. op->ready_ = 1;
  219. // Enqueue the operation on the I/O completion port.
  220. if (!::PostQueuedCompletionStatus(iocp_.handle, 0, 0, op))
  221. {
  222. // Out of resources. Put on completed queue instead.
  223. mutex::scoped_lock lock(dispatch_mutex_);
  224. completed_ops_.push(op);
  225. ::InterlockedExchange(&dispatch_required_, 1);
  226. }
  227. }
  228. void win_iocp_io_context::post_deferred_completions(
  229. op_queue<win_iocp_operation>& ops)
  230. {
  231. while (win_iocp_operation* op = ops.front())
  232. {
  233. ops.pop();
  234. // Flag the operation as ready.
  235. op->ready_ = 1;
  236. // Enqueue the operation on the I/O completion port.
  237. if (!::PostQueuedCompletionStatus(iocp_.handle, 0, 0, op))
  238. {
  239. // Out of resources. Put on completed queue instead.
  240. mutex::scoped_lock lock(dispatch_mutex_);
  241. completed_ops_.push(op);
  242. completed_ops_.push(ops);
  243. ::InterlockedExchange(&dispatch_required_, 1);
  244. }
  245. }
  246. }
  247. void win_iocp_io_context::abandon_operations(
  248. op_queue<win_iocp_operation>& ops)
  249. {
  250. while (win_iocp_operation* op = ops.front())
  251. {
  252. ops.pop();
  253. ::InterlockedDecrement(&outstanding_work_);
  254. op->destroy();
  255. }
  256. }
  257. void win_iocp_io_context::on_pending(win_iocp_operation* op)
  258. {
  259. if (::InterlockedCompareExchange(&op->ready_, 1, 0) == 1)
  260. {
  261. // Enqueue the operation on the I/O completion port.
  262. if (!::PostQueuedCompletionStatus(iocp_.handle,
  263. 0, overlapped_contains_result, op))
  264. {
  265. // Out of resources. Put on completed queue instead.
  266. mutex::scoped_lock lock(dispatch_mutex_);
  267. completed_ops_.push(op);
  268. ::InterlockedExchange(&dispatch_required_, 1);
  269. }
  270. }
  271. }
  272. void win_iocp_io_context::on_completion(win_iocp_operation* op,
  273. DWORD last_error, DWORD bytes_transferred)
  274. {
  275. // Flag that the operation is ready for invocation.
  276. op->ready_ = 1;
  277. // Store results in the OVERLAPPED structure.
  278. op->Internal = reinterpret_cast<ulong_ptr_t>(
  279. &boost::asio::error::get_system_category());
  280. op->Offset = last_error;
  281. op->OffsetHigh = bytes_transferred;
  282. // Enqueue the operation on the I/O completion port.
  283. if (!::PostQueuedCompletionStatus(iocp_.handle,
  284. 0, overlapped_contains_result, op))
  285. {
  286. // Out of resources. Put on completed queue instead.
  287. mutex::scoped_lock lock(dispatch_mutex_);
  288. completed_ops_.push(op);
  289. ::InterlockedExchange(&dispatch_required_, 1);
  290. }
  291. }
  292. void win_iocp_io_context::on_completion(win_iocp_operation* op,
  293. const boost::system::error_code& ec, DWORD bytes_transferred)
  294. {
  295. // Flag that the operation is ready for invocation.
  296. op->ready_ = 1;
  297. // Store results in the OVERLAPPED structure.
  298. op->Internal = reinterpret_cast<ulong_ptr_t>(&ec.category());
  299. op->Offset = ec.value();
  300. op->OffsetHigh = bytes_transferred;
  301. // Enqueue the operation on the I/O completion port.
  302. if (!::PostQueuedCompletionStatus(iocp_.handle,
  303. 0, overlapped_contains_result, op))
  304. {
  305. // Out of resources. Put on completed queue instead.
  306. mutex::scoped_lock lock(dispatch_mutex_);
  307. completed_ops_.push(op);
  308. ::InterlockedExchange(&dispatch_required_, 1);
  309. }
  310. }
  311. size_t win_iocp_io_context::do_one(DWORD msec, boost::system::error_code& ec)
  312. {
  313. for (;;)
  314. {
  315. // Try to acquire responsibility for dispatching timers and completed ops.
  316. if (::InterlockedCompareExchange(&dispatch_required_, 0, 1) == 1)
  317. {
  318. mutex::scoped_lock lock(dispatch_mutex_);
  319. // Dispatch pending timers and operations.
  320. op_queue<win_iocp_operation> ops;
  321. ops.push(completed_ops_);
  322. timer_queues_.get_ready_timers(ops);
  323. post_deferred_completions(ops);
  324. update_timeout();
  325. }
  326. // Get the next operation from the queue.
  327. DWORD bytes_transferred = 0;
  328. dword_ptr_t completion_key = 0;
  329. LPOVERLAPPED overlapped = 0;
  330. ::SetLastError(0);
  331. BOOL ok = ::GetQueuedCompletionStatus(iocp_.handle,
  332. &bytes_transferred, &completion_key, &overlapped,
  333. msec < gqcs_timeout_ ? msec : gqcs_timeout_);
  334. DWORD last_error = ::GetLastError();
  335. if (overlapped)
  336. {
  337. win_iocp_operation* op = static_cast<win_iocp_operation*>(overlapped);
  338. boost::system::error_code result_ec(last_error,
  339. boost::asio::error::get_system_category());
  340. // We may have been passed the last_error and bytes_transferred in the
  341. // OVERLAPPED structure itself.
  342. if (completion_key == overlapped_contains_result)
  343. {
  344. result_ec = boost::system::error_code(static_cast<int>(op->Offset),
  345. *reinterpret_cast<boost::system::error_category*>(op->Internal));
  346. bytes_transferred = op->OffsetHigh;
  347. }
  348. // Otherwise ensure any result has been saved into the OVERLAPPED
  349. // structure.
  350. else
  351. {
  352. op->Internal = reinterpret_cast<ulong_ptr_t>(&result_ec.category());
  353. op->Offset = result_ec.value();
  354. op->OffsetHigh = bytes_transferred;
  355. }
  356. // Dispatch the operation only if ready. The operation may not be ready
  357. // if the initiating function (e.g. a call to WSARecv) has not yet
  358. // returned. This is because the initiating function still wants access
  359. // to the operation's OVERLAPPED structure.
  360. if (::InterlockedCompareExchange(&op->ready_, 1, 0) == 1)
  361. {
  362. // Ensure the count of outstanding work is decremented on block exit.
  363. work_finished_on_block_exit on_exit = { this };
  364. (void)on_exit;
  365. op->complete(this, result_ec, bytes_transferred);
  366. ec = boost::system::error_code();
  367. return 1;
  368. }
  369. }
  370. else if (!ok)
  371. {
  372. if (last_error != WAIT_TIMEOUT)
  373. {
  374. ec = boost::system::error_code(last_error,
  375. boost::asio::error::get_system_category());
  376. return 0;
  377. }
  378. // If we're waiting indefinitely we need to keep going until we get a
  379. // real handler.
  380. if (msec == INFINITE)
  381. continue;
  382. ec = boost::system::error_code();
  383. return 0;
  384. }
  385. else if (completion_key == wake_for_dispatch)
  386. {
  387. // We have been woken up to try to acquire responsibility for dispatching
  388. // timers and completed operations.
  389. }
  390. else
  391. {
  392. // Indicate that there is no longer an in-flight stop event.
  393. ::InterlockedExchange(&stop_event_posted_, 0);
  394. // The stopped_ flag is always checked to ensure that any leftover
  395. // stop events from a previous run invocation are ignored.
  396. if (::InterlockedExchangeAdd(&stopped_, 0) != 0)
  397. {
  398. // Wake up next thread that is blocked on GetQueuedCompletionStatus.
  399. if (::InterlockedExchange(&stop_event_posted_, 1) == 0)
  400. {
  401. if (!::PostQueuedCompletionStatus(iocp_.handle, 0, 0, 0))
  402. {
  403. last_error = ::GetLastError();
  404. ec = boost::system::error_code(last_error,
  405. boost::asio::error::get_system_category());
  406. return 0;
  407. }
  408. }
  409. ec = boost::system::error_code();
  410. return 0;
  411. }
  412. }
  413. }
  414. }
  415. DWORD win_iocp_io_context::get_gqcs_timeout()
  416. {
  417. OSVERSIONINFOEX osvi;
  418. ZeroMemory(&osvi, sizeof(osvi));
  419. osvi.dwOSVersionInfoSize = sizeof(osvi);
  420. osvi.dwMajorVersion = 6ul;
  421. const uint64_t condition_mask = ::VerSetConditionMask(
  422. 0, VER_MAJORVERSION, VER_GREATER_EQUAL);
  423. if (!!::VerifyVersionInfo(&osvi, VER_MAJORVERSION, condition_mask))
  424. return INFINITE;
  425. return default_gqcs_timeout;
  426. }
  427. void win_iocp_io_context::do_add_timer_queue(timer_queue_base& queue)
  428. {
  429. mutex::scoped_lock lock(dispatch_mutex_);
  430. timer_queues_.insert(&queue);
  431. if (!waitable_timer_.handle)
  432. {
  433. waitable_timer_.handle = ::CreateWaitableTimer(0, FALSE, 0);
  434. if (waitable_timer_.handle == 0)
  435. {
  436. DWORD last_error = ::GetLastError();
  437. boost::system::error_code ec(last_error,
  438. boost::asio::error::get_system_category());
  439. boost::asio::detail::throw_error(ec, "timer");
  440. }
  441. LARGE_INTEGER timeout;
  442. timeout.QuadPart = -max_timeout_usec;
  443. timeout.QuadPart *= 10;
  444. ::SetWaitableTimer(waitable_timer_.handle,
  445. &timeout, max_timeout_msec, 0, 0, FALSE);
  446. }
  447. if (!timer_thread_.get())
  448. {
  449. timer_thread_function thread_function = { this };
  450. timer_thread_.reset(new thread(thread_function, 65536));
  451. }
  452. }
  453. void win_iocp_io_context::do_remove_timer_queue(timer_queue_base& queue)
  454. {
  455. mutex::scoped_lock lock(dispatch_mutex_);
  456. timer_queues_.erase(&queue);
  457. }
  458. void win_iocp_io_context::update_timeout()
  459. {
  460. if (timer_thread_.get())
  461. {
  462. // There's no point updating the waitable timer if the new timeout period
  463. // exceeds the maximum timeout. In that case, we might as well wait for the
  464. // existing period of the timer to expire.
  465. long timeout_usec = timer_queues_.wait_duration_usec(max_timeout_usec);
  466. if (timeout_usec < max_timeout_usec)
  467. {
  468. LARGE_INTEGER timeout;
  469. timeout.QuadPart = -timeout_usec;
  470. timeout.QuadPart *= 10;
  471. ::SetWaitableTimer(waitable_timer_.handle,
  472. &timeout, max_timeout_msec, 0, 0, FALSE);
  473. }
  474. }
  475. }
  476. } // namespace detail
  477. } // namespace asio
  478. } // namespace boost
  479. #include <boost/asio/detail/pop_options.hpp>
  480. #endif // defined(BOOST_ASIO_HAS_IOCP)
  481. #endif // BOOST_ASIO_DETAIL_IMPL_WIN_IOCP_IO_CONTEXT_IPP