mnettools.py 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347
  1. #!/usr/bin/env python3
  2. import argparse
  3. import glob
  4. import os
  5. import random
  6. import shutil
  7. from yaml import load, dump
  8. try:
  9. from yaml import CLoader as Loader, CDumper as Dumper
  10. except ImportError:
  11. from yaml import Loader, Dumper
  12. SECONDS_IN_HOUR = 60.0 * 60.0
  13. class Conversation:
  14. def __init__(self, size, users, waits):
  15. self.size = size
  16. self.users = users
  17. self.waits = waits
  18. def merge(self, other):
  19. self.users.extend(other.users)
  20. self.waits.extend(other.waits)
  21. return
  22. def merge_slice(conversations):
  23. first = conversations.pop()
  24. return [first.merge(o) for o in conversations]
  25. def add_silent_members(self, users):
  26. self.users.extend(users)
  27. self.waits.extend([SECONDS_IN_HOUR] * len(users))
  28. class User:
  29. def __init__(self, name, dists_path, client, tor_process):
  30. self.name = name
  31. self.dists_path = dists_path
  32. self.client = client
  33. self.tor_process = tor_process
  34. self.conversations = []
  35. def socks_port(self):
  36. # default tor socks port is 9050, default tor control port is 9051
  37. # each additional process needs both of those, so socks port goes up by 2
  38. return 9050 + self.tor_process * 2
  39. def control_port(self):
  40. return self.socks_port() + 1
  41. def save(self, config):
  42. assert(config['hosts'] is not None)
  43. client_path = '~/.cargo/bin/mgen-client'
  44. mgen_config_path = self.client + self.name + '.yaml'
  45. host_name = self.client.split('/')[-2]
  46. print("saving: ", self.name, flush=True)
  47. host = config['hosts'][host_name]
  48. process = next((p for p in host['processes'] if p['path'] == client_path), None)
  49. tors = [p for p in host['processes'] if p['path'] == '~/.local/bin/tor']
  50. torrc = '{}.torrc'.format(self.tor_process)
  51. tor_datadir = "tor-{}".format(self.tor_process)
  52. tor_start = tors[0]['start_time']
  53. if process is None:
  54. if len(tors) == 0:
  55. print('Error: No tor process for client {} in shadow config.'.format(self.client))
  56. exit(1)
  57. proc = {
  58. 'path': client_path,
  59. 'args': 'user*.yaml',
  60. 'start_time': 1170, #tor_start + 60,
  61. 'expected_final_state': 'running'
  62. }
  63. host['processes'].append(proc)
  64. if self.tor_process != 0 and not any('-f {}'.format(torrc) in tor['args'] for tor in tors):
  65. # we haven't setup this tor client yet, handle that first
  66. tor_proc = {
  67. 'path': tors[0]['path'],
  68. 'args': '--defaults-torrc torrc-defaults -f {} --DataDirectory ./{}'.format(torrc, tor_datadir),
  69. 'start_time': tor_start + self.tor_process,
  70. 'expected_final_state': 'running',
  71. 'environment': {'OPENBLAS_NUM_THREADS': '1'}
  72. }
  73. host['processes'].append(tor_proc)
  74. torrc_path = self.client + torrc
  75. torrc_contents = "SocksPort {}\n".format(self.socks_port())
  76. torrc_contents += "ControlPort {}\n".format(self.control_port())
  77. with open(torrc_path, 'w') as f:
  78. f.write(torrc_contents)
  79. os.mkdir(self.client + tor_datadir)
  80. yaml_str = 'user: "{}"\n'.format(self.name)
  81. # proxy starts commented out for baseline testing,
  82. # a simple sed replacement can enable it
  83. yaml_str += '#socks: "127.0.0.1:{}"\n'.format(self.socks_port())
  84. # defaults
  85. yaml_str += 'message_server: "1.1.1.2:6397"\n'
  86. yaml_str += 'web_server: "1.1.1.3:6398"\n'
  87. yaml_str += 'bootstrap: 5.0\n'
  88. yaml_str += 'retry: 5.0\n'
  89. yaml_str += 'distributions:\n'
  90. with open(self.dists_path + '/S.dat') as f:
  91. s = f.read().strip()
  92. yaml_str += ' s: {}\n'.format(s)
  93. with open(self.dists_path + '/R.dat') as f:
  94. r = f.read().strip()
  95. yaml_str += ' r: {}\n'.format(r)
  96. weighted_format = ' {}: {{ distribution: "Weighted", weights_file: "' + self.dists_path + '/{}.dat" }}\n'
  97. yaml_str += weighted_format.format('m', 'sizes')
  98. yaml_str += weighted_format.format('i', 'I')
  99. yaml_str += weighted_format.format('w', 'W')
  100. yaml_str += weighted_format.format('a_s', 'As')
  101. yaml_str += weighted_format.format('a_r', 'Ar')
  102. yaml_str += 'conversations:\n'
  103. for group in self.conversations:
  104. yaml_str += ' - group: "{}"\n'.format(group[0].name)
  105. yaml_str += ' bootstrap: {}\n'.format(group[1])
  106. yaml_str += ' message_server: "{}:6397"\n'.format(group[0].server_ip)
  107. yaml_str += ' web_server: "{}:6398"\n'.format(group[0].web_ip)
  108. with open(mgen_config_path, 'w') as f:
  109. f.write(yaml_str)
  110. def normalize_weights(weights):
  111. """ Normalize weights so they sum to 1 """
  112. tot = sum(weights)
  113. return [w/tot for w in weights]
  114. def read_dist_file(path):
  115. with open(path) as f:
  116. (weights, vals) = f.readlines()
  117. vals = list(map(int, vals.split(',')))
  118. weights = normalize_weights(list(map(float, weights.split(','))))
  119. return vals, weights
  120. def read_dist_file_float(path):
  121. with open(path) as f:
  122. (weights, vals) = f.readlines()
  123. vals = list(map(float, vals.split(',')))
  124. weights = normalize_weights(list(map(float, weights.split(','))))
  125. return vals, weights
  126. def main():
  127. parser = argparse.ArgumentParser(
  128. description="Generate messenger clients for use with mgen and shadow.")
  129. parser.add_argument('--dyadic', type=str, help='File containging the weighted distribution of the number of dyadic (1-on-1) conversations a user may have.', required=True)
  130. parser.add_argument('--group', type=str, help='File containging the weighted distribution of the number of group conversations a user may have.', required=True)
  131. parser.add_argument('--participants', type=str, help='File containing the weighted distribution of the number of participants in a group conversation.', required=True)
  132. parser.add_argument('--config', type=str, help='The original shadow.config.yaml file; a modified copy will be placed in the same directory as mnet.shadow.config.yaml', required=True)
  133. parser.add_argument('--clients', type=str, help='Glob specifying the paths to shadow host template directories where users will be assigned uniformly at random. This will also determine where the server and web server directories are created.', required=True)
  134. parser.add_argument('--empirical', type=str, help='Path of directory containing the directories for each empirical user distribution data.', required=True)
  135. parser.add_argument('--users', type=int, help='Number of concurrent simulated users to generate.', required=True)
  136. parser.add_argument('--servers', type=int, default=1, help='Number of message and web servers to run (defaults to 1).')
  137. parser.add_argument('--tors', type=int, default=0, help='Number of additional tor processes to run (if 0 or unset, clients use the original tor process, else clients only use new processes).')
  138. parser.add_argument('--seed', type=int, help='RNG seed, if deterministic config generation is desired.')
  139. args = parser.parse_args()
  140. random.seed(args.seed, version=2)
  141. print("loading config...", flush=True)
  142. with open(args.config) as f:
  143. config = load(f, Loader=Loader)
  144. assert(config['hosts'] is not None)
  145. print("adding servers to config...", flush=True)
  146. taken_ips = {host['ip_addr'] for host in config['hosts'].values() if 'ip_addr' in host}
  147. servers = []
  148. webs = []
  149. server_start = 300
  150. server_idle = 900
  151. # bump the bootstrap time to account for our bootstrap as well
  152. config['general']['bootstrap_end_time'] = server_start + server_idle
  153. for server_num in range(args.servers):
  154. server_ip = get_free_ip(2, taken_ips)
  155. servers.append(server_ip)
  156. web_ip = get_free_ip(int(server_ip.split('.')[-1]) + 1, taken_ips)
  157. webs.append(web_ip)
  158. taken_ips |= {server_ip, web_ip}
  159. server_name = 'server{}'.format(server_num)
  160. web_name = 'web{}'.format(server_num)
  161. config['hosts'][server_name] = {
  162. 'network_node_id': 2521, # FIXME: is this sufficiently stable?
  163. 'ip_addr': server_ip,
  164. 'bandwidth_down': '10000000 kilobit',
  165. 'bandwidth_up': '10000000 kilobit',
  166. 'processes': [{
  167. 'path': '~/.cargo/bin/mgen-server',
  168. 'args': ['server.crt', 'server.key', '{}:6397'.format(server_ip), str(server_idle)],
  169. 'start_time': str(server_start),
  170. 'expected_final_state': 'running'
  171. }]
  172. }
  173. config['hosts'][web_name] = {
  174. 'network_node_id': 2521, # FIXME: is this sufficiently stable?
  175. 'ip_addr': web_ip,
  176. 'bandwidth_down': '10000000 kilobit',
  177. 'bandwidth_up': '10000000 kilobit',
  178. 'processes': [{
  179. 'path': '~/.cargo/bin/mgen-web',
  180. 'args': ['server.crt', 'server.key', '{}:6398'.format(web_ip)],
  181. 'start_time': str(server_start),
  182. 'expected_final_state': 'running'
  183. }]
  184. }
  185. dyadic_dist_vals, dyadic_dist_weights = read_dist_file(args.dyadic)
  186. group_dist_vals, group_dist_weights = read_dist_file(args.group)
  187. participants_dist_vals, participants_dist_weights = read_dist_file(args.participants)
  188. client_paths = glob.glob(args.clients)
  189. empirical_users = [args.empirical + '/' + f for f in os.listdir(args.empirical)]
  190. print("caching idle distributions...", flush=True)
  191. idles = { path: read_dist_file_float(path + '/I.dat') for path in empirical_users }
  192. conversations = {2: []}
  193. users = set()
  194. print("sampling users...", flush=True)
  195. for i in range(args.users):
  196. user = sample_user(i, empirical_users, client_paths, args.tors)
  197. num_dyadic = sample_dyadic_conversation_count(dyadic_dist_vals, dyadic_dist_weights)
  198. num_group_conversations = sample_group_conversation_count(group_dist_vals, group_dist_weights)
  199. idle_dist_vals, idle_dist_weights = idles[user.dists_path]
  200. initial_waits = sample_initial_idle(idle_dist_vals, idle_dist_weights, num_dyadic + num_group_conversations)
  201. conversations[2].extend([Conversation(2, [user], [initial_waits.pop()]) for _ in range(num_dyadic)])
  202. for c in range(num_group_conversations):
  203. num_participants = sample_participant_count(participants_dist_vals, participants_dist_weights)
  204. if num_participants not in conversations:
  205. conversations[num_participants] = []
  206. conversations[num_participants].append(Conversation(num_participants, [user], [initial_waits.pop()]))
  207. users.add(user)
  208. group_count = 0
  209. for size in sorted(conversations):
  210. print("creating groups of size {}...".format(size), flush=True)
  211. remaining = conversations[size]
  212. grouped = []
  213. group = Conversation(size, [], [])
  214. while len(remaining) > 0:
  215. if len(group.users) == size:
  216. grouped.append(group)
  217. group = Conversation(size, [], [])
  218. for i in reversed(range(len(remaining))):
  219. if remaining[i].users[0] not in group.users:
  220. group.merge(remaining.pop(i))
  221. break
  222. else:
  223. # no remaining users not already in the group, we have to move on
  224. # (n.b. this is a python for/else, not an if/else)
  225. grouped.append(group)
  226. group = Conversation(size, [], [])
  227. break
  228. for group in grouped:
  229. group.name = "group" + str(group_count)
  230. server_num = random.randint(0, args.servers - 1)
  231. group.server_ip = servers[server_num]
  232. group.web_ip = webs[server_num]
  233. #print("creating group {} (size: {}, active members: {})".format(group.name, group.size, len(group.users)))
  234. if group.size == len(group.users):
  235. create_group(group)
  236. else:
  237. # add silent members to pad out group
  238. sample_from = list(users - set(group.users))
  239. sample_count = group.size - len(group.users)
  240. if len(sample_from) < sample_count:
  241. print("Error: trying to sample {} users from {} users not already in the group; try increasing the --users count.".format(
  242. sample_count, len(sample_from)))
  243. exit(1)
  244. silent = random.sample(sample_from, sample_count)
  245. group.add_silent_members(silent)
  246. create_group(group, set(silent))
  247. group_count += 1
  248. print("saving groups to disk...", flush=True)
  249. for user in users:
  250. user.save(config)
  251. print("saving config...", flush=True)
  252. new_config = os.path.dirname(args.config) + '/mnet.shadow.config.yaml'
  253. with open(new_config, 'w') as f:
  254. dump(config, f, Dumper=Dumper)
  255. print("copying cert and key...", flush=True)
  256. cert = os.path.dirname(__file__) + '/server.crt'
  257. key = os.path.dirname(__file__) + '/server.key'
  258. split_glob = [s for s in args.clients.split('/') if s != '']
  259. shadow_config_path = '/'+'/'.join(split_glob[:-1])
  260. for server_num in range(args.servers):
  261. server_dir = '{}/server{}'.format(shadow_config_path, server_num)
  262. web_dir = '{}/web{}'.format(shadow_config_path, server_num)
  263. os.makedirs(server_dir, exist_ok=True)
  264. os.makedirs(web_dir, exist_ok=True)
  265. shutil.copy(cert, server_dir)
  266. shutil.copy(cert, web_dir)
  267. shutil.copy(key, server_dir)
  268. shutil.copy(key, web_dir)
  269. print("done!")
  270. def create_group(group, silent=set()):
  271. if all(n >= SECONDS_IN_HOUR for n in group.waits):
  272. # every group member is going to do nothing, just drop it
  273. return
  274. [group.users[i].conversations.append((group, group.waits[i])) for i in range(len(group.users)) if group.users[i] not in silent]
  275. [user.conversations.append((group, SECONDS_IN_HOUR)) for user in silent]
  276. def sample_user(id_number, empirical_users, client_paths, tor_processes):
  277. name = "user{}".format(id_number)
  278. dists_path = random.choice(empirical_users)
  279. client = random.choice(client_paths)
  280. tor_process = (id_number % tor_processes) + 1 if tor_processes > 0 else 0
  281. return User(name, dists_path, client, tor_process)
  282. def sample_participant_count(participants_dist_vals, participants_dist_weights):
  283. return random.choices(participants_dist_vals, weights=participants_dist_weights)[0]
  284. def sample_dyadic_conversation_count(dyadic_dist_vals, dyadic_dist_weights):
  285. return random.choices(dyadic_dist_vals, dyadic_dist_weights)[0]
  286. def sample_group_conversation_count(group_dist_vals, group_dist_weights):
  287. return random.choices(group_dist_vals, group_dist_weights)[0]
  288. # takes I distribution, the function will scale it then return a list of samples
  289. def sample_initial_idle(idle_dist_vals, idle_dist_weights, n_samples):
  290. real_bootstrap = 30
  291. scaled_weights = [real_bootstrap + idle_dist_vals[i] * idle_dist_weights[i] for i in range(len(idle_dist_vals))]
  292. if sum(scaled_weights) == 0.0:
  293. # edge case where user always idled 0 seconds; say they were always idle instead
  294. return [SECONDS_IN_HOUR] * max(1, n_samples)
  295. return random.choices(idle_dist_vals, scaled_weights, k=n_samples)
  296. def get_free_ip(start, taken_ips):
  297. for i in range(start, 256):
  298. ip = "1.1.1.{}".format(i)
  299. if ip not in taken_ips:
  300. return ip
  301. else:
  302. print("Error: no IPs remaining in 1.1.1.0/24, modify source to use a different unused block.")
  303. exit(1)
  304. if __name__ == '__main__':
  305. main()