aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/backend/access/transam/twophase.c13
-rw-r--r--src/backend/access/transam/xact.c24
2 files changed, 31 insertions, 6 deletions
diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c
index 80d2d20d6cc..6023e7c16fb 100644
--- a/src/backend/access/transam/twophase.c
+++ b/src/backend/access/transam/twophase.c
@@ -2276,6 +2276,14 @@ RecordTransactionAbortPrepared(TransactionId xid,
const char *gid)
{
XLogRecPtr recptr;
+ bool replorigin;
+
+ /*
+ * Are we using the replication origins feature? Or, in other words, are
+ * we replaying remote actions?
+ */
+ replorigin = (replorigin_session_origin != InvalidRepOriginId &&
+ replorigin_session_origin != DoNotReplicateId);
/*
* Catch the scenario where we aborted partway through
@@ -2298,6 +2306,11 @@ RecordTransactionAbortPrepared(TransactionId xid,
MyXactFlags | XACT_FLAGS_ACQUIREDACCESSEXCLUSIVELOCK,
xid, gid);
+ if (replorigin)
+ /* Move LSNs forward for this replication origin */
+ replorigin_session_advance(replorigin_session_origin_lsn,
+ XactLastRecEnd);
+
/* Always flush, since we're about to remove the 2PC state file */
XLogFlush(recptr);
diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index 4e6a3df6b87..c83aa16f2ce 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -5714,10 +5714,12 @@ XactLogAbortRecord(TimestampTz abort_time,
xl_dbinfo.tsId = MyDatabaseTableSpace;
}
- /* dump transaction origin information only for abort prepared */
+ /*
+ * Dump transaction origin information only for abort prepared. We need
+ * this during recovery to update the replication origin progress.
+ */
if ((replorigin_session_origin != InvalidRepOriginId) &&
- TransactionIdIsValid(twophase_xid) &&
- XLogLogicalInfoActive())
+ TransactionIdIsValid(twophase_xid))
{
xl_xinfo.xinfo |= XACT_XINFO_HAS_ORIGIN;
@@ -5923,7 +5925,8 @@ xact_redo_commit(xl_xact_parsed_commit *parsed,
* because subtransaction commit is never WAL logged.
*/
static void
-xact_redo_abort(xl_xact_parsed_abort *parsed, TransactionId xid)
+xact_redo_abort(xl_xact_parsed_abort *parsed, TransactionId xid,
+ XLogRecPtr lsn, RepOriginId origin_id)
{
TransactionId max_xid;
@@ -5972,6 +5975,13 @@ xact_redo_abort(xl_xact_parsed_abort *parsed, TransactionId xid)
StandbyReleaseLockTree(xid, parsed->nsubxacts, parsed->subxacts);
}
+ if (parsed->xinfo & XACT_XINFO_HAS_ORIGIN)
+ {
+ /* recover apply progress */
+ replorigin_advance(origin_id, parsed->origin_lsn, lsn,
+ false /* backward */ , false /* WAL */ );
+ }
+
/* Make sure files supposed to be dropped are dropped */
DropRelationFiles(parsed->xnodes, parsed->nrels, true);
}
@@ -6013,7 +6023,8 @@ xact_redo(XLogReaderState *record)
xl_xact_parsed_abort parsed;
ParseAbortRecord(XLogRecGetInfo(record), xlrec, &parsed);
- xact_redo_abort(&parsed, XLogRecGetXid(record));
+ xact_redo_abort(&parsed, XLogRecGetXid(record),
+ record->EndRecPtr, XLogRecGetOrigin(record));
}
else if (info == XLOG_XACT_ABORT_PREPARED)
{
@@ -6021,7 +6032,8 @@ xact_redo(XLogReaderState *record)
xl_xact_parsed_abort parsed;
ParseAbortRecord(XLogRecGetInfo(record), xlrec, &parsed);
- xact_redo_abort(&parsed, parsed.twophase_xid);
+ xact_redo_abort(&parsed, parsed.twophase_xid,
+ record->EndRecPtr, XLogRecGetOrigin(record));
/* Delete TwoPhaseState gxact entry and/or 2PC file. */
LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);