Diffstat (limited to 'src/backend/commands/async.c')
-rw-r--r--    src/backend/commands/async.c    116
1 file changed, 58 insertions, 58 deletions
diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c
index 4613bd1cfb6..a3ba88d7ff4 100644
--- a/src/backend/commands/async.c
+++ b/src/backend/commands/async.c
@@ -107,7 +107,7 @@
  * frontend during startup.) The above design guarantees that notifies from
  * other backends will never be missed by ignoring self-notifies.
  *
- * The amount of shared memory used for notify management (NUM_ASYNC_BUFFERS)
+ * The amount of shared memory used for notify management (NUM_NOTIFY_BUFFERS)
  * can be varied without affecting anything but performance. The maximum
  * amount of notification data that can be queued at one time is determined
  * by slru.c's wraparound limit; see QUEUE_MAX_PAGE below.
@@ -225,7 +225,7 @@ typedef struct QueuePosition
  *
  * Resist the temptation to make this really large. While that would save
  * work in some places, it would add cost in others. In particular, this
- * should likely be less than NUM_ASYNC_BUFFERS, to ensure that backends
+ * should likely be less than NUM_NOTIFY_BUFFERS, to ensure that backends
  * catch up before the pages they'll need to read fall out of SLRU cache.
  */
 #define QUEUE_CLEANUP_DELAY 4
@@ -244,7 +244,7 @@ typedef struct QueueBackendStatus
 /*
  * Shared memory state for LISTEN/NOTIFY (excluding its SLRU stuff)
  *
- * The AsyncQueueControl structure is protected by the AsyncQueueLock.
+ * The AsyncQueueControl structure is protected by the NotifyQueueLock.
  *
  * When holding the lock in SHARED mode, backends may only inspect their own
  * entries as well as the head and tail pointers. Consequently we can allow a
@@ -254,9 +254,9 @@
  * When holding the lock in EXCLUSIVE mode, backends can inspect the entries
  * of other backends and also change the head and tail pointers.
  *
- * AsyncCtlLock is used as the control lock for the pg_notify SLRU buffers.
+ * NotifySLRULock is used as the control lock for the pg_notify SLRU buffers.
  * In order to avoid deadlocks, whenever we need both locks, we always first
- * get AsyncQueueLock and then AsyncCtlLock.
+ * get NotifyQueueLock and then NotifySLRULock.
  *
  * Each backend uses the backend[] array entry with index equal to its
  * BackendId (which can range from 1 to MaxBackends). We rely on this to make
@@ -292,9 +292,9 @@ static AsyncQueueControl *asyncQueueControl;
 /*
  * The SLRU buffer area through which we access the notification queue
  */
-static SlruCtlData AsyncCtlData;
+static SlruCtlData NotifyCtlData;
 
-#define AsyncCtl (&AsyncCtlData)
+#define NotifyCtl (&NotifyCtlData)
 #define QUEUE_PAGESIZE BLCKSZ
 #define QUEUE_FULL_WARN_INTERVAL 5000	/* warn at most once every 5s */
@@ -506,7 +506,7 @@ AsyncShmemSize(void)
     size = mul_size(MaxBackends + 1, sizeof(QueueBackendStatus));
     size = add_size(size, offsetof(AsyncQueueControl, backend));
 
-    size = add_size(size, SimpleLruShmemSize(NUM_ASYNC_BUFFERS, 0));
+    size = add_size(size, SimpleLruShmemSize(NUM_NOTIFY_BUFFERS, 0));
     return size;
 }
@@ -552,18 +552,18 @@ AsyncShmemInit(void)
     /*
      * Set up SLRU management of the pg_notify data.
      */
-    AsyncCtl->PagePrecedes = asyncQueuePagePrecedes;
-    SimpleLruInit(AsyncCtl, "async", NUM_ASYNC_BUFFERS, 0,
-                  AsyncCtlLock, "pg_notify", LWTRANCHE_ASYNC_BUFFERS);
+    NotifyCtl->PagePrecedes = asyncQueuePagePrecedes;
+    SimpleLruInit(NotifyCtl, "Notify", NUM_NOTIFY_BUFFERS, 0,
+                  NotifySLRULock, "pg_notify", LWTRANCHE_NOTIFY_BUFFER);
     /* Override default assumption that writes should be fsync'd */
-    AsyncCtl->do_fsync = false;
+    NotifyCtl->do_fsync = false;
 
     if (!found)
     {
         /*
          * During start or reboot, clean out the pg_notify directory.
          */
-        (void) SlruScanDirectory(AsyncCtl, SlruScanDirCbDeleteAll, NULL);
+        (void) SlruScanDirectory(NotifyCtl, SlruScanDirCbDeleteAll, NULL);
     }
 }
@@ -918,7 +918,7 @@ PreCommit_Notify(void)
          * Make sure that we have an XID assigned to the current transaction.
          * GetCurrentTransactionId is cheap if we already have an XID, but not
          * so cheap if we don't, and we'd prefer not to do that work while
-         * holding AsyncQueueLock.
+         * holding NotifyQueueLock.
          */
         (void) GetCurrentTransactionId();
@@ -949,7 +949,7 @@
         {
             /*
              * Add the pending notifications to the queue. We acquire and
-             * release AsyncQueueLock once per page, which might be overkill
+             * release NotifyQueueLock once per page, which might be overkill
              * but it does allow readers to get in while we're doing this.
              *
              * A full queue is very uncommon and should really not happen,
@@ -959,14 +959,14 @@
              * transaction, but we have not yet committed to clog, so at this
              * point in time we can still roll the transaction back.
              */
-            LWLockAcquire(AsyncQueueLock, LW_EXCLUSIVE);
+            LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
             asyncQueueFillWarning();
             if (asyncQueueIsFull())
                 ereport(ERROR,
                         (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
                          errmsg("too many notifications in the NOTIFY queue")));
             nextNotify = asyncQueueAddEntries(nextNotify);
-            LWLockRelease(AsyncQueueLock);
+            LWLockRelease(NotifyQueueLock);
         }
     }
 }
@@ -1075,7 +1075,7 @@ Exec_ListenPreCommit(void)
      * We need exclusive lock here so we can look at other backends' entries
      * and manipulate the list links.
      */
-    LWLockAcquire(AsyncQueueLock, LW_EXCLUSIVE);
+    LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
     head = QUEUE_HEAD;
     max = QUEUE_TAIL;
     prevListener = InvalidBackendId;
@@ -1101,7 +1101,7 @@
         QUEUE_NEXT_LISTENER(MyBackendId) = QUEUE_FIRST_LISTENER;
         QUEUE_FIRST_LISTENER = MyBackendId;
     }
-    LWLockRelease(AsyncQueueLock);
+    LWLockRelease(NotifyQueueLock);
 
     /* Now we are listed in the global array, so remember we're listening */
     amRegisteredListener = true;
@@ -1308,7 +1308,7 @@ asyncQueueUnregister(void)
     /*
      * Need exclusive lock here to manipulate list links.
      */
-    LWLockAcquire(AsyncQueueLock, LW_EXCLUSIVE);
+    LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
     /* Mark our entry as invalid */
     QUEUE_BACKEND_PID(MyBackendId) = InvalidPid;
     QUEUE_BACKEND_DBOID(MyBackendId) = InvalidOid;
@@ -1327,7 +1327,7 @@
         }
     }
     QUEUE_NEXT_LISTENER(MyBackendId) = InvalidBackendId;
-    LWLockRelease(AsyncQueueLock);
+    LWLockRelease(NotifyQueueLock);
 
     /* mark ourselves as no longer listed in the global array */
     amRegisteredListener = false;
@@ -1336,7 +1336,7 @@
 /*
  * Test whether there is room to insert more notification messages.
  *
- * Caller must hold at least shared AsyncQueueLock.
+ * Caller must hold at least shared NotifyQueueLock.
  */
 static bool
 asyncQueueIsFull(void)
@@ -1437,8 +1437,8 @@ asyncQueueNotificationToEntry(Notification *n, AsyncQueueEntry *qe)
  * notification to write and return the first still-unwritten cell back.
  * Eventually we will return NULL indicating all is done.
  *
- * We are holding AsyncQueueLock already from the caller and grab AsyncCtlLock
- * locally in this function.
+ * We are holding NotifyQueueLock already from the caller and grab
+ * NotifySLRULock locally in this function.
  */
 static ListCell *
 asyncQueueAddEntries(ListCell *nextNotify)
@@ -1449,8 +1449,8 @@ asyncQueueAddEntries(ListCell *nextNotify)
     int         offset;
     int         slotno;
 
-    /* We hold both AsyncQueueLock and AsyncCtlLock during this operation */
-    LWLockAcquire(AsyncCtlLock, LW_EXCLUSIVE);
+    /* We hold both NotifyQueueLock and NotifySLRULock during this operation */
+    LWLockAcquire(NotifySLRULock, LW_EXCLUSIVE);
 
     /*
      * We work with a local copy of QUEUE_HEAD, which we write back to shared
@@ -1475,13 +1475,13 @@
      */
     pageno = QUEUE_POS_PAGE(queue_head);
     if (QUEUE_POS_IS_ZERO(queue_head))
-        slotno = SimpleLruZeroPage(AsyncCtl, pageno);
+        slotno = SimpleLruZeroPage(NotifyCtl, pageno);
     else
-        slotno = SimpleLruReadPage(AsyncCtl, pageno, true,
+        slotno = SimpleLruReadPage(NotifyCtl, pageno, true,
                                    InvalidTransactionId);
 
     /* Note we mark the page dirty before writing in it */
-    AsyncCtl->shared->page_dirty[slotno] = true;
+    NotifyCtl->shared->page_dirty[slotno] = true;
 
     while (nextNotify != NULL)
     {
@@ -1512,7 +1512,7 @@
         }
 
         /* Now copy qe into the shared buffer page */
-        memcpy(AsyncCtl->shared->page_buffer[slotno] + offset,
+        memcpy(NotifyCtl->shared->page_buffer[slotno] + offset,
                &qe,
                qe.length);
@@ -1527,7 +1527,7 @@
              * asyncQueueIsFull() ensured that there is room to create this
              * page without overrunning the queue.
              */
-            slotno = SimpleLruZeroPage(AsyncCtl, QUEUE_POS_PAGE(queue_head));
+            slotno = SimpleLruZeroPage(NotifyCtl, QUEUE_POS_PAGE(queue_head));
 
             /*
              * If the new page address is a multiple of QUEUE_CLEANUP_DELAY,
@@ -1545,7 +1545,7 @@
     /* Success, so update the global QUEUE_HEAD */
     QUEUE_HEAD = queue_head;
 
-    LWLockRelease(AsyncCtlLock);
+    LWLockRelease(NotifySLRULock);
 
     return nextNotify;
 }
@@ -1562,9 +1562,9 @@ pg_notification_queue_usage(PG_FUNCTION_ARGS)
     /* Advance the queue tail so we don't report a too-large result */
     asyncQueueAdvanceTail();
 
-    LWLockAcquire(AsyncQueueLock, LW_SHARED);
+    LWLockAcquire(NotifyQueueLock, LW_SHARED);
     usage = asyncQueueUsage();
-    LWLockRelease(AsyncQueueLock);
+    LWLockRelease(NotifyQueueLock);
 
     PG_RETURN_FLOAT8(usage);
 }
@@ -1572,7 +1572,7 @@
 /*
  * Return the fraction of the queue that is currently occupied.
  *
- * The caller must hold AsyncQueueLock in (at least) shared mode.
+ * The caller must hold NotifyQueueLock in (at least) shared mode.
  */
 static double
 asyncQueueUsage(void)
@@ -1601,7 +1601,7 @@
  * This is unlikely given the size of the queue, but possible.
  * The warnings show up at most once every QUEUE_FULL_WARN_INTERVAL.
  *
- * Caller must hold exclusive AsyncQueueLock.
+ * Caller must hold exclusive NotifyQueueLock.
  */
 static void
 asyncQueueFillWarning(void)
@@ -1665,7 +1665,7 @@ SignalBackends(void)
     /*
      * Identify backends that we need to signal. We don't want to send
-     * signals while holding the AsyncQueueLock, so this loop just builds a
+     * signals while holding the NotifyQueueLock, so this loop just builds a
      * list of target PIDs.
      *
      * XXX in principle these pallocs could fail, which would be bad. Maybe
@@ -1676,7 +1676,7 @@
     ids = (BackendId *) palloc(MaxBackends * sizeof(BackendId));
     count = 0;
 
-    LWLockAcquire(AsyncQueueLock, LW_EXCLUSIVE);
+    LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
     for (BackendId i = QUEUE_FIRST_LISTENER; i > 0; i = QUEUE_NEXT_LISTENER(i))
     {
         int32       pid = QUEUE_BACKEND_PID(i);
@@ -1710,7 +1710,7 @@
         ids[count] = i;
         count++;
     }
-    LWLockRelease(AsyncQueueLock);
+    LWLockRelease(NotifyQueueLock);
 
     /* Now send signals */
     for (int i = 0; i < count; i++)
@@ -1720,7 +1720,7 @@
         /*
          * Note: assuming things aren't broken, a signal failure here could
          * only occur if the target backend exited since we released
-         * AsyncQueueLock; which is unlikely but certainly possible. So we
+         * NotifyQueueLock; which is unlikely but certainly possible. So we
          * just log a low-level debug message if it happens.
          */
         if (SendProcSignal(pid, PROCSIG_NOTIFY_INTERRUPT, ids[i]) < 0)
@@ -1930,12 +1930,12 @@ asyncQueueReadAllNotifications(void)
     }           page_buffer;
 
     /* Fetch current state */
-    LWLockAcquire(AsyncQueueLock, LW_SHARED);
+    LWLockAcquire(NotifyQueueLock, LW_SHARED);
     /* Assert checks that we have a valid state entry */
     Assert(MyProcPid == QUEUE_BACKEND_PID(MyBackendId));
     pos = oldpos = QUEUE_BACKEND_POS(MyBackendId);
     head = QUEUE_HEAD;
-    LWLockRelease(AsyncQueueLock);
+    LWLockRelease(NotifyQueueLock);
 
     if (QUEUE_POS_EQUAL(pos, head))
     {
@@ -1990,7 +1990,7 @@
      * that happens it is critical that we not try to send the same message
      * over and over again. Therefore, we place a PG_TRY block here that will
      * forcibly advance our queue position before we lose control to an error.
-     * (We could alternatively retake AsyncQueueLock and move the position
+     * (We could alternatively retake NotifyQueueLock and move the position
      * before handling each individual message, but that seems like too much
      * lock traffic.)
      */
@@ -2007,11 +2007,11 @@
             /*
              * We copy the data from SLRU into a local buffer, so as to avoid
-             * holding the AsyncCtlLock while we are examining the entries and
-             * possibly transmitting them to our frontend. Copy only the part
-             * of the page we will actually inspect.
+             * holding the NotifySLRULock while we are examining the entries
+             * and possibly transmitting them to our frontend. Copy only the
+             * part of the page we will actually inspect.
              */
-            slotno = SimpleLruReadPage_ReadOnly(AsyncCtl, curpage,
+            slotno = SimpleLruReadPage_ReadOnly(NotifyCtl, curpage,
                                                 InvalidTransactionId);
             if (curpage == QUEUE_POS_PAGE(head))
             {
@@ -2026,10 +2026,10 @@
                 copysize = QUEUE_PAGESIZE - curoffset;
             }
             memcpy(page_buffer.buf + curoffset,
-                   AsyncCtl->shared->page_buffer[slotno] + curoffset,
+                   NotifyCtl->shared->page_buffer[slotno] + curoffset,
                    copysize);
             /* Release lock that we got from SimpleLruReadPage_ReadOnly() */
-            LWLockRelease(AsyncCtlLock);
+            LWLockRelease(NotifySLRULock);
 
             /*
              * Process messages up to the stop position, end of page, or an
@@ -2040,7 +2040,7 @@
              * But if it has, we will receive (or have already received and
              * queued) another signal and come here again.
              *
-             * We are not holding AsyncQueueLock here! The queue can only
+             * We are not holding NotifyQueueLock here! The queue can only
              * extend beyond the head pointer (see above) and we leave our
              * backend's pointer where it is so nobody will truncate or
              * rewrite pages under us. Especially we don't want to hold a lock
@@ -2054,9 +2054,9 @@
     PG_FINALLY();
     {
         /* Update shared state */
-        LWLockAcquire(AsyncQueueLock, LW_SHARED);
+        LWLockAcquire(NotifyQueueLock, LW_SHARED);
         QUEUE_BACKEND_POS(MyBackendId) = pos;
-        LWLockRelease(AsyncQueueLock);
+        LWLockRelease(NotifyQueueLock);
     }
     PG_END_TRY();
@@ -2070,7 +2070,7 @@
  *
  * The current page must have been fetched into page_buffer from shared
  * memory. (We could access the page right in shared memory, but that
- * would imply holding the AsyncCtlLock throughout this routine.)
+ * would imply holding the NotifySLRULock throughout this routine.)
  *
  * We stop if we reach the "stop" position, or reach a notification from an
  * uncommitted transaction, or reach the end of the page.
@@ -2177,7 +2177,7 @@ asyncQueueAdvanceTail(void)
     int         newtailpage;
     int         boundary;
 
-    LWLockAcquire(AsyncQueueLock, LW_EXCLUSIVE);
+    LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);
     min = QUEUE_HEAD;
     for (BackendId i = QUEUE_FIRST_LISTENER; i > 0; i = QUEUE_NEXT_LISTENER(i))
     {
@@ -2186,7 +2186,7 @@
     }
     oldtailpage = QUEUE_POS_PAGE(QUEUE_TAIL);
     QUEUE_TAIL = min;
-    LWLockRelease(AsyncQueueLock);
+    LWLockRelease(NotifyQueueLock);
 
     /*
      * We can truncate something if the global tail advanced across an SLRU
@@ -2200,10 +2200,10 @@
     if (asyncQueuePagePrecedes(oldtailpage, boundary))
     {
         /*
-         * SimpleLruTruncate() will ask for AsyncCtlLock but will also release
-         * the lock again.
+         * SimpleLruTruncate() will ask for NotifySLRULock but will also
+         * release the lock again.
         */
-        SimpleLruTruncate(AsyncCtl, newtailpage);
+        SimpleLruTruncate(NotifyCtl, newtailpage);
     }
 }
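
A note on the lock-ordering rule restated by the renamed comments above: whenever both locks are needed, NotifyQueueLock must be taken before NotifySLRULock, and released in the reverse order. The following is a minimal illustrative sketch of that ordering, not code from this patch; it assumes the PostgreSQL backend environment, and the function name queue_notifications_sketch is hypothetical.

#include "postgres.h"
#include "storage/lwlock.h"

/*
 * Hypothetical sketch (not part of this commit): demonstrates the
 * deadlock-avoidance ordering documented in async.c -- NotifyQueueLock
 * first, then NotifySLRULock, released in reverse order.
 */
static void
queue_notifications_sketch(void)
{
    LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE);   /* queue head/tail state */
    LWLockAcquire(NotifySLRULock, LW_EXCLUSIVE);    /* pg_notify SLRU buffers */

    /* ... append notification entries to the current SLRU page here ... */

    LWLockRelease(NotifySLRULock);
    LWLockRelease(NotifyQueueLock);
}

Because every path that takes both locks uses this same order, two backends can never each hold one lock while waiting for the other.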
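The renamed comments in asyncQueueReadAllNotifications() also describe a copy-then-release pattern: the SLRU page is copied into backend-local memory so that NotifySLRULock is not held while entries are parsed and possibly transmitted to the frontend. Below is a simplified sketch of that pattern under the same assumptions as above (hypothetical function, backend environment); the real function additionally clamps the copy to the portion of the page it will inspect and wraps processing in a PG_TRY block.

#include "postgres.h"
#include "access/slru.h"
#include "access/transam.h"
#include "storage/lwlock.h"

/*
 * Hypothetical sketch of the copy-then-release pattern from
 * asyncQueueReadAllNotifications(); error handling and the partial-page
 * copy optimization are omitted. Assumes ctl is NotifyCtl, as in async.c.
 */
static void
read_notify_page_sketch(SlruCtl ctl, int pageno)
{
    char        page_buffer[BLCKSZ];
    int         slotno;

    /* SimpleLruReadPage_ReadOnly() returns with the SLRU control lock held */
    slotno = SimpleLruReadPage_ReadOnly(ctl, pageno, InvalidTransactionId);
    memcpy(page_buffer, ctl->shared->page_buffer[slotno], BLCKSZ);

    /* Drop the lock before per-entry work, which may include network I/O */
    LWLockRelease(NotifySLRULock);

    /* ... parse queue entries from page_buffer here ... */
}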