parse_measureme_logs.py 15 KB


  1. #!/usr/bin/python3
  2. #
  3. import itertools
  4. #
  5. def read_log(f):
  6. events = {}
  7. events['recv_edge_data'] = ('timestamp', 'cell_id', 'length')
  8. events['send_edge_data'] = ('timestamp', 'cell_id', 'length')
  9. events['recv_relay_cell'] = ('timestamp', 'cell_id', 'fingerprint', 'payload')
  10. events['send_relay_cell'] = ('timestamp', 'cell_id', 'fingerprint', 'payload')
  11. events['recv_sendme'] = ('timestamp', 'window_size')
  12. events['send_sendme'] = ('timestamp', 'window_size')
  13. #
  14. log = {}
  15. log['measureme_id'] = {}
  16. log['fingerprint'] = None
  17. #
  18. def new_measureme_id(measureme_id):
  19. log['measureme_id'][measureme_id] = {}
  20. log['measureme_id'][measureme_id]['stream_id'] = {}
  21. log['measureme_id'][measureme_id]['circuit'] = {}
  22. log['measureme_id'][measureme_id]['circuit']['event'] = {}
  23. #
  24. def new_stream_id(measureme_id, stream_id):
  25. log['measureme_id'][measureme_id]['stream_id'][stream_id] = {}
  26. log['measureme_id'][measureme_id]['stream_id'][stream_id]['event'] = {}
  27. #
  28. def new_event(where, event):
  29. where['event'][event] = {}
  30. where['event'][event]['forward'] = {}
  31. where['event'][event]['backward'] = {}
  32. #
  33. for value in events[event]:
  34. where['event'][event]['forward'][value] = []
  35. where['event'][event]['backward'][value] = []
  36. #
  37. #
  38. for line in f:
  39. line_values = [x.strip() for x in line.split(',')]
  40. timestamp = float(line_values[0])
  41. event = line_values[1].lower()
  42. event_values = line_values[2:]
  43. #
  44. if event == 'fingerprint':
  45. try:
  46. (fingerprint,) = event_values
  47. except Exception as e:
  48. raise Exception('Trying to parse: {}'.format(event_values)) from e
  49. #
  50. try:
  51. if int(fingerprint) == 0:
  52. # if the fingerprint is printed as all zeroes, then it is an OP
  53. fingerprint = None
  54. #
  55. except:
  56. pass
  57. #
  58. log['fingerprint'] = fingerprint
  59. elif event == 'recv_edge_data' or event == 'send_edge_data':
  60. try:
  61. (direction, measureme_id, stream_id, cell_id, length) = event_values
  62. #
  63. direction = direction.lower()
  64. measureme_id = int(measureme_id)
  65. stream_id = int(stream_id)
  66. cell_id = int(cell_id)
  67. length = int(length)
  68. except Exception as e:
  69. raise Exception('Trying to parse: {}'.format(event_values)) from e
  70. #
  71. if measureme_id not in log['measureme_id']:
  72. new_measureme_id(measureme_id)
  73. #
  74. if stream_id not in log['measureme_id'][measureme_id]['stream_id']:
  75. new_stream_id(measureme_id, stream_id)
  76. #
  77. where = log['measureme_id'][measureme_id]['stream_id'][stream_id]
  78. #
  79. if event not in where['event']:
  80. new_event(where, event)
  81. #
  82. where = where['event'][event][direction]
  83. #
  84. where['timestamp'].append(timestamp)
  85. where['cell_id'].append(cell_id)
  86. where['length'].append(length)
  87. elif event == 'recv_relay_cell' or event == 'send_relay_cell':
  88. try:
  89. (direction, measureme_id, cell_id, fingerprint, payload) = event_values
  90. #
  91. direction = direction.lower()
  92. measureme_id = int(measureme_id)
  93. cell_id = int(cell_id)
  94. payload = int(payload)
  95. except Exception as e:
  96. raise Exception('Trying to parse: {}'.format(event_values)) from e
  97. #
  98. if measureme_id not in log['measureme_id']:
  99. new_measureme_id(measureme_id)
  100. #
  101. where = log['measureme_id'][measureme_id]['circuit']
  102. #
  103. if event not in where['event']:
  104. new_event(where, event)
  105. #
  106. where = where['event'][event][direction]
  107. #
  108. where['timestamp'].append(timestamp)
  109. where['cell_id'].append(cell_id)
  110. where['fingerprint'].append(fingerprint)
  111. where['payload'].append(payload)
  112. elif event == 'recv_sendme' or event == 'send_sendme':
  113. try:
  114. (direction, measureme_id, stream_id, window_size) = event_values
  115. #
  116. direction = direction.lower()
  117. measureme_id = int(measureme_id)
  118. stream_id = int(stream_id)
  119. window_size = int(window_size)
  120. except Exception as e:
  121. raise Exception('Trying to parse: {}'.format(event_values)) from e
  122. #
  123. if measureme_id not in log['measureme_id']:
  124. new_measureme_id(measureme_id)
  125. #
  126. if stream_id not in log['measureme_id'][measureme_id]['stream_id']:
  127. new_stream_id(measureme_id, stream_id)
  128. #
  129. where = log['measureme_id'][measureme_id]['stream_id'][stream_id]
  130. #
  131. if event not in where['event']:
  132. new_event(where, event)
  133. #
  134. where = where['event'][event][direction]
  135. #
  136. where['timestamp'].append(timestamp)
  137. where['window_size'].append(window_size)
  138. #
  139. #
  140. return log
  141. #
  142. def follow_stream(this_relay, cell_ids_forward, cell_ids_backward, measureme_id, stream_id):
  143. this_hop = {}
  144. #
  145. for direction in ('forward', 'backward'):
  146. this_hop[direction] = {}
  147. for x in ('received', 'sent'):
  148. this_hop[direction][x] = {}
  149. this_hop[direction][x]['timestamp'] = []
  150. this_hop[direction][x]['length'] = []
  151. #
  152. #
  153. for stream_id in this_relay['measureme_id'][measureme_id]['stream_id']:
  154. for cell_id in cell_ids:
  155. try:
  156. cell_index = this_relay['measureme_id'][measureme_id]['stream_id'][stream_id]['event']['recv_edge_data'][direction]['cell_id'].index(cell_id)
  157. except ValueError:
  158. continue
  159. #
  160. this_hop[direction]['received']['timestamp'].append(
  161. this_relay['measureme_id'][measureme_id]['stream_id'][stream_id]['event']['recv_edge_data'][direction]['timestamp'][cell_index])
  162. this_hop[direction]['received']['length'].append(
  163. this_relay['measureme_id'][measureme_id]['stream_id'][stream_id]['event']['recv_edge_data'][direction]['length'][cell_index])
  164. #
  165. #
  166. #
  167. def link_relay_cells(relay_1, relay_2):
  168. common_measureme_ids = list(set(relay_1['measureme_id'].keys()) & set(relay_2['measureme_id'].keys()))
  169. results = {}
  170. for measureme_id in common_measureme_ids:
  171. for direction in ('forward', 'backward'):
  172. if direction == 'forward':
  173. relay_1_cells = relay_1['measureme_id'][measureme_id]['circuit']['event']['send_relay_cell'][direction]
  174. relay_2_cells = relay_2['measureme_id'][measureme_id]['circuit']['event']['recv_relay_cell'][direction]
  175. else:
  176. relay_1_cells = relay_1['measureme_id'][measureme_id]['circuit']['event']['recv_relay_cell'][direction]
  177. relay_2_cells = relay_2['measureme_id'][measureme_id]['circuit']['event']['send_relay_cell'][direction]
  178. #
  179. #print(relay_1['fingerprint'])
  180. #print(len(relay_1_cells['fingerprint']))
  181. #print(relay_2['fingerprint'])
  182. #print(len(relay_2_cells['fingerprint']))
  183. if len(relay_1_cells['fingerprint']) == 0 or len(relay_2_cells['fingerprint']) == 0:
  184. continue
  185. #
  186. assert all([x==relay_1_cells['fingerprint'][0] for x in relay_1_cells['fingerprint']])
  187. assert all([x==relay_2_cells['fingerprint'][0] for x in relay_2_cells['fingerprint']])
  188. # make sure all the fingerprints are the same for each relay cell
  189. #
  190. #if relay_1_cells['fingerprint'][0] != relay_2['fingerprint'] or relay_2_cells['fingerprint'][0] != relay_1['fingerprint']:
  191. if relay_1_cells['fingerprint'][0] != relay_2['fingerprint'][:8]:
  192. #print(relay_1_cells['fingerprint'][0])
  193. #print(relay_2['fingerprint'])
  194. continue
  195. #
  196. lookahead = 10
  197. relay_1_start = None
  198. relay_2_start = None
  199. #
  200. for (x,y) in itertools.product(range(lookahead),range(lookahead)):
  201. #for (x,y) in zip(range(lookahead),range(lookahead)):
  202. if relay_1_cells['payload'][x] == relay_2_cells['payload'][y]:
  203. relay_1_start = x
  204. relay_2_start = y
  205. break
  206. #
  207. #
  208. assert relay_1_start is not None and relay_2_start is not None
  209. #
  210. assert len(relay_1_cells['cell_id'][relay_1_start:]) == len(relay_2_cells['cell_id'][relay_2_start:]), \
  211. '{} /= {} for {} to {}'.format(len(relay_1_cells['cell_id'][relay_1_start:]), len(relay_2_cells['cell_id'][relay_2_start:]), relay_1['fingerprint'], relay_2['fingerprint'])
  212. # print('{} /= {} for {} to {}'.format(len(relay_1_cells['cell_id'][relay_1_start:]), len(relay_2_cells['cell_id'][relay_2_start:]), relay_1['fingerprint'], relay_2['fingerprint']))
  213. #
  214. #results[measureme_id][direction] = list(zip(relay_1_cells['cell_id'], relay_2_cells['cell_id']))
  215. if measureme_id not in results:
  216. results[measureme_id] = {}
  217. #
  218. results[measureme_id][direction] = {x[0]:x[1] for x in zip(relay_1_cells['cell_id'][relay_1_start:], relay_2_cells['cell_id'][relay_2_start:])}
  219. #
  220. #
  221. return results
  222. #
  223. def reverse_links(links):
  224. results = {}
  225. for measureme_id in links:
  226. results[measureme_id] = {}
  227. for direction in links[measureme_id]:
  228. results[measureme_id][direction] = {links[measureme_id][direction][x]: x for x in links[measureme_id][direction]}
  229. #
  230. #
  231. return results
  232. #
  233. def get_info_for_cell_ids(where, cell_ids, keys):
  234. output = {}
  235. for key in keys:
  236. output[key] = []
  237. #
  238. for cell_id in cell_ids:
  239. cell_index = where['cell_id'].index(cell_id)
  240. for key in keys:
  241. output[key].append(where[key][cell_index])
  242. #
  243. #
  244. return output
  245. #
  246. def get_streams_from_logs(logs):
  247. proxies = [log for log in logs if log['fingerprint'] is None]
  248. relays = [log for log in logs if log['fingerprint'] is not None]
  249. #relays_by_fingerprint = {log['fingerprint']: log for log in logs if log['fingerprint'] is not None}
  250. #
  251. measureme_ids = [proxy['measureme_id'].keys() for proxy in proxies]
  252. for x in range(len(proxies)):
  253. for y in range(x+1, len(proxies)):
  254. # make sure that two different proxies do not use the same measureme_id
  255. overlapping_ids = set(measureme_ids[x]) & set(measureme_ids[y])
  256. if len(overlapping_ids) > 0:
  257. raise Exception('Two proxies ({} and {}) have circuits with the same measureme_id ({}).'.format(x, y, overlapping_ids))
  258. #
  259. #
  260. #
  261. linked_relay_cells = {}
  262. #
  263. for proxy in proxies:
  264. for relay in relays:
  265. #
  266. links = link_relay_cells(proxy, relay)
  267. linked_relay_cells[(logs.index(proxy), logs.index(relay))] = links
  268. #linked_relay_cells[(logs.index(relay), logs.index(proxy))] = reverse_links(links)
  269. #
  270. #
  271. print('Done proxies')
  272. for x in range(len(relays)):
  273. for y in range(x+1, len(relays)):
  274. relay_x = relays[x]
  275. relay_y = relays[y]
  276. #
  277. links = link_relay_cells(relay_x, relay_y)
  278. #linked_relay_cells[(logs.index(relay_x), logs.index(relay_y))] = links
  279. #linked_relay_cells[(logs.index(relay_y), logs.index(relay_x))] = reverse_links(links)
  280. linked_relay_cells[(logs.index(relay_x), logs.index(relay_y))] = link_relay_cells(relay_x, relay_y)
  281. linked_relay_cells[(logs.index(relay_y), logs.index(relay_x))] = link_relay_cells(relay_y, relay_x)
  282. #
  283. print('x: {}'.format(x))
  284. #
  285. print('Started')
  286. streams = {}
  287. streams_completed = 0
  288. for proxy in proxies:
  289. for measureme_id in proxy['measureme_id']:
  290. streams[measureme_id] = {}
  291. for stream_id in proxy['measureme_id'][measureme_id]['stream_id']:
  292. streams[measureme_id][stream_id] = {}
  293. for direction in ('forward', 'backward'):
  294. streams[measureme_id][stream_id][direction] = []
  295. if direction == 'forward':
  296. where = proxy['measureme_id'][measureme_id]['stream_id'][stream_id]['event']['recv_edge_data'][direction]
  297. #cell_ids = proxy['measureme_id'][measureme_id]['stream_id'][stream_id]['event']['recv_edge_data']['forward']['cell_id']
  298. #lengths = proxy['measureme_id'][measureme_id]['stream_id'][stream_id]['event']['recv_edge_data']['forward']['length']
  299. else:
  300. where = proxy['measureme_id'][measureme_id]['stream_id'][stream_id]['event']['send_edge_data'][direction]
  301. #cell_ids = proxy['measureme_id'][measureme_id]['stream_id'][stream_id]['event']['send_edge_data']['backward']['cell_id']
  302. #lengths = proxy['measureme_id'][measureme_id]['stream_id'][stream_id]['event']['send_edge_data']['backward']['length']
  303. #
  304. lengths = where['length']
  305. cell_ids = where['cell_id']
  306. current_relay = proxy
  307. #
  308. while current_relay is not None:
  309. this_hop = {}
  310. this_hop['fingerprint'] = current_relay['fingerprint']
  311. #
  312. new_cell_ids = []
  313. next_relay = None
  314. #
  315. for x in ('received', 'sent'):
  316. this_hop[x] = {}
  317. this_hop[x]['timestamp'] = []
  318. this_hop[x]['length'] = []
  319. #
  320. if current_relay == proxy:
  321. if direction == 'forward':
  322. where = current_relay['measureme_id'][measureme_id]['stream_id'][stream_id]['event']['recv_edge_data'][direction]
  323. this_hop['received']['timestamp'] = where['timestamp']
  324. this_hop['received']['length'] = lengths
  325. else:
  326. where = current_relay['measureme_id'][measureme_id]['stream_id'][stream_id]['event']['send_edge_data'][direction]
  327. this_hop['sent']['timestamp'] = where['timestamp']
  328. this_hop['sent']['length'] = lengths
  329. #
  330. else:
  331. if direction == 'forward':
  332. where = current_relay['measureme_id'][measureme_id]['circuit']['event']['recv_relay_cell'][direction]
  333. info = get_info_for_cell_ids(where, cell_ids, ['timestamp'])
  334. this_hop['received']['timestamp'] = info['timestamp']
  335. this_hop['received']['length'] = lengths
  336. else:
  337. where = current_relay['measureme_id'][measureme_id]['circuit']['event']['send_relay_cell'][direction]
  338. info = get_info_for_cell_ids(where, cell_ids, ['timestamp'])
  339. this_hop['sent']['timestamp'] = info['timestamp']
  340. this_hop['sent']['length'] = lengths
  341. #
  342. #
  343. for x in linked_relay_cells:
  344. if x[0] == logs.index(current_relay) and measureme_id in linked_relay_cells[x] and \
  345. direction in linked_relay_cells[x][measureme_id]:
  346. # if the current relay has a linked relay, and they both have the current measureme_id
  347. for cell_id in cell_ids:
  348. paired_cell_id = linked_relay_cells[x][measureme_id][direction][cell_id]
  349. new_cell_ids.append(paired_cell_id)
  350. #
  351. next_relay = logs[x[1]]
  352. break
  353. #
  354. #
  355. if next_relay is None:
  356. # exiting
  357. if direction == 'forward':
  358. where = current_relay['measureme_id'][measureme_id]['stream_id'][stream_id]['event']['send_edge_data'][direction]
  359. this_hop['sent']['timestamp'] = where['timestamp']
  360. this_hop['sent']['length'] = lengths
  361. else:
  362. where = current_relay['measureme_id'][measureme_id]['stream_id'][stream_id]['event']['recv_edge_data'][direction]
  363. this_hop['received']['timestamp'] = where['timestamp']
  364. this_hop['received']['length'] = lengths
  365. #
  366. else:
  367. # passing to next hop
  368. assert(len(cell_ids) == len(new_cell_ids))
  369. if direction == 'forward':
  370. where = current_relay['measureme_id'][measureme_id]['circuit']['event']['send_relay_cell'][direction]
  371. info = get_info_for_cell_ids(where, cell_ids, ['timestamp'])
  372. this_hop['sent']['timestamp'] = info['timestamp']
  373. this_hop['sent']['length'] = lengths
  374. else:
  375. where = current_relay['measureme_id'][measureme_id]['circuit']['event']['recv_relay_cell'][direction]
  376. info = get_info_for_cell_ids(where, cell_ids, ['timestamp'])
  377. this_hop['received']['timestamp'] = info['timestamp']
  378. this_hop['received']['length'] = lengths
  379. #
  380. #
  381. current_relay = next_relay
  382. cell_ids = new_cell_ids
  383. streams[measureme_id][stream_id][direction].append(this_hop)
  384. #
  385. #
  386. streams_completed += 1
  387. print('Completed: {}'.format(streams_completed))
  388. #
  389. #
  390. #
  391. return streams
  392. #
  393. if __name__ == '__main__':
  394. import sys
  395. import os
  396. import pickle
  397. import gzip
  398. #
  399. logs = []
  400. #
  401. save_to = 'measureme-data.pickle.gz'
  402. if os.path.isfile(save_to):
  403. okay_to_overwrite = input('Output file \'{}\' already exists. Would you like to overwrite it? [y/n]: '.format(save_to)).strip()
  404. okay_to_overwrite = (okay_to_overwrite.lower() == 'y')
  405. if not okay_to_overwrite:
  406. print('Exiting')
  407. exit()
  408. #
  409. #
  410. for arg in sys.argv[1:]:
  411. with open(arg, 'r') as f:
  412. logs.append(read_log(f))
  413. #
  414. #
  415. streams = get_streams_from_logs(logs)
  416. #
  417. with gzip.GzipFile(save_to, 'wb') as f:
  418. pickle.dump(streams, f, protocol=4)
  419. #
  420. #