events.c 17 KB


  1. /* Copyright (c) 2013-2019, The Tor Project, Inc. */
  2. /* See LICENSE for licensing information */
  3. #include "lib/evloop/events.h"
  4. #include "lib/log/util_bug.h"
  5. #include <event2/util.h>
  6. #include <event2/event.h>
  7. #include <string.h>
  8. /* An unsupported libevent function. */
  9. void event_active_later_(struct event *, int);
  10. /* How a subscribed listener wants to receive an event. */
  11. typedef struct event_subscription_t {
  12. event_listener_t *listener;
  13. } event_subscription_t;
  14. /* What a listener should do if it receives an event. */
  15. typedef struct event_callback_t {
  16. event_update_fn_t update_fn;
  17. process_event_fn_t process_event_fn;
  18. } event_callback_t;
  19. /**************************/
  20. static event_subscription_t *
  21. event_subscription_new(event_listener_t *listener)
  22. {
  23. tor_assert(listener != NULL);
  24. event_subscription_t *sub = tor_malloc_zero(sizeof(event_subscription_t));
  25. sub->listener = listener;
  26. return sub;
  27. }
  28. static void
  29. event_subscription_free(event_subscription_t *sub)
  30. {
  31. tor_assert(sub != NULL);
  32. memset(sub, 0x00, sizeof(*sub));
  33. tor_free(sub);
  34. }
  35. /**************************/
  36. static event_callback_t *
  37. event_callback_new(event_update_fn_t update_fn,
  38. process_event_fn_t process_event_fn)
  39. {
  40. tor_assert(process_event_fn != NULL);
  41. event_callback_t *cb = tor_malloc_zero(sizeof(event_callback_t));
  42. cb->update_fn = update_fn;
  43. cb->process_event_fn = process_event_fn;
  44. return cb;
  45. }
  46. static void
  47. event_callback_free(event_callback_t *cb)
  48. {
  49. tor_assert(cb != NULL);
  50. memset(cb, 0x00, sizeof(*cb));
  51. tor_free(cb);
  52. }
  53. /**************************/
  54. static event_wrapper_t *
  55. event_wrapper_new(event_label_t label,
  56. event_data_t data,
  57. void (*free_data_fn)(void *))
  58. {
  59. event_wrapper_t *wrapper = tor_malloc_zero(sizeof(event_wrapper_t));
  60. wrapper->label = label;
  61. wrapper->data = data;
  62. wrapper->free_data_fn = free_data_fn;
  63. return wrapper;
  64. }
  65. static void
  66. event_wrapper_free(event_wrapper_t *wrapper)
  67. {
  68. tor_assert(wrapper != NULL);
  69. if (wrapper->free_data_fn != NULL) {
  70. wrapper->free_data_fn(wrapper->data.ptr);
  71. }
  72. memset(wrapper, 0x00, sizeof(*wrapper));
  73. tor_free(wrapper);
  74. }
  75. /**************************/
  76. event_registry_t *
  77. event_registry_new(void)
  78. {
  79. event_registry_t* registry = tor_malloc_zero(sizeof(event_registry_t));
  80. tor_mutex_init(&registry->lock);
  81. registry->events = smartlist_new();
  82. return registry;
  83. }
  84. void
  85. event_registry_free(event_registry_t *registry)
  86. {
  87. tor_assert(registry != NULL);
  88. tor_mutex_uninit(&registry->lock);
  89. SMARTLIST_FOREACH_BEGIN(registry->events, char *, help_label) {
  90. if (help_label != NULL) {
  91. tor_free(help_label);
  92. }
  93. } SMARTLIST_FOREACH_END(help_label);
  94. smartlist_free(registry->events);
  95. memset(registry, 0x00, sizeof(*registry));
  96. tor_free(registry);
  97. }
  98. event_label_t
  99. event_registry_register_event(event_registry_t *registry,
  100. const char *help_label)
  101. {
  102. tor_assert(registry != NULL);
  103. tor_mutex_acquire(&registry->lock);
  104. int num_events = smartlist_len(registry->events);
  105. if (help_label) {
  106. smartlist_add_strdup(registry->events, help_label);
  107. } else {
  108. smartlist_add(registry->events, NULL);
  109. }
  110. tor_mutex_release(&registry->lock);
  111. return (event_label_t)num_events;
  112. }
  113. const char *
  114. event_registry_get_help_label(event_registry_t *registry,
  115. event_label_t event_label)
  116. {
  117. tor_assert(registry != NULL);
  118. tor_mutex_acquire(&registry->lock);
  119. int label_index = (int)event_label;
  120. tor_assert(label_index >= 0);
  121. const char *help_label = smartlist_get(registry->events,
  122. label_index);
  123. tor_mutex_release(&registry->lock);
  124. return help_label;
  125. }
  126. /**************************/
  127. static void
  128. event_listener_eventloop_cb(evutil_socket_t sock, short events, void *arg)
  129. {
  130. event_listener_t *listener = arg;
  131. (void) sock;
  132. (void) events;
  133. event_listener_process(listener);
  134. }
  135. event_listener_t *
  136. event_listener_new(void *context)
  137. {
  138. event_listener_t* listener = tor_malloc_zero(sizeof(event_listener_t));
  139. tor_mutex_init(&listener->lock);
  140. listener->is_pending = false;
  141. listener->callbacks = smartlist_new();
  142. TOR_TAILQ_INIT(&listener->pending_events);
  143. listener->context = context;
  144. listener->eventloop_ev = NULL;
  145. listener->max_iterations = -1;
  146. return listener;
  147. }
  148. void
  149. event_listener_free(event_listener_t *listener)
  150. {
  151. tor_assert(listener != NULL);
  152. tor_mutex_acquire(&listener->lock);
  153. if (listener->eventloop_ev != NULL) {
  154. event_listener_detach(listener);
  155. // this will make sure the libevent callback has stopped
  156. }
  157. while (!TOR_TAILQ_EMPTY(&listener->pending_events)) {
  158. event_wrapper_t *wrapper = TOR_TAILQ_FIRST(&listener->pending_events);
  159. TOR_TAILQ_REMOVE(&listener->pending_events, wrapper, next_event);
  160. event_wrapper_free(wrapper);
  161. }
  162. SMARTLIST_FOREACH_BEGIN(listener->callbacks, event_callback_t *, cb) {
  163. if (cb != NULL) {
  164. event_callback_free(cb);
  165. }
  166. } SMARTLIST_FOREACH_END(cb);
  167. smartlist_free(listener->callbacks);
  168. listener->context = NULL;
  169. listener->is_pending = false;
  170. tor_mutex_release(&listener->lock);
  171. tor_mutex_uninit(&listener->lock);
  172. memset(listener, 0x00, sizeof(*listener));
  173. tor_free(listener);
  174. }
  175. void
  176. event_listener_set_max_iterations(event_listener_t *listener, int max_iterations)
  177. {
  178. tor_assert(listener != NULL);
  179. tor_mutex_acquire(&listener->lock);
  180. listener->max_iterations = max_iterations;
  181. tor_mutex_release(&listener->lock);
  182. }
  183. void
  184. event_listener_attach(event_listener_t *listener, struct event_base *base)
  185. {
  186. tor_assert(listener != NULL);
  187. tor_assert(base != NULL);
  188. tor_mutex_acquire(&listener->lock);
  189. tor_assert(listener->eventloop_ev == NULL);
  190. listener->eventloop_ev = tor_event_new(base, -1,
  191. EV_READ|EV_PERSIST, // TODO: do we need persist?
  192. event_listener_eventloop_cb,
  193. listener);
  194. if (listener->is_pending) {
  195. event_active(listener->eventloop_ev, EV_READ, 1);
  196. }
  197. tor_mutex_release(&listener->lock);
  198. }
  199. void
  200. event_listener_detach(event_listener_t *listener)
  201. {
  202. tor_assert(listener != NULL);
  203. tor_mutex_acquire(&listener->lock);
  204. if (listener->eventloop_ev != NULL) {
  205. tor_event_free(listener->eventloop_ev);
  206. listener->eventloop_ev = NULL;
  207. }
  208. tor_mutex_release(&listener->lock);
  209. }
  210. void
  211. event_listener_set_callback(event_listener_t *listener, event_label_t label,
  212. event_update_fn_t update_fn,
  213. process_event_fn_t process_event_fn)
  214. {
  215. tor_assert(listener != NULL);
  216. tor_assert(label != EVENT_LABEL_UNSET);
  217. tor_assert(process_event_fn != NULL);
  218. int index = (int)label;
  219. tor_assert(index >= 0);
  220. event_callback_t *cb = event_callback_new(update_fn, process_event_fn);
  221. if (index >= 1000) {
  222. log_warn(LD_BUG, "An event label was very large (%d), but the event "
  223. "listener assumes that event labels are small.", index);
  224. /* We're using a smartlist as a lookup table, and assume that the labels are
  225. small and therefore the list should not be sparse. If the label is large,
  226. then we either have *many* events, or we're choosing our event labels
  227. inefficiently. */
  228. }
  229. tor_mutex_acquire(&listener->lock);
  230. smartlist_grow(listener->callbacks, index+1);
  231. event_callback_t *existing_cb = smartlist_get(listener->callbacks, index);
  232. if (existing_cb != NULL) {
  233. // we only support one callback per event type
  234. event_callback_free(existing_cb);
  235. log_warn(LD_BUG, "We are overriding a previous callback.");
  236. }
  237. smartlist_set(listener->callbacks, index, cb);
  238. tor_mutex_release(&listener->lock);
  239. }
  240. static void
  241. event_listener_receive(event_listener_t *listener, event_label_t label,
  242. event_wrapper_t *wrapper, bool notify)
  243. {
  244. tor_assert(listener != NULL);
  245. tor_assert(label != EVENT_LABEL_UNSET);
  246. int index = (int)label;
  247. tor_assert(index >= 0);
  248. tor_mutex_acquire(&listener->lock);
  249. if (index >= smartlist_len(listener->callbacks)) {
  250. log_warn(LD_BUG, "We don't have a callback for this event");
  251. if (wrapper != NULL) {
  252. event_wrapper_free(wrapper);
  253. }
  254. tor_mutex_release(&listener->lock);
  255. return;
  256. }
  257. event_callback_t *cb = smartlist_get(listener->callbacks, index);
  258. if (cb == NULL) {
  259. log_warn(LD_BUG, "We don't have a callback for this event");
  260. if (wrapper != NULL) {
  261. event_wrapper_free(wrapper);
  262. }
  263. tor_mutex_release(&listener->lock);
  264. return;
  265. }
  266. event_wrapper_t *last = TOR_TAILQ_LAST(&listener->pending_events,
  267. pending_events_head_t);
  268. if (cb->update_fn != NULL && last != NULL && last->label == label) {
  269. // the last added event was of the same type and we set an update function,
  270. // so we should update the last event rather than adding a new one
  271. cb->update_fn(label, &last->data, &wrapper->data);
  272. if (wrapper != NULL) {
  273. event_wrapper_free(wrapper);
  274. }
  275. } else {
  276. tor_assert(wrapper != NULL);
  277. TOR_TAILQ_INSERT_TAIL(&listener->pending_events, wrapper, next_event);
  278. }
  279. if (!listener->is_pending && notify) {
  280. listener->is_pending = true;
  281. if (listener->eventloop_ev != NULL) {
  282. event_active_later_(listener->eventloop_ev, EV_READ);
  283. }
  284. }
  285. tor_mutex_release(&listener->lock);
  286. }
  287. static void
  288. event_listener_wakeup(event_listener_t *listener)
  289. {
  290. tor_assert(listener != NULL);
  291. tor_mutex_acquire(&listener->lock);
  292. if (!listener->is_pending && !TOR_TAILQ_EMPTY(&listener->pending_events)) {
  293. // not pending but have waiting events
  294. listener->is_pending = true;
  295. if (listener->eventloop_ev != NULL) {
  296. event_active_later_(listener->eventloop_ev, EV_READ);
  297. }
  298. }
  299. tor_mutex_release(&listener->lock);
  300. }
  301. void
  302. event_listener_process(event_listener_t *listener)
  303. {
  304. tor_assert(listener != NULL);
  305. int counter = 0;
  306. tor_mutex_acquire(&listener->lock);
  307. void *context = listener->context;
  308. int max_iterations = listener->max_iterations;
  309. while (!TOR_TAILQ_EMPTY(&listener->pending_events) &&
  310. (max_iterations < 0 || counter < max_iterations)) {
  311. event_wrapper_t *wrapper = TOR_TAILQ_FIRST(&listener->pending_events);
  312. TOR_TAILQ_REMOVE(&listener->pending_events, wrapper, next_event);
  313. tor_assert(wrapper != NULL);
  314. process_event_fn_t process_event_fn = NULL;
  315. int index = (int)wrapper->label;
  316. // do we have a callback for this event label?
  317. if (PREDICT_LIKELY(index < smartlist_len(listener->callbacks))) {
  318. event_callback_t *cb = smartlist_get(listener->callbacks, index);
  319. if (cb != NULL) {
  320. process_event_fn = cb->process_event_fn;
  321. }
  322. }
  323. tor_mutex_release(&listener->lock);
  324. if (PREDICT_LIKELY(process_event_fn != NULL)) {
  325. process_event_fn(wrapper->label, wrapper->data, context);
  326. counter += 1;
  327. // only increase the counter if a callback was run
  328. } else {
  329. // no callback available
  330. log_warn(LD_BUG, "An event was received but had no callback");
  331. }
  332. event_wrapper_free(wrapper);
  333. tor_mutex_acquire(&listener->lock);
  334. }
  335. if (TOR_TAILQ_EMPTY(&listener->pending_events)) {
  336. listener->is_pending = false;
  337. } else {
  338. event_active_later_(listener->eventloop_ev, EV_READ);
  339. }
  340. tor_mutex_release(&listener->lock);
  341. }
  342. /**************************/
  343. event_source_t *
  344. event_source_new(void)
  345. {
  346. event_source_t* source = tor_malloc_zero(sizeof(event_source_t));
  347. tor_mutex_init(&source->lock);
  348. source->deliver_silently = smartlist_new();
  349. source->subscriptions = smartlist_new();
  350. return source;
  351. }
  352. void
  353. event_source_free(event_source_t *source)
  354. {
  355. tor_assert(source != NULL);
  356. tor_mutex_uninit(&source->lock);
  357. SMARTLIST_FOREACH_BEGIN(source->subscriptions, event_subscription_t *, sub) {
  358. if (sub != NULL) {
  359. event_subscription_free(sub);
  360. }
  361. } SMARTLIST_FOREACH_END(sub);
  362. smartlist_free(source->subscriptions);
  363. smartlist_free(source->deliver_silently);
  364. memset(source, 0x00, sizeof(*source));
  365. tor_free(source);
  366. }
  367. void
  368. event_source_subscribe(event_source_t *source, event_listener_t *listener,
  369. event_label_t label)
  370. {
  371. tor_assert(source != NULL);
  372. tor_assert(listener != NULL);
  373. tor_assert(label != EVENT_LABEL_UNSET);
  374. int index = (int)label;
  375. tor_assert(index >= 0);
  376. if (index >= 1000) {
  377. log_warn(LD_BUG, "An event label was very large (%d), but the event source "
  378. "assumes that event labels are small.", index);
  379. /* We're using a smartlist as a lookup table, and assume that the labels are
  380. small and therefore the list should not be sparse. If the label is large,
  381. then we either have *many* events, or we're choosing our event labels
  382. inefficiently. */
  383. }
  384. event_subscription_t *sub = event_subscription_new(listener);
  385. tor_mutex_acquire(&source->lock);
  386. smartlist_grow(source->subscriptions, index+1);
  387. event_subscription_t *existing_sub = smartlist_get(source->subscriptions, index);
  388. if (existing_sub != NULL) {
  389. // we only support one listener per event type
  390. event_subscription_free(existing_sub);
  391. log_warn(LD_BUG, "We are overriding a previous listener.");
  392. }
  393. smartlist_set(source->subscriptions, index, sub);
  394. tor_mutex_release(&source->lock);
  395. }
  396. void
  397. event_source_unsubscribe(event_source_t *source, event_listener_t *listener,
  398. event_label_t label)
  399. {
  400. tor_assert(source != NULL);
  401. tor_assert(listener != NULL);
  402. tor_assert(label != EVENT_LABEL_UNSET);
  403. int index = (int)label;
  404. tor_assert(index >= 0);
  405. tor_mutex_acquire(&source->lock);
  406. if (index >= smartlist_len(source->subscriptions)) {
  407. // there are no subscribers for this event
  408. log_warn(LD_GENERAL, "Listener wanted to unsubscribe, but was not subscribed.");
  409. tor_mutex_release(&source->lock);
  410. return;
  411. }
  412. event_subscription_t *current_sub = smartlist_get(source->subscriptions, index);
  413. if (current_sub == NULL || current_sub->listener != listener) {
  414. log_warn(LD_GENERAL, "Listener wanted to unsubscribe, but was not subscribed.");
  415. tor_mutex_release(&source->lock);
  416. return;
  417. }
  418. smartlist_set(source->subscriptions, index, NULL);
  419. event_subscription_free(current_sub);
  420. tor_mutex_release(&source->lock);
  421. }
  422. void
  423. event_source_unsubscribe_all(event_source_t *source, event_listener_t *listener)
  424. {
  425. tor_assert(source != NULL);
  426. tor_assert(listener != NULL);
  427. tor_mutex_acquire(&source->lock);
  428. SMARTLIST_FOREACH_BEGIN(source->subscriptions, event_subscription_t *, sub) {
  429. if (sub != NULL && sub->listener == listener) {
  430. event_subscription_free(sub);
  431. SMARTLIST_REPLACE_CURRENT(source->subscriptions, sub, NULL);
  432. }
  433. } SMARTLIST_FOREACH_END(sub);
  434. tor_mutex_release(&source->lock);
  435. }
  436. void
  437. event_source_publish(event_source_t *source, event_label_t label,
  438. event_data_t data, void (*free_data_fn)(void *))
  439. {
  440. tor_assert(source != NULL);
  441. tor_assert(label != EVENT_LABEL_UNSET);
  442. int index = (int)label;
  443. tor_assert(index >= 0);
  444. tor_mutex_acquire(&source->lock);
  445. if (index >= smartlist_len(source->subscriptions)) {
  446. // there are no subscribers for this event
  447. tor_mutex_release(&source->lock);
  448. if (free_data_fn != NULL) {
  449. free_data_fn(data.ptr);
  450. }
  451. return;
  452. }
  453. event_subscription_t *sub = smartlist_get(source->subscriptions, index);
  454. if (sub == NULL || sub->listener == NULL) {
  455. // there are no subscribers for this event
  456. tor_mutex_release(&source->lock);
  457. if (free_data_fn != NULL) {
  458. free_data_fn(data.ptr);
  459. }
  460. return;
  461. }
  462. bool deliver_silently;
  463. if (index >= smartlist_len(source->deliver_silently)) {
  464. // default is to not deliver silently
  465. deliver_silently = false;
  466. } else {
  467. deliver_silently = (smartlist_get(source->deliver_silently, index) != 0);
  468. }
  469. event_wrapper_t *wrapper = NULL;
  470. wrapper = event_wrapper_new(label, data, free_data_fn);
  471. event_listener_receive(sub->listener, label, wrapper, !deliver_silently);
  472. tor_mutex_release(&source->lock);
  473. }
  474. void
  475. event_source_deliver_silently(event_source_t *source, event_label_t label,
  476. bool deliver_silently)
  477. {
  478. tor_assert(source != NULL);
  479. tor_assert(label != EVENT_LABEL_UNSET);
  480. int index = (int)label;
  481. tor_assert(index >= 0);
  482. if (index >= 1000) {
  483. log_warn(LD_BUG, "An event label was very large (%d), but the event source "
  484. "assumes that event labels are small.", index);
  485. /* We're using a smartlist as a lookup table, and assume that the labels are
  486. small and therefore the list should not be sparse. If the label is large,
  487. then we either have *many* events, or we're choosing our event labels
  488. inefficiently. */
  489. }
  490. tor_mutex_acquire(&source->lock);
  491. smartlist_grow(source->deliver_silently, index+1);
  492. // default is to not deliver silently
  493. smartlist_set(source->deliver_silently, index, (void *)deliver_silently);
  494. tor_mutex_release(&source->lock);
  495. }
  496. void
  497. event_source_wakeup_listener(event_source_t *source, event_label_t label)
  498. {
  499. tor_assert(source != NULL);
  500. tor_assert(label != EVENT_LABEL_UNSET);
  501. int index = (int)label;
  502. tor_assert(index >= 0);
  503. tor_mutex_acquire(&source->lock);
  504. if (index >= smartlist_len(source->subscriptions)) {
  505. // there are no subscribers for this event
  506. tor_mutex_release(&source->lock);
  507. return;
  508. }
  509. event_subscription_t *sub = smartlist_get(source->subscriptions, index);
  510. if (sub == NULL || sub->listener == NULL) {
  511. // there are no subscribers for this event
  512. tor_mutex_release(&source->lock);
  513. return;
  514. }
  515. event_listener_wakeup(sub->listener);
  516. tor_mutex_release(&source->lock);
  517. }