diff options
author | Amit Kapila <akapila@postgresql.org> | 2023-05-09 09:28:06 +0530 |
---|---|---|
committer | Amit Kapila <akapila@postgresql.org> | 2023-05-09 09:28:06 +0530 |
commit | 3d144c6c86025272e1711539f5fafb6fb85c4feb (patch) | |
tree | e8e64126bdc26dbb43be7d0505118cd2a76d91f5 /src/backend/replication/logical/applyparallelworker.c | |
parent | 455f948b0d03a556533a7e4a1a8abf45f0eb202e (diff) | |
download | postgresql-3d144c6c86025272e1711539f5fafb6fb85c4feb.tar.gz postgresql-3d144c6c86025272e1711539f5fafb6fb85c4feb.zip |
Fix invalid memory access during the shutdown of the parallel apply worker.
The callback function pa_shutdown() accesses MyLogicalRepWorker which may
not be initialized if there is an error during the initialization of the
parallel apply worker. The other problem is that by the time it is invoked
even after the initialization of the worker, the MyLogicalRepWorker will
be reset by another callback logicalrep_worker_onexit. So, it won't have
the required information.
To fix this, register the shutdown callback after we are attached to the
worker slot.
After this fix, we observed another issue which is that sometimes the
leader apply worker tries to receive the message from the error queue that
might already be detached by the parallel apply worker leading to an
error. To prevent such an error, we ensure that the leader apply worker
detaches from the parallel apply worker's error queue before stopping it.
Reported-by: Sawada Masahiko
Author: Hou Zhijie
Reviewed-by: Sawada Masahiko, Amit Kapila
Discussion: https://postgr.es/m/CAD21AoDo+yUwNq6nTrvE2h9bB2vZfcag=jxWc7QxuWCmkDAqcA@mail.gmail.com
Diffstat (limited to 'src/backend/replication/logical/applyparallelworker.c')
-rw-r--r-- | src/backend/replication/logical/applyparallelworker.c | 30 |
1 files changed, 16 insertions, 14 deletions
diff --git a/src/backend/replication/logical/applyparallelworker.c b/src/backend/replication/logical/applyparallelworker.c index ee7a18137fc..82c1ddcdcbf 100644 --- a/src/backend/replication/logical/applyparallelworker.c +++ b/src/backend/replication/logical/applyparallelworker.c @@ -577,16 +577,7 @@ pa_free_worker(ParallelApplyWorkerInfo *winfo) list_length(ParallelApplyWorkerPool) > (max_parallel_apply_workers_per_subscription / 2)) { - int slot_no; - uint16 generation; - - SpinLockAcquire(&winfo->shared->mutex); - generation = winfo->shared->logicalrep_worker_generation; - slot_no = winfo->shared->logicalrep_worker_slot_no; - SpinLockRelease(&winfo->shared->mutex); - - logicalrep_pa_worker_stop(slot_no, generation); - + logicalrep_pa_worker_stop(winfo); pa_free_worker_info(winfo); return; @@ -636,8 +627,11 @@ pa_detach_all_error_mq(void) { ParallelApplyWorkerInfo *winfo = (ParallelApplyWorkerInfo *) lfirst(lc); - shm_mq_detach(winfo->error_mq_handle); - winfo->error_mq_handle = NULL; + if (winfo->error_mq_handle) + { + shm_mq_detach(winfo->error_mq_handle); + winfo->error_mq_handle = NULL; + } } } @@ -845,6 +839,9 @@ LogicalParallelApplyLoop(shm_mq_handle *mqh) * Make sure the leader apply worker tries to read from our error queue one more * time. This guards against the case where we exit uncleanly without sending * an ErrorResponse, for example because some code calls proc_exit directly. + * + * Also explicitly detach from dsm segment to invoke on_dsm_detach callbacks, + * if any. See ParallelWorkerShutdown for details. */ static void pa_shutdown(int code, Datum arg) @@ -901,8 +898,6 @@ ParallelApplyWorkerMain(Datum main_arg) (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), errmsg("bad magic number in dynamic shared memory segment"))); - before_shmem_exit(pa_shutdown, PointerGetDatum(seg)); - /* Look up the shared information. */ shared = shm_toc_lookup(toc, PARALLEL_APPLY_KEY_SHARED, false); MyParallelShared = shared; @@ -921,6 +916,13 @@ ParallelApplyWorkerMain(Datum main_arg) */ logicalrep_worker_attach(worker_slot); + /* + * Register the shutdown callback after we are attached to the worker + * slot. This is to ensure that MyLogicalRepWorker remains valid when this + * callback is invoked. + */ + before_shmem_exit(pa_shutdown, PointerGetDatum(seg)); + SpinLockAcquire(&MyParallelShared->mutex); MyParallelShared->logicalrep_worker_generation = MyLogicalRepWorker->generation; MyParallelShared->logicalrep_worker_slot_no = worker_slot; |