ev->disabled = 0;
ev->oneshot = (flags & NGX_ONESHOT_EVENT) ? 1 : 0;
- if (nchanges > 0
+ if (ngx_thread_main()
+ && nchanges > 0
&& ev->index < (u_int) nchanges
&& ((uintptr_t) change_list[ev->index].udata & (uintptr_t) ~1)
== (uintptr_t) ev)
ev->disabled = 0;
ev->posted = 0;
- if (nchanges > 0
+ if (ngx_thread_main()
+ && nchanges > 0
&& ev->index < (u_int) nchanges
&& ((uintptr_t) change_list[ev->index].udata & (uintptr_t) ~1)
== (uintptr_t) ev)
{
struct timespec ts;
ngx_connection_t *c;
+ struct kevent *kev, kv;
c = ev->data;
"kevent set event: %d: ft:%d fl:%04X",
c->fd, filter, flags);
- if (nchanges >= max_changes) {
+ if (ngx_thread_main() && nchanges >= max_changes) {
ngx_log_error(NGX_LOG_WARN, ev->log, 0,
"kqueue change list is filled up");
nchanges = 0;
}
- change_list[nchanges].ident = c->fd;
- change_list[nchanges].filter = filter;
- change_list[nchanges].flags = flags;
- change_list[nchanges].udata = (void *) ((uintptr_t) ev | ev->instance);
+ kev = ngx_thread_main() ? &change_list[nchanges] : &kv;
+
+ kev->ident = c->fd;
+ kev->filter = filter;
+ kev->flags = flags;
+ kev->udata = (void *) ((uintptr_t) ev | ev->instance);
if (filter == EVFILT_VNODE) {
- change_list[nchanges].fflags = NOTE_DELETE|NOTE_WRITE|NOTE_EXTEND
- |NOTE_ATTRIB|NOTE_RENAME
+ kev->fflags = NOTE_DELETE|NOTE_WRITE|NOTE_EXTEND
+ |NOTE_ATTRIB|NOTE_RENAME
#if (__FreeBSD__ == 4 && __FreeBSD_version >= 430000) \
|| __FreeBSD_version >= 500018
- |NOTE_REVOKE
+ |NOTE_REVOKE
#endif
;
- change_list[nchanges].data = 0;
+ kev->data = 0;
} else {
#if (HAVE_LOWAT_EVENT)
if (flags & NGX_LOWAT_EVENT) {
- change_list[nchanges].fflags = NOTE_LOWAT;
- change_list[nchanges].data = ev->available;
+ kev->fflags = NOTE_LOWAT;
+ kev->data = ev->available;
} else {
- change_list[nchanges].fflags = 0;
- change_list[nchanges].data = 0;
+ kev->fflags = 0;
+ kev->data = 0;
}
#else
- change_list[nchanges].fflags = 0;
- change_list[nchanges].data = 0;
+ kev->fflags = 0;
+ kev->data = 0;
#endif
}
- ev->index = nchanges;
+ if (ngx_thread_main()) {
+ ev->index = nchanges;
+ nchanges++;
+
+ } else {
+ ts.tv_sec = 0;
+ ts.tv_nsec = 0;
- nchanges++;
+ if (kevent(ngx_kqueue, &kv, 1, NULL, 0, &ts) == -1) {
+ ngx_log_error(NGX_LOG_ALERT, ev->log, ngx_errno, "kevent() failed");
+ return NGX_ERROR;
+ }
+ }
return NGX_OK;
}
|| (ev->use_instance && ev->instance != ev->returned_instance))
{
/*
- * the stale event from a file descriptor
- * that was just closed in this iteration
+ * The stale event from a file descriptor that was just
+ * closed in this iteration. We use ngx_cycle->log
+ * because ev->log may be already destoyed.
*/
- ngx_log_debug1(NGX_LOG_DEBUG_EVENT, ev->log, 0,
+ ngx_log_debug1(NGX_LOG_DEBUG_EVENT, ngx_cycle->log, 0,
"kevent: stale event " PTR_FMT, ev);
+ ngx_unlock(ev->lock);
+
ev = ev->next;
continue;
#define ngx_post_event(ev) \
+ if (!ev->posted) { \
ev->next = (ngx_event_t *) ngx_posted_events; \
ngx_posted_events = ev; \
- ev->posted = 1;
+ ev->posted = 1; \
+ }
void ngx_event_process_posted(ngx_cycle_t *cycle);
ev = (ngx_event_t *)
((char *) node - offsetof(ngx_event_t, rbtree_key));
+#if (NGX_THREADS)
if (ngx_trylock(ev->lock) == 0) {
break;
}
+#endif
ngx_log_debug2(NGX_LOG_DEBUG_EVENT, ev->log, 0,
"event timer del: %d: %d",
ev->timer_set = 0;
ev->timedout = 1;
+#if (NGX_THREADS)
ngx_unlock(ev->lock);
+#endif
if (ngx_threaded) {
if (ngx_mutex_lock(ngx_posted_events_mutex) == NGX_ERROR) {
void ngx_http_close_connection(ngx_connection_t *c)
{
+ ngx_socket_t fd;
+
ngx_log_debug1(NGX_LOG_DEBUG_HTTP, c->log, 0,
"close http connection: %d", c->fd);
}
}
- if (ngx_close_socket(c->fd) == -1) {
- ngx_log_error(NGX_LOG_ALERT, c->log, ngx_socket_errno,
- ngx_close_socket_n " failed");
- }
+ fd = c->fd;
c->fd = (ngx_socket_t) -1;
c->data = NULL;
-
ngx_destroy_pool(c->pool);
+ /*
+ * we has to clean the connection before the closing because another thread
+ * may reopen the same file descriptor before we clean the connection
+ */
+
+ if (ngx_close_socket(fd) == -1) {
+
+ /* we use ngx_cycle->log because c->log was in c->pool */
+
+ ngx_log_error(NGX_LOG_ALERT, ngx_cycle->log, ngx_socket_errno,
+ ngx_close_socket_n " failed");
+ }
+
return;
}
return NGX_ERROR;
}
+ ngx_log_debug2(NGX_LOG_DEBUG_CORE, m->log, 0,
+ "mutex waked up " PTR_FMT " lock:%X",
+ m, m->lock);
+
tries = 0;
old = m->lock;
continue;
/* wake up the thread that waits on semaphore */
+ ngx_log_debug1(NGX_LOG_DEBUG_CORE, m->log, 0,
+ "wake up mutex " PTR_FMT "", m);
+
op.sem_num = 0;
op.sem_op = 1;
op.sem_flg = SEM_UNDO;
}
+#define ngx_thread_main() (ngx_gettid() == 0)
+
#else /* use pthreads */
#define ngx_mutex_lock(m) NGX_OK
#define ngx_mutex_unlock(m)
+#define ngx_cond_signal(cv)
+
+#define ngx_thread_main() 1
+
#endif