]> git.kaiwu.me - njs.git/commitdiff
Modules: introduced js_periodic directive.
authorDmitry Volyntsev <xeioex@nginx.com>
Tue, 22 Aug 2023 18:13:09 +0000 (11:13 -0700)
committerDmitry Volyntsev <xeioex@nginx.com>
Tue, 22 Aug 2023 18:13:09 +0000 (11:13 -0700)
The directive specifies a JS handler to run at regular intervals. The JS
handler will be executed in each worker process. The handler receives no
arguments. It has access to ngx and other global objects.

example.conf:
    location @periodics {
        # Specifies a JS handler to be run at 1 minute intervals
        js_periodic main.handler interval=60s jitter=5s;

        resolver 10.0.0.1;
        js_fetch_trusted_certificate /path/to/ISRG_Root_X1.pem;
    }

example.js:
    async function handler() {
        if (ngx.worker_id != 0) {
            /* using ngx.worker_id to run handler only in one worker. */
            return;
        }

        let reply = async ngx.fetch('https://nginx.org/en/docs/njs/');
        let body = async reply.text();

        ngx.log(ngx.INFO, body);
    }

This closes #660 issue on Github.

nginx/ngx_http_js_module.c
nginx/ngx_js.h
nginx/ngx_stream_js_module.c
nginx/t/js_periodic.t [new file with mode: 0644]
nginx/t/stream_js_periodic.t [new file with mode: 0644]

index a4426f02965249f7a565c86bf01b93754f61955b..a1d54bdb37031dde2ba837367c03aabdc8c87708 100644 (file)
@@ -22,6 +22,27 @@ typedef struct {
 } ngx_http_js_loc_conf_t;
 
 
+typedef struct {
+    ngx_http_conf_ctx_t   *conf_ctx;
+    ngx_connection_t      *connection;
+    void                  *padding;
+
+    /**
+     * fd is used for event debug and should be at the same position
+     * as in ngx_connection_t: after a 3rd pointer.
+     */
+    ngx_socket_t           fd;
+
+    ngx_str_t              method;
+    ngx_msec_t             interval;
+    ngx_msec_t             jitter;
+
+    ngx_log_t              log;
+    ngx_http_log_ctx_t     log_ctx;
+    ngx_event_t            event;
+} ngx_js_periodic_t;
+
+
 #define NJS_HEADER_SEMICOLON   0x1
 #define NJS_HEADER_SINGLE      0x2
 #define NJS_HEADER_ARRAY       0x4
@@ -45,6 +66,8 @@ typedef struct {
     ngx_chain_t          **last_out;
     ngx_chain_t           *free;
     ngx_chain_t           *busy;
+
+    ngx_js_periodic_t     *periodic;
 } ngx_http_js_ctx_t;
 
 
@@ -88,7 +111,8 @@ static ngx_int_t ngx_http_js_variable_set(ngx_http_request_t *r,
     ngx_http_variable_value_t *v, uintptr_t data);
 static ngx_int_t ngx_http_js_variable_var(ngx_http_request_t *r,
     ngx_http_variable_value_t *v, uintptr_t data);
-static ngx_int_t ngx_http_js_init_vm(ngx_http_request_t *r);
+static ngx_int_t ngx_http_js_init_vm(ngx_http_request_t *r,
+    unsigned inject_request);
 static void ngx_http_js_cleanup_ctx(void *data);
 
 static njs_int_t ngx_http_js_ext_keys_header(njs_vm_t *vm, njs_value_t *value,
@@ -256,8 +280,18 @@ static size_t ngx_http_js_max_response_buffer_size(njs_vm_t *vm,
 static void ngx_http_js_handle_vm_event(ngx_http_request_t *r,
     njs_vm_event_t vm_event, njs_value_t *args, njs_uint_t nargs);
 
+static void ngx_http_js_periodic_handler(ngx_event_t *ev);
+static void ngx_http_js_periodic_write_event_handler(ngx_http_request_t *r);
+static void ngx_http_js_periodic_shutdown_handler(ngx_event_t *ev);
+static void ngx_http_js_periodic_finalize(ngx_http_request_t *r, ngx_int_t rc);
+static void ngx_http_js_periodic_destroy(ngx_http_request_t *r,
+    ngx_js_periodic_t *periodic);
+
 static njs_int_t ngx_js_http_init(njs_vm_t *vm);
 static ngx_int_t ngx_http_js_init(ngx_conf_t *cf);
+static ngx_int_t ngx_http_js_init_worker(ngx_cycle_t *cycle);
+static char *ngx_http_js_periodic(ngx_conf_t *cf, ngx_command_t *cmd,
+    void *conf);
 static char *ngx_http_js_set(ngx_conf_t *cf, ngx_command_t *cmd, void *conf);
 static char *ngx_http_js_var(ngx_conf_t *cf, ngx_command_t *cmd, void *conf);
 static char *ngx_http_js_content(ngx_conf_t *cf, ngx_command_t *cmd,
@@ -297,6 +331,13 @@ static ngx_command_t  ngx_http_js_commands[] = {
       0,
       NULL },
 
+    { ngx_string("js_periodic"),
+      NGX_HTTP_LOC_CONF|NGX_CONF_ANY,
+      ngx_http_js_periodic,
+      NGX_HTTP_LOC_CONF_OFFSET,
+      0,
+      NULL },
+
     { ngx_string("js_preload_object"),
       NGX_HTTP_MAIN_CONF|NGX_HTTP_SRV_CONF|NGX_HTTP_LOC_CONF|NGX_CONF_TAKE13,
       ngx_js_preload_object,
@@ -439,7 +480,7 @@ ngx_module_t  ngx_http_js_module = {
     NGX_HTTP_MODULE,               /* module type */
     NULL,                          /* init master */
     NULL,                          /* init module */
-    NULL,                          /* init process */
+    ngx_http_js_init_worker,       /* init process */
     NULL,                          /* init thread */
     NULL,                          /* exit thread */
     NULL,                          /* exit process */
@@ -863,7 +904,7 @@ ngx_http_js_content_event_handler(ngx_http_request_t *r)
     ngx_log_debug0(NGX_LOG_DEBUG_HTTP, r->connection->log, 0,
                    "http js content event handler");
 
-    rc = ngx_http_js_init_vm(r);
+    rc = ngx_http_js_init_vm(r, 1);
 
     if (rc == NGX_ERROR || rc == NGX_DECLINED) {
         ngx_http_finalize_request(r, NGX_HTTP_INTERNAL_SERVER_ERROR);
@@ -999,7 +1040,7 @@ ngx_http_js_header_filter(ngx_http_request_t *r)
         return ngx_http_next_header_filter(r);
     }
 
-    rc = ngx_http_js_init_vm(r);
+    rc = ngx_http_js_init_vm(r, 1);
 
     if (rc == NGX_ERROR || rc == NGX_DECLINED) {
         return NGX_ERROR;
@@ -1051,7 +1092,7 @@ ngx_http_js_body_filter(ngx_http_request_t *r, ngx_chain_t *in)
         return ngx_http_next_body_filter(r, in);
     }
 
-    rc = ngx_http_js_init_vm(r);
+    rc = ngx_http_js_init_vm(r, 1);
 
     if (rc == NGX_ERROR || rc == NGX_DECLINED) {
         return NGX_ERROR;
@@ -1165,7 +1206,7 @@ ngx_http_js_variable_set(ngx_http_request_t *r, ngx_http_variable_value_t *v,
     ngx_str_t           value;
     ngx_http_js_ctx_t  *ctx;
 
-    rc = ngx_http_js_init_vm(r);
+    rc = ngx_http_js_init_vm(r, 1);
 
     if (rc == NGX_ERROR) {
         return NGX_ERROR;
@@ -1239,7 +1280,7 @@ ngx_http_js_variable_var(ngx_http_request_t *r, ngx_http_variable_value_t *v,
 
 
 static ngx_int_t
-ngx_http_js_init_vm(ngx_http_request_t *r)
+ngx_http_js_init_vm(ngx_http_request_t *r, unsigned inject_request)
 {
     njs_int_t                rc;
     ngx_str_t                exception;
@@ -1318,10 +1359,12 @@ ngx_http_js_init_vm(ngx_http_request_t *r)
         return NGX_ERROR;
     }
 
-    rc = njs_vm_external_create(ctx->vm, njs_value_arg(&ctx->request),
-                                ngx_http_js_request_proto_id, r, 0);
-    if (rc != NJS_OK) {
-        return NGX_ERROR;
+    if (inject_request) {
+        rc = njs_vm_external_create(ctx->vm, njs_value_arg(&ctx->request),
+                                    ngx_http_js_request_proto_id, r, 0);
+        if (rc != NJS_OK) {
+            return NGX_ERROR;
+        }
     }
 
     return NGX_OK;
@@ -4048,6 +4091,221 @@ ngx_http_js_location(njs_vm_t *vm, ngx_http_request_t *r, unsigned flags,
 }
 
 
+static void
+ngx_http_js_periodic_handler(ngx_event_t *ev)
+{
+    ngx_int_t               rc;
+    ngx_msec_t              timer;
+    ngx_connection_t       *c;
+    ngx_js_periodic_t      *periodic;
+    ngx_http_js_ctx_t      *ctx;
+    ngx_http_request_t     *r;
+    ngx_http_connection_t   hc;
+
+    periodic = ev->data;
+
+    timer = periodic->interval;
+
+    if (periodic->jitter) {
+        timer += (ngx_msec_t) ngx_random() % periodic->jitter;
+    }
+
+    ngx_add_timer(&periodic->event, timer);
+
+    c = periodic->connection;
+
+    if (c != NULL) {
+        ngx_log_error(NGX_LOG_ERR, c->log, 0,
+                      "http js periodic \"%V\" is already running, killing "
+                      "previous instance", &periodic->method);
+
+        ngx_http_js_periodic_finalize(c->data, NGX_ERROR);
+    }
+
+    ngx_log_debug1(NGX_LOG_DEBUG_HTTP, &periodic->log, 0,
+                   "http js periodic handler: \"%V\"", &periodic->method);
+
+    c = ngx_get_connection(0, &periodic->log);
+
+    if (c == NULL) {
+        return;
+    }
+
+    ngx_memzero(&hc, sizeof(ngx_http_connection_t));
+
+    hc.conf_ctx = periodic->conf_ctx;
+
+    c->data = &hc;
+
+    r = ngx_http_create_request(c);
+
+    if (r == NULL) {
+        ngx_free_connection(c);
+        c->fd = (ngx_socket_t) -1;
+        return;
+    }
+
+    c->data = r;
+    c->destroyed = 0;
+    c->pool = r->pool;
+    c->read->handler = ngx_http_js_periodic_shutdown_handler;
+
+    periodic->connection = c;
+    periodic->log_ctx.request = r;
+    periodic->log_ctx.connection = c;
+
+    r->method = NGX_HTTP_GET;
+    r->method_name = ngx_http_core_get_method;
+
+    ngx_str_set(&r->uri, "/");
+    r->unparsed_uri = r->uri;
+    r->valid_unparsed_uri = 1;
+
+    r->health_check = 1;
+    r->write_event_handler = ngx_http_js_periodic_write_event_handler;
+
+    rc = ngx_http_js_init_vm(r, 0);
+
+    if (rc != NGX_OK) {
+        ngx_http_js_periodic_destroy(r, periodic);
+        return;
+    }
+
+    ctx = ngx_http_get_module_ctx(r, ngx_http_js_module);
+
+    ctx->periodic = periodic;
+
+    r->count++;
+
+    rc = ngx_js_invoke(ctx->vm, &periodic->method, &periodic->log, NULL, 0,
+                       &ctx->retval);
+
+    if (rc == NGX_AGAIN) {
+        rc = NGX_OK;
+    }
+
+    r->count--;
+
+    ngx_http_js_periodic_finalize(r, rc);
+}
+
+
+static void
+ngx_http_js_periodic_write_event_handler(ngx_http_request_t *r)
+{
+    ngx_http_js_ctx_t  *ctx;
+
+    ngx_log_debug0(NGX_LOG_DEBUG_HTTP, r->connection->log, 0,
+                   "http js periodic write event handler");
+
+    ctx = ngx_http_get_module_ctx(r, ngx_http_js_module);
+
+    if (!njs_vm_pending(ctx->vm)) {
+        ngx_http_js_periodic_finalize(r, NGX_OK);
+        return;
+    }
+}
+
+
+static void
+ngx_http_js_periodic_shutdown_handler(ngx_event_t *ev)
+{
+    ngx_connection_t  *c;
+
+    c = ev->data;
+
+    ngx_log_debug0(NGX_LOG_DEBUG_HTTP, c->log, 0,
+                   "http js periodic shutdown handler");
+
+    if (c->close) {
+        ngx_http_js_periodic_finalize(c->data, NGX_ERROR);
+        return;
+    }
+
+    ngx_log_error(NGX_LOG_ERR, c->log, 0, "http js periodic shutdown handler "
+                  "while not closing");
+}
+
+
+static void
+ngx_http_js_periodic_finalize(ngx_http_request_t *r, ngx_int_t rc)
+{
+    ngx_http_js_ctx_t  *ctx;
+
+    ctx = ngx_http_get_module_ctx(r, ngx_http_js_module);
+
+    ngx_log_debug4(NGX_LOG_DEBUG_HTTP, r->connection->log, 0,
+                   "http js periodic finalize: \"%V\" rc: %i c: %i pending: %i",
+                   &ctx->periodic->method, rc, r->count,
+                   njs_vm_pending(ctx->vm));
+
+    if (r->count > 1 || (rc == NGX_OK && njs_vm_pending(ctx->vm))) {
+        return;
+    }
+
+    ngx_http_js_periodic_destroy(r, ctx->periodic);
+}
+
+
+static void
+ngx_http_js_periodic_destroy(ngx_http_request_t *r, ngx_js_periodic_t *periodic)
+{
+    ngx_connection_t    *c;
+    ngx_http_cleanup_t  *cln;
+
+    c = r->connection;
+
+    ngx_log_debug1(NGX_LOG_DEBUG_HTTP, c->log, 0,
+                   "http js periodic destroy: \"%V\"",
+                   &periodic->method);
+
+    periodic->connection = NULL;
+
+    for (cln = r->cleanup; cln; cln = cln->next) {
+        if (cln->handler) {
+            cln->handler(cln->data);
+        }
+    }
+
+    ngx_free_connection(c);
+
+    c->fd = (ngx_socket_t) -1;
+    c->pool = NULL;
+    c->destroyed = 1;
+
+    ngx_destroy_pool(r->pool);
+}
+
+
+static ngx_int_t
+ngx_http_js_periodic_init(ngx_js_periodic_t *periodic)
+{
+    ngx_log_t                 *log;
+    ngx_msec_t                 jitter;
+    ngx_http_core_loc_conf_t  *clcf;
+
+    clcf = ngx_http_get_module_loc_conf(periodic->conf_ctx,
+                                        ngx_http_core_module);
+    log = clcf->error_log;
+
+    ngx_memcpy(&periodic->log, log, sizeof(ngx_log_t));
+
+    periodic->log.data = &periodic->log_ctx;
+    periodic->connection = NULL;
+
+    periodic->event.handler = ngx_http_js_periodic_handler;
+    periodic->event.data = periodic;
+    periodic->event.log = log;
+    periodic->event.cancelable = 1;
+
+    jitter = periodic->jitter ? (ngx_msec_t) ngx_random() % periodic->jitter
+                              : 0;
+    ngx_add_timer(&periodic->event, jitter + 1);
+
+    return NGX_OK;
+}
+
+
 static njs_host_event_t
 ngx_http_js_set_timer(njs_external_ptr_t external, uint64_t delay,
     njs_vm_event_t vm_event)
@@ -4193,6 +4451,11 @@ ngx_http_js_handle_vm_event(ngx_http_request_t *r, njs_vm_event_t vm_event,
         ngx_log_error(NGX_LOG_ERR, r->connection->log, 0,
                       "js exception: %V", &exception);
 
+        if (r->health_check) {
+            ngx_http_js_periodic_finalize(r, NGX_ERROR);
+            return;
+        }
+
         ngx_http_finalize_request(r, NGX_ERROR);
         return;
     }
@@ -4255,6 +4518,119 @@ ngx_http_js_init(ngx_conf_t *cf)
 }
 
 
+static ngx_int_t
+ngx_http_js_init_worker(ngx_cycle_t *cycle)
+{
+    ngx_uint_t           i;
+    ngx_js_periodic_t   *periodics;
+    ngx_js_main_conf_t  *jmcf;
+
+    if ((ngx_process != NGX_PROCESS_WORKER)
+        && ngx_process != NGX_PROCESS_SINGLE)
+    {
+        return NGX_OK;
+    }
+
+    jmcf = ngx_http_cycle_get_module_main_conf(cycle, ngx_http_js_module);
+
+    if (jmcf == NULL || jmcf->periodics == NULL) {
+        return NGX_OK;
+    }
+
+    periodics = jmcf->periodics->elts;
+
+    for (i = 0; i < jmcf->periodics->nelts; i++) {
+        periodics[i].fd = 1000000 + i;
+
+        if (ngx_http_js_periodic_init(&periodics[i]) != NGX_OK) {
+            return NGX_ERROR;
+        }
+    }
+
+    return NGX_OK;
+}
+
+
+static char *
+ngx_http_js_periodic(ngx_conf_t *cf, ngx_command_t *cmd, void *conf)
+{
+    ngx_str_t           *value, s;
+    ngx_msec_t           interval, jitter;
+    ngx_uint_t           i;
+    ngx_js_periodic_t   *periodic;
+    ngx_js_main_conf_t  *jmcf;
+
+    if (cf->args->nelts < 2) {
+        ngx_conf_log_error(NGX_LOG_EMERG, cf, 0, "method name is required");
+        return NGX_CONF_ERROR;
+    }
+
+    jmcf = ngx_http_conf_get_module_main_conf(cf, ngx_http_js_module);
+
+    if (jmcf->periodics == NULL) {
+        jmcf->periodics = ngx_array_create(cf->pool, 1,
+                                           sizeof(ngx_js_periodic_t));
+        if (jmcf->periodics == NULL) {
+            return NGX_CONF_ERROR;
+        }
+    }
+
+    periodic = ngx_array_push(jmcf->periodics);
+    if (periodic == NULL) {
+        return NGX_CONF_ERROR;
+    }
+
+    ngx_memzero(periodic, sizeof(ngx_js_periodic_t));
+
+    jitter = 0;
+    interval = 5000;
+
+    value = cf->args->elts;
+
+    for (i = 2; i < cf->args->nelts; i++) {
+
+        if (ngx_strncmp(value[i].data, "interval=", 9) == 0) {
+            s.len = value[i].len - 9;
+            s.data = value[i].data + 9;
+
+            interval = ngx_parse_time(&s, 0);
+
+            if (interval == (ngx_msec_t) NGX_ERROR || interval == 0) {
+                goto invalid;
+            }
+
+            continue;
+        }
+
+        if (ngx_strncmp(value[i].data, "jitter=", 7) == 0) {
+            s.len = value[i].len - 7;
+            s.data = value[i].data + 7;
+
+            jitter = ngx_parse_time(&s, 0);
+
+            if (jitter == (ngx_msec_t) NGX_ERROR) {
+                goto invalid;
+            }
+
+            continue;
+        }
+
+invalid:
+
+        ngx_conf_log_error(NGX_LOG_EMERG, cf, 0,
+                           "invalid parameter \"%V\"", &value[i]);
+        return NGX_CONF_ERROR;
+    }
+
+    periodic->method = value[1];
+    periodic->interval = interval;
+    periodic->jitter = jitter;
+    periodic->conf_ctx = cf->ctx;
+
+    return NGX_CONF_OK;
+}
+
+
 static char *
 ngx_http_js_set(ngx_conf_t *cf, ngx_command_t *cmd, void *conf)
 {
@@ -4429,6 +4805,7 @@ ngx_http_js_create_main_conf(ngx_conf_t *cf)
      * set by ngx_pcalloc():
      *
      *     jmcf->dicts = NULL;
+     *     jmcf->periodics = NULL;
      */
 
     return jmcf;
index 0febe83647d6f561a1b411e182b8902f89a91108..f50b7fb31952ae4b4f0c49a8712ecb0b31362528 100644 (file)
@@ -56,7 +56,8 @@ typedef struct {
 
 
 #define NGX_JS_COMMON_MAIN_CONF                                               \
-    ngx_js_dict_t         *dicts                                              \
+    ngx_js_dict_t         *dicts;                                             \
+    ngx_array_t           *periodics                                          \
 
 
 #define _NGX_JS_COMMON_LOC_CONF                                               \
index b9bba7d6f2f35aca5ed00b6b69be91a4d8fe1448..ab66de9df57a9b05c587eec65d714796157d0a81 100644 (file)
@@ -27,6 +27,26 @@ typedef struct {
 } ngx_stream_js_ev_t;
 
 
+typedef struct {
+    ngx_stream_conf_ctx_t  *conf_ctx;
+    ngx_connection_t       *connection;
+    void                   *padding;
+
+    /**
+     * fd is used for event debug and should be at the same position
+     * as in ngx_connection_t: after a 3rd pointer.
+     */
+    ngx_socket_t            fd;
+
+    ngx_str_t               method;
+    ngx_msec_t              interval;
+    ngx_msec_t              jitter;
+
+    ngx_log_t               log;
+    ngx_event_t             event;
+} ngx_js_periodic_t;
+
+
 typedef struct {
     njs_vm_t               *vm;
     njs_opaque_value_t      retval;
@@ -43,6 +63,7 @@ typedef struct {
     ngx_stream_js_ev_t      events[2];
     unsigned                filter:1;
     unsigned                in_progress:1;
+    ngx_js_periodic_t      *periodic;
 } ngx_stream_js_ctx_t;
 
 
@@ -66,7 +87,8 @@ static ngx_int_t ngx_stream_js_variable_set(ngx_stream_session_t *s,
     ngx_stream_variable_value_t *v, uintptr_t data);
 static ngx_int_t ngx_stream_js_variable_var(ngx_stream_session_t *s,
     ngx_stream_variable_value_t *v, uintptr_t data);
-static ngx_int_t ngx_stream_js_init_vm(ngx_stream_session_t *s);
+static ngx_int_t ngx_stream_js_init_vm(ngx_stream_session_t *s,
+    unsigned inject_session);
 static void ngx_stream_js_drop_events(ngx_stream_js_ctx_t *ctx);
 static void ngx_stream_js_cleanup(void *data);
 static njs_int_t ngx_stream_js_run_event(ngx_stream_session_t *s,
@@ -114,8 +136,18 @@ static size_t ngx_stream_js_max_response_buffer_size(njs_vm_t *vm,
 static void ngx_stream_js_handle_event(ngx_stream_session_t *s,
     njs_vm_event_t vm_event, njs_value_t *args, njs_uint_t nargs);
 
+static void ngx_stream_js_periodic_handler(ngx_event_t *ev);
+static void ngx_stream_js_periodic_event_handler(ngx_event_t *ev);
+static void ngx_stream_js_periodic_finalize(ngx_stream_session_t *s,
+    ngx_int_t rc);
+static void ngx_stream_js_periodic_destroy(ngx_stream_session_t *s,
+    ngx_js_periodic_t *periodic);
+
 static njs_int_t ngx_js_stream_init(njs_vm_t *vm);
 static ngx_int_t ngx_stream_js_init(ngx_conf_t *cf);
+static ngx_int_t ngx_stream_js_init_worker(ngx_cycle_t *cycle);
+static char *ngx_stream_js_periodic(ngx_conf_t *cf, ngx_command_t *cmd,
+    void *conf);
 static char *ngx_stream_js_set(ngx_conf_t *cf, ngx_command_t *cmd,
     void *conf);
 static char *ngx_stream_js_var(ngx_conf_t *cf, ngx_command_t *cmd,
@@ -154,6 +186,13 @@ static ngx_command_t  ngx_stream_js_commands[] = {
       0,
       NULL },
 
+    { ngx_string("js_periodic"),
+      NGX_STREAM_SRV_CONF|NGX_CONF_ANY,
+      ngx_stream_js_periodic,
+      NGX_STREAM_SRV_CONF_OFFSET,
+      0,
+      NULL },
+
     { ngx_string("js_preload_object"),
       NGX_STREAM_MAIN_CONF|NGX_STREAM_SRV_CONF|NGX_CONF_TAKE13,
       ngx_js_preload_object,
@@ -293,7 +332,7 @@ ngx_module_t  ngx_stream_js_module = {
     NGX_STREAM_MODULE,              /* module type */
     NULL,                           /* init master */
     NULL,                           /* init module */
-    NULL,                           /* init process */
+    ngx_stream_js_init_worker,      /* init process */
     NULL,                           /* init thread */
     NULL,                           /* exit thread */
     NULL,                           /* exit process */
@@ -647,7 +686,7 @@ ngx_stream_js_phase_handler(ngx_stream_session_t *s, ngx_str_t *name)
         return NGX_DECLINED;
     }
 
-    rc = ngx_stream_js_init_vm(s);
+    rc = ngx_stream_js_init_vm(s, 1);
     if (rc != NGX_OK) {
         return rc;
     }
@@ -728,7 +767,7 @@ ngx_stream_js_body_filter(ngx_stream_session_t *s, ngx_chain_t *in,
     ngx_log_debug1(NGX_LOG_DEBUG_STREAM, c->log, 0, "stream js filter u:%ui",
                    from_upstream);
 
-    rc = ngx_stream_js_init_vm(s);
+    rc = ngx_stream_js_init_vm(s, 1);
 
     if (rc == NGX_ERROR) {
         return NGX_ERROR;
@@ -836,7 +875,7 @@ ngx_stream_js_variable_set(ngx_stream_session_t *s,
     ngx_str_t             value;
     ngx_stream_js_ctx_t  *ctx;
 
-    rc = ngx_stream_js_init_vm(s);
+    rc = ngx_stream_js_init_vm(s, 1);
 
     if (rc == NGX_ERROR) {
         return NGX_ERROR;
@@ -910,7 +949,7 @@ ngx_stream_js_variable_var(ngx_stream_session_t *s,
 
 
 static ngx_int_t
-ngx_stream_js_init_vm(ngx_stream_session_t *s)
+ngx_stream_js_init_vm(ngx_stream_session_t *s, unsigned inject_session)
 {
     njs_int_t                  rc;
     njs_str_t                  key;
@@ -987,10 +1026,12 @@ ngx_stream_js_init_vm(ngx_stream_session_t *s)
         return NGX_ERROR;
     }
 
-    rc = njs_vm_external_create(ctx->vm, njs_value_arg(&ctx->args[0]),
-                                ngx_stream_js_session_proto_id, s, 0);
-    if (rc != NJS_OK) {
-        return NGX_ERROR;
+    if (inject_session) {
+        rc = njs_vm_external_create(ctx->vm, njs_value_arg(&ctx->args[0]),
+                                    ngx_stream_js_session_proto_id, s, 0);
+        if (rc != NJS_OK) {
+            return NGX_ERROR;
+        }
     }
 
     return NGX_OK;
@@ -1695,12 +1736,21 @@ ngx_stream_js_handle_event(ngx_stream_session_t *s, njs_vm_event_t vm_event,
 
     rc = njs_vm_run(ctx->vm);
 
+    ngx_log_debug2(NGX_LOG_DEBUG_STREAM, s->connection->log, 0,
+                   "stream js post event handler rc: %i event: %p",
+                   (ngx_int_t) rc, vm_event);
+
     if (rc == NJS_ERROR) {
         ngx_js_retval(ctx->vm, NULL, &exception);
 
         ngx_log_error(NGX_LOG_ERR, s->connection->log, 0,
                       "js exception: %V", &exception);
 
+        if (s->health_check) {
+            ngx_stream_js_periodic_finalize(s, NGX_ERROR);
+            return;
+        }
+
         ngx_stream_finalize_session(s, NGX_STREAM_INTERNAL_SERVER_ERROR);
     }
 
@@ -1754,6 +1804,337 @@ ngx_stream_js_init_conf_vm(ngx_conf_t *cf, ngx_js_loc_conf_t *conf)
 }
 
 
+static void
+ngx_stream_js_periodic_handler(ngx_event_t *ev)
+{
+    ngx_int_t                     rc;
+    ngx_msec_t                    timer;
+    ngx_js_periodic_t            *periodic;
+    ngx_connection_t             *c;
+    ngx_stream_js_ctx_t          *ctx;
+    ngx_stream_session_t         *s;
+    ngx_stream_core_main_conf_t  *cmcf;
+
+    periodic = ev->data;
+
+    timer = periodic->interval;
+
+    if (periodic->jitter) {
+        timer += (ngx_msec_t) ngx_random() % periodic->jitter;
+    }
+
+    ngx_add_timer(&periodic->event, timer);
+
+    c = periodic->connection;
+
+    if (c != NULL) {
+        ngx_log_error(NGX_LOG_ERR, c->log, 0,
+                      "js periodic \"%V\" is already running, killing previous "
+                      "instance", &periodic->method);
+
+        ngx_stream_js_periodic_finalize(c->data, NGX_ERROR);
+    }
+
+    ngx_log_debug1(NGX_LOG_DEBUG_STREAM, &periodic->log, 0,
+                   "stream js periodic handler: \"%V\"", &periodic->method);
+
+    c = ngx_get_connection(0, &periodic->log);
+
+    if (c == NULL) {
+        return;
+    }
+
+    c->pool = ngx_create_pool(1024, c->log);
+    if (c->pool == NULL) {
+        goto free_connection;
+    }
+
+    s = ngx_pcalloc(c->pool, sizeof(ngx_stream_session_t));
+    if (s == NULL) {
+        goto free_pool;
+    }
+
+    s->main_conf = periodic->conf_ctx->main_conf;
+    s->srv_conf = periodic->conf_ctx->srv_conf;
+
+    s->ctx = ngx_pcalloc(c->pool, sizeof(void *) * ngx_stream_max_module);
+    if (s->ctx == NULL) {
+        goto free_pool;
+    }
+
+    cmcf = ngx_stream_get_module_main_conf(s, ngx_stream_core_module);
+
+    s->variables = ngx_pcalloc(c->pool, cmcf->variables.nelts
+                                        * sizeof(ngx_stream_variable_value_t));
+    if (s->variables == NULL) {
+        goto free_pool;
+    }
+
+    c->data = s;
+    c->destroyed = 0;
+    c->read->log = &periodic->log;
+    c->read->handler = ngx_stream_js_periodic_event_handler;
+
+    s->received = 1;
+    s->connection = c;
+    s->signature = NGX_STREAM_MODULE;
+
+    s->health_check = 1;
+
+    rc = ngx_stream_js_init_vm(s, 0);
+
+    if (rc != NGX_OK) {
+        ngx_stream_js_periodic_destroy(s, periodic);
+        return;
+    }
+
+    periodic->connection = c;
+
+    ctx = ngx_stream_get_module_ctx(s, ngx_stream_js_module);
+
+    ctx->periodic = periodic;
+
+    s->received++;
+
+    rc = ngx_js_invoke(ctx->vm, &periodic->method, &periodic->log, NULL, 0,
+                       &ctx->retval);
+
+    if (rc == NGX_AGAIN) {
+        rc = NGX_OK;
+    }
+
+    s->received--;
+
+    ngx_stream_js_periodic_finalize(s, rc);
+
+    return;
+
+free_pool:
+
+    ngx_destroy_pool(c->pool);
+
+free_connection:
+
+    ngx_close_connection(c);
+}
+
+
+static void
+ngx_stream_js_periodic_event_handler(ngx_event_t *ev)
+{
+    ngx_connection_t      *c;
+    ngx_stream_js_ctx_t   *ctx;
+    ngx_stream_session_t  *s;
+
+    c = ev->data;
+
+    ngx_log_debug0(NGX_LOG_DEBUG_STREAM, c->log, 0,
+                   "stream js periodic event handler");
+
+    if (c->close) {
+        ngx_stream_js_periodic_finalize(c->data, NGX_ERROR);
+        return;
+    }
+
+    s = c->data;
+
+    ctx = ngx_stream_get_module_ctx(s, ngx_stream_js_module);
+
+    if (!njs_vm_pending(ctx->vm)) {
+        ngx_stream_js_periodic_finalize(s, NGX_OK);
+        return;
+    }
+}
+
+
+static void
+ngx_stream_js_periodic_finalize(ngx_stream_session_t *s, ngx_int_t rc)
+{
+    ngx_stream_js_ctx_t  *ctx;
+
+    ctx = ngx_stream_get_module_ctx(s, ngx_stream_js_module);
+
+    ngx_log_debug4(NGX_LOG_DEBUG_STREAM, s->connection->log, 0,
+                   "stream js periodic finalize: \"%V\" rc: %i c: %i "
+                   "pending: %i", &ctx->periodic->method, rc, s->received,
+                   njs_vm_pending(ctx->vm));
+
+    if (s->received > 1 || (rc == NGX_OK && njs_vm_pending(ctx->vm))) {
+        return;
+    }
+
+    ngx_stream_js_periodic_destroy(s, ctx->periodic);
+}
+
+
+static void
+ngx_stream_js_periodic_destroy(ngx_stream_session_t *s,
+    ngx_js_periodic_t *periodic)
+{
+    ngx_connection_t  *c;
+
+    c = s->connection;
+
+    ngx_log_debug1(NGX_LOG_DEBUG_STREAM, c->log, 0,
+                   "stream js periodic request destroy: \"%V\"",
+                   &periodic->method);
+
+    periodic->connection = NULL;
+
+    ngx_free_connection(c);
+
+    ngx_destroy_pool(c->pool);
+
+    c->fd = (ngx_socket_t) -1;
+    c->pool = NULL;
+    c->destroyed = 1;
+
+    if (c->read->posted) {
+        ngx_delete_posted_event(c->read);
+    }
+}
+
+
+static ngx_int_t
+ngx_stream_js_periodic_init(ngx_js_periodic_t *periodic)
+{
+    ngx_log_t                   *log;
+    ngx_msec_t                   jitter;
+    ngx_stream_core_srv_conf_t  *cscf;
+
+    cscf = ngx_stream_get_module_srv_conf(periodic->conf_ctx,
+                                          ngx_stream_core_module);
+    log = cscf->error_log;
+
+    ngx_memcpy(&periodic->log, log, sizeof(ngx_log_t));
+
+    periodic->connection = NULL;
+
+    periodic->event.handler = ngx_stream_js_periodic_handler;
+    periodic->event.data = periodic;
+    periodic->event.log = log;
+    periodic->event.cancelable = 1;
+
+    jitter = periodic->jitter ? (ngx_msec_t) ngx_random() % periodic->jitter
+                              : 0;
+    ngx_add_timer(&periodic->event, jitter + 1);
+
+    return NGX_OK;
+}
+
+
+static ngx_int_t
+ngx_stream_js_init_worker(ngx_cycle_t *cycle)
+{
+    ngx_uint_t           i;
+    ngx_js_periodic_t   *periodics;
+    ngx_js_main_conf_t  *jmcf;
+
+    if ((ngx_process != NGX_PROCESS_WORKER)
+        && ngx_process != NGX_PROCESS_SINGLE)
+    {
+        return NGX_OK;
+    }
+
+    jmcf = ngx_stream_cycle_get_module_main_conf(cycle, ngx_stream_js_module);
+
+    if (jmcf == NULL || jmcf->periodics == NULL) {
+        return NGX_OK;
+    }
+
+    periodics = jmcf->periodics->elts;
+
+    for (i = 0; i < jmcf->periodics->nelts; i++) {
+        periodics[i].fd = 1000000 + i;
+
+        if (ngx_stream_js_periodic_init(&periodics[i]) != NGX_OK) {
+            return NGX_ERROR;
+        }
+    }
+
+    return NGX_OK;
+}
+
+
+static char *
+ngx_stream_js_periodic(ngx_conf_t *cf, ngx_command_t *cmd, void *conf)
+{
+    ngx_str_t           *value, s;
+    ngx_msec_t           interval, jitter;
+    ngx_uint_t           i;
+    ngx_js_periodic_t   *periodic;
+    ngx_js_main_conf_t  *jmcf;
+
+    if (cf->args->nelts < 2) {
+        ngx_conf_log_error(NGX_LOG_EMERG, cf, 0, "method name is required");
+        return NGX_CONF_ERROR;
+    }
+
+    jmcf = ngx_stream_conf_get_module_main_conf(cf, ngx_stream_js_module);
+
+    if (jmcf->periodics == NULL) {
+        jmcf->periodics = ngx_array_create(cf->pool, 1,
+                                           sizeof(ngx_js_periodic_t));
+        if (jmcf->periodics == NULL) {
+            return NGX_CONF_ERROR;
+        }
+    }
+
+    periodic = ngx_array_push(jmcf->periodics);
+    if (periodic == NULL) {
+        return NGX_CONF_ERROR;
+    }
+
+    ngx_memzero(periodic, sizeof(ngx_js_periodic_t));
+
+    jitter = 0;
+    interval = 5000;
+
+    value = cf->args->elts;
+
+    for (i = 2; i < cf->args->nelts; i++) {
+
+        if (ngx_strncmp(value[i].data, "interval=", 9) == 0) {
+            s.len = value[i].len - 9;
+            s.data = value[i].data + 9;
+
+            interval = ngx_parse_time(&s, 0);
+
+            if (interval == (ngx_msec_t) NGX_ERROR || interval == 0) {
+                goto invalid;
+            }
+
+            continue;
+        }
+
+        if (ngx_strncmp(value[i].data, "jitter=", 7) == 0) {
+            s.len = value[i].len - 7;
+            s.data = value[i].data + 7;
+
+            jitter = ngx_parse_time(&s, 0);
+
+            if (jitter == (ngx_msec_t) NGX_ERROR) {
+                goto invalid;
+            }
+
+            continue;
+        }
+
+invalid:
+
+        ngx_conf_log_error(NGX_LOG_EMERG, cf, 0,
+                           "invalid parameter \"%V\"", &value[i]);
+        return NGX_CONF_ERROR;
+    }
+
+    periodic->method = value[1];
+    periodic->interval = interval;
+    periodic->jitter = jitter;
+    periodic->conf_ctx = cf->ctx;
+
+    return NGX_CONF_OK;
+}
+
 static char *
 ngx_stream_js_set(ngx_conf_t *cf, ngx_command_t *cmd, void *conf)
 {
@@ -1860,6 +2241,7 @@ ngx_stream_js_create_main_conf(ngx_conf_t *cf)
      * set by ngx_pcalloc():
      *
      *     jmcf->dicts = NULL;
+     *     jmcf->periodics = NULL;
      */
 
     return jmcf;
diff --git a/nginx/t/js_periodic.t b/nginx/t/js_periodic.t
new file mode 100644 (file)
index 0000000..a8ee860
--- /dev/null
@@ -0,0 +1,258 @@
+#!/usr/bin/perl
+
+# (C) Dmitry Volyntsev
+# (C) Nginx, Inc.
+
+# Tests for js_periodic directive.
+
+###############################################################################
+
+use warnings;
+use strict;
+
+use Test::More;
+use Socket qw/ CRLF /;
+
+BEGIN { use FindBin; chdir($FindBin::Bin); }
+
+use lib 'lib';
+use Test::Nginx;
+
+###############################################################################
+
+select STDERR; $| = 1;
+select STDOUT; $| = 1;
+
+my $t = Test::Nginx->new()->has(qw/http/)
+       ->write_file_expand('nginx.conf', <<'EOF');
+
+%%TEST_GLOBALS%%
+
+daemon off;
+
+events {
+}
+
+worker_shutdown_timeout 100ms;
+
+http {
+    %%TEST_GLOBALS_HTTP%%
+
+    js_import test.js;
+
+    js_shared_dict_zone zone=nums:32k type=number;
+    js_shared_dict_zone zone=strings:32k;
+
+    server {
+        listen       127.0.0.1:8080;
+        server_name  localhost;
+
+        location @periodic {
+            js_periodic test.tick interval=30ms jitter=1ms;
+            js_periodic test.timer interval=1s;
+            js_periodic test.overrun interval=30ms;
+            js_periodic test.file interval=1s;
+            js_periodic test.fetch interval=40ms;
+            js_periodic test.multiple_fetches interval=1s;
+
+            js_periodic test.fetch_exception interval=1s;
+            js_periodic test.tick_exception interval=1s;
+            js_periodic test.timer_exception interval=1s;
+            js_periodic test.timeout_exception interval=30ms;
+        }
+
+        location /fetch_ok {
+            return 200 'ok';
+        }
+
+        location /fetch_foo {
+            return 200 'foo';
+        }
+
+        location /test_fetch {
+            js_content test.test_fetch;
+        }
+
+        location /test_file {
+            js_content test.test_file;
+        }
+
+        location /test_multiple_fetches {
+            js_content test.test_multiple_fetches;
+        }
+
+        location /test_tick {
+            js_content test.test_tick;
+        }
+
+        location /test_timer {
+            js_content test.test_timer;
+        }
+
+        location /test_timeout_exception {
+            js_content test.test_timeout_exception;
+        }
+    }
+}
+
+EOF
+
+my $p0 = port(8080);
+
+$t->write_file('test.js', <<EOF);
+    import fs from 'fs';
+
+    async function fetch() {
+        if (ngx.worker_id != 0) {
+            return;
+        }
+
+        let reply = await ngx.fetch('http://127.0.0.1:$p0/fetch_ok');
+        let body = await reply.text();
+
+        let v = ngx.shared.strings.get('fetch') || '';
+        ngx.shared.strings.set('fetch', v + body);
+    }
+
+    async function multiple_fetches() {
+        if (ngx.worker_id != 0) {
+            return;
+        }
+
+        let reply = await ngx.fetch('http://127.0.0.1:$p0/fetch_ok');
+        let reply2 = await ngx.fetch('http://127.0.0.1:$p0/fetch_foo');
+        let body = await reply.text();
+        let body2 = await reply2.text();
+
+        ngx.shared.strings.set('multiple_fetches', body + '\@' + body2);
+    }
+
+    async function fetch_exception() {
+        if (ngx.worker_id != 0) {
+            return;
+        }
+
+        let reply = await ngx.fetch('garbage');
+     }
+
+    async function file() {
+        if (ngx.worker_id != 0) {
+            return;
+        }
+
+        let fh = await fs.promises.open(ngx.conf_prefix + 'file', 'a+');
+
+        await fh.write('abc');
+        await fh.close();
+    }
+
+    async function overrun() {
+        if (ngx.worker_id != 0) {
+            return;
+        }
+
+        setTimeout(() => {}, 100000);
+    }
+
+
+    function tick() {
+        if (ngx.worker_id != 0) {
+            return;
+        }
+
+        ngx.shared.nums.incr('tick', 1);
+    }
+
+    function tick_exception() {
+        if (ngx.worker_id != 0) {
+            return;
+        }
+
+        throw new Error("EXCEPTION");
+    }
+
+    function timer() {
+        if (ngx.worker_id != 0) {
+            return;
+        }
+
+        setTimeout(() => {ngx.shared.nums.set('timer', 1)}, 10);
+    }
+
+    function timer_exception() {
+        if (ngx.worker_id != 0) {
+            return;
+        }
+
+        setTimeout(() => {ngx.log(ngx.ERR, 'should not be seen')}, 10);
+        throw new Error("EXCEPTION");
+    }
+
+    function timeout_exception() {
+        if (ngx.worker_id != 0) {
+            return;
+        }
+
+        setTimeout(() => {
+            var v = ngx.shared.nums.get('timeout_exception') || 0;
+
+            if (v == 0) {
+                ngx.shared.nums.set('timeout_exception', 1);
+                throw new Error("EXCEPTION");
+                return;
+            }
+
+            ngx.shared.nums.incr('timeout_exception', 1);
+        }, 1);
+    }
+
+    function test_fetch(r) {
+        r.return(200, ngx.shared.strings.get('fetch').startsWith('okok'));
+    }
+
+    function test_file(r) {
+        r.return(200,
+             fs.readFileSync(ngx.conf_prefix + 'file').toString()  == 'abc');
+    }
+
+    function test_multiple_fetches(r) {
+        r.return(200, ngx.shared.strings.get('multiple_fetches')
+                                                        .startsWith('ok\@foo'));
+    }
+
+    function test_tick(r) {
+        r.return(200, ngx.shared.nums.get('tick') >= 3);
+    }
+
+    function test_timer(r) {
+        r.return(200, ngx.shared.nums.get('timer') == 1);
+    }
+
+    function test_timeout_exception(r) {
+        r.return(200, ngx.shared.nums.get('timeout_exception') >= 2);
+    }
+
+    export default { fetch, fetch_exception, file, multiple_fetches, overrun,
+                     test_fetch, test_file, test_multiple_fetches, test_tick,
+                     test_timeout_exception, test_timer, tick, tick_exception,
+                     timer, timer_exception, timeout_exception };
+EOF
+
+$t->try_run('no js_periodic')->plan(7);
+
+###############################################################################
+
+select undef, undef, undef, 0.1;
+
+like(http_get('/test_tick'), qr/true/, '3x tick test');
+like(http_get('/test_timer'), qr/true/, 'timer test');
+like(http_get('/test_file'), qr/true/, 'file test');
+like(http_get('/test_fetch'), qr/true/, 'periodic fetch test');
+like(http_get('/test_multiple_fetches'), qr/true/, 'multiple fetch test');
+
+like(http_get('/test_timeout_exception'), qr/true/, 'timeout exception test');
+
+$t->stop();
+
+unlike($t->read_file('error.log'), qr/\[error\].*should not be seen/,
+       'check for not discadred events');
diff --git a/nginx/t/stream_js_periodic.t b/nginx/t/stream_js_periodic.t
new file mode 100644 (file)
index 0000000..0a367a2
--- /dev/null
@@ -0,0 +1,322 @@
+#!/usr/bin/perl
+
+# (C) Dmitry Volyntsev
+# (C) Nginx, Inc.
+
+# Tests for stream njs module, js_periodic directive.
+
+###############################################################################
+
+use warnings;
+use strict;
+
+use Test::More;
+use Socket qw/ CRLF /;
+
+BEGIN { use FindBin; chdir($FindBin::Bin); }
+
+use lib 'lib';
+use Test::Nginx;
+use Test::Nginx::Stream qw/ stream /;
+
+###############################################################################
+
+select STDERR; $| = 1;
+select STDOUT; $| = 1;
+
+my $t = Test::Nginx->new()->has(qw/http stream/)
+       ->write_file_expand('nginx.conf', <<'EOF');
+
+%%TEST_GLOBALS%%
+
+daemon off;
+
+events {
+}
+
+worker_shutdown_timeout 100ms;
+
+stream {
+    %%TEST_GLOBALS_STREAM%%
+
+    js_import test.js;
+
+    js_shared_dict_zone zone=nums:32k type=number;
+    js_shared_dict_zone zone=strings:32k;
+
+    server {
+        listen       127.0.0.1:8080;
+
+        js_periodic test.tick interval=30ms jitter=1ms;
+        js_periodic test.timer interval=1s;
+        js_periodic test.overrun interval=30ms;
+        js_periodic test.file interval=1s;
+        js_periodic test.fetch interval=40ms;
+        js_periodic test.multiple_fetches interval=1s;
+
+        js_periodic test.fetch_exception interval=1s;
+        js_periodic test.tick_exception interval=1s;
+        js_periodic test.timer_exception interval=1s;
+        js_periodic test.timeout_exception interval=30ms;
+
+        js_preread test.test;
+
+        proxy_pass 127.0.0.1:8090;
+    }
+}
+
+http {
+    %%TEST_GLOBALS_HTTP%%
+
+    server {
+        listen       127.0.0.1:8081;
+        server_name  localhost;
+
+        location /fetch_ok {
+            return 200 'ok';
+        }
+
+        location /fetch_foo {
+            return 200 'foo';
+        }
+    }
+}
+
+EOF
+
+my $p1 = port(8081);
+
+$t->write_file('test.js', <<EOF);
+    import fs from 'fs';
+
+    async function fetch() {
+        if (ngx.worker_id != 0) {
+            return;
+        }
+
+        let reply = await ngx.fetch('http://127.0.0.1:$p1/fetch_ok');
+        let body = await reply.text();
+
+        let v = ngx.shared.strings.get('fetch') || '';
+        ngx.shared.strings.set('fetch', v + body);
+    }
+
+    async function fetch_exception() {
+        if (ngx.worker_id != 0) {
+            return;
+        }
+
+        let reply = await ngx.fetch('garbage');
+     }
+
+    async function multiple_fetches() {
+        if (ngx.worker_id != 0) {
+            return;
+        }
+
+        let reply = await ngx.fetch('http://127.0.0.1:$p1/fetch_ok');
+        let reply2 = await ngx.fetch('http://127.0.0.1:$p1/fetch_foo');
+        let body = await reply.text();
+        let body2 = await reply2.text();
+
+        ngx.shared.strings.set('multiple_fetches', body + '\@' + body2);
+    }
+
+    async function file() {
+        if (ngx.worker_id != 0) {
+            return;
+        }
+
+        let fh = await fs.promises.open(ngx.conf_prefix + 'file', 'a+');
+
+        await fh.write('abc');
+        await fh.close();
+    }
+
+    async function overrun() {
+        if (ngx.worker_id != 0) {
+            return;
+        }
+
+        setTimeout(() => {}, 100000);
+    }
+
+    function tick() {
+        if (ngx.worker_id != 0) {
+            return;
+        }
+
+        ngx.shared.nums.incr('tick', 1);
+    }
+
+    function tick_exception() {
+        if (ngx.worker_id != 0) {
+            return;
+        }
+
+        throw new Error("EXCEPTION");
+    }
+
+    function timer() {
+        if (ngx.worker_id != 0) {
+            return;
+        }
+
+        setTimeout(() => {ngx.shared.nums.set('timer', 1)}, 10);
+    }
+
+    function timer_exception() {
+        if (ngx.worker_id != 0) {
+            return;
+        }
+
+        setTimeout(() => {ngx.log(ngx.ERR, 'should not be seen')}, 10);
+        throw new Error("EXCEPTION");
+    }
+
+    function timeout_exception() {
+        if (ngx.worker_id != 0) {
+            return;
+        }
+
+        setTimeout(() => {
+            var v = ngx.shared.nums.get('timeout_exception') || 0;
+
+            if (v == 0) {
+                ngx.shared.nums.set('timeout_exception', 1);
+                throw new Error("EXCEPTION");
+                return;
+            }
+
+            ngx.shared.nums.incr('timeout_exception', 1);
+        }, 1);
+    }
+
+    function test(s) {
+        s.on('upload', function (data) {
+            if (data.length > 0) {
+                switch (data) {
+                case 'fetch':
+                    if (ngx.shared.strings.get('fetch').startsWith('okok')) {
+                        s.done();
+                        return;
+                    }
+
+                    break;
+
+                case 'multiple_fetches':
+                    if (ngx.shared.strings.get('multiple_fetches')
+                        .startsWith('ok\@foo'))
+                    {
+                        s.done();
+                        return;
+                    }
+
+                    break;
+
+                case 'file':
+                    let file_data = fs.readFileSync(ngx.conf_prefix + 'file')
+                                                                    .toString();
+
+                    if (file_data == 'abc') {
+                        s.done();
+                        return;
+                    }
+
+                    break;
+
+                case 'tick':
+                    if (ngx.shared.nums.get('tick') >= 3) {
+                        s.done();
+                        return;
+                    }
+
+                    break;
+
+                case 'timeout_exception':
+                    if (ngx.shared.nums.get('timeout_exception') >= 2) {
+                        s.done();
+                        return;
+                    }
+
+                    break;
+
+                case 'timer':
+                    if (ngx.shared.nums.get('timer') == 1) {
+                        s.done();
+                        return;
+                    }
+
+                    break;
+
+                default:
+                    throw new Error(`Unknown test "\${data}"`);
+                }
+
+                throw new Error(`Test "\${data}" failed`);
+            }
+        });
+    }
+
+    export default { fetch, fetch_exception, multiple_fetches, file, overrun,
+                     test, tick, tick_exception, timer, timer_exception,
+                     timeout_exception };
+EOF
+
+$t->run_daemon(\&stream_daemon, port(8090));
+$t->try_run('no js_periodic')->plan(7);
+$t->waitforsocket('127.0.0.1:' . port(8090));
+
+###############################################################################
+
+select undef, undef, undef, 0.1;
+
+is(stream('127.0.0.1:' . port(8080))->io('tick'), 'tick', '3x tick test');
+is(stream('127.0.0.1:' . port(8080))->io('timer'), 'timer', 'timer test');
+is(stream('127.0.0.1:' . port(8080))->io('file'), 'file', 'file test');
+is(stream('127.0.0.1:' . port(8080))->io('fetch'), 'fetch', 'fetch test');
+is(stream('127.0.0.1:' . port(8080))->io('multiple_fetches'),
+       'multiple_fetches', 'muliple fetches test');
+is(stream('127.0.0.1:' . port(8080))->io('timeout_exception'),
+       'timeout_exception', 'timeout exception test');
+
+$t->stop();
+
+unlike($t->read_file('error.log'), qr/\[error\].*should not be seen/,
+       'check for not discadred events');
+
+###############################################################################
+
+sub stream_daemon {
+       my $server = IO::Socket::INET->new(
+               Proto => 'tcp',
+               LocalAddr => '127.0.0.1:' . port(8090),
+               Listen => 5,
+               Reuse => 1
+       )
+               or die "Can't create listening socket: $!\n";
+
+       local $SIG{PIPE} = 'IGNORE';
+
+       while (my $client = $server->accept()) {
+               $client->autoflush(1);
+
+               log2c("(new connection $client)");
+
+               $client->sysread(my $buffer, 65536) or next;
+
+               log2i("$client $buffer");
+
+               log2o("$client $buffer");
+
+               $client->syswrite($buffer);
+
+               close $client;
+       }
+}
+
+sub log2i { Test::Nginx::log_core('|| <<', @_); }
+sub log2o { Test::Nginx::log_core('|| >>', @_); }
+sub log2c { Test::Nginx::log_core('||', @_); }
+
+###############################################################################