]> git.kaiwu.me - haproxy.git/commitdiff
MEDIUM: queues: Introduce a new field to know if queues are empty.
authorOlivier Houchard <ohouchard@haproxy.com>
Tue, 3 Feb 2026 02:44:58 +0000 (03:44 +0100)
committerOlivier Houchard <cognet@ci0.org>
Mon, 2 Mar 2026 12:46:17 +0000 (13:46 +0100)
Proxies and servers both have a queueslength entry, that indicates how
many entries there are in their request queues. Those can get a lot of
contention, and they are often accessed just to know if the queue is
empty or not, which it will always be when there is no maxconn.
So for both, introduce a new field, "queues_not_empty", in a different,
less contended cache line, and use it every time we just want to know if
the queue is empty or not.
Those are only changed when the queues goes from empty to non-empty, and
vice-versa, which hopefully should not be too often.
For proxies, they are protected by the proxy lock. For servers, they are
protected by a new lock contained in the server, the status_lock.
In both case, to prevent any race condition, once the relevant lock is
held, the value of queueslength should be checked again before deciding
to change or not queues_not_empty, to prevent any race condition.

include/haproxy/proxy-t.h
include/haproxy/queue.h
include/haproxy/server-t.h
src/backend.c
src/lb_chash.c
src/lb_fas.c
src/lb_fwlc.c
src/lb_fwrr.c
src/lb_map.c
src/queue.c
src/server.c

index 18b25ce81223c7f7054c81bb2ae7b9555d84443c..df580e4ad53a254dbb8b1764b0b04aabf2d3c210 100644 (file)
@@ -510,6 +510,7 @@ struct proxy {
        EXTRA_COUNTERS(extra_counters_fe);
        EXTRA_COUNTERS(extra_counters_be);
 
+       uint8_t queues_not_empty;               /* Are the request queues not empty ? Only changed when the queues go from non-empty to empty, and vice-versa. Protected by proxy lock */
        THREAD_ALIGN();
        /* these ones change all the time */
        int served;                             /* # of active sessions currently being served */
index 2509f2b40c83fee4cd85c5ecb6c084282bcc3bee..524750d8694b6ed5d0c123d2ed010fe702e9aba6 100644 (file)
@@ -86,7 +86,7 @@ static inline int server_has_room(const struct server *s) {
  * for and if/else usage.
  */
 static inline int may_dequeue_tasks(const struct server *s, const struct proxy *p) {
-       return (s && (s->queueslength || (p->queueslength && srv_currently_usable(s))) &&
+       return (s && (s->queues_not_empty || (p->queues_not_empty && srv_currently_usable(s))) &&
                (!s->maxconn || s->cur_sess < srv_dynamic_maxconn(s)));
 }
 
index c8f318c5ce541def2c3414c91890989c8687b29e..d820caca1579a6f9ae8ee7f22fc54893c5a09978 100644 (file)
@@ -553,6 +553,8 @@ struct server {
        struct sockaddr_storage socks4_addr;    /* the address of the SOCKS4 Proxy, including the port */
 
        EXTRA_COUNTERS(extra_counters);
+       __decl_thread(HA_SPINLOCK_T state_lock);/* protect the following state fields */
+       uint8_t queues_not_empty;               /* Are the request queues not empty ? Only changed when the queues go from non-empty to empty, and vice-versa. Protected by the state_lock lock when changed */
 };
 
 /* data provided to EVENT_HDL_SUB_SERVER handlers through event_hdl facility */
index 109fba3aa785db6939c35c36e0261f5b41a4ea23..259a25381d6bb0676029434840d24091d2265205 100644 (file)
@@ -597,7 +597,7 @@ struct server *get_server_rnd(struct stream *s, const struct server *avoid)
         * the backend's queue instead.
         */
        if (curr &&
-           (curr->queueslength || (curr->maxconn && curr->served >= srv_dynamic_maxconn(curr))))
+           (curr->queues_not_empty || (curr->maxconn && curr->served >= srv_dynamic_maxconn(curr))))
                curr = NULL;
 
        return curr;
@@ -1158,10 +1158,21 @@ int assign_server_and_queue(struct stream *s)
 
                                        _HA_ATOMIC_DEC(&p->queue->length);
 
-                                       if (p->queue->sv)
-                                               _HA_ATOMIC_DEC(&p->queue->sv->queueslength);
-                                       else
-                                               _HA_ATOMIC_DEC(&p->queue->px->queueslength);
+                                       if (p->queue->sv) {
+                                               if (_HA_ATOMIC_SUB_FETCH(&p->queue->sv->queueslength, 1) == 0) {
+                                                       HA_SPIN_LOCK(SERVER_LOCK, &p->queue->sv->state_lock);
+                                                       if (p->queue->sv->queueslength == 0)
+                                                               p->queue->sv->queues_not_empty = 0;
+                                                       HA_SPIN_UNLOCK(SERVER_LOCK, &p->queue->sv->state_lock);
+                                               }
+                                       } else {
+                                               if (_HA_ATOMIC_SUB_FETCH(&p->queue->px->queueslength, 1) == 0) {
+                                                       HA_SPIN_LOCK(PROXY_LOCK, &p->queue->px->lock);
+                                                       if (p->queue->px->queueslength == 0)
+                                                               p->queue->px->queues_not_empty = 0;
+                                                       HA_SPIN_UNLOCK(PROXY_LOCK, &p->queue->px->lock);
+                                               }
+                                       }
 
                                        _HA_ATOMIC_INC(&p->queue->idx);
                                        _HA_ATOMIC_DEC(&s->be->totpend);
index a852ae07dcf2c2fc7a655f788c88d63dd1b7e8c9..74a029f29bfa86c833499b73968a91438a340404 100644 (file)
@@ -532,7 +532,7 @@ struct server *chash_get_next_server(struct proxy *p, struct server *srvtoavoid)
                 * case we simply remember it for later use if needed.
                 */
                s = eb32_entry(node, struct tree_occ, node)->server;
-               if (!s->maxconn || (!s->queueslength && s->served < srv_dynamic_maxconn(s))) {
+               if (!s->maxconn || (!s->queueslength && server_has_room(s))) {
                        if (s != srvtoavoid) {
                                srv = s;
                                break;
index bac20d28bb764b9f3909b107d6360d0e01b8bb2a..defbb8d7040faad110c3d72013930fa2fd6d2d49 100644 (file)
@@ -322,7 +322,7 @@ struct server *fas_get_next_server(struct proxy *p, struct server *srvtoavoid)
                struct server *s;
 
                s = eb32_entry(node, struct server, lb_node);
-               if (!s->maxconn || (!s->queueslength && s->served < srv_dynamic_maxconn(s))) {
+               if (!s->maxconn || (!s->queueslength && server_has_room(s))) {
                        if (s != srvtoavoid) {
                                srv = s;
                                break;
index 20a679e3af5a681a7d908b0ae3a5762468919bea..f1bf52ad83a9a205c467762f4732a3b19b31c844 100644 (file)
@@ -797,7 +797,7 @@ redo:
                                eweight = _HA_ATOMIC_LOAD(&s->cur_eweight);
 
                                planned_inflight = tree_elt->lb_node.key * eweight / SRV_EWGHT_MAX;
-                               if (!s->maxconn || s->served + s->queueslength < srv_dynamic_maxconn(s) + s->maxqueue) {
+                               if (server_has_room(s)) {
                                        if (_HA_ATOMIC_LOAD(&s->served) + _HA_ATOMIC_LOAD(&s->queueslength) > planned_inflight + 2) {
                                                /*
                                                 * The server has more requests than expected,
index 26fca7c4aaae1814cf60ee3f0dce880f8e357aac..27758b80ad96c7b1780314595e4fb3fcf3526c09 100644 (file)
@@ -620,7 +620,7 @@ struct server *fwrr_get_next_server(struct proxy *p, struct server *srvtoavoid)
                fwrr_update_position(grp, srv, next_weight);
                fwrr_dequeue_srv(srv, tgid);
                grp->curr_pos++;
-               if (!srv->maxconn || (!srv->queueslength && srv->served < srv_dynamic_maxconn(srv))) {
+               if (!srv->maxconn || (!srv->queueslength && server_has_room(srv))) {
                        /* make sure it is not the server we are trying to exclude... */
                        if (srv != srvtoavoid || avoided)
                                break;
index 7dc27b444778e1d210acdb63c37bd810045822d7..7a57e93bc5220e39800056fa8ec6a6673f0c26fe 100644 (file)
@@ -230,7 +230,7 @@ struct server *map_get_server_rr(struct proxy *px, struct server *srvtoavoid)
        avoididx = 0; /* shut a gcc warning */
        do {
                srv = px->lbprm.map.srv[newidx++];
-               if (!srv->maxconn || (!srv->queueslength && srv->served < srv_dynamic_maxconn(srv))) {
+               if (!srv->maxconn || (!srv->queueslength && server_has_room(srv))) {
                        /* make sure it is not the server we are try to exclude... */
                        /* ...but remember that is was selected yet avoided */
                        avoided = srv;
index 1fa40ed34c5b0b9e0f44b31ed7221a5641f60579..8bad5e72bf2af9f00c7af856269ce799ba97e065 100644 (file)
@@ -197,11 +197,20 @@ void pendconn_unlink(struct pendconn *p)
                oldidx -= p->queue_idx;
                if (sv) {
                        p->strm->logs.srv_queue_pos += oldidx;
-                       _HA_ATOMIC_DEC(&sv->queueslength);
-               }
-               else {
+                       if (_HA_ATOMIC_FETCH_SUB(&sv->queueslength, 1) == 0) {
+                               HA_SPIN_LOCK(SERVER_LOCK, &sv->state_lock);
+                               if (sv->queueslength == 0)
+                                       sv->queues_not_empty = 0;
+                               HA_SPIN_UNLOCK(SERVER_LOCK, &sv->state_lock);
+                       }
+               } else {
                        p->strm->logs.prx_queue_pos += oldidx;
-                       _HA_ATOMIC_DEC(&px->queueslength);
+                       if (_HA_ATOMIC_FETCH_SUB(&px->queueslength, 1) == 0) {
+                               HA_SPIN_LOCK(PROXY_LOCK, &px->lock);
+                               if (px->queueslength == 0)
+                                       px->queues_not_empty = 0;
+                               HA_SPIN_UNLOCK(PROXY_LOCK, &px->lock);
+                       }
                }
 
                _HA_ATOMIC_DEC(&q->length);
@@ -350,7 +359,12 @@ static int pendconn_process_next_strm(struct server *srv, struct proxy *px, int
 
        _HA_ATOMIC_DEC(&px->per_tgrp[tgrp - 1].queue.length);
        _HA_ATOMIC_INC(&px->per_tgrp[tgrp - 1].queue.idx);
-       _HA_ATOMIC_DEC(&px->queueslength);
+       if (_HA_ATOMIC_SUB_FETCH(&px->queueslength, 1) == 0) {
+               HA_SPIN_LOCK(PROXY_LOCK, &px->lock);
+               if (px->queueslength == 0)
+                       px->queues_not_empty = 0;
+               HA_SPIN_UNLOCK(PROXY_LOCK, &px->lock);
+       }
        return 1;
 
  use_p:
@@ -372,7 +386,12 @@ static int pendconn_process_next_strm(struct server *srv, struct proxy *px, int
 
        _HA_ATOMIC_DEC(&srv->per_tgrp[tgrp - 1].queue.length);
        _HA_ATOMIC_INC(&srv->per_tgrp[tgrp - 1].queue.idx);
-       _HA_ATOMIC_DEC(&srv->queueslength);
+       if (_HA_ATOMIC_SUB_FETCH(&srv->queueslength, 1) == 0) {
+               HA_SPIN_LOCK(SERVER_LOCK, &srv->state_lock);
+               if (srv->queueslength == 0)
+                       srv->queues_not_empty = 0;
+               HA_SPIN_UNLOCK(SERVER_LOCK, &srv->state_lock);
+       }
        return 1;
 }
 
@@ -603,6 +622,17 @@ struct pendconn *pendconn_add(struct stream *strm)
        p->queue = q;
        p->queue_idx  = _HA_ATOMIC_LOAD(&q->idx) - 1; // for logging only
        new_max = _HA_ATOMIC_ADD_FETCH(queueslength, 1);
+       if (new_max == 1) {
+               if (srv) {
+                       HA_SPIN_LOCK(SERVER_LOCK, &srv->state_lock);
+                       srv->queues_not_empty = 1;
+                       HA_SPIN_UNLOCK(SERVER_LOCK, &srv->state_lock);
+               } else {
+                       HA_SPIN_LOCK(PROXY_LOCK, &px->lock);
+                       px->queues_not_empty = 1;
+                       HA_SPIN_UNLOCK(PROXY_LOCK, &px->lock);
+               }
+       }
        _HA_ATOMIC_INC(&q->length);
        old_max = _HA_ATOMIC_LOAD(max_ptr);
        while (new_max > old_max) {
index 4718257a12b38e96547b15cf157aec7957e78e83..819ab3dd09bb85fddf1fe1e740b46dc5b8fb7b17 100644 (file)
@@ -3259,6 +3259,8 @@ struct server *srv_drop(struct server *srv)
 
        HA_SPIN_DESTROY(&srv->lock);
 
+       HA_SPIN_DESTROY(&srv->state_lock);
+
        MT_LIST_DELETE(&srv->global_list);
        event_hdl_sub_list_destroy(&srv->e_subs);
 
@@ -3816,6 +3818,7 @@ static int _srv_parse_init(struct server **srv, char **args, int *cur_arg,
                } else
                        srv_settings_init(newsrv);
                HA_SPIN_INIT(&newsrv->lock);
+               HA_SPIN_INIT(&newsrv->state_lock);
        }
        else {
                /* This is a "default-server" line. Let's make certain the