diff options
Diffstat (limited to 'src/backend/access/transam/xact.c')
-rw-r--r-- | src/backend/access/transam/xact.c | 475 |
1 files changed, 293 insertions, 182 deletions
diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c index 89769eac072..1495bb499f5 100644 --- a/src/backend/access/transam/xact.c +++ b/src/backend/access/transam/xact.c @@ -1068,70 +1068,11 @@ RecordTransactionCommit(void) SetCurrentTransactionStopTimestamp(); - /* - * Do we need the long commit record? If not, use the compact format. - * - * For now always use the non-compact version if wal_level=logical, so - * we can hide commits from other databases. TODO: In the future we - * should merge compact and non-compact commits and use a flags - * variable to determine if it contains subxacts, relations or - * invalidation messages, that's more extensible and degrades more - * gracefully. Till then, it's just 20 bytes of overhead. - */ - if (nrels > 0 || nmsgs > 0 || RelcacheInitFileInval || forceSyncCommit || - XLogLogicalInfoActive()) - { - xl_xact_commit xlrec; - - /* - * Set flags required for recovery processing of commits. - */ - xlrec.xinfo = 0; - if (RelcacheInitFileInval) - xlrec.xinfo |= XACT_COMPLETION_UPDATE_RELCACHE_FILE; - if (forceSyncCommit) - xlrec.xinfo |= XACT_COMPLETION_FORCE_SYNC_COMMIT; - - xlrec.dbId = MyDatabaseId; - xlrec.tsId = MyDatabaseTableSpace; - - xlrec.xact_time = xactStopTimestamp; - xlrec.nrels = nrels; - xlrec.nsubxacts = nchildren; - xlrec.nmsgs = nmsgs; - - XLogBeginInsert(); - XLogRegisterData((char *) (&xlrec), MinSizeOfXactCommit); - /* dump rels to delete */ - if (nrels > 0) - XLogRegisterData((char *) rels, - nrels * sizeof(RelFileNode)); - /* dump committed child Xids */ - if (nchildren > 0) - XLogRegisterData((char *) children, - nchildren * sizeof(TransactionId)); - /* dump shared cache invalidation messages */ - if (nmsgs > 0) - XLogRegisterData((char *) invalMessages, - nmsgs * sizeof(SharedInvalidationMessage)); - (void) XLogInsert(RM_XACT_ID, XLOG_XACT_COMMIT); - } - else - { - xl_xact_commit_compact xlrec; - - xlrec.xact_time = xactStopTimestamp; - xlrec.nsubxacts = nchildren; - - XLogBeginInsert(); - XLogRegisterData((char *) (&xlrec), MinSizeOfXactCommitCompact); - /* dump committed child Xids */ - if (nchildren > 0) - XLogRegisterData((char *) children, - nchildren * sizeof(TransactionId)); - - (void) XLogInsert(RM_XACT_ID, XLOG_XACT_COMMIT_COMPACT); - } + XactLogCommitRecord(xactStopTimestamp, + nchildren, children, nrels, rels, + nmsgs, invalMessages, + RelcacheInitFileInval, forceSyncCommit, + InvalidTransactionId /* plain commit */); } /* @@ -1424,7 +1365,7 @@ RecordTransactionAbort(bool isSubXact) RelFileNode *rels; int nchildren; TransactionId *children; - xl_xact_abort xlrec; + TimestampTz xact_time; /* * If we haven't been assigned an XID, nobody will care whether we aborted @@ -1464,28 +1405,17 @@ RecordTransactionAbort(bool isSubXact) /* Write the ABORT record */ if (isSubXact) - xlrec.xact_time = GetCurrentTimestamp(); + xact_time = GetCurrentTimestamp(); else { SetCurrentTransactionStopTimestamp(); - xlrec.xact_time = xactStopTimestamp; + xact_time = xactStopTimestamp; } - xlrec.nrels = nrels; - xlrec.nsubxacts = nchildren; - XLogBeginInsert(); - XLogRegisterData((char *) (&xlrec), MinSizeOfXactAbort); - - /* dump rels to delete */ - if (nrels > 0) - XLogRegisterData((char *) rels, nrels * sizeof(RelFileNode)); - - /* dump committed child Xids */ - if (nchildren > 0) - XLogRegisterData((char *) children, - nchildren * sizeof(TransactionId)); - - (void) XLogInsert(RM_XACT_ID, XLOG_XACT_ABORT); + XactLogAbortRecord(xact_time, + nchildren, children, + nrels, rels, + InvalidTransactionId); /* * Report the latest async abort LSN, so that the WAL writer knows to @@ -4659,23 +4589,229 @@ xactGetCommittedChildren(TransactionId **ptr) * XLOG support routines */ + +/* + * Log the commit record for a plain or twophase transaction commit. + * + * A 2pc commit will be emitted when twophase_xid is valid, a plain one + * otherwise. + */ +XLogRecPtr +XactLogCommitRecord(TimestampTz commit_time, + int nsubxacts, TransactionId *subxacts, + int nrels, RelFileNode *rels, + int nmsgs, SharedInvalidationMessage *msgs, + bool relcacheInval, bool forceSync, + TransactionId twophase_xid) +{ + xl_xact_commit xlrec; + xl_xact_xinfo xl_xinfo; + xl_xact_dbinfo xl_dbinfo; + xl_xact_subxacts xl_subxacts; + xl_xact_relfilenodes xl_relfilenodes; + xl_xact_invals xl_invals; + xl_xact_twophase xl_twophase; + + uint8 info; + + Assert(CritSectionCount > 0); + + xl_xinfo.xinfo = 0; + + /* decide between a plain and 2pc commit */ + if (!TransactionIdIsValid(twophase_xid)) + info = XLOG_XACT_COMMIT; + else + info = XLOG_XACT_COMMIT_PREPARED; + + /* First figure out and collect all the information needed */ + + xlrec.xact_time = commit_time; + + if (relcacheInval) + xl_xinfo.xinfo |= XACT_COMPLETION_UPDATE_RELCACHE_FILE; + if (forceSyncCommit) + xl_xinfo.xinfo |= XACT_COMPLETION_FORCE_SYNC_COMMIT; + + /* + * Relcache invalidations requires information about the current database + * and so does logical decoding. + */ + if (nmsgs > 0 || XLogLogicalInfoActive()) + { + xl_xinfo.xinfo |= XACT_XINFO_HAS_DBINFO; + xl_dbinfo.dbId = MyDatabaseId; + xl_dbinfo.tsId = MyDatabaseTableSpace; + } + + if (nsubxacts > 0) + { + xl_xinfo.xinfo |= XACT_XINFO_HAS_SUBXACTS; + xl_subxacts.nsubxacts = nsubxacts; + } + + if (nrels > 0) + { + xl_xinfo.xinfo |= XACT_XINFO_HAS_RELFILENODES; + xl_relfilenodes.nrels = nrels; + } + + if (nmsgs > 0) + { + xl_xinfo.xinfo |= XACT_XINFO_HAS_INVALS; + xl_invals.nmsgs = nmsgs; + } + + if (TransactionIdIsValid(twophase_xid)) + { + xl_xinfo.xinfo |= XACT_XINFO_HAS_TWOPHASE; + xl_twophase.xid = twophase_xid; + } + + if (xl_xinfo.xinfo != 0) + info |= XLOG_XACT_HAS_INFO; + + /* Then include all the collected data into the commit record. */ + + XLogBeginInsert(); + + XLogRegisterData((char *) (&xlrec), sizeof(xl_xact_commit)); + + if (xl_xinfo.xinfo != 0) + XLogRegisterData((char *) (&xl_xinfo.xinfo), sizeof(xl_xinfo.xinfo)); + + if (xl_xinfo.xinfo & XACT_XINFO_HAS_DBINFO) + XLogRegisterData((char *) (&xl_dbinfo), sizeof(xl_dbinfo)); + + if (xl_xinfo.xinfo & XACT_XINFO_HAS_SUBXACTS) + { + XLogRegisterData((char *) (&xl_subxacts), + MinSizeOfXactSubxacts); + XLogRegisterData((char *) subxacts, + nsubxacts * sizeof(TransactionId)); + } + + if (xl_xinfo.xinfo & XACT_XINFO_HAS_RELFILENODES) + { + XLogRegisterData((char *) (&xl_relfilenodes), + MinSizeOfXactRelfilenodes); + XLogRegisterData((char *) rels, + nrels * sizeof(RelFileNode)); + } + + if (xl_xinfo.xinfo & XACT_XINFO_HAS_INVALS) + { + XLogRegisterData((char *) (&xl_invals), MinSizeOfXactInvals); + XLogRegisterData((char *) msgs, + nmsgs * sizeof(SharedInvalidationMessage)); + } + + if (xl_xinfo.xinfo & XACT_XINFO_HAS_TWOPHASE) + XLogRegisterData((char *) (&xl_twophase), sizeof(xl_xact_twophase)); + + return XLogInsert(RM_XACT_ID, info); +} + +/* + * Log the commit record for a plain or twophase transaction abort. + * + * A 2pc abort will be emitted when twophase_xid is valid, a plain one + * otherwise. + */ +XLogRecPtr +XactLogAbortRecord(TimestampTz abort_time, + int nsubxacts, TransactionId *subxacts, + int nrels, RelFileNode *rels, + TransactionId twophase_xid) +{ + xl_xact_abort xlrec; + xl_xact_xinfo xl_xinfo; + xl_xact_subxacts xl_subxacts; + xl_xact_relfilenodes xl_relfilenodes; + xl_xact_twophase xl_twophase; + + uint8 info; + + Assert(CritSectionCount > 0); + + xl_xinfo.xinfo = 0; + + /* decide between a plain and 2pc abort */ + if (!TransactionIdIsValid(twophase_xid)) + info = XLOG_XACT_ABORT; + else + info = XLOG_XACT_ABORT_PREPARED; + + + /* First figure out and collect all the information needed */ + + xlrec.xact_time = abort_time; + + if (nsubxacts > 0) + { + xl_xinfo.xinfo |= XACT_XINFO_HAS_SUBXACTS; + xl_subxacts.nsubxacts = nsubxacts; + } + + if (nrels > 0) + { + xl_xinfo.xinfo |= XACT_XINFO_HAS_RELFILENODES; + xl_relfilenodes.nrels = nrels; + } + + if (TransactionIdIsValid(twophase_xid)) + { + xl_xinfo.xinfo |= XACT_XINFO_HAS_TWOPHASE; + xl_twophase.xid = twophase_xid; + } + + if (xl_xinfo.xinfo != 0) + info |= XLOG_XACT_HAS_INFO; + + /* Then include all the collected data into the abort record. */ + + XLogBeginInsert(); + + XLogRegisterData((char *) (&xlrec), MinSizeOfXactAbort); + + if (xl_xinfo.xinfo != 0) + XLogRegisterData((char *) (&xl_xinfo), sizeof(xl_xinfo)); + + if (xl_xinfo.xinfo & XACT_XINFO_HAS_SUBXACTS) + { + XLogRegisterData((char *) (&xl_subxacts), + MinSizeOfXactSubxacts); + XLogRegisterData((char *) subxacts, + nsubxacts * sizeof(TransactionId)); + } + + if (xl_xinfo.xinfo & XACT_XINFO_HAS_RELFILENODES) + { + XLogRegisterData((char *) (&xl_relfilenodes), + MinSizeOfXactRelfilenodes); + XLogRegisterData((char *) rels, + nrels * sizeof(RelFileNode)); + } + + if (xl_xinfo.xinfo & XACT_XINFO_HAS_TWOPHASE) + XLogRegisterData((char *) (&xl_twophase), sizeof(xl_xact_twophase)); + + return XLogInsert(RM_XACT_ID, info); +} + /* * Before 9.0 this was a fairly short function, but now it performs many * actions for which the order of execution is critical. */ static void -xact_redo_commit_internal(TransactionId xid, XLogRecPtr lsn, - TimestampTz commit_time, - TransactionId *sub_xids, int nsubxacts, - SharedInvalidationMessage *inval_msgs, int nmsgs, - RelFileNode *xnodes, int nrels, - Oid dbId, Oid tsId, - uint32 xinfo) +xact_redo_commit(xl_xact_parsed_commit *parsed, + TransactionId xid, + XLogRecPtr lsn) { TransactionId max_xid; int i; - max_xid = TransactionIdLatest(xid, nsubxacts, sub_xids); + max_xid = TransactionIdLatest(xid, parsed->nsubxacts, parsed->subxacts); /* * Make sure nextXid is beyond any XID mentioned in the record. @@ -4694,15 +4830,16 @@ xact_redo_commit_internal(TransactionId xid, XLogRecPtr lsn, } /* Set the transaction commit timestamp and metadata */ - TransactionTreeSetCommitTsData(xid, nsubxacts, sub_xids, - commit_time, InvalidCommitTsNodeId, false); + TransactionTreeSetCommitTsData(xid, parsed->nsubxacts, parsed->subxacts, + parsed->xact_time, InvalidCommitTsNodeId, + false); if (standbyState == STANDBY_DISABLED) { /* * Mark the transaction committed in pg_clog. */ - TransactionIdCommitTree(xid, nsubxacts, sub_xids); + TransactionIdCommitTree(xid, parsed->nsubxacts, parsed->subxacts); } else { @@ -4726,21 +4863,24 @@ xact_redo_commit_internal(TransactionId xid, XLogRecPtr lsn, * bits set on changes made by transactions that haven't yet * recovered. It's unlikely but it's good to be safe. */ - TransactionIdAsyncCommitTree(xid, nsubxacts, sub_xids, lsn); + TransactionIdAsyncCommitTree( + xid, parsed->nsubxacts, parsed->subxacts, lsn); /* * We must mark clog before we update the ProcArray. */ - ExpireTreeKnownAssignedTransactionIds(xid, nsubxacts, sub_xids, max_xid); + ExpireTreeKnownAssignedTransactionIds( + xid, parsed->nsubxacts, parsed->subxacts, max_xid); /* * Send any cache invalidations attached to the commit. We must * maintain the same order of invalidation then release locks as * occurs in CommitTransaction(). */ - ProcessCommittedInvalidationMessages(inval_msgs, nmsgs, - XactCompletionRelcacheInitFileInval(xinfo), - dbId, tsId); + ProcessCommittedInvalidationMessages( + parsed->msgs, parsed->nmsgs, + XactCompletionRelcacheInitFileInval(parsed->xinfo), + parsed->dbId, parsed->tsId); /* * Release locks, if any. We do this for both two phase and normal one @@ -4753,7 +4893,7 @@ xact_redo_commit_internal(TransactionId xid, XLogRecPtr lsn, } /* Make sure files supposed to be dropped are dropped */ - if (nrels > 0) + if (parsed->nrels > 0) { /* * First update minimum recovery point to cover this WAL record. Once @@ -4772,13 +4912,13 @@ xact_redo_commit_internal(TransactionId xid, XLogRecPtr lsn, */ XLogFlush(lsn); - for (i = 0; i < nrels; i++) + for (i = 0; i < parsed->nrels; i++) { - SMgrRelation srel = smgropen(xnodes[i], InvalidBackendId); + SMgrRelation srel = smgropen(parsed->xnodes[i], InvalidBackendId); ForkNumber fork; for (fork = 0; fork <= MAX_FORKNUM; fork++) - XLogDropRelation(xnodes[i], fork); + XLogDropRelation(parsed->xnodes[i], fork); smgrdounlink(srel, true); smgrclose(srel); } @@ -4796,52 +4936,12 @@ xact_redo_commit_internal(TransactionId xid, XLogRecPtr lsn, * minRecoveryPoint during recovery) helps to reduce that problem window, * for any user that requested ForceSyncCommit(). */ - if (XactCompletionForceSyncCommit(xinfo)) + if (XactCompletionForceSyncCommit(parsed->xinfo)) XLogFlush(lsn); } /* - * Utility function to call xact_redo_commit_internal after breaking down xlrec - */ -static void -xact_redo_commit(xl_xact_commit *xlrec, - TransactionId xid, XLogRecPtr lsn) -{ - TransactionId *subxacts; - SharedInvalidationMessage *inval_msgs; - - /* subxid array follows relfilenodes */ - subxacts = (TransactionId *) &(xlrec->xnodes[xlrec->nrels]); - /* invalidation messages array follows subxids */ - inval_msgs = (SharedInvalidationMessage *) &(subxacts[xlrec->nsubxacts]); - - xact_redo_commit_internal(xid, lsn, xlrec->xact_time, - subxacts, xlrec->nsubxacts, - inval_msgs, xlrec->nmsgs, - xlrec->xnodes, xlrec->nrels, - xlrec->dbId, - xlrec->tsId, - xlrec->xinfo); -} - -/* - * Utility function to call xact_redo_commit_internal for compact form of message. - */ -static void -xact_redo_commit_compact(xl_xact_commit_compact *xlrec, - TransactionId xid, XLogRecPtr lsn) -{ - xact_redo_commit_internal(xid, lsn, xlrec->xact_time, - xlrec->subxacts, xlrec->nsubxacts, - NULL, 0, /* inval msgs */ - NULL, 0, /* relfilenodes */ - InvalidOid, /* dbId */ - InvalidOid, /* tsId */ - 0); /* xinfo */ -} - -/* * Be careful with the order of execution, as with xact_redo_commit(). * The two functions are similar but differ in key places. * @@ -4851,14 +4951,10 @@ xact_redo_commit_compact(xl_xact_commit_compact *xlrec, * because subtransaction commit is never WAL logged. */ static void -xact_redo_abort(xl_xact_abort *xlrec, TransactionId xid) +xact_redo_abort(xl_xact_parsed_abort *parsed, TransactionId xid) { - TransactionId *sub_xids; - TransactionId max_xid; - int i; - - sub_xids = (TransactionId *) &(xlrec->xnodes[xlrec->nrels]); - max_xid = TransactionIdLatest(xid, xlrec->nsubxacts, sub_xids); + int i; + TransactionId max_xid; /* * Make sure nextXid is beyond any XID mentioned in the record. @@ -4867,6 +4963,10 @@ xact_redo_abort(xl_xact_abort *xlrec, TransactionId xid) * hold a lock while checking this. We still acquire the lock to modify * it, though. */ + max_xid = TransactionIdLatest(xid, + parsed->nsubxacts, + parsed->subxacts); + if (TransactionIdFollowsOrEquals(max_xid, ShmemVariableCache->nextXid)) { @@ -4879,7 +4979,7 @@ xact_redo_abort(xl_xact_abort *xlrec, TransactionId xid) if (standbyState == STANDBY_DISABLED) { /* Mark the transaction aborted in pg_clog, no need for async stuff */ - TransactionIdAbortTree(xid, xlrec->nsubxacts, sub_xids); + TransactionIdAbortTree(xid, parsed->nsubxacts, parsed->subxacts); } else { @@ -4895,12 +4995,13 @@ xact_redo_abort(xl_xact_abort *xlrec, TransactionId xid) RecordKnownAssignedTransactionIds(max_xid); /* Mark the transaction aborted in pg_clog, no need for async stuff */ - TransactionIdAbortTree(xid, xlrec->nsubxacts, sub_xids); + TransactionIdAbortTree(xid, parsed->nsubxacts, parsed->subxacts); /* * We must update the ProcArray after we have marked clog. */ - ExpireTreeKnownAssignedTransactionIds(xid, xlrec->nsubxacts, sub_xids, max_xid); + ExpireTreeKnownAssignedTransactionIds( + xid, parsed->nsubxacts, parsed->subxacts, max_xid); /* * There are no flat files that need updating, nor invalidation @@ -4910,17 +5011,17 @@ xact_redo_abort(xl_xact_abort *xlrec, TransactionId xid) /* * Release locks, if any. There are no invalidations to send. */ - StandbyReleaseLockTree(xid, xlrec->nsubxacts, sub_xids); + StandbyReleaseLockTree(xid, parsed->nsubxacts, parsed->subxacts); } /* Make sure files supposed to be dropped are dropped */ - for (i = 0; i < xlrec->nrels; i++) + for (i = 0; i < parsed->nrels; i++) { - SMgrRelation srel = smgropen(xlrec->xnodes[i], InvalidBackendId); + SMgrRelation srel = smgropen(parsed->xnodes[i], InvalidBackendId); ForkNumber fork; for (fork = 0; fork <= MAX_FORKNUM; fork++) - XLogDropRelation(xlrec->xnodes[i], fork); + XLogDropRelation(parsed->xnodes[i], fork); smgrdounlink(srel, true); smgrclose(srel); } @@ -4929,28 +5030,52 @@ xact_redo_abort(xl_xact_abort *xlrec, TransactionId xid) void xact_redo(XLogReaderState *record) { - uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK; + uint8 info = XLogRecGetInfo(record) & XLOG_XACT_OPMASK; /* Backup blocks are not used in xact records */ Assert(!XLogRecHasAnyBlockRefs(record)); - if (info == XLOG_XACT_COMMIT_COMPACT) - { - xl_xact_commit_compact *xlrec = (xl_xact_commit_compact *) XLogRecGetData(record); - - xact_redo_commit_compact(xlrec, XLogRecGetXid(record), record->EndRecPtr); - } - else if (info == XLOG_XACT_COMMIT) + if (info == XLOG_XACT_COMMIT || info == XLOG_XACT_COMMIT_PREPARED) { xl_xact_commit *xlrec = (xl_xact_commit *) XLogRecGetData(record); + xl_xact_parsed_commit parsed; + + ParseCommitRecord(XLogRecGetInfo(record), xlrec, + &parsed); - xact_redo_commit(xlrec, XLogRecGetXid(record), record->EndRecPtr); + if (info == XLOG_XACT_COMMIT) + { + Assert(!TransactionIdIsValid(parsed.twophase_xid)); + xact_redo_commit(&parsed, XLogRecGetXid(record), + record->EndRecPtr); + } + else + { + Assert(TransactionIdIsValid(parsed.twophase_xid)); + xact_redo_commit(&parsed, parsed.twophase_xid, + record->EndRecPtr); + RemoveTwoPhaseFile(parsed.twophase_xid, false); + } } - else if (info == XLOG_XACT_ABORT) + else if (info == XLOG_XACT_ABORT || info == XLOG_XACT_ABORT_PREPARED) { xl_xact_abort *xlrec = (xl_xact_abort *) XLogRecGetData(record); + xl_xact_parsed_abort parsed; + + ParseAbortRecord(XLogRecGetInfo(record), xlrec, + &parsed); - xact_redo_abort(xlrec, XLogRecGetXid(record)); + if (info == XLOG_XACT_ABORT) + { + Assert(!TransactionIdIsValid(parsed.twophase_xid)); + xact_redo_abort(&parsed, XLogRecGetXid(record)); + } + else + { + Assert(TransactionIdIsValid(parsed.twophase_xid)); + xact_redo_abort(&parsed, parsed.twophase_xid); + RemoveTwoPhaseFile(parsed.twophase_xid, false); + } } else if (info == XLOG_XACT_PREPARE) { @@ -4958,20 +5083,6 @@ xact_redo(XLogReaderState *record) RecreateTwoPhaseFile(XLogRecGetXid(record), XLogRecGetData(record), XLogRecGetDataLen(record)); } - else if (info == XLOG_XACT_COMMIT_PREPARED) - { - xl_xact_commit_prepared *xlrec = (xl_xact_commit_prepared *) XLogRecGetData(record); - - xact_redo_commit(&xlrec->crec, xlrec->xid, record->EndRecPtr); - RemoveTwoPhaseFile(xlrec->xid, false); - } - else if (info == XLOG_XACT_ABORT_PREPARED) - { - xl_xact_abort_prepared *xlrec = (xl_xact_abort_prepared *) XLogRecGetData(record); - - xact_redo_abort(&xlrec->arec, xlrec->xid); - RemoveTwoPhaseFile(xlrec->xid, false); - } else if (info == XLOG_XACT_ASSIGNMENT) { xl_xact_assignment *xlrec = (xl_xact_assignment *) XLogRecGetData(record); |