aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorTom Lane <tgl@sss.pgh.pa.us>2023-01-06 16:08:20 -0500
committerTom Lane <tgl@sss.pgh.pa.us>2023-01-06 17:27:58 -0500
commitc6e1f62e2cee817cad58cccc1dd685e908678241 (patch)
tree0b84de1562ac2702f372a86705d8374d77e1eb42 /src
parent4c032dd8046b145a25032643f536aab83deb19e3 (diff)
downloadpostgresql-c6e1f62e2cee817cad58cccc1dd685e908678241.tar.gz
postgresql-c6e1f62e2cee817cad58cccc1dd685e908678241.zip
Wake up a subscription's replication worker processes after DDL.
Waken related worker processes immediately at commit of a transaction that has performed ALTER SUBSCRIPTION (including the RENAME and OWNER variants). This reduces the response time for such operations. In the real world that might not be worth much, but it shaves several seconds off the runtime for the subscription test suite. In the case of PREPARE, we just throw away this notification state; it doesn't seem worth the work to preserve it. The workers will still react after the eventual COMMIT PREPARED, but not as quickly. Nathan Bossart Discussion: https://postgr.es/m/20221122004119.GA132961@nathanxps13
Diffstat (limited to 'src')
-rw-r--r--src/backend/access/transam/xact.c6
-rw-r--r--src/backend/commands/alter.c4
-rw-r--r--src/backend/commands/subscriptioncmds.c6
-rw-r--r--src/backend/replication/logical/worker.c52
-rw-r--r--src/include/replication/logicalworker.h4
5 files changed, 72 insertions, 0 deletions
diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index 24221542e7f..8daa7f7d446 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -47,6 +47,7 @@
#include "pgstat.h"
#include "replication/logical.h"
#include "replication/logicallauncher.h"
+#include "replication/logicalworker.h"
#include "replication/origin.h"
#include "replication/snapbuild.h"
#include "replication/syncrep.h"
@@ -2360,6 +2361,7 @@ CommitTransaction(void)
AtEOXact_PgStat(true, is_parallel_worker);
AtEOXact_Snapshot(true, false);
AtEOXact_ApplyLauncher(true);
+ AtEOXact_LogicalRepWorkers(true);
pgstat_report_xact_timestamp(0);
CurrentResourceOwner = NULL;
@@ -2647,6 +2649,9 @@ PrepareTransaction(void)
AtEOXact_HashTables(true);
/* don't call AtEOXact_PgStat here; we fixed pgstat state above */
AtEOXact_Snapshot(true, true);
+ /* we treat PREPARE as ROLLBACK so far as waking workers goes */
+ AtEOXact_ApplyLauncher(false);
+ AtEOXact_LogicalRepWorkers(false);
pgstat_report_xact_timestamp(0);
CurrentResourceOwner = NULL;
@@ -2860,6 +2865,7 @@ AbortTransaction(void)
AtEOXact_HashTables(false);
AtEOXact_PgStat(false, is_parallel_worker);
AtEOXact_ApplyLauncher(false);
+ AtEOXact_LogicalRepWorkers(false);
pgstat_report_xact_timestamp(0);
}
diff --git a/src/backend/commands/alter.c b/src/backend/commands/alter.c
index 70d359eb6a7..bea51b3af1f 100644
--- a/src/backend/commands/alter.c
+++ b/src/backend/commands/alter.c
@@ -59,6 +59,7 @@
#include "commands/user.h"
#include "miscadmin.h"
#include "parser/parse_func.h"
+#include "replication/logicalworker.h"
#include "rewrite/rewriteDefine.h"
#include "tcop/utility.h"
#include "utils/builtins.h"
@@ -279,6 +280,9 @@ AlterObjectRename_internal(Relation rel, Oid objectId, const char *new_name)
if (strncmp(new_name, "regress_", 8) != 0)
elog(WARNING, "subscriptions created by regression test cases should have names starting with \"regress_\"");
#endif
+
+ /* Wake up related replication workers to handle this change quickly */
+ LogicalRepWorkersWakeupAtCommit(objectId);
}
else if (nameCacheId >= 0)
{
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index b9c5df796fc..f15a332bae3 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -34,6 +34,7 @@
#include "nodes/makefuncs.h"
#include "pgstat.h"
#include "replication/logicallauncher.h"
+#include "replication/logicalworker.h"
#include "replication/origin.h"
#include "replication/slot.h"
#include "replication/walreceiver.h"
@@ -1362,6 +1363,9 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
InvokeObjectPostAlterHook(SubscriptionRelationId, subid, 0);
+ /* Wake up related replication workers to handle this change quickly. */
+ LogicalRepWorkersWakeupAtCommit(subid);
+
return myself;
}
@@ -1732,7 +1736,9 @@ AlterSubscriptionOwner_internal(Relation rel, HeapTuple tup, Oid newOwnerId)
InvokeObjectPostAlterHook(SubscriptionRelationId,
form->oid, 0);
+ /* Wake up related background processes to handle this change quickly. */
ApplyLauncherWakeupAtCommit();
+ LogicalRepWorkersWakeupAtCommit(form->oid);
}
/*
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index f8e8cf71eb8..f8649e142c3 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -254,6 +254,8 @@ WalReceiverConn *LogRepWorkerWalRcvConn = NULL;
Subscription *MySubscription = NULL;
static bool MySubscriptionValid = false;
+static List *on_commit_wakeup_workers_subids = NIL;
+
bool in_remote_transaction = false;
static XLogRecPtr remote_final_lsn = InvalidXLogRecPtr;
@@ -4092,3 +4094,53 @@ reset_apply_error_context_info(void)
apply_error_callback_arg.remote_attnum = -1;
set_apply_error_context_xact(InvalidTransactionId, InvalidXLogRecPtr);
}
+
+/*
+ * Request wakeup of the workers for the given subscription OID
+ * at commit of the current transaction.
+ *
+ * This is used to ensure that the workers process assorted changes
+ * as soon as possible.
+ */
+void
+LogicalRepWorkersWakeupAtCommit(Oid subid)
+{
+ MemoryContext oldcxt;
+
+ oldcxt = MemoryContextSwitchTo(TopTransactionContext);
+ on_commit_wakeup_workers_subids =
+ list_append_unique_oid(on_commit_wakeup_workers_subids, subid);
+ MemoryContextSwitchTo(oldcxt);
+}
+
+/*
+ * Wake up the workers of any subscriptions that were changed in this xact.
+ */
+void
+AtEOXact_LogicalRepWorkers(bool isCommit)
+{
+ if (isCommit && on_commit_wakeup_workers_subids != NIL)
+ {
+ ListCell *lc;
+
+ LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
+ foreach(lc, on_commit_wakeup_workers_subids)
+ {
+ Oid subid = lfirst_oid(lc);
+ List *workers;
+ ListCell *lc2;
+
+ workers = logicalrep_workers_find(subid, true);
+ foreach(lc2, workers)
+ {
+ LogicalRepWorker *worker = (LogicalRepWorker *) lfirst(lc2);
+
+ logicalrep_worker_wakeup_ptr(worker);
+ }
+ }
+ LWLockRelease(LogicalRepWorkerLock);
+ }
+
+ /* The List storage will be reclaimed automatically in xact cleanup. */
+ on_commit_wakeup_workers_subids = NIL;
+}
diff --git a/src/include/replication/logicalworker.h b/src/include/replication/logicalworker.h
index f1e7e8a3484..e484662b723 100644
--- a/src/include/replication/logicalworker.h
+++ b/src/include/replication/logicalworker.h
@@ -16,4 +16,8 @@ extern void ApplyWorkerMain(Datum main_arg);
extern bool IsLogicalWorker(void);
+extern void LogicalRepWorkersWakeupAtCommit(Oid subid);
+
+extern void AtEOXact_LogicalRepWorkers(bool isCommit);
+
#endif /* LOGICALWORKER_H */