dispatch_core.c 6.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260
  1. /* Copyright (c) 2001, Matej Pfajfar.
  2. * Copyright (c) 2001-2004, Roger Dingledine.
  3. * Copyright (c) 2004-2006, Roger Dingledine, Nick Mathewson.
  4. * Copyright (c) 2007-2018, The Tor Project, Inc. */
  5. /* See LICENSE for licensing information */
  6. /**
  7. * \file dispatch_core.c
  8. * \brief Core module for sending and receiving messages.
  9. */
  10. #define DISPATCH_PRIVATE
  11. #include "orconfig.h"
  12. #include "lib/dispatch/dispatch.h"
  13. #include "lib/dispatch/dispatch_st.h"
  14. #include "lib/dispatch/dispatch_naming.h"
  15. #include "lib/malloc/malloc.h"
  16. #include "lib/log/util_bug.h"
  17. #include <string.h>
  18. /**
  19. * Use <b>d</b> to drop all storage held for <b>msg</b>.
  20. *
  21. * (We need the dispatcher so we know how to free the auxiliary data.)
  22. **/
  23. void
  24. dispatch_free_msg_(const dispatch_t *d, msg_t *msg)
  25. {
  26. if (!msg)
  27. return;
  28. d->typefns[msg->type].free_fn(msg->aux_data__);
  29. tor_free(msg);
  30. }
  31. /**
  32. * Format the auxiliary data held by msg.
  33. **/
  34. char *
  35. dispatch_fmt_msg_data(const dispatch_t *d, const msg_t *msg)
  36. {
  37. if (!msg)
  38. return NULL;
  39. return d->typefns[msg->type].fmt_fn(msg->aux_data__);
  40. }
  41. /**
  42. * Release all storage held by <b>d</b>.
  43. **/
  44. void
  45. dispatch_free_(dispatch_t *d)
  46. {
  47. if (d == NULL)
  48. return;
  49. size_t n_queues = d->n_queues;
  50. for (size_t i = 0; i < n_queues; ++i) {
  51. msg_t *m, *mtmp;
  52. TOR_SIMPLEQ_FOREACH_SAFE(m, &d->queues[i].queue, next, mtmp) {
  53. dispatch_free_msg(d, m);
  54. }
  55. }
  56. size_t n_msgs = d->n_msgs;
  57. for (size_t i = 0; i < n_msgs; ++i) {
  58. tor_free(d->table[i]);
  59. }
  60. tor_free(d->table);
  61. tor_free(d->typefns);
  62. tor_free(d->queues);
  63. // This is the only time we will treat d->cfg as non-const.
  64. //dispatch_cfg_free_((dispatch_items_t *) d->cfg);
  65. tor_free(d);
  66. }
  67. /**
  68. * Tell the dispatcher to call <b>fn</b> with <b>userdata</b> whenever
  69. * <b>chan</b> becomes nonempty. Return 0 on success, -1 on error.
  70. **/
  71. int
  72. dispatch_set_alert_fn(dispatch_t *d, channel_id_t chan,
  73. dispatch_alertfn_t fn, void *userdata)
  74. {
  75. if (BUG(chan >= d->n_queues))
  76. return -1;
  77. dqueue_t *q = &d->queues[chan];
  78. q->alert_fn = fn;
  79. q->alert_fn_arg = userdata;
  80. return 0;
  81. }
  82. /**
  83. * Send a message on the appropriate channel notifying that channel if
  84. * necessary.
  85. *
  86. * This function takes ownership of the auxiliary data; it can't be static or
  87. * stack-allocated, and the caller is not allowed to use it afterwards.
  88. *
  89. * This function does not check the various vields of the message object for
  90. * consistency.
  91. **/
  92. int
  93. dispatch_send(dispatch_t *d,
  94. subsys_id_t sender,
  95. channel_id_t channel,
  96. message_id_t msg,
  97. msg_type_id_t type,
  98. msg_aux_data_t auxdata)
  99. {
  100. if (!d->table[msg]) {
  101. /* Fast path: nobody wants this data. */
  102. d->typefns[type].free_fn(auxdata);
  103. return 0;
  104. }
  105. msg_t *m = tor_malloc(sizeof(msg_t));
  106. m->sender = sender;
  107. m->channel = channel;
  108. m->msg = msg;
  109. m->type = type;
  110. memcpy(&m->aux_data__, &auxdata, sizeof(msg_aux_data_t));
  111. return dispatch_send_msg(d, m);
  112. }
  113. int
  114. dispatch_send_msg(dispatch_t *d, msg_t *m)
  115. {
  116. if (BUG(!d))
  117. goto err;
  118. if (BUG(!m))
  119. goto err;
  120. if (BUG(m->channel >= d->n_queues))
  121. goto err;
  122. if (BUG(m->msg >= d->n_msgs))
  123. goto err;
  124. dtbl_entry_t *ent = d->table[m->msg];
  125. if (ent) {
  126. if (BUG(m->type != ent->type))
  127. goto err;
  128. if (BUG(m->channel != ent->channel))
  129. goto err;
  130. }
  131. return dispatch_send_msg_unchecked(d, m);
  132. err:
  133. /* Probably it isn't safe to free m, since type could be wrong. */
  134. return -1;
  135. }
  136. /**
  137. * Send a message on the appropriate queue, notifying that queue if necessary.
  138. *
  139. * This function takes ownership of the message object and its auxiliary data;
  140. * it can't be static or stack-allocated, and the caller isn't allowed to use
  141. * it afterwards.
  142. *
  143. * This function does not check the various fields of the message object for
  144. * consistency, and can crash if they are out of range. Only functions that
  145. * have already constructed the message in a safe way, or checked it for
  146. * correctness themselves, should call this function.
  147. **/
  148. int
  149. dispatch_send_msg_unchecked(dispatch_t *d, msg_t *m)
  150. {
  151. /* Find the right queue. */
  152. dqueue_t *q = &d->queues[m->channel];
  153. bool was_empty = TOR_SIMPLEQ_EMPTY(&q->queue);
  154. /* Append the message. */
  155. TOR_SIMPLEQ_INSERT_TAIL(&q->queue, m, next);
  156. if (debug_logging_enabled()) {
  157. char *arg = dispatch_fmt_msg_data(d, m);
  158. log_debug(LD_MESG,
  159. "Queued: %s (%s) from %s, on %s.",
  160. get_message_id_name(m->msg),
  161. arg,
  162. get_subsys_id_name(m->sender),
  163. get_channel_id_name(m->channel));
  164. tor_free(arg);
  165. }
  166. /* If we just made the queue nonempty for the first time, call the alert
  167. * function. */
  168. if (was_empty) {
  169. q->alert_fn(d, m->channel, q->alert_fn_arg);
  170. }
  171. return 0;
  172. }
  173. /**
  174. * Run all of the callbacks on <b>d</b> associated with <b>m</b>.
  175. **/
  176. static void
  177. dispatcher_run_msg_cbs(const dispatch_t *d, msg_t *m)
  178. {
  179. tor_assert(m->msg <= d->n_msgs);
  180. dtbl_entry_t *ent = d->table[m->msg];
  181. int n_fns = ent->n_fns;
  182. if (debug_logging_enabled()) {
  183. char *arg = dispatch_fmt_msg_data(d, m);
  184. log_debug(LD_MESG,
  185. "Delivering: %s (%s) from %s, on %s:",
  186. get_message_id_name(m->msg),
  187. arg,
  188. get_subsys_id_name(m->sender),
  189. get_channel_id_name(m->channel));
  190. tor_free(arg);
  191. }
  192. int i;
  193. for (i=0; i < n_fns; ++i) {
  194. if (ent->rcv[i].enabled) {
  195. log_debug(LD_MESG, " Delivering to %s.",
  196. get_subsys_id_name(ent->rcv[i].sys));
  197. ent->rcv[i].fn(m);
  198. }
  199. }
  200. }
  201. /**
  202. * Run up to <b>max_msgs</b> callbacks for messages on the channel <b>ch</b>
  203. * on the given dispatcher. Return 0 on success or recoverable failure,
  204. * -1 on unrecoverable error.
  205. **/
  206. int
  207. dispatch_flush(dispatch_t *d, channel_id_t ch, int max_msgs)
  208. {
  209. if (BUG(ch >= d->n_queues))
  210. return 0;
  211. int n_flushed = 0;
  212. dqueue_t *q = &d->queues[ch];
  213. while (n_flushed < max_msgs) {
  214. msg_t *m = TOR_SIMPLEQ_FIRST(&q->queue);
  215. if (!m)
  216. break;
  217. TOR_SIMPLEQ_REMOVE_HEAD(&q->queue, next);
  218. dispatcher_run_msg_cbs(d, m);
  219. dispatch_free_msg(d, m);
  220. ++n_flushed;
  221. }
  222. return 0;
  223. }