aboutsummaryrefslogtreecommitdiff
path: root/src/backend/access/heap/heapam.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/access/heap/heapam.c')
-rw-r--r--src/backend/access/heap/heapam.c282
1 files changed, 189 insertions, 93 deletions
diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index 6e70ab25c10..46c7c4da73f 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -8,7 +8,7 @@
*
*
* IDENTIFICATION
- * $PostgreSQL: pgsql/src/backend/access/heap/heapam.c,v 1.197 2005/08/12 01:35:54 tgl Exp $
+ * $PostgreSQL: pgsql/src/backend/access/heap/heapam.c,v 1.198 2005/08/20 00:39:51 tgl Exp $
*
*
* INTERFACE ROUTINES
@@ -22,7 +22,7 @@
* heap_rescan - restart a relation scan
* heap_endscan - end relation scan
* heap_getnext - retrieve next tuple in scan
- * heap_fetch - retrieve tuple with tid
+ * heap_fetch - retrieve tuple with given tid
* heap_insert - insert tuple into a relation
* heap_delete - delete a tuple from a relation
* heap_update - replace a tuple in a relation with another tuple
@@ -152,7 +152,7 @@ heapgettup(Relation relation,
tid = NULL;
}
- tuple->t_tableOid = relation->rd_id;
+ tuple->t_tableOid = RelationGetRelid(relation);
/*
* return null immediately if relation is empty
@@ -800,10 +800,13 @@ heap_getnext(HeapScanDesc scan, ScanDirection direction)
* keep_buf = false, the pin is released and *userbuf is set to InvalidBuffer.
*
* It is somewhat inconsistent that we ereport() on invalid block number but
- * return false on invalid item number. This is historical. The only
- * justification I can see is that the caller can relatively easily check the
- * block number for validity, but cannot check the item number without reading
- * the page himself.
+ * return false on invalid item number. There are a couple of reasons though.
+ * One is that the caller can relatively easily check the block number for
+ * validity, but cannot check the item number without reading the page
+ * himself. Another is that when we are following a t_ctid link, we can be
+ * reasonably confident that the page number is valid (since VACUUM shouldn't
+ * truncate off the destination page without having killed the referencing
+ * tuple first), but the item number might well not be good.
*/
bool
heap_fetch(Relation relation,
@@ -906,7 +909,7 @@ heap_release_fetch(Relation relation,
tuple->t_datamcxt = NULL;
tuple->t_data = (HeapTupleHeader) PageGetItem((Page) dp, lp);
tuple->t_len = ItemIdGetLength(lp);
- tuple->t_tableOid = relation->rd_id;
+ tuple->t_tableOid = RelationGetRelid(relation);
/*
* check time qualification of tuple, then release lock
@@ -950,83 +953,129 @@ heap_release_fetch(Relation relation,
/*
* heap_get_latest_tid - get the latest tid of a specified tuple
+ *
+ * Actually, this gets the latest version that is visible according to
+ * the passed snapshot. You can pass SnapshotDirty to get the very latest,
+ * possibly uncommitted version.
+ *
+ * *tid is both an input and an output parameter: it is updated to
+ * show the latest version of the row. Note that it will not be changed
+ * if no version of the row passes the snapshot test.
*/
-ItemPointer
+void
heap_get_latest_tid(Relation relation,
Snapshot snapshot,
ItemPointer tid)
{
- ItemId lp = NULL;
- Buffer buffer;
- PageHeader dp;
- OffsetNumber offnum;
- HeapTupleData tp;
- HeapTupleHeader t_data;
+ BlockNumber blk;
ItemPointerData ctid;
- bool invalidBlock,
- linkend,
- valid;
+ TransactionId priorXmax;
- /*
- * get the buffer from the relation descriptor Note that this does a
- * buffer pin.
- */
- buffer = ReadBuffer(relation, ItemPointerGetBlockNumber(tid));
- LockBuffer(buffer, BUFFER_LOCK_SHARE);
+ /* this is to avoid Assert failures on bad input */
+ if (!ItemPointerIsValid(tid))
+ return;
/*
- * get the item line pointer corresponding to the requested tid
+ * Since this can be called with user-supplied TID, don't trust the
+ * input too much. (RelationGetNumberOfBlocks is an expensive check,
+ * so we don't check t_ctid links again this way. Note that it would
+ * not do to call it just once and save the result, either.)
*/
- dp = (PageHeader) BufferGetPage(buffer);
- offnum = ItemPointerGetOffsetNumber(tid);
- invalidBlock = true;
- if (!PageIsNew(dp))
- {
- lp = PageGetItemId(dp, offnum);
- if (ItemIdIsUsed(lp))
- invalidBlock = false;
- }
- if (invalidBlock)
- {
- LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
- ReleaseBuffer(buffer);
- return NULL;
- }
+ blk = ItemPointerGetBlockNumber(tid);
+ if (blk >= RelationGetNumberOfBlocks(relation))
+ elog(ERROR, "block number %u is out of range for relation \"%s\"",
+ blk, RelationGetRelationName(relation));
/*
- * more sanity checks
+ * Loop to chase down t_ctid links. At top of loop, ctid is the
+ * tuple we need to examine, and *tid is the TID we will return if
+ * ctid turns out to be bogus.
+ *
+ * Note that we will loop until we reach the end of the t_ctid chain.
+ * Depending on the snapshot passed, there might be at most one visible
+ * version of the row, but we don't try to optimize for that.
*/
+ ctid = *tid;
+ priorXmax = InvalidTransactionId; /* cannot check first XMIN */
+ for (;;)
+ {
+ Buffer buffer;
+ PageHeader dp;
+ OffsetNumber offnum;
+ ItemId lp;
+ HeapTupleData tp;
+ bool valid;
- tp.t_datamcxt = NULL;
- t_data = tp.t_data = (HeapTupleHeader) PageGetItem((Page) dp, lp);
- tp.t_len = ItemIdGetLength(lp);
- tp.t_self = *tid;
- ctid = tp.t_data->t_ctid;
+ /*
+ * Read, pin, and lock the page.
+ */
+ buffer = ReadBuffer(relation, ItemPointerGetBlockNumber(&ctid));
+ LockBuffer(buffer, BUFFER_LOCK_SHARE);
+ dp = (PageHeader) BufferGetPage(buffer);
- /*
- * check time qualification of tid
- */
+ /*
+ * Check for bogus item number. This is not treated as an error
+ * condition because it can happen while following a t_ctid link.
+ * We just assume that the prior tid is OK and return it unchanged.
+ */
+ offnum = ItemPointerGetOffsetNumber(&ctid);
+ if (offnum < FirstOffsetNumber || offnum > PageGetMaxOffsetNumber(dp))
+ {
+ LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
+ ReleaseBuffer(buffer);
+ break;
+ }
+ lp = PageGetItemId(dp, offnum);
+ if (!ItemIdIsUsed(lp))
+ {
+ LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
+ ReleaseBuffer(buffer);
+ break;
+ }
- HeapTupleSatisfies(&tp, relation, buffer, dp,
- snapshot, 0, NULL, valid);
+ /* OK to access the tuple */
+ tp.t_self = ctid;
+ tp.t_datamcxt = NULL;
+ tp.t_data = (HeapTupleHeader) PageGetItem(dp, lp);
+ tp.t_len = ItemIdGetLength(lp);
- linkend = true;
- if ((t_data->t_infomask & HEAP_XMIN_COMMITTED) != 0 &&
- !ItemPointerEquals(tid, &ctid))
- linkend = false;
+ /*
+ * After following a t_ctid link, we might arrive at an unrelated
+ * tuple. Check for XMIN match.
+ */
+ if (TransactionIdIsValid(priorXmax) &&
+ !TransactionIdEquals(priorXmax, HeapTupleHeaderGetXmin(tp.t_data)))
+ {
+ LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
+ ReleaseBuffer(buffer);
+ break;
+ }
- LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
- ReleaseBuffer(buffer);
+ /*
+ * Check time qualification of tuple; if visible, set it as the new
+ * result candidate.
+ */
+ HeapTupleSatisfies(&tp, relation, buffer, dp,
+ snapshot, 0, NULL, valid);
+ if (valid)
+ *tid = ctid;
- if (!valid)
- {
- if (linkend)
- return NULL;
- heap_get_latest_tid(relation, snapshot, &ctid);
- *tid = ctid;
- }
+ /*
+ * If there's a valid t_ctid link, follow it, else we're done.
+ */
+ if ((tp.t_data->t_infomask & (HEAP_XMAX_INVALID | HEAP_IS_LOCKED)) ||
+ ItemPointerEquals(&tp.t_self, &tp.t_data->t_ctid))
+ {
+ LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
+ ReleaseBuffer(buffer);
+ break;
+ }
- return tid;
+ ctid = tp.t_data->t_ctid;
+ priorXmax = HeapTupleHeaderGetXmax(tp.t_data);
+ LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
+ ReleaseBuffer(buffer);
+ } /* end of loop */
}
/*
@@ -1083,7 +1132,7 @@ heap_insert(Relation relation, HeapTuple tup, CommandId cid,
HeapTupleHeaderSetCmin(tup->t_data, cid);
HeapTupleHeaderSetXmax(tup->t_data, 0); /* zero out Datum fields */
HeapTupleHeaderSetCmax(tup->t_data, 0); /* for cleanliness */
- tup->t_tableOid = relation->rd_id;
+ tup->t_tableOid = RelationGetRelid(relation);
/*
* If the new tuple is too big for storage or contains already toasted
@@ -1197,29 +1246,34 @@ simple_heap_insert(Relation relation, HeapTuple tup)
}
/*
- * heap_delete - delete a tuple
+ * heap_delete - delete a tuple
*
* NB: do not call this directly unless you are prepared to deal with
* concurrent-update conditions. Use simple_heap_delete instead.
*
- * relation - table to be modified
+ * relation - table to be modified (caller must hold suitable lock)
* tid - TID of tuple to be deleted
* ctid - output parameter, used only for failure case (see below)
- * cid - delete command ID to use in verifying tuple visibility
+ * update_xmax - output parameter, used only for failure case (see below)
+ * cid - delete command ID (used for visibility test, and stored into
+ * cmax if successful)
* crosscheck - if not InvalidSnapshot, also check tuple against this
* wait - true if should wait for any conflicting update to commit/abort
*
* Normal, successful return value is HeapTupleMayBeUpdated, which
* actually means we did delete it. Failure return codes are
* HeapTupleSelfUpdated, HeapTupleUpdated, or HeapTupleBeingUpdated
- * (the last only possible if wait == false). On a failure return,
- * *ctid is set to the ctid link of the target tuple (possibly a later
- * version of the row).
+ * (the last only possible if wait == false).
+ *
+ * In the failure cases, the routine returns the tuple's t_ctid and t_xmax.
+ * If t_ctid is the same as tid, the tuple was deleted; if different, the
+ * tuple was updated, and t_ctid is the location of the replacement tuple.
+ * (t_xmax is needed to verify that the replacement tuple matches.)
*/
HTSU_Result
heap_delete(Relation relation, ItemPointer tid,
- ItemPointer ctid, CommandId cid,
- Snapshot crosscheck, bool wait)
+ ItemPointer ctid, TransactionId *update_xmax,
+ CommandId cid, Snapshot crosscheck, bool wait)
{
HTSU_Result result;
TransactionId xid = GetCurrentTransactionId();
@@ -1236,11 +1290,11 @@ heap_delete(Relation relation, ItemPointer tid,
dp = (PageHeader) BufferGetPage(buffer);
lp = PageGetItemId(dp, ItemPointerGetOffsetNumber(tid));
+
tp.t_datamcxt = NULL;
- tp.t_data = (HeapTupleHeader) PageGetItem((Page) dp, lp);
+ tp.t_data = (HeapTupleHeader) PageGetItem(dp, lp);
tp.t_len = ItemIdGetLength(lp);
tp.t_self = *tid;
- tp.t_tableOid = relation->rd_id;
l1:
result = HeapTupleSatisfiesUpdate(tp.t_data, cid, buffer);
@@ -1360,7 +1414,9 @@ l1:
Assert(result == HeapTupleSelfUpdated ||
result == HeapTupleUpdated ||
result == HeapTupleBeingUpdated);
+ Assert(!(tp.t_data->t_infomask & HEAP_XMAX_INVALID));
*ctid = tp.t_data->t_ctid;
+ *update_xmax = HeapTupleHeaderGetXmax(tp.t_data);
LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
ReleaseBuffer(buffer);
if (have_tuple_lock)
@@ -1457,11 +1513,12 @@ l1:
void
simple_heap_delete(Relation relation, ItemPointer tid)
{
- ItemPointerData ctid;
HTSU_Result result;
+ ItemPointerData update_ctid;
+ TransactionId update_xmax;
result = heap_delete(relation, tid,
- &ctid,
+ &update_ctid, &update_xmax,
GetCurrentCommandId(), InvalidSnapshot,
true /* wait for commit */ );
switch (result)
@@ -1491,27 +1548,33 @@ simple_heap_delete(Relation relation, ItemPointer tid)
* NB: do not call this directly unless you are prepared to deal with
* concurrent-update conditions. Use simple_heap_update instead.
*
- * relation - table to be modified
+ * relation - table to be modified (caller must hold suitable lock)
* otid - TID of old tuple to be replaced
* newtup - newly constructed tuple data to store
* ctid - output parameter, used only for failure case (see below)
- * cid - update command ID to use in verifying old tuple visibility
+ * update_xmax - output parameter, used only for failure case (see below)
+ * cid - update command ID (used for visibility test, and stored into
+ * cmax/cmin if successful)
* crosscheck - if not InvalidSnapshot, also check old tuple against this
* wait - true if should wait for any conflicting update to commit/abort
*
* Normal, successful return value is HeapTupleMayBeUpdated, which
* actually means we *did* update it. Failure return codes are
* HeapTupleSelfUpdated, HeapTupleUpdated, or HeapTupleBeingUpdated
- * (the last only possible if wait == false). On a failure return,
- * *ctid is set to the ctid link of the old tuple (possibly a later
- * version of the row).
+ * (the last only possible if wait == false).
+ *
* On success, newtup->t_self is set to the TID where the new tuple
* was inserted.
+ *
+ * In the failure cases, the routine returns the tuple's t_ctid and t_xmax.
+ * If t_ctid is the same as otid, the tuple was deleted; if different, the
+ * tuple was updated, and t_ctid is the location of the replacement tuple.
+ * (t_xmax is needed to verify that the replacement tuple matches.)
*/
HTSU_Result
heap_update(Relation relation, ItemPointer otid, HeapTuple newtup,
- ItemPointer ctid, CommandId cid,
- Snapshot crosscheck, bool wait)
+ ItemPointer ctid, TransactionId *update_xmax,
+ CommandId cid, Snapshot crosscheck, bool wait)
{
HTSU_Result result;
TransactionId xid = GetCurrentTransactionId();
@@ -1664,7 +1727,9 @@ l2:
Assert(result == HeapTupleSelfUpdated ||
result == HeapTupleUpdated ||
result == HeapTupleBeingUpdated);
+ Assert(!(oldtup.t_data->t_infomask & HEAP_XMAX_INVALID));
*ctid = oldtup.t_data->t_ctid;
+ *update_xmax = HeapTupleHeaderGetXmax(oldtup.t_data);
LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
ReleaseBuffer(buffer);
if (have_tuple_lock)
@@ -1878,11 +1943,12 @@ l2:
void
simple_heap_update(Relation relation, ItemPointer otid, HeapTuple tup)
{
- ItemPointerData ctid;
HTSU_Result result;
+ ItemPointerData update_ctid;
+ TransactionId update_xmax;
result = heap_update(relation, otid, tup,
- &ctid,
+ &update_ctid, &update_xmax,
GetCurrentCommandId(), InvalidSnapshot,
true /* wait for commit */ );
switch (result)
@@ -1907,7 +1973,34 @@ simple_heap_update(Relation relation, ItemPointer otid, HeapTuple tup)
}
/*
- * heap_lock_tuple - lock a tuple in shared or exclusive mode
+ * heap_lock_tuple - lock a tuple in shared or exclusive mode
+ *
+ * Note that this acquires a buffer pin, which the caller must release.
+ *
+ * Input parameters:
+ * relation: relation containing tuple (caller must hold suitable lock)
+ * tuple->t_self: TID of tuple to lock (rest of struct need not be valid)
+ * cid: current command ID (used for visibility test, and stored into
+ * tuple's cmax if lock is successful)
+ * mode: indicates if shared or exclusive tuple lock is desired
+ * nowait: if true, ereport rather than blocking if lock not available
+ *
+ * Output parameters:
+ * *tuple: all fields filled in
+ * *buffer: set to buffer holding tuple (pinned but not locked at exit)
+ * *ctid: set to tuple's t_ctid, but only in failure cases
+ * *update_xmax: set to tuple's xmax, but only in failure cases
+ *
+ * Function result may be:
+ * HeapTupleMayBeUpdated: lock was successfully acquired
+ * HeapTupleSelfUpdated: lock failed because tuple updated by self
+ * HeapTupleUpdated: lock failed because tuple updated by other xact
+ *
+ * In the failure cases, the routine returns the tuple's t_ctid and t_xmax.
+ * If t_ctid is the same as t_self, the tuple was deleted; if different, the
+ * tuple was updated, and t_ctid is the location of the replacement tuple.
+ * (t_xmax is needed to verify that the replacement tuple matches.)
+ *
*
* NOTES: because the shared-memory lock table is of finite size, but users
* could reasonably want to lock large numbers of tuples, we do not rely on
@@ -1943,7 +2036,8 @@ simple_heap_update(Relation relation, ItemPointer otid, HeapTuple tup)
*/
HTSU_Result
heap_lock_tuple(Relation relation, HeapTuple tuple, Buffer *buffer,
- CommandId cid, LockTupleMode mode, bool nowait)
+ ItemPointer ctid, TransactionId *update_xmax,
+ CommandId cid, LockTupleMode mode, bool nowait)
{
HTSU_Result result;
ItemPointer tid = &(tuple->t_self);
@@ -1961,9 +2055,12 @@ heap_lock_tuple(Relation relation, HeapTuple tuple, Buffer *buffer,
dp = (PageHeader) BufferGetPage(*buffer);
lp = PageGetItemId(dp, ItemPointerGetOffsetNumber(tid));
+ Assert(ItemIdIsUsed(lp));
+
tuple->t_datamcxt = NULL;
tuple->t_data = (HeapTupleHeader) PageGetItem((Page) dp, lp);
tuple->t_len = ItemIdGetLength(lp);
+ tuple->t_tableOid = RelationGetRelid(relation);
l3:
result = HeapTupleSatisfiesUpdate(tuple->t_data, cid, *buffer);
@@ -2112,14 +2209,13 @@ l3:
if (result != HeapTupleMayBeUpdated)
{
- ItemPointerData newctid = tuple->t_data->t_ctid;
-
Assert(result == HeapTupleSelfUpdated || result == HeapTupleUpdated);
+ Assert(!(tuple->t_data->t_infomask & HEAP_XMAX_INVALID));
+ *ctid = tuple->t_data->t_ctid;
+ *update_xmax = HeapTupleHeaderGetXmax(tuple->t_data);
LockBuffer(*buffer, BUFFER_LOCK_UNLOCK);
if (have_tuple_lock)
UnlockTuple(relation, tid, tuple_lock_type);
- /* can't overwrite t_self (== *tid) until after above Unlock */
- tuple->t_self = newctid;
return result;
}