accelerated_functions.c 8.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285
  #include <Python.h>
  #include <limits.h>
  #include <stdio.h>
  #include <stdlib.h>
  #include <string.h>
  #include <sys/poll.h>
  #include <sys/socket.h>
  #include <sys/time.h>
  6. //
  7. #define USE_NEW_HISTORY
  8. //
  9. static PyObject *py_push_data(PyObject *self, PyObject *args);
  10. static char push_data_docstring[] =
  11. "Send data as quickly as possible into a socket.";
  12. //
  13. static PyObject *py_pull_data(PyObject *self, PyObject *args);
  14. static char pull_data_docstring[] =
  15. "Receive data as quickly as possible from a socket.";
  16. //
  17. static char module_docstring[] =
  18. "This module provides accelerated functions which would perform slower in pure Python.";
  19. //
  20. static PyMethodDef module_methods[] = {
  21. {"push_data", py_push_data, METH_VARARGS, push_data_docstring},
  22. {"pull_data", py_pull_data, METH_VARARGS, pull_data_docstring},
  23. {NULL, NULL, 0, NULL}
  24. };
  25. //
  26. static struct PyModuleDef _coremodule = {
  27. PyModuleDef_HEAD_INIT,
  28. "accelerated_functions", // name of module
  29. module_docstring, // module documentation, may be NULL
  30. -1, /* size of per-interpreter state of the module,
  31. or -1 if the module keeps state in global variables. */
  32. module_methods,
  33. };
  34. //
  35. PyMODINIT_FUNC PyInit_accelerated_functions(void){
  36. return PyModule_Create(&_coremodule);
  37. }
  38. //
  // Return the smaller of the two arguments (num1 when they are equal).
  long min(long num1, long num2)
  {
      if(num2 < num1){
          return num2;
      }
      return num1;
  }
  42. //
  // Send bytes_total bytes into `socket` as fast as the socket accepts them.
  //
  // Every send() starts at the beginning of `buffer`, so the stream consists of
  // the first min(buffer_len, remaining) bytes of `buffer` over and over; the
  // buffer is a reusable payload, not a message of bytes_total bytes.
  //
  // Parameters:
  //   socket       file descriptor to write to
  //   bytes_total  total number of bytes to send; <= 0 sends nothing
  //   buffer       payload bytes (not modified)
  //   buffer_len   number of usable bytes in `buffer`
  //
  // Returns 0 once bytes_total bytes were sent, or -1 on any failure: invalid
  // buffer, poll() error or 15 minute timeout, an unexpected poll event, or a
  // send() error.
  int push_data(int socket, long bytes_total, char* buffer, int buffer_len){
      enum { POLL_TIMEOUT_MS = 15*60*1000 };
      long bytes_written = 0;
      struct pollfd poll_fds[1];
      //
      // A zero-length send() reports 0 bytes written, so an empty buffer could
      // never make progress (endless loop), and a negative length would be
      // converted to a huge size_t and read far outside `buffer`.
      if(bytes_total > 0 && (buffer == NULL || buffer_len <= 0)){
          return -1;
      }
      //
      memset(poll_fds, 0, sizeof(poll_fds));
      poll_fds[0].fd = socket;
      poll_fds[0].events = POLLOUT;
      //
      while(bytes_written < bytes_total){
          int rc = poll(poll_fds, 1, POLL_TIMEOUT_MS);
          //
          if(rc <= 0){
              // rc < 0: poll() failed; rc == 0: timed out
              return -1;
          }
          //
          if(poll_fds[0].revents == 0){
              continue;
          }else if(poll_fds[0].revents != POLLOUT){
              // anything besides plain "writable" (e.g. POLLERR/POLLHUP) is fatal
              return -1;
          }
          //
          long bytes_remaining = bytes_total - bytes_written;
          size_t bytes_to_send = (bytes_remaining < (long)buffer_len)
                                     ? (size_t)bytes_remaining
                                     : (size_t)buffer_len;
          // send() returns ssize_t; keep the full width instead of narrowing to int
          ssize_t n = send(poll_fds[0].fd, buffer, bytes_to_send, 0);
          //
          if(n < 0){
              return -1;
          }
          //
          bytes_written += (long)n;
      }
      //
      return 0;
  }
  81. //
  #ifdef USE_NEW_HISTORY
  // One successful recv(): the number of bytes it returned and the
  // gettimeofday() time (seconds, with microsecond fraction) taken right after.
  typedef struct {
      unsigned long bytes;
      double timestamp;
  } byte_delta;
  // Receive and discard bytes_total bytes from `socket`, recording timing data.
  //
  // Parameters:
  //   socket                  file descriptor to read from
  //   bytes_total             number of bytes to receive; <= 0 reads nothing
  //   buffer_len              size of the temporary receive buffer (must be > 0
  //                           when bytes_total > 0)
  //   time_first_ptr          out: time the first chunk was received
  //   time_last_ptr           out: time taken after the last chunk was received
  //   deltas                  out: one sample per successful recv(), at most
  //                           deltas_len entries are written
  //   deltas_len              capacity of `deltas`
  //   deltas_elements_needed  out: number of samples produced; this may exceed
  //                           deltas_len, in which case the excess samples were
  //                           dropped and the caller must clamp before indexing
  //
  // The two time outputs are written only on success; when nothing was read
  // (bytes_total <= 0) both receive the same current time.
  //
  // Returns 0 on success, -1 on failure (invalid buffer_len, allocation
  // failure, poll() error/timeout/unexpected event, recv() error, or the peer
  // closing the connection before bytes_total bytes arrived).
  int pull_data(int socket, long bytes_total, int buffer_len, double* time_first_ptr,
                double* time_last_ptr, byte_delta* deltas, size_t deltas_len,
                size_t* deltas_elements_needed){
  #else
  // Receive and discard bytes_total bytes from `socket`, accumulating the bytes
  // received during each elapsed second (counted from the whole second in which
  // the first byte arrived) into byte_counter[]. byte_counter_elements_needed
  // receives the number of one-second slots the transfer spanned (may exceed
  // byte_counter_len) and byte_counter_start the second of the first slot.
  // Returns 0 on success, -1 on failure.
  int pull_data(int socket, long bytes_total, int buffer_len, double* time_first_ptr,
                double* time_last_ptr, unsigned long* byte_counter,
                size_t byte_counter_len, size_t* byte_counter_elements_needed,
                unsigned long* byte_counter_start){
  #endif
      enum { POLL_TIMEOUT_MS = 15*60*1000 };
      int result = -1;
      long bytes_read = 0;
      char* buffer = NULL;
      // zero-initialised so they are never read while indeterminate
      struct timeval time_of_first_byte = {0, 0};
      struct timeval time_of_last_byte = {0, 0};
      struct timeval time_current = {0, 0};
  #ifndef USE_NEW_HISTORY
      struct timeval time_elapsed = {0, 0};
      struct timeval rounded_time_of_first_byte = {0, 0};
  #endif
      struct pollfd poll_fds[1];
      //
  #ifdef USE_NEW_HISTORY
      *deltas_elements_needed = 0;
  #else
      *byte_counter_elements_needed = 0;
  #endif
      //
      if(bytes_total > 0){
          // recv() with a zero-length buffer returns 0, which is
          // indistinguishable from the peer closing the connection.
          if(buffer_len <= 0){
              return -1;
          }
          buffer = malloc((size_t)buffer_len);
          if(buffer == NULL){
              return -1;
          }
      }
      //
      memset(poll_fds, 0, sizeof(poll_fds));
      poll_fds[0].fd = socket;
      poll_fds[0].events = POLLIN;
      //
      while(bytes_read < bytes_total){
          int rc = poll(poll_fds, 1, POLL_TIMEOUT_MS);
          //
          if(rc < 0){
              printf("Here1\n");
              goto done;
          }else if(rc == 0){
              printf("Call to poll() timed out (bytes_read=%ld)\n", bytes_read);
              goto done;
          }
          //
          if(poll_fds[0].revents == 0){
              continue;
          }else if(poll_fds[0].revents != POLLIN){
              printf("Here3\n");
              goto done;
          }
          //
          long bytes_remaining = bytes_total - bytes_read;
          size_t bytes_to_recv = (bytes_remaining < (long)buffer_len)
                                     ? (size_t)bytes_remaining
                                     : (size_t)buffer_len;
          // recv() returns ssize_t; keep the full width instead of narrowing to int
          ssize_t n = recv(poll_fds[0].fd, buffer, bytes_to_recv, 0);
          //
          if(n < 0){
              printf("Here4\n");
              goto done;
          }else if(n == 0){
              // the socket was closed gracefully, but before we finished reading all the data
              goto done;
          }
          //
          if(bytes_read == 0){
              gettimeofday(&time_of_first_byte, NULL);
  #ifndef USE_NEW_HISTORY
              rounded_time_of_first_byte.tv_sec = time_of_first_byte.tv_sec;
              rounded_time_of_first_byte.tv_usec = 0;
  #endif
          }
          //
          gettimeofday(&time_current, NULL);
  #ifdef USE_NEW_HISTORY
          if(*deltas_elements_needed < deltas_len){
              deltas[*deltas_elements_needed].bytes = (unsigned long)n;
              deltas[*deltas_elements_needed].timestamp = time_current.tv_sec + time_current.tv_usec/(1000.0*1000.0);
          }
          // count every sample, stored or not, so the caller can detect truncation
          *deltas_elements_needed += 1;
  #else
          timersub(&time_current, &rounded_time_of_first_byte, &time_elapsed);
          *byte_counter_elements_needed = (size_t)time_elapsed.tv_sec+1;
          if((size_t)time_elapsed.tv_sec < byte_counter_len){
              byte_counter[time_elapsed.tv_sec] += (unsigned long)n;
          }
  #endif
          //
          bytes_read += (long)n;
      }
      //
      gettimeofday(&time_of_last_byte, NULL);
      if(bytes_read == 0){
          // nothing was requested, so no "first byte" exists; report an empty interval
          time_of_first_byte = time_of_last_byte;
  #ifndef USE_NEW_HISTORY
          rounded_time_of_first_byte.tv_sec = time_of_last_byte.tv_sec;
  #endif
      }
      *time_first_ptr = time_of_first_byte.tv_sec + time_of_first_byte.tv_usec/(1000.0*1000.0);
      *time_last_ptr = time_of_last_byte.tv_sec + time_of_last_byte.tv_usec/(1000.0*1000.0);
  #ifndef USE_NEW_HISTORY
      *byte_counter_start = rounded_time_of_first_byte.tv_sec;
  #endif
      result = 0;
      //
  done:
      // single exit path: free(NULL) is a no-op, so this covers every case
      free(buffer);
      return result;
  }
  194. //
  195. static PyObject *py_push_data(PyObject *self, PyObject *args){
  196. PyObject *yerr_obj;
  197. int socket;
  198. long bytes_total;
  199. char* buffer = NULL;
  200. int buffer_len;
  201. //
  202. if(!PyArg_ParseTuple(args, "ily#", &socket, &bytes_total, &buffer, &buffer_len, &yerr_obj)){
  203. return NULL;
  204. }
  205. //
  206. int ret_val;
  207. Py_BEGIN_ALLOW_THREADS
  208. // GIL is unlocked, but don't do expensive operations in
  209. // other threads or it might slow this one down
  210. ret_val = push_data(socket, bytes_total, buffer, buffer_len);
  211. Py_END_ALLOW_THREADS
  212. //
  213. PyObject* py_ret_val = PyLong_FromLong(ret_val);
  214. //
  215. return py_ret_val;
  216. }
  217. //
  218. static PyObject *py_pull_data(PyObject *self, PyObject *args){
  219. PyObject *yerr_obj;
  220. int socket;
  221. long bytes_total;
  222. int buffer_len;
  223. //
  224. if(!PyArg_ParseTuple(args, "ili", &socket, &bytes_total, &buffer_len, &yerr_obj)){
  225. return NULL;
  226. }
  227. //
  228. double time_of_first_byte = 0;
  229. double time_of_last_byte = 0;
  230. #ifdef USE_NEW_HISTORY
  231. byte_delta deltas[100000] = {0}; // 100000*16 B = ~1.5 MiB
  232. size_t deltas_elements_needed = 0;
  233. #else
  234. unsigned long byte_counter[60*10] = {0}; // record 10 minutes of data
  235. size_t byte_counter_elements_needed = 0;
  236. unsigned long byte_counter_start = 0;
  237. #endif
  238. int ret_val;
  239. //
  240. Py_BEGIN_ALLOW_THREADS
  241. // GIL is unlocked, but don't do expensive operations in
  242. // other threads or it might slow this one down
  243. #ifdef USE_NEW_HISTORY
  244. ret_val = pull_data(socket, bytes_total, buffer_len, &time_of_first_byte, &time_of_last_byte,
  245. deltas, sizeof(deltas)/sizeof(deltas[0]), &deltas_elements_needed);
  246. #else
  247. ret_val = pull_data(socket, bytes_total, buffer_len, &time_of_first_byte, &time_of_last_byte,
  248. byte_counter, sizeof(byte_counter)/sizeof(byte_counter[0]),
  249. &byte_counter_elements_needed, &byte_counter_start);
  250. #endif
  251. Py_END_ALLOW_THREADS
  252. //
  253. #ifdef USE_NEW_HISTORY
  254. size_t deltas_elements_used = deltas_elements_needed;
  255. if(deltas_elements_used > sizeof(deltas)/sizeof(deltas[0])){
  256. deltas_elements_used = sizeof(deltas)/sizeof(deltas[0]);
  257. }
  258. //
  259. PyObject* py_delta_bytes = PyList_New(deltas_elements_used);
  260. PyObject* py_delta_timestamps = PyList_New(deltas_elements_used);
  261. for(size_t i=0; i<deltas_elements_used; i++){
  262. PyList_SetItem(py_delta_bytes, i, PyLong_FromLong(deltas[i].bytes));
  263. PyList_SetItem(py_delta_timestamps, i, PyFloat_FromDouble(deltas[i].timestamp));
  264. }
  265. #else
  266. size_t byte_counter_elements_used = byte_counter_elements_needed;
  267. if(byte_counter_elements_used > sizeof(byte_counter)/sizeof(byte_counter[0])){
  268. byte_counter_elements_used = sizeof(byte_counter)/sizeof(byte_counter[0]);
  269. }
  270. //
  271. PyObject* py_byte_counter = PyList_New(byte_counter_elements_used);
  272. for(size_t i=0; i<byte_counter_elements_used; i++){
  273. PyList_SetItem(py_byte_counter, i, PyLong_FromLong(byte_counter[i]));
  274. }
  275. #endif
  276. //
  277. #ifdef USE_NEW_HISTORY
  278. PyObject* py_ret_val = Py_BuildValue("(idd{sNsN})", ret_val, time_of_first_byte, time_of_last_byte, "bytes", py_delta_bytes, "timestamps", py_delta_timestamps);
  279. #else
  280. PyObject* py_ret_val = Py_BuildValue("(iddNi)", ret_val, time_of_first_byte, time_of_last_byte, py_byte_counter, byte_counter_start);
  281. #endif
  282. //
  283. return py_ret_val;
  284. }