aboutsummaryrefslogtreecommitdiff
path: root/src/backend/replication/logical/reorderbuffer.c
diff options
context:
space:
mode:
authorAmit Kapila <akapila@postgresql.org>2022-08-11 10:09:24 +0530
committerAmit Kapila <akapila@postgresql.org>2022-08-11 10:09:24 +0530
commit7f13ac812313666a2fbb8dacfbee67e78d2ba0bc (patch)
tree91a4282780adb8cb4730509d72dc48f146e1526b /src/backend/replication/logical/reorderbuffer.c
parent37a6e5df3713498a21942dae2ed3122bba5b9f50 (diff)
downloadpostgresql-7f13ac812313666a2fbb8dacfbee67e78d2ba0bc.tar.gz
postgresql-7f13ac812313666a2fbb8dacfbee67e78d2ba0bc.zip
Fix catalog lookup with the wrong snapshot during logical decoding.
Previously, we relied on HEAP2_NEW_CID records and XACT_INVALIDATION records to know if the transaction has modified the catalog, and that information is not serialized to snapshot. Therefore, after the restart, if the logical decoding decodes only the commit record of the transaction that has actually modified a catalog, we will miss adding its XID to the snapshot. Thus, we will end up looking at catalogs with the wrong snapshot. To fix this problem, this change adds the list of transaction IDs and sub-transaction IDs, that have modified catalogs and are running during snapshot serialization, to the serialized snapshot. After restart or otherwise, when we restore from such a serialized snapshot, the corresponding list is restored in memory. Now, when decoding a COMMIT record, we check both the list and the ReorderBuffer to see if the transaction has modified catalogs. Since this adds additional information to the serialized snapshot, we cannot backpatch it. For back branches, we took another approach. We remember the last-running-xacts list of the decoded RUNNING_XACTS record after restoring the previously serialized snapshot. Then, we mark the transaction as containing catalog changes if it's in the list of initial running transactions and its commit record has XACT_XINFO_HAS_INVALS. This doesn't require any file format changes but the transaction will end up being added to the snapshot even if it has only relcache invalidations. But that won't be a problem since we use snapshot built during decoding only to read system catalogs. This commit bumps SNAPBUILD_VERSION because of a change in SnapBuild. Reported-by: Mike Oh Author: Masahiko Sawada Reviewed-by: Amit Kapila, Shi yu, Takamichi Osumi, Kyotaro Horiguchi, Bertrand Drouvot, Ahsan Hadi Backpatch-through: 10 Discussion: https://postgr.es/m/81D0D8B0-E7C4-4999-B616-1E5004DBDCD2%40amazon.com
Diffstat (limited to 'src/backend/replication/logical/reorderbuffer.c')
-rw-r--r--src/backend/replication/logical/reorderbuffer.c71
1 files changed, 66 insertions, 5 deletions
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index 88a37fde722..1c21a1d14b6 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -349,6 +349,8 @@ ReorderBufferAllocate(void)
buffer->by_txn_last_xid = InvalidTransactionId;
buffer->by_txn_last_txn = NULL;
+ buffer->catchange_ntxns = 0;
+
buffer->outbuf = NULL;
buffer->outbufsize = 0;
buffer->size = 0;
@@ -366,6 +368,7 @@ ReorderBufferAllocate(void)
dlist_init(&buffer->toplevel_by_lsn);
dlist_init(&buffer->txns_by_base_snapshot_lsn);
+ dlist_init(&buffer->catchange_txns);
/*
* Ensure there's no stale data from prior uses of this slot, in case some
@@ -1526,14 +1529,22 @@ ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
}
/*
- * Remove TXN from its containing list.
+ * Remove TXN from its containing lists.
*
* Note: if txn is known as subxact, we are deleting the TXN from its
* parent's list of known subxacts; this leaves the parent's nsubxacts
* count too high, but we don't care. Otherwise, we are deleting the TXN
- * from the LSN-ordered list of toplevel TXNs.
+ * from the LSN-ordered list of toplevel TXNs. We remove the TXN from the
+ * list of catalog modifying transactions as well.
*/
dlist_delete(&txn->node);
+ if (rbtxn_has_catalog_changes(txn))
+ {
+ dlist_delete(&txn->catchange_node);
+ rb->catchange_ntxns--;
+
+ Assert(rb->catchange_ntxns >= 0);
+ }
/* now remove reference from buffer */
hash_search(rb->by_txn,
@@ -3275,10 +3286,16 @@ ReorderBufferXidSetCatalogChanges(ReorderBuffer *rb, TransactionId xid,
XLogRecPtr lsn)
{
ReorderBufferTXN *txn;
+ ReorderBufferTXN *toptxn;
txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
- txn->txn_flags |= RBTXN_HAS_CATALOG_CHANGES;
+ if (!rbtxn_has_catalog_changes(txn))
+ {
+ txn->txn_flags |= RBTXN_HAS_CATALOG_CHANGES;
+ dlist_push_tail(&rb->catchange_txns, &txn->catchange_node);
+ rb->catchange_ntxns++;
+ }
/*
* Mark top-level transaction as having catalog changes too if one of its
@@ -3286,8 +3303,52 @@ ReorderBufferXidSetCatalogChanges(ReorderBuffer *rb, TransactionId xid,
* conveniently check just top-level transaction and decide whether to
* build the hash table or not.
*/
- if (txn->toptxn != NULL)
- txn->toptxn->txn_flags |= RBTXN_HAS_CATALOG_CHANGES;
+ toptxn = txn->toptxn;
+ if (toptxn != NULL && !rbtxn_has_catalog_changes(toptxn))
+ {
+ toptxn->txn_flags |= RBTXN_HAS_CATALOG_CHANGES;
+ dlist_push_tail(&rb->catchange_txns, &toptxn->catchange_node);
+ rb->catchange_ntxns++;
+ }
+}
+
+/*
+ * Return palloc'ed array of the transactions that have changed catalogs.
+ * The returned array is sorted in xidComparator order.
+ *
+ * The caller must free the returned array when done with it.
+ */
+TransactionId *
+ReorderBufferGetCatalogChangesXacts(ReorderBuffer *rb)
+{
+ dlist_iter iter;
+ TransactionId *xids = NULL;
+ size_t xcnt = 0;
+
+ /* Quick return if the list is empty */
+ if (rb->catchange_ntxns == 0)
+ {
+ Assert(dlist_is_empty(&rb->catchange_txns));
+ return NULL;
+ }
+
+ /* Initialize XID array */
+ xids = (TransactionId *) palloc(sizeof(TransactionId) * rb->catchange_ntxns);
+ dlist_foreach(iter, &rb->catchange_txns)
+ {
+ ReorderBufferTXN *txn = dlist_container(ReorderBufferTXN,
+ catchange_node,
+ iter.cur);
+
+ Assert(rbtxn_has_catalog_changes(txn));
+
+ xids[xcnt++] = txn->xid;
+ }
+
+ qsort(xids, xcnt, sizeof(TransactionId), xidComparator);
+
+ Assert(xcnt == rb->catchange_ntxns);
+ return xids;
}
/*