orchestratorMain.cpp 9.6 KB


  1. #include <iostream>
  2. #include <fstream>
  3. #include <sstream>
  4. #include <string>
  5. #include <cstring>
  6. #include <cstdlib>
  7. #include <vector>
  8. #include <chrono>
  9. #include <thread>
  10. #include "networking.hpp"
  11. using namespace std;
  12. chrono::seconds oneSecond(1);
  13. int clients_websocket_data_handler(
  14. struct mg_connection *conn,
  15. int bits,
  16. char *data,
  17. size_t data_len,
  18. void *user_data)
  19. {
  20. if ((bits & 0xf) == MG_WEBSOCKET_OPCODE_CONNECTION_CLOSE)
  21. return false;
  22. if ((bits & 0xf) == MG_WEBSOCKET_OPCODE_BINARY)
  23. {
  24. struct synchronization_tool *synch = (struct synchronization_tool *) user_data;
  25. unique_lock<mutex> lck(synch->mtx);
  26. synch->val = atoi(data);
  27. return false;
  28. }
  29. if ((bits & 0xf) == MG_WEBSOCKET_OPCODE_DATACOMPLETE)
  30. return false;
  31. std::cerr << "Unknown response when trying to get update lock." << std::endl;
  32. return false;
  33. }
  34. int start_remote_actor(string target, bool server, string output)
  35. {
  36. stringstream buffer;
  37. string command;
  38. buffer << "ssh tmgurtle@" << target << " \"screen \'~/prsona/prsona/bin/"
  39. << (server ? "startServer.sh " : "startClient.sh ") << output << "\'\"" ;
  40. command = buffer.str();
  41. return system(command.c_str());
  42. }
  43. void wait_for_servers_ready(string dealer)
  44. {
  45. bool flag = false;
  46. while (!flag)
  47. {
  48. this_thread::sleep_for(oneSecond);
  49. stringstream sysString;
  50. string data;
  51. char buffer[255];
  52. struct mg_connection *conn =
  53. mg_connect_client(
  54. dealer.c_str(),
  55. PRSONA_PORT,
  56. USE_SSL,
  57. NULL,
  58. 0);
  59. if (!conn)
  60. continue;
  61. sysString << "GET " << EPOCH_READY_URI << " HTTP/1.1\r\n";
  62. sysString << "Host: " << dealer << "\r\n\r\n";
  63. data = sysString.str();
  64. mg_write(conn, data.c_str(), data.length());
  65. size_t readLen = mg_read(conn, (uint8_t*) buffer, 254);
  66. buffer[readLen] = 0;
  67. if (strstr(buffer, "200"))
  68. flag = true;
  69. mg_close_connection(conn);
  70. }
  71. }
  72. void wait_for_clients_ready(string dealer, size_t numClients)
  73. {
  74. struct synchronization_tool numClientsSync;
  75. bool flag = false;
  76. while (!flag)
  77. {
  78. this_thread::sleep_for(oneSecond);
  79. stringstream sysString;
  80. string data;
  81. struct mg_connection *conn =
  82. mg_connect_websocket_client(
  83. dealer.c_str(),
  84. PRSONA_PORT,
  85. USE_SSL,
  86. NULL,
  87. 0,
  88. NUM_CLIENTS_URI,
  89. "null",
  90. clients_websocket_data_handler,
  91. synchro_websocket_close_handler,
  92. &numClientsSync);
  93. if (!conn)
  94. continue;
  95. unique_lock<mutex> lck(numClientsSync.mtx);
  96. numClientsSync.val = 0;
  97. numClientsSync.val2 = 1;
  98. mg_websocket_client_write(
  99. conn,
  100. MG_WEBSOCKET_OPCODE_DATACOMPLETE,
  101. "",
  102. 0);
  103. while (numClientsSync.val2)
  104. numClientsSync.cv.wait(lck);
  105. mg_close_connection(conn);
  106. if (numClientsSync.val == numClients)
  107. flag = true;
  108. }
  109. }
  110. void trigger_epoch(string dealer)
  111. {
  112. this_thread::sleep_for(oneSecond);
  113. bool flag = false;
  114. while (!flag)
  115. {
  116. stringstream sysString;
  117. string data;
  118. struct mg_connection *conn =
  119. mg_connect_client(
  120. dealer.c_str(),
  121. PRSONA_PORT,
  122. USE_SSL,
  123. NULL,
  124. 0);
  125. if (!conn)
  126. continue;
  127. sysString << "GET " << TRIGGER_EPOCH_URI << " HTTP/1.1\r\n";
  128. sysString << "Host: " << dealer << "\r\n\r\n";
  129. data = sysString.str();
  130. mg_write(conn, data.c_str(), data.length());
  131. mg_close_connection(conn);
  132. flag = true;
  133. }
  134. wait_for_servers_ready(dealer);
  135. }
  136. void trigger_vote(string target)
  137. {
  138. bool flag = false;
  139. while (!flag)
  140. {
  141. stringstream sysString;
  142. string data;
  143. struct mg_connection *conn =
  144. mg_connect_client(
  145. target.c_str(),
  146. PRSONA_PORT,
  147. USE_SSL,
  148. NULL,
  149. 0);
  150. if (!conn)
  151. continue;
  152. sysString << "GET " << TRIGGER_VOTE_URI << " HTTP/1.1\r\n";
  153. sysString << "Host: " << target << "\r\n\r\n";
  154. data = sysString.str();
  155. mg_write(conn, data.c_str(), data.length());
  156. mg_close_connection(conn);
  157. flag = true;
  158. }
  159. }
  160. void trigger_reputation_proof(string target, string verifier)
  161. {
  162. bool flag = false;
  163. while (!flag)
  164. {
  165. stringstream sysString;
  166. string data;
  167. struct mg_connection *conn =
  168. mg_connect_client(
  169. target.c_str(),
  170. PRSONA_PORT,
  171. USE_SSL,
  172. NULL,
  173. 0);
  174. if (!conn)
  175. continue;
  176. sysString << "GET " << TRIGGER_REP_URI << verifier << " HTTP/1.1\r\n";
  177. sysString << "Host: " << target << "\r\n\r\n";
  178. data = sysString.str();
  179. mg_write(conn, data.c_str(), data.length());
  180. mg_close_connection(conn);
  181. flag = true;
  182. }
  183. }
  184. void execute_experiment(string dealer)
  185. {
  186. size_t line = 1;
  187. char buffer[128];
  188. ifstream commands("commands.cfg");
  189. while (!commands.eof())
  190. {
  191. commands.getline(buffer, 128);
  192. if (strlen(buffer) == 0)
  193. {
  194. line++;
  195. continue;
  196. }
  197. cout << "Command " << line << ": " << string(buffer) << endl;
  198. switch(buffer[0])
  199. {
  200. case 'V':
  201. trigger_vote(string(buffer + 2));
  202. break;
  203. case 'R':
  204. char *target, *verifier;
  205. target = strtok(buffer, " ");
  206. verifier = strtok(NULL, " ");
  207. trigger_reputation_proof(string(target), string(verifier));
  208. break;
  209. case 'E':
  210. trigger_epoch(dealer);
  211. break;
  212. default:
  213. break;
  214. }
  215. line++;
  216. }
  217. }
  218. int main(int argc, char* argv[])
  219. {
  220. string experimentOutput = random_string(8);
  221. #if USE_SSL
  222. mg_init_library(0);
  223. #else
  224. mg_init_library(MG_FEATURES_SSL);
  225. #endif
  226. vector<string> serverIPs, clientIPs;
  227. string dealerIP;
  228. char buffer[40];
  229. ifstream serverConfig("serverIPs.cfg");
  230. while (!serverConfig.eof())
  231. {
  232. serverConfig.getline(buffer, 40);
  233. if (strlen(buffer) > 0)
  234. serverIPs.push_back(string(buffer));
  235. }
  236. ifstream clientConfig("clientIPs.cfg");
  237. while (!clientConfig.eof())
  238. {
  239. clientConfig.getline(buffer, 40);
  240. if (strlen(buffer) > 0)
  241. clientIPs.push_back(string(buffer));
  242. }
  243. ifstream dealerConfig("dealerIP.cfg");
  244. while (!dealerConfig.eof())
  245. {
  246. dealerConfig.getline(buffer, 40);
  247. if (strlen(buffer) > 0)
  248. dealerIP = buffer;
  249. }
  250. size_t numServers = serverIPs.size();
  251. size_t numClients = clientIPs.size();
  252. cout << "This experiment is running with output code: " << experimentOutput << endl;
  253. cout << "Starting BGN dealer server." << endl;
  254. vector<thread> serverStartup, clientStartup;
  255. serverStartup.push_back(thread(start_remote_actor, dealerIP, true, experimentOutput));
  256. this_thread::sleep_for(oneSecond);
  257. cout << "Starting other servers." << endl;
  258. for (size_t i = 0; i < numServers; i++)
  259. {
  260. if (serverIPs[i] == dealerIP)
  261. continue;
  262. serverStartup.push_back(thread(start_remote_actor, serverIPs[i], true, experimentOutput));
  263. }
  264. cout << "Waiting for confirmation that servers are ready to continue." << endl;
  265. for (size_t i = 0; i < numServers; i++)
  266. serverStartup[i].join();
  267. wait_for_servers_ready(dealerIP);
  268. cout << "Starting clients." << endl;
  269. for (size_t i = 0; i < numClients; i++)
  270. clientStartup.push_back(thread(start_remote_actor, clientIPs[i], false, experimentOutput));
  271. cout << "Waiting for confirmation that servers have all clients logged." << endl;
  272. for (size_t i = 0; i < numClients; i++)
  273. clientStartup[i].join();
  274. wait_for_clients_ready(dealerIP, numClients);
  275. cout << "Beginning experiment." << endl;
  276. execute_experiment(dealerIP);
  277. cout << "Finishing experiment." << endl;
  278. cout << "Sending shutdown commands to clients." << endl;
  279. for (size_t i = 0; i < clientIPs.size(); i++)
  280. {
  281. stringstream sysString;
  282. string data;
  283. struct mg_connection *conn =
  284. mg_connect_client(
  285. clientIPs[i].c_str(),
  286. PRSONA_PORT,
  287. USE_SSL,
  288. NULL,
  289. 0);
  290. sysString << "GET " << EXIT_URI << " HTTP/1.1\r\n";
  291. sysString << "Host: " << clientIPs[i] << "\r\n\r\n";
  292. data = sysString.str();
  293. mg_write(conn, data.c_str(), data.length());
  294. mg_close_connection(conn);
  295. }
  296. cout << "Sending shutdown commands to servers." << endl;
  297. for (size_t i = 0; i < serverIPs.size(); i++)
  298. {
  299. stringstream sysString;
  300. string data;
  301. struct mg_connection *conn =
  302. mg_connect_client(
  303. serverIPs[i].c_str(),
  304. PRSONA_PORT,
  305. USE_SSL,
  306. NULL,
  307. 0);
  308. sysString << "GET " << EXIT_URI << " HTTP/1.1\r\n";
  309. sysString << "Host: " << serverIPs[i] << "\r\n\r\n";
  310. data = sysString.str();
  311. mg_write(conn, data.c_str(), data.length());
  312. mg_close_connection(conn);
  313. }
  314. return 0;
  315. }