diff options
-rw-r--r-- | src/backend/replication/slot.c | 221 |
1 files changed, 144 insertions, 77 deletions
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c index c88b803e5d0..5a0bad97f49 100644 --- a/src/backend/replication/slot.c +++ b/src/backend/replication/slot.c @@ -1161,116 +1161,183 @@ ReplicationSlotReserveWal(void) } /* - * Mark any slot that points to an LSN older than the given segment - * as invalid; it requires WAL that's about to be removed. + * Helper for InvalidateObsoleteReplicationSlots -- acquires the given slot + * and mark it invalid, if necessary and possible. * - * NB - this runs as part of checkpoint, so avoid raising errors if possible. + * Returns whether ReplicationSlotControlLock was released in the interim (and + * in that case we're not holding the lock at return, otherwise we are). + * + * This is inherently racy, because we release the LWLock + * for syscalls, so caller must restart if we return true. */ -void -InvalidateObsoleteReplicationSlots(XLogSegNo oldestSegno) +static bool +InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, XLogRecPtr oldestLSN) { - XLogRecPtr oldestLSN; - - XLogSegNoOffsetToRecPtr(oldestSegno, 0, wal_segment_size, oldestLSN); + int last_signaled_pid = 0; + bool released_lock = false; -restart: - LWLockAcquire(ReplicationSlotControlLock, LW_SHARED); - for (int i = 0; i < max_replication_slots; i++) + for (;;) { - ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i]; - XLogRecPtr restart_lsn = InvalidXLogRecPtr; + XLogRecPtr restart_lsn; NameData slotname; - int wspid; - int last_signaled_pid = 0; + int active_pid = 0; + + Assert(LWLockHeldByMeInMode(ReplicationSlotControlLock, LW_SHARED)); if (!s->in_use) - continue; + { + if (released_lock) + LWLockRelease(ReplicationSlotControlLock); + break; + } + /* + * Check if the slot needs to be invalidated. If it needs to be + * invalidated, and is not currently acquired, acquire it and mark it + * as having been invalidated. We do this with the spinlock held to + * avoid race conditions -- for example the restart_lsn could move + * forward, or the slot could be dropped. + */ SpinLockAcquire(&s->mutex); - slotname = s->data.name; + restart_lsn = s->data.restart_lsn; - SpinLockRelease(&s->mutex); + /* + * If the slot is already invalid or is fresh enough, we don't need to + * do anything. + */ if (XLogRecPtrIsInvalid(restart_lsn) || restart_lsn >= oldestLSN) - continue; - LWLockRelease(ReplicationSlotControlLock); - CHECK_FOR_INTERRUPTS(); + { + SpinLockRelease(&s->mutex); + if (released_lock) + LWLockRelease(ReplicationSlotControlLock); + break; + } + + slotname = s->data.name; + active_pid = s->active_pid; + + /* + * If the slot can be acquired, do so and mark it invalidated + * immediately. Otherwise we'll signal the owning process, below, and + * retry. + */ + if (active_pid == 0) + { + MyReplicationSlot = s; + s->active_pid = MyProcPid; + s->data.invalidated_at = restart_lsn; + s->data.restart_lsn = InvalidXLogRecPtr; + } - /* Get ready to sleep on the slot in case it is active */ - ConditionVariablePrepareToSleep(&s->active_cv); + SpinLockRelease(&s->mutex); - for (;;) + if (active_pid != 0) { /* - * Try to mark this slot as used by this process. - * - * Note that ReplicationSlotAcquireInternal(SAB_Inquire) should - * not cancel the prepared condition variable if this slot is - * active in other process. Because in this case we have to wait - * on that CV for the process owning the slot to be terminated, - * later. + * Prepare the sleep on the slot's condition variable before + * releasing the lock, to close a possible race condition if the + * slot is released before the sleep below. */ - wspid = ReplicationSlotAcquireInternal(s, NULL, SAB_Inquire); + ConditionVariablePrepareToSleep(&s->active_cv); - /* - * Exit the loop if we successfully acquired the slot or the slot - * was dropped during waiting for the owning process to be - * terminated. For example, the latter case is likely to happen - * when the slot is temporary because it's automatically dropped - * by the termination of the owning process. - */ - if (wspid <= 0) - break; + LWLockRelease(ReplicationSlotControlLock); + released_lock = true; /* - * Signal to terminate the process that owns the slot. + * Signal to terminate the process that owns the slot, if we + * haven't already signalled it. (Avoidance of repeated + * signalling is the only reason for there to be a loop in this + * routine; otherwise we could rely on caller's restart loop.) * - * There is the race condition where other process may own the - * slot after the process using it was terminated and before this - * process owns it. To handle this case, we signal again if the - * PID of the owning process is changed than the last. - * - * XXX This logic assumes that the same PID is not reused very - * quickly. + * There is the race condition that other process may own the slot + * after its current owner process is terminated and before this + * process owns it. To handle that, we signal only if the PID of + * the owning process has changed from the previous time. (This + * logic assumes that the same PID is not reused very quickly.) */ - if (last_signaled_pid != wspid) + if (last_signaled_pid != active_pid) { ereport(LOG, - (errmsg("terminating process %d because replication slot \"%s\" is too far behind", - wspid, NameStr(slotname)))); - (void) kill(wspid, SIGTERM); - last_signaled_pid = wspid; + (errmsg("terminating process %d to release replication slot \"%s\"", + active_pid, NameStr(slotname)))); + + (void) kill(active_pid, SIGTERM); + last_signaled_pid = active_pid; } - ConditionVariableTimedSleep(&s->active_cv, 10, - WAIT_EVENT_REPLICATION_SLOT_DROP); + /* Wait until the slot is released. */ + ConditionVariableSleep(&s->active_cv, + WAIT_EVENT_REPLICATION_SLOT_DROP); + + /* + * Re-acquire lock and start over; we expect to invalidate the slot + * next time (unless another process acquires the slot in the + * meantime). + */ + LWLockAcquire(ReplicationSlotControlLock, LW_SHARED); + continue; } - ConditionVariableCancelSleep(); + else + { + /* + * We hold the slot now and have already invalidated it; flush it + * to ensure that state persists. + * + * Don't want to hold ReplicationSlotControlLock across file + * system operations, so release it now but be sure to tell caller + * to restart from scratch. + */ + LWLockRelease(ReplicationSlotControlLock); + released_lock = true; - /* - * Do nothing here and start from scratch if the slot has already been - * dropped. - */ - if (wspid == -1) - goto restart; + /* Make sure the invalidated state persists across server restart */ + ReplicationSlotMarkDirty(); + ReplicationSlotSave(); + ReplicationSlotRelease(); - ereport(LOG, - (errmsg("invalidating slot \"%s\" because its restart_lsn %X/%X exceeds max_slot_wal_keep_size", - NameStr(slotname), - LSN_FORMAT_ARGS(restart_lsn)))); + ereport(LOG, + (errmsg("invalidating slot \"%s\" because its restart_lsn %X/%X exceeds max_slot_wal_keep_size", + NameStr(slotname), + LSN_FORMAT_ARGS(restart_lsn)))); - SpinLockAcquire(&s->mutex); - s->data.invalidated_at = s->data.restart_lsn; - s->data.restart_lsn = InvalidXLogRecPtr; - SpinLockRelease(&s->mutex); + /* done with this slot for now */ + break; + } + } - /* Make sure the invalidated state persists across server restart */ - ReplicationSlotMarkDirty(); - ReplicationSlotSave(); - ReplicationSlotRelease(); + Assert(released_lock == !LWLockHeldByMe(ReplicationSlotControlLock)); - /* if we did anything, start from scratch */ - goto restart; + return released_lock; +} + +/* + * Mark any slot that points to an LSN older than the given segment + * as invalid; it requires WAL that's about to be removed. + * + * NB - this runs as part of checkpoint, so avoid raising errors if possible. + */ +void +InvalidateObsoleteReplicationSlots(XLogSegNo oldestSegno) +{ + XLogRecPtr oldestLSN; + + XLogSegNoOffsetToRecPtr(oldestSegno, 0, wal_segment_size, oldestLSN); + +restart: + LWLockAcquire(ReplicationSlotControlLock, LW_SHARED); + for (int i = 0; i < max_replication_slots; i++) + { + ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i]; + + if (!s->in_use) + continue; + + if (InvalidatePossiblyObsoleteSlot(s, oldestLSN)) + { + /* if the lock was released, start from scratch */ + goto restart; + } } LWLockRelease(ReplicationSlotControlLock); } |