EXTRA_COUNTERS(extra_counters_fe);
EXTRA_COUNTERS(extra_counters_be);
+ uint8_t queues_not_empty; /* Are the request queues non-empty? Only changed when the queues go from empty to non-empty and vice versa. Protected by the proxy lock. */
THREAD_ALIGN();
/* these ones change all the time */
int served; /* # of active sessions currently being served */
* for and if/else usage.
*/
static inline int may_dequeue_tasks(const struct server *s, const struct proxy *p) {
- return (s && (s->queueslength || (p->queueslength && srv_currently_usable(s))) &&
+ return (s && (s->queues_not_empty || (p->queues_not_empty && srv_currently_usable(s))) &&
(!s->maxconn || s->cur_sess < srv_dynamic_maxconn(s)));
}
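
The rewritten test reads a one-byte flag that changes only on empty/non-empty transitions, instead of the queue counters that every enqueue and dequeue dirties, so the hot path no longer bounces those cache lines between threads. A minimal standalone sketch of the reader side (names are illustrative, not the HAProxy API):

#include <stdatomic.h>
#include <stdbool.h>

/* Illustrative queue state: the counter is written on every operation,
 * the flag only when the queue flips between empty and non-empty. */
struct demo_queue {
	_Atomic unsigned int length;
	_Atomic unsigned char not_empty;
};

static bool demo_may_dequeue(struct demo_queue *q)
{
	/* a stale value only delays or wastes one dequeue attempt */
	return atomic_load_explicit(&q->not_empty, memory_order_relaxed);
}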
struct sockaddr_storage socks4_addr; /* the address of the SOCKS4 Proxy, including the port */
EXTRA_COUNTERS(extra_counters);
+ __decl_thread(HA_SPINLOCK_T state_lock); /* protects the following state fields */
+ uint8_t queues_not_empty; /* Are the request queues non-empty? Only changed when the queues go from empty to non-empty and vice versa. Protected by state_lock when changed. */
};
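
state_lock exists because the flag transitions must be serialized with each other: the atomic counter says when a transition happened, but two threads can observe opposite transitions back to back, and without mutual exclusion a late "clear" could overwrite a newer "set". A standalone sketch of the write-side protocol the patch applies, using C11 atomics and a pthread spinlock (illustrative names, not the patch's code):

#include <pthread.h>
#include <stdatomic.h>

static _Atomic unsigned int q_length;
static _Atomic unsigned char q_not_empty;
static pthread_spinlock_t q_state_lock;

static void demo_init(void)
{
	pthread_spin_init(&q_state_lock, PTHREAD_PROCESS_PRIVATE);
}

static void demo_enqueue_one(void)
{
	/* old value 0 means we just made the queue non-empty */
	if (atomic_fetch_add(&q_length, 1) == 0) {
		pthread_spin_lock(&q_state_lock);
		atomic_store(&q_not_empty, 1);
		pthread_spin_unlock(&q_state_lock);
	}
}

static void demo_dequeue_one(void)
{
	/* old value 1 means we just emptied the queue */
	if (atomic_fetch_sub(&q_length, 1) == 1) {
		pthread_spin_lock(&q_state_lock);
		/* re-check: an enqueue may have raced in since the fetch_sub */
		if (atomic_load(&q_length) == 0)
			atomic_store(&q_not_empty, 0);
		pthread_spin_unlock(&q_state_lock);
	}
}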
/* data provided to EVENT_HDL_SUB_SERVER handlers through event_hdl facility */
* the backend's queue instead.
*/
if (curr &&
- (curr->queueslength || (curr->maxconn && curr->served >= srv_dynamic_maxconn(curr))))
+ (curr->queues_not_empty || (curr->maxconn && curr->served >= srv_dynamic_maxconn(curr))))
curr = NULL;
return curr;
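
For context, the test above disqualifies the remembered server on two grounds: queued requests must be served before any new connection is handed to it directly, and a server already at its dynamic connection limit cannot take more. A restatement as a predicate (hypothetical helper, not part of the patch):

/* hypothetical restatement of the disqualification test above */
static inline int demo_srv_unusable(const struct server *curr)
{
	return curr->queues_not_empty ||
	       (curr->maxconn && curr->served >= srv_dynamic_maxconn(curr));
}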
_HA_ATOMIC_DEC(&p->queue->length);
- if (p->queue->sv)
- _HA_ATOMIC_DEC(&p->queue->sv->queueslength);
- else
- _HA_ATOMIC_DEC(&p->queue->px->queueslength);
+ if (p->queue->sv) {
+ if (_HA_ATOMIC_SUB_FETCH(&p->queue->sv->queueslength, 1) == 0) {
+ HA_SPIN_LOCK(SERVER_LOCK, &p->queue->sv->state_lock);
+ if (p->queue->sv->queueslength == 0)
+ p->queue->sv->queues_not_empty = 0;
+ HA_SPIN_UNLOCK(SERVER_LOCK, &p->queue->sv->state_lock);
+ }
+ } else {
+ if (_HA_ATOMIC_SUB_FETCH(&p->queue->px->queueslength, 1) == 0) {
+ HA_SPIN_LOCK(PROXY_LOCK, &p->queue->px->lock);
+ if (p->queue->px->queueslength == 0)
+ p->queue->px->queues_not_empty = 0;
+ HA_SPIN_UNLOCK(PROXY_LOCK, &p->queue->px->lock);
+ }
+ }
_HA_ATOMIC_INC(&p->queue->idx);
_HA_ATOMIC_DEC(&s->be->totpend);
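
The decrement-and-maybe-clear sequence above is open-coded again at several dequeue sites further down; a possible refactor (hypothetical, not in the patch) would factor one helper per side, built from the patch's own macros:

/* hypothetical helper: drop one unit from the server's aggregate queue
 * length and clear the flag if the queues just became empty */
static inline void srv_queues_length_dec(struct server *sv)
{
	if (_HA_ATOMIC_SUB_FETCH(&sv->queueslength, 1) == 0) {
		HA_SPIN_LOCK(SERVER_LOCK, &sv->state_lock);
		if (sv->queueslength == 0)
			sv->queues_not_empty = 0;
		HA_SPIN_UNLOCK(SERVER_LOCK, &sv->state_lock);
	}
}

/* same for the proxy-level queue, under the proxy lock */
static inline void px_queues_length_dec(struct proxy *px)
{
	if (_HA_ATOMIC_SUB_FETCH(&px->queueslength, 1) == 0) {
		HA_SPIN_LOCK(PROXY_LOCK, &px->lock);
		if (px->queueslength == 0)
			px->queues_not_empty = 0;
		HA_SPIN_UNLOCK(PROXY_LOCK, &px->lock);
	}
}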
* case we simply remember it for later use if needed.
*/
s = eb32_entry(node, struct tree_occ, node)->server;
- if (!s->maxconn || (!s->queueslength && s->served < srv_dynamic_maxconn(s))) {
+ if (!s->maxconn || (!s->queueslength && server_has_room(s))) {
if (s != srvtoavoid) {
srv = s;
break;
struct server *s;
s = eb32_entry(node, struct server, lb_node);
- if (!s->maxconn || (!s->queueslength && s->served < srv_dynamic_maxconn(s))) {
+ if (!s->maxconn || (!s->queueslength && server_has_room(s))) {
if (s != srvtoavoid) {
srv = s;
break;
eweight = _HA_ATOMIC_LOAD(&s->cur_eweight);
planned_inflight = tree_elt->lb_node.key * eweight / SRV_EWGHT_MAX;
- if (!s->maxconn || s->served + s->queueslength < srv_dynamic_maxconn(s) + s->maxqueue) {
+ if (server_has_room(s)) {
if (_HA_ATOMIC_LOAD(&s->served) + _HA_ATOMIC_LOAD(&s->queueslength) > planned_inflight + 2) {
/*
* The server has more requests than expected,
fwrr_update_position(grp, srv, next_weight);
fwrr_dequeue_srv(srv, tgid);
grp->curr_pos++;
- if (!srv->maxconn || (!srv->queueslength && srv->served < srv_dynamic_maxconn(srv))) {
+ if (!srv->maxconn || (!srv->queueslength && server_has_room(srv))) {
/* make sure it is not the server we are trying to exclude... */
if (srv != srvtoavoid || avoided)
break;
avoididx = 0; /* shut a gcc warning */
do {
srv = px->lbprm.map.srv[newidx++];
- if (!srv->maxconn || (!srv->queueslength && srv->served < srv_dynamic_maxconn(srv))) {
+ if (!srv->maxconn || (!srv->queueslength && server_has_room(srv))) {
/* make sure it is not the server we are trying to exclude... */
/* ...but remember that it was selected yet avoided */
avoided = srv;
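
The load-balancing pickers above now delegate the capacity test to server_has_room(). Assuming the usual definition from queue.h (sketched below; check the tree for the authoritative version), it compares the session count against the dynamic maxconn, so the previously open-coded served < srv_dynamic_maxconn(s) comparisons collapse into one call. If that is the definition, the !s->maxconn || guards kept at the call sites are redundant but harmless.

/* assumed shape of server_has_room(); not part of this patch */
static inline int server_has_room(const struct server *s)
{
	return !s->maxconn || s->cur_sess < srv_dynamic_maxconn(s);
}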
oldidx -= p->queue_idx;
if (sv) {
p->strm->logs.srv_queue_pos += oldidx;
- _HA_ATOMIC_DEC(&sv->queueslength);
- }
- else {
+ if (_HA_ATOMIC_SUB_FETCH(&sv->queueslength, 1) == 0) {
+ HA_SPIN_LOCK(SERVER_LOCK, &sv->state_lock);
+ if (sv->queueslength == 0)
+ sv->queues_not_empty = 0;
+ HA_SPIN_UNLOCK(SERVER_LOCK, &sv->state_lock);
+ }
+ } else {
p->strm->logs.prx_queue_pos += oldidx;
- _HA_ATOMIC_DEC(&px->queueslength);
+ if (_HA_ATOMIC_SUB_FETCH(&px->queueslength, 1) == 0) {
+ HA_SPIN_LOCK(PROXY_LOCK, &px->lock);
+ if (px->queueslength == 0)
+ px->queues_not_empty = 0;
+ HA_SPIN_UNLOCK(PROXY_LOCK, &px->lock);
+ }
}
_HA_ATOMIC_DEC(&q->length);
_HA_ATOMIC_DEC(&px->per_tgrp[tgrp - 1].queue.length);
_HA_ATOMIC_INC(&px->per_tgrp[tgrp - 1].queue.idx);
- _HA_ATOMIC_DEC(&px->queueslength);
+ if (_HA_ATOMIC_SUB_FETCH(&px->queueslength, 1) == 0) {
+ HA_SPIN_LOCK(PROXY_LOCK, &px->lock);
+ if (px->queueslength == 0)
+ px->queues_not_empty = 0;
+ HA_SPIN_UNLOCK(PROXY_LOCK, &px->lock);
+ }
return 1;
use_p:
_HA_ATOMIC_DEC(&srv->per_tgrp[tgrp - 1].queue.length);
_HA_ATOMIC_INC(&srv->per_tgrp[tgrp - 1].queue.idx);
- _HA_ATOMIC_DEC(&srv->queueslength);
+ if (_HA_ATOMIC_SUB_FETCH(&srv->queueslength, 1) == 0) {
+ HA_SPIN_LOCK(SERVER_LOCK, &srv->state_lock);
+ if (srv->queueslength == 0)
+ srv->queues_not_empty = 0;
+ HA_SPIN_UNLOCK(SERVER_LOCK, &srv->state_lock);
+ }
return 1;
}
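
Note the correction applied in the two hunks above: _HA_ATOMIC_FETCH_SUB returns the value before the subtraction, while _HA_ATOMIC_SUB_FETCH returns the value after it. Testing the pre-decrement value against 0 would only fire after an underflow, so the queues would never be flagged empty. The same distinction in standard C11 atomics:

#include <assert.h>
#include <stdatomic.h>

static void demo_fetch_sub_vs_sub_fetch(void)
{
	_Atomic unsigned int n = 1;

	/* atomic_fetch_sub returns the OLD value (FETCH_SUB semantics);
	 * the emptiness test needs the NEW value (SUB_FETCH semantics),
	 * i.e. old - 1 */
	unsigned int old = atomic_fetch_sub(&n, 1);
	assert(old == 1);              /* value before the decrement */
	assert(atomic_load(&n) == 0);  /* value after: queue just emptied */
}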
p->queue = q;
p->queue_idx = _HA_ATOMIC_LOAD(&q->idx) - 1; // for logging only
new_max = _HA_ATOMIC_ADD_FETCH(queueslength, 1);
+ if (new_max == 1) {
+ if (srv) {
+ HA_SPIN_LOCK(SERVER_LOCK, &srv->state_lock);
+ srv->queues_not_empty = 1;
+ HA_SPIN_UNLOCK(SERVER_LOCK, &srv->state_lock);
+ } else {
+ HA_SPIN_LOCK(PROXY_LOCK, &px->lock);
+ px->queues_not_empty = 1;
+ HA_SPIN_UNLOCK(PROXY_LOCK, &px->lock);
+ }
+ }
_HA_ATOMIC_INC(&q->length);
old_max = _HA_ATOMIC_LOAD(max_ptr);
while (new_max > old_max) {
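
The enqueue side raises the flag on the 1-return of ADD_FETCH, i.e. when the counter leaves zero, and unlike the clear path it needs no re-check under the lock: the worst a racing clear/set interleaving can produce is a flag left at 1 over an empty queue, which is benign because a dequeue attempt then simply finds nothing to do, whereas a flag stuck at 0 over a non-empty queue would strand queued requests. A hypothetical helper mirroring this side, as a counterpart to the decrement sketch earlier:

/* hypothetical helper: account for one queued request and raise the
 * flag if the server's queues just became non-empty */
static inline void srv_queues_length_inc(struct server *srv)
{
	if (_HA_ATOMIC_ADD_FETCH(&srv->queueslength, 1) == 1) {
		HA_SPIN_LOCK(SERVER_LOCK, &srv->state_lock);
		srv->queues_not_empty = 1;
		HA_SPIN_UNLOCK(SERVER_LOCK, &srv->state_lock);
	}
}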
HA_SPIN_DESTROY(&srv->lock);
+ HA_SPIN_DESTROY(&srv->state_lock);
+
MT_LIST_DELETE(&srv->global_list);
event_hdl_sub_list_destroy(&srv->e_subs);
} else
srv_settings_init(newsrv);
HA_SPIN_INIT(&newsrv->lock);
+ HA_SPIN_INIT(&newsrv->state_lock);
}
else {
/* This is a "default-server" line. Let's make certain the