]> git.kaiwu.me - haproxy.git/commitdiff
MEDIUM: mux-quic: implement QMux receive
authorAmaury Denoyelle <adenoyelle@haproxy.com>
Fri, 27 Mar 2026 13:39:34 +0000 (14:39 +0100)
committerAmaury Denoyelle <adenoyelle@haproxy.com>
Thu, 2 Apr 2026 12:02:04 +0000 (14:02 +0200)
This patch implements a new function qcc_qstrm_recv() dedicated to the
new QMux protocol. It is responsible to perform data reception via
rcv_buf() callback. This is defined in a new mux_quic_strm module.

Read data are parsed in frames. Each frame is handled via standard
mux-quic functions. Currently, only STREAM and RESET_STREAM types are
implemented.

One major difference between QUIC and QMux is that mux-quic is passive
on the reception side on the former protocol. For the new one, mux-quic
becomes active. Thus, a new call to qcc_qstrm_recv() is performed via
qcc_io_recv().

Makefile
include/haproxy/mux_quic_qstrm.h [new file with mode: 0644]
src/mux_quic.c
src/mux_quic_qstrm.c [new file with mode: 0644]

index 064fed3af4d0187882840805c91729203a6fb5e3..44d014784351d4d8671bf1d282092c6344e6af83 100644 (file)
--- a/Makefile
+++ b/Makefile
@@ -670,7 +670,7 @@ OPTIONS_OBJS += src/mux_quic.o src/h3.o src/quic_rx.o src/quic_tx.o \
                 src/quic_cc_nocc.o src/quic_cc.o src/quic_pacing.o     \
                 src/h3_stats.o src/quic_stats.o src/qpack-enc.o                \
                 src/qpack-tbl.o src/quic_cc_drs.o src/quic_fctl.o      \
-                src/quic_enc.o
+                src/quic_enc.o src/mux_quic_qstrm.o
 endif
 
 ifneq ($(USE_QUIC_OPENSSL_COMPAT:0=),)
diff --git a/include/haproxy/mux_quic_qstrm.h b/include/haproxy/mux_quic_qstrm.h
new file mode 100644 (file)
index 0000000..3e537d4
--- /dev/null
@@ -0,0 +1,8 @@
+#ifndef _HAPROXY_MUX_QUIC_QSTRM_H
+#define _HAPROXY_MUX_QUIC_QSTRM_H
+
+#include <haproxy/mux_quic.h>
+
+int qcc_qstrm_recv(struct qcc *qcc);
+
+#endif /* _HAPROXY_MUX_QUIC_QSTRM_H */
index 2c36d8d9d68cc5bba531b85c29b7d7d70599e4af..f696941ff6e940e302b0cdcc7e190ea6c2b25564 100644 (file)
@@ -10,6 +10,7 @@
 #include <haproxy/global-t.h>
 #include <haproxy/h3.h>
 #include <haproxy/list.h>
+#include <haproxy/mux_quic_qstrm.h>
 #include <haproxy/ncbuf.h>
 #include <haproxy/pool.h>
 #include <haproxy/proxy.h>
@@ -3182,6 +3183,11 @@ static int qcc_io_recv(struct qcc *qcc)
        if ((qcc->flags & QC_CF_WAIT_HS) && !(qcc->wait_event.events & SUB_RETRY_RECV))
                qcc_wait_for_hs(qcc);
 
+       if (!conn_is_quic(qcc->conn)) {
+               if (!(qcc->wait_event.events & SUB_RETRY_RECV))
+                       qcc_qstrm_recv(qcc);
+       }
+
        while (!LIST_ISEMPTY(&qcc->recv_list)) {
                qcs = LIST_ELEM(qcc->recv_list.n, struct qcs *, el_recv);
                /* No need to add an uni local stream in recv_list. */
diff --git a/src/mux_quic_qstrm.c b/src/mux_quic_qstrm.c
new file mode 100644 (file)
index 0000000..17ab977
--- /dev/null
@@ -0,0 +1,112 @@
+#include <haproxy/mux_quic_qstrm.h>
+
+#include <haproxy/api.h>
+#include <haproxy/buf.h>
+#include <haproxy/chunk.h>
+#include <haproxy/connection.h>
+#include <haproxy/mux_quic.h>
+#include <haproxy/qmux_trace.h>
+#include <haproxy/quic_frame.h>
+#include <haproxy/trace.h>
+
+/* Returns true if <frm> type can be used for QMux protocol. */
+static int qstrm_is_frm_valid(const struct quic_frame *frm)
+{
+       return frm->type == QUIC_FT_PADDING ||
+         frm->type == QUIC_FT_RESET_STREAM ||
+         frm->type == QUIC_FT_STOP_SENDING ||
+         (frm->type >= QUIC_FT_STREAM_8 && frm->type <= QUIC_FT_STREAM_F) ||
+         frm->type == QUIC_FT_MAX_DATA ||
+         frm->type == QUIC_FT_MAX_STREAM_DATA ||
+         frm->type == QUIC_FT_MAX_STREAMS_BIDI ||
+         frm->type == QUIC_FT_MAX_STREAMS_UNI ||
+         frm->type == QUIC_FT_DATA_BLOCKED ||
+         frm->type == QUIC_FT_STREAM_DATA_BLOCKED ||
+         frm->type == QUIC_FT_STREAMS_BLOCKED_BIDI ||
+         frm->type == QUIC_FT_STREAMS_BLOCKED_UNI ||
+         frm->type == QUIC_FT_CONNECTION_CLOSE ||
+         frm->type == QUIC_FT_CONNECTION_CLOSE_APP;
+}
+
+/* Parse the next frame in <buf> and handle it by the MUX layer.
+ *
+ * Returns the frame length on success. If frame is truncated, 0 is returned.
+ * A negative error code is used for fatal failures.
+ */
+static int qstrm_parse_frm(struct qcc *qcc, struct buffer *buf)
+{
+       struct quic_frame frm;
+       const unsigned char *pos, *old, *end;
+       int ret;
+
+       old = pos = (unsigned char *)b_head(buf);
+       end = (unsigned char *)b_head(buf) + b_data(buf);
+       ret = qc_parse_frm_type(&frm, &pos, end, NULL);
+       BUG_ON(!ret);
+
+       if (!qstrm_is_frm_valid(&frm)) {
+               /* TODO close connection with FRAME_ENCODING_ERROR */
+               b_reset(buf);
+               return -1;
+       }
+
+       ret = qc_parse_frm_payload(&frm, &pos, end, NULL);
+       BUG_ON(!ret);
+
+       if (frm.type >= QUIC_FT_STREAM_8 &&
+           frm.type <= QUIC_FT_STREAM_F) {
+               struct qf_stream *strm_frm = &frm.stream;
+
+               qcc_recv(qcc, strm_frm->id, strm_frm->len, strm_frm->offset,
+                        (frm.type & QUIC_STREAM_FRAME_TYPE_FIN_BIT), (char *)strm_frm->data);
+       }
+       else if (frm.type == QUIC_FT_RESET_STREAM) {
+               struct qf_reset_stream *rst_frm = &frm.reset_stream;
+               qcc_recv_reset_stream(qcc, rst_frm->id, rst_frm->app_error_code, rst_frm->final_size);
+       }
+       else {
+               ABORT_NOW();
+       }
+
+       return pos - old;
+}
+
+/* Perform data reception for <qcc> connection. Content is parsed as QMux
+ * frames. These operations are performed in loop until read returns no data.
+ *
+ * Returns the total amount of read data or -1 on error.
+ */
+int qcc_qstrm_recv(struct qcc *qcc)
+{
+       /* TODO add a buffer on the connection for incomplete data read */
+       struct connection *conn = qcc->conn;
+       int total = 0, frm_ret;
+       size_t ret;
+
+       TRACE_ENTER(QMUX_EV_QCC_RECV, qcc->conn);
+
+       do {
+               b_reset(&trash);
+               ret = conn->xprt->rcv_buf(conn, conn->xprt_ctx, &trash, trash.size, NULL, 0, 0);
+               BUG_ON(conn->flags & CO_FL_ERROR);
+
+               total += ret;
+               while (b_data(&trash)) {
+                       frm_ret = qstrm_parse_frm(qcc, &trash);
+                       BUG_ON(!frm_ret);
+
+                       b_del(&trash, frm_ret);
+               }
+       } while (ret > 0);
+
+       if (!conn_xprt_read0_pending(qcc->conn)) {
+               conn->xprt->subscribe(conn, conn->xprt_ctx, SUB_RETRY_RECV,
+                                     &qcc->wait_event);
+       }
+
+       TRACE_LEAVE(QMUX_EV_QCC_RECV, qcc->conn);
+       return total;
+
+ err:
+       return -1;
+}