aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/backend/commands/subscriptioncmds.c2
-rw-r--r--src/backend/postmaster/pgstat.c3
-rw-r--r--src/backend/replication/logical/origin.c57
-rw-r--r--src/include/pgstat.h1
-rw-r--r--src/include/replication/origin.h2
5 files changed, 52 insertions, 13 deletions
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index 35937127912..ae40f7164d8 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -939,7 +939,7 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
snprintf(originname, sizeof(originname), "pg_%u", subid);
originid = replorigin_by_name(originname, true);
if (originid != InvalidRepOriginId)
- replorigin_drop(originid);
+ replorigin_drop(originid, false);
/*
* If there is no slot associated with the subscription, we can finish
diff --git a/src/backend/postmaster/pgstat.c b/src/backend/postmaster/pgstat.c
index 3f5fb796a5e..1f75e2e97d0 100644
--- a/src/backend/postmaster/pgstat.c
+++ b/src/backend/postmaster/pgstat.c
@@ -3609,6 +3609,9 @@ pgstat_get_wait_ipc(WaitEventIPC w)
case WAIT_EVENT_PROCARRAY_GROUP_UPDATE:
event_name = "ProcArrayGroupUpdate";
break;
+ case WAIT_EVENT_REPLICATION_ORIGIN_DROP:
+ event_name = "ReplicationOriginDrop";
+ break;
case WAIT_EVENT_REPLICATION_SLOT_DROP:
event_name = "ReplicationSlotDrop";
break;
diff --git a/src/backend/replication/logical/origin.c b/src/backend/replication/logical/origin.c
index 1c665312a48..9e1b19bb354 100644
--- a/src/backend/replication/logical/origin.c
+++ b/src/backend/replication/logical/origin.c
@@ -79,15 +79,15 @@
#include "access/xact.h"
#include "catalog/indexing.h"
-
#include "nodes/execnodes.h"
#include "replication/origin.h"
#include "replication/logical.h"
-
+#include "pgstat.h"
#include "storage/fd.h"
#include "storage/ipc.h"
#include "storage/lmgr.h"
+#include "storage/condition_variable.h"
#include "storage/copydir.h"
#include "utils/builtins.h"
@@ -125,6 +125,11 @@ typedef struct ReplicationState
int acquired_by;
/*
+ * Condition variable that's signalled when acquired_by changes.
+ */
+ ConditionVariable origin_cv;
+
+ /*
* Lock protecting remote_lsn and local_lsn.
*/
LWLock lock;
@@ -324,9 +329,9 @@ replorigin_create(char *roname)
* Needs to be called in a transaction.
*/
void
-replorigin_drop(RepOriginId roident)
+replorigin_drop(RepOriginId roident, bool nowait)
{
- HeapTuple tuple = NULL;
+ HeapTuple tuple;
Relation rel;
int i;
@@ -334,6 +339,8 @@ replorigin_drop(RepOriginId roident)
rel = heap_open(ReplicationOriginRelationId, ExclusiveLock);
+restart:
+ tuple = NULL;
/* cleanup the slot state info */
LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
@@ -346,11 +353,21 @@ replorigin_drop(RepOriginId roident)
{
if (state->acquired_by != 0)
{
- ereport(ERROR,
- (errcode(ERRCODE_OBJECT_IN_USE),
- errmsg("could not drop replication origin with OID %d, in use by PID %d",
- state->roident,
- state->acquired_by)));
+ ConditionVariable *cv;
+
+ if (nowait)
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_IN_USE),
+ errmsg("could not drop replication origin with OID %d, in use by PID %d",
+ state->roident,
+ state->acquired_by)));
+ cv = &state->origin_cv;
+
+ LWLockRelease(ReplicationOriginLock);
+ ConditionVariablePrepareToSleep(cv);
+ ConditionVariableSleep(cv, WAIT_EVENT_REPLICATION_ORIGIN_DROP);
+ ConditionVariableCancelSleep();
+ goto restart;
}
/* first WAL log */
@@ -382,7 +399,7 @@ replorigin_drop(RepOriginId roident)
CommandCounterIncrement();
- /* now release lock again, */
+ /* now release lock again */
heap_close(rel, ExclusiveLock);
}
@@ -476,8 +493,11 @@ ReplicationOriginShmemInit(void)
MemSet(replication_states, 0, ReplicationOriginShmemSize());
for (i = 0; i < max_replication_slots; i++)
+ {
LWLockInitialize(&replication_states[i].lock,
replication_states_ctl->tranche_id);
+ ConditionVariableInit(&replication_states[i].origin_cv);
+ }
}
LWLockRegisterTranche(replication_states_ctl->tranche_id,
@@ -957,16 +977,23 @@ replorigin_get_progress(RepOriginId node, bool flush)
static void
ReplicationOriginExitCleanup(int code, Datum arg)
{
+ ConditionVariable *cv = NULL;
+
LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
if (session_replication_state != NULL &&
session_replication_state->acquired_by == MyProcPid)
{
+ cv = &session_replication_state->origin_cv;
+
session_replication_state->acquired_by = 0;
session_replication_state = NULL;
}
LWLockRelease(ReplicationOriginLock);
+
+ if (cv)
+ ConditionVariableBroadcast(cv);
}
/*
@@ -1056,6 +1083,9 @@ replorigin_session_setup(RepOriginId node)
session_replication_state->acquired_by = MyProcPid;
LWLockRelease(ReplicationOriginLock);
+
+ /* probably this one is pointless */
+ ConditionVariableBroadcast(&session_replication_state->origin_cv);
}
/*
@@ -1067,6 +1097,8 @@ replorigin_session_setup(RepOriginId node)
void
replorigin_session_reset(void)
{
+ ConditionVariable *cv;
+
Assert(max_replication_slots != 0);
if (session_replication_state == NULL)
@@ -1077,9 +1109,12 @@ replorigin_session_reset(void)
LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
session_replication_state->acquired_by = 0;
+ cv = &session_replication_state->origin_cv;
session_replication_state = NULL;
LWLockRelease(ReplicationOriginLock);
+
+ ConditionVariableBroadcast(cv);
}
/*
@@ -1170,7 +1205,7 @@ pg_replication_origin_drop(PG_FUNCTION_ARGS)
roident = replorigin_by_name(name, false);
Assert(OidIsValid(roident));
- replorigin_drop(roident);
+ replorigin_drop(roident, false);
pfree(name);
diff --git a/src/include/pgstat.h b/src/include/pgstat.h
index 43ea55e9eb6..cb05d9b81e5 100644
--- a/src/include/pgstat.h
+++ b/src/include/pgstat.h
@@ -812,6 +812,7 @@ typedef enum
WAIT_EVENT_PARALLEL_FINISH,
WAIT_EVENT_PARALLEL_BITMAP_SCAN,
WAIT_EVENT_PROCARRAY_GROUP_UPDATE,
+ WAIT_EVENT_REPLICATION_ORIGIN_DROP,
WAIT_EVENT_REPLICATION_SLOT_DROP,
WAIT_EVENT_SAFE_SNAPSHOT,
WAIT_EVENT_SYNC_REP
diff --git a/src/include/replication/origin.h b/src/include/replication/origin.h
index ca56c01469f..a9595c3c3da 100644
--- a/src/include/replication/origin.h
+++ b/src/include/replication/origin.h
@@ -41,7 +41,7 @@ extern PGDLLIMPORT TimestampTz replorigin_session_origin_timestamp;
/* API for querying & manipulating replication origins */
extern RepOriginId replorigin_by_name(char *name, bool missing_ok);
extern RepOriginId replorigin_create(char *name);
-extern void replorigin_drop(RepOriginId roident);
+extern void replorigin_drop(RepOriginId roident, bool nowait);
extern bool replorigin_by_oid(RepOriginId roident, bool missing_ok,
char **roname);