]> git.kaiwu.me - nginx.git/commitdiff
Threads: offloading of temp files writing to thread pools.
authorMaxim Dounin <mdounin@mdounin.ru>
Fri, 18 Mar 2016 03:44:03 +0000 (06:44 +0300)
committerMaxim Dounin <mdounin@mdounin.ru>
Fri, 18 Mar 2016 03:44:03 +0000 (06:44 +0300)
The ngx_thread_write_chain_to_file() function introduced, which
uses ngx_file_t thread_handler, thread_ctx and thread_task fields.
The task context structure (ngx_thread_file_ctx_t) is the same for
both reading and writing, and can be safely shared as long as
operations are serialized.

The task->handler field is now always set (and not only when task is
allocated), as the same task can be used with different handlers.

The thread_write flag is introduced in the ngx_temp_file_t structure
to explicitly enable use of ngx_thread_write_chain_to_file() in
ngx_write_chain_to_temp_file() when supported by caller.

In collaboration with Valentin Bartenev.

src/core/ngx_file.c
src/core/ngx_file.h
src/os/unix/ngx_files.c
src/os/unix/ngx_files.h

index 3ebd73d8b4e05bfafde550fd92379e56216327b2..2dc2228655362f005accc666a8410e475d7b3c77 100644 (file)
@@ -124,6 +124,15 @@ ngx_write_chain_to_temp_file(ngx_temp_file_t *tf, ngx_chain_t *chain)
         }
     }
 
+#if (NGX_THREADS && NGX_HAVE_PWRITEV)
+
+    if (tf->thread_write) {
+        return ngx_thread_write_chain_to_file(&tf->file, chain, tf->offset,
+                                              tf->pool);
+    }
+
+#endif
+
     return ngx_write_chain_to_file(&tf->file, chain, tf->offset, tf->pool);
 }
 
index aeb6c0cb285c4819a53543fe05babd1ca9e0dd32..5f8228b7b4ba8b24829c7803eae3788eb7450b9a 100644 (file)
@@ -78,6 +78,7 @@ typedef struct {
     unsigned                   log_level:8;
     unsigned                   persistent:1;
     unsigned                   clean:1;
+    unsigned                   thread_write:1;
 } ngx_temp_file_t;
 
 
index 13b9e3f0b3cfc033c16ed5e5288bd20db7bb7471..bcef7ecb0cc43b7cb5c776ea0a80cadebebab272 100644 (file)
@@ -12,6 +12,7 @@
 #if (NGX_THREADS)
 #include <ngx_thread_pool.h>
 static void ngx_thread_read_handler(void *data, ngx_log_t *log);
+static void ngx_thread_write_chain_to_file_handler(void *data, ngx_log_t *log);
 #endif
 
 static ngx_chain_t *ngx_chain_to_iovec(ngx_iovec_t *vec, ngx_chain_t *cl);
@@ -77,14 +78,17 @@ ngx_read_file(ngx_file_t *file, u_char *buf, size_t size, off_t offset)
 #if (NGX_THREADS)
 
 typedef struct {
-    ngx_fd_t     fd;
-    u_char      *buf;
-    size_t       size;
-    off_t        offset;
+    ngx_fd_t       fd;
+    ngx_uint_t     write;   /* unsigned  write:1; */
 
-    size_t       read;
-    ngx_err_t    err;
-} ngx_thread_read_ctx_t;
+    u_char        *buf;
+    size_t         size;
+    ngx_chain_t   *chain;
+    off_t          offset;
+
+    size_t         nbytes;
+    ngx_err_t      err;
+} ngx_thread_file_ctx_t;
 
 
 ssize_t
@@ -92,7 +96,7 @@ ngx_thread_read(ngx_file_t *file, u_char *buf, size_t size, off_t offset,
     ngx_pool_t *pool)
 {
     ngx_thread_task_t      *task;
-    ngx_thread_read_ctx_t  *ctx;
+    ngx_thread_file_ctx_t  *ctx;
 
     ngx_log_debug4(NGX_LOG_DEBUG_CORE, file->log, 0,
                    "thread read: %d, %p, %uz, %O",
@@ -101,13 +105,11 @@ ngx_thread_read(ngx_file_t *file, u_char *buf, size_t size, off_t offset,
     task = file->thread_task;
 
     if (task == NULL) {
-        task = ngx_thread_task_alloc(pool, sizeof(ngx_thread_read_ctx_t));
+        task = ngx_thread_task_alloc(pool, sizeof(ngx_thread_file_ctx_t));
         if (task == NULL) {
             return NGX_ERROR;
         }
 
-        task->handler = ngx_thread_read_handler;
-
         file->thread_task = task;
     }
 
@@ -116,15 +118,25 @@ ngx_thread_read(ngx_file_t *file, u_char *buf, size_t size, off_t offset,
     if (task->event.complete) {
         task->event.complete = 0;
 
+        if (ctx->write) {
+            ngx_log_error(NGX_LOG_ALERT, file->log, 0,
+                          "invalid thread call, read instead of write");
+            return NGX_ERROR;
+        }
+
         if (ctx->err) {
             ngx_log_error(NGX_LOG_CRIT, file->log, ctx->err,
                           "pread() \"%s\" failed", file->name.data);
             return NGX_ERROR;
         }
 
-        return ctx->read;
+        return ctx->nbytes;
     }
 
+    task->handler = ngx_thread_read_handler;
+
+    ctx->write = 0;
+
     ctx->fd = file->fd;
     ctx->buf = buf;
     ctx->size = size;
@@ -143,7 +155,7 @@ ngx_thread_read(ngx_file_t *file, u_char *buf, size_t size, off_t offset,
 static void
 ngx_thread_read_handler(void *data, ngx_log_t *log)
 {
-    ngx_thread_read_ctx_t *ctx = data;
+    ngx_thread_file_ctx_t *ctx = data;
 
     ssize_t  n;
 
@@ -155,7 +167,7 @@ ngx_thread_read_handler(void *data, ngx_log_t *log)
         ctx->err = ngx_errno;
 
     } else {
-        ctx->read = n;
+        ctx->nbytes = n;
         ctx->err = 0;
     }
 
@@ -454,6 +466,132 @@ eintr:
 }
 
 
+#if (NGX_THREADS)
+
+ssize_t
+ngx_thread_write_chain_to_file(ngx_file_t *file, ngx_chain_t *cl, off_t offset,
+    ngx_pool_t *pool)
+{
+    ngx_thread_task_t      *task;
+    ngx_thread_file_ctx_t  *ctx;
+
+    ngx_log_debug3(NGX_LOG_DEBUG_CORE, file->log, 0,
+                   "thread write chain: %d, %p, %O",
+                   file->fd, cl, offset);
+
+    task = file->thread_task;
+
+    if (task == NULL) {
+        task = ngx_thread_task_alloc(pool,
+                                     sizeof(ngx_thread_file_ctx_t));
+        if (task == NULL) {
+            return NGX_ERROR;
+        }
+
+        file->thread_task = task;
+    }
+
+    ctx = task->ctx;
+
+    if (task->event.complete) {
+        task->event.complete = 0;
+
+        if (!ctx->write) {
+            ngx_log_error(NGX_LOG_ALERT, file->log, 0,
+                          "invalid thread call, write instead of read");
+            return NGX_ERROR;
+        }
+
+        if (ctx->err || ctx->nbytes == 0) {
+            ngx_log_error(NGX_LOG_CRIT, file->log, ctx->err,
+                          "pwritev() \"%s\" failed", file->name.data);
+            return NGX_ERROR;
+        }
+
+        file->offset += ctx->nbytes;
+        return ctx->nbytes;
+    }
+
+    task->handler = ngx_thread_write_chain_to_file_handler;
+
+    ctx->write = 1;
+
+    ctx->fd = file->fd;
+    ctx->chain = cl;
+    ctx->offset = offset;
+
+    if (file->thread_handler(task, file) != NGX_OK) {
+        return NGX_ERROR;
+    }
+
+    return NGX_AGAIN;
+}
+
+
+static void
+ngx_thread_write_chain_to_file_handler(void *data, ngx_log_t *log)
+{
+    ngx_thread_file_ctx_t *ctx = data;
+
+#if (NGX_HAVE_PWRITEV)
+
+    off_t          offset;
+    ssize_t        n;
+    ngx_err_t      err;
+    ngx_chain_t   *cl;
+    ngx_iovec_t    vec;
+    struct iovec   iovs[NGX_IOVS_PREALLOCATE];
+
+    vec.iovs = iovs;
+    vec.nalloc = NGX_IOVS_PREALLOCATE;
+
+    cl = ctx->chain;
+    offset = ctx->offset;
+
+    ctx->nbytes = 0;
+    ctx->err = 0;
+
+    do {
+        /* create the iovec and coalesce the neighbouring bufs */
+        cl = ngx_chain_to_iovec(&vec, cl);
+
+eintr:
+
+        n = pwritev(ctx->fd, iovs, vec.count, offset);
+
+        if (n == -1) {
+            err = ngx_errno;
+
+            if (err == NGX_EINTR) {
+                ngx_log_debug0(NGX_LOG_DEBUG_CORE, log, err,
+                               "pwritev() was interrupted");
+                goto eintr;
+            }
+
+            ctx->err = err;
+            return;
+        }
+
+        if ((size_t) n != vec.size) {
+            ctx->nbytes = 0;
+            return;
+        }
+
+        ctx->nbytes += n;
+        offset += n;
+    } while (cl);
+
+#else
+
+    ctx->err = NGX_ENOSYS;
+    return;
+
+#endif
+}
+
+#endif /* NGX_THREADS */
+
+
 ngx_int_t
 ngx_set_file_time(u_char *name, ngx_fd_t fd, time_t s)
 {
index 88b2f81cc7ad2ce593132311ddc0d70c3e9374f9..07872b1388178409eeb2754d3392799bfce6cb29 100644 (file)
@@ -387,6 +387,8 @@ extern ngx_uint_t  ngx_file_aio;
 #if (NGX_THREADS)
 ssize_t ngx_thread_read(ngx_file_t *file, u_char *buf, size_t size,
     off_t offset, ngx_pool_t *pool);
+ssize_t ngx_thread_write_chain_to_file(ngx_file_t *file, ngx_chain_t *cl,
+    off_t offset, ngx_pool_t *pool);
 #endif