aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/event/ngx_event_quic.c95
-rw-r--r--src/event/ngx_event_quic_transport.c28
2 files changed, 114 insertions, 9 deletions
diff --git a/src/event/ngx_event_quic.c b/src/event/ngx_event_quic.c
index 51668dd69..4afa86bb0 100644
--- a/src/event/ngx_event_quic.c
+++ b/src/event/ngx_event_quic.c
@@ -16,6 +16,9 @@ typedef enum {
} ngx_quic_state_t;
+#define NGX_QUIC_STREAM_BUFSIZE 16384
+
+
typedef struct {
ngx_rbtree_node_t node;
ngx_buf_t *b;
@@ -106,6 +109,8 @@ static ngx_int_t ngx_quic_handle_stream_frame(ngx_connection_t *c,
ngx_quic_header_t *pkt, ngx_quic_stream_frame_t *frame);
static ngx_int_t ngx_quic_handle_streams_blocked_frame(ngx_connection_t *c,
ngx_quic_header_t *pkt, ngx_quic_streams_blocked_frame_t *f);
+static ngx_int_t ngx_quic_handle_stream_data_blocked_frame(ngx_connection_t *c,
+ ngx_quic_header_t *pkt, ngx_quic_stream_data_blocked_frame_t *f);
static void ngx_quic_queue_frame(ngx_quic_connection_t *qc,
ngx_quic_frame_t *frame);
@@ -885,6 +890,18 @@ ngx_quic_payload_handler(ngx_connection_t *c, ngx_quic_header_t *pkt)
ack_this = 1;
break;
+ case NGX_QUIC_FT_STREAM_DATA_BLOCKED:
+
+ if (ngx_quic_handle_stream_data_blocked_frame(c, pkt,
+ &frame.u.stream_data_blocked)
+ != NGX_OK)
+ {
+ return NGX_ERROR;
+ }
+
+ ack_this = 1;
+ break;
+
default:
return NGX_ERROR;
}
@@ -1002,6 +1019,7 @@ ngx_quic_handle_stream_frame(ngx_connection_t *c,
ngx_quic_header_t *pkt, ngx_quic_stream_frame_t *f)
{
ngx_buf_t *b;
+ ngx_event_t *rev;
ngx_quic_connection_t *qc;
ngx_quic_stream_node_t *sn;
@@ -1013,15 +1031,24 @@ ngx_quic_handle_stream_frame(ngx_connection_t *c,
ngx_log_debug0(NGX_LOG_DEBUG_EVENT, c->log, 0, "existing stream");
b = sn->b;
- if ((size_t) (b->end - b->pos) < f->length) {
+ if ((size_t) ((b->pos - b->start) + (b->end - b->last)) < f->length) {
ngx_log_error(NGX_LOG_INFO, c->log, 0, "no space in stream buffer");
return NGX_ERROR;
}
- ngx_memcpy(b->pos, f->data, f->length);
- b->pos += f->length;
+ if ((size_t) (b->end - b->last) < f->length) {
+ b->last = ngx_movemem(b->start, b->pos, b->last - b->pos);
+ b->pos = b->start;
+ }
+
+ b->last = ngx_cpymem(b->last, f->data, f->length);
- // TODO: notify
+ rev = sn->c->read;
+ rev->ready = 1;
+
+ if (rev->active) {
+ rev->handler(rev);
+ }
return NGX_OK;
}
@@ -1071,6 +1098,48 @@ ngx_quic_handle_streams_blocked_frame(ngx_connection_t *c,
}
+static ngx_int_t
+ngx_quic_handle_stream_data_blocked_frame(ngx_connection_t *c,
+ ngx_quic_header_t *pkt, ngx_quic_stream_data_blocked_frame_t *f)
+{
+ size_t n;
+ ngx_buf_t *b;
+ ngx_quic_frame_t *frame;
+ ngx_quic_connection_t *qc;
+ ngx_quic_stream_node_t *sn;
+
+ qc = c->quic;
+ sn = ngx_quic_find_stream(&qc->streams.tree, f->id);
+
+ if (sn == NULL) {
+ ngx_log_error(NGX_LOG_INFO, c->log, 0, "unknown stream id:%uL", f->id);
+ return NGX_ERROR;
+ }
+
+ b = sn->b;
+ n = (b->pos - b->start) + (b->end - b->last);
+
+ frame = ngx_pcalloc(c->pool, sizeof(ngx_quic_frame_t));
+ if (frame == NULL) {
+ return NGX_ERROR;
+ }
+
+ frame->level = pkt->level;
+ frame->type = NGX_QUIC_FT_MAX_STREAM_DATA;
+ frame->u.max_stream_data.id = f->id;
+ frame->u.max_stream_data.limit = n;
+
+ ngx_sprintf(frame->info, "MAX_STREAM_DATA id:%d limit:%d level=%d",
+ (int) frame->u.max_stream_data.id,
+ (int) frame->u.max_stream_data.limit,
+ frame->level);
+
+ ngx_quic_queue_frame(c->quic, frame);
+
+ return NGX_OK;
+}
+
+
static void
ngx_quic_queue_frame(ngx_quic_connection_t *qc, ngx_quic_frame_t *frame)
{
@@ -1349,6 +1418,7 @@ ngx_quic_find_stream(ngx_rbtree_t *rbtree, ngx_uint_t key)
static ngx_quic_stream_node_t *
ngx_quic_create_stream(ngx_connection_t *c, ngx_uint_t id)
{
+ size_t n;
ngx_log_t *log;
ngx_pool_t *pool;
ngx_event_t *rev, *wev;
@@ -1402,8 +1472,11 @@ ngx_quic_create_stream(ngx_connection_t *c, ngx_uint_t id)
sn->c->number = ngx_atomic_fetch_add(ngx_connection_counter, 1);
+ n = ngx_max(NGX_QUIC_STREAM_BUFSIZE,
+ qc->tp.initial_max_stream_data_bidi_remote);
+
sn->node.key =id;
- sn->b = ngx_create_temp_buf(pool, 16 * 1024); // XXX enough for everyone
+ sn->b = ngx_create_temp_buf(pool, n);
if (sn->b == NULL) {
return NULL;
}
@@ -1456,11 +1529,10 @@ ngx_quic_stream_recv(ngx_connection_t *c, u_char *buf, size_t size)
b = sn->b;
- if (b->last - b->pos == 0) {
+ if (b->pos == b->last) {
c->read->ready = 0;
- ngx_log_debug0(NGX_LOG_DEBUG_EVENT, c->log, 0,
- "quic recv() not ready");
- return NGX_AGAIN; // ?
+ ngx_log_debug0(NGX_LOG_DEBUG_EVENT, c->log, 0, "quic recv() not ready");
+ return NGX_AGAIN;
}
len = ngx_min(b->last - b->pos, (ssize_t) size);
@@ -1469,6 +1541,11 @@ ngx_quic_stream_recv(ngx_connection_t *c, u_char *buf, size_t size)
b->pos += len;
+ if (b->pos == b->last) {
+ b->pos = b->start;
+ b->last = b->start;
+ }
+
ngx_log_debug2(NGX_LOG_DEBUG_EVENT, c->log, 0,
"quic recv: %z of %uz", len, size);
diff --git a/src/event/ngx_event_quic_transport.c b/src/event/ngx_event_quic_transport.c
index de49d2734..c1f2fc992 100644
--- a/src/event/ngx_event_quic_transport.c
+++ b/src/event/ngx_event_quic_transport.c
@@ -69,6 +69,8 @@ static size_t ngx_quic_create_crypto(u_char *p,
static size_t ngx_quic_create_stream(u_char *p, ngx_quic_stream_frame_t *sf);
static size_t ngx_quic_create_max_streams(u_char *p,
ngx_quic_max_streams_frame_t *ms);
+static size_t ngx_quic_create_max_stream_data(u_char *p,
+ ngx_quic_max_stream_data_frame_t *ms);
static size_t ngx_quic_create_close(u_char *p, ngx_quic_close_frame_t *cl);
static ngx_int_t ngx_quic_parse_transport_param(u_char *p, u_char *end,
@@ -1079,6 +1081,9 @@ ngx_quic_create_frame(u_char *p, u_char *end, ngx_quic_frame_t *f)
case NGX_QUIC_FT_MAX_STREAMS:
return ngx_quic_create_max_streams(p, &f->u.max_streams);
+ case NGX_QUIC_FT_MAX_STREAM_DATA:
+ return ngx_quic_create_max_stream_data(p, &f->u.max_stream_data);
+
default:
/* BUG: unsupported frame type generated */
return NGX_ERROR;
@@ -1459,6 +1464,29 @@ ngx_quic_parse_transport_params(u_char *p, u_char *end, ngx_quic_tp_t *tp,
}
+static size_t
+ngx_quic_create_max_stream_data(u_char *p, ngx_quic_max_stream_data_frame_t *ms)
+{
+ size_t len;
+ u_char *start;
+
+ if (p == NULL) {
+ len = ngx_quic_varint_len(NGX_QUIC_FT_MAX_STREAM_DATA);
+ len += ngx_quic_varint_len(ms->id);
+ len += ngx_quic_varint_len(ms->limit);
+ return len;
+ }
+
+ start = p;
+
+ ngx_quic_build_int(&p, NGX_QUIC_FT_MAX_STREAM_DATA);
+ ngx_quic_build_int(&p, ms->id);
+ ngx_quic_build_int(&p, ms->limit);
+
+ return p - start;
+}
+
+
ssize_t
ngx_quic_create_transport_params(u_char *pos, u_char *end, ngx_quic_tp_t *tp)
{