aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/bin/pg_dump/parallel.c178
-rw-r--r--src/bin/pg_dump/parallel.h4
-rw-r--r--src/bin/pg_dump/pg_backup_utils.c32
3 files changed, 98 insertions, 116 deletions
diff --git a/src/bin/pg_dump/parallel.c b/src/bin/pg_dump/parallel.c
index 91672949e69..f650d3fef51 100644
--- a/src/bin/pg_dump/parallel.c
+++ b/src/bin/pg_dump/parallel.c
@@ -77,8 +77,6 @@ static ShutdownInformation shutdown_info;
static const char *modulename = gettext_noop("parallel archiver");
static ParallelSlot *GetMyPSlot(ParallelState *pstate);
-static void parallel_msg_master(ParallelSlot *slot, const char *modulename,
- const char *fmt, va_list ap) pg_attribute_printf(3, 0);
static void archive_close_connection(int code, void *arg);
static void ShutdownWorkersHard(ParallelState *pstate);
static void WaitForTerminatingWorkers(ParallelState *pstate);
@@ -163,65 +161,6 @@ GetMyPSlot(ParallelState *pstate)
}
/*
- * Fail and die, with a message to stderr. Parameters as for write_msg.
- *
- * This is defined in parallel.c, because in parallel mode, things are more
- * complicated. If the worker process does exit_horribly(), we forward its
- * last words to the master process. The master process then does
- * exit_horribly() with this error message itself and prints it normally.
- * After printing the message, exit_horribly() on the master will shut down
- * the remaining worker processes.
- */
-void
-exit_horribly(const char *modulename, const char *fmt,...)
-{
- va_list ap;
- ParallelState *pstate = shutdown_info.pstate;
- ParallelSlot *slot;
-
- va_start(ap, fmt);
-
- if (pstate == NULL)
- {
- /* Not in parallel mode, just write to stderr */
- vwrite_msg(modulename, fmt, ap);
- }
- else
- {
- slot = GetMyPSlot(pstate);
-
- if (!slot)
- /* We're the parent, just write the message out */
- vwrite_msg(modulename, fmt, ap);
- else
- /* If we're a worker process, send the msg to the master process */
- parallel_msg_master(slot, modulename, fmt, ap);
- }
-
- va_end(ap);
-
- exit_nicely(1);
-}
-
-/* Sends the error message from the worker to the master process */
-static void
-parallel_msg_master(ParallelSlot *slot, const char *modulename,
- const char *fmt, va_list ap)
-{
- char buf[512];
- int pipefd[2];
-
- pipefd[PIPE_READ] = slot->pipeRevRead;
- pipefd[PIPE_WRITE] = slot->pipeRevWrite;
-
- strcpy(buf, "ERROR ");
- vsnprintf(buf + strlen("ERROR "),
- sizeof(buf) - strlen("ERROR "), fmt, ap);
-
- sendMessageToMaster(pipefd, buf);
-}
-
-/*
* A thread-local version of getLocalPQExpBuffer().
*
* Non-reentrant but reduces memory leakage. (On Windows the memory leakage
@@ -271,7 +210,7 @@ getThreadLocalPQExpBuffer(void)
/*
* pg_dump and pg_restore register the Archive pointer for the exit handler
- * (called from exit_horribly). This function mainly exists so that we can
+ * (called from exit_nicely). This function mainly exists so that we can
* keep shutdown_info in file scope only.
*/
void
@@ -282,8 +221,8 @@ on_exit_close_archive(Archive *AHX)
}
/*
- * This function can close archives in both the parallel and non-parallel
- * case.
+ * on_exit_nicely handler for shutting down database connections and
+ * worker processes cleanly.
*/
static void
archive_close_connection(int code, void *arg)
@@ -292,34 +231,53 @@ archive_close_connection(int code, void *arg)
if (si->pstate)
{
+ /* In parallel mode, must figure out who we are */
ParallelSlot *slot = GetMyPSlot(si->pstate);
if (!slot)
{
/*
- * We're the master: We have already printed out the message
- * passed to exit_horribly() either from the master itself or from
- * a worker process. Now we need to close our own database
- * connection (only open during parallel dump but not restore) and
- * shut down the remaining workers.
+ * We're the master. Close our own database connection, if any,
+ * and then forcibly shut down workers.
*/
- DisconnectDatabase(si->AHX);
+ if (si->AHX)
+ DisconnectDatabase(si->AHX);
+
#ifndef WIN32
/*
- * Setting aborting to true switches to best-effort-mode
- * (send/receive but ignore errors) in communicating with our
- * workers.
+ * Setting aborting to true shuts off error/warning messages that
+ * are no longer useful once we start killing workers.
*/
aborting = true;
#endif
ShutdownWorkersHard(si->pstate);
}
- else if (slot->args->AH)
- DisconnectDatabase(&(slot->args->AH->public));
+ else
+ {
+ /*
+ * We're a worker. Shut down our own DB connection if any. On
+ * Windows, we also have to close our communication sockets, to
+ * emulate what will happen on Unix when the worker process exits.
+ * (Without this, if this is a premature exit, the master would
+ * fail to detect it because there would be no EOF condition on
+ * the other end of the pipe.)
+ */
+ if (slot->args->AH)
+ DisconnectDatabase(&(slot->args->AH->public));
+
+#ifdef WIN32
+ closesocket(slot->pipeRevRead);
+ closesocket(slot->pipeRevWrite);
+#endif
+ }
+ }
+ else
+ {
+ /* Non-parallel operation: just kill the master DB connection */
+ if (si->AHX)
+ DisconnectDatabase(si->AHX);
}
- else if (si->AHX)
- DisconnectDatabase(si->AHX);
}
/*
@@ -327,7 +285,8 @@ archive_close_connection(int code, void *arg)
* threads to terminate as well (and not finish with their 70 GB table dump
* first...). Now in UNIX we can just kill these processes, and let the signal
* handler set wantAbort to 1. In Windows we set a termEvent and this serves
- * as the signal for everyone to terminate.
+ * as the signal for everyone to terminate. We don't print any error message,
+ * that would just clutter the screen.
*/
void
checkAborting(ArchiveHandle *AH)
@@ -337,7 +296,7 @@ checkAborting(ArchiveHandle *AH)
#else
if (wantAbort)
#endif
- exit_horribly(modulename, "worker is terminating\n");
+ exit_nicely(1);
}
/*
@@ -352,8 +311,6 @@ ShutdownWorkersHard(ParallelState *pstate)
#ifndef WIN32
int i;
- signal(SIGPIPE, SIG_IGN);
-
/*
* Close our write end of the sockets so that the workers know they can
* exit.
@@ -428,27 +385,21 @@ sigTermHandler(int signum)
#endif
/*
- * This function is called by both UNIX and Windows variants to set up a
- * worker process.
+ * This function is called by both UNIX and Windows variants to set up
+ * and run a worker process. Caller should exit the process (or thread)
+ * upon return.
*/
static void
SetupWorker(ArchiveHandle *AH, int pipefd[2], int worker)
{
/*
* Call the setup worker function that's defined in the ArchiveHandle.
- *
- * We get the raw connection only for the reason that we can close it
- * properly when we shut down. This happens only that way when it is
- * brought down because of an error.
*/
(AH->SetupWorkerPtr) ((Archive *) AH);
Assert(AH->connection != NULL);
WaitForCommands(AH, pipefd);
-
- closesocket(pipefd[PIPE_READ]);
- closesocket(pipefd[PIPE_WRITE]);
}
#ifdef WIN32
@@ -534,14 +485,22 @@ ParallelBackupStart(ArchiveHandle *AH)
pstate->parallelSlot[i].args = (ParallelArgs *) pg_malloc(sizeof(ParallelArgs));
pstate->parallelSlot[i].args->AH = NULL;
pstate->parallelSlot[i].args->te = NULL;
+
+ /* master's ends of the pipes */
+ pstate->parallelSlot[i].pipeRead = pipeWM[PIPE_READ];
+ pstate->parallelSlot[i].pipeWrite = pipeMW[PIPE_WRITE];
+ /* child's ends of the pipes */
+ pstate->parallelSlot[i].pipeRevRead = pipeMW[PIPE_READ];
+ pstate->parallelSlot[i].pipeRevWrite = pipeWM[PIPE_WRITE];
+
#ifdef WIN32
/* Allocate a new structure for every worker */
wi = (WorkerInfo *) pg_malloc(sizeof(WorkerInfo));
wi->worker = i;
wi->AH = AH;
- wi->pipeRead = pstate->parallelSlot[i].pipeRevRead = pipeMW[PIPE_READ];
- wi->pipeWrite = pstate->parallelSlot[i].pipeRevWrite = pipeWM[PIPE_WRITE];
+ wi->pipeRead = pipeMW[PIPE_READ];
+ wi->pipeWrite = pipeWM[PIPE_WRITE];
handle = _beginthreadex(NULL, 0, (void *) &init_spawned_worker_win32,
wi, 0, &(pstate->parallelSlot[i].threadId));
@@ -557,15 +516,6 @@ ParallelBackupStart(ArchiveHandle *AH)
pipefd[0] = pipeMW[PIPE_READ];
pipefd[1] = pipeWM[PIPE_WRITE];
- /*
- * Store the fds for the reverse communication in pstate. Actually
- * we only use this in case of an error and don't use pstate
- * otherwise in the worker process. On Windows we write to the
- * global pstate, in Unix we write to our process-local copy but
- * that's also where we'd retrieve this information back from.
- */
- pstate->parallelSlot[i].pipeRevRead = pipefd[PIPE_READ];
- pstate->parallelSlot[i].pipeRevWrite = pipefd[PIPE_WRITE];
pstate->parallelSlot[i].pid = getpid();
/*
@@ -584,7 +534,7 @@ ParallelBackupStart(ArchiveHandle *AH)
/*
* Close all inherited fds for communication of the master with
- * the other workers.
+ * previously-forked workers.
*/
for (j = 0; j < i; j++)
{
@@ -612,11 +562,16 @@ ParallelBackupStart(ArchiveHandle *AH)
pstate->parallelSlot[i].pid = pid;
#endif
-
- pstate->parallelSlot[i].pipeRead = pipeWM[PIPE_READ];
- pstate->parallelSlot[i].pipeWrite = pipeMW[PIPE_WRITE];
}
+ /*
+ * Having forked off the workers, disable SIGPIPE so that master isn't
+ * killed if it tries to send a command to a dead worker.
+ */
+#ifndef WIN32
+ signal(SIGPIPE, SIG_IGN);
+#endif
+
return pstate;
}
@@ -977,16 +932,13 @@ ListenToWorkers(ArchiveHandle *AH, ParallelState *pstate, bool do_wait)
}
else
exit_horribly(modulename,
- "invalid message received from worker: %s\n", msg);
- }
- else if (messageStartsWith(msg, "ERROR "))
- {
- Assert(AH->format == archDirectory || AH->format == archCustom);
- pstate->parallelSlot[worker].workerStatus = WRKR_TERMINATED;
- exit_horribly(modulename, "%s", msg + strlen("ERROR "));
+ "invalid message received from worker: \"%s\"\n",
+ msg);
}
else
- exit_horribly(modulename, "invalid message received from worker: %s\n", msg);
+ exit_horribly(modulename,
+ "invalid message received from worker: \"%s\"\n",
+ msg);
/* both Unix and Win32 return pg_malloc()ed space, so we free it */
free(msg);
diff --git a/src/bin/pg_dump/parallel.h b/src/bin/pg_dump/parallel.h
index 591653bcf0c..8d704283e18 100644
--- a/src/bin/pg_dump/parallel.h
+++ b/src/bin/pg_dump/parallel.h
@@ -42,9 +42,9 @@ typedef struct ParallelSlot
ParallelArgs *args;
T_WorkerStatus workerStatus;
int status;
- int pipeRead;
+ int pipeRead; /* master's end of the pipes */
int pipeWrite;
- int pipeRevRead;
+ int pipeRevRead; /* child's end of the pipes */
int pipeRevWrite;
#ifdef WIN32
uintptr_t hThread;
diff --git a/src/bin/pg_dump/pg_backup_utils.c b/src/bin/pg_dump/pg_backup_utils.c
index 0aa13cda433..5d1d87565a2 100644
--- a/src/bin/pg_dump/pg_backup_utils.c
+++ b/src/bin/pg_dump/pg_backup_utils.c
@@ -93,6 +93,23 @@ vwrite_msg(const char *modulename, const char *fmt, va_list ap)
vfprintf(stderr, _(fmt), ap);
}
+/*
+ * Fail and die, with a message to stderr. Parameters as for write_msg.
+ *
+ * Note that on_exit_nicely callbacks will get run.
+ */
+void
+exit_horribly(const char *modulename, const char *fmt,...)
+{
+ va_list ap;
+
+ va_start(ap, fmt);
+ vwrite_msg(modulename, fmt, ap);
+ va_end(ap);
+
+ exit_nicely(1);
+}
+
/* Register a callback to be run when exit_nicely is invoked. */
void
on_exit_nicely(on_exit_nicely_callback function, void *arg)
@@ -106,7 +123,20 @@ on_exit_nicely(on_exit_nicely_callback function, void *arg)
/*
* Run accumulated on_exit_nicely callbacks in reverse order and then exit
- * quietly. This needs to be thread-safe.
+ * without printing any message.
+ *
+ * If running in a parallel worker thread on Windows, we only exit the thread,
+ * not the whole process.
+ *
+ * Note that in parallel operation on Windows, the callback(s) will be run
+ * by each thread since the list state is necessarily shared by all threads;
+ * each callback must contain logic to ensure it does only what's appropriate
+ * for its thread. On Unix, callbacks are also run by each process, but only
+ * for callbacks established before we fork off the child processes. (It'd
+ * be cleaner to reset the list after fork(), and let each child establish
+ * its own callbacks; but then the behavior would be completely inconsistent
+ * between Windows and Unix. For now, just be sure to establish callbacks
+ * before forking to avoid inconsistency.)
*/
void
exit_nicely(int code)