|
@@ -70,6 +70,7 @@ void displayMessage(unsigned char *msg, uint16_t msg_size) {
|
|
|
sid = *((clientid_t*) ptr);
|
|
|
ptr+=sizeof(sid);
|
|
|
rid = *((clientid_t*) ptr);
|
|
|
+ ptr+=sizeof(sid);
|
|
|
printf("Sender ID: %d, Receiver ID: %d, Token: N/A\n", sid, rid );
|
|
|
printf("Message: ");
|
|
|
for(int j = 0; j<msg_size - sizeof(sid)*2; j++) {
|
|
@@ -127,7 +128,8 @@ static inline uint32_t ptMsgBundleSize(uint16_t priv_out, uint16_t msg_size) {
|
|
|
|
|
|
bool config_parse(Config &config, const std::string configstr,
|
|
|
std::vector<NodeConfig> &ingestion_nodes,
|
|
|
- std::vector<NodeConfig> &storage_nodes)
|
|
|
+ std::vector<NodeConfig> &storage_nodes,
|
|
|
+ std::vector<uint16_t> &storage_map)
|
|
|
{
|
|
|
bool found_params = false;
|
|
|
bool ret = true;
|
|
@@ -136,6 +138,7 @@ bool config_parse(Config &config, const std::string configstr,
|
|
|
boost::property_tree::ptree conftree;
|
|
|
|
|
|
read_json(configstream, conftree);
|
|
|
+ uint16_t node_num = 0;
|
|
|
|
|
|
for (auto & entry : conftree) {
|
|
|
if (!entry.first.compare("params")) {
|
|
@@ -165,6 +168,7 @@ bool config_parse(Config &config, const std::string configstr,
|
|
|
}
|
|
|
found_params = true;
|
|
|
} else if (!entry.first.compare("nodes")) {
|
|
|
+
|
|
|
for (auto & node : entry.second) {
|
|
|
NodeConfig nc;
|
|
|
// All nodes need to be assigned their role in manifest.yaml
|
|
@@ -197,7 +201,9 @@ bool config_parse(Config &config, const std::string configstr,
|
|
|
}
|
|
|
if(nc.roles & ROLE_STORAGE) {
|
|
|
storage_nodes.push_back(std::move(nc));
|
|
|
+ storage_map.push_back(node_num);
|
|
|
}
|
|
|
+ node_num++;
|
|
|
}
|
|
|
} else {
|
|
|
std::cerr << "Unknown key in config: " <<
|
|
@@ -340,7 +346,7 @@ void Client::generateMessageBundle(uint8_t priv_out, uint32_t msg_size,
|
|
|
unsigned char *pt_msgbundle)
|
|
|
{
|
|
|
unsigned char *ptr = pt_msgbundle;
|
|
|
- uint64_t header = (id << 8) + CLIENT_MESSAGE_BUNDLE;
|
|
|
+ uint64_t header = (sim_id << 8) + CLIENT_MESSAGE_BUNDLE;
|
|
|
|
|
|
// Setup header
|
|
|
memcpy(ptr, (uint8_t*) &header, sizeof(header));
|
|
@@ -350,7 +356,8 @@ void Client::generateMessageBundle(uint8_t priv_out, uint32_t msg_size,
|
|
|
for(uint32_t i = 0; i < priv_out; i++) {
|
|
|
memcpy(ptr, &id, sizeof(id));
|
|
|
ptr+=(sizeof(id));
|
|
|
- memcpy(ptr, &id, sizeof(i));
|
|
|
+
|
|
|
+ memcpy(ptr, &id, sizeof(id));
|
|
|
ptr+=(sizeof(id));
|
|
|
|
|
|
uint32_t remaining_message_size = msg_size - (sizeof(id)*2);
|
|
@@ -401,7 +408,9 @@ void Client::sendMessageBundle(uint16_t priv_out, uint16_t msg_size,
|
|
|
|
|
|
encryptMessageBundle(enc_bundle_size, pt_msgbundle, enc_msgbundle);
|
|
|
|
|
|
+#ifdef VERBOSE_CLIENT
|
|
|
displayPtMessageBundle(pt_msgbundle, priv_out, msg_size);
|
|
|
+#endif
|
|
|
|
|
|
//displayEncMessageBundle(enc_msgbundle, priv_out, msg_size);
|
|
|
|
|
@@ -414,7 +423,7 @@ int Client::sendAuthMessage()
|
|
|
uint32_t auth_size = sizeof(uint64_t) + SGX_AESGCM_KEY_SIZE;
|
|
|
unsigned char *auth_string = (unsigned char*) malloc(auth_size);
|
|
|
unsigned char *as_ptr = auth_string;
|
|
|
- uint64_t header = (id << 8) + CLIENT_AUTHENTICATE;
|
|
|
+ uint64_t header = (sim_id << 8) + CLIENT_AUTHENTICATE;
|
|
|
memcpy(as_ptr, &header, sizeof(header));
|
|
|
as_ptr+=sizeof(header);
|
|
|
|
|
@@ -434,11 +443,13 @@ int Client::sendAuthMessage()
|
|
|
uint64_t *iv_ctr = (uint64_t*) iv;
|
|
|
(*iv_ctr)+=1;
|
|
|
|
|
|
+#ifdef VERBOSE_CLIENT
|
|
|
printf("Client %d auth_string: \n", id);
|
|
|
for(int i=0; i<auth_size; i++) {
|
|
|
printf("%x", auth_string[i]);
|
|
|
}
|
|
|
printf("\n");
|
|
|
+#endif
|
|
|
|
|
|
boost::asio::write(*ingestion_sock,
|
|
|
boost::asio::buffer(auth_string, auth_size));
|
|
@@ -449,10 +460,13 @@ int Client::sendAuthMessage()
|
|
|
void generateClients(boost::asio::io_context &io_context,
|
|
|
uint32_t cstart, uint32_t cstop, Client* &clients,
|
|
|
aes_key &EMK, Config &config, std::vector<NodeConfig> &ingestion_nodes,
|
|
|
+ std::vector<NodeConfig> &storage_nodes, std::vector<uint16_t> &storage_map,
|
|
|
uint32_t num_clients_total, uint32_t clients_per_ing,
|
|
|
uint32_t ing_with_additional)
|
|
|
{
|
|
|
aes_key client_key;
|
|
|
+ uint16_t num_stg_nodes = storage_nodes.size();
|
|
|
+ uint16_t *stg_map = new uint16_t[num_stg_nodes];
|
|
|
|
|
|
for(uint32_t i=cstart; i<cstop; i++) {
|
|
|
uint16_t ing_node_this_client = i/clients_per_ing;
|
|
@@ -462,7 +476,7 @@ void generateClients(boost::asio::io_context &io_context,
|
|
|
}
|
|
|
|
|
|
int ret = generateClientEncryptionKey(i, EMK, client_key);
|
|
|
- clients[i].initClient(i, client_key);
|
|
|
+ clients[i].initClient(i, client_key, num_stg_nodes, storage_map);
|
|
|
|
|
|
clients[i].initializeSocket(io_context, ingestion_nodes[ing_node_this_client]);
|
|
|
clients[i].sendAuthMessage();
|
|
@@ -516,6 +530,7 @@ int main(int argc, char **argv)
|
|
|
uint16_t nthreads = 1;
|
|
|
const char *progname = argv[0];
|
|
|
std::vector<NodeConfig> ingestion_nodes, storage_nodes;
|
|
|
+ std::vector<uint16_t> storage_map;
|
|
|
++argv;
|
|
|
|
|
|
// Parse options
|
|
@@ -538,12 +553,16 @@ int main(int argc, char **argv)
|
|
|
std::string configstr;
|
|
|
std::getline(std::cin, configstr);
|
|
|
|
|
|
+ // The epoch interval, in microseconds
|
|
|
+ uint32_t epoch_interval_us = 1000000;
|
|
|
+
|
|
|
Config config;
|
|
|
aes_key EMK, TMK;
|
|
|
boost::asio::io_context io_context;
|
|
|
boost::asio::ip::tcp::resolver resolver(io_context);
|
|
|
|
|
|
- if (!config_parse(config, configstr, ingestion_nodes, storage_nodes)) {
|
|
|
+ if (!config_parse(config, configstr, ingestion_nodes,
|
|
|
+ storage_nodes, storage_map)) {
|
|
|
exit(1);
|
|
|
}
|
|
|
|
|
@@ -570,7 +589,8 @@ int main(int argc, char **argv)
|
|
|
printf("Thread %d, cstart = %d, cstop = %d\n", i, cstart, cstop);
|
|
|
threads[i] = std::thread(generateClients, std::ref(io_context),
|
|
|
cstart, cstop, std::ref(clients), std::ref(EMK), std::ref(config),
|
|
|
- std::ref(ingestion_nodes), num_clients_total,
|
|
|
+ std::ref(ingestion_nodes), std::ref(storage_nodes),
|
|
|
+ std::ref(storage_map), num_clients_total,
|
|
|
clients_per_ing, ing_with_additional);
|
|
|
}
|
|
|
|
|
@@ -581,6 +601,10 @@ int main(int argc, char **argv)
|
|
|
// Multithreaded client message bundle generation and send
|
|
|
uint32_t epoch = 0;
|
|
|
while(epoch < 1) {
|
|
|
+ struct timespec tp;
|
|
|
+ clock_gettime(CLOCK_REALTIME_COARSE, &tp);
|
|
|
+ unsigned long start = tp.tv_sec * 1000000 + tp.tv_nsec/1000;
|
|
|
+
|
|
|
for(int i=0; i<nthreads; i++) {
|
|
|
uint32_t cstart, cstop;
|
|
|
cstart = i * clients_per_thread;
|
|
@@ -591,7 +615,19 @@ int main(int argc, char **argv)
|
|
|
for(int i=0; i<nthreads; i++) {
|
|
|
threads[i].join();
|
|
|
}
|
|
|
+
|
|
|
+ clock_gettime(CLOCK_REALTIME_COARSE, &tp);
|
|
|
+ unsigned long end = tp.tv_sec * 1000000 + tp.tv_nsec/1000;
|
|
|
+ unsigned long diff = end - start;
|
|
|
+ // Sleep for the rest of the epoch interval
|
|
|
+ printf("Done with submissions for 1 epoch \n");
|
|
|
+ if (diff < epoch_interval_us) {
|
|
|
+ printf("diff = %ld\n", diff);
|
|
|
+ usleep(epoch_interval_us - (useconds_t)diff);
|
|
|
+ }
|
|
|
epoch++;
|
|
|
}
|
|
|
+
|
|
|
+ sleep(10000);
|
|
|
delete [] clients;
|
|
|
}
|