events.c 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540
  1. /* Copyright (c) 2013-2019, The Tor Project, Inc. */
  2. /* See LICENSE for licensing information */
  3. #include "lib/evloop/events.h"
  4. #include "lib/log/util_bug.h"
  5. #include <event2/util.h>
  6. #include <event2/event.h>
  7. #include <string.h>
  8. /* How a subscribed listener wants to receive an event. */
  9. typedef struct event_subscription_t {
  10. event_listener_t *listener;
  11. } event_subscription_t;
  12. /* What a listener should do if it receives an event. */
  13. typedef struct event_callback_t {
  14. event_update_fn_t update_fn;
  15. process_event_fn_t process_event_fn;
  16. } event_callback_t;
  17. /**************************/
  18. static event_subscription_t *
  19. event_subscription_new(event_listener_t *listener)
  20. {
  21. tor_assert(listener != NULL);
  22. event_subscription_t *sub = tor_malloc_zero(sizeof(event_subscription_t));
  23. sub->listener = listener;
  24. return sub;
  25. }
  26. static void
  27. event_subscription_free(event_subscription_t *sub)
  28. {
  29. tor_assert(sub != NULL);
  30. memset(sub, 0x00, sizeof(*sub));
  31. tor_free(sub);
  32. }
  33. /**************************/
  34. static event_callback_t *
  35. event_callback_new(event_update_fn_t update_fn,
  36. process_event_fn_t process_event_fn)
  37. {
  38. tor_assert(process_event_fn != NULL);
  39. event_callback_t *cb = tor_malloc_zero(sizeof(event_callback_t));
  40. cb->update_fn = update_fn;
  41. cb->process_event_fn = process_event_fn;
  42. return cb;
  43. }
  44. static void
  45. event_callback_free(event_callback_t *cb)
  46. {
  47. tor_assert(cb != NULL);
  48. memset(cb, 0x00, sizeof(*cb));
  49. tor_free(cb);
  50. }
  51. /**************************/
  52. static event_wrapper_t *
  53. event_wrapper_new(event_label_t label,
  54. event_data_t data,
  55. void (*free_data_fn)(void *))
  56. {
  57. event_wrapper_t *wrapper = tor_malloc_zero(sizeof(event_wrapper_t));
  58. wrapper->label = label;
  59. wrapper->data = data;
  60. wrapper->free_data_fn = free_data_fn;
  61. return wrapper;
  62. }
  63. static void
  64. event_wrapper_free(event_wrapper_t *wrapper)
  65. {
  66. tor_assert(wrapper != NULL);
  67. if (wrapper->free_data_fn != NULL) {
  68. wrapper->free_data_fn(wrapper->data.ptr);
  69. }
  70. memset(wrapper, 0x00, sizeof(*wrapper));
  71. tor_free(wrapper);
  72. }
  73. /**************************/
  74. event_registry_t *
  75. event_registry_new(void)
  76. {
  77. event_registry_t* registry = tor_malloc_zero(sizeof(event_registry_t));
  78. tor_mutex_init(&registry->lock);
  79. registry->events = smartlist_new();
  80. return registry;
  81. }
  82. void
  83. event_registry_free(event_registry_t *registry)
  84. {
  85. tor_assert(registry != NULL);
  86. tor_mutex_uninit(&registry->lock);
  87. SMARTLIST_FOREACH_BEGIN(registry->events, char *, help_label) {
  88. if (help_label != NULL) {
  89. tor_free(help_label);
  90. }
  91. } SMARTLIST_FOREACH_END(help_label);
  92. smartlist_free(registry->events);
  93. memset(registry, 0x00, sizeof(*registry));
  94. tor_free(registry);
  95. }
  96. event_label_t
  97. event_registry_register_event(event_registry_t *registry,
  98. const char *help_label)
  99. {
  100. tor_assert(registry != NULL);
  101. tor_mutex_acquire(&registry->lock);
  102. int num_events = smartlist_len(registry->events);
  103. if (help_label) {
  104. smartlist_add_strdup(registry->events, help_label);
  105. } else {
  106. smartlist_add(registry->events, NULL);
  107. }
  108. tor_mutex_release(&registry->lock);
  109. return (event_label_t)num_events;
  110. }
  111. const char *
  112. event_registry_get_help_label(event_registry_t *registry,
  113. event_label_t event_label)
  114. {
  115. tor_assert(registry != NULL);
  116. tor_mutex_acquire(&registry->lock);
  117. int label_index = (int)event_label;
  118. tor_assert(label_index >= 0);
  119. const char *help_label = smartlist_get(registry->events,
  120. label_index);
  121. tor_mutex_release(&registry->lock);
  122. return help_label;
  123. }
  124. /**************************/
  125. static void
  126. event_listener_eventloop_cb(evutil_socket_t sock, short events, void *arg)
  127. {
  128. event_listener_t *listener = arg;
  129. (void) sock;
  130. (void) events;
  131. event_listener_process(listener);
  132. }
  133. event_listener_t *
  134. event_listener_new(void *context)
  135. {
  136. event_listener_t* listener = tor_malloc_zero(sizeof(event_listener_t));
  137. tor_mutex_init(&listener->lock);
  138. listener->is_pending = false;
  139. listener->callbacks = smartlist_new();
  140. TOR_TAILQ_INIT(&listener->pending_events);
  141. listener->context = context;
  142. listener->eventloop_ev = NULL;
  143. return listener;
  144. }
  145. void
  146. event_listener_free(event_listener_t *listener)
  147. {
  148. tor_assert(listener != NULL);
  149. tor_mutex_acquire(&listener->lock);
  150. if (listener->eventloop_ev != NULL) {
  151. event_listener_detach(listener);
  152. // this will make sure the libevent callback has stopped
  153. }
  154. while (!TOR_TAILQ_EMPTY(&listener->pending_events)) {
  155. event_wrapper_t *wrapper = TOR_TAILQ_FIRST(&listener->pending_events);
  156. TOR_TAILQ_REMOVE(&listener->pending_events, wrapper, next_event);
  157. event_wrapper_free(wrapper);
  158. }
  159. SMARTLIST_FOREACH_BEGIN(listener->callbacks, event_callback_t *, cb) {
  160. if (cb != NULL) {
  161. event_callback_free(cb);
  162. }
  163. } SMARTLIST_FOREACH_END(cb);
  164. smartlist_free(listener->callbacks);
  165. listener->context = NULL;
  166. listener->is_pending = false;
  167. tor_mutex_release(&listener->lock);
  168. tor_mutex_uninit(&listener->lock);
  169. memset(listener, 0x00, sizeof(*listener));
  170. tor_free(listener);
  171. }
  172. void
  173. event_listener_attach(event_listener_t *listener, struct event_base *base)
  174. {
  175. tor_assert(listener != NULL);
  176. tor_assert(base != NULL);
  177. tor_mutex_acquire(&listener->lock);
  178. tor_assert(listener->eventloop_ev == NULL);
  179. listener->eventloop_ev = tor_event_new(base, -1,
  180. EV_READ|EV_PERSIST, // TODO: do we need persist?
  181. event_listener_eventloop_cb,
  182. listener);
  183. if (listener->is_pending) {
  184. event_active(listener->eventloop_ev, EV_READ, 1);
  185. }
  186. tor_mutex_release(&listener->lock);
  187. }
  188. void
  189. event_listener_detach(event_listener_t *listener)
  190. {
  191. tor_assert(listener != NULL);
  192. tor_mutex_acquire(&listener->lock);
  193. if (listener->eventloop_ev != NULL) {
  194. tor_event_free(listener->eventloop_ev);
  195. listener->eventloop_ev = NULL;
  196. }
  197. tor_mutex_release(&listener->lock);
  198. }
  199. void
  200. event_listener_set_callback(event_listener_t *listener, event_label_t label,
  201. event_update_fn_t update_fn,
  202. process_event_fn_t process_event_fn)
  203. {
  204. tor_assert(listener != NULL);
  205. tor_assert(label != EVENT_LABEL_UNSET);
  206. tor_assert(process_event_fn != NULL);
  207. int index = (int)label;
  208. tor_assert(index >= 0);
  209. event_callback_t *cb = event_callback_new(update_fn, process_event_fn);
  210. if (index >= 1000) {
  211. log_warn(LD_BUG, "An event label was very large (%d), but the event "
  212. "listener assumes that event labels are small.", index);
  213. /* We're using a smartlist as a lookup table, and assume that the labels are
  214. small and therefore the list should not be sparse. If the label is large,
  215. then we either have *many* events, or we're choosing our event labels
  216. inefficiently. */
  217. }
  218. tor_mutex_acquire(&listener->lock);
  219. smartlist_grow(listener->callbacks, index+1);
  220. event_callback_t *existing_cb = smartlist_get(listener->callbacks, index);
  221. if (existing_cb != NULL) {
  222. // we only support one callback per event type
  223. event_callback_free(existing_cb);
  224. log_warn(LD_BUG, "We are overriding a previous callback.");
  225. }
  226. smartlist_set(listener->callbacks, index, cb);
  227. tor_mutex_release(&listener->lock);
  228. }
  229. static void
  230. event_listener_receive(event_listener_t *listener, event_label_t label,
  231. event_wrapper_t *wrapper)
  232. {
  233. tor_assert(listener != NULL);
  234. tor_assert(label != EVENT_LABEL_UNSET);
  235. int index = (int)label;
  236. tor_assert(index >= 0);
  237. tor_mutex_acquire(&listener->lock);
  238. if (index >= smartlist_len(listener->callbacks)) {
  239. log_warn(LD_BUG, "We don't have a callback for this event");
  240. if (wrapper != NULL) {
  241. event_wrapper_free(wrapper);
  242. }
  243. tor_mutex_release(&listener->lock);
  244. return;
  245. }
  246. event_callback_t *cb = smartlist_get(listener->callbacks, index);
  247. if (cb == NULL) {
  248. log_warn(LD_BUG, "We don't have a callback for this event");
  249. if (wrapper != NULL) {
  250. event_wrapper_free(wrapper);
  251. }
  252. tor_mutex_release(&listener->lock);
  253. return;
  254. }
  255. event_wrapper_t *last = TOR_TAILQ_LAST(&listener->pending_events,
  256. pending_events_head_t);
  257. if (cb->update_fn != NULL && last != NULL && last->label == label) {
  258. // the last added event was of the same type and we set an update function,
  259. // so we should update the last event rather than adding a new one
  260. cb->update_fn(label, &last->data, &wrapper->data);
  261. if (wrapper != NULL) {
  262. event_wrapper_free(wrapper);
  263. }
  264. } else {
  265. tor_assert(wrapper != NULL);
  266. TOR_TAILQ_INSERT_TAIL(&listener->pending_events, wrapper, next_event);
  267. }
  268. if (!listener->is_pending) {
  269. listener->is_pending = true;
  270. if (listener->eventloop_ev != NULL) {
  271. event_active(listener->eventloop_ev, EV_READ, 1);
  272. }
  273. }
  274. tor_mutex_release(&listener->lock);
  275. }
  276. void
  277. event_listener_process(event_listener_t *listener)
  278. {
  279. tor_assert(listener != NULL);
  280. tor_mutex_acquire(&listener->lock);
  281. void *context = listener->context;
  282. while (!TOR_TAILQ_EMPTY(&listener->pending_events)) {
  283. event_wrapper_t *wrapper = TOR_TAILQ_FIRST(&listener->pending_events);
  284. TOR_TAILQ_REMOVE(&listener->pending_events, wrapper, next_event);
  285. tor_assert(wrapper != NULL);
  286. process_event_fn_t process_event_fn = NULL;
  287. int index = (int)wrapper->label;
  288. // do we have a callback for this event label?
  289. if (PREDICT_LIKELY(index < smartlist_len(listener->callbacks))) {
  290. event_callback_t *cb = smartlist_get(listener->callbacks, index);
  291. if (cb != NULL) {
  292. process_event_fn = cb->process_event_fn;
  293. }
  294. }
  295. tor_mutex_release(&listener->lock);
  296. if (PREDICT_LIKELY(process_event_fn != NULL)) {
  297. process_event_fn(wrapper->label, wrapper->data, context);
  298. } else {
  299. // no callback available
  300. log_warn(LD_BUG, "An event was received but had no callback");
  301. }
  302. event_wrapper_free(wrapper);
  303. tor_mutex_acquire(&listener->lock);
  304. }
  305. listener->is_pending = false;
  306. tor_mutex_release(&listener->lock);
  307. }
  308. /**************************/
  309. event_source_t *
  310. event_source_new(void)
  311. {
  312. event_source_t* source = tor_malloc_zero(sizeof(event_source_t));
  313. tor_mutex_init(&source->lock);
  314. source->subscriptions = smartlist_new();
  315. return source;
  316. }
  317. void
  318. event_source_free(event_source_t *source)
  319. {
  320. tor_assert(source != NULL);
  321. tor_mutex_uninit(&source->lock);
  322. SMARTLIST_FOREACH_BEGIN(source->subscriptions, event_subscription_t *, sub) {
  323. if (sub != NULL) {
  324. event_subscription_free(sub);
  325. }
  326. } SMARTLIST_FOREACH_END(sub);
  327. smartlist_free(source->subscriptions);
  328. memset(source, 0x00, sizeof(*source));
  329. tor_free(source);
  330. }
  331. void
  332. event_source_subscribe(event_source_t *source, event_listener_t *listener,
  333. event_label_t label)
  334. {
  335. tor_assert(source != NULL);
  336. tor_assert(listener != NULL);
  337. tor_assert(label != EVENT_LABEL_UNSET);
  338. int index = (int)label;
  339. tor_assert(index >= 0);
  340. if (index >= 1000) {
  341. log_warn(LD_BUG, "An event label was very large (%d), but the event source "
  342. "assumes that event labels are small.", index);
  343. /* We're using a smartlist as a lookup table, and assume that the labels are
  344. small and therefore the list should not be sparse. If the label is large,
  345. then we either have *many* events, or we're choosing our event labels
  346. inefficiently. */
  347. }
  348. event_subscription_t *sub = event_subscription_new(listener);
  349. tor_mutex_acquire(&source->lock);
  350. smartlist_grow(source->subscriptions, index+1);
  351. event_subscription_t *existing_sub = smartlist_get(source->subscriptions, index);
  352. if (existing_sub != NULL) {
  353. // we only support one listener per event type
  354. event_subscription_free(existing_sub);
  355. log_warn(LD_BUG, "We are overriding a previous listener.");
  356. }
  357. smartlist_set(source->subscriptions, index, sub);
  358. tor_mutex_release(&source->lock);
  359. }
  360. void
  361. event_source_unsubscribe(event_source_t *source, event_listener_t *listener,
  362. event_label_t label)
  363. {
  364. tor_assert(source != NULL);
  365. tor_assert(listener != NULL);
  366. tor_assert(label != EVENT_LABEL_UNSET);
  367. int index = (int)label;
  368. tor_assert(index >= 0);
  369. tor_mutex_acquire(&source->lock);
  370. if (index >= smartlist_len(source->subscriptions)) {
  371. // there are no subscribers for this event
  372. log_warn(LD_GENERAL, "Listener wanted to unsubscribe, but was not subscribed.");
  373. tor_mutex_release(&source->lock);
  374. return;
  375. }
  376. event_subscription_t *current_sub = smartlist_get(source->subscriptions, index);
  377. if (current_sub == NULL || current_sub->listener != listener) {
  378. log_warn(LD_GENERAL, "Listener wanted to unsubscribe, but was not subscribed.");
  379. tor_mutex_release(&source->lock);
  380. return;
  381. }
  382. smartlist_set(source->subscriptions, index, NULL);
  383. event_subscription_free(current_sub);
  384. tor_mutex_release(&source->lock);
  385. }
  386. void
  387. event_source_unsubscribe_all(event_source_t *source, event_listener_t *listener)
  388. {
  389. tor_assert(source != NULL);
  390. tor_assert(listener != NULL);
  391. tor_mutex_acquire(&source->lock);
  392. SMARTLIST_FOREACH_BEGIN(source->subscriptions, event_subscription_t *, sub) {
  393. if (sub != NULL && sub->listener == listener) {
  394. event_subscription_free(sub);
  395. SMARTLIST_REPLACE_CURRENT(source->subscriptions, sub, NULL);
  396. }
  397. } SMARTLIST_FOREACH_END(sub);
  398. tor_mutex_release(&source->lock);
  399. }
  400. void
  401. event_source_publish(event_source_t *source, event_label_t label,
  402. event_data_t data, void (*free_data_fn)(void *))
  403. {
  404. tor_assert(source != NULL);
  405. tor_assert(label != EVENT_LABEL_UNSET);
  406. int index = (int)label;
  407. tor_assert(index >= 0);
  408. tor_mutex_acquire(&source->lock);
  409. if (index >= smartlist_len(source->subscriptions)) {
  410. // there are no subscribers for this event
  411. tor_mutex_release(&source->lock);
  412. if (free_data_fn != NULL) {
  413. free_data_fn(data.ptr);
  414. }
  415. return;
  416. }
  417. event_subscription_t *sub = smartlist_get(source->subscriptions, index);
  418. if (sub == NULL || sub->listener == NULL) {
  419. // there are no subscribers for this event
  420. tor_mutex_release(&source->lock);
  421. if (free_data_fn != NULL) {
  422. free_data_fn(data.ptr);
  423. }
  424. return;
  425. }
  426. event_wrapper_t *wrapper = NULL;
  427. wrapper = event_wrapper_new(label, data, free_data_fn);
  428. event_listener_receive(sub->listener, label, wrapper);
  429. tor_mutex_release(&source->lock);
  430. }