diff options
Diffstat (limited to 'src/backend/access/heap/heapam.c')
-rw-r--r-- | src/backend/access/heap/heapam.c | 594 |
1 files changed, 488 insertions, 106 deletions
diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c index 605ed629426..ee604df2cae 100644 --- a/src/backend/access/heap/heapam.c +++ b/src/backend/access/heap/heapam.c @@ -8,7 +8,7 @@ * * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/access/heap/heapam.c,v 1.187 2005/04/14 20:03:22 tgl Exp $ + * $PostgreSQL: pgsql/src/backend/access/heap/heapam.c,v 1.188 2005/04/28 21:47:10 tgl Exp $ * * * INTERFACE ROUTINES @@ -40,12 +40,14 @@ #include "access/heapam.h" #include "access/hio.h" +#include "access/multixact.h" #include "access/tuptoaster.h" #include "access/valid.h" #include "access/xlogutils.h" #include "catalog/catalog.h" #include "catalog/namespace.h" #include "miscadmin.h" +#include "storage/sinval.h" #include "utils/inval.h" #include "utils/relcache.h" #include "pgstat.h" @@ -1238,30 +1240,81 @@ l1: } else if (result == HeapTupleBeingUpdated && wait) { - TransactionId xwait = HeapTupleHeaderGetXmax(tp.t_data); - - /* sleep until concurrent transaction ends */ - LockBuffer(buffer, BUFFER_LOCK_UNLOCK); - XactLockTableWait(xwait); - - LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE); - if (!TransactionIdDidCommit(xwait)) - goto l1; + TransactionId xwait; + uint16 infomask; /* - * xwait is committed but if xwait had just marked the tuple for - * update then some other xaction could update this tuple before - * we got to this point. + * Sleep until concurrent transaction ends. Note that we don't care + * if the locker has an exclusive or shared lock, because we need + * exclusive. */ - if (!TransactionIdEquals(HeapTupleHeaderGetXmax(tp.t_data), xwait)) - goto l1; - if (!(tp.t_data->t_infomask & HEAP_XMAX_COMMITTED)) + + /* must copy state data before unlocking buffer */ + xwait = HeapTupleHeaderGetXmax(tp.t_data); + infomask = tp.t_data->t_infomask; + + if (infomask & HEAP_XMAX_IS_MULTI) { - tp.t_data->t_infomask |= HEAP_XMAX_COMMITTED; - SetBufferCommitInfoNeedsSave(buffer); + /* wait for multixact */ + LockBuffer(buffer, BUFFER_LOCK_UNLOCK); + MultiXactIdWait((MultiXactId) xwait); + LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE); + + /* + * If xwait had just locked the tuple then some other xact could + * update this tuple before we get to this point. Check for xmax + * change, and start over if so. + */ + if (!(tp.t_data->t_infomask & HEAP_XMAX_IS_MULTI) || + !TransactionIdEquals(HeapTupleHeaderGetXmax(tp.t_data), + xwait)) + goto l1; + + /* + * You might think the multixact is necessarily done here, but + * not so: it could have surviving members, namely our own xact + * or other subxacts of this backend. It is legal for us to + * delete the tuple in either case, however (the latter case is + * essentially a situation of upgrading our former shared lock + * to exclusive). We don't bother changing the on-disk hint bits + * since we are about to overwrite the xmax altogether. + */ } - /* if tuple was marked for update but not updated... */ - if (tp.t_data->t_infomask & HEAP_MARKED_FOR_UPDATE) + else + { + /* wait for regular transaction to end */ + LockBuffer(buffer, BUFFER_LOCK_UNLOCK); + XactLockTableWait(xwait); + LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE); + + /* + * xwait is done, but if xwait had just locked the tuple then some + * other xact could update this tuple before we get to this point. + * Check for xmax change, and start over if so. + */ + if ((tp.t_data->t_infomask & HEAP_XMAX_IS_MULTI) || + !TransactionIdEquals(HeapTupleHeaderGetXmax(tp.t_data), + xwait)) + goto l1; + + /* Otherwise we can mark it committed or aborted */ + if (!(tp.t_data->t_infomask & (HEAP_XMAX_COMMITTED | + HEAP_XMAX_INVALID))) + { + if (TransactionIdDidCommit(xwait)) + tp.t_data->t_infomask |= HEAP_XMAX_COMMITTED; + else + tp.t_data->t_infomask |= HEAP_XMAX_INVALID; + SetBufferCommitInfoNeedsSave(buffer); + } + } + + /* + * We may overwrite if previous xmax aborted, or if it committed + * but only locked the tuple without updating it. + */ + if (tp.t_data->t_infomask & (HEAP_XMAX_INVALID | + HEAP_IS_LOCKED)) result = HeapTupleMayBeUpdated; else result = HeapTupleUpdated; @@ -1290,7 +1343,8 @@ l1: /* store transaction information of xact deleting the tuple */ tp.t_data->t_infomask &= ~(HEAP_XMAX_COMMITTED | HEAP_XMAX_INVALID | - HEAP_MARKED_FOR_UPDATE | + HEAP_XMAX_IS_MULTI | + HEAP_IS_LOCKED | HEAP_MOVED); HeapTupleHeaderSetXmax(tp.t_data, xid); HeapTupleHeaderSetCmax(tp.t_data, cid); @@ -1465,30 +1519,81 @@ l2: } else if (result == HeapTupleBeingUpdated && wait) { - TransactionId xwait = HeapTupleHeaderGetXmax(oldtup.t_data); - - /* sleep until concurrent transaction ends */ - LockBuffer(buffer, BUFFER_LOCK_UNLOCK); - XactLockTableWait(xwait); - - LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE); - if (!TransactionIdDidCommit(xwait)) - goto l2; + TransactionId xwait; + uint16 infomask; /* - * xwait is committed but if xwait had just marked the tuple for - * update then some other xaction could update this tuple before - * we got to this point. + * Sleep until concurrent transaction ends. Note that we don't care + * if the locker has an exclusive or shared lock, because we need + * exclusive. */ - if (!TransactionIdEquals(HeapTupleHeaderGetXmax(oldtup.t_data), xwait)) - goto l2; - if (!(oldtup.t_data->t_infomask & HEAP_XMAX_COMMITTED)) + + /* must copy state data before unlocking buffer */ + xwait = HeapTupleHeaderGetXmax(oldtup.t_data); + infomask = oldtup.t_data->t_infomask; + + if (infomask & HEAP_XMAX_IS_MULTI) { - oldtup.t_data->t_infomask |= HEAP_XMAX_COMMITTED; - SetBufferCommitInfoNeedsSave(buffer); + /* wait for multixact */ + LockBuffer(buffer, BUFFER_LOCK_UNLOCK); + MultiXactIdWait((MultiXactId) xwait); + LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE); + + /* + * If xwait had just locked the tuple then some other xact could + * update this tuple before we get to this point. Check for xmax + * change, and start over if so. + */ + if (!(oldtup.t_data->t_infomask & HEAP_XMAX_IS_MULTI) || + !TransactionIdEquals(HeapTupleHeaderGetXmax(oldtup.t_data), + xwait)) + goto l2; + + /* + * You might think the multixact is necessarily done here, but + * not so: it could have surviving members, namely our own xact + * or other subxacts of this backend. It is legal for us to + * update the tuple in either case, however (the latter case is + * essentially a situation of upgrading our former shared lock + * to exclusive). We don't bother changing the on-disk hint bits + * since we are about to overwrite the xmax altogether. + */ } - /* if tuple was marked for update but not updated... */ - if (oldtup.t_data->t_infomask & HEAP_MARKED_FOR_UPDATE) + else + { + /* wait for regular transaction to end */ + LockBuffer(buffer, BUFFER_LOCK_UNLOCK); + XactLockTableWait(xwait); + LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE); + + /* + * xwait is done, but if xwait had just locked the tuple then some + * other xact could update this tuple before we get to this point. + * Check for xmax change, and start over if so. + */ + if ((oldtup.t_data->t_infomask & HEAP_XMAX_IS_MULTI) || + !TransactionIdEquals(HeapTupleHeaderGetXmax(oldtup.t_data), + xwait)) + goto l2; + + /* Otherwise we can mark it committed or aborted */ + if (!(oldtup.t_data->t_infomask & (HEAP_XMAX_COMMITTED | + HEAP_XMAX_INVALID))) + { + if (TransactionIdDidCommit(xwait)) + oldtup.t_data->t_infomask |= HEAP_XMAX_COMMITTED; + else + oldtup.t_data->t_infomask |= HEAP_XMAX_INVALID; + SetBufferCommitInfoNeedsSave(buffer); + } + } + + /* + * We may overwrite if previous xmax aborted, or if it committed + * but only locked the tuple without updating it. + */ + if (oldtup.t_data->t_infomask & (HEAP_XMAX_INVALID | + HEAP_IS_LOCKED)) result = HeapTupleMayBeUpdated; else result = HeapTupleUpdated; @@ -1556,7 +1661,8 @@ l2: { oldtup.t_data->t_infomask &= ~(HEAP_XMAX_COMMITTED | HEAP_XMAX_INVALID | - HEAP_MARKED_FOR_UPDATE | + HEAP_XMAX_IS_MULTI | + HEAP_IS_LOCKED | HEAP_MOVED); HeapTupleHeaderSetXmax(oldtup.t_data, xid); HeapTupleHeaderSetCmax(oldtup.t_data, cid); @@ -1642,7 +1748,8 @@ l2: { oldtup.t_data->t_infomask &= ~(HEAP_XMAX_COMMITTED | HEAP_XMAX_INVALID | - HEAP_MARKED_FOR_UPDATE | + HEAP_XMAX_IS_MULTI | + HEAP_IS_LOCKED | HEAP_MOVED); HeapTupleHeaderSetXmax(oldtup.t_data, xid); HeapTupleHeaderSetCmax(oldtup.t_data, cid); @@ -1739,17 +1846,18 @@ simple_heap_update(Relation relation, ItemPointer otid, HeapTuple tup) } /* - * heap_mark4update - mark a tuple for update + * heap_lock_tuple - lock a tuple in shared or exclusive mode */ HTSU_Result -heap_mark4update(Relation relation, HeapTuple tuple, Buffer *buffer, - CommandId cid) +heap_lock_tuple(Relation relation, HeapTuple tuple, Buffer *buffer, + CommandId cid, LockTupleMode mode) { - TransactionId xid = GetCurrentTransactionId(); + TransactionId xid; ItemPointer tid = &(tuple->t_self); ItemId lp; PageHeader dp; HTSU_Result result; + uint16 new_infomask; *buffer = ReadBuffer(relation, ItemPointerGetBlockNumber(tid)); LockBuffer(*buffer, BUFFER_LOCK_EXCLUSIVE); @@ -1767,38 +1875,93 @@ l3: { LockBuffer(*buffer, BUFFER_LOCK_UNLOCK); ReleaseBuffer(*buffer); - elog(ERROR, "attempted to mark4update invisible tuple"); + elog(ERROR, "attempted to lock invisible tuple"); } else if (result == HeapTupleBeingUpdated) { - TransactionId xwait = HeapTupleHeaderGetXmax(tuple->t_data); + if (mode == LockTupleShared && + (tuple->t_data->t_infomask & HEAP_XMAX_SHARED_LOCK)) + result = HeapTupleMayBeUpdated; + else + { + TransactionId xwait; + uint16 infomask; - /* sleep until concurrent transaction ends */ - LockBuffer(*buffer, BUFFER_LOCK_UNLOCK); - XactLockTableWait(xwait); + /* + * Sleep until concurrent transaction ends. + */ - LockBuffer(*buffer, BUFFER_LOCK_EXCLUSIVE); - if (!TransactionIdDidCommit(xwait)) - goto l3; + /* must copy state data before unlocking buffer */ + xwait = HeapTupleHeaderGetXmax(tuple->t_data); + infomask = tuple->t_data->t_infomask; - /* - * xwait is committed but if xwait had just marked the tuple for - * update then some other xaction could update this tuple before - * we got to this point. - */ - if (!TransactionIdEquals(HeapTupleHeaderGetXmax(tuple->t_data), xwait)) - goto l3; - if (!(tuple->t_data->t_infomask & HEAP_XMAX_COMMITTED)) - { - tuple->t_data->t_infomask |= HEAP_XMAX_COMMITTED; - SetBufferCommitInfoNeedsSave(*buffer); + if (infomask & HEAP_XMAX_IS_MULTI) + { + /* wait for multixact */ + LockBuffer(*buffer, BUFFER_LOCK_UNLOCK); + MultiXactIdWait((MultiXactId) xwait); + LockBuffer(*buffer, BUFFER_LOCK_EXCLUSIVE); + + /* + * If xwait had just locked the tuple then some other xact + * could update this tuple before we get to this point. + * Check for xmax change, and start over if so. + */ + if (!(tuple->t_data->t_infomask & HEAP_XMAX_IS_MULTI) || + !TransactionIdEquals(HeapTupleHeaderGetXmax(tuple->t_data), + xwait)) + goto l3; + + /* + * You might think the multixact is necessarily done here, but + * not so: it could have surviving members, namely our own xact + * or other subxacts of this backend. It is legal for us to + * lock the tuple in either case, however. We don't bother + * changing the on-disk hint bits since we are about to + * overwrite the xmax altogether. + */ + } + else + { + /* wait for regular transaction to end */ + LockBuffer(*buffer, BUFFER_LOCK_UNLOCK); + XactLockTableWait(xwait); + LockBuffer(*buffer, BUFFER_LOCK_EXCLUSIVE); + + /* + * xwait is done, but if xwait had just locked the tuple then + * some other xact could update this tuple before we get to + * this point. Check for xmax change, and start over if so. + */ + if ((tuple->t_data->t_infomask & HEAP_XMAX_IS_MULTI) || + !TransactionIdEquals(HeapTupleHeaderGetXmax(tuple->t_data), + xwait)) + goto l3; + + /* Otherwise we can mark it committed or aborted */ + if (!(tuple->t_data->t_infomask & (HEAP_XMAX_COMMITTED | + HEAP_XMAX_INVALID))) + { + if (TransactionIdDidCommit(xwait)) + tuple->t_data->t_infomask |= HEAP_XMAX_COMMITTED; + else + tuple->t_data->t_infomask |= HEAP_XMAX_INVALID; + SetBufferCommitInfoNeedsSave(*buffer); + } + } + + /* + * We may lock if previous xmax aborted, or if it committed + * but only locked the tuple without updating it. + */ + if (tuple->t_data->t_infomask & (HEAP_XMAX_INVALID | + HEAP_IS_LOCKED)) + result = HeapTupleMayBeUpdated; + else + result = HeapTupleUpdated; } - /* if tuple was marked for update but not updated... */ - if (tuple->t_data->t_infomask & HEAP_MARKED_FOR_UPDATE) - result = HeapTupleMayBeUpdated; - else - result = HeapTupleUpdated; } + if (result != HeapTupleMayBeUpdated) { Assert(result == HeapTupleSelfUpdated || result == HeapTupleUpdated); @@ -1808,21 +1971,173 @@ l3: } /* - * XLOG stuff: no logging is required as long as we have no - * savepoints. For savepoints private log could be used... + * Compute the new xmax and infomask to store into the tuple. Note we + * do not modify the tuple just yet, because that would leave it in the + * wrong state if multixact.c elogs. */ - PageSetTLI(BufferGetPage(*buffer), ThisTimeLineID); + xid = GetCurrentTransactionId(); + + new_infomask = tuple->t_data->t_infomask; + + new_infomask &= ~(HEAP_XMAX_COMMITTED | + HEAP_XMAX_INVALID | + HEAP_XMAX_IS_MULTI | + HEAP_IS_LOCKED | + HEAP_MOVED); + + if (mode == LockTupleShared) + { + TransactionId xmax = HeapTupleHeaderGetXmax(tuple->t_data); + uint16 old_infomask = tuple->t_data->t_infomask; + + /* + * If this is the first acquisition of a shared lock in the current + * transaction, set my per-backend OldestMemberMXactId setting. + * We can be certain that the transaction will never become a + * member of any older MultiXactIds than that. (We have to do this + * even if we end up just using our own TransactionId below, since + * some other backend could incorporate our XID into a MultiXact + * immediately afterwards.) + */ + MultiXactIdSetOldestMember(); + + new_infomask |= HEAP_XMAX_SHARED_LOCK; + + /* + * Check to see if we need a MultiXactId because there are multiple + * lockers. + * + * HeapTupleSatisfiesUpdate will have set the HEAP_XMAX_INVALID + * bit if the xmax was a MultiXactId but it was not running anymore. + * There is a race condition, which is that the MultiXactId may have + * finished since then, but that uncommon case is handled within + * MultiXactIdExpand. + * + * There is a similar race condition possible when the old xmax was + * a regular TransactionId. We test TransactionIdIsInProgress again + * just to narrow the window, but it's still possible to end up + * creating an unnecessary MultiXactId. Fortunately this is harmless. + */ + if (!(old_infomask & (HEAP_XMAX_INVALID | HEAP_XMAX_COMMITTED))) + { + if (old_infomask & HEAP_XMAX_IS_MULTI) + { + /* + * If the XMAX is already a MultiXactId, then we need to + * expand it to include our own TransactionId. + */ + xid = MultiXactIdExpand(xmax, true, xid); + new_infomask |= HEAP_XMAX_IS_MULTI; + } + else if (TransactionIdIsInProgress(xmax)) + { + if (TransactionIdEquals(xmax, xid)) + { + /* + * If the old locker is ourselves, we'll just mark the + * tuple again with our own TransactionId. However we + * have to consider the possibility that we had + * exclusive rather than shared lock before --- if so, + * be careful to preserve the exclusivity of the lock. + */ + if (!(old_infomask & HEAP_XMAX_SHARED_LOCK)) + { + new_infomask &= ~HEAP_XMAX_SHARED_LOCK; + new_infomask |= HEAP_XMAX_EXCL_LOCK; + mode = LockTupleExclusive; + } + } + else + { + /* + * If the Xmax is a valid TransactionId, then we need to + * create a new MultiXactId that includes both the old + * locker and our own TransactionId. + */ + xid = MultiXactIdExpand(xmax, false, xid); + new_infomask |= HEAP_XMAX_IS_MULTI; + } + } + else + { + /* + * Can get here iff HeapTupleSatisfiesUpdate saw the old + * xmax as running, but it finished before + * TransactionIdIsInProgress() got to run. Treat it like + * there's no locker in the tuple. + */ + } + } + else + { + /* + * There was no previous locker, so just insert our own + * TransactionId. + */ + } + } + else + { + /* We want an exclusive lock on the tuple */ + new_infomask |= HEAP_XMAX_EXCL_LOCK; + } + + START_CRIT_SECTION(); - /* store transaction information of xact marking the tuple */ - tuple->t_data->t_infomask &= ~(HEAP_XMAX_COMMITTED | - HEAP_XMAX_INVALID | - HEAP_MOVED); - tuple->t_data->t_infomask |= HEAP_MARKED_FOR_UPDATE; + /* + * Store transaction information of xact locking the tuple. + * + * Note: our CID is meaningless if storing a MultiXactId, but no harm + * in storing it anyway. + */ + tuple->t_data->t_infomask = new_infomask; HeapTupleHeaderSetXmax(tuple->t_data, xid); HeapTupleHeaderSetCmax(tuple->t_data, cid); /* Make sure there is no forward chain link in t_ctid */ tuple->t_data->t_ctid = *tid; + /* + * XLOG stuff. You might think that we don't need an XLOG record because + * there is no state change worth restoring after a crash. You would be + * wrong however: we have just written either a TransactionId or a + * MultiXactId that may never have been seen on disk before, and we need + * to make sure that there are XLOG entries covering those ID numbers. + * Else the same IDs might be re-used after a crash, which would be + * disastrous if this page made it to disk before the crash. Essentially + * we have to enforce the WAL log-before-data rule even in this case. + */ + if (!relation->rd_istemp) + { + xl_heap_lock xlrec; + XLogRecPtr recptr; + XLogRecData rdata[2]; + + xlrec.target.node = relation->rd_node; + xlrec.target.tid = tuple->t_self; + xlrec.shared_lock = (mode == LockTupleShared); + rdata[0].buffer = InvalidBuffer; + rdata[0].data = (char *) &xlrec; + rdata[0].len = SizeOfHeapLock; + rdata[0].next = &(rdata[1]); + + rdata[1].buffer = *buffer; + rdata[1].data = NULL; + rdata[1].len = 0; + rdata[1].next = NULL; + + recptr = XLogInsert(RM_HEAP_ID, XLOG_HEAP_LOCK, rdata); + + PageSetLSN(dp, recptr); + PageSetTLI(dp, ThisTimeLineID); + } + else + { + /* No XLOG record, but still need to flag that XID exists on disk */ + MyXactMadeTempRelUpdate = true; + } + + END_CRIT_SECTION(); + LockBuffer(*buffer, BUFFER_LOCK_UNLOCK); WriteNoReleaseBuffer(*buffer); @@ -1832,17 +2147,6 @@ l3: /* ---------------- * heap_markpos - mark scan position - * - * Note: - * Should only one mark be maintained per scan at one time. - * Check if this can be done generally--say calls to get the - * next/previous tuple and NEVER pass struct scandesc to the - * user AM's. Now, the mark is sent to the executor for safekeeping. - * Probably can store this info into a GENERAL scan structure. - * - * May be best to change this call to store the marked position - * (up to 2?) in the scan structure itself. - * Fix to use the proper caching structure. * ---------------- */ void @@ -1858,19 +2162,6 @@ heap_markpos(HeapScanDesc scan) /* ---------------- * heap_restrpos - restore position to marked location - * - * Note: there are bad side effects here. If we were past the end - * of a relation when heapmarkpos is called, then if the relation is - * extended via insert, then the next call to heaprestrpos will set - * cause the added tuples to be visible when the scan continues. - * Problems also arise if the TID's are rearranged!!! - * - * XXX might be better to do direct access instead of - * using the generality of heapgettup(). - * - * XXX It is very possible that when a scan is restored, that a tuple - * XXX which previously qualified may fail for time range purposes, unless - * XXX some form of locking exists (ie., portals currently can act funny. * ---------------- */ void @@ -1996,8 +2287,7 @@ log_heap_update(Relation reln, Buffer oldbuf, ItemPointerData from, { TransactionId xid[2]; /* xmax, xmin */ - if (newtup->t_data->t_infomask & (HEAP_XMAX_INVALID | - HEAP_MARKED_FOR_UPDATE)) + if (newtup->t_data->t_infomask & (HEAP_XMAX_INVALID | HEAP_IS_LOCKED)) xid[0] = InvalidTransactionId; else xid[0] = HeapTupleHeaderGetXmax(newtup->t_data); @@ -2185,7 +2475,8 @@ heap_xlog_delete(bool redo, XLogRecPtr lsn, XLogRecord *record) { htup->t_infomask &= ~(HEAP_XMAX_COMMITTED | HEAP_XMAX_INVALID | - HEAP_MARKED_FOR_UPDATE | + HEAP_XMAX_IS_MULTI | + HEAP_IS_LOCKED | HEAP_MOVED); HeapTupleHeaderSetXmax(htup, record->xl_xid); HeapTupleHeaderSetCmax(htup, FirstCommandId); @@ -2365,7 +2656,8 @@ heap_xlog_update(bool redo, XLogRecPtr lsn, XLogRecord *record, bool move) { htup->t_infomask &= ~(HEAP_XMAX_COMMITTED | HEAP_XMAX_INVALID | - HEAP_MARKED_FOR_UPDATE | + HEAP_XMAX_IS_MULTI | + HEAP_IS_LOCKED | HEAP_MOVED); HeapTupleHeaderSetXmax(htup, record->xl_xid); HeapTupleHeaderSetCmax(htup, FirstCommandId); @@ -2487,6 +2779,82 @@ newsame:; } +static void +heap_xlog_lock(bool redo, XLogRecPtr lsn, XLogRecord *record) +{ + xl_heap_lock *xlrec = (xl_heap_lock *) XLogRecGetData(record); + Relation reln; + Buffer buffer; + Page page; + OffsetNumber offnum; + ItemId lp = NULL; + HeapTupleHeader htup; + + if (redo && (record->xl_info & XLR_BKP_BLOCK_1)) + return; + + reln = XLogOpenRelation(redo, RM_HEAP_ID, xlrec->target.node); + + if (!RelationIsValid(reln)) + return; + + buffer = XLogReadBuffer(false, reln, + ItemPointerGetBlockNumber(&(xlrec->target.tid))); + if (!BufferIsValid(buffer)) + elog(PANIC, "heap_lock_%sdo: no block", (redo) ? "re" : "un"); + + page = (Page) BufferGetPage(buffer); + if (PageIsNew((PageHeader) page)) + elog(PANIC, "heap_lock_%sdo: uninitialized page", (redo) ? "re" : "un"); + + if (redo) + { + if (XLByteLE(lsn, PageGetLSN(page))) /* changes are applied */ + { + LockBuffer(buffer, BUFFER_LOCK_UNLOCK); + ReleaseBuffer(buffer); + return; + } + } + else if (XLByteLT(PageGetLSN(page), lsn)) /* changes are not applied + * ?! */ + elog(PANIC, "heap_lock_undo: bad page LSN"); + + offnum = ItemPointerGetOffsetNumber(&(xlrec->target.tid)); + if (PageGetMaxOffsetNumber(page) >= offnum) + lp = PageGetItemId(page, offnum); + + if (PageGetMaxOffsetNumber(page) < offnum || !ItemIdIsUsed(lp)) + elog(PANIC, "heap_lock_%sdo: invalid lp", (redo) ? "re" : "un"); + + htup = (HeapTupleHeader) PageGetItem(page, lp); + + if (redo) + { + /* + * Presently, we don't bother to restore the locked state, but + * just set the XMAX_INVALID bit. + */ + htup->t_infomask &= ~(HEAP_XMAX_COMMITTED | + HEAP_XMAX_INVALID | + HEAP_XMAX_IS_MULTI | + HEAP_IS_LOCKED | + HEAP_MOVED); + htup->t_infomask |= HEAP_XMAX_INVALID; + HeapTupleHeaderSetXmax(htup, record->xl_xid); + HeapTupleHeaderSetCmax(htup, FirstCommandId); + /* Make sure there is no forward chain link in t_ctid */ + htup->t_ctid = xlrec->target.tid; + PageSetLSN(page, lsn); + PageSetTLI(page, ThisTimeLineID); + LockBuffer(buffer, BUFFER_LOCK_UNLOCK); + WriteBuffer(buffer); + return; + } + + elog(PANIC, "heap_lock_undo: unimplemented"); +} + void heap_redo(XLogRecPtr lsn, XLogRecord *record) { @@ -2505,6 +2873,8 @@ heap_redo(XLogRecPtr lsn, XLogRecord *record) heap_xlog_clean(true, lsn, record); else if (info == XLOG_HEAP_NEWPAGE) heap_xlog_newpage(true, lsn, record); + else if (info == XLOG_HEAP_LOCK) + heap_xlog_lock(true, lsn, record); else elog(PANIC, "heap_redo: unknown op code %u", info); } @@ -2527,6 +2897,8 @@ heap_undo(XLogRecPtr lsn, XLogRecord *record) heap_xlog_clean(false, lsn, record); else if (info == XLOG_HEAP_NEWPAGE) heap_xlog_newpage(false, lsn, record); + else if (info == XLOG_HEAP_LOCK) + heap_xlog_lock(false, lsn, record); else elog(PANIC, "heap_undo: unknown op code %u", info); } @@ -2589,6 +2961,16 @@ heap_desc(char *buf, uint8 xl_info, char *rec) xlrec->node.spcNode, xlrec->node.dbNode, xlrec->node.relNode, xlrec->blkno); } + else if (info == XLOG_HEAP_LOCK) + { + xl_heap_lock *xlrec = (xl_heap_lock *) rec; + + if (xlrec->shared_lock) + strcat(buf, "shared_lock: "); + else + strcat(buf, "exclusive_lock: "); + out_target(buf, &(xlrec->target)); + } else strcat(buf, "UNKNOWN"); } |