diff options
Diffstat (limited to 'src/backend/replication/logical/logical.c')
-rw-r--r-- | src/backend/replication/logical/logical.c | 30 |
1 files changed, 25 insertions, 5 deletions
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c index 2fc9d7d70ff..7637efc32e0 100644 --- a/src/backend/replication/logical/logical.c +++ b/src/backend/replication/logical/logical.c @@ -115,6 +115,7 @@ StartupDecodingContext(List *output_plugin_options, XLogRecPtr start_lsn, TransactionId xmin_horizon, bool need_full_snapshot, + bool fast_forward, XLogPageReadCB read_page, LogicalOutputPluginWriterPrepareWrite prepare_write, LogicalOutputPluginWriterWrite do_write, @@ -140,7 +141,8 @@ StartupDecodingContext(List *output_plugin_options, * (re-)load output plugins, so we detect a bad (removed) output plugin * now. */ - LoadOutputPlugin(&ctx->callbacks, NameStr(slot->data.plugin)); + if (!fast_forward) + LoadOutputPlugin(&ctx->callbacks, NameStr(slot->data.plugin)); /* * Now that the slot's xmin has been set, we can announce ourselves as a @@ -191,6 +193,8 @@ StartupDecodingContext(List *output_plugin_options, ctx->output_plugin_options = output_plugin_options; + ctx->fast_forward = fast_forward; + MemoryContextSwitchTo(old_context); return ctx; @@ -303,8 +307,9 @@ CreateInitDecodingContext(char *plugin, ReplicationSlotSave(); ctx = StartupDecodingContext(NIL, InvalidXLogRecPtr, xmin_horizon, - need_full_snapshot, read_page, prepare_write, - do_write, update_progress); + need_full_snapshot, true, + read_page, prepare_write, do_write, + update_progress); /* call output plugin initialization callback */ old_context = MemoryContextSwitchTo(ctx->context); @@ -342,6 +347,7 @@ CreateInitDecodingContext(char *plugin, LogicalDecodingContext * CreateDecodingContext(XLogRecPtr start_lsn, List *output_plugin_options, + bool fast_forward, XLogPageReadCB read_page, LogicalOutputPluginWriterPrepareWrite prepare_write, LogicalOutputPluginWriterWrite do_write, @@ -395,8 +401,8 @@ CreateDecodingContext(XLogRecPtr start_lsn, ctx = StartupDecodingContext(output_plugin_options, start_lsn, InvalidTransactionId, false, - read_page, prepare_write, do_write, - update_progress); + fast_forward, read_page, prepare_write, + do_write, update_progress); /* call output plugin initialization callback */ old_context = MemoryContextSwitchTo(ctx->context); @@ -573,6 +579,8 @@ startup_cb_wrapper(LogicalDecodingContext *ctx, OutputPluginOptions *opt, bool i LogicalErrorCallbackState state; ErrorContextCallback errcallback; + Assert(!ctx->fast_forward); + /* Push callback + info on the error context stack */ state.ctx = ctx; state.callback_name = "startup"; @@ -598,6 +606,8 @@ shutdown_cb_wrapper(LogicalDecodingContext *ctx) LogicalErrorCallbackState state; ErrorContextCallback errcallback; + Assert(!ctx->fast_forward); + /* Push callback + info on the error context stack */ state.ctx = ctx; state.callback_name = "shutdown"; @@ -629,6 +639,8 @@ begin_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn) LogicalErrorCallbackState state; ErrorContextCallback errcallback; + Assert(!ctx->fast_forward); + /* Push callback + info on the error context stack */ state.ctx = ctx; state.callback_name = "begin"; @@ -658,6 +670,8 @@ commit_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, LogicalErrorCallbackState state; ErrorContextCallback errcallback; + Assert(!ctx->fast_forward); + /* Push callback + info on the error context stack */ state.ctx = ctx; state.callback_name = "commit"; @@ -687,6 +701,8 @@ change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, LogicalErrorCallbackState state; ErrorContextCallback errcallback; + Assert(!ctx->fast_forward); + /* Push callback + info on the error context stack */ state.ctx = ctx; state.callback_name = "change"; @@ -721,6 +737,8 @@ filter_by_origin_cb_wrapper(LogicalDecodingContext *ctx, RepOriginId origin_id) ErrorContextCallback errcallback; bool ret; + Assert(!ctx->fast_forward); + /* Push callback + info on the error context stack */ state.ctx = ctx; state.callback_name = "filter_by_origin"; @@ -751,6 +769,8 @@ message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, LogicalErrorCallbackState state; ErrorContextCallback errcallback; + Assert(!ctx->fast_forward); + if (ctx->callbacks.message_cb == NULL) return; |