Diffstat (limited to 'src/backend/access/heap/heapam.c')
 -rw-r--r--  src/backend/access/heap/heapam.c | 594
 1 file changed, 488 insertions(+), 106 deletions(-)
diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index 605ed629426..ee604df2cae 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -8,7 +8,7 @@
*
*
* IDENTIFICATION
- * $PostgreSQL: pgsql/src/backend/access/heap/heapam.c,v 1.187 2005/04/14 20:03:22 tgl Exp $
+ * $PostgreSQL: pgsql/src/backend/access/heap/heapam.c,v 1.188 2005/04/28 21:47:10 tgl Exp $
*
*
* INTERFACE ROUTINES
@@ -40,12 +40,14 @@
#include "access/heapam.h"
#include "access/hio.h"
+#include "access/multixact.h"
#include "access/tuptoaster.h"
#include "access/valid.h"
#include "access/xlogutils.h"
#include "catalog/catalog.h"
#include "catalog/namespace.h"
#include "miscadmin.h"
+#include "storage/sinval.h"
#include "utils/inval.h"
#include "utils/relcache.h"
#include "pgstat.h"
@@ -1238,30 +1240,81 @@ l1:
}
else if (result == HeapTupleBeingUpdated && wait)
{
- TransactionId xwait = HeapTupleHeaderGetXmax(tp.t_data);
-
- /* sleep until concurrent transaction ends */
- LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
- XactLockTableWait(xwait);
-
- LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
- if (!TransactionIdDidCommit(xwait))
- goto l1;
+ TransactionId xwait;
+ uint16 infomask;
/*
- * xwait is committed but if xwait had just marked the tuple for
- * update then some other xaction could update this tuple before
- * we got to this point.
+ * Sleep until concurrent transaction ends. Note that we don't care
+ * if the locker has an exclusive or shared lock, because we need
+ * exclusive.
*/
- if (!TransactionIdEquals(HeapTupleHeaderGetXmax(tp.t_data), xwait))
- goto l1;
- if (!(tp.t_data->t_infomask & HEAP_XMAX_COMMITTED))
+
+ /* must copy state data before unlocking buffer */
+ xwait = HeapTupleHeaderGetXmax(tp.t_data);
+ infomask = tp.t_data->t_infomask;
+
+ if (infomask & HEAP_XMAX_IS_MULTI)
{
- tp.t_data->t_infomask |= HEAP_XMAX_COMMITTED;
- SetBufferCommitInfoNeedsSave(buffer);
+ /* wait for multixact */
+ LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
+ MultiXactIdWait((MultiXactId) xwait);
+ LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
+
+ /*
+ * If xwait had just locked the tuple then some other xact could
+ * update this tuple before we get to this point. Check for xmax
+ * change, and start over if so.
+ */
+ if (!(tp.t_data->t_infomask & HEAP_XMAX_IS_MULTI) ||
+ !TransactionIdEquals(HeapTupleHeaderGetXmax(tp.t_data),
+ xwait))
+ goto l1;
+
+ /*
+ * You might think the multixact is necessarily done here, but
+ * not so: it could have surviving members, namely our own xact
+ * or other subxacts of this backend. It is legal for us to
+ * delete the tuple in either case, however (the latter case is
+ * essentially a situation of upgrading our former shared lock
+ * to exclusive). We don't bother changing the on-disk hint bits
+ * since we are about to overwrite the xmax altogether.
+ */
}
- /* if tuple was marked for update but not updated... */
- if (tp.t_data->t_infomask & HEAP_MARKED_FOR_UPDATE)
+ else
+ {
+ /* wait for regular transaction to end */
+ LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
+ XactLockTableWait(xwait);
+ LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
+
+ /*
+ * xwait is done, but if xwait had just locked the tuple then some
+ * other xact could update this tuple before we get to this point.
+ * Check for xmax change, and start over if so.
+ */
+ if ((tp.t_data->t_infomask & HEAP_XMAX_IS_MULTI) ||
+ !TransactionIdEquals(HeapTupleHeaderGetXmax(tp.t_data),
+ xwait))
+ goto l1;
+
+ /* Otherwise we can mark it committed or aborted */
+ if (!(tp.t_data->t_infomask & (HEAP_XMAX_COMMITTED |
+ HEAP_XMAX_INVALID)))
+ {
+ if (TransactionIdDidCommit(xwait))
+ tp.t_data->t_infomask |= HEAP_XMAX_COMMITTED;
+ else
+ tp.t_data->t_infomask |= HEAP_XMAX_INVALID;
+ SetBufferCommitInfoNeedsSave(buffer);
+ }
+ }
+
+ /*
+ * We may overwrite if previous xmax aborted, or if it committed
+ * but only locked the tuple without updating it.
+ */
+ if (tp.t_data->t_infomask & (HEAP_XMAX_INVALID |
+ HEAP_IS_LOCKED))
result = HeapTupleMayBeUpdated;
else
result = HeapTupleUpdated;
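
The overwrite test just above relies on HEAP_IS_LOCKED being the union of the two lock bits this patch introduces. A minimal standalone sketch of that decision, using stand-in names and assumed bit values rather than the real src/include/access/htup.h definitions:

    #include <stdbool.h>
    #include <stdint.h>

    /* Stand-in flags -- assumed values, not the htup.h definitions. */
    #define XMAX_COMMITTED    0x0400
    #define XMAX_INVALID      0x0800
    #define XMAX_EXCL_LOCK    0x0040
    #define XMAX_SHARED_LOCK  0x0080
    #define IS_LOCKED         (XMAX_EXCL_LOCK | XMAX_SHARED_LOCK)

    /*
     * May we overwrite xmax?  Yes if the previous xmax aborted (hinted
     * invalid), or if it committed but merely locked the tuple: a row
     * lock, unlike a delete or update, leaves the tuple version alive.
     */
    static bool
    may_overwrite_xmax(uint16_t infomask)
    {
        return (infomask & (XMAX_INVALID | IS_LOCKED)) != 0;
    }
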
@@ -1290,7 +1343,8 @@ l1:
/* store transaction information of xact deleting the tuple */
tp.t_data->t_infomask &= ~(HEAP_XMAX_COMMITTED |
HEAP_XMAX_INVALID |
- HEAP_MARKED_FOR_UPDATE |
+ HEAP_XMAX_IS_MULTI |
+ HEAP_IS_LOCKED |
HEAP_MOVED);
HeapTupleHeaderSetXmax(tp.t_data, xid);
HeapTupleHeaderSetCmax(tp.t_data, cid);
@@ -1465,30 +1519,81 @@ l2:
}
else if (result == HeapTupleBeingUpdated && wait)
{
- TransactionId xwait = HeapTupleHeaderGetXmax(oldtup.t_data);
-
- /* sleep until concurrent transaction ends */
- LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
- XactLockTableWait(xwait);
-
- LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
- if (!TransactionIdDidCommit(xwait))
- goto l2;
+ TransactionId xwait;
+ uint16 infomask;
/*
- * xwait is committed but if xwait had just marked the tuple for
- * update then some other xaction could update this tuple before
- * we got to this point.
+ * Sleep until concurrent transaction ends. Note that we don't care
+ * if the locker has an exclusive or shared lock, because we need
+ * exclusive.
*/
- if (!TransactionIdEquals(HeapTupleHeaderGetXmax(oldtup.t_data), xwait))
- goto l2;
- if (!(oldtup.t_data->t_infomask & HEAP_XMAX_COMMITTED))
+
+ /* must copy state data before unlocking buffer */
+ xwait = HeapTupleHeaderGetXmax(oldtup.t_data);
+ infomask = oldtup.t_data->t_infomask;
+
+ if (infomask & HEAP_XMAX_IS_MULTI)
{
- oldtup.t_data->t_infomask |= HEAP_XMAX_COMMITTED;
- SetBufferCommitInfoNeedsSave(buffer);
+ /* wait for multixact */
+ LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
+ MultiXactIdWait((MultiXactId) xwait);
+ LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
+
+ /*
+ * If xwait had just locked the tuple then some other xact could
+ * update this tuple before we get to this point. Check for xmax
+ * change, and start over if so.
+ */
+ if (!(oldtup.t_data->t_infomask & HEAP_XMAX_IS_MULTI) ||
+ !TransactionIdEquals(HeapTupleHeaderGetXmax(oldtup.t_data),
+ xwait))
+ goto l2;
+
+ /*
+ * You might think the multixact is necessarily done here, but
+ * not so: it could have surviving members, namely our own xact
+ * or other subxacts of this backend. It is legal for us to
+ * update the tuple in either case, however (the latter case is
+ * essentially a situation of upgrading our former shared lock
+ * to exclusive). We don't bother changing the on-disk hint bits
+ * since we are about to overwrite the xmax altogether.
+ */
}
- /* if tuple was marked for update but not updated... */
- if (oldtup.t_data->t_infomask & HEAP_MARKED_FOR_UPDATE)
+ else
+ {
+ /* wait for regular transaction to end */
+ LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
+ XactLockTableWait(xwait);
+ LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
+
+ /*
+ * xwait is done, but if xwait had just locked the tuple then some
+ * other xact could update this tuple before we get to this point.
+ * Check for xmax change, and start over if so.
+ */
+ if ((oldtup.t_data->t_infomask & HEAP_XMAX_IS_MULTI) ||
+ !TransactionIdEquals(HeapTupleHeaderGetXmax(oldtup.t_data),
+ xwait))
+ goto l2;
+
+ /* Otherwise we can mark it committed or aborted */
+ if (!(oldtup.t_data->t_infomask & (HEAP_XMAX_COMMITTED |
+ HEAP_XMAX_INVALID)))
+ {
+ if (TransactionIdDidCommit(xwait))
+ oldtup.t_data->t_infomask |= HEAP_XMAX_COMMITTED;
+ else
+ oldtup.t_data->t_infomask |= HEAP_XMAX_INVALID;
+ SetBufferCommitInfoNeedsSave(buffer);
+ }
+ }
+
+ /*
+ * We may overwrite if previous xmax aborted, or if it committed
+ * but only locked the tuple without updating it.
+ */
+ if (oldtup.t_data->t_infomask & (HEAP_XMAX_INVALID |
+ HEAP_IS_LOCKED))
result = HeapTupleMayBeUpdated;
else
result = HeapTupleUpdated;
@@ -1556,7 +1661,8 @@ l2:
{
oldtup.t_data->t_infomask &= ~(HEAP_XMAX_COMMITTED |
HEAP_XMAX_INVALID |
- HEAP_MARKED_FOR_UPDATE |
+ HEAP_XMAX_IS_MULTI |
+ HEAP_IS_LOCKED |
HEAP_MOVED);
HeapTupleHeaderSetXmax(oldtup.t_data, xid);
HeapTupleHeaderSetCmax(oldtup.t_data, cid);
@@ -1642,7 +1748,8 @@ l2:
{
oldtup.t_data->t_infomask &= ~(HEAP_XMAX_COMMITTED |
HEAP_XMAX_INVALID |
- HEAP_MARKED_FOR_UPDATE |
+ HEAP_XMAX_IS_MULTI |
+ HEAP_IS_LOCKED |
HEAP_MOVED);
HeapTupleHeaderSetXmax(oldtup.t_data, xid);
HeapTupleHeaderSetCmax(oldtup.t_data, cid);
@@ -1739,17 +1846,18 @@ simple_heap_update(Relation relation, ItemPointer otid, HeapTuple tup)
}
/*
- * heap_mark4update - mark a tuple for update
+ * heap_lock_tuple - lock a tuple in shared or exclusive mode
*/
HTSU_Result
-heap_mark4update(Relation relation, HeapTuple tuple, Buffer *buffer,
- CommandId cid)
+heap_lock_tuple(Relation relation, HeapTuple tuple, Buffer *buffer,
+ CommandId cid, LockTupleMode mode)
{
- TransactionId xid = GetCurrentTransactionId();
+ TransactionId xid;
ItemPointer tid = &(tuple->t_self);
ItemId lp;
PageHeader dp;
HTSU_Result result;
+ uint16 new_infomask;
*buffer = ReadBuffer(relation, ItemPointerGetBlockNumber(tid));
LockBuffer(*buffer, BUFFER_LOCK_EXCLUSIVE);
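
From this point on the function is heap_lock_tuple. A hypothetical caller sketch, assuming the era's zero-argument GetCurrentCommandId() and the LockTupleShared/LockTupleExclusive modes referenced later in the patch:

    /* Hypothetical caller sketch -- not part of this patch. */
    HTSU_Result   result;
    Buffer        buffer;
    HeapTupleData tuple;

    tuple.t_self = *tid;                 /* TID of the row to lock */
    result = heap_lock_tuple(relation, &tuple, &buffer,
                             GetCurrentCommandId(), LockTupleShared);
    ReleaseBuffer(buffer);               /* returned pinned but unlocked */

    if (result == HeapTupleUpdated)
        ;                                /* a committed xact updated or
                                          * deleted the row: chase t_ctid
                                          * or report a failure */
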
@@ -1767,38 +1875,93 @@ l3:
{
LockBuffer(*buffer, BUFFER_LOCK_UNLOCK);
ReleaseBuffer(*buffer);
- elog(ERROR, "attempted to mark4update invisible tuple");
+ elog(ERROR, "attempted to lock invisible tuple");
}
else if (result == HeapTupleBeingUpdated)
{
- TransactionId xwait = HeapTupleHeaderGetXmax(tuple->t_data);
+ if (mode == LockTupleShared &&
+ (tuple->t_data->t_infomask & HEAP_XMAX_SHARED_LOCK))
+ result = HeapTupleMayBeUpdated;
+ else
+ {
+ TransactionId xwait;
+ uint16 infomask;
- /* sleep until concurrent transaction ends */
- LockBuffer(*buffer, BUFFER_LOCK_UNLOCK);
- XactLockTableWait(xwait);
+ /*
+ * Sleep until concurrent transaction ends.
+ */
- LockBuffer(*buffer, BUFFER_LOCK_EXCLUSIVE);
- if (!TransactionIdDidCommit(xwait))
- goto l3;
+ /* must copy state data before unlocking buffer */
+ xwait = HeapTupleHeaderGetXmax(tuple->t_data);
+ infomask = tuple->t_data->t_infomask;
- /*
- * xwait is committed but if xwait had just marked the tuple for
- * update then some other xaction could update this tuple before
- * we got to this point.
- */
- if (!TransactionIdEquals(HeapTupleHeaderGetXmax(tuple->t_data), xwait))
- goto l3;
- if (!(tuple->t_data->t_infomask & HEAP_XMAX_COMMITTED))
- {
- tuple->t_data->t_infomask |= HEAP_XMAX_COMMITTED;
- SetBufferCommitInfoNeedsSave(*buffer);
+ if (infomask & HEAP_XMAX_IS_MULTI)
+ {
+ /* wait for multixact */
+ LockBuffer(*buffer, BUFFER_LOCK_UNLOCK);
+ MultiXactIdWait((MultiXactId) xwait);
+ LockBuffer(*buffer, BUFFER_LOCK_EXCLUSIVE);
+
+ /*
+ * If xwait had just locked the tuple then some other xact
+ * could update this tuple before we get to this point.
+ * Check for xmax change, and start over if so.
+ */
+ if (!(tuple->t_data->t_infomask & HEAP_XMAX_IS_MULTI) ||
+ !TransactionIdEquals(HeapTupleHeaderGetXmax(tuple->t_data),
+ xwait))
+ goto l3;
+
+ /*
+ * You might think the multixact is necessarily done here, but
+ * not so: it could have surviving members, namely our own xact
+ * or other subxacts of this backend. It is legal for us to
+ * lock the tuple in either case, however. We don't bother
+ * changing the on-disk hint bits since we are about to
+ * overwrite the xmax altogether.
+ */
+ }
+ else
+ {
+ /* wait for regular transaction to end */
+ LockBuffer(*buffer, BUFFER_LOCK_UNLOCK);
+ XactLockTableWait(xwait);
+ LockBuffer(*buffer, BUFFER_LOCK_EXCLUSIVE);
+
+ /*
+ * xwait is done, but if xwait had just locked the tuple then
+ * some other xact could update this tuple before we get to
+ * this point. Check for xmax change, and start over if so.
+ */
+ if ((tuple->t_data->t_infomask & HEAP_XMAX_IS_MULTI) ||
+ !TransactionIdEquals(HeapTupleHeaderGetXmax(tuple->t_data),
+ xwait))
+ goto l3;
+
+ /* Otherwise we can mark it committed or aborted */
+ if (!(tuple->t_data->t_infomask & (HEAP_XMAX_COMMITTED |
+ HEAP_XMAX_INVALID)))
+ {
+ if (TransactionIdDidCommit(xwait))
+ tuple->t_data->t_infomask |= HEAP_XMAX_COMMITTED;
+ else
+ tuple->t_data->t_infomask |= HEAP_XMAX_INVALID;
+ SetBufferCommitInfoNeedsSave(*buffer);
+ }
+ }
+
+ /*
+ * We may lock if previous xmax aborted, or if it committed
+ * but only locked the tuple without updating it.
+ */
+ if (tuple->t_data->t_infomask & (HEAP_XMAX_INVALID |
+ HEAP_IS_LOCKED))
+ result = HeapTupleMayBeUpdated;
+ else
+ result = HeapTupleUpdated;
}
- /* if tuple was marked for update but not updated... */
- if (tuple->t_data->t_infomask & HEAP_MARKED_FOR_UPDATE)
- result = HeapTupleMayBeUpdated;
- else
- result = HeapTupleUpdated;
}
+
if (result != HeapTupleMayBeUpdated)
{
Assert(result == HeapTupleSelfUpdated || result == HeapTupleUpdated);
@@ -1808,21 +1971,173 @@ l3:
}
/*
- * XLOG stuff: no logging is required as long as we have no
- * savepoints. For savepoints private log could be used...
+ * Compute the new xmax and infomask to store into the tuple. Note we
+ * do not modify the tuple just yet, because that would leave it in the
+ * wrong state if multixact.c elogs.
*/
- PageSetTLI(BufferGetPage(*buffer), ThisTimeLineID);
+ xid = GetCurrentTransactionId();
+
+ new_infomask = tuple->t_data->t_infomask;
+
+ new_infomask &= ~(HEAP_XMAX_COMMITTED |
+ HEAP_XMAX_INVALID |
+ HEAP_XMAX_IS_MULTI |
+ HEAP_IS_LOCKED |
+ HEAP_MOVED);
+
+ if (mode == LockTupleShared)
+ {
+ TransactionId xmax = HeapTupleHeaderGetXmax(tuple->t_data);
+ uint16 old_infomask = tuple->t_data->t_infomask;
+
+ /*
+ * If this is the first acquisition of a shared lock in the current
+ * transaction, set my per-backend OldestMemberMXactId setting.
+ * We can be certain that the transaction will never become a
+ * member of any older MultiXactIds than that. (We have to do this
+ * even if we end up just using our own TransactionId below, since
+ * some other backend could incorporate our XID into a MultiXact
+ * immediately afterwards.)
+ */
+ MultiXactIdSetOldestMember();
+
+ new_infomask |= HEAP_XMAX_SHARED_LOCK;
+
+ /*
+ * Check to see if we need a MultiXactId because there are multiple
+ * lockers.
+ *
+ * HeapTupleSatisfiesUpdate will have set the HEAP_XMAX_INVALID
+ * bit if the xmax was a MultiXactId but it was not running anymore.
+ * There is a race condition, which is that the MultiXactId may have
+ * finished since then, but that uncommon case is handled within
+ * MultiXactIdExpand.
+ *
+ * There is a similar race condition possible when the old xmax was
+ * a regular TransactionId. We test TransactionIdIsInProgress again
+ * just to narrow the window, but it's still possible to end up
+ * creating an unnecessary MultiXactId. Fortunately this is harmless.
+ */
+ if (!(old_infomask & (HEAP_XMAX_INVALID | HEAP_XMAX_COMMITTED)))
+ {
+ if (old_infomask & HEAP_XMAX_IS_MULTI)
+ {
+ /*
+ * If the XMAX is already a MultiXactId, then we need to
+ * expand it to include our own TransactionId.
+ */
+ xid = MultiXactIdExpand(xmax, true, xid);
+ new_infomask |= HEAP_XMAX_IS_MULTI;
+ }
+ else if (TransactionIdIsInProgress(xmax))
+ {
+ if (TransactionIdEquals(xmax, xid))
+ {
+ /*
+ * If the old locker is ourselves, we'll just mark the
+ * tuple again with our own TransactionId. However we
+ * have to consider the possibility that we had
+ * exclusive rather than shared lock before --- if so,
+ * be careful to preserve the exclusivity of the lock.
+ */
+ if (!(old_infomask & HEAP_XMAX_SHARED_LOCK))
+ {
+ new_infomask &= ~HEAP_XMAX_SHARED_LOCK;
+ new_infomask |= HEAP_XMAX_EXCL_LOCK;
+ mode = LockTupleExclusive;
+ }
+ }
+ else
+ {
+ /*
+ * If the Xmax is a valid TransactionId, then we need to
+ * create a new MultiXactId that includes both the old
+ * locker and our own TransactionId.
+ */
+ xid = MultiXactIdExpand(xmax, false, xid);
+ new_infomask |= HEAP_XMAX_IS_MULTI;
+ }
+ }
+ else
+ {
+ /*
+ * Can get here iff HeapTupleSatisfiesUpdate saw the old
+ * xmax as running, but it finished before
+ * TransactionIdIsInProgress() got to run. Treat it like
+ * there's no locker in the tuple.
+ */
+ }
+ }
+ else
+ {
+ /*
+ * There was no previous locker, so just insert our own
+ * TransactionId.
+ */
+ }
+ }
+ else
+ {
+ /* We want an exclusive lock on the tuple */
+ new_infomask |= HEAP_XMAX_EXCL_LOCK;
+ }
+
+ START_CRIT_SECTION();
- /* store transaction information of xact marking the tuple */
- tuple->t_data->t_infomask &= ~(HEAP_XMAX_COMMITTED |
- HEAP_XMAX_INVALID |
- HEAP_MOVED);
- tuple->t_data->t_infomask |= HEAP_MARKED_FOR_UPDATE;
+ /*
+ * Store transaction information of xact locking the tuple.
+ *
+ * Note: our CID is meaningless if storing a MultiXactId, but no harm
+ * in storing it anyway.
+ */
+ tuple->t_data->t_infomask = new_infomask;
HeapTupleHeaderSetXmax(tuple->t_data, xid);
HeapTupleHeaderSetCmax(tuple->t_data, cid);
/* Make sure there is no forward chain link in t_ctid */
tuple->t_data->t_ctid = *tid;
+ /*
+ * XLOG stuff. You might think that we don't need an XLOG record because
+ * there is no state change worth restoring after a crash. You would be
+ * wrong however: we have just written either a TransactionId or a
+ * MultiXactId that may never have been seen on disk before, and we need
+ * to make sure that there are XLOG entries covering those ID numbers.
+ * Else the same IDs might be re-used after a crash, which would be
+ * disastrous if this page made it to disk before the crash. Essentially
+ * we have to enforce the WAL log-before-data rule even in this case.
+ */
+ if (!relation->rd_istemp)
+ {
+ xl_heap_lock xlrec;
+ XLogRecPtr recptr;
+ XLogRecData rdata[2];
+
+ xlrec.target.node = relation->rd_node;
+ xlrec.target.tid = tuple->t_self;
+ xlrec.shared_lock = (mode == LockTupleShared);
+ rdata[0].buffer = InvalidBuffer;
+ rdata[0].data = (char *) &xlrec;
+ rdata[0].len = SizeOfHeapLock;
+ rdata[0].next = &(rdata[1]);
+
+ rdata[1].buffer = *buffer;
+ rdata[1].data = NULL;
+ rdata[1].len = 0;
+ rdata[1].next = NULL;
+
+ recptr = XLogInsert(RM_HEAP_ID, XLOG_HEAP_LOCK, rdata);
+
+ PageSetLSN(dp, recptr);
+ PageSetTLI(dp, ThisTimeLineID);
+ }
+ else
+ {
+ /* No XLOG record, but still need to flag that XID exists on disk */
+ MyXactMadeTempRelUpdate = true;
+ }
+
+ END_CRIT_SECTION();
+
LockBuffer(*buffer, BUFFER_LOCK_UNLOCK);
WriteNoReleaseBuffer(*buffer);
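
The shared-lock case above reduces to a five-way decision on the old xmax. A condensed sketch, using the PostgreSQL symbols as they appear in this patch (the lock-mode upgrade and the hint-bit races discussed in the comments are glossed over):

    /* Condensed view of the LockTupleShared xmax computation. */
    if (old_infomask & (HEAP_XMAX_INVALID | HEAP_XMAX_COMMITTED))
        new_xmax = my_xid;              /* no live locker: store own XID */
    else if (old_infomask & HEAP_XMAX_IS_MULTI)
        new_xmax = MultiXactIdExpand(xmax, true, my_xid);  /* join group */
    else if (!TransactionIdIsInProgress(xmax))
        new_xmax = my_xid;              /* locker finished in the window */
    else if (TransactionIdEquals(xmax, my_xid))
        new_xmax = my_xid;              /* re-locking our own lock */
    else
        new_xmax = MultiXactIdExpand(xmax, false, my_xid); /* two lockers:
                                         * a fresh MultiXactId is needed */

Whichever branch is taken, MultiXactIdSetOldestMember() has already run, since even the own-XID outcomes can be folded into a MultiXactId by another backend immediately afterwards.
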
@@ -1832,17 +2147,6 @@ l3:
/* ----------------
* heap_markpos - mark scan position
- *
- * Note:
- * Should only one mark be maintained per scan at one time.
- * Check if this can be done generally--say calls to get the
- * next/previous tuple and NEVER pass struct scandesc to the
- * user AM's. Now, the mark is sent to the executor for safekeeping.
- * Probably can store this info into a GENERAL scan structure.
- *
- * May be best to change this call to store the marked position
- * (up to 2?) in the scan structure itself.
- * Fix to use the proper caching structure.
* ----------------
*/
void
@@ -1858,19 +2162,6 @@ heap_markpos(HeapScanDesc scan)
/* ----------------
* heap_restrpos - restore position to marked location
- *
- * Note: there are bad side effects here. If we were past the end
- * of a relation when heapmarkpos is called, then if the relation is
- * extended via insert, then the next call to heaprestrpos will set
- * cause the added tuples to be visible when the scan continues.
- * Problems also arise if the TID's are rearranged!!!
- *
- * XXX might be better to do direct access instead of
- * using the generality of heapgettup().
- *
- * XXX It is very possible that when a scan is restored, that a tuple
- * XXX which previously qualified may fail for time range purposes, unless
- * XXX some form of locking exists (ie., portals currently can act funny.
* ----------------
*/
void
@@ -1996,8 +2287,7 @@ log_heap_update(Relation reln, Buffer oldbuf, ItemPointerData from,
{
TransactionId xid[2]; /* xmax, xmin */
- if (newtup->t_data->t_infomask & (HEAP_XMAX_INVALID |
- HEAP_MARKED_FOR_UPDATE))
+ if (newtup->t_data->t_infomask & (HEAP_XMAX_INVALID | HEAP_IS_LOCKED))
xid[0] = InvalidTransactionId;
else
xid[0] = HeapTupleHeaderGetXmax(newtup->t_data);
@@ -2185,7 +2475,8 @@ heap_xlog_delete(bool redo, XLogRecPtr lsn, XLogRecord *record)
{
htup->t_infomask &= ~(HEAP_XMAX_COMMITTED |
HEAP_XMAX_INVALID |
- HEAP_MARKED_FOR_UPDATE |
+ HEAP_XMAX_IS_MULTI |
+ HEAP_IS_LOCKED |
HEAP_MOVED);
HeapTupleHeaderSetXmax(htup, record->xl_xid);
HeapTupleHeaderSetCmax(htup, FirstCommandId);
@@ -2365,7 +2656,8 @@ heap_xlog_update(bool redo, XLogRecPtr lsn, XLogRecord *record, bool move)
{
htup->t_infomask &= ~(HEAP_XMAX_COMMITTED |
HEAP_XMAX_INVALID |
- HEAP_MARKED_FOR_UPDATE |
+ HEAP_XMAX_IS_MULTI |
+ HEAP_IS_LOCKED |
HEAP_MOVED);
HeapTupleHeaderSetXmax(htup, record->xl_xid);
HeapTupleHeaderSetCmax(htup, FirstCommandId);
@@ -2487,6 +2779,82 @@ newsame:;
}
+static void
+heap_xlog_lock(bool redo, XLogRecPtr lsn, XLogRecord *record)
+{
+ xl_heap_lock *xlrec = (xl_heap_lock *) XLogRecGetData(record);
+ Relation reln;
+ Buffer buffer;
+ Page page;
+ OffsetNumber offnum;
+ ItemId lp = NULL;
+ HeapTupleHeader htup;
+
+ if (redo && (record->xl_info & XLR_BKP_BLOCK_1))
+ return;
+
+ reln = XLogOpenRelation(redo, RM_HEAP_ID, xlrec->target.node);
+
+ if (!RelationIsValid(reln))
+ return;
+
+ buffer = XLogReadBuffer(false, reln,
+ ItemPointerGetBlockNumber(&(xlrec->target.tid)));
+ if (!BufferIsValid(buffer))
+ elog(PANIC, "heap_lock_%sdo: no block", (redo) ? "re" : "un");
+
+ page = (Page) BufferGetPage(buffer);
+ if (PageIsNew((PageHeader) page))
+ elog(PANIC, "heap_lock_%sdo: uninitialized page", (redo) ? "re" : "un");
+
+ if (redo)
+ {
+ if (XLByteLE(lsn, PageGetLSN(page))) /* changes are applied */
+ {
+ LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
+ ReleaseBuffer(buffer);
+ return;
+ }
+ }
+ else if (XLByteLT(PageGetLSN(page), lsn)) /* changes are not applied
+ * ?! */
+ elog(PANIC, "heap_lock_undo: bad page LSN");
+
+ offnum = ItemPointerGetOffsetNumber(&(xlrec->target.tid));
+ if (PageGetMaxOffsetNumber(page) >= offnum)
+ lp = PageGetItemId(page, offnum);
+
+ if (PageGetMaxOffsetNumber(page) < offnum || !ItemIdIsUsed(lp))
+ elog(PANIC, "heap_lock_%sdo: invalid lp", (redo) ? "re" : "un");
+
+ htup = (HeapTupleHeader) PageGetItem(page, lp);
+
+ if (redo)
+ {
+ /*
+ * Presently, we don't bother to restore the locked state, but
+ * just set the XMAX_INVALID bit.
+ */
+ htup->t_infomask &= ~(HEAP_XMAX_COMMITTED |
+ HEAP_XMAX_INVALID |
+ HEAP_XMAX_IS_MULTI |
+ HEAP_IS_LOCKED |
+ HEAP_MOVED);
+ htup->t_infomask |= HEAP_XMAX_INVALID;
+ HeapTupleHeaderSetXmax(htup, record->xl_xid);
+ HeapTupleHeaderSetCmax(htup, FirstCommandId);
+ /* Make sure there is no forward chain link in t_ctid */
+ htup->t_ctid = xlrec->target.tid;
+ PageSetLSN(page, lsn);
+ PageSetTLI(page, ThisTimeLineID);
+ LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
+ WriteBuffer(buffer);
+ return;
+ }
+
+ elog(PANIC, "heap_lock_undo: unimplemented");
+}
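
The WAL record produced in heap_lock_tuple and consumed here carries only the target TID and the lock strength. A sketch reconstructed from the fields used above (the authoritative declaration lives in the header changes accompanying this patch):

    #include <stddef.h>     /* offsetof */

    /* Reconstructed from usage; not the authoritative declaration. */
    typedef struct xl_heap_lock
    {
        xl_heaptid  target;         /* locked tuple id */
        bool        shared_lock;    /* shared rather than exclusive lock? */
    } xl_heap_lock;

    #define SizeOfHeapLock  (offsetof(xl_heap_lock, shared_lock) + sizeof(bool))
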
+
void
heap_redo(XLogRecPtr lsn, XLogRecord *record)
{
@@ -2505,6 +2873,8 @@ heap_redo(XLogRecPtr lsn, XLogRecord *record)
heap_xlog_clean(true, lsn, record);
else if (info == XLOG_HEAP_NEWPAGE)
heap_xlog_newpage(true, lsn, record);
+ else if (info == XLOG_HEAP_LOCK)
+ heap_xlog_lock(true, lsn, record);
else
elog(PANIC, "heap_redo: unknown op code %u", info);
}
@@ -2527,6 +2897,8 @@ heap_undo(XLogRecPtr lsn, XLogRecord *record)
heap_xlog_clean(false, lsn, record);
else if (info == XLOG_HEAP_NEWPAGE)
heap_xlog_newpage(false, lsn, record);
+ else if (info == XLOG_HEAP_LOCK)
+ heap_xlog_lock(false, lsn, record);
else
elog(PANIC, "heap_undo: unknown op code %u", info);
}
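
Both dispatchers route XLOG_HEAP_LOCK to heap_xlog_lock, and as its comment notes, redo deliberately discards the lock rather than restoring it. The net post-recovery tuple state, expressed as hypothetical assertions:

    /* Tuple state after heap_xlog_lock() redo -- sketch, not in the patch. */
    Assert(htup->t_infomask & HEAP_XMAX_INVALID);    /* xmax hinted dead */
    Assert(!(htup->t_infomask & HEAP_IS_LOCKED));    /* lock bits cleared */

    /*
     * Dropping the lock is safe: a row lock cannot outlive its holder,
     * and every transaction that held one is necessarily dead after a
     * crash.
     */
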
@@ -2589,6 +2961,16 @@ heap_desc(char *buf, uint8 xl_info, char *rec)
xlrec->node.spcNode, xlrec->node.dbNode,
xlrec->node.relNode, xlrec->blkno);
}
+ else if (info == XLOG_HEAP_LOCK)
+ {
+ xl_heap_lock *xlrec = (xl_heap_lock *) rec;
+
+ if (xlrec->shared_lock)
+ strcat(buf, "shared_lock: ");
+ else
+ strcat(buf, "exclusive_lock: ");
+ out_target(buf, &(xlrec->target));
+ }
else
strcat(buf, "UNKNOWN");
}