aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/backend/access/rmgrdesc/xactdesc.c10
-rw-r--r--src/backend/access/transam/xact.c17
-rw-r--r--src/backend/replication/logical/decode.c58
-rw-r--r--src/backend/replication/logical/reorderbuffer.c52
-rw-r--r--src/backend/utils/cache/inval.c54
-rw-r--r--src/include/access/xact.h2
-rw-r--r--src/include/access/xlog_internal.h2
-rw-r--r--src/include/replication/reorderbuffer.h3
-rw-r--r--src/include/utils/inval.h2
9 files changed, 166 insertions, 34 deletions
diff --git a/src/backend/access/rmgrdesc/xactdesc.c b/src/backend/access/rmgrdesc/xactdesc.c
index 9fce75565f4..addd95faec1 100644
--- a/src/backend/access/rmgrdesc/xactdesc.c
+++ b/src/backend/access/rmgrdesc/xactdesc.c
@@ -396,6 +396,13 @@ xact_desc(StringInfo buf, XLogReaderState *record)
appendStringInfo(buf, "xtop %u: ", xlrec->xtop);
xact_desc_assignment(buf, xlrec);
}
+ else if (info == XLOG_XACT_INVALIDATIONS)
+ {
+ xl_xact_invals *xlrec = (xl_xact_invals *) rec;
+
+ standby_desc_invalidations(buf, xlrec->nmsgs, xlrec->msgs, InvalidOid,
+ InvalidOid, false);
+ }
}
const char *
@@ -423,6 +430,9 @@ xact_identify(uint8 info)
case XLOG_XACT_ASSIGNMENT:
id = "ASSIGNMENT";
break;
+ case XLOG_XACT_INVALIDATIONS:
+ id = "INVALIDATION";
+ break;
}
return id;
diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index bd4c3cf3258..d4f7c29847f 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -1224,6 +1224,16 @@ RecordTransactionCommit(void)
bool RelcacheInitFileInval = false;
bool wrote_xlog;
+ /*
+ * Log pending invalidations for logical decoding of in-progress
+ * transactions. Normally for DDLs, we log this at each command end,
+ * however, for certain cases where we directly update the system table
+ * without a transaction block, the invalidations are not logged till this
+ * time.
+ */
+ if (XLogLogicalInfoActive())
+ LogLogicalInvalidations();
+
/* Get data needed for commit record */
nrels = smgrGetPendingDeletes(true, &rels);
nchildren = xactGetCommittedChildren(&children);
@@ -6022,6 +6032,13 @@ xact_redo(XLogReaderState *record)
ProcArrayApplyXidAssignment(xlrec->xtop,
xlrec->nsubxacts, xlrec->xsub);
}
+ else if (info == XLOG_XACT_INVALIDATIONS)
+ {
+ /*
+ * XXX we do ignore this for now, what matters are invalidations
+ * written into the commit record.
+ */
+ }
else
elog(PANIC, "xact_redo: unknown op code %u", info);
}
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index 0c0c3717391..f3a1c31a292 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -278,10 +278,39 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
/*
* We assign subxact to the toplevel xact while processing each
- * record if required. So, we don't need to do anything here.
- * See LogicalDecodingProcessRecord.
+ * record if required. So, we don't need to do anything here. See
+ * LogicalDecodingProcessRecord.
*/
break;
+ case XLOG_XACT_INVALIDATIONS:
+ {
+ TransactionId xid;
+ xl_xact_invals *invals;
+
+ xid = XLogRecGetXid(r);
+ invals = (xl_xact_invals *) XLogRecGetData(r);
+
+ /*
+ * Execute the invalidations for xid-less transactions,
+ * otherwise, accumulate them so that they can be processed at
+ * the commit time.
+ */
+ if (TransactionIdIsValid(xid))
+ {
+ if (!ctx->fast_forward)
+ ReorderBufferAddInvalidations(reorder, xid,
+ buf->origptr,
+ invals->nmsgs,
+ invals->msgs);
+ ReorderBufferXidSetCatalogChanges(ctx->reorder, xid,
+ buf->origptr);
+ }
+ else if ((!ctx->fast_forward))
+ ReorderBufferImmediateInvalidation(ctx->reorder,
+ invals->nmsgs,
+ invals->msgs);
+ }
+ break;
case XLOG_XACT_PREPARE:
/*
@@ -334,15 +363,11 @@ DecodeStandbyOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
case XLOG_STANDBY_LOCK:
break;
case XLOG_INVALIDATIONS:
- {
- xl_invalidations *invalidations =
- (xl_invalidations *) XLogRecGetData(r);
- if (!ctx->fast_forward)
- ReorderBufferImmediateInvalidation(ctx->reorder,
- invalidations->nmsgs,
- invalidations->msgs);
- }
+ /*
+ * We are processing the invalidations at the command level via
+ * XLOG_XACT_INVALIDATIONS. So we don't need to do anything here.
+ */
break;
default:
elog(ERROR, "unexpected RM_STANDBY_ID record type: %u", info);
@@ -573,19 +598,6 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
commit_time = parsed->origin_timestamp;
}
- /*
- * Process invalidation messages, even if we're not interested in the
- * transaction's contents, since the various caches need to always be
- * consistent.
- */
- if (parsed->nmsgs > 0)
- {
- if (!ctx->fast_forward)
- ReorderBufferAddInvalidations(ctx->reorder, xid, buf->origptr,
- parsed->nmsgs, parsed->msgs);
- ReorderBufferXidSetCatalogChanges(ctx->reorder, xid, buf->origptr);
- }
-
SnapBuildCommitTxn(ctx->snapshot_builder, buf->origptr, xid,
parsed->nsubxacts, parsed->subxacts);
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index 449327a147f..ce6e62152f0 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -856,6 +856,9 @@ ReorderBufferAssignChild(ReorderBuffer *rb, TransactionId xid,
subtxn->toplevel_xid = xid;
Assert(subtxn->nsubtxns == 0);
+ /* set the reference to top-level transaction */
+ subtxn->toptxn = txn;
+
/* add to subtransaction list */
dlist_push_tail(&txn->subtxns, &subtxn->node);
txn->nsubtxns++;
@@ -2201,7 +2204,11 @@ ReorderBufferAddNewTupleCids(ReorderBuffer *rb, TransactionId xid,
/*
* Setup the invalidation of the toplevel transaction.
*
- * This needs to be done before ReorderBufferCommit is called!
+ * This needs to be called for each XLOG_XACT_INVALIDATIONS message and
+ * accumulates all the invalidation messages in the toplevel transaction.
+ * This is required because in some cases where we skip processing the
+ * transaction (see ReorderBufferForget), we need to execute all the
+ * invalidations together.
*/
void
ReorderBufferAddInvalidations(ReorderBuffer *rb, TransactionId xid,
@@ -2212,17 +2219,35 @@ ReorderBufferAddInvalidations(ReorderBuffer *rb, TransactionId xid,
txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
- if (txn->ninvalidations != 0)
- elog(ERROR, "only ever add one set of invalidations");
+ /*
+ * We collect all the invalidations under the top transaction so that we
+ * can execute them all together.
+ */
+ if (txn->toptxn)
+ txn = txn->toptxn;
Assert(nmsgs > 0);
- txn->ninvalidations = nmsgs;
- txn->invalidations = (SharedInvalidationMessage *)
- MemoryContextAlloc(rb->context,
- sizeof(SharedInvalidationMessage) * nmsgs);
- memcpy(txn->invalidations, msgs,
- sizeof(SharedInvalidationMessage) * nmsgs);
+ /* Accumulate invalidations. */
+ if (txn->ninvalidations == 0)
+ {
+ txn->ninvalidations = nmsgs;
+ txn->invalidations = (SharedInvalidationMessage *)
+ MemoryContextAlloc(rb->context,
+ sizeof(SharedInvalidationMessage) * nmsgs);
+ memcpy(txn->invalidations, msgs,
+ sizeof(SharedInvalidationMessage) * nmsgs);
+ }
+ else
+ {
+ txn->invalidations = (SharedInvalidationMessage *)
+ repalloc(txn->invalidations, sizeof(SharedInvalidationMessage) *
+ (txn->ninvalidations + nmsgs));
+
+ memcpy(txn->invalidations + txn->ninvalidations, msgs,
+ nmsgs * sizeof(SharedInvalidationMessage));
+ txn->ninvalidations += nmsgs;
+ }
}
/*
@@ -2250,6 +2275,15 @@ ReorderBufferXidSetCatalogChanges(ReorderBuffer *rb, TransactionId xid,
txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
txn->txn_flags |= RBTXN_HAS_CATALOG_CHANGES;
+
+ /*
+ * Mark top-level transaction as having catalog changes too if one of its
+ * children has so that the ReorderBufferBuildTupleCidHash can
+ * conveniently check just top-level transaction and decide whether to
+ * build the hash table or not.
+ */
+ if (txn->toptxn != NULL)
+ txn->toptxn->txn_flags |= RBTXN_HAS_CATALOG_CHANGES;
}
/*
diff --git a/src/backend/utils/cache/inval.c b/src/backend/utils/cache/inval.c
index 591dd33be67..628d6f5d0cc 100644
--- a/src/backend/utils/cache/inval.c
+++ b/src/backend/utils/cache/inval.c
@@ -85,6 +85,9 @@
* worth trying to avoid sending such inval traffic in the future, if those
* problems can be overcome cheaply.
*
+ * When wal_level=logical, write invalidations into WAL at each command end to
+ * support the decoding of the in-progress transactions. See
+ * CommandEndInvalidationMessages.
*
* Portions Copyright (c) 1996-2020, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
@@ -1094,6 +1097,11 @@ CommandEndInvalidationMessages(void)
ProcessInvalidationMessages(&transInvalInfo->CurrentCmdInvalidMsgs,
LocalExecuteInvalidationMessage);
+
+ /* WAL Log per-command invalidation messages for wal_level=logical */
+ if (XLogLogicalInfoActive())
+ LogLogicalInvalidations();
+
AppendInvalidationMessages(&transInvalInfo->PriorCmdInvalidMsgs,
&transInvalInfo->CurrentCmdInvalidMsgs);
}
@@ -1501,3 +1509,49 @@ CallSyscacheCallbacks(int cacheid, uint32 hashvalue)
i = ccitem->link - 1;
}
}
+
+/*
+ * LogLogicalInvalidations
+ *
+ * Emit WAL for invalidations. This is currently only used for logging
+ * invalidations at the command end or at commit time if any invalidations
+ * are pending.
+ */
+void
+LogLogicalInvalidations()
+{
+ xl_xact_invals xlrec;
+ SharedInvalidationMessage *invalMessages;
+ int nmsgs = 0;
+
+ /* Quick exit if we haven't done anything with invalidation messages. */
+ if (transInvalInfo == NULL)
+ return;
+
+ ProcessInvalidationMessagesMulti(&transInvalInfo->CurrentCmdInvalidMsgs,
+ MakeSharedInvalidMessagesArray);
+
+ Assert(!(numSharedInvalidMessagesArray > 0 &&
+ SharedInvalidMessagesArray == NULL));
+
+ invalMessages = SharedInvalidMessagesArray;
+ nmsgs = numSharedInvalidMessagesArray;
+ SharedInvalidMessagesArray = NULL;
+ numSharedInvalidMessagesArray = 0;
+
+ if (nmsgs > 0)
+ {
+ /* prepare record */
+ memset(&xlrec, 0, MinSizeOfXactInvals);
+ xlrec.nmsgs = nmsgs;
+
+ /* perform insertion */
+ XLogBeginInsert();
+ XLogRegisterData((char *) (&xlrec), MinSizeOfXactInvals);
+ XLogRegisterData((char *) invalMessages,
+ nmsgs * sizeof(SharedInvalidationMessage));
+ XLogInsert(RM_XACT_ID, XLOG_XACT_INVALIDATIONS);
+
+ pfree(invalMessages);
+ }
+}
diff --git a/src/include/access/xact.h b/src/include/access/xact.h
index aef85553674..53480116a46 100644
--- a/src/include/access/xact.h
+++ b/src/include/access/xact.h
@@ -146,7 +146,7 @@ typedef void (*SubXactCallback) (SubXactEvent event, SubTransactionId mySubid,
#define XLOG_XACT_COMMIT_PREPARED 0x30
#define XLOG_XACT_ABORT_PREPARED 0x40
#define XLOG_XACT_ASSIGNMENT 0x50
-/* free opcode 0x60 */
+#define XLOG_XACT_INVALIDATIONS 0x60
/* free opcode 0x70 */
/* mask for filtering opcodes out of xl_info */
diff --git a/src/include/access/xlog_internal.h b/src/include/access/xlog_internal.h
index b9490a3afef..9b2da56379e 100644
--- a/src/include/access/xlog_internal.h
+++ b/src/include/access/xlog_internal.h
@@ -31,7 +31,7 @@
/*
* Each page of XLOG file has a header like this:
*/
-#define XLOG_PAGE_MAGIC 0xD107 /* can be used as WAL version indicator */
+#define XLOG_PAGE_MAGIC 0xD108 /* can be used as WAL version indicator */
typedef struct XLogPageHeaderData
{
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index 019bd382de9..1055e99e2e1 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -220,6 +220,9 @@ typedef struct ReorderBufferTXN
*/
XLogRecPtr end_lsn;
+ /* Toplevel transaction for this subxact (NULL for top-level). */
+ struct ReorderBufferTXN *toptxn;
+
/*
* LSN of the last lsn at which snapshot information reside, so we can
* restart decoding from there and fully recover this transaction from
diff --git a/src/include/utils/inval.h b/src/include/utils/inval.h
index bc5081cf721..463888c3894 100644
--- a/src/include/utils/inval.h
+++ b/src/include/utils/inval.h
@@ -61,4 +61,6 @@ extern void CacheRegisterRelcacheCallback(RelcacheCallbackFunction func,
extern void CallSyscacheCallbacks(int cacheid, uint32 hashvalue);
extern void InvalidateSystemCaches(void);
+
+extern void LogLogicalInvalidations(void);
#endif /* INVAL_H */