diff options
author | Thomas Munro <tmunro@postgresql.org> | 2020-04-08 23:45:09 +1200 |
---|---|---|
committer | Thomas Munro <tmunro@postgresql.org> | 2020-04-08 23:45:09 +1200 |
commit | d140f2f3e225ea53e2d92ab6833b8c186c90666c (patch) | |
tree | 36e5d16f4fb7af2f5d8f6d151b351c01d71cc504 | |
parent | 83fd4532a72179c370e318075a10e0e2aa832024 (diff) | |
download | postgresql-d140f2f3e225ea53e2d92ab6833b8c186c90666c.tar.gz postgresql-d140f2f3e225ea53e2d92ab6833b8c186c90666c.zip |
Rationalize GetWalRcv{Write,Flush}RecPtr().
GetWalRcvWriteRecPtr() previously reported the latest *flushed*
location. Adopt the conventional terminology used elsewhere in the tree
by renaming it to GetWalRcvFlushRecPtr(), and likewise for some related
variables that used the term "received".
Add a new definition of GetWalRcvWriteRecPtr(), which returns the latest
*written* value. This will allow later patches to use the value for
non-data-integrity purposes, without having to wait for the flush
pointer to advance.
Reviewed-by: Alvaro Herrera <alvherre@2ndquadrant.com>
Reviewed-by: Andres Freund <andres@anarazel.de>
Discussion: https://postgr.es/m/CA%2BhUKGJ4VJN8ttxScUFM8dOKX0BrBiboo5uz1cq%3DAovOddfHpA%40mail.gmail.com
-rw-r--r-- | src/backend/access/transam/xlog.c | 20 |
-rw-r--r-- | src/backend/access/transam/xlogfuncs.c | 2 |
-rw-r--r-- | src/backend/replication/README | 2 |
-rw-r--r-- | src/backend/replication/walreceiver.c | 15 |
-rw-r--r-- | src/backend/replication/walreceiverfuncs.c | 24 |
-rw-r--r-- | src/backend/replication/walsender.c | 2 |
-rw-r--r-- | src/include/replication/walreceiver.h | 18 |
7 files changed, 55 insertions, 28 deletions
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index 740d7044b1d..c38bc1412d8 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -208,8 +208,8 @@ HotStandbyState standbyState = STANDBY_DISABLED; static XLogRecPtr LastRec; -/* Local copy of WalRcv->receivedUpto */ -static XLogRecPtr receivedUpto = 0; +/* Local copy of WalRcv->flushedUpto */ +static XLogRecPtr flushedUpto = 0; static TimeLineID receiveTLI = 0; /* @@ -9363,7 +9363,7 @@ CreateRestartPoint(int flags) * Retreat _logSegNo using the current end of xlog replayed or received, * whichever is later. */ - receivePtr = GetWalRcvWriteRecPtr(NULL, NULL); + receivePtr = GetWalRcvFlushRecPtr(NULL, NULL); replayPtr = GetXLogReplayRecPtr(&replayTLI); endptr = (receivePtr < replayPtr) ? replayPtr : receivePtr; KeepLogSeg(endptr, &_logSegNo); @@ -11856,7 +11856,7 @@ retry: /* See if we need to retrieve more data */ if (readFile < 0 || (readSource == XLOG_FROM_STREAM && - receivedUpto < targetPagePtr + reqLen)) + flushedUpto < targetPagePtr + reqLen)) { if (!WaitForWALToBecomeAvailable(targetPagePtr + reqLen, private->randAccess, @@ -11887,10 +11887,10 @@ retry: */ if (readSource == XLOG_FROM_STREAM) { - if (((targetPagePtr) / XLOG_BLCKSZ) != (receivedUpto / XLOG_BLCKSZ)) + if (((targetPagePtr) / XLOG_BLCKSZ) != (flushedUpto / XLOG_BLCKSZ)) readLen = XLOG_BLCKSZ; else - readLen = XLogSegmentOffset(receivedUpto, wal_segment_size) - + readLen = XLogSegmentOffset(flushedUpto, wal_segment_size) - targetPageOff; } else @@ -12305,7 +12305,7 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess, RequestXLogStreaming(tli, ptr, PrimaryConnInfo, PrimarySlotName, wal_receiver_create_temp_slot); - receivedUpto = 0; + flushedUpto = 0; } /* @@ -12329,14 +12329,14 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess, * XLogReceiptTime will not advance, so the grace time * allotted to conflicting queries will decrease. 
*/ - if (RecPtr < receivedUpto) + if (RecPtr < flushedUpto) havedata = true; else { XLogRecPtr latestChunkStart; - receivedUpto = GetWalRcvWriteRecPtr(&latestChunkStart, &receiveTLI); - if (RecPtr < receivedUpto && receiveTLI == curFileTLI) + flushedUpto = GetWalRcvFlushRecPtr(&latestChunkStart, &receiveTLI); + if (RecPtr < flushedUpto && receiveTLI == curFileTLI) { havedata = true; if (latestChunkStart <= RecPtr) diff --git a/src/backend/access/transam/xlogfuncs.c b/src/backend/access/transam/xlogfuncs.c index b84ba572596..00e1b33ed5f 100644 --- a/src/backend/access/transam/xlogfuncs.c +++ b/src/backend/access/transam/xlogfuncs.c @@ -398,7 +398,7 @@ pg_last_wal_receive_lsn(PG_FUNCTION_ARGS) { XLogRecPtr recptr; - recptr = GetWalRcvWriteRecPtr(NULL, NULL); + recptr = GetWalRcvFlushRecPtr(NULL, NULL); if (recptr == 0) PG_RETURN_NULL(); diff --git a/src/backend/replication/README b/src/backend/replication/README index 0cbb9906135..8ccdd86e74b 100644 --- a/src/backend/replication/README +++ b/src/backend/replication/README @@ -54,7 +54,7 @@ and WalRcvData->slotname, and initializes the starting point in WalRcvData->receiveStart. As walreceiver receives WAL from the master server, and writes and flushes -it to disk (in pg_wal), it updates WalRcvData->receivedUpto and signals +it to disk (in pg_wal), it updates WalRcvData->flushedUpto and signals the startup process to know how far WAL replay can advance. Walreceiver sends information about replication progress to the master server diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c index aee67c61aa6..d69fb90132d 100644 --- a/src/backend/replication/walreceiver.c +++ b/src/backend/replication/walreceiver.c @@ -12,7 +12,7 @@ * in the primary server), and then keeps receiving XLOG records and * writing them to the disk as long as the connection is alive. 
As XLOG * records are received and flushed to disk, it updates the - * WalRcv->receivedUpto variable in shared memory, to inform the startup + * WalRcv->flushedUpto variable in shared memory, to inform the startup * process of how far it can proceed with XLOG replay. * * A WAL receiver cannot directly load GUC parameters used when establishing @@ -261,6 +261,8 @@ WalReceiverMain(void) SpinLockRelease(&walrcv->mutex); + pg_atomic_init_u64(&WalRcv->writtenUpto, 0); + /* Arrange to clean up at walreceiver exit */ on_shmem_exit(WalRcvDie, 0); @@ -984,6 +986,9 @@ XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr) LogstreamResult.Write = recptr; } + + /* Update shared-memory status */ + pg_atomic_write_u64(&WalRcv->writtenUpto, LogstreamResult.Write); } /* @@ -1005,10 +1010,10 @@ XLogWalRcvFlush(bool dying) /* Update shared-memory status */ SpinLockAcquire(&walrcv->mutex); - if (walrcv->receivedUpto < LogstreamResult.Flush) + if (walrcv->flushedUpto < LogstreamResult.Flush) { - walrcv->latestChunkStart = walrcv->receivedUpto; - walrcv->receivedUpto = LogstreamResult.Flush; + walrcv->latestChunkStart = walrcv->flushedUpto; + walrcv->flushedUpto = LogstreamResult.Flush; walrcv->receivedTLI = ThisTimeLineID; } SpinLockRelease(&walrcv->mutex); @@ -1361,7 +1366,7 @@ pg_stat_get_wal_receiver(PG_FUNCTION_ARGS) state = WalRcv->walRcvState; receive_start_lsn = WalRcv->receiveStart; receive_start_tli = WalRcv->receiveStartTLI; - received_lsn = WalRcv->receivedUpto; + received_lsn = WalRcv->flushedUpto; received_tli = WalRcv->receivedTLI; last_send_time = WalRcv->lastMsgSendTime; last_receipt_time = WalRcv->lastMsgReceiptTime; diff --git a/src/backend/replication/walreceiverfuncs.c b/src/backend/replication/walreceiverfuncs.c index 21d18236076..4afad83539c 100644 --- a/src/backend/replication/walreceiverfuncs.c +++ b/src/backend/replication/walreceiverfuncs.c @@ -282,11 +282,11 @@ RequestXLogStreaming(TimeLineID tli, XLogRecPtr recptr, const char *conninfo, /* * If this is 
the first startup of walreceiver (on this timeline), - * initialize receivedUpto and latestChunkStart to the starting point. + * initialize flushedUpto and latestChunkStart to the starting point. */ if (walrcv->receiveStart == 0 || walrcv->receivedTLI != tli) { - walrcv->receivedUpto = recptr; + walrcv->flushedUpto = recptr; walrcv->receivedTLI = tli; walrcv->latestChunkStart = recptr; } @@ -304,7 +304,7 @@ RequestXLogStreaming(TimeLineID tli, XLogRecPtr recptr, const char *conninfo, } /* - * Returns the last+1 byte position that walreceiver has written. + * Returns the last+1 byte position that walreceiver has flushed. * * Optionally, returns the previous chunk start, that is the first byte * written in the most recent walreceiver flush cycle. Callers not @@ -312,13 +312,13 @@ RequestXLogStreaming(TimeLineID tli, XLogRecPtr recptr, const char *conninfo, * receiveTLI. */ XLogRecPtr -GetWalRcvWriteRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI) +GetWalRcvFlushRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI) { WalRcvData *walrcv = WalRcv; XLogRecPtr recptr; SpinLockAcquire(&walrcv->mutex); - recptr = walrcv->receivedUpto; + recptr = walrcv->flushedUpto; if (latestChunkStart) *latestChunkStart = walrcv->latestChunkStart; if (receiveTLI) @@ -329,6 +329,18 @@ GetWalRcvWriteRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI) } /* + * Returns the last+1 byte position that walreceiver has written. + * This returns a recently written value without taking a lock. 
+ */ +XLogRecPtr +GetWalRcvWriteRecPtr(void) +{ + WalRcvData *walrcv = WalRcv; + + return pg_atomic_read_u64(&walrcv->writtenUpto); +} + +/* * Returns the replication apply delay in ms or -1 * if the apply delay info is not available */ @@ -345,7 +357,7 @@ GetReplicationApplyDelay(void) TimestampTz chunkReplayStartTime; SpinLockAcquire(&walrcv->mutex); - receivePtr = walrcv->receivedUpto; + receivePtr = walrcv->flushedUpto; SpinLockRelease(&walrcv->mutex); replayPtr = GetXLogReplayRecPtr(NULL); diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index 06e8b790360..122d884f3e4 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -2949,7 +2949,7 @@ GetStandbyFlushRecPtr(void) * has streamed, but hasn't been replayed yet. */ - receivePtr = GetWalRcvWriteRecPtr(NULL, &receiveTLI); + receivePtr = GetWalRcvFlushRecPtr(NULL, &receiveTLI); replayPtr = GetXLogReplayRecPtr(&replayTLI); ThisTimeLineID = replayTLI; diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h index cf3e43128c7..f1aa6e9977c 100644 --- a/src/include/replication/walreceiver.h +++ b/src/include/replication/walreceiver.h @@ -16,6 +16,7 @@ #include "access/xlogdefs.h" #include "getaddrinfo.h" /* for NI_MAXHOST */ #include "pgtime.h" +#include "port/atomics.h" #include "replication/logicalproto.h" #include "replication/walsender.h" #include "storage/latch.h" @@ -73,19 +74,19 @@ typedef struct TimeLineID receiveStartTLI; /* - * receivedUpto-1 is the last byte position that has already been + * flushedUpto-1 is the last byte position that has already been * received, and receivedTLI is the timeline it came from. At the first * startup of walreceiver, these are set to receiveStart and * receiveStartTLI. After that, walreceiver updates these whenever it * flushes the received WAL to disk. 
*/ - XLogRecPtr receivedUpto; + XLogRecPtr flushedUpto; TimeLineID receivedTLI; /* * latestChunkStart is the starting byte position of the current "batch" * of received WAL. It's actually the same as the previous value of - * receivedUpto before the last flush to disk. Startup process can use + * flushedUpto before the last flush to disk. Startup process can use * this to detect whether it's keeping up or not. */ XLogRecPtr latestChunkStart; @@ -142,6 +143,14 @@ typedef struct slock_t mutex; /* locks shared variables shown above */ /* + * Like flushedUpto, but advanced after writing and before flushing, + * without the need to acquire the spin lock. Data can be read by another + * process up to this point, but shouldn't be used for data integrity + * purposes. + */ + pg_atomic_uint64 writtenUpto; + + /* * force walreceiver reply? This doesn't need to be locked; memory * barriers for ordering are sufficient. But we do need atomic fetch and * store semantics, so use sig_atomic_t. @@ -322,7 +331,8 @@ extern bool WalRcvRunning(void); extern void RequestXLogStreaming(TimeLineID tli, XLogRecPtr recptr, const char *conninfo, const char *slotname, bool create_temp_slot); -extern XLogRecPtr GetWalRcvWriteRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI); +extern XLogRecPtr GetWalRcvFlushRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI); +extern XLogRecPtr GetWalRcvWriteRecPtr(void); extern int GetReplicationApplyDelay(void); extern int GetReplicationTransferLatency(void); extern void WalRcvForceReply(void); |