diff options
Diffstat (limited to 'contrib/worker_spi/worker_spi.c')
-rw-r--r-- | contrib/worker_spi/worker_spi.c | 66 |
1 files changed, 50 insertions, 16 deletions
diff --git a/contrib/worker_spi/worker_spi.c b/contrib/worker_spi/worker_spi.c index 414721a70fe..ef19e4b39eb 100644 --- a/contrib/worker_spi/worker_spi.c +++ b/contrib/worker_spi/worker_spi.c @@ -42,8 +42,11 @@ #include "tcop/utility.h" PG_MODULE_MAGIC; +PG_FUNCTION_INFO_V1(worker_spi_launch); void _PG_init(void); +void worker_spi_main(Datum); +Datum worker_spi_launch(PG_FUNCTION_ARGS); /* flags set by signal handlers */ static volatile sig_atomic_t got_sighup = false; @@ -153,11 +156,22 @@ initialize_worker_spi(worktable *table) pgstat_report_activity(STATE_IDLE, NULL); } -static void -worker_spi_main(void *main_arg) +void +worker_spi_main(Datum main_arg) { - worktable *table = (worktable *) main_arg; + int index = DatumGetInt32(main_arg); + worktable *table; StringInfoData buf; + char name[20]; + + table = palloc(sizeof(worktable)); + sprintf(name, "schema%d", index); + table->schema = pstrdup(name); + table->name = pstrdup("counted"); + + /* Establish signal handlers before unblocking signals. */ + pqsignal(SIGHUP, worker_spi_sighup); + pqsignal(SIGTERM, worker_spi_sigterm); /* We're now ready to receive signals */ BackgroundWorkerUnblockSignals(); @@ -279,7 +293,7 @@ worker_spi_main(void *main_arg) pgstat_report_activity(STATE_IDLE, NULL); } - proc_exit(0); + proc_exit(1); } /* @@ -292,9 +306,7 @@ void _PG_init(void) { BackgroundWorker worker; - worktable *table; unsigned int i; - char name[20]; /* get the configuration */ DefineCustomIntVariable("worker_spi.naptime", @@ -309,6 +321,10 @@ _PG_init(void) NULL, NULL, NULL); + + if (!process_shared_preload_libraries_in_progress) + return; + DefineCustomIntVariable("worker_spi.total_workers", "Number of workers.", NULL, @@ -328,23 +344,41 @@ _PG_init(void) worker.bgw_start_time = BgWorkerStart_RecoveryFinished; worker.bgw_restart_time = BGW_NEVER_RESTART; worker.bgw_main = worker_spi_main; - worker.bgw_sighup = worker_spi_sighup; - worker.bgw_sigterm = worker_spi_sigterm; + worker.bgw_sighup = NULL; + worker.bgw_sigterm = NULL; /* * Now fill in worker-specific data, and do the actual registrations. */ for (i = 1; i <= worker_spi_total_workers; i++) { - sprintf(name, "worker %d", i); - worker.bgw_name = pstrdup(name); - - table = palloc(sizeof(worktable)); - sprintf(name, "schema%d", i); - table->schema = pstrdup(name); - table->name = pstrdup("counted"); - worker.bgw_main_arg = (void *) table; + snprintf(worker.bgw_name, BGW_MAXLEN, "worker %d", i); + worker.bgw_main_arg = Int32GetDatum(i); RegisterBackgroundWorker(&worker); } } + +/* + * Dynamically launch an SPI worker. + */ +Datum +worker_spi_launch(PG_FUNCTION_ARGS) +{ + int32 i = PG_GETARG_INT32(0); + BackgroundWorker worker; + + worker.bgw_flags = BGWORKER_SHMEM_ACCESS | + BGWORKER_BACKEND_DATABASE_CONNECTION; + worker.bgw_start_time = BgWorkerStart_RecoveryFinished; + worker.bgw_restart_time = BGW_NEVER_RESTART; + worker.bgw_main = NULL; /* new worker might not have library loaded */ + sprintf(worker.bgw_library_name, "worker_spi"); + sprintf(worker.bgw_function_name, "worker_spi_main"); + worker.bgw_sighup = NULL; /* new worker might not have library loaded */ + worker.bgw_sigterm = NULL; /* new worker might not have library loaded */ + snprintf(worker.bgw_name, BGW_MAXLEN, "worker %d", i); + worker.bgw_main_arg = Int32GetDatum(i); + + PG_RETURN_BOOL(RegisterDynamicBackgroundWorker(&worker)); +} |