#!/usr/bin/python3 # import throughput_protocols import basic_protocols import useful import time import os import argparse import logging import socket import random import multiprocessing import stem.control import stem.descriptor.remote import stem.process import base64 import binascii # logging.getLogger('stem').setLevel(logging.WARNING) # def start_client_process(protocol, id_num, finished_queue): p = multiprocessing.Process(target=run_client, args=(protocol, id_num, finished_queue)) p.start() return p # def run_client(protocol, id_num, finished_queue): had_error = False try: logging.info('Starting protocol (id: {})'.format(id_num)) protocol.run() logging.info('Done protocol (id: {})'.format(id_num)) except: had_error = True logging.warning('Protocol error') logging.exception('Protocol id: {} had an error'.format(id_num)) finally: finished_queue.put((id_num, had_error)) # # def parse_range(range_str): return tuple(int(x) for x in range_str.split('-')) # def get_socks_port(control_port): with stem.control.Controller.from_port(port=control_port) as controller: controller.authenticate() # socks_addresses = controller.get_listeners(stem.control.Listener.SOCKS) #logging.info(socks_addresses) assert(len(socks_addresses) == 1) assert(socks_addresses[0][0] == '127.0.0.1') # return socks_addresses[0][1] # # def send_measureme(controller, circuit_id, measureme_id, hop): response = controller.msg('SENDMEASUREME %s ID=%s HOP=%s' % (circuit_id, measureme_id, hop)) stem.response.convert('SINGLELINE', response) # if not response.is_ok(): if response.code in ('512', '552'): if response.message.startswith('Unknown circuit '): raise stem.InvalidArguments(response.code, response.message, [circuit_id]) # raise stem.InvalidRequest(response.code, response.message) else: raise stem.ProtocolError('MEASUREME returned unexpected response code: %s' % response.code) # # # def connect_and_send_measureme(control_port, circuit_id, measureme_id, hop): with stem.control.Controller.from_port(port=control_port) as controller: controller.authenticate() send_measureme(controller, circuit_id, measureme_id, hop) # # def send_all_measuremes(controller, circuit_id, measureme_id): send_measureme(controller, circuit_id, measureme_id, 2) send_measureme(controller, circuit_id, measureme_id, 0) # def push_start_measureme_cb(control_port, circuit_id, measureme_id, wait_until, hops): logging.info('Sending measuremes to control port {}, then sleeping'.format(control_port)) with stem.control.Controller.from_port(port=control_port) as controller: controller.authenticate() for hop in hops: send_measureme(controller, circuit_id, measureme_id, hop) # # time.sleep(wait_until-time.time()) # if __name__ == '__main__': logging.basicConfig(level=logging.DEBUG) # parser = argparse.ArgumentParser(description='Test the network throughput (optionally through a proxy).') parser.add_argument('ip', type=str, help='destination ip address') parser.add_argument('port', type=int, help='destination port') parser.add_argument('num_bytes', type=useful.parse_bytes, help='number of bytes to send per connection (can also end with \'B\', \'KiB\', \'MiB\', or \'GiB\')', metavar='num-bytes') parser.add_argument('num_clients', type=int, help='number of Tor clients to start', metavar='num-clients') parser.add_argument('num_streams_per_client', type=int, help='number of streams per Tor client', metavar='num-streams-per-client') #parser.add_argument('--wait', type=int, # help='wait until the given time before pushing data (time in seconds since epoch)', metavar='time') parser.add_argument('--buffer-len', type=useful.parse_bytes, help='size of the send and receive buffers (can also end with \'B\', \'KiB\', \'MiB\', or \'GiB\')', metavar='bytes') parser.add_argument('--no-accel', action='store_true', help='don\'t use C acceleration (use pure Python)') parser.add_argument('--wait-range', type=int, default=0, help='add a random wait time to each connection so that they don\'t all start at the same time (default is 0)', metavar='time') parser.add_argument('--proxy-control-port-range', type=parse_range, help='range of ports for the control ports') parser.add_argument('--measureme', action='store_true', help='send measureme cells to the exit') args = parser.parse_args() # endpoint = (args.ip, args.port) # streams_per_client = args.num_streams_per_client # if args.num_clients > 0: num_clients = args.num_clients socks_port_start = 9551 control_port_start = 12001 target_relay = 'BCEDF6C193AA687AE471B8A22EBF6BC57C2D285E' # gurgle other_relays = list(set(['BC630CBBB518BE7E9F4E09712AB0269E9DC7D626', '18CFB7BA07F13AEABF50A7148786DA68773B2498', 'B771AA877687F88E6F1CA5354756DF6C8A7B6B24', '204DFD2A2C6A0DC1FA0EACB495218E0B661704FD', 'F6740DEABFD5F62612FA025A5079EA72846B1F67', 'A21D24309FD17F57395CDB0B4A3B813AE73FBC5A', 'B204DE75B37064EF6A4C6BAF955C5724578D0B32', '874D84382C892F3F61CC9E106BF08843DE0B865A', '25990FC54D7268C914170A118EE4EE75025451DA', 'B872BA6804C8C6E140AE1897B44CF32B42FD2397', 'B143D439B72D239A419F8DCE07B8A8EB1B486FA7', 'DA4B488C2826DFBBD04D635DA1E71A2BA5B20747', 'D80EA21626BFAE8044E4037FE765252E157E3586'])) # bad relays = ['3F62F05E859D7F98B086F702A31F7714D566E49A', '8EE0534532EA31AA5172B1892F53B2F25C76EB02', '7DC52AE6667A30536BA2383CD102CFC24F20AD71'] # no longer exist = ['DBAD17D706E2B6D5D917C2077961750513BDF879'] # logging.info('Getting consensus') # try: consensus = stem.descriptor.remote.get_consensus() #relay_fingerprints = [desc.fingerprint for desc in consensus] logging.info([desc.exit_policy.can_exit_to(*endpoint) for desc in consensus if desc.fingerprint==target_relay]) except Exception as e: raise Exception('Unable to retrieve the consensus') from e # logging.info('Starting tor processes') # clients = [] # try: for client_index in range(num_clients): # start a tor client # socks_port = socks_port_start+client_index control_port = control_port_start+client_index # tor_process = stem.process.launch_tor_with_config( config = { 'Log': ['notice file /tmp/tor{}/log'.format(client_index), 'notice stdout'], 'TruncateLogFile': '1', 'MeasuremeLogFile': '/ramdisk/sengler/real/measureme-{}.log'.format(client_index), 'SafeLogging': 'relay', 'SocksPort': str(socks_port), 'ControlPort': str(control_port), 'DataDirectory': '/tmp/tor{}'.format(client_index), } ) clients.append({'socks_port':socks_port, 'control_port':control_port, 'process':tor_process}) logging.info('Started '+str(client_index)) # except: for c in clients: if 'process' in c: c['process'].kill() # # raise # else: proxy_control_ports = list(range(args.proxy_control_port_range[0], args.proxy_control_port_range[1]+1)) socks_ports = [get_socks_port(x) for x in proxy_control_ports] # logging.info('Getting consensus') # try: consensus = stem.descriptor.remote.get_consensus(endpoints=(stem.DirPort('127.0.0.1', 7000),)) # relay_fingerprints = [desc.fingerprint for desc in consensus] exit_fingerprints = [desc.fingerprint for desc in consensus if desc.exit_policy.can_exit_to(*endpoint)] except Exception as e: raise Exception('Unable to retrieve the consensus') from e # print('Num socks ports: {}'.format(len(socks_ports))) print('Num relays: {}'.format(len(relay_fingerprints))) print('Num exits: {}'.format(len(exit_fingerprints))) # assert(len(relay_fingerprints) >= 2) assert(len(exit_fingerprints) == 1) # target_relay = exit_fingerprints[0] other_relays = list(set(relay_fingerprints)-set(exit_fingerprints)) # clients = [] # for client_index in range(len(proxy_control_ports)): socks_port = socks_ports[client_index] control_port = proxy_control_ports[client_index] # clients.append({'socks_port':socks_port, 'control_port':control_port}) # # ######################################3 # controllers = [] # for client_index in range(len(clients)): # make connections to client control ports # connection = stem.control.Controller.from_port(port=clients[client_index]['control_port']) connection.authenticate() # controllers.append({'connection':connection, 'id':client_index}) # all_circuits_okay = True # for controller in controllers: # for each client, override the circuits for new streams # controller['circuits_remaining'] = [] controller['circuit_ids'] = [] controller['circuit_verbose'] = [] # logging.info('Setting up controller id={}'.format(controller['id'])) # for y in range(streams_per_client): circuit_id = None # while circuit_id is None: #circuit = [random.choice(relay_fingerprints), target_relay] first_relay = random.choice(other_relays) exit_relay = target_relay #circuit = [target_relay, exit_relay] circuit = [first_relay, exit_relay] # #if [desc.exit_policy.can_exit_to(*endpoint) for desc in consensus if desc.fingerprint==exit_relay][0] is False: # logging.info('Relay {} can\'t exit!'.format(exit_relay)) # all_circuits_okay = False # try: circuit_id = controller['connection'].new_circuit(circuit, await_build=True) logging.info('New circuit (id={}): {}'.format(circuit_id, circuit)) except stem.CircuitExtensionFailed: logging.info('Failed circuit: {}'.format(circuit)) logging.warning('Circuit creation failed. Retrying...') # # #try: # circuit_id = controller['connection'].new_circuit(circuit, await_build=True) #except stem.CircuitExtensionFailed: # for c in controllers: # c['connection'].close() # # # for c in clients: # c['process'].kill() # # # raise # controller['circuits_remaining'].append(circuit_id) controller['circuit_ids'].append(circuit_id) controller['circuit_verbose'].append(circuit) time.sleep(0.5)#1.5 # def attach_stream(stream, controller): try: if stream.status == 'NEW': # by default, let tor handle new streams circuit_id = 0 # if stream.purpose == 'USER': # this is probably one of our streams (although not guaranteed) circuit_id = controller['circuits_remaining'][0] controller['circuits_remaining'] = controller['circuits_remaining'][1:] # try: controller['connection'].attach_stream(stream.id, circuit_id) #logging.info('Attaching to circuit {}'.format(circuit_id)) except stem.UnsatisfiableRequest: if stream.purpose != 'USER': # could not attach a non-user stream, so probably raised: # stem.UnsatisfiableRequest: Connection is not managed by controller. # therefore we should ignore this exception pass else: raise # # # except: logging.exception('Error while attaching the stream (controller_id={}, circuit_id={}).'.format(controller['id'], circuit_id)) raise # # controller['connection'].add_event_listener(lambda x, controller=controller: attach_stream(x, controller), stem.control.EventType.STREAM) controller['connection'].set_conf('__LeaveStreamsUnattached', '1') # ''' if not all_circuits_okay: for c in clients: if 'process' in c: c['process'].kill() # # raise Exception('Not all circuits can exit! Stopping...') # ''' processes = {} circuits = {} process_counter = 0 finished_processes = multiprocessing.Queue() # logging.info('Starting protocols') # wait_time = int(time.time()+30) # for stream_index in range(streams_per_client): for client_index in range(len(clients)): client_socket = socket.socket() protocols = [] # proxy_username = bytes([z for z in os.urandom(12) if z != 0]) proxy_endpoint = ('127.0.0.1', clients[client_index]['socks_port']) # logging.debug('Socket %d connecting to proxy %r...', client_socket.fileno(), proxy_endpoint) client_socket.connect(proxy_endpoint) logging.debug('Socket %d connected', client_socket.fileno()) # proxy_protocol = basic_protocols.Socks4Protocol(client_socket, endpoint, username=proxy_username) protocols.append(proxy_protocol) # wait_until = wait_time+random.randint(0, args.wait_range) # if args.measureme: control_port = clients[client_index]['control_port'] controller = controllers[client_index]['connection'] circuit_id = controllers[client_index]['circuit_ids'][stream_index] measureme_id = stream_index*streams_per_client + client_index + 1 # this is not calculated correctly, leaving as-is for now raise Exception('The above line is broken (measureme_id not set properly)!!!') #print('Data: {}, {}'.format(circuit_id, measureme_id)) #measureme_cb = lambda control_port=control_port, circuit_id=circuit_id, measureme_id=measureme_id: connect_and_send_measureme(control_port, circuit_id, measureme_id, 2) <---- need to change this to also send to hop 0 #measureme_cb = lambda controller=controller, circuit_id=circuit_id, measureme_id=measureme_id: send_all_measuremes(controller, circuit_id, measureme_id) #measureme_cb() if args.num_clients==0: # using Chutney hops = [2, 1, 0] else: hops = [0] # start_cb = lambda control_port=control_port, circuit_id=circuit_id, measureme_id=measureme_id, \ until=wait_until, hops=hops: \ push_start_measureme_cb(control_port, circuit_id, measureme_id, until, hops) circuit_bytes = (controllers[client_index]['circuit_verbose'][stream_index][0]+':'+str(measureme_id)).encode('utf-8') else: start_cb = lambda until=wait_until: time.sleep(until-time.time()) circuit_bytes = (controllers[client_index]['circuit_verbose'][stream_index][0]).encode('utf-8') # throughput_protocol = throughput_protocols.ClientProtocol(client_socket, args.num_bytes, # wait_until=wait_time+random.randint(0, args.wait_range), group_id=wait_time, custom_data=circuit_bytes, send_buffer_len=args.buffer_len, use_acceleration=(not args.no_accel), push_start_cb=start_cb) protocols.append(throughput_protocol) # combined_protocol = basic_protocols.ChainedProtocol(protocols) processes[process_counter] = start_client_process(combined_protocol, process_counter, finished_processes) circuits[process_counter] = controllers[client_index]['circuit_verbose'][stream_index] process_counter += 1 client_socket.close() # time.sleep(0.01) # # if wait_time is not None: logging.info('Starting in {:.2f} seconds'.format(wait_time-time.time())) # try: while len(processes) > 0: logging.info('Waiting for processes ({} left)'.format(len(processes))) (p_id, error) = finished_processes.get() p = processes[p_id] p.join() processes.pop(p_id) if error: logging.info('Circuit with error: '+str(circuits[p_id])) # circuits.pop(p_id) # except KeyboardInterrupt as e: print() for p_id in processes: processes[p_id].terminate() # # logging.info('Processes finished') # for c in controllers: c['connection'].close() # for c in clients: if 'process' in c: c['process'].terminate() # # #