test_relay.py 15 KB


  1. #!/usr/bin/python3
  2. #
  3. import throughput_protocols
  4. import basic_protocols
  5. import useful
  6. import time
  7. import os
  8. import argparse
  9. import logging
  10. import socket
  11. import random
  12. import multiprocessing
  13. import stem.control
  14. import stem.descriptor.remote
  15. import stem.process
  16. import base64
  17. import binascii
  18. #
  19. logging.getLogger('stem').setLevel(logging.WARNING)
  20. #
  21. def start_client_process(protocol, id_num, finished_queue):
  22. p = multiprocessing.Process(target=run_client, args=(protocol, id_num, finished_queue))
  23. p.start()
  24. return p
  25. #
  26. def run_client(protocol, id_num, finished_queue):
  27. had_error = False
  28. try:
  29. logging.info('Starting protocol (id: {})'.format(id_num))
  30. protocol.run()
  31. logging.info('Done protocol (id: {})'.format(id_num))
  32. except:
  33. had_error = True
  34. logging.warning('Protocol error')
  35. logging.exception('Protocol id: {} had an error'.format(id_num))
  36. finally:
  37. finished_queue.put((id_num, had_error))
  38. #
  39. #
  40. def parse_range(range_str):
  41. return tuple(int(x) for x in range_str.split('-'))
  42. #
  43. def get_socks_port(control_port):
  44. with stem.control.Controller.from_port(port=control_port) as controller:
  45. controller.authenticate()
  46. #
  47. socks_addresses = controller.get_listeners(stem.control.Listener.SOCKS)
  48. #logging.info(socks_addresses)
  49. assert(len(socks_addresses) == 1)
  50. assert(socks_addresses[0][0] == '127.0.0.1')
  51. #
  52. return socks_addresses[0][1]
  53. #
  54. #
  55. def send_measureme(controller, circuit_id, measureme_id, hop):
  56. response = controller.msg('SENDMEASUREME %s ID=%s HOP=%s' % (circuit_id, measureme_id, hop))
  57. stem.response.convert('SINGLELINE', response)
  58. #
  59. if not response.is_ok():
  60. if response.code in ('512', '552'):
  61. if response.message.startswith('Unknown circuit '):
  62. raise stem.InvalidArguments(response.code, response.message, [circuit_id])
  63. #
  64. raise stem.InvalidRequest(response.code, response.message)
  65. else:
  66. raise stem.ProtocolError('MEASUREME returned unexpected response code: %s' % response.code)
  67. #
  68. #
  69. #
  70. def connect_and_send_measureme(control_port, circuit_id, measureme_id, hop):
  71. with stem.control.Controller.from_port(port=control_port) as controller:
  72. controller.authenticate()
  73. send_measureme(controller, circuit_id, measureme_id, hop)
  74. #
  75. #
  76. def send_all_measuremes(controller, circuit_id, measureme_id):
  77. send_measureme(controller, circuit_id, measureme_id, 2)
  78. send_measureme(controller, circuit_id, measureme_id, 0)
  79. #
  80. def push_start_measureme_cb(control_port, circuit_id, measureme_id, wait_until, hops):
  81. logging.info('Sending measuremes to control port {}, then sleeping'.format(control_port))
  82. with stem.control.Controller.from_port(port=control_port) as controller:
  83. controller.authenticate()
  84. for hop in hops:
  85. send_measureme(controller, circuit_id, measureme_id, hop)
  86. #
  87. #
  88. time.sleep(wait_until-time.time())
  89. #
  90. if __name__ == '__main__':
  91. logging.basicConfig(level=logging.DEBUG)
  92. #
  93. parser = argparse.ArgumentParser(description='Test the network throughput (optionally through a proxy).')
  94. parser.add_argument('ip', type=str, help='destination ip address')
  95. parser.add_argument('port', type=int, help='destination port')
  96. parser.add_argument('num_bytes', type=useful.parse_bytes,
  97. help='number of bytes to send per connection (can also end with \'B\', \'KiB\', \'MiB\', or \'GiB\')', metavar='num-bytes')
  98. parser.add_argument('num_clients', type=int, help='number of Tor clients to start', metavar='num-clients')
  99. parser.add_argument('num_streams_per_client', type=int, help='number of streams per Tor client', metavar='num-streams-per-client')
  100. #parser.add_argument('--wait', type=int,
  101. # help='wait until the given time before pushing data (time in seconds since epoch)', metavar='time')
  102. parser.add_argument('--buffer-len', type=useful.parse_bytes,
  103. help='size of the send and receive buffers (can also end with \'B\', \'KiB\', \'MiB\', or \'GiB\')', metavar='bytes')
  104. parser.add_argument('--no-accel', action='store_true', help='don\'t use C acceleration (use pure Python)')
  105. parser.add_argument('--wait-range', type=int, default=0,
  106. help='add a random wait time to each connection so that they don\'t all start at the same time (default is 0)', metavar='time')
  107. parser.add_argument('--proxy-control-port-range', type=parse_range, help='range of ports for the control ports')
  108. parser.add_argument('--measureme', action='store_true', help='send measureme cells to the exit')
  109. args = parser.parse_args()
  110. #
  111. endpoint = (args.ip, args.port)
  112. #
  113. streams_per_client = args.num_streams_per_client
  114. #
  115. if args.num_clients > 0:
  116. num_clients = args.num_clients
  117. socks_port_start = 9551
  118. control_port_start = 12001
  119. target_relay = 'BCEDF6C193AA687AE471B8A22EBF6BC57C2D285E' # gurgle
  120. other_relays = list(set(['BC630CBBB518BE7E9F4E09712AB0269E9DC7D626', '18CFB7BA07F13AEABF50A7148786DA68773B2498', 'B771AA877687F88E6F1CA5354756DF6C8A7B6B24', '204DFD2A2C6A0DC1FA0EACB495218E0B661704FD', 'F6740DEABFD5F62612FA025A5079EA72846B1F67', 'A21D24309FD17F57395CDB0B4A3B813AE73FBC5A', 'B204DE75B37064EF6A4C6BAF955C5724578D0B32', '874D84382C892F3F61CC9E106BF08843DE0B865A', '25990FC54D7268C914170A118EE4EE75025451DA', 'B872BA6804C8C6E140AE1897B44CF32B42FD2397', 'B143D439B72D239A419F8DCE07B8A8EB1B486FA7', 'DA4B488C2826DFBBD04D635DA1E71A2BA5B20747', 'D80EA21626BFAE8044E4037FE765252E157E3586']))
  121. # bad relays = ['3F62F05E859D7F98B086F702A31F7714D566E49A', '8EE0534532EA31AA5172B1892F53B2F25C76EB02', '7DC52AE6667A30536BA2383CD102CFC24F20AD71']
  122. # no longer exist = ['DBAD17D706E2B6D5D917C2077961750513BDF879']
  123. #
  124. logging.info('Getting consensus')
  125. #
  126. try:
  127. consensus = stem.descriptor.remote.get_consensus()
  128. #relay_fingerprints = [desc.fingerprint for desc in consensus]
  129. logging.info([desc.exit_policy.can_exit_to(*endpoint) for desc in consensus if desc.fingerprint==target_relay])
  130. except Exception as e:
  131. raise Exception('Unable to retrieve the consensus') from e
  132. #
  133. logging.info('Starting tor processes')
  134. #
  135. clients = []
  136. #
  137. try:
  138. for client_index in range(num_clients):
  139. # start a tor client
  140. #
  141. socks_port = socks_port_start+client_index
  142. control_port = control_port_start+client_index
  143. #
  144. tor_process = stem.process.launch_tor_with_config(
  145. config = {
  146. 'Log': ['notice file /tmp/tor{}/log'.format(client_index), 'notice stdout'],
  147. 'TruncateLogFile': '1',
  148. 'MeasuremeLogFile': '/ramdisk/sengler/real/measureme-{}.log'.format(client_index),
  149. 'SafeLogging': 'relay',
  150. 'SocksPort': str(socks_port),
  151. 'ControlPort': str(control_port),
  152. 'DataDirectory': '/tmp/tor{}'.format(client_index),
  153. }
  154. )
  155. clients.append({'socks_port':socks_port, 'control_port':control_port, 'process':tor_process})
  156. logging.info('Started '+str(client_index))
  157. #
  158. except:
  159. for c in clients:
  160. if 'process' in c:
  161. c['process'].kill()
  162. #
  163. #
  164. raise
  165. #
  166. else:
  167. proxy_control_ports = list(range(args.proxy_control_port_range[0], args.proxy_control_port_range[1]+1))
  168. socks_ports = [get_socks_port(x) for x in proxy_control_ports]
  169. #
  170. logging.info('Getting consensus')
  171. #
  172. try:
  173. consensus = stem.descriptor.remote.get_consensus(endpoints=(stem.DirPort('127.0.0.1', 7000),))
  174. #
  175. relay_fingerprints = [desc.fingerprint for desc in consensus]
  176. exit_fingerprints = [desc.fingerprint for desc in consensus if desc.exit_policy.can_exit_to(*endpoint)]
  177. except Exception as e:
  178. raise Exception('Unable to retrieve the consensus') from e
  179. #
  180. print('Num socks ports: {}'.format(len(socks_ports)))
  181. print('Num relays: {}'.format(len(relay_fingerprints)))
  182. print('Num exits: {}'.format(len(exit_fingerprints)))
  183. #
  184. assert(len(relay_fingerprints) >= 2)
  185. assert(len(exit_fingerprints) == 1)
  186. #
  187. target_relay = exit_fingerprints[0]
  188. other_relays = list(set(relay_fingerprints)-set(exit_fingerprints))
  189. #
  190. clients = []
  191. #
  192. for client_index in range(len(proxy_control_ports)):
  193. socks_port = socks_ports[client_index]
  194. control_port = proxy_control_ports[client_index]
  195. #
  196. clients.append({'socks_port':socks_port, 'control_port':control_port})
  197. #
  198. #
  199. ######################################3
  200. #
  201. controllers = []
  202. #
  203. for client_index in range(len(clients)):
  204. # make connections to client control ports
  205. #
  206. connection = stem.control.Controller.from_port(port=clients[client_index]['control_port'])
  207. connection.authenticate()
  208. #
  209. controllers.append({'connection':connection, 'id':client_index})
  210. #
  211. all_circuits_okay = True
  212. #
  213. for controller in controllers:
  214. # for each client, override the circuits for new streams
  215. #
  216. controller['circuits_remaining'] = []
  217. controller['circuit_ids'] = []
  218. controller['circuit_verbose'] = []
  219. #
  220. logging.info('Setting up controller id={}'.format(controller['id']))
  221. #
  222. for y in range(streams_per_client):
  223. circuit_id = None
  224. #
  225. while circuit_id is None:
  226. #circuit = [random.choice(relay_fingerprints), target_relay]
  227. first_relay = random.choice(other_relays)
  228. exit_relay = target_relay
  229. #circuit = [target_relay, exit_relay]
  230. circuit = [first_relay, exit_relay]
  231. #
  232. #if [desc.exit_policy.can_exit_to(*endpoint) for desc in consensus if desc.fingerprint==exit_relay][0] is False:
  233. # logging.info('Relay {} can\'t exit!'.format(exit_relay))
  234. # all_circuits_okay = False
  235. #
  236. try:
  237. circuit_id = controller['connection'].new_circuit(circuit, await_build=True)
  238. logging.info('New circuit (id={}): {}'.format(circuit_id, circuit))
  239. except stem.CircuitExtensionFailed:
  240. logging.info('Failed circuit: {}'.format(circuit))
  241. logging.warning('Circuit creation failed. Retrying...')
  242. #
  243. #
  244. #try:
  245. # circuit_id = controller['connection'].new_circuit(circuit, await_build=True)
  246. #except stem.CircuitExtensionFailed:
  247. # for c in controllers:
  248. # c['connection'].close()
  249. # #
  250. # for c in clients:
  251. # c['process'].kill()
  252. # #
  253. # raise
  254. #
  255. controller['circuits_remaining'].append(circuit_id)
  256. controller['circuit_ids'].append(circuit_id)
  257. controller['circuit_verbose'].append(circuit)
  258. time.sleep(0.5)#1.5
  259. #
  260. def attach_stream(stream, controller):
  261. try:
  262. if stream.status == 'NEW':
  263. # by default, let tor handle new streams
  264. circuit_id = 0
  265. #
  266. if stream.purpose == 'USER':
  267. # this is probably one of our streams (although not guaranteed)
  268. circuit_id = controller['circuits_remaining'][0]
  269. controller['circuits_remaining'] = controller['circuits_remaining'][1:]
  270. #
  271. try:
  272. controller['connection'].attach_stream(stream.id, circuit_id)
  273. #logging.info('Attaching to circuit {}'.format(circuit_id))
  274. except stem.UnsatisfiableRequest:
  275. if stream.purpose != 'USER':
  276. # could not attach a non-user stream, so probably raised:
  277. # stem.UnsatisfiableRequest: Connection is not managed by controller.
  278. # therefore we should ignore this exception
  279. pass
  280. else:
  281. raise
  282. #
  283. #
  284. #
  285. except:
  286. logging.exception('Error while attaching the stream (controller_id={}, circuit_id={}).'.format(controller['id'], circuit_id))
  287. raise
  288. #
  289. #
  290. controller['connection'].add_event_listener(lambda x, controller=controller: attach_stream(x, controller),
  291. stem.control.EventType.STREAM)
  292. controller['connection'].set_conf('__LeaveStreamsUnattached', '1')
  293. #
  294. '''
  295. if not all_circuits_okay:
  296. for c in clients:
  297. if 'process' in c:
  298. c['process'].kill()
  299. #
  300. #
  301. raise Exception('Not all circuits can exit! Stopping...')
  302. #
  303. '''
  304. processes = {}
  305. circuits = {}
  306. process_counter = 0
  307. finished_processes = multiprocessing.Queue()
  308. #
  309. logging.info('Starting protocols')
  310. #
  311. wait_time = int(time.time()+30)
  312. #
  313. for stream_index in range(streams_per_client):
  314. for client_index in range(len(clients)):
  315. client_socket = socket.socket()
  316. protocols = []
  317. #
  318. proxy_username = bytes([z for z in os.urandom(12) if z != 0])
  319. proxy_endpoint = ('127.0.0.1', clients[client_index]['socks_port'])
  320. #
  321. logging.debug('Socket %d connecting to proxy %r...', client_socket.fileno(), proxy_endpoint)
  322. client_socket.connect(proxy_endpoint)
  323. logging.debug('Socket %d connected', client_socket.fileno())
  324. #
  325. proxy_protocol = basic_protocols.Socks4Protocol(client_socket, endpoint, username=proxy_username)
  326. protocols.append(proxy_protocol)
  327. #
  328. wait_until = wait_time+random.randint(0, args.wait_range)
  329. #
  330. if args.measureme:
  331. control_port = clients[client_index]['control_port']
  332. controller = controllers[client_index]['connection']
  333. circuit_id = controllers[client_index]['circuit_ids'][stream_index]
  334. measureme_id = stream_index*streams_per_client + client_index + 1 # this is not calculated correctly, leaving as-is for now
  335. raise Exception('The above line is broken (measureme_id not set properly)!!!')
  336. #print('Data: {}, {}'.format(circuit_id, measureme_id))
  337. #measureme_cb = lambda control_port=control_port, circuit_id=circuit_id, measureme_id=measureme_id: connect_and_send_measureme(control_port, circuit_id, measureme_id, 2) <---- need to change this to also send to hop 0
  338. #measureme_cb = lambda controller=controller, circuit_id=circuit_id, measureme_id=measureme_id: send_all_measuremes(controller, circuit_id, measureme_id)
  339. #measureme_cb()
  340. if args.num_clients==0:
  341. # using Chutney
  342. hops = [2, 1, 0]
  343. else:
  344. hops = [0]
  345. #
  346. start_cb = lambda control_port=control_port, circuit_id=circuit_id, measureme_id=measureme_id, \
  347. until=wait_until, hops=hops: \
  348. push_start_measureme_cb(control_port, circuit_id, measureme_id, until, hops)
  349. circuit_bytes = (controllers[client_index]['circuit_verbose'][stream_index][0]+':'+str(measureme_id)).encode('utf-8')
  350. else:
  351. start_cb = lambda until=wait_until: time.sleep(until-time.time())
  352. circuit_bytes = (controllers[client_index]['circuit_verbose'][stream_index][0]).encode('utf-8')
  353. #
  354. throughput_protocol = throughput_protocols.ClientProtocol(client_socket, args.num_bytes,
  355. # wait_until=wait_time+random.randint(0, args.wait_range),
  356. group_id=wait_time,
  357. custom_data=circuit_bytes,
  358. send_buffer_len=args.buffer_len,
  359. use_acceleration=(not args.no_accel),
  360. push_start_cb=start_cb)
  361. protocols.append(throughput_protocol)
  362. #
  363. combined_protocol = basic_protocols.ChainedProtocol(protocols)
  364. processes[process_counter] = start_client_process(combined_protocol, process_counter, finished_processes)
  365. circuits[process_counter] = controllers[client_index]['circuit_verbose'][stream_index]
  366. process_counter += 1
  367. client_socket.close()
  368. #
  369. time.sleep(0.01)
  370. #
  371. #
  372. if wait_time is not None:
  373. logging.info('Starting in {:.2f} seconds'.format(wait_time-time.time()))
  374. #
  375. try:
  376. while len(processes) > 0:
  377. logging.info('Waiting for processes ({} left)'.format(len(processes)))
  378. (p_id, error) = finished_processes.get()
  379. p = processes[p_id]
  380. p.join()
  381. processes.pop(p_id)
  382. if error:
  383. logging.info('Circuit with error: '+str(circuits[p_id]))
  384. #
  385. circuits.pop(p_id)
  386. #
  387. except KeyboardInterrupt as e:
  388. print()
  389. for p_id in processes:
  390. processes[p_id].terminate()
  391. #
  392. #
  393. logging.info('Processes finished')
  394. #
  395. for c in controllers:
  396. c['connection'].close()
  397. #
  398. for c in clients:
  399. if 'process' in c:
  400. c['process'].terminate()
  401. #
  402. #
  403. #