123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132 |
- #!/usr/bin/env python3
- import argparse
- import os
- import shlex
- import subprocess
- import sys
- import threading
- import yaml
- sys.path.insert(0, os.getcwd())
- import mkconfig
- # The default manifest file
- MANIFEST = "manifest.yaml"
- # The default pubkeys file
- PUBKEYS = "pubkeys.yaml"
- # The TEEMS binary
- TEEMS = "./teems"
- def launch(node, manifest, config, cmd, log_folder, epoch_time, num_epochs, num_WN):
- manifestdata = manifest[node]
- cmdline = ''
- if 'launchprefix' in manifestdata:
- cmdline = manifestdata['launchprefix'] + ' '
- cmdline += TEEMS + " -k %s -n %s" % (manifestdata['sprvfile'], node)
- if(epoch_time):
- cmdline+= ' -d ' + str(epoch_time)
- if(num_epochs):
- cmdline+= ' -e ' + str(num_epochs)
- if(num_WN):
- cmdline+= ' -w ' + str(num_WN)
- if 'args' in manifestdata:
- cmdline += ' ' + manifestdata['args']
- log_name = None
- stdout_file = subprocess.PIPE
- if(log_folder):
- log_name = log_folder + node + ".log"
- stdout_file = open(log_name, "a+")
- print(cmdline)
- proc = subprocess.Popen(shlex.split(cmdline) + cmd,
- stdin=subprocess.PIPE, stdout=stdout_file,
- stderr=subprocess.STDOUT, bufsize=0)
- proc.stdin.write(config.encode('utf-8'))
- if(log_folder == None):
- while True:
- line = proc.stdout.readline()
- if not line:
- break
- print(node + ": " + line.decode('utf-8'), end='', flush=True)
- else:
- stdout_file.close()
- if __name__ == "__main__":
- aparse = argparse.ArgumentParser(
- description='Launch TEEMS nodes'
- )
- aparse.add_argument('-m', default=MANIFEST,
- help='manifest.yaml file')
- aparse.add_argument('-p', default=PUBKEYS,
- help='pubkeys.yaml file')
- aparse.add_argument('-z', default=None,
- help='override message size')
- aparse.add_argument('-u', default=None,
- help='override max number of users')
- aparse.add_argument('-B', default=None,
- help='override max number of outgoing private messages per user per epoch')
- aparse.add_argument('-b', default=None,
- help='override max number of incoming private messages per user per epoch')
- aparse.add_argument('-C', default=None,
- help='override max number of outgoing public messages per user per epoch')
- aparse.add_argument('-c', default=None,
- help='override max number of incoming public messages per user per epoch')
- aparse.add_argument('-l', default=None,
- help='log folder to store logs of each server in an experiment')
- aparse.add_argument('-d', default=None,
- help='Set epoch interval time in seconds')
- aparse.add_argument('-e', default=None,
- help='Set number of epochs')
- aparse.add_argument('-w', default=None,
- help='Set number of Waksman Networks to precompute before starting epochs')
- aparse.add_argument('-r', default=None,
- help='override if routing private channel messages (or public)')
- aparse.add_argument('-n', nargs='*', help='nodes to include')
- aparse.add_argument('cmd', nargs='*', help='experiment to run')
- args = aparse.parse_args()
- with open(args.m) as mf:
- manifest = yaml.safe_load(mf)
- params_overrides = {
- 'msg_size': args.z,
- 'user_count': args.u,
- 'priv_out': args.B,
- 'priv_in': args.b,
- 'pub_out': args.C,
- 'pub_in': args.c,
- 'private_routing': args.r
- }
- config = mkconfig.create_json(args.m, args.p, args.n, params_overrides)
- # There must not be any newlines in the config json string
- if "\n" in config:
- print("Error: config.json must not contain embedded newlines")
- sys.exit(1)
- # Now add a trailing newline
- config += "\n"
- nodelist = args.n
- if nodelist is None or len(nodelist) == 0:
- nodelist = manifest.keys()
- threadlist = []
- for node in nodelist:
- if node == "params":
- continue
- thread = threading.Thread(target=launch,
- args=(node, manifest, config, args.cmd, args.l, args.d, args.e, args.w))
- thread.start()
- threadlist.append(thread)
- for thread in threadlist:
- thread.join()
|