stress_tester.py 9.0 KB


  1. #!/usr/bin/python3
  2. #
  3. import throughput_protocols
  4. import basic_protocols
  5. import useful
  6. import time
  7. import os
  8. import argparse
  9. import logging
  10. import socket
  11. import random
  12. import multiprocessing
  13. import stem.control
  14. import stem.descriptor.remote
  15. import base64
  16. import binascii
  17. #
  18. logging.getLogger('stem').setLevel(logging.WARNING)
  19. #
  20. def start_client_process(protocol, id_num, finished_queue):
  21. p = multiprocessing.Process(target=run_client, args=(protocol, id_num, finished_queue))
  22. p.start()
  23. return p
  24. #
  25. def run_client(protocol, id_num, finished_queue):
  26. try:
  27. print('Starting protocol (id: {})'.format(id_num))
  28. protocol.run()
  29. print('Done protocol (id: {})'.format(id_num))
  30. finally:
  31. finished_queue.put(id_num)
  32. #
  33. #
  34. def parse_range(range_str):
  35. return tuple(int(x) for x in range_str.split('-'))
  36. #
  37. def get_socks_ports(control_ports):
  38. ports = []
  39. #
  40. for x in control_ports:
  41. #print(x)
  42. with stem.control.Controller.from_port(port=x) as controller:
  43. controller.authenticate()
  44. #
  45. socks_addresses = controller.get_listeners(stem.control.Listener.SOCKS)
  46. #print(socks_addresses)
  47. assert(len(socks_addresses) == 1)
  48. assert(socks_addresses[0][0] == '127.0.0.1')
  49. #
  50. ports.append(socks_addresses[0][1])
  51. #
  52. #
  53. return ports
  54. #
  55. if __name__ == '__main__':
  56. logging.basicConfig(level=logging.DEBUG)
  57. #
  58. parser = argparse.ArgumentParser(description='Test the network throughput (optionally through a proxy).')
  59. parser.add_argument('ip', type=str, help='destination ip address')
  60. parser.add_argument('port', type=int, help='destination port')
  61. parser.add_argument('num_bytes', type=useful.parse_bytes,
  62. help='number of bytes to send (can also end with \'B\', \'KiB\', \'MiB\', or \'GiB\')', metavar='num-bytes')
  63. parser.add_argument('proxy_control_port_range', type=parse_range, help='range of ports for the control ports')
  64. #parser.add_argument('--proxy', type=str, help='proxy ip address and port', metavar=('ip','port'), nargs=2)
  65. #parser.add_argument('--fake-proxy', action='store_true', help='connecting to a fake-tor proxy')
  66. parser.add_argument('--wait', type=int,
  67. help='wait until the given time before pushing data (time in seconds since epoch)', metavar='time')
  68. parser.add_argument('--buffer-len', type=useful.parse_bytes,
  69. help='size of the send and receive buffers (can also end with \'B\', \'KiB\', \'MiB\', or \'GiB\')', metavar='bytes')
  70. parser.add_argument('--no-accel', action='store_true', help='don\'t use C acceleration (use pure Python)')
  71. args = parser.parse_args()
  72. #
  73. endpoint = (args.ip, args.port)
  74. proxy_control_ports = list(range(args.proxy_control_port_range[0], args.proxy_control_port_range[1]+1))
  75. #
  76. streams_per_client = 3
  77. #
  78. socks_ports = get_socks_ports(proxy_control_ports)
  79. #
  80. try:
  81. consensus = stem.descriptor.remote.get_consensus(endpoints=(stem.DirPort('127.0.0.1', 7000),))
  82. #
  83. relay_fingerprints = [desc.fingerprint for desc in consensus]
  84. exit_fingerprints = [desc.fingerprint for desc in consensus if desc.exit_policy.can_exit_to(*endpoint)]
  85. except Exception as e:
  86. raise Exception('Unable to retrieve the consensus') from e
  87. #
  88. print('Num socks ports: {}'.format(len(socks_ports)))
  89. print('Num relays: {}'.format(len(relay_fingerprints)))
  90. print('Num exits: {}'.format(len(exit_fingerprints)))
  91. #
  92. assert(len(relay_fingerprints) >= len(socks_ports)*3+1)
  93. assert(len(exit_fingerprints) >= len(socks_ports)+1)
  94. #
  95. remaining_relays = list(relay_fingerprints)
  96. #
  97. target_relay = exit_fingerprints[0]
  98. remaining_relays = list(set(remaining_relays)-set([target_relay]))
  99. exit_relays = exit_fingerprints[1:1+len(socks_ports)]
  100. remaining_relays = list(set(remaining_relays)-set(exit_fingerprints))
  101. guard_relays = remaining_relays[:len(socks_ports)]
  102. remaining_relays = list(set(remaining_relays)-set(guard_relays))
  103. middle_relays = remaining_relays[:len(socks_ports)]
  104. remaining_relays = list(set(remaining_relays)-set(middle_relays))
  105. #
  106. exit_relays = list(exit_relays)
  107. guard_relays = list(guard_relays)
  108. #
  109. controllers = []
  110. #
  111. controller_circuits = {}
  112. fraction_middle = 1
  113. #
  114. for x in range(len(proxy_control_ports)):
  115. #with stem.control.Controller.from_port(port=x) as controller:
  116. controller = stem.control.Controller.from_port(port=proxy_control_ports[x])
  117. controller.authenticate()
  118. #
  119. controller_circuits[controller] = []
  120. for y in range(streams_per_client):
  121. if (x*streams_per_client+y)/(len(proxy_control_ports)*streams_per_client+y) < fraction_middle:
  122. circuit = [random.choice(guard_relays), target_relay, random.choice(exit_relays)]
  123. else:
  124. circuit = [random.choice(guard_relays), random.choice(middle_relays), target_relay]
  125. #
  126. #circuit = [random.choice(guard_relays), random.choice(middle_relays), random.choice(exit_relays)]
  127. #circuit = [middle_relay, random.choice(exit_relays), random.choice(guard_relays)]
  128. #circuit = [random.choice(exit_relays), random.choice(guard_relays), middle_relay]
  129. print('New circuit #{}'.format(y))
  130. print(circuit)
  131. circuit_id = controller.new_circuit(circuit, await_build=True)
  132. controller_circuits[controller].append(circuit_id)
  133. #
  134. def attach_stream(stream, controller):
  135. #print(stream)
  136. #print(controller)
  137. #print(circuit_id)
  138. if stream.status == 'NEW' and stream.purpose == 'USER':
  139. print('Attaching (num_remaining={})'.format(len(controller_circuits[controller])-1))
  140. #controller.attach_stream(stream.id, circuit_id)
  141. controller.attach_stream(stream.id, controller_circuits[controller][0])
  142. controller_circuits[controller] = controller_circuits[controller][1:]
  143. #
  144. #
  145. controller.add_event_listener(lambda x, controller=controller: attach_stream(x, controller), stem.control.EventType.STREAM)
  146. controller.set_conf('__LeaveStreamsUnattached', '1')
  147. controllers.append(controller)
  148. #
  149. #
  150. processes = {}
  151. process_counter = 0
  152. finished_processes = multiprocessing.Queue()
  153. #
  154. for y in range(streams_per_client):
  155. for x in socks_ports:
  156. client_socket = socket.socket()
  157. protocols = []
  158. #
  159. proxy_username = bytes([z for z in os.urandom(12) if z != 0])
  160. proxy_endpoint = ('127.0.0.1', x)
  161. #
  162. logging.debug('Socket %d connecting to proxy %r...', client_socket.fileno(), proxy_endpoint)
  163. client_socket.connect(proxy_endpoint)
  164. logging.debug('Socket %d connected', client_socket.fileno())
  165. #
  166. proxy_protocol = basic_protocols.Socks4Protocol(client_socket, endpoint, username=proxy_username)
  167. protocols.append(proxy_protocol)
  168. #
  169. throughput_protocol = throughput_protocols.ClientProtocol(client_socket, args.num_bytes,
  170. wait_until=args.wait,
  171. send_buffer_len=args.buffer_len,
  172. use_acceleration=(not args.no_accel))
  173. protocols.append(throughput_protocol)
  174. #
  175. combined_protocol = basic_protocols.ChainedProtocol(protocols)
  176. processes[process_counter] = start_client_process(combined_protocol, process_counter, finished_processes)
  177. process_counter += 1
  178. client_socket.close()
  179. #
  180. time.sleep(0.01)
  181. #
  182. #
  183. print('Starting in {:.2f} seconds'.format(args.wait-time.time()))
  184. #
  185. try:
  186. while len(processes) > 0:
  187. print('Waiting for processes ({} left)'.format(len(processes)))
  188. p_id = finished_processes.get()
  189. p = processes[p_id]
  190. p.join()
  191. processes.pop(p_id)
  192. #
  193. except KeyboardInterrupt as e:
  194. print()
  195. for p_id in processes:
  196. processes[p_id].terminate()
  197. #
  198. #
  199. print('Processes finished')
  200. #
  201. for c in controllers:
  202. c.close()
  203. #
  204. #
  205. # old code, keeping just in case
  206. '''
  207. with stem.control.Controller.from_port(port=proxy_control_ports[0]) as controller:
  208. controller.authenticate()
  209. #print(controller.get_version())
  210. #print(stem.version.Requirement.GETINFO_MICRODESCRIPTORS)
  211. #print(controller.get_version() >= stem.version.Requirement.GETINFO_MICRODESCRIPTORS)
  212. #print('-------')
  213. #print([x.exit_policy for x in controller.get_network_statuses()])
  214. relay_fingerprints = list(set([desc.fingerprint for desc in controller.get_network_statuses()]))
  215. #print(relay_fingerprints)
  216. relay_digest_map = {desc.digest: desc.fingerprint for desc in controller.get_network_statuses()}
  217. print(relay_digest_map)
  218. relay_exit_digests = list(set([desc.digest for desc in controller.get_microdescriptors() if desc.exit_policy.can_exit_to(*endpoint)]))
  219. #print(relay_exit_digests)
  220. print([desc.microdescriptor_digest for desc in controller.get_microdescriptors() if desc.exit_policy.can_exit_to(*endpoint)])
  221. print([binascii.hexlify(base64.b64decode(digest()+'===')).decode('utf-8').upper()[:40] for digest in relay_exit_digests])
  222. relay_exits = list(set([relay_digest_map[binascii.hexlify(base64.b64decode(digest()+'===')).decode('utf-8').upper()[:40]] for digest in relay_exit_digests]))
  223. #print(relay_exits)
  224. #
  225. #print(dir(list(controller.get_network_statuses())[0]))
  226. #print(type(list(controller.get_network_statuses())[0]))
  227. #print([desc for desc in controller.get_microdescriptors()])
  228. #print([desc.exit_policy for desc in controller.get_microdescriptors()])
  229. #print([desc.exit_policy.can_exit_to(*endpoint) for desc in controller.get_microdescriptors()])
  230. #print([desc.fingerprint for desc in controller.get_microdescriptors()])
  231. #print([desc.flags for desc in controller.get_microdescriptors()])
  232. #
  233. '''