diff options
Diffstat (limited to 'src/backend/replication/logical/logical.c')
-rw-r--r-- | src/backend/replication/logical/logical.c | 26 |
1 files changed, 18 insertions, 8 deletions
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c index 2f6803637bf..4f6e87f18d3 100644 --- a/src/backend/replication/logical/logical.c +++ b/src/backend/replication/logical/logical.c @@ -148,7 +148,8 @@ StartupDecodingContext(List *output_plugin_options, TransactionId xmin_horizon, bool need_full_snapshot, bool fast_forward, - XLogReaderRoutine *xl_routine, + LogicalDecodingXLogPageReadCB page_read, + WALSegmentCleanupCB cleanup_cb, LogicalOutputPluginWriterPrepareWrite prepare_write, LogicalOutputPluginWriterWrite do_write, LogicalOutputPluginWriterUpdateProgress update_progress) @@ -198,11 +199,12 @@ StartupDecodingContext(List *output_plugin_options, ctx->slot = slot; - ctx->reader = XLogReaderAllocate(wal_segment_size, NULL, xl_routine, ctx); + ctx->reader = XLogReaderAllocate(wal_segment_size, NULL, cleanup_cb); if (!ctx->reader) ereport(ERROR, (errcode(ERRCODE_OUT_OF_MEMORY), errmsg("out of memory"))); + ctx->page_read = page_read; ctx->reorder = ReorderBufferAllocate(); ctx->snapshot_builder = @@ -319,7 +321,8 @@ CreateInitDecodingContext(const char *plugin, List *output_plugin_options, bool need_full_snapshot, XLogRecPtr restart_lsn, - XLogReaderRoutine *xl_routine, + LogicalDecodingXLogPageReadCB page_read, + WALSegmentCleanupCB cleanup_cb, LogicalOutputPluginWriterPrepareWrite prepare_write, LogicalOutputPluginWriterWrite do_write, LogicalOutputPluginWriterUpdateProgress update_progress) @@ -422,7 +425,7 @@ CreateInitDecodingContext(const char *plugin, ctx = StartupDecodingContext(NIL, restart_lsn, xmin_horizon, need_full_snapshot, false, - xl_routine, prepare_write, do_write, + page_read, cleanup_cb, prepare_write, do_write, update_progress); /* call output plugin initialization callback */ @@ -476,7 +479,8 @@ LogicalDecodingContext * CreateDecodingContext(XLogRecPtr start_lsn, List *output_plugin_options, bool fast_forward, - XLogReaderRoutine *xl_routine, + LogicalDecodingXLogPageReadCB page_read, + WALSegmentCleanupCB cleanup_cb, LogicalOutputPluginWriterPrepareWrite prepare_write, LogicalOutputPluginWriterWrite do_write, LogicalOutputPluginWriterUpdateProgress update_progress) @@ -528,8 +532,8 @@ CreateDecodingContext(XLogRecPtr start_lsn, ctx = StartupDecodingContext(output_plugin_options, start_lsn, InvalidTransactionId, false, - fast_forward, xl_routine, prepare_write, - do_write, update_progress); + fast_forward, page_read, cleanup_cb, + prepare_write, do_write, update_progress); /* call output plugin initialization callback */ old_context = MemoryContextSwitchTo(ctx->context); @@ -585,7 +589,13 @@ DecodingContextFindStartpoint(LogicalDecodingContext *ctx) char *err = NULL; /* the read_page callback waits for new WAL */ - record = XLogReadRecord(ctx->reader, &err); + while (XLogReadRecord(ctx->reader, &record, &err) == + XLREAD_NEED_DATA) + { + if (!ctx->page_read(ctx->reader)) + break; + } + if (err) elog(ERROR, "%s", err); if (!record) |