Browse Source

Syncing to debug acceptor problem

Sajin Sasy 1 year ago
parent
commit
192acaccb0
5 changed files with 92 additions and 73 deletions
  1. 0 72
      App/net.cpp
  2. 3 1
      App/net.hpp
  3. 78 0
      App/start.cpp
  4. 8 0
      App/start.hpp
  5. 3 0
      App/teems.cpp

+ 0 - 72
App/net.cpp

@@ -171,62 +171,6 @@ void NodeIO::recv_commands(
         });
 }
 
-/*
-  Handler for received client messages.
-
-*/
-void NetIO::handle_async_clients(std::shared_ptr<tcp::socket> csocket,
-    const boost::system::error_code& error, size_t auth_size,
-    size_t msgbundle_size)
-{
-    if(!error) {
-#ifdef VERBOSE_NET
-        printf("Accept handler success\n");
-#endif
-        // Read header (1 uint64_t) from the socket and extract the client ID
-        size_t header;
-        clientid_t cid;
-        boost::asio::read(*csocket,
-             boost::asio::buffer(&header, sizeof(uint64_t)));
-
-        if((header & 0xff) == CLIENT_AUTHENTICATE) {
-            // Read the authentication token
-            boost::asio::read(*csocket,
-               boost::asio::buffer(&header, auth_size));
-
-        } else if ((header & 0xff) == CLIENT_MESSAGE_BUNDLE) {
-            unsigned char *msgbundle = (unsigned char*) malloc(msgbundle_size);
-            cid = (clientid_t)(header >> 8);
-
-            // Read the message_bundle
-            boost::asio::read(*csocket,
-               boost::asio::buffer(msgbundle, msgbundle_size));
-
-            //Ingest the message_bundle
-            bool ret = ecall_ingest_msgbundle(cid, msgbundle, apiparams.m_priv_out);
-            free(msgbundle);
-        }
-
-        start_accept(auth_size, msgbundle_size);
-    } else {
-        printf("Accept handler failed\n");
-    }
-}
-
-/*
-  Asynchronously accept client connections
-*/
-void NetIO::start_accept(size_t auth_size, size_t msgbundle_size)
-{
-    std::shared_ptr<tcp::socket> csocket(new tcp::socket(io_context()));
-#ifdef VERBOSE_NET
-    std::cout << "Accepting on " << myconf.clistenhost << ":" << myconf.clistenport << "\n";
-#endif
-    client_acceptor->async_accept(*csocket,
-        boost::bind(&NetIO::handle_async_clients, this, csocket,
-        boost::asio::placeholders::error, auth_size, msgbundle_size));
-}
-
 
 NetIO::NetIO(boost::asio::io_context &io_context, const Config &config)
     : context(io_context), conf(config),
@@ -297,24 +241,8 @@ NetIO::NetIO(boost::asio::io_context &io_context, const Config &config)
         std::cerr << "Connected to " << config.nodes[i].name << "\n";
 #endif
     }
-
-    if(myconf.roles & ROLE_INGESTION) {
-        client_acceptor = std::shared_ptr<tcp::acceptor>(
-            new tcp::acceptor(io_context,
-                resolver.resolve(this->myconf.clistenhost,
-                this->myconf.clistenport)->endpoint()));
-
-        size_t auth_size, msgbundle_size;
-        auth_size = SGX_AESGCM_MAC_SIZE;
-        msgbundle_size = SGX_AESGCM_IV_SIZE +
-            (apiparams.m_priv_out * apiparams.msg_size) + SGX_AESGCM_MAC_SIZE;
-        start_accept(auth_size, msgbundle_size);
-    }
-
-
 }
 
-
 void NetIO::recv_commands(
     std::function<void(boost::system::error_code)> error_cb,
     std::function<void(uint32_t)> epoch_cb)

+ 3 - 1
App/net.hpp

@@ -129,7 +129,6 @@ class NetIO {
     const Config &conf;
     const NodeConfig &myconf;
     std::deque<std::optional<NodeIO>> nodeios;
-    std::shared_ptr<tcp::acceptor> client_acceptor;
 
     void handle_async_clients(std::shared_ptr<tcp::socket> socket,
         const boost::system::error_code& error, size_t auth_size,
@@ -146,6 +145,7 @@ public:
         return nodeios[node_num].value();
     }
     const Config &config() { return conf; }
+    const NodeConfig &myconfig() {return myconf; };
     boost::asio::io_context &io_context() { return context; }
     // Call recv_commands with these arguments on each of the nodes (not
     // including ourselves)
@@ -153,6 +153,8 @@ public:
         std::function<void(boost::system::error_code)> error_cb,
         std::function<void(uint32_t)> epoch_cb);
 
+    void runAsyncListeners();
+
     // Close all the sockets
     void close();
 };

+ 78 - 0
App/start.cpp

@@ -182,6 +182,84 @@ void start(NetIO &netio, char **args)
     if (*args && !strcmp(*args, "route")) {
         ++args;
         route_test(netio, args);
+
         return;
     }
 }
+
+/*
+    Handler for received client messages.
+*/
+void handle_async_clients(NetIO &netio, std::shared_ptr<tcp::socket> csocket,
+    const boost::system::error_code& error, size_t auth_size,
+    size_t msgbundle_size)
+{
+    if(!error) {
+#ifdef VERBOSE_NET
+        printf("Accept handler success\n");
+#endif
+        // Read header (1 uint64_t) from the socket and extract the client ID
+        size_t header;
+        clientid_t cid;
+        boost::asio::read(*csocket,
+             boost::asio::buffer(&header, sizeof(uint64_t)));
+
+        if((header & 0xff) == CLIENT_AUTHENTICATE) {
+            // Read the authentication token
+            boost::asio::read(*csocket,
+               boost::asio::buffer(&header, auth_size));
+
+        } else if ((header & 0xff) == CLIENT_MESSAGE_BUNDLE) {
+            unsigned char *msgbundle = (unsigned char*) malloc(msgbundle_size);
+            cid = (clientid_t)(header >> 8);
+
+            // Read the message_bundle
+            boost::asio::read(*csocket,
+               boost::asio::buffer(msgbundle, msgbundle_size));
+
+            //Ingest the message_bundle
+            bool ret = ecall_ingest_msgbundle(cid, msgbundle, apiparams.m_priv_out);
+            free(msgbundle);
+        }
+
+        start_accept(netio, auth_size, msgbundle_size);
+    } else {
+        printf("Accept handler failed\n");
+    }
+}
+
+/*
+    Asynchronously accept client connections
+*/
+void start_accept(NetIO &netio, size_t auth_size, size_t msgbundle_size)
+{
+    boost::asio::io_context &io_context = netio.io_context();
+    const NodeConfig &myconf = netio.myconfig();
+    std::shared_ptr<tcp::socket> csocket(new tcp::socket(netio.io_context()));
+    tcp::resolver resolver(io_context);
+    std::shared_ptr<tcp::acceptor> client_acceptor;
+    client_acceptor = std::shared_ptr<tcp::acceptor>(
+        new tcp::acceptor(io_context,
+            resolver.resolve(myconf.clistenhost,
+            myconf.clistenport)->endpoint()));
+#ifdef VERBOSE_NET
+    std::cout << "Accepting on " << myconf.clistenhost << ":"
+        << myconf.clistenport << "\n";
+#endif
+    client_acceptor->async_accept(*csocket,
+        boost::bind(&handle_async_clients, netio, csocket,
+        boost::asio::placeholders::error, auth_size, msgbundle_size));
+}
+
+void runAsyncListeners(NetIO &netio)
+{
+    const Config &conf = netio.config();
+    const NodeConfig &myconf = netio.myconfig();
+    if(myconf.roles & ROLE_INGESTION) {
+        size_t auth_size, msgbundle_size;
+        auth_size = SGX_AESGCM_MAC_SIZE;
+        msgbundle_size = SGX_AESGCM_IV_SIZE +
+            (conf.m_priv_out * conf.msg_size) + SGX_AESGCM_MAC_SIZE;
+        start_accept(netio, auth_size, msgbundle_size);
+    }
+}

+ 8 - 0
App/start.hpp

@@ -7,4 +7,12 @@
 // to do on the command line
 void start(NetIO &netio, char **args);
 
+void handle_async_clients(NetIO &netio, std::shared_ptr<tcp::socket> csocket,
+    const boost::system::error_code& error, size_t auth_size,
+    size_t msgbundle_size);
+
+void start_accept(NetIO &netio, size_t auth_size, size_t msgbundle_size);
+
+void runAsyncListeners(NetIO &netio);
+
 #endif

+ 3 - 0
App/teems.cpp

@@ -251,6 +251,9 @@ int main(int argc, char **argv)
                 // all other nodes
                 printf("Starting\n");
                 start(netio, argv);
+
+                // Run the async handlers
+                runAsyncListeners(netio);
             });
         });
         printf("Reading\n");