]> git.kaiwu.me - nginx.git/commitdiff
Threads: writing via threads pools in event pipe.
authorMaxim Dounin <mdounin@mdounin.ru>
Fri, 18 Mar 2016 03:44:49 +0000 (06:44 +0300)
committerMaxim Dounin <mdounin@mdounin.ru>
Fri, 18 Mar 2016 03:44:49 +0000 (06:44 +0300)
The "aio_write" directive is introduced, which enables use of aio
for writing.  Currently it is meaningful only with "aio threads".

Note that aio operations can be done by both event pipe and output
chain, so proper mapping between r->aio and p->aio is provided when
calling ngx_event_pipe() and in output filter.

In collaboration with Valentin Bartenev.

src/event/ngx_event_pipe.c
src/event/ngx_event_pipe.h
src/http/ngx_http_core_module.c
src/http/ngx_http_core_module.h
src/http/ngx_http_upstream.c

index 2d0e7d35e044c9dd4f3a3eef6f2148c5908be3c3..ee86c7e76b4ff4a2cc45fc266bad644efb08dd37 100644 (file)
@@ -112,6 +112,14 @@ ngx_event_pipe_read_upstream(ngx_event_pipe_t *p)
         return NGX_OK;
     }
 
+#if (NGX_THREADS)
+    if (p->aio) {
+        ngx_log_debug0(NGX_LOG_DEBUG_EVENT, p->log, 0,
+                       "pipe read upstream: aio");
+        return NGX_AGAIN;
+    }
+#endif
+
     ngx_log_debug1(NGX_LOG_DEBUG_EVENT, p->log, 0,
                    "pipe read upstream: %d", p->upstream->read->ready);
 
@@ -258,19 +266,6 @@ ngx_event_pipe_read_upstream(ngx_event_pipe_t *p)
                     break;
                 }
 
-                if (rc == NGX_AGAIN) {
-                    if (ngx_event_flags & NGX_USE_LEVEL_EVENT
-                        && p->upstream->read->active
-                        && p->upstream->read->ready)
-                    {
-                        if (ngx_del_event(p->upstream->read, NGX_READ_EVENT, 0)
-                            == NGX_ERROR)
-                        {
-                            return NGX_ABORT;
-                        }
-                    }
-                }
-
                 if (rc != NGX_OK) {
                     return rc;
                 }
@@ -475,8 +470,10 @@ ngx_event_pipe_read_upstream(ngx_event_pipe_t *p)
         ngx_log_debug0(NGX_LOG_DEBUG_EVENT, p->log, 0,
                        "pipe write chain");
 
-        if (ngx_event_pipe_write_chain_to_temp_file(p) == NGX_ABORT) {
-            return NGX_ABORT;
+        rc = ngx_event_pipe_write_chain_to_temp_file(p);
+
+        if (rc != NGX_OK) {
+            return rc;
         }
     }
 
@@ -499,6 +496,18 @@ ngx_event_pipe_write_to_downstream(ngx_event_pipe_t *p)
     ngx_log_debug1(NGX_LOG_DEBUG_EVENT, p->log, 0,
                    "pipe write downstream: %d", downstream->write->ready);
 
+#if (NGX_THREADS)
+
+    if (p->writing) {
+        rc = ngx_event_pipe_write_chain_to_temp_file(p);
+
+        if (rc == NGX_ABORT) {
+            return NGX_ABORT;
+        }
+    }
+
+#endif
+
     flushed = 0;
 
     for ( ;; ) {
@@ -532,6 +541,10 @@ ngx_event_pipe_write_to_downstream(ngx_event_pipe_t *p)
                 p->out = NULL;
             }
 
+            if (p->writing) {
+                break;
+            }
+
             if (p->in) {
                 ngx_log_debug0(NGX_LOG_DEBUG_EVENT, p->log, 0,
                                "pipe write downstream flush in");
@@ -608,7 +621,7 @@ ngx_event_pipe_write_to_downstream(ngx_event_pipe_t *p)
 
                 p->out = p->out->next;
 
-            } else if (!p->cacheable && p->in) {
+            } else if (!p->cacheable && !p->writing && p->in) {
                 cl = p->in;
 
                 ngx_log_debug3(NGX_LOG_DEBUG_EVENT, p->log, 0,
@@ -710,12 +723,38 @@ ngx_event_pipe_write_chain_to_temp_file(ngx_event_pipe_t *p)
     ssize_t       size, bsize, n;
     ngx_buf_t    *b;
     ngx_uint_t    prev_last_shadow;
-    ngx_chain_t  *cl, *tl, *next, *out, **ll, **last_out, **last_free, fl;
+    ngx_chain_t  *cl, *tl, *next, *out, **ll, **last_out, **last_free;
+
+#if (NGX_THREADS)
+
+    if (p->writing) {
+
+        if (p->aio) {
+            return NGX_AGAIN;
+        }
+
+        out = p->writing;
+        p->writing = NULL;
+
+        n = ngx_write_chain_to_temp_file(p->temp_file, NULL);
+
+        if (n == NGX_ERROR) {
+            return NGX_ABORT;
+        }
+
+        goto done;
+    }
+
+#endif
 
     if (p->buf_to_file) {
-        fl.buf = p->buf_to_file;
-        fl.next = p->in;
-        out = &fl;
+        out = ngx_alloc_chain_link(p->pool);
+        if (out == NULL) {
+            return NGX_ABORT;
+        }
+
+        out->buf = p->buf_to_file;
+        out->next = p->in;
 
     } else {
         out = p->in;
@@ -775,12 +814,31 @@ ngx_event_pipe_write_chain_to_temp_file(ngx_event_pipe_t *p)
         p->last_in = &p->in;
     }
 
+#if (NGX_THREADS)
+    p->temp_file->thread_write = p->thread_handler ? 1 : 0;
+    p->temp_file->file.thread_task = p->thread_task;
+    p->temp_file->file.thread_handler = p->thread_handler;
+    p->temp_file->file.thread_ctx = p->thread_ctx;
+#endif
+
     n = ngx_write_chain_to_temp_file(p->temp_file, out);
 
     if (n == NGX_ERROR) {
         return NGX_ABORT;
     }
 
+#if (NGX_THREADS)
+
+    if (n == NGX_AGAIN) {
+        p->writing = out;
+        p->thread_task = p->temp_file->file.thread_task;
+        return NGX_AGAIN;
+    }
+
+done:
+
+#endif
+
     if (p->buf_to_file) {
         p->temp_file->offset = p->buf_to_file->last - p->buf_to_file->pos;
         n -= p->buf_to_file->last - p->buf_to_file->pos;
index 451fc4c05513c6fdcfbcb5ca8d2a7e5d3b7e9b38..ef2e7a0064a467f9d14f67abd4fc34708a1e4da3 100644 (file)
@@ -30,6 +30,8 @@ struct ngx_event_pipe_s {
     ngx_chain_t       *in;
     ngx_chain_t      **last_in;
 
+    ngx_chain_t       *writing;
+
     ngx_chain_t       *out;
     ngx_chain_t       *free;
     ngx_chain_t       *busy;
@@ -45,6 +47,13 @@ struct ngx_event_pipe_s {
     ngx_event_pipe_output_filter_pt   output_filter;
     void                             *output_ctx;
 
+#if (NGX_THREADS)
+    ngx_int_t                       (*thread_handler)(ngx_thread_task_t *task,
+                                                      ngx_file_t *file);
+    void                             *thread_ctx;
+    ngx_thread_task_t                *thread_task;
+#endif
+
     unsigned           read:1;
     unsigned           cacheable:1;
     unsigned           single_buf:1;
@@ -56,6 +65,7 @@ struct ngx_event_pipe_s {
     unsigned           downstream_done:1;
     unsigned           downstream_error:1;
     unsigned           cyclic_temp_file:1;
+    unsigned           aio:1;
 
     ngx_int_t          allocated;
     ngx_bufs_t         bufs;
index 7b70a3f1fd18953768f2791c8432c6d70df26386..1ce1e23eae16115c2b1aa801511c39226d0fdc6e 100644 (file)
@@ -402,6 +402,13 @@ static ngx_command_t  ngx_http_core_commands[] = {
       0,
       NULL },
 
+    { ngx_string("aio_write"),
+      NGX_HTTP_MAIN_CONF|NGX_HTTP_SRV_CONF|NGX_HTTP_LOC_CONF|NGX_CONF_FLAG,
+      ngx_conf_set_flag_slot,
+      NGX_HTTP_LOC_CONF_OFFSET,
+      offsetof(ngx_http_core_loc_conf_t, aio_write),
+      NULL },
+
     { ngx_string("read_ahead"),
       NGX_HTTP_MAIN_CONF|NGX_HTTP_SRV_CONF|NGX_HTTP_LOC_CONF|NGX_CONF_TAKE1,
       ngx_conf_set_size_slot,
@@ -3608,6 +3615,7 @@ ngx_http_core_create_loc_conf(ngx_conf_t *cf)
     clcf->sendfile = NGX_CONF_UNSET;
     clcf->sendfile_max_chunk = NGX_CONF_UNSET_SIZE;
     clcf->aio = NGX_CONF_UNSET;
+    clcf->aio_write = NGX_CONF_UNSET;
 #if (NGX_THREADS)
     clcf->thread_pool = NGX_CONF_UNSET_PTR;
     clcf->thread_pool_value = NGX_CONF_UNSET_PTR;
@@ -3829,6 +3837,7 @@ ngx_http_core_merge_loc_conf(ngx_conf_t *cf, void *parent, void *child)
                               prev->sendfile_max_chunk, 0);
 #if (NGX_HAVE_FILE_AIO || NGX_THREADS)
     ngx_conf_merge_value(conf->aio, prev->aio, NGX_HTTP_AIO_OFF);
+    ngx_conf_merge_value(conf->aio_write, prev->aio_write, 0);
 #endif
 #if (NGX_THREADS)
     ngx_conf_merge_ptr_value(conf->thread_pool, prev->thread_pool, NULL);
index dd434e4fa1ab8e8ab55180d6831c0bfd8c5b164a..961de3f9a0239dedce20213626f988f6689abe29 100644 (file)
@@ -404,6 +404,7 @@ struct ngx_http_core_loc_conf_s {
     ngx_flag_t    internal;                /* internal */
     ngx_flag_t    sendfile;                /* sendfile */
     ngx_flag_t    aio;                     /* aio */
+    ngx_flag_t    aio_write;               /* aio_write */
     ngx_flag_t    tcp_nopush;              /* tcp_nopush */
     ngx_flag_t    tcp_nodelay;             /* tcp_nodelay */
     ngx_flag_t    reset_timedout_connection; /* reset_timedout_connection */
index 4df485a996d09cd65cd3526b8ef2878d497e2896..67bd38333a68496f8200fbc9d1c7e355d80a662b 100644 (file)
@@ -76,6 +76,13 @@ static void
 static ngx_int_t ngx_http_upstream_non_buffered_filter_init(void *data);
 static ngx_int_t ngx_http_upstream_non_buffered_filter(void *data,
     ssize_t bytes);
+#if (NGX_THREADS)
+static ngx_int_t ngx_http_upstream_thread_handler(ngx_thread_task_t *task,
+    ngx_file_t *file);
+static void ngx_http_upstream_thread_event_handler(ngx_event_t *ev);
+#endif
+static ngx_int_t ngx_http_upstream_output_filter(void *data,
+    ngx_chain_t *chain);
 static void ngx_http_upstream_process_downstream(ngx_http_request_t *r);
 static void ngx_http_upstream_process_upstream(ngx_http_request_t *r,
     ngx_http_upstream_t *u);
@@ -2870,7 +2877,7 @@ ngx_http_upstream_send_response(ngx_http_request_t *r, ngx_http_upstream_t *u)
 
     p = u->pipe;
 
-    p->output_filter = (ngx_event_pipe_output_filter_pt) ngx_http_output_filter;
+    p->output_filter = ngx_http_upstream_output_filter;
     p->output_ctx = r;
     p->tag = u->output.tag;
     p->bufs = u->conf->bufs;
@@ -2913,6 +2920,13 @@ ngx_http_upstream_send_response(ngx_http_request_t *r, ngx_http_upstream_t *u)
     p->max_temp_file_size = u->conf->max_temp_file_size;
     p->temp_file_write_size = u->conf->temp_file_write_size;
 
+#if (NGX_THREADS)
+    if (clcf->aio == NGX_HTTP_AIO_THREADS && clcf->aio_write) {
+        p->thread_handler = ngx_http_upstream_thread_handler;
+        p->thread_ctx = r;
+    }
+#endif
+
     p->preread_bufs = ngx_alloc_chain_link(r->pool);
     if (p->preread_bufs == NULL) {
         ngx_http_upstream_finalize_request(r, u, NGX_ERROR);
@@ -3487,6 +3501,97 @@ ngx_http_upstream_non_buffered_filter(void *data, ssize_t bytes)
 }
 
 
+#if (NGX_THREADS)
+
+static ngx_int_t
+ngx_http_upstream_thread_handler(ngx_thread_task_t *task, ngx_file_t *file)
+{
+    ngx_str_t                  name;
+    ngx_event_pipe_t          *p;
+    ngx_thread_pool_t         *tp;
+    ngx_http_request_t        *r;
+    ngx_http_core_loc_conf_t  *clcf;
+
+    r = file->thread_ctx;
+    p = r->upstream->pipe;
+
+    clcf = ngx_http_get_module_loc_conf(r, ngx_http_core_module);
+    tp = clcf->thread_pool;
+
+    if (tp == NULL) {
+        if (ngx_http_complex_value(r, clcf->thread_pool_value, &name)
+            != NGX_OK)
+        {
+            return NGX_ERROR;
+        }
+
+        tp = ngx_thread_pool_get((ngx_cycle_t *) ngx_cycle, &name);
+
+        if (tp == NULL) {
+            ngx_log_error(NGX_LOG_ERR, r->connection->log, 0,
+                          "thread pool \"%V\" not found", &name);
+            return NGX_ERROR;
+        }
+    }
+
+    task->event.data = r;
+    task->event.handler = ngx_http_upstream_thread_event_handler;
+
+    if (ngx_thread_task_post(tp, task) != NGX_OK) {
+        return NGX_ERROR;
+    }
+
+    r->main->blocked++;
+    r->aio = 1;
+    p->aio = 1;
+
+    return NGX_OK;
+}
+
+
+static void
+ngx_http_upstream_thread_event_handler(ngx_event_t *ev)
+{
+    ngx_connection_t    *c;
+    ngx_http_request_t  *r;
+
+    r = ev->data;
+    c = r->connection;
+
+    ngx_http_set_log_request(c->log, r);
+
+    ngx_log_debug2(NGX_LOG_DEBUG_HTTP, c->log, 0,
+                   "http upstream thread: \"%V?%V\"", &r->uri, &r->args);
+
+    r->main->blocked--;
+    r->aio = 0;
+
+    r->write_event_handler(r);
+
+    ngx_http_run_posted_requests(c);
+}
+
+#endif
+
+
+static ngx_int_t
+ngx_http_upstream_output_filter(void *data, ngx_chain_t *chain)
+{
+    ngx_int_t            rc;
+    ngx_event_pipe_t    *p;
+    ngx_http_request_t  *r;
+
+    r = data;
+    p = r->upstream->pipe;
+
+    rc = ngx_http_output_filter(r, chain);
+
+    p->aio = r->aio;
+
+    return rc;
+}
+
+
 static void
 ngx_http_upstream_process_downstream(ngx_http_request_t *r)
 {
@@ -3505,6 +3610,10 @@ ngx_http_upstream_process_downstream(ngx_http_request_t *r)
 
     c->log->action = "sending to client";
 
+#if (NGX_THREADS)
+    p->aio = r->aio;
+#endif
+
     if (wev->timedout) {
 
         if (wev->delayed) {
@@ -3634,6 +3743,12 @@ ngx_http_upstream_process_request(ngx_http_request_t *r,
 
     p = u->pipe;
 
+#if (NGX_THREADS)
+    if (p->writing) {
+        return;
+    }
+#endif
+
     if (u->peer.connection) {
 
         if (u->store) {