aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/stream/ngx_stream_core_module.c189
1 files changed, 146 insertions, 43 deletions
diff --git a/src/stream/ngx_stream_core_module.c b/src/stream/ngx_stream_core_module.c
index f0b79341d..2ef39af24 100644
--- a/src/stream/ngx_stream_core_module.c
+++ b/src/stream/ngx_stream_core_module.c
@@ -10,6 +10,11 @@
#include <ngx_stream.h>
+static ngx_uint_t ngx_stream_preread_can_peek(ngx_connection_t *c);
+static ngx_int_t ngx_stream_preread_peek(ngx_stream_session_t *s,
+ ngx_stream_phase_handler_t *ph);
+static ngx_int_t ngx_stream_preread(ngx_stream_session_t *s,
+ ngx_stream_phase_handler_t *ph);
static ngx_int_t ngx_stream_core_preconfiguration(ngx_conf_t *cf);
static void *ngx_stream_core_create_main_conf(ngx_conf_t *cf);
static char *ngx_stream_core_init_main_conf(ngx_conf_t *cf, void *conf);
@@ -203,8 +208,6 @@ ngx_int_t
ngx_stream_core_preread_phase(ngx_stream_session_t *s,
ngx_stream_phase_handler_t *ph)
{
- size_t size;
- ssize_t n;
ngx_int_t rc;
ngx_connection_t *c;
ngx_stream_core_srv_conf_t *cscf;
@@ -217,57 +220,34 @@ ngx_stream_core_preread_phase(ngx_stream_session_t *s,
if (c->read->timedout) {
rc = NGX_STREAM_OK;
-
- } else if (c->read->timer_set) {
- rc = NGX_AGAIN;
-
- } else {
- rc = ph->handler(s);
+ goto done;
}
- while (rc == NGX_AGAIN) {
-
- if (c->buffer == NULL) {
- c->buffer = ngx_create_temp_buf(c->pool, cscf->preread_buffer_size);
- if (c->buffer == NULL) {
- rc = NGX_ERROR;
- break;
- }
- }
-
- size = c->buffer->end - c->buffer->last;
-
- if (size == 0) {
- ngx_log_error(NGX_LOG_ERR, c->log, 0, "preread buffer full");
- rc = NGX_STREAM_BAD_REQUEST;
- break;
- }
-
- if (c->read->eof) {
- rc = NGX_STREAM_OK;
- break;
- }
-
- if (!c->read->ready) {
- break;
- }
-
- n = c->recv(c, c->buffer->last, size);
+ if (!c->read->timer_set) {
+ rc = ph->handler(s);
- if (n == NGX_ERROR || n == 0) {
- rc = NGX_STREAM_OK;
- break;
+ if (rc != NGX_AGAIN) {
+ goto done;
}
+ }
- if (n == NGX_AGAIN) {
- break;
+ if (c->buffer == NULL) {
+ c->buffer = ngx_create_temp_buf(c->pool, cscf->preread_buffer_size);
+ if (c->buffer == NULL) {
+ rc = NGX_ERROR;
+ goto done;
}
+ }
- c->buffer->last += n;
+ if (ngx_stream_preread_can_peek(c)) {
+ rc = ngx_stream_preread_peek(s, ph);
- rc = ph->handler(s);
+ } else {
+ rc = ngx_stream_preread(s, ph);
}
+done:
+
if (rc == NGX_AGAIN) {
if (ngx_handle_read_event(c->read, 0) != NGX_OK) {
ngx_stream_finalize_session(s, NGX_STREAM_INTERNAL_SERVER_ERROR);
@@ -311,6 +291,129 @@ ngx_stream_core_preread_phase(ngx_stream_session_t *s,
}
+static ngx_uint_t
+ngx_stream_preread_can_peek(ngx_connection_t *c)
+{
+#if (NGX_STREAM_SSL)
+ if (c->ssl) {
+ return 0;
+ }
+#endif
+
+ if ((ngx_event_flags & NGX_USE_CLEAR_EVENT) == 0) {
+ return 0;
+ }
+
+#if (NGX_HAVE_KQUEUE)
+ if (ngx_event_flags & NGX_USE_KQUEUE_EVENT) {
+ return 1;
+ }
+#endif
+
+#if (NGX_HAVE_EPOLLRDHUP)
+ if ((ngx_event_flags & NGX_USE_EPOLL_EVENT) && ngx_use_epoll_rdhup) {
+ return 1;
+ }
+#endif
+
+ return 0;
+}
+
+
+static ngx_int_t
+ngx_stream_preread_peek(ngx_stream_session_t *s, ngx_stream_phase_handler_t *ph)
+{
+ ssize_t n;
+ ngx_int_t rc;
+ ngx_err_t err;
+ ngx_connection_t *c;
+
+ c = s->connection;
+
+ n = recv(c->fd, (char *) c->buffer->last,
+ c->buffer->end - c->buffer->last, MSG_PEEK);
+
+ err = ngx_socket_errno;
+
+ ngx_log_debug1(NGX_LOG_DEBUG_STREAM, c->log, 0, "stream recv(): %z", n);
+
+ if (n == -1) {
+ if (err == NGX_EAGAIN) {
+ c->read->ready = 0;
+ return NGX_AGAIN;
+ }
+
+ ngx_connection_error(c, err, "recv() failed");
+ return NGX_STREAM_OK;
+ }
+
+ if (n == 0) {
+ return NGX_STREAM_OK;
+ }
+
+ c->buffer->last += n;
+
+ rc = ph->handler(s);
+
+ if (rc != NGX_AGAIN) {
+ c->buffer->last = c->buffer->pos;
+ return rc;
+ }
+
+ if (c->buffer->last == c->buffer->end) {
+ ngx_log_error(NGX_LOG_ERR, c->log, 0, "preread buffer full");
+ return NGX_STREAM_BAD_REQUEST;
+ }
+
+ if (c->read->pending_eof) {
+ return NGX_STREAM_OK;
+ }
+
+ c->buffer->last = c->buffer->pos;
+
+ return NGX_AGAIN;
+}
+
+
+static ngx_int_t
+ngx_stream_preread(ngx_stream_session_t *s, ngx_stream_phase_handler_t *ph)
+{
+ ssize_t n;
+ ngx_int_t rc;
+ ngx_connection_t *c;
+
+ c = s->connection;
+
+ while (c->read->ready) {
+
+ n = c->recv(c, c->buffer->last, c->buffer->end - c->buffer->last);
+
+ if (n == NGX_AGAIN) {
+ return NGX_AGAIN;
+ }
+
+ if (n == NGX_ERROR || n == 0) {
+ return NGX_STREAM_OK;
+ }
+
+ c->buffer->last += n;
+
+ rc = ph->handler(s);
+
+ if (rc != NGX_AGAIN) {
+ return rc;
+ }
+
+ if (c->buffer->last == c->buffer->end) {
+ ngx_log_error(NGX_LOG_ERR, c->log, 0, "preread buffer full");
+ return NGX_STREAM_BAD_REQUEST;
+ }
+ }
+
+ return NGX_AGAIN;
+}
+
+
ngx_int_t
ngx_stream_core_content_phase(ngx_stream_session_t *s,
ngx_stream_phase_handler_t *ph)