diff options
Diffstat (limited to 'src/backend/access/transam')
-rw-r--r-- | src/backend/access/transam/twophase.c | 45 | ||||
-rw-r--r-- | src/backend/access/transam/xact.c | 68 |
2 files changed, 110 insertions, 3 deletions
diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c index 4dc8ccc12b9..b35da6f1aad 100644 --- a/src/backend/access/transam/twophase.c +++ b/src/backend/access/transam/twophase.c @@ -205,6 +205,8 @@ static void RecordTransactionCommitPrepared(TransactionId xid, TransactionId *children, int nrels, RelFileNode *rels, + int nstats, + xl_xact_stats_item *stats, int ninvalmsgs, SharedInvalidationMessage *invalmsgs, bool initfileinval, @@ -214,6 +216,8 @@ static void RecordTransactionAbortPrepared(TransactionId xid, TransactionId *children, int nrels, RelFileNode *rels, + int nstats, + xl_xact_stats_item *stats, const char *gid); static void ProcessRecords(char *bufptr, TransactionId xid, const TwoPhaseCallback callbacks[]); @@ -1046,6 +1050,8 @@ StartPrepare(GlobalTransaction gxact) TransactionId *children; RelFileNode *commitrels; RelFileNode *abortrels; + xl_xact_stats_item *abortstats = NULL; + xl_xact_stats_item *commitstats = NULL; SharedInvalidationMessage *invalmsgs; /* Initialize linked list */ @@ -1071,6 +1077,10 @@ StartPrepare(GlobalTransaction gxact) hdr.nsubxacts = xactGetCommittedChildren(&children); hdr.ncommitrels = smgrGetPendingDeletes(true, &commitrels); hdr.nabortrels = smgrGetPendingDeletes(false, &abortrels); + hdr.ncommitstats = + pgstat_get_transactional_drops(true, &commitstats); + hdr.nabortstats = + pgstat_get_transactional_drops(false, &abortstats); hdr.ninvalmsgs = xactGetCommittedInvalidationMessages(&invalmsgs, &hdr.initfileinval); hdr.gidlen = strlen(gxact->gid) + 1; /* Include '\0' */ @@ -1101,6 +1111,18 @@ StartPrepare(GlobalTransaction gxact) save_state_data(abortrels, hdr.nabortrels * sizeof(RelFileNode)); pfree(abortrels); } + if (hdr.ncommitstats > 0) + { + save_state_data(commitstats, + hdr.ncommitstats * sizeof(xl_xact_stats_item)); + pfree(commitstats); + } + if (hdr.nabortstats > 0) + { + save_state_data(abortstats, + hdr.nabortstats * sizeof(xl_xact_stats_item)); + pfree(abortstats); + } if (hdr.ninvalmsgs > 0) { save_state_data(invalmsgs, @@ -1472,6 +1494,8 @@ FinishPreparedTransaction(const char *gid, bool isCommit) RelFileNode *abortrels; RelFileNode *delrels; int ndelrels; + xl_xact_stats_item *commitstats; + xl_xact_stats_item *abortstats; SharedInvalidationMessage *invalmsgs; /* @@ -1506,6 +1530,10 @@ FinishPreparedTransaction(const char *gid, bool isCommit) bufptr += MAXALIGN(hdr->ncommitrels * sizeof(RelFileNode)); abortrels = (RelFileNode *) bufptr; bufptr += MAXALIGN(hdr->nabortrels * sizeof(RelFileNode)); + commitstats = (xl_xact_stats_item*) bufptr; + bufptr += MAXALIGN(hdr->ncommitstats * sizeof(xl_xact_stats_item)); + abortstats = (xl_xact_stats_item*) bufptr; + bufptr += MAXALIGN(hdr->nabortstats * sizeof(xl_xact_stats_item)); invalmsgs = (SharedInvalidationMessage *) bufptr; bufptr += MAXALIGN(hdr->ninvalmsgs * sizeof(SharedInvalidationMessage)); @@ -1527,12 +1555,16 @@ FinishPreparedTransaction(const char *gid, bool isCommit) RecordTransactionCommitPrepared(xid, hdr->nsubxacts, children, hdr->ncommitrels, commitrels, + hdr->ncommitstats, + commitstats, hdr->ninvalmsgs, invalmsgs, hdr->initfileinval, gid); else RecordTransactionAbortPrepared(xid, hdr->nsubxacts, children, hdr->nabortrels, abortrels, + hdr->nabortstats, + abortstats, gid); ProcArrayRemove(proc, latestXid); @@ -1568,6 +1600,11 @@ FinishPreparedTransaction(const char *gid, bool isCommit) /* Make sure files supposed to be dropped are dropped */ DropRelationFiles(delrels, ndelrels, false); + if (isCommit) + pgstat_execute_transactional_drops(hdr->ncommitstats, commitstats, false); + else + pgstat_execute_transactional_drops(hdr->nabortstats, abortstats, false); + /* * Handle cache invalidation messages. * @@ -2066,6 +2103,8 @@ RecoverPreparedTransactions(void) bufptr += MAXALIGN(hdr->nsubxacts * sizeof(TransactionId)); bufptr += MAXALIGN(hdr->ncommitrels * sizeof(RelFileNode)); bufptr += MAXALIGN(hdr->nabortrels * sizeof(RelFileNode)); + bufptr += MAXALIGN(hdr->ncommitstats * sizeof(xl_xact_stats_item)); + bufptr += MAXALIGN(hdr->nabortstats * sizeof(xl_xact_stats_item)); bufptr += MAXALIGN(hdr->ninvalmsgs * sizeof(SharedInvalidationMessage)); /* @@ -2248,6 +2287,8 @@ RecordTransactionCommitPrepared(TransactionId xid, TransactionId *children, int nrels, RelFileNode *rels, + int nstats, + xl_xact_stats_item *stats, int ninvalmsgs, SharedInvalidationMessage *invalmsgs, bool initfileinval, @@ -2277,6 +2318,7 @@ RecordTransactionCommitPrepared(TransactionId xid, */ recptr = XactLogCommitRecord(committs, nchildren, children, nrels, rels, + nstats, stats, ninvalmsgs, invalmsgs, initfileinval, MyXactFlags | XACT_FLAGS_ACQUIREDACCESSEXCLUSIVELOCK, @@ -2343,6 +2385,8 @@ RecordTransactionAbortPrepared(TransactionId xid, TransactionId *children, int nrels, RelFileNode *rels, + int nstats, + xl_xact_stats_item *stats, const char *gid) { XLogRecPtr recptr; @@ -2373,6 +2417,7 @@ RecordTransactionAbortPrepared(TransactionId xid, recptr = XactLogAbortRecord(GetCurrentTimestamp(), nchildren, children, nrels, rels, + nstats, stats, MyXactFlags | XACT_FLAGS_ACQUIREDACCESSEXCLUSIVELOCK, xid, gid); diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c index 3596a7d7345..bf2fc08d940 100644 --- a/src/backend/access/transam/xact.c +++ b/src/backend/access/transam/xact.c @@ -1285,6 +1285,8 @@ RecordTransactionCommit(void) RelFileNode *rels; int nchildren; TransactionId *children; + int ndroppedstats = 0; + xl_xact_stats_item *droppedstats = NULL; int nmsgs = 0; SharedInvalidationMessage *invalMessages = NULL; bool RelcacheInitFileInval = false; @@ -1303,6 +1305,7 @@ RecordTransactionCommit(void) /* Get data needed for commit record */ nrels = smgrGetPendingDeletes(true, &rels); nchildren = xactGetCommittedChildren(&children); + ndroppedstats = pgstat_get_transactional_drops(true, &droppedstats); if (XLogStandbyInfoActive()) nmsgs = xactGetCommittedInvalidationMessages(&invalMessages, &RelcacheInitFileInval); @@ -1317,10 +1320,12 @@ RecordTransactionCommit(void) /* * We expect that every RelationDropStorage is followed by a catalog * update, and hence XID assignment, so we shouldn't get here with any - * pending deletes. Use a real test not just an Assert to check this, - * since it's a bit fragile. + * pending deletes. Same is true for dropping stats. + * + * Use a real test not just an Assert to check this, since it's a bit + * fragile. */ - if (nrels != 0) + if (nrels != 0 || ndroppedstats != 0) elog(ERROR, "cannot commit a transaction that deleted files but has no xid"); /* Can't have child XIDs either; AssignTransactionId enforces this */ @@ -1395,6 +1400,7 @@ RecordTransactionCommit(void) XactLogCommitRecord(xactStopTimestamp, nchildren, children, nrels, rels, + ndroppedstats, droppedstats, nmsgs, invalMessages, RelcacheInitFileInval, MyXactFlags, @@ -1518,6 +1524,8 @@ cleanup: /* Clean up local data */ if (rels) pfree(rels); + if (ndroppedstats) + pfree(droppedstats); return latestXid; } @@ -1698,6 +1706,8 @@ RecordTransactionAbort(bool isSubXact) TransactionId latestXid; int nrels; RelFileNode *rels; + int ndroppedstats = 0; + xl_xact_stats_item *droppedstats = NULL; int nchildren; TransactionId *children; TimestampTz xact_time; @@ -1734,6 +1744,7 @@ RecordTransactionAbort(bool isSubXact) /* Fetch the data we need for the abort record */ nrels = smgrGetPendingDeletes(false, &rels); nchildren = xactGetCommittedChildren(&children); + ndroppedstats = pgstat_get_transactional_drops(false, &droppedstats); /* XXX do we really need a critical section here? */ START_CRIT_SECTION(); @@ -1750,6 +1761,7 @@ RecordTransactionAbort(bool isSubXact) XactLogAbortRecord(xact_time, nchildren, children, nrels, rels, + ndroppedstats, droppedstats, MyXactFlags, InvalidTransactionId, NULL); @@ -1796,6 +1808,8 @@ RecordTransactionAbort(bool isSubXact) /* And clean up local data */ if (rels) pfree(rels); + if (ndroppedstats) + pfree(droppedstats); return latestXid; } @@ -5573,6 +5587,7 @@ XLogRecPtr XactLogCommitRecord(TimestampTz commit_time, int nsubxacts, TransactionId *subxacts, int nrels, RelFileNode *rels, + int ndroppedstats, xl_xact_stats_item *droppedstats, int nmsgs, SharedInvalidationMessage *msgs, bool relcacheInval, int xactflags, TransactionId twophase_xid, @@ -5583,6 +5598,7 @@ XactLogCommitRecord(TimestampTz commit_time, xl_xact_dbinfo xl_dbinfo; xl_xact_subxacts xl_subxacts; xl_xact_relfilenodes xl_relfilenodes; + xl_xact_stats_items xl_dropped_stats; xl_xact_invals xl_invals; xl_xact_twophase xl_twophase; xl_xact_origin xl_origin; @@ -5640,6 +5656,12 @@ XactLogCommitRecord(TimestampTz commit_time, info |= XLR_SPECIAL_REL_UPDATE; } + if (ndroppedstats > 0) + { + xl_xinfo.xinfo |= XACT_XINFO_HAS_DROPPED_STATS; + xl_dropped_stats.nitems = ndroppedstats; + } + if (nmsgs > 0) { xl_xinfo.xinfo |= XACT_XINFO_HAS_INVALS; @@ -5696,6 +5718,14 @@ XactLogCommitRecord(TimestampTz commit_time, nrels * sizeof(RelFileNode)); } + if (xl_xinfo.xinfo & XACT_XINFO_HAS_DROPPED_STATS) + { + XLogRegisterData((char *) (&xl_dropped_stats), + MinSizeOfXactStatsItems); + XLogRegisterData((char *) droppedstats, + ndroppedstats * sizeof(xl_xact_stats_item)); + } + if (xl_xinfo.xinfo & XACT_XINFO_HAS_INVALS) { XLogRegisterData((char *) (&xl_invals), MinSizeOfXactInvals); @@ -5729,6 +5759,7 @@ XLogRecPtr XactLogAbortRecord(TimestampTz abort_time, int nsubxacts, TransactionId *subxacts, int nrels, RelFileNode *rels, + int ndroppedstats, xl_xact_stats_item *droppedstats, int xactflags, TransactionId twophase_xid, const char *twophase_gid) { @@ -5736,6 +5767,7 @@ XactLogAbortRecord(TimestampTz abort_time, xl_xact_xinfo xl_xinfo; xl_xact_subxacts xl_subxacts; xl_xact_relfilenodes xl_relfilenodes; + xl_xact_stats_items xl_dropped_stats; xl_xact_twophase xl_twophase; xl_xact_dbinfo xl_dbinfo; xl_xact_origin xl_origin; @@ -5773,6 +5805,12 @@ XactLogAbortRecord(TimestampTz abort_time, info |= XLR_SPECIAL_REL_UPDATE; } + if (ndroppedstats > 0) + { + xl_xinfo.xinfo |= XACT_XINFO_HAS_DROPPED_STATS; + xl_dropped_stats.nitems = ndroppedstats; + } + if (TransactionIdIsValid(twophase_xid)) { xl_xinfo.xinfo |= XACT_XINFO_HAS_TWOPHASE; @@ -5834,6 +5872,14 @@ XactLogAbortRecord(TimestampTz abort_time, nrels * sizeof(RelFileNode)); } + if (xl_xinfo.xinfo & XACT_XINFO_HAS_DROPPED_STATS) + { + XLogRegisterData((char *) (&xl_dropped_stats), + MinSizeOfXactStatsItems); + XLogRegisterData((char *) droppedstats, + ndroppedstats * sizeof(xl_xact_stats_item)); + } + if (xl_xinfo.xinfo & XACT_XINFO_HAS_TWOPHASE) { XLogRegisterData((char *) (&xl_twophase), sizeof(xl_xact_twophase)); @@ -5967,6 +6013,14 @@ xact_redo_commit(xl_xact_parsed_commit *parsed, DropRelationFiles(parsed->xnodes, parsed->nrels, true); } + if (parsed->nstats > 0) + { + /* see equivalent call for relations above */ + XLogFlush(lsn); + + pgstat_execute_transactional_drops(parsed->nstats, parsed->stats, true); + } + /* * We issue an XLogFlush() for the same reason we emit ForceSyncCommit() * in normal operation. For example, in CREATE DATABASE, we copy all files @@ -6069,6 +6123,14 @@ xact_redo_abort(xl_xact_parsed_abort *parsed, TransactionId xid, DropRelationFiles(parsed->xnodes, parsed->nrels, true); } + + if (parsed->nstats > 0) + { + /* see equivalent call for relations above */ + XLogFlush(lsn); + + pgstat_execute_transactional_drops(parsed->nstats, parsed->stats, true); + } } void |