relay.c 90 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761277127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448144914501451145214531454145514561457145814591460146114621463146414651466146714681469147014711472147314741475147614771478147914801481148214831484148514861487148814891490149114921493149414951496149714981499150015011502150315041505150615071508150915101511151215131514151515161517151815191520152115221523152415251526152715281529153015311532153315341535153615371538153915401541154215431544154515461547154815491550155115521553155415551556155715581559156015611562156315641565156615671568156915701571157215731574157515761577157815791580158115821583158415851586158715881589159015911592159315941595159615971598159916001601160216031604160516061607160816091610161116121613161416151616161716181619162016211622162316241625162616271628162916301631163216331634163516361637163816391640164116421643164416451646164716481649165016511652165316541655165616571658165916601661166216631664166516661667166816691670167116721673167416751676167716781679168016811682168316841685168616871688168916901691169216931694169516961697169816991700170117021703170417051706170717081709171017111712171317141715171617171718171917201721172217231724172517261727172817291730173117321733173417351736173717381739174017411742174317441745174617471748174917501751175217531754175517561757175817591760176117621763176417651766176717681769177017711772177317741775177617771778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111211221132114211521162117211821192120212121222123212421252126212721282129213021312132213321342135213621372138213921402141214221432144214521462147214821492150215121522153215421552156215721582159216021612162216321642165216621672168216921702171217221732174217521762177217821792180218121822183218421852186218721882189219021912192219321942195219621972198219922002201220222032204220522062207220822092210221122122213221422152216221722182219222022212222222322242225222622272228222922302231223222332234223522362237223822392240224122422243224422452246224722482249225022512252225322542255225622572258225922602261226222632264226522662267226822692270227122722273227422752276227722782279228022812282228322842285228622872288228922902291229222932294229522962297229822992300230123022303230423052306230723082309231023112312231323142315231623172318231923202321232223232324232523262327232823292330233123322333233423352336233723382339234023412342234323442345234623472348234923502351235223532354235523562357235823592360236123622363236423652366236723682369237023712372237323742375237623772378237923802381238223832384238523862387238823892390239123922393239423952396239723982399240024012402240324042405240624072408240924102411241224132414241524162417241824192420242124222423242424252426242724282429243024312432243324342435243624372438243924402441244224432444244524462447244824492450245124522453245424552456245724582459246024612462246324642465246624672468246924702471247224732474247524762477247824792480248124822483248424852486248724882489249024912492249324942495249624972498249925002501250225032504250525062507250825092510251125122513251425152516251725182519252025212522252325242525252625272528252925302531253225332534253525362537253825392540254125422543254425452546254725482549255025512552255325542555255625572558255925602561256225632564256525662567256825692570257125722573257425752576257725782579258025812582
  1. /* Copyright (c) 2001 Matej Pfajfar.
  2. * Copyright (c) 2001-2004, Roger Dingledine.
  3. * Copyright (c) 2004-2006, Roger Dingledine, Nick Mathewson.
  4. * Copyright (c) 2007-2010, The Tor Project, Inc. */
  5. /* See LICENSE for licensing information */
  6. /**
  7. * \file relay.c
  8. * \brief Handle relay cell encryption/decryption, plus packaging and
  9. * receiving from circuits, plus queuing on circuits.
  10. **/
  11. #include <math.h>
  12. #include "or.h"
  13. #include "buffers.h"
  14. #include "circuitbuild.h"
  15. #include "circuitlist.h"
  16. #include "config.h"
  17. #include "connection.h"
  18. #include "connection_edge.h"
  19. #include "connection_or.h"
  20. #include "control.h"
  21. #include "geoip.h"
  22. #include "main.h"
  23. #include "mempool.h"
  24. #include "networkstatus.h"
  25. #include "policies.h"
  26. #include "reasons.h"
  27. #include "relay.h"
  28. #include "rendcommon.h"
  29. #include "routerlist.h"
  30. #include "routerparse.h"
  31. static int relay_crypt(circuit_t *circ, cell_t *cell,
  32. cell_direction_t cell_direction,
  33. crypt_path_t **layer_hint, char *recognized);
  34. static edge_connection_t *relay_lookup_conn(circuit_t *circ, cell_t *cell,
  35. cell_direction_t cell_direction,
  36. crypt_path_t *layer_hint);
  37. static int connection_edge_process_relay_cell(cell_t *cell, circuit_t *circ,
  38. edge_connection_t *conn,
  39. crypt_path_t *layer_hint);
  40. static void circuit_consider_sending_sendme(circuit_t *circ,
  41. crypt_path_t *layer_hint);
  42. static void circuit_resume_edge_reading(circuit_t *circ,
  43. crypt_path_t *layer_hint);
  44. static int circuit_resume_edge_reading_helper(edge_connection_t *conn,
  45. circuit_t *circ,
  46. crypt_path_t *layer_hint);
  47. static int circuit_consider_stop_edge_reading(circuit_t *circ,
  48. crypt_path_t *layer_hint);
  49. static int circuit_queue_streams_are_blocked(circuit_t *circ);
  50. /** Cache the current hi-res time; the cache gets reset when libevent
  51. * calls us. */
  52. static struct timeval cached_time_hires = {0, 0};
  53. /** Stop reading on edge connections when we have this many cells
  54. * waiting on the appropriate queue. */
  55. #define CELL_QUEUE_HIGHWATER_SIZE 256
  56. /** Start reading from edge connections again when we get down to this many
  57. * cells. */
  58. #define CELL_QUEUE_LOWWATER_SIZE 64
  59. static void
  60. tor_gettimeofday_cached(struct timeval *tv)
  61. {
  62. if (cached_time_hires.tv_sec == 0) {
  63. tor_gettimeofday(&cached_time_hires);
  64. }
  65. *tv = cached_time_hires;
  66. }
  67. void
  68. tor_gettimeofday_cache_clear(void)
  69. {
  70. cached_time_hires.tv_sec = 0;
  71. }
  72. /** Stats: how many relay cells have originated at this hop, or have
  73. * been relayed onward (not recognized at this hop)?
  74. */
  75. uint64_t stats_n_relay_cells_relayed = 0;
  76. /** Stats: how many relay cells have been delivered to streams at this
  77. * hop?
  78. */
  79. uint64_t stats_n_relay_cells_delivered = 0;
  80. /** Update digest from the payload of cell. Assign integrity part to
  81. * cell.
  82. */
  83. static void
  84. relay_set_digest(crypto_digest_env_t *digest, cell_t *cell)
  85. {
  86. char integrity[4];
  87. relay_header_t rh;
  88. crypto_digest_add_bytes(digest, (char*)cell->payload, CELL_PAYLOAD_SIZE);
  89. crypto_digest_get_digest(digest, integrity, 4);
  90. // log_fn(LOG_DEBUG,"Putting digest of %u %u %u %u into relay cell.",
  91. // integrity[0], integrity[1], integrity[2], integrity[3]);
  92. relay_header_unpack(&rh, cell->payload);
  93. memcpy(rh.integrity, integrity, 4);
  94. relay_header_pack(cell->payload, &rh);
  95. }
  96. /** Does the digest for this circuit indicate that this cell is for us?
  97. *
  98. * Update digest from the payload of cell (with the integrity part set
  99. * to 0). If the integrity part is valid, return 1, else restore digest
  100. * and cell to their original state and return 0.
  101. */
  102. static int
  103. relay_digest_matches(crypto_digest_env_t *digest, cell_t *cell)
  104. {
  105. char received_integrity[4], calculated_integrity[4];
  106. relay_header_t rh;
  107. crypto_digest_env_t *backup_digest=NULL;
  108. backup_digest = crypto_digest_dup(digest);
  109. relay_header_unpack(&rh, cell->payload);
  110. memcpy(received_integrity, rh.integrity, 4);
  111. memset(rh.integrity, 0, 4);
  112. relay_header_pack(cell->payload, &rh);
  113. // log_fn(LOG_DEBUG,"Reading digest of %u %u %u %u from relay cell.",
  114. // received_integrity[0], received_integrity[1],
  115. // received_integrity[2], received_integrity[3]);
  116. crypto_digest_add_bytes(digest, (char*) cell->payload, CELL_PAYLOAD_SIZE);
  117. crypto_digest_get_digest(digest, calculated_integrity, 4);
  118. if (memcmp(received_integrity, calculated_integrity, 4)) {
  119. // log_fn(LOG_INFO,"Recognized=0 but bad digest. Not recognizing.");
  120. // (%d vs %d).", received_integrity, calculated_integrity);
  121. /* restore digest to its old form */
  122. crypto_digest_assign(digest, backup_digest);
  123. /* restore the relay header */
  124. memcpy(rh.integrity, received_integrity, 4);
  125. relay_header_pack(cell->payload, &rh);
  126. crypto_free_digest_env(backup_digest);
  127. return 0;
  128. }
  129. crypto_free_digest_env(backup_digest);
  130. return 1;
  131. }
  132. /** Apply <b>cipher</b> to CELL_PAYLOAD_SIZE bytes of <b>in</b>
  133. * (in place).
  134. *
  135. * If <b>encrypt_mode</b> is 1 then encrypt, else decrypt.
  136. *
  137. * Return -1 if the crypto fails, else return 0.
  138. */
  139. static int
  140. relay_crypt_one_payload(crypto_cipher_env_t *cipher, uint8_t *in,
  141. int encrypt_mode)
  142. {
  143. int r;
  144. (void)encrypt_mode;
  145. r = crypto_cipher_crypt_inplace(cipher, (char*) in, CELL_PAYLOAD_SIZE);
  146. if (r) {
  147. log_warn(LD_BUG,"Error during relay encryption");
  148. return -1;
  149. }
  150. return 0;
  151. }
  152. /** Receive a relay cell:
  153. * - Crypt it (encrypt if headed toward the origin or if we <b>are</b> the
  154. * origin; decrypt if we're headed toward the exit).
  155. * - Check if recognized (if exitward).
  156. * - If recognized and the digest checks out, then find if there's a stream
  157. * that the cell is intended for, and deliver it to the right
  158. * connection_edge.
  159. * - If not recognized, then we need to relay it: append it to the appropriate
  160. * cell_queue on <b>circ</b>.
  161. *
  162. * Return -<b>reason</b> on failure.
  163. */
  164. int
  165. circuit_receive_relay_cell(cell_t *cell, circuit_t *circ,
  166. cell_direction_t cell_direction)
  167. {
  168. or_connection_t *or_conn=NULL;
  169. crypt_path_t *layer_hint=NULL;
  170. char recognized=0;
  171. int reason;
  172. tor_assert(cell);
  173. tor_assert(circ);
  174. tor_assert(cell_direction == CELL_DIRECTION_OUT ||
  175. cell_direction == CELL_DIRECTION_IN);
  176. if (circ->marked_for_close)
  177. return 0;
  178. if (relay_crypt(circ, cell, cell_direction, &layer_hint, &recognized) < 0) {
  179. log_warn(LD_BUG,"relay crypt failed. Dropping connection.");
  180. return -END_CIRC_REASON_INTERNAL;
  181. }
  182. if (recognized) {
  183. edge_connection_t *conn = relay_lookup_conn(circ, cell, cell_direction,
  184. layer_hint);
  185. if (cell_direction == CELL_DIRECTION_OUT) {
  186. ++stats_n_relay_cells_delivered;
  187. log_debug(LD_OR,"Sending away from origin.");
  188. if ((reason=connection_edge_process_relay_cell(cell, circ, conn, NULL))
  189. < 0) {
  190. log_fn(LOG_PROTOCOL_WARN, LD_PROTOCOL,
  191. "connection_edge_process_relay_cell (away from origin) "
  192. "failed.");
  193. return reason;
  194. }
  195. }
  196. if (cell_direction == CELL_DIRECTION_IN) {
  197. ++stats_n_relay_cells_delivered;
  198. log_debug(LD_OR,"Sending to origin.");
  199. if ((reason = connection_edge_process_relay_cell(cell, circ, conn,
  200. layer_hint)) < 0) {
  201. log_warn(LD_OR,
  202. "connection_edge_process_relay_cell (at origin) failed.");
  203. return reason;
  204. }
  205. }
  206. return 0;
  207. }
  208. /* not recognized. pass it on. */
  209. if (cell_direction == CELL_DIRECTION_OUT) {
  210. cell->circ_id = circ->n_circ_id; /* switch it */
  211. or_conn = circ->n_conn;
  212. } else if (! CIRCUIT_IS_ORIGIN(circ)) {
  213. cell->circ_id = TO_OR_CIRCUIT(circ)->p_circ_id; /* switch it */
  214. or_conn = TO_OR_CIRCUIT(circ)->p_conn;
  215. } else {
  216. log_fn(LOG_PROTOCOL_WARN, LD_OR,
  217. "Dropping unrecognized inbound cell on origin circuit.");
  218. return 0;
  219. }
  220. if (!or_conn) {
  221. // XXXX Can this splice stuff be done more cleanly?
  222. if (! CIRCUIT_IS_ORIGIN(circ) &&
  223. TO_OR_CIRCUIT(circ)->rend_splice &&
  224. cell_direction == CELL_DIRECTION_OUT) {
  225. or_circuit_t *splice = TO_OR_CIRCUIT(circ)->rend_splice;
  226. tor_assert(circ->purpose == CIRCUIT_PURPOSE_REND_ESTABLISHED);
  227. tor_assert(splice->_base.purpose == CIRCUIT_PURPOSE_REND_ESTABLISHED);
  228. cell->circ_id = splice->p_circ_id;
  229. cell->command = CELL_RELAY; /* can't be relay_early anyway */
  230. if ((reason = circuit_receive_relay_cell(cell, TO_CIRCUIT(splice),
  231. CELL_DIRECTION_IN)) < 0) {
  232. log_warn(LD_REND, "Error relaying cell across rendezvous; closing "
  233. "circuits");
  234. /* XXXX Do this here, or just return -1? */
  235. circuit_mark_for_close(circ, -reason);
  236. return reason;
  237. }
  238. return 0;
  239. }
  240. log_fn(LOG_PROTOCOL_WARN, LD_PROTOCOL,
  241. "Didn't recognize cell, but circ stops here! Closing circ.");
  242. return -END_CIRC_REASON_TORPROTOCOL;
  243. }
  244. log_debug(LD_OR,"Passing on unrecognized cell.");
  245. ++stats_n_relay_cells_relayed; /* XXXX no longer quite accurate {cells}
  246. * we might kill the circ before we relay
  247. * the cells. */
  248. append_cell_to_circuit_queue(circ, or_conn, cell, cell_direction, 0);
  249. return 0;
  250. }
  251. /** Do the appropriate en/decryptions for <b>cell</b> arriving on
  252. * <b>circ</b> in direction <b>cell_direction</b>.
  253. *
  254. * If cell_direction == CELL_DIRECTION_IN:
  255. * - If we're at the origin (we're the OP), for hops 1..N,
  256. * decrypt cell. If recognized, stop.
  257. * - Else (we're not the OP), encrypt one hop. Cell is not recognized.
  258. *
  259. * If cell_direction == CELL_DIRECTION_OUT:
  260. * - decrypt one hop. Check if recognized.
  261. *
  262. * If cell is recognized, set *recognized to 1, and set
  263. * *layer_hint to the hop that recognized it.
  264. *
  265. * Return -1 to indicate that we should mark the circuit for close,
  266. * else return 0.
  267. */
  268. static int
  269. relay_crypt(circuit_t *circ, cell_t *cell, cell_direction_t cell_direction,
  270. crypt_path_t **layer_hint, char *recognized)
  271. {
  272. relay_header_t rh;
  273. tor_assert(circ);
  274. tor_assert(cell);
  275. tor_assert(recognized);
  276. tor_assert(cell_direction == CELL_DIRECTION_IN ||
  277. cell_direction == CELL_DIRECTION_OUT);
  278. if (cell_direction == CELL_DIRECTION_IN) {
  279. if (CIRCUIT_IS_ORIGIN(circ)) { /* We're at the beginning of the circuit.
  280. * We'll want to do layered decrypts. */
  281. crypt_path_t *thishop, *cpath = TO_ORIGIN_CIRCUIT(circ)->cpath;
  282. thishop = cpath;
  283. if (thishop->state != CPATH_STATE_OPEN) {
  284. log_fn(LOG_PROTOCOL_WARN, LD_PROTOCOL,
  285. "Relay cell before first created cell? Closing.");
  286. return -1;
  287. }
  288. do { /* Remember: cpath is in forward order, that is, first hop first. */
  289. tor_assert(thishop);
  290. if (relay_crypt_one_payload(thishop->b_crypto, cell->payload, 0) < 0)
  291. return -1;
  292. relay_header_unpack(&rh, cell->payload);
  293. if (rh.recognized == 0) {
  294. /* it's possibly recognized. have to check digest to be sure. */
  295. if (relay_digest_matches(thishop->b_digest, cell)) {
  296. *recognized = 1;
  297. *layer_hint = thishop;
  298. return 0;
  299. }
  300. }
  301. thishop = thishop->next;
  302. } while (thishop != cpath && thishop->state == CPATH_STATE_OPEN);
  303. log_fn(LOG_PROTOCOL_WARN, LD_OR,
  304. "Incoming cell at client not recognized. Closing.");
  305. return -1;
  306. } else { /* we're in the middle. Just one crypt. */
  307. if (relay_crypt_one_payload(TO_OR_CIRCUIT(circ)->p_crypto,
  308. cell->payload, 1) < 0)
  309. return -1;
  310. // log_fn(LOG_DEBUG,"Skipping recognized check, because we're not "
  311. // "the client.");
  312. }
  313. } else /* cell_direction == CELL_DIRECTION_OUT */ {
  314. /* we're in the middle. Just one crypt. */
  315. if (relay_crypt_one_payload(TO_OR_CIRCUIT(circ)->n_crypto,
  316. cell->payload, 0) < 0)
  317. return -1;
  318. relay_header_unpack(&rh, cell->payload);
  319. if (rh.recognized == 0) {
  320. /* it's possibly recognized. have to check digest to be sure. */
  321. if (relay_digest_matches(TO_OR_CIRCUIT(circ)->n_digest, cell)) {
  322. *recognized = 1;
  323. return 0;
  324. }
  325. }
  326. }
  327. return 0;
  328. }
  329. /** Package a relay cell from an edge:
  330. * - Encrypt it to the right layer
  331. * - Append it to the appropriate cell_queue on <b>circ</b>.
  332. */
  333. static int
  334. circuit_package_relay_cell(cell_t *cell, circuit_t *circ,
  335. cell_direction_t cell_direction,
  336. crypt_path_t *layer_hint, streamid_t on_stream)
  337. {
  338. or_connection_t *conn; /* where to send the cell */
  339. if (cell_direction == CELL_DIRECTION_OUT) {
  340. crypt_path_t *thishop; /* counter for repeated crypts */
  341. conn = circ->n_conn;
  342. if (!CIRCUIT_IS_ORIGIN(circ) || !conn) {
  343. log_warn(LD_BUG,"outgoing relay cell has n_conn==NULL. Dropping.");
  344. return 0; /* just drop it */
  345. }
  346. relay_set_digest(layer_hint->f_digest, cell);
  347. thishop = layer_hint;
  348. /* moving from farthest to nearest hop */
  349. do {
  350. tor_assert(thishop);
  351. /* XXXX RD This is a bug, right? */
  352. log_debug(LD_OR,"crypting a layer of the relay cell.");
  353. if (relay_crypt_one_payload(thishop->f_crypto, cell->payload, 1) < 0) {
  354. return -1;
  355. }
  356. thishop = thishop->prev;
  357. } while (thishop != TO_ORIGIN_CIRCUIT(circ)->cpath->prev);
  358. } else { /* incoming cell */
  359. or_circuit_t *or_circ;
  360. if (CIRCUIT_IS_ORIGIN(circ)) {
  361. /* We should never package an _incoming_ cell from the circuit
  362. * origin; that means we messed up somewhere. */
  363. log_warn(LD_BUG,"incoming relay cell at origin circuit. Dropping.");
  364. assert_circuit_ok(circ);
  365. return 0; /* just drop it */
  366. }
  367. or_circ = TO_OR_CIRCUIT(circ);
  368. conn = or_circ->p_conn;
  369. relay_set_digest(or_circ->p_digest, cell);
  370. if (relay_crypt_one_payload(or_circ->p_crypto, cell->payload, 1) < 0)
  371. return -1;
  372. }
  373. ++stats_n_relay_cells_relayed;
  374. append_cell_to_circuit_queue(circ, conn, cell, cell_direction, on_stream);
  375. return 0;
  376. }
  377. /** If cell's stream_id matches the stream_id of any conn that's
  378. * attached to circ, return that conn, else return NULL.
  379. */
  380. static edge_connection_t *
  381. relay_lookup_conn(circuit_t *circ, cell_t *cell,
  382. cell_direction_t cell_direction, crypt_path_t *layer_hint)
  383. {
  384. edge_connection_t *tmpconn;
  385. relay_header_t rh;
  386. relay_header_unpack(&rh, cell->payload);
  387. if (!rh.stream_id)
  388. return NULL;
  389. /* IN or OUT cells could have come from either direction, now
  390. * that we allow rendezvous *to* an OP.
  391. */
  392. if (CIRCUIT_IS_ORIGIN(circ)) {
  393. for (tmpconn = TO_ORIGIN_CIRCUIT(circ)->p_streams; tmpconn;
  394. tmpconn=tmpconn->next_stream) {
  395. if (rh.stream_id == tmpconn->stream_id &&
  396. !tmpconn->_base.marked_for_close &&
  397. tmpconn->cpath_layer == layer_hint) {
  398. log_debug(LD_APP,"found conn for stream %d.", rh.stream_id);
  399. return tmpconn;
  400. }
  401. }
  402. } else {
  403. for (tmpconn = TO_OR_CIRCUIT(circ)->n_streams; tmpconn;
  404. tmpconn=tmpconn->next_stream) {
  405. if (rh.stream_id == tmpconn->stream_id &&
  406. !tmpconn->_base.marked_for_close) {
  407. log_debug(LD_EXIT,"found conn for stream %d.", rh.stream_id);
  408. if (cell_direction == CELL_DIRECTION_OUT ||
  409. connection_edge_is_rendezvous_stream(tmpconn))
  410. return tmpconn;
  411. }
  412. }
  413. for (tmpconn = TO_OR_CIRCUIT(circ)->resolving_streams; tmpconn;
  414. tmpconn=tmpconn->next_stream) {
  415. if (rh.stream_id == tmpconn->stream_id &&
  416. !tmpconn->_base.marked_for_close) {
  417. log_debug(LD_EXIT,"found conn for stream %d.", rh.stream_id);
  418. return tmpconn;
  419. }
  420. }
  421. }
  422. return NULL; /* probably a begin relay cell */
  423. }
  424. /** Pack the relay_header_t host-order structure <b>src</b> into
  425. * network-order in the buffer <b>dest</b>. See tor-spec.txt for details
  426. * about the wire format.
  427. */
  428. void
  429. relay_header_pack(uint8_t *dest, const relay_header_t *src)
  430. {
  431. set_uint8(dest, src->command);
  432. set_uint16(dest+1, htons(src->recognized));
  433. set_uint16(dest+3, htons(src->stream_id));
  434. memcpy(dest+5, src->integrity, 4);
  435. set_uint16(dest+9, htons(src->length));
  436. }
  437. /** Unpack the network-order buffer <b>src</b> into a host-order
  438. * relay_header_t structure <b>dest</b>.
  439. */
  440. void
  441. relay_header_unpack(relay_header_t *dest, const uint8_t *src)
  442. {
  443. dest->command = get_uint8(src);
  444. dest->recognized = ntohs(get_uint16(src+1));
  445. dest->stream_id = ntohs(get_uint16(src+3));
  446. memcpy(dest->integrity, src+5, 4);
  447. dest->length = ntohs(get_uint16(src+9));
  448. }
  449. /** Convert the relay <b>command</b> into a human-readable string. */
  450. static const char *
  451. relay_command_to_string(uint8_t command)
  452. {
  453. switch (command) {
  454. case RELAY_COMMAND_BEGIN: return "BEGIN";
  455. case RELAY_COMMAND_DATA: return "DATA";
  456. case RELAY_COMMAND_END: return "END";
  457. case RELAY_COMMAND_CONNECTED: return "CONNECTED";
  458. case RELAY_COMMAND_SENDME: return "SENDME";
  459. case RELAY_COMMAND_EXTEND: return "EXTEND";
  460. case RELAY_COMMAND_EXTENDED: return "EXTENDED";
  461. case RELAY_COMMAND_TRUNCATE: return "TRUNCATE";
  462. case RELAY_COMMAND_TRUNCATED: return "TRUNCATED";
  463. case RELAY_COMMAND_DROP: return "DROP";
  464. case RELAY_COMMAND_RESOLVE: return "RESOLVE";
  465. case RELAY_COMMAND_RESOLVED: return "RESOLVED";
  466. case RELAY_COMMAND_BEGIN_DIR: return "BEGIN_DIR";
  467. case RELAY_COMMAND_ESTABLISH_INTRO: return "ESTABLISH_INTRO";
  468. case RELAY_COMMAND_ESTABLISH_RENDEZVOUS: return "ESTABLISH_RENDEZVOUS";
  469. case RELAY_COMMAND_INTRODUCE1: return "INTRODUCE1";
  470. case RELAY_COMMAND_INTRODUCE2: return "INTRODUCE2";
  471. case RELAY_COMMAND_RENDEZVOUS1: return "RENDEZVOUS1";
  472. case RELAY_COMMAND_RENDEZVOUS2: return "RENDEZVOUS2";
  473. case RELAY_COMMAND_INTRO_ESTABLISHED: return "INTRO_ESTABLISHED";
  474. case RELAY_COMMAND_RENDEZVOUS_ESTABLISHED:
  475. return "RENDEZVOUS_ESTABLISHED";
  476. case RELAY_COMMAND_INTRODUCE_ACK: return "INTRODUCE_ACK";
  477. default: return "(unrecognized)";
  478. }
  479. }
  480. /** Make a relay cell out of <b>relay_command</b> and <b>payload</b>, and send
  481. * it onto the open circuit <b>circ</b>. <b>stream_id</b> is the ID on
  482. * <b>circ</b> for the stream that's sending the relay cell, or 0 if it's a
  483. * control cell. <b>cpath_layer</b> is NULL for OR->OP cells, or the
  484. * destination hop for OP->OR cells.
  485. *
  486. * If you can't send the cell, mark the circuit for close and return -1. Else
  487. * return 0.
  488. */
  489. int
  490. relay_send_command_from_edge(streamid_t stream_id, circuit_t *circ,
  491. uint8_t relay_command, const char *payload,
  492. size_t payload_len, crypt_path_t *cpath_layer)
  493. {
  494. cell_t cell;
  495. relay_header_t rh;
  496. cell_direction_t cell_direction;
  497. /* XXXX NM Split this function into a separate versions per circuit type? */
  498. tor_assert(circ);
  499. tor_assert(payload_len <= RELAY_PAYLOAD_SIZE);
  500. memset(&cell, 0, sizeof(cell_t));
  501. cell.command = CELL_RELAY;
  502. if (cpath_layer) {
  503. cell.circ_id = circ->n_circ_id;
  504. cell_direction = CELL_DIRECTION_OUT;
  505. } else if (! CIRCUIT_IS_ORIGIN(circ)) {
  506. cell.circ_id = TO_OR_CIRCUIT(circ)->p_circ_id;
  507. cell_direction = CELL_DIRECTION_IN;
  508. } else {
  509. return -1;
  510. }
  511. memset(&rh, 0, sizeof(rh));
  512. rh.command = relay_command;
  513. rh.stream_id = stream_id;
  514. rh.length = payload_len;
  515. relay_header_pack(cell.payload, &rh);
  516. if (payload_len)
  517. memcpy(cell.payload+RELAY_HEADER_SIZE, payload, payload_len);
  518. log_debug(LD_OR,"delivering %d cell %s.", relay_command,
  519. cell_direction == CELL_DIRECTION_OUT ? "forward" : "backward");
  520. /* If we are sending an END cell and this circuit is used for a tunneled
  521. * directory request, advance its state. */
  522. if (relay_command == RELAY_COMMAND_END && circ->dirreq_id)
  523. geoip_change_dirreq_state(circ->dirreq_id, DIRREQ_TUNNELED,
  524. DIRREQ_END_CELL_SENT);
  525. if (cell_direction == CELL_DIRECTION_OUT && circ->n_conn) {
  526. /* if we're using relaybandwidthrate, this conn wants priority */
  527. circ->n_conn->client_used = approx_time();
  528. }
  529. if (cell_direction == CELL_DIRECTION_OUT) {
  530. origin_circuit_t *origin_circ = TO_ORIGIN_CIRCUIT(circ);
  531. if (origin_circ->remaining_relay_early_cells > 0 &&
  532. (relay_command == RELAY_COMMAND_EXTEND ||
  533. cpath_layer != origin_circ->cpath)) {
  534. /* If we've got any relay_early cells left and (we're sending
  535. * an extend cell or we're not talking to the first hop), use
  536. * one of them. Don't worry about the conn protocol version:
  537. * append_cell_to_circuit_queue will fix it up. */
  538. cell.command = CELL_RELAY_EARLY;
  539. --origin_circ->remaining_relay_early_cells;
  540. log_debug(LD_OR, "Sending a RELAY_EARLY cell; %d remaining.",
  541. (int)origin_circ->remaining_relay_early_cells);
  542. /* Memorize the command that is sent as RELAY_EARLY cell; helps debug
  543. * task 878. */
  544. origin_circ->relay_early_commands[
  545. origin_circ->relay_early_cells_sent++] = relay_command;
  546. } else if (relay_command == RELAY_COMMAND_EXTEND) {
  547. /* If no RELAY_EARLY cells can be sent over this circuit, log which
  548. * commands have been sent as RELAY_EARLY cells before; helps debug
  549. * task 878. */
  550. smartlist_t *commands_list = smartlist_create();
  551. int i = 0;
  552. char *commands = NULL;
  553. for (; i < origin_circ->relay_early_cells_sent; i++)
  554. smartlist_add(commands_list, (char *)
  555. relay_command_to_string(origin_circ->relay_early_commands[i]));
  556. commands = smartlist_join_strings(commands_list, ",", 0, NULL);
  557. log_warn(LD_BUG, "Uh-oh. We're sending a RELAY_COMMAND_EXTEND cell, "
  558. "but we have run out of RELAY_EARLY cells on that circuit. "
  559. "Commands sent before: %s", commands);
  560. tor_free(commands);
  561. smartlist_free(commands_list);
  562. }
  563. }
  564. if (circuit_package_relay_cell(&cell, circ, cell_direction, cpath_layer,
  565. stream_id) < 0) {
  566. log_warn(LD_BUG,"circuit_package_relay_cell failed. Closing.");
  567. circuit_mark_for_close(circ, END_CIRC_REASON_INTERNAL);
  568. return -1;
  569. }
  570. return 0;
  571. }
  572. /** Make a relay cell out of <b>relay_command</b> and <b>payload</b>, and
  573. * send it onto the open circuit <b>circ</b>. <b>fromconn</b> is the stream
  574. * that's sending the relay cell, or NULL if it's a control cell.
  575. * <b>cpath_layer</b> is NULL for OR->OP cells, or the destination hop
  576. * for OP->OR cells.
  577. *
  578. * If you can't send the cell, mark the circuit for close and
  579. * return -1. Else return 0.
  580. */
  581. int
  582. connection_edge_send_command(edge_connection_t *fromconn,
  583. uint8_t relay_command, const char *payload,
  584. size_t payload_len)
  585. {
  586. /* XXXX NM Split this function into a separate versions per circuit type? */
  587. circuit_t *circ;
  588. tor_assert(fromconn);
  589. circ = fromconn->on_circuit;
  590. if (fromconn->_base.marked_for_close) {
  591. log_warn(LD_BUG,
  592. "called on conn that's already marked for close at %s:%d.",
  593. fromconn->_base.marked_for_close_file,
  594. fromconn->_base.marked_for_close);
  595. return 0;
  596. }
  597. if (!circ) {
  598. if (fromconn->_base.type == CONN_TYPE_AP) {
  599. log_info(LD_APP,"no circ. Closing conn.");
  600. connection_mark_unattached_ap(fromconn, END_STREAM_REASON_INTERNAL);
  601. } else {
  602. log_info(LD_EXIT,"no circ. Closing conn.");
  603. fromconn->edge_has_sent_end = 1; /* no circ to send to */
  604. fromconn->end_reason = END_STREAM_REASON_INTERNAL;
  605. connection_mark_for_close(TO_CONN(fromconn));
  606. }
  607. return -1;
  608. }
  609. return relay_send_command_from_edge(fromconn->stream_id, circ,
  610. relay_command, payload,
  611. payload_len, fromconn->cpath_layer);
  612. }
  613. /** How many times will I retry a stream that fails due to DNS
  614. * resolve failure or misc error?
  615. */
  616. #define MAX_RESOLVE_FAILURES 3
  617. /** Return 1 if reason is something that you should retry if you
  618. * get the end cell before you've connected; else return 0. */
  619. static int
  620. edge_reason_is_retriable(int reason)
  621. {
  622. return reason == END_STREAM_REASON_HIBERNATING ||
  623. reason == END_STREAM_REASON_RESOURCELIMIT ||
  624. reason == END_STREAM_REASON_EXITPOLICY ||
  625. reason == END_STREAM_REASON_RESOLVEFAILED ||
  626. reason == END_STREAM_REASON_MISC ||
  627. reason == END_STREAM_REASON_NOROUTE;
  628. }
  629. /** Called when we receive an END cell on a stream that isn't open yet,
  630. * from the client side.
  631. * Arguments are as for connection_edge_process_relay_cell().
  632. */
  633. static int
  634. connection_ap_process_end_not_open(
  635. relay_header_t *rh, cell_t *cell, origin_circuit_t *circ,
  636. edge_connection_t *conn, crypt_path_t *layer_hint)
  637. {
  638. struct in_addr in;
  639. routerinfo_t *exitrouter;
  640. int reason = *(cell->payload+RELAY_HEADER_SIZE);
  641. int control_reason = reason | END_STREAM_REASON_FLAG_REMOTE;
  642. (void) layer_hint; /* unused */
  643. if (rh->length > 0 && edge_reason_is_retriable(reason) &&
  644. !connection_edge_is_rendezvous_stream(conn) /* avoid retry if rend */
  645. ) {
  646. log_info(LD_APP,"Address '%s' refused due to '%s'. Considering retrying.",
  647. safe_str(conn->socks_request->address),
  648. stream_end_reason_to_string(reason));
  649. exitrouter =
  650. router_get_by_digest(circ->build_state->chosen_exit->identity_digest);
  651. switch (reason) {
  652. case END_STREAM_REASON_EXITPOLICY:
  653. if (rh->length >= 5) {
  654. uint32_t addr = ntohl(get_uint32(cell->payload+RELAY_HEADER_SIZE+1));
  655. int ttl;
  656. if (!addr) {
  657. log_info(LD_APP,"Address '%s' resolved to 0.0.0.0. Closing,",
  658. safe_str(conn->socks_request->address));
  659. connection_mark_unattached_ap(conn, END_STREAM_REASON_TORPROTOCOL);
  660. return 0;
  661. }
  662. if (rh->length >= 9)
  663. ttl = (int)ntohl(get_uint32(cell->payload+RELAY_HEADER_SIZE+5));
  664. else
  665. ttl = -1;
  666. if (get_options()->ClientDNSRejectInternalAddresses &&
  667. is_internal_IP(addr, 0)) {
  668. log_info(LD_APP,"Address '%s' resolved to internal. Closing,",
  669. safe_str(conn->socks_request->address));
  670. connection_mark_unattached_ap(conn, END_STREAM_REASON_TORPROTOCOL);
  671. return 0;
  672. }
  673. client_dns_set_addressmap(conn->socks_request->address, addr,
  674. conn->chosen_exit_name, ttl);
  675. }
  676. /* check if he *ought* to have allowed it */
  677. if (exitrouter &&
  678. (rh->length < 5 ||
  679. (tor_inet_aton(conn->socks_request->address, &in) &&
  680. !conn->chosen_exit_name))) {
  681. log_info(LD_APP,
  682. "Exitrouter '%s' seems to be more restrictive than its exit "
  683. "policy. Not using this router as exit for now.",
  684. exitrouter->nickname);
  685. policies_set_router_exitpolicy_to_reject_all(exitrouter);
  686. }
  687. /* rewrite it to an IP if we learned one. */
  688. if (addressmap_rewrite(conn->socks_request->address,
  689. sizeof(conn->socks_request->address),
  690. NULL)) {
  691. control_event_stream_status(conn, STREAM_EVENT_REMAP, 0);
  692. }
  693. if (conn->chosen_exit_optional ||
  694. conn->chosen_exit_retries) {
  695. /* stop wanting a specific exit */
  696. conn->chosen_exit_optional = 0;
  697. /* A non-zero chosen_exit_retries can happen if we set a
  698. * TrackHostExits for this address under a port that the exit
  699. * relay allows, but then try the same address with a different
  700. * port that it doesn't allow to exit. We shouldn't unregister
  701. * the mapping, since it is probably still wanted on the
  702. * original port. But now we give away to the exit relay that
  703. * we probably have a TrackHostExits on it. So be it. */
  704. conn->chosen_exit_retries = 0;
  705. tor_free(conn->chosen_exit_name); /* clears it */
  706. }
  707. if (connection_ap_detach_retriable(conn, circ, control_reason) >= 0)
  708. return 0;
  709. /* else, conn will get closed below */
  710. break;
  711. case END_STREAM_REASON_CONNECTREFUSED:
  712. if (!conn->chosen_exit_optional)
  713. break; /* break means it'll close, below */
  714. /* Else fall through: expire this circuit, clear the
  715. * chosen_exit_name field, and try again. */
  716. case END_STREAM_REASON_RESOLVEFAILED:
  717. case END_STREAM_REASON_TIMEOUT:
  718. case END_STREAM_REASON_MISC:
  719. case END_STREAM_REASON_NOROUTE:
  720. if (client_dns_incr_failures(conn->socks_request->address)
  721. < MAX_RESOLVE_FAILURES) {
  722. /* We haven't retried too many times; reattach the connection. */
  723. circuit_log_path(LOG_INFO,LD_APP,circ);
  724. tor_assert(circ->_base.timestamp_dirty);
  725. circ->_base.timestamp_dirty -= get_options()->MaxCircuitDirtiness;
  726. if (conn->chosen_exit_optional) {
  727. /* stop wanting a specific exit */
  728. conn->chosen_exit_optional = 0;
  729. tor_free(conn->chosen_exit_name); /* clears it */
  730. }
  731. if (connection_ap_detach_retriable(conn, circ, control_reason) >= 0)
  732. return 0;
  733. /* else, conn will get closed below */
  734. } else {
  735. log_notice(LD_APP,
  736. "Have tried resolving or connecting to address '%s' "
  737. "at %d different places. Giving up.",
  738. safe_str(conn->socks_request->address),
  739. MAX_RESOLVE_FAILURES);
  740. /* clear the failures, so it will have a full try next time */
  741. client_dns_clear_failures(conn->socks_request->address);
  742. }
  743. break;
  744. case END_STREAM_REASON_HIBERNATING:
  745. case END_STREAM_REASON_RESOURCELIMIT:
  746. if (exitrouter) {
  747. policies_set_router_exitpolicy_to_reject_all(exitrouter);
  748. }
  749. if (conn->chosen_exit_optional) {
  750. /* stop wanting a specific exit */
  751. conn->chosen_exit_optional = 0;
  752. tor_free(conn->chosen_exit_name); /* clears it */
  753. }
  754. if (connection_ap_detach_retriable(conn, circ, control_reason) >= 0)
  755. return 0;
  756. /* else, will close below */
  757. break;
  758. } /* end switch */
  759. log_info(LD_APP,"Giving up on retrying; conn can't be handled.");
  760. }
  761. log_info(LD_APP,
  762. "Edge got end (%s) before we're connected. Marking for close.",
  763. stream_end_reason_to_string(rh->length > 0 ? reason : -1));
  764. circuit_log_path(LOG_INFO,LD_APP,circ);
  765. /* need to test because of detach_retriable */
  766. if (!conn->_base.marked_for_close)
  767. connection_mark_unattached_ap(conn, control_reason);
  768. return 0;
  769. }
  770. /** Helper: change the socks_request-&gt;address field on conn to the
  771. * dotted-quad representation of <b>new_addr</b> (given in host order),
  772. * and send an appropriate REMAP event. */
  773. static void
  774. remap_event_helper(edge_connection_t *conn, uint32_t new_addr)
  775. {
  776. struct in_addr in;
  777. in.s_addr = htonl(new_addr);
  778. tor_inet_ntoa(&in, conn->socks_request->address,
  779. sizeof(conn->socks_request->address));
  780. control_event_stream_status(conn, STREAM_EVENT_REMAP,
  781. REMAP_STREAM_SOURCE_EXIT);
  782. }
  783. /** An incoming relay cell has arrived from circuit <b>circ</b> to
  784. * stream <b>conn</b>.
  785. *
  786. * The arguments here are the same as in
  787. * connection_edge_process_relay_cell() below; this function is called
  788. * from there when <b>conn</b> is defined and not in an open state.
  789. */
  790. static int
  791. connection_edge_process_relay_cell_not_open(
  792. relay_header_t *rh, cell_t *cell, circuit_t *circ,
  793. edge_connection_t *conn, crypt_path_t *layer_hint)
  794. {
  795. if (rh->command == RELAY_COMMAND_END) {
  796. if (CIRCUIT_IS_ORIGIN(circ) && conn->_base.type == CONN_TYPE_AP) {
  797. return connection_ap_process_end_not_open(rh, cell,
  798. TO_ORIGIN_CIRCUIT(circ), conn,
  799. layer_hint);
  800. } else {
  801. /* we just got an 'end', don't need to send one */
  802. conn->edge_has_sent_end = 1;
  803. conn->end_reason = *(cell->payload+RELAY_HEADER_SIZE) |
  804. END_STREAM_REASON_FLAG_REMOTE;
  805. connection_mark_for_close(TO_CONN(conn));
  806. return 0;
  807. }
  808. }
  809. if (conn->_base.type == CONN_TYPE_AP &&
  810. rh->command == RELAY_COMMAND_CONNECTED) {
  811. tor_assert(CIRCUIT_IS_ORIGIN(circ));
  812. if (conn->_base.state != AP_CONN_STATE_CONNECT_WAIT) {
  813. log_fn(LOG_PROTOCOL_WARN, LD_APP,
  814. "Got 'connected' while not in state connect_wait. Dropping.");
  815. return 0;
  816. }
  817. conn->_base.state = AP_CONN_STATE_OPEN;
  818. log_info(LD_APP,"'connected' received after %d seconds.",
  819. (int)(time(NULL) - conn->_base.timestamp_lastread));
  820. if (rh->length >= 4) {
  821. uint32_t addr = ntohl(get_uint32(cell->payload+RELAY_HEADER_SIZE));
  822. int ttl;
  823. if (!addr || (get_options()->ClientDNSRejectInternalAddresses &&
  824. is_internal_IP(addr, 0))) {
  825. char buf[INET_NTOA_BUF_LEN];
  826. struct in_addr a;
  827. a.s_addr = htonl(addr);
  828. tor_inet_ntoa(&a, buf, sizeof(buf));
  829. log_info(LD_APP,
  830. "...but it claims the IP address was %s. Closing.", buf);
  831. connection_edge_end(conn, END_STREAM_REASON_TORPROTOCOL);
  832. connection_mark_unattached_ap(conn, END_STREAM_REASON_TORPROTOCOL);
  833. return 0;
  834. }
  835. if (rh->length >= 8)
  836. ttl = (int)ntohl(get_uint32(cell->payload+RELAY_HEADER_SIZE+4));
  837. else
  838. ttl = -1;
  839. client_dns_set_addressmap(conn->socks_request->address, addr,
  840. conn->chosen_exit_name, ttl);
  841. remap_event_helper(conn, addr);
  842. }
  843. circuit_log_path(LOG_INFO,LD_APP,TO_ORIGIN_CIRCUIT(circ));
  844. /* don't send a socks reply to transparent conns */
  845. if (!conn->socks_request->has_finished)
  846. connection_ap_handshake_socks_reply(conn, NULL, 0, 0);
  847. /* Was it a linked dir conn? If so, a dir request just started to
  848. * fetch something; this could be a bootstrap status milestone. */
  849. log_debug(LD_APP, "considering");
  850. if (TO_CONN(conn)->linked_conn &&
  851. TO_CONN(conn)->linked_conn->type == CONN_TYPE_DIR) {
  852. connection_t *dirconn = TO_CONN(conn)->linked_conn;
  853. log_debug(LD_APP, "it is! %d", dirconn->purpose);
  854. switch (dirconn->purpose) {
  855. case DIR_PURPOSE_FETCH_CERTIFICATE:
  856. if (consensus_is_waiting_for_certs())
  857. control_event_bootstrap(BOOTSTRAP_STATUS_LOADING_KEYS, 0);
  858. break;
  859. case DIR_PURPOSE_FETCH_CONSENSUS:
  860. control_event_bootstrap(BOOTSTRAP_STATUS_LOADING_STATUS, 0);
  861. break;
  862. case DIR_PURPOSE_FETCH_SERVERDESC:
  863. control_event_bootstrap(BOOTSTRAP_STATUS_LOADING_DESCRIPTORS,
  864. count_loading_descriptors_progress());
  865. break;
  866. }
  867. }
  868. /* handle anything that might have queued */
  869. if (connection_edge_package_raw_inbuf(conn, 1, NULL) < 0) {
  870. /* (We already sent an end cell if possible) */
  871. connection_mark_for_close(TO_CONN(conn));
  872. return 0;
  873. }
  874. return 0;
  875. }
  876. if (conn->_base.type == CONN_TYPE_AP &&
  877. rh->command == RELAY_COMMAND_RESOLVED) {
  878. int ttl;
  879. int answer_len;
  880. uint8_t answer_type;
  881. if (conn->_base.state != AP_CONN_STATE_RESOLVE_WAIT) {
  882. log_fn(LOG_PROTOCOL_WARN, LD_APP, "Got a 'resolved' cell while "
  883. "not in state resolve_wait. Dropping.");
  884. return 0;
  885. }
  886. tor_assert(SOCKS_COMMAND_IS_RESOLVE(conn->socks_request->command));
  887. answer_len = cell->payload[RELAY_HEADER_SIZE+1];
  888. if (rh->length < 2 || answer_len+2>rh->length) {
  889. log_fn(LOG_PROTOCOL_WARN, LD_PROTOCOL,
  890. "Dropping malformed 'resolved' cell");
  891. connection_mark_unattached_ap(conn, END_STREAM_REASON_TORPROTOCOL);
  892. return 0;
  893. }
  894. answer_type = cell->payload[RELAY_HEADER_SIZE];
  895. if (rh->length >= answer_len+6)
  896. ttl = (int)ntohl(get_uint32(cell->payload+RELAY_HEADER_SIZE+
  897. 2+answer_len));
  898. else
  899. ttl = -1;
  900. if (answer_type == RESOLVED_TYPE_IPV4 && answer_len == 4) {
  901. uint32_t addr = ntohl(get_uint32(cell->payload+RELAY_HEADER_SIZE+2));
  902. if (get_options()->ClientDNSRejectInternalAddresses &&
  903. is_internal_IP(addr, 0)) {
  904. char buf[INET_NTOA_BUF_LEN];
  905. struct in_addr a;
  906. a.s_addr = htonl(addr);
  907. tor_inet_ntoa(&a, buf, sizeof(buf));
  908. log_info(LD_APP,"Got a resolve with answer %s. Rejecting.", buf);
  909. connection_ap_handshake_socks_resolved(conn,
  910. RESOLVED_TYPE_ERROR_TRANSIENT,
  911. 0, NULL, 0, TIME_MAX);
  912. connection_mark_unattached_ap(conn, END_STREAM_REASON_TORPROTOCOL);
  913. return 0;
  914. }
  915. }
  916. connection_ap_handshake_socks_resolved(conn,
  917. answer_type,
  918. cell->payload[RELAY_HEADER_SIZE+1], /*answer_len*/
  919. cell->payload+RELAY_HEADER_SIZE+2, /*answer*/
  920. ttl,
  921. -1);
  922. if (answer_type == RESOLVED_TYPE_IPV4 && answer_len == 4) {
  923. uint32_t addr = ntohl(get_uint32(cell->payload+RELAY_HEADER_SIZE+2));
  924. remap_event_helper(conn, addr);
  925. }
  926. connection_mark_unattached_ap(conn,
  927. END_STREAM_REASON_DONE |
  928. END_STREAM_REASON_FLAG_ALREADY_SOCKS_REPLIED);
  929. return 0;
  930. }
  931. log_fn(LOG_PROTOCOL_WARN, LD_PROTOCOL,
  932. "Got an unexpected relay command %d, in state %d (%s). Dropping.",
  933. rh->command, conn->_base.state,
  934. conn_state_to_string(conn->_base.type, conn->_base.state));
  935. return 0; /* for forward compatibility, don't kill the circuit */
  936. // connection_edge_end(conn, END_STREAM_REASON_TORPROTOCOL);
  937. // connection_mark_for_close(conn);
  938. // return -1;
  939. }
  940. /** An incoming relay cell has arrived on circuit <b>circ</b>. If
  941. * <b>conn</b> is NULL this is a control cell, else <b>cell</b> is
  942. * destined for <b>conn</b>.
  943. *
  944. * If <b>layer_hint</b> is defined, then we're the origin of the
  945. * circuit, and it specifies the hop that packaged <b>cell</b>.
  946. *
  947. * Return -reason if you want to warn and tear down the circuit, else 0.
  948. */
  949. static int
  950. connection_edge_process_relay_cell(cell_t *cell, circuit_t *circ,
  951. edge_connection_t *conn,
  952. crypt_path_t *layer_hint)
  953. {
  954. static int num_seen=0;
  955. relay_header_t rh;
  956. unsigned domain = layer_hint?LD_APP:LD_EXIT;
  957. int reason;
  958. tor_assert(cell);
  959. tor_assert(circ);
  960. relay_header_unpack(&rh, cell->payload);
  961. // log_fn(LOG_DEBUG,"command %d stream %d", rh.command, rh.stream_id);
  962. num_seen++;
  963. log_debug(domain, "Now seen %d relay cells here (command %d, stream %d).",
  964. num_seen, rh.command, rh.stream_id);
  965. if (rh.length > RELAY_PAYLOAD_SIZE) {
  966. log_fn(LOG_PROTOCOL_WARN, LD_PROTOCOL,
  967. "Relay cell length field too long. Closing circuit.");
  968. return - END_CIRC_REASON_TORPROTOCOL;
  969. }
  970. /* either conn is NULL, in which case we've got a control cell, or else
  971. * conn points to the recognized stream. */
  972. if (conn && !connection_state_is_open(TO_CONN(conn)))
  973. return connection_edge_process_relay_cell_not_open(
  974. &rh, cell, circ, conn, layer_hint);
  975. switch (rh.command) {
  976. case RELAY_COMMAND_DROP:
  977. // log_info(domain,"Got a relay-level padding cell. Dropping.");
  978. return 0;
  979. case RELAY_COMMAND_BEGIN:
  980. case RELAY_COMMAND_BEGIN_DIR:
  981. if (layer_hint &&
  982. circ->purpose != CIRCUIT_PURPOSE_S_REND_JOINED) {
  983. log_fn(LOG_PROTOCOL_WARN, LD_APP,
  984. "Relay begin request unsupported at AP. Dropping.");
  985. return 0;
  986. }
  987. if (circ->purpose == CIRCUIT_PURPOSE_S_REND_JOINED &&
  988. layer_hint != TO_ORIGIN_CIRCUIT(circ)->cpath->prev) {
  989. log_fn(LOG_PROTOCOL_WARN, LD_APP,
  990. "Relay begin request to Hidden Service "
  991. "from intermediary node. Dropping.");
  992. return 0;
  993. }
  994. if (conn) {
  995. log_fn(LOG_PROTOCOL_WARN, domain,
  996. "Begin cell for known stream. Dropping.");
  997. return 0;
  998. }
  999. if (rh.command == RELAY_COMMAND_BEGIN_DIR) {
  1000. /* Assign this circuit and its app-ward OR connection a unique ID,
  1001. * so that we can measure download times. The local edge and dir
  1002. * connection will be assigned the same ID when they are created
  1003. * and linked. */
  1004. static uint64_t next_id = 0;
  1005. circ->dirreq_id = ++next_id;
  1006. TO_CONN(TO_OR_CIRCUIT(circ)->p_conn)->dirreq_id = circ->dirreq_id;
  1007. }
  1008. return connection_exit_begin_conn(cell, circ);
  1009. case RELAY_COMMAND_DATA:
  1010. ++stats_n_data_cells_received;
  1011. if (( layer_hint && --layer_hint->deliver_window < 0) ||
  1012. (!layer_hint && --circ->deliver_window < 0)) {
  1013. log_fn(LOG_PROTOCOL_WARN, LD_PROTOCOL,
  1014. "(relay data) circ deliver_window below 0. Killing.");
  1015. connection_edge_end(conn, END_STREAM_REASON_TORPROTOCOL);
  1016. connection_mark_for_close(TO_CONN(conn));
  1017. return -END_CIRC_REASON_TORPROTOCOL;
  1018. }
  1019. log_debug(domain,"circ deliver_window now %d.", layer_hint ?
  1020. layer_hint->deliver_window : circ->deliver_window);
  1021. circuit_consider_sending_sendme(circ, layer_hint);
  1022. if (!conn) {
  1023. log_info(domain,"data cell dropped, unknown stream (streamid %d).",
  1024. rh.stream_id);
  1025. return 0;
  1026. }
  1027. if (--conn->deliver_window < 0) { /* is it below 0 after decrement? */
  1028. log_fn(LOG_PROTOCOL_WARN, LD_PROTOCOL,
  1029. "(relay data) conn deliver_window below 0. Killing.");
  1030. return -END_CIRC_REASON_TORPROTOCOL;
  1031. }
  1032. stats_n_data_bytes_received += rh.length;
  1033. connection_write_to_buf((char*)(cell->payload + RELAY_HEADER_SIZE),
  1034. rh.length, TO_CONN(conn));
  1035. connection_edge_consider_sending_sendme(conn);
  1036. return 0;
  1037. case RELAY_COMMAND_END:
  1038. reason = rh.length > 0 ?
  1039. get_uint8(cell->payload+RELAY_HEADER_SIZE) : END_STREAM_REASON_MISC;
  1040. if (!conn) {
  1041. log_info(domain,"end cell (%s) dropped, unknown stream.",
  1042. stream_end_reason_to_string(reason));
  1043. return 0;
  1044. }
  1045. /* XXX add to this log_fn the exit node's nickname? */
  1046. log_info(domain,"%d: end cell (%s) for stream %d. Removing stream.",
  1047. conn->_base.s,
  1048. stream_end_reason_to_string(reason),
  1049. conn->stream_id);
  1050. if (conn->socks_request && !conn->socks_request->has_finished)
  1051. log_warn(LD_BUG,
  1052. "open stream hasn't sent socks answer yet? Closing.");
  1053. /* We just *got* an end; no reason to send one. */
  1054. conn->edge_has_sent_end = 1;
  1055. if (!conn->end_reason)
  1056. conn->end_reason = reason | END_STREAM_REASON_FLAG_REMOTE;
  1057. if (!conn->_base.marked_for_close) {
  1058. /* only mark it if not already marked. it's possible to
  1059. * get the 'end' right around when the client hangs up on us. */
  1060. connection_mark_for_close(TO_CONN(conn));
  1061. conn->_base.hold_open_until_flushed = 1;
  1062. }
  1063. return 0;
  1064. case RELAY_COMMAND_EXTEND:
  1065. if (conn) {
  1066. log_fn(LOG_PROTOCOL_WARN, domain,
  1067. "'extend' cell received for non-zero stream. Dropping.");
  1068. return 0;
  1069. }
  1070. return circuit_extend(cell, circ);
  1071. case RELAY_COMMAND_EXTENDED:
  1072. if (!layer_hint) {
  1073. log_fn(LOG_PROTOCOL_WARN, LD_PROTOCOL,
  1074. "'extended' unsupported at non-origin. Dropping.");
  1075. return 0;
  1076. }
  1077. log_debug(domain,"Got an extended cell! Yay.");
  1078. if ((reason = circuit_finish_handshake(TO_ORIGIN_CIRCUIT(circ),
  1079. CELL_CREATED,
  1080. cell->payload+RELAY_HEADER_SIZE)) < 0) {
  1081. log_warn(domain,"circuit_finish_handshake failed.");
  1082. return reason;
  1083. }
  1084. if ((reason=circuit_send_next_onion_skin(TO_ORIGIN_CIRCUIT(circ)))<0) {
  1085. log_info(domain,"circuit_send_next_onion_skin() failed.");
  1086. return reason;
  1087. }
  1088. return 0;
  1089. case RELAY_COMMAND_TRUNCATE:
  1090. if (layer_hint) {
  1091. log_fn(LOG_PROTOCOL_WARN, LD_APP,
  1092. "'truncate' unsupported at origin. Dropping.");
  1093. return 0;
  1094. }
  1095. if (circ->n_conn) {
  1096. uint8_t trunc_reason = *(uint8_t*)(cell->payload + RELAY_HEADER_SIZE);
  1097. circuit_clear_cell_queue(circ, circ->n_conn);
  1098. connection_or_send_destroy(circ->n_circ_id, circ->n_conn,
  1099. trunc_reason);
  1100. circuit_set_n_circid_orconn(circ, 0, NULL);
  1101. }
  1102. log_debug(LD_EXIT, "Processed 'truncate', replying.");
  1103. {
  1104. char payload[1];
  1105. payload[0] = (char)END_CIRC_REASON_REQUESTED;
  1106. relay_send_command_from_edge(0, circ, RELAY_COMMAND_TRUNCATED,
  1107. payload, sizeof(payload), NULL);
  1108. }
  1109. return 0;
  1110. case RELAY_COMMAND_TRUNCATED:
  1111. if (!layer_hint) {
  1112. log_fn(LOG_PROTOCOL_WARN, LD_EXIT,
  1113. "'truncated' unsupported at non-origin. Dropping.");
  1114. return 0;
  1115. }
  1116. circuit_truncated(TO_ORIGIN_CIRCUIT(circ), layer_hint);
  1117. return 0;
  1118. case RELAY_COMMAND_CONNECTED:
  1119. if (conn) {
  1120. log_fn(LOG_PROTOCOL_WARN, LD_PROTOCOL,
  1121. "'connected' unsupported while open. Closing circ.");
  1122. return -END_CIRC_REASON_TORPROTOCOL;
  1123. }
  1124. log_info(domain,
  1125. "'connected' received, no conn attached anymore. Ignoring.");
  1126. return 0;
  1127. case RELAY_COMMAND_SENDME:
  1128. if (!conn) {
  1129. if (layer_hint) {
  1130. layer_hint->package_window += CIRCWINDOW_INCREMENT;
  1131. log_debug(LD_APP,"circ-level sendme at origin, packagewindow %d.",
  1132. layer_hint->package_window);
  1133. circuit_resume_edge_reading(circ, layer_hint);
  1134. } else {
  1135. circ->package_window += CIRCWINDOW_INCREMENT;
  1136. log_debug(LD_APP,
  1137. "circ-level sendme at non-origin, packagewindow %d.",
  1138. circ->package_window);
  1139. circuit_resume_edge_reading(circ, layer_hint);
  1140. }
  1141. return 0;
  1142. }
  1143. conn->package_window += STREAMWINDOW_INCREMENT;
  1144. log_debug(domain,"stream-level sendme, packagewindow now %d.",
  1145. conn->package_window);
  1146. if (circuit_queue_streams_are_blocked(circ)) {
  1147. /* Still waiting for queue to flush; don't touch conn */
  1148. return 0;
  1149. }
  1150. connection_start_reading(TO_CONN(conn));
  1151. /* handle whatever might still be on the inbuf */
  1152. if (connection_edge_package_raw_inbuf(conn, 1, NULL) < 0) {
  1153. /* (We already sent an end cell if possible) */
  1154. connection_mark_for_close(TO_CONN(conn));
  1155. return 0;
  1156. }
  1157. return 0;
  1158. case RELAY_COMMAND_RESOLVE:
  1159. if (layer_hint) {
  1160. log_fn(LOG_PROTOCOL_WARN, LD_APP,
  1161. "resolve request unsupported at AP; dropping.");
  1162. return 0;
  1163. } else if (conn) {
  1164. log_fn(LOG_PROTOCOL_WARN, domain,
  1165. "resolve request for known stream; dropping.");
  1166. return 0;
  1167. } else if (circ->purpose != CIRCUIT_PURPOSE_OR) {
  1168. log_fn(LOG_PROTOCOL_WARN, domain,
  1169. "resolve request on circ with purpose %d; dropping",
  1170. circ->purpose);
  1171. return 0;
  1172. }
  1173. connection_exit_begin_resolve(cell, TO_OR_CIRCUIT(circ));
  1174. return 0;
  1175. case RELAY_COMMAND_RESOLVED:
  1176. if (conn) {
  1177. log_fn(LOG_PROTOCOL_WARN, domain,
  1178. "'resolved' unsupported while open. Closing circ.");
  1179. return -END_CIRC_REASON_TORPROTOCOL;
  1180. }
  1181. log_info(domain,
  1182. "'resolved' received, no conn attached anymore. Ignoring.");
  1183. return 0;
  1184. case RELAY_COMMAND_ESTABLISH_INTRO:
  1185. case RELAY_COMMAND_ESTABLISH_RENDEZVOUS:
  1186. case RELAY_COMMAND_INTRODUCE1:
  1187. case RELAY_COMMAND_INTRODUCE2:
  1188. case RELAY_COMMAND_INTRODUCE_ACK:
  1189. case RELAY_COMMAND_RENDEZVOUS1:
  1190. case RELAY_COMMAND_RENDEZVOUS2:
  1191. case RELAY_COMMAND_INTRO_ESTABLISHED:
  1192. case RELAY_COMMAND_RENDEZVOUS_ESTABLISHED:
  1193. rend_process_relay_cell(circ, layer_hint,
  1194. rh.command, rh.length,
  1195. cell->payload+RELAY_HEADER_SIZE);
  1196. return 0;
  1197. }
  1198. log_fn(LOG_PROTOCOL_WARN, LD_PROTOCOL,
  1199. "Received unknown relay command %d. Perhaps the other side is using "
  1200. "a newer version of Tor? Dropping.",
  1201. rh.command);
  1202. return 0; /* for forward compatibility, don't kill the circuit */
  1203. }
  1204. /** How many relay_data cells have we built, ever? */
  1205. uint64_t stats_n_data_cells_packaged = 0;
  1206. /** How many bytes of data have we put in relay_data cells have we built,
  1207. * ever? This would be RELAY_PAYLOAD_SIZE*stats_n_data_cells_packaged if
  1208. * every relay cell we ever sent were completely full of data. */
  1209. uint64_t stats_n_data_bytes_packaged = 0;
  1210. /** How many relay_data cells have we received, ever? */
  1211. uint64_t stats_n_data_cells_received = 0;
  1212. /** How many bytes of data have we received relay_data cells, ever? This would
  1213. * be RELAY_PAYLOAD_SIZE*stats_n_data_cells_packaged if every relay cell we
  1214. * ever received were completely full of data. */
  1215. uint64_t stats_n_data_bytes_received = 0;
  1216. /** If <b>conn</b> has an entire relay payload of bytes on its inbuf (or
  1217. * <b>package_partial</b> is true), and the appropriate package windows aren't
  1218. * empty, grab a cell and send it down the circuit.
  1219. *
  1220. * If *<b>max_cells</b> is given, package no more than max_cells. Decrement
  1221. * *<b>max_cells</b> by the number of cells packaged.
  1222. *
  1223. * Return -1 (and send a RELAY_COMMAND_END cell if necessary) if conn should
  1224. * be marked for close, else return 0.
  1225. */
  1226. int
  1227. connection_edge_package_raw_inbuf(edge_connection_t *conn, int package_partial,
  1228. int *max_cells)
  1229. {
  1230. size_t amount_to_process, length;
  1231. char payload[CELL_PAYLOAD_SIZE];
  1232. circuit_t *circ;
  1233. unsigned domain = conn->cpath_layer ? LD_APP : LD_EXIT;
  1234. tor_assert(conn);
  1235. if (conn->_base.marked_for_close) {
  1236. log_warn(LD_BUG,
  1237. "called on conn that's already marked for close at %s:%d.",
  1238. conn->_base.marked_for_close_file, conn->_base.marked_for_close);
  1239. return 0;
  1240. }
  1241. if (max_cells && *max_cells <= 0)
  1242. return 0;
  1243. repeat_connection_edge_package_raw_inbuf:
  1244. circ = circuit_get_by_edge_conn(conn);
  1245. if (!circ) {
  1246. log_info(domain,"conn has no circuit! Closing.");
  1247. conn->end_reason = END_STREAM_REASON_CANT_ATTACH;
  1248. return -1;
  1249. }
  1250. if (circuit_consider_stop_edge_reading(circ, conn->cpath_layer))
  1251. return 0;
  1252. if (conn->package_window <= 0) {
  1253. log_info(domain,"called with package_window %d. Skipping.",
  1254. conn->package_window);
  1255. connection_stop_reading(TO_CONN(conn));
  1256. return 0;
  1257. }
  1258. amount_to_process = buf_datalen(conn->_base.inbuf);
  1259. if (!amount_to_process)
  1260. return 0;
  1261. if (!package_partial && amount_to_process < RELAY_PAYLOAD_SIZE)
  1262. return 0;
  1263. if (amount_to_process > RELAY_PAYLOAD_SIZE) {
  1264. length = RELAY_PAYLOAD_SIZE;
  1265. } else {
  1266. length = amount_to_process;
  1267. }
  1268. stats_n_data_bytes_packaged += length;
  1269. stats_n_data_cells_packaged += 1;
  1270. connection_fetch_from_buf(payload, length, TO_CONN(conn));
  1271. log_debug(domain,"(%d) Packaging %d bytes (%d waiting).", conn->_base.s,
  1272. (int)length, (int)buf_datalen(conn->_base.inbuf));
  1273. if (connection_edge_send_command(conn, RELAY_COMMAND_DATA,
  1274. payload, length) < 0 )
  1275. /* circuit got marked for close, don't continue, don't need to mark conn */
  1276. return 0;
  1277. if (!conn->cpath_layer) { /* non-rendezvous exit */
  1278. tor_assert(circ->package_window > 0);
  1279. circ->package_window--;
  1280. } else { /* we're an AP, or an exit on a rendezvous circ */
  1281. tor_assert(conn->cpath_layer->package_window > 0);
  1282. conn->cpath_layer->package_window--;
  1283. }
  1284. if (--conn->package_window <= 0) { /* is it 0 after decrement? */
  1285. connection_stop_reading(TO_CONN(conn));
  1286. log_debug(domain,"conn->package_window reached 0.");
  1287. circuit_consider_stop_edge_reading(circ, conn->cpath_layer);
  1288. return 0; /* don't process the inbuf any more */
  1289. }
  1290. log_debug(domain,"conn->package_window is now %d",conn->package_window);
  1291. if (max_cells) {
  1292. *max_cells -= 1;
  1293. if (*max_cells <= 0)
  1294. return 0;
  1295. }
  1296. /* handle more if there's more, or return 0 if there isn't */
  1297. goto repeat_connection_edge_package_raw_inbuf;
  1298. }
  1299. /** Called when we've just received a relay data cell, or when
  1300. * we've just finished flushing all bytes to stream <b>conn</b>.
  1301. *
  1302. * If conn->outbuf is not too full, and our deliver window is
  1303. * low, send back a suitable number of stream-level sendme cells.
  1304. */
  1305. void
  1306. connection_edge_consider_sending_sendme(edge_connection_t *conn)
  1307. {
  1308. circuit_t *circ;
  1309. if (connection_outbuf_too_full(TO_CONN(conn)))
  1310. return;
  1311. circ = circuit_get_by_edge_conn(conn);
  1312. if (!circ) {
  1313. /* this can legitimately happen if the destroy has already
  1314. * arrived and torn down the circuit */
  1315. log_info(LD_APP,"No circuit associated with conn. Skipping.");
  1316. return;
  1317. }
  1318. while (conn->deliver_window <= STREAMWINDOW_START - STREAMWINDOW_INCREMENT) {
  1319. log_debug(conn->cpath_layer?LD_APP:LD_EXIT,
  1320. "Outbuf %d, Queuing stream sendme.",
  1321. (int)conn->_base.outbuf_flushlen);
  1322. conn->deliver_window += STREAMWINDOW_INCREMENT;
  1323. if (connection_edge_send_command(conn, RELAY_COMMAND_SENDME,
  1324. NULL, 0) < 0) {
  1325. log_warn(LD_APP,"connection_edge_send_command failed. Skipping.");
  1326. return; /* the circuit's closed, don't continue */
  1327. }
  1328. }
  1329. }
  1330. /** The circuit <b>circ</b> has received a circuit-level sendme
  1331. * (on hop <b>layer_hint</b>, if we're the OP). Go through all the
  1332. * attached streams and let them resume reading and packaging, if
  1333. * their stream windows allow it.
  1334. */
  1335. static void
  1336. circuit_resume_edge_reading(circuit_t *circ, crypt_path_t *layer_hint)
  1337. {
  1338. if (circuit_queue_streams_are_blocked(circ)) {
  1339. log_debug(layer_hint?LD_APP:LD_EXIT,"Too big queue, no resuming");
  1340. return;
  1341. }
  1342. log_debug(layer_hint?LD_APP:LD_EXIT,"resuming");
  1343. if (CIRCUIT_IS_ORIGIN(circ))
  1344. circuit_resume_edge_reading_helper(TO_ORIGIN_CIRCUIT(circ)->p_streams,
  1345. circ, layer_hint);
  1346. else
  1347. circuit_resume_edge_reading_helper(TO_OR_CIRCUIT(circ)->n_streams,
  1348. circ, layer_hint);
  1349. }
  1350. /** A helper function for circuit_resume_edge_reading() above.
  1351. * The arguments are the same, except that <b>conn</b> is the head
  1352. * of a linked list of edge streams that should each be considered.
  1353. */
  1354. static int
  1355. circuit_resume_edge_reading_helper(edge_connection_t *first_conn,
  1356. circuit_t *circ,
  1357. crypt_path_t *layer_hint)
  1358. {
  1359. edge_connection_t *conn;
  1360. int n_packaging_streams, n_streams_left;
  1361. int packaged_this_round;
  1362. int cells_on_queue;
  1363. int cells_per_conn;
  1364. edge_connection_t *chosen_stream = NULL;
  1365. /* How many cells do we have space for? It will be the minimum of
  1366. * the number needed to exhaust the package window, and the minimum
  1367. * needed to fill the cell queue. */
  1368. int max_to_package = circ->package_window;
  1369. if (CIRCUIT_IS_ORIGIN(circ)) {
  1370. cells_on_queue = circ->n_conn_cells.n;
  1371. } else {
  1372. or_circuit_t *or_circ = TO_OR_CIRCUIT(circ);
  1373. cells_on_queue = or_circ->p_conn_cells.n;
  1374. }
  1375. if (CELL_QUEUE_HIGHWATER_SIZE - cells_on_queue < max_to_package)
  1376. max_to_package = CELL_QUEUE_HIGHWATER_SIZE - cells_on_queue;
  1377. /* Once we used to start listening on the streams in the order they
  1378. * appeared in the linked list. That leads to starvation on the
  1379. * streams that appeared later on the list, since the first streams
  1380. * would always get to read first. Instead, we just pick a random
  1381. * stream on the list, and enable reading for streams starting at that
  1382. * point (and wrapping around as if the list were circular). It would
  1383. * probably be better to actually remember which streams we've
  1384. * serviced in the past, but this is simple and effective. */
  1385. /* Select a stream uniformly at random from the linked list. We
  1386. * don't need cryptographic randomness here. */
  1387. {
  1388. int num_streams = 0;
  1389. for (conn = first_conn; conn; conn = conn->next_stream) {
  1390. num_streams++;
  1391. if ((tor_weak_random() % num_streams)==0)
  1392. chosen_stream = conn;
  1393. /* Invariant: chosen_stream has been chosen uniformly at random from
  1394. * among the first num_streams streams on first_conn. */
  1395. }
  1396. }
  1397. /* Count how many non-marked streams there are that have anything on
  1398. * their inbuf, and enable reading on all of the connections. */
  1399. n_packaging_streams = 0;
  1400. /* Activate reading starting from the chosen stream */
  1401. for (conn=chosen_stream; conn; conn = conn->next_stream) {
  1402. /* Start reading for the streams starting from here */
  1403. if (conn->_base.marked_for_close || conn->package_window <= 0)
  1404. continue;
  1405. if (!layer_hint || conn->cpath_layer == layer_hint) {
  1406. connection_start_reading(TO_CONN(conn));
  1407. if (buf_datalen(conn->_base.inbuf) > 0)
  1408. ++n_packaging_streams;
  1409. }
  1410. }
  1411. /* Go back and do the ones we skipped, circular-style */
  1412. for (conn = first_conn; conn != chosen_stream; conn = conn->next_stream) {
  1413. if (conn->_base.marked_for_close || conn->package_window <= 0)
  1414. continue;
  1415. if (!layer_hint || conn->cpath_layer == layer_hint) {
  1416. connection_start_reading(TO_CONN(conn));
  1417. if (buf_datalen(conn->_base.inbuf) > 0)
  1418. ++n_packaging_streams;
  1419. }
  1420. }
  1421. if (n_packaging_streams == 0) /* avoid divide-by-zero */
  1422. return 0;
  1423. again:
  1424. cells_per_conn = CEIL_DIV(max_to_package, n_packaging_streams);
  1425. packaged_this_round = 0;
  1426. n_streams_left = 0;
  1427. /* Iterate over all connections. Package up to cells_per_conn cells on
  1428. * each. Update packaged_this_round with the total number of cells
  1429. * packaged, and n_streams_left with the number that still have data to
  1430. * package.
  1431. */
  1432. for (conn=first_conn; conn; conn=conn->next_stream) {
  1433. if (conn->_base.marked_for_close || conn->package_window <= 0)
  1434. continue;
  1435. if (!layer_hint || conn->cpath_layer == layer_hint) {
  1436. int n = cells_per_conn, r;
  1437. /* handle whatever might still be on the inbuf */
  1438. r = connection_edge_package_raw_inbuf(conn, 1, &n);
  1439. /* Note how many we packaged */
  1440. packaged_this_round += (cells_per_conn-n);
  1441. if (r<0) {
  1442. /* Problem while packaging. (We already sent an end cell if
  1443. * possible) */
  1444. connection_mark_for_close(TO_CONN(conn));
  1445. continue;
  1446. }
  1447. /* If there's still data to read, we'll be coming back to this stream. */
  1448. if (buf_datalen(conn->_base.inbuf))
  1449. ++n_streams_left;
  1450. /* If the circuit won't accept any more data, return without looking
  1451. * at any more of the streams. Any connections that should be stopped
  1452. * have already been stopped by connection_edge_package_raw_inbuf. */
  1453. if (circuit_consider_stop_edge_reading(circ, layer_hint))
  1454. return -1;
  1455. /* XXXX should we also stop immediately if we fill up the cell queue?
  1456. * Probably. */
  1457. }
  1458. }
  1459. /* If we made progress, and we are willing to package more, and there are
  1460. * any streams left that want to package stuff... try again!
  1461. */
  1462. if (packaged_this_round && packaged_this_round < max_to_package &&
  1463. n_streams_left) {
  1464. max_to_package -= packaged_this_round;
  1465. n_packaging_streams = n_streams_left;
  1466. goto again;
  1467. }
  1468. return 0;
  1469. }
  1470. /** Check if the package window for <b>circ</b> is empty (at
  1471. * hop <b>layer_hint</b> if it's defined).
  1472. *
  1473. * If yes, tell edge streams to stop reading and return 1.
  1474. * Else return 0.
  1475. */
  1476. static int
  1477. circuit_consider_stop_edge_reading(circuit_t *circ, crypt_path_t *layer_hint)
  1478. {
  1479. edge_connection_t *conn = NULL;
  1480. unsigned domain = layer_hint ? LD_APP : LD_EXIT;
  1481. if (!layer_hint) {
  1482. or_circuit_t *or_circ = TO_OR_CIRCUIT(circ);
  1483. log_debug(domain,"considering circ->package_window %d",
  1484. circ->package_window);
  1485. if (circ->package_window <= 0) {
  1486. log_debug(domain,"yes, not-at-origin. stopped.");
  1487. for (conn = or_circ->n_streams; conn; conn=conn->next_stream)
  1488. connection_stop_reading(TO_CONN(conn));
  1489. return 1;
  1490. }
  1491. return 0;
  1492. }
  1493. /* else, layer hint is defined, use it */
  1494. log_debug(domain,"considering layer_hint->package_window %d",
  1495. layer_hint->package_window);
  1496. if (layer_hint->package_window <= 0) {
  1497. log_debug(domain,"yes, at-origin. stopped.");
  1498. for (conn = TO_ORIGIN_CIRCUIT(circ)->p_streams; conn;
  1499. conn=conn->next_stream)
  1500. if (conn->cpath_layer == layer_hint)
  1501. connection_stop_reading(TO_CONN(conn));
  1502. return 1;
  1503. }
  1504. return 0;
  1505. }
  1506. /** Check if the deliver_window for circuit <b>circ</b> (at hop
  1507. * <b>layer_hint</b> if it's defined) is low enough that we should
  1508. * send a circuit-level sendme back down the circuit. If so, send
  1509. * enough sendmes that the window would be overfull if we sent any
  1510. * more.
  1511. */
  1512. static void
  1513. circuit_consider_sending_sendme(circuit_t *circ, crypt_path_t *layer_hint)
  1514. {
  1515. // log_fn(LOG_INFO,"Considering: layer_hint is %s",
  1516. // layer_hint ? "defined" : "null");
  1517. while ((layer_hint ? layer_hint->deliver_window : circ->deliver_window) <=
  1518. CIRCWINDOW_START - CIRCWINDOW_INCREMENT) {
  1519. log_debug(LD_CIRC,"Queuing circuit sendme.");
  1520. if (layer_hint)
  1521. layer_hint->deliver_window += CIRCWINDOW_INCREMENT;
  1522. else
  1523. circ->deliver_window += CIRCWINDOW_INCREMENT;
  1524. if (relay_send_command_from_edge(0, circ, RELAY_COMMAND_SENDME,
  1525. NULL, 0, layer_hint) < 0) {
  1526. log_warn(LD_CIRC,
  1527. "relay_send_command_from_edge failed. Circuit's closed.");
  1528. return; /* the circuit's closed, don't continue */
  1529. }
  1530. }
  1531. }
  1532. #ifdef ACTIVE_CIRCUITS_PARANOIA
  1533. #define assert_active_circuits_ok_paranoid(conn) \
  1534. assert_active_circuits_ok(conn)
  1535. #else
  1536. #define assert_active_circuits_ok_paranoid(conn)
  1537. #endif
  1538. /** The total number of cells we have allocated from the memory pool. */
  1539. static int total_cells_allocated = 0;
  1540. /** A memory pool to allocate packed_cell_t objects. */
  1541. static mp_pool_t *cell_pool = NULL;
  1542. /** Memory pool to allocate insertion_time_elem_t objects used for cell
  1543. * statistics. */
  1544. static mp_pool_t *it_pool = NULL;
  1545. /** Allocate structures to hold cells. */
  1546. void
  1547. init_cell_pool(void)
  1548. {
  1549. tor_assert(!cell_pool);
  1550. cell_pool = mp_pool_new(sizeof(packed_cell_t), 128*1024);
  1551. }
  1552. /** Free all storage used to hold cells (and insertion times if we measure
  1553. * cell statistics). */
  1554. void
  1555. free_cell_pool(void)
  1556. {
  1557. /* Maybe we haven't called init_cell_pool yet; need to check for it. */
  1558. if (cell_pool) {
  1559. mp_pool_destroy(cell_pool);
  1560. cell_pool = NULL;
  1561. }
  1562. if (it_pool) {
  1563. mp_pool_destroy(it_pool);
  1564. it_pool = NULL;
  1565. }
  1566. }
  1567. /** Free excess storage in cell pool. */
  1568. void
  1569. clean_cell_pool(void)
  1570. {
  1571. tor_assert(cell_pool);
  1572. mp_pool_clean(cell_pool, 0, 1);
  1573. }
  1574. /** Release storage held by <b>cell</b>. */
  1575. static INLINE void
  1576. packed_cell_free_unchecked(packed_cell_t *cell)
  1577. {
  1578. --total_cells_allocated;
  1579. mp_pool_release(cell);
  1580. }
  1581. /** Allocate and return a new packed_cell_t. */
  1582. static INLINE packed_cell_t *
  1583. packed_cell_alloc(void)
  1584. {
  1585. ++total_cells_allocated;
  1586. return mp_pool_get(cell_pool);
  1587. }
  1588. /** Log current statistics for cell pool allocation at log level
  1589. * <b>severity</b>. */
  1590. void
  1591. dump_cell_pool_usage(int severity)
  1592. {
  1593. circuit_t *c;
  1594. int n_circs = 0;
  1595. int n_cells = 0;
  1596. for (c = _circuit_get_global_list(); c; c = c->next) {
  1597. n_cells += c->n_conn_cells.n;
  1598. if (!CIRCUIT_IS_ORIGIN(c))
  1599. n_cells += TO_OR_CIRCUIT(c)->p_conn_cells.n;
  1600. ++n_circs;
  1601. }
  1602. log(severity, LD_MM, "%d cells allocated on %d circuits. %d cells leaked.",
  1603. n_cells, n_circs, total_cells_allocated - n_cells);
  1604. mp_pool_log_status(cell_pool, severity);
  1605. }
  1606. /** Allocate a new copy of packed <b>cell</b>. */
  1607. static INLINE packed_cell_t *
  1608. packed_cell_copy(const cell_t *cell)
  1609. {
  1610. packed_cell_t *c = packed_cell_alloc();
  1611. cell_pack(c, cell);
  1612. c->next = NULL;
  1613. return c;
  1614. }
  1615. /** Append <b>cell</b> to the end of <b>queue</b>. */
  1616. void
  1617. cell_queue_append(cell_queue_t *queue, packed_cell_t *cell)
  1618. {
  1619. if (queue->tail) {
  1620. tor_assert(!queue->tail->next);
  1621. queue->tail->next = cell;
  1622. } else {
  1623. queue->head = cell;
  1624. }
  1625. queue->tail = cell;
  1626. cell->next = NULL;
  1627. ++queue->n;
  1628. }
  1629. /** Append a newly allocated copy of <b>cell</b> to the end of <b>queue</b> */
  1630. void
  1631. cell_queue_append_packed_copy(cell_queue_t *queue, const cell_t *cell)
  1632. {
  1633. packed_cell_t *copy = packed_cell_copy(cell);
  1634. /* Remember the time when this cell was put in the queue. */
  1635. if (get_options()->CellStatistics) {
  1636. struct timeval now;
  1637. uint32_t added;
  1638. insertion_time_queue_t *it_queue = queue->insertion_times;
  1639. if (!it_pool)
  1640. it_pool = mp_pool_new(sizeof(insertion_time_elem_t), 1024);
  1641. tor_gettimeofday_cached(&now);
  1642. #define SECONDS_IN_A_DAY 86400L
  1643. added = (uint32_t)(((now.tv_sec % SECONDS_IN_A_DAY) * 100L)
  1644. + ((uint32_t)now.tv_usec / (uint32_t)10000L));
  1645. if (!it_queue) {
  1646. it_queue = tor_malloc_zero(sizeof(insertion_time_queue_t));
  1647. queue->insertion_times = it_queue;
  1648. }
  1649. if (it_queue->last && it_queue->last->insertion_time == added) {
  1650. it_queue->last->counter++;
  1651. } else {
  1652. insertion_time_elem_t *elem = mp_pool_get(it_pool);
  1653. elem->next = NULL;
  1654. elem->insertion_time = added;
  1655. elem->counter = 1;
  1656. if (it_queue->last) {
  1657. it_queue->last->next = elem;
  1658. it_queue->last = elem;
  1659. } else {
  1660. it_queue->first = it_queue->last = elem;
  1661. }
  1662. }
  1663. }
  1664. cell_queue_append(queue, copy);
  1665. }
  1666. /** Remove and free every cell in <b>queue</b>. */
  1667. void
  1668. cell_queue_clear(cell_queue_t *queue)
  1669. {
  1670. packed_cell_t *cell, *next;
  1671. cell = queue->head;
  1672. while (cell) {
  1673. next = cell->next;
  1674. packed_cell_free_unchecked(cell);
  1675. cell = next;
  1676. }
  1677. queue->head = queue->tail = NULL;
  1678. queue->n = 0;
  1679. if (queue->insertion_times) {
  1680. while (queue->insertion_times->first) {
  1681. insertion_time_elem_t *elem = queue->insertion_times->first;
  1682. queue->insertion_times->first = elem->next;
  1683. mp_pool_release(elem);
  1684. }
  1685. tor_free(queue->insertion_times);
  1686. }
  1687. }
  1688. /** Extract and return the cell at the head of <b>queue</b>; return NULL if
  1689. * <b>queue</b> is empty. */
  1690. static INLINE packed_cell_t *
  1691. cell_queue_pop(cell_queue_t *queue)
  1692. {
  1693. packed_cell_t *cell = queue->head;
  1694. if (!cell)
  1695. return NULL;
  1696. queue->head = cell->next;
  1697. if (cell == queue->tail) {
  1698. tor_assert(!queue->head);
  1699. queue->tail = NULL;
  1700. }
  1701. --queue->n;
  1702. return cell;
  1703. }
  1704. /** Return a pointer to the "next_active_on_{n,p}_conn" pointer of <b>circ</b>,
  1705. * depending on whether <b>conn</b> matches n_conn or p_conn. */
  1706. static INLINE circuit_t **
  1707. next_circ_on_conn_p(circuit_t *circ, or_connection_t *conn)
  1708. {
  1709. tor_assert(circ);
  1710. tor_assert(conn);
  1711. if (conn == circ->n_conn) {
  1712. return &circ->next_active_on_n_conn;
  1713. } else {
  1714. or_circuit_t *orcirc = TO_OR_CIRCUIT(circ);
  1715. tor_assert(conn == orcirc->p_conn);
  1716. return &orcirc->next_active_on_p_conn;
  1717. }
  1718. }
  1719. /** Return a pointer to the "prev_active_on_{n,p}_conn" pointer of <b>circ</b>,
  1720. * depending on whether <b>conn</b> matches n_conn or p_conn. */
  1721. static INLINE circuit_t **
  1722. prev_circ_on_conn_p(circuit_t *circ, or_connection_t *conn)
  1723. {
  1724. tor_assert(circ);
  1725. tor_assert(conn);
  1726. if (conn == circ->n_conn) {
  1727. return &circ->prev_active_on_n_conn;
  1728. } else {
  1729. or_circuit_t *orcirc = TO_OR_CIRCUIT(circ);
  1730. tor_assert(conn == orcirc->p_conn);
  1731. return &orcirc->prev_active_on_p_conn;
  1732. }
  1733. }
  1734. /** Helper for sorting cell_ewma_t values in their priority queue. */
  1735. static int
  1736. compare_cell_ewma_counts(const void *p1, const void *p2)
  1737. {
  1738. const cell_ewma_t *e1=p1, *e2=p2;
  1739. if (e1->cell_count < e2->cell_count)
  1740. return -1;
  1741. else if (e1->cell_count > e2->cell_count)
  1742. return 1;
  1743. else
  1744. return 0;
  1745. }
  1746. /** Given a cell_ewma_t, return a pointer to the circuit containing it. */
  1747. static circuit_t *
  1748. cell_ewma_to_circuit(cell_ewma_t *ewma)
  1749. {
  1750. if (ewma->is_for_p_conn) {
  1751. /* This is an or_circuit_t's p_cell_ewma. */
  1752. or_circuit_t *orcirc = SUBTYPE_P(ewma, or_circuit_t, p_cell_ewma);
  1753. return TO_CIRCUIT(orcirc);
  1754. } else {
  1755. /* This is some circuit's n_cell_ewma. */
  1756. return SUBTYPE_P(ewma, circuit_t, n_cell_ewma);
  1757. }
  1758. }
  1759. /* ==== Functions for scaling cell_ewma_t ====
  1760. When choosing which cells to relay first, we favor circuits that have been
  1761. quiet recently. This gives better latency on connections that aren't
  1762. pushing lots of data, and makes the network feel more interactive.
  1763. Conceptually, we take an exponentially weighted mean average of the number
  1764. of cells a circuit has sent, and allow active circuits (those with cells to
  1765. relay) to send cells in reverse order of their exponentially-weighted mean
  1766. average (EWMA) cell count. [That is, a cell sent N seconds ago 'counts'
  1767. F^N times as much as a cell sent now, for 0<F<1.0, and we favor the
  1768. circuit that has sent the fewest cells]
  1769. If 'double' had infinite precision, we could do this simply by counting a
  1770. cell sent at startup as having weight 1.0, and a cell sent N seconds later
  1771. as having weight F^-N. This way, we would never need to re-scale
  1772. any already-sent cells.
  1773. To prevent double from overflowing, we could count a cell sent now as
  1774. having weight 1.0 and a cell sent N seconds ago as having weight F^N.
  1775. This, however, would mean we'd need to re-scale *ALL* old circuits every
  1776. time we wanted to send a cell.
  1777. So as a compromise, we divide time into 'ticks' (currently, 10-second
  1778. increments) and say that a cell sent at the start of a current tick is
  1779. worth 1.0, a cell sent N seconds before the start of the current tick is
  1780. worth F^N, and a cell sent N seconds after the start of the current tick is
  1781. worth F^-N. This way we don't overflow, and we don't need to constantly
  1782. rescale.
  1783. */
  1784. /** How long does a tick last (seconds)? */
  1785. #define EWMA_TICK_LEN 10
  1786. /** The default per-tick scale factor, if it hasn't been overridden by a
  1787. * consensus or a configuration setting. zero means "disabled". */
  1788. #define EWMA_DEFAULT_HALFLIFE 0.0
  1789. /** Given a timeval <b>now</b>, compute the cell_ewma tick in which it occurs
  1790. * and the fraction of the tick that has elapsed between the start of the tick
  1791. * and <b>now</b>. Return the former and store the latter in
  1792. * *<b>remainder_out</b>.
  1793. *
  1794. * These tick values are not meant to be shared between Tor instances, or used
  1795. * for other purposes. */
  1796. static unsigned
  1797. cell_ewma_tick_from_timeval(const struct timeval *now,
  1798. double *remainder_out)
  1799. {
  1800. unsigned res = (unsigned) (now->tv_sec / EWMA_TICK_LEN);
  1801. /* rem */
  1802. double rem = (now->tv_sec % EWMA_TICK_LEN) +
  1803. ((double)(now->tv_usec)) / 1.0e6;
  1804. *remainder_out = rem / EWMA_TICK_LEN;
  1805. return res;
  1806. }
  1807. /** Compute and return the current cell_ewma tick. */
  1808. unsigned
  1809. cell_ewma_get_tick(void)
  1810. {
  1811. return ((unsigned)approx_time() / EWMA_TICK_LEN);
  1812. }
  1813. /** The per-tick scale factor to be used when computing cell-count EWMA
  1814. * values. (A cell sent N ticks before the start of the current tick
  1815. * has value ewma_scale_factor ** N.)
  1816. */
  1817. static double ewma_scale_factor = 0.1;
  1818. static int ewma_enabled = 0;
  1819. #define EPSILON 0.00001
  1820. #define LOG_ONEHALF -0.69314718055994529
  1821. /** Adjust the global cell scale factor based on <b>options</b> */
  1822. void
  1823. cell_ewma_set_scale_factor(or_options_t *options, networkstatus_t *consensus)
  1824. {
  1825. int32_t halflife_ms;
  1826. double halflife;
  1827. const char *source;
  1828. if (options && options->CircuitPriorityHalflife >= -EPSILON) {
  1829. halflife = options->CircuitPriorityHalflife;
  1830. source = "CircuitPriorityHalflife in configuration";
  1831. } else if (consensus &&
  1832. (halflife_ms = networkstatus_get_param(
  1833. consensus, "CircuitPriorityHalflifeMsec", -1)) >= 0) {
  1834. halflife = ((double)halflife_ms)/1000.0;
  1835. source = "CircuitPriorityHalflifeMsec in consensus";
  1836. } else {
  1837. halflife = EWMA_DEFAULT_HALFLIFE;
  1838. source = "Default value";
  1839. }
  1840. if (halflife <= EPSILON) {
  1841. /* The cell EWMA algorithm is disabled. */
  1842. ewma_scale_factor = 0.1;
  1843. ewma_enabled = 0;
  1844. log_info(LD_OR,
  1845. "Disabled cell_ewma algorithm because of value in %s",
  1846. source);
  1847. } else {
  1848. /* convert halflife into halflife-per-tick. */
  1849. halflife /= EWMA_TICK_LEN;
  1850. /* compute per-tick scale factor. */
  1851. ewma_scale_factor = exp( LOG_ONEHALF / halflife );
  1852. ewma_enabled = 1;
  1853. log_info(LD_OR,
  1854. "Enabled cell_ewma algorithm because of value in %s; "
  1855. "scale factor is %lf per %d seconds",
  1856. source, ewma_scale_factor, EWMA_TICK_LEN);
  1857. }
  1858. }
  1859. /** Return the multiplier necessary to convert the value of a cell sent in
  1860. * 'from_tick' to one sent in 'to_tick'. */
  1861. static INLINE double
  1862. get_scale_factor(unsigned from_tick, unsigned to_tick)
  1863. {
  1864. /* This math can wrap around, but that's okay: unsigned overflow is
  1865. well-defined */
  1866. int diff = (int)(to_tick - from_tick);
  1867. return pow(ewma_scale_factor, diff);
  1868. }
  1869. /** Adjust the cell count of <b>ewma</b> so that it is scaled with respect to
  1870. * <b>cur_tick</b> */
  1871. static void
  1872. scale_single_cell_ewma(cell_ewma_t *ewma, unsigned cur_tick)
  1873. {
  1874. double factor = get_scale_factor(ewma->last_adjusted_tick, cur_tick);
  1875. ewma->cell_count *= factor;
  1876. ewma->last_adjusted_tick = cur_tick;
  1877. }
  1878. /** Adjust the cell count of every active circuit on <b>conn</b> so
  1879. * that they are scaled with respect to <b>cur_tick</b> */
  1880. static void
  1881. scale_active_circuits(or_connection_t *conn, unsigned cur_tick)
  1882. {
  1883. double factor = get_scale_factor(
  1884. conn->active_circuit_pqueue_last_recalibrated,
  1885. cur_tick);
  1886. /** Ordinarily it isn't okay to change the value of an element in a heap,
  1887. * but it's okay here, since we are preserving the order. */
  1888. SMARTLIST_FOREACH(conn->active_circuit_pqueue, cell_ewma_t *, e, {
  1889. tor_assert(e->last_adjusted_tick ==
  1890. conn->active_circuit_pqueue_last_recalibrated);
  1891. e->cell_count *= factor;
  1892. e->last_adjusted_tick = cur_tick;
  1893. });
  1894. conn->active_circuit_pqueue_last_recalibrated = cur_tick;
  1895. }
  1896. /** Rescale <b>ewma</b> to the same scale as <b>conn</b>, and add it to
  1897. * <b>conn</b>'s priority queue of active circuits */
  1898. static void
  1899. add_cell_ewma_to_conn(or_connection_t *conn, cell_ewma_t *ewma)
  1900. {
  1901. tor_assert(ewma->heap_index == -1);
  1902. scale_single_cell_ewma(ewma,
  1903. conn->active_circuit_pqueue_last_recalibrated);
  1904. smartlist_pqueue_add(conn->active_circuit_pqueue,
  1905. compare_cell_ewma_counts,
  1906. STRUCT_OFFSET(cell_ewma_t, heap_index),
  1907. ewma);
  1908. }
  1909. /** Remove <b>ewma</b> from <b>conn</b>'s priority queue of active circuits */
  1910. static void
  1911. remove_cell_ewma_from_conn(or_connection_t *conn, cell_ewma_t *ewma)
  1912. {
  1913. tor_assert(ewma->heap_index != -1);
  1914. smartlist_pqueue_remove(conn->active_circuit_pqueue,
  1915. compare_cell_ewma_counts,
  1916. STRUCT_OFFSET(cell_ewma_t, heap_index),
  1917. ewma);
  1918. }
  1919. /** Remove and return the first cell_ewma_t from conn's priority queue of
  1920. * active circuits. Requires that the priority queue is nonempty. */
  1921. static cell_ewma_t *
  1922. pop_first_cell_ewma_from_conn(or_connection_t *conn)
  1923. {
  1924. return smartlist_pqueue_pop(conn->active_circuit_pqueue,
  1925. compare_cell_ewma_counts,
  1926. STRUCT_OFFSET(cell_ewma_t, heap_index));
  1927. }
  1928. /** Add <b>circ</b> to the list of circuits with pending cells on
  1929. * <b>conn</b>. No effect if <b>circ</b> is already linked. */
  1930. void
  1931. make_circuit_active_on_conn(circuit_t *circ, or_connection_t *conn)
  1932. {
  1933. circuit_t **nextp = next_circ_on_conn_p(circ, conn);
  1934. circuit_t **prevp = prev_circ_on_conn_p(circ, conn);
  1935. if (*nextp && *prevp) {
  1936. /* Already active. */
  1937. return;
  1938. }
  1939. assert_active_circuits_ok_paranoid(conn);
  1940. if (! conn->active_circuits) {
  1941. conn->active_circuits = circ;
  1942. *prevp = *nextp = circ;
  1943. } else {
  1944. circuit_t *head = conn->active_circuits;
  1945. circuit_t *old_tail = *prev_circ_on_conn_p(head, conn);
  1946. *next_circ_on_conn_p(old_tail, conn) = circ;
  1947. *nextp = head;
  1948. *prev_circ_on_conn_p(head, conn) = circ;
  1949. *prevp = old_tail;
  1950. }
  1951. if (circ->n_conn == conn) {
  1952. add_cell_ewma_to_conn(conn, &circ->n_cell_ewma);
  1953. } else {
  1954. or_circuit_t *orcirc = TO_OR_CIRCUIT(circ);
  1955. tor_assert(conn == orcirc->p_conn);
  1956. add_cell_ewma_to_conn(conn, &orcirc->p_cell_ewma);
  1957. }
  1958. assert_active_circuits_ok_paranoid(conn);
  1959. }
  1960. /** Remove <b>circ</b> from the list of circuits with pending cells on
  1961. * <b>conn</b>. No effect if <b>circ</b> is already unlinked. */
  1962. void
  1963. make_circuit_inactive_on_conn(circuit_t *circ, or_connection_t *conn)
  1964. {
  1965. circuit_t **nextp = next_circ_on_conn_p(circ, conn);
  1966. circuit_t **prevp = prev_circ_on_conn_p(circ, conn);
  1967. circuit_t *next = *nextp, *prev = *prevp;
  1968. if (!next && !prev) {
  1969. /* Already inactive. */
  1970. return;
  1971. }
  1972. assert_active_circuits_ok_paranoid(conn);
  1973. tor_assert(next && prev);
  1974. tor_assert(*prev_circ_on_conn_p(next, conn) == circ);
  1975. tor_assert(*next_circ_on_conn_p(prev, conn) == circ);
  1976. if (next == circ) {
  1977. conn->active_circuits = NULL;
  1978. } else {
  1979. *prev_circ_on_conn_p(next, conn) = prev;
  1980. *next_circ_on_conn_p(prev, conn) = next;
  1981. if (conn->active_circuits == circ)
  1982. conn->active_circuits = next;
  1983. }
  1984. *prevp = *nextp = NULL;
  1985. if (circ->n_conn == conn) {
  1986. remove_cell_ewma_from_conn(conn, &circ->n_cell_ewma);
  1987. } else {
  1988. or_circuit_t *orcirc = TO_OR_CIRCUIT(circ);
  1989. tor_assert(conn == orcirc->p_conn);
  1990. remove_cell_ewma_from_conn(conn, &orcirc->p_cell_ewma);
  1991. }
  1992. assert_active_circuits_ok_paranoid(conn);
  1993. }
  1994. /** Remove all circuits from the list of circuits with pending cells on
  1995. * <b>conn</b>. */
  1996. void
  1997. connection_or_unlink_all_active_circs(or_connection_t *orconn)
  1998. {
  1999. circuit_t *head = orconn->active_circuits;
  2000. circuit_t *cur = head;
  2001. if (! head)
  2002. return;
  2003. do {
  2004. circuit_t *next = *next_circ_on_conn_p(cur, orconn);
  2005. *prev_circ_on_conn_p(cur, orconn) = NULL;
  2006. *next_circ_on_conn_p(cur, orconn) = NULL;
  2007. cur = next;
  2008. } while (cur != head);
  2009. orconn->active_circuits = NULL;
  2010. SMARTLIST_FOREACH(orconn->active_circuit_pqueue, cell_ewma_t *, e,
  2011. e->heap_index = -1);
  2012. smartlist_clear(orconn->active_circuit_pqueue);
  2013. }
  2014. /** Block (if <b>block</b> is true) or unblock (if <b>block</b> is false)
  2015. * every edge connection that is using <b>circ</b> to write to <b>orconn</b>,
  2016. * and start or stop reading as appropriate.
  2017. *
  2018. * If <b>stream_id</b> is nonzero, block only the edge connection whose
  2019. * stream_id matches it.
  2020. *
  2021. * Returns the number of streams whose status we changed.
  2022. */
  2023. static int
  2024. set_streams_blocked_on_circ(circuit_t *circ, or_connection_t *orconn,
  2025. int block, streamid_t stream_id)
  2026. {
  2027. edge_connection_t *edge = NULL;
  2028. int n = 0;
  2029. if (circ->n_conn == orconn) {
  2030. circ->streams_blocked_on_n_conn = block;
  2031. if (CIRCUIT_IS_ORIGIN(circ))
  2032. edge = TO_ORIGIN_CIRCUIT(circ)->p_streams;
  2033. } else {
  2034. circ->streams_blocked_on_p_conn = block;
  2035. tor_assert(!CIRCUIT_IS_ORIGIN(circ));
  2036. edge = TO_OR_CIRCUIT(circ)->n_streams;
  2037. }
  2038. for (; edge; edge = edge->next_stream) {
  2039. connection_t *conn = TO_CONN(edge);
  2040. if (stream_id && edge->stream_id != stream_id)
  2041. continue;
  2042. if (edge->edge_blocked_on_circ != block) {
  2043. ++n;
  2044. edge->edge_blocked_on_circ = block;
  2045. }
  2046. if (!conn->read_event) {
  2047. /* This connection is a placeholder for something; probably a DNS
  2048. * request. It can't actually stop or start reading.*/
  2049. continue;
  2050. }
  2051. if (block) {
  2052. if (connection_is_reading(conn))
  2053. connection_stop_reading(conn);
  2054. } else {
  2055. /* Is this right? */
  2056. if (!connection_is_reading(conn))
  2057. connection_start_reading(conn);
  2058. }
  2059. }
  2060. return n;
  2061. }
  2062. /** Pull as many cells as possible (but no more than <b>max</b>) from the
  2063. * queue of the first active circuit on <b>conn</b>, and write them to
  2064. * <b>conn</b>-&gt;outbuf. Return the number of cells written. Advance
  2065. * the active circuit pointer to the next active circuit in the ring. */
  2066. int
  2067. connection_or_flush_from_first_active_circuit(or_connection_t *conn, int max,
  2068. time_t now)
  2069. {
  2070. int n_flushed;
  2071. cell_queue_t *queue;
  2072. circuit_t *circ;
  2073. int streams_blocked;
  2074. /* The current (hi-res) time */
  2075. struct timeval now_hires;
  2076. /* The EWMA cell counter for the circuit we're flushing. */
  2077. cell_ewma_t *cell_ewma = NULL;
  2078. double ewma_increment = -1;
  2079. circ = conn->active_circuits;
  2080. if (!circ) return 0;
  2081. assert_active_circuits_ok_paranoid(conn);
  2082. /* See if we're doing the ewma circuit selection algorithm. */
  2083. if (ewma_enabled) {
  2084. unsigned tick;
  2085. double fractional_tick;
  2086. tor_gettimeofday_cached(&now_hires);
  2087. tick = cell_ewma_tick_from_timeval(&now_hires, &fractional_tick);
  2088. if (tick != conn->active_circuit_pqueue_last_recalibrated) {
  2089. scale_active_circuits(conn, tick);
  2090. }
  2091. ewma_increment = pow(ewma_scale_factor, -fractional_tick);
  2092. cell_ewma = smartlist_get(conn->active_circuit_pqueue, 0);
  2093. circ = cell_ewma_to_circuit(cell_ewma);
  2094. }
  2095. if (circ->n_conn == conn) {
  2096. queue = &circ->n_conn_cells;
  2097. streams_blocked = circ->streams_blocked_on_n_conn;
  2098. } else {
  2099. queue = &TO_OR_CIRCUIT(circ)->p_conn_cells;
  2100. streams_blocked = circ->streams_blocked_on_p_conn;
  2101. }
  2102. tor_assert(*next_circ_on_conn_p(circ,conn));
  2103. for (n_flushed = 0; n_flushed < max && queue->head; ) {
  2104. packed_cell_t *cell = cell_queue_pop(queue);
  2105. tor_assert(*next_circ_on_conn_p(circ,conn));
  2106. /* Calculate the exact time that this cell has spent in the queue. */
  2107. if (get_options()->CellStatistics && !CIRCUIT_IS_ORIGIN(circ)) {
  2108. struct timeval now;
  2109. uint32_t flushed;
  2110. uint32_t cell_waiting_time;
  2111. insertion_time_queue_t *it_queue = queue->insertion_times;
  2112. tor_gettimeofday_cached(&now);
  2113. flushed = (uint32_t)((now.tv_sec % SECONDS_IN_A_DAY) * 100L +
  2114. (uint32_t)now.tv_usec / (uint32_t)10000L);
  2115. if (!it_queue || !it_queue->first) {
  2116. log_info(LD_GENERAL, "Cannot determine insertion time of cell. "
  2117. "Looks like the CellStatistics option was "
  2118. "recently enabled.");
  2119. } else {
  2120. or_circuit_t *orcirc = TO_OR_CIRCUIT(circ);
  2121. insertion_time_elem_t *elem = it_queue->first;
  2122. cell_waiting_time =
  2123. (uint32_t)((flushed * 10L + SECONDS_IN_A_DAY * 1000L -
  2124. elem->insertion_time * 10L) %
  2125. (SECONDS_IN_A_DAY * 1000L));
  2126. #undef SECONDS_IN_A_DAY
  2127. elem->counter--;
  2128. if (elem->counter < 1) {
  2129. it_queue->first = elem->next;
  2130. if (elem == it_queue->last)
  2131. it_queue->last = NULL;
  2132. mp_pool_release(elem);
  2133. }
  2134. orcirc->total_cell_waiting_time += cell_waiting_time;
  2135. orcirc->processed_cells++;
  2136. }
  2137. }
  2138. /* If we just flushed our queue and this circuit is used for a
  2139. * tunneled directory request, possibly advance its state. */
  2140. if (queue->n == 0 && TO_CONN(conn)->dirreq_id)
  2141. geoip_change_dirreq_state(TO_CONN(conn)->dirreq_id,
  2142. DIRREQ_TUNNELED,
  2143. DIRREQ_CIRC_QUEUE_FLUSHED);
  2144. connection_write_to_buf(cell->body, CELL_NETWORK_SIZE, TO_CONN(conn));
  2145. packed_cell_free_unchecked(cell);
  2146. ++n_flushed;
  2147. if (cell_ewma) {
  2148. cell_ewma_t *tmp;
  2149. cell_ewma->cell_count += ewma_increment;
  2150. /* We pop and re-add the cell_ewma_t here, not above, since we need to
  2151. * re-add it immediately to keep the priority queue consistent with
  2152. * the linked-list implementation */
  2153. tmp = pop_first_cell_ewma_from_conn(conn);
  2154. tor_assert(tmp == cell_ewma);
  2155. add_cell_ewma_to_conn(conn, cell_ewma);
  2156. }
  2157. if (circ != conn->active_circuits) {
  2158. /* If this happens, the current circuit just got made inactive by
  2159. * a call in connection_write_to_buf(). That's nothing to worry about:
  2160. * circuit_make_inactive_on_conn() already advanced conn->active_circuits
  2161. * for us.
  2162. */
  2163. assert_active_circuits_ok_paranoid(conn);
  2164. goto done;
  2165. }
  2166. }
  2167. tor_assert(*next_circ_on_conn_p(circ,conn));
  2168. assert_active_circuits_ok_paranoid(conn);
  2169. conn->active_circuits = *next_circ_on_conn_p(circ, conn);
  2170. /* Is the cell queue low enough to unblock all the streams that are waiting
  2171. * to write to this circuit? */
  2172. if (streams_blocked && queue->n <= CELL_QUEUE_LOWWATER_SIZE)
  2173. set_streams_blocked_on_circ(circ, conn, 0, 0); /* unblock streams */
  2174. /* Did we just run out of cells on this circuit's queue? */
  2175. if (queue->n == 0) {
  2176. log_debug(LD_GENERAL, "Made a circuit inactive.");
  2177. make_circuit_inactive_on_conn(circ, conn);
  2178. }
  2179. done:
  2180. if (n_flushed)
  2181. conn->timestamp_last_added_nonpadding = now;
  2182. return n_flushed;
  2183. }
  2184. /** Add <b>cell</b> to the queue of <b>circ</b> writing to <b>orconn</b>
  2185. * transmitting in <b>direction</b>. */
  2186. void
  2187. append_cell_to_circuit_queue(circuit_t *circ, or_connection_t *orconn,
  2188. cell_t *cell, cell_direction_t direction,
  2189. streamid_t fromstream)
  2190. {
  2191. cell_queue_t *queue;
  2192. int streams_blocked;
  2193. if (circ->marked_for_close)
  2194. return;
  2195. if (direction == CELL_DIRECTION_OUT) {
  2196. queue = &circ->n_conn_cells;
  2197. streams_blocked = circ->streams_blocked_on_n_conn;
  2198. } else {
  2199. or_circuit_t *orcirc = TO_OR_CIRCUIT(circ);
  2200. queue = &orcirc->p_conn_cells;
  2201. streams_blocked = circ->streams_blocked_on_p_conn;
  2202. }
  2203. if (cell->command == CELL_RELAY_EARLY && orconn->link_proto < 2) {
  2204. /* V1 connections don't understand RELAY_EARLY. */
  2205. cell->command = CELL_RELAY;
  2206. }
  2207. cell_queue_append_packed_copy(queue, cell);
  2208. /* If we have too many cells on the circuit, we should stop reading from
  2209. * the edge streams for a while. */
  2210. if (!streams_blocked && queue->n >= CELL_QUEUE_HIGHWATER_SIZE)
  2211. set_streams_blocked_on_circ(circ, orconn, 1, 0); /* block streams */
  2212. if (streams_blocked && fromstream) {
  2213. /* This edge connection is apparently not blocked; block it. */
  2214. set_streams_blocked_on_circ(circ, orconn, 1, fromstream);
  2215. }
  2216. if (queue->n == 1) {
  2217. /* This was the first cell added to the queue. We need to make this
  2218. * circuit active. */
  2219. log_debug(LD_GENERAL, "Made a circuit active.");
  2220. make_circuit_active_on_conn(circ, orconn);
  2221. }
  2222. if (! buf_datalen(orconn->_base.outbuf)) {
  2223. /* There is no data at all waiting to be sent on the outbuf. Add a
  2224. * cell, so that we can notice when it gets flushed, flushed_some can
  2225. * get called, and we can start putting more data onto the buffer then.
  2226. */
  2227. log_debug(LD_GENERAL, "Primed a buffer.");
  2228. connection_or_flush_from_first_active_circuit(orconn, 1, approx_time());
  2229. }
  2230. }
  2231. /** Append an encoded value of <b>addr</b> to <b>payload_out</b>, which must
  2232. * have at least 18 bytes of free space. The encoding is, as specified in
  2233. * tor-spec.txt:
  2234. * RESOLVED_TYPE_IPV4 or RESOLVED_TYPE_IPV6 [1 byte]
  2235. * LENGTH [1 byte]
  2236. * ADDRESS [length bytes]
  2237. * Return the number of bytes added, or -1 on error */
  2238. int
  2239. append_address_to_payload(uint8_t *payload_out, const tor_addr_t *addr)
  2240. {
  2241. uint32_t a;
  2242. switch (tor_addr_family(addr)) {
  2243. case AF_INET:
  2244. payload_out[0] = RESOLVED_TYPE_IPV4;
  2245. payload_out[1] = 4;
  2246. a = tor_addr_to_ipv4n(addr);
  2247. memcpy(payload_out+2, &a, 4);
  2248. return 6;
  2249. case AF_INET6:
  2250. payload_out[0] = RESOLVED_TYPE_IPV6;
  2251. payload_out[1] = 16;
  2252. memcpy(payload_out+2, tor_addr_to_in6_addr8(addr), 16);
  2253. return 18;
  2254. case AF_UNSPEC:
  2255. default:
  2256. return -1;
  2257. }
  2258. }
  2259. /** Given <b>payload_len</b> bytes at <b>payload</b>, starting with an address
  2260. * encoded as by append_address_to_payload(), try to decode the address into
  2261. * *<b>addr_out</b>. Return the next byte in the payload after the address on
  2262. * success, or NULL on failure. */
  2263. const uint8_t *
  2264. decode_address_from_payload(tor_addr_t *addr_out, const uint8_t *payload,
  2265. int payload_len)
  2266. {
  2267. if (payload_len < 2)
  2268. return NULL;
  2269. if (payload_len < 2+payload[1])
  2270. return NULL;
  2271. switch (payload[0]) {
  2272. case RESOLVED_TYPE_IPV4:
  2273. if (payload[1] != 4)
  2274. return NULL;
  2275. tor_addr_from_ipv4n(addr_out, get_uint32(payload+2));
  2276. break;
  2277. case RESOLVED_TYPE_IPV6:
  2278. if (payload[1] != 16)
  2279. return NULL;
  2280. tor_addr_from_ipv6_bytes(addr_out, (char*)(payload+2));
  2281. break;
  2282. default:
  2283. tor_addr_make_unspec(addr_out);
  2284. break;
  2285. }
  2286. return payload + 2 + payload[1];
  2287. }
  2288. /** Remove all the cells queued on <b>circ</b> for <b>orconn</b>. */
  2289. void
  2290. circuit_clear_cell_queue(circuit_t *circ, or_connection_t *orconn)
  2291. {
  2292. cell_queue_t *queue;
  2293. if (circ->n_conn == orconn) {
  2294. queue = &circ->n_conn_cells;
  2295. } else {
  2296. or_circuit_t *orcirc = TO_OR_CIRCUIT(circ);
  2297. tor_assert(orcirc->p_conn == orconn);
  2298. queue = &orcirc->p_conn_cells;
  2299. }
  2300. if (queue->n)
  2301. make_circuit_inactive_on_conn(circ,orconn);
  2302. cell_queue_clear(queue);
  2303. }
  2304. /** Fail with an assert if the active circuits ring on <b>orconn</b> is
  2305. * corrupt. */
  2306. void
  2307. assert_active_circuits_ok(or_connection_t *orconn)
  2308. {
  2309. circuit_t *head = orconn->active_circuits;
  2310. circuit_t *cur = head;
  2311. int n = 0;
  2312. if (! head)
  2313. return;
  2314. do {
  2315. circuit_t *next = *next_circ_on_conn_p(cur, orconn);
  2316. circuit_t *prev = *prev_circ_on_conn_p(cur, orconn);
  2317. cell_ewma_t *ewma;
  2318. tor_assert(next);
  2319. tor_assert(prev);
  2320. tor_assert(*next_circ_on_conn_p(prev, orconn) == cur);
  2321. tor_assert(*prev_circ_on_conn_p(next, orconn) == cur);
  2322. if (orconn == cur->n_conn) {
  2323. ewma = &cur->n_cell_ewma;
  2324. tor_assert(!ewma->is_for_p_conn);
  2325. } else {
  2326. ewma = &TO_OR_CIRCUIT(cur)->p_cell_ewma;
  2327. tor_assert(ewma->is_for_p_conn);
  2328. }
  2329. tor_assert(ewma->heap_index != -1);
  2330. tor_assert(ewma == smartlist_get(orconn->active_circuit_pqueue,
  2331. ewma->heap_index));
  2332. n++;
  2333. cur = next;
  2334. } while (cur != head);
  2335. tor_assert(n == smartlist_len(orconn->active_circuit_pqueue));
  2336. }
  2337. /** Return 1 if we shouldn't restart reading on this circuit, even if
  2338. * we get a SENDME. Else return 0.
  2339. */
  2340. static int
  2341. circuit_queue_streams_are_blocked(circuit_t *circ)
  2342. {
  2343. if (CIRCUIT_IS_ORIGIN(circ)) {
  2344. return circ->streams_blocked_on_n_conn;
  2345. } else {
  2346. return circ->streams_blocked_on_p_conn;
  2347. }
  2348. }