aboutsummaryrefslogtreecommitdiff
path: root/src/backend/access/transam/xact.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/access/transam/xact.c')
-rw-r--r--src/backend/access/transam/xact.c640
1 files changed, 515 insertions, 125 deletions
diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index 040a4ab0b79..74163b7f576 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -10,7 +10,7 @@
*
*
* IDENTIFICATION
- * $PostgreSQL: pgsql/src/backend/access/transam/xact.c,v 1.204 2005/06/06 20:22:57 tgl Exp $
+ * $PostgreSQL: pgsql/src/backend/access/transam/xact.c,v 1.205 2005/06/17 22:32:42 tgl Exp $
*
*-------------------------------------------------------------------------
*/
@@ -22,6 +22,7 @@
#include "access/multixact.h"
#include "access/subtrans.h"
+#include "access/twophase.h"
#include "access/xact.h"
#include "catalog/heap.h"
#include "catalog/index.h"
@@ -68,7 +69,8 @@ typedef enum TransState
TRANS_START,
TRANS_INPROGRESS,
TRANS_COMMIT,
- TRANS_ABORT
+ TRANS_ABORT,
+ TRANS_PREPARE
} TransState;
/*
@@ -90,6 +92,7 @@ typedef enum TBlockState
TBLOCK_ABORT, /* failed xact, awaiting ROLLBACK */
TBLOCK_ABORT_END, /* failed xact, ROLLBACK received */
TBLOCK_ABORT_PENDING, /* live xact, ROLLBACK received */
+ TBLOCK_PREPARE, /* live xact, PREPARE received */
/* subtransaction states */
TBLOCK_SUBBEGIN, /* starting a subtransaction */
@@ -172,6 +175,12 @@ static CommandId currentCommandId;
static AbsoluteTime xactStartTime; /* integer part */
static int xactStartTimeUsec; /* microsecond part */
+/*
+ * GID to be used for preparing the current transaction. This is also
+ * global to a whole transaction, so we don't keep it in the state stack.
+ */
+static char *prepareGID;
+
/*
* List of add-on start- and end-of-xact callbacks
@@ -267,10 +276,12 @@ IsTransactionState(void)
return true;
case TRANS_ABORT:
return true;
+ case TRANS_PREPARE:
+ return true;
}
/*
- * Shouldn't get here, but lint is not happy with this...
+ * Shouldn't get here, but lint is not happy without this...
*/
return false;
}
@@ -660,12 +671,12 @@ void
RecordTransactionCommit(void)
{
int nrels;
- RelFileNode *rptr;
+ RelFileNode *rels;
int nchildren;
TransactionId *children;
/* Get data needed for commit record */
- nrels = smgrGetPendingDeletes(true, &rptr);
+ nrels = smgrGetPendingDeletes(true, &rels);
nchildren = xactGetCommittedChildren(&children);
/*
@@ -726,7 +737,7 @@ RecordTransactionCommit(void)
if (nrels > 0)
{
rdata[0].next = &(rdata[1]);
- rdata[1].data = (char *) rptr;
+ rdata[1].data = (char *) rels;
rdata[1].len = nrels * sizeof(RelFileNode);
rdata[1].buffer = InvalidBuffer;
lastrdata = 1;
@@ -809,12 +820,9 @@ RecordTransactionCommit(void)
MyXactMadeXLogEntry = false;
MyXactMadeTempRelUpdate = false;
- /* Show myself as out of the transaction in PGPROC array */
- MyProc->logRec.xrecoff = 0;
-
/* And clean up local data */
- if (rptr)
- pfree(rptr);
+ if (rels)
+ pfree(rels);
if (children)
pfree(children);
}
@@ -970,12 +978,12 @@ static void
RecordTransactionAbort(void)
{
int nrels;
- RelFileNode *rptr;
+ RelFileNode *rels;
int nchildren;
TransactionId *children;
/* Get data needed for abort record */
- nrels = smgrGetPendingDeletes(false, &rptr);
+ nrels = smgrGetPendingDeletes(false, &rels);
nchildren = xactGetCommittedChildren(&children);
/*
@@ -1026,7 +1034,7 @@ RecordTransactionAbort(void)
if (nrels > 0)
{
rdata[0].next = &(rdata[1]);
- rdata[1].data = (char *) rptr;
+ rdata[1].data = (char *) rels;
rdata[1].len = nrels * sizeof(RelFileNode);
rdata[1].buffer = InvalidBuffer;
lastrdata = 1;
@@ -1069,12 +1077,9 @@ RecordTransactionAbort(void)
MyXactMadeXLogEntry = false;
MyXactMadeTempRelUpdate = false;
- /* Show myself as out of the transaction in PGPROC array */
- MyProc->logRec.xrecoff = 0;
-
/* And clean up local data */
- if (rptr)
- pfree(rptr);
+ if (rels)
+ pfree(rels);
if (children)
pfree(children);
}
@@ -1166,13 +1171,13 @@ static void
RecordSubTransactionAbort(void)
{
int nrels;
- RelFileNode *rptr;
+ RelFileNode *rels;
TransactionId xid = GetCurrentTransactionId();
int nchildren;
TransactionId *children;
/* Get data needed for abort record */
- nrels = smgrGetPendingDeletes(false, &rptr);
+ nrels = smgrGetPendingDeletes(false, &rels);
nchildren = xactGetCommittedChildren(&children);
/*
@@ -1212,7 +1217,7 @@ RecordSubTransactionAbort(void)
if (nrels > 0)
{
rdata[0].next = &(rdata[1]);
- rdata[1].data = (char *) rptr;
+ rdata[1].data = (char *) rels;
rdata[1].len = nrels * sizeof(RelFileNode);
rdata[1].buffer = InvalidBuffer;
lastrdata = 1;
@@ -1256,8 +1261,8 @@ RecordSubTransactionAbort(void)
XidCacheRemoveRunningXids(xid, nchildren, children);
/* And clean up local data */
- if (rptr)
- pfree(rptr);
+ if (rels)
+ pfree(rels);
if (children)
pfree(children);
}
@@ -1419,8 +1424,11 @@ StartTransaction(void)
ShowTransactionState("StartTransaction");
}
+
/*
* CommitTransaction
+ *
+ * NB: if you change this routine, better look at PrepareTransaction too!
*/
static void
CommitTransaction(void)
@@ -1510,6 +1518,8 @@ CommitTransaction(void)
* xid 0 as running as well, or it will be able to see two tuple versions
* - one deleted by xid 1 and one inserted by xid 0. See notes in
* GetSnapshotData.
+ *
+ * Note: MyProc may be null during bootstrap.
*----------
*/
if (MyProc != NULL)
@@ -1608,6 +1618,225 @@ CommitTransaction(void)
RESUME_INTERRUPTS();
}
+
+/*
+ * PrepareTransaction
+ *
+ * NB: if you change this routine, better look at CommitTransaction too!
+ */
+static void
+PrepareTransaction(void)
+{
+ TransactionState s = CurrentTransactionState;
+ TransactionId xid = GetCurrentTransactionId();
+ GlobalTransaction gxact;
+
+ ShowTransactionState("PrepareTransaction");
+
+ /*
+ * check the current transaction state
+ */
+ if (s->state != TRANS_INPROGRESS)
+ elog(WARNING, "PrepareTransaction while in %s state",
+ TransStateAsString(s->state));
+ Assert(s->parent == NULL);
+
+ /*
+ * Do pre-commit processing (most of this stuff requires database
+ * access, and in fact could still cause an error...)
+ *
+ * It is possible for PrepareHoldablePortals to invoke functions that
+ * queue deferred triggers, and it's also possible that triggers create
+ * holdable cursors. So we have to loop until there's nothing left to
+ * do.
+ */
+ for (;;)
+ {
+ /*
+ * Fire all currently pending deferred triggers.
+ */
+ AfterTriggerFireDeferred();
+
+ /*
+ * Convert any open holdable cursors into static portals. If there
+ * weren't any, we are done ... otherwise loop back to check if they
+ * queued deferred triggers. Lather, rinse, repeat.
+ */
+ if (!PrepareHoldablePortals())
+ break;
+ }
+
+ /* Now we can shut down the deferred-trigger manager */
+ AfterTriggerEndXact(true);
+
+ /* Close any open regular cursors */
+ AtCommit_Portals();
+
+ /*
+ * Let ON COMMIT management do its thing (must happen after closing
+ * cursors, to avoid dangling-reference problems)
+ */
+ PreCommit_on_commit_actions();
+
+ /* close large objects before lower-level cleanup */
+ AtEOXact_LargeObject(true);
+
+ /* NOTIFY and flatfiles will be handled below */
+
+ /* Prevent cancel/die interrupt while cleaning up */
+ HOLD_INTERRUPTS();
+
+ /*
+ * set the current transaction state information appropriately during
+ * the processing
+ */
+ s->state = TRANS_PREPARE;
+
+ /* Tell bufmgr and smgr to prepare for commit */
+ BufmgrCommit();
+
+ /*
+ * Reserve the GID for this transaction. This could fail if the
+ * requested GID is invalid or already in use.
+ */
+ gxact = MarkAsPreparing(xid, MyDatabaseId, prepareGID, GetUserId());
+ prepareGID = NULL;
+
+ /*
+ * Collect data for the 2PC state file. Note that in general, no actual
+ * state change should happen in the called modules during this step,
+ * since it's still possible to fail before commit, and in that case we
+ * want transaction abort to be able to clean up. (In particular, the
+ * AtPrepare routines may error out if they find cases they cannot
+ * handle.) State cleanup should happen in the PostPrepare routines
+ * below. However, some modules can go ahead and clear state here
+ * because they wouldn't do anything with it during abort anyway.
+ *
+ * Note: because the 2PC state file records will be replayed in the same
+ * order they are made, the order of these calls has to match the order
+ * in which we want things to happen during COMMIT PREPARED or
+ * ROLLBACK PREPARED; in particular, pay attention to whether things
+ * should happen before or after releasing the transaction's locks.
+ */
+ StartPrepare(gxact);
+
+ AtPrepare_Notify();
+ AtPrepare_UpdateFlatFiles();
+ AtPrepare_Inval();
+ AtPrepare_Locks();
+
+ /*
+ * Here is where we really truly prepare.
+ *
+ * We have to record transaction prepares even if we didn't
+ * make any updates, because the transaction manager might
+ * get confused if we lose a global transaction.
+ */
+ EndPrepare(gxact);
+
+ /*
+ * Mark the prepared transaction as valid. As soon as we mark ourselves
+ * not running in MyProc below, others can commit/rollback the xact.
+ *
+ * NB: a side effect of this is to make a dummy ProcArray entry for the
+ * prepared XID. This must happen before we clear the XID from MyProc,
+ * else there is a window where the XID is not running according to
+ * TransactionIdInProgress, and onlookers would be entitled to assume
+ * the xact crashed. Instead we have a window where the same XID
+ * appears twice in ProcArray, which is OK.
+ */
+ MarkAsPrepared(gxact);
+
+ /*
+ * Now we clean up backend-internal state and release internal
+ * resources.
+ */
+
+ /* Break the chain of back-links in the XLOG records I output */
+ MyLastRecPtr.xrecoff = 0;
+ MyXactMadeXLogEntry = false;
+ MyXactMadeTempRelUpdate = false;
+
+ /*
+ * Let others know about no transaction in progress by me. This has
+ * to be done *after* the prepared transaction has been marked valid,
+ * else someone may think it is unlocked and recyclable.
+ */
+
+ /* Lock ProcArrayLock because that's what GetSnapshotData uses. */
+ LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
+ MyProc->xid = InvalidTransactionId;
+ MyProc->xmin = InvalidTransactionId;
+
+ /* Clear the subtransaction-XID cache too while holding the lock */
+ MyProc->subxids.nxids = 0;
+ MyProc->subxids.overflowed = false;
+
+ LWLockRelease(ProcArrayLock);
+
+ /*
+ * This is all post-transaction cleanup. Note that if an error is raised
+ * here, it's too late to abort the transaction. This should be just
+ * noncritical resource releasing. See notes in CommitTransaction.
+ */
+
+ CallXactCallbacks(XACT_EVENT_PREPARE);
+
+ ResourceOwnerRelease(TopTransactionResourceOwner,
+ RESOURCE_RELEASE_BEFORE_LOCKS,
+ true, true);
+
+ /* Check we've released all buffer pins */
+ AtEOXact_Buffers(true);
+
+ /* notify and flatfiles don't need a postprepare call */
+
+ PostPrepare_Inval();
+
+ PostPrepare_smgr();
+
+ AtEOXact_MultiXact();
+
+ PostPrepare_Locks(xid);
+
+ ResourceOwnerRelease(TopTransactionResourceOwner,
+ RESOURCE_RELEASE_LOCKS,
+ true, true);
+ ResourceOwnerRelease(TopTransactionResourceOwner,
+ RESOURCE_RELEASE_AFTER_LOCKS,
+ true, true);
+
+ /* PREPARE acts the same as COMMIT as far as GUC is concerned */
+ AtEOXact_GUC(true, false);
+ AtEOXact_SPI(true);
+ AtEOXact_on_commit_actions(true);
+ AtEOXact_Namespace(true);
+ /* smgrcommit already done */
+ AtEOXact_Files();
+
+ CurrentResourceOwner = NULL;
+ ResourceOwnerDelete(TopTransactionResourceOwner);
+ s->curTransactionOwner = NULL;
+ CurTransactionResourceOwner = NULL;
+ TopTransactionResourceOwner = NULL;
+
+ AtCommit_Memory();
+
+ s->transactionId = InvalidTransactionId;
+ s->subTransactionId = InvalidSubTransactionId;
+ s->nestingLevel = 0;
+ s->childXids = NIL;
+
+ /*
+ * done with 1st phase commit processing, set current transaction
+ * state back to default
+ */
+ s->state = TRANS_DEFAULT;
+
+ RESUME_INTERRUPTS();
+}
+
+
/*
* AbortTransaction
*/
@@ -1640,7 +1869,7 @@ AbortTransaction(void)
/*
* check the current transaction state
*/
- if (s->state != TRANS_INPROGRESS)
+ if (s->state != TRANS_INPROGRESS && s->state != TRANS_PREPARE)
elog(WARNING, "AbortTransaction while in %s state",
TransStateAsString(s->state));
Assert(s->parent == NULL);
@@ -1833,6 +2062,7 @@ StartTransactionCommand(void)
case TBLOCK_SUBABORT_PENDING:
case TBLOCK_SUBRESTART:
case TBLOCK_SUBABORT_RESTART:
+ case TBLOCK_PREPARE:
elog(ERROR, "StartTransactionCommand: unexpected state %s",
BlockStateAsString(s->blockState));
break;
@@ -1935,6 +2165,15 @@ CommitTransactionCommand(void)
break;
/*
+ * We are completing a "PREPARE TRANSACTION" command. Do it and
+ * return to the idle state.
+ */
+ case TBLOCK_PREPARE:
+ PrepareTransaction();
+ s->blockState = TBLOCK_DEFAULT;
+ break;
+
+ /*
* We were just issued a SAVEPOINT inside a transaction block.
* Start a subtransaction. (DefineSavepoint already did
* PushTransaction, so as to have someplace to put the
@@ -1964,6 +2203,12 @@ CommitTransactionCommand(void)
CommitTransaction();
s->blockState = TBLOCK_DEFAULT;
}
+ else if (s->blockState == TBLOCK_PREPARE)
+ {
+ Assert(s->parent == NULL);
+ PrepareTransaction();
+ s->blockState = TBLOCK_DEFAULT;
+ }
else
{
Assert(s->blockState == TBLOCK_INPROGRESS ||
@@ -2156,6 +2401,17 @@ AbortCurrentTransaction(void)
break;
/*
+ * Here, we failed while trying to PREPARE. Clean up the
+ * transaction and return to idle state (we do not want to
+ * stay in the transaction).
+ */
+ case TBLOCK_PREPARE:
+ AbortTransaction();
+ CleanupTransaction();
+ s->blockState = TBLOCK_DEFAULT;
+ break;
+
+ /*
* We got an error inside a subtransaction. Abort just the
* subtransaction, and go to the persistent SUBABORT state
* until we get ROLLBACK.
@@ -2487,6 +2743,7 @@ BeginTransactionBlock(void)
case TBLOCK_SUBABORT_PENDING:
case TBLOCK_SUBRESTART:
case TBLOCK_SUBABORT_RESTART:
+ case TBLOCK_PREPARE:
elog(FATAL, "BeginTransactionBlock: unexpected state %s",
BlockStateAsString(s->blockState));
break;
@@ -2494,6 +2751,57 @@ BeginTransactionBlock(void)
}
/*
+ * PrepareTransactionBlock
+ * This executes a PREPARE command.
+ *
+ * Since PREPARE may actually do a ROLLBACK, the result indicates what
+ * happened: TRUE for PREPARE, FALSE for ROLLBACK.
+ *
+ * Note that we don't actually do anything here except change blockState.
+ * The real work will be done in the upcoming PrepareTransaction().
+ * We do it this way because it's not convenient to change memory context,
+ * resource owner, etc while executing inside a Portal.
+ */
+bool
+PrepareTransactionBlock(char *gid)
+{
+ TransactionState s;
+ bool result;
+
+ /* Set up to commit the current transaction */
+ result = EndTransactionBlock();
+
+ /* If successful, change outer tblock state to PREPARE */
+ if (result)
+ {
+ s = CurrentTransactionState;
+
+ while (s->parent != NULL)
+ s = s->parent;
+
+ if (s->blockState == TBLOCK_END)
+ {
+ /* Save GID where PrepareTransaction can find it again */
+ prepareGID = MemoryContextStrdup(TopTransactionContext, gid);
+
+ s->blockState = TBLOCK_PREPARE;
+ }
+ else
+ {
+ /*
+ * ignore case where we are not in a transaction;
+ * EndTransactionBlock already issued a warning.
+ */
+ Assert(s->blockState == TBLOCK_STARTED);
+ /* Don't send back a PREPARE result tag... */
+ result = false;
+ }
+ }
+
+ return result;
+}
+
+/*
* EndTransactionBlock
* This executes a COMMIT command.
*
@@ -2603,6 +2911,7 @@ EndTransactionBlock(void)
case TBLOCK_SUBABORT_PENDING:
case TBLOCK_SUBRESTART:
case TBLOCK_SUBABORT_RESTART:
+ case TBLOCK_PREPARE:
elog(FATAL, "EndTransactionBlock: unexpected state %s",
BlockStateAsString(s->blockState));
break;
@@ -2694,6 +3003,7 @@ UserAbortTransactionBlock(void)
case TBLOCK_SUBABORT_PENDING:
case TBLOCK_SUBRESTART:
case TBLOCK_SUBABORT_RESTART:
+ case TBLOCK_PREPARE:
elog(FATAL, "UserAbortTransactionBlock: unexpected state %s",
BlockStateAsString(s->blockState));
break;
@@ -2740,6 +3050,7 @@ DefineSavepoint(char *name)
case TBLOCK_SUBABORT_PENDING:
case TBLOCK_SUBRESTART:
case TBLOCK_SUBABORT_RESTART:
+ case TBLOCK_PREPARE:
elog(FATAL, "DefineSavepoint: unexpected state %s",
BlockStateAsString(s->blockState));
break;
@@ -2795,6 +3106,7 @@ ReleaseSavepoint(List *options)
case TBLOCK_SUBABORT_PENDING:
case TBLOCK_SUBRESTART:
case TBLOCK_SUBABORT_RESTART:
+ case TBLOCK_PREPARE:
elog(FATAL, "ReleaseSavepoint: unexpected state %s",
BlockStateAsString(s->blockState));
break;
@@ -2892,6 +3204,7 @@ RollbackToSavepoint(List *options)
case TBLOCK_SUBABORT_PENDING:
case TBLOCK_SUBRESTART:
case TBLOCK_SUBABORT_RESTART:
+ case TBLOCK_PREPARE:
elog(FATAL, "RollbackToSavepoint: unexpected state %s",
BlockStateAsString(s->blockState));
break;
@@ -2999,6 +3312,7 @@ BeginInternalSubTransaction(char *name)
case TBLOCK_SUBABORT_PENDING:
case TBLOCK_SUBRESTART:
case TBLOCK_SUBABORT_RESTART:
+ case TBLOCK_PREPARE:
elog(FATAL, "BeginInternalSubTransaction: unexpected state %s",
BlockStateAsString(s->blockState));
break;
@@ -3064,6 +3378,7 @@ RollbackAndReleaseCurrentSubTransaction(void)
case TBLOCK_SUBABORT_PENDING:
case TBLOCK_SUBRESTART:
case TBLOCK_SUBABORT_RESTART:
+ case TBLOCK_PREPARE:
elog(FATAL, "RollbackAndReleaseCurrentSubTransaction: unexpected state %s",
BlockStateAsString(s->blockState));
break;
@@ -3111,6 +3426,7 @@ AbortOutOfAnyTransaction(void)
case TBLOCK_INPROGRESS:
case TBLOCK_END:
case TBLOCK_ABORT_PENDING:
+ case TBLOCK_PREPARE:
/* In a transaction, so clean up */
AbortTransaction();
CleanupTransaction();
@@ -3202,6 +3518,7 @@ TransactionBlockStatusCode(void)
case TBLOCK_SUBINPROGRESS:
case TBLOCK_END:
case TBLOCK_SUBEND:
+ case TBLOCK_PREPARE:
return 'T'; /* in transaction */
case TBLOCK_ABORT:
case TBLOCK_SUBABORT:
@@ -3684,6 +4001,8 @@ BlockStateAsString(TBlockState blockState)
return "ABORT END";
case TBLOCK_ABORT_PENDING:
return "ABORT PEND";
+ case TBLOCK_PREPARE:
+ return "PREPARE";
case TBLOCK_SUBBEGIN:
return "SUB BEGIN";
case TBLOCK_SUBINPROGRESS:
@@ -3717,12 +4036,14 @@ TransStateAsString(TransState state)
return "DEFAULT";
case TRANS_START:
return "START";
+ case TRANS_INPROGRESS:
+ return "INPROGR";
case TRANS_COMMIT:
return "COMMIT";
case TRANS_ABORT:
return "ABORT";
- case TRANS_INPROGRESS:
- return "INPROGR";
+ case TRANS_PREPARE:
+ return "PREPARE";
}
return "UNRECOGNIZED";
}
@@ -3767,6 +4088,76 @@ xactGetCommittedChildren(TransactionId **ptr)
* XLOG support routines
*/
+static void
+xact_redo_commit(xl_xact_commit *xlrec, TransactionId xid)
+{
+ TransactionId *sub_xids;
+ TransactionId max_xid;
+ int i;
+
+ TransactionIdCommit(xid);
+
+ /* Mark committed subtransactions as committed */
+ sub_xids = (TransactionId *) &(xlrec->xnodes[xlrec->nrels]);
+ TransactionIdCommitTree(xlrec->nsubxacts, sub_xids);
+
+ /* Make sure nextXid is beyond any XID mentioned in the record */
+ max_xid = xid;
+ for (i = 0; i < xlrec->nsubxacts; i++)
+ {
+ if (TransactionIdPrecedes(max_xid, sub_xids[i]))
+ max_xid = sub_xids[i];
+ }
+ if (TransactionIdFollowsOrEquals(max_xid,
+ ShmemVariableCache->nextXid))
+ {
+ ShmemVariableCache->nextXid = max_xid;
+ TransactionIdAdvance(ShmemVariableCache->nextXid);
+ }
+
+ /* Make sure files supposed to be dropped are dropped */
+ for (i = 0; i < xlrec->nrels; i++)
+ {
+ XLogCloseRelation(xlrec->xnodes[i]);
+ smgrdounlink(smgropen(xlrec->xnodes[i]), false, true);
+ }
+}
+
+static void
+xact_redo_abort(xl_xact_abort *xlrec, TransactionId xid)
+{
+ TransactionId *sub_xids;
+ TransactionId max_xid;
+ int i;
+
+ TransactionIdAbort(xid);
+
+ /* Mark subtransactions as aborted */
+ sub_xids = (TransactionId *) &(xlrec->xnodes[xlrec->nrels]);
+ TransactionIdAbortTree(xlrec->nsubxacts, sub_xids);
+
+ /* Make sure nextXid is beyond any XID mentioned in the record */
+ max_xid = xid;
+ for (i = 0; i < xlrec->nsubxacts; i++)
+ {
+ if (TransactionIdPrecedes(max_xid, sub_xids[i]))
+ max_xid = sub_xids[i];
+ }
+ if (TransactionIdFollowsOrEquals(max_xid,
+ ShmemVariableCache->nextXid))
+ {
+ ShmemVariableCache->nextXid = max_xid;
+ TransactionIdAdvance(ShmemVariableCache->nextXid);
+ }
+
+ /* Make sure files supposed to be dropped are dropped */
+ for (i = 0; i < xlrec->nrels; i++)
+ {
+ XLogCloseRelation(xlrec->xnodes[i]);
+ smgrdounlink(smgropen(xlrec->xnodes[i]), false, true);
+ }
+}
+
void
xact_redo(XLogRecPtr lsn, XLogRecord *record)
{
@@ -3775,138 +4166,137 @@ xact_redo(XLogRecPtr lsn, XLogRecord *record)
if (info == XLOG_XACT_COMMIT)
{
xl_xact_commit *xlrec = (xl_xact_commit *) XLogRecGetData(record);
- TransactionId *sub_xids;
- TransactionId max_xid;
- int i;
- TransactionIdCommit(record->xl_xid);
+ xact_redo_commit(xlrec, record->xl_xid);
+ }
+ else if (info == XLOG_XACT_ABORT)
+ {
+ xl_xact_abort *xlrec = (xl_xact_abort *) XLogRecGetData(record);
- /* Mark committed subtransactions as committed */
- sub_xids = (TransactionId *) &(xlrec->xnodes[xlrec->nrels]);
- TransactionIdCommitTree(xlrec->nsubxacts, sub_xids);
+ xact_redo_abort(xlrec, record->xl_xid);
+ }
+ else if (info == XLOG_XACT_PREPARE)
+ {
+ /* the record contents are exactly the 2PC file */
+ RecreateTwoPhaseFile(record->xl_xid,
+ XLogRecGetData(record), record->xl_len);
+ }
+ else if (info == XLOG_XACT_COMMIT_PREPARED)
+ {
+ xl_xact_commit_prepared *xlrec = (xl_xact_commit_prepared *) XLogRecGetData(record);
- /* Make sure nextXid is beyond any XID mentioned in the record */
- max_xid = record->xl_xid;
- for (i = 0; i < xlrec->nsubxacts; i++)
- {
- if (TransactionIdPrecedes(max_xid, sub_xids[i]))
- max_xid = sub_xids[i];
- }
- if (TransactionIdFollowsOrEquals(max_xid,
- ShmemVariableCache->nextXid))
- {
- ShmemVariableCache->nextXid = max_xid;
- TransactionIdAdvance(ShmemVariableCache->nextXid);
- }
+ xact_redo_commit(&xlrec->crec, xlrec->xid);
+ RemoveTwoPhaseFile(xlrec->xid, false);
+ }
+ else if (info == XLOG_XACT_ABORT_PREPARED)
+ {
+ xl_xact_abort_prepared *xlrec = (xl_xact_abort_prepared *) XLogRecGetData(record);
- /* Make sure files supposed to be dropped are dropped */
+ xact_redo_abort(&xlrec->arec, xlrec->xid);
+ RemoveTwoPhaseFile(xlrec->xid, false);
+ }
+ else
+ elog(PANIC, "xact_redo: unknown op code %u", info);
+}
+
+static void
+xact_desc_commit(char *buf, xl_xact_commit *xlrec)
+{
+ struct tm *tm = localtime(&xlrec->xtime);
+ int i;
+
+ sprintf(buf + strlen(buf), "%04u-%02u-%02u %02u:%02u:%02u",
+ tm->tm_year + 1900, tm->tm_mon + 1, tm->tm_mday,
+ tm->tm_hour, tm->tm_min, tm->tm_sec);
+ if (xlrec->nrels > 0)
+ {
+ sprintf(buf + strlen(buf), "; rels:");
for (i = 0; i < xlrec->nrels; i++)
{
- XLogCloseRelation(xlrec->xnodes[i]);
- smgrdounlink(smgropen(xlrec->xnodes[i]), false, true);
+ RelFileNode rnode = xlrec->xnodes[i];
+
+ sprintf(buf + strlen(buf), " %u/%u/%u",
+ rnode.spcNode, rnode.dbNode, rnode.relNode);
}
}
- else if (info == XLOG_XACT_ABORT)
+ if (xlrec->nsubxacts > 0)
{
- xl_xact_abort *xlrec = (xl_xact_abort *) XLogRecGetData(record);
- TransactionId *sub_xids;
- TransactionId max_xid;
- int i;
-
- TransactionIdAbort(record->xl_xid);
-
- /* Mark subtransactions as aborted */
- sub_xids = (TransactionId *) &(xlrec->xnodes[xlrec->nrels]);
- TransactionIdAbortTree(xlrec->nsubxacts, sub_xids);
+ TransactionId *xacts = (TransactionId *)
+ &xlrec->xnodes[xlrec->nrels];
- /* Make sure nextXid is beyond any XID mentioned in the record */
- max_xid = record->xl_xid;
+ sprintf(buf + strlen(buf), "; subxacts:");
for (i = 0; i < xlrec->nsubxacts; i++)
- {
- if (TransactionIdPrecedes(max_xid, sub_xids[i]))
- max_xid = sub_xids[i];
- }
- if (TransactionIdFollowsOrEquals(max_xid,
- ShmemVariableCache->nextXid))
- {
- ShmemVariableCache->nextXid = max_xid;
- TransactionIdAdvance(ShmemVariableCache->nextXid);
- }
+ sprintf(buf + strlen(buf), " %u", xacts[i]);
+ }
+}
+
+static void
+xact_desc_abort(char *buf, xl_xact_abort *xlrec)
+{
+ struct tm *tm = localtime(&xlrec->xtime);
+ int i;
- /* Make sure files supposed to be dropped are dropped */
+ sprintf(buf + strlen(buf), "%04u-%02u-%02u %02u:%02u:%02u",
+ tm->tm_year + 1900, tm->tm_mon + 1, tm->tm_mday,
+ tm->tm_hour, tm->tm_min, tm->tm_sec);
+ if (xlrec->nrels > 0)
+ {
+ sprintf(buf + strlen(buf), "; rels:");
for (i = 0; i < xlrec->nrels; i++)
{
- XLogCloseRelation(xlrec->xnodes[i]);
- smgrdounlink(smgropen(xlrec->xnodes[i]), false, true);
+ RelFileNode rnode = xlrec->xnodes[i];
+
+ sprintf(buf + strlen(buf), " %u/%u/%u",
+ rnode.spcNode, rnode.dbNode, rnode.relNode);
}
}
- else
- elog(PANIC, "xact_redo: unknown op code %u", info);
+ if (xlrec->nsubxacts > 0)
+ {
+ TransactionId *xacts = (TransactionId *)
+ &xlrec->xnodes[xlrec->nrels];
+
+ sprintf(buf + strlen(buf), "; subxacts:");
+ for (i = 0; i < xlrec->nsubxacts; i++)
+ sprintf(buf + strlen(buf), " %u", xacts[i]);
+ }
}
void
xact_desc(char *buf, uint8 xl_info, char *rec)
{
uint8 info = xl_info & ~XLR_INFO_MASK;
- int i;
if (info == XLOG_XACT_COMMIT)
{
xl_xact_commit *xlrec = (xl_xact_commit *) rec;
- struct tm *tm = localtime(&xlrec->xtime);
-
- sprintf(buf + strlen(buf), "commit: %04u-%02u-%02u %02u:%02u:%02u",
- tm->tm_year + 1900, tm->tm_mon + 1, tm->tm_mday,
- tm->tm_hour, tm->tm_min, tm->tm_sec);
- if (xlrec->nrels > 0)
- {
- sprintf(buf + strlen(buf), "; rels:");
- for (i = 0; i < xlrec->nrels; i++)
- {
- RelFileNode rnode = xlrec->xnodes[i];
- sprintf(buf + strlen(buf), " %u/%u/%u",
- rnode.spcNode, rnode.dbNode, rnode.relNode);
- }
- }
- if (xlrec->nsubxacts > 0)
- {
- TransactionId *xacts = (TransactionId *)
- &xlrec->xnodes[xlrec->nrels];
-
- sprintf(buf + strlen(buf), "; subxacts:");
- for (i = 0; i < xlrec->nsubxacts; i++)
- sprintf(buf + strlen(buf), " %u", xacts[i]);
- }
+ strcat(buf, "commit: ");
+ xact_desc_commit(buf, xlrec);
}
else if (info == XLOG_XACT_ABORT)
{
xl_xact_abort *xlrec = (xl_xact_abort *) rec;
- struct tm *tm = localtime(&xlrec->xtime);
- sprintf(buf + strlen(buf), "abort: %04u-%02u-%02u %02u:%02u:%02u",
- tm->tm_year + 1900, tm->tm_mon + 1, tm->tm_mday,
- tm->tm_hour, tm->tm_min, tm->tm_sec);
- if (xlrec->nrels > 0)
- {
- sprintf(buf + strlen(buf), "; rels:");
- for (i = 0; i < xlrec->nrels; i++)
- {
- RelFileNode rnode = xlrec->xnodes[i];
+ strcat(buf, "abort: ");
+ xact_desc_abort(buf, xlrec);
+ }
+ else if (info == XLOG_XACT_PREPARE)
+ {
+ strcat(buf, "prepare");
+ }
+ else if (info == XLOG_XACT_COMMIT_PREPARED)
+ {
+ xl_xact_commit_prepared *xlrec = (xl_xact_commit_prepared *) rec;
- sprintf(buf + strlen(buf), " %u/%u/%u",
- rnode.spcNode, rnode.dbNode, rnode.relNode);
- }
- }
- if (xlrec->nsubxacts > 0)
- {
- TransactionId *xacts = (TransactionId *)
- &xlrec->xnodes[xlrec->nrels];
+ sprintf(buf + strlen(buf), "commit %u: ", xlrec->xid);
+ xact_desc_commit(buf, &xlrec->crec);
+ }
+ else if (info == XLOG_XACT_ABORT_PREPARED)
+ {
+ xl_xact_abort_prepared *xlrec = (xl_xact_abort_prepared *) rec;
- sprintf(buf + strlen(buf), "; subxacts:");
- for (i = 0; i < xlrec->nsubxacts; i++)
- sprintf(buf + strlen(buf), " %u", xacts[i]);
- }
+ sprintf(buf + strlen(buf), "abort %u: ", xlrec->xid);
+ xact_desc_abort(buf, &xlrec->arec);
}
else
strcat(buf, "UNKNOWN");