diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/event/quic/ngx_event_quic.c | 1 | ||||
-rw-r--r-- | src/event/quic/ngx_event_quic.h | 4 | ||||
-rw-r--r-- | src/event/quic/ngx_event_quic_connection.h | 3 | ||||
-rw-r--r-- | src/event/quic/ngx_event_quic_frames.c | 4 | ||||
-rw-r--r-- | src/event/quic/ngx_event_quic_streams.c | 445 | ||||
-rw-r--r-- | src/http/v3/ngx_http_v3_uni.c | 2 |
6 files changed, 269 insertions, 190 deletions
diff --git a/src/event/quic/ngx_event_quic.c b/src/event/quic/ngx_event_quic.c index 66f63fe0e..c98f586b7 100644 --- a/src/event/quic/ngx_event_quic.c +++ b/src/event/quic/ngx_event_quic.c @@ -303,6 +303,7 @@ ngx_quic_new_connection(ngx_connection_t *c, ngx_quic_conf_t *conf, ctp->active_connection_id_limit = 2; ngx_queue_init(&qc->streams.uninitialized); + ngx_queue_init(&qc->streams.free); qc->streams.recv_max_data = qc->tp.initial_max_data; qc->streams.recv_window = qc->streams.recv_max_data; diff --git a/src/event/quic/ngx_event_quic.h b/src/event/quic/ngx_event_quic.h index af6b6838f..c2295816a 100644 --- a/src/event/quic/ngx_event_quic.h +++ b/src/event/quic/ngx_event_quic.h @@ -78,12 +78,14 @@ struct ngx_quic_stream_s { uint64_t id; uint64_t acked; uint64_t send_max_data; + uint64_t send_offset; + uint64_t send_final_size; uint64_t recv_max_data; uint64_t recv_offset; uint64_t recv_window; uint64_t recv_last; uint64_t recv_size; - uint64_t final_size; + uint64_t recv_final_size; ngx_chain_t *in; ngx_chain_t *out; ngx_uint_t cancelable; /* unsigned cancelable:1; */ diff --git a/src/event/quic/ngx_event_quic_connection.h b/src/event/quic/ngx_event_quic_connection.h index 173af10d1..2b29284af 100644 --- a/src/event/quic/ngx_event_quic_connection.h +++ b/src/event/quic/ngx_event_quic_connection.h @@ -114,13 +114,16 @@ struct ngx_quic_socket_s { typedef struct { ngx_rbtree_t tree; ngx_rbtree_node_t sentinel; + ngx_queue_t uninitialized; + ngx_queue_t free; uint64_t sent; uint64_t recv_offset; uint64_t recv_window; uint64_t recv_last; uint64_t recv_max_data; + uint64_t send_offset; uint64_t send_max_data; uint64_t server_max_streams_uni; diff --git a/src/event/quic/ngx_event_quic_frames.c b/src/event/quic/ngx_event_quic_frames.c index 4fa6c56c5..188235d9e 100644 --- a/src/event/quic/ngx_event_quic_frames.c +++ b/src/event/quic/ngx_event_quic_frames.c @@ -391,6 +391,10 @@ ngx_quic_split_frame(ngx_connection_t *c, ngx_quic_frame_t *f, size_t len) return NGX_ERROR; } + if (f->type == NGX_QUIC_FT_STREAM) { + f->u.stream.fin = 0; + } + ngx_queue_insert_after(&f->queue, &nf->queue); return NGX_OK; diff --git a/src/event/quic/ngx_event_quic_streams.c b/src/event/quic/ngx_event_quic_streams.c index 8b13f6edc..54ed051ca 100644 --- a/src/event/quic/ngx_event_quic_streams.c +++ b/src/event/quic/ngx_event_quic_streams.c @@ -13,6 +13,8 @@ #define NGX_QUIC_STREAM_GONE (void *) -1 +static ngx_int_t ngx_quic_do_reset_stream(ngx_quic_stream_t *qs, + ngx_uint_t err); static ngx_int_t ngx_quic_shutdown_stream_send(ngx_connection_t *c); static ngx_int_t ngx_quic_shutdown_stream_recv(ngx_connection_t *c); static ngx_quic_stream_t *ngx_quic_get_stream(ngx_connection_t *c, uint64_t id); @@ -28,11 +30,12 @@ static ssize_t ngx_quic_stream_send(ngx_connection_t *c, u_char *buf, size_t size); static ngx_chain_t *ngx_quic_stream_send_chain(ngx_connection_t *c, ngx_chain_t *in, off_t limit); -static size_t ngx_quic_max_stream_flow(ngx_connection_t *c); +static ngx_int_t ngx_quic_stream_flush(ngx_quic_stream_t *qs); static void ngx_quic_stream_cleanup_handler(void *data); -static ngx_int_t ngx_quic_control_flow(ngx_connection_t *c, uint64_t last); -static ngx_int_t ngx_quic_update_flow(ngx_connection_t *c, uint64_t last); -static ngx_int_t ngx_quic_update_max_stream_data(ngx_connection_t *c); +static ngx_int_t ngx_quic_close_stream(ngx_quic_stream_t *qs); +static ngx_int_t ngx_quic_control_flow(ngx_quic_stream_t *qs, uint64_t last); +static ngx_int_t ngx_quic_update_flow(ngx_quic_stream_t *qs, uint64_t last); +static ngx_int_t ngx_quic_update_max_stream_data(ngx_quic_stream_t *qs); static ngx_int_t ngx_quic_update_max_data(ngx_connection_t *c); static void ngx_quic_set_event(ngx_event_t *ev); @@ -186,15 +189,20 @@ ngx_quic_close_streams(ngx_connection_t *c, ngx_quic_connection_t *qc) ns = 0; #endif - for (node = ngx_rbtree_min(tree->root, tree->sentinel); - node; - node = ngx_rbtree_next(tree, node)) - { + node = ngx_rbtree_min(tree->root, tree->sentinel); + + while (node) { qs = (ngx_quic_stream_t *) node; + node = ngx_rbtree_next(tree, node); qs->recv_state = NGX_QUIC_STREAM_RECV_RESET_RECVD; qs->send_state = NGX_QUIC_STREAM_SEND_RESET_SENT; + if (qs->connection == NULL) { + ngx_quic_close_stream(qs); + continue; + } + ngx_quic_set_event(qs->connection->read); ngx_quic_set_event(qs->connection->write); @@ -213,13 +221,17 @@ ngx_quic_close_streams(ngx_connection_t *c, ngx_quic_connection_t *qc) ngx_int_t ngx_quic_reset_stream(ngx_connection_t *c, ngx_uint_t err) { + return ngx_quic_do_reset_stream(c->quic, err); +} + + +static ngx_int_t +ngx_quic_do_reset_stream(ngx_quic_stream_t *qs, ngx_uint_t err) +{ ngx_connection_t *pc; ngx_quic_frame_t *frame; - ngx_quic_stream_t *qs; ngx_quic_connection_t *qc; - qs = c->quic; - if (qs->send_state == NGX_QUIC_STREAM_SEND_DATA_RECVD || qs->send_state == NGX_QUIC_STREAM_SEND_RESET_SENT || qs->send_state == NGX_QUIC_STREAM_SEND_RESET_RECVD) @@ -228,10 +240,14 @@ ngx_quic_reset_stream(ngx_connection_t *c, ngx_uint_t err) } qs->send_state = NGX_QUIC_STREAM_SEND_RESET_SENT; + qs->send_final_size = qs->send_offset; pc = qs->parent; qc = ngx_quic_get_connection(pc); + ngx_log_debug1(NGX_LOG_DEBUG_EVENT, pc->log, 0, + "quic stream id:0x%xL reset", qs->id); + frame = ngx_quic_alloc_frame(pc); if (frame == NULL) { return NGX_ERROR; @@ -241,10 +257,13 @@ ngx_quic_reset_stream(ngx_connection_t *c, ngx_uint_t err) frame->type = NGX_QUIC_FT_RESET_STREAM; frame->u.reset_stream.id = qs->id; frame->u.reset_stream.error_code = err; - frame->u.reset_stream.final_size = c->sent; + frame->u.reset_stream.final_size = qs->send_offset; ngx_quic_queue_frame(qc, frame); + ngx_quic_free_chain(pc, qs->out); + qs->out = NULL; + return NGX_OK; } @@ -271,10 +290,7 @@ ngx_quic_shutdown_stream(ngx_connection_t *c, int how) static ngx_int_t ngx_quic_shutdown_stream_send(ngx_connection_t *c) { - ngx_connection_t *pc; - ngx_quic_frame_t *frame; - ngx_quic_stream_t *qs; - ngx_quic_connection_t *qc; + ngx_quic_stream_t *qs; qs = c->quic; @@ -284,32 +300,13 @@ ngx_quic_shutdown_stream_send(ngx_connection_t *c) return NGX_OK; } - qs->send_state = NGX_QUIC_STREAM_SEND_DATA_SENT; - - pc = qs->parent; - qc = ngx_quic_get_connection(pc); - - frame = ngx_quic_alloc_frame(pc); - if (frame == NULL) { - return NGX_ERROR; - } + qs->send_state = NGX_QUIC_STREAM_SEND_SEND; + qs->send_final_size = c->sent; - ngx_log_debug1(NGX_LOG_DEBUG_EVENT, c->log, 0, + ngx_log_debug1(NGX_LOG_DEBUG_EVENT, qs->parent->log, 0, "quic stream id:0x%xL send shutdown", qs->id); - frame->level = ssl_encryption_application; - frame->type = NGX_QUIC_FT_STREAM; - frame->u.stream.off = 1; - frame->u.stream.len = 1; - frame->u.stream.fin = 1; - - frame->u.stream.stream_id = qs->id; - frame->u.stream.offset = c->sent; - frame->u.stream.length = 0; - - ngx_quic_queue_frame(qc, frame); - - return NGX_OK; + return ngx_quic_stream_flush(qs); } @@ -341,7 +338,7 @@ ngx_quic_shutdown_stream_recv(ngx_connection_t *c) return NGX_ERROR; } - ngx_log_debug1(NGX_LOG_DEBUG_EVENT, c->log, 0, + ngx_log_debug1(NGX_LOG_DEBUG_EVENT, pc->log, 0, "quic stream id:0x%xL recv shutdown", qs->id); frame->level = ssl_encryption_application; @@ -591,6 +588,7 @@ ngx_quic_create_stream(ngx_connection_t *c, uint64_t id) { ngx_log_t *log; ngx_pool_t *pool; + ngx_queue_t *q; ngx_connection_t *sc; ngx_quic_stream_t *qs; ngx_pool_cleanup_t *cln; @@ -601,25 +599,41 @@ ngx_quic_create_stream(ngx_connection_t *c, uint64_t id) qc = ngx_quic_get_connection(c); - pool = ngx_create_pool(NGX_DEFAULT_POOL_SIZE, c->log); - if (pool == NULL) { - return NULL; - } + if (!ngx_queue_empty(&qc->streams.free)) { + q = ngx_queue_head(&qc->streams.free); + qs = ngx_queue_data(q, ngx_quic_stream_t, queue); + ngx_queue_remove(&qs->queue); - qs = ngx_pcalloc(pool, sizeof(ngx_quic_stream_t)); - if (qs == NULL) { - ngx_destroy_pool(pool); - return NULL; + } else { + /* + * the number of streams is limited by transport + * parameters and application requirements + */ + + qs = ngx_palloc(c->pool, sizeof(ngx_quic_stream_t)); + if (qs == NULL) { + return NULL; + } } + ngx_memzero(qs, sizeof(ngx_quic_stream_t)); + qs->node.key = id; qs->parent = c; qs->id = id; - qs->final_size = (uint64_t) -1; + qs->send_final_size = (uint64_t) -1; + qs->recv_final_size = (uint64_t) -1; + + pool = ngx_create_pool(NGX_DEFAULT_POOL_SIZE, c->log); + if (pool == NULL) { + ngx_queue_insert_tail(&qc->streams.free, &qs->queue); + return NULL; + } log = ngx_palloc(pool, sizeof(ngx_log_t)); if (log == NULL) { ngx_destroy_pool(pool); + ngx_queue_insert_tail(&qc->streams.free, &qs->queue); return NULL; } @@ -629,6 +643,7 @@ ngx_quic_create_stream(ngx_connection_t *c, uint64_t id) sc = ngx_get_connection(c->fd, log); if (sc == NULL) { ngx_destroy_pool(pool); + ngx_queue_insert_tail(&qc->streams.free, &qs->queue); return NULL; } @@ -697,6 +712,7 @@ ngx_quic_create_stream(ngx_connection_t *c, uint64_t id) if (cln == NULL) { ngx_close_connection(sc); ngx_destroy_pool(pool); + ngx_queue_insert_tail(&qc->streams.free, &qs->queue); return NULL; } @@ -737,7 +753,7 @@ ngx_quic_stream_recv(ngx_connection_t *c, u_char *buf, size_t size) return NGX_ERROR; } - ngx_log_debug2(NGX_LOG_DEBUG_EVENT, c->log, 0, + ngx_log_debug2(NGX_LOG_DEBUG_EVENT, pc->log, 0, "quic stream id:0x%xL recv buf:%uz", qs->id, size); if (size == 0) { @@ -763,7 +779,7 @@ ngx_quic_stream_recv(ngx_connection_t *c, u_char *buf, size_t size) rev->ready = 0; if (qs->recv_state == NGX_QUIC_STREAM_RECV_DATA_RECVD - && qs->recv_offset == qs->final_size) + && qs->recv_offset == qs->recv_final_size) { qs->recv_state = NGX_QUIC_STREAM_RECV_DATA_READ; } @@ -781,7 +797,7 @@ ngx_quic_stream_recv(ngx_connection_t *c, u_char *buf, size_t size) ngx_log_debug2(NGX_LOG_DEBUG_EVENT, c->log, 0, "quic stream id:0x%xL recv len:%z", qs->id, len); - if (ngx_quic_update_flow(c, qs->recv_offset + len) != NGX_OK) { + if (ngx_quic_update_flow(qs, qs->recv_offset + len) != NGX_OK) { return NGX_ERROR; } @@ -822,9 +838,7 @@ ngx_quic_stream_send_chain(ngx_connection_t *c, ngx_chain_t *in, off_t limit) off_t flow; size_t n; ngx_event_t *wev; - ngx_chain_t *out; ngx_connection_t *pc; - ngx_quic_frame_t *frame; ngx_quic_stream_t *qs; ngx_quic_connection_t *qc; @@ -842,7 +856,8 @@ ngx_quic_stream_send_chain(ngx_connection_t *c, ngx_chain_t *in, off_t limit) qs->send_state = NGX_QUIC_STREAM_SEND_SEND; - flow = ngx_quic_max_stream_flow(c); + flow = qs->acked + qc->conf->stream_buffer_size - c->sent; + if (flow == 0) { wev->ready = 0; return in; @@ -852,37 +867,15 @@ ngx_quic_stream_send_chain(ngx_connection_t *c, ngx_chain_t *in, off_t limit) limit = flow; } - in = ngx_quic_write_chain(pc, &qs->out, in, limit, 0, &n); + in = ngx_quic_write_chain(pc, &qs->out, in, limit, + c->sent - qs->send_offset, &n); if (in == NGX_CHAIN_ERROR) { return NGX_CHAIN_ERROR; } - out = ngx_quic_read_chain(pc, &qs->out, n); - if (out == NGX_CHAIN_ERROR) { - return NGX_CHAIN_ERROR; - } - - frame = ngx_quic_alloc_frame(pc); - if (frame == NULL) { - return NGX_CHAIN_ERROR; - } - - frame->level = ssl_encryption_application; - frame->type = NGX_QUIC_FT_STREAM; - frame->data = out; - frame->u.stream.off = 1; - frame->u.stream.len = 1; - frame->u.stream.fin = 0; - - frame->u.stream.stream_id = qs->id; - frame->u.stream.offset = c->sent; - frame->u.stream.length = n; - c->sent += n; qc->streams.sent += n; - ngx_quic_queue_frame(qc, frame); - if (flow == (off_t) n) { wev->ready = 0; } @@ -890,61 +883,96 @@ ngx_quic_stream_send_chain(ngx_connection_t *c, ngx_chain_t *in, off_t limit) ngx_log_debug1(NGX_LOG_DEBUG_EVENT, c->log, 0, "quic send_chain sent:%uz", n); + if (ngx_quic_stream_flush(qs) != NGX_OK) { + return NGX_CHAIN_ERROR; + } + return in; } -static size_t -ngx_quic_max_stream_flow(ngx_connection_t *c) +static ngx_int_t +ngx_quic_stream_flush(ngx_quic_stream_t *qs) { - size_t size; - uint64_t sent, unacked; - ngx_quic_stream_t *qs; + off_t limit; + size_t len; + ngx_uint_t last; + ngx_chain_t *out, *cl; + ngx_quic_frame_t *frame; + ngx_connection_t *pc; ngx_quic_connection_t *qc; - qs = c->quic; - qc = ngx_quic_get_connection(qs->parent); + if (qs->send_state != NGX_QUIC_STREAM_SEND_SEND) { + return NGX_OK; + } - size = qc->conf->stream_buffer_size; - sent = c->sent; - unacked = sent - qs->acked; + pc = qs->parent; + qc = ngx_quic_get_connection(pc); if (qc->streams.send_max_data == 0) { qc->streams.send_max_data = qc->ctp.initial_max_data; } - if (unacked >= size) { - ngx_log_debug0(NGX_LOG_DEBUG_EVENT, c->log, 0, - "quic send flow hit buffer size"); - return 0; + limit = ngx_min(qc->streams.send_max_data - qc->streams.send_offset, + qs->send_max_data - qs->send_offset); + + ngx_log_debug2(NGX_LOG_DEBUG_EVENT, pc->log, 0, + "quic stream id:0x%xL flush limit:%O", qs->id, limit); + + out = ngx_quic_read_chain(pc, &qs->out, limit); + if (out == NGX_CHAIN_ERROR) { + return NGX_ERROR; } - size -= unacked; + len = 0; + last = 0; - if (qc->streams.sent >= qc->streams.send_max_data) { - ngx_log_debug0(NGX_LOG_DEBUG_EVENT, c->log, 0, - "quic send flow hit MAX_DATA"); - return 0; + for (cl = out; cl; cl = cl->next) { + len += cl->buf->last - cl->buf->pos; } - if (qc->streams.sent + size > qc->streams.send_max_data) { - size = qc->streams.send_max_data - qc->streams.sent; + if (qs->send_final_size != (uint64_t) -1 + && qs->send_final_size == qs->send_offset + len) + { + qs->send_state = NGX_QUIC_STREAM_SEND_DATA_SENT; + last = 1; } - if (sent >= qs->send_max_data) { - ngx_log_debug0(NGX_LOG_DEBUG_EVENT, c->log, 0, - "quic send flow hit MAX_STREAM_DATA"); - return 0; + if (len == 0 && !last) { + return NGX_OK; } - if (sent + size > qs->send_max_data) { - size = qs->send_max_data - sent; + frame = ngx_quic_alloc_frame(pc); + if (frame == NULL) { + return NGX_ERROR; } - ngx_log_debug1(NGX_LOG_DEBUG_EVENT, c->log, 0, - "quic send flow:%uz", size); + frame->level = ssl_encryption_application; + frame->type = NGX_QUIC_FT_STREAM; + frame->data = out; + + frame->u.stream.off = 1; + frame->u.stream.len = 1; + frame->u.stream.fin = last; + + frame->u.stream.stream_id = qs->id; + frame->u.stream.offset = qs->send_offset; + frame->u.stream.length = len; + + ngx_quic_queue_frame(qc, frame); + + qs->send_offset += len; + qc->streams.send_offset += len; - return size; + ngx_log_debug3(NGX_LOG_DEBUG_EVENT, pc->log, 0, + "quic stream id:0x%xL flush len:%uz last:%ui", + qs->id, len, last); + + if (qs->connection == NULL) { + return ngx_quic_close_stream(qs); + } + + return NGX_OK; } @@ -953,40 +981,67 @@ ngx_quic_stream_cleanup_handler(void *data) { ngx_connection_t *c = data; + ngx_quic_stream_t *qs; + + qs = c->quic; + + ngx_log_debug1(NGX_LOG_DEBUG_EVENT, qs->parent->log, 0, + "quic stream id:0x%xL cleanup", qs->id); + + if (ngx_quic_shutdown_stream(c, NGX_RDWR_SHUTDOWN) != NGX_OK) { + ngx_quic_close_connection(c, NGX_ERROR); + return; + } + + qs->connection = NULL; + + if (ngx_quic_close_stream(qs) != NGX_OK) { + ngx_quic_close_connection(c, NGX_ERROR); + return; + } +} + + +static ngx_int_t +ngx_quic_close_stream(ngx_quic_stream_t *qs) +{ ngx_connection_t *pc; ngx_quic_frame_t *frame; - ngx_quic_stream_t *qs; ngx_quic_connection_t *qc; - qs = c->quic; pc = qs->parent; qc = ngx_quic_get_connection(pc); - ngx_log_debug1(NGX_LOG_DEBUG_EVENT, c->log, 0, - "quic stream id:0x%xL cleanup", qs->id); + if (!qc->closing) { + /* make sure everything is sent and final size is received */ + + if (qs->recv_state == NGX_QUIC_STREAM_RECV_RECV + || qs->send_state == NGX_QUIC_STREAM_SEND_READY + || qs->send_state == NGX_QUIC_STREAM_SEND_SEND) + { + return NGX_OK; + } + } + + ngx_log_debug1(NGX_LOG_DEBUG_EVENT, pc->log, 0, + "quic stream id:0x%xL close", qs->id); - ngx_rbtree_delete(&qc->streams.tree, &qs->node); ngx_quic_free_chain(pc, qs->in); ngx_quic_free_chain(pc, qs->out); + ngx_rbtree_delete(&qc->streams.tree, &qs->node); + ngx_queue_insert_tail(&qc->streams.free, &qs->queue); + if (qc->closing) { /* schedule handler call to continue ngx_quic_close_connection() */ ngx_post_event(pc->read, &ngx_posted_events); - return; - } - - if (qc->error) { - goto done; + return NGX_OK; } - (void) ngx_quic_shutdown_stream(c, NGX_RDWR_SHUTDOWN); - - (void) ngx_quic_update_flow(c, qs->recv_last); - if ((qs->id & NGX_QUIC_STREAM_SERVER_INITIATED) == 0) { frame = ngx_quic_alloc_frame(pc); if (frame == NULL) { - goto done; + return NGX_ERROR; } frame->level = ssl_encryption_application; @@ -1004,13 +1059,11 @@ ngx_quic_stream_cleanup_handler(void *data) ngx_quic_queue_frame(qc, frame); } -done: - - (void) ngx_quic_output(pc); - if (qc->shutdown) { ngx_post_event(pc->read, &ngx_posted_events); } + + return NGX_OK; } @@ -1020,7 +1073,6 @@ ngx_quic_handle_stream_frame(ngx_connection_t *c, ngx_quic_header_t *pkt, { size_t size; uint64_t last; - ngx_connection_t *sc; ngx_quic_stream_t *qs; ngx_quic_connection_t *qc; ngx_quic_stream_frame_t *f; @@ -1048,19 +1100,17 @@ ngx_quic_handle_stream_frame(ngx_connection_t *c, ngx_quic_header_t *pkt, return NGX_OK; } - sc = qs->connection; - if (qs->recv_state != NGX_QUIC_STREAM_RECV_RECV && qs->recv_state != NGX_QUIC_STREAM_RECV_SIZE_KNOWN) { return NGX_OK; } - if (ngx_quic_control_flow(sc, last) != NGX_OK) { + if (ngx_quic_control_flow(qs, last) != NGX_OK) { return NGX_ERROR; } - if (qs->final_size != (uint64_t) -1 && last > qs->final_size) { + if (qs->recv_final_size != (uint64_t) -1 && last > qs->recv_final_size) { qc->error = NGX_QUIC_ERR_FINAL_SIZE_ERROR; return NGX_ERROR; } @@ -1075,7 +1125,8 @@ ngx_quic_handle_stream_frame(ngx_connection_t *c, ngx_quic_header_t *pkt, } if (f->fin) { - if (qs->final_size != (uint64_t) -1 && qs->final_size != last) { + if (qs->recv_final_size != (uint64_t) -1 && qs->recv_final_size != last) + { qc->error = NGX_QUIC_ERR_FINAL_SIZE_ERROR; return NGX_ERROR; } @@ -1085,7 +1136,7 @@ ngx_quic_handle_stream_frame(ngx_connection_t *c, ngx_quic_header_t *pkt, return NGX_ERROR; } - qs->final_size = last; + qs->recv_final_size = last; qs->recv_state = NGX_QUIC_STREAM_RECV_SIZE_KNOWN; } @@ -1099,13 +1150,17 @@ ngx_quic_handle_stream_frame(ngx_connection_t *c, ngx_quic_header_t *pkt, qs->recv_size += size; if (qs->recv_state == NGX_QUIC_STREAM_RECV_SIZE_KNOWN - && qs->recv_size == qs->final_size) + && qs->recv_size == qs->recv_final_size) { qs->recv_state = NGX_QUIC_STREAM_RECV_DATA_RECVD; } + if (qs->connection == NULL) { + return ngx_quic_close_stream(qs); + } + if (f->offset == qs->recv_offset) { - ngx_quic_set_event(sc->read); + ngx_quic_set_event(qs->connection->read); } return NGX_OK; @@ -1128,20 +1183,26 @@ ngx_quic_handle_max_data_frame(ngx_connection_t *c, return NGX_OK; } - if (tree->root != tree->sentinel - && qc->streams.sent >= qc->streams.send_max_data) + if (tree->root == tree->sentinel + || qc->streams.send_offset < qc->streams.send_max_data) { - - for (node = ngx_rbtree_min(tree->root, tree->sentinel); - node; - node = ngx_rbtree_next(tree, node)) - { - qs = (ngx_quic_stream_t *) node; - ngx_quic_set_event(qs->connection->write); - } + /* not blocked on MAX_DATA */ + qc->streams.send_max_data = f->max_data; + return NGX_OK; } qc->streams.send_max_data = f->max_data; + node = ngx_rbtree_min(tree->root, tree->sentinel); + + while (node && qc->streams.send_offset < qc->streams.send_max_data) { + + qs = (ngx_quic_stream_t *) node; + node = ngx_rbtree_next(tree, node); + + if (ngx_quic_stream_flush(qs) != NGX_OK) { + return NGX_ERROR; + } + } return NGX_OK; } @@ -1189,7 +1250,7 @@ ngx_quic_handle_stream_data_blocked_frame(ngx_connection_t *c, return NGX_OK; } - return ngx_quic_update_max_stream_data(qs->connection); + return ngx_quic_update_max_stream_data(qs); } @@ -1197,7 +1258,6 @@ ngx_int_t ngx_quic_handle_max_stream_data_frame(ngx_connection_t *c, ngx_quic_header_t *pkt, ngx_quic_max_stream_data_frame_t *f) { - uint64_t sent; ngx_quic_stream_t *qs; ngx_quic_connection_t *qc; @@ -1224,15 +1284,15 @@ ngx_quic_handle_max_stream_data_frame(ngx_connection_t *c, return NGX_OK; } - sent = qs->connection->sent; - - if (sent >= qs->send_max_data) { - ngx_quic_set_event(qs->connection->write); + if (qs->send_offset < qs->send_max_data) { + /* not blocked on MAX_STREAM_DATA */ + qs->send_max_data = f->limit; + return NGX_OK; } qs->send_max_data = f->limit; - return NGX_OK; + return ngx_quic_stream_flush(qs); } @@ -1240,7 +1300,6 @@ ngx_int_t ngx_quic_handle_reset_stream_frame(ngx_connection_t *c, ngx_quic_header_t *pkt, ngx_quic_reset_stream_frame_t *f) { - ngx_connection_t *sc; ngx_quic_stream_t *qs; ngx_quic_connection_t *qc; @@ -1271,13 +1330,13 @@ ngx_quic_handle_reset_stream_frame(ngx_connection_t *c, qs->recv_state = NGX_QUIC_STREAM_RECV_RESET_RECVD; - sc = qs->connection; - - if (ngx_quic_control_flow(sc, f->final_size) != NGX_OK) { + if (ngx_quic_control_flow(qs, f->final_size) != NGX_OK) { return NGX_ERROR; } - if (qs->final_size != (uint64_t) -1 && qs->final_size != f->final_size) { + if (qs->recv_final_size != (uint64_t) -1 + && qs->recv_final_size != f->final_size) + { qc->error = NGX_QUIC_ERR_FINAL_SIZE_ERROR; return NGX_ERROR; } @@ -1287,12 +1346,16 @@ ngx_quic_handle_reset_stream_frame(ngx_connection_t *c, return NGX_ERROR; } - qs->final_size = f->final_size; + qs->recv_final_size = f->final_size; - if (ngx_quic_update_flow(sc, qs->final_size) != NGX_OK) { + if (ngx_quic_update_flow(qs, qs->recv_final_size) != NGX_OK) { return NGX_ERROR; } + if (qs->connection == NULL) { + return ngx_quic_close_stream(qs); + } + ngx_quic_set_event(qs->connection->read); return NGX_OK; @@ -1325,10 +1388,14 @@ ngx_quic_handle_stop_sending_frame(ngx_connection_t *c, return NGX_OK; } - if (ngx_quic_reset_stream(qs->connection, f->error_code) != NGX_OK) { + if (ngx_quic_do_reset_stream(qs, f->error_code) != NGX_OK) { return NGX_ERROR; } + if (qs->connection == NULL) { + return ngx_quic_close_stream(qs); + } + ngx_quic_set_event(qs->connection->write); return NGX_OK; @@ -1378,30 +1445,37 @@ ngx_quic_handle_stream_ack(ngx_connection_t *c, ngx_quic_frame_t *f) return; } + if (qs->connection == NULL) { + qs->acked += f->u.stream.length; + return; + } + sent = qs->connection->sent; unacked = sent - qs->acked; + qs->acked += f->u.stream.length; - if (unacked >= qc->conf->stream_buffer_size) { - ngx_quic_set_event(qs->connection->write); - } + ngx_log_debug4(NGX_LOG_DEBUG_EVENT, c->log, 0, + "quic stream id:0x%xL ack len:%uL acked:%uL unacked:%uL", + qs->id, f->u.stream.length, qs->acked, sent - qs->acked); - qs->acked += f->u.stream.length; + if (unacked != qc->conf->stream_buffer_size) { + /* not blocked on buffer size */ + return; + } - ngx_log_debug3(NGX_LOG_DEBUG_EVENT, qs->connection->log, 0, - "quic stream ack len:%uL acked:%uL unacked:%uL", - f->u.stream.length, qs->acked, sent - qs->acked); + ngx_quic_set_event(qs->connection->write); } static ngx_int_t -ngx_quic_control_flow(ngx_connection_t *c, uint64_t last) +ngx_quic_control_flow(ngx_quic_stream_t *qs, uint64_t last) { uint64_t len; - ngx_quic_stream_t *qs; + ngx_connection_t *pc; ngx_quic_connection_t *qc; - qs = c->quic; - qc = ngx_quic_get_connection(qs->parent); + pc = qs->parent; + qc = ngx_quic_get_connection(pc); if (last <= qs->recv_last) { return NGX_OK; @@ -1409,9 +1483,9 @@ ngx_quic_control_flow(ngx_connection_t *c, uint64_t last) len = last - qs->recv_last; - ngx_log_debug4(NGX_LOG_DEBUG_EVENT, c->log, 0, - "quic flow control msd:%uL/%uL md:%uL/%uL", - last, qs->recv_max_data, qc->streams.recv_last + len, + ngx_log_debug5(NGX_LOG_DEBUG_EVENT, pc->log, 0, + "quic stream id:0x%xL flow control msd:%uL/%uL md:%uL/%uL", + qs->id, last, qs->recv_max_data, qc->streams.recv_last + len, qc->streams.recv_max_data); qs->recv_last += len; @@ -1435,14 +1509,12 @@ ngx_quic_control_flow(ngx_connection_t *c, uint64_t last) static ngx_int_t -ngx_quic_update_flow(ngx_connection_t *c, uint64_t last) +ngx_quic_update_flow(ngx_quic_stream_t *qs, uint64_t last) { uint64_t len; ngx_connection_t *pc; - ngx_quic_stream_t *qs; ngx_quic_connection_t *qc; - qs = c->quic; pc = qs->parent; qc = ngx_quic_get_connection(pc); @@ -1452,13 +1524,13 @@ ngx_quic_update_flow(ngx_connection_t *c, uint64_t last) len = last - qs->recv_offset; - ngx_log_debug1(NGX_LOG_DEBUG_EVENT, c->log, 0, - "quic flow update %uL", last); + ngx_log_debug2(NGX_LOG_DEBUG_EVENT, pc->log, 0, + "quic stream id:0x%xL flow update %uL", qs->id, last); qs->recv_offset += len; if (qs->recv_max_data <= qs->recv_offset + qs->recv_window / 2) { - if (ngx_quic_update_max_stream_data(c) != NGX_OK) { + if (ngx_quic_update_max_stream_data(qs) != NGX_OK) { return NGX_ERROR; } } @@ -1478,15 +1550,13 @@ ngx_quic_update_flow(ngx_connection_t *c, uint64_t last) static ngx_int_t -ngx_quic_update_max_stream_data(ngx_connection_t *c) +ngx_quic_update_max_stream_data(ngx_quic_stream_t *qs) { uint64_t recv_max_data; ngx_connection_t *pc; ngx_quic_frame_t *frame; - ngx_quic_stream_t *qs; ngx_quic_connection_t *qc; - qs = c->quic; pc = qs->parent; qc = ngx_quic_get_connection(pc); @@ -1502,8 +1572,9 @@ ngx_quic_update_max_stream_data(ngx_connection_t *c) qs->recv_max_data = recv_max_data; - ngx_log_debug1(NGX_LOG_DEBUG_EVENT, c->log, 0, - "quic flow update msd:%uL", qs->recv_max_data); + ngx_log_debug2(NGX_LOG_DEBUG_EVENT, pc->log, 0, + "quic stream id:0x%xL flow update msd:%uL", + qs->id, qs->recv_max_data); frame = ngx_quic_alloc_frame(pc); if (frame == NULL) { diff --git a/src/http/v3/ngx_http_v3_uni.c b/src/http/v3/ngx_http_v3_uni.c index bd7eb278b..96b7d7ebf 100644 --- a/src/http/v3/ngx_http_v3_uni.c +++ b/src/http/v3/ngx_http_v3_uni.c @@ -295,8 +295,6 @@ ngx_http_v3_uni_dummy_write_handler(ngx_event_t *wev) } -/* XXX async & buffered stream writes */ - ngx_connection_t * ngx_http_v3_create_push_stream(ngx_connection_t *c, uint64_t push_id) { |