aboutsummaryrefslogtreecommitdiff
path: root/src/backend/replication/logical/snapbuild.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/replication/logical/snapbuild.c')
-rw-r--r--src/backend/replication/logical/snapbuild.c29
1 files changed, 18 insertions, 11 deletions
diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c
index 2c4a1bab4b4..e975faeb8c9 100644
--- a/src/backend/replication/logical/snapbuild.c
+++ b/src/backend/replication/logical/snapbuild.c
@@ -830,9 +830,9 @@ SnapBuildDistributeNewCatalogSnapshot(SnapBuild *builder, XLogRecPtr lsn)
* all. We'll add a snapshot when the first change gets queued.
*
* NB: This works correctly even for subtransactions because
- * ReorderBufferCommitChild() takes care to pass the parent the base
- * snapshot, and while iterating the changequeue we'll get the change
- * from the subtxn.
+ * ReorderBufferAssignChild() takes care to transfer the base snapshot
+ * to the top-level transaction, and while iterating the changequeue
+ * we'll get the change from the subtxn.
*/
if (!ReorderBufferXidHasBaseSnapshot(builder->reorder, txn->xid))
continue;
@@ -1074,7 +1074,7 @@ SnapBuildCommitTxn(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid,
/* refcount of the snapshot builder for the new snapshot */
SnapBuildSnapIncRefcount(builder->snapshot);
- /* add a new Snapshot to all currently running transactions */
+ /* add a new catalog snapshot to all currently running transactions */
SnapBuildDistributeNewCatalogSnapshot(builder, lsn);
}
}
@@ -1094,6 +1094,7 @@ void
SnapBuildProcessRunningXacts(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *running)
{
ReorderBufferTXN *txn;
+ TransactionId xmin;
/*
* If we're not consistent yet, inspect the record to see whether it
@@ -1126,15 +1127,21 @@ SnapBuildProcessRunningXacts(SnapBuild *builder, XLogRecPtr lsn, xl_running_xact
/* Remove transactions we don't need to keep track off anymore */
SnapBuildPurgeCommittedTxn(builder);
- elog(DEBUG3, "xmin: %u, xmax: %u, oldestrunning: %u",
- builder->xmin, builder->xmax,
- running->oldestRunningXid);
-
/*
- * Increase shared memory limits, so vacuum can work on tuples we
- * prevented from being pruned till now.
+ * Advance the xmin limit for the current replication slot, to allow
+ * vacuum to clean up the tuples this slot has been protecting.
+ *
+ * The reorderbuffer might have an xmin among the currently running
+ * snapshots; use it if so. If not, we need only consider the snapshots
+ * we'll produce later, which can't be less than the oldest running xid in
+ * the record we're reading now.
*/
- LogicalIncreaseXminForSlot(lsn, running->oldestRunningXid);
+ xmin = ReorderBufferGetOldestXmin(builder->reorder);
+ if (xmin == InvalidTransactionId)
+ xmin = running->oldestRunningXid;
+ elog(DEBUG3, "xmin: %u, xmax: %u, oldest running: %u, oldest xmin: %u",
+ builder->xmin, builder->xmax, running->oldestRunningXid, xmin);
+ LogicalIncreaseXminForSlot(lsn, xmin);
/*
* Also tell the slot where we can restart decoding from. We don't want to