Browse Source

rewrite add_packet and update_flow code to better handle misordered or missing packets

cecylia 8 years ago
parent
commit
20d948402d
4 changed files with 144 additions and 149 deletions
  1. 129 129
      server/flow.c
  2. 12 7
      server/flow.h
  3. 1 0
      server/relay.c
  4. 2 13
      server/slitheen-proxy.c

+ 129 - 129
server/flow.c

@@ -61,8 +61,19 @@ flow *add_flow(struct packet_info *info) {
 	new_flow->application = 0;
 	new_flow->resume_session = 0;
 	new_flow->current_session = NULL;
-	new_flow->packet_chain = NULL;
+
+	new_flow->us_packet_chain = calloc(1, sizeof(packet_chain));
+	new_flow->us_packet_chain->expected_seq_num = ntohl(info->tcp_hdr->sequence_num);
+	new_flow->us_packet_chain->record_len = 0;
+	new_flow->us_packet_chain->remaining_record_len = 0;
+	new_flow->us_packet_chain->first_packet = NULL;
+	new_flow->ds_packet_chain = calloc(1, sizeof(packet_chain));
+	new_flow->ds_packet_chain->expected_seq_num = ntohl(info->tcp_hdr->ack_num);
+	new_flow->ds_packet_chain->record_len = 0;
+	new_flow->ds_packet_chain->remaining_record_len = 0;
+	new_flow->ds_packet_chain->first_packet = NULL;
 	sem_init(&(new_flow->packet_chain_lock), 0, 1);
+	
 	new_flow->upstream_queue = NULL;
 	new_flow->upstream_remaining = 0;
 	sem_init(&(new_flow->upstream_queue_lock), 0, 1);
@@ -113,59 +124,20 @@ flow *add_flow(struct packet_info *info) {
  *
  *  Inputs:
  *  	f: the tagged flow
+ *  	record: a complete TLS record
  *
  *  Output:
  *  	0 on success, 1 on failure
  */
-int update_flow(flow *f) {
-	uint8_t *record;
+int update_flow(flow *f, uint8_t *record, uint8_t incoming) {
 	const struct record_header *record_hdr;
 	const struct handshake_header *handshake_hdr;
+	uint8_t *p;
 
-	sem_wait(&(f->packet_chain_lock));
-
-	if(f->packet_chain == NULL){
-		sem_post(&(f->packet_chain_lock));
-		return 0;
-	}
-	uint8_t *p = f->packet_chain->data;
-	record_hdr = (struct record_header*) p;
+	record_hdr = (struct record_header*) record;
 	int record_len;
-	int data_len;
 
 	record_len = RECORD_LEN(record_hdr)+RECORD_HEADER_LEN;
-	data_len = f->packet_chain->data_len;
-
-	packet *current = f->packet_chain;
-	int incoming = current->incoming;
-	record = calloc(1, record_len);
-	
-	for(int i=0; (i<data_len) && (i<record_len); i++){
-		record[i] = p[i];
-	}
-	printf("Processing seq num %u\n", current->seq_num);
-
-	while(record_len > data_len) {
-		if(current->next == NULL){
-			printf("Not enough data\n");
-			goto err;
-		}
-		if(current->next->seq_num != current->seq_num + current->len){
-			printf("Missing packet: seq_num= %u, datalen= %d, nextseq= %u\n", current->seq_num, current->len, current->next->seq_num);
-			goto err;
-		}
-
-		current = current->next;
-		printf("Processing seq num %u\n", current->seq_num);
-		
-		p = current->data;
-		int i;
-		for(i=0; (i<current->data_len) && (i+data_len < record_len); i++){
-
-			record[data_len+i] = p[i];
-		}
-		data_len += current->data_len;
-	}
 
 	switch(record_hdr->type){
 		case HS:
@@ -255,18 +227,6 @@ int update_flow(flow *f) {
 					if((f->in_encrypted == 2) && (f->out_encrypted == 2)){
 						printf("Handshake complete!\n");
 						f->application = 1;
-						if(current->incoming){
-							f->downstream_seq_num = current->seq_num + current->len;
-						} else {
-							f->upstream_seq_num = current->seq_num + current->len;
-						}
-						while(current->next != NULL){
-							current = current->next;
-							if(current->incoming)
-								f->downstream_seq_num = current->seq_num+ current->len;
-							else
-								f->upstream_seq_num = current->seq_num+ current->len;
-						}
 					}
 					break;
 				default:
@@ -307,64 +267,10 @@ int update_flow(flow *f) {
 		default:
 			printf("Error: Not a Record (%x:%d -> %x:%d)\n", f->src_ip.s_addr, ntohs(f->src_port), f->dst_ip.s_addr, ntohs(f->dst_port));
 			fflush(stdout);
-			//TODO: later figure this out, for now delete
-			packet *tmp = f->packet_chain;
-			f->packet_chain = f->packet_chain->next;
-			printf("Freed data %p\n", tmp->data);
-			printf("Freed packet %p\n", tmp);
-			free(tmp->data);
-			free(tmp);
-			
-			if( f->packet_chain != NULL){
-				sem_post(&(f->packet_chain_lock));
-				free(record);
-				update_flow(f);
-				return 0;
-			}
 			goto err;
 	}
 
-	//if(!f->application){
-		if(current->incoming){
-			f->downstream_seq_num = current->seq_num;
-		} else {
-			f->upstream_seq_num = current->seq_num;
-		}
-
-		if(record_len == data_len){
-			/* record ended on packet boundary */
-			current = current->next;
-			packet *tmp = f->packet_chain;
-			while(tmp != current){
-				f->packet_chain = tmp->next;
-				printf("Freed data %p\n", tmp->data);
-				printf("Freed packet %p\n", tmp);
-				free(tmp->data);
-				free(tmp);
-				tmp = f->packet_chain;
-			}
-		} else {
-			/* need to update data */
-			packet *tmp = f->packet_chain;
-			while(tmp != current){
-				f->packet_chain = tmp->next;
-				printf("Freed data %p\n", tmp->data);
-				printf("Freed packet %p\n", tmp);
-				free(tmp->data);
-				free(tmp);
-				tmp = f->packet_chain;
-			}
-			memmove(current->data, current->data + (current->data_len - (data_len - record_len)), data_len - record_len);
-			current->data_len = data_len - record_len;
-			sem_post(&(f->packet_chain_lock));
-			free(record);
-			update_flow(f);
-			return 0;
-		}
-	//}
-
 err:
-	sem_post(&(f->packet_chain_lock));
 	free(record);
 	return 0;
 }
@@ -411,15 +317,27 @@ int remove_flow(flow *f) {
 		free(f->current_session);
 	}
 
-	if(f->packet_chain != NULL){
-		packet *tmp = f->packet_chain;
+	if(f->ds_packet_chain != NULL){
+		packet *tmp = f->ds_packet_chain->first_packet;
 		while(tmp != NULL){
-			f->packet_chain = tmp->next;
+			f->ds_packet_chain->first_packet = tmp->next;
 			printf("Freed data %p\n", tmp->data);
 			printf("Freed packet %p\n", tmp);
 			free(tmp->data);
 			free(tmp);
-			tmp = f->packet_chain;
+			tmp = f->ds_packet_chain->first_packet;
+		}
+	}
+
+	if(f->us_packet_chain != NULL){
+		packet *tmp = f->us_packet_chain->first_packet;
+		while(tmp != NULL){
+			f->us_packet_chain->first_packet = tmp->next;
+			printf("Freed data %p\n", tmp->data);
+			printf("Freed packet %p\n", tmp);
+			free(tmp->data);
+			free(tmp);
+			tmp = f->us_packet_chain->first_packet;
 		}
 	}
 		
@@ -839,9 +757,10 @@ int save_session_ticket(flow *f, uint8_t *hs, uint32_t len){
 	return 0;
 }
 
-/* Adds a packet the flow's packet chain */
+/* Adds a packet the flow's packet chain. If it can complete a record, gives
+ * this record to update_flow */
 int add_packet(flow *f, struct packet_info *info){
-	if (info->tcp_hdr == NULL){
+	if (info->tcp_hdr == NULL || info->app_data_len <= 0){
 		return 0;
 	}
 
@@ -855,15 +774,22 @@ int add_packet(flow *f, struct packet_info *info){
 	memcpy(packet_data, info->app_data, new_packet->len);
 
 	new_packet->data = packet_data;
-	new_packet->data_len = new_packet->len;
 	new_packet->next = NULL;
-	new_packet->incoming = 
-		(info->ip_hdr->src.s_addr == f->src_ip.s_addr) ? 0 : 1;
+	uint8_t incoming = (info->ip_hdr->src.s_addr == f->src_ip.s_addr) ? 0 : 1;
+	packet_chain *chain = 
+		(info->ip_hdr->src.s_addr == f->src_ip.s_addr) ? f->us_packet_chain : f->ds_packet_chain;
+
+	if(new_packet->seq_num < chain->expected_seq_num){
+		//see if this packet contains any data we are missing
+		printf("Received repeat packet (expected %d, received %d\n", chain->expected_seq_num, new_packet->seq_num);
+		free(new_packet->data);
+		free(new_packet);
 
-	/* Find appropriate place in chain */
-	if(new_packet->data_len > 0){
+	} else {//new_packet->seq_num >= chain->expected_seq_num
+	
+		//Find appropriate place in chain
 		packet *previous = NULL;
-		packet *next = f->packet_chain;
+		packet *next = chain->first_packet;
 		while(next != NULL && (next->seq_num <= new_packet->seq_num)){
 			previous = next;
 			next = next->next;
@@ -871,21 +797,95 @@ int add_packet(flow *f, struct packet_info *info){
 
 		//place packet after current
 		if(previous == NULL){
+			printf("Packet goes to beginning of chain\n");
 			//goes at the beginning of chain
-			new_packet->next = f->packet_chain;
-			f->packet_chain = new_packet;
+			new_packet->next = chain->first_packet;
+			chain->first_packet = new_packet;
+
+			//if this is a new record, find lengths
+			if(new_packet->seq_num == chain->expected_seq_num){
+				const struct record_header *record_hdr = (struct record_header *) new_packet->data;
+				chain->record_len = RECORD_LEN(record_hdr)+RECORD_HEADER_LEN;
+				chain->remaining_record_len = chain->record_len;
+				fprintf(stdout, "New Record:\n");
+				for(int i=0; i< RECORD_HEADER_LEN; i++){
+					printf("%02x ", ((uint8_t *) record_hdr)[i]);
+				}
+				printf("\n");
+			}
+			
 		} else {
+			printf("Appended packet to chain\n");
 			new_packet->next = next;
 			previous->next = new_packet;
 		}
 
-	} else {
-		free(new_packet);
-		if(packet_data != NULL){
-			free(packet_data);
+		if(new_packet->seq_num == chain->expected_seq_num){
+			printf("Received expected packet (expected %u)\n", chain->expected_seq_num);
+			chain->expected_seq_num += new_packet->len;
+
+			//while there is still data left:
+			uint32_t available_data = new_packet->len;
+			while(available_data > 0){
+
+				//if full record, give to update_flow
+				if(chain->remaining_record_len <= new_packet->len){
+					printf("Completed remaining record!\n");
+					chain->remaining_record_len = 0;
+					uint8_t *record = calloc(1, chain->record_len);
+					uint32_t tmp_len = chain->record_len;
+					packet *next = chain->first_packet;
+					while(tmp_len > 0){
+						if(tmp_len >= next->len){
+							memcpy(record+chain->record_len - tmp_len, next->data, next->len);
+							tmp_len -= next->len;
+							chain->first_packet = next->next;
+							if(next == new_packet) new_packet = NULL;
+							free(next->data);
+							free(next);
+							next = chain->first_packet;
+							printf("used up packet\n");
+							available_data = 0;
+						} else {
+							memcpy(record+chain->record_len - tmp_len, next->data, tmp_len);
+							memmove(next->data, next->data+tmp_len, next->len - tmp_len);
+							next->len -= tmp_len;
+							available_data -= tmp_len;
+							tmp_len = 0;
+							printf("There are %d bytes of packet remaining\n", next->len);
+							//this is going to be a new record
+							const struct record_header *record_hdr = (struct record_header *) next->data;
+							chain->record_len = RECORD_LEN(record_hdr)+RECORD_HEADER_LEN;
+							chain->remaining_record_len = chain->record_len;
+							fprintf(stdout, "New Record:\n");
+							for(int i=0; i< RECORD_HEADER_LEN; i++){
+								printf("%02x ", ((uint8_t *) record_hdr)[i]);
+							}
+							printf("\n");
+
+						}
+					}
+					update_flow(f, record, incoming);
+				} else {
+					chain->remaining_record_len -= new_packet->len;
+					printf("Still have %d bytes remaining in record\n", chain->remaining_record_len);
+					//see if this packet filled a hole
+					new_packet = new_packet->next;
+					if(new_packet != NULL &&
+							new_packet->seq_num == chain->expected_seq_num){
+						available_data = new_packet->len;
+						chain->expected_seq_num += new_packet->len;
+					} else {
+						available_data = 0;
+					}
+				}
+			}
+		
+		} else {//
+			//add to end of packet_chain
+			printf("Missing packet (expected %d, received %d)\n", chain->expected_seq_num, new_packet->seq_num);
 		}
 	}
-	
 	return 0;
 
 }

+ 12 - 7
server/flow.h

@@ -33,14 +33,20 @@ typedef struct stream_table_st {
 	stream *first;
 } stream_table;
 
-struct packet_st{
+typedef struct packet_st{
 	uint32_t seq_num;
 	uint16_t len;
-	uint16_t data_len;
 	uint8_t *data;
 	struct packet_st *next;
 	int incoming; //0 for outgoing, 1 for incoming
-};
+} packet;
+
+typedef struct packet_chain_st {
+	packet *first_packet;
+	uint32_t expected_seq_num;
+	uint32_t record_len;
+	uint32_t remaining_record_len;
+} packet_chain;
 
 typedef struct queue_block_st{
 	int32_t len;
@@ -54,8 +60,6 @@ typedef struct data_queue_st {
 	queue_block *first_block;
 } data_queue;
 
-typedef struct packet_st packet;
-
 typedef struct session_st {
 	uint8_t session_id_len;
 	uint8_t session_id[SSL_MAX_SSL_SESSION_ID_LENGTH];
@@ -90,7 +94,8 @@ typedef struct flow_st {
 	stream_table *streams;
 	data_queue *downstream_queue;
 
-	packet *packet_chain; /* currently held data */
+	packet_chain *ds_packet_chain;
+	packet_chain *us_packet_chain;
 	sem_t packet_chain_lock;
 
 	queue_block *upstream_queue;
@@ -151,7 +156,7 @@ typedef struct flow_table_st {
 
 int init_tables(void);
 flow *add_flow(struct packet_info *info);
-int update_flow(flow *f);
+int update_flow(flow *f, uint8_t *record, uint8_t incoming);
 int remove_flow(flow *f);
 flow *check_flow(struct packet_info *info);
 flow *get_flow(int index);

+ 1 - 0
server/relay.c

@@ -242,6 +242,7 @@ int read_header(flow *f, struct packet_info *info){
 	
 	if(*header_ptr == '\r' || *header_ptr == '\0'){
 		printf("No messages\n");
+		free(decrypted_data);
 		return 0;
 	}
 	

+ 2 - 13
server/slitheen-proxy.c

@@ -134,7 +134,6 @@ end:
 	if((pcap_inject(handle, tmp_packet, header->len)) < 0 ){
 		fprintf(stderr, "Error: %s\n", pcap_geterr(handle));
 	}
-	free(info);//Note: don't free this while a thread is using it
 #ifdef DEBUG
 	fprintf(stderr, "injected the following packet:\n");
 	for(int i=0; i< header->len; i++){
@@ -150,6 +149,7 @@ end:
 	fprintf(stdout,"Acknowledgement number: %u\n", htonl(info->tcp_hdr->ack_num));
 	fflush(stdout);
 	}
+	free(info);//Note: don't free this while a thread is using it
 	free(tmp_packet);
 
 }
@@ -174,21 +174,10 @@ void process_packet(struct packet_info *info){
 		if(observed->application){
 			replace_packet(observed, info);
 		} else {
-			int32_t incoming = 
-			(info->ip_hdr->src.s_addr == observed->src_ip.s_addr) ? 0 : 1;
 
 			/* Pass data to packet chain */
-			if(incoming && (observed->downstream_seq_num <= htonl(info->tcp_hdr->sequence_num))){
-				add_packet(observed, info);
-			} else if ((!incoming) && (observed->upstream_seq_num <= htonl(info->tcp_hdr->sequence_num))){
-				add_packet(observed, info);
-			}
-
-			/* Update flow state */
-			if(observed->packet_chain != NULL){
-					update_flow(observed);
+			add_packet(observed, info);
 
-			}
 		}
 
 		/* Update TCP state */