aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorAndres Freund <andres@anarazel.de>2023-04-07 22:40:27 -0700
committerAndres Freund <andres@anarazel.de>2023-04-07 22:40:27 -0700
commitbe87200efd9308ccfe217ce8828f316e93e370da (patch)
treef269cd86fdfebf3a15ef3559904f5863caede055 /src
parent2ed16aacf1af1e1a26bffb121a19d1ad5f5177f0 (diff)
downloadpostgresql-be87200efd9308ccfe217ce8828f316e93e370da.tar.gz
postgresql-be87200efd9308ccfe217ce8828f316e93e370da.zip
Support invalidating replication slots due to horizon and wal_level
Needed for logical decoding on a standby. Slots need to be invalidated because of the horizon if rows required for logical decoding are removed. If the primary's wal_level is lowered from 'logical', logical slots on the standby need to be invalidated. The new invalidation methods will be used in a subsequent commit. Logical slots that have been invalidated can be identified via the new pg_replication_slots.conflicting column. See 6af1793954e for an overall design of logical decoding on a standby. Bumps catversion for the addition of the new pg_replication_slots column. Author: "Drouvot, Bertrand" <bertranddrouvot.pg@gmail.com> Author: Andres Freund <andres@anarazel.de> Author: Amit Khandekar <amitdkhan.pg@gmail.com> (in an older version) Reviewed-by: "Drouvot, Bertrand" <bertranddrouvot.pg@gmail.com> Reviewed-by: Andres Freund <andres@anarazel.de> Reviewed-by: Robert Haas <robertmhaas@gmail.com> Reviewed-by: Fabrízio de Royes Mello <fabriziomello@gmail.com> Reviewed-by: Bharath Rupireddy <bharath.rupireddyforpostgres@gmail.com> Reviewed-by: Amit Kapila <amit.kapila16@gmail.com> Reviewed-by: Melanie Plageman <melanieplageman@gmail.com> Reviewed-by: Alvaro Herrera <alvherre@alvh.no-ip.org> Discussion: https://postgr.es/m/20230407075009.igg7be27ha2htkbt@awork3.anarazel.de
Diffstat (limited to 'src')
-rw-r--r--src/backend/access/transam/xlog.c8
-rw-r--r--src/backend/catalog/system_views.sql3
-rw-r--r--src/backend/replication/logical/logical.c7
-rw-r--r--src/backend/replication/slot.c151
-rw-r--r--src/backend/replication/slotfuncs.c12
-rw-r--r--src/include/catalog/catversion.h2
-rw-r--r--src/include/catalog/pg_proc.dat6
-rw-r--r--src/include/replication/slot.h9
-rw-r--r--src/test/regress/expected/rules.out5
9 files changed, 166 insertions, 37 deletions
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 18e16ae5b3e..23903445293 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -6809,7 +6809,9 @@ CreateCheckPoint(int flags)
*/
XLByteToSeg(RedoRecPtr, _logSegNo, wal_segment_size);
KeepLogSeg(recptr, &_logSegNo);
- if (InvalidateObsoleteReplicationSlots(_logSegNo))
+ if (InvalidateObsoleteReplicationSlots(RS_INVAL_WAL_REMOVED,
+ _logSegNo, InvalidOid,
+ InvalidTransactionId))
{
/*
* Some slots have been invalidated; recalculate the old-segment
@@ -7253,7 +7255,9 @@ CreateRestartPoint(int flags)
replayPtr = GetXLogReplayRecPtr(&replayTLI);
endptr = (receivePtr < replayPtr) ? replayPtr : receivePtr;
KeepLogSeg(endptr, &_logSegNo);
- if (InvalidateObsoleteReplicationSlots(_logSegNo))
+ if (InvalidateObsoleteReplicationSlots(RS_INVAL_WAL_REMOVED,
+ _logSegNo, InvalidOid,
+ InvalidTransactionId))
{
/*
* Some slots have been invalidated; recalculate the old-segment
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index c123f109896..ff69983f2ea 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -1000,7 +1000,8 @@ CREATE VIEW pg_replication_slots AS
L.confirmed_flush_lsn,
L.wal_status,
L.safe_wal_size,
- L.two_phase
+ L.two_phase,
+ L.conflicting
FROM pg_get_replication_slots() AS L
LEFT JOIN pg_database D ON (L.datoid = D.oid);
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 6082d222d5d..6ecea3c49c5 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -531,6 +531,13 @@ CreateDecodingContext(XLogRecPtr start_lsn,
NameStr(MyReplicationSlot->data.name)),
errdetail("This slot has been invalidated because it exceeded the maximum reserved size.")));
+ if (MyReplicationSlot->data.invalidated != RS_INVAL_NONE)
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("can no longer get changes from replication slot \"%s\"",
+ NameStr(MyReplicationSlot->data.name)),
+ errdetail("This slot has been invalidated because it was conflicting with recovery.")));
+
Assert(MyReplicationSlot->data.invalidated == RS_INVAL_NONE);
Assert(MyReplicationSlot->data.restart_lsn != InvalidXLogRecPtr);
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index f969f7c083f..4d0421c5ed1 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -1241,8 +1241,58 @@ ReplicationSlotReserveWal(void)
}
/*
- * Helper for InvalidateObsoleteReplicationSlots -- acquires the given slot
- * and mark it invalid, if necessary and possible.
+ * Report that replication slot needs to be invalidated
+ */
+static void
+ReportSlotInvalidation(ReplicationSlotInvalidationCause cause,
+ bool terminating,
+ int pid,
+ NameData slotname,
+ XLogRecPtr restart_lsn,
+ XLogRecPtr oldestLSN,
+ TransactionId snapshotConflictHorizon)
+{
+ StringInfoData err_detail;
+ bool hint = false;
+
+ initStringInfo(&err_detail);
+
+ switch (cause)
+ {
+ case RS_INVAL_WAL_REMOVED:
+ hint = true;
+ appendStringInfo(&err_detail, _("The slot's restart_lsn %X/%X exceeds the limit by %llu bytes."),
+ LSN_FORMAT_ARGS(restart_lsn),
+ (unsigned long long) (oldestLSN - restart_lsn));
+ break;
+ case RS_INVAL_HORIZON:
+ appendStringInfo(&err_detail, _("The slot conflicted with xid horizon %u."),
+ snapshotConflictHorizon);
+ break;
+
+ case RS_INVAL_WAL_LEVEL:
+ appendStringInfo(&err_detail, _("Logical decoding on standby requires wal_level to be at least logical on the primary server"));
+ break;
+ case RS_INVAL_NONE:
+ pg_unreachable();
+ }
+
+ ereport(LOG,
+ terminating ?
+ errmsg("terminating process %d to release replication slot \"%s\"",
+ pid, NameStr(slotname)) :
+ errmsg("invalidating obsolete replication slot \"%s\"",
+ NameStr(slotname)),
+ errdetail_internal("%s", err_detail.data),
+ hint ? errhint("You might need to increase max_slot_wal_keep_size.") : 0);
+
+ pfree(err_detail.data);
+}
+
+/*
+ * Helper for InvalidateObsoleteReplicationSlots
+ *
+ * Acquires the given slot and mark it invalid, if necessary and possible.
*
* Returns whether ReplicationSlotControlLock was released in the interim (and
* in that case we're not holding the lock at return, otherwise we are).
@@ -1253,7 +1303,10 @@ ReplicationSlotReserveWal(void)
* for syscalls, so caller must restart if we return true.
*/
static bool
-InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, XLogRecPtr oldestLSN,
+InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause,
+ ReplicationSlot *s,
+ XLogRecPtr oldestLSN,
+ Oid dboid, TransactionId snapshotConflictHorizon,
bool *invalidated)
{
int last_signaled_pid = 0;
@@ -1264,6 +1317,7 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, XLogRecPtr oldestLSN,
XLogRecPtr restart_lsn;
NameData slotname;
int active_pid = 0;
+ ReplicationSlotInvalidationCause conflict = RS_INVAL_NONE;
Assert(LWLockHeldByMeInMode(ReplicationSlotControlLock, LW_SHARED));
@@ -1286,10 +1340,44 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, XLogRecPtr oldestLSN,
restart_lsn = s->data.restart_lsn;
/*
- * If the slot is already invalid or is fresh enough, we don't need to
- * do anything.
+ * If the slot is already invalid or is a non conflicting slot, we
+ * don't need to do anything.
*/
- if (XLogRecPtrIsInvalid(restart_lsn) || restart_lsn >= oldestLSN)
+ if (s->data.invalidated == RS_INVAL_NONE)
+ {
+ switch (cause)
+ {
+ case RS_INVAL_WAL_REMOVED:
+ if (s->data.restart_lsn != InvalidXLogRecPtr &&
+ s->data.restart_lsn < oldestLSN)
+ conflict = cause;
+ break;
+ case RS_INVAL_HORIZON:
+ if (!SlotIsLogical(s))
+ break;
+ /* invalid DB oid signals a shared relation */
+ if (dboid != InvalidOid && dboid != s->data.database)
+ break;
+ if (TransactionIdIsValid(s->effective_xmin) &&
+ TransactionIdPrecedesOrEquals(s->effective_xmin,
+ snapshotConflictHorizon))
+ conflict = cause;
+ else if (TransactionIdIsValid(s->effective_catalog_xmin) &&
+ TransactionIdPrecedesOrEquals(s->effective_catalog_xmin,
+ snapshotConflictHorizon))
+ conflict = cause;
+ break;
+ case RS_INVAL_WAL_LEVEL:
+ if (SlotIsLogical(s))
+ conflict = cause;
+ break;
+ case RS_INVAL_NONE:
+ pg_unreachable();
+ }
+ }
+
+ /* if there's no conflict, we're done */
+ if (conflict == RS_INVAL_NONE)
{
SpinLockRelease(&s->mutex);
if (released_lock)
@@ -1309,13 +1397,14 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, XLogRecPtr oldestLSN,
{
MyReplicationSlot = s;
s->active_pid = MyProcPid;
- s->data.invalidated = RS_INVAL_WAL_REMOVED;
+ s->data.invalidated = conflict;
/*
* XXX: We should consider not overwriting restart_lsn and instead
* just rely on .invalidated.
*/
- s->data.restart_lsn = InvalidXLogRecPtr;
+ if (conflict == RS_INVAL_WAL_REMOVED)
+ s->data.restart_lsn = InvalidXLogRecPtr;
/* Let caller know */
*invalidated = true;
@@ -1349,13 +1438,9 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, XLogRecPtr oldestLSN,
*/
if (last_signaled_pid != active_pid)
{
- ereport(LOG,
- errmsg("terminating process %d to release replication slot \"%s\"",
- active_pid, NameStr(slotname)),
- errdetail("The slot's restart_lsn %X/%X exceeds the limit by %llu bytes.",
- LSN_FORMAT_ARGS(restart_lsn),
- (unsigned long long) (oldestLSN - restart_lsn)),
- errhint("You might need to increase max_slot_wal_keep_size."));
+ ReportSlotInvalidation(conflict, true, active_pid,
+ slotname, restart_lsn,
+ oldestLSN, snapshotConflictHorizon);
(void) kill(active_pid, SIGTERM);
last_signaled_pid = active_pid;
@@ -1390,14 +1475,11 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, XLogRecPtr oldestLSN,
ReplicationSlotMarkDirty();
ReplicationSlotSave();
ReplicationSlotRelease();
+ pgstat_drop_replslot(s);
- ereport(LOG,
- errmsg("invalidating obsolete replication slot \"%s\"",
- NameStr(slotname)),
- errdetail("The slot's restart_lsn %X/%X exceeds the limit by %llu bytes.",
- LSN_FORMAT_ARGS(restart_lsn),
- (unsigned long long) (oldestLSN - restart_lsn)),
- errhint("You might need to increase max_slot_wal_keep_size."));
+ ReportSlotInvalidation(conflict, false, active_pid,
+ slotname, restart_lsn,
+ oldestLSN, snapshotConflictHorizon);
/* done with this slot for now */
break;
@@ -1410,19 +1492,34 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, XLogRecPtr oldestLSN,
}
/*
- * Mark any slot that points to an LSN older than the given segment
- * as invalid; it requires WAL that's about to be removed.
+ * Invalidate slots that require resources about to be removed.
*
* Returns true when any slot have got invalidated.
*
+ * Whether a slot needs to be invalidated depends on the cause. A slot is
+ * removed if it:
+ * - RS_INVAL_WAL_REMOVED: requires a LSN older than the given segment
+ * - RS_INVAL_HORIZON: requires a snapshot <= the given horizon in the given
+ * db; dboid may be InvalidOid for shared relations
+ * - RS_INVAL_WAL_LEVEL: is logical
+ *
* NB - this runs as part of checkpoint, so avoid raising errors if possible.
*/
bool
-InvalidateObsoleteReplicationSlots(XLogSegNo oldestSegno)
+InvalidateObsoleteReplicationSlots(ReplicationSlotInvalidationCause cause,
+ XLogSegNo oldestSegno, Oid dboid,
+ TransactionId snapshotConflictHorizon)
{
XLogRecPtr oldestLSN;
bool invalidated = false;
+ Assert(cause != RS_INVAL_HORIZON || TransactionIdIsValid(snapshotConflictHorizon));
+ Assert(cause != RS_INVAL_WAL_REMOVED || oldestSegno > 0);
+ Assert(cause != RS_INVAL_NONE);
+
+ if (max_replication_slots == 0)
+ return invalidated;
+
XLogSegNoOffsetToRecPtr(oldestSegno, 0, wal_segment_size, oldestLSN);
restart:
@@ -1434,7 +1531,9 @@ restart:
if (!s->in_use)
continue;
- if (InvalidatePossiblyObsoleteSlot(s, oldestLSN, &invalidated))
+ if (InvalidatePossiblyObsoleteSlot(cause, s, oldestLSN, dboid,
+ snapshotConflictHorizon,
+ &invalidated))
{
/* if the lock was released, start from scratch */
goto restart;
diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
index ad3e72be5ee..6035cf48160 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -232,7 +232,7 @@ pg_drop_replication_slot(PG_FUNCTION_ARGS)
Datum
pg_get_replication_slots(PG_FUNCTION_ARGS)
{
-#define PG_GET_REPLICATION_SLOTS_COLS 14
+#define PG_GET_REPLICATION_SLOTS_COLS 15
ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
XLogRecPtr currlsn;
int slotno;
@@ -402,6 +402,16 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
values[i++] = BoolGetDatum(slot_contents.data.two_phase);
+ if (slot_contents.data.database == InvalidOid)
+ nulls[i++] = true;
+ else
+ {
+ if (slot_contents.data.invalidated != RS_INVAL_NONE)
+ values[i++] = BoolGetDatum(true);
+ else
+ values[i++] = BoolGetDatum(false);
+ }
+
Assert(i == PG_GET_REPLICATION_SLOTS_COLS);
tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc,
diff --git a/src/include/catalog/catversion.h b/src/include/catalog/catversion.h
index e32d50f3865..dabe23bbeb0 100644
--- a/src/include/catalog/catversion.h
+++ b/src/include/catalog/catversion.h
@@ -57,6 +57,6 @@
*/
/* yyyymmddN */
-#define CATALOG_VERSION_NO 202304072
+#define CATALOG_VERSION_NO 202304073
#endif
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index 6291d76a4c1..0e9ce5215bb 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -11077,9 +11077,9 @@
proname => 'pg_get_replication_slots', prorows => '10', proisstrict => 'f',
proretset => 't', provolatile => 's', prorettype => 'record',
proargtypes => '',
- proallargtypes => '{name,name,text,oid,bool,bool,int4,xid,xid,pg_lsn,pg_lsn,text,int8,bool}',
- proargmodes => '{o,o,o,o,o,o,o,o,o,o,o,o,o,o}',
- proargnames => '{slot_name,plugin,slot_type,datoid,temporary,active,active_pid,xmin,catalog_xmin,restart_lsn,confirmed_flush_lsn,wal_status,safe_wal_size,two_phase}',
+ proallargtypes => '{name,name,text,oid,bool,bool,int4,xid,xid,pg_lsn,pg_lsn,text,int8,bool,bool}',
+ proargmodes => '{o,o,o,o,o,o,o,o,o,o,o,o,o,o,o}',
+ proargnames => '{slot_name,plugin,slot_type,datoid,temporary,active,active_pid,xmin,catalog_xmin,restart_lsn,confirmed_flush_lsn,wal_status,safe_wal_size,two_phase,conflicting}',
prosrc => 'pg_get_replication_slots' },
{ oid => '3786', descr => 'set up a logical replication slot',
proname => 'pg_create_logical_replication_slot', provolatile => 'v',
diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h
index 34ce055dd50..a8a89dc7844 100644
--- a/src/include/replication/slot.h
+++ b/src/include/replication/slot.h
@@ -46,6 +46,10 @@ typedef enum ReplicationSlotInvalidationCause
RS_INVAL_NONE,
/* required WAL has been removed */
RS_INVAL_WAL_REMOVED,
+ /* required rows have been removed */
+ RS_INVAL_HORIZON,
+ /* wal_level insufficient for slot */
+ RS_INVAL_WAL_LEVEL,
} ReplicationSlotInvalidationCause;
/*
@@ -226,7 +230,10 @@ extern void ReplicationSlotsComputeRequiredLSN(void);
extern XLogRecPtr ReplicationSlotsComputeLogicalRestartLSN(void);
extern bool ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive);
extern void ReplicationSlotsDropDBSlots(Oid dboid);
-extern bool InvalidateObsoleteReplicationSlots(XLogSegNo oldestSegno);
+extern bool InvalidateObsoleteReplicationSlots(ReplicationSlotInvalidationCause cause,
+ XLogSegNo oldestSegno,
+ Oid dboid,
+ TransactionId snapshotConflictHorizon);
extern ReplicationSlot *SearchNamedReplicationSlot(const char *name, bool need_lock);
extern int ReplicationSlotIndex(ReplicationSlot *slot);
extern bool ReplicationSlotName(int index, Name name);
diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out
index 8337bac5dbe..3d2405272a9 100644
--- a/src/test/regress/expected/rules.out
+++ b/src/test/regress/expected/rules.out
@@ -1472,8 +1472,9 @@ pg_replication_slots| SELECT l.slot_name,
l.confirmed_flush_lsn,
l.wal_status,
l.safe_wal_size,
- l.two_phase
- FROM (pg_get_replication_slots() l(slot_name, plugin, slot_type, datoid, temporary, active, active_pid, xmin, catalog_xmin, restart_lsn, confirmed_flush_lsn, wal_status, safe_wal_size, two_phase)
+ l.two_phase,
+ l.conflicting
+ FROM (pg_get_replication_slots() l(slot_name, plugin, slot_type, datoid, temporary, active, active_pid, xmin, catalog_xmin, restart_lsn, confirmed_flush_lsn, wal_status, safe_wal_size, two_phase, conflicting)
LEFT JOIN pg_database d ON ((l.datoid = d.oid)));
pg_roles| SELECT pg_authid.rolname,
pg_authid.rolsuper,