|
@@ -172,6 +172,7 @@ event_listener_new(void *context)
|
|
TOR_TAILQ_INIT(&listener->pending_events);
|
|
TOR_TAILQ_INIT(&listener->pending_events);
|
|
listener->context = context;
|
|
listener->context = context;
|
|
listener->eventloop_ev = NULL;
|
|
listener->eventloop_ev = NULL;
|
|
|
|
+ listener->max_iterations = -1;
|
|
|
|
|
|
return listener;
|
|
return listener;
|
|
}
|
|
}
|
|
@@ -211,6 +212,18 @@ event_listener_free(event_listener_t *listener)
|
|
tor_free(listener);
|
|
tor_free(listener);
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+void
|
|
|
|
+event_listener_set_max_iterations(event_listener_t *listener, int max_iterations)
|
|
|
|
+{
|
|
|
|
+ tor_assert(listener != NULL);
|
|
|
|
+
|
|
|
|
+ tor_mutex_acquire(&listener->lock);
|
|
|
|
+
|
|
|
|
+ listener->max_iterations = max_iterations;
|
|
|
|
+
|
|
|
|
+ tor_mutex_release(&listener->lock);
|
|
|
|
+}
|
|
|
|
+
|
|
void
|
|
void
|
|
event_listener_attach(event_listener_t *listener, struct event_base *base)
|
|
event_listener_attach(event_listener_t *listener, struct event_base *base)
|
|
{
|
|
{
|
|
@@ -362,11 +375,15 @@ event_listener_process(event_listener_t *listener)
|
|
{
|
|
{
|
|
tor_assert(listener != NULL);
|
|
tor_assert(listener != NULL);
|
|
|
|
|
|
|
|
+ int counter = 0;
|
|
|
|
+
|
|
tor_mutex_acquire(&listener->lock);
|
|
tor_mutex_acquire(&listener->lock);
|
|
|
|
|
|
void *context = listener->context;
|
|
void *context = listener->context;
|
|
|
|
+ int max_iterations = listener->max_iterations;
|
|
|
|
|
|
- while (!TOR_TAILQ_EMPTY(&listener->pending_events)) {
|
|
|
|
|
|
+ while (!TOR_TAILQ_EMPTY(&listener->pending_events) &&
|
|
|
|
+ (max_iterations < 0 || counter < max_iterations)) {
|
|
event_wrapper_t *wrapper = TOR_TAILQ_FIRST(&listener->pending_events);
|
|
event_wrapper_t *wrapper = TOR_TAILQ_FIRST(&listener->pending_events);
|
|
TOR_TAILQ_REMOVE(&listener->pending_events, wrapper, next_event);
|
|
TOR_TAILQ_REMOVE(&listener->pending_events, wrapper, next_event);
|
|
tor_assert(wrapper != NULL);
|
|
tor_assert(wrapper != NULL);
|
|
@@ -386,6 +403,8 @@ event_listener_process(event_listener_t *listener)
|
|
|
|
|
|
if (PREDICT_LIKELY(process_event_fn != NULL)) {
|
|
if (PREDICT_LIKELY(process_event_fn != NULL)) {
|
|
process_event_fn(wrapper->label, wrapper->data, context);
|
|
process_event_fn(wrapper->label, wrapper->data, context);
|
|
|
|
+ counter += 1;
|
|
|
|
+ // only increase the counter if a callback was run
|
|
} else {
|
|
} else {
|
|
// no callback available
|
|
// no callback available
|
|
log_warn(LD_BUG, "An event was received but had no callback");
|
|
log_warn(LD_BUG, "An event was received but had no callback");
|
|
@@ -395,7 +414,11 @@ event_listener_process(event_listener_t *listener)
|
|
tor_mutex_acquire(&listener->lock);
|
|
tor_mutex_acquire(&listener->lock);
|
|
}
|
|
}
|
|
|
|
|
|
- listener->is_pending = false;
|
|
|
|
|
|
+ if (TOR_TAILQ_EMPTY(&listener->pending_events)) {
|
|
|
|
+ listener->is_pending = false;
|
|
|
|
+ } else {
|
|
|
|
+ event_active(listener->eventloop_ev, EV_READ, 1);
|
|
|
|
+ }
|
|
|
|
|
|
tor_mutex_release(&listener->lock);
|
|
tor_mutex_release(&listener->lock);
|
|
}
|
|
}
|