123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305 |
- /* Copyright (c) 2018, The Tor Project, Inc. */
- /* See LICENSE for licensing information */
- #define DISPATCH_PRIVATE
- #include "test/test.h"
- #include "lib/dispatch/dispatch.h"
- #include "lib/dispatch/dispatch_naming.h"
- #include "lib/dispatch/dispatch_st.h"
- #include "lib/dispatch/msgtypes.h"
- #include "lib/pubsub/pubsub_flags.h"
- #include "lib/pubsub/pub_binding_st.h"
- #include "lib/pubsub/pubsub_build.h"
- #include "lib/pubsub/pubsub_builder_st.h"
- #include "lib/pubsub/pubsub_connect.h"
- #include "lib/pubsub/pubsub_publish.h"
- #include "lib/log/escape.h"
- #include "lib/malloc/malloc.h"
- #include "lib/string/printf.h"
- #include <stdio.h>
- #include <string.h>
- static char *
- ex_str_fmt(msg_aux_data_t aux)
- {
- return esc_for_log(aux.ptr);
- }
- static void
- ex_str_free(msg_aux_data_t aux)
- {
- tor_free_(aux.ptr);
- }
- static dispatch_typefns_t stringfns = {
- .free_fn = ex_str_free,
- .fmt_fn = ex_str_fmt
- };
- // We're using the lowest-level publish/subscribe logic here, to avoid the
- // pubsub_macros.h macros and just test the dispatch core. We'll use a string
- // type for everything.
- #define DECLARE_MESSAGE(suffix) \
- static pub_binding_t pub_binding_##suffix; \
- static int msg_received_##suffix = 0; \
- static void recv_msg_##suffix(const msg_t *m) { \
- (void)m; \
- ++msg_received_##suffix; \
- } \
- EAT_SEMICOLON
- #define ADD_PUBLISH(binding_suffix, subsys, channel, msg, flags) \
- STMT_BEGIN { \
- con = pubsub_connector_for_subsystem(builder, \
- get_subsys_id(#subsys)); \
- pubsub_add_pub_(con, &pub_binding_##binding_suffix, \
- get_channel_id(#channel), \
- get_message_id(#msg), get_msg_type_id("string"), \
- (flags), __FILE__, __LINE__); \
- pubsub_connector_free(con); \
- } STMT_END
- #define ADD_SUBSCRIBE(hook_suffix, subsys, channel, msg, flags) \
- STMT_BEGIN { \
- con = pubsub_connector_for_subsystem(builder, \
- get_subsys_id(#subsys)); \
- pubsub_add_sub_(con, recv_msg_##hook_suffix, \
- get_channel_id(#channel), \
- get_message_id(#msg), get_msg_type_id("string"), \
- (flags), __FILE__, __LINE__); \
- pubsub_connector_free(con); \
- } STMT_END
- #define SEND(binding_suffix, val) \
- STMT_BEGIN { \
- msg_aux_data_t data_; \
- data_.ptr = tor_strdup(val); \
- pubsub_pub_(&pub_binding_##binding_suffix, data_); \
- } STMT_END
- DECLARE_MESSAGE(msg1);
- DECLARE_MESSAGE(msg2);
- DECLARE_MESSAGE(msg3);
- DECLARE_MESSAGE(msg4);
- DECLARE_MESSAGE(msg5);
- static smartlist_t *strings_received = NULL;
- static void
- recv_msg_copy_string(const msg_t *m)
- {
- const char *s = m->aux_data__.ptr;
- smartlist_add(strings_received, tor_strdup(s));
- }
- static void *
- setup_dispatcher(const struct testcase_t *testcase)
- {
- (void)testcase;
- pubsub_builder_t *builder = pubsub_builder_new();
- pubsub_connector_t *con;
- {
- con = pubsub_connector_for_subsystem(builder, get_subsys_id("types"));
- pubsub_connector_register_type_(con,
- get_msg_type_id("string"),
- &stringfns,
- "nowhere.c", 99);
- pubsub_connector_free(con);
- }
- // message1 has one publisher and one subscriber.
- ADD_PUBLISH(msg1, sys1, main, message1, 0);
- ADD_SUBSCRIBE(msg1, sys2, main, message1, 0);
- // message2 has a publisher and a stub subscriber.
- ADD_PUBLISH(msg2, sys1, main, message2, 0);
- ADD_SUBSCRIBE(msg2, sys2, main, message2, DISP_FLAG_STUB);
- // message3 has a publisher and three subscribers.
- ADD_PUBLISH(msg3, sys1, main, message3, 0);
- ADD_SUBSCRIBE(msg3, sys2, main, message3, 0);
- ADD_SUBSCRIBE(msg3, sys3, main, message3, 0);
- ADD_SUBSCRIBE(msg3, sys4, main, message3, 0);
- // message4 has one publisher and two subscribers, but it's on another
- // channel.
- ADD_PUBLISH(msg4, sys2, other, message4, 0);
- ADD_SUBSCRIBE(msg4, sys1, other, message4, 0);
- ADD_SUBSCRIBE(msg4, sys3, other, message4, 0);
- // message5 has a huge number of recipients.
- ADD_PUBLISH(msg5, sys3, main, message5, 0);
- ADD_SUBSCRIBE(msg5, sys4, main, message5, 0);
- ADD_SUBSCRIBE(msg5, sys5, main, message5, 0);
- ADD_SUBSCRIBE(msg5, sys6, main, message5, 0);
- ADD_SUBSCRIBE(msg5, sys7, main, message5, 0);
- ADD_SUBSCRIBE(msg5, sys8, main, message5, 0);
- for (int i = 0; i < 1000-5; ++i) {
- char *sys;
- tor_asprintf(&sys, "xsys-%d", i);
- con = pubsub_connector_for_subsystem(builder, get_subsys_id(sys));
- pubsub_add_sub_(con, recv_msg_copy_string,
- get_channel_id("main"),
- get_message_id("message5"),
- get_msg_type_id("string"), 0, "here", 100);
- pubsub_connector_free(con);
- tor_free(sys);
- }
- return pubsub_builder_finalize(builder, NULL);
- }
- static int
- cleanup_dispatcher(const struct testcase_t *testcase, void *dispatcher_)
- {
- (void)testcase;
- dispatch_t *dispatcher = dispatcher_;
- dispatch_free(dispatcher);
- return 1;
- }
- static const struct testcase_setup_t dispatcher_setup = {
- setup_dispatcher, cleanup_dispatcher
- };
- static void
- test_pubsub_msg_minimal(void *arg)
- {
- dispatch_t *d = arg;
- tt_int_op(0, OP_EQ, msg_received_msg1);
- SEND(msg1, "hello world");
- tt_int_op(0, OP_EQ, msg_received_msg1); // hasn't actually arrived yet.
- tt_int_op(0, OP_EQ, dispatch_flush(d, get_channel_id("main"), 1000));
- tt_int_op(1, OP_EQ, msg_received_msg1); // we got the message!
- done:
- ;
- }
- static void
- test_pubsub_msg_send_to_stub(void *arg)
- {
- dispatch_t *d = arg;
- tt_int_op(0, OP_EQ, msg_received_msg2);
- SEND(msg2, "hello silence");
- tt_int_op(0, OP_EQ, msg_received_msg2); // hasn't actually arrived yet.
- tt_int_op(0, OP_EQ, dispatch_flush(d, get_channel_id("main"), 1000));
- tt_int_op(0, OP_EQ, msg_received_msg2); // doesn't arrive -- stub hook.
- done:
- ;
- }
- static void
- test_pubsub_msg_cancel_msgs(void *arg)
- {
- dispatch_t *d = arg;
- tt_int_op(0, OP_EQ, msg_received_msg1);
- for (int i = 0; i < 100; ++i) {
- SEND(msg1, "hello world");
- }
- tt_int_op(0, OP_EQ, msg_received_msg1); // hasn't actually arrived yet.
- tt_int_op(0, OP_EQ, dispatch_flush(d, get_channel_id("main"), 10));
- tt_int_op(10, OP_EQ, msg_received_msg1); // we got the message 10 times.
- // At this point, the dispatcher will be freed with queued, undelivered
- // messages.
- done:
- ;
- }
- struct alertfn_target {
- dispatch_t *d;
- channel_id_t ch;
- int count;
- };
- static void
- alertfn_generic(dispatch_t *d, channel_id_t ch, void *arg)
- {
- struct alertfn_target *t = arg;
- tt_ptr_op(d, OP_EQ, t->d);
- tt_int_op(ch, OP_EQ, t->ch);
- ++t->count;
- done:
- ;
- }
- static void
- test_pubsub_msg_alertfns(void *arg)
- {
- dispatch_t *d = arg;
- struct alertfn_target ch1_a = { d, get_channel_id("main"), 0 };
- struct alertfn_target ch2_a = { d, get_channel_id("other"), 0 };
- tt_int_op(0, OP_EQ,
- dispatch_set_alert_fn(d, get_channel_id("main"),
- alertfn_generic, &ch1_a));
- tt_int_op(0, OP_EQ,
- dispatch_set_alert_fn(d, get_channel_id("other"),
- alertfn_generic, &ch2_a));
- SEND(msg3, "hello");
- tt_int_op(ch1_a.count, OP_EQ, 1);
- SEND(msg3, "world");
- tt_int_op(ch1_a.count, OP_EQ, 1); // only the first message sends an alert
- tt_int_op(ch2_a.count, OP_EQ, 0); // no alert for 'other'
- SEND(msg4, "worse things happen in C");
- tt_int_op(ch2_a.count, OP_EQ, 1);
- // flush the first (main) channel...
- tt_int_op(0, OP_EQ, dispatch_flush(d, get_channel_id("main"), 1000));
- tt_int_op(6, OP_EQ, msg_received_msg3); // 3 subscribers, 2 instances.
- // now that the main channel is flushed, sending another message on it
- // starts another alert.
- tt_int_op(ch1_a.count, OP_EQ, 1);
- SEND(msg1, "plover");
- tt_int_op(ch1_a.count, OP_EQ, 2);
- tt_int_op(ch2_a.count, OP_EQ, 1);
- done:
- ;
- }
- /* try more than N_FAST_FNS hooks on msg5 */
- static void
- test_pubsub_msg_many_hooks(void *arg)
- {
- dispatch_t *d = arg;
- strings_received = smartlist_new();
- tt_int_op(0, OP_EQ, msg_received_msg5);
- SEND(msg5, "hello world");
- tt_int_op(0, OP_EQ, msg_received_msg5);
- tt_int_op(0, OP_EQ, smartlist_len(strings_received));
- tt_int_op(0, OP_EQ, dispatch_flush(d, get_channel_id("main"), 100000));
- tt_int_op(5, OP_EQ, msg_received_msg5);
- tt_int_op(995, OP_EQ, smartlist_len(strings_received));
- done:
- SMARTLIST_FOREACH(strings_received, char *, s, tor_free(s));
- smartlist_free(strings_received);
- }
- #define T(name) \
- { #name, test_pubsub_msg_ ## name , TT_FORK, \
- &dispatcher_setup, NULL }
- struct testcase_t pubsub_msg_tests[] = {
- T(minimal),
- T(send_to_stub),
- T(cancel_msgs),
- T(alertfns),
- T(many_hooks),
- END_OF_TESTCASES
- };
|