|
@@ -7,44 +7,6 @@
|
|
|
|
|
|
Ingestion g_ing;
|
|
|
|
|
|
-void displayMessage(unsigned char *msg, uint16_t msg_size) {
|
|
|
- clientid_t sid, rid;
|
|
|
- unsigned char *ptr = msg;
|
|
|
- sid = *((clientid_t*) ptr);
|
|
|
- ptr+=sizeof(sid);
|
|
|
- rid = *((clientid_t*) ptr);
|
|
|
- ptr+=sizeof(rid);
|
|
|
- printf("Sender ID: %d, Receiver ID: %d, Token: N/A\n", sid, rid );
|
|
|
- printf("Message: ");
|
|
|
- for(int j = 0; j<msg_size - sizeof(sid)*2; j++) {
|
|
|
- printf("%x", (*ptr));
|
|
|
- ptr++;
|
|
|
- }
|
|
|
- printf("\n");
|
|
|
-}
|
|
|
-
|
|
|
-void displayEncMessageBundle(unsigned char *bundle, uint16_t priv_out, uint16_t msg_size) {
|
|
|
- unsigned char *ptr = bundle;
|
|
|
-
|
|
|
- printf("IV: ");
|
|
|
- for(int i=0; i<SGX_AESGCM_IV_SIZE; i++) {
|
|
|
- printf("%x", ptr[i]);
|
|
|
- }
|
|
|
- printf("\n");
|
|
|
- ptr+= SGX_AESGCM_IV_SIZE;
|
|
|
-
|
|
|
- for(int i=0; i<priv_out; i++) {
|
|
|
- displayMessage(ptr, msg_size);
|
|
|
- ptr+=msg_size;
|
|
|
- }
|
|
|
-
|
|
|
- printf("MAC: ");
|
|
|
- for(int i=0; i<SGX_AESGCM_MAC_SIZE; i++) {
|
|
|
- printf("%x", ptr[i]);
|
|
|
- }
|
|
|
- printf("\n");
|
|
|
-}
|
|
|
-
|
|
|
bool ecall_ingest_msgbundle(clientid_t cid, unsigned char *msgbundle,
|
|
|
uint32_t num_msgs)
|
|
|
{
|
|
@@ -60,7 +22,8 @@ bool ecall_ingestion_authenticate(clientid_t cid, unsigned char *auth_message)
|
|
|
return ret;
|
|
|
}
|
|
|
|
|
|
-void Ingestion::initialize(uint32_t num, uint32_t start, sgx_aes_gcm_128bit_key_t &ESK) {
|
|
|
+void Ingestion::initialize(uint32_t num, uint32_t start,
|
|
|
+ sgx_aes_gcm_128bit_key_t &ESK) {
|
|
|
cnum = num;
|
|
|
cstart = start;
|
|
|
clients = new IngClient[cnum];
|
|
@@ -80,7 +43,7 @@ bool Ingestion::authenticate(clientid_t cid, unsigned char *auth_message)
|
|
|
uint32_t num_ing_nodes = g_teems_config.num_ingestion_nodes;
|
|
|
uint32_t lcid = cid / num_ing_nodes;
|
|
|
const sgx_aes_gcm_128bit_key_t *ckey = &(clients[lcid].key);
|
|
|
- return(authenticateClient(auth_message, ckey));
|
|
|
+ return authenticateClient(auth_message, ckey);
|
|
|
}
|
|
|
|
|
|
bool Ingestion::processMsgBundle(clientid_t cid, unsigned char *msgbundle,
|
|
@@ -88,7 +51,10 @@ bool Ingestion::processMsgBundle(clientid_t cid, unsigned char *msgbundle,
|
|
|
|
|
|
// Fetch corresponding client key
|
|
|
uint32_t num_ing_nodes = g_teems_config.num_ingestion_nodes;
|
|
|
+ uint32_t num_stg_nodes = g_teems_config.num_storage_nodes;
|
|
|
clientid_t lcid = cid / num_ing_nodes;
|
|
|
+ clientid_t global_client_id =
|
|
|
+ ((cid % num_stg_nodes) << DEST_UID_BITS) + (cid / num_stg_nodes);
|
|
|
sgx_aes_gcm_128bit_key_t *ckey = &(clients[lcid].key);
|
|
|
unsigned char *iv = msgbundle;
|
|
|
msgbundle += SGX_AESGCM_IV_SIZE;
|
|
@@ -101,10 +67,13 @@ bool Ingestion::processMsgBundle(clientid_t cid, unsigned char *msgbundle,
|
|
|
msgbundle_size = num_msgs * msg_size;
|
|
|
}
|
|
|
unsigned char *dec_msgbundle = (unsigned char *) malloc (msgbundle_size);
|
|
|
- sgx_aes_gcm_128bit_tag_t *tag = (sgx_aes_gcm_128bit_tag_t*) (msgbundle + msgbundle_size);
|
|
|
+ sgx_aes_gcm_128bit_tag_t *tag =
|
|
|
+ (sgx_aes_gcm_128bit_tag_t*) (msgbundle + msgbundle_size);
|
|
|
+
|
|
|
+ sgx_status_t ret = sgx_rijndael128GCM_decrypt(ckey, msgbundle,
|
|
|
+ msgbundle_size, dec_msgbundle, iv, SGX_AESGCM_IV_SIZE,
|
|
|
+ NULL, 0, tag);
|
|
|
|
|
|
- sgx_status_t ret = sgx_rijndael128GCM_decrypt(ckey, msgbundle, msgbundle_size,
|
|
|
- dec_msgbundle, iv, SGX_AESGCM_IV_SIZE, NULL, 0, tag);
|
|
|
if(ret!=SGX_SUCCESS) {
|
|
|
printf("Ingestion::processMsgBundle FAIL\n");
|
|
|
printf("Error code: %d", (uint32_t) ret);
|
|
@@ -112,23 +81,36 @@ bool Ingestion::processMsgBundle(clientid_t cid, unsigned char *msgbundle,
|
|
|
return false;
|
|
|
}
|
|
|
|
|
|
- // TODO: Verify the tokens from end of the msgbundle
|
|
|
- // before appending them into the MsgBuffer
|
|
|
- bool verified = true;
|
|
|
- if(g_teems_config.private_routing) {
|
|
|
+ // Force the sender ids to be set properly.
|
|
|
+
|
|
|
+ // In reality, we would also set the priorities here, but for the
|
|
|
+ // benchmark, we'll let the clients choose them arbitrarily.
|
|
|
+
|
|
|
+ unsigned char *sid_ptr = dec_msgbundle + sizeof(clientid_t);
|
|
|
+ if (!g_teems_config.private_routing) {
|
|
|
+ // Leave room for the priority
|
|
|
+ sid_ptr += sizeof(uint32_t);
|
|
|
+ }
|
|
|
+ for (uint32_t k =0; k < num_msgs; k++) {
|
|
|
+ *(clientid_t*)sid_ptr = global_client_id;
|
|
|
+ sid_ptr += msg_size;
|
|
|
+ }
|
|
|
+
|
|
|
+ // Verify the tokens from end of the msgbundle
|
|
|
+
|
|
|
+ if (g_teems_config.private_routing) {
|
|
|
unsigned char token_body[TOKEN_SIZE];
|
|
|
const sgx_aes_gcm_128bit_key_t *pTSK = &(g_teems_config.TSK);
|
|
|
unsigned char *dm_ptr = dec_msgbundle;
|
|
|
unsigned char *tkn_ptr = dm_ptr + (msg_size * num_msgs);
|
|
|
- sgx_cmac_state_handle_t cmac_handle;
|
|
|
sgx_cmac_128bit_tag_t token;
|
|
|
|
|
|
- for(int k =0; k < num_msgs; k++)
|
|
|
- {
|
|
|
+ for (uint32_t k = 0; k < num_msgs; k++) {
|
|
|
ret = SGX_SUCCESS;
|
|
|
memset(token_body, 0, TOKEN_SIZE);
|
|
|
memcpy(token_body, dm_ptr, sizeof(clientid_t) * 2);
|
|
|
- memcpy(token_body + sizeof(clientid_t) * 2, (uint8_t*) &ingestion_epoch, sizeof(ingestion_epoch));
|
|
|
+ memcpy(token_body + sizeof(clientid_t) * 2,
|
|
|
+ (uint8_t*) &ingestion_epoch, sizeof(ingestion_epoch));
|
|
|
ret = sgx_rijndael128_cmac_msg(pTSK, token_body, TOKEN_SIZE,
|
|
|
(sgx_cmac_128bit_tag_t*) &token);
|
|
|
if(ret!=SGX_SUCCESS) {
|
|
@@ -159,33 +141,30 @@ bool Ingestion::processMsgBundle(clientid_t cid, unsigned char *msgbundle,
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- if(verified) {
|
|
|
- // Append msgbundle to g_ing.buffer;
|
|
|
- MsgBuffer &msg_queue = *(g_ing.buffer);
|
|
|
-
|
|
|
- pthread_mutex_lock(&msg_queue.mutex);
|
|
|
- uint32_t head = msg_queue.reserved;
|
|
|
- if (head + num_msgs > g_ing.max_buffer_size) {
|
|
|
- pthread_mutex_unlock(&msg_queue.mutex);
|
|
|
- printf("Ingestions: Max %u messages exceeded\n",
|
|
|
- g_ing.max_buffer_size);
|
|
|
- return false;
|
|
|
- }
|
|
|
- msg_queue.reserved += num_msgs;
|
|
|
- pthread_mutex_unlock(&msg_queue.mutex);
|
|
|
+ // Append msgbundle to g_ing.buffer
|
|
|
|
|
|
- memmove(msg_queue.buf + head * msg_size,
|
|
|
- dec_msgbundle, num_msgs * msg_size);
|
|
|
+ MsgBuffer &msg_queue = *(g_ing.buffer);
|
|
|
|
|
|
- pthread_mutex_lock(&msg_queue.mutex);
|
|
|
- msg_queue.inserted += num_msgs;
|
|
|
+ pthread_mutex_lock(&msg_queue.mutex);
|
|
|
+ uint32_t head = msg_queue.reserved;
|
|
|
+ if (head + num_msgs > g_ing.max_buffer_size) {
|
|
|
pthread_mutex_unlock(&msg_queue.mutex);
|
|
|
-
|
|
|
- free(dec_msgbundle);
|
|
|
- return true;
|
|
|
+ printf("Ingestions: Max %u messages exceeded\n",
|
|
|
+ g_ing.max_buffer_size);
|
|
|
+ return false;
|
|
|
}
|
|
|
+ msg_queue.reserved += num_msgs;
|
|
|
+ pthread_mutex_unlock(&msg_queue.mutex);
|
|
|
+
|
|
|
+ memmove(msg_queue.buf + head * msg_size,
|
|
|
+ dec_msgbundle, num_msgs * msg_size);
|
|
|
+
|
|
|
+ pthread_mutex_lock(&msg_queue.mutex);
|
|
|
+ msg_queue.inserted += num_msgs;
|
|
|
+ pthread_mutex_unlock(&msg_queue.mutex);
|
|
|
|
|
|
- return false;
|
|
|
+ free(dec_msgbundle);
|
|
|
+ return true;
|
|
|
}
|
|
|
|
|
|
void Ingestion::generateClientKeys(sgx_aes_gcm_128bit_key_t &ESK)
|
|
@@ -210,7 +189,7 @@ void Ingestion::generateClientKeys(sgx_aes_gcm_128bit_key_t &ESK)
|
|
|
zeroes, SGX_AESGCM_KEY_SIZE, (uint8_t*) (clients[i].key), iv,
|
|
|
SGX_AESGCM_IV_SIZE, NULL, 0, &tag);
|
|
|
if(ret!=SGX_SUCCESS) {
|
|
|
- printf("Ingestion::GCK FAIL\n");
|
|
|
+ printf("Ingestion::GCM FAIL\n");
|
|
|
}
|
|
|
|
|
|
/*
|