diff options
Diffstat (limited to 'src/backend/access/heap/heapam.c')
-rw-r--r-- | src/backend/access/heap/heapam.c | 485 |
1 files changed, 441 insertions, 44 deletions
diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c index d5f36e4e4d9..4bb638d6c93 100644 --- a/src/backend/access/heap/heapam.c +++ b/src/backend/access/heap/heapam.c @@ -8,7 +8,7 @@ * * * IDENTIFICATION - * $Header: /cvsroot/pgsql/src/backend/access/heap/heapam.c,v 1.84 2000/08/04 04:16:06 tgl Exp $ + * $Header: /cvsroot/pgsql/src/backend/access/heap/heapam.c,v 1.85 2000/09/07 09:58:34 vadim Exp $ * * * INTERFACE ROUTINES @@ -24,7 +24,7 @@ * heap_fetch - retrive tuple with tid * heap_insert - insert tuple into a relation * heap_delete - delete a tuple from a relation - * heap_update - replace a tuple in a relation with another tuple + * heap_update - replace a tuple in a relation with another tuple * heap_markpos - mark scan position * heap_restrpos - restore position to marked location * @@ -86,6 +86,10 @@ #include "utils/inval.h" #include "utils/relcache.h" +#ifdef XLOG /* comments are in _heap_update */ +static ItemPointerData _locked_tuple; +#endif + /* ---------------------------------------------------------------- * heap support routines @@ -1367,7 +1371,7 @@ heap_insert(Relation relation, HeapTuple tup) #endif /* Find buffer for this tuple */ - buffer = RelationGetBufferForTuple(relation, tup->t_len, InvalidBuffer); + buffer = RelationGetBufferForTuple(relation, tup->t_len); /* NO ELOG(ERROR) from here till changes are logged */ RelationPutHeapTuple(relation, buffer, tup); @@ -1376,10 +1380,9 @@ heap_insert(Relation relation, HeapTuple tup) /* XLOG stuff */ { xl_heap_insert xlrec; - xlrec.itid.dbId = relation->rd_lockInfo.lockRelId.dbId; - xlrec.itid.relId = relation->rd_lockInfo.lockRelId.relId; - xlrec.itid.cid = GetCurrentCommandId(); - xlrec.itid.tid = tup->t_self; + xlrec.target.node = relation->rd_node; + xlrec.target.cid = GetCurrentCommandId(); + xlrec.target.tid = tup->t_self; xlrec.t_natts = tup->t_data->t_natts; xlrec.t_oid = tup->t_data->t_oid; xlrec.t_hoff = tup->t_data->t_hoff; @@ -1390,8 +1393,8 @@ heap_insert(Relation relation, HeapTuple tup) (char*) tup->t_data + offsetof(HeapTupleHeaderData, t_bits), tup->t_len - offsetof(HeapTupleHeaderData, t_bits)); - ((PageHeader) BufferGetPage(buffer))->pd_lsn = recptr; - ((PageHeader) BufferGetPage(buffer))->pd_sui = ThisStartUpID; + PageSetLSN(BufferGetPage(buffer), recptr); + PageSetSUI(BufferGetPage(buffer), ThisStartUpID); } #endif @@ -1490,15 +1493,14 @@ l1: /* XLOG stuff */ { xl_heap_delete xlrec; - xlrec.dtid.dbId = relation->rd_lockInfo.lockRelId.dbId; - xlrec.dtid.relId = relation->rd_lockInfo.lockRelId.relId; - xlrec.dtid.cid = GetCurrentCommandId(); - xlrec.dtid.tid = tp.t_self; + xlrec.target.node = relation->rd_node; + xlrec.target.cid = GetCurrentCommandId(); + xlrec.target.tid = tp.t_self; XLogRecPtr recptr = XLogInsert(RM_HEAP_ID, XLOG_HEAP_DELETE, (char*) xlrec, SizeOfHeapDelete, NULL, 0); - dp->pd_lsn = recptr; - dp->pd_sui = ThisStartUpID; + PageSetLSN(dp, recptr); + PageSetSUI(dp, ThisStartUpID); } #endif @@ -1638,18 +1640,49 @@ l2: if ((unsigned) MAXALIGN(newtup->t_len) <= PageGetFreeSpace((Page) dp)) newbuf = buffer; else - newbuf = RelationGetBufferForTuple(relation, newtup->t_len, buffer); + { +#ifdef XLOG + /* + * We have to unlock old tuple buffer before extending table + * file but have to keep lock on the old tuple. To avoid second + * XLOG log record we use xact mngr hook to unlock old tuple + * without reading log if xact will abort before update is logged. + * In the event of crash prio logging, TQUAL routines will see + * HEAP_XMAX_UNLOGGED flag... + */ + _locked_tuple = *otid; + XactPushRollback(_heap_unlock_tuple, (void*) &_locked_tuple); +#endif + TransactionIdStore(GetCurrentTransactionId(), &(oldtup.t_data->t_xmax)); + oldtup.t_data->t_cmax = GetCurrentCommandId(); + oldtup.t_data->t_infomask &= ~(HEAP_XMAX_COMMITTED | + HEAP_XMAX_INVALID | HEAP_MARKED_FOR_UPDATE); + oldtup.t_data->t_infomask |= HEAP_XMAX_UNLOGGED; + LockBuffer(buffer, BUFFER_LOCK_UNLOCK); + newbuf = RelationGetBufferForTuple(relation, newtup->t_len); + /* this seems to be deadlock free... */ + LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE); + } /* NO ELOG(ERROR) from here till changes are logged */ /* insert new tuple */ RelationPutHeapTuple(relation, newbuf, newtup); - /* logically delete old tuple */ - TransactionIdStore(GetCurrentTransactionId(), &(oldtup.t_data->t_xmax)); - oldtup.t_data->t_cmax = GetCurrentCommandId(); - oldtup.t_data->t_infomask &= ~(HEAP_XMAX_COMMITTED | - HEAP_XMAX_INVALID | HEAP_MARKED_FOR_UPDATE); + if (buffer == newbuf) + { + TransactionIdStore(GetCurrentTransactionId(), &(oldtup.t_data->t_xmax)); + oldtup.t_data->t_cmax = GetCurrentCommandId(); + oldtup.t_data->t_infomask &= ~(HEAP_XMAX_COMMITTED | + HEAP_XMAX_INVALID | HEAP_MARKED_FOR_UPDATE); + } + else + { + oldtup.t_data->t_infomask &= ~HEAP_XMAX_UNLOGGED; +#ifdef XLOG + XactPopRollback(); +#endif + } /* record address of new tuple in t_ctid of old one */ oldtup.t_data->t_ctid = newtup->t_self; @@ -1658,10 +1691,10 @@ l2: /* XLOG stuff */ { xl_heap_update xlrec; - xlrec.dtid.dbId = relation->rd_lockInfo.lockRelId.dbId; - xlrec.dtid.relId = relation->rd_lockInfo.lockRelId.relId; - xlrec.dtid.cid = GetCurrentCommandId(); - xlrec.itid.tid = newtup->t_self; + xlrec.target.node = relation->rd_node; + xlrec.target.cid = GetCurrentCommandId(); + xlrec.target.tid = oldtup.t_self; + xlrec.newtid.tid = newtup->t_self; xlrec.t_natts = newtup->t_data->t_natts; xlrec.t_hoff = newtup->t_data->t_hoff; xlrec.mask = newtup->t_data->t_infomask; @@ -1673,11 +1706,11 @@ l2: if (newbuf != buffer) { - ((PageHeader) BufferGetPage(newbuf))->pd_lsn = recptr; - ((PageHeader) BufferGetPage(newbuf))->pd_sui = ThisStartUpID; + PageSetLSN(BufferGetPage(newbuf), recptr); + PageSetSUI(BufferGetPage(newbuf), ThisStartUpID); } - ((PageHeader) BufferGetPage(buffer))->pd_lsn = recptr; - ((PageHeader) BufferGetPage(buffer))->pd_sui = ThisStartUpID; + PageSetLSN(BufferGetPage(buffer), recptr); + PageSetSUI(BufferGetPage(buffer), ThisStartUpID); } #endif @@ -1969,7 +2002,7 @@ heap_restrpos(HeapScanDesc scan) void heap_redo(XLogRecPtr lsn, XLogRecord *record) { uint8 info = record->xl_info & ~XLR_INFO_MASK; - + if (info == XLOG_HEAP_INSERT) heap_xlog_insert(true, lsn, record); else if (info == XLOG_HEAP_DELETE) @@ -1982,25 +2015,389 @@ void heap_redo(XLogRecPtr lsn, XLogRecord *record) elog(STOP, "heap_redo: unknown op code %u", info); } -void heap_undo(XLogRecPtr lsn, XLogRecord *record) +void heap_xlog_delete(bool redo, XLogRecPtr lsn, XLogRecord *record) { - uint8 info = record->xl_info & ~XLR_INFO_MASK; - - if (info == XLOG_HEAP_INSERT) - heap_xlog_insert(false, lsn, record); - else if (info == XLOG_HEAP_DELETE) - heap_xlog_delete(false, lsn, record); - else if (info == XLOG_HEAP_UPDATE) - heap_xlog_update(false, lsn, record); - else if (info == XLOG_HEAP_MOVE) - heap_xlog_move(false, lsn, record); - else - elog(STOP, "heap_undo: unknown op code %u", info); + xl_heap_delete *xlrec = (xl_heap_delete*) XLogRecGetData(record); + Relation reln = XLogOpenRelation(redo, RM_HEAP_ID, xlrec->target.node); + + if (!RelationIsValid(reln)) + return; + Buffer buffer = XLogReadBuffer(false, reln, + ItemPointerGetBlockNumber(&(xlrec->target.tid))); + if (!BufferIsValid(buffer)) + return; + + Page page = (Page) BufferGetPage(buffer); + if (PageIsNew((PageHeader) page)) + { + PageInit(page, BufferGetPageSize(buffer), 0); + PageSetLSN(page, lsn); + PageSetSUI(page, ThisStartUpID); + UnlockAndWriteBuffer(buffer); + return; + } + + if (redo) + { + if (XLByteLE(lsn, PageGetLSN(page))) /* changes are applied */ + { + UnlockAndReleaseBuffer(buffer); + return; + } + } + else if (XLByteLT(PageGetLSN(page), lsn)) /* changes are not applied ?! */ + elog(STOP, "heap_delete_undo: bad page LSN"); + + OffsetNumber offnum = ItemPointerGetOffsetNumber(&(xlrec->target.tid)); + ItemId lp = PageGetItemId(page, offnum); + + if (!ItemIdIsUsed(lp) || ItemIdDeleted(lp)) + { + if (redo) + elog(STOP, "heap_delete_redo: unused/deleted target tuple"); + if (!InRecovery) + elog(STOP, "heap_delete_undo: unused/deleted target tuple in rollback"); + if (ItemIdDeleted(lp)) + { + lp->lp_flags &= ~LP_USED; + PageRepairFragmentation(page); + UnlockAndWriteBuffer(buffer); + } + else + UnlockAndReleaseBuffer(buffer); + return; + } + HeapTupleHeader htup = (HeapTupleHeader) PageGetItem(page, lp); + + if (redo) + { + htup->t_xmax = record->xl_xid; + htup->t_cmax = xlrec->target.cid; + htup->t_infomask &= ~(HEAP_XMAX_INVALID | HEAP_MARKED_FOR_UPDATE); + htup->t_infomask |= HEAP_XMAX_COMMITTED; + PageSetLSN(page, lsn); + PageSetSUI(page, ThisStartUpID); + UnlockAndWriteBuffer(buffer); + return; + } + + /* undo... is it our tuple ? */ + if (htup->t_xmax != record->xl_xid || htup->t_cmax != xlrec->target.cid) + { + if (!InRecovery) + elog(STOP, "heap_delete_undo: invalid target tuple in rollback"); + UnlockAndReleaseBuffer(buffer); + return; + } + else /* undo DELETE */ + { + htup->t_infomask |= HEAP_XMAX_INVALID; + UnlockAndWriteBuffer(buffer); + return; + } + } void heap_xlog_insert(bool redo, XLogRecPtr lsn, XLogRecord *record) { - xl_heap_insert xlrec = XLogRecGetData(record); + xl_heap_insert *xlrec = (xl_heap_insert*) XLogRecGetData(record); + Relation reln = XLogOpenRelation(redo, RM_HEAP_ID, xlrec->target.node); + + if (!RelationIsValid(reln)) + return; + Buffer buffer = XLogReadBuffer((redo) ? true : false, reln, + ItemPointerGetBlockNumber(&(xlrec->target.tid))); + if (!BufferIsValid(buffer)) + return; + + Page page = (Page) BufferGetPage(buffer); + if (PageIsNew((PageHeader) page)) + { + PageInit(page, BufferGetPageSize(buffer), 0); + if (!redo) + { + PageSetLSN(page, lsn); + PageSetSUI(page, ThisStartUpID); + UnlockAndWriteBuffer(buffer); + return; + } + } + + if (redo) + { + if (XLByteLE(lsn, PageGetLSN(page))) /* changes are applied */ + { + UnlockAndReleaseBuffer(buffer); + return; + } + + char tbuf[MaxTupleSize]; + HeapTupleHeader htup = (HeapTupleHeader) tbuf; + uint32 newlen = record->xl_len - SizeOfHeapInsert; + + memcpy(tbuf + offsetof(HeapTupleHeaderData, t_bits), + (char*)xlrec + SizeOfHeapInsert, newlen); + newlen += offsetof(HeapTupleHeaderData, t_bits); + htup->t_oid = xlrec->t_oid; + htup->t_natts = xlrec->t_natts; + htup->t_hoff = xlrec->t_hoff; + htup->t_xmin = record->xl_xid; + htup->t_cmin = xlrec->target.cid; + htup->t_infomask = HEAP_XMAX_INVALID | HEAP_XMIN_COMMITTED | xlrec->mask; + + PageManagerModeSet(OverwritePageManagerMode); + OffsetNumber offnum = PageAddItem(page, htup, newlen, + ItemPointerGetOffsetNumber(&(xlrec->target.tid)), LP_USED); + PageManagerModeSet(ShufflePageManagerMode); + if (offnum == InvalidOffsetNumber) + elog(STOP, "heap_insert_redo: failed to add tuple"); + PageSetLSN(page, lsn); + PageSetSUI(page, ThisStartUpID); /* prev sui */ + UnlockAndWriteBuffer(buffer); + return; + } + + /* undo insert */ + if (XLByteLT(PageGetLSN(page), lsn)) /* changes are not applied ?! */ + elog(STOP, "heap_insert_undo: bad page LSN"); + + OffsetNumber offnum = ItemPointerGetOffsetNumber(&(xlrec->target.tid)); + ItemId lp = PageGetItemId(page, offnum); + + if (!ItemIdIsUsed(lp) || ItemIdDeleted(lp)) + { + if (!InRecovery) + elog(STOP, "heap_insert_undo: unused/deleted target tuple in rollback"); + if (ItemIdDeleted(lp)) + { + lp->lp_flags &= ~LP_USED; + PageRepairFragmentation(page); + UnlockAndWriteBuffer(buffer); + } + else + UnlockAndReleaseBuffer(buffer); + return; + } + HeapTupleHeader htup = (HeapTupleHeader) PageGetItem(page, lp); + + /* is it our tuple ? */ + if (htup->t_xmin != record->xl_xid || htup->t_cmin != xlrec->target.cid) + { + if (!InRecovery) + elog(STOP, "heap_insert_undo: invalid target tuple in rollback"); + UnlockAndReleaseBuffer(buffer); + return; + } + + if (InRecovery || BufferIsUpdatable(buffer)) + { + lp->lp_flags &= ~LP_USED; + PageRepairFragmentation(page); + UnlockAndWriteBuffer(buffer); + } + else /* we can't delete tuple right now */ + { + lp->lp_flags |= LP_DELETE; /* mark for deletion */ + MarkBufferForCleanup(buffer, PageCleanup); + } + +} + +void heap_xlog_update(bool redo, XLogRecPtr lsn, XLogRecord *record) +{ + xl_heap_update *xlrec = (xl_heap_update*) XLogRecGetData(record); + Relation reln = XLogOpenRelation(redo, RM_HEAP_ID, xlrec->target.node); + + if (!RelationIsValid(reln)) + return; + Buffer buffer; + Page page; + OffsetNumber offnum; + ItemId lp; + HeapTupleHeader htup; + + /* + * Currently UPDATE is DELETE + INSERT and so code below are near + * exact sum of code in heap_xlog_delete & heap_xlog_insert. We could + * re-structure code better, but keeping in mind upcoming overwriting + * smgr separate heap_xlog_update code seems to be Good Thing. + */ + + /* Deal with old tuple version */ + + buffer = XLogReadBuffer(false, reln, + ItemPointerGetBlockNumber(&(xlrec->target.tid))); + if (!BufferIsValid(buffer)) + goto newt; + + page = (Page) BufferGetPage(buffer); + if (PageIsNew((PageHeader) page)) + { + PageInit(page, BufferGetPageSize(buffer), 0); + PageSetLSN(page, lsn); + PageSetSUI(page, ThisStartUpID); + UnlockAndWriteBuffer(buffer); + goto newt; + } + + if (redo) + { + if (XLByteLE(lsn, PageGetLSN(page))) /* changes are applied */ + { + UnlockAndReleaseBuffer(buffer); + goto newt; + } + } + else if (XLByteLT(PageGetLSN(page), lsn)) /* changes are not applied ?! */ + elog(STOP, "heap_update_undo: bad old tuple page LSN"); + + offnum = ItemPointerGetOffsetNumber(&(xlrec->target.tid)); + lp = PageGetItemId(page, offnum); + + if (!ItemIdIsUsed(lp) || ItemIdDeleted(lp)) + { + if (redo) + elog(STOP, "heap_update_redo: unused/deleted old tuple"); + if (!InRecovery) + elog(STOP, "heap_update_undo: unused/deleted old tuple in rollback"); + if (ItemIdDeleted(lp)) + { + lp->lp_flags &= ~LP_USED; + PageRepairFragmentation(page); + UnlockAndWriteBuffer(buffer); + } + else + UnlockAndReleaseBuffer(buffer); + goto newt; + } + htup = (HeapTupleHeader) PageGetItem(page, lp); + + if (redo) + { + htup->t_xmax = record->xl_xid; + htup->t_cmax = xlrec->target.cid; + htup->t_infomask &= ~(HEAP_XMAX_INVALID | HEAP_MARKED_FOR_UPDATE); + htup->t_infomask |= HEAP_XMAX_COMMITTED; + PageSetLSN(page, lsn); + PageSetSUI(page, ThisStartUpID); + UnlockAndWriteBuffer(buffer); + goto newt; + } + + /* undo... is it our tuple ? */ + if (htup->t_xmax != record->xl_xid || htup->t_cmax != xlrec->target.cid) + { + if (!InRecovery) + elog(STOP, "heap_update_undo: invalid old tuple in rollback"); + UnlockAndReleaseBuffer(buffer); + } + else /* undo */ + { + htup->t_infomask |= HEAP_XMAX_INVALID; + UnlockAndWriteBuffer(buffer); + } + + /* Deal with new tuple */ + +newt:; + + buffer = XLogReadBuffer((redo) ? true : false, reln, + ItemPointerGetBlockNumber(&(xlrec->newtid))); + if (!BufferIsValid(buffer)) + return; + + page = (Page) BufferGetPage(buffer); + if (PageIsNew((PageHeader) page)) + { + PageInit(page, BufferGetPageSize(buffer), 0); + if (!redo) + { + PageSetLSN(page, lsn); + PageSetSUI(page, ThisStartUpID); + UnlockAndWriteBuffer(buffer); + return; + } + } + + if (redo) + { + if (XLByteLE(lsn, PageGetLSN(page))) /* changes are applied */ + { + UnlockAndReleaseBuffer(buffer); + return; + } + + char tbuf[MaxTupleSize]; + uint32 newlen = record->xl_len - SizeOfHeapUpdate; + + htup = (HeapTupleHeader) tbuf; + memcpy(tbuf + offsetof(HeapTupleHeaderData, t_bits), + (char*)xlrec + SizeOfHeapUpdate, newlen); + newlen += offsetof(HeapTupleHeaderData, t_bits); + htup->t_oid = xlrec->t_oid; + htup->t_natts = xlrec->t_natts; + htup->t_hoff = xlrec->t_hoff; + htup->t_xmin = record->xl_xid; + htup->t_cmin = xlrec->target.cid; + htup->t_infomask = HEAP_XMAX_INVALID | HEAP_XMIN_COMMITTED | xlrec->mask; + + PageManagerModeSet(OverwritePageManagerMode); + OffsetNumber offnum = PageAddItem(page, htup, newlen, + ItemPointerGetOffsetNumber(&(xlrec->newtid)), LP_USED); + PageManagerModeSet(ShufflePageManagerMode); + if (offnum == InvalidOffsetNumber) + elog(STOP, "heap_update_redo: failed to add tuple"); + PageSetLSN(page, lsn); + PageSetSUI(page, ThisStartUpID); /* prev sui */ + UnlockAndWriteBuffer(buffer); + return; + } + + /* undo */ + if (XLByteLT(PageGetLSN(page), lsn)) /* changes are not applied ?! */ + elog(STOP, "heap_update_undo: bad new tuple page LSN"); + + offnum = ItemPointerGetOffsetNumber(&(xlrec->newtid)); + lp = PageGetItemId(page, offnum); + + if (!ItemIdIsUsed(lp) || ItemIdDeleted(lp)) + { + if (!InRecovery) + elog(STOP, "heap_update_undo: unused/deleted new tuple in rollback"); + if (ItemIdDeleted(lp)) + { + lp->lp_flags &= ~LP_USED; + PageRepairFragmentation(page); + UnlockAndWriteBuffer(buffer); + } + else + UnlockAndReleaseBuffer(buffer); + return; + } + htup = (HeapTupleHeader) PageGetItem(page, lp); + + /* is it our tuple ? */ + if (htup->t_xmin != record->xl_xid || htup->t_cmin != xlrec->target.cid) + { + if (!InRecovery) + elog(STOP, "heap_update_undo: invalid new tuple in rollback"); + UnlockAndReleaseBuffer(buffer); + return; + } + + if (InRecovery || BufferIsUpdatable(buffer)) + { + lp->lp_flags &= ~LP_USED; + PageRepairFragmentation(page); + UnlockAndWriteBuffer(buffer); + } + else /* we can't delete tuple right now */ + { + lp->lp_flags |= LP_DELETE; /* mark for deletion */ + MarkBufferForCleanup(buffer, PageCleanup); + } + } + #endif /* XLOG */ |