aboutsummaryrefslogtreecommitdiff
path: root/src/backend/replication/slotfuncs.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/replication/slotfuncs.c')
-rw-r--r--src/backend/replication/slotfuncs.c40
1 files changed, 30 insertions, 10 deletions
diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
index 88033a79b21..9fe147bf44e 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -242,6 +242,7 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
Tuplestorestate *tupstore;
MemoryContext per_query_ctx;
MemoryContext oldcontext;
+ XLogRecPtr currlsn;
int slotno;
/* check to see if caller supports us returning a tuplestore */
@@ -274,6 +275,8 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
MemoryContextSwitchTo(oldcontext);
+ currlsn = GetXLogWriteRecPtr();
+
LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
for (slotno = 0; slotno < max_replication_slots; slotno++)
{
@@ -282,7 +285,6 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
Datum values[PG_GET_REPLICATION_SLOTS_COLS];
bool nulls[PG_GET_REPLICATION_SLOTS_COLS];
WALAvailability walstate;
- XLogSegNo last_removed_seg;
int i;
if (!slot->in_use)
@@ -380,6 +382,8 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
* we looked. If checkpointer signalled the process to
* termination, then it's definitely lost; but if a process is
* still alive, then "unreserved" seems more appropriate.
+ *
+ * If we do change it, save the state for safe_wal_size below.
*/
if (!XLogRecPtrIsInvalid(slot_contents.data.restart_lsn))
{
@@ -387,10 +391,12 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
SpinLockAcquire(&slot->mutex);
pid = slot->active_pid;
+ slot_contents.data.restart_lsn = slot->data.restart_lsn;
SpinLockRelease(&slot->mutex);
if (pid != 0)
{
values[i++] = CStringGetTextDatum("unreserved");
+ walstate = WALAVAIL_UNRESERVED;
break;
}
}
@@ -398,18 +404,32 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
break;
}
- if (max_slot_wal_keep_size_mb >= 0 &&
- (walstate == WALAVAIL_RESERVED || walstate == WALAVAIL_EXTENDED) &&
- ((last_removed_seg = XLogGetLastRemovedSegno()) != 0))
+ /*
+ * safe_wal_size is only computed for slots that have not been lost,
+ * and only if there's a configured maximum size.
+ */
+ if (walstate == WALAVAIL_REMOVED || max_slot_wal_keep_size_mb < 0)
+ nulls[i++] = true;
+ else
{
- XLogRecPtr min_safe_lsn;
+ XLogSegNo targetSeg;
+ XLogSegNo keepSegs;
+ XLogSegNo failSeg;
+ XLogRecPtr failLSN;
- XLogSegNoOffsetToRecPtr(last_removed_seg + 1, 0,
- wal_segment_size, min_safe_lsn);
- values[i++] = Int64GetDatum(min_safe_lsn);
+ XLByteToSeg(slot_contents.data.restart_lsn, targetSeg, wal_segment_size);
+
+ /* determine how many segments slots can be kept by slots ... */
+ keepSegs = XLogMBVarToSegs(max_slot_wal_keep_size_mb, wal_segment_size);
+ /* ... and override by wal_keep_segments as needed */
+ keepSegs = Max(keepSegs, wal_keep_segments);
+
+ /* if currpos reaches failLSN, we lose our segment */
+ failSeg = targetSeg + keepSegs + 1;
+ XLogSegNoOffsetToRecPtr(failSeg, 0, wal_segment_size, failLSN);
+
+ values[i++] = Int64GetDatum(failLSN - currlsn);
}
- else
- nulls[i++] = true;
Assert(i == PG_GET_REPLICATION_SLOTS_COLS);