author    | Heikki Linnakangas <heikki.linnakangas@iki.fi> | 2013-07-08 11:23:56 +0300
committer | Heikki Linnakangas <heikki.linnakangas@iki.fi> | 2013-07-08 11:23:56 +0300
commit    | 9a20a9b21baa819df1760b36f3c36f25d11fc27b (patch)
tree      | d4990fa07af24d6d9544ff5e1b7e4edfb5dd6c12 /src/backend/access/transam/xlog.c
parent    | 5372275b4b5fc183c6c6dd4517cfd74d5b641446 (diff)
Improve scalability of WAL insertions.
This patch replaces WALInsertLock with a number of WAL insertion slots,
allowing multiple backends to insert WAL records into the WAL buffers
concurrently. This is particularly useful when loading large amounts of
data in parallel on a system with many CPUs.
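To make the mechanism concrete, here is a minimal standalone C sketch of the
two-step scheme the patch uses (placeholder names and a plain pthread spinlock
stand in for xlog.c's insertpos_lck and WAL buffers; this is not the
PostgreSQL code itself). Only the byte-position bookkeeping is serialized; the
copy into the shared buffer runs outside the lock, so many backends can copy
records at the same time.

    #include <pthread.h>
    #include <stdint.h>
    #include <string.h>

    #define LOG_BUFSZ (1024 * 1024)

    static pthread_spinlock_t insertpos_lck;
    static uint64_t CurrBytePos;        /* next free byte in the log */
    static uint64_t PrevBytePos;        /* start of the previous record */
    static char     logbuf[LOG_BUFSZ];  /* stand-in for the shared WAL buffers */

    void
    log_init(void)
    {
        pthread_spin_init(&insertpos_lck, PTHREAD_PROCESS_PRIVATE);
    }

    /*
     * Step 1: reserve 'size' bytes. Two fields move together (the current tip
     * and the previous record's start, needed for the prev-link), which is why
     * a spinlock is used rather than a single atomic increment.
     */
    static uint64_t
    reserve_log_space(int size, uint64_t *prevpos)
    {
        uint64_t start;

        pthread_spin_lock(&insertpos_lck);
        start = CurrBytePos;
        *prevpos = PrevBytePos;
        CurrBytePos = start + size;
        PrevBytePos = start;
        pthread_spin_unlock(&insertpos_lck);

        return start;
    }

    /* Step 2: copy the record into its reserved space, without the lock. */
    void
    insert_record(const char *data, int size)
    {
        uint64_t prevpos;
        uint64_t start = reserve_log_space(size, &prevpos);

        (void) prevpos;     /* would go into the record's prev-link */

        /* sketch only: assumes the record fits before the buffer wraps */
        memcpy(logbuf + (start % LOG_BUFSZ), data, (size_t) size);
    }

In the real patch, step 2 additionally tracks its progress through an
"insertion slot" so that WAL flushes can wait for in-progress copies to finish.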
This has one user-visible change: switching to a new WAL segment with
pg_switch_xlog() now fills the remaining unused portion of the segment with
zeros. This potentially adds some overhead, but it has long been common
practice for DBAs to clear the "tail" of the segment with an external
pg_clearxlogtail utility anyway, so that the WAL files compress better.
With this patch, that is no longer necessary.
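A hedged sketch of the tail-filling behaviour (constants and names are
placeholders, not the real xlog.c symbols): after the switch record is
written, the rest of the fixed-size segment is reserved and written out as
zeros, which is what makes the files compress well.

    #include <stdint.h>
    #include <string.h>

    #define SEG_SIZE  (16 * 1024 * 1024)    /* assume 16MB WAL segments */

    /* Given the end of the switch record, return how many tail bytes to zero. */
    static uint64_t
    segment_tail_len(uint64_t end_of_record)
    {
        uint64_t used = end_of_record % SEG_SIZE;

        return (used == 0) ? 0 : SEG_SIZE - used;
    }

    /* Zero the unused tail of an in-memory segment image. */
    static void
    zero_segment_tail(char *seg, uint64_t end_of_record)
    {
        uint64_t used = end_of_record % SEG_SIZE;

        if (used != 0)
            memset(seg + used, 0, SEG_SIZE - used);
    }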
This patch adds a new GUC, xloginsert_slots, to tune the number of WAL
insertion slots. Performance testing suggests that the default, 8, works
pretty well for all kinds of workloads, but I left the GUC in place to allow
others with different hardware to test it easily. We might want to remove
it before release.
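To illustrate why the slot count matters, here is a simplified sketch of the
slot-selection idea from the patch's WALInsertSlotAcquireOne (names such as
my_proc_no and contended_last_time are placeholders): each backend keeps
affinity to one slot, initially picked from its process number, and migrates
to the next slot only after hitting contention. With more slots than
concurrent inserters, backends spread out and stop bouncing cache lines; with
fewer, they still distribute fairly evenly.

    static int num_xloginsert_slots = 8;    /* the GUC's default in the patch */
    static int slot_to_try = -1;

    static int
    choose_insert_slot(int my_proc_no, int contended_last_time)
    {
        if (slot_to_try == -1)
            slot_to_try = my_proc_no % num_xloginsert_slots;          /* first pick */
        else if (contended_last_time)
            slot_to_try = (slot_to_try + 1) % num_xloginsert_slots;   /* migrate */

        return slot_to_try;
    }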
Reviewed by Andres Freund.
Diffstat (limited to 'src/backend/access/transam/xlog.c')
-rw-r--r-- | src/backend/access/transam/xlog.c | 2017 |
1 file changed, 1579 insertions, 438 deletions
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index 77e5c3b5d85..acf0dd18761 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -41,6 +41,7 @@ #include "postmaster/startup.h" #include "replication/walreceiver.h" #include "replication/walsender.h" +#include "storage/barrier.h" #include "storage/bufmgr.h" #include "storage/fd.h" #include "storage/ipc.h" @@ -83,6 +84,7 @@ int sync_method = DEFAULT_SYNC_METHOD; int wal_level = WAL_LEVEL_MINIMAL; int CommitDelay = 0; /* precommit delay in microseconds */ int CommitSiblings = 5; /* # concurrent xacts needed to sleep */ +int num_xloginsert_slots = 8; #ifdef WAL_DEBUG bool XLOG_DEBUG = false; @@ -279,8 +281,8 @@ XLogRecPtr XactLastRecEnd = InvalidXLogRecPtr; * (which is almost but not quite the same as a pointer to the most recent * CHECKPOINT record). We update this from the shared-memory copy, * XLogCtl->Insert.RedoRecPtr, whenever we can safely do so (ie, when we - * hold the Insert lock). See XLogInsert for details. We are also allowed - * to update from XLogCtl->Insert.RedoRecPtr if we hold the info_lck; + * hold an insertion slot). See XLogInsert for details. We are also allowed + * to update from XLogCtl->RedoRecPtr if we hold the info_lck; * see GetRedoRecPtr. A freshly spawned backend obtains the value during * InitXLOGAccess. */ @@ -321,7 +323,10 @@ static XLogRecPtr RedoStartLSN = InvalidXLogRecPtr; * so it's a plain spinlock. The other locks are held longer (potentially * over I/O operations), so we use LWLocks for them. These locks are: * - * WALInsertLock: must be held to insert a record into the WAL buffers. + * WALBufMappingLock: must be held to replace a page in the WAL buffer cache. + * It is only held while initializing and changing the mapping. If the + * contents of the buffer being replaced haven't been written yet, the mapping + * lock is released while the write is done, and reacquired afterwards. * * WALWriteLock: must be held to write WAL buffers to disk (XLogWrite or * XLogFlush). @@ -348,17 +353,83 @@ typedef struct XLogwrtResult XLogRecPtr Flush; /* last byte + 1 flushed */ } XLogwrtResult; + +/* + * A slot for inserting to the WAL. This is similar to an LWLock, the main + * difference is that there is an extra xlogInsertingAt field that is protected + * by the same mutex. Unlike an LWLock, a slot can only be acquired in + * exclusive mode. + * + * The xlogInsertingAt field is used to advertise to other processes how far + * the slot owner has progressed in inserting the record. When a backend + * acquires a slot, it initializes xlogInsertingAt to 1, because it doesn't + * yet know where it's going to insert the record. That's conservative + * but correct; the new insertion is certainly going to go to a byte position + * greater than 1. If another backend needs to flush the WAL, it will have to + * wait for the new insertion. xlogInsertingAt is updated after finishing the + * insert or when crossing a page boundary, which will wake up anyone waiting + * for it, whether the wait was necessary in the first place or not. + * + * A process can wait on a slot in two modes: LW_EXCLUSIVE or + * LW_WAIT_UNTIL_FREE. LW_EXCLUSIVE works like in an lwlock; when the slot is + * released, the first LW_EXCLUSIVE waiter in the queue is woken up. Processes + * waiting in LW_WAIT_UNTIL_FREE mode are woken up whenever the slot is + * released, or xlogInsertingAt is updated. 
In other words, a process in + * LW_WAIT_UNTIL_FREE mode is woken up whenever the inserter makes any progress + * copying the record in place. LW_WAIT_UNTIL_FREE waiters are always added to + * the front of the queue, while LW_EXCLUSIVE waiters are appended to the end. + * + * To join the wait queue, a process must set MyProc->lwWaitMode to the mode + * it wants to wait in, MyProc->lwWaiting to true, and link MyProc to the head + * or tail of the wait queue. The same mechanism is used to wait on an LWLock, + * see lwlock.c for details. + */ +typedef struct +{ + slock_t mutex; /* protects the below fields */ + XLogRecPtr xlogInsertingAt; /* insert has completed up to this point */ + + PGPROC *owner; /* for debugging purposes */ + + bool releaseOK; /* T if ok to release waiters */ + char exclusive; /* # of exclusive holders (0 or 1) */ + PGPROC *head; /* head of list of waiting PGPROCs */ + PGPROC *tail; /* tail of list of waiting PGPROCs */ + /* tail is undefined when head is NULL */ +} XLogInsertSlot; + +/* + * All the slots are allocated as an array in shared memory. We force the + * array stride to be a power of 2, which saves a few cycles in indexing, but + * more importantly also ensures that individual slots don't cross cache line + * boundaries. (Of course, we have to also ensure that the array start + * address is suitably aligned.) + */ +typedef union XLogInsertSlotPadded +{ + XLogInsertSlot slot; + char pad[64]; +} XLogInsertSlotPadded; + /* * Shared state data for XLogInsert. */ typedef struct XLogCtlInsert { - XLogRecPtr PrevRecord; /* start of previously-inserted record */ - int curridx; /* current block index in cache */ - XLogPageHeader currpage; /* points to header of block in cache */ - char *currpos; /* current insertion point in cache */ - XLogRecPtr RedoRecPtr; /* current redo point for insertions */ - bool forcePageWrites; /* forcing full-page writes for PITR? */ + slock_t insertpos_lck; /* protects CurrBytePos and PrevBytePos */ + + /* + * CurrBytePos is the end of reserved WAL. The next record will be inserted + * at that position. PrevBytePos is the start position of the previously + * inserted (or rather, reserved) record - it is copied to the the prev- + * link of the next record. These are stored as "usable byte positions" + * rather than XLogRecPtrs (see XLogBytePosToRecPtr()). + */ + uint64 CurrBytePos; + uint64 PrevBytePos; + + /* insertion slots, see above for details */ + XLogInsertSlotPadded *insertSlots; /* * fullPageWrites is the master copy used by all backends to determine @@ -366,7 +437,12 @@ typedef struct XLogCtlInsert * This is required because, when full_page_writes is changed by SIGHUP, * we must WAL-log it before it actually affects WAL-logging by backends. * Checkpointer sets at startup or after SIGHUP. + * + * To read these fields, you must hold an insertion slot. To modify them, + * you must hold ALL the slots. */ + XLogRecPtr RedoRecPtr; /* current redo point for insertions */ + bool forcePageWrites; /* forcing full-page writes for PITR? 
*/ bool fullPageWrites; /* @@ -395,11 +471,11 @@ typedef struct XLogCtlWrite */ typedef struct XLogCtlData { - /* Protected by WALInsertLock: */ XLogCtlInsert Insert; /* Protected by info_lck: */ XLogwrtRqst LogwrtRqst; + XLogRecPtr RedoRecPtr; /* a recent copy of Insert->RedoRecPtr */ uint32 ckptXidEpoch; /* nextXID & epoch of latest checkpoint */ TransactionId ckptXid; XLogRecPtr asyncXactLSN; /* LSN of newest async commit/abort */ @@ -420,9 +496,20 @@ typedef struct XLogCtlData XLogwrtResult LogwrtResult; /* + * Latest initialized block index in cache. + * + * To change curridx and the identity of a buffer, you need to hold + * WALBufMappingLock. To change the identity of a buffer that's still + * dirty, the old page needs to be written out first, and for that you + * need WALWriteLock, and you need to ensure that there are no in-progress + * insertions to the page by calling WaitXLogInsertionsToFinish(). + */ + int curridx; + + /* * These values do not change after startup, although the pointed-to pages - * and xlblocks values certainly do. Permission to read/write the pages - * and xlblocks values depends on WALInsertLock and WALWriteLock. + * and xlblocks values certainly do. xlblock values are protected by + * WALBufMappingLock. */ char *pages; /* buffers for unwritten XLOG pages */ XLogRecPtr *xlblocks; /* 1st byte ptr-s + XLOG_BLCKSZ */ @@ -518,24 +605,34 @@ static XLogCtlData *XLogCtl = NULL; static ControlFileData *ControlFile = NULL; /* - * Macros for managing XLogInsert state. In most cases, the calling routine - * has local copies of XLogCtl->Insert and/or XLogCtl->Insert->curridx, - * so these are passed as parameters instead of being fetched via XLogCtl. + * Calculate the amount of space left on the page after 'endptr'. Beware + * multiple evaluation! */ +#define INSERT_FREESPACE(endptr) \ + (((endptr) % XLOG_BLCKSZ == 0) ? 0 : (XLOG_BLCKSZ - (endptr) % XLOG_BLCKSZ)) -/* Free space remaining in the current xlog page buffer */ -#define INSERT_FREESPACE(Insert) \ - (XLOG_BLCKSZ - ((Insert)->currpos - (char *) (Insert)->currpage)) +/* Macro to advance to next buffer index. */ +#define NextBufIdx(idx) \ + (((idx) == XLogCtl->XLogCacheBlck) ? 0 : ((idx) + 1)) -/* Construct XLogRecPtr value for current insertion point */ -#define INSERT_RECPTR(recptr,Insert,curridx) \ - (recptr) = XLogCtl->xlblocks[curridx] - INSERT_FREESPACE(Insert) +/* + * XLogRecPtrToBufIdx returns the index of the WAL buffer that holds, or + * would hold if it was in cache, the page containing 'recptr'. + * + * XLogRecEndPtrToBufIdx is the same, but a pointer to the first byte of a + * page is taken to mean the previous page. + */ +#define XLogRecPtrToBufIdx(recptr) \ + (((recptr) / XLOG_BLCKSZ) % (XLogCtl->XLogCacheBlck + 1)) -#define PrevBufIdx(idx) \ - (((idx) == 0) ? XLogCtl->XLogCacheBlck : ((idx) - 1)) +#define XLogRecEndPtrToBufIdx(recptr) \ + ((((recptr) - 1) / XLOG_BLCKSZ) % (XLogCtl->XLogCacheBlck + 1)) -#define NextBufIdx(idx) \ - (((idx) == XLogCtl->XLogCacheBlck) ? 0 : ((idx) + 1)) +/* + * These are the number of bytes in a WAL page and segment usable for WAL data. + */ +#define UsableBytesInPage (XLOG_BLCKSZ - SizeOfXLogShortPHD) +#define UsableBytesInSegment ((XLOG_SEG_SIZE / XLOG_BLCKSZ) * UsableBytesInPage - (SizeOfXLogLongPHD - SizeOfXLogShortPHD)) /* * Private, possibly out-of-date copy of shared LogwrtResult. @@ -631,6 +728,9 @@ static bool InRedo = false; /* Have we launched bgwriter during recovery? 
*/ static bool bgwriterLaunched = false; +/* For WALInsertSlotAcquire/Release functions */ +static int MySlotNo = 0; +static bool holdingAllSlots = false; static void readRecoveryCommandFile(void); static void exitArchiveRecovery(TimeLineID endTLI, XLogSegNo endLogSegNo); @@ -651,9 +751,9 @@ static bool XLogCheckBuffer(XLogRecData *rdata, bool holdsExclusiveLock, XLogRecPtr *lsn, BkpBlock *bkpb); static Buffer RestoreBackupBlockContents(XLogRecPtr lsn, BkpBlock bkpb, char *blk, bool get_cleanup_lock, bool keep_buffer); -static bool AdvanceXLInsertBuffer(bool new_segment); +static void AdvanceXLInsertBuffer(XLogRecPtr upto, bool opportunistic); static bool XLogCheckpointNeeded(XLogSegNo new_segno); -static void XLogWrite(XLogwrtRqst WriteRqst, bool flexible, bool xlog_switch); +static void XLogWrite(XLogwrtRqst WriteRqst, bool flexible); static bool InstallXLogFileSegment(XLogSegNo *segno, char *tmppath, bool find_free, int *max_advance, bool use_lock); @@ -693,6 +793,24 @@ static bool read_backup_label(XLogRecPtr *checkPointLoc, static void rm_redo_error_callback(void *arg); static int get_sync_bit(int method); +static void CopyXLogRecordToWAL(int write_len, bool isLogSwitch, + XLogRecData *rdata, + XLogRecPtr StartPos, XLogRecPtr EndPos); +static void ReserveXLogInsertLocation(int size, XLogRecPtr *StartPos, + XLogRecPtr *EndPos, XLogRecPtr *PrevPtr); +static bool ReserveXLogSwitch(XLogRecPtr *StartPos, XLogRecPtr *EndPos, + XLogRecPtr *PrevPtr); +static XLogRecPtr WaitXLogInsertionsToFinish(XLogRecPtr upto); +static void WakeupWaiters(XLogRecPtr EndPos); +static char *GetXLogBuffer(XLogRecPtr ptr); +static XLogRecPtr XLogBytePosToRecPtr(uint64 bytepos); +static XLogRecPtr XLogBytePosToEndRecPtr(uint64 bytepos); +static uint64 XLogRecPtrToBytePos(XLogRecPtr ptr); + +static void WALInsertSlotAcquire(bool exclusive); +static void WALInsertSlotAcquireOne(int slotno); +static void WALInsertSlotRelease(void); +static void WALInsertSlotReleaseOne(int slotno); /* * Insert an XLOG record having the specified RMID and info bytes, @@ -713,10 +831,6 @@ XLogRecPtr XLogInsert(RmgrId rmid, uint8 info, XLogRecData *rdata) { XLogCtlInsert *Insert = &XLogCtl->Insert; - XLogRecPtr RecPtr; - XLogRecPtr WriteRqst; - uint32 freespace; - int curridx; XLogRecData *rdt; XLogRecData *rdt_lastnormal; Buffer dtbuf[XLR_MAX_BKP_BLOCKS]; @@ -731,11 +845,13 @@ XLogInsert(RmgrId rmid, uint8 info, XLogRecData *rdata) uint32 len, write_len; unsigned i; - bool updrqst; bool doPageWrites; bool isLogSwitch = (rmid == RM_XLOG_ID && info == XLOG_SWITCH); + bool inserted; uint8 info_orig = info; static XLogRecord *rechdr; + XLogRecPtr StartPos; + XLogRecPtr EndPos; if (rechdr == NULL) { @@ -761,8 +877,8 @@ XLogInsert(RmgrId rmid, uint8 info, XLogRecData *rdata) */ if (IsBootstrapProcessingMode() && rmid != RM_XLOG_ID) { - RecPtr = SizeOfXLogLongPHD; /* start of 1st chkpt record */ - return RecPtr; + EndPos = SizeOfXLogLongPHD; /* start of 1st chkpt record */ + return EndPos; } /* @@ -770,9 +886,9 @@ XLogInsert(RmgrId rmid, uint8 info, XLogRecData *rdata) * up. * * We may have to loop back to here if a race condition is detected below. - * We could prevent the race by doing all this work while holding the - * insert lock, but it seems better to avoid doing CRC calculations while - * holding the lock. + * We could prevent the race by doing all this work while holding an + * insertion slot, but it seems better to avoid doing CRC calculations + * while holding one. 
* * We add entries for backup blocks to the chain, so that they don't need * any special treatment in the critical section where the chunks are @@ -789,8 +905,8 @@ begin:; /* * Decide if we need to do full-page writes in this XLOG record: true if * full_page_writes is on or we have a PITR request for it. Since we - * don't yet have the insert lock, fullPageWrites and forcePageWrites - * could change under us, but we'll recheck them once we have the lock. + * don't yet have an insertion slot, fullPageWrites and forcePageWrites + * could change under us, but we'll recheck them once we have a slot. */ doPageWrites = Insert->fullPageWrites || Insert->forcePageWrites; @@ -930,25 +1046,60 @@ begin:; COMP_CRC32(rdata_crc, rdt->data, rdt->len); /* - * Construct record header (prev-link and CRC are filled in later), and - * make that the first chunk in the chain. + * Construct record header (prev-link is filled in later, after reserving + * the space for the record), and make that the first chunk in the chain. + * + * The CRC calculated for the header here doesn't include prev-link, + * because we don't know it yet. It will be added later. */ rechdr->xl_xid = GetCurrentTransactionIdIfAny(); rechdr->xl_tot_len = SizeOfXLogRecord + write_len; rechdr->xl_len = len; /* doesn't include backup blocks */ rechdr->xl_info = info; rechdr->xl_rmid = rmid; + rechdr->xl_prev = InvalidXLogRecPtr; + COMP_CRC32(rdata_crc, ((char *) rechdr), offsetof(XLogRecord, xl_prev)); hdr_rdt.next = rdata; hdr_rdt.data = (char *) rechdr; hdr_rdt.len = SizeOfXLogRecord; - write_len += SizeOfXLogRecord; + /*---------- + * + * We have now done all the preparatory work we can without holding a + * lock or modifying shared state. From here on, inserting the new WAL + * record to the shared WAL buffer cache is a two-step process: + * + * 1. Reserve the right amount of space from the WAL. The current head of + * reserved space is kept in Insert->CurrBytePos, and is protected by + * insertpos_lck. + * + * 2. Copy the record to the reserved WAL space. This involves finding the + * correct WAL buffer containing the reserved space, and copying the + * record in place. This can be done concurrently in multiple processes. + * + * To keep track of which insertions are still in-progress, each concurrent + * inserter allocates an "insertion slot", which tells others how far the + * inserter has progressed. There is a small fixed number of insertion + * slots, determined by the num_xloginsert_slots GUC. When an inserter + * finishes, it updates the xlogInsertingAt of its slot to the end of the + * record it inserted, to let others know that it's done. xlogInsertingAt + * is also updated when crossing over to a new WAL buffer, to allow the + * the previous buffer to be flushed. + * + * Holding onto a slot also protects RedoRecPtr and fullPageWrites from + * changing until the insertion is finished. + * + * Step 2 can usually be done completely in parallel. If the required WAL + * page is not initialized yet, you have to grab WALBufMappingLock to + * initialize it, but the WAL writer tries to do that ahead of insertions + * to avoid that from happening in the critical path. + * + *---------- + */ START_CRIT_SECTION(); - - /* Now wait to get insert lock */ - LWLockAcquire(WALInsertLock, LW_EXCLUSIVE); + WALInsertSlotAcquire(isLogSwitch); /* * Check to see if my RedoRecPtr is out of date. If so, may have to go @@ -977,7 +1128,7 @@ begin:; * Oops, this buffer now needs to be backed up, but we * didn't think so above. Start over. 
*/ - LWLockRelease(WALInsertLock); + WALInsertSlotRelease(); END_CRIT_SECTION(); rdt_lastnormal->next = NULL; info = info_orig; @@ -996,7 +1147,7 @@ begin:; if ((Insert->fullPageWrites || Insert->forcePageWrites) && !doPageWrites) { /* Oops, must redo it with full-page data. */ - LWLockRelease(WALInsertLock); + WALInsertSlotRelease(); END_CRIT_SECTION(); rdt_lastnormal->next = NULL; info = info_orig; @@ -1004,62 +1155,94 @@ begin:; } /* - * If the current page is completely full, the record goes to the next - * page, right after the page header. + * Reserve space for the record in the WAL. This also sets the xl_prev + * pointer. */ - updrqst = false; - freespace = INSERT_FREESPACE(Insert); - if (freespace == 0) + if (isLogSwitch) + inserted = ReserveXLogSwitch(&StartPos, &EndPos, &rechdr->xl_prev); + else + { + ReserveXLogInsertLocation(write_len, &StartPos, &EndPos, + &rechdr->xl_prev); + inserted = true; + } + + if (inserted) + { + /* + * Now that xl_prev has been filled in, finish CRC calculation of the + * record header. + */ + COMP_CRC32(rdata_crc, ((char *) &rechdr->xl_prev), sizeof(XLogRecPtr)); + FIN_CRC32(rdata_crc); + rechdr->xl_crc = rdata_crc; + + /* + * All the record data, including the header, is now ready to be + * inserted. Copy the record in the space reserved. + */ + CopyXLogRecordToWAL(write_len, isLogSwitch, &hdr_rdt, StartPos, EndPos); + } + else { - updrqst = AdvanceXLInsertBuffer(false); - freespace = INSERT_FREESPACE(Insert); + /* + * This was an xlog-switch record, but the current insert location was + * already exactly at the beginning of a segment, so there was no need + * to do anything. + */ } - /* Compute record's XLOG location */ - curridx = Insert->curridx; - INSERT_RECPTR(RecPtr, Insert, curridx); + /* + * Done! Let others know that we're finished. + */ + WALInsertSlotRelease(); + + END_CRIT_SECTION(); /* - * If the record is an XLOG_SWITCH, and we are exactly at the start of a - * segment, we need not insert it (and don't want to because we'd like - * consecutive switch requests to be no-ops). Instead, make sure - * everything is written and flushed through the end of the prior segment, - * and return the prior segment's end address. + * Update shared LogwrtRqst.Write, if we crossed page boundary. */ - if (isLogSwitch && (RecPtr % XLogSegSize) == SizeOfXLogLongPHD) + if (StartPos / XLOG_BLCKSZ != EndPos / XLOG_BLCKSZ) { - /* We can release insert lock immediately */ - LWLockRelease(WALInsertLock); + /* use volatile pointer to prevent code rearrangement */ + volatile XLogCtlData *xlogctl = XLogCtl; - RecPtr -= SizeOfXLogLongPHD; + SpinLockAcquire(&xlogctl->info_lck); + /* advance global request to include new block(s) */ + if (xlogctl->LogwrtRqst.Write < EndPos) + xlogctl->LogwrtRqst.Write = EndPos; + /* update local result copy while I have the chance */ + LogwrtResult = xlogctl->LogwrtResult; + SpinLockRelease(&xlogctl->info_lck); + } - LWLockAcquire(WALWriteLock, LW_EXCLUSIVE); - LogwrtResult = XLogCtl->LogwrtResult; - if (LogwrtResult.Flush < RecPtr) + /* + * If this was an XLOG_SWITCH record, flush the record and the empty + * padding space that fills the rest of the segment, and perform + * end-of-segment actions (eg, notifying archiver). + */ + if (isLogSwitch) + { + TRACE_POSTGRESQL_XLOG_SWITCH(); + XLogFlush(EndPos); + /* + * Even though we reserved the rest of the segment for us, which is + * reflected in EndPos, we return a pointer to just the end of the + * xlog-switch record. 
+ */ + if (inserted) { - XLogwrtRqst FlushRqst; - - FlushRqst.Write = RecPtr; - FlushRqst.Flush = RecPtr; - XLogWrite(FlushRqst, false, false); + EndPos = StartPos + SizeOfXLogRecord; + if (StartPos / XLOG_BLCKSZ != EndPos / XLOG_BLCKSZ) + { + if (EndPos % XLOG_SEG_SIZE == EndPos % XLOG_BLCKSZ) + EndPos += SizeOfXLogLongPHD; + else + EndPos += SizeOfXLogShortPHD; + } } - LWLockRelease(WALWriteLock); - - END_CRIT_SECTION(); - - /* wake up walsenders now that we've released heavily contended locks */ - WalSndWakeupProcessRequests(); - return RecPtr; } - /* Finish the record header */ - rechdr->xl_prev = Insert->PrevRecord; - - /* Now we can finish computing the record's CRC */ - COMP_CRC32(rdata_crc, (char *) rechdr, offsetof(XLogRecord, xl_crc)); - FIN_CRC32(rdata_crc); - rechdr->xl_crc = rdata_crc; - #ifdef WAL_DEBUG if (XLOG_DEBUG) { @@ -1067,7 +1250,7 @@ begin:; initStringInfo(&buf); appendStringInfo(&buf, "INSERT @ %X/%X: ", - (uint32) (RecPtr >> 32), (uint32) RecPtr); + (uint32) (EndPos >> 32), (uint32) EndPos); xlog_outrec(&buf, rechdr); if (rdata->data != NULL) { @@ -1079,163 +1262,1062 @@ begin:; } #endif - /* Record begin of record in appropriate places */ - ProcLastRecPtr = RecPtr; - Insert->PrevRecord = RecPtr; + /* + * Update our global variables + */ + ProcLastRecPtr = StartPos; + XactLastRecEnd = EndPos; + + return EndPos; +} + +/* + * Reserves the right amount of space for a record of given size from the WAL. + * *StartPos is set to the beginning of the reserved section, *EndPos to + * its end+1. *PrevPtr is set to the beginning of the previous record; it is + * used to set the xl_prev of this record. + * + * This is the performance critical part of XLogInsert that must be serialized + * across backends. The rest can happen mostly in parallel. Try to keep this + * section as short as possible, insertpos_lck can be heavily contended on a + * busy system. + * + * NB: The space calculation here must match the code in CopyXLogRecordToWAL, + * where we actually copy the record to the reserved space. + */ +static void +ReserveXLogInsertLocation(int size, XLogRecPtr *StartPos, XLogRecPtr *EndPos, + XLogRecPtr *PrevPtr) +{ + volatile XLogCtlInsert *Insert = &XLogCtl->Insert; + uint64 startbytepos; + uint64 endbytepos; + uint64 prevbytepos; + + size = MAXALIGN(size); + + /* All (non xlog-switch) records should contain data. */ + Assert(size > SizeOfXLogRecord); + + /* + * The duration the spinlock needs to be held is minimized by minimizing + * the calculations that have to be done while holding the lock. The + * current tip of reserved WAL is kept in CurrBytePos, as a byte position + * that only counts "usable" bytes in WAL, that is, it excludes all WAL + * page headers. The mapping between "usable" byte positions and physical + * positions (XLogRecPtrs) can be done outside the locked region, and + * because the usable byte position doesn't include any headers, reserving + * X bytes from WAL is almost as simple as "CurrBytePos += X". + */ + SpinLockAcquire(&Insert->insertpos_lck); + + startbytepos = Insert->CurrBytePos; + endbytepos = startbytepos + size; + prevbytepos = Insert->PrevBytePos; + Insert->CurrBytePos = endbytepos; + Insert->PrevBytePos = startbytepos; + + SpinLockRelease(&Insert->insertpos_lck); + + *StartPos = XLogBytePosToRecPtr(startbytepos); + *EndPos = XLogBytePosToEndRecPtr(endbytepos); + *PrevPtr = XLogBytePosToRecPtr(prevbytepos); + + /* + * Check that the conversions between "usable byte positions" and + * XLogRecPtrs work consistently in both directions. 
+ */ + Assert(XLogRecPtrToBytePos(*StartPos) == startbytepos); + Assert(XLogRecPtrToBytePos(*EndPos) == endbytepos); + Assert(XLogRecPtrToBytePos(*PrevPtr) == prevbytepos); +} + +/* + * Like ReserveXLogInsertLocation(), but for an xlog-switch record. + * + * A log-switch record is handled slightly differently. The rest of the + * segment will be reserved for this insertion, as indicated by the returned + * *EndPos_p value. However, if we are already at the beginning of the current + * segment, *StartPos_p and *EndPos_p are set to the current location without + * reserving any space, and the function returns false. +*/ +static bool +ReserveXLogSwitch(XLogRecPtr *StartPos, XLogRecPtr *EndPos, XLogRecPtr *PrevPtr) +{ + volatile XLogCtlInsert *Insert = &XLogCtl->Insert; + uint64 startbytepos; + uint64 endbytepos; + uint64 prevbytepos; + uint32 size = SizeOfXLogRecord; + XLogRecPtr ptr; + uint32 segleft; + + /* + * These calculations are a bit heavy-weight to be done while holding a + * spinlock, but since we're holding all the WAL insertion slots, there + * are no other inserters competing for it. GetXLogInsertRecPtr() does + * compete for it, but that's not called very frequently. + */ + SpinLockAcquire(&Insert->insertpos_lck); + + startbytepos = Insert->CurrBytePos; + + ptr = XLogBytePosToEndRecPtr(startbytepos); + if (ptr % XLOG_SEG_SIZE == 0) + { + SpinLockRelease(&Insert->insertpos_lck); + *EndPos = *StartPos = ptr; + return false; + } + + endbytepos = startbytepos + size; + prevbytepos = Insert->PrevBytePos; + + *StartPos = XLogBytePosToRecPtr(startbytepos); + *EndPos = XLogBytePosToEndRecPtr(endbytepos); + + segleft = XLOG_SEG_SIZE - ((*EndPos) % XLOG_SEG_SIZE); + if (segleft != XLOG_SEG_SIZE) + { + /* consume the rest of the segment */ + *EndPos += segleft; + endbytepos = XLogRecPtrToBytePos(*EndPos); + } + Insert->CurrBytePos = endbytepos; + Insert->PrevBytePos = startbytepos; + + SpinLockRelease(&Insert->insertpos_lck); + + *PrevPtr = XLogBytePosToRecPtr(prevbytepos); + + Assert((*EndPos) % XLOG_SEG_SIZE == 0); + Assert(XLogRecPtrToBytePos(*EndPos) == endbytepos); + Assert(XLogRecPtrToBytePos(*StartPos) == startbytepos); + Assert(XLogRecPtrToBytePos(*PrevPtr) == prevbytepos); + + return true; +} + +/* + * Subroutine of XLogInsert. Copies a WAL record to an already-reserved + * area in the WAL. + */ +static void +CopyXLogRecordToWAL(int write_len, bool isLogSwitch, XLogRecData *rdata, + XLogRecPtr StartPos, XLogRecPtr EndPos) +{ + char *currpos; + int freespace; + int written; + XLogRecPtr CurrPos; + XLogPageHeader pagehdr; + + /* The first chunk is the record header */ + Assert(rdata->len == SizeOfXLogRecord); /* - * Append the data, including backup blocks if any + * Get a pointer to the right place in the right WAL buffer to start + * inserting to. */ - rdata = &hdr_rdt; - while (write_len) + CurrPos = StartPos; + currpos = GetXLogBuffer(CurrPos); + freespace = INSERT_FREESPACE(CurrPos); + + /* + * there should be enough space for at least the first field (xl_tot_len) + * on this page. + */ + Assert(freespace >= sizeof(uint32)); + + /* Copy record data */ + written = 0; + while (rdata != NULL) { - while (rdata->data == NULL) - rdata = rdata->next; + char *rdata_data = rdata->data; + int rdata_len = rdata->len; - if (freespace > 0) + while (rdata_len > freespace) { - if (rdata->len > freespace) + /* + * Write what fits on this page, and continue on the next page. 
+ */ + Assert(CurrPos % XLOG_BLCKSZ >= SizeOfXLogShortPHD || freespace == 0); + memcpy(currpos, rdata_data, freespace); + rdata_data += freespace; + rdata_len -= freespace; + written += freespace; + CurrPos += freespace; + + /* + * Get pointer to beginning of next page, and set the xlp_rem_len + * in the page header. Set XLP_FIRST_IS_CONTRECORD. + * + * It's safe to set the contrecord flag and xlp_rem_len without a + * lock on the page. All the other flags were already set when the + * page was initialized, in AdvanceXLInsertBuffer, and we're the + * only backend that needs to set the contrecord flag. + */ + currpos = GetXLogBuffer(CurrPos); + pagehdr = (XLogPageHeader) currpos; + pagehdr->xlp_rem_len = write_len - written; + pagehdr->xlp_info |= XLP_FIRST_IS_CONTRECORD; + + /* skip over the page header */ + if (CurrPos % XLogSegSize == 0) { - memcpy(Insert->currpos, rdata->data, freespace); - rdata->data += freespace; - rdata->len -= freespace; - write_len -= freespace; + CurrPos += SizeOfXLogLongPHD; + currpos += SizeOfXLogLongPHD; } else { - memcpy(Insert->currpos, rdata->data, rdata->len); - freespace -= rdata->len; - write_len -= rdata->len; - Insert->currpos += rdata->len; - rdata = rdata->next; - continue; + CurrPos += SizeOfXLogShortPHD; + currpos += SizeOfXLogShortPHD; } + freespace = INSERT_FREESPACE(CurrPos); + } + + Assert(CurrPos % XLOG_BLCKSZ >= SizeOfXLogShortPHD || rdata_len == 0); + memcpy(currpos, rdata_data, rdata_len); + currpos += rdata_len; + CurrPos += rdata_len; + freespace -= rdata_len; + written += rdata_len; + + rdata = rdata->next; + } + Assert(written == write_len); + + /* Align the end position, so that the next record starts aligned */ + CurrPos = MAXALIGN(CurrPos); + + /* + * If this was an xlog-switch, it's not enough to write the switch record, + * we also have to consume all the remaining space in the WAL segment. + * We have already reserved it for us, but we still need to make sure it's + * allocated and zeroed in the WAL buffers so that when the caller (or + * someone else) does XLogWrite(), it can really write out all the zeros. + */ + if (isLogSwitch && CurrPos % XLOG_SEG_SIZE != 0) + { + /* An xlog-switch record doesn't contain any data besides the header */ + Assert(write_len == SizeOfXLogRecord); + + /* + * We do this one page at a time, to make sure we don't deadlock + * against ourselves if wal_buffers < XLOG_SEG_SIZE. + */ + Assert(EndPos % XLogSegSize == 0); + + /* Use up all the remaining space on the first page */ + CurrPos += freespace; + + while (CurrPos < EndPos) + { + /* initialize the next page (if not initialized already) */ + WakeupWaiters(CurrPos); + AdvanceXLInsertBuffer(CurrPos, false); + CurrPos += XLOG_BLCKSZ; } + } + + if (CurrPos != EndPos) + elog(PANIC, "space reserved for WAL record does not match what was written"); +} + +/* + * Allocate a slot for insertion. + * + * In exclusive mode, all slots are reserved for the current process. That + * blocks all concurrent insertions. + */ +static void +WALInsertSlotAcquire(bool exclusive) +{ + int i; + + if (exclusive) + { + for (i = 0; i < num_xloginsert_slots; i++) + WALInsertSlotAcquireOne(i); + holdingAllSlots = true; + } + else + WALInsertSlotAcquireOne(-1); +} + +/* + * Workhorse of WALInsertSlotAcquire. Acquires the given slot, or an arbitrary + * one if slotno == -1. The index of the slot that was acquired is stored in + * MySlotNo. + * + * This is more or less equivalent to LWLockAcquire(). 
+ */ +static void +WALInsertSlotAcquireOne(int slotno) +{ + volatile XLogInsertSlot *slot; + PGPROC *proc = MyProc; + bool retry = false; + int extraWaits = 0; + static int slotToTry = -1; - /* Use next buffer */ - updrqst = AdvanceXLInsertBuffer(false); - curridx = Insert->curridx; - /* Mark page header to indicate this record continues on the page */ - Insert->currpage->xlp_info |= XLP_FIRST_IS_CONTRECORD; - Insert->currpage->xlp_rem_len = write_len; - freespace = INSERT_FREESPACE(Insert); + /* + * Try to use the slot we used last time. If the system isn't particularly + * busy, it's a good bet that it's available, and it's good to have some + * affinity to a particular slot so that you don't unnecessarily bounce + * cache lines between processes when there is no contention. + * + * If this is the first time through in this backend, pick a slot + * (semi-)randomly. This allows the slots to be used evenly if you have a + * lot of very short connections. + */ + if (slotno != -1) + MySlotNo = slotno; + else + { + if (slotToTry == -1) + slotToTry = MyProc->pgprocno % num_xloginsert_slots; + MySlotNo = slotToTry; } - /* Ensure next record will be properly aligned */ - Insert->currpos = (char *) Insert->currpage + - MAXALIGN(Insert->currpos - (char *) Insert->currpage); - freespace = INSERT_FREESPACE(Insert); + /* + * We can't wait if we haven't got a PGPROC. This should only occur + * during bootstrap or shared memory initialization. Put an Assert here + * to catch unsafe coding practices. + */ + Assert(MyProc != NULL); /* - * The recptr I return is the beginning of the *next* record. This will be - * stored as LSN for changed data pages... + * Lock out cancel/die interrupts until we exit the code section protected + * by the slot. This ensures that interrupts will not interfere with + * manipulations of data structures in shared memory. */ - INSERT_RECPTR(RecPtr, Insert, curridx); + START_CRIT_SECTION(); /* - * If the record is an XLOG_SWITCH, we must now write and flush all the - * existing data, and then forcibly advance to the start of the next - * segment. It's not good to do this I/O while holding the insert lock, - * but there seems too much risk of confusion if we try to release the - * lock sooner. Fortunately xlog switch needn't be a high-performance - * operation anyway... + * Loop here to try to acquire slot after each time we are signaled by + * WALInsertSlotRelease. */ - if (isLogSwitch) + for (;;) { - XLogwrtRqst FlushRqst; - XLogRecPtr OldSegEnd; + bool mustwait; - TRACE_POSTGRESQL_XLOG_SWITCH(); + slot = &XLogCtl->Insert.insertSlots[MySlotNo].slot; + + /* Acquire mutex. Time spent holding mutex should be short! */ + SpinLockAcquire(&slot->mutex); + + /* If retrying, allow WALInsertSlotRelease to release waiters again */ + if (retry) + slot->releaseOK = true; + + /* If I can get the slot, do so quickly. */ + if (slot->exclusive == 0) + { + slot->exclusive++; + mustwait = false; + } + else + mustwait = true; + + if (!mustwait) + break; /* got the lock */ + + Assert(slot->owner != MyProc); - LWLockAcquire(WALWriteLock, LW_EXCLUSIVE); + /* + * Add myself to wait queue. + */ + proc->lwWaiting = true; + proc->lwWaitMode = LW_EXCLUSIVE; + proc->lwWaitLink = NULL; + if (slot->head == NULL) + slot->head = proc; + else + slot->tail->lwWaitLink = proc; + slot->tail = proc; + + /* Can release the mutex now */ + SpinLockRelease(&slot->mutex); /* - * Flush through the end of the page containing XLOG_SWITCH, and - * perform end-of-segment actions (eg, notifying archiver). 
+ * Wait until awakened. + * + * Since we share the process wait semaphore with the regular lock + * manager and ProcWaitForSignal, and we may need to acquire a slot + * while one of those is pending, it is possible that we get awakened + * for a reason other than being signaled by WALInsertSlotRelease. If + * so, loop back and wait again. Once we've gotten the slot, + * re-increment the sema by the number of additional signals received, + * so that the lock manager or signal manager will see the received + * signal when it next waits. */ - WriteRqst = XLogCtl->xlblocks[curridx]; - FlushRqst.Write = WriteRqst; - FlushRqst.Flush = WriteRqst; - XLogWrite(FlushRqst, false, true); + for (;;) + { + /* "false" means cannot accept cancel/die interrupt here. */ + PGSemaphoreLock(&proc->sem, false); + if (!proc->lwWaiting) + break; + extraWaits++; + } + + /* Now loop back and try to acquire lock again. */ + retry = true; + } + + slot->owner = proc; + + /* + * Normally, we initialize the xlogInsertingAt value of the slot to 1, + * because we don't yet know where in the WAL we're going to insert. It's + * not critical what it points to right now - leaving it to a too small + * value just means that WaitXlogInsertionsToFinish() might wait on us + * unnecessarily, until we update the value (when we finish the insert or + * move to next page). + * + * If we're grabbing all the slots, however, stamp all but the last one + * with InvalidXLogRecPtr, meaning there is no insert in progress. The last + * slot is the one that we will update as we proceed with the insert, the + * rest are held just to keep off other inserters. + */ + if (slotno != -1 && slotno != num_xloginsert_slots - 1) + slot->xlogInsertingAt = InvalidXLogRecPtr; + else + slot->xlogInsertingAt = 1; + + /* We are done updating shared state of the slot itself. */ + SpinLockRelease(&slot->mutex); + + /* + * Fix the process wait semaphore's count for any absorbed wakeups. + */ + while (extraWaits-- > 0) + PGSemaphoreUnlock(&proc->sem); + + /* + * If we couldn't get the slot immediately, try another slot next time. + * On a system with more insertion slots than concurrent inserters, this + * causes all the inserters to eventually migrate to a slot that no-one + * else is using. On a system with more inserters than slots, it still + * causes the inserters to be distributed quite evenly across the slots. + */ + if (slotno != -1 && retry) + slotToTry = (slotToTry + 1) % num_xloginsert_slots; +} + +/* + * Wait for the given slot to become free, or for its xlogInsertingAt location + * to change to something else than 'waitptr'. In other words, wait for the + * inserter using the given slot to finish its insertion, or to at least make + * some progress. + */ +static void +WaitOnSlot(volatile XLogInsertSlot *slot, XLogRecPtr waitptr) +{ + PGPROC *proc = MyProc; + int extraWaits = 0; + + /* + * Lock out cancel/die interrupts while we sleep on the slot. There is + * no cleanup mechanism to remove us from the wait queue if we got + * interrupted. + */ + HOLD_INTERRUPTS(); + + /* + * Loop here to try to acquire lock after each time we are signaled. + */ + for (;;) + { + bool mustwait; - /* Set up the next buffer as first page of next segment */ - /* Note: AdvanceXLInsertBuffer cannot need to do I/O here */ - (void) AdvanceXLInsertBuffer(true); + /* Acquire mutex. Time spent holding mutex should be short! 
*/ + SpinLockAcquire(&slot->mutex); - /* There should be no unwritten data */ - curridx = Insert->curridx; - Assert(curridx == XLogCtl->Write.curridx); + /* If I can get the lock, do so quickly. */ + if (slot->exclusive == 0 || slot->xlogInsertingAt != waitptr) + mustwait = false; + else + mustwait = true; - /* Compute end address of old segment */ - OldSegEnd = XLogCtl->xlblocks[curridx]; - OldSegEnd -= XLOG_BLCKSZ; + if (!mustwait) + break; /* the lock was free */ - /* Make it look like we've written and synced all of old segment */ - LogwrtResult.Write = OldSegEnd; - LogwrtResult.Flush = OldSegEnd; + Assert(slot->owner != MyProc); /* - * Update shared-memory status --- this code should match XLogWrite + * Add myself to wait queue. */ + proc->lwWaiting = true; + proc->lwWaitMode = LW_WAIT_UNTIL_FREE; + proc->lwWaitLink = NULL; + + /* waiters are added to the front of the queue */ + proc->lwWaitLink = slot->head; + if (slot->head == NULL) + slot->tail = proc; + slot->head = proc; + + /* Can release the mutex now */ + SpinLockRelease(&slot->mutex); + + /* + * Wait until awakened. + * + * Since we share the process wait semaphore with other things, like + * the regular lock manager and ProcWaitForSignal, and we may need to + * acquire an LWLock while one of those is pending, it is possible that + * we get awakened for a reason other than being signaled by + * LWLockRelease. If so, loop back and wait again. Once we've gotten + * the LWLock, re-increment the sema by the number of additional + * signals received, so that the lock manager or signal manager will + * see the received signal when it next waits. + */ + for (;;) { - /* use volatile pointer to prevent code rearrangement */ - volatile XLogCtlData *xlogctl = XLogCtl; + /* "false" means cannot accept cancel/die interrupt here. */ + PGSemaphoreLock(&proc->sem, false); + if (!proc->lwWaiting) + break; + extraWaits++; + } - SpinLockAcquire(&xlogctl->info_lck); - xlogctl->LogwrtResult = LogwrtResult; - if (xlogctl->LogwrtRqst.Write < LogwrtResult.Write) - xlogctl->LogwrtRqst.Write = LogwrtResult.Write; - if (xlogctl->LogwrtRqst.Flush < LogwrtResult.Flush) - xlogctl->LogwrtRqst.Flush = LogwrtResult.Flush; - SpinLockRelease(&xlogctl->info_lck); + /* Now loop back and try to acquire lock again. */ + } + + /* We are done updating shared state of the lock itself. */ + SpinLockRelease(&slot->mutex); + + /* + * Fix the process wait semaphore's count for any absorbed wakeups. + */ + while (extraWaits-- > 0) + PGSemaphoreUnlock(&proc->sem); + + /* + * Now okay to allow cancel/die interrupts. + */ + RESUME_INTERRUPTS(); +} + +/* + * Wake up all processes waiting for us with WaitOnSlot(). Sets our + * xlogInsertingAt value to EndPos, without releasing the slot. + */ +static void +WakeupWaiters(XLogRecPtr EndPos) +{ + volatile XLogInsertSlot *slot = &XLogCtl->Insert.insertSlots[MySlotNo].slot; + PGPROC *head; + PGPROC *proc; + PGPROC *next; + + /* + * If we have already reported progress up to the same point, do nothing. + * No other process can modify xlogInsertingAt, so we can check this before + * grabbing the spinlock. + */ + if (slot->xlogInsertingAt == EndPos) + return; + /* xlogInsertingAt should not go backwards */ + Assert(slot->xlogInsertingAt < EndPos); + + /* Acquire mutex. Time spent holding mutex should be short! */ + SpinLockAcquire(&slot->mutex); + + /* we should own the slot */ + Assert(slot->exclusive == 1 && slot->owner == MyProc); + + slot->xlogInsertingAt = EndPos; + + /* + * See if there are any waiters that need to be woken up. 
+ */ + head = slot->head; + + if (head != NULL) + { + proc = head; + + /* LW_WAIT_UNTIL_FREE waiters are always in the front of the queue */ + next = proc->lwWaitLink; + while (next && next->lwWaitMode == LW_WAIT_UNTIL_FREE) + { + proc = next; + next = next->lwWaitLink; } - LWLockRelease(WALWriteLock); + /* proc is now the last PGPROC to be released */ + slot->head = next; + proc->lwWaitLink = NULL; + } + + /* We are done updating shared state of the lock itself. */ + SpinLockRelease(&slot->mutex); + + /* + * Awaken any waiters I removed from the queue. + */ + while (head != NULL) + { + proc = head; + head = proc->lwWaitLink; + proc->lwWaitLink = NULL; + proc->lwWaiting = false; + PGSemaphoreUnlock(&proc->sem); + } +} + +/* + * Release our insertion slot (or slots, if we're holding them all). + */ +static void +WALInsertSlotRelease(void) +{ + int i; - updrqst = false; /* done already */ + if (holdingAllSlots) + { + for (i = 0; i < num_xloginsert_slots; i++) + WALInsertSlotReleaseOne(i); + holdingAllSlots = false; } else + WALInsertSlotReleaseOne(MySlotNo); +} + +static void +WALInsertSlotReleaseOne(int slotno) +{ + volatile XLogInsertSlot *slot = &XLogCtl->Insert.insertSlots[slotno].slot; + PGPROC *head; + PGPROC *proc; + + /* Acquire mutex. Time spent holding mutex should be short! */ + SpinLockAcquire(&slot->mutex); + + /* we must be holding it */ + Assert(slot->exclusive == 1 && slot->owner == MyProc); + + slot->xlogInsertingAt = InvalidXLogRecPtr; + + /* Release my hold on the slot */ + slot->exclusive = 0; + slot->owner = NULL; + + /* + * See if I need to awaken any waiters.. + */ + head = slot->head; + if (head != NULL) { - /* normal case, ie not xlog switch */ + if (slot->releaseOK) + { + /* + * Remove the to-be-awakened PGPROCs from the queue. + */ + bool releaseOK = true; - /* Need to update shared LogwrtRqst if some block was filled up */ - if (freespace == 0) + proc = head; + + /* + * First wake up any backends that want to be woken up without + * acquiring the lock. These are always in the front of the queue. + */ + while (proc->lwWaitMode == LW_WAIT_UNTIL_FREE && proc->lwWaitLink) + proc = proc->lwWaitLink; + + /* + * Awaken the first exclusive-waiter, if any. + */ + if (proc->lwWaitLink) + { + Assert(proc->lwWaitLink->lwWaitMode == LW_EXCLUSIVE); + proc = proc->lwWaitLink; + releaseOK = false; + } + /* proc is now the last PGPROC to be released */ + slot->head = proc->lwWaitLink; + proc->lwWaitLink = NULL; + + slot->releaseOK = releaseOK; + } + else + head = NULL; + } + + /* We are done updating shared state of the slot itself. */ + SpinLockRelease(&slot->mutex); + + /* + * Awaken any waiters I removed from the queue. + */ + while (head != NULL) + { + proc = head; + head = proc->lwWaitLink; + proc->lwWaitLink = NULL; + proc->lwWaiting = false; + PGSemaphoreUnlock(&proc->sem); + } + + /* + * Now okay to allow cancel/die interrupts. + */ + END_CRIT_SECTION(); +} + + +/* + * Wait for any WAL insertions < upto to finish. + * + * Returns the location of the oldest insertion that is still in-progress. + * Any WAL prior to that point has been fully copied into WAL buffers, and + * can be flushed out to disk. Because this waits for any insertions older + * than 'upto' to finish, the return value is always >= 'upto'. + * + * Note: When you are about to write out WAL, you must call this function + * *before* acquiring WALWriteLock, to avoid deadlocks. 
This function might + * need to wait for an insertion to finish (or at least advance to next + * uninitialized page), and the inserter might need to evict an old WAL buffer + * to make room for a new one, which in turn requires WALWriteLock. + */ +static XLogRecPtr +WaitXLogInsertionsToFinish(XLogRecPtr upto) +{ + uint64 bytepos; + XLogRecPtr reservedUpto; + XLogRecPtr finishedUpto; + volatile XLogCtlInsert *Insert = &XLogCtl->Insert; + int i; + + if (MyProc == NULL) + elog(PANIC, "cannot wait without a PGPROC structure"); + + /* Read the current insert position */ + SpinLockAcquire(&Insert->insertpos_lck); + bytepos = Insert->CurrBytePos; + SpinLockRelease(&Insert->insertpos_lck); + reservedUpto = XLogBytePosToEndRecPtr(bytepos); + + /* + * No-one should request to flush a piece of WAL that hasn't even been + * reserved yet. However, it can happen if there is a block with a bogus + * LSN on disk, for example. XLogFlush checks for that situation and + * complains, but only after the flush. Here we just assume that to mean + * that all WAL that has been reserved needs to be finished. In this + * corner-case, the return value can be smaller than 'upto' argument. + */ + if (upto > reservedUpto) + { + elog(LOG, "request to flush past end of generated WAL; request %X/%X, currpos %X/%X", + (uint32) (upto >> 32), (uint32) upto, + (uint32) (reservedUpto >> 32), (uint32) reservedUpto); + upto = reservedUpto; + } + + /* + * finishedUpto is our return value, indicating the point upto which + * all the WAL insertions have been finished. Initialize it to the head + * of reserved WAL, and as we iterate through the insertion slots, back it + * out for any insertion that's still in progress. + */ + finishedUpto = reservedUpto; + + /* + * Loop through all the slots, sleeping on any in-progress insert older + * than 'upto'. + */ + for (i = 0; i < num_xloginsert_slots; i++) + { + volatile XLogInsertSlot *slot = &XLogCtl->Insert.insertSlots[i].slot; + XLogRecPtr insertingat; + + retry: + /* + * We can check if the slot is in use without grabbing the spinlock. + * The spinlock acquisition of insertpos_lck before this loop acts + * as a memory barrier. If someone acquires the slot after that, it + * can't possibly be inserting to anything < reservedUpto. If it was + * acquired before that, an unlocked test will return true. + */ + if (!slot->exclusive) + continue; + + SpinLockAcquire(&slot->mutex); + /* re-check now that we have the lock */ + if (!slot->exclusive) { - /* curridx is filled and available for writing out */ - updrqst = true; + SpinLockRelease(&slot->mutex); + continue; + } + insertingat = slot->xlogInsertingAt; + SpinLockRelease(&slot->mutex); + + if (insertingat == InvalidXLogRecPtr) + { + /* + * slot is reserved just to hold off other inserters, there is no + * actual insert in progress. + */ + continue; + } + + /* + * This insertion is still in progress. Do we need to wait for it? + * + * When an inserter acquires a slot, it doesn't reset 'insertingat', so + * it will initially point to the old value of some already-finished + * insertion. The inserter will update the value as soon as it finishes + * the insertion, moves to the next page, or has to do I/O to flush an + * old dirty buffer. That means that when we see a slot with + * insertingat value < upto, we don't know if that insertion is still + * truly in progress, or if the slot is reused by a new inserter that + * hasn't updated the insertingat value yet. We have to assume it's the + * latter, and wait. 
+ */ + if (insertingat < upto) + { + WaitOnSlot(slot, insertingat); + goto retry; } else { - /* if updrqst already set, write through end of previous buf */ - curridx = PrevBufIdx(curridx); + /* + * We don't need to wait for this insertion, but update the + * return value. + */ + if (insertingat < finishedUpto) + finishedUpto = insertingat; } - WriteRqst = XLogCtl->xlblocks[curridx]; } + return finishedUpto; +} - LWLockRelease(WALInsertLock); +/* + * Get a pointer to the right location in the WAL buffer containing the + * given XLogRecPtr. + * + * If the page is not initialized yet, it is initialized. That might require + * evicting an old dirty buffer from the buffer cache, which means I/O. + * + * The caller must ensure that the page containing the requested location + * isn't evicted yet, and won't be evicted. The way to ensure that is to + * hold onto an XLogInsertSlot with the xlogInsertingAt position set to + * something <= ptr. GetXLogBuffer() will update xlogInsertingAt if it needs + * to evict an old page from the buffer. (This means that once you call + * GetXLogBuffer() with a given 'ptr', you must not access anything before + * that point anymore, and must not call GetXLogBuffer() with an older 'ptr' + * later, because older buffers might be recycled already) + */ +static char * +GetXLogBuffer(XLogRecPtr ptr) +{ + int idx; + XLogRecPtr endptr; + static uint64 cachedPage = 0; + static char *cachedPos = NULL; + XLogRecPtr expectedEndPtr; - if (updrqst) + /* + * Fast path for the common case that we need to access again the same + * page as last time. + */ + if (ptr / XLOG_BLCKSZ == cachedPage) { - /* use volatile pointer to prevent code rearrangement */ - volatile XLogCtlData *xlogctl = XLogCtl; + Assert(((XLogPageHeader) cachedPos)->xlp_magic == XLOG_PAGE_MAGIC); + Assert(((XLogPageHeader) cachedPos)->xlp_pageaddr == ptr - (ptr % XLOG_BLCKSZ)); + return cachedPos + ptr % XLOG_BLCKSZ; + } - SpinLockAcquire(&xlogctl->info_lck); - /* advance global request to include new block(s) */ - if (xlogctl->LogwrtRqst.Write < WriteRqst) - xlogctl->LogwrtRqst.Write = WriteRqst; - /* update local result copy while I have the chance */ - LogwrtResult = xlogctl->LogwrtResult; - SpinLockRelease(&xlogctl->info_lck); + /* + * The XLog buffer cache is organized so that a page is always loaded + * to a particular buffer. That way we can easily calculate the buffer + * a given page must be loaded into, from the XLogRecPtr alone. + */ + idx = XLogRecPtrToBufIdx(ptr); + + /* + * See what page is loaded in the buffer at the moment. It could be the + * page we're looking for, or something older. It can't be anything newer + * - that would imply the page we're looking for has already been written + * out to disk and evicted, and the caller is responsible for making sure + * that doesn't happen. + * + * However, we don't hold a lock while we read the value. If someone has + * just initialized the page, it's possible that we get a "torn read" of + * the XLogRecPtr if 64-bit fetches are not atomic on this platform. In + * that case we will see a bogus value. That's ok, we'll grab the mapping + * lock (in AdvanceXLInsertBuffer) and retry if we see anything else than + * the page we're looking for. But it means that when we do this unlocked + * read, we might see a value that appears to be ahead of the page we're + * looking for. Don't PANIC on that, until we've verified the value while + * holding the lock. 
+ */ + expectedEndPtr = ptr; + expectedEndPtr += XLOG_BLCKSZ - ptr % XLOG_BLCKSZ; + + endptr = XLogCtl->xlblocks[idx]; + if (expectedEndPtr != endptr) + { + /* + * Let others know that we're finished inserting the record up + * to the page boundary. + */ + WakeupWaiters(expectedEndPtr - XLOG_BLCKSZ); + + AdvanceXLInsertBuffer(ptr, false); + endptr = XLogCtl->xlblocks[idx]; + + if (expectedEndPtr != endptr) + elog(PANIC, "could not find WAL buffer for %X/%X", + (uint32) (ptr >> 32) , (uint32) ptr); + } + else + { + /* + * Make sure the initialization of the page is visible to us, and + * won't arrive later to overwrite the WAL data we write on the page. + */ + pg_memory_barrier(); + } + + /* + * Found the buffer holding this page. Return a pointer to the right + * offset within the page. + */ + cachedPage = ptr / XLOG_BLCKSZ; + cachedPos = XLogCtl->pages + idx * (Size) XLOG_BLCKSZ; + + Assert(((XLogPageHeader) cachedPos)->xlp_magic == XLOG_PAGE_MAGIC); + Assert(((XLogPageHeader) cachedPos)->xlp_pageaddr == ptr - (ptr % XLOG_BLCKSZ)); + + return cachedPos + ptr % XLOG_BLCKSZ; +} + +/* + * Converts a "usable byte position" to XLogRecPtr. A usable byte position + * is the position starting from the beginning of WAL, excluding all WAL + * page headers. + */ +static XLogRecPtr +XLogBytePosToRecPtr(uint64 bytepos) +{ + uint64 fullsegs; + uint64 fullpages; + uint64 bytesleft; + uint32 seg_offset; + XLogRecPtr result; + + fullsegs = bytepos / UsableBytesInSegment; + bytesleft = bytepos % UsableBytesInSegment; + + if (bytesleft < XLOG_BLCKSZ - SizeOfXLogLongPHD) + { + /* fits on first page of segment */ + seg_offset = bytesleft + SizeOfXLogLongPHD; } + else + { + /* account for the first page on segment with long header */ + seg_offset = XLOG_BLCKSZ; + bytesleft -= XLOG_BLCKSZ - SizeOfXLogLongPHD; - XactLastRecEnd = RecPtr; + fullpages = bytesleft / UsableBytesInPage; + bytesleft = bytesleft % UsableBytesInPage; - END_CRIT_SECTION(); + seg_offset += fullpages * XLOG_BLCKSZ + bytesleft + SizeOfXLogShortPHD; + } - /* wake up walsenders now that we've released heavily contended locks */ - WalSndWakeupProcessRequests(); + XLogSegNoOffsetToRecPtr(fullsegs, seg_offset, result); - return RecPtr; + return result; +} + +/* + * Like XLogBytePosToRecPtr, but if the position is at a page boundary, + * returns a pointer to the beginning of the page (ie. before page header), + * not to where the first xlog record on that page would go to. This is used + * when converting a pointer to the end of a record. + */ +static XLogRecPtr +XLogBytePosToEndRecPtr(uint64 bytepos) +{ + uint64 fullsegs; + uint64 fullpages; + uint64 bytesleft; + uint32 seg_offset; + XLogRecPtr result; + + fullsegs = bytepos / UsableBytesInSegment; + bytesleft = bytepos % UsableBytesInSegment; + + if (bytesleft < XLOG_BLCKSZ - SizeOfXLogLongPHD) + { + /* fits on first page of segment */ + if (bytesleft == 0) + seg_offset = 0; + else + seg_offset = bytesleft + SizeOfXLogLongPHD; + } + else + { + /* account for the first page on segment with long header */ + seg_offset = XLOG_BLCKSZ; + bytesleft -= XLOG_BLCKSZ - SizeOfXLogLongPHD; + + fullpages = bytesleft / UsableBytesInPage; + bytesleft = bytesleft % UsableBytesInPage; + + if (bytesleft == 0) + seg_offset += fullpages * XLOG_BLCKSZ + bytesleft; + else + seg_offset += fullpages * XLOG_BLCKSZ + bytesleft + SizeOfXLogShortPHD; + } + + XLogSegNoOffsetToRecPtr(fullsegs, seg_offset, result); + + return result; +} + +/* + * Convert an XLogRecPtr to a "usable byte position". 
+ */ +static uint64 +XLogRecPtrToBytePos(XLogRecPtr ptr) +{ + uint64 fullsegs; + uint32 fullpages; + uint32 offset; + uint64 result; + + XLByteToSeg(ptr, fullsegs); + + fullpages = (ptr % XLOG_SEG_SIZE) / XLOG_BLCKSZ; + offset = ptr % XLOG_BLCKSZ; + + if (fullpages == 0) + { + result = fullsegs * UsableBytesInSegment; + if (offset > 0) + { + Assert(offset >= SizeOfXLogLongPHD); + result += offset - SizeOfXLogLongPHD; + } + } + else + { + result = fullsegs * UsableBytesInSegment + + (XLOG_BLCKSZ - SizeOfXLogLongPHD) + /* account for first page */ + (fullpages - 1) * UsableBytesInPage; /* full pages */ + if (offset > 0) + { + Assert(offset >= SizeOfXLogShortPHD); + result += offset - SizeOfXLogShortPHD; + } + } + + return result; } /* @@ -1303,158 +2385,181 @@ XLogCheckBuffer(XLogRecData *rdata, bool holdsExclusiveLock, } /* - * Advance the Insert state to the next buffer page, writing out the next - * buffer if it still contains unwritten data. - * - * If new_segment is TRUE then we set up the next buffer page as the first - * page of the next xlog segment file, possibly but not usually the next - * consecutive file page. - * - * The global LogwrtRqst.Write pointer needs to be advanced to include the - * just-filled page. If we can do this for free (without an extra lock), - * we do so here. Otherwise the caller must do it. We return TRUE if the - * request update still needs to be done, FALSE if we did it internally. - * - * Must be called with WALInsertLock held. + * Initialize XLOG buffers, writing out old buffers if they still contain + * unwritten data, upto the page containing 'upto'. Or if 'opportunistic' is + * true, initialize as many pages as we can without having to write out + * unwritten data. Any new pages are initialized to zeros, with pages headers + * initialized properly. */ -static bool -AdvanceXLInsertBuffer(bool new_segment) +static void +AdvanceXLInsertBuffer(XLogRecPtr upto, bool opportunistic) { XLogCtlInsert *Insert = &XLogCtl->Insert; - int nextidx = NextBufIdx(Insert->curridx); - bool update_needed = true; + int nextidx; XLogRecPtr OldPageRqstPtr; XLogwrtRqst WriteRqst; - XLogRecPtr NewPageEndPtr; + XLogRecPtr NewPageEndPtr = InvalidXLogRecPtr; XLogRecPtr NewPageBeginPtr; XLogPageHeader NewPage; + int npages = 0; + + LWLockAcquire(WALBufMappingLock, LW_EXCLUSIVE); /* - * Get ending-offset of the buffer page we need to replace (this may be - * zero if the buffer hasn't been used yet). Fall through if it's already - * written out. + * Now that we have the lock, check if someone initialized the page + * already. */ - OldPageRqstPtr = XLogCtl->xlblocks[nextidx]; - if (LogwrtResult.Write < OldPageRqstPtr) + while (upto >= XLogCtl->xlblocks[XLogCtl->curridx] || opportunistic) { - /* nope, got work to do... */ - XLogRecPtr FinishedPageRqstPtr; - - FinishedPageRqstPtr = XLogCtl->xlblocks[Insert->curridx]; - - /* Before waiting, get info_lck and update LogwrtResult */ - { - /* use volatile pointer to prevent code rearrangement */ - volatile XLogCtlData *xlogctl = XLogCtl; - - SpinLockAcquire(&xlogctl->info_lck); - if (xlogctl->LogwrtRqst.Write < FinishedPageRqstPtr) - xlogctl->LogwrtRqst.Write = FinishedPageRqstPtr; - LogwrtResult = xlogctl->LogwrtResult; - SpinLockRelease(&xlogctl->info_lck); - } - - update_needed = false; /* Did the shared-request update */ + nextidx = NextBufIdx(XLogCtl->curridx); /* - * Now that we have an up-to-date LogwrtResult value, see if we still - * need to write it or if someone else already did. 
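As a counterpart to the previous sketch, this is the reverse mapping that XLogRecPtrToBytePos performs: strip the page headers back out of a plain WAL pointer. Same assumed header sizes as before; the function name is illustrative.

#include <stdint.h>
#include <stdio.h>

#define XLOG_BLCKSZ     8192
#define XLOG_SEG_SIZE   (16 * 1024 * 1024)
#define LONG_PHD        40          /* assumed header sizes, as before */
#define SHORT_PHD       24
#define USABLE_PER_PAGE (XLOG_BLCKSZ - SHORT_PHD)
#define USABLE_PER_SEG  ((XLOG_SEG_SIZE / XLOG_BLCKSZ) * USABLE_PER_PAGE \
                         - (LONG_PHD - SHORT_PHD))

/*
 * Mirror of XLogRecPtrToBytePos: recover the "usable byte position" from a
 * plain WAL pointer by subtracting the headers of all pages it has crossed.
 */
static uint64_t
recptr_to_bytepos(uint64_t ptr)
{
    uint64_t segno     = ptr / XLOG_SEG_SIZE;
    uint32_t fullpages = (uint32_t) ((ptr % XLOG_SEG_SIZE) / XLOG_BLCKSZ);
    uint32_t offset    = (uint32_t) (ptr % XLOG_BLCKSZ);
    uint64_t result    = segno * USABLE_PER_SEG;

    if (fullpages == 0)
        return result + (offset > 0 ? offset - LONG_PHD : 0);

    result += (XLOG_BLCKSZ - LONG_PHD)
            + (uint64_t) (fullpages - 1) * USABLE_PER_PAGE;
    return result + (offset > 0 ? offset - SHORT_PHD : 0);
}

int
main(void)
{
    /* a pointer just past the long header maps back to the segment start */
    printf("%llu\n", (unsigned long long) recptr_to_bytepos(LONG_PHD));
    return 0;
}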
+ * Get ending-offset of the buffer page we need to replace (this may + * be zero if the buffer hasn't been used yet). Fall through if it's + * already written out. */ + OldPageRqstPtr = XLogCtl->xlblocks[nextidx]; if (LogwrtResult.Write < OldPageRqstPtr) { - /* Must acquire write lock */ - LWLockAcquire(WALWriteLock, LW_EXCLUSIVE); - LogwrtResult = XLogCtl->LogwrtResult; - if (LogwrtResult.Write >= OldPageRqstPtr) + /* + * Nope, got work to do. If we just want to pre-initialize as much + * as we can without flushing, give up now. + */ + if (opportunistic) + break; + + /* Before waiting, get info_lck and update LogwrtResult */ { - /* OK, someone wrote it already */ - LWLockRelease(WALWriteLock); + /* use volatile pointer to prevent code rearrangement */ + volatile XLogCtlData *xlogctl = XLogCtl; + + SpinLockAcquire(&xlogctl->info_lck); + if (xlogctl->LogwrtRqst.Write < OldPageRqstPtr) + xlogctl->LogwrtRqst.Write = OldPageRqstPtr; + LogwrtResult = xlogctl->LogwrtResult; + SpinLockRelease(&xlogctl->info_lck); } - else + + /* + * Now that we have an up-to-date LogwrtResult value, see if we + * still need to write it or if someone else already did. + */ + if (LogwrtResult.Write < OldPageRqstPtr) { /* - * Have to write buffers while holding insert lock. This is - * not good, so only write as much as we absolutely must. + * Must acquire write lock. Release WALBufMappingLock first, + * to make sure that all insertions that we need to wait for + * can finish (up to this same position). Otherwise we risk + * deadlock. */ - TRACE_POSTGRESQL_WAL_BUFFER_WRITE_DIRTY_START(); - WriteRqst.Write = OldPageRqstPtr; - WriteRqst.Flush = 0; - XLogWrite(WriteRqst, false, false); - LWLockRelease(WALWriteLock); - TRACE_POSTGRESQL_WAL_BUFFER_WRITE_DIRTY_DONE(); + LWLockRelease(WALBufMappingLock); + + WaitXLogInsertionsToFinish(OldPageRqstPtr); + + LWLockAcquire(WALWriteLock, LW_EXCLUSIVE); + + LogwrtResult = XLogCtl->LogwrtResult; + if (LogwrtResult.Write >= OldPageRqstPtr) + { + /* OK, someone wrote it already */ + LWLockRelease(WALWriteLock); + } + else + { + /* Have to write it ourselves */ + TRACE_POSTGRESQL_WAL_BUFFER_WRITE_DIRTY_START(); + WriteRqst.Write = OldPageRqstPtr; + WriteRqst.Flush = 0; + XLogWrite(WriteRqst, false); + LWLockRelease(WALWriteLock); + TRACE_POSTGRESQL_WAL_BUFFER_WRITE_DIRTY_DONE(); + } + /* Re-acquire WALBufMappingLock and retry */ + LWLockAcquire(WALBufMappingLock, LW_EXCLUSIVE); + continue; } } - } - /* - * Now the next buffer slot is free and we can set it up to be the next - * output page. - */ - NewPageBeginPtr = XLogCtl->xlblocks[Insert->curridx]; + /* + * Now the next buffer slot is free and we can set it up to be the next + * output page. 
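The retry loop above is careful about lock ordering: WALBufMappingLock is dropped before waiting for in-flight insertions or taking WALWriteLock, and the eviction check is redone after it is re-acquired. The stubbed, compilable sketch below shows that control flow; all lock and I/O primitives here are stand-ins, not the real PostgreSQL APIs.

#include <stdint.h>

typedef uint64_t XLogRecPtr;

/* stand-in lock and I/O primitives */
static void acquire_mapping_lock(void) {}
static void release_mapping_lock(void) {}
static void acquire_write_lock(void)   {}
static void release_write_lock(void)   {}
static void wait_insertions_to_finish(XLogRecPtr upto) { (void) upto; }
static void write_buffers_up_to(XLogRecPtr upto)       { (void) upto; }
static XLogRecPtr written_up_to(void)  { return UINT64_MAX; }

static void
evict_old_page(XLogRecPtr old_page_end)
{
    acquire_mapping_lock();
    for (;;)
    {
        if (written_up_to() >= old_page_end)
            break;              /* already on disk, safe to reuse the buffer */

        /*
         * Must flush.  Drop the mapping lock first so that backends still
         * inserting into the old page can finish (they may need the mapping
         * lock themselves); otherwise we could deadlock.
         */
        release_mapping_lock();
        wait_insertions_to_finish(old_page_end);

        acquire_write_lock();
        if (written_up_to() < old_page_end)
            write_buffers_up_to(old_page_end);  /* nobody beat us to it */
        release_write_lock();

        acquire_mapping_lock();                 /* retry the check from the top */
    }
    /*
     * ... initialize the buffer for the new page here, still holding the
     * mapping lock ...
     */
    release_mapping_lock();
}

int main(void) { evict_old_page(8192); return 0; }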
+ */ + NewPageBeginPtr = XLogCtl->xlblocks[XLogCtl->curridx]; + NewPageEndPtr = NewPageBeginPtr + XLOG_BLCKSZ; - if (new_segment) - { - /* force it to a segment start point */ - if (NewPageBeginPtr % XLogSegSize != 0) - NewPageBeginPtr += XLogSegSize - NewPageBeginPtr % XLogSegSize; - } + Assert(NewPageEndPtr % XLOG_BLCKSZ == 0); + Assert(XLogRecEndPtrToBufIdx(NewPageEndPtr) == nextidx); + Assert(XLogRecPtrToBufIdx(NewPageBeginPtr) == nextidx); - NewPageEndPtr = NewPageBeginPtr; - NewPageEndPtr += XLOG_BLCKSZ; - XLogCtl->xlblocks[nextidx] = NewPageEndPtr; - NewPage = (XLogPageHeader) (XLogCtl->pages + nextidx * (Size) XLOG_BLCKSZ); + NewPage = (XLogPageHeader) (XLogCtl->pages + nextidx * (Size) XLOG_BLCKSZ); - Insert->curridx = nextidx; - Insert->currpage = NewPage; + /* + * Be sure to re-zero the buffer so that bytes beyond what we've + * written will look like zeroes and not valid XLOG records... + */ + MemSet((char *) NewPage, 0, XLOG_BLCKSZ); - Insert->currpos = ((char *) NewPage) +SizeOfXLogShortPHD; + /* + * Fill the new page's header + */ + NewPage ->xlp_magic = XLOG_PAGE_MAGIC; - /* - * Be sure to re-zero the buffer so that bytes beyond what we've written - * will look like zeroes and not valid XLOG records... - */ - MemSet((char *) NewPage, 0, XLOG_BLCKSZ); + /* NewPage->xlp_info = 0; */ /* done by memset */ + NewPage ->xlp_tli = ThisTimeLineID; + NewPage ->xlp_pageaddr = NewPageBeginPtr; + /* NewPage->xlp_rem_len = 0; */ /* done by memset */ - /* - * Fill the new page's header - */ - NewPage ->xlp_magic = XLOG_PAGE_MAGIC; + /* + * If online backup is not in progress, mark the header to indicate + * that* WAL records beginning in this page have removable backup + * blocks. This allows the WAL archiver to know whether it is safe to + * compress archived WAL data by transforming full-block records into + * the non-full-block format. It is sufficient to record this at the + * page level because we force a page switch (in fact a segment switch) + * when starting a backup, so the flag will be off before any records + * can be written during the backup. At the end of a backup, the last + * page will be marked as all unsafe when perhaps only part is unsafe, + * but at worst the archiver would miss the opportunity to compress a + * few records. + */ + if (!Insert->forcePageWrites) + NewPage ->xlp_info |= XLP_BKP_REMOVABLE; - /* NewPage->xlp_info = 0; */ /* done by memset */ - NewPage ->xlp_tli = ThisTimeLineID; - NewPage ->xlp_pageaddr = NewPageBeginPtr; + /* + * If first page of an XLOG segment file, make it a long header. + */ + if ((NewPage->xlp_pageaddr % XLogSegSize) == 0) + { + XLogLongPageHeader NewLongPage = (XLogLongPageHeader) NewPage; - /* - * If online backup is not in progress, mark the header to indicate that - * WAL records beginning in this page have removable backup blocks. This - * allows the WAL archiver to know whether it is safe to compress archived - * WAL data by transforming full-block records into the non-full-block - * format. It is sufficient to record this at the page level because we - * force a page switch (in fact a segment switch) when starting a backup, - * so the flag will be off before any records can be written during the - * backup. At the end of a backup, the last page will be marked as all - * unsafe when perhaps only part is unsafe, but at worst the archiver - * would miss the opportunity to compress a few records. 
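Once a buffer has been claimed, the page is zeroed and its header filled in: magic, timeline, page address, the XLP_BKP_REMOVABLE hint when no backup is in progress, and a long header with XLP_LONG_HEADER on the first page of a segment. The illustrative check below shows what a consumer of such a page could verify; the struct layout, magic value, and flag bit are placeholders, not the real on-disk format.

#include <stdbool.h>
#include <stdint.h>

#define XLOG_BLCKSZ      8192
#define XLOG_SEG_SIZE    (16 * 1024 * 1024)
#define PAGE_MAGIC       0xD075         /* assumed value */
#define INFO_LONG_HEADER 0x0002         /* assumed flag bit */

typedef struct
{
    uint16_t magic;
    uint16_t info;
    uint32_t tli;
    uint64_t pageaddr;
} DemoPageHeader;

static bool
page_header_looks_sane(const DemoPageHeader *hdr, uint64_t expected_addr)
{
    if (hdr->magic != PAGE_MAGIC)
        return false;
    if (hdr->pageaddr != expected_addr)
        return false;
    /* the first page of a segment must carry the long header */
    if (expected_addr % XLOG_SEG_SIZE == 0 &&
        (hdr->info & INFO_LONG_HEADER) == 0)
        return false;
    return true;
}

int
main(void)
{
    DemoPageHeader hdr = {PAGE_MAGIC, INFO_LONG_HEADER, 1, 0};

    return page_header_looks_sane(&hdr, 0) ? 0 : 1;
}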
- */ - if (!Insert->forcePageWrites) - NewPage ->xlp_info |= XLP_BKP_REMOVABLE; + NewLongPage->xlp_sysid = ControlFile->system_identifier; + NewLongPage->xlp_seg_size = XLogSegSize; + NewLongPage->xlp_xlog_blcksz = XLOG_BLCKSZ; + NewPage ->xlp_info |= XLP_LONG_HEADER; + } - /* - * If first page of an XLOG segment file, make it a long header. - */ - if ((NewPage->xlp_pageaddr % XLogSegSize) == 0) - { - XLogLongPageHeader NewLongPage = (XLogLongPageHeader) NewPage; + /* + * Make sure the initialization of the page becomes visible to others + * before the xlblocks update. GetXLogBuffer() reads xlblocks without + * holding a lock. + */ + pg_write_barrier(); + + *((volatile XLogRecPtr *) &XLogCtl->xlblocks[nextidx]) = NewPageEndPtr; - NewLongPage->xlp_sysid = ControlFile->system_identifier; - NewLongPage->xlp_seg_size = XLogSegSize; - NewLongPage->xlp_xlog_blcksz = XLOG_BLCKSZ; - NewPage ->xlp_info |= XLP_LONG_HEADER; + XLogCtl->curridx = nextidx; - Insert->currpos = ((char *) NewPage) +SizeOfXLogLongPHD; + npages++; } + LWLockRelease(WALBufMappingLock); - return update_needed; +#ifdef WAL_DEBUG + if (npages > 0) + { + elog(DEBUG1, "initialized %d pages, upto %X/%X", + npages, (uint32) (NewPageEndPtr >> 32), (uint32) NewPageEndPtr); + } +#endif } /* @@ -1486,16 +2591,12 @@ XLogCheckpointNeeded(XLogSegNo new_segno) * This option allows us to avoid uselessly issuing multiple writes when a * single one would do. * - * If xlog_switch == TRUE, we are intending an xlog segment switch, so - * perform end-of-segment actions after writing the last page, even if - * it's not physically the end of its segment. (NB: this will work properly - * only if caller specifies WriteRqst == page-end and flexible == false, - * and there is some data to write.) - * - * Must be called with WALWriteLock held. + * Must be called with WALWriteLock held. WaitXLogInsertionsToFinish(WriteRqst) + * must be called before grabbing the lock, to make sure the data is ready to + * write. */ static void -XLogWrite(XLogwrtRqst WriteRqst, bool flexible, bool xlog_switch) +XLogWrite(XLogwrtRqst WriteRqst, bool flexible) { XLogCtlWrite *Write = &XLogCtl->Write; bool ispartialpage; @@ -1544,15 +2645,15 @@ XLogWrite(XLogwrtRqst WriteRqst, bool flexible, bool xlog_switch) * if we're passed a bogus WriteRqst.Write that is past the end of the * last page that's been initialized by AdvanceXLInsertBuffer. */ - if (LogwrtResult.Write >= XLogCtl->xlblocks[curridx]) + XLogRecPtr EndPtr = XLogCtl->xlblocks[curridx]; + if (LogwrtResult.Write >= EndPtr) elog(PANIC, "xlog write request %X/%X is past end of log %X/%X", (uint32) (LogwrtResult.Write >> 32), (uint32) LogwrtResult.Write, - (uint32) (XLogCtl->xlblocks[curridx] >> 32), - (uint32) XLogCtl->xlblocks[curridx]); + (uint32) (EndPtr >> 32), (uint32) EndPtr); /* Advance LogwrtResult.Write to end of current buffer page */ - LogwrtResult.Write = XLogCtl->xlblocks[curridx]; + LogwrtResult.Write = EndPtr; ispartialpage = WriteRqst.Write < LogwrtResult.Write; if (!XLByteInPrevSeg(LogwrtResult.Write, openLogSegNo)) @@ -1656,16 +2757,13 @@ XLogWrite(XLogwrtRqst WriteRqst, bool flexible, bool xlog_switch) * later. Doing it here ensures that one and only one backend will * perform this fsync. * - * We also do this if this is the last page written for an xlog - * switch. 
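The pg_write_barrier() issued before publishing the new xlblocks value pairs with the barrier that GetXLogBuffer relies on when it reads xlblocks without a lock: whoever sees the new end pointer must also see the initialized page behind it. A minimal sketch of that publish/consume pairing, expressed with C11 atomics rather than the PostgreSQL barrier primitives:

#include <stdatomic.h>
#include <stdint.h>
#include <string.h>

#define XLOG_BLCKSZ 8192

static char page[XLOG_BLCKSZ];
static _Atomic uint64_t xlblock_end;   /* end+1 position covered by 'page' */

static void
publish_new_page(uint64_t new_end)
{
    /* initialize the page contents first... */
    memset(page, 0, XLOG_BLCKSZ);
    /* ...fill in header fields here... */

    /* release store: the writes above become visible before the pointer */
    atomic_store_explicit(&xlblock_end, new_end, memory_order_release);
}

static uint64_t
read_published_end(void)
{
    /* acquire load, pairing with the release store above */
    return atomic_load_explicit(&xlblock_end, memory_order_acquire);
}

int
main(void)
{
    publish_new_page(XLOG_BLCKSZ);
    return read_published_end() == XLOG_BLCKSZ ? 0 : 1;
}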
- * * This is also the right place to notify the Archiver that the * segment is ready to copy to archival storage, and to update the * timer for archive_timeout, and to signal for a checkpoint if * too many logfile segments have been used since the last * checkpoint. */ - if (finishing_seg || (xlog_switch && last_iteration)) + if (finishing_seg) { issue_xlog_fsync(openLogFile, openLogSegNo); @@ -1949,6 +3047,7 @@ XLogFlush(XLogRecPtr record) { /* use volatile pointer to prevent code rearrangement */ volatile XLogCtlData *xlogctl = XLogCtl; + XLogRecPtr insertpos; /* read LogwrtResult and update local state */ SpinLockAcquire(&xlogctl->info_lck); @@ -1962,6 +3061,12 @@ XLogFlush(XLogRecPtr record) break; /* + * Before actually performing the write, wait for all in-flight + * insertions to the pages we're about to write to finish. + */ + insertpos = WaitXLogInsertionsToFinish(WriteRqstPtr); + + /* * Try to get the write lock. If we can't get it immediately, wait * until it's released, and recheck if we still need to do the flush * or if the backend that held the lock did it for us already. This @@ -1997,31 +3102,27 @@ XLogFlush(XLogRecPtr record) */ if (CommitDelay > 0 && enableFsync && MinimumActiveBackends(CommitSiblings)) + { pg_usleep(CommitDelay); + /* + * Re-check how far we can now flush the WAL. It's generally not + * safe to call WaitXLogInsetionsToFinish while holding + * WALWriteLock, because an in-progress insertion might need to + * also grab WALWriteLock to make progress. But we know that all + * the insertions up to insertpos have already finished, because + * that's what the earlier WaitXLogInsertionsToFinish() returned. + * We're only calling it again to allow insertpos to be moved + * further forward, not to actually wait for anyone. + */ + insertpos = WaitXLogInsertionsToFinish(insertpos); + } + /* try to write/flush later additions to XLOG as well */ - if (LWLockConditionalAcquire(WALInsertLock, LW_EXCLUSIVE)) - { - XLogCtlInsert *Insert = &XLogCtl->Insert; - uint32 freespace = INSERT_FREESPACE(Insert); + WriteRqst.Write = insertpos; + WriteRqst.Flush = insertpos; - if (freespace == 0) /* buffer is full */ - WriteRqstPtr = XLogCtl->xlblocks[Insert->curridx]; - else - { - WriteRqstPtr = XLogCtl->xlblocks[Insert->curridx]; - WriteRqstPtr -= freespace; - } - LWLockRelease(WALInsertLock); - WriteRqst.Write = WriteRqstPtr; - WriteRqst.Flush = WriteRqstPtr; - } - else - { - WriteRqst.Write = WriteRqstPtr; - WriteRqst.Flush = record; - } - XLogWrite(WriteRqst, false, false); + XLogWrite(WriteRqst, false); LWLockRelease(WALWriteLock); /* done */ @@ -2142,7 +3243,8 @@ XLogBackgroundFlush(void) START_CRIT_SECTION(); - /* now wait for the write lock */ + /* now wait for any in-progress insertions to finish and get write lock */ + WaitXLogInsertionsToFinish(WriteRqstPtr); LWLockAcquire(WALWriteLock, LW_EXCLUSIVE); LogwrtResult = XLogCtl->LogwrtResult; if (WriteRqstPtr > LogwrtResult.Flush) @@ -2151,7 +3253,7 @@ XLogBackgroundFlush(void) WriteRqst.Write = WriteRqstPtr; WriteRqst.Flush = WriteRqstPtr; - XLogWrite(WriteRqst, flexible, false); + XLogWrite(WriteRqst, flexible); wrote_something = true; } LWLockRelease(WALWriteLock); @@ -2161,6 +3263,12 @@ XLogBackgroundFlush(void) /* wake up walsenders now that we've released heavily contended locks */ WalSndWakeupProcessRequests(); + /* + * Great, done. To take some work off the critical path, try to initialize + * as many of the no-longer-needed WAL buffers for future use as we can. 
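The commit_delay path in XLogFlush is the subtle part of the hunk above: WaitXLogInsertionsToFinish is normally unsafe to call under WALWriteLock, but calling it again with a position already known to be fully inserted cannot block; it can only move the flush target forward. A stubbed sketch of that sequence, with stand-in names:

#include <stdint.h>

typedef uint64_t XLogRecPtr;

static XLogRecPtr wait_insertions_to_finish(XLogRecPtr upto) { return upto; }
static void       acquire_write_lock(void)  {}
static void       release_write_lock(void)  {}
static void       commit_delay_sleep(void)  {}
static void       write_and_flush(XLogRecPtr upto) { (void) upto; }

static void
flush_up_to(XLogRecPtr request)
{
    /* wait while holding no locks; returns how far insertions have finished */
    XLogRecPtr insertpos = wait_insertions_to_finish(request);

    acquire_write_lock();

    commit_delay_sleep();
    /*
     * Safe to call again even under the write lock: everything up to
     * 'insertpos' is already inserted, so this cannot wait; it merely
     * reports a possibly larger finished position to flush.
     */
    insertpos = wait_insertions_to_finish(insertpos);

    write_and_flush(insertpos);
    release_write_lock();
}

int main(void) { flush_up_to(8192); return 0; }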
+ */ + AdvanceXLInsertBuffer(InvalidXLogRecPtr, true); + return wrote_something; } @@ -3937,10 +5045,13 @@ XLOGShmemSize(void) /* XLogCtl */ size = sizeof(XLogCtlData); + + /* xlog insertion slots, plus alignment */ + size = add_size(size, mul_size(sizeof(XLogInsertSlotPadded), num_xloginsert_slots + 1)); /* xlblocks array */ size = add_size(size, mul_size(sizeof(XLogRecPtr), XLOGbuffers)); /* extra alignment padding for XLOG I/O buffers */ - size = add_size(size, ALIGNOF_XLOG_BUFFER); + size = add_size(size, XLOG_BLCKSZ); /* and the buffers themselves */ size = add_size(size, mul_size(XLOG_BLCKSZ, XLOGbuffers)); @@ -3959,11 +5070,11 @@ XLOGShmemInit(void) bool foundCFile, foundXLog; char *allocptr; + int i; ControlFile = (ControlFileData *) ShmemInitStruct("Control File", sizeof(ControlFileData), &foundCFile); - XLogCtl = (XLogCtlData *) - ShmemInitStruct("XLOG Ctl", XLOGShmemSize(), &foundXLog); + allocptr = ShmemInitStruct("XLOG Ctl", XLOGShmemSize(), &foundXLog); if (foundCFile || foundXLog) { @@ -3971,7 +5082,7 @@ XLOGShmemInit(void) Assert(foundCFile && foundXLog); return; } - + XLogCtl = (XLogCtlData *) allocptr; memset(XLogCtl, 0, sizeof(XLogCtlData)); /* @@ -3979,15 +5090,23 @@ XLOGShmemInit(void) * multiple of the alignment for same, so no extra alignment padding is * needed here. */ - allocptr = ((char *) XLogCtl) + sizeof(XLogCtlData); + allocptr += sizeof(XLogCtlData); XLogCtl->xlblocks = (XLogRecPtr *) allocptr; memset(XLogCtl->xlblocks, 0, sizeof(XLogRecPtr) * XLOGbuffers); allocptr += sizeof(XLogRecPtr) * XLOGbuffers; + /* Xlog insertion slots. Ensure they're aligned to the full padded size */ + allocptr += sizeof(XLogInsertSlotPadded) - + ((uintptr_t) allocptr) % sizeof(XLogInsertSlotPadded); + XLogCtl->Insert.insertSlots = (XLogInsertSlotPadded *) allocptr; + allocptr += sizeof(XLogInsertSlotPadded) * num_xloginsert_slots; + /* - * Align the start of the page buffers to an ALIGNOF_XLOG_BUFFER boundary. + * Align the start of the page buffers to a full xlog block size boundary. + * This simplifies some calculations in XLOG insertion. It is also required + * for O_DIRECT. 
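XLOGShmemSize now has to budget for the padded insertion-slot array and for aligning the page buffers to a full WAL block (required for O_DIRECT). The back-of-the-envelope sketch below mirrors that sizing; every constant here (buffer count, slot padding, control-struct size) is an assumed example value, not the real sizeof().

#include <stdint.h>
#include <stdio.h>

#define XLOG_BLCKSZ      8192
#define NUM_XLOG_BUFFERS 2048       /* assumed wal_buffers */
#define NUM_INSERT_SLOTS 8          /* default xloginsert_slots */
#define SLOT_PADDED_SIZE 64         /* assumed padded slot size */
#define CTL_STRUCT_SIZE  1024       /* assumed sizeof(XLogCtlData) */

int
main(void)
{
    size_t size = CTL_STRUCT_SIZE;

    /* slots, plus one extra slot's worth for alignment, as in the patch */
    size += (size_t) SLOT_PADDED_SIZE * (NUM_INSERT_SLOTS + 1);
    /* xlblocks array: one 8-byte end pointer per buffer page */
    size += sizeof(uint64_t) * NUM_XLOG_BUFFERS;
    /* full-block alignment padding for the buffers (needed for O_DIRECT) */
    size += XLOG_BLCKSZ;
    /* the WAL buffers themselves */
    size += (size_t) XLOG_BLCKSZ * NUM_XLOG_BUFFERS;

    printf("approximate shmem request: %zu bytes\n", size);
    return 0;
}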
*/ - allocptr = (char *) TYPEALIGN(ALIGNOF_XLOG_BUFFER, allocptr); + allocptr = (char *) TYPEALIGN(XLOG_BLCKSZ, allocptr); XLogCtl->pages = allocptr; memset(XLogCtl->pages, 0, (Size) XLOG_BLCKSZ * XLOGbuffers); @@ -3999,7 +5118,21 @@ XLOGShmemInit(void) XLogCtl->SharedRecoveryInProgress = true; XLogCtl->SharedHotStandbyActive = false; XLogCtl->WalWriterSleeping = false; - XLogCtl->Insert.currpage = (XLogPageHeader) (XLogCtl->pages); + + for (i = 0; i < num_xloginsert_slots; i++) + { + XLogInsertSlot *slot = &XLogCtl->Insert.insertSlots[i].slot; + SpinLockInit(&slot->mutex); + slot->xlogInsertingAt = InvalidXLogRecPtr; + slot->owner = NULL; + + slot->releaseOK = true; + slot->exclusive = 0; + slot->head = NULL; + slot->tail = NULL; + } + + SpinLockInit(&XLogCtl->Insert.insertpos_lck); SpinLockInit(&XLogCtl->info_lck); SpinLockInit(&XLogCtl->ulsn_lck); InitSharedLatch(&XLogCtl->recoveryWakeupLatch); @@ -4050,8 +5183,8 @@ BootStrapXLOG(void) ThisTimeLineID = 1; /* page buffer must be aligned suitably for O_DIRECT */ - buffer = (char *) palloc(XLOG_BLCKSZ + ALIGNOF_XLOG_BUFFER); - page = (XLogPageHeader) TYPEALIGN(ALIGNOF_XLOG_BUFFER, buffer); + buffer = (char *) palloc(XLOG_BLCKSZ + XLOG_BLCKSZ); + page = (XLogPageHeader) TYPEALIGN(XLOG_BLCKSZ, buffer); memset(page, 0, XLOG_BLCKSZ); /* @@ -4893,6 +6026,7 @@ StartupXLOG(void) bool backupEndRequired = false; bool backupFromStandby = false; DBState dbstate_at_startup; + int firstIdx; XLogReaderState *xlogreader; XLogPageReadPrivate private; bool fast_promoted = false; @@ -5257,7 +6391,7 @@ StartupXLOG(void) lastFullPageWrites = checkPoint.fullPageWrites; - RedoRecPtr = XLogCtl->Insert.RedoRecPtr = checkPoint.redo; + RedoRecPtr = XLogCtl->RedoRecPtr = XLogCtl->Insert.RedoRecPtr = checkPoint.redo; if (RecPtr < checkPoint.redo) ereport(PANIC, @@ -5899,25 +7033,21 @@ StartupXLOG(void) openLogFile = XLogFileOpen(openLogSegNo); openLogOff = 0; Insert = &XLogCtl->Insert; - Insert->PrevRecord = LastRec; - XLogCtl->xlblocks[0] = ((EndOfLog - 1) / XLOG_BLCKSZ + 1) * XLOG_BLCKSZ; + Insert->PrevBytePos = XLogRecPtrToBytePos(LastRec); + + firstIdx = XLogRecEndPtrToBufIdx(EndOfLog); + XLogCtl->curridx = firstIdx; + + XLogCtl->xlblocks[firstIdx] = ((EndOfLog - 1) / XLOG_BLCKSZ + 1) * XLOG_BLCKSZ; /* * Tricky point here: readBuf contains the *last* block that the LastRec * record spans, not the one it starts in. The last block is indeed the * one we want to use. 
*/ - if (EndOfLog % XLOG_BLCKSZ == 0) - { - memset(Insert->currpage, 0, XLOG_BLCKSZ); - } - else - { - Assert(readOff == (XLogCtl->xlblocks[0] - XLOG_BLCKSZ) % XLogSegSize); - memcpy((char *) Insert->currpage, xlogreader->readBuf, XLOG_BLCKSZ); - } - Insert->currpos = (char *) Insert->currpage + - (EndOfLog + XLOG_BLCKSZ - XLogCtl->xlblocks[0]); + Assert(readOff == (XLogCtl->xlblocks[firstIdx] - XLOG_BLCKSZ) % XLogSegSize); + memcpy((char *) &XLogCtl->pages[firstIdx * XLOG_BLCKSZ], xlogreader->readBuf, XLOG_BLCKSZ); + Insert->CurrBytePos = XLogRecPtrToBytePos(EndOfLog); LogwrtResult.Write = LogwrtResult.Flush = EndOfLog; @@ -5926,12 +7056,12 @@ StartupXLOG(void) XLogCtl->LogwrtRqst.Write = EndOfLog; XLogCtl->LogwrtRqst.Flush = EndOfLog; - freespace = INSERT_FREESPACE(Insert); + freespace = INSERT_FREESPACE(EndOfLog); if (freespace > 0) { /* Make sure rest of page is zero */ - MemSet(Insert->currpos, 0, freespace); - XLogCtl->Write.curridx = 0; + MemSet(&XLogCtl->pages[firstIdx * XLOG_BLCKSZ] + EndOfLog % XLOG_BLCKSZ, 0, freespace); + XLogCtl->Write.curridx = firstIdx; } else { @@ -5943,7 +7073,7 @@ StartupXLOG(void) * this is sufficient. The first actual attempt to insert a log * record will advance the insert state. */ - XLogCtl->Write.curridx = NextBufIdx(0); + XLogCtl->Write.curridx = NextBufIdx(firstIdx); } /* Pre-scan prepared transactions to find out the range of XIDs present */ @@ -6504,21 +7634,29 @@ InitXLOGAccess(void) } /* - * Once spawned, a backend may update its local RedoRecPtr from - * XLogCtl->Insert.RedoRecPtr; it must hold the insert lock or info_lck - * to do so. This is done in XLogInsert() or GetRedoRecPtr(). + * Return the current Redo pointer from shared memory. + * + * As a side-effect, the local RedoRecPtr copy is updated. */ XLogRecPtr GetRedoRecPtr(void) { /* use volatile pointer to prevent code rearrangement */ volatile XLogCtlData *xlogctl = XLogCtl; + XLogRecPtr ptr; + /* + * The possibly not up-to-date copy in XlogCtl is enough. Even if we + * grabbed a WAL insertion slot to read the master copy, someone might + * update it just after we've released the lock. + */ SpinLockAcquire(&xlogctl->info_lck); - Assert(RedoRecPtr <= xlogctl->Insert.RedoRecPtr); - RedoRecPtr = xlogctl->Insert.RedoRecPtr; + ptr = xlogctl->RedoRecPtr; SpinLockRelease(&xlogctl->info_lck); + if (RedoRecPtr < ptr) + RedoRecPtr = ptr; + return RedoRecPtr; } @@ -6527,9 +7665,8 @@ GetRedoRecPtr(void) * * NOTE: The value *actually* returned is the position of the last full * xlog page. It lags behind the real insert position by at most 1 page. - * For that, we don't need to acquire WALInsertLock which can be quite - * heavily contended, and an approximation is enough for the current - * usage of this function. + * For that, we don't need to scan through WAL insertion slots, and an + * approximation is enough for the current usage of this function. 
*/ XLogRecPtr GetInsertRecPtr(void) @@ -6806,6 +7943,8 @@ LogCheckpointEnd(bool restartpoint) void CreateCheckPoint(int flags) { + /* use volatile pointer to prevent code rearrangement */ + volatile XLogCtlData *xlogctl = XLogCtl; bool shutdown; CheckPoint checkPoint; XLogRecPtr recptr; @@ -6813,6 +7952,7 @@ CreateCheckPoint(int flags) XLogRecData rdata; uint32 freespace; XLogSegNo _logSegNo; + XLogRecPtr curInsert; VirtualTransactionId *vxids; int nvxids; @@ -6883,10 +8023,11 @@ CreateCheckPoint(int flags) checkPoint.oldestActiveXid = InvalidTransactionId; /* - * We must hold WALInsertLock while examining insert state to determine - * the checkpoint REDO pointer. + * We must block concurrent insertions while examining insert state to + * determine the checkpoint REDO pointer. */ - LWLockAcquire(WALInsertLock, LW_EXCLUSIVE); + WALInsertSlotAcquire(true); + curInsert = XLogBytePosToRecPtr(Insert->CurrBytePos); /* * If this isn't a shutdown or forced checkpoint, and we have not inserted @@ -6906,14 +8047,11 @@ CreateCheckPoint(int flags) if ((flags & (CHECKPOINT_IS_SHUTDOWN | CHECKPOINT_END_OF_RECOVERY | CHECKPOINT_FORCE)) == 0) { - XLogRecPtr curInsert; - - INSERT_RECPTR(curInsert, Insert, Insert->curridx); if (curInsert == ControlFile->checkPoint + MAXALIGN(SizeOfXLogRecord + sizeof(CheckPoint)) && ControlFile->checkPoint == ControlFile->checkPointCopy.redo) { - LWLockRelease(WALInsertLock); + WALInsertSlotRelease(); LWLockRelease(CheckpointLock); END_CRIT_SECTION(); return; @@ -6945,18 +8083,19 @@ CreateCheckPoint(int flags) * the buffer flush work. Those XLOG records are logically after the * checkpoint, even though physically before it. Got that? */ - freespace = INSERT_FREESPACE(Insert); + freespace = INSERT_FREESPACE(curInsert); if (freespace == 0) { - (void) AdvanceXLInsertBuffer(false); - /* OK to ignore update return flag, since we will do flush anyway */ - freespace = INSERT_FREESPACE(Insert); + if (curInsert % XLogSegSize == 0) + curInsert += SizeOfXLogLongPHD; + else + curInsert += SizeOfXLogShortPHD; } - INSERT_RECPTR(checkPoint.redo, Insert, Insert->curridx); + checkPoint.redo = curInsert; /* * Here we update the shared RedoRecPtr for future XLogInsert calls; this - * must be done while holding the insert lock AND the info_lck. + * must be done while holding the insertion slots. * * Note: if we fail to complete the checkpoint, RedoRecPtr will be left * pointing past where it really needs to point. This is okay; the only @@ -6965,20 +8104,18 @@ CreateCheckPoint(int flags) * XLogInserts that happen while we are dumping buffers must assume that * their buffer changes are not included in the checkpoint. */ - { - /* use volatile pointer to prevent code rearrangement */ - volatile XLogCtlData *xlogctl = XLogCtl; - - SpinLockAcquire(&xlogctl->info_lck); - RedoRecPtr = xlogctl->Insert.RedoRecPtr = checkPoint.redo; - SpinLockRelease(&xlogctl->info_lck); - } + RedoRecPtr = xlogctl->Insert.RedoRecPtr = checkPoint.redo; /* - * Now we can release WAL insert lock, allowing other xacts to proceed - * while we are flushing disk buffers. + * Now we can release the WAL insertion slots, allowing other xacts to + * proceed while we are flushing disk buffers. */ - LWLockRelease(WALInsertLock); + WALInsertSlotRelease(); + + /* Update the info_lck-protected copy of RedoRecPtr as well */ + SpinLockAcquire(&xlogctl->info_lck); + xlogctl->RedoRecPtr = checkPoint.redo; + SpinLockRelease(&xlogctl->info_lck); /* * If enabled, log checkpoint start. 
We postpone this until now so as not @@ -7003,10 +8140,11 @@ CreateCheckPoint(int flags) * we wait till he's out of his commit critical section before proceeding. * See notes in RecordTransactionCommit(). * - * Because we've already released WALInsertLock, this test is a bit fuzzy: - * it is possible that we will wait for xacts we didn't really need to - * wait for. But the delay should be short and it seems better to make - * checkpoint take a bit longer than to hold locks longer than necessary. + * Because we've already released the insertion slots, this test is a bit + * fuzzy: it is possible that we will wait for xacts we didn't really need + * to wait for. But the delay should be short and it seems better to make + * checkpoint take a bit longer than to hold off insertions longer than + * necessary. * (In fact, the whole reason we have this issue is that xact.c does * commit record XLOG insertion and clog update as two separate steps * protected by different locks, but again that seems best on grounds of @@ -7233,10 +8371,10 @@ CreateEndOfRecoveryRecord(void) xlrec.end_time = time(NULL); - LWLockAcquire(WALInsertLock, LW_SHARED); + WALInsertSlotAcquire(true); xlrec.ThisTimeLineID = ThisTimeLineID; xlrec.PrevTimeLineID = XLogCtl->PrevTimeLineID; - LWLockRelease(WALInsertLock); + WALInsertSlotRelease(); LocalSetXLogInsertAllowed(); @@ -7437,15 +8575,18 @@ CreateRestartPoint(int flags) * the number of segments replayed since last restartpoint, and request a * restartpoint if it exceeds checkpoint_segments. * - * You need to hold WALInsertLock and info_lck to update it, although - * during recovery acquiring WALInsertLock is just pro forma, because - * there is no other processes updating Insert.RedoRecPtr. + * Like in CreateCheckPoint(), hold off insertions to update it, although + * during recovery this is just pro forma, because no WAL insertions are + * happening. */ - LWLockAcquire(WALInsertLock, LW_EXCLUSIVE); - SpinLockAcquire(&xlogctl->info_lck); + WALInsertSlotAcquire(true); xlogctl->Insert.RedoRecPtr = lastCheckPoint.redo; + WALInsertSlotRelease(); + + /* Also update the info_lck-protected copy */ + SpinLockAcquire(&xlogctl->info_lck); + xlogctl->RedoRecPtr = lastCheckPoint.redo; SpinLockRelease(&xlogctl->info_lck); - LWLockRelease(WALInsertLock); /* * Prepare to accumulate statistics. @@ -7863,9 +9004,9 @@ UpdateFullPageWrites(void) */ if (fullPageWrites) { - LWLockAcquire(WALInsertLock, LW_EXCLUSIVE); + WALInsertSlotAcquire(true); Insert->fullPageWrites = true; - LWLockRelease(WALInsertLock); + WALInsertSlotRelease(); } /* @@ -7886,9 +9027,9 @@ UpdateFullPageWrites(void) if (!fullPageWrites) { - LWLockAcquire(WALInsertLock, LW_EXCLUSIVE); + WALInsertSlotAcquire(true); Insert->fullPageWrites = false; - LWLockRelease(WALInsertLock); + WALInsertSlotRelease(); } END_CRIT_SECTION(); } @@ -8520,15 +9661,15 @@ do_pg_start_backup(const char *backupidstr, bool fast, TimeLineID *starttli_p, * Note that forcePageWrites has no effect during an online backup from * the standby. * - * We must hold WALInsertLock to change the value of forcePageWrites, to - * ensure adequate interlocking against XLogInsert(). + * We must hold all the insertion slots to change the value of + * forcePageWrites, to ensure adequate interlocking against XLogInsert(). 
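The backup and full-page-writes paths above all switch to WALInsertSlotAcquire(true), i.e. taking every insertion slot, before flipping flags that inserters read. The toy POSIX-threads sketch below illustrates that "hold every slot" pattern; the mutex array stands in for the real slot machinery and is not the actual implementation.

#include <pthread.h>
#include <stdbool.h>

#define NUM_SLOTS 8

static pthread_mutex_t slots[NUM_SLOTS];
static bool force_page_writes;

static void
set_force_page_writes(bool value)
{
    int i;

    /* Take every slot; once we hold them all, no insertion is in flight. */
    for (i = 0; i < NUM_SLOTS; i++)
        pthread_mutex_lock(&slots[i]);

    force_page_writes = value;

    for (i = NUM_SLOTS - 1; i >= 0; i--)
        pthread_mutex_unlock(&slots[i]);
}

int
main(void)
{
    int i;

    for (i = 0; i < NUM_SLOTS; i++)
        pthread_mutex_init(&slots[i], NULL);

    set_force_page_writes(true);
    return force_page_writes ? 0 : 1;
}

Any new inserter will pick up the changed flag as soon as it acquires a slot of its own, which is the interlocking the comment about forcePageWrites asks for.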
*/ - LWLockAcquire(WALInsertLock, LW_EXCLUSIVE); + WALInsertSlotAcquire(true); if (exclusive) { if (XLogCtl->Insert.exclusiveBackup) { - LWLockRelease(WALInsertLock); + WALInsertSlotRelease(); ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), errmsg("a backup is already in progress"), @@ -8539,7 +9680,7 @@ do_pg_start_backup(const char *backupidstr, bool fast, TimeLineID *starttli_p, else XLogCtl->Insert.nonExclusiveBackups++; XLogCtl->Insert.forcePageWrites = true; - LWLockRelease(WALInsertLock); + WALInsertSlotRelease(); /* Ensure we release forcePageWrites if fail below */ PG_ENSURE_ERROR_CLEANUP(pg_start_backup_callback, (Datum) BoolGetDatum(exclusive)); @@ -8654,13 +9795,13 @@ do_pg_start_backup(const char *backupidstr, bool fast, TimeLineID *starttli_p, * taking a checkpoint right after another is not that expensive * either because only few buffers have been dirtied yet. */ - LWLockAcquire(WALInsertLock, LW_SHARED); + WALInsertSlotAcquire(true); if (XLogCtl->Insert.lastBackupStart < startpoint) { XLogCtl->Insert.lastBackupStart = startpoint; gotUniqueStartpoint = true; } - LWLockRelease(WALInsertLock); + WALInsertSlotRelease(); } while (!gotUniqueStartpoint); XLByteToSeg(startpoint, _logSegNo); @@ -8750,7 +9891,7 @@ pg_start_backup_callback(int code, Datum arg) bool exclusive = DatumGetBool(arg); /* Update backup counters and forcePageWrites on failure */ - LWLockAcquire(WALInsertLock, LW_EXCLUSIVE); + WALInsertSlotAcquire(true); if (exclusive) { Assert(XLogCtl->Insert.exclusiveBackup); @@ -8767,7 +9908,7 @@ pg_start_backup_callback(int code, Datum arg) { XLogCtl->Insert.forcePageWrites = false; } - LWLockRelease(WALInsertLock); + WALInsertSlotRelease(); } /* @@ -8838,7 +9979,7 @@ do_pg_stop_backup(char *labelfile, bool waitforarchive, TimeLineID *stoptli_p) /* * OK to update backup counters and forcePageWrites */ - LWLockAcquire(WALInsertLock, LW_EXCLUSIVE); + WALInsertSlotAcquire(true); if (exclusive) XLogCtl->Insert.exclusiveBackup = false; else @@ -8858,7 +9999,7 @@ do_pg_stop_backup(char *labelfile, bool waitforarchive, TimeLineID *stoptli_p) { XLogCtl->Insert.forcePageWrites = false; } - LWLockRelease(WALInsertLock); + WALInsertSlotRelease(); if (exclusive) { @@ -9143,7 +10284,7 @@ do_pg_stop_backup(char *labelfile, bool waitforarchive, TimeLineID *stoptli_p) void do_pg_abort_backup(void) { - LWLockAcquire(WALInsertLock, LW_EXCLUSIVE); + WALInsertSlotAcquire(true); Assert(XLogCtl->Insert.nonExclusiveBackups > 0); XLogCtl->Insert.nonExclusiveBackups--; @@ -9152,7 +10293,7 @@ do_pg_abort_backup(void) { XLogCtl->Insert.forcePageWrites = false; } - LWLockRelease(WALInsertLock); + WALInsertSlotRelease(); } /* @@ -9184,14 +10325,14 @@ GetXLogReplayRecPtr(TimeLineID *replayTLI) XLogRecPtr GetXLogInsertRecPtr(void) { - XLogCtlInsert *Insert = &XLogCtl->Insert; - XLogRecPtr current_recptr; + volatile XLogCtlInsert *Insert = &XLogCtl->Insert; + uint64 current_bytepos; - LWLockAcquire(WALInsertLock, LW_SHARED); - INSERT_RECPTR(current_recptr, Insert, Insert->curridx); - LWLockRelease(WALInsertLock); + SpinLockAcquire(&Insert->insertpos_lck); + current_bytepos = Insert->CurrBytePos; + SpinLockRelease(&Insert->insertpos_lck); - return current_recptr; + return XLogBytePosToRecPtr(current_bytepos); } /* |