#!/usr/bin/python3 # import throughput_protocols import basic_protocols import os import multiprocessing import threading import queue import logging import argparse import useful # class ThroughputServer: def __init__(self, bind_endpoint, stop_event=None): self.bind = bind_endpoint self.stop_event = stop_event # self.server_listener = basic_protocols.ServerListener(bind_endpoint, self._accept_callback) # self.accepted_pending = [] self.processes = [] self.process_counter = 0 # self.results_queue = multiprocessing.Queue() self.results = [] self.results_getter = useful.QueueGetter(self.results_queue, self.results.append) # if self.stop_event is not None: self.event_thread = threading.Thread(target=self._wait_for_event, args=(self.stop_event, self._stop)) self.event_thread.start() else: self.event_thread = None # # def _accept_callback(self, socket): conn_id = self.process_counter self.process_counter += 1 logging.info('Server protocol id: {} accepted'.format(conn_id)) # self.accepted_pending.append((socket, conn_id)) # def _start_new_process(self, socket, conn_id): p = multiprocessing.Process(target=self._start_server_conn, args=(socket, conn_id)) self.processes.append(p) p.start() # # close this process' copy of the socket socket.close() # def _start_server_conn(self, socket, conn_id): results_callback = lambda results: self.results_queue.put({'conn_id':conn_id, 'results':results}) protocol = throughput_protocols.ServerProtocol(socket, results_callback=results_callback, use_acceleration=True) try: logging.info('Server protocol id: {} starting'.format(conn_id)) protocol.run() except KeyboardInterrupt: logging.info('Server protocol id: {} stopped (KeyboardInterrupt)'.format(conn_id)) except: logging.exception('Server protocol id: {} had an error'.format(conn_id)) finally: logging.info('Server protocol id: {} done'.format(conn_id)) socket.close() # # def _wait_for_event(self, event, callback): event.wait() callback() # def _stop(self): self.server_listener.stop() # def run(self): try: while True: block = True if len(self.accepted_pending) > 0: block = False # accepted_something = self.server_listener.accept(False) if not accepted_something and len(self.accepted_pending) > 0: # didn't accept anything and we have things to do (socket, conn_id) = self.accepted_pending.pop(0) self._start_new_process(socket, conn_id) # # except OSError as e: if e.errno == 22 and self.stop_event is not None and self.stop_event.is_set(): # we closed the socket on purpose pass else: raise # finally: if self.stop_event is not None: # set the event to stop the thread self.stop_event.set() # if self.event_thread is not None: # make sure the event thread is stopped self.event_thread.join() # for p in self.processes: # wait for all processes to finish p.join() # # finish reading from the results queue self.results_getter.stop() self.results_getter.join() # # # if __name__ == '__main__': logging.basicConfig(level=logging.DEBUG) # parser = argparse.ArgumentParser(description='Test the network throughput (optionally through a proxy).') parser.add_argument('port', type=int, help='listen on port') parser.add_argument('--localhost', action='store_true', help='bind to 127.0.0.1 instead of 0.0.0.0') args = parser.parse_args() # if args.localhost: bind_to = ('127.0.0.1', args.port) else: bind_to = ('0.0.0.0', args.port) # #stop_event = multiprocessing.Event() server = ThroughputServer(bind_to, None) try: server.run() except KeyboardInterrupt: print('') logging.debug('Server stopped (KeyboardInterrupt).') # #t = threading.Thread(target=server.run) #t.start() #try: # t.join() #except KeyboardInterrupt: # print('') # stop_event.set() # #p = multiprocessing.Process(target=server.run) #p.start() #try: # p.join() #except KeyboardInterrupt: # print('') # stop_event.set() # results = server.results # group_ids = list(set([x['results']['custom_data'] for x in results])) groups = [(g, [r['results'] for r in results if r['results']['custom_data'] == g]) for g in group_ids] # for (group_id, group) in groups: avg_data_size = sum([x['data_size'] for x in group])/len(group) avg_transfer_rate = sum([x['transfer_rate'] for x in group])/len(group) time_of_first_byte = min([x['time_of_first_byte'] for x in group]) time_of_last_byte = max([x['time_of_last_byte'] for x in group]) total_transfer_rate = sum([x['data_size'] for x in group])/(time_of_last_byte-time_of_first_byte) # logging.info('Group id: %s', int.from_bytes(group_id, byteorder='big') if len(group_id)!=0 else None) logging.info(' Group size: %d', len(group)) logging.info(' Avg Transferred (MiB): %.4f', avg_data_size/(1024**2)) logging.info(' Avg Transfer rate (MiB/s): %.4f', avg_transfer_rate/(1024**2)) logging.info(' Total Transfer rate (MiB/s): %.4f', total_transfer_rate/(1024**2)) # #