aboutsummaryrefslogtreecommitdiff
path: root/src/backend/access/heap/heapam.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/access/heap/heapam.c')
-rw-r--r--src/backend/access/heap/heapam.c485
1 files changed, 441 insertions, 44 deletions
diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index d5f36e4e4d9..4bb638d6c93 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -8,7 +8,7 @@
*
*
* IDENTIFICATION
- * $Header: /cvsroot/pgsql/src/backend/access/heap/heapam.c,v 1.84 2000/08/04 04:16:06 tgl Exp $
+ * $Header: /cvsroot/pgsql/src/backend/access/heap/heapam.c,v 1.85 2000/09/07 09:58:34 vadim Exp $
*
*
* INTERFACE ROUTINES
@@ -24,7 +24,7 @@
* heap_fetch - retrive tuple with tid
* heap_insert - insert tuple into a relation
* heap_delete - delete a tuple from a relation
- * heap_update - replace a tuple in a relation with another tuple
+ * heap_update - replace a tuple in a relation with another tuple
* heap_markpos - mark scan position
* heap_restrpos - restore position to marked location
*
@@ -86,6 +86,10 @@
#include "utils/inval.h"
#include "utils/relcache.h"
+#ifdef XLOG /* comments are in _heap_update */
+static ItemPointerData _locked_tuple;
+#endif
+
/* ----------------------------------------------------------------
* heap support routines
@@ -1367,7 +1371,7 @@ heap_insert(Relation relation, HeapTuple tup)
#endif
/* Find buffer for this tuple */
- buffer = RelationGetBufferForTuple(relation, tup->t_len, InvalidBuffer);
+ buffer = RelationGetBufferForTuple(relation, tup->t_len);
/* NO ELOG(ERROR) from here till changes are logged */
RelationPutHeapTuple(relation, buffer, tup);
@@ -1376,10 +1380,9 @@ heap_insert(Relation relation, HeapTuple tup)
/* XLOG stuff */
{
xl_heap_insert xlrec;
- xlrec.itid.dbId = relation->rd_lockInfo.lockRelId.dbId;
- xlrec.itid.relId = relation->rd_lockInfo.lockRelId.relId;
- xlrec.itid.cid = GetCurrentCommandId();
- xlrec.itid.tid = tup->t_self;
+ xlrec.target.node = relation->rd_node;
+ xlrec.target.cid = GetCurrentCommandId();
+ xlrec.target.tid = tup->t_self;
xlrec.t_natts = tup->t_data->t_natts;
xlrec.t_oid = tup->t_data->t_oid;
xlrec.t_hoff = tup->t_data->t_hoff;
@@ -1390,8 +1393,8 @@ heap_insert(Relation relation, HeapTuple tup)
(char*) tup->t_data + offsetof(HeapTupleHeaderData, t_bits),
tup->t_len - offsetof(HeapTupleHeaderData, t_bits));
- ((PageHeader) BufferGetPage(buffer))->pd_lsn = recptr;
- ((PageHeader) BufferGetPage(buffer))->pd_sui = ThisStartUpID;
+ PageSetLSN(BufferGetPage(buffer), recptr);
+ PageSetSUI(BufferGetPage(buffer), ThisStartUpID);
}
#endif
@@ -1490,15 +1493,14 @@ l1:
/* XLOG stuff */
{
xl_heap_delete xlrec;
- xlrec.dtid.dbId = relation->rd_lockInfo.lockRelId.dbId;
- xlrec.dtid.relId = relation->rd_lockInfo.lockRelId.relId;
- xlrec.dtid.cid = GetCurrentCommandId();
- xlrec.dtid.tid = tp.t_self;
+ xlrec.target.node = relation->rd_node;
+ xlrec.target.cid = GetCurrentCommandId();
+ xlrec.target.tid = tp.t_self;
XLogRecPtr recptr = XLogInsert(RM_HEAP_ID, XLOG_HEAP_DELETE,
(char*) xlrec, SizeOfHeapDelete, NULL, 0);
- dp->pd_lsn = recptr;
- dp->pd_sui = ThisStartUpID;
+ PageSetLSN(dp, recptr);
+ PageSetSUI(dp, ThisStartUpID);
}
#endif
@@ -1638,18 +1640,49 @@ l2:
if ((unsigned) MAXALIGN(newtup->t_len) <= PageGetFreeSpace((Page) dp))
newbuf = buffer;
else
- newbuf = RelationGetBufferForTuple(relation, newtup->t_len, buffer);
+ {
+#ifdef XLOG
+ /*
+ * We have to unlock old tuple buffer before extending table
+ * file but have to keep lock on the old tuple. To avoid second
+ * XLOG log record we use xact mngr hook to unlock old tuple
+ * without reading log if xact will abort before update is logged.
+ * In the event of crash prio logging, TQUAL routines will see
+ * HEAP_XMAX_UNLOGGED flag...
+ */
+ _locked_tuple = *otid;
+ XactPushRollback(_heap_unlock_tuple, (void*) &_locked_tuple);
+#endif
+ TransactionIdStore(GetCurrentTransactionId(), &(oldtup.t_data->t_xmax));
+ oldtup.t_data->t_cmax = GetCurrentCommandId();
+ oldtup.t_data->t_infomask &= ~(HEAP_XMAX_COMMITTED |
+ HEAP_XMAX_INVALID | HEAP_MARKED_FOR_UPDATE);
+ oldtup.t_data->t_infomask |= HEAP_XMAX_UNLOGGED;
+ LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
+ newbuf = RelationGetBufferForTuple(relation, newtup->t_len);
+ /* this seems to be deadlock free... */
+ LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
+ }
/* NO ELOG(ERROR) from here till changes are logged */
/* insert new tuple */
RelationPutHeapTuple(relation, newbuf, newtup);
- /* logically delete old tuple */
- TransactionIdStore(GetCurrentTransactionId(), &(oldtup.t_data->t_xmax));
- oldtup.t_data->t_cmax = GetCurrentCommandId();
- oldtup.t_data->t_infomask &= ~(HEAP_XMAX_COMMITTED |
- HEAP_XMAX_INVALID | HEAP_MARKED_FOR_UPDATE);
+ if (buffer == newbuf)
+ {
+ TransactionIdStore(GetCurrentTransactionId(), &(oldtup.t_data->t_xmax));
+ oldtup.t_data->t_cmax = GetCurrentCommandId();
+ oldtup.t_data->t_infomask &= ~(HEAP_XMAX_COMMITTED |
+ HEAP_XMAX_INVALID | HEAP_MARKED_FOR_UPDATE);
+ }
+ else
+ {
+ oldtup.t_data->t_infomask &= ~HEAP_XMAX_UNLOGGED;
+#ifdef XLOG
+ XactPopRollback();
+#endif
+ }
/* record address of new tuple in t_ctid of old one */
oldtup.t_data->t_ctid = newtup->t_self;
@@ -1658,10 +1691,10 @@ l2:
/* XLOG stuff */
{
xl_heap_update xlrec;
- xlrec.dtid.dbId = relation->rd_lockInfo.lockRelId.dbId;
- xlrec.dtid.relId = relation->rd_lockInfo.lockRelId.relId;
- xlrec.dtid.cid = GetCurrentCommandId();
- xlrec.itid.tid = newtup->t_self;
+ xlrec.target.node = relation->rd_node;
+ xlrec.target.cid = GetCurrentCommandId();
+ xlrec.target.tid = oldtup.t_self;
+ xlrec.newtid.tid = newtup->t_self;
xlrec.t_natts = newtup->t_data->t_natts;
xlrec.t_hoff = newtup->t_data->t_hoff;
xlrec.mask = newtup->t_data->t_infomask;
@@ -1673,11 +1706,11 @@ l2:
if (newbuf != buffer)
{
- ((PageHeader) BufferGetPage(newbuf))->pd_lsn = recptr;
- ((PageHeader) BufferGetPage(newbuf))->pd_sui = ThisStartUpID;
+ PageSetLSN(BufferGetPage(newbuf), recptr);
+ PageSetSUI(BufferGetPage(newbuf), ThisStartUpID);
}
- ((PageHeader) BufferGetPage(buffer))->pd_lsn = recptr;
- ((PageHeader) BufferGetPage(buffer))->pd_sui = ThisStartUpID;
+ PageSetLSN(BufferGetPage(buffer), recptr);
+ PageSetSUI(BufferGetPage(buffer), ThisStartUpID);
}
#endif
@@ -1969,7 +2002,7 @@ heap_restrpos(HeapScanDesc scan)
void heap_redo(XLogRecPtr lsn, XLogRecord *record)
{
uint8 info = record->xl_info & ~XLR_INFO_MASK;
-
+
if (info == XLOG_HEAP_INSERT)
heap_xlog_insert(true, lsn, record);
else if (info == XLOG_HEAP_DELETE)
@@ -1982,25 +2015,389 @@ void heap_redo(XLogRecPtr lsn, XLogRecord *record)
elog(STOP, "heap_redo: unknown op code %u", info);
}
-void heap_undo(XLogRecPtr lsn, XLogRecord *record)
+void heap_xlog_delete(bool redo, XLogRecPtr lsn, XLogRecord *record)
{
- uint8 info = record->xl_info & ~XLR_INFO_MASK;
-
- if (info == XLOG_HEAP_INSERT)
- heap_xlog_insert(false, lsn, record);
- else if (info == XLOG_HEAP_DELETE)
- heap_xlog_delete(false, lsn, record);
- else if (info == XLOG_HEAP_UPDATE)
- heap_xlog_update(false, lsn, record);
- else if (info == XLOG_HEAP_MOVE)
- heap_xlog_move(false, lsn, record);
- else
- elog(STOP, "heap_undo: unknown op code %u", info);
+ xl_heap_delete *xlrec = (xl_heap_delete*) XLogRecGetData(record);
+ Relation reln = XLogOpenRelation(redo, RM_HEAP_ID, xlrec->target.node);
+
+ if (!RelationIsValid(reln))
+ return;
+ Buffer buffer = XLogReadBuffer(false, reln,
+ ItemPointerGetBlockNumber(&(xlrec->target.tid)));
+ if (!BufferIsValid(buffer))
+ return;
+
+ Page page = (Page) BufferGetPage(buffer);
+ if (PageIsNew((PageHeader) page))
+ {
+ PageInit(page, BufferGetPageSize(buffer), 0);
+ PageSetLSN(page, lsn);
+ PageSetSUI(page, ThisStartUpID);
+ UnlockAndWriteBuffer(buffer);
+ return;
+ }
+
+ if (redo)
+ {
+ if (XLByteLE(lsn, PageGetLSN(page))) /* changes are applied */
+ {
+ UnlockAndReleaseBuffer(buffer);
+ return;
+ }
+ }
+ else if (XLByteLT(PageGetLSN(page), lsn)) /* changes are not applied ?! */
+ elog(STOP, "heap_delete_undo: bad page LSN");
+
+ OffsetNumber offnum = ItemPointerGetOffsetNumber(&(xlrec->target.tid));
+ ItemId lp = PageGetItemId(page, offnum);
+
+ if (!ItemIdIsUsed(lp) || ItemIdDeleted(lp))
+ {
+ if (redo)
+ elog(STOP, "heap_delete_redo: unused/deleted target tuple");
+ if (!InRecovery)
+ elog(STOP, "heap_delete_undo: unused/deleted target tuple in rollback");
+ if (ItemIdDeleted(lp))
+ {
+ lp->lp_flags &= ~LP_USED;
+ PageRepairFragmentation(page);
+ UnlockAndWriteBuffer(buffer);
+ }
+ else
+ UnlockAndReleaseBuffer(buffer);
+ return;
+ }
+ HeapTupleHeader htup = (HeapTupleHeader) PageGetItem(page, lp);
+
+ if (redo)
+ {
+ htup->t_xmax = record->xl_xid;
+ htup->t_cmax = xlrec->target.cid;
+ htup->t_infomask &= ~(HEAP_XMAX_INVALID | HEAP_MARKED_FOR_UPDATE);
+ htup->t_infomask |= HEAP_XMAX_COMMITTED;
+ PageSetLSN(page, lsn);
+ PageSetSUI(page, ThisStartUpID);
+ UnlockAndWriteBuffer(buffer);
+ return;
+ }
+
+ /* undo... is it our tuple ? */
+ if (htup->t_xmax != record->xl_xid || htup->t_cmax != xlrec->target.cid)
+ {
+ if (!InRecovery)
+ elog(STOP, "heap_delete_undo: invalid target tuple in rollback");
+ UnlockAndReleaseBuffer(buffer);
+ return;
+ }
+ else /* undo DELETE */
+ {
+ htup->t_infomask |= HEAP_XMAX_INVALID;
+ UnlockAndWriteBuffer(buffer);
+ return;
+ }
+
}
void heap_xlog_insert(bool redo, XLogRecPtr lsn, XLogRecord *record)
{
- xl_heap_insert xlrec = XLogRecGetData(record);
+ xl_heap_insert *xlrec = (xl_heap_insert*) XLogRecGetData(record);
+ Relation reln = XLogOpenRelation(redo, RM_HEAP_ID, xlrec->target.node);
+
+ if (!RelationIsValid(reln))
+ return;
+ Buffer buffer = XLogReadBuffer((redo) ? true : false, reln,
+ ItemPointerGetBlockNumber(&(xlrec->target.tid)));
+ if (!BufferIsValid(buffer))
+ return;
+
+ Page page = (Page) BufferGetPage(buffer);
+ if (PageIsNew((PageHeader) page))
+ {
+ PageInit(page, BufferGetPageSize(buffer), 0);
+ if (!redo)
+ {
+ PageSetLSN(page, lsn);
+ PageSetSUI(page, ThisStartUpID);
+ UnlockAndWriteBuffer(buffer);
+ return;
+ }
+ }
+
+ if (redo)
+ {
+ if (XLByteLE(lsn, PageGetLSN(page))) /* changes are applied */
+ {
+ UnlockAndReleaseBuffer(buffer);
+ return;
+ }
+
+ char tbuf[MaxTupleSize];
+ HeapTupleHeader htup = (HeapTupleHeader) tbuf;
+ uint32 newlen = record->xl_len - SizeOfHeapInsert;
+
+ memcpy(tbuf + offsetof(HeapTupleHeaderData, t_bits),
+ (char*)xlrec + SizeOfHeapInsert, newlen);
+ newlen += offsetof(HeapTupleHeaderData, t_bits);
+ htup->t_oid = xlrec->t_oid;
+ htup->t_natts = xlrec->t_natts;
+ htup->t_hoff = xlrec->t_hoff;
+ htup->t_xmin = record->xl_xid;
+ htup->t_cmin = xlrec->target.cid;
+ htup->t_infomask = HEAP_XMAX_INVALID | HEAP_XMIN_COMMITTED | xlrec->mask;
+
+ PageManagerModeSet(OverwritePageManagerMode);
+ OffsetNumber offnum = PageAddItem(page, htup, newlen,
+ ItemPointerGetOffsetNumber(&(xlrec->target.tid)), LP_USED);
+ PageManagerModeSet(ShufflePageManagerMode);
+ if (offnum == InvalidOffsetNumber)
+ elog(STOP, "heap_insert_redo: failed to add tuple");
+ PageSetLSN(page, lsn);
+ PageSetSUI(page, ThisStartUpID); /* prev sui */
+ UnlockAndWriteBuffer(buffer);
+ return;
+ }
+
+ /* undo insert */
+ if (XLByteLT(PageGetLSN(page), lsn)) /* changes are not applied ?! */
+ elog(STOP, "heap_insert_undo: bad page LSN");
+
+ OffsetNumber offnum = ItemPointerGetOffsetNumber(&(xlrec->target.tid));
+ ItemId lp = PageGetItemId(page, offnum);
+
+ if (!ItemIdIsUsed(lp) || ItemIdDeleted(lp))
+ {
+ if (!InRecovery)
+ elog(STOP, "heap_insert_undo: unused/deleted target tuple in rollback");
+ if (ItemIdDeleted(lp))
+ {
+ lp->lp_flags &= ~LP_USED;
+ PageRepairFragmentation(page);
+ UnlockAndWriteBuffer(buffer);
+ }
+ else
+ UnlockAndReleaseBuffer(buffer);
+ return;
+ }
+ HeapTupleHeader htup = (HeapTupleHeader) PageGetItem(page, lp);
+
+ /* is it our tuple ? */
+ if (htup->t_xmin != record->xl_xid || htup->t_cmin != xlrec->target.cid)
+ {
+ if (!InRecovery)
+ elog(STOP, "heap_insert_undo: invalid target tuple in rollback");
+ UnlockAndReleaseBuffer(buffer);
+ return;
+ }
+
+ if (InRecovery || BufferIsUpdatable(buffer))
+ {
+ lp->lp_flags &= ~LP_USED;
+ PageRepairFragmentation(page);
+ UnlockAndWriteBuffer(buffer);
+ }
+ else /* we can't delete tuple right now */
+ {
+ lp->lp_flags |= LP_DELETE; /* mark for deletion */
+ MarkBufferForCleanup(buffer, PageCleanup);
+ }
+
+}
+
+void heap_xlog_update(bool redo, XLogRecPtr lsn, XLogRecord *record)
+{
+ xl_heap_update *xlrec = (xl_heap_update*) XLogRecGetData(record);
+ Relation reln = XLogOpenRelation(redo, RM_HEAP_ID, xlrec->target.node);
+
+ if (!RelationIsValid(reln))
+ return;
+ Buffer buffer;
+ Page page;
+ OffsetNumber offnum;
+ ItemId lp;
+ HeapTupleHeader htup;
+
+ /*
+ * Currently UPDATE is DELETE + INSERT and so code below are near
+ * exact sum of code in heap_xlog_delete & heap_xlog_insert. We could
+ * re-structure code better, but keeping in mind upcoming overwriting
+ * smgr separate heap_xlog_update code seems to be Good Thing.
+ */
+
+ /* Deal with old tuple version */
+
+ buffer = XLogReadBuffer(false, reln,
+ ItemPointerGetBlockNumber(&(xlrec->target.tid)));
+ if (!BufferIsValid(buffer))
+ goto newt;
+
+ page = (Page) BufferGetPage(buffer);
+ if (PageIsNew((PageHeader) page))
+ {
+ PageInit(page, BufferGetPageSize(buffer), 0);
+ PageSetLSN(page, lsn);
+ PageSetSUI(page, ThisStartUpID);
+ UnlockAndWriteBuffer(buffer);
+ goto newt;
+ }
+
+ if (redo)
+ {
+ if (XLByteLE(lsn, PageGetLSN(page))) /* changes are applied */
+ {
+ UnlockAndReleaseBuffer(buffer);
+ goto newt;
+ }
+ }
+ else if (XLByteLT(PageGetLSN(page), lsn)) /* changes are not applied ?! */
+ elog(STOP, "heap_update_undo: bad old tuple page LSN");
+
+ offnum = ItemPointerGetOffsetNumber(&(xlrec->target.tid));
+ lp = PageGetItemId(page, offnum);
+
+ if (!ItemIdIsUsed(lp) || ItemIdDeleted(lp))
+ {
+ if (redo)
+ elog(STOP, "heap_update_redo: unused/deleted old tuple");
+ if (!InRecovery)
+ elog(STOP, "heap_update_undo: unused/deleted old tuple in rollback");
+ if (ItemIdDeleted(lp))
+ {
+ lp->lp_flags &= ~LP_USED;
+ PageRepairFragmentation(page);
+ UnlockAndWriteBuffer(buffer);
+ }
+ else
+ UnlockAndReleaseBuffer(buffer);
+ goto newt;
+ }
+ htup = (HeapTupleHeader) PageGetItem(page, lp);
+
+ if (redo)
+ {
+ htup->t_xmax = record->xl_xid;
+ htup->t_cmax = xlrec->target.cid;
+ htup->t_infomask &= ~(HEAP_XMAX_INVALID | HEAP_MARKED_FOR_UPDATE);
+ htup->t_infomask |= HEAP_XMAX_COMMITTED;
+ PageSetLSN(page, lsn);
+ PageSetSUI(page, ThisStartUpID);
+ UnlockAndWriteBuffer(buffer);
+ goto newt;
+ }
+
+ /* undo... is it our tuple ? */
+ if (htup->t_xmax != record->xl_xid || htup->t_cmax != xlrec->target.cid)
+ {
+ if (!InRecovery)
+ elog(STOP, "heap_update_undo: invalid old tuple in rollback");
+ UnlockAndReleaseBuffer(buffer);
+ }
+ else /* undo */
+ {
+ htup->t_infomask |= HEAP_XMAX_INVALID;
+ UnlockAndWriteBuffer(buffer);
+ }
+
+ /* Deal with new tuple */
+
+newt:;
+
+ buffer = XLogReadBuffer((redo) ? true : false, reln,
+ ItemPointerGetBlockNumber(&(xlrec->newtid)));
+ if (!BufferIsValid(buffer))
+ return;
+
+ page = (Page) BufferGetPage(buffer);
+ if (PageIsNew((PageHeader) page))
+ {
+ PageInit(page, BufferGetPageSize(buffer), 0);
+ if (!redo)
+ {
+ PageSetLSN(page, lsn);
+ PageSetSUI(page, ThisStartUpID);
+ UnlockAndWriteBuffer(buffer);
+ return;
+ }
+ }
+
+ if (redo)
+ {
+ if (XLByteLE(lsn, PageGetLSN(page))) /* changes are applied */
+ {
+ UnlockAndReleaseBuffer(buffer);
+ return;
+ }
+
+ char tbuf[MaxTupleSize];
+ uint32 newlen = record->xl_len - SizeOfHeapUpdate;
+
+ htup = (HeapTupleHeader) tbuf;
+ memcpy(tbuf + offsetof(HeapTupleHeaderData, t_bits),
+ (char*)xlrec + SizeOfHeapUpdate, newlen);
+ newlen += offsetof(HeapTupleHeaderData, t_bits);
+ htup->t_oid = xlrec->t_oid;
+ htup->t_natts = xlrec->t_natts;
+ htup->t_hoff = xlrec->t_hoff;
+ htup->t_xmin = record->xl_xid;
+ htup->t_cmin = xlrec->target.cid;
+ htup->t_infomask = HEAP_XMAX_INVALID | HEAP_XMIN_COMMITTED | xlrec->mask;
+
+ PageManagerModeSet(OverwritePageManagerMode);
+ OffsetNumber offnum = PageAddItem(page, htup, newlen,
+ ItemPointerGetOffsetNumber(&(xlrec->newtid)), LP_USED);
+ PageManagerModeSet(ShufflePageManagerMode);
+ if (offnum == InvalidOffsetNumber)
+ elog(STOP, "heap_update_redo: failed to add tuple");
+ PageSetLSN(page, lsn);
+ PageSetSUI(page, ThisStartUpID); /* prev sui */
+ UnlockAndWriteBuffer(buffer);
+ return;
+ }
+
+ /* undo */
+ if (XLByteLT(PageGetLSN(page), lsn)) /* changes are not applied ?! */
+ elog(STOP, "heap_update_undo: bad new tuple page LSN");
+
+ offnum = ItemPointerGetOffsetNumber(&(xlrec->newtid));
+ lp = PageGetItemId(page, offnum);
+
+ if (!ItemIdIsUsed(lp) || ItemIdDeleted(lp))
+ {
+ if (!InRecovery)
+ elog(STOP, "heap_update_undo: unused/deleted new tuple in rollback");
+ if (ItemIdDeleted(lp))
+ {
+ lp->lp_flags &= ~LP_USED;
+ PageRepairFragmentation(page);
+ UnlockAndWriteBuffer(buffer);
+ }
+ else
+ UnlockAndReleaseBuffer(buffer);
+ return;
+ }
+ htup = (HeapTupleHeader) PageGetItem(page, lp);
+
+ /* is it our tuple ? */
+ if (htup->t_xmin != record->xl_xid || htup->t_cmin != xlrec->target.cid)
+ {
+ if (!InRecovery)
+ elog(STOP, "heap_update_undo: invalid new tuple in rollback");
+ UnlockAndReleaseBuffer(buffer);
+ return;
+ }
+
+ if (InRecovery || BufferIsUpdatable(buffer))
+ {
+ lp->lp_flags &= ~LP_USED;
+ PageRepairFragmentation(page);
+ UnlockAndWriteBuffer(buffer);
+ }
+ else /* we can't delete tuple right now */
+ {
+ lp->lp_flags |= LP_DELETE; /* mark for deletion */
+ MarkBufferForCleanup(buffer, PageCleanup);
+ }
+
}
+
#endif /* XLOG */