aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/backend/access/rmgrdesc/standbydesc.c54
-rw-r--r--src/backend/access/rmgrdesc/xactdesc.c30
-rw-r--r--src/backend/access/transam/xact.c18
-rw-r--r--src/backend/replication/logical/decode.c9
-rw-r--r--src/backend/replication/logical/reorderbuffer.c53
-rw-r--r--src/backend/storage/ipc/standby.c35
-rw-r--r--src/backend/utils/cache/inval.c5
-rw-r--r--src/include/replication/reorderbuffer.h2
-rw-r--r--src/include/storage/standby.h2
-rw-r--r--src/include/storage/standbydefs.h21
10 files changed, 181 insertions, 48 deletions
diff --git a/src/backend/access/rmgrdesc/standbydesc.c b/src/backend/access/rmgrdesc/standbydesc.c
index 4872cfb2d96..e6172ccdf73 100644
--- a/src/backend/access/rmgrdesc/standbydesc.c
+++ b/src/backend/access/rmgrdesc/standbydesc.c
@@ -58,6 +58,14 @@ standby_desc(StringInfo buf, XLogReaderState *record)
standby_desc_running_xacts(buf, xlrec);
}
+ else if (info == XLOG_INVALIDATIONS)
+ {
+ xl_invalidations *xlrec = (xl_invalidations *) rec;
+
+ standby_desc_invalidations(buf, xlrec->nmsgs, xlrec->msgs,
+ xlrec->dbId, xlrec->tsId,
+ xlrec->relcacheInitFileInval);
+ }
}
const char *
@@ -73,7 +81,53 @@ standby_identify(uint8 info)
case XLOG_RUNNING_XACTS:
id = "RUNNING_XACTS";
break;
+ case XLOG_INVALIDATIONS:
+ id = "INVALIDATIONS";
+ break;
}
return id;
}
+
+/*
+ * This routine is used by both standby_desc and xact_desc, because
+ * transaction commits and XLOG_INVALIDATIONS messages contain invalidations;
+ * it seems pointless to duplicate the code.
+ */
+void
+standby_desc_invalidations(StringInfo buf,
+ int nmsgs, SharedInvalidationMessage *msgs,
+ Oid dbId, Oid tsId,
+ bool relcacheInitFileInval)
+{
+ int i;
+
+ if (relcacheInitFileInval)
+ appendStringInfo(buf, "; relcache init file inval dbid %u tsid %u",
+ dbId, tsId);
+
+ appendStringInfoString(buf, "; inval msgs:");
+ for (i = 0; i < nmsgs; i++)
+ {
+ SharedInvalidationMessage *msg = &msgs[i];
+
+ if (msg->id >= 0)
+ appendStringInfo(buf, " catcache %d", msg->id);
+ else if (msg->id == SHAREDINVALCATALOG_ID)
+ appendStringInfo(buf, " catalog %u", msg->cat.catId);
+ else if (msg->id == SHAREDINVALRELCACHE_ID)
+ appendStringInfo(buf, " relcache %u", msg->rc.relId);
+ /* not expected, but print something anyway */
+ else if (msg->id == SHAREDINVALSMGR_ID)
+ appendStringInfoString(buf, " smgr");
+ /* not expected, but print something anyway */
+ else if (msg->id == SHAREDINVALRELMAP_ID)
+ appendStringInfoString(buf, " relmap");
+ else if (msg->id == SHAREDINVALRELMAP_ID)
+ appendStringInfo(buf, " relmap db %u", msg->rm.dbId);
+ else if (msg->id == SHAREDINVALSNAPSHOT_ID)
+ appendStringInfo(buf, " snapshot %u", msg->sn.relId);
+ else
+ appendStringInfo(buf, " unknown id %d", msg->id);
+ }
+}
diff --git a/src/backend/access/rmgrdesc/xactdesc.c b/src/backend/access/rmgrdesc/xactdesc.c
index e8a334c17db..6f07c5cfaac 100644
--- a/src/backend/access/rmgrdesc/xactdesc.c
+++ b/src/backend/access/rmgrdesc/xactdesc.c
@@ -18,6 +18,7 @@
#include "access/xact.h"
#include "catalog/catalog.h"
#include "storage/sinval.h"
+#include "storage/standbydefs.h"
#include "utils/timestamp.h"
/*
@@ -203,32 +204,9 @@ xact_desc_commit(StringInfo buf, uint8 info, xl_xact_commit *xlrec, RepOriginId
}
if (parsed.nmsgs > 0)
{
- if (XactCompletionRelcacheInitFileInval(parsed.xinfo))
- appendStringInfo(buf, "; relcache init file inval dbid %u tsid %u",
- parsed.dbId, parsed.tsId);
-
- appendStringInfoString(buf, "; inval msgs:");
- for (i = 0; i < parsed.nmsgs; i++)
- {
- SharedInvalidationMessage *msg = &parsed.msgs[i];
-
- if (msg->id >= 0)
- appendStringInfo(buf, " catcache %d", msg->id);
- else if (msg->id == SHAREDINVALCATALOG_ID)
- appendStringInfo(buf, " catalog %u", msg->cat.catId);
- else if (msg->id == SHAREDINVALRELCACHE_ID)
- appendStringInfo(buf, " relcache %u", msg->rc.relId);
- /* not expected, but print something anyway */
- else if (msg->id == SHAREDINVALSMGR_ID)
- appendStringInfoString(buf, " smgr");
- /* not expected, but print something anyway */
- else if (msg->id == SHAREDINVALRELMAP_ID)
- appendStringInfoString(buf, " relmap");
- else if (msg->id == SHAREDINVALSNAPSHOT_ID)
- appendStringInfo(buf, " snapshot %u", msg->sn.relId);
- else
- appendStringInfo(buf, " unknown id %d", msg->id);
- }
+ standby_desc_invalidations(
+ buf, parsed.nmsgs, parsed.msgs, parsed.dbId, parsed.tsId,
+ XactCompletionRelcacheInitFileInval(parsed.xinfo));
}
if (XactCompletionForceSyncCommit(parsed.xinfo))
diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index 7e373316139..95690ff36cb 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -1164,6 +1164,24 @@ RecordTransactionCommit(void)
Assert(nchildren == 0);
/*
+ * Transactions without an assigned xid can contain invalidation
+ * messages (e.g. explicit relcache invalidations or catcache
+ * invalidations for inplace updates); standbys need to process
+ * those. We can't emit a commit record without an xid, and we don't
+ * want to force assigning an xid, because that'd be problematic for
+ * e.g. vacuum. Hence we emit a bespoke record for the
+ * invalidations. We don't want to use that in case a commit record is
+ * emitted, so they happen synchronously with commits (besides not
+ * wanting to emit more WAL recoreds).
+ */
+ if (nmsgs != 0)
+ {
+ LogStandbyInvalidations(nmsgs, invalMessages,
+ RelcacheInitFileInval);
+ wrote_xlog = true; /* not strictly necessary */
+ }
+
+ /*
* If we didn't create XLOG entries, we're done here; otherwise we
* should trigger flushing those entries the same as a commit record
* would. This will primarily happen for HOT pruning and the like; we
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index 0cdb0b8a92b..0c248f07e8f 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -327,6 +327,15 @@ DecodeStandbyOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
break;
case XLOG_STANDBY_LOCK:
break;
+ case XLOG_INVALIDATIONS:
+ {
+ xl_invalidations *invalidations =
+ (xl_invalidations *) XLogRecGetData(r);
+
+ ReorderBufferImmediateInvalidation(
+ ctx->reorder, invalidations->nmsgs, invalidations->msgs);
+ }
+ break;
default:
elog(ERROR, "unexpected RM_STANDBY_ID record type: %u", info);
}
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index 45207086ac0..57821c34027 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -1810,26 +1810,8 @@ ReorderBufferForget(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
* catalog and we need to update the caches according to that.
*/
if (txn->base_snapshot != NULL && txn->ninvalidations > 0)
- {
- bool use_subtxn = IsTransactionOrTransactionBlock();
-
- if (use_subtxn)
- BeginInternalSubTransaction("replay");
-
- /*
- * Force invalidations to happen outside of a valid transaction - that
- * way entries will just be marked as invalid without accessing the
- * catalog. That's advantageous because we don't need to setup the
- * full state necessary for catalog access.
- */
- if (use_subtxn)
- AbortCurrentTransaction();
-
- ReorderBufferExecuteInvalidations(rb, txn);
-
- if (use_subtxn)
- RollbackAndReleaseCurrentSubTransaction();
- }
+ ReorderBufferImmediateInvalidation(rb, txn->ninvalidations,
+ txn->invalidations);
else
Assert(txn->ninvalidations == 0);
@@ -1837,6 +1819,37 @@ ReorderBufferForget(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
ReorderBufferCleanupTXN(rb, txn);
}
+/*
+ * Execute invalidations happening outside the context of a decoded
+ * transaction. That currently happens either for xid-less commits
+ * (c.f. RecordTransactionCommit()) or for invalidations in uninteresting
+ * transactions (via ReorderBufferForget()).
+ */
+void
+ReorderBufferImmediateInvalidation(ReorderBuffer *rb, uint32 ninvalidations,
+ SharedInvalidationMessage *invalidations)
+{
+ bool use_subtxn = IsTransactionOrTransactionBlock();
+ int i;
+
+ if (use_subtxn)
+ BeginInternalSubTransaction("replay");
+
+ /*
+ * Force invalidations to happen outside of a valid transaction - that
+ * way entries will just be marked as invalid without accessing the
+ * catalog. That's advantageous because we don't need to setup the
+ * full state necessary for catalog access.
+ */
+ if (use_subtxn)
+ AbortCurrentTransaction();
+
+ for (i = 0; i < ninvalidations; i++)
+ LocalExecuteInvalidationMessage(&invalidations[i]);
+
+ if (use_subtxn)
+ RollbackAndReleaseCurrentSubTransaction();
+}
/*
* Tell reorderbuffer about an xid seen in the WAL stream. Has to be called at
diff --git a/src/backend/storage/ipc/standby.c b/src/backend/storage/ipc/standby.c
index 6a9bf842d39..762dfa65eb9 100644
--- a/src/backend/storage/ipc/standby.c
+++ b/src/backend/storage/ipc/standby.c
@@ -825,6 +825,16 @@ standby_redo(XLogReaderState *record)
ProcArrayApplyRecoveryInfo(&running);
}
+ else if (info == XLOG_INVALIDATIONS)
+ {
+ xl_invalidations *xlrec = (xl_invalidations *) XLogRecGetData(record);
+
+ ProcessCommittedInvalidationMessages(xlrec->msgs,
+ xlrec->nmsgs,
+ xlrec->relcacheInitFileInval,
+ xlrec->dbId,
+ xlrec->tsId);
+ }
else
elog(PANIC, "standby_redo: unknown op code %u", info);
}
@@ -1068,3 +1078,28 @@ LogAccessExclusiveLockPrepare(void)
*/
(void) GetTopTransactionId();
}
+
+/*
+ * Emit WAL for invalidations. This currently is only used for commits without
+ * an xid but which contain invalidations.
+ */
+void
+LogStandbyInvalidations(int nmsgs, SharedInvalidationMessage *msgs,
+ bool relcacheInitFileInval)
+{
+ xl_invalidations xlrec;
+
+ /* prepare record */
+ memset(&xlrec, 0, sizeof(xlrec));
+ xlrec.dbId = MyDatabaseId;
+ xlrec.tsId = MyDatabaseTableSpace;
+ xlrec.relcacheInitFileInval = relcacheInitFileInval;
+ xlrec.nmsgs = nmsgs;
+
+ /* perform insertion */
+ XLogBeginInsert();
+ XLogRegisterData((char *) (&xlrec), MinSizeOfInvalidations);
+ XLogRegisterData((char *) msgs,
+ nmsgs * sizeof(SharedInvalidationMessage));
+ XLogInsert(RM_STANDBY_ID, XLOG_INVALIDATIONS);
+}
diff --git a/src/backend/utils/cache/inval.c b/src/backend/utils/cache/inval.c
index 924bebbac52..58035182298 100644
--- a/src/backend/utils/cache/inval.c
+++ b/src/backend/utils/cache/inval.c
@@ -842,8 +842,9 @@ xactGetCommittedInvalidationMessages(SharedInvalidationMessage **msgs,
}
/*
- * ProcessCommittedInvalidationMessages is executed by xact_redo_commit()
- * to process invalidation messages added to commit records.
+ * ProcessCommittedInvalidationMessages is executed by xact_redo_commit() or
+ * standby_redo() to process invalidation messages. Currently that happens
+ * only at end-of-xact.
*
* Relcache init file invalidation requires processing both
* before and after we send the SI messages. See AtEOXact_Inval()
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index 4c54953a512..e0708940a04 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -391,6 +391,8 @@ void ReorderBufferAddNewTupleCids(ReorderBuffer *, TransactionId, XLogRecPtr lsn
CommandId cmin, CommandId cmax, CommandId combocid);
void ReorderBufferAddInvalidations(ReorderBuffer *, TransactionId, XLogRecPtr lsn,
Size nmsgs, SharedInvalidationMessage *msgs);
+void ReorderBufferImmediateInvalidation(ReorderBuffer *, uint32 ninvalidations,
+ SharedInvalidationMessage *invalidations);
void ReorderBufferProcessXid(ReorderBuffer *, TransactionId xid, XLogRecPtr lsn);
void ReorderBufferXidSetCatalogChanges(ReorderBuffer *, TransactionId xid, XLogRecPtr lsn);
bool ReorderBufferXidHasCatalogChanges(ReorderBuffer *, TransactionId xid);
diff --git a/src/include/storage/standby.h b/src/include/storage/standby.h
index aafc9b8a482..52058840a59 100644
--- a/src/include/storage/standby.h
+++ b/src/include/storage/standby.h
@@ -85,5 +85,7 @@ extern void LogAccessExclusiveLock(Oid dbOid, Oid relOid);
extern void LogAccessExclusiveLockPrepare(void);
extern XLogRecPtr LogStandbySnapshot(void);
+extern void LogStandbyInvalidations(int nmsgs, SharedInvalidationMessage *msgs,
+ bool relcacheInitFileInval);
#endif /* STANDBY_H */
diff --git a/src/include/storage/standbydefs.h b/src/include/storage/standbydefs.h
index 609d06edeeb..bd3c97fe434 100644
--- a/src/include/storage/standbydefs.h
+++ b/src/include/storage/standbydefs.h
@@ -17,17 +17,23 @@
#include "access/xlogreader.h"
#include "lib/stringinfo.h"
#include "storage/lockdefs.h"
+#include "storage/sinval.h"
/* Recovery handlers for the Standby Rmgr (RM_STANDBY_ID) */
extern void standby_redo(XLogReaderState *record);
extern void standby_desc(StringInfo buf, XLogReaderState *record);
extern const char *standby_identify(uint8 info);
+extern void standby_desc_invalidations(StringInfo buf,
+ int nmsgs, SharedInvalidationMessage *msgs,
+ Oid dbId, Oid tsId,
+ bool relcacheInitFileInval);
/*
* XLOG message types
*/
#define XLOG_STANDBY_LOCK 0x00
#define XLOG_RUNNING_XACTS 0x10
+#define XLOG_INVALIDATIONS 0x20
typedef struct xl_standby_locks
{
@@ -50,4 +56,19 @@ typedef struct xl_running_xacts
TransactionId xids[FLEXIBLE_ARRAY_MEMBER];
} xl_running_xacts;
+/*
+ * Invalidations for standby, currently only when transactions without an
+ * assigned xid commit.
+ */
+typedef struct xl_invalidations
+{
+ Oid dbId; /* MyDatabaseId */
+ Oid tsId; /* MyDatabaseTableSpace */
+ bool relcacheInitFileInval; /* invalidate relcache init file */
+ int nmsgs; /* number of shared inval msgs */
+ SharedInvalidationMessage msgs[FLEXIBLE_ARRAY_MEMBER];
+} xl_invalidations;
+
+#define MinSizeOfInvalidations offsetof(xl_invalidations, msgs)
+
#endif /* STANDBYDEFS_H */