diff options
Diffstat (limited to 'src/backend/replication/logical/applyparallelworker.c')
-rw-r--r-- | src/backend/replication/logical/applyparallelworker.c | 30 |
1 files changed, 16 insertions, 14 deletions
diff --git a/src/backend/replication/logical/applyparallelworker.c b/src/backend/replication/logical/applyparallelworker.c index ee7a18137fc..82c1ddcdcbf 100644 --- a/src/backend/replication/logical/applyparallelworker.c +++ b/src/backend/replication/logical/applyparallelworker.c @@ -577,16 +577,7 @@ pa_free_worker(ParallelApplyWorkerInfo *winfo) list_length(ParallelApplyWorkerPool) > (max_parallel_apply_workers_per_subscription / 2)) { - int slot_no; - uint16 generation; - - SpinLockAcquire(&winfo->shared->mutex); - generation = winfo->shared->logicalrep_worker_generation; - slot_no = winfo->shared->logicalrep_worker_slot_no; - SpinLockRelease(&winfo->shared->mutex); - - logicalrep_pa_worker_stop(slot_no, generation); - + logicalrep_pa_worker_stop(winfo); pa_free_worker_info(winfo); return; @@ -636,8 +627,11 @@ pa_detach_all_error_mq(void) { ParallelApplyWorkerInfo *winfo = (ParallelApplyWorkerInfo *) lfirst(lc); - shm_mq_detach(winfo->error_mq_handle); - winfo->error_mq_handle = NULL; + if (winfo->error_mq_handle) + { + shm_mq_detach(winfo->error_mq_handle); + winfo->error_mq_handle = NULL; + } } } @@ -845,6 +839,9 @@ LogicalParallelApplyLoop(shm_mq_handle *mqh) * Make sure the leader apply worker tries to read from our error queue one more * time. This guards against the case where we exit uncleanly without sending * an ErrorResponse, for example because some code calls proc_exit directly. + * + * Also explicitly detach from dsm segment to invoke on_dsm_detach callbacks, + * if any. See ParallelWorkerShutdown for details. */ static void pa_shutdown(int code, Datum arg) @@ -901,8 +898,6 @@ ParallelApplyWorkerMain(Datum main_arg) (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), errmsg("bad magic number in dynamic shared memory segment"))); - before_shmem_exit(pa_shutdown, PointerGetDatum(seg)); - /* Look up the shared information. */ shared = shm_toc_lookup(toc, PARALLEL_APPLY_KEY_SHARED, false); MyParallelShared = shared; @@ -921,6 +916,13 @@ ParallelApplyWorkerMain(Datum main_arg) */ logicalrep_worker_attach(worker_slot); + /* + * Register the shutdown callback after we are attached to the worker + * slot. This is to ensure that MyLogicalRepWorker remains valid when this + * callback is invoked. + */ + before_shmem_exit(pa_shutdown, PointerGetDatum(seg)); + SpinLockAcquire(&MyParallelShared->mutex); MyParallelShared->logicalrep_worker_generation = MyLogicalRepWorker->generation; MyParallelShared->logicalrep_worker_slot_no = worker_slot; |