]> git.kaiwu.me - nginx.git/commitdiff
gRPC: improved keepalive handling.
authorMaxim Dounin <mdounin@mdounin.ru>
Mon, 3 Sep 2018 16:34:01 +0000 (19:34 +0300)
committerMaxim Dounin <mdounin@mdounin.ru>
Mon, 3 Sep 2018 16:34:01 +0000 (19:34 +0300)
The code is now able to parse additional control frames after
the response is received, and can send control frames as well.
This fixes keepalive problems as observed with grpc-c, which can
send window update and ping frames after the response, see
http://mailman.nginx.org/pipermail/nginx/2018-August/056620.html.

src/http/modules/ngx_http_grpc_module.c

index 1ab1d2990ad8a15c166c4165fd58ff11f63d52bb..a2cf055de11c001e6806ee7325f702085f3d7d23 100644 (file)
@@ -114,6 +114,7 @@ typedef struct {
     unsigned                   output_closed:1;
     unsigned                   parsing_headers:1;
     unsigned                   end_stream:1;
+    unsigned                   done:1;
     unsigned                   status:1;
 
     ngx_http_request_t        *request;
@@ -1077,6 +1078,7 @@ ngx_http_grpc_reinit_request(ngx_http_request_t *r)
     ctx->output_closed = 0;
     ctx->parsing_headers = 0;
     ctx->end_stream = 0;
+    ctx->done = 0;
     ctx->status = 0;
     ctx->connection = NULL;
 
@@ -1096,6 +1098,7 @@ ngx_http_grpc_body_output_filter(void *data, ngx_chain_t *in)
     ngx_int_t               rc;
     ngx_uint_t              next, last;
     ngx_chain_t            *cl, *out, **ll;
+    ngx_http_upstream_t    *u;
     ngx_http_grpc_ctx_t    *ctx;
     ngx_http_grpc_frame_t  *f;
 
@@ -1410,6 +1413,28 @@ ngx_http_grpc_body_output_filter(void *data, ngx_chain_t *in)
         rc = NGX_AGAIN;
     }
 
+    if (ctx->done) {
+
+        /*
+         * We have already got the response and were sending some additional
+         * control frames.  Even if there is still something unsent, stop
+         * here anyway.
+         */
+
+        u = r->upstream;
+        u->length = 0;
+
+        if (ctx->in == NULL
+            && ctx->out == NULL
+            && ctx->output_closed
+            && ctx->state == ngx_http_grpc_st_start)
+        {
+            u->keepalive = 1;
+        }
+
+        ngx_post_event(u->peer.connection->read, &ngx_posted_events);
+    }
+
     return rc;
 }
 
@@ -1835,6 +1860,33 @@ ngx_http_grpc_filter(void *data, ssize_t bytes)
             rc = ngx_http_grpc_parse_frame(r, ctx, b);
 
             if (rc == NGX_AGAIN) {
+
+                if (ctx->done) {
+
+                    /*
+                     * We have finished parsing the response and the
+                     * remaining control frames.  If there are unsent
+                     * control frames, post a write event to send them.
+                     */
+
+                    if (ctx->out) {
+                        ngx_post_event(u->peer.connection->write,
+                                       &ngx_posted_events);
+                        return NGX_AGAIN;
+                    }
+
+                    u->length = 0;
+
+                    if (ctx->in == NULL
+                        && ctx->output_closed
+                        && ctx->state == ngx_http_grpc_st_start)
+                    {
+                        u->keepalive = 1;
+                    }
+
+                    break;
+                }
+
                 return NGX_AGAIN;
             }
 
@@ -1901,6 +1953,13 @@ ngx_http_grpc_filter(void *data, ssize_t bytes)
                 return NGX_ERROR;
             }
 
+            if (ctx->stream_id && ctx->done) {
+                ngx_log_error(NGX_LOG_ERR, r->connection->log, 0,
+                              "upstream sent frame for closed stream %ui",
+                              ctx->stream_id);
+                return NGX_ERROR;
+            }
+
             ctx->padding = 0;
         }
 
@@ -1917,17 +1976,7 @@ ngx_http_grpc_filter(void *data, ssize_t bytes)
             ctx->state = ngx_http_grpc_st_start;
 
             if (ctx->flags & NGX_HTTP_V2_END_STREAM_FLAG) {
-                u->length = 0;
-
-                if (ctx->in == NULL
-                    && ctx->out == NULL
-                    && ctx->output_closed
-                    && b->last == b->pos)
-                {
-                    u->keepalive = 1;
-                }
-
-                break;
+                ctx->done = 1;
             }
 
             continue;
@@ -2097,17 +2146,8 @@ ngx_http_grpc_filter(void *data, ssize_t bytes)
                                    "grpc trailer done");
 
                     if (ctx->end_stream) {
-                        u->length = 0;
-
-                        if (ctx->in == NULL
-                            && ctx->out == NULL
-                            && ctx->output_closed
-                            && b->last == b->pos)
-                        {
-                            u->keepalive = 1;
-                        }
-
-                        return NGX_OK;
+                        ctx->done = 1;
+                        break;
                     }
 
                     ngx_log_error(NGX_LOG_ERR, r->connection->log, 0,
@@ -2124,6 +2164,10 @@ ngx_http_grpc_filter(void *data, ssize_t bytes)
                 return NGX_ERROR;
             }
 
+            if (rc == NGX_HTTP_PARSE_HEADER_DONE) {
+                continue;
+            }
+
             /* rc == NGX_AGAIN */
 
             if (ctx->rest == 0) {
@@ -2240,17 +2284,7 @@ ngx_http_grpc_filter(void *data, ssize_t bytes)
         ctx->state = ngx_http_grpc_st_start;
 
         if (ctx->flags & NGX_HTTP_V2_END_STREAM_FLAG) {
-            u->length = 0;
-
-            if (ctx->in == NULL
-                && ctx->out == NULL
-                && ctx->output_closed
-                && b->last == b->pos)
-            {
-                u->keepalive = 1;
-            }
-
-            break;
+            ctx->done = 1;
         }
     }