events.c 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647
  1. /* Copyright (c) 2013-2019, The Tor Project, Inc. */
  2. /* See LICENSE for licensing information */
  3. #include "lib/evloop/events.h"
  4. #include "lib/log/util_bug.h"
  5. #include <event2/util.h>
  6. #include <event2/event.h>
  7. #include <string.h>
  8. /* How a subscribed listener wants to receive an event. */
  9. typedef struct event_subscription_t {
  10. event_listener_t *listener;
  11. } event_subscription_t;
  12. /* What a listener should do if it receives an event. */
  13. typedef struct event_callback_t {
  14. event_update_fn_t update_fn;
  15. process_event_fn_t process_event_fn;
  16. } event_callback_t;
  17. /**************************/
  18. static event_subscription_t *
  19. event_subscription_new(event_listener_t *listener)
  20. {
  21. tor_assert(listener != NULL);
  22. event_subscription_t *sub = tor_malloc_zero(sizeof(event_subscription_t));
  23. sub->listener = listener;
  24. return sub;
  25. }
  26. static void
  27. event_subscription_free(event_subscription_t *sub)
  28. {
  29. tor_assert(sub != NULL);
  30. memset(sub, 0x00, sizeof(*sub));
  31. tor_free(sub);
  32. }
  33. /**************************/
  34. static event_callback_t *
  35. event_callback_new(event_update_fn_t update_fn,
  36. process_event_fn_t process_event_fn)
  37. {
  38. tor_assert(process_event_fn != NULL);
  39. event_callback_t *cb = tor_malloc_zero(sizeof(event_callback_t));
  40. cb->update_fn = update_fn;
  41. cb->process_event_fn = process_event_fn;
  42. return cb;
  43. }
  44. static void
  45. event_callback_free(event_callback_t *cb)
  46. {
  47. tor_assert(cb != NULL);
  48. memset(cb, 0x00, sizeof(*cb));
  49. tor_free(cb);
  50. }
  51. /**************************/
  52. static event_wrapper_t *
  53. event_wrapper_new(event_label_t label,
  54. event_data_t data,
  55. void (*free_data_fn)(void *))
  56. {
  57. event_wrapper_t *wrapper = tor_malloc_zero(sizeof(event_wrapper_t));
  58. wrapper->label = label;
  59. wrapper->data = data;
  60. wrapper->free_data_fn = free_data_fn;
  61. return wrapper;
  62. }
  63. static void
  64. event_wrapper_free(event_wrapper_t *wrapper)
  65. {
  66. tor_assert(wrapper != NULL);
  67. if (wrapper->free_data_fn != NULL) {
  68. wrapper->free_data_fn(wrapper->data.ptr);
  69. }
  70. memset(wrapper, 0x00, sizeof(*wrapper));
  71. tor_free(wrapper);
  72. }
  73. /**************************/
  74. event_registry_t *
  75. event_registry_new(void)
  76. {
  77. event_registry_t* registry = tor_malloc_zero(sizeof(event_registry_t));
  78. tor_mutex_init(&registry->lock);
  79. registry->events = smartlist_new();
  80. return registry;
  81. }
  82. void
  83. event_registry_free(event_registry_t *registry)
  84. {
  85. tor_assert(registry != NULL);
  86. tor_mutex_uninit(&registry->lock);
  87. SMARTLIST_FOREACH_BEGIN(registry->events, char *, help_label) {
  88. if (help_label != NULL) {
  89. tor_free(help_label);
  90. }
  91. } SMARTLIST_FOREACH_END(help_label);
  92. smartlist_free(registry->events);
  93. memset(registry, 0x00, sizeof(*registry));
  94. tor_free(registry);
  95. }
  96. event_label_t
  97. event_registry_register_event(event_registry_t *registry,
  98. const char *help_label)
  99. {
  100. tor_assert(registry != NULL);
  101. tor_mutex_acquire(&registry->lock);
  102. int num_events = smartlist_len(registry->events);
  103. if (help_label) {
  104. smartlist_add_strdup(registry->events, help_label);
  105. } else {
  106. smartlist_add(registry->events, NULL);
  107. }
  108. tor_mutex_release(&registry->lock);
  109. return (event_label_t)num_events;
  110. }
  111. const char *
  112. event_registry_get_help_label(event_registry_t *registry,
  113. event_label_t event_label)
  114. {
  115. tor_assert(registry != NULL);
  116. tor_mutex_acquire(&registry->lock);
  117. int label_index = (int)event_label;
  118. tor_assert(label_index >= 0);
  119. const char *help_label = smartlist_get(registry->events,
  120. label_index);
  121. tor_mutex_release(&registry->lock);
  122. return help_label;
  123. }
  124. /**************************/
  125. static void
  126. event_listener_eventloop_cb(evutil_socket_t sock, short events, void *arg)
  127. {
  128. event_listener_t *listener = arg;
  129. (void) sock;
  130. (void) events;
  131. event_listener_process(listener);
  132. }
  133. event_listener_t *
  134. event_listener_new(void *context)
  135. {
  136. event_listener_t* listener = tor_malloc_zero(sizeof(event_listener_t));
  137. tor_mutex_init(&listener->lock);
  138. listener->is_pending = false;
  139. listener->callbacks = smartlist_new();
  140. TOR_TAILQ_INIT(&listener->pending_events);
  141. listener->context = context;
  142. listener->eventloop_ev = NULL;
  143. listener->max_iterations = -1;
  144. return listener;
  145. }
  146. void
  147. event_listener_free(event_listener_t *listener)
  148. {
  149. tor_assert(listener != NULL);
  150. tor_mutex_acquire(&listener->lock);
  151. if (listener->eventloop_ev != NULL) {
  152. event_listener_detach(listener);
  153. // this will make sure the libevent callback has stopped
  154. }
  155. while (!TOR_TAILQ_EMPTY(&listener->pending_events)) {
  156. event_wrapper_t *wrapper = TOR_TAILQ_FIRST(&listener->pending_events);
  157. TOR_TAILQ_REMOVE(&listener->pending_events, wrapper, next_event);
  158. event_wrapper_free(wrapper);
  159. }
  160. SMARTLIST_FOREACH_BEGIN(listener->callbacks, event_callback_t *, cb) {
  161. if (cb != NULL) {
  162. event_callback_free(cb);
  163. }
  164. } SMARTLIST_FOREACH_END(cb);
  165. smartlist_free(listener->callbacks);
  166. listener->context = NULL;
  167. listener->is_pending = false;
  168. tor_mutex_release(&listener->lock);
  169. tor_mutex_uninit(&listener->lock);
  170. memset(listener, 0x00, sizeof(*listener));
  171. tor_free(listener);
  172. }
  173. void
  174. event_listener_set_max_iterations(event_listener_t *listener, int max_iterations)
  175. {
  176. tor_assert(listener != NULL);
  177. tor_mutex_acquire(&listener->lock);
  178. listener->max_iterations = max_iterations;
  179. tor_mutex_release(&listener->lock);
  180. }
  181. void
  182. event_listener_attach(event_listener_t *listener, struct event_base *base)
  183. {
  184. tor_assert(listener != NULL);
  185. tor_assert(base != NULL);
  186. tor_mutex_acquire(&listener->lock);
  187. tor_assert(listener->eventloop_ev == NULL);
  188. listener->eventloop_ev = tor_event_new(base, -1,
  189. EV_READ|EV_PERSIST, // TODO: do we need persist?
  190. event_listener_eventloop_cb,
  191. listener);
  192. if (listener->is_pending) {
  193. event_active(listener->eventloop_ev, EV_READ, 1);
  194. }
  195. tor_mutex_release(&listener->lock);
  196. }
  197. void
  198. event_listener_detach(event_listener_t *listener)
  199. {
  200. tor_assert(listener != NULL);
  201. tor_mutex_acquire(&listener->lock);
  202. if (listener->eventloop_ev != NULL) {
  203. tor_event_free(listener->eventloop_ev);
  204. listener->eventloop_ev = NULL;
  205. }
  206. tor_mutex_release(&listener->lock);
  207. }
  208. void
  209. event_listener_set_callback(event_listener_t *listener, event_label_t label,
  210. event_update_fn_t update_fn,
  211. process_event_fn_t process_event_fn)
  212. {
  213. tor_assert(listener != NULL);
  214. tor_assert(label != EVENT_LABEL_UNSET);
  215. tor_assert(process_event_fn != NULL);
  216. int index = (int)label;
  217. tor_assert(index >= 0);
  218. event_callback_t *cb = event_callback_new(update_fn, process_event_fn);
  219. if (index >= 1000) {
  220. log_warn(LD_BUG, "An event label was very large (%d), but the event "
  221. "listener assumes that event labels are small.", index);
  222. /* We're using a smartlist as a lookup table, and assume that the labels are
  223. small and therefore the list should not be sparse. If the label is large,
  224. then we either have *many* events, or we're choosing our event labels
  225. inefficiently. */
  226. }
  227. tor_mutex_acquire(&listener->lock);
  228. smartlist_grow(listener->callbacks, index+1);
  229. event_callback_t *existing_cb = smartlist_get(listener->callbacks, index);
  230. if (existing_cb != NULL) {
  231. // we only support one callback per event type
  232. event_callback_free(existing_cb);
  233. log_warn(LD_BUG, "We are overriding a previous callback.");
  234. }
  235. smartlist_set(listener->callbacks, index, cb);
  236. tor_mutex_release(&listener->lock);
  237. }
  238. static void
  239. event_listener_receive(event_listener_t *listener, event_label_t label,
  240. event_wrapper_t *wrapper, bool notify)
  241. {
  242. tor_assert(listener != NULL);
  243. tor_assert(label != EVENT_LABEL_UNSET);
  244. int index = (int)label;
  245. tor_assert(index >= 0);
  246. tor_mutex_acquire(&listener->lock);
  247. if (index >= smartlist_len(listener->callbacks)) {
  248. log_warn(LD_BUG, "We don't have a callback for this event");
  249. if (wrapper != NULL) {
  250. event_wrapper_free(wrapper);
  251. }
  252. tor_mutex_release(&listener->lock);
  253. return;
  254. }
  255. event_callback_t *cb = smartlist_get(listener->callbacks, index);
  256. if (cb == NULL) {
  257. log_warn(LD_BUG, "We don't have a callback for this event");
  258. if (wrapper != NULL) {
  259. event_wrapper_free(wrapper);
  260. }
  261. tor_mutex_release(&listener->lock);
  262. return;
  263. }
  264. event_wrapper_t *last = TOR_TAILQ_LAST(&listener->pending_events,
  265. pending_events_head_t);
  266. if (cb->update_fn != NULL && last != NULL && last->label == label) {
  267. // the last added event was of the same type and we set an update function,
  268. // so we should update the last event rather than adding a new one
  269. cb->update_fn(label, &last->data, &wrapper->data);
  270. if (wrapper != NULL) {
  271. event_wrapper_free(wrapper);
  272. }
  273. } else {
  274. tor_assert(wrapper != NULL);
  275. TOR_TAILQ_INSERT_TAIL(&listener->pending_events, wrapper, next_event);
  276. }
  277. if (!listener->is_pending && notify) {
  278. listener->is_pending = true;
  279. if (listener->eventloop_ev != NULL) {
  280. event_active(listener->eventloop_ev, EV_READ, 1);
  281. }
  282. }
  283. tor_mutex_release(&listener->lock);
  284. }
  285. static void
  286. event_listener_wakeup(event_listener_t *listener)
  287. {
  288. tor_assert(listener != NULL);
  289. tor_mutex_acquire(&listener->lock);
  290. if (!listener->is_pending && !TOR_TAILQ_EMPTY(&listener->pending_events)) {
  291. // not pending but have waiting events
  292. listener->is_pending = true;
  293. if (listener->eventloop_ev != NULL) {
  294. event_active(listener->eventloop_ev, EV_READ, 1);
  295. }
  296. }
  297. tor_mutex_release(&listener->lock);
  298. }
  299. void
  300. event_listener_process(event_listener_t *listener)
  301. {
  302. tor_assert(listener != NULL);
  303. int counter = 0;
  304. tor_mutex_acquire(&listener->lock);
  305. void *context = listener->context;
  306. int max_iterations = listener->max_iterations;
  307. while (!TOR_TAILQ_EMPTY(&listener->pending_events) &&
  308. (max_iterations < 0 || counter < max_iterations)) {
  309. event_wrapper_t *wrapper = TOR_TAILQ_FIRST(&listener->pending_events);
  310. TOR_TAILQ_REMOVE(&listener->pending_events, wrapper, next_event);
  311. tor_assert(wrapper != NULL);
  312. process_event_fn_t process_event_fn = NULL;
  313. int index = (int)wrapper->label;
  314. // do we have a callback for this event label?
  315. if (PREDICT_LIKELY(index < smartlist_len(listener->callbacks))) {
  316. event_callback_t *cb = smartlist_get(listener->callbacks, index);
  317. if (cb != NULL) {
  318. process_event_fn = cb->process_event_fn;
  319. }
  320. }
  321. tor_mutex_release(&listener->lock);
  322. if (PREDICT_LIKELY(process_event_fn != NULL)) {
  323. process_event_fn(wrapper->label, wrapper->data, context);
  324. counter += 1;
  325. // only increase the counter if a callback was run
  326. } else {
  327. // no callback available
  328. log_warn(LD_BUG, "An event was received but had no callback");
  329. }
  330. event_wrapper_free(wrapper);
  331. tor_mutex_acquire(&listener->lock);
  332. }
  333. if (TOR_TAILQ_EMPTY(&listener->pending_events)) {
  334. listener->is_pending = false;
  335. } else {
  336. event_active(listener->eventloop_ev, EV_READ, 1);
  337. }
  338. tor_mutex_release(&listener->lock);
  339. }
  340. /**************************/
  341. event_source_t *
  342. event_source_new(void)
  343. {
  344. event_source_t* source = tor_malloc_zero(sizeof(event_source_t));
  345. tor_mutex_init(&source->lock);
  346. source->deliver_silently = smartlist_new();
  347. source->subscriptions = smartlist_new();
  348. return source;
  349. }
  350. void
  351. event_source_free(event_source_t *source)
  352. {
  353. tor_assert(source != NULL);
  354. tor_mutex_uninit(&source->lock);
  355. SMARTLIST_FOREACH_BEGIN(source->subscriptions, event_subscription_t *, sub) {
  356. if (sub != NULL) {
  357. event_subscription_free(sub);
  358. }
  359. } SMARTLIST_FOREACH_END(sub);
  360. smartlist_free(source->subscriptions);
  361. smartlist_free(source->deliver_silently);
  362. memset(source, 0x00, sizeof(*source));
  363. tor_free(source);
  364. }
  365. void
  366. event_source_subscribe(event_source_t *source, event_listener_t *listener,
  367. event_label_t label)
  368. {
  369. tor_assert(source != NULL);
  370. tor_assert(listener != NULL);
  371. tor_assert(label != EVENT_LABEL_UNSET);
  372. int index = (int)label;
  373. tor_assert(index >= 0);
  374. if (index >= 1000) {
  375. log_warn(LD_BUG, "An event label was very large (%d), but the event source "
  376. "assumes that event labels are small.", index);
  377. /* We're using a smartlist as a lookup table, and assume that the labels are
  378. small and therefore the list should not be sparse. If the label is large,
  379. then we either have *many* events, or we're choosing our event labels
  380. inefficiently. */
  381. }
  382. event_subscription_t *sub = event_subscription_new(listener);
  383. tor_mutex_acquire(&source->lock);
  384. smartlist_grow(source->subscriptions, index+1);
  385. event_subscription_t *existing_sub = smartlist_get(source->subscriptions, index);
  386. if (existing_sub != NULL) {
  387. // we only support one listener per event type
  388. event_subscription_free(existing_sub);
  389. log_warn(LD_BUG, "We are overriding a previous listener.");
  390. }
  391. smartlist_set(source->subscriptions, index, sub);
  392. tor_mutex_release(&source->lock);
  393. }
  394. void
  395. event_source_unsubscribe(event_source_t *source, event_listener_t *listener,
  396. event_label_t label)
  397. {
  398. tor_assert(source != NULL);
  399. tor_assert(listener != NULL);
  400. tor_assert(label != EVENT_LABEL_UNSET);
  401. int index = (int)label;
  402. tor_assert(index >= 0);
  403. tor_mutex_acquire(&source->lock);
  404. if (index >= smartlist_len(source->subscriptions)) {
  405. // there are no subscribers for this event
  406. log_warn(LD_GENERAL, "Listener wanted to unsubscribe, but was not subscribed.");
  407. tor_mutex_release(&source->lock);
  408. return;
  409. }
  410. event_subscription_t *current_sub = smartlist_get(source->subscriptions, index);
  411. if (current_sub == NULL || current_sub->listener != listener) {
  412. log_warn(LD_GENERAL, "Listener wanted to unsubscribe, but was not subscribed.");
  413. tor_mutex_release(&source->lock);
  414. return;
  415. }
  416. smartlist_set(source->subscriptions, index, NULL);
  417. event_subscription_free(current_sub);
  418. tor_mutex_release(&source->lock);
  419. }
  420. void
  421. event_source_unsubscribe_all(event_source_t *source, event_listener_t *listener)
  422. {
  423. tor_assert(source != NULL);
  424. tor_assert(listener != NULL);
  425. tor_mutex_acquire(&source->lock);
  426. SMARTLIST_FOREACH_BEGIN(source->subscriptions, event_subscription_t *, sub) {
  427. if (sub != NULL && sub->listener == listener) {
  428. event_subscription_free(sub);
  429. SMARTLIST_REPLACE_CURRENT(source->subscriptions, sub, NULL);
  430. }
  431. } SMARTLIST_FOREACH_END(sub);
  432. tor_mutex_release(&source->lock);
  433. }
  434. void
  435. event_source_publish(event_source_t *source, event_label_t label,
  436. event_data_t data, void (*free_data_fn)(void *))
  437. {
  438. tor_assert(source != NULL);
  439. tor_assert(label != EVENT_LABEL_UNSET);
  440. int index = (int)label;
  441. tor_assert(index >= 0);
  442. tor_mutex_acquire(&source->lock);
  443. if (index >= smartlist_len(source->subscriptions)) {
  444. // there are no subscribers for this event
  445. tor_mutex_release(&source->lock);
  446. if (free_data_fn != NULL) {
  447. free_data_fn(data.ptr);
  448. }
  449. return;
  450. }
  451. event_subscription_t *sub = smartlist_get(source->subscriptions, index);
  452. if (sub == NULL || sub->listener == NULL) {
  453. // there are no subscribers for this event
  454. tor_mutex_release(&source->lock);
  455. if (free_data_fn != NULL) {
  456. free_data_fn(data.ptr);
  457. }
  458. return;
  459. }
  460. bool deliver_silently;
  461. if (index >= smartlist_len(source->deliver_silently)) {
  462. // default is to not deliver silently
  463. deliver_silently = false;
  464. } else {
  465. deliver_silently = (smartlist_get(source->deliver_silently, index) != 0);
  466. }
  467. event_wrapper_t *wrapper = NULL;
  468. wrapper = event_wrapper_new(label, data, free_data_fn);
  469. event_listener_receive(sub->listener, label, wrapper, !deliver_silently);
  470. tor_mutex_release(&source->lock);
  471. }
  472. void
  473. event_source_deliver_silently(event_source_t *source, event_label_t label,
  474. bool deliver_silently)
  475. {
  476. tor_assert(source != NULL);
  477. tor_assert(label != EVENT_LABEL_UNSET);
  478. int index = (int)label;
  479. tor_assert(index >= 0);
  480. if (index >= 1000) {
  481. log_warn(LD_BUG, "An event label was very large (%d), but the event source "
  482. "assumes that event labels are small.", index);
  483. /* We're using a smartlist as a lookup table, and assume that the labels are
  484. small and therefore the list should not be sparse. If the label is large,
  485. then we either have *many* events, or we're choosing our event labels
  486. inefficiently. */
  487. }
  488. tor_mutex_acquire(&source->lock);
  489. smartlist_grow(source->deliver_silently, index+1);
  490. // default is to not deliver silently
  491. smartlist_set(source->deliver_silently, index, (void *)deliver_silently);
  492. tor_mutex_release(&source->lock);
  493. }
  494. void
  495. event_source_wakeup_listener(event_source_t *source, event_label_t label)
  496. {
  497. tor_assert(source != NULL);
  498. tor_assert(label != EVENT_LABEL_UNSET);
  499. int index = (int)label;
  500. tor_assert(index >= 0);
  501. tor_mutex_acquire(&source->lock);
  502. if (index >= smartlist_len(source->subscriptions)) {
  503. // there are no subscribers for this event
  504. tor_mutex_release(&source->lock);
  505. return;
  506. }
  507. event_subscription_t *sub = smartlist_get(source->subscriptions, index);
  508. if (sub == NULL || sub->listener == NULL) {
  509. // there are no subscribers for this event
  510. tor_mutex_release(&source->lock);
  511. return;
  512. }
  513. event_listener_wakeup(sub->listener);
  514. tor_mutex_release(&source->lock);
  515. }