launch 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132
  1. #!/usr/bin/env python3
  2. import argparse
  3. import os
  4. import shlex
  5. import subprocess
  6. import sys
  7. import threading
  8. import yaml
  9. sys.path.insert(0, os.getcwd())
  10. import mkconfig
  11. # The default manifest file
  12. MANIFEST = "manifest.yaml"
  13. # The default pubkeys file
  14. PUBKEYS = "pubkeys.yaml"
  15. # The TEEMS binary
  16. TEEMS = "./teems"
  17. def launch(node, manifest, config, cmd, log_folder, epoch_time, num_epochs, num_WN):
  18. manifestdata = manifest[node]
  19. cmdline = ''
  20. if 'launchprefix' in manifestdata:
  21. cmdline = manifestdata['launchprefix'] + ' '
  22. cmdline += TEEMS + " -k %s -n %s" % (manifestdata['sprvfile'], node)
  23. if(epoch_time):
  24. cmdline+= ' -d ' + str(epoch_time)
  25. if(num_epochs):
  26. cmdline+= ' -e ' + str(num_epochs)
  27. if(num_WN):
  28. cmdline+= ' -w ' + str(num_WN)
  29. if 'args' in manifestdata:
  30. cmdline += ' ' + manifestdata['args']
  31. log_name = None
  32. stdout_file = subprocess.PIPE
  33. if(log_folder):
  34. log_name = log_folder + node + ".log"
  35. stdout_file = open(log_name, "a+")
  36. print(cmdline)
  37. proc = subprocess.Popen(shlex.split(cmdline) + cmd,
  38. stdin=subprocess.PIPE, stdout=stdout_file,
  39. stderr=subprocess.STDOUT, bufsize=0)
  40. proc.stdin.write(config.encode('utf-8'))
  41. if(log_folder == None):
  42. while True:
  43. line = proc.stdout.readline()
  44. if not line:
  45. break
  46. print(node + ": " + line.decode('utf-8'), end='', flush=True)
  47. else:
  48. stdout_file.close()
  49. if __name__ == "__main__":
  50. aparse = argparse.ArgumentParser(
  51. description='Launch TEEMS nodes'
  52. )
  53. aparse.add_argument('-m', default=MANIFEST,
  54. help='manifest.yaml file')
  55. aparse.add_argument('-p', default=PUBKEYS,
  56. help='pubkeys.yaml file')
  57. aparse.add_argument('-z', default=None,
  58. help='override message size')
  59. aparse.add_argument('-u', default=None,
  60. help='override max number of users')
  61. aparse.add_argument('-B', default=None,
  62. help='override max number of outgoing private messages per user per epoch')
  63. aparse.add_argument('-b', default=None,
  64. help='override max number of incoming private messages per user per epoch')
  65. aparse.add_argument('-C', default=None,
  66. help='override max number of outgoing public messages per user per epoch')
  67. aparse.add_argument('-c', default=None,
  68. help='override max number of incoming public messages per user per epoch')
  69. aparse.add_argument('-l', default=None,
  70. help='log folder to store logs of each server in an experiment')
  71. aparse.add_argument('-d', default=None,
  72. help='Set epoch interval time in seconds')
  73. aparse.add_argument('-e', default=None,
  74. help='Set number of epochs')
  75. aparse.add_argument('-w', default=None,
  76. help='Set number of Waksman Networks to precompute before starting epochs')
  77. aparse.add_argument('-r', default=None,
  78. help='override if routing private channel messages (or public)')
  79. aparse.add_argument('-n', nargs='*', help='nodes to include')
  80. aparse.add_argument('cmd', nargs='*', help='experiment to run')
  81. args = aparse.parse_args()
  82. with open(args.m) as mf:
  83. manifest = yaml.safe_load(mf)
  84. params_overrides = {
  85. 'msg_size': args.z,
  86. 'user_count': args.u,
  87. 'priv_out': args.B,
  88. 'priv_in': args.b,
  89. 'pub_out': args.C,
  90. 'pub_in': args.c,
  91. 'private_routing': args.r
  92. }
  93. config = mkconfig.create_json(args.m, args.p, args.n, params_overrides)
  94. # There must not be any newlines in the config json string
  95. if "\n" in config:
  96. print("Error: config.json must not contain embedded newlines")
  97. sys.exit(1)
  98. # Now add a trailing newline
  99. config += "\n"
  100. nodelist = args.n
  101. if nodelist is None or len(nodelist) == 0:
  102. nodelist = manifest.keys()
  103. threadlist = []
  104. for node in nodelist:
  105. if node == "params":
  106. continue
  107. thread = threading.Thread(target=launch,
  108. args=(node, manifest, config, args.cmd, args.l, args.d, args.e, args.w))
  109. thread.start()
  110. threadlist.append(thread)
  111. for thread in threadlist:
  112. thread.join()