dispatch_core.c 5.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234
  1. /* Copyright (c) 2001, Matej Pfajfar.
  2. * Copyright (c) 2001-2004, Roger Dingledine.
  3. * Copyright (c) 2004-2006, Roger Dingledine, Nick Mathewson.
  4. * Copyright (c) 2007-2018, The Tor Project, Inc. */
  5. /* See LICENSE for licensing information */
  6. /**
  7. * \file dispatch_core.c
  8. * \brief Core module for sending and receiving messages.
  9. */
  10. #define DISPATCH_PRIVATE
  11. #include "orconfig.h"
  12. #include "lib/dispatch/dispatch.h"
  13. #include "lib/dispatch/dispatch_st.h"
  14. #include "lib/malloc/malloc.h"
  15. #include "lib/log/util_bug.h"
  16. #include <string.h>
  17. /**
  18. * Use <b>d</b> to drop all storage held for <b>msg</b>.
  19. *
  20. * (We need the dispatcher so we know how to free the auxiliary data.)
  21. **/
  22. void
  23. dispatch_free_msg_(const dispatch_t *d, msg_t *msg)
  24. {
  25. if (!msg)
  26. return;
  27. d->typefns[msg->type].free_fn(msg->aux_data__);
  28. tor_free(msg);
  29. }
  30. /**
  31. * Format the auxiliary data held by msg.
  32. **/
  33. char *
  34. dispatch_fmt_msg_data(const dispatch_t *d, const msg_t *msg)
  35. {
  36. if (!msg)
  37. return NULL;
  38. return d->typefns[msg->type].fmt_fn(msg->aux_data__);
  39. }
  40. /**
  41. * Release all storage held by <b>d</b>.
  42. **/
  43. void
  44. dispatch_free_(dispatch_t *d)
  45. {
  46. if (d == NULL)
  47. return;
  48. size_t n_queues = d->n_queues;
  49. for (size_t i = 0; i < n_queues; ++i) {
  50. msg_t *m, *mtmp;
  51. TOR_SIMPLEQ_FOREACH_SAFE(m, &d->queues[i].queue, next, mtmp) {
  52. dispatch_free_msg(d, m);
  53. }
  54. }
  55. size_t n_msgs = d->n_msgs;
  56. for (size_t i = 0; i < n_msgs; ++i) {
  57. tor_free(d->table[i]);
  58. }
  59. tor_free(d->table);
  60. tor_free(d->typefns);
  61. tor_free(d->queues);
  62. // This is the only time we will treat d->cfg as non-const.
  63. //dispatch_cfg_free_((dispatch_items_t *) d->cfg);
  64. tor_free(d);
  65. }
  66. /**
  67. * Tell the dispatcher to call <b>fn</b> with <b>userdata</b> whenever
  68. * <b>chan</b> becomes nonempty. Return 0 on success, -1 on error.
  69. **/
  70. int
  71. dispatch_set_alert_fn(dispatch_t *d, channel_id_t chan,
  72. dispatch_alertfn_t fn, void *userdata)
  73. {
  74. if (BUG(chan >= d->n_queues))
  75. return -1;
  76. dqueue_t *q = &d->queues[chan];
  77. q->alert_fn = fn;
  78. q->alert_fn_arg = userdata;
  79. return 0;
  80. }
  81. /**
  82. * Send a message on the appropriate channel notifying that channel if
  83. * necessary.
  84. *
  85. * This function takes ownership of the auxiliary data; it can't be static or
  86. * stack-allocated, and the caller is not allowed to use it afterwards.
  87. *
  88. * This function does not check the various vields of the message object for
  89. * consistency.
  90. **/
  91. int
  92. dispatch_send(dispatch_t *d,
  93. subsys_id_t sender,
  94. channel_id_t channel,
  95. message_id_t msg,
  96. msg_type_id_t type,
  97. msg_aux_data_t auxdata)
  98. {
  99. if (!d->table[msg]) {
  100. /* Fast path: nobody wants this data. */
  101. d->typefns[type].free_fn(auxdata);
  102. return 0;
  103. }
  104. msg_t *m = tor_malloc(sizeof(msg_t));
  105. m->sender = sender;
  106. m->channel = channel;
  107. m->msg = msg;
  108. m->type = type;
  109. memcpy(&m->aux_data__, &auxdata, sizeof(msg_aux_data_t));
  110. return dispatch_send_msg(d, m);
  111. }
  112. int
  113. dispatch_send_msg(dispatch_t *d, msg_t *m)
  114. {
  115. if (BUG(!d))
  116. goto err;
  117. if (BUG(!m))
  118. goto err;
  119. if (BUG(m->channel >= d->n_queues))
  120. goto err;
  121. if (BUG(m->msg >= d->n_msgs))
  122. goto err;
  123. dtbl_entry_t *ent = d->table[m->msg];
  124. if (ent) {
  125. if (BUG(m->type != ent->type))
  126. goto err;
  127. if (BUG(m->channel != ent->channel))
  128. goto err;
  129. }
  130. return dispatch_send_msg_unchecked(d, m);
  131. err:
  132. /* Probably it isn't safe to free m, since type could be wrong. */
  133. return -1;
  134. }
  135. /**
  136. * Send a message on the appropriate queue, notifying that queue if necessary.
  137. *
  138. * This function takes ownership of the message object and its auxiliary data;
  139. * it can't be static or stack-allocated, and the caller isn't allowed to use
  140. * it afterwards.
  141. *
  142. * This function does not check the various fields of the message object for
  143. * consistency, and can crash if they are out of range. Only functions that
  144. * have already constructed the message in a safe way, or checked it for
  145. * correctness themselves, should call this function.
  146. **/
  147. int
  148. dispatch_send_msg_unchecked(dispatch_t *d, msg_t *m)
  149. {
  150. /* Find the right queue. */
  151. dqueue_t *q = &d->queues[m->channel];
  152. bool was_empty = TOR_SIMPLEQ_EMPTY(&q->queue);
  153. /* Append the message. */
  154. TOR_SIMPLEQ_INSERT_TAIL(&q->queue, m, next);
  155. /* If we just made the queue nonempty for the first time, call the alert
  156. * function. */
  157. if (was_empty) {
  158. q->alert_fn(d, m->channel, q->alert_fn_arg);
  159. }
  160. return 0;
  161. }
  162. /**
  163. * Run all of the callbacks on <b>d</b> associated with <b>m</b>.
  164. **/
  165. static void
  166. dispatcher_run_msg_cbs(const dispatch_t *d, msg_t *m)
  167. {
  168. tor_assert(m->msg <= d->n_msgs);
  169. dtbl_entry_t *ent = d->table[m->msg];
  170. int n_fns = ent->n_fns;
  171. int i;
  172. for (i=0; i < n_fns; ++i) {
  173. if (ent->rcv[i].enabled)
  174. ent->rcv[i].fn(m);
  175. }
  176. }
  177. /**
  178. * Run up to <b>max_msgs</b> callbacks for messages on the channel <b>ch</b>
  179. * on the given dispatcher. Return 0 on success or recoverable failure,
  180. * -1 on unrecoverable error.
  181. **/
  182. int
  183. dispatch_flush(dispatch_t *d, channel_id_t ch, int max_msgs)
  184. {
  185. if (BUG(ch >= d->n_queues))
  186. return 0;
  187. int n_flushed = 0;
  188. dqueue_t *q = &d->queues[ch];
  189. while (n_flushed < max_msgs) {
  190. msg_t *m = TOR_SIMPLEQ_FIRST(&q->queue);
  191. if (!m)
  192. break;
  193. TOR_SIMPLEQ_REMOVE_HEAD(&q->queue, next);
  194. dispatcher_run_msg_cbs(d, m);
  195. dispatch_free_msg(d, m);
  196. ++n_flushed;
  197. }
  198. return 0;
  199. }