aboutsummaryrefslogtreecommitdiff
path: root/src/backend
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend')
-rw-r--r--src/backend/replication/slot.c11
-rw-r--r--src/backend/replication/slotfuncs.c103
2 files changed, 55 insertions, 59 deletions
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index e1a2d8a7ae7..505445f2dc8 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -1099,7 +1099,7 @@ restart:
{
ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
XLogRecPtr restart_lsn = InvalidXLogRecPtr;
- char *slotname;
+ NameData slotname;
if (!s->in_use)
continue;
@@ -1112,7 +1112,7 @@ restart:
continue;
}
- slotname = pstrdup(NameStr(s->data.name));
+ slotname = s->data.name;
restart_lsn = s->data.restart_lsn;
SpinLockRelease(&s->mutex);
@@ -1120,7 +1120,8 @@ restart:
for (;;)
{
- int wspid = ReplicationSlotAcquire(slotname, SAB_Inquire);
+ int wspid = ReplicationSlotAcquire(NameStr(slotname),
+ SAB_Inquire);
/* no walsender? success! */
if (wspid == 0)
@@ -1128,7 +1129,7 @@ restart:
ereport(LOG,
(errmsg("terminating walsender %d because replication slot \"%s\" is too far behind",
- wspid, slotname)));
+ wspid, NameStr(slotname))));
(void) kill(wspid, SIGTERM);
ConditionVariableTimedSleep(&s->active_cv, 10,
@@ -1138,7 +1139,7 @@ restart:
ereport(LOG,
(errmsg("invalidating slot \"%s\" because its restart_lsn %X/%X exceeds max_slot_wal_keep_size",
- slotname,
+ NameStr(slotname),
(uint32) (restart_lsn >> 32),
(uint32) restart_lsn)));
diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
index 6fed3cfd23b..1b929a603e5 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -278,18 +278,9 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
for (slotno = 0; slotno < max_replication_slots; slotno++)
{
ReplicationSlot *slot = &ReplicationSlotCtl->replication_slots[slotno];
+ ReplicationSlot slot_contents;
Datum values[PG_GET_REPLICATION_SLOTS_COLS];
bool nulls[PG_GET_REPLICATION_SLOTS_COLS];
-
- ReplicationSlotPersistency persistency;
- TransactionId xmin;
- TransactionId catalog_xmin;
- XLogRecPtr restart_lsn;
- XLogRecPtr confirmed_flush_lsn;
- pid_t active_pid;
- Oid database;
- NameData slot_name;
- NameData plugin;
WALAvailability walstate;
XLogSegNo last_removed_seg;
int i;
@@ -297,69 +288,61 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
if (!slot->in_use)
continue;
+ /* Copy slot contents while holding spinlock, then examine at leisure */
SpinLockAcquire(&slot->mutex);
-
- xmin = slot->data.xmin;
- catalog_xmin = slot->data.catalog_xmin;
- database = slot->data.database;
- restart_lsn = slot->data.restart_lsn;
- confirmed_flush_lsn = slot->data.confirmed_flush;
- namecpy(&slot_name, &slot->data.name);
- namecpy(&plugin, &slot->data.plugin);
- active_pid = slot->active_pid;
- persistency = slot->data.persistency;
-
+ slot_contents = *slot;
SpinLockRelease(&slot->mutex);
+ memset(values, 0, sizeof(values));
memset(nulls, 0, sizeof(nulls));
i = 0;
- values[i++] = NameGetDatum(&slot_name);
+ values[i++] = NameGetDatum(&slot_contents.data.name);
- if (database == InvalidOid)
+ if (slot_contents.data.database == InvalidOid)
nulls[i++] = true;
else
- values[i++] = NameGetDatum(&plugin);
+ values[i++] = NameGetDatum(&slot_contents.data.plugin);
- if (database == InvalidOid)
+ if (slot_contents.data.database == InvalidOid)
values[i++] = CStringGetTextDatum("physical");
else
values[i++] = CStringGetTextDatum("logical");
- if (database == InvalidOid)
+ if (slot_contents.data.database == InvalidOid)
nulls[i++] = true;
else
- values[i++] = database;
+ values[i++] = ObjectIdGetDatum(slot_contents.data.database);
- values[i++] = BoolGetDatum(persistency == RS_TEMPORARY);
- values[i++] = BoolGetDatum(active_pid != 0);
+ values[i++] = BoolGetDatum(slot_contents.data.persistency == RS_TEMPORARY);
+ values[i++] = BoolGetDatum(slot_contents.active_pid != 0);
- if (active_pid != 0)
- values[i++] = Int32GetDatum(active_pid);
+ if (slot_contents.active_pid != 0)
+ values[i++] = Int32GetDatum(slot_contents.active_pid);
else
nulls[i++] = true;
- if (xmin != InvalidTransactionId)
- values[i++] = TransactionIdGetDatum(xmin);
+ if (slot_contents.data.xmin != InvalidTransactionId)
+ values[i++] = TransactionIdGetDatum(slot_contents.data.xmin);
else
nulls[i++] = true;
- if (catalog_xmin != InvalidTransactionId)
- values[i++] = TransactionIdGetDatum(catalog_xmin);
+ if (slot_contents.data.catalog_xmin != InvalidTransactionId)
+ values[i++] = TransactionIdGetDatum(slot_contents.data.catalog_xmin);
else
nulls[i++] = true;
- if (restart_lsn != InvalidXLogRecPtr)
- values[i++] = LSNGetDatum(restart_lsn);
+ if (slot_contents.data.restart_lsn != InvalidXLogRecPtr)
+ values[i++] = LSNGetDatum(slot_contents.data.restart_lsn);
else
nulls[i++] = true;
- if (confirmed_flush_lsn != InvalidXLogRecPtr)
- values[i++] = LSNGetDatum(confirmed_flush_lsn);
+ if (slot_contents.data.confirmed_flush != InvalidXLogRecPtr)
+ values[i++] = LSNGetDatum(slot_contents.data.confirmed_flush);
else
nulls[i++] = true;
- walstate = GetWALAvailability(restart_lsn);
+ walstate = GetWALAvailability(slot_contents.data.restart_lsn);
switch (walstate)
{
@@ -378,6 +361,9 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
case WALAVAIL_REMOVED:
values[i++] = CStringGetTextDatum("lost");
break;
+
+ default:
+ elog(ERROR, "invalid walstate: %d", (int) walstate);
}
if (max_slot_wal_keep_size_mb >= 0 &&
@@ -393,8 +379,11 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
else
nulls[i++] = true;
+ Assert(i == PG_GET_REPLICATION_SLOTS_COLS);
+
tuplestore_putvalues(tupstore, tupdesc, values, nulls);
}
+
LWLockRelease(ReplicationSlotControlLock);
tuplestore_donestoring(tupstore);
@@ -653,6 +642,8 @@ copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot)
Name src_name = PG_GETARG_NAME(0);
Name dst_name = PG_GETARG_NAME(1);
ReplicationSlot *src = NULL;
+ ReplicationSlot first_slot_contents;
+ ReplicationSlot second_slot_contents;
XLogRecPtr src_restart_lsn;
bool src_islogical;
bool temporary;
@@ -692,13 +683,10 @@ copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot)
if (s->in_use && strcmp(NameStr(s->data.name), NameStr(*src_name)) == 0)
{
+ /* Copy the slot contents while holding spinlock */
SpinLockAcquire(&s->mutex);
- src_islogical = SlotIsLogical(s);
- src_restart_lsn = s->data.restart_lsn;
- temporary = s->data.persistency == RS_TEMPORARY;
- plugin = logical_slot ? pstrdup(NameStr(s->data.plugin)) : NULL;
+ first_slot_contents = *s;
SpinLockRelease(&s->mutex);
-
src = s;
break;
}
@@ -711,6 +699,11 @@ copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot)
(errcode(ERRCODE_UNDEFINED_OBJECT),
errmsg("replication slot \"%s\" does not exist", NameStr(*src_name))));
+ src_islogical = SlotIsLogical(&first_slot_contents);
+ src_restart_lsn = first_slot_contents.data.restart_lsn;
+ temporary = (first_slot_contents.data.persistency == RS_TEMPORARY);
+ plugin = logical_slot ? NameStr(first_slot_contents.data.plugin) : NULL;
+
/* Check type of replication slot */
if (src_islogical != logical_slot)
ereport(ERROR,
@@ -775,18 +768,20 @@ copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot)
/* Copy data of source slot again */
SpinLockAcquire(&src->mutex);
- copy_effective_xmin = src->effective_xmin;
- copy_effective_catalog_xmin = src->effective_catalog_xmin;
+ second_slot_contents = *src;
+ SpinLockRelease(&src->mutex);
- copy_xmin = src->data.xmin;
- copy_catalog_xmin = src->data.catalog_xmin;
- copy_restart_lsn = src->data.restart_lsn;
- copy_confirmed_flush = src->data.confirmed_flush;
+ copy_effective_xmin = second_slot_contents.effective_xmin;
+ copy_effective_catalog_xmin = second_slot_contents.effective_catalog_xmin;
+
+ copy_xmin = second_slot_contents.data.xmin;
+ copy_catalog_xmin = second_slot_contents.data.catalog_xmin;
+ copy_restart_lsn = second_slot_contents.data.restart_lsn;
+ copy_confirmed_flush = second_slot_contents.data.confirmed_flush;
/* for existence check */
- copy_name = pstrdup(NameStr(src->data.name));
- copy_islogical = SlotIsLogical(src);
- SpinLockRelease(&src->mutex);
+ copy_name = NameStr(second_slot_contents.data.name);
+ copy_islogical = SlotIsLogical(&second_slot_contents);
/*
* Check if the source slot still exists and is valid. We regard it as