aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/http/modules/ngx_http_grpc_module.c100
1 files changed, 67 insertions, 33 deletions
diff --git a/src/http/modules/ngx_http_grpc_module.c b/src/http/modules/ngx_http_grpc_module.c
index 7fbf7366f..3a9b950b5 100644
--- a/src/http/modules/ngx_http_grpc_module.c
+++ b/src/http/modules/ngx_http_grpc_module.c
@@ -111,6 +111,7 @@ typedef struct {
unsigned output_closed:1;
unsigned parsing_headers:1;
unsigned end_stream:1;
+ unsigned done:1;
unsigned status:1;
ngx_http_request_t *request;
@@ -1074,6 +1075,7 @@ ngx_http_grpc_reinit_request(ngx_http_request_t *r)
ctx->output_closed = 0;
ctx->parsing_headers = 0;
ctx->end_stream = 0;
+ ctx->done = 0;
ctx->status = 0;
ctx->connection = NULL;
@@ -1093,6 +1095,7 @@ ngx_http_grpc_body_output_filter(void *data, ngx_chain_t *in)
ngx_int_t rc;
ngx_uint_t next, last;
ngx_chain_t *cl, *out, **ll;
+ ngx_http_upstream_t *u;
ngx_http_grpc_ctx_t *ctx;
ngx_http_grpc_frame_t *f;
@@ -1407,6 +1410,28 @@ ngx_http_grpc_body_output_filter(void *data, ngx_chain_t *in)
rc = NGX_AGAIN;
}
+ if (ctx->done) {
+
+ /*
+ * We have already got the response and were sending some additional
+ * control frames. Even if there is still something unsent, stop
+ * here anyway.
+ */
+
+ u = r->upstream;
+ u->length = 0;
+
+ if (ctx->in == NULL
+ && ctx->out == NULL
+ && ctx->output_closed
+ && ctx->state == ngx_http_grpc_st_start)
+ {
+ u->keepalive = 1;
+ }
+
+ ngx_post_event(u->peer.connection->read, &ngx_posted_events);
+ }
+
return rc;
}
@@ -1832,6 +1857,33 @@ ngx_http_grpc_filter(void *data, ssize_t bytes)
rc = ngx_http_grpc_parse_frame(r, ctx, b);
if (rc == NGX_AGAIN) {
+
+ if (ctx->done) {
+
+ /*
+ * We have finished parsing the response and the
+ * remaining control frames. If there are unsent
+ * control frames, post a write event to send them.
+ */
+
+ if (ctx->out) {
+ ngx_post_event(u->peer.connection->write,
+ &ngx_posted_events);
+ return NGX_AGAIN;
+ }
+
+ u->length = 0;
+
+ if (ctx->in == NULL
+ && ctx->output_closed
+ && ctx->state == ngx_http_grpc_st_start)
+ {
+ u->keepalive = 1;
+ }
+
+ break;
+ }
+
return NGX_AGAIN;
}
@@ -1898,6 +1950,13 @@ ngx_http_grpc_filter(void *data, ssize_t bytes)
return NGX_ERROR;
}
+ if (ctx->stream_id && ctx->done) {
+ ngx_log_error(NGX_LOG_ERR, r->connection->log, 0,
+ "upstream sent frame for closed stream %ui",
+ ctx->stream_id);
+ return NGX_ERROR;
+ }
+
ctx->padding = 0;
}
@@ -1914,17 +1973,7 @@ ngx_http_grpc_filter(void *data, ssize_t bytes)
ctx->state = ngx_http_grpc_st_start;
if (ctx->flags & NGX_HTTP_V2_END_STREAM_FLAG) {
- u->length = 0;
-
- if (ctx->in == NULL
- && ctx->out == NULL
- && ctx->output_closed
- && b->last == b->pos)
- {
- u->keepalive = 1;
- }
-
- break;
+ ctx->done = 1;
}
continue;
@@ -2094,17 +2143,8 @@ ngx_http_grpc_filter(void *data, ssize_t bytes)
"grpc trailer done");
if (ctx->end_stream) {
- u->length = 0;
-
- if (ctx->in == NULL
- && ctx->out == NULL
- && ctx->output_closed
- && b->last == b->pos)
- {
- u->keepalive = 1;
- }
-
- return NGX_OK;
+ ctx->done = 1;
+ break;
}
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0,
@@ -2121,6 +2161,10 @@ ngx_http_grpc_filter(void *data, ssize_t bytes)
return NGX_ERROR;
}
+ if (rc == NGX_HTTP_PARSE_HEADER_DONE) {
+ continue;
+ }
+
/* rc == NGX_AGAIN */
if (ctx->rest == 0) {
@@ -2237,17 +2281,7 @@ ngx_http_grpc_filter(void *data, ssize_t bytes)
ctx->state = ngx_http_grpc_st_start;
if (ctx->flags & NGX_HTTP_V2_END_STREAM_FLAG) {
- u->length = 0;
-
- if (ctx->in == NULL
- && ctx->out == NULL
- && ctx->output_closed
- && b->last == b->pos)
- {
- u->keepalive = 1;
- }
-
- break;
+ ctx->done = 1;
}
}