aboutsummaryrefslogtreecommitdiff
path: root/src/backend/access/transam/xlog.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/access/transam/xlog.c')
-rw-r--r--src/backend/access/transam/xlog.c107
1 files changed, 59 insertions, 48 deletions
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 3bdd9a2ddd3..b4499cda7b3 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -292,12 +292,7 @@ static bool doPageWrites;
* LogwrtRqst indicates a byte position that we need to write and/or fsync
* the log up to (all records before that point must be written or fsynced).
* The positions already written/fsynced are maintained in logWriteResult
- * and logFlushResult.
- *
- * To read XLogCtl->logWriteResult or ->logFlushResult, you must hold either
- * info_lck or WALWriteLock. To update them, you need to hold both locks.
- * The point of this arrangement is that the value can be examined by code
- * that already holds WALWriteLock without needing to grab info_lck as well.
+ * and logFlushResult using atomic access.
* In addition to the shared variable, each backend has a private copy of
* both in LogwrtResult, which is updated when convenient.
*
@@ -473,12 +468,9 @@ typedef struct XLogCtlData
pg_time_t lastSegSwitchTime;
XLogRecPtr lastSegSwitchLSN;
- /*
- * Protected by info_lck and WALWriteLock (you must hold either lock to
- * read it, but both to update)
- */
- XLogRecPtr logWriteResult; /* last byte + 1 written out */
- XLogRecPtr logFlushResult; /* last byte + 1 flushed */
+ /* These are accessed using atomics -- info_lck not needed */
+ pg_atomic_uint64 logWriteResult; /* last byte + 1 written out */
+ pg_atomic_uint64 logFlushResult; /* last byte + 1 flushed */
/*
* Latest initialized page in the cache (last byte position + 1).
@@ -616,11 +608,15 @@ static XLogwrtResult LogwrtResult = {0, 0};
/*
* Update local copy of shared XLogCtl->log{Write,Flush}Result
+ *
+ * It's critical that Flush always trails Write, so the order of the reads is
+ * important, as is the barrier. See also XLogWrite.
*/
#define RefreshXLogWriteResult(_target) \
do { \
- _target.Write = XLogCtl->logWriteResult; \
- _target.Flush = XLogCtl->logFlushResult; \
+ _target.Flush = pg_atomic_read_u64(&XLogCtl->logFlushResult); \
+ pg_read_barrier(); \
+ _target.Write = pg_atomic_read_u64(&XLogCtl->logWriteResult); \
} while (0)
/*
@@ -968,9 +964,8 @@ XLogInsertRecord(XLogRecData *rdata,
/* advance global request to include new block(s) */
if (XLogCtl->LogwrtRqst.Write < EndPos)
XLogCtl->LogwrtRqst.Write = EndPos;
- /* update local result copy while I have the chance */
- RefreshXLogWriteResult(LogwrtResult);
SpinLockRelease(&XLogCtl->info_lck);
+ RefreshXLogWriteResult(LogwrtResult);
}
/*
@@ -1989,17 +1984,17 @@ AdvanceXLInsertBuffer(XLogRecPtr upto, TimeLineID tli, bool opportunistic)
if (opportunistic)
break;
- /* Before waiting, get info_lck and update LogwrtResult */
+ /* Advance shared memory write request position */
SpinLockAcquire(&XLogCtl->info_lck);
if (XLogCtl->LogwrtRqst.Write < OldPageRqstPtr)
XLogCtl->LogwrtRqst.Write = OldPageRqstPtr;
- RefreshXLogWriteResult(LogwrtResult);
SpinLockRelease(&XLogCtl->info_lck);
/*
- * Now that we have an up-to-date LogwrtResult value, see if we
- * still need to write it or if someone else already did.
+ * Acquire an up-to-date LogwrtResult value and see if we still
+ * need to write it or if someone else already did.
*/
+ RefreshXLogWriteResult(LogwrtResult);
if (LogwrtResult.Write < OldPageRqstPtr)
{
/*
@@ -2556,16 +2551,35 @@ XLogWrite(XLogwrtRqst WriteRqst, TimeLineID tli, bool flexible)
* 'result' values. This is not absolutely essential, but it saves some
* code in a couple of places.
*/
+ SpinLockAcquire(&XLogCtl->info_lck);
+ if (XLogCtl->LogwrtRqst.Write < LogwrtResult.Write)
+ XLogCtl->LogwrtRqst.Write = LogwrtResult.Write;
+ if (XLogCtl->LogwrtRqst.Flush < LogwrtResult.Flush)
+ XLogCtl->LogwrtRqst.Flush = LogwrtResult.Flush;
+ SpinLockRelease(&XLogCtl->info_lck);
+
+ /*
+ * We write Write first, bar, then Flush. When reading, the opposite must
+ * be done (with a matching barrier in between), so that we always see a
+ * Flush value that trails behind the Write value seen.
+ */
+ pg_atomic_write_u64(&XLogCtl->logWriteResult, LogwrtResult.Write);
+ pg_write_barrier();
+ pg_atomic_write_u64(&XLogCtl->logFlushResult, LogwrtResult.Flush);
+
+#ifdef USE_ASSERT_CHECKING
{
- SpinLockAcquire(&XLogCtl->info_lck);
- XLogCtl->logWriteResult = LogwrtResult.Write;
- XLogCtl->logFlushResult = LogwrtResult.Flush;
- if (XLogCtl->LogwrtRqst.Write < LogwrtResult.Write)
- XLogCtl->LogwrtRqst.Write = LogwrtResult.Write;
- if (XLogCtl->LogwrtRqst.Flush < LogwrtResult.Flush)
- XLogCtl->LogwrtRqst.Flush = LogwrtResult.Flush;
- SpinLockRelease(&XLogCtl->info_lck);
+ XLogRecPtr Flush;
+ XLogRecPtr Write;
+
+ Flush = pg_atomic_read_u64(&XLogCtl->logFlushResult);
+ pg_read_barrier();
+ Write = pg_atomic_read_u64(&XLogCtl->logWriteResult);
+
+ /* WAL written to disk is always ahead of WAL flushed */
+ Assert(Write >= Flush);
}
+#endif
}
/*
@@ -2582,7 +2596,6 @@ XLogSetAsyncXactLSN(XLogRecPtr asyncXactLSN)
XLogRecPtr prevAsyncXactLSN;
SpinLockAcquire(&XLogCtl->info_lck);
- RefreshXLogWriteResult(LogwrtResult);
sleeping = XLogCtl->WalWriterSleeping;
prevAsyncXactLSN = XLogCtl->asyncXactLSN;
if (XLogCtl->asyncXactLSN < asyncXactLSN)
@@ -2608,6 +2621,8 @@ XLogSetAsyncXactLSN(XLogRecPtr asyncXactLSN)
{
int flushblocks;
+ RefreshXLogWriteResult(LogwrtResult);
+
flushblocks =
WriteRqstPtr / XLOG_BLCKSZ - LogwrtResult.Flush / XLOG_BLCKSZ;
@@ -2790,14 +2805,8 @@ XLogFlush(XLogRecPtr record)
{
XLogRecPtr insertpos;
- /* read LogwrtResult and update local state */
- SpinLockAcquire(&XLogCtl->info_lck);
- if (WriteRqstPtr < XLogCtl->LogwrtRqst.Write)
- WriteRqstPtr = XLogCtl->LogwrtRqst.Write;
- RefreshXLogWriteResult(LogwrtResult);
- SpinLockRelease(&XLogCtl->info_lck);
-
/* done already? */
+ RefreshXLogWriteResult(LogwrtResult);
if (record <= LogwrtResult.Flush)
break;
@@ -2805,6 +2814,10 @@ XLogFlush(XLogRecPtr record)
* Before actually performing the write, wait for all in-flight
* insertions to the pages we're about to write to finish.
*/
+ SpinLockAcquire(&XLogCtl->info_lck);
+ if (WriteRqstPtr < XLogCtl->LogwrtRqst.Write)
+ WriteRqstPtr = XLogCtl->LogwrtRqst.Write;
+ SpinLockRelease(&XLogCtl->info_lck);
insertpos = WaitXLogInsertionsToFinish(WriteRqstPtr);
/*
@@ -2947,9 +2960,8 @@ XLogBackgroundFlush(void)
*/
insertTLI = XLogCtl->InsertTimeLineID;
- /* read LogwrtResult and update local state */
+ /* read updated LogwrtRqst */
SpinLockAcquire(&XLogCtl->info_lck);
- RefreshXLogWriteResult(LogwrtResult);
WriteRqst = XLogCtl->LogwrtRqst;
SpinLockRelease(&XLogCtl->info_lck);
@@ -2957,6 +2969,7 @@ XLogBackgroundFlush(void)
WriteRqst.Write -= WriteRqst.Write % XLOG_BLCKSZ;
/* if we have already flushed that far, consider async commit records */
+ RefreshXLogWriteResult(LogwrtResult);
if (WriteRqst.Write <= LogwrtResult.Flush)
{
SpinLockAcquire(&XLogCtl->info_lck);
@@ -3125,9 +3138,7 @@ XLogNeedsFlush(XLogRecPtr record)
return false;
/* read LogwrtResult and update local state */
- SpinLockAcquire(&XLogCtl->info_lck);
RefreshXLogWriteResult(LogwrtResult);
- SpinLockRelease(&XLogCtl->info_lck);
/* check again */
if (record <= LogwrtResult.Flush)
@@ -4940,6 +4951,8 @@ XLOGShmemInit(void)
SpinLockInit(&XLogCtl->Insert.insertpos_lck);
SpinLockInit(&XLogCtl->info_lck);
+ pg_atomic_init_u64(&XLogCtl->logWriteResult, InvalidXLogRecPtr);
+ pg_atomic_init_u64(&XLogCtl->logFlushResult, InvalidXLogRecPtr);
pg_atomic_init_u64(&XLogCtl->unloggedLSN, InvalidXLogRecPtr);
}
@@ -5961,11 +5974,13 @@ StartupXLOG(void)
XLogCtl->InitializedUpTo = EndOfLog;
}
+ /*
+ * Update local and shared status. This is OK to do without any locks
+ * because no other process can be reading or writing WAL yet.
+ */
LogwrtResult.Write = LogwrtResult.Flush = EndOfLog;
-
- XLogCtl->logWriteResult = LogwrtResult.Write;
- XLogCtl->logFlushResult = LogwrtResult.Flush;
-
+ pg_atomic_write_u64(&XLogCtl->logWriteResult, EndOfLog);
+ pg_atomic_write_u64(&XLogCtl->logFlushResult, EndOfLog);
XLogCtl->LogwrtRqst.Write = EndOfLog;
XLogCtl->LogwrtRqst.Flush = EndOfLog;
@@ -6410,9 +6425,7 @@ GetFlushRecPtr(TimeLineID *insertTLI)
{
Assert(XLogCtl->SharedRecoveryState == RECOVERY_STATE_DONE);
- SpinLockAcquire(&XLogCtl->info_lck);
RefreshXLogWriteResult(LogwrtResult);
- SpinLockRelease(&XLogCtl->info_lck);
/*
* If we're writing and flushing WAL, the time line can't be changing, so
@@ -9326,9 +9339,7 @@ GetXLogInsertRecPtr(void)
XLogRecPtr
GetXLogWriteRecPtr(void)
{
- SpinLockAcquire(&XLogCtl->info_lck);
RefreshXLogWriteResult(LogwrtResult);
- SpinLockRelease(&XLogCtl->info_lck);
return LogwrtResult.Write;
}