experiment.py 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370
  1. #!/usr/bin/python3
  2. #
  3. import stem.control
  4. import stem.descriptor.remote
  5. import stem.process
  6. import socket
  7. import logging
  8. import multiprocessing
  9. import queue
  10. import random
  11. import time
  12. import json
  13. import os
  14. #
  15. import basic_protocols
  16. import throughput_protocols
  17. import useful
  18. #
  19. def get_socks_port(control_port):
  20. with stem.control.Controller.from_port(port=control_port) as controller:
  21. controller.authenticate()
  22. #
  23. socks_addresses = controller.get_listeners(stem.control.Listener.SOCKS)
  24. assert(len(socks_addresses) == 1)
  25. assert(socks_addresses[0][0] == '127.0.0.1')
  26. #
  27. return socks_addresses[0][1]
  28. #
  29. #
  30. def wait_then_sleep(event, duration):
  31. event.wait()
  32. time.sleep(duration)
  33. #
  34. def send_measureme(stem_controller, circuit_id, measureme_id, hop):
  35. response = stem_controller.msg('SENDMEASUREME %s ID=%s HOP=%s' % (circuit_id, measureme_id, hop))
  36. stem.response.convert('SINGLELINE', response)
  37. #
  38. if not response.is_ok():
  39. if response.code in ('512', '552'):
  40. if response.message.startswith('Unknown circuit '):
  41. raise stem.InvalidArguments(response.code, response.message, [circuit_id])
  42. #
  43. raise stem.InvalidRequest(response.code, response.message)
  44. else:
  45. raise stem.ProtocolError('MEASUREME returned unexpected response code: %s' % response.code)
  46. #
  47. #
  48. #
  49. def send_measureme_cells(control_address, circuit_id, measureme_id, hops):
  50. logging.debug('Sending measuremes to control address {}, then sleeping'.format(control_address))
  51. with stem.control.Controller.from_port(address=control_address[0], port=control_address[1]) as controller:
  52. controller.authenticate()
  53. for hop in hops:
  54. send_measureme(controller, circuit_id, measureme_id, hop)
  55. #
  56. #
  57. #
  58. def send_measureme_cells_and_wait(control_port, circuit_id, measureme_id, hops, wait_event, wait_offset):
  59. send_measureme_cells(control_port, circuit_id, measureme_id, hops)
  60. wait_then_sleep(wait_event, wait_offset)
  61. #
  62. def get_fingerprints(consensus):
  63. """
  64. Get the fingerprints of all relays.
  65. """
  66. #
  67. return [desc.fingerprint for desc in consensus]
  68. #
  69. def get_exit_fingerprints(consensus, endpoint):
  70. """
  71. Get the fingerprints of relays that can exit to the endpoint.
  72. """
  73. #
  74. return [desc.fingerprint for desc in consensus if desc.exit_policy.can_exit_to(*endpoint)]
  75. #
  76. class ExperimentController:
  77. def __init__(self, control_address):
  78. self.control_address = control_address
  79. self.connection = None
  80. self.circuits = {}
  81. self.unassigned_circuit_ids = []
  82. self.assigned_streams = {}
  83. #
  84. def connect(self):
  85. self.connection = stem.control.Controller.from_port(address=self.control_address[0], port=self.control_address[1])
  86. self.connection.authenticate()
  87. #
  88. self.connection.add_event_listener(self._attach_stream, stem.control.EventType.STREAM)
  89. self.connection.set_conf('__LeaveStreamsUnattached', '1')
  90. #
  91. def disconnect(self):
  92. #if len(self.unused_circuit_ids) > 0:
  93. # logging.warning('Closed stem controller before all circuits were used')
  94. #
  95. self.connection.close()
  96. #
  97. def assign_stream(self, from_address):
  98. circuit_id = self.unassigned_circuit_ids.pop(0)
  99. self.assigned_streams[from_address] = circuit_id
  100. return circuit_id
  101. #
  102. def _attach_stream(self, stream):
  103. try:
  104. if stream.status == 'NEW':
  105. # by default, let tor handle new streams
  106. circuit_id = 0
  107. #
  108. if stream.purpose == 'USER':
  109. # this is probably one of our streams (although not guaranteed)
  110. circuit_id = self.assigned_streams[(stream.source_address, stream.source_port)]
  111. #
  112. try:
  113. self.connection.attach_stream(stream.id, circuit_id)
  114. #logging.debug('Attaching to circuit {}'.format(circuit_id))
  115. except stem.UnsatisfiableRequest:
  116. if stream.purpose != 'USER':
  117. # could not attach a non-user stream, so probably raised:
  118. # stem.UnsatisfiableRequest: Connection is not managed by controller.
  119. # therefore we should ignore this exception
  120. pass
  121. else:
  122. raise
  123. #
  124. #
  125. #
  126. except:
  127. logging.exception('Error while attaching the stream (control_port={}, circuit_id={}).'.format(self.control_port, circuit_id))
  128. raise
  129. #
  130. #
  131. def build_circuit(self, circuit_generator):
  132. circuit_id = None
  133. #
  134. while circuit_id is None:
  135. try:
  136. circuit = circuit_generator()
  137. circuit_id = self.connection.new_circuit(circuit, await_build=True)
  138. logging.debug('New circuit (id={}): {}'.format(circuit_id, circuit))
  139. except stem.CircuitExtensionFailed:
  140. logging.debug('Failed circuit: {}'.format(circuit))
  141. logging.warning('Circuit creation failed. Retrying...')
  142. #
  143. #
  144. self.unassigned_circuit_ids.append(circuit_id)
  145. self.circuits[circuit_id] = circuit
  146. #
  147. #
  148. class ExperimentProtocol(basic_protocols.ChainedProtocol):
  149. def __init__(self, socket, endpoint, num_bytes, custom_data=None, send_buffer_len=None, push_start_cb=None):
  150. proxy_username = bytes([z for z in os.urandom(12) if z != 0])
  151. proxy_protocol = basic_protocols.Socks4Protocol(socket, endpoint, username=proxy_username)
  152. #
  153. throughput_protocol = throughput_protocols.ClientProtocol(socket, num_bytes,
  154. custom_data=custom_data,
  155. send_buffer_len=send_buffer_len,
  156. use_acceleration=True,
  157. push_start_cb=push_start_cb)
  158. #
  159. super().__init__([proxy_protocol, throughput_protocol])
  160. #
  161. #
  162. class ExperimentProtocolManager():
  163. def __init__(self):
  164. self.stopped = False
  165. self.process_counter = 0
  166. self.used_ids = []
  167. self.running_processes = {}
  168. self.global_finished_process_queue = multiprocessing.Queue()
  169. self.local_finished_process_queue = queue.Queue()
  170. self.queue_getter = useful.QueueGetter(self.global_finished_process_queue,
  171. self.local_finished_process_queue.put)
  172. #
  173. def _run_client(self, protocol, protocol_id):
  174. had_error = False
  175. try:
  176. logging.debug('Starting protocol (id: {})'.format(protocol_id))
  177. protocol.run()
  178. logging.debug('Done protocol (id: {})'.format(protocol_id))
  179. except:
  180. had_error = True
  181. logging.warning('Protocol error')
  182. logging.exception('Protocol id: {} had an error'.format(protocol_id))
  183. finally:
  184. self.global_finished_process_queue.put((protocol_id, had_error))
  185. #
  186. #
  187. def start_experiment_protocol(self, protocol, protocol_id=None):
  188. if protocol_id is None:
  189. protocol_id = self.process_counter
  190. #
  191. assert not self.stopped
  192. assert protocol_id not in self.used_ids, 'Protocol ID already used'
  193. #
  194. p = multiprocessing.Process(target=self._run_client, args=(protocol, protocol_id))
  195. self.running_processes[protocol_id] = p
  196. self.used_ids.append(protocol_id)
  197. #
  198. p.start()
  199. self.process_counter += 1
  200. #
  201. #protocol.socket.close()
  202. #
  203. def wait(self, finished_protocol_cb=None):
  204. while len(self.running_processes) > 0:
  205. logging.debug('Waiting for processes ({} left)'.format(len(self.running_processes)))
  206. #
  207. (protocol_id, had_error) = self.local_finished_process_queue.get()
  208. p = self.running_processes[protocol_id]
  209. p.join()
  210. self.running_processes.pop(protocol_id)
  211. finished_protocol_cb(protocol_id, had_error)
  212. #
  213. #
  214. def stop(self):
  215. self.wait()
  216. self.queue_getter.stop()
  217. self.queue_getter.join()
  218. self.stopped = True
  219. #
  220. #
  221. def build_client_protocol(endpoint, socks_address, control_address, controller, wait_duration=0, measureme_id=None, num_bytes=None, buffer_len=None):
  222. client_socket = socket.socket()
  223. #
  224. logging.debug('Socket %d connecting to proxy %r...', client_socket.fileno(), socks_address)
  225. client_socket.connect(socks_address)
  226. logging.debug('Socket %d connected', client_socket.fileno())
  227. #
  228. custom_data = {}
  229. #
  230. circuit_id = controller.assign_stream(client_socket.getsockname())
  231. custom_data['circuit'] = (circuit_id, controller.circuits[circuit_id])
  232. #
  233. if measureme_id is not None:
  234. custom_data['measureme_id'] = measureme_id
  235. #
  236. hops = list(range(len(controller.circuits[circuit_id])+1))[::-1]
  237. # send the measureme cells to the last relay first
  238. start_cb = lambda control_address=control_address, circuit_id=circuit_id, measureme_id=measureme_id, \
  239. hops=hops, event=start_event, wait_duration=wait_duration: \
  240. send_measureme_cells_and_wait(control_address, circuit_id, measureme_id, hops, event, wait_duration)
  241. else:
  242. start_cb = lambda event=start_event, duration=wait_duration: wait_then_sleep(event, duration)
  243. #
  244. custom_data = json.dumps(custom_data).encode('utf-8')
  245. protocol = ExperimentProtocol(client_socket, endpoint, args.num_bytes,
  246. custom_data=custom_data,
  247. send_buffer_len=args.buffer_len,
  248. push_start_cb=start_cb)
  249. return protocol
  250. #
  251. if __name__ == '__main__':
  252. import argparse
  253. #
  254. logging.basicConfig(level=logging.DEBUG)
  255. logging.getLogger('stem').setLevel(logging.WARNING)
  256. #
  257. parser = argparse.ArgumentParser(description='Test the network throughput (optionally through a proxy).')
  258. parser.add_argument('ip', type=str, help='destination ip address')
  259. parser.add_argument('port', type=int, help='destination port')
  260. parser.add_argument('num_bytes', type=useful.parse_bytes,
  261. help='number of bytes to send per connection (can also end with \'B\', \'KiB\', \'MiB\', or \'GiB\')', metavar='num-bytes')
  262. parser.add_argument('num_streams_per_client', type=int, help='number of streams per Tor client', metavar='num-streams-per-client')
  263. parser.add_argument('--buffer-len', type=useful.parse_bytes,
  264. help='size of the send and receive buffers (can also end with \'B\', \'KiB\', \'MiB\', or \'GiB\')', metavar='bytes')
  265. parser.add_argument('--wait-range', type=int, default=0,
  266. help='add a random wait time to each connection so that they don\'t all start at the same time (default is 0)', metavar='time')
  267. parser.add_argument('--proxy-control-ports', type=useful.parse_range_list, help='range of ports for the control ports', metavar='control-ports')
  268. parser.add_argument('--measureme', action='store_true', help='send measureme cells to the exit')
  269. args = parser.parse_args()
  270. #
  271. endpoint = (args.ip, args.port)
  272. #
  273. logging.debug('Getting consensus')
  274. try:
  275. consensus = stem.descriptor.remote.get_consensus(endpoints=(stem.DirPort('127.0.0.1', 7000),))
  276. except Exception as e:
  277. raise Exception('Unable to retrieve the consensus') from e
  278. #
  279. fingerprints = get_fingerprints(consensus)
  280. exit_fingerprints = get_exit_fingerprints(consensus, endpoint)
  281. non_exit_fingerprints = list(set(fingerprints)-set(exit_fingerprints))
  282. #
  283. assert len(exit_fingerprints) == 1, 'Need exactly one exit relay'
  284. assert len(non_exit_fingerprints) >= 1, 'Need at least one non-exit relay'
  285. #
  286. circuit_generator = lambda: [random.choice(non_exit_fingerprints), exit_fingerprints[0]]
  287. #
  288. proxy_addresses = []
  289. for control_port in args.proxy_control_ports:
  290. proxy = {}
  291. proxy['control'] = ('127.0.0.1', control_port)
  292. proxy['socks'] = ('127.0.0.1', get_socks_port(control_port))
  293. proxy_addresses.append(proxy)
  294. #
  295. controllers = []
  296. protocol_manager = ExperimentProtocolManager()
  297. #
  298. try:
  299. for proxy_address in proxy_addresses:
  300. controller = ExperimentController(proxy_address['control'])
  301. controller.connect()
  302. # the controller has to attach new streams to circuits, so the
  303. # connection has to stay open until we're done creating streams
  304. #
  305. for _ in range(args.num_streams_per_client):
  306. # make a circuit for each stream
  307. controller.build_circuit(circuit_generator)
  308. time.sleep(0.5)
  309. #
  310. controllers.append(controller)
  311. #
  312. start_event = multiprocessing.Event()
  313. #
  314. for stream_index in range(args.num_streams_per_client):
  315. for (controller_index, proxy_address, controller) in zip(range(len(controllers)), proxy_addresses, controllers):
  316. '''
  317. client_socket = socket.socket()
  318. #
  319. logging.debug('Socket %d connecting to proxy %r...', client_socket.fileno(), proxy_address['socks'])
  320. client_socket.connect(proxy_address['socks'])
  321. logging.debug('Socket %d connected', client_socket.fileno())
  322. #
  323. wait_offset = random.randint(0, args.wait_range)
  324. custom_data = {}
  325. #
  326. circuit_id = controller.assign_stream(client_socket.getsockname())
  327. custom_data['circuit'] = (circuit_id, controller.circuits[circuit_id])
  328. #
  329. if args.measureme:
  330. measureme_id = stream_index*args.num_streams_per_client + controllers.index(controller) + 1
  331. custom_data['measureme_id'] = measureme_id
  332. #
  333. hops = list(range(len(controller.circuits[circuit_id])+1))[::-1]
  334. # send the measureme cells to the last relay first
  335. start_cb = lambda control_address=proxy_address['control'], circuit_id=circuit_id, measureme_id=measureme_id, \
  336. hops=hops, event=start_event, wait_offset=wait_offset: \
  337. send_measureme_cells_and_wait(control_address, circuit_id, measureme_id, hops, event, wait_offset)
  338. else:
  339. start_cb = lambda event=start_event, duration=wait_offset: wait_then_sleep(event, duration)
  340. #
  341. custom_data = json.dumps(custom_data).encode('utf-8')
  342. protocol = ExperimentProtocol(client_socket, endpoint, args.num_bytes,
  343. custom_data=custom_data,
  344. send_buffer_len=args.buffer_len,
  345. push_start_cb=start_cb)
  346. #
  347. '''
  348. if args.measureme:
  349. measureme_id = stream_index*args.num_streams_per_client + controller_index + 1
  350. else:
  351. measureme_id = None
  352. #
  353. wait_duration = random.randint(0, args.wait_range)
  354. protocol = build_client_protocol(endpoint, proxy_address['socks'], proxy_address['control'], controller,
  355. wait_duration=wait_duration, measureme_id=measureme_id,
  356. num_bytes=args.num_bytes, buffer_len=args.buffer_len)
  357. protocol_manager.start_experiment_protocol(protocol, protocol_id=None)
  358. #
  359. #
  360. time.sleep(2)
  361. start_event.set()
  362. #
  363. protocol_manager.wait(finished_protocol_cb=lambda protocol_id,had_error: logging.info('Finished {} (had_error={})'.format(protocol_id,had_error)))
  364. finally:
  365. for controller in controllers:
  366. controller.disconnect()
  367. #
  368. protocol_manager.stop()
  369. #
  370. #