aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorRoman Arutyunyan <arut@nginx.com>2021-06-07 10:12:46 +0300
committerRoman Arutyunyan <arut@nginx.com>2021-06-07 10:12:46 +0300
commit64586eaa36f1dbc5e21e9007a372c6fb049a6986 (patch)
tree353897ab7fcf961452dcdae91a4e4aece03665d3 /src
parentdcdf62549f25f030b6cf518b9adb3d2a84313ea5 (diff)
downloadnginx-64586eaa36f1dbc5e21e9007a372c6fb049a6986.tar.gz
nginx-64586eaa36f1dbc5e21e9007a372c6fb049a6986.zip
QUIC: stream flow control refactored.
- Function ngx_quic_control_flow() is introduced. This functions does both MAX_DATA and MAX_STREAM_DATA flow controls. The function is called from STREAM and RESET_STREAM frame handlers. Previously, flow control was only accounted for STREAM. Also, MAX_DATA flow control was not accounted at all. - Function ngx_quic_update_flow() is introduced. This function advances flow control windows and sends MAX_DATA/MAX_STREAM_DATA. The function is called from RESET_STREAM frame handler, stream cleanup handler and stream recv() handler.
Diffstat (limited to 'src')
-rw-r--r--src/event/quic/ngx_event_quic.c1
-rw-r--r--src/event/quic/ngx_event_quic.h1
-rw-r--r--src/event/quic/ngx_event_quic_connection.h4
-rw-r--r--src/event/quic/ngx_event_quic_streams.c237
4 files changed, 179 insertions, 64 deletions
diff --git a/src/event/quic/ngx_event_quic.c b/src/event/quic/ngx_event_quic.c
index cc83df0ce..eaaed924d 100644
--- a/src/event/quic/ngx_event_quic.c
+++ b/src/event/quic/ngx_event_quic.c
@@ -303,6 +303,7 @@ ngx_quic_new_connection(ngx_connection_t *c, ngx_quic_conf_t *conf,
ctp->active_connection_id_limit = 2;
qc->streams.recv_max_data = qc->tp.initial_max_data;
+ qc->streams.recv_window = qc->streams.recv_max_data;
qc->streams.client_max_streams_uni = qc->tp.initial_max_streams_uni;
qc->streams.client_max_streams_bidi = qc->tp.initial_max_streams_bidi;
diff --git a/src/event/quic/ngx_event_quic.h b/src/event/quic/ngx_event_quic.h
index 6d4308afa..fe0f7fef3 100644
--- a/src/event/quic/ngx_event_quic.h
+++ b/src/event/quic/ngx_event_quic.h
@@ -75,6 +75,7 @@ struct ngx_quic_stream_s {
uint64_t send_max_data;
uint64_t recv_max_data;
uint64_t recv_offset;
+ uint64_t recv_window;
uint64_t recv_last;
uint64_t final_size;
ngx_chain_t *in;
diff --git a/src/event/quic/ngx_event_quic_connection.h b/src/event/quic/ngx_event_quic_connection.h
index 784378647..ef8c1dacc 100644
--- a/src/event/quic/ngx_event_quic_connection.h
+++ b/src/event/quic/ngx_event_quic_connection.h
@@ -115,8 +115,10 @@ typedef struct {
ngx_rbtree_t tree;
ngx_rbtree_node_t sentinel;
- uint64_t received;
uint64_t sent;
+ uint64_t recv_offset;
+ uint64_t recv_window;
+ uint64_t recv_last;
uint64_t recv_max_data;
uint64_t send_max_data;
diff --git a/src/event/quic/ngx_event_quic_streams.c b/src/event/quic/ngx_event_quic_streams.c
index 192300e09..c6f02a37f 100644
--- a/src/event/quic/ngx_event_quic_streams.c
+++ b/src/event/quic/ngx_event_quic_streams.c
@@ -25,6 +25,8 @@ static ngx_chain_t *ngx_quic_stream_send_chain(ngx_connection_t *c,
ngx_chain_t *in, off_t limit);
static size_t ngx_quic_max_stream_flow(ngx_connection_t *c);
static void ngx_quic_stream_cleanup_handler(void *data);
+static ngx_int_t ngx_quic_control_flow(ngx_connection_t *c, uint64_t last);
+static ngx_int_t ngx_quic_update_flow(ngx_connection_t *c, uint64_t last);
ngx_connection_t *
@@ -413,6 +415,8 @@ ngx_quic_create_stream(ngx_connection_t *c, uint64_t id)
}
}
+ qs->recv_window = qs->recv_max_data;
+
cln = ngx_pool_cleanup_add(pool, 0);
if (cln == NULL) {
ngx_close_connection(sc);
@@ -432,18 +436,15 @@ ngx_quic_create_stream(ngx_connection_t *c, uint64_t id)
static ssize_t
ngx_quic_stream_recv(ngx_connection_t *c, u_char *buf, size_t size)
{
- ssize_t len, n;
- ngx_buf_t *b;
- ngx_chain_t *cl, **ll;
- ngx_event_t *rev;
- ngx_connection_t *pc;
- ngx_quic_frame_t *frame;
- ngx_quic_stream_t *qs;
- ngx_quic_connection_t *qc;
+ ssize_t len, n;
+ ngx_buf_t *b;
+ ngx_chain_t *cl, **ll;
+ ngx_event_t *rev;
+ ngx_connection_t *pc;
+ ngx_quic_stream_t *qs;
qs = c->quic;
pc = qs->parent;
- qc = ngx_quic_get_connection(pc);
rev = c->read;
if (rev->error) {
@@ -495,10 +496,6 @@ ngx_quic_stream_recv(ngx_connection_t *c, u_char *buf, size_t size)
ngx_quic_free_bufs(pc, cl);
- qc->streams.received += len;
- qs->recv_max_data += len;
- qs->recv_offset += len;
-
if (qs->in == NULL) {
rev->ready = rev->pending_eof;
}
@@ -506,39 +503,8 @@ ngx_quic_stream_recv(ngx_connection_t *c, u_char *buf, size_t size)
ngx_log_debug2(NGX_LOG_DEBUG_EVENT, c->log, 0,
"quic stream id:0x%xL recv len:%z", qs->id, len);
- if (!rev->pending_eof) {
- frame = ngx_quic_alloc_frame(pc);
- if (frame == NULL) {
- return NGX_ERROR;
- }
-
- frame->level = ssl_encryption_application;
- frame->type = NGX_QUIC_FT_MAX_STREAM_DATA;
- frame->u.max_stream_data.id = qs->id;
- frame->u.max_stream_data.limit = qs->recv_max_data;
-
- ngx_quic_queue_frame(qc, frame);
- }
-
- if ((qc->streams.recv_max_data / 2) < qc->streams.received) {
-
- frame = ngx_quic_alloc_frame(pc);
-
- if (frame == NULL) {
- return NGX_ERROR;
- }
-
- qc->streams.recv_max_data *= 2;
-
- frame->level = ssl_encryption_application;
- frame->type = NGX_QUIC_FT_MAX_DATA;
- frame->u.max_data.max_data = qc->streams.recv_max_data;
-
- ngx_quic_queue_frame(qc, frame);
-
- ngx_log_debug2(NGX_LOG_DEBUG_EVENT, c->log, 0,
- "quic stream id:0x%xL recv: increased max_data:%uL",
- qs->id, qc->streams.recv_max_data);
+ if (ngx_quic_update_flow(c, qs->recv_offset + len) != NGX_OK) {
+ return NGX_ERROR;
}
return len;
@@ -729,6 +695,10 @@ ngx_quic_stream_cleanup_handler(void *data)
goto done;
}
+ c->read->pending_eof = 1;
+
+ (void) ngx_quic_update_flow(c, qs->recv_last);
+
if ((qs->id & NGX_QUIC_STREAM_SERVER_INITIATED) == 0
|| (qs->id & NGX_QUIC_STREAM_UNIDIRECTIONAL) == 0)
{
@@ -848,8 +818,7 @@ ngx_quic_handle_stream_frame(ngx_connection_t *c, ngx_quic_header_t *pkt,
sc = qs->connection;
- if (last > qs->recv_max_data) {
- qc->error = NGX_QUIC_ERR_FLOW_CONTROL_ERROR;
+ if (ngx_quic_control_flow(sc, last) != NGX_OK) {
goto cleanup;
}
@@ -858,8 +827,6 @@ ngx_quic_handle_stream_frame(ngx_connection_t *c, ngx_quic_header_t *pkt,
qs->final_size = last;
}
- qs->recv_last = last;
-
if (f->offset == 0) {
sc->read->ready = 1;
}
@@ -873,8 +840,15 @@ ngx_quic_handle_stream_frame(ngx_connection_t *c, ngx_quic_header_t *pkt,
return NGX_OK;
}
- if (last > qs->recv_max_data) {
- qc->error = NGX_QUIC_ERR_FLOW_CONTROL_ERROR;
+ sc = qs->connection;
+
+ rev = sc->read;
+
+ if (rev->error) {
+ return NGX_OK;
+ }
+
+ if (ngx_quic_control_flow(sc, last) != NGX_OK) {
return NGX_ERROR;
}
@@ -887,17 +861,11 @@ ngx_quic_handle_stream_frame(ngx_connection_t *c, ngx_quic_header_t *pkt,
return NGX_OK;
}
- if (qs->recv_last < last) {
- qs->recv_last = last;
- }
-
if (f->offset < qs->recv_offset) {
ngx_quic_trim_bufs(frame->data, qs->recv_offset - f->offset);
f->offset = qs->recv_offset;
}
- rev = qs->connection->read;
-
if (f->fin) {
if (qs->final_size != (uint64_t) -1 && qs->final_size != last) {
qc->error = NGX_QUIC_ERR_FINAL_SIZE_ERROR;
@@ -1108,6 +1076,7 @@ ngx_int_t
ngx_quic_handle_reset_stream_frame(ngx_connection_t *c,
ngx_quic_header_t *pkt, ngx_quic_reset_stream_frame_t *f)
{
+ ngx_pool_t *pool;
ngx_event_t *rev;
ngx_connection_t *sc;
ngx_quic_stream_t *qs;
@@ -1135,19 +1104,37 @@ ngx_quic_handle_reset_stream_frame(ngx_connection_t *c,
return NGX_OK;
}
- qs->final_size = f->final_size;
-
sc = qs->connection;
rev = sc->read;
rev->error = 1;
rev->ready = 1;
+ if (ngx_quic_control_flow(sc, f->final_size) != NGX_OK) {
+ goto cleanup;
+ }
+
+ qs->final_size = f->final_size;
+
+ if (ngx_quic_update_flow(sc, qs->final_size) != NGX_OK) {
+ goto cleanup;
+ }
+
sc->listening->handler(sc);
return NGX_OK;
}
+ sc = qs->connection;
+
+ rev = sc->read;
+ rev->error = 1;
+ rev->ready = 1;
+
+ if (ngx_quic_control_flow(sc, f->final_size) != NGX_OK) {
+ return NGX_ERROR;
+ }
+
if (qs->final_size != (uint64_t) -1 && qs->final_size != f->final_size) {
qc->error = NGX_QUIC_ERR_FINAL_SIZE_ERROR;
return NGX_ERROR;
@@ -1160,15 +1147,24 @@ ngx_quic_handle_reset_stream_frame(ngx_connection_t *c,
qs->final_size = f->final_size;
- rev = qs->connection->read;
- rev->error = 1;
- rev->ready = 1;
+ if (ngx_quic_update_flow(sc, qs->final_size) != NGX_OK) {
+ return NGX_ERROR;
+ }
if (rev->active) {
rev->handler(rev);
}
return NGX_OK;
+
+cleanup:
+
+ pool = sc->pool;
+
+ ngx_close_connection(sc);
+ ngx_destroy_pool(pool);
+
+ return NGX_ERROR;
}
@@ -1285,3 +1281,118 @@ ngx_quic_handle_stream_ack(ngx_connection_t *c, ngx_quic_frame_t *f)
"quic stream ack len:%uL acked:%uL unacked:%uL",
f->u.stream.length, qs->acked, sent - qs->acked);
}
+
+
+static ngx_int_t
+ngx_quic_control_flow(ngx_connection_t *c, uint64_t last)
+{
+ uint64_t len;
+ ngx_event_t *rev;
+ ngx_quic_stream_t *qs;
+ ngx_quic_connection_t *qc;
+
+ rev = c->read;
+ qs = c->quic;
+ qc = ngx_quic_get_connection(qs->parent);
+
+ if (last <= qs->recv_last) {
+ return NGX_OK;
+ }
+
+ len = last - qs->recv_last;
+
+ ngx_log_debug4(NGX_LOG_DEBUG_EVENT, c->log, 0,
+ "quic flow control msd:%uL/%uL md:%uL/%uL",
+ last, qs->recv_max_data, qc->streams.recv_last + len,
+ qc->streams.recv_max_data);
+
+ qs->recv_last += len;
+
+ if (!rev->error && qs->recv_last > qs->recv_max_data) {
+ qc->error = NGX_QUIC_ERR_FLOW_CONTROL_ERROR;
+ return NGX_ERROR;
+ }
+
+ qc->streams.recv_last += len;
+
+ if (qc->streams.recv_last > qc->streams.recv_max_data) {
+ qc->error = NGX_QUIC_ERR_FLOW_CONTROL_ERROR;
+ return NGX_ERROR;
+ }
+
+ return NGX_OK;
+}
+
+
+static ngx_int_t
+ngx_quic_update_flow(ngx_connection_t *c, uint64_t last)
+{
+ uint64_t len;
+ ngx_event_t *rev;
+ ngx_connection_t *pc;
+ ngx_quic_frame_t *frame;
+ ngx_quic_stream_t *qs;
+ ngx_quic_connection_t *qc;
+
+ rev = c->read;
+ qs = c->quic;
+ pc = qs->parent;
+ qc = ngx_quic_get_connection(pc);
+
+ if (last <= qs->recv_offset) {
+ return NGX_OK;
+ }
+
+ len = last - qs->recv_offset;
+
+ ngx_log_debug1(NGX_LOG_DEBUG_EVENT, c->log, 0,
+ "quic flow update %uL", last);
+
+ qs->recv_offset += len;
+
+ if (!rev->pending_eof && !rev->error
+ && qs->recv_max_data <= qs->recv_offset + qs->recv_window / 2)
+ {
+ qs->recv_max_data = qs->recv_offset + qs->recv_window;
+
+ ngx_log_debug1(NGX_LOG_DEBUG_EVENT, c->log, 0,
+ "quic flow update msd:%uL", qs->recv_max_data);
+
+ frame = ngx_quic_alloc_frame(pc);
+ if (frame == NULL) {
+ return NGX_ERROR;
+ }
+
+ frame->level = ssl_encryption_application;
+ frame->type = NGX_QUIC_FT_MAX_STREAM_DATA;
+ frame->u.max_stream_data.id = qs->id;
+ frame->u.max_stream_data.limit = qs->recv_max_data;
+
+ ngx_quic_queue_frame(qc, frame);
+ }
+
+ qc->streams.recv_offset += len;
+
+ if (qc->streams.recv_max_data
+ <= qc->streams.recv_offset + qc->streams.recv_window / 2)
+ {
+ qc->streams.recv_max_data = qc->streams.recv_offset
+ + qc->streams.recv_window;
+
+ ngx_log_debug1(NGX_LOG_DEBUG_EVENT, pc->log, 0,
+ "quic flow update md:%uL", qc->streams.recv_max_data);
+
+ frame = ngx_quic_alloc_frame(pc);
+ if (frame == NULL) {
+ return NGX_ERROR;
+ }
+
+ frame->level = ssl_encryption_application;
+ frame->type = NGX_QUIC_FT_MAX_DATA;
+ frame->u.max_data.max_data = qc->streams.recv_max_data;
+
+ ngx_quic_queue_frame(qc, frame);
+ }
+
+ return NGX_OK;
+}