Diffstat (limited to 'src/backend/access/heap/heapam.c')
-rw-r--r--   src/backend/access/heap/heapam.c | 263
1 file changed, 238 insertions(+), 25 deletions(-)
diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index b947c11f7d8..7bb4a874c47 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -1862,6 +1862,7 @@ heap_insert(Relation relation, HeapTuple tup, CommandId cid,
     TransactionId xid = GetCurrentTransactionId();
     HeapTuple   heaptup;
     Buffer      buffer;
+    Buffer      vmbuffer = InvalidBuffer;
     bool        all_visible_cleared = false;
 
     if (relation->rd_rel->relhasoids)
@@ -1914,9 +1915,13 @@ heap_insert(Relation relation, HeapTuple tup, CommandId cid,
     else
         heaptup = tup;
 
-    /* Find buffer to insert this tuple into */
+    /*
+     * Find buffer to insert this tuple into.  If the page is all visible,
+     * this will also pin the requisite visibility map page.
+     */
     buffer = RelationGetBufferForTuple(relation, heaptup->t_len,
-                                       InvalidBuffer, options, bistate);
+                                       InvalidBuffer, options, bistate,
+                                       &vmbuffer);
 
     /*
      * We're about to do the actual insert -- check for conflict at the
@@ -1934,6 +1939,9 @@ heap_insert(Relation relation, HeapTuple tup, CommandId cid,
     {
         all_visible_cleared = true;
         PageClearAllVisible(BufferGetPage(buffer));
+        visibilitymap_clear(relation,
+                            ItemPointerGetBlockNumber(&(heaptup->t_self)),
+                            vmbuffer);
     }
 
     /*
@@ -2010,11 +2018,8 @@ heap_insert(Relation relation, HeapTuple tup, CommandId cid,
     END_CRIT_SECTION();
 
     UnlockReleaseBuffer(buffer);
-
-    /* Clear the bit in the visibility map if necessary */
-    if (all_visible_cleared)
-        visibilitymap_clear(relation,
-                            ItemPointerGetBlockNumber(&(heaptup->t_self)));
+    if (vmbuffer != InvalidBuffer)
+        ReleaseBuffer(vmbuffer);
 
     /*
      * If tuple is cachable, mark it for invalidation from the caches in case
@@ -2089,17 +2094,43 @@ heap_delete(Relation relation, ItemPointer tid,
     ItemId      lp;
     HeapTupleData tp;
     Page        page;
+    BlockNumber block;
     Buffer      buffer;
+    Buffer      vmbuffer = InvalidBuffer;
     bool        have_tuple_lock = false;
     bool        iscombo;
     bool        all_visible_cleared = false;
 
     Assert(ItemPointerIsValid(tid));
 
-    buffer = ReadBuffer(relation, ItemPointerGetBlockNumber(tid));
+    block = ItemPointerGetBlockNumber(tid);
+    buffer = ReadBuffer(relation, block);
+    page = BufferGetPage(buffer);
+
+    /*
+     * Before locking the buffer, pin the visibility map page if it appears
+     * to be necessary.  Since we haven't got the lock yet, someone else might
+     * be in the middle of changing this, so we'll need to recheck after
+     * we have the lock.
+     */
+    if (PageIsAllVisible(page))
+        visibilitymap_pin(relation, block, &vmbuffer);
+
     LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
 
-    page = BufferGetPage(buffer);
+    /*
+     * If we didn't pin the visibility map page and the page has become all
+     * visible while we were busy locking the buffer, we'll have to unlock and
+     * re-lock, to avoid holding the buffer lock across an I/O.  That's a bit
+     * unfortunate, but hopefully shouldn't happen often.
+     */
+    if (vmbuffer == InvalidBuffer && PageIsAllVisible(page))
+    {
+        LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
+        visibilitymap_pin(relation, block, &vmbuffer);
+        LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
+    }
+
     lp = PageGetItemId(page, ItemPointerGetOffsetNumber(tid));
     Assert(ItemIdIsNormal(lp));
 
@@ -2222,6 +2253,8 @@ l1:
         UnlockReleaseBuffer(buffer);
         if (have_tuple_lock)
             UnlockTuple(relation, &(tp.t_self), ExclusiveLock);
+        if (vmbuffer != InvalidBuffer)
+            ReleaseBuffer(vmbuffer);
         return result;
     }
 
@@ -2249,6 +2282,8 @@ l1:
     {
         all_visible_cleared = true;
         PageClearAllVisible(page);
+        visibilitymap_clear(relation, BufferGetBlockNumber(buffer),
+                            vmbuffer);
     }
 
     /* store transaction information of xact deleting the tuple */
@@ -2296,6 +2331,9 @@ l1:
 
     LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
 
+    if (vmbuffer != InvalidBuffer)
+        ReleaseBuffer(vmbuffer);
+
     /*
      * If the tuple has toasted out-of-line attributes, we need to delete
      * those items too.  We have to do this before releasing the buffer
@@ -2317,10 +2355,6 @@ l1:
      */
     CacheInvalidateHeapTuple(relation, &tp);
 
-    /* Clear the bit in the visibility map if necessary */
-    if (all_visible_cleared)
-        visibilitymap_clear(relation, BufferGetBlockNumber(buffer));
-
     /* Now we can release the buffer */
     ReleaseBuffer(buffer);
 
@@ -2419,8 +2453,11 @@ heap_update(Relation relation, ItemPointer otid, HeapTuple newtup,
     HeapTupleData oldtup;
     HeapTuple   heaptup;
     Page        page;
+    BlockNumber block;
     Buffer      buffer,
-                newbuf;
+                newbuf,
+                vmbuffer = InvalidBuffer,
+                vmbuffer_new = InvalidBuffer;
     bool        need_toast,
                 already_marked;
     Size        newtupsize,
@@ -2447,10 +2484,34 @@ heap_update(Relation relation, ItemPointer otid, HeapTuple newtup,
      */
     hot_attrs = RelationGetIndexAttrBitmap(relation);
 
-    buffer = ReadBuffer(relation, ItemPointerGetBlockNumber(otid));
+    block = ItemPointerGetBlockNumber(otid);
+    buffer = ReadBuffer(relation, block);
+    page = BufferGetPage(buffer);
+
+    /*
+     * Before locking the buffer, pin the visibility map page if it appears
+     * to be necessary.  Since we haven't got the lock yet, someone else might
+     * be in the middle of changing this, so we'll need to recheck after
+     * we have the lock.
+     */
+    if (PageIsAllVisible(page))
+        visibilitymap_pin(relation, block, &vmbuffer);
+
     LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
 
-    page = BufferGetPage(buffer);
+    /*
+     * If we didn't pin the visibility map page and the page has become all
+     * visible while we were busy locking the buffer, we'll have to unlock and
+     * re-lock, to avoid holding the buffer lock across an I/O.  That's a bit
+     * unfortunate, but hopefully shouldn't happen often.
+     */
+    if (vmbuffer == InvalidBuffer && PageIsAllVisible(page))
+    {
+        LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
+        visibilitymap_pin(relation, block, &vmbuffer);
+        LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
+    }
+
     lp = PageGetItemId(page, ItemPointerGetOffsetNumber(otid));
     Assert(ItemIdIsNormal(lp));
 
@@ -2580,6 +2641,8 @@ l2:
         UnlockReleaseBuffer(buffer);
         if (have_tuple_lock)
             UnlockTuple(relation, &(oldtup.t_self), ExclusiveLock);
+        if (vmbuffer != InvalidBuffer)
+            ReleaseBuffer(vmbuffer);
         bms_free(hot_attrs);
         return result;
     }
@@ -2700,7 +2763,8 @@ l2:
         {
             /* Assume there's no chance to put heaptup on same page. */
             newbuf = RelationGetBufferForTuple(relation, heaptup->t_len,
-                                               buffer, 0, NULL);
+                                               buffer, 0, NULL,
+                                               &vmbuffer_new);
         }
         else
         {
@@ -2717,7 +2781,8 @@ l2:
              */
             LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
             newbuf = RelationGetBufferForTuple(relation, heaptup->t_len,
-                                               buffer, 0, NULL);
+                                               buffer, 0, NULL,
+                                               &vmbuffer_new);
         }
         else
         {
@@ -2866,14 +2931,20 @@ l2:
 
     /* Clear bits in visibility map */
     if (all_visible_cleared)
-        visibilitymap_clear(relation, BufferGetBlockNumber(buffer));
+        visibilitymap_clear(relation, BufferGetBlockNumber(buffer),
+                            vmbuffer);
     if (all_visible_cleared_new)
-        visibilitymap_clear(relation, BufferGetBlockNumber(newbuf));
+        visibilitymap_clear(relation, BufferGetBlockNumber(newbuf),
+                            vmbuffer_new);
 
     /* Now we can release the buffer(s) */
     if (newbuf != buffer)
         ReleaseBuffer(newbuf);
     ReleaseBuffer(buffer);
+    if (BufferIsValid(vmbuffer_new))
+        ReleaseBuffer(vmbuffer_new);
+    if (BufferIsValid(vmbuffer))
+        ReleaseBuffer(vmbuffer);
 
     /*
      * If new tuple is cachable, mark it for invalidation from the caches in
@@ -4036,6 +4107,38 @@ log_heap_freeze(Relation reln, Buffer buffer,
 }
 
 /*
+ * Perform XLogInsert for a heap-visible operation.  'block' is the block
+ * being marked all-visible, and vm_buffer is the buffer containing the
+ * corresponding visibility map block.  Both should have already been modified
+ * and dirtied.
+ */
+XLogRecPtr
+log_heap_visible(RelFileNode rnode, BlockNumber block, Buffer vm_buffer)
+{
+    xl_heap_visible xlrec;
+    XLogRecPtr  recptr;
+    XLogRecData rdata[2];
+
+    xlrec.node = rnode;
+    xlrec.block = block;
+
+    rdata[0].data = (char *) &xlrec;
+    rdata[0].len = SizeOfHeapVisible;
+    rdata[0].buffer = InvalidBuffer;
+    rdata[0].next = &(rdata[1]);
+
+    rdata[1].data = NULL;
+    rdata[1].len = 0;
+    rdata[1].buffer = vm_buffer;
+    rdata[1].buffer_std = false;
+    rdata[1].next = NULL;
+
+    recptr = XLogInsert(RM_HEAP2_ID, XLOG_HEAP2_VISIBLE, rdata);
+
+    return recptr;
+}
+
+/*
  * Perform XLogInsert for a heap-update operation.  Caller must already
  * have modified the buffer(s) and marked them dirty.
  */
@@ -4323,6 +4426,92 @@ heap_xlog_freeze(XLogRecPtr lsn, XLogRecord *record)
     UnlockReleaseBuffer(buffer);
 }
 
+/*
+ * Replay XLOG_HEAP2_VISIBLE record.
+ *
+ * The critical integrity requirement here is that we must never end up with
+ * a situation where the visibility map bit is set, and the page-level
+ * PD_ALL_VISIBLE bit is clear.  If that were to occur, then a subsequent
+ * page modification would fail to clear the visibility map bit.
+ */
+static void
+heap_xlog_visible(XLogRecPtr lsn, XLogRecord *record)
+{
+    xl_heap_visible *xlrec = (xl_heap_visible *) XLogRecGetData(record);
+    Buffer      buffer;
+    Page        page;
+
+    /*
+     * Read the heap page, if it still exists.  If the heap file has been
+     * dropped or truncated later in recovery, this might fail.  In that case,
+     * there's no point in doing anything further, since the visibility map
+     * will have to be cleared out at the same time.
+     */
+    buffer = XLogReadBufferExtended(xlrec->node, MAIN_FORKNUM, xlrec->block,
+                                    RBM_NORMAL);
+    if (!BufferIsValid(buffer))
+        return;
+    page = (Page) BufferGetPage(buffer);
+
+    LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
+
+    /*
+     * We don't bump the LSN of the heap page when setting the visibility
+     * map bit, because that would generate an unworkable volume of
+     * full-page writes.  This exposes us to torn page hazards, but since
+     * we're not inspecting the existing page contents in any way, we
+     * don't care.
+     *
+     * However, all operations that clear the visibility map bit *do* bump
+     * the LSN, and those operations will only be replayed if the XLOG LSN
+     * follows the page LSN.  Thus, if the page LSN has advanced past our
+     * XLOG record's LSN, we mustn't mark the page all-visible, because
+     * the subsequent update won't be replayed to clear the flag.
+     */
+    if (!XLByteLE(lsn, PageGetLSN(page)))
+    {
+        PageSetAllVisible(page);
+        MarkBufferDirty(buffer);
+    }
+
+    /* Done with heap page. */
+    UnlockReleaseBuffer(buffer);
+
+    /*
+     * Even if we skipped the heap page update due to the LSN interlock, it's
+     * still safe to update the visibility map.  Any WAL record that clears
+     * the visibility map bit does so before checking the page LSN, so any
+     * bits that need to be cleared will still be cleared.
+     */
+    if (record->xl_info & XLR_BKP_BLOCK_1)
+        RestoreBkpBlocks(lsn, record, false);
+    else
+    {
+        Relation    reln;
+        Buffer      vmbuffer = InvalidBuffer;
+
+        reln = CreateFakeRelcacheEntry(xlrec->node);
+        visibilitymap_pin(reln, xlrec->block, &vmbuffer);
+
+        /*
+         * Don't set the bit if replay has already passed this point.
+         *
+         * It might be safe to do this unconditionally; if replay has passed
+         * this point, we'll replay at least as far this time as we did before,
+         * and if this bit needs to be cleared, the record responsible for
+         * doing so should be replayed again, and will clear it.  For right now,
+         * out of an abundance of conservatism, we use the same test here
+         * we did for the heap page; if this results in a dropped bit, no real
+         * harm is done; and the next VACUUM will fix it.
+         */
+        if (!XLByteLE(lsn, PageGetLSN(BufferGetPage(vmbuffer))))
+            visibilitymap_set(reln, xlrec->block, lsn, vmbuffer);
+
+        ReleaseBuffer(vmbuffer);
+        FreeFakeRelcacheEntry(reln);
+    }
+}
+
 static void
 heap_xlog_newpage(XLogRecPtr lsn, XLogRecord *record)
 {
@@ -4377,8 +4566,11 @@ heap_xlog_delete(XLogRecPtr lsn, XLogRecord *record)
     if (xlrec->all_visible_cleared)
     {
         Relation    reln = CreateFakeRelcacheEntry(xlrec->target.node);
+        Buffer      vmbuffer = InvalidBuffer;
 
-        visibilitymap_clear(reln, blkno);
+        visibilitymap_pin(reln, blkno, &vmbuffer);
+        visibilitymap_clear(reln, blkno, vmbuffer);
+        ReleaseBuffer(vmbuffer);
         FreeFakeRelcacheEntry(reln);
     }
 
@@ -4455,8 +4647,11 @@ heap_xlog_insert(XLogRecPtr lsn, XLogRecord *record)
     if (xlrec->all_visible_cleared)
    {
         Relation    reln = CreateFakeRelcacheEntry(xlrec->target.node);
+        Buffer      vmbuffer = InvalidBuffer;
 
-        visibilitymap_clear(reln, blkno);
+        visibilitymap_pin(reln, blkno, &vmbuffer);
+        visibilitymap_clear(reln, blkno, vmbuffer);
+        ReleaseBuffer(vmbuffer);
         FreeFakeRelcacheEntry(reln);
     }
 
@@ -4567,9 +4762,12 @@ heap_xlog_update(XLogRecPtr lsn, XLogRecord *record, bool hot_update)
     if (xlrec->all_visible_cleared)
     {
         Relation    reln = CreateFakeRelcacheEntry(xlrec->target.node);
+        BlockNumber block = ItemPointerGetBlockNumber(&xlrec->target.tid);
+        Buffer      vmbuffer = InvalidBuffer;
 
-        visibilitymap_clear(reln,
-                            ItemPointerGetBlockNumber(&xlrec->target.tid));
+        visibilitymap_pin(reln, block, &vmbuffer);
+        visibilitymap_clear(reln, block, vmbuffer);
+        ReleaseBuffer(vmbuffer);
         FreeFakeRelcacheEntry(reln);
     }
 
@@ -4648,8 +4846,12 @@ newt:;
     if (xlrec->new_all_visible_cleared)
     {
         Relation    reln = CreateFakeRelcacheEntry(xlrec->target.node);
+        BlockNumber block = ItemPointerGetBlockNumber(&xlrec->newtid);
+        Buffer      vmbuffer = InvalidBuffer;
 
-        visibilitymap_clear(reln, ItemPointerGetBlockNumber(&xlrec->newtid));
+        visibilitymap_pin(reln, block, &vmbuffer);
+        visibilitymap_clear(reln, block, vmbuffer);
+        ReleaseBuffer(vmbuffer);
         FreeFakeRelcacheEntry(reln);
     }
 
@@ -4915,6 +5117,9 @@ heap2_redo(XLogRecPtr lsn, XLogRecord *record)
         case XLOG_HEAP2_CLEANUP_INFO:
             heap_xlog_cleanup_info(lsn, record);
             break;
+        case XLOG_HEAP2_VISIBLE:
+            heap_xlog_visible(lsn, record);
+            break;
         default:
             elog(PANIC, "heap2_redo: unknown op code %u", info);
     }
@@ -5044,6 +5249,14 @@ heap2_desc(StringInfo buf, uint8 xl_info, char *rec)
         appendStringInfo(buf, "cleanup info: remxid %u",
                          xlrec->latestRemovedXid);
     }
+    else if (info == XLOG_HEAP2_VISIBLE)
+    {
+        xl_heap_visible *xlrec = (xl_heap_visible *) rec;
+
+        appendStringInfo(buf, "visible: rel %u/%u/%u; blk %u",
+                         xlrec->node.spcNode, xlrec->node.dbNode,
+                         xlrec->node.relNode, xlrec->block);
+    }
     else
         appendStringInfo(buf, "UNKNOWN");
 }
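The recurring sequence in the heap_delete and heap_update hunks above — pin the visibility map page before taking the buffer content lock, then recheck under the lock and re-pin if the all-visible bit appeared in the meantime — distills to the sketch below. This is an illustrative summary only, not code from the patch; the helper name pin_visibility_map_if_needed is hypothetical, it assumes heapam.c's usual includes, and error handling is omitted.

/*
 * Illustrative sketch (hypothetical helper, not part of the patch):
 * pin the visibility map page covering 'block' before acquiring the
 * content lock on 'buffer', then recheck under the lock, so that a later
 * visibilitymap_clear() never has to do I/O while the buffer lock is held.
 */
static void
pin_visibility_map_if_needed(Relation relation, BlockNumber block,
                             Buffer buffer, Buffer *vmbuffer)
{
    Page    page = BufferGetPage(buffer);

    /* Pin before locking; the hint may change, so recheck below. */
    if (PageIsAllVisible(page))
        visibilitymap_pin(relation, block, vmbuffer);

    LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);

    /*
     * If the page became all-visible while we were acquiring the lock,
     * drop the lock, pin the map page (which may read it in), and retake
     * the lock rather than performing I/O with the buffer lock held.
     */
    if (*vmbuffer == InvalidBuffer && PageIsAllVisible(page))
    {
        LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
        visibilitymap_pin(relation, block, vmbuffer);
        LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
    }
}

On return the caller holds the content lock and, if the page was marked all-visible, a pin on the corresponding visibility map page, which is exactly the state heap_delete and heap_update rely on before clearing PD_ALL_VISIBLE and the map bit together.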