diff options
Diffstat (limited to 'src/backend/replication/logical/logical.c')
-rw-r--r-- | src/backend/replication/logical/logical.c | 26 |
1 files changed, 8 insertions, 18 deletions
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c index 00543ede45a..ffc6160e9f3 100644 --- a/src/backend/replication/logical/logical.c +++ b/src/backend/replication/logical/logical.c @@ -148,8 +148,7 @@ StartupDecodingContext(List *output_plugin_options, TransactionId xmin_horizon, bool need_full_snapshot, bool fast_forward, - LogicalDecodingXLogPageReadCB page_read, - WALSegmentCleanupCB cleanup_cb, + XLogReaderRoutine *xl_routine, LogicalOutputPluginWriterPrepareWrite prepare_write, LogicalOutputPluginWriterWrite do_write, LogicalOutputPluginWriterUpdateProgress update_progress) @@ -199,12 +198,11 @@ StartupDecodingContext(List *output_plugin_options, ctx->slot = slot; - ctx->reader = XLogReaderAllocate(wal_segment_size, NULL, cleanup_cb); + ctx->reader = XLogReaderAllocate(wal_segment_size, NULL, xl_routine, ctx); if (!ctx->reader) ereport(ERROR, (errcode(ERRCODE_OUT_OF_MEMORY), errmsg("out of memory"))); - ctx->page_read = page_read; ctx->reorder = ReorderBufferAllocate(); ctx->snapshot_builder = @@ -321,8 +319,7 @@ CreateInitDecodingContext(const char *plugin, List *output_plugin_options, bool need_full_snapshot, XLogRecPtr restart_lsn, - LogicalDecodingXLogPageReadCB page_read, - WALSegmentCleanupCB cleanup_cb, + XLogReaderRoutine *xl_routine, LogicalOutputPluginWriterPrepareWrite prepare_write, LogicalOutputPluginWriterWrite do_write, LogicalOutputPluginWriterUpdateProgress update_progress) @@ -425,7 +422,7 @@ CreateInitDecodingContext(const char *plugin, ctx = StartupDecodingContext(NIL, restart_lsn, xmin_horizon, need_full_snapshot, false, - page_read, cleanup_cb, prepare_write, do_write, + xl_routine, prepare_write, do_write, update_progress); /* call output plugin initialization callback */ @@ -479,8 +476,7 @@ LogicalDecodingContext * CreateDecodingContext(XLogRecPtr start_lsn, List *output_plugin_options, bool fast_forward, - LogicalDecodingXLogPageReadCB page_read, - WALSegmentCleanupCB cleanup_cb, + XLogReaderRoutine *xl_routine, LogicalOutputPluginWriterPrepareWrite prepare_write, LogicalOutputPluginWriterWrite do_write, LogicalOutputPluginWriterUpdateProgress update_progress) @@ -532,8 +528,8 @@ CreateDecodingContext(XLogRecPtr start_lsn, ctx = StartupDecodingContext(output_plugin_options, start_lsn, InvalidTransactionId, false, - fast_forward, page_read, cleanup_cb, - prepare_write, do_write, update_progress); + fast_forward, xl_routine, prepare_write, + do_write, update_progress); /* call output plugin initialization callback */ old_context = MemoryContextSwitchTo(ctx->context); @@ -589,13 +585,7 @@ DecodingContextFindStartpoint(LogicalDecodingContext *ctx) char *err = NULL; /* the read_page callback waits for new WAL */ - while (XLogReadRecord(ctx->reader, &record, &err) == - XLREAD_NEED_DATA) - { - if (!ctx->page_read(ctx->reader)) - break; - } - + record = XLogReadRecord(ctx->reader, &err); if (err) elog(ERROR, "%s", err); if (!record) |