diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/backend/commands/subscriptioncmds.c | 2 | ||||
-rw-r--r-- | src/backend/postmaster/pgstat.c | 3 | ||||
-rw-r--r-- | src/backend/replication/logical/origin.c | 57 | ||||
-rw-r--r-- | src/include/pgstat.h | 1 | ||||
-rw-r--r-- | src/include/replication/origin.h | 2 |
5 files changed, 52 insertions, 13 deletions
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index 35937127912..ae40f7164d8 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -939,7 +939,7 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel) snprintf(originname, sizeof(originname), "pg_%u", subid); originid = replorigin_by_name(originname, true); if (originid != InvalidRepOriginId) - replorigin_drop(originid); + replorigin_drop(originid, false); /* * If there is no slot associated with the subscription, we can finish diff --git a/src/backend/postmaster/pgstat.c b/src/backend/postmaster/pgstat.c index 3f5fb796a5e..1f75e2e97d0 100644 --- a/src/backend/postmaster/pgstat.c +++ b/src/backend/postmaster/pgstat.c @@ -3609,6 +3609,9 @@ pgstat_get_wait_ipc(WaitEventIPC w) case WAIT_EVENT_PROCARRAY_GROUP_UPDATE: event_name = "ProcArrayGroupUpdate"; break; + case WAIT_EVENT_REPLICATION_ORIGIN_DROP: + event_name = "ReplicationOriginDrop"; + break; case WAIT_EVENT_REPLICATION_SLOT_DROP: event_name = "ReplicationSlotDrop"; break; diff --git a/src/backend/replication/logical/origin.c b/src/backend/replication/logical/origin.c index 1c665312a48..9e1b19bb354 100644 --- a/src/backend/replication/logical/origin.c +++ b/src/backend/replication/logical/origin.c @@ -79,15 +79,15 @@ #include "access/xact.h" #include "catalog/indexing.h" - #include "nodes/execnodes.h" #include "replication/origin.h" #include "replication/logical.h" - +#include "pgstat.h" #include "storage/fd.h" #include "storage/ipc.h" #include "storage/lmgr.h" +#include "storage/condition_variable.h" #include "storage/copydir.h" #include "utils/builtins.h" @@ -125,6 +125,11 @@ typedef struct ReplicationState int acquired_by; /* + * Condition variable that's signalled when acquired_by changes. + */ + ConditionVariable origin_cv; + + /* * Lock protecting remote_lsn and local_lsn. */ LWLock lock; @@ -324,9 +329,9 @@ replorigin_create(char *roname) * Needs to be called in a transaction. */ void -replorigin_drop(RepOriginId roident) +replorigin_drop(RepOriginId roident, bool nowait) { - HeapTuple tuple = NULL; + HeapTuple tuple; Relation rel; int i; @@ -334,6 +339,8 @@ replorigin_drop(RepOriginId roident) rel = heap_open(ReplicationOriginRelationId, ExclusiveLock); +restart: + tuple = NULL; /* cleanup the slot state info */ LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE); @@ -346,11 +353,21 @@ replorigin_drop(RepOriginId roident) { if (state->acquired_by != 0) { - ereport(ERROR, - (errcode(ERRCODE_OBJECT_IN_USE), - errmsg("could not drop replication origin with OID %d, in use by PID %d", - state->roident, - state->acquired_by))); + ConditionVariable *cv; + + if (nowait) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_IN_USE), + errmsg("could not drop replication origin with OID %d, in use by PID %d", + state->roident, + state->acquired_by))); + cv = &state->origin_cv; + + LWLockRelease(ReplicationOriginLock); + ConditionVariablePrepareToSleep(cv); + ConditionVariableSleep(cv, WAIT_EVENT_REPLICATION_ORIGIN_DROP); + ConditionVariableCancelSleep(); + goto restart; } /* first WAL log */ @@ -382,7 +399,7 @@ replorigin_drop(RepOriginId roident) CommandCounterIncrement(); - /* now release lock again, */ + /* now release lock again */ heap_close(rel, ExclusiveLock); } @@ -476,8 +493,11 @@ ReplicationOriginShmemInit(void) MemSet(replication_states, 0, ReplicationOriginShmemSize()); for (i = 0; i < max_replication_slots; i++) + { LWLockInitialize(&replication_states[i].lock, replication_states_ctl->tranche_id); + ConditionVariableInit(&replication_states[i].origin_cv); + } } LWLockRegisterTranche(replication_states_ctl->tranche_id, @@ -957,16 +977,23 @@ replorigin_get_progress(RepOriginId node, bool flush) static void ReplicationOriginExitCleanup(int code, Datum arg) { + ConditionVariable *cv = NULL; + LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE); if (session_replication_state != NULL && session_replication_state->acquired_by == MyProcPid) { + cv = &session_replication_state->origin_cv; + session_replication_state->acquired_by = 0; session_replication_state = NULL; } LWLockRelease(ReplicationOriginLock); + + if (cv) + ConditionVariableBroadcast(cv); } /* @@ -1056,6 +1083,9 @@ replorigin_session_setup(RepOriginId node) session_replication_state->acquired_by = MyProcPid; LWLockRelease(ReplicationOriginLock); + + /* probably this one is pointless */ + ConditionVariableBroadcast(&session_replication_state->origin_cv); } /* @@ -1067,6 +1097,8 @@ replorigin_session_setup(RepOriginId node) void replorigin_session_reset(void) { + ConditionVariable *cv; + Assert(max_replication_slots != 0); if (session_replication_state == NULL) @@ -1077,9 +1109,12 @@ replorigin_session_reset(void) LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE); session_replication_state->acquired_by = 0; + cv = &session_replication_state->origin_cv; session_replication_state = NULL; LWLockRelease(ReplicationOriginLock); + + ConditionVariableBroadcast(cv); } /* @@ -1170,7 +1205,7 @@ pg_replication_origin_drop(PG_FUNCTION_ARGS) roident = replorigin_by_name(name, false); Assert(OidIsValid(roident)); - replorigin_drop(roident); + replorigin_drop(roident, false); pfree(name); diff --git a/src/include/pgstat.h b/src/include/pgstat.h index 43ea55e9eb6..cb05d9b81e5 100644 --- a/src/include/pgstat.h +++ b/src/include/pgstat.h @@ -812,6 +812,7 @@ typedef enum WAIT_EVENT_PARALLEL_FINISH, WAIT_EVENT_PARALLEL_BITMAP_SCAN, WAIT_EVENT_PROCARRAY_GROUP_UPDATE, + WAIT_EVENT_REPLICATION_ORIGIN_DROP, WAIT_EVENT_REPLICATION_SLOT_DROP, WAIT_EVENT_SAFE_SNAPSHOT, WAIT_EVENT_SYNC_REP diff --git a/src/include/replication/origin.h b/src/include/replication/origin.h index ca56c01469f..a9595c3c3da 100644 --- a/src/include/replication/origin.h +++ b/src/include/replication/origin.h @@ -41,7 +41,7 @@ extern PGDLLIMPORT TimestampTz replorigin_session_origin_timestamp; /* API for querying & manipulating replication origins */ extern RepOriginId replorigin_by_name(char *name, bool missing_ok); extern RepOriginId replorigin_create(char *name); -extern void replorigin_drop(RepOriginId roident); +extern void replorigin_drop(RepOriginId roident, bool nowait); extern bool replorigin_by_oid(RepOriginId roident, bool missing_ok, char **roname); |