aboutsummaryrefslogtreecommitdiff
path: root/src/stream/ngx_stream_proxy_module.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/stream/ngx_stream_proxy_module.c')
-rw-r--r--src/stream/ngx_stream_proxy_module.c216
1 files changed, 138 insertions, 78 deletions
diff --git a/src/stream/ngx_stream_proxy_module.c b/src/stream/ngx_stream_proxy_module.c
index ed802e7e1..2e9047192 100644
--- a/src/stream/ngx_stream_proxy_module.c
+++ b/src/stream/ngx_stream_proxy_module.c
@@ -84,10 +84,10 @@ static char *ngx_stream_proxy_pass(ngx_conf_t *cf, ngx_command_t *cmd,
void *conf);
static char *ngx_stream_proxy_bind(ngx_conf_t *cf, ngx_command_t *cmd,
void *conf);
-static ngx_int_t ngx_stream_proxy_send_proxy_protocol(ngx_stream_session_t *s);
#if (NGX_STREAM_SSL)
+static ngx_int_t ngx_stream_proxy_send_proxy_protocol(ngx_stream_session_t *s);
static char *ngx_stream_proxy_ssl_password_file(ngx_conf_t *cf,
ngx_command_t *cmd, void *conf);
static void ngx_stream_proxy_ssl_init_connection(ngx_stream_session_t *s);
@@ -385,8 +385,6 @@ ngx_stream_proxy_handler(ngx_stream_session_t *s)
}
u->peer.type = c->type;
-
- u->proxy_protocol = pscf->proxy_protocol;
u->start_sec = ngx_time();
c->write->handler = ngx_stream_proxy_downstream_handler;
@@ -411,28 +409,6 @@ ngx_stream_proxy_handler(ngx_stream_session_t *s)
u->downstream_buf.pos = p;
u->downstream_buf.last = p;
- if (u->proxy_protocol
-#if (NGX_STREAM_SSL)
- && pscf->ssl == NULL
-#endif
- && pscf->buffer_size >= NGX_PROXY_PROTOCOL_MAX_HEADER)
- {
- /* optimization for a typical case */
-
- ngx_log_debug0(NGX_LOG_DEBUG_STREAM, c->log, 0,
- "stream proxy send PROXY protocol header");
-
- p = ngx_proxy_protocol_write(c, u->downstream_buf.last,
- u->downstream_buf.end);
- if (p == NULL) {
- ngx_stream_proxy_finalize(s, NGX_STREAM_INTERNAL_SERVER_ERROR);
- return;
- }
-
- u->downstream_buf.last = p;
- u->proxy_protocol = 0;
- }
-
if (c->read->ready) {
ngx_post_event(c->read, &ngx_posted_events);
}
@@ -682,8 +658,13 @@ ngx_stream_proxy_connect(ngx_stream_session_t *s)
c->log->action = "connecting to upstream";
+ pscf = ngx_stream_get_module_srv_conf(s, ngx_stream_proxy_module);
+
u = s->upstream;
+ u->connected = 0;
+ u->proxy_protocol = pscf->proxy_protocol;
+
if (u->state) {
u->state->response_time = ngx_current_msec - u->state->response_time;
}
@@ -740,8 +721,6 @@ ngx_stream_proxy_connect(ngx_stream_session_t *s)
pc->read->handler = ngx_stream_proxy_connect_handler;
pc->write->handler = ngx_stream_proxy_connect_handler;
- pscf = ngx_stream_get_module_srv_conf(s, ngx_stream_proxy_module);
-
ngx_add_timer(pc->write, pscf->connect_timeout);
}
@@ -751,6 +730,7 @@ ngx_stream_proxy_init_upstream(ngx_stream_session_t *s)
{
int tcp_nodelay;
u_char *p;
+ ngx_chain_t *cl;
ngx_connection_t *c, *pc;
ngx_log_handler_pt handler;
ngx_stream_upstream_t *u;
@@ -782,21 +762,26 @@ ngx_stream_proxy_init_upstream(ngx_stream_session_t *s)
pc->tcp_nodelay = NGX_TCP_NODELAY_SET;
}
- if (u->proxy_protocol) {
- if (ngx_stream_proxy_send_proxy_protocol(s) != NGX_OK) {
- return;
- }
-
- u->proxy_protocol = 0;
- }
-
pscf = ngx_stream_get_module_srv_conf(s, ngx_stream_proxy_module);
#if (NGX_STREAM_SSL)
- if (pc->type == SOCK_STREAM && pscf->ssl && pc->ssl == NULL) {
- ngx_stream_proxy_ssl_init_connection(s);
- return;
+
+ if (pc->type == SOCK_STREAM && pscf->ssl) {
+
+ if (u->proxy_protocol) {
+ if (ngx_stream_proxy_send_proxy_protocol(s) != NGX_OK) {
+ return;
+ }
+
+ u->proxy_protocol = 0;
+ }
+
+ if (pc->ssl == NULL) {
+ ngx_stream_proxy_ssl_init_connection(s);
+ return;
+ }
}
+
#endif
c = s->connection;
@@ -838,14 +823,66 @@ ngx_stream_proxy_init_upstream(ngx_stream_session_t *s)
u->upstream_buf.last = p;
}
- if (c->type == SOCK_DGRAM) {
- s->received = c->buffer->last - c->buffer->pos;
- u->downstream_buf = *c->buffer;
+ if (c->buffer && c->buffer->pos < c->buffer->last) {
+ ngx_log_debug1(NGX_LOG_DEBUG_STREAM, c->log, 0,
+ "stream proxy add preread buffer: %uz",
+ c->buffer->last - c->buffer->pos);
+
+ cl = ngx_chain_get_free_buf(c->pool, &u->free);
+ if (cl == NULL) {
+ ngx_stream_proxy_finalize(s, NGX_STREAM_INTERNAL_SERVER_ERROR);
+ return;
+ }
+
+ *cl->buf = *c->buffer;
+
+ cl->buf->tag = (ngx_buf_tag_t) &ngx_stream_proxy_module;
+ cl->buf->flush = 1;
+ cl->buf->last_buf = (c->type == SOCK_DGRAM);
+
+ cl->next = u->upstream_out;
+ u->upstream_out = cl;
+ }
+
+ if (u->proxy_protocol) {
+ ngx_log_debug0(NGX_LOG_DEBUG_STREAM, c->log, 0,
+ "stream proxy add PROXY protocol header");
+
+ cl = ngx_chain_get_free_buf(c->pool, &u->free);
+ if (cl == NULL) {
+ ngx_stream_proxy_finalize(s, NGX_STREAM_INTERNAL_SERVER_ERROR);
+ return;
+ }
+
+ p = ngx_pnalloc(c->pool, NGX_PROXY_PROTOCOL_MAX_HEADER);
+ if (p == NULL) {
+ ngx_stream_proxy_finalize(s, NGX_STREAM_INTERNAL_SERVER_ERROR);
+ return;
+ }
+
+ cl->buf->pos = p;
- if (pscf->responses == 0) {
- pc->read->ready = 0;
- pc->read->eof = 1;
+ p = ngx_proxy_protocol_write(c, p, p + NGX_PROXY_PROTOCOL_MAX_HEADER);
+ if (p == NULL) {
+ ngx_stream_proxy_finalize(s, NGX_STREAM_INTERNAL_SERVER_ERROR);
+ return;
}
+
+ cl->buf->last = p;
+ cl->buf->temporary = 1;
+ cl->buf->flush = 0;
+ cl->buf->last_buf = 0;
+ cl->buf->tag = (ngx_buf_tag_t) &ngx_stream_proxy_module;
+
+ cl->next = u->upstream_out;
+ u->upstream_out = cl;
+
+ u->proxy_protocol = 0;
+ }
+
+ if (c->type == SOCK_DGRAM && pscf->responses == 0) {
+ pc->read->ready = 0;
+ pc->read->eof = 1;
}
u->connected = 1;
@@ -861,6 +898,8 @@ ngx_stream_proxy_init_upstream(ngx_stream_session_t *s)
}
+#if (NGX_STREAM_SSL)
+
static ngx_int_t
ngx_stream_proxy_send_proxy_protocol(ngx_stream_session_t *s)
{
@@ -931,8 +970,6 @@ ngx_stream_proxy_send_proxy_protocol(ngx_stream_session_t *s)
}
-#if (NGX_STREAM_SSL)
-
static char *
ngx_stream_proxy_ssl_password_file(ngx_conf_t *cf, ngx_command_t *cmd,
void *conf)
@@ -1412,8 +1449,10 @@ ngx_stream_proxy_process(ngx_stream_session_t *s, ngx_uint_t from_upstream,
size_t size, limit_rate;
ssize_t n;
ngx_buf_t *b;
+ ngx_int_t rc;
ngx_uint_t flags;
ngx_msec_t delay;
+ ngx_chain_t *cl, **ll, **out, **busy;
ngx_connection_t *c, *pc, *src, *dst;
ngx_log_handler_pt handler;
ngx_stream_upstream_t *u;
@@ -1447,6 +1486,8 @@ ngx_stream_proxy_process(ngx_stream_session_t *s, ngx_uint_t from_upstream,
b = &u->upstream_buf;
limit_rate = pscf->download_rate;
received = &u->received;
+ out = &u->downstream_out;
+ busy = &u->downstream_busy;
} else {
src = c;
@@ -1454,24 +1495,18 @@ ngx_stream_proxy_process(ngx_stream_session_t *s, ngx_uint_t from_upstream,
b = &u->downstream_buf;
limit_rate = pscf->upload_rate;
received = &s->received;
+ out = &u->upstream_out;
+ busy = &u->upstream_busy;
}
for ( ;; ) {
- if (do_write) {
-
- size = b->last - b->pos;
+ if (do_write && dst) {
- if (size && dst && dst->write->ready) {
-
- n = dst->send(dst, b->pos, size);
-
- if (n == NGX_AGAIN && dst->shared) {
- /* cannot wait on a shared socket */
- n = NGX_ERROR;
- }
+ if (*out || *busy || dst->buffered) {
+ rc = ngx_stream_top_filter(s, *out, from_upstream);
- if (n == NGX_ERROR) {
+ if (rc == NGX_ERROR) {
if (c->type == SOCK_DGRAM && !from_upstream) {
ngx_stream_proxy_next_upstream(s);
return;
@@ -1481,13 +1516,12 @@ ngx_stream_proxy_process(ngx_stream_session_t *s, ngx_uint_t from_upstream,
return;
}
- if (n > 0) {
- b->pos += n;
+ ngx_chain_update_chains(c->pool, &u->free, busy, out,
+ (ngx_buf_tag_t) &ngx_stream_proxy_module);
- if (b->pos == b->last) {
- b->pos = b->start;
- b->last = b->start;
- }
+ if (*busy == NULL) {
+ b->pos = b->start;
+ b->last = b->start;
}
}
}
@@ -1514,11 +1548,21 @@ ngx_stream_proxy_process(ngx_stream_session_t *s, ngx_uint_t from_upstream,
n = src->recv(src, b->last, size);
- if (n == NGX_AGAIN || n == 0) {
+ if (n == NGX_AGAIN) {
break;
}
- if (n > 0) {
+ if (n == NGX_ERROR) {
+ if (c->type == SOCK_DGRAM && u->received == 0) {
+ ngx_stream_proxy_next_upstream(s);
+ return;
+ }
+
+ src->read->eof = 1;
+ n = 0;
+ }
+
+ if (n >= 0) {
if (limit_rate) {
delay = (ngx_msec_t) (n * 1000 / limit_rate);
@@ -1541,27 +1585,37 @@ ngx_stream_proxy_process(ngx_stream_session_t *s, ngx_uint_t from_upstream,
src->read->eof = 1;
}
+ for (ll = out; *ll; ll = &(*ll)->next) { /* void */ }
+
+ cl = ngx_chain_get_free_buf(c->pool, &u->free);
+ if (cl == NULL) {
+ ngx_stream_proxy_finalize(s,
+ NGX_STREAM_INTERNAL_SERVER_ERROR);
+ return;
+ }
+
+ *ll = cl;
+
+ cl->buf->pos = b->last;
+ cl->buf->last = b->last + n;
+ cl->buf->tag = (ngx_buf_tag_t) &ngx_stream_proxy_module;
+
+ cl->buf->temporary = (n ? 1 : 0);
+ cl->buf->last_buf = src->read->eof;
+ cl->buf->flush = 1;
+
*received += n;
b->last += n;
do_write = 1;
continue;
}
-
- if (n == NGX_ERROR) {
- if (c->type == SOCK_DGRAM && u->received == 0) {
- ngx_stream_proxy_next_upstream(s);
- return;
- }
-
- src->read->eof = 1;
- }
}
break;
}
- if (src->read->eof && (b->pos == b->last || (dst && dst->read->eof))) {
+ if (src->read->eof && dst && (dst->read->eof || !dst->buffered)) {
handler = c->log->handler;
c->log->handler = NULL;
@@ -1614,6 +1668,14 @@ ngx_stream_proxy_next_upstream(ngx_stream_session_t *s)
"stream proxy next upstream");
u = s->upstream;
+ pc = u->peer.connection;
+
+ if (u->upstream_out || u->upstream_busy || (pc && pc->buffered)) {
+ ngx_log_error(NGX_LOG_ERR, s->connection->log, 0,
+ "pending buffers on next upstream");
+ ngx_stream_proxy_finalize(s, NGX_STREAM_INTERNAL_SERVER_ERROR);
+ return;
+ }
if (u->peer.sockaddr) {
u->peer.free(&u->peer, u->peer.data, NGX_PEER_FAILED);
@@ -1632,8 +1694,6 @@ ngx_stream_proxy_next_upstream(ngx_stream_session_t *s)
return;
}
- pc = u->peer.connection;
-
if (pc) {
ngx_log_debug1(NGX_LOG_DEBUG_STREAM, s->connection->log, 0,
"close proxy upstream connection: %d", pc->fd);