#!/usr/bin/python3 # import stem.control import stem.descriptor.remote import stem.process import socket import logging import multiprocessing import queue import random import time import json import os import datetime # import basic_protocols import throughput_protocols import useful # def get_socks_port(control_port): with stem.control.Controller.from_port(port=control_port) as controller: controller.authenticate() # socks_addresses = controller.get_listeners(stem.control.Listener.SOCKS) assert(len(socks_addresses) == 1) assert(socks_addresses[0][0] == '127.0.0.1') # return socks_addresses[0][1] # # def wait_then_sleep(event, duration): event.wait() time.sleep(duration) # def send_measureme(stem_controller, circuit_id, measureme_id, hop): response = stem_controller.msg('SENDMEASUREME %s ID=%s HOP=%s' % (circuit_id, measureme_id, hop)) stem.response.convert('SINGLELINE', response) # if not response.is_ok(): if response.code in ('512', '552'): if response.message.startswith('Unknown circuit '): raise stem.InvalidArguments(response.code, response.message, [circuit_id]) # raise stem.InvalidRequest(response.code, response.message) else: raise stem.ProtocolError('MEASUREME returned unexpected response code: %s' % response.code) # # # def send_measureme_cells(control_address, circuit_id, measureme_id, hops): logging.debug('Sending measuremes to control address {}, then sleeping'.format(control_address)) with stem.control.Controller.from_port(address=control_address[0], port=control_address[1]) as controller: controller.authenticate() for hop in hops: send_measureme(controller, circuit_id, measureme_id, hop) # # # def send_measureme_cells_and_wait(control_port, circuit_id, measureme_id, hops, wait_event, wait_offset): send_measureme_cells(control_port, circuit_id, measureme_id, hops) wait_then_sleep(wait_event, wait_offset) # def get_fingerprints(consensus): """ Get the fingerprints of all relays. """ # return [desc.fingerprint for desc in consensus] # def get_exit_fingerprints(consensus, endpoint): """ Get the fingerprints of relays that can exit to the endpoint. """ # return [desc.fingerprint for desc in consensus if desc.exit_policy.can_exit_to(*endpoint)] # class ExperimentController: def __init__(self, control_address): self.control_address = control_address self.connection = None self.circuits = {} self.unassigned_circuit_ids = [] self.assigned_streams = {} # def connect(self): self.connection = stem.control.Controller.from_port(address=self.control_address[0], port=self.control_address[1]) self.connection.authenticate() # self.connection.add_event_listener(self.stream_event, stem.control.EventType.STREAM) self.connection.add_event_listener(self.circuit_event, stem.control.EventType.CIRC) self.connection.set_conf('__LeaveStreamsUnattached', '1') #self.connection.set_conf('__DisablePredictedCircuits', '1') # we still need to generate circuits for things like directory fetches # def disconnect(self): #if len(self.unused_circuit_ids) > 0: # logging.warning('Closed stem controller before all circuits were used') # self.connection.close() # def assign_stream(self, from_address): """ Should run this function before starting the protocol, and therefore before telling the SOCKS proxy where we're connecting to and before the stream is created. """ circuit_id = self.unassigned_circuit_ids.pop(0) self.assigned_streams[from_address] = circuit_id return circuit_id # def stream_event(self, stream): try: if stream.status == 'NEW': # by default, let tor handle new streams circuit_id = 0 # if stream.purpose == 'USER': # NOTE: we used to try to attach all streams (including non-user streams, # which we attached to circuit 0, but Stem was found to hang sometimes # when attaching DIR_FETCH streams, so now we only attach user streams # and let tor take care of other streams # # this is probably one of our streams (although not guaranteed) circuit_id = self.assigned_streams[(stream.source_address, stream.source_port)] # try: logging.debug('Attaching to circuit {}'.format(circuit_id)) self.connection.attach_stream(stream.id, circuit_id) logging.debug('Attached to circuit {}'.format(circuit_id)) except stem.InvalidRequest: if stream.purpose != 'USER': # could not attach a non-user stream, ignoring pass else: raise # except stem.UnsatisfiableRequest: if stream.purpose != 'USER': # could not attach a non-user stream, so probably raised: # stem.UnsatisfiableRequest: Connection is not managed by controller. # therefore we should ignore this exception pass else: raise # except stem.SocketClosed: logging.debug('Stream {} ({}, controller={}) {}: socket closed while attaching'.format(stream.id, stream.purpose, self.control_address, stream.status)) raise # # # if stream.status == 'DETACHED' or stream.status == 'FAILED': logging.debug('Stream {} ({}, controller={}) {}: {}; {}'.format(stream.id, stream.purpose, self.control_address, stream.status, stream.reason, stream.remote_reason)) # except: logging.exception('Error while attaching the stream.') raise # # def circuit_event(self, circuit): if circuit.purpose == 'CONTROLLER' and (circuit.status == 'FAILED' or circuit.status == 'CLOSED'): logging.debug('Circuit {} ({}, controller={}) {}: {}; {}'.format(circuit.id, circuit.purpose, self.control_address, circuit.status, circuit.reason, circuit.remote_reason)) # # def build_circuit(self, circuit_generator, gen_id): circuit_id = None tries_remaining = 5 # while circuit_id is None and tries_remaining > 0: try: circuit = circuit_generator(gen_id) tries_remaining -= 1 circuit_id = self.connection.new_circuit(circuit, await_build=True, purpose='controller', timeout=10) logging.debug('New circuit (circ_id={}, controller={}): {}'.format(circuit_id, self.control_address, circuit)) except stem.CircuitExtensionFailed as e: wait_seconds = 1 logging.debug('Failed circuit: {}'.format(circuit)) if tries_remaining == 0: logging.warning('Tried too many times') raise # logging.warning('Circuit creation failed (CircuitExtensionFailed: {}). Retrying in {} second{}...'.format(str(e), wait_seconds, 's' if wait_seconds != 1 else '')) time.sleep(wait_seconds) except stem.InvalidRequest as e: wait_seconds = 15 logging.debug('Failed circuit: {}'.format(circuit)) if tries_remaining == 0: logging.warning('Tried too many times') raise # logging.warning('Circuit creation failed (InvalidRequest: {}). Retrying in {} second{}...'.format(str(e), wait_seconds, 's' if wait_seconds != 1 else '')) time.sleep(wait_seconds) except stem.Timeout as e: wait_seconds = 5 logging.debug('Failed circuit: {}'.format(circuit)) if tries_remaining == 0: logging.warning('Tried too many times') raise # logging.warning('Circuit creation timed out (Timeout: {}). Retrying in {} second{}...'.format(str(e), wait_seconds, 's' if wait_seconds != 1 else '')) time.sleep(wait_seconds) # # self.unassigned_circuit_ids.append(circuit_id) self.circuits[circuit_id] = circuit # # class ExperimentProtocol(basic_protocols.ChainedProtocol): def __init__(self, socket, endpoint, num_bytes, circuit_info, custom_data=None, send_buffer_len=None, push_start_cb=None): proxy_username = bytes([z for z in os.urandom(12) if z != 0]) proxy_protocol = basic_protocols.Socks4Protocol(socket, endpoint, username=proxy_username) # self.proxy_info = socket.getpeername() self.circuit_info = circuit_info # throughput_protocol = throughput_protocols.ClientProtocol(socket, num_bytes, custom_data=custom_data, send_buffer_len=send_buffer_len, use_acceleration=True, push_start_cb=push_start_cb) # super().__init__([proxy_protocol, throughput_protocol]) # def get_desc(self): super_desc = super().get_desc() if super_desc is not None: return '{} -> {} - {}'.format(self.proxy_info, self.circuit_info, super_desc) else: return '{} -> {}'.format(self.proxy_info, self.circuit_info) # # class ExperimentProtocolManager(): def __init__(self): self.stopped = False self.process_counter = 0 self.used_ids = [] self.running_processes = {} self.checked_in = multiprocessing.Manager().dict() self.global_finished_process_queue = multiprocessing.Queue() self.local_finished_process_queue = queue.Queue() self.queue_getter = useful.QueueGetter(self.global_finished_process_queue, self.local_finished_process_queue.put) # def _run_client(self, protocol, protocol_id): had_error = False try: logging.debug('Starting client protocol (id: {}, desc: {})'.format(protocol_id, protocol.get_desc())) self.checked_in[protocol_id] = True protocol.run() logging.debug('Done client protocol (id: {})'.format(protocol_id)) except KeyboardInterrupt: had_error = True logging.info('Client protocol id: {} stopped (KeyboardInterrupt)'.format(protocol_id)) except: had_error = True logging.warning('Client protocol error') logging.exception('Client protocol id: {} had an error ({})'.format(protocol_id, datetime.datetime.now().time())) finally: self.global_finished_process_queue.put((protocol_id, had_error)) if had_error: logging.warning('Client protocol with error successfully added self to global queue') # # # def start_experiment_protocol(self, protocol, protocol_id=None): if protocol_id is None: protocol_id = self.process_counter # assert not self.stopped assert protocol_id not in self.used_ids, 'Protocol ID already used' # #logging.debug('Launching client protocol (id: {})'.format(protocol_id)) p = multiprocessing.Process(target=self._run_client, args=(protocol, protocol_id)) self.running_processes[protocol_id] = p self.checked_in[protocol_id] = False # because of Python multiprocessing bugs, the process may deadlock when it starts self.used_ids.append(protocol_id) # p.start() self.process_counter += 1 # #protocol.socket.close() # def _get_not_checked_in(self): temp = self.checked_in.copy() return [x for x in temp if not temp[x]] # #def _count_checked_in(self): # temp = self.checked_in.copy() # only_checked_in = [True for x in temp if temp is True] # return (len(only_checked_in), len(self.checked_in)) # def _get_dead_processes(self): dead_processes = [] for (protocol_id, p) in self.running_processes.items(): if not p.is_alive(): dead_processes.append((protocol_id, p)) # # return dead_processes # def _cleanup_process(self, p, protocol_id, had_error, finished_protocol_cb): p.join() self.running_processes.pop(protocol_id) if finished_protocol_cb is not None: finished_protocol_cb(protocol_id, had_error) # # def _wait(self, timeout=None, finished_protocol_cb=None): return_on_timeout = True if timeout is not None else False timeout = timeout if timeout is not None else 10 last_waiting_message = None # while len(self.running_processes) > 0: dead_processes = self._get_dead_processes() # while len(self.running_processes) > 0: #checked_in_count = self._count_checked_in() #not_checked_in = checked_in_count[1]-checked_in_count[0] not_checked_in = self._get_not_checked_in() # if last_waiting_message is None or last_waiting_message != len(self.running_processes): logging.debug('Waiting for processes ({} left, {} not checked in)'.format(len(self.running_processes), len(not_checked_in))) last_waiting_message = len(self.running_processes) # if len(self.running_processes) <= len(not_checked_in): running_not_checked_in = [protocol_id for protocol_id in self.running_processes if protocol_id in not_checked_in] if len(self.running_processes) == len(running_not_checked_in): logging.debug('The remaining processes have not checked in, so stopping the wait') return # # try: (protocol_id, had_error) = self.local_finished_process_queue.get(timeout=timeout) p = self.running_processes[protocol_id] self._cleanup_process(p, protocol_id, had_error, finished_protocol_cb) if (protocol_id, p) in dead_processes: dead_processes.remove((protocol_id, p)) # logging.debug('Completed protocol (id: {}, checked_in={})'.format(protocol_id, self.checked_in[protocol_id])) except queue.Empty: if return_on_timeout: return else: break # #if kill_timeout is not None: # logging.warning('Timed out waiting for processes to finish, will terminate remaining processes') # kill_remaining = True # # # for (protocol_id, p) in dead_processes: # these processes were dead but didn't add themselves to the finished queue logging.debug('Found a dead process (id: {})'.format(protocol_id)) self._cleanup_process(p, protocol_id, True, finished_protocol_cb) # # # def wait(self, finished_protocol_cb=None, kill_timeout=None): self._wait(kill_timeout, finished_protocol_cb) # if len(self.running_processes) > 0: logging.warning('Timed out ({} seconds) waiting for processes to finish, will terminate remaining processes'.format(kill_timeout)) # while len(self.running_processes) > 0: (protocol_id, p) = next(iter(self.running_processes.items())) # just get any process and kill it was_alive = p.is_alive() p.terminate() logging.debug('Terminated protocol (id: {}, was_dead={}, checked_in={})'.format(protocol_id, (not was_alive), self.checked_in[protocol_id])) # self._cleanup_process(p, protocol_id, True, finished_protocol_cb) # # def stop(self): self.wait(kill_timeout=1.5) self.queue_getter.stop() self.queue_getter.join(timeout=10) self.stopped = True # # def build_client_protocol(endpoint, socks_address, control_address, controller, start_event, send_measureme, wait_duration=0, measureme_id=None, num_bytes=None, buffer_len=None): client_socket = socket.socket() # logging.debug('Socket %d connecting to proxy %r...', client_socket.fileno(), socks_address) client_socket.connect(socks_address) logging.debug('Socket %d connected', client_socket.fileno()) # custom_data = {} # circuit_id = controller.assign_stream(client_socket.getsockname()) custom_data['circuit'] = (circuit_id, controller.circuits[circuit_id]) # if measureme_id is not None: custom_data['measureme_id'] = measureme_id # if send_measureme: assert measureme_id != None hops = list(range(len(controller.circuits[circuit_id])+1))[::-1] # send the measureme cells to the last relay first start_cb = lambda control_address=control_address, circuit_id=circuit_id, measureme_id=measureme_id, \ hops=hops, event=start_event, wait_duration=wait_duration: \ send_measureme_cells_and_wait(control_address, circuit_id, measureme_id, hops, event, wait_duration) else: start_cb = lambda event=start_event, duration=wait_duration: wait_then_sleep(event, duration) # custom_data = json.dumps(custom_data).encode('utf-8') protocol = ExperimentProtocol(client_socket, endpoint, num_bytes, '{}: {}'.format(circuit_id, controller.circuits[circuit_id]), custom_data=custom_data, send_buffer_len=buffer_len, push_start_cb=start_cb) return protocol # if __name__ == '__main__': import argparse # logging.basicConfig(level=logging.DEBUG) logging.getLogger('stem').setLevel(logging.WARNING) # parser = argparse.ArgumentParser(description='Test the network throughput (optionally through a proxy).') parser.add_argument('ip', type=str, help='destination ip address') parser.add_argument('port', type=int, help='destination port') parser.add_argument('num_bytes', type=useful.parse_bytes, help='number of bytes to send per connection (can also end with \'B\', \'KiB\', \'MiB\', or \'GiB\')', metavar='num-bytes') parser.add_argument('num_streams_per_client', type=int, help='number of streams per Tor client', metavar='num-streams-per-client') parser.add_argument('--buffer-len', type=useful.parse_bytes, help='size of the send and receive buffers (can also end with \'B\', \'KiB\', \'MiB\', or \'GiB\')', metavar='bytes') parser.add_argument('--wait-range', type=int, default=0, help='add a random wait time to each connection so that they don\'t all start at the same time (default is 0)', metavar='time') parser.add_argument('--proxy-control-ports', type=useful.parse_range_list, help='range of ports for the control ports', metavar='control-ports') parser.add_argument('--measureme', action='store_true', help='send measureme cells to the exit') args = parser.parse_args() # endpoint = (args.ip, args.port) # logging.debug('Getting consensus') try: consensus = stem.descriptor.remote.get_consensus(endpoints=(stem.DirPort('127.0.0.1', 7000),)) except Exception as e: raise Exception('Unable to retrieve the consensus') from e # fingerprints = get_fingerprints(consensus) exit_fingerprints = get_exit_fingerprints(consensus, endpoint) non_exit_fingerprints = list(set(fingerprints)-set(exit_fingerprints)) # assert len(exit_fingerprints) == 1, 'Need exactly one exit relay' assert len(non_exit_fingerprints) >= 1, 'Need at least one non-exit relay' # circuit_generator = lambda gen_id=None: [random.choice(non_exit_fingerprints), exit_fingerprints[0]] # proxy_addresses = [] for control_port in args.proxy_control_ports: proxy = {} proxy['control'] = ('127.0.0.1', control_port) proxy['socks'] = ('127.0.0.1', get_socks_port(control_port)) proxy_addresses.append(proxy) # controllers = [] protocol_manager = ExperimentProtocolManager() # try: for proxy_address in proxy_addresses: controller = ExperimentController(proxy_address['control']) controller.connect() # the controller has to attach new streams to circuits, so the # connection has to stay open until we're done creating streams # for _ in range(args.num_streams_per_client): # make a circuit for each stream controller.build_circuit(circuit_generator) time.sleep(0.5) # controllers.append(controller) # start_event = multiprocessing.Event() # for stream_index in range(args.num_streams_per_client): for (controller_index, proxy_address, controller) in zip(range(len(controllers)), proxy_addresses, controllers): if args.measureme: measureme_id = stream_index*args.num_streams_per_client + controller_index + 1 else: measureme_id = None # wait_duration = random.randint(0, args.wait_range) protocol = build_client_protocol(endpoint, proxy_address['socks'], proxy_address['control'], controller, start_event, args.measureme, wait_duration=wait_duration, measureme_id=measureme_id, num_bytes=args.num_bytes, buffer_len=args.buffer_len) protocol_manager.start_experiment_protocol(protocol, protocol_id=None) # # time.sleep(2) start_event.set() # protocol_manager.wait(finished_protocol_cb=lambda protocol_id,had_error: logging.info('Finished {} (had_error={})'.format(protocol_id,had_error))) finally: for controller in controllers: controller.disconnect() # protocol_manager.stop() # #