Browse Source

Revert "Syncing to debug acceptor problem"

This reverts commit 192acaccb0d93332edb2ec3601c7668d4ccf1fa2.
Sajin Sasy 1 year ago
parent
commit
461aca4bf9
5 changed files with 73 additions and 92 deletions
  1. 72 0
      App/net.cpp
  2. 1 3
      App/net.hpp
  3. 0 78
      App/start.cpp
  4. 0 8
      App/start.hpp
  5. 0 3
      App/teems.cpp

+ 72 - 0
App/net.cpp

@@ -171,6 +171,62 @@ void NodeIO::recv_commands(
         });
 }
 
+/*
+  Handler for received client messages.
+
+*/
+void NetIO::handle_async_clients(std::shared_ptr<tcp::socket> csocket,
+    const boost::system::error_code& error, size_t auth_size,
+    size_t msgbundle_size)
+{
+    if(!error) {
+#ifdef VERBOSE_NET
+        printf("Accept handler success\n");
+#endif
+        // Read header (1 uint64_t) from the socket and extract the client ID
+        size_t header;
+        clientid_t cid;
+        boost::asio::read(*csocket,
+             boost::asio::buffer(&header, sizeof(uint64_t)));
+
+        if((header & 0xff) == CLIENT_AUTHENTICATE) {
+            // Read the authentication token
+            boost::asio::read(*csocket,
+               boost::asio::buffer(&header, auth_size));
+
+        } else if ((header & 0xff) == CLIENT_MESSAGE_BUNDLE) {
+            unsigned char *msgbundle = (unsigned char*) malloc(msgbundle_size);
+            cid = (clientid_t)(header >> 8);
+
+            // Read the message_bundle
+            boost::asio::read(*csocket,
+               boost::asio::buffer(msgbundle, msgbundle_size));
+
+            //Ingest the message_bundle
+            bool ret = ecall_ingest_msgbundle(cid, msgbundle, apiparams.m_priv_out);
+            free(msgbundle);
+        }
+
+        start_accept(auth_size, msgbundle_size);
+    } else {
+        printf("Accept handler failed\n");
+    }
+}
+
+/*
+  Asynchronously accept client connections
+*/
+void NetIO::start_accept(size_t auth_size, size_t msgbundle_size)
+{
+    std::shared_ptr<tcp::socket> csocket(new tcp::socket(io_context()));
+#ifdef VERBOSE_NET
+    std::cout << "Accepting on " << myconf.clistenhost << ":" << myconf.clistenport << "\n";
+#endif
+    client_acceptor->async_accept(*csocket,
+        boost::bind(&NetIO::handle_async_clients, this, csocket,
+        boost::asio::placeholders::error, auth_size, msgbundle_size));
+}
+
 
 NetIO::NetIO(boost::asio::io_context &io_context, const Config &config)
     : context(io_context), conf(config),
@@ -241,8 +297,24 @@ NetIO::NetIO(boost::asio::io_context &io_context, const Config &config)
         std::cerr << "Connected to " << config.nodes[i].name << "\n";
 #endif
     }
+
+    if(myconf.roles & ROLE_INGESTION) {
+        client_acceptor = std::shared_ptr<tcp::acceptor>(
+            new tcp::acceptor(io_context,
+                resolver.resolve(this->myconf.clistenhost,
+                this->myconf.clistenport)->endpoint()));
+
+        size_t auth_size, msgbundle_size;
+        auth_size = SGX_AESGCM_MAC_SIZE;
+        msgbundle_size = SGX_AESGCM_IV_SIZE +
+            (apiparams.m_priv_out * apiparams.msg_size) + SGX_AESGCM_MAC_SIZE;
+        start_accept(auth_size, msgbundle_size);
+    }
+
+
 }
 
+
 void NetIO::recv_commands(
     std::function<void(boost::system::error_code)> error_cb,
     std::function<void(uint32_t)> epoch_cb)

+ 1 - 3
App/net.hpp

@@ -129,6 +129,7 @@ class NetIO {
     const Config &conf;
     const NodeConfig &myconf;
     std::deque<std::optional<NodeIO>> nodeios;
+    std::shared_ptr<tcp::acceptor> client_acceptor;
 
     void handle_async_clients(std::shared_ptr<tcp::socket> socket,
         const boost::system::error_code& error, size_t auth_size,
@@ -145,7 +146,6 @@ public:
         return nodeios[node_num].value();
     }
     const Config &config() { return conf; }
-    const NodeConfig &myconfig() {return myconf; };
     boost::asio::io_context &io_context() { return context; }
     // Call recv_commands with these arguments on each of the nodes (not
     // including ourselves)
@@ -153,8 +153,6 @@ public:
         std::function<void(boost::system::error_code)> error_cb,
         std::function<void(uint32_t)> epoch_cb);
 
-    void runAsyncListeners();
-
     // Close all the sockets
     void close();
 };

+ 0 - 78
App/start.cpp

@@ -182,84 +182,6 @@ void start(NetIO &netio, char **args)
     if (*args && !strcmp(*args, "route")) {
         ++args;
         route_test(netio, args);
-
         return;
     }
 }
-
-/*
-    Handler for received client messages.
-*/
-void handle_async_clients(NetIO &netio, std::shared_ptr<tcp::socket> csocket,
-    const boost::system::error_code& error, size_t auth_size,
-    size_t msgbundle_size)
-{
-    if(!error) {
-#ifdef VERBOSE_NET
-        printf("Accept handler success\n");
-#endif
-        // Read header (1 uint64_t) from the socket and extract the client ID
-        size_t header;
-        clientid_t cid;
-        boost::asio::read(*csocket,
-             boost::asio::buffer(&header, sizeof(uint64_t)));
-
-        if((header & 0xff) == CLIENT_AUTHENTICATE) {
-            // Read the authentication token
-            boost::asio::read(*csocket,
-               boost::asio::buffer(&header, auth_size));
-
-        } else if ((header & 0xff) == CLIENT_MESSAGE_BUNDLE) {
-            unsigned char *msgbundle = (unsigned char*) malloc(msgbundle_size);
-            cid = (clientid_t)(header >> 8);
-
-            // Read the message_bundle
-            boost::asio::read(*csocket,
-               boost::asio::buffer(msgbundle, msgbundle_size));
-
-            //Ingest the message_bundle
-            bool ret = ecall_ingest_msgbundle(cid, msgbundle, apiparams.m_priv_out);
-            free(msgbundle);
-        }
-
-        start_accept(netio, auth_size, msgbundle_size);
-    } else {
-        printf("Accept handler failed\n");
-    }
-}
-
-/*
-    Asynchronously accept client connections
-*/
-void start_accept(NetIO &netio, size_t auth_size, size_t msgbundle_size)
-{
-    boost::asio::io_context &io_context = netio.io_context();
-    const NodeConfig &myconf = netio.myconfig();
-    std::shared_ptr<tcp::socket> csocket(new tcp::socket(netio.io_context()));
-    tcp::resolver resolver(io_context);
-    std::shared_ptr<tcp::acceptor> client_acceptor;
-    client_acceptor = std::shared_ptr<tcp::acceptor>(
-        new tcp::acceptor(io_context,
-            resolver.resolve(myconf.clistenhost,
-            myconf.clistenport)->endpoint()));
-#ifdef VERBOSE_NET
-    std::cout << "Accepting on " << myconf.clistenhost << ":"
-        << myconf.clistenport << "\n";
-#endif
-    client_acceptor->async_accept(*csocket,
-        boost::bind(&handle_async_clients, netio, csocket,
-        boost::asio::placeholders::error, auth_size, msgbundle_size));
-}
-
-void runAsyncListeners(NetIO &netio)
-{
-    const Config &conf = netio.config();
-    const NodeConfig &myconf = netio.myconfig();
-    if(myconf.roles & ROLE_INGESTION) {
-        size_t auth_size, msgbundle_size;
-        auth_size = SGX_AESGCM_MAC_SIZE;
-        msgbundle_size = SGX_AESGCM_IV_SIZE +
-            (conf.m_priv_out * conf.msg_size) + SGX_AESGCM_MAC_SIZE;
-        start_accept(netio, auth_size, msgbundle_size);
-    }
-}

+ 0 - 8
App/start.hpp

@@ -7,12 +7,4 @@
 // to do on the command line
 void start(NetIO &netio, char **args);
 
-void handle_async_clients(NetIO &netio, std::shared_ptr<tcp::socket> csocket,
-    const boost::system::error_code& error, size_t auth_size,
-    size_t msgbundle_size);
-
-void start_accept(NetIO &netio, size_t auth_size, size_t msgbundle_size);
-
-void runAsyncListeners(NetIO &netio);
-
 #endif

+ 0 - 3
App/teems.cpp

@@ -251,9 +251,6 @@ int main(int argc, char **argv)
                 // all other nodes
                 printf("Starting\n");
                 start(netio, argv);
-
-                // Run the async handlers
-                runAsyncListeners(netio);
             });
         });
         printf("Reading\n");