aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorAlvaro Herrera <alvherre@alvh.no-ip.org>2017-07-25 13:26:49 -0400
committerAlvaro Herrera <alvherre@alvh.no-ip.org>2017-07-25 13:26:49 -0400
commit9915de6c1cb2c9b87f5f504c97832cdf3a809753 (patch)
treea13c697f0eaa0b518c92b0384afec2752a8c762b /src
parent4132dbec69dd4d437e132e57a74a98a40cdcf776 (diff)
downloadpostgresql-9915de6c1cb2c9b87f5f504c97832cdf3a809753.tar.gz
postgresql-9915de6c1cb2c9b87f5f504c97832cdf3a809753.zip
Fix race conditions in replication slot operations
It is relatively easy to get a replication slot to look as still active while one process is in the process of getting rid of it; when some other process tries to "acquire" the slot, it would fail with an error message of "replication slot XYZ is active for PID N". The error message in itself is fine, except that when the intention is to drop the slot, it is unhelpful: the useful behavior would be to wait until the slot is no longer acquired, so that the drop can proceed. To implement this, we use a condition variable so that slot acquisition can be told to wait on that condition variable if the slot is already acquired, and we make any change in active_pid broadcast a signal on the condition variable. Thus, as soon as the slot is released, the drop will proceed properly. Reported by: Tom Lane Discussion: https://postgr.es/m/11904.1499039688@sss.pgh.pa.us Authors: Petr Jelínek, Álvaro Herrera
Diffstat (limited to 'src')
-rw-r--r--src/backend/replication/logical/logicalfuncs.c2
-rw-r--r--src/backend/replication/slot.c122
-rw-r--r--src/backend/replication/slotfuncs.c34
-rw-r--r--src/backend/replication/walsender.c6
-rw-r--r--src/include/replication/slot.h10
5 files changed, 118 insertions, 56 deletions
diff --git a/src/backend/replication/logical/logicalfuncs.c b/src/backend/replication/logical/logicalfuncs.c
index 363ca82cb0b..a3ba2b1266c 100644
--- a/src/backend/replication/logical/logicalfuncs.c
+++ b/src/backend/replication/logical/logicalfuncs.c
@@ -244,7 +244,7 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin
else
end_of_wal = GetXLogReplayRecPtr(&ThisTimeLineID);
- ReplicationSlotAcquire(NameStr(*name));
+ ReplicationSlotAcquire(NameStr(*name), true);
PG_TRY();
{
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index dc7de20e113..08c0b1b285f 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -157,6 +157,7 @@ ReplicationSlotsShmemInit(void)
/* everything else is zeroed by the memset above */
SpinLockInit(&slot->mutex);
LWLockInitialize(&slot->io_in_progress_lock, LWTRANCHE_REPLICATION_SLOT_IO_IN_PROGRESS);
+ ConditionVariableInit(&slot->active_cv);
}
}
}
@@ -313,25 +314,35 @@ ReplicationSlotCreate(const char *name, bool db_specific,
LWLockRelease(ReplicationSlotControlLock);
/*
- * Now that the slot has been marked as in_use and in_active, it's safe to
+ * Now that the slot has been marked as in_use and active, it's safe to
* let somebody else try to allocate a slot.
*/
LWLockRelease(ReplicationSlotAllocationLock);
+
+ /* Let everybody know we've modified this slot */
+ ConditionVariableBroadcast(&slot->active_cv);
}
/*
* Find a previously created slot and mark it as used by this backend.
*/
void
-ReplicationSlotAcquire(const char *name)
+ReplicationSlotAcquire(const char *name, bool nowait)
{
- ReplicationSlot *slot = NULL;
+ ReplicationSlot *slot;
+ int active_pid;
int i;
- int active_pid = 0; /* Keep compiler quiet */
+retry:
Assert(MyReplicationSlot == NULL);
- /* Search for the named slot and mark it active if we find it. */
+ /*
+ * Search for the named slot and mark it active if we find it. If the
+ * slot is already active, we exit the loop with active_pid set to the PID
+ * of the backend that owns it.
+ */
+ active_pid = 0;
+ slot = NULL;
LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
for (i = 0; i < max_replication_slots; i++)
{
@@ -339,35 +350,66 @@ ReplicationSlotAcquire(const char *name)
if (s->in_use && strcmp(name, NameStr(s->data.name)) == 0)
{
+ /*
+ * This is the slot we want. We don't know yet if it's active,
+ * so get ready to sleep on it in case it is. (We may end up not
+ * sleeping, but we don't want to do this while holding the
+ * spinlock.)
+ */
+ ConditionVariablePrepareToSleep(&s->active_cv);
+
SpinLockAcquire(&s->mutex);
+
active_pid = s->active_pid;
if (active_pid == 0)
active_pid = s->active_pid = MyProcPid;
+
SpinLockRelease(&s->mutex);
slot = s;
+
break;
}
}
LWLockRelease(ReplicationSlotControlLock);
- /* If we did not find the slot or it was already active, error out. */
+ /* If we did not find the slot, error out. */
if (slot == NULL)
ereport(ERROR,
(errcode(ERRCODE_UNDEFINED_OBJECT),
errmsg("replication slot \"%s\" does not exist", name)));
+
+ /*
+ * If we found the slot but it's already active in another backend, we
+ * either error out or retry after a short wait, as caller specified.
+ */
if (active_pid != MyProcPid)
- ereport(ERROR,
- (errcode(ERRCODE_OBJECT_IN_USE),
- errmsg("replication slot \"%s\" is active for PID %d",
- name, active_pid)));
+ {
+ if (nowait)
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_IN_USE),
+ errmsg("replication slot \"%s\" is active for PID %d",
+ name, active_pid)));
+
+ /* Wait here until we get signaled, and then restart */
+ ConditionVariableSleep(&slot->active_cv, PG_WAIT_LOCK);
+ ConditionVariableCancelSleep();
+ goto retry;
+ }
+ else
+ ConditionVariableCancelSleep(); /* no sleep needed after all */
+
+ /* Let everybody know we've modified this slot */
+ ConditionVariableBroadcast(&slot->active_cv);
/* We made this slot active, so it's ours now. */
MyReplicationSlot = slot;
}
/*
- * Release a replication slot, this or another backend can ReAcquire it
- * later. Resources this slot requires will be preserved.
+ * Release the replication slot that this backend considers to own.
+ *
+ * This or another backend can re-acquire the slot later.
+ * Resources this slot requires will be preserved.
*/
void
ReplicationSlotRelease(void)
@@ -385,17 +427,6 @@ ReplicationSlotRelease(void)
*/
ReplicationSlotDropAcquired();
}
- else if (slot->data.persistency == RS_PERSISTENT)
- {
- /*
- * Mark persistent slot inactive. We're not freeing it, just
- * disconnecting.
- */
- SpinLockAcquire(&slot->mutex);
- slot->active_pid = 0;
- SpinLockRelease(&slot->mutex);
- }
-
/*
* If slot needed to temporarily restrain both data and catalog xmin to
@@ -412,6 +443,18 @@ ReplicationSlotRelease(void)
ReplicationSlotsComputeRequiredXmin(false);
}
+ if (slot->data.persistency == RS_PERSISTENT)
+ {
+ /*
+ * Mark persistent slot inactive. We're not freeing it, just
+ * disconnecting, but wake up others that may be waiting for it.
+ */
+ SpinLockAcquire(&slot->mutex);
+ slot->active_pid = 0;
+ SpinLockRelease(&slot->mutex);
+ ConditionVariableBroadcast(&slot->active_cv);
+ }
+
MyReplicationSlot = NULL;
/* might not have been set when we've been a plain slot */
@@ -430,32 +473,43 @@ ReplicationSlotCleanup(void)
Assert(MyReplicationSlot == NULL);
- /*
- * No need for locking as we are only interested in slots active in
- * current process and those are not touched by other processes.
- */
+restart:
+ LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
for (i = 0; i < max_replication_slots; i++)
{
ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
+ if (!s->in_use)
+ continue;
+
+ SpinLockAcquire(&s->mutex);
if (s->active_pid == MyProcPid)
{
- Assert(s->in_use && s->data.persistency == RS_TEMPORARY);
+ Assert(s->data.persistency == RS_TEMPORARY);
+ SpinLockRelease(&s->mutex);
+ LWLockRelease(ReplicationSlotControlLock); /* avoid deadlock */
ReplicationSlotDropPtr(s);
+
+ ConditionVariableBroadcast(&s->active_cv);
+ goto restart;
}
+ else
+ SpinLockRelease(&s->mutex);
}
+
+ LWLockRelease(ReplicationSlotControlLock);
}
/*
* Permanently drop replication slot identified by the passed in name.
*/
void
-ReplicationSlotDrop(const char *name)
+ReplicationSlotDrop(const char *name, bool nowait)
{
Assert(MyReplicationSlot == NULL);
- ReplicationSlotAcquire(name);
+ ReplicationSlotAcquire(name, nowait);
ReplicationSlotDropAcquired();
}
@@ -527,6 +581,9 @@ ReplicationSlotDropPtr(ReplicationSlot *slot)
slot->active_pid = 0;
SpinLockRelease(&slot->mutex);
+ /* wake up anyone waiting on this slot */
+ ConditionVariableBroadcast(&slot->active_cv);
+
ereport(fail_softly ? WARNING : ERROR,
(errcode_for_file_access(),
errmsg("could not rename file \"%s\" to \"%s\": %m",
@@ -535,15 +592,18 @@ ReplicationSlotDropPtr(ReplicationSlot *slot)
/*
* The slot is definitely gone. Lock out concurrent scans of the array
- * long enough to kill it. It's OK to clear the active flag here without
+ * long enough to kill it. It's OK to clear the active PID here without
* grabbing the mutex because nobody else can be scanning the array here,
* and nobody can be attached to this slot and thus access it without
* scanning the array.
+ *
+ * Also wake up processes waiting for it.
*/
LWLockAcquire(ReplicationSlotControlLock, LW_EXCLUSIVE);
slot->active_pid = 0;
slot->in_use = false;
LWLockRelease(ReplicationSlotControlLock);
+ ConditionVariableBroadcast(&slot->active_cv);
/*
* Slot is dead and doesn't prevent resource removal anymore, recompute
diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
index 6dc808874d6..d4cbd83bde1 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -171,7 +171,7 @@ pg_drop_replication_slot(PG_FUNCTION_ARGS)
CheckSlotRequirements();
- ReplicationSlotDrop(NameStr(*name));
+ ReplicationSlotDrop(NameStr(*name), false);
PG_RETURN_VOID();
}
@@ -221,6 +221,7 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
MemoryContextSwitchTo(oldcontext);
+ LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
for (slotno = 0; slotno < max_replication_slots; slotno++)
{
ReplicationSlot *slot = &ReplicationSlotCtl->replication_slots[slotno];
@@ -238,25 +239,21 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
NameData plugin;
int i;
- SpinLockAcquire(&slot->mutex);
if (!slot->in_use)
- {
- SpinLockRelease(&slot->mutex);
continue;
- }
- else
- {
- xmin = slot->data.xmin;
- catalog_xmin = slot->data.catalog_xmin;
- database = slot->data.database;
- restart_lsn = slot->data.restart_lsn;
- confirmed_flush_lsn = slot->data.confirmed_flush;
- namecpy(&slot_name, &slot->data.name);
- namecpy(&plugin, &slot->data.plugin);
-
- active_pid = slot->active_pid;
- persistency = slot->data.persistency;
- }
+
+ SpinLockAcquire(&slot->mutex);
+
+ xmin = slot->data.xmin;
+ catalog_xmin = slot->data.catalog_xmin;
+ database = slot->data.database;
+ restart_lsn = slot->data.restart_lsn;
+ confirmed_flush_lsn = slot->data.confirmed_flush;
+ namecpy(&slot_name, &slot->data.name);
+ namecpy(&plugin, &slot->data.plugin);
+ active_pid = slot->active_pid;
+ persistency = slot->data.persistency;
+
SpinLockRelease(&slot->mutex);
memset(nulls, 0, sizeof(nulls));
@@ -309,6 +306,7 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
tuplestore_putvalues(tupstore, tupdesc, values, nulls);
}
+ LWLockRelease(ReplicationSlotControlLock);
tuplestore_donestoring(tupstore);
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 002143b26a2..9a2babef1e6 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -541,7 +541,7 @@ StartReplication(StartReplicationCmd *cmd)
if (cmd->slotname)
{
- ReplicationSlotAcquire(cmd->slotname);
+ ReplicationSlotAcquire(cmd->slotname, true);
if (SlotIsLogical(MyReplicationSlot))
ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
@@ -1028,7 +1028,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
static void
DropReplicationSlot(DropReplicationSlotCmd *cmd)
{
- ReplicationSlotDrop(cmd->slotname);
+ ReplicationSlotDrop(cmd->slotname, false);
EndCommand("DROP_REPLICATION_SLOT", DestRemote);
}
@@ -1046,7 +1046,7 @@ StartLogicalReplication(StartReplicationCmd *cmd)
Assert(!MyReplicationSlot);
- ReplicationSlotAcquire(cmd->slotname);
+ ReplicationSlotAcquire(cmd->slotname, true);
/*
* Force a disconnect, so that the decoding code doesn't need to care
diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h
index a283f4e2b86..0bf2611fe9c 100644
--- a/src/include/replication/slot.h
+++ b/src/include/replication/slot.h
@@ -12,6 +12,7 @@
#include "fmgr.h"
#include "access/xlog.h"
#include "access/xlogreader.h"
+#include "storage/condition_variable.h"
#include "storage/lwlock.h"
#include "storage/shmem.h"
#include "storage/spin.h"
@@ -19,7 +20,7 @@
/*
* Behaviour of replication slots, upon release or crash.
*
- * Slots marked as PERSISTENT are crashsafe and will not be dropped when
+ * Slots marked as PERSISTENT are crash-safe and will not be dropped when
* released. Slots marked as EPHEMERAL will be dropped when released or after
* restarts.
*
@@ -117,6 +118,9 @@ typedef struct ReplicationSlot
/* is somebody performing io on this slot? */
LWLock io_in_progress_lock;
+ /* Condition variable signalled when active_pid changes */
+ ConditionVariable active_cv;
+
/* all the remaining data is only used for logical slots */
/*
@@ -162,9 +166,9 @@ extern void ReplicationSlotsShmemInit(void);
extern void ReplicationSlotCreate(const char *name, bool db_specific,
ReplicationSlotPersistency p);
extern void ReplicationSlotPersist(void);
-extern void ReplicationSlotDrop(const char *name);
+extern void ReplicationSlotDrop(const char *name, bool nowait);
-extern void ReplicationSlotAcquire(const char *name);
+extern void ReplicationSlotAcquire(const char *name, bool nowait);
extern void ReplicationSlotRelease(void);
extern void ReplicationSlotCleanup(void);
extern void ReplicationSlotSave(void);