aboutsummaryrefslogtreecommitdiff
path: root/contrib/worker_spi/worker_spi.c
diff options
context:
space:
mode:
Diffstat (limited to 'contrib/worker_spi/worker_spi.c')
-rw-r--r--contrib/worker_spi/worker_spi.c66
1 files changed, 50 insertions, 16 deletions
diff --git a/contrib/worker_spi/worker_spi.c b/contrib/worker_spi/worker_spi.c
index 414721a70fe..ef19e4b39eb 100644
--- a/contrib/worker_spi/worker_spi.c
+++ b/contrib/worker_spi/worker_spi.c
@@ -42,8 +42,11 @@
#include "tcop/utility.h"
PG_MODULE_MAGIC;
+PG_FUNCTION_INFO_V1(worker_spi_launch);
void _PG_init(void);
+void worker_spi_main(Datum);
+Datum worker_spi_launch(PG_FUNCTION_ARGS);
/* flags set by signal handlers */
static volatile sig_atomic_t got_sighup = false;
@@ -153,11 +156,22 @@ initialize_worker_spi(worktable *table)
pgstat_report_activity(STATE_IDLE, NULL);
}
-static void
-worker_spi_main(void *main_arg)
+void
+worker_spi_main(Datum main_arg)
{
- worktable *table = (worktable *) main_arg;
+ int index = DatumGetInt32(main_arg);
+ worktable *table;
StringInfoData buf;
+ char name[20];
+
+ table = palloc(sizeof(worktable));
+ sprintf(name, "schema%d", index);
+ table->schema = pstrdup(name);
+ table->name = pstrdup("counted");
+
+ /* Establish signal handlers before unblocking signals. */
+ pqsignal(SIGHUP, worker_spi_sighup);
+ pqsignal(SIGTERM, worker_spi_sigterm);
/* We're now ready to receive signals */
BackgroundWorkerUnblockSignals();
@@ -279,7 +293,7 @@ worker_spi_main(void *main_arg)
pgstat_report_activity(STATE_IDLE, NULL);
}
- proc_exit(0);
+ proc_exit(1);
}
/*
@@ -292,9 +306,7 @@ void
_PG_init(void)
{
BackgroundWorker worker;
- worktable *table;
unsigned int i;
- char name[20];
/* get the configuration */
DefineCustomIntVariable("worker_spi.naptime",
@@ -309,6 +321,10 @@ _PG_init(void)
NULL,
NULL,
NULL);
+
+ if (!process_shared_preload_libraries_in_progress)
+ return;
+
DefineCustomIntVariable("worker_spi.total_workers",
"Number of workers.",
NULL,
@@ -328,23 +344,41 @@ _PG_init(void)
worker.bgw_start_time = BgWorkerStart_RecoveryFinished;
worker.bgw_restart_time = BGW_NEVER_RESTART;
worker.bgw_main = worker_spi_main;
- worker.bgw_sighup = worker_spi_sighup;
- worker.bgw_sigterm = worker_spi_sigterm;
+ worker.bgw_sighup = NULL;
+ worker.bgw_sigterm = NULL;
/*
* Now fill in worker-specific data, and do the actual registrations.
*/
for (i = 1; i <= worker_spi_total_workers; i++)
{
- sprintf(name, "worker %d", i);
- worker.bgw_name = pstrdup(name);
-
- table = palloc(sizeof(worktable));
- sprintf(name, "schema%d", i);
- table->schema = pstrdup(name);
- table->name = pstrdup("counted");
- worker.bgw_main_arg = (void *) table;
+ snprintf(worker.bgw_name, BGW_MAXLEN, "worker %d", i);
+ worker.bgw_main_arg = Int32GetDatum(i);
RegisterBackgroundWorker(&worker);
}
}
+
+/*
+ * Dynamically launch an SPI worker.
+ */
+Datum
+worker_spi_launch(PG_FUNCTION_ARGS)
+{
+ int32 i = PG_GETARG_INT32(0);
+ BackgroundWorker worker;
+
+ worker.bgw_flags = BGWORKER_SHMEM_ACCESS |
+ BGWORKER_BACKEND_DATABASE_CONNECTION;
+ worker.bgw_start_time = BgWorkerStart_RecoveryFinished;
+ worker.bgw_restart_time = BGW_NEVER_RESTART;
+ worker.bgw_main = NULL; /* new worker might not have library loaded */
+ sprintf(worker.bgw_library_name, "worker_spi");
+ sprintf(worker.bgw_function_name, "worker_spi_main");
+ worker.bgw_sighup = NULL; /* new worker might not have library loaded */
+ worker.bgw_sigterm = NULL; /* new worker might not have library loaded */
+ snprintf(worker.bgw_name, BGW_MAXLEN, "worker %d", i);
+ worker.bgw_main_arg = Int32GetDatum(i);
+
+ PG_RETURN_BOOL(RegisterDynamicBackgroundWorker(&worker));
+}