123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307 |
- /* Copyright (c) 2001, Matej Pfajfar.
- * Copyright (c) 2001-2004, Roger Dingledine.
- * Copyright (c) 2004-2006, Roger Dingledine, Nick Mathewson.
- * Copyright (c) 2007-2018, The Tor Project, Inc. */
- /* See LICENSE for licensing information */
- /**
- * @file pubsub_build.c
- * @brief Construct a dispatch_t in safer, more OO way.
- **/
- #define PUBSUB_PRIVATE
- #include "lib/dispatch/dispatch.h"
- #include "lib/dispatch/dispatch_cfg.h"
- #include "lib/dispatch/dispatch_naming.h"
- #include "lib/dispatch/msgtypes.h"
- #include "lib/pubsub/pubsub_flags.h"
- #include "lib/pubsub/pub_binding_st.h"
- #include "lib/pubsub/pubsub_build.h"
- #include "lib/pubsub/pubsub_builder_st.h"
- #include "lib/pubsub/pubsub_connect.h"
- #include "lib/container/smartlist.h"
- #include "lib/log/util_bug.h"
- #include "lib/malloc/malloc.h"
- #include <string.h>
- /** Construct and return a new empty pubsub_items_t. */
- static pubsub_items_t *
- pubsub_items_new(void)
- {
- pubsub_items_t *cfg = tor_malloc_zero(sizeof(*cfg));
- cfg->items = smartlist_new();
- cfg->type_items = smartlist_new();
- return cfg;
- }
- /** Release all storage held in a pubsub_items_t. */
- void
- pubsub_items_free_(pubsub_items_t *cfg)
- {
- if (! cfg)
- return;
- SMARTLIST_FOREACH(cfg->items, pubsub_cfg_t *, item, tor_free(item));
- SMARTLIST_FOREACH(cfg->type_items,
- pubsub_type_cfg_t *, item, tor_free(item));
- smartlist_free(cfg->items);
- smartlist_free(cfg->type_items);
- tor_free(cfg);
- }
- /** Construct and return a new pubsub_builder_t. */
- pubsub_builder_t *
- pubsub_builder_new(void)
- {
- dispatch_naming_init();
- pubsub_builder_t *pb = tor_malloc_zero(sizeof(*pb));
- pb->cfg = dcfg_new();
- pb->items = pubsub_items_new();
- return pb;
- }
- /**
- * Release all storage held by a pubsub_builder_t.
- *
- * You'll (mostly) only want to call this function on an error case: if you're
- * constructing a dispatch_t instead, you should call
- * pubsub_builder_finalize() to consume the pubsub_builder_t.
- */
- void
- pubsub_builder_free_(pubsub_builder_t *pb)
- {
- if (pb == NULL)
- return;
- pubsub_items_free(pb->items);
- dcfg_free(pb->cfg);
- tor_free(pb);
- }
- /**
- * Create and return a pubsub_connector_t for the subsystem with ID
- * <b>subsys</b> to use in adding publications, subscriptions, and types to
- * <b>builder</b>.
- **/
- pubsub_connector_t *
- pubsub_connector_for_subsystem(pubsub_builder_t *builder,
- subsys_id_t subsys)
- {
- tor_assert(builder);
- ++builder->n_connectors;
- pubsub_connector_t *con = tor_malloc_zero(sizeof(*con));
- con->builder = builder;
- con->subsys_id = subsys;
- return con;
- }
- /**
- * Release all storage held by a pubsub_connector_t.
- **/
- void
- pubsub_connector_free_(pubsub_connector_t *con)
- {
- if (!con)
- return;
- if (con->builder) {
- --con->builder->n_connectors;
- tor_assert(con->builder->n_connectors >= 0);
- }
- tor_free(con);
- }
- /**
- * Use <b>con</b> to add a request for being able to publish messages of type
- * <b>msg</b> with auxiliary data of <b>type</b> on <b>channel</b>.
- **/
- int
- pubsub_add_pub_(pubsub_connector_t *con,
- pub_binding_t *out,
- channel_id_t channel,
- message_id_t msg,
- msg_type_id_t type,
- unsigned flags,
- const char *file,
- unsigned line)
- {
- pubsub_cfg_t *cfg = tor_malloc_zero(sizeof(*cfg));
- memset(out, 0, sizeof(*out));
- cfg->is_publish = true;
- out->msg_template.sender = cfg->subsys = con->subsys_id;
- out->msg_template.channel = cfg->channel = channel;
- out->msg_template.msg = cfg->msg = msg;
- out->msg_template.type = cfg->type = type;
- cfg->flags = flags;
- cfg->added_by_file = file;
- cfg->added_by_line = line;
- /* We're grabbing a pointer to the pub_binding_t so we can tell it about
- * the dispatcher later on.
- */
- cfg->pub_binding = out;
- smartlist_add(con->builder->items->items, cfg);
- if (dcfg_msg_set_type(con->builder->cfg, msg, type) < 0)
- goto err;
- if (dcfg_msg_set_chan(con->builder->cfg, msg, channel) < 0)
- goto err;
- return 0;
- err:
- ++con->builder->n_errors;
- return -1;
- }
- /**
- * Use <b>con</b> to add a request for being able to publish messages of type
- * <b>msg</b> with auxiliary data of <b>type</b> on <b>channel</b>,
- * passing them to the callback in <b>recv_fn</b>.
- **/
- int
- pubsub_add_sub_(pubsub_connector_t *con,
- recv_fn_t recv_fn,
- channel_id_t channel,
- message_id_t msg,
- msg_type_id_t type,
- unsigned flags,
- const char *file,
- unsigned line)
- {
- pubsub_cfg_t *cfg = tor_malloc_zero(sizeof(*cfg));
- cfg->is_publish = false;
- cfg->subsys = con->subsys_id;
- cfg->channel = channel;
- cfg->msg = msg;
- cfg->type = type;
- cfg->flags = flags;
- cfg->added_by_file = file;
- cfg->added_by_line = line;
- cfg->recv_fn = recv_fn;
- smartlist_add(con->builder->items->items, cfg);
- if (dcfg_msg_set_type(con->builder->cfg, msg, type) < 0)
- goto err;
- if (dcfg_msg_set_chan(con->builder->cfg, msg, channel) < 0)
- goto err;
- if (! (flags & DISP_FLAG_STUB)) {
- if (dcfg_add_recv(con->builder->cfg, msg, cfg->subsys, recv_fn) < 0)
- goto err;
- }
- return 0;
- err:
- ++con->builder->n_errors;
- return -1;
- }
- /**
- * Use <b>con</b> to define the functions to use for manipulating the type
- * <b>type</b>. Any function pointers left as NULL will be implemented as
- * no-ops.
- **/
- int
- pubsub_connector_register_type_(pubsub_connector_t *con,
- msg_type_id_t type,
- dispatch_typefns_t *fns,
- const char *file,
- unsigned line)
- {
- pubsub_type_cfg_t *cfg = tor_malloc_zero(sizeof(*cfg));
- cfg->type = type;
- memcpy(&cfg->fns, fns, sizeof(*fns));
- cfg->subsys = con->subsys_id;
- cfg->added_by_file = file;
- cfg->added_by_line = line;
- smartlist_add(con->builder->items->type_items, cfg);
- if (dcfg_type_set_fns(con->builder->cfg, type, fns) < 0)
- goto err;
- return 0;
- err:
- ++con->builder->n_errors;
- return -1;
- }
- /**
- * Initialize the dispatch_ptr field in every relevant publish binding
- * for <b>d</b>.
- */
- static void
- pubsub_items_install_bindings(pubsub_items_t *items,
- dispatch_t *d)
- {
- SMARTLIST_FOREACH_BEGIN(items->items, pubsub_cfg_t *, cfg) {
- if (cfg->pub_binding) {
- // XXXX we could skip this for STUB publishers, and for any publishers
- // XXXX where all subscribers are STUB.
- cfg->pub_binding->dispatch_ptr = d;
- }
- } SMARTLIST_FOREACH_END(cfg);
- }
- /**
- * Remove the dispatch_ptr fields for all the relevant publish bindings
- * in <b>items</b>. The prevents subsequent dispatch_pub_() calls from
- * sending messages to a dispatcher that has been freed.
- **/
- void
- pubsub_items_clear_bindings(pubsub_items_t *items)
- {
- SMARTLIST_FOREACH_BEGIN(items->items, pubsub_cfg_t *, cfg) {
- if (cfg->pub_binding) {
- cfg->pub_binding->dispatch_ptr = NULL;
- }
- } SMARTLIST_FOREACH_END(cfg);
- }
- /**
- * Create a new dispatcher as configured in a pubsub_builder_t.
- *
- * Consumes and frees its input.
- **/
- dispatch_t *
- pubsub_builder_finalize(pubsub_builder_t *builder,
- pubsub_items_t **items_out)
- {
- dispatch_t *dispatcher = NULL;
- tor_assert_nonfatal(builder->n_connectors == 0);
- if (pubsub_builder_check(builder) < 0)
- goto err;
- if (builder->n_errors) {
- log_warn(LD_GENERAL, "At least one error occurred previously when "
- "configuring the dispatcher.");
- goto err;
- }
- dispatcher = dispatch_new(builder->cfg);
- if (!dispatcher)
- goto err;
- pubsub_items_install_bindings(builder->items, dispatcher);
- if (items_out) {
- *items_out = builder->items;
- builder->items = NULL; /* Prevent free */
- }
- err:
- pubsub_builder_free(builder);
- return dispatcher;
- }
|