aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/backend/replication/walreceiver.c96
1 files changed, 61 insertions, 35 deletions
diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c
index 9a2bc37fd71..b90e5ca98ea 100644
--- a/src/backend/replication/walreceiver.c
+++ b/src/backend/replication/walreceiver.c
@@ -125,6 +125,7 @@ static void WalRcvDie(int code, Datum arg);
static void XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len);
static void XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr);
static void XLogWalRcvFlush(bool dying);
+static void XLogWalRcvClose(XLogRecPtr recptr);
static void XLogWalRcvSendReply(bool force, bool requestReply);
static void XLogWalRcvSendHSFeedback(bool immed);
static void ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime);
@@ -883,42 +884,12 @@ XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr)
{
int segbytes;
- if (recvFile < 0 || !XLByteInSeg(recptr, recvSegNo, wal_segment_size))
- {
- /*
- * fsync() and close current file before we switch to next one. We
- * would otherwise have to reopen this file to fsync it later
- */
- if (recvFile >= 0)
- {
- char xlogfname[MAXFNAMELEN];
-
- XLogWalRcvFlush(false);
-
- XLogFileName(xlogfname, recvFileTLI, recvSegNo, wal_segment_size);
-
- /*
- * XLOG segment files will be re-read by recovery in startup
- * process soon, so we don't advise the OS to release cache
- * pages associated with the file like XLogFileClose() does.
- */
- if (close(recvFile) != 0)
- ereport(PANIC,
- (errcode_for_file_access(),
- errmsg("could not close log segment %s: %m",
- xlogfname)));
-
- /*
- * Create .done file forcibly to prevent the streamed segment
- * from being archived later.
- */
- if (XLogArchiveMode != ARCHIVE_MODE_ALWAYS)
- XLogArchiveForceDone(xlogfname);
- else
- XLogArchiveNotify(xlogfname);
- }
- recvFile = -1;
+ /* Close the current segment if it's completed */
+ if (recvFile >= 0 && !XLByteInSeg(recptr, recvSegNo, wal_segment_size))
+ XLogWalRcvClose(recptr);
+ if (recvFile < 0)
+ {
/* Create/use new log file */
XLByteToSeg(recptr, recvSegNo, wal_segment_size);
recvFile = XLogFileInit(recvSegNo);
@@ -967,6 +938,15 @@ XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr)
/* Update shared-memory status */
pg_atomic_write_u64(&WalRcv->writtenUpto, LogstreamResult.Write);
+
+ /*
+ * Close the current segment if it's fully written up in the last cycle of
+ * the loop, to create its archive notification file soon. Otherwise WAL
+ * archiving of the segment will be delayed until any data in the next
+ * segment is received and written.
+ */
+ if (recvFile >= 0 && !XLByteInSeg(recptr, recvSegNo, wal_segment_size))
+ XLogWalRcvClose(recptr);
}
/*
@@ -1021,6 +1001,52 @@ XLogWalRcvFlush(bool dying)
}
/*
+ * Close the current segment.
+ *
+ * Flush the segment to disk before closing it. Otherwise we have to
+ * reopen and fsync it later.
+ *
+ * Create an archive notification file since the segment is known completed.
+ */
+static void
+XLogWalRcvClose(XLogRecPtr recptr)
+{
+ char xlogfname[MAXFNAMELEN];
+
+ Assert(recvFile >= 0 && !XLByteInSeg(recptr, recvSegNo, wal_segment_size));
+
+ /*
+ * fsync() and close current file before we switch to next one. We would
+ * otherwise have to reopen this file to fsync it later
+ */
+ XLogWalRcvFlush(false);
+
+ XLogFileName(xlogfname, recvFileTLI, recvSegNo, wal_segment_size);
+
+ /*
+ * XLOG segment files will be re-read by recovery in startup process soon,
+ * so we don't advise the OS to release cache pages associated with the
+ * file like XLogFileClose() does.
+ */
+ if (close(recvFile) != 0)
+ ereport(PANIC,
+ (errcode_for_file_access(),
+ errmsg("could not close log segment %s: %m",
+ xlogfname)));
+
+ /*
+ * Create .done file forcibly to prevent the streamed segment from being
+ * archived later.
+ */
+ if (XLogArchiveMode != ARCHIVE_MODE_ALWAYS)
+ XLogArchiveForceDone(xlogfname);
+ else
+ XLogArchiveNotify(xlogfname);
+
+ recvFile = -1;
+}
+
+/*
* Send reply message to primary, indicating our current WAL locations, oldest
* xmin and the current time.
*