aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/backend/replication/logical/logicalfuncs.c2
-rw-r--r--src/backend/replication/slot.c53
-rw-r--r--src/backend/replication/slotfuncs.c2
-rw-r--r--src/backend/replication/walsender.c4
-rw-r--r--src/include/replication/slot.h10
5 files changed, 18 insertions, 53 deletions
diff --git a/src/backend/replication/logical/logicalfuncs.c b/src/backend/replication/logical/logicalfuncs.c
index 01d354829b9..1f38c5b33ea 100644
--- a/src/backend/replication/logical/logicalfuncs.c
+++ b/src/backend/replication/logical/logicalfuncs.c
@@ -225,7 +225,7 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin
else
end_of_wal = GetXLogReplayRecPtr(&ThisTimeLineID);
- (void) ReplicationSlotAcquire(NameStr(*name), SAB_Error);
+ ReplicationSlotAcquire(NameStr(*name), true);
PG_TRY();
{
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index 5a0bad97f49..a9a06b9a38a 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -99,8 +99,6 @@ ReplicationSlot *MyReplicationSlot = NULL;
int max_replication_slots = 0; /* the maximum number of replication
* slots */
-static int ReplicationSlotAcquireInternal(ReplicationSlot *slot,
- const char *name, SlotAcquireBehavior behavior);
static void ReplicationSlotDropAcquired(void);
static void ReplicationSlotDropPtr(ReplicationSlot *slot);
@@ -374,34 +372,16 @@ SearchNamedReplicationSlot(const char *name, bool need_lock)
/*
* Find a previously created slot and mark it as used by this process.
*
- * The return value is only useful if behavior is SAB_Inquire, in which
- * it's zero if we successfully acquired the slot, -1 if the slot no longer
- * exists, or the PID of the owning process otherwise. If behavior is
- * SAB_Error, then trying to acquire an owned slot is an error.
- * If SAB_Block, we sleep until the slot is released by the owning process.
+ * An error is raised if nowait is true and the slot is currently in use. If
+ * nowait is false, we sleep until the slot is released by the owning process.
*/
-int
-ReplicationSlotAcquire(const char *name, SlotAcquireBehavior behavior)
-{
- return ReplicationSlotAcquireInternal(NULL, name, behavior);
-}
-
-/*
- * Mark the specified slot as used by this process.
- *
- * Only one of slot and name can be specified.
- * If slot == NULL, search for the slot with the given name.
- *
- * See comments about the return value in ReplicationSlotAcquire().
- */
-static int
-ReplicationSlotAcquireInternal(ReplicationSlot *slot, const char *name,
- SlotAcquireBehavior behavior)
+void
+ReplicationSlotAcquire(const char *name, bool nowait)
{
ReplicationSlot *s;
int active_pid;
- AssertArg((slot == NULL) ^ (name == NULL));
+ AssertArg(name != NULL);
retry:
Assert(MyReplicationSlot == NULL);
@@ -412,17 +392,15 @@ retry:
* Search for the slot with the specified name if the slot to acquire is
* not given. If the slot is not found, we either return -1 or error out.
*/
- s = slot ? slot : SearchNamedReplicationSlot(name, false);
+ s = SearchNamedReplicationSlot(name, false);
if (s == NULL || !s->in_use)
{
LWLockRelease(ReplicationSlotControlLock);
- if (behavior == SAB_Inquire)
- return -1;
ereport(ERROR,
(errcode(ERRCODE_UNDEFINED_OBJECT),
errmsg("replication slot \"%s\" does not exist",
- name ? name : NameStr(slot->data.name))));
+ name)));
}
/*
@@ -436,7 +414,7 @@ retry:
* (We may end up not sleeping, but we don't want to do this while
* holding the spinlock.)
*/
- if (behavior == SAB_Block)
+ if (!nowait)
ConditionVariablePrepareToSleep(&s->active_cv);
SpinLockAcquire(&s->mutex);
@@ -456,13 +434,11 @@ retry:
*/
if (active_pid != MyProcPid)
{
- if (behavior == SAB_Error)
+ if (!nowait)
ereport(ERROR,
(errcode(ERRCODE_OBJECT_IN_USE),
errmsg("replication slot \"%s\" is active for PID %d",
NameStr(s->data.name), active_pid)));
- else if (behavior == SAB_Inquire)
- return active_pid;
/* Wait here until we get signaled, and then restart */
ConditionVariableSleep(&s->active_cv,
@@ -470,7 +446,7 @@ retry:
ConditionVariableCancelSleep();
goto retry;
}
- else if (behavior == SAB_Block)
+ else if (!nowait)
ConditionVariableCancelSleep(); /* no sleep needed after all */
/* Let everybody know we've modified this slot */
@@ -478,9 +454,6 @@ retry:
/* We made this slot active, so it's ours now. */
MyReplicationSlot = s;
-
- /* success */
- return 0;
}
/*
@@ -588,7 +561,7 @@ ReplicationSlotDrop(const char *name, bool nowait)
{
Assert(MyReplicationSlot == NULL);
- (void) ReplicationSlotAcquire(name, nowait ? SAB_Error : SAB_Block);
+ ReplicationSlotAcquire(name, nowait);
ReplicationSlotDropAcquired();
}
@@ -1271,8 +1244,8 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlot *s, XLogRecPtr oldestLSN)
WAIT_EVENT_REPLICATION_SLOT_DROP);
/*
- * Re-acquire lock and start over; we expect to invalidate the slot
- * next time (unless another process acquires the slot in the
+ * Re-acquire lock and start over; we expect to invalidate the
+ * slot next time (unless another process acquires the slot in the
* meantime).
*/
LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
index e4e6632f82e..31e74d38322 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -639,7 +639,7 @@ pg_replication_slot_advance(PG_FUNCTION_ARGS)
moveto = Min(moveto, GetXLogReplayRecPtr(&ThisTimeLineID));
/* Acquire the slot so we "own" it */
- (void) ReplicationSlotAcquire(NameStr(*slotname), SAB_Error);
+ ReplicationSlotAcquire(NameStr(*slotname), true);
/* A slot whose restart_lsn has never been reserved cannot be advanced */
if (XLogRecPtrIsInvalid(MyReplicationSlot->data.restart_lsn))
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 109c723f4e1..32245363561 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -601,7 +601,7 @@ StartReplication(StartReplicationCmd *cmd)
if (cmd->slotname)
{
- (void) ReplicationSlotAcquire(cmd->slotname, SAB_Error);
+ ReplicationSlotAcquire(cmd->slotname, true);
if (SlotIsLogical(MyReplicationSlot))
ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
@@ -1137,7 +1137,7 @@ StartLogicalReplication(StartReplicationCmd *cmd)
Assert(!MyReplicationSlot);
- (void) ReplicationSlotAcquire(cmd->slotname, SAB_Error);
+ ReplicationSlotAcquire(cmd->slotname, true);
if (XLogRecPtrIsInvalid(MyReplicationSlot->data.restart_lsn))
ereport(ERROR,
diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h
index 357068403a1..2eb7e3a530d 100644
--- a/src/include/replication/slot.h
+++ b/src/include/replication/slot.h
@@ -37,14 +37,6 @@ typedef enum ReplicationSlotPersistency
RS_TEMPORARY
} ReplicationSlotPersistency;
-/* For ReplicationSlotAcquire, q.v. */
-typedef enum SlotAcquireBehavior
-{
- SAB_Error,
- SAB_Block,
- SAB_Inquire
-} SlotAcquireBehavior;
-
/*
* On-Disk data of a replication slot, preserved across restarts.
*/
@@ -208,7 +200,7 @@ extern void ReplicationSlotCreate(const char *name, bool db_specific,
extern void ReplicationSlotPersist(void);
extern void ReplicationSlotDrop(const char *name, bool nowait);
-extern int ReplicationSlotAcquire(const char *name, SlotAcquireBehavior behavior);
+extern void ReplicationSlotAcquire(const char *name, bool nowait);
extern void ReplicationSlotRelease(void);
extern void ReplicationSlotCleanup(void);
extern void ReplicationSlotSave(void);