diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/bin/pg_dump/parallel.c | 178 | ||||
-rw-r--r-- | src/bin/pg_dump/parallel.h | 4 | ||||
-rw-r--r-- | src/bin/pg_dump/pg_backup_utils.c | 32 |
3 files changed, 98 insertions, 116 deletions
diff --git a/src/bin/pg_dump/parallel.c b/src/bin/pg_dump/parallel.c index 91672949e69..f650d3fef51 100644 --- a/src/bin/pg_dump/parallel.c +++ b/src/bin/pg_dump/parallel.c @@ -77,8 +77,6 @@ static ShutdownInformation shutdown_info; static const char *modulename = gettext_noop("parallel archiver"); static ParallelSlot *GetMyPSlot(ParallelState *pstate); -static void parallel_msg_master(ParallelSlot *slot, const char *modulename, - const char *fmt, va_list ap) pg_attribute_printf(3, 0); static void archive_close_connection(int code, void *arg); static void ShutdownWorkersHard(ParallelState *pstate); static void WaitForTerminatingWorkers(ParallelState *pstate); @@ -163,65 +161,6 @@ GetMyPSlot(ParallelState *pstate) } /* - * Fail and die, with a message to stderr. Parameters as for write_msg. - * - * This is defined in parallel.c, because in parallel mode, things are more - * complicated. If the worker process does exit_horribly(), we forward its - * last words to the master process. The master process then does - * exit_horribly() with this error message itself and prints it normally. - * After printing the message, exit_horribly() on the master will shut down - * the remaining worker processes. - */ -void -exit_horribly(const char *modulename, const char *fmt,...) -{ - va_list ap; - ParallelState *pstate = shutdown_info.pstate; - ParallelSlot *slot; - - va_start(ap, fmt); - - if (pstate == NULL) - { - /* Not in parallel mode, just write to stderr */ - vwrite_msg(modulename, fmt, ap); - } - else - { - slot = GetMyPSlot(pstate); - - if (!slot) - /* We're the parent, just write the message out */ - vwrite_msg(modulename, fmt, ap); - else - /* If we're a worker process, send the msg to the master process */ - parallel_msg_master(slot, modulename, fmt, ap); - } - - va_end(ap); - - exit_nicely(1); -} - -/* Sends the error message from the worker to the master process */ -static void -parallel_msg_master(ParallelSlot *slot, const char *modulename, - const char *fmt, va_list ap) -{ - char buf[512]; - int pipefd[2]; - - pipefd[PIPE_READ] = slot->pipeRevRead; - pipefd[PIPE_WRITE] = slot->pipeRevWrite; - - strcpy(buf, "ERROR "); - vsnprintf(buf + strlen("ERROR "), - sizeof(buf) - strlen("ERROR "), fmt, ap); - - sendMessageToMaster(pipefd, buf); -} - -/* * A thread-local version of getLocalPQExpBuffer(). * * Non-reentrant but reduces memory leakage. (On Windows the memory leakage @@ -271,7 +210,7 @@ getThreadLocalPQExpBuffer(void) /* * pg_dump and pg_restore register the Archive pointer for the exit handler - * (called from exit_horribly). This function mainly exists so that we can + * (called from exit_nicely). This function mainly exists so that we can * keep shutdown_info in file scope only. */ void @@ -282,8 +221,8 @@ on_exit_close_archive(Archive *AHX) } /* - * This function can close archives in both the parallel and non-parallel - * case. + * on_exit_nicely handler for shutting down database connections and + * worker processes cleanly. */ static void archive_close_connection(int code, void *arg) @@ -292,34 +231,53 @@ archive_close_connection(int code, void *arg) if (si->pstate) { + /* In parallel mode, must figure out who we are */ ParallelSlot *slot = GetMyPSlot(si->pstate); if (!slot) { /* - * We're the master: We have already printed out the message - * passed to exit_horribly() either from the master itself or from - * a worker process. Now we need to close our own database - * connection (only open during parallel dump but not restore) and - * shut down the remaining workers. + * We're the master. Close our own database connection, if any, + * and then forcibly shut down workers. */ - DisconnectDatabase(si->AHX); + if (si->AHX) + DisconnectDatabase(si->AHX); + #ifndef WIN32 /* - * Setting aborting to true switches to best-effort-mode - * (send/receive but ignore errors) in communicating with our - * workers. + * Setting aborting to true shuts off error/warning messages that + * are no longer useful once we start killing workers. */ aborting = true; #endif ShutdownWorkersHard(si->pstate); } - else if (slot->args->AH) - DisconnectDatabase(&(slot->args->AH->public)); + else + { + /* + * We're a worker. Shut down our own DB connection if any. On + * Windows, we also have to close our communication sockets, to + * emulate what will happen on Unix when the worker process exits. + * (Without this, if this is a premature exit, the master would + * fail to detect it because there would be no EOF condition on + * the other end of the pipe.) + */ + if (slot->args->AH) + DisconnectDatabase(&(slot->args->AH->public)); + +#ifdef WIN32 + closesocket(slot->pipeRevRead); + closesocket(slot->pipeRevWrite); +#endif + } + } + else + { + /* Non-parallel operation: just kill the master DB connection */ + if (si->AHX) + DisconnectDatabase(si->AHX); } - else if (si->AHX) - DisconnectDatabase(si->AHX); } /* @@ -327,7 +285,8 @@ archive_close_connection(int code, void *arg) * threads to terminate as well (and not finish with their 70 GB table dump * first...). Now in UNIX we can just kill these processes, and let the signal * handler set wantAbort to 1. In Windows we set a termEvent and this serves - * as the signal for everyone to terminate. + * as the signal for everyone to terminate. We don't print any error message, + * that would just clutter the screen. */ void checkAborting(ArchiveHandle *AH) @@ -337,7 +296,7 @@ checkAborting(ArchiveHandle *AH) #else if (wantAbort) #endif - exit_horribly(modulename, "worker is terminating\n"); + exit_nicely(1); } /* @@ -352,8 +311,6 @@ ShutdownWorkersHard(ParallelState *pstate) #ifndef WIN32 int i; - signal(SIGPIPE, SIG_IGN); - /* * Close our write end of the sockets so that the workers know they can * exit. @@ -428,27 +385,21 @@ sigTermHandler(int signum) #endif /* - * This function is called by both UNIX and Windows variants to set up a - * worker process. + * This function is called by both UNIX and Windows variants to set up + * and run a worker process. Caller should exit the process (or thread) + * upon return. */ static void SetupWorker(ArchiveHandle *AH, int pipefd[2], int worker) { /* * Call the setup worker function that's defined in the ArchiveHandle. - * - * We get the raw connection only for the reason that we can close it - * properly when we shut down. This happens only that way when it is - * brought down because of an error. */ (AH->SetupWorkerPtr) ((Archive *) AH); Assert(AH->connection != NULL); WaitForCommands(AH, pipefd); - - closesocket(pipefd[PIPE_READ]); - closesocket(pipefd[PIPE_WRITE]); } #ifdef WIN32 @@ -534,14 +485,22 @@ ParallelBackupStart(ArchiveHandle *AH) pstate->parallelSlot[i].args = (ParallelArgs *) pg_malloc(sizeof(ParallelArgs)); pstate->parallelSlot[i].args->AH = NULL; pstate->parallelSlot[i].args->te = NULL; + + /* master's ends of the pipes */ + pstate->parallelSlot[i].pipeRead = pipeWM[PIPE_READ]; + pstate->parallelSlot[i].pipeWrite = pipeMW[PIPE_WRITE]; + /* child's ends of the pipes */ + pstate->parallelSlot[i].pipeRevRead = pipeMW[PIPE_READ]; + pstate->parallelSlot[i].pipeRevWrite = pipeWM[PIPE_WRITE]; + #ifdef WIN32 /* Allocate a new structure for every worker */ wi = (WorkerInfo *) pg_malloc(sizeof(WorkerInfo)); wi->worker = i; wi->AH = AH; - wi->pipeRead = pstate->parallelSlot[i].pipeRevRead = pipeMW[PIPE_READ]; - wi->pipeWrite = pstate->parallelSlot[i].pipeRevWrite = pipeWM[PIPE_WRITE]; + wi->pipeRead = pipeMW[PIPE_READ]; + wi->pipeWrite = pipeWM[PIPE_WRITE]; handle = _beginthreadex(NULL, 0, (void *) &init_spawned_worker_win32, wi, 0, &(pstate->parallelSlot[i].threadId)); @@ -557,15 +516,6 @@ ParallelBackupStart(ArchiveHandle *AH) pipefd[0] = pipeMW[PIPE_READ]; pipefd[1] = pipeWM[PIPE_WRITE]; - /* - * Store the fds for the reverse communication in pstate. Actually - * we only use this in case of an error and don't use pstate - * otherwise in the worker process. On Windows we write to the - * global pstate, in Unix we write to our process-local copy but - * that's also where we'd retrieve this information back from. - */ - pstate->parallelSlot[i].pipeRevRead = pipefd[PIPE_READ]; - pstate->parallelSlot[i].pipeRevWrite = pipefd[PIPE_WRITE]; pstate->parallelSlot[i].pid = getpid(); /* @@ -584,7 +534,7 @@ ParallelBackupStart(ArchiveHandle *AH) /* * Close all inherited fds for communication of the master with - * the other workers. + * previously-forked workers. */ for (j = 0; j < i; j++) { @@ -612,11 +562,16 @@ ParallelBackupStart(ArchiveHandle *AH) pstate->parallelSlot[i].pid = pid; #endif - - pstate->parallelSlot[i].pipeRead = pipeWM[PIPE_READ]; - pstate->parallelSlot[i].pipeWrite = pipeMW[PIPE_WRITE]; } + /* + * Having forked off the workers, disable SIGPIPE so that master isn't + * killed if it tries to send a command to a dead worker. + */ +#ifndef WIN32 + signal(SIGPIPE, SIG_IGN); +#endif + return pstate; } @@ -977,16 +932,13 @@ ListenToWorkers(ArchiveHandle *AH, ParallelState *pstate, bool do_wait) } else exit_horribly(modulename, - "invalid message received from worker: %s\n", msg); - } - else if (messageStartsWith(msg, "ERROR ")) - { - Assert(AH->format == archDirectory || AH->format == archCustom); - pstate->parallelSlot[worker].workerStatus = WRKR_TERMINATED; - exit_horribly(modulename, "%s", msg + strlen("ERROR ")); + "invalid message received from worker: \"%s\"\n", + msg); } else - exit_horribly(modulename, "invalid message received from worker: %s\n", msg); + exit_horribly(modulename, + "invalid message received from worker: \"%s\"\n", + msg); /* both Unix and Win32 return pg_malloc()ed space, so we free it */ free(msg); diff --git a/src/bin/pg_dump/parallel.h b/src/bin/pg_dump/parallel.h index 591653bcf0c..8d704283e18 100644 --- a/src/bin/pg_dump/parallel.h +++ b/src/bin/pg_dump/parallel.h @@ -42,9 +42,9 @@ typedef struct ParallelSlot ParallelArgs *args; T_WorkerStatus workerStatus; int status; - int pipeRead; + int pipeRead; /* master's end of the pipes */ int pipeWrite; - int pipeRevRead; + int pipeRevRead; /* child's end of the pipes */ int pipeRevWrite; #ifdef WIN32 uintptr_t hThread; diff --git a/src/bin/pg_dump/pg_backup_utils.c b/src/bin/pg_dump/pg_backup_utils.c index 0aa13cda433..5d1d87565a2 100644 --- a/src/bin/pg_dump/pg_backup_utils.c +++ b/src/bin/pg_dump/pg_backup_utils.c @@ -93,6 +93,23 @@ vwrite_msg(const char *modulename, const char *fmt, va_list ap) vfprintf(stderr, _(fmt), ap); } +/* + * Fail and die, with a message to stderr. Parameters as for write_msg. + * + * Note that on_exit_nicely callbacks will get run. + */ +void +exit_horribly(const char *modulename, const char *fmt,...) +{ + va_list ap; + + va_start(ap, fmt); + vwrite_msg(modulename, fmt, ap); + va_end(ap); + + exit_nicely(1); +} + /* Register a callback to be run when exit_nicely is invoked. */ void on_exit_nicely(on_exit_nicely_callback function, void *arg) @@ -106,7 +123,20 @@ on_exit_nicely(on_exit_nicely_callback function, void *arg) /* * Run accumulated on_exit_nicely callbacks in reverse order and then exit - * quietly. This needs to be thread-safe. + * without printing any message. + * + * If running in a parallel worker thread on Windows, we only exit the thread, + * not the whole process. + * + * Note that in parallel operation on Windows, the callback(s) will be run + * by each thread since the list state is necessarily shared by all threads; + * each callback must contain logic to ensure it does only what's appropriate + * for its thread. On Unix, callbacks are also run by each process, but only + * for callbacks established before we fork off the child processes. (It'd + * be cleaner to reset the list after fork(), and let each child establish + * its own callbacks; but then the behavior would be completely inconsistent + * between Windows and Unix. For now, just be sure to establish callbacks + * before forking to avoid inconsistency.) */ void exit_nicely(int code) |