diff options
Diffstat (limited to 'src/backend/replication/logical/snapbuild.c')
-rw-r--r-- | src/backend/replication/logical/snapbuild.c | 29 |
1 files changed, 18 insertions, 11 deletions
diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c index 2c4a1bab4b4..e975faeb8c9 100644 --- a/src/backend/replication/logical/snapbuild.c +++ b/src/backend/replication/logical/snapbuild.c @@ -830,9 +830,9 @@ SnapBuildDistributeNewCatalogSnapshot(SnapBuild *builder, XLogRecPtr lsn) * all. We'll add a snapshot when the first change gets queued. * * NB: This works correctly even for subtransactions because - * ReorderBufferCommitChild() takes care to pass the parent the base - * snapshot, and while iterating the changequeue we'll get the change - * from the subtxn. + * ReorderBufferAssignChild() takes care to transfer the base snapshot + * to the top-level transaction, and while iterating the changequeue + * we'll get the change from the subtxn. */ if (!ReorderBufferXidHasBaseSnapshot(builder->reorder, txn->xid)) continue; @@ -1074,7 +1074,7 @@ SnapBuildCommitTxn(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid, /* refcount of the snapshot builder for the new snapshot */ SnapBuildSnapIncRefcount(builder->snapshot); - /* add a new Snapshot to all currently running transactions */ + /* add a new catalog snapshot to all currently running transactions */ SnapBuildDistributeNewCatalogSnapshot(builder, lsn); } } @@ -1094,6 +1094,7 @@ void SnapBuildProcessRunningXacts(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *running) { ReorderBufferTXN *txn; + TransactionId xmin; /* * If we're not consistent yet, inspect the record to see whether it @@ -1126,15 +1127,21 @@ SnapBuildProcessRunningXacts(SnapBuild *builder, XLogRecPtr lsn, xl_running_xact /* Remove transactions we don't need to keep track off anymore */ SnapBuildPurgeCommittedTxn(builder); - elog(DEBUG3, "xmin: %u, xmax: %u, oldestrunning: %u", - builder->xmin, builder->xmax, - running->oldestRunningXid); - /* - * Increase shared memory limits, so vacuum can work on tuples we - * prevented from being pruned till now. + * Advance the xmin limit for the current replication slot, to allow + * vacuum to clean up the tuples this slot has been protecting. + * + * The reorderbuffer might have an xmin among the currently running + * snapshots; use it if so. If not, we need only consider the snapshots + * we'll produce later, which can't be less than the oldest running xid in + * the record we're reading now. */ - LogicalIncreaseXminForSlot(lsn, running->oldestRunningXid); + xmin = ReorderBufferGetOldestXmin(builder->reorder); + if (xmin == InvalidTransactionId) + xmin = running->oldestRunningXid; + elog(DEBUG3, "xmin: %u, xmax: %u, oldest running: %u, oldest xmin: %u", + builder->xmin, builder->xmax, running->oldestRunningXid, xmin); + LogicalIncreaseXminForSlot(lsn, xmin); /* * Also tell the slot where we can restart decoding from. We don't want to |