#include #include #include #include #include #include #include "networking.hpp" /********************************************* **** **** **** "public" generic helper functions **** **** **** *********************************************/ void initialize_prsona_classes() { Scalar::init(); PrsonaBase::init(); PrsonaBase::set_client_malicious(); } char *set_temp_filename( std::default_random_engine& rng, struct mg_connection *conn) { std::string filename = random_string(rng, TMP_FILE_SIZE); char *c_filename = new char[TMP_FILE_SIZE+TMP_DIR_SIZE+1]; strncpy(c_filename, TMP_DIR, TMP_DIR_SIZE); for (size_t i = 0; i < TMP_FILE_SIZE; i++) c_filename[i + TMP_DIR_SIZE] = filename[i]; c_filename[TMP_DIR_SIZE + TMP_FILE_SIZE] = 0; if (conn) mg_set_user_connection_data(conn, c_filename); return c_filename; } void load_multiple_instances_config( std::vector& relevantIPs, std::vector& relevantPorts, const char *filename) { relevantIPs.clear(); relevantPorts.clear(); char buffer[46], *helper; std::ifstream configFile(filename); while (!configFile.eof()) { configFile.getline(buffer, 46); if (strlen(buffer) > 0) { helper = buffer; if (strchr(helper, ':')) // File specifies a port { helper = strtok(helper, ":"); relevantIPs.push_back(std::string(helper)); helper = strtok(NULL, ":"); relevantPorts.push_back(atoi(helper)); } else // We use a default port { relevantIPs.push_back(std::string(helper)); relevantPorts.push_back(atoi(DEFAULT_PRSONA_PORT_STR)); } } } } void load_single_instance_config( std::string& relevantIP, std::string& relevantPortStr, int& relevantPort, const char *filename) { char buffer[46], *helper; std::ifstream configFile(filename); while (!configFile.eof()) { configFile.getline(buffer, 46); if (strlen(buffer) > 0) { helper = buffer; if (strchr(helper, ':')) // File specifies a port { helper = strtok(helper, ":"); relevantIP = helper; helper = strtok(NULL, ":"); relevantPortStr = helper; relevantPort = stoi(relevantPortStr); } else // We use default port { relevantIP = helper; relevantPortStr = DEFAULT_PRSONA_PORT_STR; relevantPort = stoi(relevantPortStr); } } } } std::vector get_server_log_data( const struct mg_context *ctx) { std::vector retval; char buffer[4096]; mg_get_context_info(ctx, buffer, 4096); retval.push_back(parse_log_for_data(buffer, "read")); retval.push_back(parse_log_for_data(buffer, "written")); return retval; } std::vector get_conn_log_data( const struct mg_context *ctx, bool receivedData) { std::vector retval; char buffer[4096]; mg_get_context_info(ctx, buffer, 4096); if (receivedData) { retval.push_back(parse_log_for_data(buffer, "maxUsed")); retval.push_back(0); } else { retval.push_back(0); retval.push_back(parse_log_for_data(buffer, "maxUsed")); } return retval; } void write_log_data( std::mutex& outputMtx, const std::string& outputFilename, const std::vector& timingData, const std::vector& bandwidthData) { std::unique_lock lck(outputMtx); FILE *outputFile = fopen(outputFilename.c_str(), "a"); fprintf(outputFile, "%f,%f,%zu,%zu\n", timingData[0], timingData[1], bandwidthData[0], bandwidthData[1]); fclose(outputFile); } void write_special_log_data( std::mutex& outputMtx, const std::string& outputFilename, const std::vector& timingData, const std::vector& bandwidthData, bool corruption) { std::unique_lock lck(outputMtx); FILE *outputFile = fopen(outputFilename.c_str(), "a"); if (!corruption) fprintf(outputFile, "%f,%f,%zu,%zu\n", timingData[0], timingData[1], bandwidthData[0], bandwidthData[1]); else fprintf(outputFile, "%f*,%f*,%zu,%zu\n", timingData[0], timingData[1], bandwidthData[0], bandwidthData[1]); fclose(outputFile); } void write_usage_data( std::mutex& outputMtx, const std::string& outputFilename) { std::unique_lock lck(outputMtx); FILE *outputFile = fopen(outputFilename.c_str(), "a"); unsigned long vsize = mem_usage(); fprintf(outputFile, "%lu\n", vsize); fclose(outputFile); } /*********************************************************** **** **** **** "private" functions to help the generic helpers **** **** **** ***********************************************************/ std::string random_string( std::default_random_engine& rng, size_t length) { const char charset[] = "0123456789_-" "ABCDEFGHIJKLMNOPQRSTUVWXYZ" "abcdefghijklmnopqrstuvwxyz"; const size_t max_index = (sizeof(charset) - 1); std::uniform_int_distribution dist(0, max_index - 1); auto randchar = [&]() -> char { return charset[ dist(rng) ]; }; std::string retval(length, 0); std::generate_n(retval.begin(), length, randchar); return retval; } size_t parse_log_for_data(const char *input, const char *key) { size_t length = strlen(input); char *copy = new char[length + 1]; strncpy(copy, input, length); copy[length] = 0; char *pos = strstr(copy, key) + strlen(key); pos = strtok(pos, "{}:,\" \n"); size_t retval = strtoul(pos, NULL, 10); delete [] copy; return retval; } unsigned long mem_usage() { std::ifstream stat_stream("/proc/self/stat", std::ios_base::in); std::string pid, comm, state, ppid, pgrp, session, tty_nr, tpgid, flags, minflt, cminflt, majflt, cmajflt, utime, stime, cutime, cstime, priority, nice, O, itrealvalue, starttime; unsigned long vsize; stat_stream >> pid >> comm >> state >> ppid >> pgrp >> session >> tty_nr >> tpgid >> flags >> minflt >> cminflt >> majflt >> cmajflt >> utime >> stime >> cutime >> cstime >> priority >> nice >> O >> itrealvalue >> starttime >> vsize; stat_stream.close(); return vsize; } /*************************************** **** **** **** websocket handler functions **** **** **** ***************************************/ /* * NULL */ int empty_websocket_data_handler( struct mg_connection *conn, int bits, char *data, size_t data_len, void *user_data) { return false; } void empty_websocket_close_handler( const struct mg_connection *conn, void *user_data) { /* */ } /* * SYNCHRONIZATION */ int synchro_websocket_data_handler( struct mg_connection *conn, int bits, char *data, size_t data_len, void *user_data) { struct synchronization_tool *sync = (struct synchronization_tool *) user_data; std::unique_lock lck(sync->mtx, std::defer_lock); switch (bits & 0xf) { case MG_WEBSOCKET_OPCODE_CONNECTION_CLOSE: break; // Responder has indicated receipt of submitted data case MG_WEBSOCKET_OPCODE_DATACOMPLETE: lck.lock(); sync->val++; break; // Something strange has happened default: std::cerr << "Unknown packet type received. Failing." << std::endl; break; } return false; } void synchro_websocket_close_handler( const struct mg_connection *conn, void *user_data) { struct synchronization_tool *synch = (struct synchronization_tool *) user_data; std::unique_lock lck(synch->mtx); synch->val2 = 1; synch->cv.notify_all(); } /* * RECEIVE SERIALIZED DATA */ int file_websocket_data_handler( struct mg_connection *conn, int bits, char *data, size_t data_len, void *user_data) { struct synchronization_tool *sync = (struct synchronization_tool *) user_data; char *filename = (char *) mg_get_user_connection_data(conn); FILE *currFile = NULL; std::unique_lock lck(sync->mtx, std::defer_lock); switch (bits & 0xf) { // Responder has indicated they have sent all relevant data case MG_WEBSOCKET_OPCODE_DATACOMPLETE: case MG_WEBSOCKET_OPCODE_CONNECTION_CLOSE: break; // Responder has sent more data (which may theoretically be broken up into multiple packets) case MG_WEBSOCKET_OPCODE_BINARY: case MG_WEBSOCKET_OPCODE_CONTINUATION: lck.lock(); currFile = fopen(filename, "ab"); fwrite(data, sizeof(char), data_len, currFile); fclose(currFile); return true; // Something strange has happened default: std::cerr << "Unknown packet type received. Failing." << std::endl; break; } return false; } void file_websocket_close_handler( const struct mg_connection *conn, void *user_data) { struct synchronization_tool *sync = (struct synchronization_tool *) user_data; std::unique_lock lck(sync->mtx); sync->val = 1; sync->val2 = 1; sync->cv.notify_all(); } /* * SYNCHRONIZATION AND RECEIVE SERIALIZED DATA */ int epoch_websocket_data_handler( struct mg_connection *conn, int bits, char *data, size_t data_len, void *user_data) { struct synchronization_tool *sync = (struct synchronization_tool *) user_data; char *filename = (char *) mg_get_user_connection_data(conn); FILE *currFile = NULL; std::unique_lock lck(sync->mtx, std::defer_lock); switch (bits & 0xf) { case MG_WEBSOCKET_OPCODE_CONNECTION_CLOSE: break; // Responder has indicated they have sent all relevant data case MG_WEBSOCKET_OPCODE_DATACOMPLETE: lck.lock(); sync->val++; break; // Responder has sent more data (which may theoretically be broken up into multiple packets) case MG_WEBSOCKET_OPCODE_BINARY: case MG_WEBSOCKET_OPCODE_CONTINUATION: lck.lock(); currFile = fopen(filename, "ab"); fwrite(data, sizeof(char), data_len, currFile); fclose(currFile); return true; // Something strange has happened default: std::cerr << "Unknown packet type received. Failing." << std::endl; break; } return false; } void epoch_websocket_close_handler( const struct mg_connection *conn, void *user_data) { struct synchronization_tool *sync = (struct synchronization_tool *) user_data; std::unique_lock lck(sync->mtx); sync->val2 = 1; sync->cv.notify_all(); } /* * SPECIAL FOR HANDLING UNUSUAL DATA */ int clients_websocket_data_handler( struct mg_connection *conn, int bits, char *data, size_t data_len, void *user_data) { struct synchronization_tool *sync = (struct synchronization_tool *) user_data; std::unique_lock lck(sync->mtx, std::defer_lock); switch (bits & 0xf) { // Responder has indicated they have sent all relevant data case MG_WEBSOCKET_OPCODE_DATACOMPLETE: case MG_WEBSOCKET_OPCODE_CONNECTION_CLOSE: break; // Responder has sent data case MG_WEBSOCKET_OPCODE_BINARY: lck.lock(); if (data_len == sizeof(sync->val)) sync->val = *((size_t *) data); break; // Something strange has happened default: std::cerr << "Unknown packet type received. Failing." << std::endl; break; } return false; } /******************************************** **** **** **** Generic handler member functions **** **** **** ********************************************/ /* * EXIT SYNCHRONIZATION HANDLER */ RemoteControlHandler::RemoteControlHandler( struct synchronization_tool *sync) : sync(sync) { /* */ } RemoteControlHandler::RemoteControlHandler( struct synchronization_tool *sync, const std::string& message) : sync(sync), message(message) { /* */ } bool RemoteControlHandler::handleGet( CivetServer *server, struct mg_connection *conn) { std::unique_lock lck(sync->mtx); mg_printf(conn, "HTTP/1.1 200 OK\r\n" "Content-Type: text/plain\r\n" "Connection: close\r\n\r\n"); if (message.empty()) mg_printf(conn, "Event triggered.\n"); else mg_printf(conn, "%s\n", message.c_str()); std::cout << "Hi hi!!!" sync->val++; sync->cv.notify_all(); return true; } /* * EXPERIMENT EVENT SYNCHRONIZATION HANDLER */ AltRemoteControlHandler::AltRemoteControlHandler( size_t value, struct synchronization_tool *sync) : value(value), sync(sync) { /* */ } AltRemoteControlHandler::AltRemoteControlHandler( size_t value, struct synchronization_tool *sync, const std::string& message) : value(value), sync(sync), message(message) { /* */ } bool AltRemoteControlHandler::handleGet( CivetServer *server, struct mg_connection *conn) { std::unique_lock lck(sync->mtx); const struct mg_request_info *info = mg_get_request_info(conn); if (info->query_string) query = info->query_string; mg_printf(conn, "HTTP/1.1 200 OK\r\n" "Content-Type: text/plain\r\n" "Connection: close\r\n\r\n"); if (message.empty()) mg_printf(conn, "Event triggered.\n"); else mg_printf(conn, "%s\n", message.c_str()); sync->val2 = value; sync->cv.notify_all(); return true; } std::string AltRemoteControlHandler::getQuery() const { return query; }