// dev_poll_reactor.ipp (13 KB)
  1. //
  2. // detail/impl/dev_poll_reactor.ipp
  3. // ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
  4. //
  5. // Copyright (c) 2003-2018 Christopher M. Kohlhoff (chris at kohlhoff dot com)
  6. //
  7. // Distributed under the Boost Software License, Version 1.0. (See accompanying
  8. // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
  9. //
  10. #ifndef BOOST_ASIO_DETAIL_IMPL_DEV_POLL_REACTOR_IPP
  11. #define BOOST_ASIO_DETAIL_IMPL_DEV_POLL_REACTOR_IPP
  12. #if defined(_MSC_VER) && (_MSC_VER >= 1200)
  13. # pragma once
  14. #endif // defined(_MSC_VER) && (_MSC_VER >= 1200)
  15. #include <boost/asio/detail/config.hpp>
  16. #if defined(BOOST_ASIO_HAS_DEV_POLL)
  17. #include <boost/asio/detail/dev_poll_reactor.hpp>
  18. #include <boost/asio/detail/assert.hpp>
  19. #include <boost/asio/detail/throw_error.hpp>
  20. #include <boost/asio/error.hpp>
  21. #include <boost/asio/detail/push_options.hpp>
  22. namespace boost {
  23. namespace asio {
  24. namespace detail {
  25. dev_poll_reactor::dev_poll_reactor(boost::asio::execution_context& ctx)
  26. : boost::asio::detail::execution_context_service_base<dev_poll_reactor>(ctx),
  27. scheduler_(use_service<scheduler>(ctx)),
  28. mutex_(),
  29. dev_poll_fd_(do_dev_poll_create()),
  30. interrupter_(),
  31. shutdown_(false)
  32. {
  33. // Add the interrupter's descriptor to /dev/poll.
  34. ::pollfd ev = { 0, 0, 0 };
  35. ev.fd = interrupter_.read_descriptor();
  36. ev.events = POLLIN | POLLERR;
  37. ev.revents = 0;
  38. ::write(dev_poll_fd_, &ev, sizeof(ev));
  39. }
  40. dev_poll_reactor::~dev_poll_reactor()
  41. {
  42. shutdown();
  43. ::close(dev_poll_fd_);
  44. }
  45. void dev_poll_reactor::shutdown()
  46. {
  47. boost::asio::detail::mutex::scoped_lock lock(mutex_);
  48. shutdown_ = true;
  49. lock.unlock();
  50. op_queue<operation> ops;
  51. for (int i = 0; i < max_ops; ++i)
  52. op_queue_[i].get_all_operations(ops);
  53. timer_queues_.get_all_timers(ops);
  54. scheduler_.abandon_operations(ops);
  55. }
  56. void dev_poll_reactor::notify_fork(
  57. boost::asio::execution_context::fork_event fork_ev)
  58. {
  59. if (fork_ev == boost::asio::execution_context::fork_child)
  60. {
  61. detail::mutex::scoped_lock lock(mutex_);
  62. if (dev_poll_fd_ != -1)
  63. ::close(dev_poll_fd_);
  64. dev_poll_fd_ = -1;
  65. dev_poll_fd_ = do_dev_poll_create();
  66. interrupter_.recreate();
  67. // Add the interrupter's descriptor to /dev/poll.
  68. ::pollfd ev = { 0, 0, 0 };
  69. ev.fd = interrupter_.read_descriptor();
  70. ev.events = POLLIN | POLLERR;
  71. ev.revents = 0;
  72. ::write(dev_poll_fd_, &ev, sizeof(ev));
  73. // Re-register all descriptors with /dev/poll. The changes will be written
  74. // to the /dev/poll descriptor the next time the reactor is run.
  75. for (int i = 0; i < max_ops; ++i)
  76. {
  77. reactor_op_queue<socket_type>::iterator iter = op_queue_[i].begin();
  78. reactor_op_queue<socket_type>::iterator end = op_queue_[i].end();
  79. for (; iter != end; ++iter)
  80. {
  81. ::pollfd& pending_ev = add_pending_event_change(iter->first);
  82. pending_ev.events |= POLLERR | POLLHUP;
  83. switch (i)
  84. {
  85. case read_op: pending_ev.events |= POLLIN; break;
  86. case write_op: pending_ev.events |= POLLOUT; break;
  87. case except_op: pending_ev.events |= POLLPRI; break;
  88. default: break;
  89. }
  90. }
  91. }
  92. interrupter_.interrupt();
  93. }
  94. }
  95. void dev_poll_reactor::init_task()
  96. {
  97. scheduler_.init_task();
  98. }
  99. int dev_poll_reactor::register_descriptor(socket_type, per_descriptor_data&)
  100. {
  101. return 0;
  102. }
  103. int dev_poll_reactor::register_internal_descriptor(int op_type,
  104. socket_type descriptor, per_descriptor_data&, reactor_op* op)
  105. {
  106. boost::asio::detail::mutex::scoped_lock lock(mutex_);
  107. op_queue_[op_type].enqueue_operation(descriptor, op);
  108. ::pollfd& ev = add_pending_event_change(descriptor);
  109. ev.events = POLLERR | POLLHUP;
  110. switch (op_type)
  111. {
  112. case read_op: ev.events |= POLLIN; break;
  113. case write_op: ev.events |= POLLOUT; break;
  114. case except_op: ev.events |= POLLPRI; break;
  115. default: break;
  116. }
  117. interrupter_.interrupt();
  118. return 0;
  119. }
  120. void dev_poll_reactor::move_descriptor(socket_type,
  121. dev_poll_reactor::per_descriptor_data&,
  122. dev_poll_reactor::per_descriptor_data&)
  123. {
  124. }
// Start an asynchronous operation on a descriptor. May complete the
// operation speculatively (without going through /dev/poll) when allowed;
// otherwise queues it and schedules a pending event-mask change.
void dev_poll_reactor::start_op(int op_type, socket_type descriptor,
    dev_poll_reactor::per_descriptor_data&, reactor_op* op,
    bool is_continuation, bool allow_speculative)
{
  boost::asio::detail::mutex::scoped_lock lock(mutex_);

  // A shut-down reactor posts the operation for immediate completion
  // rather than queueing it.
  if (shutdown_)
  {
    post_immediate_completion(op, is_continuation);
    return;
  }

  if (allow_speculative)
  {
    // Suppress speculative reads while an exception (out-of-band) operation
    // is pending on the descriptor, so OOB data is consumed first.
    if (op_type != read_op || !op_queue_[except_op].has_operation(descriptor))
    {
      // Only speculate when no operation of this type is already queued,
      // preserving FIFO completion order for the descriptor.
      if (!op_queue_[op_type].has_operation(descriptor))
      {
        if (op->perform())
        {
          // Completed without blocking: post directly, never touching
          // /dev/poll.
          lock.unlock();
          scheduler_.post_immediate_completion(op, is_continuation);
          return;
        }
      }
    }
  }

  bool first = op_queue_[op_type].enqueue_operation(descriptor, op);
  scheduler_.work_started();
  if (first)
  {
    // First queued op of this type for the descriptor: recompute the full
    // event mask from all three queues and record it as a pending change,
    // then wake the reactor so the change is flushed.
    ::pollfd& ev = add_pending_event_change(descriptor);
    ev.events = POLLERR | POLLHUP;
    if (op_type == read_op
        || op_queue_[read_op].has_operation(descriptor))
      ev.events |= POLLIN;
    if (op_type == write_op
        || op_queue_[write_op].has_operation(descriptor))
      ev.events |= POLLOUT;
    if (op_type == except_op
        || op_queue_[except_op].has_operation(descriptor))
      ev.events |= POLLPRI;
    interrupter_.interrupt();
  }
}
  168. void dev_poll_reactor::cancel_ops(socket_type descriptor,
  169. dev_poll_reactor::per_descriptor_data&)
  170. {
  171. boost::asio::detail::mutex::scoped_lock lock(mutex_);
  172. cancel_ops_unlocked(descriptor, boost::asio::error::operation_aborted);
  173. }
  174. void dev_poll_reactor::deregister_descriptor(socket_type descriptor,
  175. dev_poll_reactor::per_descriptor_data&, bool)
  176. {
  177. boost::asio::detail::mutex::scoped_lock lock(mutex_);
  178. // Remove the descriptor from /dev/poll.
  179. ::pollfd& ev = add_pending_event_change(descriptor);
  180. ev.events = POLLREMOVE;
  181. interrupter_.interrupt();
  182. // Cancel any outstanding operations associated with the descriptor.
  183. cancel_ops_unlocked(descriptor, boost::asio::error::operation_aborted);
  184. }
  185. void dev_poll_reactor::deregister_internal_descriptor(
  186. socket_type descriptor, dev_poll_reactor::per_descriptor_data&)
  187. {
  188. boost::asio::detail::mutex::scoped_lock lock(mutex_);
  189. // Remove the descriptor from /dev/poll. Since this function is only called
  190. // during a fork, we can apply the change immediately.
  191. ::pollfd ev = { 0, 0, 0 };
  192. ev.fd = descriptor;
  193. ev.events = POLLREMOVE;
  194. ev.revents = 0;
  195. ::write(dev_poll_fd_, &ev, sizeof(ev));
  196. // Destroy all operations associated with the descriptor.
  197. op_queue<operation> ops;
  198. boost::system::error_code ec;
  199. for (int i = 0; i < max_ops; ++i)
  200. op_queue_[i].cancel_operations(descriptor, ops, ec);
  201. }
  202. void dev_poll_reactor::cleanup_descriptor_data(
  203. dev_poll_reactor::per_descriptor_data&)
  204. {
  205. }
// Run one reactor iteration: flush pending /dev/poll registration changes,
// block in the DP_POLL ioctl for up to usec microseconds (usec < 0 blocks
// indefinitely, usec == 0 polls), then push every completed operation and
// ready timer onto ops for the scheduler to execute.
void dev_poll_reactor::run(long usec, op_queue<operation>& ops)
{
  boost::asio::detail::mutex::scoped_lock lock(mutex_);

  // We can return immediately if there's no work to do and the reactor is
  // not supposed to block.
  if (usec == 0 && op_queue_[read_op].empty() && op_queue_[write_op].empty()
      && op_queue_[except_op].empty() && timer_queues_.all_empty())
    return;

  // Write the pending event registration changes to the /dev/poll descriptor.
  std::size_t events_size = sizeof(::pollfd) * pending_event_changes_.size();
  if (events_size > 0)
  {
    errno = 0;
    int result = ::write(dev_poll_fd_,
        &pending_event_changes_[0], events_size);
    if (result != static_cast<int>(events_size))
    {
      // On a short or failed write we cannot tell which registrations took
      // effect, so fail every operation belonging to a changed descriptor.
      boost::system::error_code ec = boost::system::error_code(
          errno, boost::asio::error::get_system_category());
      for (std::size_t i = 0; i < pending_event_changes_.size(); ++i)
      {
        int descriptor = pending_event_changes_[i].fd;
        for (int j = 0; j < max_ops; ++j)
          op_queue_[j].cancel_operations(descriptor, ops, ec);
      }
    }
    pending_event_changes_.clear();
    pending_event_change_index_.clear();
  }

  // Calculate timeout: round the microsecond value up to whole milliseconds
  // (-1 means infinite) and let the timer queues cap it further.
  int timeout;
  if (usec == 0)
    timeout = 0;
  else
  {
    timeout = (usec < 0) ? -1 : ((usec - 1) / 1000 + 1);
    timeout = get_timeout(timeout);
  }
  lock.unlock();

  // Block on the /dev/poll descriptor (lock released while blocked).
  ::pollfd events[128] = { { 0, 0, 0 } };
  ::dvpoll dp = { 0, 0, 0 };
  dp.dp_fds = events;
  dp.dp_nfds = 128;
  dp.dp_timeout = timeout;
  int num_events = ::ioctl(dev_poll_fd_, DP_POLL, &dp);
  lock.lock();

  // Dispatch the waiting events.
  for (int i = 0; i < num_events; ++i)
  {
    int descriptor = events[i].fd;
    if (descriptor == interrupter_.read_descriptor())
    {
      // The interrupter only exists to wake this thread; drain it.
      interrupter_.reset();
    }
    else
    {
      bool more_reads = false;
      bool more_writes = false;
      bool more_except = false;

      // Exception operations must be processed first to ensure that any
      // out-of-band data is read before normal data. For each category:
      // perform queued ops if the event fired, otherwise just note whether
      // ops remain queued (needed to rebuild the event mask below).
      if (events[i].events & (POLLPRI | POLLERR | POLLHUP))
        more_except =
          op_queue_[except_op].perform_operations(descriptor, ops);
      else
        more_except = op_queue_[except_op].has_operation(descriptor);

      if (events[i].events & (POLLIN | POLLERR | POLLHUP))
        more_reads = op_queue_[read_op].perform_operations(descriptor, ops);
      else
        more_reads = op_queue_[read_op].has_operation(descriptor);

      if (events[i].events & (POLLOUT | POLLERR | POLLHUP))
        more_writes = op_queue_[write_op].perform_operations(descriptor, ops);
      else
        more_writes = op_queue_[write_op].has_operation(descriptor);

      if ((events[i].events & (POLLERR | POLLHUP)) != 0
            && !more_except && !more_reads && !more_writes)
      {
        // If we have an event and no operations associated with the
        // descriptor then we need to delete the descriptor from /dev/poll.
        // The poll operation can produce POLLHUP or POLLERR events when there
        // is no operation pending, so if we do not remove the descriptor we
        // can end up in a tight polling loop.
        ::pollfd ev = { 0, 0, 0 };
        ev.fd = descriptor;
        ev.events = POLLREMOVE;
        ev.revents = 0;
        ::write(dev_poll_fd_, &ev, sizeof(ev));
      }
      else
      {
        // Re-arm the descriptor with the interest set implied by whatever
        // operations remain queued for it.
        ::pollfd ev = { 0, 0, 0 };
        ev.fd = descriptor;
        ev.events = POLLERR | POLLHUP;
        if (more_reads)
          ev.events |= POLLIN;
        if (more_writes)
          ev.events |= POLLOUT;
        if (more_except)
          ev.events |= POLLPRI;
        ev.revents = 0;
        int result = ::write(dev_poll_fd_, &ev, sizeof(ev));
        if (result != sizeof(ev))
        {
          // Re-arm failed: abort all remaining operations for this fd.
          boost::system::error_code ec(errno,
              boost::asio::error::get_system_category());
          for (int j = 0; j < max_ops; ++j)
            op_queue_[j].cancel_operations(descriptor, ops, ec);
        }
      }
    }
  }

  // Collect any timers that became ready while we were blocked.
  timer_queues_.get_ready_timers(ops);
}
  320. void dev_poll_reactor::interrupt()
  321. {
  322. interrupter_.interrupt();
  323. }
  324. int dev_poll_reactor::do_dev_poll_create()
  325. {
  326. int fd = ::open("/dev/poll", O_RDWR);
  327. if (fd == -1)
  328. {
  329. boost::system::error_code ec(errno,
  330. boost::asio::error::get_system_category());
  331. boost::asio::detail::throw_error(ec, "/dev/poll");
  332. }
  333. return fd;
  334. }
  335. void dev_poll_reactor::do_add_timer_queue(timer_queue_base& queue)
  336. {
  337. mutex::scoped_lock lock(mutex_);
  338. timer_queues_.insert(&queue);
  339. }
  340. void dev_poll_reactor::do_remove_timer_queue(timer_queue_base& queue)
  341. {
  342. mutex::scoped_lock lock(mutex_);
  343. timer_queues_.erase(&queue);
  344. }
  345. int dev_poll_reactor::get_timeout(int msec)
  346. {
  347. // By default we will wait no longer than 5 minutes. This will ensure that
  348. // any changes to the system clock are detected after no longer than this.
  349. const int max_msec = 5 * 60 * 1000;
  350. return timer_queues_.wait_duration_msec(
  351. (msec < 0 || max_msec < msec) ? max_msec : msec);
  352. }
  353. void dev_poll_reactor::cancel_ops_unlocked(socket_type descriptor,
  354. const boost::system::error_code& ec)
  355. {
  356. bool need_interrupt = false;
  357. op_queue<operation> ops;
  358. for (int i = 0; i < max_ops; ++i)
  359. need_interrupt = op_queue_[i].cancel_operations(
  360. descriptor, ops, ec) || need_interrupt;
  361. scheduler_.post_deferred_completions(ops);
  362. if (need_interrupt)
  363. interrupter_.interrupt();
  364. }
  365. ::pollfd& dev_poll_reactor::add_pending_event_change(int descriptor)
  366. {
  367. hash_map<int, std::size_t>::iterator iter
  368. = pending_event_change_index_.find(descriptor);
  369. if (iter == pending_event_change_index_.end())
  370. {
  371. std::size_t index = pending_event_changes_.size();
  372. pending_event_changes_.reserve(pending_event_changes_.size() + 1);
  373. pending_event_change_index_.insert(std::make_pair(descriptor, index));
  374. pending_event_changes_.push_back(::pollfd());
  375. pending_event_changes_[index].fd = descriptor;
  376. pending_event_changes_[index].revents = 0;
  377. return pending_event_changes_[index];
  378. }
  379. else
  380. {
  381. return pending_event_changes_[iter->second];
  382. }
  383. }
  384. } // namespace detail
  385. } // namespace asio
  386. } // namespace boost
  387. #include <boost/asio/detail/pop_options.hpp>
  388. #endif // defined(BOOST_ASIO_HAS_DEV_POLL)
  389. #endif // BOOST_ASIO_DETAIL_IMPL_DEV_POLL_REACTOR_IPP