Diffstat (limited to 'src/backend/access/heap/heapam.c')
-rw-r--r-- | src/backend/access/heap/heapam.c | 297
1 file changed, 296 insertions, 1 deletion
diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index 57acaf2bb8c..12775cc2db7 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -8,7 +8,7 @@
  *
  *
  * IDENTIFICATION
- *	  $PostgreSQL: pgsql/src/backend/access/heap/heapam.c,v 1.220 2006/10/04 00:29:48 momjian Exp $
+ *	  $PostgreSQL: pgsql/src/backend/access/heap/heapam.c,v 1.221 2006/11/05 22:42:07 tgl Exp $
  *
  *
  * INTERFACE ROUTINES
@@ -2809,6 +2809,166 @@ heap_inplace_update(Relation relation, HeapTuple tuple)
 }
 
+/*
+ * heap_freeze_tuple
+ *
+ * Check to see whether any of the XID fields of a tuple (xmin, xmax, xvac)
+ * are older than the specified cutoff XID.  If so, replace them with
+ * FrozenTransactionId or InvalidTransactionId as appropriate, and return
+ * TRUE.  Return FALSE if nothing was changed.
+ *
+ * It is assumed that the caller has checked the tuple with
+ * HeapTupleSatisfiesVacuum() and determined that it is not HEAPTUPLE_DEAD
+ * (else we should be removing the tuple, not freezing it).
+ *
+ * NB: cutoff_xid *must* be <= the current global xmin, to ensure that any
+ * XID older than it could neither be running nor seen as running by any
+ * open transaction.  This ensures that the replacement will not change
+ * anyone's idea of the tuple state.  Also, since we assume the tuple is
+ * not HEAPTUPLE_DEAD, the fact that an XID is not still running allows us
+ * to assume that it is either committed good or aborted, as appropriate;
+ * so we need no external state checks to decide what to do.  (This is good
+ * because this function is applied during WAL recovery, when we don't have
+ * access to any such state, and can't depend on the hint bits to be set.)
+ *
+ * In lazy VACUUM, we call this while initially holding only a shared lock
+ * on the tuple's buffer.  If any change is needed, we trade that in for an
+ * exclusive lock before making the change.  Caller should pass the buffer
+ * ID if shared lock is held, InvalidBuffer if exclusive lock is already
+ * held.
+ *
+ * Note: it might seem we could make the changes without exclusive lock,
+ * since TransactionId read/write is assumed atomic anyway.  However there
+ * is a race condition: someone who just fetched an old XID that we
+ * overwrite here could conceivably not finish checking the XID against
+ * pg_clog before we finish the VACUUM and perhaps truncate off the part
+ * of pg_clog he needs.  Getting exclusive lock ensures no other backend
+ * is in process of checking the tuple status.  Also, getting exclusive
+ * lock makes it safe to adjust the infomask bits.
+ */
+bool
+heap_freeze_tuple(HeapTupleHeader tuple, TransactionId cutoff_xid,
+				  Buffer buf)
+{
+	bool		changed = false;
+	TransactionId xid;
+
+	xid = HeapTupleHeaderGetXmin(tuple);
+	if (TransactionIdIsNormal(xid) &&
+		TransactionIdPrecedes(xid, cutoff_xid))
+	{
+		if (buf != InvalidBuffer)
+		{
+			/* trade in share lock for exclusive lock */
+			LockBuffer(buf, BUFFER_LOCK_UNLOCK);
+			LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
+			buf = InvalidBuffer;
+		}
+		HeapTupleHeaderSetXmin(tuple, FrozenTransactionId);
+
+		/*
+		 * Might as well fix the hint bits too; usually XMIN_COMMITTED will
+		 * already be set here, but there's a small chance not.
+		 */
+		Assert(!(tuple->t_infomask & HEAP_XMIN_INVALID));
+		tuple->t_infomask |= HEAP_XMIN_COMMITTED;
+		changed = true;
+	}
+
+	/*
+	 * When we release shared lock, it's possible for someone else to change
+	 * xmax before we get the lock back, so repeat the check after acquiring
+	 * exclusive lock.  (We don't need this pushup for xmin, because only
+	 * VACUUM could be interested in changing an existing tuple's xmin,
+	 * and there's only one VACUUM allowed on a table at a time.)
+	 */
+recheck_xmax:
+	if (!(tuple->t_infomask & HEAP_XMAX_IS_MULTI))
+	{
+		xid = HeapTupleHeaderGetXmax(tuple);
+		if (TransactionIdIsNormal(xid) &&
+			TransactionIdPrecedes(xid, cutoff_xid))
+		{
+			if (buf != InvalidBuffer)
+			{
+				/* trade in share lock for exclusive lock */
+				LockBuffer(buf, BUFFER_LOCK_UNLOCK);
+				LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
+				buf = InvalidBuffer;
+				goto recheck_xmax;		/* see comment above */
+			}
+			HeapTupleHeaderSetXmax(tuple, InvalidTransactionId);
+
+			/*
+			 * The tuple might be marked either XMAX_INVALID or
+			 * XMAX_COMMITTED + LOCKED.  Normalize to INVALID just to be
+			 * sure no one gets confused.
+			 */
+			tuple->t_infomask &= ~HEAP_XMAX_COMMITTED;
+			tuple->t_infomask |= HEAP_XMAX_INVALID;
+			changed = true;
+		}
+	}
+	else
+	{
+		/*----------
+		 * XXX perhaps someday we should zero out very old MultiXactIds here?
+		 *
+		 * The only way a stale MultiXactId could pose a problem is if a
+		 * tuple, having once been multiply-share-locked, is not touched by
+		 * any vacuum or attempted lock or deletion for just over 4G MultiXact
+		 * creations, and then in the probably-narrow window where its xmax
+		 * is again a live MultiXactId, someone tries to lock or delete it.
+		 * Even then, another share-lock attempt would work fine.  An
+		 * exclusive-lock or delete attempt would face unexpected delay, or
+		 * in the very worst case get a deadlock error.  This seems an
+		 * extremely low-probability scenario with minimal downside even if
+		 * it does happen, so for now we don't do the extra bookkeeping that
+		 * would be needed to clean out MultiXactIds.
+		 *----------
+		 */
+	}
+
+	/*
+	 * Although xvac per se could only be set by VACUUM, it shares physical
+	 * storage space with cmax, and so could be wiped out by someone setting
+	 * xmax.  Hence recheck after changing lock, same as for xmax itself.
+	 */
+recheck_xvac:
+	if (tuple->t_infomask & HEAP_MOVED)
+	{
+		xid = HeapTupleHeaderGetXvac(tuple);
+		if (TransactionIdIsNormal(xid) &&
+			TransactionIdPrecedes(xid, cutoff_xid))
+		{
+			if (buf != InvalidBuffer)
+			{
+				/* trade in share lock for exclusive lock */
+				LockBuffer(buf, BUFFER_LOCK_UNLOCK);
+				LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
+				buf = InvalidBuffer;
+				goto recheck_xvac;		/* see comment above */
+			}
+
+			/*
+			 * If a MOVED_OFF tuple is not dead, the xvac transaction must
+			 * have failed; whereas a non-dead MOVED_IN tuple must mean the
+			 * xvac transaction succeeded.
+			 */
+			if (tuple->t_infomask & HEAP_MOVED_OFF)
+				HeapTupleHeaderSetXvac(tuple, InvalidTransactionId);
+			else
+				HeapTupleHeaderSetXvac(tuple, FrozenTransactionId);
+
+			/*
+			 * Might as well fix the hint bits too; usually XMIN_COMMITTED
+			 * will already be set here, but there's a small chance not.
+			 */
+			Assert(!(tuple->t_infomask & HEAP_XMIN_INVALID));
+			tuple->t_infomask |= HEAP_XMIN_COMMITTED;
+			changed = true;
+		}
+	}
+
+	return changed;
+}
+
+
 /* ----------------
  *		heap_markpos	- mark scan position
  * ----------------
  */
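[Editor's note: the caller-side protocol described in heap_freeze_tuple()'s header comment, sketched below. A page is scanned under share lock, heap_freeze_tuple() trades up to an exclusive lock only when something actually needs freezing, and the frozen offsets are then WAL-logged with log_heap_freeze() (added further down in this diff). This is a minimal illustrative sketch, not code from the patch; onerel, buf, page, and FreezeLimit (the cutoff XID) are assumed to be set up by the surrounding VACUUM loop, and visibility checking with HeapTupleSatisfiesVacuum() is elided.]

	OffsetNumber frozen[MaxOffsetNumber];
	int			nfrozen = 0;
	OffsetNumber offnum;

	/* buf is pinned and share-locked; page = BufferGetPage(buf) */
	for (offnum = FirstOffsetNumber;
		 offnum <= PageGetMaxOffsetNumber(page);
		 offnum = OffsetNumberNext(offnum))
	{
		ItemId		itemid = PageGetItemId(page, offnum);
		HeapTupleHeader htup;

		if (!ItemIdIsUsed(itemid))
			continue;
		htup = (HeapTupleHeader) PageGetItem(page, itemid);

		/*
		 * Tuple is assumed already checked with HeapTupleSatisfiesVacuum()
		 * and found non-dead.  This call may trade our share lock on buf
		 * for an exclusive one.
		 */
		if (heap_freeze_tuple(htup, FreezeLimit, buf))
			frozen[nfrozen++] = offnum; /* one-based, as log_heap_freeze expects */
	}

	if (nfrozen > 0)
	{
		MarkBufferDirty(buf);
		/* no XLOG for temp tables */
		if (!onerel->rd_istemp)
		{
			XLogRecPtr	recptr = log_heap_freeze(onerel, buf, FreezeLimit,
												 frozen, nfrozen);

			PageSetLSN(page, recptr);
			PageSetTLI(page, ThisTimeLineID);
		}
	}

[Note that the sketch bumps the page LSN only when a WAL record is actually emitted, per the usual write-ahead rule.]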
@@ -2877,6 +3037,9 @@ heap_restrpos(HeapScanDesc scan)
 /*
  * Perform XLogInsert for a heap-clean operation.  Caller must already
  * have modified the buffer and marked it dirty.
+ *
+ * Note: for historical reasons, the entries in the unused[] array should
+ * be zero-based tuple indexes, not one-based.
  */
 XLogRecPtr
 log_heap_clean(Relation reln, Buffer buffer, OffsetNumber *unused, int uncnt)
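[Editor's note: the zero-based convention noted above is compensated for at redo time, which is worth seeing side by side with the one-based convention of the freeze records introduced below; both lines appear later in this diff:]

	lp = PageGetItemId(page, *unused + 1);	/* heap-clean redo: zero-based, hence "+ 1" */
	lp = PageGetItemId(page, *offsets);		/* heap-freeze redo: one-based, used directly */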
@@ -2921,6 +3084,57 @@ log_heap_clean(Relation reln, Buffer buffer, OffsetNumber *unused, int uncnt)
 }
 
 /*
+ * Perform XLogInsert for a heap-freeze operation.  Caller must already
+ * have modified the buffer and marked it dirty.
+ *
+ * Unlike log_heap_clean(), the offsets[] entries are one-based.
+ */
+XLogRecPtr
+log_heap_freeze(Relation reln, Buffer buffer,
+				TransactionId cutoff_xid,
+				OffsetNumber *offsets, int offcnt)
+{
+	xl_heap_freeze xlrec;
+	XLogRecPtr	recptr;
+	XLogRecData rdata[2];
+
+	/* Caller should not call me on a temp relation */
+	Assert(!reln->rd_istemp);
+
+	xlrec.node = reln->rd_node;
+	xlrec.block = BufferGetBlockNumber(buffer);
+	xlrec.cutoff_xid = cutoff_xid;
+
+	rdata[0].data = (char *) &xlrec;
+	rdata[0].len = SizeOfHeapFreeze;
+	rdata[0].buffer = InvalidBuffer;
+	rdata[0].next = &(rdata[1]);
+
+	/*
+	 * The tuple-offsets array is not actually in the buffer, but pretend
+	 * that it is.  When XLogInsert stores the whole buffer, the offsets
+	 * array need not be stored too.
+	 */
+	if (offcnt > 0)
+	{
+		rdata[1].data = (char *) offsets;
+		rdata[1].len = offcnt * sizeof(OffsetNumber);
+	}
+	else
+	{
+		rdata[1].data = NULL;
+		rdata[1].len = 0;
+	}
+	rdata[1].buffer = buffer;
+	rdata[1].buffer_std = true;
+	rdata[1].next = NULL;
+
+	recptr = XLogInsert(RM_HEAP2_ID, XLOG_HEAP2_FREEZE, rdata);
+
+	return recptr;
+}
+
+/*
  * Perform XLogInsert for a heap-update operation.  Caller must already
  * have modified the buffer(s) and marked them dirty.
  */
@@ -3057,6 +3271,7 @@ heap_xlog_clean(XLogRecPtr lsn, XLogRecord *record)
 
 		while (unused < unend)
 		{
+			/* unused[] entries are zero-based */
 			lp = PageGetItemId(page, *unused + 1);
 			lp->lp_flags &= ~LP_USED;
 			unused++;
@@ -3072,6 +3287,55 @@ heap_xlog_clean(XLogRecPtr lsn, XLogRecord *record)
 }
 
 static void
+heap_xlog_freeze(XLogRecPtr lsn, XLogRecord *record)
+{
+	xl_heap_freeze *xlrec = (xl_heap_freeze *) XLogRecGetData(record);
+	TransactionId cutoff_xid = xlrec->cutoff_xid;
+	Relation	reln;
+	Buffer		buffer;
+	Page		page;
+
+	if (record->xl_info & XLR_BKP_BLOCK_1)
+		return;
+
+	reln = XLogOpenRelation(xlrec->node);
+	buffer = XLogReadBuffer(reln, xlrec->block, false);
+	if (!BufferIsValid(buffer))
+		return;
+	page = (Page) BufferGetPage(buffer);
+
+	if (XLByteLE(lsn, PageGetLSN(page)))
+	{
+		UnlockReleaseBuffer(buffer);
+		return;
+	}
+
+	if (record->xl_len > SizeOfHeapFreeze)
+	{
+		OffsetNumber *offsets;
+		OffsetNumber *offsets_end;
+
+		offsets = (OffsetNumber *) ((char *) xlrec + SizeOfHeapFreeze);
+		offsets_end = (OffsetNumber *) ((char *) xlrec + record->xl_len);
+
+		while (offsets < offsets_end)
+		{
+			/* offsets[] entries are one-based */
+			ItemId		lp = PageGetItemId(page, *offsets);
+			HeapTupleHeader tuple = (HeapTupleHeader) PageGetItem(page, lp);
+
+			(void) heap_freeze_tuple(tuple, cutoff_xid, InvalidBuffer);
+			offsets++;
+		}
+	}
+
+	PageSetLSN(page, lsn);
+	PageSetTLI(page, ThisTimeLineID);
+	MarkBufferDirty(buffer);
+	UnlockReleaseBuffer(buffer);
+}
+
+static void
 heap_xlog_newpage(XLogRecPtr lsn, XLogRecord *record)
 {
 	xl_heap_newpage *xlrec = (xl_heap_newpage *) XLogRecGetData(record);
@@ -3546,6 +3810,18 @@ heap_redo(XLogRecPtr lsn, XLogRecord *record)
 		elog(PANIC, "heap_redo: unknown op code %u", info);
 }
 
+void
+heap2_redo(XLogRecPtr lsn, XLogRecord *record)
+{
+	uint8		info = record->xl_info & ~XLR_INFO_MASK;
+
+	info &= XLOG_HEAP_OPMASK;
+	if (info == XLOG_HEAP2_FREEZE)
+		heap_xlog_freeze(lsn, record);
+	else
+		elog(PANIC, "heap2_redo: unknown op code %u", info);
+}
+
 static void
 out_target(StringInfo buf, xl_heaptid *target)
 {
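[Editor's note on the interplay with full-page writes: because rdata[1].buffer points at the heap buffer, XLogInsert() treats the offsets array as data living in that buffer. If XLogInsert() attaches a full-page image (first change to the page since the last checkpoint), the array is omitted from the record, and on replay the restored image already contains the frozen tuples; that is why heap_xlog_freeze() can return immediately when XLR_BKP_BLOCK_1 is set, and why it guards the offset loop with record->xl_len > SizeOfHeapFreeze. When no backup block is taken, the record body is simply:]

	xl_heap_freeze header	(node, block, cutoff_xid)	SizeOfHeapFreeze bytes
	OffsetNumber array		offcnt one-based offsets of the tuples frozen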
unknown op code %u", info); +} + static void out_target(StringInfo buf, xl_heaptid *target) { @@ -3645,3 +3921,22 @@ heap_desc(StringInfo buf, uint8 xl_info, char *rec) else appendStringInfo(buf, "UNKNOWN"); } + +void +heap2_desc(StringInfo buf, uint8 xl_info, char *rec) +{ + uint8 info = xl_info & ~XLR_INFO_MASK; + + info &= XLOG_HEAP_OPMASK; + if (info == XLOG_HEAP2_FREEZE) + { + xl_heap_freeze *xlrec = (xl_heap_freeze *) rec; + + appendStringInfo(buf, "freeze: rel %u/%u/%u; blk %u; cutoff %u", + xlrec->node.spcNode, xlrec->node.dbNode, + xlrec->node.relNode, xlrec->block, + xlrec->cutoff_xid); + } + else + appendStringInfo(buf, "UNKNOWN"); +} |