Browse Source

sched: Implement the KIST scheduler

Closes #12541

Signed-off-by: David Goulet <dgoulet@torproject.org>
Matt Traudt 6 years ago
parent
commit
dde358667d
7 changed files with 1591 additions and 644 deletions
  1. 0 4
      src/or/channel.c
  2. 3 8
      src/or/config.c
  3. 233 406
      src/or/scheduler.c
  4. 149 36
      src/or/scheduler.h
  5. 585 0
      src/or/scheduler_kist.c
  6. 186 0
      src/or/scheduler_vanilla.c
  7. 435 190
      src/test/test_scheduler.c

+ 0 - 4
src/or/channel.c

@@ -4826,8 +4826,6 @@ channel_update_xmit_queue_size(channel_t *chan)
                 U64_FORMAT ", new size is " U64_FORMAT,
                 U64_PRINTF_ARG(adj), U64_PRINTF_ARG(chan->global_identifier),
                 U64_PRINTF_ARG(estimated_total_queue_size));
-      /* Tell the scheduler we're increasing the queue size */
-      scheduler_adjust_queue_size(chan, 1, adj);
     }
   } else if (queued < chan->bytes_queued_for_xmit) {
     adj = chan->bytes_queued_for_xmit - queued;
@@ -4850,8 +4848,6 @@ channel_update_xmit_queue_size(channel_t *chan)
                 U64_FORMAT ", new size is " U64_FORMAT,
                 U64_PRINTF_ARG(adj), U64_PRINTF_ARG(chan->global_identifier),
                 U64_PRINTF_ARG(estimated_total_queue_size));
-      /* Tell the scheduler we're decreasing the queue size */
-      scheduler_adjust_queue_size(chan, -1, adj);
     }
   }
 }

+ 3 - 8
src/or/config.c

@@ -1813,14 +1813,9 @@ options_act(const or_options_t *old_options)
     return -1;
   }
 
-  /* XXXFORTOR remove set_watermarks */
-  /* Set up scheduler thresholds */
-  scheduler_set_watermarks(100 * 1024*1024 /* 100 MB */,
-                           101 * 1024*1024 /* 101 MB */,
-                           100);
-
-  /* XXXFORTOR enable notification to sched that the conf might have changed */
-  //scheduler_conf_changed();
+  /* Inform the scheduler subsystem that a configuration changed happened. It
+   * might be a change of scheduler or parameter. */
+  scheduler_conf_changed();
 
   /* Set up accounting */
   if (accounting_parse_options(options, 0)<0) {

+ 233 - 406
src/or/scheduler.c

@@ -2,9 +2,7 @@
 /* See LICENSE for licensing information */
 
 #include "or.h"
-
-#define TOR_CHANNEL_INTERNAL_ /* For channel_flush_some_cells() */
-#include "channel.h"
+#include "config.h"
 
 #include "compat_libevent.h"
 #define SCHEDULER_PRIVATE_
@@ -12,35 +10,41 @@
 
 #include <event2/event.h>
 
-/*
- * Scheduler high/low watermarks
- */
-
-static uint32_t sched_q_low_water = 16384;
-static uint32_t sched_q_high_water = 32768;
-
-/*
- * Maximum cells to flush in a single call to channel_flush_some_cells();
- * setting this low means more calls, but too high and we could overshoot
- * sched_q_high_water.
- */
-
-static uint32_t sched_max_flush_cells = 16;
-
 /**
  * \file scheduler.c
  * \brief Channel scheduling system: decides which channels should send and
  * receive when.
  *
- * This module implements a scheduler algorithm, to decide
- * which channels should send/receive when.
+ * This module is the global/common parts of the scheduling system. This system
+ * is what decides what channels get to send cells on their circuits and when.
+ *
+ * Terms:
+ * - "Scheduling system": the collection of scheduler*.{h,c} files and their
+ *   aggregate behavior.
+ * - "Scheduler implementation": a scheduler_t. The scheduling system has one
+ *   active scheduling implementation at a time.
+ *
+ * In this file you will find state that any scheduler implmentation can have
+ * access to as well as the functions the rest of Tor uses to interact with the
+ * scheduling system.
  *
  * The earliest versions of Tor approximated a kind of round-robin system
- * among active connections, but only approximated it.
+ * among active connections, but only approximated it. It would only consider
+ * one connection (roughly equal to a channel in today's terms) at a time, and
+ * thus could only prioritize circuits against others on the same connection.
+ *
+ * Then in response to the KIST paper[0], Tor implemented a global
+ * circuit scheduler. It was supposed to prioritize circuits across man
+ * channels, but wasn't effective. It is preserved in scheduler_vanilla.c.
+ *
+ * [0]: http://www.robgjansen.com/publications/kist-sec2014.pdf
  *
- * Now, write scheduling works by keeping track of which channels can
- * accept cells, and have cells to write.  From the scheduler's perspective,
- * a channel can be in four possible states:
+ * Then we actually got around to implementing KIST for real. We decided to
+ * modularize the scheduler so new ones can be implemented. You can find KIST
+ * in scheduler_kist.c.
+ *
+ * Channels have one of four scheduling states based on whether or not they
+ * have cells to send and whether or not they are able to send.
  *
  * <ol>
  * <li>
@@ -125,85 +129,108 @@ static uint32_t sched_max_flush_cells = 16;
  * </ol>
  *
  * Other event-driven parts of the code move channels between these scheduling
- * states by calling scheduler functions; the scheduler only runs on open-for-
- * writes/has-cells channels and is the only path for those to transition to
- * other states.  The scheduler_run() function gives us the opportunity to do
- * scheduling work, and is called from other scheduler functions whenever a
- * state transition occurs, and periodically from the main event loop.
+ * states by calling scheduler functions. The scheduling system builds up a
+ * list of channels in the SCHED_CHAN_PENDING state that the scheduler
+ * implementation should then use when it runs. Scheduling implementations need
+ * to properly update channel states during their scheduler_t->run() function
+ * as that is the only opportunity for channels to move from SCHED_CHAN_PENDING
+ * to any other state.
+ *
+ * The remainder of this file is a small amount of state that any scheduler
+ * implementation should have access to, and the functions the rest of Tor uses
+ * to interact with the scheduling system.
  */
 
-/* Scheduler global data structures */
+/*****************************************************************************
+ * Scheduling system state
+ *
+ * State that can be accessed from any scheduler implementation (but not
+ * outside the scheduling system)
+ *****************************************************************************/
+
+STATIC scheduler_t *scheduler;
 
 /*
  * We keep a list of channels that are pending - i.e, have cells to write
- * and can accept them to send.  The enum scheduler_state in channel_t
+ * and can accept them to send. The enum scheduler_state in channel_t
  * is reserved for our use.
+ *
+ * Priority queue of channels that can write and have cells (pending work)
  */
-
-/* Pqueue of channels that can write and have cells (pending work) */
 STATIC smartlist_t *channels_pending = NULL;
 
 /*
  * This event runs the scheduler from its callback, and is manually
  * activated whenever a channel enters open for writes/cells to send.
  */
-
 STATIC struct event *run_sched_ev = NULL;
 
-/*
- * Queue heuristic; this is not the queue size, but an 'effective queuesize'
- * that ages out contributions from stalled channels.
- */
-
-STATIC uint64_t queue_heuristic = 0;
+/*****************************************************************************
+ * Scheduling system static function definitions
+ *
+ * Functions that can only be accessed from this file.
+ *****************************************************************************/
 
 /*
- * Timestamp for last queue heuristic update
+ * Scheduler event callback; this should get triggered once per event loop
+ * if any scheduling work was created during the event loop.
  */
+static void
+scheduler_evt_callback(evutil_socket_t fd, short events, void *arg)
+{
+  (void) fd;
+  (void) events;
+  (void) arg;
 
-STATIC time_t queue_heuristic_timestamp = 0;
+  log_debug(LD_SCHED, "Scheduler event callback called");
 
-/* Scheduler static function declarations */
+  tor_assert(run_sched_ev);
 
-static void scheduler_evt_callback(evutil_socket_t fd,
-                                   short events, void *arg);
-static int scheduler_more_work(void);
-static void scheduler_retrigger(void);
-#if 0
-static void scheduler_trigger(void);
-#endif
+  /* Run the scheduler. This is a mandatory function. */
+  tor_assert(scheduler->run);
+  scheduler->run();
 
-/* Scheduler function implementations */
+  /* Schedule itself back in if it has more work. */
+  tor_assert(scheduler->schedule);
+  scheduler->schedule();
+}
 
-/** Free everything and shut down the scheduling system */
+/*****************************************************************************
+ * Scheduling system private function definitions
+ *
+ * Functions that can only be accessed from scheduler*.c
+ *****************************************************************************/
 
-void
-scheduler_free_all(void)
+/* Return the pending channel list. */
+smartlist_t *
+get_channels_pending(void)
 {
-  log_debug(LD_SCHED, "Shutting down scheduler");
-
-  if (run_sched_ev) {
-    if (event_del(run_sched_ev) < 0) {
-      log_warn(LD_BUG, "Problem deleting run_sched_ev");
-    }
-    tor_event_free(run_sched_ev);
-    run_sched_ev = NULL;
-  }
+  return channels_pending;
+}
 
-  if (channels_pending) {
-    smartlist_free(channels_pending);
-    channels_pending = NULL;
-  }
+/* Return our libevent scheduler event. */
+struct event *
+get_run_sched_ev(void)
+{
+  return run_sched_ev;
 }
 
-/**
- * Comparison function to use when sorting pending channels
- */
+/* Return true iff the scheduler subsystem should use KIST. */
+int
+scheduler_should_use_kist(void)
+{
+  int64_t run_freq = kist_scheduler_run_interval();
+  log_info(LD_SCHED, "Determined sched_run_interval should be %" PRId64 ". "
+                     "Will%s use KIST.",
+           run_freq, (run_freq > 0 ? "" : " not"));
+  return run_freq > 0;
+}
 
-MOCK_IMPL(STATIC int,
+/* Comparison function to use when sorting pending channels */
+MOCK_IMPL(int,
 scheduler_compare_channels, (const void *c1_v, const void *c2_v))
 {
-  channel_t *c1 = NULL, *c2 = NULL;
+  const channel_t *c1 = NULL, *c2 = NULL;
   /* These are a workaround for -Wbad-function-cast throwing a fit */
   const circuitmux_policy_t *p1, *p2;
   uintptr_t p1_i, p2_i;
@@ -211,8 +238,8 @@ scheduler_compare_channels, (const void *c1_v, const void *c2_v))
   tor_assert(c1_v);
   tor_assert(c2_v);
 
-  c1 = (channel_t *)(c1_v);
-  c2 = (channel_t *)(c2_v);
+  c1 = (const channel_t *)(c1_v);
+  c2 = (const channel_t *)(c2_v);
 
   tor_assert(c1);
   tor_assert(c2);
@@ -242,26 +269,109 @@ scheduler_compare_channels, (const void *c1_v, const void *c2_v))
   }
 }
 
+/*****************************************************************************
+ * Scheduling system global functions
+ *
+ * Functions that can be accessed from anywhere in Tor.
+ *****************************************************************************/
+
 /*
- * Scheduler event callback; this should get triggered once per event loop
- * if any scheduling work was created during the event loop.
+ * Little helper function called from a few different places. It changes the
+ * scheduler implementation, if necessary. And if it did, it then tells the
+ * old one to free its state and the new one to initialize.
  */
-
 static void
-scheduler_evt_callback(evutil_socket_t fd, short events, void *arg)
+set_scheduler(void)
 {
-  (void)fd;
-  (void)events;
-  (void)arg;
-  log_debug(LD_SCHED, "Scheduler event callback called");
+  int have_kist = 0;
 
-  tor_assert(run_sched_ev);
+  /* Switch, if needed */
+  scheduler_t *old_scheduler = scheduler;
+  if (scheduler_should_use_kist()) {
+    scheduler = get_kist_scheduler();
+    have_kist = 1;
+  } else {
+    scheduler = get_vanilla_scheduler();
+  }
+  tor_assert(scheduler);
+
+  if (old_scheduler != scheduler) {
+    /* Allow the old scheduler to clean up, if needed. */
+    if (old_scheduler && old_scheduler->free_all) {
+      old_scheduler->free_all();
+    }
+    /* We don't clean up the old one, we keep any type of scheduler we've
+     * allocated so we can do an easy switch back. */
+
+    /* Initialize the new scheduler. */
+    if (scheduler->init) {
+      scheduler->init();
+    }
+    log_notice(LD_CONFIG, "Using the %s scheduler.",
+               have_kist ? "KIST" : "vanilla");
+  }
+}
+
+/*
+ * This is how the scheduling system is notified of Tor's configuration
+ * changing. For example: a SIGHUP was issued.
+ */
+void
+scheduler_conf_changed(void)
+{
+  /* Let the scheduler decide what it should do. */
+  set_scheduler();
+
+  /* Then tell the (possibly new) scheduler that we have new options. */
+  if (scheduler->on_new_options) {
+    scheduler->on_new_options();
+  }
+}
+
+/*
+ * Whenever we get a new consensus, this function is called.
+ */
+void
+scheduler_notify_networkstatus_changed(const networkstatus_t *old_c,
+                                       const networkstatus_t *new_c)
+{
+  /* Then tell the (possibly new) scheduler that we have a new consensus */
+  if (scheduler->on_new_consensus) {
+    scheduler->on_new_consensus(old_c, new_c);
+  }
+  /* Maybe the consensus param made us change the scheduler. */
+  set_scheduler();
+}
+
+/*
+ * Free everything scheduling-related from main.c. Note this is only called
+ * when Tor is shutting down, while scheduler_t->free_all() is called both when
+ * Tor is shutting down and when we are switching schedulers.
+ */
+void
+scheduler_free_all(void)
+{
+  log_debug(LD_SCHED, "Shutting down scheduler");
+
+  if (run_sched_ev) {
+    if (event_del(run_sched_ev) < 0) {
+      log_warn(LD_BUG, "Problem deleting run_sched_ev");
+    }
+    tor_event_free(run_sched_ev);
+    run_sched_ev = NULL;
+  }
 
-  /* Run the scheduler */
-  scheduler_run();
+  if (channels_pending) {
+    /* We don't have ownership of the object in this list. */
+    smartlist_free(channels_pending);
+    channels_pending = NULL;
+  }
 
-  /* Do we have more work to do? */
-  if (scheduler_more_work()) scheduler_retrigger();
+  if (scheduler && scheduler->free_all) {
+    scheduler->free_all();
+  }
+  tor_free(scheduler);
+  scheduler = NULL;
 }
 
 /** Mark a channel as no longer ready to accept writes */
@@ -309,8 +419,6 @@ scheduler_channel_doesnt_want_writes,(channel_t *chan))
 MOCK_IMPL(void,
 scheduler_channel_has_waiting_cells,(channel_t *chan))
 {
-  int became_pending = 0;
-
   tor_assert(chan);
   tor_assert(channels_pending);
 
@@ -330,7 +438,9 @@ scheduler_channel_has_waiting_cells,(channel_t *chan))
               "Channel " U64_FORMAT " at %p went from waiting_for_cells "
               "to pending",
               U64_PRINTF_ARG(chan->global_identifier), chan);
-    became_pending = 1;
+    /* If we made a channel pending, we potentially have scheduling work to
+     * do. */
+    scheduler->schedule();
   } else {
     /*
      * It's not in waiting_for_cells, so it can't become pending; it's
@@ -345,16 +455,13 @@ scheduler_channel_has_waiting_cells,(channel_t *chan))
                 U64_PRINTF_ARG(chan->global_identifier), chan);
     }
   }
-
-  /*
-   * If we made a channel pending, we potentially have scheduling work
-   * to do.
-   */
-  if (became_pending) scheduler_retrigger();
 }
 
-/** Set up the scheduling system */
-
+/*
+ * Initialize everything scheduling-related from config.c. Note this is only
+ * called when Tor is starting up, while scheduler_t->init() is called both
+ * when Tor is starting up and when we are switching schedulers.
+ */
 void
 scheduler_init(void)
 {
@@ -363,34 +470,17 @@ scheduler_init(void)
   tor_assert(!run_sched_ev);
   run_sched_ev = tor_event_new(tor_libevent_get_base(), -1,
                                0, scheduler_evt_callback, NULL);
-
   channels_pending = smartlist_new();
-  queue_heuristic = 0;
-  queue_heuristic_timestamp = approx_time();
-}
 
-/** Check if there's more scheduling work */
-
-static int
-scheduler_more_work(void)
-{
-  tor_assert(channels_pending);
-
-  return ((scheduler_get_queue_heuristic() < sched_q_low_water) &&
-          ((smartlist_len(channels_pending) > 0))) ? 1 : 0;
+  set_scheduler();
 }
 
-/** Retrigger the scheduler in a way safe to use from the callback */
-
-static void
-scheduler_retrigger(void)
-{
-  tor_assert(run_sched_ev);
-  event_active(run_sched_ev, EV_TIMEOUT, 1);
-}
-
-/** Notify the scheduler of a channel being closed */
-
+/*
+ * If a channel is going away, this is how the scheduling system is informed
+ * so it can do any freeing necessary. This ultimately calls
+ * scheduler_t->on_channel_free() so the current scheduler can release any
+ * state specific to this channel.
+ */
 MOCK_IMPL(void,
 scheduler_release_channel,(channel_t *chan))
 {
@@ -398,179 +488,29 @@ scheduler_release_channel,(channel_t *chan))
   tor_assert(channels_pending);
 
   if (chan->scheduler_state == SCHED_CHAN_PENDING) {
-    smartlist_pqueue_remove(channels_pending,
-                            scheduler_compare_channels,
-                            offsetof(channel_t, sched_heap_idx),
-                            chan);
-  }
-
-  chan->scheduler_state = SCHED_CHAN_IDLE;
-}
-
-/** Run the scheduling algorithm if necessary */
-
-MOCK_IMPL(void,
-scheduler_run, (void))
-{
-  int n_cells, n_chans_before, n_chans_after;
-  uint64_t q_len_before, q_heur_before, q_len_after, q_heur_after;
-  ssize_t flushed, flushed_this_time;
-  smartlist_t *to_readd = NULL;
-  channel_t *chan = NULL;
-
-  log_debug(LD_SCHED, "We have a chance to run the scheduler");
-
-  if (scheduler_get_queue_heuristic() < sched_q_low_water) {
-    n_chans_before = smartlist_len(channels_pending);
-    q_len_before = channel_get_global_queue_estimate();
-    q_heur_before = scheduler_get_queue_heuristic();
-
-    while (scheduler_get_queue_heuristic() <= sched_q_high_water &&
-           smartlist_len(channels_pending) > 0) {
-      /* Pop off a channel */
-      chan = smartlist_pqueue_pop(channels_pending,
-                                  scheduler_compare_channels,
-                                  offsetof(channel_t, sched_heap_idx));
-      tor_assert(chan);
-
-      /* Figure out how many cells we can write */
-      n_cells = channel_num_cells_writeable(chan);
-      if (n_cells > 0) {
-        log_debug(LD_SCHED,
-                  "Scheduler saw pending channel " U64_FORMAT " at %p with "
-                  "%d cells writeable",
-                  U64_PRINTF_ARG(chan->global_identifier), chan, n_cells);
-
-        flushed = 0;
-        while (flushed < n_cells &&
-               scheduler_get_queue_heuristic() <= sched_q_high_water) {
-          flushed_this_time =
-            channel_flush_some_cells(chan,
-                                     MIN(sched_max_flush_cells,
-                                         (size_t) n_cells - flushed));
-          if (flushed_this_time <= 0) break;
-          flushed += flushed_this_time;
-        }
-
-        if (flushed < n_cells) {
-          /* We ran out of cells to flush */
-          chan->scheduler_state = SCHED_CHAN_WAITING_FOR_CELLS;
-          log_debug(LD_SCHED,
-                    "Channel " U64_FORMAT " at %p "
-                    "entered waiting_for_cells from pending",
-                    U64_PRINTF_ARG(chan->global_identifier),
-                    chan);
-        } else {
-          /* The channel may still have some cells */
-          if (channel_more_to_flush(chan)) {
-          /* The channel goes to either pending or waiting_to_write */
-            if (channel_num_cells_writeable(chan) > 0) {
-              /* Add it back to pending later */
-              if (!to_readd) to_readd = smartlist_new();
-              smartlist_add(to_readd, chan);
-              log_debug(LD_SCHED,
-                        "Channel " U64_FORMAT " at %p "
-                        "is still pending",
-                        U64_PRINTF_ARG(chan->global_identifier),
-                        chan);
-            } else {
-              /* It's waiting to be able to write more */
-              chan->scheduler_state = SCHED_CHAN_WAITING_TO_WRITE;
-              log_debug(LD_SCHED,
-                        "Channel " U64_FORMAT " at %p "
-                        "entered waiting_to_write from pending",
-                        U64_PRINTF_ARG(chan->global_identifier),
-                        chan);
-            }
-          } else {
-            /* No cells left; it can go to idle or waiting_for_cells */
-            if (channel_num_cells_writeable(chan) > 0) {
-              /*
-               * It can still accept writes, so it goes to
-               * waiting_for_cells
-               */
-              chan->scheduler_state = SCHED_CHAN_WAITING_FOR_CELLS;
-              log_debug(LD_SCHED,
-                        "Channel " U64_FORMAT " at %p "
-                        "entered waiting_for_cells from pending",
-                        U64_PRINTF_ARG(chan->global_identifier),
-                        chan);
-            } else {
-              /*
-               * We exactly filled up the output queue with all available
-               * cells; go to idle.
-               */
-              chan->scheduler_state = SCHED_CHAN_IDLE;
-              log_debug(LD_SCHED,
-                        "Channel " U64_FORMAT " at %p "
-                        "become idle from pending",
-                        U64_PRINTF_ARG(chan->global_identifier),
-                        chan);
-            }
-          }
-        }
-
-        log_debug(LD_SCHED,
-                  "Scheduler flushed %d cells onto pending channel "
-                  U64_FORMAT " at %p",
-                  (int)flushed, U64_PRINTF_ARG(chan->global_identifier),
-                  chan);
-      } else {
-        log_info(LD_SCHED,
-                 "Scheduler saw pending channel " U64_FORMAT " at %p with "
-                 "no cells writeable",
-                 U64_PRINTF_ARG(chan->global_identifier), chan);
-        /* Put it back to WAITING_TO_WRITE */
-        chan->scheduler_state = SCHED_CHAN_WAITING_TO_WRITE;
-      }
+    if (smartlist_pos(channels_pending, chan) == -1) {
+      log_warn(LD_SCHED, "Scheduler asked to release channel %" PRIu64 " "
+                         "but it wasn't in channels_pending",
+               chan->global_identifier);
+    } else {
+      smartlist_pqueue_remove(channels_pending,
+                              scheduler_compare_channels,
+                              offsetof(channel_t, sched_heap_idx),
+                              chan);
     }
-
-    /* Readd any channels we need to */
-    if (to_readd) {
-      SMARTLIST_FOREACH_BEGIN(to_readd, channel_t *, readd_chan) {
-        readd_chan->scheduler_state = SCHED_CHAN_PENDING;
-        smartlist_pqueue_add(channels_pending,
-                             scheduler_compare_channels,
-                             offsetof(channel_t, sched_heap_idx),
-                             readd_chan);
-      } SMARTLIST_FOREACH_END(readd_chan);
-      smartlist_free(to_readd);
+    if (scheduler->on_channel_free) {
+      scheduler->on_channel_free(chan);
     }
-
-    n_chans_after = smartlist_len(channels_pending);
-    q_len_after = channel_get_global_queue_estimate();
-    q_heur_after = scheduler_get_queue_heuristic();
-    log_debug(LD_SCHED,
-              "Scheduler handled %d of %d pending channels, queue size from "
-              U64_FORMAT " to " U64_FORMAT ", queue heuristic from "
-              U64_FORMAT " to " U64_FORMAT,
-              n_chans_before - n_chans_after, n_chans_before,
-              U64_PRINTF_ARG(q_len_before), U64_PRINTF_ARG(q_len_after),
-              U64_PRINTF_ARG(q_heur_before), U64_PRINTF_ARG(q_heur_after));
   }
-}
-
-/** Trigger the scheduling event so we run the scheduler later */
-
-#if 0
-static void
-scheduler_trigger(void)
-{
-  log_debug(LD_SCHED, "Triggering scheduler event");
-
-  tor_assert(run_sched_ev);
 
-  event_add(run_sched_ev, EV_TIMEOUT, 1);
+  chan->scheduler_state = SCHED_CHAN_IDLE;
 }
-#endif
 
 /** Mark a channel as ready to accept writes */
 
 void
 scheduler_channel_wants_writes(channel_t *chan)
 {
-  int became_pending = 0;
-
   tor_assert(chan);
   tor_assert(channels_pending);
 
@@ -579,6 +519,8 @@ scheduler_channel_wants_writes(channel_t *chan)
     /*
      * It can write now, so it goes to channels_pending.
      */
+    log_debug(LD_SCHED, "chan=%" PRIu64 " became pending",
+        chan->global_identifier);
     smartlist_pqueue_add(channels_pending,
                          scheduler_compare_channels,
                          offsetof(channel_t, sched_heap_idx),
@@ -588,7 +530,8 @@ scheduler_channel_wants_writes(channel_t *chan)
               "Channel " U64_FORMAT " at %p went from waiting_to_write "
               "to pending",
               U64_PRINTF_ARG(chan->global_identifier), chan);
-    became_pending = 1;
+    /* We just made a channel pending, we have scheduling work to do. */
+    scheduler->schedule();
   } else {
     /*
      * It's not in SCHED_CHAN_WAITING_TO_WRITE, so it can't become pending;
@@ -602,19 +545,13 @@ scheduler_channel_wants_writes(channel_t *chan)
                 U64_PRINTF_ARG(chan->global_identifier), chan);
     }
   }
-
-  /*
-   * If we made a channel pending, we potentially have scheduling work
-   * to do.
-   */
-  if (became_pending) scheduler_retrigger();
 }
 
-/**
- * Notify the scheduler that a channel's position in the pqueue may have
- * changed
- */
+#ifdef TOR_UNIT_TESTS
 
+/*
+ * Notify scheduler that a channel's queue position may have changed.
+ */
 void
 scheduler_touch_channel(channel_t *chan)
 {
@@ -634,115 +571,5 @@ scheduler_touch_channel(channel_t *chan)
   /* else no-op, since it isn't in the queue */
 }
 
-/**
- * Notify the scheduler of a queue size adjustment, to recalculate the
- * queue heuristic.
- */
-
-void
-scheduler_adjust_queue_size(channel_t *chan, int dir, uint64_t adj)
-{
-  time_t now = approx_time();
-
-  log_debug(LD_SCHED,
-            "Queue size adjustment by %s" U64_FORMAT " for channel "
-            U64_FORMAT,
-            (dir >= 0) ? "+" : "-",
-            U64_PRINTF_ARG(adj),
-            U64_PRINTF_ARG(chan->global_identifier));
-
-  /* Get the queue heuristic up to date */
-  scheduler_update_queue_heuristic(now);
-
-  /* Adjust as appropriate */
-  if (dir >= 0) {
-    /* Increasing it */
-    queue_heuristic += adj;
-  } else {
-    /* Decreasing it */
-    if (queue_heuristic > adj) queue_heuristic -= adj;
-    else queue_heuristic = 0;
-  }
-
-  log_debug(LD_SCHED,
-            "Queue heuristic is now " U64_FORMAT,
-            U64_PRINTF_ARG(queue_heuristic));
-}
-
-/**
- * Query the current value of the queue heuristic
- */
-
-STATIC uint64_t
-scheduler_get_queue_heuristic(void)
-{
-  time_t now = approx_time();
-
-  scheduler_update_queue_heuristic(now);
-
-  return queue_heuristic;
-}
-
-/**
- * Adjust the queue heuristic value to the present time
- */
-
-STATIC void
-scheduler_update_queue_heuristic(time_t now)
-{
-  time_t diff;
-
-  if (queue_heuristic_timestamp == 0) {
-    /*
-     * Nothing we can sensibly do; must not have been initted properly.
-     * Oh well.
-     */
-    queue_heuristic_timestamp = now;
-  } else if (queue_heuristic_timestamp < now) {
-    diff = now - queue_heuristic_timestamp;
-    /*
-     * This is a simple exponential age-out; the other proposed alternative
-     * was a linear age-out using the bandwidth history in rephist.c; I'm
-     * going with this out of concern that if an adversary can jam the
-     * scheduler long enough, it would cause the bandwidth to drop to
-     * zero and render the aging mechanism ineffective thereafter.
-     */
-    if (0 <= diff && diff < 64) queue_heuristic >>= diff;
-    else queue_heuristic = 0;
-
-    queue_heuristic_timestamp = now;
-
-    log_debug(LD_SCHED,
-              "Queue heuristic is now " U64_FORMAT,
-              U64_PRINTF_ARG(queue_heuristic));
-  }
-  /* else no update needed, or time went backward */
-}
-
-/**
- * Set scheduler watermarks and flush size
- */
-
-void
-scheduler_set_watermarks(uint32_t lo, uint32_t hi, uint32_t max_flush)
-{
-  /* Sanity assertions - caller should ensure these are true */
-  tor_assert(lo > 0);
-  tor_assert(hi > lo);
-  tor_assert(max_flush > 0);
-
-  sched_q_low_water = lo;
-  sched_q_high_water = hi;
-  sched_max_flush_cells = max_flush;
-}
-
-/* XXXFORTOR Temp def of this func to get this commit to compile. Replace with
- * real func */
-void
-scheduler_notify_networkstatus_changed(const networkstatus_t *old_c,
-                                       const networkstatus_t *new_c)
-{
-  (void) old_c;
-  (void) new_c;
-}
+#endif /* TOR_UNIT_TESTS */
 

+ 149 - 36
src/or/scheduler.h

@@ -1,9 +1,9 @@
-/* * Copyright (c) 2013-2017, The Tor Project, Inc. */
+/* * Copyright (c) 2017, The Tor Project, Inc. */
 /* See LICENSE for licensing information */
 
 /**
  * \file scheduler.h
- * \brief Header file for scheduler.c
+ * \brief Header file for scheduler*.c
  **/
 
 #ifndef TOR_SCHEDULER_H
@@ -13,50 +13,163 @@
 #include "channel.h"
 #include "testsupport.h"
 
-/* Global-visibility scheduler functions */
+/*
+ * A scheduler implementation is a collection of function pointers. If you
+ * would like to add a new scheduler called foo, create scheduler_foo.c,
+ * implement at least the mandatory ones, and implement get_foo_scheduler()
+ * that returns a complete scheduler_t for your foo scheduler. See
+ * scheduler_kist.c for an example.
+ *
+ * These function pointers SHOULD NOT be used anywhere outside of the
+ * scheduling source files. The rest of Tor should communicate with the
+ * scheduling system through the functions near the bottom of this file, and
+ * those functions will call into the current scheduler implementation as
+ * necessary.
+ *
+ * If your scheduler doesn't need to implement something (for example: it
+ * doesn't create any state for itself, thus it has nothing to free when Tor
+ * is shutting down), then set that function pointer to NULL.
+ */
+typedef struct scheduler_s {
+  /* (Optional) To be called when we want to prepare a scheduler for use.
+   * Perhaps Tor just started and we are the lucky chosen scheduler, or
+   * perhaps Tor is switching to this scheduler. No matter the case, this is
+   * where we would prepare any state and initialize parameters. You might
+   * think of this as the opposite of free_all(). */
+  void (*init)(void);
+
+  /* (Optional) To be called when we want to tell the scheduler to delete all
+   * of its state (if any). Perhaps Tor is shutting down or perhaps we are
+   * switching schedulers. */
+  void (*free_all)(void);
+
+  /* (Mandatory) Libevent controls the main event loop in Tor, and this is
+   * where we register with libevent the next execution of run_sched_ev [which
+   * ultimately calls run()]. */
+  void (*schedule)(void);
+
+  /* (Mandatory) This is the heart of a scheduler! This is where the
+   * excitement happens! Here libevent has given us the chance to execute, and
+   * we should do whatever we need to do in order to move some cells from
+   * their circuit queues to output buffers in an intelligent manner. We
+   * should do this quickly. When we are done, we'll try to schedule() ourself
+   * if more work needs to be done to setup the next scehduling run. */
+  void (*run)(void);
+
+  /*
+   * External event not related to the scheduler but that can influence it.
+   */
+
+  /* (Optional) To be called whenever Tor finds out about a new consensus.
+   * First the scheduling system as a whole will react to the new consensus
+   * and change the scheduler if needed. After that, whatever is the (possibly
+   * new) scheduler will call this so it has the chance to react to the new
+   * consensus too. If there's a consensus parameter that your scheduler wants
+   * to keep an eye on, this is where you should check for it.  */
+  void (*on_new_consensus)(const networkstatus_t *old_c,
+                           const networkstatus_t *new_c);
+
+  /* (Optional) To be called when a channel is being freed. Sometimes channels
+   * go away (for example: the relay on the other end is shutting down). If
+   * the scheduler keeps any channel-specific state and has memory to free
+   * when channels go away, implement this and free it here. */
+  void (*on_channel_free)(const channel_t *);
+
+  /* (Optional) To be called whenever Tor is reloading configuration options.
+   * For example: SIGHUP was issued and Tor is rereading its torrc. A
+   * scheduler should use this as an opportunity to parse and cache torrc
+   * options so that it doesn't have to call get_options() all the time. */
+  void (*on_new_options)(void);
+} scheduler_t;
+
+/*****************************************************************************
+ * Globally visible scheduler functions
+ *
+ * These functions are how the rest of Tor communicates with the scheduling
+ * system.
+ *****************************************************************************/
 
-/* Set up and shut down the scheduler from main.c */
-void scheduler_free_all(void);
 void scheduler_init(void);
-MOCK_DECL(void, scheduler_run, (void));
-
-/* Mark channels as having cells or wanting/not wanting writes */
-MOCK_DECL(void,scheduler_channel_doesnt_want_writes,(channel_t *chan));
-MOCK_DECL(void,scheduler_channel_has_waiting_cells,(channel_t *chan));
-void scheduler_channel_wants_writes(channel_t *chan);
-
-/* Notify the scheduler of a channel being closed */
-MOCK_DECL(void,scheduler_release_channel,(channel_t *chan));
-
-/* Notify scheduler of queue size adjustments */
-void scheduler_adjust_queue_size(channel_t *chan, int dir, uint64_t adj);
-
-/* Notify scheduler that a channel's queue position may have changed */
-void scheduler_touch_channel(channel_t *chan);
-
-/* Adjust the watermarks from config file*/
-void scheduler_set_watermarks(uint32_t lo, uint32_t hi, uint32_t max_flush);
-
-/* XXXFORTOR Temp def of this func to get this commit to compile. Replace with
- * real func */
+void scheduler_free_all(void);
+void scheduler_conf_changed(void);
 void scheduler_notify_networkstatus_changed(const networkstatus_t *old_c,
                                             const networkstatus_t *new_c);
+MOCK_DECL(void, scheduler_release_channel, (channel_t *chan));
 
-/* Things only scheduler.c and its test suite should see */
-
+/*
+ * Ways for a channel to interact with the scheduling system. A channel only
+ * really knows (i) whether or not it has cells it wants to send, and
+ * (ii) whether or not it would like to write.
+ */
+void scheduler_channel_wants_writes(channel_t *chan);
+MOCK_DECL(void, scheduler_channel_doesnt_want_writes, (channel_t *chan));
+MOCK_DECL(void, scheduler_channel_has_waiting_cells, (channel_t *chan));
+
+/*****************************************************************************
+ * Private scheduler functions
+ *
+ * These functions are only visible to the scheduling system, the current
+ * scheduler implementation, and tests.
+ *****************************************************************************/
 #ifdef SCHEDULER_PRIVATE_
-MOCK_DECL(STATIC int, scheduler_compare_channels,
+
+/*********************************
+ * Defined in scheduler.c
+ *********************************/
+int scheduler_should_use_kist(void);
+smartlist_t *get_channels_pending(void);
+struct event *get_run_sched_ev(void);
+MOCK_DECL(int, scheduler_compare_channels,
           (const void *c1_v, const void *c2_v));
-STATIC uint64_t scheduler_get_queue_heuristic(void);
-STATIC void scheduler_update_queue_heuristic(time_t now);
 
 #ifdef TOR_UNIT_TESTS
 extern smartlist_t *channels_pending;
 extern struct event *run_sched_ev;
-extern uint64_t queue_heuristic;
-extern time_t queue_heuristic_timestamp;
-#endif
-#endif
+extern scheduler_t *scheduler;
+void scheduler_touch_channel(channel_t *chan);
+#endif /* TOR_UNIT_TESTS */
+
+/*********************************
+ * Defined in scheduler_kist.c
+ *********************************/
+
+/* Socke table entry which holds information of a channel's socket and kernel
+ * TCP information. Only used by KIST. */
+typedef struct socket_table_ent_s {
+  HT_ENTRY(socket_table_ent_s) node;
+  const channel_t *chan;
+  /* Amount written this scheduling run */
+  uint64_t written;
+  /* Amount that can be written this scheduling run */
+  uint64_t limit;
+  /* TCP info from the kernel */
+  uint32_t cwnd;
+  uint32_t unacked;
+  uint32_t mss;
+  uint32_t notsent;
+} socket_table_ent_t;
+
+typedef HT_HEAD(outbuf_table_s, outbuf_table_ent_s) outbuf_table_t;
+
+MOCK_DECL(int, channel_should_write_to_kernel,
+          (outbuf_table_t *table, channel_t *chan));
+MOCK_DECL(void, channel_write_to_kernel, (channel_t *chan));
+MOCK_DECL(void, update_socket_info_impl, (socket_table_ent_t *ent));
+
+scheduler_t *get_kist_scheduler(void);
+int32_t kist_scheduler_run_interval(const networkstatus_t *ns);
+
+#ifdef TOR_UNIT_TESTS
+extern int32_t sched_run_interval;
+#endif /* TOR_UNIT_TESTS */
+
+/*********************************
+ * Defined in scheduler_vanilla.c
+ *********************************/
+
+scheduler_t *get_vanilla_scheduler(void);
+
+#endif /* SCHEDULER_PRIVATE_ */
 
-#endif /* !defined(TOR_SCHEDULER_H) */
+#endif /* TOR_SCHEDULER_H */
 

+ 585 - 0
src/or/scheduler_kist.c

@@ -1,5 +1,590 @@
 /* Copyright (c) 2017, The Tor Project, Inc. */
 /* See LICENSE for licensing information */
 
+#include <event2/event.h>
+#include <netinet/tcp.h>
+
+#include "or.h"
+#include "buffers.h"
+#include "config.h"
+#include "connection.h"
+#include "networkstatus.h"
+#define TOR_CHANNEL_INTERNAL_
+#include "channel.h"
+#include "channeltls.h"
+#define SCHEDULER_PRIVATE_
 #include "scheduler.h"
 
+#define TLS_PER_CELL_OVERHEAD 29
+
+/* Kernel interface needed for KIST. */
+#include <linux/sockios.h>
+
+/*****************************************************************************
+ * Data structures and supporting functions
+ *****************************************************************************/
+
+/* Socket_table hash table stuff. The socket_table keeps track of per-socket
+ * limit information imposed by kist and used by kist. */
+
+static uint32_t
+socket_table_ent_hash(const socket_table_ent_t *ent)
+{
+  return (uint32_t)ent->chan->global_identifier;
+}
+
+static unsigned
+socket_table_ent_eq(const socket_table_ent_t *a, const socket_table_ent_t *b)
+{
+  return a->chan->global_identifier == b->chan->global_identifier;
+}
+
+typedef HT_HEAD(socket_table_s, socket_table_ent_s) socket_table_t;
+
+static socket_table_t socket_table = HT_INITIALIZER();
+
+HT_PROTOTYPE(socket_table_s, socket_table_ent_s, node, socket_table_ent_hash,
+             socket_table_ent_eq)
+HT_GENERATE2(socket_table_s, socket_table_ent_s, node, socket_table_ent_hash,
+             socket_table_ent_eq, 0.6, tor_reallocarray, tor_free_)
+
+/* outbuf_table hash table stuff. The outbuf_table keeps track of which
+ * channels have data sitting in their outbuf so the kist scheduler can force
+ * a write from outbuf to kernel periodically during a run and at the end of a
+ * run. */
+
+typedef struct outbuf_table_ent_s {
+  HT_ENTRY(outbuf_table_ent_s) node;
+  channel_t *chan;
+} outbuf_table_ent_t;
+
+static uint32_t
+outbuf_table_ent_hash(const outbuf_table_ent_t *ent)
+{
+  return (uint32_t)ent->chan->global_identifier;
+}
+
+static unsigned
+outbuf_table_ent_eq(const outbuf_table_ent_t *a, const outbuf_table_ent_t *b)
+{
+  return a->chan->global_identifier == b->chan->global_identifier;
+}
+
+static outbuf_table_t outbuf_table = HT_INITIALIZER();
+
+HT_PROTOTYPE(outbuf_table_s, outbuf_table_ent_s, node, outbuf_table_ent_hash,
+             outbuf_table_ent_eq)
+HT_GENERATE2(outbuf_table_s, outbuf_table_ent_s, node, outbuf_table_ent_hash,
+             outbuf_table_ent_eq, 0.6, tor_reallocarray, tor_free_)
+
+/*****************************************************************************
+ * Other internal data
+ *****************************************************************************/
+
+static struct timeval scheduler_last_run = {0, 0};
+static double sock_buf_size_factor = 1.0;
+STATIC int32_t sched_run_interval = 10;
+static scheduler_t *kist_scheduler = NULL;
+
+/*****************************************************************************
+ * Internally called function implementations
+ *****************************************************************************/
+
+/* Little helper function to get the length of a channel's output buffer */
+static inline size_t
+channel_outbuf_length(channel_t *chan)
+{
+  return buf_datalen(TO_CONN(BASE_CHAN_TO_TLS(chan)->conn)->outbuf);
+}
+
+/* Little helper function for HT_FOREACH_FN. */
+static int
+each_channel_write_to_kernel(outbuf_table_ent_t *ent, void *data)
+{
+  (void) data; /* Make compiler happy. */
+  channel_write_to_kernel(ent->chan);
+  return 0; /* Returning non-zero removes the element from the table. */
+}
+
+/* Free the given outbuf table entry ent. */
+static int
+free_outbuf_info_by_ent(outbuf_table_ent_t *ent, void *data)
+{
+  (void) data; /* Make compiler happy. */
+  log_debug(LD_SCHED, "Freeing outbuf table entry from chan=%" PRIu64,
+            ent->chan->global_identifier);
+  tor_free(ent);
+  return 1; /* So HT_FOREACH_FN will remove the element */
+}
+
+/* Clean up outbuf_table. Probably because the KIST sched impl is going away */
+static void
+free_all_outbuf_info(void)
+{
+  HT_FOREACH_FN(outbuf_table_s, &outbuf_table, free_outbuf_info_by_ent, NULL);
+}
+
+/* Free the given socket table entry ent. */
+static int
+free_socket_info_by_ent(socket_table_ent_t *ent, void *data)
+{
+  (void) data; /* Make compiler happy. */
+  log_debug(LD_SCHED, "Freeing socket table entry from chan=%" PRIu64,
+            ent->chan->global_identifier);
+  tor_free(ent);
+  return 1; /* So HT_FOREACH_FN will remove the element */
+}
+
+/* Clean up socket_table. Probably because the KIST sched impl is going away */
+static void
+free_all_socket_info(void)
+{
+  HT_FOREACH_FN(socket_table_s, &socket_table, free_socket_info_by_ent, NULL);
+}
+
+static socket_table_ent_t *
+socket_table_search(socket_table_t *table, const channel_t *chan)
+{
+  socket_table_ent_t search, *ent = NULL;
+  search.chan = chan;
+  ent = HT_FIND(socket_table_s, table, &search);
+  return ent;
+}
+
+/* Free a socket entry in table for the given chan. */
+static void
+free_socket_info_by_chan(socket_table_t *table, const channel_t *chan)
+{
+  socket_table_ent_t *ent = NULL;
+  ent = socket_table_search(table, chan);
+  if (!ent)
+    return;
+  log_debug(LD_SCHED, "scheduler free socket info for chan=%" PRIu64,
+            chan->global_identifier);
+  HT_REMOVE(socket_table_s, table, ent);
+  free_socket_info_by_ent(ent, NULL);
+}
+
+MOCK_IMPL(void,
+update_socket_info_impl, (socket_table_ent_t *ent))
+{
+  int64_t tcp_space, extra_space;
+  const tor_socket_t sock =
+    TO_CONN(BASE_CHAN_TO_TLS((channel_t *) ent->chan)->conn)->s;
+  struct tcp_info tcp;
+  socklen_t tcp_info_len = sizeof(tcp);
+
+  /* Gather information */
+  getsockopt(sock, SOL_TCP, TCP_INFO, (void *)&(tcp), &tcp_info_len);
+  ioctl(sock, SIOCOUTQNSD, &(ent->notsent));
+  ent->cwnd = tcp.tcpi_snd_cwnd;
+  ent->unacked = tcp.tcpi_unacked;
+  ent->mss = tcp.tcpi_snd_mss;
+
+  tcp_space = (ent->cwnd - ent->unacked) * ent->mss;
+  if (tcp_space < 0) {
+    tcp_space = 0;
+  }
+  extra_space =
+    clamp_double_to_int64((ent->cwnd * ent->mss) * sock_buf_size_factor) -
+    ent->notsent;
+  if (extra_space < 0) {
+    extra_space = 0;
+  }
+  ent->limit = tcp_space + extra_space;
+  return;
+}
+
+/* Given a socket that isn't in the table, add it.
+ * Given a socket that is in the table, reinit values that need init-ing
+ * every scheduling run
+ */
+static void
+init_socket_info(socket_table_t *table, const channel_t *chan)
+{
+  socket_table_ent_t *ent = NULL;
+  ent = socket_table_search(table, chan);
+  if (!ent) {
+    log_debug(LD_SCHED, "scheduler init socket info for chan=%" PRIu64,
+              chan->global_identifier);
+    ent = tor_malloc_zero(sizeof(*ent));
+    ent->chan = chan;
+    HT_INSERT(socket_table_s, table, ent);
+  }
+  ent->written = 0;
+}
+
+/* Add chan to the outbuf table if it isn't already in it. If it is, then don't
+ * do anything */
+static void
+outbuf_table_add(outbuf_table_t *table, channel_t *chan)
+{
+  outbuf_table_ent_t search, *ent;
+  search.chan = chan;
+  ent = HT_FIND(outbuf_table_s, table, &search);
+  if (!ent) {
+    log_debug(LD_SCHED, "scheduler init outbuf info for chan=%" PRIu64,
+              chan->global_identifier);
+    ent = tor_malloc_zero(sizeof(*ent));
+    ent->chan = chan;
+    HT_INSERT(outbuf_table_s, table, ent);
+  }
+}
+
+static void
+outbuf_table_remove(outbuf_table_t *table, channel_t *chan)
+{
+  outbuf_table_ent_t search, *ent;
+  search.chan = chan;
+  ent = HT_FIND(outbuf_table_s, table, &search);
+  if (ent) {
+    HT_REMOVE(outbuf_table_s, table, ent);
+    free_outbuf_info_by_ent(ent, NULL);
+  }
+}
+
+/* Set the scheduler running interval. */
+static void
+set_scheduler_run_interval(const networkstatus_t *ns)
+{
+  int32_t old_sched_run_interval = sched_run_interval;
+  sched_run_interval = kist_scheduler_run_interval(ns);
+  if (old_sched_run_interval != sched_run_interval) {
+    log_info(LD_SCHED, "Scheduler KIST changing its running interval "
+                       "from %" PRId32 " to %" PRId32,
+             old_sched_run_interval, sched_run_interval);
+  }
+}
+
+/* Return true iff the channel associated socket can write to the kernel that
+ * is hasn't reach the limit. */
+static int
+socket_can_write(socket_table_t *table, const channel_t *chan)
+{
+  socket_table_ent_t *ent = NULL;
+  ent = socket_table_search(table, chan);
+  tor_assert(ent);
+
+  int64_t kist_limit_space =
+    (int64_t) (ent->limit - ent->written) /
+    (CELL_MAX_NETWORK_SIZE + TLS_PER_CELL_OVERHEAD);
+  return kist_limit_space > 0;
+}
+
+/* Update the channel's socket kernel information. */
+static void
+update_socket_info(socket_table_t *table, const channel_t *chan)
+{
+  socket_table_ent_t *ent = NULL;
+  ent = socket_table_search(table, chan);
+  tor_assert(ent);
+  update_socket_info_impl(ent);
+}
+
+/* Increament the channel's socket written value by the number of bytes. */
+static void
+update_socket_written(socket_table_t *table, channel_t *chan, size_t bytes)
+{
+  socket_table_ent_t *ent = NULL;
+  ent = socket_table_search(table, chan);
+  tor_assert(ent);
+
+  log_debug(LD_SCHED, "chan=%" PRIu64 " wrote %lu bytes, old was %" PRIi64,
+            chan->global_identifier, bytes, ent->written);
+
+  ent->written += bytes;
+}
+
+/*
+ * A naive KIST impl would write every single cell all the way to the kernel.
+ * That would take a lot of system calls. A less bad KIST impl would write a
+ * channel's outbuf to the kernel only when we are switching to a different
+ * channel. But if we have two channels with equal priority, we end up writing
+ * one cell for each and bouncing back and forth. This KIST impl avoids that
+ * by only writing a channel's outbuf to the kernel if it has 8 cells or more
+ * in it.
+ */
+MOCK_IMPL(int, channel_should_write_to_kernel,
+          (outbuf_table_t *table, channel_t *chan))
+{
+  outbuf_table_add(table, chan);
+  /* CELL_MAX_NETWORK_SIZE * 8 because we only want to write the outbuf to the
+   * kernel if there's 8 or more cells waiting */
+  return channel_outbuf_length(chan) > (CELL_MAX_NETWORK_SIZE * 8);
+}
+
+/* Little helper function to write a channel's outbuf all the way to the
+ * kernel */
+MOCK_IMPL(void, channel_write_to_kernel, (channel_t *chan))
+{
+  log_debug(LD_SCHED, "Writing %lu bytes to kernel for chan %" PRIu64,
+            channel_outbuf_length(chan), chan->global_identifier);
+  connection_handle_write(TO_CONN(BASE_CHAN_TO_TLS(chan)->conn), 0);
+}
+
+/* Return true iff the scheduler has work to perform. */
+static int
+have_work(void)
+{
+  smartlist_t *cp = get_channels_pending();
+  tor_assert(cp);
+  return smartlist_len(cp) > 0;
+}
+
+/* Function of the scheduler interface: free_all() */
+static void
+kist_free_all(void)
+{
+  free_all_outbuf_info();
+  free_all_socket_info();
+}
+
+/* Function of the scheduler interface: on_channel_free() */
+static void
+kist_on_channel_free(const channel_t *chan)
+{
+  free_socket_info_by_chan(&socket_table, chan);
+}
+
+/* Function of the scheduler interface: on_new_consensus() */
+static void
+kist_scheduler_on_new_consensus(const networkstatus_t *old_c,
+                                const networkstatus_t *new_c)
+{
+  (void) old_c;
+  (void) new_c;
+
+  set_scheduler_run_interval(new_c);
+}
+
+/* Function of the scheduler interface: run() */
+static void
+kist_scheduler_on_new_options(void)
+{
+  sock_buf_size_factor = get_options()->KISTSockBufSizeFactor;
+
+  /* Calls kist_scheduler_run_interval which calls get_options(). */
+  set_scheduler_run_interval(NULL);
+}
+
+/* Function of the scheduler interface: init() */
+static void
+kist_scheduler_init(void)
+{
+  kist_scheduler_on_new_options();
+  tor_assert(sched_run_interval > 0);
+}
+
+/* Function of the scheduler interface: schedule() */
+static void
+kist_scheduler_schedule(void)
+{
+  struct timeval now, next_run;
+  int32_t diff;
+  struct event *ev = get_run_sched_ev();
+  tor_assert(ev);
+  if (!have_work()) {
+    return;
+  }
+  tor_gettimeofday(&now);
+  diff = (int32_t) tv_mdiff(&scheduler_last_run, &now);
+  if (diff < sched_run_interval) {
+    next_run.tv_sec = 0;
+    /* 1000 for ms -> us */
+    next_run.tv_usec = (sched_run_interval - diff) * 1000;
+    /* Readding an event reschedules it. It does not duplicate it. */
+    event_add(ev, &next_run);
+  } else {
+    event_active(ev, EV_TIMEOUT, 1);
+  }
+}
+
+/* Function of the scheduler interface: run() */
+static void
+kist_scheduler_run(void)
+{
+  /* Define variables */
+  channel_t *chan = NULL; // current working channel
+  /* The last distinct chan served in a sched loop. */
+  channel_t *prev_chan = NULL;
+  int flush_result; // temporarily store results from flush calls
+  /* Channels to be readding to pending at the end */
+  smartlist_t *to_readd = NULL;
+  smartlist_t *cp = get_channels_pending();
+
+  /* For each pending channel, collect new kernel information */
+  SMARTLIST_FOREACH_BEGIN(cp, const channel_t *, pchan) {
+      init_socket_info(&socket_table, pchan);
+      update_socket_info(&socket_table, pchan);
+  } SMARTLIST_FOREACH_END(pchan);
+
+  log_debug(LD_SCHED, "Running the scheduler. %d channels pending",
+            smartlist_len(cp));
+
+  /* The main scheduling loop. Loop until there are no more pending channels */
+  while (smartlist_len(cp) > 0) {
+    /* get best channel */
+    chan = smartlist_pqueue_pop(cp, scheduler_compare_channels,
+                                offsetof(channel_t, sched_heap_idx));
+    tor_assert(chan);
+    outbuf_table_add(&outbuf_table, chan);
+
+    /* if we have switched to a new channel, consider writing the previous
+     * channel's outbuf to the kernel. */
+    if (!prev_chan) prev_chan = chan;
+    if (prev_chan != chan) {
+      if (channel_should_write_to_kernel(&outbuf_table, prev_chan)) {
+        channel_write_to_kernel(prev_chan);
+        outbuf_table_remove(&outbuf_table, prev_chan);
+      }
+      prev_chan = chan;
+    }
+
+    /* Only flush and write if the per-socket limit hasn't been hit */
+    if (socket_can_write(&socket_table, chan)) {
+      /* flush to channel queue/outbuf */
+      flush_result = (int)channel_flush_some_cells(chan, 1); // 1 for num cells
+      /* flush_result has the # cells flushed */
+      if (flush_result > 0) {
+        update_socket_written(&socket_table, chan, flush_result *
+                              (CELL_MAX_NETWORK_SIZE + TLS_PER_CELL_OVERHEAD));
+      }
+      /* XXX What if we didn't flush? */
+    }
+
+    /* Decide what to do with the channel now */
+
+    if (!channel_more_to_flush(chan) &&
+        !socket_can_write(&socket_table, chan)) {
+
+      /* Case 1: no more cells to send, and cannot write */
+
+      /*
+       * You might think we should put the channel in SCHED_CHAN_IDLE. And
+       * you're probably correct. While implementing KIST, we found that the
+       * scheduling system would sometimes lose track of channels when we did
+       * that. We suspect it has to do with the difference between "can't
+       * write because socket/outbuf is full" and KIST's "can't write because
+       * we've arbitrarily decided that that's enough for now." Sometimes
+       * channels run out of cells at the same time they hit their
+       * kist-imposed write limit and maybe the rest of Tor doesn't put the
+       * channel back in pending when it is supposed to.
+       *
+       * This should be investigated again. It is as simple as changing
+       * SCHED_CHAN_WAITING_FOR_CELLS to SCHED_CHAN_IDLE and seeing if Tor
+       * starts having serious throughput issues. Best done in shadow/chutney.
+       */
+      chan->scheduler_state = SCHED_CHAN_WAITING_FOR_CELLS;
+      log_debug(LD_SCHED, "chan=%" PRIu64 " now waiting_for_cells",
+                chan->global_identifier);
+    } else if (!channel_more_to_flush(chan)) {
+
+      /* Case 2: no more cells to send, but still open for writes */
+
+      chan->scheduler_state = SCHED_CHAN_WAITING_FOR_CELLS;
+      log_debug(LD_SCHED, "chan=%" PRIu64 " now waiting_for_cells",
+                chan->global_identifier);
+    } else if (!socket_can_write(&socket_table, chan)) {
+
+      /* Case 3: cells to send, but cannot write */
+
+      chan->scheduler_state = SCHED_CHAN_WAITING_TO_WRITE;
+      if (!to_readd)
+        to_readd = smartlist_new();
+      smartlist_add(to_readd, chan);
+      log_debug(LD_SCHED, "chan=%" PRIu64 " now waiting_to_write",
+                chan->global_identifier);
+    } else {
+
+      /* Case 4: cells to send, and still open for writes */
+
+      chan->scheduler_state = SCHED_CHAN_PENDING;
+      smartlist_pqueue_add(cp, scheduler_compare_channels,
+                           offsetof(channel_t, sched_heap_idx), chan);
+    }
+  } /* End of main scheduling loop */
+
+  /* Write the outbuf of any channels that still have data */
+  HT_FOREACH_FN(outbuf_table_s, &outbuf_table, each_channel_write_to_kernel,
+                NULL);
+  free_all_outbuf_info();
+  HT_CLEAR(outbuf_table_s, &outbuf_table);
+
+  log_debug(LD_SCHED, "len pending=%d, len to_readd=%d",
+            smartlist_len(cp),
+            (to_readd ? smartlist_len(to_readd) : -1));
+
+  /* Readd any channels we need to */
+  if (to_readd) {
+    SMARTLIST_FOREACH_BEGIN(to_readd, channel_t *, readd_chan) {
+      readd_chan->scheduler_state = SCHED_CHAN_PENDING;
+      if (!smartlist_contains(cp, readd_chan)) {
+        smartlist_pqueue_add(cp, scheduler_compare_channels,
+                             offsetof(channel_t, sched_heap_idx), readd_chan);
+      }
+    } SMARTLIST_FOREACH_END(readd_chan);
+    smartlist_free(to_readd);
+  }
+
+  tor_gettimeofday(&scheduler_last_run);
+}
+
+/*****************************************************************************
+ * Externally called function implementations not called through scheduler_t
+ *****************************************************************************/
+
+/* Return the KIST scheduler object. If it didn't exists, return a newly
+ * allocated one but init() is not called. */
+scheduler_t *
+get_kist_scheduler(void)
+{
+  if (!kist_scheduler) {
+    log_debug(LD_SCHED, "Allocating kist scheduler struct");
+    kist_scheduler = tor_malloc_zero(sizeof(*kist_scheduler));
+    kist_scheduler->free_all = kist_free_all;
+    kist_scheduler->on_channel_free = kist_on_channel_free;
+    kist_scheduler->init = kist_scheduler_init;
+    kist_scheduler->on_new_consensus = kist_scheduler_on_new_consensus;
+    kist_scheduler->schedule = kist_scheduler_schedule;
+    kist_scheduler->run = kist_scheduler_run;
+    kist_scheduler->on_new_options = kist_scheduler_on_new_options;
+  }
+  return kist_scheduler;
+}
+
+/* Default interval that KIST runs (in ms). */
+#define KIST_SCHED_RUN_INTERVAL_DEFAULT 10
+/* Minimum interval that KIST runs. This value disables KIST. */
+#define KIST_SCHED_RUN_INTERVAL_MIN 0
+/* Maximum interval that KIST runs (in ms). */
+#define KIST_SCHED_RUN_INTERVAL_MAX 100
+
+/* Check the torrc for the configured KIST scheduler run frequency.
+ * - If torrc < 0, then return the negative torrc value (shouldn't even be
+ *   using KIST)
+ * - If torrc > 0, then return the positive torrc value (should use KIST, and
+ *   should use the set value)
+ * - If torrc == 0, then look in the consensus for what the value should be.
+ *   - If == 0, then return -1 (don't use KIST)
+ *   - If > 0, then return the positive consensus value
+ *   - If consensus doesn't say anything, return 10 milliseconds
+ */
+int32_t
+kist_scheduler_run_interval(const networkstatus_t *ns)
+{
+  int32_t run_interval = (int32_t)get_options()->KISTSchedRunInterval;
+  if (run_interval != 0) {
+    log_debug(LD_SCHED, "Found KISTSchedRunInterval in torrc. Using that.");
+    return run_interval;
+  }
+
+  log_debug(LD_SCHED, "Turning to the consensus for KISTSchedRunInterval");
+  run_interval = networkstatus_get_param(ns, "KISTSchedRunInterval",
+                                         KIST_SCHED_RUN_INTERVAL_DEFAULT,
+                                         KIST_SCHED_RUN_INTERVAL_MIN,
+                                         KIST_SCHED_RUN_INTERVAL_MAX);
+  if (run_interval <= 0)
+    return -1;
+  return run_interval;
+}
+

+ 186 - 0
src/or/scheduler_vanilla.c

@@ -1,5 +1,191 @@
 /* Copyright (c) 2017, The Tor Project, Inc. */
 /* See LICENSE for licensing information */
 
+#include <event2/event.h>
+
+#include "or.h"
+#include "config.h"
+#define TOR_CHANNEL_INTERNAL_
+#include "channel.h"
+#define SCHEDULER_PRIVATE_
 #include "scheduler.h"
 
+/*****************************************************************************
+ * Other internal data
+ *****************************************************************************/
+
+/* Maximum cells to flush in a single call to channel_flush_some_cells(); */
+#define MAX_FLUSH_CELLS 1000
+
+static scheduler_t *vanilla_scheduler = NULL;
+
+/*****************************************************************************
+ * Externally called function implementations
+ *****************************************************************************/
+
+/* Return true iff the scheduler has work to perform. */
+static int
+have_work(void)
+{
+  smartlist_t *cp = get_channels_pending();
+  tor_assert(cp);
+  return smartlist_len(cp) > 0;
+}
+
+/** Retrigger the scheduler in a way safe to use from the callback */
+
+static void
+vanilla_scheduler_schedule(void)
+{
+  if (!have_work()) {
+    return;
+  }
+  struct event *ev = get_run_sched_ev();
+  tor_assert(ev);
+  event_active(ev, EV_TIMEOUT, 1);
+}
+
+static void
+vanilla_scheduler_run(void)
+{
+  int n_cells, n_chans_before, n_chans_after;
+  ssize_t flushed, flushed_this_time;
+  smartlist_t *cp = get_channels_pending();
+  smartlist_t *to_readd = NULL;
+  channel_t *chan = NULL;
+
+  log_debug(LD_SCHED, "We have a chance to run the scheduler");
+
+  n_chans_before = smartlist_len(cp);
+
+  while (smartlist_len(cp) > 0) {
+    /* Pop off a channel */
+    chan = smartlist_pqueue_pop(cp,
+                                scheduler_compare_channels,
+                                offsetof(channel_t, sched_heap_idx));
+    tor_assert(chan);
+
+    /* Figure out how many cells we can write */
+    n_cells = channel_num_cells_writeable(chan);
+    if (n_cells > 0) {
+      log_debug(LD_SCHED,
+                "Scheduler saw pending channel " U64_FORMAT " at %p with "
+                "%d cells writeable",
+                U64_PRINTF_ARG(chan->global_identifier), chan, n_cells);
+
+      flushed = 0;
+      while (flushed < n_cells) {
+        flushed_this_time =
+          channel_flush_some_cells(chan,
+                        MIN(MAX_FLUSH_CELLS, (size_t) n_cells - flushed));
+        if (flushed_this_time <= 0) break;
+        flushed += flushed_this_time;
+      }
+
+      if (flushed < n_cells) {
+        /* We ran out of cells to flush */
+        chan->scheduler_state = SCHED_CHAN_WAITING_FOR_CELLS;
+        log_debug(LD_SCHED,
+                  "Channel " U64_FORMAT " at %p "
+                  "entered waiting_for_cells from pending",
+                  U64_PRINTF_ARG(chan->global_identifier),
+                  chan);
+      } else {
+        /* The channel may still have some cells */
+        if (channel_more_to_flush(chan)) {
+        /* The channel goes to either pending or waiting_to_write */
+          if (channel_num_cells_writeable(chan) > 0) {
+            /* Add it back to pending later */
+            if (!to_readd) to_readd = smartlist_new();
+            smartlist_add(to_readd, chan);
+            log_debug(LD_SCHED,
+                      "Channel " U64_FORMAT " at %p "
+                      "is still pending",
+                      U64_PRINTF_ARG(chan->global_identifier),
+                      chan);
+          } else {
+            /* It's waiting to be able to write more */
+            chan->scheduler_state = SCHED_CHAN_WAITING_TO_WRITE;
+            log_debug(LD_SCHED,
+                      "Channel " U64_FORMAT " at %p "
+                      "entered waiting_to_write from pending",
+                      U64_PRINTF_ARG(chan->global_identifier),
+                      chan);
+          }
+        } else {
+          /* No cells left; it can go to idle or waiting_for_cells */
+          if (channel_num_cells_writeable(chan) > 0) {
+            /*
+             * It can still accept writes, so it goes to
+             * waiting_for_cells
+             */
+            chan->scheduler_state = SCHED_CHAN_WAITING_FOR_CELLS;
+            log_debug(LD_SCHED,
+                      "Channel " U64_FORMAT " at %p "
+                      "entered waiting_for_cells from pending",
+                      U64_PRINTF_ARG(chan->global_identifier),
+                      chan);
+          } else {
+            /*
+             * We exactly filled up the output queue with all available
+             * cells; go to idle.
+             */
+            chan->scheduler_state = SCHED_CHAN_IDLE;
+            log_debug(LD_SCHED,
+                      "Channel " U64_FORMAT " at %p "
+                      "become idle from pending",
+                      U64_PRINTF_ARG(chan->global_identifier),
+                      chan);
+          }
+        }
+      }
+
+      log_debug(LD_SCHED,
+                "Scheduler flushed %d cells onto pending channel "
+                U64_FORMAT " at %p",
+                (int)flushed, U64_PRINTF_ARG(chan->global_identifier),
+                chan);
+    } else {
+      log_info(LD_SCHED,
+               "Scheduler saw pending channel " U64_FORMAT " at %p with "
+               "no cells writeable",
+               U64_PRINTF_ARG(chan->global_identifier), chan);
+      /* Put it back to WAITING_TO_WRITE */
+      chan->scheduler_state = SCHED_CHAN_WAITING_TO_WRITE;
+    }
+  }
+
+  /* Readd any channels we need to */
+  if (to_readd) {
+    SMARTLIST_FOREACH_BEGIN(to_readd, channel_t *, readd_chan) {
+      readd_chan->scheduler_state = SCHED_CHAN_PENDING;
+      smartlist_pqueue_add(cp,
+                           scheduler_compare_channels,
+                           offsetof(channel_t, sched_heap_idx),
+                           readd_chan);
+    } SMARTLIST_FOREACH_END(readd_chan);
+    smartlist_free(to_readd);
+  }
+
+  n_chans_after = smartlist_len(cp);
+  log_debug(LD_SCHED, "Scheduler handled %d of %d pending channels",
+            n_chans_before - n_chans_after, n_chans_before);
+}
+
+scheduler_t *
+get_vanilla_scheduler(void)
+{
+  if (!vanilla_scheduler) {
+    log_debug(LD_SCHED, "Initializing vanilla scheduler struct");
+    vanilla_scheduler = tor_malloc_zero(sizeof(*vanilla_scheduler));
+    vanilla_scheduler->free_all = NULL;
+    vanilla_scheduler->on_channel_free = NULL;
+    vanilla_scheduler->init = NULL;
+    vanilla_scheduler->on_new_consensus = NULL;
+    vanilla_scheduler->schedule = vanilla_scheduler_schedule;
+    vanilla_scheduler->run = vanilla_scheduler_run;
+    vanilla_scheduler->on_new_options = NULL;
+  }
+  return vanilla_scheduler;
+}
+

+ 435 - 190
src/test/test_scheduler.c

@@ -9,8 +9,12 @@
 #define TOR_CHANNEL_INTERNAL_
 #define CHANNEL_PRIVATE_
 #include "or.h"
+#include "config.h"
 #include "compat_libevent.h"
 #include "channel.h"
+#include "channeltls.h"
+#include "connection.h"
+#include "networkstatus.h"
 #define SCHEDULER_PRIVATE_
 #include "scheduler.h"
 
@@ -18,53 +22,65 @@
 #include "test.h"
 #include "fakechans.h"
 
-/* Event base for scheduelr tests */
-static struct event_base *mock_event_base = NULL;
-
-/* Statics controlling mocks */
-static circuitmux_t *mock_ccm_tgt_1 = NULL;
-static circuitmux_t *mock_ccm_tgt_2 = NULL;
+/* Shamelessly stolen from compat_libevent.c */
+#define V(major, minor, patch) \
+  (((major) << 24) | ((minor) << 16) | ((patch) << 8))
 
-static circuitmux_t *mock_cgp_tgt_1 = NULL;
-static circuitmux_policy_t *mock_cgp_val_1 = NULL;
-static circuitmux_t *mock_cgp_tgt_2 = NULL;
-static circuitmux_policy_t *mock_cgp_val_2 = NULL;
+/******************************************************************************
+ * Statistical info
+ *****************************************************************************/
 static int scheduler_compare_channels_mock_ctr = 0;
 static int scheduler_run_mock_ctr = 0;
 
-static void channel_flush_some_cells_mock_free_all(void);
-static void channel_flush_some_cells_mock_set(channel_t *chan,
-                                              ssize_t num_cells);
+/******************************************************************************
+ * Utility functions and things we need to mock
+ *****************************************************************************/
+static or_options_t mocked_options;
+static const or_options_t *
+mock_get_options(void)
+{
+  return &mocked_options;
+}
 
-/* Setup for mock event stuff */
-static void mock_event_free_all(void);
-static void mock_event_init(void);
+static void
+clear_options(void)
+{
+  memset(&mocked_options, 0, sizeof(mocked_options));
+}
 
-/* Mocks used by scheduler tests */
-static ssize_t channel_flush_some_cells_mock(channel_t *chan,
-                                             ssize_t num_cells);
-static int circuitmux_compare_muxes_mock(circuitmux_t *cmux_1,
-                                         circuitmux_t *cmux_2);
-static const circuitmux_policy_t * circuitmux_get_policy_mock(
-    circuitmux_t *cmux);
-static int scheduler_compare_channels_mock(const void *c1_v,
-                                           const void *c2_v);
-static void scheduler_run_noop_mock(void);
-static struct event_base * tor_libevent_get_base_mock(void);
-
-/* Scheduler test cases */
-static void test_scheduler_channel_states(void *arg);
-static void test_scheduler_compare_channels(void *arg);
-static void test_scheduler_initfree(void *arg);
-static void test_scheduler_loop(void *arg);
-static void test_scheduler_queue_heuristic(void *arg);
-
-/* Mock event init/free */
+static int32_t
+mock_vanilla_networkstatus_get_param(
+    const networkstatus_t *ns, const char *param_name, int32_t default_val,
+    int32_t min_val, int32_t max_val)
+{
+  (void)ns;
+  (void)default_val;
+  (void)min_val;
+  (void)max_val;
+  // only support KISTSchedRunInterval right now
+  tor_assert(strcmp(param_name, "KISTSchedRunInterval")==0);
+  return -1;
+}
 
-/* Shamelessly stolen from compat_libevent.c */
-#define V(major, minor, patch) \
-  (((major) << 24) | ((minor) << 16) | ((patch) << 8))
+static int32_t
+mock_kist_networkstatus_get_param(
+    const networkstatus_t *ns, const char *param_name, int32_t default_val,
+    int32_t min_val, int32_t max_val)
+{
+  (void)ns;
+  (void)default_val;
+  (void)min_val;
+  (void)max_val;
+  // only support KISTSchedRunInterval right now
+  tor_assert(strcmp(param_name, "KISTSchedRunInterval")==0);
+  return 12;
+}
 
+/* Event base for scheduelr tests */
+static struct event_base *mock_event_base = NULL;
+/* Setup for mock event stuff */
+static void mock_event_free_all(void);
+static void mock_event_init(void);
 static void
 mock_event_free_all(void)
 {
@@ -110,7 +126,84 @@ mock_event_init(void)
   return;
 }
 
-/* Mocks */
+static struct event_base *
+tor_libevent_get_base_mock(void)
+{
+  return mock_event_base;
+}
+
+static int
+scheduler_compare_channels_mock(const void *c1_v,
+                                const void *c2_v)
+{
+  uintptr_t p1, p2;
+
+  p1 = (uintptr_t)(c1_v);
+  p2 = (uintptr_t)(c2_v);
+
+  ++scheduler_compare_channels_mock_ctr;
+
+  if (p1 == p2) return 0;
+  else if (p1 < p2) return 1;
+  else return -1;
+}
+
+static void
+scheduler_run_noop_mock(void)
+{
+  ++scheduler_run_mock_ctr;
+}
+
+static circuitmux_t *mock_ccm_tgt_1 = NULL;
+static circuitmux_t *mock_ccm_tgt_2 = NULL;
+static circuitmux_t *mock_cgp_tgt_1 = NULL;
+static circuitmux_policy_t *mock_cgp_val_1 = NULL;
+static circuitmux_t *mock_cgp_tgt_2 = NULL;
+static circuitmux_policy_t *mock_cgp_val_2 = NULL;
+
+static const circuitmux_policy_t *
+circuitmux_get_policy_mock(circuitmux_t *cmux)
+{
+  const circuitmux_policy_t *result = NULL;
+
+  tt_assert(cmux != NULL);
+  if (cmux) {
+    if (cmux == mock_cgp_tgt_1) result = mock_cgp_val_1;
+    else if (cmux == mock_cgp_tgt_2) result = mock_cgp_val_2;
+    else result = circuitmux_get_policy__real(cmux);
+  }
+
+ done:
+  return result;
+}
+
+static int
+circuitmux_compare_muxes_mock(circuitmux_t *cmux_1,
+                              circuitmux_t *cmux_2)
+{
+  int result = 0;
+
+  tt_assert(cmux_1 != NULL);
+  tt_assert(cmux_2 != NULL);
+
+  if (cmux_1 != cmux_2) {
+    if (cmux_1 == mock_ccm_tgt_1 && cmux_2 == mock_ccm_tgt_2) result = -1;
+    else if (cmux_1 == mock_ccm_tgt_2 && cmux_2 == mock_ccm_tgt_1) {
+      result = 1;
+    } else {
+      if (cmux_1 == mock_ccm_tgt_1 || cmux_1 == mock_ccm_tgt_2) result = -1;
+      else if (cmux_2 == mock_ccm_tgt_1 || cmux_2 == mock_ccm_tgt_2) {
+        result = 1;
+      } else {
+        result = circuitmux_compare_muxes__real(cmux_1, cmux_2);
+      }
+    }
+  }
+  /* else result = 0 always */
+
+ done:
+  return result;
+}
 
 typedef struct {
   const channel_t *chan;
@@ -174,6 +267,67 @@ channel_flush_some_cells_mock_set(channel_t *chan, ssize_t num_cells)
   }
 }
 
+static int
+channel_more_to_flush_mock(channel_t *chan)
+{
+  tor_assert(chan);
+
+  flush_mock_channel_t *found_mock_ch = NULL;
+
+  /* Check if we have any queued */
+  if (! TOR_SIMPLEQ_EMPTY(&chan->incoming_queue))
+      return 1;
+
+  SMARTLIST_FOREACH_BEGIN(chans_for_flush_mock,
+                          flush_mock_channel_t *,
+                          flush_mock_ch) {
+    if (flush_mock_ch != NULL && flush_mock_ch->chan != NULL) {
+      if (flush_mock_ch->chan == chan) {
+        /* Found it */
+        found_mock_ch = flush_mock_ch;
+        break;
+      }
+    } else {
+      /* That shouldn't be there... */
+      SMARTLIST_DEL_CURRENT(chans_for_flush_mock, flush_mock_ch);
+      tor_free(flush_mock_ch);
+    }
+  } SMARTLIST_FOREACH_END(flush_mock_ch);
+
+  tor_assert(found_mock_ch);
+
+  /* Check if any circuits would like to queue some */
+  /* special for the mock: return the number of cells (instead of 1), or zero
+   * if nothing to flush */
+  return (found_mock_ch->cells > 0 ? (int)found_mock_ch->cells : 0 );
+}
+
+static void
+channel_write_to_kernel_mock(channel_t *chan)
+{
+  (void)chan;
+  //log_debug(LD_SCHED, "chan=%d writing to kernel",
+  //    (int)chan->global_identifier);
+}
+
+static int
+channel_should_write_to_kernel_mock(outbuf_table_t *ot, channel_t *chan)
+{
+  (void)ot;
+  (void)chan;
+  return 1;
+  /* We could make this more complicated if we wanted. But I don't think doing
+   * so tests much of anything */
+  //static int called_counter = 0;
+  //if (++called_counter >= 3) {
+  //  called_counter -= 3;
+  //  log_debug(LD_SCHED, "chan=%d should write to kernel",
+  //      (int)chan->global_identifier);
+  //  return 1;
+  //}
+  //return 0;
+}
+
 static ssize_t
 channel_flush_some_cells_mock(channel_t *chan, ssize_t num_cells)
 {
@@ -215,11 +369,6 @@ channel_flush_some_cells_mock(channel_t *chan, ssize_t num_cells)
 
         flushed += max;
         found->cells -= max;
-
-        if (found->cells <= 0) {
-          smartlist_remove(chans_for_flush_mock, found);
-          tor_free(found);
-        }
       }
     }
   }
@@ -228,90 +377,25 @@ channel_flush_some_cells_mock(channel_t *chan, ssize_t num_cells)
   return flushed;
 }
 
-static int
-circuitmux_compare_muxes_mock(circuitmux_t *cmux_1,
-                              circuitmux_t *cmux_2)
-{
-  int result = 0;
-
-  tt_ptr_op(cmux_1, OP_NE, NULL);
-  tt_ptr_op(cmux_2, OP_NE, NULL);
-
-  if (cmux_1 != cmux_2) {
-    if (cmux_1 == mock_ccm_tgt_1 && cmux_2 == mock_ccm_tgt_2) result = -1;
-    else if (cmux_1 == mock_ccm_tgt_2 && cmux_2 == mock_ccm_tgt_1) {
-      result = 1;
-    } else {
-      if (cmux_1 == mock_ccm_tgt_1 || cmux_1 == mock_ccm_tgt_2) result = -1;
-      else if (cmux_2 == mock_ccm_tgt_1 || cmux_2 == mock_ccm_tgt_2) {
-        result = 1;
-      } else {
-        result = circuitmux_compare_muxes__real(cmux_1, cmux_2);
-      }
-    }
-  }
-  /* else result = 0 always */
-
- done:
-  return result;
-}
-
-static const circuitmux_policy_t *
-circuitmux_get_policy_mock(circuitmux_t *cmux)
-{
-  const circuitmux_policy_t *result = NULL;
-
-  tt_ptr_op(cmux, OP_NE, NULL);
-  if (cmux) {
-    if (cmux == mock_cgp_tgt_1) result = mock_cgp_val_1;
-    else if (cmux == mock_cgp_tgt_2) result = mock_cgp_val_2;
-    else result = circuitmux_get_policy__real(cmux);
-  }
-
- done:
-  return result;
-}
-
-static int
-scheduler_compare_channels_mock(const void *c1_v,
-                                const void *c2_v)
-{
-  uintptr_t p1, p2;
-
-  p1 = (uintptr_t)(c1_v);
-  p2 = (uintptr_t)(c2_v);
-
-  ++scheduler_compare_channels_mock_ctr;
-
-  if (p1 == p2) return 0;
-  else if (p1 < p2) return 1;
-  else return -1;
-}
-
 static void
-scheduler_run_noop_mock(void)
-{
-  ++scheduler_run_mock_ctr;
-}
-
-static struct event_base *
-tor_libevent_get_base_mock(void)
+update_socket_info_impl_mock(socket_table_ent_t *ent)
 {
-  return mock_event_base;
+  ent->cwnd = ent->unacked = ent->mss = ent->notsent = 0;
+  ent->limit = INT_MAX;
 }
 
-/* Test cases */
-
 static void
-test_scheduler_channel_states(void *arg)
+perform_channel_state_tests(int KISTSchedRunInterval)
 {
   channel_t *ch1 = NULL, *ch2 = NULL;
   int old_count;
 
-  (void)arg;
+  /* setup options so we're sure about what sched we are running */
+  MOCK(get_options, mock_get_options);
+  clear_options();
+  mocked_options.KISTSchedRunInterval = KISTSchedRunInterval;
 
   /* Set up libevent and scheduler */
-
   mock_event_init();
   MOCK(tor_libevent_get_base, tor_libevent_get_base_mock);
   scheduler_init();
@@ -324,7 +408,7 @@ test_scheduler_channel_states(void *arg)
    * Disable scheduler_run so we can just check the state transitions
    * without having to make everything it might call work too.
    */
-  MOCK(scheduler_run, scheduler_run_noop_mock);
+  scheduler->run = scheduler_run_noop_mock;
 
   tt_int_op(smartlist_len(channels_pending), OP_EQ, 0);
 
@@ -351,7 +435,7 @@ test_scheduler_channel_states(void *arg)
   channel_register(ch2);
   tt_assert(ch2->registered);
 
-  /* Send it to SCHED_CHAN_WAITING_TO_WRITE */
+  /* Send ch1 to SCHED_CHAN_WAITING_TO_WRITE */
   scheduler_channel_has_waiting_cells(ch1);
   tt_int_op(ch1->scheduler_state, OP_EQ, SCHED_CHAN_WAITING_TO_WRITE);
 
@@ -415,8 +499,8 @@ test_scheduler_channel_states(void *arg)
   tor_free(ch2);
 
   UNMOCK(scheduler_compare_channels);
-  UNMOCK(scheduler_run);
   UNMOCK(tor_libevent_get_base);
+  UNMOCK(get_options);
 
   return;
 }
@@ -502,40 +586,21 @@ test_scheduler_compare_channels(void *arg)
   return;
 }
 
-static void
-test_scheduler_initfree(void *arg)
-{
-  (void)arg;
-
-  tt_ptr_op(channels_pending, OP_EQ, NULL);
-  tt_ptr_op(run_sched_ev, OP_EQ, NULL);
-
-  mock_event_init();
-  MOCK(tor_libevent_get_base, tor_libevent_get_base_mock);
-
-  scheduler_init();
-
-  tt_ptr_op(channels_pending, OP_NE, NULL);
-  tt_ptr_op(run_sched_ev, OP_NE, NULL);
-
-  scheduler_free_all();
-
-  UNMOCK(tor_libevent_get_base);
-  mock_event_free_all();
-
-  tt_ptr_op(channels_pending, OP_EQ, NULL);
-  tt_ptr_op(run_sched_ev, OP_EQ, NULL);
-
- done:
-  return;
-}
+/******************************************************************************
+ * The actual tests!
+ *****************************************************************************/
 
 static void
-test_scheduler_loop(void *arg)
+test_scheduler_loop_vanilla(void *arg)
 {
+  (void)arg;
   channel_t *ch1 = NULL, *ch2 = NULL;
+  void (*run_func_ptr)(void);
 
-  (void)arg;
+  /* setup options so we're sure about what sched we are running */
+  MOCK(get_options, mock_get_options);
+  clear_options();
+  mocked_options.KISTSchedRunInterval = -1;
 
   /* Set up libevent and scheduler */
 
@@ -551,12 +616,14 @@ test_scheduler_loop(void *arg)
    * Disable scheduler_run so we can just check the state transitions
    * without having to make everything it might call work too.
    */
-  MOCK(scheduler_run, scheduler_run_noop_mock);
+  run_func_ptr = scheduler->run;
+  scheduler->run = scheduler_run_noop_mock;
 
   tt_int_op(smartlist_len(channels_pending), OP_EQ, 0);
 
   /* Set up a fake channel */
   ch1 = new_fake_channel();
+  ch1->magic = TLS_CHAN_MAGIC;
   tt_assert(ch1);
 
   /* Start it off in OPENING */
@@ -574,6 +641,7 @@ test_scheduler_loop(void *arg)
 
   /* Now get another one */
   ch2 = new_fake_channel();
+  ch2->magic = TLS_CHAN_MAGIC;
   tt_assert(ch2);
   ch2->state = CHANNEL_STATE_OPENING;
   ch2->cmux = circuitmux_alloc();
@@ -615,15 +683,9 @@ test_scheduler_loop(void *arg)
 
   /*
    * Now we've got two pending channels and need to fire off
-   * scheduler_run(); first, unmock it.
+   * the scheduler run() that we kept.
    */
-
-  UNMOCK(scheduler_run);
-
-  scheduler_run();
-
-  /* Now re-mock it */
-  MOCK(scheduler_run, scheduler_run_noop_mock);
+  run_func_ptr();
 
   /*
    * Assert that they're still in the states we left and aren't still
@@ -661,15 +723,10 @@ test_scheduler_loop(void *arg)
   channel_flush_some_cells_mock_set(ch2, 48);
 
   /*
-   * And re-run the scheduler_run() loop with non-zero returns from
+   * And re-run the scheduler run() loop with non-zero returns from
    * channel_flush_some_cells() this time.
    */
-  UNMOCK(scheduler_run);
-
-  scheduler_run();
-
-  /* Now re-mock it */
-  MOCK(scheduler_run, scheduler_run_noop_mock);
+  run_func_ptr();
 
   /*
    * ch1 should have gone to SCHED_CHAN_WAITING_FOR_CELLS, with 16 flushed
@@ -707,52 +764,240 @@ test_scheduler_loop(void *arg)
 
   UNMOCK(channel_flush_some_cells);
   UNMOCK(scheduler_compare_channels);
-  UNMOCK(scheduler_run);
   UNMOCK(tor_libevent_get_base);
+  UNMOCK(get_options);
+}
+
+static void
+test_scheduler_loop_kist(void *arg)
+{
+  (void) arg;
+  channel_t *ch1 = new_fake_channel(), *ch2 = new_fake_channel();
+
+  /* setup options so we're sure about what sched we are running */
+  MOCK(get_options, mock_get_options);
+  MOCK(channel_flush_some_cells, channel_flush_some_cells_mock);
+  MOCK(channel_more_to_flush, channel_more_to_flush_mock);
+  MOCK(channel_write_to_kernel, channel_write_to_kernel_mock);
+  MOCK(channel_should_write_to_kernel, channel_should_write_to_kernel_mock);
+  MOCK(update_socket_info_impl, update_socket_info_impl_mock);
+  clear_options();
+  mocked_options.KISTSchedRunInterval = 11;
+  scheduler_init();
+
+  tt_assert(ch1);
+  ch1->magic = TLS_CHAN_MAGIC;
+  ch1->state = CHANNEL_STATE_OPENING;
+  ch1->cmux = circuitmux_alloc();
+  channel_register(ch1);
+  tt_assert(ch1->registered);
+  channel_change_state_open(ch1);
+  scheduler_channel_has_waiting_cells(ch1);
+  scheduler_channel_wants_writes(ch1);
+  channel_flush_some_cells_mock_set(ch1, 5);
+
+  tt_assert(ch2);
+  ch2->magic = TLS_CHAN_MAGIC;
+  ch2->state = CHANNEL_STATE_OPENING;
+  ch2->cmux = circuitmux_alloc();
+  channel_register(ch2);
+  tt_assert(ch2->registered);
+  channel_change_state_open(ch2);
+  scheduler_channel_has_waiting_cells(ch2);
+  scheduler_channel_wants_writes(ch2);
+  channel_flush_some_cells_mock_set(ch2, 5);
+
+  scheduler->run();
+
+  scheduler_channel_has_waiting_cells(ch1);
+  channel_flush_some_cells_mock_set(ch1, 5);
+
+  scheduler->run();
+
+  scheduler_channel_has_waiting_cells(ch1);
+  channel_flush_some_cells_mock_set(ch1, 5);
+  scheduler_channel_has_waiting_cells(ch2);
+  channel_flush_some_cells_mock_set(ch2, 5);
+
+  scheduler->run();
+
+  channel_flush_some_cells_mock_free_all();
+  tt_int_op(1,==,1);
+
+ done:
+  /* Prep the channel so the free() function doesn't explode. */
+  ch1->state = ch2->state = CHANNEL_STATE_CLOSED;
+  ch1->registered = ch2->registered = 0;
+  channel_free(ch1);
+  channel_free(ch2);
+  UNMOCK(update_socket_info_impl);
+  UNMOCK(channel_should_write_to_kernel);
+  UNMOCK(channel_write_to_kernel);
+  UNMOCK(channel_more_to_flush);
+  UNMOCK(channel_flush_some_cells);
+  UNMOCK(get_options);
+  scheduler_free_all();
+  return;
 }
 
 static void
-test_scheduler_queue_heuristic(void *arg)
+test_scheduler_channel_states(void *arg)
 {
-  time_t now = approx_time();
-  uint64_t qh;
+  (void)arg;
+  perform_channel_state_tests(-1); // vanilla
+  perform_channel_state_tests(11); // kist
+}
 
+static void
+test_scheduler_initfree(void *arg)
+{
   (void)arg;
 
-  queue_heuristic = 0;
-  queue_heuristic_timestamp = 0;
+  tt_ptr_op(channels_pending, ==, NULL);
+  tt_ptr_op(run_sched_ev, ==, NULL);
+
+  mock_event_init();
+  MOCK(tor_libevent_get_base, tor_libevent_get_base_mock);
 
-  /* Not yet inited case */
-  scheduler_update_queue_heuristic(now - 180);
-  tt_u64_op(queue_heuristic, OP_EQ, 0);
-  tt_int_op(queue_heuristic_timestamp, OP_EQ, now - 180);
+  scheduler_init();
 
-  queue_heuristic = 1000000000L;
-  queue_heuristic_timestamp = now - 120;
+  tt_ptr_op(channels_pending, !=, NULL);
+  tt_ptr_op(run_sched_ev, !=, NULL);
+  /* We have specified nothing in the torrc and there's no consensus so the
+   * KIST scheduler is what should be in use */
+  tt_ptr_op(scheduler, ==, get_kist_scheduler());
+  tt_int_op(sched_run_interval, ==, 10);
 
-  scheduler_update_queue_heuristic(now - 119);
-  tt_u64_op(queue_heuristic, OP_EQ, 500000000L);
-  tt_int_op(queue_heuristic_timestamp, OP_EQ, now - 119);
+  scheduler_free_all();
 
-  scheduler_update_queue_heuristic(now - 116);
-  tt_u64_op(queue_heuristic, OP_EQ, 62500000L);
-  tt_int_op(queue_heuristic_timestamp, OP_EQ, now - 116);
+  UNMOCK(tor_libevent_get_base);
+  mock_event_free_all();
 
-  qh = scheduler_get_queue_heuristic();
-  tt_u64_op(qh, OP_EQ, 0);
+  tt_ptr_op(channels_pending, ==, NULL);
+  tt_ptr_op(run_sched_ev, ==, NULL);
 
  done:
   return;
 }
 
+static void
+test_scheduler_should_use_kist(void *arg)
+{
+  (void)arg;
+
+  int res_should, res_freq;
+  MOCK(get_options, mock_get_options);
+
+  /* Test force disabling of KIST */
+  clear_options();
+  mocked_options.KISTSchedRunInterval = -1;
+  res_should = scheduler_should_use_kist();
+  res_freq = kist_scheduler_run_interval(NULL);
+  tt_int_op(res_should, ==, 0);
+  tt_int_op(res_freq, ==, -1);
+
+  /* Test force enabling of KIST */
+  clear_options();
+  mocked_options.KISTSchedRunInterval = 1234;
+  res_should = scheduler_should_use_kist();
+  res_freq = kist_scheduler_run_interval(NULL);
+  tt_int_op(res_should, ==, 1);
+  tt_int_op(res_freq, ==, 1234);
+
+  /* Test defer to consensus, but no consensus available */
+  clear_options();
+  mocked_options.KISTSchedRunInterval = 0;
+  res_should = scheduler_should_use_kist();
+  res_freq = kist_scheduler_run_interval(NULL);
+  tt_int_op(res_should, ==, 1);
+  tt_int_op(res_freq, ==, 10);
+
+  /* Test defer to consensus, and kist consensus available */
+  MOCK(networkstatus_get_param, mock_kist_networkstatus_get_param);
+  clear_options();
+  mocked_options.KISTSchedRunInterval = 0;
+  res_should = scheduler_should_use_kist();
+  res_freq = kist_scheduler_run_interval(NULL);
+  tt_int_op(res_should, ==, 1);
+  tt_int_op(res_freq, ==, 12);
+  UNMOCK(networkstatus_get_param);
+
+  /* Test defer to consensus, and vanilla consensus available */
+  MOCK(networkstatus_get_param, mock_vanilla_networkstatus_get_param);
+  clear_options();
+  mocked_options.KISTSchedRunInterval = 0;
+  res_should = scheduler_should_use_kist();
+  res_freq = kist_scheduler_run_interval(NULL);
+  tt_int_op(res_should, ==, 0);
+  tt_int_op(res_freq, ==, -1);
+  UNMOCK(networkstatus_get_param);
+
+ done:
+  UNMOCK(get_options);
+  return;
+}
+
+static void
+test_scheduler_ns_changed(void *arg)
+{
+  (void) arg;
+
+  /*
+   * Currently no scheduler implementations use the old/new consensuses passed
+   * in scheduler_notify_networkstatus_changed, so it is okay to pass NULL.
+   *
+   * "But then what does test actually exercise???" It tests that
+   * scheduler_notify_networkstatus_changed fetches the correct value from the
+   * consensus, and then switches the scheduler if necessasry.
+   */
+
+  MOCK(get_options, mock_get_options);
+  clear_options();
+
+  tt_ptr_op(scheduler, ==, NULL);
+
+  /* Change from vanilla to kist via consensus */
+  scheduler = get_vanilla_scheduler();
+  MOCK(networkstatus_get_param, mock_kist_networkstatus_get_param);
+  scheduler_notify_networkstatus_changed(NULL, NULL);
+  UNMOCK(networkstatus_get_param);
+  tt_ptr_op(scheduler, ==, get_kist_scheduler());
+
+  /* Change from kist to vanilla via consensus */
+  scheduler = get_kist_scheduler();
+  MOCK(networkstatus_get_param, mock_vanilla_networkstatus_get_param);
+  scheduler_notify_networkstatus_changed(NULL, NULL);
+  UNMOCK(networkstatus_get_param);
+  tt_ptr_op(scheduler, ==, get_vanilla_scheduler());
+
+  /* Doesn't change when using KIST */
+  scheduler = get_kist_scheduler();
+  MOCK(networkstatus_get_param, mock_kist_networkstatus_get_param);
+  scheduler_notify_networkstatus_changed(NULL, NULL);
+  UNMOCK(networkstatus_get_param);
+  tt_ptr_op(scheduler, ==, get_kist_scheduler());
+
+  /* Doesn't change when using vanilla */
+  scheduler = get_vanilla_scheduler();
+  MOCK(networkstatus_get_param, mock_vanilla_networkstatus_get_param);
+  scheduler_notify_networkstatus_changed(NULL, NULL);
+  UNMOCK(networkstatus_get_param);
+  tt_ptr_op(scheduler, ==, get_vanilla_scheduler());
+
+ done:
+  UNMOCK(get_options);
+  return;
+}
+
 struct testcase_t scheduler_tests[] = {
-  { "channel_states", test_scheduler_channel_states, TT_FORK, NULL, NULL },
   { "compare_channels", test_scheduler_compare_channels,
     TT_FORK, NULL, NULL },
+  { "channel_states", test_scheduler_channel_states, TT_FORK, NULL, NULL },
   { "initfree", test_scheduler_initfree, TT_FORK, NULL, NULL },
-  { "loop", test_scheduler_loop, TT_FORK, NULL, NULL },
-  { "queue_heuristic", test_scheduler_queue_heuristic,
-    TT_FORK, NULL, NULL },
+  { "loop_vanilla", test_scheduler_loop_vanilla, TT_FORK, NULL, NULL },
+  { "loop_kist", test_scheduler_loop_kist, TT_FORK, NULL, NULL },
+  { "ns_changed", test_scheduler_ns_changed, TT_FORK, NULL, NULL},
+  { "should_use_kist", test_scheduler_should_use_kist, TT_FORK, NULL, NULL },
   END_OF_TESTCASES
 };