diff options
Diffstat (limited to 'src/backend/replication/slot.c')
-rw-r--r-- | src/backend/replication/slot.c | 69 |
1 files changed, 56 insertions, 13 deletions
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c index 0b2575ee9d0..d8ed005e7ec 100644 --- a/src/backend/replication/slot.c +++ b/src/backend/replication/slot.c @@ -47,6 +47,7 @@ #include "storage/fd.h" #include "storage/proc.h" #include "storage/procarray.h" +#include "utils/builtins.h" /* * Replication slot on-disk data structure. @@ -98,7 +99,9 @@ int max_replication_slots = 0; /* the maximum number of replication * slots */ static LWLockTranche ReplSlotIOLWLockTranche; + static void ReplicationSlotDropAcquired(void); +static void ReplicationSlotDropPtr(ReplicationSlot *slot); /* internal persistency functions */ static void RestoreSlotFromDisk(const char *name); @@ -329,7 +332,7 @@ ReplicationSlotAcquire(const char *name) { ReplicationSlot *slot = NULL; int i; - int active_pid = 0; + int active_pid = 0; /* Keep compiler quiet */ Assert(MyReplicationSlot == NULL); @@ -346,7 +349,7 @@ ReplicationSlotAcquire(const char *name) SpinLockAcquire(&s->mutex); active_pid = s->active_pid; if (active_pid == 0) - s->active_pid = MyProcPid; + active_pid = s->active_pid = MyProcPid; SpinLockRelease(&s->mutex); slot = s; break; @@ -359,7 +362,7 @@ ReplicationSlotAcquire(const char *name) ereport(ERROR, (errcode(ERRCODE_UNDEFINED_OBJECT), errmsg("replication slot \"%s\" does not exist", name))); - if (active_pid != 0) + if (active_pid != MyProcPid) ereport(ERROR, (errcode(ERRCODE_OBJECT_IN_USE), errmsg("replication slot \"%s\" is active for PID %d", @@ -389,9 +392,12 @@ ReplicationSlotRelease(void) */ ReplicationSlotDropAcquired(); } - else + else if (slot->data.persistency == RS_PERSISTENT) { - /* Mark slot inactive. We're not freeing it, just disconnecting. */ + /* + * Mark persistent slot inactive. We're not freeing it, just + * disconnecting. + */ SpinLockAcquire(&slot->mutex); slot->active_pid = 0; SpinLockRelease(&slot->mutex); @@ -406,6 +412,33 @@ ReplicationSlotRelease(void) } /* + * Cleanup all temporary slots created in current session. + */ +void +ReplicationSlotCleanup() +{ + int i; + + Assert(MyReplicationSlot == NULL); + + /* + * No need for locking as we are only interested in slots active in + * current process and those are not touched by other processes. + */ + for (i = 0; i < max_replication_slots; i++) + { + ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i]; + + if (s->active_pid == MyProcPid) + { + Assert(s->in_use && s->data.persistency == RS_TEMPORARY); + + ReplicationSlotDropPtr(s); + } + } +} + +/* * Permanently drop replication slot identified by the passed in name. */ void @@ -419,14 +452,11 @@ ReplicationSlotDrop(const char *name) } /* - * Permanently drop the currently acquired replication slot which will be - * released by the point this function returns. + * Permanently drop the currently acquired replication slot. */ static void ReplicationSlotDropAcquired(void) { - char path[MAXPGPATH]; - char tmppath[MAXPGPATH]; ReplicationSlot *slot = MyReplicationSlot; Assert(MyReplicationSlot != NULL); @@ -434,6 +464,19 @@ ReplicationSlotDropAcquired(void) /* slot isn't acquired anymore */ MyReplicationSlot = NULL; + ReplicationSlotDropPtr(slot); +} + +/* + * Permanently drop the replication slot which will be released by the point + * this function returns. + */ +static void +ReplicationSlotDropPtr(ReplicationSlot *slot) +{ + char path[MAXPGPATH]; + char tmppath[MAXPGPATH]; + /* * If some other backend ran this code concurrently with us, we might try * to delete a slot with a certain name while someone else was trying to @@ -448,9 +491,9 @@ ReplicationSlotDropAcquired(void) /* * Rename the slot directory on disk, so that we'll no longer recognize * this as a valid slot. Note that if this fails, we've got to mark the - * slot inactive before bailing out. If we're dropping an ephemeral slot, - * we better never fail hard as the caller won't expect the slot to - * survive and this might get called during error handling. + * slot inactive before bailing out. If we're dropping an ephemeral or + * a temporary slot, we better never fail hard as the caller won't expect + * the slot to survive and this might get called during error handling. */ if (rename(path, tmppath) == 0) { @@ -469,7 +512,7 @@ ReplicationSlotDropAcquired(void) } else { - bool fail_softly = slot->data.persistency == RS_EPHEMERAL; + bool fail_softly = slot->data.persistency != RS_PERSISTENT; SpinLockAcquire(&slot->mutex); slot->active_pid = 0; |