aboutsummaryrefslogtreecommitdiff
path: root/src/backend/replication
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/replication')
-rw-r--r--src/backend/replication/basebackup.c34
-rw-r--r--src/backend/replication/logical/logical.c2
-rw-r--r--src/backend/replication/logical/reorderbuffer.c19
-rw-r--r--src/backend/replication/slot.c2
-rw-r--r--src/backend/replication/walreceiver.c14
-rw-r--r--src/backend/replication/walreceiverfuncs.c4
-rw-r--r--src/backend/replication/walsender.c16
7 files changed, 48 insertions, 43 deletions
diff --git a/src/backend/replication/basebackup.c b/src/backend/replication/basebackup.c
index 12a16bd773d..c3b9bddc8fe 100644
--- a/src/backend/replication/basebackup.c
+++ b/src/backend/replication/basebackup.c
@@ -357,10 +357,10 @@ perform_base_backup(basebackup_options *opt, DIR *tblspcdir)
* shouldn't be such files, but if there are, there's little harm in
* including them.
*/
- XLByteToSeg(startptr, startsegno);
- XLogFileName(firstoff, ThisTimeLineID, startsegno);
- XLByteToPrevSeg(endptr, endsegno);
- XLogFileName(lastoff, ThisTimeLineID, endsegno);
+ XLByteToSeg(startptr, startsegno, wal_segment_size);
+ XLogFileName(firstoff, ThisTimeLineID, startsegno, wal_segment_size);
+ XLByteToPrevSeg(endptr, endsegno, wal_segment_size);
+ XLogFileName(lastoff, ThisTimeLineID, endsegno, wal_segment_size);
dir = AllocateDir("pg_wal");
if (!dir)
@@ -415,12 +415,13 @@ perform_base_backup(basebackup_options *opt, DIR *tblspcdir)
* Sanity check: the first and last segment should cover startptr and
* endptr, with no gaps in between.
*/
- XLogFromFileName(walFiles[0], &tli, &segno);
+ XLogFromFileName(walFiles[0], &tli, &segno, wal_segment_size);
if (segno != startsegno)
{
char startfname[MAXFNAMELEN];
- XLogFileName(startfname, ThisTimeLineID, startsegno);
+ XLogFileName(startfname, ThisTimeLineID, startsegno,
+ wal_segment_size);
ereport(ERROR,
(errmsg("could not find WAL file \"%s\"", startfname)));
}
@@ -429,12 +430,13 @@ perform_base_backup(basebackup_options *opt, DIR *tblspcdir)
XLogSegNo currsegno = segno;
XLogSegNo nextsegno = segno + 1;
- XLogFromFileName(walFiles[i], &tli, &segno);
+ XLogFromFileName(walFiles[i], &tli, &segno, wal_segment_size);
if (!(nextsegno == segno || currsegno == segno))
{
char nextfname[MAXFNAMELEN];
- XLogFileName(nextfname, ThisTimeLineID, nextsegno);
+ XLogFileName(nextfname, ThisTimeLineID, nextsegno,
+ wal_segment_size);
ereport(ERROR,
(errmsg("could not find WAL file \"%s\"", nextfname)));
}
@@ -443,7 +445,7 @@ perform_base_backup(basebackup_options *opt, DIR *tblspcdir)
{
char endfname[MAXFNAMELEN];
- XLogFileName(endfname, ThisTimeLineID, endsegno);
+ XLogFileName(endfname, ThisTimeLineID, endsegno, wal_segment_size);
ereport(ERROR,
(errmsg("could not find WAL file \"%s\"", endfname)));
}
@@ -457,7 +459,7 @@ perform_base_backup(basebackup_options *opt, DIR *tblspcdir)
pgoff_t len = 0;
snprintf(pathbuf, MAXPGPATH, XLOGDIR "/%s", walFiles[i]);
- XLogFromFileName(walFiles[i], &tli, &segno);
+ XLogFromFileName(walFiles[i], &tli, &segno, wal_segment_size);
fp = AllocateFile(pathbuf, "rb");
if (fp == NULL)
@@ -479,7 +481,7 @@ perform_base_backup(basebackup_options *opt, DIR *tblspcdir)
(errcode_for_file_access(),
errmsg("could not stat file \"%s\": %m",
pathbuf)));
- if (statbuf.st_size != XLogSegSize)
+ if (statbuf.st_size != wal_segment_size)
{
CheckXLogRemoved(segno, tli);
ereport(ERROR,
@@ -490,7 +492,9 @@ perform_base_backup(basebackup_options *opt, DIR *tblspcdir)
/* send the WAL file itself */
_tarWriteHeader(pathbuf, NULL, &statbuf, false);
- while ((cnt = fread(buf, 1, Min(sizeof(buf), XLogSegSize - len), fp)) > 0)
+ while ((cnt = fread(buf, 1,
+ Min(sizeof(buf), wal_segment_size - len),
+ fp)) > 0)
{
CheckXLogRemoved(segno, tli);
/* Send the chunk as a CopyData message */
@@ -501,11 +505,11 @@ perform_base_backup(basebackup_options *opt, DIR *tblspcdir)
len += cnt;
throttle(cnt);
- if (len == XLogSegSize)
+ if (len == wal_segment_size)
break;
}
- if (len != XLogSegSize)
+ if (len != wal_segment_size)
{
CheckXLogRemoved(segno, tli);
ereport(ERROR,
@@ -513,7 +517,7 @@ perform_base_backup(basebackup_options *opt, DIR *tblspcdir)
errmsg("unexpected WAL file size \"%s\"", walFiles[i])));
}
- /* XLogSegSize is a multiple of 512, so no need for padding */
+ /* wal_segment_size is a multiple of 512, so no need for padding */
FreeFile(fp);
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index efb9785f25e..bca585fc27c 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -163,7 +163,7 @@ StartupDecodingContext(List *output_plugin_options,
ctx->slot = slot;
- ctx->reader = XLogReaderAllocate(read_page, ctx);
+ ctx->reader = XLogReaderAllocate(wal_segment_size, read_page, ctx);
if (!ctx->reader)
ereport(ERROR,
(errcode(ERRCODE_OUT_OF_MEMORY),
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index 657bafae579..68766d522d5 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -2083,15 +2083,16 @@ ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
* store in segment in which it belongs by start lsn, don't split over
* multiple segments tho
*/
- if (fd == -1 || !XLByteInSeg(change->lsn, curOpenSegNo))
+ if (fd == -1 ||
+ !XLByteInSeg(change->lsn, curOpenSegNo, wal_segment_size))
{
XLogRecPtr recptr;
if (fd != -1)
CloseTransientFile(fd);
- XLByteToSeg(change->lsn, curOpenSegNo);
- XLogSegNoOffsetToRecPtr(curOpenSegNo, 0, recptr);
+ XLByteToSeg(change->lsn, curOpenSegNo, wal_segment_size);
+ XLogSegNoOffsetToRecPtr(curOpenSegNo, 0, recptr, wal_segment_size);
/*
* No need to care about TLIs here, only used during a single run,
@@ -2319,7 +2320,7 @@ ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn,
txn->nentries_mem = 0;
Assert(dlist_is_empty(&txn->changes));
- XLByteToSeg(txn->final_lsn, last_segno);
+ XLByteToSeg(txn->final_lsn, last_segno, wal_segment_size);
while (restored < max_changes_in_memory && *segno <= last_segno)
{
@@ -2334,11 +2335,11 @@ ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn,
/* first time in */
if (*segno == 0)
{
- XLByteToSeg(txn->first_lsn, *segno);
+ XLByteToSeg(txn->first_lsn, *segno, wal_segment_size);
}
Assert(*segno != 0 || dlist_is_empty(&txn->changes));
- XLogSegNoOffsetToRecPtr(*segno, 0, recptr);
+ XLogSegNoOffsetToRecPtr(*segno, 0, recptr, wal_segment_size);
/*
* No need to care about TLIs here, only used during a single run,
@@ -2575,8 +2576,8 @@ ReorderBufferRestoreCleanup(ReorderBuffer *rb, ReorderBufferTXN *txn)
Assert(txn->first_lsn != InvalidXLogRecPtr);
Assert(txn->final_lsn != InvalidXLogRecPtr);
- XLByteToSeg(txn->first_lsn, first);
- XLByteToSeg(txn->final_lsn, last);
+ XLByteToSeg(txn->first_lsn, first, wal_segment_size);
+ XLByteToSeg(txn->final_lsn, last, wal_segment_size);
/* iterate over all possible filenames, and delete them */
for (cur = first; cur <= last; cur++)
@@ -2584,7 +2585,7 @@ ReorderBufferRestoreCleanup(ReorderBuffer *rb, ReorderBufferTXN *txn)
char path[MAXPGPATH];
XLogRecPtr recptr;
- XLogSegNoOffsetToRecPtr(cur, 0, recptr);
+ XLogSegNoOffsetToRecPtr(cur, 0, recptr, wal_segment_size);
sprintf(path, "pg_replslot/%s/xid-%u-lsn-%X-%X.snap",
NameStr(MyReplicationSlot->data.name), txn->xid,
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index a8a16f55e98..23de2577eff 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -1039,7 +1039,7 @@ ReplicationSlotReserveWal(void)
* the new restart_lsn above, so normally we should never need to loop
* more than twice.
*/
- XLByteToSeg(slot->data.restart_lsn, segno);
+ XLByteToSeg(slot->data.restart_lsn, segno, wal_segment_size);
if (XLogGetLastRemovedSegno() < segno)
break;
}
diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c
index ea9d21a46b3..3474514adcc 100644
--- a/src/backend/replication/walreceiver.c
+++ b/src/backend/replication/walreceiver.c
@@ -613,7 +613,7 @@ WalReceiverMain(void)
* Create .done file forcibly to prevent the streamed segment from
* being archived later.
*/
- XLogFileName(xlogfname, recvFileTLI, recvSegNo);
+ XLogFileName(xlogfname, recvFileTLI, recvSegNo, wal_segment_size);
if (XLogArchiveMode != ARCHIVE_MODE_ALWAYS)
XLogArchiveForceDone(xlogfname);
else
@@ -943,7 +943,7 @@ XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr)
{
int segbytes;
- if (recvFile < 0 || !XLByteInSeg(recptr, recvSegNo))
+ if (recvFile < 0 || !XLByteInSeg(recptr, recvSegNo, wal_segment_size))
{
bool use_existent;
@@ -972,7 +972,7 @@ XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr)
* Create .done file forcibly to prevent the streamed segment
* from being archived later.
*/
- XLogFileName(xlogfname, recvFileTLI, recvSegNo);
+ XLogFileName(xlogfname, recvFileTLI, recvSegNo, wal_segment_size);
if (XLogArchiveMode != ARCHIVE_MODE_ALWAYS)
XLogArchiveForceDone(xlogfname);
else
@@ -981,7 +981,7 @@ XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr)
recvFile = -1;
/* Create/use new log file */
- XLByteToSeg(recptr, recvSegNo);
+ XLByteToSeg(recptr, recvSegNo, wal_segment_size);
use_existent = true;
recvFile = XLogFileInit(recvSegNo, &use_existent, true);
recvFileTLI = ThisTimeLineID;
@@ -989,10 +989,10 @@ XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr)
}
/* Calculate the start offset of the received logs */
- startoff = recptr % XLogSegSize;
+ startoff = XLogSegmentOffset(recptr, wal_segment_size);
- if (startoff + nbytes > XLogSegSize)
- segbytes = XLogSegSize - startoff;
+ if (startoff + nbytes > wal_segment_size)
+ segbytes = wal_segment_size - startoff;
else
segbytes = nbytes;
diff --git a/src/backend/replication/walreceiverfuncs.c b/src/backend/replication/walreceiverfuncs.c
index 8ed7254b5c6..78f8693ece7 100644
--- a/src/backend/replication/walreceiverfuncs.c
+++ b/src/backend/replication/walreceiverfuncs.c
@@ -233,8 +233,8 @@ RequestXLogStreaming(TimeLineID tli, XLogRecPtr recptr, const char *conninfo,
* being created by XLOG streaming, which might cause trouble later on if
* the segment is e.g archived.
*/
- if (recptr % XLogSegSize != 0)
- recptr -= recptr % XLogSegSize;
+ if (XLogSegmentOffset(recptr, wal_segment_size) != 0)
+ recptr -= XLogSegmentOffset(recptr, wal_segment_size);
SpinLockAcquire(&walrcv->mutex);
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 1fbe8ed71b0..56999e93157 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -2316,9 +2316,9 @@ retry:
int segbytes;
int readbytes;
- startoff = recptr % XLogSegSize;
+ startoff = XLogSegmentOffset(recptr, wal_segment_size);
- if (sendFile < 0 || !XLByteInSeg(recptr, sendSegNo))
+ if (sendFile < 0 || !XLByteInSeg(recptr, sendSegNo, wal_segment_size))
{
char path[MAXPGPATH];
@@ -2326,7 +2326,7 @@ retry:
if (sendFile >= 0)
close(sendFile);
- XLByteToSeg(recptr, sendSegNo);
+ XLByteToSeg(recptr, sendSegNo, wal_segment_size);
/*-------
* When reading from a historic timeline, and there is a timeline
@@ -2359,12 +2359,12 @@ retry:
{
XLogSegNo endSegNo;
- XLByteToSeg(sendTimeLineValidUpto, endSegNo);
+ XLByteToSeg(sendTimeLineValidUpto, endSegNo, wal_segment_size);
if (sendSegNo == endSegNo)
curFileTimeLine = sendTimeLineNextTLI;
}
- XLogFilePath(path, curFileTimeLine, sendSegNo);
+ XLogFilePath(path, curFileTimeLine, sendSegNo, wal_segment_size);
sendFile = BasicOpenFile(path, O_RDONLY | PG_BINARY, 0);
if (sendFile < 0)
@@ -2401,8 +2401,8 @@ retry:
}
/* How many bytes are within this segment? */
- if (nbytes > (XLogSegSize - startoff))
- segbytes = XLogSegSize - startoff;
+ if (nbytes > (wal_segment_size - startoff))
+ segbytes = wal_segment_size - startoff;
else
segbytes = nbytes;
@@ -2433,7 +2433,7 @@ retry:
* read() succeeds in that case, but the data we tried to read might
* already have been overwritten with new WAL records.
*/
- XLByteToSeg(startptr, segno);
+ XLByteToSeg(startptr, segno, wal_segment_size);
CheckXLogRemoved(segno, ThisTimeLineID);
/*