Browse Source

added a semaphore to lock access to each client's downstream queue, and deleted queue blocks for closed connections

cecylia 7 years ago
parent
commit
a79cd0b520
3 changed files with 50 additions and 16 deletions
  1. 1 0
      relay_station/flow.c
  2. 47 16
      relay_station/relay.c
  3. 2 0
      relay_station/relay.h

+ 1 - 0
relay_station/flow.c

@@ -378,6 +378,7 @@ int update_flow(flow *f, uint8_t *record, uint8_t incoming) {
 
 					if((f->in_encrypted == 2) && (f->out_encrypted == 2)){
 						printf("Handshake complete!\n");
+						fflush(stdout);
 						f->application = 1;
 					}
 

+ 47 - 16
relay_station/relay.c

@@ -354,6 +354,7 @@ int read_header(flow *f, struct packet_info *info){
 
 				new_client->streams->first = NULL;
 				new_client->downstream_queue = emalloc(sizeof(data_queue));
+				sem_init(&(new_client->queue_lock), 0, 1);
 
 				new_client->downstream_queue->first_block = NULL;
 				new_client->encryption_counter = 0;
@@ -413,21 +414,9 @@ int read_header(flow *f, struct packet_info *info){
 			}
 
 			if(stream_pipe != -1){
-				//check to see if this is a close message
-				//if(stream_len == 0){
-					//close(stream_pipe);
-					//remove from stream id table
-					//if(last == streams->first){
-					//	streams->first = last->next;
-					//} else {
-					//	prev->next = last->next;
-					//}
-					//printf("Freed (1) %p\n", last);
-					//fflush(stdout);
-					//free(last);
-					//break;
-				//}
 				if(stream_len ==0){
+
+					printf("Client closed. We are here\n");
 					close(stream_pipe);
 					break;
 				}
@@ -462,6 +451,7 @@ int read_header(flow *f, struct packet_info *info){
 				thread_data->pipefd = pipefd[0];
 				thread_data->streams = f->streams;
 				thread_data->downstream_queue = f->downstream_queue;
+				thread_data->client = f->client_ptr;
 				
 				pthread_create(&proxy_thread, NULL, proxy_covert_site, (void *) thread_data);
 
@@ -513,8 +503,9 @@ int read_header(flow *f, struct packet_info *info){
  *  Input:
  *  	A struct that contains the following information:
  *  	- the tagged flow
- *  	- the initial upstream data (including connect request)
+ *  	- the initial upstream data + len (including connect request)
  *  	- the read end of the pipe
+ *  	- the downstream queue for the client
  *
  */
 void *proxy_covert_site(void *data){
@@ -530,6 +521,7 @@ void *proxy_covert_site(void *data){
 
 	stream_table *streams = thread_data->streams;
 	data_queue *downstream_queue = thread_data->downstream_queue;
+	client *clnt = thread_data->client;
 
 	struct socks_req *clnt_req = (struct socks_req *) p;
 	p += 4;
@@ -658,7 +650,40 @@ void *proxy_covert_site(void *data){
 					break;
 				}
 			} else {
-				printf("PROXY (id %d): read %d bytes from pipe\n", stream_id, bytes_read);
+				//Client closed the connection, we can delete this stream from the downstream queue
+
+				printf("Deleting stream %d from the downstream queue\n", stream_id);
+
+				sem_wait(&clnt->queue_lock);
+
+				queue_block *last = downstream_queue->first_block;
+				queue_block *prev = last;
+				while(last != NULL){
+					if(last->stream_id == stream_id){
+						//remove block from queue
+						printf("removing a block!\n");
+						fflush(stdout);
+						if(last == downstream_queue->first_block){
+							downstream_queue->first_block = last->next;
+							free(last->data);
+							free(last);
+							last = downstream_queue->first_block;
+							prev = last;
+						} else {
+							prev->next = last->next;
+							free(last->data);
+							free(last);
+							last = prev->next;
+						}
+					} else {
+						prev = last;
+						last = last->next;
+					}
+				}
+
+				sem_post(&clnt->queue_lock);
+				printf("Finished deleting from downstream queue\n");
+				fflush(stdout);
 				break;
 			}
 
@@ -688,6 +713,7 @@ void *proxy_covert_site(void *data){
 				new_block->data = new_data;
 				new_block->next = NULL;
 				new_block->stream_id = stream_id;
+				sem_wait(&clnt->queue_lock);
 				if(downstream_queue->first_block == NULL){
 					downstream_queue->first_block = new_block;
 				}
@@ -697,6 +723,7 @@ void *proxy_covert_site(void *data){
 						last = last->next;
 					last->next = new_block;
 				}
+				sem_post(&clnt->queue_lock);
 			} else {
 				printf("PROXY (id %d): read %d bytes from censored site\n",stream_id, bytes_read);
 				
@@ -1220,6 +1247,8 @@ int fill_with_downstream(flow *f, uint8_t *data, int32_t length){
 		int32_t fill_amount = remaining - SLITHEEN_HEADER_LEN - 32;
 		fill_amount -= fill_amount % 16; //rounded down to nearest block size
 
+		sem_wait(&client_ptr->queue_lock);
+
 		queue_block *first_block = downstream_queue->first_block;
 		int32_t block_length = first_block->len;
 		int32_t offset = first_block->offset;
@@ -1266,6 +1295,8 @@ int fill_with_downstream(flow *f, uint8_t *data, int32_t length){
 			remaining -= (block_length - offset);
 		}
 
+		sem_post(&client_ptr->queue_lock);
+
 		//pad to 16 bytes if necessary
 		uint8_t padding = 0;
 		if(sl_hdr->len %16){

+ 2 - 0
relay_station/relay.h

@@ -11,12 +11,14 @@ struct proxy_thread_data {
 	int32_t pipefd;
 	stream_table *streams;
 	data_queue *downstream_queue;
+	client *client;
 };
 
 typedef struct client_st {
 	uint8_t slitheen_id[SLITHEEN_ID_LEN];
 	stream_table *streams;
 	data_queue *downstream_queue;
+	sem_t queue_lock;
 	uint16_t encryption_counter;
 	struct client_st *next;
 	uint8_t *header_key;