# mnettortools.py
  1. #!/usr/bin/env python3
import argparse
import glob
import logging
import os
import random
import shutil
import tempfile

import stem.connection
import stem.process
from yaml import load, dump
try:
    from yaml import CLoader as Loader, CDumper as Dumper
except ImportError:
    from yaml import Loader, Dumper
  12. SECONDS_IN_HOUR = 60.0 * 60.0
  13. # modified from tornettools/generate_tgen.py
  14. def generate_onion_service_keys(tor_cmd, n):
  15. with tempfile.TemporaryDirectory(prefix='tornettools-hs-keygen-') as dir_name:
  16. config = {'DisableNetwork': '1', 'DataDirectory': dir_name, 'ControlPort': '9030'}
  17. tor_process = stem.process.launch_tor_with_config(config,
  18. tor_cmd=tor_cmd,
  19. init_msg_handler=logging.debug,
  20. take_ownership=True,
  21. completion_percent=0)
  22. controller = stem.connection.connect(control_port=('127.0.0.1', 9030))
  23. keys = []
  24. for x in range(n):
  25. hs = controller.create_ephemeral_hidden_service(80)
  26. assert hs.private_key_type == 'ED25519-V3'
  27. keys.append((hs.private_key, hs.service_id + '.onion'))
  28. controller.close()
  29. # must make sure process ends before the temporary directory is removed,
  30. # otherwise there's a race condition
  31. tor_process.kill()
  32. tor_process.wait()
  33. class Conversation:
  34. def __init__(self, size, users, waits):
  35. self.size = size
  36. self.users = users
  37. self.waits = waits
  38. def merge(self, other):
  39. self.users.extend(other.users)
  40. self.waits.extend(other.waits)
  41. return
  42. def merge_slice(conversations):
  43. first = conversations.pop()
  44. return [first.merge(o) for o in conversations]
  45. def add_silent_members(self, users):
  46. self.users.extend(users)
  47. self.waits.extend([SECONDS_IN_HOUR] * len(users))
  48. class User:
  49. def __init__(self, name, dists_path, client, tor_process, onion_port):
  50. self.name = name
  51. self.dists_path = dists_path
  52. self.client = client
  53. self.tor_process = tor_process
  54. self.onion_port = onion_port
  55. self.conversations = []
  56. def socks_port(self):
  57. # default tor socks port is 9050, default tor control port is 9051
  58. # each additional process needs both of those, so socks port goes up by 2
  59. return 9050 + self.tor_process * 2
  60. def control_port(self):
  61. return self.socks_port() + 1
  62. def save(self, config):
  63. assert(config['hosts'] is not None)
  64. client_path = '~/.cargo/bin/mgen-peer'
  65. mgen_config_path = self.client + self.name + '.yaml'
  66. onion_service_path = self.client + self.name + '.tor'
  67. host_name = self.client.split('/')[-2]
  68. print("saving: ", self.name, flush=True)
  69. host = config['hosts'][host_name]
  70. process = next((p for p in host['processes'] if p['path'] == client_path), None)
  71. tors = [p for p in host['processes'] if p['path'] == '~/.local/bin/tor']
  72. torrc = '{}.torrc'.format(self.tor_process)
  73. tor_datadir = "tor-{}".format(self.tor_process)
  74. torrc_path = self.client + torrc
  75. tor_start = tors[0]['start_time']
  76. if process is None:
  77. if len(tors) == 0:
  78. print('Error: No tor process for client {} in shadow config.'.format(self.client))
  79. exit(1)
  80. proc = {
  81. 'path': client_path,
  82. 'args': '../hosts user*.yaml',
  83. 'start_time': tor_start + 60,
  84. 'expected_final_state': 'running'
  85. }
  86. host['processes'].append(proc)
  87. if self.tor_process != 0 and not any('-f {}'.format(torrc) in tor['args'] for tor in tors):
  88. # we haven't setup this tor client yet, handle that first
  89. tor_proc = {
  90. 'path': tors[0]['path'],
  91. 'args': '--defaults-torrc torrc-defaults -f {} --DataDirectory ./{}'.format(torrc, tor_datadir),
  92. 'start_time': tor_start,
  93. 'expected_final_state': 'running',
  94. 'environment': {'OPENBLAS_NUM_THREADS': '1'}
  95. }
  96. host['processes'].append(tor_proc)
  97. torrc_contents = "SocksPort {}\n".format(self.socks_port())
  98. torrc_contents += "ControlPort {}\n".format(self.control_port())
  99. with open(torrc_path, 'w') as f:
  100. f.write(torrc_contents)
  101. os.mkdir(self.client + tor_datadir)
  102. with open(torrc_path, 'a') as f:
  103. torrc_contents = "HiddenServiceDir {}\n".format(onion_service_path)
  104. torrc_contents += "HiddenServicePort {} 127.0.0.1:{}\n".format(self.onion_port, self.onion_port)
  105. f.write(torrc_contents)
  106. os.makedirs(onion_service_path)
  107. yaml_str = 'user: "{}"\n'.format(self.name)
  108. yaml_str += 'socks: "127.0.0.1:{}"\n'.format(self.socks_port())
  109. yaml_str += 'listen: "127.0.0.1:{}"\n'.format(self.onion_port)
  110. # defaults
  111. yaml_str += 'bootstrap: 5.0\n'
  112. yaml_str += 'retry: 5.0\n'
  113. yaml_str += 'distributions:\n'
  114. with open(self.dists_path + '/S.dat') as f:
  115. s = f.read().strip()
  116. yaml_str += ' s: {}\n'.format(s)
  117. with open(self.dists_path + '/R.dat') as f:
  118. r = f.read().strip()
  119. yaml_str += ' r: {}\n'.format(r)
  120. weighted_format = ' {}: {{ distribution: "Weighted", weights_file: "' + self.dists_path + '/{}.dat" }}\n'
  121. yaml_str += weighted_format.format('m', 'sizes')
  122. yaml_str += weighted_format.format('i', 'I')
  123. yaml_str += weighted_format.format('w', 'W')
  124. yaml_str += weighted_format.format('a_s', 'As')
  125. yaml_str += weighted_format.format('a_r', 'Ar')
  126. yaml_str += 'conversations:\n'
  127. for group in self.conversations:
  128. yaml_str += ' - group: "{}"\n'.format(group[0].name)
  129. yaml_str += ' bootstrap: {}\n'.format(group[1])
  130. yaml_str += ' recipients: {}\n'.format([user.name for user in group[0].users])
  131. with open(mgen_config_path, 'w') as f:
  132. f.write(yaml_str)
  133. def normalize_weights(weights):
  134. """ Normalize weights so they sum to 1 """
  135. tot = sum(weights)
  136. return [w/tot for w in weights]
  137. def read_dist_file(path):
  138. with open(path) as f:
  139. (weights, vals) = f.readlines()
  140. vals = list(map(int, vals.split(',')))
  141. weights = normalize_weights(list(map(float, weights.split(','))))
  142. return vals, weights
  143. def read_dist_file_float(path):
  144. with open(path) as f:
  145. (weights, vals) = f.readlines()
  146. vals = list(map(float, vals.split(',')))
  147. weights = normalize_weights(list(map(float, weights.split(','))))
  148. return vals, weights
def main():
    """Generate mgen messenger-client configs for a shadow simulation.

    Samples users and their conversations from empirical distributions,
    groups conversations by size, writes per-user torrc/yaml files, a hosts
    file, and a modified copy of the shadow config.
    """
    parser = argparse.ArgumentParser(
        description="Generate messenger clients for use with mgen and shadow.")
    parser.add_argument('--dyadic', type=str, help='File containging the weighted distribution of the number of dyadic (1-on-1) conversations a user may have.', required=True)
    parser.add_argument('--group', type=str, help='File containging the weighted distribution of the number of group conversations a user may have.', required=True)
    parser.add_argument('--participants', type=str, help='File containing the weighted distribution of the number of participants in a group conversation.', required=True)
    parser.add_argument('--config', type=str, help='The original shadow.config.yaml file; a modified copy will be placed in the same directory as mnet.shadow.config.yaml', required=True)
    parser.add_argument('--clients', type=str, help='Glob specifying the paths to shadow host template directories where users will be assigned uniformly at random.', required=True)
    parser.add_argument('--empirical', type=str, help='Path of directory containing the directories for each empirical user distribution data.', required=True)
    parser.add_argument('--users', type=int, help='Number of concurrent simulated users to generate.', required=True)
    parser.add_argument('--tors', type=int, default=0, help='Number of additional tor processes to run (if 0 or unset, clients use the original tor process, else clients only use new processes).')
    parser.add_argument('--seed', type=int, help='RNG seed, if deterministic config generation is desired.')
    args = parser.parse_args()

    # args.seed is None when --seed is omitted, so runs stay non-deterministic
    random.seed(args.seed, version=2)

    print("loading config...", flush=True)
    with open(args.config) as f:
        config = load(f, Loader=Loader)
    assert(config['hosts'] is not None)

    dyadic_dist_vals, dyadic_dist_weights = read_dist_file(args.dyadic)
    group_dist_vals, group_dist_weights = read_dist_file(args.group)
    participants_dist_vals, participants_dist_weights = read_dist_file(args.participants)

    # [port counter, template dir] per client host; sample_user decrements the
    # counter to hand out unique onion ports starting just below 65535
    client_paths = [[65535, g] for g in glob.glob(args.clients)]
    empirical_users = [args.empirical + '/' + f for f in os.listdir(args.empirical)]

    print("caching idle distributions...", flush=True)
    idles = { path: read_dist_file_float(path + '/I.dat') for path in empirical_users }

    # conversations keyed by conversation size; size 2 (dyadic) always present
    conversations = {2: []}
    users = set()
    print("sampling users...", flush=True)
    for i in range(args.users):
        user = sample_user(i, empirical_users, client_paths, args.tors)
        num_dyadic = sample_dyadic_conversation_count(dyadic_dist_vals, dyadic_dist_weights)
        num_group_conversations = sample_group_conversation_count(group_dist_vals, group_dist_weights)
        idle_dist_vals, idle_dist_weights = idles[user.dists_path]
        # one initial wait per conversation this user participates in
        initial_waits = sample_initial_idle(idle_dist_vals, idle_dist_weights, num_dyadic + num_group_conversations)
        conversations[2].extend([Conversation(2, [user], [initial_waits.pop()]) for _ in range(num_dyadic)])
        for c in range(num_group_conversations):
            num_participants = sample_participant_count(participants_dist_vals, participants_dist_weights)
            if num_participants not in conversations:
                conversations[num_participants] = []
            conversations[num_participants].append(Conversation(num_participants, [user], [initial_waits.pop()]))
        users.add(user)

    group_count = 0
    for size in sorted(conversations):
        print("creating groups of size {}...".format(size), flush=True)
        remaining = conversations[size]
        grouped = []
        group = Conversation(size, [], [])
        # Greedily fill groups of `size` distinct users from the single-user
        # conversations in `remaining`, scanning from the end of the list.
        while len(remaining) > 0:
            if len(group.users) == size:
                grouped.append(group)
                group = Conversation(size, [], [])
            for i in reversed(range(len(remaining))):
                if remaining[i].users[0] not in group.users:
                    group.merge(remaining.pop(i))
                    break
            else:
                # no remaining users not already in the group, we have to move on
                # (n.b. this is a python for/else, not an if/else)
                grouped.append(group)
                group = Conversation(size, [], [])
                break
        # NOTE(review): when the while loop exits because `remaining` is empty,
        # the in-progress `group` is never appended to `grouped` and appears to
        # be dropped — confirm whether that is intentional.
        for group in grouped:
            group.name = "group" + str(group_count)
            if group.size == len(group.users):
                create_group(group)
            else:
                # add silent members to pad out group
                sample_from = list(users - set(group.users))
                sample_count = group.size - len(group.users)
                if len(sample_from) < sample_count:
                    print("Error: trying to sample {} users from {} users not already in the group; try increasing the --users count.".format(
                        sample_count, len(sample_from)))
                    exit(1)
                silent = random.sample(sample_from, sample_count)
                group.add_silent_members(silent)
                create_group(group, set(silent))
            group_count += 1

    hosts_lines = ""
    print("saving groups to disk...", flush=True)
    for user in users:
        user.save(config)
        # structured for easy sed replacement with onion address generated later
        hosts_lines += "{}:{} {}\n".format(user.name, user.onion_port, user.name)

    # derive the shadow config directory from the clients glob (drop last component)
    split_glob = [s for s in args.clients.split('/') if s != '']
    shadow_config_path = '/'+'/'.join(split_glob[:-1])
    shadow_hosts_file = shadow_config_path + '/hosts'
    with open(shadow_hosts_file, 'w') as f:
        f.write(hosts_lines)

    print("saving config...", flush=True)
    new_config = os.path.dirname(args.config) + '/mnet.shadow.config.yaml'
    with open(new_config, 'w') as f:
        dump(config, f, Dumper=Dumper)
    print("done!")
  242. def create_group(group, silent=set()):
  243. if all(n >= SECONDS_IN_HOUR for n in group.waits):
  244. # every group member is going to do nothing, just drop it
  245. return
  246. [group.users[i].conversations.append((group, group.waits[i])) for i in range(len(group.users)) if group.users[i] not in silent]
  247. [user.conversations.append((group, SECONDS_IN_HOUR)) for user in silent]
  248. def sample_user(id_number, empirical_users, client_paths, tor_processes):
  249. name = "user{}".format(id_number)
  250. dists_path = random.choice(empirical_users)
  251. client = random.choice(client_paths)
  252. client[0] -= 1
  253. tor_process = (id_number % tor_processes) + 1 if tor_processes > 0 else 0
  254. return User(name, dists_path, client[1], tor_process, client[0])
  255. def sample_participant_count(participants_dist_vals, participants_dist_weights):
  256. return random.choices(participants_dist_vals, weights=participants_dist_weights)[0]
  257. def sample_dyadic_conversation_count(dyadic_dist_vals, dyadic_dist_weights):
  258. return random.choices(dyadic_dist_vals, dyadic_dist_weights)[0]
  259. def sample_group_conversation_count(group_dist_vals, group_dist_weights):
  260. return random.choices(group_dist_vals, group_dist_weights)[0]
  261. # takes I distribution, the function will scale it then return a list of samples
  262. def sample_initial_idle(idle_dist_vals, idle_dist_weights, n_samples):
  263. real_bootstrap = 30
  264. scaled_weights = [real_bootstrap + idle_dist_vals[i] * idle_dist_weights[i] for i in range(len(idle_dist_vals))]
  265. if sum(scaled_weights) == 0.0:
  266. # edge case where user always idled 0 seconds; say they were always idle instead
  267. return [SECONDS_IN_HOUR] * max(1, n_samples)
  268. return random.choices(idle_dist_vals, scaled_weights, k=n_samples)
  269. def get_free_ip(start, taken_ips):
  270. for i in range(start, 256):
  271. ip = "1.1.1.{}".format(i)
  272. if ip not in taken_ips:
  273. return ip
  274. else:
  275. print("Error: no IPs remaining in 1.1.1.0/24, modify source to use a different unused block.")
  276. exit(1)
# Script entry point: only run when executed directly, not when imported.
if __name__ == '__main__':
    main()