aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--auto/make57
-rw-r--r--auto/modules36
-rw-r--r--auto/options24
-rw-r--r--auto/sources34
-rw-r--r--src/core/ngx_log.c2
-rw-r--r--src/core/ngx_log.h3
-rw-r--r--src/stream/ngx_stream.c557
-rw-r--r--src/stream/ngx_stream.h215
-rw-r--r--src/stream/ngx_stream_core_module.c495
-rw-r--r--src/stream/ngx_stream_handler.c296
-rw-r--r--src/stream/ngx_stream_proxy_module.c1288
-rw-r--r--src/stream/ngx_stream_ssl_module.c456
-rw-r--r--src/stream/ngx_stream_ssl_module.h49
-rw-r--r--src/stream/ngx_stream_upstream.c462
-rw-r--r--src/stream/ngx_stream_upstream.h103
-rw-r--r--src/stream/ngx_stream_upstream_hash_module.c657
-rw-r--r--src/stream/ngx_stream_upstream_least_conn_module.c305
-rw-r--r--src/stream/ngx_stream_upstream_round_robin.c697
-rw-r--r--src/stream/ngx_stream_upstream_round_robin.h138
-rw-r--r--src/stream/ngx_stream_upstream_zone_module.c207
20 files changed, 6079 insertions, 2 deletions
diff --git a/auto/make b/auto/make
index ed94e8f62..7e3c4454c 100644
--- a/auto/make
+++ b/auto/make
@@ -10,6 +10,7 @@ mkdir -p $NGX_OBJS/src/core $NGX_OBJS/src/event $NGX_OBJS/src/event/modules \
$NGX_OBJS/src/http $NGX_OBJS/src/http/modules \
$NGX_OBJS/src/http/modules/perl \
$NGX_OBJS/src/mail \
+ $NGX_OBJS/src/stream \
$NGX_OBJS/src/misc
@@ -121,6 +122,32 @@ END
fi
+# the stream dependences and include paths
+
+if [ $STREAM = YES ]; then
+
+ ngx_all_srcs="$ngx_all_srcs $STREAM_SRCS"
+
+ ngx_deps=`echo $STREAM_DEPS \
+ | sed -e "s/ *\([^ ][^ ]*\)/$ngx_regex_cont\1/g" \
+ -e "s/\//$ngx_regex_dirsep/g"`
+
+ ngx_incs=`echo $STREAM_INCS \
+ | sed -e "s/ *\([^ ][^ ]*\)/$ngx_regex_cont$ngx_include_opt\1/g" \
+ -e "s/\//$ngx_regex_dirsep/g"`
+
+ cat << END >> $NGX_MAKEFILE
+
+STREAM_DEPS = $ngx_deps
+
+
+STREAM_INCS = $ngx_include_opt$ngx_incs
+
+END
+
+fi
+
+
ngx_all_srcs="$ngx_all_srcs $NGX_MISC_SRCS"
@@ -306,6 +333,36 @@ END
fi
+# the stream sources
+
+if [ $STREAM = YES ]; then
+
+ if test -n "$NGX_PCH"; then
+ ngx_cc="\$(CC) $ngx_compile_opt \$(CFLAGS) $ngx_use_pch \$(ALL_INCS)"
+ else
+ ngx_cc="\$(CC) $ngx_compile_opt \$(CFLAGS) \$(CORE_INCS) \$(STREAM_INCS)"
+ fi
+
+ for ngx_src in $STREAM_SRCS
+ do
+ ngx_src=`echo $ngx_src | sed -e "s/\//$ngx_regex_dirsep/g"`
+ ngx_obj=`echo $ngx_src \
+ | sed -e "s#^\(.*\.\)cpp\\$#$ngx_objs_dir\1$ngx_objext#g" \
+ -e "s#^\(.*\.\)cc\\$#$ngx_objs_dir\1$ngx_objext#g" \
+ -e "s#^\(.*\.\)c\\$#$ngx_objs_dir\1$ngx_objext#g" \
+ -e "s#^\(.*\.\)S\\$#$ngx_objs_dir\1$ngx_objext#g"`
+
+ cat << END >> $NGX_MAKEFILE
+
+$ngx_obj: \$(CORE_DEPS) \$(STREAM_DEPS)$ngx_cont$ngx_src
+ $ngx_cc$ngx_tab$ngx_objout$ngx_obj$ngx_tab$ngx_src$NGX_AUX
+
+END
+ done
+
+fi
+
+
# the misc sources
if test -n "$NGX_MISC_SRCS"; then
diff --git a/auto/modules b/auto/modules
index a029cdd5f..482cb714b 100644
--- a/auto/modules
+++ b/auto/modules
@@ -435,6 +435,12 @@ if [ $MAIL_SSL = YES ]; then
fi
+if [ $STREAM_SSL = YES ]; then
+ have=NGX_STREAM_SSL . auto/have
+ USE_OPENSSL=YES
+fi
+
+
modules="$CORE_MODULES $EVENT_MODULES"
@@ -505,6 +511,36 @@ if [ $MAIL = YES ]; then
fi
+if [ $STREAM = YES ]; then
+ have=NGX_STREAM . auto/have
+ modules="$modules $STREAM_MODULES"
+
+ if [ $STREAM_SSL = YES ]; then
+ modules="$modules $STREAM_SSL_MODULE"
+ STREAM_DEPS="$STREAM_DEPS $STREAM_SSL_DEPS"
+ STREAM_SRCS="$STREAM_SRCS $STREAM_SSL_SRCS"
+ fi
+
+ if [ $STREAM_UPSTREAM_HASH = YES ]; then
+ modules="$modules $STREAM_UPSTREAM_HASH_MODULE"
+ STREAM_SRCS="$STREAM_SRCS $STREAM_UPSTREAM_HASH_SRCS"
+ fi
+
+ if [ $STREAM_UPSTREAM_LEAST_CONN = YES ]; then
+ modules="$modules $STREAM_UPSTREAM_LEAST_CONN_MODULE"
+ STREAM_SRCS="$STREAM_SRCS $STREAM_UPSTREAM_LEAST_CONN_SRCS"
+ fi
+
+ if [ $STREAM_UPSTREAM_ZONE = YES ]; then
+ have=NGX_STREAM_UPSTREAM_ZONE . auto/have
+ modules="$modules $STREAM_UPSTREAM_ZONE_MODULE"
+ STREAM_SRCS="$STREAM_SRCS $STREAM_UPSTREAM_ZONE_SRCS"
+ fi
+
+ NGX_ADDON_DEPS="$NGX_ADDON_DEPS \$(STREAM_DEPS)"
+fi
+
+
if [ $NGX_GOOGLE_PERFTOOLS = YES ]; then
modules="$modules $NGX_GOOGLE_PERFTOOLS_MODULE"
NGX_MISC_SRCS="$NGX_MISC_SRCS $NGX_GOOGLE_PERFTOOLS_SRCS"
diff --git a/auto/options b/auto/options
index 7a3909a7d..62f7d1829 100644
--- a/auto/options
+++ b/auto/options
@@ -114,6 +114,12 @@ MAIL_POP3=YES
MAIL_IMAP=YES
MAIL_SMTP=YES
+STREAM=NO
+STREAM_SSL=NO
+STREAM_UPSTREAM_HASH=YES
+STREAM_UPSTREAM_LEAST_CONN=YES
+STREAM_UPSTREAM_ZONE=YES
+
NGX_ADDONS=
USE_PCRE=NO
@@ -275,6 +281,15 @@ use the \"--without-http_limit_conn_module\" option instead"
--without-mail_imap_module) MAIL_IMAP=NO ;;
--without-mail_smtp_module) MAIL_SMTP=NO ;;
+ --with-stream) STREAM=YES ;;
+ --with-stream_ssl_module) STREAM_SSL=YES ;;
+ --without-stream_upstream_hash_module)
+ STREAM_UPSTREAM_HASH=NO ;;
+ --without-stream_upstream_least_conn_module)
+ STREAM_UPSTREAM_LEAST_CONN=NO ;;
+ --without-stream_upstream_zone_module)
+ STREAM_UPSTREAM_ZONE=NO ;;
+
--with-google_perftools_module) NGX_GOOGLE_PERFTOOLS=YES ;;
--with-cpp_test_module) NGX_CPP_TEST=YES ;;
@@ -436,6 +451,15 @@ cat << END
--without-mail_imap_module disable ngx_mail_imap_module
--without-mail_smtp_module disable ngx_mail_smtp_module
+ --with-stream enable TCP proxy module
+ --with-stream_ssl_module enable ngx_stream_ssl_module
+ --without-stream_upstream_hash_module
+ disable ngx_stream_upstream_hash_module
+ --without-stream_upstream_least_conn_module
+ disable ngx_stream_upstream_least_conn_module
+ --without-stream_upstream_zone_module
+ disable ngx_stream_upstream_zone_module
+
--with-google_perftools_module enable ngx_google_perftools_module
--with-cpp_test_module enable ngx_cpp_test_module
diff --git a/auto/sources b/auto/sources
index 021a767b9..d824cb9ed 100644
--- a/auto/sources
+++ b/auto/sources
@@ -554,6 +554,40 @@ MAIL_AUTH_HTTP_SRCS="src/mail/ngx_mail_auth_http_module.c"
MAIL_PROXY_MODULE="ngx_mail_proxy_module"
MAIL_PROXY_SRCS="src/mail/ngx_mail_proxy_module.c"
+
+STREAM_INCS="src/stream"
+
+STREAM_DEPS="src/stream/ngx_stream.h \
+ src/stream/ngx_stream_upstream.h \
+ src/stream/ngx_stream_upstream_round_robin.h"
+
+STREAM_MODULES="ngx_stream_module \
+ ngx_stream_core_module \
+ ngx_stream_proxy_module \
+ ngx_stream_upstream_module"
+
+STREAM_SRCS="src/stream/ngx_stream.c \
+ src/stream/ngx_stream_handler.c \
+ src/stream/ngx_stream_core_module.c \
+ src/stream/ngx_stream_proxy_module.c \
+ src/stream/ngx_stream_upstream.c \
+ src/stream/ngx_stream_upstream_round_robin.c"
+
+STREAM_SSL_MODULE="ngx_stream_ssl_module"
+STREAM_SSL_DEPS="src/stream/ngx_stream_ssl_module.h"
+STREAM_SSL_SRCS="src/stream/ngx_stream_ssl_module.c"
+
+STREAM_UPSTREAM_HASH_MODULE=ngx_stream_upstream_hash_module
+STREAM_UPSTREAM_HASH_SRCS=src/stream/ngx_stream_upstream_hash_module.c
+
+STREAM_UPSTREAM_LEAST_CONN_MODULE=ngx_stream_upstream_least_conn_module
+STREAM_UPSTREAM_LEAST_CONN_SRCS=" \
+ src/stream/ngx_stream_upstream_least_conn_module.c"
+
+STREAM_UPSTREAM_ZONE_MODULE=ngx_stream_upstream_zone_module
+STREAM_UPSTREAM_ZONE_SRCS=src/stream/ngx_stream_upstream_zone_module.c
+
+
NGX_GOOGLE_PERFTOOLS_MODULE=ngx_google_perftools_module
NGX_GOOGLE_PERFTOOLS_SRCS=src/misc/ngx_google_perftools_module.c
diff --git a/src/core/ngx_log.c b/src/core/ngx_log.c
index bf0050885..2aea37440 100644
--- a/src/core/ngx_log.c
+++ b/src/core/ngx_log.c
@@ -86,7 +86,7 @@ static ngx_str_t err_levels[] = {
static const char *debug_levels[] = {
"debug_core", "debug_alloc", "debug_mutex", "debug_event",
- "debug_http", "debug_mail", "debug_mysql"
+ "debug_http", "debug_mail", "debug_mysql", "debug_stream"
};
diff --git a/src/core/ngx_log.h b/src/core/ngx_log.h
index 6b04b7876..cb80b5f83 100644
--- a/src/core/ngx_log.h
+++ b/src/core/ngx_log.h
@@ -30,6 +30,7 @@
#define NGX_LOG_DEBUG_HTTP 0x100
#define NGX_LOG_DEBUG_MAIL 0x200
#define NGX_LOG_DEBUG_MYSQL 0x400
+#define NGX_LOG_DEBUG_STREAM 0x800
/*
* do not forget to update debug_levels[] in src/core/ngx_log.c
@@ -37,7 +38,7 @@
*/
#define NGX_LOG_DEBUG_FIRST NGX_LOG_DEBUG_CORE
-#define NGX_LOG_DEBUG_LAST NGX_LOG_DEBUG_MYSQL
+#define NGX_LOG_DEBUG_LAST NGX_LOG_DEBUG_STREAM
#define NGX_LOG_DEBUG_CONNECTION 0x80000000
#define NGX_LOG_DEBUG_ALL 0x7ffffff0
diff --git a/src/stream/ngx_stream.c b/src/stream/ngx_stream.c
new file mode 100644
index 000000000..f982170c1
--- /dev/null
+++ b/src/stream/ngx_stream.c
@@ -0,0 +1,557 @@
+
+/*
+ * Copyright (C) Roman Arutyunyan
+ * Copyright (C) Nginx, Inc.
+ */
+
+
+#include <ngx_config.h>
+#include <ngx_core.h>
+#include <ngx_event.h>
+#include <ngx_stream.h>
+
+
+static char *ngx_stream_block(ngx_conf_t *cf, ngx_command_t *cmd, void *conf);
+static ngx_int_t ngx_stream_add_ports(ngx_conf_t *cf, ngx_array_t *ports,
+ ngx_stream_listen_t *listen);
+static char *ngx_stream_optimize_servers(ngx_conf_t *cf, ngx_array_t *ports);
+static ngx_int_t ngx_stream_add_addrs(ngx_conf_t *cf, ngx_stream_port_t *stport,
+ ngx_stream_conf_addr_t *addr);
+#if (NGX_HAVE_INET6)
+static ngx_int_t ngx_stream_add_addrs6(ngx_conf_t *cf,
+ ngx_stream_port_t *stport, ngx_stream_conf_addr_t *addr);
+#endif
+static ngx_int_t ngx_stream_cmp_conf_addrs(const void *one, const void *two);
+
+
+ngx_uint_t ngx_stream_max_module;
+
+
+static ngx_command_t ngx_stream_commands[] = {
+
+ { ngx_string("stream"),
+ NGX_MAIN_CONF|NGX_CONF_BLOCK|NGX_CONF_NOARGS,
+ ngx_stream_block,
+ 0,
+ 0,
+ NULL },
+
+ ngx_null_command
+};
+
+
+static ngx_core_module_t ngx_stream_module_ctx = {
+ ngx_string("stream"),
+ NULL,
+ NULL
+};
+
+
+ngx_module_t ngx_stream_module = {
+ NGX_MODULE_V1,
+ &ngx_stream_module_ctx, /* module context */
+ ngx_stream_commands, /* module directives */
+ NGX_CORE_MODULE, /* module type */
+ NULL, /* init master */
+ NULL, /* init module */
+ NULL, /* init process */
+ NULL, /* init thread */
+ NULL, /* exit thread */
+ NULL, /* exit process */
+ NULL, /* exit master */
+ NGX_MODULE_V1_PADDING
+};
+
+
+static char *
+ngx_stream_block(ngx_conf_t *cf, ngx_command_t *cmd, void *conf)
+{
+ char *rv;
+ ngx_uint_t i, m, mi, s;
+ ngx_conf_t pcf;
+ ngx_array_t ports;
+ ngx_stream_listen_t *listen;
+ ngx_stream_module_t *module;
+ ngx_stream_conf_ctx_t *ctx;
+ ngx_stream_core_srv_conf_t **cscfp;
+ ngx_stream_core_main_conf_t *cmcf;
+
+ /* the main stream context */
+
+ ctx = ngx_pcalloc(cf->pool, sizeof(ngx_stream_conf_ctx_t));
+ if (ctx == NULL) {
+ return NGX_CONF_ERROR;
+ }
+
+ *(ngx_stream_conf_ctx_t **) conf = ctx;
+
+ /* count the number of the stream modules and set up their indices */
+
+ ngx_stream_max_module = 0;
+ for (m = 0; ngx_modules[m]; m++) {
+ if (ngx_modules[m]->type != NGX_STREAM_MODULE) {
+ continue;
+ }
+
+ ngx_modules[m]->ctx_index = ngx_stream_max_module++;
+ }
+
+
+ /* the stream main_conf context, it's the same in the all stream contexts */
+
+ ctx->main_conf = ngx_pcalloc(cf->pool,
+ sizeof(void *) * ngx_stream_max_module);
+ if (ctx->main_conf == NULL) {
+ return NGX_CONF_ERROR;
+ }
+
+
+ /*
+ * the stream null srv_conf context, it is used to merge
+ * the server{}s' srv_conf's
+ */
+
+ ctx->srv_conf = ngx_pcalloc(cf->pool,
+ sizeof(void *) * ngx_stream_max_module);
+ if (ctx->srv_conf == NULL) {
+ return NGX_CONF_ERROR;
+ }
+
+
+ /*
+ * create the main_conf's and the null srv_conf's of the all stream modules
+ */
+
+ for (m = 0; ngx_modules[m]; m++) {
+ if (ngx_modules[m]->type != NGX_STREAM_MODULE) {
+ continue;
+ }
+
+ module = ngx_modules[m]->ctx;
+ mi = ngx_modules[m]->ctx_index;
+
+ if (module->create_main_conf) {
+ ctx->main_conf[mi] = module->create_main_conf(cf);
+ if (ctx->main_conf[mi] == NULL) {
+ return NGX_CONF_ERROR;
+ }
+ }
+
+ if (module->create_srv_conf) {
+ ctx->srv_conf[mi] = module->create_srv_conf(cf);
+ if (ctx->srv_conf[mi] == NULL) {
+ return NGX_CONF_ERROR;
+ }
+ }
+ }
+
+
+ /* parse inside the stream{} block */
+
+ pcf = *cf;
+ cf->ctx = ctx;
+
+ cf->module_type = NGX_STREAM_MODULE;
+ cf->cmd_type = NGX_STREAM_MAIN_CONF;
+ rv = ngx_conf_parse(cf, NULL);
+
+ if (rv != NGX_CONF_OK) {
+ *cf = pcf;
+ return rv;
+ }
+
+
+ /* init stream{} main_conf's, merge the server{}s' srv_conf's */
+
+ cmcf = ctx->main_conf[ngx_stream_core_module.ctx_index];
+ cscfp = cmcf->servers.elts;
+
+ for (m = 0; ngx_modules[m]; m++) {
+ if (ngx_modules[m]->type != NGX_STREAM_MODULE) {
+ continue;
+ }
+
+ module = ngx_modules[m]->ctx;
+ mi = ngx_modules[m]->ctx_index;
+
+ /* init stream{} main_conf's */
+
+ cf->ctx = ctx;
+
+ if (module->init_main_conf) {
+ rv = module->init_main_conf(cf, ctx->main_conf[mi]);
+ if (rv != NGX_CONF_OK) {
+ *cf = pcf;
+ return rv;
+ }
+ }
+
+ for (s = 0; s < cmcf->servers.nelts; s++) {
+
+ /* merge the server{}s' srv_conf's */
+
+ cf->ctx = cscfp[s]->ctx;
+
+ if (module->merge_srv_conf) {
+ rv = module->merge_srv_conf(cf,
+ ctx->srv_conf[mi],
+ cscfp[s]->ctx->srv_conf[mi]);
+ if (rv != NGX_CONF_OK) {
+ *cf = pcf;
+ return rv;
+ }
+ }
+ }
+ }
+
+ *cf = pcf;
+
+
+ if (ngx_array_init(&ports, cf->temp_pool, 4, sizeof(ngx_stream_conf_port_t))
+ != NGX_OK)
+ {
+ return NGX_CONF_ERROR;
+ }
+
+ listen = cmcf->listen.elts;
+
+ for (i = 0; i < cmcf->listen.nelts; i++) {
+ if (ngx_stream_add_ports(cf, &ports, &listen[i]) != NGX_OK) {
+ return NGX_CONF_ERROR;
+ }
+ }
+
+ return ngx_stream_optimize_servers(cf, &ports);
+}
+
+
+static ngx_int_t
+ngx_stream_add_ports(ngx_conf_t *cf, ngx_array_t *ports,
+ ngx_stream_listen_t *listen)
+{
+ in_port_t p;
+ ngx_uint_t i;
+ struct sockaddr *sa;
+ struct sockaddr_in *sin;
+ ngx_stream_conf_port_t *port;
+ ngx_stream_conf_addr_t *addr;
+#if (NGX_HAVE_INET6)
+ struct sockaddr_in6 *sin6;
+#endif
+
+ sa = (struct sockaddr *) &listen->sockaddr;
+
+ switch (sa->sa_family) {
+
+#if (NGX_HAVE_INET6)
+ case AF_INET6:
+ sin6 = (struct sockaddr_in6 *) sa;
+ p = sin6->sin6_port;
+ break;
+#endif
+
+#if (NGX_HAVE_UNIX_DOMAIN)
+ case AF_UNIX:
+ p = 0;
+ break;
+#endif
+
+ default: /* AF_INET */
+ sin = (struct sockaddr_in *) sa;
+ p = sin->sin_port;
+ break;
+ }
+
+ port = ports->elts;
+ for (i = 0; i < ports->nelts; i++) {
+ if (p == port[i].port && sa->sa_family == port[i].family) {
+
+ /* a port is already in the port list */
+
+ port = &port[i];
+ goto found;
+ }
+ }
+
+ /* add a port to the port list */
+
+ port = ngx_array_push(ports);
+ if (port == NULL) {
+ return NGX_ERROR;
+ }
+
+ port->family = sa->sa_family;
+ port->port = p;
+
+ if (ngx_array_init(&port->addrs, cf->temp_pool, 2,
+ sizeof(ngx_stream_conf_addr_t))
+ != NGX_OK)
+ {
+ return NGX_ERROR;
+ }
+
+found:
+
+ addr = ngx_array_push(&port->addrs);
+ if (addr == NULL) {
+ return NGX_ERROR;
+ }
+
+ addr->sockaddr = (struct sockaddr *) &listen->sockaddr;
+ addr->socklen = listen->socklen;
+ addr->ctx = listen->ctx;
+ addr->bind = listen->bind;
+ addr->wildcard = listen->wildcard;
+ addr->so_keepalive = listen->so_keepalive;
+#if (NGX_HAVE_KEEPALIVE_TUNABLE)
+ addr->tcp_keepidle = listen->tcp_keepidle;
+ addr->tcp_keepintvl = listen->tcp_keepintvl;
+ addr->tcp_keepcnt = listen->tcp_keepcnt;
+#endif
+#if (NGX_STREAM_SSL)
+ addr->ssl = listen->ssl;
+#endif
+#if (NGX_HAVE_INET6 && defined IPV6_V6ONLY)
+ addr->ipv6only = listen->ipv6only;
+#endif
+
+ return NGX_OK;
+}
+
+
+static char *
+ngx_stream_optimize_servers(ngx_conf_t *cf, ngx_array_t *ports)
+{
+ ngx_uint_t i, p, last, bind_wildcard;
+ ngx_listening_t *ls;
+ ngx_stream_port_t *stport;
+ ngx_stream_conf_port_t *port;
+ ngx_stream_conf_addr_t *addr;
+ ngx_stream_core_srv_conf_t *cscf;
+
+ port = ports->elts;
+ for (p = 0; p < ports->nelts; p++) {
+
+ ngx_sort(port[p].addrs.elts, (size_t) port[p].addrs.nelts,
+ sizeof(ngx_stream_conf_addr_t), ngx_stream_cmp_conf_addrs);
+
+ addr = port[p].addrs.elts;
+ last = port[p].addrs.nelts;
+
+ /*
+ * if there is the binding to the "*:port" then we need to bind()
+ * to the "*:port" only and ignore the other bindings
+ */
+
+ if (addr[last - 1].wildcard) {
+ addr[last - 1].bind = 1;
+ bind_wildcard = 1;
+
+ } else {
+ bind_wildcard = 0;
+ }
+
+ i = 0;
+
+ while (i < last) {
+
+ if (bind_wildcard && !addr[i].bind) {
+ i++;
+ continue;
+ }
+
+ ls = ngx_create_listening(cf, addr[i].sockaddr, addr[i].socklen);
+ if (ls == NULL) {
+ return NGX_CONF_ERROR;
+ }
+
+ ls->addr_ntop = 1;
+ ls->handler = ngx_stream_init_connection;
+ ls->pool_size = 256;
+
+ cscf = addr->ctx->srv_conf[ngx_stream_core_module.ctx_index];
+ ls->logp = cscf->error_log;
+
+ ls->log.data = &ls->addr_text;
+ ls->log.handler = ngx_accept_log_error;
+
+ ls->keepalive = addr[i].so_keepalive;
+#if (NGX_HAVE_KEEPALIVE_TUNABLE)
+ ls->keepidle = addr[i].tcp_keepidle;
+ ls->keepintvl = addr[i].tcp_keepintvl;
+ ls->keepcnt = addr[i].tcp_keepcnt;
+#endif
+
+#if (NGX_HAVE_INET6 && defined IPV6_V6ONLY)
+ ls->ipv6only = addr[i].ipv6only;
+#endif
+
+ stport = ngx_palloc(cf->pool, sizeof(ngx_stream_port_t));
+ if (stport == NULL) {
+ return NGX_CONF_ERROR;
+ }
+
+ ls->servers = stport;
+
+ if (i == last - 1) {
+ stport->naddrs = last;
+
+ } else {
+ stport->naddrs = 1;
+ i = 0;
+ }
+
+ switch (ls->sockaddr->sa_family) {
+#if (NGX_HAVE_INET6)
+ case AF_INET6:
+ if (ngx_stream_add_addrs6(cf, stport, addr) != NGX_OK) {
+ return NGX_CONF_ERROR;
+ }
+ break;
+#endif
+ default: /* AF_INET */
+ if (ngx_stream_add_addrs(cf, stport, addr) != NGX_OK) {
+ return NGX_CONF_ERROR;
+ }
+ break;
+ }
+
+ addr++;
+ last--;
+ }
+ }
+
+ return NGX_CONF_OK;
+}
+
+
+static ngx_int_t
+ngx_stream_add_addrs(ngx_conf_t *cf, ngx_stream_port_t *stport,
+ ngx_stream_conf_addr_t *addr)
+{
+ u_char *p;
+ size_t len;
+ ngx_uint_t i;
+ struct sockaddr_in *sin;
+ ngx_stream_in_addr_t *addrs;
+ u_char buf[NGX_SOCKADDR_STRLEN];
+
+ stport->addrs = ngx_pcalloc(cf->pool,
+ stport->naddrs * sizeof(ngx_stream_in_addr_t));
+ if (stport->addrs == NULL) {
+ return NGX_ERROR;
+ }
+
+ addrs = stport->addrs;
+
+ for (i = 0; i < stport->naddrs; i++) {
+
+ sin = (struct sockaddr_in *) addr[i].sockaddr;
+ addrs[i].addr = sin->sin_addr.s_addr;
+
+ addrs[i].conf.ctx = addr[i].ctx;
+#if (NGX_STREAM_SSL)
+ addrs[i].conf.ssl = addr[i].ssl;
+#endif
+
+ len = ngx_sock_ntop(addr[i].sockaddr, addr[i].socklen, buf,
+ NGX_SOCKADDR_STRLEN, 1);
+
+ p = ngx_pnalloc(cf->pool, len);
+ if (p == NULL) {
+ return NGX_ERROR;
+ }
+
+ ngx_memcpy(p, buf, len);
+
+ addrs[i].conf.addr_text.len = len;
+ addrs[i].conf.addr_text.data = p;
+ }
+
+ return NGX_OK;
+}
+
+
+#if (NGX_HAVE_INET6)
+
+static ngx_int_t
+ngx_stream_add_addrs6(ngx_conf_t *cf, ngx_stream_port_t *stport,
+ ngx_stream_conf_addr_t *addr)
+{
+ u_char *p;
+ size_t len;
+ ngx_uint_t i;
+ struct sockaddr_in6 *sin6;
+ ngx_stream_in6_addr_t *addrs6;
+ u_char buf[NGX_SOCKADDR_STRLEN];
+
+ stport->addrs = ngx_pcalloc(cf->pool,
+ stport->naddrs * sizeof(ngx_stream_in6_addr_t));
+ if (stport->addrs == NULL) {
+ return NGX_ERROR;
+ }
+
+ addrs6 = stport->addrs;
+
+ for (i = 0; i < stport->naddrs; i++) {
+
+ sin6 = (struct sockaddr_in6 *) addr[i].sockaddr;
+ addrs6[i].addr6 = sin6->sin6_addr;
+
+ addrs6[i].conf.ctx = addr[i].ctx;
+#if (NGX_STREAM_SSL)
+ addrs6[i].conf.ssl = addr[i].ssl;
+#endif
+
+ len = ngx_sock_ntop(addr[i].sockaddr, addr[i].socklen, buf,
+ NGX_SOCKADDR_STRLEN, 1);
+
+ p = ngx_pnalloc(cf->pool, len);
+ if (p == NULL) {
+ return NGX_ERROR;
+ }
+
+ ngx_memcpy(p, buf, len);
+
+ addrs6[i].conf.addr_text.len = len;
+ addrs6[i].conf.addr_text.data = p;
+ }
+
+ return NGX_OK;
+}
+
+#endif
+
+
+static ngx_int_t
+ngx_stream_cmp_conf_addrs(const void *one, const void *two)
+{
+ ngx_stream_conf_addr_t *first, *second;
+
+ first = (ngx_stream_conf_addr_t *) one;
+ second = (ngx_stream_conf_addr_t *) two;
+
+ if (first->wildcard) {
+ /* a wildcard must be the last resort, shift it to the end */
+ return 1;
+ }
+
+ if (second->wildcard) {
+ /* a wildcard must be the last resort, shift it to the end */
+ return -1;
+ }
+
+ if (first->bind && !second->bind) {
+ /* shift explicit bind()ed addresses to the start */
+ return -1;
+ }
+
+ if (!first->bind && second->bind) {
+ /* shift explicit bind()ed addresses to the start */
+ return 1;
+ }
+
+ /* do not sort by default */
+
+ return 0;
+}
diff --git a/src/stream/ngx_stream.h b/src/stream/ngx_stream.h
new file mode 100644
index 000000000..6ac1fd586
--- /dev/null
+++ b/src/stream/ngx_stream.h
@@ -0,0 +1,215 @@
+
+/*
+ * Copyright (C) Roman Arutyunyan
+ * Copyright (C) Nginx, Inc.
+ */
+
+
+#ifndef _NGX_STREAM_H_INCLUDED_
+#define _NGX_STREAM_H_INCLUDED_
+
+
+#include <ngx_config.h>
+#include <ngx_core.h>
+
+#if (NGX_STREAM_SSL)
+#include <ngx_stream_ssl_module.h>
+#endif
+
+
+typedef struct ngx_stream_session_s ngx_stream_session_t;
+
+
+#include <ngx_stream_upstream.h>
+#include <ngx_stream_upstream_round_robin.h>
+
+
+typedef struct {
+ void **main_conf;
+ void **srv_conf;
+} ngx_stream_conf_ctx_t;
+
+
+typedef struct {
+ u_char sockaddr[NGX_SOCKADDRLEN];
+ socklen_t socklen;
+
+ /* server ctx */
+ ngx_stream_conf_ctx_t *ctx;
+
+ unsigned bind:1;
+ unsigned wildcard:1;
+#if (NGX_STREAM_SSL)
+ unsigned ssl:1;
+#endif
+#if (NGX_HAVE_INET6 && defined IPV6_V6ONLY)
+ unsigned ipv6only:1;
+#endif
+ unsigned so_keepalive:2;
+#if (NGX_HAVE_KEEPALIVE_TUNABLE)
+ int tcp_keepidle;
+ int tcp_keepintvl;
+ int tcp_keepcnt;
+#endif
+} ngx_stream_listen_t;
+
+
+typedef struct {
+ ngx_stream_conf_ctx_t *ctx;
+ ngx_str_t addr_text;
+#if (NGX_STREAM_SSL)
+ ngx_uint_t ssl; /* unsigned ssl:1; */
+#endif
+} ngx_stream_addr_conf_t;
+
+typedef struct {
+ in_addr_t addr;
+ ngx_stream_addr_conf_t conf;
+} ngx_stream_in_addr_t;
+
+
+#if (NGX_HAVE_INET6)
+
+typedef struct {
+ struct in6_addr addr6;
+ ngx_stream_addr_conf_t conf;
+} ngx_stream_in6_addr_t;
+
+#endif
+
+
+typedef struct {
+ /* ngx_stream_in_addr_t or ngx_stream_in6_addr_t */
+ void *addrs;
+ ngx_uint_t naddrs;
+} ngx_stream_port_t;
+
+
+typedef struct {
+ int family;
+ in_port_t port;
+ ngx_array_t addrs; /* array of ngx_stream_conf_addr_t */
+} ngx_stream_conf_port_t;
+
+
+typedef struct {
+ struct sockaddr *sockaddr;
+ socklen_t socklen;
+
+ ngx_stream_conf_ctx_t *ctx;
+
+ unsigned bind:1;
+ unsigned wildcard:1;
+#if (NGX_STREAM_SSL)
+ unsigned ssl:1;
+#endif
+#if (NGX_HAVE_INET6 && defined IPV6_V6ONLY)
+ unsigned ipv6only:1;
+#endif
+ unsigned so_keepalive:2;
+#if (NGX_HAVE_KEEPALIVE_TUNABLE)
+ int tcp_keepidle;
+ int tcp_keepintvl;
+ int tcp_keepcnt;
+#endif
+} ngx_stream_conf_addr_t;
+
+
+typedef struct {
+ ngx_array_t servers; /* ngx_stream_core_srv_conf_t */
+ ngx_array_t listen; /* ngx_stream_listen_t */
+} ngx_stream_core_main_conf_t;
+
+
+typedef void (*ngx_stream_handler_pt)(ngx_stream_session_t *s);
+
+
+typedef struct {
+ ngx_stream_handler_pt handler;
+ ngx_stream_conf_ctx_t *ctx;
+ u_char *file_name;
+ ngx_int_t line;
+ ngx_log_t *error_log;
+} ngx_stream_core_srv_conf_t;
+
+
+struct ngx_stream_session_s {
+ uint32_t signature; /* "STRM" */
+
+ ngx_connection_t *connection;
+
+ off_t received;
+
+ ngx_log_handler_pt log_handler;
+
+ void **ctx;
+ void **main_conf;
+ void **srv_conf;
+
+ ngx_stream_upstream_t *upstream;
+};
+
+
+typedef struct {
+ void *(*create_main_conf)(ngx_conf_t *cf);
+ char *(*init_main_conf)(ngx_conf_t *cf, void *conf);
+
+ void *(*create_srv_conf)(ngx_conf_t *cf);
+ char *(*merge_srv_conf)(ngx_conf_t *cf, void *prev,
+ void *conf);
+} ngx_stream_module_t;
+
+
+#define NGX_STREAM_MODULE 0x4d525453 /* "STRM" */
+
+#define NGX_STREAM_MAIN_CONF 0x02000000
+#define NGX_STREAM_SRV_CONF 0x04000000
+#define NGX_STREAM_UPS_CONF 0x08000000
+
+
+#define NGX_STREAM_MAIN_CONF_OFFSET offsetof(ngx_stream_conf_ctx_t, main_conf)
+#define NGX_STREAM_SRV_CONF_OFFSET offsetof(ngx_stream_conf_ctx_t, srv_conf)
+
+
+#define ngx_stream_get_module_ctx(s, module) (s)->ctx[module.ctx_index]
+#define ngx_stream_set_ctx(s, c, module) s->ctx[module.ctx_index] = c;
+#define ngx_stream_delete_ctx(s, module) s->ctx[module.ctx_index] = NULL;
+
+
+#define ngx_stream_get_module_main_conf(s, module) \
+ (s)->main_conf[module.ctx_index]
+#define ngx_stream_get_module_srv_conf(s, module) \
+ (s)->srv_conf[module.ctx_index]
+
+#define ngx_stream_conf_get_module_main_conf(cf, module) \
+ ((ngx_stream_conf_ctx_t *) cf->ctx)->main_conf[module.ctx_index]
+#define ngx_stream_conf_get_module_srv_conf(cf, module) \
+ ((ngx_stream_conf_ctx_t *) cf->ctx)->srv_conf[module.ctx_index]
+
+#define ngx_stream_cycle_get_module_main_conf(cycle, module) \
+ (cycle->conf_ctx[ngx_stream_module.index] ? \
+ ((ngx_stream_conf_ctx_t *) cycle->conf_ctx[ngx_stream_module.index]) \
+ ->main_conf[module.ctx_index]: \
+ NULL)
+
+#define ngx_stream_set_connection_log(c, l) \
+ \
+ c->log->file = l->file; \
+ c->log->next = l->next; \
+ c->log->writer = l->writer; \
+ c->log->wdata = l->wdata; \
+ if (!(c->log->log_level & NGX_LOG_DEBUG_CONNECTION)) { \
+ c->log->log_level = l->log_level; \
+ }
+
+
+void ngx_stream_init_connection(ngx_connection_t *c);
+void ngx_stream_close_connection(ngx_connection_t *c);
+
+
+extern ngx_module_t ngx_stream_module;
+extern ngx_uint_t ngx_stream_max_module;
+extern ngx_module_t ngx_stream_core_module;
+
+
+#endif /* _NGX_STREAM_H_INCLUDED_ */
diff --git a/src/stream/ngx_stream_core_module.c b/src/stream/ngx_stream_core_module.c
new file mode 100644
index 000000000..c0df412a5
--- /dev/null
+++ b/src/stream/ngx_stream_core_module.c
@@ -0,0 +1,495 @@
+
+/*
+ * Copyright (C) Roman Arutyunyan
+ * Copyright (C) Nginx, Inc.
+ */
+
+
+#include <ngx_config.h>
+#include <ngx_core.h>
+#include <ngx_stream.h>
+
+
+static void *ngx_stream_core_create_main_conf(ngx_conf_t *cf);
+static void *ngx_stream_core_create_srv_conf(ngx_conf_t *cf);
+static char *ngx_stream_core_merge_srv_conf(ngx_conf_t *cf, void *parent,
+ void *child);
+static char *ngx_stream_core_error_log(ngx_conf_t *cf, ngx_command_t *cmd,
+ void *conf);
+static char *ngx_stream_core_server(ngx_conf_t *cf, ngx_command_t *cmd,
+ void *conf);
+static char *ngx_stream_core_listen(ngx_conf_t *cf, ngx_command_t *cmd,
+ void *conf);
+
+
+static ngx_command_t ngx_stream_core_commands[] = {
+
+ { ngx_string("server"),
+ NGX_STREAM_MAIN_CONF|NGX_CONF_BLOCK|NGX_CONF_NOARGS,
+ ngx_stream_core_server,
+ 0,
+ 0,
+ NULL },
+
+ { ngx_string("listen"),
+ NGX_STREAM_SRV_CONF|NGX_CONF_1MORE,
+ ngx_stream_core_listen,
+ NGX_STREAM_SRV_CONF_OFFSET,
+ 0,
+ NULL },
+
+ { ngx_string("error_log"),
+ NGX_STREAM_MAIN_CONF|NGX_STREAM_SRV_CONF|NGX_CONF_1MORE,
+ ngx_stream_core_error_log,
+ NGX_STREAM_SRV_CONF_OFFSET,
+ 0,
+ NULL },
+
+ ngx_null_command
+};
+
+
+static ngx_stream_module_t ngx_stream_core_module_ctx = {
+ ngx_stream_core_create_main_conf, /* create main configuration */
+ NULL, /* init main configuration */
+
+ ngx_stream_core_create_srv_conf, /* create server configuration */
+ ngx_stream_core_merge_srv_conf /* merge server configuration */
+};
+
+
+ngx_module_t ngx_stream_core_module = {
+ NGX_MODULE_V1,
+ &ngx_stream_core_module_ctx, /* module context */
+ ngx_stream_core_commands, /* module directives */
+ NGX_STREAM_MODULE, /* module type */
+ NULL, /* init master */
+ NULL, /* init module */
+ NULL, /* init process */
+ NULL, /* init thread */
+ NULL, /* exit thread */
+ NULL, /* exit process */
+ NULL, /* exit master */
+ NGX_MODULE_V1_PADDING
+};
+
+
+static void *
+ngx_stream_core_create_main_conf(ngx_conf_t *cf)
+{
+ ngx_stream_core_main_conf_t *cmcf;
+
+ cmcf = ngx_pcalloc(cf->pool, sizeof(ngx_stream_core_main_conf_t));
+ if (cmcf == NULL) {
+ return NULL;
+ }
+
+ if (ngx_array_init(&cmcf->servers, cf->pool, 4,
+ sizeof(ngx_stream_core_srv_conf_t *))
+ != NGX_OK)
+ {
+ return NULL;
+ }
+
+ if (ngx_array_init(&cmcf->listen, cf->pool, 4, sizeof(ngx_stream_listen_t))
+ != NGX_OK)
+ {
+ return NULL;
+ }
+
+ return cmcf;
+}
+
+
+static void *
+ngx_stream_core_create_srv_conf(ngx_conf_t *cf)
+{
+ ngx_stream_core_srv_conf_t *cscf;
+
+ cscf = ngx_pcalloc(cf->pool, sizeof(ngx_stream_core_srv_conf_t));
+ if (cscf == NULL) {
+ return NULL;
+ }
+
+ /*
+ * set by ngx_pcalloc():
+ *
+ * cscf->handler = NULL;
+ * cscf->error_log = NULL;
+ */
+
+ cscf->file_name = cf->conf_file->file.name.data;
+ cscf->line = cf->conf_file->line;
+
+ return cscf;
+}
+
+
+static char *
+ngx_stream_core_merge_srv_conf(ngx_conf_t *cf, void *parent, void *child)
+{
+ ngx_stream_core_srv_conf_t *prev = parent;
+ ngx_stream_core_srv_conf_t *conf = child;
+
+ if (conf->handler == NULL) {
+ ngx_log_error(NGX_LOG_EMERG, cf->log, 0,
+ "no handler for server in %s:%ui",
+ conf->file_name, conf->line);
+ return NGX_CONF_ERROR;
+ }
+
+ if (conf->error_log == NULL) {
+ if (prev->error_log) {
+ conf->error_log = prev->error_log;
+ } else {
+ conf->error_log = &cf->cycle->new_log;
+ }
+ }
+
+ return NGX_CONF_OK;
+}
+
+
+static char *
+ngx_stream_core_error_log(ngx_conf_t *cf, ngx_command_t *cmd, void *conf)
+{
+ ngx_stream_core_srv_conf_t *cscf = conf;
+
+ return ngx_log_set_log(cf, &cscf->error_log);
+}
+
+
+static char *
+ngx_stream_core_server(ngx_conf_t *cf, ngx_command_t *cmd, void *conf)
+{
+ char *rv;
+ void *mconf;
+ ngx_uint_t m;
+ ngx_conf_t pcf;
+ ngx_stream_module_t *module;
+ ngx_stream_conf_ctx_t *ctx, *stream_ctx;
+ ngx_stream_core_srv_conf_t *cscf, **cscfp;
+ ngx_stream_core_main_conf_t *cmcf;
+
+ ctx = ngx_pcalloc(cf->pool, sizeof(ngx_stream_conf_ctx_t));
+ if (ctx == NULL) {
+ return NGX_CONF_ERROR;
+ }
+
+ stream_ctx = cf->ctx;
+ ctx->main_conf = stream_ctx->main_conf;
+
+ /* the server{}'s srv_conf */
+
+ ctx->srv_conf = ngx_pcalloc(cf->pool,
+ sizeof(void *) * ngx_stream_max_module);
+ if (ctx->srv_conf == NULL) {
+ return NGX_CONF_ERROR;
+ }
+
+ for (m = 0; ngx_modules[m]; m++) {
+ if (ngx_modules[m]->type != NGX_STREAM_MODULE) {
+ continue;
+ }
+
+ module = ngx_modules[m]->ctx;
+
+ if (module->create_srv_conf) {
+ mconf = module->create_srv_conf(cf);
+ if (mconf == NULL) {
+ return NGX_CONF_ERROR;
+ }
+
+ ctx->srv_conf[ngx_modules[m]->ctx_index] = mconf;
+ }
+ }
+
+ /* the server configuration context */
+
+ cscf = ctx->srv_conf[ngx_stream_core_module.ctx_index];
+ cscf->ctx = ctx;
+
+ cmcf = ctx->main_conf[ngx_stream_core_module.ctx_index];
+
+ cscfp = ngx_array_push(&cmcf->servers);
+ if (cscfp == NULL) {
+ return NGX_CONF_ERROR;
+ }
+
+ *cscfp = cscf;
+
+
+ /* parse inside server{} */
+
+ pcf = *cf;
+ cf->ctx = ctx;
+ cf->cmd_type = NGX_STREAM_SRV_CONF;
+
+ rv = ngx_conf_parse(cf, NULL);
+
+ *cf = pcf;
+
+ return rv;
+}
+
+
+static char *
+ngx_stream_core_listen(ngx_conf_t *cf, ngx_command_t *cmd, void *conf)
+{
+ size_t len, off;
+ in_port_t port;
+ ngx_str_t *value;
+ ngx_url_t u;
+ ngx_uint_t i;
+ struct sockaddr *sa;
+ struct sockaddr_in *sin;
+ ngx_stream_listen_t *ls;
+ ngx_stream_core_main_conf_t *cmcf;
+#if (NGX_HAVE_INET6)
+ struct sockaddr_in6 *sin6;
+#endif
+
+ value = cf->args->elts;
+
+ ngx_memzero(&u, sizeof(ngx_url_t));
+
+ u.url = value[1];
+ u.listen = 1;
+
+ if (ngx_parse_url(cf->pool, &u) != NGX_OK) {
+ if (u.err) {
+ ngx_conf_log_error(NGX_LOG_EMERG, cf, 0,
+ "%s in \"%V\" of the \"listen\" directive",
+ u.err, &u.url);
+ }
+
+ return NGX_CONF_ERROR;
+ }
+
+ cmcf = ngx_stream_conf_get_module_main_conf(cf, ngx_stream_core_module);
+
+ ls = cmcf->listen.elts;
+
+ for (i = 0; i < cmcf->listen.nelts; i++) {
+
+ sa = (struct sockaddr *) ls[i].sockaddr;
+
+ if (sa->sa_family != u.family) {
+ continue;
+ }
+
+ switch (sa->sa_family) {
+
+#if (NGX_HAVE_INET6)
+ case AF_INET6:
+ off = offsetof(struct sockaddr_in6, sin6_addr);
+ len = 16;
+ sin6 = (struct sockaddr_in6 *) sa;
+ port = sin6->sin6_port;
+ break;
+#endif
+
+#if (NGX_HAVE_UNIX_DOMAIN)
+ case AF_UNIX:
+ off = offsetof(struct sockaddr_un, sun_path);
+ len = sizeof(((struct sockaddr_un *) sa)->sun_path);
+ port = 0;
+ break;
+#endif
+
+ default: /* AF_INET */
+ off = offsetof(struct sockaddr_in, sin_addr);
+ len = 4;
+ sin = (struct sockaddr_in *) sa;
+ port = sin->sin_port;
+ break;
+ }
+
+ if (ngx_memcmp(ls[i].sockaddr + off, u.sockaddr + off, len) != 0) {
+ continue;
+ }
+
+ if (port != u.port) {
+ continue;
+ }
+
+ ngx_conf_log_error(NGX_LOG_EMERG, cf, 0,
+ "duplicate \"%V\" address and port pair", &u.url);
+ return NGX_CONF_ERROR;
+ }
+
+ ls = ngx_array_push(&cmcf->listen);
+ if (ls == NULL) {
+ return NGX_CONF_ERROR;
+ }
+
+ ngx_memzero(ls, sizeof(ngx_stream_listen_t));
+
+ ngx_memcpy(ls->sockaddr, u.sockaddr, u.socklen);
+
+ ls->socklen = u.socklen;
+ ls->wildcard = u.wildcard;
+ ls->ctx = cf->ctx;
+
+#if (NGX_HAVE_INET6 && defined IPV6_V6ONLY)
+ ls->ipv6only = 1;
+#endif
+
+ for (i = 2; i < cf->args->nelts; i++) {
+
+ if (ngx_strcmp(value[i].data, "bind") == 0) {
+ ls->bind = 1;
+ continue;
+ }
+
+ if (ngx_strncmp(value[i].data, "ipv6only=o", 10) == 0) {
+#if (NGX_HAVE_INET6 && defined IPV6_V6ONLY)
+ struct sockaddr *sa;
+ u_char buf[NGX_SOCKADDR_STRLEN];
+
+ sa = (struct sockaddr *) ls->sockaddr;
+
+ if (sa->sa_family == AF_INET6) {
+
+ if (ngx_strcmp(&value[i].data[10], "n") == 0) {
+ ls->ipv6only = 1;
+
+ } else if (ngx_strcmp(&value[i].data[10], "ff") == 0) {
+ ls->ipv6only = 0;
+
+ } else {
+ ngx_conf_log_error(NGX_LOG_EMERG, cf, 0,
+ "invalid ipv6only flags \"%s\"",
+ &value[i].data[9]);
+ return NGX_CONF_ERROR;
+ }
+
+ ls->bind = 1;
+
+ } else {
+ len = ngx_sock_ntop(sa, ls->socklen, buf,
+ NGX_SOCKADDR_STRLEN, 1);
+
+ ngx_conf_log_error(NGX_LOG_EMERG, cf, 0,
+ "ipv6only is not supported "
+ "on addr \"%*s\", ignored", len, buf);
+ }
+
+ continue;
+#else
+ ngx_conf_log_error(NGX_LOG_EMERG, cf, 0,
+ "bind ipv6only is not supported "
+ "on this platform");
+ return NGX_CONF_ERROR;
+#endif
+ }
+
+ if (ngx_strcmp(value[i].data, "ssl") == 0) {
+#if (NGX_STREAM_SSL)
+ ls->ssl = 1;
+ continue;
+#else
+ ngx_conf_log_error(NGX_LOG_EMERG, cf, 0,
+ "the \"ssl\" parameter requires "
+ "ngx_stream_ssl_module");
+ return NGX_CONF_ERROR;
+#endif
+ }
+
+ if (ngx_strncmp(value[i].data, "so_keepalive=", 13) == 0) {
+
+ if (ngx_strcmp(&value[i].data[13], "on") == 0) {
+ ls->so_keepalive = 1;
+
+ } else if (ngx_strcmp(&value[i].data[13], "off") == 0) {
+ ls->so_keepalive = 2;
+
+ } else {
+
+#if (NGX_HAVE_KEEPALIVE_TUNABLE)
+ u_char *p, *end;
+ ngx_str_t s;
+
+ end = value[i].data + value[i].len;
+ s.data = value[i].data + 13;
+
+ p = ngx_strlchr(s.data, end, ':');
+ if (p == NULL) {
+ p = end;
+ }
+
+ if (p > s.data) {
+ s.len = p - s.data;
+
+ ls->tcp_keepidle = ngx_parse_time(&s, 1);
+ if (ls->tcp_keepidle == (time_t) NGX_ERROR) {
+ goto invalid_so_keepalive;
+ }
+ }
+
+ s.data = (p < end) ? (p + 1) : end;
+
+ p = ngx_strlchr(s.data, end, ':');
+ if (p == NULL) {
+ p = end;
+ }
+
+ if (p > s.data) {
+ s.len = p - s.data;
+
+ ls->tcp_keepintvl = ngx_parse_time(&s, 1);
+ if (ls->tcp_keepintvl == (time_t) NGX_ERROR) {
+ goto invalid_so_keepalive;
+ }
+ }
+
+ s.data = (p < end) ? (p + 1) : end;
+
+ if (s.data < end) {
+ s.len = end - s.data;
+
+ ls->tcp_keepcnt = ngx_atoi(s.data, s.len);
+ if (ls->tcp_keepcnt == NGX_ERROR) {
+ goto invalid_so_keepalive;
+ }
+ }
+
+ if (ls->tcp_keepidle == 0 && ls->tcp_keepintvl == 0
+ && ls->tcp_keepcnt == 0)
+ {
+ goto invalid_so_keepalive;
+ }
+
+ ls->so_keepalive = 1;
+
+#else
+
+ ngx_conf_log_error(NGX_LOG_EMERG, cf, 0,
+ "the \"so_keepalive\" parameter accepts "
+ "only \"on\" or \"off\" on this platform");
+ return NGX_CONF_ERROR;
+
+#endif
+ }
+
+ ls->bind = 1;
+
+ continue;
+
+#if (NGX_HAVE_KEEPALIVE_TUNABLE)
+ invalid_so_keepalive:
+
+ ngx_conf_log_error(NGX_LOG_EMERG, cf, 0,
+ "invalid so_keepalive value: \"%s\"",
+ &value[i].data[13]);
+ return NGX_CONF_ERROR;
+#endif
+ }
+
+ ngx_conf_log_error(NGX_LOG_EMERG, cf, 0,
+ "the invalid \"%V\" parameter", &value[i]);
+ return NGX_CONF_ERROR;
+ }
+
+ return NGX_CONF_OK;
+}
diff --git a/src/stream/ngx_stream_handler.c b/src/stream/ngx_stream_handler.c
new file mode 100644
index 000000000..c05e905c4
--- /dev/null
+++ b/src/stream/ngx_stream_handler.c
@@ -0,0 +1,296 @@
+
+/*
+ * Copyright (C) Roman Arutyunyan
+ * Copyright (C) Nginx, Inc.
+ */
+
+
+#include <ngx_config.h>
+#include <ngx_core.h>
+#include <ngx_event.h>
+#include <ngx_stream.h>
+
+
+static u_char *ngx_stream_log_error(ngx_log_t *log, u_char *buf, size_t len);
+static void ngx_stream_init_session(ngx_connection_t *c);
+
+#if (NGX_STREAM_SSL)
+static void ngx_stream_ssl_init_connection(ngx_ssl_t *ssl, ngx_connection_t *c);
+static void ngx_stream_ssl_handshake_handler(ngx_connection_t *c);
+#endif
+
+
+void
+ngx_stream_init_connection(ngx_connection_t *c)
+{
+ u_char text[NGX_SOCKADDR_STRLEN];
+ size_t len;
+ ngx_uint_t i;
+ struct sockaddr *sa;
+ ngx_stream_port_t *port;
+ struct sockaddr_in *sin;
+ ngx_stream_in_addr_t *addr;
+ ngx_stream_session_t *s;
+ ngx_stream_addr_conf_t *addr_conf;
+#if (NGX_HAVE_INET6)
+ struct sockaddr_in6 *sin6;
+ ngx_stream_in6_addr_t *addr6;
+#endif
+ ngx_stream_core_srv_conf_t *cscf;
+
+ /* find the server configuration for the address:port */
+
+ port = c->listening->servers;
+
+ if (port->naddrs > 1) {
+
+ /*
+ * There are several addresses on this port and one of them
+ * is the "*:port" wildcard so getsockname() is needed to determine
+ * the server address.
+ *
+ * AcceptEx() already gave this address.
+ */
+
+ if (ngx_connection_local_sockaddr(c, NULL, 0) != NGX_OK) {
+ ngx_stream_close_connection(c);
+ return;
+ }
+
+ sa = c->local_sockaddr;
+
+ switch (sa->sa_family) {
+
+#if (NGX_HAVE_INET6)
+ case AF_INET6:
+ sin6 = (struct sockaddr_in6 *) sa;
+
+ addr6 = port->addrs;
+
+ /* the last address is "*" */
+
+ for (i = 0; i < port->naddrs - 1; i++) {
+ if (ngx_memcmp(&addr6[i].addr6, &sin6->sin6_addr, 16) == 0) {
+ break;
+ }
+ }
+
+ addr_conf = &addr6[i].conf;
+
+ break;
+#endif
+
+ default: /* AF_INET */
+ sin = (struct sockaddr_in *) sa;
+
+ addr = port->addrs;
+
+ /* the last address is "*" */
+
+ for (i = 0; i < port->naddrs - 1; i++) {
+ if (addr[i].addr == sin->sin_addr.s_addr) {
+ break;
+ }
+ }
+
+ addr_conf = &addr[i].conf;
+
+ break;
+ }
+
+ } else {
+ switch (c->local_sockaddr->sa_family) {
+
+#if (NGX_HAVE_INET6)
+ case AF_INET6:
+ addr6 = port->addrs;
+ addr_conf = &addr6[0].conf;
+ break;
+#endif
+
+ default: /* AF_INET */
+ addr = port->addrs;
+ addr_conf = &addr[0].conf;
+ break;
+ }
+ }
+
+ s = ngx_pcalloc(c->pool, sizeof(ngx_stream_session_t));
+ if (s == NULL) {
+ ngx_stream_close_connection(c);
+ return;
+ }
+
+ s->signature = NGX_STREAM_MODULE;
+ s->main_conf = addr_conf->ctx->main_conf;
+ s->srv_conf = addr_conf->ctx->srv_conf;
+
+ s->connection = c;
+ c->data = s;
+
+ cscf = ngx_stream_get_module_srv_conf(s, ngx_stream_core_module);
+
+ ngx_stream_set_connection_log(c, cscf->error_log);
+
+ len = ngx_sock_ntop(c->sockaddr, c->socklen, text, NGX_SOCKADDR_STRLEN, 1);
+
+ ngx_log_error(NGX_LOG_INFO, c->log, 0, "*%uA client %*s connected to %V",
+ c->number, len, text, &addr_conf->addr_text);
+
+ c->log->connection = c->number;
+ c->log->handler = ngx_stream_log_error;
+ c->log->data = s;
+ c->log->action = "initializing connection";
+ c->log_error = NGX_ERROR_INFO;
+
+#if (NGX_STREAM_SSL)
+ {
+ ngx_stream_ssl_conf_t *sslcf;
+
+ sslcf = ngx_stream_get_module_srv_conf(s, ngx_stream_ssl_module);
+
+ if (addr_conf->ssl) {
+ c->log->action = "SSL handshaking";
+
+ if (sslcf->ssl.ctx == NULL) {
+ ngx_log_error(NGX_LOG_ERR, c->log, 0,
+ "no \"ssl_certificate\" is defined "
+ "in server listening on SSL port");
+ ngx_stream_close_connection(c);
+ return;
+ }
+
+ ngx_stream_ssl_init_connection(&sslcf->ssl, c);
+ return;
+ }
+ }
+#endif
+
+ ngx_stream_init_session(c);
+}
+
+
+static void
+ngx_stream_init_session(ngx_connection_t *c)
+{
+ ngx_stream_session_t *s;
+ ngx_stream_core_srv_conf_t *cscf;
+
+ s = c->data;
+ c->log->action = "handling client connection";
+
+ cscf = ngx_stream_get_module_srv_conf(s, ngx_stream_core_module);
+
+ s->ctx = ngx_pcalloc(c->pool, sizeof(void *) * ngx_stream_max_module);
+ if (s->ctx == NULL) {
+ ngx_stream_close_connection(c);
+ return;
+ }
+
+ cscf->handler(s);
+}
+
+
+#if (NGX_STREAM_SSL)
+
+static void
+ngx_stream_ssl_init_connection(ngx_ssl_t *ssl, ngx_connection_t *c)
+{
+ ngx_stream_session_t *s;
+ ngx_stream_ssl_conf_t *sslcf;
+
+ if (ngx_ssl_create_connection(ssl, c, 0) == NGX_ERROR) {
+ ngx_stream_close_connection(c);
+ return;
+ }
+
+ if (ngx_ssl_handshake(c) == NGX_AGAIN) {
+
+ s = c->data;
+
+ sslcf = ngx_stream_get_module_srv_conf(s, ngx_stream_ssl_module);
+
+ ngx_add_timer(c->read, sslcf->handshake_timeout);
+
+ c->ssl->handler = ngx_stream_ssl_handshake_handler;
+
+ return;
+ }
+
+ ngx_stream_ssl_handshake_handler(c);
+}
+
+
+static void
+ngx_stream_ssl_handshake_handler(ngx_connection_t *c)
+{
+ if (!c->ssl->handshaked) {
+ ngx_stream_close_connection(c);
+ return;
+ }
+
+ if (c->read->timer_set) {
+ ngx_del_timer(c->read);
+ }
+
+ ngx_stream_init_session(c);
+}
+
+#endif
+
+
+void
+ngx_stream_close_connection(ngx_connection_t *c)
+{
+ ngx_pool_t *pool;
+
+ ngx_log_debug1(NGX_LOG_DEBUG_STREAM, c->log, 0,
+ "close stream connection: %d", c->fd);
+
+#if (NGX_STREAM_SSL)
+
+ if (c->ssl) {
+ if (ngx_ssl_shutdown(c) == NGX_AGAIN) {
+ c->ssl->handler = ngx_stream_close_connection;
+ return;
+ }
+ }
+
+#endif
+
+#if (NGX_STAT_STUB)
+ (void) ngx_atomic_fetch_add(ngx_stat_active, -1);
+#endif
+
+ pool = c->pool;
+
+ ngx_close_connection(c);
+
+ ngx_destroy_pool(pool);
+}
+
+
+static u_char *
+ngx_stream_log_error(ngx_log_t *log, u_char *buf, size_t len)
+{
+ u_char *p;
+ ngx_stream_session_t *s;
+
+ if (log->action) {
+ p = ngx_snprintf(buf, len, " while %s", log->action);
+ len -= p - buf;
+ buf = p;
+ }
+
+ s = log->data;
+
+ p = ngx_snprintf(buf, len, ", client: %V, server: %V",
+ &s->connection->addr_text,
+ &s->connection->listening->addr_text);
+
+ if (s->log_handler) {
+ return s->log_handler(log, p, len);
+ }
+
+ return p;
+}
diff --git a/src/stream/ngx_stream_proxy_module.c b/src/stream/ngx_stream_proxy_module.c
new file mode 100644
index 000000000..98a9aba06
--- /dev/null
+++ b/src/stream/ngx_stream_proxy_module.c
@@ -0,0 +1,1288 @@
+
+/*
+ * Copyright (C) Roman Arutyunyan
+ * Copyright (C) Nginx, Inc.
+ */
+
+
+#include <ngx_config.h>
+#include <ngx_core.h>
+#include <ngx_stream.h>
+
+
+typedef void (*ngx_stream_proxy_handler_pt)(ngx_stream_session_t *s);
+
+
+typedef struct {
+ ngx_msec_t connect_timeout;
+ ngx_msec_t timeout;
+ ngx_msec_t next_upstream_timeout;
+ size_t downstream_buf_size;
+ size_t upstream_buf_size;
+ ngx_uint_t next_upstream_tries;
+ ngx_flag_t next_upstream;
+
+#if (NGX_STREAM_SSL)
+ ngx_flag_t ssl_enable;
+ ngx_flag_t ssl_session_reuse;
+ ngx_uint_t ssl_protocols;
+ ngx_str_t ssl_ciphers;
+ ngx_str_t ssl_name;
+ ngx_flag_t ssl_server_name;
+
+ ngx_flag_t ssl_verify;
+ ngx_uint_t ssl_verify_depth;
+ ngx_str_t ssl_trusted_certificate;
+ ngx_str_t ssl_crl;
+ ngx_str_t ssl_certificate;
+ ngx_str_t ssl_certificate_key;
+ ngx_array_t *ssl_passwords;
+
+ ngx_ssl_t *ssl;
+#endif
+
+ ngx_stream_upstream_srv_conf_t *upstream;
+} ngx_stream_proxy_srv_conf_t;
+
+
+static void ngx_stream_proxy_handler(ngx_stream_session_t *s);
+static void ngx_stream_proxy_connect(ngx_stream_session_t *s);
+static void ngx_stream_proxy_init_upstream(ngx_stream_session_t *s);
+static void ngx_stream_proxy_upstream_handler(ngx_event_t *ev);
+static void ngx_stream_proxy_downstream_handler(ngx_event_t *ev);
+static void ngx_stream_proxy_connect_handler(ngx_event_t *ev);
+static ngx_int_t ngx_stream_proxy_test_connect(ngx_connection_t *c);
+static ngx_int_t ngx_stream_proxy_process(ngx_stream_session_t *s,
+ ngx_uint_t from_upstream, ngx_uint_t do_write);
+static void ngx_stream_proxy_next_upstream(ngx_stream_session_t *s);
+static void ngx_stream_proxy_finalize(ngx_stream_session_t *s, ngx_int_t rc);
+static u_char *ngx_stream_proxy_log_error(ngx_log_t *log, u_char *buf,
+ size_t len);
+
+static void *ngx_stream_proxy_create_srv_conf(ngx_conf_t *cf);
+static char *ngx_stream_proxy_merge_srv_conf(ngx_conf_t *cf, void *parent,
+ void *child);
+static char *ngx_stream_proxy_pass(ngx_conf_t *cf, ngx_command_t *cmd,
+ void *conf);
+
+#if (NGX_STREAM_SSL)
+
+static char *ngx_stream_proxy_ssl_password_file(ngx_conf_t *cf,
+ ngx_command_t *cmd, void *conf);
+static void ngx_stream_proxy_ssl_init_connection(ngx_stream_session_t *s);
+static void ngx_stream_proxy_ssl_handshake(ngx_connection_t *pc);
+static ngx_int_t ngx_stream_proxy_ssl_name(ngx_stream_session_t *s);
+static ngx_int_t ngx_stream_proxy_set_ssl(ngx_conf_t *cf,
+ ngx_stream_proxy_srv_conf_t *pscf);
+
+
+static ngx_conf_bitmask_t ngx_stream_proxy_ssl_protocols[] = {
+ { ngx_string("SSLv2"), NGX_SSL_SSLv2 },
+ { ngx_string("SSLv3"), NGX_SSL_SSLv3 },
+ { ngx_string("TLSv1"), NGX_SSL_TLSv1 },
+ { ngx_string("TLSv1.1"), NGX_SSL_TLSv1_1 },
+ { ngx_string("TLSv1.2"), NGX_SSL_TLSv1_2 },
+ { ngx_null_string, 0 }
+};
+
+#endif
+
+
+static ngx_command_t ngx_stream_proxy_commands[] = {
+
+ { ngx_string("proxy_pass"),
+ NGX_STREAM_SRV_CONF|NGX_CONF_TAKE1,
+ ngx_stream_proxy_pass,
+ NGX_STREAM_SRV_CONF_OFFSET,
+ 0,
+ NULL },
+
+ { ngx_string("proxy_connect_timeout"),
+ NGX_STREAM_MAIN_CONF|NGX_STREAM_SRV_CONF|NGX_CONF_TAKE1,
+ ngx_conf_set_msec_slot,
+ NGX_STREAM_SRV_CONF_OFFSET,
+ offsetof(ngx_stream_proxy_srv_conf_t, connect_timeout),
+ NULL },
+
+ { ngx_string("proxy_timeout"),
+ NGX_STREAM_MAIN_CONF|NGX_STREAM_SRV_CONF|NGX_CONF_TAKE1,
+ ngx_conf_set_msec_slot,
+ NGX_STREAM_SRV_CONF_OFFSET,
+ offsetof(ngx_stream_proxy_srv_conf_t, timeout),
+ NULL },
+
+ { ngx_string("proxy_downstream_buffer"),
+ NGX_STREAM_MAIN_CONF|NGX_STREAM_SRV_CONF|NGX_CONF_TAKE1,
+ ngx_conf_set_size_slot,
+ NGX_STREAM_SRV_CONF_OFFSET,
+ offsetof(ngx_stream_proxy_srv_conf_t, downstream_buf_size),
+ NULL },
+
+ { ngx_string("proxy_upstream_buffer"),
+ NGX_STREAM_MAIN_CONF|NGX_STREAM_SRV_CONF|NGX_CONF_TAKE1,
+ ngx_conf_set_size_slot,
+ NGX_STREAM_SRV_CONF_OFFSET,
+ offsetof(ngx_stream_proxy_srv_conf_t, upstream_buf_size),
+ NULL },
+
+ { ngx_string("proxy_next_upstream"),
+ NGX_STREAM_MAIN_CONF|NGX_STREAM_SRV_CONF|NGX_CONF_FLAG,
+ ngx_conf_set_flag_slot,
+ NGX_STREAM_SRV_CONF_OFFSET,
+ offsetof(ngx_stream_proxy_srv_conf_t, next_upstream),
+ NULL },
+
+ { ngx_string("proxy_next_upstream_tries"),
+ NGX_STREAM_MAIN_CONF|NGX_STREAM_SRV_CONF|NGX_CONF_TAKE1,
+ ngx_conf_set_num_slot,
+ NGX_STREAM_SRV_CONF_OFFSET,
+ offsetof(ngx_stream_proxy_srv_conf_t, next_upstream_tries),
+ NULL },
+
+ { ngx_string("proxy_next_upstream_timeout"),
+ NGX_STREAM_MAIN_CONF|NGX_STREAM_SRV_CONF|NGX_CONF_TAKE1,
+ ngx_conf_set_msec_slot,
+ NGX_STREAM_SRV_CONF_OFFSET,
+ offsetof(ngx_stream_proxy_srv_conf_t, next_upstream_timeout),
+ NULL },
+
+#if (NGX_STREAM_SSL)
+
+ { ngx_string("proxy_ssl"),
+ NGX_STREAM_MAIN_CONF|NGX_STREAM_SRV_CONF|NGX_CONF_FLAG,
+ ngx_conf_set_flag_slot,
+ NGX_STREAM_SRV_CONF_OFFSET,
+ offsetof(ngx_stream_proxy_srv_conf_t, ssl_enable),
+ NULL },
+
+ { ngx_string("proxy_ssl_session_reuse"),
+ NGX_STREAM_MAIN_CONF|NGX_STREAM_SRV_CONF|NGX_CONF_FLAG,
+ ngx_conf_set_flag_slot,
+ NGX_STREAM_SRV_CONF_OFFSET,
+ offsetof(ngx_stream_proxy_srv_conf_t, ssl_session_reuse),
+ NULL },
+
+ { ngx_string("proxy_ssl_protocols"),
+ NGX_STREAM_MAIN_CONF|NGX_STREAM_SRV_CONF|NGX_CONF_1MORE,
+ ngx_conf_set_bitmask_slot,
+ NGX_STREAM_SRV_CONF_OFFSET,
+ offsetof(ngx_stream_proxy_srv_conf_t, ssl_protocols),
+ &ngx_stream_proxy_ssl_protocols },
+
+ { ngx_string("proxy_ssl_ciphers"),
+ NGX_STREAM_MAIN_CONF|NGX_STREAM_SRV_CONF|NGX_CONF_TAKE1,
+ ngx_conf_set_str_slot,
+ NGX_STREAM_SRV_CONF_OFFSET,
+ offsetof(ngx_stream_proxy_srv_conf_t, ssl_ciphers),
+ NULL },
+
+ { ngx_string("proxy_ssl_name"),
+ NGX_STREAM_MAIN_CONF|NGX_STREAM_SRV_CONF|NGX_CONF_TAKE1,
+ ngx_conf_set_str_slot,
+ NGX_STREAM_SRV_CONF_OFFSET,
+ offsetof(ngx_stream_proxy_srv_conf_t, ssl_name),
+ NULL },
+
+ { ngx_string("proxy_ssl_server_name"),
+ NGX_STREAM_MAIN_CONF|NGX_STREAM_SRV_CONF|NGX_CONF_FLAG,
+ ngx_conf_set_flag_slot,
+ NGX_STREAM_SRV_CONF_OFFSET,
+ offsetof(ngx_stream_proxy_srv_conf_t, ssl_server_name),
+ NULL },
+
+ { ngx_string("proxy_ssl_verify"),
+ NGX_STREAM_MAIN_CONF|NGX_STREAM_SRV_CONF|NGX_CONF_FLAG,
+ ngx_conf_set_flag_slot,
+ NGX_STREAM_SRV_CONF_OFFSET,
+ offsetof(ngx_stream_proxy_srv_conf_t, ssl_verify),
+ NULL },
+
+ { ngx_string("proxy_ssl_verify_depth"),
+ NGX_STREAM_MAIN_CONF|NGX_STREAM_SRV_CONF|NGX_CONF_TAKE1,
+ ngx_conf_set_num_slot,
+ NGX_STREAM_SRV_CONF_OFFSET,
+ offsetof(ngx_stream_proxy_srv_conf_t, ssl_verify_depth),
+ NULL },
+
+ { ngx_string("proxy_ssl_trusted_certificate"),
+ NGX_STREAM_MAIN_CONF|NGX_STREAM_SRV_CONF|NGX_CONF_TAKE1,
+ ngx_conf_set_str_slot,
+ NGX_STREAM_SRV_CONF_OFFSET,
+ offsetof(ngx_stream_proxy_srv_conf_t, ssl_trusted_certificate),
+ NULL },
+
+ { ngx_string("proxy_ssl_crl"),
+ NGX_STREAM_MAIN_CONF|NGX_STREAM_SRV_CONF|NGX_CONF_TAKE1,
+ ngx_conf_set_str_slot,
+ NGX_STREAM_SRV_CONF_OFFSET,
+ offsetof(ngx_stream_proxy_srv_conf_t, ssl_crl),
+ NULL },
+
+ { ngx_string("proxy_ssl_certificate"),
+ NGX_STREAM_MAIN_CONF|NGX_STREAM_SRV_CONF|NGX_CONF_TAKE1,
+ ngx_conf_set_str_slot,
+ NGX_STREAM_SRV_CONF_OFFSET,
+ offsetof(ngx_stream_proxy_srv_conf_t, ssl_certificate),
+ NULL },
+
+ { ngx_string("proxy_ssl_certificate_key"),
+ NGX_STREAM_MAIN_CONF|NGX_STREAM_SRV_CONF|NGX_CONF_TAKE1,
+ ngx_conf_set_str_slot,
+ NGX_STREAM_SRV_CONF_OFFSET,
+ offsetof(ngx_stream_proxy_srv_conf_t, ssl_certificate_key),
+ NULL },
+
+ { ngx_string("proxy_ssl_password_file"),
+ NGX_STREAM_MAIN_CONF|NGX_STREAM_SRV_CONF|NGX_CONF_TAKE1,
+ ngx_stream_proxy_ssl_password_file,
+ NGX_STREAM_SRV_CONF_OFFSET,
+ 0,
+ NULL },
+
+#endif
+
+ ngx_null_command
+};
+
+
+static ngx_stream_module_t ngx_stream_proxy_module_ctx = {
+ NULL, /* create main configuration */
+ NULL, /* init main configuration */
+
+ ngx_stream_proxy_create_srv_conf, /* create server configuration */
+ ngx_stream_proxy_merge_srv_conf /* merge server configuration */
+};
+
+
+ngx_module_t ngx_stream_proxy_module = {
+ NGX_MODULE_V1,
+ &ngx_stream_proxy_module_ctx, /* module context */
+ ngx_stream_proxy_commands, /* module directives */
+ NGX_STREAM_MODULE, /* module type */
+ NULL, /* init master */
+ NULL, /* init module */
+ NULL, /* init process */
+ NULL, /* init thread */
+ NULL, /* exit thread */
+ NULL, /* exit process */
+ NULL, /* exit master */
+ NGX_MODULE_V1_PADDING
+};
+
+
+static void
+ngx_stream_proxy_handler(ngx_stream_session_t *s)
+{
+ u_char *p;
+ ngx_connection_t *c;
+ ngx_stream_upstream_t *u;
+ ngx_stream_proxy_srv_conf_t *pscf;
+ ngx_stream_upstream_srv_conf_t *uscf;
+
+ c = s->connection;
+
+ pscf = ngx_stream_get_module_srv_conf(s, ngx_stream_proxy_module);
+
+ ngx_log_debug0(NGX_LOG_DEBUG_STREAM, c->log, 0,
+ "proxy connection handler");
+
+ u = ngx_pcalloc(c->pool, sizeof(ngx_stream_upstream_t));
+ if (u == NULL) {
+ ngx_stream_proxy_finalize(s, NGX_ERROR);
+ return;
+ }
+
+ s->upstream = u;
+
+ s->log_handler = ngx_stream_proxy_log_error;
+
+ u->peer.log = c->log;
+ u->peer.log_error = NGX_ERROR_ERR;
+
+ uscf = pscf->upstream;
+
+ if (uscf->peer.init(s, uscf) != NGX_OK) {
+ ngx_stream_proxy_finalize(s, NGX_ERROR);
+ return;
+ }
+
+ u->peer.start_time = ngx_current_msec;
+
+ if (pscf->next_upstream_tries
+ && u->peer.tries > pscf->next_upstream_tries)
+ {
+ u->peer.tries = pscf->next_upstream_tries;
+ }
+
+ p = ngx_pnalloc(c->pool, pscf->downstream_buf_size);
+ if (p == NULL) {
+ ngx_stream_proxy_finalize(s, NGX_ERROR);
+ return;
+ }
+
+ u->downstream_buf.start = p;
+ u->downstream_buf.end = p + pscf->downstream_buf_size;
+ u->downstream_buf.pos = p;
+ u->downstream_buf.last = p;
+
+ c->write->handler = ngx_stream_proxy_downstream_handler;
+ c->read->handler = ngx_stream_proxy_downstream_handler;
+
+ if (ngx_stream_proxy_process(s, 0, 0) != NGX_OK) {
+ return;
+ }
+
+ ngx_stream_proxy_connect(s);
+}
+
+
+static void
+ngx_stream_proxy_connect(ngx_stream_session_t *s)
+{
+ ngx_int_t rc;
+ ngx_connection_t *c, *pc;
+ ngx_stream_upstream_t *u;
+ ngx_stream_proxy_srv_conf_t *pscf;
+
+ c = s->connection;
+
+ c->log->action = "connecting to upstream";
+
+ u = s->upstream;
+
+ rc = ngx_event_connect_peer(&u->peer);
+
+ ngx_log_debug1(NGX_LOG_DEBUG_STREAM, c->log, 0, "proxy connect: %i", rc);
+
+ pscf = ngx_stream_get_module_srv_conf(s, ngx_stream_proxy_module);
+
+ if (rc == NGX_ERROR) {
+ ngx_stream_proxy_finalize(s, NGX_ERROR);
+ return;
+ }
+
+ if (rc == NGX_BUSY) {
+ ngx_log_error(NGX_LOG_ERR, c->log, 0, "no live upstreams");
+ ngx_stream_proxy_finalize(s, NGX_DECLINED);
+ return;
+ }
+
+ if (rc == NGX_DECLINED) {
+ ngx_stream_proxy_next_upstream(s);
+ return;
+ }
+
+ /* rc == NGX_OK || rc == NGX_AGAIN || rc == NGX_DONE */
+
+ pc = u->peer.connection;
+
+ pc->data = s;
+ pc->log = c->log;
+ pc->pool = c->pool;
+ pc->read->log = c->log;
+ pc->write->log = c->log;
+
+ if (rc != NGX_AGAIN) {
+ ngx_stream_proxy_init_upstream(s);
+ return;
+ }
+
+ pc->read->handler = ngx_stream_proxy_connect_handler;
+ pc->write->handler = ngx_stream_proxy_connect_handler;
+
+ ngx_add_timer(pc->write, pscf->connect_timeout);
+}
+
+
+static void
+ngx_stream_proxy_init_upstream(ngx_stream_session_t *s)
+{
+ u_char *p;
+ ngx_connection_t *c, *pc;
+ ngx_log_handler_pt handler;
+ ngx_stream_upstream_t *u;
+ ngx_stream_proxy_srv_conf_t *pscf;
+
+ pscf = ngx_stream_get_module_srv_conf(s, ngx_stream_proxy_module);
+
+ u = s->upstream;
+
+ pc = u->peer.connection;
+
+#if (NGX_STREAM_SSL)
+ if (pscf->ssl && pc->ssl == NULL) {
+ ngx_stream_proxy_ssl_init_connection(s);
+ return;
+ }
+#endif
+
+ c = s->connection;
+
+ if (c->log->log_level >= NGX_LOG_INFO) {
+ ngx_str_t s;
+ u_char addr[NGX_SOCKADDR_STRLEN];
+
+ s.len = NGX_SOCKADDR_STRLEN;
+ s.data = addr;
+
+ if (ngx_connection_local_sockaddr(pc, &s, 1) == NGX_OK) {
+ handler = c->log->handler;
+ c->log->handler = NULL;
+
+ ngx_log_error(NGX_LOG_INFO, c->log, 0, "proxy %V connected to %V",
+ &s, u->peer.name);
+
+ c->log->handler = handler;
+ }
+ }
+
+ c->log->action = "proxying connection";
+
+ p = ngx_pnalloc(c->pool, pscf->upstream_buf_size);
+ if (p == NULL) {
+ ngx_stream_proxy_finalize(s, NGX_ERROR);
+ return;
+ }
+
+ u->upstream_buf.start = p;
+ u->upstream_buf.end = p + pscf->upstream_buf_size;
+ u->upstream_buf.pos = p;
+ u->upstream_buf.last = p;
+
+ pc->read->handler = ngx_stream_proxy_upstream_handler;
+ pc->write->handler = ngx_stream_proxy_upstream_handler;
+
+ if (ngx_stream_proxy_process(s, 1, 0) != NGX_OK) {
+ return;
+ }
+
+ ngx_stream_proxy_process(s, 0, 1);
+}
+
+
+#if (NGX_STREAM_SSL)
+
+static char *
+ngx_stream_proxy_ssl_password_file(ngx_conf_t *cf, ngx_command_t *cmd,
+ void *conf)
+{
+ ngx_stream_proxy_srv_conf_t *pscf = conf;
+
+ ngx_str_t *value;
+
+ if (pscf->ssl_passwords != NGX_CONF_UNSET_PTR) {
+ return "is duplicate";
+ }
+
+ value = cf->args->elts;
+
+ pscf->ssl_passwords = ngx_ssl_read_password_file(cf, &value[1]);
+
+ if (pscf->ssl_passwords == NULL) {
+ return NGX_CONF_ERROR;
+ }
+
+ return NGX_CONF_OK;
+}
+
+
+static void
+ngx_stream_proxy_ssl_init_connection(ngx_stream_session_t *s)
+{
+ ngx_int_t rc;
+ ngx_connection_t *pc;
+ ngx_stream_upstream_t *u;
+ ngx_stream_proxy_srv_conf_t *pscf;
+
+ u = s->upstream;
+
+ pc = u->peer.connection;
+
+ pscf = ngx_stream_get_module_srv_conf(s, ngx_stream_proxy_module);
+
+ if (ngx_ssl_create_connection(pscf->ssl, pc, NGX_SSL_BUFFER|NGX_SSL_CLIENT)
+ != NGX_OK)
+ {
+ ngx_stream_proxy_finalize(s, NGX_ERROR);
+ return;
+ }
+
+ if (pscf->ssl_server_name || pscf->ssl_verify) {
+ if (ngx_stream_proxy_ssl_name(s) != NGX_OK) {
+ ngx_stream_proxy_finalize(s, NGX_ERROR);
+ return;
+ }
+ }
+
+ if (pscf->ssl_session_reuse) {
+ if (u->peer.set_session(&u->peer, u->peer.data) != NGX_OK) {
+ ngx_stream_proxy_finalize(s, NGX_ERROR);
+ return;
+ }
+ }
+
+ s->connection->log->action = "SSL handshaking to upstream";
+
+ rc = ngx_ssl_handshake(pc);
+
+ if (rc == NGX_AGAIN) {
+
+ if (!pc->write->timer_set) {
+ ngx_add_timer(pc->write, pscf->connect_timeout);
+ }
+
+ pc->ssl->handler = ngx_stream_proxy_ssl_handshake;
+ return;
+ }
+
+ ngx_stream_proxy_ssl_handshake(pc);
+}
+
+
+static void
+ngx_stream_proxy_ssl_handshake(ngx_connection_t *pc)
+{
+ long rc;
+ ngx_stream_session_t *s;
+ ngx_stream_upstream_t *u;
+ ngx_stream_proxy_srv_conf_t *pscf;
+
+ s = pc->data;
+
+ pscf = ngx_stream_get_module_srv_conf(s, ngx_stream_proxy_module);
+
+ if (pc->ssl->handshaked) {
+
+ if (pscf->ssl_verify) {
+ rc = SSL_get_verify_result(pc->ssl->connection);
+
+ if (rc != X509_V_OK) {
+ ngx_log_error(NGX_LOG_ERR, pc->log, 0,
+ "upstream SSL certificate verify error: (%l:%s)",
+ rc, X509_verify_cert_error_string(rc));
+ goto failed;
+ }
+
+ u = s->upstream;
+
+ if (ngx_ssl_check_host(pc, &u->ssl_name) != NGX_OK) {
+ ngx_log_error(NGX_LOG_ERR, pc->log, 0,
+ "upstream SSL certificate does not match \"%V\"",
+ &u->ssl_name);
+ goto failed;
+ }
+ }
+
+ if (pscf->ssl_session_reuse) {
+ u = s->upstream;
+ u->peer.save_session(&u->peer, u->peer.data);
+ }
+
+ ngx_stream_proxy_init_upstream(s);
+
+ return;
+ }
+
+failed:
+
+ ngx_stream_proxy_next_upstream(s);
+}
+
+
+static ngx_int_t
+ngx_stream_proxy_ssl_name(ngx_stream_session_t *s)
+{
+ u_char *p, *last;
+ ngx_str_t name;
+ ngx_stream_upstream_t *u;
+ ngx_stream_proxy_srv_conf_t *pscf;
+
+ pscf = ngx_stream_get_module_srv_conf(s, ngx_stream_proxy_module);
+
+ u = s->upstream;
+
+ name = pscf->ssl_name;
+
+ if (name.len == 0) {
+ name = pscf->upstream->host;
+ }
+
+ if (name.len == 0) {
+ goto done;
+ }
+
+ /*
+ * ssl name here may contain port, strip it for compatibility
+ * with the http module
+ */
+
+ p = name.data;
+ last = name.data + name.len;
+
+ if (*p == '[') {
+ p = ngx_strlchr(p, last, ']');
+
+ if (p == NULL) {
+ p = name.data;
+ }
+ }
+
+ p = ngx_strlchr(p, last, ':');
+
+ if (p != NULL) {
+ name.len = p - name.data;
+ }
+
+ if (!pscf->ssl_server_name) {
+ goto done;
+ }
+
+#ifdef SSL_CTRL_SET_TLSEXT_HOSTNAME
+
+ /* as per RFC 6066, literal IPv4 and IPv6 addresses are not permitted */
+
+ if (name.len == 0 || *name.data == '[') {
+ goto done;
+ }
+
+ if (ngx_inet_addr(name.data, name.len) != INADDR_NONE) {
+ goto done;
+ }
+
+ /*
+ * SSL_set_tlsext_host_name() needs a null-terminated string,
+ * hence we explicitly null-terminate name here
+ */
+
+ p = ngx_pnalloc(s->connection->pool, name.len + 1);
+ if (p == NULL) {
+ return NGX_ERROR;
+ }
+
+ (void) ngx_cpystrn(p, name.data, name.len + 1);
+
+ name.data = p;
+
+ ngx_log_debug1(NGX_LOG_DEBUG_STREAM, s->connection->log, 0,
+ "upstream SSL server name: \"%s\"", name.data);
+
+ if (SSL_set_tlsext_host_name(u->peer.connection->ssl->connection, name.data)
+ == 0)
+ {
+ ngx_ssl_error(NGX_LOG_ERR, s->connection->log, 0,
+ "SSL_set_tlsext_host_name(\"%s\") failed", name.data);
+ return NGX_ERROR;
+ }
+
+#endif
+
+done:
+
+ u->ssl_name = name;
+
+ return NGX_OK;
+}
+
+#endif
+
+
+static void
+ngx_stream_proxy_downstream_handler(ngx_event_t *ev)
+{
+ ngx_connection_t *c;
+ ngx_stream_session_t *s;
+ ngx_stream_upstream_t *u;
+
+ c = ev->data;
+ s = c->data;
+
+ if (ev->timedout) {
+ ngx_connection_error(c, NGX_ETIMEDOUT, "connection timed out");
+ ngx_stream_proxy_finalize(s, NGX_DECLINED);
+ return;
+ }
+
+ u = s->upstream;
+
+ if (!ev->write) {
+ ngx_stream_proxy_process(s, 0, 0);
+
+ } else if (u->upstream_buf.start) {
+ ngx_stream_proxy_process(s, 1, 1);
+ }
+}
+
+
+static void
+ngx_stream_proxy_upstream_handler(ngx_event_t *ev)
+{
+ ngx_connection_t *c;
+ ngx_stream_session_t *s;
+ ngx_stream_upstream_t *u;
+
+ c = ev->data;
+ s = c->data;
+
+ u = s->upstream;
+
+ if (ev->write) {
+ ngx_stream_proxy_process(s, 0, 1);
+
+ } else if (u->upstream_buf.start) {
+ ngx_stream_proxy_process(s, 1, 0);
+ }
+}
+
+
+static void
+ngx_stream_proxy_connect_handler(ngx_event_t *ev)
+{
+ ngx_connection_t *c;
+ ngx_stream_session_t *s;
+
+ c = ev->data;
+ s = c->data;
+
+ if (ev->timedout) {
+ ngx_log_error(NGX_LOG_ERR, c->log, NGX_ETIMEDOUT, "upstream timed out");
+ ngx_stream_proxy_next_upstream(s);
+ return;
+ }
+
+ ngx_del_timer(c->write);
+
+ ngx_log_debug0(NGX_LOG_DEBUG_STREAM, c->log, 0,
+ "stream proxy connect upstream");
+
+ if (ngx_stream_proxy_test_connect(c) != NGX_OK) {
+ ngx_stream_proxy_next_upstream(s);
+ return;
+ }
+
+ ngx_stream_proxy_init_upstream(s);
+}
+
+
+static ngx_int_t
+ngx_stream_proxy_test_connect(ngx_connection_t *c)
+{
+ int err;
+ socklen_t len;
+
+#if (NGX_HAVE_KQUEUE)
+
+ if (ngx_event_flags & NGX_USE_KQUEUE_EVENT) {
+ err = c->write->kq_errno ? c->write->kq_errno : c->read->kq_errno;
+
+ if (err) {
+ (void) ngx_connection_error(c, err,
+ "kevent() reported that connect() failed");
+ return NGX_ERROR;
+ }
+
+ } else
+#endif
+ {
+ err = 0;
+ len = sizeof(int);
+
+ /*
+ * BSDs and Linux return 0 and set a pending error in err
+ * Solaris returns -1 and sets errno
+ */
+
+ if (getsockopt(c->fd, SOL_SOCKET, SO_ERROR, (void *) &err, &len)
+ == -1)
+ {
+ err = ngx_socket_errno;
+ }
+
+ if (err) {
+ (void) ngx_connection_error(c, err, "connect() failed");
+ return NGX_ERROR;
+ }
+ }
+
+ return NGX_OK;
+}
+
+
+static ngx_int_t
+ngx_stream_proxy_process(ngx_stream_session_t *s, ngx_uint_t from_upstream,
+ ngx_uint_t do_write)
+{
+ size_t size;
+ ssize_t n;
+ ngx_buf_t *b;
+ ngx_connection_t *c, *pc, *src, *dst;
+ ngx_log_handler_pt handler;
+ ngx_stream_upstream_t *u;
+ ngx_stream_proxy_srv_conf_t *pscf;
+
+ u = s->upstream;
+
+ c = s->connection;
+ pc = u->upstream_buf.start ? u->peer.connection : NULL;
+
+ if (from_upstream) {
+ src = pc;
+ dst = c;
+ b = &u->upstream_buf;
+
+ } else {
+ src = c;
+ dst = pc;
+ b = &u->downstream_buf;
+ }
+
+ for ( ;; ) {
+
+ if (do_write) {
+
+ size = b->last - b->pos;
+
+ if (size && dst && dst->write->ready) {
+
+ n = dst->send(dst, b->pos, size);
+
+ if (n == NGX_ERROR) {
+ ngx_stream_proxy_finalize(s, NGX_DECLINED);
+ return NGX_ERROR;
+ }
+
+ if (n > 0) {
+ b->pos += n;
+
+ if (b->pos == b->last) {
+ b->pos = b->start;
+ b->last = b->start;
+ }
+ }
+ }
+ }
+
+ size = b->end - b->last;
+
+ if (size && src->read->ready) {
+
+ n = src->recv(src, b->last, size);
+
+ if (n == NGX_AGAIN || n == 0) {
+ break;
+ }
+
+ if (n > 0) {
+ if (from_upstream) {
+ u->received += n;
+
+ } else {
+ s->received += n;
+ }
+
+ do_write = 1;
+ b->last += n;
+ continue;
+ }
+
+ if (n == NGX_ERROR) {
+ src->read->eof = 1;
+ }
+ }
+
+ break;
+ }
+
+ pscf = ngx_stream_get_module_srv_conf(s, ngx_stream_proxy_module);
+
+ if (src->read->eof && (b->pos == b->last || (dst && dst->read->eof))) {
+ handler = c->log->handler;
+ c->log->handler = NULL;
+
+ ngx_log_error(NGX_LOG_INFO, c->log, 0,
+ "%s disconnected"
+ ", bytes from/to client:%O/%O"
+ ", bytes from/to upstream:%O/%O",
+ from_upstream ? "upstream" : "client",
+ s->received, c->sent, u->received, pc ? pc->sent : 0);
+
+ c->log->handler = handler;
+
+ ngx_stream_proxy_finalize(s, NGX_OK);
+ return NGX_DONE;
+ }
+
+ if (ngx_handle_read_event(src->read, 0) != NGX_OK) {
+ ngx_stream_proxy_finalize(s, NGX_ERROR);
+ return NGX_ERROR;
+ }
+
+ if (dst) {
+ if (ngx_handle_write_event(dst->write, 0) != NGX_OK) {
+ ngx_stream_proxy_finalize(s, NGX_ERROR);
+ return NGX_ERROR;
+ }
+
+ ngx_add_timer(c->read, pscf->timeout);
+ }
+
+ return NGX_OK;
+}
+
+
+static void
+ngx_stream_proxy_next_upstream(ngx_stream_session_t *s)
+{
+ ngx_msec_t timeout;
+ ngx_connection_t *pc;
+ ngx_stream_upstream_t *u;
+ ngx_stream_proxy_srv_conf_t *pscf;
+
+ ngx_log_debug0(NGX_LOG_DEBUG_STREAM, s->connection->log, 0,
+ "stream proxy next upstream");
+
+ u = s->upstream;
+
+ if (u->peer.sockaddr) {
+ u->peer.free(&u->peer, u->peer.data, NGX_PEER_FAILED);
+ u->peer.sockaddr = NULL;
+ }
+
+ pscf = ngx_stream_get_module_srv_conf(s, ngx_stream_proxy_module);
+
+ timeout = pscf->next_upstream_timeout;
+
+ if (u->peer.tries == 0
+ || !pscf->next_upstream
+ || (timeout && ngx_current_msec - u->peer.start_time >= timeout))
+ {
+ ngx_stream_proxy_finalize(s, NGX_DECLINED);
+ return;
+ }
+
+ pc = u->peer.connection;
+
+ if (pc) {
+ ngx_log_debug1(NGX_LOG_DEBUG_STREAM, s->connection->log, 0,
+ "close proxy upstream connection: %d", pc->fd);
+
+#if (NGX_STREAM_SSL)
+ if (pc->ssl) {
+ pc->ssl->no_wait_shutdown = 1;
+ pc->ssl->no_send_shutdown = 1;
+
+ (void) ngx_ssl_shutdown(pc);
+ }
+#endif
+
+ ngx_close_connection(pc);
+ u->peer.connection = NULL;
+ }
+
+ ngx_stream_proxy_connect(s);
+}
+
+
+static void
+ngx_stream_proxy_finalize(ngx_stream_session_t *s, ngx_int_t rc)
+{
+ ngx_connection_t *pc;
+ ngx_stream_upstream_t *u;
+
+ ngx_log_debug1(NGX_LOG_DEBUG_STREAM, s->connection->log, 0,
+ "finalize stream proxy: %i", rc);
+
+ u = s->upstream;
+
+ if (u == NULL) {
+ goto noupstream;
+ }
+
+ if (u->peer.free && u->peer.sockaddr) {
+ u->peer.free(&u->peer, u->peer.data, 0);
+ u->peer.sockaddr = NULL;
+ }
+
+ pc = u->peer.connection;
+
+ if (pc) {
+ ngx_log_debug1(NGX_LOG_DEBUG_STREAM, s->connection->log, 0,
+ "close stream proxy upstream connection: %d", pc->fd);
+
+#if (NGX_STREAM_SSL)
+ if (pc->ssl) {
+ pc->ssl->no_wait_shutdown = 1;
+ (void) ngx_ssl_shutdown(pc);
+ }
+#endif
+
+ ngx_close_connection(pc);
+ u->peer.connection = NULL;
+ }
+
+noupstream:
+
+ ngx_stream_close_connection(s->connection);
+}
+
+
+static u_char *
+ngx_stream_proxy_log_error(ngx_log_t *log, u_char *buf, size_t len)
+{
+ u_char *p;
+ ngx_connection_t *pc;
+ ngx_stream_session_t *s;
+ ngx_stream_upstream_t *u;
+
+ s = log->data;
+
+ u = s->upstream;
+
+ p = buf;
+
+ if (u->peer.name) {
+ p = ngx_snprintf(p, len, ", upstream: \"%V\"", u->peer.name);
+ len -= p - buf;
+ }
+
+ pc = u->peer.connection;
+
+ p = ngx_snprintf(p, len,
+ ", bytes from/to client:%O/%O"
+ ", bytes from/to upstream:%O/%O",
+ s->received, s->connection->sent,
+ u->received, pc ? pc->sent : 0);
+
+ return p;
+}
+
+
+static void *
+ngx_stream_proxy_create_srv_conf(ngx_conf_t *cf)
+{
+ ngx_stream_proxy_srv_conf_t *conf;
+
+ conf = ngx_pcalloc(cf->pool, sizeof(ngx_stream_proxy_srv_conf_t));
+ if (conf == NULL) {
+ return NULL;
+ }
+
+ /*
+ * set by ngx_pcalloc():
+ *
+ * conf->ssl_protocols = 0;
+ * conf->ssl_ciphers = { 0, NULL };
+ * conf->ssl_name = { 0, NULL };
+ * conf->ssl_trusted_certificate = { 0, NULL };
+ * conf->ssl_crl = { 0, NULL };
+ * conf->ssl_certificate = { 0, NULL };
+ * conf->ssl_certificate_key = { 0, NULL };
+ *
+ * conf->ssl = NULL;
+ * conf->upstream = NULL;
+ */
+
+ conf->connect_timeout = NGX_CONF_UNSET_MSEC;
+ conf->timeout = NGX_CONF_UNSET_MSEC;
+ conf->next_upstream_timeout = NGX_CONF_UNSET_MSEC;
+ conf->downstream_buf_size = NGX_CONF_UNSET_SIZE;
+ conf->upstream_buf_size = NGX_CONF_UNSET_SIZE;
+ conf->next_upstream_tries = NGX_CONF_UNSET_UINT;
+ conf->next_upstream = NGX_CONF_UNSET;
+
+#if (NGX_STREAM_SSL)
+ conf->ssl_enable = NGX_CONF_UNSET;
+ conf->ssl_session_reuse = NGX_CONF_UNSET;
+ conf->ssl_server_name = NGX_CONF_UNSET;
+ conf->ssl_verify = NGX_CONF_UNSET;
+ conf->ssl_verify_depth = NGX_CONF_UNSET_UINT;
+ conf->ssl_passwords = NGX_CONF_UNSET_PTR;
+#endif
+
+ return conf;
+}
+
+
+static char *
+ngx_stream_proxy_merge_srv_conf(ngx_conf_t *cf, void *parent, void *child)
+{
+ ngx_stream_proxy_srv_conf_t *prev = parent;
+ ngx_stream_proxy_srv_conf_t *conf = child;
+
+ ngx_conf_merge_msec_value(conf->connect_timeout,
+ prev->connect_timeout, 60000);
+
+ ngx_conf_merge_msec_value(conf->timeout,
+ prev->timeout, 10 * 60000);
+
+ ngx_conf_merge_msec_value(conf->next_upstream_timeout,
+ prev->next_upstream_timeout, 0);
+
+ ngx_conf_merge_size_value(conf->downstream_buf_size,
+ prev->downstream_buf_size, 16384);
+
+ ngx_conf_merge_size_value(conf->upstream_buf_size,
+ prev->upstream_buf_size, 16384);
+
+ ngx_conf_merge_uint_value(conf->next_upstream_tries,
+ prev->next_upstream_tries, 0);
+
+ ngx_conf_merge_value(conf->next_upstream, prev->next_upstream, 1);
+
+#if (NGX_STREAM_SSL)
+
+ ngx_conf_merge_value(conf->ssl_enable, prev->ssl_enable, 0);
+
+ ngx_conf_merge_value(conf->ssl_session_reuse,
+ prev->ssl_session_reuse, 1);
+
+ ngx_conf_merge_bitmask_value(conf->ssl_protocols, prev->ssl_protocols,
+ (NGX_CONF_BITMASK_SET|NGX_SSL_SSLv3
+ |NGX_SSL_TLSv1|NGX_SSL_TLSv1_1
+ |NGX_SSL_TLSv1_2));
+
+ ngx_conf_merge_str_value(conf->ssl_ciphers, prev->ssl_ciphers, "DEFAULT");
+
+ ngx_conf_merge_str_value(conf->ssl_name, prev->ssl_name, "");
+
+ ngx_conf_merge_value(conf->ssl_server_name, prev->ssl_server_name, 0);
+
+ ngx_conf_merge_value(conf->ssl_verify, prev->ssl_verify, 0);
+
+ ngx_conf_merge_uint_value(conf->ssl_verify_depth,
+ prev->ssl_verify_depth, 1);
+
+ ngx_conf_merge_str_value(conf->ssl_trusted_certificate,
+ prev->ssl_trusted_certificate, "");
+
+ ngx_conf_merge_str_value(conf->ssl_crl, prev->ssl_crl, "");
+
+ ngx_conf_merge_str_value(conf->ssl_certificate,
+ prev->ssl_certificate, "");
+
+ ngx_conf_merge_str_value(conf->ssl_certificate_key,
+ prev->ssl_certificate_key, "");
+
+ ngx_conf_merge_ptr_value(conf->ssl_passwords, prev->ssl_passwords, NULL);
+
+ if (conf->ssl_enable && ngx_stream_proxy_set_ssl(cf, conf) != NGX_OK) {
+ return NGX_CONF_ERROR;
+ }
+
+#endif
+
+ return NGX_CONF_OK;
+}
+
+
+#if (NGX_STREAM_SSL)
+
+static ngx_int_t
+ngx_stream_proxy_set_ssl(ngx_conf_t *cf, ngx_stream_proxy_srv_conf_t *pscf)
+{
+ ngx_pool_cleanup_t *cln;
+
+ pscf->ssl = ngx_pcalloc(cf->pool, sizeof(ngx_ssl_t));
+ if (pscf->ssl == NULL) {
+ return NGX_ERROR;
+ }
+
+ pscf->ssl->log = cf->log;
+
+ if (ngx_ssl_create(pscf->ssl, pscf->ssl_protocols, NULL) != NGX_OK) {
+ return NGX_ERROR;
+ }
+
+ cln = ngx_pool_cleanup_add(cf->pool, 0);
+ if (cln == NULL) {
+ return NGX_ERROR;
+ }
+
+ cln->handler = ngx_ssl_cleanup_ctx;
+ cln->data = pscf->ssl;
+
+ if (pscf->ssl_certificate.len) {
+
+ if (pscf->ssl_certificate_key.len == 0) {
+ ngx_log_error(NGX_LOG_EMERG, cf->log, 0,
+ "no \"proxy_ssl_certificate_key\" is defined "
+ "for certificate \"%V\"", &pscf->ssl_certificate);
+ return NGX_ERROR;
+ }
+
+ if (ngx_ssl_certificate(cf, pscf->ssl, &pscf->ssl_certificate,
+ &pscf->ssl_certificate_key, pscf->ssl_passwords)
+ != NGX_OK)
+ {
+ return NGX_ERROR;
+ }
+ }
+
+ if (SSL_CTX_set_cipher_list(pscf->ssl->ctx,
+ (const char *) pscf->ssl_ciphers.data)
+ == 0)
+ {
+ ngx_ssl_error(NGX_LOG_EMERG, cf->log, 0,
+ "SSL_CTX_set_cipher_list(\"%V\") failed",
+ &pscf->ssl_ciphers);
+ return NGX_ERROR;
+ }
+
+ if (pscf->ssl_verify) {
+ if (pscf->ssl_trusted_certificate.len == 0) {
+ ngx_log_error(NGX_LOG_EMERG, cf->log, 0,
+ "no proxy_ssl_trusted_certificate for proxy_ssl_verify");
+ return NGX_ERROR;
+ }
+
+ if (ngx_ssl_trusted_certificate(cf, pscf->ssl,
+ &pscf->ssl_trusted_certificate,
+ pscf->ssl_verify_depth)
+ != NGX_OK)
+ {
+ return NGX_ERROR;
+ }
+
+ if (ngx_ssl_crl(cf, pscf->ssl, &pscf->ssl_crl) != NGX_OK) {
+ return NGX_ERROR;
+ }
+ }
+
+ return NGX_OK;
+}
+
+#endif
+
+
+static char *
+ngx_stream_proxy_pass(ngx_conf_t *cf, ngx_command_t *cmd, void *conf)
+{
+ ngx_stream_proxy_srv_conf_t *pscf = conf;
+
+ ngx_url_t u;
+ ngx_str_t *value, *url;
+ ngx_stream_core_srv_conf_t *cscf;
+
+ if (pscf->upstream) {
+ return "is duplicate";
+ }
+
+ cscf = ngx_stream_conf_get_module_srv_conf(cf, ngx_stream_core_module);
+
+ cscf->handler = ngx_stream_proxy_handler;
+
+ value = cf->args->elts;
+
+ url = &value[1];
+
+ ngx_memzero(&u, sizeof(ngx_url_t));
+
+ u.url = *url;
+ u.no_resolve = 1;
+
+ pscf->upstream = ngx_stream_upstream_add(cf, &u, 0);
+ if (pscf->upstream == NULL) {
+ return NGX_CONF_ERROR;
+ }
+
+ return NGX_CONF_OK;
+}
diff --git a/src/stream/ngx_stream_ssl_module.c b/src/stream/ngx_stream_ssl_module.c
new file mode 100644
index 000000000..ecdd14c56
--- /dev/null
+++ b/src/stream/ngx_stream_ssl_module.c
@@ -0,0 +1,456 @@
+
+/*
+ * Copyright (C) Igor Sysoev
+ * Copyright (C) Nginx, Inc.
+ */
+
+
+#include <ngx_config.h>
+#include <ngx_core.h>
+#include <ngx_stream.h>
+
+
+#define NGX_DEFAULT_CIPHERS "HIGH:!aNULL:!MD5"
+#define NGX_DEFAULT_ECDH_CURVE "prime256v1"
+
+
+static void *ngx_stream_ssl_create_conf(ngx_conf_t *cf);
+static char *ngx_stream_ssl_merge_conf(ngx_conf_t *cf, void *parent,
+ void *child);
+
+static char *ngx_stream_ssl_password_file(ngx_conf_t *cf, ngx_command_t *cmd,
+ void *conf);
+static char *ngx_stream_ssl_session_cache(ngx_conf_t *cf, ngx_command_t *cmd,
+ void *conf);
+
+
+static ngx_conf_bitmask_t ngx_stream_ssl_protocols[] = {
+ { ngx_string("SSLv2"), NGX_SSL_SSLv2 },
+ { ngx_string("SSLv3"), NGX_SSL_SSLv3 },
+ { ngx_string("TLSv1"), NGX_SSL_TLSv1 },
+ { ngx_string("TLSv1.1"), NGX_SSL_TLSv1_1 },
+ { ngx_string("TLSv1.2"), NGX_SSL_TLSv1_2 },
+ { ngx_null_string, 0 }
+};
+
+
+static ngx_command_t ngx_stream_ssl_commands[] = {
+
+ { ngx_string("ssl_handshake_timeout"),
+ NGX_STREAM_MAIN_CONF|NGX_STREAM_SRV_CONF|NGX_CONF_TAKE1,
+ ngx_conf_set_msec_slot,
+ NGX_STREAM_SRV_CONF_OFFSET,
+ offsetof(ngx_stream_ssl_conf_t, handshake_timeout),
+ NULL },
+
+ { ngx_string("ssl_certificate"),
+ NGX_STREAM_MAIN_CONF|NGX_STREAM_SRV_CONF|NGX_CONF_TAKE1,
+ ngx_conf_set_str_slot,
+ NGX_STREAM_SRV_CONF_OFFSET,
+ offsetof(ngx_stream_ssl_conf_t, certificate),
+ NULL },
+
+ { ngx_string("ssl_certificate_key"),
+ NGX_STREAM_MAIN_CONF|NGX_STREAM_SRV_CONF|NGX_CONF_TAKE1,
+ ngx_conf_set_str_slot,
+ NGX_STREAM_SRV_CONF_OFFSET,
+ offsetof(ngx_stream_ssl_conf_t, certificate_key),
+ NULL },
+
+ { ngx_string("ssl_password_file"),
+ NGX_STREAM_MAIN_CONF|NGX_STREAM_SRV_CONF|NGX_CONF_TAKE1,
+ ngx_stream_ssl_password_file,
+ NGX_STREAM_SRV_CONF_OFFSET,
+ 0,
+ NULL },
+
+ { ngx_string("ssl_dhparam"),
+ NGX_STREAM_MAIN_CONF|NGX_STREAM_SRV_CONF|NGX_CONF_TAKE1,
+ ngx_conf_set_str_slot,
+ NGX_STREAM_SRV_CONF_OFFSET,
+ offsetof(ngx_stream_ssl_conf_t, dhparam),
+ NULL },
+
+ { ngx_string("ssl_ecdh_curve"),
+ NGX_STREAM_MAIN_CONF|NGX_STREAM_SRV_CONF|NGX_CONF_TAKE1,
+ ngx_conf_set_str_slot,
+ NGX_STREAM_SRV_CONF_OFFSET,
+ offsetof(ngx_stream_ssl_conf_t, ecdh_curve),
+ NULL },
+
+ { ngx_string("ssl_protocols"),
+ NGX_STREAM_MAIN_CONF|NGX_STREAM_SRV_CONF|NGX_CONF_1MORE,
+ ngx_conf_set_bitmask_slot,
+ NGX_STREAM_SRV_CONF_OFFSET,
+ offsetof(ngx_stream_ssl_conf_t, protocols),
+ &ngx_stream_ssl_protocols },
+
+ { ngx_string("ssl_ciphers"),
+ NGX_STREAM_MAIN_CONF|NGX_STREAM_SRV_CONF|NGX_CONF_TAKE1,
+ ngx_conf_set_str_slot,
+ NGX_STREAM_SRV_CONF_OFFSET,
+ offsetof(ngx_stream_ssl_conf_t, ciphers),
+ NULL },
+
+ { ngx_string("ssl_prefer_server_ciphers"),
+ NGX_STREAM_MAIN_CONF|NGX_STREAM_SRV_CONF|NGX_CONF_FLAG,
+ ngx_conf_set_flag_slot,
+ NGX_STREAM_SRV_CONF_OFFSET,
+ offsetof(ngx_stream_ssl_conf_t, prefer_server_ciphers),
+ NULL },
+
+ { ngx_string("ssl_session_cache"),
+ NGX_STREAM_MAIN_CONF|NGX_STREAM_SRV_CONF|NGX_CONF_TAKE12,
+ ngx_stream_ssl_session_cache,
+ NGX_STREAM_SRV_CONF_OFFSET,
+ 0,
+ NULL },
+
+ { ngx_string("ssl_session_tickets"),
+ NGX_STREAM_MAIN_CONF|NGX_STREAM_SRV_CONF|NGX_CONF_FLAG,
+ ngx_conf_set_flag_slot,
+ NGX_STREAM_SRV_CONF_OFFSET,
+ offsetof(ngx_stream_ssl_conf_t, session_tickets),
+ NULL },
+
+ { ngx_string("ssl_session_ticket_key"),
+ NGX_STREAM_MAIN_CONF|NGX_STREAM_SRV_CONF|NGX_CONF_TAKE1,
+ ngx_conf_set_str_array_slot,
+ NGX_STREAM_SRV_CONF_OFFSET,
+ offsetof(ngx_stream_ssl_conf_t, session_ticket_keys),
+ NULL },
+
+ { ngx_string("ssl_session_timeout"),
+ NGX_STREAM_MAIN_CONF|NGX_STREAM_SRV_CONF|NGX_CONF_TAKE1,
+ ngx_conf_set_sec_slot,
+ NGX_STREAM_SRV_CONF_OFFSET,
+ offsetof(ngx_stream_ssl_conf_t, session_timeout),
+ NULL },
+
+ ngx_null_command
+};
+
+
+static ngx_stream_module_t ngx_stream_ssl_module_ctx = {
+ NULL, /* create main configuration */
+ NULL, /* init main configuration */
+
+ ngx_stream_ssl_create_conf, /* create server configuration */
+ ngx_stream_ssl_merge_conf /* merge server configuration */
+};
+
+
+ngx_module_t ngx_stream_ssl_module = {
+ NGX_MODULE_V1,
+ &ngx_stream_ssl_module_ctx, /* module context */
+ ngx_stream_ssl_commands, /* module directives */
+ NGX_STREAM_MODULE, /* module type */
+ NULL, /* init master */
+ NULL, /* init module */
+ NULL, /* init process */
+ NULL, /* init thread */
+ NULL, /* exit thread */
+ NULL, /* exit process */
+ NULL, /* exit master */
+ NGX_MODULE_V1_PADDING
+};
+
+
+static ngx_str_t ngx_stream_ssl_sess_id_ctx = ngx_string("STREAM");
+
+
+static void *
+ngx_stream_ssl_create_conf(ngx_conf_t *cf)
+{
+ ngx_stream_ssl_conf_t *scf;
+
+ scf = ngx_pcalloc(cf->pool, sizeof(ngx_stream_ssl_conf_t));
+ if (scf == NULL) {
+ return NULL;
+ }
+
+ /*
+ * set by ngx_pcalloc():
+ *
+ * scf->protocols = 0;
+ * scf->certificate = { 0, NULL };
+ * scf->certificate_key = { 0, NULL };
+ * scf->dhparam = { 0, NULL };
+ * scf->ecdh_curve = { 0, NULL };
+ * scf->ciphers = { 0, NULL };
+ * scf->shm_zone = NULL;
+ */
+
+ scf->handshake_timeout = NGX_CONF_UNSET_MSEC;
+ scf->passwords = NGX_CONF_UNSET_PTR;
+ scf->prefer_server_ciphers = NGX_CONF_UNSET;
+ scf->builtin_session_cache = NGX_CONF_UNSET;
+ scf->session_timeout = NGX_CONF_UNSET;
+ scf->session_tickets = NGX_CONF_UNSET;
+ scf->session_ticket_keys = NGX_CONF_UNSET_PTR;
+
+ return scf;
+}
+
+
+static char *
+ngx_stream_ssl_merge_conf(ngx_conf_t *cf, void *parent, void *child)
+{
+ ngx_stream_ssl_conf_t *prev = parent;
+ ngx_stream_ssl_conf_t *conf = child;
+
+ ngx_pool_cleanup_t *cln;
+
+ ngx_conf_merge_msec_value(conf->handshake_timeout,
+ prev->handshake_timeout, 60000);
+
+ ngx_conf_merge_value(conf->session_timeout,
+ prev->session_timeout, 300);
+
+ ngx_conf_merge_value(conf->prefer_server_ciphers,
+ prev->prefer_server_ciphers, 0);
+
+ ngx_conf_merge_bitmask_value(conf->protocols, prev->protocols,
+ (NGX_CONF_BITMASK_SET|NGX_SSL_SSLv3|NGX_SSL_TLSv1
+ |NGX_SSL_TLSv1_1|NGX_SSL_TLSv1_2));
+
+ ngx_conf_merge_str_value(conf->certificate, prev->certificate, "");
+ ngx_conf_merge_str_value(conf->certificate_key, prev->certificate_key, "");
+
+ ngx_conf_merge_ptr_value(conf->passwords, prev->passwords, NULL);
+
+ ngx_conf_merge_str_value(conf->dhparam, prev->dhparam, "");
+
+ ngx_conf_merge_str_value(conf->ecdh_curve, prev->ecdh_curve,
+ NGX_DEFAULT_ECDH_CURVE);
+
+ ngx_conf_merge_str_value(conf->ciphers, prev->ciphers, NGX_DEFAULT_CIPHERS);
+
+
+ conf->ssl.log = cf->log;
+
+ if (conf->certificate.len == 0) {
+ return NGX_CONF_OK;
+ }
+
+ if (conf->certificate_key.len == 0) {
+ ngx_log_error(NGX_LOG_EMERG, cf->log, 0,
+ "no \"ssl_certificate_key\" is defined "
+ "for certificate \"%V\"",
+ &conf->certificate);
+ return NGX_CONF_ERROR;
+ }
+
+ if (ngx_ssl_create(&conf->ssl, conf->protocols, NULL) != NGX_OK) {
+ return NGX_CONF_ERROR;
+ }
+
+ cln = ngx_pool_cleanup_add(cf->pool, 0);
+ if (cln == NULL) {
+ return NGX_CONF_ERROR;
+ }
+
+ cln->handler = ngx_ssl_cleanup_ctx;
+ cln->data = &conf->ssl;
+
+ if (ngx_ssl_certificate(cf, &conf->ssl, &conf->certificate,
+ &conf->certificate_key, conf->passwords)
+ != NGX_OK)
+ {
+ return NGX_CONF_ERROR;
+ }
+
+ if (SSL_CTX_set_cipher_list(conf->ssl.ctx,
+ (const char *) conf->ciphers.data)
+ == 0)
+ {
+ ngx_ssl_error(NGX_LOG_EMERG, cf->log, 0,
+ "SSL_CTX_set_cipher_list(\"%V\") failed",
+ &conf->ciphers);
+ return NGX_CONF_ERROR;
+ }
+
+ if (conf->prefer_server_ciphers) {
+ SSL_CTX_set_options(conf->ssl.ctx, SSL_OP_CIPHER_SERVER_PREFERENCE);
+ }
+
+ SSL_CTX_set_tmp_rsa_callback(conf->ssl.ctx, ngx_ssl_rsa512_key_callback);
+
+ if (ngx_ssl_dhparam(cf, &conf->ssl, &conf->dhparam) != NGX_OK) {
+ return NGX_CONF_ERROR;
+ }
+
+ if (ngx_ssl_ecdh_curve(cf, &conf->ssl, &conf->ecdh_curve) != NGX_OK) {
+ return NGX_CONF_ERROR;
+ }
+
+ ngx_conf_merge_value(conf->builtin_session_cache,
+ prev->builtin_session_cache, NGX_SSL_NONE_SCACHE);
+
+ if (conf->shm_zone == NULL) {
+ conf->shm_zone = prev->shm_zone;
+ }
+
+ if (ngx_ssl_session_cache(&conf->ssl, &ngx_stream_ssl_sess_id_ctx,
+ conf->builtin_session_cache,
+ conf->shm_zone, conf->session_timeout)
+ != NGX_OK)
+ {
+ return NGX_CONF_ERROR;
+ }
+
+ ngx_conf_merge_value(conf->session_tickets,
+ prev->session_tickets, 1);
+
+#ifdef SSL_OP_NO_TICKET
+ if (!conf->session_tickets) {
+ SSL_CTX_set_options(conf->ssl.ctx, SSL_OP_NO_TICKET);
+ }
+#endif
+
+ ngx_conf_merge_ptr_value(conf->session_ticket_keys,
+ prev->session_ticket_keys, NULL);
+
+ if (ngx_ssl_session_ticket_keys(cf, &conf->ssl, conf->session_ticket_keys)
+ != NGX_OK)
+ {
+ return NGX_CONF_ERROR;
+ }
+
+ return NGX_CONF_OK;
+}
+
+
+static char *
+ngx_stream_ssl_password_file(ngx_conf_t *cf, ngx_command_t *cmd, void *conf)
+{
+ ngx_stream_ssl_conf_t *scf = conf;
+
+ ngx_str_t *value;
+
+ if (scf->passwords != NGX_CONF_UNSET_PTR) {
+ return "is duplicate";
+ }
+
+ value = cf->args->elts;
+
+ scf->passwords = ngx_ssl_read_password_file(cf, &value[1]);
+
+ if (scf->passwords == NULL) {
+ return NGX_CONF_ERROR;
+ }
+
+ return NGX_CONF_OK;
+}
+
+
+static char *
+ngx_stream_ssl_session_cache(ngx_conf_t *cf, ngx_command_t *cmd, void *conf)
+{
+ ngx_stream_ssl_conf_t *scf = conf;
+
+ size_t len;
+ ngx_str_t *value, name, size;
+ ngx_int_t n;
+ ngx_uint_t i, j;
+
+ value = cf->args->elts;
+
+ for (i = 1; i < cf->args->nelts; i++) {
+
+ if (ngx_strcmp(value[i].data, "off") == 0) {
+ scf->builtin_session_cache = NGX_SSL_NO_SCACHE;
+ continue;
+ }
+
+ if (ngx_strcmp(value[i].data, "none") == 0) {
+ scf->builtin_session_cache = NGX_SSL_NONE_SCACHE;
+ continue;
+ }
+
+ if (ngx_strcmp(value[i].data, "builtin") == 0) {
+ scf->builtin_session_cache = NGX_SSL_DFLT_BUILTIN_SCACHE;
+ continue;
+ }
+
+ if (value[i].len > sizeof("builtin:") - 1
+ && ngx_strncmp(value[i].data, "builtin:", sizeof("builtin:") - 1)
+ == 0)
+ {
+ n = ngx_atoi(value[i].data + sizeof("builtin:") - 1,
+ value[i].len - (sizeof("builtin:") - 1));
+
+ if (n == NGX_ERROR) {
+ goto invalid;
+ }
+
+ scf->builtin_session_cache = n;
+
+ continue;
+ }
+
+ if (value[i].len > sizeof("shared:") - 1
+ && ngx_strncmp(value[i].data, "shared:", sizeof("shared:") - 1)
+ == 0)
+ {
+ len = 0;
+
+ for (j = sizeof("shared:") - 1; j < value[i].len; j++) {
+ if (value[i].data[j] == ':') {
+ break;
+ }
+
+ len++;
+ }
+
+ if (len == 0) {
+ goto invalid;
+ }
+
+ name.len = len;
+ name.data = value[i].data + sizeof("shared:") - 1;
+
+ size.len = value[i].len - j - 1;
+ size.data = name.data + len + 1;
+
+ n = ngx_parse_size(&size);
+
+ if (n == NGX_ERROR) {
+ goto invalid;
+ }
+
+ if (n < (ngx_int_t) (8 * ngx_pagesize)) {
+ ngx_conf_log_error(NGX_LOG_EMERG, cf, 0,
+ "session cache \"%V\" is too small",
+ &value[i]);
+
+ return NGX_CONF_ERROR;
+ }
+
+ scf->shm_zone = ngx_shared_memory_add(cf, &name, n,
+ &ngx_stream_ssl_module);
+ if (scf->shm_zone == NULL) {
+ return NGX_CONF_ERROR;
+ }
+
+ scf->shm_zone->init = ngx_ssl_session_cache_init;
+
+ continue;
+ }
+
+ goto invalid;
+ }
+
+ if (scf->shm_zone && scf->builtin_session_cache == NGX_CONF_UNSET) {
+ scf->builtin_session_cache = NGX_SSL_NO_BUILTIN_SCACHE;
+ }
+
+ return NGX_CONF_OK;
+
+invalid:
+
+ ngx_conf_log_error(NGX_LOG_EMERG, cf, 0,
+ "invalid session cache \"%V\"", &value[i]);
+
+ return NGX_CONF_ERROR;
+}
diff --git a/src/stream/ngx_stream_ssl_module.h b/src/stream/ngx_stream_ssl_module.h
new file mode 100644
index 000000000..85e8b6ede
--- /dev/null
+++ b/src/stream/ngx_stream_ssl_module.h
@@ -0,0 +1,49 @@
+
+/*
+ * Copyright (C) Igor Sysoev
+ * Copyright (C) Nginx, Inc.
+ */
+
+
+#ifndef _NGX_STREAM_SSL_H_INCLUDED_
+#define _NGX_STREAM_SSL_H_INCLUDED_
+
+
+#include <ngx_config.h>
+#include <ngx_core.h>
+#include <ngx_stream.h>
+
+
+typedef struct {
+ ngx_msec_t handshake_timeout;
+
+ ngx_flag_t prefer_server_ciphers;
+
+ ngx_ssl_t ssl;
+
+ ngx_uint_t protocols;
+
+ ssize_t builtin_session_cache;
+
+ time_t session_timeout;
+
+ ngx_str_t certificate;
+ ngx_str_t certificate_key;
+ ngx_str_t dhparam;
+ ngx_str_t ecdh_curve;
+
+ ngx_str_t ciphers;
+
+ ngx_array_t *passwords;
+
+ ngx_shm_zone_t *shm_zone;
+
+ ngx_flag_t session_tickets;
+ ngx_array_t *session_ticket_keys;
+} ngx_stream_ssl_conf_t;
+
+
+extern ngx_module_t ngx_stream_ssl_module;
+
+
+#endif /* _NGX_STREAM_SSL_H_INCLUDED_ */
diff --git a/src/stream/ngx_stream_upstream.c b/src/stream/ngx_stream_upstream.c
new file mode 100644
index 000000000..a991f8a9f
--- /dev/null
+++ b/src/stream/ngx_stream_upstream.c
@@ -0,0 +1,462 @@
+
+/*
+ * Copyright (C) Igor Sysoev
+ * Copyright (C) Nginx, Inc.
+ */
+
+
+#include <ngx_config.h>
+#include <ngx_core.h>
+#include <ngx_stream.h>
+
+
+static char *ngx_stream_upstream(ngx_conf_t *cf, ngx_command_t *cmd,
+ void *dummy);
+static char *ngx_stream_upstream_server(ngx_conf_t *cf, ngx_command_t *cmd,
+ void *conf);
+static void *ngx_stream_upstream_create_main_conf(ngx_conf_t *cf);
+static char *ngx_stream_upstream_init_main_conf(ngx_conf_t *cf, void *conf);
+
+
+static ngx_command_t ngx_stream_upstream_commands[] = {
+
+ { ngx_string("upstream"),
+ NGX_STREAM_MAIN_CONF|NGX_CONF_BLOCK|NGX_CONF_TAKE1,
+ ngx_stream_upstream,
+ 0,
+ 0,
+ NULL },
+
+ { ngx_string("server"),
+ NGX_STREAM_UPS_CONF|NGX_CONF_1MORE,
+ ngx_stream_upstream_server,
+ NGX_STREAM_SRV_CONF_OFFSET,
+ 0,
+ NULL },
+
+ ngx_null_command
+};
+
+
+static ngx_stream_module_t ngx_stream_upstream_module_ctx = {
+ ngx_stream_upstream_create_main_conf, /* create main configuration */
+ ngx_stream_upstream_init_main_conf, /* init main configuration */
+
+ NULL, /* create server configuration */
+ NULL, /* merge server configuration */
+};
+
+
+ngx_module_t ngx_stream_upstream_module = {
+ NGX_MODULE_V1,
+ &ngx_stream_upstream_module_ctx, /* module context */
+ ngx_stream_upstream_commands, /* module directives */
+ NGX_STREAM_MODULE, /* module type */
+ NULL, /* init master */
+ NULL, /* init module */
+ NULL, /* init process */
+ NULL, /* init thread */
+ NULL, /* exit thread */
+ NULL, /* exit process */
+ NULL, /* exit master */
+ NGX_MODULE_V1_PADDING
+};
+
+
+static char *
+ngx_stream_upstream(ngx_conf_t *cf, ngx_command_t *cmd, void *dummy)
+{
+ char *rv;
+ void *mconf;
+ ngx_str_t *value;
+ ngx_url_t u;
+ ngx_uint_t m;
+ ngx_conf_t pcf;
+ ngx_stream_module_t *module;
+ ngx_stream_conf_ctx_t *ctx, *stream_ctx;
+ ngx_stream_upstream_srv_conf_t *uscf;
+
+ ngx_memzero(&u, sizeof(ngx_url_t));
+
+ value = cf->args->elts;
+ u.host = value[1];
+ u.no_resolve = 1;
+ u.no_port = 1;
+
+ uscf = ngx_stream_upstream_add(cf, &u, NGX_STREAM_UPSTREAM_CREATE
+ |NGX_STREAM_UPSTREAM_WEIGHT
+ |NGX_STREAM_UPSTREAM_MAX_FAILS
+ |NGX_STREAM_UPSTREAM_FAIL_TIMEOUT
+ |NGX_STREAM_UPSTREAM_DOWN
+ |NGX_STREAM_UPSTREAM_BACKUP);
+ if (uscf == NULL) {
+ return NGX_CONF_ERROR;
+ }
+
+
+ ctx = ngx_pcalloc(cf->pool, sizeof(ngx_stream_conf_ctx_t));
+ if (ctx == NULL) {
+ return NGX_CONF_ERROR;
+ }
+
+ stream_ctx = cf->ctx;
+ ctx->main_conf = stream_ctx->main_conf;
+
+ /* the upstream{}'s srv_conf */
+
+ ctx->srv_conf = ngx_pcalloc(cf->pool,
+ sizeof(void *) * ngx_stream_max_module);
+ if (ctx->srv_conf == NULL) {
+ return NGX_CONF_ERROR;
+ }
+
+ ctx->srv_conf[ngx_stream_upstream_module.ctx_index] = uscf;
+
+ uscf->srv_conf = ctx->srv_conf;
+
+ for (m = 0; ngx_modules[m]; m++) {
+ if (ngx_modules[m]->type != NGX_STREAM_MODULE) {
+ continue;
+ }
+
+ module = ngx_modules[m]->ctx;
+
+ if (module->create_srv_conf) {
+ mconf = module->create_srv_conf(cf);
+ if (mconf == NULL) {
+ return NGX_CONF_ERROR;
+ }
+
+ ctx->srv_conf[ngx_modules[m]->ctx_index] = mconf;
+ }
+ }
+
+ uscf->servers = ngx_array_create(cf->pool, 4,
+ sizeof(ngx_stream_upstream_server_t));
+ if (uscf->servers == NULL) {
+ return NGX_CONF_ERROR;
+ }
+
+
+ /* parse inside upstream{} */
+
+ pcf = *cf;
+ cf->ctx = ctx;
+ cf->cmd_type = NGX_STREAM_UPS_CONF;
+
+ rv = ngx_conf_parse(cf, NULL);
+
+ *cf = pcf;
+
+ if (rv != NGX_CONF_OK) {
+ return rv;
+ }
+
+ if (uscf->servers->nelts == 0) {
+ ngx_conf_log_error(NGX_LOG_EMERG, cf, 0,
+ "no servers are inside upstream");
+ return NGX_CONF_ERROR;
+ }
+
+ return rv;
+}
+
+
+static char *
+ngx_stream_upstream_server(ngx_conf_t *cf, ngx_command_t *cmd, void *conf)
+{
+ ngx_stream_upstream_srv_conf_t *uscf = conf;
+
+ time_t fail_timeout;
+ ngx_str_t *value, s;
+ ngx_url_t u;
+ ngx_int_t weight, max_fails;
+ ngx_uint_t i;
+ ngx_stream_upstream_server_t *us;
+
+ us = ngx_array_push(uscf->servers);
+ if (us == NULL) {
+ return NGX_CONF_ERROR;
+ }
+
+ ngx_memzero(us, sizeof(ngx_stream_upstream_server_t));
+
+ value = cf->args->elts;
+
+ weight = 1;
+ max_fails = 1;
+ fail_timeout = 10;
+
+ for (i = 2; i < cf->args->nelts; i++) {
+
+ if (ngx_strncmp(value[i].data, "weight=", 7) == 0) {
+
+ if (!(uscf->flags & NGX_STREAM_UPSTREAM_WEIGHT)) {
+ goto not_supported;
+ }
+
+ weight = ngx_atoi(&value[i].data[7], value[i].len - 7);
+
+ if (weight == NGX_ERROR || weight == 0) {
+ goto invalid;
+ }
+
+ continue;
+ }
+
+ if (ngx_strncmp(value[i].data, "max_fails=", 10) == 0) {
+
+ if (!(uscf->flags & NGX_STREAM_UPSTREAM_MAX_FAILS)) {
+ goto not_supported;
+ }
+
+ max_fails = ngx_atoi(&value[i].data[10], value[i].len - 10);
+
+ if (max_fails == NGX_ERROR) {
+ goto invalid;
+ }
+
+ continue;
+ }
+
+ if (ngx_strncmp(value[i].data, "fail_timeout=", 13) == 0) {
+
+ if (!(uscf->flags & NGX_STREAM_UPSTREAM_FAIL_TIMEOUT)) {
+ goto not_supported;
+ }
+
+ s.len = value[i].len - 13;
+ s.data = &value[i].data[13];
+
+ fail_timeout = ngx_parse_time(&s, 1);
+
+ if (fail_timeout == (time_t) NGX_ERROR) {
+ goto invalid;
+ }
+
+ continue;
+ }
+
+ if (ngx_strcmp(value[i].data, "backup") == 0) {
+
+ if (!(uscf->flags & NGX_STREAM_UPSTREAM_BACKUP)) {
+ goto not_supported;
+ }
+
+ us->backup = 1;
+
+ continue;
+ }
+
+ if (ngx_strcmp(value[i].data, "down") == 0) {
+
+ if (!(uscf->flags & NGX_STREAM_UPSTREAM_DOWN)) {
+ goto not_supported;
+ }
+
+ us->down = 1;
+
+ continue;
+ }
+
+ goto invalid;
+ }
+
+ ngx_memzero(&u, sizeof(ngx_url_t));
+
+ u.url = value[1];
+
+ if (ngx_parse_url(cf->pool, &u) != NGX_OK) {
+ if (u.err) {
+ ngx_conf_log_error(NGX_LOG_EMERG, cf, 0,
+ "%s in upstream \"%V\"", u.err, &u.url);
+ }
+
+ return NGX_CONF_ERROR;
+ }
+
+ if (u.no_port) {
+ ngx_conf_log_error(NGX_LOG_EMERG, cf, 0,
+ "no port in upstream \"%V\"", &u.url);
+ return NGX_CONF_ERROR;
+ }
+
+ us->name = u.url;
+ us->addrs = u.addrs;
+ us->naddrs = u.naddrs;
+ us->weight = weight;
+ us->max_fails = max_fails;
+ us->fail_timeout = fail_timeout;
+
+ return NGX_CONF_OK;
+
+invalid:
+
+ ngx_conf_log_error(NGX_LOG_EMERG, cf, 0,
+ "invalid parameter \"%V\"", &value[i]);
+
+ return NGX_CONF_ERROR;
+
+not_supported:
+
+ ngx_conf_log_error(NGX_LOG_EMERG, cf, 0,
+ "balancing method does not support parameter \"%V\"",
+ &value[i]);
+
+ return NGX_CONF_ERROR;
+}
+
+
+ngx_stream_upstream_srv_conf_t *
+ngx_stream_upstream_add(ngx_conf_t *cf, ngx_url_t *u, ngx_uint_t flags)
+{
+ ngx_uint_t i;
+ ngx_stream_upstream_server_t *us;
+ ngx_stream_upstream_srv_conf_t *uscf, **uscfp;
+ ngx_stream_upstream_main_conf_t *umcf;
+
+ if (!(flags & NGX_STREAM_UPSTREAM_CREATE)) {
+
+ if (ngx_parse_url(cf->pool, u) != NGX_OK) {
+ if (u->err) {
+ ngx_conf_log_error(NGX_LOG_EMERG, cf, 0,
+ "%s in upstream \"%V\"", u->err, &u->url);
+ }
+
+ return NULL;
+ }
+ }
+
+ umcf = ngx_stream_conf_get_module_main_conf(cf, ngx_stream_upstream_module);
+
+ uscfp = umcf->upstreams.elts;
+
+ for (i = 0; i < umcf->upstreams.nelts; i++) {
+
+ if (uscfp[i]->host.len != u->host.len
+ || ngx_strncasecmp(uscfp[i]->host.data, u->host.data, u->host.len)
+ != 0)
+ {
+ continue;
+ }
+
+ if ((flags & NGX_STREAM_UPSTREAM_CREATE)
+ && (uscfp[i]->flags & NGX_STREAM_UPSTREAM_CREATE))
+ {
+ ngx_conf_log_error(NGX_LOG_EMERG, cf, 0,
+ "duplicate upstream \"%V\"", &u->host);
+ return NULL;
+ }
+
+ if ((uscfp[i]->flags & NGX_STREAM_UPSTREAM_CREATE) && !u->no_port) {
+ ngx_conf_log_error(NGX_LOG_WARN, cf, 0,
+ "upstream \"%V\" may not have port %d",
+ &u->host, u->port);
+ return NULL;
+ }
+
+ if ((flags & NGX_STREAM_UPSTREAM_CREATE) && !uscfp[i]->no_port) {
+ ngx_log_error(NGX_LOG_WARN, cf->log, 0,
+ "upstream \"%V\" may not have port %d in %s:%ui",
+ &u->host, uscfp[i]->port,
+ uscfp[i]->file_name, uscfp[i]->line);
+ return NULL;
+ }
+
+ if (uscfp[i]->port != u->port) {
+ continue;
+ }
+
+ if (flags & NGX_STREAM_UPSTREAM_CREATE) {
+ uscfp[i]->flags = flags;
+ }
+
+ return uscfp[i];
+ }
+
+ uscf = ngx_pcalloc(cf->pool, sizeof(ngx_stream_upstream_srv_conf_t));
+ if (uscf == NULL) {
+ return NULL;
+ }
+
+ uscf->flags = flags;
+ uscf->host = u->host;
+ uscf->file_name = cf->conf_file->file.name.data;
+ uscf->line = cf->conf_file->line;
+ uscf->port = u->port;
+ uscf->no_port = u->no_port;
+
+ if (u->naddrs == 1) {
+ uscf->servers = ngx_array_create(cf->pool, 1,
+ sizeof(ngx_stream_upstream_server_t));
+ if (uscf->servers == NULL) {
+ return NULL;
+ }
+
+ us = ngx_array_push(uscf->servers);
+ if (us == NULL) {
+ return NULL;
+ }
+
+ ngx_memzero(us, sizeof(ngx_stream_upstream_server_t));
+
+ us->addrs = u->addrs;
+ us->naddrs = 1;
+ }
+
+ uscfp = ngx_array_push(&umcf->upstreams);
+ if (uscfp == NULL) {
+ return NULL;
+ }
+
+ *uscfp = uscf;
+
+ return uscf;
+}
+
+
+static void *
+ngx_stream_upstream_create_main_conf(ngx_conf_t *cf)
+{
+ ngx_stream_upstream_main_conf_t *umcf;
+
+ umcf = ngx_pcalloc(cf->pool, sizeof(ngx_stream_upstream_main_conf_t));
+ if (umcf == NULL) {
+ return NULL;
+ }
+
+ if (ngx_array_init(&umcf->upstreams, cf->pool, 4,
+ sizeof(ngx_stream_upstream_srv_conf_t *))
+ != NGX_OK)
+ {
+ return NULL;
+ }
+
+ return umcf;
+}
+
+
+static char *
+ngx_stream_upstream_init_main_conf(ngx_conf_t *cf, void *conf)
+{
+ ngx_stream_upstream_main_conf_t *umcf = conf;
+
+ ngx_uint_t i;
+ ngx_stream_upstream_init_pt init;
+ ngx_stream_upstream_srv_conf_t **uscfp;
+
+ uscfp = umcf->upstreams.elts;
+
+ for (i = 0; i < umcf->upstreams.nelts; i++) {
+
+ init = uscfp[i]->peer.init_upstream
+ ? uscfp[i]->peer.init_upstream
+ : ngx_stream_upstream_init_round_robin;
+
+ if (init(cf, uscfp[i]) != NGX_OK) {
+ return NGX_CONF_ERROR;
+ }
+ }
+
+ return NGX_CONF_OK;
+}
diff --git a/src/stream/ngx_stream_upstream.h b/src/stream/ngx_stream_upstream.h
new file mode 100644
index 000000000..83353edca
--- /dev/null
+++ b/src/stream/ngx_stream_upstream.h
@@ -0,0 +1,103 @@
+
+/*
+ * Copyright (C) Igor Sysoev
+ * Copyright (C) Nginx, Inc.
+ */
+
+
+#ifndef _NGX_STREAM_UPSTREAM_H_INCLUDED_
+#define _NGX_STREAM_UPSTREAM_H_INCLUDED_
+
+
+#include <ngx_config.h>
+#include <ngx_core.h>
+#include <ngx_stream.h>
+#include <ngx_event_connect.h>
+
+
+#define NGX_STREAM_UPSTREAM_CREATE 0x0001
+#define NGX_STREAM_UPSTREAM_WEIGHT 0x0002
+#define NGX_STREAM_UPSTREAM_MAX_FAILS 0x0004
+#define NGX_STREAM_UPSTREAM_FAIL_TIMEOUT 0x0008
+#define NGX_STREAM_UPSTREAM_DOWN 0x0010
+#define NGX_STREAM_UPSTREAM_BACKUP 0x0020
+
+
+typedef struct {
+ ngx_array_t upstreams;
+ /* ngx_stream_upstream_srv_conf_t */
+} ngx_stream_upstream_main_conf_t;
+
+
+typedef struct ngx_stream_upstream_srv_conf_s ngx_stream_upstream_srv_conf_t;
+
+
+typedef ngx_int_t (*ngx_stream_upstream_init_pt)(ngx_conf_t *cf,
+ ngx_stream_upstream_srv_conf_t *us);
+typedef ngx_int_t (*ngx_stream_upstream_init_peer_pt)(ngx_stream_session_t *s,
+ ngx_stream_upstream_srv_conf_t *us);
+
+
+typedef struct {
+ ngx_stream_upstream_init_pt init_upstream;
+ ngx_stream_upstream_init_peer_pt init;
+ void *data;
+} ngx_stream_upstream_peer_t;
+
+
+typedef struct {
+ ngx_str_t name;
+ ngx_addr_t *addrs;
+ ngx_uint_t naddrs;
+ ngx_uint_t weight;
+ ngx_uint_t max_fails;
+ time_t fail_timeout;
+
+ unsigned down:1;
+ unsigned backup:1;
+} ngx_stream_upstream_server_t;
+
+
+struct ngx_stream_upstream_srv_conf_s {
+ ngx_stream_upstream_peer_t peer;
+ void **srv_conf;
+
+ ngx_array_t *servers;
+ /* ngx_stream_upstream_server_t */
+
+ ngx_uint_t flags;
+ ngx_str_t host;
+ u_char *file_name;
+ ngx_uint_t line;
+ in_port_t port;
+ ngx_uint_t no_port; /* unsigned no_port:1 */
+
+#if (NGX_STREAM_UPSTREAM_ZONE)
+ ngx_shm_zone_t *shm_zone;
+#endif
+};
+
+
+typedef struct {
+ ngx_peer_connection_t peer;
+ ngx_buf_t downstream_buf;
+ ngx_buf_t upstream_buf;
+ off_t received;
+#if (NGX_STREAM_SSL)
+ ngx_str_t ssl_name;
+#endif
+} ngx_stream_upstream_t;
+
+
+ngx_stream_upstream_srv_conf_t *ngx_stream_upstream_add(ngx_conf_t *cf,
+ ngx_url_t *u, ngx_uint_t flags);
+
+
+#define ngx_stream_conf_upstream_srv_conf(uscf, module) \
+ uscf->srv_conf[module.ctx_index]
+
+
+extern ngx_module_t ngx_stream_upstream_module;
+
+
+#endif /* _NGX_STREAM_UPSTREAM_H_INCLUDED_ */
diff --git a/src/stream/ngx_stream_upstream_hash_module.c b/src/stream/ngx_stream_upstream_hash_module.c
new file mode 100644
index 000000000..aa68cad97
--- /dev/null
+++ b/src/stream/ngx_stream_upstream_hash_module.c
@@ -0,0 +1,657 @@
+
+/*
+ * Copyright (C) Roman Arutyunyan
+ * Copyright (C) Nginx, Inc.
+ */
+
+
+#include <ngx_config.h>
+#include <ngx_core.h>
+#include <ngx_stream.h>
+
+
+typedef struct {
+ uint32_t hash;
+ ngx_str_t *server;
+} ngx_stream_upstream_chash_point_t;
+
+
+typedef struct {
+ ngx_uint_t number;
+ ngx_stream_upstream_chash_point_t point[1];
+} ngx_stream_upstream_chash_points_t;
+
+
+typedef struct {
+ ngx_stream_upstream_chash_points_t *points;
+} ngx_stream_upstream_hash_srv_conf_t;
+
+
+typedef struct {
+ /* the round robin data must be first */
+ ngx_stream_upstream_rr_peer_data_t rrp;
+ ngx_stream_upstream_hash_srv_conf_t *conf;
+ ngx_str_t key;
+ ngx_uint_t tries;
+ ngx_uint_t rehash;
+ uint32_t hash;
+ ngx_event_get_peer_pt get_rr_peer;
+} ngx_stream_upstream_hash_peer_data_t;
+
+
+static ngx_int_t ngx_stream_upstream_init_hash(ngx_conf_t *cf,
+ ngx_stream_upstream_srv_conf_t *us);
+static ngx_int_t ngx_stream_upstream_init_hash_peer(ngx_stream_session_t *s,
+ ngx_stream_upstream_srv_conf_t *us);
+static ngx_int_t ngx_stream_upstream_get_hash_peer(ngx_peer_connection_t *pc,
+ void *data);
+
+static ngx_int_t ngx_stream_upstream_init_chash(ngx_conf_t *cf,
+ ngx_stream_upstream_srv_conf_t *us);
+static int ngx_libc_cdecl
+ ngx_stream_upstream_chash_cmp_points(const void *one, const void *two);
+static ngx_uint_t ngx_stream_upstream_find_chash_point(
+ ngx_stream_upstream_chash_points_t *points, uint32_t hash);
+static ngx_int_t ngx_stream_upstream_init_chash_peer(ngx_stream_session_t *s,
+ ngx_stream_upstream_srv_conf_t *us);
+static ngx_int_t ngx_stream_upstream_get_chash_peer(ngx_peer_connection_t *pc,
+ void *data);
+
+static void *ngx_stream_upstream_hash_create_conf(ngx_conf_t *cf);
+static char *ngx_stream_upstream_hash(ngx_conf_t *cf, ngx_command_t *cmd,
+ void *conf);
+
+
+static ngx_command_t ngx_stream_upstream_hash_commands[] = {
+
+ { ngx_string("hash"),
+ NGX_STREAM_UPS_CONF|NGX_CONF_TAKE12,
+ ngx_stream_upstream_hash,
+ NGX_STREAM_SRV_CONF_OFFSET,
+ 0,
+ NULL },
+
+ ngx_null_command
+};
+
+
+static ngx_stream_module_t ngx_stream_upstream_hash_module_ctx = {
+ NULL, /* create main configuration */
+ NULL, /* init main configuration */
+
+ ngx_stream_upstream_hash_create_conf, /* create server configuration */
+ NULL, /* merge server configuration */
+};
+
+
+ngx_module_t ngx_stream_upstream_hash_module = {
+ NGX_MODULE_V1,
+ &ngx_stream_upstream_hash_module_ctx, /* module context */
+ ngx_stream_upstream_hash_commands, /* module directives */
+ NGX_STREAM_MODULE, /* module type */
+ NULL, /* init master */
+ NULL, /* init module */
+ NULL, /* init process */
+ NULL, /* init thread */
+ NULL, /* exit thread */
+ NULL, /* exit process */
+ NULL, /* exit master */
+ NGX_MODULE_V1_PADDING
+};
+
+
+static ngx_int_t
+ngx_stream_upstream_init_hash(ngx_conf_t *cf,
+ ngx_stream_upstream_srv_conf_t *us)
+{
+ if (ngx_stream_upstream_init_round_robin(cf, us) != NGX_OK) {
+ return NGX_ERROR;
+ }
+
+ us->peer.init = ngx_stream_upstream_init_hash_peer;
+
+ return NGX_OK;
+}
+
+
+static ngx_int_t
+ngx_stream_upstream_init_hash_peer(ngx_stream_session_t *s,
+ ngx_stream_upstream_srv_conf_t *us)
+{
+ ngx_stream_upstream_hash_srv_conf_t *hcf;
+ ngx_stream_upstream_hash_peer_data_t *hp;
+
+ hp = ngx_palloc(s->connection->pool,
+ sizeof(ngx_stream_upstream_hash_peer_data_t));
+ if (hp == NULL) {
+ return NGX_ERROR;
+ }
+
+ s->upstream->peer.data = &hp->rrp;
+
+ if (ngx_stream_upstream_init_round_robin_peer(s, us) != NGX_OK) {
+ return NGX_ERROR;
+ }
+
+ s->upstream->peer.get = ngx_stream_upstream_get_hash_peer;
+
+ hcf = ngx_stream_conf_upstream_srv_conf(us,
+ ngx_stream_upstream_hash_module);
+
+ hp->key = s->connection->addr_text;
+
+ ngx_log_debug1(NGX_LOG_DEBUG_STREAM, s->connection->log, 0,
+ "upstream hash key:\"%V\"", &hp->key);
+
+ hp->conf = hcf;
+ hp->tries = 0;
+ hp->rehash = 0;
+ hp->hash = 0;
+ hp->get_rr_peer = ngx_stream_upstream_get_round_robin_peer;
+
+ return NGX_OK;
+}
+
+
+static ngx_int_t
+ngx_stream_upstream_get_hash_peer(ngx_peer_connection_t *pc, void *data)
+{
+ ngx_stream_upstream_hash_peer_data_t *hp = data;
+
+ time_t now;
+ u_char buf[NGX_INT_T_LEN];
+ size_t size;
+ uint32_t hash;
+ ngx_int_t w;
+ uintptr_t m;
+ ngx_uint_t i, n, p;
+ ngx_stream_upstream_rr_peer_t *peer;
+
+ ngx_log_debug1(NGX_LOG_DEBUG_STREAM, pc->log, 0,
+ "get hash peer, try: %ui", pc->tries);
+
+ ngx_stream_upstream_rr_peers_wlock(hp->rrp.peers);
+
+ if (hp->tries > 20 || hp->rrp.peers->single) {
+ ngx_stream_upstream_rr_peers_unlock(hp->rrp.peers);
+ return hp->get_rr_peer(pc, &hp->rrp);
+ }
+
+ now = ngx_time();
+
+ pc->connection = NULL;
+
+ for ( ;; ) {
+
+ /*
+ * Hash expression is compatible with Cache::Memcached:
+ * ((crc32([REHASH] KEY) >> 16) & 0x7fff) + PREV_HASH
+ * with REHASH omitted at the first iteration.
+ */
+
+ ngx_crc32_init(hash);
+
+ if (hp->rehash > 0) {
+ size = ngx_sprintf(buf, "%ui", hp->rehash) - buf;
+ ngx_crc32_update(&hash, buf, size);
+ }
+
+ ngx_crc32_update(&hash, hp->key.data, hp->key.len);
+ ngx_crc32_final(hash);
+
+ hash = (hash >> 16) & 0x7fff;
+
+ hp->hash += hash;
+ hp->rehash++;
+
+ if (!hp->rrp.peers->weighted) {
+ p = hp->hash % hp->rrp.peers->number;
+
+ peer = hp->rrp.peers->peer;
+ for (i = 0; i < p; i++) {
+ peer = peer->next;
+ }
+
+ } else {
+ w = hp->hash % hp->rrp.peers->total_weight;
+
+ for (peer = hp->rrp.peers->peer, i = 0;
+ peer;
+ peer = peer->next, i++)
+ {
+ w -= peer->weight;
+ if (w < 0) {
+ break;
+ }
+ }
+
+ p = i;
+ }
+
+ n = p / (8 * sizeof(uintptr_t));
+ m = (uintptr_t) 1 << p % (8 * sizeof(uintptr_t));
+
+ if (hp->rrp.tried[n] & m) {
+ goto next;
+ }
+
+ ngx_log_debug2(NGX_LOG_DEBUG_STREAM, pc->log, 0,
+ "get hash peer, value:%uD, peer:%ui", hp->hash, p);
+
+ if (peer->down) {
+ goto next;
+ }
+
+ if (peer->max_fails
+ && peer->fails >= peer->max_fails
+ && now - peer->checked <= peer->fail_timeout)
+ {
+ goto next;
+ }
+
+ break;
+
+ next:
+
+ if (++hp->tries > 20) {
+ ngx_stream_upstream_rr_peers_unlock(hp->rrp.peers);
+ return hp->get_rr_peer(pc, &hp->rrp);
+ }
+ }
+
+ hp->rrp.current = peer;
+
+ pc->sockaddr = peer->sockaddr;
+ pc->socklen = peer->socklen;
+ pc->name = &peer->name;
+
+ peer->conns++;
+
+ if (now - peer->checked > peer->fail_timeout) {
+ peer->checked = now;
+ }
+
+ ngx_stream_upstream_rr_peers_unlock(hp->rrp.peers);
+
+ hp->rrp.tried[n] |= m;
+
+ return NGX_OK;
+}
+
+
+static ngx_int_t
+ngx_stream_upstream_init_chash(ngx_conf_t *cf,
+ ngx_stream_upstream_srv_conf_t *us)
+{
+ u_char *host, *port, c;
+ size_t host_len, port_len, size;
+ uint32_t hash, base_hash, prev_hash;
+ ngx_str_t *server;
+ ngx_uint_t npoints, i, j;
+ ngx_stream_upstream_rr_peer_t *peer;
+ ngx_stream_upstream_rr_peers_t *peers;
+ ngx_stream_upstream_chash_points_t *points;
+ ngx_stream_upstream_hash_srv_conf_t *hcf;
+
+ if (ngx_stream_upstream_init_round_robin(cf, us) != NGX_OK) {
+ return NGX_ERROR;
+ }
+
+ us->peer.init = ngx_stream_upstream_init_chash_peer;
+
+ peers = us->peer.data;
+ npoints = peers->total_weight * 160;
+
+ size = sizeof(ngx_stream_upstream_chash_points_t)
+ + sizeof(ngx_stream_upstream_chash_point_t) * (npoints - 1);
+
+ points = ngx_palloc(cf->pool, size);
+ if (points == NULL) {
+ return NGX_ERROR;
+ }
+
+ points->number = 0;
+
+ for (peer = peers->peer; peer; peer = peer->next) {
+ server = &peer->server;
+
+ /*
+ * Hash expression is compatible with Cache::Memcached::Fast:
+ * crc32(HOST \0 PORT PREV_HASH).
+ */
+
+ if (server->len >= 5
+ && ngx_strncasecmp(server->data, (u_char *) "unix:", 5) == 0)
+ {
+ host = server->data + 5;
+ host_len = server->len - 5;
+ port = NULL;
+ port_len = 0;
+ goto done;
+ }
+
+ for (j = 0; j < server->len; j++) {
+ c = server->data[server->len - j - 1];
+
+ if (c == ':') {
+ host = server->data;
+ host_len = server->len - j - 1;
+ port = server->data + server->len - j;
+ port_len = j;
+ goto done;
+ }
+
+ if (c < '0' || c > '9') {
+ break;
+ }
+ }
+
+ host = server->data;
+ host_len = server->len;
+ port = NULL;
+ port_len = 0;
+
+ done:
+
+ ngx_crc32_init(base_hash);
+ ngx_crc32_update(&base_hash, host, host_len);
+ ngx_crc32_update(&base_hash, (u_char *) "", 1);
+ ngx_crc32_update(&base_hash, port, port_len);
+
+ prev_hash = 0;
+ npoints = peer->weight * 160;
+
+ for (j = 0; j < npoints; j++) {
+ hash = base_hash;
+
+ ngx_crc32_update(&hash, (u_char *) &prev_hash, sizeof(uint32_t));
+ ngx_crc32_final(hash);
+
+ points->point[points->number].hash = hash;
+ points->point[points->number].server = server;
+ points->number++;
+
+ prev_hash = hash;
+ }
+ }
+
+ ngx_qsort(points->point,
+ points->number,
+ sizeof(ngx_stream_upstream_chash_point_t),
+ ngx_stream_upstream_chash_cmp_points);
+
+ for (i = 0, j = 1; j < points->number; j++) {
+ if (points->point[i].hash != points->point[j].hash) {
+ points->point[++i] = points->point[j];
+ }
+ }
+
+ points->number = i + 1;
+
+ hcf = ngx_stream_conf_upstream_srv_conf(us,
+ ngx_stream_upstream_hash_module);
+ hcf->points = points;
+
+ return NGX_OK;
+}
+
+
+static int ngx_libc_cdecl
+ngx_stream_upstream_chash_cmp_points(const void *one, const void *two)
+{
+ ngx_stream_upstream_chash_point_t *first =
+ (ngx_stream_upstream_chash_point_t *) one;
+ ngx_stream_upstream_chash_point_t *second =
+ (ngx_stream_upstream_chash_point_t *) two;
+
+ if (first->hash < second->hash) {
+ return -1;
+
+ } else if (first->hash > second->hash) {
+ return 1;
+
+ } else {
+ return 0;
+ }
+}
+
+
+static ngx_uint_t
+ngx_stream_upstream_find_chash_point(ngx_stream_upstream_chash_points_t *points,
+ uint32_t hash)
+{
+ ngx_uint_t i, j, k;
+ ngx_stream_upstream_chash_point_t *point;
+
+ /* find first point >= hash */
+
+ point = &points->point[0];
+
+ i = 0;
+ j = points->number;
+
+ while (i < j) {
+ k = (i + j) / 2;
+
+ if (hash > point[k].hash) {
+ i = k + 1;
+
+ } else if (hash < point[k].hash) {
+ j = k;
+
+ } else {
+ return k;
+ }
+ }
+
+ return i;
+}
+
+
+static ngx_int_t
+ngx_stream_upstream_init_chash_peer(ngx_stream_session_t *s,
+ ngx_stream_upstream_srv_conf_t *us)
+{
+ uint32_t hash;
+ ngx_stream_upstream_hash_srv_conf_t *hcf;
+ ngx_stream_upstream_hash_peer_data_t *hp;
+
+ if (ngx_stream_upstream_init_hash_peer(s, us) != NGX_OK) {
+ return NGX_ERROR;
+ }
+
+ s->upstream->peer.get = ngx_stream_upstream_get_chash_peer;
+
+ hp = s->upstream->peer.data;
+ hcf = ngx_stream_conf_upstream_srv_conf(us,
+ ngx_stream_upstream_hash_module);
+
+ hash = ngx_crc32_long(hp->key.data, hp->key.len);
+
+ ngx_stream_upstream_rr_peers_rlock(hp->rrp.peers);
+
+ hp->hash = ngx_stream_upstream_find_chash_point(hcf->points, hash);
+
+ ngx_stream_upstream_rr_peers_unlock(hp->rrp.peers);
+
+ return NGX_OK;
+}
+
+
+static ngx_int_t
+ngx_stream_upstream_get_chash_peer(ngx_peer_connection_t *pc, void *data)
+{
+ ngx_stream_upstream_hash_peer_data_t *hp = data;
+
+ time_t now;
+ intptr_t m;
+ ngx_str_t *server;
+ ngx_int_t total;
+ ngx_uint_t i, n, best_i;
+ ngx_stream_upstream_rr_peer_t *peer, *best;
+ ngx_stream_upstream_chash_point_t *point;
+ ngx_stream_upstream_chash_points_t *points;
+ ngx_stream_upstream_hash_srv_conf_t *hcf;
+
+ ngx_log_debug1(NGX_LOG_DEBUG_STREAM, pc->log, 0,
+ "get consistent hash peer, try: %ui", pc->tries);
+
+ ngx_stream_upstream_rr_peers_wlock(hp->rrp.peers);
+
+ pc->connection = NULL;
+
+ now = ngx_time();
+ hcf = hp->conf;
+
+ points = hcf->points;
+ point = &points->point[0];
+
+ for ( ;; ) {
+ server = point[hp->hash % points->number].server;
+
+ ngx_log_debug2(NGX_LOG_DEBUG_STREAM, pc->log, 0,
+ "consistent hash peer:%uD, server:\"%V\"",
+ hp->hash, server);
+
+ best = NULL;
+ best_i = 0;
+ total = 0;
+
+ for (peer = hp->rrp.peers->peer, i = 0;
+ peer;
+ peer = peer->next, i++)
+ {
+
+ n = i / (8 * sizeof(uintptr_t));
+ m = (uintptr_t) 1 << i % (8 * sizeof(uintptr_t));
+
+ if (hp->rrp.tried[n] & m) {
+ continue;
+ }
+
+ if (peer->down) {
+ continue;
+ }
+
+ if (peer->server.len != server->len
+ || ngx_strncmp(peer->server.data, server->data, server->len)
+ != 0)
+ {
+ continue;
+ }
+
+ if (peer->max_fails
+ && peer->fails >= peer->max_fails
+ && now - peer->checked <= peer->fail_timeout)
+ {
+ continue;
+ }
+
+ peer->current_weight += peer->effective_weight;
+ total += peer->effective_weight;
+
+ if (peer->effective_weight < peer->weight) {
+ peer->effective_weight++;
+ }
+
+ if (best == NULL || peer->current_weight > best->current_weight) {
+ best = peer;
+ best_i = i;
+ }
+ }
+
+ if (best) {
+ best->current_weight -= total;
+ break;
+ }
+
+ hp->hash++;
+ hp->tries++;
+
+ if (hp->tries >= points->number) {
+ ngx_stream_upstream_rr_peers_unlock(hp->rrp.peers);
+ return NGX_BUSY;
+ }
+ }
+
+ hp->rrp.current = best;
+
+ pc->sockaddr = best->sockaddr;
+ pc->socklen = best->socklen;
+ pc->name = &best->name;
+
+ best->conns++;
+
+ if (now - best->checked > best->fail_timeout) {
+ best->checked = now;
+ }
+
+ ngx_stream_upstream_rr_peers_unlock(hp->rrp.peers);
+
+ n = best_i / (8 * sizeof(uintptr_t));
+ m = (uintptr_t) 1 << best_i % (8 * sizeof(uintptr_t));
+
+ hp->rrp.tried[n] |= m;
+
+ return NGX_OK;
+}
+
+
+static void *
+ngx_stream_upstream_hash_create_conf(ngx_conf_t *cf)
+{
+ ngx_stream_upstream_hash_srv_conf_t *conf;
+
+ conf = ngx_palloc(cf->pool, sizeof(ngx_stream_upstream_hash_srv_conf_t));
+ if (conf == NULL) {
+ return NULL;
+ }
+
+ conf->points = NULL;
+
+ return conf;
+}
+
+
+static char *
+ngx_stream_upstream_hash(ngx_conf_t *cf, ngx_command_t *cmd, void *conf)
+{
+ ngx_str_t *value;
+ ngx_stream_upstream_srv_conf_t *uscf;
+
+ value = cf->args->elts;
+
+ if (ngx_strcmp(value[1].data, "$remote_addr")) {
+ ngx_conf_log_error(NGX_LOG_EMERG, cf, 0,
+ "unsupported hash key \"%V\", use $remote_addr",
+ &value[1]);
+ return NGX_CONF_ERROR;
+ }
+
+ uscf = ngx_stream_conf_get_module_srv_conf(cf, ngx_stream_upstream_module);
+
+ if (uscf->peer.init_upstream) {
+ ngx_conf_log_error(NGX_LOG_WARN, cf, 0,
+ "load balancing method redefined");
+ }
+
+ uscf->flags = NGX_STREAM_UPSTREAM_CREATE
+ |NGX_STREAM_UPSTREAM_WEIGHT
+ |NGX_STREAM_UPSTREAM_MAX_FAILS
+ |NGX_STREAM_UPSTREAM_FAIL_TIMEOUT
+ |NGX_STREAM_UPSTREAM_DOWN;
+
+ if (cf->args->nelts == 2) {
+ uscf->peer.init_upstream = ngx_stream_upstream_init_hash;
+
+ } else if (ngx_strcmp(value[2].data, "consistent") == 0) {
+ uscf->peer.init_upstream = ngx_stream_upstream_init_chash;
+
+ } else {
+ ngx_conf_log_error(NGX_LOG_EMERG, cf, 0,
+ "invalid parameter \"%V\"", &value[2]);
+ return NGX_CONF_ERROR;
+ }
+
+ return NGX_CONF_OK;
+}
diff --git a/src/stream/ngx_stream_upstream_least_conn_module.c b/src/stream/ngx_stream_upstream_least_conn_module.c
new file mode 100644
index 000000000..eae4b177d
--- /dev/null
+++ b/src/stream/ngx_stream_upstream_least_conn_module.c
@@ -0,0 +1,305 @@
+
+/*
+ * Copyright (C) Maxim Dounin
+ * Copyright (C) Nginx, Inc.
+ */
+
+
+#include <ngx_config.h>
+#include <ngx_core.h>
+#include <ngx_stream.h>
+
+
+static ngx_int_t ngx_stream_upstream_init_least_conn_peer(
+ ngx_stream_session_t *s, ngx_stream_upstream_srv_conf_t *us);
+static ngx_int_t ngx_stream_upstream_get_least_conn_peer(
+ ngx_peer_connection_t *pc, void *data);
+static char *ngx_stream_upstream_least_conn(ngx_conf_t *cf, ngx_command_t *cmd,
+ void *conf);
+
+
+static ngx_command_t ngx_stream_upstream_least_conn_commands[] = {
+
+ { ngx_string("least_conn"),
+ NGX_STREAM_UPS_CONF|NGX_CONF_NOARGS,
+ ngx_stream_upstream_least_conn,
+ 0,
+ 0,
+ NULL },
+
+ ngx_null_command
+};
+
+
+static ngx_stream_module_t ngx_stream_upstream_least_conn_module_ctx = {
+ NULL, /* create main configuration */
+ NULL, /* init main configuration */
+
+ NULL, /* create server configuration */
+ NULL, /* merge server configuration */
+};
+
+
+ngx_module_t ngx_stream_upstream_least_conn_module = {
+ NGX_MODULE_V1,
+ &ngx_stream_upstream_least_conn_module_ctx, /* module context */
+ ngx_stream_upstream_least_conn_commands, /* module directives */
+ NGX_STREAM_MODULE, /* module type */
+ NULL, /* init master */
+ NULL, /* init module */
+ NULL, /* init process */
+ NULL, /* init thread */
+ NULL, /* exit thread */
+ NULL, /* exit process */
+ NULL, /* exit master */
+ NGX_MODULE_V1_PADDING
+};
+
+
+static ngx_int_t
+ngx_stream_upstream_init_least_conn(ngx_conf_t *cf,
+ ngx_stream_upstream_srv_conf_t *us)
+{
+ ngx_log_debug0(NGX_LOG_DEBUG_STREAM, cf->log, 0,
+ "init least conn");
+
+ if (ngx_stream_upstream_init_round_robin(cf, us) != NGX_OK) {
+ return NGX_ERROR;
+ }
+
+ us->peer.init = ngx_stream_upstream_init_least_conn_peer;
+
+ return NGX_OK;
+}
+
+
+static ngx_int_t
+ngx_stream_upstream_init_least_conn_peer(ngx_stream_session_t *s,
+ ngx_stream_upstream_srv_conf_t *us)
+{
+ ngx_log_debug0(NGX_LOG_DEBUG_STREAM, s->connection->log, 0,
+ "init least conn peer");
+
+ if (ngx_stream_upstream_init_round_robin_peer(s, us) != NGX_OK) {
+ return NGX_ERROR;
+ }
+
+ s->upstream->peer.get = ngx_stream_upstream_get_least_conn_peer;
+
+ return NGX_OK;
+}
+
+
+static ngx_int_t
+ngx_stream_upstream_get_least_conn_peer(ngx_peer_connection_t *pc, void *data)
+{
+ ngx_stream_upstream_rr_peer_data_t *rrp = data;
+
+ time_t now;
+ uintptr_t m;
+ ngx_int_t rc, total;
+ ngx_uint_t i, n, p, many;
+ ngx_stream_upstream_rr_peer_t *peer, *best;
+ ngx_stream_upstream_rr_peers_t *peers;
+
+ ngx_log_debug1(NGX_LOG_DEBUG_STREAM, pc->log, 0,
+ "get least conn peer, try: %ui", pc->tries);
+
+ if (rrp->peers->single) {
+ return ngx_stream_upstream_get_round_robin_peer(pc, rrp);
+ }
+
+ pc->connection = NULL;
+
+ now = ngx_time();
+
+ peers = rrp->peers;
+
+ ngx_stream_upstream_rr_peers_wlock(peers);
+
+ best = NULL;
+ total = 0;
+
+#if (NGX_SUPPRESS_WARN)
+ many = 0;
+ p = 0;
+#endif
+
+ for (peer = peers->peer, i = 0;
+ peer;
+ peer = peer->next, i++)
+ {
+
+ n = i / (8 * sizeof(uintptr_t));
+ m = (uintptr_t) 1 << i % (8 * sizeof(uintptr_t));
+
+ if (rrp->tried[n] & m) {
+ continue;
+ }
+
+ if (peer->down) {
+ continue;
+ }
+
+ if (peer->max_fails
+ && peer->fails >= peer->max_fails
+ && now - peer->checked <= peer->fail_timeout)
+ {
+ continue;
+ }
+
+ /*
+ * select peer with least number of connections; if there are
+ * multiple peers with the same number of connections, select
+ * based on round-robin
+ */
+
+ if (best == NULL
+ || peer->conns * best->weight < best->conns * peer->weight)
+ {
+ best = peer;
+ many = 0;
+ p = i;
+
+ } else if (peer->conns * best->weight == best->conns * peer->weight) {
+ many = 1;
+ }
+ }
+
+ if (best == NULL) {
+ ngx_log_debug0(NGX_LOG_DEBUG_STREAM, pc->log, 0,
+ "get least conn peer, no peer found");
+
+ goto failed;
+ }
+
+ if (many) {
+ ngx_log_debug0(NGX_LOG_DEBUG_STREAM, pc->log, 0,
+ "get least conn peer, many");
+
+ for (peer = best, i = p;
+ peer;
+ peer = peer->next, i++)
+ {
+ n = i / (8 * sizeof(uintptr_t));
+ m = (uintptr_t) 1 << i % (8 * sizeof(uintptr_t));
+
+ if (rrp->tried[n] & m) {
+ continue;
+ }
+
+ if (peer->down) {
+ continue;
+ }
+
+ if (peer->conns * best->weight != best->conns * peer->weight) {
+ continue;
+ }
+
+ if (peer->max_fails
+ && peer->fails >= peer->max_fails
+ && now - peer->checked <= peer->fail_timeout)
+ {
+ continue;
+ }
+
+ peer->current_weight += peer->effective_weight;
+ total += peer->effective_weight;
+
+ if (peer->effective_weight < peer->weight) {
+ peer->effective_weight++;
+ }
+
+ if (peer->current_weight > best->current_weight) {
+ best = peer;
+ p = i;
+ }
+ }
+ }
+
+ best->current_weight -= total;
+
+ if (now - best->checked > best->fail_timeout) {
+ best->checked = now;
+ }
+
+ pc->sockaddr = best->sockaddr;
+ pc->socklen = best->socklen;
+ pc->name = &best->name;
+
+ best->conns++;
+
+ rrp->current = best;
+
+ n = p / (8 * sizeof(uintptr_t));
+ m = (uintptr_t) 1 << p % (8 * sizeof(uintptr_t));
+
+ rrp->tried[n] |= m;
+
+ ngx_stream_upstream_rr_peers_unlock(peers);
+
+ return NGX_OK;
+
+failed:
+
+ if (peers->next) {
+ ngx_log_debug0(NGX_LOG_DEBUG_STREAM, pc->log, 0,
+ "get least conn peer, backup servers");
+
+ rrp->peers = peers->next;
+
+ n = (rrp->peers->number + (8 * sizeof(uintptr_t) - 1))
+ / (8 * sizeof(uintptr_t));
+
+ for (i = 0; i < n; i++) {
+ rrp->tried[i] = 0;
+ }
+
+ ngx_stream_upstream_rr_peers_unlock(peers);
+
+ rc = ngx_stream_upstream_get_least_conn_peer(pc, rrp);
+
+ if (rc != NGX_BUSY) {
+ return rc;
+ }
+
+ ngx_stream_upstream_rr_peers_wlock(peers);
+ }
+
+ /* all peers failed, mark them as live for quick recovery */
+
+ for (peer = peers->peer; peer; peer = peer->next) {
+ peer->fails = 0;
+ }
+
+ ngx_stream_upstream_rr_peers_unlock(peers);
+
+ pc->name = peers->name;
+
+ return NGX_BUSY;
+}
+
+
+static char *
+ngx_stream_upstream_least_conn(ngx_conf_t *cf, ngx_command_t *cmd, void *conf)
+{
+ ngx_stream_upstream_srv_conf_t *uscf;
+
+ uscf = ngx_stream_conf_get_module_srv_conf(cf, ngx_stream_upstream_module);
+
+ if (uscf->peer.init_upstream) {
+ ngx_conf_log_error(NGX_LOG_WARN, cf, 0,
+ "load balancing method redefined");
+ }
+
+ uscf->peer.init_upstream = ngx_stream_upstream_init_least_conn;
+
+ uscf->flags = NGX_STREAM_UPSTREAM_CREATE
+ |NGX_STREAM_UPSTREAM_WEIGHT
+ |NGX_STREAM_UPSTREAM_MAX_FAILS
+ |NGX_STREAM_UPSTREAM_FAIL_TIMEOUT
+ |NGX_STREAM_UPSTREAM_DOWN
+ |NGX_STREAM_UPSTREAM_BACKUP;
+
+ return NGX_CONF_OK;
+}
diff --git a/src/stream/ngx_stream_upstream_round_robin.c b/src/stream/ngx_stream_upstream_round_robin.c
new file mode 100644
index 000000000..c9157cd99
--- /dev/null
+++ b/src/stream/ngx_stream_upstream_round_robin.c
@@ -0,0 +1,697 @@
+
+/*
+ * Copyright (C) Igor Sysoev
+ * Copyright (C) Nginx, Inc.
+ */
+
+
+#include <ngx_config.h>
+#include <ngx_core.h>
+#include <ngx_stream.h>
+
+
+#define ngx_stream_upstream_tries(p) ((p)->number \
+ + ((p)->next ? (p)->next->number : 0))
+
+
+static ngx_stream_upstream_rr_peer_t *ngx_stream_upstream_get_peer(
+ ngx_stream_upstream_rr_peer_data_t *rrp);
+
+#if (NGX_STREAM_SSL)
+
+static ngx_int_t ngx_stream_upstream_set_round_robin_peer_session(
+ ngx_peer_connection_t *pc, void *data);
+static void ngx_stream_upstream_save_round_robin_peer_session(
+ ngx_peer_connection_t *pc, void *data);
+
+#endif
+
+
+ngx_int_t
+ngx_stream_upstream_init_round_robin(ngx_conf_t *cf,
+ ngx_stream_upstream_srv_conf_t *us)
+{
+ ngx_url_t u;
+ ngx_uint_t i, j, n, w;
+ ngx_stream_upstream_server_t *server;
+ ngx_stream_upstream_rr_peer_t *peer, **peerp;
+ ngx_stream_upstream_rr_peers_t *peers, *backup;
+
+ us->peer.init = ngx_stream_upstream_init_round_robin_peer;
+
+ if (us->servers) {
+ server = us->servers->elts;
+
+ n = 0;
+ w = 0;
+
+ for (i = 0; i < us->servers->nelts; i++) {
+ if (server[i].backup) {
+ continue;
+ }
+
+ n += server[i].naddrs;
+ w += server[i].naddrs * server[i].weight;
+ }
+
+ if (n == 0) {
+ ngx_log_error(NGX_LOG_EMERG, cf->log, 0,
+ "no servers in upstream \"%V\" in %s:%ui",
+ &us->host, us->file_name, us->line);
+ return NGX_ERROR;
+ }
+
+ peers = ngx_pcalloc(cf->pool, sizeof(ngx_stream_upstream_rr_peers_t));
+ if (peers == NULL) {
+ return NGX_ERROR;
+ }
+
+ peer = ngx_pcalloc(cf->pool, sizeof(ngx_stream_upstream_rr_peer_t) * n);
+ if (peer == NULL) {
+ return NGX_ERROR;
+ }
+
+ peers->single = (n == 1);
+ peers->number = n;
+ peers->weighted = (w != n);
+ peers->total_weight = w;
+ peers->name = &us->host;
+
+ n = 0;
+ peerp = &peers->peer;
+
+ for (i = 0; i < us->servers->nelts; i++) {
+ if (server[i].backup) {
+ continue;
+ }
+
+ for (j = 0; j < server[i].naddrs; j++) {
+ peer[n].sockaddr = server[i].addrs[j].sockaddr;
+ peer[n].socklen = server[i].addrs[j].socklen;
+ peer[n].name = server[i].addrs[j].name;
+ peer[n].weight = server[i].weight;
+ peer[n].effective_weight = server[i].weight;
+ peer[n].current_weight = 0;
+ peer[n].max_fails = server[i].max_fails;
+ peer[n].fail_timeout = server[i].fail_timeout;
+ peer[n].down = server[i].down;
+ peer[n].server = server[i].name;
+
+ *peerp = &peer[n];
+ peerp = &peer[n].next;
+ n++;
+ }
+ }
+
+ us->peer.data = peers;
+
+ /* backup servers */
+
+ n = 0;
+ w = 0;
+
+ for (i = 0; i < us->servers->nelts; i++) {
+ if (!server[i].backup) {
+ continue;
+ }
+
+ n += server[i].naddrs;
+ w += server[i].naddrs * server[i].weight;
+ }
+
+ if (n == 0) {
+ return NGX_OK;
+ }
+
+ backup = ngx_pcalloc(cf->pool, sizeof(ngx_stream_upstream_rr_peers_t));
+ if (backup == NULL) {
+ return NGX_ERROR;
+ }
+
+ peer = ngx_pcalloc(cf->pool, sizeof(ngx_stream_upstream_rr_peer_t) * n);
+ if (peer == NULL) {
+ return NGX_ERROR;
+ }
+
+ peers->single = 0;
+ backup->single = 0;
+ backup->number = n;
+ backup->weighted = (w != n);
+ backup->total_weight = w;
+ backup->name = &us->host;
+
+ n = 0;
+ peerp = &backup->peer;
+
+ for (i = 0; i < us->servers->nelts; i++) {
+ if (!server[i].backup) {
+ continue;
+ }
+
+ for (j = 0; j < server[i].naddrs; j++) {
+ peer[n].sockaddr = server[i].addrs[j].sockaddr;
+ peer[n].socklen = server[i].addrs[j].socklen;
+ peer[n].name = server[i].addrs[j].name;
+ peer[n].weight = server[i].weight;
+ peer[n].effective_weight = server[i].weight;
+ peer[n].current_weight = 0;
+ peer[n].max_fails = server[i].max_fails;
+ peer[n].fail_timeout = server[i].fail_timeout;
+ peer[n].down = server[i].down;
+ peer[n].server = server[i].name;
+
+ *peerp = &peer[n];
+ peerp = &peer[n].next;
+ n++;
+ }
+ }
+
+ peers->next = backup;
+
+ return NGX_OK;
+ }
+
+
+ /* an upstream implicitly defined by proxy_pass, etc. */
+
+ if (us->port == 0) {
+ ngx_log_error(NGX_LOG_EMERG, cf->log, 0,
+ "no port in upstream \"%V\" in %s:%ui",
+ &us->host, us->file_name, us->line);
+ return NGX_ERROR;
+ }
+
+ ngx_memzero(&u, sizeof(ngx_url_t));
+
+ u.host = us->host;
+ u.port = us->port;
+
+ if (ngx_inet_resolve_host(cf->pool, &u) != NGX_OK) {
+ if (u.err) {
+ ngx_log_error(NGX_LOG_EMERG, cf->log, 0,
+ "%s in upstream \"%V\" in %s:%ui",
+ u.err, &us->host, us->file_name, us->line);
+ }
+
+ return NGX_ERROR;
+ }
+
+ n = u.naddrs;
+
+ peers = ngx_pcalloc(cf->pool, sizeof(ngx_stream_upstream_rr_peers_t));
+ if (peers == NULL) {
+ return NGX_ERROR;
+ }
+
+ peer = ngx_pcalloc(cf->pool, sizeof(ngx_stream_upstream_rr_peer_t) * n);
+ if (peer == NULL) {
+ return NGX_ERROR;
+ }
+
+ peers->single = (n == 1);
+ peers->number = n;
+ peers->weighted = 0;
+ peers->total_weight = n;
+ peers->name = &us->host;
+
+ peerp = &peers->peer;
+
+ for (i = 0; i < u.naddrs; i++) {
+ peer[i].sockaddr = u.addrs[i].sockaddr;
+ peer[i].socklen = u.addrs[i].socklen;
+ peer[i].name = u.addrs[i].name;
+ peer[i].weight = 1;
+ peer[i].effective_weight = 1;
+ peer[i].current_weight = 0;
+ peer[i].max_fails = 1;
+ peer[i].fail_timeout = 10;
+ *peerp = &peer[i];
+ peerp = &peer[i].next;
+ }
+
+ us->peer.data = peers;
+
+ /* implicitly defined upstream has no backup servers */
+
+ return NGX_OK;
+}
+
+
+ngx_int_t
+ngx_stream_upstream_init_round_robin_peer(ngx_stream_session_t *s,
+ ngx_stream_upstream_srv_conf_t *us)
+{
+ ngx_uint_t n;
+ ngx_stream_upstream_rr_peer_data_t *rrp;
+
+ rrp = s->upstream->peer.data;
+
+ if (rrp == NULL) {
+ rrp = ngx_palloc(s->connection->pool,
+ sizeof(ngx_stream_upstream_rr_peer_data_t));
+ if (rrp == NULL) {
+ return NGX_ERROR;
+ }
+
+ s->upstream->peer.data = rrp;
+ }
+
+ rrp->peers = us->peer.data;
+ rrp->current = NULL;
+
+ n = rrp->peers->number;
+
+ if (rrp->peers->next && rrp->peers->next->number > n) {
+ n = rrp->peers->next->number;
+ }
+
+ if (n <= 8 * sizeof(uintptr_t)) {
+ rrp->tried = &rrp->data;
+ rrp->data = 0;
+
+ } else {
+ n = (n + (8 * sizeof(uintptr_t) - 1)) / (8 * sizeof(uintptr_t));
+
+ rrp->tried = ngx_pcalloc(s->connection->pool, n * sizeof(uintptr_t));
+ if (rrp->tried == NULL) {
+ return NGX_ERROR;
+ }
+ }
+
+ s->upstream->peer.get = ngx_stream_upstream_get_round_robin_peer;
+ s->upstream->peer.free = ngx_stream_upstream_free_round_robin_peer;
+ s->upstream->peer.tries = ngx_stream_upstream_tries(rrp->peers);
+#if (NGX_STREAM_SSL)
+ s->upstream->peer.set_session =
+ ngx_stream_upstream_set_round_robin_peer_session;
+ s->upstream->peer.save_session =
+ ngx_stream_upstream_save_round_robin_peer_session;
+#endif
+
+ return NGX_OK;
+}
+
+
+ngx_int_t
+ngx_stream_upstream_get_round_robin_peer(ngx_peer_connection_t *pc, void *data)
+{
+ ngx_stream_upstream_rr_peer_data_t *rrp = data;
+
+ ngx_int_t rc;
+ ngx_uint_t i, n;
+ ngx_stream_upstream_rr_peer_t *peer;
+ ngx_stream_upstream_rr_peers_t *peers;
+
+ ngx_log_debug1(NGX_LOG_DEBUG_STREAM, pc->log, 0,
+ "get rr peer, try: %ui", pc->tries);
+
+ pc->connection = NULL;
+
+ peers = rrp->peers;
+ ngx_stream_upstream_rr_peers_wlock(peers);
+
+ if (peers->single) {
+ peer = peers->peer;
+
+ if (peer->down) {
+ goto failed;
+ }
+
+ rrp->current = peer;
+
+ } else {
+
+ /* there are several peers */
+
+ peer = ngx_stream_upstream_get_peer(rrp);
+
+ if (peer == NULL) {
+ goto failed;
+ }
+
+ ngx_log_debug2(NGX_LOG_DEBUG_STREAM, pc->log, 0,
+ "get rr peer, current: %p %i",
+ peer, peer->current_weight);
+ }
+
+ pc->sockaddr = peer->sockaddr;
+ pc->socklen = peer->socklen;
+ pc->name = &peer->name;
+
+ peer->conns++;
+
+ ngx_stream_upstream_rr_peers_unlock(peers);
+
+ return NGX_OK;
+
+failed:
+
+ if (peers->next) {
+
+ ngx_log_debug0(NGX_LOG_DEBUG_STREAM, pc->log, 0, "backup servers");
+
+ rrp->peers = peers->next;
+
+ n = (rrp->peers->number + (8 * sizeof(uintptr_t) - 1))
+ / (8 * sizeof(uintptr_t));
+
+ for (i = 0; i < n; i++) {
+ rrp->tried[i] = 0;
+ }
+
+ ngx_stream_upstream_rr_peers_unlock(peers);
+
+ rc = ngx_stream_upstream_get_round_robin_peer(pc, rrp);
+
+ if (rc != NGX_BUSY) {
+ return rc;
+ }
+
+ ngx_stream_upstream_rr_peers_wlock(peers);
+ }
+
+ /* all peers failed, mark them as live for quick recovery */
+
+ for (peer = peers->peer; peer; peer = peer->next) {
+ peer->fails = 0;
+ }
+
+ ngx_stream_upstream_rr_peers_unlock(peers);
+
+ pc->name = peers->name;
+
+ return NGX_BUSY;
+}
+
+
+static ngx_stream_upstream_rr_peer_t *
+ngx_stream_upstream_get_peer(ngx_stream_upstream_rr_peer_data_t *rrp)
+{
+ time_t now;
+ uintptr_t m;
+ ngx_int_t total;
+ ngx_uint_t i, n, p;
+ ngx_stream_upstream_rr_peer_t *peer, *best;
+
+ now = ngx_time();
+
+ best = NULL;
+ total = 0;
+
+#if (NGX_SUPPRESS_WARN)
+ p = 0;
+#endif
+
+ for (peer = rrp->peers->peer, i = 0;
+ peer;
+ peer = peer->next, i++)
+ {
+
+ n = i / (8 * sizeof(uintptr_t));
+ m = (uintptr_t) 1 << i % (8 * sizeof(uintptr_t));
+
+ if (rrp->tried[n] & m) {
+ continue;
+ }
+
+ if (peer->down) {
+ continue;
+ }
+
+ if (peer->max_fails
+ && peer->fails >= peer->max_fails
+ && now - peer->checked <= peer->fail_timeout)
+ {
+ continue;
+ }
+
+ peer->current_weight += peer->effective_weight;
+ total += peer->effective_weight;
+
+ if (peer->effective_weight < peer->weight) {
+ peer->effective_weight++;
+ }
+
+ if (best == NULL || peer->current_weight > best->current_weight) {
+ best = peer;
+ p = i;
+ }
+ }
+
+ if (best == NULL) {
+ return NULL;
+ }
+
+ rrp->current = best;
+
+ n = p / (8 * sizeof(uintptr_t));
+ m = (uintptr_t) 1 << p % (8 * sizeof(uintptr_t));
+
+ rrp->tried[n] |= m;
+
+ best->current_weight -= total;
+
+ if (now - best->checked > best->fail_timeout) {
+ best->checked = now;
+ }
+
+ return best;
+}
+
+
+void
+ngx_stream_upstream_free_round_robin_peer(ngx_peer_connection_t *pc, void *data,
+ ngx_uint_t state)
+{
+ ngx_stream_upstream_rr_peer_data_t *rrp = data;
+
+ time_t now;
+ ngx_stream_upstream_rr_peer_t *peer;
+
+ ngx_log_debug2(NGX_LOG_DEBUG_STREAM, pc->log, 0,
+ "free rr peer %ui %ui", pc->tries, state);
+
+ peer = rrp->current;
+
+ ngx_stream_upstream_rr_peers_rlock(rrp->peers);
+ ngx_stream_upstream_rr_peer_lock(rrp->peers, peer);
+
+ if (rrp->peers->single) {
+ peer->conns--;
+
+ ngx_stream_upstream_rr_peer_unlock(rrp->peers, peer);
+ ngx_stream_upstream_rr_peers_unlock(rrp->peers);
+
+ pc->tries = 0;
+ return;
+ }
+
+ if (state & NGX_PEER_FAILED) {
+ now = ngx_time();
+
+ peer->fails++;
+ peer->accessed = now;
+ peer->checked = now;
+
+ if (peer->max_fails) {
+ peer->effective_weight -= peer->weight / peer->max_fails;
+ }
+
+ ngx_log_debug2(NGX_LOG_DEBUG_STREAM, pc->log, 0,
+ "free rr peer failed: %p %i",
+ peer, peer->effective_weight);
+
+ if (peer->effective_weight < 0) {
+ peer->effective_weight = 0;
+ }
+
+ } else {
+
+ /* mark peer live if check passed */
+
+ if (peer->accessed < peer->checked) {
+ peer->fails = 0;
+ }
+ }
+
+ peer->conns--;
+
+ ngx_stream_upstream_rr_peer_unlock(rrp->peers, peer);
+ ngx_stream_upstream_rr_peers_unlock(rrp->peers);
+
+ if (pc->tries) {
+ pc->tries--;
+ }
+}
+
+
+#if (NGX_STREAM_SSL)
+
+static ngx_int_t
+ngx_stream_upstream_set_round_robin_peer_session(ngx_peer_connection_t *pc,
+ void *data)
+{
+ ngx_stream_upstream_rr_peer_data_t *rrp = data;
+
+ ngx_int_t rc;
+ ngx_ssl_session_t *ssl_session;
+ ngx_stream_upstream_rr_peer_t *peer;
+#if (NGX_STREAM_UPSTREAM_ZONE)
+ int len;
+#if OPENSSL_VERSION_NUMBER >= 0x0090707fL
+ const
+#endif
+ u_char *p;
+ ngx_stream_upstream_rr_peers_t *peers;
+ u_char buf[NGX_SSL_MAX_SESSION_SIZE];
+#endif
+
+ peer = rrp->current;
+
+#if (NGX_STREAM_UPSTREAM_ZONE)
+ peers = rrp->peers;
+
+ if (peers->shpool) {
+ ngx_stream_upstream_rr_peers_rlock(peers);
+ ngx_stream_upstream_rr_peer_lock(peers, peer);
+
+ if (peer->ssl_session == NULL) {
+ ngx_stream_upstream_rr_peer_unlock(peers, peer);
+ ngx_stream_upstream_rr_peers_unlock(peers);
+ return NGX_OK;
+ }
+
+ len = peer->ssl_session_len;
+
+ ngx_memcpy(buf, peer->ssl_session, len);
+
+ ngx_stream_upstream_rr_peer_unlock(peers, peer);
+ ngx_stream_upstream_rr_peers_unlock(peers);
+
+ p = buf;
+ ssl_session = d2i_SSL_SESSION(NULL, &p, len);
+
+ rc = ngx_ssl_set_session(pc->connection, ssl_session);
+
+ ngx_log_debug1(NGX_LOG_DEBUG_STREAM, pc->log, 0,
+ "set session: %p", ssl_session);
+
+ ngx_ssl_free_session(ssl_session);
+
+ return rc;
+ }
+#endif
+
+ ssl_session = peer->ssl_session;
+
+ rc = ngx_ssl_set_session(pc->connection, ssl_session);
+
+ ngx_log_debug1(NGX_LOG_DEBUG_STREAM, pc->log, 0,
+ "set session: %p", ssl_session);
+
+ return rc;
+}
+
+
+static void
+ngx_stream_upstream_save_round_robin_peer_session(ngx_peer_connection_t *pc,
+ void *data)
+{
+ ngx_stream_upstream_rr_peer_data_t *rrp = data;
+
+ ngx_ssl_session_t *old_ssl_session, *ssl_session;
+ ngx_stream_upstream_rr_peer_t *peer;
+#if (NGX_STREAM_UPSTREAM_ZONE)
+ int len;
+ u_char *p;
+ ngx_stream_upstream_rr_peers_t *peers;
+ u_char buf[NGX_SSL_MAX_SESSION_SIZE];
+#endif
+
+#if (NGX_STREAM_UPSTREAM_ZONE)
+ peers = rrp->peers;
+
+ if (peers->shpool) {
+
+ ssl_session = SSL_get0_session(pc->connection->ssl->connection);
+
+ if (ssl_session == NULL) {
+ return;
+ }
+
+ ngx_log_debug1(NGX_LOG_DEBUG_STREAM, pc->log, 0,
+ "save session: %p", ssl_session);
+
+ len = i2d_SSL_SESSION(ssl_session, NULL);
+
+ /* do not cache too big session */
+
+ if (len > NGX_SSL_MAX_SESSION_SIZE) {
+ return;
+ }
+
+ p = buf;
+ (void) i2d_SSL_SESSION(ssl_session, &p);
+
+ peer = rrp->current;
+
+ ngx_stream_upstream_rr_peers_rlock(peers);
+ ngx_stream_upstream_rr_peer_lock(peers, peer);
+
+ if (len > peer->ssl_session_len) {
+ ngx_shmtx_lock(&peers->shpool->mutex);
+
+ if (peer->ssl_session) {
+ ngx_slab_free_locked(peers->shpool, peer->ssl_session);
+ }
+
+ peer->ssl_session = ngx_slab_alloc_locked(peers->shpool, len);
+
+ ngx_shmtx_unlock(&peers->shpool->mutex);
+
+ if (peer->ssl_session == NULL) {
+ peer->ssl_session_len = 0;
+
+ ngx_stream_upstream_rr_peer_unlock(peers, peer);
+ ngx_stream_upstream_rr_peers_unlock(peers);
+ return;
+ }
+
+ peer->ssl_session_len = len;
+ }
+
+ ngx_memcpy(peer->ssl_session, buf, len);
+
+ ngx_stream_upstream_rr_peer_unlock(peers, peer);
+ ngx_stream_upstream_rr_peers_unlock(peers);
+
+ return;
+ }
+#endif
+
+ ssl_session = ngx_ssl_get_session(pc->connection);
+
+ if (ssl_session == NULL) {
+ return;
+ }
+
+ ngx_log_debug1(NGX_LOG_DEBUG_STREAM, pc->log, 0,
+ "save session: %p", ssl_session);
+
+ peer = rrp->current;
+
+ old_ssl_session = peer->ssl_session;
+ peer->ssl_session = ssl_session;
+
+ if (old_ssl_session) {
+
+ ngx_log_debug1(NGX_LOG_DEBUG_STREAM, pc->log, 0,
+ "old session: %p", old_ssl_session);
+
+ /* TODO: may block */
+
+ ngx_ssl_free_session(old_ssl_session);
+ }
+}
+
+#endif
diff --git a/src/stream/ngx_stream_upstream_round_robin.h b/src/stream/ngx_stream_upstream_round_robin.h
new file mode 100644
index 000000000..83fd8b5a8
--- /dev/null
+++ b/src/stream/ngx_stream_upstream_round_robin.h
@@ -0,0 +1,138 @@
+
+/*
+ * Copyright (C) Igor Sysoev
+ * Copyright (C) Nginx, Inc.
+ */
+
+
+#ifndef _NGX_STREAM_UPSTREAM_ROUND_ROBIN_H_INCLUDED_
+#define _NGX_STREAM_UPSTREAM_ROUND_ROBIN_H_INCLUDED_
+
+
+#include <ngx_config.h>
+#include <ngx_core.h>
+#include <ngx_stream.h>
+
+
+typedef struct ngx_stream_upstream_rr_peer_s ngx_stream_upstream_rr_peer_t;
+
+struct ngx_stream_upstream_rr_peer_s {
+ struct sockaddr *sockaddr;
+ socklen_t socklen;
+ ngx_str_t name;
+ ngx_str_t server;
+
+ ngx_int_t current_weight;
+ ngx_int_t effective_weight;
+ ngx_int_t weight;
+
+ ngx_uint_t conns;
+
+ ngx_uint_t fails;
+ time_t accessed;
+ time_t checked;
+
+ ngx_uint_t max_fails;
+ time_t fail_timeout;
+
+ ngx_uint_t down; /* unsigned down:1; */
+
+#if (NGX_STREAM_SSL)
+ void *ssl_session;
+ int ssl_session_len;
+#endif
+
+ ngx_stream_upstream_rr_peer_t *next;
+
+#if (NGX_STREAM_UPSTREAM_ZONE)
+ ngx_atomic_t lock;
+#endif
+};
+
+
+typedef struct ngx_stream_upstream_rr_peers_s ngx_stream_upstream_rr_peers_t;
+
+struct ngx_stream_upstream_rr_peers_s {
+ ngx_uint_t number;
+
+#if (NGX_STREAM_UPSTREAM_ZONE)
+ ngx_slab_pool_t *shpool;
+ ngx_atomic_t rwlock;
+#endif
+
+ ngx_uint_t total_weight;
+
+ unsigned single:1;
+ unsigned weighted:1;
+
+ ngx_str_t *name;
+
+ ngx_stream_upstream_rr_peers_t *next;
+
+ ngx_stream_upstream_rr_peer_t *peer;
+};
+
+
+#if (NGX_STREAM_UPSTREAM_ZONE)
+
+#define ngx_stream_upstream_rr_peers_rlock(peers) \
+ \
+ if (peers->shpool) { \
+ ngx_rwlock_rlock(&peers->rwlock); \
+ }
+
+#define ngx_stream_upstream_rr_peers_wlock(peers) \
+ \
+ if (peers->shpool) { \
+ ngx_rwlock_wlock(&peers->rwlock); \
+ }
+
+#define ngx_stream_upstream_rr_peers_unlock(peers) \
+ \
+ if (peers->shpool) { \
+ ngx_rwlock_unlock(&peers->rwlock); \
+ }
+
+
+#define ngx_stream_upstream_rr_peer_lock(peers, peer) \
+ \
+ if (peers->shpool) { \
+ ngx_rwlock_wlock(&peer->lock); \
+ }
+
+#define ngx_stream_upstream_rr_peer_unlock(peers, peer) \
+ \
+ if (peers->shpool) { \
+ ngx_rwlock_unlock(&peer->lock); \
+ }
+
+#else
+
+#define ngx_stream_upstream_rr_peers_rlock(peers)
+#define ngx_stream_upstream_rr_peers_wlock(peers)
+#define ngx_stream_upstream_rr_peers_unlock(peers)
+#define ngx_stream_upstream_rr_peer_lock(peers, peer)
+#define ngx_stream_upstream_rr_peer_unlock(peers, peer)
+
+#endif
+
+
+typedef struct {
+ ngx_stream_upstream_rr_peers_t *peers;
+ ngx_stream_upstream_rr_peer_t *current;
+ uintptr_t *tried;
+ uintptr_t data;
+} ngx_stream_upstream_rr_peer_data_t;
+
+
+ngx_int_t ngx_stream_upstream_init_round_robin(ngx_conf_t *cf,
+ ngx_stream_upstream_srv_conf_t *us);
+ngx_int_t ngx_stream_upstream_init_round_robin_peer(ngx_stream_session_t *s,
+ ngx_stream_upstream_srv_conf_t *us);
+ngx_int_t ngx_stream_upstream_get_round_robin_peer(ngx_peer_connection_t *pc,
+ void *data);
+void ngx_stream_upstream_free_round_robin_peer(ngx_peer_connection_t *pc,
+ void *data, ngx_uint_t state);
+
+
+#endif /* _NGX_STREAM_UPSTREAM_ROUND_ROBIN_H_INCLUDED_ */
diff --git a/src/stream/ngx_stream_upstream_zone_module.c b/src/stream/ngx_stream_upstream_zone_module.c
new file mode 100644
index 000000000..3e6f34610
--- /dev/null
+++ b/src/stream/ngx_stream_upstream_zone_module.c
@@ -0,0 +1,207 @@
+
+/*
+ * Copyright (C) Ruslan Ermilov
+ * Copyright (C) Nginx, Inc.
+ */
+
+
+#include <ngx_config.h>
+#include <ngx_core.h>
+#include <ngx_stream.h>
+
+
+static char *ngx_stream_upstream_zone(ngx_conf_t *cf, ngx_command_t *cmd,
+ void *conf);
+static ngx_int_t ngx_stream_upstream_init_zone(ngx_shm_zone_t *shm_zone,
+ void *data);
+
+
+static ngx_command_t ngx_stream_upstream_zone_commands[] = {
+
+ { ngx_string("zone"),
+ NGX_STREAM_UPS_CONF|NGX_CONF_TAKE2,
+ ngx_stream_upstream_zone,
+ 0,
+ 0,
+ NULL },
+
+ ngx_null_command
+};
+
+
+static ngx_stream_module_t ngx_stream_upstream_zone_module_ctx = {
+ NULL, /* create main configuration */
+ NULL, /* init main configuration */
+
+ NULL, /* create server configuration */
+ NULL, /* merge server configuration */
+};
+
+
+ngx_module_t ngx_stream_upstream_zone_module = {
+ NGX_MODULE_V1,
+ &ngx_stream_upstream_zone_module_ctx, /* module context */
+ ngx_stream_upstream_zone_commands, /* module directives */
+ NGX_STREAM_MODULE, /* module type */
+ NULL, /* init master */
+ NULL, /* init module */
+ NULL, /* init process */
+ NULL, /* init thread */
+ NULL, /* exit thread */
+ NULL, /* exit process */
+ NULL, /* exit master */
+ NGX_MODULE_V1_PADDING
+};
+
+
+static char *
+ngx_stream_upstream_zone(ngx_conf_t *cf, ngx_command_t *cmd, void *conf)
+{
+ ssize_t size;
+ ngx_str_t *value;
+ ngx_stream_upstream_srv_conf_t *uscf;
+
+ uscf = ngx_stream_conf_get_module_srv_conf(cf, ngx_stream_upstream_module);
+
+ value = cf->args->elts;
+
+ if (!value[1].len) {
+ ngx_conf_log_error(NGX_LOG_EMERG, cf, 0,
+ "invalid zone name \"%V\"", &value[1]);
+ return NGX_CONF_ERROR;
+ }
+
+ size = ngx_parse_size(&value[2]);
+
+ if (size == NGX_ERROR) {
+ ngx_conf_log_error(NGX_LOG_EMERG, cf, 0,
+ "invalid zone size \"%V\"", &value[2]);
+ return NGX_CONF_ERROR;
+ }
+
+ if (size < (ssize_t) (8 * ngx_pagesize)) {
+ ngx_conf_log_error(NGX_LOG_EMERG, cf, 0,
+ "zone \"%V\" is too small", &value[1]);
+ return NGX_CONF_ERROR;
+ }
+
+ uscf->shm_zone = ngx_shared_memory_add(cf, &value[1], size,
+ &ngx_stream_upstream_module);
+ if (uscf->shm_zone == NULL) {
+ return NGX_CONF_ERROR;
+ }
+
+ if (uscf->shm_zone->data) {
+ uscf = uscf->shm_zone->data;
+
+ ngx_conf_log_error(NGX_LOG_WARN, cf, 0,
+ "upstream \"%V\" in %s:%ui "
+ "is already bound to zone \"%V\"",
+ &uscf->host, uscf->file_name, uscf->line,
+ &value[1]);
+ return NGX_CONF_ERROR;
+ }
+
+ uscf->shm_zone->init = ngx_stream_upstream_init_zone;
+ uscf->shm_zone->data = uscf;
+
+ uscf->shm_zone->noreuse = 1;
+
+ return NGX_CONF_OK;
+}
+
+
+static ngx_int_t
+ngx_stream_upstream_init_zone(ngx_shm_zone_t *shm_zone, void *data)
+{
+ ngx_stream_upstream_srv_conf_t *ouscf = data;
+
+ size_t len;
+ ngx_slab_pool_t *shpool;
+ ngx_stream_upstream_rr_peer_t *peer, **peerp;
+ ngx_stream_upstream_rr_peers_t *peers, *backup;
+ ngx_stream_upstream_srv_conf_t *uscf;
+
+ uscf = shm_zone->data;
+
+ if (ouscf) {
+ ngx_log_error(NGX_LOG_EMERG, shm_zone->shm.log, 0,
+ "zone \"%V\" cannot be reused", &shm_zone->shm.name);
+ return NGX_ERROR;
+ }
+
+ shpool = (ngx_slab_pool_t *) shm_zone->shm.addr;
+
+ if (shm_zone->shm.exists) {
+ return NGX_ERROR;
+ }
+
+
+ /* copy peers to shared memory */
+
+ len = sizeof(" in upstream zone \"\"") + shm_zone->shm.name.len;
+
+ shpool->log_ctx = ngx_slab_alloc(shpool, len);
+ if (shpool->log_ctx == NULL) {
+ return NGX_ERROR;
+ }
+
+ ngx_sprintf(shpool->log_ctx, " in upstream zone \"%V\"%Z",
+ &shm_zone->shm.name);
+
+ peers = ngx_slab_alloc(shpool, sizeof(ngx_stream_upstream_rr_peers_t));
+ if (peers == NULL) {
+ return NGX_ERROR;
+ }
+
+ ngx_memcpy(peers, uscf->peer.data, sizeof(ngx_stream_upstream_rr_peers_t));
+
+ peers->shpool = shpool;
+
+ for (peerp = &peers->peer; *peerp; peerp = &peer->next) {
+ /* pool is unlocked */
+ peer = ngx_slab_calloc_locked(shpool,
+ sizeof(ngx_stream_upstream_rr_peer_t));
+ if (peer == NULL) {
+ return NGX_ERROR;
+ }
+
+ ngx_memcpy(peer, *peerp, sizeof(ngx_stream_upstream_rr_peer_t));
+
+ *peerp = peer;
+ }
+
+ if (peers->next == NULL) {
+ goto done;
+ }
+
+ backup = ngx_slab_alloc(shpool, sizeof(ngx_stream_upstream_rr_peers_t));
+ if (backup == NULL) {
+ return NGX_ERROR;
+ }
+
+ ngx_memcpy(backup, peers->next, sizeof(ngx_stream_upstream_rr_peers_t));
+
+ backup->shpool = shpool;
+
+ for (peerp = &backup->peer; *peerp; peerp = &peer->next) {
+ /* pool is unlocked */
+ peer = ngx_slab_calloc_locked(shpool,
+ sizeof(ngx_stream_upstream_rr_peer_t));
+ if (peer == NULL) {
+ return NGX_ERROR;
+ }
+
+ ngx_memcpy(peer, *peerp, sizeof(ngx_stream_upstream_rr_peer_t));
+
+ *peerp = peer;
+ }
+
+ peers->next = backup;
+
+done:
+
+ uscf->peer.data = peers;
+
+ return NGX_OK;
+}