#!/usr/bin/env python3
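"""Generate messenger client configurations for use with mgen and shadow.

Reads weighted distribution files describing user behavior, samples a
population of users and conversations, writes one mgen YAML config (plus tor
onion-service configuration) per user, and saves a modified copy of the shadow
config as mnet.shadow.config.yaml.

Example invocation (hypothetical paths, for illustration only):

    ./generate.py --dyadic dyadic.dat --group group.dat \
        --participants participants.dat \
        --config /path/to/shadow.config.yaml \
        --clients '/path/to/shadow.data.template/hosts/client*/' \
        --empirical /path/to/empirical/ --users 100 --tors 10
"""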

import argparse
import glob
import logging
import os
import random
import tempfile

import stem.connection
import stem.process
from yaml import load, dump
try:
    from yaml import CLoader as Loader, CDumper as Dumper
except ImportError:
    from yaml import Loader, Dumper

SECONDS_IN_HOUR = 60.0 * 60.0

# modified from tornettools/generate_tgen.py
def generate_onion_service_keys(tor_cmd, n):
    with tempfile.TemporaryDirectory(prefix='tornettools-hs-keygen-') as dir_name:
        config = {'DisableNetwork': '1', 'DataDirectory': dir_name, 'ControlPort': '9030'}
        tor_process = stem.process.launch_tor_with_config(config,
                                                          tor_cmd=tor_cmd,
                                                          init_msg_handler=logging.debug,
                                                          take_ownership=True,
                                                          completion_percent=0)
        controller = stem.connection.connect(control_port=('127.0.0.1', 9030))

        keys = []
        for _ in range(n):
            hs = controller.create_ephemeral_hidden_service(80)
            assert hs.private_key_type == 'ED25519-V3'
            keys.append((hs.private_key, hs.service_id + '.onion'))

        controller.close()

        # must make sure the process ends before the temporary directory is
        # removed, otherwise there's a race condition
        tor_process.kill()
        tor_process.wait()

        return keys
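
# Hypothetical usage sketch (this helper is not called by main() as written;
# the binary name and count are illustrative):
#
#   keys = generate_onion_service_keys('tor', 2)
#   # -> [(private_key, '<56-char service id>.onion'), ...]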

class Conversation:
    def __init__(self, size, users, waits):
        self.size = size
        self.users = users
        self.waits = waits

    def merge(self, other):
        self.users.extend(other.users)
        self.waits.extend(other.waits)

    @staticmethod
    def merge_slice(conversations):
        # merge every conversation in the slice into a single conversation
        first = conversations.pop()
        for other in conversations:
            first.merge(other)
        return first

    def add_silent_members(self, users):
        self.users.extend(users)
        self.waits.extend([SECONDS_IN_HOUR] * len(users))

class User:
    def __init__(self, name, dists_path, client, tor_process, onion_port):
        self.name = name
        self.dists_path = dists_path
        self.client = client
        self.tor_process = tor_process
        self.onion_port = onion_port
        self.conversations = []

    def socks_port(self):
        # default tor socks port is 9050, default tor control port is 9051;
        # each additional tor process needs both, so the socks port goes up by 2
        # (e.g. tor process 2 uses socks port 9054 and control port 9055)
        return 9050 + self.tor_process * 2

    def control_port(self):
        return self.socks_port() + 1

    def save(self, config):
        assert config['hosts'] is not None
        client_path = '~/.cargo/bin/mgen-peer'
        mgen_config_path = self.client + self.name + '.yaml'
        onion_service_path = self.client + self.name + '.tor'
        host_name = self.client.split('/')[-2]
        print("saving:", self.name, flush=True)
        host = config['hosts'][host_name]
        process = next((p for p in host['processes'] if p['path'] == client_path), None)
        tors = [p for p in host['processes'] if p['path'] == '~/.local/bin/tor']
        if len(tors) == 0:
            # check this before tors[0] is indexed below
            print('Error: No tor process for client {} in shadow config.'.format(self.client))
            exit(1)
        torrc = '{}.torrc'.format(self.tor_process)
        tor_datadir = 'tor-{}'.format(self.tor_process)
        torrc_path = self.client + torrc
        tor_start = tors[0]['start_time']

        if process is None:
            proc = {
                'path': client_path,
                'args': '../hosts user*.yaml',
                'start_time': tor_start + 60,
                'expected_final_state': 'running'
            }
            host['processes'].append(proc)

        if self.tor_process != 0 and not any('-f {}'.format(torrc) in tor['args'] for tor in tors):
            # we haven't set up this tor client yet, handle that first
            tor_proc = {
                'path': tors[0]['path'],
                'args': '--defaults-torrc torrc-defaults -f {} --DataDirectory ./{}'.format(torrc, tor_datadir),
                'start_time': tor_start,
                'expected_final_state': 'running',
                'environment': {'OPENBLAS_NUM_THREADS': '1'}
            }
            host['processes'].append(tor_proc)
            torrc_contents = "SocksPort {}\n".format(self.socks_port())
            torrc_contents += "ControlPort {}\n".format(self.control_port())
            with open(torrc_path, 'w') as f:
                f.write(torrc_contents)
            os.mkdir(self.client + tor_datadir)

        # every user appends its own onion service to the (possibly shared) torrc
        with open(torrc_path, 'a') as f:
            torrc_contents = "HiddenServiceDir {}\n".format(onion_service_path)
            torrc_contents += "HiddenServicePort {} 127.0.0.1:{}\n".format(self.onion_port, self.onion_port)
            f.write(torrc_contents)
        os.makedirs(onion_service_path)

        yaml_str = 'user: "{}"\n'.format(self.name)
        yaml_str += 'socks: "127.0.0.1:{}"\n'.format(self.socks_port())
        yaml_str += 'listen: "127.0.0.1:{}"\n'.format(self.onion_port)
        # defaults
        yaml_str += 'bootstrap: 5.0\n'
        yaml_str += 'retry: 5.0\n'
        yaml_str += 'distributions:\n'
        with open(self.dists_path + '/S.dat') as f:
            s = f.read().strip()
        yaml_str += '  s: {}\n'.format(s)
        with open(self.dists_path + '/R.dat') as f:
            r = f.read().strip()
        yaml_str += '  r: {}\n'.format(r)
        weighted_format = '  {}: {{ distribution: "Weighted", weights_file: "' + self.dists_path + '/{}.dat" }}\n'
        yaml_str += weighted_format.format('m', 'sizes')
        yaml_str += weighted_format.format('i', 'I')
        yaml_str += weighted_format.format('w', 'W')
        yaml_str += weighted_format.format('a_s', 'As')
        yaml_str += weighted_format.format('a_r', 'Ar')
        yaml_str += 'conversations:\n'
        for group in self.conversations:
            yaml_str += '  - group: "{}"\n'.format(group[0].name)
            yaml_str += '    bootstrap: {}\n'.format(group[1])
            yaml_str += '    recipients: {}\n'.format([user.name for user in group[0].users])
        with open(mgen_config_path, 'w') as f:
            f.write(yaml_str)
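
# For reference, a sketch of the YAML that User.save() writes for a
# hypothetical "user0" on tor process 1 (ports, bootstrap, and recipients here
# are illustrative, not taken from a real run):
#
#   user: "user0"
#   socks: "127.0.0.1:9052"
#   listen: "127.0.0.1:65534"
#   bootstrap: 5.0
#   retry: 5.0
#   distributions:
#     s: <contents of S.dat>
#     r: <contents of R.dat>
#     m: { distribution: "Weighted", weights_file: "<dists_path>/sizes.dat" }
#     i: { distribution: "Weighted", weights_file: "<dists_path>/I.dat" }
#     w: { distribution: "Weighted", weights_file: "<dists_path>/W.dat" }
#     a_s: { distribution: "Weighted", weights_file: "<dists_path>/As.dat" }
#     a_r: { distribution: "Weighted", weights_file: "<dists_path>/Ar.dat" }
#   conversations:
#     - group: "group0"
#       bootstrap: 42.0
#       recipients: ['user1', 'user2']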

def normalize_weights(weights):
    """Normalize weights so they sum to 1."""
    tot = sum(weights)
    return [w / tot for w in weights]

def read_dist_file(path):
    """Read a two-line distribution file: CSV weights, then CSV integer values."""
    with open(path) as f:
        (weights, vals) = f.readlines()
    vals = list(map(int, vals.split(',')))
    weights = normalize_weights(list(map(float, weights.split(','))))
    return vals, weights

def read_dist_file_float(path):
    """Like read_dist_file, but the values are floats."""
    with open(path) as f:
        (weights, vals) = f.readlines()
    vals = list(map(float, vals.split(',')))
    weights = normalize_weights(list(map(float, weights.split(','))))
    return vals, weights
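
# A .dat distribution file is two CSV lines: weights on the first line and
# values on the second. A hypothetical I.dat for a user who idled 10 seconds
# half the time and an hour otherwise:
#
#   0.5,0.5
#   10,3600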

def main():
    parser = argparse.ArgumentParser(
        description="Generate messenger clients for use with mgen and shadow.")
    parser.add_argument('--dyadic', type=str, help='File containing the weighted distribution of the number of dyadic (1-on-1) conversations a user may have.', required=True)
    parser.add_argument('--group', type=str, help='File containing the weighted distribution of the number of group conversations a user may have.', required=True)
    parser.add_argument('--participants', type=str, help='File containing the weighted distribution of the number of participants in a group conversation.', required=True)
    parser.add_argument('--config', type=str, help='The original shadow.config.yaml file; a modified copy will be placed in the same directory as mnet.shadow.config.yaml.', required=True)
    parser.add_argument('--clients', type=str, help='Glob specifying the paths to shadow host template directories where users will be assigned uniformly at random.', required=True)
    parser.add_argument('--empirical', type=str, help='Path of the directory containing one distribution-data directory per empirical user.', required=True)
    parser.add_argument('--users', type=int, help='Number of concurrent simulated users to generate.', required=True)
    parser.add_argument('--tors', type=int, default=0, help='Number of additional tor processes to run (if 0 or unset, clients use the original tor process; otherwise clients only use the new processes).')
    parser.add_argument('--seed', type=int, help='RNG seed, if deterministic config generation is desired.')
    args = parser.parse_args()
    random.seed(args.seed, version=2)

    print("loading config...", flush=True)
    with open(args.config) as f:
        config = load(f, Loader=Loader)
    assert config['hosts'] is not None

    dyadic_dist_vals, dyadic_dist_weights = read_dist_file(args.dyadic)
    group_dist_vals, group_dist_weights = read_dist_file(args.group)
    participants_dist_vals, participants_dist_weights = read_dist_file(args.participants)
    # each client template gets a countdown of onion service ports, starting at 65535
    client_paths = [[65535, g] for g in glob.glob(args.clients)]
    empirical_users = [args.empirical + '/' + f for f in os.listdir(args.empirical)]

    print("caching idle distributions...", flush=True)
    idles = {path: read_dist_file_float(path + '/I.dat') for path in empirical_users}

    conversations = {2: []}
    users = set()
    print("sampling users...", flush=True)
    for i in range(args.users):
        user = sample_user(i, empirical_users, client_paths, args.tors)
        num_dyadic = sample_dyadic_conversation_count(dyadic_dist_vals, dyadic_dist_weights)
        num_group_conversations = sample_group_conversation_count(group_dist_vals, group_dist_weights)
        idle_dist_vals, idle_dist_weights = idles[user.dists_path]
        initial_waits = sample_initial_idle(idle_dist_vals, idle_dist_weights, num_dyadic + num_group_conversations)
        conversations[2].extend([Conversation(2, [user], [initial_waits.pop()]) for _ in range(num_dyadic)])
        for _ in range(num_group_conversations):
            num_participants = sample_participant_count(participants_dist_vals, participants_dist_weights)
            if num_participants not in conversations:
                conversations[num_participants] = []
            conversations[num_participants].append(Conversation(num_participants, [user], [initial_waits.pop()]))
        users.add(user)

    group_count = 0
    for size in sorted(conversations):
        print("creating groups of size {}...".format(size), flush=True)
        # each conversation starts with a single user; repeatedly pull
        # conversations whose user isn't already in the current group
        remaining = conversations[size]
        grouped = []
        group = Conversation(size, [], [])
        while len(remaining) > 0:
            if len(group.users) == size:
                grouped.append(group)
                group = Conversation(size, [], [])
            for i in reversed(range(len(remaining))):
                if remaining[i].users[0] not in group.users:
                    group.merge(remaining.pop(i))
                    break
            else:
                # no remaining users not already in the group, we have to move on
                # (n.b. this is a python for/else, not an if/else)
                grouped.append(group)
                group = Conversation(size, [], [])
                break
        if len(group.users) > 0:
            # don't drop a partially filled final group; it gets padded below
            grouped.append(group)

        for group in grouped:
            group.name = "group" + str(group_count)
            if group.size == len(group.users):
                create_group(group)
            else:
                # add silent members to pad out the group
                sample_from = list(users - set(group.users))
                sample_count = group.size - len(group.users)
                if len(sample_from) < sample_count:
                    print("Error: trying to sample {} users from {} users not already in the group; try increasing the --users count.".format(
                        sample_count, len(sample_from)))
                    exit(1)
                silent = random.sample(sample_from, sample_count)
                group.add_silent_members(silent)
                create_group(group, set(silent))
            group_count += 1

    hosts_lines = ""
    print("saving groups to disk...", flush=True)
    for user in users:
        user.save(config)
        # structured for easy sed replacement with the onion address generated
        # later, e.g. "user0:65534 user0" -> "<address>.onion:65534 user0"
        hosts_lines += "{}:{} {}\n".format(user.name, user.onion_port, user.name)

    split_glob = [s for s in args.clients.split('/') if s != '']
    shadow_config_path = '/' + '/'.join(split_glob[:-1])
    shadow_hosts_file = shadow_config_path + '/hosts'
    with open(shadow_hosts_file, 'w') as f:
        f.write(hosts_lines)

    print("saving config...", flush=True)
    new_config = os.path.dirname(args.config) + '/mnet.shadow.config.yaml'
    with open(new_config, 'w') as f:
        dump(config, f, Dumper=Dumper)
    print("done!")

def create_group(group, silent=frozenset()):
    if all(n >= SECONDS_IN_HOUR for n in group.waits):
        # every group member is going to do nothing, just drop it
        return
    # active members keep their sampled initial wait; silent members idle for
    # the full hour
    for i, user in enumerate(group.users):
        if user not in silent:
            user.conversations.append((group, group.waits[i]))
    for user in silent:
        user.conversations.append((group, SECONDS_IN_HOUR))

def sample_user(id_number, empirical_users, client_paths, tor_processes):
    name = "user{}".format(id_number)
    dists_path = random.choice(empirical_users)
    client = random.choice(client_paths)
    # client is [next_free_onion_port, host_template_path]; count the port down
    client[0] -= 1
    # tor process 0 is the host's original tor; new processes are numbered from 1
    tor_process = (id_number % tor_processes) + 1 if tor_processes > 0 else 0
    return User(name, dists_path, client[1], tor_process, client[0])

def sample_participant_count(participants_dist_vals, participants_dist_weights):
    return random.choices(participants_dist_vals, weights=participants_dist_weights)[0]

def sample_dyadic_conversation_count(dyadic_dist_vals, dyadic_dist_weights):
    return random.choices(dyadic_dist_vals, weights=dyadic_dist_weights)[0]

def sample_group_conversation_count(group_dist_vals, group_dist_weights):
    return random.choices(group_dist_vals, weights=group_dist_weights)[0]

# takes the I distribution, scales it, then returns a list of n_samples samples
def sample_initial_idle(idle_dist_vals, idle_dist_weights, n_samples):
    real_bootstrap = 30
    scaled_weights = [real_bootstrap + idle_dist_vals[i] * idle_dist_weights[i] for i in range(len(idle_dist_vals))]
    if sum(scaled_weights) == 0.0:
        # edge case where the user always idled 0 seconds; say they were always idle instead
        return [SECONDS_IN_HOUR] * max(1, n_samples)
    return random.choices(idle_dist_vals, weights=scaled_weights, k=n_samples)

def get_free_ip(start, taken_ips):
    for i in range(start, 256):
        ip = "1.1.1.{}".format(i)
        if ip not in taken_ips:
            return ip
    else:
        # for/else: only reached if the loop found no free IP
        print("Error: no IPs remaining in 1.1.1.0/24, modify source to use a different unused block.")
        exit(1)

if __name__ == '__main__':
    main()