aboutsummaryrefslogtreecommitdiff
path: root/src/bin/pg_dump/parallel.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/bin/pg_dump/parallel.c')
-rw-r--r--src/bin/pg_dump/parallel.c29
1 files changed, 17 insertions, 12 deletions
diff --git a/src/bin/pg_dump/parallel.c b/src/bin/pg_dump/parallel.c
index 1410bcdb662..c25e3f7a888 100644
--- a/src/bin/pg_dump/parallel.c
+++ b/src/bin/pg_dump/parallel.c
@@ -42,6 +42,7 @@
*
* In the master process, the workerStatus field for each worker has one of
* the following values:
+ * WRKR_NOT_STARTED: we've not yet forked this worker
* WRKR_IDLE: it's waiting for a command
* WRKR_WORKING: it's working on a command
* WRKR_TERMINATED: process ended
@@ -75,11 +76,15 @@
/* Worker process statuses */
typedef enum
{
+ WRKR_NOT_STARTED = 0,
WRKR_IDLE,
WRKR_WORKING,
WRKR_TERMINATED
} T_WorkerStatus;
+#define WORKER_IS_RUNNING(workerStatus) \
+ ((workerStatus) == WRKR_IDLE || (workerStatus) == WRKR_WORKING)
+
/*
* Private per-parallel-worker state (typedef for this is in parallel.h).
*
@@ -412,7 +417,9 @@ ShutdownWorkersHard(ParallelState *pstate)
/*
* Close our write end of the sockets so that any workers waiting for
- * commands know they can exit.
+ * commands know they can exit. (Note: some of the pipeWrite fields might
+ * still be zero, if we failed to initialize all the workers. Hence, just
+ * ignore errors here.)
*/
for (i = 0; i < pstate->numWorkers; i++)
closesocket(pstate->parallelSlot[i].pipeWrite);
@@ -486,7 +493,7 @@ WaitForTerminatingWorkers(ParallelState *pstate)
for (j = 0; j < pstate->numWorkers; j++)
{
- if (pstate->parallelSlot[j].workerStatus != WRKR_TERMINATED)
+ if (WORKER_IS_RUNNING(pstate->parallelSlot[j].workerStatus))
{
lpHandles[nrun] = (HANDLE) pstate->parallelSlot[j].hThread;
nrun++;
@@ -922,6 +929,7 @@ ParallelBackupStart(ArchiveHandle *AH)
if (AH->public.numWorkers == 1)
return pstate;
+ /* Create status arrays, being sure to initialize all fields to 0 */
pstate->te = (TocEntry **)
pg_malloc0(pstate->numWorkers * sizeof(TocEntry *));
pstate->parallelSlot = (ParallelSlot *)
@@ -969,13 +977,6 @@ ParallelBackupStart(ArchiveHandle *AH)
if (pgpipe(pipeMW) < 0 || pgpipe(pipeWM) < 0)
fatal("could not create communication channels: %m");
- pstate->te[i] = NULL; /* just for safety */
-
- slot->workerStatus = WRKR_IDLE;
- slot->AH = NULL;
- slot->callback = NULL;
- slot->callback_data = NULL;
-
/* master's ends of the pipes */
slot->pipeRead = pipeWM[PIPE_READ];
slot->pipeWrite = pipeMW[PIPE_WRITE];
@@ -993,6 +994,7 @@ ParallelBackupStart(ArchiveHandle *AH)
handle = _beginthreadex(NULL, 0, (void *) &init_spawned_worker_win32,
wi, 0, &(slot->threadId));
slot->hThread = handle;
+ slot->workerStatus = WRKR_IDLE;
#else /* !WIN32 */
pid = fork();
if (pid == 0)
@@ -1035,6 +1037,7 @@ ParallelBackupStart(ArchiveHandle *AH)
/* In Master after successful fork */
slot->pid = pid;
+ slot->workerStatus = WRKR_IDLE;
/* close read end of Master -> Worker */
closesocket(pipeMW[PIPE_READ]);
@@ -1262,7 +1265,7 @@ GetIdleWorker(ParallelState *pstate)
}
/*
- * Return true iff every worker is in the WRKR_TERMINATED state.
+ * Return true iff no worker is running.
*/
static bool
HasEveryWorkerTerminated(ParallelState *pstate)
@@ -1271,7 +1274,7 @@ HasEveryWorkerTerminated(ParallelState *pstate)
for (i = 0; i < pstate->numWorkers; i++)
{
- if (pstate->parallelSlot[i].workerStatus != WRKR_TERMINATED)
+ if (WORKER_IS_RUNNING(pstate->parallelSlot[i].workerStatus))
return false;
}
return true;
@@ -1603,7 +1606,7 @@ getMessageFromWorker(ParallelState *pstate, bool do_wait, int *worker)
FD_ZERO(&workerset);
for (i = 0; i < pstate->numWorkers; i++)
{
- if (pstate->parallelSlot[i].workerStatus == WRKR_TERMINATED)
+ if (!WORKER_IS_RUNNING(pstate->parallelSlot[i].workerStatus))
continue;
FD_SET(pstate->parallelSlot[i].pipeRead, &workerset);
if (pstate->parallelSlot[i].pipeRead > maxFd)
@@ -1628,6 +1631,8 @@ getMessageFromWorker(ParallelState *pstate, bool do_wait, int *worker)
{
char *msg;
+ if (!WORKER_IS_RUNNING(pstate->parallelSlot[i].workerStatus))
+ continue;
if (!FD_ISSET(pstate->parallelSlot[i].pipeRead, &workerset))
continue;