ngx_str_t *name);
static ngx_int_t ngx_stream_js_body_filter(ngx_stream_session_t *s,
ngx_chain_t *in, ngx_uint_t from_upstream);
+static ngx_int_t ngx_stream_js_next_filter(ngx_stream_session_t *s,
+ ngx_stream_js_ctx_t *ctx, ngx_chain_t *out, ngx_uint_t from_upstream);
static ngx_int_t ngx_stream_js_variable_set(ngx_stream_session_t *s,
ngx_stream_variable_value_t *v, uintptr_t data);
static ngx_int_t ngx_stream_js_variable_var(ngx_stream_session_t *s,
static void ngx_stream_js_cleanup(void *data);
static void ngx_stream_js_cleanup_vm(void *data);
static njs_int_t ngx_stream_js_run_event(ngx_stream_session_t *s,
- ngx_stream_js_ctx_t *ctx, ngx_stream_js_ev_t *event);
+ ngx_stream_js_ctx_t *ctx, ngx_stream_js_ev_t *event,
+ ngx_uint_t from_upstream);
static njs_vm_event_t *ngx_stream_js_event(ngx_stream_session_t *s,
njs_str_t *event);
}
},
+ {
+ .flags = NJS_EXTERN_PROPERTY,
+ .name.string = njs_str("from_upstream"),
+ .enumerable = 1,
+ .u.property = {
+ .handler = ngx_js_ext_flags,
+ .magic16 = NGX_JS_BOOLEAN,
+ .magic32 = 0x00000002,
+ }
+ },
+
};
}
}
- ret = ngx_stream_js_run_event(s, ctx, &ctx->events[NGX_JS_EVENT_UPLOAD]);
+ ret = ngx_stream_js_run_event(s, ctx, &ctx->events[NGX_JS_EVENT_UPLOAD], 0);
if (ret != NJS_OK) {
ngx_js_retval(ctx->vm, NULL, &exception);
ngx_stream_js_body_filter(ngx_stream_session_t *s, ngx_chain_t *in,
ngx_uint_t from_upstream)
{
+ ngx_int_t rc;
ngx_str_t exception;
njs_int_t ret;
- ngx_int_t rc;
- ngx_chain_t *out, *cl, **busy;
- ngx_connection_t *c, *dst;
+ ngx_chain_t *out, *cl;
+ ngx_connection_t *c;
ngx_stream_js_ev_t *event;
ngx_stream_js_ctx_t *ctx;
ngx_stream_js_srv_conf_t *jscf;
event = ngx_stream_event(from_upstream);
if (event->ev != NULL) {
- ret = ngx_stream_js_run_event(s, ctx, event);
+ ret = ngx_stream_js_run_event(s, ctx, event, from_upstream);
if (ret != NJS_OK) {
ngx_js_retval(ctx->vm, NULL, &exception);
in = in->next;
}
+ ctx->buf = NULL;
*ctx->last_out = NULL;
+ return ngx_stream_js_next_filter(s, ctx, out, from_upstream);
+}
+
+
+static ngx_int_t
+ngx_stream_js_next_filter(ngx_stream_session_t *s, ngx_stream_js_ctx_t *ctx,
+ ngx_chain_t *out, ngx_uint_t from_upstream)
+{
+ ngx_int_t rc;
+ ngx_chain_t **busy;
+ ngx_connection_t *c, *dst;
+
+ c = s->connection;
+
if (from_upstream) {
dst = c;
busy = &ctx->downstream_busy;
static njs_int_t
ngx_stream_js_run_event(ngx_stream_session_t *s, ngx_stream_js_ctx_t *ctx,
- ngx_stream_js_ev_t *event)
+ ngx_stream_js_ev_t *event, ngx_uint_t from_upstream)
{
size_t len;
u_char *p;
return ret;
}
- flags = b && b->last_buf;
+ flags = from_upstream << 1 | (uintptr_t) (b && b->last_buf);
ret = njs_vm_external_create(ctx->vm, njs_value_arg(&ctx->args[2]),
ngx_stream_js_session_flags_proto_id, (void *) flags, 0);
static const njs_str_t last_key = njs_str("last");
static const njs_str_t flush_key = njs_str("flush");
+ static const njs_str_t from_key = njs_str("from_upstream");
s = njs_vm_external(vm, ngx_stream_js_session_proto_id,
njs_argument(args, 0));
return NJS_ERROR;
}
- flush = ctx->buf->flush;
- last_buf = ctx->buf->last_buf;
+ /*
+ * ctx->buf != NULL when s.send() is called while processing incoming
+ * data chunks, otherwise s.send() is called asynchronously
+ */
+
+ if (ctx->buf != NULL) {
+ flush = ctx->buf->flush;
+ last_buf = ctx->buf->last_buf;
+
+ } else {
+ flush = 0;
+ last_buf = 0;
+ }
flags = njs_arg(args, nargs, 2);
b->pos = b->start;
b->last = b->end;
- *ctx->last_out = cl;
- ctx->last_out = &cl->next;
+ if (ctx->buf != NULL) {
+ *ctx->last_out = cl;
+ ctx->last_out = &cl->next;
+
+ } else {
+ if (!njs_value_is_object(flags)) {
+ goto exception;
+ }
+
+ value = njs_vm_object_prop(vm, flags, &from_key, &lvalue);
+ if (value == NULL) {
+ goto exception;
+ }
+
+ if (ngx_stream_js_next_filter(s, ctx, cl, njs_value_bool(value))
+ == NGX_ERROR)
+ {
+ njs_vm_error(vm, "ngx_stream_js_next_filter() failed");
+ return NJS_ERROR;
+ }
+ }
njs_value_undefined_set(njs_vm_retval(vm));
return NJS_OK;
+
+exception:
+
+ njs_vm_error(vm, "\"from_upstream\" flag is expected when"
+ "called asynchronously");
+
+ return NJS_ERROR;
}