aboutsummaryrefslogtreecommitdiff
path: root/src/backend/access/transam
diff options
context:
space:
mode:
authorHeikki Linnakangas <heikki.linnakangas@iki.fi>2014-11-20 17:56:26 +0200
committerHeikki Linnakangas <heikki.linnakangas@iki.fi>2014-11-20 18:46:41 +0200
commit2c03216d831160bedd72d45f712601b6f7d03f1c (patch)
treeab6a03d031ffa605d848b0b7067add15e56e2207 /src/backend/access/transam
parent8dc626defec23016dd5988208d8704b858b9d21d (diff)
downloadpostgresql-2c03216d831160bedd72d45f712601b6f7d03f1c.tar.gz
postgresql-2c03216d831160bedd72d45f712601b6f7d03f1c.zip
Revamp the WAL record format.
Each WAL record now carries information about the modified relation and block(s) in a standardized format. That makes it easier to write tools that need that information, like pg_rewind, prefetching the blocks to speed up recovery, etc. There's a whole new API for building WAL records, replacing the XLogRecData chains used previously. The new API consists of XLogRegister* functions, which are called for each buffer and chunk of data that is added to the record. The new API also gives more control over when a full-page image is written, by passing flags to the XLogRegisterBuffer function. This also simplifies the XLogReadBufferForRedo() calls. The function can dig the relation and block number from the WAL record, so they no longer need to be passed as arguments. For the convenience of redo routines, XLogReader now disects each WAL record after reading it, copying the main data part and the per-block data into MAXALIGNed buffers. The data chunks are not aligned within the WAL record, but the redo routines can assume that the pointers returned by XLogRecGet* functions are. Redo routines are now passed the XLogReaderState, which contains the record in the already-disected format, instead of the plain XLogRecord. The new record format also makes the fixed size XLogRecord header smaller, by removing the xl_len field. The length of the "main data" portion is now stored at the end of the WAL record, and there's a separate header after XLogRecord for it. The alignment padding at the end of XLogRecord is also removed. This compansates for the fact that the new format would otherwise be more bulky than the old format. Reviewed by Andres Freund, Amit Kapila, Michael Paquier, Alvaro Herrera, Fujii Masao.
Diffstat (limited to 'src/backend/access/transam')
-rw-r--r--src/backend/access/transam/README249
-rw-r--r--src/backend/access/transam/clog.c25
-rw-r--r--src/backend/access/transam/multixact.c33
-rw-r--r--src/backend/access/transam/twophase.c105
-rw-r--r--src/backend/access/transam/xact.c129
-rw-r--r--src/backend/access/transam/xlog.c348
-rw-r--r--src/backend/access/transam/xloginsert.c972
-rw-r--r--src/backend/access/transam/xlogreader.c486
-rw-r--r--src/backend/access/transam/xlogutils.c237
9 files changed, 1480 insertions, 1104 deletions
diff --git a/src/backend/access/transam/README b/src/backend/access/transam/README
index 92b12fbb6c2..ba6ae05d653 100644
--- a/src/backend/access/transam/README
+++ b/src/backend/access/transam/README
@@ -440,96 +440,164 @@ happen before the WAL record is inserted; see notes in SyncOneBuffer().)
Note that marking a buffer dirty with MarkBufferDirty() should only
happen iff you write a WAL record; see Writing Hints below.
-5. If the relation requires WAL-logging, build a WAL log record and pass it
-to XLogInsert(); then update the page's LSN using the returned XLOG
-location. For instance,
+5. If the relation requires WAL-logging, build a WAL record using
+XLogBeginInsert and XLogRegister* functions, and insert it. (See
+"Constructing a WAL record" below). Then update the page's LSN using the
+returned XLOG location. For instance,
- recptr = XLogInsert(rmgr_id, info, rdata);
+ XLogBeginInsert();
+ XLogRegisterBuffer(...)
+ XLogRegisterData(...)
+ recptr = XLogInsert(rmgr_id, info);
PageSetLSN(dp, recptr);
- // Note that we no longer do PageSetTLI() from 9.3 onwards
- // since that field on a page has now changed its meaning.
6. END_CRIT_SECTION()
7. Unlock and unpin the buffer(s).
-XLogInsert's "rdata" argument is an array of pointer/size items identifying
-chunks of data to be written in the XLOG record, plus optional shared-buffer
-IDs for chunks that are in shared buffers rather than temporary variables.
-The "rdata" array must mention (at least once) each of the shared buffers
-being modified, unless the action is such that the WAL replay routine can
-reconstruct the entire page contents. XLogInsert includes the logic that
-tests to see whether a shared buffer has been modified since the last
-checkpoint. If not, the entire page contents are logged rather than just the
-portion(s) pointed to by "rdata".
-
-Because XLogInsert drops the rdata components associated with buffers it
-chooses to log in full, the WAL replay routines normally need to test to see
-which buffers were handled that way --- otherwise they may be misled about
-what the XLOG record actually contains. XLOG records that describe multi-page
-changes therefore require some care to design: you must be certain that you
-know what data is indicated by each "BKP" bit. An example of the trickiness
-is that in a HEAP_UPDATE record, BKP(0) normally is associated with the source
-page and BKP(1) is associated with the destination page --- but if these are
-the same page, only BKP(0) would have been set.
-
-For this reason as well as the risk of deadlocking on buffer locks, it's best
-to design WAL records so that they reflect small atomic actions involving just
-one or a few pages. The current XLOG infrastructure cannot handle WAL records
-involving references to more than four shared buffers, anyway.
-
-In the case where the WAL record contains enough information to re-generate
-the entire contents of a page, do *not* show that page's buffer ID in the
-rdata array, even if some of the rdata items point into the buffer. This is
-because you don't want XLogInsert to log the whole page contents. The
-standard replay-routine pattern for this case is
-
- buffer = XLogReadBuffer(rnode, blkno, true);
- Assert(BufferIsValid(buffer));
- page = (Page) BufferGetPage(buffer);
-
- ... initialize the page ...
-
- PageSetLSN(page, lsn);
- MarkBufferDirty(buffer);
- UnlockReleaseBuffer(buffer);
-
-In the case where the WAL record provides only enough information to
-incrementally update the page, the rdata array *must* mention the buffer
-ID at least once; otherwise there is no defense against torn-page problems.
-The standard replay-routine pattern for this case is
-
- if (XLogReadBufferForRedo(lsn, record, N, rnode, blkno, &buffer) == BLK_NEEDS_REDO)
- {
- page = (Page) BufferGetPage(buffer);
-
- ... apply the change ...
-
- PageSetLSN(page, lsn);
- MarkBufferDirty(buffer);
- }
- if (BufferIsValid(buffer))
- UnlockReleaseBuffer(buffer);
-
-XLogReadBufferForRedo reads the page from disk, and checks what action needs to
-be taken to the page. If the XLR_BKP_BLOCK(N) flag is set, it restores the
-full page image and returns BLK_RESTORED. If there is no full page image, but
-page cannot be found or if the change has already been replayed (i.e. the
-page's LSN >= the record we're replaying), it returns BLK_NOTFOUND or BLK_DONE,
-respectively. Usually, the redo routine only needs to pay attention to the
-BLK_NEEDS_REDO return code, which means that the routine should apply the
-incremental change. In any case, the caller is responsible for unlocking and
-releasing the buffer. Note that XLogReadBufferForRedo returns the buffer
-locked even if no redo is required, unless the page does not exist.
-
-As noted above, for a multi-page update you need to be able to determine
-which XLR_BKP_BLOCK(N) flag applies to each page. If a WAL record reflects
-a combination of fully-rewritable and incremental updates, then the rewritable
-pages don't count for the XLR_BKP_BLOCK(N) numbering. (XLR_BKP_BLOCK(N) is
-associated with the N'th distinct buffer ID seen in the "rdata" array, and
-per the above discussion, fully-rewritable buffers shouldn't be mentioned in
-"rdata".)
+Complex changes (such as a multilevel index insertion) normally need to be
+described by a series of atomic-action WAL records. The intermediate states
+must be self-consistent, so that if the replay is interrupted between any
+two actions, the system is fully functional. In btree indexes, for example,
+a page split requires a new page to be allocated, and an insertion of a new
+key in the parent btree level, but for locking reasons this has to be
+reflected by two separate WAL records. Replaying the first record, to
+allocate the new page and move tuples to it, sets a flag on the page to
+indicate that the key has not been inserted to the parent yet. Replaying the
+second record clears the flag. This intermediate state is never seen by
+other backends during normal operation, because the lock on the child page
+is held across the two actions, but will be seen if the operation is
+interrupted before writing the second WAL record. The search algorithm works
+with the intermediate state as normal, but if an insertion encounters a page
+with the incomplete-split flag set, it will finish the interrupted split by
+inserting the key to the parent, before proceeding.
+
+
+Constructing a WAL record
+-------------------------
+
+A WAL record consists of a header common to all WAL record types,
+record-specific data, and information about the data blocks modified. Each
+modified data block is identified by an ID number, and can optionally have
+more record-specific data associated with the block. If XLogInsert decides
+that a full-page image of a block needs to be taken, the data associated
+with that block is not included.
+
+The API for constructing a WAL record consists of five functions:
+XLogBeginInsert, XLogRegisterBuffer, XLogRegisterData, XLogRegisterBufData,
+and XLogInsert. First, call XLogBeginInsert(). Then register all the buffers
+modified, and data needed to replay the changes, using XLogRegister*
+functions. Finally, insert the constructed record to the WAL by calling
+XLogInsert().
+
+ XLogBeginInsert();
+
+ /* register buffers modified as part of this WAL-logged action */
+ XLogRegisterBuffer(0, lbuffer, REGBUF_STANDARD);
+ XLogRegisterBuffer(1, rbuffer, REGBUF_STANDARD);
+
+ /* register data that is always included in the WAL record */
+ XLogRegisterData(&xlrec, SizeOfFictionalAction);
+
+ /*
+ * register data associated with a buffer. This will not be included
+ * in the record if a full-page image is taken.
+ */
+ XLogRegisterBufData(0, tuple->data, tuple->len);
+
+ /* more data associated with the buffer */
+ XLogRegisterBufData(0, data2, len2);
+
+ /*
+ * Ok, all the data and buffers to include in the WAL record have
+ * been registered. Insert the record.
+ */
+ recptr = XLogInsert(RM_FOO_ID, XLOG_FOOBAR_DO_STUFF);
+
+Details of the API functions:
+
+void XLogBeginInsert(void)
+
+ Must be called before XLogRegisterBuffer and XLogRegisterData.
+
+void XLogResetInsertion(void)
+
+ Clear any currently registered data and buffers from the WAL record
+ construction workspace. This is only needed if you have already called
+ XLogBeginInsert(), but decide to not insert the record after all.
+
+void XLogEnsureRecordSpace(int max_block_id, int nrdatas)
+
+ Normally, the WAL record construction buffers have the following limits:
+
+ * highest block ID that can be used is 4 (allowing five block references)
+ * Max 20 chunks of registered data
+
+ These default limits are enough for most record types that change some
+ on-disk structures. For the odd case that requires more data, or needs to
+ modify more buffers, these limits can be raised by calling
+ XLogEnsureRecordSpace(). XLogEnsureRecordSpace() must be called before
+ XLogBeginInsert(), and outside a critical section.
+
+void XLogRegisterBuffer(uint8 block_id, Buffer buf, uint8 flags);
+
+ XLogRegisterBuffer adds information about a data block to the WAL record.
+ block_id is an arbitrary number used to identify this page reference in
+ the redo routine. The information needed to re-find the page at redo -
+ relfilenode, fork, and block number - are included in the WAL record.
+
+ XLogInsert will automatically include a full copy of the page contents, if
+ this is the first modification of the buffer since the last checkpoint.
+ It is important to register every buffer modified by the action with
+ XLogRegisterBuffer, to avoid torn-page hazards.
+
+ The flags control when and how the buffer contents are included in the
+ WAL record. Normally, a full-page image is taken only if the page has not
+ been modified since the last checkpoint, and only if full_page_writes=on
+ or an online backup is in progress. The REGBUF_FORCE_IMAGE flag can be
+ used to force a full-page image to always be included; that is useful
+ e.g. for an operation that rewrites most of the page, so that tracking the
+ details is not worth it. For the rare case where it is not necessary to
+ protect from torn pages, REGBUF_NO_IMAGE flag can be used to suppress
+ full page image from being taken. REGBUF_WILL_INIT also suppresses a full
+ page image, but the redo routine must re-generate the page from scratch,
+ without looking at the old page contents. Re-initializing the page
+ protects from torn page hazards like a full page image does.
+
+ The REGBUF_STANDARD flag can be specified together with the other flags to
+ indicate that the page follows the standard page layout. It causes the
+ area between pd_lower and pd_upper to be left out from the image, reducing
+ WAL volume.
+
+ If the REGBUF_KEEP_DATA flag is given, any per-buffer data registered with
+ XLogRegisterBufData() is included in the WAL record even if a full-page
+ image is taken.
+
+void XLogRegisterData(char *data, int len);
+
+ XLogRegisterData is used to include arbitrary data in the WAL record. If
+ XLogRegisterData() is called multiple times, the data are appended, and
+ will be made available to the redo routine as one contiguous chunk.
+
+void XLogRegisterBufData(uint8 block_id, char *data, int len);
+
+ XLogRegisterBufData is used to include data associated with a particular
+ buffer that was registered earlier with XLogRegisterBuffer(). If
+ XLogRegisterBufData() is called multiple times with the same block ID, the
+ data are appended, and will be made available to the redo routine as one
+ contiguous chunk.
+
+ If a full-page image of the buffer is taken at insertion, the data is not
+ included in the WAL record, unless the REGBUF_KEEP_DATA flag is used.
+
+
+Writing a REDO routine
+----------------------
+
+A REDO routine uses the data and page references included in the WAL record
+to reconstruct the new state of the page. The record decoding functions
+and macros in xlogreader.c/h can be used to extract the data from the record.
When replaying a WAL record that describes changes on multiple pages, you
must be careful to lock the pages properly to prevent concurrent Hot Standby
@@ -545,23 +613,6 @@ either an exclusive buffer lock or a shared lock plus buffer header lock,
or be writing the data block directly rather than through shared buffers
while holding AccessExclusiveLock on the relation.
-Due to all these constraints, complex changes (such as a multilevel index
-insertion) normally need to be described by a series of atomic-action WAL
-records. The intermediate states must be self-consistent, so that if the
-replay is interrupted between any two actions, the system is fully
-functional. In btree indexes, for example, a page split requires a new page
-to be allocated, and an insertion of a new key in the parent btree level,
-but for locking reasons this has to be reflected by two separate WAL
-records. Replaying the first record, to allocate the new page and move
-tuples to it, sets a flag on the page to indicate that the key has not been
-inserted to the parent yet. Replaying the second record clears the flag.
-This intermediate state is never seen by other backends during normal
-operation, because the lock on the child page is held across the two
-actions, but will be seen if the operation is interrupted before writing
-the second WAL record. The search algorithm works with the intermediate
-state as normal, but if an insertion encounters a page with the
-incomplete-split flag set, it will finish the interrupted split by
-inserting the key to the parent, before proceeding.
Writing Hints
-------------
diff --git a/src/backend/access/transam/clog.c b/src/backend/access/transam/clog.c
index 5ee070bd0a9..313bd042404 100644
--- a/src/backend/access/transam/clog.c
+++ b/src/backend/access/transam/clog.c
@@ -699,13 +699,9 @@ CLOGPagePrecedes(int page1, int page2)
static void
WriteZeroPageXlogRec(int pageno)
{
- XLogRecData rdata;
-
- rdata.data = (char *) (&pageno);
- rdata.len = sizeof(int);
- rdata.buffer = InvalidBuffer;
- rdata.next = NULL;
- (void) XLogInsert(RM_CLOG_ID, CLOG_ZEROPAGE, &rdata);
+ XLogBeginInsert();
+ XLogRegisterData((char *) (&pageno), sizeof(int));
+ (void) XLogInsert(RM_CLOG_ID, CLOG_ZEROPAGE);
}
/*
@@ -717,14 +713,11 @@ WriteZeroPageXlogRec(int pageno)
static void
WriteTruncateXlogRec(int pageno)
{
- XLogRecData rdata;
XLogRecPtr recptr;
- rdata.data = (char *) (&pageno);
- rdata.len = sizeof(int);
- rdata.buffer = InvalidBuffer;
- rdata.next = NULL;
- recptr = XLogInsert(RM_CLOG_ID, CLOG_TRUNCATE, &rdata);
+ XLogBeginInsert();
+ XLogRegisterData((char *) (&pageno), sizeof(int));
+ recptr = XLogInsert(RM_CLOG_ID, CLOG_TRUNCATE);
XLogFlush(recptr);
}
@@ -732,12 +725,12 @@ WriteTruncateXlogRec(int pageno)
* CLOG resource manager's routines
*/
void
-clog_redo(XLogRecPtr lsn, XLogRecord *record)
+clog_redo(XLogReaderState *record)
{
- uint8 info = record->xl_info & ~XLR_INFO_MASK;
+ uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
/* Backup blocks are not used in clog records */
- Assert(!(record->xl_info & XLR_BKP_BLOCK_MASK));
+ Assert(!XLogRecHasAnyBlockRefs(record));
if (info == CLOG_ZEROPAGE)
{
diff --git a/src/backend/access/transam/multixact.c b/src/backend/access/transam/multixact.c
index 3c20bb37e4c..fff9f837330 100644
--- a/src/backend/access/transam/multixact.c
+++ b/src/backend/access/transam/multixact.c
@@ -720,7 +720,6 @@ MultiXactIdCreateFromMembers(int nmembers, MultiXactMember *members)
{
MultiXactId multi;
MultiXactOffset offset;
- XLogRecData rdata[2];
xl_multixact_create xlrec;
debug_elog3(DEBUG2, "Create: %s",
@@ -796,17 +795,11 @@ MultiXactIdCreateFromMembers(int nmembers, MultiXactMember *members)
* the status flags in one XLogRecData, then all the xids in another one?
* Not clear that it's worth the trouble though.
*/
- rdata[0].data = (char *) (&xlrec);
- rdata[0].len = SizeOfMultiXactCreate;
- rdata[0].buffer = InvalidBuffer;
- rdata[0].next = &(rdata[1]);
+ XLogBeginInsert();
+ XLogRegisterData((char *) (&xlrec), SizeOfMultiXactCreate);
+ XLogRegisterData((char *) members, nmembers * sizeof(MultiXactMember));
- rdata[1].data = (char *) members;
- rdata[1].len = nmembers * sizeof(MultiXactMember);
- rdata[1].buffer = InvalidBuffer;
- rdata[1].next = NULL;
-
- (void) XLogInsert(RM_MULTIXACT_ID, XLOG_MULTIXACT_CREATE_ID, rdata);
+ (void) XLogInsert(RM_MULTIXACT_ID, XLOG_MULTIXACT_CREATE_ID);
/* Now enter the information into the OFFSETs and MEMBERs logs */
RecordNewMultiXact(multi, offset, nmembers, members);
@@ -2705,25 +2698,21 @@ MultiXactOffsetPrecedes(MultiXactOffset offset1, MultiXactOffset offset2)
static void
WriteMZeroPageXlogRec(int pageno, uint8 info)
{
- XLogRecData rdata;
-
- rdata.data = (char *) (&pageno);
- rdata.len = sizeof(int);
- rdata.buffer = InvalidBuffer;
- rdata.next = NULL;
- (void) XLogInsert(RM_MULTIXACT_ID, info, &rdata);
+ XLogBeginInsert();
+ XLogRegisterData((char *) (&pageno), sizeof(int));
+ (void) XLogInsert(RM_MULTIXACT_ID, info);
}
/*
* MULTIXACT resource manager's routines
*/
void
-multixact_redo(XLogRecPtr lsn, XLogRecord *record)
+multixact_redo(XLogReaderState *record)
{
- uint8 info = record->xl_info & ~XLR_INFO_MASK;
+ uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
/* Backup blocks are not used in multixact records */
- Assert(!(record->xl_info & XLR_BKP_BLOCK_MASK));
+ Assert(!XLogRecHasAnyBlockRefs(record));
if (info == XLOG_MULTIXACT_ZERO_OFF_PAGE)
{
@@ -2775,7 +2764,7 @@ multixact_redo(XLogRecPtr lsn, XLogRecord *record)
* should be unnecessary, since any XID found here ought to have other
* evidence in the XLOG, but let's be safe.
*/
- max_xid = record->xl_xid;
+ max_xid = XLogRecGetXid(record);
for (i = 0; i < xlrec->nmembers; i++)
{
if (TransactionIdPrecedes(max_xid, xlrec->members[i].xid))
diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c
index d23c292edcd..40de84e934e 100644
--- a/src/backend/access/transam/twophase.c
+++ b/src/backend/access/transam/twophase.c
@@ -889,14 +889,21 @@ typedef struct TwoPhaseRecordOnDisk
/*
* During prepare, the state file is assembled in memory before writing it
- * to WAL and the actual state file. We use a chain of XLogRecData blocks
- * so that we will be able to pass the state file contents directly to
- * XLogInsert.
+ * to WAL and the actual state file. We use a chain of StateFileChunk blocks
+ * for that.
*/
+typedef struct StateFileChunk
+{
+ char *data;
+ uint32 len;
+ struct StateFileChunk *next;
+} StateFileChunk;
+
static struct xllist
{
- XLogRecData *head; /* first data block in the chain */
- XLogRecData *tail; /* last block in chain */
+ StateFileChunk *head; /* first data block in the chain */
+ StateFileChunk *tail; /* last block in chain */
+ uint32 num_chunks;
uint32 bytes_free; /* free bytes left in tail block */
uint32 total_len; /* total data bytes in chain */
} records;
@@ -917,11 +924,11 @@ save_state_data(const void *data, uint32 len)
if (padlen > records.bytes_free)
{
- records.tail->next = palloc0(sizeof(XLogRecData));
+ records.tail->next = palloc0(sizeof(StateFileChunk));
records.tail = records.tail->next;
- records.tail->buffer = InvalidBuffer;
records.tail->len = 0;
records.tail->next = NULL;
+ records.num_chunks++;
records.bytes_free = Max(padlen, 512);
records.tail->data = palloc(records.bytes_free);
@@ -951,8 +958,7 @@ StartPrepare(GlobalTransaction gxact)
SharedInvalidationMessage *invalmsgs;
/* Initialize linked list */
- records.head = palloc0(sizeof(XLogRecData));
- records.head->buffer = InvalidBuffer;
+ records.head = palloc0(sizeof(StateFileChunk));
records.head->len = 0;
records.head->next = NULL;
@@ -960,6 +966,7 @@ StartPrepare(GlobalTransaction gxact)
records.head->data = palloc(records.bytes_free);
records.tail = records.head;
+ records.num_chunks = 1;
records.total_len = 0;
@@ -1019,7 +1026,7 @@ EndPrepare(GlobalTransaction gxact)
TransactionId xid = pgxact->xid;
TwoPhaseFileHeader *hdr;
char path[MAXPGPATH];
- XLogRecData *record;
+ StateFileChunk *record;
pg_crc32 statefile_crc;
pg_crc32 bogus_crc;
int fd;
@@ -1117,12 +1124,16 @@ EndPrepare(GlobalTransaction gxact)
* We save the PREPARE record's location in the gxact for later use by
* CheckPointTwoPhase.
*/
+ XLogEnsureRecordSpace(0, records.num_chunks);
+
START_CRIT_SECTION();
MyPgXact->delayChkpt = true;
- gxact->prepare_lsn = XLogInsert(RM_XACT_ID, XLOG_XACT_PREPARE,
- records.head);
+ XLogBeginInsert();
+ for (record = records.head; record != NULL; record = record->next)
+ XLogRegisterData(record->data, record->len);
+ gxact->prepare_lsn = XLogInsert(RM_XACT_ID, XLOG_XACT_PREPARE);
XLogFlush(gxact->prepare_lsn);
/* If we crash now, we have prepared: WAL replay will fix things */
@@ -1180,6 +1191,7 @@ EndPrepare(GlobalTransaction gxact)
SyncRepWaitForLSN(gxact->prepare_lsn);
records.tail = records.head = NULL;
+ records.num_chunks = 0;
}
/*
@@ -2071,8 +2083,6 @@ RecordTransactionCommitPrepared(TransactionId xid,
SharedInvalidationMessage *invalmsgs,
bool initfileinval)
{
- XLogRecData rdata[4];
- int lastrdata = 0;
xl_xact_commit_prepared xlrec;
XLogRecPtr recptr;
@@ -2094,39 +2104,24 @@ RecordTransactionCommitPrepared(TransactionId xid,
xlrec.crec.nsubxacts = nchildren;
xlrec.crec.nmsgs = ninvalmsgs;
- rdata[0].data = (char *) (&xlrec);
- rdata[0].len = MinSizeOfXactCommitPrepared;
- rdata[0].buffer = InvalidBuffer;
+ XLogBeginInsert();
+ XLogRegisterData((char *) (&xlrec), MinSizeOfXactCommitPrepared);
+
/* dump rels to delete */
if (nrels > 0)
- {
- rdata[0].next = &(rdata[1]);
- rdata[1].data = (char *) rels;
- rdata[1].len = nrels * sizeof(RelFileNode);
- rdata[1].buffer = InvalidBuffer;
- lastrdata = 1;
- }
+ XLogRegisterData((char *) rels, nrels * sizeof(RelFileNode));
+
/* dump committed child Xids */
if (nchildren > 0)
- {
- rdata[lastrdata].next = &(rdata[2]);
- rdata[2].data = (char *) children;
- rdata[2].len = nchildren * sizeof(TransactionId);
- rdata[2].buffer = InvalidBuffer;
- lastrdata = 2;
- }
+ XLogRegisterData((char *) children,
+ nchildren * sizeof(TransactionId));
+
/* dump cache invalidation messages */
if (ninvalmsgs > 0)
- {
- rdata[lastrdata].next = &(rdata[3]);
- rdata[3].data = (char *) invalmsgs;
- rdata[3].len = ninvalmsgs * sizeof(SharedInvalidationMessage);
- rdata[3].buffer = InvalidBuffer;
- lastrdata = 3;
- }
- rdata[lastrdata].next = NULL;
+ XLogRegisterData((char *) invalmsgs,
+ ninvalmsgs * sizeof(SharedInvalidationMessage));
- recptr = XLogInsert(RM_XACT_ID, XLOG_XACT_COMMIT_PREPARED, rdata);
+ recptr = XLogInsert(RM_XACT_ID, XLOG_XACT_COMMIT_PREPARED);
/*
* We don't currently try to sleep before flush here ... nor is there any
@@ -2169,8 +2164,6 @@ RecordTransactionAbortPrepared(TransactionId xid,
int nrels,
RelFileNode *rels)
{
- XLogRecData rdata[3];
- int lastrdata = 0;
xl_xact_abort_prepared xlrec;
XLogRecPtr recptr;
@@ -2189,30 +2182,20 @@ RecordTransactionAbortPrepared(TransactionId xid,
xlrec.arec.xact_time = GetCurrentTimestamp();
xlrec.arec.nrels = nrels;
xlrec.arec.nsubxacts = nchildren;
- rdata[0].data = (char *) (&xlrec);
- rdata[0].len = MinSizeOfXactAbortPrepared;
- rdata[0].buffer = InvalidBuffer;
+
+ XLogBeginInsert();
+ XLogRegisterData((char *) (&xlrec), MinSizeOfXactAbortPrepared);
+
/* dump rels to delete */
if (nrels > 0)
- {
- rdata[0].next = &(rdata[1]);
- rdata[1].data = (char *) rels;
- rdata[1].len = nrels * sizeof(RelFileNode);
- rdata[1].buffer = InvalidBuffer;
- lastrdata = 1;
- }
+ XLogRegisterData((char *) rels, nrels * sizeof(RelFileNode));
+
/* dump committed child Xids */
if (nchildren > 0)
- {
- rdata[lastrdata].next = &(rdata[2]);
- rdata[2].data = (char *) children;
- rdata[2].len = nchildren * sizeof(TransactionId);
- rdata[2].buffer = InvalidBuffer;
- lastrdata = 2;
- }
- rdata[lastrdata].next = NULL;
+ XLogRegisterData((char *) children,
+ nchildren * sizeof(TransactionId));
- recptr = XLogInsert(RM_XACT_ID, XLOG_XACT_ABORT_PREPARED, rdata);
+ recptr = XLogInsert(RM_XACT_ID, XLOG_XACT_ABORT_PREPARED);
/* Always flush, since we're about to remove the 2PC state file */
XLogFlush(recptr);
diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index 6f92bad07ca..763e9deb6f5 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -571,7 +571,6 @@ AssignTransactionId(TransactionState s)
if (nUnreportedXids >= PGPROC_MAX_CACHED_SUBXIDS ||
log_unknown_top)
{
- XLogRecData rdata[2];
xl_xact_assignment xlrec;
/*
@@ -582,17 +581,12 @@ AssignTransactionId(TransactionState s)
Assert(TransactionIdIsValid(xlrec.xtop));
xlrec.nsubxacts = nUnreportedXids;
- rdata[0].data = (char *) &xlrec;
- rdata[0].len = MinSizeOfXactAssignment;
- rdata[0].buffer = InvalidBuffer;
- rdata[0].next = &rdata[1];
+ XLogBeginInsert();
+ XLogRegisterData((char *) &xlrec, MinSizeOfXactAssignment);
+ XLogRegisterData((char *) unreportedXids,
+ nUnreportedXids * sizeof(TransactionId));
- rdata[1].data = (char *) unreportedXids;
- rdata[1].len = nUnreportedXids * sizeof(TransactionId);
- rdata[1].buffer = InvalidBuffer;
- rdata[1].next = NULL;
-
- (void) XLogInsert(RM_XACT_ID, XLOG_XACT_ASSIGNMENT, rdata);
+ (void) XLogInsert(RM_XACT_ID, XLOG_XACT_ASSIGNMENT);
nUnreportedXids = 0;
/* mark top, not current xact as having been logged */
@@ -1087,8 +1081,6 @@ RecordTransactionCommit(void)
if (nrels > 0 || nmsgs > 0 || RelcacheInitFileInval || forceSyncCommit ||
XLogLogicalInfoActive())
{
- XLogRecData rdata[4];
- int lastrdata = 0;
xl_xact_commit xlrec;
/*
@@ -1107,63 +1099,38 @@ RecordTransactionCommit(void)
xlrec.nrels = nrels;
xlrec.nsubxacts = nchildren;
xlrec.nmsgs = nmsgs;
- rdata[0].data = (char *) (&xlrec);
- rdata[0].len = MinSizeOfXactCommit;
- rdata[0].buffer = InvalidBuffer;
+
+ XLogBeginInsert();
+ XLogRegisterData((char *) (&xlrec), MinSizeOfXactCommit);
/* dump rels to delete */
if (nrels > 0)
- {
- rdata[0].next = &(rdata[1]);
- rdata[1].data = (char *) rels;
- rdata[1].len = nrels * sizeof(RelFileNode);
- rdata[1].buffer = InvalidBuffer;
- lastrdata = 1;
- }
+ XLogRegisterData((char *) rels,
+ nrels * sizeof(RelFileNode));
/* dump committed child Xids */
if (nchildren > 0)
- {
- rdata[lastrdata].next = &(rdata[2]);
- rdata[2].data = (char *) children;
- rdata[2].len = nchildren * sizeof(TransactionId);
- rdata[2].buffer = InvalidBuffer;
- lastrdata = 2;
- }
+ XLogRegisterData((char *) children,
+ nchildren * sizeof(TransactionId));
/* dump shared cache invalidation messages */
if (nmsgs > 0)
- {
- rdata[lastrdata].next = &(rdata[3]);
- rdata[3].data = (char *) invalMessages;
- rdata[3].len = nmsgs * sizeof(SharedInvalidationMessage);
- rdata[3].buffer = InvalidBuffer;
- lastrdata = 3;
- }
- rdata[lastrdata].next = NULL;
-
- (void) XLogInsert(RM_XACT_ID, XLOG_XACT_COMMIT, rdata);
+ XLogRegisterData((char *) invalMessages,
+ nmsgs * sizeof(SharedInvalidationMessage));
+ (void) XLogInsert(RM_XACT_ID, XLOG_XACT_COMMIT);
}
else
{
- XLogRecData rdata[2];
- int lastrdata = 0;
xl_xact_commit_compact xlrec;
xlrec.xact_time = xactStopTimestamp;
xlrec.nsubxacts = nchildren;
- rdata[0].data = (char *) (&xlrec);
- rdata[0].len = MinSizeOfXactCommitCompact;
- rdata[0].buffer = InvalidBuffer;
+
+ XLogBeginInsert();
+ XLogRegisterData((char *) (&xlrec), MinSizeOfXactCommitCompact);
/* dump committed child Xids */
if (nchildren > 0)
- {
- rdata[0].next = &(rdata[1]);
- rdata[1].data = (char *) children;
- rdata[1].len = nchildren * sizeof(TransactionId);
- rdata[1].buffer = InvalidBuffer;
- lastrdata = 1;
- }
- rdata[lastrdata].next = NULL;
+ XLogRegisterData((char *) children,
+ nchildren * sizeof(TransactionId));
- (void) XLogInsert(RM_XACT_ID, XLOG_XACT_COMMIT_COMPACT, rdata);
+ (void) XLogInsert(RM_XACT_ID, XLOG_XACT_COMMIT_COMPACT);
}
}
@@ -1436,8 +1403,6 @@ RecordTransactionAbort(bool isSubXact)
RelFileNode *rels;
int nchildren;
TransactionId *children;
- XLogRecData rdata[3];
- int lastrdata = 0;
xl_xact_abort xlrec;
/*
@@ -1486,30 +1451,20 @@ RecordTransactionAbort(bool isSubXact)
}
xlrec.nrels = nrels;
xlrec.nsubxacts = nchildren;
- rdata[0].data = (char *) (&xlrec);
- rdata[0].len = MinSizeOfXactAbort;
- rdata[0].buffer = InvalidBuffer;
+
+ XLogBeginInsert();
+ XLogRegisterData((char *) (&xlrec), MinSizeOfXactAbort);
+
/* dump rels to delete */
if (nrels > 0)
- {
- rdata[0].next = &(rdata[1]);
- rdata[1].data = (char *) rels;
- rdata[1].len = nrels * sizeof(RelFileNode);
- rdata[1].buffer = InvalidBuffer;
- lastrdata = 1;
- }
+ XLogRegisterData((char *) rels, nrels * sizeof(RelFileNode));
+
/* dump committed child Xids */
if (nchildren > 0)
- {
- rdata[lastrdata].next = &(rdata[2]);
- rdata[2].data = (char *) children;
- rdata[2].len = nchildren * sizeof(TransactionId);
- rdata[2].buffer = InvalidBuffer;
- lastrdata = 2;
- }
- rdata[lastrdata].next = NULL;
+ XLogRegisterData((char *) children,
+ nchildren * sizeof(TransactionId));
- (void) XLogInsert(RM_XACT_ID, XLOG_XACT_ABORT, rdata);
+ (void) XLogInsert(RM_XACT_ID, XLOG_XACT_ABORT);
/*
* Report the latest async abort LSN, so that the WAL writer knows to
@@ -2351,6 +2306,9 @@ AbortTransaction(void)
AbortBufferIO();
UnlockBuffers();
+ /* Reset WAL record construction state */
+ XLogResetInsertion();
+
/*
* Also clean up any open wait for lock, since the lock manager will choke
* if we try to wait for another lock before doing this.
@@ -4299,6 +4257,9 @@ AbortSubTransaction(void)
AbortBufferIO();
UnlockBuffers();
+ /* Reset WAL record construction state */
+ XLogResetInsertion();
+
/*
* Also clean up any open wait for lock, since the lock manager will choke
* if we try to wait for another lock before doing this.
@@ -4938,42 +4899,42 @@ xact_redo_abort(xl_xact_abort *xlrec, TransactionId xid)
}
void
-xact_redo(XLogRecPtr lsn, XLogRecord *record)
+xact_redo(XLogReaderState *record)
{
- uint8 info = record->xl_info & ~XLR_INFO_MASK;
+ uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
/* Backup blocks are not used in xact records */
- Assert(!(record->xl_info & XLR_BKP_BLOCK_MASK));
+ Assert(!XLogRecHasAnyBlockRefs(record));
if (info == XLOG_XACT_COMMIT_COMPACT)
{
xl_xact_commit_compact *xlrec = (xl_xact_commit_compact *) XLogRecGetData(record);
- xact_redo_commit_compact(xlrec, record->xl_xid, lsn);
+ xact_redo_commit_compact(xlrec, XLogRecGetXid(record), record->EndRecPtr);
}
else if (info == XLOG_XACT_COMMIT)
{
xl_xact_commit *xlrec = (xl_xact_commit *) XLogRecGetData(record);
- xact_redo_commit(xlrec, record->xl_xid, lsn);
+ xact_redo_commit(xlrec, XLogRecGetXid(record), record->EndRecPtr);
}
else if (info == XLOG_XACT_ABORT)
{
xl_xact_abort *xlrec = (xl_xact_abort *) XLogRecGetData(record);
- xact_redo_abort(xlrec, record->xl_xid);
+ xact_redo_abort(xlrec, XLogRecGetXid(record));
}
else if (info == XLOG_XACT_PREPARE)
{
/* the record contents are exactly the 2PC file */
- RecreateTwoPhaseFile(record->xl_xid,
- XLogRecGetData(record), record->xl_len);
+ RecreateTwoPhaseFile(XLogRecGetXid(record),
+ XLogRecGetData(record), XLogRecGetDataLen(record));
}
else if (info == XLOG_XACT_COMMIT_PREPARED)
{
xl_xact_commit_prepared *xlrec = (xl_xact_commit_prepared *) XLogRecGetData(record);
- xact_redo_commit(&xlrec->crec, xlrec->xid, lsn);
+ xact_redo_commit(&xlrec->crec, xlrec->xid, record->EndRecPtr);
RemoveTwoPhaseFile(xlrec->xid, false);
}
else if (info == XLOG_XACT_ABORT_PREPARED)
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 60531277dc6..2059bbeda4a 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -757,10 +757,10 @@ static MemoryContext walDebugCxt = NULL;
static void readRecoveryCommandFile(void);
static void exitArchiveRecovery(TimeLineID endTLI, XLogSegNo endLogSegNo);
-static bool recoveryStopsBefore(XLogRecord *record);
-static bool recoveryStopsAfter(XLogRecord *record);
+static bool recoveryStopsBefore(XLogReaderState *record);
+static bool recoveryStopsAfter(XLogReaderState *record);
static void recoveryPausesHere(void);
-static bool recoveryApplyDelay(XLogRecord *record);
+static bool recoveryApplyDelay(XLogReaderState *record);
static void SetLatestXTime(TimestampTz xtime);
static void SetCurrentChunkStartTime(TimestampTz xtime);
static void CheckRequiredParameterValues(void);
@@ -807,9 +807,9 @@ static char *str_time(pg_time_t tnow);
static bool CheckForStandbyTrigger(void);
#ifdef WAL_DEBUG
-static void xlog_outrec(StringInfo buf, XLogRecord *record);
+static void xlog_outrec(StringInfo buf, XLogReaderState *record);
#endif
-static void xlog_outdesc(StringInfo buf, RmgrId rmid, XLogRecord *record);
+static void xlog_outdesc(StringInfo buf, XLogReaderState *record);
static void pg_start_backup_callback(int code, Datum arg);
static bool read_backup_label(XLogRecPtr *checkPointLoc,
bool *backupEndRequired, bool *backupFromStandby);
@@ -861,7 +861,6 @@ XLogRecPtr
XLogInsertRecord(XLogRecData *rdata, XLogRecPtr fpw_lsn)
{
XLogCtlInsert *Insert = &XLogCtl->Insert;
- XLogRecData *rdt;
pg_crc32 rdata_crc;
bool inserted;
XLogRecord *rechdr = (XLogRecord *) rdata->data;
@@ -870,28 +869,13 @@ XLogInsertRecord(XLogRecData *rdata, XLogRecPtr fpw_lsn)
XLogRecPtr StartPos;
XLogRecPtr EndPos;
+ /* we assume that all of the record header is in the first chunk */
+ Assert(rdata->len >= SizeOfXLogRecord);
+
/* cross-check on whether we should be here or not */
if (!XLogInsertAllowed())
elog(ERROR, "cannot make new WAL entries during recovery");
- /*
- * Calculate CRC of the data, including all the backup blocks
- *
- * Note that the record header isn't added into the CRC initially since we
- * don't know the prev-link yet. Thus, the CRC will represent the CRC of
- * the whole record in the order: rdata, then backup blocks, then record
- * header.
- */
- INIT_CRC32C(rdata_crc);
- for (rdt = rdata->next; rdt != NULL; rdt = rdt->next)
- COMP_CRC32C(rdata_crc, rdt->data, rdt->len);
-
- /*
- * Calculate CRC of the header, except for prev-link, because we don't
- * know it yet. It will be added later.
- */
- COMP_CRC32C(rdata_crc, ((char *) rechdr), offsetof(XLogRecord, xl_prev));
-
/*----------
*
* We have now done all the preparatory work we can without holding a
@@ -976,10 +960,11 @@ XLogInsertRecord(XLogRecData *rdata, XLogRecPtr fpw_lsn)
if (inserted)
{
/*
- * Now that xl_prev has been filled in, finish CRC calculation of the
- * record header.
+ * Now that xl_prev has been filled in, calculate CRC of the record
+ * header.
*/
- COMP_CRC32C(rdata_crc, ((char *) &rechdr->xl_prev), sizeof(XLogRecPtr));
+ rdata_crc = rechdr->xl_crc;
+ COMP_CRC32C(rdata_crc, rechdr, offsetof(XLogRecord, xl_crc));
FIN_CRC32C(rdata_crc);
rechdr->xl_crc = rdata_crc;
@@ -1053,34 +1038,47 @@ XLogInsertRecord(XLogRecData *rdata, XLogRecPtr fpw_lsn)
#ifdef WAL_DEBUG
if (XLOG_DEBUG)
{
+ static XLogReaderState *debug_reader = NULL;
StringInfoData buf;
- MemoryContext oldCxt = MemoryContextSwitchTo(walDebugCxt);
+ StringInfoData recordBuf;
+ char *errormsg = NULL;
+ MemoryContext oldCxt;
+
+ oldCxt = MemoryContextSwitchTo(walDebugCxt);
initStringInfo(&buf);
appendStringInfo(&buf, "INSERT @ %X/%X: ",
(uint32) (EndPos >> 32), (uint32) EndPos);
- xlog_outrec(&buf, rechdr);
- if (rdata->data != NULL)
- {
- StringInfoData recordbuf;
- /*
- * We have to piece together the WAL record data from the
- * XLogRecData entries, so that we can pass it to the rm_desc
- * function as one contiguous chunk.
- */
- initStringInfo(&recordbuf);
- appendBinaryStringInfo(&recordbuf, (char *) rechdr, sizeof(XLogRecord));
- for (; rdata != NULL; rdata = rdata->next)
- appendBinaryStringInfo(&recordbuf, rdata->data, rdata->len);
+ /*
+ * We have to piece together the WAL record data from the XLogRecData
+ * entries, so that we can pass it to the rm_desc function as one
+ * contiguous chunk.
+ */
+ initStringInfo(&recordBuf);
+ for (; rdata != NULL; rdata = rdata->next)
+ appendBinaryStringInfo(&recordBuf, rdata->data, rdata->len);
+
+ if (!debug_reader)
+ debug_reader = XLogReaderAllocate(NULL, NULL);
+ if (!debug_reader ||
+ !DecodeXLogRecord(debug_reader, (XLogRecord *) recordBuf.data,
+ &errormsg))
+ {
+ appendStringInfo(&buf, "error decoding record: %s",
+ errormsg ? errormsg : "no error message");
+ }
+ else
+ {
appendStringInfoString(&buf, " - ");
- xlog_outdesc(&buf, rechdr->xl_rmid, (XLogRecord *) recordbuf.data);
+ xlog_outdesc(&buf, debug_reader);
}
elog(LOG, "%s", buf.data);
+ pfree(buf.data);
+ pfree(recordBuf.data);
MemoryContextSwitchTo(oldCxt);
- MemoryContextReset(walDebugCxt);
}
#endif
@@ -1170,7 +1168,7 @@ ReserveXLogSwitch(XLogRecPtr *StartPos, XLogRecPtr *EndPos, XLogRecPtr *PrevPtr)
uint64 startbytepos;
uint64 endbytepos;
uint64 prevbytepos;
- uint32 size = SizeOfXLogRecord;
+ uint32 size = MAXALIGN(SizeOfXLogRecord);
XLogRecPtr ptr;
uint32 segleft;
@@ -1234,9 +1232,6 @@ CopyXLogRecordToWAL(int write_len, bool isLogSwitch, XLogRecData *rdata,
XLogRecPtr CurrPos;
XLogPageHeader pagehdr;
- /* The first chunk is the record header */
- Assert(rdata->len == SizeOfXLogRecord);
-
/*
* Get a pointer to the right place in the right WAL buffer to start
* inserting to.
@@ -1309,9 +1304,6 @@ CopyXLogRecordToWAL(int write_len, bool isLogSwitch, XLogRecData *rdata,
}
Assert(written == write_len);
- /* Align the end position, so that the next record starts aligned */
- CurrPos = MAXALIGN64(CurrPos);
-
/*
* If this was an xlog-switch, it's not enough to write the switch record,
* we also have to consume all the remaining space in the WAL segment. We
@@ -1341,6 +1333,11 @@ CopyXLogRecordToWAL(int write_len, bool isLogSwitch, XLogRecData *rdata,
CurrPos += XLOG_BLCKSZ;
}
}
+ else
+ {
+ /* Align the end position, so that the next record starts aligned */
+ CurrPos = MAXALIGN64(CurrPos);
+ }
if (CurrPos != EndPos)
elog(PANIC, "space reserved for WAL record does not match what was written");
@@ -4470,6 +4467,7 @@ BootStrapXLOG(void)
XLogPageHeader page;
XLogLongPageHeader longpage;
XLogRecord *record;
+ char *recptr;
bool use_existent;
uint64 sysidentifier;
struct timeval tv;
@@ -4541,17 +4539,23 @@ BootStrapXLOG(void)
longpage->xlp_xlog_blcksz = XLOG_BLCKSZ;
/* Insert the initial checkpoint record */
- record = (XLogRecord *) ((char *) page + SizeOfXLogLongPHD);
+ recptr = ((char *) page + SizeOfXLogLongPHD);
+ record = (XLogRecord *) recptr;
record->xl_prev = 0;
record->xl_xid = InvalidTransactionId;
- record->xl_tot_len = SizeOfXLogRecord + sizeof(checkPoint);
- record->xl_len = sizeof(checkPoint);
+ record->xl_tot_len = SizeOfXLogRecord + SizeOfXLogRecordDataHeaderShort + sizeof(checkPoint);
record->xl_info = XLOG_CHECKPOINT_SHUTDOWN;
record->xl_rmid = RM_XLOG_ID;
- memcpy(XLogRecGetData(record), &checkPoint, sizeof(checkPoint));
+ recptr += SizeOfXLogRecord;
+ /* fill the XLogRecordDataHeaderShort struct */
+ *(recptr++) = XLR_BLOCK_ID_DATA_SHORT;
+ *(recptr++) = sizeof(checkPoint);
+ memcpy(recptr, &checkPoint, sizeof(checkPoint));
+ recptr += sizeof(checkPoint);
+ Assert(recptr - (char *) record == record->xl_tot_len);
INIT_CRC32C(crc);
- COMP_CRC32C(crc, &checkPoint, sizeof(checkPoint));
+ COMP_CRC32C(crc, ((char *) record) + SizeOfXLogRecord, record->xl_tot_len - SizeOfXLogRecord);
COMP_CRC32C(crc, (char *) record, offsetof(XLogRecord, xl_crc));
FIN_CRC32C(crc);
record->xl_crc = crc;
@@ -4984,36 +4988,37 @@ exitArchiveRecovery(TimeLineID endTLI, XLogSegNo endLogSegNo)
* timestamps.
*/
static bool
-getRecordTimestamp(XLogRecord *record, TimestampTz *recordXtime)
+getRecordTimestamp(XLogReaderState *record, TimestampTz *recordXtime)
{
- uint8 record_info = record->xl_info & ~XLR_INFO_MASK;
+ uint8 record_info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
+ uint8 rmid = XLogRecGetRmid(record);
- if (record->xl_rmid == RM_XLOG_ID && record_info == XLOG_RESTORE_POINT)
+ if (rmid == RM_XLOG_ID && record_info == XLOG_RESTORE_POINT)
{
*recordXtime = ((xl_restore_point *) XLogRecGetData(record))->rp_time;
return true;
}
- if (record->xl_rmid == RM_XACT_ID && record_info == XLOG_XACT_COMMIT_COMPACT)
+ if (rmid == RM_XACT_ID && record_info == XLOG_XACT_COMMIT_COMPACT)
{
*recordXtime = ((xl_xact_commit_compact *) XLogRecGetData(record))->xact_time;
return true;
}
- if (record->xl_rmid == RM_XACT_ID && record_info == XLOG_XACT_COMMIT)
+ if (rmid == RM_XACT_ID && record_info == XLOG_XACT_COMMIT)
{
*recordXtime = ((xl_xact_commit *) XLogRecGetData(record))->xact_time;
return true;
}
- if (record->xl_rmid == RM_XACT_ID && record_info == XLOG_XACT_COMMIT_PREPARED)
+ if (rmid == RM_XACT_ID && record_info == XLOG_XACT_COMMIT_PREPARED)
{
*recordXtime = ((xl_xact_commit_prepared *) XLogRecGetData(record))->crec.xact_time;
return true;
}
- if (record->xl_rmid == RM_XACT_ID && record_info == XLOG_XACT_ABORT)
+ if (rmid == RM_XACT_ID && record_info == XLOG_XACT_ABORT)
{
*recordXtime = ((xl_xact_abort *) XLogRecGetData(record))->xact_time;
return true;
}
- if (record->xl_rmid == RM_XACT_ID && record_info == XLOG_XACT_ABORT_PREPARED)
+ if (rmid == RM_XACT_ID && record_info == XLOG_XACT_ABORT_PREPARED)
{
*recordXtime = ((xl_xact_abort_prepared *) XLogRecGetData(record))->arec.xact_time;
return true;
@@ -5030,7 +5035,7 @@ getRecordTimestamp(XLogRecord *record, TimestampTz *recordXtime)
* new timeline's history file.
*/
static bool
-recoveryStopsBefore(XLogRecord *record)
+recoveryStopsBefore(XLogReaderState *record)
{
bool stopsHere = false;
uint8 record_info;
@@ -5052,14 +5057,14 @@ recoveryStopsBefore(XLogRecord *record)
}
/* Otherwise we only consider stopping before COMMIT or ABORT records. */
- if (record->xl_rmid != RM_XACT_ID)
+ if (XLogRecGetRmid(record) != RM_XACT_ID)
return false;
- record_info = record->xl_info & ~XLR_INFO_MASK;
+ record_info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
if (record_info == XLOG_XACT_COMMIT_COMPACT || record_info == XLOG_XACT_COMMIT)
{
isCommit = true;
- recordXid = record->xl_xid;
+ recordXid = XLogRecGetXid(record);
}
else if (record_info == XLOG_XACT_COMMIT_PREPARED)
{
@@ -5069,7 +5074,7 @@ recoveryStopsBefore(XLogRecord *record)
else if (record_info == XLOG_XACT_ABORT)
{
isCommit = false;
- recordXid = record->xl_xid;
+ recordXid = XLogRecGetXid(record);
}
else if (record_info == XLOG_XACT_ABORT_PREPARED)
{
@@ -5140,19 +5145,21 @@ recoveryStopsBefore(XLogRecord *record)
* record in XLogCtl->recoveryLastXTime.
*/
static bool
-recoveryStopsAfter(XLogRecord *record)
+recoveryStopsAfter(XLogReaderState *record)
{
uint8 record_info;
+ uint8 rmid;
TimestampTz recordXtime;
- record_info = record->xl_info & ~XLR_INFO_MASK;
+ record_info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
+ rmid = XLogRecGetRmid(record);
/*
* There can be many restore points that share the same name; we stop at
* the first one.
*/
if (recoveryTarget == RECOVERY_TARGET_NAME &&
- record->xl_rmid == RM_XLOG_ID && record_info == XLOG_RESTORE_POINT)
+ rmid == RM_XLOG_ID && record_info == XLOG_RESTORE_POINT)
{
xl_restore_point *recordRestorePointData;
@@ -5173,7 +5180,7 @@ recoveryStopsAfter(XLogRecord *record)
}
}
- if (record->xl_rmid == RM_XACT_ID &&
+ if (rmid == RM_XACT_ID &&
(record_info == XLOG_XACT_COMMIT_COMPACT ||
record_info == XLOG_XACT_COMMIT ||
record_info == XLOG_XACT_COMMIT_PREPARED ||
@@ -5192,7 +5199,7 @@ recoveryStopsAfter(XLogRecord *record)
else if (record_info == XLOG_XACT_ABORT_PREPARED)
recordXid = ((xl_xact_abort_prepared *) XLogRecGetData(record))->xid;
else
- recordXid = record->xl_xid;
+ recordXid = XLogRecGetXid(record);
/*
* There can be only one transaction end record with this exact
@@ -5307,7 +5314,7 @@ SetRecoveryPause(bool recoveryPause)
* usability.
*/
static bool
-recoveryApplyDelay(XLogRecord *record)
+recoveryApplyDelay(XLogReaderState *record)
{
uint8 record_info;
TimestampTz xtime;
@@ -5326,8 +5333,8 @@ recoveryApplyDelay(XLogRecord *record)
* so there is already opportunity for issues caused by early conflicts on
* standbys.
*/
- record_info = record->xl_info & ~XLR_INFO_MASK;
- if (!(record->xl_rmid == RM_XACT_ID &&
+ record_info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
+ if (!(XLogRecGetRmid(record) == RM_XACT_ID &&
(record_info == XLOG_XACT_COMMIT_COMPACT ||
record_info == XLOG_XACT_COMMIT ||
record_info == XLOG_XACT_COMMIT_PREPARED)))
@@ -5696,7 +5703,7 @@ StartupXLOG(void)
record = ReadCheckpointRecord(xlogreader, checkPointLoc, 0, true);
if (record != NULL)
{
- memcpy(&checkPoint, XLogRecGetData(record), sizeof(CheckPoint));
+ memcpy(&checkPoint, XLogRecGetData(xlogreader), sizeof(CheckPoint));
wasShutdown = (record->xl_info == XLOG_CHECKPOINT_SHUTDOWN);
ereport(DEBUG1,
(errmsg("checkpoint record is at %X/%X",
@@ -5793,7 +5800,7 @@ StartupXLOG(void)
ereport(PANIC,
(errmsg("could not locate a valid checkpoint record")));
}
- memcpy(&checkPoint, XLogRecGetData(record), sizeof(CheckPoint));
+ memcpy(&checkPoint, XLogRecGetData(xlogreader), sizeof(CheckPoint));
wasShutdown = (record->xl_info == XLOG_CHECKPOINT_SHUTDOWN);
}
@@ -6230,9 +6237,9 @@ StartupXLOG(void)
appendStringInfo(&buf, "REDO @ %X/%X; LSN %X/%X: ",
(uint32) (ReadRecPtr >> 32), (uint32) ReadRecPtr,
(uint32) (EndRecPtr >> 32), (uint32) EndRecPtr);
- xlog_outrec(&buf, record);
+ xlog_outrec(&buf, xlogreader);
appendStringInfoString(&buf, " - ");
- xlog_outdesc(&buf, record->xl_rmid, record);
+ xlog_outdesc(&buf, xlogreader);
elog(LOG, "%s", buf.data);
pfree(buf.data);
}
@@ -6260,7 +6267,7 @@ StartupXLOG(void)
/*
* Have we reached our recovery target?
*/
- if (recoveryStopsBefore(record))
+ if (recoveryStopsBefore(xlogreader))
{
reachedStopPoint = true; /* see below */
break;
@@ -6270,7 +6277,7 @@ StartupXLOG(void)
* If we've been asked to lag the master, wait on latch until
* enough time has passed.
*/
- if (recoveryApplyDelay(record))
+ if (recoveryApplyDelay(xlogreader))
{
/*
* We test for paused recovery again here. If user sets
@@ -6285,7 +6292,7 @@ StartupXLOG(void)
/* Setup error traceback support for ereport() */
errcallback.callback = rm_redo_error_callback;
- errcallback.arg = (void *) record;
+ errcallback.arg = (void *) xlogreader;
errcallback.previous = error_context_stack;
error_context_stack = &errcallback;
@@ -6324,7 +6331,7 @@ StartupXLOG(void)
{
CheckPoint checkPoint;
- memcpy(&checkPoint, XLogRecGetData(record), sizeof(CheckPoint));
+ memcpy(&checkPoint, XLogRecGetData(xlogreader), sizeof(CheckPoint));
newTLI = checkPoint.ThisTimeLineID;
prevTLI = checkPoint.PrevTimeLineID;
}
@@ -6332,7 +6339,7 @@ StartupXLOG(void)
{
xl_end_of_recovery xlrec;
- memcpy(&xlrec, XLogRecGetData(record), sizeof(xl_end_of_recovery));
+ memcpy(&xlrec, XLogRecGetData(xlogreader), sizeof(xl_end_of_recovery));
newTLI = xlrec.ThisTimeLineID;
prevTLI = xlrec.PrevTimeLineID;
}
@@ -6366,7 +6373,7 @@ StartupXLOG(void)
RecordKnownAssignedTransactionIds(record->xl_xid);
/* Now apply the WAL record itself */
- RmgrTable[record->xl_rmid].rm_redo(EndRecPtr, record);
+ RmgrTable[record->xl_rmid].rm_redo(xlogreader);
/* Pop the error context stack */
error_context_stack = errcallback.previous;
@@ -6394,7 +6401,7 @@ StartupXLOG(void)
WalSndWakeup();
/* Exit loop if we reached inclusive recovery target */
- if (recoveryStopsAfter(record))
+ if (recoveryStopsAfter(xlogreader))
{
reachedStopPoint = true;
break;
@@ -7148,8 +7155,7 @@ ReadCheckpointRecord(XLogReaderState *xlogreader, XLogRecPtr RecPtr,
}
return NULL;
}
- if (record->xl_len != sizeof(CheckPoint) ||
- record->xl_tot_len != SizeOfXLogRecord + sizeof(CheckPoint))
+ if (record->xl_tot_len != SizeOfXLogRecord + SizeOfXLogRecordDataHeaderShort + sizeof(CheckPoint))
{
switch (whichChkpt)
{
@@ -7194,6 +7200,9 @@ InitXLOGAccess(void)
(void) GetRedoRecPtr();
/* Also update our copy of doPageWrites. */
doPageWrites = (Insert->fullPageWrites || Insert->forcePageWrites);
+
+ /* Also initialize the working areas for constructing WAL records */
+ InitXLogInsert();
}
/*
@@ -7490,7 +7499,6 @@ CreateCheckPoint(int flags)
CheckPoint checkPoint;
XLogRecPtr recptr;
XLogCtlInsert *Insert = &XLogCtl->Insert;
- XLogRecData rdata;
uint32 freespace;
XLogSegNo _logSegNo;
XLogRecPtr curInsert;
@@ -7760,15 +7768,11 @@ CreateCheckPoint(int flags)
/*
* Now insert the checkpoint record into XLOG.
*/
- rdata.data = (char *) (&checkPoint);
- rdata.len = sizeof(checkPoint);
- rdata.buffer = InvalidBuffer;
- rdata.next = NULL;
-
+ XLogBeginInsert();
+ XLogRegisterData((char *) (&checkPoint), sizeof(checkPoint));
recptr = XLogInsert(RM_XLOG_ID,
shutdown ? XLOG_CHECKPOINT_SHUTDOWN :
- XLOG_CHECKPOINT_ONLINE,
- &rdata);
+ XLOG_CHECKPOINT_ONLINE);
XLogFlush(recptr);
@@ -7908,7 +7912,6 @@ static void
CreateEndOfRecoveryRecord(void)
{
xl_end_of_recovery xlrec;
- XLogRecData rdata;
XLogRecPtr recptr;
/* sanity check */
@@ -7926,12 +7929,9 @@ CreateEndOfRecoveryRecord(void)
START_CRIT_SECTION();
- rdata.data = (char *) &xlrec;
- rdata.len = sizeof(xl_end_of_recovery);
- rdata.buffer = InvalidBuffer;
- rdata.next = NULL;
-
- recptr = XLogInsert(RM_XLOG_ID, XLOG_END_OF_RECOVERY, &rdata);
+ XLogBeginInsert();
+ XLogRegisterData((char *) &xlrec, sizeof(xl_end_of_recovery));
+ recptr = XLogInsert(RM_XLOG_ID, XLOG_END_OF_RECOVERY);
XLogFlush(recptr);
@@ -8307,13 +8307,9 @@ KeepLogSeg(XLogRecPtr recptr, XLogSegNo *logSegNo)
void
XLogPutNextOid(Oid nextOid)
{
- XLogRecData rdata;
-
- rdata.data = (char *) (&nextOid);
- rdata.len = sizeof(Oid);
- rdata.buffer = InvalidBuffer;
- rdata.next = NULL;
- (void) XLogInsert(RM_XLOG_ID, XLOG_NEXTOID, &rdata);
+ XLogBeginInsert();
+ XLogRegisterData((char *) (&nextOid), sizeof(Oid));
+ (void) XLogInsert(RM_XLOG_ID, XLOG_NEXTOID);
/*
* We need not flush the NEXTOID record immediately, because any of the
@@ -8349,15 +8345,10 @@ XLogRecPtr
RequestXLogSwitch(void)
{
XLogRecPtr RecPtr;
- XLogRecData rdata;
-
- /* XLOG SWITCH, alone among xlog record types, has no data */
- rdata.buffer = InvalidBuffer;
- rdata.data = NULL;
- rdata.len = 0;
- rdata.next = NULL;
- RecPtr = XLogInsert(RM_XLOG_ID, XLOG_SWITCH, &rdata);
+ /* XLOG SWITCH has no data */
+ XLogBeginInsert();
+ RecPtr = XLogInsert(RM_XLOG_ID, XLOG_SWITCH);
return RecPtr;
}
@@ -8369,18 +8360,15 @@ XLogRecPtr
XLogRestorePoint(const char *rpName)
{
XLogRecPtr RecPtr;
- XLogRecData rdata;
xl_restore_point xlrec;
xlrec.rp_time = GetCurrentTimestamp();
strlcpy(xlrec.rp_name, rpName, MAXFNAMELEN);
- rdata.buffer = InvalidBuffer;
- rdata.data = (char *) &xlrec;
- rdata.len = sizeof(xl_restore_point);
- rdata.next = NULL;
+ XLogBeginInsert();
+ XLogRegisterData((char *) &xlrec, sizeof(xl_restore_point));
- RecPtr = XLogInsert(RM_XLOG_ID, XLOG_RESTORE_POINT, &rdata);
+ RecPtr = XLogInsert(RM_XLOG_ID, XLOG_RESTORE_POINT);
ereport(LOG,
(errmsg("restore point \"%s\" created at %X/%X",
@@ -8412,7 +8400,6 @@ XLogReportParameters(void)
*/
if (wal_level != ControlFile->wal_level || XLogIsNeeded())
{
- XLogRecData rdata;
xl_parameter_change xlrec;
XLogRecPtr recptr;
@@ -8423,12 +8410,10 @@ XLogReportParameters(void)
xlrec.wal_level = wal_level;
xlrec.wal_log_hints = wal_log_hints;
- rdata.buffer = InvalidBuffer;
- rdata.data = (char *) &xlrec;
- rdata.len = sizeof(xlrec);
- rdata.next = NULL;
+ XLogBeginInsert();
+ XLogRegisterData((char *) &xlrec, sizeof(xlrec));
- recptr = XLogInsert(RM_XLOG_ID, XLOG_PARAMETER_CHANGE, &rdata);
+ recptr = XLogInsert(RM_XLOG_ID, XLOG_PARAMETER_CHANGE);
XLogFlush(recptr);
}
@@ -8486,14 +8471,10 @@ UpdateFullPageWrites(void)
*/
if (XLogStandbyInfoActive() && !RecoveryInProgress())
{
- XLogRecData rdata;
-
- rdata.data = (char *) (&fullPageWrites);
- rdata.len = sizeof(bool);
- rdata.buffer = InvalidBuffer;
- rdata.next = NULL;
+ XLogBeginInsert();
+ XLogRegisterData((char *) (&fullPageWrites), sizeof(bool));
- XLogInsert(RM_XLOG_ID, XLOG_FPW_CHANGE, &rdata);
+ XLogInsert(RM_XLOG_ID, XLOG_FPW_CHANGE);
}
if (!fullPageWrites)
@@ -8558,12 +8539,13 @@ checkTimeLineSwitch(XLogRecPtr lsn, TimeLineID newTLI, TimeLineID prevTLI)
* not all record types are related to control file updates.
*/
void
-xlog_redo(XLogRecPtr lsn, XLogRecord *record)
+xlog_redo(XLogReaderState *record)
{
- uint8 info = record->xl_info & ~XLR_INFO_MASK;
+ uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
+ XLogRecPtr lsn = record->EndRecPtr;
- /* Backup blocks are not used by XLOG rmgr */
- Assert(!(record->xl_info & XLR_BKP_BLOCK_MASK));
+ /* in XLOG rmgr, backup blocks are only used by XLOG_FPI records */
+ Assert(!XLogRecHasAnyBlockRefs(record) || info == XLOG_FPI);
if (info == XLOG_NEXTOID)
{
@@ -8750,14 +8732,12 @@ xlog_redo(XLogRecPtr lsn, XLogRecord *record)
}
else if (info == XLOG_FPI)
{
- char *data;
- BkpBlock bkpb;
+ Buffer buffer;
/*
- * Full-page image (FPI) records contain a backup block stored
- * "inline" in the normal data since the locking when writing hint
- * records isn't sufficient to use the normal backup block mechanism,
- * which assumes exclusive lock on the buffer supplied.
+ * Full-page image (FPI) records contain nothing else but a backup
+ * block. The block reference must include a full-page image -
+ * otherwise there would be no point in this record.
*
* Since the only change in these backup block are hint bits, there
* are no recovery conflicts generated.
@@ -8766,11 +8746,9 @@ xlog_redo(XLogRecPtr lsn, XLogRecord *record)
* smgr implementation has no need to implement anything. Which means
* nothing is needed in md.c etc
*/
- data = XLogRecGetData(record);
- memcpy(&bkpb, data, sizeof(BkpBlock));
- data += sizeof(BkpBlock);
-
- RestoreBackupBlockContents(lsn, bkpb, data, false, false);
+ if (XLogReadBufferForRedo(record, 0, &buffer) != BLK_RESTORED)
+ elog(ERROR, "unexpected XLogReadBufferForRedo result when restoring backup block");
+ UnlockReleaseBuffer(buffer);
}
else if (info == XLOG_BACKUP_END)
{
@@ -8867,22 +8845,42 @@ xlog_redo(XLogRecPtr lsn, XLogRecord *record)
#ifdef WAL_DEBUG
static void
-xlog_outrec(StringInfo buf, XLogRecord *record)
+xlog_outrec(StringInfo buf, XLogReaderState *record)
{
- int i;
+ int block_id;
appendStringInfo(buf, "prev %X/%X; xid %u",
- (uint32) (record->xl_prev >> 32),
- (uint32) record->xl_prev,
- record->xl_xid);
+ (uint32) (XLogRecGetPrev(record) >> 32),
+ (uint32) XLogRecGetPrev(record),
+ XLogRecGetXid(record));
appendStringInfo(buf, "; len %u",
- record->xl_len);
+ XLogRecGetDataLen(record));
- for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++)
+ /* decode block references */
+ for (block_id = 0; block_id <= record->max_block_id; block_id++)
{
- if (record->xl_info & XLR_BKP_BLOCK(i))
- appendStringInfo(buf, "; bkpb%d", i);
+ RelFileNode rnode;
+ ForkNumber forknum;
+ BlockNumber blk;
+
+ if (!XLogRecHasBlockRef(record, block_id))
+ continue;
+
+ XLogRecGetBlockTag(record, block_id, &rnode, &forknum, &blk);
+ if (forknum != MAIN_FORKNUM)
+ appendStringInfo(buf, "; blkref #%u: rel %u/%u/%u, fork %u, blk %u",
+ block_id,
+ rnode.spcNode, rnode.dbNode, rnode.relNode,
+ forknum,
+ blk);
+ else
+ appendStringInfo(buf, "; blkref #%u: rel %u/%u/%u, blk %u",
+ block_id,
+ rnode.spcNode, rnode.dbNode, rnode.relNode,
+ blk);
+ if (XLogRecHasBlockImage(record, block_id))
+ appendStringInfo(buf, " FPW");
}
}
#endif /* WAL_DEBUG */
@@ -8892,17 +8890,18 @@ xlog_outrec(StringInfo buf, XLogRecord *record)
* optionally followed by a colon, a space, and a further description.
*/
static void
-xlog_outdesc(StringInfo buf, RmgrId rmid, XLogRecord *record)
+xlog_outdesc(StringInfo buf, XLogReaderState *record)
{
+ RmgrId rmid = XLogRecGetRmid(record);
+ uint8 info = XLogRecGetInfo(record);
const char *id;
appendStringInfoString(buf, RmgrTable[rmid].rm_name);
appendStringInfoChar(buf, '/');
- id = RmgrTable[rmid].rm_identify(record->xl_info);
+ id = RmgrTable[rmid].rm_identify(info);
if (id == NULL)
- appendStringInfo(buf, "UNKNOWN (%X): ",
- record->xl_info & ~XLR_INFO_MASK);
+ appendStringInfo(buf, "UNKNOWN (%X): ", info & ~XLR_INFO_MASK);
else
appendStringInfo(buf, "%s: ", id);
@@ -9411,7 +9410,6 @@ do_pg_stop_backup(char *labelfile, bool waitforarchive, TimeLineID *stoptli_p)
XLogRecPtr startpoint;
XLogRecPtr stoppoint;
TimeLineID stoptli;
- XLogRecData rdata;
pg_time_t stamp_time;
char strfbuf[128];
char histfilepath[MAXPGPATH];
@@ -9618,11 +9616,9 @@ do_pg_stop_backup(char *labelfile, bool waitforarchive, TimeLineID *stoptli_p)
/*
* Write the backup-end xlog record
*/
- rdata.data = (char *) (&startpoint);
- rdata.len = sizeof(startpoint);
- rdata.buffer = InvalidBuffer;
- rdata.next = NULL;
- stoppoint = XLogInsert(RM_XLOG_ID, XLOG_BACKUP_END, &rdata);
+ XLogBeginInsert();
+ XLogRegisterData((char *) (&startpoint), sizeof(startpoint));
+ stoppoint = XLogInsert(RM_XLOG_ID, XLOG_BACKUP_END);
stoptli = ThisTimeLineID;
/*
@@ -9930,15 +9926,13 @@ read_backup_label(XLogRecPtr *checkPointLoc, bool *backupEndRequired,
static void
rm_redo_error_callback(void *arg)
{
- XLogRecord *record = (XLogRecord *) arg;
+ XLogReaderState *record = (XLogReaderState *) arg;
StringInfoData buf;
initStringInfo(&buf);
- xlog_outdesc(&buf, record->xl_rmid, record);
+ xlog_outdesc(&buf, record);
- /* don't bother emitting empty description */
- if (buf.len > 0)
- errcontext("xlog redo %s", buf.data);
+ errcontext("xlog redo %s", buf.data);
pfree(buf.data);
}
diff --git a/src/backend/access/transam/xloginsert.c b/src/backend/access/transam/xloginsert.c
index b83343bf5bd..89c407e521b 100644
--- a/src/backend/access/transam/xloginsert.c
+++ b/src/backend/access/transam/xloginsert.c
@@ -3,6 +3,12 @@
* xloginsert.c
* Functions for constructing WAL records
*
+ * Constructing a WAL record begins with a call to XLogBeginInsert,
+ * followed by a number of XLogRegister* calls. The registered data is
+ * collected in private working memory, and finally assembled into a chain
+ * of XLogRecData structs by a call to XLogRecordAssemble(). See
+ * access/transam/README for details.
+ *
* Portions Copyright (c) 1996-2014, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
@@ -24,39 +30,366 @@
#include "utils/memutils.h"
#include "pg_trace.h"
+/*
+ * For each block reference registered with XLogRegisterBuffer, we fill in
+ * a registered_buffer struct.
+ */
+typedef struct
+{
+ bool in_use; /* is this slot in use? */
+ uint8 flags; /* REGBUF_* flags */
+ RelFileNode rnode; /* identifies the relation and block */
+ ForkNumber forkno;
+ BlockNumber block;
+ Page page; /* page content */
+ uint32 rdata_len; /* total length of data in rdata chain */
+ XLogRecData *rdata_head; /* head of the chain of data registered with
+ * this block */
+ XLogRecData *rdata_tail; /* last entry in the chain, or &rdata_head if
+ * empty */
+
+ XLogRecData bkp_rdatas[2]; /* temporary rdatas used to hold references to
+ * backup block data in XLogRecordAssemble() */
+} registered_buffer;
+
+static registered_buffer *registered_buffers;
+static int max_registered_buffers; /* allocated size */
+static int max_registered_block_id = 0; /* highest block_id + 1
+ * currently registered */
+
+/*
+ * A chain of XLogRecDatas to hold the "main data" of a WAL record, registered
+ * with XLogRegisterData(...).
+ */
+static XLogRecData *mainrdata_head;
+static XLogRecData *mainrdata_last = (XLogRecData *) &mainrdata_head;
+static uint32 mainrdata_len; /* total # of bytes in chain */
+
+/*
+ * These are used to hold the record header while constructing a record.
+ * 'hdr_scratch' is not a plain variable, but is palloc'd at initialization,
+ * because we want it to be MAXALIGNed and padding bytes zeroed.
+ *
+ * For simplicity, it's allocated large enough to hold the headers for any
+ * WAL record.
+ */
+static XLogRecData hdr_rdt;
+static char *hdr_scratch = NULL;
+
+#define HEADER_SCRATCH_SIZE \
+ (SizeOfXLogRecord + \
+ MaxSizeOfXLogRecordBlockHeader * (XLR_MAX_BLOCK_ID + 1) + \
+ SizeOfXLogRecordDataHeaderLong)
+
+/*
+ * An array of XLogRecData structs, to hold registered data.
+ */
+static XLogRecData *rdatas;
+static int num_rdatas; /* entries currently used */
+static int max_rdatas; /* allocated size */
+
+static bool begininsert_called = false;
+
+/* Memory context to hold the registered buffer and data references. */
+static MemoryContext xloginsert_cxt;
+
static XLogRecData *XLogRecordAssemble(RmgrId rmid, uint8 info,
- XLogRecData *rdata,
XLogRecPtr RedoRecPtr, bool doPageWrites,
- XLogRecPtr *fpw_lsn, XLogRecData **rdt_lastnormal);
-static void XLogFillBkpBlock(Buffer buffer, bool buffer_std, BkpBlock *bkpb);
+ XLogRecPtr *fpw_lsn);
+
+/*
+ * Begin constructing a WAL record. This must be called before the
+ * XLogRegister* functions and XLogInsert().
+ */
+void
+XLogBeginInsert(void)
+{
+ Assert(max_registered_block_id == 0);
+ Assert(mainrdata_last == (XLogRecData *) &mainrdata_head);
+ Assert(mainrdata_len == 0);
+ Assert(!begininsert_called);
+
+ /* cross-check on whether we should be here or not */
+ if (!XLogInsertAllowed())
+ elog(ERROR, "cannot make new WAL entries during recovery");
+
+ begininsert_called = true;
+}
/*
- * Insert an XLOG record having the specified RMID and info bytes,
- * with the body of the record being the data chunk(s) described by
- * the rdata chain (see xloginsert.h for notes about rdata).
+ * Ensure that there are enough buffer and data slots in the working area,
+ * for subsequent XLogRegisterBuffer, XLogRegisterData and XLogRegisterBufData
+ * calls.
+ *
+ * There is always space for a small number of buffers and data chunks, enough
+ * for most record types. This function is for the exceptional cases that need
+ * more.
+ */
+void
+XLogEnsureRecordSpace(int max_block_id, int ndatas)
+{
+ int nbuffers;
+
+ /*
+ * This must be called before entering a critical section, because
+ * allocating memory inside a critical section can fail. repalloc() will
+ * check the same, but better to check it here too so that we fail
+ * consistently even if the arrays happen to be large enough already.
+ */
+ Assert(CritSectionCount == 0);
+
+ /* the minimum values can't be decreased */
+ if (max_block_id < XLR_NORMAL_MAX_BLOCK_ID)
+ max_block_id = XLR_NORMAL_MAX_BLOCK_ID;
+ if (ndatas < XLR_NORMAL_RDATAS)
+ ndatas = XLR_NORMAL_RDATAS;
+
+ if (max_block_id > XLR_MAX_BLOCK_ID)
+ elog(ERROR, "maximum number of WAL record block references exceeded");
+ nbuffers = max_block_id + 1;
+
+ if (nbuffers > max_registered_buffers)
+ {
+ registered_buffers = (registered_buffer *)
+ repalloc(registered_buffers, sizeof(registered_buffer) * nbuffers);
+
+ /*
+ * At least the padding bytes in the structs must be zeroed, because
+ * they are included in WAL data, but initialize it all for tidiness.
+ */
+ MemSet(&registered_buffers[max_registered_buffers], 0,
+ (nbuffers - max_registered_buffers) * sizeof(registered_buffer));
+ max_registered_buffers = nbuffers;
+ }
+
+ if (ndatas > max_rdatas)
+ {
+ rdatas = (XLogRecData *) repalloc(rdatas, sizeof(XLogRecData) * ndatas);
+ max_rdatas = ndatas;
+ }
+}
+
+/*
+ * Reset WAL record construction buffers.
+ */
+void
+XLogResetInsertion(void)
+{
+ int i;
+
+ for (i = 0; i < max_registered_block_id; i++)
+ registered_buffers[i].in_use = false;
+
+ num_rdatas = 0;
+ max_registered_block_id = 0;
+ mainrdata_len = 0;
+ mainrdata_last = (XLogRecData *) &mainrdata_head;
+ begininsert_called = false;
+}
+
+/*
+ * Register a reference to a buffer with the WAL record being constructed.
+ * This must be called for every page that the WAL-logged operation modifies.
+ */
+void
+XLogRegisterBuffer(uint8 block_id, Buffer buffer, uint8 flags)
+{
+ registered_buffer *regbuf;
+
+ /* NO_IMAGE doesn't make sense with FORCE_IMAGE */
+ Assert(!((flags & REGBUF_FORCE_IMAGE) && (flags & (REGBUF_NO_IMAGE))));
+ Assert(begininsert_called);
+
+ if (block_id >= max_registered_block_id)
+ {
+ if (block_id >= max_registered_buffers)
+ elog(ERROR, "too many registered buffers");
+ max_registered_block_id = block_id + 1;
+ }
+
+ regbuf = &registered_buffers[block_id];
+
+ BufferGetTag(buffer, &regbuf->rnode, &regbuf->forkno, &regbuf->block);
+ regbuf->page = BufferGetPage(buffer);
+ regbuf->flags = flags;
+ regbuf->rdata_tail = (XLogRecData *) &regbuf->rdata_head;
+ regbuf->rdata_len = 0;
+
+ /*
+ * Check that this page hasn't already been registered with some other
+ * block_id.
+ */
+#ifdef USE_ASSERT_CHECKING
+ {
+ int i;
+
+ for (i = 0; i < max_registered_block_id; i++)
+ {
+ registered_buffer *regbuf_old = &registered_buffers[i];
+
+ if (i == block_id || !regbuf_old->in_use)
+ continue;
+
+ Assert(!RelFileNodeEquals(regbuf_old->rnode, regbuf->rnode) ||
+ regbuf_old->forkno != regbuf->forkno ||
+ regbuf_old->block != regbuf->block);
+ }
+ }
+#endif
+
+ regbuf->in_use = true;
+}
+
+/*
+ * Like XLogRegisterBuffer, but for registering a block that's not in the
+ * shared buffer pool (i.e. when you don't have a Buffer for it).
+ */
+void
+XLogRegisterBlock(uint8 block_id, RelFileNode *rnode, ForkNumber forknum,
+ BlockNumber blknum, Page page, uint8 flags)
+{
+ registered_buffer *regbuf;
+
+ /* This is currently only used to WAL-log a full-page image of a page */
+ Assert(flags & REGBUF_FORCE_IMAGE);
+ Assert(begininsert_called);
+
+ if (block_id >= max_registered_block_id)
+ max_registered_block_id = block_id + 1;
+
+ if (block_id >= max_registered_buffers)
+ elog(ERROR, "too many registered buffers");
+
+ regbuf = &registered_buffers[block_id];
+
+ regbuf->rnode = *rnode;
+ regbuf->forkno = forknum;
+ regbuf->block = blknum;
+ regbuf->page = page;
+ regbuf->flags = flags;
+ regbuf->rdata_tail = (XLogRecData *) &regbuf->rdata_head;
+ regbuf->rdata_len = 0;
+
+ /*
+ * Check that this page hasn't already been registered with some other
+ * block_id.
+ */
+#ifdef USE_ASSERT_CHECKING
+ {
+ int i;
+
+ for (i = 0; i < max_registered_block_id; i++)
+ {
+ registered_buffer *regbuf_old = &registered_buffers[i];
+
+ if (i == block_id || !regbuf_old->in_use)
+ continue;
+
+ Assert(!RelFileNodeEquals(regbuf_old->rnode, regbuf->rnode) ||
+ regbuf_old->forkno != regbuf->forkno ||
+ regbuf_old->block != regbuf->block);
+ }
+ }
+#endif
+
+ regbuf->in_use = true;
+}
+
+/*
+ * Add data to the WAL record that's being constructed.
+ *
+ * The data is appended to the "main chunk", available at replay with
+ * XLogGetRecData().
+ */
+void
+XLogRegisterData(char *data, int len)
+{
+ XLogRecData *rdata;
+
+ Assert(begininsert_called);
+
+ if (num_rdatas >= max_rdatas)
+ elog(ERROR, "too much WAL data");
+ rdata = &rdatas[num_rdatas++];
+
+ rdata->data = data;
+ rdata->len = len;
+
+ /*
+ * we use the mainrdata_last pointer to track the end of the chain, so no
+ * need to clear 'next' here.
+ */
+
+ mainrdata_last->next = rdata;
+ mainrdata_last = rdata;
+
+ mainrdata_len += len;
+}
+
+/*
+ * Add buffer-specific data to the WAL record that's being constructed.
+ *
+ * Block_id must reference a block previously registered with
+ * XLogRegisterBuffer(). If this is called more than once for the same
+ * block_id, the data is appended.
+ *
+ * The maximum amount of data that can be registered per block is 65535
+ * bytes. That should be plenty; if you need more than BLCKSZ bytes to
+ * reconstruct the changes to the page, you might as well just log a full
+ * copy of it. (the "main data" that's not associated with a block is not
+ * limited)
+ */
+void
+XLogRegisterBufData(uint8 block_id, char *data, int len)
+{
+ registered_buffer *regbuf;
+ XLogRecData *rdata;
+
+ Assert(begininsert_called);
+
+ /* find the registered buffer struct */
+ regbuf = &registered_buffers[block_id];
+ if (!regbuf->in_use)
+ elog(ERROR, "no block with id %d registered with WAL insertion",
+ block_id);
+
+ if (num_rdatas >= max_rdatas)
+ elog(ERROR, "too much WAL data");
+ rdata = &rdatas[num_rdatas++];
+
+ rdata->data = data;
+ rdata->len = len;
+
+ regbuf->rdata_tail->next = rdata;
+ regbuf->rdata_tail = rdata;
+ regbuf->rdata_len += len;
+}
+
+/*
+ * Insert an XLOG record having the specified RMID and info bytes, with the
+ * body of the record being the data and buffer references registered earlier
+ * with XLogRegister* calls.
*
* Returns XLOG pointer to end of record (beginning of next record).
* This can be used as LSN for data pages affected by the logged action.
* (LSN is the XLOG point up to which the XLOG must be flushed to disk
* before the data page can be written out. This implements the basic
* WAL rule "write the log before the data".)
- *
- * NB: this routine feels free to scribble on the XLogRecData structs,
- * though not on the data they reference. This is OK since the XLogRecData
- * structs are always just temporaries in the calling code.
*/
XLogRecPtr
-XLogInsert(RmgrId rmid, uint8 info, XLogRecData *rdata)
+XLogInsert(RmgrId rmid, uint8 info)
{
- XLogRecPtr RedoRecPtr;
- bool doPageWrites;
XLogRecPtr EndPos;
- XLogRecPtr fpw_lsn;
- XLogRecData *rdt;
- XLogRecData *rdt_lastnormal;
- /* info's high bits are reserved for use by me */
- if (info & XLR_INFO_MASK)
+ /* XLogBeginInsert() must have been called. */
+ if (!begininsert_called)
+ elog(ERROR, "XLogBeginInsert was not called");
+
+ /*
+ * The caller can set rmgr bits and XLR_SPECIAL_REL_UPDATE; the rest are
+ * reserved for use by me.
+ */
+ if ((info & ~(XLR_RMGR_INFO_MASK | XLR_SPECIAL_REL_UPDATE)) != 0)
elog(PANIC, "invalid xlog info mask %02X", info);
TRACE_POSTGRESQL_XLOG_INSERT(rmid, info);
@@ -67,292 +400,282 @@ XLogInsert(RmgrId rmid, uint8 info, XLogRecData *rdata)
*/
if (IsBootstrapProcessingMode() && rmid != RM_XLOG_ID)
{
+ XLogResetInsertion();
EndPos = SizeOfXLogLongPHD; /* start of 1st chkpt record */
return EndPos;
}
- /*
- * Get values needed to decide whether to do full-page writes. Since we
- * don't yet have an insertion lock, these could change under us, but
- * XLogInsertRecord will recheck them once it has a lock.
- */
- GetFullPageWriteInfo(&RedoRecPtr, &doPageWrites);
-
- /*
- * Assemble an XLogRecData chain representing the WAL record, including
- * any backup blocks needed.
- *
- * We may have to loop back to here if a race condition is detected in
- * XLogInsertRecord. We could prevent the race by doing all this work
- * while holding an insertion lock, but it seems better to avoid doing CRC
- * calculations while holding one.
- */
-retry:
- rdt = XLogRecordAssemble(rmid, info, rdata, RedoRecPtr, doPageWrites,
- &fpw_lsn, &rdt_lastnormal);
-
- EndPos = XLogInsertRecord(rdt, fpw_lsn);
-
- if (EndPos == InvalidXLogRecPtr)
+ do
{
+ XLogRecPtr RedoRecPtr;
+ bool doPageWrites;
+ XLogRecPtr fpw_lsn;
+ XLogRecData *rdt;
+
/*
- * Undo the changes we made to the rdata chain, and retry.
- *
- * XXX: This doesn't undo *all* the changes; the XLogRecData
- * entries for buffers that we had already decided to back up have
- * had their data-pointers cleared. That's OK, as long as we
- * decide to back them up on the next iteration as well. Hence,
- * don't allow "doPageWrites" value to go from true to false after
- * we've modified the rdata chain.
+ * Get values needed to decide whether to do full-page writes. Since
+ * we don't yet have an insertion lock, these could change under us,
+ * but XLogInsertRecData will recheck them once it has a lock.
*/
- bool newDoPageWrites;
+ GetFullPageWriteInfo(&RedoRecPtr, &doPageWrites);
- GetFullPageWriteInfo(&RedoRecPtr, &newDoPageWrites);
- doPageWrites = doPageWrites || newDoPageWrites;
- rdt_lastnormal->next = NULL;
+ rdt = XLogRecordAssemble(rmid, info, RedoRecPtr, doPageWrites,
+ &fpw_lsn);
- goto retry;
- }
+ EndPos = XLogInsertRecord(rdt, fpw_lsn);
+ } while (EndPos == InvalidXLogRecPtr);
+
+ XLogResetInsertion();
return EndPos;
}
/*
- * Assemble a full WAL record, including backup blocks, from an XLogRecData
- * chain, ready for insertion with XLogInsertRecord(). The record header
- * fields are filled in, except for the xl_prev field and CRC.
+ * Assemble a WAL record from the registered data and buffers into an
+ * XLogRecData chain, ready for insertion with XLogInsertRecord().
*
- * The rdata chain is modified, adding entries for full-page images.
- * *rdt_lastnormal is set to point to the last normal (ie. not added by
- * this function) entry. It can be used to reset the chain to its original
- * state.
+ * The record header fields are filled in, except for the xl_prev field. The
+ * calculated CRC does not include xl_prev either.
*
- * If the rdata chain contains any buffer references, and a full-page image
- * was not taken of all the buffers, *fpw_lsn is set to the lowest LSN among
- * such pages. This signals that the assembled record is only good for
- * insertion on the assumption that the RedoRecPtr and doPageWrites values
- * were up-to-date.
+ * If there are any registered buffers, and a full-page image was not taken
+ * of all them, *page_writes_omitted is set to true. This signals that the
+ * assembled record is only good for insertion on the assumption that the
+ * RedoRecPtr and doPageWrites values were up-to-date.
*/
static XLogRecData *
-XLogRecordAssemble(RmgrId rmid, uint8 info, XLogRecData *rdata,
+XLogRecordAssemble(RmgrId rmid, uint8 info,
XLogRecPtr RedoRecPtr, bool doPageWrites,
- XLogRecPtr *fpw_lsn, XLogRecData **rdt_lastnormal)
+ XLogRecPtr *fpw_lsn)
{
- bool isLogSwitch = (rmid == RM_XLOG_ID && info == XLOG_SWITCH);
XLogRecData *rdt;
- Buffer dtbuf[XLR_MAX_BKP_BLOCKS];
- bool dtbuf_bkp[XLR_MAX_BKP_BLOCKS];
- uint32 len,
- total_len;
- unsigned i;
+ uint32 total_len = 0;
+ int block_id;
+ pg_crc32 rdata_crc;
+ registered_buffer *prev_regbuf = NULL;
+ XLogRecData *rdt_datas_last;
+ XLogRecord *rechdr;
+ char *scratch = hdr_scratch;
/*
- * These need to be static because they are returned to the caller as part
- * of the XLogRecData chain.
+ * Note: this function can be called multiple times for the same record.
+ * All the modifications we do to the rdata chains below must handle that.
*/
- static BkpBlock dtbuf_xlg[XLR_MAX_BKP_BLOCKS];
- static XLogRecData dtbuf_rdt1[XLR_MAX_BKP_BLOCKS];
- static XLogRecData dtbuf_rdt2[XLR_MAX_BKP_BLOCKS];
- static XLogRecData dtbuf_rdt3[XLR_MAX_BKP_BLOCKS];
- static XLogRecData hdr_rdt;
- static XLogRecord *rechdr;
-
- if (rechdr == NULL)
- {
- static char rechdrbuf[SizeOfXLogRecord + MAXIMUM_ALIGNOF];
- rechdr = (XLogRecord *) MAXALIGN(&rechdrbuf);
- MemSet(rechdr, 0, SizeOfXLogRecord);
- }
+ /* The record begins with the fixed-size header */
+ rechdr = (XLogRecord *) scratch;
+ scratch += SizeOfXLogRecord;
- /* The record begins with the header */
- hdr_rdt.data = (char *) rechdr;
- hdr_rdt.len = SizeOfXLogRecord;
- hdr_rdt.next = rdata;
- total_len = SizeOfXLogRecord;
+ hdr_rdt.next = NULL;
+ rdt_datas_last = &hdr_rdt;
+ hdr_rdt.data = hdr_scratch;
/*
- * Here we scan the rdata chain, to determine which buffers must be backed
- * up.
- *
- * We add entries for backup blocks to the chain, so that they don't need
- * any special treatment in the critical section where the chunks are
- * copied into the WAL buffers. Those entries have to be unlinked from the
- * chain if we have to loop back here.
+ * Make an rdata chain containing all the data portions of all block
+ * references. This includes the data for full-page images. Also append
+ * the headers for the block references in the scratch buffer.
*/
- for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++)
- {
- dtbuf[i] = InvalidBuffer;
- dtbuf_bkp[i] = false;
- }
-
*fpw_lsn = InvalidXLogRecPtr;
- len = 0;
- for (rdt = rdata;;)
+ for (block_id = 0; block_id < max_registered_block_id; block_id++)
{
- if (rdt->buffer == InvalidBuffer)
+ registered_buffer *regbuf = &registered_buffers[block_id];
+ bool needs_backup;
+ bool needs_data;
+ XLogRecordBlockHeader bkpb;
+ XLogRecordBlockImageHeader bimg;
+ bool samerel;
+
+ if (!regbuf->in_use)
+ continue;
+
+ /* Determine if this block needs to be backed up */
+ if (regbuf->flags & REGBUF_FORCE_IMAGE)
+ needs_backup = true;
+ else if (regbuf->flags & REGBUF_NO_IMAGE)
+ needs_backup = false;
+ else if (!doPageWrites)
+ needs_backup = false;
+ else
{
- /* Simple data, just include it */
- len += rdt->len;
+ /*
+ * We assume page LSN is first data on *every* page that can be
+ * passed to XLogInsert, whether it has the standard page layout
+ * or not.
+ */
+ XLogRecPtr page_lsn = PageGetLSN(regbuf->page);
+
+ needs_backup = (page_lsn <= RedoRecPtr);
+ if (!needs_backup)
+ {
+ if (*fpw_lsn == InvalidXLogRecPtr || page_lsn < *fpw_lsn)
+ *fpw_lsn = page_lsn;
+ }
}
+
+ /* Determine if the buffer data needs to included */
+ if (regbuf->rdata_len == 0)
+ needs_data = false;
+ else if ((regbuf->flags & REGBUF_KEEP_DATA) != 0)
+ needs_data = true;
else
+ needs_data = !needs_backup;
+
+ bkpb.id = block_id;
+ bkpb.fork_flags = regbuf->forkno;
+ bkpb.data_length = 0;
+
+ if ((regbuf->flags & REGBUF_WILL_INIT) == REGBUF_WILL_INIT)
+ bkpb.fork_flags |= BKPBLOCK_WILL_INIT;
+
+ if (needs_backup)
{
- /* Find info for buffer */
- for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++)
+ Page page = regbuf->page;
+
+ /*
+ * The page needs to be backed up, so set up *bimg
+ */
+ if (regbuf->flags & REGBUF_STANDARD)
{
- if (rdt->buffer == dtbuf[i])
+ /* Assume we can omit data between pd_lower and pd_upper */
+ uint16 lower = ((PageHeader) page)->pd_lower;
+ uint16 upper = ((PageHeader) page)->pd_upper;
+
+ if (lower >= SizeOfPageHeaderData &&
+ upper > lower &&
+ upper <= BLCKSZ)
{
- /* Buffer already referenced by earlier chain item */
- if (dtbuf_bkp[i])
- {
- rdt->data = NULL;
- rdt->len = 0;
- }
- else if (rdt->data)
- len += rdt->len;
- break;
+ bimg.hole_offset = lower;
+ bimg.hole_length = upper - lower;
}
- if (dtbuf[i] == InvalidBuffer)
+ else
{
- /* OK, put it in this slot */
- XLogRecPtr page_lsn;
- bool needs_backup;
-
- dtbuf[i] = rdt->buffer;
-
- /*
- * Determine whether the buffer has to be backed up.
- *
- * We assume page LSN is first data on *every* page that
- * can be passed to XLogInsert, whether it has the
- * standard page layout or not. We don't need to take the
- * buffer header lock for PageGetLSN because we hold an
- * exclusive lock on the page and/or the relation.
- */
- page_lsn = PageGetLSN(BufferGetPage(rdt->buffer));
- if (!doPageWrites)
- needs_backup = false;
- else if (page_lsn <= RedoRecPtr)
- needs_backup = true;
- else
- needs_backup = false;
-
- if (needs_backup)
- {
- /*
- * The page needs to be backed up, so set up BkpBlock
- */
- XLogFillBkpBlock(rdt->buffer, rdt->buffer_std,
- &(dtbuf_xlg[i]));
- dtbuf_bkp[i] = true;
- rdt->data = NULL;
- rdt->len = 0;
- }
- else
- {
- if (rdt->data)
- len += rdt->len;
- if (*fpw_lsn == InvalidXLogRecPtr ||
- page_lsn < *fpw_lsn)
- {
- *fpw_lsn = page_lsn;
- }
- }
- break;
+ /* No "hole" to compress out */
+ bimg.hole_offset = 0;
+ bimg.hole_length = 0;
}
}
- if (i >= XLR_MAX_BKP_BLOCKS)
- elog(PANIC, "can backup at most %d blocks per xlog record",
- XLR_MAX_BKP_BLOCKS);
- }
- /* Break out of loop when rdt points to last chain item */
- if (rdt->next == NULL)
- break;
- rdt = rdt->next;
- }
- total_len += len;
+ else
+ {
+ /* Not a standard page header, don't try to eliminate "hole" */
+ bimg.hole_offset = 0;
+ bimg.hole_length = 0;
+ }
- /*
- * Make additional rdata chain entries for the backup blocks, so that we
- * don't need to special-case them in the write loop. This modifies the
- * original rdata chain, but we keep a pointer to the last regular entry,
- * rdt_lastnormal, so that we can undo this if we have to start over.
- *
- * At the exit of this loop, total_len includes the backup block data.
- *
- * Also set the appropriate info bits to show which buffers were backed
- * up. The XLR_BKP_BLOCK(N) bit corresponds to the N'th distinct buffer
- * value (ignoring InvalidBuffer) appearing in the rdata chain.
- */
- *rdt_lastnormal = rdt;
- for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++)
- {
- BkpBlock *bkpb;
- char *page;
+ /* Fill in the remaining fields in the XLogRecordBlockData struct */
+ bkpb.fork_flags |= BKPBLOCK_HAS_IMAGE;
- if (!dtbuf_bkp[i])
- continue;
+ total_len += BLCKSZ - bimg.hole_length;
+
+ /*
+ * Construct XLogRecData entries for the page content.
+ */
+ rdt_datas_last->next = &regbuf->bkp_rdatas[0];
+ rdt_datas_last = rdt_datas_last->next;
+ if (bimg.hole_length == 0)
+ {
+ rdt_datas_last->data = page;
+ rdt_datas_last->len = BLCKSZ;
+ }
+ else
+ {
+ /* must skip the hole */
+ rdt_datas_last->data = page;
+ rdt_datas_last->len = bimg.hole_offset;
- info |= XLR_BKP_BLOCK(i);
+ rdt_datas_last->next = &regbuf->bkp_rdatas[1];
+ rdt_datas_last = rdt_datas_last->next;
- bkpb = &(dtbuf_xlg[i]);
- page = (char *) BufferGetBlock(dtbuf[i]);
+ rdt_datas_last->data = page + (bimg.hole_offset + bimg.hole_length);
+ rdt_datas_last->len = BLCKSZ - (bimg.hole_offset + bimg.hole_length);
+ }
+ }
- rdt->next = &(dtbuf_rdt1[i]);
- rdt = rdt->next;
+ if (needs_data)
+ {
+ /*
+ * Link the caller-supplied rdata chain for this buffer to the
+ * overall list.
+ */
+ bkpb.fork_flags |= BKPBLOCK_HAS_DATA;
+ bkpb.data_length = regbuf->rdata_len;
+ total_len += regbuf->rdata_len;
+
+ rdt_datas_last->next = regbuf->rdata_head;
+ rdt_datas_last = regbuf->rdata_tail;
+ }
- rdt->data = (char *) bkpb;
- rdt->len = sizeof(BkpBlock);
- total_len += sizeof(BkpBlock);
+ if (prev_regbuf && RelFileNodeEquals(regbuf->rnode, prev_regbuf->rnode))
+ {
+ samerel = true;
+ bkpb.fork_flags |= BKPBLOCK_SAME_REL;
+ prev_regbuf = regbuf;
+ }
+ else
+ samerel = false;
- rdt->next = &(dtbuf_rdt2[i]);
- rdt = rdt->next;
+ /* Ok, copy the header to the scratch buffer */
+ memcpy(scratch, &bkpb, SizeOfXLogRecordBlockHeader);
+ scratch += SizeOfXLogRecordBlockHeader;
+ if (needs_backup)
+ {
+ memcpy(scratch, &bimg, SizeOfXLogRecordBlockImageHeader);
+ scratch += SizeOfXLogRecordBlockImageHeader;
+ }
+ if (!samerel)
+ {
+ memcpy(scratch, &regbuf->rnode, sizeof(RelFileNode));
+ scratch += sizeof(RelFileNode);
+ }
+ memcpy(scratch, &regbuf->block, sizeof(BlockNumber));
+ scratch += sizeof(BlockNumber);
+ }
- if (bkpb->hole_length == 0)
+ /* followed by main data, if any */
+ if (mainrdata_len > 0)
+ {
+ if (mainrdata_len > 255)
{
- rdt->data = page;
- rdt->len = BLCKSZ;
- total_len += BLCKSZ;
- rdt->next = NULL;
+ *(scratch++) = XLR_BLOCK_ID_DATA_LONG;
+ memcpy(scratch, &mainrdata_len, sizeof(uint32));
+ scratch += sizeof(uint32);
}
else
{
- /* must skip the hole */
- rdt->data = page;
- rdt->len = bkpb->hole_offset;
- total_len += bkpb->hole_offset;
-
- rdt->next = &(dtbuf_rdt3[i]);
- rdt = rdt->next;
-
- rdt->data = page + (bkpb->hole_offset + bkpb->hole_length);
- rdt->len = BLCKSZ - (bkpb->hole_offset + bkpb->hole_length);
- total_len += rdt->len;
- rdt->next = NULL;
+ *(scratch++) = XLR_BLOCK_ID_DATA_SHORT;
+ *(scratch++) = (uint8) mainrdata_len;
}
+ rdt_datas_last->next = mainrdata_head;
+ rdt_datas_last = mainrdata_last;
+ total_len += mainrdata_len;
}
+ rdt_datas_last->next = NULL;
+
+ hdr_rdt.len = (scratch - hdr_scratch);
+ total_len += hdr_rdt.len;
/*
- * We disallow len == 0 because it provides a useful bit of extra error
- * checking in ReadRecord. This means that all callers of XLogInsert
- * must supply at least some not-in-a-buffer data. However, we make an
- * exception for XLOG SWITCH records because we don't want them to ever
- * cross a segment boundary.
+ * Calculate CRC of the data
+ *
+ * Note that the record header isn't added into the CRC initially since we
+ * don't know the prev-link yet. Thus, the CRC will represent the CRC of
+ * the whole record in the order: rdata, then backup blocks, then record
+ * header.
*/
- if (len == 0 && !isLogSwitch)
- elog(PANIC, "invalid xlog record length %u", rechdr->xl_len);
+ INIT_CRC32C(rdata_crc);
+ COMP_CRC32C(rdata_crc, hdr_scratch + SizeOfXLogRecord, hdr_rdt.len - SizeOfXLogRecord);
+ for (rdt = hdr_rdt.next; rdt != NULL; rdt = rdt->next)
+ COMP_CRC32C(rdata_crc, rdt->data, rdt->len);
/*
* Fill in the fields in the record header. Prev-link is filled in later,
- * once we know where in the WAL the record will be inserted. CRC is also
- * not calculated yet.
+ * once we know where in the WAL the record will be inserted. The CRC does
+ * not include the record header yet.
*/
rechdr->xl_xid = GetCurrentTransactionIdIfAny();
rechdr->xl_tot_len = total_len;
- rechdr->xl_len = len; /* doesn't include backup blocks */
rechdr->xl_info = info;
rechdr->xl_rmid = rmid;
rechdr->xl_prev = InvalidXLogRecPtr;
+ rechdr->xl_crc = rdata_crc;
return &hdr_rdt;
}
@@ -429,45 +752,41 @@ XLogSaveBufferForHint(Buffer buffer, bool buffer_std)
if (lsn <= RedoRecPtr)
{
- XLogRecData rdata[2];
- BkpBlock bkpb;
+ int flags;
char copied_buffer[BLCKSZ];
char *origdata = (char *) BufferGetBlock(buffer);
-
- /* Make a BkpBlock struct representing the buffer */
- XLogFillBkpBlock(buffer, buffer_std, &bkpb);
+ RelFileNode rnode;
+ ForkNumber forkno;
+ BlockNumber blkno;
/*
* Copy buffer so we don't have to worry about concurrent hint bit or
* lsn updates. We assume pd_lower/upper cannot be changed without an
* exclusive lock, so the contents bkp are not racy.
- *
- * With buffer_std set to false, XLogFillBkpBlock() sets hole_length
- * and hole_offset to 0; so the following code is safe for either
- * case.
*/
- memcpy(copied_buffer, origdata, bkpb.hole_offset);
- memcpy(copied_buffer + bkpb.hole_offset,
- origdata + bkpb.hole_offset + bkpb.hole_length,
- BLCKSZ - bkpb.hole_offset - bkpb.hole_length);
+ if (buffer_std)
+ {
+ /* Assume we can omit data between pd_lower and pd_upper */
+ Page page = BufferGetPage(buffer);
+ uint16 lower = ((PageHeader) page)->pd_lower;
+ uint16 upper = ((PageHeader) page)->pd_upper;
- /*
- * Header for backup block.
- */
- rdata[0].data = (char *) &bkpb;
- rdata[0].len = sizeof(BkpBlock);
- rdata[0].buffer = InvalidBuffer;
- rdata[0].next = &(rdata[1]);
+ memcpy(copied_buffer, origdata, lower);
+ memcpy(copied_buffer + upper, origdata + upper, BLCKSZ - upper);
+ }
+ else
+ memcpy(copied_buffer, origdata, BLCKSZ);
- /*
- * Save copy of the buffer.
- */
- rdata[1].data = copied_buffer;
- rdata[1].len = BLCKSZ - bkpb.hole_length;
- rdata[1].buffer = InvalidBuffer;
- rdata[1].next = NULL;
+ XLogBeginInsert();
- recptr = XLogInsert(RM_XLOG_ID, XLOG_FPI, rdata);
+ flags = REGBUF_FORCE_IMAGE;
+ if (buffer_std)
+ flags |= REGBUF_STANDARD;
+
+ BufferGetTag(buffer, &rnode, &forkno, &blkno);
+ XLogRegisterBlock(0, &rnode, forkno, blkno, copied_buffer, flags);
+
+ recptr = XLogInsert(RM_XLOG_ID, XLOG_FPI);
}
return recptr;
@@ -489,71 +808,16 @@ XLogRecPtr
log_newpage(RelFileNode *rnode, ForkNumber forkNum, BlockNumber blkno,
Page page, bool page_std)
{
- BkpBlock bkpb;
+ int flags;
XLogRecPtr recptr;
- XLogRecData rdata[3];
-
- /* NO ELOG(ERROR) from here till newpage op is logged */
- START_CRIT_SECTION();
-
- bkpb.node = *rnode;
- bkpb.fork = forkNum;
- bkpb.block = blkno;
+ flags = REGBUF_FORCE_IMAGE;
if (page_std)
- {
- /* Assume we can omit data between pd_lower and pd_upper */
- uint16 lower = ((PageHeader) page)->pd_lower;
- uint16 upper = ((PageHeader) page)->pd_upper;
-
- if (lower >= SizeOfPageHeaderData &&
- upper > lower &&
- upper <= BLCKSZ)
- {
- bkpb.hole_offset = lower;
- bkpb.hole_length = upper - lower;
- }
- else
- {
- /* No "hole" to compress out */
- bkpb.hole_offset = 0;
- bkpb.hole_length = 0;
- }
- }
- else
- {
- /* Not a standard page header, don't try to eliminate "hole" */
- bkpb.hole_offset = 0;
- bkpb.hole_length = 0;
- }
-
- rdata[0].data = (char *) &bkpb;
- rdata[0].len = sizeof(BkpBlock);
- rdata[0].buffer = InvalidBuffer;
- rdata[0].next = &(rdata[1]);
-
- if (bkpb.hole_length == 0)
- {
- rdata[1].data = (char *) page;
- rdata[1].len = BLCKSZ;
- rdata[1].buffer = InvalidBuffer;
- rdata[1].next = NULL;
- }
- else
- {
- /* must skip the hole */
- rdata[1].data = (char *) page;
- rdata[1].len = bkpb.hole_offset;
- rdata[1].buffer = InvalidBuffer;
- rdata[1].next = &rdata[2];
-
- rdata[2].data = (char *) page + (bkpb.hole_offset + bkpb.hole_length);
- rdata[2].len = BLCKSZ - (bkpb.hole_offset + bkpb.hole_length);
- rdata[2].buffer = InvalidBuffer;
- rdata[2].next = NULL;
- }
+ flags |= REGBUF_STANDARD;
- recptr = XLogInsert(RM_XLOG_ID, XLOG_FPI, rdata);
+ XLogBeginInsert();
+ XLogRegisterBlock(0, rnode, forkNum, blkno, page, flags);
+ recptr = XLogInsert(RM_XLOG_ID, XLOG_FPI);
/*
* The page may be uninitialized. If so, we can't set the LSN because that
@@ -564,8 +828,6 @@ log_newpage(RelFileNode *rnode, ForkNumber forkNum, BlockNumber blkno,
PageSetLSN(page, recptr);
}
- END_CRIT_SECTION();
-
return recptr;
}
@@ -596,38 +858,38 @@ log_newpage_buffer(Buffer buffer, bool page_std)
}
/*
- * Fill a BkpBlock for a buffer.
+ * Allocate working buffers needed for WAL record construction.
*/
-static void
-XLogFillBkpBlock(Buffer buffer, bool buffer_std, BkpBlock *bkpb)
+void
+InitXLogInsert(void)
{
- BufferGetTag(buffer, &bkpb->node, &bkpb->fork, &bkpb->block);
+ /* Initialize the working areas */
+ if (xloginsert_cxt == NULL)
+ {
+ xloginsert_cxt = AllocSetContextCreate(TopMemoryContext,
+ "WAL record construction",
+ ALLOCSET_DEFAULT_MINSIZE,
+ ALLOCSET_DEFAULT_INITSIZE,
+ ALLOCSET_DEFAULT_MAXSIZE);
+ }
- if (buffer_std)
+ if (registered_buffers == NULL)
{
- /* Assume we can omit data between pd_lower and pd_upper */
- Page page = BufferGetPage(buffer);
- uint16 lower = ((PageHeader) page)->pd_lower;
- uint16 upper = ((PageHeader) page)->pd_upper;
-
- if (lower >= SizeOfPageHeaderData &&
- upper > lower &&
- upper <= BLCKSZ)
- {
- bkpb->hole_offset = lower;
- bkpb->hole_length = upper - lower;
- }
- else
- {
- /* No "hole" to compress out */
- bkpb->hole_offset = 0;
- bkpb->hole_length = 0;
- }
+ registered_buffers = (registered_buffer *)
+ MemoryContextAllocZero(xloginsert_cxt,
+ sizeof(registered_buffer) * (XLR_NORMAL_MAX_BLOCK_ID + 1));
+ max_registered_buffers = XLR_NORMAL_MAX_BLOCK_ID + 1;
}
- else
+ if (rdatas == NULL)
{
- /* Not a standard page header, don't try to eliminate "hole" */
- bkpb->hole_offset = 0;
- bkpb->hole_length = 0;
+ rdatas = MemoryContextAlloc(xloginsert_cxt,
+ sizeof(XLogRecData) * XLR_NORMAL_RDATAS);
+ max_rdatas = XLR_NORMAL_RDATAS;
}
+
+ /*
+ * Allocate a buffer to hold the header information for a WAL record.
+ */
+ if (hdr_scratch == NULL)
+ hdr_scratch = palloc0(HEADER_SCRATCH_SIZE);
}
diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c
index 7d573cc585d..67d62234369 100644
--- a/src/backend/access/transam/xlogreader.c
+++ b/src/backend/access/transam/xlogreader.c
@@ -37,6 +37,8 @@ report_invalid_record(XLogReaderState *state, const char *fmt,...)
the supplied arguments. */
__attribute__((format(PG_PRINTF_ATTRIBUTE, 2, 3)));
+static void ResetDecoder(XLogReaderState *state);
+
/* size of the buffer allocated for error message. */
#define MAX_ERRORMSG_LEN 1000
@@ -59,46 +61,33 @@ report_invalid_record(XLogReaderState *state, const char *fmt,...)
/*
* Allocate and initialize a new XLogReader.
*
- * Returns NULL if the xlogreader couldn't be allocated.
+ * The returned XLogReader is palloc'd. (In FRONTEND code, that means that
+ * running out-of-memory causes an immediate exit(1).
*/
XLogReaderState *
XLogReaderAllocate(XLogPageReadCB pagereadfunc, void *private_data)
{
XLogReaderState *state;
- AssertArg(pagereadfunc != NULL);
+ state = (XLogReaderState *) palloc0(sizeof(XLogReaderState));
- state = (XLogReaderState *) malloc(sizeof(XLogReaderState));
- if (!state)
- return NULL;
- MemSet(state, 0, sizeof(XLogReaderState));
+ state->max_block_id = -1;
/*
* Permanently allocate readBuf. We do it this way, rather than just
* making a static array, for two reasons: (1) no need to waste the
* storage in most instantiations of the backend; (2) a static char array
- * isn't guaranteed to have any particular alignment, whereas malloc()
+ * isn't guaranteed to have any particular alignment, whereas palloc()
* will provide MAXALIGN'd storage.
*/
- state->readBuf = (char *) malloc(XLOG_BLCKSZ);
- if (!state->readBuf)
- {
- free(state);
- return NULL;
- }
+ state->readBuf = (char *) palloc(XLOG_BLCKSZ);
state->read_page = pagereadfunc;
/* system_identifier initialized to zeroes above */
state->private_data = private_data;
/* ReadRecPtr and EndRecPtr initialized to zeroes above */
/* readSegNo, readOff, readLen, readPageTLI initialized to zeroes above */
- state->errormsg_buf = malloc(MAX_ERRORMSG_LEN + 1);
- if (!state->errormsg_buf)
- {
- free(state->readBuf);
- free(state);
- return NULL;
- }
+ state->errormsg_buf = palloc(MAX_ERRORMSG_LEN + 1);
state->errormsg_buf[0] = '\0';
/*
@@ -107,9 +96,9 @@ XLogReaderAllocate(XLogPageReadCB pagereadfunc, void *private_data)
*/
if (!allocate_recordbuf(state, 0))
{
- free(state->errormsg_buf);
- free(state->readBuf);
- free(state);
+ pfree(state->errormsg_buf);
+ pfree(state->readBuf);
+ pfree(state);
return NULL;
}
@@ -119,11 +108,24 @@ XLogReaderAllocate(XLogPageReadCB pagereadfunc, void *private_data)
void
XLogReaderFree(XLogReaderState *state)
{
- free(state->errormsg_buf);
+ int block_id;
+
+ for (block_id = 0; block_id <= state->max_block_id; block_id++)
+ {
+ if (state->blocks[block_id].in_use)
+ {
+ if (state->blocks[block_id].data)
+ pfree(state->blocks[block_id].data);
+ }
+ }
+ if (state->main_data)
+ pfree(state->main_data);
+
+ pfree(state->errormsg_buf);
if (state->readRecordBuf)
- free(state->readRecordBuf);
- free(state->readBuf);
- free(state);
+ pfree(state->readRecordBuf);
+ pfree(state->readBuf);
+ pfree(state);
}
/*
@@ -146,14 +148,8 @@ allocate_recordbuf(XLogReaderState *state, uint32 reclength)
newSize = Max(newSize, 5 * Max(BLCKSZ, XLOG_BLCKSZ));
if (state->readRecordBuf)
- free(state->readRecordBuf);
- state->readRecordBuf = (char *) malloc(newSize);
- if (!state->readRecordBuf)
- {
- state->readRecordBufSize = 0;
- return false;
- }
-
+ pfree(state->readRecordBuf);
+ state->readRecordBuf = (char *) palloc(newSize);
state->readRecordBufSize = newSize;
return true;
}
@@ -191,6 +187,8 @@ XLogReadRecord(XLogReaderState *state, XLogRecPtr RecPtr, char **errormsg)
*errormsg = NULL;
state->errormsg_buf[0] = '\0';
+ ResetDecoder(state);
+
if (RecPtr == InvalidXLogRecPtr)
{
RecPtr = state->EndRecPtr;
@@ -440,7 +438,10 @@ XLogReadRecord(XLogReaderState *state, XLogRecPtr RecPtr, char **errormsg)
state->EndRecPtr -= state->EndRecPtr % XLogSegSize;
}
- return record;
+ if (DecodeXLogRecord(state, record, errormsg))
+ return record;
+ else
+ return NULL;
err:
@@ -579,30 +580,7 @@ ValidXLogRecordHeader(XLogReaderState *state, XLogRecPtr RecPtr,
XLogRecPtr PrevRecPtr, XLogRecord *record,
bool randAccess)
{
- /*
- * xl_len == 0 is bad data for everything except XLOG SWITCH, where it is
- * required.
- */
- if (record->xl_rmid == RM_XLOG_ID && record->xl_info == XLOG_SWITCH)
- {
- if (record->xl_len != 0)
- {
- report_invalid_record(state,
- "invalid xlog switch record at %X/%X",
- (uint32) (RecPtr >> 32), (uint32) RecPtr);
- return false;
- }
- }
- else if (record->xl_len == 0)
- {
- report_invalid_record(state,
- "record with zero length at %X/%X",
- (uint32) (RecPtr >> 32), (uint32) RecPtr);
- return false;
- }
- if (record->xl_tot_len < SizeOfXLogRecord + record->xl_len ||
- record->xl_tot_len > SizeOfXLogRecord + record->xl_len +
- XLR_MAX_BKP_BLOCKS * (sizeof(BkpBlock) + BLCKSZ))
+ if (record->xl_tot_len < SizeOfXLogRecord)
{
report_invalid_record(state,
"invalid record length at %X/%X",
@@ -663,79 +641,17 @@ ValidXLogRecordHeader(XLogReaderState *state, XLogRecPtr RecPtr,
* We assume all of the record (that is, xl_tot_len bytes) has been read
* into memory at *record. Also, ValidXLogRecordHeader() has accepted the
* record's header, which means in particular that xl_tot_len is at least
- * SizeOfXlogRecord, so it is safe to fetch xl_len.
+ * SizeOfXlogRecord.
*/
static bool
ValidXLogRecord(XLogReaderState *state, XLogRecord *record, XLogRecPtr recptr)
{
pg_crc32 crc;
- int i;
- uint32 len = record->xl_len;
- BkpBlock bkpb;
- char *blk;
- size_t remaining = record->xl_tot_len;
- /* First the rmgr data */
- if (remaining < SizeOfXLogRecord + len)
- {
- /* ValidXLogRecordHeader() should've caught this already... */
- report_invalid_record(state, "invalid record length at %X/%X",
- (uint32) (recptr >> 32), (uint32) recptr);
- return false;
- }
- remaining -= SizeOfXLogRecord + len;
+ /* Calculate the CRC */
INIT_CRC32C(crc);
- COMP_CRC32C(crc, XLogRecGetData(record), len);
-
- /* Add in the backup blocks, if any */
- blk = (char *) XLogRecGetData(record) + len;
- for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++)
- {
- uint32 blen;
-
- if (!(record->xl_info & XLR_BKP_BLOCK(i)))
- continue;
-
- if (remaining < sizeof(BkpBlock))
- {
- report_invalid_record(state,
- "invalid backup block size in record at %X/%X",
- (uint32) (recptr >> 32), (uint32) recptr);
- return false;
- }
- memcpy(&bkpb, blk, sizeof(BkpBlock));
-
- if (bkpb.hole_offset + bkpb.hole_length > BLCKSZ)
- {
- report_invalid_record(state,
- "incorrect hole size in record at %X/%X",
- (uint32) (recptr >> 32), (uint32) recptr);
- return false;
- }
- blen = sizeof(BkpBlock) + BLCKSZ - bkpb.hole_length;
-
- if (remaining < blen)
- {
- report_invalid_record(state,
- "invalid backup block size in record at %X/%X",
- (uint32) (recptr >> 32), (uint32) recptr);
- return false;
- }
- remaining -= blen;
- COMP_CRC32C(crc, blk, blen);
- blk += blen;
- }
-
- /* Check that xl_tot_len agrees with our calculation */
- if (remaining != 0)
- {
- report_invalid_record(state,
- "incorrect total length in record at %X/%X",
- (uint32) (recptr >> 32), (uint32) recptr);
- return false;
- }
-
- /* Finally include the record header */
+ COMP_CRC32C(crc, ((char *) record) + SizeOfXLogRecord, record->xl_tot_len - SizeOfXLogRecord);
+ /* include the record header last */
COMP_CRC32C(crc, (char *) record, offsetof(XLogRecord, xl_crc));
FIN_CRC32C(crc);
@@ -985,3 +901,321 @@ out:
}
#endif /* FRONTEND */
+
+
+/* ----------------------------------------
+ * Functions for decoding the data and block references in a record.
+ * ----------------------------------------
+ */
+
+/* private function to reset the state between records */
+static void
+ResetDecoder(XLogReaderState *state)
+{
+ int block_id;
+
+ state->decoded_record = NULL;
+
+ state->main_data_len = 0;
+
+ for (block_id = 0; block_id <= state->max_block_id; block_id++)
+ {
+ state->blocks[block_id].in_use = false;
+ state->blocks[block_id].has_image = false;
+ state->blocks[block_id].has_data = false;
+ }
+ state->max_block_id = -1;
+}
+
+/*
+ * Decode the previously read record.
+ *
+ * On error, a human-readable error message is returned in *errormsg, and
+ * the return value is false.
+ */
+bool
+DecodeXLogRecord(XLogReaderState *state, XLogRecord *record, char **errormsg)
+{
+ /*
+ * read next _size bytes from record buffer, but check for overrun first.
+ */
+#define COPY_HEADER_FIELD(_dst, _size) \
+ do { \
+ if (remaining < _size) \
+ goto shortdata_err; \
+ memcpy(_dst, ptr, _size); \
+ ptr += _size; \
+ remaining -= _size; \
+ } while(0)
+
+ char *ptr;
+ uint32 remaining;
+ uint32 datatotal;
+ RelFileNode *rnode = NULL;
+ uint8 block_id;
+
+ ResetDecoder(state);
+
+ state->decoded_record = record;
+
+ ptr = (char *) record;
+ ptr += SizeOfXLogRecord;
+ remaining = record->xl_tot_len - SizeOfXLogRecord;
+
+ /* Decode the headers */
+ datatotal = 0;
+ while (remaining > datatotal)
+ {
+ COPY_HEADER_FIELD(&block_id, sizeof(uint8));
+
+ if (block_id == XLR_BLOCK_ID_DATA_SHORT)
+ {
+ /* XLogRecordDataHeaderShort */
+ uint8 main_data_len;
+
+ COPY_HEADER_FIELD(&main_data_len, sizeof(uint8));
+
+ state->main_data_len = main_data_len;
+ datatotal += main_data_len;
+ break; /* by convention, the main data fragment is
+ * always last */
+ }
+ else if (block_id == XLR_BLOCK_ID_DATA_LONG)
+ {
+ /* XLogRecordDataHeaderLong */
+ uint32 main_data_len;
+
+ COPY_HEADER_FIELD(&main_data_len, sizeof(uint32));
+ state->main_data_len = main_data_len;
+ datatotal += main_data_len;
+ break; /* by convention, the main data fragment is
+ * always last */
+ }
+ else if (block_id <= XLR_MAX_BLOCK_ID)
+ {
+ /* XLogRecordBlockHeader */
+ DecodedBkpBlock *blk;
+ uint8 fork_flags;
+
+ if (block_id <= state->max_block_id)
+ {
+ report_invalid_record(state,
+ "out-of-order block_id %u at %X/%X",
+ block_id,
+ (uint32) (state->ReadRecPtr >> 32),
+ (uint32) state->ReadRecPtr);
+ goto err;
+ }
+ state->max_block_id = block_id;
+
+ blk = &state->blocks[block_id];
+ blk->in_use = true;
+
+ COPY_HEADER_FIELD(&fork_flags, sizeof(uint8));
+ blk->forknum = fork_flags & BKPBLOCK_FORK_MASK;
+ blk->flags = fork_flags;
+ blk->has_image = ((fork_flags & BKPBLOCK_HAS_IMAGE) != 0);
+ blk->has_data = ((fork_flags & BKPBLOCK_HAS_DATA) != 0);
+
+ COPY_HEADER_FIELD(&blk->data_len, sizeof(uint16));
+ /* cross-check that the HAS_DATA flag is set iff data_length > 0 */
+ if (blk->has_data && blk->data_len == 0)
+ report_invalid_record(state,
+ "BKPBLOCK_HAS_DATA set, but no data included at %X/%X",
+ (uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr);
+ if (!blk->has_data && blk->data_len != 0)
+ report_invalid_record(state,
+ "BKPBLOCK_HAS_DATA not set, but data length is %u at %X/%X",
+ (unsigned int) blk->data_len,
+ (uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr);
+ datatotal += blk->data_len;
+
+ if (blk->has_image)
+ {
+ COPY_HEADER_FIELD(&blk->hole_offset, sizeof(uint16));
+ COPY_HEADER_FIELD(&blk->hole_length, sizeof(uint16));
+ datatotal += BLCKSZ - blk->hole_length;
+ }
+ if (!(fork_flags & BKPBLOCK_SAME_REL))
+ {
+ COPY_HEADER_FIELD(&blk->rnode, sizeof(RelFileNode));
+ rnode = &blk->rnode;
+ }
+ else
+ {
+ if (rnode == NULL)
+ {
+ report_invalid_record(state,
+ "BKPBLOCK_SAME_REL set but no previous rel at %X/%X",
+ (uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr);
+ goto err;
+ }
+
+ blk->rnode = *rnode;
+ }
+ COPY_HEADER_FIELD(&blk->blkno, sizeof(BlockNumber));
+ }
+ else
+ {
+ report_invalid_record(state,
+ "invalid block_id %u at %X/%X",
+ block_id,
+ (uint32) (state->ReadRecPtr >> 32),
+ (uint32) state->ReadRecPtr);
+ goto err;
+ }
+ }
+
+ if (remaining != datatotal)
+ goto shortdata_err;
+
+ /*
+ * Ok, we've parsed the fragment headers, and verified that the total
+ * length of the payload in the fragments is equal to the amount of data
+ * left. Copy the data of each fragment to a separate buffer.
+ *
+ * We could just set up pointers into readRecordBuf, but we want to align
+ * the data for the convenience of the callers. Backup images are not
+ * copied, however; they don't need alignment.
+ */
+
+ /* block data first */
+ for (block_id = 0; block_id <= state->max_block_id; block_id++)
+ {
+ DecodedBkpBlock *blk = &state->blocks[block_id];
+
+ if (!blk->in_use)
+ continue;
+ if (blk->has_image)
+ {
+ blk->bkp_image = ptr;
+ ptr += BLCKSZ - blk->hole_length;
+ }
+ if (blk->has_data)
+ {
+ if (!blk->data || blk->data_len > blk->data_bufsz)
+ {
+ if (blk->data)
+ pfree(blk->data);
+ blk->data_bufsz = blk->data_len;
+ blk->data = palloc(blk->data_bufsz);
+ }
+ memcpy(blk->data, ptr, blk->data_len);
+ ptr += blk->data_len;
+ }
+ }
+
+ /* and finally, the main data */
+ if (state->main_data_len > 0)
+ {
+ if (!state->main_data || state->main_data_len > state->main_data_bufsz)
+ {
+ if (state->main_data)
+ pfree(state->main_data);
+ state->main_data_bufsz = state->main_data_len;
+ state->main_data = palloc(state->main_data_bufsz);
+ }
+ memcpy(state->main_data, ptr, state->main_data_len);
+ ptr += state->main_data_len;
+ }
+
+ return true;
+
+shortdata_err:
+ report_invalid_record(state,
+ "record with invalid length at %X/%X",
+ (uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr);
+err:
+ *errormsg = state->errormsg_buf;
+
+ return false;
+}
+
+/*
+ * Returns information about the block that a block reference refers to.
+ *
+ * If the WAL record contains a block reference with the given ID, *rnode,
+ * *forknum, and *blknum are filled in (if not NULL), and returns TRUE.
+ * Otherwise returns FALSE.
+ */
+bool
+XLogRecGetBlockTag(XLogReaderState *record, uint8 block_id,
+ RelFileNode *rnode, ForkNumber *forknum, BlockNumber *blknum)
+{
+ DecodedBkpBlock *bkpb;
+
+ if (!record->blocks[block_id].in_use)
+ return false;
+
+ bkpb = &record->blocks[block_id];
+ if (rnode)
+ *rnode = bkpb->rnode;
+ if (forknum)
+ *forknum = bkpb->forknum;
+ if (blknum)
+ *blknum = bkpb->blkno;
+ return true;
+}
+
+/*
+ * Returns the data associated with a block reference, or NULL if there is
+ * no data (e.g. because a full-page image was taken instead). The returned
+ * pointer points to a MAXALIGNed buffer.
+ */
+char *
+XLogRecGetBlockData(XLogReaderState *record, uint8 block_id, Size *len)
+{
+ DecodedBkpBlock *bkpb;
+
+ if (!record->blocks[block_id].in_use)
+ return NULL;
+
+ bkpb = &record->blocks[block_id];
+
+ if (!bkpb->has_data)
+ {
+ if (len)
+ *len = 0;
+ return NULL;
+ }
+ else
+ {
+ if (len)
+ *len = bkpb->data_len;
+ return bkpb->data;
+ }
+}
+
+/*
+ * Restore a full-page image from a backup block attached to an XLOG record.
+ *
+ * Returns the buffer number containing the page.
+ */
+bool
+RestoreBlockImage(XLogReaderState *record, uint8 block_id, char *page)
+{
+ DecodedBkpBlock *bkpb;
+
+ if (!record->blocks[block_id].in_use)
+ return false;
+ if (!record->blocks[block_id].has_image)
+ return false;
+
+ bkpb = &record->blocks[block_id];
+
+ if (bkpb->hole_length == 0)
+ {
+ memcpy(page, bkpb->bkp_image, BLCKSZ);
+ }
+ else
+ {
+ memcpy(page, bkpb->bkp_image, bkpb->hole_offset);
+ /* must zero-fill the hole */
+ MemSet(page + bkpb->hole_offset, 0, bkpb->hole_length);
+ memcpy(page + (bkpb->hole_offset + bkpb->hole_length),
+ bkpb->bkp_image + bkpb->hole_offset,
+ BLCKSZ - (bkpb->hole_offset + bkpb->hole_length));
+ }
+
+ return true;
+}
diff --git a/src/backend/access/transam/xlogutils.c b/src/backend/access/transam/xlogutils.c
index cf04081c19e..ae323a0db87 100644
--- a/src/backend/access/transam/xlogutils.c
+++ b/src/backend/access/transam/xlogutils.c
@@ -253,9 +253,8 @@ XLogCheckInvalidPages(void)
*
* 'lsn' is the LSN of the record being replayed. It is compared with the
* page's LSN to determine if the record has already been replayed.
- * 'rnode' and 'blkno' point to the block being replayed (main fork number
- * is implied, use XLogReadBufferForRedoExtended for other forks).
- * 'block_index' identifies the backup block in the record for the page.
+ * 'block_id' is the ID number the block was registered with, when the WAL
+ * record was created.
*
* Returns one of the following:
*
@@ -272,15 +271,36 @@ XLogCheckInvalidPages(void)
* single-process crash recovery, but some subroutines such as MarkBufferDirty
* will complain if we don't have the lock. In hot standby mode it's
* definitely necessary.)
+ *
+ * Note: when a backup block is available in XLOG, we restore it
+ * unconditionally, even if the page in the database appears newer. This is
+ * to protect ourselves against database pages that were partially or
+ * incorrectly written during a crash. We assume that the XLOG data must be
+ * good because it has passed a CRC check, while the database page might not
+ * be. This will force us to replay all subsequent modifications of the page
+ * that appear in XLOG, rather than possibly ignoring them as already
+ * applied, but that's not a huge drawback.
*/
XLogRedoAction
-XLogReadBufferForRedo(XLogRecPtr lsn, XLogRecord *record, int block_index,
- RelFileNode rnode, BlockNumber blkno,
+XLogReadBufferForRedo(XLogReaderState *record, uint8 block_id,
Buffer *buf)
{
- return XLogReadBufferForRedoExtended(lsn, record, block_index,
- rnode, MAIN_FORKNUM, blkno,
- RBM_NORMAL, false, buf);
+ return XLogReadBufferForRedoExtended(record, block_id, RBM_NORMAL,
+ false, buf);
+}
+
+/*
+ * Pin and lock a buffer referenced by a WAL record, for the purpose of
+ * re-initializing it.
+ */
+Buffer
+XLogInitBufferForRedo(XLogReaderState *record, uint8 block_id)
+{
+ Buffer buf;
+
+ XLogReadBufferForRedoExtended(record, block_id, RBM_ZERO_AND_LOCK, false,
+ &buf);
+ return buf;
}
/*
@@ -299,21 +319,54 @@ XLogReadBufferForRedo(XLogRecPtr lsn, XLogRecord *record, int block_index,
* using LockBufferForCleanup(), instead of a regular exclusive lock.
*/
XLogRedoAction
-XLogReadBufferForRedoExtended(XLogRecPtr lsn, XLogRecord *record,
- int block_index, RelFileNode rnode,
- ForkNumber forkno, BlockNumber blkno,
+XLogReadBufferForRedoExtended(XLogReaderState *record,
+ uint8 block_id,
ReadBufferMode mode, bool get_cleanup_lock,
Buffer *buf)
{
- if (record->xl_info & XLR_BKP_BLOCK(block_index))
+ XLogRecPtr lsn = record->EndRecPtr;
+ RelFileNode rnode;
+ ForkNumber forknum;
+ BlockNumber blkno;
+ Page page;
+
+ if (!XLogRecGetBlockTag(record, block_id, &rnode, &forknum, &blkno))
+ {
+ /* Caller specified a bogus block_id */
+ elog(PANIC, "failed to locate backup block with ID %d", block_id);
+ }
+
+ /* If it's a full-page image, restore it. */
+ if (XLogRecHasBlockImage(record, block_id))
{
- *buf = RestoreBackupBlock(lsn, record, block_index,
- get_cleanup_lock, true);
+ *buf = XLogReadBufferExtended(rnode, forknum, blkno,
+ get_cleanup_lock ? RBM_ZERO_AND_CLEANUP_LOCK : RBM_ZERO_AND_LOCK);
+ page = BufferGetPage(*buf);
+ if (!RestoreBlockImage(record, block_id, page))
+ elog(ERROR, "failed to restore block image");
+
+ /*
+ * The page may be uninitialized. If so, we can't set the LSN because
+ * that would corrupt the page.
+ */
+ if (!PageIsNew(page))
+ {
+ PageSetLSN(page, lsn);
+ }
+
+ MarkBufferDirty(*buf);
+
return BLK_RESTORED;
}
else
{
- *buf = XLogReadBufferExtended(rnode, forkno, blkno, mode);
+ if ((record->blocks[block_id].flags & BKPBLOCK_WILL_INIT) != 0 &&
+ mode != RBM_ZERO_AND_LOCK && mode != RBM_ZERO_AND_CLEANUP_LOCK)
+ {
+ elog(PANIC, "block with WILL_INIT flag in WAL record must be zeroed by redo routine");
+ }
+
+ *buf = XLogReadBufferExtended(rnode, forknum, blkno, mode);
if (BufferIsValid(*buf))
{
if (mode != RBM_ZERO_AND_LOCK && mode != RBM_ZERO_AND_CLEANUP_LOCK)
@@ -334,37 +387,6 @@ XLogReadBufferForRedoExtended(XLogRecPtr lsn, XLogRecord *record,
}
/*
- * XLogReadBuffer
- * Read a page during XLOG replay.
- *
- * This is a shorthand of XLogReadBufferExtended() followed by
- * LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE), for reading from the main
- * fork.
- *
- * (Getting the buffer lock is not really necessary during single-process
- * crash recovery, but some subroutines such as MarkBufferDirty will complain
- * if we don't have the lock. In hot standby mode it's definitely necessary.)
- *
- * The returned buffer is exclusively-locked.
- *
- * For historical reasons, instead of a ReadBufferMode argument, this only
- * supports RBM_ZERO_AND_LOCK (init == true) and RBM_NORMAL (init == false)
- * modes.
- */
-Buffer
-XLogReadBuffer(RelFileNode rnode, BlockNumber blkno, bool init)
-{
- Buffer buf;
-
- buf = XLogReadBufferExtended(rnode, MAIN_FORKNUM, blkno,
- init ? RBM_ZERO_AND_LOCK : RBM_NORMAL);
- if (BufferIsValid(buf) && !init)
- LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
-
- return buf;
-}
-
-/*
* XLogReadBufferExtended
* Read a page during XLOG replay
*
@@ -383,6 +405,11 @@ XLogReadBuffer(RelFileNode rnode, BlockNumber blkno, bool init)
* In RBM_NORMAL_NO_LOG mode, we return InvalidBuffer if the page doesn't
* exist, and we don't check for all-zeroes. Thus, no log entry is made
* to imply that the page should be dropped or truncated later.
+ *
+ * NB: A redo function should normally not call this directly. To get a page
+ * to modify, use XLogReplayBuffer instead. It is important that all pages
+ * modified by a WAL record are registered in the WAL records, or they will be
+ * invisible to tools that that need to know which pages are modified.
*/
Buffer
XLogReadBufferExtended(RelFileNode rnode, ForkNumber forknum,
@@ -474,124 +501,6 @@ XLogReadBufferExtended(RelFileNode rnode, ForkNumber forknum,
}
/*
- * Restore a full-page image from a backup block attached to an XLOG record.
- *
- * lsn: LSN of the XLOG record being replayed
- * record: the complete XLOG record
- * block_index: which backup block to restore (0 .. XLR_MAX_BKP_BLOCKS - 1)
- * get_cleanup_lock: TRUE to get a cleanup rather than plain exclusive lock
- * keep_buffer: TRUE to return the buffer still locked and pinned
- *
- * Returns the buffer number containing the page. Note this is not terribly
- * useful unless keep_buffer is specified as TRUE.
- *
- * Note: when a backup block is available in XLOG, we restore it
- * unconditionally, even if the page in the database appears newer.
- * This is to protect ourselves against database pages that were partially
- * or incorrectly written during a crash. We assume that the XLOG data
- * must be good because it has passed a CRC check, while the database
- * page might not be. This will force us to replay all subsequent
- * modifications of the page that appear in XLOG, rather than possibly
- * ignoring them as already applied, but that's not a huge drawback.
- *
- * If 'get_cleanup_lock' is true, a cleanup lock is obtained on the buffer,
- * else a normal exclusive lock is used. During crash recovery, that's just
- * pro forma because there can't be any regular backends in the system, but
- * in hot standby mode the distinction is important.
- *
- * If 'keep_buffer' is true, return without releasing the buffer lock and pin;
- * then caller is responsible for doing UnlockReleaseBuffer() later. This
- * is needed in some cases when replaying XLOG records that touch multiple
- * pages, to prevent inconsistent states from being visible to other backends.
- * (Again, that's only important in hot standby mode.)
- */
-Buffer
-RestoreBackupBlock(XLogRecPtr lsn, XLogRecord *record, int block_index,
- bool get_cleanup_lock, bool keep_buffer)
-{
- BkpBlock bkpb;
- char *blk;
- int i;
-
- /* Locate requested BkpBlock in the record */
- blk = (char *) XLogRecGetData(record) + record->xl_len;
- for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++)
- {
- if (!(record->xl_info & XLR_BKP_BLOCK(i)))
- continue;
-
- memcpy(&bkpb, blk, sizeof(BkpBlock));
- blk += sizeof(BkpBlock);
-
- if (i == block_index)
- {
- /* Found it, apply the update */
- return RestoreBackupBlockContents(lsn, bkpb, blk, get_cleanup_lock,
- keep_buffer);
- }
-
- blk += BLCKSZ - bkpb.hole_length;
- }
-
- /* Caller specified a bogus block_index */
- elog(ERROR, "failed to restore block_index %d", block_index);
- return InvalidBuffer; /* keep compiler quiet */
-}
-
-/*
- * Workhorse for RestoreBackupBlock usable without an xlog record
- *
- * Restores a full-page image from BkpBlock and a data pointer.
- */
-Buffer
-RestoreBackupBlockContents(XLogRecPtr lsn, BkpBlock bkpb, char *blk,
- bool get_cleanup_lock, bool keep_buffer)
-{
- Buffer buffer;
- Page page;
-
- buffer = XLogReadBufferExtended(bkpb.node, bkpb.fork, bkpb.block,
- get_cleanup_lock ? RBM_ZERO_AND_CLEANUP_LOCK : RBM_ZERO_AND_LOCK);
- Assert(BufferIsValid(buffer));
-
- page = (Page) BufferGetPage(buffer);
-
- if (bkpb.hole_length == 0)
- {
- memcpy((char *) page, blk, BLCKSZ);
- }
- else
- {
- memcpy((char *) page, blk, bkpb.hole_offset);
- /* must zero-fill the hole */
- MemSet((char *) page + bkpb.hole_offset, 0, bkpb.hole_length);
- memcpy((char *) page + (bkpb.hole_offset + bkpb.hole_length),
- blk + bkpb.hole_offset,
- BLCKSZ - (bkpb.hole_offset + bkpb.hole_length));
- }
-
- /*
- * The checksum value on this page is currently invalid. We don't need to
- * reset it here since it will be set before being written.
- */
-
- /*
- * The page may be uninitialized. If so, we can't set the LSN because that
- * would corrupt the page.
- */
- if (!PageIsNew(page))
- {
- PageSetLSN(page, lsn);
- }
- MarkBufferDirty(buffer);
-
- if (!keep_buffer)
- UnlockReleaseBuffer(buffer);
-
- return buffer;
-}
-
-/*
* Struct actually returned by XLogFakeRelcacheEntry, though the declared
* return type is Relation.
*/