diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/event/modules/ngx_kqueue_module.c | 7 | ||||
-rw-r--r-- | src/event/ngx_event.c | 6 | ||||
-rw-r--r-- | src/event/ngx_event.h | 2 | ||||
-rw-r--r-- | src/event/ngx_event_accept.c | 9 | ||||
-rw-r--r-- | src/event/ngx_event_connect.c | 5 | ||||
-rw-r--r-- | src/event/ngx_event_connect.h | 3 | ||||
-rw-r--r-- | src/event/ngx_event_posted.c | 75 | ||||
-rw-r--r-- | src/event/ngx_event_posted.h | 12 | ||||
-rw-r--r-- | src/http/modules/proxy/ngx_http_proxy_handler.c | 31 | ||||
-rw-r--r-- | src/http/modules/proxy/ngx_http_proxy_upstream.c | 3 | ||||
-rw-r--r-- | src/http/ngx_http_request.c | 20 | ||||
-rw-r--r-- | src/os/unix/ngx_freebsd_rfork_thread.c | 12 | ||||
-rw-r--r-- | src/os/unix/ngx_process_cycle.c | 7 |
13 files changed, 119 insertions, 73 deletions
diff --git a/src/event/modules/ngx_kqueue_module.c b/src/event/modules/ngx_kqueue_module.c index 865b343a6..8039c3d58 100644 --- a/src/event/modules/ngx_kqueue_module.c +++ b/src/event/modules/ngx_kqueue_module.c @@ -157,7 +157,6 @@ static ngx_int_t ngx_kqueue_init(ngx_cycle_t *cycle) #if (HAVE_LOWAT_EVENT) |NGX_HAVE_LOWAT_EVENT #endif - |NGX_HAVE_INSTANCE_EVENT |NGX_HAVE_KQUEUE_EVENT; return NGX_OK; @@ -370,7 +369,7 @@ static ngx_int_t ngx_kqueue_process_events(ngx_cycle_t *cycle) for ( ;; ) { timer = ngx_event_find_timer(); -#if (NGX_THREADS0) +#if (NGX_THREADS) if (timer == NGX_TIMER_ERROR) { return NGX_ERROR; } @@ -522,13 +521,11 @@ static ngx_int_t ngx_kqueue_process_events(ngx_cycle_t *cycle) ngx_kqueue_dump_event(ev->log, &event_list[i]); } - ev->returned_instance = instance; - #if (NGX_THREADS) if (ngx_threaded && !ev->accept) { ev->posted_ready = 1; - ev->posted_available += event_list[i].data; + ev->posted_available = event_list[i].data; if (event_list[i].flags & EV_EOF) { ev->posted_eof = 1; diff --git a/src/event/ngx_event.c b/src/event/ngx_event.c index bf748f174..2690617b2 100644 --- a/src/event/ngx_event.c +++ b/src/event/ngx_event.c @@ -279,6 +279,9 @@ static ngx_int_t ngx_event_process_init(ngx_cycle_t *cycle) rev = cycle->read_events; for (i = 0; i < cycle->connection_n; i++) { rev[i].closed = 1; +#if (NGX_THREADS) + rev[i].lock = &c[i].lock; +#endif } cycle->write_events = ngx_alloc(sizeof(ngx_event_t) * ecf->connections, @@ -290,6 +293,9 @@ static ngx_int_t ngx_event_process_init(ngx_cycle_t *cycle) wev = cycle->write_events; for (i = 0; i < cycle->connection_n; i++) { wev[i].closed = 1; +#if (NGX_THREADS) + wev[i].lock = &c[i].lock; +#endif } /* for each listening socket */ diff --git a/src/event/ngx_event.h b/src/event/ngx_event.h index 15654379c..05e1fb275 100644 --- a/src/event/ngx_event.h +++ b/src/event/ngx_event.h @@ -73,8 +73,6 @@ struct ngx_event_s { unsigned deferred_accept:1; - unsigned overflow:1; - /* the pending eof reported by kqueue or in aio chain operation */ unsigned pending_eof:1; diff --git a/src/event/ngx_event_accept.c b/src/event/ngx_event_accept.c index 6401b6aef..7de064f71 100644 --- a/src/event/ngx_event_accept.c +++ b/src/event/ngx_event_accept.c @@ -210,12 +210,17 @@ void ngx_event_accept(ngx_event_t *ev) rinstance = rev->returned_instance; winstance = wev->returned_instance; +#if (NGX_THREADS) + if (*(rev->lock)) { + ngx_spinlock(rev->lock, 1000); + ngx_unlock(rev->lock); + } +#endif + ngx_memzero(rev, sizeof(ngx_event_t)); ngx_memzero(wev, sizeof(ngx_event_t)); ngx_memzero(c, sizeof(ngx_connection_t)); - /* ngx_memzero(c) does ngx_unlock(&c->lock); */ - c->pool = pool; c->listening = ls->listening; diff --git a/src/event/ngx_event_connect.c b/src/event/ngx_event_connect.c index db56f10a3..4eecd8846 100644 --- a/src/event/ngx_event_connect.c +++ b/src/event/ngx_event_connect.c @@ -228,6 +228,11 @@ int ngx_event_connect_peer(ngx_peer_connection_t *pc) c->number = ngx_atomic_inc(ngx_connection_counter); +#if (NGX_THREADS) + rev->lock = pc->lock; + wev->lock = pc->lock; +#endif + if (ngx_add_conn) { if (ngx_add_conn(c) == NGX_ERROR) { return NGX_ERROR; diff --git a/src/event/ngx_event_connect.h b/src/event/ngx_event_connect.h index 1534c1436..01299f12b 100644 --- a/src/event/ngx_event_connect.h +++ b/src/event/ngx_event_connect.h @@ -41,6 +41,9 @@ typedef struct { int tries; ngx_connection_t *connection; +#if (NGX_THREADS) + ngx_atomic_t *lock; +#endif int rcvbuf; diff --git a/src/event/ngx_event_posted.c b/src/event/ngx_event_posted.c index 71853edf4..41be0406f 100644 --- a/src/event/ngx_event_posted.c +++ b/src/event/ngx_event_posted.c @@ -28,27 +28,6 @@ void ngx_event_process_posted(ngx_cycle_t *cycle) ngx_delete_posted_event(ev); -#if 0 - /* do not check instance ??? */ - - if (ev->accept) { - continue; - } -#endif - - if (ev->closed - || (ev->use_instance && ev->instance != ev->returned_instance)) - { - /* - * the stale event from a file descriptor - * that was just closed in this iteration - */ - - ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0, - "stale posted event " PTR_FMT, ev); - continue; - } - ev->event_handler(ev); } } @@ -58,7 +37,30 @@ void ngx_event_process_posted(ngx_cycle_t *cycle) void ngx_wakeup_worker_thread(ngx_cycle_t *cycle) { - ngx_int_t i; + ngx_int_t i; + ngx_uint_t busy; + ngx_event_t *ev; + +#if 0 + busy = 1; + + if (ngx_mutex_lock(ngx_posted_events_mutex) == NGX_ERROR) { + return; + } + + for (ev = (ngx_event_t *) ngx_posted_events; ev; ev = ev->next) { + if (*(ev->lock) == 0) { + busy = 0; + break; + } + } + + ngx_mutex_unlock(ngx_posted_events_mutex); + + if (busy) { + return; + } +#endif for (i = 0; i < ngx_threads_n; i++) { if (ngx_threads[i].state == NGX_THREAD_FREE) { @@ -97,46 +99,25 @@ ngx_int_t ngx_event_thread_process_posted(ngx_cycle_t *cycle) ngx_delete_posted_event(ev); - ngx_log_debug3(NGX_LOG_DEBUG_EVENT, cycle->log, 0, - "event instance: c:%d i:%d r:%d", - ev->closed, ev->instance, ev->returned_instance); - - if (ev->closed - || (ev->use_instance && ev->instance != ev->returned_instance)) - { - /* - * The stale event from a file descriptor that was just - * closed in this iteration. We use ngx_cycle->log - * because ev->log may be already destoyed. - */ - - ngx_log_debug1(NGX_LOG_DEBUG_EVENT, ngx_cycle->log, 0, - "stale posted event " PTR_FMT, ev); - - ngx_unlock(ev->lock); - - ev = ev->next; - - continue; - } - ev->locked = 1; ev->ready |= ev->posted_ready; ev->timedout |= ev->posted_timedout; - ev->available |= ev->posted_available; ev->pending_eof |= ev->posted_eof; #if (HAVE_KQUEUE) ev->kq_errno |= ev->posted_errno; #endif + if (ev->posted_available) { + ev->available = ev->posted_available; + } ev->posted_ready = 0; ev->posted_timedout = 0; - ev->posted_available = 0; ev->posted_eof = 0; #if (HAVE_KQUEUE) ev->posted_errno = 0; #endif + ev->posted_available = 0; ngx_mutex_unlock(ngx_posted_events_mutex); diff --git a/src/event/ngx_event_posted.h b/src/event/ngx_event_posted.h index cb916dbe1..5a3039357 100644 --- a/src/event/ngx_event_posted.h +++ b/src/event/ngx_event_posted.h @@ -12,8 +12,14 @@ ev->next = (ngx_event_t *) ngx_posted_events; \ ev->prev = (ngx_event_t **) &ngx_posted_events; \ ngx_posted_events = ev; \ - ngx_log_debug1(NGX_LOG_DEBUG_CORE, ngx_cycle->log, 0, \ + if (ev->next) { \ + ev->next->prev = &ev->next; \ + } \ + ngx_log_debug1(NGX_LOG_DEBUG_CORE, ev->log, 0, \ "post event " PTR_FMT, ev); \ + } else { \ + ngx_log_debug1(NGX_LOG_DEBUG_CORE, ev->log, 0, \ + "update posted event " PTR_FMT, ev); \ } #define ngx_delete_posted_event(ev) \ @@ -21,7 +27,9 @@ if (ev->next) { \ ev->next->prev = ev->prev; \ } \ - ev->prev = NULL; + ev->prev = NULL; \ + ngx_log_debug1(NGX_LOG_DEBUG_CORE, ev->log, 0, \ + "delete posted event " PTR_FMT, ev); diff --git a/src/http/modules/proxy/ngx_http_proxy_handler.c b/src/http/modules/proxy/ngx_http_proxy_handler.c index 5d646bb1c..5479717b9 100644 --- a/src/http/modules/proxy/ngx_http_proxy_handler.c +++ b/src/http/modules/proxy/ngx_http_proxy_handler.c @@ -608,6 +608,7 @@ void ngx_http_proxy_finalize_request(ngx_http_proxy_ctx_t *p, int rc) void ngx_http_proxy_close_connection(ngx_http_proxy_ctx_t *p) { + ngx_socket_t fd; ngx_connection_t *c; c = p->upstream->peer.connection; @@ -650,12 +651,36 @@ void ngx_http_proxy_close_connection(ngx_http_proxy_ctx_t *p) } } - if (ngx_close_socket(c->fd) == -1) { - ngx_log_error(NGX_LOG_ALERT, c->log, ngx_socket_errno, - ngx_close_socket_n " failed"); + /* + * we have to clean the connection information before the closing + * because another thread may reopen the same file descriptor + * before we clean the connection + */ + + if (ngx_mutex_lock(ngx_posted_events_mutex) == NGX_OK) { + + if (c->read->prev) { + ngx_delete_posted_event(c->read); + } + + if (c->write->prev) { + ngx_delete_posted_event(c->write); + } + + c->read->closed = 1; + c->write->closed = 1; + + ngx_mutex_unlock(ngx_posted_events_mutex); } + fd = c->fd; c->fd = (ngx_socket_t) -1; + c->data = NULL; + + if (ngx_close_socket(fd) == -1) { + ngx_log_error(NGX_LOG_ALERT, c->log, ngx_socket_errno, + ngx_close_socket_n " failed"); + } } diff --git a/src/http/modules/proxy/ngx_http_proxy_upstream.c b/src/http/modules/proxy/ngx_http_proxy_upstream.c index 9ed4d7f61..335d1dacd 100644 --- a/src/http/modules/proxy/ngx_http_proxy_upstream.c +++ b/src/http/modules/proxy/ngx_http_proxy_upstream.c @@ -62,6 +62,9 @@ int ngx_http_proxy_request_upstream(ngx_http_proxy_ctx_t *p) u->peer.log_error = NGX_ERROR_ERR; u->peer.peers = p->lcf->peers; u->peer.tries = p->lcf->peers->number; +#if (NGX_THREADS) + u->peer.lock = &r->connection->lock; +#endif u->method = r->method; diff --git a/src/http/ngx_http_request.c b/src/http/ngx_http_request.c index 74618313b..e2c28913d 100644 --- a/src/http/ngx_http_request.c +++ b/src/http/ngx_http_request.c @@ -1749,13 +1749,6 @@ void ngx_http_close_connection(ngx_connection_t *c) if (ngx_mutex_lock(ngx_posted_events_mutex) == NGX_OK) { - ngx_unlock(&c->lock); - c->read->locked = 0; - c->write->locked = 0; - - c->read->closed = 1; - c->write->closed = 1; - if (c->read->prev) { ngx_delete_posted_event(c->read); } @@ -1764,14 +1757,18 @@ void ngx_http_close_connection(ngx_connection_t *c) ngx_delete_posted_event(c->write); } + c->read->closed = 1; + c->write->closed = 1; + + ngx_unlock(&c->lock); + c->read->locked = 0; + c->write->locked = 0; + ngx_mutex_unlock(ngx_posted_events_mutex); } #else - c->read->closed = 1; - c->write->closed = 1; - if (c->read->prev) { ngx_delete_posted_event(c->read); } @@ -1780,6 +1777,9 @@ void ngx_http_close_connection(ngx_connection_t *c) ngx_delete_posted_event(c->write); } + c->read->closed = 1; + c->write->closed = 1; + #endif fd = c->fd; diff --git a/src/os/unix/ngx_freebsd_rfork_thread.c b/src/os/unix/ngx_freebsd_rfork_thread.c index 6a367bdbc..fcc716c99 100644 --- a/src/os/unix/ngx_freebsd_rfork_thread.c +++ b/src/os/unix/ngx_freebsd_rfork_thread.c @@ -676,14 +676,24 @@ ngx_int_t ngx_cond_wait(ngx_cond_t *cv, ngx_mutex_t *m) ngx_int_t ngx_cond_signal(ngx_cond_t *cv) { + ngx_err_t err; + ngx_log_debug3(NGX_LOG_DEBUG_CORE, cv->log, 0, "cv " PTR_FMT " to signal " PID_T_FMT " %d", cv, cv->tid, cv->signo); if (kill(cv->tid, cv->signo) == -1) { - ngx_log_error(NGX_LOG_ALERT, cv->log, ngx_errno, + + err = ngx_errno; + + ngx_log_error(NGX_LOG_ALERT, cv->log, err, "kill() failed while signaling condition variable " PTR_FMT, cv); + + if (err == NGX_ESRCH) { + cv->tid = -1; + } + return NGX_ERROR; } diff --git a/src/os/unix/ngx_process_cycle.c b/src/os/unix/ngx_process_cycle.c index 97b301459..1a5e5c420 100644 --- a/src/os/unix/ngx_process_cycle.c +++ b/src/os/unix/ngx_process_cycle.c @@ -792,7 +792,12 @@ static void ngx_wakeup_worker_threads(ngx_cycle_t *cycle) for (i = 0; i < ngx_threads_n; i++) { if (ngx_threads[i].state < NGX_THREAD_EXIT) { ngx_cond_signal(ngx_threads[i].cv); - live = 1; + + if (ngx_threads[i].cv->tid == -1) { + ngx_threads[i].state = NGX_THREAD_DONE; + } else { + live = 1; + } } if (ngx_threads[i].state == NGX_THREAD_EXIT) { |