diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/http/modules/ngx_http_grpc_module.c | 100 |
1 files changed, 67 insertions, 33 deletions
diff --git a/src/http/modules/ngx_http_grpc_module.c b/src/http/modules/ngx_http_grpc_module.c index 7fbf7366f..3a9b950b5 100644 --- a/src/http/modules/ngx_http_grpc_module.c +++ b/src/http/modules/ngx_http_grpc_module.c @@ -111,6 +111,7 @@ typedef struct { unsigned output_closed:1; unsigned parsing_headers:1; unsigned end_stream:1; + unsigned done:1; unsigned status:1; ngx_http_request_t *request; @@ -1074,6 +1075,7 @@ ngx_http_grpc_reinit_request(ngx_http_request_t *r) ctx->output_closed = 0; ctx->parsing_headers = 0; ctx->end_stream = 0; + ctx->done = 0; ctx->status = 0; ctx->connection = NULL; @@ -1093,6 +1095,7 @@ ngx_http_grpc_body_output_filter(void *data, ngx_chain_t *in) ngx_int_t rc; ngx_uint_t next, last; ngx_chain_t *cl, *out, **ll; + ngx_http_upstream_t *u; ngx_http_grpc_ctx_t *ctx; ngx_http_grpc_frame_t *f; @@ -1407,6 +1410,28 @@ ngx_http_grpc_body_output_filter(void *data, ngx_chain_t *in) rc = NGX_AGAIN; } + if (ctx->done) { + + /* + * We have already got the response and were sending some additional + * control frames. Even if there is still something unsent, stop + * here anyway. + */ + + u = r->upstream; + u->length = 0; + + if (ctx->in == NULL + && ctx->out == NULL + && ctx->output_closed + && ctx->state == ngx_http_grpc_st_start) + { + u->keepalive = 1; + } + + ngx_post_event(u->peer.connection->read, &ngx_posted_events); + } + return rc; } @@ -1832,6 +1857,33 @@ ngx_http_grpc_filter(void *data, ssize_t bytes) rc = ngx_http_grpc_parse_frame(r, ctx, b); if (rc == NGX_AGAIN) { + + if (ctx->done) { + + /* + * We have finished parsing the response and the + * remaining control frames. If there are unsent + * control frames, post a write event to send them. + */ + + if (ctx->out) { + ngx_post_event(u->peer.connection->write, + &ngx_posted_events); + return NGX_AGAIN; + } + + u->length = 0; + + if (ctx->in == NULL + && ctx->output_closed + && ctx->state == ngx_http_grpc_st_start) + { + u->keepalive = 1; + } + + break; + } + return NGX_AGAIN; } @@ -1898,6 +1950,13 @@ ngx_http_grpc_filter(void *data, ssize_t bytes) return NGX_ERROR; } + if (ctx->stream_id && ctx->done) { + ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, + "upstream sent frame for closed stream %ui", + ctx->stream_id); + return NGX_ERROR; + } + ctx->padding = 0; } @@ -1914,17 +1973,7 @@ ngx_http_grpc_filter(void *data, ssize_t bytes) ctx->state = ngx_http_grpc_st_start; if (ctx->flags & NGX_HTTP_V2_END_STREAM_FLAG) { - u->length = 0; - - if (ctx->in == NULL - && ctx->out == NULL - && ctx->output_closed - && b->last == b->pos) - { - u->keepalive = 1; - } - - break; + ctx->done = 1; } continue; @@ -2094,17 +2143,8 @@ ngx_http_grpc_filter(void *data, ssize_t bytes) "grpc trailer done"); if (ctx->end_stream) { - u->length = 0; - - if (ctx->in == NULL - && ctx->out == NULL - && ctx->output_closed - && b->last == b->pos) - { - u->keepalive = 1; - } - - return NGX_OK; + ctx->done = 1; + break; } ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, @@ -2121,6 +2161,10 @@ ngx_http_grpc_filter(void *data, ssize_t bytes) return NGX_ERROR; } + if (rc == NGX_HTTP_PARSE_HEADER_DONE) { + continue; + } + /* rc == NGX_AGAIN */ if (ctx->rest == 0) { @@ -2237,17 +2281,7 @@ ngx_http_grpc_filter(void *data, ssize_t bytes) ctx->state = ngx_http_grpc_st_start; if (ctx->flags & NGX_HTTP_V2_END_STREAM_FLAG) { - u->length = 0; - - if (ctx->in == NULL - && ctx->out == NULL - && ctx->output_closed - && b->last == b->pos) - { - u->keepalive = 1; - } - - break; + ctx->done = 1; } } |