/* scheduler_vanilla.c -- the round-robin ("vanilla") channel scheduler. */
  1. /* Copyright (c) 2017, The Tor Project, Inc. */
  2. /* See LICENSE for licensing information */
  3. #include <event2/event.h>
  4. #include "or.h"
  5. #include "config.h"
  6. #define TOR_CHANNEL_INTERNAL_
  7. #include "channel.h"
  8. #define SCHEDULER_PRIVATE_
  9. #include "scheduler.h"
  10. /*****************************************************************************
  11. * Other internal data
  12. *****************************************************************************/
  13. /* Maximum cells to flush in a single call to channel_flush_some_cells(); */
  14. #define MAX_FLUSH_CELLS 1000
  15. /* Stores the vanilla scheduler function pointers. */
  16. static scheduler_t *vanilla_scheduler = NULL;
  17. /*****************************************************************************
  18. * Externally called function implementations
  19. *****************************************************************************/
  20. /* Return true iff the scheduler has work to perform. */
  21. static int
  22. have_work(void)
  23. {
  24. smartlist_t *cp = get_channels_pending();
  25. IF_BUG_ONCE(!cp) {
  26. return 0; // channels_pending doesn't exist so... no work?
  27. }
  28. return smartlist_len(cp) > 0;
  29. }
  30. /** Retrigger the scheduler in a way safe to use from the callback */
  31. static void
  32. vanilla_scheduler_schedule(void)
  33. {
  34. if (!have_work()) {
  35. return;
  36. }
  37. /* Activate our event so it can process channels. */
  38. scheduler_ev_active(EV_TIMEOUT);
  39. }
/* Run one pass of the vanilla scheduler: drain the pending-channels
 * priority queue, flushing as many cells as each channel can currently
 * accept, and move every popped channel to its next scheduler state
 * (WAITING_FOR_CELLS, WAITING_TO_WRITE, IDLE, or back to PENDING). */
static void
vanilla_scheduler_run(void)
{
  int n_cells, n_chans_before, n_chans_after;
  ssize_t flushed, flushed_this_time;
  smartlist_t *cp = get_channels_pending();
  /* Channels that stay flushable AND writeable are collected here and
   * re-queued after the main loop, so the loop itself always terminates. */
  smartlist_t *to_readd = NULL;
  channel_t *chan = NULL;

  log_debug(LD_SCHED, "We have a chance to run the scheduler");

  /* Remember the starting queue length purely for the summary log below. */
  n_chans_before = smartlist_len(cp);

  while (smartlist_len(cp) > 0) {
    /* Pop off a channel */
    chan = smartlist_pqueue_pop(cp,
                                scheduler_compare_channels,
                                offsetof(channel_t, sched_heap_idx));
    IF_BUG_ONCE(!chan) {
      /* Some-freaking-how a NULL got into the channels_pending. That should
       * never happen, but it should be harmless to ignore it and keep looping.
       */
      continue;
    }

    /* Figure out how many cells we can write */
    n_cells = channel_num_cells_writeable(chan);
    if (n_cells > 0) {
      log_debug(LD_SCHED,
                "Scheduler saw pending channel " U64_FORMAT " at %p with "
                "%d cells writeable",
                U64_PRINTF_ARG(chan->global_identifier), chan, n_cells);

      /* Flush up to n_cells cells, in chunks of at most MAX_FLUSH_CELLS.
       * A flush that returns <= 0 means no further progress is possible
       * right now, so stop early. */
      flushed = 0;
      while (flushed < n_cells) {
        flushed_this_time =
          channel_flush_some_cells(chan,
                                   MIN(MAX_FLUSH_CELLS,
                                       (size_t) n_cells - flushed));
        if (flushed_this_time <= 0) break;
        flushed += flushed_this_time;
      }

      if (flushed < n_cells) {
        /* We ran out of cells to flush */
        chan->scheduler_state = SCHED_CHAN_WAITING_FOR_CELLS;
        log_debug(LD_SCHED,
                  "Channel " U64_FORMAT " at %p "
                  "entered waiting_for_cells from pending",
                  U64_PRINTF_ARG(chan->global_identifier),
                  chan);
      } else {
        /* The channel may still have some cells */
        if (channel_more_to_flush(chan)) {
          /* The channel goes to either pending or waiting_to_write */
          if (channel_num_cells_writeable(chan) > 0) {
            /* Add it back to pending later */
            if (!to_readd) to_readd = smartlist_new();
            smartlist_add(to_readd, chan);
            log_debug(LD_SCHED,
                      "Channel " U64_FORMAT " at %p "
                      "is still pending",
                      U64_PRINTF_ARG(chan->global_identifier),
                      chan);
          } else {
            /* It's waiting to be able to write more */
            chan->scheduler_state = SCHED_CHAN_WAITING_TO_WRITE;
            log_debug(LD_SCHED,
                      "Channel " U64_FORMAT " at %p "
                      "entered waiting_to_write from pending",
                      U64_PRINTF_ARG(chan->global_identifier),
                      chan);
          }
        } else {
          /* No cells left; it can go to idle or waiting_for_cells */
          if (channel_num_cells_writeable(chan) > 0) {
            /*
             * It can still accept writes, so it goes to
             * waiting_for_cells
             */
            chan->scheduler_state = SCHED_CHAN_WAITING_FOR_CELLS;
            log_debug(LD_SCHED,
                      "Channel " U64_FORMAT " at %p "
                      "entered waiting_for_cells from pending",
                      U64_PRINTF_ARG(chan->global_identifier),
                      chan);
          } else {
            /*
             * We exactly filled up the output queue with all available
             * cells; go to idle.
             */
            chan->scheduler_state = SCHED_CHAN_IDLE;
            log_debug(LD_SCHED,
                      "Channel " U64_FORMAT " at %p "
                      "become idle from pending",
                      U64_PRINTF_ARG(chan->global_identifier),
                      chan);
          }
        }
      }

      log_debug(LD_SCHED,
                "Scheduler flushed %d cells onto pending channel "
                U64_FORMAT " at %p",
                (int)flushed, U64_PRINTF_ARG(chan->global_identifier),
                chan);
    } else {
      /* The channel can't accept any cells right now. */
      log_info(LD_SCHED,
               "Scheduler saw pending channel " U64_FORMAT " at %p with "
               "no cells writeable",
               U64_PRINTF_ARG(chan->global_identifier), chan);
      /* Put it back to WAITING_TO_WRITE */
      chan->scheduler_state = SCHED_CHAN_WAITING_TO_WRITE;
    }
  }

  /* Readd any channels we need to */
  if (to_readd) {
    SMARTLIST_FOREACH_BEGIN(to_readd, channel_t *, readd_chan) {
      readd_chan->scheduler_state = SCHED_CHAN_PENDING;
      smartlist_pqueue_add(cp,
                           scheduler_compare_channels,
                           offsetof(channel_t, sched_heap_idx),
                           readd_chan);
    } SMARTLIST_FOREACH_END(readd_chan);
    /* Only the list itself is freed; the channels it held now live in cp. */
    smartlist_free(to_readd);
  }

  n_chans_after = smartlist_len(cp);
  log_debug(LD_SCHED, "Scheduler handled %d of %d pending channels",
            n_chans_before - n_chans_after, n_chans_before);
}
  162. scheduler_t *
  163. get_vanilla_scheduler(void)
  164. {
  165. if (!vanilla_scheduler) {
  166. log_debug(LD_SCHED, "Initializing vanilla scheduler struct");
  167. vanilla_scheduler = tor_malloc_zero(sizeof(*vanilla_scheduler));
  168. vanilla_scheduler->free_all = NULL;
  169. vanilla_scheduler->on_channel_free = NULL;
  170. vanilla_scheduler->init = NULL;
  171. vanilla_scheduler->on_new_consensus = NULL;
  172. vanilla_scheduler->schedule = vanilla_scheduler_schedule;
  173. vanilla_scheduler->run = vanilla_scheduler_run;
  174. vanilla_scheduler->on_new_options = NULL;
  175. }
  176. return vanilla_scheduler;
  177. }