#include #include "Enclave_t.h" #include "utils.hpp" #include "config.hpp" #include "route.hpp" #include "ingest.hpp" Ingestion g_ing; bool ecall_ingest_msgbundle(clientid_t cid, unsigned char *msgbundle, uint32_t num_msgs) { bool ret; ret = g_ing.processMsgBundle(cid, msgbundle, num_msgs); return ret; } void Ingestion::initialize(uint32_t cnum, uint32_t cstart, sgx_aes_gcm_128bit_key_t &ESK) { clients.num = cnum; clients.start = cstart; clients.end = cnum + cstart; clients.keys = new sgx_aes_gcm_128bit_key_t[num]; generateClientKeys(ESK); // Initialize the MsgBuffer to correct size max_buffer_size = g_teems_config.m_priv_out * cnum; buffer.alloc(max_buffer_size); } bool Ingestion::processMsgBundle(clientid_t cid, unsigned char *msgbundle, uint32_t num_msgs) { // Fetch corresponding client key sgx_aes_gcm_128bit_key_t &ckey = g_ing.clients.keys[cid]; // Decrypt and verify tag for the message bundle // Append msgbundle to g_ing.buffer; uint16_t msg_size = g_teems_config.msg_size; MsgBuffer &msg_queue = g_ing.buffer; pthread_mutex_lock(&msg_queue.mutex); uint32_t head = msg_queue.reserved; if (head + num_msgs > g_ing.max_buffer_size) { pthread_mutex_unlock(&msg_queue.mutex); printf("Max %u messages exceeded\n", g_ing.max_buffer_size); return false; } msg_queue.reserved += num_msgs; pthread_mutex_unlock(&msg_queue.mutex); memmove(msg_queue.buf + head * msg_size, msgbundle, num_msgs * msg_size); pthread_mutex_lock(&msg_queue.mutex); msg_queue.inserted += num_msgs; pthread_mutex_unlock(&msg_queue.mutex); return true; } void Ingestion::generateClientKeys(sgx_aes_gcm_128bit_key_t &ESK) { printf("In Ingestion::genCK, num_clients = %d, client_start = %d, client_end = %d\n", clients.num, clients.start, clients.end); for(uint32_t i=0; i