aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/backend/replication/logical/worker.c51
-rw-r--r--src/backend/utils/mmgr/README11
-rw-r--r--src/include/replication/worker_internal.h4
3 files changed, 40 insertions, 26 deletions
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 362de12457b..04813b506e1 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -101,8 +101,8 @@ typedef struct SlotErrCallbackArg
int attnum;
} SlotErrCallbackArg;
-static MemoryContext ApplyContext = NULL;
-MemoryContext ApplyCacheContext = NULL;
+static MemoryContext ApplyMessageContext = NULL;
+MemoryContext ApplyContext = NULL;
WalReceiverConn *wrconn = NULL;
@@ -145,15 +145,16 @@ should_apply_changes_for_rel(LogicalRepRelMapEntry *rel)
/*
* Make sure that we started local transaction.
*
- * Also switches to ApplyContext as necessary.
+ * Also switches to ApplyMessageContext as necessary.
*/
static bool
ensure_transaction(void)
{
if (IsTransactionState())
{
- if (CurrentMemoryContext != ApplyContext)
- MemoryContextSwitchTo(ApplyContext);
+ if (CurrentMemoryContext != ApplyMessageContext)
+ MemoryContextSwitchTo(ApplyMessageContext);
+
return false;
}
@@ -162,7 +163,7 @@ ensure_transaction(void)
if (!MySubscriptionValid)
reread_subscription();
- MemoryContextSwitchTo(ApplyContext);
+ MemoryContextSwitchTo(ApplyMessageContext);
return true;
}
@@ -961,7 +962,7 @@ store_flush_position(XLogRecPtr remote_lsn)
FlushPosition *flushpos;
/* Need to do this in permanent context */
- MemoryContextSwitchTo(ApplyCacheContext);
+ MemoryContextSwitchTo(ApplyContext);
/* Track commit lsn */
flushpos = (FlushPosition *) palloc(sizeof(FlushPosition));
@@ -969,7 +970,7 @@ store_flush_position(XLogRecPtr remote_lsn)
flushpos->remote_end = remote_lsn;
dlist_push_tail(&lsn_mapping, &flushpos->node);
- MemoryContextSwitchTo(ApplyContext);
+ MemoryContextSwitchTo(ApplyMessageContext);
}
@@ -993,12 +994,13 @@ UpdateWorkerStats(XLogRecPtr last_lsn, TimestampTz send_time, bool reply)
static void
LogicalRepApplyLoop(XLogRecPtr last_received)
{
- /* Init the ApplyContext which we use for easier cleanup. */
- ApplyContext = AllocSetContextCreate(TopMemoryContext,
- "ApplyContext",
- ALLOCSET_DEFAULT_MINSIZE,
- ALLOCSET_DEFAULT_INITSIZE,
- ALLOCSET_DEFAULT_MAXSIZE);
+ /*
+ * Init the ApplyMessageContext which we clean up after each
+ * replication protocol message.
+ */
+ ApplyMessageContext = AllocSetContextCreate(ApplyContext,
+ "ApplyMessageContext",
+ ALLOCSET_DEFAULT_SIZES);
/* mark as idle, before starting to loop */
pgstat_report_activity(STATE_IDLE, NULL);
@@ -1013,7 +1015,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
TimestampTz last_recv_timestamp = GetCurrentTimestamp();
bool ping_sent = false;
- MemoryContextSwitchTo(ApplyContext);
+ MemoryContextSwitchTo(ApplyMessageContext);
len = walrcv_receive(wrconn, &buf, &fd);
@@ -1045,7 +1047,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
ping_sent = false;
/* Ensure we are reading the data into our memory context. */
- MemoryContextSwitchTo(ApplyContext);
+ MemoryContextSwitchTo(ApplyMessageContext);
s.data = buf;
s.len = len;
@@ -1091,6 +1093,8 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
UpdateWorkerStats(last_received, timestamp, true);
}
/* other message types are purposefully ignored */
+
+ MemoryContextReset(ApplyMessageContext);
}
len = walrcv_receive(wrconn, &buf, &fd);
@@ -1115,7 +1119,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
}
/* Cleanup the memory. */
- MemoryContextResetAndDeleteChildren(ApplyContext);
+ MemoryContextResetAndDeleteChildren(ApplyMessageContext);
MemoryContextSwitchTo(TopMemoryContext);
/* Check if we need to exit the streaming loop. */
@@ -1258,7 +1262,7 @@ send_feedback(XLogRecPtr recvpos, bool force, bool requestReply)
if (!reply_message)
{
- MemoryContext oldctx = MemoryContextSwitchTo(ApplyCacheContext);
+ MemoryContext oldctx = MemoryContextSwitchTo(ApplyContext);
reply_message = makeStringInfo();
MemoryContextSwitchTo(oldctx);
}
@@ -1308,7 +1312,7 @@ reread_subscription(void)
}
/* Ensure allocations in permanent context. */
- oldctx = MemoryContextSwitchTo(ApplyCacheContext);
+ oldctx = MemoryContextSwitchTo(ApplyContext);
newsub = GetSubscription(MyLogicalRepWorker->subid, true);
@@ -1483,12 +1487,11 @@ ApplyWorkerMain(Datum main_arg)
MyLogicalRepWorker->userid);
/* Load the subscription into persistent memory context. */
- CreateCacheMemoryContext();
- ApplyCacheContext = AllocSetContextCreate(CacheMemoryContext,
- "ApplyCacheContext",
+ ApplyContext = AllocSetContextCreate(TopMemoryContext,
+ "ApplyContext",
ALLOCSET_DEFAULT_SIZES);
StartTransactionCommand();
- oldctx = MemoryContextSwitchTo(ApplyCacheContext);
+ oldctx = MemoryContextSwitchTo(ApplyContext);
MySubscription = GetSubscription(MyLogicalRepWorker->subid, false);
MySubscriptionValid = true;
MemoryContextSwitchTo(oldctx);
@@ -1533,7 +1536,7 @@ ApplyWorkerMain(Datum main_arg)
syncslotname = LogicalRepSyncTableStart(&origin_startpos);
/* The slot name needs to be allocated in permanent memory context. */
- oldctx = MemoryContextSwitchTo(ApplyCacheContext);
+ oldctx = MemoryContextSwitchTo(ApplyContext);
myslotname = pstrdup(syncslotname);
MemoryContextSwitchTo(oldctx);
diff --git a/src/backend/utils/mmgr/README b/src/backend/utils/mmgr/README
index 480b1f89d02..387c337985f 100644
--- a/src/backend/utils/mmgr/README
+++ b/src/backend/utils/mmgr/README
@@ -265,6 +265,17 @@ from prepared statements simply reference the prepared statements' trees,
and don't actually need any storage allocated in their private contexts.
+Logical Replication Worker Contexts
+-----------------------------------
+
+ApplyContext --- permanent during whole lifetime of apply worker. It
+is possible to use TopMemoryContext here as well, but for simplicity
+of memory usage analysis we spin up different context.
+
+ApplyMessageContext --- short-lived context that is reset after each
+logical replication protocol message is processed.
+
+
Transient Contexts During Execution
-----------------------------------
diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h
index f6fee102b2a..26788fec5c1 100644
--- a/src/include/replication/worker_internal.h
+++ b/src/include/replication/worker_internal.h
@@ -56,8 +56,8 @@ typedef struct LogicalRepWorker
TimestampTz reply_time;
} LogicalRepWorker;
-/* Memory context for cached variables in apply worker. */
-extern MemoryContext ApplyCacheContext;
+/* Main memory context for apply worker. Permanent during worker lifetime. */
+extern MemoryContext ApplyContext;
/* libpqreceiver connection */
extern struct WalReceiverConn *wrconn;