123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172 |
/* Python.h comes first: it may define macros that affect the system headers. */
#include <Python.h>

#include <limits.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/poll.h>
#include <sys/socket.h>
#include <sys/time.h>
#include <sys/types.h>
- //
- static PyObject *py_push_data(PyObject *self, PyObject *args);
- static char push_data_docstring[] =
- "Send data as quickly as possible into a socket.";
- //
- static PyObject *py_pull_data(PyObject *self, PyObject *args);
- static char pull_data_docstring[] =
- "Receive data as quickly as possible from a socket.";
- //
- static char module_docstring[] =
- "This module provides accelerated functions which would perform slower in pure Python.";
- //
- static PyMethodDef module_methods[] = {
- {"push_data", py_push_data, METH_VARARGS, push_data_docstring},
- {"pull_data", py_pull_data, METH_VARARGS, pull_data_docstring},
- {NULL, NULL, 0, NULL}
- };
- //
- static struct PyModuleDef _coremodule = {
- PyModuleDef_HEAD_INIT,
- "accelerated_functions", // name of module
- module_docstring, // module documentation, may be NULL
- -1, /* size of per-interpreter state of the module,
- or -1 if the module keeps state in global variables. */
- module_methods,
- };
- //
- PyMODINIT_FUNC PyInit_accelerated_functions(void){
- return PyModule_Create(&_coremodule);
- }
- //
/* Return the smaller of num1 and num2 (num1 when they are equal). */
long min(long num1, long num2)
{
    if (num1 > num2) {
        return num2;
    }
    return num1;
}
- //
/*
 * Write bytes_total bytes into a socket by sending the contents of buffer
 * over and over.
 *
 * Every send() starts at the beginning of buffer and passes at most
 * buffer_len bytes, so the payload is a repetition of the buffer's leading
 * bytes. The descriptor is polled for writability before each send().
 *
 * socket:      descriptor to poll and send on
 * bytes_total: number of bytes to write; nothing is sent when <= 0
 * buffer:      source bytes
 * buffer_len:  number of usable bytes in buffer
 *
 * Returns 0 once send() has accepted bytes_total bytes. Returns -1 when there
 * is data to send but buffer is NULL or buffer_len is not positive, when
 * poll() fails or times out after 60 seconds, when poll() reports anything
 * other than exactly POLLOUT, or when send() fails.
 */
int push_data(int socket, long bytes_total, char* buffer, int buffer_len){
    enum { POLL_TIMEOUT_MS = 1*60*1000 };
    long bytes_written = 0;
    struct pollfd poll_fd;

    if(bytes_total <= 0){
        return 0;
    }
    /* A zero-length send() never makes progress (endless loop) and a negative
       length would turn into a huge size_t, so refuse both up front. */
    if(buffer == NULL || buffer_len <= 0){
        return -1;
    }

    memset(&poll_fd, 0, sizeof(poll_fd));
    poll_fd.fd = socket;
    poll_fd.events = POLLOUT;

    while(bytes_written < bytes_total){
        int rc = poll(&poll_fd, 1, POLL_TIMEOUT_MS);

        /* rc < 0: poll() failed; rc == 0: timed out. */
        if(rc <= 0){
            return -1;
        }

        if(poll_fd.revents == 0){
            continue;
        }
        /* Anything besides plain writability is treated as an error. */
        if(poll_fd.revents != POLLOUT){
            return -1;
        }

        long remaining = bytes_total - bytes_written;
        size_t bytes_to_send = (size_t)((remaining < buffer_len) ? remaining : buffer_len);
        ssize_t n = send(poll_fd.fd, buffer, bytes_to_send, 0);

        if(n < 0){
            return -1;
        }

        bytes_written += n;
    }

    return 0;
}
- //
/*
 * Read and discard bytes_total bytes from a socket, measuring how long the
 * transfer took.
 *
 * The descriptor is polled for readability before each recv(); received data
 * lands in a scratch buffer of buffer_len bytes that is freed before
 * returning.
 *
 * socket:      descriptor to poll and receive on
 * bytes_total: number of bytes to read; nothing is read when <= 0
 * buffer_len:  size of the scratch buffer, i.e. the most bytes asked of one recv()
 * time_ptr:    on success receives the seconds elapsed between the first
 *              successful recv() and the end of the transfer (0.0 when
 *              bytes_total <= 0); left untouched on failure
 *
 * Returns 0 on success. Returns -1 when buffer_len is not positive, the
 * scratch buffer cannot be allocated, poll() fails or times out after 60
 * seconds, poll() reports anything other than exactly POLLIN, recv() fails,
 * or the peer closes the connection before bytes_total bytes arrived.
 */
int pull_data(int socket, long bytes_total, int buffer_len, double* time_ptr){
    enum { POLL_TIMEOUT_MS = 1*60*1000 };
    struct timeval time_of_first_byte = {0, 0};
    struct timeval time_of_last_byte = {0, 0};
    struct pollfd poll_fd;
    long bytes_read = 0;
    char *buffer = NULL;
    int result = -1;

    /* Nothing to receive: no first-byte timestamp exists, so report zero
       elapsed time instead of computing one from unset values. */
    if(bytes_total <= 0){
        *time_ptr = 0.0;
        return 0;
    }
    /* A zero-length recv() returns 0 forever and a negative length would
       become a huge allocation size, so refuse both. */
    if(buffer_len <= 0){
        return -1;
    }

    buffer = malloc((size_t)buffer_len);
    if(buffer == NULL){
        return -1;
    }

    memset(&poll_fd, 0, sizeof(poll_fd));
    poll_fd.fd = socket;
    poll_fd.events = POLLIN;

    while(bytes_read < bytes_total){
        int rc = poll(&poll_fd, 1, POLL_TIMEOUT_MS);

        /* rc < 0: poll() failed; rc == 0: timed out. */
        if(rc <= 0){
            goto out;
        }

        if(poll_fd.revents == 0){
            continue;
        }
        /* Anything besides plain readability is treated as an error. */
        if(poll_fd.revents != POLLIN){
            goto out;
        }

        long remaining = bytes_total - bytes_read;
        size_t bytes_to_recv = (size_t)((remaining < buffer_len) ? remaining : buffer_len);
        ssize_t n = recv(poll_fd.fd, buffer, bytes_to_recv, 0);

        /* n < 0 is a receive error. n == 0 means the peer shut the connection
           down; the socket then stays readable and returns 0 on every call,
           so continuing would spin without ever reaching bytes_total. */
        if(n <= 0){
            goto out;
        }

        if(bytes_read == 0){
            gettimeofday(&time_of_first_byte, NULL);
        }

        bytes_read += n;
    }

    gettimeofday(&time_of_last_byte, NULL);
    *time_ptr = (time_of_last_byte.tv_sec - time_of_first_byte.tv_sec)
              + (time_of_last_byte.tv_usec - time_of_first_byte.tv_usec)/(1000.0*1000.0);
    result = 0;

out:
    free(buffer);
    return result;
}
- //
- static PyObject *py_push_data(PyObject *self, PyObject *args){
- PyObject *yerr_obj;
- int socket;
- long bytes_total;
- char* buffer = NULL;
- int buffer_len;
- //
- if(!PyArg_ParseTuple(args, "ily#", &socket, &bytes_total, &buffer, &buffer_len, &yerr_obj)){
- return NULL;
- }
- //
- int ret_val = push_data(socket, bytes_total, buffer, buffer_len);
- PyObject* py_ret_val = PyLong_FromLong(ret_val);
- //
- return py_ret_val;
- }
- //
- static PyObject *py_pull_data(PyObject *self, PyObject *args){
- PyObject *yerr_obj;
- int socket;
- long bytes_total;
- int buffer_len;
- //
- if(!PyArg_ParseTuple(args, "ili", &socket, &bytes_total, &buffer_len, &yerr_obj)){
- return NULL;
- }
- //
- double elapsed_time = 0;
- int ret_val = pull_data(socket, bytes_total, buffer_len, &elapsed_time);
- PyObject* py_ret_val = Py_BuildValue("(id)", ret_val, elapsed_time);
- //
- return py_ret_val;
- }
|