aboutsummaryrefslogtreecommitdiff
path: root/src/backend/replication
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/replication')
-rw-r--r--src/backend/replication/logical/decode.c9
-rw-r--r--src/backend/replication/logical/reorderbuffer.c53
2 files changed, 42 insertions, 20 deletions
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index 0cdb0b8a92b..0c248f07e8f 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -327,6 +327,15 @@ DecodeStandbyOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
break;
case XLOG_STANDBY_LOCK:
break;
+ case XLOG_INVALIDATIONS:
+ {
+ xl_invalidations *invalidations =
+ (xl_invalidations *) XLogRecGetData(r);
+
+ ReorderBufferImmediateInvalidation(
+ ctx->reorder, invalidations->nmsgs, invalidations->msgs);
+ }
+ break;
default:
elog(ERROR, "unexpected RM_STANDBY_ID record type: %u", info);
}
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index 45207086ac0..57821c34027 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -1810,26 +1810,8 @@ ReorderBufferForget(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
* catalog and we need to update the caches according to that.
*/
if (txn->base_snapshot != NULL && txn->ninvalidations > 0)
- {
- bool use_subtxn = IsTransactionOrTransactionBlock();
-
- if (use_subtxn)
- BeginInternalSubTransaction("replay");
-
- /*
- * Force invalidations to happen outside of a valid transaction - that
- * way entries will just be marked as invalid without accessing the
- * catalog. That's advantageous because we don't need to setup the
- * full state necessary for catalog access.
- */
- if (use_subtxn)
- AbortCurrentTransaction();
-
- ReorderBufferExecuteInvalidations(rb, txn);
-
- if (use_subtxn)
- RollbackAndReleaseCurrentSubTransaction();
- }
+ ReorderBufferImmediateInvalidation(rb, txn->ninvalidations,
+ txn->invalidations);
else
Assert(txn->ninvalidations == 0);
@@ -1837,6 +1819,37 @@ ReorderBufferForget(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
ReorderBufferCleanupTXN(rb, txn);
}
+/*
+ * Execute invalidations happening outside the context of a decoded
+ * transaction. That currently happens either for xid-less commits
+ * (c.f. RecordTransactionCommit()) or for invalidations in uninteresting
+ * transactions (via ReorderBufferForget()).
+ */
+void
+ReorderBufferImmediateInvalidation(ReorderBuffer *rb, uint32 ninvalidations,
+ SharedInvalidationMessage *invalidations)
+{
+ bool use_subtxn = IsTransactionOrTransactionBlock();
+ int i;
+
+ if (use_subtxn)
+ BeginInternalSubTransaction("replay");
+
+ /*
+ * Force invalidations to happen outside of a valid transaction - that
+ * way entries will just be marked as invalid without accessing the
+ * catalog. That's advantageous because we don't need to setup the
+ * full state necessary for catalog access.
+ */
+ if (use_subtxn)
+ AbortCurrentTransaction();
+
+ for (i = 0; i < ninvalidations; i++)
+ LocalExecuteInvalidationMessage(&invalidations[i]);
+
+ if (use_subtxn)
+ RollbackAndReleaseCurrentSubTransaction();
+}
/*
* Tell reorderbuffer about an xid seen in the WAL stream. Has to be called at