aboutsummaryrefslogtreecommitdiff
path: root/src/event/ngx_event_pipe.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/event/ngx_event_pipe.c')
-rw-r--r--src/event/ngx_event_pipe.c65
1 files changed, 48 insertions, 17 deletions
diff --git a/src/event/ngx_event_pipe.c b/src/event/ngx_event_pipe.c
index c5b15bda9..b7cb6b6ae 100644
--- a/src/event/ngx_event_pipe.c
+++ b/src/event/ngx_event_pipe.c
@@ -254,7 +254,6 @@ int ngx_event_pipe_read_upstream(ngx_event_pipe_t *p)
p->upstream_eof = 1;
break;
}
-
}
p->read_length += n;
@@ -269,7 +268,7 @@ int ngx_event_pipe_read_upstream(ngx_event_pipe_t *p)
if (n >= size) {
cl->hunk->last = cl->hunk->end;
-/* STUB */ cl->hunk->num = p->num++;
+ /* STUB */ cl->hunk->num = p->num++;
if (p->input_filter(p, cl->hunk) == NGX_ERROR) {
return NGX_ABORT;
@@ -288,7 +287,9 @@ int ngx_event_pipe_read_upstream(ngx_event_pipe_t *p)
}
if ((p->upstream_eof || p->upstream_error) && p->free_raw_hunks) {
-/* STUB */ p->free_raw_hunks->hunk->num = p->num++;
+
+ /* STUB */ p->free_raw_hunks->hunk->num = p->num++;
+
if (p->input_filter(p, p->free_raw_hunks->hunk) == NGX_ERROR) {
return NGX_ABORT;
}
@@ -326,9 +327,39 @@ int ngx_event_pipe_write_to_downstream(ngx_event_pipe_t *p)
return ngx_event_pipe_drain_chains(p);
}
- if ((p->upstream_eof || p->upstream_error || p->upstream_done)
- && p->out == NULL && p->in == NULL)
- {
+ if (p->upstream_eof || p->upstream_error || p->upstream_done) {
+
+ /* pass the p->out and p->in chains to the output filter */
+
+ if (p->out) {
+ ngx_log_debug0(NGX_LOG_DEBUG_EVENT, p->log, 0,
+ "pipe write downstream flush out");
+
+ if (p->output_filter(p->output_ctx, p->out) == NGX_ERROR) {
+ p->downstream_error = 1;
+ return ngx_event_pipe_drain_chains(p);
+ }
+
+ p->out = NULL;
+ }
+
+ if (p->in) {
+ ngx_log_debug0(NGX_LOG_DEBUG_EVENT, p->log, 0,
+ "pipe write downstream flush in");
+
+ if (p->output_filter(p->output_ctx, p->in) == NGX_ERROR) {
+ p->downstream_error = 1;
+ return ngx_event_pipe_drain_chains(p);
+ }
+
+ p->in = NULL;
+ }
+
+ ngx_log_debug0(NGX_LOG_DEBUG_EVENT, p->log, 0,
+ "pipe write downstream done");
+
+ /* TODO: free unused hunks */
+
p->downstream_done = 1;
break;
}
@@ -338,8 +369,9 @@ int ngx_event_pipe_write_to_downstream(ngx_event_pipe_t *p)
}
/*
- * bsize is the busy hunks size
- * to_write is the size of data that to be written
+ * bsize is the size of the busy hunks,
+ * to_write is the size of data in these hunks that
+ * would be written to a socket
*/
bsize = 0;
@@ -390,25 +422,24 @@ int ngx_event_pipe_write_to_downstream(ngx_event_pipe_t *p)
}
if (out == NULL) {
+
ngx_log_debug1(NGX_LOG_DEBUG_EVENT, p->log, 0,
- "pipe busy hunk data: %d", to_write);
+ "pipe busy hunk data to write: %d", to_write);
if (!(p->upstream_blocked && to_write)) {
break;
}
- /*
- * if the upstream is blocked and there are the busy hunks
- * to write then write these hunks
- */
}
+ /*
+ * if the upstream is blocked and there are the busy hunks
+ * to write then write these hunks
+ */
+
if (p->output_filter(p->output_ctx, out) == NGX_ERROR) {
p->downstream_error = 1;
-
- /* handle the downstream error at the begin of a cycle */
-
- continue;
+ return ngx_event_pipe_drain_chains(p);
}
ngx_chain_update_chains(&p->free, &p->busy, &out, p->tag);