diff options
Diffstat (limited to 'src/backend/access/transam/xact.c')
-rw-r--r-- | src/backend/access/transam/xact.c | 76 |
1 files changed, 59 insertions, 17 deletions
diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c index 1495bb499f5..511bcbbc519 100644 --- a/src/backend/access/transam/xact.c +++ b/src/backend/access/transam/xact.c @@ -40,8 +40,10 @@ #include "libpq/pqsignal.h" #include "miscadmin.h" #include "pgstat.h" +#include "replication/logical.h" #include "replication/walsender.h" #include "replication/syncrep.h" +#include "replication/origin.h" #include "storage/fd.h" #include "storage/lmgr.h" #include "storage/predicate.h" @@ -1073,21 +1075,27 @@ RecordTransactionCommit(void) nmsgs, invalMessages, RelcacheInitFileInval, forceSyncCommit, InvalidTransactionId /* plain commit */); - } - /* - * We only need to log the commit timestamp separately if the node - * identifier is a valid value; the commit record above already contains - * the timestamp info otherwise, and will be used to load it. - */ - if (markXidCommitted) - { - CommitTsNodeId node_id; + /* + * Record plain commit ts if not replaying remote actions, or if no + * timestamp is configured. + */ + if (replorigin_sesssion_origin == InvalidRepOriginId || + replorigin_sesssion_origin == DoNotReplicateId || + replorigin_sesssion_origin_timestamp == 0) + replorigin_sesssion_origin_timestamp = xactStopTimestamp; + else + replorigin_session_advance(replorigin_sesssion_origin_lsn, + XactLastRecEnd); - node_id = CommitTsGetDefaultNodeId(); + /* + * We don't need to WAL log origin or timestamp here, the commit + * record contains all the necessary information and will redo the SET + * action during replay. + */ TransactionTreeSetCommitTsData(xid, nchildren, children, - xactStopTimestamp, - node_id, node_id != InvalidCommitTsNodeId); + replorigin_sesssion_origin_timestamp, + replorigin_sesssion_origin, false); } /* @@ -1176,9 +1184,11 @@ RecordTransactionCommit(void) if (wrote_xlog && markXidCommitted) SyncRepWaitForLSN(XactLastRecEnd); + /* remember end of last commit record */ + XactLastCommitEnd = XactLastRecEnd; + /* Reset XactLastRecEnd until the next transaction writes something */ XactLastRecEnd = 0; - cleanup: /* Clean up local data */ if (rels) @@ -4611,6 +4621,7 @@ XactLogCommitRecord(TimestampTz commit_time, xl_xact_relfilenodes xl_relfilenodes; xl_xact_invals xl_invals; xl_xact_twophase xl_twophase; + xl_xact_origin xl_origin; uint8 info; @@ -4668,6 +4679,15 @@ XactLogCommitRecord(TimestampTz commit_time, xl_twophase.xid = twophase_xid; } + /* dump transaction origin information */ + if (replorigin_sesssion_origin != InvalidRepOriginId) + { + xl_xinfo.xinfo |= XACT_XINFO_HAS_ORIGIN; + + xl_origin.origin_lsn = replorigin_sesssion_origin_lsn; + xl_origin.origin_timestamp = replorigin_sesssion_origin_timestamp; + } + if (xl_xinfo.xinfo != 0) info |= XLOG_XACT_HAS_INFO; @@ -4709,6 +4729,12 @@ XactLogCommitRecord(TimestampTz commit_time, if (xl_xinfo.xinfo & XACT_XINFO_HAS_TWOPHASE) XLogRegisterData((char *) (&xl_twophase), sizeof(xl_xact_twophase)); + if (xl_xinfo.xinfo & XACT_XINFO_HAS_ORIGIN) + XLogRegisterData((char *) (&xl_origin), sizeof(xl_xact_origin)); + + /* we allow filtering by xacts */ + XLogIncludeOrigin(); + return XLogInsert(RM_XACT_ID, info); } @@ -4806,10 +4832,12 @@ XactLogAbortRecord(TimestampTz abort_time, static void xact_redo_commit(xl_xact_parsed_commit *parsed, TransactionId xid, - XLogRecPtr lsn) + XLogRecPtr lsn, + RepOriginId origin_id) { TransactionId max_xid; int i; + TimestampTz commit_time; max_xid = TransactionIdLatest(xid, parsed->nsubxacts, parsed->subxacts); @@ -4829,9 +4857,16 @@ xact_redo_commit(xl_xact_parsed_commit *parsed, LWLockRelease(XidGenLock); } + Assert(!!(parsed->xinfo & XACT_XINFO_HAS_ORIGIN) == (origin_id != InvalidRepOriginId)); + + if (parsed->xinfo & XACT_XINFO_HAS_ORIGIN) + commit_time = parsed->origin_timestamp; + else + commit_time = parsed->xact_time; + /* Set the transaction commit timestamp and metadata */ TransactionTreeSetCommitTsData(xid, parsed->nsubxacts, parsed->subxacts, - parsed->xact_time, InvalidCommitTsNodeId, + commit_time, origin_id, false); if (standbyState == STANDBY_DISABLED) @@ -4892,6 +4927,13 @@ xact_redo_commit(xl_xact_parsed_commit *parsed, StandbyReleaseLockTree(xid, 0, NULL); } + if (parsed->xinfo & XACT_XINFO_HAS_ORIGIN) + { + /* recover apply progress */ + replorigin_advance(origin_id, parsed->origin_lsn, lsn, + false /* backward */, false /* WAL */); + } + /* Make sure files supposed to be dropped are dropped */ if (parsed->nrels > 0) { @@ -5047,13 +5089,13 @@ xact_redo(XLogReaderState *record) { Assert(!TransactionIdIsValid(parsed.twophase_xid)); xact_redo_commit(&parsed, XLogRecGetXid(record), - record->EndRecPtr); + record->EndRecPtr, XLogRecGetOrigin(record)); } else { Assert(TransactionIdIsValid(parsed.twophase_xid)); xact_redo_commit(&parsed, parsed.twophase_xid, - record->EndRecPtr); + record->EndRecPtr, XLogRecGetOrigin(record)); RemoveTwoPhaseFile(parsed.twophase_xid, false); } } |