pubsub_build.c 7.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307
  1. /* Copyright (c) 2001, Matej Pfajfar.
  2. * Copyright (c) 2001-2004, Roger Dingledine.
  3. * Copyright (c) 2004-2006, Roger Dingledine, Nick Mathewson.
  4. * Copyright (c) 2007-2018, The Tor Project, Inc. */
  5. /* See LICENSE for licensing information */
  6. /**
  7. * @file pubsub_build.c
  8. * @brief Construct a dispatch_t in safer, more OO way.
  9. **/
  10. #define PUBSUB_PRIVATE
  11. #include "lib/dispatch/dispatch.h"
  12. #include "lib/dispatch/dispatch_cfg.h"
  13. #include "lib/dispatch/dispatch_naming.h"
  14. #include "lib/dispatch/msgtypes.h"
  15. #include "lib/pubsub/pubsub_flags.h"
  16. #include "lib/pubsub/pub_binding_st.h"
  17. #include "lib/pubsub/pubsub_build.h"
  18. #include "lib/pubsub/pubsub_builder_st.h"
  19. #include "lib/pubsub/pubsub_connect.h"
  20. #include "lib/container/smartlist.h"
  21. #include "lib/log/util_bug.h"
  22. #include "lib/malloc/malloc.h"
  23. #include <string.h>
  24. /** Construct and return a new empty pubsub_items_t. */
  25. static pubsub_items_t *
  26. pubsub_items_new(void)
  27. {
  28. pubsub_items_t *cfg = tor_malloc_zero(sizeof(*cfg));
  29. cfg->items = smartlist_new();
  30. cfg->type_items = smartlist_new();
  31. return cfg;
  32. }
  33. /** Release all storage held in a pubsub_items_t. */
  34. void
  35. pubsub_items_free_(pubsub_items_t *cfg)
  36. {
  37. if (! cfg)
  38. return;
  39. SMARTLIST_FOREACH(cfg->items, pubsub_cfg_t *, item, tor_free(item));
  40. SMARTLIST_FOREACH(cfg->type_items,
  41. pubsub_type_cfg_t *, item, tor_free(item));
  42. smartlist_free(cfg->items);
  43. smartlist_free(cfg->type_items);
  44. tor_free(cfg);
  45. }
  46. /** Construct and return a new pubsub_builder_t. */
  47. pubsub_builder_t *
  48. pubsub_builder_new(void)
  49. {
  50. dispatch_naming_init();
  51. pubsub_builder_t *pb = tor_malloc_zero(sizeof(*pb));
  52. pb->cfg = dcfg_new();
  53. pb->items = pubsub_items_new();
  54. return pb;
  55. }
  56. /**
  57. * Release all storage held by a pubsub_builder_t.
  58. *
  59. * You'll (mostly) only want to call this function on an error case: if you're
  60. * constructing a dispatch_t instead, you should call
  61. * pubsub_builder_finalize() to consume the pubsub_builder_t.
  62. */
  63. void
  64. pubsub_builder_free_(pubsub_builder_t *pb)
  65. {
  66. if (pb == NULL)
  67. return;
  68. pubsub_items_free(pb->items);
  69. dcfg_free(pb->cfg);
  70. tor_free(pb);
  71. }
  72. /**
  73. * Create and return a pubsub_connector_t for the subsystem with ID
  74. * <b>subsys</b> to use in adding publications, subscriptions, and types to
  75. * <b>builder</b>.
  76. **/
  77. pubsub_connector_t *
  78. pubsub_connector_for_subsystem(pubsub_builder_t *builder,
  79. subsys_id_t subsys)
  80. {
  81. tor_assert(builder);
  82. ++builder->n_connectors;
  83. pubsub_connector_t *con = tor_malloc_zero(sizeof(*con));
  84. con->builder = builder;
  85. con->subsys_id = subsys;
  86. return con;
  87. }
  88. /**
  89. * Release all storage held by a pubsub_connector_t.
  90. **/
  91. void
  92. pubsub_connector_free_(pubsub_connector_t *con)
  93. {
  94. if (!con)
  95. return;
  96. if (con->builder) {
  97. --con->builder->n_connectors;
  98. tor_assert(con->builder->n_connectors >= 0);
  99. }
  100. tor_free(con);
  101. }
  102. /**
  103. * Use <b>con</b> to add a request for being able to publish messages of type
  104. * <b>msg</b> with auxiliary data of <b>type</b> on <b>channel</b>.
  105. **/
  106. int
  107. pubsub_add_pub_(pubsub_connector_t *con,
  108. pub_binding_t *out,
  109. channel_id_t channel,
  110. message_id_t msg,
  111. msg_type_id_t type,
  112. unsigned flags,
  113. const char *file,
  114. unsigned line)
  115. {
  116. pubsub_cfg_t *cfg = tor_malloc_zero(sizeof(*cfg));
  117. memset(out, 0, sizeof(*out));
  118. cfg->is_publish = true;
  119. out->msg_template.sender = cfg->subsys = con->subsys_id;
  120. out->msg_template.channel = cfg->channel = channel;
  121. out->msg_template.msg = cfg->msg = msg;
  122. out->msg_template.type = cfg->type = type;
  123. cfg->flags = flags;
  124. cfg->added_by_file = file;
  125. cfg->added_by_line = line;
  126. /* We're grabbing a pointer to the pub_binding_t so we can tell it about
  127. * the dispatcher later on.
  128. */
  129. cfg->pub_binding = out;
  130. smartlist_add(con->builder->items->items, cfg);
  131. if (dcfg_msg_set_type(con->builder->cfg, msg, type) < 0)
  132. goto err;
  133. if (dcfg_msg_set_chan(con->builder->cfg, msg, channel) < 0)
  134. goto err;
  135. return 0;
  136. err:
  137. ++con->builder->n_errors;
  138. return -1;
  139. }
  140. /**
  141. * Use <b>con</b> to add a request for being able to publish messages of type
  142. * <b>msg</b> with auxiliary data of <b>type</b> on <b>channel</b>,
  143. * passing them to the callback in <b>recv_fn</b>.
  144. **/
  145. int
  146. pubsub_add_sub_(pubsub_connector_t *con,
  147. recv_fn_t recv_fn,
  148. channel_id_t channel,
  149. message_id_t msg,
  150. msg_type_id_t type,
  151. unsigned flags,
  152. const char *file,
  153. unsigned line)
  154. {
  155. pubsub_cfg_t *cfg = tor_malloc_zero(sizeof(*cfg));
  156. cfg->is_publish = false;
  157. cfg->subsys = con->subsys_id;
  158. cfg->channel = channel;
  159. cfg->msg = msg;
  160. cfg->type = type;
  161. cfg->flags = flags;
  162. cfg->added_by_file = file;
  163. cfg->added_by_line = line;
  164. cfg->recv_fn = recv_fn;
  165. smartlist_add(con->builder->items->items, cfg);
  166. if (dcfg_msg_set_type(con->builder->cfg, msg, type) < 0)
  167. goto err;
  168. if (dcfg_msg_set_chan(con->builder->cfg, msg, channel) < 0)
  169. goto err;
  170. if (! (flags & DISP_FLAG_STUB)) {
  171. if (dcfg_add_recv(con->builder->cfg, msg, cfg->subsys, recv_fn) < 0)
  172. goto err;
  173. }
  174. return 0;
  175. err:
  176. ++con->builder->n_errors;
  177. return -1;
  178. }
  179. /**
  180. * Use <b>con</b> to define the functions to use for manipulating the type
  181. * <b>type</b>. Any function pointers left as NULL will be implemented as
  182. * no-ops.
  183. **/
  184. int
  185. pubsub_connector_register_type_(pubsub_connector_t *con,
  186. msg_type_id_t type,
  187. dispatch_typefns_t *fns,
  188. const char *file,
  189. unsigned line)
  190. {
  191. pubsub_type_cfg_t *cfg = tor_malloc_zero(sizeof(*cfg));
  192. cfg->type = type;
  193. memcpy(&cfg->fns, fns, sizeof(*fns));
  194. cfg->subsys = con->subsys_id;
  195. cfg->added_by_file = file;
  196. cfg->added_by_line = line;
  197. smartlist_add(con->builder->items->type_items, cfg);
  198. if (dcfg_type_set_fns(con->builder->cfg, type, fns) < 0)
  199. goto err;
  200. return 0;
  201. err:
  202. ++con->builder->n_errors;
  203. return -1;
  204. }
  205. /**
  206. * Initialize the dispatch_ptr field in every relevant publish binding
  207. * for <b>d</b>.
  208. */
  209. static void
  210. pubsub_items_install_bindings(pubsub_items_t *items,
  211. dispatch_t *d)
  212. {
  213. SMARTLIST_FOREACH_BEGIN(items->items, pubsub_cfg_t *, cfg) {
  214. if (cfg->pub_binding) {
  215. // XXXX we could skip this for STUB publishers, and for any publishers
  216. // XXXX where all subscribers are STUB.
  217. cfg->pub_binding->dispatch_ptr = d;
  218. }
  219. } SMARTLIST_FOREACH_END(cfg);
  220. }
  221. /**
  222. * Remove the dispatch_ptr fields for all the relevant publish bindings
  223. * in <b>items</b>. The prevents subsequent dispatch_pub_() calls from
  224. * sending messages to a dispatcher that has been freed.
  225. **/
  226. void
  227. pubsub_items_clear_bindings(pubsub_items_t *items)
  228. {
  229. SMARTLIST_FOREACH_BEGIN(items->items, pubsub_cfg_t *, cfg) {
  230. if (cfg->pub_binding) {
  231. cfg->pub_binding->dispatch_ptr = NULL;
  232. }
  233. } SMARTLIST_FOREACH_END(cfg);
  234. }
  235. /**
  236. * Create a new dispatcher as configured in a pubsub_builder_t.
  237. *
  238. * Consumes and frees its input.
  239. **/
  240. dispatch_t *
  241. pubsub_builder_finalize(pubsub_builder_t *builder,
  242. pubsub_items_t **items_out)
  243. {
  244. dispatch_t *dispatcher = NULL;
  245. tor_assert_nonfatal(builder->n_connectors == 0);
  246. if (pubsub_builder_check(builder) < 0)
  247. goto err;
  248. if (builder->n_errors) {
  249. log_warn(LD_GENERAL, "At least one error occurred previously when "
  250. "configuring the dispatcher.");
  251. goto err;
  252. }
  253. dispatcher = dispatch_new(builder->cfg);
  254. if (!dispatcher)
  255. goto err;
  256. pubsub_items_install_bindings(builder->items, dispatcher);
  257. if (items_out) {
  258. *items_out = builder->items;
  259. builder->items = NULL; /* Prevent free */
  260. }
  261. err:
  262. pubsub_builder_free(builder);
  263. return dispatcher;
  264. }