aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/event/ngx_event_pipe.c98
-rw-r--r--src/event/ngx_event_pipe.h10
-rw-r--r--src/http/ngx_http_core_module.c9
-rw-r--r--src/http/ngx_http_core_module.h1
-rw-r--r--src/http/ngx_http_upstream.c117
5 files changed, 214 insertions, 21 deletions
diff --git a/src/event/ngx_event_pipe.c b/src/event/ngx_event_pipe.c
index 2d0e7d35e..ee86c7e76 100644
--- a/src/event/ngx_event_pipe.c
+++ b/src/event/ngx_event_pipe.c
@@ -112,6 +112,14 @@ ngx_event_pipe_read_upstream(ngx_event_pipe_t *p)
return NGX_OK;
}
+#if (NGX_THREADS)
+ if (p->aio) {
+ ngx_log_debug0(NGX_LOG_DEBUG_EVENT, p->log, 0,
+ "pipe read upstream: aio");
+ return NGX_AGAIN;
+ }
+#endif
+
ngx_log_debug1(NGX_LOG_DEBUG_EVENT, p->log, 0,
"pipe read upstream: %d", p->upstream->read->ready);
@@ -258,19 +266,6 @@ ngx_event_pipe_read_upstream(ngx_event_pipe_t *p)
break;
}
- if (rc == NGX_AGAIN) {
- if (ngx_event_flags & NGX_USE_LEVEL_EVENT
- && p->upstream->read->active
- && p->upstream->read->ready)
- {
- if (ngx_del_event(p->upstream->read, NGX_READ_EVENT, 0)
- == NGX_ERROR)
- {
- return NGX_ABORT;
- }
- }
- }
-
if (rc != NGX_OK) {
return rc;
}
@@ -475,8 +470,10 @@ ngx_event_pipe_read_upstream(ngx_event_pipe_t *p)
ngx_log_debug0(NGX_LOG_DEBUG_EVENT, p->log, 0,
"pipe write chain");
- if (ngx_event_pipe_write_chain_to_temp_file(p) == NGX_ABORT) {
- return NGX_ABORT;
+ rc = ngx_event_pipe_write_chain_to_temp_file(p);
+
+ if (rc != NGX_OK) {
+ return rc;
}
}
@@ -499,6 +496,18 @@ ngx_event_pipe_write_to_downstream(ngx_event_pipe_t *p)
ngx_log_debug1(NGX_LOG_DEBUG_EVENT, p->log, 0,
"pipe write downstream: %d", downstream->write->ready);
+#if (NGX_THREADS)
+
+ if (p->writing) {
+ rc = ngx_event_pipe_write_chain_to_temp_file(p);
+
+ if (rc == NGX_ABORT) {
+ return NGX_ABORT;
+ }
+ }
+
+#endif
+
flushed = 0;
for ( ;; ) {
@@ -532,6 +541,10 @@ ngx_event_pipe_write_to_downstream(ngx_event_pipe_t *p)
p->out = NULL;
}
+ if (p->writing) {
+ break;
+ }
+
if (p->in) {
ngx_log_debug0(NGX_LOG_DEBUG_EVENT, p->log, 0,
"pipe write downstream flush in");
@@ -608,7 +621,7 @@ ngx_event_pipe_write_to_downstream(ngx_event_pipe_t *p)
p->out = p->out->next;
- } else if (!p->cacheable && p->in) {
+ } else if (!p->cacheable && !p->writing && p->in) {
cl = p->in;
ngx_log_debug3(NGX_LOG_DEBUG_EVENT, p->log, 0,
@@ -710,12 +723,38 @@ ngx_event_pipe_write_chain_to_temp_file(ngx_event_pipe_t *p)
ssize_t size, bsize, n;
ngx_buf_t *b;
ngx_uint_t prev_last_shadow;
- ngx_chain_t *cl, *tl, *next, *out, **ll, **last_out, **last_free, fl;
+ ngx_chain_t *cl, *tl, *next, *out, **ll, **last_out, **last_free;
+
+#if (NGX_THREADS)
+
+ if (p->writing) {
+
+ if (p->aio) {
+ return NGX_AGAIN;
+ }
+
+ out = p->writing;
+ p->writing = NULL;
+
+ n = ngx_write_chain_to_temp_file(p->temp_file, NULL);
+
+ if (n == NGX_ERROR) {
+ return NGX_ABORT;
+ }
+
+ goto done;
+ }
+
+#endif
if (p->buf_to_file) {
- fl.buf = p->buf_to_file;
- fl.next = p->in;
- out = &fl;
+ out = ngx_alloc_chain_link(p->pool);
+ if (out == NULL) {
+ return NGX_ABORT;
+ }
+
+ out->buf = p->buf_to_file;
+ out->next = p->in;
} else {
out = p->in;
@@ -775,12 +814,31 @@ ngx_event_pipe_write_chain_to_temp_file(ngx_event_pipe_t *p)
p->last_in = &p->in;
}
+#if (NGX_THREADS)
+ p->temp_file->thread_write = p->thread_handler ? 1 : 0;
+ p->temp_file->file.thread_task = p->thread_task;
+ p->temp_file->file.thread_handler = p->thread_handler;
+ p->temp_file->file.thread_ctx = p->thread_ctx;
+#endif
+
n = ngx_write_chain_to_temp_file(p->temp_file, out);
if (n == NGX_ERROR) {
return NGX_ABORT;
}
+#if (NGX_THREADS)
+
+ if (n == NGX_AGAIN) {
+ p->writing = out;
+ p->thread_task = p->temp_file->file.thread_task;
+ return NGX_AGAIN;
+ }
+
+done:
+
+#endif
+
if (p->buf_to_file) {
p->temp_file->offset = p->buf_to_file->last - p->buf_to_file->pos;
n -= p->buf_to_file->last - p->buf_to_file->pos;
diff --git a/src/event/ngx_event_pipe.h b/src/event/ngx_event_pipe.h
index 451fc4c05..ef2e7a006 100644
--- a/src/event/ngx_event_pipe.h
+++ b/src/event/ngx_event_pipe.h
@@ -30,6 +30,8 @@ struct ngx_event_pipe_s {
ngx_chain_t *in;
ngx_chain_t **last_in;
+ ngx_chain_t *writing;
+
ngx_chain_t *out;
ngx_chain_t *free;
ngx_chain_t *busy;
@@ -45,6 +47,13 @@ struct ngx_event_pipe_s {
ngx_event_pipe_output_filter_pt output_filter;
void *output_ctx;
+#if (NGX_THREADS)
+ ngx_int_t (*thread_handler)(ngx_thread_task_t *task,
+ ngx_file_t *file);
+ void *thread_ctx;
+ ngx_thread_task_t *thread_task;
+#endif
+
unsigned read:1;
unsigned cacheable:1;
unsigned single_buf:1;
@@ -56,6 +65,7 @@ struct ngx_event_pipe_s {
unsigned downstream_done:1;
unsigned downstream_error:1;
unsigned cyclic_temp_file:1;
+ unsigned aio:1;
ngx_int_t allocated;
ngx_bufs_t bufs;
diff --git a/src/http/ngx_http_core_module.c b/src/http/ngx_http_core_module.c
index 7b70a3f1f..1ce1e23ea 100644
--- a/src/http/ngx_http_core_module.c
+++ b/src/http/ngx_http_core_module.c
@@ -402,6 +402,13 @@ static ngx_command_t ngx_http_core_commands[] = {
0,
NULL },
+ { ngx_string("aio_write"),
+ NGX_HTTP_MAIN_CONF|NGX_HTTP_SRV_CONF|NGX_HTTP_LOC_CONF|NGX_CONF_FLAG,
+ ngx_conf_set_flag_slot,
+ NGX_HTTP_LOC_CONF_OFFSET,
+ offsetof(ngx_http_core_loc_conf_t, aio_write),
+ NULL },
+
{ ngx_string("read_ahead"),
NGX_HTTP_MAIN_CONF|NGX_HTTP_SRV_CONF|NGX_HTTP_LOC_CONF|NGX_CONF_TAKE1,
ngx_conf_set_size_slot,
@@ -3608,6 +3615,7 @@ ngx_http_core_create_loc_conf(ngx_conf_t *cf)
clcf->sendfile = NGX_CONF_UNSET;
clcf->sendfile_max_chunk = NGX_CONF_UNSET_SIZE;
clcf->aio = NGX_CONF_UNSET;
+ clcf->aio_write = NGX_CONF_UNSET;
#if (NGX_THREADS)
clcf->thread_pool = NGX_CONF_UNSET_PTR;
clcf->thread_pool_value = NGX_CONF_UNSET_PTR;
@@ -3829,6 +3837,7 @@ ngx_http_core_merge_loc_conf(ngx_conf_t *cf, void *parent, void *child)
prev->sendfile_max_chunk, 0);
#if (NGX_HAVE_FILE_AIO || NGX_THREADS)
ngx_conf_merge_value(conf->aio, prev->aio, NGX_HTTP_AIO_OFF);
+ ngx_conf_merge_value(conf->aio_write, prev->aio_write, 0);
#endif
#if (NGX_THREADS)
ngx_conf_merge_ptr_value(conf->thread_pool, prev->thread_pool, NULL);
diff --git a/src/http/ngx_http_core_module.h b/src/http/ngx_http_core_module.h
index dd434e4fa..961de3f9a 100644
--- a/src/http/ngx_http_core_module.h
+++ b/src/http/ngx_http_core_module.h
@@ -404,6 +404,7 @@ struct ngx_http_core_loc_conf_s {
ngx_flag_t internal; /* internal */
ngx_flag_t sendfile; /* sendfile */
ngx_flag_t aio; /* aio */
+ ngx_flag_t aio_write; /* aio_write */
ngx_flag_t tcp_nopush; /* tcp_nopush */
ngx_flag_t tcp_nodelay; /* tcp_nodelay */
ngx_flag_t reset_timedout_connection; /* reset_timedout_connection */
diff --git a/src/http/ngx_http_upstream.c b/src/http/ngx_http_upstream.c
index 4df485a99..67bd38333 100644
--- a/src/http/ngx_http_upstream.c
+++ b/src/http/ngx_http_upstream.c
@@ -76,6 +76,13 @@ static void
static ngx_int_t ngx_http_upstream_non_buffered_filter_init(void *data);
static ngx_int_t ngx_http_upstream_non_buffered_filter(void *data,
ssize_t bytes);
+#if (NGX_THREADS)
+static ngx_int_t ngx_http_upstream_thread_handler(ngx_thread_task_t *task,
+ ngx_file_t *file);
+static void ngx_http_upstream_thread_event_handler(ngx_event_t *ev);
+#endif
+static ngx_int_t ngx_http_upstream_output_filter(void *data,
+ ngx_chain_t *chain);
static void ngx_http_upstream_process_downstream(ngx_http_request_t *r);
static void ngx_http_upstream_process_upstream(ngx_http_request_t *r,
ngx_http_upstream_t *u);
@@ -2870,7 +2877,7 @@ ngx_http_upstream_send_response(ngx_http_request_t *r, ngx_http_upstream_t *u)
p = u->pipe;
- p->output_filter = (ngx_event_pipe_output_filter_pt) ngx_http_output_filter;
+ p->output_filter = ngx_http_upstream_output_filter;
p->output_ctx = r;
p->tag = u->output.tag;
p->bufs = u->conf->bufs;
@@ -2913,6 +2920,13 @@ ngx_http_upstream_send_response(ngx_http_request_t *r, ngx_http_upstream_t *u)
p->max_temp_file_size = u->conf->max_temp_file_size;
p->temp_file_write_size = u->conf->temp_file_write_size;
+#if (NGX_THREADS)
+ if (clcf->aio == NGX_HTTP_AIO_THREADS && clcf->aio_write) {
+ p->thread_handler = ngx_http_upstream_thread_handler;
+ p->thread_ctx = r;
+ }
+#endif
+
p->preread_bufs = ngx_alloc_chain_link(r->pool);
if (p->preread_bufs == NULL) {
ngx_http_upstream_finalize_request(r, u, NGX_ERROR);
@@ -3487,6 +3501,97 @@ ngx_http_upstream_non_buffered_filter(void *data, ssize_t bytes)
}
+#if (NGX_THREADS)
+
+static ngx_int_t
+ngx_http_upstream_thread_handler(ngx_thread_task_t *task, ngx_file_t *file)
+{
+ ngx_str_t name;
+ ngx_event_pipe_t *p;
+ ngx_thread_pool_t *tp;
+ ngx_http_request_t *r;
+ ngx_http_core_loc_conf_t *clcf;
+
+ r = file->thread_ctx;
+ p = r->upstream->pipe;
+
+ clcf = ngx_http_get_module_loc_conf(r, ngx_http_core_module);
+ tp = clcf->thread_pool;
+
+ if (tp == NULL) {
+ if (ngx_http_complex_value(r, clcf->thread_pool_value, &name)
+ != NGX_OK)
+ {
+ return NGX_ERROR;
+ }
+
+ tp = ngx_thread_pool_get((ngx_cycle_t *) ngx_cycle, &name);
+
+ if (tp == NULL) {
+ ngx_log_error(NGX_LOG_ERR, r->connection->log, 0,
+ "thread pool \"%V\" not found", &name);
+ return NGX_ERROR;
+ }
+ }
+
+ task->event.data = r;
+ task->event.handler = ngx_http_upstream_thread_event_handler;
+
+ if (ngx_thread_task_post(tp, task) != NGX_OK) {
+ return NGX_ERROR;
+ }
+
+ r->main->blocked++;
+ r->aio = 1;
+ p->aio = 1;
+
+ return NGX_OK;
+}
+
+
+static void
+ngx_http_upstream_thread_event_handler(ngx_event_t *ev)
+{
+ ngx_connection_t *c;
+ ngx_http_request_t *r;
+
+ r = ev->data;
+ c = r->connection;
+
+ ngx_http_set_log_request(c->log, r);
+
+ ngx_log_debug2(NGX_LOG_DEBUG_HTTP, c->log, 0,
+ "http upstream thread: \"%V?%V\"", &r->uri, &r->args);
+
+ r->main->blocked--;
+ r->aio = 0;
+
+ r->write_event_handler(r);
+
+ ngx_http_run_posted_requests(c);
+}
+
+#endif
+
+
+static ngx_int_t
+ngx_http_upstream_output_filter(void *data, ngx_chain_t *chain)
+{
+ ngx_int_t rc;
+ ngx_event_pipe_t *p;
+ ngx_http_request_t *r;
+
+ r = data;
+ p = r->upstream->pipe;
+
+ rc = ngx_http_output_filter(r, chain);
+
+ p->aio = r->aio;
+
+ return rc;
+}
+
+
static void
ngx_http_upstream_process_downstream(ngx_http_request_t *r)
{
@@ -3505,6 +3610,10 @@ ngx_http_upstream_process_downstream(ngx_http_request_t *r)
c->log->action = "sending to client";
+#if (NGX_THREADS)
+ p->aio = r->aio;
+#endif
+
if (wev->timedout) {
if (wev->delayed) {
@@ -3634,6 +3743,12 @@ ngx_http_upstream_process_request(ngx_http_request_t *r,
p = u->pipe;
+#if (NGX_THREADS)
+ if (p->writing) {
+ return;
+ }
+#endif
+
if (u->peer.connection) {
if (u->store) {