// networkOrchestrator.cpp
#include <algorithm>
#include <cstdlib>
#include <cstring>
#include <fstream>
#include <iostream>
#include <sstream>
#include <string>
#include <thread>
#include <vector>

#include <unistd.h>
#include <sys/types.h>
#include <sys/wait.h>

#include "networkOrchestrator.hpp"
  11. /***************************************************
  12. ********* *********
  13. ********* orchestrator public functions *********
  14. ********* *********
  15. ***************************************************/
  16. /*
  17. * START UP AND SHUT DOWN INSTANCES
  18. */
  19. void start_remote_actor(
  20. const std::string& target,
  21. bool server,
  22. const std::string& id,
  23. const std::string& output,
  24. size_t lambda,
  25. bool maliciousServers)
  26. {
  27. const char* sshFile = "/usr/bin/ssh";
  28. const char* serverFile = "bin/server";
  29. const char* clientFile = "bin/client";
  30. const char* calledFile = (target != "self" && !target.empty() ? sshFile : (server ? serverFile : clientFile));
  31. char *argv[6];
  32. char fileBuffer[13];
  33. strncpy(fileBuffer, calledFile, 13);
  34. argv[0] = fileBuffer;
  35. char flagBuffer[3];
  36. strncpy(flagBuffer, "-n", 3);
  37. char targetBuffer[64];
  38. strncpy(targetBuffer, target.c_str(), 64);
  39. std::string fullArgString;
  40. char fullArgBuffer[256];
  41. char idBuffer[64];
  42. strncpy(idBuffer, id.c_str(), 64);
  43. char outputBuffer[128];
  44. strncpy(outputBuffer, output.c_str(), 128);
  45. std::stringstream lambdaStream;
  46. lambdaStream << lambda;
  47. char lambdaBuffer[3];
  48. strncpy(lambdaBuffer, lambdaStream.str().c_str(), 3);
  49. char maliciousBuffer[3];
  50. if (maliciousServers)
  51. strncpy(maliciousBuffer, "T", 2);
  52. else
  53. strncpy(maliciousBuffer, "F", 2);
  54. if (target != "self" && !target.empty())
  55. {
  56. argv[1] = flagBuffer;
  57. argv[2] = targetBuffer;
  58. argv[3] = fullArgBuffer;
  59. argv[4] = NULL;
  60. }
  61. else
  62. {
  63. argv[1] = idBuffer;
  64. argv[2] = outputBuffer;
  65. argv[3] = lambdaBuffer;
  66. argv[4] = maliciousBuffer;
  67. argv[5] = NULL;
  68. }
  69. int pid = fork();
  70. if (pid < 0)
  71. exit(1);
  72. if (pid == 0)
  73. execv(calledFile, argv);
  74. }
  75. void shut_down_remote_actors(
  76. const std::vector<std::string>& relevantIPs,
  77. const std::vector<int>& relevantPorts)
  78. {
  79. for (size_t i = 0; i < relevantIPs.size(); i++)
  80. {
  81. // Shut downs are triggered by a GET request to the correct location
  82. std::stringstream sysString;
  83. std::string data;
  84. sysString << "GET " << EXIT_URI << " HTTP/1.1\r\n";
  85. sysString << "Host: " << relevantIPs[i] << ":" << relevantPorts[i] << "\r\n\r\n";
  86. data = sysString.str();
  87. struct mg_connection *conn = NULL;
  88. // Connect to the instance
  89. while (!conn)
  90. {
  91. conn = mg_connect_client(relevantIPs[i].c_str(), relevantPorts[i], USE_SSL, NULL, 0);
  92. if (!conn)
  93. std::cerr << "Couldn't connect to instance at " << relevantIPs[i] << ":" << relevantPorts[i] << " for shut down." << std::endl;
  94. }
  95. // Make correct GET request
  96. mg_write(conn, data.c_str(), data.length());
  97. // Close connection
  98. mg_close_connection(conn);
  99. }
  100. }
  101. /*
  102. * SYNCHRONIZATION
  103. */
  104. void wait_for_servers_ready(
  105. std::string dealer,
  106. int dealerPort)
  107. {
  108. // Requesting information about servers being ready is done via a GET request
  109. std::stringstream sysString;
  110. std::string data;
  111. sysString << "GET " << EPOCH_READY_URI << " HTTP/1.1\r\n";
  112. sysString << "Host: " << dealer << ":" << dealerPort << "\r\n\r\n";
  113. data = sysString.str();
  114. bool ready = false;
  115. while (!ready)
  116. {
  117. struct mg_connection *conn = NULL;
  118. // Connect to the dealer
  119. while (!conn)
  120. {
  121. conn = mg_connect_client(dealer.c_str(), dealerPort, USE_SSL, NULL, 0);
  122. if (!conn)
  123. {
  124. std::cerr << "Couldn't make connection while waiting for servers to be ready." << std::endl;
  125. std::this_thread::sleep_for(HALF_SECOND);
  126. }
  127. }
  128. // Make the correct GET request
  129. mg_write(conn, data.c_str(), data.length());
  130. // Wait for a response
  131. mg_get_response(conn, NULL, 0, 250);
  132. const struct mg_response_info *info = mg_get_response_info(conn);
  133. // Close connection
  134. mg_close_connection(conn);
  135. // If the dealer says it's ready, then we can move on
  136. if (info->status_code == 200)
  137. ready = true;
  138. }
  139. }
  140. void wait_for_clients_created(
  141. std::string dealer,
  142. int dealerPort,
  143. size_t numClients)
  144. {
  145. bool ready = false;
  146. while (!ready)
  147. {
  148. struct synchronization_tool sync;
  149. struct mg_connection *conn = NULL;
  150. // Connect to the dealer
  151. std::unique_lock<std::mutex> lck(sync.mtx);
  152. sync.val = 0;
  153. sync.val2 = 0;
  154. while (!conn)
  155. {
  156. conn = mg_connect_websocket_client(dealer.c_str(), dealerPort, USE_SSL, NULL, 0, REQUEST_NUM_CLIENTS_URI, "null", clients_websocket_data_handler, synchro_websocket_close_handler, &sync);
  157. if (!conn)
  158. {
  159. std::cerr << "Couldn't make connection while waiting for clients to be ready." << std::endl;
  160. std::this_thread::sleep_for(HALF_SECOND);
  161. }
  162. }
  163. // Tell the dealer we're ready for its response
  164. mg_websocket_client_write(conn, MG_WEBSOCKET_OPCODE_DATACOMPLETE, "", 0);
  165. // Wait for that response
  166. while (!sync.val2)
  167. sync.cv.wait(lck);
  168. // Close connection
  169. mg_close_connection(conn);
  170. // If the dealer says it's ready, then we can move on
  171. if (sync.val == numClients)
  172. ready = true;
  173. }
  174. }
  175. void wait_for_client_ready(
  176. std::string client,
  177. int clientPort)
  178. {
  179. // Requesting information about clients being ready is done via a GET request
  180. std::stringstream sysString;
  181. std::string data;
  182. sysString << "GET " << CLIENT_READY_URI << " HTTP/1.1\r\n";
  183. sysString << "Host: " << client << ":" << clientPort << "\r\n\r\n";
  184. data = sysString.str();
  185. bool ready = false;
  186. while (!ready)
  187. {
  188. struct mg_connection *conn = NULL;
  189. // Connect to the client
  190. while (!conn)
  191. {
  192. conn = mg_connect_client(client.c_str(), clientPort, USE_SSL, NULL, 0);
  193. if (!conn)
  194. {
  195. std::cerr << "Couldn't make connection while waiting for client (" << client << ":" << clientPort << ") to be ready." << std::endl;
  196. std::this_thread::sleep_for(HALF_SECOND);
  197. }
  198. }
  199. // Make the correct GET request
  200. mg_write(conn, data.c_str(), data.length());
  201. // Wait for a response
  202. mg_get_response(conn, NULL, 0, 250);
  203. const struct mg_response_info *info = mg_get_response_info(conn);
  204. // Close connection
  205. mg_close_connection(conn);
  206. // If the client says it's ready, then we can move on
  207. if (info->status_code == 200)
  208. ready = true;
  209. }
  210. }
  211. /*
  212. * RUN EXPERIMENT
  213. */
  214. void execute_experiment(
  215. std::default_random_engine& rng,
  216. std::string dealerIP,
  217. int dealerPort,
  218. std::vector<std::string> serverIPs,
  219. std::vector<int> serverPorts,
  220. std::vector<std::string> clientIPs,
  221. std::vector<int> clientPorts,
  222. const char *filename)
  223. {
  224. size_t line = 1;
  225. // Iterate across each line in the command file, which contains one command per line
  226. char buffer[128];
  227. std::ifstream commands(filename);
  228. while (!commands.eof())
  229. {
  230. commands.getline(buffer, 128);
  231. if (strlen(buffer) == 0)
  232. {
  233. line++;
  234. continue;
  235. }
  236. std::cout << "Command " << line << ": " << std::string(buffer) << std::endl;
  237. std::vector<size_t> whichActors;
  238. std::vector<std::vector<size_t>> proofActors;
  239. std::vector<std::thread> clientWaiters;
  240. int numVoters, numProofs;
  241. // The first character of each command tells us which it is
  242. switch(buffer[0])
  243. {
  244. // Vote triggers come in form `V <numVoters>`
  245. case 'V':
  246. numVoters = atoi(strtok(buffer + 1, " "));
  247. whichActors = generate_random_set(rng, numVoters, clientIPs.size());
  248. for (size_t i = 0; i < whichActors.size(); i++)
  249. trigger_vote(clientIPs[whichActors[i]], clientPorts[whichActors[i]]);
  250. std::this_thread::sleep_for(HALF_SECOND);
  251. for (size_t i = 0; i < whichActors.size(); i++)
  252. clientWaiters.push_back(std::thread(wait_for_client_ready, clientIPs[whichActors[i]], clientPorts[whichActors[i]]));
  253. for (size_t i = 0; i < clientWaiters.size(); i++)
  254. clientWaiters[i].join();
  255. clientWaiters.clear();
  256. break;
  257. // Reputation proof triggers come in form `R <numProofs>`
  258. case 'R':
  259. numProofs = atoi(strtok(buffer + 1, " "));
  260. for (int i = 0; i < numProofs; i++)
  261. {
  262. whichActors = generate_random_set(rng, 2, clientIPs.size());
  263. trigger_reputation_proof(
  264. clientIPs[whichActors[0]],
  265. clientPorts[whichActors[0]],
  266. clientIPs[whichActors[1]],
  267. clientPorts[whichActors[1]]);
  268. proofActors.push_back(whichActors);
  269. }
  270. std::this_thread::sleep_for(HALF_SECOND);
  271. for (size_t i = 0; i < proofActors.size(); i++)
  272. clientWaiters.push_back(std::thread(wait_for_client_ready, clientIPs[proofActors[i][0]], clientPorts[proofActors[i][0]]));
  273. for (size_t i = 0; i < clientWaiters.size(); i++)
  274. clientWaiters[i].join();
  275. proofActors.clear();
  276. clientWaiters.clear();
  277. break;
  278. // Epoch change triggers come in form `E`
  279. case 'E':
  280. trigger_epoch_change(dealerIP, dealerPort);
  281. std::this_thread::sleep_for(HALF_SECOND);
  282. wait_for_servers_ready(dealerIP, dealerPort);
  283. break;
  284. default:
  285. break;
  286. }
  287. line++;
  288. }
  289. // Don't let ourselves shut down servers and clients until we're sure they're not in the middle of anything else
  290. wait_for_servers_ready(dealerIP, dealerPort);
  291. for (size_t i = 0; i < clientIPs.size(); i++)
  292. wait_for_client_ready(clientIPs[i], clientPorts[i]);
  293. }
  294. /****************************************************
  295. ********* *********
  296. ********* orchestrator private functions *********
  297. ********* *********
  298. ****************************************************/
  299. /*
  300. * TRIGGER EXPERIMENT EVENTS
  301. */
  302. void trigger_epoch_change(
  303. std::string dealer,
  304. int dealerPort)
  305. {
  306. // Epoch changes are triggered via GET request to the correct location
  307. std::stringstream sysString;
  308. std::string data;
  309. sysString << "GET " << TRIGGER_EPOCH_URI << " HTTP/1.1\r\n";
  310. sysString << "Host: " << dealer << ":" << dealerPort << "\r\n\r\n";
  311. data = sysString.str();
  312. struct mg_connection *conn = NULL;
  313. // Connect to the dealer
  314. while (!conn)
  315. {
  316. conn = mg_connect_client(dealer.c_str(), dealerPort, USE_SSL, NULL, 0);
  317. if (!conn)
  318. std::cerr << "Couldn't connect to dealer to trigger epoch change." << std::endl;
  319. }
  320. // Make the relevant GET request
  321. mg_write(conn, data.c_str(), data.length());
  322. // Close connection
  323. mg_close_connection(conn);
  324. }
  325. void trigger_vote(
  326. std::string target,
  327. int targetPort)
  328. {
  329. // New votes are triggered via GET request to the correct location
  330. std::stringstream sysString;
  331. std::string data;
  332. sysString << "GET " << TRIGGER_VOTE_URI << " HTTP/1.1\r\n";
  333. sysString << "Host: " << target << ":" << targetPort << "\r\n\r\n";
  334. data = sysString.str();
  335. struct mg_connection *conn = NULL;
  336. // Connect to the client
  337. while (!conn)
  338. {
  339. conn = mg_connect_client(target.c_str(), targetPort, USE_SSL, NULL, 0);
  340. if (!conn)
  341. std::cerr << "Couldn't connect to client at " << target << ":" << targetPort << " to trigger new vote." << std::endl;
  342. }
  343. // Make the relevant GET request
  344. mg_write(conn, data.c_str(), data.length());
  345. // Close connection
  346. mg_close_connection(conn);
  347. }
  348. void trigger_reputation_proof(
  349. std::string target,
  350. int targetPort,
  351. std::string verifier,
  352. int verifierPort)
  353. {
  354. // Reputation proofs are triggered via GET request to the correct location (with a parameter for the intended verifier)
  355. std::stringstream sysString;
  356. std::string data;
  357. sysString << "GET " << TRIGGER_REP_URI << "?" << verifier << ":" << verifierPort << " HTTP/1.1\r\n";
  358. sysString << "Host: " << target << "\r\n\r\n";
  359. data = sysString.str();
  360. struct mg_connection *conn = NULL;
  361. // Connect to the client
  362. while (!conn)
  363. {
  364. conn = mg_connect_client(target.c_str(), targetPort, USE_SSL, NULL, 0);
  365. if (!conn)
  366. std::cerr << "Couldn't connect to client at " << target << ":" << targetPort << " to trigger reputation proof." << std::endl;
  367. }
  368. // Make the relevant GET request
  369. mg_write(conn, data.c_str(), data.length());
  370. // Close connection
  371. mg_close_connection(conn);
  372. }
  373. /*
  374. * EXECUTOR HELPER
  375. */
  376. std::vector<size_t> generate_random_set(
  377. std::default_random_engine& rng,
  378. size_t size,
  379. size_t maxVal)
  380. {
  381. std::vector<size_t> holder;
  382. for (size_t i = 0; i < maxVal; i++)
  383. holder.push_back(i);
  384. shuffle(holder.begin(), holder.end(), rng);
  385. if (size > holder.size())
  386. size = holder.size();
  387. return std::vector<size_t>(holder.begin(), holder.begin() + size);
  388. }