Prechádzať zdrojové kódy

Touch up Client/clients.cpp

Minor touchups, plus:

Fix a memory leak in sendStgAuthMessage

Fix a minor memory leak (only in the error case) in epoch_process

client_epoch_process was only used by run_epochs, and run_epochs
was never used, so I removed them

main should create nthreads background I/O threads; it was creating a
constant 6 of them.
Ian Goldberg 1 rok pred
rodič
commit
c178ad1c60
1 zmenil súbory, kde vykonal 144 pridanie a 125 odobranie
  1. 144 125
      Client/clients.cpp

+ 144 - 125
Client/clients.cpp

@@ -86,7 +86,7 @@ void displayMessage(unsigned char *msg, uint16_t msg_size)
     printf("Sender ID: %d, Receiver ID: %d, Token: N/A\n", sid, rid );
     printf("Message: ");
     for(int j = 0; j<msg_size - sizeof(sid)*2; j++) {
-        printf("%x", (*ptr));
+        printf("%02x", (*ptr));
         ptr++;
     }
     printf("\n");
@@ -133,32 +133,35 @@ void displayEncMessageBundle(unsigned char *bundle, uint16_t priv_out,
 
 static inline uint32_t encPubMsgBundleSize(uint16_t pub_out, uint16_t msg_size)
 {
-    return(SGX_AESGCM_IV_SIZE + (pub_out * msg_size) + SGX_AESGCM_MAC_SIZE);
+    return SGX_AESGCM_IV_SIZE + (uint32_t(pub_out) * msg_size)
+        + SGX_AESGCM_MAC_SIZE;
 }
 
 static inline uint32_t ptPubMsgBundleSize(uint16_t pub_out, uint16_t msg_size)
 {
-    return(pub_out * msg_size);
+    return uint32_t(pub_out) * msg_size;
 }
 
 static inline uint32_t encMsgBundleSize(uint16_t priv_out, uint16_t msg_size)
 {
-    return(SGX_AESGCM_IV_SIZE + (priv_out * (msg_size + TOKEN_SIZE)) + SGX_AESGCM_MAC_SIZE);
+    return SGX_AESGCM_IV_SIZE + (uint32_t(priv_out) * (msg_size + TOKEN_SIZE))
+        + SGX_AESGCM_MAC_SIZE;
 }
 
 static inline uint32_t ptMsgBundleSize(uint16_t priv_out, uint16_t msg_size)
 {
-    return(priv_out * (msg_size + TOKEN_SIZE));
+    return uint32_t(priv_out) * (msg_size + TOKEN_SIZE);
 }
 
 static inline uint32_t encMailboxSize(uint16_t priv_in, uint16_t msg_size)
 {
-    return(SGX_AESGCM_IV_SIZE + (priv_in * msg_size) + SGX_AESGCM_MAC_SIZE);
+    return SGX_AESGCM_IV_SIZE + (uint32_t(priv_in) * msg_size)
+        + SGX_AESGCM_MAC_SIZE;
 }
 
 static inline uint32_t ptMailboxSize(uint16_t priv_in, uint16_t msg_size)
 {
-    return(priv_in * msg_size);
+    return uint32_t(priv_in) * msg_size;
 }
 
 bool config_parse(Config &config, const std::string configstr,
@@ -190,11 +193,13 @@ bool config_parse(Config &config, const std::string configstr,
                     config.m_pub_out = pentry.second.get_value<uint8_t>();
                 } else if (!pentry.first.compare("pub_in")) {
                     config.m_pub_in = pentry.second.get_value<uint8_t>();
-                // A hardcoded shared secret to derive various
-                // keys for client -> server communications and tokens
+                // A stub hardcoded shared secret to derive various
+                // keys for client <-> server communications and tokens
+                // In reality, this would be a key exchange
                 } else if (!pentry.first.compare("master_secret")) {
                     std::string hex_key = pentry.second.data();
-                    memcpy(config.master_secret, hex_key.c_str(), SGX_AESGCM_KEY_SIZE);
+                    memcpy(config.master_secret, hex_key.c_str(),
+                        SGX_AESGCM_KEY_SIZE);
                 } else if (!pentry.first.compare("private_routing")) {
                     config.private_routing = pentry.second.get_value<bool>();
                 } else {
@@ -401,9 +406,11 @@ void Client::initializeStgSocket(boost::asio::io_context &ioc,
         while(1) {
             boost::asio::ip::tcp::endpoint ep(ip_address, port_no);
             storage_sock->bind(ep, err);
-            if(!err) break;
-            else {
-                printf("STG: Error %s. (%s:%d)\n", err.message().c_str(), (curr_ip->ip_str()).c_str(), port_no);
+            if (!err) {
+                break;
+            } else {
+                printf("STG: Error %s. (%s:%d)\n", err.message().c_str(),
+                    (curr_ip->ip_str()).c_str(), port_no);
                 port_no++;
                 if(port_no >= PORT_END) {
                     port_no = PORT_START;
@@ -413,8 +420,10 @@ void Client::initializeStgSocket(boost::asio::io_context &ioc,
         }
 
 
-        boost::asio::ip::address stg_ip = boost::asio::ip::address::from_string(stg_server.slistenhost, err);
-        boost::asio::ip::tcp::endpoint stg_ep(stg_ip, std::stoi(stg_server.slistenport));
+        boost::asio::ip::address stg_ip =
+            boost::asio::ip::address::from_string(stg_server.slistenhost, err);
+        boost::asio::ip::tcp::endpoint
+            stg_ep(stg_ip, std::stoi(stg_server.slistenport));
         // just for printing
         // boost::asio::ip::tcp::endpoint ep(ip_address, port_no);
 
@@ -422,9 +431,9 @@ void Client::initializeStgSocket(boost::asio::io_context &ioc,
         if (!err) {
             break;
         }
-        std::cerr <<"STG: Connection to " << stg_server.name <<
-            " refused, will , epoch_noretry.\n";
-        std::cerr << curr_ip->ip_str() << ":" << port_no << "\n";
+        std::cerr <<"STG: Connection from " <<
+            curr_ip->ip_str() << ":" << port_no << 
+            " to " << stg_server.name << " refused, will retry.\n";
 
 #ifdef RANDOMIZE_CLIENT_RETRY_SLEEP_TIME
         int sleep_delay = rand() % 100000;
@@ -432,7 +441,8 @@ void Client::initializeStgSocket(boost::asio::io_context &ioc,
 #else
         usleep(1000000);
 #endif
-        delete(storage_sock);
+        delete storage_sock;
+        storage_sock = nullptr;
     }
 }
 
@@ -461,9 +471,11 @@ void Client::initializeIngSocket(boost::asio::io_context &ioc,
         while(1) {
             boost::asio::ip::tcp::endpoint ep(ip_address, port_no);
             ingestion_sock->bind(ep, err);
-            if(!err) break;
-            else {
-                printf("ING: Error %s. (%s:%d)\n", err.message().c_str(), (curr_ip->ip_str()).c_str(), port_no);
+            if (!err) {
+                break;
+            } else {
+                printf("ING: Error %s. (%s:%d)\n", err.message().c_str(),
+                    (curr_ip->ip_str()).c_str(), port_no);
                 port_no++;
                 if(port_no >= PORT_END) {
                     port_no = PORT_START;
@@ -475,8 +487,10 @@ void Client::initializeIngSocket(boost::asio::io_context &ioc,
 
         // just for printing
         boost::asio::ip::tcp::endpoint ep(ip_address, port_no);
-        boost::asio::ip::address ing_ip = boost::asio::ip::address::from_string(ing_server.clistenhost, err);
-        boost::asio::ip::tcp::endpoint ing_ep(ing_ip, std::stoi(ing_server.clistenport));
+        boost::asio::ip::address ing_ip =
+            boost::asio::ip::address::from_string(ing_server.clistenhost, err);
+        boost::asio::ip::tcp::endpoint
+            ing_ep(ing_ip, std::stoi(ing_server.clistenport));
         //std::cout << "Ing endpoint:" << ing_ep << "\n";
         //std::cout<<"ING: Attempting to connect client " << ep << " -> " << ing_ep <<"\n";
         ingestion_sock->connect(ing_ep, err);
@@ -491,16 +505,18 @@ void Client::initializeIngSocket(boost::asio::io_context &ioc,
            //std::cout<<"ING: Connected client " << ep << " -> " << ing_ep <<"\n";
            break;
         }
-        std::cerr << "ING: Connection to " << ing_server.name <<
-            " refused, will , epoch_noretry.\n";
-        std::cerr << curr_ip->ip_str() << ":" << port_no << "\n";
+        std::cerr << "ING: Connection from " <<
+            curr_ip->ip_str() << ":" << port_no <<
+            " to " << ing_server.name << " refused, will retry.\n";
+
 #ifdef RANDOMIZE_CLIENT_RETRY_SLEEP_TIME
         int sleep_delay = rand() % 100000;
         usleep(sleep_delay);
 #else
         usleep(1000000);
 #endif
-        delete(ingestion_sock);
+        delete ingestion_sock;
+        ingestion_sock = nullptr;
     }
 }
 
@@ -519,6 +535,8 @@ void Client::generateMessageBundle(uint8_t priv_out, uint32_t msg_size,
 
     // Setup message pt_msgbundle
     for(uint32_t i = 0; i < priv_out; i++) {
+        // For benchmarking, each client just sends messages to
+        // themselves, so the destination and source ids are the same.
         memcpy(ptr, &id, sizeof(id));
         ptr+=(sizeof(id));
 
@@ -537,16 +555,18 @@ void Client::generateMessageBundle(uint8_t priv_out, uint32_t msg_size,
 }
 
 
-bool Client::encryptMessageBundle(uint32_t enc_bundle_size, unsigned char *pt_msgbundle,
-    unsigned char *enc_msgbundle)
+bool Client::encryptMessageBundle(uint32_t enc_bundle_size,
+    unsigned char *pt_msgbundle, unsigned char *enc_msgbundle)
 {
     // Encrypt the pt_msgbundle
     unsigned char *pt_msgbundle_start = pt_msgbundle;
     unsigned char *enc_msgbundle_start = enc_msgbundle + SGX_AESGCM_IV_SIZE;
     unsigned char *enc_tag = enc_msgbundle + enc_bundle_size - SGX_AESGCM_MAC_SIZE;
-    size_t bytes_to_encrypt = enc_bundle_size - SGX_AESGCM_MAC_SIZE - SGX_AESGCM_IV_SIZE;
+    size_t bytes_to_encrypt = enc_bundle_size - SGX_AESGCM_MAC_SIZE -
+        SGX_AESGCM_IV_SIZE;
     if (bytes_to_encrypt != gcm_encrypt(pt_msgbundle_start, bytes_to_encrypt,
-        NULL, 0, ing_key, ing_iv, SGX_AESGCM_IV_SIZE, enc_msgbundle_start, enc_tag)) {
+        NULL, 0, ing_key, ing_iv, SGX_AESGCM_IV_SIZE, enc_msgbundle_start,
+        enc_tag)) {
             printf("Client: encryptMessageBundle FAIL\n");
             return 0;
     }
@@ -561,6 +581,7 @@ bool Client::encryptMessageBundle(uint32_t enc_bundle_size, unsigned char *pt_ms
 }
 
 #ifdef TRACE_SOCKIO
+
 class LimitLogger {
     std::string label;
     std::string thrid;
@@ -595,11 +616,10 @@ public:
     }
 };
 
-
 static thread_local LimitLogger
     recvlogger("recv"), queuelogger("queue"), sentlogger("sent");
-#endif
 
+#endif
 
 void Client::sendMessageBundle()
 {
@@ -616,19 +636,24 @@ void Client::sendMessageBundle()
         send_enc_msgbundle_size = encPubMsgBundleSize(pub_out, msg_size);
     }
 
-    unsigned char *send_pt_msgbundle = (unsigned char*) malloc (send_pt_msgbundle_size);
-    unsigned char *send_enc_msgbundle = (unsigned char*) malloc (send_enc_msgbundle_size);
+    unsigned char *send_pt_msgbundle =
+        (unsigned char*) malloc (send_pt_msgbundle_size);
+    unsigned char *send_enc_msgbundle =
+        (unsigned char*) malloc (send_enc_msgbundle_size);
     if(private_routing) {
         generateMessageBundle(priv_out, msg_size, send_pt_msgbundle);
     } else {
         generateMessageBundle(pub_out, msg_size, send_pt_msgbundle);
     }
-    encryptMessageBundle(send_enc_msgbundle_size, send_pt_msgbundle, send_enc_msgbundle);
+    encryptMessageBundle(send_enc_msgbundle_size, send_pt_msgbundle,
+        send_enc_msgbundle);
 
 #ifdef VERBOSE_CLIENT
     displayPtMessageBundle(send_pt_msgbundle, priv_out, msg_size);
 #endif
 
+    free(send_pt_msgbundle);
+
 #ifdef TRACE_SOCKIO
     queuelogger.log();
 #endif
@@ -645,27 +670,26 @@ void Client::sendMessageBundle()
         }
 #endif
 
+        free(send_enc_msgbundle);
+
         if (ecc) {
             if(ecc == boost::asio::error::eof) {
-                delete(storage_sock);
-            }
-            else {
-                printf("Client: boost async_write failed for sending message bundle\n");
+                delete storage_sock;
+                storage_sock = nullptr;
+            } else {
+                printf("Client: boost async_write failed for sending "
+                    "message bundle\n");
                 printf("Error %s\n", ecc.message().c_str());
             }
             return;
         }
-
-        free(send_enc_msgbundle);
-
     });
-
-    free(send_pt_msgbundle);
 }
 
 int Client::sendIngAuthMessage(unsigned long epoch_no)
 {
-    uint32_t auth_size = sizeof(clientid_t) + sizeof(unsigned long) + SGX_AESGCM_KEY_SIZE;
+    uint32_t auth_size = sizeof(clientid_t) + sizeof(unsigned long) +
+        SGX_AESGCM_KEY_SIZE;
     unsigned char *auth_message = (unsigned char*) malloc(auth_size);
     unsigned char *am_ptr = auth_message;
 
@@ -680,9 +704,10 @@ int Client::sendIngAuthMessage(unsigned long epoch_no)
     unsigned char epoch_iv[SGX_AESGCM_IV_SIZE] = {0};
     memcpy(epoch_iv, &epoch_no, sizeof(epoch_no));
 
-    if (sizeof(zeroes) != gcm_encrypt(zeroes, SGX_AESGCM_KEY_SIZE, NULL, 0, ing_key,
-            epoch_iv, SGX_AESGCM_IV_SIZE, am_ptr, tag)) {
-        printf("generateClientKeys failed\n");
+    if (sizeof(zeroes) != gcm_encrypt(zeroes, SGX_AESGCM_KEY_SIZE,
+            NULL, 0, ing_key, epoch_iv, SGX_AESGCM_IV_SIZE,
+            am_ptr, tag)) {
+        printf("sendIngAuthMessage failed\n");
         return -1;
     }
 
@@ -712,7 +737,8 @@ int Client::sendIngAuthMessage(unsigned long epoch_no)
 
 int Client::sendStgAuthMessage(unsigned long epoch_no)
 {
-    uint32_t auth_size = sizeof(clientid_t) + sizeof(unsigned long) + SGX_AESGCM_KEY_SIZE;
+    uint32_t auth_size = sizeof(clientid_t) + sizeof(unsigned long) +
+        SGX_AESGCM_KEY_SIZE;
     unsigned char *auth_message = (unsigned char*) malloc(auth_size);
     unsigned char *am_ptr = auth_message;
 
@@ -727,9 +753,10 @@ int Client::sendStgAuthMessage(unsigned long epoch_no)
     unsigned char epoch_iv[SGX_AESGCM_IV_SIZE] = {0};
     memcpy(epoch_iv, &epoch_no, sizeof(epoch_no));
 
-    if (sizeof(zeroes) != gcm_encrypt(zeroes, SGX_AESGCM_KEY_SIZE, NULL, 0, stg_key,
-            epoch_iv, SGX_AESGCM_IV_SIZE, am_ptr, tag)) {
-        printf("generateClientKeys failed\n");
+    if (sizeof(zeroes) != gcm_encrypt(zeroes, SGX_AESGCM_KEY_SIZE,
+            NULL, 0, stg_key, epoch_iv, SGX_AESGCM_IV_SIZE,
+            am_ptr, tag)) {
+        printf("sendStgAuthMessage failed\n");
         return -1;
     }
 
@@ -743,18 +770,20 @@ int Client::sendStgAuthMessage(unsigned long epoch_no)
 
     boost::asio::async_write(*storage_sock,
         boost::asio::buffer(auth_message, auth_size),
-        [this] (boost::system::error_code ecc, std::size_t) {
+        [this, auth_message] (boost::system::error_code ecc, std::size_t) {
+
+        free(auth_message);
+
         if (ecc) {
             if(ecc == boost::asio::error::eof) {
-                delete(storage_sock);
-            }
-            else {
+                delete storage_sock;
+                storage_sock = nullptr;
+            } else {
                 printf("Error %s\n", ecc.message().c_str());
             }
             printf("Client::sendStgAuthMessage boost async_write failed\n");
             return;
         }
-
     });
 
     return 1;
@@ -764,7 +793,7 @@ void Client::setup_client(boost::asio::io_context &io_context,
     uint32_t sim_id, uint16_t ing_node_id, uint16_t stg_node_id,
     ip_addr *curr_ip, uint16_t &port_no)
 {
-    // Setup the client's
+    // Set up the client's
     // (i) client_id
     // (ii) symmetric keys shared with their ingestion and storage server
     // (iii) sockets to their ingestion and storage server
@@ -782,8 +811,8 @@ void Client::setup_client(boost::asio::io_context &io_context,
     // Authenticate clients to their ingestion and storage servers
     struct timespec ep;
     clock_gettime(CLOCK_REALTIME_COARSE, &ep);
-    unsigned long time_in_ns = ep.tv_sec * 1000000 + ep.tv_nsec/1000;
-    unsigned long epoch_no = CEILDIV(time_in_ns, 5000000);
+    unsigned long time_in_us = ep.tv_sec * 1000000 + ep.tv_nsec/1000;
+    unsigned long epoch_no = CEILDIV(time_in_us, 5000000);
     sendStgAuthMessage(epoch_no);
     sendIngAuthMessage(epoch_no);
     epoch_process();
@@ -841,7 +870,7 @@ using the tokens they received in this epoch
 
 void Client::epoch_process() {
 
-    uint32_t pt_token_size = (config.m_priv_out * SGX_AESGCM_KEY_SIZE);
+    uint32_t pt_token_size = uint32_t(config.m_priv_out) * SGX_AESGCM_KEY_SIZE;
     uint32_t token_bundle_size = pt_token_size + SGX_AESGCM_IV_SIZE
         + SGX_AESGCM_MAC_SIZE;
     unsigned char *enc_tokens = (unsigned char*) malloc (token_bundle_size);
@@ -854,12 +883,14 @@ void Client::epoch_process() {
 
             if (ec) {
                 if(ec == boost::asio::error::eof) {
-                    delete(storage_sock);
-                }
-                else {
+                    delete storage_sock;
+                    storage_sock = nullptr;
+                } else {
                     printf("Error %s\n", ec.message().c_str());
-                    printf("Client::epoch_process boost async_read_tokens failed\n");
+                    printf("Client::epoch_process boost "
+                        "async_read_tokens failed\n");
                 }
+                free(enc_tokens);
                 return;
             }
 
@@ -876,15 +907,18 @@ void Client::epoch_process() {
 
             // Decrypt the token bundle
             unsigned char *enc_tkn_ptr = enc_tokens + SGX_AESGCM_IV_SIZE;
-            unsigned char *enc_tkn_tag = enc_tokens + SGX_AESGCM_IV_SIZE + pt_token_size;
+            unsigned char *enc_tkn_tag = enc_tokens + SGX_AESGCM_IV_SIZE +
+                pt_token_size;
 
 
             int decrypted_bytes =  gcm_decrypt(enc_tkn_ptr, pt_token_size,
                     NULL, 0, enc_tkn_tag, (unsigned char*) &(this->stg_key),
-                    enc_tokens, SGX_AESGCM_IV_SIZE, (unsigned char*) (this->token_list));
+                    enc_tokens, SGX_AESGCM_IV_SIZE,
+                    (unsigned char*) (this->token_list));
 
             if(decrypted_bytes != pt_token_size) {
-                printf("Client::epoch_process gcm_decrypt tokens failed. decrypted_bytes = %d \n", decrypted_bytes);
+                printf("Client::epoch_process gcm_decrypt tokens failed. "
+                    "decrypted_bytes = %d\n", decrypted_bytes);
             }
 
             free(enc_tokens);
@@ -900,13 +934,15 @@ void Client::epoch_process() {
             }
             */
 
-            // Async read the messages recieved in the last epoch
+            // Async read the messages received in the last epoch
             uint16_t priv_in = config.m_priv_in;
             uint16_t msg_size = config.msg_size;
             uint32_t recv_pt_mailbox_size = ptMailboxSize(priv_in, msg_size);
             uint32_t recv_enc_mailbox_size = encMailboxSize(priv_in, msg_size);
-            unsigned char *recv_pt_mailbox = (unsigned char*) malloc (recv_pt_mailbox_size);
-            unsigned char *recv_enc_mailbox = (unsigned char*) malloc (recv_enc_mailbox_size);
+            unsigned char *recv_pt_mailbox =
+                (unsigned char*) malloc (recv_pt_mailbox_size);
+            unsigned char *recv_enc_mailbox =
+                (unsigned char*) malloc (recv_enc_mailbox_size);
 
             boost::asio::async_read(*storage_sock,
                 boost::asio::buffer(recv_enc_mailbox, recv_enc_mailbox_size),
@@ -918,12 +954,15 @@ void Client::epoch_process() {
 #endif
                 if (ecc) {
                     if(ecc == boost::asio::error::eof) {
-                        delete(storage_sock);
-                    }
-                    else {
-                        printf("Client: boost async_read failed for recieving msg_bundle\n");
+                        delete storage_sock;
+                        storage_sock = nullptr;
+                    } else {
+                        printf("Client: boost async_read failed for "
+                            "receiving msg_bundle\n");
                         printf("Error %s\n", ecc.message().c_str());
                     }
+                    free(recv_pt_mailbox);
+                    free(recv_enc_mailbox);
                     return;
                 }
 
@@ -934,6 +973,9 @@ void Client::epoch_process() {
     #endif
 
                 // Do whatever processing with the received messages here
+                // but for the benchmark, we just ignore the received
+                // messages
+
                 free(recv_enc_mailbox);
                 free(recv_pt_mailbox);
 
@@ -943,13 +985,15 @@ void Client::epoch_process() {
             });
         });
     } else {
-        // Async read the messages recieved in the last epoch
+        // Async read the messages received in the last epoch
         uint16_t pub_in = config.m_pub_in;
         uint16_t msg_size = config.msg_size;
         uint32_t recv_pt_mailbox_size = ptMailboxSize(pub_in, msg_size);
         uint32_t recv_enc_mailbox_size = encMailboxSize(pub_in, msg_size);
-        unsigned char *recv_pt_mailbox = (unsigned char*) malloc (recv_pt_mailbox_size);
-        unsigned char *recv_enc_mailbox = (unsigned char*) malloc (recv_enc_mailbox_size);
+        unsigned char *recv_pt_mailbox =
+            (unsigned char*) malloc (recv_pt_mailbox_size);
+        unsigned char *recv_enc_mailbox =
+            (unsigned char*) malloc (recv_enc_mailbox_size);
 
         boost::asio::async_read(*storage_sock,
             boost::asio::buffer(recv_enc_mailbox, recv_enc_mailbox_size),
@@ -961,16 +1005,22 @@ void Client::epoch_process() {
 #endif
             if (ecc) {
                 if(ecc == boost::asio::error::eof) {
-                    delete(storage_sock);
-                }
-                else {
+                    delete storage_sock;
+                    storage_sock = nullptr;
+                } else {
+                    printf("Client: boost async_read failed for "
+                        "receiving msg_bundle\n");
                     printf("Error %s\n", ecc.message().c_str());
                 }
-                printf("Client: boost async_read failed for recieving msg_bundle\n");
+                free(recv_pt_mailbox);
+                free(recv_enc_mailbox);
                 return;
             }
 
             // Do whatever processing with the received messages here
+            // but for the benchmark, we just ignore the received
+            // messages
+
             free(recv_enc_mailbox);
             free(recv_pt_mailbox);
 
@@ -981,13 +1031,6 @@ void Client::epoch_process() {
     }
 }
 
-void client_epoch_process(uint32_t cstart, uint32_t cstop)
-{
-    for(uint32_t i=cstart; i<cstop; i++) {
-        clients[i].epoch_process();
-    }
-}
-
 void initializeClients(boost::asio::io_context &io_context, uint16_t nthreads)
 {
     std::vector<boost::thread> threads;
@@ -1013,23 +1056,6 @@ void initializeClients(boost::asio::io_context &io_context, uint16_t nthreads)
     }
 }
 
-void run_epochs(int nthreads) {
-    size_t num_clients_total = config.user_count;
-    size_t clients_per_thread = CEILDIV(num_clients_total, nthreads);
-    std::vector<boost::thread> threads;
-
-    for(int i=0; i<nthreads; i++) {
-        uint32_t cstart, cstop;
-        cstart = i * clients_per_thread;
-        cstop = (i==nthreads-1)? num_clients_total: (i+1) * clients_per_thread;
-        threads.emplace_back(boost::thread(client_epoch_process,
-            cstart, cstop));
-    }
-    for(int i=0; i<nthreads; i++) {
-        threads[i].join();
-    }
-}
-
 int main(int argc, char **argv)
 {
     // Unbuffer stdout
@@ -1046,7 +1072,7 @@ int main(int argc, char **argv)
             }
             nthreads = uint16_t(atoi(argv[1]));
             argv += 2;
-        }  else {
+        } else {
             usage(progname);
         }
     }
@@ -1081,23 +1107,16 @@ int main(int argc, char **argv)
 
     });
 
-    // Start background threads; one will perform the work and the other
-    // will execute the async_write handlers
-    // TODO: Cleanup and distribute this based on nthreads.
-    // Currently assumes 4 threads are available for client simulator on chime.
-    boost::thread t([&]{io_context.run();});
-    boost::thread t2([&]{io_context.run();});
-    boost::thread t3([&]{io_context.run();});
-    boost::thread t4([&]{io_context.run();});
-    boost::thread t5([&]{io_context.run();});
-    boost::thread t6([&]{io_context.run();});
+    // Start background threads; one thread will perform the work and the
+    // others will execute the async_write/async_read handlers
+    std::vector<boost::thread> threads;
+    for (int i=0; i<nthreads; i++) {
+        threads.emplace_back([&]{io_context.run();});
+    }
     io_context.run();
-    t.join();
-    t2.join();
-    t3.join();
-    t4.join();
-    t5.join();
-    t6.join();
+    for (int i=0; i<nthreads; i++) {
+        threads[i].join();
+    }
 
     delete [] clients;
 }