aboutsummaryrefslogtreecommitdiff
path: root/src/backend/access/transam/xlog.c
diff options
context:
space:
mode:
authorHeikki Linnakangas <heikki.linnakangas@iki.fi>2014-11-20 17:56:26 +0200
committerHeikki Linnakangas <heikki.linnakangas@iki.fi>2014-11-20 18:46:41 +0200
commit2c03216d831160bedd72d45f712601b6f7d03f1c (patch)
treeab6a03d031ffa605d848b0b7067add15e56e2207 /src/backend/access/transam/xlog.c
parent8dc626defec23016dd5988208d8704b858b9d21d (diff)
downloadpostgresql-2c03216d831160bedd72d45f712601b6f7d03f1c.tar.gz
postgresql-2c03216d831160bedd72d45f712601b6f7d03f1c.zip
Revamp the WAL record format.
Each WAL record now carries information about the modified relation and block(s) in a standardized format. That makes it easier to write tools that need that information, like pg_rewind, prefetching the blocks to speed up recovery, etc. There's a whole new API for building WAL records, replacing the XLogRecData chains used previously. The new API consists of XLogRegister* functions, which are called for each buffer and chunk of data that is added to the record. The new API also gives more control over when a full-page image is written, by passing flags to the XLogRegisterBuffer function. This also simplifies the XLogReadBufferForRedo() calls. The function can dig the relation and block number from the WAL record, so they no longer need to be passed as arguments. For the convenience of redo routines, XLogReader now disects each WAL record after reading it, copying the main data part and the per-block data into MAXALIGNed buffers. The data chunks are not aligned within the WAL record, but the redo routines can assume that the pointers returned by XLogRecGet* functions are. Redo routines are now passed the XLogReaderState, which contains the record in the already-disected format, instead of the plain XLogRecord. The new record format also makes the fixed size XLogRecord header smaller, by removing the xl_len field. The length of the "main data" portion is now stored at the end of the WAL record, and there's a separate header after XLogRecord for it. The alignment padding at the end of XLogRecord is also removed. This compansates for the fact that the new format would otherwise be more bulky than the old format. Reviewed by Andres Freund, Amit Kapila, Michael Paquier, Alvaro Herrera, Fujii Masao.
Diffstat (limited to 'src/backend/access/transam/xlog.c')
-rw-r--r--src/backend/access/transam/xlog.c348
1 files changed, 171 insertions, 177 deletions
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 60531277dc6..2059bbeda4a 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -757,10 +757,10 @@ static MemoryContext walDebugCxt = NULL;
static void readRecoveryCommandFile(void);
static void exitArchiveRecovery(TimeLineID endTLI, XLogSegNo endLogSegNo);
-static bool recoveryStopsBefore(XLogRecord *record);
-static bool recoveryStopsAfter(XLogRecord *record);
+static bool recoveryStopsBefore(XLogReaderState *record);
+static bool recoveryStopsAfter(XLogReaderState *record);
static void recoveryPausesHere(void);
-static bool recoveryApplyDelay(XLogRecord *record);
+static bool recoveryApplyDelay(XLogReaderState *record);
static void SetLatestXTime(TimestampTz xtime);
static void SetCurrentChunkStartTime(TimestampTz xtime);
static void CheckRequiredParameterValues(void);
@@ -807,9 +807,9 @@ static char *str_time(pg_time_t tnow);
static bool CheckForStandbyTrigger(void);
#ifdef WAL_DEBUG
-static void xlog_outrec(StringInfo buf, XLogRecord *record);
+static void xlog_outrec(StringInfo buf, XLogReaderState *record);
#endif
-static void xlog_outdesc(StringInfo buf, RmgrId rmid, XLogRecord *record);
+static void xlog_outdesc(StringInfo buf, XLogReaderState *record);
static void pg_start_backup_callback(int code, Datum arg);
static bool read_backup_label(XLogRecPtr *checkPointLoc,
bool *backupEndRequired, bool *backupFromStandby);
@@ -861,7 +861,6 @@ XLogRecPtr
XLogInsertRecord(XLogRecData *rdata, XLogRecPtr fpw_lsn)
{
XLogCtlInsert *Insert = &XLogCtl->Insert;
- XLogRecData *rdt;
pg_crc32 rdata_crc;
bool inserted;
XLogRecord *rechdr = (XLogRecord *) rdata->data;
@@ -870,28 +869,13 @@ XLogInsertRecord(XLogRecData *rdata, XLogRecPtr fpw_lsn)
XLogRecPtr StartPos;
XLogRecPtr EndPos;
+ /* we assume that all of the record header is in the first chunk */
+ Assert(rdata->len >= SizeOfXLogRecord);
+
/* cross-check on whether we should be here or not */
if (!XLogInsertAllowed())
elog(ERROR, "cannot make new WAL entries during recovery");
- /*
- * Calculate CRC of the data, including all the backup blocks
- *
- * Note that the record header isn't added into the CRC initially since we
- * don't know the prev-link yet. Thus, the CRC will represent the CRC of
- * the whole record in the order: rdata, then backup blocks, then record
- * header.
- */
- INIT_CRC32C(rdata_crc);
- for (rdt = rdata->next; rdt != NULL; rdt = rdt->next)
- COMP_CRC32C(rdata_crc, rdt->data, rdt->len);
-
- /*
- * Calculate CRC of the header, except for prev-link, because we don't
- * know it yet. It will be added later.
- */
- COMP_CRC32C(rdata_crc, ((char *) rechdr), offsetof(XLogRecord, xl_prev));
-
/*----------
*
* We have now done all the preparatory work we can without holding a
@@ -976,10 +960,11 @@ XLogInsertRecord(XLogRecData *rdata, XLogRecPtr fpw_lsn)
if (inserted)
{
/*
- * Now that xl_prev has been filled in, finish CRC calculation of the
- * record header.
+ * Now that xl_prev has been filled in, calculate CRC of the record
+ * header.
*/
- COMP_CRC32C(rdata_crc, ((char *) &rechdr->xl_prev), sizeof(XLogRecPtr));
+ rdata_crc = rechdr->xl_crc;
+ COMP_CRC32C(rdata_crc, rechdr, offsetof(XLogRecord, xl_crc));
FIN_CRC32C(rdata_crc);
rechdr->xl_crc = rdata_crc;
@@ -1053,34 +1038,47 @@ XLogInsertRecord(XLogRecData *rdata, XLogRecPtr fpw_lsn)
#ifdef WAL_DEBUG
if (XLOG_DEBUG)
{
+ static XLogReaderState *debug_reader = NULL;
StringInfoData buf;
- MemoryContext oldCxt = MemoryContextSwitchTo(walDebugCxt);
+ StringInfoData recordBuf;
+ char *errormsg = NULL;
+ MemoryContext oldCxt;
+
+ oldCxt = MemoryContextSwitchTo(walDebugCxt);
initStringInfo(&buf);
appendStringInfo(&buf, "INSERT @ %X/%X: ",
(uint32) (EndPos >> 32), (uint32) EndPos);
- xlog_outrec(&buf, rechdr);
- if (rdata->data != NULL)
- {
- StringInfoData recordbuf;
- /*
- * We have to piece together the WAL record data from the
- * XLogRecData entries, so that we can pass it to the rm_desc
- * function as one contiguous chunk.
- */
- initStringInfo(&recordbuf);
- appendBinaryStringInfo(&recordbuf, (char *) rechdr, sizeof(XLogRecord));
- for (; rdata != NULL; rdata = rdata->next)
- appendBinaryStringInfo(&recordbuf, rdata->data, rdata->len);
+ /*
+ * We have to piece together the WAL record data from the XLogRecData
+ * entries, so that we can pass it to the rm_desc function as one
+ * contiguous chunk.
+ */
+ initStringInfo(&recordBuf);
+ for (; rdata != NULL; rdata = rdata->next)
+ appendBinaryStringInfo(&recordBuf, rdata->data, rdata->len);
+
+ if (!debug_reader)
+ debug_reader = XLogReaderAllocate(NULL, NULL);
+ if (!debug_reader ||
+ !DecodeXLogRecord(debug_reader, (XLogRecord *) recordBuf.data,
+ &errormsg))
+ {
+ appendStringInfo(&buf, "error decoding record: %s",
+ errormsg ? errormsg : "no error message");
+ }
+ else
+ {
appendStringInfoString(&buf, " - ");
- xlog_outdesc(&buf, rechdr->xl_rmid, (XLogRecord *) recordbuf.data);
+ xlog_outdesc(&buf, debug_reader);
}
elog(LOG, "%s", buf.data);
+ pfree(buf.data);
+ pfree(recordBuf.data);
MemoryContextSwitchTo(oldCxt);
- MemoryContextReset(walDebugCxt);
}
#endif
@@ -1170,7 +1168,7 @@ ReserveXLogSwitch(XLogRecPtr *StartPos, XLogRecPtr *EndPos, XLogRecPtr *PrevPtr)
uint64 startbytepos;
uint64 endbytepos;
uint64 prevbytepos;
- uint32 size = SizeOfXLogRecord;
+ uint32 size = MAXALIGN(SizeOfXLogRecord);
XLogRecPtr ptr;
uint32 segleft;
@@ -1234,9 +1232,6 @@ CopyXLogRecordToWAL(int write_len, bool isLogSwitch, XLogRecData *rdata,
XLogRecPtr CurrPos;
XLogPageHeader pagehdr;
- /* The first chunk is the record header */
- Assert(rdata->len == SizeOfXLogRecord);
-
/*
* Get a pointer to the right place in the right WAL buffer to start
* inserting to.
@@ -1309,9 +1304,6 @@ CopyXLogRecordToWAL(int write_len, bool isLogSwitch, XLogRecData *rdata,
}
Assert(written == write_len);
- /* Align the end position, so that the next record starts aligned */
- CurrPos = MAXALIGN64(CurrPos);
-
/*
* If this was an xlog-switch, it's not enough to write the switch record,
* we also have to consume all the remaining space in the WAL segment. We
@@ -1341,6 +1333,11 @@ CopyXLogRecordToWAL(int write_len, bool isLogSwitch, XLogRecData *rdata,
CurrPos += XLOG_BLCKSZ;
}
}
+ else
+ {
+ /* Align the end position, so that the next record starts aligned */
+ CurrPos = MAXALIGN64(CurrPos);
+ }
if (CurrPos != EndPos)
elog(PANIC, "space reserved for WAL record does not match what was written");
@@ -4470,6 +4467,7 @@ BootStrapXLOG(void)
XLogPageHeader page;
XLogLongPageHeader longpage;
XLogRecord *record;
+ char *recptr;
bool use_existent;
uint64 sysidentifier;
struct timeval tv;
@@ -4541,17 +4539,23 @@ BootStrapXLOG(void)
longpage->xlp_xlog_blcksz = XLOG_BLCKSZ;
/* Insert the initial checkpoint record */
- record = (XLogRecord *) ((char *) page + SizeOfXLogLongPHD);
+ recptr = ((char *) page + SizeOfXLogLongPHD);
+ record = (XLogRecord *) recptr;
record->xl_prev = 0;
record->xl_xid = InvalidTransactionId;
- record->xl_tot_len = SizeOfXLogRecord + sizeof(checkPoint);
- record->xl_len = sizeof(checkPoint);
+ record->xl_tot_len = SizeOfXLogRecord + SizeOfXLogRecordDataHeaderShort + sizeof(checkPoint);
record->xl_info = XLOG_CHECKPOINT_SHUTDOWN;
record->xl_rmid = RM_XLOG_ID;
- memcpy(XLogRecGetData(record), &checkPoint, sizeof(checkPoint));
+ recptr += SizeOfXLogRecord;
+ /* fill the XLogRecordDataHeaderShort struct */
+ *(recptr++) = XLR_BLOCK_ID_DATA_SHORT;
+ *(recptr++) = sizeof(checkPoint);
+ memcpy(recptr, &checkPoint, sizeof(checkPoint));
+ recptr += sizeof(checkPoint);
+ Assert(recptr - (char *) record == record->xl_tot_len);
INIT_CRC32C(crc);
- COMP_CRC32C(crc, &checkPoint, sizeof(checkPoint));
+ COMP_CRC32C(crc, ((char *) record) + SizeOfXLogRecord, record->xl_tot_len - SizeOfXLogRecord);
COMP_CRC32C(crc, (char *) record, offsetof(XLogRecord, xl_crc));
FIN_CRC32C(crc);
record->xl_crc = crc;
@@ -4984,36 +4988,37 @@ exitArchiveRecovery(TimeLineID endTLI, XLogSegNo endLogSegNo)
* timestamps.
*/
static bool
-getRecordTimestamp(XLogRecord *record, TimestampTz *recordXtime)
+getRecordTimestamp(XLogReaderState *record, TimestampTz *recordXtime)
{
- uint8 record_info = record->xl_info & ~XLR_INFO_MASK;
+ uint8 record_info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
+ uint8 rmid = XLogRecGetRmid(record);
- if (record->xl_rmid == RM_XLOG_ID && record_info == XLOG_RESTORE_POINT)
+ if (rmid == RM_XLOG_ID && record_info == XLOG_RESTORE_POINT)
{
*recordXtime = ((xl_restore_point *) XLogRecGetData(record))->rp_time;
return true;
}
- if (record->xl_rmid == RM_XACT_ID && record_info == XLOG_XACT_COMMIT_COMPACT)
+ if (rmid == RM_XACT_ID && record_info == XLOG_XACT_COMMIT_COMPACT)
{
*recordXtime = ((xl_xact_commit_compact *) XLogRecGetData(record))->xact_time;
return true;
}
- if (record->xl_rmid == RM_XACT_ID && record_info == XLOG_XACT_COMMIT)
+ if (rmid == RM_XACT_ID && record_info == XLOG_XACT_COMMIT)
{
*recordXtime = ((xl_xact_commit *) XLogRecGetData(record))->xact_time;
return true;
}
- if (record->xl_rmid == RM_XACT_ID && record_info == XLOG_XACT_COMMIT_PREPARED)
+ if (rmid == RM_XACT_ID && record_info == XLOG_XACT_COMMIT_PREPARED)
{
*recordXtime = ((xl_xact_commit_prepared *) XLogRecGetData(record))->crec.xact_time;
return true;
}
- if (record->xl_rmid == RM_XACT_ID && record_info == XLOG_XACT_ABORT)
+ if (rmid == RM_XACT_ID && record_info == XLOG_XACT_ABORT)
{
*recordXtime = ((xl_xact_abort *) XLogRecGetData(record))->xact_time;
return true;
}
- if (record->xl_rmid == RM_XACT_ID && record_info == XLOG_XACT_ABORT_PREPARED)
+ if (rmid == RM_XACT_ID && record_info == XLOG_XACT_ABORT_PREPARED)
{
*recordXtime = ((xl_xact_abort_prepared *) XLogRecGetData(record))->arec.xact_time;
return true;
@@ -5030,7 +5035,7 @@ getRecordTimestamp(XLogRecord *record, TimestampTz *recordXtime)
* new timeline's history file.
*/
static bool
-recoveryStopsBefore(XLogRecord *record)
+recoveryStopsBefore(XLogReaderState *record)
{
bool stopsHere = false;
uint8 record_info;
@@ -5052,14 +5057,14 @@ recoveryStopsBefore(XLogRecord *record)
}
/* Otherwise we only consider stopping before COMMIT or ABORT records. */
- if (record->xl_rmid != RM_XACT_ID)
+ if (XLogRecGetRmid(record) != RM_XACT_ID)
return false;
- record_info = record->xl_info & ~XLR_INFO_MASK;
+ record_info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
if (record_info == XLOG_XACT_COMMIT_COMPACT || record_info == XLOG_XACT_COMMIT)
{
isCommit = true;
- recordXid = record->xl_xid;
+ recordXid = XLogRecGetXid(record);
}
else if (record_info == XLOG_XACT_COMMIT_PREPARED)
{
@@ -5069,7 +5074,7 @@ recoveryStopsBefore(XLogRecord *record)
else if (record_info == XLOG_XACT_ABORT)
{
isCommit = false;
- recordXid = record->xl_xid;
+ recordXid = XLogRecGetXid(record);
}
else if (record_info == XLOG_XACT_ABORT_PREPARED)
{
@@ -5140,19 +5145,21 @@ recoveryStopsBefore(XLogRecord *record)
* record in XLogCtl->recoveryLastXTime.
*/
static bool
-recoveryStopsAfter(XLogRecord *record)
+recoveryStopsAfter(XLogReaderState *record)
{
uint8 record_info;
+ uint8 rmid;
TimestampTz recordXtime;
- record_info = record->xl_info & ~XLR_INFO_MASK;
+ record_info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
+ rmid = XLogRecGetRmid(record);
/*
* There can be many restore points that share the same name; we stop at
* the first one.
*/
if (recoveryTarget == RECOVERY_TARGET_NAME &&
- record->xl_rmid == RM_XLOG_ID && record_info == XLOG_RESTORE_POINT)
+ rmid == RM_XLOG_ID && record_info == XLOG_RESTORE_POINT)
{
xl_restore_point *recordRestorePointData;
@@ -5173,7 +5180,7 @@ recoveryStopsAfter(XLogRecord *record)
}
}
- if (record->xl_rmid == RM_XACT_ID &&
+ if (rmid == RM_XACT_ID &&
(record_info == XLOG_XACT_COMMIT_COMPACT ||
record_info == XLOG_XACT_COMMIT ||
record_info == XLOG_XACT_COMMIT_PREPARED ||
@@ -5192,7 +5199,7 @@ recoveryStopsAfter(XLogRecord *record)
else if (record_info == XLOG_XACT_ABORT_PREPARED)
recordXid = ((xl_xact_abort_prepared *) XLogRecGetData(record))->xid;
else
- recordXid = record->xl_xid;
+ recordXid = XLogRecGetXid(record);
/*
* There can be only one transaction end record with this exact
@@ -5307,7 +5314,7 @@ SetRecoveryPause(bool recoveryPause)
* usability.
*/
static bool
-recoveryApplyDelay(XLogRecord *record)
+recoveryApplyDelay(XLogReaderState *record)
{
uint8 record_info;
TimestampTz xtime;
@@ -5326,8 +5333,8 @@ recoveryApplyDelay(XLogRecord *record)
* so there is already opportunity for issues caused by early conflicts on
* standbys.
*/
- record_info = record->xl_info & ~XLR_INFO_MASK;
- if (!(record->xl_rmid == RM_XACT_ID &&
+ record_info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
+ if (!(XLogRecGetRmid(record) == RM_XACT_ID &&
(record_info == XLOG_XACT_COMMIT_COMPACT ||
record_info == XLOG_XACT_COMMIT ||
record_info == XLOG_XACT_COMMIT_PREPARED)))
@@ -5696,7 +5703,7 @@ StartupXLOG(void)
record = ReadCheckpointRecord(xlogreader, checkPointLoc, 0, true);
if (record != NULL)
{
- memcpy(&checkPoint, XLogRecGetData(record), sizeof(CheckPoint));
+ memcpy(&checkPoint, XLogRecGetData(xlogreader), sizeof(CheckPoint));
wasShutdown = (record->xl_info == XLOG_CHECKPOINT_SHUTDOWN);
ereport(DEBUG1,
(errmsg("checkpoint record is at %X/%X",
@@ -5793,7 +5800,7 @@ StartupXLOG(void)
ereport(PANIC,
(errmsg("could not locate a valid checkpoint record")));
}
- memcpy(&checkPoint, XLogRecGetData(record), sizeof(CheckPoint));
+ memcpy(&checkPoint, XLogRecGetData(xlogreader), sizeof(CheckPoint));
wasShutdown = (record->xl_info == XLOG_CHECKPOINT_SHUTDOWN);
}
@@ -6230,9 +6237,9 @@ StartupXLOG(void)
appendStringInfo(&buf, "REDO @ %X/%X; LSN %X/%X: ",
(uint32) (ReadRecPtr >> 32), (uint32) ReadRecPtr,
(uint32) (EndRecPtr >> 32), (uint32) EndRecPtr);
- xlog_outrec(&buf, record);
+ xlog_outrec(&buf, xlogreader);
appendStringInfoString(&buf, " - ");
- xlog_outdesc(&buf, record->xl_rmid, record);
+ xlog_outdesc(&buf, xlogreader);
elog(LOG, "%s", buf.data);
pfree(buf.data);
}
@@ -6260,7 +6267,7 @@ StartupXLOG(void)
/*
* Have we reached our recovery target?
*/
- if (recoveryStopsBefore(record))
+ if (recoveryStopsBefore(xlogreader))
{
reachedStopPoint = true; /* see below */
break;
@@ -6270,7 +6277,7 @@ StartupXLOG(void)
* If we've been asked to lag the master, wait on latch until
* enough time has passed.
*/
- if (recoveryApplyDelay(record))
+ if (recoveryApplyDelay(xlogreader))
{
/*
* We test for paused recovery again here. If user sets
@@ -6285,7 +6292,7 @@ StartupXLOG(void)
/* Setup error traceback support for ereport() */
errcallback.callback = rm_redo_error_callback;
- errcallback.arg = (void *) record;
+ errcallback.arg = (void *) xlogreader;
errcallback.previous = error_context_stack;
error_context_stack = &errcallback;
@@ -6324,7 +6331,7 @@ StartupXLOG(void)
{
CheckPoint checkPoint;
- memcpy(&checkPoint, XLogRecGetData(record), sizeof(CheckPoint));
+ memcpy(&checkPoint, XLogRecGetData(xlogreader), sizeof(CheckPoint));
newTLI = checkPoint.ThisTimeLineID;
prevTLI = checkPoint.PrevTimeLineID;
}
@@ -6332,7 +6339,7 @@ StartupXLOG(void)
{
xl_end_of_recovery xlrec;
- memcpy(&xlrec, XLogRecGetData(record), sizeof(xl_end_of_recovery));
+ memcpy(&xlrec, XLogRecGetData(xlogreader), sizeof(xl_end_of_recovery));
newTLI = xlrec.ThisTimeLineID;
prevTLI = xlrec.PrevTimeLineID;
}
@@ -6366,7 +6373,7 @@ StartupXLOG(void)
RecordKnownAssignedTransactionIds(record->xl_xid);
/* Now apply the WAL record itself */
- RmgrTable[record->xl_rmid].rm_redo(EndRecPtr, record);
+ RmgrTable[record->xl_rmid].rm_redo(xlogreader);
/* Pop the error context stack */
error_context_stack = errcallback.previous;
@@ -6394,7 +6401,7 @@ StartupXLOG(void)
WalSndWakeup();
/* Exit loop if we reached inclusive recovery target */
- if (recoveryStopsAfter(record))
+ if (recoveryStopsAfter(xlogreader))
{
reachedStopPoint = true;
break;
@@ -7148,8 +7155,7 @@ ReadCheckpointRecord(XLogReaderState *xlogreader, XLogRecPtr RecPtr,
}
return NULL;
}
- if (record->xl_len != sizeof(CheckPoint) ||
- record->xl_tot_len != SizeOfXLogRecord + sizeof(CheckPoint))
+ if (record->xl_tot_len != SizeOfXLogRecord + SizeOfXLogRecordDataHeaderShort + sizeof(CheckPoint))
{
switch (whichChkpt)
{
@@ -7194,6 +7200,9 @@ InitXLOGAccess(void)
(void) GetRedoRecPtr();
/* Also update our copy of doPageWrites. */
doPageWrites = (Insert->fullPageWrites || Insert->forcePageWrites);
+
+ /* Also initialize the working areas for constructing WAL records */
+ InitXLogInsert();
}
/*
@@ -7490,7 +7499,6 @@ CreateCheckPoint(int flags)
CheckPoint checkPoint;
XLogRecPtr recptr;
XLogCtlInsert *Insert = &XLogCtl->Insert;
- XLogRecData rdata;
uint32 freespace;
XLogSegNo _logSegNo;
XLogRecPtr curInsert;
@@ -7760,15 +7768,11 @@ CreateCheckPoint(int flags)
/*
* Now insert the checkpoint record into XLOG.
*/
- rdata.data = (char *) (&checkPoint);
- rdata.len = sizeof(checkPoint);
- rdata.buffer = InvalidBuffer;
- rdata.next = NULL;
-
+ XLogBeginInsert();
+ XLogRegisterData((char *) (&checkPoint), sizeof(checkPoint));
recptr = XLogInsert(RM_XLOG_ID,
shutdown ? XLOG_CHECKPOINT_SHUTDOWN :
- XLOG_CHECKPOINT_ONLINE,
- &rdata);
+ XLOG_CHECKPOINT_ONLINE);
XLogFlush(recptr);
@@ -7908,7 +7912,6 @@ static void
CreateEndOfRecoveryRecord(void)
{
xl_end_of_recovery xlrec;
- XLogRecData rdata;
XLogRecPtr recptr;
/* sanity check */
@@ -7926,12 +7929,9 @@ CreateEndOfRecoveryRecord(void)
START_CRIT_SECTION();
- rdata.data = (char *) &xlrec;
- rdata.len = sizeof(xl_end_of_recovery);
- rdata.buffer = InvalidBuffer;
- rdata.next = NULL;
-
- recptr = XLogInsert(RM_XLOG_ID, XLOG_END_OF_RECOVERY, &rdata);
+ XLogBeginInsert();
+ XLogRegisterData((char *) &xlrec, sizeof(xl_end_of_recovery));
+ recptr = XLogInsert(RM_XLOG_ID, XLOG_END_OF_RECOVERY);
XLogFlush(recptr);
@@ -8307,13 +8307,9 @@ KeepLogSeg(XLogRecPtr recptr, XLogSegNo *logSegNo)
void
XLogPutNextOid(Oid nextOid)
{
- XLogRecData rdata;
-
- rdata.data = (char *) (&nextOid);
- rdata.len = sizeof(Oid);
- rdata.buffer = InvalidBuffer;
- rdata.next = NULL;
- (void) XLogInsert(RM_XLOG_ID, XLOG_NEXTOID, &rdata);
+ XLogBeginInsert();
+ XLogRegisterData((char *) (&nextOid), sizeof(Oid));
+ (void) XLogInsert(RM_XLOG_ID, XLOG_NEXTOID);
/*
* We need not flush the NEXTOID record immediately, because any of the
@@ -8349,15 +8345,10 @@ XLogRecPtr
RequestXLogSwitch(void)
{
XLogRecPtr RecPtr;
- XLogRecData rdata;
-
- /* XLOG SWITCH, alone among xlog record types, has no data */
- rdata.buffer = InvalidBuffer;
- rdata.data = NULL;
- rdata.len = 0;
- rdata.next = NULL;
- RecPtr = XLogInsert(RM_XLOG_ID, XLOG_SWITCH, &rdata);
+ /* XLOG SWITCH has no data */
+ XLogBeginInsert();
+ RecPtr = XLogInsert(RM_XLOG_ID, XLOG_SWITCH);
return RecPtr;
}
@@ -8369,18 +8360,15 @@ XLogRecPtr
XLogRestorePoint(const char *rpName)
{
XLogRecPtr RecPtr;
- XLogRecData rdata;
xl_restore_point xlrec;
xlrec.rp_time = GetCurrentTimestamp();
strlcpy(xlrec.rp_name, rpName, MAXFNAMELEN);
- rdata.buffer = InvalidBuffer;
- rdata.data = (char *) &xlrec;
- rdata.len = sizeof(xl_restore_point);
- rdata.next = NULL;
+ XLogBeginInsert();
+ XLogRegisterData((char *) &xlrec, sizeof(xl_restore_point));
- RecPtr = XLogInsert(RM_XLOG_ID, XLOG_RESTORE_POINT, &rdata);
+ RecPtr = XLogInsert(RM_XLOG_ID, XLOG_RESTORE_POINT);
ereport(LOG,
(errmsg("restore point \"%s\" created at %X/%X",
@@ -8412,7 +8400,6 @@ XLogReportParameters(void)
*/
if (wal_level != ControlFile->wal_level || XLogIsNeeded())
{
- XLogRecData rdata;
xl_parameter_change xlrec;
XLogRecPtr recptr;
@@ -8423,12 +8410,10 @@ XLogReportParameters(void)
xlrec.wal_level = wal_level;
xlrec.wal_log_hints = wal_log_hints;
- rdata.buffer = InvalidBuffer;
- rdata.data = (char *) &xlrec;
- rdata.len = sizeof(xlrec);
- rdata.next = NULL;
+ XLogBeginInsert();
+ XLogRegisterData((char *) &xlrec, sizeof(xlrec));
- recptr = XLogInsert(RM_XLOG_ID, XLOG_PARAMETER_CHANGE, &rdata);
+ recptr = XLogInsert(RM_XLOG_ID, XLOG_PARAMETER_CHANGE);
XLogFlush(recptr);
}
@@ -8486,14 +8471,10 @@ UpdateFullPageWrites(void)
*/
if (XLogStandbyInfoActive() && !RecoveryInProgress())
{
- XLogRecData rdata;
-
- rdata.data = (char *) (&fullPageWrites);
- rdata.len = sizeof(bool);
- rdata.buffer = InvalidBuffer;
- rdata.next = NULL;
+ XLogBeginInsert();
+ XLogRegisterData((char *) (&fullPageWrites), sizeof(bool));
- XLogInsert(RM_XLOG_ID, XLOG_FPW_CHANGE, &rdata);
+ XLogInsert(RM_XLOG_ID, XLOG_FPW_CHANGE);
}
if (!fullPageWrites)
@@ -8558,12 +8539,13 @@ checkTimeLineSwitch(XLogRecPtr lsn, TimeLineID newTLI, TimeLineID prevTLI)
* not all record types are related to control file updates.
*/
void
-xlog_redo(XLogRecPtr lsn, XLogRecord *record)
+xlog_redo(XLogReaderState *record)
{
- uint8 info = record->xl_info & ~XLR_INFO_MASK;
+ uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
+ XLogRecPtr lsn = record->EndRecPtr;
- /* Backup blocks are not used by XLOG rmgr */
- Assert(!(record->xl_info & XLR_BKP_BLOCK_MASK));
+ /* in XLOG rmgr, backup blocks are only used by XLOG_FPI records */
+ Assert(!XLogRecHasAnyBlockRefs(record) || info == XLOG_FPI);
if (info == XLOG_NEXTOID)
{
@@ -8750,14 +8732,12 @@ xlog_redo(XLogRecPtr lsn, XLogRecord *record)
}
else if (info == XLOG_FPI)
{
- char *data;
- BkpBlock bkpb;
+ Buffer buffer;
/*
- * Full-page image (FPI) records contain a backup block stored
- * "inline" in the normal data since the locking when writing hint
- * records isn't sufficient to use the normal backup block mechanism,
- * which assumes exclusive lock on the buffer supplied.
+ * Full-page image (FPI) records contain nothing else but a backup
+ * block. The block reference must include a full-page image -
+ * otherwise there would be no point in this record.
*
* Since the only change in these backup block are hint bits, there
* are no recovery conflicts generated.
@@ -8766,11 +8746,9 @@ xlog_redo(XLogRecPtr lsn, XLogRecord *record)
* smgr implementation has no need to implement anything. Which means
* nothing is needed in md.c etc
*/
- data = XLogRecGetData(record);
- memcpy(&bkpb, data, sizeof(BkpBlock));
- data += sizeof(BkpBlock);
-
- RestoreBackupBlockContents(lsn, bkpb, data, false, false);
+ if (XLogReadBufferForRedo(record, 0, &buffer) != BLK_RESTORED)
+ elog(ERROR, "unexpected XLogReadBufferForRedo result when restoring backup block");
+ UnlockReleaseBuffer(buffer);
}
else if (info == XLOG_BACKUP_END)
{
@@ -8867,22 +8845,42 @@ xlog_redo(XLogRecPtr lsn, XLogRecord *record)
#ifdef WAL_DEBUG
static void
-xlog_outrec(StringInfo buf, XLogRecord *record)
+xlog_outrec(StringInfo buf, XLogReaderState *record)
{
- int i;
+ int block_id;
appendStringInfo(buf, "prev %X/%X; xid %u",
- (uint32) (record->xl_prev >> 32),
- (uint32) record->xl_prev,
- record->xl_xid);
+ (uint32) (XLogRecGetPrev(record) >> 32),
+ (uint32) XLogRecGetPrev(record),
+ XLogRecGetXid(record));
appendStringInfo(buf, "; len %u",
- record->xl_len);
+ XLogRecGetDataLen(record));
- for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++)
+ /* decode block references */
+ for (block_id = 0; block_id <= record->max_block_id; block_id++)
{
- if (record->xl_info & XLR_BKP_BLOCK(i))
- appendStringInfo(buf, "; bkpb%d", i);
+ RelFileNode rnode;
+ ForkNumber forknum;
+ BlockNumber blk;
+
+ if (!XLogRecHasBlockRef(record, block_id))
+ continue;
+
+ XLogRecGetBlockTag(record, block_id, &rnode, &forknum, &blk);
+ if (forknum != MAIN_FORKNUM)
+ appendStringInfo(buf, "; blkref #%u: rel %u/%u/%u, fork %u, blk %u",
+ block_id,
+ rnode.spcNode, rnode.dbNode, rnode.relNode,
+ forknum,
+ blk);
+ else
+ appendStringInfo(buf, "; blkref #%u: rel %u/%u/%u, blk %u",
+ block_id,
+ rnode.spcNode, rnode.dbNode, rnode.relNode,
+ blk);
+ if (XLogRecHasBlockImage(record, block_id))
+ appendStringInfo(buf, " FPW");
}
}
#endif /* WAL_DEBUG */
@@ -8892,17 +8890,18 @@ xlog_outrec(StringInfo buf, XLogRecord *record)
* optionally followed by a colon, a space, and a further description.
*/
static void
-xlog_outdesc(StringInfo buf, RmgrId rmid, XLogRecord *record)
+xlog_outdesc(StringInfo buf, XLogReaderState *record)
{
+ RmgrId rmid = XLogRecGetRmid(record);
+ uint8 info = XLogRecGetInfo(record);
const char *id;
appendStringInfoString(buf, RmgrTable[rmid].rm_name);
appendStringInfoChar(buf, '/');
- id = RmgrTable[rmid].rm_identify(record->xl_info);
+ id = RmgrTable[rmid].rm_identify(info);
if (id == NULL)
- appendStringInfo(buf, "UNKNOWN (%X): ",
- record->xl_info & ~XLR_INFO_MASK);
+ appendStringInfo(buf, "UNKNOWN (%X): ", info & ~XLR_INFO_MASK);
else
appendStringInfo(buf, "%s: ", id);
@@ -9411,7 +9410,6 @@ do_pg_stop_backup(char *labelfile, bool waitforarchive, TimeLineID *stoptli_p)
XLogRecPtr startpoint;
XLogRecPtr stoppoint;
TimeLineID stoptli;
- XLogRecData rdata;
pg_time_t stamp_time;
char strfbuf[128];
char histfilepath[MAXPGPATH];
@@ -9618,11 +9616,9 @@ do_pg_stop_backup(char *labelfile, bool waitforarchive, TimeLineID *stoptli_p)
/*
* Write the backup-end xlog record
*/
- rdata.data = (char *) (&startpoint);
- rdata.len = sizeof(startpoint);
- rdata.buffer = InvalidBuffer;
- rdata.next = NULL;
- stoppoint = XLogInsert(RM_XLOG_ID, XLOG_BACKUP_END, &rdata);
+ XLogBeginInsert();
+ XLogRegisterData((char *) (&startpoint), sizeof(startpoint));
+ stoppoint = XLogInsert(RM_XLOG_ID, XLOG_BACKUP_END);
stoptli = ThisTimeLineID;
/*
@@ -9930,15 +9926,13 @@ read_backup_label(XLogRecPtr *checkPointLoc, bool *backupEndRequired,
static void
rm_redo_error_callback(void *arg)
{
- XLogRecord *record = (XLogRecord *) arg;
+ XLogReaderState *record = (XLogReaderState *) arg;
StringInfoData buf;
initStringInfo(&buf);
- xlog_outdesc(&buf, record->xl_rmid, record);
+ xlog_outdesc(&buf, record);
- /* don't bother emitting empty description */
- if (buf.len > 0)
- errcontext("xlog redo %s", buf.data);
+ errcontext("xlog redo %s", buf.data);
pfree(buf.data);
}