test_pubsub_msg.c 9.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305
  1. /* Copyright (c) 2018, The Tor Project, Inc. */
  2. /* See LICENSE for licensing information */
  3. #define DISPATCH_PRIVATE
  4. #include "test/test.h"
  5. #include "lib/dispatch/dispatch.h"
  6. #include "lib/dispatch/dispatch_naming.h"
  7. #include "lib/dispatch/dispatch_st.h"
  8. #include "lib/dispatch/msgtypes.h"
  9. #include "lib/pubsub/pubsub_flags.h"
  10. #include "lib/pubsub/pub_binding_st.h"
  11. #include "lib/pubsub/pubsub_build.h"
  12. #include "lib/pubsub/pubsub_builder_st.h"
  13. #include "lib/pubsub/pubsub_connect.h"
  14. #include "lib/pubsub/pubsub_publish.h"
  15. #include "lib/log/escape.h"
  16. #include "lib/malloc/malloc.h"
  17. #include "lib/string/printf.h"
  18. #include <stdio.h>
  19. #include <string.h>
  20. static char *
  21. ex_str_fmt(msg_aux_data_t aux)
  22. {
  23. return esc_for_log(aux.ptr);
  24. }
  25. static void
  26. ex_str_free(msg_aux_data_t aux)
  27. {
  28. tor_free_(aux.ptr);
  29. }
  30. static dispatch_typefns_t stringfns = {
  31. .free_fn = ex_str_free,
  32. .fmt_fn = ex_str_fmt
  33. };
  34. // We're using the lowest-level publish/subscribe logic here, to avoid the
  35. // pubsub_macros.h macros and just test the dispatch core. We'll use a string
  36. // type for everything.
  37. #define DECLARE_MESSAGE(suffix) \
  38. static pub_binding_t pub_binding_##suffix; \
  39. static int msg_received_##suffix = 0; \
  40. static void recv_msg_##suffix(const msg_t *m) { \
  41. (void)m; \
  42. ++msg_received_##suffix; \
  43. } \
  44. EAT_SEMICOLON
  45. #define ADD_PUBLISH(binding_suffix, subsys, channel, msg, flags) \
  46. STMT_BEGIN { \
  47. con = pubsub_connector_for_subsystem(builder, \
  48. get_subsys_id(#subsys)); \
  49. pubsub_add_pub_(con, &pub_binding_##binding_suffix, \
  50. get_channel_id(#channel), \
  51. get_message_id(#msg), get_msg_type_id("string"), \
  52. (flags), __FILE__, __LINE__); \
  53. pubsub_connector_free(con); \
  54. } STMT_END
  55. #define ADD_SUBSCRIBE(hook_suffix, subsys, channel, msg, flags) \
  56. STMT_BEGIN { \
  57. con = pubsub_connector_for_subsystem(builder, \
  58. get_subsys_id(#subsys)); \
  59. pubsub_add_sub_(con, recv_msg_##hook_suffix, \
  60. get_channel_id(#channel), \
  61. get_message_id(#msg), get_msg_type_id("string"), \
  62. (flags), __FILE__, __LINE__); \
  63. pubsub_connector_free(con); \
  64. } STMT_END
  65. #define SEND(binding_suffix, val) \
  66. STMT_BEGIN { \
  67. msg_aux_data_t data_; \
  68. data_.ptr = tor_strdup(val); \
  69. pubsub_pub_(&pub_binding_##binding_suffix, data_); \
  70. } STMT_END
  71. DECLARE_MESSAGE(msg1);
  72. DECLARE_MESSAGE(msg2);
  73. DECLARE_MESSAGE(msg3);
  74. DECLARE_MESSAGE(msg4);
  75. DECLARE_MESSAGE(msg5);
  76. static smartlist_t *strings_received = NULL;
  77. static void
  78. recv_msg_copy_string(const msg_t *m)
  79. {
  80. const char *s = m->aux_data__.ptr;
  81. smartlist_add(strings_received, tor_strdup(s));
  82. }
  83. static void *
  84. setup_dispatcher(const struct testcase_t *testcase)
  85. {
  86. (void)testcase;
  87. pubsub_builder_t *builder = pubsub_builder_new();
  88. pubsub_connector_t *con;
  89. {
  90. con = pubsub_connector_for_subsystem(builder, get_subsys_id("types"));
  91. pubsub_connector_register_type_(con,
  92. get_msg_type_id("string"),
  93. &stringfns,
  94. "nowhere.c", 99);
  95. pubsub_connector_free(con);
  96. }
  97. // message1 has one publisher and one subscriber.
  98. ADD_PUBLISH(msg1, sys1, main, message1, 0);
  99. ADD_SUBSCRIBE(msg1, sys2, main, message1, 0);
  100. // message2 has a publisher and a stub subscriber.
  101. ADD_PUBLISH(msg2, sys1, main, message2, 0);
  102. ADD_SUBSCRIBE(msg2, sys2, main, message2, DISP_FLAG_STUB);
  103. // message3 has a publisher and three subscribers.
  104. ADD_PUBLISH(msg3, sys1, main, message3, 0);
  105. ADD_SUBSCRIBE(msg3, sys2, main, message3, 0);
  106. ADD_SUBSCRIBE(msg3, sys3, main, message3, 0);
  107. ADD_SUBSCRIBE(msg3, sys4, main, message3, 0);
  108. // message4 has one publisher and two subscribers, but it's on another
  109. // channel.
  110. ADD_PUBLISH(msg4, sys2, other, message4, 0);
  111. ADD_SUBSCRIBE(msg4, sys1, other, message4, 0);
  112. ADD_SUBSCRIBE(msg4, sys3, other, message4, 0);
  113. // message5 has a huge number of recipients.
  114. ADD_PUBLISH(msg5, sys3, main, message5, 0);
  115. ADD_SUBSCRIBE(msg5, sys4, main, message5, 0);
  116. ADD_SUBSCRIBE(msg5, sys5, main, message5, 0);
  117. ADD_SUBSCRIBE(msg5, sys6, main, message5, 0);
  118. ADD_SUBSCRIBE(msg5, sys7, main, message5, 0);
  119. ADD_SUBSCRIBE(msg5, sys8, main, message5, 0);
  120. for (int i = 0; i < 1000-5; ++i) {
  121. char *sys;
  122. tor_asprintf(&sys, "xsys-%d", i);
  123. con = pubsub_connector_for_subsystem(builder, get_subsys_id(sys));
  124. pubsub_add_sub_(con, recv_msg_copy_string,
  125. get_channel_id("main"),
  126. get_message_id("message5"),
  127. get_msg_type_id("string"), 0, "here", 100);
  128. pubsub_connector_free(con);
  129. tor_free(sys);
  130. }
  131. return pubsub_builder_finalize(builder, NULL);
  132. }
  133. static int
  134. cleanup_dispatcher(const struct testcase_t *testcase, void *dispatcher_)
  135. {
  136. (void)testcase;
  137. dispatch_t *dispatcher = dispatcher_;
  138. dispatch_free(dispatcher);
  139. return 1;
  140. }
  141. static const struct testcase_setup_t dispatcher_setup = {
  142. setup_dispatcher, cleanup_dispatcher
  143. };
  144. static void
  145. test_pubsub_msg_minimal(void *arg)
  146. {
  147. dispatch_t *d = arg;
  148. tt_int_op(0, OP_EQ, msg_received_msg1);
  149. SEND(msg1, "hello world");
  150. tt_int_op(0, OP_EQ, msg_received_msg1); // hasn't actually arrived yet.
  151. tt_int_op(0, OP_EQ, dispatch_flush(d, get_channel_id("main"), 1000));
  152. tt_int_op(1, OP_EQ, msg_received_msg1); // we got the message!
  153. done:
  154. ;
  155. }
  156. static void
  157. test_pubsub_msg_send_to_stub(void *arg)
  158. {
  159. dispatch_t *d = arg;
  160. tt_int_op(0, OP_EQ, msg_received_msg2);
  161. SEND(msg2, "hello silence");
  162. tt_int_op(0, OP_EQ, msg_received_msg2); // hasn't actually arrived yet.
  163. tt_int_op(0, OP_EQ, dispatch_flush(d, get_channel_id("main"), 1000));
  164. tt_int_op(0, OP_EQ, msg_received_msg2); // doesn't arrive -- stub hook.
  165. done:
  166. ;
  167. }
  168. static void
  169. test_pubsub_msg_cancel_msgs(void *arg)
  170. {
  171. dispatch_t *d = arg;
  172. tt_int_op(0, OP_EQ, msg_received_msg1);
  173. for (int i = 0; i < 100; ++i) {
  174. SEND(msg1, "hello world");
  175. }
  176. tt_int_op(0, OP_EQ, msg_received_msg1); // hasn't actually arrived yet.
  177. tt_int_op(0, OP_EQ, dispatch_flush(d, get_channel_id("main"), 10));
  178. tt_int_op(10, OP_EQ, msg_received_msg1); // we got the message 10 times.
  179. // At this point, the dispatcher will be freed with queued, undelivered
  180. // messages.
  181. done:
  182. ;
  183. }
  184. struct alertfn_target {
  185. dispatch_t *d;
  186. channel_id_t ch;
  187. int count;
  188. };
  189. static void
  190. alertfn_generic(dispatch_t *d, channel_id_t ch, void *arg)
  191. {
  192. struct alertfn_target *t = arg;
  193. tt_ptr_op(d, OP_EQ, t->d);
  194. tt_int_op(ch, OP_EQ, t->ch);
  195. ++t->count;
  196. done:
  197. ;
  198. }
  199. static void
  200. test_pubsub_msg_alertfns(void *arg)
  201. {
  202. dispatch_t *d = arg;
  203. struct alertfn_target ch1_a = { d, get_channel_id("main"), 0 };
  204. struct alertfn_target ch2_a = { d, get_channel_id("other"), 0 };
  205. tt_int_op(0, OP_EQ,
  206. dispatch_set_alert_fn(d, get_channel_id("main"),
  207. alertfn_generic, &ch1_a));
  208. tt_int_op(0, OP_EQ,
  209. dispatch_set_alert_fn(d, get_channel_id("other"),
  210. alertfn_generic, &ch2_a));
  211. SEND(msg3, "hello");
  212. tt_int_op(ch1_a.count, OP_EQ, 1);
  213. SEND(msg3, "world");
  214. tt_int_op(ch1_a.count, OP_EQ, 1); // only the first message sends an alert
  215. tt_int_op(ch2_a.count, OP_EQ, 0); // no alert for 'other'
  216. SEND(msg4, "worse things happen in C");
  217. tt_int_op(ch2_a.count, OP_EQ, 1);
  218. // flush the first (main) channel...
  219. tt_int_op(0, OP_EQ, dispatch_flush(d, get_channel_id("main"), 1000));
  220. tt_int_op(6, OP_EQ, msg_received_msg3); // 3 subscribers, 2 instances.
  221. // now that the main channel is flushed, sending another message on it
  222. // starts another alert.
  223. tt_int_op(ch1_a.count, OP_EQ, 1);
  224. SEND(msg1, "plover");
  225. tt_int_op(ch1_a.count, OP_EQ, 2);
  226. tt_int_op(ch2_a.count, OP_EQ, 1);
  227. done:
  228. ;
  229. }
  230. /* try more than N_FAST_FNS hooks on msg5 */
  231. static void
  232. test_pubsub_msg_many_hooks(void *arg)
  233. {
  234. dispatch_t *d = arg;
  235. strings_received = smartlist_new();
  236. tt_int_op(0, OP_EQ, msg_received_msg5);
  237. SEND(msg5, "hello world");
  238. tt_int_op(0, OP_EQ, msg_received_msg5);
  239. tt_int_op(0, OP_EQ, smartlist_len(strings_received));
  240. tt_int_op(0, OP_EQ, dispatch_flush(d, get_channel_id("main"), 100000));
  241. tt_int_op(5, OP_EQ, msg_received_msg5);
  242. tt_int_op(995, OP_EQ, smartlist_len(strings_received));
  243. done:
  244. SMARTLIST_FOREACH(strings_received, char *, s, tor_free(s));
  245. smartlist_free(strings_received);
  246. }
  247. #define T(name) \
  248. { #name, test_pubsub_msg_ ## name , TT_FORK, \
  249. &dispatcher_setup, NULL }
  250. struct testcase_t pubsub_msg_tests[] = {
  251. T(minimal),
  252. T(send_to_stub),
  253. T(cancel_msgs),
  254. T(alertfns),
  255. T(many_hooks),
  256. END_OF_TESTCASES
  257. };