diff options
Diffstat (limited to 'src/backend/storage/ipc/shm_mq.c')
-rw-r--r-- | src/backend/storage/ipc/shm_mq.c | 43 |
1 files changed, 36 insertions, 7 deletions
diff --git a/src/backend/storage/ipc/shm_mq.c b/src/backend/storage/ipc/shm_mq.c index f45a67cc278..770559a03e3 100644 --- a/src/backend/storage/ipc/shm_mq.c +++ b/src/backend/storage/ipc/shm_mq.c @@ -83,7 +83,9 @@ struct shm_mq * This structure is a backend-private handle for access to a queue. * * mqh_queue is a pointer to the queue we've attached, and mqh_segment is - * a pointer to the dynamic shared memory segment that contains it. + * an optional pointer to the dynamic shared memory segment that contains it. + * (If mqh_segment is provided, we register an on_dsm_detach callback to + * make sure we detach from the queue before detaching from DSM.) * * If this queue is intended to connect the current process with a background * worker that started it, the user can pass a pointer to the worker handle @@ -139,6 +141,7 @@ struct shm_mq_handle MemoryContext mqh_context; }; +static void shm_mq_detach_internal(shm_mq *mq); static shm_mq_result shm_mq_send_bytes(shm_mq_handle *mq, Size nbytes, const void *data, bool nowait, Size *bytes_written); static shm_mq_result shm_mq_receive_bytes(shm_mq *mq, Size bytes_needed, @@ -288,14 +291,15 @@ shm_mq_attach(shm_mq *mq, dsm_segment *seg, BackgroundWorkerHandle *handle) Assert(mq->mq_receiver == MyProc || mq->mq_sender == MyProc); mqh->mqh_queue = mq; mqh->mqh_segment = seg; - mqh->mqh_buffer = NULL; mqh->mqh_handle = handle; + mqh->mqh_buffer = NULL; mqh->mqh_buflen = 0; mqh->mqh_consume_pending = 0; - mqh->mqh_context = CurrentMemoryContext; mqh->mqh_partial_bytes = 0; + mqh->mqh_expected_bytes = 0; mqh->mqh_length_word_complete = false; mqh->mqh_counterparty_attached = false; + mqh->mqh_context = CurrentMemoryContext; if (seg != NULL) on_dsm_detach(seg, shm_mq_detach_callback, PointerGetDatum(mq)); @@ -765,7 +769,28 @@ shm_mq_wait_for_attach(shm_mq_handle *mqh) } /* - * Detach a shared message queue. + * Detach from a shared message queue, and destroy the shm_mq_handle. + */ +void +shm_mq_detach(shm_mq_handle *mqh) +{ + /* Notify counterparty that we're outta here. */ + shm_mq_detach_internal(mqh->mqh_queue); + + /* Cancel on_dsm_detach callback, if any. */ + if (mqh->mqh_segment) + cancel_on_dsm_detach(mqh->mqh_segment, + shm_mq_detach_callback, + PointerGetDatum(mqh->mqh_queue)); + + /* Release local memory associated with handle. */ + if (mqh->mqh_buffer != NULL) + pfree(mqh->mqh_buffer); + pfree(mqh); +} + +/* + * Notify counterparty that we're detaching from shared message queue. * * The purpose of this function is to make sure that the process * with which we're communicating doesn't block forever waiting for us to @@ -773,9 +798,13 @@ shm_mq_wait_for_attach(shm_mq_handle *mqh) * detaches, the receiver can read any messages remaining in the queue; * further reads will return SHM_MQ_DETACHED. If the receiver detaches, * further attempts to send messages will likewise return SHM_MQ_DETACHED. + * + * This is separated out from shm_mq_detach() because if the on_dsm_detach + * callback fires, we only want to do this much. We do not try to touch + * the local shm_mq_handle, as it may have been pfree'd already. */ -void -shm_mq_detach(shm_mq *mq) +static void +shm_mq_detach_internal(shm_mq *mq) { volatile shm_mq *vmq = mq; PGPROC *victim; @@ -1193,5 +1222,5 @@ shm_mq_detach_callback(dsm_segment *seg, Datum arg) { shm_mq *mq = (shm_mq *) DatumGetPointer(arg); - shm_mq_detach(mq); + shm_mq_detach_internal(mq); } |