compat_threads.c 9.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379
  1. /* Copyright (c) 2003-2004, Roger Dingledine
  2. * Copyright (c) 2004-2006, Roger Dingledine, Nick Mathewson.
  3. * Copyright (c) 2007-2018, The Tor Project, Inc. */
  4. /* See LICENSE for licensing information */
  5. /**
  6. * \file compat_threads.c
  7. *
  8. * \brief Cross-platform threading and inter-thread communication logic.
  9. * (Platform-specific parts are written in the other compat_*threads
  10. * modules.)
  11. */
  12. #include "orconfig.h"
  13. #include <stdlib.h>
  14. #include "common/compat.h"
  15. #include "common/compat_threads.h"
  16. #include "common/util.h"
  17. #include "common/torlog.h"
  18. #ifdef HAVE_SYS_EVENTFD_H
  19. #include <sys/eventfd.h>
  20. #endif
  21. #ifdef HAVE_FCNTL_H
  22. #include <fcntl.h>
  23. #endif
  24. #ifdef HAVE_UNISTD_H
  25. #include <unistd.h>
  26. #endif
  27. /** Allocate and return a new condition variable. */
  28. tor_cond_t *
  29. tor_cond_new(void)
  30. {
  31. tor_cond_t *cond = tor_malloc(sizeof(tor_cond_t));
  32. if (BUG(tor_cond_init(cond)<0))
  33. tor_free(cond); // LCOV_EXCL_LINE
  34. return cond;
  35. }
  36. /** Free all storage held in <b>c</b>. */
  37. void
  38. tor_cond_free_(tor_cond_t *c)
  39. {
  40. if (!c)
  41. return;
  42. tor_cond_uninit(c);
  43. tor_free(c);
  44. }
  45. /** Identity of the "main" thread */
  46. static unsigned long main_thread_id = -1;
  47. /** Start considering the current thread to be the 'main thread'. This has
  48. * no effect on anything besides in_main_thread(). */
  49. void
  50. set_main_thread(void)
  51. {
  52. main_thread_id = tor_get_thread_id();
  53. }
  54. /** Return true iff called from the main thread. */
  55. int
  56. in_main_thread(void)
  57. {
  58. return main_thread_id == tor_get_thread_id();
  59. }
  60. #if defined(HAVE_EVENTFD) || defined(HAVE_PIPE)
  61. /* As write(), but retry on EINTR, and return the negative error code on
  62. * error. */
  63. static int
  64. write_ni(int fd, const void *buf, size_t n)
  65. {
  66. int r;
  67. again:
  68. r = (int) write(fd, buf, n);
  69. if (r < 0) {
  70. if (errno == EINTR)
  71. goto again;
  72. else
  73. return -errno;
  74. }
  75. return r;
  76. }
  77. /* As read(), but retry on EINTR, and return the negative error code on error.
  78. */
  79. static int
  80. read_ni(int fd, void *buf, size_t n)
  81. {
  82. int r;
  83. again:
  84. r = (int) read(fd, buf, n);
  85. if (r < 0) {
  86. if (errno == EINTR)
  87. goto again;
  88. else
  89. return -errno;
  90. }
  91. return r;
  92. }
  93. #endif /* defined(HAVE_EVENTFD) || defined(HAVE_PIPE) */
  94. /** As send(), but retry on EINTR, and return the negative error code on
  95. * error. */
  96. static int
  97. send_ni(int fd, const void *buf, size_t n, int flags)
  98. {
  99. int r;
  100. again:
  101. r = (int) send(fd, buf, n, flags);
  102. if (r < 0) {
  103. int error = tor_socket_errno(fd);
  104. if (ERRNO_IS_EINTR(error))
  105. goto again;
  106. else
  107. return -error;
  108. }
  109. return r;
  110. }
  111. /** As recv(), but retry on EINTR, and return the negative error code on
  112. * error. */
  113. static int
  114. recv_ni(int fd, void *buf, size_t n, int flags)
  115. {
  116. int r;
  117. again:
  118. r = (int) recv(fd, buf, n, flags);
  119. if (r < 0) {
  120. int error = tor_socket_errno(fd);
  121. if (ERRNO_IS_EINTR(error))
  122. goto again;
  123. else
  124. return -error;
  125. }
  126. return r;
  127. }
  128. #ifdef HAVE_EVENTFD
  129. /* Increment the event count on an eventfd <b>fd</b> */
  130. static int
  131. eventfd_alert(int fd)
  132. {
  133. uint64_t u = 1;
  134. int r = write_ni(fd, (void*)&u, sizeof(u));
  135. if (r < 0 && -r != EAGAIN)
  136. return -1;
  137. return 0;
  138. }
  139. /* Drain all events from an eventfd <b>fd</b>. */
  140. static int
  141. eventfd_drain(int fd)
  142. {
  143. uint64_t u = 0;
  144. int r = read_ni(fd, (void*)&u, sizeof(u));
  145. if (r < 0 && -r != EAGAIN)
  146. return r;
  147. return 0;
  148. }
  149. #endif /* defined(HAVE_EVENTFD) */
  150. #ifdef HAVE_PIPE
  151. /** Send a byte over a pipe. Return 0 on success or EAGAIN; -1 on error */
  152. static int
  153. pipe_alert(int fd)
  154. {
  155. ssize_t r = write_ni(fd, "x", 1);
  156. if (r < 0 && -r != EAGAIN)
  157. return (int)r;
  158. return 0;
  159. }
  160. /** Drain all input from a pipe <b>fd</b> and ignore it. Return 0 on
  161. * success, -1 on error. */
  162. static int
  163. pipe_drain(int fd)
  164. {
  165. char buf[32];
  166. ssize_t r;
  167. do {
  168. r = read_ni(fd, buf, sizeof(buf));
  169. } while (r > 0);
  170. if (r < 0 && errno != EAGAIN)
  171. return -errno;
  172. /* A value of r = 0 means EOF on the fd so successfully drained. */
  173. return 0;
  174. }
  175. #endif /* defined(HAVE_PIPE) */
  176. /** Send a byte on socket <b>fd</b>t. Return 0 on success or EAGAIN,
  177. * -1 on error. */
  178. static int
  179. sock_alert(tor_socket_t fd)
  180. {
  181. ssize_t r = send_ni(fd, "x", 1, 0);
  182. if (r < 0 && !ERRNO_IS_EAGAIN(-r))
  183. return (int)r;
  184. return 0;
  185. }
  186. /** Drain all the input from a socket <b>fd</b>, and ignore it. Return 0 on
  187. * success, -errno on error. */
  188. static int
  189. sock_drain(tor_socket_t fd)
  190. {
  191. char buf[32];
  192. ssize_t r;
  193. do {
  194. r = recv_ni(fd, buf, sizeof(buf), 0);
  195. } while (r > 0);
  196. if (r < 0 && !ERRNO_IS_EAGAIN(-r))
  197. return (int)r;
  198. /* A value of r = 0 means EOF on the fd so successfully drained. */
  199. return 0;
  200. }
  201. /** Allocate a new set of alert sockets, and set the appropriate function
  202. * pointers, in <b>socks_out</b>. */
  203. int
  204. alert_sockets_create(alert_sockets_t *socks_out, uint32_t flags)
  205. {
  206. tor_socket_t socks[2] = { TOR_INVALID_SOCKET, TOR_INVALID_SOCKET };
  207. #ifdef HAVE_EVENTFD
  208. /* First, we try the Linux eventfd() syscall. This gives a 64-bit counter
  209. * associated with a single file descriptor. */
  210. #if defined(EFD_CLOEXEC) && defined(EFD_NONBLOCK)
  211. if (!(flags & ASOCKS_NOEVENTFD2))
  212. socks[0] = eventfd(0, EFD_CLOEXEC|EFD_NONBLOCK);
  213. #endif
  214. if (socks[0] < 0 && !(flags & ASOCKS_NOEVENTFD)) {
  215. socks[0] = eventfd(0,0);
  216. if (socks[0] >= 0) {
  217. if (fcntl(socks[0], F_SETFD, FD_CLOEXEC) < 0 ||
  218. set_socket_nonblocking(socks[0]) < 0) {
  219. // LCOV_EXCL_START -- if eventfd succeeds, fcntl will.
  220. tor_assert_nonfatal_unreached();
  221. close(socks[0]);
  222. return -1;
  223. // LCOV_EXCL_STOP
  224. }
  225. }
  226. }
  227. if (socks[0] >= 0) {
  228. socks_out->read_fd = socks_out->write_fd = socks[0];
  229. socks_out->alert_fn = eventfd_alert;
  230. socks_out->drain_fn = eventfd_drain;
  231. return 0;
  232. }
  233. #endif /* defined(HAVE_EVENTFD) */
  234. #ifdef HAVE_PIPE2
  235. /* Now we're going to try pipes. First type the pipe2() syscall, if we
  236. * have it, so we can save some calls... */
  237. if (!(flags & ASOCKS_NOPIPE2) &&
  238. pipe2(socks, O_NONBLOCK|O_CLOEXEC) == 0) {
  239. socks_out->read_fd = socks[0];
  240. socks_out->write_fd = socks[1];
  241. socks_out->alert_fn = pipe_alert;
  242. socks_out->drain_fn = pipe_drain;
  243. return 0;
  244. }
  245. #endif /* defined(HAVE_PIPE2) */
  246. #ifdef HAVE_PIPE
  247. /* Now try the regular pipe() syscall. Pipes have a bit lower overhead than
  248. * socketpairs, fwict. */
  249. if (!(flags & ASOCKS_NOPIPE) &&
  250. pipe(socks) == 0) {
  251. if (fcntl(socks[0], F_SETFD, FD_CLOEXEC) < 0 ||
  252. fcntl(socks[1], F_SETFD, FD_CLOEXEC) < 0 ||
  253. set_socket_nonblocking(socks[0]) < 0 ||
  254. set_socket_nonblocking(socks[1]) < 0) {
  255. // LCOV_EXCL_START -- if pipe succeeds, you can fcntl the output
  256. tor_assert_nonfatal_unreached();
  257. close(socks[0]);
  258. close(socks[1]);
  259. return -1;
  260. // LCOV_EXCL_STOP
  261. }
  262. socks_out->read_fd = socks[0];
  263. socks_out->write_fd = socks[1];
  264. socks_out->alert_fn = pipe_alert;
  265. socks_out->drain_fn = pipe_drain;
  266. return 0;
  267. }
  268. #endif /* defined(HAVE_PIPE) */
  269. /* If nothing else worked, fall back on socketpair(). */
  270. if (!(flags & ASOCKS_NOSOCKETPAIR) &&
  271. tor_socketpair(AF_UNIX, SOCK_STREAM, 0, socks) == 0) {
  272. if (set_socket_nonblocking(socks[0]) < 0 ||
  273. set_socket_nonblocking(socks[1])) {
  274. // LCOV_EXCL_START -- if socketpair worked, you can make it nonblocking.
  275. tor_assert_nonfatal_unreached();
  276. tor_close_socket(socks[0]);
  277. tor_close_socket(socks[1]);
  278. return -1;
  279. // LCOV_EXCL_STOP
  280. }
  281. socks_out->read_fd = socks[0];
  282. socks_out->write_fd = socks[1];
  283. socks_out->alert_fn = sock_alert;
  284. socks_out->drain_fn = sock_drain;
  285. return 0;
  286. }
  287. return -1;
  288. }
  289. /** Close the sockets in <b>socks</b>. */
  290. void
  291. alert_sockets_close(alert_sockets_t *socks)
  292. {
  293. if (socks->alert_fn == sock_alert) {
  294. /* they are sockets. */
  295. tor_close_socket(socks->read_fd);
  296. tor_close_socket(socks->write_fd);
  297. } else {
  298. close(socks->read_fd);
  299. if (socks->write_fd != socks->read_fd)
  300. close(socks->write_fd);
  301. }
  302. socks->read_fd = socks->write_fd = -1;
  303. }
  304. #ifndef HAVE_STDATOMIC_H
  305. /** Initialize a new atomic counter with the value 0 */
  306. void
  307. atomic_counter_init(atomic_counter_t *counter)
  308. {
  309. memset(counter, 0, sizeof(*counter));
  310. tor_mutex_init_nonrecursive(&counter->mutex);
  311. }
  312. /** Clean up all resources held by an atomic counter. */
  313. void
  314. atomic_counter_destroy(atomic_counter_t *counter)
  315. {
  316. tor_mutex_uninit(&counter->mutex);
  317. memset(counter, 0, sizeof(*counter));
  318. }
  319. /** Add a value to an atomic counter. */
  320. void
  321. atomic_counter_add(atomic_counter_t *counter, size_t add)
  322. {
  323. tor_mutex_acquire(&counter->mutex);
  324. counter->val += add;
  325. tor_mutex_release(&counter->mutex);
  326. }
  327. /** Subtract a value from an atomic counter. */
  328. void
  329. atomic_counter_sub(atomic_counter_t *counter, size_t sub)
  330. {
  331. // this relies on unsigned overflow, but that's fine.
  332. atomic_counter_add(counter, -sub);
  333. }
  334. /** Return the current value of an atomic counter */
  335. size_t
  336. atomic_counter_get(atomic_counter_t *counter)
  337. {
  338. size_t val;
  339. tor_mutex_acquire(&counter->mutex);
  340. val = counter->val;
  341. tor_mutex_release(&counter->mutex);
  342. return val;
  343. }
  344. /** Replace the value of an atomic counter; return the old one. */
  345. size_t
  346. atomic_counter_exchange(atomic_counter_t *counter, size_t newval)
  347. {
  348. size_t oldval;
  349. tor_mutex_acquire(&counter->mutex);
  350. oldval = counter->val;
  351. counter->val = newval;
  352. tor_mutex_release(&counter->mutex);
  353. return oldval;
  354. }
  355. #endif /* !defined(HAVE_STDATOMIC_H) */