diff options
Diffstat (limited to 'src/event/modules')
-rw-r--r-- | src/event/modules/ngx_aio_module.c | 13 | ||||
-rw-r--r-- | src/event/modules/ngx_devpoll_module.c | 155 | ||||
-rw-r--r-- | src/event/modules/ngx_epoll_module.c | 189 | ||||
-rw-r--r-- | src/event/modules/ngx_iocp_module.c | 85 | ||||
-rw-r--r-- | src/event/modules/ngx_kqueue_module.c | 234 | ||||
-rw-r--r-- | src/event/modules/ngx_poll_module.c | 244 | ||||
-rw-r--r-- | src/event/modules/ngx_rtsig_module.c | 227 | ||||
-rw-r--r-- | src/event/modules/ngx_select_module.c | 269 |
8 files changed, 452 insertions, 964 deletions
diff --git a/src/event/modules/ngx_aio_module.c b/src/event/modules/ngx_aio_module.c index bf78a8e00..fb3c8c85e 100644 --- a/src/event/modules/ngx_aio_module.c +++ b/src/event/modules/ngx_aio_module.c @@ -14,12 +14,13 @@ #endif -static ngx_int_t ngx_aio_init(ngx_cycle_t *cycle); +static ngx_int_t ngx_aio_init(ngx_cycle_t *cycle, ngx_msec_t timer); static void ngx_aio_done(ngx_cycle_t *cycle); static ngx_int_t ngx_aio_add_event(ngx_event_t *ev, int event, u_int flags); static ngx_int_t ngx_aio_del_event(ngx_event_t *ev, int event, u_int flags); static ngx_int_t ngx_aio_del_connection(ngx_connection_t *c, u_int flags); -static ngx_int_t ngx_aio_process_events(ngx_cycle_t *cycle); +static ngx_int_t ngx_aio_process_events(ngx_cycle_t *cycle, ngx_msec_t timer, + ngx_uint_t flags); ngx_os_io_t ngx_os_aio = { @@ -73,9 +74,9 @@ ngx_module_t ngx_aio_module = { #if (NGX_HAVE_KQUEUE) static ngx_int_t -ngx_aio_init(ngx_cycle_t *cycle) +ngx_aio_init(ngx_cycle_t *cycle, ngx_msec_t timer) { - if (ngx_kqueue_module_ctx.actions.init(cycle) == NGX_ERROR) { + if (ngx_kqueue_module_ctx.actions.init(cycle, timer) == NGX_ERROR) { return NGX_ERROR; } @@ -159,9 +160,9 @@ ngx_aio_del_connection(ngx_connection_t *c, u_int flags) static ngx_int_t -ngx_aio_process_events(ngx_cycle_t *cycle) +ngx_aio_process_events(ngx_cycle_t *cycle, ngx_msec_t timer, ngx_uint_t flags) { - return ngx_kqueue_module_ctx.actions.process_events(cycle); + return ngx_kqueue_module_ctx.actions.process_events(cycle, timer, flags); } #endif /* NGX_HAVE_KQUEUE */ diff --git a/src/event/modules/ngx_devpoll_module.c b/src/event/modules/ngx_devpoll_module.c index 7ee8602fc..54a595671 100644 --- a/src/event/modules/ngx_devpoll_module.c +++ b/src/event/modules/ngx_devpoll_module.c @@ -31,12 +31,13 @@ typedef struct { } ngx_devpoll_conf_t; -static ngx_int_t ngx_devpoll_init(ngx_cycle_t *cycle); +static ngx_int_t ngx_devpoll_init(ngx_cycle_t *cycle, ngx_msec_t timer); static void ngx_devpoll_done(ngx_cycle_t *cycle); static ngx_int_t ngx_devpoll_add_event(ngx_event_t *ev, int event, u_int flags); static ngx_int_t ngx_devpoll_del_event(ngx_event_t *ev, int event, u_int flags); static ngx_int_t ngx_devpoll_set_event(ngx_event_t *ev, int event, u_int flags); -static ngx_int_t ngx_devpoll_process_events(ngx_cycle_t *cycle); +static ngx_int_t ngx_devpoll_process_events(ngx_cycle_t *cycle, + ngx_msec_t timer, ngx_uint_t flags); static void *ngx_devpoll_create_conf(ngx_cycle_t *cycle); static char *ngx_devpoll_init_conf(ngx_cycle_t *cycle, void *conf); @@ -107,7 +108,7 @@ ngx_module_t ngx_devpoll_module = { static ngx_int_t -ngx_devpoll_init(ngx_cycle_t *cycle) +ngx_devpoll_init(ngx_cycle_t *cycle, ngx_msec_t timer) { size_t n; ngx_devpoll_conf_t *dpcf; @@ -323,48 +324,21 @@ ngx_devpoll_set_event(ngx_event_t *ev, int event, u_int flags) ngx_int_t -ngx_devpoll_process_events(ngx_cycle_t *cycle) +ngx_devpoll_process_events(ngx_cycle_t *cycle, ngx_msec_t timer, + ngx_uint_t flags) { int events, revents; size_t n; ngx_err_t err; ngx_int_t i; - ngx_uint_t lock, accept_lock; - ngx_msec_t timer, delta; -#if 0 - ngx_cycle_t **old_cycle; -#endif - ngx_event_t *rev, *wev; + ngx_uint_t level; + ngx_msec_t delta; + ngx_event_t *rev, *wev, **queue; ngx_connection_t *c; struct dvpoll dvp; - struct timeval tv; - - timer = ngx_event_find_timer(); /* NGX_TIMER_INFINITE == INFTIM */ - accept_lock = 0; - - if (ngx_accept_mutex) { - if (ngx_accept_disabled > 0) { - ngx_accept_disabled--; - - } else { - if (ngx_trylock_accept_mutex(cycle) == NGX_ERROR) { - return NGX_ERROR; - } - - if (ngx_accept_mutex_held) { - accept_lock = 1; - - } else if (timer == NGX_TIMER_INFINITE - || timer > ngx_accept_mutex_delay) - { - timer = ngx_accept_mutex_delay; - } - } - } - ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0, "devpoll timer: %M", timer); @@ -373,7 +347,6 @@ ngx_devpoll_process_events(ngx_cycle_t *cycle) if (write(dp, change_list, n) != (ssize_t) n) { ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno, "write(/dev/poll) failed"); - ngx_accept_mutex_unlock(); return NGX_ERROR; } } @@ -391,21 +364,32 @@ ngx_devpoll_process_events(ngx_cycle_t *cycle) nchanges = 0; - ngx_gettimeofday(&tv); - ngx_time_update(tv.tv_sec); - - delta = ngx_current_time; - ngx_current_time = (ngx_msec_t) tv.tv_sec * 1000 + tv.tv_usec / 1000; + delta = ngx_current_msec; + + if (flags & NGX_UPDATE_TIME) { + ngx_time_update(0, 0); + } if (err) { - ngx_log_error((err == NGX_EINTR) ? NGX_LOG_INFO : NGX_LOG_ALERT, - cycle->log, err, "ioctl(DP_POLL) failed"); - ngx_accept_mutex_unlock(); + if (err == NGX_EINTR) { + + if (ngx_event_timer_alarm) { + ngx_event_timer_alarm = 0; + return NGX_OK; + } + + level = NGX_LOG_INFO; + + } else { + level = NGX_LOG_ALERT; + } + + ngx_log_error(level, cycle->log, err, "ioctl(DP_POLL) failed"); return NGX_ERROR; } if (timer != NGX_TIMER_INFINITE) { - delta = ngx_current_time - delta; + delta = ngx_current_msec - delta; ngx_log_debug2(NGX_LOG_DEBUG_EVENT, cycle->log, 0, "devpoll timer: %M, delta: %M", timer, delta); @@ -413,17 +397,15 @@ ngx_devpoll_process_events(ngx_cycle_t *cycle) if (events == 0) { ngx_log_error(NGX_LOG_ALERT, cycle->log, 0, "ioctl(DP_POLL) returned no events without timeout"); - ngx_accept_mutex_unlock(); return NGX_ERROR; } } - if (ngx_mutex_lock(ngx_posted_events_mutex) == NGX_ERROR) { - ngx_accept_mutex_unlock(); - return NGX_ERROR; + if (events == 0) { + return NGX_OK; } - lock = 1; + ngx_mutex_lock(ngx_posted_events_mutex); for (i = 0; i < events; i++) { c = ngx_cycle->files[event_list[i].fd]; @@ -468,74 +450,49 @@ ngx_devpoll_process_events(ngx_cycle_t *cycle) revents |= POLLIN|POLLOUT; } - wev = c->write; + rev = c->read; - if ((revents & POLLOUT) && wev->active) { - wev->ready = 1; + if ((revents & POLLIN) && rev->active) { - if (!ngx_threaded && !ngx_accept_mutex_held) { - wev->handler(wev); + if ((flags & NGX_POST_THREAD_EVENTS) && !rev->accept) { + rev->posted_ready = 1; } else { - ngx_post_event(wev); + rev->ready = 1; } - } - /* - * POLLIN must be handled after POLLOUT because we use - * the optimization to avoid the unnecessary mutex locking/unlocking - * if the accept event is the last one. - */ + if (flags & NGX_POST_EVENTS) { + queue = (ngx_event_t **) (rev->accept ? + &ngx_posted_accept_events : &ngx_posted_events); - rev = c->read; - - if ((revents & POLLIN) && rev->active) { - rev->ready = 1; + ngx_locked_post_event(rev, queue); - if (!ngx_threaded && !ngx_accept_mutex_held) { + } else { rev->handler(rev); + } + } - } else if (!rev->accept) { - ngx_post_event(rev); + wev = c->write; - } else if (ngx_accept_disabled <= 0) { - ngx_mutex_unlock(ngx_posted_events_mutex); + if ((revents & POLLOUT) && wev->active) { - c->read->handler(rev); + if (flags & NGX_POST_THREAD_EVENTS) { + wev->posted_ready = 1; - if (ngx_accept_disabled > 0) { - ngx_accept_mutex_unlock(); - accept_lock = 0; - } + } else { + wev->ready = 1; + } - if (i + 1 == events) { - lock = 0; - break; - } + if (flags & NGX_POST_EVENTS) { + ngx_locked_post_event(wev, &ngx_posted_events); - if (ngx_mutex_lock(ngx_posted_events_mutex) == NGX_ERROR) { - if (accept_lock) { - ngx_accept_mutex_unlock(); - } - return NGX_ERROR; - } + } else { + wev->handler(wev); } } } - if (accept_lock) { - ngx_accept_mutex_unlock(); - } - - if (lock) { - ngx_mutex_unlock(ngx_posted_events_mutex); - } - - ngx_event_expire_timers(); - - if (!ngx_threaded) { - ngx_event_process_posted(cycle); - } + ngx_mutex_unlock(ngx_posted_events_mutex); return NGX_OK; } diff --git a/src/event/modules/ngx_epoll_module.c b/src/event/modules/ngx_epoll_module.c index a53b8eb26..14013dcd7 100644 --- a/src/event/modules/ngx_epoll_module.c +++ b/src/event/modules/ngx_epoll_module.c @@ -70,13 +70,14 @@ typedef struct { } ngx_epoll_conf_t; -static ngx_int_t ngx_epoll_init(ngx_cycle_t *cycle); +static ngx_int_t ngx_epoll_init(ngx_cycle_t *cycle, ngx_msec_t timer); static void ngx_epoll_done(ngx_cycle_t *cycle); static ngx_int_t ngx_epoll_add_event(ngx_event_t *ev, int event, u_int flags); static ngx_int_t ngx_epoll_del_event(ngx_event_t *ev, int event, u_int flags); static ngx_int_t ngx_epoll_add_connection(ngx_connection_t *c); static ngx_int_t ngx_epoll_del_connection(ngx_connection_t *c, u_int flags); -static ngx_int_t ngx_epoll_process_events(ngx_cycle_t *cycle); +static ngx_int_t ngx_epoll_process_events(ngx_cycle_t *cycle, ngx_msec_t timer, + ngx_uint_t flags); static void *ngx_epoll_create_conf(ngx_cycle_t *cycle); static char *ngx_epoll_init_conf(ngx_cycle_t *cycle, void *conf); @@ -137,7 +138,7 @@ ngx_module_t ngx_epoll_module = { static ngx_int_t -ngx_epoll_init(ngx_cycle_t *cycle) +ngx_epoll_init(ngx_cycle_t *cycle, ngx_msec_t timer) { ngx_event_conf_t *ecf; ngx_epoll_conf_t *epcf; @@ -382,57 +383,20 @@ ngx_epoll_del_connection(ngx_connection_t *c, u_int flags) static ngx_int_t -ngx_epoll_process_events(ngx_cycle_t *cycle) +ngx_epoll_process_events(ngx_cycle_t *cycle, ngx_msec_t timer, ngx_uint_t flags) { int events; uint32_t revents; ngx_int_t instance, i; - ngx_uint_t lock, accept_lock; + ngx_uint_t level; ngx_err_t err; ngx_log_t *log; - ngx_msec_t timer, delta; - ngx_event_t *rev, *wev; - struct timeval tv; + ngx_msec_t delta; + ngx_event_t *rev, *wev, **queue; ngx_connection_t *c; - timer = ngx_event_find_timer(); - -#if (NGX_THREADS) - - if (timer == NGX_TIMER_ERROR) { - return NGX_ERROR; - } - - if (timer == NGX_TIMER_INFINITE || timer > 500) { - timer = 500; - } - -#endif - /* NGX_TIMER_INFINITE == INFTIM */ - accept_lock = 0; - - if (ngx_accept_mutex) { - if (ngx_accept_disabled > 0) { - ngx_accept_disabled--; - - } else { - if (ngx_trylock_accept_mutex(cycle) == NGX_ERROR) { - return NGX_ERROR; - } - - if (ngx_accept_mutex_held) { - accept_lock = 1; - - } else if (timer == NGX_TIMER_INFINITE - || timer > ngx_accept_mutex_delay) - { - timer = ngx_accept_mutex_delay; - } - } - } - ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0, "epoll timer: %M", timer); @@ -444,14 +408,14 @@ ngx_epoll_process_events(ngx_cycle_t *cycle) err = 0; } - ngx_gettimeofday(&tv); - ngx_time_update(tv.tv_sec); - - delta = ngx_current_time; - ngx_current_time = (ngx_msec_t) tv.tv_sec * 1000 + tv.tv_usec / 1000; + delta = ngx_current_msec; + + if (flags & NGX_UPDATE_TIME) { + ngx_time_update(0, 0); + } if (timer != NGX_TIMER_INFINITE) { - delta = ngx_current_time - delta; + delta = ngx_current_msec - delta; ngx_log_debug2(NGX_LOG_DEBUG_EVENT, cycle->log, 0, "epoll timer: %M, delta: %M", timer, delta); @@ -459,30 +423,34 @@ ngx_epoll_process_events(ngx_cycle_t *cycle) if (events == 0) { ngx_log_error(NGX_LOG_ALERT, cycle->log, 0, "epoll_wait() returned no events without timeout"); - ngx_accept_mutex_unlock(); return NGX_ERROR; } } if (err) { - ngx_log_error((err == NGX_EINTR) ? NGX_LOG_INFO : NGX_LOG_ALERT, - cycle->log, err, "epoll_wait() failed"); - ngx_accept_mutex_unlock(); - return NGX_ERROR; - } + if (err == NGX_EINTR) { - if (events > 0) { - if (ngx_mutex_lock(ngx_posted_events_mutex) == NGX_ERROR) { - ngx_accept_mutex_unlock(); - return NGX_ERROR; - } + if (ngx_event_timer_alarm) { + ngx_event_timer_alarm = 0; + return NGX_OK; + } + + level = NGX_LOG_INFO; + + } else { + level = NGX_LOG_ALERT; + } - lock = 1; + ngx_log_error(level, cycle->log, err, "epoll_wait() failed"); + return NGX_ERROR; + } - } else { - lock =0; + if (events == 0) { + return NGX_OK; } + ngx_mutex_lock(ngx_posted_events_mutex); + log = cycle->log; for (i = 0; i < events; i++) { @@ -539,95 +507,48 @@ ngx_epoll_process_events(ngx_cycle_t *cycle) revents |= EPOLLIN|EPOLLOUT; } - wev = c->write; - - if ((revents & EPOLLOUT) && wev->active) { - - if (ngx_threaded) { - wev->posted_ready = 1; - ngx_post_event(wev); - - } else { - wev->ready = 1; - - if (!ngx_accept_mutex_held) { - wev->handler(wev); - - } else { - ngx_post_event(wev); - } - } - } - - /* - * EPOLLIN must be handled after EPOLLOUT because we use - * the optimization to avoid the unnecessary mutex locking/unlocking - * if the accept event is the last one. - */ - if ((revents & EPOLLIN) && rev->active) { - if (ngx_threaded && !rev->accept) { + if ((flags & NGX_POST_THREAD_EVENTS) && !rev->accept) { rev->posted_ready = 1; - ngx_post_event(rev); - - continue; + } else { + rev->ready = 1; } - rev->ready = 1; - - if (!ngx_threaded && !ngx_accept_mutex_held) { - rev->handler(rev); - - } else if (!rev->accept) { - ngx_post_event(rev); + if (flags & NGX_POST_EVENTS) { + queue = (ngx_event_t **) (rev->accept ? + &ngx_posted_accept_events : &ngx_posted_events); - } else if (ngx_accept_disabled <= 0) { - - ngx_mutex_unlock(ngx_posted_events_mutex); + ngx_locked_post_event(rev, queue); + } else { rev->handler(rev); - - if (ngx_accept_disabled > 0) { - ngx_accept_mutex_unlock(); - accept_lock = 0; - } - - if (i + 1 == events) { - lock = 0; - break; - } - - if (ngx_mutex_lock(ngx_posted_events_mutex) == NGX_ERROR) { - if (accept_lock) { - ngx_accept_mutex_unlock(); - } - return NGX_ERROR; - } } } - } - if (accept_lock) { - ngx_accept_mutex_unlock(); - } + wev = c->write; - if (lock) { - ngx_mutex_unlock(ngx_posted_events_mutex); - } + if ((revents & EPOLLOUT) && wev->active) { - ngx_event_expire_timers(); + if (flags & NGX_POST_THREAD_EVENTS) { + wev->posted_ready = 1; + + } else { + wev->ready = 1; + } - if (ngx_posted_events) { - if (ngx_threaded) { - ngx_wakeup_worker_thread(cycle); + if (flags & NGX_POST_EVENTS) { + ngx_locked_post_event(wev, &ngx_posted_events); - } else { - ngx_event_process_posted(cycle); + } else { + wev->handler(wev); + } } } + ngx_mutex_unlock(ngx_posted_events_mutex); + return NGX_OK; } diff --git a/src/event/modules/ngx_iocp_module.c b/src/event/modules/ngx_iocp_module.c index 269d95552..b7b1c62bc 100644 --- a/src/event/modules/ngx_iocp_module.c +++ b/src/event/modules/ngx_iocp_module.c @@ -10,11 +10,13 @@ #include <ngx_iocp_module.h> -static ngx_int_t ngx_iocp_init(ngx_cycle_t *cycle); +static ngx_int_t ngx_iocp_init(ngx_cycle_t *cycle, ngx_msec_t timer); +static ngx_thread_value_t __stdcall ngx_iocp_timer(void *data); static void ngx_iocp_done(ngx_cycle_t *cycle); static ngx_int_t ngx_iocp_add_event(ngx_event_t *ev, int event, u_int key); static ngx_int_t ngx_iocp_del_connection(ngx_connection_t *c, u_int flags); -static ngx_int_t ngx_iocp_process_events(ngx_cycle_t *cycle); +static ngx_int_t ngx_iocp_process_events(ngx_cycle_t *cycle, ngx_msec_t timer, + ngx_uint_t flags); static void *ngx_iocp_create_conf(ngx_cycle_t *cycle); static char *ngx_iocp_init_conf(ngx_cycle_t *cycle, void *conf); @@ -93,11 +95,13 @@ ngx_os_io_t ngx_iocp_io = { }; -static HANDLE iocp; +static HANDLE iocp; +static ngx_tid_t timer_thread; +static ngx_msec_t msec; static ngx_int_t -ngx_iocp_init(ngx_cycle_t *cycle) +ngx_iocp_init(ngx_cycle_t *cycle, ngx_msec_t timer) { ngx_iocp_conf_t *cf; @@ -109,7 +113,7 @@ ngx_iocp_init(ngx_cycle_t *cycle) } if (iocp == NULL) { - ngx_log_error(NGX_LOG_EMERG, cycle->log, ngx_errno, + ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno, "CreateIoCompletionPort() failed"); return NGX_ERROR; } @@ -120,10 +124,55 @@ ngx_iocp_init(ngx_cycle_t *cycle) ngx_event_flags = NGX_USE_AIO_EVENT|NGX_USE_IOCP_EVENT; + if (timer == 0) { + return NGX_OK; + } + + /* + * The waitable timer could not be used, because + * GetQueuedCompletionStatus() does not set a thread to alertable state + */ + + if (timer_thread == NULL) { + + msec = timer; + + if (ngx_create_thread(&timer_thread, ngx_iocp_timer, &msec, cycle->log) + != 0) + { + return NGX_ERROR; + } + } + + ngx_event_flags |= NGX_USE_TIMER_EVENT; + return NGX_OK; } +static ngx_thread_value_t __stdcall +ngx_iocp_timer(void *data) +{ + ngx_msec_t timer = *(ngx_msec_t *) data; + + ngx_log_debug2(NGX_LOG_DEBUG_EVENT, ngx_cycle->log, 0, + "THREAD %p %p", &msec, data); + + for ( ;; ) { + Sleep(timer); + + ngx_time_update(0, 0); +#if 1 + ngx_log_debug0(NGX_LOG_DEBUG_EVENT, ngx_cycle->log, 0, "timer"); +#endif + } + +#ifdef __WATCOMC__ + return 0; +#endif +} + + static void ngx_iocp_done(ngx_cycle_t *cycle) { @@ -178,19 +227,17 @@ ngx_iocp_del_connection(ngx_connection_t *c, u_int flags) static -ngx_int_t ngx_iocp_process_events(ngx_cycle_t *cycle) +ngx_int_t ngx_iocp_process_events(ngx_cycle_t *cycle, ngx_msec_t timer, + ngx_uint_t flags) { int rc; u_int key; u_long bytes; ngx_err_t err; - ngx_msec_t timer, delta; + ngx_msec_t delta; ngx_event_t *ev; - struct timeval tv; ngx_event_ovlp_t *ovlp; - timer = ngx_event_find_timer(); - if (timer == NGX_TIMER_INFINITE) { timer = INFINITE; } @@ -206,17 +253,17 @@ ngx_int_t ngx_iocp_process_events(ngx_cycle_t *cycle) err = 0; } - ngx_gettimeofday(&tv); - ngx_time_update(tv.tv_sec); + delta = ngx_current_msec; + + if (flags & NGX_UPDATE_TIME) { + ngx_time_update(0, 0); + } ngx_log_debug4(NGX_LOG_DEBUG_EVENT, cycle->log, 0, "iocp: %d b:%d k:%d ov:%p", rc, bytes, key, ovlp); - delta = ngx_current_time; - ngx_current_time = (ngx_msec_t) tv.tv_sec * 1000 + tv.tv_usec / 1000; - if (timer != INFINITE) { - delta = ngx_current_time - delta; + delta = ngx_current_msec - delta; ngx_log_debug2(NGX_LOG_DEBUG_EVENT, cycle->log, 0, "iocp timer: %M, delta: %M", timer, delta); @@ -231,8 +278,6 @@ ngx_int_t ngx_iocp_process_events(ngx_cycle_t *cycle) return NGX_ERROR; } - ngx_event_expire_timers(); - return NGX_OK; } @@ -263,8 +308,6 @@ ngx_int_t ngx_iocp_process_events(ngx_cycle_t *cycle) ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, err, "iocp: aborted event %p", ev); - ngx_event_expire_timers(); - return NGX_OK; } @@ -297,8 +340,6 @@ ngx_int_t ngx_iocp_process_events(ngx_cycle_t *cycle) ev->handler(ev); - ngx_event_expire_timers(); - return NGX_OK; } diff --git a/src/event/modules/ngx_kqueue_module.c b/src/event/modules/ngx_kqueue_module.c index e80e1aba0..1c6ab55dc 100644 --- a/src/event/modules/ngx_kqueue_module.c +++ b/src/event/modules/ngx_kqueue_module.c @@ -16,13 +16,14 @@ typedef struct { } ngx_kqueue_conf_t; -static ngx_int_t ngx_kqueue_init(ngx_cycle_t *cycle); +static ngx_int_t ngx_kqueue_init(ngx_cycle_t *cycle, ngx_msec_t timer); static void ngx_kqueue_done(ngx_cycle_t *cycle); static ngx_int_t ngx_kqueue_add_event(ngx_event_t *ev, int event, u_int flags); static ngx_int_t ngx_kqueue_del_event(ngx_event_t *ev, int event, u_int flags); static ngx_int_t ngx_kqueue_set_event(ngx_event_t *ev, int filter, u_int flags); static ngx_int_t ngx_kqueue_process_changes(ngx_cycle_t *cycle, ngx_uint_t try); -static ngx_int_t ngx_kqueue_process_events(ngx_cycle_t *cycle); +static ngx_int_t ngx_kqueue_process_events(ngx_cycle_t *cycle, ngx_msec_t timer, + ngx_uint_t flags); static ngx_inline void ngx_kqueue_dump_event(ngx_log_t *log, struct kevent *kev); @@ -111,8 +112,9 @@ ngx_module_t ngx_kqueue_module = { static ngx_int_t -ngx_kqueue_init(ngx_cycle_t *cycle) +ngx_kqueue_init(ngx_cycle_t *cycle, ngx_msec_t timer) { + struct kevent kev; struct timespec ts; ngx_kqueue_conf_t *kcf; @@ -191,24 +193,49 @@ ngx_kqueue_init(ngx_cycle_t *cycle) } } - nevents = kcf->events; + ngx_event_flags = 0; - ngx_io = ngx_os_io; +#if (NGX_HAVE_TIMER_EVENT) - ngx_event_actions = ngx_kqueue_module_ctx.actions; + if (timer) { + kev.ident = 0; + kev.filter = EVFILT_TIMER; + kev.flags = EV_ADD|EV_ENABLE; + kev.fflags = 0; + kev.data = timer; + kev.udata = 0; + + ts.tv_sec = 0; + ts.tv_nsec = 0; + + if (kevent(ngx_kqueue, &kev, 1, NULL, 0, &ts) == -1) { + ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno, + "kevent(EVFILT_TIMER) failed"); + return NGX_ERROR; + } + + ngx_event_flags |= NGX_USE_TIMER_EVENT; + } + +#endif + + ngx_event_flags |= NGX_USE_ONESHOT_EVENT|NGX_USE_KQUEUE_EVENT; - ngx_event_flags = NGX_USE_ONESHOT_EVENT -#if 1 #if (NGX_HAVE_CLEAR_EVENT) - |NGX_USE_CLEAR_EVENT + ngx_event_flags |= NGX_USE_CLEAR_EVENT; #else - |NGX_USE_LEVEL_EVENT -#endif + ngx_event_flags |= NGX_USE_LEVEL_EVENT; #endif + #if (NGX_HAVE_LOWAT_EVENT) - |NGX_USE_LOWAT_EVENT + ngx_event_flags |= NGX_USE_LOWAT_EVENT; #endif - |NGX_USE_KQUEUE_EVENT; + + nevents = kcf->events; + + ngx_io = ngx_os_io; + + ngx_event_actions = ngx_kqueue_module_ctx.actions; return NGX_OK; } @@ -254,9 +281,7 @@ ngx_kqueue_add_event(ngx_event_t *ev, int event, u_int flags) ev->disabled = 0; ev->oneshot = (flags & NGX_ONESHOT_EVENT) ? 1 : 0; - if (ngx_mutex_lock(list_mutex) == NGX_ERROR) { - return NGX_ERROR; - } + ngx_mutex_lock(list_mutex); #if 1 @@ -317,9 +342,7 @@ ngx_kqueue_del_event(ngx_event_t *ev, int event, u_int flags) ev->active = 0; ev->disabled = 0; - if (ngx_mutex_lock(list_mutex) == NGX_ERROR) { - return NGX_ERROR; - } + ngx_mutex_lock(list_mutex); #if 1 @@ -441,56 +464,19 @@ ngx_kqueue_set_event(ngx_event_t *ev, int filter, u_int flags) static ngx_int_t -ngx_kqueue_process_events(ngx_cycle_t *cycle) +ngx_kqueue_process_events(ngx_cycle_t *cycle, ngx_msec_t timer, + ngx_uint_t flags) { - int events, n; - ngx_int_t i, instance; - ngx_uint_t lock, accept_lock; - ngx_err_t err; - ngx_msec_t timer, delta; - ngx_event_t *ev; - struct timeval tv; - struct timespec ts, *tp; - - timer = ngx_event_find_timer(); - -#if (NGX_THREADS) - - if (timer == NGX_TIMER_ERROR) { - return NGX_ERROR; - } - - if (timer == NGX_TIMER_INFINITE || timer > 500) { - timer = 500; - } - -#endif - - accept_lock = 0; - - if (ngx_accept_mutex) { - if (ngx_accept_disabled > 0) { - ngx_accept_disabled--; - - } else { - if (ngx_trylock_accept_mutex(cycle) == NGX_ERROR) { - return NGX_ERROR; - } - - if (ngx_accept_mutex_held) { - accept_lock = 1; - - } else if (timer == NGX_TIMER_INFINITE - || timer > ngx_accept_mutex_delay) - { - timer = ngx_accept_mutex_delay; - } - } - } + int events, n; + ngx_int_t i, instance; + ngx_uint_t level; + ngx_err_t err; + ngx_msec_t delta; + ngx_event_t *ev, **queue; + struct timespec ts, *tp; if (ngx_threaded) { if (ngx_kqueue_process_changes(cycle, 0) == NGX_ERROR) { - ngx_accept_mutex_unlock(); return NGX_ERROR; } @@ -521,24 +507,35 @@ ngx_kqueue_process_events(ngx_cycle_t *cycle) err = 0; } - ngx_gettimeofday(&tv); - ngx_time_update(tv.tv_sec); + delta = ngx_current_msec; + + if (flags & NGX_UPDATE_TIME) { + ngx_time_update(0, 0); + } ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0, "kevent events: %d", events); - delta = ngx_current_time; - ngx_current_time = (ngx_msec_t) tv.tv_sec * 1000 + tv.tv_usec / 1000; - if (err) { - ngx_log_error((err == NGX_EINTR) ? NGX_LOG_INFO : NGX_LOG_ALERT, - cycle->log, err, "kevent() failed"); - ngx_accept_mutex_unlock(); + if (err == NGX_EINTR) { + + if (ngx_event_timer_alarm) { + ngx_event_timer_alarm = 0; + return NGX_OK; + } + + level = NGX_LOG_INFO; + + } else { + level = NGX_LOG_ALERT; + } + + ngx_log_error(level, cycle->log, err, "kevent() failed"); return NGX_ERROR; } if (timer != NGX_TIMER_INFINITE) { - delta = ngx_current_time - delta; + delta = ngx_current_msec - delta; ngx_log_debug2(NGX_LOG_DEBUG_EVENT, cycle->log, 0, "kevent timer: %M, delta: %M", timer, delta); @@ -547,23 +544,16 @@ ngx_kqueue_process_events(ngx_cycle_t *cycle) if (events == 0) { ngx_log_error(NGX_LOG_ALERT, cycle->log, 0, "kevent() returned no events without timeout"); - ngx_accept_mutex_unlock(); return NGX_ERROR; } } - if (events > 0) { - if (ngx_mutex_lock(ngx_posted_events_mutex) == NGX_ERROR) { - ngx_accept_mutex_unlock(); - return NGX_ERROR; - } - - lock = 1; - - } else { - lock =0; + if (events == 0) { + return NGX_OK; } + ngx_mutex_lock(ngx_posted_events_mutex); + for (i = 0; i < events; i++) { ngx_kqueue_dump_event(cycle->log, &event_list[i]); @@ -574,6 +564,15 @@ ngx_kqueue_process_events(ngx_cycle_t *cycle) continue; } +#if (NGX_HAVE_TIMER_EVENT) + + if (event_list[i].filter == EVFILT_TIMER) { + ngx_time_update(0, 0); + continue; + } + +#endif + ev = (ngx_event_t *) event_list[i].udata; switch (event_list[i].filter) { @@ -606,7 +605,7 @@ ngx_kqueue_process_events(ngx_cycle_t *cycle) #if (NGX_THREADS) - if (ngx_threaded && !ev->accept) { + if ((flags & NGX_POST_THREAD_EVENTS) && !ev->accept) { ev->posted_ready = 1; ev->posted_available = event_list[i].data; @@ -615,7 +614,7 @@ ngx_kqueue_process_events(ngx_cycle_t *cycle) ev->posted_errno = event_list[i].fflags; } - ngx_post_event(ev); + ngx_locked_post_event(ev, &ngx_posted_events); continue; } @@ -651,60 +650,18 @@ ngx_kqueue_process_events(ngx_cycle_t *cycle) continue; } - if (!ngx_threaded && !ngx_accept_mutex_held) { - ev->handler(ev); - continue; - } + if (flags & NGX_POST_EVENTS) { + queue = (ngx_event_t **) (ev->accept ? &ngx_posted_accept_events: + &ngx_posted_events); + ngx_locked_post_event(ev, queue); - if (!ev->accept) { - ngx_post_event(ev); continue; } - if (ngx_accept_disabled > 0) { - continue; - } - - ngx_mutex_unlock(ngx_posted_events_mutex); - ev->handler(ev); - - if (ngx_accept_disabled > 0) { - ngx_accept_mutex_unlock(); - accept_lock = 0; - } - - if (i + 1 == events) { - lock = 0; - break; - } - - if (ngx_mutex_lock(ngx_posted_events_mutex) == NGX_ERROR) { - if (accept_lock) { - ngx_accept_mutex_unlock(); - } - return NGX_ERROR; - } } - if (accept_lock) { - ngx_accept_mutex_unlock(); - } - - if (lock) { - ngx_mutex_unlock(ngx_posted_events_mutex); - } - - ngx_event_expire_timers(); - - if (ngx_posted_events) { - if (ngx_threaded) { - ngx_wakeup_worker_thread(cycle); - - } else { - ngx_event_process_posted(cycle); - } - } + ngx_mutex_unlock(ngx_posted_events_mutex); return NGX_OK; } @@ -719,14 +676,9 @@ ngx_kqueue_process_changes(ngx_cycle_t *cycle, ngx_uint_t try) struct timespec ts; struct kevent *changes; - if (ngx_mutex_lock(kevent_mutex) == NGX_ERROR) { - return NGX_ERROR; - } + ngx_mutex_lock(kevent_mutex); - if (ngx_mutex_lock(list_mutex) == NGX_ERROR) { - ngx_mutex_unlock(kevent_mutex); - return NGX_ERROR; - } + ngx_mutex_lock(list_mutex); if (nchanges == 0) { ngx_mutex_unlock(list_mutex); diff --git a/src/event/modules/ngx_poll_module.c b/src/event/modules/ngx_poll_module.c index 29b375aee..61d617913 100644 --- a/src/event/modules/ngx_poll_module.c +++ b/src/event/modules/ngx_poll_module.c @@ -9,23 +9,18 @@ #include <ngx_event.h> -static ngx_int_t ngx_poll_init(ngx_cycle_t *cycle); +static ngx_int_t ngx_poll_init(ngx_cycle_t *cycle, ngx_msec_t timer); static void ngx_poll_done(ngx_cycle_t *cycle); static ngx_int_t ngx_poll_add_event(ngx_event_t *ev, int event, u_int flags); static ngx_int_t ngx_poll_del_event(ngx_event_t *ev, int event, u_int flags); -static ngx_int_t ngx_poll_process_events(ngx_cycle_t *cycle); +static ngx_int_t ngx_poll_process_events(ngx_cycle_t *cycle, ngx_msec_t timer, + ngx_uint_t flags); static char *ngx_poll_init_conf(ngx_cycle_t *cycle, void *conf); static struct pollfd *event_list; static int nevents; -#if 0 -static ngx_event_t **ready_index; -#endif - -static ngx_event_t *accept_events; - static ngx_str_t poll_name = ngx_string("poll"); @@ -67,7 +62,7 @@ ngx_module_t ngx_poll_module = { static ngx_int_t -ngx_poll_init(ngx_cycle_t *cycle) +ngx_poll_init(ngx_cycle_t *cycle, ngx_msec_t timer) { struct pollfd *list; @@ -91,27 +86,13 @@ ngx_poll_init(ngx_cycle_t *cycle) } event_list = list; - -#if 0 - if (ready_index) { - ngx_free(ready_index); - } - - ready_index = ngx_alloc(sizeof(ngx_event_t *) * 2 * cycle->connection_n, - cycle->log); - if (ready_index == NULL) { - return NGX_ERROR; - } -#endif } ngx_io = ngx_os_io; ngx_event_actions = ngx_poll_module_ctx.actions; - ngx_event_flags = NGX_USE_LEVEL_EVENT - |NGX_USE_ONESHOT_EVENT - |NGX_USE_FD_EVENT; + ngx_event_flags = NGX_USE_LEVEL_EVENT|NGX_USE_FD_EVENT; return NGX_OK; } @@ -121,14 +102,8 @@ static void ngx_poll_done(ngx_cycle_t *cycle) { ngx_free(event_list); -#if 0 - ngx_free(ready_index); -#endif event_list = NULL; -#if 0 - ready_index = NULL; -#endif } @@ -189,10 +164,8 @@ ngx_poll_add_event(ngx_event_t *ev, int event, u_int flags) static ngx_int_t ngx_poll_del_event(ngx_event_t *ev, int event, u_int flags) { - ngx_uint_t i; - ngx_cycle_t **cycle; - ngx_event_t *e; - ngx_connection_t *c; + ngx_event_t *e; + ngx_connection_t *c; c = ev->data; @@ -234,19 +207,6 @@ ngx_poll_del_event(ngx_event_t *ev, int event, u_int flags) c = ngx_cycle->files[event_list[nevents].fd]; if (c->fd == -1) { - cycle = ngx_old_cycles.elts; - for (i = 0; i < ngx_old_cycles.nelts; i++) { - if (cycle[i] == NULL) { - continue; - } - c = cycle[i]->files[event_list[nevents].fd]; - if (c->fd != -1) { - break; - } - } - } - - if (c->fd == -1) { ngx_log_error(NGX_LOG_ALERT, ev->log, 0, "unexpected last event"); @@ -275,19 +235,15 @@ ngx_poll_del_event(ngx_event_t *ev, int event, u_int flags) static ngx_int_t -ngx_poll_process_events(ngx_cycle_t *cycle) +ngx_poll_process_events(ngx_cycle_t *cycle, ngx_msec_t timer, ngx_uint_t flags) { int ready, revents; ngx_err_t err; ngx_int_t i, nready; - ngx_uint_t n, found, lock; - ngx_msec_t timer, delta; - ngx_cycle_t **old_cycle; - ngx_event_t *ev; + ngx_uint_t found, level; + ngx_msec_t delta; + ngx_event_t *ev, **queue; ngx_connection_t *c; - struct timeval tv; - - timer = ngx_event_find_timer(); /* NGX_TIMER_INFINITE == INFTIM */ @@ -301,24 +257,6 @@ ngx_poll_process_events(ngx_cycle_t *cycle) } #endif - if (ngx_accept_mutex) { - if (ngx_accept_disabled > 0) { - ngx_accept_disabled--; - - } else { - if (ngx_trylock_accept_mutex(cycle) == NGX_ERROR) { - return NGX_ERROR; - } - - if (ngx_accept_mutex_held == 0 - && (timer == NGX_TIMER_INFINITE - || timer > ngx_accept_mutex_delay)) - { - timer = ngx_accept_mutex_delay; - } - } - } - ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0, "poll timer: %M", timer); ready = poll(event_list, (u_int) nevents, (int) timer); @@ -329,24 +267,35 @@ ngx_poll_process_events(ngx_cycle_t *cycle) err = 0; } - ngx_gettimeofday(&tv); - ngx_time_update(tv.tv_sec); + delta = ngx_current_msec; - delta = ngx_current_time; - ngx_current_time = (ngx_msec_t) tv.tv_sec * 1000 + tv.tv_usec / 1000; + if (flags & NGX_UPDATE_TIME) { + ngx_time_update(0, 0); + } ngx_log_debug2(NGX_LOG_DEBUG_EVENT, cycle->log, 0, "poll ready %d of %d", ready, nevents); if (err) { - ngx_log_error((err == NGX_EINTR) ? NGX_LOG_INFO : NGX_LOG_ALERT, - cycle->log, err, "poll() failed"); - ngx_accept_mutex_unlock(); + if (err == NGX_EINTR) { + + if (ngx_event_timer_alarm) { + ngx_event_timer_alarm = 0; + return NGX_OK; + } + + level = NGX_LOG_INFO; + + } else { + level = NGX_LOG_ALERT; + } + + ngx_log_error(level, cycle->log, err, "poll() failed"); return NGX_ERROR; } if (timer != NGX_TIMER_INFINITE) { - delta = ngx_current_time - delta; + delta = ngx_current_msec - delta; ngx_log_debug2(NGX_LOG_DEBUG_EVENT, cycle->log, 0, "poll timer: %M, delta: %M", timer, delta); @@ -354,24 +303,23 @@ ngx_poll_process_events(ngx_cycle_t *cycle) if (ready == 0) { ngx_log_error(NGX_LOG_ALERT, cycle->log, 0, "poll() returned no events without timeout"); - ngx_accept_mutex_unlock(); return NGX_ERROR; } } - if (ngx_mutex_lock(ngx_posted_events_mutex) == NGX_ERROR) { - ngx_accept_mutex_unlock(); - return NGX_ERROR; + if (ready == 0) { + return NGX_OK; } - lock = 1; + ngx_mutex_lock(ngx_posted_events_mutex); + nready = 0; for (i = 0; i < nevents && ready; i++) { revents = event_list[i].revents; -#if 0 +#if 1 ngx_log_debug4(NGX_LOG_DEBUG_EVENT, cycle->log, 0, "poll: %d: fd:%d ev:%04Xd rev:%04Xd", i, event_list[i].fd, event_list[i].events, revents); @@ -406,19 +354,6 @@ ngx_poll_process_events(ngx_cycle_t *cycle) c = ngx_cycle->files[event_list[i].fd]; if (c->fd == -1) { - old_cycle = ngx_old_cycles.elts; - for (n = 0; n < ngx_old_cycles.nelts; n++) { - if (old_cycle[n] == NULL) { - continue; - } - c = old_cycle[n]->files[event_list[i].fd]; - if (c->fd != -1) { - break; - } - } - } - - if (c->fd == -1) { ngx_log_error(NGX_LOG_ALERT, cycle->log, 0, "unexpected event"); /* @@ -453,43 +388,31 @@ ngx_poll_process_events(ngx_cycle_t *cycle) found = 1; ev = c->read; - ev->ready = 1; - if (ev->oneshot) { - if (ev->timer_set) { - ngx_del_timer(ev); - } - ngx_poll_del_event(ev, NGX_READ_EVENT, 0); - } + if ((flags & NGX_POST_THREAD_EVENTS) && !ev->accept) { + ev->posted_ready = 1; - if (ev->accept) { - ev->next = accept_events; - accept_events = ev; } else { - ngx_post_event(ev); + ev->ready = 1; } -#if 0 - ready_index[nready++] = c->read; -#endif + queue = (ngx_event_t **) (ev->accept ? &ngx_posted_accept_events: + &ngx_posted_events); + ngx_locked_post_event(ev, queue); } if (revents & POLLOUT) { found = 1; ev = c->write; - ev->ready = 1; - if (ev->oneshot) { - if (ev->timer_set) { - ngx_del_timer(ev); - } - ngx_poll_del_event(ev, NGX_WRITE_EVENT, 0); + if (flags & NGX_POST_THREAD_EVENTS) { + ev->posted_ready = 1; + + } else { + ev->ready = 1; } - ngx_post_event(ev); -#if 0 - ready_index[nready++] = c->write; -#endif + ngx_locked_post_event(ev, &ngx_posted_events); } if (found) { @@ -498,83 +421,12 @@ ngx_poll_process_events(ngx_cycle_t *cycle) } } -#if 0 - for (i = 0; i < nready; i++) { - ev = ready_index[i]; - - if (!ev->active) { - continue; - } - - ev->ready = 1; - - if (ev->oneshot) { - if (ev->timer_set) { - ngx_del_timer(ev); - } - - if (ev->write) { - ngx_poll_del_event(ev, NGX_WRITE_EVENT, 0); - } else { - ngx_poll_del_event(ev, NGX_READ_EVENT, 0); - } - } - - ev->handler(ev); - } -#endif - - ev = accept_events; - - for ( ;; ) { - - ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0, - "accept event %p", ev); - - if (ev == NULL) { - break; - } - - ngx_mutex_unlock(ngx_posted_events_mutex); - - ev->handler(ev); - - if (ngx_accept_disabled > 0) { - lock = 0; - break; - } - - ev = ev->next; - - if (ev == NULL) { - lock = 0; - break; - } - - if (ngx_mutex_lock(ngx_posted_events_mutex) == NGX_ERROR) { - ngx_accept_mutex_unlock(); - return NGX_ERROR; - } - - } - - ngx_accept_mutex_unlock(); - accept_events = NULL; - - if (lock) { - ngx_mutex_unlock(ngx_posted_events_mutex); - } + ngx_mutex_unlock(ngx_posted_events_mutex); if (ready != 0) { ngx_log_error(NGX_LOG_ALERT, cycle->log, 0, "poll ready != events"); } - ngx_event_expire_timers(); - - if (!ngx_threaded) { - ngx_event_process_posted(cycle); - } - return nready; } diff --git a/src/event/modules/ngx_rtsig_module.c b/src/event/modules/ngx_rtsig_module.c index f88dd5d6f..450943c2c 100644 --- a/src/event/modules/ngx_rtsig_module.c +++ b/src/event/modules/ngx_rtsig_module.c @@ -38,12 +38,14 @@ typedef struct { extern ngx_event_module_t ngx_poll_module_ctx; -static ngx_int_t ngx_rtsig_init(ngx_cycle_t *cycle); +static ngx_int_t ngx_rtsig_init(ngx_cycle_t *cycle, ngx_msec_t timer); static void ngx_rtsig_done(ngx_cycle_t *cycle); static ngx_int_t ngx_rtsig_add_connection(ngx_connection_t *c); static ngx_int_t ngx_rtsig_del_connection(ngx_connection_t *c, u_int flags); -static ngx_int_t ngx_rtsig_process_events(ngx_cycle_t *cycle); -static ngx_int_t ngx_rtsig_process_overflow(ngx_cycle_t *cycle); +static ngx_int_t ngx_rtsig_process_events(ngx_cycle_t *cycle, + ngx_msec_t timer, ngx_uint_t flags); +static ngx_int_t ngx_rtsig_process_overflow(ngx_cycle_t *cycle, + ngx_msec_t timer, ngx_uint_t flags); static void *ngx_rtsig_create_conf(ngx_cycle_t *cycle); static char *ngx_rtsig_init_conf(ngx_cycle_t *cycle, void *conf); @@ -134,7 +136,7 @@ ngx_module_t ngx_rtsig_module = { static ngx_int_t -ngx_rtsig_init(ngx_cycle_t *cycle) +ngx_rtsig_init(ngx_cycle_t *cycle, ngx_msec_t timer) { ngx_rtsig_conf_t *rtscf; @@ -144,6 +146,7 @@ ngx_rtsig_init(ngx_cycle_t *cycle) sigaddset(&set, rtscf->signo); sigaddset(&set, rtscf->signo + 1); sigaddset(&set, SIGIO); + sigaddset(&set, SIGALRM); if (sigprocmask(SIG_BLOCK, &set, NULL) == -1) { ngx_log_error(NGX_LOG_EMERG, cycle->log, ngx_errno, @@ -279,59 +282,19 @@ ngx_rtsig_del_connection(ngx_connection_t *c, u_int flags) static ngx_int_t -ngx_rtsig_process_events(ngx_cycle_t *cycle) +ngx_rtsig_process_events(ngx_cycle_t *cycle, ngx_msec_t timer, ngx_uint_t flags) { int signo; ngx_int_t instance; - ngx_msec_t timer, delta; + ngx_msec_t delta; ngx_err_t err; siginfo_t si; ngx_event_t *rev, *wev; - struct timeval tv; struct timespec ts, *tp; struct sigaction sa; ngx_connection_t *c; ngx_rtsig_conf_t *rtscf; - if (overflow) { - timer = 0; - - } else { - timer = ngx_event_find_timer(); - -#if (NGX_THREADS) - - if (timer == NGX_TIMER_ERROR) { - return NGX_ERROR; - } - - if (timer == NGX_TIMER_INFINITE || timer > 500) { - timer = 500; - } - -#endif - - if (ngx_accept_mutex) { - if (ngx_accept_disabled > 0) { - ngx_accept_disabled--; - - } else { - ngx_accept_mutex_held = 0; - - if (ngx_trylock_accept_mutex(cycle) == NGX_ERROR) { - return NGX_ERROR; - } - - if (ngx_accept_mutex_held == 0 - && (timer == NGX_TIMER_INFINITE - || timer > ngx_accept_mutex_delay)) - { - timer = ngx_accept_mutex_delay; - } - } - } - } - if (timer == NGX_TIMER_INFINITE) { tp = NULL; @@ -357,7 +320,6 @@ ngx_rtsig_process_events(ngx_cycle_t *cycle) if (err == NGX_EAGAIN) { if (timer == NGX_TIMER_INFINITE) { - ngx_accept_mutex_unlock(); ngx_log_error(NGX_LOG_ALERT, cycle->log, err, "sigtimedwait() returned EAGAIN without timeout"); return NGX_ERROR; @@ -373,21 +335,20 @@ ngx_rtsig_process_events(ngx_cycle_t *cycle) signo, si.si_fd, si.si_band); } - ngx_gettimeofday(&tv); - ngx_time_update(tv.tv_sec); - - delta = ngx_current_time; - ngx_current_time = (ngx_msec_t) tv.tv_sec * 1000 + tv.tv_usec / 1000; + delta = ngx_current_msec; + + if (flags & NGX_UPDATE_TIME) { + ngx_time_update(0, 0); + } if (err) { - ngx_accept_mutex_unlock(); ngx_log_error((err == NGX_EINTR) ? NGX_LOG_INFO : NGX_LOG_ALERT, cycle->log, err, "sigtimedwait() failed"); return NGX_ERROR; } if (timer != NGX_TIMER_INFINITE) { - delta = ngx_current_time - delta; + delta = ngx_current_msec - delta; ngx_log_debug2(NGX_LOG_DEBUG_EVENT, cycle->log, 0, "rtsig timer: %M, delta: %M", timer, delta); @@ -404,9 +365,8 @@ ngx_rtsig_process_events(ngx_cycle_t *cycle) c = ngx_cycle->files[si.si_fd]; if (c == NULL) { - /* the stale event */ - ngx_accept_mutex_unlock(); + /* the stale event */ return NGX_OK; } @@ -415,84 +375,36 @@ ngx_rtsig_process_events(ngx_cycle_t *cycle) rev = c->read; - if (c->read->instance != instance) { + if (rev->instance != instance) { /* * the stale event from a file descriptor * that was just closed in this iteration */ - ngx_accept_mutex_unlock(); - ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0, "rtsig: stale event %p", c); return NGX_OK; } - if (si.si_band & (POLLIN|POLLHUP|POLLERR)) { - if (rev->active) { - - if (ngx_threaded && !rev->accept) { - if (ngx_mutex_lock(ngx_posted_events_mutex) == NGX_ERROR) { - ngx_accept_mutex_unlock(); - return NGX_ERROR; - } - - rev->posted_ready = 1; - ngx_post_event(rev); - - ngx_mutex_unlock(ngx_posted_events_mutex); - - } else { - rev->ready = 1; - - if (!ngx_threaded && !ngx_accept_mutex_held) { - rev->handler(rev); - - } else if (rev->accept) { - if (ngx_accept_disabled <= 0) { - rev->handler(rev); - } - - } else { - ngx_post_event(rev); - } - } - } + if ((si.si_band & (POLLIN|POLLHUP|POLLERR)) && rev->active) { + rev->ready = 1; + rev->handler(rev); } wev = c->write; - if (si.si_band & (POLLOUT|POLLHUP|POLLERR)) { - if (wev->active) { - - if (ngx_threaded) { - if (ngx_mutex_lock(ngx_posted_events_mutex) == NGX_ERROR) { - ngx_accept_mutex_unlock(); - return NGX_ERROR; - } - - wev->posted_ready = 1; - ngx_post_event(wev); - - ngx_mutex_unlock(ngx_posted_events_mutex); - - } else { - wev->ready = 1; + if ((si.si_band & (POLLOUT|POLLHUP|POLLERR)) && wev->active) { + wev->ready = 1; + wev->handler(wev); + } - if (!ngx_threaded && !ngx_accept_mutex_held) { - wev->handler(wev); + } else if (signo == SIGALRM) { - } else { - ngx_post_event(wev); - } - } - } - } + return NGX_OK; } else if (signo == SIGIO) { - ngx_accept_mutex_unlock(); ngx_log_error(NGX_LOG_ALERT, cycle->log, 0, "rt signal queue overflowed"); @@ -520,48 +432,35 @@ ngx_rtsig_process_events(ngx_cycle_t *cycle) return NGX_ERROR; } else if (signo != -1) { - ngx_accept_mutex_unlock(); - ngx_log_error(NGX_LOG_ALERT, cycle->log, 0, "sigtimedwait() returned unexpected signal: %d", signo); return NGX_ERROR; } - ngx_accept_mutex_unlock(); - - ngx_event_expire_timers(); - - if (ngx_posted_events) { - if (ngx_threaded) { - ngx_wakeup_worker_thread(cycle); - - } else { - ngx_event_process_posted(cycle); - } - } - - if (signo == -1) { - return NGX_AGAIN; - } else { + if (signo != -1) { return NGX_OK; } -} + return NGX_AGAIN; +} -/* TODO: old cylces */ static ngx_int_t -ngx_rtsig_process_overflow(ngx_cycle_t *cycle) +ngx_rtsig_process_overflow(ngx_cycle_t *cycle, ngx_msec_t timer, + ngx_uint_t flags) { int name[2], rtsig_max, rtsig_nr, events, ready; size_t len; ngx_int_t tested, n, i; ngx_err_t err; - ngx_event_t *rev, *wev; + ngx_event_t *rev, *wev, **queue; ngx_connection_t *c; ngx_rtsig_conf_t *rtscf; + ngx_log_debug0(NGX_LOG_DEBUG_EVENT, cycle->log, 0, + "rtsig process overflow"); + rtscf = ngx_event_get_conf(ngx_cycle->conf_ctx, ngx_rtsig_module); tested = 0; @@ -608,6 +507,9 @@ ngx_rtsig_process_overflow(ngx_cycle_t *cycle) for ( ;; ) { ready = poll(overflow_list, n, 0); + ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0, + "rtsig overflow poll:%d", ready); + if (ready == -1) { err = ngx_errno; ngx_log_error((err == NGX_EINTR) ? NGX_LOG_INFO : NGX_LOG_ALERT, @@ -626,9 +528,7 @@ ngx_rtsig_process_overflow(ngx_cycle_t *cycle) continue; } - if (ngx_mutex_lock(ngx_posted_events_mutex) == NGX_ERROR) { - return NGX_ERROR; - } + ngx_mutex_lock(ngx_posted_events_mutex); for (i = 0; i < n; i++) { c = cycle->files[overflow_list[i].fd]; @@ -647,13 +547,21 @@ ngx_rtsig_process_overflow(ngx_cycle_t *cycle) { tested++; - if (ngx_threaded) { + if ((flags & NGX_POST_THREAD_EVENTS) && !rev->accept) { rev->posted_ready = 1; - ngx_post_event(rev); } else { rev->ready = 1; - rev->handler(rev); + } + + if (flags & NGX_POST_EVENTS) { + queue = (ngx_event_t **) (rev->accept ? + &ngx_posted_accept_events : &ngx_posted_events); + + ngx_locked_post_event(rev, queue); + + } else { + rev->handler(rev); } } @@ -667,13 +575,18 @@ ngx_rtsig_process_overflow(ngx_cycle_t *cycle) { tested++; - if (ngx_threaded) { + if (flags & NGX_POST_THREAD_EVENTS) { wev->posted_ready = 1; - ngx_post_event(wev); } else { wev->ready = 1; - wev->handler(wev); + } + + if (flags & NGX_POST_EVENTS) { + ngx_locked_post_event(wev, &ngx_posted_events); + + } else { + wev->handler(wev); } } } @@ -688,8 +601,8 @@ ngx_rtsig_process_overflow(ngx_cycle_t *cycle) * Check the current rt queue length to prevent * the new overflow. * - * Learn the /proc/sys/kernel/rtsig-max value because - * it can be changed since the last checking. + * learn the "/proc/sys/kernel/rtsig-max" value because + * it can be changed since the last checking */ name[0] = CTL_KERN; @@ -713,16 +626,17 @@ ngx_rtsig_process_overflow(ngx_cycle_t *cycle) } /* - * drain the rt signal queue if the /proc/sys/kernel/rtsig-nr + * drain the rt signal queue if the /"proc/sys/kernel/rtsig-nr" * is bigger than - * /proc/sys/kernel/rtsig-max / rtsig_overflow_threshold + * "/proc/sys/kernel/rtsig-max" / "rtsig_overflow_threshold" */ if (rtsig_max / rtscf->overflow_threshold < rtsig_nr) { ngx_log_debug2(NGX_LOG_DEBUG_EVENT, cycle->log, 0, "rtsig queue state: %d/%d", rtsig_nr, rtsig_max); - while (ngx_rtsig_process_events(cycle) == NGX_OK) { + while (ngx_rtsig_process_events(cycle, 0, flags) == NGX_OK) + { /* void */ } } @@ -734,20 +648,17 @@ ngx_rtsig_process_overflow(ngx_cycle_t *cycle) * so drain the rt signal queue unconditionally */ - while (ngx_rtsig_process_events(cycle) == NGX_OK) { /* void */ } + while (ngx_rtsig_process_events(cycle, 0, flags) == NGX_OK) { + /* void */ + } } tested = 0; } } - if (ngx_posted_events) { - if (ngx_threaded) { - ngx_wakeup_worker_thread(cycle); - - } else { - ngx_event_process_posted(cycle); - } + if (flags & NGX_UPDATE_TIME) { + ngx_time_update(0, 0); } ngx_log_error(NGX_LOG_ALERT, cycle->log, 0, diff --git a/src/event/modules/ngx_select_module.c b/src/event/modules/ngx_select_module.c index 5be3afa85..da4303def 100644 --- a/src/event/modules/ngx_select_module.c +++ b/src/event/modules/ngx_select_module.c @@ -10,11 +10,12 @@ -static ngx_int_t ngx_select_init(ngx_cycle_t *cycle); +static ngx_int_t ngx_select_init(ngx_cycle_t *cycle, ngx_msec_t timer); static void ngx_select_done(ngx_cycle_t *cycle); static ngx_int_t ngx_select_add_event(ngx_event_t *ev, int event, u_int flags); static ngx_int_t ngx_select_del_event(ngx_event_t *ev, int event, u_int flags); -static ngx_int_t ngx_select_process_events(ngx_cycle_t *cycle); +static ngx_int_t ngx_select_process_events(ngx_cycle_t *cycle, ngx_msec_t timer, + ngx_uint_t flags); static char *ngx_select_init_conf(ngx_cycle_t *cycle, void *conf); @@ -33,11 +34,6 @@ static int max_fd; static ngx_uint_t nevents; static ngx_event_t **event_index; -#if 0 -static ngx_event_t **ready_index; -#endif - -static ngx_event_t *accept_events; static ngx_str_t select_name = ngx_string("select"); @@ -79,7 +75,7 @@ ngx_module_t ngx_select_module = { static ngx_int_t -ngx_select_init(ngx_cycle_t *cycle) +ngx_select_init(ngx_cycle_t *cycle, ngx_msec_t timer) { ngx_event_t **index; @@ -103,26 +99,15 @@ ngx_select_init(ngx_cycle_t *cycle) ngx_memcpy(index, event_index, sizeof(ngx_event_t *) * nevents); ngx_free(event_index); } - event_index = index; -#if 0 - if (ready_index) { - ngx_free(ready_index); - } - - ready_index = ngx_alloc(sizeof(ngx_event_t *) * 2 * cycle->connection_n, - cycle->log); - if (ready_index == NULL) { - return NGX_ERROR; - } -#endif + event_index = index; } ngx_io = ngx_os_io; ngx_event_actions = ngx_select_module_ctx.actions; - ngx_event_flags = NGX_USE_LEVEL_EVENT|NGX_USE_ONESHOT_EVENT; + ngx_event_flags = NGX_USE_LEVEL_EVENT; #if (NGX_WIN32) max_read = max_write = 0; @@ -138,9 +123,6 @@ static void ngx_select_done(ngx_cycle_t *cycle) { ngx_free(event_index); -#if 0 - ngx_free(ready_index); -#endif event_index = NULL; } @@ -262,41 +244,22 @@ ngx_select_del_event(ngx_event_t *ev, int event, u_int flags) static ngx_int_t -ngx_select_process_events(ngx_cycle_t *cycle) +ngx_select_process_events(ngx_cycle_t *cycle, ngx_msec_t timer, + ngx_uint_t flags) { - int ready, nready; - ngx_uint_t i, found, lock; - ngx_err_t err; - ngx_msec_t timer, delta; - ngx_event_t *ev; - ngx_connection_t *c; - struct timeval tv, *tp; -#if (NGX_HAVE_SELECT_CHANGE_TIMEOUT) - static ngx_msec_t deltas = 0; + int ready, nready; + ngx_uint_t i, found; + ngx_err_t err; + ngx_msec_t delta; + ngx_event_t *ev, **queue; + ngx_connection_t *c; + struct timeval tv, *tp; +#if !(NGX_WIN32) + ngx_uint_t level; #endif - timer = ngx_event_find_timer(); - #if !(NGX_WIN32) - if (ngx_accept_mutex) { - if (ngx_accept_disabled > 0) { - ngx_accept_disabled--; - - } else { - if (ngx_trylock_accept_mutex(cycle) == NGX_ERROR) { - return NGX_ERROR; - } - - if (ngx_accept_mutex_held == 0 - && (timer == NGX_TIMER_INFINITE - || timer > ngx_accept_mutex_delay)) - { - timer = ngx_accept_mutex_delay; - } - } - } - if (max_fd == -1) { for (i = 0; i < nevents; i++) { c = event_index[i]->data; @@ -353,9 +316,13 @@ ngx_select_process_events(ngx_cycle_t *cycle) #endif #if (NGX_WIN32) + ready = select(0, &work_read_fd_set, &work_write_fd_set, NULL, tp); + #else + ready = select(max_fd + 1, &work_read_fd_set, &work_write_fd_set, NULL, tp); + #endif if (ready == -1) { @@ -364,58 +331,14 @@ ngx_select_process_events(ngx_cycle_t *cycle) err = 0; } -#if (NGX_HAVE_SELECT_CHANGE_TIMEOUT) - - if (timer != NGX_TIMER_INFINITE) { - delta = timer - ((ngx_msec_t) tv.tv_sec * 1000 + tv.tv_usec / 1000); - - /* - * learn the real time and update the cached time - * if the sum of the last deltas overcomes 1 second - */ - - deltas += delta; - if (deltas > 1000) { - ngx_gettimeofday(&tv); - ngx_time_update(tv.tv_sec); - deltas = tv.tv_usec / 1000; + delta = ngx_current_msec; - ngx_current_time = (ngx_msec_t) tv.tv_sec * 1000 - + tv.tv_usec / 1000; - } else { - ngx_current_time += delta; - } - - ngx_log_debug2(NGX_LOG_DEBUG_EVENT, cycle->log, 0, - "select timer: %M, delta: %M", timer, delta); - - } else { - deltas = 0; - - ngx_gettimeofday(&tv); - ngx_time_update(tv.tv_sec); - - delta = ngx_current_time; - ngx_current_time = (ngx_msec_t) tv.tv_sec * 1000 + tv.tv_usec / 1000; - - if (ready == 0) { - ngx_log_error(NGX_LOG_ALERT, cycle->log, 0, - "select() returned no events without timeout"); - ngx_accept_mutex_unlock(); - return NGX_ERROR; - } + if (flags & NGX_UPDATE_TIME) { + ngx_time_update(0, 0); } -#else /* !(NGX_HAVE_SELECT_CHANGE_TIMEOUT) */ - - ngx_gettimeofday(&tv); - ngx_time_update(tv.tv_sec); - - delta = ngx_current_time; - ngx_current_time = (ngx_msec_t) tv.tv_sec * 1000 + tv.tv_usec / 1000; - if (timer != NGX_TIMER_INFINITE) { - delta = ngx_current_time - delta; + delta = ngx_current_msec - delta; ngx_log_debug2(NGX_LOG_DEBUG_EVENT, cycle->log, 0, "select timer: %M, delta: %M", timer, delta); @@ -424,34 +347,48 @@ ngx_select_process_events(ngx_cycle_t *cycle) if (ready == 0) { ngx_log_error(NGX_LOG_ALERT, cycle->log, 0, "select() returned no events without timeout"); - ngx_accept_mutex_unlock(); return NGX_ERROR; } } -#endif /* NGX_HAVE_SELECT_CHANGE_TIMEOUT */ - ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0, "select ready %d", ready); - if (err) { #if (NGX_WIN32) + + if (err) { ngx_log_error(NGX_LOG_ALERT, cycle->log, err, "select() failed"); -#else - ngx_log_error((err == NGX_EINTR) ? NGX_LOG_INFO : NGX_LOG_ALERT, - cycle->log, err, "select() failed"); -#endif - ngx_accept_mutex_unlock(); return NGX_ERROR; } +#else + + if (err) { + if (err == NGX_EINTR) { + + if (ngx_event_timer_alarm) { + ngx_event_timer_alarm = 0; + return NGX_OK; + } + + level = NGX_LOG_INFO; + + } else { + level = NGX_LOG_ALERT; + } - if (ngx_mutex_lock(ngx_posted_events_mutex) == NGX_ERROR) { - ngx_accept_mutex_unlock(); + ngx_log_error(level, cycle->log, err, "select() failed"); return NGX_ERROR; } - lock = 1; +#endif + + if (nevents == 0) { + return NGX_OK; + } + + ngx_mutex_lock(ngx_posted_events_mutex); + nready = 0; for (i = 0; i < nevents; i++) { @@ -477,110 +414,20 @@ ngx_select_process_events(ngx_cycle_t *cycle) if (found) { ev->ready = 1; - if (ev->oneshot) { - if (ev->timer_set) { - ngx_del_timer(ev); - } - - if (ev->write) { - ngx_select_del_event(ev, NGX_WRITE_EVENT, 0); - } else { - ngx_select_del_event(ev, NGX_READ_EVENT, 0); - } - } - - if (ev->accept) { - ev->next = accept_events; - accept_events = ev; - } else { - ngx_post_event(ev); - } + queue = (ngx_event_t **) (ev->accept ? &ngx_posted_accept_events: + &ngx_posted_events); + ngx_locked_post_event(ev, queue); nready++; - -#if 0 - ready_index[nready++] = ev; -#endif - } - } - -#if 0 - for (i = 0; i < nready; i++) { - ev = ready_index[i]; - ready--; - - if (!ev->active) { - continue; - } - - ev->ready = 1; - - if (ev->oneshot) { - if (ev->timer_set) { - ngx_del_timer(ev); - } - - if (ev->write) { - ngx_select_del_event(ev, NGX_WRITE_EVENT, 0); - } else { - ngx_select_del_event(ev, NGX_READ_EVENT, 0); - } } - - ev->handler(ev); } -#endif - - ev = accept_events; - for ( ;; ) { - - ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0, - "accept event %p", ev); - - if (ev == NULL) { - break; - } - - ngx_mutex_unlock(ngx_posted_events_mutex); - - ev->handler(ev); - - if (ngx_accept_disabled > 0) { - lock = 0; - break; - } - - ev = ev->next; - - if (ev == NULL) { - lock = 0; - break; - } - - if (ngx_mutex_lock(ngx_posted_events_mutex) == NGX_ERROR) { - ngx_accept_mutex_unlock(); - return NGX_ERROR; - } - } - - ngx_accept_mutex_unlock(); - accept_events = NULL; - - if (lock) { - ngx_mutex_unlock(ngx_posted_events_mutex); - } + ngx_mutex_unlock(ngx_posted_events_mutex); if (ready != nready) { ngx_log_error(NGX_LOG_ALERT, cycle->log, 0, "select ready != events"); } - ngx_event_expire_timers(); - - if (!ngx_threaded) { - ngx_event_process_posted(cycle); - } - return NGX_OK; } @@ -599,19 +446,25 @@ ngx_select_init_conf(ngx_cycle_t *cycle, void *conf) /* disable warning: the default FD_SETSIZE is 1024U in FreeBSD 5.x */ #if !(NGX_WIN32) + if ((unsigned) ecf->connections > FD_SETSIZE) { ngx_log_error(NGX_LOG_EMERG, cycle->log, 0, "the maximum number of files " "supported by select() is " ngx_value(FD_SETSIZE)); return NGX_CONF_ERROR; } + #endif #if (NGX_THREADS) && !(NGX_WIN32) + ngx_log_error(NGX_LOG_EMERG, cycle->log, 0, "select() is not supported in the threaded mode"); return NGX_CONF_ERROR; + #else + return NGX_CONF_OK; + #endif } |