aboutsummaryrefslogtreecommitdiff
path: root/nginx/ngx_stream_js_module.c
diff options
context:
space:
mode:
Diffstat (limited to 'nginx/ngx_stream_js_module.c')
-rw-r--r--nginx/ngx_stream_js_module.c402
1 files changed, 392 insertions, 10 deletions
diff --git a/nginx/ngx_stream_js_module.c b/nginx/ngx_stream_js_module.c
index b9bba7d6..ab66de9d 100644
--- a/nginx/ngx_stream_js_module.c
+++ b/nginx/ngx_stream_js_module.c
@@ -28,6 +28,26 @@ typedef struct {
typedef struct {
+ ngx_stream_conf_ctx_t *conf_ctx;
+ ngx_connection_t *connection;
+ void *padding;
+
+ /**
+ * fd is used for event debug and should be at the same position
+ * as in ngx_connection_t: after a 3rd pointer.
+ */
+ ngx_socket_t fd;
+
+ ngx_str_t method;
+ ngx_msec_t interval;
+ ngx_msec_t jitter;
+
+ ngx_log_t log;
+ ngx_event_t event;
+} ngx_js_periodic_t;
+
+
+typedef struct {
njs_vm_t *vm;
njs_opaque_value_t retval;
njs_opaque_value_t args[3];
@@ -43,6 +63,7 @@ typedef struct {
ngx_stream_js_ev_t events[2];
unsigned filter:1;
unsigned in_progress:1;
+ ngx_js_periodic_t *periodic;
} ngx_stream_js_ctx_t;
@@ -66,7 +87,8 @@ static ngx_int_t ngx_stream_js_variable_set(ngx_stream_session_t *s,
ngx_stream_variable_value_t *v, uintptr_t data);
static ngx_int_t ngx_stream_js_variable_var(ngx_stream_session_t *s,
ngx_stream_variable_value_t *v, uintptr_t data);
-static ngx_int_t ngx_stream_js_init_vm(ngx_stream_session_t *s);
+static ngx_int_t ngx_stream_js_init_vm(ngx_stream_session_t *s,
+ unsigned inject_session);
static void ngx_stream_js_drop_events(ngx_stream_js_ctx_t *ctx);
static void ngx_stream_js_cleanup(void *data);
static njs_int_t ngx_stream_js_run_event(ngx_stream_session_t *s,
@@ -114,8 +136,18 @@ static size_t ngx_stream_js_max_response_buffer_size(njs_vm_t *vm,
static void ngx_stream_js_handle_event(ngx_stream_session_t *s,
njs_vm_event_t vm_event, njs_value_t *args, njs_uint_t nargs);
+static void ngx_stream_js_periodic_handler(ngx_event_t *ev);
+static void ngx_stream_js_periodic_event_handler(ngx_event_t *ev);
+static void ngx_stream_js_periodic_finalize(ngx_stream_session_t *s,
+ ngx_int_t rc);
+static void ngx_stream_js_periodic_destroy(ngx_stream_session_t *s,
+ ngx_js_periodic_t *periodic);
+
static njs_int_t ngx_js_stream_init(njs_vm_t *vm);
static ngx_int_t ngx_stream_js_init(ngx_conf_t *cf);
+static ngx_int_t ngx_stream_js_init_worker(ngx_cycle_t *cycle);
+static char *ngx_stream_js_periodic(ngx_conf_t *cf, ngx_command_t *cmd,
+ void *conf);
static char *ngx_stream_js_set(ngx_conf_t *cf, ngx_command_t *cmd,
void *conf);
static char *ngx_stream_js_var(ngx_conf_t *cf, ngx_command_t *cmd,
@@ -154,6 +186,13 @@ static ngx_command_t ngx_stream_js_commands[] = {
0,
NULL },
+ { ngx_string("js_periodic"),
+ NGX_STREAM_SRV_CONF|NGX_CONF_ANY,
+ ngx_stream_js_periodic,
+ NGX_STREAM_SRV_CONF_OFFSET,
+ 0,
+ NULL },
+
{ ngx_string("js_preload_object"),
NGX_STREAM_MAIN_CONF|NGX_STREAM_SRV_CONF|NGX_CONF_TAKE13,
ngx_js_preload_object,
@@ -293,7 +332,7 @@ ngx_module_t ngx_stream_js_module = {
NGX_STREAM_MODULE, /* module type */
NULL, /* init master */
NULL, /* init module */
- NULL, /* init process */
+ ngx_stream_js_init_worker, /* init process */
NULL, /* init thread */
NULL, /* exit thread */
NULL, /* exit process */
@@ -647,7 +686,7 @@ ngx_stream_js_phase_handler(ngx_stream_session_t *s, ngx_str_t *name)
return NGX_DECLINED;
}
- rc = ngx_stream_js_init_vm(s);
+ rc = ngx_stream_js_init_vm(s, 1);
if (rc != NGX_OK) {
return rc;
}
@@ -728,7 +767,7 @@ ngx_stream_js_body_filter(ngx_stream_session_t *s, ngx_chain_t *in,
ngx_log_debug1(NGX_LOG_DEBUG_STREAM, c->log, 0, "stream js filter u:%ui",
from_upstream);
- rc = ngx_stream_js_init_vm(s);
+ rc = ngx_stream_js_init_vm(s, 1);
if (rc == NGX_ERROR) {
return NGX_ERROR;
@@ -836,7 +875,7 @@ ngx_stream_js_variable_set(ngx_stream_session_t *s,
ngx_str_t value;
ngx_stream_js_ctx_t *ctx;
- rc = ngx_stream_js_init_vm(s);
+ rc = ngx_stream_js_init_vm(s, 1);
if (rc == NGX_ERROR) {
return NGX_ERROR;
@@ -910,7 +949,7 @@ ngx_stream_js_variable_var(ngx_stream_session_t *s,
static ngx_int_t
-ngx_stream_js_init_vm(ngx_stream_session_t *s)
+ngx_stream_js_init_vm(ngx_stream_session_t *s, unsigned inject_session)
{
njs_int_t rc;
njs_str_t key;
@@ -987,10 +1026,12 @@ ngx_stream_js_init_vm(ngx_stream_session_t *s)
return NGX_ERROR;
}
- rc = njs_vm_external_create(ctx->vm, njs_value_arg(&ctx->args[0]),
- ngx_stream_js_session_proto_id, s, 0);
- if (rc != NJS_OK) {
- return NGX_ERROR;
+ if (inject_session) {
+ rc = njs_vm_external_create(ctx->vm, njs_value_arg(&ctx->args[0]),
+ ngx_stream_js_session_proto_id, s, 0);
+ if (rc != NJS_OK) {
+ return NGX_ERROR;
+ }
}
return NGX_OK;
@@ -1695,12 +1736,21 @@ ngx_stream_js_handle_event(ngx_stream_session_t *s, njs_vm_event_t vm_event,
rc = njs_vm_run(ctx->vm);
+ ngx_log_debug2(NGX_LOG_DEBUG_STREAM, s->connection->log, 0,
+ "stream js post event handler rc: %i event: %p",
+ (ngx_int_t) rc, vm_event);
+
if (rc == NJS_ERROR) {
ngx_js_retval(ctx->vm, NULL, &exception);
ngx_log_error(NGX_LOG_ERR, s->connection->log, 0,
"js exception: %V", &exception);
+ if (s->health_check) {
+ ngx_stream_js_periodic_finalize(s, NGX_ERROR);
+ return;
+ }
+
ngx_stream_finalize_session(s, NGX_STREAM_INTERNAL_SERVER_ERROR);
}
@@ -1754,6 +1804,337 @@ ngx_stream_js_init_conf_vm(ngx_conf_t *cf, ngx_js_loc_conf_t *conf)
}
+static void
+ngx_stream_js_periodic_handler(ngx_event_t *ev)
+{
+ ngx_int_t rc;
+ ngx_msec_t timer;
+ ngx_js_periodic_t *periodic;
+ ngx_connection_t *c;
+ ngx_stream_js_ctx_t *ctx;
+ ngx_stream_session_t *s;
+ ngx_stream_core_main_conf_t *cmcf;
+
+ periodic = ev->data;
+
+ timer = periodic->interval;
+
+ if (periodic->jitter) {
+ timer += (ngx_msec_t) ngx_random() % periodic->jitter;
+ }
+
+ ngx_add_timer(&periodic->event, timer);
+
+ c = periodic->connection;
+
+ if (c != NULL) {
+ ngx_log_error(NGX_LOG_ERR, c->log, 0,
+ "js periodic \"%V\" is already running, killing previous "
+ "instance", &periodic->method);
+
+ ngx_stream_js_periodic_finalize(c->data, NGX_ERROR);
+ }
+
+ ngx_log_debug1(NGX_LOG_DEBUG_STREAM, &periodic->log, 0,
+ "stream js periodic handler: \"%V\"", &periodic->method);
+
+ c = ngx_get_connection(0, &periodic->log);
+
+ if (c == NULL) {
+ return;
+ }
+
+ c->pool = ngx_create_pool(1024, c->log);
+ if (c->pool == NULL) {
+ goto free_connection;
+ }
+
+ s = ngx_pcalloc(c->pool, sizeof(ngx_stream_session_t));
+ if (s == NULL) {
+ goto free_pool;
+ }
+
+ s->main_conf = periodic->conf_ctx->main_conf;
+ s->srv_conf = periodic->conf_ctx->srv_conf;
+
+ s->ctx = ngx_pcalloc(c->pool, sizeof(void *) * ngx_stream_max_module);
+ if (s->ctx == NULL) {
+ goto free_pool;
+ }
+
+ cmcf = ngx_stream_get_module_main_conf(s, ngx_stream_core_module);
+
+ s->variables = ngx_pcalloc(c->pool, cmcf->variables.nelts
+ * sizeof(ngx_stream_variable_value_t));
+ if (s->variables == NULL) {
+ goto free_pool;
+ }
+
+ c->data = s;
+ c->destroyed = 0;
+ c->read->log = &periodic->log;
+ c->read->handler = ngx_stream_js_periodic_event_handler;
+
+ s->received = 1;
+ s->connection = c;
+ s->signature = NGX_STREAM_MODULE;
+
+ s->health_check = 1;
+
+ rc = ngx_stream_js_init_vm(s, 0);
+
+ if (rc != NGX_OK) {
+ ngx_stream_js_periodic_destroy(s, periodic);
+ return;
+ }
+
+ periodic->connection = c;
+
+ ctx = ngx_stream_get_module_ctx(s, ngx_stream_js_module);
+
+ ctx->periodic = periodic;
+
+ s->received++;
+
+ rc = ngx_js_invoke(ctx->vm, &periodic->method, &periodic->log, NULL, 0,
+ &ctx->retval);
+
+ if (rc == NGX_AGAIN) {
+ rc = NGX_OK;
+ }
+
+ s->received--;
+
+ ngx_stream_js_periodic_finalize(s, rc);
+
+ return;
+
+free_pool:
+
+ ngx_destroy_pool(c->pool);
+
+free_connection:
+
+ ngx_close_connection(c);
+}
+
+
+static void
+ngx_stream_js_periodic_event_handler(ngx_event_t *ev)
+{
+ ngx_connection_t *c;
+ ngx_stream_js_ctx_t *ctx;
+ ngx_stream_session_t *s;
+
+ c = ev->data;
+
+ ngx_log_debug0(NGX_LOG_DEBUG_STREAM, c->log, 0,
+ "stream js periodic event handler");
+
+ if (c->close) {
+ ngx_stream_js_periodic_finalize(c->data, NGX_ERROR);
+ return;
+ }
+
+ s = c->data;
+
+ ctx = ngx_stream_get_module_ctx(s, ngx_stream_js_module);
+
+ if (!njs_vm_pending(ctx->vm)) {
+ ngx_stream_js_periodic_finalize(s, NGX_OK);
+ return;
+ }
+}
+
+
+static void
+ngx_stream_js_periodic_finalize(ngx_stream_session_t *s, ngx_int_t rc)
+{
+ ngx_stream_js_ctx_t *ctx;
+
+ ctx = ngx_stream_get_module_ctx(s, ngx_stream_js_module);
+
+ ngx_log_debug4(NGX_LOG_DEBUG_STREAM, s->connection->log, 0,
+ "stream js periodic finalize: \"%V\" rc: %i c: %i "
+ "pending: %i", &ctx->periodic->method, rc, s->received,
+ njs_vm_pending(ctx->vm));
+
+ if (s->received > 1 || (rc == NGX_OK && njs_vm_pending(ctx->vm))) {
+ return;
+ }
+
+ ngx_stream_js_periodic_destroy(s, ctx->periodic);
+}
+
+
+static void
+ngx_stream_js_periodic_destroy(ngx_stream_session_t *s,
+ ngx_js_periodic_t *periodic)
+{
+ ngx_connection_t *c;
+
+ c = s->connection;
+
+ ngx_log_debug1(NGX_LOG_DEBUG_STREAM, c->log, 0,
+ "stream js periodic request destroy: \"%V\"",
+ &periodic->method);
+
+ periodic->connection = NULL;
+
+ ngx_free_connection(c);
+
+ ngx_destroy_pool(c->pool);
+
+ c->fd = (ngx_socket_t) -1;
+ c->pool = NULL;
+ c->destroyed = 1;
+
+ if (c->read->posted) {
+ ngx_delete_posted_event(c->read);
+ }
+}
+
+
+static ngx_int_t
+ngx_stream_js_periodic_init(ngx_js_periodic_t *periodic)
+{
+ ngx_log_t *log;
+ ngx_msec_t jitter;
+ ngx_stream_core_srv_conf_t *cscf;
+
+ cscf = ngx_stream_get_module_srv_conf(periodic->conf_ctx,
+ ngx_stream_core_module);
+ log = cscf->error_log;
+
+ ngx_memcpy(&periodic->log, log, sizeof(ngx_log_t));
+
+ periodic->connection = NULL;
+
+ periodic->event.handler = ngx_stream_js_periodic_handler;
+ periodic->event.data = periodic;
+ periodic->event.log = log;
+ periodic->event.cancelable = 1;
+
+ jitter = periodic->jitter ? (ngx_msec_t) ngx_random() % periodic->jitter
+ : 0;
+ ngx_add_timer(&periodic->event, jitter + 1);
+
+ return NGX_OK;
+}
+
+
+static ngx_int_t
+ngx_stream_js_init_worker(ngx_cycle_t *cycle)
+{
+ ngx_uint_t i;
+ ngx_js_periodic_t *periodics;
+ ngx_js_main_conf_t *jmcf;
+
+ if ((ngx_process != NGX_PROCESS_WORKER)
+ && ngx_process != NGX_PROCESS_SINGLE)
+ {
+ return NGX_OK;
+ }
+
+ jmcf = ngx_stream_cycle_get_module_main_conf(cycle, ngx_stream_js_module);
+
+ if (jmcf == NULL || jmcf->periodics == NULL) {
+ return NGX_OK;
+ }
+
+ periodics = jmcf->periodics->elts;
+
+ for (i = 0; i < jmcf->periodics->nelts; i++) {
+ periodics[i].fd = 1000000 + i;
+
+ if (ngx_stream_js_periodic_init(&periodics[i]) != NGX_OK) {
+ return NGX_ERROR;
+ }
+ }
+
+ return NGX_OK;
+}
+
+
+static char *
+ngx_stream_js_periodic(ngx_conf_t *cf, ngx_command_t *cmd, void *conf)
+{
+ ngx_str_t *value, s;
+ ngx_msec_t interval, jitter;
+ ngx_uint_t i;
+ ngx_js_periodic_t *periodic;
+ ngx_js_main_conf_t *jmcf;
+
+ if (cf->args->nelts < 2) {
+ ngx_conf_log_error(NGX_LOG_EMERG, cf, 0, "method name is required");
+ return NGX_CONF_ERROR;
+ }
+
+ jmcf = ngx_stream_conf_get_module_main_conf(cf, ngx_stream_js_module);
+
+ if (jmcf->periodics == NULL) {
+ jmcf->periodics = ngx_array_create(cf->pool, 1,
+ sizeof(ngx_js_periodic_t));
+ if (jmcf->periodics == NULL) {
+ return NGX_CONF_ERROR;
+ }
+ }
+
+ periodic = ngx_array_push(jmcf->periodics);
+ if (periodic == NULL) {
+ return NGX_CONF_ERROR;
+ }
+
+ ngx_memzero(periodic, sizeof(ngx_js_periodic_t));
+
+ jitter = 0;
+ interval = 5000;
+
+ value = cf->args->elts;
+
+ for (i = 2; i < cf->args->nelts; i++) {
+
+ if (ngx_strncmp(value[i].data, "interval=", 9) == 0) {
+ s.len = value[i].len - 9;
+ s.data = value[i].data + 9;
+
+ interval = ngx_parse_time(&s, 0);
+
+ if (interval == (ngx_msec_t) NGX_ERROR || interval == 0) {
+ goto invalid;
+ }
+
+ continue;
+ }
+
+ if (ngx_strncmp(value[i].data, "jitter=", 7) == 0) {
+ s.len = value[i].len - 7;
+ s.data = value[i].data + 7;
+
+ jitter = ngx_parse_time(&s, 0);
+
+ if (jitter == (ngx_msec_t) NGX_ERROR) {
+ goto invalid;
+ }
+
+ continue;
+ }
+
+invalid:
+
+ ngx_conf_log_error(NGX_LOG_EMERG, cf, 0,
+ "invalid parameter \"%V\"", &value[i]);
+ return NGX_CONF_ERROR;
+ }
+
+ periodic->method = value[1];
+ periodic->interval = interval;
+ periodic->jitter = jitter;
+ periodic->conf_ctx = cf->ctx;
+
+ return NGX_CONF_OK;
+}
+
static char *
ngx_stream_js_set(ngx_conf_t *cf, ngx_command_t *cmd, void *conf)
{
@@ -1860,6 +2241,7 @@ ngx_stream_js_create_main_conf(ngx_conf_t *cf)
* set by ngx_pcalloc():
*
* jmcf->dicts = NULL;
+ * jmcf->periodics = NULL;
*/
return jmcf;