aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/backend/replication/slot.c28
-rw-r--r--src/backend/replication/slotfuncs.c8
-rw-r--r--src/include/replication/slot.h15
-rw-r--r--src/tools/pgindent/typedefs.list1
4 files changed, 41 insertions, 11 deletions
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index 2293c0c6fc3..f969f7c083f 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -89,7 +89,7 @@ typedef struct ReplicationSlotOnDisk
sizeof(ReplicationSlotOnDisk) - ReplicationSlotOnDiskConstantSize
#define SLOT_MAGIC 0x1051CA1 /* format identifier */
-#define SLOT_VERSION 2 /* version for new files */
+#define SLOT_VERSION 3 /* version for new files */
/* Control array for replication slot management */
ReplicationSlotCtlData *ReplicationSlotCtl = NULL;
@@ -855,8 +855,7 @@ ReplicationSlotsComputeRequiredXmin(bool already_locked)
SpinLockAcquire(&s->mutex);
effective_xmin = s->effective_xmin;
effective_catalog_xmin = s->effective_catalog_xmin;
- invalidated = (!XLogRecPtrIsInvalid(s->data.invalidated_at) &&
- XLogRecPtrIsInvalid(s->data.restart_lsn));
+ invalidated = s->data.invalidated != RS_INVAL_NONE;
SpinLockRelease(&s->mutex);
/* invalidated slots need not apply */
@@ -901,14 +900,20 @@ ReplicationSlotsComputeRequiredLSN(void)
{
ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
XLogRecPtr restart_lsn;
+ bool invalidated;
if (!s->in_use)
continue;
SpinLockAcquire(&s->mutex);
restart_lsn = s->data.restart_lsn;
+ invalidated = s->data.invalidated != RS_INVAL_NONE;
SpinLockRelease(&s->mutex);
+ /* invalidated slots need not apply */
+ if (invalidated)
+ continue;
+
if (restart_lsn != InvalidXLogRecPtr &&
(min_required == InvalidXLogRecPtr ||
restart_lsn < min_required))
@@ -946,6 +951,7 @@ ReplicationSlotsComputeLogicalRestartLSN(void)
{
ReplicationSlot *s;
XLogRecPtr restart_lsn;
+ bool invalidated;
s = &ReplicationSlotCtl->replication_slots[i];
@@ -960,8 +966,13 @@ ReplicationSlotsComputeLogicalRestartLSN(void)
/* read once, it's ok if it increases while we're checking */
SpinLockAcquire(&s->mutex);
restart_lsn = s->data.restart_lsn;
+ invalidated = s->data.invalidated != RS_INVAL_NONE;
SpinLockRelease(&s->mutex);
+ /* invalidated slots need not apply */
+ if (invalidated)
+ continue;
+
if (restart_lsn == InvalidXLogRecPtr)
continue;
@@ -1012,6 +1023,8 @@ ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive)
if (s->data.database != dboid)
continue;
+ /* NB: intentionally counting invalidated slots */
+
/* count slots with spinlock held */
SpinLockAcquire(&s->mutex);
(*nslots)++;
@@ -1069,6 +1082,8 @@ restart:
if (s->data.database != dboid)
continue;
+ /* NB: intentionally including invalidated slots */
+
/* acquire slot, so ReplicationSlotDropAcquired can be reused */
SpinLockAcquire(&s->mutex);
/* can't change while ReplicationSlotControlLock is held */
@@ -1294,7 +1309,12 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, XLogRecPtr oldestLSN,
{
MyReplicationSlot = s;
s->active_pid = MyProcPid;
- s->data.invalidated_at = restart_lsn;
+ s->data.invalidated = RS_INVAL_WAL_REMOVED;
+
+ /*
+ * XXX: We should consider not overwriting restart_lsn and instead
+ * just rely on .invalidated.
+ */
s->data.restart_lsn = InvalidXLogRecPtr;
/* Let caller know */
diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
index 2f3c9648241..ad3e72be5ee 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -315,12 +315,10 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
nulls[i++] = true;
/*
- * If invalidated_at is valid and restart_lsn is invalid, we know for
- * certain that the slot has been invalidated. Otherwise, test
- * availability from restart_lsn.
+ * If the slot has not been invalidated, test availability from
+ * restart_lsn.
*/
- if (XLogRecPtrIsInvalid(slot_contents.data.restart_lsn) &&
- !XLogRecPtrIsInvalid(slot_contents.data.invalidated_at))
+ if (slot_contents.data.invalidated != RS_INVAL_NONE)
walstate = WALAVAIL_REMOVED;
else
walstate = GetWALAvailability(slot_contents.data.restart_lsn);
diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h
index 8872c80cdfe..34ce055dd50 100644
--- a/src/include/replication/slot.h
+++ b/src/include/replication/slot.h
@@ -38,6 +38,17 @@ typedef enum ReplicationSlotPersistency
} ReplicationSlotPersistency;
/*
+ * Slots can be invalidated, e.g. due to max_slot_wal_keep_size. If so, the
+ * 'invalidated' field is set to a value other than _NONE.
+ */
+typedef enum ReplicationSlotInvalidationCause
+{
+ RS_INVAL_NONE,
+ /* required WAL has been removed */
+ RS_INVAL_WAL_REMOVED,
+} ReplicationSlotInvalidationCause;
+
+/*
* On-Disk data of a replication slot, preserved across restarts.
*/
typedef struct ReplicationSlotPersistentData
@@ -72,8 +83,8 @@ typedef struct ReplicationSlotPersistentData
/* oldest LSN that might be required by this replication slot */
XLogRecPtr restart_lsn;
- /* restart_lsn is copied here when the slot is invalidated */
- XLogRecPtr invalidated_at;
+ /* RS_INVAL_NONE if valid, or the reason for having been invalidated */
+ ReplicationSlotInvalidationCause invalidated;
/*
* Oldest LSN that the client has acked receipt for. This is used as the
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index df960883c5c..b4058b88c3e 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -2339,6 +2339,7 @@ ReplicaIdentityStmt
ReplicationKind
ReplicationSlot
ReplicationSlotCtlData
+ReplicationSlotInvalidationCause
ReplicationSlotOnDisk
ReplicationSlotPersistency
ReplicationSlotPersistentData