scheduler_vanilla.c 6.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196
  1. /* Copyright (c) 2017, The Tor Project, Inc. */
  2. /* See LICENSE for licensing information */
  3. #include <event2/event.h>
  4. #include "or.h"
  5. #include "config.h"
  6. #define TOR_CHANNEL_INTERNAL_
  7. #include "channel.h"
  8. #define SCHEDULER_PRIVATE_
  9. #include "scheduler.h"
  10. /*****************************************************************************
  11. * Other internal data
  12. *****************************************************************************/
  /** Upper bound on the number of cells requested from one call to
   * channel_flush_some_cells(). Larger flushes are split into several calls
   * of at most this many cells each. */
  #define MAX_FLUSH_CELLS 1000
  15. /*****************************************************************************
  16. * Externally called function implementations
  17. *****************************************************************************/
  18. /* Return true iff the scheduler has work to perform. */
  19. static int
  20. have_work(void)
  21. {
  22. smartlist_t *cp = get_channels_pending();
  23. IF_BUG_ONCE(!cp) {
  24. return 0; // channels_pending doesn't exist so... no work?
  25. }
  26. return smartlist_len(cp) > 0;
  27. }
  28. /** Retrigger the scheduler in a way safe to use from the callback */
  29. static void
  30. vanilla_scheduler_schedule(void)
  31. {
  32. if (!have_work()) {
  33. return;
  34. }
  35. /* Activate our event so it can process channels. */
  36. scheduler_ev_active(EV_TIMEOUT);
  37. }
  38. static void
  39. vanilla_scheduler_run(void)
  40. {
  41. int n_cells, n_chans_before, n_chans_after;
  42. ssize_t flushed, flushed_this_time;
  43. smartlist_t *cp = get_channels_pending();
  44. smartlist_t *to_readd = NULL;
  45. channel_t *chan = NULL;
  46. log_debug(LD_SCHED, "We have a chance to run the scheduler");
  47. n_chans_before = smartlist_len(cp);
  48. while (smartlist_len(cp) > 0) {
  49. /* Pop off a channel */
  50. chan = smartlist_pqueue_pop(cp,
  51. scheduler_compare_channels,
  52. offsetof(channel_t, sched_heap_idx));
  53. IF_BUG_ONCE(!chan) {
  54. /* Some-freaking-how a NULL got into the channels_pending. That should
  55. * never happen, but it should be harmless to ignore it and keep looping.
  56. */
  57. continue;
  58. }
  59. /* Figure out how many cells we can write */
  60. n_cells = channel_num_cells_writeable(chan);
  61. if (n_cells > 0) {
  62. log_debug(LD_SCHED,
  63. "Scheduler saw pending channel " U64_FORMAT " at %p with "
  64. "%d cells writeable",
  65. U64_PRINTF_ARG(chan->global_identifier), chan, n_cells);
  66. flushed = 0;
  67. while (flushed < n_cells) {
  68. flushed_this_time =
  69. channel_flush_some_cells(chan,
  70. MIN(MAX_FLUSH_CELLS, (size_t) n_cells - flushed));
  71. if (flushed_this_time <= 0) break;
  72. flushed += flushed_this_time;
  73. }
  74. if (flushed < n_cells) {
  75. /* We ran out of cells to flush */
  76. chan->scheduler_state = SCHED_CHAN_WAITING_FOR_CELLS;
  77. log_debug(LD_SCHED,
  78. "Channel " U64_FORMAT " at %p "
  79. "entered waiting_for_cells from pending",
  80. U64_PRINTF_ARG(chan->global_identifier),
  81. chan);
  82. } else {
  83. /* The channel may still have some cells */
  84. if (channel_more_to_flush(chan)) {
  85. /* The channel goes to either pending or waiting_to_write */
  86. if (channel_num_cells_writeable(chan) > 0) {
  87. /* Add it back to pending later */
  88. if (!to_readd) to_readd = smartlist_new();
  89. smartlist_add(to_readd, chan);
  90. log_debug(LD_SCHED,
  91. "Channel " U64_FORMAT " at %p "
  92. "is still pending",
  93. U64_PRINTF_ARG(chan->global_identifier),
  94. chan);
  95. } else {
  96. /* It's waiting to be able to write more */
  97. chan->scheduler_state = SCHED_CHAN_WAITING_TO_WRITE;
  98. log_debug(LD_SCHED,
  99. "Channel " U64_FORMAT " at %p "
  100. "entered waiting_to_write from pending",
  101. U64_PRINTF_ARG(chan->global_identifier),
  102. chan);
  103. }
  104. } else {
  105. /* No cells left; it can go to idle or waiting_for_cells */
  106. if (channel_num_cells_writeable(chan) > 0) {
  107. /*
  108. * It can still accept writes, so it goes to
  109. * waiting_for_cells
  110. */
  111. chan->scheduler_state = SCHED_CHAN_WAITING_FOR_CELLS;
  112. log_debug(LD_SCHED,
  113. "Channel " U64_FORMAT " at %p "
  114. "entered waiting_for_cells from pending",
  115. U64_PRINTF_ARG(chan->global_identifier),
  116. chan);
  117. } else {
  118. /*
  119. * We exactly filled up the output queue with all available
  120. * cells; go to idle.
  121. */
  122. chan->scheduler_state = SCHED_CHAN_IDLE;
  123. log_debug(LD_SCHED,
  124. "Channel " U64_FORMAT " at %p "
  125. "become idle from pending",
  126. U64_PRINTF_ARG(chan->global_identifier),
  127. chan);
  128. }
  129. }
  130. }
  131. log_debug(LD_SCHED,
  132. "Scheduler flushed %d cells onto pending channel "
  133. U64_FORMAT " at %p",
  134. (int)flushed, U64_PRINTF_ARG(chan->global_identifier),
  135. chan);
  136. } else {
  137. log_info(LD_SCHED,
  138. "Scheduler saw pending channel " U64_FORMAT " at %p with "
  139. "no cells writeable",
  140. U64_PRINTF_ARG(chan->global_identifier), chan);
  141. /* Put it back to WAITING_TO_WRITE */
  142. chan->scheduler_state = SCHED_CHAN_WAITING_TO_WRITE;
  143. }
  144. }
  145. /* Readd any channels we need to */
  146. if (to_readd) {
  147. SMARTLIST_FOREACH_BEGIN(to_readd, channel_t *, readd_chan) {
  148. readd_chan->scheduler_state = SCHED_CHAN_PENDING;
  149. smartlist_pqueue_add(cp,
  150. scheduler_compare_channels,
  151. offsetof(channel_t, sched_heap_idx),
  152. readd_chan);
  153. } SMARTLIST_FOREACH_END(readd_chan);
  154. smartlist_free(to_readd);
  155. }
  156. n_chans_after = smartlist_len(cp);
  157. log_debug(LD_SCHED, "Scheduler handled %d of %d pending channels",
  158. n_chans_before - n_chans_after, n_chans_before);
  159. }
  160. /* Stores the vanilla scheduler function pointers. */
  161. static scheduler_t vanilla_scheduler = {
  162. .free_all = NULL,
  163. .on_channel_free = NULL,
  164. .init = NULL,
  165. .on_new_consensus = NULL,
  166. .schedule = vanilla_scheduler_schedule,
  167. .run = vanilla_scheduler_run,
  168. .on_new_options = NULL,
  169. };
  170. scheduler_t *
  171. get_vanilla_scheduler(void)
  172. {
  173. return &vanilla_scheduler;
  174. }