#!/usr/bin/python3 # import throughput_protocols import basic_protocols import os import multiprocessing import queue import logging import argparse # if __name__ == '__main__': logging.basicConfig(level=logging.DEBUG) # parser = argparse.ArgumentParser(description='Test the network throughput (optionally through a proxy).') parser.add_argument('port', type=int, help='listen on port') parser.add_argument('--no-accel', action='store_true', help='don\'t use C acceleration (use pure Python)') args = parser.parse_args() # endpoint = ('127.0.0.1', args.port) # processes = [] processes_map = {} joinable_connections = multiprocessing.Queue() conn_counter = [0] group_queue = multiprocessing.Queue() bw_queue = multiprocessing.Queue() # def group_id_callback(conn_id, group_id): # put them in a queue to display later #logging.debug('For conn %d Received group id: %d', conn_id, group_id) group_queue.put({'conn_id':conn_id, 'group_id':group_id}) # def bw_callback(conn_id, data_size, time_first_byte, time_last_byte, transfer_rate): # put them in a queue to display later bw_queue.put({'conn_id':conn_id, 'data_size':data_size, 'time_of_first_byte':time_first_byte, 'time_of_last_byte':time_last_byte, 'transfer_rate':transfer_rate}) # def start_server_conn(socket, conn_id): server = throughput_protocols.ServerProtocol(socket, conn_id, group_id_callback=group_id_callback, bandwidth_callback=bw_callback, use_acceleration=(not args.no_accel)) try: server.run() except KeyboardInterrupt: socket.close() finally: joinable_connections.put(conn_id) # # def accept_callback(socket): conn_id = conn_counter[0] conn_counter[0] += 1 #logging.debug('Adding connection %d', conn_id) p = multiprocessing.Process(target=start_server_conn, args=(socket, conn_id)) processes.append(p) processes_map[conn_id] = p p.start() socket.close() # close this process' copy of the socket # l = basic_protocols.ServerListener(endpoint, accept_callback) # try: while True: l.accept() try: while True: conn_id = joinable_connections.get(False) p = processes_map[conn_id] p.join() # except queue.Empty: pass # # except KeyboardInterrupt: print() # bw_values = {} group_values = {} # try: while True: bw_val = bw_queue.get(False) bw_values[bw_val['conn_id']] = bw_val # except queue.Empty: pass # try: while True: group_val = group_queue.get(False) group_values[group_val['conn_id']] = group_val # except queue.Empty: pass # group_set = set([x['group_id'] for x in group_values.values()]) for group in group_set: # doesn't handle group == None conns_in_group = [x[0] for x in group_values.items() if x[1]['group_id'] == group] in_group = [x for x in bw_values.values() if x['conn_id'] in conns_in_group] if len(in_group) > 0: avg_data_size = sum([x['data_size'] for x in in_group])/len(in_group) avg_transfer_rate = sum([x['transfer_rate'] for x in in_group])/len(in_group) total_transfer_rate = sum([x['data_size'] for x in in_group])/(max([x['time_of_last_byte'] for x in in_group])-min([x['time_of_first_byte'] for x in in_group])) # logging.info('Group size: %d', len(in_group)) logging.info('Avg Transferred (MB): %.4f', avg_data_size/(1024**2)) logging.info('Avg Transfer rate (MB/s): %.4f', avg_transfer_rate/(1024**2)) logging.info('Total Transfer rate (MB/s): %.4f', total_transfer_rate/(1024**2)) # # # for p in processes: p.join() # #