#!/usr/bin/python3
"""Throughput experiment client that pushes data through one or more Tor proxies.

For every Tor control port given on the command line the script builds
two-hop circuits (a random non-exit relay followed by the single exit relay),
opens one SOCKS connection per circuit, attaches each resulting stream to its
circuit, and then runs a throughput client over it in a separate process.
"""

import stem.control
import stem.descriptor.remote
import stem.process
import socket
import logging
import multiprocessing
import queue
import random
import time
import json
import os

import basic_protocols
import throughput_protocols
import useful


def get_socks_port(control_port):
    """
    Return the SOCKS port of the Tor instance listening on ``control_port``.

    The instance must expose exactly one SOCKS listener, bound to 127.0.0.1.
    """
    with stem.control.Controller.from_port(port=control_port) as controller:
        controller.authenticate()

        socks_addresses = controller.get_listeners(stem.control.Listener.SOCKS)
        assert(len(socks_addresses) == 1)
        assert(socks_addresses[0][0] == '127.0.0.1')

        return socks_addresses[0][1]


def wait_then_sleep(event, duration):
    """
    Block until ``event`` is set, then sleep for ``duration`` seconds.
    """
    event.wait()
    time.sleep(duration)


def send_measureme(stem_controller, circuit_id, measureme_id, hop):
    """
    Issue a SENDMEASUREME control command for one hop of a circuit.

    Raises stem.InvalidArguments when the circuit is unknown,
    stem.InvalidRequest for other 512/552 replies, and stem.ProtocolError for
    any other non-OK response code.
    """
    response = stem_controller.msg('SENDMEASUREME %s ID=%s HOP=%s' % (circuit_id, measureme_id, hop))
    stem.response.convert('SINGLELINE', response)

    if response.is_ok():
        return

    if response.code in ('512', '552'):
        if response.message.startswith('Unknown circuit '):
            raise stem.InvalidArguments(response.code, response.message, [circuit_id])

        raise stem.InvalidRequest(response.code, response.message)

    raise stem.ProtocolError('MEASUREME returned unexpected response code: %s' % response.code)


def send_measureme_cells(control_address, circuit_id, measureme_id, hops):
    """
    Open a fresh control connection to ``control_address`` (an
    ``(address, port)`` pair) and send one measureme command per entry in
    ``hops``, in the order given.
    """
    logging.debug('Sending measuremes to control address {}, then sleeping'.format(control_address))
    with stem.control.Controller.from_port(address=control_address[0], port=control_address[1]) as controller:
        controller.authenticate()
        for hop in hops:
            send_measureme(controller, circuit_id, measureme_id, hop)


def send_measureme_cells_and_wait(control_port, circuit_id, measureme_id, hops, wait_event, wait_offset):
    """
    Send the measureme cells, then wait for ``wait_event`` and sleep for
    ``wait_offset`` seconds.

    Despite its name, ``control_port`` is forwarded unchanged as the
    ``control_address`` pair of send_measureme_cells().
    """
    send_measureme_cells(control_port, circuit_id, measureme_id, hops)
    wait_then_sleep(wait_event, wait_offset)


def get_fingerprints(consensus):
    """
    Get the fingerprints of all relays.
    """
    return [desc.fingerprint for desc in consensus]


def get_exit_fingerprints(consensus, endpoint):
    """
    Get the fingerprints of relays that can exit to the endpoint.
    """
    return [desc.fingerprint for desc in consensus if desc.exit_policy.can_exit_to(*endpoint)]


class ExperimentController:
    """
    Owns one stem control connection and attaches new streams to the
    circuits built through it.
    """

    def __init__(self, control_address):
        # (address, port) pair of the Tor control port
        self.control_address = control_address
        self.connection = None
        # circuit id -> relay path used to build that circuit
        self.circuits = {}
        # circuit ids not yet handed out by assign_stream(), oldest first
        self.unassigned_circuit_ids = []
        # local (address, port) of a client socket -> circuit id
        self.assigned_streams = {}

    def connect(self):
        """
        Connect and authenticate, register the stream listener, and tell Tor
        to leave new streams unattached so that _attach_stream() decides.
        """
        self.connection = stem.control.Controller.from_port(address=self.control_address[0],
                                                            port=self.control_address[1])
        self.connection.authenticate()

        self.connection.add_event_listener(self._attach_stream, stem.control.EventType.STREAM)
        self.connection.set_conf('__LeaveStreamsUnattached', '1')

    def disconnect(self):
        """
        Close the control connection; a no-op if connect() was never called.
        """
        if self.connection is not None:
            self.connection.close()

    def assign_stream(self, from_address):
        """
        Reserve the oldest unassigned circuit for the stream that will
        originate from ``from_address`` and return that circuit's id.

        Raises IndexError when no unassigned circuit is left.
        """
        circuit_id = self.unassigned_circuit_ids.pop(0)
        self.assigned_streams[from_address] = circuit_id
        return circuit_id

    def _attach_stream(self, stream):
        """
        STREAM event handler: attach each NEW stream to a circuit.

        Streams with purpose USER go to the circuit reserved for their source
        address; every other stream is attached with circuit id 0, which
        leaves the choice to Tor.
        """
        # bound up front so the error log below can always format it
        circuit_id = None

        try:
            if stream.status == 'NEW':
                # by default, let tor handle new streams
                circuit_id = 0

                if stream.purpose == 'USER':
                    # this is probably one of our streams (although not guaranteed)
                    circuit_id = self.assigned_streams[(stream.source_address, stream.source_port)]

                try:
                    self.connection.attach_stream(stream.id, circuit_id)
                except stem.UnsatisfiableRequest:
                    # could not attach a non-user stream, so probably raised:
                    #   stem.UnsatisfiableRequest: Connection is not managed by controller.
                    # therefore we should ignore this exception
                    if stream.purpose == 'USER':
                        raise
        except Exception:
            # the class stores control_address (there is no control_port attribute)
            logging.exception('Error while attaching the stream (control_address={}, circuit_id={}).'.format(self.control_address, circuit_id))
            raise

    def build_circuit(self, circuit_generator):
        """
        Build one circuit, retrying with a freshly generated path until the
        extension succeeds, and record it as unassigned.

        ``circuit_generator`` is called with no arguments and must return the
        relay path to pass to stem's new_circuit().
        """
        circuit_id = None

        while circuit_id is None:
            try:
                circuit = circuit_generator()
                circuit_id = self.connection.new_circuit(circuit, await_build=True)
                logging.debug('New circuit (id={}): {}'.format(circuit_id, circuit))
            except stem.CircuitExtensionFailed:
                logging.debug('Failed circuit: {}'.format(circuit))
                logging.warning('Circuit creation failed. Retrying...')

        self.unassigned_circuit_ids.append(circuit_id)
        self.circuits[circuit_id] = circuit


class ExperimentProtocol(basic_protocols.ChainedProtocol):
    """
    A SOCKS4 proxy handshake chained with a throughput client, both running
    over the same socket.
    """

    def __init__(self, socket, endpoint, num_bytes, custom_data=None, send_buffer_len=None, push_start_cb=None):
        # random SOCKS username with the zero bytes stripped
        # NOTE(review): presumably zero bytes are dropped because the SOCKS4
        # user id is NUL-terminated -- confirm against Socks4Protocol
        proxy_username = bytes([z for z in os.urandom(12) if z != 0])
        proxy_protocol = basic_protocols.Socks4Protocol(socket, endpoint, username=proxy_username)

        throughput_protocol = throughput_protocols.ClientProtocol(socket, num_bytes,
                                                                  custom_data=custom_data,
                                                                  send_buffer_len=send_buffer_len,
                                                                  use_acceleration=True,
                                                                  push_start_cb=push_start_cb)

        super().__init__([proxy_protocol, throughput_protocol])


class ExperimentProtocolManager():
    """
    Runs each protocol in its own process and collects the completions.
    """

    def __init__(self):
        self.stopped = False
        self.process_counter = 0
        self.used_ids = []
        # protocol id -> multiprocessing.Process that has not been joined yet
        self.running_processes = {}
        # child processes report (protocol_id, had_error) on the global queue;
        # the QueueGetter is given the local queue's put() as its callback
        self.global_finished_process_queue = multiprocessing.Queue()
        self.local_finished_process_queue = queue.Queue()
        self.queue_getter = useful.QueueGetter(self.global_finished_process_queue,
                                               self.local_finished_process_queue.put)

    def _run_client(self, protocol, protocol_id):
        """
        Child-process entry point: run the protocol and always report
        ``(protocol_id, had_error)`` on the global queue.
        """
        had_error = False
        try:
            logging.debug('Starting protocol (id: {})'.format(protocol_id))
            protocol.run()
            logging.debug('Done protocol (id: {})'.format(protocol_id))
        except BaseException:
            # deliberately broad and not re-raised: any failure in the child
            # is logged and reported as had_error=True
            had_error = True
            logging.warning('Protocol error')
            logging.exception('Protocol id: {} had an error'.format(protocol_id))
        finally:
            self.global_finished_process_queue.put((protocol_id, had_error))

    def start_experiment_protocol(self, protocol, protocol_id=None):
        """
        Start ``protocol`` in a new process.

        ``protocol_id`` defaults to the number of protocols started so far and
        must not have been used before.
        """
        if protocol_id is None:
            protocol_id = self.process_counter

        assert not self.stopped
        assert protocol_id not in self.used_ids, 'Protocol ID already used'

        p = multiprocessing.Process(target=self._run_client, args=(protocol, protocol_id))
        self.running_processes[protocol_id] = p
        self.used_ids.append(protocol_id)

        p.start()
        self.process_counter += 1

    def wait(self, finished_protocol_cb=None):
        """
        Block until every running process has reported completion and has
        been joined.

        ``finished_protocol_cb``, when given, is called with
        ``(protocol_id, had_error)`` after each process is joined.
        """
        while len(self.running_processes) > 0:
            logging.debug('Waiting for processes ({} left)'.format(len(self.running_processes)))

            (protocol_id, had_error) = self.local_finished_process_queue.get()
            p = self.running_processes[protocol_id]
            p.join()
            self.running_processes.pop(protocol_id)

            # the callback is optional; stop() calls wait() without one
            if finished_protocol_cb is not None:
                finished_protocol_cb(protocol_id, had_error)

    def stop(self):
        """
        Wait for all outstanding processes, then stop the queue getter.
        """
        self.wait()
        self.queue_getter.stop()
        self.queue_getter.join()
        self.stopped = True


def build_client_protocol(endpoint, socks_address, control_address, controller, wait_duration=0,
                          measureme_id=None, num_bytes=None, buffer_len=None, start_event=None):
    """
    Connect a socket to the SOCKS proxy, reserve a circuit for it, and return
    an ExperimentProtocol that is ready to run.

    endpoint:        destination passed to the SOCKS4 protocol
    socks_address:   (address, port) of the proxy's SOCKS listener
    control_address: (address, port) of the proxy's control port; only used
                     to send measureme cells
    controller:      ExperimentController that owns the circuits
    wait_duration:   seconds to sleep once ``start_event`` has been set
    measureme_id:    when not None, measureme cells carrying this id are sent
                     for every hop before waiting on ``start_event``
    num_bytes:       number of bytes for the throughput client to send
    buffer_len:      send buffer length for the throughput client
    start_event:     event the protocol waits on before pushing data; when
                     omitted, a module-level ``start_event`` is used

    Raises ValueError when no start event is available.
    """
    if start_event is None:
        # backward compatibility: this function used to read the event from
        # a module-level global set by the script body
        start_event = globals().get('start_event')
        if start_event is None:
            raise ValueError('A start_event is required to build a client protocol')

    client_socket = socket.socket()

    logging.debug('Socket %d connecting to proxy %r...', client_socket.fileno(), socks_address)
    client_socket.connect(socks_address)
    logging.debug('Socket %d connected', client_socket.fileno())

    # the controller matches the new stream to its circuit by the socket's
    # local address, so the circuit is reserved right after connecting
    circuit_id = controller.assign_stream(client_socket.getsockname())
    circuit = controller.circuits[circuit_id]
    custom_data = {'circuit': (circuit_id, circuit)}

    if measureme_id is not None:
        custom_data['measureme_id'] = measureme_id

        # send the measureme cells to the last relay first
        hops = list(range(len(circuit)+1))[::-1]
        start_cb = lambda: send_measureme_cells_and_wait(control_address, circuit_id, measureme_id,
                                                         hops, start_event, wait_duration)
    else:
        start_cb = lambda: wait_then_sleep(start_event, wait_duration)

    encoded_custom_data = json.dumps(custom_data).encode('utf-8')
    return ExperimentProtocol(client_socket, endpoint, num_bytes,
                              custom_data=encoded_custom_data,
                              send_buffer_len=buffer_len,
                              push_start_cb=start_cb)


if __name__ == '__main__':
    import argparse

    logging.basicConfig(level=logging.DEBUG)
    logging.getLogger('stem').setLevel(logging.WARNING)

    parser = argparse.ArgumentParser(description='Test the network throughput (optionally through a proxy).')
    parser.add_argument('ip', type=str, help='destination ip address')
    parser.add_argument('port', type=int, help='destination port')
    parser.add_argument('num_bytes', type=useful.parse_bytes,
                        help='number of bytes to send per connection (can also end with \'B\', \'KiB\', \'MiB\', or \'GiB\')', metavar='num-bytes')
    parser.add_argument('num_streams_per_client', type=int, help='number of streams per Tor client', metavar='num-streams-per-client')
    parser.add_argument('--buffer-len', type=useful.parse_bytes,
                        help='size of the send and receive buffers (can also end with \'B\', \'KiB\', \'MiB\', or \'GiB\')', metavar='bytes')
    parser.add_argument('--wait-range', type=int, default=0,
                        help='add a random wait time to each connection so that they don\'t all start at the same time (default is 0)', metavar='time')
    # required: the script iterates over this list unconditionally
    parser.add_argument('--proxy-control-ports', type=useful.parse_range_list, required=True,
                        help='range of ports for the control ports', metavar='control-ports')
    parser.add_argument('--measureme', action='store_true', help='send measureme cells to the exit')
    args = parser.parse_args()

    endpoint = (args.ip, args.port)

    logging.debug('Getting consensus')
    try:
        consensus = stem.descriptor.remote.get_consensus(endpoints=(stem.DirPort('127.0.0.1', 7000),))
    except Exception as e:
        raise Exception('Unable to retrieve the consensus') from e

    fingerprints = get_fingerprints(consensus)
    exit_fingerprints = get_exit_fingerprints(consensus, endpoint)
    non_exit_fingerprints = list(set(fingerprints)-set(exit_fingerprints))

    assert len(exit_fingerprints) == 1, 'Need exactly one exit relay'
    assert len(non_exit_fingerprints) >= 1, 'Need at least one non-exit relay'

    # two-hop path: a random non-exit relay followed by the only exit relay
    circuit_generator = lambda: [random.choice(non_exit_fingerprints), exit_fingerprints[0]]

    proxy_addresses = []
    for control_port in args.proxy_control_ports:
        proxy = {}
        proxy['control'] = ('127.0.0.1', control_port)
        proxy['socks'] = ('127.0.0.1', get_socks_port(control_port))
        proxy_addresses.append(proxy)

    controllers = []
    protocol_manager = ExperimentProtocolManager()

    try:
        for proxy_address in proxy_addresses:
            controller = ExperimentController(proxy_address['control'])
            controller.connect()
            # the controller has to attach new streams to circuits, so the
            # connection has to stay open until we're done creating streams
            #
            # registered straight away so that the finally clause below also
            # disconnects it if building a circuit fails
            controllers.append(controller)

            for _ in range(args.num_streams_per_client):
                # make a circuit for each stream
                controller.build_circuit(circuit_generator)
                time.sleep(0.5)

        start_event = multiprocessing.Event()

        for stream_index in range(args.num_streams_per_client):
            for controller_index, (proxy_address, controller) in enumerate(zip(proxy_addresses, controllers)):
                if args.measureme:
                    # stride by the number of controllers so that every
                    # (stream, controller) pair gets a distinct id
                    measureme_id = stream_index*len(controllers) + controller_index + 1
                else:
                    measureme_id = None

                wait_duration = random.randint(0, args.wait_range)
                protocol = build_client_protocol(endpoint, proxy_address['socks'], proxy_address['control'],
                                                 controller, wait_duration=wait_duration,
                                                 measureme_id=measureme_id, num_bytes=args.num_bytes,
                                                 buffer_len=args.buffer_len, start_event=start_event)
                protocol_manager.start_experiment_protocol(protocol, protocol_id=None)

        time.sleep(2)
        start_event.set()

        protocol_manager.wait(finished_protocol_cb=lambda protocol_id,had_error: logging.info('Finished {} (had_error={})'.format(protocol_id,had_error)))
    finally:
        for controller in controllers:
            controller.disconnect()

        protocol_manager.stop()