diff options
Diffstat (limited to 'nginx/ngx_stream_js_module.c')
-rw-r--r-- | nginx/ngx_stream_js_module.c | 402 |
1 files changed, 392 insertions, 10 deletions
diff --git a/nginx/ngx_stream_js_module.c b/nginx/ngx_stream_js_module.c index b9bba7d6..ab66de9d 100644 --- a/nginx/ngx_stream_js_module.c +++ b/nginx/ngx_stream_js_module.c @@ -28,6 +28,26 @@ typedef struct { typedef struct { + ngx_stream_conf_ctx_t *conf_ctx; + ngx_connection_t *connection; + void *padding; + + /** + * fd is used for event debug and should be at the same position + * as in ngx_connection_t: after a 3rd pointer. + */ + ngx_socket_t fd; + + ngx_str_t method; + ngx_msec_t interval; + ngx_msec_t jitter; + + ngx_log_t log; + ngx_event_t event; +} ngx_js_periodic_t; + + +typedef struct { njs_vm_t *vm; njs_opaque_value_t retval; njs_opaque_value_t args[3]; @@ -43,6 +63,7 @@ typedef struct { ngx_stream_js_ev_t events[2]; unsigned filter:1; unsigned in_progress:1; + ngx_js_periodic_t *periodic; } ngx_stream_js_ctx_t; @@ -66,7 +87,8 @@ static ngx_int_t ngx_stream_js_variable_set(ngx_stream_session_t *s, ngx_stream_variable_value_t *v, uintptr_t data); static ngx_int_t ngx_stream_js_variable_var(ngx_stream_session_t *s, ngx_stream_variable_value_t *v, uintptr_t data); -static ngx_int_t ngx_stream_js_init_vm(ngx_stream_session_t *s); +static ngx_int_t ngx_stream_js_init_vm(ngx_stream_session_t *s, + unsigned inject_session); static void ngx_stream_js_drop_events(ngx_stream_js_ctx_t *ctx); static void ngx_stream_js_cleanup(void *data); static njs_int_t ngx_stream_js_run_event(ngx_stream_session_t *s, @@ -114,8 +136,18 @@ static size_t ngx_stream_js_max_response_buffer_size(njs_vm_t *vm, static void ngx_stream_js_handle_event(ngx_stream_session_t *s, njs_vm_event_t vm_event, njs_value_t *args, njs_uint_t nargs); +static void ngx_stream_js_periodic_handler(ngx_event_t *ev); +static void ngx_stream_js_periodic_event_handler(ngx_event_t *ev); +static void ngx_stream_js_periodic_finalize(ngx_stream_session_t *s, + ngx_int_t rc); +static void ngx_stream_js_periodic_destroy(ngx_stream_session_t *s, + ngx_js_periodic_t *periodic); + static njs_int_t ngx_js_stream_init(njs_vm_t *vm); static ngx_int_t ngx_stream_js_init(ngx_conf_t *cf); +static ngx_int_t ngx_stream_js_init_worker(ngx_cycle_t *cycle); +static char *ngx_stream_js_periodic(ngx_conf_t *cf, ngx_command_t *cmd, + void *conf); static char *ngx_stream_js_set(ngx_conf_t *cf, ngx_command_t *cmd, void *conf); static char *ngx_stream_js_var(ngx_conf_t *cf, ngx_command_t *cmd, @@ -154,6 +186,13 @@ static ngx_command_t ngx_stream_js_commands[] = { 0, NULL }, + { ngx_string("js_periodic"), + NGX_STREAM_SRV_CONF|NGX_CONF_ANY, + ngx_stream_js_periodic, + NGX_STREAM_SRV_CONF_OFFSET, + 0, + NULL }, + { ngx_string("js_preload_object"), NGX_STREAM_MAIN_CONF|NGX_STREAM_SRV_CONF|NGX_CONF_TAKE13, ngx_js_preload_object, @@ -293,7 +332,7 @@ ngx_module_t ngx_stream_js_module = { NGX_STREAM_MODULE, /* module type */ NULL, /* init master */ NULL, /* init module */ - NULL, /* init process */ + ngx_stream_js_init_worker, /* init process */ NULL, /* init thread */ NULL, /* exit thread */ NULL, /* exit process */ @@ -647,7 +686,7 @@ ngx_stream_js_phase_handler(ngx_stream_session_t *s, ngx_str_t *name) return NGX_DECLINED; } - rc = ngx_stream_js_init_vm(s); + rc = ngx_stream_js_init_vm(s, 1); if (rc != NGX_OK) { return rc; } @@ -728,7 +767,7 @@ ngx_stream_js_body_filter(ngx_stream_session_t *s, ngx_chain_t *in, ngx_log_debug1(NGX_LOG_DEBUG_STREAM, c->log, 0, "stream js filter u:%ui", from_upstream); - rc = ngx_stream_js_init_vm(s); + rc = ngx_stream_js_init_vm(s, 1); if (rc == NGX_ERROR) { return NGX_ERROR; @@ -836,7 +875,7 @@ ngx_stream_js_variable_set(ngx_stream_session_t *s, ngx_str_t value; ngx_stream_js_ctx_t *ctx; - rc = ngx_stream_js_init_vm(s); + rc = ngx_stream_js_init_vm(s, 1); if (rc == NGX_ERROR) { return NGX_ERROR; @@ -910,7 +949,7 @@ ngx_stream_js_variable_var(ngx_stream_session_t *s, static ngx_int_t -ngx_stream_js_init_vm(ngx_stream_session_t *s) +ngx_stream_js_init_vm(ngx_stream_session_t *s, unsigned inject_session) { njs_int_t rc; njs_str_t key; @@ -987,10 +1026,12 @@ ngx_stream_js_init_vm(ngx_stream_session_t *s) return NGX_ERROR; } - rc = njs_vm_external_create(ctx->vm, njs_value_arg(&ctx->args[0]), - ngx_stream_js_session_proto_id, s, 0); - if (rc != NJS_OK) { - return NGX_ERROR; + if (inject_session) { + rc = njs_vm_external_create(ctx->vm, njs_value_arg(&ctx->args[0]), + ngx_stream_js_session_proto_id, s, 0); + if (rc != NJS_OK) { + return NGX_ERROR; + } } return NGX_OK; @@ -1695,12 +1736,21 @@ ngx_stream_js_handle_event(ngx_stream_session_t *s, njs_vm_event_t vm_event, rc = njs_vm_run(ctx->vm); + ngx_log_debug2(NGX_LOG_DEBUG_STREAM, s->connection->log, 0, + "stream js post event handler rc: %i event: %p", + (ngx_int_t) rc, vm_event); + if (rc == NJS_ERROR) { ngx_js_retval(ctx->vm, NULL, &exception); ngx_log_error(NGX_LOG_ERR, s->connection->log, 0, "js exception: %V", &exception); + if (s->health_check) { + ngx_stream_js_periodic_finalize(s, NGX_ERROR); + return; + } + ngx_stream_finalize_session(s, NGX_STREAM_INTERNAL_SERVER_ERROR); } @@ -1754,6 +1804,337 @@ ngx_stream_js_init_conf_vm(ngx_conf_t *cf, ngx_js_loc_conf_t *conf) } +static void +ngx_stream_js_periodic_handler(ngx_event_t *ev) +{ + ngx_int_t rc; + ngx_msec_t timer; + ngx_js_periodic_t *periodic; + ngx_connection_t *c; + ngx_stream_js_ctx_t *ctx; + ngx_stream_session_t *s; + ngx_stream_core_main_conf_t *cmcf; + + periodic = ev->data; + + timer = periodic->interval; + + if (periodic->jitter) { + timer += (ngx_msec_t) ngx_random() % periodic->jitter; + } + + ngx_add_timer(&periodic->event, timer); + + c = periodic->connection; + + if (c != NULL) { + ngx_log_error(NGX_LOG_ERR, c->log, 0, + "js periodic \"%V\" is already running, killing previous " + "instance", &periodic->method); + + ngx_stream_js_periodic_finalize(c->data, NGX_ERROR); + } + + ngx_log_debug1(NGX_LOG_DEBUG_STREAM, &periodic->log, 0, + "stream js periodic handler: \"%V\"", &periodic->method); + + c = ngx_get_connection(0, &periodic->log); + + if (c == NULL) { + return; + } + + c->pool = ngx_create_pool(1024, c->log); + if (c->pool == NULL) { + goto free_connection; + } + + s = ngx_pcalloc(c->pool, sizeof(ngx_stream_session_t)); + if (s == NULL) { + goto free_pool; + } + + s->main_conf = periodic->conf_ctx->main_conf; + s->srv_conf = periodic->conf_ctx->srv_conf; + + s->ctx = ngx_pcalloc(c->pool, sizeof(void *) * ngx_stream_max_module); + if (s->ctx == NULL) { + goto free_pool; + } + + cmcf = ngx_stream_get_module_main_conf(s, ngx_stream_core_module); + + s->variables = ngx_pcalloc(c->pool, cmcf->variables.nelts + * sizeof(ngx_stream_variable_value_t)); + if (s->variables == NULL) { + goto free_pool; + } + + c->data = s; + c->destroyed = 0; + c->read->log = &periodic->log; + c->read->handler = ngx_stream_js_periodic_event_handler; + + s->received = 1; + s->connection = c; + s->signature = NGX_STREAM_MODULE; + + s->health_check = 1; + + rc = ngx_stream_js_init_vm(s, 0); + + if (rc != NGX_OK) { + ngx_stream_js_periodic_destroy(s, periodic); + return; + } + + periodic->connection = c; + + ctx = ngx_stream_get_module_ctx(s, ngx_stream_js_module); + + ctx->periodic = periodic; + + s->received++; + + rc = ngx_js_invoke(ctx->vm, &periodic->method, &periodic->log, NULL, 0, + &ctx->retval); + + if (rc == NGX_AGAIN) { + rc = NGX_OK; + } + + s->received--; + + ngx_stream_js_periodic_finalize(s, rc); + + return; + +free_pool: + + ngx_destroy_pool(c->pool); + +free_connection: + + ngx_close_connection(c); +} + + +static void +ngx_stream_js_periodic_event_handler(ngx_event_t *ev) +{ + ngx_connection_t *c; + ngx_stream_js_ctx_t *ctx; + ngx_stream_session_t *s; + + c = ev->data; + + ngx_log_debug0(NGX_LOG_DEBUG_STREAM, c->log, 0, + "stream js periodic event handler"); + + if (c->close) { + ngx_stream_js_periodic_finalize(c->data, NGX_ERROR); + return; + } + + s = c->data; + + ctx = ngx_stream_get_module_ctx(s, ngx_stream_js_module); + + if (!njs_vm_pending(ctx->vm)) { + ngx_stream_js_periodic_finalize(s, NGX_OK); + return; + } +} + + +static void +ngx_stream_js_periodic_finalize(ngx_stream_session_t *s, ngx_int_t rc) +{ + ngx_stream_js_ctx_t *ctx; + + ctx = ngx_stream_get_module_ctx(s, ngx_stream_js_module); + + ngx_log_debug4(NGX_LOG_DEBUG_STREAM, s->connection->log, 0, + "stream js periodic finalize: \"%V\" rc: %i c: %i " + "pending: %i", &ctx->periodic->method, rc, s->received, + njs_vm_pending(ctx->vm)); + + if (s->received > 1 || (rc == NGX_OK && njs_vm_pending(ctx->vm))) { + return; + } + + ngx_stream_js_periodic_destroy(s, ctx->periodic); +} + + +static void +ngx_stream_js_periodic_destroy(ngx_stream_session_t *s, + ngx_js_periodic_t *periodic) +{ + ngx_connection_t *c; + + c = s->connection; + + ngx_log_debug1(NGX_LOG_DEBUG_STREAM, c->log, 0, + "stream js periodic request destroy: \"%V\"", + &periodic->method); + + periodic->connection = NULL; + + ngx_free_connection(c); + + ngx_destroy_pool(c->pool); + + c->fd = (ngx_socket_t) -1; + c->pool = NULL; + c->destroyed = 1; + + if (c->read->posted) { + ngx_delete_posted_event(c->read); + } +} + + +static ngx_int_t +ngx_stream_js_periodic_init(ngx_js_periodic_t *periodic) +{ + ngx_log_t *log; + ngx_msec_t jitter; + ngx_stream_core_srv_conf_t *cscf; + + cscf = ngx_stream_get_module_srv_conf(periodic->conf_ctx, + ngx_stream_core_module); + log = cscf->error_log; + + ngx_memcpy(&periodic->log, log, sizeof(ngx_log_t)); + + periodic->connection = NULL; + + periodic->event.handler = ngx_stream_js_periodic_handler; + periodic->event.data = periodic; + periodic->event.log = log; + periodic->event.cancelable = 1; + + jitter = periodic->jitter ? (ngx_msec_t) ngx_random() % periodic->jitter + : 0; + ngx_add_timer(&periodic->event, jitter + 1); + + return NGX_OK; +} + + +static ngx_int_t +ngx_stream_js_init_worker(ngx_cycle_t *cycle) +{ + ngx_uint_t i; + ngx_js_periodic_t *periodics; + ngx_js_main_conf_t *jmcf; + + if ((ngx_process != NGX_PROCESS_WORKER) + && ngx_process != NGX_PROCESS_SINGLE) + { + return NGX_OK; + } + + jmcf = ngx_stream_cycle_get_module_main_conf(cycle, ngx_stream_js_module); + + if (jmcf == NULL || jmcf->periodics == NULL) { + return NGX_OK; + } + + periodics = jmcf->periodics->elts; + + for (i = 0; i < jmcf->periodics->nelts; i++) { + periodics[i].fd = 1000000 + i; + + if (ngx_stream_js_periodic_init(&periodics[i]) != NGX_OK) { + return NGX_ERROR; + } + } + + return NGX_OK; +} + + +static char * +ngx_stream_js_periodic(ngx_conf_t *cf, ngx_command_t *cmd, void *conf) +{ + ngx_str_t *value, s; + ngx_msec_t interval, jitter; + ngx_uint_t i; + ngx_js_periodic_t *periodic; + ngx_js_main_conf_t *jmcf; + + if (cf->args->nelts < 2) { + ngx_conf_log_error(NGX_LOG_EMERG, cf, 0, "method name is required"); + return NGX_CONF_ERROR; + } + + jmcf = ngx_stream_conf_get_module_main_conf(cf, ngx_stream_js_module); + + if (jmcf->periodics == NULL) { + jmcf->periodics = ngx_array_create(cf->pool, 1, + sizeof(ngx_js_periodic_t)); + if (jmcf->periodics == NULL) { + return NGX_CONF_ERROR; + } + } + + periodic = ngx_array_push(jmcf->periodics); + if (periodic == NULL) { + return NGX_CONF_ERROR; + } + + ngx_memzero(periodic, sizeof(ngx_js_periodic_t)); + + jitter = 0; + interval = 5000; + + value = cf->args->elts; + + for (i = 2; i < cf->args->nelts; i++) { + + if (ngx_strncmp(value[i].data, "interval=", 9) == 0) { + s.len = value[i].len - 9; + s.data = value[i].data + 9; + + interval = ngx_parse_time(&s, 0); + + if (interval == (ngx_msec_t) NGX_ERROR || interval == 0) { + goto invalid; + } + + continue; + } + + if (ngx_strncmp(value[i].data, "jitter=", 7) == 0) { + s.len = value[i].len - 7; + s.data = value[i].data + 7; + + jitter = ngx_parse_time(&s, 0); + + if (jitter == (ngx_msec_t) NGX_ERROR) { + goto invalid; + } + + continue; + } + +invalid: + + ngx_conf_log_error(NGX_LOG_EMERG, cf, 0, + "invalid parameter \"%V\"", &value[i]); + return NGX_CONF_ERROR; + } + + periodic->method = value[1]; + periodic->interval = interval; + periodic->jitter = jitter; + periodic->conf_ctx = cf->ctx; + + return NGX_CONF_OK; +} + static char * ngx_stream_js_set(ngx_conf_t *cf, ngx_command_t *cmd, void *conf) { @@ -1860,6 +2241,7 @@ ngx_stream_js_create_main_conf(ngx_conf_t *cf) * set by ngx_pcalloc(): * * jmcf->dicts = NULL; + * jmcf->periodics = NULL; */ return jmcf; |