// Socket push/pull helpers implemented in C and exposed to Python as the "accelerated_functions" module.
// Python.h is included first because it may define feature-test macros that
// affect the standard headers that follow.
#include <Python.h>
#include <limits.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/poll.h>
#include <sys/socket.h>
#include <sys/time.h>
- //
- #define USE_NEW_HISTORY
- //
- static PyObject *py_push_data(PyObject *self, PyObject *args);
- static char push_data_docstring[] =
- "Send data as quickly as possible into a socket.";
- //
- static PyObject *py_pull_data(PyObject *self, PyObject *args);
- static char pull_data_docstring[] =
- "Receive data as quickly as possible from a socket.";
- //
- static char module_docstring[] =
- "This module provides accelerated functions which would perform slower in pure Python.";
- //
- static PyMethodDef module_methods[] = {
- {"push_data", py_push_data, METH_VARARGS, push_data_docstring},
- {"pull_data", py_pull_data, METH_VARARGS, pull_data_docstring},
- {NULL, NULL, 0, NULL}
- };
- //
- static struct PyModuleDef _coremodule = {
- PyModuleDef_HEAD_INIT,
- "accelerated_functions", // name of module
- module_docstring, // module documentation, may be NULL
- -1, /* size of per-interpreter state of the module,
- or -1 if the module keeps state in global variables. */
- module_methods,
- };
- //
- PyMODINIT_FUNC PyInit_accelerated_functions(void){
- return PyModule_Create(&_coremodule);
- }
- //
// Return the smaller of the two arguments (num1 when they are equal).
long min(long num1, long num2)
{
    if (num2 < num1) {
        return num2;
    }
    return num1;
}
- //
/*
 * Write bytes_total bytes to `socket`, re-sending the first bytes of `buffer`
 * (at most buffer_len per call) until the total has been written.
 *
 * Each send() is preceded by a poll() for POLLOUT with a 15 minute timeout.
 *
 * Returns 0 once bytes_total bytes have been sent (immediately when
 * bytes_total <= 0), and -1 on poll() failure or timeout, on any poll event
 * other than plain POLLOUT, on send() failure, or when there is data to send
 * but `buffer` is NULL or buffer_len is not positive.
 */
int push_data(int socket, long bytes_total, char* buffer, int buffer_len){
    const int poll_timeout_ms = 15 * 60 * 1000;
    long bytes_written = 0;
    struct pollfd poll_fds[1];

    if (bytes_total <= 0) {
        return 0;
    }
    // A zero-length send() returns 0 and would spin forever; a negative
    // length would be converted to a huge size_t and read past the buffer.
    if (buffer == NULL || buffer_len <= 0) {
        return -1;
    }

    memset(poll_fds, 0, sizeof(poll_fds));
    poll_fds[0].fd = socket;
    poll_fds[0].events = POLLOUT;

    while (bytes_written < bytes_total) {
        int rc = poll(poll_fds, 1, poll_timeout_ms);
        if (rc <= 0) {
            // rc < 0: poll() failed; rc == 0: timed out
            return -1;
        }

        if (poll_fds[0].revents == 0) {
            continue;
        }
        if (poll_fds[0].revents != POLLOUT) {
            // error/hang-up flags were reported alongside or instead of POLLOUT
            return -1;
        }

        long remaining = bytes_total - bytes_written;
        size_t bytes_to_send = (remaining < (long)buffer_len) ? (size_t)remaining
                                                              : (size_t)buffer_len;
        ssize_t n = send(poll_fds[0].fd, buffer, bytes_to_send, 0);
        if (n < 0) {
            return -1;
        }

        bytes_written += (long)n;
    }

    return 0;
}
- //
/* Convert a struct timeval to seconds as a double. */
static double timeval_to_seconds(const struct timeval *tv){
    return tv->tv_sec + tv->tv_usec/(1000.0*1000.0);
}

/*
 * Read exactly bytes_total bytes from `socket`, discarding the payload, and
 * record timing information about the transfer.
 *
 * A scratch buffer of buffer_len bytes is allocated for the duration of the
 * call. Each recv() is preceded by a poll() for POLLIN with a 15 minute
 * timeout.
 *
 * Outputs (written only on success):
 *   *time_first_ptr  time of the first successful recv(), in seconds; 0.0 if
 *                    nothing had to be read (bytes_total <= 0)
 *   *time_last_ptr   time at which the last byte had been read, in seconds
 *
 * New history (USE_NEW_HISTORY): one byte_delta is stored per successful
 * recv() while there is room in `deltas` (deltas_len entries).
 * *deltas_elements_needed is reset to 0 on entry and counts every successful
 * recv(), so it can exceed deltas_len; callers must clamp before indexing.
 *
 * Legacy history: byte_counter[s] accumulates the bytes received during the
 * s-th second after the (whole-second) time of the first byte,
 * *byte_counter_elements_needed is the highest such index plus one, and
 * *byte_counter_start is that whole-second start time.
 *
 * Returns 0 on success and -1 on invalid buffer_len, allocation failure,
 * poll() failure or timeout, a poll event without POLLIN, recv() failure, or
 * the peer closing the connection before bytes_total bytes arrived.
 */
#ifdef USE_NEW_HISTORY
/* One successful recv(): number of bytes received and when (in seconds). */
typedef struct {
    unsigned long bytes;
    double timestamp;
} byte_delta;
int pull_data(int socket, long bytes_total, int buffer_len, double* time_first_ptr,
              double* time_last_ptr, byte_delta* deltas, size_t deltas_len,
              size_t* deltas_elements_needed){
#else
int pull_data(int socket, long bytes_total, int buffer_len, double* time_first_ptr,
              double* time_last_ptr, unsigned long* byte_counter,
              size_t byte_counter_len, size_t* byte_counter_elements_needed,
              unsigned long* byte_counter_start){
#endif
    const int poll_timeout_ms = 15 * 60 * 1000;
    int result = -1;
    long bytes_read = 0;
    char *buffer = NULL;
    // Zero-initialised so the outputs are well defined when no byte is read.
    struct timeval time_of_first_byte = {0, 0};
    struct timeval time_of_last_byte;
    struct timeval time_current;
#ifndef USE_NEW_HISTORY
    struct timeval rounded_time_of_first_byte = {0, 0};
#endif
    struct pollfd poll_fds[1];

#ifdef USE_NEW_HISTORY
    *deltas_elements_needed = 0;
#endif

    if (buffer_len <= 0) {
        return -1;
    }
    buffer = malloc((size_t)buffer_len);
    if (buffer == NULL) {
        return -1;
    }

    memset(poll_fds, 0, sizeof(poll_fds));
    poll_fds[0].fd = socket;
    poll_fds[0].events = POLLIN;

    while (bytes_read < bytes_total) {
        int rc = poll(poll_fds, 1, poll_timeout_ms);
        if (rc < 0) {
            printf("Here1\n");
            goto done;
        }
        if (rc == 0) {
            printf("Call to poll() timed out (bytes_read=%ld)\n", bytes_read);
            goto done;
        }

        if (poll_fds[0].revents == 0) {
            continue;
        }
        // POLLIN may be reported together with POLLHUP/POLLERR while data is
        // still readable; only give up here if there is nothing to read.
        if (!(poll_fds[0].revents & POLLIN)) {
            printf("Here3\n");
            goto done;
        }

        long remaining = bytes_total - bytes_read;
        size_t bytes_to_recv = (remaining < (long)buffer_len) ? (size_t)remaining
                                                              : (size_t)buffer_len;
        ssize_t n = recv(poll_fds[0].fd, buffer, bytes_to_recv, 0);
        if (n < 0) {
            printf("Here4\n");
            goto done;
        }
        if (n == 0) {
            // the socket was closed gracefully, but before we finished reading all the data
            goto done;
        }

        gettimeofday(&time_current, NULL);
        if (bytes_read == 0) {
            time_of_first_byte = time_current;
#ifndef USE_NEW_HISTORY
            rounded_time_of_first_byte.tv_sec = time_of_first_byte.tv_sec;
            rounded_time_of_first_byte.tv_usec = 0;
#endif
        }

#ifdef USE_NEW_HISTORY
        if (*deltas_elements_needed < deltas_len) {
            byte_delta *slot = &deltas[*deltas_elements_needed];
            slot->bytes = (unsigned long)n;
            slot->timestamp = timeval_to_seconds(&time_current);
        }
        // Keep counting past the capacity so the caller can see how many
        // entries would have been needed.
        *deltas_elements_needed += 1;
#else
        // The start time has tv_usec == 0, so whole elapsed seconds are just
        // the difference of the tv_sec fields.
        long elapsed_sec = (long)(time_current.tv_sec - rounded_time_of_first_byte.tv_sec);
        *byte_counter_elements_needed = (size_t)(elapsed_sec + 1);
        if ((size_t)elapsed_sec < byte_counter_len) {
            byte_counter[elapsed_sec] += (unsigned long)n;
        }
#endif

        bytes_read += (long)n;
    }

    gettimeofday(&time_of_last_byte, NULL);
    *time_first_ptr = timeval_to_seconds(&time_of_first_byte);
    *time_last_ptr = timeval_to_seconds(&time_of_last_byte);
#ifndef USE_NEW_HISTORY
    *byte_counter_start = (unsigned long)rounded_time_of_first_byte.tv_sec;
#endif
    result = 0;

done:
    free(buffer);
    return result;
}
- //
- static PyObject *py_push_data(PyObject *self, PyObject *args){
- PyObject *yerr_obj;
- int socket;
- long bytes_total;
- char* buffer = NULL;
- int buffer_len;
- //
- if(!PyArg_ParseTuple(args, "ily#", &socket, &bytes_total, &buffer, &buffer_len, &yerr_obj)){
- return NULL;
- }
- //
- int ret_val;
- Py_BEGIN_ALLOW_THREADS
- // GIL is unlocked, but don't do expensive operations in
- // other threads or it might slow this one down
- ret_val = push_data(socket, bytes_total, buffer, buffer_len);
- Py_END_ALLOW_THREADS
- //
- PyObject* py_ret_val = PyLong_FromLong(ret_val);
- //
- return py_ret_val;
- }
- //
- static PyObject *py_pull_data(PyObject *self, PyObject *args){
- PyObject *yerr_obj;
- int socket;
- long bytes_total;
- int buffer_len;
- //
- if(!PyArg_ParseTuple(args, "ili", &socket, &bytes_total, &buffer_len, &yerr_obj)){
- return NULL;
- }
- //
- double time_of_first_byte = 0;
- double time_of_last_byte = 0;
- #ifdef USE_NEW_HISTORY
- byte_delta deltas[100000] = {0}; // 100000*16 B = ~1.5 MiB
- size_t deltas_elements_needed = 0;
- #else
- unsigned long byte_counter[60*10] = {0}; // record 10 minutes of data
- size_t byte_counter_elements_needed = 0;
- unsigned long byte_counter_start = 0;
- #endif
- int ret_val;
- //
- Py_BEGIN_ALLOW_THREADS
- // GIL is unlocked, but don't do expensive operations in
- // other threads or it might slow this one down
- #ifdef USE_NEW_HISTORY
- ret_val = pull_data(socket, bytes_total, buffer_len, &time_of_first_byte, &time_of_last_byte,
- deltas, sizeof(deltas)/sizeof(deltas[0]), &deltas_elements_needed);
- #else
- ret_val = pull_data(socket, bytes_total, buffer_len, &time_of_first_byte, &time_of_last_byte,
- byte_counter, sizeof(byte_counter)/sizeof(byte_counter[0]),
- &byte_counter_elements_needed, &byte_counter_start);
- #endif
- Py_END_ALLOW_THREADS
- //
- #ifdef USE_NEW_HISTORY
- size_t deltas_elements_used = deltas_elements_needed;
- if(deltas_elements_used > sizeof(deltas)/sizeof(deltas[0])){
- deltas_elements_used = sizeof(deltas)/sizeof(deltas[0]);
- }
- //
- PyObject* py_delta_bytes = PyList_New(deltas_elements_used);
- PyObject* py_delta_timestamps = PyList_New(deltas_elements_used);
- for(size_t i=0; i<deltas_elements_used; i++){
- PyList_SetItem(py_delta_bytes, i, PyLong_FromLong(deltas[i].bytes));
- PyList_SetItem(py_delta_timestamps, i, PyFloat_FromDouble(deltas[i].timestamp));
- }
- #else
- size_t byte_counter_elements_used = byte_counter_elements_needed;
- if(byte_counter_elements_used > sizeof(byte_counter)/sizeof(byte_counter[0])){
- byte_counter_elements_used = sizeof(byte_counter)/sizeof(byte_counter[0]);
- }
- //
- PyObject* py_byte_counter = PyList_New(byte_counter_elements_used);
- for(size_t i=0; i<byte_counter_elements_used; i++){
- PyList_SetItem(py_byte_counter, i, PyLong_FromLong(byte_counter[i]));
- }
- #endif
- //
- #ifdef USE_NEW_HISTORY
- PyObject* py_ret_val = Py_BuildValue("(idd{sNsN})", ret_val, time_of_first_byte, time_of_last_byte, "bytes", py_delta_bytes, "timestamps", py_delta_timestamps);
- #else
- PyObject* py_ret_val = Py_BuildValue("(iddNi)", ret_val, time_of_first_byte, time_of_last_byte, py_byte_counter, byte_counter_start);
- #endif
- //
- return py_ret_val;
- }
|