]> git.kaiwu.me - haproxy.git/commitdiff
[BUG] fix the dequeuing logic to ensure that all requests get served
authorWilly Tarreau <w@1wt.eu>
Fri, 20 Jun 2008 13:04:11 +0000 (15:04 +0200)
committerWilly Tarreau <w@1wt.eu>
Fri, 20 Jun 2008 14:21:28 +0000 (16:21 +0200)
The dequeuing logic was completely wrong. First, a task was assigned
to all servers to process the queue, but this task was never scheduled
and was only woken up on session free. Second, there was no reservation
of server entries when a task was assigned a server. This means that
as long as the task was not connected to the server, its presence was
not accounted for. This was causing trouble when detecting whether or
not a server had reached maxconn. Third, during a redispatch, a session
could lose its place at the server's and get blocked because another
session at the same moment would have stolen the entry. Fourth, the
redispatch option did not work when maxqueue was reached for a server,
and it was not possible to do so without indefinitely hanging a session.

The root cause of all those problems was the lack of pre-reservation of
connections at the server's, and the lack of tracking of servers during
a redispatch. Everything relied on combinations of flags which could
appear similarly in quite distinct situations.

This patch is a major rework but there was no other solution, as the
internal logic was deeply flawed. The resulting code is cleaner, more
understandable, uses less magics and is overall more robust.

As an added bonus, "option redispatch" now works when maxqueue has
been reached on a server.

12 files changed:
include/proto/queue.h
include/proto/session.h
include/types/server.h
include/types/session.h
src/backend.c
src/cfgparse.c
src/client.c
src/haproxy.c
src/proto_http.c
src/proto_uxst.c
src/queue.c
src/session.c

index 092747a03e7c2524d9cdcd44d2ae6f375961c5c9..6899aee4882462252b44164c1f4462b3ef2ae5ca 100644 (file)
@@ -2,7 +2,7 @@
   include/proto/queue.h
   This file defines everything related to queues.
 
-  Copyright (C) 2000-2007 Willy Tarreau - w@1wt.eu
+  Copyright (C) 2000-2008 Willy Tarreau - w@1wt.eu
   
   This library is free software; you can redistribute it and/or
   modify it under the terms of the GNU Lesser General Public
@@ -38,7 +38,7 @@ int init_pendconn();
 struct session *pendconn_get_next_sess(struct server *srv, struct proxy *px);
 struct pendconn *pendconn_add(struct session *sess);
 void pendconn_free(struct pendconn *p);
-void process_srv_queue(struct task *t, struct timeval *next);
+void process_srv_queue(struct server *s);
 unsigned int srv_dynamic_maxconn(const struct server *s);
 
 
@@ -68,8 +68,7 @@ static inline struct pendconn *pendconn_from_px(const struct proxy *px) {
  */
 static inline int may_dequeue_tasks(const struct server *s, const struct proxy *p) {
        return (s && (s->nbpend || p->nbpend) &&
-               (!s->maxconn || s->cur_sess < srv_dynamic_maxconn(s)) &&
-               s->queue_mgt);
+               (!s->maxconn || s->cur_sess < srv_dynamic_maxconn(s)));
 }
 
 #endif /* _PROTO_QUEUE_H */
index 4b86af28ed26ef3c34eca854879f3ae5430fe14d..348e6508618abb70e51992b03510bd7ef4475b9a 100644 (file)
@@ -2,7 +2,7 @@
   include/proto/session.h
   This file defines everything related to sessions.
 
-  Copyright (C) 2000-2007 Willy Tarreau - w@1wt.eu
+  Copyright (C) 2000-2008 Willy Tarreau - w@1wt.eu
   
   This library is free software; you can redistribute it and/or
   modify it under the terms of the GNU Lesser General Public
@@ -34,6 +34,7 @@ void session_free(struct session *s);
 int init_session();
 
 void session_process_counters(struct session *s);
+void sess_change_server(struct session *sess, struct server *newsrv);
 
 #endif /* _PROTO_SESSION_H */
 
index c7772be591ccf0604176c6112657aa78c2673a09..8606c89bfdda96dc5736abb11066524b3179671f 100644 (file)
@@ -76,12 +76,12 @@ struct server {
        char *cookie;                           /* the id set in the cookie */
 
        struct proxy *proxy;                    /* the proxy this server belongs to */
+       int served;                             /* # of active sessions currently being served (ie not pending) */
        int cur_sess, cur_sess_max;             /* number of currently active sessions (including syn_sent) */
        unsigned maxconn, minconn;              /* max # of active sessions (0 = unlimited), min# for dynamic limit. */
        int nbpend, nbpend_max;                 /* number of pending connections */
        int maxqueue;                           /* maximum number of pending connections allowed */
        struct list pendconns;                  /* pending connections */
-       struct task *queue_mgt;                 /* the task associated to the queue processing */
 
        struct sockaddr_in addr;                /* the address to connect to */
        struct sockaddr_in source_addr;         /* the address to which we want to bind for connect() */
index cd354055fd916fd29661e0b96e022da59f94d1bd..9304bd399ef483d2dd4a40adff11935e87d03791 100644 (file)
 /* WARNING: if new fields are added, they must be initialized in event_accept()
  * and freed in session_free() !
  */
+
+/*
+ * Note: some session flags have dependencies :
+ *  - SN_DIRECT cannot exist without SN_ASSIGNED, because a server is
+ *    immediately assigned when SN_DIRECT is determined. Both must be cleared
+ *    when clearing SN_DIRECT (eg: redispatch).
+ *  - ->srv has no meaning without SN_ASSIGNED and must not be checked without
+ *    it. ->prev_srv should be used to check previous ->srv. If SN_ASSIGNED is
+ *    set and sess->srv is NULL, then it is a dispatch or proxy mode.
+ *  - a session being processed has srv_conn set.
+ *  - srv_conn might remain after SN_DIRECT has been reset, but the assigned
+ *    server should eventually be released.
+ */
 struct session {
        struct task *task;                      /* the task associated with this session */
        /* application specific below */
@@ -93,7 +106,9 @@ struct session {
        struct sockaddr_storage cli_addr;       /* the client address */
        struct sockaddr_storage frt_addr;       /* the frontend address reached by the client if SN_FRT_ADDR_SET is set */
        struct sockaddr_in srv_addr;            /* the address to connect to */
-       struct server *srv;                     /* the server being used */
+       struct server *srv;                     /* the server the session will be running or has been running on */
+       struct server *srv_conn;                /* session already has a slot on a server and is not in queue */
+       struct server *prev_srv;                /* the server the was running on, after a redispatch, otherwise NULL */
        struct pendconn *pend_pos;              /* if not NULL, points to the position in the pending queue */
        struct http_txn txn;                    /* current HTTP transaction being processed. Should become a list. */
        struct {
index e0b7ee533e2b56f853c5d905227f48fbe175c2c8..119dceaa1469655ee81b30672f2a98a5570fc439 100644 (file)
@@ -19,6 +19,7 @@
 
 #include <common/compat.h>
 #include <common/config.h>
+#include <common/debug.h>
 #include <common/eb32tree.h>
 #include <common/time.h>
 
@@ -38,6 +39,7 @@
 #include <proto/log.h>
 #include <proto/proto_http.h>
 #include <proto/queue.h>
+#include <proto/session.h>
 #include <proto/stream_sock.h>
 #include <proto/task.h>
 
@@ -782,7 +784,7 @@ static struct server *fwrr_get_next_server(struct proxy *p)
                fwrr_update_position(grp, srv);
                fwrr_dequeue_srv(srv);
                grp->curr_pos++;
-               if (!srv->maxconn || srv->cur_sess < srv_dynamic_maxconn(srv))
+               if (!srv->maxconn || (!srv->nbpend && srv->served < srv_dynamic_maxconn(srv)))
                        break;
 
                /* the server is saturated, let's chain it for later reinsertion */
@@ -882,98 +884,150 @@ struct server *get_server_ph(struct proxy *px, const char *uri, int uri_len)
 }
 
 /*
- * This function marks the session as 'assigned' in direct or dispatch modes,
- * or tries to assign one in balance mode, according to the algorithm. It does
- * nothing if the session had already been assigned a server.
+ * This function applies the load-balancing algorithm to the session, as
+ * defined by the backend it is assigned to. The session is then marked as
+ * 'assigned'.
+ *
+ * This function MAY NOT be called with SN_ASSIGNED already set. If the session
+ * had a server previously assigned, it is rebalanced, trying to avoid the same
+ * server.
+ * The function tries to keep the original connection slot if it reconnects to
+ * the same server, otherwise it releases it and tries to offer it.
+ *
+ * It is illegal to call this function with a session in a queue.
  *
  * It may return :
- *   SRV_STATUS_OK       if everything is OK. s->srv will be valid.
- *   SRV_STATUS_NOSRV    if no server is available. s->srv = NULL.
- *   SRV_STATUS_FULL     if all servers are saturated. s->srv = NULL.
+ *   SRV_STATUS_OK       if everything is OK. Session assigned to ->srv
+ *   SRV_STATUS_NOSRV    if no server is available. Session is not ASSIGNED
+ *   SRV_STATUS_FULL     if all servers are saturated. Session is not ASSIGNED
  *   SRV_STATUS_INTERNAL for other unrecoverable errors.
  *
- * Upon successful return, the session flag SN_ASSIGNED to indicate that it does
- * not need to be called anymore. This usually means that s->srv can be trusted
- * in balance and direct modes. This flag is not cleared, so it's to the caller
- * to clear it if required (eg: redispatch).
+ * Upon successful return, the session flag SN_ASSIGNED is set to indicate that
+ * it does not need to be called anymore. This means that s->srv can be trusted
+ * in balance and direct modes.
  *
  */
 
 int assign_server(struct session *s)
 {
+       struct server *conn_slot;
+       int err;
+
 #ifdef DEBUG_FULL
        fprintf(stderr,"assign_server : s=%p\n",s);
 #endif
 
-       if (s->pend_pos)
-               return SRV_STATUS_INTERNAL;
+       err = SRV_STATUS_INTERNAL;
+       if (unlikely(s->pend_pos || s->flags & SN_ASSIGNED))
+               goto out_err;
 
-       if (!(s->flags & SN_ASSIGNED)) {
-               if (s->be->lbprm.algo & BE_LB_ALGO) {
-                       int len;
-               
-                       if (s->flags & SN_DIRECT) {
-                               s->flags |= SN_ASSIGNED;
-                               return SRV_STATUS_OK;
-                       }
+       s->prev_srv = s->prev_srv;
+       conn_slot = s->srv_conn;
 
-                       if (!s->be->lbprm.tot_weight)
-                               return SRV_STATUS_NOSRV;
+       /* We have to release any connection slot before applying any LB algo,
+        * otherwise we may erroneously end up with no available slot.
+        */
+       if (conn_slot)
+               sess_change_server(s, NULL);
 
-                       switch (s->be->lbprm.algo & BE_LB_ALGO) {
-                       case BE_LB_ALGO_RR:
-                               s->srv = fwrr_get_next_server(s->be);
-                               if (!s->srv)
-                                       return SRV_STATUS_FULL;
-                               break;
-                       case BE_LB_ALGO_SH:
-                               if (s->cli_addr.ss_family == AF_INET)
-                                       len = 4;
-                               else if (s->cli_addr.ss_family == AF_INET6)
-                                       len = 16;
-                               else /* unknown IP family */
-                                       return SRV_STATUS_INTERNAL;
+       /* We will now try to find the good server and store it into <s->srv>.
+        * Note that <s->srv> may be NULL in case of dispatch or proxy mode,
+        * as well as if no server is available (check error code).
+        */
+
+       s->srv = NULL;
+       if (s->be->lbprm.algo & BE_LB_ALGO) {
+               int len;
+               /* we must check if we have at least one server available */
+               if (!s->be->lbprm.tot_weight) {
+                       err = SRV_STATUS_NOSRV;
+                       goto out;
+               }
+
+               switch (s->be->lbprm.algo & BE_LB_ALGO) {
+               case BE_LB_ALGO_RR:
+                       s->srv = fwrr_get_next_server(s->be);
+                       if (!s->srv) {
+                               err = SRV_STATUS_FULL;
+                               goto out;
+                       }
+                       break;
+               case BE_LB_ALGO_SH:
+                       if (s->cli_addr.ss_family == AF_INET)
+                               len = 4;
+                       else if (s->cli_addr.ss_family == AF_INET6)
+                               len = 16;
+                       else {
+                               /* unknown IP family */
+                               err = SRV_STATUS_INTERNAL;
+                               goto out;
+                       }
                
-                               s->srv = get_server_sh(s->be,
-                                                      (void *)&((struct sockaddr_in *)&s->cli_addr)->sin_addr,
-                                                      len);
-                               break;
-                       case BE_LB_ALGO_UH:
-                               /* URI hashing */
-                               s->srv = get_server_uh(s->be,
-                                                      s->txn.req.sol + s->txn.req.sl.rq.u,
-                                                      s->txn.req.sl.rq.u_l);
-                               break;
-                       case BE_LB_ALGO_PH:
-                               /* URL Parameter hashing */
-                               s->srv = get_server_ph(s->be,
-                                                      s->txn.req.sol + s->txn.req.sl.rq.u,
-                                                      s->txn.req.sl.rq.u_l);
+                       s->srv = get_server_sh(s->be,
+                                              (void *)&((struct sockaddr_in *)&s->cli_addr)->sin_addr,
+                                              len);
+                       break;
+               case BE_LB_ALGO_UH:
+                       /* URI hashing */
+                       s->srv = get_server_uh(s->be,
+                                              s->txn.req.sol + s->txn.req.sl.rq.u,
+                                              s->txn.req.sl.rq.u_l);
+                       break;
+               case BE_LB_ALGO_PH:
+                       /* URL Parameter hashing */
+                       s->srv = get_server_ph(s->be,
+                                              s->txn.req.sol + s->txn.req.sl.rq.u,
+                                              s->txn.req.sl.rq.u_l);
+
+                       if (!s->srv) {
+                               /* parameter not found, fall back to round robin on the map */
+                               s->srv = get_server_rr_with_conns(s->be);
                                if (!s->srv) {
-                                       /* parameter not found, fall back to round robin on the map */
-                                       s->srv = get_server_rr_with_conns(s->be);
-                                       if (!s->srv)
-                                               return SRV_STATUS_FULL;
+                                       err = SRV_STATUS_FULL;
+                                       goto out;
                                }
-                               break;
-                       default:
-                               /* unknown balancing algorithm */
-                               return SRV_STATUS_INTERNAL;
                        }
+                       break;
+               default:
+                       /* unknown balancing algorithm */
+                       err = SRV_STATUS_INTERNAL;
+                       goto out;
+               }
+               if (s->srv != s->prev_srv) {
                        s->be->cum_lbconn++;
                        s->srv->cum_lbconn++;
                }
-               else if (s->be->options & PR_O_HTTP_PROXY) {
-                       if (!s->srv_addr.sin_addr.s_addr)
-                               return SRV_STATUS_NOSRV;
+       }
+       else if (s->be->options & PR_O_HTTP_PROXY) {
+               if (!s->srv_addr.sin_addr.s_addr) {
+                       err = SRV_STATUS_NOSRV;
+                       goto out;
                }
-               else if (!*(int *)&s->be->dispatch_addr.sin_addr &&
-                        !(s->fe->options & PR_O_TRANSP)) {
-                       return SRV_STATUS_NOSRV;
+       }
+       else if (!*(int *)&s->be->dispatch_addr.sin_addr &&
+                !(s->fe->options & PR_O_TRANSP)) {
+               err = SRV_STATUS_NOSRV;
+               goto out;
+       }
+
+       s->flags |= SN_ASSIGNED;
+       err = SRV_STATUS_OK;
+ out:
+
+       /* Either we take back our connection slot, or we offer it to someone
+        * else if we don't need it anymore.
+        */
+       if (conn_slot) {
+               if (conn_slot == s->srv) {
+                       sess_change_server(s, s->srv);
+               } else {
+                       if (may_dequeue_tasks(conn_slot, s->be))
+                               process_srv_queue(conn_slot);
                }
-               s->flags |= SN_ASSIGNED;
        }
-       return SRV_STATUS_OK;
+
+ out_err:
+       return err;
 }
 
 
@@ -1046,6 +1100,11 @@ int assign_server_address(struct session *s)
 
 /* This function assigns a server to session <s> if required, and can add the
  * connection to either the assigned server's queue or to the proxy's queue.
+ * If ->srv_conn is set, the session is first released from the server.
+ * It may also be called with SN_DIRECT and/or SN_ASSIGNED though. It will
+ * be called before any connection and after any retry or redispatch occurs.
+ *
+ * It is not allowed to call this function with a session in a queue.
  *
  * Returns :
  *
@@ -1053,7 +1112,8 @@ int assign_server_address(struct session *s)
  *   SRV_STATUS_NOSRV    if no server is available. s->srv = NULL.
  *   SRV_STATUS_QUEUED   if the connection has been queued.
  *   SRV_STATUS_FULL     if the server(s) is/are saturated and the
- *                       connection could not be queued.
+ *                       connection could not be queued in s->srv,
+ *                       which may be NULL if we queue on the backend.
  *   SRV_STATUS_INTERNAL for other unrecoverable errors.
  *
  */
@@ -1065,41 +1125,65 @@ int assign_server_and_queue(struct session *s)
        if (s->pend_pos)
                return SRV_STATUS_INTERNAL;
 
-       if (s->flags & SN_ASSIGNED) {
-               if (s->srv && s->srv->maxqueue > 0 && s->srv->nbpend >= s->srv->maxqueue) {
-                       s->flags &= ~(SN_DIRECT | SN_ASSIGNED | SN_ADDR_SET);
-                       s->srv = NULL;
-                       http_flush_cookie_flags(&s->txn);
-               } else {
-                       /* a server does not need to be assigned, perhaps because we're in
-                        * direct mode, or in dispatch or transparent modes where the server
-                        * is not needed.
+       err = SRV_STATUS_OK;
+       if (!(s->flags & SN_ASSIGNED)) {
+               err = assign_server(s);
+               if (s->prev_srv) {
+                       /* This session was previously assigned to a server. We have to
+                        * update the session's and the server's stats :
+                        *  - if the server changed :
+                        *    - set TX_CK_DOWN if txn.flags was TX_CK_VALID
+                        *    - increment srv->redispatches and be->redispatches
+                        *  - if the server remained the same : update retries.
                         */
-                       if (s->srv &&
-                           s->srv->maxconn && s->srv->cur_sess >= srv_dynamic_maxconn(s->srv)) {
-                               p = pendconn_add(s);
-                               if (p)
-                                       return SRV_STATUS_QUEUED;
-                               else
-                                       return SRV_STATUS_FULL;
+                       if (s->prev_srv != s->srv) {
+                               if ((s->txn.flags & TX_CK_MASK) == TX_CK_VALID) {
+                                       s->txn.flags &= ~TX_CK_MASK;
+                                       s->txn.flags |= TX_CK_DOWN;
+                               }
+                               s->be->redispatches++;
+                       } else {
+                               s->prev_srv->retries++;
+                               s->be->retries++;
                        }
-                       return SRV_STATUS_OK;
                }
        }
 
-       /* a server needs to be assigned */
-       err = assign_server(s);
        switch (err) {
        case SRV_STATUS_OK:
-               /* in balance mode, we might have servers with connection limits */
-               if (s->srv &&
-                   s->srv->maxconn && s->srv->cur_sess >= srv_dynamic_maxconn(s->srv)) {
+               /* we have SN_ASSIGNED set */
+               if (!s->srv)
+                       return SRV_STATUS_OK;   /* dispatch or proxy mode */
+
+               /* If we already have a connection slot, no need to check any queue */
+               if (s->srv_conn == s->srv)
+                       return SRV_STATUS_OK;
+
+               /* OK, this session already has an assigned server, but no
+                * connection slot yet. Either it is a redispatch, or it was
+                * assigned from persistence information (direct mode).
+                */
+
+               /* We might have to queue this session if the assigned server is full.
+                * We know we have to queue it into the server's queue, so if a maxqueue
+                * is set on the server, we must also check that the server's queue is
+                * not full, in which case we have to return FULL.
+                */
+               if (s->srv->maxconn &&
+                   (s->srv->nbpend || s->srv->served >= srv_dynamic_maxconn(s->srv))) {
+
+                       if (s->srv->maxqueue > 0 && s->srv->nbpend >= s->srv->maxqueue)
+                               return SRV_STATUS_FULL;
+
                        p = pendconn_add(s);
                        if (p)
                                return SRV_STATUS_QUEUED;
                        else
-                               return SRV_STATUS_FULL;
+                               return SRV_STATUS_INTERNAL;
                }
+
+               /* OK, we can use this server. Let's reserve our place */
+               sess_change_server(s, s->srv);
                return SRV_STATUS_OK;
 
        case SRV_STATUS_FULL:
@@ -1108,7 +1192,7 @@ int assign_server_and_queue(struct session *s)
                if (p)
                        return SRV_STATUS_QUEUED;
                else
-                       return SRV_STATUS_FULL;
+                       return SRV_STATUS_INTERNAL;
 
        case SRV_STATUS_NOSRV:
        case SRV_STATUS_INTERNAL:
@@ -1366,7 +1450,7 @@ int srv_count_retry_down(struct session *t, int conn_err)
                 * we have to inform the server that it may be used by another session.
                 */
                if (may_dequeue_tasks(t->srv, t->be))
-                       task_wakeup(t->srv->queue_mgt);
+                       process_srv_queue(t->srv);
                return 1;
        }
        return 0;
@@ -1409,7 +1493,7 @@ int srv_retryable_connect(struct session *t)
                        t->be->failed_conns++;
                        /* release other sessions waiting for this server */
                        if (may_dequeue_tasks(t->srv, t->be))
-                               task_wakeup(t->srv->queue_mgt);
+                               process_srv_queue(t->srv);
                        return 1;
                }
                /* ensure that we have enough retries left */
@@ -1423,7 +1507,7 @@ int srv_retryable_connect(struct session *t)
         */
        /* let's try to offer this slot to anybody */
        if (may_dequeue_tasks(t->srv, t->be))
-               task_wakeup(t->srv->queue_mgt);
+               process_srv_queue(t->srv);
 
        if (t->srv)
                t->srv->cum_sess++;
@@ -1432,8 +1516,7 @@ int srv_retryable_connect(struct session *t)
        t->be->redispatches++;
 
        t->flags &= ~(SN_DIRECT | SN_ASSIGNED | SN_ADDR_SET);
-       t->srv = NULL; /* it's left to the dispatcher to choose a server */
-       http_flush_cookie_flags(&t->txn);
+       t->prev_srv = t->srv;
        return 0;
 }
 
@@ -1453,11 +1536,34 @@ int srv_redispatch_connect(struct session *t)
        /* We know that we don't have any connection pending, so we will
         * try to get a new one, and wait in this state if it's queued
         */
+ redispatch:
        conn_err = assign_server_and_queue(t);
        switch (conn_err) {
        case SRV_STATUS_OK:
                break;
 
+       case SRV_STATUS_FULL:
+               /* The server has reached its maxqueue limit. Either PR_O_REDISP is set
+                * and we can redispatch to another server, or it is not and we return
+                * 503. This only makes sense in DIRECT mode however, because normal LB
+                * algorithms would never select such a server, and hash algorithms
+                * would bring us on the same server again. Note that t->srv is set in
+                * this case.
+                */
+               if ((t->flags & SN_DIRECT) && (t->be->options & PR_O_REDISP)) {
+                       t->flags &= ~(SN_DIRECT | SN_ASSIGNED | SN_ADDR_SET);
+                       t->prev_srv = t->srv;
+                       goto redispatch;
+               }
+
+               tv_eternity(&t->req->cex);
+               srv_close_with_err(t, SN_ERR_SRVTO, SN_FINST_Q,
+                                  503, error_message(t, HTTP_ERR_503));
+
+               t->srv->failed_conns++;
+               t->be->failed_conns++;
+               return 1;
+
        case SRV_STATUS_NOSRV:
                /* note: it is guaranteed that t->srv == NULL here */
                tv_eternity(&t->req->cex);
@@ -1479,7 +1585,6 @@ int srv_redispatch_connect(struct session *t)
                /* do nothing else and do not wake any other session up */
                return 1;
 
-       case SRV_STATUS_FULL:
        case SRV_STATUS_INTERNAL:
        default:
                tv_eternity(&t->req->cex);
@@ -1493,7 +1598,7 @@ int srv_redispatch_connect(struct session *t)
 
                /* release other sessions waiting for this server */
                if (may_dequeue_tasks(t->srv, t->be))
-                       task_wakeup(t->srv->queue_mgt);
+                       process_srv_queue(t->srv);
                return 1;
        }
        /* if we get here, it's because we got SRV_STATUS_OK, which also
index 2fd5007cdf18095feccc6e5c5ad8316f3c897742..32f5d65df59259843f836b1f3a7ae7822dad7905 100644 (file)
@@ -1,7 +1,7 @@
 /*
  * Configuration parser
  *
- * Copyright 2000-2007 Willy Tarreau <w@1wt.eu>
+ * Copyright 2000-2008 Willy Tarreau <w@1wt.eu>
  *
  * This program is free software; you can redistribute it and/or
  * modify it under the terms of the GNU General Public License
@@ -2873,26 +2873,6 @@ int readcfgfile(const char *file)
                                      file, proxy_type_str(curproxy), curproxy->id, linenum);
                                goto err;
                        }
-
-                       if (newsrv->maxconn > 0) {
-                               struct task *t;
-
-                               if ((t = pool_alloc2(pool2_task)) == NULL) {
-                                       Alert("parsing [%s:%d] : out of memory.\n", file, linenum);
-                                       goto err;
-                               }
-               
-                               t->qlist.p = NULL;
-                               t->wq = NULL;
-                               t->state = TASK_IDLE;
-                               t->process = process_srv_queue;
-                               t->context = newsrv;
-                               newsrv->queue_mgt = t;
-
-                               /* never run it unless specifically woken up */
-                               tv_eternity(&t->expire);
-                               task_queue(t);
-                       }
                        newsrv = newsrv->next;
                }
 
index 6db22e7469a6f15c229c486cf4c38b7941fd02c0..b0366e199fcb223018ed2cf0356008a321e6c131 100644 (file)
@@ -188,7 +188,7 @@ int event_accept(int fd) {
 
                s->cli_fd = cfd;
                s->srv_fd = -1;
-               s->srv = NULL;
+               s->srv = s->prev_srv = s->srv_conn = NULL;
                s->pend_pos = NULL;
                s->conn_retries = s->be->conn_retries;
 
index f4f7b17a77589a7b255a8910bd62cb8c452de48a..38d49e7bb76c69b359b62f60f4c55a99345889c9 100644 (file)
@@ -1,6 +1,6 @@
 /*
  * HA-Proxy : High Availability-enabled HTTP/TCP proxy
- * Copyright 2000-2007  Willy Tarreau <w@1wt.eu>.
+ * Copyright 2000-2008  Willy Tarreau <w@1wt.eu>.
  *
  * This program is free software; you can redistribute it and/or
  * modify it under the terms of the GNU General Public License
index 5d893415905a4b97142ab2cda0022826bcf06ec0..25ed54c5b1349e112b58e129622a56073bdb7539 100644 (file)
@@ -1,7 +1,7 @@
 /*
  * HTTP protocol analyzer
  *
- * Copyright 2000-2007 Willy Tarreau <w@1wt.eu>
+ * Copyright 2000-2008 Willy Tarreau <w@1wt.eu>
  *
  * This program is free software; you can redistribute it and/or
  * modify it under the terms of the GNU General Public License
@@ -2437,9 +2437,9 @@ int process_srv(struct session *t)
                         * to any other session to release it and wake us up again.
                         */
                        if (t->pend_pos) {
-                               if (!tv_isle(&req->cex, &now))
+                               if (!tv_isle(&req->cex, &now)) {
                                        return 0;
-                               else {
+                               else {
                                        /* we've been waiting too long here */
                                        tv_eternity(&req->cex);
                                        t->logs.t_queue = tv_ms_elapsed(&t->logs.tv_accept, &now);
@@ -2474,8 +2474,10 @@ int process_srv(struct session *t)
                      t->be->options & PR_O_ABRT_CLOSE))) { /* give up */
                        tv_eternity(&req->cex);
                        fd_delete(t->srv_fd);
-                       if (t->srv)
+                       if (t->srv) {
                                t->srv->cur_sess--;
+                               sess_change_server(t, NULL);
+                       }
 
                        /* note that this must not return any error because it would be able to
                         * overwrite the client_retnclose() output.
@@ -2492,8 +2494,10 @@ int process_srv(struct session *t)
                        //fprintf(stderr,"2: c=%d, s=%d\n", c, s);
 
                        fd_delete(t->srv_fd);
-                       if (t->srv)
+                       if (t->srv) {
                                t->srv->cur_sess--;
+                               sess_change_server(t, NULL);
+                       }
 
                        if (!(req->flags & BF_WRITE_STATUS))
                                conn_err = SN_ERR_SRVTO; // it was a connect timeout.
@@ -2510,13 +2514,14 @@ int process_srv(struct session *t)
                                 */
                                /* let's try to offer this slot to anybody */
                                if (may_dequeue_tasks(t->srv, t->be))
-                                       task_wakeup(t->srv->queue_mgt);
+                                       process_srv_queue(t->srv);
 
                                if (t->srv)
                                        t->srv->failed_conns++;
                                t->be->redispatches++;
 
                                t->flags &= ~(SN_DIRECT | SN_ASSIGNED | SN_ADDR_SET);
+                               t->prev_srv = t->srv;
                                t->srv = NULL; /* it's left to the dispatcher to choose a server */
                                http_flush_cookie_flags(txn);
 
@@ -2678,6 +2683,7 @@ int process_srv(struct session *t)
                                if (t->srv) {
                                        t->srv->cur_sess--;
                                        t->srv->failed_resp++;
+                                       sess_change_server(t, NULL);
                                }
                                t->be->failed_resp++;
                                t->srv_state = SV_STCLOSE;
@@ -2691,7 +2697,7 @@ int process_srv(struct session *t)
                                 * we have to inform the server that it may be used by another session.
                                 */
                                if (t->srv && may_dequeue_tasks(t->srv, t->be))
-                                       task_wakeup(t->srv->queue_mgt);
+                                       process_srv_queue(t->srv);
 
                                return 1;
                        }
@@ -2720,6 +2726,7 @@ int process_srv(struct session *t)
                                if (t->srv) {
                                        t->srv->cur_sess--;
                                        t->srv->failed_resp++;
+                                       sess_change_server(t, NULL);
                                }
                                t->be->failed_resp++;
                                t->srv_state = SV_STCLOSE;
@@ -2733,7 +2740,7 @@ int process_srv(struct session *t)
                                 * we have to inform the server that it may be used by another session.
                                 */
                                if (t->srv && may_dequeue_tasks(t->srv, t->be))
-                                       task_wakeup(t->srv->queue_mgt);
+                                       process_srv_queue(t->srv);
                                return 1;
                        }
 
@@ -2888,6 +2895,7 @@ int process_srv(struct session *t)
                                        if (t->srv) {
                                                t->srv->cur_sess--;
                                                t->srv->failed_resp++;
+                                               sess_change_server(t, NULL);
                                        }
                                        cur_proxy->failed_resp++;
                                return_srv_prx_502:
@@ -2905,7 +2913,7 @@ int process_srv(struct session *t)
                                         * we have to inform the server that it may be used by another session.
                                         */
                                        if (t->srv && may_dequeue_tasks(t->srv, cur_proxy))
-                                               task_wakeup(t->srv->queue_mgt);
+                                               process_srv_queue(t->srv);
                                        return 1;
                                }
                        }
@@ -2915,6 +2923,7 @@ int process_srv(struct session *t)
                                if (t->srv) {
                                        t->srv->cur_sess--;
                                        t->srv->failed_secu++;
+                                       sess_change_server(t, NULL);
                                }
                                cur_proxy->denied_resp++;
                                goto return_srv_prx_502;
@@ -3050,6 +3059,7 @@ int process_srv(struct session *t)
                        if (t->srv) {
                                t->srv->cur_sess--;
                                t->srv->failed_secu++;
+                               sess_change_server(t, NULL);
                        }
                        t->be->denied_resp++;
 
@@ -3137,6 +3147,7 @@ int process_srv(struct session *t)
                        if (t->srv) {
                                t->srv->cur_sess--;
                                t->srv->failed_resp++;
+                               sess_change_server(t, NULL);
                        }
                        t->be->failed_resp++;
                        t->srv_state = SV_STCLOSE;
@@ -3148,7 +3159,7 @@ int process_srv(struct session *t)
                         * we have to inform the server that it may be used by another session.
                         */
                        if (may_dequeue_tasks(t->srv, t->be))
-                               task_wakeup(t->srv->queue_mgt);
+                               process_srv_queue(t->srv);
 
                        return 1;
                }
@@ -3244,6 +3255,7 @@ int process_srv(struct session *t)
                        if (t->srv) {
                                t->srv->cur_sess--;
                                t->srv->failed_resp++;
+                               sess_change_server(t, NULL);
                        }
                        t->be->failed_resp++;
                        //close(t->srv_fd);
@@ -3256,7 +3268,7 @@ int process_srv(struct session *t)
                         * we have to inform the server that it may be used by another session.
                         */
                        if (may_dequeue_tasks(t->srv, t->be))
-                               task_wakeup(t->srv->queue_mgt);
+                               process_srv_queue(t->srv);
 
                        return 1;
                }
@@ -3264,15 +3276,17 @@ int process_srv(struct session *t)
                        //EV_FD_CLR(t->srv_fd, DIR_WR);
                        buffer_shutw(req);
                        fd_delete(t->srv_fd);
-                       if (t->srv)
+                       if (t->srv) {
                                t->srv->cur_sess--;
+                               sess_change_server(t, NULL);
+                       }
                        //close(t->srv_fd);
                        t->srv_state = SV_STCLOSE;
                        /* We used to have a free connection slot. Since we'll never use it,
                         * we have to inform the server that it may be used by another session.
                         */
                        if (may_dequeue_tasks(t->srv, t->be))
-                               task_wakeup(t->srv->queue_mgt);
+                               process_srv_queue(t->srv);
 
                        return 1;
                }
@@ -3280,8 +3294,10 @@ int process_srv(struct session *t)
                        //EV_FD_CLR(t->srv_fd, DIR_WR);
                        buffer_shutw(req);
                        fd_delete(t->srv_fd);
-                       if (t->srv)
+                       if (t->srv) {
                                t->srv->cur_sess--;
+                               sess_change_server(t, NULL);
+                       }
                        //close(t->srv_fd);
                        t->srv_state = SV_STCLOSE;
                        if (!(t->flags & SN_ERR_MASK))
@@ -3292,7 +3308,7 @@ int process_srv(struct session *t)
                         * we have to inform the server that it may be used by another session.
                         */
                        if (may_dequeue_tasks(t->srv, t->be))
-                               task_wakeup(t->srv->queue_mgt);
+                               process_srv_queue(t->srv);
 
                        return 1;
                }
@@ -3319,6 +3335,7 @@ int process_srv(struct session *t)
                        if (t->srv) {
                                t->srv->cur_sess--;
                                t->srv->failed_resp++;
+                               sess_change_server(t, NULL);
                        }
                        t->be->failed_resp++;
                        //close(t->srv_fd);
@@ -3331,7 +3348,7 @@ int process_srv(struct session *t)
                         * we have to inform the server that it may be used by another session.
                         */
                        if (may_dequeue_tasks(t->srv, t->be))
-                               task_wakeup(t->srv->queue_mgt);
+                               process_srv_queue(t->srv);
 
                        return 1;
                }
@@ -3339,15 +3356,17 @@ int process_srv(struct session *t)
                        //EV_FD_CLR(t->srv_fd, DIR_RD);
                        buffer_shutr(rep);
                        fd_delete(t->srv_fd);
-                       if (t->srv)
+                       if (t->srv) {
                                t->srv->cur_sess--;
+                               sess_change_server(t, NULL);
+                       }
                        //close(t->srv_fd);
                        t->srv_state = SV_STCLOSE;
                        /* We used to have a free connection slot. Since we'll never use it,
                         * we have to inform the server that it may be used by another session.
                         */
                        if (may_dequeue_tasks(t->srv, t->be))
-                               task_wakeup(t->srv->queue_mgt);
+                               process_srv_queue(t->srv);
 
                        return 1;
                }
@@ -3355,8 +3374,10 @@ int process_srv(struct session *t)
                        //EV_FD_CLR(t->srv_fd, DIR_RD);
                        buffer_shutr(rep);
                        fd_delete(t->srv_fd);
-                       if (t->srv)
+                       if (t->srv) {
                                t->srv->cur_sess--;
+                               sess_change_server(t, NULL);
+                       }
                        //close(t->srv_fd);
                        t->srv_state = SV_STCLOSE;
                        if (!(t->flags & SN_ERR_MASK))
@@ -3367,7 +3388,7 @@ int process_srv(struct session *t)
                         * we have to inform the server that it may be used by another session.
                         */
                        if (may_dequeue_tasks(t->srv, t->be))
-                               task_wakeup(t->srv->queue_mgt);
+                               process_srv_queue(t->srv);
 
                        return 1;
                }
index 870347661aaf2c6ce76ac550bf69f64b481c9452..c8e844b1fc5b9997500c0440339c0739ef520577 100644 (file)
@@ -1,7 +1,7 @@
 /*
  * UNIX SOCK_STREAM protocol layer (uxst)
  *
- * Copyright 2000-2007 Willy Tarreau <w@1wt.eu>
+ * Copyright 2000-2008 Willy Tarreau <w@1wt.eu>
  *
  * This program is free software; you can redistribute it and/or
  * modify it under the terms of the GNU General Public License
@@ -915,7 +915,7 @@ static int process_uxst_srv(struct session *t)
                                 */
                                /* let's try to offer this slot to anybody */
                                if (may_dequeue_tasks(t->srv, t->be))
-                                       task_wakeup(t->srv->queue_mgt);
+                                       process_srv_queue(t->srv);
 
                                if (t->srv)
                                        t->srv->failed_conns++;
@@ -1002,7 +1002,7 @@ static int process_uxst_srv(struct session *t)
                         * we have to inform the server that it may be used by another session.
                         */
                        if (may_dequeue_tasks(t->srv, t->be))
-                               task_wakeup(t->srv->queue_mgt);
+                               process_srv_queue(t->srv);
 
                        return 1;
                }
@@ -1110,7 +1110,7 @@ static int process_uxst_srv(struct session *t)
                         * we have to inform the server that it may be used by another session.
                         */
                        if (may_dequeue_tasks(t->srv, t->be))
-                               task_wakeup(t->srv->queue_mgt);
+                               process_srv_queue(t->srv);
 
                        return 1;
                }
@@ -1126,7 +1126,7 @@ static int process_uxst_srv(struct session *t)
                         * we have to inform the server that it may be used by another session.
                         */
                        if (may_dequeue_tasks(t->srv, t->be))
-                               task_wakeup(t->srv->queue_mgt);
+                               process_srv_queue(t->srv);
 
                        return 1;
                }
@@ -1146,7 +1146,7 @@ static int process_uxst_srv(struct session *t)
                         * we have to inform the server that it may be used by another session.
                         */
                        if (may_dequeue_tasks(t->srv, t->be))
-                               task_wakeup(t->srv->queue_mgt);
+                               process_srv_queue(t->srv);
 
                        return 1;
                }
@@ -1185,7 +1185,7 @@ static int process_uxst_srv(struct session *t)
                         * we have to inform the server that it may be used by another session.
                         */
                        if (may_dequeue_tasks(t->srv, t->be))
-                               task_wakeup(t->srv->queue_mgt);
+                               process_srv_queue(t->srv);
 
                        return 1;
                }
@@ -1201,7 +1201,7 @@ static int process_uxst_srv(struct session *t)
                         * we have to inform the server that it may be used by another session.
                         */
                        if (may_dequeue_tasks(t->srv, t->be))
-                               task_wakeup(t->srv->queue_mgt);
+                               process_srv_queue(t->srv);
 
                        return 1;
                }
@@ -1221,7 +1221,7 @@ static int process_uxst_srv(struct session *t)
                         * we have to inform the server that it may be used by another session.
                         */
                        if (may_dequeue_tasks(t->srv, t->be))
-                               task_wakeup(t->srv->queue_mgt);
+                               process_srv_queue(t->srv);
 
                        return 1;
                }
index 4d383f03f9143cd0929195299eed3591adac8235..33f9867b822903d4f300e30ceb07a5f8935fa4b6 100644 (file)
@@ -1,7 +1,7 @@
 /*
  * Queue management functions.
  *
- * Copyright 2000-2007 Willy Tarreau <w@1wt.eu>
+ * Copyright 2000-2008 Willy Tarreau <w@1wt.eu>
  *
  * This program is free software; you can redistribute it and/or
  * modify it under the terms of the GNU General Public License
@@ -63,36 +63,34 @@ unsigned int srv_dynamic_maxconn(const struct server *s)
 
 
 /*
- * Manages a server's connection queue. If woken up, will try to dequeue as
- * many pending sessions as possible, and wake them up. The task has nothing
- * else to do, so it always returns ETERNITY.
+ * Manages a server's connection queue. This function will try to dequeue as
+ * many pending sessions as possible, and wake them up.
  */
-void process_srv_queue(struct task *t, struct timeval *next)
+void process_srv_queue(struct server *s)
 {
-       struct server *s = (struct server*)t->context;
        struct proxy  *p = s->proxy;
-       int xferred;
+       int maxconn;
 
        /* First, check if we can handle some connections queued at the proxy. We
         * will take as many as we can handle.
         */
-       for (xferred = 0; s->cur_sess + xferred < srv_dynamic_maxconn(s); xferred++) {
-               struct session *sess;
 
-               sess = pendconn_get_next_sess(s, p);
+       maxconn = srv_dynamic_maxconn(s);
+       while (s->served < maxconn) {
+               struct session *sess = pendconn_get_next_sess(s, p);
                if (sess == NULL)
                        break;
                task_wakeup(sess->task);
        }
-
-       tv_eternity(next);
 }
 
 /* Detaches the next pending connection from either a server or a proxy, and
  * returns its associated session. If no pending connection is found, NULL is
- * returned. Note that neither <srv> nor <px> can be NULL.
+ * returned. Note that neither <srv> nor <px> may be NULL.
  * Priority is given to the oldest request in the queue if both <srv> and <px>
  * have pending requests. This ensures that no request will be left unserved.
+ * The session is immediately marked as "assigned", and both its <srv> and
+ * <srv_conn> are set to <srv>,
  */
 struct session *pendconn_get_next_sess(struct server *srv, struct proxy *px)
 {
@@ -114,13 +112,20 @@ struct session *pendconn_get_next_sess(struct server *srv, struct proxy *px)
        }
        sess = ps->sess;
        pendconn_free(ps);
+
+       /* we want to note that the session has now been assigned a server */
+       sess->flags |= SN_ASSIGNED;
+       sess->srv = srv;
+       sess->srv_conn = srv;
+       srv->served++;
        return sess;
 }
 
 /* Adds the session <sess> to the pending connection list of server <sess>->srv
  * or to the one of <sess>->proxy if srv is NULL. All counters and back pointers
  * are updated accordingly. Returns NULL if no memory is available, otherwise the
- * pendconn itself.
+ * pendconn itself. If the session was already marked as served, its flag is
+ * cleared. It is illegal to call this function with a non-NULL sess->srv_conn.
  */
 struct pendconn *pendconn_add(struct session *sess)
 {
@@ -133,7 +138,8 @@ struct pendconn *pendconn_add(struct session *sess)
        sess->pend_pos = p;
        p->sess = sess;
        p->srv  = sess->srv;
-       if (sess->srv) {
+
+       if (sess->flags & SN_ASSIGNED && sess->srv) {
                LIST_ADDQ(&sess->srv->pendconns, &p->list);
                sess->srv->nbpend++;
                sess->logs.srv_queue_size += sess->srv->nbpend;
index 594f7df1fa3d17e85a9599cb0c3832f29d18ff4b..b70d47ca2b2bf3014d04e718576aea9ba9869cf1 100644 (file)
@@ -1,7 +1,7 @@
 /*
  * Server management functions.
  *
- * Copyright 2000-2007 Willy Tarreau <w@1wt.eu>
+ * Copyright 2000-2008 Willy Tarreau <w@1wt.eu>
  *
  * This program is free software; you can redistribute it and/or
  * modify it under the terms of the GNU General Public License
@@ -13,6 +13,7 @@
 #include <stdlib.h>
 
 #include <common/config.h>
+#include <common/debug.h>
 #include <common/memory.h>
 
 #include <types/backend.h>
@@ -40,6 +41,16 @@ void session_free(struct session *s)
 
        if (s->pend_pos)
                pendconn_free(s->pend_pos);
+       if (s->srv)  /* there may be requests left pending in queue */
+               process_srv_queue(s->srv);
+       if (unlikely(s->srv_conn)) {
+               /* the session still has a reserved slot on a server, but
+                * it should normally be only the same as the one above,
+                * so this should not happen in fact.
+                */
+               sess_change_server(s, NULL);
+       }
+
        if (s->req)
                pool_free2(pool2_buffer, s->req);
        if (s->rep)
@@ -135,6 +146,30 @@ void session_process_counters(struct session *s)
        }
 }
 
+/*
+ * This function adjusts sess->srv_conn and maintains the previous and new
+ * server's served session counts. Setting newsrv to NULL is enough to release
+ * current connection slot. This function also notifies any LB algo which might
+ * expect to be informed about any change in the number of active sessions on a
+ * server.
+ */
+void sess_change_server(struct session *sess, struct server *newsrv)
+{
+       if (sess->srv_conn == newsrv)
+               return;
+
+       if (sess->srv_conn) {
+               sess->srv_conn->served--;
+               sess->srv_conn = NULL;
+       }
+
+       if (newsrv) {
+               newsrv->served++;
+               sess->srv_conn = newsrv;
+       }
+}
+
+
 /*
  * Local variables:
  *  c-indent-level: 8