scheduler_vanilla.c 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175
  1. /* Copyright (c) 2017, The Tor Project, Inc. */
  2. /* See LICENSE for licensing information */
  3. #include "or.h"
  4. #include "config.h"
  5. #define TOR_CHANNEL_INTERNAL_
  6. #include "channel.h"
  7. #define SCHEDULER_PRIVATE_
  8. #include "scheduler.h"
  9. /*****************************************************************************
  10. * Other internal data
  11. *****************************************************************************/
  12. /* Maximum cells to flush in a single call to channel_flush_some_cells(); */
  13. #define MAX_FLUSH_CELLS 1000
  14. /*****************************************************************************
  15. * Externally called function implementations
  16. *****************************************************************************/
  17. /* Return true iff the scheduler has work to perform. */
  18. static int
  19. have_work(void)
  20. {
  21. smartlist_t *cp = get_channels_pending();
  22. IF_BUG_ONCE(!cp) {
  23. return 0; // channels_pending doesn't exist so... no work?
  24. }
  25. return smartlist_len(cp) > 0;
  26. }
  27. /** Re-trigger the scheduler in a way safe to use from the callback */
  28. static void
  29. vanilla_scheduler_schedule(void)
  30. {
  31. if (!have_work()) {
  32. return;
  33. }
  34. /* Activate our event so it can process channels. */
  35. scheduler_ev_active();
  36. }
  37. static void
  38. vanilla_scheduler_run(void)
  39. {
  40. int n_cells, n_chans_before, n_chans_after;
  41. ssize_t flushed, flushed_this_time;
  42. smartlist_t *cp = get_channels_pending();
  43. smartlist_t *to_readd = NULL;
  44. channel_t *chan = NULL;
  45. log_debug(LD_SCHED, "We have a chance to run the scheduler");
  46. n_chans_before = smartlist_len(cp);
  47. while (smartlist_len(cp) > 0) {
  48. /* Pop off a channel */
  49. chan = smartlist_pqueue_pop(cp,
  50. scheduler_compare_channels,
  51. offsetof(channel_t, sched_heap_idx));
  52. IF_BUG_ONCE(!chan) {
  53. /* Some-freaking-how a NULL got into the channels_pending. That should
  54. * never happen, but it should be harmless to ignore it and keep looping.
  55. */
  56. continue;
  57. }
  58. /* Figure out how many cells we can write */
  59. n_cells = channel_num_cells_writeable(chan);
  60. if (n_cells > 0) {
  61. log_debug(LD_SCHED,
  62. "Scheduler saw pending channel " U64_FORMAT " at %p with "
  63. "%d cells writeable",
  64. U64_PRINTF_ARG(chan->global_identifier), chan, n_cells);
  65. flushed = 0;
  66. while (flushed < n_cells) {
  67. flushed_this_time =
  68. channel_flush_some_cells(chan,
  69. MIN(MAX_FLUSH_CELLS, (size_t) n_cells - flushed));
  70. if (flushed_this_time <= 0) break;
  71. flushed += flushed_this_time;
  72. }
  73. if (flushed < n_cells) {
  74. /* We ran out of cells to flush */
  75. scheduler_set_channel_state(chan, SCHED_CHAN_WAITING_FOR_CELLS);
  76. } else {
  77. /* The channel may still have some cells */
  78. if (channel_more_to_flush(chan)) {
  79. /* The channel goes to either pending or waiting_to_write */
  80. if (channel_num_cells_writeable(chan) > 0) {
  81. /* Add it back to pending later */
  82. if (!to_readd) to_readd = smartlist_new();
  83. smartlist_add(to_readd, chan);
  84. log_debug(LD_SCHED,
  85. "Channel " U64_FORMAT " at %p "
  86. "is still pending",
  87. U64_PRINTF_ARG(chan->global_identifier),
  88. chan);
  89. } else {
  90. /* It's waiting to be able to write more */
  91. scheduler_set_channel_state(chan, SCHED_CHAN_WAITING_TO_WRITE);
  92. }
  93. } else {
  94. /* No cells left; it can go to idle or waiting_for_cells */
  95. if (channel_num_cells_writeable(chan) > 0) {
  96. /*
  97. * It can still accept writes, so it goes to
  98. * waiting_for_cells
  99. */
  100. scheduler_set_channel_state(chan, SCHED_CHAN_WAITING_FOR_CELLS);
  101. } else {
  102. /*
  103. * We exactly filled up the output queue with all available
  104. * cells; go to idle.
  105. */
  106. scheduler_set_channel_state(chan, SCHED_CHAN_IDLE);
  107. }
  108. }
  109. }
  110. log_debug(LD_SCHED,
  111. "Scheduler flushed %d cells onto pending channel "
  112. U64_FORMAT " at %p",
  113. (int)flushed, U64_PRINTF_ARG(chan->global_identifier),
  114. chan);
  115. } else {
  116. log_info(LD_SCHED,
  117. "Scheduler saw pending channel " U64_FORMAT " at %p with "
  118. "no cells writeable",
  119. U64_PRINTF_ARG(chan->global_identifier), chan);
  120. /* Put it back to WAITING_TO_WRITE */
  121. scheduler_set_channel_state(chan, SCHED_CHAN_WAITING_TO_WRITE);
  122. }
  123. }
  124. /* Readd any channels we need to */
  125. if (to_readd) {
  126. SMARTLIST_FOREACH_BEGIN(to_readd, channel_t *, readd_chan) {
  127. scheduler_set_channel_state(readd_chan, SCHED_CHAN_PENDING);
  128. smartlist_pqueue_add(cp,
  129. scheduler_compare_channels,
  130. offsetof(channel_t, sched_heap_idx),
  131. readd_chan);
  132. } SMARTLIST_FOREACH_END(readd_chan);
  133. smartlist_free(to_readd);
  134. }
  135. n_chans_after = smartlist_len(cp);
  136. log_debug(LD_SCHED, "Scheduler handled %d of %d pending channels",
  137. n_chans_before - n_chans_after, n_chans_before);
  138. }
  139. /* Stores the vanilla scheduler function pointers. */
  140. static scheduler_t vanilla_scheduler = {
  141. .type = SCHEDULER_VANILLA,
  142. .free_all = NULL,
  143. .on_channel_free = NULL,
  144. .init = NULL,
  145. .on_new_consensus = NULL,
  146. .schedule = vanilla_scheduler_schedule,
  147. .run = vanilla_scheduler_run,
  148. .on_new_options = NULL,
  149. };
  150. scheduler_t *
  151. get_vanilla_scheduler(void)
  152. {
  153. return &vanilla_scheduler;
  154. }