]> git.kaiwu.me - haproxy.git/commitdiff
MAJOR: mux-quic: switch to ncbmbuf for stream Rx 20260410-quic-use-ncbmbuf-for-streams
authorAmaury Denoyelle <adenoyelle@haproxy.com>
Fri, 10 Apr 2026 09:01:43 +0000 (11:01 +0200)
committerAmaury Denoyelle <adenoyelle@haproxy.com>
Fri, 10 Apr 2026 09:01:43 +0000 (11:01 +0200)
include/haproxy/mux_quic-t.h
include/haproxy/mux_quic.h
include/haproxy/ncbmbuf.h
src/mux_quic.c
src/ncbmbuf.c

index a8c2f5c83d3fa276731c376d49c757bcf8be869f..2e604b3eab94e12a78c61c943117e39f97c139dd 100644 (file)
@@ -12,7 +12,7 @@
 #include <haproxy/connection-t.h>
 #include <haproxy/htx-t.h>
 #include <haproxy/list-t.h>
-#include <haproxy/ncbuf-t.h>
+#include <haproxy/ncbmbuf-t.h>
 #include <haproxy/quic_fctl-t.h>
 #include <haproxy/quic_frame-t.h>
 #include <haproxy/quic_pacing-t.h>
@@ -153,7 +153,7 @@ enum qcs_state {
  */
 struct qc_stream_rxbuf {
        struct eb64_node off_node; /* base offset of current buffer, node for QCS rx.bufs */
-       struct ncbuf ncb;          /* data storage with support for out of order offset */
+       struct ncbmbuf ncb;        /* data storage with support for out of order offset */
        uint64_t off_end;          /* first offset directly outside of current buffer */
 };
 
index 2c808629781f021bbf905cfc798ad330f113d3e0..dd9e4ac815ad09c89569fe029ea39f60b411091b 100644 (file)
@@ -53,7 +53,8 @@ int qcc_recv_stop_sending(struct qcc *qcc, uint64_t id, uint64_t err);
 
 static inline int qmux_stream_rx_bufsz(void)
 {
-       return global.tune.bufsize - NCB_RESERVED_SZ;
+       ncb_sz_t size_bm = (global.tune.bufsize + 8) / 9;
+       return global.tune.bufsize - size_bm;
 }
 
 #define QCS_ID_TYPE_MASK         0x3
index 4974f53cfe0e1cd6523324ac6500da3f9c15533f..306eeec9e0ce8d0b0a260849ca824fcc683f6bea 100644 (file)
@@ -42,6 +42,8 @@ static inline ncb_sz_t ncbmb_size(const struct ncbmbuf *buf)
 
 int ncbmb_is_empty(const struct ncbmbuf *buf);
 
+int ncbmb_is_full(const struct ncbmbuf *buf);
+
 ncb_sz_t ncbmb_data(const struct ncbmbuf *buf, ncb_sz_t offset);
 
 enum ncb_ret ncbmb_add(struct ncbmbuf *buf, ncb_sz_t off,
index 0ea65fefc9d6e05ddf2811cce28b0acc4ebea6a1..8f7a45c225af5845dfe946853aff2eaaf42c1f2c 100644 (file)
@@ -12,7 +12,7 @@
 #include <haproxy/h3.h>
 #include <haproxy/list.h>
 #include <haproxy/mux_quic_qstrm.h>
-#include <haproxy/ncbuf.h>
+#include <haproxy/ncbmbuf.h>
 #include <haproxy/pool.h>
 #include <haproxy/proxy.h>
 #include <haproxy/qmux_http.h>
@@ -53,16 +53,16 @@ static int qcc_is_pacing_active(const struct connection *conn)
 /* Free <rxbuf> instance and its inner data storage attached to <qcs> stream. */
 static void qcs_free_rxbuf(struct qcs *qcs, struct qc_stream_rxbuf *rxbuf)
 {
-       struct ncbuf *ncbuf;
+       struct ncbmbuf *ncbuf;
        struct buffer buf;
 
        ncbuf = &rxbuf->ncb;
-       if (!ncb_is_null(ncbuf)) {
+       if (!ncbmb_is_null(ncbuf)) {
                buf = b_make(ncbuf->area, ncbuf->size, 0, 0);
                b_free(&buf);
                offer_buffers(NULL, 1);
        }
-       rxbuf->ncb = NCBUF_NULL;
+       rxbuf->ncb = NCBMBUF_NULL;
 
        /* Reset DEM_FULL as buffer is released. This ensures mux is not woken
         * up from rcv_buf stream callback when demux was previously blocked.
@@ -504,16 +504,16 @@ int qcs_is_close_remote(struct qcs *qcs)
  *
  * Returns the buffer instance or NULL on allocation failure.
  */
-static struct ncbuf *qcs_get_ncbuf(struct qcs *qcs, struct ncbuf *ncbuf)
+static struct ncbmbuf *qcs_get_ncbuf(struct qcs *qcs, struct ncbmbuf *ncbuf)
 {
        struct buffer buf = BUF_NULL;
 
-       if (ncb_is_null(ncbuf)) {
+       if (ncbmb_is_null(ncbuf)) {
                if (!b_alloc(&buf, DB_MUX_RX))
                        return NULL;
 
-               *ncbuf = ncb_make(buf.area, buf.size, 0);
-               ncb_init(ncbuf, 0);
+               *ncbuf = ncbmb_make(buf.area, buf.size, 0);
+               ncbmb_init(ncbuf, 0);
        }
 
        return ncbuf;
@@ -1151,8 +1151,8 @@ int qcc_get_qcs(struct qcc *qcc, uint64_t id, int receive_only, int send_only,
 static inline struct buffer qcs_b_dup(const struct qc_stream_rxbuf *b)
 {
        if (b) {
-               const struct ncbuf *ncb = &b->ncb;
-               return b_make(ncb_orig(ncb), ncb->size, ncb->head, ncb_data(ncb, 0));
+               const struct ncbmbuf *ncb = &b->ncb;
+               return b_make(ncbmb_orig(ncb), ncb->size, ncb->head, ncbmb_data(ncb, 0));
        }
        else {
                return BUF_NULL;
@@ -1174,7 +1174,7 @@ static int qcs_transfer_rx_data(struct qcs *qcs, struct qc_stream_rxbuf *rxbuf)
        size_t to_copy;
        int ret = 1;
 
-       BUG_ON(ncb_is_full(&rxbuf->ncb));
+       BUG_ON(ncbmb_is_full(&rxbuf->ncb));
 
        next = eb64_next(&rxbuf->off_node);
        if (!next)
@@ -1182,19 +1182,19 @@ static int qcs_transfer_rx_data(struct qcs *qcs, struct qc_stream_rxbuf *rxbuf)
 
        rxbuf_next = container_of(next, struct qc_stream_rxbuf, off_node);
        if (rxbuf_next->off_node.key == rxbuf->off_end &&
-           ncb_data(&rxbuf_next->ncb, 0)) {
+           ncbmb_data(&rxbuf_next->ncb, 0)) {
                eb64_delete(&rxbuf->off_node);
                eb64_delete(next);
 
                b = qcs_b_dup(rxbuf);
                b_next = qcs_b_dup(rxbuf_next);
-               to_copy = MIN(b_data(&b_next), ncb_size(&rxbuf->ncb) - b_data(&b));
+               to_copy = MIN(b_data(&b_next), ncbmb_size(&rxbuf->ncb) - b_data(&b));
 
-               ncb_ret = ncb_add(&rxbuf->ncb, ncb_data(&rxbuf->ncb, 0),
-                                 b_head(&b_next), to_copy, NCB_ADD_COMPARE);
+               ncb_ret = ncbmb_add(&rxbuf->ncb, ncbmb_data(&rxbuf->ncb, 0),
+                                   b_head(&b_next), to_copy, NCB_ADD_OVERWRT);
                BUG_ON(ncb_ret != NCB_RET_OK);
 
-               ncb_ret = ncb_advance(&rxbuf_next->ncb, to_copy);
+               ncb_ret = ncbmb_advance(&rxbuf_next->ncb, to_copy);
                BUG_ON(ncb_ret != NCB_RET_OK);
 
                rxbuf->off_node.key = qcs->rx.offset;
@@ -1251,7 +1251,7 @@ static struct qc_stream_rxbuf *qcs_get_curr_rxbuf(struct qcs *qcs)
 static ncb_sz_t qcs_rx_avail_data(struct qcs *qcs)
 {
        struct qc_stream_rxbuf *b = qcs_get_curr_rxbuf(qcs);
-       return b ? ncb_data(&b->ncb, 0) : 0;
+       return b ? ncbmb_data(&b->ncb, 0) : 0;
 }
 
 /* Remove <bytes> from <buf> current Rx buffer of <qcs> stream. Flow-control
@@ -1270,7 +1270,7 @@ static void qcs_consume(struct qcs *qcs, uint64_t bytes, struct qc_stream_rxbuf
        BUG_ON_HOT(buf->off_node.key > qcs->rx.offset ||
                   qcs->rx.offset >= buf->off_end);
 
-       ret = ncb_advance(&buf->ncb, bytes);
+       ret = ncbmb_advance(&buf->ncb, bytes);
        if (ret) {
                ABORT_NOW(); /* should not happens because removal only in data */
        }
@@ -1396,7 +1396,7 @@ static int qcc_decode_qcs(struct qcc *qcc, struct qcs *qcs)
                 * restart decoding.
                 */
                if (!ret && rxbuf && !(qcs->flags & QC_SF_DEM_FULL) &&
-                   qcs->rx.offset + ncb_data(&rxbuf->ncb, 0) == rxbuf->off_end) {
+                   qcs->rx.offset + ncbmb_data(&rxbuf->ncb, 0) == rxbuf->off_end) {
                        if (!qcs_transfer_rx_data(qcs, rxbuf)) {
                                TRACE_DEVEL("restart parsing after data realignment", QMUX_EV_QCS_RECV, qcc->conn, qcs);
                                goto restart;
@@ -1419,7 +1419,7 @@ static int qcc_decode_qcs(struct qcc *qcc, struct qcs *qcs)
                if (ret)
                        qcs_consume(qcs, ret, rxbuf);
 
-               if (ncb_is_empty(&rxbuf->ncb)) {
+               if (ncbmb_is_empty(&rxbuf->ncb)) {
                        qcs_free_rxbuf(qcs, rxbuf);
 
                        /* Close QCS remotely if only one Rx buffer remains and
@@ -1787,7 +1787,7 @@ static struct qc_stream_rxbuf *qcs_get_rxbuf(struct qcs *qcs, uint64_t offset,
        struct qcc *qcc = qcs->qcc;
        struct eb64_node *node;
        struct qc_stream_rxbuf *buf;
-       struct ncbuf *ncbuf;
+       struct ncbmbuf *ncbuf;
 
        TRACE_ENTER(QMUX_EV_QCS_RECV, qcs->qcc->conn, qcs);
 
@@ -1805,7 +1805,7 @@ static struct qc_stream_rxbuf *qcs_get_rxbuf(struct qcs *qcs, uint64_t offset,
                        goto err;
                }
 
-               buf->ncb = NCBUF_NULL;
+               buf->ncb = NCBMBUF_NULL;
                buf->off_node.key = aligned_off;
                buf->off_end = aligned_off + qmux_stream_rx_bufsz();
                eb64_insert(&qcs->rx.bufs, &buf->off_node);
@@ -1813,7 +1813,7 @@ static struct qc_stream_rxbuf *qcs_get_rxbuf(struct qcs *qcs, uint64_t offset,
        }
 
        ncbuf = &buf->ncb;
-       if (!qcs_get_ncbuf(qcs, ncbuf) || ncb_is_null(ncbuf)) {
+       if (!qcs_get_ncbuf(qcs, ncbuf) || ncbmb_is_null(ncbuf)) {
                TRACE_ERROR("receive ncbuf alloc failure", QMUX_EV_QCC_RECV|QMUX_EV_QCS_RECV, qcc->conn, qcs);
                goto err;
        }
@@ -1938,8 +1938,6 @@ int qcc_recv(struct qcc *qcc, uint64_t id, uint64_t len, uint64_t offset,
        left = len;
        while (left) {
                struct qc_stream_rxbuf *buf;
-               struct proxy *px;
-               struct quic_counters *prx_counters;
                ncb_sz_t ncb_off;
 
                buf = qcs_get_rxbuf(qcs, offset, &len);
@@ -1952,34 +1950,13 @@ int qcc_recv(struct qcc *qcc, uint64_t id, uint64_t len, uint64_t offset,
                /* For oldest buffer, ncb_advance() may already have been performed. */
                ncb_off = offset - MAX(qcs->rx.offset, buf->off_node.key);
 
-               ncb_ret = ncb_add(&buf->ncb, ncb_off, data, len, NCB_ADD_COMPARE);
+               ncb_ret = ncbmb_add(&buf->ncb, ncb_off, data, len, NCB_ADD_OVERWRT);
                switch (ncb_ret) {
                case NCB_RET_OK:
                        break;
 
-               case NCB_RET_DATA_REJ:
-                       /* RFC 9000 2.2. Sending and Receiving Data
-                        *
-                        * An endpoint could receive data for a stream at the
-                        * same stream offset multiple times. Data that has
-                        * already been received can be discarded. The data at
-                        * a given offset MUST NOT change if it is sent
-                        * multiple times; an endpoint MAY treat receipt of
-                        * different data at the same offset within a stream as
-                        * a connection error of type PROTOCOL_VIOLATION.
-                        */
-                       TRACE_ERROR("overlapping data rejected", QMUX_EV_QCC_RECV|QMUX_EV_QCS_RECV|QMUX_EV_PROTO_ERR,
-                                   qcc->conn, qcs);
-                       qcc_set_error(qcc, QC_ERR_PROTOCOL_VIOLATION, 0);
-                       return 1;
-
-               case NCB_RET_GAP_SIZE:
-                       TRACE_DATA("cannot bufferize frame due to gap size limit", QMUX_EV_QCC_RECV|QMUX_EV_QCS_RECV,
-                                  qcc->conn, qcs);
-                       px = qcc->proxy;
-                       prx_counters = EXTRA_COUNTERS_GET(px->extra_counters_fe, &quic_stats_module);
-                       HA_ATOMIC_INC(&prx_counters->ncbuf_gap_limit);
-                       return 1;
+               default:
+                       ABORT_NOW();
                }
 
                offset += len;
index 222737ea694715750365866889073cc985871016..69c6aafea6186f6ff72fd553764b1eae3996e894 100644 (file)
@@ -218,8 +218,14 @@ int ncbmb_is_empty(const struct ncbmbuf *buf)
 
 int ncbmb_is_full(const struct ncbmbuf *buf)
 {
-       /* TODO */
-       return 0;
+       size_t i = 0;
+
+       for (i = 0; i < buf->size_bm; ++i) {
+               if (!buf->bitmap[i])
+                       return 0;
+       }
+
+       return 1;
 }
 
 int ncbmb_is_fragmented(const struct ncbmbuf *buf)