} ngx_http_js_loc_conf_t;
+typedef struct {
+ ngx_http_conf_ctx_t *conf_ctx;
+ ngx_connection_t *connection;
+ void *padding;
+
+ /**
+ * fd is used for event debug and should be at the same position
+ * as in ngx_connection_t: after a 3rd pointer.
+ */
+ ngx_socket_t fd;
+
+ ngx_str_t method;
+ ngx_msec_t interval;
+ ngx_msec_t jitter;
+
+ ngx_log_t log;
+ ngx_http_log_ctx_t log_ctx;
+ ngx_event_t event;
+} ngx_js_periodic_t;
+
+
#define NJS_HEADER_SEMICOLON 0x1
#define NJS_HEADER_SINGLE 0x2
#define NJS_HEADER_ARRAY 0x4
ngx_chain_t **last_out;
ngx_chain_t *free;
ngx_chain_t *busy;
+
+ ngx_js_periodic_t *periodic;
} ngx_http_js_ctx_t;
ngx_http_variable_value_t *v, uintptr_t data);
static ngx_int_t ngx_http_js_variable_var(ngx_http_request_t *r,
ngx_http_variable_value_t *v, uintptr_t data);
-static ngx_int_t ngx_http_js_init_vm(ngx_http_request_t *r);
+static ngx_int_t ngx_http_js_init_vm(ngx_http_request_t *r,
+ unsigned inject_request);
static void ngx_http_js_cleanup_ctx(void *data);
static njs_int_t ngx_http_js_ext_keys_header(njs_vm_t *vm, njs_value_t *value,
static void ngx_http_js_handle_vm_event(ngx_http_request_t *r,
njs_vm_event_t vm_event, njs_value_t *args, njs_uint_t nargs);
+static void ngx_http_js_periodic_handler(ngx_event_t *ev);
+static void ngx_http_js_periodic_write_event_handler(ngx_http_request_t *r);
+static void ngx_http_js_periodic_shutdown_handler(ngx_event_t *ev);
+static void ngx_http_js_periodic_finalize(ngx_http_request_t *r, ngx_int_t rc);
+static void ngx_http_js_periodic_destroy(ngx_http_request_t *r,
+ ngx_js_periodic_t *periodic);
+
static njs_int_t ngx_js_http_init(njs_vm_t *vm);
static ngx_int_t ngx_http_js_init(ngx_conf_t *cf);
+static ngx_int_t ngx_http_js_init_worker(ngx_cycle_t *cycle);
+static char *ngx_http_js_periodic(ngx_conf_t *cf, ngx_command_t *cmd,
+ void *conf);
static char *ngx_http_js_set(ngx_conf_t *cf, ngx_command_t *cmd, void *conf);
static char *ngx_http_js_var(ngx_conf_t *cf, ngx_command_t *cmd, void *conf);
static char *ngx_http_js_content(ngx_conf_t *cf, ngx_command_t *cmd,
0,
NULL },
+ { ngx_string("js_periodic"),
+ NGX_HTTP_LOC_CONF|NGX_CONF_ANY,
+ ngx_http_js_periodic,
+ NGX_HTTP_LOC_CONF_OFFSET,
+ 0,
+ NULL },
+
{ ngx_string("js_preload_object"),
NGX_HTTP_MAIN_CONF|NGX_HTTP_SRV_CONF|NGX_HTTP_LOC_CONF|NGX_CONF_TAKE13,
ngx_js_preload_object,
NGX_HTTP_MODULE, /* module type */
NULL, /* init master */
NULL, /* init module */
- NULL, /* init process */
+ ngx_http_js_init_worker, /* init process */
NULL, /* init thread */
NULL, /* exit thread */
NULL, /* exit process */
ngx_log_debug0(NGX_LOG_DEBUG_HTTP, r->connection->log, 0,
"http js content event handler");
- rc = ngx_http_js_init_vm(r);
+ rc = ngx_http_js_init_vm(r, 1);
if (rc == NGX_ERROR || rc == NGX_DECLINED) {
ngx_http_finalize_request(r, NGX_HTTP_INTERNAL_SERVER_ERROR);
return ngx_http_next_header_filter(r);
}
- rc = ngx_http_js_init_vm(r);
+ rc = ngx_http_js_init_vm(r, 1);
if (rc == NGX_ERROR || rc == NGX_DECLINED) {
return NGX_ERROR;
return ngx_http_next_body_filter(r, in);
}
- rc = ngx_http_js_init_vm(r);
+ rc = ngx_http_js_init_vm(r, 1);
if (rc == NGX_ERROR || rc == NGX_DECLINED) {
return NGX_ERROR;
ngx_str_t value;
ngx_http_js_ctx_t *ctx;
- rc = ngx_http_js_init_vm(r);
+ rc = ngx_http_js_init_vm(r, 1);
if (rc == NGX_ERROR) {
return NGX_ERROR;
static ngx_int_t
-ngx_http_js_init_vm(ngx_http_request_t *r)
+ngx_http_js_init_vm(ngx_http_request_t *r, unsigned inject_request)
{
njs_int_t rc;
ngx_str_t exception;
return NGX_ERROR;
}
- rc = njs_vm_external_create(ctx->vm, njs_value_arg(&ctx->request),
- ngx_http_js_request_proto_id, r, 0);
- if (rc != NJS_OK) {
- return NGX_ERROR;
+ if (inject_request) {
+ rc = njs_vm_external_create(ctx->vm, njs_value_arg(&ctx->request),
+ ngx_http_js_request_proto_id, r, 0);
+ if (rc != NJS_OK) {
+ return NGX_ERROR;
+ }
}
return NGX_OK;
}
+static void
+ngx_http_js_periodic_handler(ngx_event_t *ev)
+{
+ ngx_int_t rc;
+ ngx_msec_t timer;
+ ngx_connection_t *c;
+ ngx_js_periodic_t *periodic;
+ ngx_http_js_ctx_t *ctx;
+ ngx_http_request_t *r;
+ ngx_http_connection_t hc;
+
+ periodic = ev->data;
+
+ timer = periodic->interval;
+
+ if (periodic->jitter) {
+ timer += (ngx_msec_t) ngx_random() % periodic->jitter;
+ }
+
+ ngx_add_timer(&periodic->event, timer);
+
+ c = periodic->connection;
+
+ if (c != NULL) {
+ ngx_log_error(NGX_LOG_ERR, c->log, 0,
+ "http js periodic \"%V\" is already running, killing "
+ "previous instance", &periodic->method);
+
+ ngx_http_js_periodic_finalize(c->data, NGX_ERROR);
+ }
+
+ ngx_log_debug1(NGX_LOG_DEBUG_HTTP, &periodic->log, 0,
+ "http js periodic handler: \"%V\"", &periodic->method);
+
+ c = ngx_get_connection(0, &periodic->log);
+
+ if (c == NULL) {
+ return;
+ }
+
+ ngx_memzero(&hc, sizeof(ngx_http_connection_t));
+
+ hc.conf_ctx = periodic->conf_ctx;
+
+ c->data = &hc;
+
+ r = ngx_http_create_request(c);
+
+ if (r == NULL) {
+ ngx_free_connection(c);
+ c->fd = (ngx_socket_t) -1;
+ return;
+ }
+
+ c->data = r;
+ c->destroyed = 0;
+ c->pool = r->pool;
+ c->read->handler = ngx_http_js_periodic_shutdown_handler;
+
+ periodic->connection = c;
+ periodic->log_ctx.request = r;
+ periodic->log_ctx.connection = c;
+
+ r->method = NGX_HTTP_GET;
+ r->method_name = ngx_http_core_get_method;
+
+ ngx_str_set(&r->uri, "/");
+ r->unparsed_uri = r->uri;
+ r->valid_unparsed_uri = 1;
+
+ r->health_check = 1;
+ r->write_event_handler = ngx_http_js_periodic_write_event_handler;
+
+ rc = ngx_http_js_init_vm(r, 0);
+
+ if (rc != NGX_OK) {
+ ngx_http_js_periodic_destroy(r, periodic);
+ return;
+ }
+
+ ctx = ngx_http_get_module_ctx(r, ngx_http_js_module);
+
+ ctx->periodic = periodic;
+
+ r->count++;
+
+ rc = ngx_js_invoke(ctx->vm, &periodic->method, &periodic->log, NULL, 0,
+ &ctx->retval);
+
+ if (rc == NGX_AGAIN) {
+ rc = NGX_OK;
+ }
+
+ r->count--;
+
+ ngx_http_js_periodic_finalize(r, rc);
+}
+
+
+static void
+ngx_http_js_periodic_write_event_handler(ngx_http_request_t *r)
+{
+ ngx_http_js_ctx_t *ctx;
+
+ ngx_log_debug0(NGX_LOG_DEBUG_HTTP, r->connection->log, 0,
+ "http js periodic write event handler");
+
+ ctx = ngx_http_get_module_ctx(r, ngx_http_js_module);
+
+ if (!njs_vm_pending(ctx->vm)) {
+ ngx_http_js_periodic_finalize(r, NGX_OK);
+ return;
+ }
+}
+
+
+static void
+ngx_http_js_periodic_shutdown_handler(ngx_event_t *ev)
+{
+ ngx_connection_t *c;
+
+ c = ev->data;
+
+ ngx_log_debug0(NGX_LOG_DEBUG_HTTP, c->log, 0,
+ "http js periodic shutdown handler");
+
+ if (c->close) {
+ ngx_http_js_periodic_finalize(c->data, NGX_ERROR);
+ return;
+ }
+
+ ngx_log_error(NGX_LOG_ERR, c->log, 0, "http js periodic shutdown handler "
+ "while not closing");
+}
+
+
+static void
+ngx_http_js_periodic_finalize(ngx_http_request_t *r, ngx_int_t rc)
+{
+ ngx_http_js_ctx_t *ctx;
+
+ ctx = ngx_http_get_module_ctx(r, ngx_http_js_module);
+
+ ngx_log_debug4(NGX_LOG_DEBUG_HTTP, r->connection->log, 0,
+ "http js periodic finalize: \"%V\" rc: %i c: %i pending: %i",
+ &ctx->periodic->method, rc, r->count,
+ njs_vm_pending(ctx->vm));
+
+ if (r->count > 1 || (rc == NGX_OK && njs_vm_pending(ctx->vm))) {
+ return;
+ }
+
+ ngx_http_js_periodic_destroy(r, ctx->periodic);
+}
+
+
+static void
+ngx_http_js_periodic_destroy(ngx_http_request_t *r, ngx_js_periodic_t *periodic)
+{
+ ngx_connection_t *c;
+ ngx_http_cleanup_t *cln;
+
+ c = r->connection;
+
+ ngx_log_debug1(NGX_LOG_DEBUG_HTTP, c->log, 0,
+ "http js periodic destroy: \"%V\"",
+ &periodic->method);
+
+ periodic->connection = NULL;
+
+ for (cln = r->cleanup; cln; cln = cln->next) {
+ if (cln->handler) {
+ cln->handler(cln->data);
+ }
+ }
+
+ ngx_free_connection(c);
+
+ c->fd = (ngx_socket_t) -1;
+ c->pool = NULL;
+ c->destroyed = 1;
+
+ ngx_destroy_pool(r->pool);
+}
+
+
+static ngx_int_t
+ngx_http_js_periodic_init(ngx_js_periodic_t *periodic)
+{
+ ngx_log_t *log;
+ ngx_msec_t jitter;
+ ngx_http_core_loc_conf_t *clcf;
+
+ clcf = ngx_http_get_module_loc_conf(periodic->conf_ctx,
+ ngx_http_core_module);
+ log = clcf->error_log;
+
+ ngx_memcpy(&periodic->log, log, sizeof(ngx_log_t));
+
+ periodic->log.data = &periodic->log_ctx;
+ periodic->connection = NULL;
+
+ periodic->event.handler = ngx_http_js_periodic_handler;
+ periodic->event.data = periodic;
+ periodic->event.log = log;
+ periodic->event.cancelable = 1;
+
+ jitter = periodic->jitter ? (ngx_msec_t) ngx_random() % periodic->jitter
+ : 0;
+ ngx_add_timer(&periodic->event, jitter + 1);
+
+ return NGX_OK;
+}
+
+
static njs_host_event_t
ngx_http_js_set_timer(njs_external_ptr_t external, uint64_t delay,
njs_vm_event_t vm_event)
ngx_log_error(NGX_LOG_ERR, r->connection->log, 0,
"js exception: %V", &exception);
+ if (r->health_check) {
+ ngx_http_js_periodic_finalize(r, NGX_ERROR);
+ return;
+ }
+
ngx_http_finalize_request(r, NGX_ERROR);
return;
}
}
+static ngx_int_t
+ngx_http_js_init_worker(ngx_cycle_t *cycle)
+{
+ ngx_uint_t i;
+ ngx_js_periodic_t *periodics;
+ ngx_js_main_conf_t *jmcf;
+
+ if ((ngx_process != NGX_PROCESS_WORKER)
+ && ngx_process != NGX_PROCESS_SINGLE)
+ {
+ return NGX_OK;
+ }
+
+ jmcf = ngx_http_cycle_get_module_main_conf(cycle, ngx_http_js_module);
+
+ if (jmcf == NULL || jmcf->periodics == NULL) {
+ return NGX_OK;
+ }
+
+ periodics = jmcf->periodics->elts;
+
+ for (i = 0; i < jmcf->periodics->nelts; i++) {
+ periodics[i].fd = 1000000 + i;
+
+ if (ngx_http_js_periodic_init(&periodics[i]) != NGX_OK) {
+ return NGX_ERROR;
+ }
+ }
+
+ return NGX_OK;
+}
+
+
+static char *
+ngx_http_js_periodic(ngx_conf_t *cf, ngx_command_t *cmd, void *conf)
+{
+ ngx_str_t *value, s;
+ ngx_msec_t interval, jitter;
+ ngx_uint_t i;
+ ngx_js_periodic_t *periodic;
+ ngx_js_main_conf_t *jmcf;
+
+ if (cf->args->nelts < 2) {
+ ngx_conf_log_error(NGX_LOG_EMERG, cf, 0, "method name is required");
+ return NGX_CONF_ERROR;
+ }
+
+ jmcf = ngx_http_conf_get_module_main_conf(cf, ngx_http_js_module);
+
+ if (jmcf->periodics == NULL) {
+ jmcf->periodics = ngx_array_create(cf->pool, 1,
+ sizeof(ngx_js_periodic_t));
+ if (jmcf->periodics == NULL) {
+ return NGX_CONF_ERROR;
+ }
+ }
+
+ periodic = ngx_array_push(jmcf->periodics);
+ if (periodic == NULL) {
+ return NGX_CONF_ERROR;
+ }
+
+ ngx_memzero(periodic, sizeof(ngx_js_periodic_t));
+
+ jitter = 0;
+ interval = 5000;
+
+ value = cf->args->elts;
+
+ for (i = 2; i < cf->args->nelts; i++) {
+
+ if (ngx_strncmp(value[i].data, "interval=", 9) == 0) {
+ s.len = value[i].len - 9;
+ s.data = value[i].data + 9;
+
+ interval = ngx_parse_time(&s, 0);
+
+ if (interval == (ngx_msec_t) NGX_ERROR || interval == 0) {
+ goto invalid;
+ }
+
+ continue;
+ }
+
+ if (ngx_strncmp(value[i].data, "jitter=", 7) == 0) {
+ s.len = value[i].len - 7;
+ s.data = value[i].data + 7;
+
+ jitter = ngx_parse_time(&s, 0);
+
+ if (jitter == (ngx_msec_t) NGX_ERROR) {
+ goto invalid;
+ }
+
+ continue;
+ }
+
+invalid:
+
+ ngx_conf_log_error(NGX_LOG_EMERG, cf, 0,
+ "invalid parameter \"%V\"", &value[i]);
+ return NGX_CONF_ERROR;
+ }
+
+ periodic->method = value[1];
+ periodic->interval = interval;
+ periodic->jitter = jitter;
+ periodic->conf_ctx = cf->ctx;
+
+ return NGX_CONF_OK;
+}
+
+
static char *
ngx_http_js_set(ngx_conf_t *cf, ngx_command_t *cmd, void *conf)
{
* set by ngx_pcalloc():
*
* jmcf->dicts = NULL;
+ * jmcf->periodics = NULL;
*/
return jmcf;
#define NGX_JS_COMMON_MAIN_CONF \
- ngx_js_dict_t *dicts \
+ ngx_js_dict_t *dicts; \
+ ngx_array_t *periodics \
#define _NGX_JS_COMMON_LOC_CONF \
} ngx_stream_js_ev_t;
+typedef struct {
+ ngx_stream_conf_ctx_t *conf_ctx;
+ ngx_connection_t *connection;
+ void *padding;
+
+ /**
+ * fd is used for event debug and should be at the same position
+ * as in ngx_connection_t: after a 3rd pointer.
+ */
+ ngx_socket_t fd;
+
+ ngx_str_t method;
+ ngx_msec_t interval;
+ ngx_msec_t jitter;
+
+ ngx_log_t log;
+ ngx_event_t event;
+} ngx_js_periodic_t;
+
+
typedef struct {
njs_vm_t *vm;
njs_opaque_value_t retval;
ngx_stream_js_ev_t events[2];
unsigned filter:1;
unsigned in_progress:1;
+ ngx_js_periodic_t *periodic;
} ngx_stream_js_ctx_t;
ngx_stream_variable_value_t *v, uintptr_t data);
static ngx_int_t ngx_stream_js_variable_var(ngx_stream_session_t *s,
ngx_stream_variable_value_t *v, uintptr_t data);
-static ngx_int_t ngx_stream_js_init_vm(ngx_stream_session_t *s);
+static ngx_int_t ngx_stream_js_init_vm(ngx_stream_session_t *s,
+ unsigned inject_session);
static void ngx_stream_js_drop_events(ngx_stream_js_ctx_t *ctx);
static void ngx_stream_js_cleanup(void *data);
static njs_int_t ngx_stream_js_run_event(ngx_stream_session_t *s,
static void ngx_stream_js_handle_event(ngx_stream_session_t *s,
njs_vm_event_t vm_event, njs_value_t *args, njs_uint_t nargs);
+static void ngx_stream_js_periodic_handler(ngx_event_t *ev);
+static void ngx_stream_js_periodic_event_handler(ngx_event_t *ev);
+static void ngx_stream_js_periodic_finalize(ngx_stream_session_t *s,
+ ngx_int_t rc);
+static void ngx_stream_js_periodic_destroy(ngx_stream_session_t *s,
+ ngx_js_periodic_t *periodic);
+
static njs_int_t ngx_js_stream_init(njs_vm_t *vm);
static ngx_int_t ngx_stream_js_init(ngx_conf_t *cf);
+static ngx_int_t ngx_stream_js_init_worker(ngx_cycle_t *cycle);
+static char *ngx_stream_js_periodic(ngx_conf_t *cf, ngx_command_t *cmd,
+ void *conf);
static char *ngx_stream_js_set(ngx_conf_t *cf, ngx_command_t *cmd,
void *conf);
static char *ngx_stream_js_var(ngx_conf_t *cf, ngx_command_t *cmd,
0,
NULL },
+ { ngx_string("js_periodic"),
+ NGX_STREAM_SRV_CONF|NGX_CONF_ANY,
+ ngx_stream_js_periodic,
+ NGX_STREAM_SRV_CONF_OFFSET,
+ 0,
+ NULL },
+
{ ngx_string("js_preload_object"),
NGX_STREAM_MAIN_CONF|NGX_STREAM_SRV_CONF|NGX_CONF_TAKE13,
ngx_js_preload_object,
NGX_STREAM_MODULE, /* module type */
NULL, /* init master */
NULL, /* init module */
- NULL, /* init process */
+ ngx_stream_js_init_worker, /* init process */
NULL, /* init thread */
NULL, /* exit thread */
NULL, /* exit process */
return NGX_DECLINED;
}
- rc = ngx_stream_js_init_vm(s);
+ rc = ngx_stream_js_init_vm(s, 1);
if (rc != NGX_OK) {
return rc;
}
ngx_log_debug1(NGX_LOG_DEBUG_STREAM, c->log, 0, "stream js filter u:%ui",
from_upstream);
- rc = ngx_stream_js_init_vm(s);
+ rc = ngx_stream_js_init_vm(s, 1);
if (rc == NGX_ERROR) {
return NGX_ERROR;
ngx_str_t value;
ngx_stream_js_ctx_t *ctx;
- rc = ngx_stream_js_init_vm(s);
+ rc = ngx_stream_js_init_vm(s, 1);
if (rc == NGX_ERROR) {
return NGX_ERROR;
static ngx_int_t
-ngx_stream_js_init_vm(ngx_stream_session_t *s)
+ngx_stream_js_init_vm(ngx_stream_session_t *s, unsigned inject_session)
{
njs_int_t rc;
njs_str_t key;
return NGX_ERROR;
}
- rc = njs_vm_external_create(ctx->vm, njs_value_arg(&ctx->args[0]),
- ngx_stream_js_session_proto_id, s, 0);
- if (rc != NJS_OK) {
- return NGX_ERROR;
+ if (inject_session) {
+ rc = njs_vm_external_create(ctx->vm, njs_value_arg(&ctx->args[0]),
+ ngx_stream_js_session_proto_id, s, 0);
+ if (rc != NJS_OK) {
+ return NGX_ERROR;
+ }
}
return NGX_OK;
rc = njs_vm_run(ctx->vm);
+ ngx_log_debug2(NGX_LOG_DEBUG_STREAM, s->connection->log, 0,
+ "stream js post event handler rc: %i event: %p",
+ (ngx_int_t) rc, vm_event);
+
if (rc == NJS_ERROR) {
ngx_js_retval(ctx->vm, NULL, &exception);
ngx_log_error(NGX_LOG_ERR, s->connection->log, 0,
"js exception: %V", &exception);
+ if (s->health_check) {
+ ngx_stream_js_periodic_finalize(s, NGX_ERROR);
+ return;
+ }
+
ngx_stream_finalize_session(s, NGX_STREAM_INTERNAL_SERVER_ERROR);
}
}
+static void
+ngx_stream_js_periodic_handler(ngx_event_t *ev)
+{
+ ngx_int_t rc;
+ ngx_msec_t timer;
+ ngx_js_periodic_t *periodic;
+ ngx_connection_t *c;
+ ngx_stream_js_ctx_t *ctx;
+ ngx_stream_session_t *s;
+ ngx_stream_core_main_conf_t *cmcf;
+
+ periodic = ev->data;
+
+ timer = periodic->interval;
+
+ if (periodic->jitter) {
+ timer += (ngx_msec_t) ngx_random() % periodic->jitter;
+ }
+
+ ngx_add_timer(&periodic->event, timer);
+
+ c = periodic->connection;
+
+ if (c != NULL) {
+ ngx_log_error(NGX_LOG_ERR, c->log, 0,
+ "js periodic \"%V\" is already running, killing previous "
+ "instance", &periodic->method);
+
+ ngx_stream_js_periodic_finalize(c->data, NGX_ERROR);
+ }
+
+ ngx_log_debug1(NGX_LOG_DEBUG_STREAM, &periodic->log, 0,
+ "stream js periodic handler: \"%V\"", &periodic->method);
+
+ c = ngx_get_connection(0, &periodic->log);
+
+ if (c == NULL) {
+ return;
+ }
+
+ c->pool = ngx_create_pool(1024, c->log);
+ if (c->pool == NULL) {
+ goto free_connection;
+ }
+
+ s = ngx_pcalloc(c->pool, sizeof(ngx_stream_session_t));
+ if (s == NULL) {
+ goto free_pool;
+ }
+
+ s->main_conf = periodic->conf_ctx->main_conf;
+ s->srv_conf = periodic->conf_ctx->srv_conf;
+
+ s->ctx = ngx_pcalloc(c->pool, sizeof(void *) * ngx_stream_max_module);
+ if (s->ctx == NULL) {
+ goto free_pool;
+ }
+
+ cmcf = ngx_stream_get_module_main_conf(s, ngx_stream_core_module);
+
+ s->variables = ngx_pcalloc(c->pool, cmcf->variables.nelts
+ * sizeof(ngx_stream_variable_value_t));
+ if (s->variables == NULL) {
+ goto free_pool;
+ }
+
+ c->data = s;
+ c->destroyed = 0;
+ c->read->log = &periodic->log;
+ c->read->handler = ngx_stream_js_periodic_event_handler;
+
+ s->received = 1;
+ s->connection = c;
+ s->signature = NGX_STREAM_MODULE;
+
+ s->health_check = 1;
+
+ rc = ngx_stream_js_init_vm(s, 0);
+
+ if (rc != NGX_OK) {
+ ngx_stream_js_periodic_destroy(s, periodic);
+ return;
+ }
+
+ periodic->connection = c;
+
+ ctx = ngx_stream_get_module_ctx(s, ngx_stream_js_module);
+
+ ctx->periodic = periodic;
+
+ s->received++;
+
+ rc = ngx_js_invoke(ctx->vm, &periodic->method, &periodic->log, NULL, 0,
+ &ctx->retval);
+
+ if (rc == NGX_AGAIN) {
+ rc = NGX_OK;
+ }
+
+ s->received--;
+
+ ngx_stream_js_periodic_finalize(s, rc);
+
+ return;
+
+free_pool:
+
+ ngx_destroy_pool(c->pool);
+
+free_connection:
+
+ ngx_close_connection(c);
+}
+
+
+static void
+ngx_stream_js_periodic_event_handler(ngx_event_t *ev)
+{
+ ngx_connection_t *c;
+ ngx_stream_js_ctx_t *ctx;
+ ngx_stream_session_t *s;
+
+ c = ev->data;
+
+ ngx_log_debug0(NGX_LOG_DEBUG_STREAM, c->log, 0,
+ "stream js periodic event handler");
+
+ if (c->close) {
+ ngx_stream_js_periodic_finalize(c->data, NGX_ERROR);
+ return;
+ }
+
+ s = c->data;
+
+ ctx = ngx_stream_get_module_ctx(s, ngx_stream_js_module);
+
+ if (!njs_vm_pending(ctx->vm)) {
+ ngx_stream_js_periodic_finalize(s, NGX_OK);
+ return;
+ }
+}
+
+
+static void
+ngx_stream_js_periodic_finalize(ngx_stream_session_t *s, ngx_int_t rc)
+{
+ ngx_stream_js_ctx_t *ctx;
+
+ ctx = ngx_stream_get_module_ctx(s, ngx_stream_js_module);
+
+ ngx_log_debug4(NGX_LOG_DEBUG_STREAM, s->connection->log, 0,
+ "stream js periodic finalize: \"%V\" rc: %i c: %i "
+ "pending: %i", &ctx->periodic->method, rc, s->received,
+ njs_vm_pending(ctx->vm));
+
+ if (s->received > 1 || (rc == NGX_OK && njs_vm_pending(ctx->vm))) {
+ return;
+ }
+
+ ngx_stream_js_periodic_destroy(s, ctx->periodic);
+}
+
+
+static void
+ngx_stream_js_periodic_destroy(ngx_stream_session_t *s,
+ ngx_js_periodic_t *periodic)
+{
+ ngx_connection_t *c;
+
+ c = s->connection;
+
+ ngx_log_debug1(NGX_LOG_DEBUG_STREAM, c->log, 0,
+ "stream js periodic request destroy: \"%V\"",
+ &periodic->method);
+
+ periodic->connection = NULL;
+
+ ngx_free_connection(c);
+
+ ngx_destroy_pool(c->pool);
+
+ c->fd = (ngx_socket_t) -1;
+ c->pool = NULL;
+ c->destroyed = 1;
+
+ if (c->read->posted) {
+ ngx_delete_posted_event(c->read);
+ }
+}
+
+
+static ngx_int_t
+ngx_stream_js_periodic_init(ngx_js_periodic_t *periodic)
+{
+ ngx_log_t *log;
+ ngx_msec_t jitter;
+ ngx_stream_core_srv_conf_t *cscf;
+
+ cscf = ngx_stream_get_module_srv_conf(periodic->conf_ctx,
+ ngx_stream_core_module);
+ log = cscf->error_log;
+
+ ngx_memcpy(&periodic->log, log, sizeof(ngx_log_t));
+
+ periodic->connection = NULL;
+
+ periodic->event.handler = ngx_stream_js_periodic_handler;
+ periodic->event.data = periodic;
+ periodic->event.log = log;
+ periodic->event.cancelable = 1;
+
+ jitter = periodic->jitter ? (ngx_msec_t) ngx_random() % periodic->jitter
+ : 0;
+ ngx_add_timer(&periodic->event, jitter + 1);
+
+ return NGX_OK;
+}
+
+
+static ngx_int_t
+ngx_stream_js_init_worker(ngx_cycle_t *cycle)
+{
+ ngx_uint_t i;
+ ngx_js_periodic_t *periodics;
+ ngx_js_main_conf_t *jmcf;
+
+ if ((ngx_process != NGX_PROCESS_WORKER)
+ && ngx_process != NGX_PROCESS_SINGLE)
+ {
+ return NGX_OK;
+ }
+
+ jmcf = ngx_stream_cycle_get_module_main_conf(cycle, ngx_stream_js_module);
+
+ if (jmcf == NULL || jmcf->periodics == NULL) {
+ return NGX_OK;
+ }
+
+ periodics = jmcf->periodics->elts;
+
+ for (i = 0; i < jmcf->periodics->nelts; i++) {
+ periodics[i].fd = 1000000 + i;
+
+ if (ngx_stream_js_periodic_init(&periodics[i]) != NGX_OK) {
+ return NGX_ERROR;
+ }
+ }
+
+ return NGX_OK;
+}
+
+
+static char *
+ngx_stream_js_periodic(ngx_conf_t *cf, ngx_command_t *cmd, void *conf)
+{
+ ngx_str_t *value, s;
+ ngx_msec_t interval, jitter;
+ ngx_uint_t i;
+ ngx_js_periodic_t *periodic;
+ ngx_js_main_conf_t *jmcf;
+
+ if (cf->args->nelts < 2) {
+ ngx_conf_log_error(NGX_LOG_EMERG, cf, 0, "method name is required");
+ return NGX_CONF_ERROR;
+ }
+
+ jmcf = ngx_stream_conf_get_module_main_conf(cf, ngx_stream_js_module);
+
+ if (jmcf->periodics == NULL) {
+ jmcf->periodics = ngx_array_create(cf->pool, 1,
+ sizeof(ngx_js_periodic_t));
+ if (jmcf->periodics == NULL) {
+ return NGX_CONF_ERROR;
+ }
+ }
+
+ periodic = ngx_array_push(jmcf->periodics);
+ if (periodic == NULL) {
+ return NGX_CONF_ERROR;
+ }
+
+ ngx_memzero(periodic, sizeof(ngx_js_periodic_t));
+
+ jitter = 0;
+ interval = 5000;
+
+ value = cf->args->elts;
+
+ for (i = 2; i < cf->args->nelts; i++) {
+
+ if (ngx_strncmp(value[i].data, "interval=", 9) == 0) {
+ s.len = value[i].len - 9;
+ s.data = value[i].data + 9;
+
+ interval = ngx_parse_time(&s, 0);
+
+ if (interval == (ngx_msec_t) NGX_ERROR || interval == 0) {
+ goto invalid;
+ }
+
+ continue;
+ }
+
+ if (ngx_strncmp(value[i].data, "jitter=", 7) == 0) {
+ s.len = value[i].len - 7;
+ s.data = value[i].data + 7;
+
+ jitter = ngx_parse_time(&s, 0);
+
+ if (jitter == (ngx_msec_t) NGX_ERROR) {
+ goto invalid;
+ }
+
+ continue;
+ }
+
+invalid:
+
+ ngx_conf_log_error(NGX_LOG_EMERG, cf, 0,
+ "invalid parameter \"%V\"", &value[i]);
+ return NGX_CONF_ERROR;
+ }
+
+ periodic->method = value[1];
+ periodic->interval = interval;
+ periodic->jitter = jitter;
+ periodic->conf_ctx = cf->ctx;
+
+ return NGX_CONF_OK;
+}
+
static char *
ngx_stream_js_set(ngx_conf_t *cf, ngx_command_t *cmd, void *conf)
{
* set by ngx_pcalloc():
*
* jmcf->dicts = NULL;
+ * jmcf->periodics = NULL;
*/
return jmcf;
--- /dev/null
+#!/usr/bin/perl
+
+# (C) Dmitry Volyntsev
+# (C) Nginx, Inc.
+
+# Tests for js_periodic directive.
+
+###############################################################################
+
+use warnings;
+use strict;
+
+use Test::More;
+use Socket qw/ CRLF /;
+
+BEGIN { use FindBin; chdir($FindBin::Bin); }
+
+use lib 'lib';
+use Test::Nginx;
+
+###############################################################################
+
+select STDERR; $| = 1;
+select STDOUT; $| = 1;
+
+my $t = Test::Nginx->new()->has(qw/http/)
+ ->write_file_expand('nginx.conf', <<'EOF');
+
+%%TEST_GLOBALS%%
+
+daemon off;
+
+events {
+}
+
+worker_shutdown_timeout 100ms;
+
+http {
+ %%TEST_GLOBALS_HTTP%%
+
+ js_import test.js;
+
+ js_shared_dict_zone zone=nums:32k type=number;
+ js_shared_dict_zone zone=strings:32k;
+
+ server {
+ listen 127.0.0.1:8080;
+ server_name localhost;
+
+ location @periodic {
+ js_periodic test.tick interval=30ms jitter=1ms;
+ js_periodic test.timer interval=1s;
+ js_periodic test.overrun interval=30ms;
+ js_periodic test.file interval=1s;
+ js_periodic test.fetch interval=40ms;
+ js_periodic test.multiple_fetches interval=1s;
+
+ js_periodic test.fetch_exception interval=1s;
+ js_periodic test.tick_exception interval=1s;
+ js_periodic test.timer_exception interval=1s;
+ js_periodic test.timeout_exception interval=30ms;
+ }
+
+ location /fetch_ok {
+ return 200 'ok';
+ }
+
+ location /fetch_foo {
+ return 200 'foo';
+ }
+
+ location /test_fetch {
+ js_content test.test_fetch;
+ }
+
+ location /test_file {
+ js_content test.test_file;
+ }
+
+ location /test_multiple_fetches {
+ js_content test.test_multiple_fetches;
+ }
+
+ location /test_tick {
+ js_content test.test_tick;
+ }
+
+ location /test_timer {
+ js_content test.test_timer;
+ }
+
+ location /test_timeout_exception {
+ js_content test.test_timeout_exception;
+ }
+ }
+}
+
+EOF
+
+my $p0 = port(8080);
+
+$t->write_file('test.js', <<EOF);
+ import fs from 'fs';
+
+ async function fetch() {
+ if (ngx.worker_id != 0) {
+ return;
+ }
+
+ let reply = await ngx.fetch('http://127.0.0.1:$p0/fetch_ok');
+ let body = await reply.text();
+
+ let v = ngx.shared.strings.get('fetch') || '';
+ ngx.shared.strings.set('fetch', v + body);
+ }
+
+ async function multiple_fetches() {
+ if (ngx.worker_id != 0) {
+ return;
+ }
+
+ let reply = await ngx.fetch('http://127.0.0.1:$p0/fetch_ok');
+ let reply2 = await ngx.fetch('http://127.0.0.1:$p0/fetch_foo');
+ let body = await reply.text();
+ let body2 = await reply2.text();
+
+ ngx.shared.strings.set('multiple_fetches', body + '\@' + body2);
+ }
+
+ async function fetch_exception() {
+ if (ngx.worker_id != 0) {
+ return;
+ }
+
+ let reply = await ngx.fetch('garbage');
+ }
+
+ async function file() {
+ if (ngx.worker_id != 0) {
+ return;
+ }
+
+ let fh = await fs.promises.open(ngx.conf_prefix + 'file', 'a+');
+
+ await fh.write('abc');
+ await fh.close();
+ }
+
+ async function overrun() {
+ if (ngx.worker_id != 0) {
+ return;
+ }
+
+ setTimeout(() => {}, 100000);
+ }
+
+
+ function tick() {
+ if (ngx.worker_id != 0) {
+ return;
+ }
+
+ ngx.shared.nums.incr('tick', 1);
+ }
+
+ function tick_exception() {
+ if (ngx.worker_id != 0) {
+ return;
+ }
+
+ throw new Error("EXCEPTION");
+ }
+
+ function timer() {
+ if (ngx.worker_id != 0) {
+ return;
+ }
+
+ setTimeout(() => {ngx.shared.nums.set('timer', 1)}, 10);
+ }
+
+ function timer_exception() {
+ if (ngx.worker_id != 0) {
+ return;
+ }
+
+ setTimeout(() => {ngx.log(ngx.ERR, 'should not be seen')}, 10);
+ throw new Error("EXCEPTION");
+ }
+
+ function timeout_exception() {
+ if (ngx.worker_id != 0) {
+ return;
+ }
+
+ setTimeout(() => {
+ var v = ngx.shared.nums.get('timeout_exception') || 0;
+
+ if (v == 0) {
+ ngx.shared.nums.set('timeout_exception', 1);
+ throw new Error("EXCEPTION");
+ return;
+ }
+
+ ngx.shared.nums.incr('timeout_exception', 1);
+ }, 1);
+ }
+
+ function test_fetch(r) {
+ r.return(200, ngx.shared.strings.get('fetch').startsWith('okok'));
+ }
+
+ function test_file(r) {
+ r.return(200,
+ fs.readFileSync(ngx.conf_prefix + 'file').toString() == 'abc');
+ }
+
+ function test_multiple_fetches(r) {
+ r.return(200, ngx.shared.strings.get('multiple_fetches')
+ .startsWith('ok\@foo'));
+ }
+
+ function test_tick(r) {
+ r.return(200, ngx.shared.nums.get('tick') >= 3);
+ }
+
+ function test_timer(r) {
+ r.return(200, ngx.shared.nums.get('timer') == 1);
+ }
+
+ function test_timeout_exception(r) {
+ r.return(200, ngx.shared.nums.get('timeout_exception') >= 2);
+ }
+
+ export default { fetch, fetch_exception, file, multiple_fetches, overrun,
+ test_fetch, test_file, test_multiple_fetches, test_tick,
+ test_timeout_exception, test_timer, tick, tick_exception,
+ timer, timer_exception, timeout_exception };
+EOF
+
+$t->try_run('no js_periodic')->plan(7);
+
+###############################################################################
+
+select undef, undef, undef, 0.1;
+
+like(http_get('/test_tick'), qr/true/, '3x tick test');
+like(http_get('/test_timer'), qr/true/, 'timer test');
+like(http_get('/test_file'), qr/true/, 'file test');
+like(http_get('/test_fetch'), qr/true/, 'periodic fetch test');
+like(http_get('/test_multiple_fetches'), qr/true/, 'multiple fetch test');
+
+like(http_get('/test_timeout_exception'), qr/true/, 'timeout exception test');
+
+$t->stop();
+
+unlike($t->read_file('error.log'), qr/\[error\].*should not be seen/,
+ 'check for not discadred events');
--- /dev/null
+#!/usr/bin/perl
+
+# (C) Dmitry Volyntsev
+# (C) Nginx, Inc.
+
+# Tests for stream njs module, js_periodic directive.
+
+###############################################################################
+
+use warnings;
+use strict;
+
+use Test::More;
+use Socket qw/ CRLF /;
+
+BEGIN { use FindBin; chdir($FindBin::Bin); }
+
+use lib 'lib';
+use Test::Nginx;
+use Test::Nginx::Stream qw/ stream /;
+
+###############################################################################
+
+select STDERR; $| = 1;
+select STDOUT; $| = 1;
+
+my $t = Test::Nginx->new()->has(qw/http stream/)
+ ->write_file_expand('nginx.conf', <<'EOF');
+
+%%TEST_GLOBALS%%
+
+daemon off;
+
+events {
+}
+
+worker_shutdown_timeout 100ms;
+
+stream {
+ %%TEST_GLOBALS_STREAM%%
+
+ js_import test.js;
+
+ js_shared_dict_zone zone=nums:32k type=number;
+ js_shared_dict_zone zone=strings:32k;
+
+ server {
+ listen 127.0.0.1:8080;
+
+ js_periodic test.tick interval=30ms jitter=1ms;
+ js_periodic test.timer interval=1s;
+ js_periodic test.overrun interval=30ms;
+ js_periodic test.file interval=1s;
+ js_periodic test.fetch interval=40ms;
+ js_periodic test.multiple_fetches interval=1s;
+
+ js_periodic test.fetch_exception interval=1s;
+ js_periodic test.tick_exception interval=1s;
+ js_periodic test.timer_exception interval=1s;
+ js_periodic test.timeout_exception interval=30ms;
+
+ js_preread test.test;
+
+ proxy_pass 127.0.0.1:8090;
+ }
+}
+
+http {
+ %%TEST_GLOBALS_HTTP%%
+
+ server {
+ listen 127.0.0.1:8081;
+ server_name localhost;
+
+ location /fetch_ok {
+ return 200 'ok';
+ }
+
+ location /fetch_foo {
+ return 200 'foo';
+ }
+ }
+}
+
+EOF
+
+my $p1 = port(8081);
+
+$t->write_file('test.js', <<EOF);
+ import fs from 'fs';
+
+ async function fetch() {
+ if (ngx.worker_id != 0) {
+ return;
+ }
+
+ let reply = await ngx.fetch('http://127.0.0.1:$p1/fetch_ok');
+ let body = await reply.text();
+
+ let v = ngx.shared.strings.get('fetch') || '';
+ ngx.shared.strings.set('fetch', v + body);
+ }
+
+ async function fetch_exception() {
+ if (ngx.worker_id != 0) {
+ return;
+ }
+
+ let reply = await ngx.fetch('garbage');
+ }
+
+ async function multiple_fetches() {
+ if (ngx.worker_id != 0) {
+ return;
+ }
+
+ let reply = await ngx.fetch('http://127.0.0.1:$p1/fetch_ok');
+ let reply2 = await ngx.fetch('http://127.0.0.1:$p1/fetch_foo');
+ let body = await reply.text();
+ let body2 = await reply2.text();
+
+ ngx.shared.strings.set('multiple_fetches', body + '\@' + body2);
+ }
+
+ async function file() {
+ if (ngx.worker_id != 0) {
+ return;
+ }
+
+ let fh = await fs.promises.open(ngx.conf_prefix + 'file', 'a+');
+
+ await fh.write('abc');
+ await fh.close();
+ }
+
+ async function overrun() {
+ if (ngx.worker_id != 0) {
+ return;
+ }
+
+ setTimeout(() => {}, 100000);
+ }
+
+ function tick() {
+ if (ngx.worker_id != 0) {
+ return;
+ }
+
+ ngx.shared.nums.incr('tick', 1);
+ }
+
+ function tick_exception() {
+ if (ngx.worker_id != 0) {
+ return;
+ }
+
+ throw new Error("EXCEPTION");
+ }
+
+ function timer() {
+ if (ngx.worker_id != 0) {
+ return;
+ }
+
+ setTimeout(() => {ngx.shared.nums.set('timer', 1)}, 10);
+ }
+
+ function timer_exception() {
+ if (ngx.worker_id != 0) {
+ return;
+ }
+
+ setTimeout(() => {ngx.log(ngx.ERR, 'should not be seen')}, 10);
+ throw new Error("EXCEPTION");
+ }
+
+ function timeout_exception() {
+ if (ngx.worker_id != 0) {
+ return;
+ }
+
+ setTimeout(() => {
+ var v = ngx.shared.nums.get('timeout_exception') || 0;
+
+ if (v == 0) {
+ ngx.shared.nums.set('timeout_exception', 1);
+ throw new Error("EXCEPTION");
+ return;
+ }
+
+ ngx.shared.nums.incr('timeout_exception', 1);
+ }, 1);
+ }
+
+ function test(s) {
+ s.on('upload', function (data) {
+ if (data.length > 0) {
+ switch (data) {
+ case 'fetch':
+ if (ngx.shared.strings.get('fetch').startsWith('okok')) {
+ s.done();
+ return;
+ }
+
+ break;
+
+ case 'multiple_fetches':
+ if (ngx.shared.strings.get('multiple_fetches')
+ .startsWith('ok\@foo'))
+ {
+ s.done();
+ return;
+ }
+
+ break;
+
+ case 'file':
+ let file_data = fs.readFileSync(ngx.conf_prefix + 'file')
+ .toString();
+
+ if (file_data == 'abc') {
+ s.done();
+ return;
+ }
+
+ break;
+
+ case 'tick':
+ if (ngx.shared.nums.get('tick') >= 3) {
+ s.done();
+ return;
+ }
+
+ break;
+
+ case 'timeout_exception':
+ if (ngx.shared.nums.get('timeout_exception') >= 2) {
+ s.done();
+ return;
+ }
+
+ break;
+
+ case 'timer':
+ if (ngx.shared.nums.get('timer') == 1) {
+ s.done();
+ return;
+ }
+
+ break;
+
+ default:
+ throw new Error(`Unknown test "\${data}"`);
+ }
+
+ throw new Error(`Test "\${data}" failed`);
+ }
+ });
+ }
+
+ export default { fetch, fetch_exception, multiple_fetches, file, overrun,
+ test, tick, tick_exception, timer, timer_exception,
+ timeout_exception };
+EOF
+
+$t->run_daemon(\&stream_daemon, port(8090));
+$t->try_run('no js_periodic')->plan(7);
+$t->waitforsocket('127.0.0.1:' . port(8090));
+
+###############################################################################
+
+select undef, undef, undef, 0.1;
+
+is(stream('127.0.0.1:' . port(8080))->io('tick'), 'tick', '3x tick test');
+is(stream('127.0.0.1:' . port(8080))->io('timer'), 'timer', 'timer test');
+is(stream('127.0.0.1:' . port(8080))->io('file'), 'file', 'file test');
+is(stream('127.0.0.1:' . port(8080))->io('fetch'), 'fetch', 'fetch test');
+is(stream('127.0.0.1:' . port(8080))->io('multiple_fetches'),
+ 'multiple_fetches', 'muliple fetches test');
+is(stream('127.0.0.1:' . port(8080))->io('timeout_exception'),
+ 'timeout_exception', 'timeout exception test');
+
+$t->stop();
+
+unlike($t->read_file('error.log'), qr/\[error\].*should not be seen/,
+ 'check for not discadred events');
+
+###############################################################################
+
+sub stream_daemon {
+ my $server = IO::Socket::INET->new(
+ Proto => 'tcp',
+ LocalAddr => '127.0.0.1:' . port(8090),
+ Listen => 5,
+ Reuse => 1
+ )
+ or die "Can't create listening socket: $!\n";
+
+ local $SIG{PIPE} = 'IGNORE';
+
+ while (my $client = $server->accept()) {
+ $client->autoflush(1);
+
+ log2c("(new connection $client)");
+
+ $client->sysread(my $buffer, 65536) or next;
+
+ log2i("$client $buffer");
+
+ log2o("$client $buffer");
+
+ $client->syswrite($buffer);
+
+ close $client;
+ }
+}
+
+sub log2i { Test::Nginx::log_core('|| <<', @_); }
+sub log2o { Test::Nginx::log_core('|| >>', @_); }
+sub log2c { Test::Nginx::log_core('||', @_); }
+
+###############################################################################