aboutsummaryrefslogtreecommitdiff
path: root/src/backend/access/transam/xact.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/access/transam/xact.c')
-rw-r--r--src/backend/access/transam/xact.c76
1 files changed, 59 insertions, 17 deletions
diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index 1495bb499f5..511bcbbc519 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -40,8 +40,10 @@
#include "libpq/pqsignal.h"
#include "miscadmin.h"
#include "pgstat.h"
+#include "replication/logical.h"
#include "replication/walsender.h"
#include "replication/syncrep.h"
+#include "replication/origin.h"
#include "storage/fd.h"
#include "storage/lmgr.h"
#include "storage/predicate.h"
@@ -1073,21 +1075,27 @@ RecordTransactionCommit(void)
nmsgs, invalMessages,
RelcacheInitFileInval, forceSyncCommit,
InvalidTransactionId /* plain commit */);
- }
- /*
- * We only need to log the commit timestamp separately if the node
- * identifier is a valid value; the commit record above already contains
- * the timestamp info otherwise, and will be used to load it.
- */
- if (markXidCommitted)
- {
- CommitTsNodeId node_id;
+ /*
+ * Record plain commit ts if not replaying remote actions, or if no
+ * timestamp is configured.
+ */
+ if (replorigin_sesssion_origin == InvalidRepOriginId ||
+ replorigin_sesssion_origin == DoNotReplicateId ||
+ replorigin_sesssion_origin_timestamp == 0)
+ replorigin_sesssion_origin_timestamp = xactStopTimestamp;
+ else
+ replorigin_session_advance(replorigin_sesssion_origin_lsn,
+ XactLastRecEnd);
- node_id = CommitTsGetDefaultNodeId();
+ /*
+ * We don't need to WAL log origin or timestamp here, the commit
+ * record contains all the necessary information and will redo the SET
+ * action during replay.
+ */
TransactionTreeSetCommitTsData(xid, nchildren, children,
- xactStopTimestamp,
- node_id, node_id != InvalidCommitTsNodeId);
+ replorigin_sesssion_origin_timestamp,
+ replorigin_sesssion_origin, false);
}
/*
@@ -1176,9 +1184,11 @@ RecordTransactionCommit(void)
if (wrote_xlog && markXidCommitted)
SyncRepWaitForLSN(XactLastRecEnd);
+ /* remember end of last commit record */
+ XactLastCommitEnd = XactLastRecEnd;
+
/* Reset XactLastRecEnd until the next transaction writes something */
XactLastRecEnd = 0;
-
cleanup:
/* Clean up local data */
if (rels)
@@ -4611,6 +4621,7 @@ XactLogCommitRecord(TimestampTz commit_time,
xl_xact_relfilenodes xl_relfilenodes;
xl_xact_invals xl_invals;
xl_xact_twophase xl_twophase;
+ xl_xact_origin xl_origin;
uint8 info;
@@ -4668,6 +4679,15 @@ XactLogCommitRecord(TimestampTz commit_time,
xl_twophase.xid = twophase_xid;
}
+ /* dump transaction origin information */
+ if (replorigin_sesssion_origin != InvalidRepOriginId)
+ {
+ xl_xinfo.xinfo |= XACT_XINFO_HAS_ORIGIN;
+
+ xl_origin.origin_lsn = replorigin_sesssion_origin_lsn;
+ xl_origin.origin_timestamp = replorigin_sesssion_origin_timestamp;
+ }
+
if (xl_xinfo.xinfo != 0)
info |= XLOG_XACT_HAS_INFO;
@@ -4709,6 +4729,12 @@ XactLogCommitRecord(TimestampTz commit_time,
if (xl_xinfo.xinfo & XACT_XINFO_HAS_TWOPHASE)
XLogRegisterData((char *) (&xl_twophase), sizeof(xl_xact_twophase));
+ if (xl_xinfo.xinfo & XACT_XINFO_HAS_ORIGIN)
+ XLogRegisterData((char *) (&xl_origin), sizeof(xl_xact_origin));
+
+ /* we allow filtering by xacts */
+ XLogIncludeOrigin();
+
return XLogInsert(RM_XACT_ID, info);
}
@@ -4806,10 +4832,12 @@ XactLogAbortRecord(TimestampTz abort_time,
static void
xact_redo_commit(xl_xact_parsed_commit *parsed,
TransactionId xid,
- XLogRecPtr lsn)
+ XLogRecPtr lsn,
+ RepOriginId origin_id)
{
TransactionId max_xid;
int i;
+ TimestampTz commit_time;
max_xid = TransactionIdLatest(xid, parsed->nsubxacts, parsed->subxacts);
@@ -4829,9 +4857,16 @@ xact_redo_commit(xl_xact_parsed_commit *parsed,
LWLockRelease(XidGenLock);
}
+ Assert(!!(parsed->xinfo & XACT_XINFO_HAS_ORIGIN) == (origin_id != InvalidRepOriginId));
+
+ if (parsed->xinfo & XACT_XINFO_HAS_ORIGIN)
+ commit_time = parsed->origin_timestamp;
+ else
+ commit_time = parsed->xact_time;
+
/* Set the transaction commit timestamp and metadata */
TransactionTreeSetCommitTsData(xid, parsed->nsubxacts, parsed->subxacts,
- parsed->xact_time, InvalidCommitTsNodeId,
+ commit_time, origin_id,
false);
if (standbyState == STANDBY_DISABLED)
@@ -4892,6 +4927,13 @@ xact_redo_commit(xl_xact_parsed_commit *parsed,
StandbyReleaseLockTree(xid, 0, NULL);
}
+ if (parsed->xinfo & XACT_XINFO_HAS_ORIGIN)
+ {
+ /* recover apply progress */
+ replorigin_advance(origin_id, parsed->origin_lsn, lsn,
+ false /* backward */, false /* WAL */);
+ }
+
/* Make sure files supposed to be dropped are dropped */
if (parsed->nrels > 0)
{
@@ -5047,13 +5089,13 @@ xact_redo(XLogReaderState *record)
{
Assert(!TransactionIdIsValid(parsed.twophase_xid));
xact_redo_commit(&parsed, XLogRecGetXid(record),
- record->EndRecPtr);
+ record->EndRecPtr, XLogRecGetOrigin(record));
}
else
{
Assert(TransactionIdIsValid(parsed.twophase_xid));
xact_redo_commit(&parsed, parsed.twophase_xid,
- record->EndRecPtr);
+ record->EndRecPtr, XLogRecGetOrigin(record));
RemoveTwoPhaseFile(parsed.twophase_xid, false);
}
}