diff options
Diffstat (limited to 'src/backend')
-rw-r--r-- | src/backend/replication/slot.c | 11 | ||||
-rw-r--r-- | src/backend/replication/slotfuncs.c | 103 |
2 files changed, 55 insertions, 59 deletions
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c index e1a2d8a7ae7..505445f2dc8 100644 --- a/src/backend/replication/slot.c +++ b/src/backend/replication/slot.c @@ -1099,7 +1099,7 @@ restart: { ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i]; XLogRecPtr restart_lsn = InvalidXLogRecPtr; - char *slotname; + NameData slotname; if (!s->in_use) continue; @@ -1112,7 +1112,7 @@ restart: continue; } - slotname = pstrdup(NameStr(s->data.name)); + slotname = s->data.name; restart_lsn = s->data.restart_lsn; SpinLockRelease(&s->mutex); @@ -1120,7 +1120,8 @@ restart: for (;;) { - int wspid = ReplicationSlotAcquire(slotname, SAB_Inquire); + int wspid = ReplicationSlotAcquire(NameStr(slotname), + SAB_Inquire); /* no walsender? success! */ if (wspid == 0) @@ -1128,7 +1129,7 @@ restart: ereport(LOG, (errmsg("terminating walsender %d because replication slot \"%s\" is too far behind", - wspid, slotname))); + wspid, NameStr(slotname)))); (void) kill(wspid, SIGTERM); ConditionVariableTimedSleep(&s->active_cv, 10, @@ -1138,7 +1139,7 @@ restart: ereport(LOG, (errmsg("invalidating slot \"%s\" because its restart_lsn %X/%X exceeds max_slot_wal_keep_size", - slotname, + NameStr(slotname), (uint32) (restart_lsn >> 32), (uint32) restart_lsn))); diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c index 6fed3cfd23b..1b929a603e5 100644 --- a/src/backend/replication/slotfuncs.c +++ b/src/backend/replication/slotfuncs.c @@ -278,18 +278,9 @@ pg_get_replication_slots(PG_FUNCTION_ARGS) for (slotno = 0; slotno < max_replication_slots; slotno++) { ReplicationSlot *slot = &ReplicationSlotCtl->replication_slots[slotno]; + ReplicationSlot slot_contents; Datum values[PG_GET_REPLICATION_SLOTS_COLS]; bool nulls[PG_GET_REPLICATION_SLOTS_COLS]; - - ReplicationSlotPersistency persistency; - TransactionId xmin; - TransactionId catalog_xmin; - XLogRecPtr restart_lsn; - XLogRecPtr confirmed_flush_lsn; - pid_t active_pid; - Oid database; - NameData slot_name; - NameData plugin; WALAvailability walstate; XLogSegNo last_removed_seg; int i; @@ -297,69 +288,61 @@ pg_get_replication_slots(PG_FUNCTION_ARGS) if (!slot->in_use) continue; + /* Copy slot contents while holding spinlock, then examine at leisure */ SpinLockAcquire(&slot->mutex); - - xmin = slot->data.xmin; - catalog_xmin = slot->data.catalog_xmin; - database = slot->data.database; - restart_lsn = slot->data.restart_lsn; - confirmed_flush_lsn = slot->data.confirmed_flush; - namecpy(&slot_name, &slot->data.name); - namecpy(&plugin, &slot->data.plugin); - active_pid = slot->active_pid; - persistency = slot->data.persistency; - + slot_contents = *slot; SpinLockRelease(&slot->mutex); + memset(values, 0, sizeof(values)); memset(nulls, 0, sizeof(nulls)); i = 0; - values[i++] = NameGetDatum(&slot_name); + values[i++] = NameGetDatum(&slot_contents.data.name); - if (database == InvalidOid) + if (slot_contents.data.database == InvalidOid) nulls[i++] = true; else - values[i++] = NameGetDatum(&plugin); + values[i++] = NameGetDatum(&slot_contents.data.plugin); - if (database == InvalidOid) + if (slot_contents.data.database == InvalidOid) values[i++] = CStringGetTextDatum("physical"); else values[i++] = CStringGetTextDatum("logical"); - if (database == InvalidOid) + if (slot_contents.data.database == InvalidOid) nulls[i++] = true; else - values[i++] = database; + values[i++] = ObjectIdGetDatum(slot_contents.data.database); - values[i++] = BoolGetDatum(persistency == RS_TEMPORARY); - values[i++] = BoolGetDatum(active_pid != 0); + values[i++] = BoolGetDatum(slot_contents.data.persistency == RS_TEMPORARY); + values[i++] = BoolGetDatum(slot_contents.active_pid != 0); - if (active_pid != 0) - values[i++] = Int32GetDatum(active_pid); + if (slot_contents.active_pid != 0) + values[i++] = Int32GetDatum(slot_contents.active_pid); else nulls[i++] = true; - if (xmin != InvalidTransactionId) - values[i++] = TransactionIdGetDatum(xmin); + if (slot_contents.data.xmin != InvalidTransactionId) + values[i++] = TransactionIdGetDatum(slot_contents.data.xmin); else nulls[i++] = true; - if (catalog_xmin != InvalidTransactionId) - values[i++] = TransactionIdGetDatum(catalog_xmin); + if (slot_contents.data.catalog_xmin != InvalidTransactionId) + values[i++] = TransactionIdGetDatum(slot_contents.data.catalog_xmin); else nulls[i++] = true; - if (restart_lsn != InvalidXLogRecPtr) - values[i++] = LSNGetDatum(restart_lsn); + if (slot_contents.data.restart_lsn != InvalidXLogRecPtr) + values[i++] = LSNGetDatum(slot_contents.data.restart_lsn); else nulls[i++] = true; - if (confirmed_flush_lsn != InvalidXLogRecPtr) - values[i++] = LSNGetDatum(confirmed_flush_lsn); + if (slot_contents.data.confirmed_flush != InvalidXLogRecPtr) + values[i++] = LSNGetDatum(slot_contents.data.confirmed_flush); else nulls[i++] = true; - walstate = GetWALAvailability(restart_lsn); + walstate = GetWALAvailability(slot_contents.data.restart_lsn); switch (walstate) { @@ -378,6 +361,9 @@ pg_get_replication_slots(PG_FUNCTION_ARGS) case WALAVAIL_REMOVED: values[i++] = CStringGetTextDatum("lost"); break; + + default: + elog(ERROR, "invalid walstate: %d", (int) walstate); } if (max_slot_wal_keep_size_mb >= 0 && @@ -393,8 +379,11 @@ pg_get_replication_slots(PG_FUNCTION_ARGS) else nulls[i++] = true; + Assert(i == PG_GET_REPLICATION_SLOTS_COLS); + tuplestore_putvalues(tupstore, tupdesc, values, nulls); } + LWLockRelease(ReplicationSlotControlLock); tuplestore_donestoring(tupstore); @@ -653,6 +642,8 @@ copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot) Name src_name = PG_GETARG_NAME(0); Name dst_name = PG_GETARG_NAME(1); ReplicationSlot *src = NULL; + ReplicationSlot first_slot_contents; + ReplicationSlot second_slot_contents; XLogRecPtr src_restart_lsn; bool src_islogical; bool temporary; @@ -692,13 +683,10 @@ copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot) if (s->in_use && strcmp(NameStr(s->data.name), NameStr(*src_name)) == 0) { + /* Copy the slot contents while holding spinlock */ SpinLockAcquire(&s->mutex); - src_islogical = SlotIsLogical(s); - src_restart_lsn = s->data.restart_lsn; - temporary = s->data.persistency == RS_TEMPORARY; - plugin = logical_slot ? pstrdup(NameStr(s->data.plugin)) : NULL; + first_slot_contents = *s; SpinLockRelease(&s->mutex); - src = s; break; } @@ -711,6 +699,11 @@ copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot) (errcode(ERRCODE_UNDEFINED_OBJECT), errmsg("replication slot \"%s\" does not exist", NameStr(*src_name)))); + src_islogical = SlotIsLogical(&first_slot_contents); + src_restart_lsn = first_slot_contents.data.restart_lsn; + temporary = (first_slot_contents.data.persistency == RS_TEMPORARY); + plugin = logical_slot ? NameStr(first_slot_contents.data.plugin) : NULL; + /* Check type of replication slot */ if (src_islogical != logical_slot) ereport(ERROR, @@ -775,18 +768,20 @@ copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot) /* Copy data of source slot again */ SpinLockAcquire(&src->mutex); - copy_effective_xmin = src->effective_xmin; - copy_effective_catalog_xmin = src->effective_catalog_xmin; + second_slot_contents = *src; + SpinLockRelease(&src->mutex); - copy_xmin = src->data.xmin; - copy_catalog_xmin = src->data.catalog_xmin; - copy_restart_lsn = src->data.restart_lsn; - copy_confirmed_flush = src->data.confirmed_flush; + copy_effective_xmin = second_slot_contents.effective_xmin; + copy_effective_catalog_xmin = second_slot_contents.effective_catalog_xmin; + + copy_xmin = second_slot_contents.data.xmin; + copy_catalog_xmin = second_slot_contents.data.catalog_xmin; + copy_restart_lsn = second_slot_contents.data.restart_lsn; + copy_confirmed_flush = second_slot_contents.data.confirmed_flush; /* for existence check */ - copy_name = pstrdup(NameStr(src->data.name)); - copy_islogical = SlotIsLogical(src); - SpinLockRelease(&src->mutex); + copy_name = NameStr(second_slot_contents.data.name); + copy_islogical = SlotIsLogical(&second_slot_contents); /* * Check if the source slot still exists and is valid. We regard it as |