networkOrchestrator.cpp 9.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333
  1. #include <iostream>
  2. #include <fstream>
  3. #include <sstream>
  4. #include <cstdlib>
  5. #include <thread>
  6. #include "networkOrchestrator.hpp"
  7. /***************************************************
  8. ********* *********
  9. ********* orchestrator public functions *********
  10. ********* *********
  11. ***************************************************/
  12. /*
  13. * START UP AND SHUT DOWN INSTANCES
  14. */
  15. int start_remote_actor(
  16. const std::string& target,
  17. bool server,
  18. const std::string& id,
  19. const std::string& output,
  20. size_t lambda,
  21. bool maliciousServers)
  22. {
  23. std::stringstream buffer;
  24. std::string command;
  25. if (target != "tick0" && !target.empty())
  26. buffer << "ssh -n " << target << " \"~/prsona/prsona/scripts/startup.sh " << (server ? "server " : "client ") << id << " " << output << " " << lambda << (maliciousServers ? " T\" &" : " F\" &");
  27. else
  28. buffer << "bin/" << (server ? "server " : "client ") << id << " " << output << " " << lambda << (maliciousServers ? " T &" : " F &");
  29. command = buffer.str();
  30. return system(command.c_str());
  31. }
  32. void shut_down_remote_actors(
  33. const std::vector<std::string>& relevantIPs,
  34. const std::vector<int>& relevantPorts)
  35. {
  36. for (size_t i = 0; i < relevantIPs.size(); i++)
  37. {
  38. // Shut downs are triggered by a GET request to the correct location
  39. std::stringstream sysString;
  40. std::string data;
  41. sysString << "GET " << EXIT_URI << " HTTP/1.1\r\n";
  42. sysString << "Host: " << relevantIPs[i] << ":" << relevantPorts[i] << "\r\n\r\n";
  43. data = sysString.str();
  44. struct mg_connection *conn = NULL;
  45. // Connect to the instance
  46. while (!conn)
  47. {
  48. conn = mg_connect_client(relevantIPs[i].c_str(), relevantPorts[i], USE_SSL, NULL, 0);
  49. if (!conn)
  50. std::cerr << "Couldn't connect to instance at " << relevantIPs[i] << ":" << relevantPorts[i] << " for shut down." << std::endl;
  51. }
  52. // Make correct GET request
  53. mg_write(conn, data.c_str(), data.length());
  54. // Close connection
  55. mg_close_connection(conn);
  56. }
  57. }
  58. /*
  59. * SYNCHRONIZATION
  60. */
  61. void wait_for_servers_ready(
  62. std::string dealer,
  63. int dealerPort)
  64. {
  65. // Requesting information about servers being ready is done via a GET request
  66. std::stringstream sysString;
  67. std::string data;
  68. sysString << "GET " << EPOCH_READY_URI << " HTTP/1.1\r\n";
  69. sysString << "Host: " << dealer << ":" << dealerPort << "\r\n\r\n";
  70. data = sysString.str();
  71. bool ready = false;
  72. while (!ready)
  73. {
  74. struct mg_connection *conn = NULL;
  75. // Connect to the dealer
  76. while (!conn)
  77. {
  78. std::this_thread::sleep_for(ONE_SECOND);
  79. conn = mg_connect_client(dealer.c_str(), dealerPort, USE_SSL, NULL, 0);
  80. if (!conn)
  81. std::cerr << "Couldn't make connection while waiting for servers to be ready." << std::endl;
  82. }
  83. // Make the correct GET request
  84. mg_write(conn, data.c_str(), data.length());
  85. // Wait for a response
  86. mg_get_response(conn, NULL, 0, 250);
  87. const struct mg_response_info *info = mg_get_response_info(conn);
  88. // Close connection
  89. mg_close_connection(conn);
  90. // If the dealer says it's ready, then we can move on
  91. if (info->status_code == 200)
  92. ready = true;
  93. }
  94. }
  95. void wait_for_clients_ready(
  96. std::string dealer,
  97. int dealerPort,
  98. size_t numClients)
  99. {
  100. bool ready = false;
  101. while (!ready)
  102. {
  103. struct synchronization_tool sync;
  104. struct mg_connection *conn = NULL;
  105. // Connect to the dealer
  106. std::unique_lock<std::mutex> lck(sync.mtx);
  107. sync.val = 0;
  108. sync.val2 = 0;
  109. while (!conn)
  110. {
  111. std::this_thread::sleep_for(ONE_SECOND);
  112. conn = mg_connect_websocket_client(dealer.c_str(), dealerPort, USE_SSL, NULL, 0, REQUEST_NUM_CLIENTS_URI, "null", clients_websocket_data_handler, synchro_websocket_close_handler, &sync);
  113. if (!conn)
  114. std::cerr << "Couldn't make connection while waiting for clients to be ready." << std::endl;
  115. }
  116. // Tell the dealer we're ready for its response
  117. mg_websocket_client_write(conn, MG_WEBSOCKET_OPCODE_DATACOMPLETE, "", 0);
  118. // Wait for that response
  119. while (!sync.val2)
  120. sync.cv.wait(lck);
  121. // Close connection
  122. mg_close_connection(conn);
  123. // If the dealer says it's ready, then we can move on
  124. if (sync.val == numClients)
  125. ready = true;
  126. }
  127. }
  128. /*
  129. * RUN EXPERIMENT
  130. */
  131. void execute_experiment(
  132. std::string dealer,
  133. int dealerPort,
  134. const char *filename)
  135. {
  136. size_t line = 1;
  137. // Iterate across each line in the command file, which contains one command per line
  138. char buffer[128];
  139. std::ifstream commands(filename);
  140. while (!commands.eof())
  141. {
  142. commands.getline(buffer, 128);
  143. if (strlen(buffer) == 0)
  144. {
  145. line++;
  146. continue;
  147. }
  148. std::cout << "Command " << line << ": " << std::string(buffer) << std::endl;
  149. // The first character of each command tells us which it is
  150. switch(buffer[0])
  151. {
  152. // Vote triggers come in form `V <voterIP>:<voterPort>`
  153. case 'V':
  154. char *voter, *voterPort;
  155. voter = strtok(buffer + 1, " :");
  156. voterPort = strtok(NULL, " :");
  157. trigger_vote(std::string(voter), atoi(voterPort));
  158. break;
  159. // Reputation proof triggers come in form `R <proverIP>:<proverPort> <verifierIP>:<verifierPort>`
  160. case 'R':
  161. char *target, *targetPortStr, *verifier, *verifierPortStr;
  162. target = strtok(buffer + 1, " :");
  163. targetPortStr = strtok(NULL, " :");
  164. verifier = strtok(NULL, " :");
  165. verifierPortStr = strtok(NULL, " :");
  166. trigger_reputation_proof(
  167. std::string(target),
  168. atoi(targetPortStr),
  169. std::string(verifier),
  170. atoi(verifierPortStr));
  171. break;
  172. // Epoch change triggers come in form `E`
  173. case 'E':
  174. trigger_epoch_change(dealer, dealerPort);
  175. break;
  176. default:
  177. break;
  178. }
  179. line++;
  180. }
  181. // Don't let ourselves shut down servers and clients until we're sure they're not in the middle of anything else
  182. wait_for_servers_ready(dealer, dealerPort);
  183. }
  184. /****************************************************
  185. ********* *********
  186. ********* orchestrator private functions *********
  187. ********* *********
  188. ****************************************************/
  189. /*
  190. * TRIGGER EXPERIMENT EVENTS
  191. */
  192. void trigger_epoch_change(
  193. std::string dealer,
  194. int dealerPort)
  195. {
  196. // Give other updates a chance to resolve
  197. wait_for_servers_ready(dealer, dealerPort);
  198. // Epoch changes are triggered via GET request to the correct location
  199. std::stringstream sysString;
  200. std::string data;
  201. sysString << "GET " << TRIGGER_EPOCH_URI << " HTTP/1.1\r\n";
  202. sysString << "Host: " << dealer << ":" << dealerPort << "\r\n\r\n";
  203. data = sysString.str();
  204. struct mg_connection *conn = NULL;
  205. // Connect to the dealer
  206. while (!conn)
  207. {
  208. conn = mg_connect_client(dealer.c_str(), dealerPort, USE_SSL, NULL, 0);
  209. if (!conn)
  210. std::cerr << "Couldn't connect to dealer to trigger epoch change." << std::endl;
  211. }
  212. // Make the relevant GET request
  213. mg_write(conn, data.c_str(), data.length());
  214. // Close connection
  215. mg_close_connection(conn);
  216. // Don't bother giving new commands until this one has resolved
  217. wait_for_servers_ready(dealer, dealerPort);
  218. }
  219. void trigger_vote(
  220. std::string target,
  221. int targetPort)
  222. {
  223. // New votes are triggered via GET request to the correct location
  224. std::stringstream sysString;
  225. std::string data;
  226. sysString << "GET " << TRIGGER_VOTE_URI << " HTTP/1.1\r\n";
  227. sysString << "Host: " << target << ":" << targetPort << "\r\n\r\n";
  228. data = sysString.str();
  229. struct mg_connection *conn = NULL;
  230. // Connect to the client
  231. while (!conn)
  232. {
  233. conn = mg_connect_client(target.c_str(), targetPort, USE_SSL, NULL, 0);
  234. if (!conn)
  235. std::cerr << "Couldn't connect to client at " << target << ":" << targetPort << " to trigger new vote." << std::endl;
  236. }
  237. // Make the relevant GET request
  238. mg_write(conn, data.c_str(), data.length());
  239. // Close connection
  240. mg_close_connection(conn);
  241. }
  242. void trigger_reputation_proof(
  243. std::string target,
  244. int targetPort,
  245. std::string verifier,
  246. int verifierPort)
  247. {
  248. // Reputation proofs are triggered via GET request to the correct location (with a parameter for the intended verifier)
  249. std::stringstream sysString;
  250. std::string data;
  251. sysString << "GET " << TRIGGER_REP_URI << "?" << verifier << ":" << verifierPort << " HTTP/1.1\r\n";
  252. sysString << "Host: " << target << "\r\n\r\n";
  253. data = sysString.str();
  254. struct mg_connection *conn = NULL;
  255. // Connect to the client
  256. while (!conn)
  257. {
  258. conn = mg_connect_client(target.c_str(), targetPort, USE_SSL, NULL, 0);
  259. if (!conn)
  260. std::cerr << "Couldn't connect to client at " << target << ":" << targetPort << " to trigger reputation proof." << std::endl;
  261. }
  262. // Make the relevant GET request
  263. mg_write(conn, data.c_str(), data.length());
  264. // Close connection
  265. mg_close_connection(conn);
  266. // Give this command a small amount of time to resolve
  267. std::this_thread::sleep_for(ONE_SECOND);
  268. }