]> git.kaiwu.me - nginx.git/commitdiff
Stream: preread phase.
authorVladimir Homutov <vl@nginx.com>
Thu, 15 Sep 2016 11:56:02 +0000 (14:56 +0300)
committerVladimir Homutov <vl@nginx.com>
Thu, 15 Sep 2016 11:56:02 +0000 (14:56 +0300)
In this phase, head of a stream is read and analysed before proceeding to the
content phase.  Amount of data read is controlled by the module implementing
the phase, but not more than defined by the "preread_buffer_size" directive.
The time spent on processing preread is controlled by the "preread_timeout"
directive.

The typical preread phase module will parse the beginning of a stream and set
variable that may be used by the content phase, for example to make routing
decision.

src/stream/ngx_stream.c
src/stream/ngx_stream.h
src/stream/ngx_stream_core_module.c

index 9aed0a879ce2d410b46ca16aded13ef9c3acd76b..7312c3e2e86775183b4fd8449b54f6225fffe40d 100644 (file)
@@ -302,6 +302,13 @@ ngx_stream_init_phases(ngx_conf_t *cf, ngx_stream_core_main_conf_t *cmcf)
     }
 #endif
 
+    if (ngx_array_init(&cmcf->phases[NGX_STREAM_PREREAD_PHASE].handlers,
+                       cf->pool, 1, sizeof(ngx_stream_handler_pt))
+        != NGX_OK)
+    {
+        return NGX_ERROR;
+    }
+
     if (ngx_array_init(&cmcf->phases[NGX_STREAM_LOG_PHASE].handlers,
                        cf->pool, 1, sizeof(ngx_stream_handler_pt))
         != NGX_OK)
@@ -343,6 +350,10 @@ ngx_stream_init_phase_handlers(ngx_conf_t *cf,
 
         switch (i) {
 
+        case NGX_STREAM_PREREAD_PHASE:
+            checker = ngx_stream_core_preread_phase;
+            break;
+
         case NGX_STREAM_CONTENT_PHASE:
             ph->checker = ngx_stream_core_content_phase;
             n++;
index 0aded16919508c6ec229961654624fbfa0c109a7..deca8ae626e32db6f429fe6d92dedb1c5faafcd5 100644 (file)
@@ -122,6 +122,7 @@ typedef enum {
 #if (NGX_STREAM_SSL)
     NGX_STREAM_SSL_PHASE,
 #endif
+    NGX_STREAM_PREREAD_PHASE,
     NGX_STREAM_CONTENT_PHASE,
     NGX_STREAM_LOG_PHASE
 } ngx_stream_phases;
@@ -181,6 +182,8 @@ typedef struct {
     ngx_uint_t                     line;
 
     ngx_flag_t                     tcp_nodelay;
+    size_t                         preread_buffer_size;
+    ngx_msec_t                     preread_timeout;
 
     ngx_log_t                     *error_log;
 
@@ -280,6 +283,8 @@ typedef struct {
 void ngx_stream_core_run_phases(ngx_stream_session_t *s);
 ngx_int_t ngx_stream_core_generic_phase(ngx_stream_session_t *s,
     ngx_stream_phase_handler_t *ph);
+ngx_int_t ngx_stream_core_preread_phase(ngx_stream_session_t *s,
+    ngx_stream_phase_handler_t *ph);
 ngx_int_t ngx_stream_core_content_phase(ngx_stream_session_t *s,
     ngx_stream_phase_handler_t *ph);
 
index 3a9335e775509d3fa967e8b69e1b7e8bf6a3bddd..23644f3d26f676eee79d181731f39d48140e5497 100644 (file)
@@ -91,6 +91,20 @@ static ngx_command_t  ngx_stream_core_commands[] = {
       offsetof(ngx_stream_core_srv_conf_t, tcp_nodelay),
       NULL },
 
+    { ngx_string("preread_buffer_size"),
+      NGX_STREAM_MAIN_CONF|NGX_STREAM_SRV_CONF|NGX_CONF_TAKE1,
+      ngx_conf_set_size_slot,
+      NGX_STREAM_SRV_CONF_OFFSET,
+      offsetof(ngx_stream_core_srv_conf_t, preread_buffer_size),
+      NULL },
+
+    { ngx_string("preread_timeout"),
+      NGX_STREAM_MAIN_CONF|NGX_STREAM_SRV_CONF|NGX_CONF_TAKE1,
+      ngx_conf_set_msec_slot,
+      NGX_STREAM_SRV_CONF_OFFSET,
+      offsetof(ngx_stream_core_srv_conf_t, preread_timeout),
+      NULL },
+
       ngx_null_command
 };
 
@@ -153,7 +167,7 @@ ngx_stream_core_generic_phase(ngx_stream_session_t *s,
 
     /*
      * generic phase checker,
-     * used by all phases, except for content
+     * used by all phases, except for preread and content
      */
 
     ngx_log_debug1(NGX_LOG_DEBUG_STREAM, s->connection->log, 0,
@@ -185,6 +199,112 @@ ngx_stream_core_generic_phase(ngx_stream_session_t *s,
 }
 
 
+ngx_int_t
+ngx_stream_core_preread_phase(ngx_stream_session_t *s,
+    ngx_stream_phase_handler_t *ph)
+{
+    size_t                       size;
+    ssize_t                      n;
+    ngx_int_t                    rc;
+    ngx_connection_t            *c;
+    ngx_stream_core_srv_conf_t  *cscf;
+
+    c = s->connection;
+
+    c->log->action = "prereading client data";
+
+    cscf = ngx_stream_get_module_srv_conf(s, ngx_stream_core_module);
+
+    if (c->read->timedout) {
+        rc = NGX_STREAM_OK;
+
+    } else if (c->read->timer_set) {
+        rc = NGX_AGAIN;
+
+    } else {
+        rc = ph->handler(s);
+    }
+
+    while (rc == NGX_AGAIN) {
+
+        if (c->buffer == NULL) {
+            c->buffer = ngx_create_temp_buf(c->pool, cscf->preread_buffer_size);
+            if (c->buffer == NULL) {
+                rc = NGX_ERROR;
+                break;
+            }
+        }
+
+        size = c->buffer->end - c->buffer->last;
+
+        if (size == 0) {
+            ngx_log_error(NGX_LOG_ERR, c->log, 0, "preread buffer full");
+            rc = NGX_STREAM_BAD_REQUEST;
+            break;
+        }
+
+        if (c->read->eof) {
+            rc = NGX_STREAM_OK;
+            break;
+        }
+
+        if (!c->read->ready) {
+            if (ngx_handle_read_event(c->read, 0) != NGX_OK) {
+                rc = NGX_ERROR;
+                break;
+            }
+
+            if (!c->read->timer_set) {
+                ngx_add_timer(c->read, cscf->preread_timeout);
+            }
+
+            c->read->handler = ngx_stream_session_handler;
+
+            return NGX_OK;
+        }
+
+        n = c->recv(c, c->buffer->last, size);
+
+        if (n == NGX_ERROR) {
+            rc = NGX_STREAM_OK;
+            break;
+        }
+
+        if (n > 0) {
+            c->buffer->last += n;
+        }
+
+        rc = ph->handler(s);
+    }
+
+    if (c->read->timer_set) {
+        ngx_del_timer(c->read);
+    }
+
+    if (rc == NGX_OK) {
+        s->phase_handler = ph->next;
+        return NGX_AGAIN;
+    }
+
+    if (rc == NGX_DECLINED) {
+        s->phase_handler++;
+        return NGX_AGAIN;
+    }
+
+    if (rc == NGX_DONE) {
+        return NGX_OK;
+    }
+
+    if (rc == NGX_ERROR) {
+        rc = NGX_STREAM_INTERNAL_SERVER_ERROR;
+    }
+
+    ngx_stream_finalize_session(s, rc);
+
+    return NGX_OK;
+}
+
+
 ngx_int_t
 ngx_stream_core_content_phase(ngx_stream_session_t *s,
     ngx_stream_phase_handler_t *ph)
@@ -303,6 +423,8 @@ ngx_stream_core_create_srv_conf(ngx_conf_t *cf)
     cscf->resolver_timeout = NGX_CONF_UNSET_MSEC;
     cscf->proxy_protocol_timeout = NGX_CONF_UNSET_MSEC;
     cscf->tcp_nodelay = NGX_CONF_UNSET;
+    cscf->preread_buffer_size = NGX_CONF_UNSET_SIZE;
+    cscf->preread_timeout = NGX_CONF_UNSET_MSEC;
 
     return cscf;
 }
@@ -355,6 +477,12 @@ ngx_stream_core_merge_srv_conf(ngx_conf_t *cf, void *parent, void *child)
 
     ngx_conf_merge_value(conf->tcp_nodelay, prev->tcp_nodelay, 1);
 
+    ngx_conf_merge_size_value(conf->preread_buffer_size,
+                              prev->preread_buffer_size, 16384);
+
+    ngx_conf_merge_msec_value(conf->preread_timeout,
+                              prev->preread_timeout, 30000);
+
     return NGX_CONF_OK;
 }