networkOrchestrator.cpp 9.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332
  1. #include <iostream>
  2. #include <fstream>
  3. #include <sstream>
  4. #include <cstdlib>
  5. #include <thread>
  6. #include "networkOrchestrator.hpp"
  7. /***************************************************
  8. ********* *********
  9. ********* orchestrator public functions *********
  10. ********* *********
  11. ***************************************************/
  12. /*
  13. * START UP AND SHUT DOWN INSTANCES
  14. */
  15. int start_remote_actor(
  16. const std::string& target,
  17. bool server,
  18. const std::string& id,
  19. const std::string& output,
  20. bool maliciousServers)
  21. {
  22. std::stringstream buffer;
  23. std::string command;
  24. if (target != "tick0" && !target.empty())
  25. buffer << "ssh -n " << target << " \"~/prsona/prsona/scripts/startup.sh " << (server ? "server " : "client ") << id << " " << output << (maliciousServers ? " T\" &" : " F\" &");
  26. else
  27. buffer << "bin/" << (server ? "server " : "client ") << id << " " << output << (maliciousServers ? " T &" : " F &");
  28. command = buffer.str();
  29. return system(command.c_str());
  30. }
  31. void shut_down_remote_actors(
  32. const std::vector<std::string>& relevantIPs,
  33. const std::vector<int>& relevantPorts)
  34. {
  35. for (size_t i = 0; i < relevantIPs.size(); i++)
  36. {
  37. // Shut downs are triggered by a GET request to the correct location
  38. std::stringstream sysString;
  39. std::string data;
  40. sysString << "GET " << EXIT_URI << " HTTP/1.1\r\n";
  41. sysString << "Host: " << relevantIPs[i] << ":" << relevantPorts[i] << "\r\n\r\n";
  42. data = sysString.str();
  43. struct mg_connection *conn = NULL;
  44. // Connect to the instance
  45. while (!conn)
  46. {
  47. conn = mg_connect_client(relevantIPs[i].c_str(), relevantPorts[i], USE_SSL, NULL, 0);
  48. if (!conn)
  49. std::cerr << "Couldn't connect to instance at " << relevantIPs[i] << ":" << relevantPorts[i] << " for shut down." << std::endl;
  50. }
  51. // Make correct GET request
  52. mg_write(conn, data.c_str(), data.length());
  53. // Close connection
  54. mg_close_connection(conn);
  55. }
  56. }
  57. /*
  58. * SYNCHRONIZATION
  59. */
  60. void wait_for_servers_ready(
  61. std::string dealer,
  62. int dealerPort)
  63. {
  64. // Requesting information about servers being ready is done via a GET request
  65. std::stringstream sysString;
  66. std::string data;
  67. sysString << "GET " << EPOCH_READY_URI << " HTTP/1.1\r\n";
  68. sysString << "Host: " << dealer << ":" << dealerPort << "\r\n\r\n";
  69. data = sysString.str();
  70. bool ready = false;
  71. while (!ready)
  72. {
  73. struct mg_connection *conn = NULL;
  74. // Connect to the dealer
  75. while (!conn)
  76. {
  77. std::this_thread::sleep_for(ONE_SECOND);
  78. conn = mg_connect_client(dealer.c_str(), dealerPort, USE_SSL, NULL, 0);
  79. if (!conn)
  80. std::cerr << "Couldn't make connection while waiting for servers to be ready." << std::endl;
  81. }
  82. // Make the correct GET request
  83. mg_write(conn, data.c_str(), data.length());
  84. // Wait for a response
  85. mg_get_response(conn, NULL, 0, 250);
  86. const struct mg_response_info *info = mg_get_response_info(conn);
  87. // Close connection
  88. mg_close_connection(conn);
  89. // If the dealer says it's ready, then we can move on
  90. if (info->status_code == 200)
  91. ready = true;
  92. }
  93. }
  94. void wait_for_clients_ready(
  95. std::string dealer,
  96. int dealerPort,
  97. size_t numClients)
  98. {
  99. bool ready = false;
  100. while (!ready)
  101. {
  102. struct synchronization_tool sync;
  103. struct mg_connection *conn = NULL;
  104. // Connect to the dealer
  105. std::unique_lock<std::mutex> lck(sync.mtx);
  106. sync.val = 0;
  107. sync.val2 = 0;
  108. while (!conn)
  109. {
  110. std::this_thread::sleep_for(ONE_SECOND);
  111. conn = mg_connect_websocket_client(dealer.c_str(), dealerPort, USE_SSL, NULL, 0, REQUEST_NUM_CLIENTS_URI, "null", clients_websocket_data_handler, synchro_websocket_close_handler, &sync);
  112. if (!conn)
  113. std::cerr << "Couldn't make connection while waiting for clients to be ready." << std::endl;
  114. }
  115. // Tell the dealer we're ready for its response
  116. mg_websocket_client_write(conn, MG_WEBSOCKET_OPCODE_DATACOMPLETE, "", 0);
  117. // Wait for that response
  118. while (!sync.val2)
  119. sync.cv.wait(lck);
  120. // Close connection
  121. mg_close_connection(conn);
  122. // If the dealer says it's ready, then we can move on
  123. if (sync.val == numClients)
  124. ready = true;
  125. }
  126. }
  127. /*
  128. * RUN EXPERIMENT
  129. */
  130. void execute_experiment(
  131. std::string dealer,
  132. int dealerPort,
  133. const char *filename)
  134. {
  135. size_t line = 1;
  136. // Iterate across each line in the command file, which contains one command per line
  137. char buffer[128];
  138. std::ifstream commands(filename);
  139. while (!commands.eof())
  140. {
  141. commands.getline(buffer, 128);
  142. if (strlen(buffer) == 0)
  143. {
  144. line++;
  145. continue;
  146. }
  147. std::cout << "Command " << line << ": " << std::string(buffer) << std::endl;
  148. // The first character of each command tells us which it is
  149. switch(buffer[0])
  150. {
  151. // Vote triggers come in form `V <voterIP>:<voterPort>`
  152. case 'V':
  153. char *voter, *voterPort;
  154. voter = strtok(buffer + 1, " :");
  155. voterPort = strtok(NULL, " :");
  156. trigger_vote(std::string(voter), atoi(voterPort));
  157. break;
  158. // Reputation proof triggers come in form `R <proverIP>:<proverPort> <verifierIP>:<verifierPort>`
  159. case 'R':
  160. char *target, *targetPortStr, *verifier, *verifierPortStr;
  161. target = strtok(buffer + 1, " :");
  162. targetPortStr = strtok(NULL, " :");
  163. verifier = strtok(NULL, " :");
  164. verifierPortStr = strtok(NULL, " :");
  165. trigger_reputation_proof(
  166. std::string(target),
  167. atoi(targetPortStr),
  168. std::string(verifier),
  169. atoi(verifierPortStr));
  170. break;
  171. // Epoch change triggers come in form `E`
  172. case 'E':
  173. trigger_epoch_change(dealer, dealerPort);
  174. break;
  175. default:
  176. break;
  177. }
  178. line++;
  179. }
  180. // Don't let ourselves shut down servers and clients until we're sure they're not in the middle of anything else
  181. wait_for_servers_ready(dealer, dealerPort);
  182. }
  183. /****************************************************
  184. ********* *********
  185. ********* orchestrator private functions *********
  186. ********* *********
  187. ****************************************************/
  188. /*
  189. * TRIGGER EXPERIMENT EVENTS
  190. */
  191. void trigger_epoch_change(
  192. std::string dealer,
  193. int dealerPort)
  194. {
  195. // Give other updates a chance to resolve
  196. wait_for_servers_ready(dealer, dealerPort);
  197. // Epoch changes are triggered via GET request to the correct location
  198. std::stringstream sysString;
  199. std::string data;
  200. sysString << "GET " << TRIGGER_EPOCH_URI << " HTTP/1.1\r\n";
  201. sysString << "Host: " << dealer << ":" << dealerPort << "\r\n\r\n";
  202. data = sysString.str();
  203. struct mg_connection *conn = NULL;
  204. // Connect to the dealer
  205. while (!conn)
  206. {
  207. conn = mg_connect_client(dealer.c_str(), dealerPort, USE_SSL, NULL, 0);
  208. if (!conn)
  209. std::cerr << "Couldn't connect to dealer to trigger epoch change." << std::endl;
  210. }
  211. // Make the relevant GET request
  212. mg_write(conn, data.c_str(), data.length());
  213. // Close connection
  214. mg_close_connection(conn);
  215. // Don't bother giving new commands until this one has resolved
  216. wait_for_servers_ready(dealer, dealerPort);
  217. }
  218. void trigger_vote(
  219. std::string target,
  220. int targetPort)
  221. {
  222. // New votes are triggered via GET request to the correct location
  223. std::stringstream sysString;
  224. std::string data;
  225. sysString << "GET " << TRIGGER_VOTE_URI << " HTTP/1.1\r\n";
  226. sysString << "Host: " << target << ":" << targetPort << "\r\n\r\n";
  227. data = sysString.str();
  228. struct mg_connection *conn = NULL;
  229. // Connect to the client
  230. while (!conn)
  231. {
  232. conn = mg_connect_client(target.c_str(), targetPort, USE_SSL, NULL, 0);
  233. if (!conn)
  234. std::cerr << "Couldn't connect to client at " << target << ":" << targetPort << " to trigger new vote." << std::endl;
  235. }
  236. // Make the relevant GET request
  237. mg_write(conn, data.c_str(), data.length());
  238. // Close connection
  239. mg_close_connection(conn);
  240. }
  241. void trigger_reputation_proof(
  242. std::string target,
  243. int targetPort,
  244. std::string verifier,
  245. int verifierPort)
  246. {
  247. // Reputation proofs are triggered via GET request to the correct location (with a parameter for the intended verifier)
  248. std::stringstream sysString;
  249. std::string data;
  250. sysString << "GET " << TRIGGER_REP_URI << "?" << verifier << ":" << verifierPort << " HTTP/1.1\r\n";
  251. sysString << "Host: " << target << "\r\n\r\n";
  252. data = sysString.str();
  253. struct mg_connection *conn = NULL;
  254. // Connect to the client
  255. while (!conn)
  256. {
  257. conn = mg_connect_client(target.c_str(), targetPort, USE_SSL, NULL, 0);
  258. if (!conn)
  259. std::cerr << "Couldn't connect to client at " << target << ":" << targetPort << " to trigger reputation proof." << std::endl;
  260. }
  261. // Make the relevant GET request
  262. mg_write(conn, data.c_str(), data.length());
  263. // Close connection
  264. mg_close_connection(conn);
  265. // Give this command a small amount of time to resolve
  266. std::this_thread::sleep_for(ONE_SECOND);
  267. }