aboutsummaryrefslogtreecommitdiff
path: root/src/stream/ngx_stream_proxy_module.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/stream/ngx_stream_proxy_module.c')
-rw-r--r--src/stream/ngx_stream_proxy_module.c140
1 files changed, 114 insertions, 26 deletions
diff --git a/src/stream/ngx_stream_proxy_module.c b/src/stream/ngx_stream_proxy_module.c
index a83d627d7..ad3acbaf1 100644
--- a/src/stream/ngx_stream_proxy_module.c
+++ b/src/stream/ngx_stream_proxy_module.c
@@ -17,6 +17,7 @@ typedef struct {
size_t buffer_size;
size_t upload_rate;
size_t download_rate;
+ ngx_uint_t responses;
ngx_uint_t next_upstream_tries;
ngx_flag_t next_upstream;
ngx_flag_t proxy_protocol;
@@ -167,6 +168,13 @@ static ngx_command_t ngx_stream_proxy_commands[] = {
offsetof(ngx_stream_proxy_srv_conf_t, download_rate),
NULL },
+ { ngx_string("proxy_responses"),
+ NGX_STREAM_MAIN_CONF|NGX_STREAM_SRV_CONF|NGX_CONF_TAKE1,
+ ngx_conf_set_num_slot,
+ NGX_STREAM_SRV_CONF_OFFSET,
+ offsetof(ngx_stream_proxy_srv_conf_t, responses),
+ NULL },
+
{ ngx_string("proxy_next_upstream"),
NGX_STREAM_MAIN_CONF|NGX_STREAM_SRV_CONF|NGX_CONF_FLAG,
ngx_conf_set_flag_slot,
@@ -351,6 +359,7 @@ ngx_stream_proxy_handler(ngx_stream_session_t *s)
u->peer.log_error = NGX_ERROR_ERR;
u->peer.local = pscf->local;
+ u->peer.type = c->type;
uscf = pscf->upstream;
@@ -370,6 +379,14 @@ ngx_stream_proxy_handler(ngx_stream_session_t *s)
u->proxy_protocol = pscf->proxy_protocol;
u->start_sec = ngx_time();
+ c->write->handler = ngx_stream_proxy_downstream_handler;
+ c->read->handler = ngx_stream_proxy_downstream_handler;
+
+ if (c->type == SOCK_DGRAM) {
+ ngx_stream_proxy_connect(s);
+ return;
+ }
+
p = ngx_pnalloc(c->pool, pscf->buffer_size);
if (p == NULL) {
ngx_stream_proxy_finalize(s, NGX_ERROR);
@@ -381,9 +398,6 @@ ngx_stream_proxy_handler(ngx_stream_session_t *s)
u->downstream_buf.pos = p;
u->downstream_buf.last = p;
- c->write->handler = ngx_stream_proxy_downstream_handler;
- c->read->handler = ngx_stream_proxy_downstream_handler;
-
if (u->proxy_protocol
#if (NGX_STREAM_SSL)
&& pscf->ssl == NULL
@@ -488,7 +502,10 @@ ngx_stream_proxy_init_upstream(ngx_stream_session_t *s)
cscf = ngx_stream_get_module_srv_conf(s, ngx_stream_core_module);
- if (cscf->tcp_nodelay && pc->tcp_nodelay == NGX_TCP_NODELAY_UNSET) {
+ if (pc->type == SOCK_STREAM
+ && cscf->tcp_nodelay
+ && pc->tcp_nodelay == NGX_TCP_NODELAY_UNSET)
+ {
ngx_log_debug0(NGX_LOG_DEBUG_STREAM, pc->log, 0, "tcp_nodelay");
tcp_nodelay = 1;
@@ -516,7 +533,7 @@ ngx_stream_proxy_init_upstream(ngx_stream_session_t *s)
pscf = ngx_stream_get_module_srv_conf(s, ngx_stream_proxy_module);
#if (NGX_STREAM_SSL)
- if (pscf->ssl && pc->ssl == NULL) {
+ if (pc->type == SOCK_STREAM && pscf->ssl && pc->ssl == NULL) {
ngx_stream_proxy_ssl_init_connection(s);
return;
}
@@ -544,23 +561,35 @@ ngx_stream_proxy_init_upstream(ngx_stream_session_t *s)
c->log->action = "proxying connection";
- p = ngx_pnalloc(c->pool, pscf->buffer_size);
- if (p == NULL) {
- ngx_stream_proxy_finalize(s, NGX_ERROR);
- return;
+ if (u->upstream_buf.start == NULL) {
+ p = ngx_pnalloc(c->pool, pscf->buffer_size);
+ if (p == NULL) {
+ ngx_stream_proxy_finalize(s, NGX_ERROR);
+ return;
+ }
+
+ u->upstream_buf.start = p;
+ u->upstream_buf.end = p + pscf->buffer_size;
+ u->upstream_buf.pos = p;
+ u->upstream_buf.last = p;
}
- u->upstream_buf.start = p;
- u->upstream_buf.end = p + pscf->buffer_size;
- u->upstream_buf.pos = p;
- u->upstream_buf.last = p;
+ if (c->type == SOCK_DGRAM) {
+ s->received = c->buffer->last - c->buffer->pos;
+ u->downstream_buf = *c->buffer;
+
+ if (pscf->responses == 0) {
+ pc->read->ready = 0;
+ pc->read->eof = 1;
+ }
+ }
u->connected = 1;
pc->read->handler = ngx_stream_proxy_upstream_handler;
pc->write->handler = ngx_stream_proxy_upstream_handler;
- if (pc->read->ready) {
+ if (pc->read->ready || pc->read->eof) {
ngx_post_event(pc->read, &ngx_posted_events);
}
@@ -894,11 +923,15 @@ ngx_stream_proxy_process_connection(ngx_event_t *ev, ngx_uint_t from_upstream)
s = c->data;
u = s->upstream;
+ c = s->connection;
+ pc = u->peer.connection;
+
+ pscf = ngx_stream_get_module_srv_conf(s, ngx_stream_proxy_module);
+
if (ev->timedout) {
+ ev->timedout = 0;
if (ev->delayed) {
-
- ev->timedout = 0;
ev->delayed = 0;
if (!ev->ready) {
@@ -907,20 +940,35 @@ ngx_stream_proxy_process_connection(ngx_event_t *ev, ngx_uint_t from_upstream)
return;
}
- if (u->connected) {
- pc = u->peer.connection;
-
- if (!c->read->delayed && !pc->read->delayed) {
- pscf = ngx_stream_get_module_srv_conf(s,
- ngx_stream_proxy_module);
- ngx_add_timer(c->write, pscf->timeout);
- }
+ if (u->connected && !c->read->delayed && !pc->read->delayed) {
+ ngx_add_timer(c->write, pscf->timeout);
}
return;
}
} else {
+ if (s->connection->type == SOCK_DGRAM) {
+ if (pscf->responses == NGX_MAX_INT32_VALUE) {
+
+ /*
+ * successfully terminate timed out UDP session
+ * with unspecified number of responses
+ */
+
+ pc->read->ready = 0;
+ pc->read->eof = 1;
+
+ ngx_stream_proxy_process(s, 1, 0);
+ return;
+ }
+
+ if (u->received == 0) {
+ ngx_stream_proxy_next_upstream(s);
+ return;
+ }
+ }
+
ngx_connection_error(c, NGX_ETIMEDOUT, "connection timed out");
ngx_stream_proxy_finalize(s, NGX_DECLINED);
return;
@@ -1039,6 +1087,21 @@ ngx_stream_proxy_process(ngx_stream_session_t *s, ngx_uint_t from_upstream,
c = s->connection;
pc = u->connected ? u->peer.connection : NULL;
+ if (c->type == SOCK_DGRAM && (ngx_terminate || ngx_exiting)) {
+
+ /* socket is already closed on worker shutdown */
+
+ handler = c->log->handler;
+ c->log->handler = NULL;
+
+ ngx_log_error(NGX_LOG_INFO, c->log, 0, "disconnected on shutdown");
+
+ c->log->handler = handler;
+
+ ngx_stream_proxy_finalize(s, NGX_OK);
+ return;
+ }
+
pscf = ngx_stream_get_module_srv_conf(s, ngx_stream_proxy_module);
if (from_upstream) {
@@ -1066,7 +1129,17 @@ ngx_stream_proxy_process(ngx_stream_session_t *s, ngx_uint_t from_upstream,
n = dst->send(dst, b->pos, size);
+ if (n == NGX_AGAIN && dst->shared) {
+ /* cannot wait on a shared socket */
+ n = NGX_ERROR;
+ }
+
if (n == NGX_ERROR) {
+ if (c->type == SOCK_DGRAM && !from_upstream) {
+ ngx_stream_proxy_next_upstream(s);
+ return;
+ }
+
ngx_stream_proxy_finalize(s, NGX_DECLINED);
return;
}
@@ -1118,6 +1191,12 @@ ngx_stream_proxy_process(ngx_stream_session_t *s, ngx_uint_t from_upstream,
}
}
+ if (c->type == SOCK_DGRAM && ++u->responses == pscf->responses)
+ {
+ src->read->ready = 0;
+ src->read->eof = 1;
+ }
+
*received += n;
b->last += n;
do_write = 1;
@@ -1126,6 +1205,11 @@ ngx_stream_proxy_process(ngx_stream_session_t *s, ngx_uint_t from_upstream,
}
if (n == NGX_ERROR) {
+ if (c->type == SOCK_DGRAM && u->received == 0) {
+ ngx_stream_proxy_next_upstream(s);
+ return;
+ }
+
src->read->eof = 1;
}
}
@@ -1152,13 +1236,13 @@ ngx_stream_proxy_process(ngx_stream_session_t *s, ngx_uint_t from_upstream,
flags = src->read->eof ? NGX_CLOSE_EVENT : 0;
- if (ngx_handle_read_event(src->read, flags) != NGX_OK) {
+ if (!src->shared && ngx_handle_read_event(src->read, flags) != NGX_OK) {
ngx_stream_proxy_finalize(s, NGX_ERROR);
return;
}
if (dst) {
- if (ngx_handle_write_event(dst->write, 0) != NGX_OK) {
+ if (!dst->shared && ngx_handle_write_event(dst->write, 0) != NGX_OK) {
ngx_stream_proxy_finalize(s, NGX_ERROR);
return;
}
@@ -1331,6 +1415,7 @@ ngx_stream_proxy_create_srv_conf(ngx_conf_t *cf)
conf->buffer_size = NGX_CONF_UNSET_SIZE;
conf->upload_rate = NGX_CONF_UNSET_SIZE;
conf->download_rate = NGX_CONF_UNSET_SIZE;
+ conf->responses = NGX_CONF_UNSET_UINT;
conf->next_upstream_tries = NGX_CONF_UNSET_UINT;
conf->next_upstream = NGX_CONF_UNSET;
conf->proxy_protocol = NGX_CONF_UNSET;
@@ -1373,6 +1458,9 @@ ngx_stream_proxy_merge_srv_conf(ngx_conf_t *cf, void *parent, void *child)
ngx_conf_merge_size_value(conf->download_rate,
prev->download_rate, 0);
+ ngx_conf_merge_uint_value(conf->responses,
+ prev->responses, NGX_MAX_INT32_VALUE);
+
ngx_conf_merge_uint_value(conf->next_upstream_tries,
prev->next_upstream_tries, 0);