author     Tom Lane <tgl@sss.pgh.pa.us>    2005-04-28 21:47:18 +0000
committer  Tom Lane <tgl@sss.pgh.pa.us>    2005-04-28 21:47:18 +0000
commit     bedb78d386a47fd66b6cda2040e0a5fb545ee371 (patch)
tree       0db0af8556ff82d94423e8e21362900afb18b7b6 /src/backend/access/heap
parent     d902e7d63ba2dc9cf0a1b051b2911b96831ef227 (diff)
Implement sharable row-level locks, and use them for foreign key references
to eliminate unnecessary deadlocks. This commit adds SELECT ... FOR SHARE
paralleling SELECT ... FOR UPDATE. The implementation uses a new SLRU
data structure (managed much like pg_subtrans) to represent multiple-
transaction-ID sets. When more than one transaction is holding a shared
lock on a particular row, we create a MultiXactId representing that set
of transactions and store its ID in the row's XMAX. This scheme allows
an effectively unlimited number of row locks, just as we did before,
while not costing any extra overhead except when a shared lock actually
has to be shared. Still TODO: use the regular lock manager to control
the grant order when multiple backends are waiting for a row lock.
Alvaro Herrera and Tom Lane.
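To see the effect concretely, here is a minimal illustration (hypothetical
table and data, not part of this commit). Before this change, the RI triggers
took FOR UPDATE locks on referenced rows, so two transactions inserting
children of the same parent row could deadlock; with FOR SHARE both lockers
proceed:

    -- hypothetical schema, for illustration only
    CREATE TABLE parent (id int PRIMARY KEY);
    CREATE TABLE child  (parent_id int REFERENCES parent);
    INSERT INTO parent VALUES (1);

    -- Session 1: shared row lock, as the FK triggers now take internally
    BEGIN;
    SELECT id FROM parent WHERE id = 1 FOR SHARE;

    -- Session 2: a second shared lock on the same row is granted at once;
    -- the row's XMAX now stores a MultiXactId naming both transactions
    BEGIN;
    SELECT id FROM parent WHERE id = 1 FOR SHARE;

    -- A FOR UPDATE request on the same row would wait for both shared
    -- lockers to finish, but the FK checks themselves no longer deadlock.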
Diffstat (limited to 'src/backend/access/heap')
-rw-r--r--    src/backend/access/heap/heapam.c    594
1 file changed, 488 insertions(+), 106 deletions(-)
diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index 605ed629426..ee604df2cae 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -8,7 +8,7 @@
  *
  *
  * IDENTIFICATION
- *    $PostgreSQL: pgsql/src/backend/access/heap/heapam.c,v 1.187 2005/04/14 20:03:22 tgl Exp $
+ *    $PostgreSQL: pgsql/src/backend/access/heap/heapam.c,v 1.188 2005/04/28 21:47:10 tgl Exp $
  *
  *
  * INTERFACE ROUTINES
@@ -40,12 +40,14 @@
 #include "access/heapam.h"
 #include "access/hio.h"
+#include "access/multixact.h"
 #include "access/tuptoaster.h"
 #include "access/valid.h"
 #include "access/xlogutils.h"
 #include "catalog/catalog.h"
 #include "catalog/namespace.h"
 #include "miscadmin.h"
+#include "storage/sinval.h"
 #include "utils/inval.h"
 #include "utils/relcache.h"
 #include "pgstat.h"
@@ -1238,30 +1240,81 @@ l1:
     }
     else if (result == HeapTupleBeingUpdated && wait)
     {
-        TransactionId xwait = HeapTupleHeaderGetXmax(tp.t_data);
-
-        /* sleep until concurrent transaction ends */
-        LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
-        XactLockTableWait(xwait);
-
-        LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
-        if (!TransactionIdDidCommit(xwait))
-            goto l1;
+        TransactionId xwait;
+        uint16        infomask;
 
         /*
-         * xwait is committed but if xwait had just marked the tuple for
-         * update then some other xaction could update this tuple before
-         * we got to this point.
+         * Sleep until concurrent transaction ends.  Note that we don't care
+         * if the locker has an exclusive or shared lock, because we need
+         * exclusive.
          */
-        if (!TransactionIdEquals(HeapTupleHeaderGetXmax(tp.t_data), xwait))
-            goto l1;
-        if (!(tp.t_data->t_infomask & HEAP_XMAX_COMMITTED))
+
+        /* must copy state data before unlocking buffer */
+        xwait = HeapTupleHeaderGetXmax(tp.t_data);
+        infomask = tp.t_data->t_infomask;
+
+        if (infomask & HEAP_XMAX_IS_MULTI)
         {
-            tp.t_data->t_infomask |= HEAP_XMAX_COMMITTED;
-            SetBufferCommitInfoNeedsSave(buffer);
+            /* wait for multixact */
+            LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
+            MultiXactIdWait((MultiXactId) xwait);
+            LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
+
+            /*
+             * If xwait had just locked the tuple then some other xact could
+             * update this tuple before we get to this point.  Check for xmax
+             * change, and start over if so.
+             */
+            if (!(tp.t_data->t_infomask & HEAP_XMAX_IS_MULTI) ||
+                !TransactionIdEquals(HeapTupleHeaderGetXmax(tp.t_data),
+                                     xwait))
+                goto l1;
+
+            /*
+             * You might think the multixact is necessarily done here, but
+             * not so: it could have surviving members, namely our own xact
+             * or other subxacts of this backend.  It is legal for us to
+             * delete the tuple in either case, however (the latter case is
+             * essentially a situation of upgrading our former shared lock
+             * to exclusive).  We don't bother changing the on-disk hint bits
+             * since we are about to overwrite the xmax altogether.
+             */
         }
-        /* if tuple was marked for update but not updated... */
-        if (tp.t_data->t_infomask & HEAP_MARKED_FOR_UPDATE)
+        else
+        {
+            /* wait for regular transaction to end */
+            LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
+            XactLockTableWait(xwait);
+            LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
+
+            /*
+             * xwait is done, but if xwait had just locked the tuple then some
+             * other xact could update this tuple before we get to this point.
+             * Check for xmax change, and start over if so.
+             */
+            if ((tp.t_data->t_infomask & HEAP_XMAX_IS_MULTI) ||
+                !TransactionIdEquals(HeapTupleHeaderGetXmax(tp.t_data),
+                                     xwait))
+                goto l1;
+
+            /* Otherwise we can mark it committed or aborted */
+            if (!(tp.t_data->t_infomask & (HEAP_XMAX_COMMITTED |
+                                           HEAP_XMAX_INVALID)))
+            {
+                if (TransactionIdDidCommit(xwait))
+                    tp.t_data->t_infomask |= HEAP_XMAX_COMMITTED;
+                else
+                    tp.t_data->t_infomask |= HEAP_XMAX_INVALID;
+                SetBufferCommitInfoNeedsSave(buffer);
+            }
+        }
+
+        /*
+         * We may overwrite if previous xmax aborted, or if it committed
+         * but only locked the tuple without updating it.
+         */
+        if (tp.t_data->t_infomask & (HEAP_XMAX_INVALID |
+                                     HEAP_IS_LOCKED))
             result = HeapTupleMayBeUpdated;
         else
             result = HeapTupleUpdated;
@@ -1290,7 +1343,8 @@ l1:
     /* store transaction information of xact deleting the tuple */
     tp.t_data->t_infomask &= ~(HEAP_XMAX_COMMITTED |
                                HEAP_XMAX_INVALID |
-                               HEAP_MARKED_FOR_UPDATE |
+                               HEAP_XMAX_IS_MULTI |
+                               HEAP_IS_LOCKED |
                                HEAP_MOVED);
     HeapTupleHeaderSetXmax(tp.t_data, xid);
     HeapTupleHeaderSetCmax(tp.t_data, cid);
@@ -1465,30 +1519,81 @@ l2:
     }
     else if (result == HeapTupleBeingUpdated && wait)
     {
-        TransactionId xwait = HeapTupleHeaderGetXmax(oldtup.t_data);
-
-        /* sleep until concurrent transaction ends */
-        LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
-        XactLockTableWait(xwait);
-
-        LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
-        if (!TransactionIdDidCommit(xwait))
-            goto l2;
+        TransactionId xwait;
+        uint16        infomask;
 
         /*
-         * xwait is committed but if xwait had just marked the tuple for
-         * update then some other xaction could update this tuple before
-         * we got to this point.
+         * Sleep until concurrent transaction ends.  Note that we don't care
+         * if the locker has an exclusive or shared lock, because we need
+         * exclusive.
          */
-        if (!TransactionIdEquals(HeapTupleHeaderGetXmax(oldtup.t_data), xwait))
-            goto l2;
-        if (!(oldtup.t_data->t_infomask & HEAP_XMAX_COMMITTED))
+
+        /* must copy state data before unlocking buffer */
+        xwait = HeapTupleHeaderGetXmax(oldtup.t_data);
+        infomask = oldtup.t_data->t_infomask;
+
+        if (infomask & HEAP_XMAX_IS_MULTI)
         {
-            oldtup.t_data->t_infomask |= HEAP_XMAX_COMMITTED;
-            SetBufferCommitInfoNeedsSave(buffer);
+            /* wait for multixact */
+            LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
+            MultiXactIdWait((MultiXactId) xwait);
+            LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
+
+            /*
+             * If xwait had just locked the tuple then some other xact could
+             * update this tuple before we get to this point.  Check for xmax
+             * change, and start over if so.
+             */
+            if (!(oldtup.t_data->t_infomask & HEAP_XMAX_IS_MULTI) ||
+                !TransactionIdEquals(HeapTupleHeaderGetXmax(oldtup.t_data),
+                                     xwait))
+                goto l2;
+
+            /*
+             * You might think the multixact is necessarily done here, but
+             * not so: it could have surviving members, namely our own xact
+             * or other subxacts of this backend.  It is legal for us to
+             * update the tuple in either case, however (the latter case is
+             * essentially a situation of upgrading our former shared lock
+             * to exclusive).  We don't bother changing the on-disk hint bits
+             * since we are about to overwrite the xmax altogether.
+             */
         }
-        /* if tuple was marked for update but not updated... */
-        if (oldtup.t_data->t_infomask & HEAP_MARKED_FOR_UPDATE)
+        else
+        {
+            /* wait for regular transaction to end */
+            LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
+            XactLockTableWait(xwait);
+            LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
+
+            /*
+             * xwait is done, but if xwait had just locked the tuple then some
+             * other xact could update this tuple before we get to this point.
+             * Check for xmax change, and start over if so.
+             */
+            if ((oldtup.t_data->t_infomask & HEAP_XMAX_IS_MULTI) ||
+                !TransactionIdEquals(HeapTupleHeaderGetXmax(oldtup.t_data),
+                                     xwait))
+                goto l2;
+
+            /* Otherwise we can mark it committed or aborted */
+            if (!(oldtup.t_data->t_infomask & (HEAP_XMAX_COMMITTED |
+                                               HEAP_XMAX_INVALID)))
+            {
+                if (TransactionIdDidCommit(xwait))
+                    oldtup.t_data->t_infomask |= HEAP_XMAX_COMMITTED;
+                else
+                    oldtup.t_data->t_infomask |= HEAP_XMAX_INVALID;
+                SetBufferCommitInfoNeedsSave(buffer);
+            }
+        }
+
+        /*
+         * We may overwrite if previous xmax aborted, or if it committed
+         * but only locked the tuple without updating it.
+         */
+        if (oldtup.t_data->t_infomask & (HEAP_XMAX_INVALID |
+                                         HEAP_IS_LOCKED))
            result = HeapTupleMayBeUpdated;
         else
            result = HeapTupleUpdated;
@@ -1556,7 +1661,8 @@ l2:
     {
         oldtup.t_data->t_infomask &= ~(HEAP_XMAX_COMMITTED |
                                        HEAP_XMAX_INVALID |
-                                       HEAP_MARKED_FOR_UPDATE |
+                                       HEAP_XMAX_IS_MULTI |
+                                       HEAP_IS_LOCKED |
                                        HEAP_MOVED);
         HeapTupleHeaderSetXmax(oldtup.t_data, xid);
         HeapTupleHeaderSetCmax(oldtup.t_data, cid);
@@ -1642,7 +1748,8 @@ l2:
     {
         oldtup.t_data->t_infomask &= ~(HEAP_XMAX_COMMITTED |
                                        HEAP_XMAX_INVALID |
-                                       HEAP_MARKED_FOR_UPDATE |
+                                       HEAP_XMAX_IS_MULTI |
+                                       HEAP_IS_LOCKED |
                                        HEAP_MOVED);
         HeapTupleHeaderSetXmax(oldtup.t_data, xid);
         HeapTupleHeaderSetCmax(oldtup.t_data, cid);
@@ -1739,17 +1846,18 @@ simple_heap_update(Relation relation, ItemPointer otid, HeapTuple tup)
 }
 
 /*
- *  heap_mark4update        - mark a tuple for update
+ *  heap_lock_tuple     - lock a tuple in shared or exclusive mode
  */
 HTSU_Result
-heap_mark4update(Relation relation, HeapTuple tuple, Buffer *buffer,
-                 CommandId cid)
+heap_lock_tuple(Relation relation, HeapTuple tuple, Buffer *buffer,
+                CommandId cid, LockTupleMode mode)
 {
-    TransactionId xid = GetCurrentTransactionId();
+    TransactionId xid;
     ItemPointer tid = &(tuple->t_self);
     ItemId      lp;
     PageHeader  dp;
     HTSU_Result result;
+    uint16      new_infomask;
 
     *buffer = ReadBuffer(relation, ItemPointerGetBlockNumber(tid));
     LockBuffer(*buffer, BUFFER_LOCK_EXCLUSIVE);
@@ -1767,38 +1875,93 @@ l3:
     {
         LockBuffer(*buffer, BUFFER_LOCK_UNLOCK);
         ReleaseBuffer(*buffer);
-        elog(ERROR, "attempted to mark4update invisible tuple");
+        elog(ERROR, "attempted to lock invisible tuple");
     }
     else if (result == HeapTupleBeingUpdated)
     {
-        TransactionId xwait = HeapTupleHeaderGetXmax(tuple->t_data);
+        if (mode == LockTupleShared &&
+            (tuple->t_data->t_infomask & HEAP_XMAX_SHARED_LOCK))
+            result = HeapTupleMayBeUpdated;
+        else
+        {
+            TransactionId xwait;
+            uint16        infomask;
 
-        /* sleep until concurrent transaction ends */
-        LockBuffer(*buffer, BUFFER_LOCK_UNLOCK);
-        XactLockTableWait(xwait);
+            /*
+             * Sleep until concurrent transaction ends.
+             */
 
-        LockBuffer(*buffer, BUFFER_LOCK_EXCLUSIVE);
-        if (!TransactionIdDidCommit(xwait))
-            goto l3;
+            /* must copy state data before unlocking buffer */
+            xwait = HeapTupleHeaderGetXmax(tuple->t_data);
+            infomask = tuple->t_data->t_infomask;
 
-        /*
-         * xwait is committed but if xwait had just marked the tuple for
-         * update then some other xaction could update this tuple before
-         * we got to this point.
-         */
-        if (!TransactionIdEquals(HeapTupleHeaderGetXmax(tuple->t_data), xwait))
-            goto l3;
-        if (!(tuple->t_data->t_infomask & HEAP_XMAX_COMMITTED))
-        {
-            tuple->t_data->t_infomask |= HEAP_XMAX_COMMITTED;
-            SetBufferCommitInfoNeedsSave(*buffer);
+            if (infomask & HEAP_XMAX_IS_MULTI)
+            {
+                /* wait for multixact */
+                LockBuffer(*buffer, BUFFER_LOCK_UNLOCK);
+                MultiXactIdWait((MultiXactId) xwait);
+                LockBuffer(*buffer, BUFFER_LOCK_EXCLUSIVE);
+
+                /*
+                 * If xwait had just locked the tuple then some other xact
+                 * could update this tuple before we get to this point.
+                 * Check for xmax change, and start over if so.
+                 */
+                if (!(tuple->t_data->t_infomask & HEAP_XMAX_IS_MULTI) ||
+                    !TransactionIdEquals(HeapTupleHeaderGetXmax(tuple->t_data),
+                                         xwait))
+                    goto l3;
+
+                /*
+                 * You might think the multixact is necessarily done here, but
+                 * not so: it could have surviving members, namely our own xact
+                 * or other subxacts of this backend.  It is legal for us to
+                 * lock the tuple in either case, however.  We don't bother
+                 * changing the on-disk hint bits since we are about to
+                 * overwrite the xmax altogether.
+                 */
+            }
+            else
+            {
+                /* wait for regular transaction to end */
+                LockBuffer(*buffer, BUFFER_LOCK_UNLOCK);
+                XactLockTableWait(xwait);
+                LockBuffer(*buffer, BUFFER_LOCK_EXCLUSIVE);
+
+                /*
+                 * xwait is done, but if xwait had just locked the tuple then
+                 * some other xact could update this tuple before we get to
+                 * this point.  Check for xmax change, and start over if so.
+                 */
+                if ((tuple->t_data->t_infomask & HEAP_XMAX_IS_MULTI) ||
+                    !TransactionIdEquals(HeapTupleHeaderGetXmax(tuple->t_data),
+                                         xwait))
+                    goto l3;
+
+                /* Otherwise we can mark it committed or aborted */
+                if (!(tuple->t_data->t_infomask & (HEAP_XMAX_COMMITTED |
+                                                   HEAP_XMAX_INVALID)))
+                {
+                    if (TransactionIdDidCommit(xwait))
+                        tuple->t_data->t_infomask |= HEAP_XMAX_COMMITTED;
+                    else
+                        tuple->t_data->t_infomask |= HEAP_XMAX_INVALID;
+                    SetBufferCommitInfoNeedsSave(*buffer);
+                }
+            }
+
+            /*
+             * We may lock if previous xmax aborted, or if it committed
+             * but only locked the tuple without updating it.
+             */
+            if (tuple->t_data->t_infomask & (HEAP_XMAX_INVALID |
+                                             HEAP_IS_LOCKED))
+                result = HeapTupleMayBeUpdated;
+            else
+                result = HeapTupleUpdated;
         }
-        /* if tuple was marked for update but not updated... */
-        if (tuple->t_data->t_infomask & HEAP_MARKED_FOR_UPDATE)
-            result = HeapTupleMayBeUpdated;
-        else
-            result = HeapTupleUpdated;
     }
+
     if (result != HeapTupleMayBeUpdated)
     {
         Assert(result == HeapTupleSelfUpdated || result == HeapTupleUpdated);
@@ -1808,21 +1971,173 @@ l3:
     }
 
     /*
-     * XLOG stuff: no logging is required as long as we have no
-     * savepoints.  For savepoints private log could be used...
+     * Compute the new xmax and infomask to store into the tuple.  Note we
+     * do not modify the tuple just yet, because that would leave it in the
+     * wrong state if multixact.c elogs.
      */
-    PageSetTLI(BufferGetPage(*buffer), ThisTimeLineID);
+    xid = GetCurrentTransactionId();
+
+    new_infomask = tuple->t_data->t_infomask;
+
+    new_infomask &= ~(HEAP_XMAX_COMMITTED |
+                      HEAP_XMAX_INVALID |
+                      HEAP_XMAX_IS_MULTI |
+                      HEAP_IS_LOCKED |
+                      HEAP_MOVED);
+
+    if (mode == LockTupleShared)
+    {
+        TransactionId xmax = HeapTupleHeaderGetXmax(tuple->t_data);
+        uint16        old_infomask = tuple->t_data->t_infomask;
+
+        /*
+         * If this is the first acquisition of a shared lock in the current
+         * transaction, set my per-backend OldestMemberMXactId setting.
+         * We can be certain that the transaction will never become a
+         * member of any older MultiXactIds than that.  (We have to do this
+         * even if we end up just using our own TransactionId below, since
+         * some other backend could incorporate our XID into a MultiXact
+         * immediately afterwards.)
+         */
+        MultiXactIdSetOldestMember();
+
+        new_infomask |= HEAP_XMAX_SHARED_LOCK;
+
+        /*
+         * Check to see if we need a MultiXactId because there are multiple
+         * lockers.
+         *
+         * HeapTupleSatisfiesUpdate will have set the HEAP_XMAX_INVALID
+         * bit if the xmax was a MultiXactId but it was not running anymore.
+         * There is a race condition, which is that the MultiXactId may have
+         * finished since then, but that uncommon case is handled within
+         * MultiXactIdExpand.
+         *
+         * There is a similar race condition possible when the old xmax was
+         * a regular TransactionId.  We test TransactionIdIsInProgress again
+         * just to narrow the window, but it's still possible to end up
+         * creating an unnecessary MultiXactId.  Fortunately this is harmless.
+         */
+        if (!(old_infomask & (HEAP_XMAX_INVALID | HEAP_XMAX_COMMITTED)))
+        {
+            if (old_infomask & HEAP_XMAX_IS_MULTI)
+            {
+                /*
+                 * If the XMAX is already a MultiXactId, then we need to
+                 * expand it to include our own TransactionId.
+                 */
+                xid = MultiXactIdExpand(xmax, true, xid);
+                new_infomask |= HEAP_XMAX_IS_MULTI;
+            }
+            else if (TransactionIdIsInProgress(xmax))
+            {
+                if (TransactionIdEquals(xmax, xid))
+                {
+                    /*
+                     * If the old locker is ourselves, we'll just mark the
+                     * tuple again with our own TransactionId.  However we
+                     * have to consider the possibility that we had
+                     * exclusive rather than shared lock before --- if so,
+                     * be careful to preserve the exclusivity of the lock.
+                     */
+                    if (!(old_infomask & HEAP_XMAX_SHARED_LOCK))
+                    {
+                        new_infomask &= ~HEAP_XMAX_SHARED_LOCK;
+                        new_infomask |= HEAP_XMAX_EXCL_LOCK;
+                        mode = LockTupleExclusive;
+                    }
+                }
+                else
+                {
+                    /*
+                     * If the Xmax is a valid TransactionId, then we need to
+                     * create a new MultiXactId that includes both the old
+                     * locker and our own TransactionId.
+                     */
+                    xid = MultiXactIdExpand(xmax, false, xid);
+                    new_infomask |= HEAP_XMAX_IS_MULTI;
+                }
+            }
+            else
+            {
+                /*
+                 * Can get here iff HeapTupleSatisfiesUpdate saw the old
+                 * xmax as running, but it finished before
+                 * TransactionIdIsInProgress() got to run.  Treat it like
+                 * there's no locker in the tuple.
+                 */
+            }
+        }
+        else
+        {
+            /*
+             * There was no previous locker, so just insert our own
+             * TransactionId.
+             */
+        }
+    }
+    else
+    {
+        /* We want an exclusive lock on the tuple */
+        new_infomask |= HEAP_XMAX_EXCL_LOCK;
+    }
+
+    START_CRIT_SECTION();
 
-    /* store transaction information of xact marking the tuple */
-    tuple->t_data->t_infomask &= ~(HEAP_XMAX_COMMITTED |
-                                   HEAP_XMAX_INVALID |
-                                   HEAP_MOVED);
-    tuple->t_data->t_infomask |= HEAP_MARKED_FOR_UPDATE;
+    /*
+     * Store transaction information of xact locking the tuple.
+     *
+     * Note: our CID is meaningless if storing a MultiXactId, but no harm
+     * in storing it anyway.
+     */
+    tuple->t_data->t_infomask = new_infomask;
     HeapTupleHeaderSetXmax(tuple->t_data, xid);
     HeapTupleHeaderSetCmax(tuple->t_data, cid);
     /* Make sure there is no forward chain link in t_ctid */
     tuple->t_data->t_ctid = *tid;
 
+    /*
+     * XLOG stuff.  You might think that we don't need an XLOG record because
+     * there is no state change worth restoring after a crash.  You would be
+     * wrong however: we have just written either a TransactionId or a
+     * MultiXactId that may never have been seen on disk before, and we need
+     * to make sure that there are XLOG entries covering those ID numbers.
+     * Else the same IDs might be re-used after a crash, which would be
+     * disastrous if this page made it to disk before the crash.  Essentially
+     * we have to enforce the WAL log-before-data rule even in this case.
+     */
+    if (!relation->rd_istemp)
+    {
+        xl_heap_lock xlrec;
+        XLogRecPtr  recptr;
+        XLogRecData rdata[2];
+
+        xlrec.target.node = relation->rd_node;
+        xlrec.target.tid = tuple->t_self;
+        xlrec.shared_lock = (mode == LockTupleShared);
+        rdata[0].buffer = InvalidBuffer;
+        rdata[0].data = (char *) &xlrec;
+        rdata[0].len = SizeOfHeapLock;
+        rdata[0].next = &(rdata[1]);
+
+        rdata[1].buffer = *buffer;
+        rdata[1].data = NULL;
+        rdata[1].len = 0;
+        rdata[1].next = NULL;
+
+        recptr = XLogInsert(RM_HEAP_ID, XLOG_HEAP_LOCK, rdata);
+
+        PageSetLSN(dp, recptr);
+        PageSetTLI(dp, ThisTimeLineID);
+    }
+    else
+    {
+        /* No XLOG record, but still need to flag that XID exists on disk */
+        MyXactMadeTempRelUpdate = true;
+    }
+
+    END_CRIT_SECTION();
+
     LockBuffer(*buffer, BUFFER_LOCK_UNLOCK);
 
     WriteNoReleaseBuffer(*buffer);
@@ -1832,17 +2147,6 @@ l3:
 
 /* ----------------
  *      heap_markpos    - mark scan position
- *
- * Note:
- *      Should only one mark be maintained per scan at one time.
- *  Check if this can be done generally--say calls to get the
- *  next/previous tuple and NEVER pass struct scandesc to the
- *  user AM's.  Now, the mark is sent to the executor for safekeeping.
- *  Probably can store this info into a GENERAL scan structure.
- *
- *      May be best to change this call to store the marked position
- *  (up to 2?) in the scan structure itself.
- *  Fix to use the proper caching structure.
  * ----------------
  */
 void
@@ -1858,19 +2162,6 @@ heap_markpos(HeapScanDesc scan)
 
 /* ----------------
  *      heap_restrpos   - restore position to marked location
- *
- * Note:  there are bad side effects here.  If we were past the end
- * of a relation when heapmarkpos is called, then if the relation is
- * extended via insert, then the next call to heaprestrpos will set
- * cause the added tuples to be visible when the scan continues.
- * Problems also arise if the TID's are rearranged!!!
- *
- * XXX  might be better to do direct access instead of
- *      using the generality of heapgettup().
- *
- * XXX It is very possible that when a scan is restored, that a tuple
- * XXX which previously qualified may fail for time range purposes, unless
- * XXX some form of locking exists (ie., portals currently can act funny.
 * ----------------
 */
 void
@@ -1996,8 +2287,7 @@ log_heap_update(Relation reln, Buffer oldbuf, ItemPointerData from,
 {
     TransactionId xid[2];       /* xmax, xmin */
 
-    if (newtup->t_data->t_infomask & (HEAP_XMAX_INVALID |
-                                      HEAP_MARKED_FOR_UPDATE))
+    if (newtup->t_data->t_infomask & (HEAP_XMAX_INVALID | HEAP_IS_LOCKED))
         xid[0] = InvalidTransactionId;
     else
         xid[0] = HeapTupleHeaderGetXmax(newtup->t_data);
@@ -2185,7 +2475,8 @@ heap_xlog_delete(bool redo, XLogRecPtr lsn, XLogRecord *record)
     {
         htup->t_infomask &= ~(HEAP_XMAX_COMMITTED |
                               HEAP_XMAX_INVALID |
-                              HEAP_MARKED_FOR_UPDATE |
+                              HEAP_XMAX_IS_MULTI |
+                              HEAP_IS_LOCKED |
                               HEAP_MOVED);
         HeapTupleHeaderSetXmax(htup, record->xl_xid);
         HeapTupleHeaderSetCmax(htup, FirstCommandId);
@@ -2365,7 +2656,8 @@ heap_xlog_update(bool redo, XLogRecPtr lsn, XLogRecord *record, bool move)
     {
         htup->t_infomask &= ~(HEAP_XMAX_COMMITTED |
                               HEAP_XMAX_INVALID |
-                              HEAP_MARKED_FOR_UPDATE |
+                              HEAP_XMAX_IS_MULTI |
+                              HEAP_IS_LOCKED |
                               HEAP_MOVED);
         HeapTupleHeaderSetXmax(htup, record->xl_xid);
         HeapTupleHeaderSetCmax(htup, FirstCommandId);
@@ -2487,6 +2779,82 @@ newsame:;
 }
 
+static void
+heap_xlog_lock(bool redo, XLogRecPtr lsn, XLogRecord *record)
+{
+    xl_heap_lock *xlrec = (xl_heap_lock *) XLogRecGetData(record);
+    Relation    reln;
+    Buffer      buffer;
+    Page        page;
+    OffsetNumber offnum;
+    ItemId      lp = NULL;
+    HeapTupleHeader htup;
+
+    if (redo && (record->xl_info & XLR_BKP_BLOCK_1))
+        return;
+
+    reln = XLogOpenRelation(redo, RM_HEAP_ID, xlrec->target.node);
+
+    if (!RelationIsValid(reln))
+        return;
+
+    buffer = XLogReadBuffer(false, reln,
+                            ItemPointerGetBlockNumber(&(xlrec->target.tid)));
+    if (!BufferIsValid(buffer))
+        elog(PANIC, "heap_lock_%sdo: no block", (redo) ? "re" : "un");
+
+    page = (Page) BufferGetPage(buffer);
+    if (PageIsNew((PageHeader) page))
+        elog(PANIC, "heap_lock_%sdo: uninitialized page", (redo) ? "re" : "un");
+
+    if (redo)
+    {
+        if (XLByteLE(lsn, PageGetLSN(page)))    /* changes are applied */
+        {
+            LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
+            ReleaseBuffer(buffer);
+            return;
+        }
+    }
+    else if (XLByteLT(PageGetLSN(page), lsn))   /* changes are not applied
+                                                 * ?! */
+        elog(PANIC, "heap_lock_undo: bad page LSN");
+
+    offnum = ItemPointerGetOffsetNumber(&(xlrec->target.tid));
+    if (PageGetMaxOffsetNumber(page) >= offnum)
+        lp = PageGetItemId(page, offnum);
+
+    if (PageGetMaxOffsetNumber(page) < offnum || !ItemIdIsUsed(lp))
+        elog(PANIC, "heap_lock_%sdo: invalid lp", (redo) ? "re" : "un");
+
+    htup = (HeapTupleHeader) PageGetItem(page, lp);
+
+    if (redo)
+    {
+        /*
+         * Presently, we don't bother to restore the locked state, but
+         * just set the XMAX_INVALID bit.
+         */
+        htup->t_infomask &= ~(HEAP_XMAX_COMMITTED |
+                              HEAP_XMAX_INVALID |
+                              HEAP_XMAX_IS_MULTI |
+                              HEAP_IS_LOCKED |
+                              HEAP_MOVED);
+        htup->t_infomask |= HEAP_XMAX_INVALID;
+        HeapTupleHeaderSetXmax(htup, record->xl_xid);
+        HeapTupleHeaderSetCmax(htup, FirstCommandId);
+        /* Make sure there is no forward chain link in t_ctid */
+        htup->t_ctid = xlrec->target.tid;
+        PageSetLSN(page, lsn);
+        PageSetTLI(page, ThisTimeLineID);
+        LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
+        WriteBuffer(buffer);
+        return;
+    }
+
+    elog(PANIC, "heap_lock_undo: unimplemented");
+}
+
 void
 heap_redo(XLogRecPtr lsn, XLogRecord *record)
 {
@@ -2505,6 +2873,8 @@ heap_redo(XLogRecPtr lsn, XLogRecord *record)
         heap_xlog_clean(true, lsn, record);
     else if (info == XLOG_HEAP_NEWPAGE)
         heap_xlog_newpage(true, lsn, record);
+    else if (info == XLOG_HEAP_LOCK)
+        heap_xlog_lock(true, lsn, record);
     else
         elog(PANIC, "heap_redo: unknown op code %u", info);
 }
@@ -2527,6 +2897,8 @@ heap_undo(XLogRecPtr lsn, XLogRecord *record)
         heap_xlog_clean(false, lsn, record);
     else if (info == XLOG_HEAP_NEWPAGE)
         heap_xlog_newpage(false, lsn, record);
+    else if (info == XLOG_HEAP_LOCK)
+        heap_xlog_lock(false, lsn, record);
     else
         elog(PANIC, "heap_undo: unknown op code %u", info);
 }
@@ -2589,6 +2961,16 @@ heap_desc(char *buf, uint8 xl_info, char *rec)
                 xlrec->node.spcNode, xlrec->node.dbNode,
                 xlrec->node.relNode, xlrec->blkno);
     }
+    else if (info == XLOG_HEAP_LOCK)
+    {
+        xl_heap_lock *xlrec = (xl_heap_lock *) rec;
+
+        if (xlrec->shared_lock)
+            strcat(buf, "shared_lock: ");
+        else
+            strcat(buf, "exclusive_lock: ");
+        out_target(buf, &(xlrec->target));
+    }
     else
         strcat(buf, "UNKNOWN");
 }