aboutsummaryrefslogtreecommitdiff
path: root/nginx/ngx_stream_js_module.c
diff options
context:
space:
mode:
authorDmitry Volyntsev <xeioex@nginx.com>2022-08-23 19:36:16 -0700
committerDmitry Volyntsev <xeioex@nginx.com>2022-08-23 19:36:16 -0700
commit569292e0a74f2b1ec09566f3329f82bdd0d58e87 (patch)
treee97eddef237807acffe05d430fe7c2b8e12469e4 /nginx/ngx_stream_js_module.c
parent399580ff3bb1f1a5f584afb6bf7fc42d2f90a6d8 (diff)
downloadnjs-569292e0a74f2b1ec09566f3329f82bdd0d58e87.tar.gz
njs-569292e0a74f2b1ec09566f3329f82bdd0d58e87.zip
Stream: improved s.send() with async callbacks.
Previously, s.send() was a context dependant method because the direction it was sending data to was determined by a callback (upstream or downstream) it was called from. This works for synchronous callbacks it was originally designed, but fails with async functions (e.g. ngx.fetch()). The fix is to store the direction data was going to as a separate flag which can be used by s.send().
Diffstat (limited to 'nginx/ngx_stream_js_module.c')
-rw-r--r--nginx/ngx_stream_js_module.c91
1 files changed, 79 insertions, 12 deletions
diff --git a/nginx/ngx_stream_js_module.c b/nginx/ngx_stream_js_module.c
index 902b542e..ede8e060 100644
--- a/nginx/ngx_stream_js_module.c
+++ b/nginx/ngx_stream_js_module.c
@@ -83,6 +83,8 @@ static ngx_int_t ngx_stream_js_phase_handler(ngx_stream_session_t *s,
ngx_str_t *name);
static ngx_int_t ngx_stream_js_body_filter(ngx_stream_session_t *s,
ngx_chain_t *in, ngx_uint_t from_upstream);
+static ngx_int_t ngx_stream_js_next_filter(ngx_stream_session_t *s,
+ ngx_stream_js_ctx_t *ctx, ngx_chain_t *out, ngx_uint_t from_upstream);
static ngx_int_t ngx_stream_js_variable_set(ngx_stream_session_t *s,
ngx_stream_variable_value_t *v, uintptr_t data);
static ngx_int_t ngx_stream_js_variable_var(ngx_stream_session_t *s,
@@ -92,7 +94,8 @@ static void ngx_stream_js_drop_events(ngx_stream_js_ctx_t *ctx);
static void ngx_stream_js_cleanup(void *data);
static void ngx_stream_js_cleanup_vm(void *data);
static njs_int_t ngx_stream_js_run_event(ngx_stream_session_t *s,
- ngx_stream_js_ctx_t *ctx, ngx_stream_js_ev_t *event);
+ ngx_stream_js_ctx_t *ctx, ngx_stream_js_ev_t *event,
+ ngx_uint_t from_upstream);
static njs_vm_event_t *ngx_stream_js_event(ngx_stream_session_t *s,
njs_str_t *event);
@@ -514,6 +517,17 @@ static njs_external_t ngx_stream_js_ext_session_flags[] = {
}
},
+ {
+ .flags = NJS_EXTERN_PROPERTY,
+ .name.string = njs_str("from_upstream"),
+ .enumerable = 1,
+ .u.property = {
+ .handler = ngx_js_ext_flags,
+ .magic16 = NGX_JS_BOOLEAN,
+ .magic32 = 0x00000002,
+ }
+ },
+
};
@@ -620,7 +634,7 @@ ngx_stream_js_phase_handler(ngx_stream_session_t *s, ngx_str_t *name)
}
}
- ret = ngx_stream_js_run_event(s, ctx, &ctx->events[NGX_JS_EVENT_UPLOAD]);
+ ret = ngx_stream_js_run_event(s, ctx, &ctx->events[NGX_JS_EVENT_UPLOAD], 0);
if (ret != NJS_OK) {
ngx_js_retval(ctx->vm, NULL, &exception);
@@ -655,11 +669,11 @@ static ngx_int_t
ngx_stream_js_body_filter(ngx_stream_session_t *s, ngx_chain_t *in,
ngx_uint_t from_upstream)
{
+ ngx_int_t rc;
ngx_str_t exception;
njs_int_t ret;
- ngx_int_t rc;
- ngx_chain_t *out, *cl, **busy;
- ngx_connection_t *c, *dst;
+ ngx_chain_t *out, *cl;
+ ngx_connection_t *c;
ngx_stream_js_ev_t *event;
ngx_stream_js_ctx_t *ctx;
ngx_stream_js_srv_conf_t *jscf;
@@ -704,7 +718,7 @@ ngx_stream_js_body_filter(ngx_stream_session_t *s, ngx_chain_t *in,
event = ngx_stream_event(from_upstream);
if (event->ev != NULL) {
- ret = ngx_stream_js_run_event(s, ctx, event);
+ ret = ngx_stream_js_run_event(s, ctx, event, from_upstream);
if (ret != NJS_OK) {
ngx_js_retval(ctx->vm, NULL, &exception);
@@ -731,8 +745,23 @@ ngx_stream_js_body_filter(ngx_stream_session_t *s, ngx_chain_t *in,
in = in->next;
}
+ ctx->buf = NULL;
*ctx->last_out = NULL;
+ return ngx_stream_js_next_filter(s, ctx, out, from_upstream);
+}
+
+
+static ngx_int_t
+ngx_stream_js_next_filter(ngx_stream_session_t *s, ngx_stream_js_ctx_t *ctx,
+ ngx_chain_t *out, ngx_uint_t from_upstream)
+{
+ ngx_int_t rc;
+ ngx_chain_t **busy;
+ ngx_connection_t *c, *dst;
+
+ c = s->connection;
+
if (from_upstream) {
dst = c;
busy = &ctx->downstream_busy;
@@ -946,7 +975,7 @@ ngx_stream_js_cleanup_vm(void *data)
static njs_int_t
ngx_stream_js_run_event(ngx_stream_session_t *s, ngx_stream_js_ctx_t *ctx,
- ngx_stream_js_ev_t *event)
+ ngx_stream_js_ev_t *event, ngx_uint_t from_upstream)
{
size_t len;
u_char *p;
@@ -980,7 +1009,7 @@ ngx_stream_js_run_event(ngx_stream_session_t *s, ngx_stream_js_ctx_t *ctx,
return ret;
}
- flags = b && b->last_buf;
+ flags = from_upstream << 1 | (uintptr_t) (b && b->last_buf);
ret = njs_vm_external_create(ctx->vm, njs_value_arg(&ctx->args[2]),
ngx_stream_js_session_flags_proto_id, (void *) flags, 0);
@@ -1249,6 +1278,7 @@ ngx_stream_js_ext_send(njs_vm_t *vm, njs_value_t *args, njs_uint_t nargs,
static const njs_str_t last_key = njs_str("last");
static const njs_str_t flush_key = njs_str("flush");
+ static const njs_str_t from_key = njs_str("from_upstream");
s = njs_vm_external(vm, ngx_stream_js_session_proto_id,
njs_argument(args, 0));
@@ -1271,8 +1301,19 @@ ngx_stream_js_ext_send(njs_vm_t *vm, njs_value_t *args, njs_uint_t nargs,
return NJS_ERROR;
}
- flush = ctx->buf->flush;
- last_buf = ctx->buf->last_buf;
+ /*
+ * ctx->buf != NULL when s.send() is called while processing incoming
+ * data chunks, otherwise s.send() is called asynchronously
+ */
+
+ if (ctx->buf != NULL) {
+ flush = ctx->buf->flush;
+ last_buf = ctx->buf->last_buf;
+
+ } else {
+ flush = 0;
+ last_buf = 0;
+ }
flags = njs_arg(args, nargs, 2);
@@ -1308,12 +1349,38 @@ ngx_stream_js_ext_send(njs_vm_t *vm, njs_value_t *args, njs_uint_t nargs,
b->pos = b->start;
b->last = b->end;
- *ctx->last_out = cl;
- ctx->last_out = &cl->next;
+ if (ctx->buf != NULL) {
+ *ctx->last_out = cl;
+ ctx->last_out = &cl->next;
+
+ } else {
+ if (!njs_value_is_object(flags)) {
+ goto exception;
+ }
+
+ value = njs_vm_object_prop(vm, flags, &from_key, &lvalue);
+ if (value == NULL) {
+ goto exception;
+ }
+
+ if (ngx_stream_js_next_filter(s, ctx, cl, njs_value_bool(value))
+ == NGX_ERROR)
+ {
+ njs_vm_error(vm, "ngx_stream_js_next_filter() failed");
+ return NJS_ERROR;
+ }
+ }
njs_value_undefined_set(njs_vm_retval(vm));
return NJS_OK;
+
+exception:
+
+ njs_vm_error(vm, "\"from_upstream\" flag is expected when"
+ "called asynchronously");
+
+ return NJS_ERROR;
}