aboutsummaryrefslogtreecommitdiff
path: root/src/backend/replication/logical
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/replication/logical')
-rw-r--r--src/backend/replication/logical/decode.c30
-rw-r--r--src/backend/replication/logical/logical.c36
2 files changed, 49 insertions, 17 deletions
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index 8fe7bb65f1f..5508cc21777 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -152,11 +152,39 @@ xlog_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
* can restart from there.
*/
break;
+ case XLOG_PARAMETER_CHANGE:
+ {
+ xl_parameter_change *xlrec =
+ (xl_parameter_change *) XLogRecGetData(buf->record);
+
+ /*
+ * If wal_level on the primary is reduced to less than
+ * logical, we want to prevent existing logical slots from
+ * being used. Existing logical slots on the standby get
+ * invalidated when this WAL record is replayed; and further,
+ * slot creation fails when wal_level is not sufficient; but
+ * all these operations are not synchronized, so a logical
+ * slot may creep in while the wal_level is being
+ * reduced. Hence this extra check.
+ */
+ if (xlrec->wal_level < WAL_LEVEL_LOGICAL)
+ {
+ /*
+ * This can occur only on a standby, as a primary would
+ * not allow to restart after changing wal_level < logical
+ * if there is pre-existing logical slot.
+ */
+ Assert(RecoveryInProgress());
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("logical decoding on a standby requires wal_level to be at least logical on the primary")));
+ }
+ break;
+ }
case XLOG_NOOP:
case XLOG_NEXTOID:
case XLOG_SWITCH:
case XLOG_BACKUP_END:
- case XLOG_PARAMETER_CHANGE:
case XLOG_RESTORE_POINT:
case XLOG_FPW_CHANGE:
case XLOG_FPI_FOR_HINT:
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 6ecea3c49c5..82dae950809 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -124,23 +124,21 @@ CheckLogicalDecodingRequirements(void)
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("logical decoding requires a database connection")));
- /* ----
- * TODO: We got to change that someday soon...
- *
- * There's basically three things missing to allow this:
- * 1) We need to be able to correctly and quickly identify the timeline a
- * LSN belongs to
- * 2) We need to force hot_standby_feedback to be enabled at all times so
- * the primary cannot remove rows we need.
- * 3) support dropping replication slots referring to a database, in
- * dbase_redo. There can't be any active ones due to HS recovery
- * conflicts, so that should be relatively easy.
- * ----
- */
if (RecoveryInProgress())
- ereport(ERROR,
- (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
- errmsg("logical decoding cannot be used while in recovery")));
+ {
+ /*
+ * This check may have race conditions, but whenever
+ * XLOG_PARAMETER_CHANGE indicates that wal_level has changed, we
+ * verify that there are no existing logical replication slots. And to
+ * avoid races around creating a new slot,
+ * CheckLogicalDecodingRequirements() is called once before creating
+ * the slot, and once when logical decoding is initially starting up.
+ */
+ if (GetActiveWalLevelOnStandby() < WAL_LEVEL_LOGICAL)
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("logical decoding on a standby requires wal_level to be at least logical on the primary")));
+ }
}
/*
@@ -342,6 +340,12 @@ CreateInitDecodingContext(const char *plugin,
LogicalDecodingContext *ctx;
MemoryContext old_context;
+ /*
+ * On a standby, this check is also required while creating the
+ * slot. Check the comments in the function.
+ */
+ CheckLogicalDecodingRequirements();
+
/* shorter lines... */
slot = MyReplicationSlot;