accelerated_functions.c 7.8 KB


  // Python.h must precede the standard headers: it may define feature-test
  // macros that affect them.
  #include <Python.h>
  #include <limits.h>
  #include <stdio.h>
  #include <stdlib.h>
  #include <string.h>
  #include <sys/poll.h>
  #include <sys/socket.h>
  #include <sys/time.h>
  6. //
  7. #define USE_NEW_HISTORY
  8. //
  9. static PyObject *py_push_data(PyObject *self, PyObject *args);
  10. static char push_data_docstring[] =
  11. "Send data as quickly as possible into a socket.";
  12. //
  13. static PyObject *py_pull_data(PyObject *self, PyObject *args);
  14. static char pull_data_docstring[] =
  15. "Receive data as quickly as possible from a socket.";
  16. //
  17. static char module_docstring[] =
  18. "This module provides accelerated functions which would perform slower in pure Python.";
  19. //
  20. static PyMethodDef module_methods[] = {
  21. {"push_data", py_push_data, METH_VARARGS, push_data_docstring},
  22. {"pull_data", py_pull_data, METH_VARARGS, pull_data_docstring},
  23. {NULL, NULL, 0, NULL}
  24. };
  25. //
  26. static struct PyModuleDef _coremodule = {
  27. PyModuleDef_HEAD_INIT,
  28. "accelerated_functions", // name of module
  29. module_docstring, // module documentation, may be NULL
  30. -1, /* size of per-interpreter state of the module,
  31. or -1 if the module keeps state in global variables. */
  32. module_methods,
  33. };
  34. //
  35. PyMODINIT_FUNC PyInit_accelerated_functions(void){
  36. return PyModule_Create(&_coremodule);
  37. }
  38. //
  // Return the smaller of the two values.
  long min(long num1, long num2){
      if(num1 > num2){
          return num2;
      }
      return num1;
  }
  42. //
  /*
   * Write bytes_total bytes into a socket by sending the contents of
   * `buffer` over and over (each send starts again at the beginning of the
   * buffer and is at most buffer_len bytes long).
   *
   *   socket       file descriptor to send on
   *   bytes_total  total number of bytes to write; <= 0 means nothing to do
   *   buffer       payload that is sent repeatedly
   *   buffer_len   number of valid bytes in buffer; must be > 0
   *
   * Returns 0 once bytes_total bytes have been written, or -1 when
   *   - buffer is NULL or buffer_len is not positive,
   *   - poll() fails or the socket is not writable within 60 seconds,
   *   - poll() reports anything other than plain POLLOUT,
   *   - send() fails.
   */
  int push_data(int socket, long bytes_total, char* buffer, int buffer_len){
      long bytes_written = 0;
      //
      struct pollfd poll_fds[1];
      int num_poll_fds = 0;
      //
      if(bytes_total <= 0){
          return 0;
      }
      // An empty chunk would make every send() return 0 and spin forever;
      // a negative length would turn into a huge size_t.
      if(buffer == NULL || buffer_len <= 0){
          return -1;
      }
      //
      memset(poll_fds, 0, sizeof(poll_fds));
      poll_fds[0].fd = socket;
      poll_fds[0].events = POLLOUT;
      num_poll_fds++;
      //
      while(bytes_written < bytes_total){
          int rc = poll(poll_fds, num_poll_fds, 1*60*1000);
          //
          if(rc < 0){
              // poll() itself failed
              return -1;
          }else if(rc == 0){
              // timed out waiting for the socket to become writable
              return -1;
          }
          //
          if(poll_fds[0].revents == 0){
              continue;
          }else if(poll_fds[0].revents != POLLOUT){
              // error / hang-up flags were reported
              return -1;
          }
          //
          // Never send more than is still owed, nor more than one buffer.
          long bytes_remaining = bytes_total - bytes_written;
          size_t bytes_to_send = (bytes_remaining < (long)buffer_len)
                                     ? (size_t)bytes_remaining
                                     : (size_t)buffer_len;
          ssize_t n = send(poll_fds[0].fd, buffer, bytes_to_send, 0);
          //
          if(n < 0){
              return -1;
          }
          //
          bytes_written += n;
      }
      //
      return 0;
  }
  81. //
  #ifdef USE_NEW_HISTORY
  // One successful recv(): how many bytes it returned and the wall-clock
  // time (seconds since the epoch, from gettimeofday) right after it.
  typedef struct {
      unsigned long bytes;
      double timestamp;
  } byte_delta;
  /*
   * Read and discard bytes_total bytes from a socket, recording timing
   * information along the way.
   *
   *   socket                  file descriptor to receive from
   *   bytes_total             number of bytes to read
   *   buffer_len              size of the scratch receive buffer; must be > 0
   *   time_first_ptr          out: time of the first received byte (0.0 if
   *                           nothing was read)
   *   time_last_ptr           out: time at which the last byte had arrived
   *   deltas                  out: one entry per successful recv()
   *   deltas_len              capacity of deltas
   *   deltas_elements_needed  out: number of recv() calls that produced data;
   *                           may exceed deltas_len, in which case only the
   *                           first deltas_len entries were stored
   *
   * Returns 0 on success and -1 on failure (bad buffer_len, allocation
   * failure, poll() error or 60 second timeout, unexpected poll flags,
   * recv() error, or the peer closing the connection early). The time
   * outputs are only written on success.
   */
  int pull_data(int socket, long bytes_total, int buffer_len, double* time_first_ptr,
                double* time_last_ptr, byte_delta* deltas, size_t deltas_len,
                size_t* deltas_elements_needed){
  #else
  /*
   * Legacy variant: instead of per-recv entries, accumulates the received
   * byte count per elapsed second (measured from the whole second in which
   * the first byte arrived) into byte_counter, reports how many slots would
   * be needed, and stores that starting second in byte_counter_start.
   * Returns 0 on success and -1 on failure.
   */
  int pull_data(int socket, long bytes_total, int buffer_len, double* time_first_ptr,
                double* time_last_ptr, unsigned long* byte_counter,
                size_t byte_counter_len, size_t* byte_counter_elements_needed,
                unsigned long* byte_counter_start){
  #endif
      int ret_val = -1;
      long bytes_read = 0;
      char* buffer = NULL;
      // Zero-initialised so the outputs are well defined even when no byte
      // is ever received (bytes_total <= 0).
      struct timeval time_of_first_byte = {0, 0};
      struct timeval time_of_last_byte = {0, 0};
      struct timeval time_current;
  #ifdef USE_NEW_HISTORY
  #else
      struct timeval time_elapsed;
      struct timeval rounded_time_of_first_byte = {0, 0};
  #endif
      //
      struct pollfd poll_fds[1];
      int num_poll_fds = 0;
      //
  #ifdef USE_NEW_HISTORY
      *deltas_elements_needed = 0;
  #endif
      //
      // A zero-length buffer would make recv() return 0 forever and a
      // negative one would become a huge allocation request.
      if(buffer_len <= 0){
          return -1;
      }
      buffer = malloc((size_t)buffer_len);
      if(buffer == NULL){
          return -1;
      }
      //
      memset(poll_fds, 0, sizeof(poll_fds));
      poll_fds[0].fd = socket;
      poll_fds[0].events = POLLIN;
      num_poll_fds++;
      //
      while(bytes_read < bytes_total){
          int rc = poll(poll_fds, num_poll_fds, 1*60*1000);
          //
          if(rc < 0){
              printf("Here1\n");
              goto cleanup;
          }else if(rc == 0){
              printf("Here2\n");
              goto cleanup;
          }
          //
          if(poll_fds[0].revents == 0){
              continue;
          }else if(poll_fds[0].revents != POLLIN){
              printf("Here3\n");
              goto cleanup;
          }
          //
          // Never read more than is still expected, nor more than fits.
          long bytes_remaining = bytes_total - bytes_read;
          size_t bytes_to_recv = (bytes_remaining < (long)buffer_len)
                                     ? (size_t)bytes_remaining
                                     : (size_t)buffer_len;
          ssize_t n = recv(poll_fds[0].fd, buffer, bytes_to_recv, 0);
          //
          if(n < 0){
              printf("Here4\n");
              goto cleanup;
          }
          if(n == 0){
              // Orderly shutdown by the peer before all data arrived. The
              // socket would stay readable, so continuing would busy-loop.
              goto cleanup;
          }
          //
          if(bytes_read == 0){
              gettimeofday(&time_of_first_byte, NULL);
  #ifdef USE_NEW_HISTORY
  #else
              rounded_time_of_first_byte.tv_sec = time_of_first_byte.tv_sec;
              rounded_time_of_first_byte.tv_usec = 0;
  #endif
          }
          //
          gettimeofday(&time_current, NULL);
  #ifdef USE_NEW_HISTORY
          if(*deltas_elements_needed < deltas_len){
              deltas[*deltas_elements_needed].bytes = (unsigned long)n;
              deltas[*deltas_elements_needed].timestamp = time_current.tv_sec + time_current.tv_usec/(1000.0*1000.0);
          }
          // Keep counting past the capacity so the caller can detect that
          // the history was truncated.
          *deltas_elements_needed += 1;
  #else
          timersub(&time_current, &rounded_time_of_first_byte, &time_elapsed);
          *byte_counter_elements_needed = time_elapsed.tv_sec+1;
          if((size_t)time_elapsed.tv_sec < byte_counter_len){
              byte_counter[time_elapsed.tv_sec] += n;
          }
  #endif
          //
          bytes_read += n;
      }
      //
      gettimeofday(&time_of_last_byte, NULL);
      *time_first_ptr = time_of_first_byte.tv_sec + time_of_first_byte.tv_usec/(1000.0*1000.0);
      *time_last_ptr = time_of_last_byte.tv_sec + time_of_last_byte.tv_usec/(1000.0*1000.0);
  #ifdef USE_NEW_HISTORY
  #else
      *byte_counter_start = rounded_time_of_first_byte.tv_sec;
  #endif
      ret_val = 0;
      //
  cleanup:
      // Single exit point so the scratch buffer is always released.
      free(buffer);
      return ret_val;
  }
  190. //
  191. static PyObject *py_push_data(PyObject *self, PyObject *args){
  192. PyObject *yerr_obj;
  193. int socket;
  194. long bytes_total;
  195. char* buffer = NULL;
  196. int buffer_len;
  197. //
  198. if(!PyArg_ParseTuple(args, "ily#", &socket, &bytes_total, &buffer, &buffer_len, &yerr_obj)){
  199. return NULL;
  200. }
  201. //
  202. int ret_val;
  203. Py_BEGIN_ALLOW_THREADS
  204. // GIL is unlocked, but don't do expensive operations in
  205. // other threads or it might slow this one down
  206. ret_val = push_data(socket, bytes_total, buffer, buffer_len);
  207. Py_END_ALLOW_THREADS
  208. //
  209. PyObject* py_ret_val = PyLong_FromLong(ret_val);
  210. //
  211. return py_ret_val;
  212. }
  213. //
  214. static PyObject *py_pull_data(PyObject *self, PyObject *args){
  215. PyObject *yerr_obj;
  216. int socket;
  217. long bytes_total;
  218. int buffer_len;
  219. //
  220. if(!PyArg_ParseTuple(args, "ili", &socket, &bytes_total, &buffer_len, &yerr_obj)){
  221. return NULL;
  222. }
  223. //
  224. double time_of_first_byte = 0;
  225. double time_of_last_byte = 0;
  226. #ifdef USE_NEW_HISTORY
  227. byte_delta deltas[20000] = {0};
  228. size_t deltas_elements_needed = 0;
  229. #else
  230. unsigned long byte_counter[60*10] = {0}; // record 10 minutes of data
  231. size_t byte_counter_elements_needed = 0;
  232. unsigned long byte_counter_start = 0;
  233. #endif
  234. int ret_val;
  235. //
  236. Py_BEGIN_ALLOW_THREADS
  237. // GIL is unlocked, but don't do expensive operations in
  238. // other threads or it might slow this one down
  239. #ifdef USE_NEW_HISTORY
  240. ret_val = pull_data(socket, bytes_total, buffer_len, &time_of_first_byte, &time_of_last_byte,
  241. deltas, sizeof(deltas)/sizeof(deltas[0]), &deltas_elements_needed);
  242. #else
  243. ret_val = pull_data(socket, bytes_total, buffer_len, &time_of_first_byte, &time_of_last_byte,
  244. byte_counter, sizeof(byte_counter)/sizeof(byte_counter[0]),
  245. &byte_counter_elements_needed, &byte_counter_start);
  246. #endif
  247. Py_END_ALLOW_THREADS
  248. //
  249. #ifdef USE_NEW_HISTORY
  250. size_t deltas_elements_used = deltas_elements_needed;
  251. if(deltas_elements_used > sizeof(deltas)/sizeof(deltas[0])){
  252. deltas_elements_used = sizeof(deltas)/sizeof(deltas[0]);
  253. }
  254. //
  255. PyObject* py_delta_bytes = PyList_New(deltas_elements_used);
  256. PyObject* py_delta_timestamps = PyList_New(deltas_elements_used);
  257. for(size_t i=0; i<deltas_elements_used; i++){
  258. PyList_SetItem(py_delta_bytes, i, PyLong_FromLong(deltas[i].bytes));
  259. PyList_SetItem(py_delta_timestamps, i, PyFloat_FromDouble(deltas[i].timestamp));
  260. }
  261. #else
  262. size_t byte_counter_elements_used = byte_counter_elements_needed;
  263. if(byte_counter_elements_used > sizeof(byte_counter)/sizeof(byte_counter[0])){
  264. byte_counter_elements_used = sizeof(byte_counter)/sizeof(byte_counter[0]);
  265. }
  266. //
  267. PyObject* py_byte_counter = PyList_New(byte_counter_elements_used);
  268. for(size_t i=0; i<byte_counter_elements_used; i++){
  269. PyList_SetItem(py_byte_counter, i, PyLong_FromLong(byte_counter[i]));
  270. }
  271. #endif
  272. //
  273. #ifdef USE_NEW_HISTORY
  274. PyObject* py_ret_val = Py_BuildValue("(idd{sNsN})", ret_val, time_of_first_byte, time_of_last_byte, "bytes", py_delta_bytes, "timestamps", py_delta_timestamps);
  275. #else
  276. PyObject* py_ret_val = Py_BuildValue("(iddNi)", ret_val, time_of_first_byte, time_of_last_byte, py_byte_counter, byte_counter_start);
  277. #endif
  278. //
  279. return py_ret_val;
  280. }