diff options
author | Heikki Linnakangas <heikki.linnakangas@iki.fi> | 2014-11-20 17:56:26 +0200 |
---|---|---|
committer | Heikki Linnakangas <heikki.linnakangas@iki.fi> | 2014-11-20 18:46:41 +0200 |
commit | 2c03216d831160bedd72d45f712601b6f7d03f1c (patch) | |
tree | ab6a03d031ffa605d848b0b7067add15e56e2207 /src/backend/access/spgist/spgdoinsert.c | |
parent | 8dc626defec23016dd5988208d8704b858b9d21d (diff) | |
download | postgresql-2c03216d831160bedd72d45f712601b6f7d03f1c.tar.gz postgresql-2c03216d831160bedd72d45f712601b6f7d03f1c.zip |
Revamp the WAL record format.
Each WAL record now carries information about the modified relation and
block(s) in a standardized format. That makes it easier to write tools that
need that information, like pg_rewind, prefetching the blocks to speed up
recovery, etc.
There's a whole new API for building WAL records, replacing the XLogRecData
chains used previously. The new API consists of XLogRegister* functions,
which are called for each buffer and chunk of data that is added to the
record. The new API also gives more control over when a full-page image is
written, by passing flags to the XLogRegisterBuffer function.
This also simplifies the XLogReadBufferForRedo() calls. The function can dig
the relation and block number from the WAL record, so they no longer need to
be passed as arguments.
For the convenience of redo routines, XLogReader now disects each WAL record
after reading it, copying the main data part and the per-block data into
MAXALIGNed buffers. The data chunks are not aligned within the WAL record,
but the redo routines can assume that the pointers returned by XLogRecGet*
functions are. Redo routines are now passed the XLogReaderState, which
contains the record in the already-disected format, instead of the plain
XLogRecord.
The new record format also makes the fixed size XLogRecord header smaller,
by removing the xl_len field. The length of the "main data" portion is now
stored at the end of the WAL record, and there's a separate header after
XLogRecord for it. The alignment padding at the end of XLogRecord is also
removed. This compansates for the fact that the new format would otherwise
be more bulky than the old format.
Reviewed by Andres Freund, Amit Kapila, Michael Paquier, Alvaro Herrera,
Fujii Masao.
Diffstat (limited to 'src/backend/access/spgist/spgdoinsert.c')
-rw-r--r-- | src/backend/access/spgist/spgdoinsert.c | 243 |
1 files changed, 125 insertions, 118 deletions
diff --git a/src/backend/access/spgist/spgdoinsert.c b/src/backend/access/spgist/spgdoinsert.c index 21a071ab199..1a17cc467ed 100644 --- a/src/backend/access/spgist/spgdoinsert.c +++ b/src/backend/access/spgist/spgdoinsert.c @@ -16,8 +16,8 @@ #include "postgres.h" #include "access/genam.h" -#include "access/xloginsert.h" #include "access/spgist_private.h" +#include "access/xloginsert.h" #include "miscadmin.h" #include "storage/bufmgr.h" #include "utils/rel.h" @@ -202,25 +202,17 @@ static void addLeafTuple(Relation index, SpGistState *state, SpGistLeafTuple leafTuple, SPPageDesc *current, SPPageDesc *parent, bool isNulls, bool isNew) { - XLogRecData rdata[4]; spgxlogAddLeaf xlrec; - xlrec.node = index->rd_node; - xlrec.blknoLeaf = current->blkno; xlrec.newPage = isNew; xlrec.storesNulls = isNulls; /* these will be filled below as needed */ xlrec.offnumLeaf = InvalidOffsetNumber; xlrec.offnumHeadLeaf = InvalidOffsetNumber; - xlrec.blknoParent = InvalidBlockNumber; xlrec.offnumParent = InvalidOffsetNumber; xlrec.nodeI = 0; - ACCEPT_RDATA_DATA(&xlrec, sizeof(xlrec), 0); - ACCEPT_RDATA_DATA(leafTuple, leafTuple->size, 1); - ACCEPT_RDATA_BUFFER(current->buffer, 2); - START_CRIT_SECTION(); if (current->offnum == InvalidOffsetNumber || @@ -237,13 +229,10 @@ addLeafTuple(Relation index, SpGistState *state, SpGistLeafTuple leafTuple, /* Must update parent's downlink if any */ if (parent->buffer != InvalidBuffer) { - xlrec.blknoParent = parent->blkno; xlrec.offnumParent = parent->offnum; xlrec.nodeI = parent->node; saveNodeLink(index, parent, current->blkno, current->offnum); - - ACCEPT_RDATA_BUFFER(parent->buffer, 3); } } else @@ -303,12 +292,20 @@ addLeafTuple(Relation index, SpGistState *state, SpGistLeafTuple leafTuple, { XLogRecPtr recptr; - recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_ADD_LEAF, rdata); + XLogBeginInsert(); + XLogRegisterData((char *) &xlrec, sizeof(xlrec)); + XLogRegisterData((char *) leafTuple, leafTuple->size); + + XLogRegisterBuffer(0, current->buffer, REGBUF_STANDARD); + if (xlrec.offnumParent != InvalidOffsetNumber) + XLogRegisterBuffer(1, parent->buffer, REGBUF_STANDARD); + + recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_ADD_LEAF); PageSetLSN(current->page, recptr); /* update parent only if we actually changed it */ - if (xlrec.blknoParent != InvalidBlockNumber) + if (xlrec.offnumParent != InvalidOffsetNumber) { PageSetLSN(parent->page, recptr); } @@ -399,7 +396,6 @@ moveLeafs(Relation index, SpGistState *state, OffsetNumber *toDelete; OffsetNumber *toInsert; BlockNumber nblkno; - XLogRecData rdata[7]; spgxlogMoveLeafs xlrec; char *leafdata, *leafptr; @@ -455,20 +451,6 @@ moveLeafs(Relation index, SpGistState *state, nblkno = BufferGetBlockNumber(nbuf); Assert(nblkno != current->blkno); - /* prepare WAL info */ - xlrec.node = index->rd_node; - STORE_STATE(state, xlrec.stateSrc); - - xlrec.blknoSrc = current->blkno; - xlrec.blknoDst = nblkno; - xlrec.nMoves = nDelete; - xlrec.replaceDead = replaceDead; - xlrec.storesNulls = isNulls; - - xlrec.blknoParent = parent->blkno; - xlrec.offnumParent = parent->offnum; - xlrec.nodeI = parent->node; - leafdata = leafptr = palloc(size); START_CRIT_SECTION(); @@ -533,15 +515,29 @@ moveLeafs(Relation index, SpGistState *state, { XLogRecPtr recptr; - ACCEPT_RDATA_DATA(&xlrec, SizeOfSpgxlogMoveLeafs, 0); - ACCEPT_RDATA_DATA(toDelete, sizeof(OffsetNumber) * nDelete, 1); - ACCEPT_RDATA_DATA(toInsert, sizeof(OffsetNumber) * nInsert, 2); - ACCEPT_RDATA_DATA(leafdata, leafptr - leafdata, 3); - ACCEPT_RDATA_BUFFER(current->buffer, 4); - ACCEPT_RDATA_BUFFER(nbuf, 5); - ACCEPT_RDATA_BUFFER(parent->buffer, 6); + /* prepare WAL info */ + STORE_STATE(state, xlrec.stateSrc); - recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_MOVE_LEAFS, rdata); + xlrec.nMoves = nDelete; + xlrec.replaceDead = replaceDead; + xlrec.storesNulls = isNulls; + + xlrec.offnumParent = parent->offnum; + xlrec.nodeI = parent->node; + + XLogBeginInsert(); + XLogRegisterData((char *) &xlrec, SizeOfSpgxlogMoveLeafs); + XLogRegisterData((char *) toDelete, + sizeof(OffsetNumber) * nDelete); + XLogRegisterData((char *) toInsert, + sizeof(OffsetNumber) * nInsert); + XLogRegisterData((char *) leafdata, leafptr - leafdata); + + XLogRegisterBuffer(0, current->buffer, REGBUF_STANDARD); + XLogRegisterBuffer(1, nbuf, REGBUF_STANDARD | (xlrec.newPage ? REGBUF_WILL_INIT : 0)); + XLogRegisterBuffer(2, parent->buffer, REGBUF_STANDARD); + + recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_MOVE_LEAFS); PageSetLSN(current->page, recptr); PageSetLSN(npage, recptr); @@ -701,8 +697,6 @@ doPickSplit(Relation index, SpGistState *state, int currentFreeSpace; int totalLeafSizes; bool allTheSame; - XLogRecData rdata[10]; - int nRdata; spgxlogPickSplit xlrec; char *leafdata, *leafptr; @@ -725,7 +719,6 @@ doPickSplit(Relation index, SpGistState *state, newLeafs = (SpGistLeafTuple *) palloc(sizeof(SpGistLeafTuple) * n); leafPageSelect = (uint8 *) palloc(sizeof(uint8) * n); - xlrec.node = index->rd_node; STORE_STATE(state, xlrec.stateSrc); /* @@ -971,10 +964,6 @@ doPickSplit(Relation index, SpGistState *state, } /* - * Because a WAL record can't involve more than four buffers, we can only - * afford to deal with two leaf pages in each picksplit action, ie the - * current page and at most one other. - * * The new leaf tuples converted from the existing ones should require the * same or less space, and therefore should all fit onto one page * (although that's not necessarily the current page, since we can't @@ -1108,17 +1097,13 @@ doPickSplit(Relation index, SpGistState *state, } /* Start preparing WAL record */ - xlrec.blknoSrc = current->blkno; - xlrec.blknoDest = InvalidBlockNumber; xlrec.nDelete = 0; xlrec.initSrc = isNew; xlrec.storesNulls = isNulls; + xlrec.isRootSplit = SpGistBlockIsRoot(current->blkno); leafdata = leafptr = (char *) palloc(totalLeafSizes); - ACCEPT_RDATA_DATA(&xlrec, SizeOfSpgxlogPickSplit, 0); - nRdata = 1; - /* Here we begin making the changes to the target pages */ START_CRIT_SECTION(); @@ -1150,12 +1135,6 @@ doPickSplit(Relation index, SpGistState *state, else { xlrec.nDelete = nToDelete; - ACCEPT_RDATA_DATA(toDelete, - sizeof(OffsetNumber) * nToDelete, - nRdata); - nRdata++; - ACCEPT_RDATA_BUFFER(current->buffer, nRdata); - nRdata++; if (!state->isBuild) { @@ -1240,25 +1219,8 @@ doPickSplit(Relation index, SpGistState *state, if (newLeafBuffer != InvalidBuffer) { MarkBufferDirty(newLeafBuffer); - /* also save block number for WAL */ - xlrec.blknoDest = BufferGetBlockNumber(newLeafBuffer); - if (!xlrec.initDest) - { - ACCEPT_RDATA_BUFFER(newLeafBuffer, nRdata); - nRdata++; - } } - xlrec.nInsert = nToInsert; - ACCEPT_RDATA_DATA(toInsert, sizeof(OffsetNumber) * nToInsert, nRdata); - nRdata++; - ACCEPT_RDATA_DATA(leafPageSelect, sizeof(uint8) * nToInsert, nRdata); - nRdata++; - ACCEPT_RDATA_DATA(innerTuple, innerTuple->size, nRdata); - nRdata++; - ACCEPT_RDATA_DATA(leafdata, leafptr - leafdata, nRdata); - nRdata++; - /* Remember current buffer, since we're about to change "current" */ saveCurrent = *current; @@ -1276,7 +1238,6 @@ doPickSplit(Relation index, SpGistState *state, current->blkno = parent->blkno; current->buffer = parent->buffer; current->page = parent->page; - xlrec.blknoInner = current->blkno; xlrec.offnumInner = current->offnum = SpGistPageAddNewItem(state, current->page, (Item) innerTuple, innerTuple->size, @@ -1285,14 +1246,11 @@ doPickSplit(Relation index, SpGistState *state, /* * Update parent node link and mark parent page dirty */ - xlrec.blknoParent = parent->blkno; + xlrec.innerIsParent = true; xlrec.offnumParent = parent->offnum; xlrec.nodeI = parent->node; saveNodeLink(index, parent, current->blkno, current->offnum); - ACCEPT_RDATA_BUFFER(parent->buffer, nRdata); - nRdata++; - /* * Update redirection link (in old current buffer) */ @@ -1314,7 +1272,6 @@ doPickSplit(Relation index, SpGistState *state, current->buffer = newInnerBuffer; current->blkno = BufferGetBlockNumber(current->buffer); current->page = BufferGetPage(current->buffer); - xlrec.blknoInner = current->blkno; xlrec.offnumInner = current->offnum = SpGistPageAddNewItem(state, current->page, (Item) innerTuple, innerTuple->size, @@ -1326,16 +1283,11 @@ doPickSplit(Relation index, SpGistState *state, /* * Update parent node link and mark parent page dirty */ - xlrec.blknoParent = parent->blkno; + xlrec.innerIsParent = (parent->buffer == current->buffer); xlrec.offnumParent = parent->offnum; xlrec.nodeI = parent->node; saveNodeLink(index, parent, current->blkno, current->offnum); - ACCEPT_RDATA_BUFFER(current->buffer, nRdata); - nRdata++; - ACCEPT_RDATA_BUFFER(parent->buffer, nRdata); - nRdata++; - /* * Update redirection link (in old current buffer) */ @@ -1357,8 +1309,8 @@ doPickSplit(Relation index, SpGistState *state, SpGistInitBuffer(current->buffer, (isNulls ? SPGIST_NULLS : 0)); xlrec.initInner = true; + xlrec.innerIsParent = false; - xlrec.blknoInner = current->blkno; xlrec.offnumInner = current->offnum = PageAddItem(current->page, (Item) innerTuple, innerTuple->size, InvalidOffsetNumber, false, false); @@ -1367,7 +1319,6 @@ doPickSplit(Relation index, SpGistState *state, innerTuple->size); /* No parent link to update, nor redirection to do */ - xlrec.blknoParent = InvalidBlockNumber; xlrec.offnumParent = InvalidOffsetNumber; xlrec.nodeI = 0; @@ -1381,9 +1332,46 @@ doPickSplit(Relation index, SpGistState *state, if (RelationNeedsWAL(index)) { XLogRecPtr recptr; + int flags; + + XLogBeginInsert(); + + xlrec.nInsert = nToInsert; + XLogRegisterData((char *) &xlrec, SizeOfSpgxlogPickSplit); + + XLogRegisterData((char *) toDelete, + sizeof(OffsetNumber) * xlrec.nDelete); + XLogRegisterData((char *) toInsert, + sizeof(OffsetNumber) * xlrec.nInsert); + XLogRegisterData((char *) leafPageSelect, + sizeof(uint8) * xlrec.nInsert); + XLogRegisterData((char *) innerTuple, innerTuple->size); + XLogRegisterData(leafdata, leafptr - leafdata); + + flags = REGBUF_STANDARD; + if (xlrec.initSrc) + flags |= REGBUF_WILL_INIT; + if (BufferIsValid(saveCurrent.buffer)) + XLogRegisterBuffer(0, saveCurrent.buffer, flags); + + if (BufferIsValid(newLeafBuffer)) + { + flags = REGBUF_STANDARD; + if (xlrec.initDest) + flags |= REGBUF_WILL_INIT; + XLogRegisterBuffer(1, newLeafBuffer, flags); + } + XLogRegisterBuffer(2, current->buffer, REGBUF_STANDARD); + if (parent->buffer != InvalidBuffer) + { + if (parent->buffer != current->buffer) + XLogRegisterBuffer(3, parent->buffer, REGBUF_STANDARD); + else + Assert(xlrec.innerIsParent); + } /* Issue the WAL record */ - recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_PICKSPLIT, rdata); + recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_PICKSPLIT); /* Update page LSNs on all affected pages */ if (newLeafBuffer != InvalidBuffer) @@ -1489,7 +1477,6 @@ spgAddNodeAction(Relation index, SpGistState *state, int nodeN, Datum nodeLabel) { SpGistInnerTuple newInnerTuple; - XLogRecData rdata[5]; spgxlogAddNode xlrec; /* Should not be applied to nulls */ @@ -1499,25 +1486,18 @@ spgAddNodeAction(Relation index, SpGistState *state, newInnerTuple = addNode(state, innerTuple, nodeLabel, nodeN); /* Prepare WAL record */ - xlrec.node = index->rd_node; STORE_STATE(state, xlrec.stateSrc); - xlrec.blkno = current->blkno; xlrec.offnum = current->offnum; /* we don't fill these unless we need to change the parent downlink */ - xlrec.blknoParent = InvalidBlockNumber; + xlrec.parentBlk = -1; xlrec.offnumParent = InvalidOffsetNumber; xlrec.nodeI = 0; /* we don't fill these unless tuple has to be moved */ - xlrec.blknoNew = InvalidBlockNumber; xlrec.offnumNew = InvalidOffsetNumber; xlrec.newPage = false; - ACCEPT_RDATA_DATA(&xlrec, sizeof(xlrec), 0); - ACCEPT_RDATA_DATA(newInnerTuple, newInnerTuple->size, 1); - ACCEPT_RDATA_BUFFER(current->buffer, 2); - if (PageGetExactFreeSpace(current->page) >= newInnerTuple->size - innerTuple->size) { @@ -1539,7 +1519,13 @@ spgAddNodeAction(Relation index, SpGistState *state, { XLogRecPtr recptr; - recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_ADD_NODE, rdata); + XLogBeginInsert(); + XLogRegisterData((char *) &xlrec, sizeof(xlrec)); + XLogRegisterData((char *) newInnerTuple, newInnerTuple->size); + + XLogRegisterBuffer(0, current->buffer, REGBUF_STANDARD); + + recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_ADD_NODE); PageSetLSN(current->page, recptr); } @@ -1565,7 +1551,6 @@ spgAddNodeAction(Relation index, SpGistState *state, saveCurrent = *current; - xlrec.blknoParent = parent->blkno; xlrec.offnumParent = parent->offnum; xlrec.nodeI = parent->node; @@ -1580,8 +1565,6 @@ spgAddNodeAction(Relation index, SpGistState *state, current->blkno = BufferGetBlockNumber(current->buffer); current->page = BufferGetPage(current->buffer); - xlrec.blknoNew = current->blkno; - /* * Let's just make real sure new current isn't same as old. Right now * that's impossible, but if SpGistGetBuffer ever got smart enough to @@ -1590,17 +1573,19 @@ spgAddNodeAction(Relation index, SpGistState *state, * replay would be subtly wrong, so I think a mere assert isn't enough * here. */ - if (xlrec.blknoNew == xlrec.blkno) + if (current->blkno == saveCurrent.blkno) elog(ERROR, "SPGiST new buffer shouldn't be same as old buffer"); /* * New current and parent buffer will both be modified; but note that * parent buffer could be same as either new or old current. */ - ACCEPT_RDATA_BUFFER(current->buffer, 3); - if (parent->buffer != current->buffer && - parent->buffer != saveCurrent.buffer) - ACCEPT_RDATA_BUFFER(parent->buffer, 4); + if (parent->buffer == saveCurrent.buffer) + xlrec.parentBlk = 0; + else if (parent->buffer == current->buffer) + xlrec.parentBlk = 1; + else + xlrec.parentBlk = 2; START_CRIT_SECTION(); @@ -1647,7 +1632,20 @@ spgAddNodeAction(Relation index, SpGistState *state, { XLogRecPtr recptr; - recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_ADD_NODE, rdata); + XLogBeginInsert(); + + /* orig page */ + XLogRegisterBuffer(0, saveCurrent.buffer, REGBUF_STANDARD); + /* new page */ + XLogRegisterBuffer(1, current->buffer, REGBUF_STANDARD); + /* parent page (if different from orig and new) */ + if (xlrec.parentBlk == 2) + XLogRegisterBuffer(2, parent->buffer, REGBUF_STANDARD); + + XLogRegisterData((char *) &xlrec, sizeof(xlrec)); + XLogRegisterData((char *) newInnerTuple, newInnerTuple->size); + + recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_ADD_NODE); /* we don't bother to check if any of these are redundant */ PageSetLSN(current->page, recptr); @@ -1682,7 +1680,6 @@ spgSplitNodeAction(Relation index, SpGistState *state, BlockNumber postfixBlkno; OffsetNumber postfixOffset; int i; - XLogRecData rdata[5]; spgxlogSplitTuple xlrec; Buffer newBuffer = InvalidBuffer; @@ -1725,14 +1722,8 @@ spgSplitNodeAction(Relation index, SpGistState *state, postfixTuple->allTheSame = innerTuple->allTheSame; /* prep data for WAL record */ - xlrec.node = index->rd_node; xlrec.newPage = false; - ACCEPT_RDATA_DATA(&xlrec, sizeof(xlrec), 0); - ACCEPT_RDATA_DATA(prefixTuple, prefixTuple->size, 1); - ACCEPT_RDATA_DATA(postfixTuple, postfixTuple->size, 2); - ACCEPT_RDATA_BUFFER(current->buffer, 3); - /* * If we can't fit both tuples on the current page, get a new page for the * postfix tuple. In particular, can't split to the root page. @@ -1752,7 +1743,6 @@ spgSplitNodeAction(Relation index, SpGistState *state, GBUF_INNER_PARITY(current->blkno + 1), postfixTuple->size + sizeof(ItemIdData), &xlrec.newPage); - ACCEPT_RDATA_BUFFER(newBuffer, 4); } START_CRIT_SECTION(); @@ -1767,27 +1757,28 @@ spgSplitNodeAction(Relation index, SpGistState *state, if (xlrec.offnumPrefix != current->offnum) elog(ERROR, "failed to add item of size %u to SPGiST index page", prefixTuple->size); - xlrec.blknoPrefix = current->blkno; /* * put postfix tuple into appropriate page */ if (newBuffer == InvalidBuffer) { - xlrec.blknoPostfix = postfixBlkno = current->blkno; + postfixBlkno = current->blkno; xlrec.offnumPostfix = postfixOffset = SpGistPageAddNewItem(state, current->page, (Item) postfixTuple, postfixTuple->size, NULL, false); + xlrec.postfixBlkSame = true; } else { - xlrec.blknoPostfix = postfixBlkno = BufferGetBlockNumber(newBuffer); + postfixBlkno = BufferGetBlockNumber(newBuffer); xlrec.offnumPostfix = postfixOffset = SpGistPageAddNewItem(state, BufferGetPage(newBuffer), (Item) postfixTuple, postfixTuple->size, NULL, false); MarkBufferDirty(newBuffer); + xlrec.postfixBlkSame = false; } /* @@ -1808,7 +1799,23 @@ spgSplitNodeAction(Relation index, SpGistState *state, { XLogRecPtr recptr; - recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_SPLIT_TUPLE, rdata); + XLogBeginInsert(); + XLogRegisterData((char *) &xlrec, sizeof(xlrec)); + XLogRegisterData((char *) prefixTuple, prefixTuple->size); + XLogRegisterData((char *) postfixTuple, postfixTuple->size); + + XLogRegisterBuffer(0, current->buffer, REGBUF_STANDARD); + if (newBuffer != InvalidBuffer) + { + int flags; + + flags = REGBUF_STANDARD; + if (xlrec.newPage) + flags |= REGBUF_WILL_INIT; + XLogRegisterBuffer(1, newBuffer, flags); + } + + recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_SPLIT_TUPLE); PageSetLSN(current->page, recptr); |