diff options
Diffstat (limited to 'src/backend/access/transam/xlog.c')
-rw-r--r-- | src/backend/access/transam/xlog.c | 107 |
1 files changed, 59 insertions, 48 deletions
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index 3bdd9a2ddd3..b4499cda7b3 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -292,12 +292,7 @@ static bool doPageWrites; * LogwrtRqst indicates a byte position that we need to write and/or fsync * the log up to (all records before that point must be written or fsynced). * The positions already written/fsynced are maintained in logWriteResult - * and logFlushResult. - * - * To read XLogCtl->logWriteResult or ->logFlushResult, you must hold either - * info_lck or WALWriteLock. To update them, you need to hold both locks. - * The point of this arrangement is that the value can be examined by code - * that already holds WALWriteLock without needing to grab info_lck as well. + * and logFlushResult using atomic access. * In addition to the shared variable, each backend has a private copy of * both in LogwrtResult, which is updated when convenient. * @@ -473,12 +468,9 @@ typedef struct XLogCtlData pg_time_t lastSegSwitchTime; XLogRecPtr lastSegSwitchLSN; - /* - * Protected by info_lck and WALWriteLock (you must hold either lock to - * read it, but both to update) - */ - XLogRecPtr logWriteResult; /* last byte + 1 written out */ - XLogRecPtr logFlushResult; /* last byte + 1 flushed */ + /* These are accessed using atomics -- info_lck not needed */ + pg_atomic_uint64 logWriteResult; /* last byte + 1 written out */ + pg_atomic_uint64 logFlushResult; /* last byte + 1 flushed */ /* * Latest initialized page in the cache (last byte position + 1). @@ -616,11 +608,15 @@ static XLogwrtResult LogwrtResult = {0, 0}; /* * Update local copy of shared XLogCtl->log{Write,Flush}Result + * + * It's critical that Flush always trails Write, so the order of the reads is + * important, as is the barrier. See also XLogWrite. */ #define RefreshXLogWriteResult(_target) \ do { \ - _target.Write = XLogCtl->logWriteResult; \ - _target.Flush = XLogCtl->logFlushResult; \ + _target.Flush = pg_atomic_read_u64(&XLogCtl->logFlushResult); \ + pg_read_barrier(); \ + _target.Write = pg_atomic_read_u64(&XLogCtl->logWriteResult); \ } while (0) /* @@ -968,9 +964,8 @@ XLogInsertRecord(XLogRecData *rdata, /* advance global request to include new block(s) */ if (XLogCtl->LogwrtRqst.Write < EndPos) XLogCtl->LogwrtRqst.Write = EndPos; - /* update local result copy while I have the chance */ - RefreshXLogWriteResult(LogwrtResult); SpinLockRelease(&XLogCtl->info_lck); + RefreshXLogWriteResult(LogwrtResult); } /* @@ -1989,17 +1984,17 @@ AdvanceXLInsertBuffer(XLogRecPtr upto, TimeLineID tli, bool opportunistic) if (opportunistic) break; - /* Before waiting, get info_lck and update LogwrtResult */ + /* Advance shared memory write request position */ SpinLockAcquire(&XLogCtl->info_lck); if (XLogCtl->LogwrtRqst.Write < OldPageRqstPtr) XLogCtl->LogwrtRqst.Write = OldPageRqstPtr; - RefreshXLogWriteResult(LogwrtResult); SpinLockRelease(&XLogCtl->info_lck); /* - * Now that we have an up-to-date LogwrtResult value, see if we - * still need to write it or if someone else already did. + * Acquire an up-to-date LogwrtResult value and see if we still + * need to write it or if someone else already did. */ + RefreshXLogWriteResult(LogwrtResult); if (LogwrtResult.Write < OldPageRqstPtr) { /* @@ -2556,16 +2551,35 @@ XLogWrite(XLogwrtRqst WriteRqst, TimeLineID tli, bool flexible) * 'result' values. This is not absolutely essential, but it saves some * code in a couple of places. */ + SpinLockAcquire(&XLogCtl->info_lck); + if (XLogCtl->LogwrtRqst.Write < LogwrtResult.Write) + XLogCtl->LogwrtRqst.Write = LogwrtResult.Write; + if (XLogCtl->LogwrtRqst.Flush < LogwrtResult.Flush) + XLogCtl->LogwrtRqst.Flush = LogwrtResult.Flush; + SpinLockRelease(&XLogCtl->info_lck); + + /* + * We write Write first, bar, then Flush. When reading, the opposite must + * be done (with a matching barrier in between), so that we always see a + * Flush value that trails behind the Write value seen. + */ + pg_atomic_write_u64(&XLogCtl->logWriteResult, LogwrtResult.Write); + pg_write_barrier(); + pg_atomic_write_u64(&XLogCtl->logFlushResult, LogwrtResult.Flush); + +#ifdef USE_ASSERT_CHECKING { - SpinLockAcquire(&XLogCtl->info_lck); - XLogCtl->logWriteResult = LogwrtResult.Write; - XLogCtl->logFlushResult = LogwrtResult.Flush; - if (XLogCtl->LogwrtRqst.Write < LogwrtResult.Write) - XLogCtl->LogwrtRqst.Write = LogwrtResult.Write; - if (XLogCtl->LogwrtRqst.Flush < LogwrtResult.Flush) - XLogCtl->LogwrtRqst.Flush = LogwrtResult.Flush; - SpinLockRelease(&XLogCtl->info_lck); + XLogRecPtr Flush; + XLogRecPtr Write; + + Flush = pg_atomic_read_u64(&XLogCtl->logFlushResult); + pg_read_barrier(); + Write = pg_atomic_read_u64(&XLogCtl->logWriteResult); + + /* WAL written to disk is always ahead of WAL flushed */ + Assert(Write >= Flush); } +#endif } /* @@ -2582,7 +2596,6 @@ XLogSetAsyncXactLSN(XLogRecPtr asyncXactLSN) XLogRecPtr prevAsyncXactLSN; SpinLockAcquire(&XLogCtl->info_lck); - RefreshXLogWriteResult(LogwrtResult); sleeping = XLogCtl->WalWriterSleeping; prevAsyncXactLSN = XLogCtl->asyncXactLSN; if (XLogCtl->asyncXactLSN < asyncXactLSN) @@ -2608,6 +2621,8 @@ XLogSetAsyncXactLSN(XLogRecPtr asyncXactLSN) { int flushblocks; + RefreshXLogWriteResult(LogwrtResult); + flushblocks = WriteRqstPtr / XLOG_BLCKSZ - LogwrtResult.Flush / XLOG_BLCKSZ; @@ -2790,14 +2805,8 @@ XLogFlush(XLogRecPtr record) { XLogRecPtr insertpos; - /* read LogwrtResult and update local state */ - SpinLockAcquire(&XLogCtl->info_lck); - if (WriteRqstPtr < XLogCtl->LogwrtRqst.Write) - WriteRqstPtr = XLogCtl->LogwrtRqst.Write; - RefreshXLogWriteResult(LogwrtResult); - SpinLockRelease(&XLogCtl->info_lck); - /* done already? */ + RefreshXLogWriteResult(LogwrtResult); if (record <= LogwrtResult.Flush) break; @@ -2805,6 +2814,10 @@ XLogFlush(XLogRecPtr record) * Before actually performing the write, wait for all in-flight * insertions to the pages we're about to write to finish. */ + SpinLockAcquire(&XLogCtl->info_lck); + if (WriteRqstPtr < XLogCtl->LogwrtRqst.Write) + WriteRqstPtr = XLogCtl->LogwrtRqst.Write; + SpinLockRelease(&XLogCtl->info_lck); insertpos = WaitXLogInsertionsToFinish(WriteRqstPtr); /* @@ -2947,9 +2960,8 @@ XLogBackgroundFlush(void) */ insertTLI = XLogCtl->InsertTimeLineID; - /* read LogwrtResult and update local state */ + /* read updated LogwrtRqst */ SpinLockAcquire(&XLogCtl->info_lck); - RefreshXLogWriteResult(LogwrtResult); WriteRqst = XLogCtl->LogwrtRqst; SpinLockRelease(&XLogCtl->info_lck); @@ -2957,6 +2969,7 @@ XLogBackgroundFlush(void) WriteRqst.Write -= WriteRqst.Write % XLOG_BLCKSZ; /* if we have already flushed that far, consider async commit records */ + RefreshXLogWriteResult(LogwrtResult); if (WriteRqst.Write <= LogwrtResult.Flush) { SpinLockAcquire(&XLogCtl->info_lck); @@ -3125,9 +3138,7 @@ XLogNeedsFlush(XLogRecPtr record) return false; /* read LogwrtResult and update local state */ - SpinLockAcquire(&XLogCtl->info_lck); RefreshXLogWriteResult(LogwrtResult); - SpinLockRelease(&XLogCtl->info_lck); /* check again */ if (record <= LogwrtResult.Flush) @@ -4940,6 +4951,8 @@ XLOGShmemInit(void) SpinLockInit(&XLogCtl->Insert.insertpos_lck); SpinLockInit(&XLogCtl->info_lck); + pg_atomic_init_u64(&XLogCtl->logWriteResult, InvalidXLogRecPtr); + pg_atomic_init_u64(&XLogCtl->logFlushResult, InvalidXLogRecPtr); pg_atomic_init_u64(&XLogCtl->unloggedLSN, InvalidXLogRecPtr); } @@ -5961,11 +5974,13 @@ StartupXLOG(void) XLogCtl->InitializedUpTo = EndOfLog; } + /* + * Update local and shared status. This is OK to do without any locks + * because no other process can be reading or writing WAL yet. + */ LogwrtResult.Write = LogwrtResult.Flush = EndOfLog; - - XLogCtl->logWriteResult = LogwrtResult.Write; - XLogCtl->logFlushResult = LogwrtResult.Flush; - + pg_atomic_write_u64(&XLogCtl->logWriteResult, EndOfLog); + pg_atomic_write_u64(&XLogCtl->logFlushResult, EndOfLog); XLogCtl->LogwrtRqst.Write = EndOfLog; XLogCtl->LogwrtRqst.Flush = EndOfLog; @@ -6410,9 +6425,7 @@ GetFlushRecPtr(TimeLineID *insertTLI) { Assert(XLogCtl->SharedRecoveryState == RECOVERY_STATE_DONE); - SpinLockAcquire(&XLogCtl->info_lck); RefreshXLogWriteResult(LogwrtResult); - SpinLockRelease(&XLogCtl->info_lck); /* * If we're writing and flushing WAL, the time line can't be changing, so @@ -9326,9 +9339,7 @@ GetXLogInsertRecPtr(void) XLogRecPtr GetXLogWriteRecPtr(void) { - SpinLockAcquire(&XLogCtl->info_lck); RefreshXLogWriteResult(LogwrtResult); - SpinLockRelease(&XLogCtl->info_lck); return LogwrtResult.Write; } |