#include #include #include #include #include // #define USE_NEW_HISTORY // static PyObject *py_push_data(PyObject *self, PyObject *args); static char push_data_docstring[] = "Send data as quickly as possible into a socket."; // static PyObject *py_pull_data(PyObject *self, PyObject *args); static char pull_data_docstring[] = "Receive data as quickly as possible from a socket."; // static char module_docstring[] = "This module provides accelerated functions which would perform slower in pure Python."; // static PyMethodDef module_methods[] = { {"push_data", py_push_data, METH_VARARGS, push_data_docstring}, {"pull_data", py_pull_data, METH_VARARGS, pull_data_docstring}, {NULL, NULL, 0, NULL} }; // static struct PyModuleDef _coremodule = { PyModuleDef_HEAD_INIT, "accelerated_functions", // name of module module_docstring, // module documentation, may be NULL -1, /* size of per-interpreter state of the module, or -1 if the module keeps state in global variables. */ module_methods, }; // PyMODINIT_FUNC PyInit_accelerated_functions(void){ return PyModule_Create(&_coremodule); } // long min(long num1, long num2){ return (num1 > num2) ? num2 : num1; } // int push_data(int socket, long bytes_total, char* buffer, int buffer_len){ long bytes_written = 0; // struct pollfd poll_fds[1]; int num_poll_fds = 0; // memset(poll_fds, 0, sizeof(poll_fds)); poll_fds[0].fd = socket; poll_fds[0].events = POLLOUT; num_poll_fds++; // while(bytes_written < bytes_total){ int rc = poll(poll_fds, num_poll_fds, 15*60*1000); // if(rc < 0){ return -1; }else if(rc == 0){ return -1; } // if(poll_fds[0].revents == 0){ continue; }else if(poll_fds[0].revents != POLLOUT){ return -1; } // long bytes_to_send = min(buffer_len, bytes_total-bytes_written); int n = send(poll_fds[0].fd, buffer, bytes_to_send, 0); // if(n < 0){ return -1; } // bytes_written += n; } // return 0; } // #ifdef USE_NEW_HISTORY typedef struct { unsigned long bytes; double timestamp; } byte_delta; int pull_data(int socket, long bytes_total, int buffer_len, double* time_first_ptr, double* time_last_ptr, byte_delta* deltas, size_t deltas_len, size_t* deltas_elements_needed){ #else int pull_data(int socket, long bytes_total, int buffer_len, double* time_first_ptr, double* time_last_ptr, unsigned long* byte_counter, size_t byte_counter_len, size_t* byte_counter_elements_needed, unsigned long* byte_counter_start){ #endif long bytes_read = 0; char* buffer = malloc(buffer_len); struct timeval time_of_first_byte, time_of_last_byte, time_current, time_elapsed; #ifdef USE_NEW_HISTORY #else struct timeval rounded_time_of_first_byte; #endif // struct pollfd poll_fds[1]; int num_poll_fds = 0; // if(buffer == NULL){ return -1; } // #ifdef USE_NEW_HISTORY *deltas_elements_needed = 0; #endif // memset(poll_fds, 0, sizeof(poll_fds)); poll_fds[0].fd = socket; poll_fds[0].events = POLLIN; num_poll_fds++; // while(bytes_read < bytes_total){ int rc = poll(poll_fds, num_poll_fds, 15*60*1000); // if(rc < 0){ printf("Here1\n"); free(buffer); return -1; }else if(rc == 0){ printf("Call to poll() timed out (bytes_read=%ld)\n", bytes_read); free(buffer); return -1; } // if(poll_fds[0].revents == 0){ continue; }else if(poll_fds[0].revents != POLLIN){ printf("Here3\n"); free(buffer); return -1; } // long bytes_to_recv = min(buffer_len, bytes_total-bytes_read); int n = recv(poll_fds[0].fd, buffer, bytes_to_recv, 0); // if(n < 0){ printf("Here4\n"); free(buffer); return -1; }else if(n == 0){ // the socket was closed gracefully, but before we finished reading all the data free(buffer); return -1; } // if(n > 0 && bytes_read == 0){ gettimeofday(&time_of_first_byte, NULL); #ifdef USE_NEW_HISTORY #else rounded_time_of_first_byte.tv_sec = time_of_first_byte.tv_sec; rounded_time_of_first_byte.tv_usec = 0; #endif } // if(n > 0){ gettimeofday(&time_current, NULL); #ifdef USE_NEW_HISTORY if(*deltas_elements_needed < deltas_len){ deltas[*deltas_elements_needed].bytes = n; deltas[*deltas_elements_needed].timestamp = time_current.tv_sec + time_current.tv_usec/(1000.0*1000.0); *deltas_elements_needed += 1; } #else timersub(&time_current, &rounded_time_of_first_byte, &time_elapsed); *byte_counter_elements_needed = time_elapsed.tv_sec+1; if(time_elapsed.tv_sec < byte_counter_len){ byte_counter[time_elapsed.tv_sec] += n; } #endif } // bytes_read += n; } // gettimeofday(&time_of_last_byte, NULL); *time_first_ptr = time_of_first_byte.tv_sec + time_of_first_byte.tv_usec/(1000.0*1000.0); *time_last_ptr = time_of_last_byte.tv_sec + time_of_last_byte.tv_usec/(1000.0*1000.0); #ifdef USE_NEW_HISTORY #else *byte_counter_start = rounded_time_of_first_byte.tv_sec; #endif // free(buffer); return 0; } // static PyObject *py_push_data(PyObject *self, PyObject *args){ PyObject *yerr_obj; int socket; long bytes_total; char* buffer = NULL; int buffer_len; // if(!PyArg_ParseTuple(args, "ily#", &socket, &bytes_total, &buffer, &buffer_len, &yerr_obj)){ return NULL; } // int ret_val; Py_BEGIN_ALLOW_THREADS // GIL is unlocked, but don't do expensive operations in // other threads or it might slow this one down ret_val = push_data(socket, bytes_total, buffer, buffer_len); Py_END_ALLOW_THREADS // PyObject* py_ret_val = PyLong_FromLong(ret_val); // return py_ret_val; } // static PyObject *py_pull_data(PyObject *self, PyObject *args){ PyObject *yerr_obj; int socket; long bytes_total; int buffer_len; // if(!PyArg_ParseTuple(args, "ili", &socket, &bytes_total, &buffer_len, &yerr_obj)){ return NULL; } // double time_of_first_byte = 0; double time_of_last_byte = 0; #ifdef USE_NEW_HISTORY byte_delta deltas[100000] = {0}; // 100000*16 B = ~1.5 MiB size_t deltas_elements_needed = 0; #else unsigned long byte_counter[60*10] = {0}; // record 10 minutes of data size_t byte_counter_elements_needed = 0; unsigned long byte_counter_start = 0; #endif int ret_val; // Py_BEGIN_ALLOW_THREADS // GIL is unlocked, but don't do expensive operations in // other threads or it might slow this one down #ifdef USE_NEW_HISTORY ret_val = pull_data(socket, bytes_total, buffer_len, &time_of_first_byte, &time_of_last_byte, deltas, sizeof(deltas)/sizeof(deltas[0]), &deltas_elements_needed); #else ret_val = pull_data(socket, bytes_total, buffer_len, &time_of_first_byte, &time_of_last_byte, byte_counter, sizeof(byte_counter)/sizeof(byte_counter[0]), &byte_counter_elements_needed, &byte_counter_start); #endif Py_END_ALLOW_THREADS // #ifdef USE_NEW_HISTORY size_t deltas_elements_used = deltas_elements_needed; if(deltas_elements_used > sizeof(deltas)/sizeof(deltas[0])){ deltas_elements_used = sizeof(deltas)/sizeof(deltas[0]); } // PyObject* py_delta_bytes = PyList_New(deltas_elements_used); PyObject* py_delta_timestamps = PyList_New(deltas_elements_used); for(size_t i=0; i sizeof(byte_counter)/sizeof(byte_counter[0])){ byte_counter_elements_used = sizeof(byte_counter)/sizeof(byte_counter[0]); } // PyObject* py_byte_counter = PyList_New(byte_counter_elements_used); for(size_t i=0; i