aboutsummaryrefslogtreecommitdiff
path: root/src/event/ngx_event_quic.c
diff options
context:
space:
mode:
authorRoman Arutyunyan <arut@nginx.com>2020-03-23 15:49:31 +0300
committerRoman Arutyunyan <arut@nginx.com>2020-03-23 15:49:31 +0300
commit3fa1dec9c71617d78322226b5c7e19faa6d35c1f (patch)
treed240eb483fbb050c169adeae5e72cd1e5ce302be /src/event/ngx_event_quic.c
parent72b0a1b32a8a9527fccf1d71ee1fc54859cffd4c (diff)
downloadnginx-3fa1dec9c71617d78322226b5c7e19faa6d35c1f.tar.gz
nginx-3fa1dec9c71617d78322226b5c7e19faa6d35c1f.zip
Better flow control and buffering for QUIC streams.
Diffstat (limited to 'src/event/ngx_event_quic.c')
-rw-r--r--src/event/ngx_event_quic.c95
1 files changed, 86 insertions, 9 deletions
diff --git a/src/event/ngx_event_quic.c b/src/event/ngx_event_quic.c
index 51668dd69..4afa86bb0 100644
--- a/src/event/ngx_event_quic.c
+++ b/src/event/ngx_event_quic.c
@@ -16,6 +16,9 @@ typedef enum {
} ngx_quic_state_t;
+#define NGX_QUIC_STREAM_BUFSIZE 16384
+
+
typedef struct {
ngx_rbtree_node_t node;
ngx_buf_t *b;
@@ -106,6 +109,8 @@ static ngx_int_t ngx_quic_handle_stream_frame(ngx_connection_t *c,
ngx_quic_header_t *pkt, ngx_quic_stream_frame_t *frame);
static ngx_int_t ngx_quic_handle_streams_blocked_frame(ngx_connection_t *c,
ngx_quic_header_t *pkt, ngx_quic_streams_blocked_frame_t *f);
+static ngx_int_t ngx_quic_handle_stream_data_blocked_frame(ngx_connection_t *c,
+ ngx_quic_header_t *pkt, ngx_quic_stream_data_blocked_frame_t *f);
static void ngx_quic_queue_frame(ngx_quic_connection_t *qc,
ngx_quic_frame_t *frame);
@@ -885,6 +890,18 @@ ngx_quic_payload_handler(ngx_connection_t *c, ngx_quic_header_t *pkt)
ack_this = 1;
break;
+ case NGX_QUIC_FT_STREAM_DATA_BLOCKED:
+
+ if (ngx_quic_handle_stream_data_blocked_frame(c, pkt,
+ &frame.u.stream_data_blocked)
+ != NGX_OK)
+ {
+ return NGX_ERROR;
+ }
+
+ ack_this = 1;
+ break;
+
default:
return NGX_ERROR;
}
@@ -1002,6 +1019,7 @@ ngx_quic_handle_stream_frame(ngx_connection_t *c,
ngx_quic_header_t *pkt, ngx_quic_stream_frame_t *f)
{
ngx_buf_t *b;
+ ngx_event_t *rev;
ngx_quic_connection_t *qc;
ngx_quic_stream_node_t *sn;
@@ -1013,15 +1031,24 @@ ngx_quic_handle_stream_frame(ngx_connection_t *c,
ngx_log_debug0(NGX_LOG_DEBUG_EVENT, c->log, 0, "existing stream");
b = sn->b;
- if ((size_t) (b->end - b->pos) < f->length) {
+ if ((size_t) ((b->pos - b->start) + (b->end - b->last)) < f->length) {
ngx_log_error(NGX_LOG_INFO, c->log, 0, "no space in stream buffer");
return NGX_ERROR;
}
- ngx_memcpy(b->pos, f->data, f->length);
- b->pos += f->length;
+ if ((size_t) (b->end - b->last) < f->length) {
+ b->last = ngx_movemem(b->start, b->pos, b->last - b->pos);
+ b->pos = b->start;
+ }
+
+ b->last = ngx_cpymem(b->last, f->data, f->length);
- // TODO: notify
+ rev = sn->c->read;
+ rev->ready = 1;
+
+ if (rev->active) {
+ rev->handler(rev);
+ }
return NGX_OK;
}
@@ -1071,6 +1098,48 @@ ngx_quic_handle_streams_blocked_frame(ngx_connection_t *c,
}
+static ngx_int_t
+ngx_quic_handle_stream_data_blocked_frame(ngx_connection_t *c,
+ ngx_quic_header_t *pkt, ngx_quic_stream_data_blocked_frame_t *f)
+{
+ size_t n;
+ ngx_buf_t *b;
+ ngx_quic_frame_t *frame;
+ ngx_quic_connection_t *qc;
+ ngx_quic_stream_node_t *sn;
+
+ qc = c->quic;
+ sn = ngx_quic_find_stream(&qc->streams.tree, f->id);
+
+ if (sn == NULL) {
+ ngx_log_error(NGX_LOG_INFO, c->log, 0, "unknown stream id:%uL", f->id);
+ return NGX_ERROR;
+ }
+
+ b = sn->b;
+ n = (b->pos - b->start) + (b->end - b->last);
+
+ frame = ngx_pcalloc(c->pool, sizeof(ngx_quic_frame_t));
+ if (frame == NULL) {
+ return NGX_ERROR;
+ }
+
+ frame->level = pkt->level;
+ frame->type = NGX_QUIC_FT_MAX_STREAM_DATA;
+ frame->u.max_stream_data.id = f->id;
+ frame->u.max_stream_data.limit = n;
+
+ ngx_sprintf(frame->info, "MAX_STREAM_DATA id:%d limit:%d level=%d",
+ (int) frame->u.max_stream_data.id,
+ (int) frame->u.max_stream_data.limit,
+ frame->level);
+
+ ngx_quic_queue_frame(c->quic, frame);
+
+ return NGX_OK;
+}
+
+
static void
ngx_quic_queue_frame(ngx_quic_connection_t *qc, ngx_quic_frame_t *frame)
{
@@ -1349,6 +1418,7 @@ ngx_quic_find_stream(ngx_rbtree_t *rbtree, ngx_uint_t key)
static ngx_quic_stream_node_t *
ngx_quic_create_stream(ngx_connection_t *c, ngx_uint_t id)
{
+ size_t n;
ngx_log_t *log;
ngx_pool_t *pool;
ngx_event_t *rev, *wev;
@@ -1402,8 +1472,11 @@ ngx_quic_create_stream(ngx_connection_t *c, ngx_uint_t id)
sn->c->number = ngx_atomic_fetch_add(ngx_connection_counter, 1);
+ n = ngx_max(NGX_QUIC_STREAM_BUFSIZE,
+ qc->tp.initial_max_stream_data_bidi_remote);
+
sn->node.key =id;
- sn->b = ngx_create_temp_buf(pool, 16 * 1024); // XXX enough for everyone
+ sn->b = ngx_create_temp_buf(pool, n);
if (sn->b == NULL) {
return NULL;
}
@@ -1456,11 +1529,10 @@ ngx_quic_stream_recv(ngx_connection_t *c, u_char *buf, size_t size)
b = sn->b;
- if (b->last - b->pos == 0) {
+ if (b->pos == b->last) {
c->read->ready = 0;
- ngx_log_debug0(NGX_LOG_DEBUG_EVENT, c->log, 0,
- "quic recv() not ready");
- return NGX_AGAIN; // ?
+ ngx_log_debug0(NGX_LOG_DEBUG_EVENT, c->log, 0, "quic recv() not ready");
+ return NGX_AGAIN;
}
len = ngx_min(b->last - b->pos, (ssize_t) size);
@@ -1469,6 +1541,11 @@ ngx_quic_stream_recv(ngx_connection_t *c, u_char *buf, size_t size)
b->pos += len;
+ if (b->pos == b->last) {
+ b->pos = b->start;
+ b->last = b->start;
+ }
+
ngx_log_debug2(NGX_LOG_DEBUG_EVENT, c->log, 0,
"quic recv: %z of %uz", len, size);