accelerated_functions.c 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184
  1. #include <string.h>
  2. #include <sys/poll.h>
  3. #include <sys/socket.h>
  4. #include <Python.h>
  5. //
  6. static PyObject *py_push_data(PyObject *self, PyObject *args);
  7. static char push_data_docstring[] =
  8. "Send data as quickly as possible into a socket.";
  9. //
  10. static PyObject *py_pull_data(PyObject *self, PyObject *args);
  11. static char pull_data_docstring[] =
  12. "Receive data as quickly as possible from a socket.";
  13. //
  14. static char module_docstring[] =
  15. "This module provides accelerated functions which would perform slower in pure Python.";
  16. //
  17. static PyMethodDef module_methods[] = {
  18. {"push_data", py_push_data, METH_VARARGS, push_data_docstring},
  19. {"pull_data", py_pull_data, METH_VARARGS, pull_data_docstring},
  20. {NULL, NULL, 0, NULL}
  21. };
  22. //
  23. static struct PyModuleDef _coremodule = {
  24. PyModuleDef_HEAD_INIT,
  25. "accelerated_functions", // name of module
  26. module_docstring, // module documentation, may be NULL
  27. -1, /* size of per-interpreter state of the module,
  28. or -1 if the module keeps state in global variables. */
  29. module_methods,
  30. };
  31. //
  32. PyMODINIT_FUNC PyInit_accelerated_functions(void){
  33. return PyModule_Create(&_coremodule);
  34. }
  35. //
  36. long min(long num1, long num2){
  37. return (num1 > num2) ? num2 : num1;
  38. }
  39. //
  40. int push_data(int socket, long bytes_total, char* buffer, int buffer_len){
  41. long bytes_written = 0;
  42. //
  43. struct pollfd poll_fds[1];
  44. int num_poll_fds = 0;
  45. //
  46. memset(poll_fds, 0, sizeof(poll_fds));
  47. poll_fds[0].fd = socket;
  48. poll_fds[0].events = POLLOUT;
  49. num_poll_fds++;
  50. //
  51. while(bytes_written < bytes_total){
  52. int rc = poll(poll_fds, num_poll_fds, 1*60*1000);
  53. //
  54. if(rc < 0){
  55. return -1;
  56. }else if(rc == 0){
  57. return -1;
  58. }
  59. //
  60. if(poll_fds[0].revents == 0){
  61. continue;
  62. }else if(poll_fds[0].revents != POLLOUT){
  63. return -1;
  64. }
  65. //
  66. long bytes_to_send = min(buffer_len, bytes_total-bytes_written);
  67. int n = send(poll_fds[0].fd, buffer, bytes_to_send, 0);
  68. //
  69. if(n < 0){
  70. return -1;
  71. }
  72. //
  73. bytes_written += n;
  74. }
  75. //
  76. return 0;
  77. }
  78. //
  79. int pull_data(int socket, long bytes_total, int buffer_len, double* time_ptr){
  80. long bytes_read = 0;
  81. char* buffer = malloc(buffer_len);
  82. struct timeval time_of_first_byte, time_of_last_byte;
  83. //
  84. struct pollfd poll_fds[1];
  85. int num_poll_fds = 0;
  86. //
  87. if(buffer == NULL){
  88. return -1;
  89. }
  90. //
  91. memset(poll_fds, 0, sizeof(poll_fds));
  92. poll_fds[0].fd = socket;
  93. poll_fds[0].events = POLLIN;
  94. num_poll_fds++;
  95. //
  96. while(bytes_read < bytes_total){
  97. int rc = poll(poll_fds, num_poll_fds, 1*60*1000);
  98. //
  99. if(rc < 0){
  100. printf("Here1\n");
  101. free(buffer);
  102. return -1;
  103. }else if(rc == 0){
  104. printf("Here2\n");
  105. free(buffer);
  106. return -1;
  107. }
  108. //
  109. if(poll_fds[0].revents == 0){
  110. continue;
  111. }else if(poll_fds[0].revents != POLLIN){
  112. printf("Here3\n");
  113. free(buffer);
  114. return -1;
  115. }
  116. //
  117. long bytes_to_recv = min(buffer_len, bytes_total-bytes_read);
  118. int n = recv(poll_fds[0].fd, buffer, bytes_to_recv, 0);
  119. //
  120. if(n < 0){
  121. printf("Here4\n");
  122. free(buffer);
  123. return -1;
  124. }
  125. //
  126. if(n > 0 && bytes_read == 0){
  127. gettimeofday(&time_of_first_byte, NULL);
  128. }
  129. //
  130. bytes_read += n;
  131. }
  132. //
  133. gettimeofday(&time_of_last_byte, NULL);
  134. *time_ptr = (time_of_last_byte.tv_sec-time_of_first_byte.tv_sec) + (time_of_last_byte.tv_usec-time_of_first_byte.tv_usec)/(1000.0*1000.0);
  135. //
  136. free(buffer);
  137. return 0;
  138. }
  139. //
  140. static PyObject *py_push_data(PyObject *self, PyObject *args){
  141. PyObject *yerr_obj;
  142. int socket;
  143. long bytes_total;
  144. char* buffer = NULL;
  145. int buffer_len;
  146. //
  147. if(!PyArg_ParseTuple(args, "ily#", &socket, &bytes_total, &buffer, &buffer_len, &yerr_obj)){
  148. return NULL;
  149. }
  150. //
  151. int ret_val;
  152. Py_BEGIN_ALLOW_THREADS
  153. // GIL is unlocked, but don't do expensive operations in
  154. // other threads or it might slow this one down
  155. ret_val = push_data(socket, bytes_total, buffer, buffer_len);
  156. Py_END_ALLOW_THREADS
  157. //
  158. PyObject* py_ret_val = PyLong_FromLong(ret_val);
  159. //
  160. return py_ret_val;
  161. }
  162. //
  163. static PyObject *py_pull_data(PyObject *self, PyObject *args){
  164. PyObject *yerr_obj;
  165. int socket;
  166. long bytes_total;
  167. int buffer_len;
  168. //
  169. if(!PyArg_ParseTuple(args, "ili", &socket, &bytes_total, &buffer_len, &yerr_obj)){
  170. return NULL;
  171. }
  172. //
  173. double elapsed_time = 0;
  174. int ret_val;
  175. Py_BEGIN_ALLOW_THREADS
  176. // GIL is unlocked, but don't do expensive operations in
  177. // other threads or it might slow this one down
  178. ret_val = pull_data(socket, bytes_total, buffer_len, &elapsed_time);
  179. Py_END_ALLOW_THREADS
  180. //
  181. PyObject* py_ret_val = Py_BuildValue("(id)", ret_val, elapsed_time);
  182. //
  183. return py_ret_val;
  184. }