aboutsummaryrefslogtreecommitdiff
path: root/src/backend/replication/logical/origin.c
diff options
context:
space:
mode:
authorAmit Kapila <akapila@postgresql.org>2021-02-10 07:17:09 +0530
committerAmit Kapila <akapila@postgresql.org>2021-02-10 07:17:09 +0530
commitcd142e032ebd50ec7974b3633269477c2c72f1cc (patch)
treee1ad3b108bf91b5e0ad82f2455ebfec98ad83e93 /src/backend/replication/logical/origin.c
parent31c7fb41e26bf03dae231c7165a1a16388b2e366 (diff)
downloadpostgresql-cd142e032ebd50ec7974b3633269477c2c72f1cc.tar.gz
postgresql-cd142e032ebd50ec7974b3633269477c2c72f1cc.zip
Make pg_replication_origin_drop safe against concurrent drops.
Currently, we get the origin id from the name and then drop the origin by taking ExclusiveLock on ReplicationOriginRelationId. So, two concurrent sessions can get the id from the name at the same time and then when they try to drop the origin, one of the sessions will get the either "tuple concurrently deleted" or "cache lookup failed for replication origin ..". To prevent this race condition we do the entire operation under lock. This obviates the need for replorigin_drop() API and we have removed it so if any extension authors are using it they need to instead use replorigin_drop_by_name. See it's usage in pg_replication_origin_drop(). Author: Peter Smith Reviewed-by: Amit Kapila, Euler Taveira, Petr Jelinek, and Alvaro Herrera Discussion: https://www.postgresql.org/message-id/CAHut%2BPuW8DWV5fskkMWWMqzt-x7RPcNQOtJQBp6SdwyRghCk7A%40mail.gmail.com
Diffstat (limited to 'src/backend/replication/logical/origin.c')
-rw-r--r--src/backend/replication/logical/origin.c59
1 files changed, 36 insertions, 23 deletions
diff --git a/src/backend/replication/logical/origin.c b/src/backend/replication/logical/origin.c
index 9bd761a4262..685eaa6134e 100644
--- a/src/backend/replication/logical/origin.c
+++ b/src/backend/replication/logical/origin.c
@@ -322,27 +322,15 @@ replorigin_create(char *roname)
return roident;
}
-
/*
- * Drop replication origin.
- *
- * Needs to be called in a transaction.
+ * Helper function to drop a replication origin.
*/
-void
-replorigin_drop(RepOriginId roident, bool nowait)
+static void
+replorigin_drop_guts(Relation rel, RepOriginId roident, bool nowait)
{
HeapTuple tuple;
- Relation rel;
int i;
- Assert(IsTransactionState());
-
- /*
- * To interlock against concurrent drops, we hold ExclusiveLock on
- * pg_replication_origin throughout this function.
- */
- rel = table_open(ReplicationOriginRelationId, ExclusiveLock);
-
/*
* First, clean up the slot state info, if there is any matching slot.
*/
@@ -415,11 +403,40 @@ restart:
ReleaseSysCache(tuple);
CommandCounterIncrement();
-
- /* now release lock again */
- table_close(rel, ExclusiveLock);
}
+/*
+ * Drop replication origin (by name).
+ *
+ * Needs to be called in a transaction.
+ */
+void
+replorigin_drop_by_name(char *name, bool missing_ok, bool nowait)
+{
+ RepOriginId roident;
+ Relation rel;
+
+ Assert(IsTransactionState());
+
+ /*
+ * To interlock against concurrent drops, we hold ExclusiveLock on
+ * pg_replication_origin till xact commit.
+ *
+ * XXX We can optimize this by acquiring the lock on a specific origin by
+ * using LockSharedObject if required. However, for that, we first to
+ * acquire a lock on ReplicationOriginRelationId, get the origin_id, lock
+ * the specific origin and then re-check if the origin still exists.
+ */
+ rel = table_open(ReplicationOriginRelationId, ExclusiveLock);
+
+ roident = replorigin_by_name(name, missing_ok);
+
+ if (OidIsValid(roident))
+ replorigin_drop_guts(rel, roident, nowait);
+
+ /* We keep the lock on pg_replication_origin until commit */
+ table_close(rel, NoLock);
+}
/*
* Lookup replication origin via its oid and return the name.
@@ -1256,16 +1273,12 @@ Datum
pg_replication_origin_drop(PG_FUNCTION_ARGS)
{
char *name;
- RepOriginId roident;
replorigin_check_prerequisites(false, false);
name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
- roident = replorigin_by_name(name, false);
- Assert(OidIsValid(roident));
-
- replorigin_drop(roident, true);
+ replorigin_drop_by_name(name, false, true);
pfree(name);