scheduler_vanilla.c 6.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191
  1. /* Copyright (c) 2017, The Tor Project, Inc. */
  2. /* See LICENSE for licensing information */
  3. #include <event2/event.h>
  4. #include "or.h"
  5. #include "config.h"
  6. #define TOR_CHANNEL_INTERNAL_
  7. #include "channel.h"
  8. #define SCHEDULER_PRIVATE_
  9. #include "scheduler.h"
  10. /*****************************************************************************
  11. * Other internal data
  12. *****************************************************************************/
  13. /* Maximum cells to flush in a single call to channel_flush_some_cells(); */
  14. #define MAX_FLUSH_CELLS 1000
  15. static scheduler_t *vanilla_scheduler = NULL;
  16. /*****************************************************************************
  17. * Externally called function implementations
  18. *****************************************************************************/
  19. /* Return true iff the scheduler has work to perform. */
  20. static int
  21. have_work(void)
  22. {
  23. smartlist_t *cp = get_channels_pending();
  24. tor_assert(cp);
  25. return smartlist_len(cp) > 0;
  26. }
  27. /** Retrigger the scheduler in a way safe to use from the callback */
  28. static void
  29. vanilla_scheduler_schedule(void)
  30. {
  31. if (!have_work()) {
  32. return;
  33. }
  34. struct event *ev = get_run_sched_ev();
  35. tor_assert(ev);
  36. event_active(ev, EV_TIMEOUT, 1);
  37. }
  38. static void
  39. vanilla_scheduler_run(void)
  40. {
  41. int n_cells, n_chans_before, n_chans_after;
  42. ssize_t flushed, flushed_this_time;
  43. smartlist_t *cp = get_channels_pending();
  44. smartlist_t *to_readd = NULL;
  45. channel_t *chan = NULL;
  46. log_debug(LD_SCHED, "We have a chance to run the scheduler");
  47. n_chans_before = smartlist_len(cp);
  48. while (smartlist_len(cp) > 0) {
  49. /* Pop off a channel */
  50. chan = smartlist_pqueue_pop(cp,
  51. scheduler_compare_channels,
  52. offsetof(channel_t, sched_heap_idx));
  53. tor_assert(chan);
  54. /* Figure out how many cells we can write */
  55. n_cells = channel_num_cells_writeable(chan);
  56. if (n_cells > 0) {
  57. log_debug(LD_SCHED,
  58. "Scheduler saw pending channel " U64_FORMAT " at %p with "
  59. "%d cells writeable",
  60. U64_PRINTF_ARG(chan->global_identifier), chan, n_cells);
  61. flushed = 0;
  62. while (flushed < n_cells) {
  63. flushed_this_time =
  64. channel_flush_some_cells(chan,
  65. MIN(MAX_FLUSH_CELLS, (size_t) n_cells - flushed));
  66. if (flushed_this_time <= 0) break;
  67. flushed += flushed_this_time;
  68. }
  69. if (flushed < n_cells) {
  70. /* We ran out of cells to flush */
  71. chan->scheduler_state = SCHED_CHAN_WAITING_FOR_CELLS;
  72. log_debug(LD_SCHED,
  73. "Channel " U64_FORMAT " at %p "
  74. "entered waiting_for_cells from pending",
  75. U64_PRINTF_ARG(chan->global_identifier),
  76. chan);
  77. } else {
  78. /* The channel may still have some cells */
  79. if (channel_more_to_flush(chan)) {
  80. /* The channel goes to either pending or waiting_to_write */
  81. if (channel_num_cells_writeable(chan) > 0) {
  82. /* Add it back to pending later */
  83. if (!to_readd) to_readd = smartlist_new();
  84. smartlist_add(to_readd, chan);
  85. log_debug(LD_SCHED,
  86. "Channel " U64_FORMAT " at %p "
  87. "is still pending",
  88. U64_PRINTF_ARG(chan->global_identifier),
  89. chan);
  90. } else {
  91. /* It's waiting to be able to write more */
  92. chan->scheduler_state = SCHED_CHAN_WAITING_TO_WRITE;
  93. log_debug(LD_SCHED,
  94. "Channel " U64_FORMAT " at %p "
  95. "entered waiting_to_write from pending",
  96. U64_PRINTF_ARG(chan->global_identifier),
  97. chan);
  98. }
  99. } else {
  100. /* No cells left; it can go to idle or waiting_for_cells */
  101. if (channel_num_cells_writeable(chan) > 0) {
  102. /*
  103. * It can still accept writes, so it goes to
  104. * waiting_for_cells
  105. */
  106. chan->scheduler_state = SCHED_CHAN_WAITING_FOR_CELLS;
  107. log_debug(LD_SCHED,
  108. "Channel " U64_FORMAT " at %p "
  109. "entered waiting_for_cells from pending",
  110. U64_PRINTF_ARG(chan->global_identifier),
  111. chan);
  112. } else {
  113. /*
  114. * We exactly filled up the output queue with all available
  115. * cells; go to idle.
  116. */
  117. chan->scheduler_state = SCHED_CHAN_IDLE;
  118. log_debug(LD_SCHED,
  119. "Channel " U64_FORMAT " at %p "
  120. "become idle from pending",
  121. U64_PRINTF_ARG(chan->global_identifier),
  122. chan);
  123. }
  124. }
  125. }
  126. log_debug(LD_SCHED,
  127. "Scheduler flushed %d cells onto pending channel "
  128. U64_FORMAT " at %p",
  129. (int)flushed, U64_PRINTF_ARG(chan->global_identifier),
  130. chan);
  131. } else {
  132. log_info(LD_SCHED,
  133. "Scheduler saw pending channel " U64_FORMAT " at %p with "
  134. "no cells writeable",
  135. U64_PRINTF_ARG(chan->global_identifier), chan);
  136. /* Put it back to WAITING_TO_WRITE */
  137. chan->scheduler_state = SCHED_CHAN_WAITING_TO_WRITE;
  138. }
  139. }
  140. /* Readd any channels we need to */
  141. if (to_readd) {
  142. SMARTLIST_FOREACH_BEGIN(to_readd, channel_t *, readd_chan) {
  143. readd_chan->scheduler_state = SCHED_CHAN_PENDING;
  144. smartlist_pqueue_add(cp,
  145. scheduler_compare_channels,
  146. offsetof(channel_t, sched_heap_idx),
  147. readd_chan);
  148. } SMARTLIST_FOREACH_END(readd_chan);
  149. smartlist_free(to_readd);
  150. }
  151. n_chans_after = smartlist_len(cp);
  152. log_debug(LD_SCHED, "Scheduler handled %d of %d pending channels",
  153. n_chans_before - n_chans_after, n_chans_before);
  154. }
  155. scheduler_t *
  156. get_vanilla_scheduler(void)
  157. {
  158. if (!vanilla_scheduler) {
  159. log_debug(LD_SCHED, "Initializing vanilla scheduler struct");
  160. vanilla_scheduler = tor_malloc_zero(sizeof(*vanilla_scheduler));
  161. vanilla_scheduler->free_all = NULL;
  162. vanilla_scheduler->on_channel_free = NULL;
  163. vanilla_scheduler->init = NULL;
  164. vanilla_scheduler->on_new_consensus = NULL;
  165. vanilla_scheduler->schedule = vanilla_scheduler_schedule;
  166. vanilla_scheduler->run = vanilla_scheduler_run;
  167. vanilla_scheduler->on_new_options = NULL;
  168. }
  169. return vanilla_scheduler;
  170. }