#!/usr/bin/python3
#
import throughput_protocols
import basic_protocols
import os
import multiprocessing
import threading
import queue
import logging
import argparse
#
def overlap_byte_counters(byte_counters):
    # merge per-connection byte-count histories (one counter value per time slot)
    # into a single combined history aligned on the earliest start time
    start_time = None
    finish_time = None
    for x in byte_counters:
        if start_time is None or x['start_time'] < start_time:
            start_time = x['start_time']
        #
        if finish_time is None or x['start_time']+len(x['history']) > finish_time:
            finish_time = x['start_time']+len(x['history'])
        #
    #
    total_history = [0]*(finish_time-start_time)
    #
    for x in byte_counters:
        for y in range(len(x['history'])):
            total_history[(x['start_time']-start_time)+y] += x['history'][y]
        #
    #
    return total_history
#
if __name__ == '__main__':
    logging.basicConfig(level=logging.DEBUG)
    #
    parser = argparse.ArgumentParser(description='Test the network throughput (optionally through a proxy).')
    parser.add_argument('port', type=int, help='listen on port')
    parser.add_argument('--no-accel', action='store_true', help='don\'t use C acceleration (use pure Python)')
    parser.add_argument('--localhost', action='store_true', help='bind to 127.0.0.1 instead of 0.0.0.0')
    args = parser.parse_args()
    #
    if args.localhost:
        endpoint = ('127.0.0.1', args.port)
    else:
        endpoint = ('0.0.0.0', args.port)
    #
    processes = []
    processes_map = {}
    joinable_connections = multiprocessing.Queue()
    joinable_connections_list = []
    conn_counter = [0]  # single-element list so the nested accept_callback can mutate it
    group_queue = multiprocessing.Queue()
    group_queue_list = []
    bw_queue = multiprocessing.Queue()
    bw_queue_list = []
    #
    def group_id_callback(conn_id, group_id):
        # put them in a queue to display later
        #logging.debug('For conn %d Received group id: %d', conn_id, group_id)
        group_queue.put({'conn_id':conn_id, 'group_id':group_id})
    #
    #def bw_callback(conn_id, data_size, time_first_byte, time_last_byte, transfer_rate, byte_counter, byte_counter_start_time):
    def bw_callback(conn_id, custom_data, data_size, time_first_byte, time_last_byte, transfer_rate, deltas):
        # put them in a queue to display later
        #bw_queue.put({'conn_id':conn_id, 'data_size':data_size, 'time_of_first_byte':time_first_byte, 'time_of_last_byte':time_last_byte, 'transfer_rate':transfer_rate, 'byte_counter':byte_counter, 'byte_counter_start_time':byte_counter_start_time})
        bw_queue.put({'conn_id':conn_id, 'custom_data':custom_data, 'data_size':data_size, 'time_of_first_byte':time_first_byte, 'time_of_last_byte':time_last_byte, 'transfer_rate':transfer_rate, 'deltas':deltas})
    #
    def start_server_conn(socket, conn_id):
        server = throughput_protocols.ServerProtocol(socket, conn_id, group_id_callback=group_id_callback,
                                                     bandwidth_callback=bw_callback, use_acceleration=(not args.no_accel))
        try:
            server.run()
        except KeyboardInterrupt:
            socket.close()
        finally:
            joinable_connections.put(conn_id)
            '''
            while True:
                # while we're waiting to join, we might get a KeyboardInterrupt,
                # in which case we cannot let the process end since it will kill
                # the queue threads, which may be waiting to push data to the pipe
                try:
                    joinable_connections.close()
                    group_queue.close()
                    bw_queue.close()
                    #
                    group_queue.join_thread()
                    bw_queue.join_thread()
                    joinable_connections.join_thread()
                    #
                    break
                except KeyboardInterrupt:
                    pass
                #
            #
            '''
        #
    #
    def accept_callback(socket):
        conn_id = conn_counter[0]
        conn_counter[0] += 1
        #logging.debug('Adding connection %d', conn_id)
        p = multiprocessing.Process(target=start_server_conn, args=(socket, conn_id))
        processes.append(p)
        processes_map[conn_id] = p
        p.start()
        socket.close()  # close this process' copy of the socket
    #
    def unqueue(q, l, print_len=False):
        # drain a multiprocessing queue into a list until a None sentinel is received
        while True:
            val = q.get()
            if val is None:
                break
            #
            l.append(val)
            if print_len:
                print('Queue length: {}'.format(len(l)), end='\r')
            #
        #
    #
    l = basic_protocols.ServerListener(endpoint, accept_callback)
    #
    t_joinable_connections = threading.Thread(target=unqueue, args=(joinable_connections, joinable_connections_list))
    t_group_queue = threading.Thread(target=unqueue, args=(group_queue, group_queue_list))
    t_bw_queue = threading.Thread(target=unqueue, args=(bw_queue, bw_queue_list, True))
    #
    t_joinable_connections.start()
    t_group_queue.start()
    t_bw_queue.start()
    #
    try:
        while True:
            l.accept()
            '''
            try:
                while True:
                    conn_id = joinable_connections.get(False)
                    p = processes_map[conn_id]
                    p.join()
                #
            except queue.Empty:
                pass
            #
            '''
        #
    except KeyboardInterrupt:
        print()
    #
    try:
        for p in processes:
            p.join()
        #
    except KeyboardInterrupt:
        pass
    #
    joinable_connections.put(None)
    group_queue.put(None)
    bw_queue.put(None)
    t_joinable_connections.join()
    t_group_queue.join()
    t_bw_queue.join()
    #
    bw_values = {}
    group_values = {}
    #
    '''
    logging.info('BW queue length: {}'.format(bw_queue.qsize()))
    logging.info('Group queue length: {}'.format(group_queue.qsize()))
    #
    temp_counter = 0
    try:
        while True:
            bw_val = bw_queue.get(False)
            bw_values[bw_val['conn_id']] = bw_val
            temp_counter += 1
        #
    except queue.Empty:
        pass
    #
    logging.info('temp counter: {}'.format(temp_counter))
    import time
    time.sleep(2)
    try:
        while True:
            bw_val = bw_queue.get(False)
            bw_values[bw_val['conn_id']] = bw_val
            temp_counter += 1
        #
    except queue.Empty:
        pass
    #
    logging.info('temp counter: {}'.format(temp_counter))
    #
    try:
        while True:
            group_val = group_queue.get(False)
            group_values[group_val['conn_id']] = group_val
        #
    except queue.Empty:
        pass
    #
    logging.info('bw_values length: {}'.format(len(bw_values)))
    logging.info('group_values length: {}'.format(len(group_values)))
    logging.info('group_values set: {}'.format(list(set([x['group_id'] for x in group_values.values()]))))
    #
    '''
    #
    #logging.info('BW list length: {}'.format(len(bw_queue_list)))
    #logging.info('Group list length: {}'.format(len(group_queue_list)))
    #
    for x in bw_queue_list:
        bw_values[x['conn_id']] = x
    #
    for x in group_queue_list:
        group_values[x['conn_id']] = x
    #
    group_set = set([x['group_id'] for x in group_values.values()])
    for group in group_set:
        # doesn't handle group == None
        conns_in_group = [x[0] for x in group_values.items() if x[1]['group_id'] == group]
        in_group = [x for x in bw_values.values() if x['conn_id'] in conns_in_group]
        if len(in_group) > 0:
            avg_data_size = sum([x['data_size'] for x in in_group])/len(in_group)
            avg_transfer_rate = sum([x['transfer_rate'] for x in in_group])/len(in_group)
            total_transfer_rate = sum([x['data_size'] for x in in_group])/(max([x['time_of_last_byte'] for x in in_group])-min([x['time_of_first_byte'] for x in in_group]))
            #
            logging.info('Group size: %d', len(in_group))
            logging.info('Avg Transferred (MiB): %.4f', avg_data_size/(1024**2))
            logging.info('Avg Transfer rate (MiB/s): %.4f', avg_transfer_rate/(1024**2))
            logging.info('Total Transfer rate (MiB/s): %.4f', total_transfer_rate/(1024**2))
            #
            '''
            import math
            histories = [{'start_time':x['byte_counter_start_time'], 'history':x['byte_counter']} for x in in_group]
            total_history = overlap_byte_counters(histories)
            #
            logging.info('Max Transfer rate (MiB/s): %.4f', max(total_history)/(1024**2))
            if sum(total_history) != sum([x['data_size'] for x in in_group]):
                logging.warning('History doesn\'t add up ({} != {}).'.format(sum(total_history), sum([x['data_size'] for x in in_group])))
            #
            import json
            with open('/tmp/group-{}.json'.format(group), 'w') as f:
                json.dump({'id':group, 'history':total_history, 'individual_histories':histories,
                           'size':len(in_group), 'avg_transferred':avg_data_size,
                           'avg_transfer_rate':avg_transfer_rate, 'total_transfer_rate':total_transfer_rate}, f)
            #
            '''
            custom_data = [x['custom_data'].decode('utf-8') for x in in_group]
            #
            histories = [x['deltas'] for x in in_group]
            # merge every connection's (timestamp, bytes) deltas into one time-ordered history
            combined_timestamps, combined_bytes = zip(*sorted(zip([x for y in histories for x in y['timestamps']],
                                                                  [x for y in histories for x in y['bytes']])))
            combined_history = {'bytes':combined_bytes, 'timestamps':combined_timestamps}
            #combined_history = sorted([item for sublist in histories for item in sublist['deltas']], key=lambda x: x['timestamp'])
            #
            sum_history_bytes = sum(combined_history['bytes'])
            sum_data_bytes = sum([x['data_size'] for x in in_group])
            if sum_history_bytes != sum_data_bytes:
                logging.warning('History doesn\'t add up ({} != {}).'.format(sum_history_bytes, sum_data_bytes))
            #
            import json
            import gzip
            with gzip.GzipFile('/tmp/group-{}.json.gz'.format(group), 'w') as f:
                # json.dumps() returns a str, so encode it before writing to the binary gzip stream
                f.write(json.dumps({'id':group, 'history':combined_history, 'individual_histories':histories,
                                    'size':len(in_group), 'avg_transferred':avg_data_size,
                                    'avg_transfer_rate':avg_transfer_rate, 'total_transfer_rate':total_transfer_rate,
                                    'custom_data':custom_data}).encode('utf-8'))
            #
        #
    #
    for p in processes:
        p.join()
    #
#
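# Illustrative sketch only: one way to read back the gzipped JSON summary that the loop
# above writes for each group. The path and key names mirror what is written above; the
# function name load_group_summary is a hypothetical example, not part of this tool's API.
def load_group_summary(group):
    import gzip
    import json
    # the file written above is gzip-compressed UTF-8 JSON
    with gzip.GzipFile('/tmp/group-{}.json.gz'.format(group), 'r') as f:
        summary = json.loads(f.read().decode('utf-8'))
    # summary['history'] holds the merged 'bytes'/'timestamps' lists; the transfer
    # rates are in bytes per second and sizes are in bytes
    return summary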