aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/backend/access/transam/twophase.c5
-rw-r--r--src/backend/access/transam/xlog.c10
-rw-r--r--src/backend/access/transam/xlogreader.c51
-rw-r--r--src/backend/access/transam/xlogutils.c24
-rw-r--r--src/backend/replication/logical/logical.c20
-rw-r--r--src/backend/replication/logical/logicalfuncs.c4
-rw-r--r--src/backend/replication/slotfuncs.c10
-rw-r--r--src/backend/replication/walsender.c36
-rw-r--r--src/bin/pg_rewind/parsexlog.c9
-rw-r--r--src/bin/pg_waldump/pg_waldump.c30
-rw-r--r--src/include/access/xlogreader.h119
-rw-r--r--src/include/access/xlogutils.h5
-rw-r--r--src/include/replication/logical.h4
13 files changed, 214 insertions, 113 deletions
diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c
index 2f7d4ed59a8..e1904877faa 100644
--- a/src/backend/access/transam/twophase.c
+++ b/src/backend/access/transam/twophase.c
@@ -1331,7 +1331,10 @@ XlogReadTwoPhaseData(XLogRecPtr lsn, char **buf, int *len)
char *errormsg;
xlogreader = XLogReaderAllocate(wal_segment_size, NULL,
- &read_local_xlog_page, NULL);
+ XL_ROUTINE(.page_read = &read_local_xlog_page,
+ .segment_open = &wal_segment_open,
+ .segment_close = &wal_segment_close),
+ NULL);
if (!xlogreader)
ereport(ERROR,
(errcode(ERRCODE_OUT_OF_MEMORY),
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 0d3d6709284..a53e6d96334 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -1223,7 +1223,7 @@ XLogInsertRecord(XLogRecData *rdata,
if (!debug_reader)
debug_reader = XLogReaderAllocate(wal_segment_size, NULL,
- NULL, NULL);
+ XL_ROUTINE(), NULL);
if (!debug_reader)
{
@@ -6478,8 +6478,12 @@ StartupXLOG(void)
/* Set up XLOG reader facility */
MemSet(&private, 0, sizeof(XLogPageReadPrivate));
- xlogreader = XLogReaderAllocate(wal_segment_size, NULL,
- &XLogPageRead, &private);
+ xlogreader =
+ XLogReaderAllocate(wal_segment_size, NULL,
+ XL_ROUTINE(.page_read = &XLogPageRead,
+ .segment_open = NULL,
+ .segment_close = wal_segment_close),
+ &private);
if (!xlogreader)
ereport(ERROR,
(errcode(ERRCODE_OUT_OF_MEMORY),
diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c
index 79ff976474c..7cee8b92c90 100644
--- a/src/backend/access/transam/xlogreader.c
+++ b/src/backend/access/transam/xlogreader.c
@@ -71,7 +71,7 @@ report_invalid_record(XLogReaderState *state, const char *fmt,...)
*/
XLogReaderState *
XLogReaderAllocate(int wal_segment_size, const char *waldir,
- XLogPageReadCB pagereadfunc, void *private_data)
+ XLogReaderRoutine *routine, void *private_data)
{
XLogReaderState *state;
@@ -81,6 +81,9 @@ XLogReaderAllocate(int wal_segment_size, const char *waldir,
if (!state)
return NULL;
+ /* initialize caller-provided support functions */
+ state->routine = *routine;
+
state->max_block_id = -1;
/*
@@ -102,7 +105,6 @@ XLogReaderAllocate(int wal_segment_size, const char *waldir,
WALOpenSegmentInit(&state->seg, &state->segcxt, wal_segment_size,
waldir);
- state->read_page = pagereadfunc;
/* system_identifier initialized to zeroes above */
state->private_data = private_data;
/* ReadRecPtr, EndRecPtr and readLen initialized to zeroes above */
@@ -137,7 +139,7 @@ XLogReaderFree(XLogReaderState *state)
int block_id;
if (state->seg.ws_file != -1)
- close(state->seg.ws_file);
+ state->routine.segment_close(state);
for (block_id = 0; block_id <= XLR_MAX_BLOCK_ID; block_id++)
{
@@ -250,7 +252,7 @@ XLogBeginRead(XLogReaderState *state, XLogRecPtr RecPtr)
* XLogBeginRead() or XLogFindNextRecord() must be called before the first call
* to XLogReadRecord().
*
- * If the read_page callback fails to read the requested data, NULL is
+ * If the page_read callback fails to read the requested data, NULL is
* returned. The callback is expected to have reported the error; errormsg
* is set to NULL.
*
@@ -559,10 +561,10 @@ err:
/*
* Read a single xlog page including at least [pageptr, reqLen] of valid data
- * via the read_page() callback.
+ * via the page_read() callback.
*
* Returns -1 if the required page cannot be read for some reason; errormsg_buf
- * is set in that case (unless the error occurs in the read_page callback).
+ * is set in that case (unless the error occurs in the page_read callback).
*
* We fetch the page from a reader-local cache if we know we have the required
* data and if there hasn't been any error since caching the data.
@@ -589,7 +591,7 @@ ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr, int reqLen)
* Data is not in our buffer.
*
* Every time we actually read the segment, even if we looked at parts of
- * it before, we need to do verification as the read_page callback might
+ * it before, we need to do verification as the page_read callback might
* now be rereading data from a different source.
*
* Whenever switching to a new WAL segment, we read the first page of the
@@ -601,9 +603,9 @@ ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr, int reqLen)
{
XLogRecPtr targetSegmentPtr = pageptr - targetPageOff;
- readLen = state->read_page(state, targetSegmentPtr, XLOG_BLCKSZ,
- state->currRecPtr,
- state->readBuf);
+ readLen = state->routine.page_read(state, targetSegmentPtr, XLOG_BLCKSZ,
+ state->currRecPtr,
+ state->readBuf);
if (readLen < 0)
goto err;
@@ -619,9 +621,9 @@ ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr, int reqLen)
* First, read the requested data length, but at least a short page header
* so that we can validate it.
*/
- readLen = state->read_page(state, pageptr, Max(reqLen, SizeOfXLogShortPHD),
- state->currRecPtr,
- state->readBuf);
+ readLen = state->routine.page_read(state, pageptr, Max(reqLen, SizeOfXLogShortPHD),
+ state->currRecPtr,
+ state->readBuf);
if (readLen < 0)
goto err;
@@ -638,9 +640,9 @@ ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr, int reqLen)
/* still not enough */
if (readLen < XLogPageHeaderSize(hdr))
{
- readLen = state->read_page(state, pageptr, XLogPageHeaderSize(hdr),
- state->currRecPtr,
- state->readBuf);
+ readLen = state->routine.page_read(state, pageptr, XLogPageHeaderSize(hdr),
+ state->currRecPtr,
+ state->readBuf);
if (readLen < 0)
goto err;
}
@@ -1041,11 +1043,14 @@ err:
#endif /* FRONTEND */
/*
+ * Helper function to ease writing of XLogRoutine->page_read callbacks.
+ * If this function is used, caller must supply an open_segment callback in
+ * 'state', as that is used here.
+ *
* Read 'count' bytes into 'buf', starting at location 'startptr', from WAL
* fetched from timeline 'tli'.
*
- * 'seg/segcxt' identify the last segment used. 'openSegment' is a callback
- * to open the next segment, if necessary.
+ * 'seg/segcxt' identify the last segment used.
*
* Returns true if succeeded, false if an error occurs, in which case
* 'errinfo' receives error details.
@@ -1054,9 +1059,10 @@ err:
* WAL buffers when possible.
*/
bool
-WALRead(char *buf, XLogRecPtr startptr, Size count, TimeLineID tli,
+WALRead(XLogReaderState *state,
+ char *buf, XLogRecPtr startptr, Size count, TimeLineID tli,
WALOpenSegment *seg, WALSegmentContext *segcxt,
- WALSegmentOpen openSegment, WALReadError *errinfo)
+ WALReadError *errinfo)
{
char *p;
XLogRecPtr recptr;
@@ -1086,10 +1092,11 @@ WALRead(char *buf, XLogRecPtr startptr, Size count, TimeLineID tli,
XLogSegNo nextSegNo;
if (seg->ws_file >= 0)
- close(seg->ws_file);
+ state->routine.segment_close(state);
XLByteToSeg(recptr, nextSegNo, segcxt->ws_segsize);
- seg->ws_file = openSegment(nextSegNo, segcxt, &tli);
+ seg->ws_file = state->routine.segment_open(state, nextSegNo,
+ segcxt, &tli);
/* Update the current segment info. */
seg->ws_tli = tli;
diff --git a/src/backend/access/transam/xlogutils.c b/src/backend/access/transam/xlogutils.c
index 6cb143e161d..bbd801513a8 100644
--- a/src/backend/access/transam/xlogutils.c
+++ b/src/backend/access/transam/xlogutils.c
@@ -783,10 +783,10 @@ XLogReadDetermineTimeline(XLogReaderState *state, XLogRecPtr wantPage, uint32 wa
}
}
-/* openSegment callback for WALRead */
-static int
-wal_segment_open(XLogSegNo nextSegNo, WALSegmentContext * segcxt,
- TimeLineID *tli_p)
+/* XLogReaderRoutine->segment_open callback for local pg_wal files */
+int
+wal_segment_open(XLogReaderState *state, XLogSegNo nextSegNo,
+ WALSegmentContext *segcxt, TimeLineID *tli_p)
{
TimeLineID tli = *tli_p;
char path[MAXPGPATH];
@@ -811,8 +811,17 @@ wal_segment_open(XLogSegNo nextSegNo, WALSegmentContext * segcxt,
return -1; /* keep compiler quiet */
}
+/* stock XLogReaderRoutine->segment_close callback */
+void
+wal_segment_close(XLogReaderState *state)
+{
+ close(state->seg.ws_file);
+ /* need to check errno? */
+ state->seg.ws_file = -1;
+}
+
/*
- * read_page callback for reading local xlog files
+ * XLogReaderRoutine->page_read callback for reading local xlog files
*
* Public because it would likely be very helpful for someone writing another
* output method outside walsender, e.g. in a bgworker.
@@ -937,8 +946,9 @@ read_local_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr,
* as 'count', read the whole page anyway. It's guaranteed to be
* zero-padded up to the page boundary if it's incomplete.
*/
- if (!WALRead(cur_page, targetPagePtr, XLOG_BLCKSZ, tli, &state->seg,
- &state->segcxt, wal_segment_open, &errinfo))
+ if (!WALRead(state, cur_page, targetPagePtr, XLOG_BLCKSZ, tli,
+ &state->seg, &state->segcxt,
+ &errinfo))
WALReadRaiseError(&errinfo);
/* number of valid bytes in the buffer */
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 5adf253583b..dc69e5ce5f3 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -120,7 +120,7 @@ StartupDecodingContext(List *output_plugin_options,
TransactionId xmin_horizon,
bool need_full_snapshot,
bool fast_forward,
- XLogPageReadCB read_page,
+ XLogReaderRoutine *xl_routine,
LogicalOutputPluginWriterPrepareWrite prepare_write,
LogicalOutputPluginWriterWrite do_write,
LogicalOutputPluginWriterUpdateProgress update_progress)
@@ -169,7 +169,7 @@ StartupDecodingContext(List *output_plugin_options,
ctx->slot = slot;
- ctx->reader = XLogReaderAllocate(wal_segment_size, NULL, read_page, ctx);
+ ctx->reader = XLogReaderAllocate(wal_segment_size, NULL, xl_routine, ctx);
if (!ctx->reader)
ereport(ERROR,
(errcode(ERRCODE_OUT_OF_MEMORY),
@@ -215,7 +215,8 @@ StartupDecodingContext(List *output_plugin_options,
* Otherwise, we set for decoding to start from the given LSN without
* marking WAL reserved beforehand. In that scenario, it's up to the
* caller to guarantee that WAL remains available.
- * read_page, prepare_write, do_write, update_progress --
+ * xl_routine -- XLogReaderRoutine for underlying XLogReader
+ * prepare_write, do_write, update_progress --
* callbacks that perform the use-case dependent, actual, work.
*
* Needs to be called while in a memory context that's at least as long lived
@@ -230,7 +231,7 @@ CreateInitDecodingContext(char *plugin,
List *output_plugin_options,
bool need_full_snapshot,
XLogRecPtr restart_lsn,
- XLogPageReadCB read_page,
+ XLogReaderRoutine *xl_routine,
LogicalOutputPluginWriterPrepareWrite prepare_write,
LogicalOutputPluginWriterWrite do_write,
LogicalOutputPluginWriterUpdateProgress update_progress)
@@ -327,7 +328,7 @@ CreateInitDecodingContext(char *plugin,
ctx = StartupDecodingContext(NIL, restart_lsn, xmin_horizon,
need_full_snapshot, false,
- read_page, prepare_write, do_write,
+ xl_routine, prepare_write, do_write,
update_progress);
/* call output plugin initialization callback */
@@ -357,7 +358,10 @@ CreateInitDecodingContext(char *plugin,
* fast_forward
* bypass the generation of logical changes.
*
- * read_page, prepare_write, do_write, update_progress
+ * xl_routine
+ * XLogReaderRoutine used by underlying xlogreader
+ *
+ * prepare_write, do_write, update_progress
* callbacks that have to be filled to perform the use-case dependent,
* actual work.
*
@@ -372,7 +376,7 @@ LogicalDecodingContext *
CreateDecodingContext(XLogRecPtr start_lsn,
List *output_plugin_options,
bool fast_forward,
- XLogPageReadCB read_page,
+ XLogReaderRoutine *xl_routine,
LogicalOutputPluginWriterPrepareWrite prepare_write,
LogicalOutputPluginWriterWrite do_write,
LogicalOutputPluginWriterUpdateProgress update_progress)
@@ -425,7 +429,7 @@ CreateDecodingContext(XLogRecPtr start_lsn,
ctx = StartupDecodingContext(output_plugin_options,
start_lsn, InvalidTransactionId, false,
- fast_forward, read_page, prepare_write,
+ fast_forward, xl_routine, prepare_write,
do_write, update_progress);
/* call output plugin initialization callback */
diff --git a/src/backend/replication/logical/logicalfuncs.c b/src/backend/replication/logical/logicalfuncs.c
index fded8e82908..b99c94e8489 100644
--- a/src/backend/replication/logical/logicalfuncs.c
+++ b/src/backend/replication/logical/logicalfuncs.c
@@ -233,7 +233,9 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin
ctx = CreateDecodingContext(InvalidXLogRecPtr,
options,
false,
- read_local_xlog_page,
+ XL_ROUTINE(.page_read = read_local_xlog_page,
+ .segment_open = wal_segment_open,
+ .segment_close = wal_segment_close),
LogicalOutputPrepareWrite,
LogicalOutputWrite, NULL);
diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
index ae751e94e76..26890dffb45 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -152,8 +152,10 @@ create_logical_replication_slot(char *name, char *plugin,
ctx = CreateInitDecodingContext(plugin, NIL,
false, /* just catalogs is OK */
restart_lsn,
- read_local_xlog_page, NULL, NULL,
- NULL);
+ XL_ROUTINE(.page_read = read_local_xlog_page,
+ .segment_open = wal_segment_open,
+ .segment_close = wal_segment_close),
+ NULL, NULL, NULL);
/*
* If caller needs us to determine the decoding start point, do so now.
@@ -464,7 +466,9 @@ pg_logical_replication_slot_advance(XLogRecPtr moveto)
ctx = CreateDecodingContext(InvalidXLogRecPtr,
NIL,
true, /* fast_forward */
- read_local_xlog_page,
+ XL_ROUTINE(.page_read = read_local_xlog_page,
+ .segment_open = wal_segment_open,
+ .segment_close = wal_segment_close),
NULL, NULL, NULL);
/*
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 8b55bbfcb2e..d18475b8540 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -54,8 +54,8 @@
#include "access/transam.h"
#include "access/xact.h"
#include "access/xlog_internal.h"
+#include "access/xlogreader.h"
#include "access/xlogutils.h"
-
#include "catalog/pg_authid.h"
#include "catalog/pg_type.h"
#include "commands/dbcommands.h"
@@ -248,8 +248,8 @@ static void LagTrackerWrite(XLogRecPtr lsn, TimestampTz local_flush_time);
static TimeOffset LagTrackerRead(int head, XLogRecPtr lsn, TimestampTz now);
static bool TransactionIdInRecentPast(TransactionId xid, uint32 epoch);
-static int WalSndSegmentOpen(XLogSegNo nextSegNo, WALSegmentContext *segcxt,
- TimeLineID *tli_p);
+static int WalSndSegmentOpen(XLogReaderState *state, XLogSegNo nextSegNo,
+ WALSegmentContext *segcxt, TimeLineID *tli_p);
static void UpdateSpillStats(LogicalDecodingContext *ctx);
@@ -798,7 +798,8 @@ StartReplication(StartReplicationCmd *cmd)
}
/*
- * read_page callback for logical decoding contexts, as a walsender process.
+ * XLogReaderRoutine->page_read callback for logical decoding contexts, as a
+ * walsender process.
*
* Inside the walsender we can do better than read_local_xlog_page,
* which has to do a plain sleep/busy loop, because the walsender's latch gets
@@ -832,7 +833,8 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req
count = flushptr - targetPagePtr; /* part of the page available */
/* now actually read the data, we know it's there */
- if (!WALRead(cur_page,
+ if (!WALRead(state,
+ cur_page,
targetPagePtr,
XLOG_BLCKSZ,
sendSeg->ws_tli, /* Pass the current TLI because only
@@ -840,7 +842,6 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req
* TLI is needed. */
sendSeg,
sendCxt,
- WalSndSegmentOpen,
&errinfo))
WALReadRaiseError(&errinfo);
@@ -1005,7 +1006,9 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
ctx = CreateInitDecodingContext(cmd->plugin, NIL, need_full_snapshot,
InvalidXLogRecPtr,
- logical_read_xlog_page,
+ XL_ROUTINE(.page_read = logical_read_xlog_page,
+ .segment_open = WalSndSegmentOpen,
+ .segment_close = wal_segment_close),
WalSndPrepareWrite, WalSndWriteData,
WalSndUpdateProgress);
@@ -1168,7 +1171,9 @@ StartLogicalReplication(StartReplicationCmd *cmd)
*/
logical_decoding_ctx =
CreateDecodingContext(cmd->startpoint, cmd->options, false,
- logical_read_xlog_page,
+ XL_ROUTINE(.page_read = logical_read_xlog_page,
+ .segment_open = WalSndSegmentOpen,
+ .segment_close = wal_segment_close),
WalSndPrepareWrite, WalSndWriteData,
WalSndUpdateProgress);
@@ -2441,9 +2446,10 @@ WalSndKill(int code, Datum arg)
SpinLockRelease(&walsnd->mutex);
}
-/* walsender's openSegment callback for WALRead */
+/* XLogReaderRoutine->segment_open callback */
static int
-WalSndSegmentOpen(XLogSegNo nextSegNo, WALSegmentContext *segcxt,
+WalSndSegmentOpen(XLogReaderState *state,
+ XLogSegNo nextSegNo, WALSegmentContext *segcxt,
TimeLineID *tli_p)
{
char path[MAXPGPATH];
@@ -2531,6 +2537,12 @@ XLogSendPhysical(void)
Size nbytes;
XLogSegNo segno;
WALReadError errinfo;
+ static XLogReaderState fake_xlogreader =
+ {
+ /* Fake xlogreader state for WALRead */
+ .routine.segment_open = WalSndSegmentOpen,
+ .routine.segment_close = wal_segment_close
+ };
/* If requested switch the WAL sender to the stopping state. */
if (got_STOPPING)
@@ -2748,7 +2760,8 @@ XLogSendPhysical(void)
enlargeStringInfo(&output_message, nbytes);
retry:
- if (!WALRead(&output_message.data[output_message.len],
+ if (!WALRead(&fake_xlogreader,
+ &output_message.data[output_message.len],
startptr,
nbytes,
sendSeg->ws_tli, /* Pass the current TLI because only
@@ -2756,7 +2769,6 @@ retry:
* TLI is needed. */
sendSeg,
sendCxt,
- WalSndSegmentOpen,
&errinfo))
WALReadRaiseError(&errinfo);
diff --git a/src/bin/pg_rewind/parsexlog.c b/src/bin/pg_rewind/parsexlog.c
index c51b5db315a..d637f5eb771 100644
--- a/src/bin/pg_rewind/parsexlog.c
+++ b/src/bin/pg_rewind/parsexlog.c
@@ -66,7 +66,8 @@ extractPageMap(const char *datadir, XLogRecPtr startpoint, int tliIndex,
private.tliIndex = tliIndex;
private.restoreCommand = restoreCommand;
- xlogreader = XLogReaderAllocate(WalSegSz, datadir, &SimpleXLogPageRead,
+ xlogreader = XLogReaderAllocate(WalSegSz, datadir,
+ XL_ROUTINE(.page_read = &SimpleXLogPageRead),
&private);
if (xlogreader == NULL)
pg_fatal("out of memory");
@@ -117,7 +118,8 @@ readOneRecord(const char *datadir, XLogRecPtr ptr, int tliIndex,
private.tliIndex = tliIndex;
private.restoreCommand = restoreCommand;
- xlogreader = XLogReaderAllocate(WalSegSz, datadir, &SimpleXLogPageRead,
+ xlogreader = XLogReaderAllocate(WalSegSz, datadir,
+ XL_ROUTINE(.page_read = &SimpleXLogPageRead),
&private);
if (xlogreader == NULL)
pg_fatal("out of memory");
@@ -176,7 +178,8 @@ findLastCheckpoint(const char *datadir, XLogRecPtr forkptr, int tliIndex,
private.tliIndex = tliIndex;
private.restoreCommand = restoreCommand;
- xlogreader = XLogReaderAllocate(WalSegSz, datadir, &SimpleXLogPageRead,
+ xlogreader = XLogReaderAllocate(WalSegSz, datadir,
+ XL_ROUTINE(.page_read = &SimpleXLogPageRead),
&private);
if (xlogreader == NULL)
pg_fatal("out of memory");
diff --git a/src/bin/pg_waldump/pg_waldump.c b/src/bin/pg_waldump/pg_waldump.c
index d7bd9ccac24..e29f65500fb 100644
--- a/src/bin/pg_waldump/pg_waldump.c
+++ b/src/bin/pg_waldump/pg_waldump.c
@@ -279,9 +279,10 @@ identify_target_directory(char *directory, char *fname)
return NULL; /* not reached */
}
-/* pg_waldump's openSegment callback for WALRead */
+/* pg_waldump's XLogReaderRoutine->segment_open callback */
static int
-WALDumpOpenSegment(XLogSegNo nextSegNo, WALSegmentContext *segcxt,
+WALDumpOpenSegment(XLogReaderState *state,
+ XLogSegNo nextSegNo, WALSegmentContext *segcxt,
TimeLineID *tli_p)
{
TimeLineID tli = *tli_p;
@@ -321,8 +322,18 @@ WALDumpOpenSegment(XLogSegNo nextSegNo, WALSegmentContext *segcxt,
}
/*
- * XLogReader read_page callback
+ * pg_waldump's XLogReaderRoutine->segment_close callback. Same as
+ * wal_segment_close
*/
+static void
+WALDumpCloseSegment(XLogReaderState *state)
+{
+ close(state->seg.ws_file);
+ /* need to check errno? */
+ state->seg.ws_file = -1;
+}
+
+/* pg_waldump's XLogReaderRoutine->page_read callback */
static int
WALDumpReadPage(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen,
XLogRecPtr targetPtr, char *readBuff)
@@ -344,8 +355,9 @@ WALDumpReadPage(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen,
}
}
- if (!WALRead(readBuff, targetPagePtr, count, private->timeline,
- &state->seg, &state->segcxt, WALDumpOpenSegment, &errinfo))
+ if (!WALRead(state, readBuff, targetPagePtr, count, private->timeline,
+ &state->seg, &state->segcxt,
+ &errinfo))
{
WALOpenSegment *seg = &errinfo.wre_seg;
char fname[MAXPGPATH];
@@ -1031,8 +1043,12 @@ main(int argc, char **argv)
/* done with argument parsing, do the actual work */
/* we have everything we need, start reading */
- xlogreader_state = XLogReaderAllocate(WalSegSz, waldir, WALDumpReadPage,
- &private);
+ xlogreader_state =
+ XLogReaderAllocate(WalSegSz, waldir,
+ XL_ROUTINE(.page_read = WALDumpReadPage,
+ .segment_open = WALDumpOpenSegment,
+ .segment_close = WALDumpCloseSegment),
+ &private);
if (!xlogreader_state)
fatal_error("out of memory");
diff --git a/src/include/access/xlogreader.h b/src/include/access/xlogreader.h
index 4582196e185..81af200f5e6 100644
--- a/src/include/access/xlogreader.h
+++ b/src/include/access/xlogreader.h
@@ -17,6 +17,13 @@
* XLogBeginRead() or XLogFindNextRecord(), and call XLogReadRecord()
* until it returns NULL.
*
+ * Callers supply a page_read callback if they want to to call
+ * XLogReadRecord or XLogFindNextRecord; it can be passed in as NULL
+ * otherwise. The WALRead function can be used as a helper to write
+ * page_read callbacks, but it is not mandatory; callers that use it,
+ * must supply open_segment callbacks. The close_segment callback
+ * must always be supplied.
+ *
* After reading a record with XLogReadRecord(), it's decomposed into
* the per-block and main data parts, and the parts can be accessed
* with the XLogRec* macros and functions. You can also decode a
@@ -50,12 +57,69 @@ typedef struct WALSegmentContext
typedef struct XLogReaderState XLogReaderState;
-/* Function type definition for the read_page callback */
+/* Function type definitions for various xlogreader interactions */
typedef int (*XLogPageReadCB) (XLogReaderState *xlogreader,
XLogRecPtr targetPagePtr,
int reqLen,
XLogRecPtr targetRecPtr,
char *readBuf);
+typedef int (*WALSegmentOpenCB) (XLogReaderState *xlogreader,
+ XLogSegNo nextSegNo,
+ WALSegmentContext *segcxt,
+ TimeLineID *tli_p);
+typedef void (*WALSegmentCloseCB) (XLogReaderState *xlogreader);
+
+typedef struct XLogReaderRoutine
+{
+ /*
+ * Data input callback
+ *
+ * This callback shall read at least reqLen valid bytes of the xlog page
+ * starting at targetPagePtr, and store them in readBuf. The callback
+ * shall return the number of bytes read (never more than XLOG_BLCKSZ), or
+ * -1 on failure. The callback shall sleep, if necessary, to wait for the
+ * requested bytes to become available. The callback will not be invoked
+ * again for the same page unless more than the returned number of bytes
+ * are needed.
+ *
+ * targetRecPtr is the position of the WAL record we're reading. Usually
+ * it is equal to targetPagePtr + reqLen, but sometimes xlogreader needs
+ * to read and verify the page or segment header, before it reads the
+ * actual WAL record it's interested in. In that case, targetRecPtr can
+ * be used to determine which timeline to read the page from.
+ *
+ * The callback shall set ->seg.ws_tli to the TLI of the file the page was
+ * read from.
+ */
+ XLogPageReadCB page_read;
+
+ /*
+ * Callback to open the specified WAL segment for reading. The file
+ * descriptor of the opened segment shall be returned. In case of
+ * failure, an error shall be raised by the callback and it shall not
+ * return.
+ *
+ * "nextSegNo" is the number of the segment to be opened.
+ *
+ * "segcxt" is additional information about the segment.
+ *
+ * "tli_p" is an input/output argument. XLogRead() uses it to pass the
+ * timeline in which the new segment should be found, but the callback can
+ * use it to return the TLI that it actually opened.
+ *
+ * BasicOpenFile() is the preferred way to open the segment file in
+ * backend code, whereas open(2) should be used in frontend.
+ */
+ WALSegmentOpenCB segment_open;
+
+ /*
+ * WAL segment close callback. ->seg.ws_file shall be set to a negative
+ * number.
+ */
+ WALSegmentCloseCB segment_close;
+} XLogReaderRoutine;
+
+#define XL_ROUTINE(...) &(XLogReaderRoutine){__VA_ARGS__}
typedef struct
{
@@ -88,34 +152,17 @@ typedef struct
struct XLogReaderState
{
+ /*
+ * Operational callbacks
+ */
+ XLogReaderRoutine routine;
+
/* ----------------------------------------
* Public parameters
* ----------------------------------------
*/
/*
- * Data input callback (mandatory).
- *
- * This callback shall read at least reqLen valid bytes of the xlog page
- * starting at targetPagePtr, and store them in readBuf. The callback
- * shall return the number of bytes read (never more than XLOG_BLCKSZ), or
- * -1 on failure. The callback shall sleep, if necessary, to wait for the
- * requested bytes to become available. The callback will not be invoked
- * again for the same page unless more than the returned number of bytes
- * are needed.
- *
- * targetRecPtr is the position of the WAL record we're reading. Usually
- * it is equal to targetPagePtr + reqLen, but sometimes xlogreader needs
- * to read and verify the page or segment header, before it reads the
- * actual WAL record it's interested in. In that case, targetRecPtr can
- * be used to determine which timeline to read the page from.
- *
- * The callback shall set ->seg.ws_tli to the TLI of the file the page was
- * read from.
- */
- XLogPageReadCB read_page;
-
- /*
* System identifier of the xlog files we're about to read. Set to zero
* (the default value) if unknown or unimportant.
*/
@@ -214,30 +261,13 @@ struct XLogReaderState
/* Get a new XLogReader */
extern XLogReaderState *XLogReaderAllocate(int wal_segment_size,
const char *waldir,
- XLogPageReadCB pagereadfunc,
+ XLogReaderRoutine *routine,
void *private_data);
+extern XLogReaderRoutine *LocalXLogReaderRoutine(void);
/* Free an XLogReader */
extern void XLogReaderFree(XLogReaderState *state);
-/*
- * Callback to open the specified WAL segment for reading. Returns a valid
- * file descriptor when the file was opened successfully.
- *
- * "nextSegNo" is the number of the segment to be opened.
- *
- * "segcxt" is additional information about the segment.
- *
- * "tli_p" is an input/output argument. XLogRead() uses it to pass the
- * timeline in which the new segment should be found, but the callback can use
- * it to return the TLI that it actually opened.
- *
- * BasicOpenFile() is the preferred way to open the segment file in backend
- * code, whereas open(2) should be used in frontend.
- */
-typedef int (*WALSegmentOpen) (XLogSegNo nextSegNo, WALSegmentContext *segcxt,
- TimeLineID *tli_p);
-
/* Initialize supporting structures */
extern void WALOpenSegmentInit(WALOpenSegment *seg, WALSegmentContext *segcxt,
int segsize, const char *waldir);
@@ -269,9 +299,10 @@ typedef struct WALReadError
WALOpenSegment wre_seg; /* Segment we tried to read from. */
} WALReadError;
-extern bool WALRead(char *buf, XLogRecPtr startptr, Size count,
+extern bool WALRead(XLogReaderState *state,
+ char *buf, XLogRecPtr startptr, Size count,
TimeLineID tli, WALOpenSegment *seg,
- WALSegmentContext *segcxt, WALSegmentOpen openSegment,
+ WALSegmentContext *segcxt,
WALReadError *errinfo);
/* Functions for decoding an XLogRecord */
diff --git a/src/include/access/xlogutils.h b/src/include/access/xlogutils.h
index 5181a077d96..68ce815476c 100644
--- a/src/include/access/xlogutils.h
+++ b/src/include/access/xlogutils.h
@@ -50,6 +50,11 @@ extern void FreeFakeRelcacheEntry(Relation fakerel);
extern int read_local_xlog_page(XLogReaderState *state,
XLogRecPtr targetPagePtr, int reqLen,
XLogRecPtr targetRecPtr, char *cur_page);
+extern int wal_segment_open(XLogReaderState *state,
+ XLogSegNo nextSegNo,
+ WALSegmentContext *segcxt,
+ TimeLineID *tli_p);
+extern void wal_segment_close(XLogReaderState *state);
extern void XLogReadDetermineTimeline(XLogReaderState *state,
XLogRecPtr wantPage, uint32 wantLength);
diff --git a/src/include/replication/logical.h b/src/include/replication/logical.h
index 3b7ca7f1da4..c2f2475e5d3 100644
--- a/src/include/replication/logical.h
+++ b/src/include/replication/logical.h
@@ -95,14 +95,14 @@ extern LogicalDecodingContext *CreateInitDecodingContext(char *plugin,
List *output_plugin_options,
bool need_full_snapshot,
XLogRecPtr restart_lsn,
- XLogPageReadCB read_page,
+ XLogReaderRoutine *xl_routine,
LogicalOutputPluginWriterPrepareWrite prepare_write,
LogicalOutputPluginWriterWrite do_write,
LogicalOutputPluginWriterUpdateProgress update_progress);
extern LogicalDecodingContext *CreateDecodingContext(XLogRecPtr start_lsn,
List *output_plugin_options,
bool fast_forward,
- XLogPageReadCB read_page,
+ XLogReaderRoutine *xl_routine,
LogicalOutputPluginWriterPrepareWrite prepare_write,
LogicalOutputPluginWriterWrite do_write,
LogicalOutputPluginWriterUpdateProgress update_progress);