123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237 |
- #!/usr/bin/python3
- #
- import throughput_protocols
- import basic_protocols
- import useful
- import time
- import os
- import argparse
- import logging
- import socket
- import random
- import multiprocessing
- import stem.control
- import stem.descriptor.remote
- import base64
- import binascii
- #
- logging.getLogger('stem').setLevel(logging.WARNING)
- #
- def start_client_process(protocol, id_num, finished_queue):
- p = multiprocessing.Process(target=run_client, args=(protocol, id_num, finished_queue))
- p.start()
- return p
- #
- def run_client(protocol, id_num, finished_queue):
- try:
- print('Starting protocol (id: {})'.format(id_num))
- protocol.run()
- print('Done protocol (id: {})'.format(id_num))
- finally:
- finished_queue.put(id_num)
- #
- #
- def parse_range(range_str):
- return tuple(int(x) for x in range_str.split('-'))
- #
- def get_socks_ports(control_ports):
- ports = []
- #
- for x in control_ports:
- #print(x)
- with stem.control.Controller.from_port(port=x) as controller:
- controller.authenticate()
- #
- socks_addresses = controller.get_listeners(stem.control.Listener.SOCKS)
- #print(socks_addresses)
- assert(len(socks_addresses) == 1)
- assert(socks_addresses[0][0] == '127.0.0.1')
- #
- ports.append(socks_addresses[0][1])
- #
- #
- return ports
- #
- if __name__ == '__main__':
- logging.basicConfig(level=logging.DEBUG)
- #
- parser = argparse.ArgumentParser(description='Test the network throughput (optionally through a proxy).')
- parser.add_argument('ip', type=str, help='destination ip address')
- parser.add_argument('port', type=int, help='destination port')
- parser.add_argument('num_bytes', type=useful.parse_bytes,
- help='number of bytes to send (can also end with \'B\', \'KiB\', \'MiB\', or \'GiB\')', metavar='num-bytes')
- parser.add_argument('proxy_control_port_range', type=parse_range, help='range of ports for the control ports')
- #parser.add_argument('--proxy', type=str, help='proxy ip address and port', metavar=('ip','port'), nargs=2)
- #parser.add_argument('--fake-proxy', action='store_true', help='connecting to a fake-tor proxy')
- parser.add_argument('--wait', type=int,
- help='wait until the given time before pushing data (time in seconds since epoch)', metavar='time')
- parser.add_argument('--buffer-len', type=useful.parse_bytes,
- help='size of the send and receive buffers (can also end with \'B\', \'KiB\', \'MiB\', or \'GiB\')', metavar='bytes')
- parser.add_argument('--no-accel', action='store_true', help='don\'t use C acceleration (use pure Python)')
- args = parser.parse_args()
- #
- endpoint = (args.ip, args.port)
- proxy_control_ports = list(range(args.proxy_control_port_range[0], args.proxy_control_port_range[1]+1))
- #
- streams_per_client = 3
- #
- socks_ports = get_socks_ports(proxy_control_ports)
- #
- try:
- consensus = stem.descriptor.remote.get_consensus(endpoints=(stem.DirPort('127.0.0.1', 7000),))
- #
- relay_fingerprints = [desc.fingerprint for desc in consensus]
- exit_fingerprints = [desc.fingerprint for desc in consensus if desc.exit_policy.can_exit_to(*endpoint)]
- except Exception as e:
- raise Exception('Unable to retrieve the consensus') from e
- #
- print('Num socks ports: {}'.format(len(socks_ports)))
- print('Num relays: {}'.format(len(relay_fingerprints)))
- print('Num exits: {}'.format(len(exit_fingerprints)))
- #
- assert(len(relay_fingerprints) >= len(socks_ports)*3+1)
- assert(len(exit_fingerprints) >= len(socks_ports)+1)
- #
- remaining_relays = list(relay_fingerprints)
- #
- target_relay = exit_fingerprints[0]
- remaining_relays = list(set(remaining_relays)-set([target_relay]))
- exit_relays = exit_fingerprints[1:1+len(socks_ports)]
- remaining_relays = list(set(remaining_relays)-set(exit_fingerprints))
- guard_relays = remaining_relays[:len(socks_ports)]
- remaining_relays = list(set(remaining_relays)-set(guard_relays))
- middle_relays = remaining_relays[:len(socks_ports)]
- remaining_relays = list(set(remaining_relays)-set(middle_relays))
- #
- exit_relays = list(exit_relays)
- guard_relays = list(guard_relays)
- #
- controllers = []
- #
- controller_circuits = {}
- fraction_middle = 1
- #
- for x in range(len(proxy_control_ports)):
- #with stem.control.Controller.from_port(port=x) as controller:
- controller = stem.control.Controller.from_port(port=proxy_control_ports[x])
- controller.authenticate()
- #
- controller_circuits[controller] = []
- for y in range(streams_per_client):
- if (x*streams_per_client+y)/(len(proxy_control_ports)*streams_per_client+y) < fraction_middle:
- circuit = [random.choice(guard_relays), target_relay, random.choice(exit_relays)]
- else:
- circuit = [random.choice(guard_relays), random.choice(middle_relays), target_relay]
- #
- #circuit = [random.choice(guard_relays), random.choice(middle_relays), random.choice(exit_relays)]
- #circuit = [middle_relay, random.choice(exit_relays), random.choice(guard_relays)]
- #circuit = [random.choice(exit_relays), random.choice(guard_relays), middle_relay]
- print('New circuit #{}'.format(y))
- print(circuit)
- circuit_id = controller.new_circuit(circuit, await_build=True)
- controller_circuits[controller].append(circuit_id)
- #
- def attach_stream(stream, controller):
- #print(stream)
- #print(controller)
- #print(circuit_id)
- if stream.status == 'NEW' and stream.purpose == 'USER':
- print('Attaching (num_remaining={})'.format(len(controller_circuits[controller])-1))
- #controller.attach_stream(stream.id, circuit_id)
- controller.attach_stream(stream.id, controller_circuits[controller][0])
- controller_circuits[controller] = controller_circuits[controller][1:]
- #
- #
- controller.add_event_listener(lambda x, controller=controller: attach_stream(x, controller), stem.control.EventType.STREAM)
- controller.set_conf('__LeaveStreamsUnattached', '1')
- controllers.append(controller)
- #
- #
- processes = {}
- process_counter = 0
- finished_processes = multiprocessing.Queue()
- #
- for y in range(streams_per_client):
- for x in socks_ports:
- client_socket = socket.socket()
- protocols = []
- #
- proxy_username = bytes([z for z in os.urandom(12) if z != 0])
- proxy_endpoint = ('127.0.0.1', x)
- #
- logging.debug('Socket %d connecting to proxy %r...', client_socket.fileno(), proxy_endpoint)
- client_socket.connect(proxy_endpoint)
- logging.debug('Socket %d connected', client_socket.fileno())
- #
- proxy_protocol = basic_protocols.Socks4Protocol(client_socket, endpoint, username=proxy_username)
- protocols.append(proxy_protocol)
- #
- throughput_protocol = throughput_protocols.ClientProtocol(client_socket, args.num_bytes,
- wait_until=args.wait,
- send_buffer_len=args.buffer_len,
- use_acceleration=(not args.no_accel))
- protocols.append(throughput_protocol)
- #
- combined_protocol = basic_protocols.ChainedProtocol(protocols)
- processes[process_counter] = start_client_process(combined_protocol, process_counter, finished_processes)
- process_counter += 1
- client_socket.close()
- #
- time.sleep(0.01)
- #
- #
- print('Starting in {:.2f} seconds'.format(args.wait-time.time()))
- #
- try:
- while len(processes) > 0:
- print('Waiting for processes ({} left)'.format(len(processes)))
- p_id = finished_processes.get()
- p = processes[p_id]
- p.join()
- processes.pop(p_id)
- #
- except KeyboardInterrupt as e:
- print()
- for p_id in processes:
- processes[p_id].terminate()
- #
- #
- print('Processes finished')
- #
- for c in controllers:
- c.close()
- #
- #
- # old code, keeping just in case
- '''
- with stem.control.Controller.from_port(port=proxy_control_ports[0]) as controller:
- controller.authenticate()
- #print(controller.get_version())
- #print(stem.version.Requirement.GETINFO_MICRODESCRIPTORS)
- #print(controller.get_version() >= stem.version.Requirement.GETINFO_MICRODESCRIPTORS)
- #print('-------')
- #print([x.exit_policy for x in controller.get_network_statuses()])
- relay_fingerprints = list(set([desc.fingerprint for desc in controller.get_network_statuses()]))
- #print(relay_fingerprints)
- relay_digest_map = {desc.digest: desc.fingerprint for desc in controller.get_network_statuses()}
- print(relay_digest_map)
- relay_exit_digests = list(set([desc.digest for desc in controller.get_microdescriptors() if desc.exit_policy.can_exit_to(*endpoint)]))
- #print(relay_exit_digests)
- print([desc.microdescriptor_digest for desc in controller.get_microdescriptors() if desc.exit_policy.can_exit_to(*endpoint)])
- print([binascii.hexlify(base64.b64decode(digest()+'===')).decode('utf-8').upper()[:40] for digest in relay_exit_digests])
- relay_exits = list(set([relay_digest_map[binascii.hexlify(base64.b64decode(digest()+'===')).decode('utf-8').upper()[:40]] for digest in relay_exit_digests]))
- #print(relay_exits)
- #
- #print(dir(list(controller.get_network_statuses())[0]))
- #print(type(list(controller.get_network_statuses())[0]))
- #print([desc for desc in controller.get_microdescriptors()])
- #print([desc.exit_policy for desc in controller.get_microdescriptors()])
- #print([desc.exit_policy.can_exit_to(*endpoint) for desc in controller.get_microdescriptors()])
- #print([desc.fingerprint for desc in controller.get_microdescriptors()])
- #print([desc.flags for desc in controller.get_microdescriptors()])
- #
- '''
|