aboutsummaryrefslogtreecommitdiff
path: root/src/backend/replication/logical/worker.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/replication/logical/worker.c')
-rw-r--r--src/backend/replication/logical/worker.c15
1 files changed, 6 insertions, 9 deletions
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 640409b757f..b8e297c5d34 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -2060,6 +2060,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
{
TimestampTz last_recv_timestamp = GetCurrentTimestamp();
bool ping_sent = false;
+ TimeLineID tli;
/*
* Init the ApplyMessageContext which we clean up after each replication
@@ -2201,12 +2202,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
/* Check if we need to exit the streaming loop. */
if (endofstream)
- {
- TimeLineID tli;
-
- walrcv_endstreaming(wrconn, &tli);
break;
- }
/*
* Wait for more data or latch. If we have unflushed transactions,
@@ -2283,6 +2279,9 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
send_feedback(last_received, requestReply, requestReply);
}
}
+
+ /* All done */
+ walrcv_endstreaming(wrconn, &tli);
}
/*
@@ -3024,10 +3023,8 @@ ApplyWorkerMain(Datum main_arg)
/* This is table synchronization worker, call initial sync. */
syncslotname = LogicalRepSyncTableStart(&origin_startpos);
- /* The slot name needs to be allocated in permanent memory context. */
- oldctx = MemoryContextSwitchTo(ApplyContext);
- myslotname = pstrdup(syncslotname);
- MemoryContextSwitchTo(oldctx);
+ /* allocate slot name in long-lived context */
+ myslotname = MemoryContextStrdup(ApplyContext, syncslotname);
pfree(syncslotname);
}