diff options
Diffstat (limited to 'src/backend/replication/logical')
-rw-r--r-- | src/backend/replication/logical/decode.c | 30 | ||||
-rw-r--r-- | src/backend/replication/logical/logical.c | 36 |
2 files changed, 49 insertions, 17 deletions
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c index 8fe7bb65f1f..5508cc21777 100644 --- a/src/backend/replication/logical/decode.c +++ b/src/backend/replication/logical/decode.c @@ -152,11 +152,39 @@ xlog_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) * can restart from there. */ break; + case XLOG_PARAMETER_CHANGE: + { + xl_parameter_change *xlrec = + (xl_parameter_change *) XLogRecGetData(buf->record); + + /* + * If wal_level on the primary is reduced to less than + * logical, we want to prevent existing logical slots from + * being used. Existing logical slots on the standby get + * invalidated when this WAL record is replayed; and further, + * slot creation fails when wal_level is not sufficient; but + * all these operations are not synchronized, so a logical + * slot may creep in while the wal_level is being + * reduced. Hence this extra check. + */ + if (xlrec->wal_level < WAL_LEVEL_LOGICAL) + { + /* + * This can occur only on a standby, as a primary would + * not allow to restart after changing wal_level < logical + * if there is pre-existing logical slot. + */ + Assert(RecoveryInProgress()); + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("logical decoding on a standby requires wal_level to be at least logical on the primary"))); + } + break; + } case XLOG_NOOP: case XLOG_NEXTOID: case XLOG_SWITCH: case XLOG_BACKUP_END: - case XLOG_PARAMETER_CHANGE: case XLOG_RESTORE_POINT: case XLOG_FPW_CHANGE: case XLOG_FPI_FOR_HINT: diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c index 6ecea3c49c5..82dae950809 100644 --- a/src/backend/replication/logical/logical.c +++ b/src/backend/replication/logical/logical.c @@ -124,23 +124,21 @@ CheckLogicalDecodingRequirements(void) (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), errmsg("logical decoding requires a database connection"))); - /* ---- - * TODO: We got to change that someday soon... - * - * There's basically three things missing to allow this: - * 1) We need to be able to correctly and quickly identify the timeline a - * LSN belongs to - * 2) We need to force hot_standby_feedback to be enabled at all times so - * the primary cannot remove rows we need. - * 3) support dropping replication slots referring to a database, in - * dbase_redo. There can't be any active ones due to HS recovery - * conflicts, so that should be relatively easy. - * ---- - */ if (RecoveryInProgress()) - ereport(ERROR, - (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("logical decoding cannot be used while in recovery"))); + { + /* + * This check may have race conditions, but whenever + * XLOG_PARAMETER_CHANGE indicates that wal_level has changed, we + * verify that there are no existing logical replication slots. And to + * avoid races around creating a new slot, + * CheckLogicalDecodingRequirements() is called once before creating + * the slot, and once when logical decoding is initially starting up. + */ + if (GetActiveWalLevelOnStandby() < WAL_LEVEL_LOGICAL) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("logical decoding on a standby requires wal_level to be at least logical on the primary"))); + } } /* @@ -342,6 +340,12 @@ CreateInitDecodingContext(const char *plugin, LogicalDecodingContext *ctx; MemoryContext old_context; + /* + * On a standby, this check is also required while creating the + * slot. Check the comments in the function. + */ + CheckLogicalDecodingRequirements(); + /* shorter lines... */ slot = MyReplicationSlot; |