diff options
Diffstat (limited to 'src/bin/pg_dump/parallel.c')
-rw-r--r-- | src/bin/pg_dump/parallel.c | 62 |
1 files changed, 54 insertions, 8 deletions
diff --git a/src/bin/pg_dump/parallel.c b/src/bin/pg_dump/parallel.c index 0e2bfa106a7..5630dc626d7 100644 --- a/src/bin/pg_dump/parallel.c +++ b/src/bin/pg_dump/parallel.c @@ -45,6 +45,8 @@ * WRKR_IDLE: it's waiting for a command * WRKR_WORKING: it's working on a command * WRKR_TERMINATED: process ended + * The pstate->te[] entry for each worker is valid when it's in WRKR_WORKING + * state, and must be NULL in other states. */ #include "postgres_fe.h" @@ -71,6 +73,45 @@ #define NO_SLOT (-1) /* Failure result for GetIdleWorker() */ +/* Worker process statuses */ +typedef enum +{ + WRKR_IDLE, + WRKR_WORKING, + WRKR_TERMINATED +} T_WorkerStatus; + +/* + * Private per-parallel-worker state (typedef for this is in parallel.h). + * + * Much of this is valid only in the master process (or, on Windows, should + * be touched only by the master thread). But the AH field should be touched + * only by workers. The pipe descriptors are valid everywhere. + */ +struct ParallelSlot +{ + T_WorkerStatus workerStatus; /* see enum above */ + + /* These fields are valid if workerStatus == WRKR_WORKING: */ + ParallelCompletionPtr callback; /* function to call on completion */ + void *callback_data; /* passthru data for it */ + + ArchiveHandle *AH; /* Archive data worker is using */ + + int pipeRead; /* master's end of the pipes */ + int pipeWrite; + int pipeRevRead; /* child's end of the pipes */ + int pipeRevWrite; + + /* Child process/thread identity info: */ +#ifdef WIN32 + uintptr_t hThread; + unsigned int threadId; +#else + pid_t pid; +#endif +}; + #ifdef WIN32 /* @@ -475,9 +516,10 @@ WaitForTerminatingWorkers(ParallelState *pstate) } #endif /* WIN32 */ - /* On all platforms, update workerStatus as well */ + /* On all platforms, update workerStatus and te[] as well */ Assert(j < pstate->numWorkers); slot->workerStatus = WRKR_TERMINATED; + pstate->te[j] = NULL; } } @@ -870,20 +912,22 @@ ParallelBackupStart(ArchiveHandle *AH) { ParallelState *pstate; int i; - const size_t slotSize = AH->public.numWorkers * sizeof(ParallelSlot); Assert(AH->public.numWorkers > 0); pstate = (ParallelState *) pg_malloc(sizeof(ParallelState)); pstate->numWorkers = AH->public.numWorkers; + pstate->te = NULL; pstate->parallelSlot = NULL; if (AH->public.numWorkers == 1) return pstate; - pstate->parallelSlot = (ParallelSlot *) pg_malloc(slotSize); - memset((void *) pstate->parallelSlot, 0, slotSize); + pstate->te = (TocEntry **) + pg_malloc0(pstate->numWorkers * sizeof(TocEntry *)); + pstate->parallelSlot = (ParallelSlot *) + pg_malloc0(pstate->numWorkers * sizeof(ParallelSlot)); #ifdef WIN32 /* Make fmtId() and fmtQualifiedId() use thread-local storage */ @@ -929,9 +973,10 @@ ParallelBackupStart(ArchiveHandle *AH) "could not create communication channels: %s\n", strerror(errno)); + pstate->te[i] = NULL; /* just for safety */ + slot->workerStatus = WRKR_IDLE; slot->AH = NULL; - slot->te = NULL; slot->callback = NULL; slot->callback_data = NULL; @@ -1062,6 +1107,7 @@ ParallelBackupEnd(ArchiveHandle *AH, ParallelState *pstate) set_cancel_pstate(NULL); /* Release state (mere neatnik-ism, since we're about to terminate) */ + free(pstate->te); free(pstate->parallelSlot); free(pstate); } @@ -1201,9 +1247,9 @@ DispatchJobForTocEntry(ArchiveHandle *AH, /* Remember worker is busy, and which TocEntry it's working on */ pstate->parallelSlot[worker].workerStatus = WRKR_WORKING; - pstate->parallelSlot[worker].te = te; pstate->parallelSlot[worker].callback = callback; pstate->parallelSlot[worker].callback_data = callback_data; + pstate->te[worker] = te; } /* @@ -1394,13 +1440,13 @@ ListenToWorkers(ArchiveHandle *AH, ParallelState *pstate, bool do_wait) if (messageStartsWith(msg, "OK ")) { ParallelSlot *slot = &pstate->parallelSlot[worker]; - TocEntry *te = slot->te; + TocEntry *te = pstate->te[worker]; int status; status = parseWorkerResponse(AH, te, msg); slot->callback(AH, te, status, slot->callback_data); slot->workerStatus = WRKR_IDLE; - slot->te = NULL; + pstate->te[worker] = NULL; } else exit_horribly(modulename, |