diff options
-rw-r--r-- | src/bin/pg_dump/parallel.c | 53 | ||||
-rw-r--r-- | src/bin/pg_dump/pg_backup_utils.c | 2 |
2 files changed, 37 insertions, 18 deletions
diff --git a/src/bin/pg_dump/parallel.c b/src/bin/pg_dump/parallel.c index 2f12a8689b4..c656ba5a897 100644 --- a/src/bin/pg_dump/parallel.c +++ b/src/bin/pg_dump/parallel.c @@ -327,7 +327,7 @@ ShutdownWorkersHard(ParallelState *pstate) } /* - * Wait for the termination of the processes using the OS-specific method. + * Wait for all workers to terminate. */ static void WaitForTerminatingWorkers(ParallelState *pstate) @@ -338,39 +338,58 @@ WaitForTerminatingWorkers(ParallelState *pstate) int j; #ifndef WIN32 + /* On non-Windows, use wait() to wait for next worker to end */ int status; pid_t pid = wait(&status); + /* Find dead worker's slot, and clear the PID field */ for (j = 0; j < pstate->numWorkers; j++) - if (pstate->parallelSlot[j].pid == pid) - slot = &(pstate->parallelSlot[j]); -#else - uintptr_t hThread; - DWORD ret; - uintptr_t *lpHandles = pg_malloc(sizeof(HANDLE) * pstate->numWorkers); + { + slot = &(pstate->parallelSlot[j]); + if (slot->pid == pid) + { + slot->pid = 0; + break; + } + } +#else /* WIN32 */ + /* On Windows, we must use WaitForMultipleObjects() */ + HANDLE *lpHandles = pg_malloc(sizeof(HANDLE) * pstate->numWorkers); int nrun = 0; + DWORD ret; + uintptr_t hThread; for (j = 0; j < pstate->numWorkers; j++) + { if (pstate->parallelSlot[j].workerStatus != WRKR_TERMINATED) { - lpHandles[nrun] = pstate->parallelSlot[j].hThread; + lpHandles[nrun] = (HANDLE) pstate->parallelSlot[j].hThread; nrun++; } - ret = WaitForMultipleObjects(nrun, (HANDLE *) lpHandles, false, INFINITE); + } + ret = WaitForMultipleObjects(nrun, lpHandles, false, INFINITE); Assert(ret != WAIT_FAILED); - hThread = lpHandles[ret - WAIT_OBJECT_0]; + hThread = (uintptr_t) lpHandles[ret - WAIT_OBJECT_0]; + free(lpHandles); + /* Find dead worker's slot, and clear the hThread field */ for (j = 0; j < pstate->numWorkers; j++) - if (pstate->parallelSlot[j].hThread == hThread) - slot = &(pstate->parallelSlot[j]); - - free(lpHandles); -#endif - Assert(slot); + { + slot = &(pstate->parallelSlot[j]); + if (slot->hThread == hThread) + { + /* For cleanliness, close handles for dead threads */ + CloseHandle((HANDLE) slot->hThread); + slot->hThread = (uintptr_t) INVALID_HANDLE_VALUE; + break; + } + } +#endif /* WIN32 */ + /* On all platforms, update workerStatus as well */ + Assert(j < pstate->numWorkers); slot->workerStatus = WRKR_TERMINATED; } - Assert(HasEveryWorkerTerminated(pstate)); } #ifndef WIN32 diff --git a/src/bin/pg_dump/pg_backup_utils.c b/src/bin/pg_dump/pg_backup_utils.c index 5d1d87565a2..01bf5764551 100644 --- a/src/bin/pg_dump/pg_backup_utils.c +++ b/src/bin/pg_dump/pg_backup_utils.c @@ -149,7 +149,7 @@ exit_nicely(int code) #ifdef WIN32 if (parallel_init_done && GetCurrentThreadId() != mainThreadId) - ExitThread(code); + _endthreadex(code); #endif exit(code); |