diff options
author | Robert Haas <rhaas@postgresql.org> | 2018-02-02 09:00:59 -0500 |
---|---|---|
committer | Robert Haas <rhaas@postgresql.org> | 2018-02-02 09:00:59 -0500 |
commit | 9222c0d9ed9794d54fc3f5101498829eaec9e799 (patch) | |
tree | 53a706d621a1edc1a9c792f690604a23e978dff0 /src/backend/access/transam/parallel.c | |
parent | a2a22057617dc84b500f85938947c125183f1289 (diff) | |
download | postgresql-9222c0d9ed9794d54fc3f5101498829eaec9e799.tar.gz postgresql-9222c0d9ed9794d54fc3f5101498829eaec9e799.zip |
Add new function WaitForParallelWorkersToAttach.
Once this function has been called, we know that all workers have
started and attached to their error queues -- so if any of them
subsequently exit uncleanly, we'll be sure to throw an ERROR promptly.
Otherwise, users of the ParallelContext machinery must be careful not
to wait forever for a worker that has failed to start. Parallel query
manages to work without needing this for reasons explained in new
comments added by this patch, but it's a useful primitive for other
parallel operations, such as the pending patch to make creating a
btree index run in parallel.
Amit Kapila, revised by me. Additional review by Peter Geoghegan.
Discussion: http://postgr.es/m/CAA4eK1+e2MzyouF5bg=OtyhDSX+=Ao=3htN=T-r_6s3gCtKFiw@mail.gmail.com
Diffstat (limited to 'src/backend/access/transam/parallel.c')
-rw-r--r-- | src/backend/access/transam/parallel.c | 152 |
1 files changed, 144 insertions, 8 deletions
diff --git a/src/backend/access/transam/parallel.c b/src/backend/access/transam/parallel.c
index 54d9ea7be05..5b45b07e7c1 100644
--- a/src/backend/access/transam/parallel.c
+++ b/src/backend/access/transam/parallel.c
@@ -437,10 +437,11 @@ ReinitializeParallelDSM(ParallelContext *pcxt)
 		WaitForParallelWorkersToFinish(pcxt);
 		WaitForParallelWorkersToExit(pcxt);
 		pcxt->nworkers_launched = 0;
-		if (pcxt->any_message_received)
+		if (pcxt->known_attached_workers)
 		{
-			pfree(pcxt->any_message_received);
-			pcxt->any_message_received = NULL;
+			pfree(pcxt->known_attached_workers);
+			pcxt->known_attached_workers = NULL;
+			pcxt->nknown_attached_workers = 0;
 		}
 	}
 
@@ -542,17 +543,148 @@ LaunchParallelWorkers(ParallelContext *pcxt)
 
 	/*
 	 * Now that nworkers_launched has taken its final value, we can initialize
-	 * any_message_received.
+	 * known_attached_workers.
 	 */
 	if (pcxt->nworkers_launched > 0)
-		pcxt->any_message_received =
+	{
+		pcxt->known_attached_workers =
 			palloc0(sizeof(bool) * pcxt->nworkers_launched);
+		pcxt->nknown_attached_workers = 0;
+	}
 
 	/* Restore previous memory context. */
 	MemoryContextSwitchTo(oldcontext);
 }
 
 /*
+ * Wait for all workers to attach to their error queues, and throw an error if
+ * any worker fails to do this.
+ *
+ * Callers can assume that if this function returns successfully, then the
+ * number of workers given by pcxt->nworkers_launched have initialized and
+ * attached to their error queues. Whether or not these workers are guaranteed
+ * to still be running depends on what code the caller asked them to run;
+ * this function does not guarantee that they have not exited. However, it
+ * does guarantee that any workers which exited must have done so cleanly and
+ * after successfully performing the work with which they were tasked.
+ *
+ * If this function is not called, then some of the workers that were launched
+ * may not have been started due to a fork() failure, or may have exited during
+ * early startup prior to attaching to the error queue, so nworkers_launched
+ * cannot be viewed as completely reliable. It will never be less than the
+ * number of workers which actually started, but it might be more. Any workers
+ * that failed to start will still be discovered by
+ * WaitForParallelWorkersToFinish and an error will be thrown at that time,
+ * provided that function is eventually reached.
+ *
+ * In general, the leader process should do as much work as possible before
+ * calling this function. fork() failures and other early-startup failures
+ * are very uncommon, and having the leader sit idle when it could be doing
+ * useful work is undesirable. However, if the leader needs to wait for
+ * all of its workers or for a specific worker, it may want to call this
+ * function before doing so. If not, it must make some other provision for
+ * the failure-to-start case, lest it wait forever. On the other hand, a
+ * leader which never waits for a worker that might not be started yet, or
+ * at least never does so prior to WaitForParallelWorkersToFinish(), need not
+ * call this function at all.
+ */
+void
+WaitForParallelWorkersToAttach(ParallelContext *pcxt)
+{
+	int			i;
+
+	/* Skip this if we have no launched workers. */
+	if (pcxt->nworkers_launched == 0)
+		return;
+
+	for (;;)
+	{
+		/*
+		 * This will process any parallel messages that are pending and it may
+		 * also throw an error propagated from a worker.
+		 */
+		CHECK_FOR_INTERRUPTS();
+
+		for (i = 0; i < pcxt->nworkers_launched; ++i)
+		{
+			BgwHandleStatus status;
+			shm_mq	   *mq;
+			int			rc;
+			pid_t		pid;
+
+			if (pcxt->known_attached_workers[i])
+				continue;
+
+			/*
+			 * If error_mqh is NULL, then the worker has already exited
+			 * cleanly.
+			 */
+			if (pcxt->worker[i].error_mqh == NULL)
+			{
+				pcxt->known_attached_workers[i] = true;
+				++pcxt->nknown_attached_workers;
+				continue;
+			}
+
+			status = GetBackgroundWorkerPid(pcxt->worker[i].bgwhandle, &pid);
+			if (status == BGWH_STARTED)
+			{
+				/* Has the worker attached to the error queue? */
+				mq = shm_mq_get_queue(pcxt->worker[i].error_mqh);
+				if (shm_mq_get_sender(mq) != NULL)
+				{
+					/* Yes, so it is known to be attached. */
+					pcxt->known_attached_workers[i] = true;
+					++pcxt->nknown_attached_workers;
+				}
+			}
+			else if (status == BGWH_STOPPED)
+			{
+				/*
+				 * If the worker stopped without attaching to the error queue,
+				 * throw an error; otherwise count it as attached.
+				 */
+				mq = shm_mq_get_queue(pcxt->worker[i].error_mqh);
+				if (shm_mq_get_sender(mq) == NULL)
+					ereport(ERROR,
+							(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+							 errmsg("parallel worker failed to initialize"),
+							 errhint("More details may be available in the server log.")));
+
+				pcxt->known_attached_workers[i] = true;
+				++pcxt->nknown_attached_workers;
+			}
+			else
+			{
+				/*
+				 * Worker not yet started, so we must wait. The postmaster
+				 * will notify us if the worker's state changes. Our latch
+				 * might also get set for some other reason, but if so we'll
+				 * just end up waiting for the same worker again.
+				 */
+				rc = WaitLatch(MyLatch,
+							   WL_LATCH_SET | WL_POSTMASTER_DEATH,
+							   -1, WAIT_EVENT_BGWORKER_STARTUP);
+
+				/* emergency bailout if postmaster has died */
+				if (rc & WL_POSTMASTER_DEATH)
+					proc_exit(1);
+
+				if (rc & WL_LATCH_SET)
+					ResetLatch(MyLatch);
+			}
+		}
+
+		/* If all workers are known to have attached, we're done. */
+		if (pcxt->nknown_attached_workers >= pcxt->nworkers_launched)
+		{
+			Assert(pcxt->nknown_attached_workers == pcxt->nworkers_launched);
+			break;
+		}
+	}
+}
+
+/*
  * Wait for all workers to finish computing.
  *
  * Even if the parallel operation seems to have completed successfully, it's
@@ -589,7 +721,7 @@ WaitForParallelWorkersToFinish(ParallelContext *pcxt)
 			 */
 			if (pcxt->worker[i].error_mqh == NULL)
 				++nfinished;
-			else if (pcxt->any_message_received[i])
+			else if (pcxt->known_attached_workers[i])
 			{
 				anyone_alive = true;
 				break;
@@ -909,8 +1041,12 @@ HandleParallelMessage(ParallelContext *pcxt, int i, StringInfo msg)
 {
 	char		msgtype;
 
-	if (pcxt->any_message_received != NULL)
-		pcxt->any_message_received[i] = true;
+	if (pcxt->known_attached_workers != NULL &&
+		!pcxt->known_attached_workers[i])
+	{
+		pcxt->known_attached_workers[i] = true;
+		pcxt->nknown_attached_workers++;
+	}
 
 	msgtype = pq_getmsgbyte(msg); |