networkOrchestrator.cpp 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418
  1. #include <iostream>
  2. #include <fstream>
  3. #include <sstream>
  4. #include <algorithm>
  5. #include <cstdlib>
  6. #include <thread>
  7. #include "networkOrchestrator.hpp"
  8. /***************************************************
  9. ********* *********
  10. ********* orchestrator public functions *********
  11. ********* *********
  12. ***************************************************/
  13. /*
  14. * START UP AND SHUT DOWN INSTANCES
  15. */
  16. int start_remote_actor(
  17. const std::string& target,
  18. bool server,
  19. const std::string& id,
  20. const std::string& output,
  21. size_t lambda,
  22. bool maliciousServers)
  23. {
  24. std::stringstream buffer;
  25. std::string command;
  26. if (target != "tick0" && !target.empty())
  27. buffer << "ssh -n " << target << " \"~/prsona/prsona/scripts/startup.sh " << (server ? "server " : "client ") << id << " " << output << " " << lambda << (maliciousServers ? " T\" &" : " F\" &");
  28. else
  29. buffer << "bin/" << (server ? "server " : "client ") << id << " " << output << " " << lambda << (maliciousServers ? " T &" : " F &");
  30. command = buffer.str();
  31. return system(command.c_str());
  32. }
  33. void shut_down_remote_actors(
  34. const std::vector<std::string>& relevantIPs,
  35. const std::vector<int>& relevantPorts)
  36. {
  37. for (size_t i = 0; i < relevantIPs.size(); i++)
  38. {
  39. // Shut downs are triggered by a GET request to the correct location
  40. std::stringstream sysString;
  41. std::string data;
  42. sysString << "GET " << EXIT_URI << " HTTP/1.1\r\n";
  43. sysString << "Host: " << relevantIPs[i] << ":" << relevantPorts[i] << "\r\n\r\n";
  44. data = sysString.str();
  45. struct mg_connection *conn = NULL;
  46. // Connect to the instance
  47. while (!conn)
  48. {
  49. conn = mg_connect_client(relevantIPs[i].c_str(), relevantPorts[i], USE_SSL, NULL, 0);
  50. if (!conn)
  51. std::cerr << "Couldn't connect to instance at " << relevantIPs[i] << ":" << relevantPorts[i] << " for shut down." << std::endl;
  52. }
  53. // Make correct GET request
  54. mg_write(conn, data.c_str(), data.length());
  55. // Close connection
  56. mg_close_connection(conn);
  57. }
  58. }
  59. /*
  60. * SYNCHRONIZATION
  61. */
  62. void wait_for_servers_ready(
  63. std::string dealer,
  64. int dealerPort)
  65. {
  66. // Requesting information about servers being ready is done via a GET request
  67. std::stringstream sysString;
  68. std::string data;
  69. sysString << "GET " << EPOCH_READY_URI << " HTTP/1.1\r\n";
  70. sysString << "Host: " << dealer << ":" << dealerPort << "\r\n\r\n";
  71. data = sysString.str();
  72. bool ready = false;
  73. while (!ready)
  74. {
  75. struct mg_connection *conn = NULL;
  76. // Connect to the dealer
  77. while (!conn)
  78. {
  79. conn = mg_connect_client(dealer.c_str(), dealerPort, USE_SSL, NULL, 0);
  80. if (!conn)
  81. {
  82. std::cerr << "Couldn't make connection while waiting for servers to be ready." << std::endl;
  83. std::this_thread::sleep_for(HALF_SECOND);
  84. }
  85. }
  86. // Make the correct GET request
  87. mg_write(conn, data.c_str(), data.length());
  88. // Wait for a response
  89. mg_get_response(conn, NULL, 0, 250);
  90. const struct mg_response_info *info = mg_get_response_info(conn);
  91. // Close connection
  92. mg_close_connection(conn);
  93. // If the dealer says it's ready, then we can move on
  94. if (info->status_code == 200)
  95. ready = true;
  96. }
  97. }
  98. void wait_for_clients_created(
  99. std::string dealer,
  100. int dealerPort,
  101. size_t numClients)
  102. {
  103. bool ready = false;
  104. while (!ready)
  105. {
  106. struct synchronization_tool sync;
  107. struct mg_connection *conn = NULL;
  108. // Connect to the dealer
  109. std::unique_lock<std::mutex> lck(sync.mtx);
  110. sync.val = 0;
  111. sync.val2 = 0;
  112. while (!conn)
  113. {
  114. conn = mg_connect_websocket_client(dealer.c_str(), dealerPort, USE_SSL, NULL, 0, REQUEST_NUM_CLIENTS_URI, "null", clients_websocket_data_handler, synchro_websocket_close_handler, &sync);
  115. if (!conn)
  116. {
  117. std::cerr << "Couldn't make connection while waiting for clients to be ready." << std::endl;
  118. std::this_thread::sleep_for(HALF_SECOND);
  119. }
  120. }
  121. // Tell the dealer we're ready for its response
  122. mg_websocket_client_write(conn, MG_WEBSOCKET_OPCODE_DATACOMPLETE, "", 0);
  123. // Wait for that response
  124. while (!sync.val2)
  125. sync.cv.wait(lck);
  126. // Close connection
  127. mg_close_connection(conn);
  128. // If the dealer says it's ready, then we can move on
  129. if (sync.val == numClients)
  130. ready = true;
  131. }
  132. }
  133. void wait_for_client_ready(
  134. std::string client,
  135. int clientPort)
  136. {
  137. // Requesting information about clients being ready is done via a GET request
  138. std::stringstream sysString;
  139. std::string data;
  140. sysString << "GET " << CLIENT_READY_URI << " HTTP/1.1\r\n";
  141. sysString << "Host: " << client << ":" << clientPort << "\r\n\r\n";
  142. data = sysString.str();
  143. bool ready = false;
  144. while (!ready)
  145. {
  146. struct mg_connection *conn = NULL;
  147. // Connect to the client
  148. while (!conn)
  149. {
  150. conn = mg_connect_client(client.c_str(), clientPort, USE_SSL, NULL, 0);
  151. if (!conn)
  152. {
  153. std::cerr << "Couldn't make connection while waiting for client (" << client << ":" << clientPort << ") to be ready." << std::endl;
  154. std::this_thread::sleep_for(HALF_SECOND);
  155. }
  156. }
  157. // Make the correct GET request
  158. mg_write(conn, data.c_str(), data.length());
  159. // Wait for a response
  160. mg_get_response(conn, NULL, 0, 250);
  161. const struct mg_response_info *info = mg_get_response_info(conn);
  162. // Close connection
  163. mg_close_connection(conn);
  164. // If the client says it's ready, then we can move on
  165. if (info->status_code == 200)
  166. ready = true;
  167. }
  168. }
  169. /*
  170. * RUN EXPERIMENT
  171. */
  172. void execute_experiment(
  173. std::default_random_engine& rng,
  174. std::string dealerIP,
  175. int dealerPort,
  176. std::vector<std::string> serverIPs,
  177. std::vector<int> serverPorts,
  178. std::vector<std::string> clientIPs,
  179. std::vector<int> clientPorts,
  180. const char *filename)
  181. {
  182. size_t line = 1;
  183. // Iterate across each line in the command file, which contains one command per line
  184. char buffer[128];
  185. std::ifstream commands(filename);
  186. while (!commands.eof())
  187. {
  188. commands.getline(buffer, 128);
  189. if (strlen(buffer) == 0)
  190. {
  191. line++;
  192. continue;
  193. }
  194. std::cout << "Command " << line << ": " << std::string(buffer) << std::endl;
  195. std::vector<size_t> whichActors;
  196. std::vector<std::vector<size_t>> proofActors;
  197. std::vector<std::thread> clientWaiters;
  198. int numVoters, numProofs;
  199. // The first character of each command tells us which it is
  200. switch(buffer[0])
  201. {
  202. // Vote triggers come in form `V <numVoters>`
  203. case 'V':
  204. numVoters = atoi(strtok(buffer + 1, " "));
  205. whichActors = generate_random_set(rng, numVoters, clientIPs.size());
  206. for (size_t i = 0; i < whichActors.size(); i++)
  207. trigger_vote(clientIPs[whichActors[i]], clientPorts[whichActors[i]]);
  208. for (size_t i = 0; i < whichActors.size(); i++)
  209. clientWaiters.push_back(std::thread(wait_for_client_ready, clientIPs[whichActors[i]], clientPorts[whichActors[i]]));
  210. for (size_t i = 0; i < clientWaiters.size(); i++)
  211. clientWaiters[i].join();
  212. clientWaiters.clear();
  213. break;
  214. // Reputation proof triggers come in form `R <numProofs>`
  215. case 'R':
  216. numProofs = atoi(strtok(buffer + 1, " "));
  217. for (int i = 0; i < numProofs; i++)
  218. {
  219. whichActors = generate_random_set(rng, 2, clientIPs.size());
  220. trigger_reputation_proof(
  221. clientIPs[whichActors[0]],
  222. clientPorts[whichActors[0]],
  223. clientIPs[whichActors[1]],
  224. clientPorts[whichActors[1]]);
  225. proofActors.push_back(whichActors);
  226. }
  227. for (size_t i = 0; i < proofActors.size(); i++)
  228. clientWaiters.push_back(std::thread(wait_for_client_ready, clientIPs[proofActors[i][0]], clientPorts[proofActors[i][0]]));
  229. for (size_t i = 0; i < clientWaiters.size(); i++)
  230. clientWaiters[i].join();
  231. proofActors.clear();
  232. clientWaiters.clear();
  233. break;
  234. // Epoch change triggers come in form `E`
  235. case 'E':
  236. trigger_epoch_change(dealerIP, dealerPort);
  237. wait_for_servers_ready(dealerIP, dealerPort);
  238. break;
  239. default:
  240. break;
  241. }
  242. line++;
  243. }
  244. // Don't let ourselves shut down servers and clients until we're sure they're not in the middle of anything else
  245. wait_for_servers_ready(dealerIP, dealerPort);
  246. for (size_t i = 0; i < clientIPs.size(); i++)
  247. wait_for_client_ready(clientIPs[i], clientPorts[i]);
  248. }
  249. /****************************************************
  250. ********* *********
  251. ********* orchestrator private functions *********
  252. ********* *********
  253. ****************************************************/
  254. /*
  255. * TRIGGER EXPERIMENT EVENTS
  256. */
  257. void trigger_epoch_change(
  258. std::string dealer,
  259. int dealerPort)
  260. {
  261. // Epoch changes are triggered via GET request to the correct location
  262. std::stringstream sysString;
  263. std::string data;
  264. sysString << "GET " << TRIGGER_EPOCH_URI << " HTTP/1.1\r\n";
  265. sysString << "Host: " << dealer << ":" << dealerPort << "\r\n\r\n";
  266. data = sysString.str();
  267. struct mg_connection *conn = NULL;
  268. // Connect to the dealer
  269. while (!conn)
  270. {
  271. conn = mg_connect_client(dealer.c_str(), dealerPort, USE_SSL, NULL, 0);
  272. if (!conn)
  273. std::cerr << "Couldn't connect to dealer to trigger epoch change." << std::endl;
  274. }
  275. // Make the relevant GET request
  276. mg_write(conn, data.c_str(), data.length());
  277. // Close connection
  278. mg_close_connection(conn);
  279. }
  280. void trigger_vote(
  281. std::string target,
  282. int targetPort)
  283. {
  284. // New votes are triggered via GET request to the correct location
  285. std::stringstream sysString;
  286. std::string data;
  287. sysString << "GET " << TRIGGER_VOTE_URI << " HTTP/1.1\r\n";
  288. sysString << "Host: " << target << ":" << targetPort << "\r\n\r\n";
  289. data = sysString.str();
  290. struct mg_connection *conn = NULL;
  291. // Connect to the client
  292. while (!conn)
  293. {
  294. conn = mg_connect_client(target.c_str(), targetPort, USE_SSL, NULL, 0);
  295. if (!conn)
  296. std::cerr << "Couldn't connect to client at " << target << ":" << targetPort << " to trigger new vote." << std::endl;
  297. }
  298. // Make the relevant GET request
  299. mg_write(conn, data.c_str(), data.length());
  300. // Close connection
  301. mg_close_connection(conn);
  302. }
  303. void trigger_reputation_proof(
  304. std::string target,
  305. int targetPort,
  306. std::string verifier,
  307. int verifierPort)
  308. {
  309. // Reputation proofs are triggered via GET request to the correct location (with a parameter for the intended verifier)
  310. std::stringstream sysString;
  311. std::string data;
  312. sysString << "GET " << TRIGGER_REP_URI << "?" << verifier << ":" << verifierPort << " HTTP/1.1\r\n";
  313. sysString << "Host: " << target << "\r\n\r\n";
  314. data = sysString.str();
  315. struct mg_connection *conn = NULL;
  316. // Connect to the client
  317. while (!conn)
  318. {
  319. conn = mg_connect_client(target.c_str(), targetPort, USE_SSL, NULL, 0);
  320. if (!conn)
  321. std::cerr << "Couldn't connect to client at " << target << ":" << targetPort << " to trigger reputation proof." << std::endl;
  322. }
  323. // Make the relevant GET request
  324. mg_write(conn, data.c_str(), data.length());
  325. // Close connection
  326. mg_close_connection(conn);
  327. }
  328. /*
  329. * EXECUTOR HELPER
  330. */
  331. std::vector<size_t> generate_random_set(
  332. std::default_random_engine& rng,
  333. size_t size,
  334. size_t maxVal)
  335. {
  336. std::vector<size_t> holder;
  337. for (size_t i = 0; i < maxVal; i++)
  338. holder.push_back(i);
  339. shuffle(holder.begin(), holder.end(), rng);
  340. if (size > holder.size())
  341. size = holder.size();
  342. return std::vector<size_t>(holder.begin(), holder.begin() + size);
  343. }