123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420 |
- #!/usr/bin/python3
- #
- import itertools
- #
def read_log(f):
    """Parse one measureme log into nested dictionaries.

    Each line of ``f`` is comma-separated: a float timestamp, an event name
    (case-insensitive), then event-specific fields. Blank lines are skipped
    and lines with an unrecognised event name are ignored.

    Recognised events and their fields after timestamp and event name:
        fingerprint                         FINGERPRINT
        recv_edge_data / send_edge_data     DIRECTION, MEASUREME_ID, STREAM_ID, CELL_ID, LENGTH
        recv_relay_cell / send_relay_cell   DIRECTION, MEASUREME_ID, CELL_ID, FINGERPRINT, PAYLOAD
        recv_sendme / send_sendme           DIRECTION, MEASUREME_ID, STREAM_ID, WINDOW_SIZE

    Args:
        f: an iterable of text lines (e.g. an open text file).

    Returns:
        A dict::

            {'measureme_id': {measureme_id: {
                 'stream_id': {stream_id: {'event': {event: {direction: {field: [...]}}}}},
                 'circuit': {'event': {event: {direction: {field: [...]}}}}}},
             'fingerprint': str or None}

        Edge-data and sendme events are stored per stream, relay-cell events
        per circuit. Each field list is in log order. 'fingerprint' is None
        if no fingerprint line was seen or it was printed as all zeroes
        (an OP / proxy).

    Raises:
        Exception: chained to the underlying error, when a line's fields
            cannot be parsed.
        KeyError: when a direction is neither 'forward' nor 'backward'
            (only those two are created for each event).
    """
    events = {
        'recv_edge_data': ('timestamp', 'cell_id', 'length'),
        'send_edge_data': ('timestamp', 'cell_id', 'length'),
        'recv_relay_cell': ('timestamp', 'cell_id', 'fingerprint', 'payload'),
        'send_relay_cell': ('timestamp', 'cell_id', 'fingerprint', 'payload'),
        'recv_sendme': ('timestamp', 'window_size'),
        'send_sendme': ('timestamp', 'window_size'),
    }

    log = {'measureme_id': {}, 'fingerprint': None}

    def new_measureme_id(measureme_id):
        log['measureme_id'][measureme_id] = {'stream_id': {}, 'circuit': {'event': {}}}

    def new_stream_id(measureme_id, stream_id):
        log['measureme_id'][measureme_id]['stream_id'][stream_id] = {'event': {}}

    def new_event(where, event):
        # one empty list per field of the event, in both directions
        where['event'][event] = {
            direction: {value: [] for value in events[event]}
            for direction in ('forward', 'backward')
        }

    def circuit_entry(measureme_id):
        # the per-circuit record, created on first use
        if measureme_id not in log['measureme_id']:
            new_measureme_id(measureme_id)
        return log['measureme_id'][measureme_id]

    def stream_event(measureme_id, stream_id, event):
        # the {direction: {field: [...]}} record of a per-stream event, created on first use
        circuit = circuit_entry(measureme_id)
        if stream_id not in circuit['stream_id']:
            new_stream_id(measureme_id, stream_id)
        where = circuit['stream_id'][stream_id]
        if event not in where['event']:
            new_event(where, event)
        return where['event'][event]

    for line in f:
        line_values = [x.strip() for x in line.split(',')]
        # skip blank / whitespace-only lines (e.g. a trailing newline), which
        # would otherwise crash in float('')
        if line_values == ['']:
            continue

        try:
            timestamp = float(line_values[0])
            event = line_values[1].lower()
        except (ValueError, IndexError) as e:
            raise Exception('Trying to parse: {}'.format(line_values)) from e
        event_values = line_values[2:]

        if event == 'fingerprint':
            try:
                (fingerprint,) = event_values
            except ValueError as e:
                raise Exception('Trying to parse: {}'.format(event_values)) from e

            try:
                if int(fingerprint) == 0:
                    # if the fingerprint is printed as all zeroes, then it is an OP
                    fingerprint = None
            except ValueError:
                # not a decimal integer (e.g. a hex fingerprint): keep it as-is
                pass

            log['fingerprint'] = fingerprint
        elif event in ('recv_edge_data', 'send_edge_data'):
            try:
                (direction, measureme_id, stream_id, cell_id, length) = event_values
                direction = direction.lower()
                measureme_id = int(measureme_id)
                stream_id = int(stream_id)
                cell_id = int(cell_id)
                length = int(length)
            except ValueError as e:
                raise Exception('Trying to parse: {}'.format(event_values)) from e

            where = stream_event(measureme_id, stream_id, event)[direction]
            where['timestamp'].append(timestamp)
            where['cell_id'].append(cell_id)
            where['length'].append(length)
        elif event in ('recv_relay_cell', 'send_relay_cell'):
            try:
                (direction, measureme_id, cell_id, fingerprint, payload) = event_values
                direction = direction.lower()
                measureme_id = int(measureme_id)
                cell_id = int(cell_id)
                payload = int(payload)
            except ValueError as e:
                raise Exception('Trying to parse: {}'.format(event_values)) from e

            circuit = circuit_entry(measureme_id)['circuit']
            if event not in circuit['event']:
                new_event(circuit, event)
            where = circuit['event'][event][direction]
            where['timestamp'].append(timestamp)
            where['cell_id'].append(cell_id)
            where['fingerprint'].append(fingerprint)
            where['payload'].append(payload)
        elif event in ('recv_sendme', 'send_sendme'):
            try:
                (direction, measureme_id, stream_id, window_size) = event_values
                direction = direction.lower()
                measureme_id = int(measureme_id)
                stream_id = int(stream_id)
                window_size = int(window_size)
            except ValueError as e:
                raise Exception('Trying to parse: {}'.format(event_values)) from e

            where = stream_event(measureme_id, stream_id, event)[direction]
            where['timestamp'].append(timestamp)
            where['window_size'].append(window_size)

    return log
- #
def follow_stream(this_relay, cell_ids_forward, cell_ids_backward, measureme_id, stream_id):
    """Collect recv_edge_data timestamps and lengths for a stream's cells on one relay.

    For each direction, every cell id in the matching list (cell_ids_forward
    for 'forward', cell_ids_backward for 'backward') is looked up among the
    ``recv_edge_data`` cells that ``this_relay`` logged for ``stream_id`` on
    circuit ``measureme_id``. Cell ids that were not logged are skipped; if a
    cell id was logged more than once, its first occurrence is used.

    Args:
        this_relay: a log as returned by read_log.
        cell_ids_forward, cell_ids_backward: cell ids to look up per direction.
        measureme_id, stream_id: which circuit / stream of this_relay to search.

    Returns:
        {direction: {'received': {'timestamp': [...], 'length': [...]},
                     'sent': {'timestamp': [], 'length': []}}}
        Only 'received' is filled in; 'sent' is left empty.

    Raises:
        KeyError: if this_relay has no such measureme_id / stream_id.
    """
    this_hop = {
        direction: {side: {'timestamp': [], 'length': []} for side in ('received', 'sent')}
        for direction in ('forward', 'backward')
    }

    stream = this_relay['measureme_id'][measureme_id]['stream_id'][stream_id]
    recv_edge_data = stream['event'].get('recv_edge_data')
    if recv_edge_data is None:
        # read_log only creates an event once it appears; nothing to collect
        return this_hop

    for direction, cell_ids in (('forward', cell_ids_forward), ('backward', cell_ids_backward)):
        logged = recv_edge_data[direction]
        # cell id -> index of its first occurrence (what list.index would return)
        first_index = {}
        for index, cell_id in enumerate(logged['cell_id']):
            first_index.setdefault(cell_id, index)

        received = this_hop[direction]['received']
        for cell_id in cell_ids:
            cell_index = first_index.get(cell_id)
            if cell_index is None:
                continue
            received['timestamp'].append(logged['timestamp'][cell_index])
            received['length'].append(logged['length'][cell_index])

    return this_hop
- #
def link_relay_cells(relay_1, relay_2, lookahead=10):
    """Pair up relay cells passed between two hops of the same circuits.

    For every measureme_id present in both logs:
      * 'forward': cells relay_1 logged as send_relay_cell are matched with
        cells relay_2 logged as recv_relay_cell;
      * 'backward': cells relay_1 logged as recv_relay_cell are matched with
        cells relay_2 logged as send_relay_cell.
    A direction is only linked when the fingerprint recorded with relay_1's
    cells equals the first 8 characters of relay_2's own fingerprint.
    Directions where either side logged no such cells are skipped.

    The two cell sequences are aligned at the first (x, y) pair, searched
    within the first ``lookahead`` cells of each side, whose payloads are
    equal. After that, cells are paired position by position.

    Args:
        relay_1, relay_2: logs as returned by read_log.
        lookahead: how many leading cells of each side to search for the
            alignment point (default 10).

    Returns:
        {measureme_id: {direction: {relay_1_cell_id: relay_2_cell_id}}}

    Raises:
        AssertionError: if one side's cells carry differing fingerprints, no
            aligning payload is found, or the aligned sequences differ in length.
    """
    common_measureme_ids = list(set(relay_1['measureme_id']) & set(relay_2['measureme_id']))
    relay_2_fingerprint = relay_2['fingerprint']
    results = {}
    for measureme_id in common_measureme_ids:
        events_1 = relay_1['measureme_id'][measureme_id]['circuit']['event']
        events_2 = relay_2['measureme_id'][measureme_id]['circuit']['event']
        for direction in ('forward', 'backward'):
            if direction == 'forward':
                event_1, event_2 = 'send_relay_cell', 'recv_relay_cell'
            else:
                event_1, event_2 = 'recv_relay_cell', 'send_relay_cell'

            # read_log only creates an event entry once it has been logged,
            # so a circuit may have no cells of this kind at all
            if event_1 not in events_1 or event_2 not in events_2:
                continue
            relay_1_cells = events_1[event_1][direction]
            relay_2_cells = events_2[event_2][direction]

            if len(relay_1_cells['fingerprint']) == 0 or len(relay_2_cells['fingerprint']) == 0:
                continue

            # make sure all the fingerprints are the same for each relay cell
            assert all(x == relay_1_cells['fingerprint'][0] for x in relay_1_cells['fingerprint'])
            assert all(x == relay_2_cells['fingerprint'][0] for x in relay_2_cells['fingerprint'])

            # relay_1's cells must name relay_2; a proxy (fingerprint None) never matches
            if relay_2_fingerprint is None or relay_1_cells['fingerprint'][0] != relay_2_fingerprint[:8]:
                continue

            relay_1_payloads = relay_1_cells['payload']
            relay_2_payloads = relay_2_cells['payload']
            relay_1_start = None
            relay_2_start = None
            # bound the search by the cells actually logged: short circuits
            # (fewer than `lookahead` cells) used to raise IndexError here
            search = itertools.product(range(min(lookahead, len(relay_1_payloads))),
                                       range(min(lookahead, len(relay_2_payloads))))
            for (x, y) in search:
                if relay_1_payloads[x] == relay_2_payloads[y]:
                    relay_1_start = x
                    relay_2_start = y
                    break

            assert relay_1_start is not None and relay_2_start is not None

            relay_1_ids = relay_1_cells['cell_id'][relay_1_start:]
            relay_2_ids = relay_2_cells['cell_id'][relay_2_start:]
            assert len(relay_1_ids) == len(relay_2_ids), \
                '{} /= {} for {} to {}'.format(len(relay_1_ids), len(relay_2_ids), relay_1['fingerprint'], relay_2['fingerprint'])

            results.setdefault(measureme_id, {})[direction] = dict(zip(relay_1_ids, relay_2_ids))

    return results
- #
def reverse_links(links):
    """Invert every cell-id mapping in a link table.

    Args:
        links: {measureme_id: {direction: {cell_id_a: cell_id_b}}}, the shape
            returned by link_relay_cells.

    Returns:
        The same nesting with each innermost mapping inverted
        ({cell_id_b: cell_id_a}). When several keys share a value, the key
        that comes last in iteration order wins.
    """
    return {
        measureme_id: {
            direction: {target: source for source, target in mapping.items()}
            for direction, mapping in per_direction.items()
        }
        for measureme_id, per_direction in links.items()
    }
- #
def get_info_for_cell_ids(where, cell_ids, keys):
    """Gather the values of ``keys`` for each requested cell id.

    Args:
        where: a {field: [values]} record (as built by read_log) whose
            'cell_id' list is parallel to the lists named in ``keys``.
        cell_ids: the cell ids to look up, in the order wanted.
        keys: field names to collect.

    Returns:
        {key: [value for each cell id in cell_ids]}. If a cell id occurs more
        than once in where['cell_id'], its first occurrence is used.

    Raises:
        ValueError: if a requested cell id is not in where['cell_id'].
    """
    output = {key: [] for key in keys}

    # One pass to index the cell ids, instead of a linear list.index() per
    # requested cell (which made each call O(len(cell_ids) * len(where))).
    first_index = {}
    for index, cell_id in enumerate(where['cell_id']):
        first_index.setdefault(cell_id, index)

    for cell_id in cell_ids:
        try:
            cell_index = first_index[cell_id]
        except KeyError:
            raise ValueError('{!r} is not in the cell_id list'.format(cell_id)) from None
        for key in keys:
            output[key].append(where[key][cell_index])

    return output
- #
def get_streams_from_logs(logs):
    """Trace every proxy stream through the relay logs, hop by hop.

    Logs whose fingerprint is None are treated as proxies, all others as
    relays. Relay cells are first linked for every proxy->relay pair and for
    both orderings of every relay pair (via link_relay_cells). Then, for each
    stream a proxy logged and each direction, the stream's cell ids are
    followed from the proxy through the linked relays until a hop has no
    outgoing link.

    Args:
        logs: list of logs as returned by read_log.

    Returns:
        {measureme_id: {stream_id: {direction: [hop, ...]}}} where each hop is
        {'fingerprint': ...,
         'received': {'timestamp': [...], 'length': [...]},
         'sent': {'timestamp': [...], 'length': [...]}}.
        The first hop of each list is the proxy. Every hop reuses the proxy's
        list of cell lengths.

    Raises:
        Exception: if two proxies have circuits with the same measureme_id.
    """
    proxies = [log for log in logs if log['fingerprint'] is None]
    relays = [log for log in logs if log['fingerprint'] is not None]

    # Position of each log object in `logs`, keyed by identity. logs.index()
    # compared the (large) log dicts by value on every lookup, including once
    # per link per hop in the tracing loop below. setdefault keeps the first
    # position, like list.index.
    log_index = {}
    for position, log in enumerate(logs):
        log_index.setdefault(id(log), position)

    measureme_ids = [proxy['measureme_id'].keys() for proxy in proxies]
    for x in range(len(proxies)):
        for y in range(x + 1, len(proxies)):
            # make sure that two different proxies do not use the same measureme_id
            overlapping_ids = set(measureme_ids[x]) & set(measureme_ids[y])
            if overlapping_ids:
                raise Exception('Two proxies ({} and {}) have circuits with the same measureme_id ({}).'.format(x, y, overlapping_ids))

    # (from log position, to log position) -> link_relay_cells(from, to).
    # Insertion order matters: while tracing, the first matching entry picks
    # the next hop.
    linked_relay_cells = {}
    for proxy in proxies:
        for relay in relays:
            linked_relay_cells[(log_index[id(proxy)], log_index[id(relay)])] = link_relay_cells(proxy, relay)

    print('Done proxies')
    for x in range(len(relays)):
        relay_x = relays[x]
        for y in range(x + 1, len(relays)):
            relay_y = relays[y]
            # each ordered pair is linked once (the (x, y) link used to be computed twice)
            linked_relay_cells[(log_index[id(relay_x)], log_index[id(relay_y)])] = link_relay_cells(relay_x, relay_y)
            linked_relay_cells[(log_index[id(relay_y)], log_index[id(relay_x)])] = link_relay_cells(relay_y, relay_x)
        print('x: {}'.format(x))

    print('Started')
    streams = {}
    streams_completed = 0
    for proxy in proxies:
        for measureme_id in proxy['measureme_id']:
            streams[measureme_id] = {}
            for stream_id in proxy['measureme_id'][measureme_id]['stream_id']:
                streams[measureme_id][stream_id] = {}
                # NOTE(review): raises KeyError if the proxy stream lacks either edge-data event
                proxy_events = proxy['measureme_id'][measureme_id]['stream_id'][stream_id]['event']
                for direction in ('forward', 'backward'):
                    streams[measureme_id][stream_id][direction] = []
                    if direction == 'forward':
                        where = proxy_events['recv_edge_data'][direction]
                    else:
                        where = proxy_events['send_edge_data'][direction]

                    lengths = where['length']
                    cell_ids = where['cell_id']
                    current_relay = proxy

                    while current_relay is not None:
                        this_hop = {
                            'fingerprint': current_relay['fingerprint'],
                            'received': {'timestamp': [], 'length': []},
                            'sent': {'timestamp': [], 'length': []},
                        }
                        new_cell_ids = []
                        next_relay = None

                        # the side of this hop where the cells come in
                        if current_relay is proxy:
                            if direction == 'forward':
                                where = proxy_events['recv_edge_data'][direction]
                                this_hop['received']['timestamp'] = where['timestamp']
                                this_hop['received']['length'] = lengths
                            else:
                                where = proxy_events['send_edge_data'][direction]
                                this_hop['sent']['timestamp'] = where['timestamp']
                                this_hop['sent']['length'] = lengths
                        else:
                            circuit_events = current_relay['measureme_id'][measureme_id]['circuit']['event']
                            if direction == 'forward':
                                info = get_info_for_cell_ids(circuit_events['recv_relay_cell'][direction], cell_ids, ['timestamp'])
                                this_hop['received']['timestamp'] = info['timestamp']
                                this_hop['received']['length'] = lengths
                            else:
                                info = get_info_for_cell_ids(circuit_events['send_relay_cell'][direction], cell_ids, ['timestamp'])
                                this_hop['sent']['timestamp'] = info['timestamp']
                                this_hop['sent']['length'] = lengths

                        current_index = log_index[id(current_relay)]
                        for (from_index, to_index), links in linked_relay_cells.items():
                            if from_index == current_index and measureme_id in links and direction in links[measureme_id]:
                                # the current relay has a linked relay, and they both have the current measureme_id
                                paired_cell_ids = links[measureme_id][direction]
                                new_cell_ids = [paired_cell_ids[cell_id] for cell_id in cell_ids]
                                next_relay = logs[to_index]
                                break

                        if next_relay is None:
                            # exiting
                            exit_events = current_relay['measureme_id'][measureme_id]['stream_id'][stream_id]['event']
                            if direction == 'forward':
                                this_hop['sent']['timestamp'] = exit_events['send_edge_data'][direction]['timestamp']
                                this_hop['sent']['length'] = lengths
                            else:
                                this_hop['received']['timestamp'] = exit_events['recv_edge_data'][direction]['timestamp']
                                this_hop['received']['length'] = lengths
                        else:
                            # passing to next hop
                            assert len(cell_ids) == len(new_cell_ids)
                            circuit_events = current_relay['measureme_id'][measureme_id]['circuit']['event']
                            if direction == 'forward':
                                info = get_info_for_cell_ids(circuit_events['send_relay_cell'][direction], cell_ids, ['timestamp'])
                                this_hop['sent']['timestamp'] = info['timestamp']
                                this_hop['sent']['length'] = lengths
                            else:
                                info = get_info_for_cell_ids(circuit_events['recv_relay_cell'][direction], cell_ids, ['timestamp'])
                                this_hop['received']['timestamp'] = info['timestamp']
                                this_hop['received']['length'] = lengths

                        current_relay = next_relay
                        cell_ids = new_cell_ids
                        streams[measureme_id][stream_id][direction].append(this_hop)

                streams_completed += 1
                print('Completed: {}'.format(streams_completed))

    return streams
- #
if __name__ == '__main__':
    import sys
    import os
    import pickle
    import gzip

    # Usage: pass one or more proxy/relay log files; the traced streams are
    # written to `save_to` as a gzip-compressed pickle.
    save_to = 'measureme-data.pickle.gz'
    log_paths = sys.argv[1:]
    if not log_paths:
        # with no logs the script would just write an empty result (possibly over an existing file)
        sys.exit('Usage: {} LOG_FILE [LOG_FILE ...]'.format(sys.argv[0]))

    if os.path.isfile(save_to):
        okay_to_overwrite = input('Output file \'{}\' already exists. Would you like to overwrite it? [y/n]: '.format(save_to)).strip()
        okay_to_overwrite = (okay_to_overwrite.lower() == 'y')
        if not okay_to_overwrite:
            print('Exiting')
            # sys.exit() rather than the `site`-provided exit(), which is absent under `python -S`
            sys.exit()

    logs = []
    for arg in log_paths:
        with open(arg, 'r') as f:
            logs.append(read_log(f))

    streams = get_streams_from_logs(logs)

    with gzip.GzipFile(save_to, 'wb') as f:
        pickle.dump(streams, f, protocol=4)
- #
|