aboutsummaryrefslogtreecommitdiff
path: root/src/backend/replication/logical/logical.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/replication/logical/logical.c')
-rw-r--r--src/backend/replication/logical/logical.c26
1 files changed, 8 insertions, 18 deletions
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 00543ede45a..ffc6160e9f3 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -148,8 +148,7 @@ StartupDecodingContext(List *output_plugin_options,
TransactionId xmin_horizon,
bool need_full_snapshot,
bool fast_forward,
- LogicalDecodingXLogPageReadCB page_read,
- WALSegmentCleanupCB cleanup_cb,
+ XLogReaderRoutine *xl_routine,
LogicalOutputPluginWriterPrepareWrite prepare_write,
LogicalOutputPluginWriterWrite do_write,
LogicalOutputPluginWriterUpdateProgress update_progress)
@@ -199,12 +198,11 @@ StartupDecodingContext(List *output_plugin_options,
ctx->slot = slot;
- ctx->reader = XLogReaderAllocate(wal_segment_size, NULL, cleanup_cb);
+ ctx->reader = XLogReaderAllocate(wal_segment_size, NULL, xl_routine, ctx);
if (!ctx->reader)
ereport(ERROR,
(errcode(ERRCODE_OUT_OF_MEMORY),
errmsg("out of memory")));
- ctx->page_read = page_read;
ctx->reorder = ReorderBufferAllocate();
ctx->snapshot_builder =
@@ -321,8 +319,7 @@ CreateInitDecodingContext(const char *plugin,
List *output_plugin_options,
bool need_full_snapshot,
XLogRecPtr restart_lsn,
- LogicalDecodingXLogPageReadCB page_read,
- WALSegmentCleanupCB cleanup_cb,
+ XLogReaderRoutine *xl_routine,
LogicalOutputPluginWriterPrepareWrite prepare_write,
LogicalOutputPluginWriterWrite do_write,
LogicalOutputPluginWriterUpdateProgress update_progress)
@@ -425,7 +422,7 @@ CreateInitDecodingContext(const char *plugin,
ctx = StartupDecodingContext(NIL, restart_lsn, xmin_horizon,
need_full_snapshot, false,
- page_read, cleanup_cb, prepare_write, do_write,
+ xl_routine, prepare_write, do_write,
update_progress);
/* call output plugin initialization callback */
@@ -479,8 +476,7 @@ LogicalDecodingContext *
CreateDecodingContext(XLogRecPtr start_lsn,
List *output_plugin_options,
bool fast_forward,
- LogicalDecodingXLogPageReadCB page_read,
- WALSegmentCleanupCB cleanup_cb,
+ XLogReaderRoutine *xl_routine,
LogicalOutputPluginWriterPrepareWrite prepare_write,
LogicalOutputPluginWriterWrite do_write,
LogicalOutputPluginWriterUpdateProgress update_progress)
@@ -532,8 +528,8 @@ CreateDecodingContext(XLogRecPtr start_lsn,
ctx = StartupDecodingContext(output_plugin_options,
start_lsn, InvalidTransactionId, false,
- fast_forward, page_read, cleanup_cb,
- prepare_write, do_write, update_progress);
+ fast_forward, xl_routine, prepare_write,
+ do_write, update_progress);
/* call output plugin initialization callback */
old_context = MemoryContextSwitchTo(ctx->context);
@@ -589,13 +585,7 @@ DecodingContextFindStartpoint(LogicalDecodingContext *ctx)
char *err = NULL;
/* the read_page callback waits for new WAL */
- while (XLogReadRecord(ctx->reader, &record, &err) ==
- XLREAD_NEED_DATA)
- {
- if (!ctx->page_read(ctx->reader))
- break;
- }
-
+ record = XLogReadRecord(ctx->reader, &err);
if (err)
elog(ERROR, "%s", err);
if (!record)