events.c

/* Copyright (c) 2013-2019, The Tor Project, Inc. */
/* See LICENSE for licensing information */

#include "lib/evloop/events.h"
#include "lib/log/util_bug.h"

#include <event2/util.h>
#include <event2/event.h>

#include <string.h>

/* How a subscribed listener wants to receive an event. */
typedef struct event_subscription_t {
  event_listener_t *listener;
  bool send_full_event;
} event_subscription_t;

/* What a listener should do if it receives an event. */
typedef struct event_callback_t {
  bool edge_triggered;
  bool is_edge_pending;
  void (*process_event_fn)(event_label_t, event_data_t, void *);
} event_callback_t;

/**************************/

static event_subscription_t *
event_subscription_new(event_listener_t *listener, bool send_full_event)
{
  tor_assert(listener != NULL);
  event_subscription_t *sub = tor_malloc_zero(sizeof(event_subscription_t));
  sub->listener = listener;
  sub->send_full_event = send_full_event;
  return sub;
}

static void
event_subscription_free(event_subscription_t *sub)
{
  tor_assert(sub != NULL);
  memset(sub, 0x00, sizeof(*sub));
  tor_free(sub);
}

/**************************/

static event_callback_t *
event_callback_new(bool edge_triggered,
                   void (*process_event_fn)(event_label_t, event_data_t,
                                            void *))
{
  tor_assert(process_event_fn != NULL);
  event_callback_t *cb = tor_malloc_zero(sizeof(event_callback_t));
  cb->edge_triggered = edge_triggered;
  cb->is_edge_pending = false;
  cb->process_event_fn = process_event_fn;
  return cb;
}

static void
event_callback_free(event_callback_t *cb)
{
  tor_assert(cb != NULL);
  memset(cb, 0x00, sizeof(*cb));
  tor_free(cb);
}

/**************************/

static event_wrapper_t *
event_wrapper_new(event_label_t label,
                  event_data_t data,
                  void (*free_data_fn)(void *))
{
  event_wrapper_t *wrapper = tor_malloc_zero(sizeof(event_wrapper_t));
  wrapper->label = label;
  wrapper->data = data;
  wrapper->free_data_fn = free_data_fn;
  return wrapper;
}

static void
event_wrapper_free(event_wrapper_t *wrapper)
{
  tor_assert(wrapper != NULL);
  if (wrapper->free_data_fn != NULL) {
    wrapper->free_data_fn(wrapper->data.ptr);
  }
  memset(wrapper, 0x00, sizeof(*wrapper));
  tor_free(wrapper);
}

/**************************/

event_registry_t *
event_registry_new(void)
{
  event_registry_t *registry = tor_malloc_zero(sizeof(event_registry_t));
  tor_mutex_init(&registry->lock);
  registry->events = smartlist_new();
  return registry;
}

void
event_registry_free(event_registry_t *registry)
{
  tor_assert(registry != NULL);
  tor_mutex_uninit(&registry->lock);
  SMARTLIST_FOREACH_BEGIN(registry->events, char *, help_label) {
    if (help_label != NULL) {
      tor_free(help_label);
    }
  } SMARTLIST_FOREACH_END(help_label);
  smartlist_free(registry->events);
  memset(registry, 0x00, sizeof(*registry));
  tor_free(registry);
}

event_label_t
event_registry_register_event(event_registry_t *registry,
                              const char *help_label)
{
  tor_assert(registry != NULL);
  tor_mutex_acquire(&registry->lock);
  int num_events = smartlist_len(registry->events);
  if (help_label) {
    smartlist_add_strdup(registry->events, help_label);
  } else {
    smartlist_add(registry->events, NULL);
  }
  tor_mutex_release(&registry->lock);
  return (event_label_t)num_events;
}

const char *
event_registry_get_help_label(event_registry_t *registry,
                              event_label_t event_label)
{
  tor_assert(registry != NULL);
  tor_mutex_acquire(&registry->lock);
  int label_index = (int)event_label;
  tor_assert(label_index >= 0);
  // only labels previously returned by event_registry_register_event()
  // are valid lookups
  tor_assert(label_index < smartlist_len(registry->events));
  const char *help_label = smartlist_get(registry->events, label_index);
  tor_mutex_release(&registry->lock);
  return help_label;
}
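
/* Illustrative sketch, not part of the original code: one plausible way a
 * subsystem might use the registry above. The "circuit-built" name and the
 * example_* identifiers are hypothetical. */
#if 0
static event_label_t circuit_built_event = EVENT_LABEL_UNSET;

static void
example_register_events(event_registry_t *registry)
{
  /* Each registration returns the next free label (an index into the
   * registry's smartlist of help labels). */
  circuit_built_event = event_registry_register_event(registry,
                                                      "circuit-built");
  log_info(LD_GENERAL, "Registered event %d (%s)", (int)circuit_built_event,
           event_registry_get_help_label(registry, circuit_built_event));
}
#endif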

/**************************/

static void
event_listener_eventloop_cb(evutil_socket_t sock, short events, void *arg)
{
  event_listener_t *listener = arg;
  (void) sock;
  (void) events;
  event_listener_process(listener);
}

event_listener_t *
event_listener_new(void *context)
{
  event_listener_t *listener = tor_malloc_zero(sizeof(event_listener_t));
  tor_mutex_init(&listener->lock);
  listener->is_pending = false;
  listener->callbacks = smartlist_new();
  TOR_TAILQ_INIT(&listener->pending_events);
  listener->context = context;
  listener->eventloop_ev = NULL;
  return listener;
}

void
event_listener_free(event_listener_t *listener)
{
  tor_assert(listener != NULL);
  tor_mutex_acquire(&listener->lock);
  if (listener->eventloop_ev != NULL) {
    // this will make sure the libevent callback has stopped
    event_listener_detach(listener);
  }
  while (!TOR_TAILQ_EMPTY(&listener->pending_events)) {
    event_wrapper_t *wrapper = TOR_TAILQ_FIRST(&listener->pending_events);
    TOR_TAILQ_REMOVE(&listener->pending_events, wrapper, next_event);
    event_wrapper_free(wrapper);
  }
  SMARTLIST_FOREACH_BEGIN(listener->callbacks, event_callback_t *, cb) {
    if (cb != NULL) {
      event_callback_free(cb);
    }
  } SMARTLIST_FOREACH_END(cb);
  smartlist_free(listener->callbacks);
  listener->context = NULL;
  listener->is_pending = false;
  tor_mutex_release(&listener->lock);
  tor_mutex_uninit(&listener->lock);
  memset(listener, 0x00, sizeof(*listener));
  tor_free(listener);
}

void
event_listener_attach(event_listener_t *listener, struct event_base *base)
{
  tor_assert(listener != NULL);
  tor_assert(base != NULL);
  tor_mutex_acquire(&listener->lock);
  tor_assert(listener->eventloop_ev == NULL);
  // TODO: do we need persist?
  listener->eventloop_ev = tor_event_new(base, -1, EV_READ|EV_PERSIST,
                                         event_listener_eventloop_cb,
                                         listener);
  if (listener->is_pending) {
    event_active(listener->eventloop_ev, EV_READ, 1);
  }
  tor_mutex_release(&listener->lock);
}

void
event_listener_detach(event_listener_t *listener)
{
  tor_assert(listener != NULL);
  tor_mutex_acquire(&listener->lock);
  if (listener->eventloop_ev != NULL) {
    tor_event_free(listener->eventloop_ev);
    listener->eventloop_ev = NULL;
  }
  tor_mutex_release(&listener->lock);
}

void
event_listener_set_callback(event_listener_t *listener, event_label_t label,
                            bool edge_triggered,
                            void (*process_event_fn)(event_label_t,
                                                     event_data_t,
                                                     void *))
{
  tor_assert(listener != NULL);
  tor_assert(label != EVENT_LABEL_UNSET);
  tor_assert(process_event_fn != NULL);
  int index = (int)label;
  tor_assert(index >= 0);
  event_callback_t *cb = event_callback_new(edge_triggered, process_event_fn);
  if (index >= 1000) {
    /* We're using a smartlist as a lookup table, and assume that the labels
       are small and therefore the list should not be sparse. If the label is
       large, then we either have *many* events, or we're choosing our event
       labels inefficiently. */
    log_warn(LD_BUG, "An event label was very large (%d), but the event "
             "listener assumes that event labels are small.", index);
  }
  tor_mutex_acquire(&listener->lock);
  smartlist_grow(listener->callbacks, index+1);
  event_callback_t *existing_cb = smartlist_get(listener->callbacks, index);
  if (existing_cb != NULL) {
    // we only support one callback per event type
    event_callback_free(existing_cb);
    log_warn(LD_BUG, "We are overriding a previous callback.");
  }
  smartlist_set(listener->callbacks, index, cb);
  tor_mutex_release(&listener->lock);
}
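
/* Illustrative sketch, not part of the original code: wiring a listener into
 * a libevent loop. The example_* names are hypothetical; the pattern is
 * new -> set_callback -> attach, after which events delivered to the listener
 * wake the event loop and are handled by event_listener_process(). */
#if 0
static void
example_process_event(event_label_t label, event_data_t data, void *context)
{
  (void)label;
  (void)context;
  /* For level-triggered callbacks, data is the payload passed to
   * event_source_publish(); for edge-triggered callbacks, data.ptr is NULL. */
  (void)data;
}

static event_listener_t *
example_listener_setup(struct event_base *base, event_label_t label)
{
  event_listener_t *listener = event_listener_new(NULL);
  event_listener_set_callback(listener, label, false /* level-triggered */,
                              example_process_event);
  event_listener_attach(listener, base);
  return listener;
}
#endif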

static void
event_listener_receive(event_listener_t *listener, event_label_t label,
                       event_wrapper_t *wrapper)
{
  tor_assert(listener != NULL);
  tor_assert(label != EVENT_LABEL_UNSET);
  int index = (int)label;
  tor_assert(index >= 0);
  tor_mutex_acquire(&listener->lock);
  if (index >= smartlist_len(listener->callbacks)) {
    log_warn(LD_BUG, "We don't have a callback for this event");
    if (wrapper != NULL) {
      event_wrapper_free(wrapper);
    }
    tor_mutex_release(&listener->lock);
    return;
  }
  event_callback_t *cb = smartlist_get(listener->callbacks, index);
  if (cb == NULL) {
    log_warn(LD_BUG, "We don't have a callback for this event");
    if (wrapper != NULL) {
      event_wrapper_free(wrapper);
    }
    tor_mutex_release(&listener->lock);
    return;
  }
  if (cb->edge_triggered) {
    cb->is_edge_pending = true;
    if (wrapper != NULL) {
      log_warn(LD_BUG, "An edge-triggered event received a full event");
      event_wrapper_free(wrapper);
    }
  } else {
    tor_assert(wrapper != NULL);
    TOR_TAILQ_INSERT_TAIL(&listener->pending_events, wrapper, next_event);
  }
  if (!listener->is_pending) {
    listener->is_pending = true;
    if (listener->eventloop_ev != NULL) {
      event_active(listener->eventloop_ev, EV_READ, 1);
    }
  }
  tor_mutex_release(&listener->lock);
}

void
event_listener_process(event_listener_t *listener)
{
  tor_assert(listener != NULL);
  tor_mutex_acquire(&listener->lock);
  void *context = listener->context;
  bool more_events = true;
  while (more_events) {
    // first process edge-triggered events
    event_data_t null_data = { .ptr = NULL };
    SMARTLIST_FOREACH_BEGIN(listener->callbacks, event_callback_t *, cb) {
      if (cb != NULL && cb->is_edge_pending) {
        void (*process_event_fn)(event_label_t, event_data_t, void *) = NULL;
        process_event_fn = cb->process_event_fn;
        event_label_t label = (int)cb_sl_idx;
        cb->is_edge_pending = false;
        tor_mutex_release(&listener->lock);
        if (PREDICT_LIKELY(process_event_fn != NULL)) {
          // edge-triggered events don't have corresponding event data
          process_event_fn(label, null_data, context);
        } else {
          // no callback available
          log_warn(LD_BUG, "An edge event was received but had no callback");
        }
        tor_mutex_acquire(&listener->lock);
        // while we were unlocked, the list may have changed length,
        // so we make sure it's correct
        cb_sl_len = smartlist_len(listener->callbacks);
      }
    } SMARTLIST_FOREACH_END(cb);
    // then process regular events
    while (!TOR_TAILQ_EMPTY(&listener->pending_events)) {
      event_wrapper_t *wrapper = TOR_TAILQ_FIRST(&listener->pending_events);
      TOR_TAILQ_REMOVE(&listener->pending_events, wrapper, next_event);
      tor_assert(wrapper != NULL);
      void (*process_event_fn)(event_label_t, event_data_t, void *) = NULL;
      int index = (int)wrapper->label;
      // do we have a callback for this event label?
      if (PREDICT_LIKELY(index < smartlist_len(listener->callbacks))) {
        event_callback_t *cb = smartlist_get(listener->callbacks, index);
        if (cb != NULL) {
          tor_assert(!cb->edge_triggered);
          process_event_fn = cb->process_event_fn;
        }
      }
      tor_mutex_release(&listener->lock);
      if (PREDICT_LIKELY(process_event_fn != NULL)) {
        process_event_fn(wrapper->label, wrapper->data, context);
      } else {
        // no callback available
        log_warn(LD_BUG, "An event was received but had no callback");
      }
      event_wrapper_free(wrapper);
      tor_mutex_acquire(&listener->lock);
    }
    // there's a possibility edge events have been added while we were
    // running callbacks
    more_events = false;
    SMARTLIST_FOREACH_BEGIN(listener->callbacks, event_callback_t *, cb) {
      if (cb != NULL && cb->is_edge_pending) {
        more_events = true;
      }
    } SMARTLIST_FOREACH_END(cb);
  }
  listener->is_pending = false;
  tor_mutex_release(&listener->lock);
}
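
/* Illustrative sketch, not part of the original code: a listener does not
 * have to be attached to an event base. A caller that polls on its own
 * schedule can drain pending events directly; the example_* name is
 * hypothetical. */
#if 0
static void
example_drain_listener(event_listener_t *listener)
{
  /* Runs every pending edge-triggered and queued event once, then clears
   * the listener's pending flag. */
  event_listener_process(listener);
}
#endif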

/**************************/

event_source_t *
event_source_new(void)
{
  event_source_t *source = tor_malloc_zero(sizeof(event_source_t));
  tor_mutex_init(&source->lock);
  source->subscriptions = smartlist_new();
  return source;
}

void
event_source_free(event_source_t *source)
{
  tor_assert(source != NULL);
  tor_mutex_uninit(&source->lock);
  SMARTLIST_FOREACH_BEGIN(source->subscriptions, event_subscription_t *,
                          sub) {
    if (sub != NULL) {
      event_subscription_free(sub);
    }
  } SMARTLIST_FOREACH_END(sub);
  smartlist_free(source->subscriptions);
  memset(source, 0x00, sizeof(*source));
  tor_free(source);
}

void
event_source_subscribe(event_source_t *source, event_listener_t *listener,
                       event_label_t label, bool send_full_event)
{
  tor_assert(source != NULL);
  tor_assert(listener != NULL);
  tor_assert(label != EVENT_LABEL_UNSET);
  int index = (int)label;
  tor_assert(index >= 0);
  if (index >= 1000) {
    /* We're using a smartlist as a lookup table, and assume that the labels
       are small and therefore the list should not be sparse. If the label is
       large, then we either have *many* events, or we're choosing our event
       labels inefficiently. */
    log_warn(LD_BUG, "An event label was very large (%d), but the event "
             "source assumes that event labels are small.", index);
  }
  event_subscription_t *sub = event_subscription_new(listener,
                                                     send_full_event);
  tor_mutex_acquire(&source->lock);
  smartlist_grow(source->subscriptions, index+1);
  event_subscription_t *existing_sub = smartlist_get(source->subscriptions,
                                                     index);
  if (existing_sub != NULL) {
    // we only support one listener per event type
    event_subscription_free(existing_sub);
    log_warn(LD_BUG, "We are overriding a previous listener.");
  }
  smartlist_set(source->subscriptions, index, sub);
  tor_mutex_release(&source->lock);
}

void
event_source_unsubscribe(event_source_t *source, event_listener_t *listener,
                         event_label_t label)
{
  tor_assert(source != NULL);
  tor_assert(listener != NULL);
  tor_assert(label != EVENT_LABEL_UNSET);
  int index = (int)label;
  tor_assert(index >= 0);
  tor_mutex_acquire(&source->lock);
  if (index >= smartlist_len(source->subscriptions)) {
    // there are no subscribers for this event
    log_warn(LD_GENERAL,
             "Listener wanted to unsubscribe, but was not subscribed.");
    tor_mutex_release(&source->lock);
    return;
  }
  event_subscription_t *current_sub = smartlist_get(source->subscriptions,
                                                    index);
  if (current_sub == NULL || current_sub->listener != listener) {
    log_warn(LD_GENERAL,
             "Listener wanted to unsubscribe, but was not subscribed.");
    tor_mutex_release(&source->lock);
    return;
  }
  smartlist_set(source->subscriptions, index, NULL);
  event_subscription_free(current_sub);
  tor_mutex_release(&source->lock);
}

void
event_source_unsubscribe_all(event_source_t *source,
                             event_listener_t *listener)
{
  tor_assert(source != NULL);
  tor_assert(listener != NULL);
  tor_mutex_acquire(&source->lock);
  SMARTLIST_FOREACH_BEGIN(source->subscriptions, event_subscription_t *,
                          sub) {
    if (sub != NULL && sub->listener == listener) {
      event_subscription_free(sub);
      SMARTLIST_REPLACE_CURRENT(source->subscriptions, sub, NULL);
    }
  } SMARTLIST_FOREACH_END(sub);
  tor_mutex_release(&source->lock);
}

void
event_source_publish(event_source_t *source, event_label_t label,
                     event_data_t data, void (*free_data_fn)(void *))
{
  tor_assert(source != NULL);
  tor_assert(label != EVENT_LABEL_UNSET);
  int index = (int)label;
  tor_assert(index >= 0);
  tor_mutex_acquire(&source->lock);
  if (index >= smartlist_len(source->subscriptions)) {
    // there are no subscribers for this event
    tor_mutex_release(&source->lock);
    if (free_data_fn != NULL) {
      free_data_fn(data.ptr);
    }
    return;
  }
  event_subscription_t *sub = smartlist_get(source->subscriptions, index);
  if (sub == NULL || sub->listener == NULL) {
    // there are no subscribers for this event
    tor_mutex_release(&source->lock);
    if (free_data_fn != NULL) {
      free_data_fn(data.ptr);
    }
    return;
  }
  event_wrapper_t *wrapper = NULL;
  if (sub->send_full_event) {
    wrapper = event_wrapper_new(label, data, free_data_fn);
  } else {
    // the listener doesn't want the event data, so free it now
    if (free_data_fn != NULL) {
      free_data_fn(data.ptr);
    }
  }
  event_listener_receive(sub->listener, label, wrapper);
  tor_mutex_release(&source->lock);
}
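
/* Illustrative end-to-end sketch, not part of the original code: publishing
 * an event whose payload ownership is handed to the source. The example_*
 * names are hypothetical; tor_free_() is used as the payload free function.
 * free_data_fn is invoked exactly once, whether the payload is wrapped for a
 * full-event subscriber or freed because nobody wants the data. */
#if 0
static void
example_publish(event_source_t *source, event_listener_t *listener,
                event_label_t label)
{
  /* The listener asks for the full event so that the payload reaches its
   * callback rather than being freed immediately. */
  event_source_subscribe(source, listener, label, true);

  event_data_t data;
  data.ptr = tor_strdup("example payload");
  event_source_publish(source, label, data, tor_free_);
}
#endif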