// networkOrchestrator.cpp
  1. #include <iostream>
  2. #include <fstream>
  3. #include <sstream>
  4. #include <algorithm>
  5. #include <cstdlib>
  6. #include <thread>
  7. #include "networkOrchestrator.hpp"
  8. /***************************************************
  9. ********* *********
  10. ********* orchestrator public functions *********
  11. ********* *********
  12. ***************************************************/
  13. /*
  14. * START UP AND SHUT DOWN INSTANCES
  15. */
  /*
   * Launch one server or client process in the background and return the
   * result of system() for the launching command.
   *
   * target:           host to start the process on; an empty string or
   *                   "tick0" runs the binary from bin/ directly, any other
   *                   value runs the startup script on that host through ssh
   * server:           true to start a server, false to start a client
   * id, output,
   * lambda:           passed through, in that order, as command-line arguments
   * maliciousServers: passed through as a trailing "T" or "F" argument
   */
  int start_remote_actor(
      const std::string& target,
      bool server,
      const std::string& id,
      const std::string& output,
      size_t lambda,
      bool maliciousServers)
  {
      const bool runDirectly = target.empty() || target == "tick0";
      const char *role = (server ? "server " : "client ");

      std::stringstream launch;
      if (runDirectly)
      {
          const char *tail = (maliciousServers ? " T &" : " F &");
          launch << "bin/" << role << id << " " << output << " " << lambda << tail;
      }
      else
      {
          // The closing quote of the remote command is part of the tail
          const char *tail = (maliciousServers ? " T\" &" : " F\" &");
          launch << "ssh -n " << target << " \"~/prsona/prsona/scripts/startup.sh " << role << id << " " << output << " " << lambda << tail;
      }

      const std::string commandLine = launch.str();
      return system(commandLine.c_str());
  }
  33. void shut_down_remote_actors(
  34. const std::vector<std::string>& relevantIPs,
  35. const std::vector<int>& relevantPorts)
  36. {
  37. for (size_t i = 0; i < relevantIPs.size(); i++)
  38. {
  39. // Shut downs are triggered by a GET request to the correct location
  40. std::stringstream sysString;
  41. std::string data;
  42. sysString << "GET " << EXIT_URI << " HTTP/1.1\r\n";
  43. sysString << "Host: " << relevantIPs[i] << ":" << relevantPorts[i] << "\r\n\r\n";
  44. data = sysString.str();
  45. struct mg_connection *conn = NULL;
  46. // Connect to the instance
  47. while (!conn)
  48. {
  49. conn = mg_connect_client(relevantIPs[i].c_str(), relevantPorts[i], USE_SSL, NULL, 0);
  50. if (!conn)
  51. std::cerr << "Couldn't connect to instance at " << relevantIPs[i] << ":" << relevantPorts[i] << " for shut down." << std::endl;
  52. }
  53. // Make correct GET request
  54. mg_write(conn, data.c_str(), data.length());
  55. // Close connection
  56. mg_close_connection(conn);
  57. }
  58. }
  59. /*
  60. * SYNCHRONIZATION
  61. */
  62. void wait_for_servers_ready(
  63. std::string dealer,
  64. int dealerPort)
  65. {
  66. // Requesting information about servers being ready is done via a GET request
  67. std::stringstream sysString;
  68. std::string data;
  69. sysString << "GET " << EPOCH_READY_URI << " HTTP/1.1\r\n";
  70. sysString << "Host: " << dealer << ":" << dealerPort << "\r\n\r\n";
  71. data = sysString.str();
  72. bool ready = false;
  73. while (!ready)
  74. {
  75. struct mg_connection *conn = NULL;
  76. // Connect to the dealer
  77. while (!conn)
  78. {
  79. conn = mg_connect_client(dealer.c_str(), dealerPort, USE_SSL, NULL, 0);
  80. if (!conn)
  81. {
  82. std::cerr << "Couldn't make connection while waiting for servers to be ready." << std::endl;
  83. std::this_thread::sleep_for(HALF_SECOND);
  84. }
  85. }
  86. // Make the correct GET request
  87. mg_write(conn, data.c_str(), data.length());
  88. // Wait for a response
  89. mg_get_response(conn, NULL, 0, 250);
  90. const struct mg_response_info *info = mg_get_response_info(conn);
  91. // Close connection
  92. mg_close_connection(conn);
  93. // If the dealer says it's ready, then we can move on
  94. if (info->status_code == 200)
  95. ready = true;
  96. }
  97. }
  98. void wait_for_clients_created(
  99. std::string dealer,
  100. int dealerPort,
  101. size_t numClients)
  102. {
  103. bool ready = false;
  104. while (!ready)
  105. {
  106. struct synchronization_tool sync;
  107. struct mg_connection *conn = NULL;
  108. // Connect to the dealer
  109. std::unique_lock<std::mutex> lck(sync.mtx);
  110. sync.val = 0;
  111. sync.val2 = 0;
  112. while (!conn)
  113. {
  114. conn = mg_connect_websocket_client(dealer.c_str(), dealerPort, USE_SSL, NULL, 0, REQUEST_NUM_CLIENTS_URI, "null", clients_websocket_data_handler, synchro_websocket_close_handler, &sync);
  115. if (!conn)
  116. {
  117. std::cerr << "Couldn't make connection while waiting for clients to be ready." << std::endl;
  118. std::this_thread::sleep_for(HALF_SECOND);
  119. }
  120. }
  121. // Tell the dealer we're ready for its response
  122. mg_websocket_client_write(conn, MG_WEBSOCKET_OPCODE_DATACOMPLETE, "", 0);
  123. // Wait for that response
  124. while (!sync.val2)
  125. sync.cv.wait(lck);
  126. // Close connection
  127. mg_close_connection(conn);
  128. // If the dealer says it's ready, then we can move on
  129. if (sync.val == numClients)
  130. ready = true;
  131. }
  132. }
  133. void wait_for_client_ready(
  134. std::string client,
  135. int clientPort)
  136. {
  137. // Requesting information about clients being ready is done via a GET request
  138. std::stringstream sysString;
  139. std::string data;
  140. sysString << "GET " << CLIENT_READY_URI << " HTTP/1.1\r\n";
  141. sysString << "Host: " << client << ":" << clientPort << "\r\n\r\n";
  142. data = sysString.str();
  143. bool ready = false;
  144. while (!ready)
  145. {
  146. struct mg_connection *conn = NULL;
  147. // Connect to the client
  148. while (!conn)
  149. {
  150. conn = mg_connect_client(client.c_str(), clientPort, USE_SSL, NULL, 0);
  151. if (!conn)
  152. {
  153. std::cerr << "Couldn't make connection while waiting for client (" << client << ":" << clientPort << ") to be ready." << std::endl;
  154. std::this_thread::sleep_for(HALF_SECOND);
  155. }
  156. }
  157. // Make the correct GET request
  158. mg_write(conn, data.c_str(), data.length());
  159. // Wait for a response
  160. mg_get_response(conn, NULL, 0, 250);
  161. const struct mg_response_info *info = mg_get_response_info(conn);
  162. // Close connection
  163. mg_close_connection(conn);
  164. // If the client says it's ready, then we can move on
  165. if (info->status_code == 200)
  166. ready = true;
  167. }
  168. }
  169. /*
  170. * RUN EXPERIMENT
  171. */
  172. void execute_experiment(
  173. std::default_random_engine& rng,
  174. std::string dealerIP,
  175. int dealerPort,
  176. std::vector<std::string> serverIPs,
  177. std::vector<int> serverPorts,
  178. std::vector<std::string> clientIPs,
  179. std::vector<int> clientPorts,
  180. const char *filename)
  181. {
  182. size_t line = 1;
  183. // Iterate across each line in the command file, which contains one command per line
  184. char buffer[128];
  185. std::ifstream commands(filename);
  186. while (!commands.eof())
  187. {
  188. commands.getline(buffer, 128);
  189. if (strlen(buffer) == 0)
  190. {
  191. line++;
  192. continue;
  193. }
  194. std::cout << "Command " << line << ": " << std::string(buffer) << std::endl;
  195. std::vector<size_t> whichActors;
  196. std::vector<std::vector<size_t>> proofActors;
  197. std::vector<std::thread> clientWaiters;
  198. int numVoters, numProofs;
  199. // The first character of each command tells us which it is
  200. switch(buffer[0])
  201. {
  202. // Vote triggers come in form `V <numVoters>`
  203. case 'V':
  204. numVoters = atoi(strtok(buffer + 1, " "));
  205. whichActors = generate_random_set(rng, numVoters, clientIPs.size());
  206. for (size_t i = 0; i < whichActors.size(); i++)
  207. trigger_vote(clientIPs[whichActors[i]], clientPorts[whichActors[i]]);
  208. std::this_thread::sleep_for(HALF_SECOND);
  209. for (size_t i = 0; i < whichActors.size(); i++)
  210. clientWaiters.push_back(std::thread(wait_for_client_ready, clientIPs[whichActors[i]], clientPorts[whichActors[i]]));
  211. for (size_t i = 0; i < clientWaiters.size(); i++)
  212. clientWaiters[i].join();
  213. clientWaiters.clear();
  214. break;
  215. // Reputation proof triggers come in form `R <numProofs>`
  216. case 'R':
  217. numProofs = atoi(strtok(buffer + 1, " "));
  218. for (int i = 0; i < numProofs; i++)
  219. {
  220. whichActors = generate_random_set(rng, 2, clientIPs.size());
  221. trigger_reputation_proof(
  222. clientIPs[whichActors[0]],
  223. clientPorts[whichActors[0]],
  224. clientIPs[whichActors[1]],
  225. clientPorts[whichActors[1]]);
  226. proofActors.push_back(whichActors);
  227. }
  228. std::this_thread::sleep_for(HALF_SECOND);
  229. for (size_t i = 0; i < proofActors.size(); i++)
  230. clientWaiters.push_back(std::thread(wait_for_client_ready, clientIPs[proofActors[i][0]], clientPorts[proofActors[i][0]]));
  231. for (size_t i = 0; i < clientWaiters.size(); i++)
  232. clientWaiters[i].join();
  233. proofActors.clear();
  234. clientWaiters.clear();
  235. break;
  236. // Epoch change triggers come in form `E`
  237. case 'E':
  238. trigger_epoch_change(dealerIP, dealerPort);
  239. std::this_thread::sleep_for(HALF_SECOND);
  240. wait_for_servers_ready(dealerIP, dealerPort);
  241. break;
  242. default:
  243. break;
  244. }
  245. line++;
  246. }
  247. // Don't let ourselves shut down servers and clients until we're sure they're not in the middle of anything else
  248. wait_for_servers_ready(dealerIP, dealerPort);
  249. for (size_t i = 0; i < clientIPs.size(); i++)
  250. wait_for_client_ready(clientIPs[i], clientPorts[i]);
  251. }
  252. /****************************************************
  253. ********* *********
  254. ********* orchestrator private functions *********
  255. ********* *********
  256. ****************************************************/
  257. /*
  258. * TRIGGER EXPERIMENT EVENTS
  259. */
  260. void trigger_epoch_change(
  261. std::string dealer,
  262. int dealerPort)
  263. {
  264. // Epoch changes are triggered via GET request to the correct location
  265. std::stringstream sysString;
  266. std::string data;
  267. sysString << "GET " << TRIGGER_EPOCH_URI << " HTTP/1.1\r\n";
  268. sysString << "Host: " << dealer << ":" << dealerPort << "\r\n\r\n";
  269. data = sysString.str();
  270. struct mg_connection *conn = NULL;
  271. // Connect to the dealer
  272. while (!conn)
  273. {
  274. conn = mg_connect_client(dealer.c_str(), dealerPort, USE_SSL, NULL, 0);
  275. if (!conn)
  276. std::cerr << "Couldn't connect to dealer to trigger epoch change." << std::endl;
  277. }
  278. // Make the relevant GET request
  279. mg_write(conn, data.c_str(), data.length());
  280. // Close connection
  281. mg_close_connection(conn);
  282. }
  283. void trigger_vote(
  284. std::string target,
  285. int targetPort)
  286. {
  287. // New votes are triggered via GET request to the correct location
  288. std::stringstream sysString;
  289. std::string data;
  290. sysString << "GET " << TRIGGER_VOTE_URI << " HTTP/1.1\r\n";
  291. sysString << "Host: " << target << ":" << targetPort << "\r\n\r\n";
  292. data = sysString.str();
  293. struct mg_connection *conn = NULL;
  294. // Connect to the client
  295. while (!conn)
  296. {
  297. conn = mg_connect_client(target.c_str(), targetPort, USE_SSL, NULL, 0);
  298. if (!conn)
  299. std::cerr << "Couldn't connect to client at " << target << ":" << targetPort << " to trigger new vote." << std::endl;
  300. }
  301. // Make the relevant GET request
  302. mg_write(conn, data.c_str(), data.length());
  303. // Close connection
  304. mg_close_connection(conn);
  305. }
  306. void trigger_reputation_proof(
  307. std::string target,
  308. int targetPort,
  309. std::string verifier,
  310. int verifierPort)
  311. {
  312. // Reputation proofs are triggered via GET request to the correct location (with a parameter for the intended verifier)
  313. std::stringstream sysString;
  314. std::string data;
  315. sysString << "GET " << TRIGGER_REP_URI << "?" << verifier << ":" << verifierPort << " HTTP/1.1\r\n";
  316. sysString << "Host: " << target << "\r\n\r\n";
  317. data = sysString.str();
  318. struct mg_connection *conn = NULL;
  319. // Connect to the client
  320. while (!conn)
  321. {
  322. conn = mg_connect_client(target.c_str(), targetPort, USE_SSL, NULL, 0);
  323. if (!conn)
  324. std::cerr << "Couldn't connect to client at " << target << ":" << targetPort << " to trigger reputation proof." << std::endl;
  325. }
  326. // Make the relevant GET request
  327. mg_write(conn, data.c_str(), data.length());
  328. // Close connection
  329. mg_close_connection(conn);
  330. }
  331. /*
  332. * EXECUTOR HELPER
  333. */
  /*
   * Pick distinct random indices below maxVal.
   *
   * Shuffles the sequence 0, 1, ..., maxVal - 1 with rng and returns its
   * first `size` entries. When size is larger than maxVal, all maxVal
   * indices are returned (in shuffled order).
   */
  std::vector<size_t> generate_random_set(
      std::default_random_engine& rng,
      size_t size,
      size_t maxVal)
  {
      // Start from every candidate index in ascending order
      std::vector<size_t> pool(maxVal);
      for (size_t idx = 0; idx < pool.size(); ++idx)
          pool[idx] = idx;

      std::shuffle(pool.begin(), pool.end(), rng);

      // Keep only the leading `size` entries (or everything, if fewer exist)
      if (size < pool.size())
          pool.resize(size);

      return pool;
  }