]> git.kaiwu.me - haproxy.git/commitdiff
TMP
authorAmaury Denoyelle <adenoyelle@haproxy.com>
Wed, 16 Oct 2024 16:17:14 +0000 (18:17 +0200)
committerAmaury Denoyelle <adenoyelle@haproxy.com>
Mon, 21 Oct 2024 09:08:27 +0000 (11:08 +0200)
include/haproxy/quic_conn.h
include/haproxy/quic_tx-t.h
src/mux_quic.c
src/quic_tx.c

index 212e187ba667c88d54e29a5c04a1a2c724796de3..36bff3be8bcb752383dc6c1eeae38efbcd0a8d13 100644 (file)
@@ -163,7 +163,7 @@ void quic_set_connection_close(struct quic_conn *qc, const struct quic_err err);
 void quic_set_tls_alert(struct quic_conn *qc, int alert);
 int quic_set_app_ops(struct quic_conn *qc, const unsigned char *alpn, size_t alpn_len);
 int qc_check_dcid(struct quic_conn *qc, unsigned char *dcid, size_t dcid_len);
-int qc_send_mux(struct quic_conn *qc, struct list *frms);
+enum quic_tx_err qc_send_mux(struct quic_conn *qc, struct list *frms, int max_pkts);
 
 void qc_notify_err(struct quic_conn *qc);
 int qc_notify_send(struct quic_conn *qc);
index efbdfe68703ba1ffc7fb644dc0365357f98dee24..5f7e2342260672de3d08a23e548d38c1fc6058f4 100644 (file)
@@ -64,4 +64,10 @@ enum qc_build_pkt_err {
        QC_BUILD_PKT_ERR_BUFROOM,  /* no more room in input buf or congestion window */
 };
 
+enum quic_tx_err {
+       QUIC_TX_ERR_NONE,
+       QUIC_TX_ERR_AGAIN,
+       QUIC_TX_ERR_FATAL,
+};
+
 #endif /* _HAPROXY_TX_T_H */
index 895f227502e50d824bc3bdecb83a05c02f9e0d4c..4119b180774e0445961fa12d1ba4bc3939623f1d 100644 (file)
@@ -22,6 +22,7 @@
 #include <haproxy/quic_sock.h>
 #include <haproxy/quic_stream.h>
 #include <haproxy/quic_tp-t.h>
+#include <haproxy/quic_tx-t.h>
 #include <haproxy/session.h>
 #include <haproxy/ssl_sock-t.h>
 #include <haproxy/stconn.h>
@@ -388,6 +389,13 @@ static void qcc_refresh_timeout(struct qcc *qcc)
 
 static void qcc_wakeup(struct qcc *qcc)
 {
+       HA_ATOMIC_AND(&qcc->wait_event.tasklet->state, ~TASK_F_USR1);
+       tasklet_wakeup(qcc->wait_event.tasklet);
+}
+
+static void qcc_wakeup_pacing(struct qcc *qcc)
+{
+       HA_ATOMIC_OR(&qcc->wait_event.tasklet->state, TASK_F_USR1);
        tasklet_wakeup(qcc->wait_event.tasklet);
 }
 
@@ -2076,36 +2084,42 @@ static int qcc_subscribe_send(struct qcc *qcc)
  *
  * Returns 0 if all data sent with success else non-zero.
  */
-static int qcc_send_frames(struct qcc *qcc, struct list *frms)
+static int qcc_send_frames(struct qcc *qcc, struct list *frms, int strm_content)
 {
+       enum quic_tx_err ret;
+       int max_burst = strm_content ? global.tune.quic_frontend_max_tx_burst : 0;
+
        TRACE_ENTER(QMUX_EV_QCC_SEND, qcc->conn);
 
        if (LIST_ISEMPTY(frms)) {
                TRACE_DEVEL("leaving on no frame to send", QMUX_EV_QCC_SEND, qcc->conn);
-               return 1;
+               return -1;
        }
 
-       if (!qc_send_mux(qcc->conn->handle.qc, frms)) {
+       ret = qc_send_mux(qcc->conn->handle.qc, frms, max_burst);
+       if (ret == QUIC_TX_ERR_FATAL) {
                TRACE_DEVEL("error on sending", QMUX_EV_QCC_SEND, qcc->conn);
                qcc_subscribe_send(qcc);
                goto err;
        }
 
+       BUG_ON(ret == QUIC_TX_ERR_AGAIN && !max_burst);
+
        /* If there is frames left at this stage, transport layer is blocked.
         * Subscribe on it to retry later.
         */
-       if (!LIST_ISEMPTY(frms)) {
+       if (!LIST_ISEMPTY(frms) && ret != QUIC_TX_ERR_AGAIN) {
                TRACE_DEVEL("remaining frames to send", QMUX_EV_QCC_SEND, qcc->conn);
                qcc_subscribe_send(qcc);
                goto err;
        }
 
        TRACE_LEAVE(QMUX_EV_QCC_SEND, qcc->conn);
-       return 0;
+       return ret == QUIC_TX_ERR_AGAIN ? 1 : 0;
 
  err:
        TRACE_DEVEL("leaving on error", QMUX_EV_QCC_SEND, qcc->conn);
-       return 1;
+       return -1;
 }
 
 /* Emit a RESET_STREAM on <qcs>.
@@ -2130,7 +2144,7 @@ static int qcs_send_reset(struct qcs *qcs)
        frm->reset_stream.final_size = qcs->tx.fc.off_real;
 
        LIST_APPEND(&frms, &frm->list);
-       if (qcc_send_frames(qcs->qcc, &frms)) {
+       if (qcc_send_frames(qcs->qcc, &frms, 0)) {
                if (!LIST_ISEMPTY(&frms))
                        qc_frm_free(qcs->qcc->conn->handle.qc, &frm);
                TRACE_DEVEL("cannot send RESET_STREAM", QMUX_EV_QCS_SEND, qcs->qcc->conn, qcs);
@@ -2181,7 +2195,7 @@ static int qcs_send_stop_sending(struct qcs *qcs)
        frm->stop_sending.app_error_code = qcs->err;
 
        LIST_APPEND(&frms, &frm->list);
-       if (qcc_send_frames(qcs->qcc, &frms)) {
+       if (qcc_send_frames(qcs->qcc, &frms, 0)) {
                if (!LIST_ISEMPTY(&frms))
                        qc_frm_free(qcc->conn->handle.qc, &frm);
                TRACE_DEVEL("cannot send STOP_SENDING", QMUX_EV_QCS_SEND, qcs->qcc->conn, qcs);
@@ -2286,7 +2300,7 @@ static int qcc_io_send(struct qcc *qcc)
        }
 
        if (!LIST_ISEMPTY(&qcc->lfctl.frms)) {
-               if (qcc_send_frames(qcc, &qcc->lfctl.frms)) {
+               if (qcc_send_frames(qcc, &qcc->lfctl.frms, 0)) {
                        TRACE_DEVEL("flow-control frames rejected by transport, aborting send", QMUX_EV_QCC_SEND, qcc->conn);
                        goto out;
                }
@@ -2365,7 +2379,7 @@ static int qcc_io_send(struct qcc *qcc)
        /* Retry sending until no frame to send, data rejected or connection
         * flow-control limit reached.
         */
-       while (qcc_send_frames(qcc, &qcc->tx.frms) == 0 && !qfctl_rblocked(&qcc->tx.fc)) {
+       while ((ret = qcc_send_frames(qcc, &qcc->tx.frms, 1)) == 0 && !qfctl_rblocked(&qcc->tx.fc)) {
                window_conn = qfctl_rcap(&qcc->tx.fc);
                resent = 0;
 
@@ -2397,7 +2411,10 @@ static int qcc_io_send(struct qcc *qcc)
 
  sent_done:
        /* Deallocate frames that the transport layer has rejected. */
-       if (!LIST_ISEMPTY(&qcc->tx.frms)) {
+       if (ret == 1) {
+               qcc_wakeup_pacing(qcc);
+       }
+       else if (!LIST_ISEMPTY(&qcc->tx.frms)) {
                struct quic_frame *frm, *frm2;
 
                list_for_each_entry_safe(frm, frm2, &qcc->tx.frms, list)
@@ -2751,12 +2768,38 @@ static void qcc_release(struct qcc *qcc)
        TRACE_LEAVE(QMUX_EV_QCC_END);
 }
 
+static int qcc_purge_sending(struct qcc *qcc)
+{
+       int ret;
+
+       //fprintf(stderr, "%s\n", __func__);
+       ret = qcc_send_frames(qcc, &qcc->tx.frms, 1);
+       if (ret > 0) {
+               qcc_wakeup_pacing(qcc);
+               return 1;
+       }
+
+       return 0;
+}
+
 struct task *qcc_io_cb(struct task *t, void *ctx, unsigned int status)
 {
        struct qcc *qcc = ctx;
 
        TRACE_ENTER(QMUX_EV_QCC_WAKE, qcc->conn);
 
+       if (status & TASK_F_USR1) {
+               qcc_purge_sending(qcc);
+               return NULL;
+       }
+       else {
+               while (!LIST_ISEMPTY(&qcc->tx.frms)) {
+                       struct quic_frame *frm = LIST_ELEM(qcc->tx.frms.n, struct quic_frame *, list);
+                       qc_frm_free(qcc->conn->handle.qc, &frm);
+               }
+               LIST_INIT(&qcc->tx.frms);
+       }
+
        if (!(qcc->wait_event.events & SUB_RETRY_SEND))
                qcc_io_send(qcc);
 
index 2f2201f8ccca524deaacf3b094e5b92b2ef941ab..c6cf77e4ea1015890db13a29c9af75ec55a0dc0a 100644 (file)
@@ -468,10 +468,12 @@ int qc_purge_txbuf(struct quic_conn *qc, struct buffer *buf)
  *
  * Returns the result from qc_send() function.
  */
-int qc_send_mux(struct quic_conn *qc, struct list *frms)
+enum quic_tx_err qc_send_mux(struct quic_conn *qc, struct list *frms,
+                             int max_dgram)
 {
        struct list send_list = LIST_HEAD_INIT(send_list);
-       int ret;
+       enum quic_tx_err ret = QUIC_TX_ERR_NONE;
+       int max = max_dgram;
 
        TRACE_ENTER(QUIC_EV_CONN_TXPKT, qc);
        BUG_ON(qc->mux_state != QC_MUX_READY); /* Only MUX can uses this function so it must be ready. */
@@ -479,7 +481,7 @@ int qc_send_mux(struct quic_conn *qc, struct list *frms)
        if (qc->conn->flags & CO_FL_SOCK_WR_SH) {
                qc->conn->flags |= CO_FL_ERROR | CO_FL_SOCK_RD_SH;
                TRACE_DEVEL("connection on error", QUIC_EV_CONN_TXPKT, qc);
-               return 0;
+               return QUIC_TX_ERR_FATAL;
        }
 
        /* Try to send post handshake frames first unless on 0-RTT. */
@@ -492,7 +494,15 @@ int qc_send_mux(struct quic_conn *qc, struct list *frms)
 
        TRACE_STATE("preparing data (from MUX)", QUIC_EV_CONN_TXPKT, qc);
        qel_register_send(&send_list, qc->ael, frms);
-       ret = qc_send(qc, 0, &send_list, NULL);
+       if (!qc_send(qc, 0, &send_list, max_dgram ? &max : NULL)) {
+               ret = QUIC_TX_ERR_FATAL;
+               ABORT_NOW();
+       }
+
+       if (max_dgram && !max) {
+               ret = QUIC_TX_ERR_AGAIN;
+               //ABORT_NOW();
+       }
 
        TRACE_LEAVE(QUIC_EV_CONN_TXPKT, qc);
        return ret;