Diffstat (limited to 'src')
-rw-r--r-- | src/backend/access/transam/xlog.c | 821
-rw-r--r-- | src/backend/storage/lmgr/lwlock.c | 262
-rw-r--r-- | src/backend/utils/misc/guc.c | 6
-rw-r--r-- | src/include/access/xlog.h | 2
-rw-r--r-- | src/include/storage/lwlock.h | 6
5 files changed, 502 insertions, 595 deletions
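
Before the diff itself, a condensed sketch of how the two halves of this patch fit together: an inserter in xlog.c holds any one of num_xloginsert_locks WAL insertion locks and advertises its progress through the lock's uint64 variable, while WaitXLogInsertionsToFinish() waits only for insertions below its flush target. The sketch uses the LWLock variable API added in lwlock.c below (LWLockAcquireWithVar, LWLockUpdateVar, LWLockWaitForVar, LWLockRelease); the lock array, its length, and the two wrapper functions are hypothetical stand-ins for the real xlog.c code, with padding, stats, and error handling omitted.

    #include "postgres.h"
    #include "storage/lwlock.h"

    /* Hypothetical mirror of WALInsertLockPadded; cache-line padding omitted */
    typedef struct
    {
        LWLock      lock;
        uint64      insertingAt;
    } InsertLockSketch;

    static InsertLockSketch *locks;    /* hypothetical shared array */
    static int  nlocks;                /* stand-in for num_xloginsert_locks */

    /* Inserter: hold any one lock; 0 means "insert position not known yet" */
    static void
    insert_sketch(int mylock, uint64 pageBoundary)
    {
        LWLockAcquireWithVar(&locks[mylock].lock,
                             &locks[mylock].insertingAt, 0);

        /*
         * ... copy the record into the WAL buffers; before anything that can
         * sleep (such as evicting a dirty buffer), advertise progress so that
         * flushers of earlier WAL need not wait for us:
         */
        LWLockUpdateVar(&locks[mylock].lock,
                        &locks[mylock].insertingAt, pageBoundary);

        LWLockRelease(&locks[mylock].lock);
    }

    /* Flusher: wait only for insertions still working below 'upto' */
    static uint64
    wait_sketch(uint64 upto, uint64 reservedUpto)
    {
        uint64      finishedUpto = reservedUpto;
        int         i;

        for (i = 0; i < nlocks; i++)
        {
            uint64      insertingat = 0;

            do
            {
                /* returns true once the lock is free: no insert in progress */
                if (LWLockWaitForVar(&locks[i].lock, &locks[i].insertingAt,
                                     insertingat, &insertingat))
                {
                    insertingat = 0;
                    break;
                }
            } while (insertingat < upto);

            if (insertingat != 0 && insertingat < finishedUpto)
                finishedUpto = insertingat;
        }
        return finishedUpto;
    }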
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index a2577314bcf..f9d6bf4ce53 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -89,7 +89,7 @@ int sync_method = DEFAULT_SYNC_METHOD; int wal_level = WAL_LEVEL_MINIMAL; int CommitDelay = 0; /* precommit delay in microseconds */ int CommitSiblings = 5; /* # concurrent xacts needed to sleep */ -int num_xloginsert_slots = 8; +int num_xloginsert_locks = 8; #ifdef WAL_DEBUG bool XLOG_DEBUG = false; @@ -292,7 +292,7 @@ XLogRecPtr XactLastRecEnd = InvalidXLogRecPtr; * (which is almost but not quite the same as a pointer to the most recent * CHECKPOINT record). We update this from the shared-memory copy, * XLogCtl->Insert.RedoRecPtr, whenever we can safely do so (ie, when we - * hold an insertion slot). See XLogInsert for details. We are also allowed + * hold an insertion lock). See XLogInsert for details. We are also allowed * to update from XLogCtl->RedoRecPtr if we hold the info_lck; * see GetRedoRecPtr. A freshly spawned backend obtains the value during * InitXLOGAccess. @@ -364,63 +364,51 @@ typedef struct XLogwrtResult XLogRecPtr Flush; /* last byte + 1 flushed */ } XLogwrtResult; - /* - * A slot for inserting to the WAL. This is similar to an LWLock, the main - * difference is that there is an extra xlogInsertingAt field that is protected - * by the same mutex. Unlike an LWLock, a slot can only be acquired in - * exclusive mode. - * - * The xlogInsertingAt field is used to advertise to other processes how far - * the slot owner has progressed in inserting the record. When a backend - * acquires a slot, it initializes xlogInsertingAt to 1, because it doesn't - * yet know where it's going to insert the record. That's conservative - * but correct; the new insertion is certainly going to go to a byte position - * greater than 1. If another backend needs to flush the WAL, it will have to - * wait for the new insertion. xlogInsertingAt is updated after finishing the - * insert or when crossing a page boundary, which will wake up anyone waiting - * for it, whether the wait was necessary in the first place or not. - * - * A process can wait on a slot in two modes: LW_EXCLUSIVE or - * LW_WAIT_UNTIL_FREE. LW_EXCLUSIVE works like in an lwlock; when the slot is - * released, the first LW_EXCLUSIVE waiter in the queue is woken up. Processes - * waiting in LW_WAIT_UNTIL_FREE mode are woken up whenever the slot is - * released, or xlogInsertingAt is updated. In other words, a process in - * LW_WAIT_UNTIL_FREE mode is woken up whenever the inserter makes any progress - * copying the record in place. LW_WAIT_UNTIL_FREE waiters are always added to - * the front of the queue, while LW_EXCLUSIVE waiters are appended to the end. - * - * To join the wait queue, a process must set MyProc->lwWaitMode to the mode - * it wants to wait in, MyProc->lwWaiting to true, and link MyProc to the head - * or tail of the wait queue. The same mechanism is used to wait on an LWLock, - * see lwlock.c for details. + * Inserting to WAL is protected by a small fixed number of WAL insertion + * locks. To insert to the WAL, you must hold one of the locks - it doesn't + * matter which one. To lock out other concurrent insertions, you must hold + * all of them. Each WAL insertion lock consists of a lightweight lock, plus an + * indicator of how far the insertion has progressed (insertingAt).
+ * + * The insertingAt values are read when a process wants to flush WAL from + * the in-memory buffers to disk, to check that all the insertions to the + * region the process is about to write out have finished. You could simply + * wait for all currently in-progress insertions to finish, but the + * insertingAt indicator allows you to ignore insertions to later positions + * in the WAL, so that you only wait for the insertions that are modifying + * the buffers you're about to write out. + * + * This isn't just an optimization. If all the WAL buffers are dirty, an + * inserter that's holding a WAL insert lock might need to evict an old WAL + * buffer, which requires flushing the WAL. If it's possible for an inserter + * to block on another inserter unnecessarily, deadlock can arise when two + * inserters holding a WAL insert lock wait for each other to finish their + * insertion. + * + * Small WAL records that don't cross a page boundary never update the value; + * the WAL record is just copied to the page and the lock is released. But + * to avoid the deadlock scenario explained above, the indicator is always + * updated before sleeping while holding an insertion lock. */ typedef struct { - slock_t mutex; /* protects the below fields */ - XLogRecPtr xlogInsertingAt; /* insert has completed up to this point */ - - PGPROC *owner; /* for debugging purposes */ - - bool releaseOK; /* T if ok to release waiters */ - char exclusive; /* # of exclusive holders (0 or 1) */ - PGPROC *head; /* head of list of waiting PGPROCs */ - PGPROC *tail; /* tail of list of waiting PGPROCs */ - /* tail is undefined when head is NULL */ -} XLogInsertSlot; + LWLock lock; + XLogRecPtr insertingAt; +} WALInsertLock; /* - * All the slots are allocated as an array in shared memory. We force the - * array stride to be a power of 2, which saves a few cycles in indexing, but - * more importantly also ensures that individual slots don't cross cache line - * boundaries. (Of course, we have to also ensure that the array start - * address is suitably aligned.) + * All the WAL insertion locks are allocated as an array in shared memory. We + * force the array stride to be a power of 2, which saves a few cycles in + * indexing, but more importantly also ensures that individual locks don't + * cross cache line boundaries. (Of course, we have to also ensure that the + * array start address is suitably aligned.) */ -typedef union XLogInsertSlotPadded +typedef union WALInsertLockPadded { - XLogInsertSlot slot; + WALInsertLock l; char pad[CACHE_LINE_SIZE]; -} XLogInsertSlotPadded; +} WALInsertLockPadded; /* * Shared state data for XLogInsert. @@ -455,8 +443,8 @@ typedef struct XLogCtlInsert * we must WAL-log it before it actually affects WAL-logging by backends. * Checkpointer sets at startup or after SIGHUP. * - * To read these fields, you must hold an insertion slot. To modify them, - * you must hold ALL the slots. + * To read these fields, you must hold an insertion lock. To modify them, + * you must hold ALL the locks. */ XLogRecPtr RedoRecPtr; /* current redo point for insertions */ bool forcePageWrites; /* forcing full-page writes for PITR? */ @@ -473,8 +461,12 @@ typedef struct XLogCtlInsert int nonExclusiveBackups; XLogRecPtr lastBackupStart; - /* insertion slots, see XLogInsertSlot struct above for details */ - XLogInsertSlotPadded *insertSlots; + /* + * WAL insertion locks.
+ */ + WALInsertLockPadded *WALInsertLocks; + LWLockTranche WALInsertLockTranche; + int WALInsertLockTrancheId; } XLogCtlInsert; /* @@ -612,6 +604,9 @@ typedef struct XLogCtlData static XLogCtlData *XLogCtl = NULL; +/* a private copy of XLogCtl->Insert.WALInsertLocks, for convenience */ +static WALInsertLockPadded *WALInsertLocks = NULL; + /* * We maintain an image of pg_control in shared memory. */ @@ -735,9 +730,9 @@ static bool InRedo = false; /* Have we launched bgwriter during recovery? */ static bool bgwriterLaunched = false; -/* For WALInsertSlotAcquire/Release functions */ -static int MySlotNo = 0; -static bool holdingAllSlots = false; +/* For WALInsertLockAcquire/Release functions */ +static int MyLockNo = 0; +static bool holdingAllLocks = false; static void readRecoveryCommandFile(void); static void exitArchiveRecovery(TimeLineID endTLI, XLogSegNo endLogSegNo); @@ -811,16 +806,15 @@ static void ReserveXLogInsertLocation(int size, XLogRecPtr *StartPos, static bool ReserveXLogSwitch(XLogRecPtr *StartPos, XLogRecPtr *EndPos, XLogRecPtr *PrevPtr); static XLogRecPtr WaitXLogInsertionsToFinish(XLogRecPtr upto); -static void WakeupWaiters(XLogRecPtr EndPos); static char *GetXLogBuffer(XLogRecPtr ptr); static XLogRecPtr XLogBytePosToRecPtr(uint64 bytepos); static XLogRecPtr XLogBytePosToEndRecPtr(uint64 bytepos); static uint64 XLogRecPtrToBytePos(XLogRecPtr ptr); -static void WALInsertSlotAcquire(bool exclusive); -static void WALInsertSlotAcquireOne(int slotno); -static void WALInsertSlotRelease(void); -static void WALInsertSlotReleaseOne(int slotno); +static void WALInsertLockAcquire(void); +static void WALInsertLockAcquireExclusive(void); +static void WALInsertLockRelease(void); +static void WALInsertLockUpdateInsertingAt(XLogRecPtr insertingAt); /* * Insert an XLOG record having the specified RMID and info bytes, @@ -897,7 +891,7 @@ XLogInsert(RmgrId rmid, uint8 info, XLogRecData *rdata) * * We may have to loop back to here if a race condition is detected below. * We could prevent the race by doing all this work while holding an - * insertion slot, but it seems better to avoid doing CRC calculations + * insertion lock, but it seems better to avoid doing CRC calculations * while holding one. * * We add entries for backup blocks to the chain, so that they don't need @@ -915,8 +909,8 @@ begin:; /* * Decide if we need to do full-page writes in this XLOG record: true if * full_page_writes is on or we have a PITR request for it. Since we - * don't yet have an insertion slot, fullPageWrites and forcePageWrites - * could change under us, but we'll recheck them once we have a slot. + * don't yet have an insertion lock, fullPageWrites and forcePageWrites + * could change under us, but we'll recheck them once we have a lock. */ doPageWrites = Insert->fullPageWrites || Insert->forcePageWrites; @@ -1090,16 +1084,15 @@ begin:; * record in place. This can be done concurrently in multiple processes. * * To keep track of which insertions are still in-progress, each concurrent - * inserter allocates an "insertion slot", which tells others how far the - * inserter has progressed. There is a small fixed number of insertion - * slots, determined by the num_xloginsert_slots GUC. When an inserter - * finishes, it updates the xlogInsertingAt of its slot to the end of the - * record it inserted, to let others know that it's done. xlogInsertingAt - * is also updated when crossing over to a new WAL buffer, to allow the - * the previous buffer to be flushed. + * inserter acquires an insertion lock. 
In addition to just indicating that + * an insertion is in progress, the lock tells others how far the inserter + * has progressed. There is a small fixed number of insertion locks, + * determined by the num_xloginsert_locks GUC. When an inserter crosses a + * page boundary, it updates the value stored in the lock to how far it + * has inserted, to allow the previous buffer to be flushed. * - * Holding onto a slot also protects RedoRecPtr and fullPageWrites from - * changing until the insertion is finished. + * Holding onto an insertion lock also protects RedoRecPtr and + * fullPageWrites from changing until the insertion is finished. * * Step 2 can usually be done completely in parallel. If the required WAL * page is not initialized yet, you have to grab WALBufMappingLock to @@ -1109,7 +1102,10 @@ begin:; *---------- */ START_CRIT_SECTION(); - WALInsertSlotAcquire(isLogSwitch); + if (isLogSwitch) + WALInsertLockAcquireExclusive(); + else + WALInsertLockAcquire(); /* * Check to see if my RedoRecPtr is out of date. If so, may have to go @@ -1138,7 +1134,7 @@ begin:; * Oops, this buffer now needs to be backed up, but we * didn't think so above. Start over. */ - WALInsertSlotRelease(); + WALInsertLockRelease(); END_CRIT_SECTION(); rdt_lastnormal->next = NULL; info = info_orig; @@ -1157,7 +1153,7 @@ begin:; if ((Insert->fullPageWrites || Insert->forcePageWrites) && !doPageWrites) { /* Oops, must redo it with full-page data. */ - WALInsertSlotRelease(); + WALInsertLockRelease(); END_CRIT_SECTION(); rdt_lastnormal->next = NULL; info = info_orig; @@ -1205,7 +1201,7 @@ begin:; /* * Done! Let others know that we're finished. */ - WALInsertSlotRelease(); + WALInsertLockRelease(); MarkCurrentTransactionIdLoggedIfAny(); @@ -1366,7 +1362,7 @@ ReserveXLogSwitch(XLogRecPtr *StartPos, XLogRecPtr *EndPos, XLogRecPtr *PrevPtr) /* * These calculations are a bit heavy-weight to be done while holding a - * spinlock, but since we're holding all the WAL insertion slots, there + * spinlock, but since we're holding all the WAL insertion locks, there * are no other inserters competing for it. GetXLogInsertRecPtr() does * compete for it, but that's not called very frequently. */ @@ -1526,7 +1522,7 @@ CopyXLogRecordToWAL(int write_len, bool isLogSwitch, XLogRecData *rdata, while (CurrPos < EndPos) { /* initialize the next page (if not initialized already) */ - WakeupWaiters(CurrPos); + WALInsertLockUpdateInsertingAt(CurrPos); AdvanceXLInsertBuffer(CurrPos, false); CurrPos += XLOG_BLCKSZ; } @@ -1537,452 +1533,123 @@ } /* - * Allocate a slot for insertion. - * - * In exclusive mode, all slots are reserved for the current process. That - * blocks all concurrent insertions. + * Acquire a WAL insertion lock, for inserting to WAL. */ static void -WALInsertSlotAcquire(bool exclusive) +WALInsertLockAcquire(void) { - int i; - - if (exclusive) - { - for (i = 0; i < num_xloginsert_slots; i++) - WALInsertSlotAcquireOne(i); - holdingAllSlots = true; - } - else - WALInsertSlotAcquireOne(-1); -} - -/* - * Workhorse of WALInsertSlotAcquire. Acquires the given slot, or an arbitrary - * one if slotno == -1. The index of the slot that was acquired is stored in - * MySlotNo. - * - * This is more or less equivalent to LWLockAcquire().
- */ -static void -WALInsertSlotAcquireOne(int slotno) -{ - volatile XLogInsertSlot *slot; - PGPROC *proc = MyProc; - bool retry = false; - int extraWaits = 0; - static int slotToTry = -1; + bool immed; /* - * Try to use the slot we used last time. If the system isn't particularly - * busy, it's a good bet that it's available, and it's good to have some - * affinity to a particular slot so that you don't unnecessarily bounce - * cache lines between processes when there is no contention. + * It doesn't matter which of the WAL insertion locks we acquire, so try + * the one we used last time. If the system isn't particularly busy, + * it's a good bet that it's still available, and it's good to have some + * affinity to a particular lock so that you don't unnecessarily bounce + * cache lines between processes when there's no contention. * - * If this is the first time through in this backend, pick a slot - * (semi-)randomly. This allows the slots to be used evenly if you have a - * lot of very short connections. + * If this is the first time through in this backend, pick a lock + * (semi-)randomly. This allows the locks to be used evenly if you have + * a lot of very short connections. */ - if (slotno != -1) - MySlotNo = slotno; - else - { - if (slotToTry == -1) - slotToTry = MyProc->pgprocno % num_xloginsert_slots; - MySlotNo = slotToTry; - } + static int lockToTry = -1; - /* - * We can't wait if we haven't got a PGPROC. This should only occur - * during bootstrap or shared memory initialization. Put an Assert here - * to catch unsafe coding practices. - */ - Assert(MyProc != NULL); - - /* - * Lock out cancel/die interrupts until we exit the code section protected - * by the slot. This ensures that interrupts will not interfere with - * manipulations of data structures in shared memory. There is no cleanup - * mechanism to release the slot if the backend dies while holding one, - * so make this a critical section. - */ - START_CRIT_SECTION(); + if (lockToTry == -1) + lockToTry = MyProc->pgprocno % num_xloginsert_locks; + MyLockNo = lockToTry; /* - * Loop here to try to acquire slot after each time we are signaled by - * WALInsertSlotRelease. + * The insertingAt value is initially set to 0, as we don't know our + * insert location yet. */ - for (;;) + immed = LWLockAcquireWithVar(&WALInsertLocks[MyLockNo].l.lock, + &WALInsertLocks[MyLockNo].l.insertingAt, + 0); + if (!immed) { - bool mustwait; - - slot = &XLogCtl->Insert.insertSlots[MySlotNo].slot; - - /* Acquire mutex. Time spent holding mutex should be short! */ - SpinLockAcquire(&slot->mutex); - - /* If retrying, allow WALInsertSlotRelease to release waiters again */ - if (retry) - slot->releaseOK = true; - - /* If I can get the slot, do so quickly. */ - if (slot->exclusive == 0) - { - slot->exclusive++; - mustwait = false; - } - else - mustwait = true; - - if (!mustwait) - break; /* got the lock */ - - Assert(slot->owner != MyProc); - - /* - * Add myself to wait queue. - */ - proc->lwWaiting = true; - proc->lwWaitMode = LW_EXCLUSIVE; - proc->lwWaitLink = NULL; - if (slot->head == NULL) - slot->head = proc; - else - slot->tail->lwWaitLink = proc; - slot->tail = proc; - - /* Can release the mutex now */ - SpinLockRelease(&slot->mutex); - /* - * Wait until awakened. - * - * Since we share the process wait semaphore with the regular lock - * manager and ProcWaitForSignal, and we may need to acquire a slot - * while one of those is pending, it is possible that we get awakened - * for a reason other than being signaled by WALInsertSlotRelease. 
If - * so, loop back and wait again. Once we've gotten the slot, - * re-increment the sema by the number of additional signals received, - * so that the lock manager or signal manager will see the received - * signal when it next waits. + * If we couldn't get the lock immediately, try another lock next + * time. On a system with more insertion locks than concurrent + * inserters, this causes all the inserters to eventually migrate + * to a lock that no-one else is using. On a system with more + * inserters than locks, it still helps to distribute the inserters + * evenly across the locks. */ - for (;;) - { - /* "false" means cannot accept cancel/die interrupt here. */ - PGSemaphoreLock(&proc->sem, false); - if (!proc->lwWaiting) - break; - extraWaits++; - } - - /* Now loop back and try to acquire lock again. */ - retry = true; + lockToTry = (lockToTry + 1) % num_xloginsert_locks; } - - slot->owner = proc; - - /* - * Normally, we initialize the xlogInsertingAt value of the slot to 1, - * because we don't yet know where in the WAL we're going to insert. It's - * not critical what it points to right now - leaving it to a too small - * value just means that WaitXlogInsertionsToFinish() might wait on us - * unnecessarily, until we update the value (when we finish the insert or - * move to next page). - * - * If we're grabbing all the slots, however, stamp all but the last one - * with InvalidXLogRecPtr, meaning there is no insert in progress. The last - * slot is the one that we will update as we proceed with the insert, the - * rest are held just to keep off other inserters. - */ - if (slotno != -1 && slotno != num_xloginsert_slots - 1) - slot->xlogInsertingAt = InvalidXLogRecPtr; - else - slot->xlogInsertingAt = 1; - - /* We are done updating shared state of the slot itself. */ - SpinLockRelease(&slot->mutex); - - /* - * Fix the process wait semaphore's count for any absorbed wakeups. - */ - while (extraWaits-- > 0) - PGSemaphoreUnlock(&proc->sem); - - /* - * If we couldn't get the slot immediately, try another slot next time. - * On a system with more insertion slots than concurrent inserters, this - * causes all the inserters to eventually migrate to a slot that no-one - * else is using. On a system with more inserters than slots, it still - * causes the inserters to be distributed quite evenly across the slots. - */ - if (slotno != -1 && retry) - slotToTry = (slotToTry + 1) % num_xloginsert_slots; } /* - * Wait for the given slot to become free, or for its xlogInsertingAt location - * to change to something else than 'waitptr'. In other words, wait for the - * inserter using the given slot to finish its insertion, or to at least make - * some progress. + * Acquire all WAL insertion locks, to prevent other backends from inserting + * to WAL. */ static void -WaitOnSlot(volatile XLogInsertSlot *slot, XLogRecPtr waitptr) +WALInsertLockAcquireExclusive(void) { - PGPROC *proc = MyProc; - int extraWaits = 0; - - /* - * Lock out cancel/die interrupts while we sleep on the slot. There is - * no cleanup mechanism to remove us from the wait queue if we got - * interrupted. - */ - HOLD_INTERRUPTS(); + int i; /* - * Loop here to try to acquire lock after each time we are signaled. + * When holding all the locks, we only update the last lock's insertingAt + * indicator. The others are set to 0xFFFFFFFFFFFFFFFF, which is higher + * than any real XLogRecPtr value, to make sure that no-one blocks + * waiting on those. 
*/ - for (;;) + for (i = 0; i < num_xloginsert_locks - 1; i++) { - bool mustwait; - - /* Acquire mutex. Time spent holding mutex should be short! */ - SpinLockAcquire(&slot->mutex); - - /* If I can get the lock, do so quickly. */ - if (slot->exclusive == 0 || slot->xlogInsertingAt != waitptr) - mustwait = false; - else - mustwait = true; - - if (!mustwait) - break; /* the lock was free */ - - Assert(slot->owner != MyProc); - - /* - * Add myself to wait queue. - */ - proc->lwWaiting = true; - proc->lwWaitMode = LW_WAIT_UNTIL_FREE; - proc->lwWaitLink = NULL; - - /* waiters are added to the front of the queue */ - proc->lwWaitLink = slot->head; - if (slot->head == NULL) - slot->tail = proc; - slot->head = proc; - - /* Can release the mutex now */ - SpinLockRelease(&slot->mutex); - - /* - * Wait until awakened. - * - * Since we share the process wait semaphore with other things, like - * the regular lock manager and ProcWaitForSignal, and we may need to - * acquire an LWLock while one of those is pending, it is possible that - * we get awakened for a reason other than being signaled by - * LWLockRelease. If so, loop back and wait again. Once we've gotten - * the LWLock, re-increment the sema by the number of additional - * signals received, so that the lock manager or signal manager will - * see the received signal when it next waits. - */ - for (;;) - { - /* "false" means cannot accept cancel/die interrupt here. */ - PGSemaphoreLock(&proc->sem, false); - if (!proc->lwWaiting) - break; - extraWaits++; - } - - /* Now loop back and try to acquire lock again. */ + LWLockAcquireWithVar(&WALInsertLocks[i].l.lock, + &WALInsertLocks[i].l.insertingAt, + UINT64CONST(0xFFFFFFFFFFFFFFFF)); } + LWLockAcquireWithVar(&WALInsertLocks[i].l.lock, + &WALInsertLocks[i].l.insertingAt, + 0); - /* We are done updating shared state of the lock itself. */ - SpinLockRelease(&slot->mutex); - - /* - * Fix the process wait semaphore's count for any absorbed wakeups. - */ - while (extraWaits-- > 0) - PGSemaphoreUnlock(&proc->sem); - - /* - * Now okay to allow cancel/die interrupts. - */ - RESUME_INTERRUPTS(); + holdingAllLocks = true; } /* - * Wake up all processes waiting for us with WaitOnSlot(). Sets our - * xlogInsertingAt value to EndPos, without releasing the slot. + * Release our insertion lock (or locks, if we're holding them all). */ static void -WakeupWaiters(XLogRecPtr EndPos) +WALInsertLockRelease(void) { - volatile XLogInsertSlot *slot = &XLogCtl->Insert.insertSlots[MySlotNo].slot; - PGPROC *head; - PGPROC *proc; - PGPROC *next; - - /* - * If we have already reported progress up to the same point, do nothing. - * No other process can modify xlogInsertingAt, so we can check this before - * grabbing the spinlock. - */ - if (slot->xlogInsertingAt == EndPos) - return; - /* xlogInsertingAt should not go backwards */ - Assert(slot->xlogInsertingAt < EndPos); - - /* Acquire mutex. Time spent holding mutex should be short! */ - SpinLockAcquire(&slot->mutex); - - /* we should own the slot */ - Assert(slot->exclusive == 1 && slot->owner == MyProc); - - slot->xlogInsertingAt = EndPos; - - /* - * See if there are any LW_WAIT_UNTIL_FREE waiters that need to be woken - * up. They are always in the front of the queue. 
- */ - head = slot->head; - - if (head != NULL && head->lwWaitMode == LW_WAIT_UNTIL_FREE) + if (holdingAllLocks) { - proc = head; - next = proc->lwWaitLink; - while (next && next->lwWaitMode == LW_WAIT_UNTIL_FREE) - { - proc = next; - next = next->lwWaitLink; - } + int i; + + for (i = 0; i < num_xloginsert_locks; i++) + LWLockRelease(&WALInsertLocks[i].l.lock); - /* proc is now the last PGPROC to be released */ - slot->head = next; - proc->lwWaitLink = NULL; + holdingAllLocks = false; } else - head = NULL; - - /* We are done updating shared state of the lock itself. */ - SpinLockRelease(&slot->mutex); - - /* - * Awaken any waiters I removed from the queue. - */ - while (head != NULL) { - proc = head; - head = proc->lwWaitLink; - proc->lwWaitLink = NULL; - proc->lwWaiting = false; - PGSemaphoreUnlock(&proc->sem); + LWLockRelease(&WALInsertLocks[MyLockNo].l.lock); } } /* - * Release our insertion slot (or slots, if we're holding them all). + * Update our insertingAt value, to let others know that we've finished + * inserting up to that point. */ static void -WALInsertSlotRelease(void) +WALInsertLockUpdateInsertingAt(XLogRecPtr insertingAt) { - int i; - - if (holdingAllSlots) + if (holdingAllLocks) { - for (i = 0; i < num_xloginsert_slots; i++) - WALInsertSlotReleaseOne(i); - holdingAllSlots = false; + /* + * We use the last lock to mark our actual position, see comments in + * WALInsertLockAcquireExclusive. + */ + LWLockUpdateVar(&WALInsertLocks[num_xloginsert_locks - 1].l.lock, + &WALInsertLocks[num_xloginsert_locks - 1].l.insertingAt, + insertingAt); } else - WALInsertSlotReleaseOne(MySlotNo); -} - -static void -WALInsertSlotReleaseOne(int slotno) -{ - volatile XLogInsertSlot *slot = &XLogCtl->Insert.insertSlots[slotno].slot; - PGPROC *head; - PGPROC *proc; - - /* Acquire mutex. Time spent holding mutex should be short! */ - SpinLockAcquire(&slot->mutex); - - /* we must be holding it */ - Assert(slot->exclusive == 1 && slot->owner == MyProc); - - slot->xlogInsertingAt = InvalidXLogRecPtr; - - /* Release my hold on the slot */ - slot->exclusive = 0; - slot->owner = NULL; - - /* - * See if I need to awaken any waiters.. - */ - head = slot->head; - if (head != NULL) - { - if (slot->releaseOK) - { - /* - * Remove the to-be-awakened PGPROCs from the queue. - */ - bool releaseOK = true; - - proc = head; - - /* - * First wake up any backends that want to be woken up without - * acquiring the lock. These are always in the front of the queue. - */ - while (proc->lwWaitMode == LW_WAIT_UNTIL_FREE && proc->lwWaitLink) - proc = proc->lwWaitLink; - - /* - * Awaken the first exclusive-waiter, if any. - */ - if (proc->lwWaitLink) - { - Assert(proc->lwWaitLink->lwWaitMode == LW_EXCLUSIVE); - proc = proc->lwWaitLink; - releaseOK = false; - } - /* proc is now the last PGPROC to be released */ - slot->head = proc->lwWaitLink; - proc->lwWaitLink = NULL; - - slot->releaseOK = releaseOK; - } - else - head = NULL; - } - - /* We are done updating shared state of the slot itself. */ - SpinLockRelease(&slot->mutex); - - /* - * Awaken any waiters I removed from the queue. - */ - while (head != NULL) - { - proc = head; - head = proc->lwWaitLink; - proc->lwWaitLink = NULL; - proc->lwWaiting = false; - PGSemaphoreUnlock(&proc->sem); - } - - /* - * Now okay to allow cancel/die interrupts. - */ - END_CRIT_SECTION(); + LWLockUpdateVar(&WALInsertLocks[MyLockNo].l.lock, + &WALInsertLocks[MyLockNo].l.insertingAt, + insertingAt); } - /* * Wait for any WAL insertions < upto to finish. 
* @@ -2032,79 +1699,49 @@ WaitXLogInsertionsToFinish(XLogRecPtr upto) } /* + * Loop through all the locks, sleeping on any in-progress insert older + * than 'upto'. + * * finishedUpto is our return value, indicating the point upto which * all the WAL insertions have been finished. Initialize it to the head - * of reserved WAL, and as we iterate through the insertion slots, back it + * of reserved WAL, and as we iterate through the insertion locks, back it * out for any insertion that's still in progress. */ finishedUpto = reservedUpto; - - /* - * Loop through all the slots, sleeping on any in-progress insert older - * than 'upto'. - */ - for (i = 0; i < num_xloginsert_slots; i++) + for (i = 0; i < num_xloginsert_locks; i++) { - volatile XLogInsertSlot *slot = &XLogCtl->Insert.insertSlots[i].slot; - XLogRecPtr insertingat; - - retry: - /* - * We can check if the slot is in use without grabbing the spinlock. - * The spinlock acquisition of insertpos_lck before this loop acts - * as a memory barrier. If someone acquires the slot after that, it - * can't possibly be inserting to anything < reservedUpto. If it was - * acquired before that, an unlocked test will return true. - */ - if (!slot->exclusive) - continue; - - SpinLockAcquire(&slot->mutex); - /* re-check now that we have the lock */ - if (!slot->exclusive) - { - SpinLockRelease(&slot->mutex); - continue; - } - insertingat = slot->xlogInsertingAt; - SpinLockRelease(&slot->mutex); - - if (insertingat == InvalidXLogRecPtr) + XLogRecPtr insertingat = InvalidXLogRecPtr; + do { /* - * slot is reserved just to hold off other inserters, there is no - * actual insert in progress. + * See if this insertion is in progress. LWLockWaitForVar will + * wait for the lock to be released, or for the 'value' to be set + * by an LWLockUpdateVar call. When a lock is initially acquired, + * its value is 0 (InvalidXLogRecPtr), which means that we don't + * know where it's inserting yet. We will have to wait for it. If + * it's a small insertion, the record will most likely fit on the + * same page and the inserter will release the lock without ever + * calling LWLockUpdateVar. But if it has to sleep, it will + * advertise the insertion point with LWLockUpdateVar before + * sleeping. */ - continue; - } + if (LWLockWaitForVar(&WALInsertLocks[i].l.lock, + &WALInsertLocks[i].l.insertingAt, + insertingat, &insertingat)) + { + /* the lock was free, so no insertion in progress */ + insertingat = InvalidXLogRecPtr; + break; + } - /* - * This insertion is still in progress. Do we need to wait for it? - * - * When an inserter acquires a slot, it doesn't reset 'insertingat', so - * it will initially point to the old value of some already-finished - * insertion. The inserter will update the value as soon as it finishes - * the insertion, moves to the next page, or has to do I/O to flush an - * old dirty buffer. That means that when we see a slot with - * insertingat value < upto, we don't know if that insertion is still - * truly in progress, or if the slot is reused by a new inserter that - * hasn't updated the insertingat value yet. We have to assume it's the - * latter, and wait. - */ - if (insertingat < upto) - { - WaitOnSlot(slot, insertingat); - goto retry; - } - else - { /* - * We don't need to wait for this insertion, but update the - * return value. + * This insertion is still in progress. Have to wait, unless the + * inserter has proceeded past 'upto'.
*/ - if (insertingat < finishedUpto) - finishedUpto = insertingat; - } + } while (insertingat < upto); + + if (insertingat != InvalidXLogRecPtr && insertingat < finishedUpto) + finishedUpto = insertingat; } return finishedUpto; } @@ -2118,8 +1755,8 @@ WaitXLogInsertionsToFinish(XLogRecPtr upto) * * The caller must ensure that the page containing the requested location * isn't evicted yet, and won't be evicted. The way to ensure that is to - * hold onto an XLogInsertSlot with the xlogInsertingAt position set to - * something <= ptr. GetXLogBuffer() will update xlogInsertingAt if it needs + * hold onto a WAL insertion lock with the insertingAt position set to + * something <= ptr. GetXLogBuffer() will update insertingAt if it needs * to evict an old page from the buffer. (This means that once you call * GetXLogBuffer() with a given 'ptr', you must not access anything before * that point anymore, and must not call GetXLogBuffer() with an older 'ptr' @@ -2179,7 +1816,7 @@ GetXLogBuffer(XLogRecPtr ptr) * Let others know that we're finished inserting the record up * to the page boundary. */ - WakeupWaiters(expectedEndPtr - XLOG_BLCKSZ); + WALInsertLockUpdateInsertingAt(expectedEndPtr - XLOG_BLCKSZ); AdvanceXLInsertBuffer(ptr, false); endptr = XLogCtl->xlblocks[idx]; @@ -5117,8 +4754,8 @@ XLOGShmemSize(void) /* XLogCtl */ size = sizeof(XLogCtlData); - /* xlog insertion slots, plus alignment */ - size = add_size(size, mul_size(sizeof(XLogInsertSlotPadded), num_xloginsert_slots + 1)); + /* WAL insertion locks, plus alignment */ + size = add_size(size, mul_size(sizeof(WALInsertLockPadded), num_xloginsert_locks + 1)); /* xlblocks array */ size = add_size(size, mul_size(sizeof(XLogRecPtr), XLOGbuffers)); /* extra alignment padding for XLOG I/O buffers */ @@ -5166,11 +4803,27 @@ XLOGShmemInit(void) memset(XLogCtl->xlblocks, 0, sizeof(XLogRecPtr) * XLOGbuffers); allocptr += sizeof(XLogRecPtr) * XLOGbuffers; - /* Xlog insertion slots. Ensure they're aligned to the full padded size */ - allocptr += sizeof(XLogInsertSlotPadded) - - ((uintptr_t) allocptr) % sizeof(XLogInsertSlotPadded); - XLogCtl->Insert.insertSlots = (XLogInsertSlotPadded *) allocptr; - allocptr += sizeof(XLogInsertSlotPadded) * num_xloginsert_slots; + + /* WAL insertion locks. Ensure they're aligned to the full padded size */ + allocptr += sizeof(WALInsertLockPadded) - + ((uintptr_t) allocptr) % sizeof(WALInsertLockPadded); + WALInsertLocks = XLogCtl->Insert.WALInsertLocks = + (WALInsertLockPadded *) allocptr; + allocptr += sizeof(WALInsertLockPadded) * num_xloginsert_locks; + + XLogCtl->Insert.WALInsertLockTrancheId = LWLockNewTrancheId(); + + XLogCtl->Insert.WALInsertLockTranche.name = "WALInsertLocks"; + XLogCtl->Insert.WALInsertLockTranche.array_base = WALInsertLocks; + XLogCtl->Insert.WALInsertLockTranche.array_stride = sizeof(WALInsertLockPadded); + + LWLockRegisterTranche(XLogCtl->Insert.WALInsertLockTrancheId, &XLogCtl->Insert.WALInsertLockTranche); + for (i = 0; i < num_xloginsert_locks; i++) + { + LWLockInitialize(&WALInsertLocks[i].l.lock, + XLogCtl->Insert.WALInsertLockTrancheId); + WALInsertLocks[i].l.insertingAt = InvalidXLogRecPtr; + } /* * Align the start of the page buffers to a full xlog block size boundary. 
@@ -5190,19 +4843,6 @@ XLOGShmemInit(void) XLogCtl->SharedHotStandbyActive = false; XLogCtl->WalWriterSleeping = false; - for (i = 0; i < num_xloginsert_slots; i++) - { - XLogInsertSlot *slot = &XLogCtl->Insert.insertSlots[i].slot; - SpinLockInit(&slot->mutex); - slot->xlogInsertingAt = InvalidXLogRecPtr; - slot->owner = NULL; - - slot->releaseOK = true; - slot->exclusive = 0; - slot->head = NULL; - slot->tail = NULL; - } - SpinLockInit(&XLogCtl->Insert.insertpos_lck); SpinLockInit(&XLogCtl->info_lck); SpinLockInit(&XLogCtl->ulsn_lck); @@ -7925,6 +7565,11 @@ InitXLOGAccess(void) ThisTimeLineID = XLogCtl->ThisTimeLineID; Assert(ThisTimeLineID != 0 || IsBootstrapProcessingMode()); + /* Initialize our copy of WALInsertLocks and register the tranche */ + WALInsertLocks = XLogCtl->Insert.WALInsertLocks; + LWLockRegisterTranche(XLogCtl->Insert.WALInsertLockTrancheId, + &XLogCtl->Insert.WALInsertLockTranche); + /* Use GetRedoRecPtr to copy the RedoRecPtr safely */ (void) GetRedoRecPtr(); } @@ -7943,7 +7588,7 @@ GetRedoRecPtr(void) /* * The possibly not up-to-date copy in XlogCtl is enough. Even if we - * grabbed a WAL insertion slot to read the master copy, someone might + * grabbed a WAL insertion lock to read the master copy, someone might * update it just after we've released the lock. */ SpinLockAcquire(&xlogctl->info_lck); @@ -7961,7 +7606,7 @@ GetRedoRecPtr(void) * * NOTE: The value *actually* returned is the position of the last full * xlog page. It lags behind the real insert position by at most 1 page. - * For that, we don't need to scan through WAL insertion slots, and an + * For that, we don't need to scan through WAL insertion locks, and an * approximation is enough for the current usage of this function. */ XLogRecPtr @@ -8322,7 +7967,7 @@ CreateCheckPoint(int flags) * We must block concurrent insertions while examining insert state to * determine the checkpoint REDO pointer. */ - WALInsertSlotAcquire(true); + WALInsertLockAcquireExclusive(); curInsert = XLogBytePosToRecPtr(Insert->CurrBytePos); /* @@ -8347,7 +7992,7 @@ CreateCheckPoint(int flags) MAXALIGN(SizeOfXLogRecord + sizeof(CheckPoint)) && ControlFile->checkPoint == ControlFile->checkPointCopy.redo) { - WALInsertSlotRelease(); + WALInsertLockRelease(); LWLockRelease(CheckpointLock); END_CRIT_SECTION(); return; @@ -8391,7 +8036,7 @@ CreateCheckPoint(int flags) /* * Here we update the shared RedoRecPtr for future XLogInsert calls; this - * must be done while holding the insertion slots. + * must be done while holding all the insertion locks. * * Note: if we fail to complete the checkpoint, RedoRecPtr will be left * pointing past where it really needs to point. This is okay; the only @@ -8403,10 +8048,10 @@ CreateCheckPoint(int flags) RedoRecPtr = xlogctl->Insert.RedoRecPtr = checkPoint.redo; /* - * Now we can release the WAL insertion slots, allowing other xacts to + * Now we can release the WAL insertion locks, allowing other xacts to * proceed while we are flushing disk buffers. */ - WALInsertSlotRelease(); + WALInsertLockRelease(); /* Update the info_lck-protected copy of RedoRecPtr as well */ SpinLockAcquire(&xlogctl->info_lck); @@ -8436,7 +8081,7 @@ CreateCheckPoint(int flags) * we wait till he's out of his commit critical section before proceeding. * See notes in RecordTransactionCommit(). 
* - * Because we've already released the insertion slots, this test is a bit + * Because we've already released the insertion locks, this test is a bit * fuzzy: it is possible that we will wait for xacts we didn't really need * to wait for. But the delay should be short and it seems better to make * checkpoint take a bit longer than to hold off insertions longer than @@ -8667,10 +8312,10 @@ CreateEndOfRecoveryRecord(void) xlrec.end_time = time(NULL); - WALInsertSlotAcquire(true); + WALInsertLockAcquireExclusive(); xlrec.ThisTimeLineID = ThisTimeLineID; xlrec.PrevTimeLineID = XLogCtl->PrevTimeLineID; - WALInsertSlotRelease(); + WALInsertLockRelease(); LocalSetXLogInsertAllowed(); @@ -8856,9 +8501,9 @@ CreateRestartPoint(int flags) * during recovery this is just pro forma, because no WAL insertions are * happening. */ - WALInsertSlotAcquire(true); + WALInsertLockAcquireExclusive(); xlogctl->Insert.RedoRecPtr = lastCheckPoint.redo; - WALInsertSlotRelease(); + WALInsertLockRelease(); /* Also update the info_lck-protected copy */ SpinLockAcquire(&xlogctl->info_lck); @@ -9318,9 +8963,9 @@ UpdateFullPageWrites(void) */ if (fullPageWrites) { - WALInsertSlotAcquire(true); + WALInsertLockAcquireExclusive(); Insert->fullPageWrites = true; - WALInsertSlotRelease(); + WALInsertLockRelease(); } /* @@ -9341,9 +8986,9 @@ UpdateFullPageWrites(void) if (!fullPageWrites) { - WALInsertSlotAcquire(true); + WALInsertLockAcquireExclusive(); Insert->fullPageWrites = false; - WALInsertSlotRelease(); + WALInsertLockRelease(); } END_CRIT_SECTION(); } @@ -9974,15 +9619,15 @@ do_pg_start_backup(const char *backupidstr, bool fast, TimeLineID *starttli_p, * Note that forcePageWrites has no effect during an online backup from * the standby. * - * We must hold all the insertion slots to change the value of + * We must hold all the insertion locks to change the value of * forcePageWrites, to ensure adequate interlocking against XLogInsert(). */ - WALInsertSlotAcquire(true); + WALInsertLockAcquireExclusive(); if (exclusive) { if (XLogCtl->Insert.exclusiveBackup) { - WALInsertSlotRelease(); + WALInsertLockRelease(); ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), errmsg("a backup is already in progress"), @@ -9993,7 +9638,7 @@ do_pg_start_backup(const char *backupidstr, bool fast, TimeLineID *starttli_p, else XLogCtl->Insert.nonExclusiveBackups++; XLogCtl->Insert.forcePageWrites = true; - WALInsertSlotRelease(); + WALInsertLockRelease(); /* Ensure we release forcePageWrites if fail below */ PG_ENSURE_ERROR_CLEANUP(pg_start_backup_callback, (Datum) BoolGetDatum(exclusive)); @@ -10108,13 +9753,13 @@ do_pg_start_backup(const char *backupidstr, bool fast, TimeLineID *starttli_p, * taking a checkpoint right after another is not that expensive * either because only few buffers have been dirtied yet. 
*/ - WALInsertSlotAcquire(true); + WALInsertLockAcquireExclusive(); if (XLogCtl->Insert.lastBackupStart < startpoint) { XLogCtl->Insert.lastBackupStart = startpoint; gotUniqueStartpoint = true; } - WALInsertSlotRelease(); + WALInsertLockRelease(); } while (!gotUniqueStartpoint); XLByteToSeg(startpoint, _logSegNo); @@ -10204,7 +9849,7 @@ pg_start_backup_callback(int code, Datum arg) bool exclusive = DatumGetBool(arg); /* Update backup counters and forcePageWrites on failure */ - WALInsertSlotAcquire(true); + WALInsertLockAcquireExclusive(); if (exclusive) { Assert(XLogCtl->Insert.exclusiveBackup); @@ -10221,7 +9866,7 @@ pg_start_backup_callback(int code, Datum arg) { XLogCtl->Insert.forcePageWrites = false; } - WALInsertSlotRelease(); + WALInsertLockRelease(); } /* @@ -10290,7 +9935,7 @@ do_pg_stop_backup(char *labelfile, bool waitforarchive, TimeLineID *stoptli_p) /* * OK to update backup counters and forcePageWrites */ - WALInsertSlotAcquire(true); + WALInsertLockAcquireExclusive(); if (exclusive) XLogCtl->Insert.exclusiveBackup = false; else @@ -10310,7 +9955,7 @@ do_pg_stop_backup(char *labelfile, bool waitforarchive, TimeLineID *stoptli_p) { XLogCtl->Insert.forcePageWrites = false; } - WALInsertSlotRelease(); + WALInsertLockRelease(); if (exclusive) { @@ -10595,7 +10240,7 @@ do_pg_stop_backup(char *labelfile, bool waitforarchive, TimeLineID *stoptli_p) void do_pg_abort_backup(void) { - WALInsertSlotAcquire(true); + WALInsertLockAcquireExclusive(); Assert(XLogCtl->Insert.nonExclusiveBackups > 0); XLogCtl->Insert.nonExclusiveBackups--; @@ -10604,7 +10249,7 @@ do_pg_abort_backup(void) { XLogCtl->Insert.forcePageWrites = false; } - WALInsertSlotRelease(); + WALInsertLockRelease(); } /* diff --git a/src/backend/storage/lmgr/lwlock.c b/src/backend/storage/lmgr/lwlock.c index 82ef4409494..f9c9bb299f4 100644 --- a/src/backend/storage/lmgr/lwlock.c +++ b/src/backend/storage/lmgr/lwlock.c @@ -10,6 +10,13 @@ * locking should be done with the full lock manager --- which depends on * LWLocks to protect its shared state. * + * In addition to exclusive and shared modes, lightweight locks can be used + * to wait until a variable changes value. The variable is initially set + * when the lock is acquired with LWLockAcquireWithVar, and can be updated + * without releasing the lock by calling LWLockUpdateVar. LWLockWaitForVar + * waits for the variable to be updated, or until the lock is free. The + * meaning of the variable is up to the caller, the lightweight lock code + * just assigns and compares it. * * Portions Copyright (c) 1996-2014, PostgreSQL Global Development Group * Portions Copyright (c) 1994, Regents of the University of California @@ -78,6 +85,9 @@ static LWLock *held_lwlocks[MAX_SIMUL_LWLOCKS]; static int lock_addin_request = 0; static bool lock_addin_request_allowed = true; +static bool LWLockAcquireCommon(LWLock *l, LWLockMode mode, uint64 *valptr, + uint64 val); + #ifdef LWLOCK_STATS typedef struct lwlock_stats_key { @@ -443,16 +453,36 @@ LWLockInitialize(LWLock *lock, int tranche_id) /* * LWLockAcquire - acquire a lightweight lock in the specified mode * - * If the lock is not available, sleep until it is. + * If the lock is not available, sleep until it is. Returns true if the lock + * was available immediately, false if we had to sleep. * * Side effect: cancel/die interrupts are held off until lock release. 
*/ -void +bool LWLockAcquire(LWLock *l, LWLockMode mode) { + return LWLockAcquireCommon(l, mode, NULL, 0); +} + +/* + * LWLockAcquireWithVar - like LWLockAcquire, but also sets *valptr = val + * + * The lock is always acquired in exclusive mode with this function. + */ +bool +LWLockAcquireWithVar(LWLock *l, uint64 *valptr, uint64 val) +{ + return LWLockAcquireCommon(l, LW_EXCLUSIVE, valptr, val); +} + +/* internal function to implement LWLockAcquire and LWLockAcquireWithVar */ +static bool +LWLockAcquireCommon(LWLock *l, LWLockMode mode, uint64 *valptr, uint64 val) +{ volatile LWLock *lock = l; PGPROC *proc = MyProc; bool retry = false; + bool result = true; int extraWaits = 0; #ifdef LWLOCK_STATS lwlock_stats *lwstats; @@ -601,8 +631,13 @@ LWLockAcquire(LWLock *l, LWLockMode mode) /* Now loop back and try to acquire lock again. */ retry = true; + result = false; } + /* If there's a variable associated with this lock, initialize it */ + if (valptr) + *valptr = val; + /* We are done updating shared state of the lock itself. */ SpinLockRelease(&lock->mutex); @@ -616,6 +651,8 @@ LWLockAcquire(LWLock *l, LWLockMode mode) */ while (extraWaits-- > 0) PGSemaphoreUnlock(&proc->sem); + + return result; } /* @@ -835,6 +872,227 @@ LWLockAcquireOrWait(LWLock *l, LWLockMode mode) } /* + * LWLockWaitForVar - Wait until lock is free, or a variable is updated. + * + * If the lock is held and *valptr equals oldval, waits until the lock is + * either freed, or the lock holder updates *valptr by calling + * LWLockUpdateVar. If the lock is free on exit (immediately or after + * waiting), returns true. If the lock is still held, but *valptr no longer + * matches oldval, returns false and sets *newval to the current value in + * *valptr. + * + * It's possible that the lock holder releases the lock, but another backend + * acquires it again before we get a chance to observe that the lock was + * momentarily released. We wouldn't need to wait for the new lock holder, + * but we cannot distinguish that case, so we will have to wait. + * + * Note: this function ignores shared lock holders; if the lock is held + * in shared mode, returns 'true'. + */ +bool +LWLockWaitForVar(LWLock *l, uint64 *valptr, uint64 oldval, uint64 *newval) +{ + volatile LWLock *lock = l; + volatile uint64 *valp = valptr; + PGPROC *proc = MyProc; + int extraWaits = 0; + bool result = false; + + /* + * Quick test first to see if the lock is free right now. + * + * XXX: the caller uses a spinlock before this, so we don't need a memory + * barrier here as far as the current usage is concerned. But that might + * not be safe in general. + */ + if (lock->exclusive == 0) + return true; + + /* + * Lock out cancel/die interrupts while we sleep on the lock. There is + * no cleanup mechanism to remove us from the wait queue if we got + * interrupted. + */ + HOLD_INTERRUPTS(); + + /* + * Loop here to check the lock's status after each time we are signaled. + */ + for (;;) + { + bool mustwait; + uint64 value; + + /* Acquire mutex. Time spent holding mutex should be short! */ +#ifdef LWLOCK_STATS + lwstats->spin_delay_count += SpinLockAcquire(&lock->mutex); +#else + SpinLockAcquire(&lock->mutex); +#endif + + /* Is the lock now free, and if not, does the value match?
*/ + if (lock->exclusive == 0) + { + result = true; + mustwait = false; + } + else + { + value = *valp; + if (value != oldval) + { + result = false; + mustwait = false; + *newval = value; + } + else + mustwait = true; + } + + if (!mustwait) + break; /* the lock was free or value didn't match */ + + /* + * Add myself to wait queue. + */ + proc->lwWaiting = true; + proc->lwWaitMode = LW_WAIT_UNTIL_FREE; + proc->lwWaitLink = NULL; + + /* waiters are added to the front of the queue */ + proc->lwWaitLink = lock->head; + if (lock->head == NULL) + lock->tail = proc; + lock->head = proc; + + /* Can release the mutex now */ + SpinLockRelease(&lock->mutex); + + /* + * Wait until awakened. + * + * Since we share the process wait semaphore with the regular lock + * manager and ProcWaitForSignal, and we may need to acquire an LWLock + * while one of those is pending, it is possible that we get awakened + * for a reason other than being signaled by LWLockRelease. If so, + * loop back and wait again. Once we've gotten the LWLock, + * re-increment the sema by the number of additional signals received, + * so that the lock manager or signal manager will see the received + * signal when it next waits. + */ + LOG_LWDEBUG("LWLockWaitForVar", T_NAME(l), T_ID(l), "waiting"); + +#ifdef LWLOCK_STATS + lwstats->block_count++; +#endif + + TRACE_POSTGRESQL_LWLOCK_WAIT_START(T_NAME(l), T_ID(l), LW_EXCLUSIVE); + + for (;;) + { + /* "false" means cannot accept cancel/die interrupt here. */ + PGSemaphoreLock(&proc->sem, false); + if (!proc->lwWaiting) + break; + extraWaits++; + } + + TRACE_POSTGRESQL_LWLOCK_WAIT_DONE(T_NAME(l), T_ID(l), LW_EXCLUSIVE); + + LOG_LWDEBUG("LWLockWaitForVar", T_NAME(l), T_ID(l), "awakened"); + + /* Now loop back and check the status of the lock again. */ + } + + /* We are done updating shared state of the lock itself. */ + SpinLockRelease(&lock->mutex); + + TRACE_POSTGRESQL_LWLOCK_ACQUIRE(T_NAME(l), T_ID(l), LW_EXCLUSIVE); + + /* + * Fix the process wait semaphore's count for any absorbed wakeups. + */ + while (extraWaits-- > 0) + PGSemaphoreUnlock(&proc->sem); + + /* + * Now okay to allow cancel/die interrupts. + */ + RESUME_INTERRUPTS(); + + return result; +} + + +/* + * LWLockUpdateVar - Update a variable and wake up waiters atomically + * + * Sets *valptr to 'val', and wakes up all processes waiting for us with + * LWLockWaitForVar(). Setting the value and waking up the processes happen + * atomically so that any process calling LWLockWaitForVar() on the same lock + * is guaranteed to see the new value, and act accordingly. + * + * The caller must be holding the lock in exclusive mode. + */ +void +LWLockUpdateVar(LWLock *l, uint64 *valptr, uint64 val) +{ + volatile LWLock *lock = l; + volatile uint64 *valp = valptr; + PGPROC *head; + PGPROC *proc; + PGPROC *next; + + /* Acquire mutex. Time spent holding mutex should be short! */ + SpinLockAcquire(&lock->mutex); + + /* we should hold the lock */ + Assert(lock->exclusive == 1); + + /* Update the lock's value */ + *valp = val; + + /* + * See if there are any LW_WAIT_UNTIL_FREE waiters that need to be woken + * up. They are always in the front of the queue.
+ */ + head = lock->head; + + if (head != NULL && head->lwWaitMode == LW_WAIT_UNTIL_FREE) + { + proc = head; + next = proc->lwWaitLink; + while (next && next->lwWaitMode == LW_WAIT_UNTIL_FREE) + { + proc = next; + next = next->lwWaitLink; + } + + /* proc is now the last PGPROC to be released */ + lock->head = next; + proc->lwWaitLink = NULL; + } + else + head = NULL; + + /* We are done updating shared state of the lock itself. */ + SpinLockRelease(&lock->mutex); + + /* + * Awaken any waiters I removed from the queue. + */ + while (head != NULL) + { + proc = head; + head = proc->lwWaitLink; + proc->lwWaitLink = NULL; + proc->lwWaiting = false; + PGSemaphoreUnlock(&proc->sem); + } +} + + +/* * LWLockRelease - release a previously acquired lock */ void diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c index da882b22dc6..2181a39853b 100644 --- a/src/backend/utils/misc/guc.c +++ b/src/backend/utils/misc/guc.c @@ -2120,12 +2120,12 @@ static struct config_int ConfigureNamesInt[] = }, { - {"xloginsert_slots", PGC_POSTMASTER, WAL_SETTINGS, - gettext_noop("Sets the number of slots for concurrent xlog insertions."), + {"xloginsert_locks", PGC_POSTMASTER, WAL_SETTINGS, + gettext_noop("Sets the number of locks used for concurrent xlog insertions."), NULL, GUC_NOT_IN_SAMPLE }, - &num_xloginsert_slots, + &num_xloginsert_locks, 8, 1, 1000, NULL, NULL, NULL }, diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h index 35092284664..56cfe63d8cf 100644 --- a/src/include/access/xlog.h +++ b/src/include/access/xlog.h @@ -192,7 +192,7 @@ extern bool EnableHotStandby; extern bool fullPageWrites; extern bool wal_log_hints; extern bool log_checkpoints; -extern int num_xloginsert_slots; +extern int num_xloginsert_locks; /* WAL levels */ typedef enum WalLevel diff --git a/src/include/storage/lwlock.h b/src/include/storage/lwlock.h index 8840c791dd1..3a1953383e8 100644 --- a/src/include/storage/lwlock.h +++ b/src/include/storage/lwlock.h @@ -169,13 +169,17 @@ typedef enum LWLockMode extern bool Trace_lwlocks; #endif -extern void LWLockAcquire(LWLock *lock, LWLockMode mode); +extern bool LWLockAcquire(LWLock *lock, LWLockMode mode); extern bool LWLockConditionalAcquire(LWLock *lock, LWLockMode mode); extern bool LWLockAcquireOrWait(LWLock *lock, LWLockMode mode); extern void LWLockRelease(LWLock *lock); extern void LWLockReleaseAll(void); extern bool LWLockHeldByMe(LWLock *lock); +extern bool LWLockAcquireWithVar(LWLock *lock, uint64 *valptr, uint64 val); +extern bool LWLockWaitForVar(LWLock *lock, uint64 *valptr, uint64 oldval, uint64 *newval); +extern void LWLockUpdateVar(LWLock *lock, uint64 *valptr, uint64 value); + extern Size LWLockShmemSize(void); extern void CreateLWLocks(void); |
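
A side note on the shared-memory layout in the xlog.c hunks above: XLOGShmemSize() reserves num_xloginsert_locks + 1 padded strides, and XLOGShmemInit() then rounds the array start up to a stride boundary so that no lock straddles a cache line. A minimal sketch of that arithmetic follows, with hypothetical names; the real code uses WALInsertLockPadded and allocptr, and the CACHE_LINE_SIZE value here is assumed rather than taken from the tree.

    #include <stdint.h>

    #define CACHE_LINE_SIZE 64          /* assumed value for illustration */

    typedef union
    {
        struct
        {
            long        lock;           /* stand-in for the embedded LWLock */
            uint64_t    insertingAt;
        }           l;
        char        pad[CACHE_LINE_SIZE];
    } PaddedSketch;

    /*
     * Round a raw shmem pointer up to the next stride boundary, mirroring
     * what XLOGShmemInit does before laying down the lock array.
     */
    char *
    align_to_stride(char *allocptr)
    {
        allocptr += sizeof(PaddedSketch) -
            ((uintptr_t) allocptr) % sizeof(PaddedSketch);
        return allocptr;
    }

Note that this adjustment always advances the pointer, by anywhere from one byte up to a full stride, even when it is already aligned; that is why XLOGShmemSize() asks for one stride more than the number of locks. With the stride a power of two, indexing into the array compiles down to a shift, which is the "saves a few cycles in indexing" the comment refers to.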