aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r-- contrib/test_decoding/Makefile | 3
-rw-r--r-- contrib/test_decoding/expected/oldest_xmin.out | 27
-rw-r--r-- contrib/test_decoding/expected/snapshot_transfer.out | 49
-rw-r--r-- contrib/test_decoding/specs/oldest_xmin.spec | 37
-rw-r--r-- contrib/test_decoding/specs/snapshot_transfer.spec | 42
-rw-r--r-- src/backend/replication/logical/reorderbuffer.c | 304
-rw-r--r-- src/backend/replication/logical/snapbuild.c | 29
-rw-r--r-- src/include/replication/reorderbuffer.h | 22
8 files changed, 409 insertions, 104 deletions
diff --git a/contrib/test_decoding/Makefile b/contrib/test_decoding/Makefile
index 1d601d8144c..afcab930f7a 100644
--- a/contrib/test_decoding/Makefile
+++ b/contrib/test_decoding/Makefile
@@ -50,7 +50,8 @@ regresscheck-install-force: | submake-regress submake-test_decoding temp-install
$(pg_regress_installcheck) \
$(REGRESSCHECKS)
-ISOLATIONCHECKS=mxact delayed_startup ondisk_startup concurrent_ddl_dml
+ISOLATIONCHECKS=mxact delayed_startup ondisk_startup concurrent_ddl_dml \
+ oldest_xmin snapshot_transfer
isolationcheck: | submake-isolation submake-test_decoding temp-install
$(pg_isolation_regress_check) \
diff --git a/contrib/test_decoding/expected/oldest_xmin.out b/contrib/test_decoding/expected/oldest_xmin.out
new file mode 100644
index 00000000000..d09342c4bec
--- /dev/null
+++ b/contrib/test_decoding/expected/oldest_xmin.out
@@ -0,0 +1,27 @@
+Parsed test spec with 2 sessions
+
+starting permutation: s0_begin s0_getxid s1_begin s1_insert s0_alter s0_commit s0_checkpoint s0_get_changes s1_commit s0_vacuum s0_get_changes
+step s0_begin: BEGIN;
+step s0_getxid: SELECT txid_current() IS NULL;
+?column?
+
+f
+step s1_begin: BEGIN;
+step s1_insert: INSERT INTO harvest VALUES ((1, 2, 3));
+step s0_alter: ALTER TYPE basket DROP ATTRIBUTE mangos;
+step s0_commit: COMMIT;
+step s0_checkpoint: CHECKPOINT;
+step s0_get_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+data
+
+step s1_commit: COMMIT;
+step s0_vacuum: VACUUM FULL;
+step s0_get_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+data
+
+BEGIN
+table public.harvest: INSERT: fruits[basket]:'(1,2,3)'
+COMMIT
+?column?
+
+stop
diff --git a/contrib/test_decoding/expected/snapshot_transfer.out b/contrib/test_decoding/expected/snapshot_transfer.out
new file mode 100644
index 00000000000..87bed03f766
--- /dev/null
+++ b/contrib/test_decoding/expected/snapshot_transfer.out
@@ -0,0 +1,49 @@
+Parsed test spec with 2 sessions
+
+starting permutation: s0_begin s0_begin_sub0 s0_log_assignment s0_sub_get_base_snap s1_produce_new_snap s0_insert s0_end_sub0 s0_commit s0_get_changes
+step s0_begin: BEGIN;
+step s0_begin_sub0: SAVEPOINT s0;
+step s0_log_assignment: SELECT txid_current() IS NULL;
+?column?
+
+f
+step s0_sub_get_base_snap: INSERT INTO dummy VALUES (0);
+step s1_produce_new_snap: ALTER TABLE harvest ADD COLUMN mangos int;
+step s0_insert: INSERT INTO harvest VALUES (1, 2, 3);
+step s0_end_sub0: RELEASE SAVEPOINT s0;
+step s0_commit: COMMIT;
+step s0_get_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+data
+
+BEGIN
+table public.dummy: INSERT: i[integer]:0
+table public.harvest: INSERT: apples[integer]:1 pears[integer]:2 mangos[integer]:3
+COMMIT
+?column?
+
+stop
+
+starting permutation: s0_begin s0_begin_sub0 s0_log_assignment s0_begin_sub1 s0_sub_get_base_snap s1_produce_new_snap s0_insert s0_end_sub1 s0_end_sub0 s0_commit s0_get_changes
+step s0_begin: BEGIN;
+step s0_begin_sub0: SAVEPOINT s0;
+step s0_log_assignment: SELECT txid_current() IS NULL;
+?column?
+
+f
+step s0_begin_sub1: SAVEPOINT s1;
+step s0_sub_get_base_snap: INSERT INTO dummy VALUES (0);
+step s1_produce_new_snap: ALTER TABLE harvest ADD COLUMN mangos int;
+step s0_insert: INSERT INTO harvest VALUES (1, 2, 3);
+step s0_end_sub1: RELEASE SAVEPOINT s1;
+step s0_end_sub0: RELEASE SAVEPOINT s0;
+step s0_commit: COMMIT;
+step s0_get_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+data
+
+BEGIN
+table public.dummy: INSERT: i[integer]:0
+table public.harvest: INSERT: apples[integer]:1 pears[integer]:2 mangos[integer]:3
+COMMIT
+?column?
+
+stop
diff --git a/contrib/test_decoding/specs/oldest_xmin.spec b/contrib/test_decoding/specs/oldest_xmin.spec
new file mode 100644
index 00000000000..4f8af70aa26
--- /dev/null
+++ b/contrib/test_decoding/specs/oldest_xmin.spec
@@ -0,0 +1,37 @@
+# Test advancement of the slot's oldest xmin
+
+setup
+{
+ SELECT 'init' FROM pg_create_logical_replication_slot('isolation_slot', 'test_decoding'); -- must be first write in xact
+ DROP TYPE IF EXISTS basket;
+ CREATE TYPE basket AS (apples integer, pears integer, mangos integer);
+ DROP TABLE IF EXISTS harvest;
+ CREATE TABLE harvest(fruits basket);
+}
+
+teardown
+{
+ DROP TABLE IF EXISTS harvest;
+ DROP TYPE IF EXISTS basket;
+ SELECT 'stop' FROM pg_drop_replication_slot('isolation_slot');
+}
+
+session "s0"
+step "s0_begin" { BEGIN; }
+step "s0_getxid" { SELECT txid_current() IS NULL; }
+step "s0_alter" { ALTER TYPE basket DROP ATTRIBUTE mangos; }
+step "s0_commit" { COMMIT; }
+step "s0_checkpoint" { CHECKPOINT; }
+step "s0_vacuum" { VACUUM FULL; }
+step "s0_get_changes" { SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); }
+
+session "s1"
+step "s1_begin" { BEGIN; }
+step "s1_insert" { INSERT INTO harvest VALUES ((1, 2, 3)); }
+step "s1_commit" { COMMIT; }
+
+# A checkpoint followed by get_changes forces xmin to advance. ALTER of a
+# composite type is a rare form of DDL which allows T1 to see the tuple which
+# will be removed (xmax set) before T1 commits. That is, interlocking doesn't
+# forbid modifying the catalog after someone has read it (and not yet committed).
+permutation "s0_begin" "s0_getxid" "s1_begin" "s1_insert" "s0_alter" "s0_commit" "s0_checkpoint" "s0_get_changes" "s1_commit" "s0_vacuum" "s0_get_changes"
diff --git a/contrib/test_decoding/specs/snapshot_transfer.spec b/contrib/test_decoding/specs/snapshot_transfer.spec
new file mode 100644
index 00000000000..47db7fd90ae
--- /dev/null
+++ b/contrib/test_decoding/specs/snapshot_transfer.spec
@@ -0,0 +1,42 @@
+# Test snapshot transfer from subxact to top-level and receipt of later snaps.
+
+setup
+{
+ SELECT 'init' FROM pg_create_logical_replication_slot('isolation_slot', 'test_decoding'); -- must be first write in xact
+ DROP TABLE IF EXISTS dummy;
+ CREATE TABLE dummy(i int);
+ DROP TABLE IF EXISTS harvest;
+ CREATE TABLE harvest(apples int, pears int);
+}
+
+teardown
+{
+ DROP TABLE IF EXISTS harvest;
+ DROP TABLE IF EXISTS dummy;
+ SELECT 'stop' FROM pg_drop_replication_slot('isolation_slot');
+}
+
+session "s0"
+step "s0_begin" { BEGIN; }
+step "s0_begin_sub0" { SAVEPOINT s0; }
+step "s0_log_assignment" { SELECT txid_current() IS NULL; }
+step "s0_begin_sub1" { SAVEPOINT s1; }
+step "s0_sub_get_base_snap" { INSERT INTO dummy VALUES (0); }
+step "s0_insert" { INSERT INTO harvest VALUES (1, 2, 3); }
+step "s0_end_sub0" { RELEASE SAVEPOINT s0; }
+step "s0_end_sub1" { RELEASE SAVEPOINT s1; }
+step "s0_insert2" { INSERT INTO harvest VALUES (1, 2, 3, 4); }
+step "s0_commit" { COMMIT; }
+step "s0_get_changes" { SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); }
+
+session "s1"
+step "s1_produce_new_snap" { ALTER TABLE harvest ADD COLUMN mangos int; }
+
+# start top-level without base snap, get base snap in subxact, then create new
+# snap and make sure it is queued.
+permutation "s0_begin" "s0_begin_sub0" "s0_log_assignment" "s0_sub_get_base_snap" "s1_produce_new_snap" "s0_insert" "s0_end_sub0" "s0_commit" "s0_get_changes"
+
+# In the previous test, we first associated the subxact with the xact and only
+# then got the base snap; now nest one more subxact to get the snap first and
+# only then (at commit) associate it with the toplevel.
+permutation "s0_begin" "s0_begin_sub0" "s0_log_assignment" "s0_begin_sub1" "s0_sub_get_base_snap" "s1_produce_new_snap" "s0_insert" "s0_end_sub1" "s0_end_sub0" "s0_commit" "s0_get_changes"
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index c1447a513b1..5f4aa071310 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -165,6 +165,8 @@ static void ReorderBufferReturnTXN(ReorderBuffer *rb, ReorderBufferTXN *txn);
static ReorderBufferTXN *ReorderBufferTXNByXid(ReorderBuffer *rb,
TransactionId xid, bool create, bool *is_new,
XLogRecPtr lsn, bool create_as_top);
+static void ReorderBufferTransferSnapToParent(ReorderBufferTXN *txn,
+ ReorderBufferTXN *subtxn);
static void AssertTXNLsnOrder(ReorderBuffer *rb);
@@ -271,6 +273,7 @@ ReorderBufferAllocate(void)
buffer->current_restart_decoding_lsn = InvalidXLogRecPtr;
dlist_init(&buffer->toplevel_by_lsn);
+ dlist_init(&buffer->txns_by_base_snapshot_lsn);
/*
* Ensure there's no stale data from prior uses of this slot, in case some
@@ -462,7 +465,6 @@ ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create,
bool found;
Assert(TransactionIdIsValid(xid));
- Assert(!create || lsn != InvalidXLogRecPtr);
/*
* Check the one-entry lookup cache first
@@ -506,6 +508,7 @@ ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create,
{
/* initialize the new entry, if creation was requested */
Assert(ent != NULL);
+ Assert(lsn != InvalidXLogRecPtr);
ent->txn = ReorderBufferGetTXN(rb);
ent->txn->xid = xid;
@@ -607,43 +610,80 @@ ReorderBufferQueueMessage(ReorderBuffer *rb, TransactionId xid,
}
}
-
+/*
+ * AssertTXNLsnOrder
+ * Verify LSN ordering of transaction lists in the reorderbuffer
+ *
+ * Other LSN-related invariants are checked too.
+ *
+ * No-op if assertions are not in use.
+ */
static void
AssertTXNLsnOrder(ReorderBuffer *rb)
{
#ifdef USE_ASSERT_CHECKING
dlist_iter iter;
XLogRecPtr prev_first_lsn = InvalidXLogRecPtr;
+ XLogRecPtr prev_base_snap_lsn = InvalidXLogRecPtr;
dlist_foreach(iter, &rb->toplevel_by_lsn)
{
- ReorderBufferTXN *cur_txn;
+ ReorderBufferTXN *cur_txn = dlist_container(ReorderBufferTXN, node,
+ iter.cur);
- cur_txn = dlist_container(ReorderBufferTXN, node, iter.cur);
+ /* start LSN must be set */
Assert(cur_txn->first_lsn != InvalidXLogRecPtr);
+ /* If there is an end LSN, it must not be lower than the start LSN */
if (cur_txn->end_lsn != InvalidXLogRecPtr)
Assert(cur_txn->first_lsn <= cur_txn->end_lsn);
+ /* Current initial LSN must be strictly higher than previous */
if (prev_first_lsn != InvalidXLogRecPtr)
Assert(prev_first_lsn < cur_txn->first_lsn);
+ /* known-as-subtxn txns must not be listed */
Assert(!cur_txn->is_known_as_subxact);
+
prev_first_lsn = cur_txn->first_lsn;
}
+
+ dlist_foreach(iter, &rb->txns_by_base_snapshot_lsn)
+ {
+ ReorderBufferTXN *cur_txn = dlist_container(ReorderBufferTXN,
+ base_snapshot_node,
+ iter.cur);
+
+ /* base snapshot (and its LSN) must be set */
+ Assert(cur_txn->base_snapshot != NULL);
+ Assert(cur_txn->base_snapshot_lsn != InvalidXLogRecPtr);
+
+ /* current LSN must be strictly higher than previous */
+ if (prev_base_snap_lsn != InvalidXLogRecPtr)
+ Assert(prev_base_snap_lsn < cur_txn->base_snapshot_lsn);
+
+ /* known-as-subtxn txns must not be listed */
+ Assert(!cur_txn->is_known_as_subxact);
+
+ prev_base_snap_lsn = cur_txn->base_snapshot_lsn;
+ }
#endif
}
+/*
+ * ReorderBufferGetOldestTXN
+ * Return oldest transaction in reorderbuffer
+ */
ReorderBufferTXN *
ReorderBufferGetOldestTXN(ReorderBuffer *rb)
{
ReorderBufferTXN *txn;
+ AssertTXNLsnOrder(rb);
+
if (dlist_is_empty(&rb->toplevel_by_lsn))
return NULL;
- AssertTXNLsnOrder(rb);
-
txn = dlist_head_element(ReorderBufferTXN, node, &rb->toplevel_by_lsn);
Assert(!txn->is_known_as_subxact);
@@ -651,12 +691,44 @@ ReorderBufferGetOldestTXN(ReorderBuffer *rb)
return txn;
}
+/*
+ * ReorderBufferGetOldestXmin
+ * Return oldest Xmin in reorderbuffer
+ *
+ * Returns oldest possibly running Xid from the point of view of snapshots
+ * used in the transactions kept by reorderbuffer, or InvalidTransactionId if
+ * there are none.
+ *
+ * Since snapshots are assigned monotonically, this equals the Xmin of the
+ * base snapshot with minimal base_snapshot_lsn.
+ */
+TransactionId
+ReorderBufferGetOldestXmin(ReorderBuffer *rb)
+{
+ ReorderBufferTXN *txn;
+
+ AssertTXNLsnOrder(rb);
+
+ if (dlist_is_empty(&rb->txns_by_base_snapshot_lsn))
+ return InvalidTransactionId;
+
+ txn = dlist_head_element(ReorderBufferTXN, base_snapshot_node,
+ &rb->txns_by_base_snapshot_lsn);
+ return txn->base_snapshot->xmin;
+}
+
void
ReorderBufferSetRestartPoint(ReorderBuffer *rb, XLogRecPtr ptr)
{
rb->current_restart_decoding_lsn = ptr;
}
+/*
+ * ReorderBufferAssignChild
+ *
+ * Make note that we know that subxid is a subtransaction of xid, seen as of
+ * the given lsn.
+ */
void
ReorderBufferAssignChild(ReorderBuffer *rb, TransactionId xid,
TransactionId subxid, XLogRecPtr lsn)
@@ -669,32 +741,107 @@ ReorderBufferAssignChild(ReorderBuffer *rb, TransactionId xid,
txn = ReorderBufferTXNByXid(rb, xid, true, &new_top, lsn, true);
subtxn = ReorderBufferTXNByXid(rb, subxid, true, &new_sub, lsn, false);
- if (new_sub)
+ if (new_top && !new_sub)
+ elog(ERROR, "subtransaction logged without previous top-level txn record");
+
+ if (!new_sub)
{
- /*
- * we assign subtransactions to top level transaction even if we don't
- * have data for it yet, assignment records frequently reference xids
- * that have not yet produced any records. Knowing those aren't top
- * level xids allows us to make processing cheaper in some places.
- */
- dlist_push_tail(&txn->subtxns, &subtxn->node);
- txn->nsubtxns++;
+ if (subtxn->is_known_as_subxact)
+ {
+ /* already associated, nothing to do */
+ return;
+ }
+ else
+ {
+ /*
+ * We already saw this transaction, but initially added it to the list
+ * of top-level txns. Now that we know it's not top-level, remove
+ * it from there.
+ */
+ dlist_delete(&subtxn->node);
+ }
}
- else if (!subtxn->is_known_as_subxact)
- {
- subtxn->is_known_as_subxact = true;
- Assert(subtxn->nsubtxns == 0);
- /* remove from lsn order list of top-level transactions */
- dlist_delete(&subtxn->node);
+ subtxn->is_known_as_subxact = true;
+ subtxn->toplevel_xid = xid;
+ Assert(subtxn->nsubtxns == 0);
- /* add to toplevel transaction */
- dlist_push_tail(&txn->subtxns, &subtxn->node);
- txn->nsubtxns++;
- }
- else if (new_top)
+ /* add to subtransaction list */
+ dlist_push_tail(&txn->subtxns, &subtxn->node);
+ txn->nsubtxns++;
+
+ /* Possibly transfer the subtxn's snapshot to its top-level txn. */
+ ReorderBufferTransferSnapToParent(txn, subtxn);
+
+ /* Verify LSN-ordering invariant */
+ AssertTXNLsnOrder(rb);
+}
+
+/*
+ * ReorderBufferTransferSnapToParent
+ * Transfer base snapshot from subtxn to top-level txn, if needed
+ *
+ * This is done if the top-level txn doesn't have a base snapshot, or if the
+ * subtxn's base snapshot has an earlier LSN than the top-level txn's base
+ * snapshot's LSN. This can happen if there are no changes in the toplevel
+ * txn but there are some in the subtxn, or the first change in subtxn has
+ * earlier LSN than first change in the top-level txn and we learned about
+ * their kinship only now.
+ *
+ * The subtransaction's snapshot is cleared regardless of the transfer
+ * happening, since it's not needed anymore in either case.
+ *
+ * We do this as soon as we become aware of their kinship, to avoid queueing
+ * extra snapshots to txns known-as-subtxns -- only top-level txns will
+ * receive further snapshots.
+ */
+static void
+ReorderBufferTransferSnapToParent(ReorderBufferTXN *txn,
+ ReorderBufferTXN *subtxn)
+{
+ Assert(subtxn->toplevel_xid == txn->xid);
+
+ if (subtxn->base_snapshot != NULL)
{
- elog(ERROR, "existing subxact assigned to unknown toplevel xact");
+ if (txn->base_snapshot == NULL ||
+ subtxn->base_snapshot_lsn < txn->base_snapshot_lsn)
+ {
+ /*
+ * If the toplevel transaction already has a base snapshot but
+ * it's newer than the subxact's, purge it.
+ */
+ if (txn->base_snapshot != NULL)
+ {
+ SnapBuildSnapDecRefcount(txn->base_snapshot);
+ dlist_delete(&txn->base_snapshot_node);
+ }
+
+ /*
+ * The snapshot is now the top transaction's; transfer it, and
+ * adjust the list position of the top transaction in the list by
+ * moving it to where the subtransaction is.
+ */
+ txn->base_snapshot = subtxn->base_snapshot;
+ txn->base_snapshot_lsn = subtxn->base_snapshot_lsn;
+ dlist_insert_before(&subtxn->base_snapshot_node,
+ &txn->base_snapshot_node);
+
+ /*
+ * The subtransaction doesn't have a snapshot anymore (so it
+ * mustn't be in the list.)
+ */
+ subtxn->base_snapshot = NULL;
+ subtxn->base_snapshot_lsn = InvalidXLogRecPtr;
+ dlist_delete(&subtxn->base_snapshot_node);
+ }
+ else
+ {
+ /* Base snap of toplevel is fine, so subxact's is not needed */
+ SnapBuildSnapDecRefcount(subtxn->base_snapshot);
+ dlist_delete(&subtxn->base_snapshot_node);
+ subtxn->base_snapshot = NULL;
+ subtxn->base_snapshot_lsn = InvalidXLogRecPtr;
+ }
}
}
@@ -707,7 +854,6 @@ ReorderBufferCommitChild(ReorderBuffer *rb, TransactionId xid,
TransactionId subxid, XLogRecPtr commit_lsn,
XLogRecPtr end_lsn)
{
- ReorderBufferTXN *txn;
ReorderBufferTXN *subtxn;
subtxn = ReorderBufferTXNByXid(rb, subxid, false, NULL,
@@ -719,42 +865,14 @@ ReorderBufferCommitChild(ReorderBuffer *rb, TransactionId xid,
if (!subtxn)
return;
- txn = ReorderBufferTXNByXid(rb, xid, false, NULL, commit_lsn, true);
-
- if (txn == NULL)
- elog(ERROR, "subxact logged without previous toplevel record");
-
- /*
- * Pass our base snapshot to the parent transaction if it doesn't have
- * one, or ours is older. That can happen if there are no changes in the
- * toplevel transaction but in one of the child transactions. This allows
- * the parent to simply use its base snapshot initially.
- */
- if (subtxn->base_snapshot != NULL &&
- (txn->base_snapshot == NULL ||
- txn->base_snapshot_lsn > subtxn->base_snapshot_lsn))
- {
- txn->base_snapshot = subtxn->base_snapshot;
- txn->base_snapshot_lsn = subtxn->base_snapshot_lsn;
- subtxn->base_snapshot = NULL;
- subtxn->base_snapshot_lsn = InvalidXLogRecPtr;
- }
-
subtxn->final_lsn = commit_lsn;
subtxn->end_lsn = end_lsn;
- if (!subtxn->is_known_as_subxact)
- {
- subtxn->is_known_as_subxact = true;
- Assert(subtxn->nsubtxns == 0);
-
- /* remove from lsn order list of top-level transactions */
- dlist_delete(&subtxn->node);
-
- /* add to subtransaction list */
- dlist_push_tail(&txn->subtxns, &subtxn->node);
- txn->nsubtxns++;
- }
+ /*
+ * Assign this subxact as a child of the toplevel xact (no-op if already
+ * done.)
+ */
+ ReorderBufferAssignChild(rb, xid, subxid, InvalidXLogRecPtr);
}
@@ -1078,11 +1196,13 @@ ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
ReorderBufferReturnChange(rb, change);
}
+ /*
+ * Cleanup the base snapshot, if set.
+ */
if (txn->base_snapshot != NULL)
{
SnapBuildSnapDecRefcount(txn->base_snapshot);
- txn->base_snapshot = NULL;
- txn->base_snapshot_lsn = InvalidXLogRecPtr;
+ dlist_delete(&txn->base_snapshot_node);
}
/*
@@ -1257,17 +1377,17 @@ ReorderBufferFreeSnap(ReorderBuffer *rb, Snapshot snap)
}
/*
- * Perform the replay of a transaction and it's non-aborted subtransactions.
+ * Perform the replay of a transaction and its non-aborted subtransactions.
*
* Subtransactions previously have to be processed by
* ReorderBufferCommitChild(), even if previously assigned to the toplevel
* transaction with ReorderBufferAssignChild.
*
- * We currently can only decode a transaction's contents in when their commit
- * record is read because that's currently the only place where we know about
- * cache invalidations. Thus, once a toplevel commit is read, we iterate over
- * the top and subtransactions (using a k-way merge) and replay the changes in
- * lsn order.
+ * We currently can only decode a transaction's contents when its commit
+ * record is read because that's the only place where we know about cache
+ * invalidations. Thus, once a toplevel commit is read, we iterate over the top
+ * and subtransactions (using a k-way merge) and replay the changes in lsn
+ * order.
*/
void
ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
@@ -1295,10 +1415,10 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
txn->origin_lsn = origin_lsn;
/*
- * If this transaction didn't have any real changes in our database, it's
- * OK not to have a snapshot. Note that ReorderBufferCommitChild will have
- * transferred its snapshot to this transaction if it had one and the
- * toplevel tx didn't.
+ * If this transaction has no snapshot, it didn't make any changes to the
+ * database, so there's nothing to decode. Note that
+ * ReorderBufferCommitChild will have transferred any snapshots from
+ * subtransactions if there were any.
*/
if (txn->base_snapshot == NULL)
{
@@ -1861,12 +1981,10 @@ ReorderBufferAddSnapshot(ReorderBuffer *rb, TransactionId xid,
}
/*
- * Setup the base snapshot of a transaction. The base snapshot is the snapshot
- * that is used to decode all changes until either this transaction modifies
- * the catalog or another catalog modifying transaction commits.
+ * Set up the transaction's base snapshot.
*
- * Needs to be called before any changes are added with
- * ReorderBufferQueueChange().
+ * If we know that xid is a subtransaction, set the base snapshot on the
+ * top-level transaction instead.
*/
void
ReorderBufferSetBaseSnapshot(ReorderBuffer *rb, TransactionId xid,
@@ -1875,12 +1993,23 @@ ReorderBufferSetBaseSnapshot(ReorderBuffer *rb, TransactionId xid,
ReorderBufferTXN *txn;
bool is_new;
+ AssertArg(snap != NULL);
+
+ /*
+ * Fetch the transaction to operate on. If we know it's a subtransaction,
+ * operate on its top-level transaction instead.
+ */
txn = ReorderBufferTXNByXid(rb, xid, true, &is_new, lsn, true);
+ if (txn->is_known_as_subxact)
+ txn = ReorderBufferTXNByXid(rb, txn->toplevel_xid, false,
+ NULL, InvalidXLogRecPtr, false);
Assert(txn->base_snapshot == NULL);
- Assert(snap != NULL);
txn->base_snapshot = snap;
txn->base_snapshot_lsn = lsn;
+ dlist_push_tail(&rb->txns_by_base_snapshot_lsn, &txn->base_snapshot_node);
+
+ AssertTXNLsnOrder(rb);
}
/*
@@ -1999,25 +2128,26 @@ ReorderBufferXidHasCatalogChanges(ReorderBuffer *rb, TransactionId xid)
}
/*
- * Have we already added the first snapshot?
+ * ReorderBufferXidHasBaseSnapshot
+ * Have we already set the base snapshot for the given txn/subtxn?
*/
bool
ReorderBufferXidHasBaseSnapshot(ReorderBuffer *rb, TransactionId xid)
{
ReorderBufferTXN *txn;
- txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
- false);
+ txn = ReorderBufferTXNByXid(rb, xid, false,
+ NULL, InvalidXLogRecPtr, false);
/* transaction isn't known yet, ergo no snapshot */
if (txn == NULL)
return false;
- /*
- * TODO: It would be a nice improvement if we would check the toplevel
- * transaction in subtransactions, but we'd need to keep track of a bit
- * more state.
- */
+ /* a known subtxn? operate on top-level txn instead */
+ if (txn->is_known_as_subxact)
+ txn = ReorderBufferTXNByXid(rb, txn->toplevel_xid, false,
+ NULL, InvalidXLogRecPtr, false);
+
return txn->base_snapshot != NULL;
}
diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c
index 2c4a1bab4b4..e975faeb8c9 100644
--- a/src/backend/replication/logical/snapbuild.c
+++ b/src/backend/replication/logical/snapbuild.c
@@ -830,9 +830,9 @@ SnapBuildDistributeNewCatalogSnapshot(SnapBuild *builder, XLogRecPtr lsn)
* all. We'll add a snapshot when the first change gets queued.
*
* NB: This works correctly even for subtransactions because
- * ReorderBufferCommitChild() takes care to pass the parent the base
- * snapshot, and while iterating the changequeue we'll get the change
- * from the subtxn.
+ * ReorderBufferAssignChild() takes care to transfer the base snapshot
+ * to the top-level transaction, and while iterating the changequeue
+ * we'll get the change from the subtxn.
*/
if (!ReorderBufferXidHasBaseSnapshot(builder->reorder, txn->xid))
continue;
@@ -1074,7 +1074,7 @@ SnapBuildCommitTxn(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid,
/* refcount of the snapshot builder for the new snapshot */
SnapBuildSnapIncRefcount(builder->snapshot);
- /* add a new Snapshot to all currently running transactions */
+ /* add a new catalog snapshot to all currently running transactions */
SnapBuildDistributeNewCatalogSnapshot(builder, lsn);
}
}
@@ -1094,6 +1094,7 @@ void
SnapBuildProcessRunningXacts(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *running)
{
ReorderBufferTXN *txn;
+ TransactionId xmin;
/*
* If we're not consistent yet, inspect the record to see whether it
@@ -1126,15 +1127,21 @@ SnapBuildProcessRunningXacts(SnapBuild *builder, XLogRecPtr lsn, xl_running_xact
/* Remove transactions we don't need to keep track off anymore */
SnapBuildPurgeCommittedTxn(builder);
- elog(DEBUG3, "xmin: %u, xmax: %u, oldestrunning: %u",
- builder->xmin, builder->xmax,
- running->oldestRunningXid);
-
/*
- * Increase shared memory limits, so vacuum can work on tuples we
- * prevented from being pruned till now.
+ * Advance the xmin limit for the current replication slot, to allow
+ * vacuum to clean up the tuples this slot has been protecting.
+ *
+ * The reorderbuffer might have an xmin among the currently running
+ * snapshots; use it if so. If not, we need only consider the snapshots
+ * we'll produce later, which can't be less than the oldest running xid in
+ * the record we're reading now.
*/
- LogicalIncreaseXminForSlot(lsn, running->oldestRunningXid);
+ xmin = ReorderBufferGetOldestXmin(builder->reorder);
+ if (xmin == InvalidTransactionId)
+ xmin = running->oldestRunningXid;
+ elog(DEBUG3, "xmin: %u, xmax: %u, oldest running: %u, oldest xmin: %u",
+ builder->xmin, builder->xmax, running->oldestRunningXid, xmin);
+ LogicalIncreaseXminForSlot(lsn, xmin);
/*
* Also tell the slot where we can restart decoding from. We don't want to
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index 1c7982958e0..f8a295bddc8 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -160,10 +160,9 @@ typedef struct ReorderBufferTXN
/* did the TX have catalog changes */
bool has_catalog_changes;
- /*
- * Do we know this is a subxact?
- */
+ /* Do we know this is a subxact? Xid of top-level txn if so */
bool is_known_as_subxact;
+ TransactionId toplevel_xid;
/*
* LSN of the first data carrying, WAL record with knowledge about this
@@ -209,10 +208,13 @@ typedef struct ReorderBufferTXN
TimestampTz commit_time;
/*
- * Base snapshot or NULL.
+ * The base snapshot is used to decode all changes until either this
+ * transaction modifies the catalog, or another catalog-modifying
+ * transaction commits.
*/
Snapshot base_snapshot;
XLogRecPtr base_snapshot_lsn;
+ dlist_node base_snapshot_node; /* link in txns_by_base_snapshot_lsn */
/*
* How many ReorderBufferChange's do we have in this txn.
@@ -279,7 +281,7 @@ typedef struct ReorderBufferTXN
* Position in one of three lists:
* * list of subtransactions if we are *known* to be subxact
* * list of toplevel xacts (can be an as-yet unknown subxact)
- * * list of preallocated ReorderBufferTXNs
+ * * list of preallocated ReorderBufferTXNs (if unused)
* ---
*/
dlist_node node;
@@ -338,6 +340,15 @@ struct ReorderBuffer
dlist_head toplevel_by_lsn;
/*
+ * Transactions and subtransactions that have a base snapshot, ordered by
+ * LSN of the record which caused us to first obtain the base snapshot.
+ * This is not the same as toplevel_by_lsn, because we only set the base
+ * snapshot on the first logical-decoding-relevant record (eg. heap
+ * writes), whereas the initial LSN could be set by other operations.
+ */
+ dlist_head txns_by_base_snapshot_lsn;
+
+ /*
* one-entry sized cache for by_txn. Very frequently the same txn gets
* looked up over and over again.
*/
@@ -422,6 +433,7 @@ bool ReorderBufferXidHasCatalogChanges(ReorderBuffer *, TransactionId xid);
bool ReorderBufferXidHasBaseSnapshot(ReorderBuffer *, TransactionId xid);
ReorderBufferTXN *ReorderBufferGetOldestTXN(ReorderBuffer *);
+TransactionId ReorderBufferGetOldestXmin(ReorderBuffer *rb);
void ReorderBufferSetRestartPoint(ReorderBuffer *, XLogRecPtr ptr);