events.c 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624
  1. /* Copyright (c) 2013-2019, The Tor Project, Inc. */
  2. /* See LICENSE for licensing information */
#include "lib/evloop/events.h"
#include "lib/log/util_bug.h"

#include <event2/util.h>
#include <event2/event.h>

#include <stdint.h>
#include <string.h>
  8. /* How a subscribed listener wants to receive an event. */
  9. typedef struct event_subscription_t {
  10. event_listener_t *listener;
  11. } event_subscription_t;
  12. /* What a listener should do if it receives an event. */
  13. typedef struct event_callback_t {
  14. event_update_fn_t update_fn;
  15. process_event_fn_t process_event_fn;
  16. } event_callback_t;
  17. /**************************/
  18. static event_subscription_t *
  19. event_subscription_new(event_listener_t *listener)
  20. {
  21. tor_assert(listener != NULL);
  22. event_subscription_t *sub = tor_malloc_zero(sizeof(event_subscription_t));
  23. sub->listener = listener;
  24. return sub;
  25. }
  26. static void
  27. event_subscription_free(event_subscription_t *sub)
  28. {
  29. tor_assert(sub != NULL);
  30. memset(sub, 0x00, sizeof(*sub));
  31. tor_free(sub);
  32. }
  33. /**************************/
  34. static event_callback_t *
  35. event_callback_new(event_update_fn_t update_fn,
  36. process_event_fn_t process_event_fn)
  37. {
  38. tor_assert(process_event_fn != NULL);
  39. event_callback_t *cb = tor_malloc_zero(sizeof(event_callback_t));
  40. cb->update_fn = update_fn;
  41. cb->process_event_fn = process_event_fn;
  42. return cb;
  43. }
  44. static void
  45. event_callback_free(event_callback_t *cb)
  46. {
  47. tor_assert(cb != NULL);
  48. memset(cb, 0x00, sizeof(*cb));
  49. tor_free(cb);
  50. }
  51. /**************************/
  52. static event_wrapper_t *
  53. event_wrapper_new(event_label_t label,
  54. event_data_t data,
  55. void (*free_data_fn)(void *))
  56. {
  57. event_wrapper_t *wrapper = tor_malloc_zero(sizeof(event_wrapper_t));
  58. wrapper->label = label;
  59. wrapper->data = data;
  60. wrapper->free_data_fn = free_data_fn;
  61. return wrapper;
  62. }
  63. static void
  64. event_wrapper_free(event_wrapper_t *wrapper)
  65. {
  66. tor_assert(wrapper != NULL);
  67. if (wrapper->free_data_fn != NULL) {
  68. wrapper->free_data_fn(wrapper->data.ptr);
  69. }
  70. memset(wrapper, 0x00, sizeof(*wrapper));
  71. tor_free(wrapper);
  72. }
  73. /**************************/
  74. event_registry_t *
  75. event_registry_new(void)
  76. {
  77. event_registry_t* registry = tor_malloc_zero(sizeof(event_registry_t));
  78. tor_mutex_init(&registry->lock);
  79. registry->events = smartlist_new();
  80. return registry;
  81. }
  82. void
  83. event_registry_free(event_registry_t *registry)
  84. {
  85. tor_assert(registry != NULL);
  86. tor_mutex_uninit(&registry->lock);
  87. SMARTLIST_FOREACH_BEGIN(registry->events, char *, help_label) {
  88. if (help_label != NULL) {
  89. tor_free(help_label);
  90. }
  91. } SMARTLIST_FOREACH_END(help_label);
  92. smartlist_free(registry->events);
  93. memset(registry, 0x00, sizeof(*registry));
  94. tor_free(registry);
  95. }
  96. event_label_t
  97. event_registry_register_event(event_registry_t *registry,
  98. const char *help_label)
  99. {
  100. tor_assert(registry != NULL);
  101. tor_mutex_acquire(&registry->lock);
  102. int num_events = smartlist_len(registry->events);
  103. if (help_label) {
  104. smartlist_add_strdup(registry->events, help_label);
  105. } else {
  106. smartlist_add(registry->events, NULL);
  107. }
  108. tor_mutex_release(&registry->lock);
  109. return (event_label_t)num_events;
  110. }
  111. const char *
  112. event_registry_get_help_label(event_registry_t *registry,
  113. event_label_t event_label)
  114. {
  115. tor_assert(registry != NULL);
  116. tor_mutex_acquire(&registry->lock);
  117. int label_index = (int)event_label;
  118. tor_assert(label_index >= 0);
  119. const char *help_label = smartlist_get(registry->events,
  120. label_index);
  121. tor_mutex_release(&registry->lock);
  122. return help_label;
  123. }
  124. /**************************/
  125. static void
  126. event_listener_eventloop_cb(evutil_socket_t sock, short events, void *arg)
  127. {
  128. event_listener_t *listener = arg;
  129. (void) sock;
  130. (void) events;
  131. event_listener_process(listener);
  132. }
  133. event_listener_t *
  134. event_listener_new(void *context)
  135. {
  136. event_listener_t* listener = tor_malloc_zero(sizeof(event_listener_t));
  137. tor_mutex_init(&listener->lock);
  138. listener->is_pending = false;
  139. listener->callbacks = smartlist_new();
  140. TOR_TAILQ_INIT(&listener->pending_events);
  141. listener->context = context;
  142. listener->eventloop_ev = NULL;
  143. return listener;
  144. }
  145. void
  146. event_listener_free(event_listener_t *listener)
  147. {
  148. tor_assert(listener != NULL);
  149. tor_mutex_acquire(&listener->lock);
  150. if (listener->eventloop_ev != NULL) {
  151. event_listener_detach(listener);
  152. // this will make sure the libevent callback has stopped
  153. }
  154. while (!TOR_TAILQ_EMPTY(&listener->pending_events)) {
  155. event_wrapper_t *wrapper = TOR_TAILQ_FIRST(&listener->pending_events);
  156. TOR_TAILQ_REMOVE(&listener->pending_events, wrapper, next_event);
  157. event_wrapper_free(wrapper);
  158. }
  159. SMARTLIST_FOREACH_BEGIN(listener->callbacks, event_callback_t *, cb) {
  160. if (cb != NULL) {
  161. event_callback_free(cb);
  162. }
  163. } SMARTLIST_FOREACH_END(cb);
  164. smartlist_free(listener->callbacks);
  165. listener->context = NULL;
  166. listener->is_pending = false;
  167. tor_mutex_release(&listener->lock);
  168. tor_mutex_uninit(&listener->lock);
  169. memset(listener, 0x00, sizeof(*listener));
  170. tor_free(listener);
  171. }
  172. void
  173. event_listener_attach(event_listener_t *listener, struct event_base *base)
  174. {
  175. tor_assert(listener != NULL);
  176. tor_assert(base != NULL);
  177. tor_mutex_acquire(&listener->lock);
  178. tor_assert(listener->eventloop_ev == NULL);
  179. listener->eventloop_ev = tor_event_new(base, -1,
  180. EV_READ|EV_PERSIST, // TODO: do we need persist?
  181. event_listener_eventloop_cb,
  182. listener);
  183. if (listener->is_pending) {
  184. event_active(listener->eventloop_ev, EV_READ, 1);
  185. }
  186. tor_mutex_release(&listener->lock);
  187. }
  188. void
  189. event_listener_detach(event_listener_t *listener)
  190. {
  191. tor_assert(listener != NULL);
  192. tor_mutex_acquire(&listener->lock);
  193. if (listener->eventloop_ev != NULL) {
  194. tor_event_free(listener->eventloop_ev);
  195. listener->eventloop_ev = NULL;
  196. }
  197. tor_mutex_release(&listener->lock);
  198. }
  199. void
  200. event_listener_set_callback(event_listener_t *listener, event_label_t label,
  201. event_update_fn_t update_fn,
  202. process_event_fn_t process_event_fn)
  203. {
  204. tor_assert(listener != NULL);
  205. tor_assert(label != EVENT_LABEL_UNSET);
  206. tor_assert(process_event_fn != NULL);
  207. int index = (int)label;
  208. tor_assert(index >= 0);
  209. event_callback_t *cb = event_callback_new(update_fn, process_event_fn);
  210. if (index >= 1000) {
  211. log_warn(LD_BUG, "An event label was very large (%d), but the event "
  212. "listener assumes that event labels are small.", index);
  213. /* We're using a smartlist as a lookup table, and assume that the labels are
  214. small and therefore the list should not be sparse. If the label is large,
  215. then we either have *many* events, or we're choosing our event labels
  216. inefficiently. */
  217. }
  218. tor_mutex_acquire(&listener->lock);
  219. smartlist_grow(listener->callbacks, index+1);
  220. event_callback_t *existing_cb = smartlist_get(listener->callbacks, index);
  221. if (existing_cb != NULL) {
  222. // we only support one callback per event type
  223. event_callback_free(existing_cb);
  224. log_warn(LD_BUG, "We are overriding a previous callback.");
  225. }
  226. smartlist_set(listener->callbacks, index, cb);
  227. tor_mutex_release(&listener->lock);
  228. }
  229. static void
  230. event_listener_receive(event_listener_t *listener, event_label_t label,
  231. event_wrapper_t *wrapper, bool notify)
  232. {
  233. tor_assert(listener != NULL);
  234. tor_assert(label != EVENT_LABEL_UNSET);
  235. int index = (int)label;
  236. tor_assert(index >= 0);
  237. tor_mutex_acquire(&listener->lock);
  238. if (index >= smartlist_len(listener->callbacks)) {
  239. log_warn(LD_BUG, "We don't have a callback for this event");
  240. if (wrapper != NULL) {
  241. event_wrapper_free(wrapper);
  242. }
  243. tor_mutex_release(&listener->lock);
  244. return;
  245. }
  246. event_callback_t *cb = smartlist_get(listener->callbacks, index);
  247. if (cb == NULL) {
  248. log_warn(LD_BUG, "We don't have a callback for this event");
  249. if (wrapper != NULL) {
  250. event_wrapper_free(wrapper);
  251. }
  252. tor_mutex_release(&listener->lock);
  253. return;
  254. }
  255. event_wrapper_t *last = TOR_TAILQ_LAST(&listener->pending_events,
  256. pending_events_head_t);
  257. if (cb->update_fn != NULL && last != NULL && last->label == label) {
  258. // the last added event was of the same type and we set an update function,
  259. // so we should update the last event rather than adding a new one
  260. cb->update_fn(label, &last->data, &wrapper->data);
  261. if (wrapper != NULL) {
  262. event_wrapper_free(wrapper);
  263. }
  264. } else {
  265. tor_assert(wrapper != NULL);
  266. TOR_TAILQ_INSERT_TAIL(&listener->pending_events, wrapper, next_event);
  267. }
  268. if (!listener->is_pending && notify) {
  269. listener->is_pending = true;
  270. if (listener->eventloop_ev != NULL) {
  271. event_active(listener->eventloop_ev, EV_READ, 1);
  272. }
  273. }
  274. tor_mutex_release(&listener->lock);
  275. }
  276. static void
  277. event_listener_wakeup(event_listener_t *listener)
  278. {
  279. tor_assert(listener != NULL);
  280. tor_mutex_acquire(&listener->lock);
  281. if (!listener->is_pending && !TOR_TAILQ_EMPTY(&listener->pending_events)) {
  282. // not pending but have waiting events
  283. listener->is_pending = true;
  284. if (listener->eventloop_ev != NULL) {
  285. event_active(listener->eventloop_ev, EV_READ, 1);
  286. }
  287. }
  288. tor_mutex_release(&listener->lock);
  289. }
  290. void
  291. event_listener_process(event_listener_t *listener)
  292. {
  293. tor_assert(listener != NULL);
  294. tor_mutex_acquire(&listener->lock);
  295. void *context = listener->context;
  296. while (!TOR_TAILQ_EMPTY(&listener->pending_events)) {
  297. event_wrapper_t *wrapper = TOR_TAILQ_FIRST(&listener->pending_events);
  298. TOR_TAILQ_REMOVE(&listener->pending_events, wrapper, next_event);
  299. tor_assert(wrapper != NULL);
  300. process_event_fn_t process_event_fn = NULL;
  301. int index = (int)wrapper->label;
  302. // do we have a callback for this event label?
  303. if (PREDICT_LIKELY(index < smartlist_len(listener->callbacks))) {
  304. event_callback_t *cb = smartlist_get(listener->callbacks, index);
  305. if (cb != NULL) {
  306. process_event_fn = cb->process_event_fn;
  307. }
  308. }
  309. tor_mutex_release(&listener->lock);
  310. if (PREDICT_LIKELY(process_event_fn != NULL)) {
  311. process_event_fn(wrapper->label, wrapper->data, context);
  312. } else {
  313. // no callback available
  314. log_warn(LD_BUG, "An event was received but had no callback");
  315. }
  316. event_wrapper_free(wrapper);
  317. tor_mutex_acquire(&listener->lock);
  318. }
  319. listener->is_pending = false;
  320. tor_mutex_release(&listener->lock);
  321. }
  322. /**************************/
  323. event_source_t *
  324. event_source_new(void)
  325. {
  326. event_source_t* source = tor_malloc_zero(sizeof(event_source_t));
  327. tor_mutex_init(&source->lock);
  328. source->deliver_silently = smartlist_new();
  329. source->subscriptions = smartlist_new();
  330. return source;
  331. }
  332. void
  333. event_source_free(event_source_t *source)
  334. {
  335. tor_assert(source != NULL);
  336. tor_mutex_uninit(&source->lock);
  337. SMARTLIST_FOREACH_BEGIN(source->subscriptions, event_subscription_t *, sub) {
  338. if (sub != NULL) {
  339. event_subscription_free(sub);
  340. }
  341. } SMARTLIST_FOREACH_END(sub);
  342. smartlist_free(source->subscriptions);
  343. smartlist_free(source->deliver_silently);
  344. memset(source, 0x00, sizeof(*source));
  345. tor_free(source);
  346. }
  347. void
  348. event_source_subscribe(event_source_t *source, event_listener_t *listener,
  349. event_label_t label)
  350. {
  351. tor_assert(source != NULL);
  352. tor_assert(listener != NULL);
  353. tor_assert(label != EVENT_LABEL_UNSET);
  354. int index = (int)label;
  355. tor_assert(index >= 0);
  356. if (index >= 1000) {
  357. log_warn(LD_BUG, "An event label was very large (%d), but the event source "
  358. "assumes that event labels are small.", index);
  359. /* We're using a smartlist as a lookup table, and assume that the labels are
  360. small and therefore the list should not be sparse. If the label is large,
  361. then we either have *many* events, or we're choosing our event labels
  362. inefficiently. */
  363. }
  364. event_subscription_t *sub = event_subscription_new(listener);
  365. tor_mutex_acquire(&source->lock);
  366. smartlist_grow(source->subscriptions, index+1);
  367. event_subscription_t *existing_sub = smartlist_get(source->subscriptions, index);
  368. if (existing_sub != NULL) {
  369. // we only support one listener per event type
  370. event_subscription_free(existing_sub);
  371. log_warn(LD_BUG, "We are overriding a previous listener.");
  372. }
  373. smartlist_set(source->subscriptions, index, sub);
  374. tor_mutex_release(&source->lock);
  375. }
  376. void
  377. event_source_unsubscribe(event_source_t *source, event_listener_t *listener,
  378. event_label_t label)
  379. {
  380. tor_assert(source != NULL);
  381. tor_assert(listener != NULL);
  382. tor_assert(label != EVENT_LABEL_UNSET);
  383. int index = (int)label;
  384. tor_assert(index >= 0);
  385. tor_mutex_acquire(&source->lock);
  386. if (index >= smartlist_len(source->subscriptions)) {
  387. // there are no subscribers for this event
  388. log_warn(LD_GENERAL, "Listener wanted to unsubscribe, but was not subscribed.");
  389. tor_mutex_release(&source->lock);
  390. return;
  391. }
  392. event_subscription_t *current_sub = smartlist_get(source->subscriptions, index);
  393. if (current_sub == NULL || current_sub->listener != listener) {
  394. log_warn(LD_GENERAL, "Listener wanted to unsubscribe, but was not subscribed.");
  395. tor_mutex_release(&source->lock);
  396. return;
  397. }
  398. smartlist_set(source->subscriptions, index, NULL);
  399. event_subscription_free(current_sub);
  400. tor_mutex_release(&source->lock);
  401. }
  402. void
  403. event_source_unsubscribe_all(event_source_t *source, event_listener_t *listener)
  404. {
  405. tor_assert(source != NULL);
  406. tor_assert(listener != NULL);
  407. tor_mutex_acquire(&source->lock);
  408. SMARTLIST_FOREACH_BEGIN(source->subscriptions, event_subscription_t *, sub) {
  409. if (sub != NULL && sub->listener == listener) {
  410. event_subscription_free(sub);
  411. SMARTLIST_REPLACE_CURRENT(source->subscriptions, sub, NULL);
  412. }
  413. } SMARTLIST_FOREACH_END(sub);
  414. tor_mutex_release(&source->lock);
  415. }
  416. void
  417. event_source_publish(event_source_t *source, event_label_t label,
  418. event_data_t data, void (*free_data_fn)(void *))
  419. {
  420. tor_assert(source != NULL);
  421. tor_assert(label != EVENT_LABEL_UNSET);
  422. int index = (int)label;
  423. tor_assert(index >= 0);
  424. tor_mutex_acquire(&source->lock);
  425. if (index >= smartlist_len(source->subscriptions)) {
  426. // there are no subscribers for this event
  427. tor_mutex_release(&source->lock);
  428. if (free_data_fn != NULL) {
  429. free_data_fn(data.ptr);
  430. }
  431. return;
  432. }
  433. event_subscription_t *sub = smartlist_get(source->subscriptions, index);
  434. if (sub == NULL || sub->listener == NULL) {
  435. // there are no subscribers for this event
  436. tor_mutex_release(&source->lock);
  437. if (free_data_fn != NULL) {
  438. free_data_fn(data.ptr);
  439. }
  440. return;
  441. }
  442. bool deliver_silently;
  443. if (index >= smartlist_len(source->deliver_silently)) {
  444. // default is to not deliver silently
  445. deliver_silently = false;
  446. } else {
  447. deliver_silently = (smartlist_get(source->deliver_silently, index) != 0);
  448. }
  449. event_wrapper_t *wrapper = NULL;
  450. wrapper = event_wrapper_new(label, data, free_data_fn);
  451. event_listener_receive(sub->listener, label, wrapper, !deliver_silently);
  452. tor_mutex_release(&source->lock);
  453. }
  454. void
  455. event_source_deliver_silently(event_source_t *source, event_label_t label,
  456. bool deliver_silently)
  457. {
  458. tor_assert(source != NULL);
  459. tor_assert(label != EVENT_LABEL_UNSET);
  460. int index = (int)label;
  461. tor_assert(index >= 0);
  462. if (index >= 1000) {
  463. log_warn(LD_BUG, "An event label was very large (%d), but the event source "
  464. "assumes that event labels are small.", index);
  465. /* We're using a smartlist as a lookup table, and assume that the labels are
  466. small and therefore the list should not be sparse. If the label is large,
  467. then we either have *many* events, or we're choosing our event labels
  468. inefficiently. */
  469. }
  470. tor_mutex_acquire(&source->lock);
  471. smartlist_grow(source->deliver_silently, index+1);
  472. // default is to not deliver silently
  473. smartlist_set(source->deliver_silently, index, (void *)deliver_silently);
  474. tor_mutex_release(&source->lock);
  475. }
  476. void
  477. event_source_wakeup_listener(event_source_t *source, event_label_t label)
  478. {
  479. tor_assert(source != NULL);
  480. tor_assert(label != EVENT_LABEL_UNSET);
  481. int index = (int)label;
  482. tor_assert(index >= 0);
  483. tor_mutex_acquire(&source->lock);
  484. if (index >= smartlist_len(source->subscriptions)) {
  485. // there are no subscribers for this event
  486. tor_mutex_release(&source->lock);
  487. return;
  488. }
  489. event_subscription_t *sub = smartlist_get(source->subscriptions, index);
  490. if (sub == NULL || sub->listener == NULL) {
  491. // there are no subscribers for this event
  492. tor_mutex_release(&source->lock);
  493. return;
  494. }
  495. event_listener_wakeup(sub->listener);
  496. tor_mutex_release(&source->lock);
  497. }