#include #include #include #include // static PyObject *py_push_data(PyObject *self, PyObject *args); static char push_data_docstring[] = "Send data as quickly as possible into a socket."; // static PyObject *py_pull_data(PyObject *self, PyObject *args); static char pull_data_docstring[] = "Receive data as quickly as possible from a socket."; // static char module_docstring[] = "This module provides accelerated functions which would perform slower in pure Python."; // static PyMethodDef module_methods[] = { {"push_data", py_push_data, METH_VARARGS, push_data_docstring}, {"pull_data", py_pull_data, METH_VARARGS, pull_data_docstring}, {NULL, NULL, 0, NULL} }; // static struct PyModuleDef _coremodule = { PyModuleDef_HEAD_INIT, "accelerated_functions", // name of module module_docstring, // module documentation, may be NULL -1, /* size of per-interpreter state of the module, or -1 if the module keeps state in global variables. */ module_methods, }; // PyMODINIT_FUNC PyInit_accelerated_functions(void){ return PyModule_Create(&_coremodule); } // long min(long num1, long num2){ return (num1 > num2) ? num2 : num1; } // int push_data(int socket, long bytes_total, char* buffer, int buffer_len){ long bytes_written = 0; // struct pollfd poll_fds[1]; int num_poll_fds = 0; // memset(poll_fds, 0, sizeof(poll_fds)); poll_fds[0].fd = socket; poll_fds[0].events = POLLOUT; num_poll_fds++; // while(bytes_written < bytes_total){ int rc = poll(poll_fds, num_poll_fds, 1*60*1000); // if(rc < 0){ return -1; }else if(rc == 0){ return -1; } // if(poll_fds[0].revents == 0){ continue; }else if(poll_fds[0].revents != POLLOUT){ return -1; } // long bytes_to_send = min(buffer_len, bytes_total-bytes_written); int n = send(poll_fds[0].fd, buffer, bytes_to_send, 0); // if(n < 0){ return -1; } // bytes_written += n; } // return 0; } // int pull_data(int socket, long bytes_total, int buffer_len, double* time_ptr){ long bytes_read = 0; char* buffer = malloc(buffer_len); struct timeval time_of_first_byte, time_of_last_byte; // struct pollfd poll_fds[1]; int num_poll_fds = 0; // if(buffer == NULL){ return -1; } // memset(poll_fds, 0, sizeof(poll_fds)); poll_fds[0].fd = socket; poll_fds[0].events = POLLIN; num_poll_fds++; // while(bytes_read < bytes_total){ int rc = poll(poll_fds, num_poll_fds, 1*60*1000); // if(rc < 0){ printf("Here1\n"); free(buffer); return -1; }else if(rc == 0){ printf("Here2\n"); free(buffer); return -1; } // if(poll_fds[0].revents == 0){ continue; }else if(poll_fds[0].revents != POLLIN){ printf("Here3\n"); free(buffer); return -1; } // long bytes_to_recv = min(buffer_len, bytes_total-bytes_read); int n = recv(poll_fds[0].fd, buffer, bytes_to_recv, 0); // if(n < 0){ printf("Here4\n"); free(buffer); return -1; } // if(n > 0 && bytes_read == 0){ gettimeofday(&time_of_first_byte, NULL); } // bytes_read += n; } // gettimeofday(&time_of_last_byte, NULL); *time_ptr = (time_of_last_byte.tv_sec-time_of_first_byte.tv_sec) + (time_of_last_byte.tv_usec-time_of_first_byte.tv_usec)/(1000.0*1000.0); // free(buffer); return 0; } // static PyObject *py_push_data(PyObject *self, PyObject *args){ PyObject *yerr_obj; int socket; long bytes_total; char* buffer = NULL; int buffer_len; // if(!PyArg_ParseTuple(args, "ily#", &socket, &bytes_total, &buffer, &buffer_len, &yerr_obj)){ return NULL; } // int ret_val; Py_BEGIN_ALLOW_THREADS // GIL is unlocked, but don't do expensive operations in // other threads or it might slow this one down ret_val = push_data(socket, bytes_total, buffer, buffer_len); Py_END_ALLOW_THREADS // PyObject* py_ret_val = PyLong_FromLong(ret_val); // return py_ret_val; } // static PyObject *py_pull_data(PyObject *self, PyObject *args){ PyObject *yerr_obj; int socket; long bytes_total; int buffer_len; // if(!PyArg_ParseTuple(args, "ili", &socket, &bytes_total, &buffer_len, &yerr_obj)){ return NULL; } // double elapsed_time = 0; int ret_val; Py_BEGIN_ALLOW_THREADS // GIL is unlocked, but don't do expensive operations in // other threads or it might slow this one down ret_val = pull_data(socket, bytes_total, buffer_len, &elapsed_time); Py_END_ALLOW_THREADS // PyObject* py_ret_val = Py_BuildValue("(id)", ret_val, elapsed_time); // return py_ret_val; }