123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403 |
- #!/usr/bin/python3
- #
- import throughput_protocols
- import basic_protocols
- import useful
- import time
- import os
- import argparse
- import logging
- import socket
- import random
- import multiprocessing
- import stem.control
- import stem.descriptor.remote
- import stem.process
- import base64
- import binascii
- #
# The script entry point configures the root logger at DEBUG; raise stem's own
# logger to WARNING so only warnings and errors from the library are emitted.
logging.getLogger('stem').setLevel(logging.WARNING)
- #
def start_client_process(protocol, id_num, finished_queue):
    """Run `protocol` in a new child process and return the started process.

    The child executes `run_client(protocol, id_num, finished_queue)`; the
    caller is responsible for joining the returned `multiprocessing.Process`.
    """
    worker = multiprocessing.Process(
        target=run_client,
        args=(protocol, id_num, finished_queue),
    )
    worker.start()
    return worker
- #
def run_client(protocol, id_num, finished_queue):
    """Run a protocol to completion and report the outcome on a queue.

    Args:
        protocol: object exposing a `run()` method.
        id_num: identifier used in log messages and in the reported result.
        finished_queue: object with a `put()` method; always receives one
            `(id_num, had_error)` tuple, whether or not the protocol failed.

    Ordinary exceptions raised by `protocol.run()` are logged and swallowed.
    `KeyboardInterrupt`/`SystemExit` are reported as an error on the queue
    and then allowed to propagate so the process can actually stop.
    """
    # Assume failure until the protocol has completed, so that an exception we
    # deliberately do not catch below is still reported as an error.
    had_error = True
    try:
        logging.info('Starting protocol (id: {})'.format(id_num))
        protocol.run()
        logging.info('Done protocol (id: {})'.format(id_num))
        had_error = False
    except Exception:
        logging.warning('Protocol error')
        logging.exception('Protocol id: {} had an error'.format(id_num))
    finally:
        finished_queue.put((id_num, had_error))
- #
- #
def parse_range(range_str):
    """Parse a `'<start>-<end>'` string into a `(start, end)` tuple of ints.

    Intended for use as an argparse `type=` callable.

    Raises:
        argparse.ArgumentTypeError: if the string does not consist of exactly
            two '-'-separated parts, or if start is greater than end.
        ValueError: if either part is not an integer.
    """
    parts = range_str.split('-')
    if len(parts) != 2:
        raise argparse.ArgumentTypeError(
            'expected a range of the form START-END, got {!r}'.format(range_str))
    #
    (start, end) = (int(part) for part in parts)
    if start > end:
        raise argparse.ArgumentTypeError(
            'range start must not be greater than its end, got {!r}'.format(range_str))
    #
    return (start, end)
- #
def get_socks_port(control_port):
    """Return the SOCKS port of the tor instance behind `control_port`.

    Connects to the control port, authenticates, and queries the SOCKS
    listeners.

    Raises:
        RuntimeError: if the instance does not have exactly one SOCKS
            listener, or if that listener is not bound to 127.0.0.1.
    """
    with stem.control.Controller.from_port(port=control_port) as controller:
        controller.authenticate()
        #
        socks_addresses = controller.get_listeners(stem.control.Listener.SOCKS)
        #logging.info(socks_addresses)
        # Explicit checks instead of `assert`, which is stripped under -O.
        if len(socks_addresses) != 1:
            raise RuntimeError('Expected exactly one SOCKS listener on control port {}, found {}'.format(
                control_port, len(socks_addresses)))
        #
        if socks_addresses[0][0] != '127.0.0.1':
            raise RuntimeError('Expected the SOCKS listener on control port {} to be bound to 127.0.0.1, found {}'.format(
                control_port, socks_addresses[0][0]))
        #
        return socks_addresses[0][1]
- #
- #
def send_measureme(controller, circuit_id, measureme_id, hop):
    """Issue a SENDMEASUREME control command and validate the reply.

    Args:
        controller: stem controller used to send the message.
        circuit_id: circuit the command refers to.
        measureme_id: value sent as the command's ID= argument.
        hop: value sent as the command's HOP= argument.

    Raises:
        stem.InvalidArguments: reply code 512/552 whose message starts with
            'Unknown circuit '.
        stem.InvalidRequest: any other reply with code 512/552.
        stem.ProtocolError: any other non-OK reply code.
    """
    command = 'SENDMEASUREME %s ID=%s HOP=%s' % (circuit_id, measureme_id, hop)
    reply = controller.msg(command)
    stem.response.convert('SINGLELINE', reply)
    #
    if reply.is_ok():
        return
    #
    if reply.code not in ('512', '552'):
        raise stem.ProtocolError('MEASUREME returned unexpected response code: %s' % reply.code)
    #
    if reply.message.startswith('Unknown circuit '):
        raise stem.InvalidArguments(reply.code, reply.message, [circuit_id])
    #
    raise stem.InvalidRequest(reply.code, reply.message)
- #
- #
- #
def connect_and_send_measureme(control_port, circuit_id, measureme_id, hop):
    """Send one measureme over a fresh control connection.

    Opens a controller on `control_port`, authenticates, forwards the
    remaining arguments to `send_measureme`, and closes the connection when
    the `with` block exits.
    """
    with stem.control.Controller.from_port(port=control_port) as ctrl:
        ctrl.authenticate()
        send_measureme(ctrl, circuit_id, measureme_id, hop)
- #
- #
def send_all_measuremes(controller, circuit_id, measureme_id):
    """Send a measureme for hop 2 and then for hop 0 of the given circuit.

    Any exception raised by `send_measureme` propagates immediately, so a
    failure for hop 2 means hop 0 is never attempted.
    """
    for hop in (2, 0):
        send_measureme(controller, circuit_id, measureme_id, hop)
- #
def push_start_measureme_cb(control_port, circuit_id, measureme_id, wait_until, hops):
    """Send measuremes for each hop, then block until `wait_until`.

    Args:
        control_port: control port to connect to.
        circuit_id: circuit the measuremes are sent for.
        measureme_id: id passed along with every measureme.
        wait_until: absolute time (as returned by `time.time()`) to sleep
            until after the measuremes have been sent.
        hops: iterable of hop values; one measureme is sent per entry, in order.
    """
    logging.info('Sending measuremes to control port {}, then sleeping'.format(control_port))
    with stem.control.Controller.from_port(port=control_port) as controller:
        controller.authenticate()
        for hop in hops:
            send_measureme(controller, circuit_id, measureme_id, hop)
        #
    #
    # Sending the measuremes takes time, so the deadline may already have
    # passed; time.sleep() raises ValueError for a negative duration.
    time.sleep(max(0, wait_until-time.time()))
- #
if __name__ == '__main__':
    # Script entry point.
    #
    # 1. Parse the command line.
    # 2. Either launch `num_clients` tor processes, or (when num_clients is 0)
    #    use already-running tor instances reachable through
    #    --proxy-control-port-range.
    # 3. For every client, pre-build one two-hop circuit per stream and install
    #    a STREAM event listener that attaches our streams to those circuits.
    # 4. Start one child process per stream that runs a SOCKS4 handshake
    #    followed by the throughput protocol, and wait for all of them.
    # 5. Always close the control connections and stop any tor processes that
    #    were launched here, even if one of the steps above fails.
    #
    logging.basicConfig(level=logging.DEBUG)
    #
    parser = argparse.ArgumentParser(description='Test the network throughput (optionally through a proxy).')
    parser.add_argument('ip', type=str, help='destination ip address')
    parser.add_argument('port', type=int, help='destination port')
    parser.add_argument('num_bytes', type=useful.parse_bytes,
                        help='number of bytes to send per connection (can also end with \'B\', \'KiB\', \'MiB\', or \'GiB\')', metavar='num-bytes')
    parser.add_argument('num_clients', type=int, help='number of Tor clients to start', metavar='num-clients')
    parser.add_argument('num_streams_per_client', type=int, help='number of streams per Tor client', metavar='num-streams-per-client')
    #parser.add_argument('--wait', type=int,
    #                    help='wait until the given time before pushing data (time in seconds since epoch)', metavar='time')
    parser.add_argument('--buffer-len', type=useful.parse_bytes,
                        help='size of the send and receive buffers (can also end with \'B\', \'KiB\', \'MiB\', or \'GiB\')', metavar='bytes')
    parser.add_argument('--no-accel', action='store_true', help='don\'t use C acceleration (use pure Python)')
    parser.add_argument('--wait-range', type=int, default=0,
                        help='add a random wait time to each connection so that they don\'t all start at the same time (default is 0)', metavar='time')
    parser.add_argument('--proxy-control-port-range', type=parse_range, help='range of ports for the control ports')
    parser.add_argument('--measureme', action='store_true', help='send measureme cells to the exit')
    args = parser.parse_args()
    #
    endpoint = (args.ip, args.port)
    #
    streams_per_client = args.num_streams_per_client
    #
    if args.num_clients > 0:
        # launch our own tor clients and use a hard-coded set of relays
        num_clients = args.num_clients
        socks_port_start = 9551
        control_port_start = 12001
        target_relay = 'BCEDF6C193AA687AE471B8A22EBF6BC57C2D285E' # gurgle
        other_relays = list(set(['BC630CBBB518BE7E9F4E09712AB0269E9DC7D626', '18CFB7BA07F13AEABF50A7148786DA68773B2498', 'B771AA877687F88E6F1CA5354756DF6C8A7B6B24', '204DFD2A2C6A0DC1FA0EACB495218E0B661704FD', 'F6740DEABFD5F62612FA025A5079EA72846B1F67', 'A21D24309FD17F57395CDB0B4A3B813AE73FBC5A', 'B204DE75B37064EF6A4C6BAF955C5724578D0B32', '874D84382C892F3F61CC9E106BF08843DE0B865A', '25990FC54D7268C914170A118EE4EE75025451DA', 'B872BA6804C8C6E140AE1897B44CF32B42FD2397', 'B143D439B72D239A419F8DCE07B8A8EB1B486FA7', 'DA4B488C2826DFBBD04D635DA1E71A2BA5B20747', 'D80EA21626BFAE8044E4037FE765252E157E3586']))
        # bad relays = ['3F62F05E859D7F98B086F702A31F7714D566E49A', '8EE0534532EA31AA5172B1892F53B2F25C76EB02', '7DC52AE6667A30536BA2383CD102CFC24F20AD71']
        # no longer exist = ['DBAD17D706E2B6D5D917C2077961750513BDF879']
        #
        logging.info('Getting consensus')
        #
        try:
            consensus = stem.descriptor.remote.get_consensus()
            #relay_fingerprints = [desc.fingerprint for desc in consensus]
            logging.info([desc.exit_policy.can_exit_to(*endpoint) for desc in consensus if desc.fingerprint==target_relay])
        except Exception as e:
            raise Exception('Unable to retrieve the consensus') from e
        #
        logging.info('Starting tor processes')
        #
        clients = []
        #
        try:
            for client_index in range(num_clients):
                # start a tor client
                #
                socks_port = socks_port_start+client_index
                control_port = control_port_start+client_index
                #
                tor_process = stem.process.launch_tor_with_config(
                    config = {
                        'Log': ['notice file /tmp/tor{}/log'.format(client_index), 'notice stdout'],
                        'TruncateLogFile': '1',
                        'MeasuremeLogFile': '/ramdisk/sengler/real/measureme-{}.log'.format(client_index),
                        'SafeLogging': 'relay',
                        'SocksPort': str(socks_port),
                        'ControlPort': str(control_port),
                        'DataDirectory': '/tmp/tor{}'.format(client_index),
                    }
                )
                clients.append({'socks_port':socks_port, 'control_port':control_port, 'process':tor_process})
                logging.info('Started '+str(client_index))
            #
        except BaseException:
            # also covers KeyboardInterrupt: don't leave the already-started
            # tor processes running if startup is aborted part-way through
            for c in clients:
                if 'process' in c:
                    c['process'].kill()
                #
            #
            raise
        #
    else:
        # use already-running tor instances, addressed by their control ports
        if args.proxy_control_port_range is None:
            parser.error('--proxy-control-port-range is required when num-clients is 0')
        #
        proxy_control_ports = list(range(args.proxy_control_port_range[0], args.proxy_control_port_range[1]+1))
        socks_ports = [get_socks_port(x) for x in proxy_control_ports]
        #
        logging.info('Getting consensus')
        #
        try:
            consensus = stem.descriptor.remote.get_consensus(endpoints=(stem.DirPort('127.0.0.1', 7000),))
            #
            relay_fingerprints = [desc.fingerprint for desc in consensus]
            exit_fingerprints = [desc.fingerprint for desc in consensus if desc.exit_policy.can_exit_to(*endpoint)]
        except Exception as e:
            raise Exception('Unable to retrieve the consensus') from e
        #
        print('Num socks ports: {}'.format(len(socks_ports)))
        print('Num relays: {}'.format(len(relay_fingerprints)))
        print('Num exits: {}'.format(len(exit_fingerprints)))
        #
        # explicit checks instead of `assert`, which is stripped under -O
        if len(relay_fingerprints) < 2:
            raise RuntimeError('Expected at least 2 relays in the consensus, found {}'.format(len(relay_fingerprints)))
        #
        if len(exit_fingerprints) != 1:
            raise RuntimeError('Expected exactly 1 relay that can exit to the endpoint, found {}'.format(len(exit_fingerprints)))
        #
        target_relay = exit_fingerprints[0]
        other_relays = list(set(relay_fingerprints)-set(exit_fingerprints))
        #
        clients = [{'socks_port':socks_port, 'control_port':control_port}
                   for (socks_port, control_port) in zip(socks_ports, proxy_control_ports)]
    #
    ######################################3
    #
    def attach_stream(stream, controller):
        # STREAM event handler: attach each new USER stream to the next
        # pre-built circuit of `controller`; every other new stream is
        # attached with circuit id 0 so that tor picks the circuit itself.
        circuit_id = None  # keeps the error log below safe if we fail early
        try:
            if stream.status == 'NEW':
                # by default, let tor handle new streams
                circuit_id = 0
                #
                if stream.purpose == 'USER':
                    # this is probably one of our streams (although not guaranteed)
                    circuit_id = controller['circuits_remaining'].pop(0)
                #
                try:
                    controller['connection'].attach_stream(stream.id, circuit_id)
                    #logging.info('Attaching to circuit {}'.format(circuit_id))
                except stem.UnsatisfiableRequest:
                    if stream.purpose != 'USER':
                        # could not attach a non-user stream, so probably raised:
                        # stem.UnsatisfiableRequest: Connection is not managed by controller.
                        # therefore we should ignore this exception
                        pass
                    else:
                        raise
                    #
                #
            #
        except Exception:
            logging.exception('Error while attaching the stream (controller_id={}, circuit_id={}).'.format(controller['id'], circuit_id))
            raise
        #
    #
    controllers = []
    #
    try:
        for (client_index, client) in enumerate(clients):
            # make connections to client control ports
            #
            connection = stem.control.Controller.from_port(port=client['control_port'])
            # register before authenticating so the connection is closed by the
            # cleanup below even if authentication fails
            controllers.append({'connection':connection, 'id':client_index})
            connection.authenticate()
        #
        all_circuits_okay = True
        #
        for controller in controllers:
            # for each client, override the circuits for new streams
            #
            controller['circuits_remaining'] = []
            controller['circuit_ids'] = []
            controller['circuit_verbose'] = []
            #
            logging.info('Setting up controller id={}'.format(controller['id']))
            #
            for _ in range(streams_per_client):
                circuit_id = None
                #
                while circuit_id is None:
                    #circuit = [random.choice(relay_fingerprints), target_relay]
                    first_relay = random.choice(other_relays)
                    exit_relay = target_relay
                    #circuit = [target_relay, exit_relay]
                    circuit = [first_relay, exit_relay]
                    #
                    #if [desc.exit_policy.can_exit_to(*endpoint) for desc in consensus if desc.fingerprint==exit_relay][0] is False:
                    #    logging.info('Relay {} can\'t exit!'.format(exit_relay))
                    #    all_circuits_okay = False
                    #
                    try:
                        circuit_id = controller['connection'].new_circuit(circuit, await_build=True)
                        logging.info('New circuit (id={}): {}'.format(circuit_id, circuit))
                    except stem.CircuitExtensionFailed:
                        logging.info('Failed circuit: {}'.format(circuit))
                        logging.warning('Circuit creation failed. Retrying...')
                    #
                #
                controller['circuits_remaining'].append(circuit_id)
                controller['circuit_ids'].append(circuit_id)
                controller['circuit_verbose'].append(circuit)
                time.sleep(0.5)#1.5
            #
            controller['connection'].add_event_listener(lambda x, controller=controller: attach_stream(x, controller),
                                                        stem.control.EventType.STREAM)
            controller['connection'].set_conf('__LeaveStreamsUnattached', '1')
        #
        # (disabled) abort if any circuit's exit relay can't reach the endpoint:
        #if not all_circuits_okay:
        #    for c in clients:
        #        if 'process' in c:
        #            c['process'].kill()
        #        #
        #    #
        #    raise Exception('Not all circuits can exit! Stopping...')
        #
        processes = {}
        circuits = {}
        process_counter = 0
        finished_processes = multiprocessing.Queue()
        #
        logging.info('Starting protocols')
        #
        # common start time for all connections (30 seconds from now)
        wait_time = int(time.time()+30)
        #
        for stream_index in range(streams_per_client):
            for (client_index, client) in enumerate(clients):
                client_socket = socket.socket()
                protocols = []
                #
                # random username with the zero bytes dropped
                proxy_username = bytes([z for z in os.urandom(12) if z != 0])
                proxy_endpoint = ('127.0.0.1', client['socks_port'])
                #
                logging.debug('Socket %d connecting to proxy %r...', client_socket.fileno(), proxy_endpoint)
                client_socket.connect(proxy_endpoint)
                logging.debug('Socket %d connected', client_socket.fileno())
                #
                proxy_protocol = basic_protocols.Socks4Protocol(client_socket, endpoint, username=proxy_username)
                protocols.append(proxy_protocol)
                #
                wait_until = wait_time+random.randint(0, args.wait_range)
                #
                if args.measureme:
                    control_port = client['control_port']
                    controller = controllers[client_index]['connection']
                    circuit_id = controllers[client_index]['circuit_ids'][stream_index]
                    measureme_id = stream_index*streams_per_client + client_index + 1 # this is not calculated correctly, leaving as-is for now
                    raise Exception('The above line is broken (measureme_id not set properly)!!!')
                    # NOTE: the rest of this branch is deliberately unreachable
                    # until the measureme_id computation above has been fixed.
                    #print('Data: {}, {}'.format(circuit_id, measureme_id))
                    #measureme_cb = lambda control_port=control_port, circuit_id=circuit_id, measureme_id=measureme_id: connect_and_send_measureme(control_port, circuit_id, measureme_id, 2) <---- need to change this to also send to hop 0
                    #measureme_cb = lambda controller=controller, circuit_id=circuit_id, measureme_id=measureme_id: send_all_measuremes(controller, circuit_id, measureme_id)
                    #measureme_cb()
                    if args.num_clients==0:
                        # using Chutney
                        hops = [2, 1, 0]
                    else:
                        hops = [0]
                    #
                    start_cb = lambda control_port=control_port, circuit_id=circuit_id, measureme_id=measureme_id, \
                                      until=wait_until, hops=hops: \
                                      push_start_measureme_cb(control_port, circuit_id, measureme_id, until, hops)
                    circuit_bytes = (controllers[client_index]['circuit_verbose'][stream_index][0]+':'+str(measureme_id)).encode('utf-8')
                else:
                    # Setting up all the connections can take longer than the
                    # 30 second head start; time.sleep() raises ValueError for
                    # a negative duration, so clamp the delay at zero.
                    start_cb = lambda until=wait_until: time.sleep(max(0, until-time.time()))
                    circuit_bytes = (controllers[client_index]['circuit_verbose'][stream_index][0]).encode('utf-8')
                #
                throughput_protocol = throughput_protocols.ClientProtocol(client_socket, args.num_bytes,
                                                                          #wait_until=wait_time+random.randint(0, args.wait_range),
                                                                          group_id=wait_time,
                                                                          custom_data=circuit_bytes,
                                                                          send_buffer_len=args.buffer_len,
                                                                          use_acceleration=(not args.no_accel),
                                                                          push_start_cb=start_cb)
                protocols.append(throughput_protocol)
                #
                combined_protocol = basic_protocols.ChainedProtocol(protocols)
                processes[process_counter] = start_client_process(combined_protocol, process_counter, finished_processes)
                circuits[process_counter] = controllers[client_index]['circuit_verbose'][stream_index]
                process_counter += 1
                # the child process has been started; close the parent's handle
                client_socket.close()
                #
                time.sleep(0.01)
            #
        #
        logging.info('Starting in {:.2f} seconds'.format(wait_time-time.time()))
        #
        try:
            while len(processes) > 0:
                logging.info('Waiting for processes ({} left)'.format(len(processes)))
                (p_id, error) = finished_processes.get()
                p = processes[p_id]
                p.join()
                processes.pop(p_id)
                if error:
                    logging.info('Circuit with error: '+str(circuits[p_id]))
                #
                circuits.pop(p_id)
            #
        except KeyboardInterrupt:
            print()
            for p_id in processes:
                processes[p_id].terminate()
            #
        #
        logging.info('Processes finished')
        #
    finally:
        # always release the control connections and stop the tor processes we
        # launched, including when circuit or protocol setup fails part-way
        for c in controllers:
            c['connection'].close()
        #
        for c in clients:
            if 'process' in c:
                c['process'].terminate()
- #
- #
- #
|