Browse Source

Sync state to debug client->server async connection

Sajin Sasy 1 year ago
parent
commit
7fe40246f7
4 changed files with 87 additions and 2 deletions
  1. 2 0
      App/appconfig.cpp
  2. 1 1
      App/mkconfig.py
  3. 57 0
      App/net.cpp
  4. 27 1
      Client/clients.cpp

+ 2 - 0
App/appconfig.cpp

@@ -116,6 +116,8 @@ bool config_parse(Config &config, const std::string configstr,
                             sizeof(nc.pubkey));
                     } else if (!nentry.first.compare("weight")) {
                         nc.weight = nentry.second.get_value<std::uint8_t>();
+                    } else if (!nentry.first.compare("roles")) {
+                        nc.roles = nentry.second.get_value<std::uint8_t>();
                     } else if (!nentry.first.compare("listen")) {
                         ret &= split_host_port(nc.listenhost, nc.listenport,
                             nentry.second.get_value<std::string>());

+ 1 - 1
App/mkconfig.py

@@ -47,7 +47,7 @@ def create_json(manifestfile, pubkeysfile, nodelist, params_override):
         nodeconf['pubkey'] = pubkeys[node]
         nodeconf['listen'] = m['listen']
         # Optional fields
-        for f in ['clisten', 'weight', 'role']:
+        for f in ['clisten', 'weight', 'roles']:
             if f in m:
                 nodeconf[f] = m[f]
         config['nodes'].append(nodeconf)

+ 57 - 0
App/net.cpp

@@ -170,6 +170,45 @@ void NodeIO::recv_commands(
         });
 }
 
+void accept_handler(const boost::system::error_code& error) {
+    if(!error) {
+        printf("Accept handler success\n");
+        // Read 2 bytes from the socket, which will be the
+        // connecting node's node number
+        //unsigned short node_num;
+        //boost::asio::read(acc_sock,
+        //     boost::asio::buffer(&node_num, sizeof(node_num)));
+        //printf("node_num received = %d\n", node_num);
+        } else {
+    }
+}
+
+/*
+void start_accept(boost::asio::io_context& io_context, const NodeConfig& myconf);
+
+void handle_accept(boost::asio::io_context& io_context, const NodeConfig &myconf, tcp_connection::pointer new_connection,
+      const boost::system::error_code& error)
+  {
+    if (!error)
+    {
+      new_connection->start();
+    }
+
+    start_accept(io_context, myconf);
+  }
+
+void start_accept(boost::asio::io_context& io_context, const NodeConfig& myconf) {
+    tcp::resolver resolver(io_context);
+    tcp::acceptor async_acceptor(io_context,
+        resolver.resolve(myconf.clistenhost, myconf.clistenport)->endpoint());
+    tcp_connection::pointer new_connection =
+        tcp_connection::create(io_context);
+    async_acceptor.async_accept(new_connection->socket(),
+        boost::bind(&handle_accept, io_context, myconf, new_connection,
+        boost::asio::placeholders::error));
+}
+*/
+
 NetIO::NetIO(boost::asio::io_context &io_context, const Config &config)
     : conf(config), myconf(config.nodes[config.my_node_num])
 {
@@ -238,8 +277,26 @@ NetIO::NetIO(boost::asio::io_context &io_context, const Config &config)
         std::cerr << "Connected to " << config.nodes[i].name << "\n";
 #endif
     }
+
+
+
+    if(me==0) {
+        tcp::acceptor async_acceptor(io_context,
+            resolver.resolve(myconf.clistenhost, myconf.clistenport)->endpoint());
+        std::shared_ptr<tcp::socket> new_socket(new tcp::socket(io_context));
+        std::cout << myconf.clistenhost << ":" <<  myconf.clistenport;
+        async_acceptor.async_accept(*new_socket, accept_handler);
+    }
+
+
+    /*
+    if(me==0) {
+        start_accept(io_context, myconf);
+    }
+    */
 }
 
+
 void NetIO::recv_commands(
     std::function<void(boost::system::error_code)> error_cb,
     std::function<void(uint32_t)> epoch_cb)

+ 27 - 1
Client/clients.cpp

@@ -5,6 +5,9 @@
 #define BOOST_BIND_GLOBAL_PLACEHOLDERS
 #include "boost/property_tree/ptree.hpp"
 #include "boost/property_tree/json_parser.hpp"
+#include <boost/asio.hpp>
+#include <boost/thread.hpp>
+
 
 // Split a hostport string like "127.0.0.1:12000" at the rightmost colon
 // into a host part "127.0.0.1" and a port part "12000".
@@ -116,7 +119,7 @@ bool config_parse(Config &config, const std::string configstr,
                     } else if (!nentry.first.compare("clisten")) {
                         ret &= split_host_port(nc.clistenhost, nc.clistenport,
                             nentry.second.get_value<std::string>());
-                    } else if (!nentry.first.compare("role")) {
+                    } else if (!nentry.first.compare("roles")) {
                         nc.roles = nentry.second.get_value<std::uint8_t>();
                     } else {
                         std::cerr << "Unknown field in host config: " <<
@@ -191,6 +194,29 @@ int main(int argc, char **argv)
     printf("Number of ingestion_nodes = %ld, Number of storage_node = %ld\n",
         ingestion_nodes.size(), storage_nodes.size());
 
+    // Attempt sending a data packet to one of the ingestion servers
+    boost::asio::io_context io_context;
+    boost::system::error_code err;
+    boost::asio::ip::tcp::socket nodesock(io_context);
+    boost::asio::ip::tcp::resolver resolver(io_context);
+
+    while(1) {
+#ifdef VERBOSE_NET
+        std::cerr << "Connecting to " << ingestion_nodes[0].name << "...\n";
+#endif
+        std::cout << ingestion_nodes[0].clistenhost << ":" << ingestion_nodes[0].clistenport;
+        boost::asio::connect(nodesock,
+            resolver.resolve(ingestion_nodes[0].clistenhost,
+                ingestion_nodes[0].clistenport), err);
+        if (!err) break;
+        std::cerr << "Connection to " << ingestion_nodes[0].name <<
+            " refused, will retry.\n";
+        sleep(1);
+    }
+
+    nodenum_t node_num = 7;
+    boost::asio::write(nodesock,
+        boost::asio::buffer(&node_num, sizeof(node_num)));
 
     /*
         Spin config.user_client actual clients. Each client: