#include <haproxy/quic_sock.h>
#include <haproxy/quic_stream.h>
#include <haproxy/quic_tp-t.h>
+#include <haproxy/quic_tx-t.h>
#include <haproxy/session.h>
#include <haproxy/ssl_sock-t.h>
#include <haproxy/stconn.h>
static void qcc_wakeup(struct qcc *qcc)
{
+ HA_ATOMIC_AND(&qcc->wait_event.tasklet->state, ~TASK_F_USR1);
+ tasklet_wakeup(qcc->wait_event.tasklet);
+}
+
+static void qcc_wakeup_pacing(struct qcc *qcc)
+{
+ HA_ATOMIC_OR(&qcc->wait_event.tasklet->state, TASK_F_USR1);
tasklet_wakeup(qcc->wait_event.tasklet);
}
*
* Returns 0 if all data sent with success else non-zero.
*/
-static int qcc_send_frames(struct qcc *qcc, struct list *frms)
+static int qcc_send_frames(struct qcc *qcc, struct list *frms, int strm_content)
{
+ enum quic_tx_err ret;
+ int max_burst = strm_content ? global.tune.quic_frontend_max_tx_burst : 0;
+
TRACE_ENTER(QMUX_EV_QCC_SEND, qcc->conn);
if (LIST_ISEMPTY(frms)) {
TRACE_DEVEL("leaving on no frame to send", QMUX_EV_QCC_SEND, qcc->conn);
- return 1;
+ return -1;
}
- if (!qc_send_mux(qcc->conn->handle.qc, frms)) {
+ ret = qc_send_mux(qcc->conn->handle.qc, frms, max_burst);
+ if (ret == QUIC_TX_ERR_FATAL) {
TRACE_DEVEL("error on sending", QMUX_EV_QCC_SEND, qcc->conn);
qcc_subscribe_send(qcc);
goto err;
}
+ BUG_ON(ret == QUIC_TX_ERR_AGAIN && !max_burst);
+
/* If there is frames left at this stage, transport layer is blocked.
* Subscribe on it to retry later.
*/
- if (!LIST_ISEMPTY(frms)) {
+ if (!LIST_ISEMPTY(frms) && ret != QUIC_TX_ERR_AGAIN) {
TRACE_DEVEL("remaining frames to send", QMUX_EV_QCC_SEND, qcc->conn);
qcc_subscribe_send(qcc);
goto err;
}
TRACE_LEAVE(QMUX_EV_QCC_SEND, qcc->conn);
- return 0;
+ return ret == QUIC_TX_ERR_AGAIN ? 1 : 0;
err:
TRACE_DEVEL("leaving on error", QMUX_EV_QCC_SEND, qcc->conn);
- return 1;
+ return -1;
}
/* Emit a RESET_STREAM on <qcs>.
frm->reset_stream.final_size = qcs->tx.fc.off_real;
LIST_APPEND(&frms, &frm->list);
- if (qcc_send_frames(qcs->qcc, &frms)) {
+ if (qcc_send_frames(qcs->qcc, &frms, 0)) {
if (!LIST_ISEMPTY(&frms))
qc_frm_free(qcs->qcc->conn->handle.qc, &frm);
TRACE_DEVEL("cannot send RESET_STREAM", QMUX_EV_QCS_SEND, qcs->qcc->conn, qcs);
frm->stop_sending.app_error_code = qcs->err;
LIST_APPEND(&frms, &frm->list);
- if (qcc_send_frames(qcs->qcc, &frms)) {
+ if (qcc_send_frames(qcs->qcc, &frms, 0)) {
if (!LIST_ISEMPTY(&frms))
qc_frm_free(qcc->conn->handle.qc, &frm);
TRACE_DEVEL("cannot send STOP_SENDING", QMUX_EV_QCS_SEND, qcs->qcc->conn, qcs);
}
if (!LIST_ISEMPTY(&qcc->lfctl.frms)) {
- if (qcc_send_frames(qcc, &qcc->lfctl.frms)) {
+ if (qcc_send_frames(qcc, &qcc->lfctl.frms, 0)) {
TRACE_DEVEL("flow-control frames rejected by transport, aborting send", QMUX_EV_QCC_SEND, qcc->conn);
goto out;
}
/* Retry sending until no frame to send, data rejected or connection
* flow-control limit reached.
*/
- while (qcc_send_frames(qcc, &qcc->tx.frms) == 0 && !qfctl_rblocked(&qcc->tx.fc)) {
+ while ((ret = qcc_send_frames(qcc, &qcc->tx.frms, 1)) == 0 && !qfctl_rblocked(&qcc->tx.fc)) {
window_conn = qfctl_rcap(&qcc->tx.fc);
resent = 0;
sent_done:
/* Deallocate frames that the transport layer has rejected. */
- if (!LIST_ISEMPTY(&qcc->tx.frms)) {
+ if (ret == 1) {
+ qcc_wakeup_pacing(qcc);
+ }
+ else if (!LIST_ISEMPTY(&qcc->tx.frms)) {
struct quic_frame *frm, *frm2;
list_for_each_entry_safe(frm, frm2, &qcc->tx.frms, list)
TRACE_LEAVE(QMUX_EV_QCC_END);
}
+static int qcc_purge_sending(struct qcc *qcc)
+{
+ int ret;
+
+ //fprintf(stderr, "%s\n", __func__);
+ ret = qcc_send_frames(qcc, &qcc->tx.frms, 1);
+ if (ret > 0) {
+ qcc_wakeup_pacing(qcc);
+ return 1;
+ }
+
+ return 0;
+}
+
struct task *qcc_io_cb(struct task *t, void *ctx, unsigned int status)
{
struct qcc *qcc = ctx;
TRACE_ENTER(QMUX_EV_QCC_WAKE, qcc->conn);
+ if (status & TASK_F_USR1) {
+ qcc_purge_sending(qcc);
+ return NULL;
+ }
+ else {
+ while (!LIST_ISEMPTY(&qcc->tx.frms)) {
+ struct quic_frame *frm = LIST_ELEM(qcc->tx.frms.n, struct quic_frame *, list);
+ qc_frm_free(qcc->conn->handle.qc, &frm);
+ }
+ LIST_INIT(&qcc->tx.frms);
+ }
+
if (!(qcc->wait_event.events & SUB_RETRY_SEND))
qcc_io_send(qcc);
*
* Returns the result from qc_send() function.
*/
-int qc_send_mux(struct quic_conn *qc, struct list *frms)
+enum quic_tx_err qc_send_mux(struct quic_conn *qc, struct list *frms,
+ int max_dgram)
{
struct list send_list = LIST_HEAD_INIT(send_list);
- int ret;
+ enum quic_tx_err ret = QUIC_TX_ERR_NONE;
+ int max = max_dgram;
TRACE_ENTER(QUIC_EV_CONN_TXPKT, qc);
BUG_ON(qc->mux_state != QC_MUX_READY); /* Only MUX can uses this function so it must be ready. */
if (qc->conn->flags & CO_FL_SOCK_WR_SH) {
qc->conn->flags |= CO_FL_ERROR | CO_FL_SOCK_RD_SH;
TRACE_DEVEL("connection on error", QUIC_EV_CONN_TXPKT, qc);
- return 0;
+ return QUIC_TX_ERR_FATAL;
}
/* Try to send post handshake frames first unless on 0-RTT. */
TRACE_STATE("preparing data (from MUX)", QUIC_EV_CONN_TXPKT, qc);
qel_register_send(&send_list, qc->ael, frms);
- ret = qc_send(qc, 0, &send_list, NULL);
+ if (!qc_send(qc, 0, &send_list, max_dgram ? &max : NULL)) {
+ ret = QUIC_TX_ERR_FATAL;
+ ABORT_NOW();
+ }
+
+ if (max_dgram && !max) {
+ ret = QUIC_TX_ERR_AGAIN;
+ //ABORT_NOW();
+ }
TRACE_LEAVE(QUIC_EV_CONN_TXPKT, qc);
return ret;