mainloop_pubsub.c 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170
  1. /* Copyright (c) 2001, Matej Pfajfar.
  2. * Copyright (c) 2001-2004, Roger Dingledine.
  3. * Copyright (c) 2004-2006, Roger Dingledine, Nick Mathewson.
  4. * Copyright (c) 2007-2018, The Tor Project, Inc. */
  5. /* See LICENSE for licensing information */
  6. #include "orconfig.h"
  7. #include "core/or/or.h"
  8. #include "core/mainloop/mainloop.h"
  9. #include "core/mainloop/mainloop_pubsub.h"
  10. #include "lib/container/smartlist.h"
  11. #include "lib/dispatch/dispatch.h"
  12. #include "lib/dispatch/dispatch_naming.h"
  13. #include "lib/evloop/compat_libevent.h"
  14. #include "lib/pubsub/pubsub.h"
  15. #include "lib/pubsub/pubsub_build.h"
  16. /**
  17. * Dispatcher to use for delivering messages.
  18. **/
  19. static dispatch_t *the_dispatcher = NULL;
  20. static pubsub_items_t *the_pubsub_items = NULL;
  21. /**
  22. * A list of mainloop_event_t, indexed by channel ID, to flush the messages
  23. * on a channel.
  24. **/
  25. static smartlist_t *alert_events = NULL;
  26. /**
  27. * Mainloop event callback: flush all the messages in a channel.
  28. *
  29. * The channel is encoded as a pointer, and passed via arg.
  30. **/
  31. static void
  32. flush_channel_event(mainloop_event_t *ev, void *arg)
  33. {
  34. (void)ev;
  35. if (!the_dispatcher)
  36. return;
  37. channel_id_t chan = (channel_id_t)(uintptr_t)(arg);
  38. dispatch_flush(the_dispatcher, chan, INT_MAX);
  39. }
  40. /**
  41. * Construct our global pubsub object from <b>builder</b>. Return 0 on
  42. * success, -1 on failure. */
  43. int
  44. tor_mainloop_connect_pubsub(struct pubsub_builder_t *builder)
  45. {
  46. int rv = -1;
  47. tor_mainloop_disconnect_pubsub();
  48. the_dispatcher = pubsub_builder_finalize(builder, &the_pubsub_items);
  49. if (! the_dispatcher)
  50. goto err;
  51. rv = 0;
  52. goto done;
  53. err:
  54. tor_mainloop_disconnect_pubsub();
  55. done:
  56. return rv;
  57. }
  58. /**
  59. * Install libevent events for all of the pubsub channels.
  60. *
  61. * Invoke this after tor_mainloop_connect_pubsub, and after libevent has been
  62. * initialized.
  63. */
  64. void
  65. tor_mainloop_connect_pubsub_events(void)
  66. {
  67. tor_assert(the_dispatcher);
  68. tor_assert(! alert_events);
  69. const size_t num_channels = get_num_channel_ids();
  70. alert_events = smartlist_new();
  71. for (size_t i = 0; i < num_channels; ++i) {
  72. smartlist_add(alert_events,
  73. mainloop_event_postloop_new(flush_channel_event,
  74. (void*)(uintptr_t)(i)));
  75. }
  76. }
  77. /**
  78. * Dispatch alertfn callback: do nothing. Implements DELIV_NEVER.
  79. **/
  80. static void
  81. alertfn_never(dispatch_t *d, channel_id_t chan, void *arg)
  82. {
  83. (void)d;
  84. (void)chan;
  85. (void)arg;
  86. }
  87. /**
  88. * Dispatch alertfn callback: activate a mainloop event. Implements
  89. * DELIV_PROMPT.
  90. **/
  91. static void
  92. alertfn_prompt(dispatch_t *d, channel_id_t chan, void *arg)
  93. {
  94. (void)d;
  95. (void)chan;
  96. mainloop_event_t *event = arg;
  97. mainloop_event_activate(event);
  98. }
  99. /**
  100. * Dispatch alertfn callback: flush all messages right now. Implements
  101. * DELIV_IMMEDIATE.
  102. **/
  103. static void
  104. alertfn_immediate(dispatch_t *d, channel_id_t chan, void *arg)
  105. {
  106. (void) arg;
  107. dispatch_flush(d, chan, INT_MAX);
  108. }
  109. /**
  110. * Set the strategy to be used for delivering messages on the named channel.
  111. *
  112. * This function needs to be called once globally for each channel, to
  113. * set up how messages are delivered.
  114. **/
  115. int
  116. tor_mainloop_set_delivery_strategy(const char *msg_channel_name,
  117. deliv_strategy_t strategy)
  118. {
  119. channel_id_t chan = get_channel_id(msg_channel_name);
  120. if (BUG(chan == ERROR_ID) ||
  121. BUG(chan >= smartlist_len(alert_events)))
  122. return -1;
  123. switch (strategy) {
  124. case DELIV_NEVER:
  125. dispatch_set_alert_fn(the_dispatcher, chan, alertfn_never, NULL);
  126. break;
  127. case DELIV_PROMPT:
  128. dispatch_set_alert_fn(the_dispatcher, chan, alertfn_prompt,
  129. smartlist_get(alert_events, chan));
  130. break;
  131. case DELIV_IMMEDIATE:
  132. dispatch_set_alert_fn(the_dispatcher, chan, alertfn_immediate, NULL);
  133. break;
  134. }
  135. return 0;
  136. }
  137. /**
  138. * Remove all pubsub dispatchers and events from the mainloop.
  139. **/
  140. void
  141. tor_mainloop_disconnect_pubsub(void)
  142. {
  143. if (the_pubsub_items) {
  144. pubsub_items_clear_bindings(the_pubsub_items);
  145. pubsub_items_free(the_pubsub_items);
  146. }
  147. if (alert_events) {
  148. SMARTLIST_FOREACH(alert_events, mainloop_event_t *, ev,
  149. mainloop_event_free(ev));
  150. smartlist_free(alert_events);
  151. }
  152. dispatch_free(the_dispatcher);
  153. }