aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/backend/replication/logical/worker.c23
1 files changed, 15 insertions, 8 deletions
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index e31551340c9..a570900a429 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -118,7 +118,7 @@ static void send_feedback(XLogRecPtr recvpos, bool force, bool requestReply);
static void store_flush_position(XLogRecPtr remote_lsn);
-static void reread_subscription(void);
+static void maybe_reread_subscription(void);
/* Flags set by signal handlers */
static volatile sig_atomic_t got_SIGHUP = false;
@@ -165,8 +165,7 @@ ensure_transaction(void)
StartTransactionCommand();
- if (!MySubscriptionValid)
- reread_subscription();
+ maybe_reread_subscription();
MemoryContextSwitchTo(ApplyMessageContext);
return true;
@@ -463,6 +462,12 @@ apply_handle_commit(StringInfo s)
store_flush_position(commit_data.end_lsn);
}
+ else
+ {
+ /* Process any invalidation messages that might have accumulated. */
+ AcceptInvalidationMessages();
+ maybe_reread_subscription();
+ }
in_remote_transaction = false;
@@ -1119,8 +1124,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
* now.
*/
AcceptInvalidationMessages();
- if (!MySubscriptionValid)
- reread_subscription();
+ maybe_reread_subscription();
/* Process any table synchronization changes. */
process_syncing_tables(last_received);
@@ -1302,17 +1306,20 @@ send_feedback(XLogRecPtr recvpos, bool force, bool requestReply)
last_flushpos = flushpos;
}
-
/*
- * Reread subscription info and exit on change.
+ * Reread subscription info if needed. Most changes will be exit.
*/
static void
-reread_subscription(void)
+maybe_reread_subscription(void)
{
MemoryContext oldctx;
Subscription *newsub;
bool started_tx = false;
+ /* When cache state is valid there is nothing to do here. */
+ if (MySubscriptionValid)
+ return;
+
/* This function might be called inside or outside of transaction. */
if (!IsTransactionState())
{