Explorar el Código

Client -> Ingestion send (unencrypted) message bundles. Ingestion servers queue them into a msgBuffer

Sajin Sasy hace 1 año
padre
commit
aa86ecf9c1
Se han modificado 15 ficheros con 467 adiciones y 209 borrados
  1. 2 1
      App/appconfig.cpp
  2. 4 0
      App/appconfig.hpp
  3. 86 13
      App/net.cpp
  4. 3 39
      App/net.hpp
  5. 5 3
      Client/clientlaunch
  6. 207 79
      Client/clients.cpp
  7. 5 0
      Enclave/Enclave.edl
  8. 29 8
      Enclave/config.cpp
  9. 57 6
      Enclave/ingest.cpp
  10. 15 17
      Enclave/ingest.hpp
  11. 0 40
      Enclave/route.cpp
  12. 41 0
      Enclave/route.hpp
  13. 1 1
      Makefile
  14. 9 2
      Untrusted/Untrusted.cpp
  15. 3 0
      Untrusted/Untrusted.hpp

+ 2 - 1
App/appconfig.cpp

@@ -7,6 +7,8 @@
 #include "boost/property_tree/ptree.hpp"
 #include "boost/property_tree/json_parser.hpp"
 
+EnclaveAPIParams apiparams;
+
 // Split a hostport string like "127.0.0.1:12000" at the rightmost colon
 // into a host part "127.0.0.1" and a port part "12000".
 static bool split_host_port(std::string &host, std::string &port,
@@ -151,7 +153,6 @@ bool config_parse(Config &config, const std::string configstr,
     if (!ret) return ret;
 
     // Now load the config into the enclave
-    EnclaveAPIParams apiparams;
     apiparams.user_count = config.user_count;
     apiparams.msg_size = config.msg_size;
     apiparams.m_priv_out = config.m_priv_out;

+ 4 - 0
App/appconfig.hpp

@@ -7,6 +7,9 @@
 #include "sgx_tcrypto.h"
 #include "../Enclave/enclave_api.h"
 
+#define CLIENT_AUTHENTICATE 0x00
+#define CLIENT_MESSAGE_BUNDLE 0x01
+
 // The per-node config
 struct NodeConfig {
     std::string name;
@@ -37,4 +40,5 @@ struct Config {
 bool config_parse(Config &config, const std::string configstr,
     const std::string &myname, threadid_t nthreads);
 
+extern EnclaveAPIParams apiparams;
 #endif

+ 86 - 13
App/net.cpp

@@ -170,24 +170,92 @@ void NodeIO::recv_commands(
         });
 }
 
+#ifdef VERBOSE_NET
+void displayMessage(unsigned char *msg, uint16_t msg_size) {
+    clientid_t sid, rid;
+    unsigned char *ptr = msg;
+    sid = *((clientid_t*) ptr);
+    ptr+=sizeof(sid);
+    rid = *((clientid_t*) ptr);
+    uint16_t srpair_size = sizeof(sid)*2;
+    printf("Sender ID: %d, Receiver ID: %d, Token: N/A\n", sid, rid );
+    printf("Message: ");
+    for(int j = 0; j<msg_size - srpair_size; j++) {
+        printf("%x", (*ptr));
+        ptr++;
+    }
+    printf("\n");
+}
+
+void displayMessageBundle(unsigned char *bundle, uint16_t priv_out, uint16_t msg_size) {
+    unsigned char *ptr = bundle;
+    /*
+    // Header is already parsed on this end
+    uint64_t header = *((uint64_t*) ptr);
+    ptr+=sizeof(uint64_t);
+    */
+
+    for(int i=0; i<priv_out; i++) {
+        displayMessage(ptr, msg_size);
+        printf("\n");
+        ptr+=msg_size;
+    }
+
+}
+#endif
+
 /*
   Handler for received client messages.
 
 */
 void NetIO::handle_async_clients(std::shared_ptr<tcp::socket> csocket,
-    const boost::system::error_code& error)
+    const boost::system::error_code& error, size_t auth_size,
+    size_t msgbundle_size)
 {
     if(!error) {
+#ifdef VERBOSE_NET
         printf("Accept handler success\n");
-        // Read 2 bytes from the socket, which will be the
-        // connecting node's node number
-        unsigned short node_num;
+#endif
+        // Read header (1 uint64_t) from the socket and extract the client ID
+        size_t header;
+        clientid_t cid;
         boost::asio::read(*csocket,
-             boost::asio::buffer(&node_num, sizeof(node_num)));
-        printf("node_num received = %d\n", node_num);
+             boost::asio::buffer(&header, sizeof(uint64_t)));
 
-        start_accept();
-        } else {
+        if((header & 0xff) == CLIENT_AUTHENTICATE) {
+            // Read the authentication token
+            boost::asio::read(*csocket,
+               boost::asio::buffer(&header, auth_size));
+
+        } else if ((header & 0xff) == CLIENT_MESSAGE_BUNDLE) {
+            unsigned char *msgbundle = (unsigned char*) malloc(msgbundle_size);
+            cid = (clientid_t)(header >> 8);
+
+            // Read the message_bundle
+            boost::asio::read(*csocket,
+               boost::asio::buffer(msgbundle, msgbundle_size));
+
+#ifdef VERBOSE_NET
+            displayMessageBundle(msgbundle, apiparams.m_priv_out, apiparams.msg_size);
+#endif
+
+            //Ingest the message_bundle
+            bool ret = ecall_ingest_msgbundle(cid, msgbundle, apiparams.m_priv_out);
+            free(msgbundle);
+        }
+
+
+        /*
+            This should read a MESSAGES_DROP_OFF packet of fixed length
+            from a client.
+            Send this packet over to ingestion processing:
+                - Decrypt the packet with the correct key for that client id
+                - Verify private channel token
+                - Buffer the message for route
+        */
+
+        start_accept(auth_size, msgbundle_size);
+    } else {
         printf("Accept handler failed\n");
     }
 }
@@ -195,13 +263,15 @@ void NetIO::handle_async_clients(std::shared_ptr<tcp::socket> csocket,
 /*
   Asynchronously accept client connections
 */
-void NetIO::start_accept()
+void NetIO::start_accept(size_t auth_size, size_t msgbundle_size)
 {
     std::shared_ptr<tcp::socket> csocket(new tcp::socket(io_context_));
+#ifdef VERBOSE_NET
     std::cout << "Accepting on " << myconf.clistenhost << ":" << myconf.clistenport << "\n";
+#endif
     client_acceptor->async_accept(*csocket,
         boost::bind(&NetIO::handle_async_clients, this, csocket,
-        boost::asio::placeholders::error));
+        boost::asio::placeholders::error, auth_size, msgbundle_size));
 }
 
 
@@ -275,13 +345,16 @@ NetIO::NetIO(boost::asio::io_context &io_context, const Config &config)
 #endif
     }
 
-    // Currently only server 0 handles incoming client connections.
-    if(me==0) {
+    if(myconf.roles & ROLE_INGESTION) {
         client_acceptor = std::shared_ptr<tcp::acceptor>(
             new tcp::acceptor(io_context,
                 resolver.resolve(this->myconf.clistenhost,
                 this->myconf.clistenport)->endpoint()));
-        start_accept();
+
+        size_t auth_size, msgbundle_size;
+        auth_size = SGX_AESGCM_MAC_SIZE;
+        msgbundle_size = (apiparams.m_priv_out * apiparams.msg_size) + SGX_AESGCM_MAC_SIZE;
+        start_accept(auth_size, msgbundle_size);
     }
 
 

+ 3 - 39
App/net.hpp

@@ -132,8 +132,9 @@ class NetIO {
     std::shared_ptr<tcp::acceptor> client_acceptor;
 
     void handle_async_clients(std::shared_ptr<tcp::socket> socket,
-        const boost::system::error_code& error);
-    void start_accept();
+        const boost::system::error_code& error, size_t auth_size,
+        size_t msgbundle_size);
+    void start_accept(size_t auth_size, size_t msgbundle_size);
 
 public:
     NetIO(boost::asio::io_context &io_context, const Config &config);
@@ -155,43 +156,6 @@ public:
     void close();
 };
 
-/*
-class async_server{
-public:
-    async_server(boost::asio::io_context& io_context)
-      : io_context_(io_context),
-        acceptor_(io_context, tcp::endpoint(tcp::v4(), 13000))
-    {
-      start_accept();
-    }
-
-private:
-    void start_accept()
-    {
-      std::shared_ptr<tcp::socket> csocket(new tcp::socket(io_context));
-
-      acceptor_.async_accept(csocket,
-          boost::bind(&tcp_server::handle_accept, this, new_connection,
-            boost::asio::placeholders::error));
-    }
-
-    void handle_accept(std::shared_ptr<tcp::socket> csocket,
-        const boost::system::error_code& error)
-    {
-      if (!error)
-      {
-        //new_connection->start();
-        printf("handle_accept: SUCCES\n");
-      }
-
-      start_accept();
-    }
-
-    boost::asio::io_context& io_context_;
-    tcp::acceptor acceptor_;
-};
-*/
-
 extern NetIO *g_netio;
 
 #endif

+ 5 - 3
Client/clientlaunch

@@ -20,9 +20,9 @@ PUBKEYS = "./../App/pubkeys.yaml"
 #The client binary
 CLIENTS = "./clients"
 
-def launch(manifest, config, cmd):
+def launch(config, cmd, threads):
     cmdline = ''
-    cmdline += CLIENTS + ""
+    cmdline += CLIENTS + " -t " + str(threads) + ""
     proc = subprocess.Popen(shlex.split(cmdline) + cmd,
         stdin=subprocess.PIPE, stdout=subprocess.PIPE,
         stderr=subprocess.STDOUT, bufsize=0)
@@ -44,6 +44,8 @@ if __name__ == "__main__":
         help='manifest.yaml file')
     aparse.add_argument('-p', default=PUBKEYS,
         help='pubkeys.yaml file')
+    aparse.add_argument('-t', default=1,
+        help='number of threads')
     aparse.add_argument('-z', default=None,
         help='override message size')
     aparse.add_argument('-u', default=None,
@@ -82,6 +84,6 @@ if __name__ == "__main__":
     config += "\n"
 
     thread = threading.Thread(target=launch,
-        args=(manifest, config, args.cmd))
+        args=(config, args.cmd, args.t))
     thread.start()
     thread.join()

+ 207 - 79
Client/clients.cpp

@@ -63,6 +63,39 @@ static bool hextobuf(unsigned char *buf, const char *str, size_t len)
     return true;
 }
 
+void displayMessage(unsigned char *msg, uint16_t msg_size) {
+    clientid_t sid, rid;
+    unsigned char *ptr = msg;
+    sid = *((clientid_t*) ptr);
+    ptr+=sizeof(sid);
+    rid = *((clientid_t*) ptr);
+    printf("Sender ID: %d, Receiver ID: %d, Token: N/A\n", sid, rid );
+    printf("Message: ");
+    for(int j = 0; j<msg_size - sizeof(sid)*2; j++) {
+        printf("%x", (*ptr));
+        ptr++;
+    }
+    printf("\n");
+}
+
+void displayMessageBundle(unsigned char *bundle, uint16_t priv_out, uint16_t msg_size) {
+    unsigned char *ptr = bundle;
+    uint64_t header = *((uint64_t*) ptr);
+    ptr+=sizeof(uint64_t);
+
+    for(int i=0; i<priv_out; i++) {
+        displayMessage(ptr, msg_size);
+        printf("\n");
+        ptr+=msg_size;
+    }
+
+}
+
+#define HEADER_SIZE 8
+static inline uint32_t messageBundleSize(uint16_t priv_out, uint16_t msg_size) {
+    return(HEADER_SIZE + (priv_out * msg_size) + SGX_AESGCM_MAC_SIZE);
+}
+
 bool config_parse(Config &config, const std::string configstr,
     std::vector<NodeConfig> &ingestion_nodes,
     std::vector<NodeConfig> &storage_nodes)
@@ -160,11 +193,11 @@ static void usage(const char *argv0)
 
 /*
 
-    Generate ESK (Encryption master Secret Key) and TSK (Token master Secret Key)
+    Generate EMK (Encryption master Secret Key) and TMK (Token master Secret Key)
 
 */
 int generateMasterKeys(sgx_aes_gcm_128bit_key_t master_secret,
-   aes_key &ESK, aes_key &TSK )
+   aes_key &EMK, aes_key &TMK )
 {
     unsigned char zeroes[SGX_AESGCM_KEY_SIZE];
     unsigned char iv[SGX_AESGCM_IV_SIZE];
@@ -174,30 +207,30 @@ int generateMasterKeys(sgx_aes_gcm_128bit_key_t master_secret,
     memcpy(iv, "Encryption", sizeof("Encryption"));
 
     if (sizeof(zeroes) != gcm_encrypt(zeroes, SGX_AESGCM_KEY_SIZE, NULL, 0,
-            master_secret, iv, SGX_AESGCM_IV_SIZE, ESK, mac)) {
+            master_secret, iv, SGX_AESGCM_IV_SIZE, EMK, mac)) {
         printf("Client: generateMasterKeys FAIL\n");
         return -1;
     }
 
-    printf("Encryption Master Key: ");
+    printf("\n\nEncryption Master Key: ");
     for(int i=0;i<SGX_AESGCM_KEY_SIZE;i++) {
-        printf("%x", ESK[i]);
+        printf("%x", EMK[i]);
     }
-    printf("\n\n");
+    printf("\n");
 
     memset(iv, 0, SGX_AESGCM_IV_SIZE);
     memcpy(iv, "Token", sizeof("Token"));
     if (sizeof(zeroes) != gcm_encrypt(zeroes, SGX_AESGCM_KEY_SIZE, NULL, 0,
-            master_secret, iv, SGX_AESGCM_IV_SIZE, TSK, mac)) {
+            master_secret, iv, SGX_AESGCM_IV_SIZE, TMK, mac)) {
         printf("generateMasterKeys failed\n");
         return -1;
     }
 
     printf("Token Master Key: ");
     for(int i=0;i<SGX_AESGCM_KEY_SIZE;i++) {
-        printf("%x", TSK[i]);
+        printf("%x", TMK[i]);
     }
-    printf("\n");
+    printf("\n\n");
 
     return 1;
 }
@@ -206,26 +239,139 @@ int generateMasterKeys(sgx_aes_gcm_128bit_key_t master_secret,
     Takes the client_number, the master aes_key for generating client encryption keys,
     and the client aes_key to be generated.
 */
-int generateClientEncryptionKey(clientid_t client_number, aes_key &ESK, aes_key &client_key)
+int generateClientEncryptionKey(clientid_t client_number, aes_key &EMK, aes_key &client_key)
 {
     unsigned char zeroes[SGX_AESGCM_KEY_SIZE];
-    unsigned char mac[SGX_AESGCM_MAC_SIZE];
     unsigned char iv[SGX_AESGCM_IV_SIZE];
-    memset(zeroes, 0, SGX_AESGCM_KEY_SIZE);
+    unsigned char tag[SGX_AESGCM_MAC_SIZE];
     memset(iv, 0, SGX_AESGCM_IV_SIZE);
-    memcpy(iv, (unsigned char*) (&client_number), sizeof(client_number));
+    memset(zeroes, 0, SGX_AESGCM_KEY_SIZE);
+    memset(tag, 0, SGX_AESGCM_KEY_SIZE);
+    memcpy(iv, &client_number, sizeof(client_number));
+
+    printf("Client Key: (before Gen) ");
+    for(int i=0;i<SGX_AESGCM_KEY_SIZE;i++) {
+        printf("%x", client_key[i]);
+    }
+    printf("\n");
 
-    // GCM-encrypt, using the chunk key as the associated data
-    if (sizeof(zeroes) != gcm_encrypt(zeroes, SGX_AESGCM_KEY_SIZE, NULL, 0, ESK,
-            iv, SGX_AESGCM_IV_SIZE, client_key, mac)) {
+
+    if (sizeof(zeroes) != gcm_encrypt(zeroes, SGX_AESGCM_KEY_SIZE, NULL, 0, EMK,
+            iv, SGX_AESGCM_IV_SIZE, client_key, tag)) {
         printf("generateClientEncryptionKey failed\n");
         return -1;
     }
 
+
+    printf("Client Key: (after Gen) ");
+    for(int i=0;i<SGX_AESGCM_KEY_SIZE;i++) {
+        printf("%x", client_key[i]);
+    }
+    printf("\n");
+
     return 1;
 }
 
 
+void Client::initializeSocket(boost::asio::io_context &ioc,
+    NodeConfig &ing_server) {
+
+    boost::system::error_code err;
+    boost::asio::ip::tcp::resolver resolver(ioc);
+
+    while(1) {
+#ifdef VERBOSE_NET
+        std::cerr << "Connecting to " << ing_server.name << "...\n";
+        std::cout << ing_server.clistenhost << ":" << ing_server.clistenport;
+#endif
+        // ingestion_sock needs io_context
+        ingestion_sock = new boost::asio::ip::tcp::socket(ioc);
+        boost::asio::connect(*ingestion_sock,
+            resolver.resolve(ing_server.clistenhost,
+                ing_server.clistenport), err);
+        if (!err) break;
+        std::cerr << "Connection to " << ing_server.name <<
+            " refused, will retry.\n";
+        sleep(1);
+    }
+
+    /*
+    // Test write 7 to the socket
+    nodenum_t node_num = 7;
+    boost::asio::write(*ingestion_sock,
+        boost::asio::buffer(&node_num, sizeof(node_num)));
+    */
+}
+
+/*
+
+    Populates the buffer payload with a valid message payload.
+    Assumes that it is supplied with a payload buffer of the correct length
+
+    Correct length for payload  =  8 + (priv_out)*(msg_size) + 16 bytes
+
+*/
+void Client::generateMessageBundle(uint8_t priv_out, uint32_t msg_size,
+    unsigned char *payload)
+{
+    unsigned char *ptr = payload;
+    uint64_t header = (id << 8) + CLIENT_MESSAGE_BUNDLE;
+
+    // Setup header
+    memcpy(ptr, (uint8_t*) &header, sizeof(header));
+    ptr+=sizeof(header);
+
+    // Setup message payload
+    for(uint32_t i = 0; i < priv_out; i++) {
+        memcpy(ptr, &id, sizeof(id));
+        ptr+=(sizeof(id));
+        memcpy(ptr, &i, sizeof(i));
+        ptr+=(sizeof(i));
+
+        uint32_t remaining_message_size = msg_size - (sizeof(id)*2);
+        memset(ptr, 0, remaining_message_size);
+        ptr+=(remaining_message_size);
+    }
+
+    memset(ptr, 0, SGX_AESGCM_MAC_SIZE);
+}
+
+
+void Client::encryptMessageBundle(uint32_t bundle_size, unsigned char *payload)
+{
+
+}
+
+/*
+
+      Assumes payload is a buffer of size messageBundleSize(priv_out, msg_size)
+*/
+
+void Client::sendMessageBundle(uint16_t priv_out, uint16_t msg_size,
+    unsigned char *payload)
+{
+    uint32_t bundle_size = messageBundleSize(priv_out, msg_size);
+
+    generateMessageBundle(priv_out, msg_size, payload);
+
+    //encryptMessageBundle(bundle_size, payload);
+
+    displayMessageBundle(payload, priv_out, msg_size);
+    //Send over the ingestion_sock
+
+    boost::asio::write(*ingestion_sock,
+        boost::asio::buffer(payload, bundle_size));
+
+}
+
+
+/*
+    Spin config.user_client actual clients. Each client:
+    1) Retrieve messages and tokens from their storage server
+    2) Send all their messages to the ingestion server
+    3) Wait for a predetermined EPOCH_DURATION time
+    4) Repeat from 1)
+*/
 
 int main(int argc, char **argv)
 {
@@ -250,6 +396,7 @@ int main(int argc, char **argv)
         }
     }
 
+    printf("nthreads = %d\n", nthreads);
     // Read the config.json from the first line of stdin.  We have to do
     // this before outputting anything to avoid potential deadlock with
     // the launch program.
@@ -257,87 +404,68 @@ int main(int argc, char **argv)
     std::getline(std::cin, configstr);
 
     Config config;
-    aes_key ESK, TSK;
-    Client *clients = new Client[config.user_count];
+    aes_key EMK, TMK, client_key;
+    boost::asio::io_context io_context;
+    boost::asio::ip::tcp::resolver resolver(io_context);
 
     if (!config_parse(config, configstr, ingestion_nodes, storage_nodes)) {
        exit(1);
     }
 
+    Client *clients = new Client[config.user_count];
     printf("Number of ingestion_nodes = %ld, Number of storage_node = %ld\n",
         ingestion_nodes.size(), storage_nodes.size());
 
-    generateMasterKeys(config.master_secret, ESK, TSK);
+    generateMasterKeys(config.master_secret, EMK, TMK);
 
-    uint32_t clients_per_ing = CEILDIV(config.user_count, ingestion_nodes.size());
-    aes_key client_key;
+    uint32_t num_clients_total = config.user_count;
+    uint16_t num_ing_nodes = ingestion_nodes.size();
+    uint32_t clients_per_ing = CEILDIV(num_clients_total, num_ing_nodes);
+    uint16_t ing_with_additional = num_clients_total % num_ing_nodes;
+    uint16_t priv_out = config.m_priv_out;
+    uint16_t msg_size = config.msg_size;
 
-    for(uint32_t i=0; i<config.user_count; i++) {
-        generateClientEncryptionKey(i, ESK, client_key);
-        clients[i].setKey(client_key);
+    uint32_t bundle_size = messageBundleSize(priv_out, msg_size);
+    unsigned char *payload = (unsigned char*) malloc (bundle_size);
 
-        printf("Client %d key: ", i);
-        for(int i=0;i<SGX_AESGCM_KEY_SIZE;i++) {
-            printf("%x", client_key[i]);
-        }
-        printf("\n");
-    }
 
+    uint64_t epoch = 1;
+    while(epoch<3) {
 
-    /*
-    // Attempt sending a data packet to one of the ingestion servers
-    boost::asio::io_context io_context;
-    boost::system::error_code err;
-    boost::asio::ip::tcp::socket nodesock(io_context);
-    boost::asio::ip::tcp::resolver resolver(io_context);
+        for(uint32_t i=0; i<num_clients_total; i++) {
+            if(epoch==1) {
+                uint16_t ing_node_this_client = i/clients_per_ing;
+                if(ing_node_this_client > ing_with_additional && ing_with_additional!=0) {
+                    uint16_t leftover = num_clients_total - (ing_with_additional * clients_per_ing);
+                    ing_node_this_client = ing_with_additional + (leftover / (clients_per_ing-1));
+                }
 
-    while(1) {
-#ifdef VERBOSE_NET
-        std::cerr << "Connecting to " << ingestion_nodes[0].name << "...\n";
-#endif
-        std::cout << ingestion_nodes[0].clistenhost << ":" << ingestion_nodes[0].clistenport;
-        boost::asio::connect(nodesock,
-            resolver.resolve(ingestion_nodes[0].clistenhost,
-                ingestion_nodes[0].clistenport), err);
-        if (!err) break;
-        std::cerr << "Connection to " << ingestion_nodes[0].name <<
-            " refused, will retry.\n";
-        sleep(1);
-    }
+                int ret = generateClientEncryptionKey(i, EMK, client_key);
+                clients[i].initClient(i, client_key);
 
-    nodenum_t node_num = 7;
-    boost::asio::write(nodesock,
-        boost::asio::buffer(&node_num, sizeof(node_num)));
-    */
+                clients[i].initializeSocket(io_context, ingestion_nodes[ing_node_this_client]);
+                //clients[i].sendAuthMessage();
 
-    /*
-    int randfd = open("/dev/urandom", O_RDONLY);
-    if (randfd < 0) {
-      throw std::runtime_error("Cannot open /dev/urandom");
-    }
 
-    unsigned char zeroes[config.msg_size];
-    unsigned char buffer[config.msg_size+4+12+16];
-    memset(zeroes, 0, sizeof(zeroes));
-    // An AES key for constructing and verifying the _plaintext_ of the blocks
-    // so that we can ensure that the blocks come out unaltered
-    unsigned char datakey[16];
-    read(randfd, datakey, 16);
-
-    // GCM-encrypt, using the chunk key as the associated data
-    if (sizeof(zeroes) != gcm_encrypt(zeroes, sizeof(zeroes), buffer, 4, datakey,
-            buffer+4, 12, buffer+4+12, buffer+4+12+sizeof(zeroes))) {
-        printf("Inner encryption failed\n");
-    }
-    */
+                /*
+                // Test that the keys generated match those generated within
+                // enclave config
+                unsigned char *ckey;
+                ckey = clients[i].getKey();
+                printf("Client %d, id = %d, key: ", i, clients[i].getid());
+                for(int j=0;j<SGX_AESGCM_KEY_SIZE;j++) {
+                    printf("%x", ckey[j]);
+                }
+                printf("\n\n");
+                */
+            }
 
-    /*
-        Spin config.user_client actual clients. Each client:
-        1) Retrieve messages and tokens from their storage server
-        2) Send all their messages to the ingestion server
-        3) Wait for a predetermined EPOCH_DURATION time
-        4) Repeat from 1)
-    */
+            clients[i].sendMessageBundle(priv_out, msg_size, payload);
+        }
+        epoch++;
+        sleep(1);
+    }
 
+    free(payload);
     delete [] clients;
 }

+ 5 - 0
Enclave/Enclave.edl

@@ -43,6 +43,11 @@ enclave {
 
         public void ecall_routing_proceed(
             [user_check]void *cbpointer);
+
+        public bool ecall_ingest_msgbundle(clientid_t cid,
+            [user_check] uint8_t *msgbundle,
+            uint32_t num_msgs);
+
     };
 
     untrusted {

+ 29 - 8
Enclave/config.cpp

@@ -5,6 +5,8 @@
 #include "route.hpp"
 #include "ingest.hpp"
 
+#define CEILDIV(x,y) (((x)+(y)-1)/(y))
+
 Config g_teems_config;
 
 int generateMasterKeys(sgx_aes_gcm_128bit_key_t master_secret,
@@ -136,21 +138,40 @@ bool ecall_config_load(threadid_t nthreads, bool private_routing,
     threadpool_init(nthreads);
     PRB_pool_init(nthreads);
 
-    sgx_aes_gcm_128bit_key_t ESK, TSK;
-    generateMasterKeys(g_teems_config.master_secret, ESK, TSK);
-    if(my_node_num==0) {
-        printf("My_node_num match!\n");
-        g_ing.initialize(g_teems_config.user_count, 0, ESK);
+    if(apinodeconfigs[my_node_num].roles & ROLE_INGESTION) {
+        sgx_aes_gcm_128bit_key_t ESK, TSK;
+        generateMasterKeys(g_teems_config.master_secret, ESK, TSK);
+
+        uint32_t num_clients_total = g_teems_config.user_count;
+        uint32_t num_ing_nodes = g_teems_config.num_ingestion_nodes;
+        uint32_t clients_per_server = CEILDIV(num_clients_total, num_ing_nodes);
+        uint32_t ing_with_additional = num_clients_total % num_ing_nodes;
+        uint32_t num_smaller_ing = (uint32_t) ing_smaller.size();
+        uint32_t num_clients_this_ing = clients_per_server;
+        num_clients_this_ing += (num_smaller_ing < ing_with_additional)? 1: 0;
+        uint32_t client_start = num_smaller_ing * clients_per_server;
+        if (ing_with_additional > 0) {
+            if(ing_with_additional > num_smaller_ing) {
+                client_start+= num_smaller_ing;
+            } else {
+                client_start+= ing_with_additional;
+            }
+        }
+        g_ing.initialize(num_clients_this_ing, client_start, ESK);
 
-        for(uint32_t i=0; i<g_teems_config.user_count; i++) {
-            printf("Client Key %d: ", i);
+        /*
+        // Check that the keys generated in Enclave match the ones generated
+        // in the client application
+        for(uint32_t i=0; i<g_ing.clients.num; i++) {
+            printf("Client Key %d: ", i + g_ing.clients.start);
             sgx_aes_gcm_128bit_key_t key;
-            memcpy(key, (g_ing.clients).client_keys[i], SGX_AESGCM_KEY_SIZE);
+            memcpy(key, (g_ing.clients).keys[i], SGX_AESGCM_KEY_SIZE);
             for(int j = 0; j<SGX_AESGCM_KEY_SIZE; j++) {
                 printf("%x", key[j]);
             }
             printf("\n");
         }
+        */
 
     }
 

+ 57 - 6
Enclave/ingest.cpp

@@ -5,12 +5,65 @@
 #include "route.hpp"
 #include "ingest.hpp"
 
+Ingestion g_ing;
+
+bool ecall_ingest_msgbundle(clientid_t cid, unsigned char *msgbundle,
+    uint32_t num_msgs) {
+
+    bool ret;
+    ret = g_ing.processMsgBundle(cid, msgbundle, num_msgs);
+    return ret;
+}
+
+
+void Ingestion::initialize(uint32_t cnum, uint32_t cstart, sgx_aes_gcm_128bit_key_t &ESK) {
+    clients.num = cnum;
+    clients.start = cstart;
+    clients.end = cnum + cstart;
+    clients.keys = new sgx_aes_gcm_128bit_key_t[num];
+    generateClientKeys(ESK);
+
+    // Initialize the MsgBuffer to correct size
+    max_buffer_size = g_teems_config.m_priv_out * cnum;
+    buffer.alloc(max_buffer_size);
+}
+
+bool Ingestion::processMsgBundle(clientid_t cid, unsigned char *msgbundle,
+    uint32_t num_msgs) {
+    // Fetch corresponding client key
+    sgx_aes_gcm_128bit_key_t &ckey = g_ing.clients.keys[cid];
+    // Decrypt and verify tag for the message bundle
+
+    // Append msgbundle to g_ing.buffer;
+    uint16_t msg_size = g_teems_config.msg_size;
+    MsgBuffer &msg_queue = g_ing.buffer;
+
+    pthread_mutex_lock(&msg_queue.mutex);
+    uint32_t head = msg_queue.reserved;
+    if (head + num_msgs > g_ing.max_buffer_size) {
+        pthread_mutex_unlock(&msg_queue.mutex);
+        printf("Max %u messages exceeded\n",
+            g_ing.max_buffer_size);
+        return false;
+    }
+    msg_queue.reserved += num_msgs;
+    pthread_mutex_unlock(&msg_queue.mutex);
+
+    memmove(msg_queue.buf + head * msg_size,
+        msgbundle, num_msgs * msg_size);
+
+    pthread_mutex_lock(&msg_queue.mutex);
+    msg_queue.inserted += num_msgs;
+    pthread_mutex_unlock(&msg_queue.mutex);
+
+    return true;
+}
 
 void Ingestion::generateClientKeys(sgx_aes_gcm_128bit_key_t &ESK)
 {
     printf("In Ingestion::genCK, num_clients = %d, client_start = %d, client_end = %d\n",
-        num_clients, client_start, client_end);
-    for(uint32_t i=0; i<clients.num_clients; i++)
+        clients.num, clients.start, clients.end);
+    for(uint32_t i=0; i<clients.num; i++)
     {
         unsigned char zeroes[SGX_AESGCM_KEY_SIZE];
         unsigned char iv[SGX_AESGCM_IV_SIZE];
@@ -18,12 +71,12 @@ void Ingestion::generateClientKeys(sgx_aes_gcm_128bit_key_t &ESK)
         memset(zeroes, 0, SGX_AESGCM_KEY_SIZE);
         memset(iv, 0, SGX_AESGCM_IV_SIZE);
 
-        uint32_t client_num = clients.client_start + i;
+        uint32_t client_num = clients.start + i;
         memcpy(iv, (uint8_t*) (&client_num), sizeof(client_num));
 
         sgx_status_t ret = SGX_SUCCESS;
         ret = sgx_rijndael128GCM_encrypt((const sgx_aes_gcm_128bit_key_t *) (ESK),
-            zeroes, SGX_AESGCM_KEY_SIZE, (uint8_t*) (clients.client_keys[i]), iv,
+            zeroes, SGX_AESGCM_KEY_SIZE, (uint8_t*) (clients.keys[i]), iv,
             SGX_AESGCM_IV_SIZE, NULL, 0, &mac);
         if(ret!=SGX_SUCCESS) {
             printf("Ingestion::GCK FAIL\n");
@@ -31,5 +84,3 @@ void Ingestion::generateClientKeys(sgx_aes_gcm_128bit_key_t &ESK)
 
     }
 }
-
-Ingestion g_ing;

+ 15 - 17
Enclave/ingest.hpp

@@ -2,37 +2,35 @@
 #define __INGEST_HPP__
 
 struct ClientList {
-    uint32_t num_clients;
-    uint32_t client_start;
-    uint32_t client_end;
-    sgx_aes_gcm_128bit_key_t *client_keys;
+    uint32_t num;
+    uint32_t start;
+    uint32_t end;
+    sgx_aes_gcm_128bit_key_t *keys;
 };
 
+
+
 class Ingestion : public ClientList {
 private:
-    //int decrypt();
-    //int verify_tokens();
-    //int queue();
 
-public:
     ClientList clients;
+    MsgBuffer buffer;
+    size_t max_buffer_size;
 
-    void generateClientKeys(sgx_aes_gcm_128bit_key_t &ESK);
+public:
 
     Ingestion() {
     }
 
-    void initialize(uint32_t num, uint32_t start, sgx_aes_gcm_128bit_key_t &ESK) {
-        clients.num_clients = num;
-        clients.client_start = start;
-        clients.client_end = num+start;
-        clients.client_keys = new sgx_aes_gcm_128bit_key_t[num];
-        generateClientKeys(ESK);
-    }
+    void generateClientKeys(sgx_aes_gcm_128bit_key_t &ESK);
 
-    //int process();
+    void initialize(uint32_t cnum, uint32_t cstart, sgx_aes_gcm_128bit_key_t &ESK);
+
+    bool processMsgBundle(clientid_t cid, unsigned char *msgbundle,
+        uint32_t num_msgs);
 };
 
+
 extern Ingestion g_ing;
 
 #endif

+ 0 - 40
Enclave/route.cpp

@@ -1,4 +1,3 @@
-#include <pthread.h>
 #include "Enclave_t.h"
 #include "config.hpp"
 #include "utils.hpp"
@@ -7,45 +6,6 @@
 
 #define PROFILE_ROUTING
 
-struct MsgBuffer {
-    pthread_mutex_t mutex;
-    uint8_t *buf;
-    // The number of messages (not bytes) in, or on their way into, the
-    // buffer
-    uint32_t reserved;
-    // The number of messages definitely in the buffer
-    uint32_t inserted;
-
-    MsgBuffer() : buf(NULL), reserved(0), inserted(0) {
-        pthread_mutex_init(&mutex, NULL);
-    }
-
-    ~MsgBuffer() {
-        delete[] buf;
-    }
-
-    // The number passed is messages, not bytes
-    void alloc(uint32_t msgs) {
-        delete[] buf;
-        buf = NULL;
-        reserved = 0;
-        inserted = 0;
-        // This may throw bad_alloc, but we'll catch it higher up
-        buf = new uint8_t[size_t(msgs) * g_teems_config.msg_size];
-    }
-
-    // Reset the contents of the buffer
-    void reset() {
-        memset(buf, 0, inserted * g_teems_config.msg_size);
-        reserved = 0;
-        inserted = 0;
-    }
-
-    // You can't copy a MsgBuffer
-    MsgBuffer(const MsgBuffer&) = delete;
-    MsgBuffer &operator=(const MsgBuffer&) = delete;
-};
-
 enum RouteStep {
     ROUTE_NOT_STARTED,
     ROUTE_ROUND_1,

+ 41 - 0
Enclave/route.hpp

@@ -1,6 +1,47 @@
 #ifndef __ROUTE_HPP__
 #define __ROUTE_HPP__
 
+#include <pthread.h>
+
+struct MsgBuffer {
+    pthread_mutex_t mutex;
+    uint8_t *buf;
+    // The number of messages (not bytes) in, or on their way into, the
+    // buffer
+    uint32_t reserved;
+    // The number of messages definitely in the buffer
+    uint32_t inserted;
+
+    MsgBuffer() : buf(NULL), reserved(0), inserted(0) {
+        pthread_mutex_init(&mutex, NULL);
+    }
+
+    ~MsgBuffer() {
+        delete[] buf;
+    }
+
+    // The number passed is messages, not bytes
+    void alloc(uint32_t msgs) {
+        delete[] buf;
+        buf = NULL;
+        reserved = 0;
+        inserted = 0;
+        // This may throw bad_alloc, but we'll catch it higher up
+        buf = new uint8_t[size_t(msgs) * g_teems_config.msg_size];
+    }
+
+    // Reset the contents of the buffer
+    void reset() {
+        memset(buf, 0, inserted * g_teems_config.msg_size);
+        reserved = 0;
+        inserted = 0;
+    }
+
+    // You can't copy a MsgBuffer
+    MsgBuffer(const MsgBuffer&) = delete;
+    MsgBuffer &operator=(const MsgBuffer&) = delete;
+};
+
 // Call this near the end of ecall_config_load, but before
 // comms_init_nodestate. Returns true on success, false on failure.
 bool route_init();

+ 1 - 1
Makefile

@@ -291,7 +291,7 @@ Clients/clients.o: Clients/clients.cpp
 
 Client/%.o: Client/%.cpp
 	@echo "CXX  <=  $<"
-	@$(CXX) $(App_Cpp_Flags) -c $< -o $@
+	@$(CXX) $(Client_Cpp_Flags) -c $< -o $@
 
 $(Client_Name): $(Client_Cpp_Objects)
 	@echo "LINK =>  $@"

+ 9 - 2
Untrusted/Untrusted.cpp

@@ -157,7 +157,7 @@ void print_error_message(sgx_status_t ret)
             break;
         }
     }
-    
+
     if (idx == ttl)
     	printf("Error code is 0x%X. Please refer to the \"Intel SGX SDK Developer Reference\" for more details.\n", ret);
 }
@@ -168,7 +168,7 @@ void print_error_message(sgx_status_t ret)
 int initialize_enclave(void)
 {
     sgx_status_t ret = SGX_ERROR_UNEXPECTED;
-    
+
     /* Call sgx_create_enclave to initialize an enclave instance */
     /* Debug Support: set 2nd parameter to 1 */
     ret = sgx_create_enclave(ENCLAVE_FILENAME, SGX_DEBUG_FLAG, NULL, NULL, &global_eid, NULL);
@@ -293,3 +293,10 @@ void ocall_routing_round_complete(void *cbpointer, uint32_t round_num)
     delete p;
     f(round_num);
 }
+
+bool ecall_ingest_msgbundle(clientid_t cid, unsigned char *msgbundle,
+    uint32_t num_msgs) {
+    bool ret;
+    ecall_ingest_msgbundle(global_eid, &ret, cid, msgbundle, num_msgs);
+    return ret;
+}

+ 3 - 0
Untrusted/Untrusted.hpp

@@ -40,4 +40,7 @@ bool ecall_ingest_raw(uint8_t *msgs, uint32_t num_msgs);
 
 void ecall_routing_proceed(std::function<void(uint32_t)>);
 
+bool ecall_ingest_msgbundle(clientid_t cid, unsigned char *msgbundle,
+    uint32_t num_msgs);
+
 #endif