diff options
Diffstat (limited to 'src/backend')
-rw-r--r-- | src/backend/access/transam/xlog.c | 4 | ||||
-rw-r--r-- | src/backend/replication/slot.c | 325 | ||||
-rw-r--r-- | src/backend/replication/slotfuncs.c | 2 | ||||
-rw-r--r-- | src/backend/utils/adt/timestamp.c | 18 | ||||
-rw-r--r-- | src/backend/utils/misc/guc_tables.c | 12 | ||||
-rw-r--r-- | src/backend/utils/misc/postgresql.conf.sample | 1 |
6 files changed, 284 insertions, 78 deletions
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index 25a5c605404..f9bf5ba7509 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -7337,7 +7337,7 @@ CreateCheckPoint(int flags) */ XLByteToSeg(RedoRecPtr, _logSegNo, wal_segment_size); KeepLogSeg(recptr, &_logSegNo); - if (InvalidateObsoleteReplicationSlots(RS_INVAL_WAL_REMOVED, + if (InvalidateObsoleteReplicationSlots(RS_INVAL_WAL_REMOVED | RS_INVAL_IDLE_TIMEOUT, _logSegNo, InvalidOid, InvalidTransactionId)) { @@ -7792,7 +7792,7 @@ CreateRestartPoint(int flags) replayPtr = GetXLogReplayRecPtr(&replayTLI); endptr = (receivePtr < replayPtr) ? replayPtr : receivePtr; KeepLogSeg(endptr, &_logSegNo); - if (InvalidateObsoleteReplicationSlots(RS_INVAL_WAL_REMOVED, + if (InvalidateObsoleteReplicationSlots(RS_INVAL_WAL_REMOVED | RS_INVAL_IDLE_TIMEOUT, _logSegNo, InvalidOid, InvalidTransactionId)) { diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c index fe5acd8b1fc..d73c9c2fc32 100644 --- a/src/backend/replication/slot.c +++ b/src/backend/replication/slot.c @@ -102,16 +102,24 @@ typedef struct /* * Lookup table for slot invalidation causes. */ -const char *const SlotInvalidationCauses[] = { - [RS_INVAL_NONE] = "none", - [RS_INVAL_WAL_REMOVED] = "wal_removed", - [RS_INVAL_HORIZON] = "rows_removed", - [RS_INVAL_WAL_LEVEL] = "wal_level_insufficient", +typedef struct SlotInvalidationCauseMap +{ + ReplicationSlotInvalidationCause cause; + const char *cause_name; +} SlotInvalidationCauseMap; + +static const SlotInvalidationCauseMap SlotInvalidationCauses[] = { + {RS_INVAL_NONE, "none"}, + {RS_INVAL_WAL_REMOVED, "wal_removed"}, + {RS_INVAL_HORIZON, "rows_removed"}, + {RS_INVAL_WAL_LEVEL, "wal_level_insufficient"}, + {RS_INVAL_IDLE_TIMEOUT, "idle_timeout"}, }; -/* Maximum number of invalidation causes */ -#define RS_INVAL_MAX_CAUSES RS_INVAL_WAL_LEVEL - +/* + * Ensure that the lookup table is up-to-date with the enums defined in + * ReplicationSlotInvalidationCause. + */ StaticAssertDecl(lengthof(SlotInvalidationCauses) == (RS_INVAL_MAX_CAUSES + 1), "array length mismatch"); @@ -142,6 +150,12 @@ int max_replication_slots = 10; /* the maximum number of replication * slots */ /* + * Invalidate replication slots that have remained idle longer than this + * duration; '0' disables it. + */ +int idle_replication_slot_timeout_mins = 0; + +/* * This GUC lists streaming replication standby server slot names that * logical WAL sender processes will wait for. */ @@ -575,7 +589,7 @@ retry: errmsg("can no longer access replication slot \"%s\"", NameStr(s->data.name)), errdetail("This replication slot has been invalidated due to \"%s\".", - SlotInvalidationCauses[s->data.invalidated])); + GetSlotInvalidationCauseName(s->data.invalidated))); } /* @@ -592,14 +606,23 @@ retry: if (!nowait) ConditionVariablePrepareToSleep(&s->active_cv); + /* + * It is important to reset the inactive_since under spinlock here to + * avoid race conditions with slot invalidation. See comments related + * to inactive_since in InvalidatePossiblyObsoleteSlot. + */ SpinLockAcquire(&s->mutex); if (s->active_pid == 0) s->active_pid = MyProcPid; active_pid = s->active_pid; + ReplicationSlotSetInactiveSince(s, 0, false); SpinLockRelease(&s->mutex); } else + { active_pid = MyProcPid; + ReplicationSlotSetInactiveSince(s, 0, true); + } LWLockRelease(ReplicationSlotControlLock); /* @@ -640,11 +663,6 @@ retry: if (SlotIsLogical(s)) pgstat_acquire_replslot(s); - /* - * Reset the time since the slot has become inactive as the slot is active - * now. - */ - ReplicationSlotSetInactiveSince(s, 0, true); if (am_walsender) { @@ -1512,12 +1530,14 @@ ReportSlotInvalidation(ReplicationSlotInvalidationCause cause, NameData slotname, XLogRecPtr restart_lsn, XLogRecPtr oldestLSN, - TransactionId snapshotConflictHorizon) + TransactionId snapshotConflictHorizon, + long slot_idle_seconds) { StringInfoData err_detail; - bool hint = false; + StringInfoData err_hint; initStringInfo(&err_detail); + initStringInfo(&err_hint); switch (cause) { @@ -1525,13 +1545,15 @@ ReportSlotInvalidation(ReplicationSlotInvalidationCause cause, { unsigned long long ex = oldestLSN - restart_lsn; - hint = true; appendStringInfo(&err_detail, ngettext("The slot's restart_lsn %X/%X exceeds the limit by %llu byte.", "The slot's restart_lsn %X/%X exceeds the limit by %llu bytes.", ex), LSN_FORMAT_ARGS(restart_lsn), ex); + /* translator: %s is a GUC variable name */ + appendStringInfo(&err_hint, _("You might need to increase \"%s\"."), + "max_slot_wal_keep_size"); break; } case RS_INVAL_HORIZON: @@ -1542,6 +1564,21 @@ ReportSlotInvalidation(ReplicationSlotInvalidationCause cause, case RS_INVAL_WAL_LEVEL: appendStringInfoString(&err_detail, _("Logical decoding on standby requires \"wal_level\" >= \"logical\" on the primary server.")); break; + + case RS_INVAL_IDLE_TIMEOUT: + { + int minutes = slot_idle_seconds / SECS_PER_MINUTE; + int secs = slot_idle_seconds % SECS_PER_MINUTE; + + /* translator: %s is a GUC variable name */ + appendStringInfo(&err_detail, _("The slot's idle time of %dmin %02ds exceeds the configured \"%s\" duration of %dmin."), + minutes, secs, "idle_replication_slot_timeout", + idle_replication_slot_timeout_mins); + /* translator: %s is a GUC variable name */ + appendStringInfo(&err_hint, _("You might need to increase \"%s\"."), + "idle_replication_slot_timeout"); + break; + } case RS_INVAL_NONE: pg_unreachable(); } @@ -1553,9 +1590,99 @@ ReportSlotInvalidation(ReplicationSlotInvalidationCause cause, errmsg("invalidating obsolete replication slot \"%s\"", NameStr(slotname)), errdetail_internal("%s", err_detail.data), - hint ? errhint("You might need to increase \"%s\".", "max_slot_wal_keep_size") : 0); + err_hint.len ? errhint("%s", err_hint.data) : 0); pfree(err_detail.data); + pfree(err_hint.data); +} + +/* + * Can we invalidate an idle replication slot? + * + * Idle timeout invalidation is allowed only when: + * + * 1. Idle timeout is set + * 2. Slot has reserved WAL + * 3. Slot is inactive + * 4. The slot is not being synced from the primary while the server is in + * recovery. This is because synced slots are always considered to be + * inactive because they don't perform logical decoding to produce changes. + */ +static inline bool +CanInvalidateIdleSlot(ReplicationSlot *s) +{ + return (idle_replication_slot_timeout_mins != 0 && + !XLogRecPtrIsInvalid(s->data.restart_lsn) && + s->inactive_since > 0 && + !(RecoveryInProgress() && s->data.synced)); +} + +/* + * DetermineSlotInvalidationCause - Determine the cause for which a slot + * becomes invalid among the given possible causes. + * + * This function sequentially checks all possible invalidation causes and + * returns the first one for which the slot is eligible for invalidation. + */ +static ReplicationSlotInvalidationCause +DetermineSlotInvalidationCause(uint32 possible_causes, ReplicationSlot *s, + XLogRecPtr oldestLSN, Oid dboid, + TransactionId snapshotConflictHorizon, + TransactionId initial_effective_xmin, + TransactionId initial_catalog_effective_xmin, + XLogRecPtr initial_restart_lsn, + TimestampTz *inactive_since, TimestampTz now) +{ + Assert(possible_causes != RS_INVAL_NONE); + + if (possible_causes & RS_INVAL_WAL_REMOVED) + { + if (initial_restart_lsn != InvalidXLogRecPtr && + initial_restart_lsn < oldestLSN) + return RS_INVAL_WAL_REMOVED; + } + + if (possible_causes & RS_INVAL_HORIZON) + { + /* invalid DB oid signals a shared relation */ + if (SlotIsLogical(s) && + (dboid == InvalidOid || dboid == s->data.database)) + { + if (TransactionIdIsValid(initial_effective_xmin) && + TransactionIdPrecedesOrEquals(initial_effective_xmin, + snapshotConflictHorizon)) + return RS_INVAL_HORIZON; + else if (TransactionIdIsValid(initial_catalog_effective_xmin) && + TransactionIdPrecedesOrEquals(initial_catalog_effective_xmin, + snapshotConflictHorizon)) + return RS_INVAL_HORIZON; + } + } + + if (possible_causes & RS_INVAL_WAL_LEVEL) + { + if (SlotIsLogical(s)) + return RS_INVAL_WAL_LEVEL; + } + + if (possible_causes & RS_INVAL_IDLE_TIMEOUT) + { + Assert(now > 0); + + /* + * Check if the slot needs to be invalidated due to + * idle_replication_slot_timeout GUC. + */ + if (CanInvalidateIdleSlot(s) && + TimestampDifferenceExceedsSeconds(s->inactive_since, now, + idle_replication_slot_timeout_mins * SECS_PER_MINUTE)) + { + *inactive_since = s->inactive_since; + return RS_INVAL_IDLE_TIMEOUT; + } + } + + return RS_INVAL_NONE; } /* @@ -1572,7 +1699,7 @@ ReportSlotInvalidation(ReplicationSlotInvalidationCause cause, * for syscalls, so caller must restart if we return true. */ static bool -InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause, +InvalidatePossiblyObsoleteSlot(uint32 possible_causes, ReplicationSlot *s, XLogRecPtr oldestLSN, Oid dboid, TransactionId snapshotConflictHorizon, @@ -1585,6 +1712,7 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause, TransactionId initial_catalog_effective_xmin = InvalidTransactionId; XLogRecPtr initial_restart_lsn = InvalidXLogRecPtr; ReplicationSlotInvalidationCause invalidation_cause_prev PG_USED_FOR_ASSERTS_ONLY = RS_INVAL_NONE; + TimestampTz inactive_since = 0; for (;;) { @@ -1592,6 +1720,8 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause, NameData slotname; int active_pid = 0; ReplicationSlotInvalidationCause invalidation_cause = RS_INVAL_NONE; + TimestampTz now = 0; + long slot_idle_secs = 0; Assert(LWLockHeldByMeInMode(ReplicationSlotControlLock, LW_SHARED)); @@ -1602,6 +1732,15 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause, break; } + if (possible_causes & RS_INVAL_IDLE_TIMEOUT) + { + /* + * Assign the current time here to avoid system call overhead + * while holding the spinlock in subsequent code. + */ + now = GetCurrentTimestamp(); + } + /* * Check if the slot needs to be invalidated. If it needs to be * invalidated, and is not currently acquired, acquire it and mark it @@ -1621,6 +1760,11 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause, * those values change since the process holding the slot has been * terminated (if any), so record them here to ensure that we * would report the correct invalidation cause. + * + * Unlike other slot attributes, slot's inactive_since can't be + * changed until the acquired slot is released or the owning + * process is terminated. So, the inactive slot can only be + * invalidated immediately without being terminated. */ if (!terminated) { @@ -1629,35 +1773,15 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause, initial_catalog_effective_xmin = s->effective_catalog_xmin; } - switch (cause) - { - case RS_INVAL_WAL_REMOVED: - if (initial_restart_lsn != InvalidXLogRecPtr && - initial_restart_lsn < oldestLSN) - invalidation_cause = cause; - break; - case RS_INVAL_HORIZON: - if (!SlotIsLogical(s)) - break; - /* invalid DB oid signals a shared relation */ - if (dboid != InvalidOid && dboid != s->data.database) - break; - if (TransactionIdIsValid(initial_effective_xmin) && - TransactionIdPrecedesOrEquals(initial_effective_xmin, - snapshotConflictHorizon)) - invalidation_cause = cause; - else if (TransactionIdIsValid(initial_catalog_effective_xmin) && - TransactionIdPrecedesOrEquals(initial_catalog_effective_xmin, - snapshotConflictHorizon)) - invalidation_cause = cause; - break; - case RS_INVAL_WAL_LEVEL: - if (SlotIsLogical(s)) - invalidation_cause = cause; - break; - case RS_INVAL_NONE: - pg_unreachable(); - } + invalidation_cause = DetermineSlotInvalidationCause(possible_causes, + s, oldestLSN, + dboid, + snapshotConflictHorizon, + initial_effective_xmin, + initial_catalog_effective_xmin, + initial_restart_lsn, + &inactive_since, + now); } /* @@ -1705,12 +1829,25 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause, /* * The logical replication slots shouldn't be invalidated as GUC - * max_slot_wal_keep_size is set to -1 during the binary upgrade. See - * check_old_cluster_for_valid_slots() where we ensure that no - * invalidated before the upgrade. + * max_slot_wal_keep_size is set to -1 and + * idle_replication_slot_timeout is set to 0 during the binary + * upgrade. See check_old_cluster_for_valid_slots() where we ensure + * that no invalidated before the upgrade. */ Assert(!(*invalidated && SlotIsLogical(s) && IsBinaryUpgrade)); + /* + * Calculate the idle time duration of the slot if slot is marked + * invalidated with RS_INVAL_IDLE_TIMEOUT. + */ + if (invalidation_cause == RS_INVAL_IDLE_TIMEOUT) + { + int slot_idle_usecs; + + TimestampDifference(inactive_since, now, &slot_idle_secs, + &slot_idle_usecs); + } + if (active_pid != 0) { /* @@ -1739,7 +1876,8 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause, { ReportSlotInvalidation(invalidation_cause, true, active_pid, slotname, restart_lsn, - oldestLSN, snapshotConflictHorizon); + oldestLSN, snapshotConflictHorizon, + slot_idle_secs); if (MyBackendType == B_STARTUP) (void) SendProcSignal(active_pid, @@ -1785,7 +1923,8 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause, ReportSlotInvalidation(invalidation_cause, false, active_pid, slotname, restart_lsn, - oldestLSN, snapshotConflictHorizon); + oldestLSN, snapshotConflictHorizon, + slot_idle_secs); /* done with this slot for now */ break; @@ -1802,26 +1941,32 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause, * * Returns true when any slot have got invalidated. * - * Whether a slot needs to be invalidated depends on the cause. A slot is - * removed if it: + * Whether a slot needs to be invalidated depends on the invalidation cause. + * A slot is invalidated if it: * - RS_INVAL_WAL_REMOVED: requires a LSN older than the given segment * - RS_INVAL_HORIZON: requires a snapshot <= the given horizon in the given * db; dboid may be InvalidOid for shared relations - * - RS_INVAL_WAL_LEVEL: is logical + * - RS_INVAL_WAL_LEVEL: is logical and wal_level is insufficient + * - RS_INVAL_IDLE_TIMEOUT: has been idle longer than the configured + * "idle_replication_slot_timeout" duration. + * + * Note: This function attempts to invalidate the slot for multiple possible + * causes in a single pass, minimizing redundant iterations. The "cause" + * parameter can be a MASK representing one or more of the defined causes. * * NB - this runs as part of checkpoint, so avoid raising errors if possible. */ bool -InvalidateObsoleteReplicationSlots(ReplicationSlotInvalidationCause cause, +InvalidateObsoleteReplicationSlots(uint32 possible_causes, XLogSegNo oldestSegno, Oid dboid, TransactionId snapshotConflictHorizon) { XLogRecPtr oldestLSN; bool invalidated = false; - Assert(cause != RS_INVAL_HORIZON || TransactionIdIsValid(snapshotConflictHorizon)); - Assert(cause != RS_INVAL_WAL_REMOVED || oldestSegno > 0); - Assert(cause != RS_INVAL_NONE); + Assert(!(possible_causes & RS_INVAL_HORIZON) || TransactionIdIsValid(snapshotConflictHorizon)); + Assert(!(possible_causes & RS_INVAL_WAL_REMOVED) || oldestSegno > 0); + Assert(possible_causes != RS_INVAL_NONE); if (max_replication_slots == 0) return invalidated; @@ -1837,7 +1982,7 @@ restart: if (!s->in_use) continue; - if (InvalidatePossiblyObsoleteSlot(cause, s, oldestLSN, dboid, + if (InvalidatePossiblyObsoleteSlot(possible_causes, s, oldestLSN, dboid, snapshotConflictHorizon, &invalidated)) { @@ -2426,26 +2571,37 @@ RestoreSlotFromDisk(const char *name) * ReplicationSlotInvalidationCause. */ ReplicationSlotInvalidationCause -GetSlotInvalidationCause(const char *invalidation_reason) +GetSlotInvalidationCause(const char *cause_name) { - ReplicationSlotInvalidationCause cause; - ReplicationSlotInvalidationCause result = RS_INVAL_NONE; - bool found PG_USED_FOR_ASSERTS_ONLY = false; + Assert(cause_name); - Assert(invalidation_reason); + /* Search lookup table for the cause having this name */ + for (int i = 0; i <= RS_INVAL_MAX_CAUSES; i++) + { + if (strcmp(SlotInvalidationCauses[i].cause_name, cause_name) == 0) + return SlotInvalidationCauses[i].cause; + } + + Assert(false); + return RS_INVAL_NONE; /* to keep compiler quiet */ +} - for (cause = RS_INVAL_NONE; cause <= RS_INVAL_MAX_CAUSES; cause++) +/* + * Maps an ReplicationSlotInvalidationCause to the invalidation + * reason for a replication slot. + */ +const char * +GetSlotInvalidationCauseName(ReplicationSlotInvalidationCause cause) +{ + /* Search lookup table for the name of this cause */ + for (int i = 0; i <= RS_INVAL_MAX_CAUSES; i++) { - if (strcmp(SlotInvalidationCauses[cause], invalidation_reason) == 0) - { - found = true; - result = cause; - break; - } + if (SlotInvalidationCauses[i].cause == cause) + return SlotInvalidationCauses[i].cause_name; } - Assert(found); - return result; + Assert(false); + return "none"; /* to keep compiler quiet */ } /* @@ -2802,3 +2958,22 @@ WaitForStandbyConfirmation(XLogRecPtr wait_for_lsn) ConditionVariableCancelSleep(); } + +/* + * GUC check_hook for idle_replication_slot_timeout + * + * The value of idle_replication_slot_timeout must be set to 0 during + * a binary upgrade. See start_postmaster() in pg_upgrade for more details. + */ +bool +check_idle_replication_slot_timeout(int *newval, void **extra, GucSource source) +{ + if (IsBinaryUpgrade && *newval != 0) + { + GUC_check_errdetail("The value of \"%s\" must be set to 0 during binary upgrade mode.", + "idle_replication_slot_timeout"); + return false; + } + + return true; +} diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c index 8be4b8c65b5..f652ec8a73e 100644 --- a/src/backend/replication/slotfuncs.c +++ b/src/backend/replication/slotfuncs.c @@ -431,7 +431,7 @@ pg_get_replication_slots(PG_FUNCTION_ARGS) if (cause == RS_INVAL_NONE) nulls[i++] = true; else - values[i++] = CStringGetTextDatum(SlotInvalidationCauses[cause]); + values[i++] = CStringGetTextDatum(GetSlotInvalidationCauseName(cause)); values[i++] = BoolGetDatum(slot_contents.data.failover); diff --git a/src/backend/utils/adt/timestamp.c b/src/backend/utils/adt/timestamp.c index ba9bae05069..9682f9dbdca 100644 --- a/src/backend/utils/adt/timestamp.c +++ b/src/backend/utils/adt/timestamp.c @@ -1787,6 +1787,24 @@ TimestampDifferenceExceeds(TimestampTz start_time, } /* + * Check if the difference between two timestamps is >= a given + * threshold (expressed in seconds). + */ +bool +TimestampDifferenceExceedsSeconds(TimestampTz start_time, + TimestampTz stop_time, + int threshold_sec) +{ + long secs; + int usecs; + + /* Calculate the difference in seconds */ + TimestampDifference(start_time, stop_time, &secs, &usecs); + + return (secs >= threshold_sec); +} + +/* * Convert a time_t to TimestampTz. * * We do not use time_t internally in Postgres, but this is provided for use diff --git a/src/backend/utils/misc/guc_tables.c b/src/backend/utils/misc/guc_tables.c index aac91a6e31f..3cde94a1759 100644 --- a/src/backend/utils/misc/guc_tables.c +++ b/src/backend/utils/misc/guc_tables.c @@ -3069,6 +3069,18 @@ struct config_int ConfigureNamesInt[] = }, { + {"idle_replication_slot_timeout", PGC_SIGHUP, REPLICATION_SENDING, + gettext_noop("Sets the duration a replication slot can remain idle before " + "it is invalidated."), + NULL, + GUC_UNIT_MIN + }, + &idle_replication_slot_timeout_mins, + 0, 0, INT_MAX / SECS_PER_MINUTE, + check_idle_replication_slot_timeout, NULL, NULL + }, + + { {"commit_delay", PGC_SUSET, WAL_SETTINGS, gettext_noop("Sets the delay in microseconds between transaction commit and " "flushing WAL to disk."), diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample index d472987ed46..415f253096c 100644 --- a/src/backend/utils/misc/postgresql.conf.sample +++ b/src/backend/utils/misc/postgresql.conf.sample @@ -329,6 +329,7 @@ # (change requires restart) #wal_keep_size = 0 # in megabytes; 0 disables #max_slot_wal_keep_size = -1 # in megabytes; -1 disables +#idle_replication_slot_timeout = 0 # in minutes; 0 disables #wal_sender_timeout = 60s # in milliseconds; 0 disables #track_commit_timestamp = off # collect timestamp of transaction commit # (change requires restart) |