aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/backend/replication/logical/logical.c25
-rw-r--r--src/backend/replication/logical/logicalfuncs.c2
-rw-r--r--src/backend/replication/logical/slotsync.c4
-rw-r--r--src/backend/replication/slot.c30
-rw-r--r--src/backend/replication/slotfuncs.c2
-rw-r--r--src/backend/replication/walsender.c4
-rw-r--r--src/backend/utils/adt/pg_upgrade_support.c2
-rw-r--r--src/include/replication/slot.h3
-rw-r--r--src/test/recovery/t/019_replslot_limit.pl2
-rw-r--r--src/test/recovery/t/035_standby_logical_decoding.pl15
10 files changed, 39 insertions, 50 deletions
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 0b25efafe2b..8ea846bfc3b 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -542,28 +542,9 @@ CreateDecodingContext(XLogRecPtr start_lsn,
errdetail("This replication slot is being synchronized from the primary server."),
errhint("Specify another replication slot."));
- /*
- * Check if slot has been invalidated due to max_slot_wal_keep_size. Avoid
- * "cannot get changes" wording in this errmsg because that'd be
- * confusingly ambiguous about no changes being available when called from
- * pg_logical_slot_get_changes_guts().
- */
- if (MyReplicationSlot->data.invalidated == RS_INVAL_WAL_REMOVED)
- ereport(ERROR,
- (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
- errmsg("can no longer get changes from replication slot \"%s\"",
- NameStr(MyReplicationSlot->data.name)),
- errdetail("This slot has been invalidated because it exceeded the maximum reserved size.")));
-
- if (MyReplicationSlot->data.invalidated != RS_INVAL_NONE)
- ereport(ERROR,
- (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
- errmsg("can no longer get changes from replication slot \"%s\"",
- NameStr(MyReplicationSlot->data.name)),
- errdetail("This slot has been invalidated because it was conflicting with recovery.")));
-
- Assert(MyReplicationSlot->data.invalidated == RS_INVAL_NONE);
- Assert(MyReplicationSlot->data.restart_lsn != InvalidXLogRecPtr);
+ /* slot must be valid to allow decoding */
+ Assert(slot->data.invalidated == RS_INVAL_NONE);
+ Assert(slot->data.restart_lsn != InvalidXLogRecPtr);
if (start_lsn == InvalidXLogRecPtr)
{
diff --git a/src/backend/replication/logical/logicalfuncs.c b/src/backend/replication/logical/logicalfuncs.c
index 0148ec36788..ca53caac2f2 100644
--- a/src/backend/replication/logical/logicalfuncs.c
+++ b/src/backend/replication/logical/logicalfuncs.c
@@ -197,7 +197,7 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin
else
end_of_wal = GetXLogReplayRecPtr(NULL);
- ReplicationSlotAcquire(NameStr(*name), true);
+ ReplicationSlotAcquire(NameStr(*name), true, true);
PG_TRY();
{
diff --git a/src/backend/replication/logical/slotsync.c b/src/backend/replication/logical/slotsync.c
index f6945af1d43..be6f87f00b2 100644
--- a/src/backend/replication/logical/slotsync.c
+++ b/src/backend/replication/logical/slotsync.c
@@ -446,7 +446,7 @@ drop_local_obsolete_slots(List *remote_slot_list)
if (synced_slot)
{
- ReplicationSlotAcquire(NameStr(local_slot->data.name), true);
+ ReplicationSlotAcquire(NameStr(local_slot->data.name), true, false);
ReplicationSlotDropAcquired();
}
@@ -665,7 +665,7 @@ synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid)
* pre-check to ensure that at least one of the slot properties is
* changed before acquiring the slot.
*/
- ReplicationSlotAcquire(remote_slot->name, true);
+ ReplicationSlotAcquire(remote_slot->name, true, false);
Assert(slot == MyReplicationSlot);
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index b30e0473e1c..c57a13d8208 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -535,9 +535,13 @@ ReplicationSlotName(int index, Name name)
*
* An error is raised if nowait is true and the slot is currently in use. If
* nowait is false, we sleep until the slot is released by the owning process.
+ *
+ * An error is raised if error_if_invalid is true and the slot is found to
+ * be invalid. It should always be set to true, except when we are temporarily
+ * acquiring the slot and don't intend to change it.
*/
void
-ReplicationSlotAcquire(const char *name, bool nowait)
+ReplicationSlotAcquire(const char *name, bool nowait, bool error_if_invalid)
{
ReplicationSlot *s;
int active_pid;
@@ -561,6 +565,19 @@ retry:
name)));
}
+ /* Invalid slots can't be modified or used before accessing the WAL. */
+ if (error_if_invalid && s->data.invalidated != RS_INVAL_NONE)
+ {
+ LWLockRelease(ReplicationSlotControlLock);
+
+ ereport(ERROR,
+ errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("can no longer access replication slot \"%s\"",
+ NameStr(s->data.name)),
+ errdetail("This replication slot has been invalidated due to \"%s\".",
+ SlotInvalidationCauses[s->data.invalidated]));
+ }
+
/*
* This is the slot we want; check if it's active under some other
* process. In single user mode, we don't need this check.
@@ -785,7 +802,7 @@ ReplicationSlotDrop(const char *name, bool nowait)
{
Assert(MyReplicationSlot == NULL);
- ReplicationSlotAcquire(name, nowait);
+ ReplicationSlotAcquire(name, nowait, false);
/*
* Do not allow users to drop the slots which are currently being synced
@@ -812,7 +829,7 @@ ReplicationSlotAlter(const char *name, const bool *failover,
Assert(MyReplicationSlot == NULL);
Assert(failover || two_phase);
- ReplicationSlotAcquire(name, false);
+ ReplicationSlotAcquire(name, false, true);
if (SlotIsPhysical(MyReplicationSlot))
ereport(ERROR,
@@ -820,13 +837,6 @@ ReplicationSlotAlter(const char *name, const bool *failover,
errmsg("cannot use %s with a physical replication slot",
"ALTER_REPLICATION_SLOT"));
- if (MyReplicationSlot->data.invalidated != RS_INVAL_NONE)
- ereport(ERROR,
- errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
- errmsg("cannot alter invalid replication slot \"%s\"", name),
- errdetail("This replication slot has been invalidated due to \"%s\".",
- SlotInvalidationCauses[MyReplicationSlot->data.invalidated]));
-
if (RecoveryInProgress())
{
/*
diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
index 977146789fe..8be4b8c65b5 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -536,7 +536,7 @@ pg_replication_slot_advance(PG_FUNCTION_ARGS)
moveto = Min(moveto, GetXLogReplayRecPtr(NULL));
/* Acquire the slot so we "own" it */
- ReplicationSlotAcquire(NameStr(*slotname), true);
+ ReplicationSlotAcquire(NameStr(*slotname), true, true);
/* A slot whose restart_lsn has never been reserved cannot be advanced */
if (XLogRecPtrIsInvalid(MyReplicationSlot->data.restart_lsn))
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index bac504b554e..446d10c1a7d 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -816,7 +816,7 @@ StartReplication(StartReplicationCmd *cmd)
if (cmd->slotname)
{
- ReplicationSlotAcquire(cmd->slotname, true);
+ ReplicationSlotAcquire(cmd->slotname, true, true);
if (SlotIsLogical(MyReplicationSlot))
ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
@@ -1434,7 +1434,7 @@ StartLogicalReplication(StartReplicationCmd *cmd)
Assert(!MyReplicationSlot);
- ReplicationSlotAcquire(cmd->slotname, true);
+ ReplicationSlotAcquire(cmd->slotname, true, true);
/*
* Force a disconnect, so that the decoding code doesn't need to care
diff --git a/src/backend/utils/adt/pg_upgrade_support.c b/src/backend/utils/adt/pg_upgrade_support.c
index 9a10907d05b..d44f8c262ba 100644
--- a/src/backend/utils/adt/pg_upgrade_support.c
+++ b/src/backend/utils/adt/pg_upgrade_support.c
@@ -298,7 +298,7 @@ binary_upgrade_logical_slot_has_caught_up(PG_FUNCTION_ARGS)
slot_name = PG_GETARG_NAME(0);
/* Acquire the given slot */
- ReplicationSlotAcquire(NameStr(*slot_name), true);
+ ReplicationSlotAcquire(NameStr(*slot_name), true, true);
Assert(SlotIsLogical(MyReplicationSlot));
diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h
index bf62b36ad07..47ebdaecb6a 100644
--- a/src/include/replication/slot.h
+++ b/src/include/replication/slot.h
@@ -253,7 +253,8 @@ extern void ReplicationSlotDropAcquired(void);
extern void ReplicationSlotAlter(const char *name, const bool *failover,
const bool *two_phase);
-extern void ReplicationSlotAcquire(const char *name, bool nowait);
+extern void ReplicationSlotAcquire(const char *name, bool nowait,
+ bool error_if_invalid);
extern void ReplicationSlotRelease(void);
extern void ReplicationSlotCleanup(bool synced_only);
extern void ReplicationSlotSave(void);
diff --git a/src/test/recovery/t/019_replslot_limit.pl b/src/test/recovery/t/019_replslot_limit.pl
index ae2ad5c933a..6468784b83d 100644
--- a/src/test/recovery/t/019_replslot_limit.pl
+++ b/src/test/recovery/t/019_replslot_limit.pl
@@ -234,7 +234,7 @@ my $failed = 0;
for (my $i = 0; $i < 10 * $PostgreSQL::Test::Utils::timeout_default; $i++)
{
if ($node_standby->log_contains(
- "requested WAL segment [0-9A-F]+ has already been removed",
+ "This replication slot has been invalidated due to \"wal_removed\".",
$logstart))
{
$failed = 1;
diff --git a/src/test/recovery/t/035_standby_logical_decoding.pl b/src/test/recovery/t/035_standby_logical_decoding.pl
index 7e794c5bea3..505e85d1eb6 100644
--- a/src/test/recovery/t/035_standby_logical_decoding.pl
+++ b/src/test/recovery/t/035_standby_logical_decoding.pl
@@ -533,7 +533,7 @@ check_slots_conflict_reason('vacuum_full_', 'rows_removed');
qq[ALTER_REPLICATION_SLOT vacuum_full_inactiveslot (failover);],
replication => 'database');
ok( $stderr =~
- /ERROR: cannot alter invalid replication slot "vacuum_full_inactiveslot"/
+ /ERROR: can no longer access replication slot "vacuum_full_inactiveslot"/
&& $stderr =~
/DETAIL: This replication slot has been invalidated due to "rows_removed"./,
"invalidated slot cannot be altered");
@@ -551,8 +551,7 @@ $handle =
# We are not able to read from the slot as it has been invalidated
check_pg_recvlogical_stderr($handle,
- "can no longer get changes from replication slot \"vacuum_full_activeslot\""
-);
+ "can no longer access replication slot \"vacuum_full_activeslot\"");
# Turn hot_standby_feedback back on
change_hot_standby_feedback_and_wait_for_xmins(1, 1);
@@ -632,8 +631,7 @@ $handle =
# We are not able to read from the slot as it has been invalidated
check_pg_recvlogical_stderr($handle,
- "can no longer get changes from replication slot \"row_removal_activeslot\""
-);
+ "can no longer access replication slot \"row_removal_activeslot\"");
##################################################
# Recovery conflict: Same as Scenario 2 but on a shared catalog table
@@ -668,7 +666,7 @@ $handle = make_slot_active($node_standby, 'shared_row_removal_', 0, \$stdout,
# We are not able to read from the slot as it has been invalidated
check_pg_recvlogical_stderr($handle,
- "can no longer get changes from replication slot \"shared_row_removal_activeslot\""
+ "can no longer access replication slot \"shared_row_removal_activeslot\""
);
##################################################
@@ -759,7 +757,7 @@ $handle = make_slot_active($node_standby, 'pruning_', 0, \$stdout, \$stderr);
# We are not able to read from the slot as it has been invalidated
check_pg_recvlogical_stderr($handle,
- "can no longer get changes from replication slot \"pruning_activeslot\"");
+ "can no longer access replication slot \"pruning_activeslot\"");
# Turn hot_standby_feedback back on
change_hot_standby_feedback_and_wait_for_xmins(1, 1);
@@ -818,8 +816,7 @@ $handle =
make_slot_active($node_standby, 'wal_level_', 0, \$stdout, \$stderr);
# as the slot has been invalidated we should not be able to read
check_pg_recvlogical_stderr($handle,
- "can no longer get changes from replication slot \"wal_level_activeslot\""
-);
+ "can no longer access replication slot \"wal_level_activeslot\"");
##################################################
# DROP DATABASE should drop its slots, including active slots.