typedef struct {
ngx_http_conf_ctx_t *conf_ctx;
ngx_connection_t *connection;
- void *padding;
+ uint8_t *worker_affinity;
/**
* fd is used for event debug and should be at the same position
periodics = jmcf->periodics->elts;
for (i = 0; i < jmcf->periodics->nelts; i++) {
+ if (periodics[i].worker_affinity != NULL
+ && !periodics[i].worker_affinity[ngx_worker])
+ {
+ continue;
+ }
+
+ if (periodics[i].worker_affinity == NULL && ngx_worker != 0) {
+ continue;
+ }
+
periodics[i].fd = 1000000 + i;
if (ngx_http_js_periodic_init(&periodics[i]) != NGX_OK) {
static char *
ngx_http_js_periodic(ngx_conf_t *cf, ngx_command_t *cmd, void *conf)
{
+ uint8_t *mask;
ngx_str_t *value, s;
ngx_msec_t interval, jitter;
ngx_uint_t i;
+ ngx_core_conf_t *ccf;
ngx_js_periodic_t *periodic;
ngx_js_main_conf_t *jmcf;
ngx_memzero(periodic, sizeof(ngx_js_periodic_t));
+ mask = NULL;
jitter = 0;
interval = 5000;
continue;
}
+ if (ngx_strncmp(value[i].data, "worker_affinity=", 16) == 0) {
+ s.len = value[i].len - 16;
+ s.data = value[i].data + 16;
+
+ ccf = (ngx_core_conf_t *) ngx_get_conf(cf->cycle->conf_ctx,
+ ngx_core_module);
+
+ if (ccf->worker_processes == NGX_CONF_UNSET) {
+ ngx_conf_log_error(NGX_LOG_EMERG, cf, 0,
+ "\"worker_affinity\" is not supported "
+ "with unset \"worker_processes\" directive");
+ return NGX_CONF_ERROR;
+ }
+
+ mask = ngx_palloc(cf->pool, ccf->worker_processes);
+ if (mask == NULL) {
+ return NGX_CONF_ERROR;
+ }
+
+ if (ngx_strncmp(s.data, "all", 3) == 0) {
+ memset(mask, 1, ccf->worker_processes);
+ continue;
+ }
+
+ if ((size_t) ccf->worker_processes != s.len) {
+ ngx_conf_log_error(NGX_LOG_EMERG, cf, 0, "the number of "
+ "\"worker_processes\" is not equal to the "
+ "size of \"worker_affinity\" mask");
+ return NGX_CONF_ERROR;
+ }
+
+ for (i = 0; i < s.len; i++) {
+ if (s.data[i] == '0') {
+ mask[i] = 0;
+ continue;
+ }
+
+ if (s.data[i] == '1') {
+ mask[i] = 1;
+ continue;
+ }
+
+ ngx_conf_log_error(NGX_LOG_EMERG, cf, 0,
+ "invalid character \"%c\" in \"worker_affinity=\"",
+ s.data[i]);
+
+ return NGX_CONF_ERROR;
+ }
+
+ continue;
+ }
+
invalid:
ngx_conf_log_error(NGX_LOG_EMERG, cf, 0,
periodic->method = value[1];
periodic->interval = interval;
periodic->jitter = jitter;
+ periodic->worker_affinity = mask;
periodic->conf_ctx = cf->ctx;
return NGX_CONF_OK;
typedef struct {
ngx_stream_conf_ctx_t *conf_ctx;
ngx_connection_t *connection;
- void *padding;
+ uint8_t *worker_affinity;
/**
* fd is used for event debug and should be at the same position
periodics = jmcf->periodics->elts;
for (i = 0; i < jmcf->periodics->nelts; i++) {
+ if (periodics[i].worker_affinity != NULL
+ && !periodics[i].worker_affinity[ngx_worker])
+ {
+ continue;
+ }
+
+ if (periodics[i].worker_affinity == NULL && ngx_worker != 0) {
+ continue;
+ }
+
periodics[i].fd = 1000000 + i;
if (ngx_stream_js_periodic_init(&periodics[i]) != NGX_OK) {
static char *
ngx_stream_js_periodic(ngx_conf_t *cf, ngx_command_t *cmd, void *conf)
{
+ uint8_t *mask;
ngx_str_t *value, s;
ngx_msec_t interval, jitter;
ngx_uint_t i;
+ ngx_core_conf_t *ccf;
ngx_js_periodic_t *periodic;
ngx_js_main_conf_t *jmcf;
ngx_memzero(periodic, sizeof(ngx_js_periodic_t));
+ mask = NULL;
jitter = 0;
interval = 5000;
continue;
}
+ if (ngx_strncmp(value[i].data, "worker_affinity=", 16) == 0) {
+ s.len = value[i].len - 16;
+ s.data = value[i].data + 16;
+
+ ccf = (ngx_core_conf_t *) ngx_get_conf(cf->cycle->conf_ctx,
+ ngx_core_module);
+
+ if (ccf->worker_processes == NGX_CONF_UNSET) {
+ ngx_conf_log_error(NGX_LOG_EMERG, cf, 0,
+ "\"worker_affinity\" is not supported "
+ "with unset \"worker_processes\" directive");
+ return NGX_CONF_ERROR;
+ }
+
+ mask = ngx_palloc(cf->pool, ccf->worker_processes);
+ if (mask == NULL) {
+ return NGX_CONF_ERROR;
+ }
+
+ if (ngx_strncmp(s.data, "all", 3) == 0) {
+ memset(mask, 1, ccf->worker_processes);
+ continue;
+ }
+
+ if ((size_t) ccf->worker_processes != s.len) {
+ ngx_conf_log_error(NGX_LOG_EMERG, cf, 0, "the number of "
+ "\"worker_processes\" is not equal to the "
+ "size of \"worker_affinity\" mask");
+ return NGX_CONF_ERROR;
+ }
+
+ for (i = 0; i < s.len; i++) {
+ if (s.data[i] == '0') {
+ mask[i] = 0;
+ continue;
+ }
+
+ if (s.data[i] == '1') {
+ mask[i] = 1;
+ continue;
+ }
+
+ ngx_conf_log_error(NGX_LOG_EMERG, cf, 0,
+ "invalid character \"%c\" in \"worker_affinity=\"",
+ s.data[i]);
+
+ return NGX_CONF_ERROR;
+ }
+
+ continue;
+ }
+
invalid:
ngx_conf_log_error(NGX_LOG_EMERG, cf, 0,
periodic->method = value[1];
periodic->interval = interval;
periodic->jitter = jitter;
+ periodic->worker_affinity = mask;
periodic->conf_ctx = cf->ctx;
return NGX_CONF_OK;
select STDERR; $| = 1;
select STDOUT; $| = 1;
-my $t = Test::Nginx->new()->has(qw/http/)
+my $t = Test::Nginx->new()->has(qw/http rewrite/)
->write_file_expand('nginx.conf', <<'EOF');
%%TEST_GLOBALS%%
daemon off;
+worker_processes 4;
events {
}
js_shared_dict_zone zone=nums:32k type=number;
js_shared_dict_zone zone=strings:32k;
+ js_shared_dict_zone zone=workers:32k type=number;
server {
listen 127.0.0.1:8080;
location @periodic {
js_periodic test.tick interval=30ms jitter=1ms;
- js_periodic test.timer interval=1s;
+ js_periodic test.timer interval=1s worker_affinity=all;
js_periodic test.overrun interval=30ms;
js_periodic test.file interval=1s;
js_periodic test.fetch interval=40ms;
js_periodic test.multiple_fetches interval=1s;
+ js_periodic test.affinity interval=50ms worker_affinity=0101;
js_periodic test.fetch_exception interval=1s;
js_periodic test.tick_exception interval=1s;
return 200 'foo';
}
+ location /test_affinity {
+ js_content test.test_affinity;
+ }
+
location /test_fetch {
js_content test.test_fetch;
}
$t->write_file('test.js', <<EOF);
import fs from 'fs';
- async function fetch() {
- if (ngx.worker_id != 0) {
- return;
- }
+ function affinity() {
+ ngx.shared.workers.set(ngx.worker_id, 1);
+ }
+ async function fetch() {
let reply = await ngx.fetch('http://127.0.0.1:$p0/fetch_ok');
let body = await reply.text();
}
async function multiple_fetches() {
- if (ngx.worker_id != 0) {
- return;
- }
-
let reply = await ngx.fetch('http://127.0.0.1:$p0/fetch_ok');
let reply2 = await ngx.fetch('http://127.0.0.1:$p0/fetch_foo');
let body = await reply.text();
}
async function fetch_exception() {
- if (ngx.worker_id != 0) {
- return;
- }
-
let reply = await ngx.fetch('garbage');
}
async function file() {
- if (ngx.worker_id != 0) {
- return;
- }
-
let fh = await fs.promises.open(ngx.conf_prefix + 'file', 'a+');
await fh.write('abc');
}
async function overrun() {
- if (ngx.worker_id != 0) {
- return;
- }
-
setTimeout(() => {}, 100000);
}
function tick() {
- if (ngx.worker_id != 0) {
- return;
- }
-
ngx.shared.nums.incr('tick', 1);
}
function tick_exception() {
- if (ngx.worker_id != 0) {
- return;
- }
-
throw new Error("EXCEPTION");
}
}
function timer_exception() {
- if (ngx.worker_id != 0) {
- return;
- }
-
setTimeout(() => {ngx.log(ngx.ERR, 'should not be seen')}, 10);
throw new Error("EXCEPTION");
}
function timeout_exception() {
- if (ngx.worker_id != 0) {
- return;
- }
-
setTimeout(() => {
var v = ngx.shared.nums.get('timeout_exception') || 0;
}, 1);
}
+ function test_affinity(r) {
+ r.return(200, `[\${ngx.shared.workers.keys().toSorted()}]`);
+ }
+
function test_fetch(r) {
r.return(200, ngx.shared.strings.get('fetch').startsWith('okok'));
}
r.return(200, ngx.shared.nums.get('timeout_exception') >= 2);
}
- export default { fetch, fetch_exception, file, multiple_fetches, overrun,
- test_fetch, test_file, test_multiple_fetches, test_tick,
- test_timeout_exception, test_timer, tick, tick_exception,
- timer, timer_exception, timeout_exception };
+ export default { affinity, fetch, fetch_exception, file, multiple_fetches,
+ overrun, test_affinity, test_fetch, test_file,
+ test_multiple_fetches, test_tick, test_timeout_exception,
+ test_timer, tick, tick_exception, timer, timer_exception,
+ timeout_exception };
EOF
-$t->try_run('no js_periodic')->plan(7);
+$t->try_run('no js_periodic')->plan(8);
###############################################################################
select undef, undef, undef, 0.1;
+like(http_get('/test_affinity'), qr/\[1,3]/, 'affinity test');
like(http_get('/test_tick'), qr/true/, '3x tick test');
like(http_get('/test_timer'), qr/true/, 'timer test');
like(http_get('/test_file'), qr/true/, 'file test');
select STDERR; $| = 1;
select STDOUT; $| = 1;
-my $t = Test::Nginx->new()->has(qw/http stream/)
+my $t = Test::Nginx->new()->has(qw/http rewrite stream/)
->write_file_expand('nginx.conf', <<'EOF');
%%TEST_GLOBALS%%
daemon off;
+worker_processes 4;
events {
}
js_shared_dict_zone zone=nums:32k type=number;
js_shared_dict_zone zone=strings:32k;
+ js_shared_dict_zone zone=workers:32k type=number;
server {
listen 127.0.0.1:8080;
js_periodic test.tick interval=30ms jitter=1ms;
- js_periodic test.timer interval=1s;
+ js_periodic test.timer interval=1s worker_affinity=all;
js_periodic test.overrun interval=30ms;
js_periodic test.file interval=1s;
js_periodic test.fetch interval=40ms;
js_periodic test.multiple_fetches interval=1s;
+ js_periodic test.affinity interval=50ms worker_affinity=0101;
js_periodic test.fetch_exception interval=1s;
js_periodic test.tick_exception interval=1s;
$t->write_file('test.js', <<EOF);
import fs from 'fs';
- async function fetch() {
- if (ngx.worker_id != 0) {
- return;
- }
+ function affinity() {
+ ngx.shared.workers.set(ngx.worker_id, 1);
+ }
+ async function fetch() {
let reply = await ngx.fetch('http://127.0.0.1:$p1/fetch_ok');
let body = await reply.text();
}
async function fetch_exception() {
- if (ngx.worker_id != 0) {
- return;
- }
-
let reply = await ngx.fetch('garbage');
}
async function multiple_fetches() {
- if (ngx.worker_id != 0) {
- return;
- }
-
let reply = await ngx.fetch('http://127.0.0.1:$p1/fetch_ok');
let reply2 = await ngx.fetch('http://127.0.0.1:$p1/fetch_foo');
let body = await reply.text();
}
async function file() {
- if (ngx.worker_id != 0) {
- return;
- }
-
let fh = await fs.promises.open(ngx.conf_prefix + 'file', 'a+');
await fh.write('abc');
}
async function overrun() {
- if (ngx.worker_id != 0) {
- return;
- }
-
setTimeout(() => {}, 100000);
}
function tick() {
- if (ngx.worker_id != 0) {
- return;
- }
-
ngx.shared.nums.incr('tick', 1);
}
function tick_exception() {
- if (ngx.worker_id != 0) {
- return;
- }
-
throw new Error("EXCEPTION");
}
}
function timer_exception() {
- if (ngx.worker_id != 0) {
- return;
- }
-
setTimeout(() => {ngx.log(ngx.ERR, 'should not be seen')}, 10);
throw new Error("EXCEPTION");
}
function timeout_exception() {
- if (ngx.worker_id != 0) {
- return;
- }
-
setTimeout(() => {
var v = ngx.shared.nums.get('timeout_exception') || 0;
s.on('upload', function (data) {
if (data.length > 0) {
switch (data) {
+ case 'affinity':
+ if (ngx.shared.workers.keys().toSorted().toString()
+ == '1,3')
+ {
+ s.done();
+ return;
+ }
+
+ break;
case 'fetch':
if (ngx.shared.strings.get('fetch').startsWith('okok')) {
s.done();
});
}
- export default { fetch, fetch_exception, multiple_fetches, file, overrun,
- test, tick, tick_exception, timer, timer_exception,
- timeout_exception };
+ export default { affinity, fetch, fetch_exception, multiple_fetches, file,
+ overrun, test, tick, tick_exception, timer,
+ timer_exception, timeout_exception };
EOF
$t->run_daemon(\&stream_daemon, port(8090));
-$t->try_run('no js_periodic')->plan(7);
+$t->try_run('no js_periodic')->plan(8);
$t->waitforsocket('127.0.0.1:' . port(8090));
###############################################################################
select undef, undef, undef, 0.1;
+is(stream('127.0.0.1:' . port(8080))->io('affinity'), 'affinity',
+ 'affinity test');
is(stream('127.0.0.1:' . port(8080))->io('tick'), 'tick', '3x tick test');
is(stream('127.0.0.1:' . port(8080))->io('timer'), 'timer', 'timer test');
is(stream('127.0.0.1:' . port(8080))->io('file'), 'file', 'file test');