aboutsummaryrefslogtreecommitdiff
path: root/src/event/ngx_event_proxy.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/event/ngx_event_proxy.c')
-rw-r--r--src/event/ngx_event_proxy.c374
1 files changed, 109 insertions, 265 deletions
diff --git a/src/event/ngx_event_proxy.c b/src/event/ngx_event_proxy.c
index 5dfa67109..c6bd8a373 100644
--- a/src/event/ngx_event_proxy.c
+++ b/src/event/ngx_event_proxy.c
@@ -4,21 +4,27 @@
#include <ngx_event.h>
#include <ngx_event_proxy.h>
+static int ngx_event_proxy_write_chain_to_temp_file(ngx_event_proxy_t *p);
+ngx_inline static void ngx_remove_shadow_links(ngx_hunk_t *hunk);
+ngx_inline static void ngx_remove_shadow_free_raw_hunk(ngx_chain_t **free,
+ ngx_hunk_t *h);
+ngx_inline static void ngx_add_after_partially_filled_hunk(ngx_chain_t **chain,
+ ngx_chain_t *ce);
+
int ngx_event_proxy_read_upstream(ngx_event_proxy_t *p)
{
int n, rc, size;
- ngx_hunk_t *h, *nh;
- ngx_chain_t *chain, *rest, *ce, *next;
-
- rest = NULL;
+ ngx_hunk_t *h;
+ ngx_chain_t *chain, *ce, *te;
p->upstream_level++;
-ngx_log_debug(p->log, "read upstream");
-
- for ( ;; ) {
+ ngx_log_debug(p->log, "read upstream: %d" _ p->upstream->read->ready);
+ while (p->preread_hunks
+ || (p->upstream->read->ready && !p->upstream_done))
+ {
if (p->preread_hunks) {
/* use the pre-read hunks if they exist */
@@ -27,7 +33,7 @@ ngx_log_debug(p->log, "read upstream");
p->preread_hunks = NULL;
n = p->preread_size;
-ngx_log_debug(p->log, "preread: %d" _ n);
+ ngx_log_debug(p->log, "preread: %d" _ n);
} else {
@@ -46,24 +52,14 @@ ngx_log_debug(p->log, "preread: %d" _ n);
"readv() failed");
p->upstream_error = 1;
+ break;
+
} else if (p->upstream->read->eof
&& p->upstream->read->available == 0) {
p->upstream_eof = 1;
- }
- if ((p->upstream_eof || p->upstream_error)
- && p->free_raw_hunk)
- {
- if (p->input_filter(p->input_data, p->free_raw_hunk->hunk)
- == NGX_ERROR) {
- return NGX_ABORT;
- }
-
- /* TODO: p->free_raw_hunk->next can be free()ed */
- p->free_raw_hunk = p->free_raw_hunk->next;
+ break;
}
-
- break;
}
#endif
@@ -74,23 +70,18 @@ ngx_log_debug(p->log, "preread: %d" _ n);
chain = p->free_raw_hunks;
p->free_raw_hunks = NULL;
-ngx_log_debug(p->log, "free hunk: %08X:%d" _ chain->hunk _
- chain->hunk->end - chain->hunk->last);
-
} else if (p->hunks < p->bufs.num) {
/* allocate a new hunk if it's still allowed */
ngx_test_null(h, ngx_create_temp_hunk(p->pool,
- p->bufs.size, 20, 20);
+ p->bufs.size, 0, 0),
NGX_ABORT);
p->hunks++;
ngx_alloc_ce_and_set_hunk(te, h, p->pool, NGX_ABORT);
chain = te;
-ngx_log_debug(p->log, "new hunk: %08X" _ chain->hunk);
-
} else if (!p->cachable && p->downstream->write->ready) {
/*
@@ -98,7 +89,7 @@ ngx_log_debug(p->log, "new hunk: %08X" _ chain->hunk);
* a downstream is ready then write the hunks to a downstream.
*/
-ngx_log_debug(p->log, "downstream ready");
+ ngx_log_debug(p->log, "downstream ready");
break;
@@ -111,7 +102,7 @@ ngx_log_debug(p->log, "downstream ready");
rc = ngx_event_proxy_write_chain_to_temp_file(p);
-ngx_log_debug(p->log, "temp offset: %d" _ p->temp_offset);
+ ngx_log_debug(p->log, "temp offset: %d" _ p->temp_offset);
if (rc == NGX_AGAIN) {
if (ngx_event_flags & NGX_USE_LEVEL_EVENT
@@ -133,21 +124,18 @@ ngx_log_debug(p->log, "temp offset: %d" _ p->temp_offset);
chain = p->free_raw_hunks;
p->free_raw_hunks = NULL;
-ngx_log_debug(p->log, "new file hunk: %08X:%d" _ chain->hunk _
- chain->hunk->end - chain->hunk->last);
-
} else {
/* if there're no hunks to read in then disable a level event */
-ngx_log_debug(p->log, "no hunks to read in");
+ ngx_log_debug(p->log, "no hunks to read in");
break;
}
n = ngx_recv_chain(p->upstream, chain);
-ngx_log_debug(p->log, "recv_chain: %d" _ n);
+ ngx_log_debug(p->log, "recv_chain: %d" _ n);
if (n == NGX_ERROR) {
p->upstream_error = 1;
@@ -159,10 +147,7 @@ ngx_log_debug(p->log, "recv_chain: %d" _ n);
}
if (n == 0) {
- THINK
- if (chain->hunk->shadow == NULL) {
- p->free_hunks = chain;
- }
+ p->free_raw_hunks = chain;
p->upstream_eof = 1;
break;
@@ -172,180 +157,41 @@ ngx_log_debug(p->log, "recv_chain: %d" _ n);
for (ce = chain; ce && n > 0; ce = ce->next) {
+ ngx_remove_shadow_links(ce->hunk);
+
size = ce->hunk->end - ce->hunk->last;
if (n >= size) {
ce->hunk->last = ce->hunk->end;
- if (p->input_filter(p->input_data, ce->hunk) == NGX_ERROR) {
+ if (p->input_filter(p, ce->hunk) == NGX_ERROR) {
return NGX_ABORT;
}
n -= size;
- } else if (p->upstream_eof || p->upstream_error) {
-
- if (p->input_filter(p->input_data, ce->hunk) == NGX_ERROR) {
- return NGX_ABORT;
- }
-
- } else {
- ce->hunk->last += n;
- n = 0;
- }
-
-
-
-
-
-
-
-
- next = ce->next;
- ce->next = NULL;
-
- if (ce->hunk->shadow) {
- for (h = ce->hunk->shadow;
- (h->type & NGX_HUNK_LAST_SHADOW) == 0;
- h = nh)
- {
- nh = h->shadow;
- h->shadow = NULL;
- h->type &= ~(NGX_HUNK_TEMP
- |NGX_HUNK_IN_MEMORY
- |NGX_HUNK_RECYCLED);
- }
-
- h->shadow = NULL;
- h->type &= ~(NGX_HUNK_TEMP
- |NGX_HUNK_IN_MEMORY
- |NGX_HUNK_RECYCLED
- |NGX_HUNK_LAST_SHADOW);
- ce->hunk->shadow = NULL;
- }
-
- size = ce->hunk->end - ce->hunk->last;
-
- if (n >= size) {
- ce->hunk->last = ce->hunk->end;
-
- if (p->read_hunks) {
- p->last_read_hunk->next = ce;
-
- } else {
- p->read_hunks = ce;
- }
-
- p->last_read_hunk = ce;
-
- n -= size;
+ chain = ce->next;
} else {
ce->hunk->last += n;
- p->free_hunks = ce;
-
n = 0;
}
}
- if (chain == p->free_hunks) {
- chain = NULL;
- }
-
- /*
- * the input filter i.e. that moves HTTP/1.1 chunks
- * from a read chain to an incoming chain
- */
-
- if (p->input_filter(p, chain) == NGX_ERROR) {
- return NGX_ABORT;
- }
-
-ngx_log_debug(p->log, "rest chain: %08X" _ ce);
-
- /*
- * if the rest hunks are file hunks then move them to a file chain
- * otherwise add them to a free chain
- */
-
- if (rest) {
- if (rest->hunk->shadow) {
- p->file_hunks = rest;
-
- } else {
- if (p->free_hunks) {
- p->free_hunks->next = rest;
-
- } else {
- p->free_hunks = rest;
- }
- }
-
- break;
- }
+ p->free_raw_hunks = chain;
}
-ngx_log_debug(p->log, "eof: %d" _ p->upstream_eof);
-
- /*
- * if there's the end of upstream response then move
- * the partially filled hunk from a free chain to an incoming chain
- */
-
- if (p->upstream_eof) {
- if (p->free_hunks
- && p->free_hunks->hunk->pos < p->free_hunks->hunk->last)
- {
-
-#if (NGX_EVENT_COPY_FILTER)
-
- if (p->input_filter(p, NULL) == NGX_ERROR) {
- return NGX_ABORT;
- }
-#else
-
- if (p->input_filter) {
- if (p->input_filter(p, NULL) == NGX_ERROR) {
- return NGX_ABORT;
- }
-
- } else {
- ce = p->free_hunks;
-
- if (p->in_hunks) {
- p->last_in_hunk->next = ce;
-
- } else {
- p->in_hunks = ce;
- }
-
- p->last_in_hunk = ce;
- }
-
- p->free_hunks = ce->next;
- ce->next = NULL;
-
-#endif /* NGX_EVENT_COPY_FILTER */
- }
-
-#if 0
- /* free the unneeded hunks */
-
- for (ce = p->free_hunks; ce; ce = ce->next) {
- ngx_free_hunk(p->pool, ce->hunk);
+ if ((p->upstream_eof || p->upstream_error) && p->free_raw_hunks) {
+ if (p->input_filter(p, p->free_raw_hunks->hunk) == NGX_ERROR) {
+ return NGX_ABORT;
}
-#endif
- if (p->in_hunks) {
- p->last_in_hunk->hunk->type |= NGX_HUNK_LAST;
-
- } else if (p->out_hunks) {
- p->last_out_hunk->hunk->type |= NGX_HUNK_LAST;
- }
+ /* TODO: p->free_raw_hunk->next can be free()ed */
+ p->free_raw_hunks = p->free_raw_hunks->next;
}
if (p->cachable) {
- if (p->in_hunks) {
+ if (p->in) {
rc = ngx_event_proxy_write_chain_to_temp_file(p);
if (rc != NGX_OK) {
@@ -353,13 +199,13 @@ ngx_log_debug(p->log, "eof: %d" _ p->upstream_eof);
}
}
- if (p->out_hunks && p->downstream->write->ready) {
+ if (p->out && p->downstream->write->ready) {
if (ngx_event_proxy_write_to_downstream(p) == NGX_ABORT) {
return NGX_ABORT;
}
}
- } else if ((p->out_hunks || p->in_hunks) && p->downstream->write->ready) {
+ } else if ((p->out || p->in) && p->downstream->write->ready) {
if (ngx_event_proxy_write_to_downstream(p) == NGX_ABORT) {
return NGX_ABORT;
}
@@ -375,25 +221,22 @@ ngx_log_debug(p->log, "upstream level: %d" _ p->upstream_level);
}
}
- if (p->upstream_eof) {
- return NGX_OK;
-
- } else {
- return NGX_AGAIN;
- }
+ return NGX_OK;
}
int ngx_event_proxy_write_to_downstream(ngx_event_proxy_t *p)
{
- ngx_chain_t *out, *ce;
+ int rc;
+ ngx_hunk_t *h;
+ ngx_chain_t *out, *ce, *te;
while (p->downstream->write->ready) {
if (p->out) {
out = p->out;
- if (p->busy_len + ngx_hunk_size(out->hunk)) > p->max_busy_len) {
+ if (p->busy_len + ngx_hunk_size(out->hunk) > p->max_busy_len) {
break;
}
@@ -403,7 +246,7 @@ int ngx_event_proxy_write_to_downstream(ngx_event_proxy_t *p)
} else if (!p->cachable && p->in) {
out = p->in;
- if (p->busy_len + ngx_hunk_size(out->hunk)) > p->max_busy_len) {
+ if (p->busy_len + ngx_hunk_size(out->hunk) > p->max_busy_len) {
break;
}
@@ -415,9 +258,9 @@ int ngx_event_proxy_write_to_downstream(ngx_event_proxy_t *p)
out->next = NULL;
- rc = p->output_filter(p->output_data, out->hunk);
+ rc = p->output_filter(p->output_ctx, out->hunk);
- ngx_chain_update_chains(p->free, p->busy, out);
+ ngx_chain_update_chains(&p->free, &p->busy, &out);
/* calculate p->busy_len */
@@ -435,14 +278,14 @@ int ngx_event_proxy_write_to_downstream(ngx_event_proxy_t *p)
h->shadow = NULL;
ngx_alloc_ce_and_set_hunk(te, ce->hunk->shadow, p->pool,
NGX_ABORT);
- ngx_add_after_partially_filled_hunk(p->free_raw_hunks, te);
+ ngx_add_after_partially_filled_hunk(&p->free_raw_hunks, te);
ce->hunk->type &= ~NGX_HUNK_LAST_SHADOW;
}
ce->hunk->shadow = NULL;
}
- if (p->upstream.read->ready)
+ if (p->upstream->read->ready) {
if (ngx_event_proxy_read_upstream(p) == NGX_ERROR) {
return NGX_ABORT;
}
@@ -450,7 +293,8 @@ int ngx_event_proxy_write_to_downstream(ngx_event_proxy_t *p)
}
if (p->downstream_level == 0) {
- if (ngx_handler_write_event(p->downstream->write) == NGX_ERROR) {
+ if (ngx_handle_write_event(p->downstream->write,
+ /* TODO: lowat */ 0) == NGX_ERROR) {
return NGX_ABORT;
}
}
@@ -463,13 +307,13 @@ int ngx_event_proxy_write_to_downstream(ngx_event_proxy_t *p)
}
-int ngx_event_proxy_write_chain_to_temp_file(ngx_event_proxy_t *p)
+static int ngx_event_proxy_write_chain_to_temp_file(ngx_event_proxy_t *p)
{
int rc, size;
ngx_hunk_t *h;
- ngx_chain_t *entry, *next, *saved_in, *saved_read;
+ ngx_chain_t *ce, *next, *in, **last;
-ngx_log_debug(p->log, "write to file");
+ ngx_log_debug(p->log, "write to file");
if (p->temp_file->fd == NGX_INVALID_FILE) {
rc = ngx_create_temp_file(p->temp_file, p->temp_path, p->pool,
@@ -483,86 +327,59 @@ ngx_log_debug(p->log, "write to file");
return NGX_AGAIN;
}
- if (p->cachable == 0 && p->temp_file_warn) {
+ if (!p->cachable && p->temp_file_warn) {
ngx_log_error(NGX_LOG_WARN, p->log, 0, p->temp_file_warn);
}
}
- if (p->cachable == 0) {
+ if (!p->cachable) {
- entry = p->read_hunks;
size = 0;
+ ce = p->in;
do {
- size += entry->hunk->last - entry->hunk->pos;
+ size += ce->hunk->last - ce->hunk->pos;
if (size >= p->temp_file_write_size) {
break;
}
- entry = entry->next;
-
- } while (entry);
+ ce = ce->next;
- saved_read = entry->next;
- entry->next = NULL;
-
- if (saved_read) {
- for (entry = p->in_hunks; entry->next; entry = entry->next) {
- if (entry->next->hunk->shadow == saved_read->hunk) {
- break;
- }
- }
- saved_in = entry->next;
- entry->next = NULL;
+ } while (ce);
- } else {
- saved_in = NULL;
- }
+ in = ce->next;
+ last = &ce->next;
+ ce->next = NULL;
} else {
- saved_read = NULL;
- saved_in = NULL;
+ in = NULL;
+ last = &p->in;
}
- if (ngx_write_chain_to_file(p->temp_file, p->in_hunks, p->temp_offset,
+ if (ngx_write_chain_to_file(p->temp_file, p->in, p->temp_offset,
p->pool) == NGX_ERROR) {
return NGX_ABORT;
}
- for (entry = p->in_hunks; entry; entry = next) {
- next = entry->next;
- entry->next = NULL;
+ for (ce = p->in; ce; ce = next) {
+ next = ce->next;
+ ce->next = NULL;
- h = entry->hunk;
+ h = ce->hunk;
h->type |= NGX_HUNK_FILE;
h->file = p->temp_file;
h->file_pos = p->temp_offset;
p->temp_offset += h->last - h->pos;
h->file_last = p->temp_offset;
-ngx_log_debug(p->log, "event proxy file hunk: %08X:%08X" _ h _ h->shadow);
-
if (h->type & NGX_HUNK_LAST_SHADOW) {
-#if 0
- h->shadow->last = h->shadow->pos;
-#else
h->shadow->last = h->shadow->pos = h->shadow->start;
-#endif
}
- if (p->out_hunks) {
- p->last_out_hunk->next = entry;
-
- } else {
- p->out_hunks = entry;
- }
-
- p->last_out_hunk = entry;
+ ngx_chain_add_ce(p->out, p->last_out, ce);
}
- p->file_hunks = p->read_hunks;
-
- p->read_hunks = saved_read;
- p->in_hunks = saved_in;
+ p->in = in;
+ p->last_in = last;
return NGX_OK;
}
@@ -593,22 +410,44 @@ int ngx_event_proxy_copy_input_filter(ngx_event_proxy_t *p, ngx_hunk_t *hunk)
hunk->shadow = h;
ngx_alloc_ce_and_set_hunk(ce, h, p->pool, NGX_ERROR);
- ngx_chain_add_ce(p->in_hunk, p->last_in_hunk, ce);
+ ngx_chain_add_ce(p->in, p->last_in, ce);
return NGX_OK;
}
-ngx_inline static void ngx_remove_shadow_links(ngx_chain_t *ce)
+ngx_inline static void ngx_remove_shadow_links(ngx_hunk_t *hunk)
{
- for (
+ ngx_hunk_t *h, *next;
+
+ if (hunk->shadow == NULL) {
+ return;
+ }
+
+ h = hunk->shadow;
+
+ while (!(h->type & NGX_HUNK_LAST_SHADOW)) {
+ next = h->shadow;
+ h->type &= ~(NGX_HUNK_TEMP|NGX_HUNK_IN_MEMORY|NGX_HUNK_RECYCLED);
+ h->shadow = NULL;
+ h = next;
+ }
+
+ h->type &= ~(NGX_HUNK_TEMP
+ |NGX_HUNK_IN_MEMORY
+ |NGX_HUNK_RECYCLED
+ |NGX_HUNK_LAST_SHADOW);
+ h->shadow = NULL;
+
+ hunk->shadow = NULL;
}
-ngx_inline static void ngx_remove_shadow_free_raw_hunk(ngx_chain_t **free;
- ngx_hunk_t *h);
+ngx_inline static void ngx_remove_shadow_free_raw_hunk(ngx_chain_t **free,
+ ngx_hunk_t *h)
{
- ngx_hunk_t *s;
+ ngx_hunk_t *s;
+ ngx_chain_t *ce, **le;
if (h->shadow == NULL) {
return;
@@ -635,15 +474,20 @@ ngx_inline static void ngx_remove_shadow_free_raw_hunk(ngx_chain_t **free;
}
-ngx_inline static void ngx_add_after_partially_filled_hunk(ngx_chain_t *chain,
+ngx_inline static void ngx_add_after_partially_filled_hunk(ngx_chain_t **chain,
ngx_chain_t *ce)
{
- if (chain->hunk->pos != chain->hunk->last) {
- ce->next = chain->next;
- chain->next = ce;
+ if (*chain == NULL) {
+ *chain = ce;
+ return;
+ }
+
+ if ((*chain)->hunk->pos != (*chain)->hunk->last) {
+ ce->next = (*chain)->next;
+ (*chain)->next = ce;
} else {
- ce->next = chain;
- chain = ce;
+ ce->next = (*chain);
+ (*chain) = ce;
}
}