aboutsummaryrefslogtreecommitdiff
path: root/src/backend/replication/logical/logical.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/replication/logical/logical.c')
-rw-r--r--src/backend/replication/logical/logical.c30
1 files changed, 25 insertions, 5 deletions
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 2fc9d7d70ff..7637efc32e0 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -115,6 +115,7 @@ StartupDecodingContext(List *output_plugin_options,
XLogRecPtr start_lsn,
TransactionId xmin_horizon,
bool need_full_snapshot,
+ bool fast_forward,
XLogPageReadCB read_page,
LogicalOutputPluginWriterPrepareWrite prepare_write,
LogicalOutputPluginWriterWrite do_write,
@@ -140,7 +141,8 @@ StartupDecodingContext(List *output_plugin_options,
* (re-)load output plugins, so we detect a bad (removed) output plugin
* now.
*/
- LoadOutputPlugin(&ctx->callbacks, NameStr(slot->data.plugin));
+ if (!fast_forward)
+ LoadOutputPlugin(&ctx->callbacks, NameStr(slot->data.plugin));
/*
* Now that the slot's xmin has been set, we can announce ourselves as a
@@ -191,6 +193,8 @@ StartupDecodingContext(List *output_plugin_options,
ctx->output_plugin_options = output_plugin_options;
+ ctx->fast_forward = fast_forward;
+
MemoryContextSwitchTo(old_context);
return ctx;
@@ -303,8 +307,9 @@ CreateInitDecodingContext(char *plugin,
ReplicationSlotSave();
ctx = StartupDecodingContext(NIL, InvalidXLogRecPtr, xmin_horizon,
- need_full_snapshot, read_page, prepare_write,
- do_write, update_progress);
+ need_full_snapshot, true,
+ read_page, prepare_write, do_write,
+ update_progress);
/* call output plugin initialization callback */
old_context = MemoryContextSwitchTo(ctx->context);
@@ -342,6 +347,7 @@ CreateInitDecodingContext(char *plugin,
LogicalDecodingContext *
CreateDecodingContext(XLogRecPtr start_lsn,
List *output_plugin_options,
+ bool fast_forward,
XLogPageReadCB read_page,
LogicalOutputPluginWriterPrepareWrite prepare_write,
LogicalOutputPluginWriterWrite do_write,
@@ -395,8 +401,8 @@ CreateDecodingContext(XLogRecPtr start_lsn,
ctx = StartupDecodingContext(output_plugin_options,
start_lsn, InvalidTransactionId, false,
- read_page, prepare_write, do_write,
- update_progress);
+ fast_forward, read_page, prepare_write,
+ do_write, update_progress);
/* call output plugin initialization callback */
old_context = MemoryContextSwitchTo(ctx->context);
@@ -573,6 +579,8 @@ startup_cb_wrapper(LogicalDecodingContext *ctx, OutputPluginOptions *opt, bool i
LogicalErrorCallbackState state;
ErrorContextCallback errcallback;
+ Assert(!ctx->fast_forward);
+
/* Push callback + info on the error context stack */
state.ctx = ctx;
state.callback_name = "startup";
@@ -598,6 +606,8 @@ shutdown_cb_wrapper(LogicalDecodingContext *ctx)
LogicalErrorCallbackState state;
ErrorContextCallback errcallback;
+ Assert(!ctx->fast_forward);
+
/* Push callback + info on the error context stack */
state.ctx = ctx;
state.callback_name = "shutdown";
@@ -629,6 +639,8 @@ begin_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn)
LogicalErrorCallbackState state;
ErrorContextCallback errcallback;
+ Assert(!ctx->fast_forward);
+
/* Push callback + info on the error context stack */
state.ctx = ctx;
state.callback_name = "begin";
@@ -658,6 +670,8 @@ commit_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
LogicalErrorCallbackState state;
ErrorContextCallback errcallback;
+ Assert(!ctx->fast_forward);
+
/* Push callback + info on the error context stack */
state.ctx = ctx;
state.callback_name = "commit";
@@ -687,6 +701,8 @@ change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
LogicalErrorCallbackState state;
ErrorContextCallback errcallback;
+ Assert(!ctx->fast_forward);
+
/* Push callback + info on the error context stack */
state.ctx = ctx;
state.callback_name = "change";
@@ -721,6 +737,8 @@ filter_by_origin_cb_wrapper(LogicalDecodingContext *ctx, RepOriginId origin_id)
ErrorContextCallback errcallback;
bool ret;
+ Assert(!ctx->fast_forward);
+
/* Push callback + info on the error context stack */
state.ctx = ctx;
state.callback_name = "filter_by_origin";
@@ -751,6 +769,8 @@ message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
LogicalErrorCallbackState state;
ErrorContextCallback errcallback;
+ Assert(!ctx->fast_forward);
+
if (ctx->callbacks.message_cb == NULL)
return;